Files
signal-platform/app/services/rr_scanner_service.py
T
dennisthiessen ac51e23949
Deploy / lint (push) Successful in 6s
Deploy / test (push) Successful in 1m0s
Deploy / deploy (push) Successful in 32s
Serve live recommendation context on trade setup APIs and alerts
Stored TradeSetup rows are point-in-time snapshots from the RR scan, so
the ticker page could show stale confidence/reasoning/composite (e.g.
sentiment=neutral in the setup card while the sentiment panel showed
bullish). Overlay current score/sentiment context onto the API payload
for GET /trades and GET /trades/{symbol}, gate and format Telegram
qualified-setup alerts on the same live values, and apply the
min_confidence/recommended_action filters after the overlay so they
judge what the caller actually sees. Stored setups stay frozen for
outcome analysis and backtests.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-03 09:17:27 +02:00

710 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""R:R Scanner service.
Scans tracked tickers for asymmetric risk-reward trade setups.
Long: target = nearest SR above, stop = entry - ATR × multiplier.
Short: target = nearest SR below, stop = entry + ATR × multiplier.
Filters by configurable R:R threshold (default 1.5).
"""
from __future__ import annotations
import json
import logging
from collections.abc import Callable
from datetime import date, datetime, timezone
from sqlalchemy import and_, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.exceptions import NotFoundError
from app.models.fundamental import FundamentalData
from app.models.ohlcv import OHLCVRecord
from app.models.score import CompositeScore, DimensionScore
from app.models.sentiment import SentimentScore
from app.models.signal_context_snapshot import SignalContextSnapshot
from app.models.sr_level import SRLevel
from app.models.ticker import Ticker
from app.models.trade_setup import TradeSetup
from app.services.indicator_service import _extract_ohlcv, compute_atr
from app.services.price_service import query_ohlcv
from app.services.recommendation_service import (
build_recommendation_snapshot,
enhance_trade_setup,
get_recommendation_config,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Default strategy identifier stamped onto each SignalContextSnapshot created
# by this module (see ``_create_signal_context_snapshots``).
STRATEGY_VERSION = "residual_momentum_12_1_rr_time_v2"
async def _get_ticker(db: AsyncSession, symbol: str) -> Ticker:
    """Look up a ticker row by symbol.

    The symbol is stripped of surrounding whitespace and upper-cased before
    the lookup.

    Raises:
        NotFoundError: if no ticker matches the normalised symbol.
    """
    wanted = symbol.strip().upper()
    query = select(Ticker).where(Ticker.symbol == wanted)
    found = (await db.execute(query)).scalar_one_or_none()
    if found is not None:
        return found
    raise NotFoundError(f"Ticker not found: {wanted}")
def _compute_quality_score(
rr: float,
strength: int,
distance: float,
entry_price: float,
*,
w_rr: float = 0.35,
w_strength: float = 0.35,
w_proximity: float = 0.30,
rr_cap: float = 10.0,
) -> float:
"""Compute a quality score for a candidate S/R level."""
norm_rr = min(rr / rr_cap, 1.0)
norm_strength = strength / 100.0
norm_proximity = 1.0 - min(distance / entry_price, 1.0)
return w_rr * norm_rr + w_strength * norm_strength + w_proximity * norm_proximity
async def _get_dimension_scores(db: AsyncSession, ticker_id: int) -> dict[str, float]:
    """Return the ticker's stored dimension scores keyed by dimension name."""
    query = select(DimensionScore).where(DimensionScore.ticker_id == ticker_id)
    scores: dict[str, float] = {}
    for record in (await db.execute(query)).scalars().all():
        scores[record.dimension] = float(record.score)
    return scores
async def _get_latest_sentiment(db: AsyncSession, ticker_id: int) -> str | None:
    """Return the classification of the ticker's newest sentiment row, if any."""
    query = (
        select(SentimentScore)
        .where(SentimentScore.ticker_id == ticker_id)
        .order_by(SentimentScore.timestamp.desc())
        .limit(1)
    )
    newest = (await db.execute(query)).scalar_one_or_none()
    if not newest:
        return None
    return newest.classification
