5d41ccac1c
Closes the action loop — instead of polling the dashboard, the platform pushes actionable signals to Telegram. New hourly 'alerts' job dispatches four toggleable triggers, deduped via a new alert_log table (cooldown-based for qualified/S-R/digest, watermark-based for score deterioration). Admin → Settings gains a Telegram panel (write-only bot token, chat ID, per-trigger toggles, Send Test). Credentials follow DB > env precedence (TELEGRAM_BOT_TOKEN / _CHAT_ID). Backend: alert_service + AlertLog model + migration 005, scheduler job, admin endpoints/schema. Frontend: AlertSettings panel, hooks, api, types. Deploy: run alembic upgrade (new alert_log table). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
405 lines
15 KiB
Python
405 lines
15 KiB
Python
"""Telegram alerts: notify on actionable signals so the dashboard isn't a
poll-only tool.

Triggers (each toggleable):
- qualified setups: a (symbol, direction) setup that clears the activation gate
- watchlist S/R proximity: a watched ticker's price entering a strong S/R zone
- score deterioration: a watched ticker's composite dropping sharply vs a
  running watermark
- daily digest: one end-of-day summary

Dedup is via the AlertLog table: cooldown-based for the first two and the digest,
watermark-based for score drops. Telegram credentials follow the usual
precedence DB > env; the bot token is write-only (never returned on read).
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from datetime import datetime, timedelta, timezone
|
|
from types import SimpleNamespace
|
|
|
|
import httpx
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.config import settings
|
|
from app.models.alert import AlertLog
|
|
from app.models.ohlcv import OHLCVRecord
|
|
from app.models.score import CompositeScore
|
|
from app.models.settings import SystemSetting
|
|
from app.models.sr_level import SRLevel
|
|
from app.models.ticker import Ticker
|
|
from app.models.watchlist import WatchlistEntry
|
|
from app.services.admin_service import get_activation_config, update_setting
|
|
from app.services.qualification import best_target_probability, setup_qualifies
|
|
from app.services.rr_scanner_service import get_trade_setups
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# SystemSetting keys under which the alert configuration is persisted
KEY_ENABLED = "alerts_enabled"  # master switch, checked first by dispatch_alerts
KEY_TOKEN = "alerts_telegram_bot_token"
KEY_CHAT_ID = "alerts_telegram_chat_id"
KEY_QUALIFIED = "alerts_qualified_enabled"
KEY_SR = "alerts_sr_proximity_enabled"
KEY_SCORE_DROP = "alerts_score_drop_enabled"
KEY_DIGEST = "alerts_digest_enabled"

# Value used for each boolean setting when no row is stored for its key
_BOOL_DEFAULTS = {
    KEY_ENABLED: False,
    KEY_QUALIFIED: True,
    KEY_SR: True,
    KEY_SCORE_DROP: True,
    KEY_DIGEST: True,
}

# Tunables (kept as constants for now; promote to settings if needed)
SR_PROXIMITY_PCT = 2.0  # within this % of a strong level → alert
SR_MIN_STRENGTH = 60  # only strong levels are alert-worthy
SCORE_DROP_POINTS = 15.0  # composite drop vs watermark that triggers an alert
COOLDOWN_HOURS = 72  # don't re-send the same key within this window
DIGEST_HOUR_UTC = 22  # send the daily digest on the first run at/after this hour

# alert_type under which score watermarks are stored in AlertLog
# (the symbol is the dedup_key, the reference score is the value)
WATERMARK_TYPE = "score_watermark"
|
|
|
|
|
|
def _as_bool(value: str | None, default: bool) -> bool:
    """Interpret a stored setting string as a boolean.

    A missing setting (``None``) yields ``default``. Otherwise only the text
    "true" counts as True, compared case-insensitively with surrounding
    whitespace ignored; every other string is False.
    """
    return default if value is None else value.strip().lower() == "true"
|
|
|
|
|
|
async def _settings_map(db: AsyncSession) -> dict[str, str]:
    """Load the alert-related SystemSetting rows as a ``{key: value}`` dict.

    Keys without a stored row are simply absent from the result.
    """
    wanted = [
        KEY_ENABLED,
        KEY_TOKEN,
        KEY_CHAT_ID,
        KEY_QUALIFIED,
        KEY_SR,
        KEY_SCORE_DROP,
        KEY_DIGEST,
    ]
    query = select(SystemSetting).where(SystemSetting.key.in_(wanted))
    rows = (await db.execute(query)).scalars().all()
    return {row.key: row.value for row in rows}
|
|
|
|
|
|
async def _resolve(db: AsyncSession) -> dict:
    """Resolve the effective alert configuration, including the raw bot token.

    Credentials follow DB > environment precedence: a non-blank value stored
    in SystemSetting wins, otherwise the value from ``settings`` is used.
    Boolean toggles fall back to ``_BOOL_DEFAULTS`` when no row is stored.

    Returns:
        A dict with keys ``enabled``, ``token``, ``token_source``
        ("database" / "environment" / "none"), ``chat_id``, ``qualified``,
        ``sr``, ``score_drop`` and ``digest``. Internal use only, because it
        carries the secret token.
    """
    stored = await _settings_map(db)

    db_token = (stored.get(KEY_TOKEN) or "").strip()
    # Strip the environment value too, exactly as is done for the chat ID, so
    # a whitespace-only variable is not reported (or used) as a real token.
    env_token = (settings.telegram_bot_token or "").strip()
    if db_token:
        token, token_source = db_token, "database"
    elif env_token:
        token, token_source = env_token, "environment"
    else:
        token, token_source = "", "none"

    chat_id = (stored.get(KEY_CHAT_ID) or "").strip() or (settings.telegram_chat_id or "").strip()

    return {
        "enabled": _as_bool(stored.get(KEY_ENABLED), _BOOL_DEFAULTS[KEY_ENABLED]),
        "token": token,
        "token_source": token_source,
        "chat_id": chat_id,
        "qualified": _as_bool(stored.get(KEY_QUALIFIED), _BOOL_DEFAULTS[KEY_QUALIFIED]),
        "sr": _as_bool(stored.get(KEY_SR), _BOOL_DEFAULTS[KEY_SR]),
        "score_drop": _as_bool(stored.get(KEY_SCORE_DROP), _BOOL_DEFAULTS[KEY_SCORE_DROP]),
        "digest": _as_bool(stored.get(KEY_DIGEST), _BOOL_DEFAULTS[KEY_DIGEST]),
    }
|
|
|
|
|
|
async def get_alert_config(db: AsyncSession) -> dict:
    """Return the client-facing alert configuration.

    The raw bot token is never included; callers only learn whether one is
    configured and which source (database / environment / none) it came from.
    """
    resolved = await _resolve(db)
    has_token = bool(resolved["token"])
    return dict(
        enabled=resolved["enabled"],
        telegram_chat_id=resolved["chat_id"],
        bot_token_configured=has_token,
        bot_token_source=resolved["token_source"],
        qualified_enabled=resolved["qualified"],
        sr_proximity_enabled=resolved["sr"],
        score_drop_enabled=resolved["score_drop"],
        digest_enabled=resolved["digest"],
    )
|
|
|
|
|
|
async def update_alert_config(
    db: AsyncSession,
    *,
    enabled: bool | None = None,
    bot_token: str | None = None,
    telegram_chat_id: str | None = None,
    qualified_enabled: bool | None = None,
    sr_proximity_enabled: bool | None = None,
    score_drop_enabled: bool | None = None,
    digest_enabled: bool | None = None,
) -> dict:
    """Persist config. An empty/omitted bot_token leaves the stored token intact.

    Every argument is optional; ``None`` means "leave this setting as it is".
    Booleans are stored as the strings "true" / "false". The chat ID is stored
    stripped (an empty string is written as-is). A bot token that is empty or
    whitespace-only is ignored, so it can never wipe the stored token.

    Returns:
        The public configuration after the update, as produced by
        ``get_alert_config`` (never contains the raw token).
    """
    bool_updates = {
        KEY_ENABLED: enabled,
        KEY_QUALIFIED: qualified_enabled,
        KEY_SR: sr_proximity_enabled,
        KEY_SCORE_DROP: score_drop_enabled,
        KEY_DIGEST: digest_enabled,
    }
    for key, val in bool_updates.items():
        if val is not None:
            await update_setting(db, key, "true" if val else "false")

    if telegram_chat_id is not None:
        await update_setting(db, KEY_CHAT_ID, telegram_chat_id.strip())

    # Decide on the *stripped* value: a whitespace-only token is truthy before
    # stripping and would otherwise overwrite the stored token with "".
    new_token = (bot_token or "").strip()
    if new_token:
        await update_setting(db, KEY_TOKEN, new_token)

    return await get_alert_config(db)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Telegram transport
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _send(client: httpx.AsyncClient, token: str, chat_id: str, text: str) -> None:
    """POST one message to the Telegram Bot API ``sendMessage`` method.

    The message is sent with ``parse_mode`` HTML and link previews disabled.

    Args:
        client: HTTP client used for the request (owned by the caller).
        token: Telegram bot token; it becomes part of the request URL.
        chat_id: Destination chat.
        text: Message body (HTML markup).

    Raises:
        RuntimeError: if the request fails or the response has an error
            status. The message names the original exception type and carries
            its text with the bot token redacted.
    """
    try:
        resp = await client.post(
            f"https://api.telegram.org/bot{token}/sendMessage",
            json={
                "chat_id": chat_id,
                "text": text,
                "parse_mode": "HTML",
                "disable_web_page_preview": True,
            },
        )
        resp.raise_for_status()
    except Exception as exc:
        # The token is part of the URL and HTTP error messages quote that URL.
        # Callers log the error and hand it back to the admin UI, so scrub the
        # secret here, at the single place where it enters an error message.
        detail = str(exc)
        if token:
            detail = detail.replace(token, "<redacted>")
        # `from None` keeps the unredacted original out of logged tracebacks.
        raise RuntimeError(
            f"Telegram sendMessage failed ({type(exc).__name__}): {detail}"
        ) from None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Dedup helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _recently_alerted(
    db: AsyncSession, alert_type: str, key: str, cooldown_hours: int = COOLDOWN_HOURS
) -> bool:
    """Tell whether this (alert_type, key) pair was logged within the cooldown.

    Returns True when at least one AlertLog row with the given type and dedup
    key has a ``created_at`` later than ``cooldown_hours`` ago.
    """
    window_start = datetime.now(timezone.utc) - timedelta(hours=cooldown_hours)
    query = (
        select(AlertLog.id)
        .where(AlertLog.alert_type == alert_type)
        .where(AlertLog.dedup_key == key)
        .where(AlertLog.created_at > window_start)
        .limit(1)
    )
    hit = (await db.execute(query)).first()
    return hit is not None
|
|
|
|
|
|
def _log_alert(db: AsyncSession, alert_type: str, key: str, value: float | None = None) -> None:
    """Stage a new AlertLog row on the session, timestamped with the current UTC time.

    The row is only added to the session; flushing/committing is left to the
    caller.
    """
    entry = AlertLog(
        alert_type=alert_type,
        dedup_key=key,
        value=value,
        created_at=datetime.now(timezone.utc),
    )
    db.add(entry)
|
|
|
|
|
|
async def _watermark(db: AsyncSession, symbol: str) -> float | None:
    """Return the most recently logged score watermark for ``symbol``.

    Watermarks are AlertLog rows of type ``WATERMARK_TYPE`` keyed by symbol;
    the newest row (by ``created_at``) wins. Returns None when there is none.
    """
    query = (
        select(AlertLog.value)
        .where(AlertLog.alert_type == WATERMARK_TYPE, AlertLog.dedup_key == symbol)
        .order_by(AlertLog.created_at.desc())
        .limit(1)
    )
    newest = (await db.execute(query)).first()
    if not newest:
        return None
    return newest[0]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Trigger collectors
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _watchlist_tickers(db: AsyncSession) -> list[tuple[int, str]]:
    """Distinct ``(ticker_id, symbol)`` pairs across all watchlists.

    Entries whose ``entry_type`` is "dismissed" are excluded. Watchlists are
    not separated per owner (single-user app → one chat).
    """
    query = (
        select(WatchlistEntry.ticker_id, Ticker.symbol)
        .join(Ticker, WatchlistEntry.ticker_id == Ticker.id)
        .where(WatchlistEntry.entry_type != "dismissed")
        .distinct()
    )
    rows = (await db.execute(query)).all()
    return [(ticker_id, symbol) for ticker_id, symbol in rows]
|
|
|
|
|
|
async def _qualified_setups(db: AsyncSession) -> list[dict]:
    """Return the trade setups that pass ``setup_qualifies`` under the current activation config.

    Each setup dict is wrapped in a SimpleNamespace for the check, which is
    given attribute-style access to the dict's fields; the original dicts are
    what gets returned.
    """
    candidates = await get_trade_setups(db)
    activation = await get_activation_config(db)
    passing: list[dict] = []
    for setup in candidates:
        if setup_qualifies(SimpleNamespace(**setup), activation):
            passing.append(setup)
    return passing
|
|
|
|
|
|
def _format_qualified(s: dict) -> str:
    """Render one qualified setup as a three-line Telegram HTML message.

    Line 1: direction marker, symbol and direction; line 2: entry, target and
    R:R; line 3: confidence (0 when missing) and the best target probability.
    """
    prob = best_target_probability(SimpleNamespace(**s))
    direction = s["direction"]
    marker = "🟢" if direction == "long" else "🔴"
    headline = f"{marker} <b>{s['symbol']} {direction.upper()}</b> — qualified setup"
    levels = (
        f"entry {s['entry_price']:.2f} → target {s['target']:.2f} "
        f"(R:R {s['rr_ratio']:.1f}:1)"
    )
    confidence = s.get("confidence_score") or 0
    odds = f"confidence {confidence:.0f}% · P(target) {prob:.0f}%"
    return "\n".join((headline, levels, odds))
|
|
|
|
|
|
async def _collect_qualified(db: AsyncSession) -> list[tuple[str, str]]:
    """Build ``(dedup_key, message)`` pairs for every currently qualified setup.

    The dedup key is ``qualified:<symbol>:<direction>``.
    """
    return [
        (f"qualified:{setup['symbol']}:{setup['direction']}", _format_qualified(setup))
        for setup in await _qualified_setups(db)
    ]
|
|
|
|
|
|
async def _latest_close(db: AsyncSession, ticker_id: int) -> float | None:
    """Return the close of the ticker's newest OHLCV record (by date), or None if it has none."""
    query = (
        select(OHLCVRecord.close)
        .where(OHLCVRecord.ticker_id == ticker_id)
        .order_by(OHLCVRecord.date.desc())
        .limit(1)
    )
    newest = (await db.execute(query)).first()
    if not newest:
        return None
    return float(newest[0])
