"""Trade setup outcome evaluation service. Closes the feedback loop on R:R scanner setups: walks daily OHLCV bars after detection and records whether the stop or the target was hit first. Outcome semantics (entry is the close at detection time, i.e. market entry): - target_hit: target reached before the stop - stop_hit: stop reached before the target - ambiguous: stop AND target both within the same daily bar — with daily granularity the order is unknowable, counted as a loss in stats - expired: neither level hit within ``max_bars`` trading days - (NULL): not enough bars yet to decide — re-evaluated on the next run """ from __future__ import annotations import logging from dataclasses import dataclass from datetime import date, datetime, timezone from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.models.ohlcv import OHLCVRecord from app.models.trade_setup import TradeSetup from app.services.qualification import setup_qualifies logger = logging.getLogger(__name__) OUTCOME_TARGET_HIT = "target_hit" OUTCOME_STOP_HIT = "stop_hit" OUTCOME_AMBIGUOUS = "ambiguous" OUTCOME_EXPIRED = "expired" DEFAULT_MAX_BARS = 30 # Confidence buckets for the performance breakdown _CONFIDENCE_BUCKETS = [ ("<50%", 0.0, 50.0), ("50-70%", 50.0, 70.0), ("≥70%", 70.0, 100.01), ] @dataclass(frozen=True) class Bar: date: date high: float low: float def evaluate_setup_against_bars( direction: str, stop_loss: float, target: float, bars: list[Bar], max_bars: int = DEFAULT_MAX_BARS, ) -> tuple[str | None, date | None]: """Determine a setup's outcome from daily bars strictly after detection. Returns (outcome, outcome_date); (None, None) while still undecided. """ for i, bar in enumerate(bars): if i >= max_bars: break if direction == "long": stop_hit = bar.low <= stop_loss target_hit = bar.high >= target else: stop_hit = bar.high >= stop_loss target_hit = bar.low <= target if stop_hit and target_hit: return OUTCOME_AMBIGUOUS, bar.date if stop_hit: return OUTCOME_STOP_HIT, bar.date if target_hit: return OUTCOME_TARGET_HIT, bar.date if len(bars) >= max_bars: return OUTCOME_EXPIRED, bars[max_bars - 1].date return None, None async def evaluate_pending_setups( db: AsyncSession, max_bars: int = DEFAULT_MAX_BARS, ) -> dict[str, int]: """Evaluate all unevaluated trade setups against stored OHLCV data. Bars are fetched once per ticker. Setups that cannot be decided yet remain NULL and are picked up on the next run. """ result = await db.execute( select(TradeSetup).where(TradeSetup.actual_outcome.is_(None)) ) pending = list(result.scalars().all()) summary = {"evaluated": 0, "still_pending": 0, "by_outcome": {}} if not pending: return summary by_ticker: dict[int, list[TradeSetup]] = {} for setup in pending: by_ticker.setdefault(setup.ticker_id, []).append(setup) now = datetime.now(timezone.utc) for ticker_id, setups in by_ticker.items(): earliest = min(s.detected_at for s in setups).date() bars_result = await db.execute( select(OHLCVRecord) .where( OHLCVRecord.ticker_id == ticker_id, OHLCVRecord.date > earliest, ) .order_by(OHLCVRecord.date.asc()) ) records = list(bars_result.scalars().all()) all_bars = [Bar(date=r.date, high=r.high, low=r.low) for r in records] for setup in setups: detected_date = setup.detected_at.date() bars = [b for b in all_bars if b.date > detected_date] outcome, outcome_date = evaluate_setup_against_bars( setup.direction, setup.stop_loss, setup.target, bars, max_bars ) if outcome is None: summary["still_pending"] += 1 continue setup.actual_outcome = outcome setup.outcome_date = outcome_date setup.evaluated_at = now summary["evaluated"] += 1 summary["by_outcome"][outcome] = summary["by_outcome"].get(outcome, 0) + 1 await db.commit() return summary def _realized_r(setup: TradeSetup) -> float | None: """Realized result in R-multiples: win = +rr_ratio, loss = -1R, expired = 0R.""" if setup.actual_outcome == OUTCOME_TARGET_HIT: return setup.rr_ratio if setup.actual_outcome in (OUTCOME_STOP_HIT, OUTCOME_AMBIGUOUS): return -1.0 if setup.actual_outcome == OUTCOME_EXPIRED: return 0.0 return None def _bucket_stats(setups: list[TradeSetup]) -> dict: wins = sum(1 for s in setups if s.actual_outcome == OUTCOME_TARGET_HIT) losses = sum( 1 for s in setups if s.actual_outcome in (OUTCOME_STOP_HIT, OUTCOME_AMBIGUOUS) ) expired = sum(1 for s in setups if s.actual_outcome == OUTCOME_EXPIRED) decided = wins + losses realized = [r for s in setups if (r := _realized_r(s)) is not None] return { "total": len(setups), "wins": wins, "losses": losses, "expired": expired, "hit_rate": round(wins / decided * 100, 1) if decided else None, "avg_r": round(sum(realized) / len(realized), 3) if realized else None, "total_r": round(sum(realized), 2) if realized else None, } def _confidence_bucket(score: float | None) -> str | None: if score is None: return None for label, lo, hi in _CONFIDENCE_BUCKETS: if lo <= score < hi: return label return None async def get_performance_stats( db: AsyncSession, config: dict | None = None, ) -> dict: """Aggregate outcome statistics over all evaluated trade setups. avg_r is the expectancy per trade in R-multiples (win = +rr_ratio, loss = -1R, expired = 0R). A positive avg_r means the signals have been profitable on a risk-adjusted basis. When ``config`` (an activation-gate dict) is supplied, the overall, direction and action breakdowns cover only qualified setups. The confidence breakdown deliberately stays unfiltered: it is the instrument for validating the gate itself. """ result = await db.execute( select(TradeSetup).where(TradeSetup.actual_outcome.is_not(None)) ) evaluated = list(result.scalars().all()) pending_result = await db.execute( select(TradeSetup.id).where(TradeSetup.actual_outcome.is_(None)) ) pending_count = len(pending_result.scalars().all()) if config is not None: qualified = [s for s in evaluated if setup_qualifies(s, config)] else: qualified = evaluated by_direction: dict[str, list[TradeSetup]] = {} by_action: dict[str, list[TradeSetup]] = {} by_confidence: dict[str, list[TradeSetup]] = {} for setup in qualified: by_direction.setdefault(setup.direction, []).append(setup) action = setup.recommended_action or "NONE" by_action.setdefault(action, []).append(setup) # Confidence buckets always cover the full evaluated population for setup in evaluated: bucket = _confidence_bucket(setup.confidence_score) if bucket is not None: by_confidence.setdefault(bucket, []).append(setup) bucket_order = [label for label, _, _ in _CONFIDENCE_BUCKETS] return { "overall": _bucket_stats(qualified), "pending": pending_count, "by_direction": {k: _bucket_stats(v) for k, v in sorted(by_direction.items())}, "by_action": {k: _bucket_stats(v) for k, v in sorted(by_action.items())}, "by_confidence": { label: _bucket_stats(by_confidence[label]) for label in bucket_order if label in by_confidence }, }