From 099846513bcbf569f8dbf9814b06e6cbf291912d Mon Sep 17 00:00:00 2001 From: Dennis Thiessen Date: Tue, 23 Jun 2026 18:20:59 +0200 Subject: [PATCH] deepen OHLCV history + make the factor-IC pass honest about overlap/regime MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two changes so the cross-sectional signal results can actually be trusted. (a) History depth — the binding constraint. Ingestion defaulted to 365 days, so long-lookback factors (12-month momentum, 52-week high) were only computable on a handful of weeks at the tail, and every IC reflected a single market regime. - New `settings.ohlcv_history_days` (default 1825 ≈ 5y); new tickers backfill this far instead of 1 year. - New manual "data_backfill" job (Admin → Jobs) re-fetches the full window for every ticker, ignoring incremental resume — run once to deepen existing 1-year histories. Idempotent (upsert); resumes after rate limits. (b) Factor-IC honesty. The IC was averaged over weekly rebalances whose 30-day forward windows overlap, inflating the t-stat ~sqrt(6)x. - IC now measured on NON-OVERLAPPING windows (weeks thinned to ~HORIZON apart). - Each signal carries a `reliable` flag (>= 12 independent windows); BacktestPanel greys out and de-stars thin signals so a lucky 9-week IC of 0.3 can't masquerade as an edge. 332 backend tests pass; frontend build clean. No migration (config + job + an added JSON field on the cached backtest report). 
Co-Authored-By: Claude Opus 4.8 --- app/config.py | 6 ++ app/scheduler.py | 47 +++++++++++++-- app/services/admin_service.py | 2 + app/services/backtest_service.py | 60 ++++++++++++++----- app/services/ingestion_service.py | 14 +++-- .../src/components/signals/BacktestPanel.tsx | 17 ++++-- frontend/src/lib/types.ts | 1 + tests/unit/test_scheduler.py | 2 + tests/unit/test_signal_eval.py | 37 +++++++++--- 9 files changed, 148 insertions(+), 38 deletions(-) diff --git a/app/config.py b/app/config.py index 12259b0..9441acd 100644 --- a/app/config.py +++ b/app/config.py @@ -67,6 +67,12 @@ class Settings(BaseSettings): # Outcome evaluation: trading days before an undecided setup expires outcome_evaluation_max_bars: int = 30 + # OHLCV history depth to fetch. New tickers backfill this far; the manual + # "data_backfill" job re-fetches the full window for everyone. ~5 years so + # long-lookback factors (12-month momentum, 52-week high) and multi-regime + # backtests become computable. Calendar days (~252 trading days/year). 
+ ohlcv_history_days: int = 1825 + # Database Pool db_pool_size: int = 5 db_pool_timeout: int = 30 diff --git a/app/scheduler.py b/app/scheduler.py index a7ebe35..4ee2d5f 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -65,6 +65,7 @@ scheduler = AsyncIOScheduler( # Track last successful ticker per job for rate-limit resume _last_successful: dict[str, str | None] = { "data_collector": None, + "data_backfill": None, "sentiment_collector": None, "fundamental_collector": None, } @@ -81,6 +82,17 @@ _job_runtime: dict[str, dict[str, object]] = { "finished_at": None, "message": None, }, + "data_backfill": { + "running": False, + "status": "idle", + "processed": 0, + "total": None, + "progress_pct": None, + "current_ticker": None, + "started_at": None, + "finished_at": None, + "message": None, + }, "sentiment_collector": { "running": False, "status": "idle", @@ -392,16 +404,20 @@ def _chunked(symbols: list[str], chunk_size: int) -> list[list[str]]: # --------------------------------------------------------------------------- -async def collect_ohlcv() -> None: +async def collect_ohlcv(full_backfill: bool = False, job_name: str = "data_collector") -> None: """Fetch latest daily OHLCV for all tracked tickers. Uses AlpacaOHLCVProvider. Processes each ticker independently. On rate limit, records last successful ticker for resume. Start date is resolved by ingestion progress: - existing ticker: resume from last_ingested_date + 1 - - new ticker: backfill ~1 year by default + - new ticker: backfill the configured history window + + ``full_backfill`` forces every ticker to re-fetch the full + ``settings.ohlcv_history_days`` window (ignoring incremental resume) — used by + the manual data_backfill job to deepen shallow histories. ``job_name`` lets the + backfill report its own runtime/resume state separate from data_collector. 
""" - job_name = "data_collector" logger.info(json.dumps({"event": "job_start", "job": job_name})) _runtime_start(job_name) processed = 0 @@ -437,13 +453,18 @@ async def collect_ohlcv() -> None: return end_date = date.today() + # Full backfill: pass an explicit start_date so fetch_and_ingest re-pulls + # the whole window instead of resuming from the last stored bar. + backfill_start = ( + end_date - timedelta(days=settings.ohlcv_history_days) if full_backfill else None + ) for symbol in symbols: _runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol) async with async_session_factory() as db: try: result = await ingestion_service.fetch_and_ingest( - db, provider, symbol, start_date=None, end_date=end_date, + db, provider, symbol, start_date=backfill_start, end_date=end_date, ) _last_successful[job_name] = symbol processed += 1 @@ -477,6 +498,17 @@ async def collect_ohlcv() -> None: _runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc)) +async def backfill_ohlcv() -> None: + """Deep historical backfill: re-fetch the full ``settings.ohlcv_history_days`` + window for every ticker, ignoring incremental resume. + + Manual/triggered job (Admin → Jobs). Run once to deepen the ~1-year histories + so long-lookback factors (12-month momentum, 52-week high) and multi-regime + backtests become computable. Idempotent (upsert); resumes after rate limits. 
+ """ + await collect_ohlcv(full_backfill=True, job_name="data_backfill") + + # --------------------------------------------------------------------------- # Job: Sentiment Collector # --------------------------------------------------------------------------- @@ -1227,6 +1259,13 @@ def configure_scheduler(schedule_config: dict[str, str] | None = None) -> None: run_backtest_job, "interval", hours=168, id="backtest", name="Backtest", replace_existing=True, ) + # Deep history backfill: manual only (never auto-fires); triggered from + # Admin → Jobs when histories need deepening. + scheduler.add_job( + backfill_ohlcv, "interval", weeks=520, + id="data_backfill", name="Data Backfill (deep history)", + replace_existing=True, next_run_time=None, + ) logger.info( json.dumps({ diff --git a/app/services/admin_service.py b/app/services/admin_service.py index e8a5049..7e85705 100644 --- a/app/services/admin_service.py +++ b/app/services/admin_service.py @@ -538,6 +538,7 @@ async def get_pipeline_readiness(db: AsyncSession) -> list[dict]: VALID_JOB_NAMES = { "data_collector", + "data_backfill", "sentiment_collector", "fundamental_collector", "rr_scanner", @@ -552,6 +553,7 @@ VALID_JOB_NAMES = { JOB_LABELS = { "data_collector": "Data Collector (OHLCV)", + "data_backfill": "Data Backfill (deep history)", "sentiment_collector": "Sentiment Collector", "fundamental_collector": "Fundamental Collector", "rr_scanner": "R:R Scanner", diff --git a/app/services/backtest_service.py b/app/services/backtest_service.py index 95cd537..a36fdf7 100644 --- a/app/services/backtest_service.py +++ b/app/services/backtest_service.py @@ -79,7 +79,8 @@ _CAL_BUCKETS = [(0, 20), (20, 40), (40, 60), (60, 80), (80, 100.01)] # ranking stocks by this signal sort tomorrow's winners from losers. This is the # test the per-setup hit-rate report can't do: it measures predictive power of a # signal, not the outcome of a target/stop structure built on top of one. 
-MIN_CROSS_SECTION = 20 # min tickers present in a week to score that week +MIN_CROSS_SECTION = 20 # min tickers present in a week to score that week +MIN_RELIABLE_PERIODS = 12 # min non-overlapping windows before a signal's IC is trusted def _wrap_levels(level_dicts: list[dict]) -> list[Any]: @@ -407,26 +408,53 @@ def _quintile_spread(pairs: list[tuple[float, float]]) -> float | None: return sum(p[1] for p in top) / k - sum(p[1] for p in bottom) / k +def _week_ordinal(week_key: tuple[int, int]) -> int: + """Monotonic absolute week number from an (ISO year, ISO week) key.""" + year, week = week_key + return year * 53 + week + + +def _nonoverlapping_weeks( + week_keys: list[tuple[int, int]], stride: int +) -> list[tuple[int, int]]: + """Thin to weeks at least ``stride`` apart so their forward windows don't + overlap — greedy earliest-first. Removes the autocorrelation that would + otherwise inflate the IC t-stat across adjacent weekly rebalances.""" + kept: list[tuple[int, int]] = [] + last: int | None = None + for wk in sorted(week_keys, key=_week_ordinal): + o = _week_ordinal(wk) + if last is None or o - last >= stride: + kept.append(wk) + last = o + return kept + + def _signal_evaluation(collected: dict) -> list[dict]: """Per-signal factor diagnostics, one row per candidate signal: - mean_ic average weekly rank-IC (Spearman of signal vs fwd ret) + mean_ic average rank-IC (Spearman of signal vs fwd ret) ic_t_stat mean_ic / stderr — is the IC reliably non-zero? - ic_positive_pct share of weeks the IC is positive (consistency) + ic_positive_pct share of windows the IC is positive (consistency) mean_quintile_spread avg top-minus-bottom-quintile forward return + reliable True once there are >= MIN_RELIABLE_PERIODS windows - A signal with no edge lands near IC 0 and spread 0. Caveat: weekly rebalances - with a HORIZON-day forward window overlap, so the t-stat overstates - significance — read it as directional, alongside ic_positive_pct. 
+ IC is measured on NON-OVERLAPPING forward windows (weeks thinned to ~HORIZON + apart) so the t-stat isn't inflated by autocorrelation. A signal with no edge + lands near IC 0 / spread 0; one with too few independent windows is flagged + unreliable rather than trusted on a lucky handful. """ + stride = max(1, round(HORIZON / 5)) # ISO weeks spanned by the forward window rows: list[dict] = [] for name in sorted(collected): + weeks_map = collected[name] + usable = [wk for wk, recs in weeks_map.items() if len(recs) >= MIN_CROSS_SECTION] + kept = _nonoverlapping_weeks(usable, stride) ics: list[float] = [] spreads: list[float] = [] sizes: list[int] = [] - for recs in collected[name].values(): - if len(recs) < MIN_CROSS_SECTION: - continue + for wk in kept: + recs = weeks_map[wk] ic = _spearman([r[0] for r in recs], [r[1] for r in recs]) if ic is not None: ics.append(ic) @@ -450,6 +478,7 @@ def _signal_evaluation(collected: dict) -> list[dict]: "ic_t_stat": round(t_stat, 2) if t_stat is not None else None, "ic_positive_pct": round(sum(1 for x in ics if x > 0) / len(ics) * 100, 1), "mean_quintile_spread": round(sum(spreads) / len(spreads), 4) if spreads else None, + "reliable": len(ics) >= MIN_RELIABLE_PERIODS, }) rows.sort(key=lambda r: r["mean_ic"], reverse=True) return rows @@ -518,12 +547,13 @@ async def run_backtest( "signal_eval": _signal_evaluation(collected), "signal_eval_note": ( "Cross-sectional rank-IC of price-only signals vs the forward " - f"{HORIZON}-day return (weekly rebalance, min {MIN_CROSS_SECTION} " - "names/week). |IC| ≳ 0.03 with a consistent sign is a real (if small) " - "edge; near 0 means ranking on it sorts nothing. Momentum factors and " - "high_52w are expected positive; reversal_1m and vol_6m are expected " - "negative (mean-reversion / low-vol anomaly). Overlapping windows inflate " - "the t-stat — read directionally." + f"{HORIZON}-day return (min {MIN_CROSS_SECTION} names/window). 
|IC| ≳ " + "0.03 with a consistent sign is a real (if small) edge; near 0 means " + "ranking on it sorts nothing. Momentum factors and high_52w are expected " + "positive; reversal_1m and vol_6m expected negative (mean-reversion / " + "low-vol anomaly). IC is measured on non-overlapping windows; signals " + f"with fewer than {MIN_RELIABLE_PERIODS} independent windows are flagged " + "unreliable (too few regimes — deepen history with the Data Backfill job)." ), "note": ( "Sentiment & fundamentals held neutral (no point-in-time history). " diff --git a/app/services/ingestion_service.py b/app/services/ingestion_service.py index 25dc1df..826a6db 100644 --- a/app/services/ingestion_service.py +++ b/app/services/ingestion_service.py @@ -12,6 +12,7 @@ from datetime import date, timedelta from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession +from app.config import settings from app.exceptions import NotFoundError, ProviderError, RateLimitError from app.models.ohlcv import OHLCVRecord from app.models.settings import IngestionProgress @@ -92,20 +93,23 @@ async def fetch_and_ingest( if end_date is None: end_date = date.today() - # Resolve start_date: use progress resume or default to 1 year ago. - # If we have too little history, force a one-year backfill even if - # ingestion progress exists (upsert makes this safe and idempotent). + # Resolve start_date: use progress resume or backfill the configured history + # window. If we have too little history, force a full backfill even if + # ingestion progress exists (upsert makes this safe and idempotent). A caller + # that passes an explicit start_date (e.g. the manual deep-backfill job) + # bypasses this entirely. 
if start_date is None: progress = await _get_progress(db, ticker.id) bar_count = await _get_ohlcv_bar_count(db, ticker.id) minimum_backfill_bars = 200 + backfill_start = end_date - timedelta(days=settings.ohlcv_history_days) if bar_count < minimum_backfill_bars: - start_date = end_date - timedelta(days=365) + start_date = backfill_start elif progress is not None: start_date = progress.last_ingested_date + timedelta(days=1) else: - start_date = end_date - timedelta(days=365) + start_date = backfill_start # If start > end, nothing to fetch if start_date > end_date: diff --git a/frontend/src/components/signals/BacktestPanel.tsx b/frontend/src/components/signals/BacktestPanel.tsx index eba1607..639fe2d 100644 --- a/frontend/src/components/signals/BacktestPanel.tsx +++ b/frontend/src/components/signals/BacktestPanel.tsx @@ -277,11 +277,12 @@ export function BacktestPanel() {

