"""Tests for the Telegram alert service: config, dedup, watermark, dispatch."""
from __future__ import annotations
from datetime import date, datetime, timedelta, timezone
from types import SimpleNamespace
import pytest
from sqlalchemy import select
from app.models.alert import AlertLog
from app.models.ohlcv import OHLCVRecord
from app.models.paper_trade import PaperTrade
from app.models.score import CompositeScore
from app.models.sr_level import SRLevel
from app.models.ticker import Ticker
from app.models.user import User
from app.models.watchlist import WatchlistEntry
from app.services import alert_service as svc
from tests.conftest import _test_session_factory # type: ignore
@pytest.fixture
async def session():
async with _test_session_factory() as s:
yield s
async def test_config_defaults(session):
cfg = await svc.get_alert_config(session)
assert cfg["enabled"] is False
assert cfg["bot_token_configured"] is False
assert cfg["bot_token_source"] == "none"
# trigger toggles default on
assert cfg["qualified_enabled"] is True
assert cfg["digest_enabled"] is True
async def test_update_config_token_write_only(session):
cfg = await svc.update_alert_config(
session, enabled=True, bot_token="secret123", telegram_chat_id="42",
)
assert cfg["enabled"] is True
assert cfg["telegram_chat_id"] == "42"
assert cfg["bot_token_configured"] is True
assert cfg["bot_token_source"] == "database"
# raw token never surfaced
assert "bot_token" not in cfg
assert "secret123" not in str(cfg)
async def test_update_empty_token_keeps_existing(session):
await svc.update_alert_config(session, bot_token="keepme", telegram_chat_id="1")
cfg = await svc.update_alert_config(session, bot_token="") # empty → keep
assert cfg["bot_token_configured"] is True
async def test_recently_alerted_cooldown(session):
assert await svc._recently_alerted(session, "qualified", "AAA:long") is False
svc._log_alert(session, "qualified", "AAA:long")
await session.commit()
assert await svc._recently_alerted(session, "qualified", "AAA:long") is True
# different key is independent
assert await svc._recently_alerted(session, "qualified", "BBB:short") is False
async def test_recently_alerted_expires(session):
old = datetime.now(timezone.utc) - timedelta(hours=100)
session.add(AlertLog(alert_type="qualified", dedup_key="old", created_at=old))
await session.commit()
# default cooldown 72h → the 100h-old entry no longer suppresses
assert await svc._recently_alerted(session, "qualified", "old") is False
async def _seed_watchlisted_ticker(session, symbol: str, score: float) -> None:
user = await session.get(User, 1)
if user is None:
user = User(id=1, username="u", password_hash="x", role="user", has_access=True)
session.add(user)
await session.flush()
t = Ticker(symbol=symbol)
session.add(t)
await session.flush()
session.add(WatchlistEntry(user_id=1, ticker_id=t.id, entry_type="manual",
added_at=datetime.now(timezone.utc)))
session.add(CompositeScore(ticker_id=t.id, score=score, is_stale=False,
weights_json="{}", computed_at=datetime.now(timezone.utc)))
await session.commit()
async def test_score_drop_seeds_then_alerts(session):
await _seed_watchlisted_ticker(session, "AAA", 80.0)
# First pass seeds the watermark, no alert
msgs = await svc._collect_score_drops(session)
await session.commit()
assert msgs == []
assert await svc._watermark(session, "AAA") == 80.0
# Drop the composite well past the threshold
row = (await session.execute(
CompositeScore.__table__.update().values(score=60.0)
))
await session.commit()
assert row.rowcount == 1
msgs = await svc._collect_score_drops(session)
await session.commit()
assert len(msgs) == 1
key, text = msgs[0]
assert key == "scoredrop:AAA"
assert "AAA" in text
# rebaselined to the new (lower) level
assert await svc._watermark(session, "AAA") == 60.0
def test_format_qualified_includes_current_price_and_target_move():
text = svc._format_qualified({
"symbol": "AAPL",
"direction": "long",
"current_price": 196.42,
"entry_price": 195.80,
"target": 207.50,
"stop_loss": 190.20,
"rr_ratio": 2.1,
"confidence_score": 76.0,
"targets": [{"probability": 63.0}],
})
assert "now 196.42" in text
assert "entry 195.80" in text
assert "target 207.50 (+5.6%)" in text
assert "stop 190.20" in text
assert "P(target) 63%" in text
async def _add_ticker(session, symbol: str, *, watchlisted: bool, close: float,
levels: list[tuple[float, str, int]]) -> int:
user = await session.get(User, 1)
if user is None:
session.add(User(id=1, username="u", password_hash="x", role="user", has_access=True))
await session.flush()
t = Ticker(symbol=symbol)
session.add(t)
await session.flush()
if watchlisted:
session.add(WatchlistEntry(user_id=1, ticker_id=t.id, entry_type="manual",
added_at=datetime.now(timezone.utc)))
session.add(OHLCVRecord(ticker_id=t.id, date=date.today(),
open=close, high=close, low=close, close=close, volume=1))
for price, kind, strength in levels:
session.add(SRLevel(ticker_id=t.id, price_level=price, type=kind,
strength=strength, detection_method="test"))
await session.commit()
return t.id
async def test_sr_proximity_merges_close_levels_to_one_alert(session):
# Two resistance levels ~1% apart just above price → one merged zone, one alert
await _add_ticker(session, "CVX", watchlisted=True, close=182.0,
levels=[(183.0, "resistance", 60), (185.0, "resistance", 60)])
msgs = await svc._collect_sr_proximity(session)
cvx = [m for m in msgs if "CVX" in m[1]]
assert len(cvx) == 1
assert "183.00–185.00" in cvx[0][1]
assert "now 182.00 -> 183.00–185.00 (+0.5%)" in cvx[0][1]
assert "strength 100" in cvx[0][1]
async def test_sr_proximity_skips_non_watchlist_unqualified(session):
await _add_ticker(session, "ZZZ", watchlisted=False, close=182.0,
levels=[(183.0, "resistance", 80)])
msgs = await svc._collect_sr_proximity(session)
assert not any("ZZZ" in m[1] for m in msgs)
async def test_sr_proximity_only_nearest_zone(session):
# A near resistance (~1%) and a far one (~10%); only the near one alerts
await _add_ticker(session, "AAA", watchlisted=True, close=100.0,
levels=[(101.0, "resistance", 70), (110.0, "resistance", 90)])
msgs = await svc._collect_sr_proximity(session)
aaa = [m for m in msgs if "AAA" in m[1]]
assert len(aaa) == 1
assert "101.00" in aaa[0][1]
async def test_dispatch_disabled_short_circuits(session):
res = await svc.dispatch_alerts(session)
assert res["status"] == "disabled"
async def test_dispatch_no_credentials(session):
await svc.update_alert_config(session, enabled=True) # enabled but no token/chat
res = await svc.dispatch_alerts(session)
assert res["status"] == "no_credentials"
async def test_dispatch_bundles_discovery_alerts_and_logs_each_item(session, monkeypatch):
async def fake_collect_qualified(_db):
return [
("qualified:AAPL:long", "🟢 AAPL LONG | now 196.42 | target 207.50 (+5.6%)"),
("qualified:TSLA:short", "🔴 TSLA SHORT | now 292.10 | target 276.50 (-5.3%)"),
]
async def fake_collect_sr(_db):
return [
("sr:MSFT:resistance", "📍 MSFT resistance | now 508.20 -> 512.00 (+0.7%)"),
]
sent: list[str] = []
async def fake_send(_client, _token, _chat_id, text):
sent.append(text)
monkeypatch.setattr(svc, "_collect_qualified", fake_collect_qualified)
monkeypatch.setattr(svc, "_collect_sr_proximity", fake_collect_sr)
monkeypatch.setattr(svc, "_send", fake_send)
await svc.update_alert_config(
session,
enabled=True,
bot_token="token",
telegram_chat_id="chat",
score_drop_enabled=False,
digest_enabled=False,
regime_quadrant_enabled=False,
trade_closed_enabled=False,
)
res = await svc.dispatch_alerts(session)
assert res == {"status": "ok", "sent": 1, "candidates": 3}
assert len(sent) == 1
assert "Signal run — 3 new alert(s)" in sent[0]
assert "Qualified setups" in sent[0]
assert "Near support/resistance" in sent[0]
assert "AAPL LONG" in sent[0]
assert "MSFT" in sent[0]
rows = (
await session.execute(
select(AlertLog.alert_type, AlertLog.dedup_key)
.order_by(AlertLog.alert_type, AlertLog.dedup_key)
)
).all()
assert rows == [
("qualified", "qualified:AAPL:long"),
("qualified", "qualified:TSLA:short"),
("sr_proximity", "sr:MSFT:resistance"),
]
async def _add_closed_trade(session, symbol: str, reason: str, *,
close: float = 110.0, closed_hours_ago: float = 1.0) -> None:
if await session.get(User, 1) is None:
session.add(User(id=1, username="u", password_hash="x", role="user", has_access=True))
await session.flush()
t = Ticker(symbol=symbol)
session.add(t)
await session.flush()
now = datetime.now(timezone.utc)
session.add(PaperTrade(
user_id=1, ticker_id=t.id, direction="long",
entry_price=100.0, shares=10.0, stop_loss=95.0, target=120.0,
status="closed", opened_at=now - timedelta(days=5),
close_price=close, closed_at=now - timedelta(hours=closed_hours_ago),
close_reason=reason,
))
await session.commit()
async def test_config_includes_trade_closed_toggle(session):
assert (await svc.get_alert_config(session))["trade_closed_enabled"] is True
cfg = await svc.update_alert_config(session, trade_closed_enabled=False)
assert cfg["trade_closed_enabled"] is False
async def test_collect_closed_trades_filters_manual_and_old(session):
await _add_closed_trade(session, "WIN", "trailing", close=110.0, closed_hours_ago=1)
await _add_closed_trade(session, "MAN", "manual", close=110.0, closed_hours_ago=1) # manual → skip
await _add_closed_trade(session, "OLD", "stop", close=95.0, closed_hours_ago=100) # too old → skip
out = await svc._collect_closed_trades(session)
assert len(out) == 1
_, text = out[0]
assert "WIN" in text and "trailing stop" in text
def test_format_closed_trade_win():
now = datetime.now(timezone.utc)
trade = SimpleNamespace(
direction="long", entry_price=100.0, close_price=110.0, shares=10.0,
stop_loss=95.0, opened_at=now - timedelta(days=12), closed_at=now,
close_reason="trailing",
)
txt = svc._format_closed_trade(trade, "AAA")
assert "✅" in txt # win
assert "+10.0%" in txt
assert "+2.00R" in txt # +10% over a 5% stop
assert "held 12d" in txt