diff --git a/app/main.py b/app/main.py index 612e24b..62a2822 100644 --- a/app/main.py +++ b/app/main.py @@ -82,6 +82,7 @@ from app.routers.watchlist import router as watchlist_router from app.routers.sentiment import router as sentiment_router from app.routers.sr_levels import router as sr_levels_router from app.routers.tickers import router as tickers_router +from app.routers.jobs import router as jobs_router def _configure_logging() -> None: @@ -158,3 +159,4 @@ app.include_router(fundamentals_router, prefix="/api/v1") app.include_router(scores_router, prefix="/api/v1") app.include_router(trades_router, prefix="/api/v1") app.include_router(watchlist_router, prefix="/api/v1") +app.include_router(jobs_router, prefix="/api/v1") diff --git a/app/routers/ingestion.py b/app/routers/ingestion.py index 56426fc..304f2f6 100644 --- a/app/routers/ingestion.py +++ b/app/routers/ingestion.py @@ -39,6 +39,9 @@ logger = logging.getLogger(__name__) router = APIRouter(tags=["ingestion"]) +_PROVIDER_SOURCES = {"ohlcv", "sentiment", "fundamentals"} + + def _get_provider() -> AlpacaOHLCVProvider: """Build the OHLCV provider from current settings.""" if not settings.alpaca_api_key or not settings.alpaca_api_secret: @@ -46,26 +49,47 @@ def _get_provider() -> AlpacaOHLCVProvider: return AlpacaOHLCVProvider(settings.alpaca_api_key, settings.alpaca_api_secret) +def _parse_requested_sources(sources: str | None) -> set[str]: + """Which provider sources to fetch. None/'all' → every provider source. + + Anything else is parsed as a comma list; an empty/recompute-only request + fetches no providers (just refreshes the free derived pipeline). + """ + if sources is None: + return set(_PROVIDER_SOURCES) + parts = {p.strip().lower() for p in sources.split(",") if p.strip()} + if "all" in parts: + return set(_PROVIDER_SOURCES) + return parts & _PROVIDER_SOURCES + + @router.post("/ingestion/fetch/{symbol}", response_model=APIEnvelope) async def fetch_symbol( symbol: str, start_date: date | None = Query(None, description="Start date (YYYY-MM-DD)"), end_date: date | None = Query(None, description="End date (YYYY-MM-DD)"), force_refetch: bool = Query(False, description="Delete existing OHLCV data and re-fetch split-adjusted history"), + sources: str | None = Query( + None, + description="Comma list of provider sources to fetch (ohlcv,sentiment,fundamentals). " + "Omit for all. The derived pipeline (S/R, scores, scanner) always recomputes — it's free.", + ), _user: User = Depends(require_access), db: AsyncSession = Depends(get_db), ): - """Fetch all data sources for a ticker: OHLCV, sentiment, and fundamentals. + """Fetch selected data sources for a ticker, then recompute derived data. - Returns a per-source breakdown so the frontend can show exactly what - succeeded and what failed. + Provider calls (which may cost money/quota — sentiment especially) are + limited to ``sources``; the free derived pipeline (S/R, scores, scanner) + always runs so everything stays consistent. Returns a per-source breakdown. """ symbol_upper = symbol.strip().upper() - sources: dict[str, dict] = {} + requested = _parse_requested_sources(sources) + sources_out: dict[str, dict] = {} # If force_refetch is requested, clear old OHLCV and ingestion progress # so the backfill logic pulls a full year of fresh split-adjusted data. - if force_refetch: + if force_refetch and "ohlcv" in requested: try: result = await db.execute( select(Ticker).where(Ticker.symbol == symbol_upper) @@ -84,90 +108,98 @@ async def fetch_symbol( logger.error("force_refetch cleanup failed for %s: %s", symbol_upper, exc) # --- OHLCV --- - try: - provider = _get_provider() - result = await ingestion_service.fetch_and_ingest( - db, provider, symbol_upper, start_date, end_date - ) - sources["ohlcv"] = { - "status": "ok" if result.status in ("complete", "partial") else "error", - "records": result.records_ingested, - "message": result.message, - } - except Exception as exc: - logger.error("OHLCV fetch failed for %s: %s", symbol_upper, exc) - sources["ohlcv"] = {"status": "error", "records": 0, "message": str(exc)} - - # --- Sentiment --- - try: - sent_provider = await build_sentiment_provider(db) - except ProviderError as exc: - sent_provider = None - sources["sentiment"] = {"status": "skipped", "message": str(exc)} - - if sent_provider is not None: + if "ohlcv" in requested: try: - data = await sent_provider.fetch_sentiment(symbol_upper) - await sentiment_service.store_sentiment( - db, - symbol=symbol_upper, - classification=data.classification, - confidence=data.confidence, - source=data.source, - timestamp=data.timestamp, - reasoning=data.reasoning, - citations=data.citations, + provider = _get_provider() + result = await ingestion_service.fetch_and_ingest( + db, provider, symbol_upper, start_date, end_date ) - sources["sentiment"] = { - "status": "ok", - "classification": data.classification, - "confidence": data.confidence, - "message": None, + sources_out["ohlcv"] = { + "status": "ok" if result.status in ("complete", "partial") else "error", + "records": result.records_ingested, + "message": result.message, } except Exception as exc: - logger.error("Sentiment fetch failed for %s: %s", symbol_upper, exc) - sources["sentiment"] = {"status": "error", "message": str(exc)} + logger.error("OHLCV fetch failed for %s: %s", symbol_upper, exc) + sources_out["ohlcv"] = {"status": "error", "records": 0, "message": str(exc)} + + # --- Sentiment --- + if "sentiment" in requested: + try: + sent_provider = await build_sentiment_provider(db) + except ProviderError as exc: + sent_provider = None + sources_out["sentiment"] = {"status": "skipped", "message": str(exc)} + + if sent_provider is not None: + try: + data = await sent_provider.fetch_sentiment(symbol_upper) + await sentiment_service.store_sentiment( + db, + symbol=symbol_upper, + classification=data.classification, + confidence=data.confidence, + source=data.source, + timestamp=data.timestamp, + reasoning=data.reasoning, + citations=data.citations, + ) + sources_out["sentiment"] = { + "status": "ok", + "classification": data.classification, + "confidence": data.confidence, + "message": None, + } + except Exception as exc: + logger.error("Sentiment fetch failed for %s: %s", symbol_upper, exc) + sources_out["sentiment"] = {"status": "error", "message": str(exc)} # --- Fundamentals --- - if settings.fmp_api_key or settings.finnhub_api_key or settings.alpha_vantage_api_key: - try: - fundamentals_provider = build_fundamental_provider_chain() - fdata = await fundamentals_provider.fetch_fundamentals(symbol_upper) - await fundamental_service.store_fundamental( - db, - symbol=symbol_upper, - pe_ratio=fdata.pe_ratio, - revenue_growth=fdata.revenue_growth, - earnings_surprise=fdata.earnings_surprise, - market_cap=fdata.market_cap, - unavailable_fields=fdata.unavailable_fields, - ) - sources["fundamentals"] = {"status": "ok", "message": None} - except Exception as exc: - logger.error("Fundamentals fetch failed for %s: %s", symbol_upper, exc) - sources["fundamentals"] = {"status": "error", "message": str(exc)} - else: - sources["fundamentals"] = { - "status": "skipped", - "message": "No fundamentals provider key configured", - } + if "fundamentals" in requested: + if settings.fmp_api_key or settings.finnhub_api_key or settings.alpha_vantage_api_key: + try: + fundamentals_provider = build_fundamental_provider_chain() + fdata = await fundamentals_provider.fetch_fundamentals(symbol_upper) + await fundamental_service.store_fundamental( + db, + symbol=symbol_upper, + pe_ratio=fdata.pe_ratio, + revenue_growth=fdata.revenue_growth, + earnings_surprise=fdata.earnings_surprise, + market_cap=fdata.market_cap, + unavailable_fields=fdata.unavailable_fields, + ) + sources_out["fundamentals"] = {"status": "ok", "message": None} + except Exception as exc: + logger.error("Fundamentals fetch failed for %s: %s", symbol_upper, exc) + sources_out["fundamentals"] = {"status": "error", "message": str(exc)} + else: + sources_out["fundamentals"] = { + "status": "skipped", + "message": "No fundamentals provider key configured", + } - # --- Derived pipeline: S/R levels --- + # --- Derived pipeline: S/R levels (free, always) --- try: levels = await sr_service.recalculate_sr_levels(db, symbol_upper) - sources["sr_levels"] = { + sources_out["sr_levels"] = { "status": "ok", "count": len(levels), "message": None, } except Exception as exc: logger.error("S/R recalc failed for %s: %s", symbol_upper, exc) - sources["sr_levels"] = {"status": "error", "message": str(exc)} + sources_out["sr_levels"] = {"status": "error", "message": str(exc)} - # --- Derived pipeline: scores --- + # --- Derived pipeline: scores (free, always) --- + # Force a full recompute — fetched data doesn't mark old scores stale, so + # get_score alone would keep returning the previously computed values. try: + await scoring_service.compute_all_dimensions(db, symbol_upper) + await scoring_service.compute_composite_score(db, symbol_upper) + await db.commit() score_payload = await scoring_service.get_score(db, symbol_upper) - sources["scores"] = { + sources_out["scores"] = { "status": "ok", "composite_score": score_payload.get("composite_score"), "missing_dimensions": score_payload.get("missing_dimensions", []), @@ -175,27 +207,27 @@ async def fetch_symbol( } except Exception as exc: logger.error("Score recompute failed for %s: %s", symbol_upper, exc) - sources["scores"] = {"status": "error", "message": str(exc)} + sources_out["scores"] = {"status": "error", "message": str(exc)} - # --- Derived pipeline: scanner --- + # --- Derived pipeline: scanner (free, always) --- try: setups = await scan_ticker( db, symbol_upper, rr_threshold=settings.default_rr_threshold, ) - sources["scanner"] = { + sources_out["scanner"] = { "status": "ok", "setups_found": len(setups), "message": None, } except Exception as exc: logger.error("Scanner run failed for %s: %s", symbol_upper, exc) - sources["scanner"] = {"status": "error", "message": str(exc)} + sources_out["scanner"] = {"status": "error", "message": str(exc)} # Always return success — per-source breakdown tells the full story return APIEnvelope( status="success", - data={"symbol": symbol_upper, "sources": sources}, + data={"symbol": symbol_upper, "sources": sources_out}, error=None, ) diff --git a/app/routers/jobs.py b/app/routers/jobs.py new file mode 100644 index 0000000..adc141c --- /dev/null +++ b/app/routers/jobs.py @@ -0,0 +1,37 @@ +"""Lightweight job status for any authenticated user. + +The admin Jobs page has full control; this exposes only which scheduled jobs +are currently running (name + progress) so the UI can show a live activity +indicator without admin rights. +""" + +from __future__ import annotations + +from fastapi import APIRouter, Depends + +from app.dependencies import require_access +from app.schemas.common import APIEnvelope +from app.services.admin_service import JOB_LABELS + +router = APIRouter(tags=["jobs"]) + + +@router.get("/jobs/running", response_model=APIEnvelope) +async def list_running_jobs(_user=Depends(require_access)) -> APIEnvelope: + """Return scheduled jobs that are currently running, with progress.""" + from app.scheduler import get_job_runtime_snapshot + + snapshot = get_job_runtime_snapshot() + running = [] + for name, meta in snapshot.items(): + if meta.get("running"): + running.append({ + "name": name, + "label": JOB_LABELS.get(name, name), + "progress_pct": meta.get("progress_pct"), + "processed": meta.get("processed"), + "total": meta.get("total"), + "current_ticker": meta.get("current_ticker"), + }) + running.sort(key=lambda j: j["name"]) + return APIEnvelope(status="success", data={"running": running}) diff --git a/app/services/rr_scanner_service.py b/app/services/rr_scanner_service.py index 98d9bd1..95e3a83 100644 --- a/app/services/rr_scanner_service.py +++ b/app/services/rr_scanner_service.py @@ -244,6 +244,18 @@ async def scan_all_tickers( all_setups: list[TradeSetup] = [] for ticker in tickers: try: + # Refresh scores first so the scheduled scan works off current data. + # Nothing else marks scores stale, so without this they'd never + # update for tickers the user doesn't manually fetch. + try: + from app.services import scoring_service + + await scoring_service.compute_all_dimensions(db, ticker.symbol) + await scoring_service.compute_composite_score(db, ticker.symbol) + await db.commit() + except Exception: + logger.exception("Error refreshing scores for %s", ticker.symbol) + setups = await scan_ticker( db, ticker.symbol, rr_threshold, atr_multiplier ) diff --git a/frontend/src/api/ingestion.ts b/frontend/src/api/ingestion.ts index 1878e9e..6fc6410 100644 --- a/frontend/src/api/ingestion.ts +++ b/frontend/src/api/ingestion.ts @@ -13,8 +13,17 @@ export interface FetchDataResult { sources: Record; } -export function fetchData(symbol: string) { +/** Provider sources that cost an API call/quota. */ +export type FetchSource = 'ohlcv' | 'sentiment' | 'fundamentals'; +/** Source selector: omit → fetch all; array → those providers; 'recompute' → derived only (free). */ +export type FetchSelector = FetchSource[] | 'recompute'; + +export function fetchData(symbol: string, sources?: FetchSelector) { + let sourcesParam: string | undefined; + if (sources === 'recompute') sourcesParam = 'recompute'; + else if (sources && sources.length) sourcesParam = sources.join(','); + const params = sourcesParam ? { sources: sourcesParam } : undefined; return apiClient - .post(`ingestion/fetch/${symbol}`) + .post(`ingestion/fetch/${symbol}`, null, { params }) .then((r) => r.data); } diff --git a/frontend/src/api/jobs.ts b/frontend/src/api/jobs.ts new file mode 100644 index 0000000..e413cf3 --- /dev/null +++ b/frontend/src/api/jobs.ts @@ -0,0 +1,16 @@ +import apiClient from './client'; + +export interface RunningJob { + name: string; + label: string; + progress_pct: number | null; + processed: number | null; + total: number | null; + current_ticker: string | null; +} + +export function getRunningJobs() { + return apiClient + .get<{ running: RunningJob[] }>('jobs/running') + .then((r) => r.data); +} diff --git a/frontend/src/components/layout/Sidebar.tsx b/frontend/src/components/layout/Sidebar.tsx index 858afab..5d7129d 100644 --- a/frontend/src/components/layout/Sidebar.tsx +++ b/frontend/src/components/layout/Sidebar.tsx @@ -2,6 +2,7 @@ import { NavLink } from 'react-router-dom'; import { useQuery } from '@tanstack/react-query'; import { useAuthStore } from '../../stores/authStore'; import { check as healthCheck } from '../../api/health'; +import { getRunningJobs } from '../../api/jobs'; const navItems = [ { to: '/', label: 'Overview', index: '01', end: true }, @@ -28,6 +29,16 @@ export default function Sidebar() { const isBackendUp = health.isSuccess; + const jobs = useQuery({ + queryKey: ['jobs', 'running'], + queryFn: getRunningJobs, + refetchInterval: 10_000, + retry: 1, + enabled: isBackendUp, + }); + + const running = jobs.data?.running ?? []; + return (