replace EV activation gate with cross-sectional 12-1 momentum ranking
Deploy / lint (push) Successful in 7s
Deploy / test (push) Successful in 41s
Deploy / deploy (push) Successful in 26s

The 5-year backtest confirmed the EV gate adds negative value (high threshold =
worst expectancy) and that 12-1 month momentum is the one price signal with a
plausible, right-signed cross-sectional IC (~0.05). So "qualified" now means:
clears the R:R + confidence floors AND the ticker ranks in the top
`min_momentum_percentile` of the universe by 12-1 momentum that week.

- qualification.py: drop expected_value_r / the EV gate; add a momentum-percentile
  gate (duck-typed `momentum_percentile`, only enforced when attached + threshold
  set, else defers to floors). Mirrored in frontend qualification.ts.
- activation config/schema: min_expected_value -> min_momentum_percentile
  (default 80 = top quintile). ActivationSettings, DashboardPage (ranks/【shows】
  momentum instead of EV), and the BacktestPanel sweep follow.
- backtest: rank each ISO week's universe by 12-1 momentum, assign a percentile,
  and qualify the top slice; the sweep now sweeps the percentile cutoff.

Also offload the backtest's per-ticker compute to a worker thread so the heavy
~5y run no longer blocks the API event loop (the "backend offline" flicker).

