feat: add strategy variant lab and signal context snapshots
Deploy / lint (push) Successful in 7s
Deploy / test (push) Successful in 1m1s
Deploy / deploy (push) Successful in 33s

Backtest report now includes research-only hold-to-horizon portfolio variants comparing raw vs residual 12-1 momentum, cutoff 80 vs 90, max 10 vs 15 positions, and SPY-200 risk scaling. A dynamic research recommendation panel flags residual momentum, cutoff 90, or regime scaling only when transparent promotion rules pass.

Adds signal_context_snapshots with migration 016 and captures one point-in-time context row per newly generated TradeSetup: setup fields, composite/dimensions, latest sentiment, latest fundamentals, and strategy_version=momentum_12_1_rr_time_v1. This is forward-only; no historical sentiment/fundamental backfill is attempted.

No live gate, paper-trade exit, or production ranking behavior changes.

Verification: 458 backend tests pass, ruff check app/ clean, frontend npm run build clean.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
2026-07-02 16:25:04 +02:00
parent 13374087db
commit 80b4113280
10 changed files with 885 additions and 28 deletions
+331 -16
View File
@@ -296,7 +296,13 @@ def _time_exits(
return result
def _replay_ticker(symbol: str, records: list, config: dict, activation: dict) -> list[dict]:
def _replay_ticker(
symbol: str,
records: list,
config: dict,
activation: dict,
benchmark_closes: dict[date, float] | None = None,
) -> list[dict]:
"""Walk one ticker's history weekly, building setups and their realized outcomes."""
candidates: list[dict] = []
n = len(records)
@@ -307,6 +313,11 @@ def _replay_ticker(symbol: str, records: list, config: dict, activation: dict) -
window = records[: i + 1]
forward = records[i + 1 :]
forward_bars = [Bar(date=r.date, high=r.high, low=r.low) for r in forward]
closes = [float(r.close) for r in window]
dates = [r.date for r in window]
residual_momentum = _residual_momentum_12_1(
dates, closes, len(window) - 1, benchmark_closes
)
for s in _window_setups(window, config, activation):
outcome, outcome_date = evaluate_setup_against_bars(
@@ -349,6 +360,7 @@ def _replay_ticker(symbol: str, records: list, config: dict, activation: dict) -
"primary_prob": s["primary_prob"],
"best_prob": s["best_prob"],
"momentum": s["momentum"],
"residual_momentum": residual_momentum,
"meets_core": s["meets_core"],
# Gate fields the ablation recomputes floors from — without them
# every candidate looks NEUTRAL and the ablation rows collapse.
@@ -759,7 +771,10 @@ def _replay_and_signals(
)
for o, op, hi, lo, cl, vo in zip(date_ords, opens, highs, lows, closes, volumes)
]
return _replay_ticker(symbol, bars, config, activation), _signal_series(bars, benchmark_closes)
return (
_replay_ticker(symbol, bars, config, activation, benchmark_closes),
_signal_series(bars, benchmark_closes),
)
def _backtest_worker_count() -> int:
@@ -800,22 +815,40 @@ async def _fetch_columns(db: AsyncSession, symbol: str) -> tuple | None:
)
def _assign_momentum_percentiles(candidates: list[dict]) -> None:
"""Per ISO week, rank candidates by their ticker's 12-1 momentum and attach a
0100 ``momentum_percentile`` (100 = highest momentum in the universe that
week). Candidates whose momentum is unknown (insufficient lookback) get None
and therefore can't clear a momentum gate. Mutates ``candidates``."""
def _assign_signal_percentiles(
candidates: list[dict],
value_key: str,
percentile_key: str,
) -> None:
"""Per ISO week, rank candidates by ``value_key`` and attach a 0-100
percentile under ``percentile_key`` (100 = strongest). Missing values get
None and therefore cannot clear a gate based on that signal."""
by_week: dict = defaultdict(list)
for c in candidates:
if c.get("momentum") is not None:
if c.get(value_key) is not None:
by_week[c["iso_week"]].append(c)
for group in by_week.values():
ordered = sorted(group, key=lambda c: c["momentum"])
ordered = sorted(group, key=lambda c: c[value_key])
n = len(ordered)
for rank, c in enumerate(ordered):
c["momentum_percentile"] = (rank / (n - 1) * 100.0) if n > 1 else 100.0
c[percentile_key] = (rank / (n - 1) * 100.0) if n > 1 else 100.0
for c in candidates:
c.setdefault("momentum_percentile", None)
c.setdefault(percentile_key, None)
def _assign_momentum_percentiles(candidates: list[dict]) -> None:
"""Per ISO week, rank candidates by their ticker's 12-1 momentum and attach a
0-100 ``momentum_percentile`` (100 = highest momentum in the universe that
week). Candidates whose momentum is unknown (insufficient lookback) get None
and therefore can't clear a momentum gate. Mutates ``candidates``."""
_assign_signal_percentiles(candidates, "momentum", "momentum_percentile")
def _assign_residual_momentum_percentiles(candidates: list[dict]) -> None:
"""Research-only residual-momentum percentile used by strategy variants."""
_assign_signal_percentiles(
candidates, "residual_momentum", "residual_momentum_percentile"
)
def _momentum_qualifies(cand: dict, threshold: float) -> bool:
@@ -930,6 +963,12 @@ def _simulate_portfolio(
spy_closes: dict | None,
exit_policy: str,
hold_days: int,
*,
qualified_fn: Callable[[dict], bool] | None = None,
ranking_key: str = "momentum_percentile",
max_positions: int = SIM_MAX_POSITIONS,
risk_per_trade: float = SIM_RISK_PER_TRADE,
risk_scale_by_ord: dict[int, float] | None = None,
) -> dict | None:
"""Replay the qualified setups as ONE capital-constrained book and report
portfolio economics from the daily equity curve (return, CAGR, drawdown,
@@ -942,9 +981,15 @@ def _simulate_portfolio(
modeled); positions still open at the end are closed at their last mark.
Returns None when there is nothing to trade.
"""
if qualified_fn is None:
def _default_qualified(c: dict) -> bool:
return bool(c.get("qualified"))
qualified_fn = _default_qualified
entries_by_ord: dict[int, list[dict]] = defaultdict(list)
for c in candidates:
if not c.get("qualified") or c.get("direction") != "long":
if not qualified_fn(c) or c.get("direction") != "long":
continue
if not c.get("entry") or not c.get("stop"):
continue
@@ -1018,22 +1063,26 @@ def _simulate_portfolio(
equity = _marked_equity()
todays = sorted(
entries_by_ord.get(o, ()),
key=lambda c: c.get("momentum_percentile") or 0.0,
key=lambda c: c.get(ranking_key) or 0.0,
reverse=True,
)
for c in todays:
sym = c["symbol"]
if sym in positions:
continue
if len(positions) >= SIM_MAX_POSITIONS:
if len(positions) >= max_positions:
skipped_full += 1
continue
entry, stop = float(c["entry"]), float(c["stop"])
risk_ps = entry - stop
if risk_ps <= 0 or entry <= 0:
continue
risk_scale = (risk_scale_by_ord or {}).get(o, 1.0)
effective_risk = risk_per_trade * risk_scale
if effective_risk <= 0:
continue
shares = min(
(equity * SIM_RISK_PER_TRADE) / risk_ps,
(equity * effective_risk) / risk_ps,
(equity * SIM_NOTIONAL_CAP) / entry,
max(cash, 0.0) / (entry * (1.0 + COST_PER_SIDE)),
)
@@ -1143,6 +1192,247 @@ def _simulate_portfolio(
}
STRATEGY_VARIANTS: tuple[dict, ...] = (
{
"variant": "production_raw_80_fixed10",
"label": "Production raw 80 / max 10",
"percentile_key": "momentum_percentile",
"cutoff": 80.0,
"max_positions": 10,
"risk_per_trade": 0.01,
"risk_scale": None,
},
{
"variant": "raw_90_fixed10",
"label": "Raw 90 / max 10",
"percentile_key": "momentum_percentile",
"cutoff": 90.0,
"max_positions": 10,
"risk_per_trade": 0.01,
"risk_scale": None,
},
{
"variant": "residual_80_fixed10",
"label": "Residual 80 / max 10",
"percentile_key": "residual_momentum_percentile",
"cutoff": 80.0,
"max_positions": 10,
"risk_per_trade": 0.01,
"risk_scale": None,
},
{
"variant": "residual_90_fixed10",
"label": "Residual 90 / max 10",
"percentile_key": "residual_momentum_percentile",
"cutoff": 90.0,
"max_positions": 10,
"risk_per_trade": 0.01,
"risk_scale": None,
},
{
"variant": "raw_80_fixed15",
"label": "Raw 80 / max 15",
"percentile_key": "momentum_percentile",
"cutoff": 80.0,
"max_positions": 15,
"risk_per_trade": 0.01,
"risk_scale": None,
},
{
"variant": "raw_80_regime_scaled",
"label": "Raw 80 / SPY-200 risk scale",
"percentile_key": "momentum_percentile",
"cutoff": 80.0,
"max_positions": 10,
"risk_per_trade": 0.01,
"risk_scale": "spy_200",
},
{
"variant": "residual_80_regime_scaled",
"label": "Residual 80 / SPY-200 risk scale",
"percentile_key": "residual_momentum_percentile",
"cutoff": 80.0,
"max_positions": 10,
"risk_per_trade": 0.01,
"risk_scale": "spy_200",
},
)
def _qualifies_by_percentile(cand: dict, percentile_key: str, threshold: float) -> bool:
"""Variant qualification: production floors + long-only signal percentile.
This does not mutate or replace the production ``qualified`` field."""
if not cand.get("meets_core"):
return False
if threshold <= 0:
return True
if cand.get("direction") == "short":
return False
pct = cand.get(percentile_key)
return pct is not None and pct >= threshold
def _spy_200_risk_scale(spy_closes: dict[date, float] | None) -> dict[int, float]:
"""Entry-date risk scale: 0.5 when SPY closes below its 200-day SMA, else 1.0.
Missing/short benchmark history returns an empty map, which the simulator
treats as unscaled 1.0 risk."""
if not spy_closes:
return {}
rows = sorted((d, c) for d, c in spy_closes.items() if c and c > 0)
out: dict[int, float] = {}
closes: list[float] = []
for d, close in rows:
closes.append(float(close))
if len(closes) < 200:
continue
sma = sum(closes[-200:]) / 200.0
out[d.toordinal()] = 0.5 if close < sma else 1.0
return out
def _strategy_variant_sims(
candidates: list[dict],
prices: dict[str, tuple],
spy_closes: dict[date, float] | None,
hold_days: int,
) -> list[dict]:
"""Research-only portfolio variants for comparing rank signals, cutoff, book
capacity, and simple SPY-200 risk scaling. Live qualification is untouched."""
risk_scales = {"spy_200": _spy_200_risk_scale(spy_closes)}
rows: list[dict] = []
for cfg in STRATEGY_VARIANTS:
percentile_key = str(cfg["percentile_key"])
cutoff = float(cfg["cutoff"])
sim = _simulate_portfolio(
candidates,
prices,
spy_closes,
"hold",
hold_days,
qualified_fn=lambda c, pk=percentile_key, th=cutoff: _qualifies_by_percentile(c, pk, th),
ranking_key=percentile_key,
max_positions=int(cfg["max_positions"]),
risk_per_trade=float(cfg["risk_per_trade"]),
risk_scale_by_ord=risk_scales.get(cfg["risk_scale"]),
)
if sim is None:
continue
rows.append({
"variant": cfg["variant"],
"label": cfg["label"],
"ranking": "residual" if "residual" in percentile_key else "raw",
"cutoff": cutoff,
"max_positions": int(cfg["max_positions"]),
"risk_per_trade_pct": round(float(cfg["risk_per_trade"]) * 100, 2),
"risk_scale": cfg["risk_scale"],
**sim,
})
return rows
def _pct_loss(base: float | None, candidate: float | None) -> float | None:
if base is None or candidate is None or base <= 0:
return None
return (base - candidate) / base
def _build_research_recommendation(report: dict) -> dict:
"""Advisory rules for research variants. These are deliberately conservative:
production only changes later if a portfolio variant beats the baseline under
transparent drawdown/Sharpe/CAGR constraints."""
variants = {
v.get("variant"): v
for v in (report.get("strategy_variants") or {}).get("variants", [])
}
base = variants.get("production_raw_80_fixed10")
items: list[dict] = []
if base is None:
return {
"items": [],
"note": "Strategy variants unavailable; re-run the backtest after benchmark data is present.",
}
base_sharpe = base.get("sharpe")
base_dd = base.get("max_drawdown_pct")
base_cagr = base.get("cagr_pct")
residuals = [
v for key, v in variants.items()
if key.startswith("residual_") and v.get("risk_scale") is None
]
residual = max(residuals, key=lambda v: v.get("sharpe") or -999, default=None)
if (
residual and base_sharpe is not None and residual.get("sharpe") is not None
and base_dd is not None and residual.get("max_drawdown_pct") is not None
):
sharpe_delta = residual["sharpe"] - base_sharpe
dd_delta = residual["max_drawdown_pct"] - base_dd
candidate = sharpe_delta >= 0.10 and dd_delta <= 2.0
items.append({
"topic": "residual_momentum",
"candidate": candidate,
"text": (
f"Residual momentum {'is a promotion candidate' if candidate else 'stays research-only'}: "
f"{residual['label']} Sharpe {residual['sharpe']:.2f} vs {base_sharpe:.2f}, "
f"drawdown {residual['max_drawdown_pct']:.1f}% vs {base_dd:.1f}%."
),
})
raw_regime = variants.get("raw_80_regime_scaled")
if (
raw_regime and base_dd is not None and base_cagr is not None
and raw_regime.get("cagr_pct") is not None
and raw_regime.get("max_drawdown_pct") is not None
):
dd_reduction = (base_dd - raw_regime["max_drawdown_pct"]) / base_dd if base_dd > 0 else None
cagr_loss = _pct_loss(base_cagr, raw_regime.get("cagr_pct"))
candidate = (
dd_reduction is not None and cagr_loss is not None
and dd_reduction >= 0.20 and cagr_loss <= 0.15
)
items.append({
"topic": "regime_scaling",
"candidate": candidate,
"text": (
f"SPY-200 risk scaling {'is a promotion candidate' if candidate else 'stays research-only'}: "
f"drawdown {raw_regime['max_drawdown_pct']:.1f}% vs {base_dd:.1f}%, "
f"CAGR {raw_regime.get('cagr_pct'):+.1f}% vs {base_cagr:+.1f}%."
),
})
raw_90 = variants.get("raw_90_fixed10")
if (
raw_90 and base_sharpe is not None and base_dd is not None and base_cagr is not None
and raw_90.get("sharpe") is not None and raw_90.get("cagr_pct") is not None
):
cagr_loss = _pct_loss(base_cagr, raw_90.get("cagr_pct"))
raw_90_sharpe = raw_90.get("sharpe")
candidate = (
raw_90_sharpe is not None
and raw_90_sharpe > base_sharpe
and raw_90["max_drawdown_pct"] < base_dd
and cagr_loss is not None and cagr_loss < 0.10
)
items.append({
"topic": "cutoff_90",
"candidate": candidate,
"text": (
f"Cutoff 90 {'is a promotion candidate' if candidate else 'stays research-only'}: "
f"Sharpe {raw_90_sharpe:.2f} vs {base_sharpe:.2f}, "
f"drawdown {raw_90['max_drawdown_pct']:.1f}% vs {base_dd:.1f}%, "
f"CAGR {raw_90.get('cagr_pct'):+.1f}% vs {base_cagr:+.1f}%."
),
})
return {
"items": items,
"note": (
"Advisory only. Production changes require a variant to pass the rule "
"and then be adopted explicitly in a later strategy-version change."
),
}
# ---------------------------------------------------------------------------
# Data-driven recommendation
# ---------------------------------------------------------------------------
@@ -1407,6 +1697,7 @@ async def run_backtest(
# Cross-sectional momentum: rank every week's universe, then "qualified" means
# floors + top ``min_momentum_percentile`` by 12-1 momentum.
_assign_momentum_percentiles(candidates)
_assign_residual_momentum_percentiles(candidates)
current_min_pct = float(activation.get("min_momentum_percentile", 80.0))
for c in candidates:
c["qualified"] = _momentum_qualifies(c, current_min_pct)
@@ -1428,8 +1719,19 @@ async def run_backtest(
# the book once per exit policy. Best-effort — the report stands without it.
hold_horizon = max(TIME_EXIT_DAYS)
sim_policies: list[dict] = []
strategy_variant_rows: list[dict] = []
try:
qual_symbols = sorted({c["symbol"] for c in candidates if c.get("qualified")})
qual_symbols = sorted({
c["symbol"]
for c in candidates
if c.get("qualified")
or any(
_qualifies_by_percentile(
c, str(cfg["percentile_key"]), float(cfg["cutoff"])
)
for cfg in STRATEGY_VARIANTS
)
})
price_columns: dict[str, tuple] = {}
for sym in qual_symbols:
cols = await _fetch_columns(db, sym)
@@ -1457,6 +1759,9 @@ async def run_backtest(
)
if sim is not None:
sim_policies.append({"policy": policy, **sim})
strategy_variant_rows = _strategy_variant_sims(
candidates, price_columns, spy_closes, hold_horizon
)
except Exception:
logger.exception("Portfolio simulation failed")
@@ -1513,6 +1818,15 @@ async def run_backtest(
"same window. In-sample; no dividends."
),
},
"strategy_variants": {
"variants": strategy_variant_rows,
"note": (
"Research-only hold-to-horizon portfolio variants. These compare "
"raw vs residual momentum ranking, cutoff 80 vs 90, max 10 vs 15 "
"positions, and SPY-200 risk scaling. They do not change live "
"qualification or paper-trade behavior."
),
},
"signal_eval": _signal_evaluation(collected),
"signal_eval_note": (
"Cross-sectional rank-IC of price-only signals vs the forward "
@@ -1533,6 +1847,7 @@ async def run_backtest(
),
}
report["recommendation"] = _build_recommendation(report)
report["research_recommendation"] = _build_research_recommendation(report)
return report
+138 -1
View File
@@ -11,15 +11,17 @@ from __future__ import annotations
import json
import logging
from collections.abc import Callable
from datetime import datetime, timezone
from datetime import date, datetime, timezone
from sqlalchemy import and_, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.exceptions import NotFoundError
from app.models.fundamental import FundamentalData
from app.models.ohlcv import OHLCVRecord
from app.models.score import CompositeScore, DimensionScore
from app.models.sentiment import SentimentScore
from app.models.signal_context_snapshot import SignalContextSnapshot
from app.models.sr_level import SRLevel
from app.models.ticker import Ticker
from app.models.trade_setup import TradeSetup
@@ -29,6 +31,8 @@ from app.services.recommendation_service import enhance_trade_setup
logger = logging.getLogger(__name__)
STRATEGY_VERSION = "momentum_12_1_rr_time_v1"
async def _get_ticker(db: AsyncSession, symbol: str) -> Ticker:
normalised = symbol.strip().upper()
@@ -76,6 +80,136 @@ async def _get_latest_sentiment(db: AsyncSession, ticker_id: int) -> str | None:
return row.classification if row else None
def _json_default(value):
if isinstance(value, (datetime, date)):
return value.isoformat()
return str(value)
async def _create_signal_context_snapshots(
db: AsyncSession,
setups: list[TradeSetup],
*,
strategy_version: str = STRATEGY_VERSION,
) -> None:
"""Capture point-in-time discretionary context for freshly generated setups.
The scanner stores the setup itself first so each snapshot can be keyed by
``trade_setup_id``. This is intentionally forward-only: old sentiment,
fundamentals and composite scores are not reconstructed from today's data.
"""
if not setups:
return
ticker_ids = {s.ticker_id for s in setups}
dims: dict[int, dict[str, dict]] = {}
dim_rows = (
await db.execute(select(DimensionScore).where(DimensionScore.ticker_id.in_(ticker_ids)))
).scalars().all()
for row in dim_rows:
dims.setdefault(row.ticker_id, {})[row.dimension] = {
"score": float(row.score),
"is_stale": bool(row.is_stale),
"computed_at": row.computed_at,
}
composites: dict[int, CompositeScore] = {}
comp_rows = (
await db.execute(
select(CompositeScore)
.where(CompositeScore.ticker_id.in_(ticker_ids))
.order_by(CompositeScore.ticker_id, CompositeScore.computed_at.desc())
)
).scalars().all()
for row in comp_rows:
composites.setdefault(row.ticker_id, row)
sentiments: dict[int, SentimentScore] = {}
sent_rows = (
await db.execute(
select(SentimentScore)
.where(SentimentScore.ticker_id.in_(ticker_ids))
.order_by(SentimentScore.ticker_id, SentimentScore.timestamp.desc())
)
).scalars().all()
for row in sent_rows:
sentiments.setdefault(row.ticker_id, row)
fundamentals: dict[int, FundamentalData] = {}
fund_rows = (
await db.execute(
select(FundamentalData)
.where(FundamentalData.ticker_id.in_(ticker_ids))
.order_by(FundamentalData.ticker_id, FundamentalData.fetched_at.desc())
)
).scalars().all()
for row in fund_rows:
fundamentals.setdefault(row.ticker_id, row)
now = datetime.now(timezone.utc)
for setup in setups:
comp = composites.get(setup.ticker_id)
sent = sentiments.get(setup.ticker_id)
fund = fundamentals.get(setup.ticker_id)
score_context = {
"composite_score": float(comp.score) if comp else float(setup.composite_score),
"composite_is_stale": bool(comp.is_stale) if comp else None,
"composite_computed_at": comp.computed_at if comp else None,
"dimensions": dims.get(setup.ticker_id, {}),
}
sentiment_context = (
{
"classification": sent.classification,
"confidence": int(sent.confidence),
"recommendation": sent.recommendation,
"timestamp": sent.timestamp,
"source": sent.source,
}
if sent
else {}
)
fundamental_context = (
{
"pe_ratio": fund.pe_ratio,
"revenue_growth": fund.revenue_growth,
"earnings_surprise": fund.earnings_surprise,
"market_cap": fund.market_cap,
"next_earnings_date": fund.next_earnings_date,
"fetched_at": fund.fetched_at,
}
if fund
else {}
)
db.add(
SignalContextSnapshot(
trade_setup_id=setup.id,
ticker_id=setup.ticker_id,
detected_at=setup.detected_at,
created_at=now,
strategy_version=strategy_version,
direction=setup.direction,
entry_price=float(setup.entry_price),
stop_loss=float(setup.stop_loss),
target=float(setup.target),
rr_ratio=float(setup.rr_ratio),
confidence_score=(
float(setup.confidence_score) if setup.confidence_score is not None else None
),
recommended_action=setup.recommended_action,
risk_level=setup.risk_level,
momentum_percentile=(
float(setup.momentum_percentile)
if setup.momentum_percentile is not None
else None
),
score_context_json=json.dumps(score_context, default=_json_default),
sentiment_context_json=json.dumps(sentiment_context, default=_json_default),
fundamental_context_json=json.dumps(fundamental_context, default=_json_default),
)
)
async def scan_ticker(
db: AsyncSession,
symbol: str,
@@ -238,6 +372,9 @@ async def scan_ticker(
for s in enhanced_setups:
await db.refresh(s)
await _create_signal_context_snapshots(db, enhanced_setups)
await db.commit()
return enhanced_setups