Files
signal-platform/app/routers/ingestion.py
T
dennisthiessen f24e5070ee
Deploy / lint (push) Successful in 6s
Deploy / test (push) Successful in 35s
Deploy / deploy (push) Successful in 23s
fix bulk fundamentals: rate limits masked by partial FMP success
Root cause of "price plan needed in bulk but fine on manual reload": on free
tiers FMP returns only market cap (the other metrics come back as HTTP 402),
and the chain merged that as a partial success. So when the Finnhub/Alpha
Vantage fallbacks were rate-limited during a bulk run, the chain silently
returned market-cap-only data and the collector's backoff never engaged.
Manual single fetches worked because the fallbacks weren't throttled at that
moment.

Fixes:
- Chain distinguishes RateLimitError from other failures: if a fallback is
  rate-limited and fields are still missing, raise RateLimitError (unless
  allow_partial=True) so the collector backs off and retries.
- Bulk job paces requests (fundamental_request_spacing_seconds, default 3s) to
  stay under Finnhub's ~60 requests/min limit, and when retries are exhausted
  it stores the partial data and continues instead of aborting the whole run.
- Manual fetch passes allow_partial=True so a lone 429 doesn't fail the refresh.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-14 21:18:32 +02:00

238 lines
9.6 KiB
Python

"""Ingestion router: trigger data fetches from the market data provider.
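
``POST /ingestion/fetch/{symbol}`` fetches OHLCV, sentiment and fundamentals
for one ticker in a single call, or just the subset named in ``sources``
(e.g. ``sources=ohlcv``). It then always recomputes the derived pipeline
(S/R levels, scores, scanner) and reports status per source.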
"""
from __future__ import annotations

import logging
from datetime import date

from fastapi import APIRouter, Depends, Query
from sqlalchemy import delete, select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import settings
from app.dependencies import get_db, require_access
from app.exceptions import ProviderError
from app.models.ohlcv import OHLCVRecord
from app.models.settings import IngestionProgress
from app.models.ticker import Ticker
from app.models.user import User
from app.providers.alpaca import AlpacaOHLCVProvider
from app.providers.fundamentals_chain import build_fundamental_provider_chain
from app.schemas.common import APIEnvelope
from app.services import (
    fundamental_service,
    ingestion_service,
    scoring_service,
    sentiment_service,
    sr_service,
)
from app.services.rr_scanner_service import scan_ticker
from app.services.sentiment_provider_service import build_sentiment_provider
logger = logging.getLogger(__name__)
router = APIRouter(tags=["ingestion"])

# Provider-backed sources a caller can select via the ``sources`` query
# parameter. The derived pipeline (S/R, scores, scanner) is not listed here
# because the fetch endpoint always runs it. A frozenset keeps this shared
# constant immutable; ``set(...)`` copies and ``parts & ...`` intersections
# with it still produce plain, caller-owned sets.
_PROVIDER_SOURCES: frozenset[str] = frozenset({"ohlcv", "sentiment", "fundamentals"})
def _get_provider() -> AlpacaOHLCVProvider:
    """Create an Alpaca OHLCV provider from the configured credentials.

    Raises:
        ProviderError: when the Alpaca API key or secret is missing/empty.
    """
    api_key = settings.alpaca_api_key
    api_secret = settings.alpaca_api_secret
    if api_key and api_secret:
        return AlpacaOHLCVProvider(api_key, api_secret)
    raise ProviderError("Alpaca API credentials not configured")
def _parse_requested_sources(sources: str | None) -> set[str]:
    """Resolve the ``sources`` query value into the provider sources to fetch.

    ``None``, or any comma list containing ``all`` (case-insensitive), selects
    every entry of ``_PROVIDER_SOURCES``. Otherwise each comma-separated token
    is trimmed and lower-cased, blanks are ignored, and only names present in
    ``_PROVIDER_SOURCES`` are kept (unknown names are dropped). An empty result
    means no provider is called and only the free derived pipeline refreshes.
    """
    if sources is not None:
        wanted: set[str] = set()
        for token in sources.split(","):
            trimmed = token.strip()
            if trimmed:
                wanted.add(trimmed.lower())
        if "all" not in wanted:
            return {name for name in wanted if name in _PROVIDER_SOURCES}
    return set(_PROVIDER_SOURCES)
async def _rollback_quietly(db: AsyncSession) -> None:
    """Roll back ``db``; a failing rollback is logged instead of raised."""
    try:
        await db.rollback()
    except Exception:
        logger.exception("Session rollback failed")


async def _recover_from_db_error(db: AsyncSession, exc: Exception) -> None:
    """Roll back ``db`` if ``exc`` is a database error so later steps can run.

    After a failed statement or flush, the transaction can't be used until it
    is rolled back (e.g. PostgreSQL aborts it, and SQLAlchemy raises
    ``PendingRollbackError`` after a failed flush). Without this, every later
    step of the request would fail as well. Non-database errors leave the
    session untouched, so pending work from earlier steps is still left for
    the scoring step's ``db.commit()``, exactly as before.
    """
    if isinstance(exc, SQLAlchemyError):
        await _rollback_quietly(db)


async def _clear_ohlcv_history(db: AsyncSession, symbol: str) -> str | None:
    """Delete stored OHLCV rows and ingestion progress for ``symbol``.

    Clearing both lets the backfill logic pull a full year of fresh
    split-adjusted data. The deletes are committed together. On any failure
    the session is rolled back so the OHLCV and progress rows stay
    consistent.

    Returns:
        ``None`` on success (or when the ticker is unknown), otherwise the
        error message to report.
    """
    try:
        result = await db.execute(select(Ticker).where(Ticker.symbol == symbol))
        ticker_obj = result.scalar_one_or_none()
        if ticker_obj is None:
            return None
        await db.execute(delete(OHLCVRecord).where(OHLCVRecord.ticker_id == ticker_obj.id))
        await db.execute(delete(IngestionProgress).where(IngestionProgress.ticker_id == ticker_obj.id))
        await db.commit()
    except Exception as exc:
        logger.exception("force_refetch cleanup failed for %s: %s", symbol, exc)
        # Roll back unconditionally. This runs before any other step of the
        # request, and a half-applied cleanup (OHLCV deleted, progress kept)
        # must not be committed later by the scoring step.
        await _rollback_quietly(db)
        return str(exc)
    logger.info("force_refetch: cleared OHLCV and progress for %s", symbol)
    return None


async def _fetch_ohlcv(
    db: AsyncSession,
    symbol: str,
    start_date: date | None,
    end_date: date | None,
) -> dict:
    """Fetch and ingest OHLCV bars via the Alpaca provider; report the outcome."""
    try:
        provider = _get_provider()
        result = await ingestion_service.fetch_and_ingest(
            db, provider, symbol, start_date, end_date
        )
        return {
            "status": "ok" if result.status in ("complete", "partial") else "error",
            "records": result.records_ingested,
            "message": result.message,
        }
    except Exception as exc:
        logger.exception("OHLCV fetch failed for %s: %s", symbol, exc)
        await _recover_from_db_error(db, exc)
        return {"status": "error", "records": 0, "message": str(exc)}


async def _fetch_sentiment(db: AsyncSession, symbol: str) -> dict:
    """Fetch sentiment from the configured provider and store it.

    A ``ProviderError`` while building the provider is reported as
    ``"skipped"``. Any other failure, including one while building the
    provider, is reported as ``"error"`` instead of escaping the endpoint.
    """
    try:
        sent_provider = await build_sentiment_provider(db)
    except ProviderError as exc:
        return {"status": "skipped", "message": str(exc)}
    except Exception as exc:
        logger.exception("Sentiment provider setup failed for %s: %s", symbol, exc)
        await _recover_from_db_error(db, exc)
        return {"status": "error", "message": str(exc)}
    try:
        data = await sent_provider.fetch_sentiment(symbol)
        await sentiment_service.store_sentiment(
            db,
            symbol=symbol,
            classification=data.classification,
            confidence=data.confidence,
            source=data.source,
            timestamp=data.timestamp,
            reasoning=data.reasoning,
            citations=data.citations,
        )
        return {
            "status": "ok",
            "classification": data.classification,
            "confidence": data.confidence,
            "message": None,
        }
    except Exception as exc:
        logger.exception("Sentiment fetch failed for %s: %s", symbol, exc)
        await _recover_from_db_error(db, exc)
        return {"status": "error", "message": str(exc)}


async def _fetch_fundamentals(db: AsyncSession, symbol: str) -> dict:
    """Fetch fundamentals through the provider chain and store them.

    Skipped when none of the FMP / Finnhub / Alpha Vantage keys is set.
    """
    if not (settings.fmp_api_key or settings.finnhub_api_key or settings.alpha_vantage_api_key):
        return {
            "status": "skipped",
            "message": "No fundamentals provider key configured",
        }
    try:
        fundamentals_provider = build_fundamental_provider_chain()
        # Manual single fetch: take whatever we can get (a lone 429 on a
        # fallback shouldn't fail the whole refresh).
        fdata = await fundamentals_provider.fetch_fundamentals(symbol, allow_partial=True)
        await fundamental_service.store_fundamental(
            db,
            symbol=symbol,
            pe_ratio=fdata.pe_ratio,
            revenue_growth=fdata.revenue_growth,
            earnings_surprise=fdata.earnings_surprise,
            market_cap=fdata.market_cap,
            unavailable_fields=fdata.unavailable_fields,
        )
        return {"status": "ok", "message": None}
    except Exception as exc:
        logger.exception("Fundamentals fetch failed for %s: %s", symbol, exc)
        await _recover_from_db_error(db, exc)
        return {"status": "error", "message": str(exc)}


async def _recalculate_sr_levels(db: AsyncSession, symbol: str) -> dict:
    """Recompute support/resistance levels (free, always runs)."""
    try:
        levels = await sr_service.recalculate_sr_levels(db, symbol)
        return {"status": "ok", "count": len(levels), "message": None}
    except Exception as exc:
        logger.exception("S/R recalc failed for %s: %s", symbol, exc)
        await _recover_from_db_error(db, exc)
        return {"status": "error", "message": str(exc)}


async def _recompute_scores(db: AsyncSession, symbol: str) -> dict:
    """Force a full score recompute, commit, and report the composite score.

    The ``db.commit()`` here also commits any pending work from earlier steps.
    """
    # Force a full recompute — fetched data doesn't mark old scores stale, so
    # get_score alone would keep returning the previously computed values.
    try:
        await scoring_service.compute_all_dimensions(db, symbol)
        await scoring_service.compute_composite_score(db, symbol)
        await db.commit()
        score_payload = await scoring_service.get_score(db, symbol)
        return {
            "status": "ok",
            "composite_score": score_payload.get("composite_score"),
            "missing_dimensions": score_payload.get("missing_dimensions", []),
            "message": None,
        }
    except Exception as exc:
        logger.exception("Score recompute failed for %s: %s", symbol, exc)
        await _recover_from_db_error(db, exc)
        return {"status": "error", "message": str(exc)}


async def _run_scanner(db: AsyncSession, symbol: str) -> dict:
    """Run the R:R setup scanner with the default threshold (free, always runs)."""
    try:
        setups = await scan_ticker(
            db,
            symbol,
            rr_threshold=settings.default_rr_threshold,
        )
        return {"status": "ok", "setups_found": len(setups), "message": None}
    except Exception as exc:
        logger.exception("Scanner run failed for %s: %s", symbol, exc)
        await _recover_from_db_error(db, exc)
        return {"status": "error", "message": str(exc)}


@router.post("/ingestion/fetch/{symbol}", response_model=APIEnvelope)
async def fetch_symbol(
    symbol: str,
    start_date: date | None = Query(None, description="Start date (YYYY-MM-DD)"),
    end_date: date | None = Query(None, description="End date (YYYY-MM-DD)"),
    force_refetch: bool = Query(False, description="Delete existing OHLCV data and re-fetch split-adjusted history"),
    sources: str | None = Query(
        None,
        description="Comma list of provider sources to fetch (ohlcv,sentiment,fundamentals). "
        "Omit for all. The derived pipeline (S/R, scores, scanner) always recomputes — it's free.",
    ),
    _user: User = Depends(require_access),
    db: AsyncSession = Depends(get_db),
):
    """Fetch selected data sources for a ticker, then recompute derived data.

    Provider calls (which may cost money/quota — sentiment especially) are
    limited to ``sources``. The free derived pipeline (S/R, scores, scanner)
    always runs so everything stays consistent.

    Each step reports into ``data["sources"]`` rather than raising, so the
    envelope status is always ``"success"``. A step that fails with a database
    error rolls the session back, so the remaining steps still get a usable
    session. If the ``force_refetch`` cleanup fails, a ``force_refetch`` entry
    with ``status="error"`` is added, and the OHLCV fetch still runs against
    the (rolled-back, still existing) history.
    """
    symbol_upper = symbol.strip().upper()
    requested = _parse_requested_sources(sources)
    sources_out: dict[str, dict] = {}

    # Clear old OHLCV and ingestion progress first so the backfill logic
    # pulls fresh split-adjusted history.
    if force_refetch and "ohlcv" in requested:
        cleanup_error = await _clear_ohlcv_history(db, symbol_upper)
        if cleanup_error is not None:
            sources_out["force_refetch"] = {"status": "error", "message": cleanup_error}

    # --- Provider sources (may cost money/quota) ---
    if "ohlcv" in requested:
        sources_out["ohlcv"] = await _fetch_ohlcv(db, symbol_upper, start_date, end_date)
    if "sentiment" in requested:
        sources_out["sentiment"] = await _fetch_sentiment(db, symbol_upper)
    if "fundamentals" in requested:
        sources_out["fundamentals"] = await _fetch_fundamentals(db, symbol_upper)

    # --- Derived pipeline (free, always) ---
    sources_out["sr_levels"] = await _recalculate_sr_levels(db, symbol_upper)
    sources_out["scores"] = await _recompute_scores(db, symbol_upper)
    sources_out["scanner"] = await _run_scanner(db, symbol_upper)

    # Always return success — per-source breakdown tells the full story
    return APIEnvelope(
        status="success",
        data={"symbol": symbol_upper, "sources": sources_out},
        error=None,
    )