Files
signal-platform/app/services/event_study_service.py
T
dennisthiessen 7c5fb1138d
Deploy / lint (push) Successful in 6s
Deploy / test (push) Successful in 41s
Deploy / deploy (push) Successful in 26s
feat: sharpen the event study — more events, fair baseline, per-event view
The first run gave only 2 events (N=2 is anecdote, not evidence) and an unfairly
weak coincident baseline, so the +42d lead couldn't be trusted. This makes the
measurement meaningful:

- More, cleaner events: default drawdown threshold 15%→10%, and dedup switched
  from "recover to the high" to a rising-edge + cooldown (40d), so distinct
  drawdowns each register instead of merging.
- Fair comparison: each indicator now warns at its OWN 80th percentile instead of
  a shared absolute 60, removing the artifact that muted the coincident baseline.
- Per-event breakdown (date · depth · breadth lead · coincident lead) so a median
  over a tiny sample can't hide an apples-to-oranges comparison — you see whether
  both warned on the same drawdown.
- Surface precision/recall (best row) + base rate per indicator — the honest edge
  read, not just lead time.

Re-run the Event Study job to regenerate the cached report in the new shape.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-26 14:54:29 +02:00

338 lines
13 KiB
Python

"""Event study: does a candidate indicator actually *lead* regime breaks?
This is a backtest-style measurement, but the unit of analysis is **events**
(historical drawdowns), not trades. For each candidate indicator it answers:
- how many days of warning did it give before the break (event-centered)?
- at what false-alarm cost (signal-centered precision/recall vs. the base rate)?
It compares the breadth-divergence early-warning candidate against a deterministic
**coincident** price composite (the existing regime price sub-scores), so you can
see whether the candidate crosses *earlier*. Everything is price/breadth only —
no LLM/FRED — so the result is reproducible.
Honest caveat: with only a handful of real drawdowns in ~5y, the sample is tiny
and the numbers are noisy. Read the median lead time as an order of magnitude, and
do NOT overfit thresholds to this history.
Report is cached in a SystemSetting (mirrors ``backtest_service``); a manual job
(Admin → Jobs) drives it.
"""
from __future__ import annotations
import json
import logging
from datetime import date, datetime, timedelta, timezone
from sqlalchemy.ext.asyncio import AsyncSession
from app.services import breadth_service, settings_store
from app.services import regime_monitor_service as rms
from app.services.admin_service import update_setting
logger = logging.getLogger(__name__)
# SystemSetting key under which the cached event-study report is stored and read.
KEY_REPORT = "regime_event_study"

# Defaults. The 15% threshold gave only 2 events in 5y (statistically useless),
# so the default is lower with a cooldown-based dedup to surface more, cleaner
# events. Each indicator "warns" at its OWN 80th percentile rather than a shared
# absolute level, so the leading vs. coincident comparison is fair across scales.
EVENT_THRESHOLD_PCT = 10.0  # drawdown (in %) from the 52w high that counts as a "break"
COOLDOWN_DAYS = 40  # min trading days (index positions) between event onsets (dedup)
DRAWDOWN_LOOKBACK = 252  # 52-week trailing high, in trading days
HORIZON_DAYS = 20  # signal-centered prediction horizon (trading days ahead)
WARN_PERCENTILE = 80.0  # each indicator warns at its own Nth percentile
PRE, POST = 60, 20  # event-centered window: trading days before / after t0
def _median(values: list[float]) -> float | None:
if not values:
return None
s = sorted(values)
n = len(s)
mid = n // 2
return float(s[mid]) if n % 2 else (s[mid - 1] + s[mid]) / 2.0
def _percentile(values: list[float], pct: float) -> float | None:
"""Linear-interpolated percentile of the non-None values."""
vals = sorted(v for v in values if v is not None)
if not vals:
return None
k = (len(vals) - 1) * (pct / 100.0)
lo = int(k)
hi = min(lo + 1, len(vals) - 1)
return vals[lo] + (vals[hi] - vals[lo]) * (k - lo)
# ---------------------------------------------------------------------------
# Event detection
# ---------------------------------------------------------------------------
def detect_events(
    closes: list[float],
    dates: list[date],
    threshold_pct: float = EVENT_THRESHOLD_PCT,
    lookback: int = DRAWDOWN_LOOKBACK,
    cooldown: int = COOLDOWN_DAYS,
) -> list[dict]:
    """Find drawdown events on a close series.

    An event's ``t0`` is a day on which the drawdown from the trailing
    ``lookback``-day high moves from below ``threshold_pct`` to at or above it
    (a rising edge). An edge is only recorded when at least ``cooldown`` index
    positions have passed since the previously recorded event, so a continuous
    decline counts once while drawdowns separated by a recovery each register.

    Returns a list of ``{"date", "index", "depth_pct"}`` dicts in series order,
    where ``date`` is ISO-formatted and ``depth_pct`` is rounded to one decimal.
    """
    found: list[dict] = []
    previous_dd = 0.0
    last_onset = -10**9  # far in the past, so the first edge always clears the cooldown
    for pos, close in enumerate(closes):
        window_start = max(0, pos - lookback + 1)
        peak = max(closes[window_start: pos + 1])
        drawdown = (peak - close) / peak * 100.0 if peak > 0 else 0.0

        crossed_up = previous_dd < threshold_pct <= drawdown
        cooled_off = (pos - last_onset) >= cooldown
        if crossed_up and cooled_off:
            found.append(
                {"date": dates[pos].isoformat(), "index": pos, "depth_pct": round(drawdown, 1)}
            )
            last_onset = pos
        previous_dd = drawdown
    return found
