Files
signal-platform/app/services/backtest_service.py
T
dennisthiessen 402025692a
Deploy / lint (push) Successful in 6s
Deploy / test (push) Successful in 40s
Deploy / deploy (push) Successful in 26s
add cross-sectional signal evaluation (factor rank-IC) to the backtest
The per-setup hit-rate report can't tell whether a signal predicts returns —
only how a target/stop structure built on one performs. This adds a
cross-sectional factor-IC pass: each week the universe is ranked by a price-only
signal and graded by its rank correlation (Spearman IC) and top-minus-bottom-
quintile spread against the forward 30-day return.

Candidate signals (point-in-time from price; sentiment/fundamentals have no
history in the replay): 12-1/6-1/3-1 month momentum, 1-month reversal,
price-vs-200d SMA, proximity to the 52-week high (George/Hwang), and 126-day
realized volatility (low-vol anomaly).

Reuses the existing per-ticker replay loop (no new data, no second DB pass);
results land in the cached backtest_report as `signal_eval` and render as a
"Signal edge" table in BacktestPanel beside the calibration curve.

330 backend tests pass (10 new in test_signal_eval); frontend build clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-23 17:58:40 +02:00

555 lines
21 KiB
Python

"""Historical backtest (Phase 1): replay the price-derived engine over stored
OHLCV and measure how the CURRENT config would have performed.
For each ticker we step through history (weekly), and at each as-of date D we
rebuild the setup using only bars ≤ D (no lookahead), then walk the actual bars
after D to record the realized outcome. Two reports come out:
- realized hit-rate / expectancy of qualified setups (and of all setups)
- a probability calibration curve: do "60% likely" targets hit ~60% of the time?
Limitation: sentiment and fundamentals have no point-in-time history, so they're
held neutral here — this calibrates the price/S-R/probability machinery only.
"""
from __future__ import annotations
import json
import logging
import math
from collections import defaultdict
from collections.abc import Callable
from datetime import datetime, timezone
from types import SimpleNamespace
from typing import Any
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.settings import SystemSetting
from app.models.ticker import Ticker
from app.services.admin_service import get_activation_config, update_setting
from app.services.indicator_service import _extract_ohlcv, compute_atr
from app.services.outcome_service import (
OUTCOME_AMBIGUOUS,
OUTCOME_STOP_HIT,
OUTCOME_TARGET_HIT,
Bar,
evaluate_setup_against_bars,
)
from app.services.price_service import query_ohlcv
from app.services.qualification import (
best_target_probability,
expected_value_r,
setup_qualifies,
)
from app.services.recommendation_service import (
_choose_recommended_action,
_classify_by_probability,
_risk_level_from_conflicts,
_select_primary_target,
_zone_representative_levels,
direction_analyzer,
get_recommendation_config,
probability_estimator,
signal_conflict_detector,
target_generator,
)
from app.services.scoring_service import (
compute_momentum_from_closes,
compute_technical_from_arrays,
)
from app.services.sr_service import detect_sr_levels
logger = logging.getLogger(__name__)

# SystemSetting key under which the JSON-serialized report is cached.
KEY_REPORT = "backtest_report"
STEP_DAYS = 5  # weekly cadence (≈ 5 trading days)
MIN_LOOKBACK = 60  # bars needed before D for indicators (EMA cross needs 51)
HORIZON = 30  # trading days to resolve an outcome (matches the evaluator)
# Stop distance from the entry price, expressed in ATR multiples
# (long: entry - ATR * multiplier, short: entry + ATR * multiplier).
ATR_MULTIPLIER = 1.5
# Half-open [lo, hi) percent buckets for the calibration curve. The last upper
# bound is 100.01 so a predicted probability of exactly 100 still lands in the
# top bucket.
_CAL_BUCKETS = [(0, 20), (20, 40), (40, 60), (60, 80), (80, 100.01)]