async def _refresh_score_context_for_symbols(
db: AsyncSession,
symbols: set[str],
) -> None:
"""Refresh provider-free scores so live recommendation summaries match the page."""
if not symbols:
return
from app.services import scoring_service
refreshed = False
for symbol in sorted(symbols):
try:
await scoring_service.compute_all_dimensions(db, symbol)
await scoring_service.compute_composite_score(db, symbol)
refreshed = True
except Exception:
logger.exception("Error refreshing live score context for %s", symbol)
if refreshed:
await db.commit()
async def _apply_live_recommendation_context(
db: AsyncSession,
setup_rows: list[tuple[TradeSetup, str]],
rows: list[dict],
) -> list[dict]:
"""Decorate latest setup rows with current score/sentiment recommendation data.
This intentionally updates only the API payload. Stored trade setups and
history remain point-in-time records for outcome analysis.
"""
if not rows or not setup_rows:
return rows
ticker_ids = {setup.ticker_id for setup, _ in setup_rows}
setups_by_id = {setup.id: setup for setup, _ in setup_rows}
directions_by_ticker: dict[int, set[str]] = {}
for setup, _ in setup_rows:
directions_by_ticker.setdefault(setup.ticker_id, set()).add(setup.direction.lower())
dim_result = await db.execute(
select(DimensionScore).where(DimensionScore.ticker_id.in_(ticker_ids))
)
dims_by_ticker: dict[int, dict[str, float]] = {}
for ds in dim_result.scalars().all():
dims_by_ticker.setdefault(ds.ticker_id, {})[ds.dimension] = float(ds.score)
comp_result = await db.execute(
select(CompositeScore)
.where(CompositeScore.ticker_id.in_(ticker_ids))
.order_by(CompositeScore.ticker_id, CompositeScore.computed_at.desc())
)
composites: dict[int, CompositeScore] = {}
for comp in comp_result.scalars().all():
composites.setdefault(comp.ticker_id, comp)
sent_result = await db.execute(
select(SentimentScore)
.where(SentimentScore.ticker_id.in_(ticker_ids))
.order_by(SentimentScore.ticker_id, SentimentScore.timestamp.desc())
)
sentiments: dict[int, SentimentScore] = {}
for sent in sent_result.scalars().all():
sentiments.setdefault(sent.ticker_id, sent)
config = await get_recommendation_config(db)
live_rows: list[dict] = []
for row in rows:
setup = setups_by_id.get(row["id"])
if setup is None:
live_rows.append(row)
continue
ticker_id = setup.ticker_id
live_row = dict(row)
comp = composites.get(ticker_id)
if comp is not None:
live_row["composite_score"] = float(comp.score)
dimension_scores = dims_by_ticker.get(ticker_id)
if dimension_scores:
sentiment = sentiments.get(ticker_id)
snapshot = build_recommendation_snapshot(
dimension_scores=dimension_scores,
sentiment_classification=sentiment.classification if sentiment else None,
config=config,
available_directions=directions_by_ticker.get(ticker_id),
)
direction = setup.direction.lower()
confidence_key = "long_confidence" if direction == "long" else "short_confidence"
live_row["confidence_score"] = round(float(snapshot[confidence_key]), 2)
live_row["recommended_action"] = snapshot["action"]
live_row["reasoning"] = snapshot["reasoning"]
live_row["risk_level"] = snapshot["risk_level"]
live_rows.append(live_row)
return live_rows
def _json_default(value):
if isinstance(value, (datetime, date)):
return value.isoformat()
return str(value)
async def _create_signal_context_snapshots(
    db: AsyncSession,
    setups: list[TradeSetup],
    *,
    strategy_version: str = STRATEGY_VERSION,
) -> None:
    """Add one ``SignalContextSnapshot`` per setup to the session.

    Each snapshot copies the setup's own trade parameters and attaches the
    currently stored dimension scores, newest composite score, newest
    sentiment and newest fundamentals for the setup's ticker, serialised as
    JSON. The setups must already have an ``id`` because snapshots are keyed
    by ``trade_setup_id``. Nothing is committed here; the caller commits.
    """
    if not setups:
        return

    ticker_ids = {item.ticker_id for item in setups}

    dims: dict[int, dict[str, dict]] = {}
    dim_query = select(DimensionScore).where(DimensionScore.ticker_id.in_(ticker_ids))
    for dim in (await db.execute(dim_query)).scalars().all():
        per_ticker = dims.setdefault(dim.ticker_id, {})
        per_ticker[dim.dimension] = {
            "score": float(dim.score),
            "is_stale": bool(dim.is_stale),
            "computed_at": dim.computed_at,
        }

    async def newest_per_ticker(model, time_column) -> dict:
        # Rows arrive newest-first within each ticker, so the first one wins.
        query = (
            select(model)
            .where(model.ticker_id.in_(ticker_ids))
            .order_by(model.ticker_id, time_column.desc())
        )
        newest: dict = {}
        for record in (await db.execute(query)).scalars().all():
            newest.setdefault(record.ticker_id, record)
        return newest

    composites: dict[int, CompositeScore] = await newest_per_ticker(
        CompositeScore, CompositeScore.computed_at
    )
    sentiments: dict[int, SentimentScore] = await newest_per_ticker(
        SentimentScore, SentimentScore.timestamp
    )
    fundamentals: dict[int, FundamentalData] = await newest_per_ticker(
        FundamentalData, FundamentalData.fetched_at
    )

    created_at = datetime.now(timezone.utc)
    for setup in setups:
        tid = setup.ticker_id
        comp = composites.get(tid)
        sent = sentiments.get(tid)
        fund = fundamentals.get(tid)

        # Fall back to the score stored on the setup when no composite row exists.
        if comp:
            score_context = {
                "composite_score": float(comp.score),
                "composite_is_stale": bool(comp.is_stale),
                "composite_computed_at": comp.computed_at,
                "dimensions": dims.get(tid, {}),
            }
        else:
            score_context = {
                "composite_score": float(setup.composite_score),
                "composite_is_stale": None,
                "composite_computed_at": None,
                "dimensions": dims.get(tid, {}),
            }

        sentiment_context: dict = {}
        if sent:
            sentiment_context = {
                "classification": sent.classification,
                "confidence": int(sent.confidence),
                "recommendation": sent.recommendation,
                "timestamp": sent.timestamp,
                "source": sent.source,
            }

        fundamental_context: dict = {}
        if fund:
            fundamental_context = {
                "pe_ratio": fund.pe_ratio,
                "revenue_growth": fund.revenue_growth,
                "earnings_surprise": fund.earnings_surprise,
                "market_cap": fund.market_cap,
                "next_earnings_date": fund.next_earnings_date,
                "fetched_at": fund.fetched_at,
            }

        confidence = None
        if setup.confidence_score is not None:
            confidence = float(setup.confidence_score)
        momentum = None
        if setup.momentum_percentile is not None:
            momentum = float(setup.momentum_percentile)

        snapshot = SignalContextSnapshot(
            trade_setup_id=setup.id,
            ticker_id=tid,
            detected_at=setup.detected_at,
            created_at=created_at,
            strategy_version=strategy_version,
            direction=setup.direction,
            entry_price=float(setup.entry_price),
            stop_loss=float(setup.stop_loss),
            target=float(setup.target),
            rr_ratio=float(setup.rr_ratio),
            confidence_score=confidence,
            recommended_action=setup.recommended_action,
            risk_level=setup.risk_level,
            momentum_percentile=momentum,
            score_context_json=json.dumps(score_context, default=_json_default),
            sentiment_context_json=json.dumps(sentiment_context, default=_json_default),
            fundamental_context_json=json.dumps(fundamental_context, default=_json_default),
        )
        db.add(snapshot)