# ---------------------------------------------------------------------------
# Event-centered: lead time + mean path
# ---------------------------------------------------------------------------
def _lead(indicator: dict[date, float], t0: int, dates: list[date], pre: int, threshold: float) -> int | None:
"""Earliest day within ``[t0-pre, t0]`` at which the indicator crosses
``threshold`` — i.e. how many days of warning before the event, or None."""
lead: int | None = None
for k in range(0, pre + 1):
idx = t0 - k
if idx < 0:
break
v = indicator.get(dates[idx])
if v is not None and v >= threshold:
lead = k # keep going: the largest k = earliest warning in the window
return lead
def event_centered(
    indicator: dict[date, float],
    events_idx: list[int],
    dates: list[date],
    pre: int = PRE,
    post: int = POST,
    threshold: float = 60.0,
) -> dict:
    """Align the indicator on each event onset and summarise how early it warned.

    For every ``t0`` in ``events_idx`` the lead time is measured against
    ``threshold`` over the ``pre`` days before the event, and the indicator's
    values over ``[t0-pre, t0+post]`` are accumulated per relative day.

    Returns a dict with the median lead over events that had a signal, the
    number of such events, the total number of events, the threshold used
    (rounded to one decimal) and the cross-event mean path as a list of
    ``{"rel_day", "value"}`` rows sorted by relative day.
    """
    n_dates = len(dates)
    offsets = range(-pre, post + 1)
    leads: list[float] = []
    totals: dict[int, float] = {}
    samples: dict[int, int] = {}

    for onset in events_idx:
        warned = _lead(indicator, onset, dates, pre, threshold)
        if warned is not None:
            leads.append(warned)

        for offset in offsets:
            pos = onset + offset
            if pos < 0 or pos >= n_dates:
                continue  # window runs off either end of the history
            reading = indicator.get(dates[pos])
            if reading is None:
                continue
            totals[offset] = totals.get(offset, 0.0) + reading
            samples[offset] = samples.get(offset, 0) + 1

    mean_path = []
    for offset in sorted(totals):
        mean_path.append({"rel_day": offset, "value": round(totals[offset] / samples[offset], 1)})

    return {
        "median_lead_days": _median(leads),
        "events_with_signal": len(leads),
        "events_total": len(events_idx),
        "warn_threshold": round(threshold, 1),
        "mean_path": mean_path,
    }
