Files
dennisthiessen aadec7d403
Deploy / lint (push) Successful in 7s
Deploy / test (push) Successful in 1m8s
Deploy / deploy (push) Successful in 35s
promote residual momentum ranking
2026-07-02 21:00:39 +02:00

137 lines
5.1 KiB
Python

"""Cross-sectional residual 12-1 momentum ranking for the universe.
The activation gate selects the top ``min_momentum_percentile`` of the universe
by residual 12-1 month momentum: the stock's 12-1 return after subtracting its
estimated benchmark beta contribution over the same formation window. The daily
scan ranks every ticker and stores each setup's percentile (see
``rr_scanner_service``), so the live list, the Track Record's qualified stats,
and outcome evaluation all gate on the same value.
"""
from __future__ import annotations

import json
import logging
import math
from datetime import date

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.ticker import Ticker
from app.services.price_service import query_ohlcv
logger = logging.getLogger(__name__)
# 12-1 momentum formation window, counted in daily bars: ~12 months of daily
# history (252 bars) with the most recent ~1 month (21 bars) skipped.
# Matches the backtest's _signal_values / _window_setups.
_MOM_LOOKBACK = 252
_MOM_SKIP = 21
def compute_12_1_momentum(
    closes: list[float],
    *,
    lookback: int | None = None,
    skip: int | None = None,
) -> float | None:
    """Return the raw 12-1 momentum of a close series.

    The return is measured from the close ``lookback`` bars before the last bar
    to the close ``skip`` bars before the last bar, i.e. the most recent
    ``skip`` bars are left out of the formation window.

    ``closes`` is expected oldest-first. ``lookback`` and ``skip`` default to
    the module-level ``_MOM_LOOKBACK`` / ``_MOM_SKIP`` when omitted.

    Returns ``None`` when there are fewer than ``lookback + 1`` closes, when the
    starting close is not a positive finite number, or when the ending close is
    not finite. A NaN/inf score must never escape: callers sort on this value.

    Raises ``ValueError`` if an explicitly supplied window does not satisfy
    ``0 <= skip < lookback``.
    """
    # Resolve defaults at call time so the module constants remain the single
    # source of truth for the standard window.
    lookback = _MOM_LOOKBACK if lookback is None else lookback
    skip = _MOM_SKIP if skip is None else skip
    if not (0 <= skip < lookback):
        raise ValueError("momentum window requires 0 <= skip < lookback")

    if len(closes) < lookback + 1:
        return None

    start = closes[-(lookback + 1)]
    end = closes[-(skip + 1)]
    # `start > 0` is already False for NaN; the isfinite checks additionally
    # reject an infinite start and a NaN/inf end.
    if not (start > 0 and math.isfinite(start) and math.isfinite(end)):
        return None
    return end / start - 1.0
def compute_residual_12_1_momentum(
    dates: list[date],
    closes: list[float],
    benchmark_closes: dict[date, float],
    *,
    lookback: int | None = None,
    skip: int | None = None,
    min_observations: int = 100,
) -> float | None:
    """Return 12-1 momentum after removing linear benchmark exposure.

    Beta is estimated from paired daily stock/benchmark returns over the
    formation window (from ``lookback`` bars before the last bar up to ``skip``
    bars before it); the result is the sum of ``stock_return - beta *
    benchmark_return`` over those same days. No intercept is subtracted:
    fitting an intercept over the same window would make residuals sum to
    roughly zero and destroy the ranking signal.

    ``dates`` and ``closes`` are parallel, oldest-first series for the stock;
    ``benchmark_closes`` maps a date to the benchmark close on that date.
    ``lookback`` and ``skip`` default to the module-level ``_MOM_LOOKBACK`` /
    ``_MOM_SKIP``; ``min_observations`` is the minimum number of usable paired
    daily returns required to produce a score.

    A day is dropped from the sample when the previous stock close is not
    positive, when either benchmark close is missing, when the previous
    benchmark close is not positive, or when either resulting return is not
    finite (NaN/inf input bar).

    Returns ``None`` when the benchmark is empty, the series lengths differ,
    history is shorter than ``lookback + 1`` bars, too few usable days remain,
    the benchmark returns have no variance, or the score is not finite.

    Raises ``ValueError`` if an explicitly supplied window does not satisfy
    ``0 <= skip < lookback``.
    """
    lookback = _MOM_LOOKBACK if lookback is None else lookback
    skip = _MOM_SKIP if skip is None else skip
    if not (0 <= skip < lookback):
        raise ValueError("momentum window requires 0 <= skip < lookback")

    last = len(closes) - 1
    if not benchmark_closes or len(dates) != len(closes) or last < lookback:
        return None

    stock_rets: list[float] = []
    market_rets: list[float] = []
    # k is the index of the "current" bar of each daily return; the first
    # return starts at closes[last - lookback] and the final one ends at
    # closes[last - skip], mirroring the raw 12-1 window.
    for k in range(last - lookback + 1, last - skip + 1):
        prev_close = closes[k - 1]
        bench_prev = benchmark_closes.get(dates[k - 1])
        bench_cur = benchmark_closes.get(dates[k])
        if prev_close <= 0 or bench_prev is None or bench_cur is None or bench_prev <= 0:
            continue
        stock_ret = closes[k] / prev_close - 1.0
        market_ret = bench_cur / bench_prev - 1.0
        # A NaN/inf bar would poison the means, beta and the final sum, and a
        # NaN score breaks the ordering of the cross-sectional sort downstream.
        if not (math.isfinite(stock_ret) and math.isfinite(market_ret)):
            continue
        stock_rets.append(stock_ret)
        market_rets.append(market_ret)

    n_obs = len(stock_rets)
    # At least two observations are needed for a variance, whatever the caller
    # passed as min_observations; this also keeps the means well-defined.
    if n_obs < max(min_observations, 2):
        return None

    mean_market = sum(market_rets) / n_obs
    mean_stock = sum(stock_rets) / n_obs
    var_market = sum((m - mean_market) ** 2 for m in market_rets)
    if var_market <= 0:
        return None

    cov = sum(
        (s - mean_stock) * (m - mean_market)
        for s, m in zip(stock_rets, market_rets)
    )
    beta = cov / var_market
    residual = sum(s - beta * m for s, m in zip(stock_rets, market_rets))
    return residual if math.isfinite(residual) else None