def _pick_best_level(
    levels: list[SRLevel],
    entry_price: float,
    risk: float,
    rr_threshold: float,
    *,
    is_long: bool,
) -> tuple[float, float] | None:
    """Return ``(rr, target)`` of the best-quality level meeting ``rr_threshold``, else None."""
    best_quality = 0.0
    best_rr = 0.0
    best_target = 0.0
    for lv in levels:
        reward = lv.price_level - entry_price if is_long else entry_price - lv.price_level
        if reward <= 0:
            continue
        rr = reward / risk
        if rr < rr_threshold:
            continue
        # The reward is also the price distance between entry and the level.
        quality = _compute_quality_score(rr, lv.strength, reward, entry_price)
        if quality > best_quality:
            best_quality, best_rr, best_target = quality, rr, lv.price_level
    if best_rr > 0:
        return best_rr, best_target
    return None


async def scan_ticker(
    db: AsyncSession,
    symbol: str,
    rr_threshold: float = 1.5,
    atr_multiplier: float = 1.5,
    momentum_percentile: float | None = None,
) -> list[TradeSetup]:
    """Scan a single ticker for trade setups meeting the R:R threshold.

    The entry price is the latest close. For a long setup the stop is
    ``entry - ATR * atr_multiplier`` and the target is chosen among S/R levels
    above the entry; for a short setup the stop is ``entry + ATR *
    atr_multiplier`` and the target is chosen among levels below. Within each
    direction the level with the highest quality score whose R:R is at least
    ``rr_threshold`` wins. At most one setup per direction is produced.

    Resulting setups are enhanced via ``enhance_trade_setup`` (falling back to
    the raw setup if enhancement fails), persisted, and a signal context
    snapshot is stored for each.

    ``momentum_percentile`` is the ticker's residual 12-1 momentum activation
    rank across the universe (computed by the caller), stored on each setup so
    the activation gate can select the top slice.

    Returns:
        The persisted setups; an empty list when the ticker has fewer than 15
        OHLCV bars, ATR cannot be computed or is not positive, or no S/R
        levels exist.

    Raises:
        NotFoundError: if the symbol is not a known ticker.
    """
    ticker = await _get_ticker(db, symbol)
    records = await query_ohlcv(db, symbol)
    # Guard the count so a None result cannot break the log call below.
    bar_count = len(records) if records else 0
    if bar_count < 15:
        logger.info(
            "Skipping %s: insufficient OHLCV data (%d bars, need 15+)",
            symbol, bar_count,
        )
        return []

    _, highs, lows, closes, _ = _extract_ohlcv(records)
    entry_price = closes[-1]

    try:
        atr_result = compute_atr(highs, lows, closes)
        atr_value = atr_result["atr"]
    except Exception:
        logger.info("Skipping %s: cannot compute ATR", symbol)
        return []
    if atr_value <= 0:
        logger.info("Skipping %s: ATR is zero or negative", symbol)
        return []

    sr_result = await db.execute(
        select(SRLevel).where(SRLevel.ticker_id == ticker.id)
    )
    sr_levels = list(sr_result.scalars().all())
    if not sr_levels:
        logger.info("Skipping %s: no SR levels available", symbol)
        return []

    # Nearest level first in both directions.
    levels_above = sorted(
        [lv for lv in sr_levels if lv.price_level > entry_price],
        key=lambda lv: lv.price_level,
    )
    levels_below = sorted(
        [lv for lv in sr_levels if lv.price_level < entry_price],
        key=lambda lv: lv.price_level,
        reverse=True,
    )

    # NOTE(review): scalar_one_or_none raises if more than one CompositeScore
    # row exists for the ticker; other queries in this module order by
    # computed_at and keep the newest -- confirm uniqueness per ticker.
    comp_result = await db.execute(
        select(CompositeScore).where(CompositeScore.ticker_id == ticker.id)
    )
    comp = comp_result.scalar_one_or_none()
    composite_score = comp.score if comp else 0.0

    dimension_scores = await _get_dimension_scores(db, ticker.id)
    sentiment_classification = await _get_latest_sentiment(db, ticker.id)

    now = datetime.now(timezone.utc)
    setups: list[TradeSetup] = []

    def _append_setup(direction: str, stop: float, rr: float, target: float) -> None:
        # Build the raw (un-enhanced) setup for one direction.
        setups.append(TradeSetup(
            ticker_id=ticker.id,
            direction=direction,
            entry_price=round(entry_price, 4),
            stop_loss=round(stop, 4),
            target=round(target, 4),
            rr_ratio=round(rr, 4),
            composite_score=round(composite_score, 4),
            detected_at=now,
            momentum_percentile=momentum_percentile,
        ))

    if levels_above:
        stop = entry_price - (atr_value * atr_multiplier)
        risk = entry_price - stop
        if risk > 0:
            best = _pick_best_level(
                levels_above, entry_price, risk, rr_threshold, is_long=True
            )
            if best is not None:
                _append_setup("long", stop, best[0], best[1])

    if levels_below:
        stop = entry_price + (atr_value * atr_multiplier)
        risk = stop - entry_price
        if risk > 0:
            best = _pick_best_level(
                levels_below, entry_price, risk, rr_threshold, is_long=False
            )
            if best is not None:
                _append_setup("short", stop, best[0], best[1])

    available_directions = {s.direction for s in setups}
    enhanced_setups: list[TradeSetup] = []
    for setup in setups:
        try:
            enhanced = await enhance_trade_setup(
                db=db,
                ticker=ticker,
                setup=setup,
                dimension_scores=dimension_scores,
                sr_levels=sr_levels,
                sentiment_classification=sentiment_classification,
                atr_value=atr_value,
                available_directions=available_directions,
            )
            enhanced_setups.append(enhanced)
        except Exception:
            # Best-effort: keep the raw setup when enhancement fails.
            logger.exception("Error enhancing setup for %s (%s)", ticker.symbol, setup.direction)
            enhanced_setups.append(setup)

    for setup in enhanced_setups:
        db.add(setup)
    await db.commit()
    # Refresh after the commit so each setup has its id for the snapshots below.
    for s in enhanced_setups:
        await db.refresh(s)
    await _create_signal_context_snapshots(db, enhanced_setups)
    await db.commit()
    return enhanced_setups
