deepen OHLCV history + make the factor-IC pass honest about overlap/regime
Two changes so the cross-sectional signal results can actually be trusted. (a) History depth — the binding constraint. Ingestion defaulted to 365 days, so long-lookback factors (12-month momentum, 52-week high) were only computable on a handful of weeks at the tail, and every IC reflected a single market regime. - New `settings.ohlcv_history_days` (default 1825 ≈ 5y); new tickers backfill this far instead of 1 year. - New manual "data_backfill" job (Admin → Jobs) re-fetches the full window for every ticker, ignoring incremental resume — run once to deepen existing 1-year histories. Idempotent (upsert); resumes after rate limits. (b) Factor-IC honesty. The IC was averaged over weekly rebalances whose 30-day forward windows overlap, inflating the t-stat ~sqrt(6)x. - IC now measured on NON-OVERLAPPING windows (weeks thinned to ~HORIZON apart). - Each signal carries a `reliable` flag (>= 12 independent windows); BacktestPanel greys out and de-stars thin signals so a lucky 9-week IC of 0.3 can't masquerade as an edge. 332 backend tests pass; frontend build clean. No migration (config + job + an added JSON field on the cached backtest report). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -67,6 +67,12 @@ class Settings(BaseSettings):
|
||||
# Outcome evaluation: trading days before an undecided setup expires
|
||||
outcome_evaluation_max_bars: int = 30
|
||||
|
||||
# OHLCV history depth to fetch. New tickers backfill this far; the manual
|
||||
# "data_backfill" job re-fetches the full window for everyone. ~5 years so
|
||||
# long-lookback factors (12-month momentum, 52-week high) and multi-regime
|
||||
# backtests become computable. ~252 trading days/year.
|
||||
ohlcv_history_days: int = 1825
|
||||
|
||||
# Database Pool
|
||||
db_pool_size: int = 5
|
||||
db_pool_timeout: int = 30
|
||||
|
||||
+43
-4
@@ -65,6 +65,7 @@ scheduler = AsyncIOScheduler(
|
||||
# Track last successful ticker per job for rate-limit resume
|
||||
_last_successful: dict[str, str | None] = {
|
||||
"data_collector": None,
|
||||
"data_backfill": None,
|
||||
"sentiment_collector": None,
|
||||
"fundamental_collector": None,
|
||||
}
|
||||
@@ -81,6 +82,17 @@ _job_runtime: dict[str, dict[str, object]] = {
|
||||
"finished_at": None,
|
||||
"message": None,
|
||||
},
|
||||
"data_backfill": {
|
||||
"running": False,
|
||||
"status": "idle",
|
||||
"processed": 0,
|
||||
"total": None,
|
||||
"progress_pct": None,
|
||||
"current_ticker": None,
|
||||
"started_at": None,
|
||||
"finished_at": None,
|
||||
"message": None,
|
||||
},
|
||||
"sentiment_collector": {
|
||||
"running": False,
|
||||
"status": "idle",
|
||||
@@ -392,16 +404,20 @@ def _chunked(symbols: list[str], chunk_size: int) -> list[list[str]]:
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def collect_ohlcv() -> None:
|
||||
async def collect_ohlcv(full_backfill: bool = False, job_name: str = "data_collector") -> None:
|
||||
"""Fetch latest daily OHLCV for all tracked tickers.
|
||||
|
||||
Uses AlpacaOHLCVProvider. Processes each ticker independently.
|
||||
On rate limit, records last successful ticker for resume.
|
||||
Start date is resolved by ingestion progress:
|
||||
- existing ticker: resume from last_ingested_date + 1
|
||||
- new ticker: backfill ~1 year by default
|
||||
- new ticker: backfill the configured history window
|
||||
|
||||
``full_backfill`` forces every ticker to re-fetch the full
|
||||
``settings.ohlcv_history_days`` window (ignoring incremental resume) — used by
|
||||
the manual data_backfill job to deepen shallow histories. ``job_name`` lets the
|
||||
backfill report its own runtime/resume state separate from data_collector.
|
||||
"""
|
||||
job_name = "data_collector"
|
||||
logger.info(json.dumps({"event": "job_start", "job": job_name}))
|
||||
_runtime_start(job_name)
|
||||
processed = 0
|
||||
@@ -437,13 +453,18 @@ async def collect_ohlcv() -> None:
|
||||
return
|
||||
|
||||
end_date = date.today()
|
||||
# Full backfill: pass an explicit start_date so fetch_and_ingest re-pulls
|
||||
# the whole window instead of resuming from the last stored bar.
|
||||
backfill_start = (
|
||||
end_date - timedelta(days=settings.ohlcv_history_days) if full_backfill else None
|
||||
)
|
||||
|
||||
for symbol in symbols:
|
||||
_runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol)
|
||||
async with async_session_factory() as db:
|
||||
try:
|
||||
result = await ingestion_service.fetch_and_ingest(
|
||||
db, provider, symbol, start_date=None, end_date=end_date,
|
||||
db, provider, symbol, start_date=backfill_start, end_date=end_date,
|
||||
)
|
||||
_last_successful[job_name] = symbol
|
||||
processed += 1
|
||||
@@ -477,6 +498,17 @@ async def collect_ohlcv() -> None:
|
||||
_runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc))
|
||||
|
||||
|
||||
async def backfill_ohlcv() -> None:
|
||||
"""Deep historical backfill: re-fetch the full ``settings.ohlcv_history_days``
|
||||
window for every ticker, ignoring incremental resume.
|
||||
|
||||
Manual/triggered job (Admin → Jobs). Run once to deepen the ~1-year histories
|
||||
so long-lookback factors (12-month momentum, 52-week high) and multi-regime
|
||||
backtests become computable. Idempotent (upsert); resumes after rate limits.
|
||||
"""
|
||||
await collect_ohlcv(full_backfill=True, job_name="data_backfill")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Job: Sentiment Collector
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -1227,6 +1259,13 @@ def configure_scheduler(schedule_config: dict[str, str] | None = None) -> None:
|
||||
run_backtest_job, "interval", hours=168,
|
||||
id="backtest", name="Backtest", replace_existing=True,
|
||||
)
|
||||
# Deep history backfill: manual only (never auto-fires); triggered from
|
||||
# Admin → Jobs when histories need deepening.
|
||||
scheduler.add_job(
|
||||
backfill_ohlcv, "interval", weeks=520,
|
||||
id="data_backfill", name="Data Backfill (deep history)",
|
||||
replace_existing=True, next_run_time=None,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
json.dumps({
|
||||
|
||||
@@ -538,6 +538,7 @@ async def get_pipeline_readiness(db: AsyncSession) -> list[dict]:
|
||||
|
||||
VALID_JOB_NAMES = {
|
||||
"data_collector",
|
||||
"data_backfill",
|
||||
"sentiment_collector",
|
||||
"fundamental_collector",
|
||||
"rr_scanner",
|
||||
@@ -552,6 +553,7 @@ VALID_JOB_NAMES = {
|
||||
|
||||
JOB_LABELS = {
|
||||
"data_collector": "Data Collector (OHLCV)",
|
||||
"data_backfill": "Data Backfill (deep history)",
|
||||
"sentiment_collector": "Sentiment Collector",
|
||||
"fundamental_collector": "Fundamental Collector",
|
||||
"rr_scanner": "R:R Scanner",
|
||||
|
||||
@@ -79,7 +79,8 @@ _CAL_BUCKETS = [(0, 20), (20, 40), (40, 60), (60, 80), (80, 100.01)]
|
||||
# ranking stocks by this signal sort tomorrow's winners from losers. This is the
|
||||
# test the per-setup hit-rate report can't do: it measures predictive power of a
|
||||
# signal, not the outcome of a target/stop structure built on top of one.
|
||||
MIN_CROSS_SECTION = 20 # min tickers present in a week to score that week
|
||||
MIN_CROSS_SECTION = 20 # min tickers present in a week to score that week
|
||||
MIN_RELIABLE_PERIODS = 12 # min non-overlapping windows before a signal's IC is trusted
|
||||
|
||||
|
||||
def _wrap_levels(level_dicts: list[dict]) -> list[Any]:
|
||||
@@ -407,26 +408,53 @@ def _quintile_spread(pairs: list[tuple[float, float]]) -> float | None:
|
||||
return sum(p[1] for p in top) / k - sum(p[1] for p in bottom) / k
|
||||
|
||||
|
||||
def _week_ordinal(week_key: tuple[int, int]) -> int:
|
||||
"""Monotonic absolute week number from an (ISO year, ISO week) key."""
|
||||
year, week = week_key
|
||||
return year * 53 + week
|
||||
|
||||
|
||||
def _nonoverlapping_weeks(
|
||||
week_keys: list[tuple[int, int]], stride: int
|
||||
) -> list[tuple[int, int]]:
|
||||
"""Thin to weeks at least ``stride`` apart so their forward windows don't
|
||||
overlap — greedy earliest-first. Removes the autocorrelation that would
|
||||
otherwise inflate the IC t-stat across adjacent weekly rebalances."""
|
||||
kept: list[tuple[int, int]] = []
|
||||
last: int | None = None
|
||||
for wk in sorted(week_keys, key=_week_ordinal):
|
||||
o = _week_ordinal(wk)
|
||||
if last is None or o - last >= stride:
|
||||
kept.append(wk)
|
||||
last = o
|
||||
return kept
|
||||
|
||||
|
||||
def _signal_evaluation(collected: dict) -> list[dict]:
|
||||
"""Per-signal factor diagnostics, one row per candidate signal:
|
||||
|
||||
mean_ic average weekly rank-IC (Spearman of signal vs fwd ret)
|
||||
mean_ic average rank-IC (Spearman of signal vs fwd ret)
|
||||
ic_t_stat mean_ic / stderr — is the IC reliably non-zero?
|
||||
ic_positive_pct share of weeks the IC is positive (consistency)
|
||||
ic_positive_pct share of windows the IC is positive (consistency)
|
||||
mean_quintile_spread avg top-minus-bottom-quintile forward return
|
||||
reliable True once there are >= MIN_RELIABLE_PERIODS windows
|
||||
|
||||
A signal with no edge lands near IC 0 and spread 0. Caveat: weekly rebalances
|
||||
with a HORIZON-day forward window overlap, so the t-stat overstates
|
||||
significance — read it as directional, alongside ic_positive_pct.
|
||||
IC is measured on NON-OVERLAPPING forward windows (weeks thinned to ~HORIZON
|
||||
apart) so the t-stat isn't inflated by autocorrelation. A signal with no edge
|
||||
lands near IC 0 / spread 0; one with too few independent windows is flagged
|
||||
unreliable rather than trusted on a lucky handful.
|
||||
"""
|
||||
stride = max(1, round(HORIZON / 5)) # ISO weeks spanned by the forward window
|
||||
rows: list[dict] = []
|
||||
for name in sorted(collected):
|
||||
weeks_map = collected[name]
|
||||
usable = [wk for wk, recs in weeks_map.items() if len(recs) >= MIN_CROSS_SECTION]
|
||||
kept = _nonoverlapping_weeks(usable, stride)
|
||||
ics: list[float] = []
|
||||
spreads: list[float] = []
|
||||
sizes: list[int] = []
|
||||
for recs in collected[name].values():
|
||||
if len(recs) < MIN_CROSS_SECTION:
|
||||
continue
|
||||
for wk in kept:
|
||||
recs = weeks_map[wk]
|
||||
ic = _spearman([r[0] for r in recs], [r[1] for r in recs])
|
||||
if ic is not None:
|
||||
ics.append(ic)
|
||||
@@ -450,6 +478,7 @@ def _signal_evaluation(collected: dict) -> list[dict]:
|
||||
"ic_t_stat": round(t_stat, 2) if t_stat is not None else None,
|
||||
"ic_positive_pct": round(sum(1 for x in ics if x > 0) / len(ics) * 100, 1),
|
||||
"mean_quintile_spread": round(sum(spreads) / len(spreads), 4) if spreads else None,
|
||||
"reliable": len(ics) >= MIN_RELIABLE_PERIODS,
|
||||
})
|
||||
rows.sort(key=lambda r: r["mean_ic"], reverse=True)
|
||||
return rows
|
||||
@@ -518,12 +547,13 @@ async def run_backtest(
|
||||
"signal_eval": _signal_evaluation(collected),
|
||||
"signal_eval_note": (
|
||||
"Cross-sectional rank-IC of price-only signals vs the forward "
|
||||
f"{HORIZON}-day return (weekly rebalance, min {MIN_CROSS_SECTION} "
|
||||
"names/week). |IC| ≳ 0.03 with a consistent sign is a real (if small) "
|
||||
"edge; near 0 means ranking on it sorts nothing. Momentum factors and "
|
||||
"high_52w are expected positive; reversal_1m and vol_6m are expected "
|
||||
"negative (mean-reversion / low-vol anomaly). Overlapping windows inflate "
|
||||
"the t-stat — read directionally."
|
||||
f"{HORIZON}-day return (min {MIN_CROSS_SECTION} names/window). |IC| ≳ "
|
||||
"0.03 with a consistent sign is a real (if small) edge; near 0 means "
|
||||
"ranking on it sorts nothing. Momentum factors and high_52w are expected "
|
||||
"positive; reversal_1m and vol_6m expected negative (mean-reversion / "
|
||||
"low-vol anomaly). IC is measured on non-overlapping windows; signals "
|
||||
f"with fewer than {MIN_RELIABLE_PERIODS} independent windows are flagged "
|
||||
"unreliable (too few regimes — deepen history with the Data Backfill job)."
|
||||
),
|
||||
"note": (
|
||||
"Sentiment & fundamentals held neutral (no point-in-time history). "
|
||||
|
||||
@@ -12,6 +12,7 @@ from datetime import date, timedelta
|
||||
from sqlalchemy import func, select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.config import settings
|
||||
from app.exceptions import NotFoundError, ProviderError, RateLimitError
|
||||
from app.models.ohlcv import OHLCVRecord
|
||||
from app.models.settings import IngestionProgress
|
||||
@@ -92,20 +93,23 @@ async def fetch_and_ingest(
|
||||
if end_date is None:
|
||||
end_date = date.today()
|
||||
|
||||
# Resolve start_date: use progress resume or default to 1 year ago.
|
||||
# If we have too little history, force a one-year backfill even if
|
||||
# ingestion progress exists (upsert makes this safe and idempotent).
|
||||
# Resolve start_date: use progress resume or backfill the configured history
|
||||
# window. If we have too little history, force a full backfill even if
|
||||
# ingestion progress exists (upsert makes this safe and idempotent). A caller
|
||||
# that passes an explicit start_date (e.g. the manual deep-backfill job)
|
||||
# bypasses this entirely.
|
||||
if start_date is None:
|
||||
progress = await _get_progress(db, ticker.id)
|
||||
bar_count = await _get_ohlcv_bar_count(db, ticker.id)
|
||||
minimum_backfill_bars = 200
|
||||
backfill_start = end_date - timedelta(days=settings.ohlcv_history_days)
|
||||
|
||||
if bar_count < minimum_backfill_bars:
|
||||
start_date = end_date - timedelta(days=365)
|
||||
start_date = backfill_start
|
||||
elif progress is not None:
|
||||
start_date = progress.last_ingested_date + timedelta(days=1)
|
||||
else:
|
||||
start_date = end_date - timedelta(days=365)
|
||||
start_date = backfill_start
|
||||
|
||||
# If start > end, nothing to fetch
|
||||
if start_date > end_date:
|
||||
|
||||
@@ -277,11 +277,12 @@ export function BacktestPanel() {
|
||||
</p>
|
||||
<p className="mb-2 text-[11px] text-gray-500">
|
||||
Does ranking the universe by a signal predict the forward {report.params.horizon_days}-day
|
||||
return? Mean IC is the rank correlation between signal and return, averaged over weekly
|
||||
rebalances. <span className="text-emerald-400">|IC| ≳ {IC_EDGE_THRESHOLD}</span> with a
|
||||
return? Mean IC is the rank correlation between signal and return, averaged over
|
||||
non-overlapping windows. <span className="text-emerald-400">|IC| ≳ {IC_EDGE_THRESHOLD}</span> with a
|
||||
consistent sign (high IC>0 %) is a real, if small, edge; near 0 means it sorts nothing.
|
||||
Momentum skips the last month; <em>reversal_1m is expected negative</em> if the universe
|
||||
mean-reverts. Q5−Q1 is the top-minus-bottom-quintile forward return.
|
||||
mean-reverts. Q5−Q1 is the top-minus-bottom-quintile forward return. <span className="text-gray-600">Greyed
|
||||
rows have too few independent windows to trust — deepen history via the Data Backfill job.</span>
|
||||
</p>
|
||||
<div className="glass overflow-x-auto">
|
||||
<table className="w-full text-sm">
|
||||
@@ -298,9 +299,15 @@ export function BacktestPanel() {
|
||||
</thead>
|
||||
<tbody>
|
||||
{report.signal_eval.map((row) => {
|
||||
const edge = Math.abs(row.mean_ic) >= IC_EDGE_THRESHOLD;
|
||||
// Only trust the edge highlight when the IC rests on enough
|
||||
// independent windows; thin signals are dimmed, not starred.
|
||||
const edge = row.reliable && Math.abs(row.mean_ic) >= IC_EDGE_THRESHOLD;
|
||||
return (
|
||||
<tr key={row.signal} className={`border-b border-white/[0.04] ${edge ? 'bg-emerald-400/[0.06]' : ''}`}>
|
||||
<tr
|
||||
key={row.signal}
|
||||
className={`border-b border-white/[0.04] ${edge ? 'bg-emerald-400/[0.06]' : ''} ${row.reliable ? '' : 'opacity-40'}`}
|
||||
title={row.reliable ? undefined : `Only ${row.weeks} independent window(s) — not enough to trust`}
|
||||
>
|
||||
<td className="px-4 py-2.5 font-medium text-gray-200">
|
||||
{edge && <span className="mr-1 text-emerald-300">★</span>}
|
||||
{SIGNAL_LABELS[row.signal] ?? row.signal}
|
||||
|
||||
@@ -232,6 +232,7 @@ export interface BacktestSignalEvalRow {
|
||||
ic_t_stat: number | null;
|
||||
ic_positive_pct: number;
|
||||
mean_quintile_spread: number | null;
|
||||
reliable: boolean;
|
||||
}
|
||||
|
||||
export interface BacktestReport {
|
||||
|
||||
@@ -79,6 +79,7 @@ class TestConfigureScheduler:
|
||||
job_ids = {j.id for j in jobs}
|
||||
assert job_ids == {
|
||||
"data_collector",
|
||||
"data_backfill",
|
||||
"sentiment_collector",
|
||||
"fundamental_collector",
|
||||
"rr_scanner",
|
||||
@@ -103,6 +104,7 @@ class TestConfigureScheduler:
|
||||
"daily_pipeline",
|
||||
"intraday_pipeline",
|
||||
"data_collector",
|
||||
"data_backfill",
|
||||
"fundamental_collector",
|
||||
"market_regime",
|
||||
"outcome_evaluator",
|
||||
|
||||
@@ -94,18 +94,17 @@ def _records(closes: list[float]) -> list[SimpleNamespace]:
|
||||
|
||||
def test_signal_evaluation_separates_edge_from_noise():
|
||||
rng = random.Random(42)
|
||||
# Build a synthetic cross-section directly: 30 weeks, 40 names each.
|
||||
# "edge" perfectly orders the forward return; "noise" is independent of it.
|
||||
collected: dict = {
|
||||
"edge": {},
|
||||
"noise": {},
|
||||
}
|
||||
for week in range(30):
|
||||
# 120 consecutive weeks, 40 names each. After non-overlapping thinning
|
||||
# (stride = HORIZON/5 = 6) that leaves 20 independent windows — above the
|
||||
# reliability bar. "edge" perfectly orders the forward return; "noise" is
|
||||
# independent of it.
|
||||
collected: dict = {"edge": {}, "noise": {}}
|
||||
for week in range(120):
|
||||
edge_recs = []
|
||||
noise_recs = []
|
||||
for _ in range(40):
|
||||
fwd = rng.gauss(0, 0.05)
|
||||
edge_recs.append((fwd, fwd)) # signal == fwd → IC = 1
|
||||
edge_recs.append((fwd, fwd)) # signal == fwd → IC = 1
|
||||
noise_recs.append((rng.gauss(0, 1), fwd)) # signal ⟂ fwd → IC ≈ 0
|
||||
collected["edge"][(2020, week)] = edge_recs
|
||||
collected["noise"][(2020, week)] = noise_recs
|
||||
@@ -113,13 +112,33 @@ def test_signal_evaluation_separates_edge_from_noise():
|
||||
rows = {r["signal"]: r for r in bt._signal_evaluation(collected)}
|
||||
|
||||
assert rows["edge"]["mean_ic"] == 1.0
|
||||
assert rows["edge"]["weeks"] == 20 # 120 weeks thinned to non-overlapping
|
||||
assert rows["edge"]["reliable"] is True
|
||||
assert rows["edge"]["ic_positive_pct"] == 100.0
|
||||
assert rows["edge"]["mean_quintile_spread"] > 0
|
||||
assert abs(rows["noise"]["mean_ic"]) < 0.15 # indistinguishable from zero
|
||||
assert abs(rows["noise"]["mean_ic"]) < 0.15 # indistinguishable from zero
|
||||
# Rows are sorted by mean_ic descending: the real signal ranks first.
|
||||
assert bt._signal_evaluation(collected)[0]["signal"] == "edge"
|
||||
|
||||
|
||||
def test_signal_evaluation_flags_too_few_windows_unreliable():
|
||||
# 5 adjacent weeks collapse to a single non-overlapping window → unreliable.
|
||||
collected: dict = {
|
||||
"edge": {(2020, w): [(float(i), float(i)) for i in range(40)] for w in range(5)}
|
||||
}
|
||||
row = bt._signal_evaluation(collected)[0]
|
||||
assert row["weeks"] == 1
|
||||
assert row["reliable"] is False
|
||||
|
||||
|
||||
def test_nonoverlapping_weeks_thins_by_stride():
|
||||
weeks = [(2020, w) for w in range(1, 13)] # 12 consecutive ISO weeks
|
||||
kept = bt._nonoverlapping_weeks(weeks, stride=6)
|
||||
assert kept == [(2020, 1), (2020, 7)] # 6 apart, no overlap
|
||||
# Stride 1 keeps everything; ordering is chronological.
|
||||
assert bt._nonoverlapping_weeks(list(reversed(weeks)), stride=1) == weeks
|
||||
|
||||
|
||||
def test_signal_evaluation_skips_thin_weeks():
|
||||
# A week with fewer than MIN_CROSS_SECTION names is ignored entirely.
|
||||
collected: dict = {"edge": {(2020, 1): [(1.0, 1.0)] * (bt.MIN_CROSS_SECTION - 1)}}
|
||||
|
||||
Reference in New Issue
Block a user