# Cross-sectional signal evaluation (factor IC). Each candidate signal is a
# point-in-time number computed from closes alone (sentiment/fundamentals have no
# history here), sampled one as-of per ISO week, and graded by how its rank
# correlates with the forward HORIZON-day return ACROSS the universe — i.e. does
# ranking stocks by this signal sort tomorrow's winners from losers. This is the
# test the per-setup hit-rate report can't do: it measures predictive power of a
# signal, not the outcome of a target/stop structure built on top of one.
MIN_CROSS_SECTION = 20  # min tickers present in a week to score that week
def _wrap_levels(level_dicts: list[dict]) -> list[Any]:
    """Adapt raw S/R level dicts into attribute-style objects.

    Every input dict must carry ``price_level``, ``type`` and ``strength``.
    The returned objects expose those as attributes (price coerced to
    ``float``, strength to ``int``) plus an ``id`` equal to the dict's
    position in ``level_dicts``.
    """
    wrapped: list[Any] = []
    for position, level in enumerate(level_dicts):
        wrapped.append(
            SimpleNamespace(
                id=position,
                price_level=float(level["price_level"]),
                type=level["type"],
                strength=int(level["strength"]),
            )
        )
    return wrapped
def _window_setups(
    window_records: list,
    config: dict,
    activation: dict,
) -> list[dict]:
    """Rebuild the setup(s) at the last bar of ``window_records`` (the as-of date),
    using only those bars.

    Args:
        window_records: OHLCV records up to and including the as-of bar.
        config: recommendation config, forwarded to the conflict detector, the
            probability estimator and the action chooser.
        activation: qualification config. ``min_expected_value`` is read from
            it (default ``0.0``) to decide the ``qualified`` flag.

    Returns:
        One dict per tradeable direction with the keys ``direction``,
        ``entry``, ``stop``, ``target``, ``rr``, ``confidence``,
        ``primary_prob``, ``best_prob``, ``ev``, ``meets_core``, ``action``,
        ``risk_level`` and ``qualified``. Empty when the window is shorter than
        ``MIN_LOOKBACK``, the last close or the ATR is non-positive, the ATR
        helper raises, no S/R levels are detected, or no direction yields a
        primary target.
    """
    if len(window_records) < MIN_LOOKBACK:
        return []
    _, highs, lows, closes, volumes = _extract_ohlcv(window_records)
    entry = closes[-1]
    if entry <= 0:
        return []
    try:
        atr = compute_atr(highs, lows, closes)["atr"]
    except Exception:
        # Best-effort: a window the ATR helper rejects simply yields no setup.
        return []
    if atr <= 0:
        return []
    sr_levels = _wrap_levels(detect_sr_levels(highs, lows, closes, volumes))
    if not sr_levels:
        return []

    # Fall back to the neutral 50.0 only when no score is available. A plain
    # ``score or 50.0`` would also overwrite a genuine score of 0.0.
    # NOTE(review): assumes the scorers signal "no score" with None — confirm.
    technical = compute_technical_from_arrays(highs, lows, closes, volumes)[0]
    if technical is None:
        technical = 50.0
    momentum = compute_momentum_from_closes(closes)[0]
    if momentum is None:
        momentum = 50.0
    dim_scores = {"technical": technical, "momentum": momentum}

    # The ``None`` arguments below are presumably the sentiment slot, held
    # neutral because it has no point-in-time history — verify against callee.
    conflicts = signal_conflict_detector.detect_conflicts(dim_scores, None, config)
    confidences = {
        "long": direction_analyzer.calculate_confidence("long", dim_scores, None, conflicts),
        "short": direction_analyzer.calculate_confidence("short", dim_scores, None, conflicts),
    }

    # First pass: build targets per direction
    per_dir: dict[str, dict] = {}
    for direction in ("long", "short"):
        stop = entry - atr * ATR_MULTIPLIER if direction == "long" else entry + atr * ATR_MULTIPLIER
        zone_levels = _zone_representative_levels(sr_levels, entry)
        targets = target_generator.generate_targets(direction, entry, stop, zone_levels, atr)
        if not targets:
            continue
        for t in targets:
            t["probability"] = probability_estimator.estimate_probability(
                t, dim_scores, None, direction, config
            )
            t["classification"] = _classify_by_probability(t["probability"])
        primary = _select_primary_target(targets)
        if primary is None:
            continue
        # Flag the primary so qualification's EV uses the primary target's
        # probability (matching production's enhance_trade_setup).
        for t in targets:
            t["is_primary"] = t is primary
        per_dir[direction] = {"stop": stop, "targets": targets, "primary": primary}

    available = set(per_dir.keys())
    if not available:
        return []
    action = _choose_recommended_action(confidences["long"], confidences["short"], config, available)

    # meets_core = clears every gate EXCEPT the expected-value floor, so the
    # report can sweep the min_expected_value threshold without re-replaying.
    # Both values are the same for every direction, so build them once.
    core_config = {**activation, "min_expected_value": float("-inf")}
    min_ev = float(activation.get("min_expected_value", 0.0))

    out: list[dict] = []
    for direction, data in per_dir.items():
        targets, primary, stop = data["targets"], data["primary"], data["stop"]
        setup_conflicts = list(conflicts)
        if len(targets) < 3:
            setup_conflicts.append("target-availability: Fewer than 3 valid S/R targets available")
        risk_level = _risk_level_from_conflicts(setup_conflicts)
        rr = float(primary["rr_ratio"])
        target_price = float(primary["price"])
        setup_ns = SimpleNamespace(
            rr_ratio=rr,
            confidence_score=confidences[direction],
            recommended_action=action,
            risk_level=risk_level,
            targets=targets,
            direction=direction,
            target=target_price,
            stop_loss=stop,
            entry_price=entry,
        )
        meets_core = setup_qualifies(setup_ns, core_config)
        ev = expected_value_r(setup_ns)
        best_prob = best_target_probability(setup_ns)
        out.append({
            "direction": direction,
            "entry": entry,
            "stop": stop,
            "target": target_price,
            "rr": rr,
            "confidence": confidences[direction],
            "primary_prob": float(primary["probability"]),
            "best_prob": best_prob,
            "ev": ev,
            "meets_core": meets_core,
            "action": action,
            "risk_level": risk_level,
            "qualified": meets_core and ev is not None and ev >= min_ev,
        })
    return out
def _replay_ticker(symbol: str, records: list, config: dict, activation: dict) -> list[dict]:
    """Replay one ticker at a weekly cadence and grade every setup it produced.

    At each as-of index (every ``STEP_DAYS`` bars, starting once
    ``MIN_LOOKBACK`` bars exist and stopping ``HORIZON`` bars before the end)
    the setups are rebuilt from the bars up to that index and evaluated against
    the bars after it. Setups whose outcome is ``None`` are dropped.

    Realized R is the setup's reward/risk ratio on a target hit, ``-1.0`` on a
    stop hit or an ambiguous bar, and ``0.0`` for any other outcome (expired).

    Returns:
        One dict per graded setup; empty when the ticker has fewer than
        ``MIN_LOOKBACK + HORIZON`` bars.
    """
    total_bars = len(records)
    graded: list[dict] = []
    if total_bars < MIN_LOOKBACK + HORIZON:
        return graded

    # Setup fields copied verbatim into each candidate row, in output order.
    carried = ("direction", "rr", "confidence", "primary_prob", "best_prob", "ev", "meets_core", "qualified")

    for asof in range(MIN_LOOKBACK - 1, total_bars - HORIZON, STEP_DAYS):
        history = records[: asof + 1]
        forward_bars = [
            Bar(date=rec.date, high=rec.high, low=rec.low) for rec in records[asof + 1 :]
        ]
        for setup in _window_setups(history, config, activation):
            outcome, _ = evaluate_setup_against_bars(
                setup["direction"], setup["stop"], setup["target"], forward_bars, HORIZON
            )
            if outcome is None:
                continue
            target_hit = outcome == OUTCOME_TARGET_HIT
            if target_hit:
                realized_r = setup["rr"]
            else:
                losing = outcome in (OUTCOME_STOP_HIT, OUTCOME_AMBIGUOUS)
                realized_r = -1.0 if losing else 0.0  # otherwise: expired
            graded.append({
                "symbol": symbol,
                "date": records[asof].date.isoformat(),
                **{field: setup[field] for field in carried},
                "outcome": outcome,
                "target_hit": target_hit,
                "realized_r": realized_r,
            })
    return graded
