"""Ticker universe discovery and bootstrap service. Provides a minimal, provider-backed way to populate tracked tickers from well-known universes (S&P 500, NASDAQ-100, NASDAQ All). """ from __future__ import annotations import json import logging import os import re from collections.abc import Iterable from datetime import datetime, timezone from pathlib import Path import httpx from sqlalchemy import delete, select from sqlalchemy.ext.asyncio import AsyncSession from app.config import settings from app.exceptions import ProviderError, ValidationError from app.models.settings import SystemSetting from app.models.ticker import Ticker logger = logging.getLogger(__name__) SUPPORTED_UNIVERSES = {"sp500", "nasdaq100", "nasdaq_all"} _SYMBOL_PATTERN = re.compile(r"^[A-Z0-9-]{1,10}$") _SEED_UNIVERSES: dict[str, list[str]] = { "sp500": [ "AAPL", "MSFT", "NVDA", "AMZN", "META", "GOOGL", "GOOG", "BRK-B", "TSLA", "JPM", "V", "MA", "UNH", "XOM", "LLY", "AVGO", "COST", "PG", "JNJ", "HD", "MRK", "BAC", "ABBV", "PEP", "KO", "ADBE", "NFLX", "CRM", "CSCO", "WMT", "AMD", "TMO", "MCD", "ORCL", "ACN", "CVX", "LIN", "DHR", "ABT", "QCOM", "TXN", "PM", "DIS", "INTU", ], "nasdaq100": [ "AAPL", "MSFT", "NVDA", "AMZN", "META", "GOOGL", "GOOG", "TSLA", "AVGO", "COST", "NFLX", "ADBE", "CSCO", "AMD", "INTU", "QCOM", "AMGN", "TXN", "INTC", "BKNG", "GILD", "ISRG", "MDLZ", "ADP", "LRCX", "ADI", "PANW", "SNPS", "CDNS", "KLAC", "MELI", "MU", "SBUX", "CSX", "REGN", "VRTX", "MAR", "MNST", "CTAS", "ASML", "PYPL", "AMAT", "NXPI", ], "nasdaq_all": [ "AAPL", "MSFT", "NVDA", "AMZN", "META", "GOOGL", "TSLA", "AMD", "INTC", "QCOM", "CSCO", "ADBE", "NFLX", "PYPL", "AMAT", "MU", "SBUX", "GILD", "INTU", "BKNG", "ADP", "CTAS", "PANW", "SNPS", "CDNS", "LRCX", "KLAC", "MELI", "ASML", "REGN", "VRTX", "MDLZ", "AMGN", ], } _CA_BUNDLE = os.environ.get("SSL_CERT_FILE", "") if not _CA_BUNDLE or not Path(_CA_BUNDLE).exists(): _CA_BUNDLE_PATH: str | bool = True else: _CA_BUNDLE_PATH = _CA_BUNDLE def _validate_universe(universe: str) -> str: normalised = universe.strip().lower() if normalised not in SUPPORTED_UNIVERSES: supported = ", ".join(sorted(SUPPORTED_UNIVERSES)) raise ValidationError(f"Unsupported universe '{universe}'. Supported: {supported}") return normalised def _normalise_symbols(symbols: Iterable[str]) -> list[str]: deduped: set[str] = set() for raw_symbol in symbols: symbol = raw_symbol.strip().upper().replace(".", "-") if not symbol: continue if _SYMBOL_PATTERN.fullmatch(symbol) is None: continue deduped.add(symbol) return sorted(deduped) def _extract_symbols_from_fmp_payload(payload: object) -> list[str]: if not isinstance(payload, list): return [] symbols: list[str] = [] for item in payload: if not isinstance(item, dict): continue candidate = item.get("symbol") or item.get("ticker") if isinstance(candidate, str): symbols.append(candidate) return symbols async def _try_fmp_urls( client: httpx.AsyncClient, urls: list[str], ) -> tuple[list[str], list[str]]: failures: list[str] = [] for url in urls: endpoint = url.split("?")[0] try: response = await client.get(url) except httpx.HTTPError as exc: failures.append(f"{endpoint}: network error ({type(exc).__name__}: {exc})") continue if response.status_code != 200: failures.append(f"{endpoint}: HTTP {response.status_code}") continue try: payload = response.json() except ValueError: failures.append(f"{endpoint}: invalid JSON payload") continue symbols = _extract_symbols_from_fmp_payload(payload) if symbols: return symbols, failures failures.append(f"{endpoint}: empty/unsupported payload") return [], failures async def _fetch_universe_symbols_from_fmp(universe: str) -> list[str]: if not settings.fmp_api_key: raise ValidationError( "FMP API key is required for universe bootstrap (set FMP_API_KEY)" ) api_key = settings.fmp_api_key stable_base = "https://financialmodelingprep.com/stable" legacy_base = "https://financialmodelingprep.com/api/v3" stable_candidates: dict[str, list[str]] = { "sp500": [ f"{stable_base}/sp500-constituent?apikey={api_key}", f"{stable_base}/sp500-constituents?apikey={api_key}", ], "nasdaq100": [ f"{stable_base}/nasdaq-100-constituent?apikey={api_key}", f"{stable_base}/nasdaq100-constituent?apikey={api_key}", f"{stable_base}/nasdaq-100-constituents?apikey={api_key}", ], "nasdaq_all": [ f"{stable_base}/stock-screener?exchange=NASDAQ&isEtf=false&limit=10000&apikey={api_key}", f"{stable_base}/available-traded/list?apikey={api_key}", ], } legacy_candidates: dict[str, list[str]] = { "sp500": [ f"{legacy_base}/sp500_constituent?apikey={api_key}", f"{legacy_base}/sp500_constituent", ], "nasdaq100": [ f"{legacy_base}/nasdaq_constituent?apikey={api_key}", f"{legacy_base}/nasdaq_constituent", ], "nasdaq_all": [ f"{legacy_base}/stock-screener?exchange=NASDAQ&isEtf=false&limit=10000&apikey={api_key}", ], } failures: list[str] = [] async with httpx.AsyncClient(timeout=30.0, verify=_CA_BUNDLE_PATH) as client: stable_symbols, stable_failures = await _try_fmp_urls(client, stable_candidates[universe]) failures.extend(stable_failures) if stable_symbols: return stable_symbols legacy_symbols, legacy_failures = await _try_fmp_urls(client, legacy_candidates[universe]) failures.extend(legacy_failures) if legacy_symbols: return legacy_symbols if failures: reason = "; ".join(failures[:6]) logger.warning("FMP universe fetch failed for %s: %s", universe, reason) raise ProviderError( f"Failed to fetch universe symbols from FMP for '{universe}'. Attempts: {reason}" ) raise ProviderError(f"Failed to fetch universe symbols from FMP for '{universe}'") async def _fetch_html_symbols( client: httpx.AsyncClient, url: str, pattern: str, ) -> tuple[list[str], str | None]: try: response = await client.get(url) except httpx.HTTPError as exc: return [], f"{url}: network error ({type(exc).__name__}: {exc})" if response.status_code != 200: return [], f"{url}: HTTP {response.status_code}" matches = re.findall(pattern, response.text, flags=re.IGNORECASE) if not matches: return [], f"{url}: no symbols parsed" return list(matches), None async def _fetch_nasdaq_trader_symbols( client: httpx.AsyncClient, ) -> tuple[list[str], str | None]: url = "https://www.nasdaqtrader.com/dynamic/SymDir/nasdaqlisted.txt" try: response = await client.get(url) except httpx.HTTPError as exc: return [], f"{url}: network error ({type(exc).__name__}: {exc})" if response.status_code != 200: return [], f"{url}: HTTP {response.status_code}" symbols: list[str] = [] for line in response.text.splitlines(): if not line or line.startswith("Symbol|") or line.startswith("File Creation Time"): continue parts = line.split("|") if not parts: continue symbol = parts[0].strip() test_issue = parts[6].strip() if len(parts) > 6 else "N" if test_issue == "Y": continue symbols.append(symbol) if not symbols: return [], f"{url}: no symbols parsed" return symbols, None async def _fetch_universe_symbols_from_public(universe: str) -> tuple[list[str], list[str], str | None]: failures: list[str] = [] sp500_url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies" nasdaq100_url = "https://en.wikipedia.org/wiki/Nasdaq-100" wiki_symbol_pattern = r"\s*]*>([A-Z.]{1,10})\s*" async with httpx.AsyncClient(timeout=30.0, verify=_CA_BUNDLE_PATH) as client: if universe == "sp500": symbols, error = await _fetch_html_symbols(client, sp500_url, wiki_symbol_pattern) if error: failures.append(error) else: return symbols, failures, "wikipedia_sp500" if universe == "nasdaq100": symbols, error = await _fetch_html_symbols(client, nasdaq100_url, wiki_symbol_pattern) if error: failures.append(error) else: return symbols, failures, "wikipedia_nasdaq100" if universe == "nasdaq_all": symbols, error = await _fetch_nasdaq_trader_symbols(client) if error: failures.append(error) else: return symbols, failures, "nasdaq_trader" return [], failures, None async def _read_cached_symbols(db: AsyncSession, universe: str) -> list[str]: key = f"ticker_universe_cache_{universe}" result = await db.execute(select(SystemSetting).where(SystemSetting.key == key)) setting = result.scalar_one_or_none() if setting is None: return [] try: payload = json.loads(setting.value) except (TypeError, ValueError): return [] if isinstance(payload, dict): symbols = payload.get("symbols", []) elif isinstance(payload, list): symbols = payload else: symbols = [] if not isinstance(symbols, list): return [] return _normalise_symbols([str(symbol) for symbol in symbols]) async def _write_cached_symbols( db: AsyncSession, universe: str, symbols: list[str], source: str, ) -> None: key = f"ticker_universe_cache_{universe}" payload = { "symbols": symbols, "source": source, "updated_at": datetime.now(timezone.utc).isoformat(), } result = await db.execute(select(SystemSetting).where(SystemSetting.key == key)) setting = result.scalar_one_or_none() value = json.dumps(payload) if setting is None: db.add(SystemSetting(key=key, value=value)) else: setting.value = value await db.commit() async def fetch_universe_symbols(db: AsyncSession, universe: str) -> list[str]: """Fetch and normalise symbols for a supported universe with fallbacks. Fallback order: 1) Free public sources (Wikipedia/NASDAQ trader) 2) FMP endpoints (if available) 3) Cached snapshot in SystemSetting 4) Built-in seed symbols """ normalised_universe = _validate_universe(universe) failures: list[str] = [] public_symbols, public_failures, public_source = await _fetch_universe_symbols_from_public(normalised_universe) failures.extend(public_failures) cleaned_public = _normalise_symbols(public_symbols) if cleaned_public: await _write_cached_symbols(db, normalised_universe, cleaned_public, public_source or "public") return cleaned_public try: fmp_symbols = await _fetch_universe_symbols_from_fmp(normalised_universe) cleaned_fmp = _normalise_symbols(fmp_symbols) if cleaned_fmp: await _write_cached_symbols(db, normalised_universe, cleaned_fmp, "fmp") return cleaned_fmp except (ProviderError, ValidationError) as exc: failures.append(str(exc)) cached_symbols = await _read_cached_symbols(db, normalised_universe) if cached_symbols: logger.warning( "Using cached universe symbols for %s because live fetch failed: %s", normalised_universe, "; ".join(failures[:3]), ) return cached_symbols seed_symbols = _normalise_symbols(_SEED_UNIVERSES.get(normalised_universe, [])) if seed_symbols: logger.warning( "Using built-in seed symbols for %s because live/cache fetch failed: %s", normalised_universe, "; ".join(failures[:3]), ) return seed_symbols reason = "; ".join(failures[:6]) if failures else "no provider returned symbols" raise ProviderError(f"Universe '{normalised_universe}' returned no valid symbols. Attempts: {reason}") async def bootstrap_universe( db: AsyncSession, universe: str, *, prune_missing: bool = False, ) -> dict[str, int | str]: """Upsert ticker universe into tracked tickers. Returns summary counts for added/existing/deleted symbols. """ normalised_universe = _validate_universe(universe) symbols = await fetch_universe_symbols(db, normalised_universe) existing_rows = await db.execute(select(Ticker.symbol)) existing_symbols = set(existing_rows.scalars().all()) target_symbols = set(symbols) symbols_to_add = sorted(target_symbols - existing_symbols) symbols_to_delete = sorted(existing_symbols - target_symbols) if prune_missing else [] for symbol in symbols_to_add: db.add(Ticker(symbol=symbol)) deleted_count = 0 if symbols_to_delete: result = await db.execute(delete(Ticker).where(Ticker.symbol.in_(symbols_to_delete))) deleted_count = int(result.rowcount or 0) await db.commit() return { "universe": normalised_universe, "total_universe_symbols": len(symbols), "added": len(symbols_to_add), "already_tracked": len(target_symbols & existing_symbols), "deleted": deleted_count, }