"""Cross-sectional residual 12-1 momentum ranking for the universe. The activation gate selects the top ``min_momentum_percentile`` of the universe by residual 12-1 month momentum: the stock's 12-1 return after subtracting its estimated benchmark beta contribution over the same formation window. The daily scan ranks every ticker and stores each setup's percentile (see ``rr_scanner_service``), so the live list, the Track Record's qualified stats, and outcome evaluation all gate on the same value. """ from __future__ import annotations import json import logging from datetime import date from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.models.ticker import Ticker from app.services.price_service import query_ohlcv logger = logging.getLogger(__name__) # 12-1 momentum: ~12 months of daily history (252 bars) with the last ~1 month # (21 bars) skipped. Matches the backtest's _signal_values / _window_setups. _MOM_LOOKBACK = 252 _MOM_SKIP = 21 def compute_12_1_momentum(closes: list[float]) -> float | None: """Return over the window ending ~1 month ago, starting ~12 months ago. None when there isn't a full year of history.""" if len(closes) >= _MOM_LOOKBACK + 1 and closes[-(_MOM_LOOKBACK + 1)] > 0: return closes[-(_MOM_SKIP + 1)] / closes[-(_MOM_LOOKBACK + 1)] - 1.0 return None def compute_residual_12_1_momentum( dates: list[date], closes: list[float], benchmark_closes: dict[date, float], ) -> float | None: """12-1 momentum after removing linear benchmark exposure. Estimate beta from daily stock/benchmark returns over the standard 12-1 formation window, then sum stock return minus beta * benchmark return. No intercept is subtracted: fitting an intercept over the same window would make residuals sum to roughly zero and destroy the ranking signal. """ i = len(closes) - 1 if not benchmark_closes or len(dates) != len(closes) or i - _MOM_LOOKBACK < 0: return None stock_rets: list[float] = [] market_rets: list[float] = [] for k in range(i - _MOM_LOOKBACK + 1, i - _MOM_SKIP + 1): prev_close = closes[k - 1] bench_prev = benchmark_closes.get(dates[k - 1]) bench_cur = benchmark_closes.get(dates[k]) if prev_close <= 0 or bench_prev is None or bench_cur is None or bench_prev <= 0: continue stock_rets.append(closes[k] / prev_close - 1.0) market_rets.append(bench_cur / bench_prev - 1.0) if len(stock_rets) < 100: return None mean_market = sum(market_rets) / len(market_rets) mean_stock = sum(stock_rets) / len(stock_rets) var_market = sum((x - mean_market) ** 2 for x in market_rets) if var_market <= 0: return None cov = sum( (stock_rets[k] - mean_stock) * (market_rets[k] - mean_market) for k in range(len(stock_rets)) ) beta = cov / var_market return sum(stock_rets[k] - beta * market_rets[k] for k in range(len(stock_rets))) async def _load_activation_benchmark(db: AsyncSession) -> dict[date, float]: """Load SPY closes for residual momentum; refresh once if the table is empty.""" try: from app.services.benchmark_service import load_benchmark_closes, refresh_benchmark_prices closes = await load_benchmark_closes(db) if closes: return closes await refresh_benchmark_prices(db) return await load_benchmark_closes(db) except Exception: logger.exception("Residual momentum benchmark load failed; falling back to raw momentum") return {} async def compute_momentum_percentiles(db: AsyncSession) -> dict[str, float]: """Compute each ticker's activation momentum rank. Production uses residual 12-1 momentum when benchmark data is available. If SPY data is absent, fall back to raw 12-1 momentum rather than disabling the scanner. Tickers without enough stock/benchmark history are absent. """ result = await db.execute(select(Ticker).order_by(Ticker.symbol)) tickers = list(result.scalars().all()) benchmark_closes = await _load_activation_benchmark(db) using_residual = len(benchmark_closes) >= _MOM_LOOKBACK values: dict[str, float] = {} for ticker in tickers: try: records = await query_ohlcv(db, ticker.symbol) except Exception: logger.exception("Momentum fetch failed for %s", ticker.symbol) continue closes = [float(r.close) for r in records] value = ( compute_residual_12_1_momentum([r.date for r in records], closes, benchmark_closes) if using_residual else compute_12_1_momentum(closes) ) if value is not None: values[ticker.symbol] = value ranked = sorted(values, key=lambda s: values[s]) n = len(ranked) percentiles = { sym: round((rank / (n - 1) * 100.0) if n > 1 else 100.0, 2) for rank, sym in enumerate(ranked) } logger.info(json.dumps({ "event": "momentum_ranked", "signal": "residual_12_1" if using_residual else "raw_12_1_fallback", "tickers": n, })) return percentiles