Files
signal-platform/app/services/ingestion_service.py
T
dennisthiessen 099846513b
Deploy / lint (push) Successful in 7s
Deploy / test (push) Successful in 39s
Deploy / deploy (push) Successful in 25s
deepen OHLCV history + make the factor-IC pass honest about overlap/regime
Two changes so the cross-sectional signal results can actually be trusted.

(a) History depth — the binding constraint. Ingestion defaulted to 365 days, so
long-lookback factors (12-month momentum, 52-week high) were only computable on a
handful of weeks at the tail, and every IC reflected a single market regime.
- New `settings.ohlcv_history_days` (default 1825 ≈ 5y); new tickers backfill this
  far instead of 1 year.
- New manual "data_backfill" job (Admin → Jobs) re-fetches the full window for
  every ticker, ignoring incremental resume — run once to deepen existing
  1-year histories. Idempotent (upsert); resumes after rate limits.

(b) Factor-IC honesty. The IC was averaged over weekly rebalances whose 30-day
forward windows overlap, inflating the t-stat ~sqrt(6)x.
- IC now measured on NON-OVERLAPPING windows (weeks thinned to ~HORIZON apart).
- Each signal carries a `reliable` flag (>= 12 independent windows); BacktestPanel
  greys out and de-stars thin signals so a lucky 9-week IC of 0.3 can't masquerade
  as an edge.

332 backend tests pass; frontend build clean. No migration (config + job + an
added JSON field on the cached backtest report).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-23 18:20:59 +02:00

192 lines
6.3 KiB
Python

