From f24e5070ee8ef93d6ebc54047f751e91473e47de Mon Sep 17 00:00:00 2001 From: Dennis Thiessen Date: Sun, 14 Jun 2026 21:18:32 +0200 Subject: [PATCH] fix bulk fundamentals: rate limits masked by partial FMP success MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause of "price plan needed in bulk but fine on manual reload": on free tiers FMP returns only market cap (others 402) and the chain merged that as a partial success — so when the Finnhub/Alpha Vantage fallbacks were rate-limited during a bulk run, the chain silently returned market-cap-only and the collector's backoff never engaged. Manual single fetches worked because the fallbacks weren't throttled at that moment. Fixes: - Chain distinguishes RateLimitError from other failures: if a fallback is rate-limited and fields are still missing, raise RateLimitError (unless allow_partial=True) so the collector backs off and retries. - Bulk job paces requests (fundamental_request_spacing_seconds, default 3s) to stay under Finnhub's ~60/min, and on retry-exhaustion stores partial data and continues instead of aborting the whole run. - Manual fetch passes allow_partial=True so a lone 429 doesn't fail the refresh. Co-Authored-By: Claude Opus 4.8 --- app/config.py | 4 ++ app/providers/fundamentals_chain.py | 28 ++++++++- app/routers/ingestion.py | 6 +- app/scheduler.py | 48 +++++++++------- .../unit/test_fundamentals_chain_provider.py | 57 ++++++++++++++++++- 5 files changed, 121 insertions(+), 22 deletions(-) diff --git a/app/config.py b/app/config.py index c0780e4..e492099 100644 --- a/app/config.py +++ b/app/config.py @@ -49,6 +49,10 @@ class Settings(BaseSettings): alerts_frequency: str = "hourly" fundamental_rate_limit_retries: int = 3 fundamental_rate_limit_backoff_seconds: int = 15 + # Pause between tickers in the bulk fundamentals job. Free tiers throttle + # hard (Finnhub ~60 calls/min, ~3 calls/ticker → ~3s/ticker); without + # spacing the job bursts straight into 429s. 0 disables. + fundamental_request_spacing_seconds: float = 3.0 # Scoring Defaults default_watchlist_auto_size: int = 10 diff --git a/app/providers/fundamentals_chain.py b/app/providers/fundamentals_chain.py index 9d78114..4631ee6 100644 --- a/app/providers/fundamentals_chain.py +++ b/app/providers/fundamentals_chain.py @@ -220,16 +220,31 @@ class ChainedFundamentalProvider: raise ProviderError("No fundamental providers configured") self._providers = providers - async def fetch_fundamentals(self, ticker: str) -> FundamentalData: + async def fetch_fundamentals(self, ticker: str, allow_partial: bool = False) -> FundamentalData: + """Merge fundamentals across providers. + + ``allow_partial`` controls behaviour when a fallback provider is *rate + limited* and we end up with missing fields. By default we raise + RateLimitError so the caller (the bulk collector) can back off and retry + the ticker once the window frees — otherwise a transient 429 on Finnhub + would be silently stored as market-cap-only. Pass ``allow_partial=True`` + (manual single fetches, or the collector's final give-up attempt) to + accept whatever was gathered instead of raising. + """ merged: dict[str, float | None] = {f: None for f in _FUNDAMENTAL_FIELDS} field_source: dict[str, str] = {} errors: list[str] = [] + rate_limited = False for provider_name, provider in self._providers: if all(merged[f] is not None for f in _FUNDAMENTAL_FIELDS): break try: data = await provider.fetch_fundamentals(ticker) + except RateLimitError as exc: + rate_limited = True + errors.append(f"{provider_name}: RateLimitError: {exc}") + continue except Exception as exc: errors.append(f"{provider_name}: {type(exc).__name__}: {exc}") continue @@ -241,6 +256,17 @@ class ChainedFundamentalProvider: merged[field] = value field_source[field] = provider_name + missing = [f for f in _FUNDAMENTAL_FIELDS if merged[f] is None] + + # A rate limit left data incomplete: signal it (unless partial is OK) so + # the collector backs off rather than persisting a degraded record. + if rate_limited and missing and not allow_partial: + attempts = "; ".join(errors[:6]) + raise RateLimitError( + f"Fundamentals incomplete for {ticker} due to provider rate limits " + f"(missing {', '.join(missing)}). Attempts: {attempts}" + ) + if all(merged[f] is None for f in _FUNDAMENTAL_FIELDS): attempts = "; ".join(errors[:6]) if errors else "no usable metrics from any provider" raise ProviderError(f"All fundamentals providers failed for {ticker}. Attempts: {attempts}") diff --git a/app/routers/ingestion.py b/app/routers/ingestion.py index 304f2f6..22b1d66 100644 --- a/app/routers/ingestion.py +++ b/app/routers/ingestion.py @@ -159,7 +159,11 @@ async def fetch_symbol( if settings.fmp_api_key or settings.finnhub_api_key or settings.alpha_vantage_api_key: try: fundamentals_provider = build_fundamental_provider_chain() - fdata = await fundamentals_provider.fetch_fundamentals(symbol_upper) + # Manual single fetch: take whatever we can get (a lone 429 on a + # fallback shouldn't fail the whole refresh). + fdata = await fundamentals_provider.fetch_fundamentals( + symbol_upper, allow_partial=True + ) await fundamental_service.store_fundamental( db, symbol=symbol_upper, diff --git a/app/scheduler.py b/app/scheduler.py index adda64f..39292b4 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -576,6 +576,19 @@ async def collect_fundamentals() -> None: max_retries = max(0, settings.fundamental_rate_limit_retries) base_backoff = max(1, settings.fundamental_rate_limit_backoff_seconds) + spacing = max(0.0, settings.fundamental_request_spacing_seconds) + + async def _store(symbol: str, data) -> None: + async with async_session_factory() as db: + await fundamental_service.store_fundamental( + db, + symbol=symbol, + pe_ratio=data.pe_ratio, + revenue_growth=data.revenue_growth, + earnings_surprise=data.earnings_surprise, + market_cap=data.market_cap, + unavailable_fields=data.unavailable_fields, + ) for symbol in symbols: _runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol) @@ -583,16 +596,7 @@ async def collect_fundamentals() -> None: while True: try: data = await provider.fetch_fundamentals(symbol) - async with async_session_factory() as db: - await fundamental_service.store_fundamental( - db, - symbol=symbol, - pe_ratio=data.pe_ratio, - revenue_growth=data.revenue_growth, - earnings_surprise=data.earnings_surprise, - market_cap=data.market_cap, - unavailable_fields=data.unavailable_fields, - ) + await _store(symbol, data) _last_successful[job_name] = symbol processed += 1 _runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol) @@ -627,23 +631,29 @@ async def collect_fundamentals() -> None: await asyncio.sleep(wait_seconds) continue + # Retries exhausted: store whatever partial data we can + # still get (e.g. FMP market cap) and move on, rather than + # aborting the whole run and leaving every later ticker + # untouched. logger.warning(json.dumps({ - "event": "rate_limited", + "event": "rate_limited_partial", "job": job_name, "ticker": symbol, "processed": processed, })) - _runtime_finish( - job_name, - "rate_limited", - processed=processed, - total=total, - message=f"Rate limited at {symbol} after {attempt} retries", - ) - return + try: + data = await provider.fetch_fundamentals(symbol, allow_partial=True) + await _store(symbol, data) + processed += 1 + except Exception as exc2: + _log_job_error(job_name, symbol, exc2) + break _log_job_error(job_name, symbol, exc) break + if spacing: + await asyncio.sleep(spacing) + _last_successful[job_name] = None logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": processed})) _runtime_finish(job_name, "completed", processed=processed, total=total, message=f"Processed {processed} tickers") diff --git a/tests/unit/test_fundamentals_chain_provider.py b/tests/unit/test_fundamentals_chain_provider.py index 2440a7b..09d4282 100644 --- a/tests/unit/test_fundamentals_chain_provider.py +++ b/tests/unit/test_fundamentals_chain_provider.py @@ -6,7 +6,7 @@ from datetime import datetime, timezone import pytest -from app.exceptions import ProviderError +from app.exceptions import ProviderError, RateLimitError from app.providers.fundamentals_chain import ChainedFundamentalProvider from app.providers.protocol import FundamentalData @@ -19,6 +19,11 @@ class _FailProvider: raise ProviderError(f"{self._message} ({ticker})") +class _RateLimitedProvider: + async def fetch_fundamentals(self, ticker: str) -> FundamentalData: + raise RateLimitError(f"rate limit hit for {ticker}") + + class _DataProvider: def __init__(self, data: FundamentalData) -> None: self._data = data @@ -98,3 +103,53 @@ async def test_chained_provider_raises_when_all_providers_fail(): await provider.fetch_fundamentals("MSFT") assert "All fundamentals providers failed" in str(exc.value) + + +@pytest.mark.asyncio +async def test_rate_limited_fallback_raises_when_incomplete(): + """FMP gives market cap; the fallback is rate-limited → chain signals it so + the collector can back off instead of storing a degraded record.""" + primary_data = FundamentalData( + ticker="AAPL", pe_ratio=None, revenue_growth=None, earnings_surprise=None, + market_cap=2_000_000.0, fetched_at=datetime.now(timezone.utc), unavailable_fields={}, + ) + provider = ChainedFundamentalProvider([ + ("fmp", _DataProvider(primary_data)), + ("finnhub", _RateLimitedProvider()), + ]) + + with pytest.raises(RateLimitError): + await provider.fetch_fundamentals("AAPL") + + +@pytest.mark.asyncio +async def test_rate_limited_fallback_allows_partial(): + """With allow_partial=True the chain returns the market cap it did get.""" + primary_data = FundamentalData( + ticker="AAPL", pe_ratio=None, revenue_growth=None, earnings_surprise=None, + market_cap=2_000_000.0, fetched_at=datetime.now(timezone.utc), unavailable_fields={}, + ) + provider = ChainedFundamentalProvider([ + ("fmp", _DataProvider(primary_data)), + ("finnhub", _RateLimitedProvider()), + ]) + + result = await provider.fetch_fundamentals("AAPL", allow_partial=True) + assert result.market_cap == 2_000_000.0 + assert result.pe_ratio is None + + +@pytest.mark.asyncio +async def test_rate_limited_but_complete_does_not_raise(): + """If every field is filled, a rate limit on a later (unused) provider is moot.""" + full = FundamentalData( + ticker="AAPL", pe_ratio=20.0, revenue_growth=10.0, earnings_surprise=2.0, + market_cap=5.0, fetched_at=datetime.now(timezone.utc), unavailable_fields={}, + ) + provider = ChainedFundamentalProvider([ + ("fmp", _DataProvider(full)), + ("finnhub", _RateLimitedProvider()), + ]) + + result = await provider.fetch_fundamentals("AAPL") + assert result.pe_ratio == 20.0