feat: add residual momentum to signal-edge backtest
Deploy / lint (push) Successful in 6s
Deploy / test (push) Successful in 55s
Deploy / deploy (push) Successful in 33s

Adds a research-only 12-1 residual momentum signal to the cross-sectional signal-evaluation harness. The signal estimates benchmark beta over the 12-1 formation window and ranks cumulative stock return minus beta-adjusted benchmark return; it only appears when benchmark closes are available.

No production qualification behavior changes. The Backtest signal table labels the new row as 12-1 residual momentum. Tests cover benchmark-gated emission and beta removal while keeping stock-specific drift.

Verification: 453 backend tests pass, ruff check app/ clean, frontend npm run build clean.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
2026-07-02 15:46:54 +02:00
parent 1e82dfad7f
commit 13374087db
4 changed files with 122 additions and 10 deletions
+79 -8
View File
@@ -493,7 +493,55 @@ def _weekly_asof_indices(records: list) -> list[int]:
return sorted(last_by_week.values())
def _signal_values(closes: list[float], highs: list[float], i: int) -> dict[str, float]:
def _residual_momentum_12_1(
dates: list[date],
closes: list[float],
i: int,
benchmark_closes: dict[date, float] | None,
) -> float | None:
"""12-1 momentum after removing the stock's linear benchmark exposure.
This is a practical beta-adjusted residual momentum approximation: estimate
beta from daily stock/benchmark returns over the same formation window
(12 months ending 1 month ago), then rank on cumulative stock return minus
beta * benchmark return. We deliberately do not subtract a fitted intercept:
with an intercept estimated over the same window, the arithmetic residuals
sum to ~zero by construction, which would destroy the signal.
"""
if not benchmark_closes or i - 252 < 0:
return None
stock_rets: list[float] = []
market_rets: list[float] = []
# Same daily intervals as mom_12_1: close[i-252] -> close[i-21].
for k in range(i - 251, i - 20):
prev_close = closes[k - 1]
bench_prev = benchmark_closes.get(dates[k - 1])
bench_cur = benchmark_closes.get(dates[k])
if prev_close <= 0 or bench_prev is None or bench_cur is None or bench_prev <= 0:
continue
stock_rets.append(closes[k] / prev_close - 1.0)
market_rets.append(bench_cur / bench_prev - 1.0)
if len(stock_rets) < 100:
return None
mean_market = sum(market_rets) / len(market_rets)
mean_stock = sum(stock_rets) / len(stock_rets)
var_market = sum((x - mean_market) ** 2 for x in market_rets)
if var_market <= 0:
return None
cov = sum((stock_rets[k] - mean_stock) * (market_rets[k] - mean_market) for k in range(len(stock_rets)))
beta = cov / var_market
return sum(stock_rets[k] - beta * market_rets[k] for k in range(len(stock_rets)))
def _signal_values(
dates: list[date],
closes: list[float],
highs: list[float],
i: int,
benchmark_closes: dict[date, float] | None = None,
) -> dict[str, float]:
"""Point-in-time candidate signals at as-of index ``i`` (price-only).
Momentum factors follow the standard "skip the last month" convention
@@ -507,6 +555,9 @@ def _signal_values(closes: list[float], highs: list[float], i: int) -> dict[str,
out: dict[str, float] = {}
if i - 252 >= 0 and closes[i - 252] > 0:
out["mom_12_1"] = closes[i - 21] / closes[i - 252] - 1.0
residual = _residual_momentum_12_1(dates, closes, i, benchmark_closes)
if residual is not None:
out["mom_12_1_resid"] = residual
if i - 126 >= 0 and closes[i - 126] > 0:
out["mom_6_1"] = closes[i - 21] / closes[i - 126] - 1.0
if i - 63 >= 0 and closes[i - 63] > 0:
@@ -534,7 +585,11 @@ def _signal_values(closes: list[float], highs: list[float], i: int) -> dict[str,
return out
def _accumulate_signal_series(records: list, collected: dict) -> None:
def _accumulate_signal_series(
records: list,
collected: dict,
benchmark_closes: dict[date, float] | None = None,
) -> None:
"""For each weekly as-of bar, emit (signal, forward-return) pairs keyed by ISO
week into ``collected[name][week_key]``. Forward return is close-to-close over
HORIZON trading days. Mutates ``collected`` (a dict of dict of list)."""
@@ -543,6 +598,7 @@ def _accumulate_signal_series(records: list, collected: dict) -> None:
return
closes = [float(r.close) for r in records]
highs = [float(r.high) for r in records]
dates = [r.date for r in records]
for i in _weekly_asof_indices(records):
j = i + HORIZON
if j >= n or closes[i] <= 0:
@@ -550,7 +606,7 @@ def _accumulate_signal_series(records: list, collected: dict) -> None:
fwd = closes[j] / closes[i] - 1.0
iso = records[i].date.isocalendar()
week_key = (iso[0], iso[1])
for name, val in _signal_values(closes, highs, i).items():
for name, val in _signal_values(dates, closes, highs, i, benchmark_closes).items():
collected[name][week_key].append((val, fwd))
@@ -678,11 +734,11 @@ def _signal_evaluation(collected: dict) -> list[dict]:
return rows
def _signal_series(records: list) -> dict:
def _signal_series(records: list, benchmark_closes: dict[date, float] | None = None) -> dict:
"""Per-ticker signal/forward-return series as a PLAIN (picklable) nested dict
— no defaultdict/lambda — so it can cross a process boundary."""
tmp: dict = defaultdict(lambda: defaultdict(list))
_accumulate_signal_series(records, tmp)
_accumulate_signal_series(records, tmp, benchmark_closes)
return {name: dict(weeks) for name, weeks in tmp.items()}
@@ -691,6 +747,7 @@ def _replay_and_signals(
columns: tuple,
config: dict,
activation: dict,
benchmark_closes: dict[date, float] | None = None,
) -> tuple[list[dict], dict]:
"""The CPU-bound per-ticker work, as a top-level (picklable) function so it can
run in a worker process. Takes primitive column arrays (cheap to pickle),
@@ -702,7 +759,7 @@ def _replay_and_signals(
)
for o, op, hi, lo, cl, vo in zip(date_ords, opens, highs, lows, closes, volumes)
]
return _replay_ticker(symbol, bars, config, activation), _signal_series(bars)
return _replay_ticker(symbol, bars, config, activation), _signal_series(bars, benchmark_closes)
def _backtest_worker_count() -> int:
@@ -1265,6 +1322,18 @@ async def run_backtest(
# collected[signal_name][iso_week] -> list of (signal_value, forward_return)
collected: dict = defaultdict(lambda: defaultdict(list))
# Residual momentum needs a point-in-time benchmark return stream. Best-effort:
# if SPY benchmark data is unavailable, the residual signal simply won't be
# emitted and the rest of the report remains valid.
benchmark_closes: dict[date, float] = {}
try:
from app.services.benchmark_service import load_benchmark_closes, refresh_benchmark_prices
await refresh_benchmark_prices(db, days=settings.ohlcv_history_days + 365)
benchmark_closes = await load_benchmark_closes(db)
except Exception:
logger.exception("Benchmark load for residual momentum failed")
def _merge(result: tuple[list[dict], dict]) -> None:
cands, series = result
candidates.extend(cands)
@@ -1305,7 +1374,8 @@ async def run_backtest(
continue
if columns is not None:
futures.append(loop.run_in_executor(
pool, _replay_and_signals, ticker.symbol, columns, config, activation
pool, _replay_and_signals, ticker.symbol, columns, config, activation,
benchmark_closes,
))
for result in await asyncio.gather(*futures, return_exceptions=True):
if isinstance(result, Exception):
@@ -1325,7 +1395,8 @@ async def run_backtest(
columns = await _fetch_columns(db, ticker.symbol)
if columns is not None:
_merge(await asyncio.to_thread(
_replay_and_signals, ticker.symbol, columns, config, activation
_replay_and_signals, ticker.symbol, columns, config, activation,
benchmark_closes,
))
except Exception:
logger.exception("Backtest replay failed for %s", ticker.symbol)