diff --git a/alembic/versions/011_add_benchmark_prices.py b/alembic/versions/011_add_benchmark_prices.py new file mode 100644 index 0000000..dcbbf20 --- /dev/null +++ b/alembic/versions/011_add_benchmark_prices.py @@ -0,0 +1,41 @@ +"""add benchmark_prices + +Stores daily closes for a benchmark index (SPY) so paper-trade alpha — trade +return minus the benchmark's return over the same holding period — can be +computed. Kept separate from the tradeable universe: the benchmark is not a +Ticker, so it never enters the scanner, momentum ranking, or rankings. + +Revision ID: 011 +Revises: 010 +Create Date: 2026-06-28 00:00:00.000000 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = "011" +down_revision: Union[str, None] = "010" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "benchmark_prices", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("symbol", sa.String(length=20), nullable=False), + sa.Column("date", sa.Date(), nullable=False), + sa.Column("close", sa.Float(), nullable=False), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("symbol", "date", name="uq_benchmark_symbol_date"), + ) + op.create_index("ix_benchmark_prices_symbol", "benchmark_prices", ["symbol"]) + + +def downgrade() -> None: + op.drop_index("ix_benchmark_prices_symbol", table_name="benchmark_prices") + op.drop_table("benchmark_prices") diff --git a/app/models/__init__.py b/app/models/__init__.py index 29e309b..84cdaab 100644 --- a/app/models/__init__.py +++ b/app/models/__init__.py @@ -11,6 +11,7 @@ from app.models.settings import SystemSetting, IngestionProgress from app.models.alert import AlertLog from app.models.paper_trade import PaperTrade from app.models.regime_snapshot import RegimeSnapshot +from app.models.benchmark_price import BenchmarkPrice __all__ = [ "Ticker", @@ -28,4 +29,5 @@ __all__ = [ "AlertLog", "PaperTrade", "RegimeSnapshot", + "BenchmarkPrice", ] diff --git a/app/models/benchmark_price.py b/app/models/benchmark_price.py new file mode 100644 index 0000000..9956e5a --- /dev/null +++ b/app/models/benchmark_price.py @@ -0,0 +1,25 @@ +from datetime import date as date_type + +from sqlalchemy import Date, Float, String, UniqueConstraint +from sqlalchemy.orm import Mapped, mapped_column + +from app.database import Base + + +class BenchmarkPrice(Base): + """Daily close for a benchmark index (e.g. SPY), used to compute trade alpha. + + A standalone price series, deliberately NOT a tracked ``Ticker`` — so the + benchmark never enters the scanner, the momentum-percentile ranking, or the + rankings table. One row per (symbol, date). + """ + + __tablename__ = "benchmark_prices" + __table_args__ = ( + UniqueConstraint("symbol", "date", name="uq_benchmark_symbol_date"), + ) + + id: Mapped[int] = mapped_column(primary_key=True) + symbol: Mapped[str] = mapped_column(String(20), nullable=False, index=True) + date: Mapped[date_type] = mapped_column(Date, nullable=False) + close: Mapped[float] = mapped_column(Float, nullable=False) diff --git a/app/scheduler.py b/app/scheduler.py index 360e8b8..82d7ae5 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -36,6 +36,7 @@ from app.providers.protocol import SentimentData from app.services import fundamental_service, ingestion_service, sentiment_service, settings_store from app.services.alert_service import dispatch_alerts from app.services.backtest_service import run_and_store as run_backtest_and_store +from app.services.benchmark_service import refresh_benchmark_prices from app.services.market_regime_service import update_market_regime from app.services.regime_monitor_service import update_regime_monitor from app.services.event_study_service import run_and_store as run_event_study_and_store @@ -866,6 +867,34 @@ async def compute_market_regime() -> None: _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc)) +# --------------------------------------------------------------------------- +# Job: Benchmark Collector (SPY closes for paper-trade alpha) +# --------------------------------------------------------------------------- + + +async def collect_benchmark() -> None: + """Refresh the stored benchmark (SPY) daily closes used for paper-trade alpha.""" + job_name = "benchmark_collector" + _log_event(logging.INFO, "job_start", job=job_name) + _runtime_start(job_name, total=1) + + try: + async with async_session_factory() as db: + if not await _is_job_enabled(db, job_name): + _log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled") + _runtime_finish(job_name, "skipped", processed=0, total=1, message="Disabled") + return + + written = await refresh_benchmark_prices(db) + + _runtime_progress(job_name, processed=1, total=1) + _runtime_finish(job_name, "completed", processed=1, total=1, message=f"{written} rows") + _log_event(logging.INFO, "job_complete", job=job_name, rows=written) + except Exception as exc: + _runtime_finish(job_name, "error", processed=0, total=1, message=str(exc)) + _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc)) + + # --------------------------------------------------------------------------- # Job: Regime Monitor # --------------------------------------------------------------------------- @@ -1016,6 +1045,7 @@ async def sync_ticker_universe() -> None: # Daily (full): the complete data→signal refresh, once a day. _DAILY_PIPELINE_STEPS = [ ("data_collector", "collect_ohlcv"), + ("benchmark_collector", "collect_benchmark"), ("sentiment_collector", "collect_sentiment"), ("rr_scanner", "scan_rr"), ("outcome_evaluator", "evaluate_outcomes"), @@ -1068,8 +1098,8 @@ async def _run_pipeline(job_name: str, steps: list[tuple[str, str]]) -> None: async def run_daily_pipeline() -> None: - """Full daily flow: OHLCV → sentiment → R:R scan → outcome eval (+paper - close) → market regime.""" + """Full daily flow: OHLCV → benchmark → sentiment → R:R scan → outcome eval + (+paper close) → market regime.""" await _run_pipeline("daily_pipeline", _DAILY_PIPELINE_STEPS) @@ -1176,6 +1206,7 @@ def configure_scheduler(schedule_config: dict[str, str] | None = None) -> None: # interval job). They stay manually triggerable from Admin → Jobs. _members = [ (collect_ohlcv, "data_collector", "Data Collector (OHLCV)"), + (collect_benchmark, "benchmark_collector", "Benchmark Collector"), (collect_sentiment, "sentiment_collector", "Sentiment Collector"), (scan_rr, "rr_scanner", "R:R Scanner"), (evaluate_outcomes, "outcome_evaluator", "Outcome Evaluator"), diff --git a/app/schemas/paper_trade.py b/app/schemas/paper_trade.py index b3f8e1f..ccf5fd9 100644 --- a/app/schemas/paper_trade.py +++ b/app/schemas/paper_trade.py @@ -33,3 +33,8 @@ class PaperTradeResponse(BaseModel): close_price: float | None = None closed_at: datetime | None = None current_price: float | None = None + # Alpha vs the S&P 500 (SPY) over the trade's holding period. None when the + # benchmark series doesn't cover the trade's open date yet. + benchmark_return_pct: float | None = None + alpha_pct: float | None = None + alpha_usd: float | None = None diff --git a/app/schemas/watchlist.py b/app/schemas/watchlist.py index b0fa36f..494704b 100644 --- a/app/schemas/watchlist.py +++ b/app/schemas/watchlist.py @@ -32,6 +32,7 @@ class WatchlistEntryResponse(BaseModel): dimensions: list[DimensionScoreSummary] = [] rr_ratio: float | None = None rr_direction: str | None = None + momentum_percentile: float | None = None sr_levels: list[SRLevelSummary] = [] last_close: float | None = None change_pct: float | None = None diff --git a/app/services/benchmark_service.py b/app/services/benchmark_service.py new file mode 100644 index 0000000..805e371 --- /dev/null +++ b/app/services/benchmark_service.py @@ -0,0 +1,101 @@ +"""Benchmark price store + alpha helpers. + +Fetches the S&P 500 proxy (SPY) daily closes via Alpaca and persists them, so +paper-trade alpha — a trade's return minus the benchmark's return over the same +holding period — can be computed. The benchmark is a standalone series, NOT a +tracked ``Ticker``, so it never contaminates the scanner, momentum-percentile +ranking, or rankings. +""" + +from __future__ import annotations + +import bisect +import logging +from datetime import date, timedelta + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.config import settings +from app.models.benchmark_price import BenchmarkPrice +from app.providers.alpaca import AlpacaOHLCVProvider + +logger = logging.getLogger(__name__) + +BENCHMARK_SYMBOL = "SPY" +# ~800 calendar days ≈ 550 trading days — comfortably covers any realistic paper +# holding period plus a margin for the nearest-prior-trading-day lookup. +_HISTORY_DAYS = 800 + + +async def refresh_benchmark_prices( + db: AsyncSession, symbol: str = BENCHMARK_SYMBOL, days: int = _HISTORY_DAYS +) -> int: + """Fetch the benchmark's daily closes and upsert them. Returns rows written. + + Idempotent: inserts new dates, updates a close only if it changed (e.g. after + a split adjustment). Best-effort — returns 0 when Alpaca keys are unset. + """ + if not settings.alpaca_api_key or not settings.alpaca_api_secret: + logger.warning("Benchmark refresh skipped: Alpaca keys not configured") + return 0 + + provider = AlpacaOHLCVProvider(settings.alpaca_api_key, settings.alpaca_api_secret) + end = date.today() + start = end - timedelta(days=days) + bars = await provider.fetch_ohlcv(symbol, start, end) + + existing = { + row.date: row + for row in ( + await db.execute(select(BenchmarkPrice).where(BenchmarkPrice.symbol == symbol)) + ).scalars() + } + + written = 0 + for bar in bars: + current = existing.get(bar.date) + if current is None: + db.add(BenchmarkPrice(symbol=symbol, date=bar.date, close=float(bar.close))) + written += 1 + elif abs(current.close - float(bar.close)) > 1e-9: + current.close = float(bar.close) + written += 1 + + if written: + await db.commit() + logger.info("Benchmark %s refreshed: %d rows written", symbol, written) + return written + + +async def load_benchmark_closes( + db: AsyncSession, symbol: str = BENCHMARK_SYMBOL +) -> dict[date, float]: + """Return ``{date: close}`` for the benchmark (empty if none stored yet).""" + rows = await db.execute( + select(BenchmarkPrice.date, BenchmarkPrice.close).where(BenchmarkPrice.symbol == symbol) + ) + return {d: float(c) for d, c in rows.all()} + + +def benchmark_return_pct( + closes: dict[date, float], open_date: date, as_of_date: date +) -> float | None: + """Benchmark % return between two dates, using the nearest close on/before each. + + Returns ``None`` when there's no benchmark data at or before either endpoint + (e.g. a trade opened before the stored history, or the table is empty). + """ + if not closes: + return None + dates = sorted(closes) + + def _close_on_or_before(target: date) -> float | None: + idx = bisect.bisect_right(dates, target) - 1 + return closes[dates[idx]] if idx >= 0 else None + + start = _close_on_or_before(open_date) + end = _close_on_or_before(as_of_date) + if start is None or end is None or start == 0: + return None + return (end - start) / start * 100.0 diff --git a/app/services/paper_trade_service.py b/app/services/paper_trade_service.py index 1f0b49a..397251f 100644 --- a/app/services/paper_trade_service.py +++ b/app/services/paper_trade_service.py @@ -2,7 +2,7 @@ from __future__ import annotations -from datetime import datetime, timezone +from datetime import date, datetime, timezone from sqlalchemy import and_, func, select from sqlalchemy.ext.asyncio import AsyncSession @@ -11,6 +11,7 @@ from app.exceptions import NotFoundError, ValidationError from app.models.ohlcv import OHLCVRecord from app.models.paper_trade import PaperTrade from app.models.ticker import Ticker +from app.services import benchmark_service from app.services.outcome_service import ( OUTCOME_AMBIGUOUS, OUTCOME_STOP_HIT, @@ -85,7 +86,34 @@ async def create_trade( return trade -def _to_dict(trade: PaperTrade, symbol: str, current_price: float | None) -> dict: +def _to_dict( + trade: PaperTrade, + symbol: str, + current_price: float | None, + benchmark_closes: dict[date, float] | None = None, +) -> dict: + # For open trades, mark to market; for closed, the realized exit price. + ref = current_price if trade.status == "open" else trade.close_price + + # Alpha = trade return − benchmark (SPY) return over the same holding period. + benchmark_return = None + alpha_pct = None + alpha_usd = None + if ref is not None and trade.entry_price and benchmark_closes: + sign = 1.0 if trade.direction == "long" else -1.0 + trade_return = (ref - trade.entry_price) / trade.entry_price * 100.0 * sign + as_of = ( + trade.closed_at.date() + if trade.status == "closed" and trade.closed_at is not None + else date.today() + ) + benchmark_return = benchmark_service.benchmark_return_pct( + benchmark_closes, trade.opened_at.date(), as_of + ) + if benchmark_return is not None: + alpha_pct = trade_return - benchmark_return + alpha_usd = alpha_pct / 100.0 * trade.entry_price * trade.shares + return { "id": trade.id, "symbol": symbol, @@ -98,8 +126,10 @@ def _to_dict(trade: PaperTrade, symbol: str, current_price: float | None) -> dic "opened_at": trade.opened_at, "close_price": trade.close_price, "closed_at": trade.closed_at, - # For open trades, mark to market; for closed, the realized exit price. - "current_price": current_price if trade.status == "open" else trade.close_price, + "current_price": ref, + "benchmark_return_pct": benchmark_return, + "alpha_pct": alpha_pct, + "alpha_usd": alpha_usd, } @@ -120,7 +150,13 @@ async def list_trades( rows = (await db.execute(stmt)).all() open_ids = {t.ticker_id for t, _ in rows if t.status == "open"} prices = await _latest_closes(db, open_ids) - return [_to_dict(t, sym, prices.get(t.ticker_id)) for t, sym in rows] + + # Benchmark closes for alpha — populated by the daily/benchmark job. Empty until + # that runs once, in which case alpha is simply left unset (a read path never + # makes a provider call). + benchmark_closes = await benchmark_service.load_benchmark_closes(db) + + return [_to_dict(t, sym, prices.get(t.ticker_id), benchmark_closes) for t, sym in rows] async def close_trade( diff --git a/app/services/watchlist_service.py b/app/services/watchlist_service.py index 7eebba0..557fb5d 100644 --- a/app/services/watchlist_service.py +++ b/app/services/watchlist_service.py @@ -173,6 +173,9 @@ async def _enrich_entry( "dimensions": dims, "rr_ratio": setup.rr_ratio if setup else None, "rr_direction": setup.direction if setup else None, + # 12-1 cross-sectional momentum percentile (the top-pick selector); ticker- + # level, so any of the ticker's setups carries the same value. + "momentum_percentile": setup.momentum_percentile if setup else None, "sr_levels": sr_levels, "last_close": last_close, "change_pct": change_pct, diff --git a/frontend/src/components/dashboard/OpenTradesPanel.tsx b/frontend/src/components/dashboard/OpenTradesPanel.tsx index cf77894..e7fa2df 100644 --- a/frontend/src/components/dashboard/OpenTradesPanel.tsx +++ b/frontend/src/components/dashboard/OpenTradesPanel.tsx @@ -21,16 +21,21 @@ export function OpenTradesPanel() { const close = useClosePaperTrade(); const totals = useMemo(() => { - let pnl = 0, winners = 0, losers = 0, priced = 0; + let pnl = 0, winners = 0, losers = 0, priced = 0, alphaUsd = 0, alphaPriced = 0; for (const t of trades ?? []) { const p = tradePnl(t); - if (!p) continue; - priced += 1; - pnl += p.pnl; - if (p.pnl > 0) winners += 1; - else if (p.pnl < 0) losers += 1; + if (p) { + priced += 1; + pnl += p.pnl; + if (p.pnl > 0) winners += 1; + else if (p.pnl < 0) losers += 1; + } + if (t.alpha_usd != null) { + alphaUsd += t.alpha_usd; + alphaPriced += 1; + } } - return { pnl, winners, losers, priced }; + return { pnl, winners, losers, priced, alphaUsd, alphaPriced }; }, [trades]); if (isLoading) return null; @@ -58,6 +63,7 @@ export function OpenTradesPanel() {