Files
signal-platform/app/services/ingestion_service.py
T
dennisthiessen 20a1c143f3
Deploy / lint (push) Successful in 8s
Deploy / test (push) Successful in 1m25s
Deploy / deploy (push) Successful in 46s
fix: surface empty OHLCV fetch as a warning, not success
Fetching a symbol the provider doesn't cover (e.g. RHM/Rheinmetall — Alpaca
serves US listings only) returned 0 bars but reported "complete · Successfully
ingested 0 records", which the UI showed as green success.

fetch_and_ingest now returns a distinct `no_data` status when the provider
returns nothing AND the ticker has no history (vs. "already up to date" when bars
exist). The fetch endpoint maps it to a `warning` source status, and the fetch
toast renders it as ⚠ with the provider message instead of success.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-28 19:27:41 +02:00

217 lines
7.3 KiB
Python

"""Ingestion Pipeline service: fetch from provider, validate, upsert into Price Store.
Handles rate-limit resume via IngestionProgress and provider error isolation.
"""
from __future__ import annotations

import logging
from dataclasses import dataclass
from datetime import date, timedelta
from typing import Literal

from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import settings
from app.exceptions import NotFoundError, ProviderError, RateLimitError
from app.models.ohlcv import OHLCVRecord
from app.models.settings import IngestionProgress
from app.models.ticker import Ticker
from app.providers.protocol import MarketDataProvider
from app.services import price_service
# Module logger; fetch_and_ingest uses it for provider-error and rate-limit messages.
logger = logging.getLogger(__name__)
# Closed set of outcomes an ingestion run can report.
IngestionStatus = Literal["complete", "partial", "error", "no_data"]


@dataclass
class IngestionResult:
    """Result of one ingestion run for a single ticker.

    Attributes:
        symbol: Ticker symbol the run was for.
        records_ingested: Number of bars upserted during this run.
        last_date: Date of the last bar upserted, or None if none were.
        status: One of ``IngestionStatus``:
            "complete" | "partial" | "error" | "no_data".
        message: Optional human-readable detail for the caller.
    """

    symbol: str
    records_ingested: int
    last_date: date | None
    status: IngestionStatus
    message: str | None = None
async def _get_ticker(db: AsyncSession, symbol: str) -> Ticker:
    """Load the Ticker row for *symbol* after trimming whitespace and upper-casing it.

    Raises:
        NotFoundError: when no ticker with the normalised symbol exists.
    """
    wanted = symbol.strip().upper()
    query = select(Ticker).where(Ticker.symbol == wanted)
    found = (await db.execute(query)).scalar_one_or_none()
    if found is not None:
        return found
    raise NotFoundError(f"Ticker not found: {wanted}")
async def _get_progress(db: AsyncSession, ticker_id: int) -> IngestionProgress | None:
    """Return the stored IngestionProgress row for *ticker_id*, or None if there is none."""
    stmt = select(IngestionProgress).where(IngestionProgress.ticker_id == ticker_id)
    rows = await db.execute(stmt)
    return rows.scalar_one_or_none()
async def _get_ohlcv_bar_count(db: AsyncSession, ticker_id: int) -> int:
    """Count the OHLCV bars stored for *ticker_id* (0 when the query yields nothing)."""
    stmt = (
        select(func.count())
        .select_from(OHLCVRecord)
        .where(OHLCVRecord.ticker_id == ticker_id)
    )
    total = (await db.execute(stmt)).scalar()
    return int(total) if total else 0
async def _update_progress(
    db: AsyncSession, ticker_id: int, last_date: date
) -> None:
    """Record *last_date* as the ticker's last ingested date and commit.

    Creates the IngestionProgress row on first use; otherwise overwrites its
    ``last_ingested_date``. The session is committed in both cases.

    NOTE(review): the marker is overwritten unconditionally, so passing a date
    older than the stored one moves it backwards — confirm that is intended
    for explicit historical backfills.
    """
    existing = await _get_progress(db, ticker_id)
    if existing is not None:
        existing.last_ingested_date = last_date
    else:
        db.add(IngestionProgress(ticker_id=ticker_id, last_ingested_date=last_date))
    await db.commit()
# A ticker with fewer stored bars than this is treated as under-populated: the
# whole configured history window is re-fetched even if an IngestionProgress
# marker exists (upserts make the re-fetch idempotent).
_MINIMUM_BACKFILL_BARS = 200


async def _resolve_start_date(db: AsyncSession, ticker_id: int, end_date: date) -> date:
    """Choose the first date to fetch when the caller did not supply one.

    Resumes from the day after ``IngestionProgress.last_ingested_date`` when the
    ticker has at least ``_MINIMUM_BACKFILL_BARS`` stored bars and a progress
    marker; otherwise backfills ``settings.ohlcv_history_days`` days ending at
    *end_date*.
    """
    bar_count = await _get_ohlcv_bar_count(db, ticker_id)
    if bar_count >= _MINIMUM_BACKFILL_BARS:
        progress = await _get_progress(db, ticker_id)
        if progress is not None:
            return progress.last_ingested_date + timedelta(days=1)
    return end_date - timedelta(days=settings.ohlcv_history_days)


async def fetch_and_ingest(
    db: AsyncSession,
    provider: MarketDataProvider,
    symbol: str,
    start_date: date | None = None,
    end_date: date | None = None,
) -> IngestionResult:
    """Fetch OHLCV data from *provider* and upsert it into the Price Store.

    - ``end_date`` defaults to today.
    - ``start_date`` defaults to a resume point or a history backfill (see
      ``_resolve_start_date``). An explicit ``start_date`` (e.g. from the
      manual deep-backfill job) bypasses that logic entirely.
    - Records are upserted oldest-first, and ``IngestionProgress`` is updated
      after each successful upsert.

    Returned ``IngestionResult.status``:
    - ``"complete"``: every returned bar was ingested, or there was nothing new
      (start after end, or an empty response for a ticker that already has bars).
    - ``"partial"``: a ``RateLimitError`` interrupted the run;
      ``records_ingested`` / ``last_date`` report what was written before it.
    - ``"error"``: the provider raised ``ProviderError``; nothing was written.
    - ``"no_data"``: the provider returned no bars and the ticker has no stored
      history, which usually means the provider does not cover the symbol.

    Raises:
        NotFoundError: if *symbol* is not a known ticker.
    """
    ticker = await _get_ticker(db, symbol)

    if end_date is None:
        end_date = date.today()
    if start_date is None:
        start_date = await _resolve_start_date(db, ticker.id, end_date)

    # Empty window, e.g. the progress marker already reaches end_date.
    if start_date > end_date:
        return IngestionResult(
            symbol=ticker.symbol,
            records_ingested=0,
            last_date=None,
            status="complete",
            message="Already up to date",
        )

    try:
        records = await provider.fetch_ohlcv(ticker.symbol, start_date, end_date)
    except RateLimitError:
        logger.warning("Rate limited before fetching any records for %s", ticker.symbol)
        return IngestionResult(
            symbol=ticker.symbol,
            records_ingested=0,
            last_date=None,
            status="partial",
            message="Rate limited before any records fetched. Resume available.",
        )
    except ProviderError as exc:
        logger.error("Provider error for %s: %s", ticker.symbol, exc)
        return IngestionResult(
            symbol=ticker.symbol,
            records_ingested=0,
            last_date=None,
            status="error",
            message=str(exc),
        )

    # Provider returned nothing. With no stored history this almost always means
    # the provider does not cover the symbol, so report that instead of a
    # misleading success. With existing bars it just means there were no new
    # bars in the requested window.
    if not records:
        if await _get_ohlcv_bar_count(db, ticker.id) == 0:
            # NOTE(review): the message names Alpaca, but any MarketDataProvider
            # may be passed in — confirm this is still accurate if providers change.
            return IngestionResult(
                symbol=ticker.symbol,
                records_ingested=0,
                last_date=None,
                status="no_data",
                message=(
                    "No data returned by the provider — it may not cover this symbol "
                    "(Alpaca serves US-listed securities only)."
                ),
            )
        return IngestionResult(
            symbol=ticker.symbol,
            records_ingested=0,
            last_date=None,
            status="complete",
            message="Already up to date — no new bars.",
        )

    ingested_count = 0
    last_ingested: date | None = None
    # sorted() leaves the provider's sequence untouched (and accepts non-list
    # sequences); oldest-first keeps the progress marker moving forward.
    for record in sorted(records, key=lambda r: r.date):
        try:
            await price_service.upsert_ohlcv(
                db,
                symbol=ticker.symbol,
                record_date=record.date,
                open_=record.open,
                high=record.high,
                low=record.low,
                close=record.close,
                volume=record.volume,
            )
        except RateLimitError:
            # NOTE(review): upsert_ohlcv is a Price Store write; whether it can
            # actually raise RateLimitError is not visible here — confirm.
            logger.warning(
                "Rate limited during ingestion for %s after %d records",
                ticker.symbol,
                ingested_count,
            )
            return IngestionResult(
                symbol=ticker.symbol,
                records_ingested=ingested_count,
                last_date=last_ingested,
                status="partial",
                message=f"Rate limited. Ingested {ingested_count} records. Resume available.",
            )
        ingested_count += 1
        last_ingested = record.date
        # _update_progress commits, so an interrupted run keeps its progress marker.
        await _update_progress(db, ticker.id, record.date)

    return IngestionResult(
        symbol=ticker.symbol,
        records_ingested=ingested_count,
        last_date=last_ingested,
        status="complete",
        message=f"Successfully ingested {ingested_count} records",
    )