Bundle signal alert notifications
Deploy / lint (push) Successful in 7s
Deploy / test (push) Successful in 1m4s
Deploy / deploy (push) Successful in 35s

This commit is contained in:
2026-07-03 13:32:59 +02:00
parent d4ccea2d69
commit eaad935a2a
2 changed files with 198 additions and 21 deletions
+120 -21
View File
@@ -75,6 +75,13 @@ COOLDOWN_HOURS = 72 # don't re-send the same key within this window
DIGEST_HOUR_UTC = 22 # send the daily digest on the first run at/after this hour
WATERMARK_TYPE = "score_watermark"
SIGNAL_BUNDLE_ALERT_TYPES = ("qualified", "sr_proximity", "score_drop")
SIGNAL_BUNDLE_SECTIONS = (
("qualified", "Qualified setups"),
("sr_proximity", "Near support/resistance"),
("score_drop", "Score drops"),
)
SIGNAL_BUNDLE_MAX_CHARS = 3900 # Telegram limit is 4096; keep room for HTML parsing
# Regime quadrant-change alert: (regime index x early-warning) quadrant.
# Hysteresis (a deadband around each divider) stops a point sitting on a boundary
@@ -91,6 +98,9 @@ QUAD_LABELS = {
"4": "④ Real downturn",
}
AlertItem = tuple[str, str, str] # alert_type, dedup_key, text
AlertLogRef = tuple[str, str] # alert_type, dedup_key
def _as_bool(value: str | None, default: bool) -> bool:
if value is None:
@@ -261,14 +271,30 @@ async def _qualified_setups(db: AsyncSession) -> list[dict]:
return [s for s in setups if setup_qualifies(SimpleNamespace(**s), config)]
def _fmt_price(value: float | int | None) -> str:
return "n/a" if value is None else f"{float(value):.2f}"
def _fmt_signed_move(from_price: float | int | None, to_price: float | int | None) -> str:
if from_price is None or to_price is None:
return "n/a"
from_float = float(from_price)
if from_float == 0:
return "n/a"
pct = (float(to_price) - from_float) / from_float * 100.0
return f"{pct:+.1f}%"
def _format_qualified(s: dict) -> str:
prob = best_target_probability(SimpleNamespace(**s))
arrow = "🟢" if s["direction"] == "long" else "🔴"
current = s.get("current_price") or s.get("entry_price")
return (
f"{arrow} <b>{s['symbol']} {s['direction'].upper()}</b> — qualified setup\n"
f"entry {s['entry_price']:.2f} → target {s['target']:.2f} "
f"(R:R {s['rr_ratio']:.1f}:1)\n"
f"confidence {(s.get('confidence_score') or 0):.0f}% · P(target) {prob:.0f}%"
f"{arrow} <b>{s['symbol']} {s['direction'].upper()}</b> | "
f"now {_fmt_price(current)} | entry {_fmt_price(s['entry_price'])} | "
f"target {_fmt_price(s['target'])} ({_fmt_signed_move(current, s['target'])}) | "
f"stop {_fmt_price(s['stop_loss'])} | R:R {s['rr_ratio']:.1f} | "
f"conf {(s.get('confidence_score') or 0):.0f}% | P(target) {prob:.0f}%"
)
@@ -291,6 +317,34 @@ async def _latest_close(db: AsyncSession, ticker_id: int) -> float | None:
return float(row[0]) if row else None
def _sr_zone_label(zone: dict) -> str:
return (
f"{zone['low']:.2f}{zone['high']:.2f}"
if zone["level_count"] > 1
else f"{zone['midpoint']:.2f}"
)
def _sr_touch_price(zone: dict, current_price: float) -> float:
low = float(zone["low"])
high = float(zone["high"])
if current_price < low:
return low
if current_price > high:
return high
return float(zone["midpoint"])
def _format_sr_proximity(symbol: str, zone: dict, current_price: float) -> str:
touch_price = _sr_touch_price(zone, current_price)
return (
f"📍 <b>{symbol}</b> {zone['type']} | "
f"now {_fmt_price(current_price)} -> {_sr_zone_label(zone)} "
f"({_fmt_signed_move(current_price, touch_price)}) | "
f"strength {float(zone['strength']):.0f}"
)
async def _collect_sr_proximity(db: AsyncSession) -> list[tuple[str, str]]:
"""One alert per watchlist ticker for the NEAREST strong S/R zone within range.
@@ -320,21 +374,12 @@ async def _collect_sr_proximity(db: AsyncSession) -> list[tuple[str, str]]:
# Nearest strong zone only.
nearest = min(strong, key=lambda z: abs(price - z["midpoint"]))
dist_pct = abs(price - nearest["midpoint"]) / price * 100
dist_pct = abs(price - _sr_touch_price(nearest, price)) / price * 100
if dist_pct > SR_PROXIMITY_PCT:
continue
label = (
f"{nearest['low']:.2f}{nearest['high']:.2f}"
if nearest["level_count"] > 1
else f"{nearest['midpoint']:.2f}"
)
key = f"sr:{symbol}:{nearest['type']}" # one per side per ticker per cooldown
out.append((
key,
f"📍 <b>{symbol}</b> approaching {nearest['type']} {label} "
f"(now {price:.2f}, {dist_pct:.1f}% away)",
))
out.append((key, _format_sr_proximity(symbol, nearest, price)))
return out
@@ -552,6 +597,49 @@ async def _collect_regime_quadrant(db: AsyncSession) -> list[tuple[str, str]]:
# Dispatch
# ---------------------------------------------------------------------------
def _signal_bundle_messages(items: list[AlertItem]) -> list[tuple[list[AlertLogRef], str]]:
if not items:
return []
by_type: dict[str, list[AlertItem]] = {key: [] for key in SIGNAL_BUNDLE_ALERT_TYPES}
for item in items:
by_type.setdefault(item[0], []).append(item)
total = sum(len(group) for group in by_type.values())
header = f"📣 <b>Signal run</b> — {total} new alert(s)"
bundles: list[tuple[list[AlertLogRef], str]] = []
lines = [header]
logs: list[AlertLogRef] = []
current_section: str | None = None
def flush() -> None:
nonlocal lines, logs, current_section
if logs:
bundles.append((logs.copy(), "\n".join(lines)))
lines = [f"{header} (continued)"]
logs = []
current_section = None
for alert_type, section_title in SIGNAL_BUNDLE_SECTIONS:
for item_type, key, text in by_type.get(alert_type, []):
block: list[str] = []
if current_section != alert_type:
block.extend(["", f"<b>{section_title}</b>"])
block.append(text)
if logs and len("\n".join(lines + block)) > SIGNAL_BUNDLE_MAX_CHARS:
flush()
block = ["", f"<b>{section_title}</b>", text]
lines.extend(block)
logs.append((item_type, key))
current_section = alert_type
if logs:
bundles.append((logs.copy(), "\n".join(lines)))
return bundles
async def dispatch_alerts(db: AsyncSession) -> dict:
"""Gather all enabled triggers, dedup, and push to Telegram. Job entrypoint."""
cfg = await _resolve(db)
@@ -560,22 +648,23 @@ async def dispatch_alerts(db: AsyncSession) -> dict:
if not cfg["token"] or not cfg["chat_id"]:
return {"status": "no_credentials", "sent": 0}
outgoing: list[tuple[str, str, str]] = [] # (alert_type, key, text)
signal_outgoing: list[AlertItem] = []
outgoing: list[AlertItem] = []
if cfg["qualified"]:
for key, text in await _collect_qualified(db):
if not await _recently_alerted(db, "qualified", key):
outgoing.append(("qualified", key, text))
signal_outgoing.append(("qualified", key, text))
if cfg["sr"]:
for key, text in await _collect_sr_proximity(db):
if not await _recently_alerted(db, "sr_proximity", key):
outgoing.append(("sr_proximity", key, text))
signal_outgoing.append(("sr_proximity", key, text))
if cfg["score_drop"]:
# also seeds/advances watermarks as a side effect
for key, text in await _collect_score_drops(db):
outgoing.append(("score_drop", key, text))
signal_outgoing.append(("score_drop", key, text))
if cfg["digest"]:
digest = await _collect_digest(db)
@@ -593,8 +682,18 @@ async def dispatch_alerts(db: AsyncSession) -> dict:
outgoing.append((TRADE_CLOSED_TYPE, key, text))
sent = 0
if outgoing:
candidates = len(signal_outgoing) + len(outgoing)
if signal_outgoing or outgoing:
async with httpx.AsyncClient(timeout=15) as client:
for log_refs, text in _signal_bundle_messages(signal_outgoing):
try:
await _send(client, cfg["token"], cfg["chat_id"], text)
for alert_type, key in log_refs:
_log_alert(db, alert_type, key)
sent += 1
except Exception:
logger.exception("Failed to send signal alert bundle")
for alert_type, key, text in outgoing:
try:
await _send(client, cfg["token"], cfg["chat_id"], text)
@@ -604,7 +703,7 @@ async def dispatch_alerts(db: AsyncSession) -> dict:
logger.exception("Failed to send alert %s", key)
await db.commit() # persist watermark seeds/advances and sent-logs
return {"status": "ok", "sent": sent, "candidates": len(outgoing)}
return {"status": "ok", "sent": sent, "candidates": candidates}
async def send_test_alert(db: AsyncSession) -> dict: