"""Historical backtest (Phase 1): replay the price-derived engine over stored OHLCV and measure how the CURRENT config would have performed. For each ticker we step through history (weekly), and at each as-of date D we rebuild the setup using only bars ≤ D (no lookahead), then walk the actual bars after D to record the realized outcome. Two reports come out: - realized hit-rate / expectancy of qualified setups (and of all setups) - a probability calibration curve: do "60% likely" targets hit ~60% of the time? Limitation: sentiment and fundamentals have no point-in-time history, so they're held neutral here — this calibrates the price/S-R/probability machinery only. """ from __future__ import annotations import json import logging from collections.abc import Callable from datetime import datetime, timezone from types import SimpleNamespace from typing import Any from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.models.settings import SystemSetting from app.models.ticker import Ticker from app.services.admin_service import get_activation_config, update_setting from app.services.indicator_service import _extract_ohlcv, compute_atr from app.services.outcome_service import ( OUTCOME_AMBIGUOUS, OUTCOME_STOP_HIT, OUTCOME_TARGET_HIT, Bar, evaluate_setup_against_bars, ) from app.services.price_service import query_ohlcv from app.services.qualification import best_target_probability, setup_qualifies from app.services.recommendation_service import ( _choose_recommended_action, _classify_by_probability, _risk_level_from_conflicts, _select_primary_target, _zone_representative_levels, direction_analyzer, get_recommendation_config, probability_estimator, signal_conflict_detector, target_generator, ) from app.services.scoring_service import ( compute_momentum_from_closes, compute_technical_from_arrays, ) from app.services.sr_service import detect_sr_levels logger = logging.getLogger(__name__) KEY_REPORT = "backtest_report" STEP_DAYS = 5 # weekly cadence (≈ 5 trading days) MIN_LOOKBACK = 60 # bars needed before D for indicators (EMA cross needs 51) HORIZON = 30 # trading days to resolve an outcome (matches the evaluator) ATR_MULTIPLIER = 1.5 _CAL_BUCKETS = [(0, 20), (20, 40), (40, 60), (60, 80), (80, 100.01)] def _wrap_levels(level_dicts: list[dict]) -> list[Any]: return [ SimpleNamespace( id=i, price_level=float(d["price_level"]), type=d["type"], strength=int(d["strength"]), ) for i, d in enumerate(level_dicts) ] def _window_setups( window_records: list, config: dict, activation: dict, ) -> list[dict]: """Rebuild the setup(s) at the last bar of ``window_records`` (the as-of date), using only those bars. Returns one dict per tradeable direction.""" if len(window_records) < MIN_LOOKBACK: return [] _, highs, lows, closes, volumes = _extract_ohlcv(window_records) entry = closes[-1] if entry <= 0: return [] try: atr = compute_atr(highs, lows, closes)["atr"] except Exception: return [] if atr <= 0: return [] sr_levels = _wrap_levels(detect_sr_levels(highs, lows, closes, volumes)) if not sr_levels: return [] technical = (compute_technical_from_arrays(highs, lows, closes, volumes)[0]) or 50.0 momentum = (compute_momentum_from_closes(closes)[0]) or 50.0 dim_scores = {"technical": technical, "momentum": momentum} conflicts = signal_conflict_detector.detect_conflicts(dim_scores, None, config) confidences = { "long": direction_analyzer.calculate_confidence("long", dim_scores, None, conflicts), "short": direction_analyzer.calculate_confidence("short", dim_scores, None, conflicts), } # First pass: build targets per direction per_dir: dict[str, dict] = {} for direction in ("long", "short"): stop = entry - atr * ATR_MULTIPLIER if direction == "long" else entry + atr * ATR_MULTIPLIER zone_levels = _zone_representative_levels(sr_levels, entry) targets = target_generator.generate_targets(direction, entry, stop, zone_levels, atr) if not targets: continue for t in targets: t["probability"] = probability_estimator.estimate_probability( t, dim_scores, None, direction, config ) t["classification"] = _classify_by_probability(t["probability"]) primary = _select_primary_target(targets) if primary is None: continue per_dir[direction] = {"stop": stop, "targets": targets, "primary": primary} available = set(per_dir.keys()) if not available: return [] action = _choose_recommended_action(confidences["long"], confidences["short"], config, available) out: list[dict] = [] for direction, data in per_dir.items(): targets, primary, stop = data["targets"], data["primary"], data["stop"] setup_conflicts = list(conflicts) if len(targets) < 3: setup_conflicts.append("target-availability: Fewer than 3 valid S/R targets available") risk_level = _risk_level_from_conflicts(setup_conflicts) rr = float(primary["rr_ratio"]) target_price = float(primary["price"]) setup_ns = SimpleNamespace( rr_ratio=rr, confidence_score=confidences[direction], recommended_action=action, risk_level=risk_level, targets=targets, direction=direction, target=target_price, stop_loss=stop, entry_price=entry, ) out.append({ "direction": direction, "entry": entry, "stop": stop, "target": target_price, "rr": rr, "confidence": confidences[direction], "primary_prob": float(primary["probability"]), "best_prob": best_target_probability(setup_ns), "action": action, "risk_level": risk_level, "qualified": setup_qualifies(setup_ns, activation), }) return out def _replay_ticker(symbol: str, records: list, config: dict, activation: dict) -> list[dict]: """Walk one ticker's history weekly, building setups and their realized outcomes.""" candidates: list[dict] = [] n = len(records) if n < MIN_LOOKBACK + HORIZON: return candidates for i in range(MIN_LOOKBACK - 1, n - HORIZON, STEP_DAYS): window = records[: i + 1] forward = records[i + 1 :] forward_bars = [Bar(date=r.date, high=r.high, low=r.low) for r in forward] for s in _window_setups(window, config, activation): outcome, _ = evaluate_setup_against_bars( s["direction"], s["stop"], s["target"], forward_bars, HORIZON ) if outcome is None: continue target_hit = outcome == OUTCOME_TARGET_HIT if outcome == OUTCOME_TARGET_HIT: realized_r = s["rr"] elif outcome in (OUTCOME_STOP_HIT, OUTCOME_AMBIGUOUS): realized_r = -1.0 else: # expired realized_r = 0.0 candidates.append({ "symbol": symbol, "date": records[i].date.isoformat(), "direction": s["direction"], "rr": s["rr"], "confidence": s["confidence"], "primary_prob": s["primary_prob"], "qualified": s["qualified"], "outcome": outcome, "target_hit": target_hit, "realized_r": realized_r, }) return candidates def _bucket_stats(cands: list[dict]) -> dict: wins = sum(1 for c in cands if c["target_hit"]) losses = sum(1 for c in cands if c["outcome"] in (OUTCOME_STOP_HIT, OUTCOME_AMBIGUOUS)) expired = sum(1 for c in cands if c["outcome"] not in (OUTCOME_TARGET_HIT, OUTCOME_STOP_HIT, OUTCOME_AMBIGUOUS)) decided = wins + losses rs = [c["realized_r"] for c in cands] return { "total": len(cands), "wins": wins, "losses": losses, "expired": expired, "hit_rate": round(wins / decided * 100, 1) if decided else None, "avg_r": round(sum(rs) / len(rs), 3) if rs else None, "total_r": round(sum(rs), 2) if rs else None, } def _calibration(cands: list[dict]) -> list[dict]: """Predicted target probability vs realized hit rate, per probability bucket.""" rows: list[dict] = [] for lo, hi in _CAL_BUCKETS: bucket = [c for c in cands if lo <= c["primary_prob"] < hi] if not bucket: continue hits = sum(1 for c in bucket if c["target_hit"]) rows.append({ "bucket": f"{int(lo)}-{int(min(hi, 100))}%", "n": len(bucket), "predicted_avg": round(sum(c["primary_prob"] for c in bucket) / len(bucket), 1), "realized_hit_rate": round(hits / len(bucket) * 100, 1), }) return rows async def run_backtest( db: AsyncSession, progress_cb: Callable[[int, int, str], None] | None = None, ) -> dict: """Replay every ticker and aggregate the Phase-1 reports for the current config.""" config = await get_recommendation_config(db) activation = await get_activation_config(db) result = await db.execute(select(Ticker).order_by(Ticker.symbol)) tickers = list(result.scalars().all()) total = len(tickers) candidates: list[dict] = [] for index, ticker in enumerate(tickers): if progress_cb is not None: progress_cb(index, total, ticker.symbol) try: records = await query_ohlcv(db, ticker.symbol) candidates.extend(_replay_ticker(ticker.symbol, records, config, activation)) except Exception: logger.exception("Backtest replay failed for %s", ticker.symbol) if progress_cb is not None and total: progress_cb(total, total, "") qualified = [c for c in candidates if c["qualified"]] longs = [c for c in qualified if c["direction"] == "long"] shorts = [c for c in qualified if c["direction"] == "short"] return { "generated_at": datetime.now(timezone.utc).isoformat(), "tickers": total, "candidates": len(candidates), "qualified": len(qualified), "params": {"step_days": STEP_DAYS, "horizon_days": HORIZON, "min_lookback": MIN_LOOKBACK}, "activation": activation, "overall_qualified": _bucket_stats(qualified), "overall_all": _bucket_stats(candidates), "by_direction": { "long": _bucket_stats(longs), "short": _bucket_stats(shorts), }, "calibration": _calibration(candidates), "note": ( "Sentiment & fundamentals held neutral (no point-in-time history). " "~6 months ≈ one market regime — treat as directional, not gospel." ), } async def run_and_store( db: AsyncSession, progress_cb: Callable[[int, int, str], None] | None = None, ) -> dict: """Run the backtest and cache the report in a SystemSetting. Job entrypoint.""" report = await run_backtest(db, progress_cb) await update_setting(db, KEY_REPORT, json.dumps(report)) return report async def get_backtest_report(db: AsyncSession) -> dict | None: """Return the last cached backtest report, or None if never run.""" result = await db.execute(select(SystemSetting).where(SystemSetting.key == KEY_REPORT)) setting = result.scalar_one_or_none() if setting is None: return None try: return json.loads(setting.value) except (TypeError, ValueError): return None