"""Ingestion router: trigger data fetches from the market data provider. Provides both a single-source OHLCV endpoint and a comprehensive fetch-all endpoint that collects OHLCV + sentiment + fundamentals in one call with per-source status reporting. """ from __future__ import annotations import logging from datetime import date from fastapi import APIRouter, Depends, Query from sqlalchemy import delete, select from sqlalchemy.ext.asyncio import AsyncSession from app.config import settings from app.dependencies import get_db, require_access from app.exceptions import ProviderError from app.models.ohlcv import OHLCVRecord from app.models.settings import IngestionProgress from app.models.ticker import Ticker from app.models.user import User from app.providers.alpaca import AlpacaOHLCVProvider from app.providers.fundamentals_chain import build_fundamental_provider_chain from app.providers.openai_sentiment import OpenAISentimentProvider from app.services.rr_scanner_service import scan_ticker from app.schemas.common import APIEnvelope from app.services import ( fundamental_service, ingestion_service, scoring_service, sentiment_service, sr_service, ) logger = logging.getLogger(__name__) router = APIRouter(tags=["ingestion"]) def _get_provider() -> AlpacaOHLCVProvider: """Build the OHLCV provider from current settings.""" if not settings.alpaca_api_key or not settings.alpaca_api_secret: raise ProviderError("Alpaca API credentials not configured") return AlpacaOHLCVProvider(settings.alpaca_api_key, settings.alpaca_api_secret) @router.post("/ingestion/fetch/{symbol}", response_model=APIEnvelope) async def fetch_symbol( symbol: str, start_date: date | None = Query(None, description="Start date (YYYY-MM-DD)"), end_date: date | None = Query(None, description="End date (YYYY-MM-DD)"), force_refetch: bool = Query(False, description="Delete existing OHLCV data and re-fetch split-adjusted history"), _user: User = Depends(require_access), db: AsyncSession = Depends(get_db), ): """Fetch all data sources for a ticker: OHLCV, sentiment, and fundamentals. Returns a per-source breakdown so the frontend can show exactly what succeeded and what failed. """ symbol_upper = symbol.strip().upper() sources: dict[str, dict] = {} # If force_refetch is requested, clear old OHLCV and ingestion progress # so the backfill logic pulls a full year of fresh split-adjusted data. if force_refetch: try: result = await db.execute( select(Ticker).where(Ticker.symbol == symbol_upper) ) ticker_obj = result.scalar_one_or_none() if ticker_obj: await db.execute( delete(OHLCVRecord).where(OHLCVRecord.ticker_id == ticker_obj.id) ) await db.execute( delete(IngestionProgress).where(IngestionProgress.ticker_id == ticker_obj.id) ) await db.commit() logger.info("force_refetch: cleared OHLCV and progress for %s", symbol_upper) except Exception as exc: logger.error("force_refetch cleanup failed for %s: %s", symbol_upper, exc) # --- OHLCV --- try: provider = _get_provider() result = await ingestion_service.fetch_and_ingest( db, provider, symbol_upper, start_date, end_date ) sources["ohlcv"] = { "status": "ok" if result.status in ("complete", "partial") else "error", "records": result.records_ingested, "message": result.message, } except Exception as exc: logger.error("OHLCV fetch failed for %s: %s", symbol_upper, exc) sources["ohlcv"] = {"status": "error", "records": 0, "message": str(exc)} # --- Sentiment --- if settings.openai_api_key: try: sent_provider = OpenAISentimentProvider( settings.openai_api_key, settings.openai_model ) data = await sent_provider.fetch_sentiment(symbol_upper) await sentiment_service.store_sentiment( db, symbol=symbol_upper, classification=data.classification, confidence=data.confidence, source=data.source, timestamp=data.timestamp, reasoning=data.reasoning, citations=data.citations, ) sources["sentiment"] = { "status": "ok", "classification": data.classification, "confidence": data.confidence, "message": None, } except Exception as exc: logger.error("Sentiment fetch failed for %s: %s", symbol_upper, exc) sources["sentiment"] = {"status": "error", "message": str(exc)} else: sources["sentiment"] = { "status": "skipped", "message": "OpenAI API key not configured", } # --- Fundamentals --- if settings.fmp_api_key or settings.finnhub_api_key or settings.alpha_vantage_api_key: try: fundamentals_provider = build_fundamental_provider_chain() fdata = await fundamentals_provider.fetch_fundamentals(symbol_upper) await fundamental_service.store_fundamental( db, symbol=symbol_upper, pe_ratio=fdata.pe_ratio, revenue_growth=fdata.revenue_growth, earnings_surprise=fdata.earnings_surprise, market_cap=fdata.market_cap, unavailable_fields=fdata.unavailable_fields, ) sources["fundamentals"] = {"status": "ok", "message": None} except Exception as exc: logger.error("Fundamentals fetch failed for %s: %s", symbol_upper, exc) sources["fundamentals"] = {"status": "error", "message": str(exc)} else: sources["fundamentals"] = { "status": "skipped", "message": "No fundamentals provider key configured", } # --- Derived pipeline: S/R levels --- try: levels = await sr_service.recalculate_sr_levels(db, symbol_upper) sources["sr_levels"] = { "status": "ok", "count": len(levels), "message": None, } except Exception as exc: logger.error("S/R recalc failed for %s: %s", symbol_upper, exc) sources["sr_levels"] = {"status": "error", "message": str(exc)} # --- Derived pipeline: scores --- try: score_payload = await scoring_service.get_score(db, symbol_upper) sources["scores"] = { "status": "ok", "composite_score": score_payload.get("composite_score"), "missing_dimensions": score_payload.get("missing_dimensions", []), "message": None, } except Exception as exc: logger.error("Score recompute failed for %s: %s", symbol_upper, exc) sources["scores"] = {"status": "error", "message": str(exc)} # --- Derived pipeline: scanner --- try: setups = await scan_ticker( db, symbol_upper, rr_threshold=settings.default_rr_threshold, ) sources["scanner"] = { "status": "ok", "setups_found": len(setups), "message": None, } except Exception as exc: logger.error("Scanner run failed for %s: %s", symbol_upper, exc) sources["scanner"] = {"status": "error", "message": str(exc)} # Always return success — per-source breakdown tells the full story return APIEnvelope( status="success", data={"symbol": symbol_upper, "sources": sources}, error=None, )