Files
signal-platform/tests/unit/test_alert_service.py
T
dennisthiessen 88239e6ef8
Deploy / lint (push) Successful in 6s
Deploy / test (push) Successful in 34s
Deploy / deploy (push) Successful in 24s
S/R alerts: nearest zone only, scoped to watchlist + qualified, merged levels
Three fixes to over-firing S/R proximity alerts:
- Route through cluster_sr_zones (the same merger the chart uses) instead of raw
  SRLevel rows, so near-duplicate levels (e.g. CVX 183 + 185) collapse into one
  zone and one alert.
- Alert only the single NEAREST strong zone per ticker, not every nearby level.
- Scope to watchlist + qualified-setup tickers via _alert_scope_tickers (was
  iterating all watchlist entries only; qualified setups are now included too).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-15 10:07:06 +02:00

171 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Tests for the Telegram alert service: config, dedup, watermark, S/R proximity, dispatch."""
from __future__ import annotations
from datetime import date, datetime, timedelta, timezone
import pytest
from app.models.alert import AlertLog
from app.models.ohlcv import OHLCVRecord
from app.models.score import CompositeScore
from app.models.sr_level import SRLevel
from app.models.ticker import Ticker
from app.models.user import User
from app.models.watchlist import WatchlistEntry
from app.services import alert_service as svc
from tests.conftest import _test_session_factory # type: ignore
@pytest.fixture
async def session():
    """Yield a database session created by the shared test session factory.

    The session is opened with ``async with``, so the factory's context
    manager is exited once the requesting test has finished.
    """
    async with _test_session_factory() as db:
        yield db
async def test_config_defaults(session):
    """Config read before any update is disabled, token-less, with triggers on."""
    config = await svc.get_alert_config(session)

    # Nothing has been configured yet: alerting is off and no token is known.
    assert config["enabled"] is False
    assert config["bot_token_configured"] is False
    assert config["bot_token_source"] == "none"

    # Trigger toggles default on.
    assert config["qualified_enabled"] is True
    assert config["digest_enabled"] is True
async def test_update_config_token_write_only(session):
    """An update reports the token as configured but never returns its value."""
    result = await svc.update_alert_config(
        session,
        enabled=True,
        bot_token="secret123",
        telegram_chat_id="42",
    )

    assert result["enabled"] is True
    assert result["telegram_chat_id"] == "42"
    assert result["bot_token_configured"] is True
    assert result["bot_token_source"] == "database"

    # The raw token is never surfaced: not under its own key, and not
    # embedded anywhere in the returned payload.
    assert "bot_token" not in result
    assert "secret123" not in str(result)
async def test_update_empty_token_keeps_existing(session):
    """Passing an empty ``bot_token`` leaves the config reporting a token."""
    await svc.update_alert_config(session, bot_token="keepme", telegram_chat_id="1")

    # Empty string → keep whatever token was configured before.
    after = await svc.update_alert_config(session, bot_token="")

    assert after["bot_token_configured"] is True
async def test_recently_alerted_cooldown(session):
    """A logged alert suppresses its own dedup key and no other key."""
    alert_type = "qualified"
    logged_key = "AAA:long"

    # Nothing logged yet, so the key is not on cooldown.
    assert await svc._recently_alerted(session, alert_type, logged_key) is False

    # _log_alert is called without await here; the commit persists the entry.
    svc._log_alert(session, alert_type, logged_key)
    await session.commit()
    assert await svc._recently_alerted(session, alert_type, logged_key) is True

    # A different key is independent of the one just logged.
    assert await svc._recently_alerted(session, alert_type, "BBB:short") is False
async def test_recently_alerted_expires(session):
    """An alert log entry older than the cooldown no longer suppresses alerts."""
    logged_at = datetime.now(timezone.utc) - timedelta(hours=100)
    stale_entry = AlertLog(alert_type="qualified", dedup_key="old", created_at=logged_at)
    session.add(stale_entry)
    await session.commit()

    # Default cooldown is 72h → the 100h-old entry no longer suppresses.
    still_suppressed = await svc._recently_alerted(session, "qualified", "old")
    assert still_suppressed is False
async def _seed_watchlisted_ticker(session, symbol: str, score: float) -> None:
    """Create a ticker on user 1's watchlist with a fresh composite score.

    Args:
        session: Async database session used for all inserts.
        symbol: Ticker symbol to create.
        score: Value stored on the ticker's ``CompositeScore`` row.
    """
    # Make sure user 1 exists; the watchlist entry below uses user_id=1.
    if await session.get(User, 1) is None:
        session.add(User(id=1, username="u", password_hash="x", role="user", has_access=True))
        await session.flush()

    # Flush so the ticker's id is available for the dependent rows.
    ticker = Ticker(symbol=symbol)
    session.add(ticker)
    await session.flush()

    watch_entry = WatchlistEntry(
        user_id=1,
        ticker_id=ticker.id,
        entry_type="manual",
        added_at=datetime.now(timezone.utc),
    )
    session.add(watch_entry)

    composite = CompositeScore(
        ticker_id=ticker.id,
        score=score,
        is_stale=False,
        weights_json="{}",
        computed_at=datetime.now(timezone.utc),
    )
    session.add(composite)

    await session.commit()
async def test_score_drop_seeds_then_alerts(session):
    """The first scan only seeds the watermark; a later drop yields one alert."""
    await _seed_watchlisted_ticker(session, "AAA", 80.0)

    # First pass seeds the watermark and produces no alert.
    first_pass = await svc._collect_score_drops(session)
    await session.commit()
    assert first_pass == []
    assert await svc._watermark(session, "AAA") == 80.0

    # Drop the composite well past the threshold. The UPDATE has no WHERE
    # clause; exactly one row is expected to be affected.
    lower_score = CompositeScore.__table__.update().values(score=60.0)
    update_result = await session.execute(lower_score)
    await session.commit()
    assert update_result.rowcount == 1

    second_pass = await svc._collect_score_drops(session)
    await session.commit()
    assert len(second_pass) == 1
    dedup_key, message = second_pass[0]
    assert dedup_key == "scoredrop:AAA"
    assert "AAA" in message

    # The watermark is rebaselined to the new (lower) level.
    assert await svc._watermark(session, "AAA") == 60.0
async def _add_ticker(session, symbol: str, *, watchlisted: bool, close: float,
                      levels: list[tuple[float, str, int]]) -> int:
    """Insert a ticker with one OHLCV bar and the given S/R levels.

    Args:
        session: Async database session used for all inserts.
        symbol: Ticker symbol to create.
        watchlisted: When true, also add a manual watchlist entry with
            ``user_id=1`` for the ticker.
        close: Price used for open, high, low and close of the single bar,
            which is dated ``date.today()``.
        levels: ``(price, type, strength)`` tuples; one ``SRLevel`` row is
            created per tuple.

    Returns:
        The id of the newly created ticker.
    """
    # Make sure user 1 exists; watchlist entries below use user_id=1.
    if await session.get(User, 1) is None:
        session.add(User(id=1, username="u", password_hash="x", role="user", has_access=True))
        await session.flush()

    ticker = Ticker(symbol=symbol)
    session.add(ticker)
    await session.flush()
    # Capture the id right after the flush. Reading ``ticker.id`` after the
    # commit would need a refresh if the session expires objects on commit,
    # and that implicit IO fails under an AsyncSession.
    ticker_id = ticker.id

    if watchlisted:
        session.add(WatchlistEntry(user_id=1, ticker_id=ticker_id, entry_type="manual",
                                   added_at=datetime.now(timezone.utc)))

    # A single flat bar: every price field equals ``close``.
    session.add(OHLCVRecord(ticker_id=ticker_id, date=date.today(),
                            open=close, high=close, low=close, close=close, volume=1))

    session.add_all([
        SRLevel(ticker_id=ticker_id, price_level=price, type=kind,
                strength=strength, detection_method="test")
        for price, kind, strength in levels
    ])

    await session.commit()
    return ticker_id
async def test_sr_proximity_merges_close_levels_to_one_alert(session):
    """Two nearby resistance levels produce one alert naming both prices."""
    # Two resistance levels ~1% apart just above price → one merged zone, one alert
    await _add_ticker(
        session, "CVX", watchlisted=True, close=182.0,
        levels=[(183.0, "resistance", 60), (185.0, "resistance", 60)],
    )

    msgs = await svc._collect_sr_proximity(session)
    cvx = [m for m in msgs if "CVX" in m[1]]
    assert len(cvx) == 1

    # NOTE(review): the expected literal previously read "183.00185.00", which
    # looks like a price range whose separator glyph was lost. Check that both
    # zone edges appear, lower edge first, so the assertion does not depend on
    # the exact separator — confirm against the alert text format.
    text = cvx[0][1]
    low_at = text.find("183.00")
    assert low_at != -1
    assert "185.00" in text[low_at:]
async def test_sr_proximity_skips_non_watchlist_unqualified(session):
    """A ticker that is not watchlisted (and has no setup seeded) gets no alert."""
    await _add_ticker(
        session, "ZZZ", watchlisted=False, close=182.0,
        levels=[(183.0, "resistance", 80)],
    )

    alerts = await svc._collect_sr_proximity(session)

    # No alert text may mention the out-of-scope ticker.
    assert all("ZZZ" not in alert[1] for alert in alerts)
async def test_sr_proximity_only_nearest_zone(session):
    """Only the nearer of two resistance levels is alerted for a ticker."""
    # A near resistance (~1%) and a far one (~10%); only the near one alerts.
    near_level = (101.0, "resistance", 70)
    far_level = (110.0, "resistance", 90)
    await _add_ticker(
        session, "AAA", watchlisted=True, close=100.0,
        levels=[near_level, far_level],
    )

    alerts = await svc._collect_sr_proximity(session)
    aaa_texts = [alert[1] for alert in alerts if "AAA" in alert[1]]

    assert len(aaa_texts) == 1
    assert "101.00" in aaa_texts[0]
async def test_dispatch_disabled_short_circuits(session):
    """Dispatching without any config update reports the "disabled" status."""
    outcome = await svc.dispatch_alerts(session)
    assert outcome["status"] == "disabled"
async def test_dispatch_no_credentials(session):
    """Enabled alerts without token or chat id report "no_credentials"."""
    # Enabled, but neither a bot token nor a chat id has been supplied.
    await svc.update_alert_config(session, enabled=True)

    outcome = await svc.dispatch_alerts(session)

    assert outcome["status"] == "no_credentials"