|
|
|
|
|
|
async def _collect_sr_proximity(db: AsyncSession) -> list[tuple[str, str]]:
    """Build ``(dedup_key, message)`` pairs for watched tickers near a strong S/R level.

    For each watchlist ticker with a usable latest close, every SRLevel of
    strength >= ``SR_MIN_STRENGTH`` lying within ``SR_PROXIMITY_PCT`` percent
    of that close produces one entry. The dedup key is
    ``sr:<symbol>:<level price to 2 decimals>``.
    """
    alerts: list[tuple[str, str]] = []
    for ticker_id, symbol in await _watchlist_tickers(db):
        price = await _latest_close(db, ticker_id)
        if not price:
            # No OHLCV data, or a zero close that would break the % distance below.
            continue

        strong_query = select(SRLevel).where(
            SRLevel.ticker_id == ticker_id,
            SRLevel.strength >= SR_MIN_STRENGTH,
        )
        strong_levels = (await db.execute(strong_query)).scalars().all()

        for level in strong_levels:
            dist_pct = abs(price - level.price_level) / price * 100
            if dist_pct <= SR_PROXIMITY_PCT:
                message = (
                    f"📍 <b>{symbol}</b> approaching {level.type} at {level.price_level:.2f} "
                    f"(now {price:.2f}, {dist_pct:.1f}% away)"
                )
                alerts.append((f"sr:{symbol}:{level.price_level:.2f}", message))
    return alerts
