Files
signal-platform/app/services/rr_scanner_service.py
T
dennisthiessen 80b4113280
Deploy / lint (push) Successful in 7s
Deploy / test (push) Successful in 1m1s
Deploy / deploy (push) Successful in 33s
feat: add strategy variant lab and signal context snapshots
Backtest report now includes research-only hold-to-horizon portfolio variants comparing raw vs residual 12-1 momentum, cutoff 80 vs 90, max 10 vs 15 positions, and SPY-200 risk scaling. A dynamic research recommendation panel flags residual momentum, cutoff 90, or regime scaling only when transparent promotion rules pass.

Adds signal_context_snapshots with migration 016 and captures one point-in-time context row per newly generated TradeSetup: setup fields, composite/dimensions, latest sentiment, latest fundamentals, and strategy_version=momentum_12_1_rr_time_v1. This is forward-only; no historical sentiment/fundamental backfill is attempted.

No live gate, paper-trade exit, or production ranking behavior changes.

Verification: 458 backend tests pass, ruff check app/ clean, frontend npm run build clean.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-02 16:25:04 +02:00

570 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""R:R Scanner service.
Scans tracked tickers for asymmetric risk-reward trade setups.
Long: target = nearest SR above, stop = entry - ATR × multiplier.
Short: target = nearest SR below, stop = entry + ATR × multiplier.
Filters by configurable R:R threshold (default 1.5).
"""
from __future__ import annotations
import json
import logging
from collections.abc import Callable
from datetime import date, datetime, timezone
from sqlalchemy import and_, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.exceptions import NotFoundError
from app.models.fundamental import FundamentalData
from app.models.ohlcv import OHLCVRecord
from app.models.score import CompositeScore, DimensionScore
from app.models.sentiment import SentimentScore
from app.models.signal_context_snapshot import SignalContextSnapshot
from app.models.sr_level import SRLevel
from app.models.ticker import Ticker
from app.models.trade_setup import TradeSetup
from app.services.indicator_service import _extract_ohlcv, compute_atr
from app.services.price_service import query_ohlcv
from app.services.recommendation_service import enhance_trade_setup
logger = logging.getLogger(__name__)
STRATEGY_VERSION = "momentum_12_1_rr_time_v1"
async def _get_ticker(db: AsyncSession, symbol: str) -> Ticker:
normalised = symbol.strip().upper()
result = await db.execute(select(Ticker).where(Ticker.symbol == normalised))
ticker = result.scalar_one_or_none()
if ticker is None:
raise NotFoundError(f"Ticker not found: {normalised}")
return ticker
def _compute_quality_score(
rr: float,
strength: int,
distance: float,
entry_price: float,
*,
w_rr: float = 0.35,
w_strength: float = 0.35,
w_proximity: float = 0.30,
rr_cap: float = 10.0,
) -> float:
"""Compute a quality score for a candidate S/R level."""
norm_rr = min(rr / rr_cap, 1.0)
norm_strength = strength / 100.0
norm_proximity = 1.0 - min(distance / entry_price, 1.0)
return w_rr * norm_rr + w_strength * norm_strength + w_proximity * norm_proximity
async def _get_dimension_scores(db: AsyncSession, ticker_id: int) -> dict[str, float]:
result = await db.execute(
select(DimensionScore).where(DimensionScore.ticker_id == ticker_id)
)
rows = result.scalars().all()
return {row.dimension: float(row.score) for row in rows}
async def _get_latest_sentiment(db: AsyncSession, ticker_id: int) -> str | None:
result = await db.execute(
select(SentimentScore)
.where(SentimentScore.ticker_id == ticker_id)
.order_by(SentimentScore.timestamp.desc())
.limit(1)
)
row = result.scalar_one_or_none()
return row.classification if row else None
def _json_default(value):
if isinstance(value, (datetime, date)):
return value.isoformat()
return str(value)
async def _create_signal_context_snapshots(
db: AsyncSession,
setups: list[TradeSetup],
*,
strategy_version: str = STRATEGY_VERSION,
) -> None:
"""Capture point-in-time discretionary context for freshly generated setups.
The scanner stores the setup itself first so each snapshot can be keyed by
``trade_setup_id``. This is intentionally forward-only: old sentiment,
fundamentals and composite scores are not reconstructed from today's data.
"""
if not setups:
return
ticker_ids = {s.ticker_id for s in setups}
dims: dict[int, dict[str, dict]] = {}
dim_rows = (
await db.execute(select(DimensionScore).where(DimensionScore.ticker_id.in_(ticker_ids)))
).scalars().all()
for row in dim_rows:
dims.setdefault(row.ticker_id, {})[row.dimension] = {
"score": float(row.score),
"is_stale": bool(row.is_stale),
"computed_at": row.computed_at,
}
composites: dict[int, CompositeScore] = {}
comp_rows = (
await db.execute(
select(CompositeScore)
.where(CompositeScore.ticker_id.in_(ticker_ids))
.order_by(CompositeScore.ticker_id, CompositeScore.computed_at.desc())
)
).scalars().all()
for row in comp_rows:
composites.setdefault(row.ticker_id, row)
sentiments: dict[int, SentimentScore] = {}
sent_rows = (
await db.execute(
select(SentimentScore)
.where(SentimentScore.ticker_id.in_(ticker_ids))
.order_by(SentimentScore.ticker_id, SentimentScore.timestamp.desc())
)
).scalars().all()
for row in sent_rows:
sentiments.setdefault(row.ticker_id, row)
fundamentals: dict[int, FundamentalData] = {}
fund_rows = (
await db.execute(
select(FundamentalData)
.where(FundamentalData.ticker_id.in_(ticker_ids))
.order_by(FundamentalData.ticker_id, FundamentalData.fetched_at.desc())
)
).scalars().all()
for row in fund_rows:
fundamentals.setdefault(row.ticker_id, row)
now = datetime.now(timezone.utc)
for setup in setups:
comp = composites.get(setup.ticker_id)
sent = sentiments.get(setup.ticker_id)
fund = fundamentals.get(setup.ticker_id)
score_context = {
"composite_score": float(comp.score) if comp else float(setup.composite_score),
"composite_is_stale": bool(comp.is_stale) if comp else None,
"composite_computed_at": comp.computed_at if comp else None,
"dimensions": dims.get(setup.ticker_id, {}),
}
sentiment_context = (
{
"classification": sent.classification,
"confidence": int(sent.confidence),
"recommendation": sent.recommendation,
"timestamp": sent.timestamp,
"source": sent.source,
}
if sent
else {}
)
fundamental_context = (
{
"pe_ratio": fund.pe_ratio,
"revenue_growth": fund.revenue_growth,
"earnings_surprise": fund.earnings_surprise,
"market_cap": fund.market_cap,
"next_earnings_date": fund.next_earnings_date,
"fetched_at": fund.fetched_at,
}
if fund
else {}
)
db.add(
SignalContextSnapshot(
trade_setup_id=setup.id,
ticker_id=setup.ticker_id,
detected_at=setup.detected_at,
created_at=now,
strategy_version=strategy_version,
direction=setup.direction,
entry_price=float(setup.entry_price),
stop_loss=float(setup.stop_loss),
target=float(setup.target),
rr_ratio=float(setup.rr_ratio),
confidence_score=(
float(setup.confidence_score) if setup.confidence_score is not None else None
),
recommended_action=setup.recommended_action,
risk_level=setup.risk_level,
momentum_percentile=(
float(setup.momentum_percentile)
if setup.momentum_percentile is not None
else None
),
score_context_json=json.dumps(score_context, default=_json_default),
sentiment_context_json=json.dumps(sentiment_context, default=_json_default),
fundamental_context_json=json.dumps(fundamental_context, default=_json_default),
)
)
async def scan_ticker(
db: AsyncSession,
symbol: str,
rr_threshold: float = 1.5,
atr_multiplier: float = 1.5,
momentum_percentile: float | None = None,
) -> list[TradeSetup]:
"""Scan a single ticker for trade setups meeting the R:R threshold.
``momentum_percentile`` is the ticker's 12-1 momentum rank across the universe
(computed by the caller), stored on each setup so the activation gate can
select the top slice."""
ticker = await _get_ticker(db, symbol)
records = await query_ohlcv(db, symbol)
if not records or len(records) < 15:
logger.info(
"Skipping %s: insufficient OHLCV data (%d bars, need 15+)",
symbol, len(records),
)
return []
_, highs, lows, closes, _ = _extract_ohlcv(records)
entry_price = closes[-1]
try:
atr_result = compute_atr(highs, lows, closes)
atr_value = atr_result["atr"]
except Exception:
logger.info("Skipping %s: cannot compute ATR", symbol)
return []
if atr_value <= 0:
logger.info("Skipping %s: ATR is zero or negative", symbol)
return []
sr_result = await db.execute(
select(SRLevel).where(SRLevel.ticker_id == ticker.id)
)
sr_levels = list(sr_result.scalars().all())
if not sr_levels:
logger.info("Skipping %s: no SR levels available", symbol)
return []
levels_above = sorted(
[lv for lv in sr_levels if lv.price_level > entry_price],
key=lambda lv: lv.price_level,
)
levels_below = sorted(
[lv for lv in sr_levels if lv.price_level < entry_price],
key=lambda lv: lv.price_level,
reverse=True,
)
comp_result = await db.execute(
select(CompositeScore).where(CompositeScore.ticker_id == ticker.id)
)
comp = comp_result.scalar_one_or_none()
composite_score = comp.score if comp else 0.0
dimension_scores = await _get_dimension_scores(db, ticker.id)
sentiment_classification = await _get_latest_sentiment(db, ticker.id)
now = datetime.now(timezone.utc)
setups: list[TradeSetup] = []
if levels_above:
stop = entry_price - (atr_value * atr_multiplier)
risk = entry_price - stop
if risk > 0:
best_quality = 0.0
best_candidate_rr = 0.0
best_candidate_target = 0.0
for lv in levels_above:
reward = lv.price_level - entry_price
if reward <= 0:
continue
rr = reward / risk
if rr < rr_threshold:
continue
distance = lv.price_level - entry_price
quality = _compute_quality_score(rr, lv.strength, distance, entry_price)
if quality > best_quality:
best_quality = quality
best_candidate_rr = rr
best_candidate_target = lv.price_level
if best_candidate_rr > 0:
setups.append(TradeSetup(
ticker_id=ticker.id,
direction="long",
entry_price=round(entry_price, 4),
stop_loss=round(stop, 4),
target=round(best_candidate_target, 4),
rr_ratio=round(best_candidate_rr, 4),
composite_score=round(composite_score, 4),
detected_at=now,
momentum_percentile=momentum_percentile,
))
if levels_below:
stop = entry_price + (atr_value * atr_multiplier)
risk = stop - entry_price
if risk > 0:
best_quality = 0.0
best_candidate_rr = 0.0
best_candidate_target = 0.0
for lv in levels_below:
reward = entry_price - lv.price_level
if reward <= 0:
continue
rr = reward / risk
if rr < rr_threshold:
continue
distance = entry_price - lv.price_level
quality = _compute_quality_score(rr, lv.strength, distance, entry_price)
if quality > best_quality:
best_quality = quality
best_candidate_rr = rr
best_candidate_target = lv.price_level
if best_candidate_rr > 0:
setups.append(TradeSetup(
ticker_id=ticker.id,
direction="short",
entry_price=round(entry_price, 4),
stop_loss=round(stop, 4),
target=round(best_candidate_target, 4),
rr_ratio=round(best_candidate_rr, 4),
composite_score=round(composite_score, 4),
detected_at=now,
momentum_percentile=momentum_percentile,
))
available_directions = {s.direction for s in setups}
enhanced_setups: list[TradeSetup] = []
for setup in setups:
try:
enhanced = await enhance_trade_setup(
db=db,
ticker=ticker,
setup=setup,
dimension_scores=dimension_scores,
sr_levels=sr_levels,
sentiment_classification=sentiment_classification,
atr_value=atr_value,
available_directions=available_directions,
)
enhanced_setups.append(enhanced)
except Exception:
logger.exception("Error enhancing setup for %s (%s)", ticker.symbol, setup.direction)
enhanced_setups.append(setup)
for setup in enhanced_setups:
db.add(setup)
await db.commit()
for s in enhanced_setups:
await db.refresh(s)
await _create_signal_context_snapshots(db, enhanced_setups)
await db.commit()
return enhanced_setups
async def scan_all_tickers(
db: AsyncSession,
rr_threshold: float = 1.5,
atr_multiplier: float = 1.5,
progress_callback: Callable[[int, int, str], None] | None = None,
) -> list[TradeSetup]:
"""Scan all tracked tickers for trade setups.
``progress_callback(processed, total, current_symbol)`` is invoked as each
ticker is scanned so callers (e.g. the scheduler) can surface live progress.
"""
result = await db.execute(select(Ticker).order_by(Ticker.symbol))
tickers = list(result.scalars().all())
total = len(tickers)
# Rank the universe by 12-1 momentum up front so each new setup carries its
# ticker's percentile (used by the activation gate). Best-effort.
try:
from app.services import momentum_service
percentiles = await momentum_service.compute_momentum_percentiles(db)
except Exception:
logger.exception("Momentum ranking refresh failed")
percentiles = {}
all_setups: list[TradeSetup] = []
for index, ticker in enumerate(tickers):
if progress_callback is not None:
progress_callback(index, total, ticker.symbol)
try:
# Refresh scores first so the scheduled scan works off current data.
# Nothing else marks scores stale, so without this they'd never
# update for tickers the user doesn't manually fetch.
try:
from app.services import scoring_service
await scoring_service.compute_all_dimensions(db, ticker.symbol)
await scoring_service.compute_composite_score(db, ticker.symbol)
await db.commit()
except Exception:
logger.exception("Error refreshing scores for %s", ticker.symbol)
setups = await scan_ticker(
db, ticker.symbol, rr_threshold, atr_multiplier,
momentum_percentile=percentiles.get(ticker.symbol),
)
all_setups.extend(setups)
except Exception:
logger.exception("Error scanning ticker %s", ticker.symbol)
if progress_callback is not None and total:
progress_callback(total, total, "")
return all_setups
async def get_trade_setups(
db: AsyncSession,
direction: str | None = None,
min_confidence: float | None = None,
recommended_action: str | None = None,
symbol: str | None = None,
) -> list[dict]:
"""Get latest stored trade setups, optionally filtered."""
stmt = (
select(TradeSetup, Ticker.symbol)
.join(Ticker, TradeSetup.ticker_id == Ticker.id)
)
if direction is not None:
stmt = stmt.where(TradeSetup.direction == direction.lower())
if symbol is not None:
stmt = stmt.where(Ticker.symbol == symbol.strip().upper())
if min_confidence is not None:
stmt = stmt.where(TradeSetup.confidence_score >= min_confidence)
if recommended_action is not None:
stmt = stmt.where(TradeSetup.recommended_action == recommended_action)
stmt = stmt.order_by(TradeSetup.detected_at.desc(), TradeSetup.id.desc())
result = await db.execute(stmt)
rows = result.all()
latest_by_key: dict[tuple[str, str], tuple[TradeSetup, str]] = {}
for setup, ticker_symbol in rows:
dedupe_key = (ticker_symbol, setup.direction)
if dedupe_key not in latest_by_key:
latest_by_key[dedupe_key] = (setup, ticker_symbol)
latest_rows = list(latest_by_key.values())
latest_rows.sort(
key=lambda row: (
row[0].confidence_score if row[0].confidence_score is not None else -1.0,
row[0].rr_ratio,
row[0].composite_score,
),
reverse=True,
)
prices = await _latest_closes(db, {s.ticker_id for s, _ in latest_rows})
return [
_trade_setup_to_dict(setup, ticker_symbol, prices.get(setup.ticker_id))
for setup, ticker_symbol in latest_rows
]
async def _latest_closes(db: AsyncSession, ticker_ids: set[int]) -> dict[int, float]:
"""Most recent close per ticker — used to judge a setup's current relevance."""
if not ticker_ids:
return {}
latest = (
select(OHLCVRecord.ticker_id, func.max(OHLCVRecord.date).label("md"))
.where(OHLCVRecord.ticker_id.in_(ticker_ids))
.group_by(OHLCVRecord.ticker_id)
.subquery()
)
stmt = select(OHLCVRecord.ticker_id, OHLCVRecord.close).join(
latest,
and_(
OHLCVRecord.ticker_id == latest.c.ticker_id,
OHLCVRecord.date == latest.c.md,
),
)
result = await db.execute(stmt)
return {tid: float(close) for tid, close in result.all()}
async def get_trade_setup_history(
db: AsyncSession,
symbol: str,
) -> list[dict]:
"""Get full recommendation history for a symbol (newest first)."""
stmt = (
select(TradeSetup, Ticker.symbol)
.join(Ticker, TradeSetup.ticker_id == Ticker.id)
.where(Ticker.symbol == symbol.strip().upper())
.order_by(TradeSetup.detected_at.desc(), TradeSetup.id.desc())
)
result = await db.execute(stmt)
rows = result.all()
prices = await _latest_closes(db, {s.ticker_id for s, _ in rows})
return [
_trade_setup_to_dict(setup, ticker_symbol, prices.get(setup.ticker_id))
for setup, ticker_symbol in rows
]
def _trade_setup_to_dict(setup: TradeSetup, symbol: str, current_price: float | None = None) -> dict:
targets: list[dict] = []
conflicts: list[str] = []
if setup.targets_json:
try:
parsed_targets = json.loads(setup.targets_json)
if isinstance(parsed_targets, list):
targets = parsed_targets
except (TypeError, ValueError):
targets = []
if setup.conflict_flags_json:
try:
parsed_conflicts = json.loads(setup.conflict_flags_json)
if isinstance(parsed_conflicts, list):
conflicts = [str(item) for item in parsed_conflicts]
except (TypeError, ValueError):
conflicts = []
return {
"id": setup.id,
"symbol": symbol,
"direction": setup.direction,
"entry_price": setup.entry_price,
"stop_loss": setup.stop_loss,
"target": setup.target,
"rr_ratio": setup.rr_ratio,
"composite_score": setup.composite_score,
"detected_at": setup.detected_at,
"confidence_score": setup.confidence_score,
"targets": targets,
"conflict_flags": conflicts,
"recommended_action": setup.recommended_action,
"reasoning": setup.reasoning,
"risk_level": setup.risk_level,
"actual_outcome": setup.actual_outcome,
"outcome_date": setup.outcome_date,
"evaluated_at": setup.evaluated_at,
"current_price": current_price,
"momentum_percentile": setup.momentum_percentile,
}