099846513b
Two changes so the cross-sectional signal results can actually be trusted. (a) History depth — the binding constraint. Ingestion defaulted to 365 days, so long-lookback factors (12-month momentum, 52-week high) were only computable on a handful of weeks at the tail, and every IC reflected a single market regime. - New `settings.ohlcv_history_days` (default 1825 ≈ 5y); new tickers backfill this far instead of 1 year. - New manual "data_backfill" job (Admin → Jobs) re-fetches the full window for every ticker, ignoring incremental resume — run once to deepen existing 1-year histories. Idempotent (upsert); resumes after rate limits. (b) Factor-IC honesty. The IC was averaged over weekly rebalances whose 30-day forward windows overlap, inflating the t-stat ~sqrt(6)x. - IC now measured on NON-OVERLAPPING windows (weeks thinned to ~HORIZON apart). - Each signal carries a `reliable` flag (>= 12 independent windows); BacktestPanel greys out and de-stars thin signals so a lucky 9-week IC of 0.3 can't masquerade as an edge. 332 backend tests pass; frontend build clean. No migration (config + job + an added JSON field on the cached backtest report). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
192 lines
6.3 KiB
Python
192 lines
6.3 KiB
Python
"""Ingestion Pipeline service: fetch from provider, validate, upsert into Price Store.
|
|
|
|
Handles rate-limit resume via IngestionProgress and provider error isolation.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from datetime import date, timedelta
|
|
|
|
from sqlalchemy import func, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.config import settings
|
|
from app.exceptions import NotFoundError, ProviderError, RateLimitError
|
|
from app.models.ohlcv import OHLCVRecord
|
|
from app.models.settings import IngestionProgress
|
|
from app.models.ticker import Ticker
|
|
from app.providers.protocol import MarketDataProvider
|
|
from app.services import price_service
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class IngestionResult:
    """Outcome summary returned by a single ingestion run for one symbol."""

    # Symbol the run was executed for.
    symbol: str
    # Number of OHLCV records upserted during this run.
    records_ingested: int
    # Date of the last record upserted in this run; None when nothing was ingested.
    last_date: date | None
    # One of "complete", "partial" or "error".
    status: str
    # Optional human-readable detail about the outcome.
    message: str | None = None
|
|
|
|
|
|
async def _get_ticker(db: AsyncSession, symbol: str) -> Ticker:
    """Fetch the Ticker row matching ``symbol``.

    The symbol is stripped of surrounding whitespace and upper-cased before
    the lookup.

    Raises:
        NotFoundError: no ticker exists for the normalised symbol.
    """
    wanted = symbol.strip().upper()
    stmt = select(Ticker).where(Ticker.symbol == wanted)
    found = (await db.execute(stmt)).scalar_one_or_none()
    if found is not None:
        return found
    raise NotFoundError(f"Ticker not found: {wanted}")
|
|
|
|
|
|
async def _get_progress(db: AsyncSession, ticker_id: int) -> IngestionProgress | None:
    """Return the IngestionProgress row for ``ticker_id``, or None if there is none."""
    stmt = select(IngestionProgress).where(IngestionProgress.ticker_id == ticker_id)
    rows = await db.execute(stmt)
    return rows.scalar_one_or_none()
|
|
|
|
|
|
async def _get_ohlcv_bar_count(db: AsyncSession, ticker_id: int) -> int:
    """Count the OHLCVRecord rows stored for ``ticker_id``.

    Returns 0 when the count query yields no value.
    """
    stmt = (
        select(func.count())
        .select_from(OHLCVRecord)
        .where(OHLCVRecord.ticker_id == ticker_id)
    )
    counted = (await db.execute(stmt)).scalar()
    return int(counted) if counted else 0
|
|
|
|
|
|
async def _update_progress(
    db: AsyncSession, ticker_id: int, last_date: date
) -> None:
    """Record ``last_date`` as the last ingested date for ``ticker_id``.

    Updates the existing IngestionProgress row when there is one, otherwise
    adds a new row. The session is committed in both cases.
    """
    existing = await _get_progress(db, ticker_id)
    if existing is not None:
        existing.last_ingested_date = last_date
    else:
        db.add(IngestionProgress(ticker_id=ticker_id, last_ingested_date=last_date))
    await db.commit()
|
|
|
|
|
|
async def fetch_and_ingest(
    db: AsyncSession,
    provider: MarketDataProvider,
    symbol: str,
    start_date: date | None = None,
    end_date: date | None = None,
) -> IngestionResult:
    """Fetch OHLCV data from the provider and upsert it into the Price Store.

    Args:
        db: Session used for lookups, upserts and progress tracking.
        provider: Source of OHLCV records, queried via ``fetch_ohlcv``.
        symbol: Ticker symbol; must already exist in the ticker table.
        start_date: First date to request. When omitted it is resolved from
            stored history and IngestionProgress (see ``_resolve_start_date``).
            Passing it explicitly bypasses that resolution entirely.
        end_date: Last date to request. Defaults to today.

    Returns:
        An IngestionResult whose status is:
        - "complete": the fetched records were all upserted, or there was
          nothing to fetch because the resolved start is after ``end_date``.
        - "partial": a RateLimitError interrupted the run; records upserted
          so far are kept and progress was tracked after each one.
        - "error": the provider raised ProviderError; nothing was upserted.

    Raises:
        NotFoundError: ``symbol`` does not match a known ticker.
    """
    ticker = await _get_ticker(db, symbol)

    if end_date is None:
        end_date = date.today()

    if start_date is None:
        start_date = await _resolve_start_date(db, ticker.id, end_date)

    # Resume point already past the requested end: nothing to fetch.
    if start_date > end_date:
        return IngestionResult(
            symbol=ticker.symbol,
            records_ingested=0,
            last_date=None,
            status="complete",
            message="Already up to date",
        )

    try:
        records = await provider.fetch_ohlcv(ticker.symbol, start_date, end_date)
    except RateLimitError:
        # Nothing was fetched at all. Logged so this path is as visible as the
        # ProviderError and mid-ingestion rate-limit paths.
        logger.warning(
            "Rate limited before any records fetched for %s", ticker.symbol
        )
        return IngestionResult(
            symbol=ticker.symbol,
            records_ingested=0,
            last_date=None,
            status="partial",
            message="Rate limited before any records fetched. Resume available.",
        )
    except ProviderError as exc:
        logger.error("Provider error for %s: %s", ticker.symbol, exc)
        return IngestionResult(
            symbol=ticker.symbol,
            records_ingested=0,
            last_date=None,
            status="error",
            message=str(exc),
        )

    # Ingest in date order so the tracked progress date only moves forward
    # within this run. sorted() builds a new list: the provider's returned
    # object is left untouched and any iterable is accepted.
    ordered_records = sorted(records, key=lambda r: r.date)

    ingested_count = 0
    last_ingested: date | None = None

    for record in ordered_records:
        try:
            await price_service.upsert_ohlcv(
                db,
                symbol=ticker.symbol,
                record_date=record.date,
                open_=record.open,
                high=record.high,
                low=record.low,
                close=record.close,
                volume=record.volume,
            )
            ingested_count += 1
            last_ingested = record.date

            # Persist progress after every successful upsert so an interrupted
            # run can resume from the last stored record.
            await _update_progress(db, ticker.id, record.date)

        except RateLimitError:
            # Mid-ingestion rate limit: report what was stored so far.
            logger.warning(
                "Rate limited during ingestion for %s after %d records",
                ticker.symbol,
                ingested_count,
            )
            return IngestionResult(
                symbol=ticker.symbol,
                records_ingested=ingested_count,
                last_date=last_ingested,
                status="partial",
                message=f"Rate limited. Ingested {ingested_count} records. Resume available.",
            )

    return IngestionResult(
        symbol=ticker.symbol,
        records_ingested=ingested_count,
        last_date=last_ingested,
        status="complete",
        message=f"Successfully ingested {ingested_count} records",
    )


async def _resolve_start_date(
    db: AsyncSession,
    ticker_id: int,
    end_date: date,
    minimum_backfill_bars: int = 200,
) -> date:
    """Choose the first date to request when the caller gave no start_date.

    Resumes from the day after the recorded last_ingested_date only when a
    progress row exists AND at least ``minimum_backfill_bars`` bars are
    already stored. Otherwise the full configured window
    (``settings.ohlcv_history_days`` days back from ``end_date``) is
    requested; the upsert makes re-fetching safe and idempotent.

    NOTE(review): a ticker that keeps fewer than ``minimum_backfill_bars``
    stored bars takes the full-window branch on every call.
    """
    progress = await _get_progress(db, ticker_id)
    bar_count = await _get_ohlcv_bar_count(db, ticker_id)

    if progress is not None and bar_count >= minimum_backfill_bars:
        return progress.last_ingested_date + timedelta(days=1)
    return end_date - timedelta(days=settings.ohlcv_history_days)
|