def _bucket_stats(cands: list[dict]) -> dict:
    """Summarize a group of graded candidates.

    Returns the total count, wins (``target_hit`` set), losses (stop hit or
    ambiguous), expired (any other outcome), the hit rate in percent over
    decided trades (wins + losses), and the average and total realized R.
    Ratios are ``None`` when their denominator is empty.
    """
    wins = losses = expired = 0
    for cand in cands:
        if cand["target_hit"]:
            wins += 1
        outcome = cand["outcome"]
        if outcome in (OUTCOME_STOP_HIT, OUTCOME_AMBIGUOUS):
            losses += 1
        elif outcome != OUTCOME_TARGET_HIT:
            expired += 1

    realized = [cand["realized_r"] for cand in cands]
    decided = wins + losses

    hit_rate = round(wins / decided * 100, 1) if decided else None
    if realized:
        avg_r = round(sum(realized) / len(realized), 3)
        total_r = round(sum(realized), 2)
    else:
        avg_r = total_r = None

    return {
        "total": len(cands),
        "wins": wins,
        "losses": losses,
        "expired": expired,
        "hit_rate": hit_rate,
        "avg_r": avg_r,
        "total_r": total_r,
    }
def _calibration(cands: list[dict]) -> list[dict]:
    """Compare predicted primary-target probability with the realized hit rate.

    Candidates are grouped by ``primary_prob`` into the half-open ranges of
    ``_CAL_BUCKETS``. Each non-empty range yields one row with its label, its
    size, the mean predicted probability and the share of targets actually hit
    (in percent). Empty ranges are omitted.
    """
    curve: list[dict] = []
    for lower, upper in _CAL_BUCKETS:
        members = [
            (cand["primary_prob"], cand["target_hit"])
            for cand in cands
            if lower <= cand["primary_prob"] < upper
        ]
        if not members:
            continue
        size = len(members)
        hit_count = sum(1 for _, was_hit in members if was_hit)
        predicted_total = sum(prob for prob, _ in members)
        curve.append({
            "bucket": f"{int(lower)}-{int(min(upper, 100))}%",
            "n": size,
            "predicted_avg": round(predicted_total / size, 1),
            "realized_hit_rate": round(hit_count / size * 100, 1),
        })
    return curve
# ---------------------------------------------------------------------------
# Cross-sectional signal evaluation (factor information-coefficient)
# ---------------------------------------------------------------------------
def _weekly_asof_indices(records: list) -> list[int]:
    """Return, in ascending order, the index of the last record seen for each
    ISO (year, week) — the weekly rebalance as-of bars.

    Grouping by calendar week rather than by raw bar position keeps the as-of
    weeks aligned across tickers, so one week's cross-section is comparable.
    """
    # Later records overwrite earlier ones with the same (year, week) key, so
    # each key ends up mapped to the last position that carried it.
    last_position = {
        tuple(record.date.isocalendar())[:2]: position
        for position, record in enumerate(records)
    }
    return sorted(last_position.values())
def _signal_values(closes: list[float], highs: list[float], i: int) -> dict[str, float]:
    """Compute the price-only candidate signals as of bar index ``i``.

    A signal is emitted only when enough history precedes ``i`` and its
    denominator is positive:

    - ``mom_12_1`` / ``mom_6_1`` / ``mom_3_1``: return from 252 / 126 / 63 bars
      ago up to 21 bars ago (the "skip the last month" convention, which keeps
      the short-term reversal effect out of the momentum factors).
    - ``reversal_1m``: return over the last 21 bars, isolating that reversal
      effect on purpose (negative IC expected if the universe mean-reverts).
    - ``trend_200``: close relative to its 200-bar simple moving average.
    - ``high_52w``: close divided by the highest high of the trailing 252 bars
      (George/Hwang anchoring effect: higher = nearer the high).
    - ``vol_6m``: sample standard deviation of the last 126 daily returns
      (negative IC expected if the low-volatility anomaly holds).
    """
    signals: dict[str, float] = {}

    # Momentum family: identical formula, different formation start.
    for label, lookback in (("mom_12_1", 252), ("mom_6_1", 126), ("mom_3_1", 63)):
        start = i - lookback
        if start >= 0 and closes[start] > 0:
            signals[label] = closes[i - 21] / closes[start] - 1.0

    month_ago = i - 21
    if month_ago >= 0 and closes[month_ago] > 0:
        signals["reversal_1m"] = closes[i] / closes[month_ago] - 1.0

    if i >= 199:
        average_200 = sum(closes[i - 199 : i + 1]) / 200.0
        if average_200 > 0:
            signals["trend_200"] = closes[i] / average_200 - 1.0

    if i >= 251:
        peak = max(highs[i - 251 : i + 1])
        if peak > 0:
            signals["high_52w"] = closes[i] / peak

    if i >= 126:
        daily: list[float] = []
        for k in range(i - 125, i + 1):
            previous = closes[k - 1]
            if previous > 0:
                daily.append(closes[k] / previous - 1.0)
        count = len(daily)
        if count >= 2:
            mean = sum(daily) / count
            variance = sum((ret - mean) ** 2 for ret in daily) / (count - 1)
            signals["vol_6m"] = math.sqrt(variance)

    return signals
def _accumulate_signal_series(records: list, collected: dict) -> None:
    """Append this ticker's weekly (signal value, forward return) pairs to
    ``collected[signal_name][(iso_year, iso_week)]``.

    The forward return is close-to-close over ``HORIZON`` bars from the weekly
    as-of bar. As-of bars without a full forward window, or with a
    non-positive close, are skipped. Tickers with fewer than ``HORIZON + 21``
    bars contribute nothing. ``collected`` (a dict of dict of list) is mutated
    in place.
    """
    total_bars = len(records)
    if total_bars < HORIZON + 21:
        return

    closes = [float(record.close) for record in records]
    highs = [float(record.high) for record in records]

    for asof in _weekly_asof_indices(records):
        exit_index = asof + HORIZON
        entry_close = closes[asof]
        if exit_index >= total_bars or entry_close <= 0:
            continue
        forward_return = closes[exit_index] / entry_close - 1.0
        week_key = tuple(records[asof].date.isocalendar())[:2]
        for signal_name, signal_value in _signal_values(closes, highs, asof).items():
            collected[signal_name][week_key].append((signal_value, forward_return))
def _rank(xs: list[float]) -> list[float]:
    """Return 1-based ranks of ``xs``; tied values share the average of the
    ranks they span."""
    size = len(xs)
    order = sorted(range(size), key=lambda position: xs[position])
    ranks = [0.0] * size
    run_start = 0
    for cursor in range(1, size + 1):
        # Keep extending the current run while the next value equals the
        # value that opened it.
        if cursor < size and xs[order[cursor]] == xs[order[run_start]]:
            continue
        shared_rank = (run_start + cursor - 1) / 2.0 + 1.0
        for position in order[run_start:cursor]:
            ranks[position] = shared_rank
        run_start = cursor
    return ranks
def _pearson(a: list[float], b: list[float]) -> float | None:
    """Pearson correlation of ``a`` and ``b``.

    Returns ``None`` when ``a`` has fewer than three points or either series
    has no variance.
    """
    count = len(a)
    if count < 3:
        return None

    mean_a = sum(a) / count
    mean_b = sum(b) / count
    spread_a = sum((value - mean_a) ** 2 for value in a)
    spread_b = sum((value - mean_b) ** 2 for value in b)
    if not (spread_a > 0 and spread_b > 0):
        return None

    co_moment = sum((value - mean_a) * (b[k] - mean_b) for k, value in enumerate(a))
    return co_moment / math.sqrt(spread_a * spread_b)
def _spearman(xs: list[float], ys: list[float]) -> float | None:
    """Spearman rank correlation: the Pearson correlation of the two rank
    vectors. ``None`` when there are fewer than three points or the ranks are
    degenerate."""
    if len(xs) >= 3:
        return _pearson(_rank(xs), _rank(ys))
    return None
