Files
signal-platform/app/services/backtest_service.py
T
dennisthiessen 942a22ce65
Deploy / lint (push) Successful in 6s
Deploy / test (push) Successful in 55s
Deploy / deploy (push) Successful in 33s
feat: grade gate-ablation variants under the hold-to-horizon exit too
The ablation judged floors under the target/stop model, but the exit
sweeps point at replacing that exit with a fixed hold — under which the
R:R floor's rationale (bigger payoff at the target) may not apply. Each
ablation row now also carries hold_avg_r / hold_net_avg_r / hold_total_r
(30d hold, initial stop only), so the Phase 3 gate decision can be read
under the exit policy that would actually be used.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-02 11:34:41 +02:00

1111 lines
45 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Historical backtest (Phase 1): replay the price-derived engine over stored
OHLCV and measure how the CURRENT config would have performed.
For each ticker we step through history (weekly), and at each as-of date D we
rebuild the setup using only bars ≤ D (no lookahead), then walk the actual bars
after D to record the realized outcome. Two reports come out:
- realized hit-rate / expectancy of qualified setups (and of all setups)
- a probability calibration curve: do "60% likely" targets hit ~60% of the time?
Limitation: sentiment and fundamentals have no point-in-time history, so they're
held neutral here — this calibrates the price/S-R/probability machinery only.
"""
from __future__ import annotations
import asyncio
import json
import logging
import math
import multiprocessing
import os
from collections import defaultdict
from collections.abc import Callable
from concurrent.futures import ProcessPoolExecutor
from datetime import date, datetime, timezone
from types import SimpleNamespace
from typing import Any
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.models.ticker import Ticker
from app.services import settings_store
from app.services.admin_service import get_activation_config, update_setting
from app.services.indicator_service import _extract_ohlcv, compute_atr
from app.services.outcome_service import (
OUTCOME_AMBIGUOUS,
OUTCOME_STOP_HIT,
OUTCOME_TARGET_HIT,
Bar,
evaluate_setup_against_bars,
)
from app.services.price_service import query_ohlcv
from app.services.qualification import (
HIGH_CONVICTION_ACTIONS,
best_target_probability,
setup_qualifies,
)
from app.services.recommendation_service import (
_choose_recommended_action,
_classify_by_probability,
_risk_level_from_conflicts,
_select_primary_target,
_zone_representative_levels,
direction_analyzer,
get_recommendation_config,
probability_estimator,
signal_conflict_detector,
target_generator,
)
from app.services.scoring_service import (
compute_momentum_from_closes,
compute_technical_from_arrays,
)
from app.services.sr_service import detect_sr_levels
logger = logging.getLogger(__name__)
KEY_REPORT = "backtest_report"
STEP_DAYS = 5 # weekly cadence (≈ 5 trading days)
MIN_LOOKBACK = 60 # bars needed before D for indicators (EMA cross needs 51)
HORIZON = 30 # trading days to resolve an outcome (matches the evaluator)
ATR_MULTIPLIER = 1.5
_CAL_BUCKETS = [(0, 20), (20, 40), (40, 60), (60, 80), (80, 100.01)]
# Cross-sectional signal evaluation (factor IC). Each candidate signal is a
# point-in-time number computed from closes alone (sentiment/fundamentals have no
# history here), sampled one as-of per ISO week, and graded by how its rank
# correlates with the forward HORIZON-day return ACROSS the universe — i.e. does
# ranking stocks by this signal sort tomorrow's winners from losers. This is the
# test the per-setup hit-rate report can't do: it measures predictive power of a
# signal, not the outcome of a target/stop structure built on top of one.
MIN_CROSS_SECTION = 20 # min tickers present in a week to score that week
MIN_RELIABLE_PERIODS = 12 # min non-overlapping windows before a signal's IC is trusted
def _wrap_levels(level_dicts: list[dict]) -> list[Any]:
return [
SimpleNamespace(
id=i,
price_level=float(d["price_level"]),
type=d["type"],
strength=int(d["strength"]),
)
for i, d in enumerate(level_dicts)
]
def _window_setups(
window_records: list,
config: dict,
activation: dict,
) -> list[dict]:
"""Rebuild the setup(s) at the last bar of ``window_records`` (the as-of date),
using only those bars. Returns one dict per tradeable direction."""
if len(window_records) < MIN_LOOKBACK:
return []
_, highs, lows, closes, volumes = _extract_ohlcv(window_records)
entry = closes[-1]
if entry <= 0:
return []
# 12-1 month momentum (skip the last month) — the universe ranks on this.
# None until a year of history exists; such setups can't qualify on momentum.
mom_12_1 = (
closes[-22] / closes[-253] - 1.0
if len(closes) >= 253 and closes[-253] > 0
else None
)
try:
atr = compute_atr(highs, lows, closes)["atr"]
except Exception:
return []
if atr <= 0:
return []
sr_levels = _wrap_levels(detect_sr_levels(highs, lows, closes, volumes))
if not sr_levels:
return []
technical = (compute_technical_from_arrays(highs, lows, closes, volumes)[0]) or 50.0
momentum = (compute_momentum_from_closes(closes)[0]) or 50.0
dim_scores = {"technical": technical, "momentum": momentum}
conflicts = signal_conflict_detector.detect_conflicts(dim_scores, None, config)
confidences = {
"long": direction_analyzer.calculate_confidence("long", dim_scores, None, conflicts),
"short": direction_analyzer.calculate_confidence("short", dim_scores, None, conflicts),
}
# First pass: build targets per direction
per_dir: dict[str, dict] = {}
for direction in ("long", "short"):
stop = entry - atr * ATR_MULTIPLIER if direction == "long" else entry + atr * ATR_MULTIPLIER
zone_levels = _zone_representative_levels(sr_levels, entry)
targets = target_generator.generate_targets(direction, entry, stop, zone_levels, atr)
if not targets:
continue
for t in targets:
t["probability"] = probability_estimator.estimate_probability(
t, dim_scores, None, direction, config
)
t["classification"] = _classify_by_probability(t["probability"])
primary = _select_primary_target(targets)
if primary is None:
continue
# Flag the primary so qualification's EV uses the primary target's
# probability (matching production's enhance_trade_setup).
for t in targets:
t["is_primary"] = t is primary
per_dir[direction] = {"stop": stop, "targets": targets, "primary": primary}
available = set(per_dir.keys())
if not available:
return []
action = _choose_recommended_action(confidences["long"], confidences["short"], config, available)
out: list[dict] = []
for direction, data in per_dir.items():
targets, primary, stop = data["targets"], data["primary"], data["stop"]
setup_conflicts = list(conflicts)
if len(targets) < 3:
setup_conflicts.append("target-availability: Fewer than 3 valid S/R targets available")
risk_level = _risk_level_from_conflicts(setup_conflicts)
rr = float(primary["rr_ratio"])
target_price = float(primary["price"])
setup_ns = SimpleNamespace(
rr_ratio=rr,
confidence_score=confidences[direction],
recommended_action=action,
risk_level=risk_level,
targets=targets,
direction=direction,
target=target_price,
stop_loss=stop,
entry_price=entry,
)
# meets_core = clears every gate EXCEPT the cross-sectional momentum
# percentile, which can only be assigned once all tickers' setups for a
# week are known. run_backtest ranks momentum and finalizes `qualified`.
core_config = {**activation, "min_momentum_percentile": 0.0}
meets_core = setup_qualifies(setup_ns, core_config)
best_prob = best_target_probability(setup_ns)
out.append({
"direction": direction,
"entry": entry,
"stop": stop,
"target": target_price,
"rr": rr,
"confidence": confidences[direction],
"primary_prob": float(primary["probability"]),
"best_prob": best_prob,
"momentum": mom_12_1,
"meets_core": meets_core,
"action": action,
"risk_level": risk_level,
})
return out
def _tp_primitives(
direction: str, entry: float, stop: float, forward: list, horizon: int
) -> tuple[float, bool, float, float]:
"""Primitives for the take-profit exit model, from the bars after detection.
Returns ``(risk_pct, stopped, mfe_pct, close_pct)``:
- ``risk_pct`` fraction from entry to stop (the 1R distance)
- ``stopped`` whether the stop was hit within the horizon
- ``mfe_pct`` best favourable excursion (fraction) reachable *before* the
stop — strictly before the stop bar, so a same-bar tp+stop
counts as a loss (matching the conservative target model);
over the whole horizon if the stop is never hit
- ``close_pct`` directional return at the horizon-end close (the timeout exit)
From these any fixed take-profit level can be scored without re-walking bars:
tp reached before stop (``mfe_pct >= tp``) → +tp; else stop → 1R; else the
horizon-close move.
"""
long = direction == "long"
risk_pct = abs(entry - stop) / entry if entry else 0.0
bars = forward[:horizon]
if not bars:
return risk_pct, False, 0.0, 0.0
mfe = 0.0
stopped = False
for r in bars:
if (r.low <= stop) if long else (r.high >= stop):
stopped = True
break
fav = (r.high - entry) / entry if long else (entry - r.low) / entry
if fav > mfe:
mfe = fav
close_pct = ((bars[-1].close - entry) / entry) * (1.0 if long else -1.0)
return risk_pct, stopped, mfe, close_pct
def _trailing_exits(
direction: str, entry: float, init_stop: float, trail_fracs, forward: list, horizon: int
) -> dict[int, float]:
"""Realized R per trailing-stop width, in one pass over the post-entry bars.
The stop ratchets up (never below the initial stop): ``max(init_stop,
peak*(1-trail))`` for a long. Exit when a bar pierces the current stop (filled
at the stop level), else at the horizon-end close. Each width is keyed by its
integer percent (5 for 0.05). Conservative: the stop for a bar uses the peak
through the *previous* bar (this bar's high is folded in only afterwards).
R is relative to the initial risk (entry → init_stop).
"""
long = direction == "long"
risk = abs(entry - init_stop) / entry if entry else 0.0
if risk <= 0:
return {round(f * 100): 0.0 for f in trail_fracs}
bars = forward[:horizon]
if not bars:
return {round(f * 100): 0.0 for f in trail_fracs}
result: dict[int, float] = {}
peak = entry
active = list(trail_fracs)
for r in bars:
remaining = []
for f in active:
if long:
stop_level = max(init_stop, peak * (1 - f))
if r.low <= stop_level:
result[round(f * 100)] = ((stop_level - entry) / entry) / risk
continue
else:
stop_level = min(init_stop, peak * (1 + f))
if r.high >= stop_level:
result[round(f * 100)] = ((entry - stop_level) / entry) / risk
continue
remaining.append(f)
active = remaining
if not active:
break
if long:
if r.high > peak:
peak = r.high
elif r.low < peak:
peak = r.low
last_close = bars[-1].close
timeout_r = (((last_close - entry) / entry) if long else ((entry - last_close) / entry)) / risk
for f in active:
result[round(f * 100)] = timeout_r
return result
def _time_exits(
direction: str, entry: float, stop: float, forward: list, horizons
) -> dict[int, float]:
"""Realized R per hold-N-days exit, in one pass over the post-entry bars.
The initial stop stays active (fill at the stop level → 1R); otherwise the
trade exits at the day-N close (the last available close when history ends
early). No target, no trailing — the classic momentum implementation: buy,
hold ~N days, re-rank. Same conservative bar logic as ``_tp_primitives``: a
bar that pierces the stop is a loss before that bar's close counts.
"""
long = direction == "long"
risk = abs(entry - stop) / entry if entry else 0.0
if risk <= 0:
return {int(n): 0.0 for n in horizons}
bars = forward[: max(int(n) for n in horizons)]
if not bars:
return {int(n): 0.0 for n in horizons}
stop_day: int | None = None # 1-based trading day the stop was pierced
closes: list[float] = []
for i, r in enumerate(bars):
if (r.low <= stop) if long else (r.high >= stop):
stop_day = i + 1
break
closes.append(r.close)
result: dict[int, float] = {}
for h in horizons:
n = int(h)
if stop_day is not None and stop_day <= n:
result[n] = -1.0
else:
# closes can't be empty here: an empty closes means the stop hit on
# day 1, which the branch above catches for every n >= 1.
c = closes[min(n, len(closes)) - 1]
move = (c - entry) / entry if long else (entry - c) / entry
result[n] = move / risk
return result
def _replay_ticker(symbol: str, records: list, config: dict, activation: dict) -> list[dict]:
"""Walk one ticker's history weekly, building setups and their realized outcomes."""
candidates: list[dict] = []
n = len(records)
if n < MIN_LOOKBACK + HORIZON:
return candidates
for i in range(MIN_LOOKBACK - 1, n - HORIZON, STEP_DAYS):
window = records[: i + 1]
forward = records[i + 1 :]
forward_bars = [Bar(date=r.date, high=r.high, low=r.low) for r in forward]
for s in _window_setups(window, config, activation):
outcome, _ = evaluate_setup_against_bars(
s["direction"], s["stop"], s["target"], forward_bars, HORIZON
)
if outcome is None:
continue
target_hit = outcome == OUTCOME_TARGET_HIT
if outcome == OUTCOME_TARGET_HIT:
realized_r = s["rr"]
elif outcome in (OUTCOME_STOP_HIT, OUTCOME_AMBIGUOUS):
realized_r = -1.0
else: # expired
realized_r = 0.0
# Take-profit exit primitives (parallel to the target-vs-stop outcome
# above; aggregated separately into the take-profit sweep).
risk_pct, tp_stopped, mfe_pct, tp_close_pct = _tp_primitives(
s["direction"], s["entry"], s["stop"], forward, HORIZON
)
trail_r = _trailing_exits(
s["direction"], s["entry"], s["stop"], TRAIL_LEVELS, forward, HORIZON
)
time_r = _time_exits(
s["direction"], s["entry"], s["stop"], forward, TIME_EXIT_DAYS
)
iso = records[i].date.isocalendar()
candidates.append({
"symbol": symbol,
"date": records[i].date.isoformat(),
"iso_week": (iso[0], iso[1]),
"direction": s["direction"],
"rr": s["rr"],
"confidence": s["confidence"],
"primary_prob": s["primary_prob"],
"best_prob": s["best_prob"],
"momentum": s["momentum"],
"meets_core": s["meets_core"],
# Gate fields the ablation recomputes floors from — without them
# every candidate looks NEUTRAL and the ablation rows collapse.
"action": s["action"],
"risk_level": s["risk_level"],
"outcome": outcome,
"target_hit": target_hit,
"realized_r": realized_r,
"risk_pct": risk_pct,
"tp_stopped": tp_stopped,
"mfe_pct": mfe_pct,
"tp_close_pct": tp_close_pct,
"trail_r": trail_r,
"time_r": time_r,
})
return candidates
def _bucket_stats(cands: list[dict]) -> dict:
wins = sum(1 for c in cands if c["target_hit"])
losses = sum(1 for c in cands if c["outcome"] in (OUTCOME_STOP_HIT, OUTCOME_AMBIGUOUS))
expired = sum(1 for c in cands if c["outcome"] not in (OUTCOME_TARGET_HIT, OUTCOME_STOP_HIT, OUTCOME_AMBIGUOUS))
decided = wins + losses
rs = [c["realized_r"] for c in cands]
net_rs = [c["realized_r"] - _cost_r(c) for c in cands]
return {
"total": len(cands),
"wins": wins,
"losses": losses,
"expired": expired,
"hit_rate": round(wins / decided * 100, 1) if decided else None,
"avg_r": round(sum(rs) / len(rs), 3) if rs else None,
"total_r": round(sum(rs), 2) if rs else None,
"net_avg_r": round(sum(net_rs) / len(net_rs), 3) if net_rs else None,
"net_total_r": round(sum(net_rs), 2) if net_rs else None,
}
# Fixed take-profit levels (fractions) swept for the take-profit exit model.
# Extended into the tail so the avg-R peak/plateau is visible (it's where letting
# winners run stops paying). Note: this model ignores the setup's S/R target —
# it's a standalone fixed-% exit; exiting at the target is the target model.
TP_LEVELS = (0.04, 0.06, 0.08, 0.10, 0.12, 0.15, 0.20, 0.25, 0.30, 0.40, 0.50)
# Trailing-stop widths (give-back from the peak) swept for the trailing exit model.
TRAIL_LEVELS = (0.03, 0.05, 0.07, 0.10, 0.15, 0.20, 0.25, 0.30)
# Hold-N-days exits (initial stop stays active, exit at the day-N close) — the
# classic cross-sectional momentum implementation: buy, hold ~a month, re-rank.
TIME_EXIT_DAYS = (5, 10, 21, 30)
# Assumed transaction cost per side as a fraction of notional (commission +
# slippage). Aggregates report gross and net side by side; net subtracts a full
# round trip, converted into R via the setup's stop distance (the 1R unit).
COST_PER_SIDE = 0.001
def _cost_r(cand: dict) -> float:
"""Round-trip transaction cost in R units: two sides over the 1R stop
distance. 0 when the candidate carries no usable risk_pct."""
risk = cand.get("risk_pct") or 0.0
return (2.0 * COST_PER_SIDE) / risk if risk > 0 else 0.0
def _take_profit_bucket(cands: list[dict], tp: float) -> dict:
"""Stats for a fixed take-profit exit at +``tp`` (fraction): bank +tp if it's
reached before the stop, else 1R on a stop, else exit at the horizon close.
Results are in R (gain% / risk%) so they're comparable to the target model.
``hit_rate`` here = share that reached +tp before the stop (the MFE CDF)."""
rs: list[float] = []
net_rs: list[float] = []
wins = 0
for c in cands:
risk = c.get("risk_pct") or 0.0
if risk <= 0:
continue
if c.get("mfe_pct", 0.0) >= tp:
r = tp / risk
wins += 1
elif c.get("tp_stopped"):
r = -1.0
else:
r = (c.get("tp_close_pct", 0.0)) / risk
rs.append(r)
net_rs.append(r - _cost_r(c))
total = len(rs)
return {
"tp_pct": round(tp * 100, 1),
"total": total,
"wins": wins,
"hit_rate": round(wins / total * 100, 1) if total else None,
"avg_r": round(sum(rs) / total, 3) if total else None,
"total_r": round(sum(rs), 2) if total else None,
"net_avg_r": round(sum(net_rs) / total, 3) if total else None,
"net_total_r": round(sum(net_rs), 2) if total else None,
}
def _trailing_bucket(cands: list[dict], trail_pct: int) -> dict:
"""Stats for a trailing-stop exit of width ``trail_pct`` (integer percent).
Each candidate carries its realized R for this width in ``trail_r``; a "win"
is simply an exit in profit (R > 0)."""
pairs = [
(c["trail_r"][trail_pct], _cost_r(c))
for c in cands
if c.get("trail_r", {}).get(trail_pct) is not None
]
total = len(pairs)
rs = [r for r, _ in pairs]
net_rs = [r - cost for r, cost in pairs]
wins = sum(1 for r in rs if r > 0)
return {
"trail_pct": trail_pct,
"total": total,
"wins": wins,
"win_rate": round(wins / total * 100, 1) if total else None,
"avg_r": round(sum(rs) / total, 3) if total else None,
"total_r": round(sum(rs), 2) if total else None,
"net_avg_r": round(sum(net_rs) / total, 3) if total else None,
"net_total_r": round(sum(net_rs), 2) if total else None,
}
def _time_exit_bucket(cands: list[dict], hold_days: int) -> dict:
"""Stats for the hold-``hold_days`` exit: initial stop active, otherwise out
at the day-N close. Each candidate carries its realized R per hold length in
``time_r``; a "win" is an exit in profit (R > 0)."""
pairs = [
(c["time_r"][hold_days], _cost_r(c))
for c in cands
if c.get("time_r", {}).get(hold_days) is not None
]
total = len(pairs)
rs = [r for r, _ in pairs]
net_rs = [r - cost for r, cost in pairs]
wins = sum(1 for r in rs if r > 0)
return {
"hold_days": hold_days,
"total": total,
"wins": wins,
"win_rate": round(wins / total * 100, 1) if total else None,
"avg_r": round(sum(rs) / total, 3) if total else None,
"total_r": round(sum(rs), 2) if total else None,
"net_avg_r": round(sum(net_rs) / total, 3) if total else None,
"net_total_r": round(sum(net_rs), 2) if total else None,
}
def _calibration(cands: list[dict]) -> list[dict]:
"""Predicted target probability vs realized hit rate, per probability bucket."""
rows: list[dict] = []
for lo, hi in _CAL_BUCKETS:
bucket = [c for c in cands if lo <= c["primary_prob"] < hi]
if not bucket:
continue
hits = sum(1 for c in bucket if c["target_hit"])
rows.append({
"bucket": f"{int(lo)}-{int(min(hi, 100))}%",
"n": len(bucket),
"predicted_avg": round(sum(c["primary_prob"] for c in bucket) / len(bucket), 1),
"realized_hit_rate": round(hits / len(bucket) * 100, 1),
})
return rows
# ---------------------------------------------------------------------------
# Cross-sectional signal evaluation (factor information-coefficient)
# ---------------------------------------------------------------------------
def _weekly_asof_indices(records: list) -> list[int]:
"""Index of the last bar in each ISO week — the weekly rebalance as-of bars.
Keying on the calendar week (not the raw bar index) makes every ticker's
as-of dates line up, so the cross-section on a given week is comparable.
"""
last_by_week: dict[tuple[int, int], int] = {}
for idx, r in enumerate(records):
iso = r.date.isocalendar()
last_by_week[(iso[0], iso[1])] = idx
return sorted(last_by_week.values())
def _signal_values(closes: list[float], highs: list[float], i: int) -> dict[str, float]:
"""Point-in-time candidate signals at as-of index ``i`` (price-only).
Momentum factors follow the standard "skip the last month" convention
(return up to ~1 month ago) to avoid the short-term reversal effect, which
``reversal_1m`` isolates on purpose — we expect its IC to be negative if the
universe mean-reverts. ``trend_200`` is price vs its 200-bar SMA. ``high_52w``
is closeness to the trailing 52-week high (George/Hwang anchoring effect:
higher = nearer the high, expect positive IC). ``vol_6m`` is 126-day realized
volatility (expect negative IC if the low-volatility anomaly holds).
"""
out: dict[str, float] = {}
if i - 252 >= 0 and closes[i - 252] > 0:
out["mom_12_1"] = closes[i - 21] / closes[i - 252] - 1.0
if i - 126 >= 0 and closes[i - 126] > 0:
out["mom_6_1"] = closes[i - 21] / closes[i - 126] - 1.0
if i - 63 >= 0 and closes[i - 63] > 0:
out["mom_3_1"] = closes[i - 21] / closes[i - 63] - 1.0
if i - 21 >= 0 and closes[i - 21] > 0:
out["reversal_1m"] = closes[i] / closes[i - 21] - 1.0
if i - 199 >= 0:
sma = sum(closes[i - 199 : i + 1]) / 200.0
if sma > 0:
out["trend_200"] = closes[i] / sma - 1.0
if i - 251 >= 0:
high_52w = max(highs[i - 251 : i + 1])
if high_52w > 0:
out["high_52w"] = closes[i] / high_52w
if i - 126 >= 0:
rets = [
closes[k] / closes[k - 1] - 1.0
for k in range(i - 125, i + 1)
if closes[k - 1] > 0
]
if len(rets) >= 2:
mean = sum(rets) / len(rets)
var = sum((x - mean) ** 2 for x in rets) / (len(rets) - 1)
out["vol_6m"] = math.sqrt(var)
return out
def _accumulate_signal_series(records: list, collected: dict) -> None:
"""For each weekly as-of bar, emit (signal, forward-return) pairs keyed by ISO
week into ``collected[name][week_key]``. Forward return is close-to-close over
HORIZON trading days. Mutates ``collected`` (a dict of dict of list)."""
n = len(records)
if n < HORIZON + 21:
return
closes = [float(r.close) for r in records]
highs = [float(r.high) for r in records]
for i in _weekly_asof_indices(records):
j = i + HORIZON
if j >= n or closes[i] <= 0:
continue
fwd = closes[j] / closes[i] - 1.0
iso = records[i].date.isocalendar()
week_key = (iso[0], iso[1])
for name, val in _signal_values(closes, highs, i).items():
collected[name][week_key].append((val, fwd))
def _rank(xs: list[float]) -> list[float]:
"""Average (tie-corrected) ranks, 1-based."""
order = sorted(range(len(xs)), key=lambda k: xs[k])
ranks = [0.0] * len(xs)
i = 0
while i < len(xs):
j = i
while j + 1 < len(xs) and xs[order[j + 1]] == xs[order[i]]:
j += 1
avg_rank = (i + j) / 2.0 + 1.0
for k in range(i, j + 1):
ranks[order[k]] = avg_rank
i = j + 1
return ranks
def _pearson(a: list[float], b: list[float]) -> float | None:
n = len(a)
if n < 3:
return None
ma, mb = sum(a) / n, sum(b) / n
va = sum((x - ma) ** 2 for x in a)
vb = sum((y - mb) ** 2 for y in b)
if va <= 0 or vb <= 0:
return None
cov = sum((a[k] - ma) * (b[k] - mb) for k in range(n))
return cov / math.sqrt(va * vb)
def _spearman(xs: list[float], ys: list[float]) -> float | None:
"""Rank correlation = Pearson on the ranks. None if too few/degenerate."""
if len(xs) < 3:
return None
return _pearson(_rank(xs), _rank(ys))
def _quintile_spread(pairs: list[tuple[float, float]]) -> float | None:
"""Mean forward return of the top signal-quintile minus the bottom quintile."""
n = len(pairs)
if n < 10:
return None
ordered = sorted(pairs, key=lambda p: p[0])
k = n // 5
top = ordered[-k:]
bottom = ordered[:k]
return sum(p[1] for p in top) / k - sum(p[1] for p in bottom) / k
def _week_ordinal(week_key: tuple[int, int]) -> int:
"""Monotonic absolute week number from an (ISO year, ISO week) key."""
year, week = week_key
return year * 53 + week
def _nonoverlapping_weeks(
week_keys: list[tuple[int, int]], stride: int
) -> list[tuple[int, int]]:
"""Thin to weeks at least ``stride`` apart so their forward windows don't
overlap — greedy earliest-first. Removes the autocorrelation that would
otherwise inflate the IC t-stat across adjacent weekly rebalances."""
kept: list[tuple[int, int]] = []
last: int | None = None
for wk in sorted(week_keys, key=_week_ordinal):
o = _week_ordinal(wk)
if last is None or o - last >= stride:
kept.append(wk)
last = o
return kept
def _signal_evaluation(collected: dict) -> list[dict]:
"""Per-signal factor diagnostics, one row per candidate signal:
mean_ic average rank-IC (Spearman of signal vs fwd ret)
ic_t_stat mean_ic / stderr — is the IC reliably non-zero?
ic_positive_pct share of windows the IC is positive (consistency)
mean_quintile_spread avg top-minus-bottom-quintile forward return
reliable True once there are >= MIN_RELIABLE_PERIODS windows
IC is measured on NON-OVERLAPPING forward windows (weeks thinned to ~HORIZON
apart) so the t-stat isn't inflated by autocorrelation. A signal with no edge
lands near IC 0 / spread 0; one with too few independent windows is flagged
unreliable rather than trusted on a lucky handful.
"""
stride = max(1, round(HORIZON / 5)) # ISO weeks spanned by the forward window
rows: list[dict] = []
for name in sorted(collected):
weeks_map = collected[name]
usable = [wk for wk, recs in weeks_map.items() if len(recs) >= MIN_CROSS_SECTION]
kept = _nonoverlapping_weeks(usable, stride)
ics: list[float] = []
spreads: list[float] = []
sizes: list[int] = []
for wk in kept:
recs = weeks_map[wk]
ic = _spearman([r[0] for r in recs], [r[1] for r in recs])
if ic is not None:
ics.append(ic)
spread = _quintile_spread(recs)
if spread is not None:
spreads.append(spread)
sizes.append(len(recs))
if not ics:
continue
mean_ic = sum(ics) / len(ics)
if len(ics) > 1:
std = math.sqrt(sum((x - mean_ic) ** 2 for x in ics) / (len(ics) - 1))
else:
std = 0.0
t_stat = mean_ic / std * math.sqrt(len(ics)) if std > 0 else None
rows.append({
"signal": name,
"weeks": len(ics),
"avg_cross_section": round(sum(sizes) / len(sizes), 1) if sizes else None,
"mean_ic": round(mean_ic, 4),
"ic_t_stat": round(t_stat, 2) if t_stat is not None else None,
"ic_positive_pct": round(sum(1 for x in ics if x > 0) / len(ics) * 100, 1),
"mean_quintile_spread": round(sum(spreads) / len(spreads), 4) if spreads else None,
"reliable": len(ics) >= MIN_RELIABLE_PERIODS,
})
rows.sort(key=lambda r: r["mean_ic"], reverse=True)
return rows
def _signal_series(records: list) -> dict:
"""Per-ticker signal/forward-return series as a PLAIN (picklable) nested dict
— no defaultdict/lambda — so it can cross a process boundary."""
tmp: dict = defaultdict(lambda: defaultdict(list))
_accumulate_signal_series(records, tmp)
return {name: dict(weeks) for name, weeks in tmp.items()}
def _replay_and_signals(
symbol: str,
columns: tuple,
config: dict,
activation: dict,
) -> tuple[list[dict], dict]:
"""The CPU-bound per-ticker work, as a top-level (picklable) function so it can
run in a worker process. Takes primitive column arrays (cheap to pickle),
rebuilds bar objects, and returns (candidates, signal_series)."""
date_ords, opens, highs, lows, closes, volumes = columns
bars = [
SimpleNamespace(
date=date.fromordinal(o), open=op, high=hi, low=lo, close=cl, volume=vo
)
for o, op, hi, lo, cl, vo in zip(date_ords, opens, highs, lows, closes, volumes)
]
return _replay_ticker(symbol, bars, config, activation), _signal_series(bars)
def _backtest_worker_count() -> int:
"""How many worker processes to replay tickers across. Capped to cpu_count-1
so a core stays free for the web server; 1 means sequential."""
configured = int(getattr(settings, "backtest_workers", 4))
if configured <= 1:
return 1
cpu = os.cpu_count() or 1
return max(1, min(configured, cpu - 1))
def _mp_context():
"""A start method safe to use from the threaded asyncio server: ``forkserver``
(workers forked from a clean, single-threaded server — avoids the
fork-with-threads deadlock) when available, else ``fork``. Returns None on
spawn-only platforms (Windows), where the caller falls back to a thread."""
methods = multiprocessing.get_all_start_methods()
for method in ("forkserver", "fork"):
if method in methods:
return multiprocessing.get_context(method)
return None
async def _fetch_columns(db: AsyncSession, symbol: str) -> tuple | None:
"""Read one ticker's OHLCV and detach it to primitive column arrays in the
event loop (safe ORM access), ready to hand to a worker. None if no data."""
records = await query_ohlcv(db, symbol)
if not records:
return None
return (
[r.date.toordinal() for r in records],
[float(r.open) for r in records],
[float(r.high) for r in records],
[float(r.low) for r in records],
[float(r.close) for r in records],
[int(r.volume) for r in records],
)
def _assign_momentum_percentiles(candidates: list[dict]) -> None:
"""Per ISO week, rank candidates by their ticker's 12-1 momentum and attach a
0100 ``momentum_percentile`` (100 = highest momentum in the universe that
week). Candidates whose momentum is unknown (insufficient lookback) get None
and therefore can't clear a momentum gate. Mutates ``candidates``."""
by_week: dict = defaultdict(list)
for c in candidates:
if c.get("momentum") is not None:
by_week[c["iso_week"]].append(c)
for group in by_week.values():
ordered = sorted(group, key=lambda c: c["momentum"])
n = len(ordered)
for rank, c in enumerate(ordered):
c["momentum_percentile"] = (rank / (n - 1) * 100.0) if n > 1 else 100.0
for c in candidates:
c.setdefault("momentum_percentile", None)
def _momentum_qualifies(cand: dict, threshold: float) -> bool:
"""Whether a candidate clears the floors (meets_core) and the momentum gate.
Threshold 0 disables the momentum gate (floors only). The gate is long-only:
while it's active, shorts (fighting the trend) never qualify."""
if not cand["meets_core"]:
return False
if threshold <= 0:
return True
if cand["direction"] == "short":
return False
mp = cand.get("momentum_percentile")
return mp is not None and mp >= threshold
def _gate_ablation(candidates: list[dict], activation: dict, threshold: float) -> list[dict]:
"""Which floors earn their keep: re-qualify the same candidates at the
current momentum cutoff with one floor removed per row (long-only
throughout, matching the live gate).
``all_floors`` uses the stored ``meets_core`` so it reproduces the qualified
set exactly; the ablation rows recompute the remaining floors from stored
candidate fields with the same comparisons as
``qualification.setup_qualifies``. Optional tighteners (high-conviction /
conflict exclusion), when enabled, stay applied in every ablation row so
only the named floor varies.
"""
min_rr = float(activation.get("min_rr", 0.0))
min_conf = float(activation.get("min_confidence", 0.0))
exclude_neutral = bool(activation.get("exclude_neutral", False))
require_high = bool(activation.get("require_high_conviction", False))
exclude_conflicts = bool(activation.get("exclude_conflicts", False))
def momentum_ok(c: dict) -> bool:
# Mirrors the momentum part of _momentum_qualifies: long-only while the
# gate is active; threshold 0 disables it (shorts pass too).
if threshold <= 0:
return True
if c["direction"] == "short":
return False
mp = c.get("momentum_percentile")
return mp is not None and mp >= threshold
def rr_ok(c: dict) -> bool:
return c["rr"] >= min_rr
def conf_ok(c: dict) -> bool:
return (c["confidence"] or 0.0) >= min_conf
def neutral_ok(c: dict) -> bool:
return not exclude_neutral or (c.get("action") or "NEUTRAL") != "NEUTRAL"
def tighteners_ok(c: dict) -> bool:
if require_high and (c.get("action") or "") not in HIGH_CONVICTION_ACTIONS:
return False
if exclude_conflicts and (c.get("risk_level") or "") != "Low":
return False
return True
def core_ok(c: dict) -> bool:
return bool(c["meets_core"])
variants: list[tuple[str, list]] = [
("all_floors", [core_ok]),
("no_confidence_floor", [rr_ok, neutral_ok, tighteners_ok]),
("no_rr_floor", [conf_ok, neutral_ok, tighteners_ok]),
("no_neutral_exclusion", [rr_ok, conf_ok, tighteners_ok]),
("momentum_only", []),
]
# Grade each variant under BOTH exit models: the target/stop outcome
# (_bucket_stats) and the hold-to-horizon time exit. A floor that pays under
# the target model may be meaningless once the exit is a fixed hold — the
# hold_* columns are what a time-exit gate decision should read.
hold_days = max(TIME_EXIT_DAYS)
rows: list[dict] = []
for name, checks in variants:
matching = [
c for c in candidates
if momentum_ok(c) and all(check(c) for check in checks)
]
hold = _time_exit_bucket(matching, hold_days)
rows.append({
"variant": name,
**_bucket_stats(matching),
"hold_days": hold_days,
"hold_avg_r": hold["avg_r"],
"hold_net_avg_r": hold["net_avg_r"],
"hold_total_r": hold["total_r"],
})
return rows
async def run_backtest(
db: AsyncSession,
progress_cb: Callable[[int, int, str], None] | None = None,
) -> dict:
"""Replay every ticker and aggregate the Phase-1 reports for the current config."""
config = await get_recommendation_config(db)
activation = await get_activation_config(db)
result = await db.execute(select(Ticker).order_by(Ticker.symbol))
tickers = list(result.scalars().all())
total = len(tickers)
candidates: list[dict] = []
# collected[signal_name][iso_week] -> list of (signal_value, forward_return)
collected: dict = defaultdict(lambda: defaultdict(list))
def _merge(result: tuple[list[dict], dict]) -> None:
cands, series = result
candidates.extend(cands)
for name, weeks in series.items():
for wk, pairs in weeks.items():
collected[name][wk].extend(pairs)
workers = _backtest_worker_count()
ctx = _mp_context() if (workers > 1 and total > 1) else None
loop = asyncio.get_running_loop()
pool = None
if ctx is not None:
try:
pool = ProcessPoolExecutor(max_workers=workers, mp_context=ctx)
except Exception:
logger.exception("Backtest process pool unavailable; falling back to sequential")
if pool is not None:
# Parallel: replay tickers across worker processes — true multi-core, since
# the GIL only serializes work within a single process. Bars are fetched in
# the event loop (ORM-safe) and a bounded batch is fanned out to the pool.
logger.info(json.dumps({
"event": "backtest_parallel", "workers": workers,
"start_method": ctx.get_start_method(),
}))
chunk = workers * 2
done = 0
with pool:
for start in range(0, total, chunk):
batch = tickers[start : start + chunk]
futures = []
for ticker in batch:
try:
columns = await _fetch_columns(db, ticker.symbol)
except Exception:
logger.exception("Backtest fetch failed for %s", ticker.symbol)
continue
if columns is not None:
futures.append(loop.run_in_executor(
pool, _replay_and_signals, ticker.symbol, columns, config, activation
))
for result in await asyncio.gather(*futures, return_exceptions=True):
if isinstance(result, Exception):
logger.error(json.dumps({"event": "backtest_worker_error", "message": str(result)}))
else:
_merge(result)
done += len(batch)
if progress_cb is not None:
progress_cb(min(done, total), total, "")
else:
# Sequential fallback (Windows / 1 worker): run each replay in a worker
# thread so the event loop — and the API server — stays responsive.
for index, ticker in enumerate(tickers):
if progress_cb is not None:
progress_cb(index, total, ticker.symbol)
try:
columns = await _fetch_columns(db, ticker.symbol)
if columns is not None:
_merge(await asyncio.to_thread(
_replay_and_signals, ticker.symbol, columns, config, activation
))
except Exception:
logger.exception("Backtest replay failed for %s", ticker.symbol)
if progress_cb is not None and total:
progress_cb(total, total, "")
# Cross-sectional momentum: rank every week's universe, then "qualified" means
# floors + top ``min_momentum_percentile`` by 12-1 momentum.
_assign_momentum_percentiles(candidates)
current_min_pct = float(activation.get("min_momentum_percentile", 80.0))
for c in candidates:
c["qualified"] = _momentum_qualifies(c, current_min_pct)
qualified = [c for c in candidates if c["qualified"]]
longs = [c for c in qualified if c["direction"] == "long"]
shorts = [c for c in qualified if c["direction"] == "short"]
# Threshold sweep: re-apply the momentum gate at several percentile cutoffs
# (floors held fixed) so the trade-off between how many setups qualify and
# their expectancy is visible without re-replaying. 0 = floors only.
sweep = []
for threshold in (90.0, 80.0, 70.0, 60.0, 50.0, 0.0):
cands = [c for c in candidates if _momentum_qualifies(c, threshold)]
sweep.append({"min_momentum_percentile": threshold, **_bucket_stats(cands)})
return {
"generated_at": datetime.now(timezone.utc).isoformat(),
"tickers": total,
"candidates": len(candidates),
"qualified": len(qualified),
"params": {
"step_days": STEP_DAYS,
"horizon_days": HORIZON,
"min_lookback": MIN_LOOKBACK,
"cost_per_side_pct": round(COST_PER_SIDE * 100, 3),
},
"activation": activation,
"overall_qualified": _bucket_stats(qualified),
"overall_all": _bucket_stats(candidates),
"by_direction": {
"long": _bucket_stats(longs),
"short": _bucket_stats(shorts),
},
"min_momentum_percentile": current_min_pct,
"sweep": sweep,
"gate_ablation": _gate_ablation(candidates, activation, current_min_pct),
"gate_ablation_note": (
"Each row re-qualifies the same candidates at the current momentum "
f"cutoff ({current_min_pct:.0f}) with one floor removed (long-only "
"while the momentum gate is active). If dropping a floor doesn't "
"hurt net expectancy, that floor isn't pulling its weight. The Hold "
"columns grade the same variants under the hold-to-horizon time exit "
"instead of the S/R target — the view that matters if the exit "
"policy moves to a fixed hold."
),
"take_profit_sweep": [_take_profit_bucket(qualified, tp) for tp in TP_LEVELS],
"trailing_sweep": [_trailing_bucket(qualified, round(f * 100)) for f in TRAIL_LEVELS],
"time_exit_sweep": [_time_exit_bucket(qualified, n) for n in TIME_EXIT_DAYS],
"calibration": _calibration(candidates),
"signal_eval": _signal_evaluation(collected),
"signal_eval_note": (
"Cross-sectional rank-IC of price-only signals vs the forward "
f"{HORIZON}-day return (min {MIN_CROSS_SECTION} names/window). |IC| ≳ "
"0.03 with a consistent sign is a real (if small) edge; near 0 means "
"ranking on it sorts nothing. Momentum factors and high_52w are expected "
"positive; reversal_1m and vol_6m expected negative (mean-reversion / "
"low-vol anomaly). IC is measured on non-overlapping windows; signals "
f"with fewer than {MIN_RELIABLE_PERIODS} independent windows are flagged "
"unreliable (too few regimes — deepen history with the Data Backfill job)."
),
"note": (
"Sentiment & fundamentals held neutral (no point-in-time history). "
"~6 months ≈ one market regime — treat as directional, not gospel."
),
}
async def run_and_store(
db: AsyncSession,
progress_cb: Callable[[int, int, str], None] | None = None,
) -> dict:
"""Run the backtest and cache the report in a SystemSetting. Job entrypoint."""
report = await run_backtest(db, progress_cb)
await update_setting(db, KEY_REPORT, json.dumps(report))
return report
async def get_backtest_report(db: AsyncSession) -> dict | None:
"""Return the last cached backtest report, or None if never run."""
setting = await settings_store.get_setting(db, KEY_REPORT)
if setting is None:
return None
try:
return json.loads(setting.value)
except (TypeError, ValueError):
return None