Files
signal-platform/app/routers/market.py
T
dennisthiessen 824c15cf69 feat: breadth-divergence early-warning indicator + event study
Adds a leading-by-construction candidate and the harness to measure whether it
actually leads regime breaks, before any of it earns weight in the live index.

- breadth_service: % of the stored universe above its own 200-DMA + a divergence
  score (benchmark price up while breadth falls, nudged by low breadth). Genuinely
  leading because it keys on divergence, not level. Not wired into the live score.
- event_study_service: detect drawdown events on the benchmark, then measure each
  indicator's median lead time (event-centered) and precision/recall vs. the base
  rate (signal-centered). Compares breadth-divergence against the deterministic
  coincident price composite (reuses the regime price sub-scores). Price/breadth
  only — reproducible, no LLM/FRED.
- Manual "Event Study" job (Admin → Jobs), GET /regime/event-study, and an
  inline early-warning panel on the Regime tab with an honest small-sample caveat.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-26 14:08:52 +02:00

132 lines
4.9 KiB
Python

"""Market-level endpoints (benchmark regime + AI/Tech regime-change monitor)."""
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from app.dependencies import get_db, require_access, require_admin
from app.models.user import User
from app.schemas.common import APIEnvelope
from app.services import event_study_service, regime_monitor_service
from app.services.backtest_service import get_backtest_report
from app.services.market_regime_service import get_market_regime
# Router that every endpoint in this module registers on (via the
# @router.get / @router.put / @router.post decorators); constructed with
# the single tag "market".
router = APIRouter(tags=["market"])
@router.get("/market/regime", response_model=APIEnvelope)
async def market_regime(
    _user: User = Depends(require_access),
    db: AsyncSession = Depends(get_db),
) -> APIEnvelope:
    """Current benchmark (SPY) trend regime: bullish / bearish / neutral."""
    # `_user` is never read in the body; it is declared only so that the
    # `require_access` dependency is resolved for this route.
    regime = await get_market_regime(db)
    # The service result is passed through untouched inside the envelope.
    return APIEnvelope(data=regime, status="success")
@router.get("/backtest/report", response_model=APIEnvelope)
async def backtest_report(
    _user: User = Depends(require_access),
    db: AsyncSession = Depends(get_db),
) -> APIEnvelope:
    """Latest cached historical backtest report (None until the job runs)."""
    # Whatever the service returns (including None) becomes the envelope's
    # `data`; this handler does not inspect or transform it.
    return APIEnvelope(
        status="success",
        data=await get_backtest_report(db),
    )
# ---------------------------------------------------------------------------
# AI/Tech Regime-Change Monitor (standalone, observational)
# ---------------------------------------------------------------------------
class RegimeConfigUpdate(BaseModel):
    # Request body for PUT /regime/config (see `update_regime_config`).
    # Every field is optional and defaults to None; the handler dumps the
    # model with `exclude_none=True`, so a field left as None is simply not
    # forwarded to the service.
    # NOTE(review): no range validation is declared on any field here;
    # whether the service validates them is not visible in this module.
    weights: dict[str, float] | None = None
    alert_threshold: float | None = None
    # Untyped dict: the expected key/value layout is not visible in this
    # module — TODO confirm against regime_monitor_service.
    tickers: dict | None = None
    leader_weight: float | None = None
    rs_lookback: int | None = None
    fundamental_staleness_days: int | None = None
class RegimeFundamentalsUpdate(BaseModel):
    # Request body for PUT /regime/fundamentals (see
    # `update_regime_fundamentals`). All three fields are optional and are
    # forwarded to the service as-is, None included.
    # Per the endpoint docstrings in this module, F1 relates to capex and
    # F3 to earnings reaction; the valid score range is not shown here.
    f1_score: float | None = None
    f3_score: float | None = None
    locked: bool | None = None
