feat: separate live early-warning + combined score on the regime tab
Deploy / lint (push) Successful in 7s
Deploy / test (push) Successful in 40s
Deploy / deploy (push) Successful in 24s

The event study showed the breadth-divergence signal genuinely leads (warned
before 7/11 drawdowns, ~6 weeks median, where the coincident baseline almost
never did). Surface it live to observe before deciding how to embed it — kept
separate from the index, not folded into its weights.

- regime_monitor daily job now computes breadth-divergence live and attaches a
  separate early_warning score plus a combined blend (weighted mean, default
  0.6/0.4, configurable via combined_weights) to each snapshot, including the
  backfill so the 7/30-day trends populate immediately. Stored in breakdown_json
  — no schema change. Best-effort: a breadth failure can't break the index.
- get_regime_monitor returns the index, early_warning, and combined scores each
  with 7/30-day deltas.
- Regime tab shows three gauges (generalized ScoreGauge): coincident index,
  early warning, and a compact combined blend. Stale snapshots render "—".

Note: the daily regime job now also does a universe-wide breadth scan.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-26 15:23:37 +02:00
parent 7c5fb1138d
commit 613fc756ec
4 changed files with 205 additions and 55 deletions
+70 -10
View File
@@ -38,7 +38,7 @@ from app.config import settings
from app.exceptions import ProviderError
from app.models.regime_snapshot import RegimeSnapshot
from app.providers.alpaca import AlpacaOHLCVProvider
from app.services import settings_store
from app.services import breadth_service, settings_store
from app.services.admin_service import update_setting
from app.services.sentiment_provider_service import _resolve as resolve_llm_config
@@ -65,6 +65,11 @@ DEFAULT_CONFIG: dict = {
"F1": 25, "F2": 15, "F3": 8, "F4": 7,
},
"alert_threshold": 65,
# Observational early-warning blend: a small Combined score = weighted mean of
# the coincident index and the breadth-divergence early-warning score. Kept
# separate from the index weights above so the early-warning side stays
# decoupled until proven. Tunable; need not sum to 1 (normalised).
"combined_weights": {"coincident": 0.6, "early_warning": 0.4},
"leader_weight": 2.0, # SMH counts 2x vs QQQ where both feed a signal
"rs_lookback": 60, # trading days for relative-strength / breadth trend
"fundamental_staleness_days": 80,
@@ -530,6 +535,18 @@ async def update_regime_monitor(db: AsyncSession, backfill_days: int = 90) -> di
leader_series = prices.get(leader or "", [])
latest_date = leader_series[-1][0] if leader_series else end
# Early-warning signal: breadth-divergence over the stored universe (leads but
# noisy). Computed once here so the daily job carries it live, as a SEPARATE
# score next to the coincident index — not folded into the index weights.
# Best-effort: a breadth failure must not stop the index update.
try:
breadth = await breadth_service.compute_breadth_series(db)
divergence = breadth_service.compute_divergence_series(breadth, sorted(leader_series))
except Exception as exc:
logger.warning("Regime monitor: breadth/divergence skipped: %s", exc)
divergence = {}
cw = config.get("combined_weights") or {"coincident": 0.6, "early_warning": 0.4}
dates = {latest_date}
if await _snapshot_count(db) < 5 and leader_series:
cutoff = end - timedelta(days=backfill_days)
@@ -538,6 +555,7 @@ async def update_regime_monitor(db: AsyncSession, backfill_days: int = 90) -> di
latest_result: dict | None = None
for d in sorted(dates):
result = _compute_index(prices, vix_series, oas_series, overrides, config, d)
_attach_early_warning(result, divergence.get(d), cw)
await _upsert_snapshot(db, result)
latest_result = result
await db.commit()
@@ -551,19 +569,51 @@ async def update_regime_monitor(db: AsyncSession, backfill_days: int = 90) -> di
return latest_result or {"available": False, "reason": "no data"}
async def _score_at_or_before(db: AsyncSession, target: date) -> float | None:
def _attach_early_warning(result: dict, ew: float | None, weights: dict) -> None:
"""Attach the separate early-warning score and a combined blend to a snapshot.
``ew`` is the breadth-divergence value as-of this date (or None). The combined
score is a normalised weighted mean of the coincident index and the early
warning — observational, kept apart from the index itself.
"""
result["early_warning"] = {
"score": round(ew, 1) if ew is not None else None,
"band": band_for(ew) if ew is not None else None,
}
if ew is None:
combined = result["total_score"]
else:
wc = float(weights.get("coincident", 0.6))
we = float(weights.get("early_warning", 0.4))
wsum = (wc + we) or 1.0
combined = (result["total_score"] * wc + ew * we) / wsum
result["combined"] = {"score": round(combined, 1), "band": band_for(combined)}
async def _result_at_or_before(db: AsyncSession, target: date) -> dict | None:
"""Parsed snapshot result for the latest date on/before ``target``."""
res = await db.execute(
select(RegimeSnapshot.total_score)
select(RegimeSnapshot.breakdown_json)
.where(RegimeSnapshot.date <= target)
.order_by(RegimeSnapshot.date.desc())
.limit(1)
)
val = res.scalar_one_or_none()
return float(val) if val is not None else None
raw = res.scalar_one_or_none()
if raw is None:
return None
try:
return json.loads(raw)
except (TypeError, ValueError):
return None
def _delta(curr: float | None, prev: float | None) -> float | None:
return round(curr - prev, 1) if (curr is not None and prev is not None) else None
async def get_regime_monitor(db: AsyncSession) -> dict:
"""Latest snapshot result + 7/30-day trend deltas. Cheap (one+ row reads)."""
"""Latest snapshot + 7/30-day trend deltas for the index, early-warning, and
combined scores. Cheap (a few row reads)."""
res = await db.execute(
select(RegimeSnapshot).order_by(RegimeSnapshot.date.desc()).limit(1)
)
@@ -577,13 +627,23 @@ async def get_regime_monitor(db: AsyncSession) -> dict:
result = {"date": latest.date.isoformat(), "total_score": latest.total_score,
"band": latest.band, "breakdown": []}
score_7 = await _score_at_or_before(db, latest.date - timedelta(days=7))
score_30 = await _score_at_or_before(db, latest.date - timedelta(days=30))
r7 = await _result_at_or_before(db, latest.date - timedelta(days=7))
r30 = await _result_at_or_before(db, latest.date - timedelta(days=30))
def _nested(r: dict | None, key: str) -> float | None:
return (r.get(key) or {}).get("score") if r else None
result["available"] = True
cur_total = result.get("total_score")
result["trend"] = {
"delta_7": round(latest.total_score - score_7, 1) if score_7 is not None else None,
"delta_30": round(latest.total_score - score_30, 1) if score_30 is not None else None,
"delta_7": _delta(cur_total, (r7 or {}).get("total_score")),
"delta_30": _delta(cur_total, (r30 or {}).get("total_score")),
}
for key in ("early_warning", "combined"):
block = result.get(key) or {"score": None, "band": None}
block["delta_7"] = _delta(block.get("score"), _nested(r7, key))
block["delta_30"] = _delta(block.get("score"), _nested(r30, key))
result[key] = block
return result