feat: ticker search, watchlist momentum column, alpha vs S&P 500
Three usability fixes: 1. Global ticker search in the sidebar (TickerSearch) — typeahead over the tracked universe that opens a ticker's detail page without adding it to the watchlist. Also wired into the mobile nav. 2. Watchlist table shows the ticker's 12-1 momentum percentile (the top-pick selector) instead of the noisy full S/R-level list. Enriched from the setup already loaded in watchlist_service._enrich_entry — no extra query. 3. Alpha vs the S&P 500 on paper trades (open + closed). New benchmark_prices table + benchmark_service store SPY daily closes (a standalone series, not a Ticker, so it never enters the scanner / momentum ranking / rankings) via a new daily-pipeline step. paper_trade_service computes per-trade benchmark_return / alpha_pct / alpha_usd over each holding period; the open- trades table, dashboard, and closed-trades panel surface per-trade and total alpha. The list read path never makes a provider call. Deploy: alembic upgrade head, then run the benchmark/daily job once to populate SPY closes (alpha shows "—" until then). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -11,6 +11,7 @@ from app.models.settings import SystemSetting, IngestionProgress
|
||||
from app.models.alert import AlertLog
|
||||
from app.models.paper_trade import PaperTrade
|
||||
from app.models.regime_snapshot import RegimeSnapshot
|
||||
from app.models.benchmark_price import BenchmarkPrice
|
||||
|
||||
__all__ = [
|
||||
"Ticker",
|
||||
@@ -28,4 +29,5 @@ __all__ = [
|
||||
"AlertLog",
|
||||
"PaperTrade",
|
||||
"RegimeSnapshot",
|
||||
"BenchmarkPrice",
|
||||
]
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
from datetime import date as date_type
|
||||
|
||||
from sqlalchemy import Date, Float, String, UniqueConstraint
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
|
||||
from app.database import Base
|
||||
|
||||
|
||||
class BenchmarkPrice(Base):
|
||||
"""Daily close for a benchmark index (e.g. SPY), used to compute trade alpha.
|
||||
|
||||
A standalone price series, deliberately NOT a tracked ``Ticker`` — so the
|
||||
benchmark never enters the scanner, the momentum-percentile ranking, or the
|
||||
rankings table. One row per (symbol, date).
|
||||
"""
|
||||
|
||||
__tablename__ = "benchmark_prices"
|
||||
__table_args__ = (
|
||||
UniqueConstraint("symbol", "date", name="uq_benchmark_symbol_date"),
|
||||
)
|
||||
|
||||
id: Mapped[int] = mapped_column(primary_key=True)
|
||||
symbol: Mapped[str] = mapped_column(String(20), nullable=False, index=True)
|
||||
date: Mapped[date_type] = mapped_column(Date, nullable=False)
|
||||
close: Mapped[float] = mapped_column(Float, nullable=False)
|
||||
+33
-2
@@ -36,6 +36,7 @@ from app.providers.protocol import SentimentData
|
||||
from app.services import fundamental_service, ingestion_service, sentiment_service, settings_store
|
||||
from app.services.alert_service import dispatch_alerts
|
||||
from app.services.backtest_service import run_and_store as run_backtest_and_store
|
||||
from app.services.benchmark_service import refresh_benchmark_prices
|
||||
from app.services.market_regime_service import update_market_regime
|
||||
from app.services.regime_monitor_service import update_regime_monitor
|
||||
from app.services.event_study_service import run_and_store as run_event_study_and_store
|
||||
@@ -866,6 +867,34 @@ async def compute_market_regime() -> None:
|
||||
_log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Job: Benchmark Collector (SPY closes for paper-trade alpha)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def collect_benchmark() -> None:
|
||||
"""Refresh the stored benchmark (SPY) daily closes used for paper-trade alpha."""
|
||||
job_name = "benchmark_collector"
|
||||
_log_event(logging.INFO, "job_start", job=job_name)
|
||||
_runtime_start(job_name, total=1)
|
||||
|
||||
try:
|
||||
async with async_session_factory() as db:
|
||||
if not await _is_job_enabled(db, job_name):
|
||||
_log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled")
|
||||
_runtime_finish(job_name, "skipped", processed=0, total=1, message="Disabled")
|
||||
return
|
||||
|
||||
written = await refresh_benchmark_prices(db)
|
||||
|
||||
_runtime_progress(job_name, processed=1, total=1)
|
||||
_runtime_finish(job_name, "completed", processed=1, total=1, message=f"{written} rows")
|
||||
_log_event(logging.INFO, "job_complete", job=job_name, rows=written)
|
||||
except Exception as exc:
|
||||
_runtime_finish(job_name, "error", processed=0, total=1, message=str(exc))
|
||||
_log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Job: Regime Monitor
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -1016,6 +1045,7 @@ async def sync_ticker_universe() -> None:
|
||||
# Daily (full): the complete data→signal refresh, once a day.
|
||||
_DAILY_PIPELINE_STEPS = [
|
||||
("data_collector", "collect_ohlcv"),
|
||||
("benchmark_collector", "collect_benchmark"),
|
||||
("sentiment_collector", "collect_sentiment"),
|
||||
("rr_scanner", "scan_rr"),
|
||||
("outcome_evaluator", "evaluate_outcomes"),
|
||||
@@ -1068,8 +1098,8 @@ async def _run_pipeline(job_name: str, steps: list[tuple[str, str]]) -> None:
|
||||
|
||||
|
||||
async def run_daily_pipeline() -> None:
|
||||
"""Full daily flow: OHLCV → sentiment → R:R scan → outcome eval (+paper
|
||||
close) → market regime."""
|
||||
"""Full daily flow: OHLCV → benchmark → sentiment → R:R scan → outcome eval
|
||||
(+paper close) → market regime."""
|
||||
await _run_pipeline("daily_pipeline", _DAILY_PIPELINE_STEPS)
|
||||
|
||||
|
||||
@@ -1176,6 +1206,7 @@ def configure_scheduler(schedule_config: dict[str, str] | None = None) -> None:
|
||||
# interval job). They stay manually triggerable from Admin → Jobs.
|
||||
_members = [
|
||||
(collect_ohlcv, "data_collector", "Data Collector (OHLCV)"),
|
||||
(collect_benchmark, "benchmark_collector", "Benchmark Collector"),
|
||||
(collect_sentiment, "sentiment_collector", "Sentiment Collector"),
|
||||
(scan_rr, "rr_scanner", "R:R Scanner"),
|
||||
(evaluate_outcomes, "outcome_evaluator", "Outcome Evaluator"),
|
||||
|
||||
@@ -33,3 +33,8 @@ class PaperTradeResponse(BaseModel):
|
||||
close_price: float | None = None
|
||||
closed_at: datetime | None = None
|
||||
current_price: float | None = None
|
||||
# Alpha vs the S&P 500 (SPY) over the trade's holding period. None when the
|
||||
# benchmark series doesn't cover the trade's open date yet.
|
||||
benchmark_return_pct: float | None = None
|
||||
alpha_pct: float | None = None
|
||||
alpha_usd: float | None = None
|
||||
|
||||
@@ -32,6 +32,7 @@ class WatchlistEntryResponse(BaseModel):
|
||||
dimensions: list[DimensionScoreSummary] = []
|
||||
rr_ratio: float | None = None
|
||||
rr_direction: str | None = None
|
||||
momentum_percentile: float | None = None
|
||||
sr_levels: list[SRLevelSummary] = []
|
||||
last_close: float | None = None
|
||||
change_pct: float | None = None
|
||||
|
||||
@@ -0,0 +1,101 @@
|
||||
"""Benchmark price store + alpha helpers.
|
||||
|
||||
Fetches the S&P 500 proxy (SPY) daily closes via Alpaca and persists them, so
|
||||
paper-trade alpha — a trade's return minus the benchmark's return over the same
|
||||
holding period — can be computed. The benchmark is a standalone series, NOT a
|
||||
tracked ``Ticker``, so it never contaminates the scanner, momentum-percentile
|
||||
ranking, or rankings.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import bisect
|
||||
import logging
|
||||
from datetime import date, timedelta
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.config import settings
|
||||
from app.models.benchmark_price import BenchmarkPrice
|
||||
from app.providers.alpaca import AlpacaOHLCVProvider
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
BENCHMARK_SYMBOL = "SPY"
|
||||
# ~800 calendar days ≈ 550 trading days — comfortably covers any realistic paper
|
||||
# holding period plus a margin for the nearest-prior-trading-day lookup.
|
||||
_HISTORY_DAYS = 800
|
||||
|
||||
|
||||
async def refresh_benchmark_prices(
|
||||
db: AsyncSession, symbol: str = BENCHMARK_SYMBOL, days: int = _HISTORY_DAYS
|
||||
) -> int:
|
||||
"""Fetch the benchmark's daily closes and upsert them. Returns rows written.
|
||||
|
||||
Idempotent: inserts new dates, updates a close only if it changed (e.g. after
|
||||
a split adjustment). Best-effort — returns 0 when Alpaca keys are unset.
|
||||
"""
|
||||
if not settings.alpaca_api_key or not settings.alpaca_api_secret:
|
||||
logger.warning("Benchmark refresh skipped: Alpaca keys not configured")
|
||||
return 0
|
||||
|
||||
provider = AlpacaOHLCVProvider(settings.alpaca_api_key, settings.alpaca_api_secret)
|
||||
end = date.today()
|
||||
start = end - timedelta(days=days)
|
||||
bars = await provider.fetch_ohlcv(symbol, start, end)
|
||||
|
||||
existing = {
|
||||
row.date: row
|
||||
for row in (
|
||||
await db.execute(select(BenchmarkPrice).where(BenchmarkPrice.symbol == symbol))
|
||||
).scalars()
|
||||
}
|
||||
|
||||
written = 0
|
||||
for bar in bars:
|
||||
current = existing.get(bar.date)
|
||||
if current is None:
|
||||
db.add(BenchmarkPrice(symbol=symbol, date=bar.date, close=float(bar.close)))
|
||||
written += 1
|
||||
elif abs(current.close - float(bar.close)) > 1e-9:
|
||||
current.close = float(bar.close)
|
||||
written += 1
|
||||
|
||||
if written:
|
||||
await db.commit()
|
||||
logger.info("Benchmark %s refreshed: %d rows written", symbol, written)
|
||||
return written
|
||||
|
||||
|
||||
async def load_benchmark_closes(
|
||||
db: AsyncSession, symbol: str = BENCHMARK_SYMBOL
|
||||
) -> dict[date, float]:
|
||||
"""Return ``{date: close}`` for the benchmark (empty if none stored yet)."""
|
||||
rows = await db.execute(
|
||||
select(BenchmarkPrice.date, BenchmarkPrice.close).where(BenchmarkPrice.symbol == symbol)
|
||||
)
|
||||
return {d: float(c) for d, c in rows.all()}
|
||||
|
||||
|
||||
def benchmark_return_pct(
|
||||
closes: dict[date, float], open_date: date, as_of_date: date
|
||||
) -> float | None:
|
||||
"""Benchmark % return between two dates, using the nearest close on/before each.
|
||||
|
||||
Returns ``None`` when there's no benchmark data at or before either endpoint
|
||||
(e.g. a trade opened before the stored history, or the table is empty).
|
||||
"""
|
||||
if not closes:
|
||||
return None
|
||||
dates = sorted(closes)
|
||||
|
||||
def _close_on_or_before(target: date) -> float | None:
|
||||
idx = bisect.bisect_right(dates, target) - 1
|
||||
return closes[dates[idx]] if idx >= 0 else None
|
||||
|
||||
start = _close_on_or_before(open_date)
|
||||
end = _close_on_or_before(as_of_date)
|
||||
if start is None or end is None or start == 0:
|
||||
return None
|
||||
return (end - start) / start * 100.0
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timezone
|
||||
from datetime import date, datetime, timezone
|
||||
|
||||
from sqlalchemy import and_, func, select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
@@ -11,6 +11,7 @@ from app.exceptions import NotFoundError, ValidationError
|
||||
from app.models.ohlcv import OHLCVRecord
|
||||
from app.models.paper_trade import PaperTrade
|
||||
from app.models.ticker import Ticker
|
||||
from app.services import benchmark_service
|
||||
from app.services.outcome_service import (
|
||||
OUTCOME_AMBIGUOUS,
|
||||
OUTCOME_STOP_HIT,
|
||||
@@ -85,7 +86,34 @@ async def create_trade(
|
||||
return trade
|
||||
|
||||
|
||||
def _to_dict(trade: PaperTrade, symbol: str, current_price: float | None) -> dict:
|
||||
def _to_dict(
|
||||
trade: PaperTrade,
|
||||
symbol: str,
|
||||
current_price: float | None,
|
||||
benchmark_closes: dict[date, float] | None = None,
|
||||
) -> dict:
|
||||
# For open trades, mark to market; for closed, the realized exit price.
|
||||
ref = current_price if trade.status == "open" else trade.close_price
|
||||
|
||||
# Alpha = trade return − benchmark (SPY) return over the same holding period.
|
||||
benchmark_return = None
|
||||
alpha_pct = None
|
||||
alpha_usd = None
|
||||
if ref is not None and trade.entry_price and benchmark_closes:
|
||||
sign = 1.0 if trade.direction == "long" else -1.0
|
||||
trade_return = (ref - trade.entry_price) / trade.entry_price * 100.0 * sign
|
||||
as_of = (
|
||||
trade.closed_at.date()
|
||||
if trade.status == "closed" and trade.closed_at is not None
|
||||
else date.today()
|
||||
)
|
||||
benchmark_return = benchmark_service.benchmark_return_pct(
|
||||
benchmark_closes, trade.opened_at.date(), as_of
|
||||
)
|
||||
if benchmark_return is not None:
|
||||
alpha_pct = trade_return - benchmark_return
|
||||
alpha_usd = alpha_pct / 100.0 * trade.entry_price * trade.shares
|
||||
|
||||
return {
|
||||
"id": trade.id,
|
||||
"symbol": symbol,
|
||||
@@ -98,8 +126,10 @@ def _to_dict(trade: PaperTrade, symbol: str, current_price: float | None) -> dic
|
||||
"opened_at": trade.opened_at,
|
||||
"close_price": trade.close_price,
|
||||
"closed_at": trade.closed_at,
|
||||
# For open trades, mark to market; for closed, the realized exit price.
|
||||
"current_price": current_price if trade.status == "open" else trade.close_price,
|
||||
"current_price": ref,
|
||||
"benchmark_return_pct": benchmark_return,
|
||||
"alpha_pct": alpha_pct,
|
||||
"alpha_usd": alpha_usd,
|
||||
}
|
||||
|
||||
|
||||
@@ -120,7 +150,13 @@ async def list_trades(
|
||||
rows = (await db.execute(stmt)).all()
|
||||
open_ids = {t.ticker_id for t, _ in rows if t.status == "open"}
|
||||
prices = await _latest_closes(db, open_ids)
|
||||
return [_to_dict(t, sym, prices.get(t.ticker_id)) for t, sym in rows]
|
||||
|
||||
# Benchmark closes for alpha — populated by the daily/benchmark job. Empty until
|
||||
# that runs once, in which case alpha is simply left unset (a read path never
|
||||
# makes a provider call).
|
||||
benchmark_closes = await benchmark_service.load_benchmark_closes(db)
|
||||
|
||||
return [_to_dict(t, sym, prices.get(t.ticker_id), benchmark_closes) for t, sym in rows]
|
||||
|
||||
|
||||
async def close_trade(
|
||||
|
||||
@@ -173,6 +173,9 @@ async def _enrich_entry(
|
||||
"dimensions": dims,
|
||||
"rr_ratio": setup.rr_ratio if setup else None,
|
||||
"rr_direction": setup.direction if setup else None,
|
||||
# 12-1 cross-sectional momentum percentile (the top-pick selector); ticker-
|
||||
# level, so any of the ticker's setups carries the same value.
|
||||
"momentum_percentile": setup.momentum_percentile if setup else None,
|
||||
"sr_levels": sr_levels,
|
||||
"last_close": last_close,
|
||||
"change_pct": change_pct,
|
||||
|
||||
Reference in New Issue
Block a user