async def _load_activation_benchmark(db: AsyncSession) -> dict[date, float]:
    """Fetch the benchmark (SPY) closes used by the residual-momentum signal.

    If the first read comes back empty, the benchmark prices are refreshed once
    and read again; whatever that second read yields is returned. Any exception
    is logged and an empty dict is returned instead of propagating.
    """
    try:
        # Kept inside the try so a failing import is handled by the same
        # fallback as a failing load.
        from app.services.benchmark_service import (
            load_benchmark_closes,
            refresh_benchmark_prices,
        )

        cached = await load_benchmark_closes(db)
        if not cached:
            await refresh_benchmark_prices(db)
            cached = await load_benchmark_closes(db)
        return cached
    except Exception:
        logger.exception("Residual momentum benchmark load failed; falling back to raw momentum")
        return {}
async def compute_momentum_percentiles(db: AsyncSession) -> dict[str, float]:
    """Compute each ticker's activation momentum rank as a 0-100 percentile.

    Production uses residual 12-1 momentum when benchmark data is available. If
    SPY data is absent (fewer than ``_MOM_LOOKBACK`` benchmark closes loaded),
    fall back to raw 12-1 momentum rather than disabling the scanner.

    Returns a mapping of ticker symbol to percentile, rounded to two decimals:
    the lowest score gets 0.0 and the highest 100.0 (a single ranked ticker
    gets 100.0). Tickers are absent from the result when their price fetch
    fails, when they lack enough stock/benchmark history, or when their score
    is not a finite number.
    """
    result = await db.execute(select(Ticker).order_by(Ticker.symbol))
    tickers = list(result.scalars().all())

    benchmark_closes = await _load_activation_benchmark(db)
    using_residual = len(benchmark_closes) >= _MOM_LOOKBACK

    values: dict[str, float] = {}
    for ticker in tickers:
        try:
            records = await query_ohlcv(db, ticker.symbol)
        except Exception:
            # Best effort: one ticker's fetch failure must not abort the scan.
            logger.exception("Momentum fetch failed for %s", ticker.symbol)
            continue

        closes = [float(r.close) for r in records]
        if using_residual:
            value = compute_residual_12_1_momentum(
                [r.date for r in records], closes, benchmark_closes
            )
        else:
            value = compute_12_1_momentum(closes)

        if value is None:
            continue
        # NaN compares False against everything, so a single NaN key leaves
        # sorted() with an unspecified order and corrupts every percentile.
        if not math.isfinite(value):
            logger.warning(
                "Non-finite momentum for %s; excluded from ranking", ticker.symbol
            )
            continue
        values[ticker.symbol] = value

    # Ascending by score. The sort is stable and tickers were loaded ordered by
    # symbol, so equal scores keep symbol order.
    ranked = sorted(values, key=values.__getitem__)
    n = len(ranked)
    percentiles = {
        sym: round((rank / (n - 1) * 100.0) if n > 1 else 100.0, 2)
        for rank, sym in enumerate(ranked)
    }

    logger.info(json.dumps({
        "event": "momentum_ranked",
        "signal": "residual_12_1" if using_residual else "raw_12_1_fallback",
        "tickers": n,
    }))
    return percentiles