async def scan_all_tickers(
    db: AsyncSession,
    rr_threshold: float = 1.5,
    atr_multiplier: float = 1.5,
    progress_callback: Callable[[int, int, str], None] | None = None,
) -> list[TradeSetup]:
    """Scan every tracked ticker (ordered by symbol) for trade setups.

    ``progress_callback(processed, total, current_symbol)`` is called before
    each ticker is scanned and once more with ``(total, total, "")`` at the
    end (only when there is at least one ticker), so callers such as the
    scheduler can surface live progress.

    Failures for a single ticker are logged and do not abort the run.
    """
    ticker_rows = await db.execute(select(Ticker).order_by(Ticker.symbol))
    tickers = list(ticker_rows.scalars().all())
    total = len(tickers)
    report_progress = progress_callback is not None

    # Rank the universe by residual 12-1 momentum up front so each new setup
    # carries its activation percentile. Best-effort; the ranker falls back to
    # raw 12-1 momentum only if benchmark data is unavailable.
    try:
        from app.services import momentum_service
        percentiles = await momentum_service.compute_momentum_percentiles(db)
    except Exception:
        logger.exception("Momentum ranking refresh failed")
        percentiles = {}

    collected: list[TradeSetup] = []
    for position, ticker in enumerate(tickers):
        if report_progress:
            progress_callback(position, total, ticker.symbol)
        try:
            # Refresh scores first so the scheduled scan works off current data.
            # Nothing else marks scores stale, so without this they'd never
            # update for tickers the user doesn't manually fetch.
            try:
                from app.services import scoring_service
                await scoring_service.compute_all_dimensions(db, ticker.symbol)
                await scoring_service.compute_composite_score(db, ticker.symbol)
                await db.commit()
            except Exception:
                logger.exception("Error refreshing scores for %s", ticker.symbol)

            found = await scan_ticker(
                db,
                ticker.symbol,
                rr_threshold,
                atr_multiplier,
                momentum_percentile=percentiles.get(ticker.symbol),
            )
            collected.extend(found)
        except Exception:
            logger.exception("Error scanning ticker %s", ticker.symbol)

    if report_progress and total:
        progress_callback(total, total, "")
    return collected
