fix: populate early-warning/combined on the latest snapshot + recent history
The early-warning score showed n/a because it required an exact date match between the live benchmark (Alpaca, may have today's bar) and the stored universe breadth (DB, often a day behind), which blanked the newest snapshot — the one the UI displays. - Look up the divergence as-of the snapshot date (newest value within a 7-day lag) instead of requiring an exact match. - Backfill early_warning + combined onto recent existing snapshots (the index history predates this signal) so the 7/30-day trends populate on the first run rather than only filling in over the coming weeks. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -545,6 +545,9 @@ async def update_regime_monitor(db: AsyncSession, backfill_days: int = 90) -> di
|
|||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.warning("Regime monitor: breadth/divergence skipped: %s", exc)
|
logger.warning("Regime monitor: breadth/divergence skipped: %s", exc)
|
||||||
divergence = {}
|
divergence = {}
|
||||||
|
# As-of lookup: the stored universe (breadth) can lag the live benchmark date
|
||||||
|
# by a day or two, so an exact-date match would blank the newest snapshot.
|
||||||
|
div_items = sorted(divergence.items())
|
||||||
cw = config.get("combined_weights") or {"coincident": 0.6, "early_warning": 0.4}
|
cw = config.get("combined_weights") or {"coincident": 0.6, "early_warning": 0.4}
|
||||||
|
|
||||||
dates = {latest_date}
|
dates = {latest_date}
|
||||||
@@ -555,9 +558,26 @@ async def update_regime_monitor(db: AsyncSession, backfill_days: int = 90) -> di
|
|||||||
latest_result: dict | None = None
|
latest_result: dict | None = None
|
||||||
for d in sorted(dates):
|
for d in sorted(dates):
|
||||||
result = _compute_index(prices, vix_series, oas_series, overrides, config, d)
|
result = _compute_index(prices, vix_series, oas_series, overrides, config, d)
|
||||||
_attach_early_warning(result, divergence.get(d), cw)
|
_attach_early_warning(result, _divergence_asof(div_items, d), cw)
|
||||||
await _upsert_snapshot(db, result)
|
await _upsert_snapshot(db, result)
|
||||||
latest_result = result
|
latest_result = result
|
||||||
|
|
||||||
|
# Backfill early-warning + combined onto recent existing snapshots (e.g. the
|
||||||
|
# index history written before this signal existed) so their 7/30-day trends
|
||||||
|
# populate immediately rather than only filling in over the coming weeks.
|
||||||
|
if div_items:
|
||||||
|
recent = await db.execute(
|
||||||
|
select(RegimeSnapshot).where(RegimeSnapshot.date >= end - timedelta(days=120))
|
||||||
|
)
|
||||||
|
for row in recent.scalars().all():
|
||||||
|
try:
|
||||||
|
res = json.loads(row.breakdown_json)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
continue
|
||||||
|
if (res.get("early_warning") or {}).get("score") is not None:
|
||||||
|
continue
|
||||||
|
_attach_early_warning(res, _divergence_asof(div_items, row.date), cw)
|
||||||
|
row.breakdown_json = json.dumps(res)
|
||||||
await db.commit()
|
await db.commit()
|
||||||
|
|
||||||
logger.info(json.dumps({
|
logger.info(json.dumps({
|
||||||
@@ -569,6 +589,20 @@ async def update_regime_monitor(db: AsyncSession, backfill_days: int = 90) -> di
|
|||||||
return latest_result or {"available": False, "reason": "no data"}
|
return latest_result or {"available": False, "reason": "no data"}
|
||||||
|
|
||||||
|
|
||||||
|
def _divergence_asof(div_items: list[tuple[date, float]], as_of: date, max_lag_days: int = 7) -> float | None:
|
||||||
|
"""Latest divergence value on/before ``as_of``, tolerating a small data lag
|
||||||
|
between the live benchmark and the stored universe. None if too stale/absent."""
|
||||||
|
chosen: tuple[date, float] | None = None
|
||||||
|
for d, v in div_items:
|
||||||
|
if d <= as_of:
|
||||||
|
chosen = (d, v)
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
if chosen is None or (as_of - chosen[0]).days > max_lag_days:
|
||||||
|
return None
|
||||||
|
return chosen[1]
|
||||||
|
|
||||||
|
|
||||||
def _attach_early_warning(result: dict, ew: float | None, weights: dict) -> None:
|
def _attach_early_warning(result: dict, ew: float | None, weights: dict) -> None:
|
||||||
"""Attach the separate early-warning score and a combined blend to a snapshot.
|
"""Attach the separate early-warning score and a combined blend to a snapshot.
|
||||||
|
|
||||||
|
|||||||
@@ -53,6 +53,15 @@ def test_attach_early_warning_none_falls_back_to_index():
|
|||||||
assert result["combined"]["score"] == 80.0 # no early warning -> just the index
|
assert result["combined"]["score"] == 80.0 # no early warning -> just the index
|
||||||
|
|
||||||
|
|
||||||
|
def test_divergence_asof_tolerates_small_lag():
|
||||||
|
from app.services.regime_monitor_service import _divergence_asof
|
||||||
|
items = [(date(2026, 6, 1), 55.0), (date(2026, 6, 3), 60.0)]
|
||||||
|
assert _divergence_asof(items, date(2026, 6, 3)) == 60.0 # exact date
|
||||||
|
assert _divergence_asof(items, date(2026, 6, 4)) == 60.0 # 1-day lag -> newest
|
||||||
|
assert _divergence_asof(items, date(2026, 6, 20)) is None # too stale
|
||||||
|
assert _divergence_asof([], date(2026, 6, 3)) is None
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Price sub-scores
|
# Price sub-scores
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|||||||
Reference in New Issue
Block a user