def _quintile_spread(pairs: list[tuple[float, float]]) -> float | None:
    """Average forward return of the highest-signal fifth minus that of the
    lowest-signal fifth.

    ``pairs`` holds ``(signal, forward_return)`` tuples. Returns ``None`` for
    fewer than ten pairs. The quintile size is ``len(pairs) // 5``.
    """
    count = len(pairs)
    if count < 10:
        return None
    by_signal = sorted(pairs, key=lambda pair: pair[0])
    size = count // 5
    top_total = sum(pair[1] for pair in by_signal[-size:])
    bottom_total = sum(pair[1] for pair in by_signal[:size])
    return top_total / size - bottom_total / size
def _signal_evaluation(collected: dict) -> list[dict]:
    """Build one diagnostics row per candidate signal, sorted by mean IC
    (highest first).

    Row fields:
        weeks                 number of weeks with a usable IC
        avg_cross_section     mean number of names in the scored weeks
        mean_ic               average weekly rank-IC (Spearman, signal vs fwd ret)
        ic_t_stat             mean_ic / stderr; None when the IC has no spread
        ic_positive_pct       share of weeks with a positive IC
        mean_quintile_spread  average top-minus-bottom-quintile forward return

    Weeks with fewer than ``MIN_CROSS_SECTION`` names are ignored, and signals
    with no usable week produce no row. A signal with no edge lands near IC 0
    and spread 0. Caveat: weekly rebalances with a HORIZON-day forward window
    overlap, so the t-stat overstates significance — read it as directional,
    alongside ic_positive_pct.
    """
    table: list[dict] = []
    for signal_name in sorted(collected):
        weekly_ics: list[float] = []
        weekly_spreads: list[float] = []
        cross_sections: list[int] = []

        for week_pairs in collected[signal_name].values():
            names_in_week = len(week_pairs)
            if names_in_week < MIN_CROSS_SECTION:
                continue
            signal_side = [pair[0] for pair in week_pairs]
            return_side = [pair[1] for pair in week_pairs]
            week_ic = _spearman(signal_side, return_side)
            if week_ic is not None:
                weekly_ics.append(week_ic)
            week_spread = _quintile_spread(week_pairs)
            if week_spread is not None:
                weekly_spreads.append(week_spread)
            cross_sections.append(names_in_week)

        if not weekly_ics:
            continue

        n_weeks = len(weekly_ics)
        mean_ic = sum(weekly_ics) / n_weeks
        std = 0.0
        if n_weeks > 1:
            std = math.sqrt(sum((ic - mean_ic) ** 2 for ic in weekly_ics) / (n_weeks - 1))
        t_stat = mean_ic / std * math.sqrt(n_weeks) if std > 0 else None
        positive_weeks = sum(1 for ic in weekly_ics if ic > 0)

        table.append({
            "signal": signal_name,
            "weeks": n_weeks,
            "avg_cross_section": (
                round(sum(cross_sections) / len(cross_sections), 1) if cross_sections else None
            ),
            "mean_ic": round(mean_ic, 4),
            "ic_t_stat": None if t_stat is None else round(t_stat, 2),
            "ic_positive_pct": round(positive_weeks / n_weeks * 100, 1),
            "mean_quintile_spread": (
                round(sum(weekly_spreads) / len(weekly_spreads), 4) if weekly_spreads else None
            ),
        })

    table.sort(key=lambda row: row["mean_ic"], reverse=True)
    return table
