1844 lines
74 KiB
Python
1844 lines
74 KiB
Python
"""Historical backtest (Phase 1): replay the price-derived engine over stored
OHLCV and measure how the CURRENT config would have performed.

For each ticker we step through history (weekly), and at each as-of date D we
rebuild the setup using only bars ≤ D (no lookahead), then walk the actual bars
after D to record the realized outcome. The report contains:

- hit rate / expectancy of qualified setups vs the all-setups control group,
  gross and net of costs, with robustness stats (median, profit factor,
  expectancy without the top winners)
- the momentum-percentile sweep and the gate ablation (each floor removed in
  turn, graded under both the target and the hold-to-horizon exit)
- the time-exit sweep (hold N days with the initial stop)
- cross-sectional factor rank-IC ("signal edge")
- a capital-constrained portfolio simulation (equity curve → CAGR, drawdown,
  Sharpe, SPY comparison)
- a data-driven recommendation derived from this report's numbers

Limitation: sentiment and fundamentals have no point-in-time history, so they
are held neutral here — this calibrates the price/S-R machinery only.
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
import math
|
||
import multiprocessing
|
||
import os
|
||
import statistics
|
||
from collections import defaultdict
|
||
from collections.abc import Callable
|
||
from concurrent.futures import ProcessPoolExecutor
|
||
from datetime import date, datetime, timezone
|
||
from types import SimpleNamespace
|
||
from typing import Any
|
||
|
||
from sqlalchemy import select
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from app.config import settings
|
||
from app.models.ticker import Ticker
|
||
from app.services import settings_store
|
||
from app.services.admin_service import get_activation_config, update_setting
|
||
from app.services.indicator_service import _extract_ohlcv, compute_atr
|
||
from app.services.outcome_service import (
|
||
OUTCOME_AMBIGUOUS,
|
||
OUTCOME_STOP_HIT,
|
||
OUTCOME_TARGET_HIT,
|
||
Bar,
|
||
evaluate_setup_against_bars,
|
||
)
|
||
from app.services.price_service import query_ohlcv
|
||
from app.services.qualification import (
|
||
HIGH_CONVICTION_ACTIONS,
|
||
_action_direction,
|
||
best_target_probability,
|
||
setup_qualifies,
|
||
)
|
||
from app.services.recommendation_service import (
|
||
_choose_recommended_action,
|
||
_classify_by_probability,
|
||
_risk_level_from_conflicts,
|
||
_select_primary_target,
|
||
_zone_representative_levels,
|
||
direction_analyzer,
|
||
get_recommendation_config,
|
||
probability_estimator,
|
||
signal_conflict_detector,
|
||
target_generator,
|
||
)
|
||
from app.services.scoring_service import (
|
||
compute_momentum_from_closes,
|
||
compute_technical_from_arrays,
|
||
)
|
||
from app.services.sr_service import detect_sr_levels
|
||
|
||
logger = logging.getLogger(__name__)

# Storage key for the persisted backtest report.
KEY_REPORT = "backtest_report"

# Replay geometry, measured in trading bars.
STEP_DAYS = 5  # weekly as-of cadence (≈ 5 trading days)
MIN_LOOKBACK = 60  # bars required before an as-of date (the EMA cross needs 51)
HORIZON = 30  # bars an outcome has to resolve in (matches the evaluator)
ATR_MULTIPLIER = 1.5  # initial stop distance, in ATRs

# Cross-sectional signal evaluation (factor IC).
#
# Every candidate signal is a point-in-time number derived from closes alone —
# sentiment and fundamentals have no history here. Signals are sampled once per
# ISO week and graded by how their cross-sectional rank correlates with the
# forward HORIZON-day return across the universe: does ranking stocks by the
# signal separate the next period's winners from its losers? The per-setup
# hit-rate report can't answer that — it grades a target/stop structure built
# on top of a signal, not the signal's own predictive power.
MIN_CROSS_SECTION = 20  # fewest tickers a week needs before it is scored
MIN_RELIABLE_PERIODS = 12  # non-overlapping windows needed before an IC is trusted

# Names of the momentum-percentile fields (presumably the ``percentile_key``
# values handed to _assign_signal_percentiles — confirm at the call sites).
PRODUCTION_PERCENTILE_KEY = "activation_momentum_percentile"
RAW_PERCENTILE_KEY = "momentum_percentile"
RESIDUAL_PERCENTILE_KEY = "residual_momentum_percentile"
|
||
|
||
|
||
def _wrap_levels(level_dicts: list[dict]) -> list[Any]:
|
||
return [
|
||
SimpleNamespace(
|
||
id=i,
|
||
price_level=float(d["price_level"]),
|
||
type=d["type"],
|
||
strength=int(d["strength"]),
|
||
)
|
||
for i, d in enumerate(level_dicts)
|
||
]
|
||
|
||
|
||
def _window_setups(
    window_records: list,
    config: dict,
    activation: dict,
) -> list[dict]:
    """Reconstruct the trade setup(s) as of the last bar of ``window_records``.

    Only bars inside the window are consulted, so nothing after the as-of date
    leaks in. Returns one dict per direction ("long"/"short") that produced a
    primary target. Returns an empty list when:

    - the window is shorter than MIN_LOOKBACK;
    - the entry close is non-positive;
    - the ATR can't be computed or is non-positive;
    - no S/R levels are found.

    ``meets_core`` says whether the setup clears every activation gate except
    the cross-sectional momentum percentile. That gate needs the whole week's
    cross-section, so the caller finalizes qualification later.
    """
    if len(window_records) < MIN_LOOKBACK:
        return []

    _, highs, lows, closes, volumes = _extract_ohlcv(window_records)
    entry = closes[-1]
    if entry <= 0:
        return []

    # 12-1 month momentum: the return over the prior year, stopping one month
    # ago. The universe is ranked on it. It needs 253 closes and is None
    # otherwise; a None momentum can't pass the momentum gate.
    mom_12_1 = None
    if len(closes) >= 253 and closes[-253] > 0:
        mom_12_1 = closes[-22] / closes[-253] - 1.0

    try:
        atr = compute_atr(highs, lows, closes)["atr"]
    except Exception:
        # No ATR means no stop can be placed: skip this as-of date.
        return []
    if atr <= 0:
        return []

    sr_levels = _wrap_levels(detect_sr_levels(highs, lows, closes, volumes))
    if not sr_levels:
        return []

    # Any falsy score (None — and, note, also an exact 0.0) falls back to a neutral 50.
    technical = compute_technical_from_arrays(highs, lows, closes, volumes)[0] or 50.0
    momentum = compute_momentum_from_closes(closes)[0] or 50.0
    dim_scores = {"technical": technical, "momentum": momentum}

    conflicts = signal_conflict_detector.detect_conflicts(dim_scores, None, config)
    confidences = {
        side: direction_analyzer.calculate_confidence(side, dim_scores, None, conflicts)
        for side in ("long", "short")
    }

    # Pass 1 — per direction: stop, targets with probabilities, primary target.
    per_dir: dict[str, dict] = {}
    for side in ("long", "short"):
        offset = atr * ATR_MULTIPLIER
        stop = entry - offset if side == "long" else entry + offset
        zone_levels = _zone_representative_levels(sr_levels, entry)
        targets = target_generator.generate_targets(side, entry, stop, zone_levels, atr)
        if not targets:
            continue
        for target in targets:
            target["probability"] = probability_estimator.estimate_probability(
                target, dim_scores, None, side, config
            )
            target["classification"] = _classify_by_probability(target["probability"])
        primary = _select_primary_target(targets)
        if primary is None:
            continue
        # Mark the primary target so qualification's EV uses its probability,
        # as production's enhance_trade_setup does.
        for target in targets:
            target["is_primary"] = target is primary
        per_dir[side] = {"stop": stop, "targets": targets, "primary": primary}

    available = set(per_dir)
    if not available:
        return []

    action = _choose_recommended_action(
        confidences["long"], confidences["short"], config, available
    )

    # Pass 2 — per direction: risk level, core qualification, output row.
    setups: list[dict] = []
    for side, bundle in per_dir.items():
        targets = bundle["targets"]
        primary = bundle["primary"]
        stop = bundle["stop"]

        side_conflicts = list(conflicts)
        if len(targets) < 3:
            side_conflicts.append("target-availability: Fewer than 3 valid S/R targets available")
        risk_level = _risk_level_from_conflicts(side_conflicts)

        rr = float(primary["rr_ratio"])
        target_price = float(primary["price"])
        setup_ns = SimpleNamespace(
            rr_ratio=rr,
            confidence_score=confidences[side],
            recommended_action=action,
            risk_level=risk_level,
            targets=targets,
            direction=side,
            target=target_price,
            stop_loss=stop,
            entry_price=entry,
        )
        # The momentum-percentile floor is neutralized here; run_backtest ranks
        # momentum across the week and finalizes `qualified`.
        meets_core = setup_qualifies(setup_ns, {**activation, "min_momentum_percentile": 0.0})
        best_prob = best_target_probability(setup_ns)

        setups.append({
            "direction": side,
            "entry": entry,
            "stop": stop,
            "target": target_price,
            "rr": rr,
            "confidence": confidences[side],
            "primary_prob": float(primary["probability"]),
            "best_prob": best_prob,
            "momentum": mom_12_1,
            "meets_core": meets_core,
            "action": action,
            "risk_level": risk_level,
        })
    return setups
|
||
|
||
|
||
def _stop_fill_r(direction: str, entry: float, stop: float, bar) -> float:
|
||
"""Realized R when the stop is hit on ``bar``: filled at the stop, or at the
|
||
bar's open when price gapped through it — so a gap can lose more than −1R,
|
||
matching real fills. Targets are never filled better than their level, so
|
||
gap modeling only ever makes results more conservative."""
|
||
risk = abs(entry - stop)
|
||
if risk <= 0 or entry <= 0:
|
||
return -1.0
|
||
if direction == "long":
|
||
fill = min(stop, bar.open)
|
||
return (fill - entry) / risk
|
||
fill = max(stop, bar.open)
|
||
return (entry - fill) / risk
|
||
|
||
|
||
def _risk_and_stop_day(
|
||
direction: str, entry: float, stop: float, forward: list, horizon: int
|
||
) -> tuple[float, int | None]:
|
||
"""``(risk_pct, stop_day)`` from the bars after detection: the 1R stop
|
||
distance as a fraction of entry, and the 1-based trading day the initial
|
||
stop was first pierced within the horizon (None if never). Feeds the cost
|
||
conversion and the time-exit hold accounting."""
|
||
long = direction == "long"
|
||
risk_pct = abs(entry - stop) / entry if entry else 0.0
|
||
for i, r in enumerate(forward[:horizon]):
|
||
if (r.low <= stop) if long else (r.high >= stop):
|
||
return risk_pct, i + 1
|
||
return risk_pct, None
|
||
|
||
|
||
def _time_exits(
|
||
direction: str, entry: float, stop: float, forward: list, horizons
|
||
) -> dict[int, float]:
|
||
"""Realized R per hold-N-days exit, in one pass over the post-entry bars.
|
||
|
||
The initial stop stays active (fill at the stop level → −1R); otherwise the
|
||
trade exits at the day-N close (the last available close when history ends
|
||
early). No target, no trailing — the classic momentum implementation: buy,
|
||
hold ~N days, re-rank. Conservative bar logic: a bar that pierces the stop
|
||
is a loss before that bar's close counts.
|
||
"""
|
||
long = direction == "long"
|
||
risk = abs(entry - stop) / entry if entry else 0.0
|
||
if risk <= 0:
|
||
return {int(n): 0.0 for n in horizons}
|
||
bars = forward[: max(int(n) for n in horizons)]
|
||
if not bars:
|
||
return {int(n): 0.0 for n in horizons}
|
||
|
||
stop_day: int | None = None # 1-based trading day the stop was pierced
|
||
stop_r = -1.0
|
||
closes: list[float] = []
|
||
for i, r in enumerate(bars):
|
||
if (r.low <= stop) if long else (r.high >= stop):
|
||
stop_day = i + 1
|
||
stop_r = _stop_fill_r(direction, entry, stop, r)
|
||
break
|
||
closes.append(r.close)
|
||
|
||
result: dict[int, float] = {}
|
||
for h in horizons:
|
||
n = int(h)
|
||
if stop_day is not None and stop_day <= n:
|
||
result[n] = stop_r
|
||
else:
|
||
# closes can't be empty here: an empty closes means the stop hit on
|
||
# day 1, which the branch above catches for every n >= 1.
|
||
c = closes[min(n, len(closes)) - 1]
|
||
move = (c - entry) / entry if long else (entry - c) / entry
|
||
result[n] = move / risk
|
||
return result
|
||
|
||
|
||
def _replay_ticker(
    symbol: str,
    records: list,
    config: dict,
    activation: dict,
    benchmark_closes: dict[date, float] | None = None,
) -> list[dict]:
    """Walk one ticker's history weekly, building setups and their realized outcomes.

    An as-of bar occurs every STEP_DAYS bars, starting once MIN_LOOKBACK bars
    exist and only while at least HORIZON bars remain after it. At each one,
    the setup is rebuilt from the bars up to and including that bar, then
    graded against the bars that follow it.

    Args:
        symbol: ticker symbol, copied into every candidate.
        records: the ticker's bars, oldest first, each with ``date``/``open``/
            ``high``/``low``/``close``/``volume`` attributes.
        config: recommendation config handed to the setup builder.
        activation: activation-gate config handed to the setup builder.
        benchmark_closes: benchmark close by date, used for residual momentum.
            None disables that signal.

    Returns:
        One candidate dict per setup the evaluator resolved. Each carries the
        gate fields, the realized R under the target/stop exit, the hold
        length, the stop day, the risk fraction, and the per-hold-length
        time-exit R (``time_r``).
    """
    candidates: list[dict] = []
    n = len(records)
    if n < MIN_LOOKBACK + HORIZON:
        return candidates

    # Built once for the whole history. _residual_momentum_12_1 only reads
    # indices <= i, so passing the full arrays with index i is equivalent to
    # passing the window's arrays — without an O(n) rebuild at every step.
    closes = [float(r.close) for r in records]
    dates = [r.date for r in records]

    # No grader looks past this many forward bars. The evaluator, the hold-day
    # lookup and the stop scan stop at HORIZON; the time-exit sweep stops at
    # its longest hold. Capping the slice avoids converting the entire rest of
    # history into Bars at every step, which is quadratic in history length.
    forward_span = max([HORIZON, *(int(d) for d in TIME_EXIT_DAYS)])

    for i in range(MIN_LOOKBACK - 1, n - HORIZON, STEP_DAYS):
        window = records[: i + 1]
        forward = records[i + 1 : i + 1 + forward_span]
        forward_bars = [Bar(date=r.date, high=r.high, low=r.low) for r in forward]
        residual_momentum = _residual_momentum_12_1(dates, closes, i, benchmark_closes)

        for s in _window_setups(window, config, activation):
            outcome, outcome_date = evaluate_setup_against_bars(
                s["direction"], s["stop"], s["target"], forward_bars, HORIZON
            )
            if outcome is None:
                continue
            # Trading days from detection to resolution (expired = full horizon).
            hold_days = next(
                (idx + 1 for idx, r in enumerate(forward[:HORIZON]) if r.date == outcome_date),
                min(HORIZON, len(forward)),
            )
            target_hit = outcome == OUTCOME_TARGET_HIT
            if target_hit:
                # Targets fill at their level, never better.
                realized_r = s["rr"]
            elif outcome in (OUTCOME_STOP_HIT, OUTCOME_AMBIGUOUS):
                # Fill at the stop, or at the open when the bar gapped through it.
                realized_r = _stop_fill_r(
                    s["direction"], s["entry"], s["stop"], forward[hold_days - 1]
                )
            else:  # expired
                realized_r = 0.0
            risk_pct, stop_day = _risk_and_stop_day(
                s["direction"], s["entry"], s["stop"], forward, HORIZON
            )
            time_r = _time_exits(
                s["direction"], s["entry"], s["stop"], forward, TIME_EXIT_DAYS
            )
            iso = records[i].date.isocalendar()
            candidates.append({
                "symbol": symbol,
                "date": records[i].date.isoformat(),
                "iso_week": (iso[0], iso[1]),
                "direction": s["direction"],
                "entry": s["entry"],
                "stop": s["stop"],
                "target": s["target"],
                "rr": s["rr"],
                "confidence": s["confidence"],
                "primary_prob": s["primary_prob"],
                "best_prob": s["best_prob"],
                "momentum": s["momentum"],
                "residual_momentum": residual_momentum,
                "meets_core": s["meets_core"],
                # Gate fields the ablation recomputes floors from — without them
                # every candidate looks NEUTRAL and the ablation rows collapse.
                "action": s["action"],
                "risk_level": s["risk_level"],
                "outcome": outcome,
                "target_hit": target_hit,
                "realized_r": realized_r,
                "hold_days": hold_days,
                "stop_day": stop_day,
                "risk_pct": risk_pct,
                "time_r": time_r,
            })
    return candidates
|
||
|
||
|
||
def _bucket_stats(cands: list[dict]) -> dict:
    """Summarize a group of candidates under the target/stop exit.

    - wins are candidates whose target was hit; losses are stop hits or
      ambiguous bars; expired is any other outcome. ``hit_rate`` is
      wins / (wins + losses), so expired trades don't count against it.
    - gross R (avg / total / best / worst) and net R after the round-trip
      cost from ``_cost_r``.
    - ``avg_hold_days`` over candidates with a non-zero hold, and
      ``net_r_per_day``: net expectancy per trading day the capital is tied up.
    - plus the ``_robustness_stats`` of the net R series.

    A statistic is None when its denominator is empty or zero.
    """
    loss_outcomes = (OUTCOME_STOP_HIT, OUTCOME_AMBIGUOUS)
    resolved_outcomes = (OUTCOME_TARGET_HIT, *loss_outcomes)

    wins = losses = expired = 0
    for c in cands:
        outcome = c["outcome"]
        if c["target_hit"]:
            wins += 1
        if outcome in loss_outcomes:
            losses += 1
        if outcome not in resolved_outcomes:
            expired += 1
    decided = wins + losses

    gross = [c["realized_r"] for c in cands]
    net = [c["realized_r"] - _cost_r(c) for c in cands]
    holds = [c["hold_days"] for c in cands if c.get("hold_days")]
    avg_hold = sum(holds) / len(holds) if holds else None
    net_avg = sum(net) / len(net) if net else None

    return {
        "total": len(cands),
        "wins": wins,
        "losses": losses,
        "expired": expired,
        "hit_rate": round(wins / decided * 100, 1) if decided else None,
        "avg_r": round(sum(gross) / len(gross), 3) if gross else None,
        "total_r": round(sum(gross), 2) if gross else None,
        "net_avg_r": None if net_avg is None else round(net_avg, 3),
        "net_total_r": round(sum(net), 2) if net else None,
        "best_r": round(max(gross), 2) if gross else None,
        "worst_r": round(min(gross), 2) if gross else None,
        "avg_hold_days": None if avg_hold is None else round(avg_hold, 1),
        # Capital efficiency: net expectancy per trading day the capital is tied up.
        "net_r_per_day": (
            round(net_avg / avg_hold, 4) if net_avg is not None and avg_hold else None
        ),
        **_robustness_stats(net),
    }
|
||
|
||
|
||
def _robustness_stats(net_rs: list[float]) -> dict:
|
||
"""Distribution-shape stats: the median (typical) trade, gross wins vs
|
||
losses, and the expectancy with the top 5% of winners removed — the direct
|
||
test of whether the edge depends on a handful of outliers."""
|
||
if not net_rs:
|
||
return {"median_net_r": None, "profit_factor": None, "net_avg_r_ex_top5": None}
|
||
gains = sum(r for r in net_rs if r > 0)
|
||
losses_abs = -sum(r for r in net_rs if r < 0)
|
||
trimmed = sorted(net_rs, reverse=True)[math.ceil(len(net_rs) * 0.05):]
|
||
return {
|
||
"median_net_r": round(statistics.median(net_rs), 3),
|
||
"profit_factor": round(gains / losses_abs, 2) if losses_abs > 0 else None,
|
||
"net_avg_r_ex_top5": (
|
||
round(sum(trimmed) / len(trimmed), 3) if trimmed else None
|
||
),
|
||
}
|
||
|
||
|
||
# The exit-decision surface is the hold-N-days sweep below. The fixed
# take-profit and trailing-stop sweeps were retired 2026-07: the TP sweep never
# found an interior optimum (momentum's edge sits in the right tail), and wide
# trails converged to the hold-to-horizon exit.

# Hold lengths (trading days) for the time-exit sweep. The initial stop stays
# active; otherwise the trade exits at the day-N close. This is the classic
# cross-sectional momentum implementation: buy, hold ~a month, re-rank.
TIME_EXIT_DAYS = (5, 10, 21, 30)

# Assumed one-way transaction cost (commission + slippage) as a fraction of
# notional. Aggregates show gross and net side by side. Net charges a full
# round trip, converted to R through the setup's stop distance (the 1R unit).
COST_PER_SIDE = 0.001
|
||
|
||
|
||
def _cost_r(cand: dict) -> float:
|
||
"""Round-trip transaction cost in R units: two sides over the 1R stop
|
||
distance. 0 when the candidate carries no usable risk_pct."""
|
||
risk = cand.get("risk_pct") or 0.0
|
||
return (2.0 * COST_PER_SIDE) / risk if risk > 0 else 0.0
|
||
|
||
|
||
def _time_exit_bucket(cands: list[dict], hold_days: int) -> dict:
    """Summarize candidates under the hold-``hold_days`` exit.

    The initial stop is active; otherwise the trade exits at the day-N close.
    Each candidate's realized R for this hold length is read from
    ``time_r[hold_days]``, and candidates without it are skipped. A "win" is
    an exit with R > 0. The realized hold is the full N days unless the stop
    ended the trade earlier (``stop_day``).

    Reports the same gross/net/hold statistics as ``_bucket_stats``, plus the
    robustness stats of the net series.
    """
    gross: list[float] = []
    net: list[float] = []
    holds: list[int] = []
    for c in cands:
        r = c.get("time_r", {}).get(hold_days)
        if r is None:
            continue
        gross.append(r)
        net.append(r - _cost_r(c))
        holds.append(min(hold_days, c.get("stop_day") or hold_days))

    total = len(gross)
    wins = sum(1 for r in gross if r > 0)
    avg_hold = sum(holds) / total if total else None
    net_avg = sum(net) / total if total else None

    return {
        "hold_days": hold_days,
        "total": total,
        "wins": wins,
        "win_rate": round(wins / total * 100, 1) if total else None,
        "avg_r": round(sum(gross) / total, 3) if total else None,
        "total_r": round(sum(gross), 2) if total else None,
        "net_avg_r": None if net_avg is None else round(net_avg, 3),
        "net_total_r": round(sum(net), 2) if total else None,
        "best_r": round(max(gross), 2) if gross else None,
        "worst_r": round(min(gross), 2) if gross else None,
        "avg_hold_days": None if avg_hold is None else round(avg_hold, 1),
        "net_r_per_day": (
            round(net_avg / avg_hold, 4) if net_avg is not None and avg_hold else None
        ),
        **_robustness_stats(net),
    }
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Cross-sectional signal evaluation (factor information-coefficient)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _weekly_asof_indices(records: list) -> list[int]:
|
||
"""Index of the last bar in each ISO week — the weekly rebalance as-of bars.
|
||
|
||
Keying on the calendar week (not the raw bar index) makes every ticker's
|
||
as-of dates line up, so the cross-section on a given week is comparable.
|
||
"""
|
||
last_by_week: dict[tuple[int, int], int] = {}
|
||
for idx, r in enumerate(records):
|
||
iso = r.date.isocalendar()
|
||
last_by_week[(iso[0], iso[1])] = idx
|
||
return sorted(last_by_week.values())
|
||
|
||
|
||
def _residual_momentum_12_1(
|
||
dates: list[date],
|
||
closes: list[float],
|
||
i: int,
|
||
benchmark_closes: dict[date, float] | None,
|
||
) -> float | None:
|
||
"""12-1 momentum after removing the stock's linear benchmark exposure.
|
||
|
||
This is a practical beta-adjusted residual momentum approximation: estimate
|
||
beta from daily stock/benchmark returns over the same formation window
|
||
(12 months ending 1 month ago), then rank on cumulative stock return minus
|
||
beta * benchmark return. We deliberately do not subtract a fitted intercept:
|
||
with an intercept estimated over the same window, the arithmetic residuals
|
||
sum to ~zero by construction, which would destroy the signal.
|
||
"""
|
||
if not benchmark_closes or i - 252 < 0:
|
||
return None
|
||
|
||
stock_rets: list[float] = []
|
||
market_rets: list[float] = []
|
||
# Same daily intervals as mom_12_1: close[i-252] -> close[i-21].
|
||
for k in range(i - 251, i - 20):
|
||
prev_close = closes[k - 1]
|
||
bench_prev = benchmark_closes.get(dates[k - 1])
|
||
bench_cur = benchmark_closes.get(dates[k])
|
||
if prev_close <= 0 or bench_prev is None or bench_cur is None or bench_prev <= 0:
|
||
continue
|
||
stock_rets.append(closes[k] / prev_close - 1.0)
|
||
market_rets.append(bench_cur / bench_prev - 1.0)
|
||
|
||
if len(stock_rets) < 100:
|
||
return None
|
||
mean_market = sum(market_rets) / len(market_rets)
|
||
mean_stock = sum(stock_rets) / len(stock_rets)
|
||
var_market = sum((x - mean_market) ** 2 for x in market_rets)
|
||
if var_market <= 0:
|
||
return None
|
||
cov = sum((stock_rets[k] - mean_stock) * (market_rets[k] - mean_market) for k in range(len(stock_rets)))
|
||
beta = cov / var_market
|
||
return sum(stock_rets[k] - beta * market_rets[k] for k in range(len(stock_rets)))
|
||
|
||
|
||
def _signal_values(
|
||
dates: list[date],
|
||
closes: list[float],
|
||
highs: list[float],
|
||
i: int,
|
||
benchmark_closes: dict[date, float] | None = None,
|
||
) -> dict[str, float]:
|
||
"""Point-in-time candidate signals at as-of index ``i`` (price-only).
|
||
|
||
Momentum factors follow the standard "skip the last month" convention
|
||
(return up to ~1 month ago) to avoid the short-term reversal effect, which
|
||
``reversal_1m`` isolates on purpose — we expect its IC to be negative if the
|
||
universe mean-reverts. ``trend_200`` is price vs its 200-bar SMA. ``high_52w``
|
||
is closeness to the trailing 52-week high (George/Hwang anchoring effect:
|
||
higher = nearer the high, expect positive IC). ``vol_6m`` is 126-day realized
|
||
volatility (expect negative IC if the low-volatility anomaly holds).
|
||
"""
|
||
out: dict[str, float] = {}
|
||
if i - 252 >= 0 and closes[i - 252] > 0:
|
||
out["mom_12_1"] = closes[i - 21] / closes[i - 252] - 1.0
|
||
residual = _residual_momentum_12_1(dates, closes, i, benchmark_closes)
|
||
if residual is not None:
|
||
out["mom_12_1_resid"] = residual
|
||
if i - 126 >= 0 and closes[i - 126] > 0:
|
||
out["mom_6_1"] = closes[i - 21] / closes[i - 126] - 1.0
|
||
if i - 63 >= 0 and closes[i - 63] > 0:
|
||
out["mom_3_1"] = closes[i - 21] / closes[i - 63] - 1.0
|
||
if i - 21 >= 0 and closes[i - 21] > 0:
|
||
out["reversal_1m"] = closes[i] / closes[i - 21] - 1.0
|
||
if i - 199 >= 0:
|
||
sma = sum(closes[i - 199 : i + 1]) / 200.0
|
||
if sma > 0:
|
||
out["trend_200"] = closes[i] / sma - 1.0
|
||
if i - 251 >= 0:
|
||
high_52w = max(highs[i - 251 : i + 1])
|
||
if high_52w > 0:
|
||
out["high_52w"] = closes[i] / high_52w
|
||
if i - 126 >= 0:
|
||
rets = [
|
||
closes[k] / closes[k - 1] - 1.0
|
||
for k in range(i - 125, i + 1)
|
||
if closes[k - 1] > 0
|
||
]
|
||
if len(rets) >= 2:
|
||
mean = sum(rets) / len(rets)
|
||
var = sum((x - mean) ** 2 for x in rets) / (len(rets) - 1)
|
||
out["vol_6m"] = math.sqrt(var)
|
||
return out
|
||
|
||
|
||
def _accumulate_signal_series(
    records: list,
    collected: dict,
    benchmark_closes: dict[date, float] | None = None,
) -> None:
    """Append (signal value, forward return) pairs for every weekly as-of bar.

    Pairs are appended to ``collected[signal_name][(iso_year, iso_week)]``.
    ``collected`` must auto-create missing levels, e.g. a defaultdict of
    defaultdict(list). The forward return is close-to-close over HORIZON
    trading bars. As-of bars without a full forward window or with a
    non-positive close are skipped, and a ticker shorter than HORIZON + 21
    bars contributes nothing. Mutates ``collected`` in place.
    """
    total = len(records)
    if total < HORIZON + 21:
        return

    closes = [float(r.close) for r in records]
    highs = [float(r.high) for r in records]
    dates = [r.date for r in records]

    for i in _weekly_asof_indices(records):
        exit_index = i + HORIZON
        if exit_index >= total or closes[i] <= 0:
            continue
        forward_return = closes[exit_index] / closes[i] - 1.0
        year, week, _ = records[i].date.isocalendar()
        signals = _signal_values(dates, closes, highs, i, benchmark_closes)
        for name, value in signals.items():
            collected[name][(year, week)].append((value, forward_return))
|
||
|
||
|
||
def _rank(xs: list[float]) -> list[float]:
|
||
"""Average (tie-corrected) ranks, 1-based."""
|
||
order = sorted(range(len(xs)), key=lambda k: xs[k])
|
||
ranks = [0.0] * len(xs)
|
||
i = 0
|
||
while i < len(xs):
|
||
j = i
|
||
while j + 1 < len(xs) and xs[order[j + 1]] == xs[order[i]]:
|
||
j += 1
|
||
avg_rank = (i + j) / 2.0 + 1.0
|
||
for k in range(i, j + 1):
|
||
ranks[order[k]] = avg_rank
|
||
i = j + 1
|
||
return ranks
|
||
|
||
|
||
def _pearson(a: list[float], b: list[float]) -> float | None:
|
||
n = len(a)
|
||
if n < 3:
|
||
return None
|
||
ma, mb = sum(a) / n, sum(b) / n
|
||
va = sum((x - ma) ** 2 for x in a)
|
||
vb = sum((y - mb) ** 2 for y in b)
|
||
if va <= 0 or vb <= 0:
|
||
return None
|
||
cov = sum((a[k] - ma) * (b[k] - mb) for k in range(n))
|
||
return cov / math.sqrt(va * vb)
|
||
|
||
|
||
def _spearman(xs: list[float], ys: list[float]) -> float | None:
|
||
"""Rank correlation = Pearson on the ranks. None if too few/degenerate."""
|
||
if len(xs) < 3:
|
||
return None
|
||
return _pearson(_rank(xs), _rank(ys))
|
||
|
||
|
||
def _quintile_spread(pairs: list[tuple[float, float]]) -> float | None:
|
||
"""Mean forward return of the top signal-quintile minus the bottom quintile."""
|
||
n = len(pairs)
|
||
if n < 10:
|
||
return None
|
||
ordered = sorted(pairs, key=lambda p: p[0])
|
||
k = n // 5
|
||
top = ordered[-k:]
|
||
bottom = ordered[:k]
|
||
return sum(p[1] for p in top) / k - sum(p[1] for p in bottom) / k
|
||
|
||
|
||
def _week_ordinal(week_key: tuple[int, int]) -> int:
|
||
"""Monotonic absolute week number from an (ISO year, ISO week) key."""
|
||
year, week = week_key
|
||
return year * 53 + week
|
||
|
||
|
||
def _nonoverlapping_weeks(
    week_keys: list[tuple[int, int]], stride: int
) -> list[tuple[int, int]]:
    """Thin week keys so consecutive kept weeks are at least ``stride`` weeks apart.

    The selection is greedy and earliest-first, so the kept weeks' forward
    windows don't overlap. This removes the autocorrelation that would
    otherwise inflate the IC t-stat across adjacent weekly rebalances.
    Returns the kept keys in chronological order.
    """
    kept: list[tuple[int, int]] = []
    last_kept: int | None = None
    for week in sorted(week_keys, key=_week_ordinal):
        ordinal = _week_ordinal(week)
        if last_kept is not None and ordinal - last_kept < stride:
            continue  # still inside the previous kept week's forward window
        kept.append(week)
        last_kept = ordinal
    return kept
|
||
|
||
|
||
def _signal_evaluation(collected: dict) -> list[dict]:
    """Per-signal factor diagnostics: one row per candidate signal, best mean IC first.

        mean_ic               average rank-IC (Spearman of signal vs fwd ret)
        ic_t_stat             mean_ic / stderr — is the IC reliably non-zero?
        ic_positive_pct       share of windows the IC is positive (consistency)
        mean_quintile_spread  avg top-minus-bottom-quintile forward return
        reliable              True once there are >= MIN_RELIABLE_PERIODS windows

    Only weeks with at least MIN_CROSS_SECTION observations are used. IC is
    measured on NON-OVERLAPPING forward windows (weeks thinned to ~HORIZON
    apart), so the t-stat isn't inflated by autocorrelation. A signal with no
    edge lands near IC 0 / spread 0. A signal with too few independent
    windows is flagged unreliable rather than trusted on a lucky handful.
    Signals with no computable IC are omitted.
    """
    # The forward window spans HORIZON trading days ≈ HORIZON / 5 ISO weeks.
    stride = max(1, round(HORIZON / 5))
    summary: list[dict] = []

    for name in sorted(collected):
        weeks_map = collected[name]
        eligible = [wk for wk, recs in weeks_map.items() if len(recs) >= MIN_CROSS_SECTION]

        ics: list[float] = []
        spreads: list[float] = []
        sizes: list[int] = []
        for wk in _nonoverlapping_weeks(eligible, stride):
            recs = weeks_map[wk]
            signal_vals = [rec[0] for rec in recs]
            fwd_rets = [rec[1] for rec in recs]
            ic = _spearman(signal_vals, fwd_rets)
            if ic is not None:
                ics.append(ic)
            spread = _quintile_spread(recs)
            if spread is not None:
                spreads.append(spread)
            sizes.append(len(recs))

        if not ics:
            continue

        periods = len(ics)
        mean_ic = sum(ics) / periods
        if periods > 1:
            std = math.sqrt(sum((x - mean_ic) ** 2 for x in ics) / (periods - 1))
        else:
            std = 0.0
        t_stat = mean_ic / std * math.sqrt(periods) if std > 0 else None

        summary.append({
            "signal": name,
            "weeks": periods,
            "avg_cross_section": round(sum(sizes) / len(sizes), 1) if sizes else None,
            "mean_ic": round(mean_ic, 4),
            "ic_t_stat": None if t_stat is None else round(t_stat, 2),
            "ic_positive_pct": round(sum(1 for x in ics if x > 0) / periods * 100, 1),
            "mean_quintile_spread": round(sum(spreads) / len(spreads), 4) if spreads else None,
            "reliable": periods >= MIN_RELIABLE_PERIODS,
        })

    summary.sort(key=lambda row: row["mean_ic"], reverse=True)
    return summary
|
||
|
||
|
||
def _signal_series(records: list, benchmark_closes: dict[date, float] | None = None) -> dict:
    """Build one ticker's signal/forward-return series as a plain nested dict.

    The shape is ``{signal_name: {(iso_year, iso_week): [(value, fwd_return), ...]}}``.
    The defaultdict scaffolding (whose lambda factory can't be pickled) is
    converted to ordinary dicts, so the result can cross a process boundary.
    """
    nested: dict = defaultdict(lambda: defaultdict(list))
    _accumulate_signal_series(records, nested, benchmark_closes)
    plain: dict = {}
    for name, weeks in nested.items():
        plain[name] = dict(weeks)
    return plain
|
||
|
||
|
||
def _replay_and_signals(
    symbol: str,
    columns: tuple,
    config: dict,
    activation: dict,
    benchmark_closes: dict[date, float] | None = None,
) -> tuple[list[dict], dict]:
    """Run the CPU-bound per-ticker work; safe to submit to a worker process.

    It is a top-level (picklable) function that takes primitive column arrays
    (cheap to pickle): date ordinals, opens, highs, lows, closes, volumes. It
    rebuilds attribute-style bars from them and returns
    ``(candidates, signal_series)``.
    """
    date_ords, opens, highs, lows, closes, volumes = columns
    bars = []
    for ordinal, op, hi, lo, cl, vo in zip(date_ords, opens, highs, lows, closes, volumes):
        bars.append(
            SimpleNamespace(
                date=date.fromordinal(ordinal), open=op, high=hi, low=lo, close=cl, volume=vo
            )
        )
    candidates = _replay_ticker(symbol, bars, config, activation, benchmark_closes)
    series = _signal_series(bars, benchmark_closes)
    return candidates, series
|
||
|
||
|
||
def _backtest_worker_count() -> int:
    """Number of worker processes to replay tickers across (1 = sequential).

    Reads ``settings.backtest_workers`` (default 4). The count is capped at
    cpu_count - 1 so one core stays free for the web server, and is never
    below 1.
    """
    requested = int(getattr(settings, "backtest_workers", 4))
    if requested <= 1:
        return 1
    spare_cores = (os.cpu_count() or 1) - 1
    return max(1, min(requested, spare_cores))
|
||
|
||
|
||
def _offline_snapshot_mode() -> bool:
|
||
return os.getenv("BACKTEST_SNAPSHOT_OFFLINE", "").strip().lower() in {"1", "true", "yes", "on"}
|
||
|
||
|
||
async def _load_benchmark_closes_for_backtest(
    db: AsyncSession, *, days: int | None = None, refresh: bool = True
) -> dict[date, float]:
    """Load benchmark closes by date, optionally refreshing the stored prices first.

    The refresh is skipped when ``refresh`` is False or offline-snapshot mode
    is on. ``days`` is forwarded to the refresh only when given, so the
    refresh's own default applies otherwise.
    """
    from app.services.benchmark_service import load_benchmark_closes, refresh_benchmark_prices

    if refresh and not _offline_snapshot_mode():
        refresh_kwargs = {} if days is None else {"days": days}
        await refresh_benchmark_prices(db, **refresh_kwargs)
    return await load_benchmark_closes(db)
|
||
|
||
|
||
def _mp_context():
    """Pick a multiprocessing context that is safe to use from the asyncio server.

    Contexts are tried in this order:

    1. ``forkserver``: workers are forked from a clean, single-threaded server
       process, which avoids the fork-with-threads deadlock.
    2. ``fork``, when forkserver is not offered.
    3. ``spawn``, only as an explicit opt-in. Both offline snapshot mode
       (``BACKTEST_SNAPSHOT_OFFLINE``) and ``BACKTEST_ALLOW_SPAWN`` must be
       truthy.

    Returns None when none of these applies, e.g. spawn-only platforms such as
    Windows without the opt-in. The caller then falls back to a thread.
    """
    available = multiprocessing.get_all_start_methods()
    preferred = next((name for name in ("forkserver", "fork") if name in available), None)
    if preferred is not None:
        return multiprocessing.get_context(preferred)

    spawn_opted_in = (
        _offline_snapshot_mode()
        and os.getenv("BACKTEST_ALLOW_SPAWN", "").strip().lower() in {"1", "true", "yes", "on"}
    )
    if spawn_opted_in and "spawn" in available:
        return multiprocessing.get_context("spawn")
    return None
|
||
|
||
|
||
async def _fetch_columns(db: AsyncSession, symbol: str) -> tuple | None:
    """Load one ticker's OHLCV rows and flatten them into primitive columns.

    The ORM access happens here, on the event loop. The result is a tuple of
    plain lists (date ordinals, open, high, low, close, volume) that is cheap
    to hand to a worker process. Returns None when the query yields no rows.
    """
    records = await query_ohlcv(db, symbol)
    if not records:
        return None

    def column(extract):
        # Convert one field across every record, column by column.
        return [extract(row) for row in records]

    return (
        column(lambda row: row.date.toordinal()),
        column(lambda row: float(row.open)),
        column(lambda row: float(row.high)),
        column(lambda row: float(row.low)),
        column(lambda row: float(row.close)),
        column(lambda row: int(row.volume)),
    )
|
||
|
||
|
||
def _assign_signal_percentiles(
    candidates: list[dict],
    value_key: str,
    percentile_key: str,
) -> None:
    """Attach a per-ISO-week cross-sectional percentile of ``value_key``.

    Within each ``iso_week``, candidates with a usable value are ranked
    ascending. Each gets ``rank / (n - 1) * 100`` under ``percentile_key``
    (0 = weakest, 100 = strongest); a lone candidate gets 100.

    Candidates without a usable value get None and therefore cannot clear a
    gate based on that signal. A value is unusable when it is missing, None or
    NaN. NaN counts as missing because it compares False against everything,
    which would scramble the sort order of the whole week.

    Mutates ``candidates`` in place.
    """
    by_week: dict = defaultdict(list)
    for c in candidates:
        value = c.get(value_key)
        if value is None or (isinstance(value, float) and math.isnan(value)):
            # Overwrite rather than setdefault, so a stale percentile from an
            # earlier pass cannot survive once the value is missing.
            c[percentile_key] = None
            continue
        by_week[c["iso_week"]].append(c)
    for group in by_week.values():
        ordered = sorted(group, key=lambda c: c[value_key])
        n = len(ordered)
        for rank, c in enumerate(ordered):
            c[percentile_key] = (rank / (n - 1) * 100.0) if n > 1 else 100.0
|
||
|
||
|
||
def _assign_momentum_percentiles(candidates: list[dict]) -> None:
    """Rank candidates by their ticker's raw 12-1 momentum within each ISO week.

    Every candidate receives a 0-100 ``momentum_percentile``, where 100 is the
    highest momentum in that week's universe. A candidate whose ``momentum`` is
    unknown (insufficient lookback) gets None and so can never clear a momentum
    gate. Mutates ``candidates`` in place.
    """
    value_field, rank_field = "momentum", "momentum_percentile"
    _assign_signal_percentiles(candidates, value_field, rank_field)
|
||
|
||
|
||
def _assign_residual_momentum_percentiles(candidates: list[dict]) -> None:
    """Rank candidates by residual 12-1 momentum within each ISO week.

    The percentile is stored under ``RESIDUAL_PERCENTILE_KEY``. This is the
    signal promoted to the production activation ranking. Candidates without a
    ``residual_momentum`` value get None. Mutates ``candidates`` in place.
    """
    _assign_signal_percentiles(
        candidates,
        value_key="residual_momentum",
        percentile_key=RESIDUAL_PERCENTILE_KEY,
    )
|
||
|
||
|
||
def _assign_activation_momentum_percentiles(candidates: list[dict]) -> None:
    """Fill the production activation rank on every candidate.

    The rank is the residual 12-1 percentile when one was computed, otherwise
    the raw-momentum percentile. The raw fallback mirrors the live scanner when
    benchmark history is unavailable; in a normal backtest SPY is loaded and
    the rank is residual.

    Expects the residual and raw percentile fields to be assigned already.
    Mutates ``candidates`` in place.
    """
    for cand in candidates:
        residual_rank = cand.get(RESIDUAL_PERCENTILE_KEY)
        if residual_rank is None:
            cand[PRODUCTION_PERCENTILE_KEY] = cand.get(RAW_PERCENTILE_KEY)
        else:
            cand[PRODUCTION_PERCENTILE_KEY] = residual_rank
|
||
|
||
|
||
def _momentum_qualifies(cand: dict, threshold: float) -> bool:
    """Check the floors (``meets_core``) and then the production momentum gate.

    ``threshold <= 0`` switches the gate off, so the floors alone decide and
    shorts may pass. While the gate is on it is long-only: shorts (which fight
    the trend) never qualify, and longs need a production percentile of at
    least ``threshold``.
    """
    if not cand["meets_core"]:
        return False
    if threshold <= 0:
        return True
    is_long = cand["direction"] != "short"
    rank = cand.get(PRODUCTION_PERCENTILE_KEY) if is_long else None
    return rank is not None and rank >= threshold
|
||
|
||
|
||
def _gate_ablation(candidates: list[dict], activation: dict, threshold: float) -> list[dict]:
    """Measure what each activation floor contributes by dropping it in turn.

    Every row re-filters the same ``candidates`` through the momentum gate at
    ``threshold``. While the gate is active it is long-only, exactly like the
    live gate; ``threshold <= 0`` disables it and lets shorts through. The rows
    are:

    - ``all_floors``: uses the stored ``meets_core`` flag, so it reproduces the
      qualified set exactly.
    - ``no_confidence_floor`` / ``no_rr_floor`` / ``no_neutral_exclusion``: the
      remaining floors are recomputed from candidate fields with the same
      comparisons as ``qualification.setup_qualifies``, minus the named floor.
    - ``momentum_only``: the momentum gate alone.

    When enabled, the optional tighteners (high-conviction actions, conflict
    exclusion) stay applied in the three ablation rows, so only the named
    floor varies.

    Each row carries the target/stop statistics from ``_bucket_stats`` plus a
    hold-to-horizon result over the longest ``TIME_EXIT_DAYS`` horizon (the
    ``hold_*`` columns). A floor that pays under the target exit may be
    worthless once the exit is a fixed hold, so a time-exit gate decision
    should read the ``hold_*`` columns.
    """
    min_rr = float(activation.get("min_rr", 0.0))
    min_conf = float(activation.get("min_confidence", 0.0))
    exclude_neutral = bool(activation.get("exclude_neutral", False))
    require_high = bool(activation.get("require_high_conviction", False))
    exclude_conflicts = bool(activation.get("exclude_conflicts", False))

    def passes_momentum(cand: dict) -> bool:
        # Same momentum rule as _momentum_qualifies, without the meets_core check.
        if threshold <= 0:
            return True
        if cand["direction"] == "short":
            return False
        rank = cand.get(PRODUCTION_PERCENTILE_KEY)
        return rank is not None and rank >= threshold

    def passes_core(cand: dict) -> bool:
        return bool(cand["meets_core"])

    def passes_rr(cand: dict) -> bool:
        return cand["rr"] >= min_rr

    def passes_confidence(cand: dict) -> bool:
        return (cand["confidence"] or 0.0) >= min_conf

    def passes_neutral(cand: dict) -> bool:
        if not exclude_neutral:
            return True
        implied = _action_direction(cand.get("action"))
        return implied != "neutral" and implied == cand["direction"]

    def passes_tighteners(cand: dict) -> bool:
        if require_high and (cand.get("action") or "") not in HIGH_CONVICTION_ACTIONS:
            return False
        return not (exclude_conflicts and (cand.get("risk_level") or "") != "Low")

    floors_by_variant: dict[str, tuple] = {
        "all_floors": (passes_core,),
        "no_confidence_floor": (passes_rr, passes_neutral, passes_tighteners),
        "no_rr_floor": (passes_confidence, passes_neutral, passes_tighteners),
        "no_neutral_exclusion": (passes_rr, passes_confidence, passes_tighteners),
        "momentum_only": (),
    }
    horizon = max(TIME_EXIT_DAYS)

    rows: list[dict] = []
    for variant, floors in floors_by_variant.items():
        selected = [
            cand
            for cand in candidates
            if passes_momentum(cand) and all(floor(cand) for floor in floors)
        ]
        hold = _time_exit_bucket(selected, horizon)
        rows.append({
            "variant": variant,
            **_bucket_stats(selected),
            "hold_days": horizon,
            "hold_avg_r": hold["avg_r"],
            "hold_net_avg_r": hold["net_avg_r"],
            "hold_total_r": hold["total_r"],
        })
    return rows
|
||
|
||
|
||
# ---------------------------------------------------------------------------
# Portfolio simulation
# ---------------------------------------------------------------------------

# Default book parameters for the portfolio simulation:
# - the book starts from a fixed capital;
# - it holds a capped number of concurrent positions, one per ticker;
# - each position is sized by fixed-fractional risk, subject to a no-leverage
#   notional cap;
# - it pays the same per-side cost as the per-trade tables.
# Entries are the QUALIFIED setups, filled at their detection close, with the
# best momentum rank taken first while slots and cash allow.
SIM_STARTING_CAPITAL = 10000.0
SIM_MAX_POSITIONS = 10  # concurrent open positions
SIM_RISK_PER_TRADE = 0.01  # fraction of equity risked per position, entry -> stop
SIM_NOTIONAL_CAP = 0.20  # largest fraction of equity in one position (no margin)
|
||
|
||
|
||
def _simulate_portfolio(
    candidates: list[dict],
    prices: dict[str, tuple],
    spy_closes: dict | None,
    exit_policy: str,
    hold_days: int,
    *,
    qualified_fn: Callable[[dict], bool] | None = None,
    ranking_key: str = PRODUCTION_PERCENTILE_KEY,
    max_positions: int = SIM_MAX_POSITIONS,
    risk_per_trade: float = SIM_RISK_PER_TRADE,
) -> dict | None:
    """Replay the qualified setups as ONE capital-constrained book and report
    portfolio economics from the daily equity curve (return, CAGR, drawdown,
    Sharpe). The per-setup tables cannot give these numbers, because they grade
    every setup as if capital were infinite.

    ``exit_policy``: "target" races the S/R target against the stop with a
    timeout at ``hold_days``; "hold" keeps only the initial stop and exits at
    the ``hold_days``-th close. Stops fill at the worse of stop or open (gaps
    modeled); positions still open at the end are closed at their last mark.
    Returns None when there is nothing to trade.

    Args:
        candidates: setup dicts. A candidate is traded only when
            ``qualified_fn`` is truthy, ``direction == "long"``, and both
            ``entry`` and ``stop`` are truthy. Each traded candidate is entered
            on its ``date`` (an ISO string) at ``entry``, with its ``stop`` and
            optional ``target``, one position per ``symbol``.
        prices: per-symbol column tuple, index-aligned: ``[0]`` date ordinals,
            ``[1]`` open, ``[2]`` high, ``[3]`` low, ``[4]`` close.
        spy_closes: benchmark closes handed to ``benchmark_return_pct``. When
            falsy, ``spy_return_pct`` is None.
        exit_policy, hold_days: see above. ``hold_days`` counts only the days on
            which the position's symbol has a bar.
        qualified_fn: entry filter; defaults to ``bool(c.get("qualified"))``.
        ranking_key: same-day entries are taken in descending order of this
            field. A missing or falsy value is treated as 0.0.
        max_positions: cap on concurrent positions. Each candidate turned away
            while the book is full is counted in ``skipped_book_full``.
        risk_per_trade: fraction of marked equity risked from entry to stop.

    Returns:
        A summary dict (final equity, total return, CAGR, max drawdown, Sharpe,
        trade statistics, SPY return, per-calendar-year returns, start/end
        dates). Returns None when no candidate passes the entry filter, or when
        ``prices`` has no bar on or after the first entry date.
    """
    if qualified_fn is None:
        def _default_qualified(c: dict) -> bool:
            return bool(c.get("qualified"))

        qualified_fn = _default_qualified

    # Group tradable long setups by the date ordinal on which they were detected.
    entries_by_ord: dict[int, list[dict]] = defaultdict(list)
    for c in candidates:
        if not qualified_fn(c) or c.get("direction") != "long":
            continue
        # A zero or missing entry/stop level cannot be sized.
        if not c.get("entry") or not c.get("stop"):
            continue
        entries_by_ord[date.fromisoformat(c["date"]).toordinal()].append(c)
    if not entries_by_ord:
        return None

    # Per-symbol bar lookup: date ordinal -> index into the column arrays.
    index_of: dict[str, dict[int, int]] = {
        sym: {o: i for i, o in enumerate(cols[0])} for sym, cols in prices.items()
    }

    # Simulation calendar: every date on which any symbol has a bar, starting
    # at the first entry date.
    first_ord = min(entries_by_ord)
    calendar = sorted({o for cols in prices.values() for o in cols[0] if o >= first_ord})
    if not calendar:
        return None

    cash = SIM_STARTING_CAPITAL
    positions: dict[str, dict] = {}
    curve: list[tuple[int, float]] = []
    trades: list[dict] = []
    skipped_full = 0

    def _bar(sym: str, o: int):
        # None when the symbol has no bar on that date; the position is then
        # neither marked nor aged that day.
        idx = index_of.get(sym, {}).get(o)
        if idx is None:
            return None
        cols = prices[sym]
        return SimpleNamespace(
            open=cols[1][idx], high=cols[2][idx], low=cols[3][idx], close=cols[4][idx]
        )

    def _close_trade(sym: str, fill: float, reason: str) -> None:
        # Realize the position. Cash receives the proceeds minus the exit cost.
        # "pnl" is net of both the entry and the exit cost; "r" is the gross
        # move measured in units of the initial entry->stop risk.
        nonlocal cash
        pos = positions.pop(sym)
        proceeds = pos["shares"] * fill
        cost = proceeds * COST_PER_SIDE
        cash += proceeds - cost
        risk = pos["entry"] - pos["stop"]
        trades.append({
            "pnl": proceeds - pos["shares"] * pos["entry"] - cost - pos["entry_cost"],
            "r": (fill - pos["entry"]) / risk if risk > 0 else 0.0,
            "hold": pos["bars_held"],
            "reason": reason,
        })

    def _marked_equity() -> float:
        # Cash plus every open position marked at its latest close.
        return cash + sum(p["shares"] * p["last_close"] for p in positions.values())

    for o in calendar:
        # 1) exits on today's bars (stop intraday, target intraday, time at close)
        for sym in list(positions):
            pos = positions[sym]
            bar = _bar(sym, o)
            if bar is None:
                continue
            pos["bars_held"] += 1
            pos["last_close"] = bar.close
            if bar.low <= pos["stop"]:
                # Same-bar stop+target resolves as the loss (conservative, like
                # the evaluator); gap through the stop fills at the open.
                _close_trade(sym, min(pos["stop"], bar.open), "stop")
                continue
            if exit_policy == "target" and pos["target"] and bar.high >= pos["target"]:
                _close_trade(sym, pos["target"], "target")
                continue
            if pos["bars_held"] >= hold_days:
                _close_trade(sym, bar.close, "time")

        # 2) entries at today's close, best momentum first
        equity = _marked_equity()
        todays = sorted(
            entries_by_ord.get(o, ()),
            key=lambda c: c.get(ranking_key) or 0.0,
            reverse=True,
        )
        for c in todays:
            sym = c["symbol"]
            if sym in positions:
                continue
            if len(positions) >= max_positions:
                skipped_full += 1
                continue
            entry, stop = float(c["entry"]), float(c["stop"])
            risk_ps = entry - stop
            if risk_ps <= 0 or entry <= 0:
                continue
            # Size by the tightest of three limits: the risk budget, the
            # notional cap, and the cash available after the entry cost.
            shares = min(
                (equity * risk_per_trade) / risk_ps,
                (equity * SIM_NOTIONAL_CAP) / entry,
                max(cash, 0.0) / (entry * (1.0 + COST_PER_SIDE)),
            )
            if shares * entry < 1.0:  # can't fund a meaningful position
                continue
            entry_cost = shares * entry * COST_PER_SIDE
            cash -= shares * entry + entry_cost
            positions[sym] = {
                "shares": shares,
                "entry": entry,
                "stop": stop,
                "target": float(c["target"]) if c.get("target") else None,
                "entry_cost": entry_cost,
                "bars_held": 0,
                "last_close": entry,
            }
            # Re-mark after each fill so later same-day sizing sees the new state.
            equity = _marked_equity()

        curve.append((o, _marked_equity()))

    # Close whatever is still open at its last mark so final equity is realized.
    for sym in list(positions):
        _close_trade(sym, positions[sym]["last_close"], "open_at_end")
    final_equity = cash
    # The last curve point becomes the realized equity (exit costs included).
    curve[-1] = (calendar[-1], final_equity)

    total_return_pct = (final_equity / SIM_STARTING_CAPITAL - 1.0) * 100.0
    years = (calendar[-1] - calendar[0]) / 365.25
    # CAGR is only reported for spans longer than a quarter year with positive
    # final equity.
    cagr_pct = (
        ((final_equity / SIM_STARTING_CAPITAL) ** (1.0 / years) - 1.0) * 100.0
        if years > 0.25 and final_equity > 0
        else None
    )

    # Maximum peak-to-trough decline along the equity curve, as a fraction.
    peak = float("-inf")
    max_dd = 0.0
    for _, eq in curve:
        peak = max(peak, eq)
        if peak > 0:
            max_dd = max(max_dd, (peak - eq) / peak)

    # Sharpe from curve-point returns: sample variance, annualized with
    # sqrt(252) (each calendar point treated as one trading day), no risk-free
    # rate subtracted.
    rets = [b / a - 1.0 for (_, a), (_, b) in zip(curve, curve[1:]) if a > 0]
    sharpe = None
    if len(rets) > 2:
        mean = sum(rets) / len(rets)
        var = sum((x - mean) ** 2 for x in rets) / (len(rets) - 1)
        if var > 0:
            sharpe = mean / math.sqrt(var) * math.sqrt(252)

    # Per-calendar-year returns off the equity curve — shows whether every year
    # contributed or one exceptional stretch carried the result.
    # NOTE(review): the first year's base is curve[0] (equity after the first
    # day's fills and costs), not SIM_STARTING_CAPITAL, so compounding these
    # yearly returns may differ slightly from total_return_pct.
    yearly: list[dict] = []
    year_start_eq = curve[0][1]
    cur_year = date.fromordinal(curve[0][0]).year
    last_eq = curve[0][1]
    for o, eq in curve:
        y = date.fromordinal(o).year
        if y != cur_year:
            yearly.append({
                "year": cur_year,
                "return_pct": (
                    round((last_eq / year_start_eq - 1) * 100, 1) if year_start_eq > 0 else None
                ),
            })
            cur_year = y
            # The new year starts from the previous year's last equity.
            year_start_eq = last_eq
        last_eq = eq
    yearly.append({
        "year": cur_year,
        "return_pct": (
            round((last_eq / year_start_eq - 1) * 100, 1) if year_start_eq > 0 else None
        ),
    })

    pnls = [t["pnl"] for t in trades]
    wins = sum(1 for p in pnls if p > 0)
    spy_pct = None
    if spy_closes:
        from app.services.benchmark_service import benchmark_return_pct

        spy_pct = benchmark_return_pct(
            spy_closes, date.fromordinal(calendar[0]), date.fromordinal(calendar[-1])
        )

    return {
        "starting_capital": SIM_STARTING_CAPITAL,
        "final_equity": round(final_equity, 2),
        "total_return_pct": round(total_return_pct, 1),
        "cagr_pct": round(cagr_pct, 1) if cagr_pct is not None else None,
        "max_drawdown_pct": round(max_dd * 100.0, 1),
        "sharpe": round(sharpe, 2) if sharpe is not None else None,
        "trades": len(trades),
        "win_rate": round(wins / len(trades) * 100.0, 1) if trades else None,
        "avg_trade_pnl": round(sum(pnls) / len(pnls), 2) if pnls else None,
        "best_trade_r": round(max(t["r"] for t in trades), 2) if trades else None,
        "worst_trade_r": round(min(t["r"] for t in trades), 2) if trades else None,
        "best_trade_pnl": round(max(pnls), 2) if pnls else None,
        "worst_trade_pnl": round(min(pnls), 2) if pnls else None,
        "avg_hold_days": (
            round(sum(t["hold"] for t in trades) / len(trades), 1) if trades else None
        ),
        "skipped_book_full": skipped_full,
        "spy_return_pct": round(spy_pct, 1) if spy_pct is not None else None,
        "yearly_returns": yearly,
        "start_date": date.fromordinal(calendar[0]).isoformat(),
        "end_date": date.fromordinal(calendar[-1]).isoformat(),
    }
|
||
|
||
|
||
# Research-only book variants simulated by _strategy_variant_sims; live
# qualification never reads them. Every row risks 1% per trade and applies no
# risk scaling. Rows differ only in ranking signal, percentile cutoff and book
# capacity. Spec columns: variant id, label, percentile key, cutoff, max
# positions.
STRATEGY_VARIANTS: tuple[dict, ...] = tuple(
    {
        "variant": variant_id,
        "label": label,
        "percentile_key": percentile_key,
        "cutoff": cutoff,
        "max_positions": capacity,
        "risk_per_trade": 0.01,
        "risk_scale": None,
    }
    for variant_id, label, percentile_key, cutoff, capacity in (
        (
            "production_residual_80_fixed10",
            "Production residual 80 / max 10",
            PRODUCTION_PERCENTILE_KEY,
            80.0,
            10,
        ),
        (
            "legacy_raw_80_fixed10",
            "Legacy raw 80 / max 10",
            RAW_PERCENTILE_KEY,
            80.0,
            10,
        ),
        (
            "raw_90_fixed10",
            "Raw 90 / max 10",
            RAW_PERCENTILE_KEY,
            90.0,
            10,
        ),
        (
            "residual_80_fixed15",
            "Residual 80 / max 15 capacity check",
            PRODUCTION_PERCENTILE_KEY,
            80.0,
            15,
        ),
    )
)
|
||
|
||
|
||
def _qualifies_by_percentile(cand: dict, percentile_key: str, threshold: float) -> bool:
    """Apply the variant gate: production floors plus a long-only percentile cutoff.

    ``threshold <= 0`` disables the percentile cutoff (floors only, shorts
    included). Otherwise shorts never pass, and longs need
    ``cand[percentile_key] >= threshold``. This is a pure predicate; it never
    touches the production ``qualified`` field.
    """
    if not cand.get("meets_core"):
        return False  # production floors fail
    if threshold <= 0:
        return True  # percentile gate switched off
    if cand.get("direction") == "short":
        return False
    score = cand.get(percentile_key)
    return False if score is None else score >= threshold
|
||
|
||
|
||
def _strategy_variant_sims(
    candidates: list[dict],
    prices: dict[str, tuple],
    _spy_closes: dict[date, float] | None,
    hold_days: int,
) -> list[dict]:
    """Simulate each research variant in ``STRATEGY_VARIANTS`` as its own book.

    Every variant is graded under the "hold" exit for ``hold_days``. It is
    gated by ``_qualifies_by_percentile`` and ranked by its own percentile key.
    ``_spy_closes`` is forwarded as the benchmark despite its underscore name.
    Variants whose simulation has nothing to trade are omitted. Each row
    prefixes the variant's settings to the simulation summary. Live
    qualification is untouched.
    """
    rows: list[dict] = []
    for spec in STRATEGY_VARIANTS:
        percentile_key = str(spec["percentile_key"])
        cutoff = float(spec["cutoff"])
        capacity = int(spec["max_positions"])
        risk = float(spec["risk_per_trade"])

        # Defaults bind this iteration's values, avoiding late binding.
        def qualifies(c: dict, pk: str = percentile_key, th: float = cutoff) -> bool:
            return _qualifies_by_percentile(c, pk, th)

        summary = _simulate_portfolio(
            candidates,
            prices,
            _spy_closes,
            "hold",
            hold_days,
            qualified_fn=qualifies,
            ranking_key=percentile_key,
            max_positions=capacity,
            risk_per_trade=risk,
        )
        if summary is None:
            continue
        ranking = "raw" if percentile_key == RAW_PERCENTILE_KEY else "residual"
        rows.append({
            "variant": spec["variant"],
            "label": spec["label"],
            "ranking": ranking,
            "cutoff": cutoff,
            "max_positions": capacity,
            "risk_per_trade_pct": round(risk * 100, 2),
            "risk_scale": spec["risk_scale"],
            **summary,
        })
    return rows
|
||
|
||
|
||
def _pct_loss(base: float | None, candidate: float | None) -> float | None:
|
||
if base is None or candidate is None or base <= 0:
|
||
return None
|
||
return (base - candidate) / base
|
||
|
||
|
||
def _build_research_recommendation(report: dict) -> dict:
    """Advisory verdicts for the research variants left after residual promotion.

    Residual 12-1 momentum is already the production activation rank. This
    compares the research rows of ``report["strategy_variants"]["variants"]``
    against the production book ``production_residual_80_fixed10``:

    - ``capacity_15``: the max-15 book is a candidate when it beats production
      on both Sharpe and CAGR, with max drawdown at most 1 point worse.
    - ``cutoff_90``: the best-Sharpe ``raw_90_*`` variant without risk scaling
      is a candidate when all three hold:
      - it beats production Sharpe;
      - it has a strictly lower max drawdown;
      - it gives up less than 10% of production CAGR.

    A rule is skipped when any metric it needs is None. Returns
    ``{"items": [...], "note": str}``; ``items`` is empty, with an explanatory
    note, when the production variant is absent.
    """
    # Tolerate a missing section and an explicit "variants": None.
    variant_rows = (report.get("strategy_variants") or {}).get("variants") or []
    variants = {v.get("variant"): v for v in variant_rows}
    base = variants.get("production_residual_80_fixed10")
    if base is None:
        return {
            "items": [],
            "note": "Strategy variants unavailable; re-run the backtest after benchmark data is present.",
        }

    items: list[dict] = []
    base_sharpe = base.get("sharpe")
    base_dd = base.get("max_drawdown_pct")
    base_cagr = base.get("cagr_pct")
    base_complete = all(x is not None for x in (base_sharpe, base_dd, base_cagr))

    capacity = variants.get("residual_80_fixed15")
    if (
        capacity
        and base_complete
        and capacity.get("sharpe") is not None
        and capacity.get("cagr_pct") is not None
        and capacity.get("max_drawdown_pct") is not None
    ):
        candidate = (
            capacity["sharpe"] > base_sharpe
            and capacity["cagr_pct"] > base_cagr
            and capacity["max_drawdown_pct"] <= base_dd + 1.0
        )
        items.append({
            "topic": "capacity_15",
            "candidate": candidate,
            "text": (
                f"Max-15 capacity {'is worth promoting' if candidate else 'is not needed yet'}: "
                f"Sharpe {capacity['sharpe']:.2f} vs {base_sharpe:.2f}, "
                f"CAGR {capacity['cagr_pct']:+.1f}% vs {base_cagr:+.1f}%, "
                f"skipped {capacity.get('skipped_book_full', 0)} vs {base.get('skipped_book_full', 0)}."
            ),
        })

    # Unnamed rows (variant key None) are ignored rather than crashing .startswith.
    raw_90s = [
        v for key, v in variants.items()
        if isinstance(key, str) and key.startswith("raw_90_") and v.get("risk_scale") is None
    ]
    # Missing Sharpe sorts last. A falsy fallback (``or -999``) would also
    # demote a genuine Sharpe of exactly 0.0 below every negative one.
    raw_90 = max(
        raw_90s,
        key=lambda v: v["sharpe"] if v.get("sharpe") is not None else float("-inf"),
        default=None,
    )
    if (
        raw_90
        and base_complete
        and raw_90.get("sharpe") is not None
        and raw_90.get("cagr_pct") is not None
        and raw_90.get("max_drawdown_pct") is not None
    ):
        raw_90_sharpe = raw_90["sharpe"]
        cagr_loss = _pct_loss(base_cagr, raw_90["cagr_pct"])
        candidate = (
            raw_90_sharpe > base_sharpe
            and raw_90["max_drawdown_pct"] < base_dd
            and cagr_loss is not None
            and cagr_loss < 0.10
        )
        items.append({
            "topic": "cutoff_90",
            "candidate": candidate,
            "text": (
                f"Cutoff 90 {'is a promotion candidate' if candidate else 'stays research-only'}: "
                f"{raw_90['label']} Sharpe {raw_90_sharpe:.2f} vs {base_sharpe:.2f}, "
                f"drawdown {raw_90['max_drawdown_pct']:.1f}% vs {base_dd:.1f}%, "
                f"CAGR {raw_90['cagr_pct']:+.1f}% vs {base_cagr:+.1f}%."
            ),
        })

    return {
        "items": items,
        "note": (
            "Residual 12-1 momentum is now the production activation rank. "
            "Remaining rows are research comparisons only."
        ),
    }
|
||
|
||
|
||
# ---------------------------------------------------------------------------
# Data-driven recommendation
# ---------------------------------------------------------------------------

# Net R per trade, under the hold exit. When removing a gate floor costs this
# much or less, the floor is reported as not pulling its weight.
_FLOOR_KEEP_THRESHOLD = 0.02

# Net R per trade. The best fixed hold must beat the S/R target exit by more
# than this margin before the hold exit is advised.
_EXIT_SWITCH_THRESHOLD = 0.05
|
||
|
||
|
||
def _build_recommendation(report: dict) -> dict:
    """Strategy advice derived from THIS report's numbers.

    The advice is recomputed on every run, so if the data flips, the advice
    flips. Rules are deliberately simple and transparent; the thresholds are
    the module constants ``_EXIT_SWITCH_THRESHOLD`` and
    ``_FLOOR_KEEP_THRESHOLD``. Items are produced in this order:

    - ``exit``: recommend the best fixed hold from ``time_exit_sweep`` when its
      net R per trade beats the qualified S/R target exit
      (``overall_qualified.net_avg_r``) by more than the switch threshold;
      otherwise keep the target exit. When both portfolio sims carry a CAGR,
      the hold advice states whether the simulated book agrees, with the
      drawdowns shown rather than assumed.
    - ``gate``: one item per ablated floor, judged on the hold-exit column.
    - ``cutoff``: the residual-momentum sweep row (cutoff > 0) with the best
      net R per trade.
    - ``benchmark``: the RECOMMENDED exit's simulated book vs SPY, falling back
      to the other policy's book when that one was not simulated.
    - ``robustness``: net expectancy without the top 5% of winners, under the
      recommended exit when that figure exists.

    Returns ``{"headline": str | None, "items": [...], "note": str}``. The
    headline is set only when the hold exit is recommended. Missing report
    sections simply produce no item.
    """
    items: list[dict] = []
    q = report.get("overall_qualified") or {}
    target_net = q.get("net_avg_r")

    # Exit policy: the production target/stop race vs the best fixed hold.
    time_rows = [
        r for r in (report.get("time_exit_sweep") or []) if r.get("net_avg_r") is not None
    ]
    best_hold = max(time_rows, key=lambda r: r["net_avg_r"], default=None)
    sim_rows = {
        p.get("policy"): p
        for p in ((report.get("portfolio_sim") or {}).get("policies") or [])
    }
    hold_sim = sim_rows.get("hold")
    target_sim = sim_rows.get("target")
    # Decided once and shared by the exit, benchmark, robustness and headline
    # sections so they all speak about the same exit.
    hold_recommended = (
        best_hold is not None and target_net is not None
        and best_hold["net_avg_r"] > target_net + _EXIT_SWITCH_THRESHOLD
    )

    if hold_recommended:
        text = (
            f"Exit: hold {best_hold['hold_days']} trading days with the initial stop "
            f"({best_hold['net_avg_r']:+.2f}R net/trade vs {target_net:+.2f}R for the S/R target exit)."
        )
        if (
            hold_sim is not None and target_sim is not None
            and hold_sim.get("cagr_pct") is not None and target_sim.get("cagr_pct") is not None
        ):
            # The book can disagree with the per-trade numbers; say which it is
            # instead of always claiming agreement.
            sim_agrees = hold_sim["cagr_pct"] > target_sim["cagr_pct"]
            text += (
                f" The simulated book {'agrees' if sim_agrees else 'disagrees'}: "
                f"{hold_sim['cagr_pct']:+.1f}% vs {target_sim['cagr_pct']:+.1f}% CAGR"
            )
            hold_dd = hold_sim.get("max_drawdown_pct")
            target_dd = target_sim.get("max_drawdown_pct")
            if hold_dd is not None and target_dd is not None:
                text += f", max drawdown −{hold_dd:.1f}% vs −{target_dd:.1f}%."
            else:
                text += "."
        items.append({"topic": "exit", "text": text})
    elif best_hold is not None and target_net is not None:
        items.append({
            "topic": "exit",
            "text": (
                f"Exit: keep the S/R target exit ({target_net:+.2f}R net/trade) — "
                "no fixed hold beats it by a meaningful margin."
            ),
        })

    # Gate floors, judged under the hold exit (the ablation's Hold column).
    ablation = {r["variant"]: r for r in report.get("gate_ablation") or []}
    base_row = ablation.get("all_floors")
    base_hold = (base_row or {}).get("hold_net_avg_r")
    floor_labels = {
        "no_confidence_floor": "confidence floor",
        "no_rr_floor": "R:R floor",
        "no_neutral_exclusion": "NEUTRAL exclusion",
    }
    if base_hold is not None:
        for variant, label in floor_labels.items():
            row = ablation.get(variant)
            if row is None or row.get("hold_net_avg_r") is None:
                continue
            delta = base_hold - row["hold_net_avg_r"]
            extra = row["total"] - base_row["total"]
            if delta <= _FLOOR_KEEP_THRESHOLD:
                items.append({
                    "topic": "gate",
                    "text": (
                        f"Gate: the {label} adds nothing — dropping it costs {delta:+.2f}R/trade "
                        f"and adds {extra} trades."
                    ),
                })
            else:
                items.append({
                    "topic": "gate",
                    "text": f"Gate: keep the {label} (worth {delta:+.2f}R/trade under the hold exit).",
                })

    # Activation cutoff: best per-trade net among the promoted residual-momentum
    # sweep rows.
    sweep_rows = [
        r for r in report.get("sweep") or []
        if r.get("net_avg_r") is not None and (r.get("min_momentum_percentile") or 0) > 0
    ]
    if sweep_rows:
        best_cut = max(sweep_rows, key=lambda r: r["net_avg_r"])
        items.append({
            "topic": "cutoff",
            "text": (
                f"Residual-momentum cutoff: {best_cut['min_momentum_percentile']:.0f} has the best "
                f"per-trade net ({best_cut['net_avg_r']:+.2f}R over {best_cut['total']} setups)."
            ),
        })

    # Book vs benchmark, on the book of the exit we actually advise. Comparing
    # SPY against a book we tell the user to abandon would be misleading.
    if hold_recommended:
        book = hold_sim or target_sim
    else:
        book = target_sim or hold_sim
    if book is not None and book.get("spy_return_pct") is not None:
        edge = book["total_return_pct"] - book["spy_return_pct"]
        verdict = "beats" if edge > 0 else "LAGS"
        items.append({
            "topic": "benchmark",
            "text": (
                f"Book vs SPY: {verdict} buy-and-hold by {edge:+.1f} points "
                f"({book['total_return_pct']:+.1f}% vs {book['spy_return_pct']:+.1f}%), "
                f"max drawdown −{book['max_drawdown_pct']:.1f}%."
            ),
        })

    # Robustness: does the edge survive without the biggest winners? Judged on
    # the RECOMMENDED exit — outlier dependence under an exit we'd abandon
    # would be the wrong warning.
    if hold_recommended and best_hold.get("net_avg_r_ex_top5") is not None:
        trimmed = best_hold["net_avg_r_ex_top5"]
        basis = f"under the recommended {best_hold['hold_days']}d hold"
    else:
        trimmed = q.get("net_avg_r_ex_top5")
        basis = "under the S/R target exit"
    if trimmed is not None:
        if trimmed > 0:
            items.append({
                "topic": "robustness",
                "text": (
                    f"Robustness: expectancy survives removing the top 5% of winners "
                    f"({trimmed:+.2f}R net/trade {basis}) — the edge is not a handful "
                    "of outliers."
                ),
            })
        else:
            items.append({
                "topic": "robustness",
                "text": (
                    f"Robustness WARNING: without the top 5% of winners the edge disappears "
                    f"({trimmed:+.2f}R net/trade {basis}) — outlier-dependent, treat the "
                    "headline expectancy with caution."
                ),
            })

    headline = None
    if hold_recommended:
        cagr_note = (
            f" (~{hold_sim['cagr_pct']:.0f}% CAGR simulated)"
            if hold_sim is not None and hold_sim.get("cagr_pct") is not None
            else ""
        )
        headline = (
            f"Trade the qualified list long-only; hold {best_hold['hold_days']} trading days "
            f"with the initial ATR stop{cagr_note}."
        )

    return {
        "headline": headline,
        "items": items,
        "note": "Derived from this report's numbers on every run — the advice flips if the data does.",
    }
|
||
|
||
|
||
async def run_backtest(
    db: AsyncSession,
    progress_cb: Callable[[int, int, str], None] | None = None,
) -> dict:
    """Replay every ticker and aggregate the Phase-1 reports for the current config.

    Each ticker's stored bars are replayed by ``_replay_and_signals``. The
    replay runs across a process pool when ``_backtest_worker_count()`` and
    ``_mp_context()`` allow it; otherwise it runs one ticker at a time in a
    worker thread. The per-ticker candidates and signal series are pooled,
    ranked cross-sectionally, gated, swept, fed to the portfolio simulation and
    summarized into a JSON-serializable report dict.

    Args:
        db: Async session used for config, ticker, bar and benchmark loads.
        progress_cb: Optional ``(done, total, symbol)`` callback. The parallel
            path reports once per batch with an empty symbol. The sequential
            path reports before each ticker with its symbol.

    Returns:
        The report dict, including the ``recommendation`` and
        ``research_recommendation`` entries derived from it.

    Per-ticker fetch/replay failures are logged and skipped. If the process
    pool breaks mid-run (a worker process died), the unfinished tickers are
    replayed sequentially instead of the whole run aborting.
    """
    # Local import: only needed to recognise a dead worker pool.
    from concurrent.futures.process import BrokenProcessPool

    config = await get_recommendation_config(db)
    activation = await get_activation_config(db)

    ticker_rows = await db.execute(select(Ticker).order_by(Ticker.symbol))
    tickers = list(ticker_rows.scalars().all())
    total = len(tickers)

    candidates: list[dict] = []
    # collected[signal_name][iso_week] -> list of (signal_value, forward_return)
    collected: dict = defaultdict(lambda: defaultdict(list))

    # Residual momentum needs a point-in-time benchmark return stream. Best-effort:
    # if SPY benchmark data is unavailable, the residual signal simply won't be
    # emitted and the rest of the report remains valid.
    benchmark_closes: dict[date, float] = {}
    try:
        benchmark_closes = await _load_benchmark_closes_for_backtest(
            db, days=settings.ohlcv_history_days + 365
        )
    except Exception:
        logger.exception("Benchmark load for residual momentum failed")

    def _merge(replayed: tuple[list[dict], dict]) -> None:
        """Fold one ticker's (candidates, signal series) into the pooled results."""
        cands, series = replayed
        candidates.extend(cands)
        for name, weeks in series.items():
            for wk, pairs in weeks.items():
                collected[name][wk].extend(pairs)

    workers = _backtest_worker_count()
    ctx = _mp_context() if (workers > 1 and total > 1) else None

    pool = None
    if ctx is not None:
        try:
            pool = ProcessPoolExecutor(max_workers=workers, mp_context=ctx)
        except Exception:
            logger.exception("Backtest process pool unavailable; falling back to sequential")

    # Indices into ``tickers`` still to replay in-process. This is every ticker
    # when there is no pool, otherwise only what a broken pool left unfinished.
    leftover: list[int] = list(range(total))

    if pool is not None:
        # Parallel: replay tickers across worker processes — true multi-core, since
        # the GIL only serializes work within a single process. Bars are fetched in
        # the event loop (ORM-safe) and a bounded batch is fanned out to the pool.
        logger.info(json.dumps({
            "event": "backtest_parallel", "workers": workers,
            "start_method": ctx.get_start_method(),
        }))
        loop = asyncio.get_running_loop()
        chunk = workers * 2
        leftover = []
        with pool:
            for start in range(0, total, chunk):
                batch = list(range(start, min(start + chunk, total)))
                submitted: list[int] = []
                futures = []
                broken = False
                for pos, index in enumerate(batch):
                    symbol = tickers[index].symbol
                    try:
                        columns = await _fetch_columns(db, symbol)
                    except Exception:
                        logger.exception("Backtest fetch failed for %s", symbol)
                        continue
                    if columns is None:
                        continue
                    try:
                        # ProcessPoolExecutor.submit raises BrokenProcessPool
                        # synchronously once a worker process has died.
                        future = loop.run_in_executor(
                            pool, _replay_and_signals, symbol, columns, config,
                            activation, benchmark_closes,
                        )
                    except BrokenProcessPool:
                        broken = True
                        leftover.extend(batch[pos:])
                        break
                    futures.append(future)
                    submitted.append(index)

                outcomes = await asyncio.gather(*futures, return_exceptions=True)
                for index, outcome in zip(submitted, outcomes):
                    if isinstance(outcome, BrokenProcessPool):
                        # Lost to the dead pool, not a replay bug: retry in-process.
                        broken = True
                        leftover.append(index)
                    elif isinstance(outcome, BaseException):
                        # BaseException, not Exception: gather() hands back a
                        # CancelledError as a result when return_exceptions=True.
                        logger.error(json.dumps({
                            "event": "backtest_worker_error",
                            "symbol": tickers[index].symbol,
                            "message": str(outcome),
                        }))
                    else:
                        _merge(outcome)

                if progress_cb is not None:
                    progress_cb(start + len(batch), total, "")

                if broken:
                    leftover.extend(range(start + len(batch), total))
                    logger.error(json.dumps({
                        "event": "backtest_pool_broken",
                        "sequential_fallback": len(leftover),
                    }))
                    break
        leftover.sort()

    # Sequential replay (Windows / 1 worker, or a broken pool's leftovers): run
    # each replay in a worker thread so the event loop — and the API server —
    # stays responsive.
    for index in leftover:
        ticker = tickers[index]
        if progress_cb is not None:
            progress_cb(index, total, ticker.symbol)
        try:
            columns = await _fetch_columns(db, ticker.symbol)
            if columns is not None:
                _merge(await asyncio.to_thread(
                    _replay_and_signals, ticker.symbol, columns, config, activation,
                    benchmark_closes,
                ))
        except Exception:
            logger.exception("Backtest replay failed for %s", ticker.symbol)

    if progress_cb is not None and total:
        progress_cb(total, total, "")

    # Cross-sectional momentum: rank every week's universe, then "qualified" means
    # floors + top ``min_momentum_percentile`` by promoted residual 12-1 momentum
    # (raw 12-1 fallback only when benchmark data is unavailable).
    _assign_momentum_percentiles(candidates)
    _assign_residual_momentum_percentiles(candidates)
    _assign_activation_momentum_percentiles(candidates)
    current_min_pct = float(activation.get("min_momentum_percentile", 80.0))
    for c in candidates:
        c["qualified"] = _momentum_qualifies(c, current_min_pct)

    qualified = [c for c in candidates if c["qualified"]]
    longs = [c for c in qualified if c["direction"] == "long"]
    shorts = [c for c in qualified if c["direction"] == "short"]

    # Threshold sweep: re-apply the momentum gate at several percentile cutoffs
    # (floors held fixed) so the trade-off between how many setups qualify and
    # their expectancy is visible without re-replaying. 0 = floors only.
    sweep = []
    for threshold in (90.0, 80.0, 70.0, 60.0, 50.0, 0.0):
        cands = [c for c in candidates if _momentum_qualifies(c, threshold)]
        sweep.append({"min_momentum_percentile": threshold, **_bucket_stats(cands)})

    # Portfolio simulation: re-fetch bars for just the qualified symbols (memory-
    # light vs retaining every ticker's columns through the replay) and replay
    # the book once per exit policy. Best-effort — the report stands without it.
    hold_horizon = max(TIME_EXIT_DAYS)
    sim_policies: list[dict] = []
    strategy_variant_rows: list[dict] = []
    try:
        qual_symbols = sorted({
            c["symbol"]
            for c in candidates
            if c.get("qualified")
            or any(
                _qualifies_by_percentile(
                    c, str(cfg["percentile_key"]), float(cfg["cutoff"])
                )
                for cfg in STRATEGY_VARIANTS
            )
        })
        price_columns: dict[str, tuple] = {}
        for sym in qual_symbols:
            # Per-symbol best-effort: one bad fetch must not discard the whole sim
            # (symbols without columns are already tolerated when cols is None).
            try:
                cols = await _fetch_columns(db, sym)
            except Exception:
                logger.exception("Portfolio-sim fetch failed for %s", sym)
                continue
            if cols is not None:
                price_columns[sym] = cols

        spy_closes: dict | None = None
        try:
            oldest = min(
                (sym_cols[0][0] for sym_cols in price_columns.values()), default=None
            )
            days_needed = None
            if oldest is not None and not _offline_snapshot_mode():
                days_needed = (date.today() - date.fromordinal(oldest)).days + 30
            spy_closes = await _load_benchmark_closes_for_backtest(
                db, days=days_needed, refresh=oldest is not None
            )
        except Exception:
            logger.exception("Benchmark load for the portfolio sim failed")

        for policy in ("target", "hold"):
            sim = _simulate_portfolio(
                candidates, price_columns, spy_closes, policy, hold_horizon
            )
            if sim is not None:
                sim_policies.append({"policy": policy, **sim})
        strategy_variant_rows = _strategy_variant_sims(
            candidates, price_columns, spy_closes, hold_horizon
        )
    except Exception:
        logger.exception("Portfolio simulation failed")

    report = {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "tickers": total,
        "candidates": len(candidates),
        "qualified": len(qualified),
        "params": {
            "step_days": STEP_DAYS,
            "horizon_days": HORIZON,
            "min_lookback": MIN_LOOKBACK,
            "cost_per_side_pct": round(COST_PER_SIDE * 100, 3),
        },
        "activation": activation,
        "overall_qualified": _bucket_stats(qualified),
        "overall_all": _bucket_stats(candidates),
        "by_direction": {
            "long": _bucket_stats(longs),
            "short": _bucket_stats(shorts),
        },
        "min_momentum_percentile": current_min_pct,
        "sweep": sweep,
        "gate_ablation": _gate_ablation(candidates, activation, current_min_pct),
        "gate_ablation_note": (
            "Each row re-qualifies the same candidates at the current momentum "
            f"cutoff ({current_min_pct:.0f}) with one floor removed (long-only "
            "while the momentum gate is active). If dropping a floor doesn't "
            "hurt net expectancy, that floor isn't pulling its weight. The Hold "
            "columns grade the same variants under the hold-to-horizon time exit "
            "instead of the S/R target — the view that matters if the exit "
            "policy moves to a fixed hold."
        ),
        "time_exit_sweep": [_time_exit_bucket(qualified, n) for n in TIME_EXIT_DAYS],
        "portfolio_sim": {
            "params": {
                "starting_capital": SIM_STARTING_CAPITAL,
                "max_positions": SIM_MAX_POSITIONS,
                "risk_per_trade_pct": round(SIM_RISK_PER_TRADE * 100, 2),
                "notional_cap_pct": round(SIM_NOTIONAL_CAP * 100, 1),
                "cost_per_side_pct": round(COST_PER_SIDE * 100, 3),
                "hold_days": hold_horizon,
            },
            "policies": sim_policies,
            "note": (
                "One capital-constrained book over the same qualified setups the "
                "tables above grade per-setup: at most "
                f"{SIM_MAX_POSITIONS} concurrent positions (one per ticker), best "
                "momentum first, fixed-fractional risk sizing with a no-leverage "
                "cap, entries at the detection close, stops filled at the worse "
                "of stop or open. 'target' races the S/R target against the stop "
                "(timeout at the horizon); 'hold' keeps the initial stop and "
                "exits at the horizon close. SPY return is price-only over the "
                "same window. In-sample; no dividends."
            ),
        },
        "strategy_variants": {
            "variants": strategy_variant_rows,
            "note": (
                "Research-only hold-to-horizon portfolio variants. Production now "
                "uses residual 12-1 momentum at cutoff 80; the remaining rows compare "
                "the legacy raw rank, raw cutoff 90, and one max-15 capacity check."
            ),
        },
        "signal_eval": _signal_evaluation(collected),
        "signal_eval_note": (
            "Cross-sectional rank-IC of price-only signals vs the forward "
            f"{HORIZON}-day return (min {MIN_CROSS_SECTION} names/window). |IC| ≳ "
            "0.03 with a consistent sign is a real (if small) edge; near 0 means "
            "ranking on it sorts nothing. Momentum factors and high_52w are expected "
            "positive; reversal_1m and vol_6m expected negative (mean-reversion / "
            "low-vol anomaly). IC is measured on non-overlapping windows; signals "
            f"with fewer than {MIN_RELIABLE_PERIODS} independent windows are flagged "
            "unreliable (too few regimes — deepen history with the Data Backfill job)."
        ),
        "note": (
            "Sentiment & fundamentals held neutral (no point-in-time history). "
            "Stops fill at the worse of the stop or the bar's open (gaps through "
            "the stop are modeled, so a loss can exceed −1R); targets never fill "
            "better than their level. "
            "~6 months ≈ one market regime — treat as directional, not gospel."
        ),
    }
    report["recommendation"] = _build_recommendation(report)
    report["research_recommendation"] = _build_research_recommendation(report)
    return report
|
||
|
||
|
||
async def run_and_store(
    db: AsyncSession,
    progress_cb: Callable[[int, int, str], None] | None = None,
) -> dict:
    """Job entrypoint: build a fresh backtest report and persist it.

    The report produced by ``run_backtest`` is serialized with ``json.dumps``
    and written under ``KEY_REPORT`` via ``update_setting``. The same report
    dict is then returned to the caller.
    """
    fresh_report = await run_backtest(db, progress_cb)
    payload = json.dumps(fresh_report)
    await update_setting(db, KEY_REPORT, payload)
    return fresh_report
|
||
|
||
|
||
async def get_backtest_report(db: AsyncSession) -> dict | None:
    """Load the most recently cached backtest report.

    Returns ``None`` in two cases: nothing is stored under ``KEY_REPORT``, or
    the stored value cannot be decoded by ``json.loads`` (``TypeError`` /
    ``ValueError``).
    """
    stored = await settings_store.get_setting(db, KEY_REPORT)
    if stored is None:
        return None
    try:
        decoded = json.loads(stored.value)
    except (TypeError, ValueError):
        # Corrupt or non-string payload: treat it like a report that was never run.
        decoded = None
    return decoded
|