"""Ingestion Pipeline service: fetch from provider, validate, upsert into Price Store. Handles rate-limit resume via IngestionProgress and provider error isolation. """ from __future__ import annotations import logging from dataclasses import dataclass from datetime import date, timedelta from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from app.exceptions import NotFoundError, ProviderError, RateLimitError from app.models.ohlcv import OHLCVRecord from app.models.settings import IngestionProgress from app.models.ticker import Ticker from app.providers.protocol import MarketDataProvider from app.services import price_service logger = logging.getLogger(__name__) @dataclass class IngestionResult: """Result of an ingestion run.""" symbol: str records_ingested: int last_date: date | None status: str # "complete" | "partial" | "error" message: str | None = None async def _get_ticker(db: AsyncSession, symbol: str) -> Ticker: """Look up ticker by symbol. Raises NotFoundError if missing.""" normalised = symbol.strip().upper() result = await db.execute(select(Ticker).where(Ticker.symbol == normalised)) ticker = result.scalar_one_or_none() if ticker is None: raise NotFoundError(f"Ticker not found: {normalised}") return ticker async def _get_progress(db: AsyncSession, ticker_id: int) -> IngestionProgress | None: """Get the IngestionProgress record for a ticker, if any.""" result = await db.execute( select(IngestionProgress).where(IngestionProgress.ticker_id == ticker_id) ) return result.scalar_one_or_none() async def _get_ohlcv_bar_count(db: AsyncSession, ticker_id: int) -> int: result = await db.execute( select(func.count()).select_from(OHLCVRecord).where(OHLCVRecord.ticker_id == ticker_id) ) return int(result.scalar() or 0) async def _update_progress( db: AsyncSession, ticker_id: int, last_date: date ) -> None: """Create or update the IngestionProgress record for a ticker.""" progress = await _get_progress(db, ticker_id) if progress is None: progress = IngestionProgress(ticker_id=ticker_id, last_ingested_date=last_date) db.add(progress) else: progress.last_ingested_date = last_date await db.commit() async def fetch_and_ingest( db: AsyncSession, provider: MarketDataProvider, symbol: str, start_date: date | None = None, end_date: date | None = None, ) -> IngestionResult: """Fetch OHLCV data from provider and upsert into Price Store. - Resolves start_date from IngestionProgress if not provided (resume). - Defaults end_date to today. - Tracks last_ingested_date after each successful upsert. - On RateLimitError from provider: returns partial progress. - On ProviderError: returns error, no data modification. """ ticker = await _get_ticker(db, symbol) # Resolve end_date if end_date is None: end_date = date.today() # Resolve start_date: use progress resume or default to 1 year ago. # If we have too little history, force a one-year backfill even if # ingestion progress exists (upsert makes this safe and idempotent). if start_date is None: progress = await _get_progress(db, ticker.id) bar_count = await _get_ohlcv_bar_count(db, ticker.id) minimum_backfill_bars = 200 if bar_count < minimum_backfill_bars: start_date = end_date - timedelta(days=365) elif progress is not None: start_date = progress.last_ingested_date + timedelta(days=1) else: start_date = end_date - timedelta(days=365) # If start > end, nothing to fetch if start_date > end_date: return IngestionResult( symbol=ticker.symbol, records_ingested=0, last_date=None, status="complete", message="Already up to date", ) # Fetch from provider try: records = await provider.fetch_ohlcv(ticker.symbol, start_date, end_date) except RateLimitError: # No data fetched at all — return partial with 0 records return IngestionResult( symbol=ticker.symbol, records_ingested=0, last_date=None, status="partial", message="Rate limited before any records fetched. Resume available.", ) except ProviderError as exc: logger.error("Provider error for %s: %s", ticker.symbol, exc) return IngestionResult( symbol=ticker.symbol, records_ingested=0, last_date=None, status="error", message=str(exc), ) # Sort records by date to ensure ordered ingestion records.sort(key=lambda r: r.date) ingested_count = 0 last_ingested: date | None = None for record in records: try: await price_service.upsert_ohlcv( db, symbol=ticker.symbol, record_date=record.date, open_=record.open, high=record.high, low=record.low, close=record.close, volume=record.volume, ) ingested_count += 1 last_ingested = record.date # Update progress after each successful upsert await _update_progress(db, ticker.id, record.date) except RateLimitError: # Mid-ingestion rate limit — return partial progress logger.warning( "Rate limited during ingestion for %s after %d records", ticker.symbol, ingested_count, ) return IngestionResult( symbol=ticker.symbol, records_ingested=ingested_count, last_date=last_ingested, status="partial", message=f"Rate limited. Ingested {ingested_count} records. Resume available.", ) return IngestionResult( symbol=ticker.symbol, records_ingested=ingested_count, last_date=last_ingested, status="complete", message=f"Successfully ingested {ingested_count} records", )