Does ranking the universe by a signal predict the forward {report.params.horizon_days}-day - return? Mean IC is the rank correlation between signal and return, averaged over weekly - rebalances. |IC| ≳ {IC_EDGE_THRESHOLD} with a + return? Mean IC is the rank correlation between signal and return, averaged over + non-overlapping windows. |IC| ≳ {IC_EDGE_THRESHOLD} with a consistent sign (high IC>0 %) is a real, if small, edge; near 0 means it sorts nothing. Momentum skips the last month; reversal_1m is expected negative if the universe - mean-reverts. Q5−Q1 is the top-minus-bottom-quintile forward return. + mean-reverts. Q5−Q1 is the top-minus-bottom-quintile forward return. Greyed + rows have too few independent windows to trust — deepen history via the Data Backfill job.

@@ -298,9 +299,15 @@ export function BacktestPanel() { {report.signal_eval.map((row) => { - const edge = Math.abs(row.mean_ic) >= IC_EDGE_THRESHOLD; + // Only trust the edge highlight when the IC rests on enough + // independent windows; thin signals are dimmed, not starred. + const edge = row.reliable && Math.abs(row.mean_ic) >= IC_EDGE_THRESHOLD; return ( - +
{edge && } {SIGNAL_LABELS[row.signal] ?? row.signal} diff --git a/frontend/src/lib/types.ts b/frontend/src/lib/types.ts index 0657e45..df055bc 100644 --- a/frontend/src/lib/types.ts +++ b/frontend/src/lib/types.ts @@ -232,6 +232,7 @@ export interface BacktestSignalEvalRow { ic_t_stat: number | null; ic_positive_pct: number; mean_quintile_spread: number | null; + reliable: boolean; } export interface BacktestReport { diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index 70b2df6..68164e0 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -79,6 +79,7 @@ class TestConfigureScheduler: job_ids = {j.id for j in jobs} assert job_ids == { "data_collector", + "data_backfill", "sentiment_collector", "fundamental_collector", "rr_scanner", @@ -103,6 +104,7 @@ class TestConfigureScheduler: "daily_pipeline", "intraday_pipeline", "data_collector", + "data_backfill", "fundamental_collector", "market_regime", "outcome_evaluator", diff --git a/tests/unit/test_signal_eval.py b/tests/unit/test_signal_eval.py index 5edb009..78f1194 100644 --- a/tests/unit/test_signal_eval.py +++ b/tests/unit/test_signal_eval.py @@ -94,18 +94,17 @@ def _records(closes: list[float]) -> list[SimpleNamespace]: def test_signal_evaluation_separates_edge_from_noise(): rng = random.Random(42) - # Build a synthetic cross-section directly: 30 weeks, 40 names each. - # "edge" perfectly orders the forward return; "noise" is independent of it. - collected: dict = { - "edge": {}, - "noise": {}, - } - for week in range(30): + # 120 consecutive weeks, 40 names each. After non-overlapping thinning + # (stride = HORIZON/5 = 6) that leaves 20 independent windows — above the + # reliability bar. "edge" perfectly orders the forward return; "noise" is + # independent of it. 
+ collected: dict = {"edge": {}, "noise": {}} + for week in range(120): edge_recs = [] noise_recs = [] for _ in range(40): fwd = rng.gauss(0, 0.05) - edge_recs.append((fwd, fwd)) # signal == fwd → IC = 1 + edge_recs.append((fwd, fwd)) # signal == fwd → IC = 1 noise_recs.append((rng.gauss(0, 1), fwd)) # signal ⟂ fwd → IC ≈ 0 collected["edge"][(2020, week)] = edge_recs collected["noise"][(2020, week)] = noise_recs @@ -113,13 +112,33 @@ def test_signal_evaluation_separates_edge_from_noise(): rows = {r["signal"]: r for r in bt._signal_evaluation(collected)} assert rows["edge"]["mean_ic"] == 1.0 + assert rows["edge"]["weeks"] == 20 # 120 weeks thinned to non-overlapping + assert rows["edge"]["reliable"] is True assert rows["edge"]["ic_positive_pct"] == 100.0 assert rows["edge"]["mean_quintile_spread"] > 0 - assert abs(rows["noise"]["mean_ic"]) < 0.15 # indistinguishable from zero + assert abs(rows["noise"]["mean_ic"]) < 0.15 # indistinguishable from zero # Rows are sorted by mean_ic descending: the real signal ranks first. assert bt._signal_evaluation(collected)[0]["signal"] == "edge" +def test_signal_evaluation_flags_too_few_windows_unreliable(): + # 5 adjacent weeks collapse to a single non-overlapping window → unreliable. + collected: dict = { + "edge": {(2020, w): [(float(i), float(i)) for i in range(40)] for w in range(5)} + } + row = bt._signal_evaluation(collected)[0] + assert row["weeks"] == 1 + assert row["reliable"] is False + + +def test_nonoverlapping_weeks_thins_by_stride(): + weeks = [(2020, w) for w in range(1, 13)] # 12 consecutive ISO weeks + kept = bt._nonoverlapping_weeks(weeks, stride=6) + assert kept == [(2020, 1), (2020, 7)] # 6 apart, no overlap + # Stride 1 keeps everything; ordering is chronological. + assert bt._nonoverlapping_weeks(list(reversed(weeks)), stride=1) == weeks + + def test_signal_evaluation_skips_thin_weeks(): # A week with fewer than MIN_CROSS_SECTION names is ignored entirely. 
collected: dict = {"edge": {(2020, 1): [(1.0, 1.0)] * (bt.MIN_CROSS_SECTION - 1)}}