|
|
|
|
|
|
async def _collect_score_drops(db: AsyncSession) -> list[tuple[str, str]]:
    """Returns drop messages and (as a side effect) advances watermarks.

    Watermark = the reference composite. Alert when current drops
    SCORE_DROP_POINTS below it, then rebaseline to current so a single slide
    doesn't re-fire; let the watermark rise with the score so the next drop is
    measured from the new high.

    Returns ``(dedup_key, message)`` pairs keyed ``scoredrop:<symbol>``.
    Watermark rows are only staged on the session via ``_log_alert``; nothing
    is committed here.
    """
    out: list[tuple[str, str]] = []
    for tid, symbol in await _watchlist_tickers(db):
        # NOTE(review): no ORDER BY — the first row returned is used. Presumably
        # there is one CompositeScore row per ticker; confirm against the model.
        comp_result = await db.execute(
            select(CompositeScore.score).where(CompositeScore.ticker_id == tid)
        )
        row = comp_result.first()
        if row is None or row[0] is None:
            # Ticker has no composite score yet: nothing to compare.
            continue
        current = float(row[0])

        base = await _watermark(db, symbol)
        if base is None:
            _log_alert(db, WATERMARK_TYPE, symbol, value=current)  # seed, no alert
            continue
        if current <= base - SCORE_DROP_POINTS:
            out.append((
                f"scoredrop:{symbol}",
                f"🔻 <b>{symbol}</b> composite score fell to {current:.0f} (from {base:.0f})",
            ))
            # NOTE(review): the rebaseline is staged before the message is sent.
            # dispatch_alerts commits even when a send fails, so a failed send
            # still moves the watermark and this drop is not reported again.
            _log_alert(db, WATERMARK_TYPE, symbol, value=current)  # rebaseline
        elif current > base:
            _log_alert(db, WATERMARK_TYPE, symbol, value=current)  # track the rise
        # Otherwise (a dip smaller than SCORE_DROP_POINTS) the watermark is left
        # untouched, so a slow slide still accumulates against the old reference.
    return out
