"""Historical backtest (Phase 1): replay the price-derived engine over stored OHLCV and measure how the CURRENT config would have performed. For each ticker we step through history (weekly), and at each as-of date D we rebuild the setup using only bars ≤ D (no lookahead), then walk the actual bars after D to record the realized outcome. The report contains: - hit-rate / expectancy of qualified setups vs the all-setups control group, gross and net of costs, with robustness stats (median, profit factor, expectancy without the top winners) - the momentum-percentile sweep and the gate ablation (each floor removed in turn, graded under both the target and the hold-to-horizon exit) - the time-exit sweep (hold N days with the initial stop) - cross-sectional factor rank-IC ("signal edge") - a capital-constrained portfolio simulation (equity curve → CAGR, drawdown, Sharpe, SPY comparison) - a data-driven recommendation derived from this report's numbers Limitation: sentiment and fundamentals have no point-in-time history, so they're held neutral here — this calibrates the price/S-R machinery only. """ from __future__ import annotations import asyncio import json import logging import math import multiprocessing import os import statistics from collections import defaultdict from collections.abc import Callable from concurrent.futures import ProcessPoolExecutor from datetime import date, datetime, timezone from types import SimpleNamespace from typing import Any from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.config import settings from app.models.ticker import Ticker from app.services import settings_store from app.services.admin_service import get_activation_config, update_setting from app.services.indicator_service import _extract_ohlcv, compute_atr from app.services.outcome_service import ( OUTCOME_AMBIGUOUS, OUTCOME_STOP_HIT, OUTCOME_TARGET_HIT, Bar, evaluate_setup_against_bars, ) from app.services.price_service import query_ohlcv from app.services.qualification import ( HIGH_CONVICTION_ACTIONS, _action_direction, best_target_probability, setup_qualifies, ) from app.services.recommendation_service import ( _choose_recommended_action, _classify_by_probability, _risk_level_from_conflicts, _select_primary_target, _zone_representative_levels, direction_analyzer, get_recommendation_config, probability_estimator, signal_conflict_detector, target_generator, ) from app.services.scoring_service import ( compute_momentum_from_closes, compute_technical_from_arrays, ) from app.services.sr_service import detect_sr_levels logger = logging.getLogger(__name__) KEY_REPORT = "backtest_report" STEP_DAYS = 5 # weekly cadence (≈ 5 trading days) MIN_LOOKBACK = 60 # bars needed before D for indicators (EMA cross needs 51) HORIZON = 30 # trading days to resolve an outcome (matches the evaluator) ATR_MULTIPLIER = 1.5 # Cross-sectional signal evaluation (factor IC). Each candidate signal is a # point-in-time number computed from closes alone (sentiment/fundamentals have no # history here), sampled one as-of per ISO week, and graded by how its rank # correlates with the forward HORIZON-day return ACROSS the universe — i.e. does # ranking stocks by this signal sort tomorrow's winners from losers. This is the # test the per-setup hit-rate report can't do: it measures predictive power of a # signal, not the outcome of a target/stop structure built on top of one. MIN_CROSS_SECTION = 20 # min tickers present in a week to score that week MIN_RELIABLE_PERIODS = 12 # min non-overlapping windows before a signal's IC is trusted PRODUCTION_PERCENTILE_KEY = "activation_momentum_percentile" RAW_PERCENTILE_KEY = "momentum_percentile" RESIDUAL_PERCENTILE_KEY = "residual_momentum_percentile" def _wrap_levels(level_dicts: list[dict]) -> list[Any]: return [ SimpleNamespace( id=i, price_level=float(d["price_level"]), type=d["type"], strength=int(d["strength"]), ) for i, d in enumerate(level_dicts) ] def _window_setups( window_records: list, config: dict, activation: dict, ) -> list[dict]: """Rebuild the setup(s) at the last bar of ``window_records`` (the as-of date), using only those bars. Returns one dict per tradeable direction.""" if len(window_records) < MIN_LOOKBACK: return [] _, highs, lows, closes, volumes = _extract_ohlcv(window_records) entry = closes[-1] if entry <= 0: return [] # 12-1 month momentum (skip the last month) — the universe ranks on this. # None until a year of history exists; such setups can't qualify on momentum. mom_12_1 = ( closes[-22] / closes[-253] - 1.0 if len(closes) >= 253 and closes[-253] > 0 else None ) try: atr = compute_atr(highs, lows, closes)["atr"] except Exception: return [] if atr <= 0: return [] sr_levels = _wrap_levels(detect_sr_levels(highs, lows, closes, volumes)) if not sr_levels: return [] technical = (compute_technical_from_arrays(highs, lows, closes, volumes)[0]) or 50.0 momentum = (compute_momentum_from_closes(closes)[0]) or 50.0 dim_scores = {"technical": technical, "momentum": momentum} conflicts = signal_conflict_detector.detect_conflicts(dim_scores, None, config) confidences = { "long": direction_analyzer.calculate_confidence("long", dim_scores, None, conflicts), "short": direction_analyzer.calculate_confidence("short", dim_scores, None, conflicts), } # First pass: build targets per direction per_dir: dict[str, dict] = {} for direction in ("long", "short"): stop = entry - atr * ATR_MULTIPLIER if direction == "long" else entry + atr * ATR_MULTIPLIER zone_levels = _zone_representative_levels(sr_levels, entry) targets = target_generator.generate_targets(direction, entry, stop, zone_levels, atr) if not targets: continue for t in targets: t["probability"] = probability_estimator.estimate_probability( t, dim_scores, None, direction, config ) t["classification"] = _classify_by_probability(t["probability"]) primary = _select_primary_target(targets) if primary is None: continue # Flag the primary so qualification's EV uses the primary target's # probability (matching production's enhance_trade_setup). for t in targets: t["is_primary"] = t is primary per_dir[direction] = {"stop": stop, "targets": targets, "primary": primary} available = set(per_dir.keys()) if not available: return [] action = _choose_recommended_action(confidences["long"], confidences["short"], config, available) out: list[dict] = [] for direction, data in per_dir.items(): targets, primary, stop = data["targets"], data["primary"], data["stop"] setup_conflicts = list(conflicts) if len(targets) < 3: setup_conflicts.append("target-availability: Fewer than 3 valid S/R targets available") risk_level = _risk_level_from_conflicts(setup_conflicts) rr = float(primary["rr_ratio"]) target_price = float(primary["price"]) setup_ns = SimpleNamespace( rr_ratio=rr, confidence_score=confidences[direction], recommended_action=action, risk_level=risk_level, targets=targets, direction=direction, target=target_price, stop_loss=stop, entry_price=entry, ) # meets_core = clears every gate EXCEPT the cross-sectional momentum # percentile, which can only be assigned once all tickers' setups for a # week are known. run_backtest ranks momentum and finalizes `qualified`. core_config = {**activation, "min_momentum_percentile": 0.0} meets_core = setup_qualifies(setup_ns, core_config) best_prob = best_target_probability(setup_ns) out.append({ "direction": direction, "entry": entry, "stop": stop, "target": target_price, "rr": rr, "confidence": confidences[direction], "primary_prob": float(primary["probability"]), "best_prob": best_prob, "momentum": mom_12_1, "meets_core": meets_core, "action": action, "risk_level": risk_level, }) return out def _stop_fill_r(direction: str, entry: float, stop: float, bar) -> float: """Realized R when the stop is hit on ``bar``: filled at the stop, or at the bar's open when price gapped through it — so a gap can lose more than −1R, matching real fills. Targets are never filled better than their level, so gap modeling only ever makes results more conservative.""" risk = abs(entry - stop) if risk <= 0 or entry <= 0: return -1.0 if direction == "long": fill = min(stop, bar.open) return (fill - entry) / risk fill = max(stop, bar.open) return (entry - fill) / risk def _risk_and_stop_day( direction: str, entry: float, stop: float, forward: list, horizon: int ) -> tuple[float, int | None]: """``(risk_pct, stop_day)`` from the bars after detection: the 1R stop distance as a fraction of entry, and the 1-based trading day the initial stop was first pierced within the horizon (None if never). Feeds the cost conversion and the time-exit hold accounting.""" long = direction == "long" risk_pct = abs(entry - stop) / entry if entry else 0.0 for i, r in enumerate(forward[:horizon]): if (r.low <= stop) if long else (r.high >= stop): return risk_pct, i + 1 return risk_pct, None def _time_exits( direction: str, entry: float, stop: float, forward: list, horizons ) -> dict[int, float]: """Realized R per hold-N-days exit, in one pass over the post-entry bars. The initial stop stays active (fill at the stop level → −1R); otherwise the trade exits at the day-N close (the last available close when history ends early). No target, no trailing — the classic momentum implementation: buy, hold ~N days, re-rank. Conservative bar logic: a bar that pierces the stop is a loss before that bar's close counts. """ long = direction == "long" risk = abs(entry - stop) / entry if entry else 0.0 if risk <= 0: return {int(n): 0.0 for n in horizons} bars = forward[: max(int(n) for n in horizons)] if not bars: return {int(n): 0.0 for n in horizons} stop_day: int | None = None # 1-based trading day the stop was pierced stop_r = -1.0 closes: list[float] = [] for i, r in enumerate(bars): if (r.low <= stop) if long else (r.high >= stop): stop_day = i + 1 stop_r = _stop_fill_r(direction, entry, stop, r) break closes.append(r.close) result: dict[int, float] = {} for h in horizons: n = int(h) if stop_day is not None and stop_day <= n: result[n] = stop_r else: # closes can't be empty here: an empty closes means the stop hit on # day 1, which the branch above catches for every n >= 1. c = closes[min(n, len(closes)) - 1] move = (c - entry) / entry if long else (entry - c) / entry result[n] = move / risk return result def _replay_ticker( symbol: str, records: list, config: dict, activation: dict, benchmark_closes: dict[date, float] | None = None, ) -> list[dict]: """Walk one ticker's history weekly, building setups and their realized outcomes.""" candidates: list[dict] = [] n = len(records) if n < MIN_LOOKBACK + HORIZON: return candidates for i in range(MIN_LOOKBACK - 1, n - HORIZON, STEP_DAYS): window = records[: i + 1] forward = records[i + 1 :] forward_bars = [Bar(date=r.date, high=r.high, low=r.low) for r in forward] closes = [float(r.close) for r in window] dates = [r.date for r in window] residual_momentum = _residual_momentum_12_1( dates, closes, len(window) - 1, benchmark_closes ) for s in _window_setups(window, config, activation): outcome, outcome_date = evaluate_setup_against_bars( s["direction"], s["stop"], s["target"], forward_bars, HORIZON ) if outcome is None: continue # Trading days from detection to resolution (expired = full horizon). hold_days = next( (idx + 1 for idx, r in enumerate(forward[:HORIZON]) if r.date == outcome_date), min(HORIZON, len(forward)), ) target_hit = outcome == OUTCOME_TARGET_HIT if outcome == OUTCOME_TARGET_HIT: realized_r = s["rr"] elif outcome in (OUTCOME_STOP_HIT, OUTCOME_AMBIGUOUS): # Fill at the stop, or at the open when the bar gapped through it. realized_r = _stop_fill_r( s["direction"], s["entry"], s["stop"], forward[hold_days - 1] ) else: # expired realized_r = 0.0 risk_pct, stop_day = _risk_and_stop_day( s["direction"], s["entry"], s["stop"], forward, HORIZON ) time_r = _time_exits( s["direction"], s["entry"], s["stop"], forward, TIME_EXIT_DAYS ) iso = records[i].date.isocalendar() candidates.append({ "symbol": symbol, "date": records[i].date.isoformat(), "iso_week": (iso[0], iso[1]), "direction": s["direction"], "entry": s["entry"], "stop": s["stop"], "target": s["target"], "rr": s["rr"], "confidence": s["confidence"], "primary_prob": s["primary_prob"], "best_prob": s["best_prob"], "momentum": s["momentum"], "residual_momentum": residual_momentum, "meets_core": s["meets_core"], # Gate fields the ablation recomputes floors from — without them # every candidate looks NEUTRAL and the ablation rows collapse. "action": s["action"], "risk_level": s["risk_level"], "outcome": outcome, "target_hit": target_hit, "realized_r": realized_r, "hold_days": hold_days, "stop_day": stop_day, "risk_pct": risk_pct, "time_r": time_r, }) return candidates def _bucket_stats(cands: list[dict]) -> dict: wins = sum(1 for c in cands if c["target_hit"]) losses = sum(1 for c in cands if c["outcome"] in (OUTCOME_STOP_HIT, OUTCOME_AMBIGUOUS)) expired = sum(1 for c in cands if c["outcome"] not in (OUTCOME_TARGET_HIT, OUTCOME_STOP_HIT, OUTCOME_AMBIGUOUS)) decided = wins + losses rs = [c["realized_r"] for c in cands] net_rs = [c["realized_r"] - _cost_r(c) for c in cands] holds = [c["hold_days"] for c in cands if c.get("hold_days")] avg_hold = sum(holds) / len(holds) if holds else None net_avg = sum(net_rs) / len(net_rs) if net_rs else None return { "total": len(cands), "wins": wins, "losses": losses, "expired": expired, "hit_rate": round(wins / decided * 100, 1) if decided else None, "avg_r": round(sum(rs) / len(rs), 3) if rs else None, "total_r": round(sum(rs), 2) if rs else None, "net_avg_r": round(net_avg, 3) if net_avg is not None else None, "net_total_r": round(sum(net_rs), 2) if net_rs else None, "best_r": round(max(rs), 2) if rs else None, "worst_r": round(min(rs), 2) if rs else None, "avg_hold_days": round(avg_hold, 1) if avg_hold is not None else None, # Capital efficiency: net expectancy per trading day the capital is tied up. "net_r_per_day": ( round(net_avg / avg_hold, 4) if net_avg is not None and avg_hold else None ), **_robustness_stats(net_rs), } def _robustness_stats(net_rs: list[float]) -> dict: """Distribution-shape stats: the median (typical) trade, gross wins vs losses, and the expectancy with the top 5% of winners removed — the direct test of whether the edge depends on a handful of outliers.""" if not net_rs: return {"median_net_r": None, "profit_factor": None, "net_avg_r_ex_top5": None} gains = sum(r for r in net_rs if r > 0) losses_abs = -sum(r for r in net_rs if r < 0) trimmed = sorted(net_rs, reverse=True)[math.ceil(len(net_rs) * 0.05):] return { "median_net_r": round(statistics.median(net_rs), 3), "profit_factor": round(gains / losses_abs, 2) if losses_abs > 0 else None, "net_avg_r_ex_top5": ( round(sum(trimmed) / len(trimmed), 3) if trimmed else None ), } # The fixed take-profit and trailing-stop sweeps were retired 2026-07: swept # TPs never found an interior optimum (momentum's edge lives in the right tail) # and wide trails converged to the hold-to-horizon exit, so the time-exit sweep # is the exit-decision surface. # Hold-N-days exits (initial stop stays active, exit at the day-N close) — the # classic cross-sectional momentum implementation: buy, hold ~a month, re-rank. TIME_EXIT_DAYS = (5, 10, 21, 30) # Assumed transaction cost per side as a fraction of notional (commission + # slippage). Aggregates report gross and net side by side; net subtracts a full # round trip, converted into R via the setup's stop distance (the 1R unit). COST_PER_SIDE = 0.001 def _cost_r(cand: dict) -> float: """Round-trip transaction cost in R units: two sides over the 1R stop distance. 0 when the candidate carries no usable risk_pct.""" risk = cand.get("risk_pct") or 0.0 return (2.0 * COST_PER_SIDE) / risk if risk > 0 else 0.0 def _time_exit_bucket(cands: list[dict], hold_days: int) -> dict: """Stats for the hold-``hold_days`` exit: initial stop active, otherwise out at the day-N close. Each candidate carries its realized R per hold length in ``time_r``; a "win" is an exit in profit (R > 0). The realized hold is the full N days unless the stop cut it short (``stop_day``).""" rows = [ ( c["time_r"][hold_days], _cost_r(c), min(hold_days, c.get("stop_day") or hold_days), ) for c in cands if c.get("time_r", {}).get(hold_days) is not None ] total = len(rows) rs = [r for r, _, _ in rows] net_rs = [r - cost for r, cost, _ in rows] holds = [h for _, _, h in rows] wins = sum(1 for r in rs if r > 0) avg_hold = sum(holds) / total if total else None net_avg = sum(net_rs) / total if total else None return { "hold_days": hold_days, "total": total, "wins": wins, "win_rate": round(wins / total * 100, 1) if total else None, "avg_r": round(sum(rs) / total, 3) if total else None, "total_r": round(sum(rs), 2) if total else None, "net_avg_r": round(net_avg, 3) if net_avg is not None else None, "net_total_r": round(sum(net_rs), 2) if total else None, "best_r": round(max(rs), 2) if rs else None, "worst_r": round(min(rs), 2) if rs else None, "avg_hold_days": round(avg_hold, 1) if avg_hold is not None else None, "net_r_per_day": ( round(net_avg / avg_hold, 4) if net_avg is not None and avg_hold else None ), **_robustness_stats(net_rs), } # --------------------------------------------------------------------------- # Cross-sectional signal evaluation (factor information-coefficient) # --------------------------------------------------------------------------- def _weekly_asof_indices(records: list) -> list[int]: """Index of the last bar in each ISO week — the weekly rebalance as-of bars. Keying on the calendar week (not the raw bar index) makes every ticker's as-of dates line up, so the cross-section on a given week is comparable. """ last_by_week: dict[tuple[int, int], int] = {} for idx, r in enumerate(records): iso = r.date.isocalendar() last_by_week[(iso[0], iso[1])] = idx return sorted(last_by_week.values()) def _residual_momentum_12_1( dates: list[date], closes: list[float], i: int, benchmark_closes: dict[date, float] | None, ) -> float | None: """12-1 momentum after removing the stock's linear benchmark exposure. This is a practical beta-adjusted residual momentum approximation: estimate beta from daily stock/benchmark returns over the same formation window (12 months ending 1 month ago), then rank on cumulative stock return minus beta * benchmark return. We deliberately do not subtract a fitted intercept: with an intercept estimated over the same window, the arithmetic residuals sum to ~zero by construction, which would destroy the signal. """ if not benchmark_closes or i - 252 < 0: return None stock_rets: list[float] = [] market_rets: list[float] = [] # Same daily intervals as mom_12_1: close[i-252] -> close[i-21]. for k in range(i - 251, i - 20): prev_close = closes[k - 1] bench_prev = benchmark_closes.get(dates[k - 1]) bench_cur = benchmark_closes.get(dates[k]) if prev_close <= 0 or bench_prev is None or bench_cur is None or bench_prev <= 0: continue stock_rets.append(closes[k] / prev_close - 1.0) market_rets.append(bench_cur / bench_prev - 1.0) if len(stock_rets) < 100: return None mean_market = sum(market_rets) / len(market_rets) mean_stock = sum(stock_rets) / len(stock_rets) var_market = sum((x - mean_market) ** 2 for x in market_rets) if var_market <= 0: return None cov = sum((stock_rets[k] - mean_stock) * (market_rets[k] - mean_market) for k in range(len(stock_rets))) beta = cov / var_market return sum(stock_rets[k] - beta * market_rets[k] for k in range(len(stock_rets))) def _signal_values( dates: list[date], closes: list[float], highs: list[float], i: int, benchmark_closes: dict[date, float] | None = None, ) -> dict[str, float]: """Point-in-time candidate signals at as-of index ``i`` (price-only). Momentum factors follow the standard "skip the last month" convention (return up to ~1 month ago) to avoid the short-term reversal effect, which ``reversal_1m`` isolates on purpose — we expect its IC to be negative if the universe mean-reverts. ``trend_200`` is price vs its 200-bar SMA. ``high_52w`` is closeness to the trailing 52-week high (George/Hwang anchoring effect: higher = nearer the high, expect positive IC). ``vol_6m`` is 126-day realized volatility (expect negative IC if the low-volatility anomaly holds). """ out: dict[str, float] = {} if i - 252 >= 0 and closes[i - 252] > 0: out["mom_12_1"] = closes[i - 21] / closes[i - 252] - 1.0 residual = _residual_momentum_12_1(dates, closes, i, benchmark_closes) if residual is not None: out["mom_12_1_resid"] = residual if i - 126 >= 0 and closes[i - 126] > 0: out["mom_6_1"] = closes[i - 21] / closes[i - 126] - 1.0 if i - 63 >= 0 and closes[i - 63] > 0: out["mom_3_1"] = closes[i - 21] / closes[i - 63] - 1.0 if i - 21 >= 0 and closes[i - 21] > 0: out["reversal_1m"] = closes[i] / closes[i - 21] - 1.0 if i - 199 >= 0: sma = sum(closes[i - 199 : i + 1]) / 200.0 if sma > 0: out["trend_200"] = closes[i] / sma - 1.0 if i - 251 >= 0: high_52w = max(highs[i - 251 : i + 1]) if high_52w > 0: out["high_52w"] = closes[i] / high_52w if i - 126 >= 0: rets = [ closes[k] / closes[k - 1] - 1.0 for k in range(i - 125, i + 1) if closes[k - 1] > 0 ] if len(rets) >= 2: mean = sum(rets) / len(rets) var = sum((x - mean) ** 2 for x in rets) / (len(rets) - 1) out["vol_6m"] = math.sqrt(var) return out def _accumulate_signal_series( records: list, collected: dict, benchmark_closes: dict[date, float] | None = None, ) -> None: """For each weekly as-of bar, emit (signal, forward-return) pairs keyed by ISO week into ``collected[name][week_key]``. Forward return is close-to-close over HORIZON trading days. Mutates ``collected`` (a dict of dict of list).""" n = len(records) if n < HORIZON + 21: return closes = [float(r.close) for r in records] highs = [float(r.high) for r in records] dates = [r.date for r in records] for i in _weekly_asof_indices(records): j = i + HORIZON if j >= n or closes[i] <= 0: continue fwd = closes[j] / closes[i] - 1.0 iso = records[i].date.isocalendar() week_key = (iso[0], iso[1]) for name, val in _signal_values(dates, closes, highs, i, benchmark_closes).items(): collected[name][week_key].append((val, fwd)) def _rank(xs: list[float]) -> list[float]: """Average (tie-corrected) ranks, 1-based.""" order = sorted(range(len(xs)), key=lambda k: xs[k]) ranks = [0.0] * len(xs) i = 0 while i < len(xs): j = i while j + 1 < len(xs) and xs[order[j + 1]] == xs[order[i]]: j += 1 avg_rank = (i + j) / 2.0 + 1.0 for k in range(i, j + 1): ranks[order[k]] = avg_rank i = j + 1 return ranks def _pearson(a: list[float], b: list[float]) -> float | None: n = len(a) if n < 3: return None ma, mb = sum(a) / n, sum(b) / n va = sum((x - ma) ** 2 for x in a) vb = sum((y - mb) ** 2 for y in b) if va <= 0 or vb <= 0: return None cov = sum((a[k] - ma) * (b[k] - mb) for k in range(n)) return cov / math.sqrt(va * vb) def _spearman(xs: list[float], ys: list[float]) -> float | None: """Rank correlation = Pearson on the ranks. None if too few/degenerate.""" if len(xs) < 3: return None return _pearson(_rank(xs), _rank(ys)) def _quintile_spread(pairs: list[tuple[float, float]]) -> float | None: """Mean forward return of the top signal-quintile minus the bottom quintile.""" n = len(pairs) if n < 10: return None ordered = sorted(pairs, key=lambda p: p[0]) k = n // 5 top = ordered[-k:] bottom = ordered[:k] return sum(p[1] for p in top) / k - sum(p[1] for p in bottom) / k def _week_ordinal(week_key: tuple[int, int]) -> int: """Monotonic absolute week number from an (ISO year, ISO week) key.""" year, week = week_key return year * 53 + week def _nonoverlapping_weeks( week_keys: list[tuple[int, int]], stride: int ) -> list[tuple[int, int]]: """Thin to weeks at least ``stride`` apart so their forward windows don't overlap — greedy earliest-first. Removes the autocorrelation that would otherwise inflate the IC t-stat across adjacent weekly rebalances.""" kept: list[tuple[int, int]] = [] last: int | None = None for wk in sorted(week_keys, key=_week_ordinal): o = _week_ordinal(wk) if last is None or o - last >= stride: kept.append(wk) last = o return kept def _signal_evaluation(collected: dict) -> list[dict]: """Per-signal factor diagnostics, one row per candidate signal: mean_ic average rank-IC (Spearman of signal vs fwd ret) ic_t_stat mean_ic / stderr — is the IC reliably non-zero? ic_positive_pct share of windows the IC is positive (consistency) mean_quintile_spread avg top-minus-bottom-quintile forward return reliable True once there are >= MIN_RELIABLE_PERIODS windows IC is measured on NON-OVERLAPPING forward windows (weeks thinned to ~HORIZON apart) so the t-stat isn't inflated by autocorrelation. A signal with no edge lands near IC 0 / spread 0; one with too few independent windows is flagged unreliable rather than trusted on a lucky handful. """ stride = max(1, round(HORIZON / 5)) # ISO weeks spanned by the forward window rows: list[dict] = [] for name in sorted(collected): weeks_map = collected[name] usable = [wk for wk, recs in weeks_map.items() if len(recs) >= MIN_CROSS_SECTION] kept = _nonoverlapping_weeks(usable, stride) ics: list[float] = [] spreads: list[float] = [] sizes: list[int] = [] for wk in kept: recs = weeks_map[wk] ic = _spearman([r[0] for r in recs], [r[1] for r in recs]) if ic is not None: ics.append(ic) spread = _quintile_spread(recs) if spread is not None: spreads.append(spread) sizes.append(len(recs)) if not ics: continue mean_ic = sum(ics) / len(ics) if len(ics) > 1: std = math.sqrt(sum((x - mean_ic) ** 2 for x in ics) / (len(ics) - 1)) else: std = 0.0 t_stat = mean_ic / std * math.sqrt(len(ics)) if std > 0 else None rows.append({ "signal": name, "weeks": len(ics), "avg_cross_section": round(sum(sizes) / len(sizes), 1) if sizes else None, "mean_ic": round(mean_ic, 4), "ic_t_stat": round(t_stat, 2) if t_stat is not None else None, "ic_positive_pct": round(sum(1 for x in ics if x > 0) / len(ics) * 100, 1), "mean_quintile_spread": round(sum(spreads) / len(spreads), 4) if spreads else None, "reliable": len(ics) >= MIN_RELIABLE_PERIODS, }) rows.sort(key=lambda r: r["mean_ic"], reverse=True) return rows def _signal_series(records: list, benchmark_closes: dict[date, float] | None = None) -> dict: """Per-ticker signal/forward-return series as a PLAIN (picklable) nested dict — no defaultdict/lambda — so it can cross a process boundary.""" tmp: dict = defaultdict(lambda: defaultdict(list)) _accumulate_signal_series(records, tmp, benchmark_closes) return {name: dict(weeks) for name, weeks in tmp.items()} def _replay_and_signals( symbol: str, columns: tuple, config: dict, activation: dict, benchmark_closes: dict[date, float] | None = None, ) -> tuple[list[dict], dict]: """The CPU-bound per-ticker work, as a top-level (picklable) function so it can run in a worker process. Takes primitive column arrays (cheap to pickle), rebuilds bar objects, and returns (candidates, signal_series).""" date_ords, opens, highs, lows, closes, volumes = columns bars = [ SimpleNamespace( date=date.fromordinal(o), open=op, high=hi, low=lo, close=cl, volume=vo ) for o, op, hi, lo, cl, vo in zip(date_ords, opens, highs, lows, closes, volumes) ] return ( _replay_ticker(symbol, bars, config, activation, benchmark_closes), _signal_series(bars, benchmark_closes), ) def _backtest_worker_count() -> int: """How many worker processes to replay tickers across. Capped to cpu_count-1 so a core stays free for the web server; 1 means sequential.""" configured = int(getattr(settings, "backtest_workers", 4)) if configured <= 1: return 1 cpu = os.cpu_count() or 1 return max(1, min(configured, cpu - 1)) def _mp_context(): """A start method safe to use from the threaded asyncio server: ``forkserver`` (workers forked from a clean, single-threaded server — avoids the fork-with-threads deadlock) when available, else ``fork``. Returns None on spawn-only platforms (Windows), where the caller falls back to a thread.""" methods = multiprocessing.get_all_start_methods() for method in ("forkserver", "fork"): if method in methods: return multiprocessing.get_context(method) return None async def _fetch_columns(db: AsyncSession, symbol: str) -> tuple | None: """Read one ticker's OHLCV and detach it to primitive column arrays in the event loop (safe ORM access), ready to hand to a worker. None if no data.""" records = await query_ohlcv(db, symbol) if not records: return None return ( [r.date.toordinal() for r in records], [float(r.open) for r in records], [float(r.high) for r in records], [float(r.low) for r in records], [float(r.close) for r in records], [int(r.volume) for r in records], ) def _assign_signal_percentiles( candidates: list[dict], value_key: str, percentile_key: str, ) -> None: """Per ISO week, rank candidates by ``value_key`` and attach a 0-100 percentile under ``percentile_key`` (100 = strongest). Missing values get None and therefore cannot clear a gate based on that signal.""" by_week: dict = defaultdict(list) for c in candidates: if c.get(value_key) is not None: by_week[c["iso_week"]].append(c) for group in by_week.values(): ordered = sorted(group, key=lambda c: c[value_key]) n = len(ordered) for rank, c in enumerate(ordered): c[percentile_key] = (rank / (n - 1) * 100.0) if n > 1 else 100.0 for c in candidates: c.setdefault(percentile_key, None) def _assign_momentum_percentiles(candidates: list[dict]) -> None: """Per ISO week, rank candidates by their ticker's 12-1 momentum and attach a 0-100 ``momentum_percentile`` (100 = highest momentum in the universe that week). Candidates whose momentum is unknown (insufficient lookback) get None and therefore can't clear a momentum gate. Mutates ``candidates``.""" _assign_signal_percentiles(candidates, "momentum", "momentum_percentile") def _assign_residual_momentum_percentiles(candidates: list[dict]) -> None: """Residual-momentum percentile promoted to production activation ranking.""" _assign_signal_percentiles( candidates, "residual_momentum", RESIDUAL_PERCENTILE_KEY ) def _assign_activation_momentum_percentiles(candidates: list[dict]) -> None: """Production activation rank: residual 12-1 when available, raw fallback. The raw fallback mirrors the live scanner's behavior when benchmark history is unavailable. In normal backtests, SPY is loaded and this is residual. """ for c in candidates: c[PRODUCTION_PERCENTILE_KEY] = ( c.get(RESIDUAL_PERCENTILE_KEY) if c.get(RESIDUAL_PERCENTILE_KEY) is not None else c.get(RAW_PERCENTILE_KEY) ) def _momentum_qualifies(cand: dict, threshold: float) -> bool: """Whether a candidate clears the floors (meets_core) and the momentum gate. Threshold 0 disables the momentum gate (floors only). The gate is long-only: while it's active, shorts (fighting the trend) never qualify.""" if not cand["meets_core"]: return False if threshold <= 0: return True if cand["direction"] == "short": return False mp = cand.get(PRODUCTION_PERCENTILE_KEY) return mp is not None and mp >= threshold def _gate_ablation(candidates: list[dict], activation: dict, threshold: float) -> list[dict]: """Which floors earn their keep: re-qualify the same candidates at the current momentum cutoff with one floor removed per row (long-only throughout, matching the live gate). ``all_floors`` uses the stored ``meets_core`` so it reproduces the qualified set exactly; the ablation rows recompute the remaining floors from stored candidate fields with the same comparisons as ``qualification.setup_qualifies``. Optional tighteners (high-conviction / conflict exclusion), when enabled, stay applied in every ablation row so only the named floor varies. """ min_rr = float(activation.get("min_rr", 0.0)) min_conf = float(activation.get("min_confidence", 0.0)) exclude_neutral = bool(activation.get("exclude_neutral", False)) require_high = bool(activation.get("require_high_conviction", False)) exclude_conflicts = bool(activation.get("exclude_conflicts", False)) def momentum_ok(c: dict) -> bool: # Mirrors the momentum part of _momentum_qualifies: long-only while the # gate is active; threshold 0 disables it (shorts pass too). if threshold <= 0: return True if c["direction"] == "short": return False mp = c.get(PRODUCTION_PERCENTILE_KEY) return mp is not None and mp >= threshold def rr_ok(c: dict) -> bool: return c["rr"] >= min_rr def conf_ok(c: dict) -> bool: return (c["confidence"] or 0.0) >= min_conf def neutral_ok(c: dict) -> bool: if not exclude_neutral: return True action_direction = _action_direction(c.get("action")) return action_direction != "neutral" and action_direction == c["direction"] def tighteners_ok(c: dict) -> bool: if require_high and (c.get("action") or "") not in HIGH_CONVICTION_ACTIONS: return False if exclude_conflicts and (c.get("risk_level") or "") != "Low": return False return True def core_ok(c: dict) -> bool: return bool(c["meets_core"]) variants: list[tuple[str, list]] = [ ("all_floors", [core_ok]), ("no_confidence_floor", [rr_ok, neutral_ok, tighteners_ok]), ("no_rr_floor", [conf_ok, neutral_ok, tighteners_ok]), ("no_neutral_exclusion", [rr_ok, conf_ok, tighteners_ok]), ("momentum_only", []), ] # Grade each variant under BOTH exit models: the target/stop outcome # (_bucket_stats) and the hold-to-horizon time exit. A floor that pays under # the target model may be meaningless once the exit is a fixed hold — the # hold_* columns are what a time-exit gate decision should read. hold_days = max(TIME_EXIT_DAYS) rows: list[dict] = [] for name, checks in variants: matching = [ c for c in candidates if momentum_ok(c) and all(check(c) for check in checks) ] hold = _time_exit_bucket(matching, hold_days) rows.append({ "variant": name, **_bucket_stats(matching), "hold_days": hold_days, "hold_avg_r": hold["avg_r"], "hold_net_avg_r": hold["net_avg_r"], "hold_total_r": hold["total_r"], }) return rows # --------------------------------------------------------------------------- # Portfolio simulation # --------------------------------------------------------------------------- # Book parameters: fixed starting capital, a capped number of concurrent # positions (one per ticker), fixed-fractional risk sizing with a no-leverage # notional cap, and the same per-side cost as the per-trade tables. Entries are # the QUALIFIED setups at their detection close, best momentum first while # slots and cash allow. SIM_STARTING_CAPITAL = 10_000.0 SIM_MAX_POSITIONS = 10 SIM_RISK_PER_TRADE = 0.01 # fraction of equity risked per position (entry→stop) SIM_NOTIONAL_CAP = 0.20 # max fraction of equity per position (no margin) def _simulate_portfolio( candidates: list[dict], prices: dict[str, tuple], spy_closes: dict | None, exit_policy: str, hold_days: int, *, qualified_fn: Callable[[dict], bool] | None = None, ranking_key: str = PRODUCTION_PERCENTILE_KEY, max_positions: int = SIM_MAX_POSITIONS, risk_per_trade: float = SIM_RISK_PER_TRADE, ) -> dict | None: """Replay the qualified setups as ONE capital-constrained book and report portfolio economics from the daily equity curve (return, CAGR, drawdown, Sharpe) — the numbers the per-setup tables cannot give, because they grade every setup as if capital were infinite. ``exit_policy``: "target" races the S/R target against the stop with a timeout at ``hold_days``; "hold" keeps only the initial stop and exits at the ``hold_days``-th close. Stops fill at the worse of stop or open (gaps modeled); positions still open at the end are closed at their last mark. Returns None when there is nothing to trade. """ if qualified_fn is None: def _default_qualified(c: dict) -> bool: return bool(c.get("qualified")) qualified_fn = _default_qualified entries_by_ord: dict[int, list[dict]] = defaultdict(list) for c in candidates: if not qualified_fn(c) or c.get("direction") != "long": continue if not c.get("entry") or not c.get("stop"): continue entries_by_ord[date.fromisoformat(c["date"]).toordinal()].append(c) if not entries_by_ord: return None # Per-symbol bar lookup: date ordinal -> index into the column arrays. index_of: dict[str, dict[int, int]] = { sym: {o: i for i, o in enumerate(cols[0])} for sym, cols in prices.items() } first_ord = min(entries_by_ord) calendar = sorted({o for cols in prices.values() for o in cols[0] if o >= first_ord}) if not calendar: return None cash = SIM_STARTING_CAPITAL positions: dict[str, dict] = {} curve: list[tuple[int, float]] = [] trades: list[dict] = [] skipped_full = 0 def _bar(sym: str, o: int): idx = index_of.get(sym, {}).get(o) if idx is None: return None cols = prices[sym] return SimpleNamespace( open=cols[1][idx], high=cols[2][idx], low=cols[3][idx], close=cols[4][idx] ) def _close_trade(sym: str, fill: float, reason: str) -> None: nonlocal cash pos = positions.pop(sym) proceeds = pos["shares"] * fill cost = proceeds * COST_PER_SIDE cash += proceeds - cost risk = pos["entry"] - pos["stop"] trades.append({ "pnl": proceeds - pos["shares"] * pos["entry"] - cost - pos["entry_cost"], "r": (fill - pos["entry"]) / risk if risk > 0 else 0.0, "hold": pos["bars_held"], "reason": reason, }) def _marked_equity() -> float: return cash + sum(p["shares"] * p["last_close"] for p in positions.values()) for o in calendar: # 1) exits on today's bars (stop intraday, target intraday, time at close) for sym in list(positions): pos = positions[sym] bar = _bar(sym, o) if bar is None: continue pos["bars_held"] += 1 pos["last_close"] = bar.close if bar.low <= pos["stop"]: # Same-bar stop+target resolves as the loss (conservative, like # the evaluator); gap through the stop fills at the open. _close_trade(sym, min(pos["stop"], bar.open), "stop") continue if exit_policy == "target" and pos["target"] and bar.high >= pos["target"]: _close_trade(sym, pos["target"], "target") continue if pos["bars_held"] >= hold_days: _close_trade(sym, bar.close, "time") # 2) entries at today's close, best momentum first equity = _marked_equity() todays = sorted( entries_by_ord.get(o, ()), key=lambda c: c.get(ranking_key) or 0.0, reverse=True, ) for c in todays: sym = c["symbol"] if sym in positions: continue if len(positions) >= max_positions: skipped_full += 1 continue entry, stop = float(c["entry"]), float(c["stop"]) risk_ps = entry - stop if risk_ps <= 0 or entry <= 0: continue shares = min( (equity * risk_per_trade) / risk_ps, (equity * SIM_NOTIONAL_CAP) / entry, max(cash, 0.0) / (entry * (1.0 + COST_PER_SIDE)), ) if shares * entry < 1.0: # can't fund a meaningful position continue entry_cost = shares * entry * COST_PER_SIDE cash -= shares * entry + entry_cost positions[sym] = { "shares": shares, "entry": entry, "stop": stop, "target": float(c["target"]) if c.get("target") else None, "entry_cost": entry_cost, "bars_held": 0, "last_close": entry, } equity = _marked_equity() curve.append((o, _marked_equity())) # Close whatever is still open at its last mark so final equity is realized. for sym in list(positions): _close_trade(sym, positions[sym]["last_close"], "open_at_end") final_equity = cash curve[-1] = (calendar[-1], final_equity) total_return_pct = (final_equity / SIM_STARTING_CAPITAL - 1.0) * 100.0 years = (calendar[-1] - calendar[0]) / 365.25 cagr_pct = ( ((final_equity / SIM_STARTING_CAPITAL) ** (1.0 / years) - 1.0) * 100.0 if years > 0.25 and final_equity > 0 else None ) peak = float("-inf") max_dd = 0.0 for _, eq in curve: peak = max(peak, eq) if peak > 0: max_dd = max(max_dd, (peak - eq) / peak) rets = [b / a - 1.0 for (_, a), (_, b) in zip(curve, curve[1:]) if a > 0] sharpe = None if len(rets) > 2: mean = sum(rets) / len(rets) var = sum((x - mean) ** 2 for x in rets) / (len(rets) - 1) if var > 0: sharpe = mean / math.sqrt(var) * math.sqrt(252) # Per-calendar-year returns off the equity curve — shows whether every year # contributed or one exceptional stretch carried the result. yearly: list[dict] = [] year_start_eq = curve[0][1] cur_year = date.fromordinal(curve[0][0]).year last_eq = curve[0][1] for o, eq in curve: y = date.fromordinal(o).year if y != cur_year: yearly.append({ "year": cur_year, "return_pct": ( round((last_eq / year_start_eq - 1) * 100, 1) if year_start_eq > 0 else None ), }) cur_year = y year_start_eq = last_eq last_eq = eq yearly.append({ "year": cur_year, "return_pct": ( round((last_eq / year_start_eq - 1) * 100, 1) if year_start_eq > 0 else None ), }) pnls = [t["pnl"] for t in trades] wins = sum(1 for p in pnls if p > 0) spy_pct = None if spy_closes: from app.services.benchmark_service import benchmark_return_pct spy_pct = benchmark_return_pct( spy_closes, date.fromordinal(calendar[0]), date.fromordinal(calendar[-1]) ) return { "starting_capital": SIM_STARTING_CAPITAL, "final_equity": round(final_equity, 2), "total_return_pct": round(total_return_pct, 1), "cagr_pct": round(cagr_pct, 1) if cagr_pct is not None else None, "max_drawdown_pct": round(max_dd * 100.0, 1), "sharpe": round(sharpe, 2) if sharpe is not None else None, "trades": len(trades), "win_rate": round(wins / len(trades) * 100.0, 1) if trades else None, "avg_trade_pnl": round(sum(pnls) / len(pnls), 2) if pnls else None, "best_trade_r": round(max(t["r"] for t in trades), 2) if trades else None, "worst_trade_r": round(min(t["r"] for t in trades), 2) if trades else None, "best_trade_pnl": round(max(pnls), 2) if pnls else None, "worst_trade_pnl": round(min(pnls), 2) if pnls else None, "avg_hold_days": ( round(sum(t["hold"] for t in trades) / len(trades), 1) if trades else None ), "skipped_book_full": skipped_full, "spy_return_pct": round(spy_pct, 1) if spy_pct is not None else None, "yearly_returns": yearly, "start_date": date.fromordinal(calendar[0]).isoformat(), "end_date": date.fromordinal(calendar[-1]).isoformat(), } STRATEGY_VARIANTS: tuple[dict, ...] = ( { "variant": "production_residual_80_fixed10", "label": "Production residual 80 / max 10", "percentile_key": PRODUCTION_PERCENTILE_KEY, "cutoff": 80.0, "max_positions": 10, "risk_per_trade": 0.01, "risk_scale": None, }, { "variant": "legacy_raw_80_fixed10", "label": "Legacy raw 80 / max 10", "percentile_key": RAW_PERCENTILE_KEY, "cutoff": 80.0, "max_positions": 10, "risk_per_trade": 0.01, "risk_scale": None, }, { "variant": "raw_90_fixed10", "label": "Raw 90 / max 10", "percentile_key": RAW_PERCENTILE_KEY, "cutoff": 90.0, "max_positions": 10, "risk_per_trade": 0.01, "risk_scale": None, }, { "variant": "residual_80_fixed15", "label": "Residual 80 / max 15 capacity check", "percentile_key": PRODUCTION_PERCENTILE_KEY, "cutoff": 80.0, "max_positions": 15, "risk_per_trade": 0.01, "risk_scale": None, }, ) def _qualifies_by_percentile(cand: dict, percentile_key: str, threshold: float) -> bool: """Variant qualification: production floors + long-only signal percentile. This does not mutate or replace the production ``qualified`` field.""" if not cand.get("meets_core"): return False if threshold <= 0: return True if cand.get("direction") == "short": return False pct = cand.get(percentile_key) return pct is not None and pct >= threshold def _strategy_variant_sims( candidates: list[dict], prices: dict[str, tuple], _spy_closes: dict[date, float] | None, hold_days: int, ) -> list[dict]: """Research-only portfolio variants for comparing rank signals, cutoff and book capacity. Live qualification is untouched.""" rows: list[dict] = [] for cfg in STRATEGY_VARIANTS: percentile_key = str(cfg["percentile_key"]) cutoff = float(cfg["cutoff"]) sim = _simulate_portfolio( candidates, prices, _spy_closes, "hold", hold_days, qualified_fn=lambda c, pk=percentile_key, th=cutoff: _qualifies_by_percentile(c, pk, th), ranking_key=percentile_key, max_positions=int(cfg["max_positions"]), risk_per_trade=float(cfg["risk_per_trade"]), ) if sim is None: continue rows.append({ "variant": cfg["variant"], "label": cfg["label"], "ranking": "raw" if percentile_key == RAW_PERCENTILE_KEY else "residual", "cutoff": cutoff, "max_positions": int(cfg["max_positions"]), "risk_per_trade_pct": round(float(cfg["risk_per_trade"]) * 100, 2), "risk_scale": cfg["risk_scale"], **sim, }) return rows def _pct_loss(base: float | None, candidate: float | None) -> float | None: if base is None or candidate is None or base <= 0: return None return (base - candidate) / base def _build_research_recommendation(report: dict) -> dict: """Advisory rules for the remaining research variants after residual promotion.""" variants = { v.get("variant"): v for v in (report.get("strategy_variants") or {}).get("variants", []) } base = variants.get("production_residual_80_fixed10") items: list[dict] = [] if base is None: return { "items": [], "note": "Strategy variants unavailable; re-run the backtest after benchmark data is present.", } base_sharpe = base.get("sharpe") base_dd = base.get("max_drawdown_pct") base_cagr = base.get("cagr_pct") capacity = variants.get("residual_80_fixed15") if ( capacity and base_sharpe is not None and base_cagr is not None and capacity.get("sharpe") is not None and capacity.get("cagr_pct") is not None and capacity.get("max_drawdown_pct") is not None and base_dd is not None ): candidate = ( capacity["sharpe"] > base_sharpe and capacity["cagr_pct"] > base_cagr and capacity["max_drawdown_pct"] <= base_dd + 1.0 ) items.append({ "topic": "capacity_15", "candidate": candidate, "text": ( f"Max-15 capacity {'is worth promoting' if candidate else 'is not needed yet'}: " f"Sharpe {capacity['sharpe']:.2f} vs {base_sharpe:.2f}, " f"CAGR {capacity['cagr_pct']:+.1f}% vs {base_cagr:+.1f}%, " f"skipped {capacity.get('skipped_book_full', 0)} vs {base.get('skipped_book_full', 0)}." ), }) raw_90s = [ v for key, v in variants.items() if key.startswith("raw_90_") and v.get("risk_scale") is None ] raw_90 = max(raw_90s, key=lambda v: v.get("sharpe") or -999, default=None) if ( raw_90 and base_sharpe is not None and base_dd is not None and base_cagr is not None and raw_90.get("sharpe") is not None and raw_90.get("cagr_pct") is not None and raw_90.get("max_drawdown_pct") is not None ): cagr_loss = _pct_loss(base_cagr, raw_90.get("cagr_pct")) raw_90_sharpe = raw_90.get("sharpe") candidate = ( raw_90_sharpe is not None and raw_90_sharpe > base_sharpe and raw_90["max_drawdown_pct"] < base_dd and cagr_loss is not None and cagr_loss < 0.10 ) items.append({ "topic": "cutoff_90", "candidate": candidate, "text": ( f"Cutoff 90 {'is a promotion candidate' if candidate else 'stays research-only'}: " f"{raw_90['label']} Sharpe {raw_90_sharpe:.2f} vs {base_sharpe:.2f}, " f"drawdown {raw_90['max_drawdown_pct']:.1f}% vs {base_dd:.1f}%, " f"CAGR {raw_90.get('cagr_pct'):+.1f}% vs {base_cagr:+.1f}%." ), }) return { "items": items, "note": ( "Residual 12-1 momentum is now the production activation rank. " "Remaining rows are research comparisons only." ), } # --------------------------------------------------------------------------- # Data-driven recommendation # --------------------------------------------------------------------------- # A floor whose removal costs less than this (R net per trade, under the hold # exit) is judged not to be pulling its weight. _FLOOR_KEEP_THRESHOLD = 0.02 # The hold exit must beat the target exit by at least this much to be advised. _EXIT_SWITCH_THRESHOLD = 0.05 def _build_recommendation(report: dict) -> dict: """Strategy advice derived from THIS report's numbers — recomputed every run, so if the data flips, the advice flips. Rules are deliberately simple and transparent; thresholds are module constants above.""" items: list[dict] = [] q = report.get("overall_qualified") or {} target_net = q.get("net_avg_r") # Exit policy: the production target/stop race vs the best fixed hold. time_rows = [r for r in report.get("time_exit_sweep") or [] if r.get("net_avg_r") is not None] best_hold = max(time_rows, key=lambda r: r["net_avg_r"], default=None) sim_rows = { p.get("policy"): p for p in (report.get("portfolio_sim") or {}).get("policies", []) } hold_sim = sim_rows.get("hold") if best_hold is not None and target_net is not None: if best_hold["net_avg_r"] > target_net + _EXIT_SWITCH_THRESHOLD: text = ( f"Exit: hold {best_hold['hold_days']} trading days with the initial stop " f"({best_hold['net_avg_r']:+.2f}R net/trade vs {target_net:+.2f}R for the S/R target exit)." ) target_sim = sim_rows.get("target") if ( hold_sim is not None and target_sim is not None and hold_sim.get("cagr_pct") is not None and target_sim.get("cagr_pct") is not None ): text += ( f" The simulated book agrees: {hold_sim['cagr_pct']:+.1f}% vs " f"{target_sim['cagr_pct']:+.1f}% CAGR at similar drawdown." ) items.append({"topic": "exit", "text": text}) else: items.append({ "topic": "exit", "text": ( f"Exit: keep the S/R target exit ({target_net:+.2f}R net/trade) — " "no fixed hold beats it by a meaningful margin." ), }) # Gate floors, judged under the hold exit (the ablation's Hold column). ablation = {r["variant"]: r for r in report.get("gate_ablation") or []} base_row = ablation.get("all_floors") base_hold = (base_row or {}).get("hold_net_avg_r") floor_labels = { "no_confidence_floor": "confidence floor", "no_rr_floor": "R:R floor", "no_neutral_exclusion": "NEUTRAL exclusion", } if base_hold is not None: for variant, label in floor_labels.items(): row = ablation.get(variant) if row is None or row.get("hold_net_avg_r") is None: continue delta = base_hold - row["hold_net_avg_r"] extra = row["total"] - base_row["total"] if delta <= _FLOOR_KEEP_THRESHOLD: items.append({ "topic": "gate", "text": ( f"Gate: the {label} adds nothing — dropping it costs {delta:+.2f}R/trade " f"and adds {extra} trades." ), }) else: items.append({ "topic": "gate", "text": f"Gate: keep the {label} (worth {delta:+.2f}R/trade under the hold exit).", }) # Activation cutoff: best per-trade net among the promoted residual-momentum # sweep rows. sweep_rows = [ r for r in report.get("sweep") or [] if r.get("net_avg_r") is not None and (r.get("min_momentum_percentile") or 0) > 0 ] if sweep_rows: best_cut = max(sweep_rows, key=lambda r: r["net_avg_r"]) items.append({ "topic": "cutoff", "text": ( f"Residual-momentum cutoff: {best_cut['min_momentum_percentile']:.0f} has the best " f"per-trade net ({best_cut['net_avg_r']:+.2f}R over {best_cut['total']} setups)." ), }) # Book vs benchmark. book = hold_sim or sim_rows.get("target") if book is not None and book.get("spy_return_pct") is not None: edge = book["total_return_pct"] - book["spy_return_pct"] verdict = "beats" if edge > 0 else "LAGS" items.append({ "topic": "benchmark", "text": ( f"Book vs SPY: {verdict} buy-and-hold by {edge:+.1f} points " f"({book['total_return_pct']:+.1f}% vs {book['spy_return_pct']:+.1f}%), " f"max drawdown −{book['max_drawdown_pct']:.1f}%." ), }) # Robustness: does the edge survive without the biggest winners? Judged on # the RECOMMENDED exit — outlier dependence under an exit we'd abandon # would be the wrong warning. hold_recommended = ( best_hold is not None and target_net is not None and best_hold["net_avg_r"] > target_net + _EXIT_SWITCH_THRESHOLD ) if hold_recommended and best_hold.get("net_avg_r_ex_top5") is not None: trimmed = best_hold["net_avg_r_ex_top5"] basis = f"under the recommended {best_hold['hold_days']}d hold" else: trimmed = q.get("net_avg_r_ex_top5") basis = "under the S/R target exit" if trimmed is not None: if trimmed > 0: items.append({ "topic": "robustness", "text": ( f"Robustness: expectancy survives removing the top 5% of winners " f"({trimmed:+.2f}R net/trade {basis}) — the edge is not a handful " "of outliers." ), }) else: items.append({ "topic": "robustness", "text": ( f"Robustness WARNING: without the top 5% of winners the edge disappears " f"({trimmed:+.2f}R net/trade {basis}) — outlier-dependent, treat the " "headline expectancy with caution." ), }) headline = None if hold_recommended: cagr_note = ( f" (~{hold_sim['cagr_pct']:.0f}% CAGR simulated)" if hold_sim is not None and hold_sim.get("cagr_pct") is not None else "" ) headline = ( f"Trade the qualified list long-only; hold {best_hold['hold_days']} trading days " f"with the initial ATR stop{cagr_note}." ) return { "headline": headline, "items": items, "note": "Derived from this report's numbers on every run — the advice flips if the data does.", } async def run_backtest( db: AsyncSession, progress_cb: Callable[[int, int, str], None] | None = None, ) -> dict: """Replay every ticker and aggregate the Phase-1 reports for the current config.""" config = await get_recommendation_config(db) activation = await get_activation_config(db) result = await db.execute(select(Ticker).order_by(Ticker.symbol)) tickers = list(result.scalars().all()) total = len(tickers) candidates: list[dict] = [] # collected[signal_name][iso_week] -> list of (signal_value, forward_return) collected: dict = defaultdict(lambda: defaultdict(list)) # Residual momentum needs a point-in-time benchmark return stream. Best-effort: # if SPY benchmark data is unavailable, the residual signal simply won't be # emitted and the rest of the report remains valid. benchmark_closes: dict[date, float] = {} try: from app.services.benchmark_service import load_benchmark_closes, refresh_benchmark_prices await refresh_benchmark_prices(db, days=settings.ohlcv_history_days + 365) benchmark_closes = await load_benchmark_closes(db) except Exception: logger.exception("Benchmark load for residual momentum failed") def _merge(result: tuple[list[dict], dict]) -> None: cands, series = result candidates.extend(cands) for name, weeks in series.items(): for wk, pairs in weeks.items(): collected[name][wk].extend(pairs) workers = _backtest_worker_count() ctx = _mp_context() if (workers > 1 and total > 1) else None loop = asyncio.get_running_loop() pool = None if ctx is not None: try: pool = ProcessPoolExecutor(max_workers=workers, mp_context=ctx) except Exception: logger.exception("Backtest process pool unavailable; falling back to sequential") if pool is not None: # Parallel: replay tickers across worker processes — true multi-core, since # the GIL only serializes work within a single process. Bars are fetched in # the event loop (ORM-safe) and a bounded batch is fanned out to the pool. logger.info(json.dumps({ "event": "backtest_parallel", "workers": workers, "start_method": ctx.get_start_method(), })) chunk = workers * 2 done = 0 with pool: for start in range(0, total, chunk): batch = tickers[start : start + chunk] futures = [] for ticker in batch: try: columns = await _fetch_columns(db, ticker.symbol) except Exception: logger.exception("Backtest fetch failed for %s", ticker.symbol) continue if columns is not None: futures.append(loop.run_in_executor( pool, _replay_and_signals, ticker.symbol, columns, config, activation, benchmark_closes, )) for result in await asyncio.gather(*futures, return_exceptions=True): if isinstance(result, Exception): logger.error(json.dumps({"event": "backtest_worker_error", "message": str(result)})) else: _merge(result) done += len(batch) if progress_cb is not None: progress_cb(min(done, total), total, "") else: # Sequential fallback (Windows / 1 worker): run each replay in a worker # thread so the event loop — and the API server — stays responsive. for index, ticker in enumerate(tickers): if progress_cb is not None: progress_cb(index, total, ticker.symbol) try: columns = await _fetch_columns(db, ticker.symbol) if columns is not None: _merge(await asyncio.to_thread( _replay_and_signals, ticker.symbol, columns, config, activation, benchmark_closes, )) except Exception: logger.exception("Backtest replay failed for %s", ticker.symbol) if progress_cb is not None and total: progress_cb(total, total, "") # Cross-sectional momentum: rank every week's universe, then "qualified" means # floors + top ``min_momentum_percentile`` by promoted residual 12-1 momentum # (raw 12-1 fallback only when benchmark data is unavailable). _assign_momentum_percentiles(candidates) _assign_residual_momentum_percentiles(candidates) _assign_activation_momentum_percentiles(candidates) current_min_pct = float(activation.get("min_momentum_percentile", 80.0)) for c in candidates: c["qualified"] = _momentum_qualifies(c, current_min_pct) qualified = [c for c in candidates if c["qualified"]] longs = [c for c in qualified if c["direction"] == "long"] shorts = [c for c in qualified if c["direction"] == "short"] # Threshold sweep: re-apply the momentum gate at several percentile cutoffs # (floors held fixed) so the trade-off between how many setups qualify and # their expectancy is visible without re-replaying. 0 = floors only. sweep = [] for threshold in (90.0, 80.0, 70.0, 60.0, 50.0, 0.0): cands = [c for c in candidates if _momentum_qualifies(c, threshold)] sweep.append({"min_momentum_percentile": threshold, **_bucket_stats(cands)}) # Portfolio simulation: re-fetch bars for just the qualified symbols (memory- # light vs retaining every ticker's columns through the replay) and replay # the book once per exit policy. Best-effort — the report stands without it. hold_horizon = max(TIME_EXIT_DAYS) sim_policies: list[dict] = [] strategy_variant_rows: list[dict] = [] try: qual_symbols = sorted({ c["symbol"] for c in candidates if c.get("qualified") or any( _qualifies_by_percentile( c, str(cfg["percentile_key"]), float(cfg["cutoff"]) ) for cfg in STRATEGY_VARIANTS ) }) price_columns: dict[str, tuple] = {} for sym in qual_symbols: cols = await _fetch_columns(db, sym) if cols is not None: price_columns[sym] = cols spy_closes: dict | None = None try: from app.services.benchmark_service import ( load_benchmark_closes, refresh_benchmark_prices, ) oldest = min((cols[0][0] for cols in price_columns.values()), default=None) if oldest is not None: days_needed = (date.today() - date.fromordinal(oldest)).days + 30 await refresh_benchmark_prices(db, days=days_needed) spy_closes = await load_benchmark_closes(db) except Exception: logger.exception("Benchmark load for the portfolio sim failed") for policy in ("target", "hold"): sim = _simulate_portfolio( candidates, price_columns, spy_closes, policy, hold_horizon ) if sim is not None: sim_policies.append({"policy": policy, **sim}) strategy_variant_rows = _strategy_variant_sims( candidates, price_columns, spy_closes, hold_horizon ) except Exception: logger.exception("Portfolio simulation failed") report = { "generated_at": datetime.now(timezone.utc).isoformat(), "tickers": total, "candidates": len(candidates), "qualified": len(qualified), "params": { "step_days": STEP_DAYS, "horizon_days": HORIZON, "min_lookback": MIN_LOOKBACK, "cost_per_side_pct": round(COST_PER_SIDE * 100, 3), }, "activation": activation, "overall_qualified": _bucket_stats(qualified), "overall_all": _bucket_stats(candidates), "by_direction": { "long": _bucket_stats(longs), "short": _bucket_stats(shorts), }, "min_momentum_percentile": current_min_pct, "sweep": sweep, "gate_ablation": _gate_ablation(candidates, activation, current_min_pct), "gate_ablation_note": ( "Each row re-qualifies the same candidates at the current momentum " f"cutoff ({current_min_pct:.0f}) with one floor removed (long-only " "while the momentum gate is active). If dropping a floor doesn't " "hurt net expectancy, that floor isn't pulling its weight. The Hold " "columns grade the same variants under the hold-to-horizon time exit " "instead of the S/R target — the view that matters if the exit " "policy moves to a fixed hold." ), "time_exit_sweep": [_time_exit_bucket(qualified, n) for n in TIME_EXIT_DAYS], "portfolio_sim": { "params": { "starting_capital": SIM_STARTING_CAPITAL, "max_positions": SIM_MAX_POSITIONS, "risk_per_trade_pct": round(SIM_RISK_PER_TRADE * 100, 2), "notional_cap_pct": round(SIM_NOTIONAL_CAP * 100, 1), "cost_per_side_pct": round(COST_PER_SIDE * 100, 3), "hold_days": hold_horizon, }, "policies": sim_policies, "note": ( "One capital-constrained book over the same qualified setups the " "tables above grade per-setup: at most " f"{SIM_MAX_POSITIONS} concurrent positions (one per ticker), best " "momentum first, fixed-fractional risk sizing with a no-leverage " "cap, entries at the detection close, stops filled at the worse " "of stop or open. 'target' races the S/R target against the stop " "(timeout at the horizon); 'hold' keeps the initial stop and " "exits at the horizon close. SPY return is price-only over the " "same window. In-sample; no dividends." ), }, "strategy_variants": { "variants": strategy_variant_rows, "note": ( "Research-only hold-to-horizon portfolio variants. Production now " "uses residual 12-1 momentum at cutoff 80; the remaining rows compare " "the legacy raw rank, raw cutoff 90, and one max-15 capacity check." ), }, "signal_eval": _signal_evaluation(collected), "signal_eval_note": ( "Cross-sectional rank-IC of price-only signals vs the forward " f"{HORIZON}-day return (min {MIN_CROSS_SECTION} names/window). |IC| ≳ " "0.03 with a consistent sign is a real (if small) edge; near 0 means " "ranking on it sorts nothing. Momentum factors and high_52w are expected " "positive; reversal_1m and vol_6m expected negative (mean-reversion / " "low-vol anomaly). IC is measured on non-overlapping windows; signals " f"with fewer than {MIN_RELIABLE_PERIODS} independent windows are flagged " "unreliable (too few regimes — deepen history with the Data Backfill job)." ), "note": ( "Sentiment & fundamentals held neutral (no point-in-time history). " "Stops fill at the worse of the stop or the bar's open (gaps through " "the stop are modeled, so a loss can exceed −1R); targets never fill " "better than their level. " "~6 months ≈ one market regime — treat as directional, not gospel." ), } report["recommendation"] = _build_recommendation(report) report["research_recommendation"] = _build_research_recommendation(report) return report async def run_and_store( db: AsyncSession, progress_cb: Callable[[int, int, str], None] | None = None, ) -> dict: """Run the backtest and cache the report in a SystemSetting. Job entrypoint.""" report = await run_backtest(db, progress_cb) await update_setting(db, KEY_REPORT, json.dumps(report)) return report async def get_backtest_report(db: AsyncSession) -> dict | None: """Return the last cached backtest report, or None if never run.""" setting = await settings_store.get_setting(db, KEY_REPORT) if setting is None: return None try: return json.loads(setting.value) except (TypeError, ValueError): return None