diff --git a/app/routers/market.py b/app/routers/market.py index 63964d3..bcac902 100644 --- a/app/routers/market.py +++ b/app/routers/market.py @@ -7,7 +7,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.dependencies import get_db, require_access, require_admin from app.models.user import User from app.schemas.common import APIEnvelope -from app.services import regime_monitor_service +from app.services import event_study_service, regime_monitor_service from app.services.backtest_service import get_backtest_report from app.services.market_regime_service import get_market_regime @@ -117,3 +117,15 @@ async def refresh_regime_fundamentals( """Ask the configured LLM to re-estimate F1/F3 now (forces past a lock).""" data = await regime_monitor_service.refresh_fundamental_overrides(db, force=True) return APIEnvelope(status="success", data=data) + + +@router.get("/regime/event-study", response_model=APIEnvelope) +async def regime_event_study( + _user: User = Depends(require_access), + db: AsyncSession = Depends(get_db), +) -> APIEnvelope: + """Cached early-warning event study (lead time vs. historical drawdowns). + + None until the manual "Event Study" job has run (Admin → Jobs).""" + data = await event_study_service.get_event_study_report(db) + return APIEnvelope(status="success", data=data) diff --git a/app/scheduler.py b/app/scheduler.py index 8f336af..38b310a 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -38,6 +38,7 @@ from app.services.alert_service import dispatch_alerts from app.services.backtest_service import run_and_store as run_backtest_and_store from app.services.market_regime_service import update_market_regime from app.services.regime_monitor_service import update_regime_monitor +from app.services.event_study_service import run_and_store as run_event_study_and_store from app.services.outcome_service import evaluate_pending_setups from app.services.rr_scanner_service import scan_all_tickers from app.services.sentiment_provider_service import build_sentiment_provider @@ -82,6 +83,7 @@ _JOB_NAMES = [ "alerts", "market_regime", "regime_monitor", + "event_study", "backtest", "daily_pipeline", "intraday_pipeline", @@ -871,6 +873,42 @@ async def run_backtest_job() -> None: _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc)) +# --------------------------------------------------------------------------- +# Job: Event Study (manual) +# --------------------------------------------------------------------------- + + +async def run_event_study_job() -> None: + """Measure indicator lead time vs. historical drawdowns and cache the report. + + Manual only (never auto-fires) — it does a universe-wide OHLCV scan. Triggered + from Admin → Jobs when you want to re-run the early-warning measurement. + """ + job_name = "event_study" + _log_event(logging.INFO, "job_start", job=job_name) + _runtime_start(job_name, total=1) + + try: + async with async_session_factory() as db: + if not await _is_job_enabled(db, job_name): + _log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled") + _runtime_finish(job_name, "skipped", processed=0, total=1, message="Disabled") + return + + report = await run_event_study_and_store(db) + + _runtime_progress(job_name, processed=1, total=1) + if report.get("available"): + msg = f"{len(report.get('events', []))} events, lead Δ {report.get('lead_delta_days')}d" + else: + msg = report.get("reason", "no data") + _runtime_finish(job_name, "completed", processed=1, total=1, message=msg) + _log_event(logging.INFO, "job_complete", job=job_name, events=len(report.get("events", []))) + except Exception as exc: + _runtime_finish(job_name, "error", processed=0, total=1, message=str(exc)) + _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc)) + + # --------------------------------------------------------------------------- # Job: Ticker Universe Sync # --------------------------------------------------------------------------- @@ -1127,6 +1165,12 @@ def configure_scheduler(schedule_config: dict[str, str] | None = None) -> None: id="data_backfill", name="Data Backfill (deep history)", replace_existing=True, next_run_time=None, ) + # Event study: manual only (universe-wide scan); triggered from Admin → Jobs. + scheduler.add_job( + run_event_study_job, "interval", weeks=520, + id="event_study", name="Event Study", + replace_existing=True, next_run_time=None, + ) _log_event(logging.INFO, "scheduler_configured", timezone=tz, daily_pipeline={ "cron": cfg["schedule_daily_pipeline_cron"], diff --git a/app/services/admin_service.py b/app/services/admin_service.py index 6e2557b..97c7ad3 100644 --- a/app/services/admin_service.py +++ b/app/services/admin_service.py @@ -520,6 +520,7 @@ VALID_JOB_NAMES = { "alerts", "market_regime", "regime_monitor", + "event_study", "backtest", "daily_pipeline", "intraday_pipeline", @@ -536,6 +537,7 @@ JOB_LABELS = { "alerts": "Alerts Dispatcher", "market_regime": "Market Regime", "regime_monitor": "Regime Monitor", + "event_study": "Event Study", "backtest": "Backtest", "daily_pipeline": "Daily Pipeline", "intraday_pipeline": "Intraday Pipeline", diff --git a/app/services/breadth_service.py b/app/services/breadth_service.py new file mode 100644 index 0000000..1d3971d --- /dev/null +++ b/app/services/breadth_service.py @@ -0,0 +1,118 @@ +"""Market-breadth early-warning indicator (from the stored universe OHLCV). + +Breadth is a genuinely *leading* construct: a few mega-caps can keep an index +rising while participation narrows underneath — the classic pre-top divergence. +We measure it from the OHLCV we already store for the whole universe, so it costs +no new data source. + +Two layers: + - breadth = % of the universe trading above its own 200-DMA (0-100). + - divergence = an early-warning score (0-100, high = fragile): the benchmark + price rising *while* breadth falls, plus a nudge for already-low breadth. + +This module only *computes* the indicator. It is deliberately NOT wired into the +live regime index yet — the event study measures whether it actually leads before +it earns any weight. +""" + +from __future__ import annotations + +import logging +from datetime import date + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.ticker import Ticker +from app.services.price_service import query_ohlcv + +logger = logging.getLogger(__name__) + +Series = list[tuple[date, float]] + + +def _breadth_from_closes( + closes_by_symbol: dict[str, Series], window: int = 200, min_tickers: int = 20 +) -> dict[date, float]: + """Pure core: % of symbols above their own rolling SMA(window), per date. + + Each symbol's SMA is computed once with a sliding sum (O(bars)); dates with + fewer than ``min_tickers`` qualifying names are dropped (too thin to trust). + """ + counts: dict[date, list[int]] = {} # date -> [above, total] + for series in closes_by_symbol.values(): + ordered = sorted(series, key=lambda x: x[0]) + dates = [d for d, _ in ordered] + closes = [c for _, c in ordered] + if len(closes) < window: + continue + running = sum(closes[:window]) + for i in range(window - 1, len(closes)): + if i >= window: + running += closes[i] - closes[i - window] + sma = running / window + entry = counts.setdefault(dates[i], [0, 0]) + entry[1] += 1 + if closes[i] > sma: + entry[0] += 1 + return { + d: round(above / total * 100.0, 2) + for d, (above, total) in counts.items() + if total >= min_tickers + } + + +def compute_divergence_series( + breadth: dict[date, float], benchmark_closes: Series, lookback: int = 20 +) -> dict[date, float]: + """Early-warning score (0-100, high = fragile) per date. + + Fragility rises when the benchmark price climbs over ``lookback`` days while + breadth deteriorates over the same window, and is nudged up when the absolute + breadth level is already low. It is the *divergence* (not the level) that + makes this leading. + """ + bench = {d: c for d, c in benchmark_closes} + common = sorted(d for d in bench if d in breadth) + out: dict[date, float] = {} + for i in range(lookback, len(common)): + d, d0 = common[i], common[i - lookback] + price_past = bench[d0] + if price_past <= 0: + continue + price_ret = (bench[d] / price_past - 1.0) * 100.0 # % + breadth_chg = breadth[d] - breadth[d0] # percentage points + raw = price_ret - breadth_chg # price up & breadth down -> large + score = 50.0 + raw * 2.0 + (50.0 - breadth[d]) * 0.4 + out[d] = max(0.0, min(100.0, round(score, 2))) + return out + + +async def _load_universe_closes(db: AsyncSession) -> dict[str, Series]: + result = await db.execute(select(Ticker).order_by(Ticker.symbol)) + closes_by_symbol: dict[str, Series] = {} + for ticker in result.scalars().all(): + try: + records = await query_ohlcv(db, ticker.symbol) + except Exception: + logger.exception("Breadth: OHLCV load failed for %s", ticker.symbol) + continue + if records: + closes_by_symbol[ticker.symbol] = [(r.date, float(r.close)) for r in records] + return closes_by_symbol + + +async def compute_breadth_series( + db: AsyncSession, window: int = 200, min_tickers: int = 20 +) -> dict[date, float]: + """Historical breadth series across the stored universe (for the event study).""" + closes_by_symbol = await _load_universe_closes(db) + return _breadth_from_closes(closes_by_symbol, window, min_tickers) + + +async def compute_breadth_today(db: AsyncSession) -> float | None: + """Latest breadth reading (thin wrapper, for future live use).""" + series = await compute_breadth_series(db) + if not series: + return None + return series[max(series)] diff --git a/app/services/event_study_service.py b/app/services/event_study_service.py new file mode 100644 index 0000000..1861e78 --- /dev/null +++ b/app/services/event_study_service.py @@ -0,0 +1,294 @@ +"""Event study: does a candidate indicator actually *lead* regime breaks? + +This is a backtest-style measurement, but the unit of analysis is **events** +(historical drawdowns), not trades. For each candidate indicator it answers: + - how many days of warning did it give before the break (event-centered)? + - at what false-alarm cost (signal-centered precision/recall vs. the base rate)? + +It compares the breadth-divergence early-warning candidate against a deterministic +**coincident** price composite (the existing regime price sub-scores), so you can +see whether the candidate crosses *earlier*. Everything is price/breadth only — +no LLM/FRED — so the result is reproducible. + +Honest caveat: with only a handful of real drawdowns in ~5y, the sample is tiny +and the numbers are noisy. Read the median lead time as an order of magnitude, and +do NOT overfit thresholds to this history. + +Report is cached in a SystemSetting (mirrors ``backtest_service``); a manual job +(Admin → Jobs) drives it. +""" + +from __future__ import annotations + +import json +import logging +from datetime import date, datetime, timedelta, timezone + +from sqlalchemy.ext.asyncio import AsyncSession + +from app.services import breadth_service, settings_store +from app.services import regime_monitor_service as rms +from app.services.admin_service import update_setting + +logger = logging.getLogger(__name__) + +KEY_REPORT = "regime_event_study" + +# Defaults — admin-tunable later if needed. +EVENT_THRESHOLD_PCT = 15.0 # drawdown from the 52w high that counts as a "break" +RECOVER_PCT = 5.0 # must recover to within this of the high before a new event +DRAWDOWN_LOOKBACK = 252 # 52-week trailing high +HORIZON_DAYS = 20 # signal-centered prediction horizon +WARN_THRESHOLD = 60.0 # indicator level treated as "warning on" +PRE, POST = 60, 20 # event-centered window (trading days) + + +def _median(values: list[float]) -> float | None: + if not values: + return None + s = sorted(values) + n = len(s) + mid = n // 2 + return float(s[mid]) if n % 2 else (s[mid - 1] + s[mid]) / 2.0 + + +# --------------------------------------------------------------------------- +# Event detection +# --------------------------------------------------------------------------- + +def detect_events( + closes: list[float], + dates: list[date], + threshold_pct: float = EVENT_THRESHOLD_PCT, + lookback: int = DRAWDOWN_LOOKBACK, + recover_pct: float = RECOVER_PCT, +) -> list[dict]: + """Drawdown events: ``t0`` = first day the drawdown from the trailing 52w high + crosses ``threshold_pct``. De-duplicated — a new event needs a recovery back to + within ``recover_pct`` of the high first (so one decline = one event).""" + events: list[dict] = [] + in_event = False + for i in range(len(closes)): + window = closes[max(0, i - lookback + 1): i + 1] + hi = max(window) + dd = (hi - closes[i]) / hi * 100.0 if hi > 0 else 0.0 + if not in_event and dd >= threshold_pct: + events.append({"date": dates[i].isoformat(), "index": i, "depth_pct": round(dd, 1)}) + in_event = True + elif in_event and dd <= recover_pct: + in_event = False + return events + + +# --------------------------------------------------------------------------- +# Event-centered: lead time + mean path +# --------------------------------------------------------------------------- + +def event_centered( + indicator: dict[date, float], + events_idx: list[int], + dates: list[date], + pre: int = PRE, + post: int = POST, + threshold: float = WARN_THRESHOLD, +) -> dict: + """Align the indicator at each event's ``t0`` and measure how early it warned. + + Lead = the earliest day within ``[t0-pre, t0]`` at which the indicator first + crosses ``threshold``. Also returns the cross-event mean path. + """ + leads: list[float] = [] + sums: dict[int, float] = {} + counts: dict[int, int] = {} + for t0 in events_idx: + lead: int | None = None + for k in range(0, pre + 1): + idx = t0 - k + if idx < 0: + break + v = indicator.get(dates[idx]) + if v is not None and v >= threshold: + lead = k # keep going: the largest k = earliest warning in the window + if lead is not None: + leads.append(lead) + for rel in range(-pre, post + 1): + idx = t0 + rel + if 0 <= idx < len(dates): + v = indicator.get(dates[idx]) + if v is not None: + sums[rel] = sums.get(rel, 0.0) + v + counts[rel] = counts.get(rel, 0) + 1 + mean_path = [ + {"rel_day": rel, "value": round(sums[rel] / counts[rel], 1)} for rel in sorted(sums) + ] + return { + "median_lead_days": _median(leads), + "events_with_signal": len(leads), + "events_total": len(events_idx), + "mean_path": mean_path, + } + + +# --------------------------------------------------------------------------- +# Signal-centered: precision / recall vs. base rate +# --------------------------------------------------------------------------- + +def signal_centered( + indicator: dict[date, float], + events_idx: list[int], + dates: list[date], + horizon: int = HORIZON_DAYS, + thresholds: list[float] | None = None, +) -> dict: + """Treat ``indicator >= threshold`` as predicting a break within ``horizon`` + days. Sweep thresholds → precision/recall/alarm count, plus the base rate.""" + thresholds = thresholds or [50, 55, 60, 65, 70, 75, 80] + n = len(dates) + labels = [1 if any(i < e <= i + horizon for e in events_idx) else 0 for i in range(n)] + positives = sum(labels) + base_rate = positives / n if n else 0.0 + + rows: list[dict] = [] + for th in thresholds: + tp = fp = fn = 0 + for i in range(n): + v = indicator.get(dates[i]) + if v is None: + continue + pred = v >= th + if pred and labels[i]: + tp += 1 + elif pred and not labels[i]: + fp += 1 + elif not pred and labels[i]: + fn += 1 + precision = tp / (tp + fp) if (tp + fp) else None + recall = tp / (tp + fn) if (tp + fn) else None + rows.append({ + "threshold": th, + "precision": round(precision, 3) if precision is not None else None, + "recall": round(recall, 3) if recall is not None else None, + "alarms": tp + fp, + }) + return {"base_rate": round(base_rate, 3), "horizon_days": horizon, "rows": rows} + + +# --------------------------------------------------------------------------- +# Coincident baseline (deterministic price composite, reusing the regime sub-scores) +# --------------------------------------------------------------------------- + +def _coincident_series(prices: dict[str, list], dates: list[date], config: dict) -> dict[date, float]: + """Mean of the available price sub-scores (P1-P4) as-of each date — the + coincident baseline the leading candidate must beat on lead time.""" + lw = float(config.get("leader_weight", 2.0)) + lb = int(config.get("rs_lookback", 60)) + t = config["tickers"] + smh_full = prices.get(t["leaders"][0], []) if t["leaders"] else [] + qqq_full = prices.get(t["confirm"][0], []) if t["confirm"] else [] + spy_full = prices.get(t["market"], []) + out: dict[date, float] = {} + for d in dates: + smh = rms._closes_asof(smh_full, d) + qqq = rms._closes_asof(qqq_full, d) + spy = rms._closes_asof(spy_full, d) + subs = [ + rms.p1_trend_break(smh, qqq, lw), + rms.p2_death_cross(smh, qqq, lw), + rms.p3_drawdown(smh, qqq), + rms.p4_relative_strength(smh, spy, lb), + ] + vals = [v for v in subs if v is not None] + if vals: + out[d] = round(sum(vals) / len(vals), 2) + return out + + +# --------------------------------------------------------------------------- +# Orchestration +# --------------------------------------------------------------------------- + +async def run_event_study( + db: AsyncSession, + threshold_pct: float = EVENT_THRESHOLD_PCT, + horizon: int = HORIZON_DAYS, + warn_threshold: float = WARN_THRESHOLD, +) -> dict: + """Run the study: detect events on the benchmark, then measure breadth-divergence + vs. the coincident price composite. Best-effort; returns available=False on no data.""" + config = await rms.get_regime_config(db) + end = date.today() + start = end - timedelta(days=5 * 365 + 30) + + prices = await rms._fetch_prices(config, start, end) + leader = config["tickers"]["leaders"][0] if config["tickers"]["leaders"] else "SMH" + bench = sorted(prices.get(leader, []), key=lambda x: x[0]) + if len(bench) < 260: + return {"available": False, "reason": "insufficient benchmark history"} + + dates = [d for d, _ in bench] + closes = [c for _, c in bench] + events = detect_events(closes, dates, threshold_pct) + events_idx = [e["index"] for e in events] + + breadth = await breadth_service.compute_breadth_series(db) + divergence = breadth_service.compute_divergence_series(breadth, bench) + coincident = _coincident_series(prices, dates, config) + + def _evaluate(series: dict[date, float]) -> dict: + return { + **event_centered(series, events_idx, dates, threshold=warn_threshold), + "signal": signal_centered(series, events_idx, dates, horizon), + } + + indicators = { + "breadth_divergence": _evaluate(divergence), + "coincident_price": _evaluate(coincident), + } + + bd = indicators["breadth_divergence"]["median_lead_days"] + cd = indicators["coincident_price"]["median_lead_days"] + lead_delta = (bd - cd) if (bd is not None and cd is not None) else None + + recent_breadth = [ + {"date": d.isoformat(), "breadth": breadth[d], "divergence": divergence.get(d)} + for d in dates[-90:] + if d in breadth + ] + + report = { + "available": True, + "generated_at": datetime.now(timezone.utc).isoformat(), + "params": { + "benchmark": leader, + "event_threshold_pct": threshold_pct, + "horizon_days": horizon, + "warn_threshold": warn_threshold, + }, + "events": events, + "indicators": indicators, + "lead_delta_days": lead_delta, + "recent_breadth": recent_breadth, + } + logger.info(json.dumps({ + "event": "event_study_complete", "events": len(events), + "breadth_lead": bd, "coincident_lead": cd, + })) + return report + + +async def run_and_store(db: AsyncSession) -> dict: + """Run the event study and cache the report in a SystemSetting. Job entrypoint.""" + report = await run_event_study(db) + await update_setting(db, KEY_REPORT, json.dumps(report)) + return report + + +async def get_event_study_report(db: AsyncSession) -> dict | None: + """Return the last cached event-study report, or None if never run.""" + setting = await settings_store.get_setting(db, KEY_REPORT) + if setting is None: + return None + try: + return json.loads(setting.value) + except (TypeError, ValueError): + return None diff --git a/frontend/src/api/regime.ts b/frontend/src/api/regime.ts index 865ac8a..50044de 100644 --- a/frontend/src/api/regime.ts +++ b/frontend/src/api/regime.ts @@ -1,10 +1,19 @@ import apiClient from './client'; -import type { RegimeMonitor, RegimeConfig, RegimeFundamentals } from '../lib/types'; +import type { + RegimeMonitor, + RegimeConfig, + RegimeFundamentals, + EventStudyReport, +} from '../lib/types'; export function getRegimeMonitor() { return apiClient.get('regime/monitor').then((r) => r.data); } +export function getEventStudy() { + return apiClient.get('regime/event-study').then((r) => r.data); +} + export function getRegimeConfig() { return apiClient.get('regime/config').then((r) => r.data); } diff --git a/frontend/src/lib/types.ts b/frontend/src/lib/types.ts index cce7c27..262839a 100644 --- a/frontend/src/lib/types.ts +++ b/frontend/src/lib/types.ts @@ -311,6 +311,38 @@ export interface RegimeConfig { fundamental_staleness_days: number; } +// Event study — measured lead time of early-warning indicators vs. drawdowns +export interface EventStudyLeadStats { + median_lead_days: number | null; + events_with_signal: number; + events_total: number; + mean_path: { rel_day: number; value: number }[]; + signal: { + base_rate: number; + horizon_days: number; + rows: { threshold: number; precision: number | null; recall: number | null; alarms: number }[]; + }; +} + +export interface EventStudyReport { + available: boolean; + reason?: string; + generated_at?: string; + params?: { + benchmark: string; + event_threshold_pct: number; + horizon_days: number; + warn_threshold: number; + }; + events?: { date: string; index: number; depth_pct: number }[]; + indicators?: { + breadth_divergence: EventStudyLeadStats; + coincident_price: EventStudyLeadStats; + }; + lead_delta_days?: number | null; + recent_breadth?: { date: string; breadth: number; divergence: number | null }[]; +} + export interface AlertConfig { enabled: boolean; telegram_chat_id: string; diff --git a/frontend/src/pages/RegimePage.tsx b/frontend/src/pages/RegimePage.tsx index 64bbb49..8d6c581 100644 --- a/frontend/src/pages/RegimePage.tsx +++ b/frontend/src/pages/RegimePage.tsx @@ -13,6 +13,7 @@ import { getRegimeFundamentals, updateRegimeFundamentals, refreshRegimeFundamentals, + getEventStudy, } from '../api/regime'; import type { RegimeBand, @@ -20,6 +21,8 @@ import type { RegimeSignal, RegimeConfig, RegimeFundamentals, + EventStudyReport, + EventStudyLeadStats, } from '../lib/types'; const BAND_STYLES: Record = { @@ -266,6 +269,101 @@ function WeightsEditor({ ); } +function Sparkline({ values, color = '#60a5fa', height = 28 }: { values: number[]; color?: string; height?: number }) { + if (values.length < 2) return null; + const min = Math.min(...values); + const max = Math.max(...values); + const range = max - min || 1; + const w = 120; + const pts = values + .map((v, i) => `${(i / (values.length - 1)) * w},${height - ((v - min) / range) * height}`) + .join(' '); + return ( + + + + ); +} + +function LeadStat({ label, stats, highlight }: { label: string; stats: EventStudyLeadStats; highlight?: boolean }) { + return ( +
+
{label}
+
+ {stats.median_lead_days != null ? `${stats.median_lead_days}d lead` : 'no signal'} +
+
+ {stats.events_with_signal}/{stats.events_total} events warned +
+
+ ); +} + +function EventStudyBody({ report }: { report: EventStudyReport }) { + const bd = report.indicators!.breadth_divergence; + const cd = report.indicators!.coincident_price; + const recent = report.recent_breadth ?? []; + const breadthVals = recent.map((r) => r.breadth); + const divVals = recent.map((r) => r.divergence ?? 0); + const lead = report.lead_delta_days; + return ( +
+

+ {report.events?.length ?? 0} drawdown events (≥{report.params?.event_threshold_pct}%) on{' '} + {report.params?.benchmark} over ~5y. Higher median lead = earlier warning. +

+
+ 0} /> + +
+ {lead != null && ( +

+ Breadth divergence warned a median{' '} + 0 ? 'text-emerald-400' : 'text-amber-400'}`}> + {lead > 0 ? '+' : ''}{lead} days + {' '} + {lead >= 0 ? 'earlier' : 'later'} than the coincident baseline. +

+ )} + {recent.length > 1 && ( +
+
+
Breadth (% > 200d), last 90d
+ +
{breadthVals[breadthVals.length - 1]?.toFixed(0)}%
+
+
+
Divergence (fragility), last 90d
+ +
{divVals[divVals.length - 1]?.toFixed(0)}
+
+
+ )} +

+ Base rate {Math.round(bd.signal.base_rate * 100)}% · horizon {bd.signal.horizon_days}d. Few events in + 5y → noisy; treat lead time as an order of magnitude and don't overfit thresholds. Not yet wired + into the live score. +

+
+ ); +} + +function EventStudyPanel() { + const study = useQuery({ queryKey: ['regime', 'event-study'], queryFn: getEventStudy }); + return ( + + {study.isLoading && } + {study.data === null && ( + Not run yet — trigger the “Event Study” job in Admin → Jobs. + )} + {study.data && !study.data.available && ( + {study.data.reason ?? 'No data'} + )} + {study.data && study.data.available && } + + ); +} + function AdminControls() { const qc = useQueryClient(); const fundamentals = useQuery({ queryKey: ['regime', 'fundamentals'], queryFn: getRegimeFundamentals }); @@ -348,6 +446,8 @@ export default function RegimePage() { )} + + {isAdmin && } ); diff --git a/tests/unit/test_event_study.py b/tests/unit/test_event_study.py new file mode 100644 index 0000000..5d17179 --- /dev/null +++ b/tests/unit/test_event_study.py @@ -0,0 +1,104 @@ +"""Unit tests for the breadth indicator and the event-study measurement.""" + +from __future__ import annotations + +from datetime import date, timedelta + +from app.services.breadth_service import _breadth_from_closes, compute_divergence_series +from app.services.event_study_service import ( + detect_events, + event_centered, + signal_centered, +) + + +def _days(n: int, start: date = date(2021, 1, 1)) -> list[date]: + return [start + timedelta(days=i) for i in range(n)] + + +# --------------------------------------------------------------------------- +# Event detection +# --------------------------------------------------------------------------- + +def test_detect_events_single_drawdown(): + closes = [100.0] * 300 + [85.0] * 5 # 15% off the trailing high -> one event + dates = _days(len(closes)) + events = detect_events(closes, dates, threshold_pct=15.0) + assert len(events) == 1 + assert events[0]["index"] == 300 + + +def test_detect_events_dedup_without_recovery(): + closes = [100.0] * 300 + [85.0] * 5 + [80.0] * 5 # deepens but never recovers + events = detect_events(closes, _days(len(closes)), threshold_pct=15.0) + assert len(events) == 1 + + +def test_detect_events_two_after_recovery(): + closes = [100.0] * 300 + [85.0] * 10 + [100.0] * 300 + [85.0] * 10 + events = detect_events(closes, _days(len(closes)), threshold_pct=15.0) + assert len(events) == 2 + + +# --------------------------------------------------------------------------- +# Event-centered lead time +# --------------------------------------------------------------------------- + +def test_event_centered_lead_time(): + dates = _days(200) + t0 = 120 + # Indicator goes hot 30 days before t0 and stays hot through t0. + indicator = {dates[i]: (70.0 if t0 - 30 <= i <= t0 else 10.0) for i in range(len(dates))} + res = event_centered(indicator, [t0], dates, pre=60, post=20, threshold=60.0) + assert res["median_lead_days"] == 30 + assert res["events_with_signal"] == 1 + + +def test_breadth_divergence_leads_coincident(): + dates = _days(200) + t0 = 120 + breadth_ind = {dates[i]: (70.0 if t0 - 30 <= i <= t0 else 10.0) for i in range(len(dates))} + coincident = {dates[i]: (70.0 if t0 - 2 <= i <= t0 else 10.0) for i in range(len(dates))} + bd = event_centered(breadth_ind, [t0], dates, threshold=60.0) + cd = event_centered(coincident, [t0], dates, threshold=60.0) + assert bd["median_lead_days"] > cd["median_lead_days"] + + +# --------------------------------------------------------------------------- +# Signal-centered precision / recall +# --------------------------------------------------------------------------- + +def test_signal_centered_base_rate_and_recall(): + dates = _days(200) + t0 = 120 + indicator = {dates[i]: (70.0 if t0 - 30 <= i <= t0 else 10.0) for i in range(len(dates))} + res = signal_centered(indicator, [t0], dates, horizon=20) + assert 0.0 < res["base_rate"] < 1.0 + # An aligned indicator should catch some of the pre-event window at a mid threshold. + row60 = next(r for r in res["rows"] if r["threshold"] == 60) + assert row60["recall"] is not None and row60["recall"] > 0 + + +# --------------------------------------------------------------------------- +# Breadth aggregation + divergence +# --------------------------------------------------------------------------- + +def test_breadth_from_closes_fraction_above_sma(): + dates = _days(5) + closes_by_symbol = { + "A": list(zip(dates, [1.0, 2.0, 3.0, 4.0, 5.0])), # rising -> above its SMA + "B": list(zip(dates, [5.0, 4.0, 3.0, 2.0, 1.0])), # falling -> below + "C": list(zip(dates, [3.0, 3.0, 3.0, 3.0, 3.0])), # flat -> not strictly above + } + breadth = _breadth_from_closes(closes_by_symbol, window=3, min_tickers=2) + # At d2: SMA(3) over each -> only A is strictly above -> 1/3. + assert breadth[dates[2]] == round(1 / 3 * 100, 2) + + +def test_divergence_high_when_price_up_breadth_down(): + dates = _days(10) + breadth = {dates[i]: 80.0 - i * 3 for i in range(len(dates))} # falling breadth + benchmark = list(zip(dates, [100.0 + i for i in range(len(dates))])) # rising price + div = compute_divergence_series(breadth, benchmark, lookback=3) + last = div[dates[-1]] + assert last > 50.0 # fragile: price up while breadth deteriorates diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index 5541ee7..d5f0276 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -88,6 +88,7 @@ class TestConfigureScheduler: "alerts", "market_regime", "regime_monitor", + "event_study", "backtest", "daily_pipeline", "intraday_pipeline", @@ -109,6 +110,7 @@ class TestConfigureScheduler: "fundamental_collector", "market_regime", "regime_monitor", + "event_study", "outcome_evaluator", "rr_scanner", "sentiment_collector",