diff --git a/app/services/alert_service.py b/app/services/alert_service.py index 0141a6c..d65eb8f 100644 --- a/app/services/alert_service.py +++ b/app/services/alert_service.py @@ -75,6 +75,13 @@ COOLDOWN_HOURS = 72 # don't re-send the same key within this window DIGEST_HOUR_UTC = 22 # send the daily digest on the first run at/after this hour WATERMARK_TYPE = "score_watermark" +SIGNAL_BUNDLE_ALERT_TYPES = ("qualified", "sr_proximity", "score_drop") +SIGNAL_BUNDLE_SECTIONS = ( + ("qualified", "Qualified setups"), + ("sr_proximity", "Near support/resistance"), + ("score_drop", "Score drops"), +) +SIGNAL_BUNDLE_MAX_CHARS = 3900 # Telegram limit is 4096; keep room for HTML parsing # Regime quadrant-change alert: (regime index x early-warning) quadrant. # Hysteresis (a deadband around each divider) stops a point sitting on a boundary @@ -91,6 +98,9 @@ QUAD_LABELS = { "4": "โ‘ฃ Real downturn", } +AlertItem = tuple[str, str, str] # alert_type, dedup_key, text +AlertLogRef = tuple[str, str] # alert_type, dedup_key + def _as_bool(value: str | None, default: bool) -> bool: if value is None: @@ -261,14 +271,30 @@ async def _qualified_setups(db: AsyncSession) -> list[dict]: return [s for s in setups if setup_qualifies(SimpleNamespace(**s), config)] +def _fmt_price(value: float | int | None) -> str: + return "n/a" if value is None else f"{float(value):.2f}" + + +def _fmt_signed_move(from_price: float | int | None, to_price: float | int | None) -> str: + if from_price is None or to_price is None: + return "n/a" + from_float = float(from_price) + if from_float == 0: + return "n/a" + pct = (float(to_price) - from_float) / from_float * 100.0 + return f"{pct:+.1f}%" + + def _format_qualified(s: dict) -> str: prob = best_target_probability(SimpleNamespace(**s)) arrow = "๐ŸŸข" if s["direction"] == "long" else "๐Ÿ”ด" + current = s.get("current_price") or s.get("entry_price") return ( - f"{arrow} {s['symbol']} {s['direction'].upper()} โ€” qualified setup\n" - f"entry {s['entry_price']:.2f} โ†’ target {s['target']:.2f} " - f"(R:R {s['rr_ratio']:.1f}:1)\n" - f"confidence {(s.get('confidence_score') or 0):.0f}% ยท P(target) {prob:.0f}%" + f"{arrow} {s['symbol']} {s['direction'].upper()} | " + f"now {_fmt_price(current)} | entry {_fmt_price(s['entry_price'])} | " + f"target {_fmt_price(s['target'])} ({_fmt_signed_move(current, s['target'])}) | " + f"stop {_fmt_price(s['stop_loss'])} | R:R {s['rr_ratio']:.1f} | " + f"conf {(s.get('confidence_score') or 0):.0f}% | P(target) {prob:.0f}%" ) @@ -291,6 +317,34 @@ async def _latest_close(db: AsyncSession, ticker_id: int) -> float | None: return float(row[0]) if row else None +def _sr_zone_label(zone: dict) -> str: + return ( + f"{zone['low']:.2f}โ€“{zone['high']:.2f}" + if zone["level_count"] > 1 + else f"{zone['midpoint']:.2f}" + ) + + +def _sr_touch_price(zone: dict, current_price: float) -> float: + low = float(zone["low"]) + high = float(zone["high"]) + if current_price < low: + return low + if current_price > high: + return high + return float(zone["midpoint"]) + + +def _format_sr_proximity(symbol: str, zone: dict, current_price: float) -> str: + touch_price = _sr_touch_price(zone, current_price) + return ( + f"๐Ÿ“ {symbol} {zone['type']} | " + f"now {_fmt_price(current_price)} -> {_sr_zone_label(zone)} " + f"({_fmt_signed_move(current_price, touch_price)}) | " + f"strength {float(zone['strength']):.0f}" + ) + + async def _collect_sr_proximity(db: AsyncSession) -> list[tuple[str, str]]: """One alert per watchlist ticker for the NEAREST strong S/R zone within range. @@ -320,21 +374,12 @@ async def _collect_sr_proximity(db: AsyncSession) -> list[tuple[str, str]]: # Nearest strong zone only. nearest = min(strong, key=lambda z: abs(price - z["midpoint"])) - dist_pct = abs(price - nearest["midpoint"]) / price * 100 + dist_pct = abs(price - _sr_touch_price(nearest, price)) / price * 100 if dist_pct > SR_PROXIMITY_PCT: continue - label = ( - f"{nearest['low']:.2f}โ€“{nearest['high']:.2f}" - if nearest["level_count"] > 1 - else f"{nearest['midpoint']:.2f}" - ) key = f"sr:{symbol}:{nearest['type']}" # one per side per ticker per cooldown - out.append(( - key, - f"๐Ÿ“ {symbol} approaching {nearest['type']} {label} " - f"(now {price:.2f}, {dist_pct:.1f}% away)", - )) + out.append((key, _format_sr_proximity(symbol, nearest, price))) return out @@ -552,6 +597,49 @@ async def _collect_regime_quadrant(db: AsyncSession) -> list[tuple[str, str]]: # Dispatch # --------------------------------------------------------------------------- +def _signal_bundle_messages(items: list[AlertItem]) -> list[tuple[list[AlertLogRef], str]]: + if not items: + return [] + + by_type: dict[str, list[AlertItem]] = {key: [] for key in SIGNAL_BUNDLE_ALERT_TYPES} + for item in items: + by_type.setdefault(item[0], []).append(item) + + total = sum(len(group) for group in by_type.values()) + header = f"๐Ÿ“ฃ Signal run โ€” {total} new alert(s)" + bundles: list[tuple[list[AlertLogRef], str]] = [] + lines = [header] + logs: list[AlertLogRef] = [] + current_section: str | None = None + + def flush() -> None: + nonlocal lines, logs, current_section + if logs: + bundles.append((logs.copy(), "\n".join(lines))) + lines = [f"{header} (continued)"] + logs = [] + current_section = None + + for alert_type, section_title in SIGNAL_BUNDLE_SECTIONS: + for item_type, key, text in by_type.get(alert_type, []): + block: list[str] = [] + if current_section != alert_type: + block.extend(["", f"{section_title}"]) + block.append(text) + + if logs and len("\n".join(lines + block)) > SIGNAL_BUNDLE_MAX_CHARS: + flush() + block = ["", f"{section_title}", text] + + lines.extend(block) + logs.append((item_type, key)) + current_section = alert_type + + if logs: + bundles.append((logs.copy(), "\n".join(lines))) + return bundles + + async def dispatch_alerts(db: AsyncSession) -> dict: """Gather all enabled triggers, dedup, and push to Telegram. Job entrypoint.""" cfg = await _resolve(db) @@ -560,22 +648,23 @@ async def dispatch_alerts(db: AsyncSession) -> dict: if not cfg["token"] or not cfg["chat_id"]: return {"status": "no_credentials", "sent": 0} - outgoing: list[tuple[str, str, str]] = [] # (alert_type, key, text) + signal_outgoing: list[AlertItem] = [] + outgoing: list[AlertItem] = [] if cfg["qualified"]: for key, text in await _collect_qualified(db): if not await _recently_alerted(db, "qualified", key): - outgoing.append(("qualified", key, text)) + signal_outgoing.append(("qualified", key, text)) if cfg["sr"]: for key, text in await _collect_sr_proximity(db): if not await _recently_alerted(db, "sr_proximity", key): - outgoing.append(("sr_proximity", key, text)) + signal_outgoing.append(("sr_proximity", key, text)) if cfg["score_drop"]: # also seeds/advances watermarks as a side effect for key, text in await _collect_score_drops(db): - outgoing.append(("score_drop", key, text)) + signal_outgoing.append(("score_drop", key, text)) if cfg["digest"]: digest = await _collect_digest(db) @@ -593,8 +682,18 @@ async def dispatch_alerts(db: AsyncSession) -> dict: outgoing.append((TRADE_CLOSED_TYPE, key, text)) sent = 0 - if outgoing: + candidates = len(signal_outgoing) + len(outgoing) + if signal_outgoing or outgoing: async with httpx.AsyncClient(timeout=15) as client: + for log_refs, text in _signal_bundle_messages(signal_outgoing): + try: + await _send(client, cfg["token"], cfg["chat_id"], text) + for alert_type, key in log_refs: + _log_alert(db, alert_type, key) + sent += 1 + except Exception: + logger.exception("Failed to send signal alert bundle") + for alert_type, key, text in outgoing: try: await _send(client, cfg["token"], cfg["chat_id"], text) @@ -604,7 +703,7 @@ async def dispatch_alerts(db: AsyncSession) -> dict: logger.exception("Failed to send alert %s", key) await db.commit() # persist watermark seeds/advances and sent-logs - return {"status": "ok", "sent": sent, "candidates": len(outgoing)} + return {"status": "ok", "sent": sent, "candidates": candidates} async def send_test_alert(db: AsyncSession) -> dict: diff --git a/tests/unit/test_alert_service.py b/tests/unit/test_alert_service.py index 8c42984..5451e4e 100644 --- a/tests/unit/test_alert_service.py +++ b/tests/unit/test_alert_service.py @@ -6,6 +6,7 @@ from datetime import date, datetime, timedelta, timezone from types import SimpleNamespace import pytest +from sqlalchemy import select from app.models.alert import AlertLog from app.models.ohlcv import OHLCVRecord @@ -113,6 +114,26 @@ async def test_score_drop_seeds_then_alerts(session): assert await svc._watermark(session, "AAA") == 60.0 +def test_format_qualified_includes_current_price_and_target_move(): + text = svc._format_qualified({ + "symbol": "AAPL", + "direction": "long", + "current_price": 196.42, + "entry_price": 195.80, + "target": 207.50, + "stop_loss": 190.20, + "rr_ratio": 2.1, + "confidence_score": 76.0, + "targets": [{"probability": 63.0}], + }) + + assert "now 196.42" in text + assert "entry 195.80" in text + assert "target 207.50 (+5.6%)" in text + assert "stop 190.20" in text + assert "P(target) 63%" in text + + async def _add_ticker(session, symbol: str, *, watchlisted: bool, close: float, levels: list[tuple[float, str, int]]) -> int: user = await session.get(User, 1) @@ -142,6 +163,8 @@ async def test_sr_proximity_merges_close_levels_to_one_alert(session): cvx = [m for m in msgs if "CVX" in m[1]] assert len(cvx) == 1 assert "183.00โ€“185.00" in cvx[0][1] + assert "now 182.00 -> 183.00โ€“185.00 (+0.5%)" in cvx[0][1] + assert "strength 100" in cvx[0][1] async def test_sr_proximity_skips_non_watchlist_unqualified(session): @@ -172,6 +195,61 @@ async def test_dispatch_no_credentials(session): assert res["status"] == "no_credentials" +async def test_dispatch_bundles_discovery_alerts_and_logs_each_item(session, monkeypatch): + async def fake_collect_qualified(_db): + return [ + ("qualified:AAPL:long", "๐ŸŸข AAPL LONG | now 196.42 | target 207.50 (+5.6%)"), + ("qualified:TSLA:short", "๐Ÿ”ด TSLA SHORT | now 292.10 | target 276.50 (-5.3%)"), + ] + + async def fake_collect_sr(_db): + return [ + ("sr:MSFT:resistance", "๐Ÿ“ MSFT resistance | now 508.20 -> 512.00 (+0.7%)"), + ] + + sent: list[str] = [] + + async def fake_send(_client, _token, _chat_id, text): + sent.append(text) + + monkeypatch.setattr(svc, "_collect_qualified", fake_collect_qualified) + monkeypatch.setattr(svc, "_collect_sr_proximity", fake_collect_sr) + monkeypatch.setattr(svc, "_send", fake_send) + + await svc.update_alert_config( + session, + enabled=True, + bot_token="token", + telegram_chat_id="chat", + score_drop_enabled=False, + digest_enabled=False, + regime_quadrant_enabled=False, + trade_closed_enabled=False, + ) + + res = await svc.dispatch_alerts(session) + + assert res == {"status": "ok", "sent": 1, "candidates": 3} + assert len(sent) == 1 + assert "Signal run โ€” 3 new alert(s)" in sent[0] + assert "Qualified setups" in sent[0] + assert "Near support/resistance" in sent[0] + assert "AAPL LONG" in sent[0] + assert "MSFT" in sent[0] + + rows = ( + await session.execute( + select(AlertLog.alert_type, AlertLog.dedup_key) + .order_by(AlertLog.alert_type, AlertLog.dedup_key) + ) + ).all() + assert rows == [ + ("qualified", "qualified:AAPL:long"), + ("qualified", "qualified:TSLA:short"), + ("sr_proximity", "sr:MSFT:resistance"), + ] + + async def _add_closed_trade(session, symbol: str, reason: str, *, close: float = 110.0, closed_hours_ago: float = 1.0) -> None: if await session.get(User, 1) is None: