"""R:R Scanner service.

Scans tracked tickers for asymmetric risk-reward trade setups.

Long:  target = best-quality SR level above entry, stop = entry - ATR × multiplier.
Short: target = best-quality SR level below entry, stop = entry + ATR × multiplier.

Only SR levels whose R:R meets the configurable threshold (default 1.5) are
considered; among those, the level with the highest quality score (a blend of
R:R, level strength and proximity to entry) becomes the target.
"""

from __future__ import annotations

import json
import logging
from collections.abc import Callable
from datetime import date, datetime, timezone

from sqlalchemy import and_, func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.exceptions import NotFoundError
from app.models.fundamental import FundamentalData
from app.models.ohlcv import OHLCVRecord
from app.models.score import CompositeScore, DimensionScore
from app.models.sentiment import SentimentScore
from app.models.signal_context_snapshot import SignalContextSnapshot
from app.models.sr_level import SRLevel
from app.models.ticker import Ticker
from app.models.trade_setup import TradeSetup
from app.services.indicator_service import _extract_ohlcv, compute_atr
from app.services.price_service import query_ohlcv
from app.services.recommendation_service import enhance_trade_setup

logger = logging.getLogger(__name__)

STRATEGY_VERSION = "residual_momentum_12_1_rr_time_v2"


async def _get_ticker(db: AsyncSession, symbol: str) -> Ticker:
    """Return the ticker row for ``symbol`` (whitespace-stripped, upper-cased).

    Raises:
        NotFoundError: if no ticker with the normalised symbol exists.
    """
    normalised = symbol.strip().upper()
    result = await db.execute(select(Ticker).where(Ticker.symbol == normalised))
    ticker = result.scalar_one_or_none()
    if ticker is None:
        raise NotFoundError(f"Ticker not found: {normalised}")
    return ticker


def _compute_quality_score(
    rr: float,
    strength: int,
    distance: float,
    entry_price: float,
    *,
    w_rr: float = 0.35,
    w_strength: float = 0.35,
    w_proximity: float = 0.30,
    rr_cap: float = 10.0,
) -> float:
    """Compute a quality score for a candidate S/R level.

    The score is a weighted sum of three normalised terms:

    * R:R, as ``rr / rr_cap`` capped at 1.0;
    * strength, as ``strength / 100`` (presumably a 0-100 scale; not clamped);
    * proximity, as ``1 - distance / entry_price`` with the ratio capped at 1.0.

    A non-positive ``entry_price`` cannot yield a meaningful relative distance,
    so the proximity term is 0.0 in that case instead of dividing by zero.
    """
    norm_rr = min(rr / rr_cap, 1.0)
    norm_strength = strength / 100.0
    if entry_price > 0:
        norm_proximity = 1.0 - min(distance / entry_price, 1.0)
    else:
        norm_proximity = 0.0
    return w_rr * norm_rr + w_strength * norm_strength + w_proximity * norm_proximity


def _select_best_level(
    levels: list[SRLevel],
    entry_price: float,
    risk: float,
    rr_threshold: float,
    *,
    is_long: bool,
) -> tuple[float, float] | None:
    """Pick the highest-quality S/R level that meets the R:R threshold.

    Returns ``(rr_ratio, target_price)`` for the winning level, or ``None``
    when no level qualifies. Ties keep the earlier level in ``levels``.
    """
    best_quality = 0.0
    best: tuple[float, float] | None = None
    for lv in levels:
        if is_long:
            reward = lv.price_level - entry_price
        else:
            reward = entry_price - lv.price_level
        if reward <= 0:
            continue
        rr = reward / risk
        if rr < rr_threshold:
            continue
        # The reward is also the absolute distance from entry to the level.
        quality = _compute_quality_score(rr, lv.strength, reward, entry_price)
        if quality > best_quality:
            best_quality = quality
            best = (rr, lv.price_level)
    return best


async def _get_dimension_scores(db: AsyncSession, ticker_id: int) -> dict[str, float]:
    """Return ``{dimension: score}`` for every dimension score row of a ticker."""
    result = await db.execute(
        select(DimensionScore).where(DimensionScore.ticker_id == ticker_id)
    )
    rows = result.scalars().all()
    return {row.dimension: float(row.score) for row in rows}


async def _get_latest_sentiment(db: AsyncSession, ticker_id: int) -> str | None:
    """Return the classification of the newest sentiment row, or ``None``."""
    result = await db.execute(
        select(SentimentScore)
        .where(SentimentScore.ticker_id == ticker_id)
        .order_by(SentimentScore.timestamp.desc())
        .limit(1)
    )
    row = result.scalar_one_or_none()
    return row.classification if row else None


def _json_default(value: object) -> str:
    """``json.dumps`` fallback: ISO-8601 for dates/datetimes, ``str()`` otherwise."""
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    return str(value)


async def _create_signal_context_snapshots(
    db: AsyncSession,
    setups: list[TradeSetup],
    *,
    strategy_version: str = STRATEGY_VERSION,
) -> None:
    """Capture point-in-time discretionary context for freshly generated setups.

    The scanner stores the setup itself first so each snapshot can be keyed by
    ``trade_setup_id``. This is intentionally forward-only: old sentiment,
    fundamentals and composite scores are not reconstructed from today's data.

    One ``SignalContextSnapshot`` per setup is added to the session; committing
    is left to the caller.
    """
    if not setups:
        return

    ticker_ids = {s.ticker_id for s in setups}

    dims: dict[int, dict[str, dict]] = {}
    dim_rows = (
        await db.execute(
            select(DimensionScore).where(DimensionScore.ticker_id.in_(ticker_ids))
        )
    ).scalars().all()
    for row in dim_rows:
        dims.setdefault(row.ticker_id, {})[row.dimension] = {
            "score": float(row.score),
            "is_stale": bool(row.is_stale),
            "computed_at": row.computed_at,
        }

    # For the three lookups below, rows arrive newest-first within each ticker,
    # so ``setdefault`` keeps only the latest row per ticker.
    composites: dict[int, CompositeScore] = {}
    comp_rows = (
        await db.execute(
            select(CompositeScore)
            .where(CompositeScore.ticker_id.in_(ticker_ids))
            .order_by(CompositeScore.ticker_id, CompositeScore.computed_at.desc())
        )
    ).scalars().all()
    for row in comp_rows:
        composites.setdefault(row.ticker_id, row)

    sentiments: dict[int, SentimentScore] = {}
    sent_rows = (
        await db.execute(
            select(SentimentScore)
            .where(SentimentScore.ticker_id.in_(ticker_ids))
            .order_by(SentimentScore.ticker_id, SentimentScore.timestamp.desc())
        )
    ).scalars().all()
    for row in sent_rows:
        sentiments.setdefault(row.ticker_id, row)

    fundamentals: dict[int, FundamentalData] = {}
    fund_rows = (
        await db.execute(
            select(FundamentalData)
            .where(FundamentalData.ticker_id.in_(ticker_ids))
            .order_by(FundamentalData.ticker_id, FundamentalData.fetched_at.desc())
        )
    ).scalars().all()
    for row in fund_rows:
        fundamentals.setdefault(row.ticker_id, row)

    now = datetime.now(timezone.utc)
    for setup in setups:
        comp = composites.get(setup.ticker_id)
        sent = sentiments.get(setup.ticker_id)
        fund = fundamentals.get(setup.ticker_id)

        score_context = {
            # Fall back to the score stored on the setup when no composite row exists.
            "composite_score": float(comp.score) if comp else float(setup.composite_score),
            "composite_is_stale": bool(comp.is_stale) if comp else None,
            "composite_computed_at": comp.computed_at if comp else None,
            "dimensions": dims.get(setup.ticker_id, {}),
        }
        sentiment_context = (
            {
                "classification": sent.classification,
                "confidence": int(sent.confidence),
                "recommendation": sent.recommendation,
                "timestamp": sent.timestamp,
                "source": sent.source,
            }
            if sent
            else {}
        )
        fundamental_context = (
            {
                "pe_ratio": fund.pe_ratio,
                "revenue_growth": fund.revenue_growth,
                "earnings_surprise": fund.earnings_surprise,
                "market_cap": fund.market_cap,
                "next_earnings_date": fund.next_earnings_date,
                "fetched_at": fund.fetched_at,
            }
            if fund
            else {}
        )

        db.add(
            SignalContextSnapshot(
                trade_setup_id=setup.id,
                ticker_id=setup.ticker_id,
                detected_at=setup.detected_at,
                created_at=now,
                strategy_version=strategy_version,
                direction=setup.direction,
                entry_price=float(setup.entry_price),
                stop_loss=float(setup.stop_loss),
                target=float(setup.target),
                rr_ratio=float(setup.rr_ratio),
                confidence_score=(
                    float(setup.confidence_score)
                    if setup.confidence_score is not None
                    else None
                ),
                recommended_action=setup.recommended_action,
                risk_level=setup.risk_level,
                momentum_percentile=(
                    float(setup.momentum_percentile)
                    if setup.momentum_percentile is not None
                    else None
                ),
                score_context_json=json.dumps(score_context, default=_json_default),
                sentiment_context_json=json.dumps(
                    sentiment_context, default=_json_default
                ),
                fundamental_context_json=json.dumps(
                    fundamental_context, default=_json_default
                ),
            )
        )


async def scan_ticker(
    db: AsyncSession,
    symbol: str,
    rr_threshold: float = 1.5,
    atr_multiplier: float = 1.5,
    momentum_percentile: float | None = None,
) -> list[TradeSetup]:
    """Scan a single ticker for trade setups meeting the R:R threshold.

    ``momentum_percentile`` is the ticker's residual 12-1 momentum activation
    rank across the universe (computed by the caller), stored on each setup so
    the activation gate can select the top slice.

    At most one long and one short setup are produced. Each is passed through
    ``enhance_trade_setup`` (falling back to the raw setup if that fails),
    persisted, and accompanied by a signal-context snapshot.

    Returns:
        The persisted setups; empty when the ticker has fewer than 15 bars,
        ATR cannot be computed or is non-positive, or it has no SR levels.

    Raises:
        NotFoundError: if ``symbol`` is not a tracked ticker.
    """
    ticker = await _get_ticker(db, symbol)

    records = await query_ohlcv(db, symbol)
    bar_count = len(records) if records else 0
    if bar_count < 15:
        logger.info(
            "Skipping %s: insufficient OHLCV data (%d bars, need 15+)",
            symbol,
            bar_count,
        )
        return []

    _, highs, lows, closes, _ = _extract_ohlcv(records)
    entry_price = closes[-1]

    try:
        atr_result = compute_atr(highs, lows, closes)
        atr_value = atr_result["atr"]
    except Exception:
        logger.info("Skipping %s: cannot compute ATR", symbol)
        return []

    if atr_value <= 0:
        logger.info("Skipping %s: ATR is zero or negative", symbol)
        return []

    sr_result = await db.execute(
        select(SRLevel).where(SRLevel.ticker_id == ticker.id)
    )
    sr_levels = list(sr_result.scalars().all())
    if not sr_levels:
        logger.info("Skipping %s: no SR levels available", symbol)
        return []

    # Nearest-first ordering on each side of the entry price.
    levels_above = sorted(
        [lv for lv in sr_levels if lv.price_level > entry_price],
        key=lambda lv: lv.price_level,
    )
    levels_below = sorted(
        [lv for lv in sr_levels if lv.price_level < entry_price],
        key=lambda lv: lv.price_level,
        reverse=True,
    )

    # Take the newest composite row explicitly so that more than one row for a
    # ticker cannot make ``scalar_one_or_none`` raise MultipleResultsFound.
    comp_result = await db.execute(
        select(CompositeScore)
        .where(CompositeScore.ticker_id == ticker.id)
        .order_by(CompositeScore.computed_at.desc())
        .limit(1)
    )
    comp = comp_result.scalar_one_or_none()
    composite_score = comp.score if comp else 0.0

    dimension_scores = await _get_dimension_scores(db, ticker.id)
    sentiment_classification = await _get_latest_sentiment(db, ticker.id)

    now = datetime.now(timezone.utc)
    atr_offset = atr_value * atr_multiplier
    setups: list[TradeSetup] = []

    for direction, levels in (("long", levels_above), ("short", levels_below)):
        if not levels:
            continue
        is_long = direction == "long"
        if is_long:
            stop = entry_price - atr_offset
            risk = entry_price - stop
        else:
            stop = entry_price + atr_offset
            risk = stop - entry_price
        if risk <= 0:
            continue

        best = _select_best_level(
            levels, entry_price, risk, rr_threshold, is_long=is_long
        )
        if best is None:
            continue
        best_rr, best_target = best

        setups.append(
            TradeSetup(
                ticker_id=ticker.id,
                direction=direction,
                entry_price=round(entry_price, 4),
                stop_loss=round(stop, 4),
                target=round(best_target, 4),
                rr_ratio=round(best_rr, 4),
                composite_score=round(composite_score, 4),
                detected_at=now,
                momentum_percentile=momentum_percentile,
            )
        )

    available_directions = {s.direction for s in setups}
    enhanced_setups: list[TradeSetup] = []
    for setup in setups:
        try:
            enhanced = await enhance_trade_setup(
                db=db,
                ticker=ticker,
                setup=setup,
                dimension_scores=dimension_scores,
                sr_levels=sr_levels,
                sentiment_classification=sentiment_classification,
                atr_value=atr_value,
                available_directions=available_directions,
            )
            enhanced_setups.append(enhanced)
        except Exception:
            # Best-effort: keep the un-enhanced setup rather than dropping it.
            logger.exception(
                "Error enhancing setup for %s (%s)", ticker.symbol, setup.direction
            )
            enhanced_setups.append(setup)

    for setup in enhanced_setups:
        db.add(setup)
    await db.commit()

    # Refresh so each setup carries its database-assigned id for the snapshots.
    for s in enhanced_setups:
        await db.refresh(s)

    await _create_signal_context_snapshots(db, enhanced_setups)
    await db.commit()

    return enhanced_setups


async def scan_all_tickers(
    db: AsyncSession,
    rr_threshold: float = 1.5,
    atr_multiplier: float = 1.5,
    progress_callback: Callable[[int, int, str], None] | None = None,
) -> list[TradeSetup]:
    """Scan all tracked tickers for trade setups.

    ``progress_callback(processed, total, current_symbol)`` is invoked as each
    ticker is scanned so callers (e.g. the scheduler) can surface live progress.
    A final ``progress_callback(total, total, "")`` is sent when there was at
    least one ticker. Failures on individual tickers are logged and skipped.
    """
    result = await db.execute(select(Ticker).order_by(Ticker.symbol))
    tickers = list(result.scalars().all())
    total = len(tickers)

    # Rank the universe by residual 12-1 momentum up front so each new setup
    # carries its activation percentile. Best-effort; the ranker falls back to
    # raw 12-1 momentum only if benchmark data is unavailable.
    try:
        from app.services import momentum_service

        percentiles = await momentum_service.compute_momentum_percentiles(db)
    except Exception:
        logger.exception("Momentum ranking refresh failed")
        percentiles = {}

    all_setups: list[TradeSetup] = []
    for index, ticker in enumerate(tickers):
        if progress_callback is not None:
            progress_callback(index, total, ticker.symbol)
        try:
            # Refresh scores first so the scheduled scan works off current data.
            # Nothing else marks scores stale, so without this they'd never
            # update for tickers the user doesn't manually fetch.
            try:
                from app.services import scoring_service

                await scoring_service.compute_all_dimensions(db, ticker.symbol)
                await scoring_service.compute_composite_score(db, ticker.symbol)
                await db.commit()
            except Exception:
                logger.exception("Error refreshing scores for %s", ticker.symbol)

            setups = await scan_ticker(
                db,
                ticker.symbol,
                rr_threshold,
                atr_multiplier,
                momentum_percentile=percentiles.get(ticker.symbol),
            )
            all_setups.extend(setups)
        except Exception:
            logger.exception("Error scanning ticker %s", ticker.symbol)

    if progress_callback is not None and total:
        progress_callback(total, total, "")

    return all_setups


async def get_trade_setups(
    db: AsyncSession,
    direction: str | None = None,
    min_confidence: float | None = None,
    recommended_action: str | None = None,
    symbol: str | None = None,
) -> list[dict]:
    """Get latest stored trade setups, optionally filtered.

    Only the newest setup per ``(symbol, direction)`` is kept. Results are
    ordered by confidence score (missing treated as -1.0), then R:R ratio,
    then composite score, all descending.
    """
    stmt = (
        select(TradeSetup, Ticker.symbol)
        .join(Ticker, TradeSetup.ticker_id == Ticker.id)
    )
    if direction is not None:
        stmt = stmt.where(TradeSetup.direction == direction.lower())
    if symbol is not None:
        stmt = stmt.where(Ticker.symbol == symbol.strip().upper())
    if min_confidence is not None:
        stmt = stmt.where(TradeSetup.confidence_score >= min_confidence)
    if recommended_action is not None:
        stmt = stmt.where(TradeSetup.recommended_action == recommended_action)

    stmt = stmt.order_by(TradeSetup.detected_at.desc(), TradeSetup.id.desc())
    result = await db.execute(stmt)
    rows = result.all()

    # Rows are newest-first, so the first hit per key is the latest setup.
    latest_by_key: dict[tuple[str, str], tuple[TradeSetup, str]] = {}
    for setup, ticker_symbol in rows:
        dedupe_key = (ticker_symbol, setup.direction)
        if dedupe_key not in latest_by_key:
            latest_by_key[dedupe_key] = (setup, ticker_symbol)

    latest_rows = list(latest_by_key.values())
    latest_rows.sort(
        key=lambda row: (
            row[0].confidence_score if row[0].confidence_score is not None else -1.0,
            row[0].rr_ratio,
            row[0].composite_score,
        ),
        reverse=True,
    )

    prices = await _latest_closes(db, {s.ticker_id for s, _ in latest_rows})
    return [
        _trade_setup_to_dict(setup, ticker_symbol, prices.get(setup.ticker_id))
        for setup, ticker_symbol in latest_rows
    ]


async def _latest_closes(db: AsyncSession, ticker_ids: set[int]) -> dict[int, float]:
    """Most recent close per ticker — used to judge a setup's current relevance."""
    if not ticker_ids:
        return {}

    latest = (
        select(OHLCVRecord.ticker_id, func.max(OHLCVRecord.date).label("md"))
        .where(OHLCVRecord.ticker_id.in_(ticker_ids))
        .group_by(OHLCVRecord.ticker_id)
        .subquery()
    )
    stmt = select(OHLCVRecord.ticker_id, OHLCVRecord.close).join(
        latest,
        and_(
            OHLCVRecord.ticker_id == latest.c.ticker_id,
            OHLCVRecord.date == latest.c.md,
        ),
    )
    result = await db.execute(stmt)
    return {tid: float(close) for tid, close in result.all()}


async def get_trade_setup_history(
    db: AsyncSession,
    symbol: str,
) -> list[dict]:
    """Get full recommendation history for a symbol (newest first)."""
    stmt = (
        select(TradeSetup, Ticker.symbol)
        .join(Ticker, TradeSetup.ticker_id == Ticker.id)
        .where(Ticker.symbol == symbol.strip().upper())
        .order_by(TradeSetup.detected_at.desc(), TradeSetup.id.desc())
    )
    result = await db.execute(stmt)
    rows = result.all()

    prices = await _latest_closes(db, {s.ticker_id for s, _ in rows})
    return [
        _trade_setup_to_dict(setup, ticker_symbol, prices.get(setup.ticker_id))
        for setup, ticker_symbol in rows
    ]


def _parse_json_list(raw: str | None) -> list:
    """Decode ``raw`` as a JSON list; return ``[]`` if empty, invalid or not a list."""
    if not raw:
        return []
    try:
        parsed = json.loads(raw)
    except (TypeError, ValueError):
        return []
    return parsed if isinstance(parsed, list) else []


def _trade_setup_to_dict(
    setup: TradeSetup,
    symbol: str,
    current_price: float | None = None,
) -> dict:
    """Serialise a trade setup to a plain dict.

    ``targets_json`` and ``conflict_flags_json`` are decoded leniently: a
    missing, malformed or non-list payload becomes an empty list. Conflict
    flags are coerced to strings.
    """
    targets: list[dict] = _parse_json_list(setup.targets_json)
    conflicts: list[str] = [
        str(item) for item in _parse_json_list(setup.conflict_flags_json)
    ]

    return {
        "id": setup.id,
        "symbol": symbol,
        "direction": setup.direction,
        "entry_price": setup.entry_price,
        "stop_loss": setup.stop_loss,
        "target": setup.target,
        "rr_ratio": setup.rr_ratio,
        "composite_score": setup.composite_score,
        "detected_at": setup.detected_at,
        "confidence_score": setup.confidence_score,
        "targets": targets,
        "conflict_flags": conflicts,
        "recommended_action": setup.recommended_action,
        "reasoning": setup.reasoning,
        "risk_level": setup.risk_level,
        "actual_outcome": setup.actual_outcome,
        "outcome_date": setup.outcome_date,
        "evaluated_at": setup.evaluated_at,
        "current_price": current_price,
        "momentum_percentile": setup.momentum_percentile,
    }