"""APScheduler job definitions and FastAPI lifespan integration. Defines four scheduled jobs: - Data Collector (OHLCV fetch for all tickers) - Sentiment Collector (sentiment for all tickers) - Fundamental Collector (fundamentals for all tickers) - R:R Scanner (trade setup scan for all tickers) Each job processes tickers independently, logs errors as structured JSON, handles rate limits by recording the last successful ticker, and checks SystemSetting for enabled/disabled state. """ from __future__ import annotations import json import logging import asyncio from datetime import date, datetime, timezone from apscheduler.schedulers.asyncio import AsyncIOScheduler from sqlalchemy import case, func, select from sqlalchemy.ext.asyncio import AsyncSession from app.config import settings from app.database import async_session_factory from app.models.fundamental import FundamentalData from app.models.ohlcv import OHLCVRecord from app.models.settings import SystemSetting from app.models.sentiment import SentimentScore from app.models.ticker import Ticker from app.providers.alpaca import AlpacaOHLCVProvider from app.providers.fundamentals_chain import build_fundamental_provider_chain from app.providers.openai_sentiment import OpenAISentimentProvider from app.providers.protocol import SentimentData from app.services import fundamental_service, ingestion_service, sentiment_service from app.services.rr_scanner_service import scan_all_tickers from app.services.ticker_universe_service import bootstrap_universe logger = logging.getLogger(__name__) # Module-level scheduler instance scheduler = AsyncIOScheduler() # Track last successful ticker per job for rate-limit resume _last_successful: dict[str, str | None] = { "data_collector": None, "sentiment_collector": None, "fundamental_collector": None, } _job_runtime: dict[str, dict[str, object]] = { "data_collector": { "running": False, "status": "idle", "processed": 0, "total": None, "progress_pct": None, "current_ticker": None, "started_at": None, "finished_at": None, "message": None, }, "sentiment_collector": { "running": False, "status": "idle", "processed": 0, "total": None, "progress_pct": None, "current_ticker": None, "started_at": None, "finished_at": None, "message": None, }, "fundamental_collector": { "running": False, "status": "idle", "processed": 0, "total": None, "progress_pct": None, "current_ticker": None, "started_at": None, "finished_at": None, "message": None, }, "rr_scanner": { "running": False, "status": "idle", "processed": 0, "total": None, "progress_pct": None, "current_ticker": None, "started_at": None, "finished_at": None, "message": None, }, "ticker_universe_sync": { "running": False, "status": "idle", "processed": 0, "total": None, "progress_pct": None, "current_ticker": None, "started_at": None, "finished_at": None, "message": None, }, } # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _log_job_error(job_name: str, ticker: str, error: Exception) -> None: """Log a job error as structured JSON.""" logger.error( json.dumps({ "event": "job_error", "job": job_name, "ticker": ticker, "error_type": type(error).__name__, "message": str(error), }) ) def _runtime_start(job_name: str, total: int | None = None, message: str | None = None) -> None: now = datetime.now(timezone.utc).isoformat() _job_runtime[job_name] = { "running": True, "status": "running", "processed": 0, "total": total, "progress_pct": 0.0 if total and total > 0 else None, "current_ticker": None, "started_at": now, "finished_at": None, "message": message, } def _runtime_progress( job_name: str, processed: int, total: int | None, current_ticker: str | None = None, message: str | None = None, ) -> None: progress_pct: float | None = None if total and total > 0: progress_pct = round((processed / total) * 100.0, 1) runtime = _job_runtime.get(job_name, {}) runtime.update({ "running": True, "status": "running", "processed": processed, "total": total, "progress_pct": progress_pct, "current_ticker": current_ticker, "message": message, }) _job_runtime[job_name] = runtime def _runtime_finish( job_name: str, status: str, processed: int, total: int | None, message: str | None = None, ) -> None: runtime = _job_runtime.get(job_name, {}) runtime.update({ "running": False, "status": status, "processed": processed, "total": total, "progress_pct": 100.0 if total and processed >= total else runtime.get("progress_pct"), "current_ticker": None, "finished_at": datetime.now(timezone.utc).isoformat(), "message": message, }) _job_runtime[job_name] = runtime def get_job_runtime_snapshot(job_name: str | None = None) -> dict[str, dict[str, object]] | dict[str, object]: if job_name is not None: return dict(_job_runtime.get(job_name, {})) return {name: dict(meta) for name, meta in _job_runtime.items()} async def _is_job_enabled(db: AsyncSession, job_name: str) -> bool: """Check SystemSetting for job enabled state. Defaults to True.""" key = f"job_{job_name}_enabled" result = await db.execute( select(SystemSetting).where(SystemSetting.key == key) ) setting = result.scalar_one_or_none() if setting is None: return True return setting.value.lower() == "true" async def _get_all_tickers(db: AsyncSession) -> list[str]: """Return all tracked ticker symbols sorted alphabetically.""" result = await db.execute(select(Ticker.symbol).order_by(Ticker.symbol)) return list(result.scalars().all()) async def _get_ohlcv_priority_tickers(db: AsyncSession) -> list[str]: """Return symbols prioritized for OHLCV collection. Priority: 1) Tickers with no OHLCV bars 2) Tickers with data, oldest latest OHLCV date first 3) Alphabetical tiebreaker """ latest_date = func.max(OHLCVRecord.date) missing_first = case((latest_date.is_(None), 0), else_=1) result = await db.execute( select(Ticker.symbol) .outerjoin(OHLCVRecord, OHLCVRecord.ticker_id == Ticker.id) .group_by(Ticker.id, Ticker.symbol) .order_by(missing_first.asc(), latest_date.asc(), Ticker.symbol.asc()) ) return list(result.scalars().all()) async def _get_sentiment_priority_tickers(db: AsyncSession) -> list[str]: """Return symbols prioritized for sentiment collection. Priority: 1) Tickers with no sentiment records 2) Tickers with records, oldest latest sentiment timestamp first 3) Alphabetical tiebreaker """ latest_ts = func.max(SentimentScore.timestamp) missing_first = case((latest_ts.is_(None), 0), else_=1) result = await db.execute( select(Ticker.symbol) .outerjoin(SentimentScore, SentimentScore.ticker_id == Ticker.id) .group_by(Ticker.id, Ticker.symbol) .order_by(missing_first.asc(), latest_ts.asc(), Ticker.symbol.asc()) ) return list(result.scalars().all()) async def _get_fundamental_priority_tickers(db: AsyncSession) -> list[str]: """Return symbols prioritized for fundamentals refresh. Priority: 1) Tickers with no fundamentals snapshot yet 2) Tickers with existing fundamentals, oldest fetched_at first 3) Alphabetical tiebreaker """ missing_first = case((FundamentalData.fetched_at.is_(None), 0), else_=1) result = await db.execute( select(Ticker.symbol) .outerjoin(FundamentalData, FundamentalData.ticker_id == Ticker.id) .order_by(missing_first.asc(), FundamentalData.fetched_at.asc(), Ticker.symbol.asc()) ) return list(result.scalars().all()) def _resume_tickers(symbols: list[str], job_name: str) -> list[str]: """Reorder tickers to resume after the last successful one (rate-limit resume). If a previous run was rate-limited, start from the ticker after the last successful one. Otherwise return the full list. """ last = _last_successful.get(job_name) if last is None or last not in symbols: return symbols idx = symbols.index(last) # Start from the next ticker, then wrap around return symbols[idx + 1:] + symbols[:idx + 1] def _chunked(symbols: list[str], chunk_size: int) -> list[list[str]]: size = max(1, chunk_size) return [symbols[i:i + size] for i in range(0, len(symbols), size)] # --------------------------------------------------------------------------- # Job: Data Collector (OHLCV) # --------------------------------------------------------------------------- async def collect_ohlcv() -> None: """Fetch latest daily OHLCV for all tracked tickers. Uses AlpacaOHLCVProvider. Processes each ticker independently. On rate limit, records last successful ticker for resume. Start date is resolved by ingestion progress: - existing ticker: resume from last_ingested_date + 1 - new ticker: backfill ~1 year by default """ job_name = "data_collector" logger.info(json.dumps({"event": "job_start", "job": job_name})) _runtime_start(job_name) processed = 0 total: int | None = None try: async with async_session_factory() as db: if not await _is_job_enabled(db, job_name): logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"})) _runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled") return symbols = await _get_ohlcv_priority_tickers(db) if not symbols: logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": 0})) _runtime_finish(job_name, "completed", processed=0, total=0, message="No tickers") return total = len(symbols) _runtime_progress(job_name, processed=0, total=total) # Build provider (skip if keys not configured) if not settings.alpaca_api_key or not settings.alpaca_api_secret: logger.warning(json.dumps({"event": "job_skipped", "job": job_name, "reason": "alpaca keys not configured"})) _runtime_finish(job_name, "skipped", processed=0, total=total, message="Alpaca keys not configured") return try: provider = AlpacaOHLCVProvider(settings.alpaca_api_key, settings.alpaca_api_secret) except Exception as exc: logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)})) _runtime_finish(job_name, "error", processed=0, total=total, message=str(exc)) return end_date = date.today() for symbol in symbols: _runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol) async with async_session_factory() as db: try: result = await ingestion_service.fetch_and_ingest( db, provider, symbol, start_date=None, end_date=end_date, ) _last_successful[job_name] = symbol processed += 1 _runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol) logger.info(json.dumps({ "event": "ticker_collected", "job": job_name, "ticker": symbol, "status": result.status, "records": result.records_ingested, })) if result.status == "partial": # Rate limited — stop and resume next run logger.warning(json.dumps({ "event": "rate_limited", "job": job_name, "ticker": symbol, "processed": processed, })) _runtime_finish(job_name, "rate_limited", processed=processed, total=total, message=f"Rate limited at {symbol}") return except Exception as exc: _log_job_error(job_name, symbol, exc) # Reset resume pointer on full completion _last_successful[job_name] = None logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": processed})) _runtime_finish(job_name, "completed", processed=processed, total=total, message=f"Processed {processed} tickers") except Exception as exc: logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)})) _runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc)) # --------------------------------------------------------------------------- # Job: Sentiment Collector # --------------------------------------------------------------------------- async def collect_sentiment() -> None: """Fetch sentiment for all tracked tickers via OpenAI. Processes each ticker independently. On rate limit, records last successful ticker for resume. """ job_name = "sentiment_collector" logger.info(json.dumps({"event": "job_start", "job": job_name})) _runtime_start(job_name) processed = 0 total: int | None = None try: async with async_session_factory() as db: if not await _is_job_enabled(db, job_name): logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"})) _runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled") return symbols = await _get_sentiment_priority_tickers(db) if not symbols: logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": 0})) _runtime_finish(job_name, "completed", processed=0, total=0, message="No tickers") return total = len(symbols) _runtime_progress(job_name, processed=0, total=total) if not settings.openai_api_key: logger.warning(json.dumps({"event": "job_skipped", "job": job_name, "reason": "openai key not configured"})) _runtime_finish(job_name, "skipped", processed=0, total=total, message="OpenAI key not configured") return try: provider = OpenAISentimentProvider(settings.openai_api_key, settings.openai_model) except Exception as exc: logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)})) _runtime_finish(job_name, "error", processed=0, total=total, message=str(exc)) return batch_size = max(1, settings.openai_sentiment_batch_size) batches = _chunked(symbols, batch_size) for batch in batches: current_hint = batch[0] if len(batch) == 1 else f"{batch[0]} (+{len(batch) - 1})" _runtime_progress(job_name, processed=processed, total=total, current_ticker=current_hint) batch_results: dict[str, SentimentData] = {} if len(batch) > 1 and hasattr(provider, "fetch_sentiment_batch"): try: batch_results = await provider.fetch_sentiment_batch(batch) except Exception as exc: msg = str(exc).lower() if "rate" in msg or "quota" in msg or "429" in msg: logger.warning(json.dumps({ "event": "rate_limited", "job": job_name, "ticker": batch[0], "processed": processed, })) _runtime_finish(job_name, "rate_limited", processed=processed, total=total, message=f"Rate limited at {batch[0]}") return logger.warning(json.dumps({ "event": "batch_fallback", "job": job_name, "batch": batch, "reason": str(exc), })) for symbol in batch: _runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol) data = batch_results.get(symbol) if batch_results else None if data is None: try: data = await provider.fetch_sentiment(symbol) except Exception as exc: msg = str(exc).lower() if "rate" in msg or "quota" in msg or "429" in msg: logger.warning(json.dumps({ "event": "rate_limited", "job": job_name, "ticker": symbol, "processed": processed, })) _runtime_finish(job_name, "rate_limited", processed=processed, total=total, message=f"Rate limited at {symbol}") return _log_job_error(job_name, symbol, exc) continue async with async_session_factory() as db: try: await sentiment_service.store_sentiment( db, symbol=symbol, classification=data.classification, confidence=data.confidence, source=data.source, timestamp=data.timestamp, reasoning=data.reasoning, citations=data.citations, ) _last_successful[job_name] = symbol processed += 1 _runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol) logger.info(json.dumps({ "event": "ticker_collected", "job": job_name, "ticker": symbol, "classification": data.classification, "confidence": data.confidence, })) except Exception as exc: _log_job_error(job_name, symbol, exc) _last_successful[job_name] = None logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": processed})) _runtime_finish(job_name, "completed", processed=processed, total=total, message=f"Processed {processed} tickers") except Exception as exc: logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)})) _runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc)) # --------------------------------------------------------------------------- # Job: Fundamental Collector # --------------------------------------------------------------------------- async def collect_fundamentals() -> None: """Fetch fundamentals for all tracked tickers via FMP. Processes each ticker independently. On rate limit, records last successful ticker for resume. """ job_name = "fundamental_collector" logger.info(json.dumps({"event": "job_start", "job": job_name})) _runtime_start(job_name) processed = 0 total: int | None = None try: async with async_session_factory() as db: if not await _is_job_enabled(db, job_name): logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"})) _runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled") return symbols = await _get_fundamental_priority_tickers(db) if not symbols: logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": 0})) _runtime_finish(job_name, "completed", processed=0, total=0, message="No tickers") return total = len(symbols) _runtime_progress(job_name, processed=0, total=total) if not (settings.fmp_api_key or settings.finnhub_api_key or settings.alpha_vantage_api_key): logger.warning(json.dumps({"event": "job_skipped", "job": job_name, "reason": "no fundamentals provider keys configured"})) _runtime_finish(job_name, "skipped", processed=0, total=total, message="No fundamentals provider keys configured") return try: provider = build_fundamental_provider_chain() except Exception as exc: logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)})) _runtime_finish(job_name, "error", processed=0, total=total, message=str(exc)) return max_retries = max(0, settings.fundamental_rate_limit_retries) base_backoff = max(1, settings.fundamental_rate_limit_backoff_seconds) for symbol in symbols: _runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol) attempt = 0 while True: try: data = await provider.fetch_fundamentals(symbol) async with async_session_factory() as db: await fundamental_service.store_fundamental( db, symbol=symbol, pe_ratio=data.pe_ratio, revenue_growth=data.revenue_growth, earnings_surprise=data.earnings_surprise, market_cap=data.market_cap, unavailable_fields=data.unavailable_fields, ) _last_successful[job_name] = symbol processed += 1 _runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol) logger.info(json.dumps({ "event": "ticker_collected", "job": job_name, "ticker": symbol, })) break except Exception as exc: msg = str(exc).lower() if "rate" in msg or "429" in msg: if attempt < max_retries: wait_seconds = base_backoff * (2 ** attempt) attempt += 1 logger.warning(json.dumps({ "event": "rate_limited_retry", "job": job_name, "ticker": symbol, "attempt": attempt, "max_retries": max_retries, "wait_seconds": wait_seconds, "processed": processed, })) _runtime_progress( job_name, processed=processed, total=total, current_ticker=symbol, message=f"Rate-limited at {symbol}; retry {attempt}/{max_retries} in {wait_seconds}s", ) await asyncio.sleep(wait_seconds) continue logger.warning(json.dumps({ "event": "rate_limited", "job": job_name, "ticker": symbol, "processed": processed, })) _runtime_finish( job_name, "rate_limited", processed=processed, total=total, message=f"Rate limited at {symbol} after {attempt} retries", ) return _log_job_error(job_name, symbol, exc) break _last_successful[job_name] = None logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": processed})) _runtime_finish(job_name, "completed", processed=processed, total=total, message=f"Processed {processed} tickers") except Exception as exc: logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)})) _runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc)) # --------------------------------------------------------------------------- # Job: R:R Scanner # --------------------------------------------------------------------------- async def scan_rr() -> None: """Scan all tickers for trade setups meeting the R:R threshold. Uses rr_scanner_service.scan_all_tickers which already handles per-ticker error isolation internally. """ job_name = "rr_scanner" logger.info(json.dumps({"event": "job_start", "job": job_name})) _runtime_start(job_name) processed = 0 total: int | None = None try: async with async_session_factory() as db: if not await _is_job_enabled(db, job_name): logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"})) _runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled") return symbols = await _get_all_tickers(db) total = len(symbols) _runtime_progress(job_name, processed=0, total=total) try: setups = await scan_all_tickers( db, rr_threshold=settings.default_rr_threshold, ) processed = total or 0 _runtime_finish(job_name, "completed", processed=processed, total=total, message=f"Found {len(setups)} setups") logger.info(json.dumps({ "event": "job_complete", "job": job_name, "setups_found": len(setups), })) except Exception as exc: _runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc)) logger.error(json.dumps({ "event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc), })) except Exception as exc: logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)})) _runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc)) # --------------------------------------------------------------------------- # Job: Ticker Universe Sync # --------------------------------------------------------------------------- async def sync_ticker_universe() -> None: """Sync tracked tickers from configured default universe. Setting key: ticker_universe_default (sp500 | nasdaq100 | nasdaq_all) """ job_name = "ticker_universe_sync" logger.info(json.dumps({"event": "job_start", "job": job_name})) _runtime_start(job_name, total=1) try: async with async_session_factory() as db: if not await _is_job_enabled(db, job_name): logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"})) _runtime_finish(job_name, "skipped", processed=0, total=1, message="Disabled") return result = await db.execute( select(SystemSetting).where(SystemSetting.key == "ticker_universe_default") ) setting = result.scalar_one_or_none() universe = (setting.value if setting else "sp500").strip().lower() async with async_session_factory() as db: summary = await bootstrap_universe(db, universe, prune_missing=False) _runtime_progress(job_name, processed=1, total=1) _runtime_finish(job_name, "completed", processed=1, total=1, message=f"Synced {universe}") logger.info(json.dumps({ "event": "job_complete", "job": job_name, "universe": universe, "summary": summary, })) except Exception as exc: _runtime_finish(job_name, "error", processed=0, total=1, message=str(exc)) logger.error(json.dumps({ "event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc), })) # --------------------------------------------------------------------------- # Frequency helpers # --------------------------------------------------------------------------- _FREQUENCY_MAP: dict[str, dict[str, int]] = { "hourly": {"hours": 1}, "daily": {"hours": 24}, } def _parse_frequency(freq: str) -> dict[str, int]: """Convert a frequency string to APScheduler interval kwargs.""" return _FREQUENCY_MAP.get(freq.lower(), {"hours": 24}) # --------------------------------------------------------------------------- # Scheduler setup # --------------------------------------------------------------------------- def configure_scheduler() -> None: """Add all jobs to the scheduler with configured intervals. Call this once before scheduler.start(). Removes any existing jobs first to ensure idempotency. """ scheduler.remove_all_jobs() # Data Collector — configurable frequency (default: hourly) ohlcv_interval = _parse_frequency(settings.data_collector_frequency) scheduler.add_job( collect_ohlcv, "interval", **ohlcv_interval, id="data_collector", name="Data Collector (OHLCV)", replace_existing=True, ) # Sentiment Collector — default 30 min scheduler.add_job( collect_sentiment, "interval", minutes=settings.sentiment_poll_interval_minutes, id="sentiment_collector", name="Sentiment Collector", replace_existing=True, ) # Fundamental Collector — configurable frequency (default: daily) fund_interval = _parse_frequency(settings.fundamental_fetch_frequency) scheduler.add_job( collect_fundamentals, "interval", **fund_interval, id="fundamental_collector", name="Fundamental Collector", replace_existing=True, ) # R:R Scanner — configurable frequency (default: hourly) rr_interval = _parse_frequency(settings.rr_scan_frequency) scheduler.add_job( scan_rr, "interval", **rr_interval, id="rr_scanner", name="R:R Scanner", replace_existing=True, ) # Universe Sync — nightly scheduler.add_job( sync_ticker_universe, "interval", hours=24, id="ticker_universe_sync", name="Ticker Universe Sync", replace_existing=True, ) logger.info( json.dumps({ "event": "scheduler_configured", "jobs": { "data_collector": ohlcv_interval, "sentiment_collector": {"minutes": settings.sentiment_poll_interval_minutes}, "fundamental_collector": fund_interval, "rr_scanner": rr_interval, "ticker_universe_sync": {"hours": 24}, }, }) )