# ---------------------------------------------------------------------------
# Signal-centered: precision / recall vs. base rate
# ---------------------------------------------------------------------------
def signal_centered(
    indicator: dict[date, float],
    events_idx: list[int],
    dates: list[date],
    horizon: int = HORIZON_DAYS,
    thresholds: list[float] | None = None,
) -> dict:
    """Score ``indicator >= threshold`` as a prediction of a break within ``horizon`` days.

    A day ``i`` is labelled positive when some event index ``e`` satisfies
    ``i < e <= i + horizon``. Each threshold in the sweep gets one row with
    precision, recall (both rounded to three decimals, None when undefined)
    and the number of alarms raised. The result also carries the base rate of
    positive days and the horizon used.

    Note: the base rate is taken over every date, whereas precision/recall
    only count dates on which the indicator has a value.
    """
    if not thresholds:
        thresholds = [50, 55, 60, 65, 70, 75, 80]

    n = len(dates)
    labels = []
    for i in range(n):
        upcoming = any(i < e <= i + horizon for e in events_idx)
        labels.append(1 if upcoming else 0)
    base_rate = sum(labels) / n if n else 0.0

    # Pair each available reading with its label once; reused for every threshold.
    observed = []
    for i in range(n):
        reading = indicator.get(dates[i])
        if reading is not None:
            observed.append((reading, labels[i]))

    rows: list[dict] = []
    for th in thresholds:
        tp = fp = fn = 0
        for reading, is_positive in observed:
            fired = reading >= th
            if fired and is_positive:
                tp += 1
            elif fired:
                fp += 1
            elif is_positive:
                fn += 1

        alarms = tp + fp
        actual = tp + fn
        precision = tp / alarms if alarms else None
        recall = tp / actual if actual else None
        rows.append({
            "threshold": th,
            "precision": None if precision is None else round(precision, 3),
            "recall": None if recall is None else round(recall, 3),
            "alarms": alarms,
        })

    return {"base_rate": round(base_rate, 3), "horizon_days": horizon, "rows": rows}
# ---------------------------------------------------------------------------
# Coincident baseline (deterministic price composite, reusing the regime sub-scores)
# ---------------------------------------------------------------------------
def _coincident_series(prices: dict[str, list], dates: list[date], config: dict) -> dict[date, float]:
    """Build the coincident baseline: the mean of the available price
    sub-scores (P1-P4) as-of each date.

    The first configured leader and confirm tickers plus the market ticker are
    looked up in ``prices``; a missing ticker contributes an empty history.
    Dates on which every sub-score is None are left out of the result. Values
    are rounded to two decimals.
    """
    leader_weight = float(config.get("leader_weight", 2.0))
    rs_lookback = int(config.get("rs_lookback", 60))
    tickers = config["tickers"]

    leader_hist = prices.get(tickers["leaders"][0], []) if tickers["leaders"] else []
    confirm_hist = prices.get(tickers["confirm"][0], []) if tickers["confirm"] else []
    market_hist = prices.get(tickers["market"], [])

    series: dict[date, float] = {}
    for day in dates:
        leader_px = rms._closes_asof(leader_hist, day)
        confirm_px = rms._closes_asof(confirm_hist, day)
        market_px = rms._closes_asof(market_hist, day)

        scores = (
            rms.p1_trend_break(leader_px, confirm_px, leader_weight),
            rms.p2_death_cross(leader_px, confirm_px, leader_weight),
            rms.p3_drawdown(leader_px, confirm_px),
            rms.p4_relative_strength(leader_px, market_px, rs_lookback),
        )
        present = [score for score in scores if score is not None]
        if not present:
            continue
        series[day] = round(sum(present) / len(present), 2)
    return series
