feat: breadth-divergence early-warning indicator + event study

Adds a leading-by-construction candidate and the harness to measure whether it
actually leads regime breaks, before any of it earns weight in the live index.

- breadth_service: % of the stored universe above its own 200-DMA + a divergence
  score (benchmark price up while breadth falls, nudged by low breadth). Genuinely
  leading because it keys on divergence, not level. Not wired into the live score.
- event_study_service: detect drawdown events on the benchmark, then measure each
  indicator's median lead time (event-centered) and precision/recall vs. the base
  rate (signal-centered). Compares breadth-divergence against the deterministic
  coincident price composite (reuses the regime price sub-scores). Price/breadth
  only — reproducible, no LLM/FRED.
- Manual "Event Study" job (Admin → Jobs), GET /regime/event-study, and an
  inline early-warning panel on the Regime tab with an honest small-sample caveat.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-26 14:08:52 +02:00
parent ebff19940b
commit 824c15cf69
10 changed files with 719 additions and 2 deletions
+13 -1
View File
@@ -7,7 +7,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.dependencies import get_db, require_access, require_admin
from app.models.user import User
from app.schemas.common import APIEnvelope
from app.services import regime_monitor_service
from app.services import event_study_service, regime_monitor_service
from app.services.backtest_service import get_backtest_report
from app.services.market_regime_service import get_market_regime
@@ -117,3 +117,15 @@ async def refresh_regime_fundamentals(
"""Ask the configured LLM to re-estimate F1/F3 now (forces past a lock)."""
data = await regime_monitor_service.refresh_fundamental_overrides(db, force=True)
return APIEnvelope(status="success", data=data)
@router.get("/regime/event-study", response_model=APIEnvelope)
async def regime_event_study(
_user: User = Depends(require_access),
db: AsyncSession = Depends(get_db),
) -> APIEnvelope:
"""Cached early-warning event study (lead time vs. historical drawdowns).
None until the manual "Event Study" job has run (Admin → Jobs)."""
data = await event_study_service.get_event_study_report(db)
return APIEnvelope(status="success", data=data)
+44
View File
@@ -38,6 +38,7 @@ from app.services.alert_service import dispatch_alerts
from app.services.backtest_service import run_and_store as run_backtest_and_store
from app.services.market_regime_service import update_market_regime
from app.services.regime_monitor_service import update_regime_monitor
from app.services.event_study_service import run_and_store as run_event_study_and_store
from app.services.outcome_service import evaluate_pending_setups
from app.services.rr_scanner_service import scan_all_tickers
from app.services.sentiment_provider_service import build_sentiment_provider
@@ -82,6 +83,7 @@ _JOB_NAMES = [
"alerts",
"market_regime",
"regime_monitor",
"event_study",
"backtest",
"daily_pipeline",
"intraday_pipeline",
@@ -871,6 +873,42 @@ async def run_backtest_job() -> None:
_log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
# ---------------------------------------------------------------------------
# Job: Event Study (manual)
# ---------------------------------------------------------------------------
async def run_event_study_job() -> None:
"""Measure indicator lead time vs. historical drawdowns and cache the report.
Manual only (never auto-fires) — it does a universe-wide OHLCV scan. Triggered
from Admin → Jobs when you want to re-run the early-warning measurement.
"""
job_name = "event_study"
_log_event(logging.INFO, "job_start", job=job_name)
_runtime_start(job_name, total=1)
try:
async with async_session_factory() as db:
if not await _is_job_enabled(db, job_name):
_log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled")
_runtime_finish(job_name, "skipped", processed=0, total=1, message="Disabled")
return
report = await run_event_study_and_store(db)
_runtime_progress(job_name, processed=1, total=1)
if report.get("available"):
msg = f"{len(report.get('events', []))} events, lead Δ {report.get('lead_delta_days')}d"
else:
msg = report.get("reason", "no data")
_runtime_finish(job_name, "completed", processed=1, total=1, message=msg)
_log_event(logging.INFO, "job_complete", job=job_name, events=len(report.get("events", [])))
except Exception as exc:
_runtime_finish(job_name, "error", processed=0, total=1, message=str(exc))
_log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
# ---------------------------------------------------------------------------
# Job: Ticker Universe Sync
# ---------------------------------------------------------------------------
@@ -1127,6 +1165,12 @@ def configure_scheduler(schedule_config: dict[str, str] | None = None) -> None:
id="data_backfill", name="Data Backfill (deep history)",
replace_existing=True, next_run_time=None,
)
# Event study: manual only (universe-wide scan); triggered from Admin → Jobs.
scheduler.add_job(
run_event_study_job, "interval", weeks=520,
id="event_study", name="Event Study",
replace_existing=True, next_run_time=None,
)
_log_event(logging.INFO, "scheduler_configured", timezone=tz, daily_pipeline={
"cron": cfg["schedule_daily_pipeline_cron"],
+2
View File
@@ -520,6 +520,7 @@ VALID_JOB_NAMES = {
"alerts",
"market_regime",
"regime_monitor",
"event_study",
"backtest",
"daily_pipeline",
"intraday_pipeline",
@@ -536,6 +537,7 @@ JOB_LABELS = {
"alerts": "Alerts Dispatcher",
"market_regime": "Market Regime",
"regime_monitor": "Regime Monitor",
"event_study": "Event Study",
"backtest": "Backtest",
"daily_pipeline": "Daily Pipeline",
"intraday_pipeline": "Intraday Pipeline",
+118
View File
@@ -0,0 +1,118 @@
"""Market-breadth early-warning indicator (from the stored universe OHLCV).
Breadth is a genuinely *leading* construct: a few mega-caps can keep an index
rising while participation narrows underneath — the classic pre-top divergence.
We measure it from the OHLCV we already store for the whole universe, so it costs
no new data source.
Two layers:
- breadth = % of the universe trading above its own 200-DMA (0-100).
- divergence = an early-warning score (0-100, high = fragile): the benchmark
price rising *while* breadth falls, plus a nudge for already-low breadth.
This module only *computes* the indicator. It is deliberately NOT wired into the
live regime index yet — the event study measures whether it actually leads before
it earns any weight.
"""
from __future__ import annotations
import logging
from datetime import date
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.ticker import Ticker
from app.services.price_service import query_ohlcv
logger = logging.getLogger(__name__)
Series = list[tuple[date, float]]
def _breadth_from_closes(
closes_by_symbol: dict[str, Series], window: int = 200, min_tickers: int = 20
) -> dict[date, float]:
"""Pure core: % of symbols above their own rolling SMA(window), per date.
Each symbol's SMA is computed once with a sliding sum (O(bars)); dates with
fewer than ``min_tickers`` qualifying names are dropped (too thin to trust).
"""
counts: dict[date, list[int]] = {} # date -> [above, total]
for series in closes_by_symbol.values():
ordered = sorted(series, key=lambda x: x[0])
dates = [d for d, _ in ordered]
closes = [c for _, c in ordered]
if len(closes) < window:
continue
running = sum(closes[:window])
for i in range(window - 1, len(closes)):
if i >= window:
running += closes[i] - closes[i - window]
sma = running / window
entry = counts.setdefault(dates[i], [0, 0])
entry[1] += 1
if closes[i] > sma:
entry[0] += 1
return {
d: round(above / total * 100.0, 2)
for d, (above, total) in counts.items()
if total >= min_tickers
}
def compute_divergence_series(
breadth: dict[date, float], benchmark_closes: Series, lookback: int = 20
) -> dict[date, float]:
"""Early-warning score (0-100, high = fragile) per date.
Fragility rises when the benchmark price climbs over ``lookback`` days while
breadth deteriorates over the same window, and is nudged up when the absolute
breadth level is already low. It is the *divergence* (not the level) that
makes this leading.
"""
bench = {d: c for d, c in benchmark_closes}
common = sorted(d for d in bench if d in breadth)
out: dict[date, float] = {}
for i in range(lookback, len(common)):
d, d0 = common[i], common[i - lookback]
price_past = bench[d0]
if price_past <= 0:
continue
price_ret = (bench[d] / price_past - 1.0) * 100.0 # %
breadth_chg = breadth[d] - breadth[d0] # percentage points
raw = price_ret - breadth_chg # price up & breadth down -> large
score = 50.0 + raw * 2.0 + (50.0 - breadth[d]) * 0.4
out[d] = max(0.0, min(100.0, round(score, 2)))
return out
async def _load_universe_closes(db: AsyncSession) -> dict[str, Series]:
result = await db.execute(select(Ticker).order_by(Ticker.symbol))
closes_by_symbol: dict[str, Series] = {}
for ticker in result.scalars().all():
try:
records = await query_ohlcv(db, ticker.symbol)
except Exception:
logger.exception("Breadth: OHLCV load failed for %s", ticker.symbol)
continue
if records:
closes_by_symbol[ticker.symbol] = [(r.date, float(r.close)) for r in records]
return closes_by_symbol
async def compute_breadth_series(
db: AsyncSession, window: int = 200, min_tickers: int = 20
) -> dict[date, float]:
"""Historical breadth series across the stored universe (for the event study)."""
closes_by_symbol = await _load_universe_closes(db)
return _breadth_from_closes(closes_by_symbol, window, min_tickers)
async def compute_breadth_today(db: AsyncSession) -> float | None:
"""Latest breadth reading (thin wrapper, for future live use)."""
series = await compute_breadth_series(db)
if not series:
return None
return series[max(series)]
+294
View File
@@ -0,0 +1,294 @@
"""Event study: does a candidate indicator actually *lead* regime breaks?
This is a backtest-style measurement, but the unit of analysis is **events**
(historical drawdowns), not trades. For each candidate indicator it answers:
- how many days of warning did it give before the break (event-centered)?
- at what false-alarm cost (signal-centered precision/recall vs. the base rate)?
It compares the breadth-divergence early-warning candidate against a deterministic
**coincident** price composite (the existing regime price sub-scores), so you can
see whether the candidate crosses *earlier*. Everything is price/breadth only —
no LLM/FRED — so the result is reproducible.
Honest caveat: with only a handful of real drawdowns in ~5y, the sample is tiny
and the numbers are noisy. Read the median lead time as an order of magnitude, and
do NOT overfit thresholds to this history.
Report is cached in a SystemSetting (mirrors ``backtest_service``); a manual job
(Admin → Jobs) drives it.
"""
from __future__ import annotations
import json
import logging
from datetime import date, datetime, timedelta, timezone
from sqlalchemy.ext.asyncio import AsyncSession
from app.services import breadth_service, settings_store
from app.services import regime_monitor_service as rms
from app.services.admin_service import update_setting
logger = logging.getLogger(__name__)
KEY_REPORT = "regime_event_study"
# Defaults — admin-tunable later if needed.
EVENT_THRESHOLD_PCT = 15.0 # drawdown from the 52w high that counts as a "break"
RECOVER_PCT = 5.0 # must recover to within this of the high before a new event
DRAWDOWN_LOOKBACK = 252 # 52-week trailing high
HORIZON_DAYS = 20 # signal-centered prediction horizon
WARN_THRESHOLD = 60.0 # indicator level treated as "warning on"
PRE, POST = 60, 20 # event-centered window (trading days)
def _median(values: list[float]) -> float | None:
if not values:
return None
s = sorted(values)
n = len(s)
mid = n // 2
return float(s[mid]) if n % 2 else (s[mid - 1] + s[mid]) / 2.0
# ---------------------------------------------------------------------------
# Event detection
# ---------------------------------------------------------------------------
def detect_events(
closes: list[float],
dates: list[date],
threshold_pct: float = EVENT_THRESHOLD_PCT,
lookback: int = DRAWDOWN_LOOKBACK,
recover_pct: float = RECOVER_PCT,
) -> list[dict]:
"""Drawdown events: ``t0`` = first day the drawdown from the trailing 52w high
crosses ``threshold_pct``. De-duplicated — a new event needs a recovery back to
within ``recover_pct`` of the high first (so one decline = one event)."""
events: list[dict] = []
in_event = False
for i in range(len(closes)):
window = closes[max(0, i - lookback + 1): i + 1]
hi = max(window)
dd = (hi - closes[i]) / hi * 100.0 if hi > 0 else 0.0
if not in_event and dd >= threshold_pct:
events.append({"date": dates[i].isoformat(), "index": i, "depth_pct": round(dd, 1)})
in_event = True
elif in_event and dd <= recover_pct:
in_event = False
return events
# ---------------------------------------------------------------------------
# Event-centered: lead time + mean path
# ---------------------------------------------------------------------------
def event_centered(
indicator: dict[date, float],
events_idx: list[int],
dates: list[date],
pre: int = PRE,
post: int = POST,
threshold: float = WARN_THRESHOLD,
) -> dict:
"""Align the indicator at each event's ``t0`` and measure how early it warned.
Lead = the earliest day within ``[t0-pre, t0]`` at which the indicator first
crosses ``threshold``. Also returns the cross-event mean path.
"""
leads: list[float] = []
sums: dict[int, float] = {}
counts: dict[int, int] = {}
for t0 in events_idx:
lead: int | None = None
for k in range(0, pre + 1):
idx = t0 - k
if idx < 0:
break
v = indicator.get(dates[idx])
if v is not None and v >= threshold:
lead = k # keep going: the largest k = earliest warning in the window
if lead is not None:
leads.append(lead)
for rel in range(-pre, post + 1):
idx = t0 + rel
if 0 <= idx < len(dates):
v = indicator.get(dates[idx])
if v is not None:
sums[rel] = sums.get(rel, 0.0) + v
counts[rel] = counts.get(rel, 0) + 1
mean_path = [
{"rel_day": rel, "value": round(sums[rel] / counts[rel], 1)} for rel in sorted(sums)
]
return {
"median_lead_days": _median(leads),
"events_with_signal": len(leads),
"events_total": len(events_idx),
"mean_path": mean_path,
}
# ---------------------------------------------------------------------------
# Signal-centered: precision / recall vs. base rate
# ---------------------------------------------------------------------------
def signal_centered(
indicator: dict[date, float],
events_idx: list[int],
dates: list[date],
horizon: int = HORIZON_DAYS,
thresholds: list[float] | None = None,
) -> dict:
"""Treat ``indicator >= threshold`` as predicting a break within ``horizon``
days. Sweep thresholds → precision/recall/alarm count, plus the base rate."""
thresholds = thresholds or [50, 55, 60, 65, 70, 75, 80]
n = len(dates)
labels = [1 if any(i < e <= i + horizon for e in events_idx) else 0 for i in range(n)]
positives = sum(labels)
base_rate = positives / n if n else 0.0
rows: list[dict] = []
for th in thresholds:
tp = fp = fn = 0
for i in range(n):
v = indicator.get(dates[i])
if v is None:
continue
pred = v >= th
if pred and labels[i]:
tp += 1
elif pred and not labels[i]:
fp += 1
elif not pred and labels[i]:
fn += 1
precision = tp / (tp + fp) if (tp + fp) else None
recall = tp / (tp + fn) if (tp + fn) else None
rows.append({
"threshold": th,
"precision": round(precision, 3) if precision is not None else None,
"recall": round(recall, 3) if recall is not None else None,
"alarms": tp + fp,
})
return {"base_rate": round(base_rate, 3), "horizon_days": horizon, "rows": rows}
# ---------------------------------------------------------------------------
# Coincident baseline (deterministic price composite, reusing the regime sub-scores)
# ---------------------------------------------------------------------------
def _coincident_series(prices: dict[str, list], dates: list[date], config: dict) -> dict[date, float]:
"""Mean of the available price sub-scores (P1-P4) as-of each date — the
coincident baseline the leading candidate must beat on lead time."""
lw = float(config.get("leader_weight", 2.0))
lb = int(config.get("rs_lookback", 60))
t = config["tickers"]
smh_full = prices.get(t["leaders"][0], []) if t["leaders"] else []
qqq_full = prices.get(t["confirm"][0], []) if t["confirm"] else []
spy_full = prices.get(t["market"], [])
out: dict[date, float] = {}
for d in dates:
smh = rms._closes_asof(smh_full, d)
qqq = rms._closes_asof(qqq_full, d)
spy = rms._closes_asof(spy_full, d)
subs = [
rms.p1_trend_break(smh, qqq, lw),
rms.p2_death_cross(smh, qqq, lw),
rms.p3_drawdown(smh, qqq),
rms.p4_relative_strength(smh, spy, lb),
]
vals = [v for v in subs if v is not None]
if vals:
out[d] = round(sum(vals) / len(vals), 2)
return out
# ---------------------------------------------------------------------------
# Orchestration
# ---------------------------------------------------------------------------
async def run_event_study(
db: AsyncSession,
threshold_pct: float = EVENT_THRESHOLD_PCT,
horizon: int = HORIZON_DAYS,
warn_threshold: float = WARN_THRESHOLD,
) -> dict:
"""Run the study: detect events on the benchmark, then measure breadth-divergence
vs. the coincident price composite. Best-effort; returns available=False on no data."""
config = await rms.get_regime_config(db)
end = date.today()
start = end - timedelta(days=5 * 365 + 30)
prices = await rms._fetch_prices(config, start, end)
leader = config["tickers"]["leaders"][0] if config["tickers"]["leaders"] else "SMH"
bench = sorted(prices.get(leader, []), key=lambda x: x[0])
if len(bench) < 260:
return {"available": False, "reason": "insufficient benchmark history"}
dates = [d for d, _ in bench]
closes = [c for _, c in bench]
events = detect_events(closes, dates, threshold_pct)
events_idx = [e["index"] for e in events]
breadth = await breadth_service.compute_breadth_series(db)
divergence = breadth_service.compute_divergence_series(breadth, bench)
coincident = _coincident_series(prices, dates, config)
def _evaluate(series: dict[date, float]) -> dict:
return {
**event_centered(series, events_idx, dates, threshold=warn_threshold),
"signal": signal_centered(series, events_idx, dates, horizon),
}
indicators = {
"breadth_divergence": _evaluate(divergence),
"coincident_price": _evaluate(coincident),
}
bd = indicators["breadth_divergence"]["median_lead_days"]
cd = indicators["coincident_price"]["median_lead_days"]
lead_delta = (bd - cd) if (bd is not None and cd is not None) else None
recent_breadth = [
{"date": d.isoformat(), "breadth": breadth[d], "divergence": divergence.get(d)}
for d in dates[-90:]
if d in breadth
]
report = {
"available": True,
"generated_at": datetime.now(timezone.utc).isoformat(),
"params": {
"benchmark": leader,
"event_threshold_pct": threshold_pct,
"horizon_days": horizon,
"warn_threshold": warn_threshold,
},
"events": events,
"indicators": indicators,
"lead_delta_days": lead_delta,
"recent_breadth": recent_breadth,
}
logger.info(json.dumps({
"event": "event_study_complete", "events": len(events),
"breadth_lead": bd, "coincident_lead": cd,
}))
return report
async def run_and_store(db: AsyncSession) -> dict:
"""Run the event study and cache the report in a SystemSetting. Job entrypoint."""
report = await run_event_study(db)
await update_setting(db, KEY_REPORT, json.dumps(report))
return report
async def get_event_study_report(db: AsyncSession) -> dict | None:
"""Return the last cached event-study report, or None if never run."""
setting = await settings_store.get_setting(db, KEY_REPORT)
if setting is None:
return None
try:
return json.loads(setting.value)
except (TypeError, ValueError):
return None
+10 -1
View File
@@ -1,10 +1,19 @@
import apiClient from './client';
import type { RegimeMonitor, RegimeConfig, RegimeFundamentals } from '../lib/types';
import type {
RegimeMonitor,
RegimeConfig,
RegimeFundamentals,
EventStudyReport,
} from '../lib/types';
export function getRegimeMonitor() {
return apiClient.get<RegimeMonitor>('regime/monitor').then((r) => r.data);
}
export function getEventStudy() {
return apiClient.get<EventStudyReport | null>('regime/event-study').then((r) => r.data);
}
export function getRegimeConfig() {
return apiClient.get<RegimeConfig>('regime/config').then((r) => r.data);
}
+32
View File
@@ -311,6 +311,38 @@ export interface RegimeConfig {
fundamental_staleness_days: number;
}
// Event study — measured lead time of early-warning indicators vs. drawdowns
export interface EventStudyLeadStats {
median_lead_days: number | null;
events_with_signal: number;
events_total: number;
mean_path: { rel_day: number; value: number }[];
signal: {
base_rate: number;
horizon_days: number;
rows: { threshold: number; precision: number | null; recall: number | null; alarms: number }[];
};
}
export interface EventStudyReport {
available: boolean;
reason?: string;
generated_at?: string;
params?: {
benchmark: string;
event_threshold_pct: number;
horizon_days: number;
warn_threshold: number;
};
events?: { date: string; index: number; depth_pct: number }[];
indicators?: {
breadth_divergence: EventStudyLeadStats;
coincident_price: EventStudyLeadStats;
};
lead_delta_days?: number | null;
recent_breadth?: { date: string; breadth: number; divergence: number | null }[];
}
export interface AlertConfig {
enabled: boolean;
telegram_chat_id: string;
+100
View File
@@ -13,6 +13,7 @@ import {
getRegimeFundamentals,
updateRegimeFundamentals,
refreshRegimeFundamentals,
getEventStudy,
} from '../api/regime';
import type {
RegimeBand,
@@ -20,6 +21,8 @@ import type {
RegimeSignal,
RegimeConfig,
RegimeFundamentals,
EventStudyReport,
EventStudyLeadStats,
} from '../lib/types';
const BAND_STYLES: Record<RegimeBand, { text: string; bar: string; ring: string; label: string }> = {
@@ -266,6 +269,101 @@ function WeightsEditor({
);
}
function Sparkline({ values, color = '#60a5fa', height = 28 }: { values: number[]; color?: string; height?: number }) {
if (values.length < 2) return null;
const min = Math.min(...values);
const max = Math.max(...values);
const range = max - min || 1;
const w = 120;
const pts = values
.map((v, i) => `${(i / (values.length - 1)) * w},${height - ((v - min) / range) * height}`)
.join(' ');
return (
<svg width={w} height={height}>
<polyline points={pts} fill="none" stroke={color} strokeWidth={1.5} />
</svg>
);
}
function LeadStat({ label, stats, highlight }: { label: string; stats: EventStudyLeadStats; highlight?: boolean }) {
return (
<div className={`rounded-lg border px-3 py-2 ${highlight ? 'border-blue-400/30 bg-blue-400/[0.06]' : 'border-white/[0.06] bg-white/[0.02]'}`}>
<div className="text-xs text-gray-500">{label}</div>
<div className="mt-0.5 text-lg font-semibold text-gray-200">
{stats.median_lead_days != null ? `${stats.median_lead_days}d lead` : 'no signal'}
</div>
<div className="text-[11px] text-gray-600">
{stats.events_with_signal}/{stats.events_total} events warned
</div>
</div>
);
}
function EventStudyBody({ report }: { report: EventStudyReport }) {
const bd = report.indicators!.breadth_divergence;
const cd = report.indicators!.coincident_price;
const recent = report.recent_breadth ?? [];
const breadthVals = recent.map((r) => r.breadth);
const divVals = recent.map((r) => r.divergence ?? 0);
const lead = report.lead_delta_days;
return (
<div className="space-y-4">
<p className="text-xs text-gray-500">
{report.events?.length ?? 0} drawdown events (≥{report.params?.event_threshold_pct}%) on{' '}
{report.params?.benchmark} over ~5y. Higher median lead = earlier warning.
</p>
<div className="grid grid-cols-1 gap-2 sm:grid-cols-2">
<LeadStat label="Breadth divergence (leading candidate)" stats={bd} highlight={lead != null && lead > 0} />
<LeadStat label="Coincident price composite (baseline)" stats={cd} />
</div>
{lead != null && (
<p className="text-xs text-gray-400">
Breadth divergence warned a median{' '}
<span className={`font-medium ${lead > 0 ? 'text-emerald-400' : 'text-amber-400'}`}>
{lead > 0 ? '+' : ''}{lead} days
</span>{' '}
{lead >= 0 ? 'earlier' : 'later'} than the coincident baseline.
</p>
)}
{recent.length > 1 && (
<div className="flex flex-wrap items-end gap-6">
<div>
<div className="text-[11px] text-gray-500">Breadth (% &gt; 200d), last 90d</div>
<Sparkline values={breadthVals} color="#34d399" />
<div className="num text-xs text-gray-400">{breadthVals[breadthVals.length - 1]?.toFixed(0)}%</div>
</div>
<div>
<div className="text-[11px] text-gray-500">Divergence (fragility), last 90d</div>
<Sparkline values={divVals} color="#fb923c" />
<div className="num text-xs text-gray-400">{divVals[divVals.length - 1]?.toFixed(0)}</div>
</div>
</div>
)}
<p className="text-[11px] leading-relaxed text-gray-600">
Base rate {Math.round(bd.signal.base_rate * 100)}% · horizon {bd.signal.horizon_days}d. Few events in
5y → noisy; treat lead time as an order of magnitude and don&apos;t overfit thresholds. Not yet wired
into the live score.
</p>
</div>
);
}
function EventStudyPanel() {
const study = useQuery({ queryKey: ['regime', 'event-study'], queryFn: getEventStudy });
return (
<Disclosure summary="Early-warning study — measured lead time vs. drawdowns">
{study.isLoading && <SkeletonCard className="h-24" />}
{study.data === null && (
<Callout variant="empty">Not run yet — trigger the “Event Study” job in Admin → Jobs.</Callout>
)}
{study.data && !study.data.available && (
<Callout variant="warning">{study.data.reason ?? 'No data'}</Callout>
)}
{study.data && study.data.available && <EventStudyBody report={study.data} />}
</Disclosure>
);
}
function AdminControls() {
const qc = useQueryClient();
const fundamentals = useQuery({ queryKey: ['regime', 'fundamentals'], queryFn: getRegimeFundamentals });
@@ -348,6 +446,8 @@ export default function RegimePage() {
</>
)}
<EventStudyPanel />
{isAdmin && <AdminControls />}
</div>
);
+104
View File
@@ -0,0 +1,104 @@
"""Unit tests for the breadth indicator and the event-study measurement."""
from __future__ import annotations
from datetime import date, timedelta
from app.services.breadth_service import _breadth_from_closes, compute_divergence_series
from app.services.event_study_service import (
detect_events,
event_centered,
signal_centered,
)
def _days(n: int, start: date = date(2021, 1, 1)) -> list[date]:
return [start + timedelta(days=i) for i in range(n)]
# ---------------------------------------------------------------------------
# Event detection
# ---------------------------------------------------------------------------
def test_detect_events_single_drawdown():
closes = [100.0] * 300 + [85.0] * 5 # 15% off the trailing high -> one event
dates = _days(len(closes))
events = detect_events(closes, dates, threshold_pct=15.0)
assert len(events) == 1
assert events[0]["index"] == 300
def test_detect_events_dedup_without_recovery():
closes = [100.0] * 300 + [85.0] * 5 + [80.0] * 5 # deepens but never recovers
events = detect_events(closes, _days(len(closes)), threshold_pct=15.0)
assert len(events) == 1
def test_detect_events_two_after_recovery():
closes = [100.0] * 300 + [85.0] * 10 + [100.0] * 300 + [85.0] * 10
events = detect_events(closes, _days(len(closes)), threshold_pct=15.0)
assert len(events) == 2
# ---------------------------------------------------------------------------
# Event-centered lead time
# ---------------------------------------------------------------------------
def test_event_centered_lead_time():
dates = _days(200)
t0 = 120
# Indicator goes hot 30 days before t0 and stays hot through t0.
indicator = {dates[i]: (70.0 if t0 - 30 <= i <= t0 else 10.0) for i in range(len(dates))}
res = event_centered(indicator, [t0], dates, pre=60, post=20, threshold=60.0)
assert res["median_lead_days"] == 30
assert res["events_with_signal"] == 1
def test_breadth_divergence_leads_coincident():
dates = _days(200)
t0 = 120
breadth_ind = {dates[i]: (70.0 if t0 - 30 <= i <= t0 else 10.0) for i in range(len(dates))}
coincident = {dates[i]: (70.0 if t0 - 2 <= i <= t0 else 10.0) for i in range(len(dates))}
bd = event_centered(breadth_ind, [t0], dates, threshold=60.0)
cd = event_centered(coincident, [t0], dates, threshold=60.0)
assert bd["median_lead_days"] > cd["median_lead_days"]
# ---------------------------------------------------------------------------
# Signal-centered precision / recall
# ---------------------------------------------------------------------------
def test_signal_centered_base_rate_and_recall():
dates = _days(200)
t0 = 120
indicator = {dates[i]: (70.0 if t0 - 30 <= i <= t0 else 10.0) for i in range(len(dates))}
res = signal_centered(indicator, [t0], dates, horizon=20)
assert 0.0 < res["base_rate"] < 1.0
# An aligned indicator should catch some of the pre-event window at a mid threshold.
row60 = next(r for r in res["rows"] if r["threshold"] == 60)
assert row60["recall"] is not None and row60["recall"] > 0
# ---------------------------------------------------------------------------
# Breadth aggregation + divergence
# ---------------------------------------------------------------------------
def test_breadth_from_closes_fraction_above_sma():
dates = _days(5)
closes_by_symbol = {
"A": list(zip(dates, [1.0, 2.0, 3.0, 4.0, 5.0])), # rising -> above its SMA
"B": list(zip(dates, [5.0, 4.0, 3.0, 2.0, 1.0])), # falling -> below
"C": list(zip(dates, [3.0, 3.0, 3.0, 3.0, 3.0])), # flat -> not strictly above
}
breadth = _breadth_from_closes(closes_by_symbol, window=3, min_tickers=2)
# At d2: SMA(3) over each -> only A is strictly above -> 1/3.
assert breadth[dates[2]] == round(1 / 3 * 100, 2)
def test_divergence_high_when_price_up_breadth_down():
dates = _days(10)
breadth = {dates[i]: 80.0 - i * 3 for i in range(len(dates))} # falling breadth
benchmark = list(zip(dates, [100.0 + i for i in range(len(dates))])) # rising price
div = compute_divergence_series(breadth, benchmark, lookback=3)
last = div[dates[-1]]
assert last > 50.0 # fragile: price up while breadth deteriorates
+2
View File
@@ -88,6 +88,7 @@ class TestConfigureScheduler:
"alerts",
"market_regime",
"regime_monitor",
"event_study",
"backtest",
"daily_pipeline",
"intraday_pipeline",
@@ -109,6 +110,7 @@ class TestConfigureScheduler:
"fundamental_collector",
"market_regime",
"regime_monitor",
"event_study",
"outcome_evaluator",
"rr_scanner",
"sentiment_collector",