async def run_backtest(
    db: AsyncSession,
    progress_cb: Callable[[int, int, str], None] | None = None,
) -> dict:
    """Replay every ticker and aggregate the Phase-1 reports for the current config.

    Args:
        db: async session used to load the configs, the tickers and their OHLCV.
        progress_cb: optional ``(done, total, symbol)`` callback, invoked before
            each ticker and once more with ``(total, total, "")`` at the end
            when there is at least one ticker.

    Returns:
        The report dict: counts, per-group hit-rate/expectancy stats, the
        ``min_expected_value`` threshold sweep, the probability calibration
        curve and the cross-sectional signal evaluation.

    A failure while replaying one ticker is logged and that ticker is skipped;
    it does not abort the run.
    """
    config = await get_recommendation_config(db)
    activation = await get_activation_config(db)
    result = await db.execute(select(Ticker).order_by(Ticker.symbol))
    tickers = list(result.scalars().all())
    total = len(tickers)

    candidates: list[dict] = []
    # collected[signal_name][iso_week] -> list of (signal_value, forward_return)
    collected: dict = defaultdict(lambda: defaultdict(list))
    for index, ticker in enumerate(tickers):
        if progress_cb is not None:
            progress_cb(index, total, ticker.symbol)
        try:
            records = await query_ohlcv(db, ticker.symbol)
            candidates.extend(_replay_ticker(ticker.symbol, records, config, activation))
            _accumulate_signal_series(records, collected)
        except Exception:
            logger.exception("Backtest replay failed for %s", ticker.symbol)
    if progress_cb is not None and total:
        progress_cb(total, total, "")

    qualified = [c for c in candidates if c["qualified"]]
    longs = [c for c in qualified if c["direction"] == "long"]
    shorts = [c for c in qualified if c["direction"] == "short"]

    # Threshold sweep: re-apply the gate at several min_expected_value values
    # (holding the other conditions fixed) so the trade-off between how many
    # setups qualify and their expectancy is visible without re-replaying.
    # The fallback must match the one used when the ``qualified`` flag is
    # computed (0.0); otherwise, with the key missing, the report would state a
    # threshold that was never applied.
    current_min_ev = float(activation.get("min_expected_value", 0.0))
    sweep = []
    for threshold in (0.4, 0.3, 0.25, 0.2, 0.15, 0.1, 0.05, 0.0):
        cands = [
            c for c in candidates
            if c["meets_core"] and c["ev"] is not None and c["ev"] >= threshold
        ]
        sweep.append({"min_expected_value": threshold, **_bucket_stats(cands)})

    return {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "tickers": total,
        "candidates": len(candidates),
        "qualified": len(qualified),
        "params": {"step_days": STEP_DAYS, "horizon_days": HORIZON, "min_lookback": MIN_LOOKBACK},
        "activation": activation,
        "overall_qualified": _bucket_stats(qualified),
        "overall_all": _bucket_stats(candidates),
        "by_direction": {
            "long": _bucket_stats(longs),
            "short": _bucket_stats(shorts),
        },
        "min_expected_value": current_min_ev,
        "sweep": sweep,
        "calibration": _calibration(candidates),
        "signal_eval": _signal_evaluation(collected),
        "signal_eval_note": (
            "Cross-sectional rank-IC of price-only signals vs the forward "
            f"{HORIZON}-day return (weekly rebalance, min {MIN_CROSS_SECTION} "
            "names/week). |IC| ≳ 0.03 with a consistent sign is a real (if small) "
            "edge; near 0 means ranking on it sorts nothing. Momentum factors and "
            "high_52w are expected positive; reversal_1m and vol_6m are expected "
            "negative (mean-reversion / low-vol anomaly). Overlapping windows inflate "
            "the t-stat — read directionally."
        ),
        "note": (
            "Sentiment & fundamentals held neutral (no point-in-time history). "
            "~6 months ≈ one market regime — treat as directional, not gospel."
        ),
    }
async def run_and_store(
    db: AsyncSession,
    progress_cb: Callable[[int, int, str], None] | None = None,
) -> dict:
    """Job entrypoint: run the backtest, cache the report as JSON under the
    ``KEY_REPORT`` setting, and return the report."""
    report = await run_backtest(db, progress_cb)
    serialized = json.dumps(report)
    await update_setting(db, KEY_REPORT, serialized)
    return report
async def get_backtest_report(db: AsyncSession) -> dict | None:
    """Load the most recently cached backtest report.

    Returns ``None`` when no report has been stored yet or when the stored
    value cannot be decoded as JSON.
    """
    query = select(SystemSetting).where(SystemSetting.key == KEY_REPORT)
    setting = (await db.execute(query)).scalar_one_or_none()
    if setting is None:
        return None
    try:
        report = json.loads(setting.value)
    except (TypeError, ValueError):
        report = None
    return report