async def get_trade_setups(
    db: AsyncSession,
    direction: str | None = None,
    min_confidence: float | None = None,
    recommended_action: str | None = None,
    symbol: str | None = None,
    live_recommendation: bool = False,
    recompute_scores: bool = False,
) -> list[dict]:
    """Return the newest stored setup per (symbol, direction), optionally filtered.

    Args:
        db: Database session.
        direction: Keep only this direction (compared lower-cased).
        min_confidence: Minimum confidence score to keep.
        recommended_action: Keep only rows with this recommended action.
        symbol: Keep only this ticker (stripped and upper-cased).
        live_recommendation: Overlay current score/sentiment context onto the
            payload; confidence/action filters are then judged on the overlaid
            values instead of the stored columns.
        recompute_scores: Refresh scores for the selected symbols first.

    Returns:
        Payload dictionaries sorted by confidence, R:R and composite score,
        highest first (rows without a confidence score sort last).
    """
    query = select(TradeSetup, Ticker.symbol).join(Ticker, TradeSetup.ticker_id == Ticker.id)
    if direction is not None:
        query = query.where(TradeSetup.direction == direction.lower())
    if symbol is not None:
        query = query.where(Ticker.symbol == symbol.strip().upper())

    # With live_recommendation these fields are overlaid with current values
    # below, so filtering happens there instead of against the stored columns.
    # NOTE(review): in the stored-column path the filter runs before the
    # per-(symbol, direction) de-duplication, so an older setup can be
    # returned when the newest one does not pass -- confirm this is intended.
    filter_in_sql = not live_recommendation
    if filter_in_sql and min_confidence is not None:
        query = query.where(TradeSetup.confidence_score >= min_confidence)
    if filter_in_sql and recommended_action is not None:
        query = query.where(TradeSetup.recommended_action == recommended_action)

    query = query.order_by(TradeSetup.detected_at.desc(), TradeSetup.id.desc())
    fetched = (await db.execute(query)).all()

    # Rows are newest-first, so the first occurrence of each key is the latest.
    latest_by_key: dict[tuple[str, str], tuple[TradeSetup, str]] = {}
    for setup, ticker_symbol in fetched:
        latest_by_key.setdefault((ticker_symbol, setup.direction), (setup, ticker_symbol))

    def stored_rank(pair):
        stored = pair[0]
        confidence = -1.0 if stored.confidence_score is None else stored.confidence_score
        return (confidence, stored.rr_ratio, stored.composite_score)

    latest_rows = sorted(latest_by_key.values(), key=stored_rank, reverse=True)

    if recompute_scores:
        await _refresh_score_context_for_symbols(
            db, {ticker_symbol for _, ticker_symbol in latest_rows}
        )

    prices = await _latest_closes(db, {setup.ticker_id for setup, _ in latest_rows})
    rows_out = []
    for setup, ticker_symbol in latest_rows:
        rows_out.append(
            _trade_setup_to_dict(setup, ticker_symbol, prices.get(setup.ticker_id))
        )

    if live_recommendation:
        rows_out = await _apply_live_recommendation_context(db, latest_rows, rows_out)

    if min_confidence is not None:
        rows_out = [
            item for item in rows_out
            if item["confidence_score"] is not None
            and item["confidence_score"] >= min_confidence
        ]
    if recommended_action is not None:
        rows_out = [
            item for item in rows_out
            if item["recommended_action"] == recommended_action
        ]

    def payload_rank(item):
        confidence = item["confidence_score"]
        if confidence is None:
            confidence = -1.0
        return (confidence, item["rr_ratio"], item["composite_score"])

    # Re-sort because the overlay may have changed confidence/composite values.
    rows_out.sort(key=payload_rank, reverse=True)
    return rows_out