|
|
|
|
|
|
async def _collect_digest(db: AsyncSession) -> tuple[str, str] | None:
    """Build today's digest as ``(dedup_key, message)``, or None when it is not due.

    The digest is due once the UTC hour has reached ``DIGEST_HOUR_UTC`` and no
    digest with today's key (``digest:<ISO date>``) was logged in the last 20
    hours. It lists up to five qualified setups, highest R:R first.
    """
    now = datetime.now(timezone.utc)
    if now.hour < DIGEST_HOUR_UTC:
        return None

    today = now.date().isoformat()
    key = f"digest:{today}"
    already_sent = await _recently_alerted(db, "digest", key, cooldown_hours=20)
    if already_sent:
        return None

    qualified = await _qualified_setups(db)
    lines = [f"📊 <b>Daily digest</b> — {today}"]
    if not qualified:
        lines.append("No qualified setups today.")
    else:
        ranked = sorted(qualified, key=lambda s: s["rr_ratio"], reverse=True)
        lines.append(f"{len(qualified)} qualified setup(s):")
        lines.extend(
            f"• {s['symbol']} {s['direction'].upper()} "
            f"R:R {s['rr_ratio']:.1f}:1, conf {(s.get('confidence_score') or 0):.0f}%"
            for s in ranked[:5]
        )
    return key, "\n".join(lines)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Dispatch
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def dispatch_alerts(db: AsyncSession) -> dict:
    """Gather all enabled triggers, dedup, and push to Telegram. Job entrypoint.

    Flow: resolve config → collect candidates from each enabled trigger →
    drop those still in cooldown or already queued in this run → send one
    Telegram message per remaining candidate → log each successful send →
    commit.

    Returns:
        ``{"status": "disabled", "sent": 0}`` when alerts are switched off,
        ``{"status": "no_credentials", "sent": 0}`` when the token or chat ID
        is missing, otherwise ``{"status": "ok", "sent": <delivered>,
        "candidates": <queued>}``. A failed send is logged and skipped; it
        does not abort the run.
    """
    cfg = await _resolve(db)
    if not cfg["enabled"]:
        return {"status": "disabled", "sent": 0}
    if not cfg["token"] or not cfg["chat_id"]:
        return {"status": "no_credentials", "sent": 0}

    outgoing: list[tuple[str, str, str]] = []  # (alert_type, key, text)
    queued: set[tuple[str, str]] = set()  # (alert_type, key) already in `outgoing`

    def _enqueue(alert_type: str, key: str, text: str) -> None:
        # The cooldown check only sees rows logged by earlier runs. A key that
        # a collector yields twice in this run (e.g. two S/R levels rounding to
        # the same price) would pass it both times, so dedup within the batch.
        if (alert_type, key) in queued:
            return
        queued.add((alert_type, key))
        outgoing.append((alert_type, key, text))

    if cfg["qualified"]:
        for key, text in await _collect_qualified(db):
            if not await _recently_alerted(db, "qualified", key):
                _enqueue("qualified", key, text)

    if cfg["sr"]:
        for key, text in await _collect_sr_proximity(db):
            if not await _recently_alerted(db, "sr_proximity", key):
                _enqueue("sr_proximity", key, text)

    if cfg["score_drop"]:
        # also seeds/advances watermarks as a side effect; no cooldown check,
        # the watermark itself prevents re-firing
        for key, text in await _collect_score_drops(db):
            _enqueue("score_drop", key, text)

    if cfg["digest"]:
        digest = await _collect_digest(db)
        if digest is not None:
            _enqueue("digest", digest[0], digest[1])

    sent = 0
    if outgoing:
        async with httpx.AsyncClient(timeout=15) as client:
            for alert_type, key, text in outgoing:
                try:
                    await _send(client, cfg["token"], cfg["chat_id"], text)
                    _log_alert(db, alert_type, key)
                    sent += 1
                except Exception as exc:
                    # HTTP error messages quote the request URL, which embeds
                    # the bot token, so log a redacted message rather than the
                    # raw traceback. One failure must not stop the batch.
                    logger.error(
                        "Failed to send alert %s: %s: %s",
                        key,
                        type(exc).__name__,
                        str(exc).replace(cfg["token"], "<redacted>"),
                    )

    await db.commit()  # persist watermark seeds/advances and sent-logs
    return {"status": "ok", "sent": sent, "candidates": len(outgoing)}
|
|
|
|
|
|
async def send_test_alert(db: AsyncSession) -> dict:
    """Send a fixed message to verify Telegram credentials.

    Returns:
        ``{"ok": True}`` on success, otherwise ``{"ok": False, "error": <text>}``.
        The error text never contains the bot token.
    """
    cfg = await _resolve(db)
    if not cfg["token"] or not cfg["chat_id"]:
        return {"ok": False, "error": "Bot token and chat ID must both be configured."}
    try:
        async with httpx.AsyncClient(timeout=15) as client:
            await _send(
                client, cfg["token"], cfg["chat_id"],
                "✅ <b>Signal Platform</b> — test alert. Notifications are wired up correctly.",
            )
        return {"ok": True}
    except Exception as exc:
        # HTTP error messages quote the request URL, which embeds the bot
        # token. This text is logged and returned to the caller, so redact the
        # secret first (the token is write-only and must never be read back).
        reason = str(exc).replace(cfg["token"], "<redacted>")
        logger.warning("Test alert failed: %s", reason)
        return {"ok": False, "error": reason}
|