"""Ingestion Pipeline service: fetch from provider, validate, upsert into Price Store.
Handles rate-limit resume via IngestionProgress and provider error isolation.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from datetime import date, timedelta
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.exceptions import NotFoundError, ProviderError, RateLimitError
from app.models.ohlcv import OHLCVRecord
from app.models.settings import IngestionProgress
from app.models.ticker import Ticker
from app.providers.protocol import MarketDataProvider
from app.services import price_service
logger = logging.getLogger(__name__)
@dataclass
class IngestionResult:
"""Result of an ingestion run."""
symbol: str
records_ingested: int
last_date: date | None
status: str # "complete" | "partial" | "error"
message: str | None = None
async def _get_ticker(db: AsyncSession, symbol: str) -> Ticker:
    """Fetch the Ticker row matching *symbol*.

    The symbol is stripped of surrounding whitespace and upper-cased before
    the lookup.

    Raises:
        NotFoundError: if no ticker with the normalised symbol exists.
    """
    wanted = symbol.strip().upper()
    query = select(Ticker).where(Ticker.symbol == wanted)
    row = (await db.execute(query)).scalar_one_or_none()
    if row is not None:
        return row
    raise NotFoundError(f"Ticker not found: {wanted}")
async def _get_progress(db: AsyncSession, ticker_id: int) -> IngestionProgress | None:
    """Return the IngestionProgress row for *ticker_id*, or None if there is none."""
    query = select(IngestionProgress).where(IngestionProgress.ticker_id == ticker_id)
    rows = await db.execute(query)
    return rows.scalar_one_or_none()
async def _get_ohlcv_bar_count(db: AsyncSession, ticker_id: int) -> int:
    """Count the OHLCVRecord rows stored for *ticker_id* (0 when there are none)."""
    count_query = (
        select(func.count())
        .select_from(OHLCVRecord)
        .where(OHLCVRecord.ticker_id == ticker_id)
    )
    total = (await db.execute(count_query)).scalar()
    # A missing (None) or zero scalar both collapse to 0.
    return int(total) if total else 0
async def _update_progress(
    db: AsyncSession, ticker_id: int, last_date: date
) -> None:
    """Store *last_date* as the last ingested date for *ticker_id* and commit.

    Adds a new IngestionProgress row when the ticker has none yet; otherwise
    overwrites ``last_ingested_date`` on the existing row.
    """
    existing = await _get_progress(db, ticker_id)
    if existing is not None:
        existing.last_ingested_date = last_date
    else:
        db.add(IngestionProgress(ticker_id=ticker_id, last_ingested_date=last_date))
    await db.commit()
async def fetch_and_ingest(
    db: AsyncSession,
    provider: MarketDataProvider,
    symbol: str,
    start_date: date | None = None,
    end_date: date | None = None,
) -> IngestionResult:
    """Fetch OHLCV data from the provider and upsert it into the Price Store.

    Args:
        db: Session used for the ticker lookup, upserts and progress tracking.
        provider: Source of OHLCV records (``fetch_ohlcv`` is awaited once).
        symbol: Ticker symbol; resolved through ``_get_ticker``.
        start_date: First date to fetch. When omitted it is derived from the
            stored bar count and IngestionProgress (see ``_resolve_start_date``).
            An explicit value (e.g. from the manual deep-backfill job) bypasses
            that logic entirely.
        end_date: Last date to fetch. Defaults to today.

    Returns:
        An IngestionResult whose ``status`` is:
        - "complete": every fetched record was upserted, or there was nothing
          to fetch because ``start_date`` lies after ``end_date``;
        - "partial": a RateLimitError interrupted the fetch or an upsert; the
          records upserted so far are kept and progress reflects them;
        - "error": the provider raised ProviderError; nothing was upserted.

    Raises:
        NotFoundError: if ``symbol`` does not match a known ticker.
    """
    ticker = await _get_ticker(db, symbol)

    if end_date is None:
        end_date = date.today()

    if start_date is None:
        start_date = await _resolve_start_date(db, ticker.id, end_date)

    # Empty window: nothing to fetch.
    if start_date > end_date:
        return IngestionResult(
            symbol=ticker.symbol,
            records_ingested=0,
            last_date=None,
            status="complete",
            message="Already up to date",
        )

    try:
        fetched = await provider.fetch_ohlcv(ticker.symbol, start_date, end_date)
    except RateLimitError:
        # Rate limited before anything was fetched: report partial with 0 records.
        return IngestionResult(
            symbol=ticker.symbol,
            records_ingested=0,
            last_date=None,
            status="partial",
            message="Rate limited before any records fetched. Resume available.",
        )
    except ProviderError as exc:
        logger.error("Provider error for %s: %s", ticker.symbol, exc)
        return IngestionResult(
            symbol=ticker.symbol,
            records_ingested=0,
            last_date=None,
            status="error",
            message=str(exc),
        )

    # Ingest in date order so the progress marker only ever advances within a
    # run. sorted() builds a new list: the provider's own sequence is left
    # untouched and any iterable return type is accepted.
    records = sorted(fetched, key=lambda r: r.date)

    ingested_count = 0
    last_ingested: date | None = None
    for record in records:
        try:
            await price_service.upsert_ohlcv(
                db,
                symbol=ticker.symbol,
                record_date=record.date,
                open_=record.open,
                high=record.high,
                low=record.low,
                close=record.close,
                volume=record.volume,
            )
            ingested_count += 1
            last_ingested = record.date
            # Persist progress after every successful upsert so an interrupted
            # run can resume from the last stored record.
            await _update_progress(db, ticker.id, record.date)
        except RateLimitError:
            # Mid-ingestion rate limit: keep what was stored and report partial.
            logger.warning(
                "Rate limited during ingestion for %s after %d records",
                ticker.symbol,
                ingested_count,
            )
            return IngestionResult(
                symbol=ticker.symbol,
                records_ingested=ingested_count,
                last_date=last_ingested,
                status="partial",
                message=f"Rate limited. Ingested {ingested_count} records. Resume available.",
            )

    return IngestionResult(
        symbol=ticker.symbol,
        records_ingested=ingested_count,
        last_date=last_ingested,
        status="complete",
        message=f"Successfully ingested {ingested_count} records",
    )


# Tickers with fewer stored bars than this are treated as having too little
# history: the full configured window is fetched even if progress exists.
_MINIMUM_BACKFILL_BARS = 200


async def _resolve_start_date(
    db: AsyncSession, ticker_id: int, end_date: date
) -> date:
    """Choose the start date for a run whose caller did not supply one.

    Returns ``end_date - settings.ohlcv_history_days`` when the ticker has
    fewer than ``_MINIMUM_BACKFILL_BARS`` stored bars or no IngestionProgress
    row; otherwise the day after the recorded ``last_ingested_date``.
    """
    progress = await _get_progress(db, ticker_id)
    bar_count = await _get_ohlcv_bar_count(db, ticker_id)
    if bar_count < _MINIMUM_BACKFILL_BARS or progress is None:
        # Re-fetching the whole window relies on the store being written via
        # upsert, so rows that already exist are overwritten, not duplicated.
        return end_date - timedelta(days=settings.ohlcv_history_days)
    return progress.last_ingested_date + timedelta(days=1)