async def _latest_closes(db: AsyncSession, ticker_ids: set[int]) -> dict[int, float]:
"""Most recent close per ticker — used to judge a setup's current relevance."""
if not ticker_ids:
return {}
latest = (
select(OHLCVRecord.ticker_id, func.max(OHLCVRecord.date).label("md"))
.where(OHLCVRecord.ticker_id.in_(ticker_ids))
.group_by(OHLCVRecord.ticker_id)
.subquery()
)
stmt = select(OHLCVRecord.ticker_id, OHLCVRecord.close).join(
latest,
and_(
OHLCVRecord.ticker_id == latest.c.ticker_id,
OHLCVRecord.date == latest.c.md,
),
)
result = await db.execute(stmt)
return {tid: float(close) for tid, close in result.all()}
async def get_trade_setup_history(
    db: AsyncSession,
    symbol: str,
) -> list[dict]:
    """Return every stored setup for a symbol as payload dicts, newest first.

    The symbol is stripped and upper-cased before matching. Each payload also
    carries the ticker's most recent close as ``current_price``.
    """
    wanted = symbol.strip().upper()
    query = (
        select(TradeSetup, Ticker.symbol)
        .join(Ticker, TradeSetup.ticker_id == Ticker.id)
        .where(Ticker.symbol == wanted)
        .order_by(TradeSetup.detected_at.desc(), TradeSetup.id.desc())
    )
    history = (await db.execute(query)).all()
    prices = await _latest_closes(db, {setup.ticker_id for setup, _ in history})

    payloads: list[dict] = []
    for setup, ticker_symbol in history:
        payloads.append(
            _trade_setup_to_dict(setup, ticker_symbol, prices.get(setup.ticker_id))
        )
    return payloads
