feat: sharpen the event study — more events, fair baseline, per-event view
Deploy / lint (push) Successful in 6s
Deploy / test (push) Successful in 41s
Deploy / deploy (push) Successful in 26s

The first run gave only 2 events (N=2 is anecdote, not evidence) and an unfairly
weak coincident baseline, so the +42d lead couldn't be trusted. This makes the
measurement meaningful:

- More, cleaner events: default drawdown threshold 15%→10%, and dedup switched
  from "recover to the high" to a rising-edge + cooldown (40d), so distinct
  drawdowns each register instead of merging.
- Fair comparison: each indicator now warns at its OWN 80th percentile instead of
  a shared absolute 60, removing the artifact that muted the coincident baseline.
- Per-event breakdown (date · depth · breadth lead · coincident lead) so a median
  over a tiny sample can't hide an apples-to-oranges comparison — you see whether
  both warned on the same drawdown.
- Surface precision/recall (best row) + base rate per indicator — the honest edge
  read, not just lead time.

Re-run the Event Study job to regenerate the cached report in the new shape.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-26 14:54:29 +02:00
parent f8d62e4074
commit 7c5fb1138d
4 changed files with 173 additions and 35 deletions
+76 -33
View File
@@ -34,12 +34,15 @@ logger = logging.getLogger(__name__)
KEY_REPORT = "regime_event_study"
# Defaults — admin-tunable later if needed.
EVENT_THRESHOLD_PCT = 15.0 # drawdown from the 52w high that counts as a "break"
RECOVER_PCT = 5.0 # must recover to within this of the high before a new event
# Defaults. The 15% threshold gave only 2 events in 5y (statistically useless),
# so the default is lower with a cooldown-based dedup to surface more, cleaner
# events. Each indicator "warns" at its OWN 80th percentile rather than a shared
# absolute level, so the leading vs. coincident comparison is fair across scales.
EVENT_THRESHOLD_PCT = 10.0 # drawdown from the 52w high that counts as a "break"
COOLDOWN_DAYS = 40 # min trading days between event onsets (dedup)
DRAWDOWN_LOOKBACK = 252 # 52-week trailing high
HORIZON_DAYS = 20 # signal-centered prediction horizon
WARN_THRESHOLD = 60.0 # indicator level treated as "warning on"
WARN_PERCENTILE = 80.0 # each indicator warns at its own Nth percentile
PRE, POST = 60, 20 # event-centered window (trading days)
@@ -52,6 +55,17 @@ def _median(values: list[float]) -> float | None:
return float(s[mid]) if n % 2 else (s[mid - 1] + s[mid]) / 2.0
def _percentile(values: list[float], pct: float) -> float | None:
"""Linear-interpolated percentile of the non-None values."""
vals = sorted(v for v in values if v is not None)
if not vals:
return None
k = (len(vals) - 1) * (pct / 100.0)
lo = int(k)
hi = min(lo + 1, len(vals) - 1)
return vals[lo] + (vals[hi] - vals[lo]) * (k - lo)
# ---------------------------------------------------------------------------
# Event detection
# ---------------------------------------------------------------------------
@@ -61,22 +75,23 @@ def detect_events(
dates: list[date],
threshold_pct: float = EVENT_THRESHOLD_PCT,
lookback: int = DRAWDOWN_LOOKBACK,
recover_pct: float = RECOVER_PCT,
cooldown: int = COOLDOWN_DAYS,
) -> list[dict]:
"""Drawdown events: ``t0`` = first day the drawdown from the trailing 52w high
crosses ``threshold_pct``. De-duplicated — a new event needs a recovery back to
within ``recover_pct`` of the high first (so one decline = one event)."""
"""Drawdown events: ``t0`` = a day the drawdown from the trailing 52w high
crosses up through ``threshold_pct`` (rising edge). De-duplicated by a
``cooldown`` of trading days, so a continuous decline counts once but distinct
drawdowns separated by a recovery each register."""
events: list[dict] = []
in_event = False
prev_dd = 0.0
last_event = -10**9
for i in range(len(closes)):
window = closes[max(0, i - lookback + 1): i + 1]
hi = max(window)
dd = (hi - closes[i]) / hi * 100.0 if hi > 0 else 0.0
if not in_event and dd >= threshold_pct:
if dd >= threshold_pct and prev_dd < threshold_pct and (i - last_event) >= cooldown:
events.append({"date": dates[i].isoformat(), "index": i, "depth_pct": round(dd, 1)})
in_event = True
elif in_event and dd <= recover_pct:
in_event = False
last_event = i
prev_dd = dd
return events
@@ -84,31 +99,38 @@ def detect_events(
# Event-centered: lead time + mean path
# ---------------------------------------------------------------------------
def _lead(indicator: dict[date, float], t0: int, dates: list[date], pre: int, threshold: float) -> int | None:
"""Earliest day within ``[t0-pre, t0]`` at which the indicator crosses
``threshold`` — i.e. how many days of warning before the event, or None."""
lead: int | None = None
for k in range(0, pre + 1):
idx = t0 - k
if idx < 0:
break
v = indicator.get(dates[idx])
if v is not None and v >= threshold:
lead = k # keep going: the largest k = earliest warning in the window
return lead
def event_centered(
indicator: dict[date, float],
events_idx: list[int],
dates: list[date],
pre: int = PRE,
post: int = POST,
threshold: float = WARN_THRESHOLD,
threshold: float = 60.0,
) -> dict:
"""Align the indicator at each event's ``t0`` and measure how early it warned.
Lead = the earliest day within ``[t0-pre, t0]`` at which the indicator first
crosses ``threshold``. Also returns the cross-event mean path.
Lead time is measured against ``threshold`` (each indicator gets its own,
derived from its distribution). Also returns the cross-event mean path.
"""
leads: list[float] = []
sums: dict[int, float] = {}
counts: dict[int, int] = {}
for t0 in events_idx:
lead: int | None = None
for k in range(0, pre + 1):
idx = t0 - k
if idx < 0:
break
v = indicator.get(dates[idx])
if v is not None and v >= threshold:
lead = k # keep going: the largest k = earliest warning in the window
lead = _lead(indicator, t0, dates, pre, threshold)
if lead is not None:
leads.append(lead)
for rel in range(-pre, post + 1):
@@ -125,6 +147,7 @@ def event_centered(
"median_lead_days": _median(leads),
"events_with_signal": len(leads),
"events_total": len(events_idx),
"warn_threshold": round(threshold, 1),
"mean_path": mean_path,
}
@@ -211,7 +234,8 @@ async def run_event_study(
db: AsyncSession,
threshold_pct: float = EVENT_THRESHOLD_PCT,
horizon: int = HORIZON_DAYS,
warn_threshold: float = WARN_THRESHOLD,
cooldown: int = COOLDOWN_DAYS,
warn_percentile: float = WARN_PERCENTILE,
) -> dict:
"""Run the study: detect events on the benchmark, then measure breadth-divergence
vs. the coincident price composite. Best-effort; returns available=False on no data."""
@@ -227,23 +251,40 @@ async def run_event_study(
dates = [d for d, _ in bench]
closes = [c for _, c in bench]
events = detect_events(closes, dates, threshold_pct)
events = detect_events(closes, dates, threshold_pct, cooldown=cooldown)
events_idx = [e["index"] for e in events]
breadth = await breadth_service.compute_breadth_series(db)
divergence = breadth_service.compute_divergence_series(breadth, bench)
coincident = _coincident_series(prices, dates, config)
def _evaluate(series: dict[date, float]) -> dict:
# Each indicator warns at its OWN distribution's percentile, so a leading
# indicator isn't penalised for living on a different scale than the baseline.
warn = {
"breadth_divergence": _percentile(list(divergence.values()), warn_percentile) or 60.0,
"coincident_price": _percentile(list(coincident.values()), warn_percentile) or 60.0,
}
series_by_key = {"breadth_divergence": divergence, "coincident_price": coincident}
def _evaluate(series: dict[date, float], threshold: float) -> dict:
return {
**event_centered(series, events_idx, dates, threshold=warn_threshold),
**event_centered(series, events_idx, dates, threshold=threshold),
"signal": signal_centered(series, events_idx, dates, horizon),
}
indicators = {
"breadth_divergence": _evaluate(divergence),
"coincident_price": _evaluate(coincident),
}
indicators = {key: _evaluate(series_by_key[key], warn[key]) for key in series_by_key}
# Per-event comparison: which event, and each indicator's lead on THAT event —
# so a median over a tiny sample can't hide an apples-to-oranges comparison.
per_event = [
{
"date": e["date"],
"depth_pct": e["depth_pct"],
"breadth_lead": _lead(divergence, e["index"], dates, PRE, warn["breadth_divergence"]),
"coincident_lead": _lead(coincident, e["index"], dates, PRE, warn["coincident_price"]),
}
for e in events
]
bd = indicators["breadth_divergence"]["median_lead_days"]
cd = indicators["coincident_price"]["median_lead_days"]
@@ -261,11 +302,13 @@ async def run_event_study(
"params": {
"benchmark": leader,
"event_threshold_pct": threshold_pct,
"cooldown_days": cooldown,
"horizon_days": horizon,
"warn_threshold": warn_threshold,
"warn_percentile": warn_percentile,
},
"events": events,
"indicators": indicators,
"per_event": per_event,
"lead_delta_days": lead_delta,
"recent_breadth": recent_breadth,
}