@router.get("/regime/monitor", response_model=APIEnvelope)
async def regime_monitor(
    _user: User = Depends(require_access),
    db: AsyncSession = Depends(get_db),
) -> APIEnvelope:
    """Latest AI/Tech regime-change index (0-100) + per-signal breakdown + trend."""
    # Read-only route: delegates to the regime monitor service and wraps
    # its result without modification.
    snapshot = await regime_monitor_service.get_regime_monitor(db)
    return APIEnvelope(data=snapshot, status="success")
@router.get("/regime/config", response_model=APIEnvelope)
async def regime_config(
    _admin: User = Depends(require_admin),
    db: AsyncSession = Depends(get_db),
) -> APIEnvelope:
    """Editable weights / thresholds / ticker lists for the regime monitor."""
    # Depends on `require_admin` rather than `require_access`, unlike the
    # read-only monitor route.
    return APIEnvelope(
        status="success",
        data=await regime_monitor_service.get_regime_config(db),
    )
@router.put("/regime/config", response_model=APIEnvelope)
async def update_regime_config(
    body: RegimeConfigUpdate,
    _admin: User = Depends(require_admin),
    db: AsyncSession = Depends(get_db),
) -> APIEnvelope:
    """Merge the supplied fields into the stored regime-monitor config."""
    # Fields that are None (omitted by the client or sent as null) are
    # dropped from the dump, so only supplied values reach the service.
    changes = body.model_dump(exclude_none=True)
    result = await regime_monitor_service.update_regime_config(db, changes)
    return APIEnvelope(data=result, status="success")
@router.get("/regime/fundamentals", response_model=APIEnvelope)
async def regime_fundamentals(
    _admin: User = Depends(require_admin),
    db: AsyncSession = Depends(get_db),
) -> APIEnvelope:
    """Current F1 (capex) / F3 (earnings reaction) override (LLM-proposed or manual)."""
    # Admin-only read of the stored overrides; the service result is
    # returned as the envelope payload unchanged.
    overrides = await regime_monitor_service.get_fundamental_overrides(db)
    return APIEnvelope(data=overrides, status="success")
@router.put("/regime/fundamentals", response_model=APIEnvelope)
async def update_regime_fundamentals(
    body: RegimeFundamentalsUpdate,
    _admin: User = Depends(require_admin),
    db: AsyncSession = Depends(get_db),
) -> APIEnvelope:
    """Manually override F1/F3 (locks out the LLM refresh until unlocked)."""
    # All three fields are forwarded by keyword exactly as received; a field
    # the client omitted arrives at the service as None.
    set_overrides = regime_monitor_service.set_fundamental_overrides
    stored = await set_overrides(
        db,
        f1_score=body.f1_score,
        f3_score=body.f3_score,
        locked=body.locked,
    )
    return APIEnvelope(data=stored, status="success")
@router.post("/regime/fundamentals/refresh", response_model=APIEnvelope)
async def refresh_regime_fundamentals(
    _admin: User = Depends(require_admin),
    db: AsyncSession = Depends(get_db),
) -> APIEnvelope:
    """Ask the configured LLM to re-estimate F1/F3 now (forces past a lock)."""
    # `force=True` is hard-coded: this route always requests a forced
    # refresh and takes no request body.
    refreshed = await regime_monitor_service.refresh_fundamental_overrides(
        db,
        force=True,
    )
    return APIEnvelope(data=refreshed, status="success")
@router.get("/regime/event-study", response_model=APIEnvelope)
async def regime_event_study(
    _user: User = Depends(require_access),
    db: AsyncSession = Depends(get_db),
) -> APIEnvelope:
    """Cached early-warning event study (lead time vs. historical drawdowns).
    None until the manual "Event Study" job has run (Admin → Jobs)."""
    # This handler only reads the report via the service; it does not
    # trigger the study itself.
    return APIEnvelope(
        status="success",
        data=await event_study_service.get_event_study_report(db),
    )