def _trade_setup_to_dict(setup: TradeSetup, symbol: str, current_price: float | None = None) -> dict:
targets: list[dict] = []
conflicts: list[str] = []
if setup.targets_json:
try:
parsed_targets = json.loads(setup.targets_json)
if isinstance(parsed_targets, list):
targets = parsed_targets
except (TypeError, ValueError):
targets = []
if setup.conflict_flags_json:
try:
parsed_conflicts = json.loads(setup.conflict_flags_json)
if isinstance(parsed_conflicts, list):
conflicts = [str(item) for item in parsed_conflicts]
except (TypeError, ValueError):
conflicts = []
return {
"id": setup.id,
"symbol": symbol,
"direction": setup.direction,
"entry_price": setup.entry_price,
"stop_loss": setup.stop_loss,
"target": setup.target,
"rr_ratio": setup.rr_ratio,
"composite_score": setup.composite_score,
"detected_at": setup.detected_at,
"confidence_score": setup.confidence_score,
"targets": targets,
"conflict_flags": conflicts,
"recommended_action": setup.recommended_action,
"reasoning": setup.reasoning,
"risk_level": setup.risk_level,
"actual_outcome": setup.actual_outcome,
"outcome_date": setup.outcome_date,
"evaluated_at": setup.evaluated_at,
"current_price": current_price,
"momentum_percentile": setup.momentum_percentile,
}