# ---------------------------------------------------------------------------
# Orchestration
# ---------------------------------------------------------------------------
async def run_event_study(
    db: AsyncSession,
    threshold_pct: float = EVENT_THRESHOLD_PCT,
    horizon: int = HORIZON_DAYS,
    cooldown: int = COOLDOWN_DAYS,
    warn_percentile: float = WARN_PERCENTILE,
) -> dict:
    """Run the study: detect events on the benchmark, then measure breadth-divergence
    vs. the coincident price composite.

    Args:
        db: session used to load the regime config and the breadth series.
        threshold_pct: drawdown (in %) that defines an event.
        horizon: look-ahead, in trading days, for the signal-centered scoring.
        cooldown: minimum index distance between two recorded events.
        warn_percentile: percentile of each indicator's own distribution used
            as that indicator's warning threshold.

    Returns:
        The report dict (``available=True``) with params, events, per-indicator
        results, the per-event lead comparison, the lead delta and the recent
        breadth rows; or ``{"available": False, "reason": ...}`` when the
        benchmark has fewer than 260 price points.
    """
    config = await rms.get_regime_config(db)
    end = date.today()
    start = end - timedelta(days=5 * 365 + 30)
    prices = await rms._fetch_prices(config, start, end)

    # Benchmark = first configured leader (falls back to "SMH"), as (date, close)
    # pairs sorted by date.
    leader = config["tickers"]["leaders"][0] if config["tickers"]["leaders"] else "SMH"
    bench = sorted(prices.get(leader, []), key=lambda x: x[0])
    if len(bench) < 260:
        return {"available": False, "reason": "insufficient benchmark history"}
    dates = [d for d, _ in bench]
    closes = [c for _, c in bench]

    events = detect_events(closes, dates, threshold_pct, cooldown=cooldown)
    events_idx = [e["index"] for e in events]

    breadth = await breadth_service.compute_breadth_series(db)
    divergence = breadth_service.compute_divergence_series(breadth, bench)
    coincident = _coincident_series(prices, dates, config)

    def _warn_threshold(series: dict[date, float]) -> float:
        """Own-distribution warning level; 60.0 only when the series is empty."""
        level = _percentile(list(series.values()), warn_percentile)
        # Explicit None check: a percentile of exactly 0.0 is a valid threshold
        # and must not be replaced by the absolute fallback.
        return 60.0 if level is None else level

    # Each indicator warns at its OWN distribution's percentile, so a leading
    # indicator isn't penalised for living on a different scale than the baseline.
    warn = {
        "breadth_divergence": _warn_threshold(divergence),
        "coincident_price": _warn_threshold(coincident),
    }
    series_by_key = {"breadth_divergence": divergence, "coincident_price": coincident}

    def _evaluate(series: dict[date, float], threshold: float) -> dict:
        """Event-centered summary plus the signal-centered sweep for one indicator."""
        return {
            **event_centered(series, events_idx, dates, threshold=threshold),
            "signal": signal_centered(series, events_idx, dates, horizon),
        }

    indicators = {key: _evaluate(series_by_key[key], warn[key]) for key in series_by_key}

    # Per-event comparison: which event, and each indicator's lead on THAT event —
    # so a median over a tiny sample can't hide an apples-to-oranges comparison.
    per_event = [
        {
            "date": e["date"],
            "depth_pct": e["depth_pct"],
            "breadth_lead": _lead(divergence, e["index"], dates, PRE, warn["breadth_divergence"]),
            "coincident_lead": _lead(coincident, e["index"], dates, PRE, warn["coincident_price"]),
        }
        for e in events
    ]

    bd = indicators["breadth_divergence"]["median_lead_days"]
    cd = indicators["coincident_price"]["median_lead_days"]
    # Positive delta = the breadth candidate warned earlier than the baseline.
    lead_delta = (bd - cd) if (bd is not None and cd is not None) else None

    # Last 90 benchmark dates that also have a breadth reading.
    recent_breadth = [
        {"date": d.isoformat(), "breadth": breadth[d], "divergence": divergence.get(d)}
        for d in dates[-90:]
        if d in breadth
    ]

    report = {
        "available": True,
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "params": {
            "benchmark": leader,
            "event_threshold_pct": threshold_pct,
            "cooldown_days": cooldown,
            "horizon_days": horizon,
            "warn_percentile": warn_percentile,
        },
        "events": events,
        "indicators": indicators,
        "per_event": per_event,
        "lead_delta_days": lead_delta,
        "recent_breadth": recent_breadth,
    }
    logger.info(json.dumps({
        "event": "event_study_complete", "events": len(events),
        "breadth_lead": bd, "coincident_lead": cd,
    }))
    return report
async def run_and_store(db: AsyncSession) -> dict:
    """Job entrypoint: run the event study, persist the report as JSON under
    ``KEY_REPORT`` via ``update_setting``, and hand the report back."""
    report = await run_event_study(db)
    serialized = json.dumps(report)
    await update_setting(db, KEY_REPORT, serialized)
    return report
async def get_event_study_report(db: AsyncSession) -> dict | None:
    """Load the most recently cached event-study report.

    Returns None when no setting is stored under ``KEY_REPORT`` or when the
    stored value cannot be decoded as JSON.
    """
    stored = await settings_store.get_setting(db, KEY_REPORT)
    if stored is None:
        return None
    try:
        report = json.loads(stored.value)
    except (TypeError, ValueError):
        # Unreadable cache is treated the same as "never run".
        return None
    return report