Production setups don't carry momentum_percentile yet — wiring the scanner to
attach it (a universe momentum-rank step) is the next step; until then the live
gate defers to floors while the backtest measures the momentum selection. 330
backend tests pass; frontend build clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-23 22:42:24 +02:00
parent 099846513b
commit ef523474ad
12 changed files with 202 additions and 196 deletions
+94 -23
View File
@@ -14,6 +14,7 @@ held neutral here — this calibrates the price/S-R/probability machinery only.
from __future__ import annotations
import asyncio
import json
import logging
import math
@@ -40,7 +41,6 @@ from app.services.outcome_service import (
from app.services.price_service import query_ohlcv
from app.services.qualification import (
best_target_probability,
expected_value_r,
setup_qualifies,
)
from app.services.recommendation_service import (
@@ -110,6 +110,14 @@ def _window_setups(
if entry <= 0:
return []
# 12-1 month momentum (skip the last month) — the universe ranks on this.
# None until a year of history exists; such setups can't qualify on momentum.
mom_12_1 = (
closes[-22] / closes[-253] - 1.0
if len(closes) >= 253 and closes[-253] > 0
else None
)
try:
atr = compute_atr(highs, lows, closes)["atr"]
except Exception:
@@ -180,13 +188,12 @@ def _window_setups(
stop_loss=stop,
entry_price=entry,
)
# meets_core = clears every gate EXCEPT the expected-value floor, so the
# report can sweep the min_expected_value threshold without re-replaying.
core_config = {**activation, "min_expected_value": float("-inf")}
# meets_core = clears every gate EXCEPT the cross-sectional momentum
# percentile, which can only be assigned once all tickers' setups for a
# week are known. run_backtest ranks momentum and finalizes `qualified`.
core_config = {**activation, "min_momentum_percentile": 0.0}
meets_core = setup_qualifies(setup_ns, core_config)
ev = expected_value_r(setup_ns)
best_prob = best_target_probability(setup_ns)
min_ev = float(activation.get("min_expected_value", 0.0))
out.append({
"direction": direction,
"entry": entry,
@@ -196,11 +203,10 @@ def _window_setups(
"confidence": confidences[direction],
"primary_prob": float(primary["probability"]),
"best_prob": best_prob,
"ev": ev,
"momentum": mom_12_1,
"meets_core": meets_core,
"action": action,
"risk_level": risk_level,
"qualified": meets_core and ev is not None and ev >= min_ev,
})
return out
@@ -230,17 +236,18 @@ def _replay_ticker(symbol: str, records: list, config: dict, activation: dict) -
realized_r = -1.0
else: # expired
realized_r = 0.0
iso = records[i].date.isocalendar()
candidates.append({
"symbol": symbol,
"date": records[i].date.isoformat(),
"iso_week": (iso[0], iso[1]),
"direction": s["direction"],
"rr": s["rr"],
"confidence": s["confidence"],
"primary_prob": s["primary_prob"],
"best_prob": s["best_prob"],
"ev": s["ev"],
"momentum": s["momentum"],
"meets_core": s["meets_core"],
"qualified": s["qualified"],
"outcome": outcome,
"target_hit": target_hit,
"realized_r": realized_r,
@@ -484,6 +491,49 @@ def _signal_evaluation(collected: dict) -> list[dict]:
return rows
def _process_ticker(
symbol: str,
records: list,
config: dict,
activation: dict,
collected: dict,
) -> list[dict]:
"""The CPU-bound per-ticker work — replay + signal accumulation — bundled so
run_backtest can hand it to a worker thread. Mutates ``collected``."""
cands = _replay_ticker(symbol, records, config, activation)
_accumulate_signal_series(records, collected)
return cands
def _assign_momentum_percentiles(candidates: list[dict]) -> None:
"""Per ISO week, rank candidates by their ticker's 12-1 momentum and attach a
0100 ``momentum_percentile`` (100 = highest momentum in the universe that
week). Candidates whose momentum is unknown (insufficient lookback) get None
and therefore can't clear a momentum gate. Mutates ``candidates``."""
by_week: dict = defaultdict(list)
for c in candidates:
if c.get("momentum") is not None:
by_week[c["iso_week"]].append(c)
for group in by_week.values():
ordered = sorted(group, key=lambda c: c["momentum"])
n = len(ordered)
for rank, c in enumerate(ordered):
c["momentum_percentile"] = (rank / (n - 1) * 100.0) if n > 1 else 100.0
for c in candidates:
c.setdefault("momentum_percentile", None)
def _momentum_qualifies(cand: dict, threshold: float) -> bool:
"""Whether a candidate clears the floors (meets_core) and the momentum gate.
Threshold 0 disables the momentum gate (floors only)."""
if not cand["meets_core"]:
return False
if threshold <= 0:
return True
mp = cand.get("momentum_percentile")
return mp is not None and mp >= threshold
async def run_backtest(
db: AsyncSession,
progress_cb: Callable[[int, int, str], None] | None = None,
@@ -504,29 +554,50 @@ async def run_backtest(
progress_cb(index, total, ticker.symbol)
try:
records = await query_ohlcv(db, ticker.symbol)
candidates.extend(_replay_ticker(ticker.symbol, records, config, activation))
_accumulate_signal_series(records, collected)
# Detach the ORM rows to plain objects in the event loop (safe to read
# here), then run the heavy replay in a worker thread. The compute is
# CPU-bound and used to block the event loop — and the API server with
# it — for the whole run; offloading lets CPython hand the GIL back to
# the loop every few ms so health checks / page loads stay responsive.
bars = [
SimpleNamespace(
date=r.date,
open=float(r.open),
high=float(r.high),
low=float(r.low),
close=float(r.close),
volume=int(r.volume),
)
for r in records
]
cands = await asyncio.to_thread(
_process_ticker, ticker.symbol, bars, config, activation, collected
)
candidates.extend(cands)
except Exception:
logger.exception("Backtest replay failed for %s", ticker.symbol)
if progress_cb is not None and total:
progress_cb(total, total, "")
# Cross-sectional momentum: rank every week's universe, then "qualified" means
# floors + top ``min_momentum_percentile`` by 12-1 momentum.
_assign_momentum_percentiles(candidates)
current_min_pct = float(activation.get("min_momentum_percentile", 80.0))
for c in candidates:
c["qualified"] = _momentum_qualifies(c, current_min_pct)
qualified = [c for c in candidates if c["qualified"]]
longs = [c for c in qualified if c["direction"] == "long"]
shorts = [c for c in qualified if c["direction"] == "short"]
# Threshold sweep: re-apply the gate at several min_expected_value values
# (holding the other conditions fixed) so the trade-off between how many
# setups qualify and their expectancy is visible without re-replaying.
current_min_ev = float(activation.get("min_expected_value", 0.15))
# Threshold sweep: re-apply the momentum gate at several percentile cutoffs
# (floors held fixed) so the trade-off between how many setups qualify and
# their expectancy is visible without re-replaying. 0 = floors only.
sweep = []
for threshold in (0.4, 0.3, 0.25, 0.2, 0.15, 0.1, 0.05, 0.0):
cands = [
c for c in candidates
if c["meets_core"] and c["ev"] is not None and c["ev"] >= threshold
]
sweep.append({"min_expected_value": threshold, **_bucket_stats(cands)})
for threshold in (90.0, 80.0, 70.0, 60.0, 50.0, 0.0):
cands = [c for c in candidates if _momentum_qualifies(c, threshold)]
sweep.append({"min_momentum_percentile": threshold, **_bucket_stats(cands)})
return {
"generated_at": datetime.now(timezone.utc).isoformat(),
@@ -541,7 +612,7 @@ async def run_backtest(
"long": _bucket_stats(longs),
"short": _bucket_stats(shorts),
},
"min_expected_value": current_min_ev,
"min_momentum_percentile": current_min_pct,
"sweep": sweep,
"calibration": _calibration(candidates),
"signal_eval": _signal_evaluation(collected),