momentum gate: long-only + wire the percentile onto live setups
Part 1 — long-only. The momentum edge is long top-momentum; the gate was
qualifying shorts on high-momentum names (fighting the trend), which showed as
the -0.13R Short(qual.) drag. While the gate is active, shorts no longer qualify
(backend qualification, backtest _momentum_qualifies, and the frontend mirror).
Part 2 — production wiring. Live setups now carry a real momentum rank, so the
dashboard, the Track Record's qualified stats, and outcome evaluation all gate on
the same value instead of deferring to floors:
- new momentum_service.compute_momentum_percentiles: 12-1 momentum per ticker,
ranked across the universe into a {symbol: percentile} map.
- the daily R:R scan ranks the universe up front and stores each setup's
percentile (new trade_setups.momentum_percentile column, migration 010).
- enhance_trade_setup mutates the same row, so the percentile is preserved;
_trade_setup_to_dict + TradeSetupResponse expose it to the API.
Until a fresh scan runs, pre-existing setups have a null percentile and the gate
falls back to floors for them (longs) / excludes them (shorts) — they fill in on
the next scan. 341 backend tests pass; frontend build clean.
Needs the alembic upgrade (migration 010) on deploy.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,34 @@
|
||||
"""add momentum_percentile to trade_setups
|
||||
|
||||
The activation gate selects the top slice of the universe by 12-1 month momentum.
|
||||
That rank is computed across all tickers at scan time and stored on each setup so
|
||||
the live list, the Track Record's qualified stats, and outcome evaluation all gate
|
||||
on the same value (momentum as of the setup's detection).
|
||||
|
||||
Revision ID: 010
|
||||
Revises: 009
|
||||
Create Date: 2026-06-23 00:00:00.000000
|
||||
|
||||
"""
|
||||
from typing import Sequence, Union
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = "010"
|
||||
down_revision: Union[str, None] = "009"
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
op.add_column(
|
||||
"trade_setups",
|
||||
sa.Column("momentum_percentile", sa.Float(), nullable=True),
|
||||
)
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
op.drop_column("trade_setups", "momentum_percentile")
|
||||
@@ -26,6 +26,9 @@ class TradeSetup(Base):
|
||||
)
|
||||
|
||||
confidence_score: Mapped[float | None] = mapped_column(Float, nullable=True)
|
||||
# Ticker's 12-1 momentum percentile across the universe at detection time
|
||||
# (0–100, 100 = strongest). Drives the activation gate's core selection.
|
||||
momentum_percentile: Mapped[float | None] = mapped_column(Float, nullable=True)
|
||||
targets_json: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
conflict_flags_json: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
recommended_action: Mapped[str | None] = mapped_column(String(20), nullable=True)
|
||||
|
||||
@@ -48,4 +48,5 @@ class TradeSetupResponse(BaseModel):
|
||||
outcome_date: date | None = None
|
||||
evaluated_at: datetime | None = None
|
||||
current_price: float | None = None
|
||||
momentum_percentile: float | None = None
|
||||
recommendation_summary: RecommendationSummaryResponse | None = None
|
||||
|
||||
@@ -580,11 +580,14 @@ def _assign_momentum_percentiles(candidates: list[dict]) -> None:
|
||||
|
||||
def _momentum_qualifies(cand: dict, threshold: float) -> bool:
|
||||
"""Whether a candidate clears the floors (meets_core) and the momentum gate.
|
||||
Threshold 0 disables the momentum gate (floors only)."""
|
||||
Threshold 0 disables the momentum gate (floors only). The gate is long-only:
|
||||
while it's active, shorts (fighting the trend) never qualify."""
|
||||
if not cand["meets_core"]:
|
||||
return False
|
||||
if threshold <= 0:
|
||||
return True
|
||||
if cand["direction"] == "short":
|
||||
return False
|
||||
mp = cand.get("momentum_percentile")
|
||||
return mp is not None and mp >= threshold
|
||||
|
||||
|
||||
@@ -0,0 +1,63 @@
|
||||
"""Cross-sectional 12-1 momentum ranking for the universe.
|
||||
|
||||
The activation gate selects the top ``min_momentum_percentile`` of the universe
|
||||
by 12-1 month momentum (return from ~12 months ago to ~1 month ago — the one
|
||||
price signal the backtest showed sorts forward returns). The daily scan ranks
|
||||
every ticker and stores each setup's percentile (see ``rr_scanner_service``), so
|
||||
the live list, the Track Record's qualified stats, and outcome evaluation all gate
|
||||
on the same value.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.models.ticker import Ticker
|
||||
from app.services.price_service import query_ohlcv
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# 12-1 momentum: ~12 months of daily history (252 bars) with the last ~1 month
|
||||
# (21 bars) skipped. Matches the backtest's _signal_values / _window_setups.
|
||||
_MOM_LOOKBACK = 252
|
||||
_MOM_SKIP = 21
|
||||
|
||||
|
||||
def compute_12_1_momentum(closes: list[float]) -> float | None:
|
||||
"""Return over the window ending ~1 month ago, starting ~12 months ago.
|
||||
None when there isn't a full year of history."""
|
||||
if len(closes) >= _MOM_LOOKBACK + 1 and closes[-(_MOM_LOOKBACK + 1)] > 0:
|
||||
return closes[-(_MOM_SKIP + 1)] / closes[-(_MOM_LOOKBACK + 1)] - 1.0
|
||||
return None
|
||||
|
||||
|
||||
async def compute_momentum_percentiles(db: AsyncSession) -> dict[str, float]:
|
||||
"""Compute each ticker's 12-1 momentum and rank the universe into a
|
||||
``{symbol: percentile}`` map (0–100, 100 = strongest momentum). Tickers
|
||||
without a full year of history are absent (can't be ranked)."""
|
||||
result = await db.execute(select(Ticker).order_by(Ticker.symbol))
|
||||
tickers = list(result.scalars().all())
|
||||
|
||||
momentum: dict[str, float] = {}
|
||||
for ticker in tickers:
|
||||
try:
|
||||
records = await query_ohlcv(db, ticker.symbol)
|
||||
except Exception:
|
||||
logger.exception("Momentum fetch failed for %s", ticker.symbol)
|
||||
continue
|
||||
m = compute_12_1_momentum([float(r.close) for r in records])
|
||||
if m is not None:
|
||||
momentum[ticker.symbol] = m
|
||||
|
||||
ranked = sorted(momentum, key=lambda s: momentum[s])
|
||||
n = len(ranked)
|
||||
percentiles = {
|
||||
sym: round((rank / (n - 1) * 100.0) if n > 1 else 100.0, 2)
|
||||
for rank, sym in enumerate(ranked)
|
||||
}
|
||||
logger.info(json.dumps({"event": "momentum_ranked", "tickers": n}))
|
||||
return percentiles
|
||||
@@ -67,11 +67,15 @@ def setup_qualifies(setup: Any, config: dict) -> bool:
|
||||
if (setup.confidence_score or 0.0) < config["min_confidence"]:
|
||||
return False
|
||||
# Cross-sectional momentum: the core selection. A setup's ticker must rank in
|
||||
# the top ``min_momentum_percentile`` of the universe by 12-1 momentum. Only
|
||||
# enforced when a percentile is attached (live setups / backtest) and a
|
||||
# threshold is set; callers that don't attach it defer to the floors above.
|
||||
# the top ``min_momentum_percentile`` of the universe by 12-1 momentum. The
|
||||
# validated edge is long-only, so while the gate is active shorts (which fight
|
||||
# the trend) never qualify. The percentile floor is only enforced when a
|
||||
# percentile is attached (live setups / backtest); callers that don't attach
|
||||
# it defer to the floors above.
|
||||
min_pct = float(config.get("min_momentum_percentile", 0.0))
|
||||
if min_pct > 0:
|
||||
if (getattr(setup, "direction", "long") or "long") == "short":
|
||||
return False
|
||||
momentum_percentile = getattr(setup, "momentum_percentile", None)
|
||||
if momentum_percentile is not None and momentum_percentile < min_pct:
|
||||
return False
|
||||
|
||||
@@ -81,8 +81,13 @@ async def scan_ticker(
|
||||
symbol: str,
|
||||
rr_threshold: float = 1.5,
|
||||
atr_multiplier: float = 1.5,
|
||||
momentum_percentile: float | None = None,
|
||||
) -> list[TradeSetup]:
|
||||
"""Scan a single ticker for trade setups meeting the R:R threshold."""
|
||||
"""Scan a single ticker for trade setups meeting the R:R threshold.
|
||||
|
||||
``momentum_percentile`` is the ticker's 12-1 momentum rank across the universe
|
||||
(computed by the caller), stored on each setup so the activation gate can
|
||||
select the top slice."""
|
||||
ticker = await _get_ticker(db, symbol)
|
||||
|
||||
records = await query_ohlcv(db, symbol)
|
||||
@@ -169,6 +174,7 @@ async def scan_ticker(
|
||||
rr_ratio=round(best_candidate_rr, 4),
|
||||
composite_score=round(composite_score, 4),
|
||||
detected_at=now,
|
||||
momentum_percentile=momentum_percentile,
|
||||
))
|
||||
|
||||
if levels_below:
|
||||
@@ -202,6 +208,7 @@ async def scan_ticker(
|
||||
rr_ratio=round(best_candidate_rr, 4),
|
||||
composite_score=round(composite_score, 4),
|
||||
detected_at=now,
|
||||
momentum_percentile=momentum_percentile,
|
||||
))
|
||||
|
||||
available_directions = {s.direction for s in setups}
|
||||
@@ -249,6 +256,16 @@ async def scan_all_tickers(
|
||||
tickers = list(result.scalars().all())
|
||||
total = len(tickers)
|
||||
|
||||
# Rank the universe by 12-1 momentum up front so each new setup carries its
|
||||
# ticker's percentile (used by the activation gate). Best-effort.
|
||||
try:
|
||||
from app.services import momentum_service
|
||||
|
||||
percentiles = await momentum_service.compute_momentum_percentiles(db)
|
||||
except Exception:
|
||||
logger.exception("Momentum ranking refresh failed")
|
||||
percentiles = {}
|
||||
|
||||
all_setups: list[TradeSetup] = []
|
||||
for index, ticker in enumerate(tickers):
|
||||
if progress_callback is not None:
|
||||
@@ -267,7 +284,8 @@ async def scan_all_tickers(
|
||||
logger.exception("Error refreshing scores for %s", ticker.symbol)
|
||||
|
||||
setups = await scan_ticker(
|
||||
db, ticker.symbol, rr_threshold, atr_multiplier
|
||||
db, ticker.symbol, rr_threshold, atr_multiplier,
|
||||
momentum_percentile=percentiles.get(ticker.symbol),
|
||||
)
|
||||
all_setups.extend(setups)
|
||||
except Exception:
|
||||
@@ -410,4 +428,5 @@ def _trade_setup_to_dict(setup: TradeSetup, symbol: str, current_price: float |
|
||||
"outcome_date": setup.outcome_date,
|
||||
"evaluated_at": setup.evaluated_at,
|
||||
"current_price": current_price,
|
||||
"momentum_percentile": setup.momentum_percentile,
|
||||
}
|
||||
|
||||
@@ -33,11 +33,14 @@ export function qualifiesSetup(setup: TradeSetup, config: ActivationConfig): boo
|
||||
return false;
|
||||
}
|
||||
if ((setup.confidence_score ?? 0) < config.min_confidence) return false;
|
||||
// Cross-sectional momentum is the core selection — only enforced when a
|
||||
// percentile is attached and a threshold is set; otherwise defer to the floors.
|
||||
if (config.min_momentum_percentile > 0 && setup.momentum_percentile != null
|
||||
&& setup.momentum_percentile < config.min_momentum_percentile) {
|
||||
return false;
|
||||
// Cross-sectional momentum is the core selection (long-only). While the gate is
|
||||
// active, shorts never qualify; the percentile floor is enforced only when a
|
||||
// percentile is attached, otherwise defer to the floors.
|
||||
if (config.min_momentum_percentile > 0) {
|
||||
if (setup.direction === 'short') return false;
|
||||
if (setup.momentum_percentile != null && setup.momentum_percentile < config.min_momentum_percentile) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
if (config.require_high_conviction && !HIGH_CONVICTION_ACTIONS.has(setup.recommended_action ?? '')) {
|
||||
return false;
|
||||
|
||||
@@ -0,0 +1,71 @@
|
||||
"""Unit tests for the cross-sectional 12-1 momentum ranking."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import date, timedelta
|
||||
|
||||
import pytest
|
||||
|
||||
from app.models.ohlcv import OHLCVRecord
|
||||
from app.models.ticker import Ticker
|
||||
from app.services import momentum_service as ms
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def session():
|
||||
from tests.conftest import _test_session_factory
|
||||
|
||||
async with _test_session_factory() as s:
|
||||
yield s
|
||||
|
||||
|
||||
async def _seed(session, symbol: str, rate: float, n: int = 280) -> None:
|
||||
t = Ticker(symbol=symbol)
|
||||
session.add(t)
|
||||
await session.flush()
|
||||
base = date(2024, 1, 1)
|
||||
for i in range(n):
|
||||
close = 100.0 * (rate ** i)
|
||||
session.add(OHLCVRecord(
|
||||
ticker_id=t.id,
|
||||
date=base + timedelta(days=i),
|
||||
open=close, high=close, low=close, close=close,
|
||||
volume=1_000_000,
|
||||
))
|
||||
await session.commit()
|
||||
|
||||
|
||||
def test_compute_momentum_insufficient_history():
|
||||
assert ms.compute_12_1_momentum([100.0] * 100) is None
|
||||
|
||||
|
||||
def test_compute_momentum_value():
|
||||
closes = [100.0 * (1.01 ** i) for i in range(300)]
|
||||
m = ms.compute_12_1_momentum(closes)
|
||||
# 12-1 momentum skips the last month: close[-22] / close[-253] - 1
|
||||
assert m == closes[-22] / closes[-253] - 1.0
|
||||
assert m > 0
|
||||
|
||||
|
||||
async def test_ranks_universe_into_percentiles(session):
|
||||
await _seed(session, "HIGH", rate=1.010) # strong uptrend → top momentum
|
||||
await _seed(session, "MID", rate=1.002)
|
||||
await _seed(session, "LOW", rate=0.999) # declining → bottom momentum
|
||||
|
||||
pct = await ms.compute_momentum_percentiles(session)
|
||||
assert pct["HIGH"] == 100.0
|
||||
assert pct["MID"] == 50.0
|
||||
assert pct["LOW"] == 0.0
|
||||
|
||||
|
||||
async def test_short_history_ticker_is_unranked(session):
|
||||
await _seed(session, "LONG", rate=1.005)
|
||||
await _seed(session, "SHORTHX", rate=1.005, n=100) # < 1y → no momentum
|
||||
|
||||
pct = await ms.compute_momentum_percentiles(session)
|
||||
assert "LONG" in pct
|
||||
assert "SHORTHX" not in pct
|
||||
|
||||
|
||||
async def test_empty_universe_returns_empty(session):
|
||||
assert await ms.compute_momentum_percentiles(session) == {}
|
||||
@@ -90,6 +90,15 @@ class TestMomentumGate:
|
||||
legacy = {k: v for k, v in DEFAULT_GATE.items() if k != "min_momentum_percentile"}
|
||||
assert setup_qualifies(_setup(momentum_percentile=10.0), legacy) is True
|
||||
|
||||
def test_short_excluded_when_gate_active(self):
|
||||
# The momentum edge is long-only — a short never qualifies while the gate
|
||||
# is on, even on a top-momentum name.
|
||||
assert setup_qualifies(_setup(direction="short", momentum_percentile=95.0), MOMENTUM_GATE) is False
|
||||
|
||||
def test_short_allowed_when_gate_off(self):
|
||||
# With the momentum gate disabled, shorts pass on the floors as before.
|
||||
assert setup_qualifies(_setup(direction="short", momentum_percentile=10.0), DEFAULT_GATE) is True
|
||||
|
||||
|
||||
class TestStrictTighteners:
|
||||
def test_clean_high_conviction_passes(self):
|
||||
|
||||
Reference in New Issue
Block a user