fix bulk fundamentals: rate limits masked by partial FMP success
Root cause of "price plan needed in bulk but fine on manual reload": on free tiers FMP returns only market cap (others 402) and the chain merged that as a partial success — so when the Finnhub/Alpha Vantage fallbacks were rate-limited during a bulk run, the chain silently returned market-cap-only and the collector's backoff never engaged. Manual single fetches worked because the fallbacks weren't throttled at that moment. Fixes: - Chain distinguishes RateLimitError from other failures: if a fallback is rate-limited and fields are still missing, raise RateLimitError (unless allow_partial=True) so the collector backs off and retries. - Bulk job paces requests (fundamental_request_spacing_seconds, default 3s) to stay under Finnhub's ~60/min, and on retry-exhaustion stores partial data and continues instead of aborting the whole run. - Manual fetch passes allow_partial=True so a lone 429 doesn't fail the refresh. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -49,6 +49,10 @@ class Settings(BaseSettings):
|
|||||||
alerts_frequency: str = "hourly"
|
alerts_frequency: str = "hourly"
|
||||||
fundamental_rate_limit_retries: int = 3
|
fundamental_rate_limit_retries: int = 3
|
||||||
fundamental_rate_limit_backoff_seconds: int = 15
|
fundamental_rate_limit_backoff_seconds: int = 15
|
||||||
|
# Pause between tickers in the bulk fundamentals job. Free tiers throttle
|
||||||
|
# hard (Finnhub ~60 calls/min, ~3 calls/ticker → ~3s/ticker); without
|
||||||
|
# spacing the job bursts straight into 429s. 0 disables.
|
||||||
|
fundamental_request_spacing_seconds: float = 3.0
|
||||||
|
|
||||||
# Scoring Defaults
|
# Scoring Defaults
|
||||||
default_watchlist_auto_size: int = 10
|
default_watchlist_auto_size: int = 10
|
||||||
|
|||||||
@@ -220,16 +220,31 @@ class ChainedFundamentalProvider:
|
|||||||
raise ProviderError("No fundamental providers configured")
|
raise ProviderError("No fundamental providers configured")
|
||||||
self._providers = providers
|
self._providers = providers
|
||||||
|
|
||||||
async def fetch_fundamentals(self, ticker: str) -> FundamentalData:
|
async def fetch_fundamentals(self, ticker: str, allow_partial: bool = False) -> FundamentalData:
|
||||||
|
"""Merge fundamentals across providers.
|
||||||
|
|
||||||
|
``allow_partial`` controls behaviour when a fallback provider is *rate
|
||||||
|
limited* and we end up with missing fields. By default we raise
|
||||||
|
RateLimitError so the caller (the bulk collector) can back off and retry
|
||||||
|
the ticker once the window frees — otherwise a transient 429 on Finnhub
|
||||||
|
would be silently stored as market-cap-only. Pass ``allow_partial=True``
|
||||||
|
(manual single fetches, or the collector's final give-up attempt) to
|
||||||
|
accept whatever was gathered instead of raising.
|
||||||
|
"""
|
||||||
merged: dict[str, float | None] = {f: None for f in _FUNDAMENTAL_FIELDS}
|
merged: dict[str, float | None] = {f: None for f in _FUNDAMENTAL_FIELDS}
|
||||||
field_source: dict[str, str] = {}
|
field_source: dict[str, str] = {}
|
||||||
errors: list[str] = []
|
errors: list[str] = []
|
||||||
|
rate_limited = False
|
||||||
|
|
||||||
for provider_name, provider in self._providers:
|
for provider_name, provider in self._providers:
|
||||||
if all(merged[f] is not None for f in _FUNDAMENTAL_FIELDS):
|
if all(merged[f] is not None for f in _FUNDAMENTAL_FIELDS):
|
||||||
break
|
break
|
||||||
try:
|
try:
|
||||||
data = await provider.fetch_fundamentals(ticker)
|
data = await provider.fetch_fundamentals(ticker)
|
||||||
|
except RateLimitError as exc:
|
||||||
|
rate_limited = True
|
||||||
|
errors.append(f"{provider_name}: RateLimitError: {exc}")
|
||||||
|
continue
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
errors.append(f"{provider_name}: {type(exc).__name__}: {exc}")
|
errors.append(f"{provider_name}: {type(exc).__name__}: {exc}")
|
||||||
continue
|
continue
|
||||||
@@ -241,6 +256,17 @@ class ChainedFundamentalProvider:
|
|||||||
merged[field] = value
|
merged[field] = value
|
||||||
field_source[field] = provider_name
|
field_source[field] = provider_name
|
||||||
|
|
||||||
|
missing = [f for f in _FUNDAMENTAL_FIELDS if merged[f] is None]
|
||||||
|
|
||||||
|
# A rate limit left data incomplete: signal it (unless partial is OK) so
|
||||||
|
# the collector backs off rather than persisting a degraded record.
|
||||||
|
if rate_limited and missing and not allow_partial:
|
||||||
|
attempts = "; ".join(errors[:6])
|
||||||
|
raise RateLimitError(
|
||||||
|
f"Fundamentals incomplete for {ticker} due to provider rate limits "
|
||||||
|
f"(missing {', '.join(missing)}). Attempts: {attempts}"
|
||||||
|
)
|
||||||
|
|
||||||
if all(merged[f] is None for f in _FUNDAMENTAL_FIELDS):
|
if all(merged[f] is None for f in _FUNDAMENTAL_FIELDS):
|
||||||
attempts = "; ".join(errors[:6]) if errors else "no usable metrics from any provider"
|
attempts = "; ".join(errors[:6]) if errors else "no usable metrics from any provider"
|
||||||
raise ProviderError(f"All fundamentals providers failed for {ticker}. Attempts: {attempts}")
|
raise ProviderError(f"All fundamentals providers failed for {ticker}. Attempts: {attempts}")
|
||||||
|
|||||||
@@ -159,7 +159,11 @@ async def fetch_symbol(
|
|||||||
if settings.fmp_api_key or settings.finnhub_api_key or settings.alpha_vantage_api_key:
|
if settings.fmp_api_key or settings.finnhub_api_key or settings.alpha_vantage_api_key:
|
||||||
try:
|
try:
|
||||||
fundamentals_provider = build_fundamental_provider_chain()
|
fundamentals_provider = build_fundamental_provider_chain()
|
||||||
fdata = await fundamentals_provider.fetch_fundamentals(symbol_upper)
|
# Manual single fetch: take whatever we can get (a lone 429 on a
|
||||||
|
# fallback shouldn't fail the whole refresh).
|
||||||
|
fdata = await fundamentals_provider.fetch_fundamentals(
|
||||||
|
symbol_upper, allow_partial=True
|
||||||
|
)
|
||||||
await fundamental_service.store_fundamental(
|
await fundamental_service.store_fundamental(
|
||||||
db,
|
db,
|
||||||
symbol=symbol_upper,
|
symbol=symbol_upper,
|
||||||
|
|||||||
+25
-15
@@ -576,13 +576,9 @@ async def collect_fundamentals() -> None:
|
|||||||
|
|
||||||
max_retries = max(0, settings.fundamental_rate_limit_retries)
|
max_retries = max(0, settings.fundamental_rate_limit_retries)
|
||||||
base_backoff = max(1, settings.fundamental_rate_limit_backoff_seconds)
|
base_backoff = max(1, settings.fundamental_rate_limit_backoff_seconds)
|
||||||
|
spacing = max(0.0, settings.fundamental_request_spacing_seconds)
|
||||||
|
|
||||||
for symbol in symbols:
|
async def _store(symbol: str, data) -> None:
|
||||||
_runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol)
|
|
||||||
attempt = 0
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
data = await provider.fetch_fundamentals(symbol)
|
|
||||||
async with async_session_factory() as db:
|
async with async_session_factory() as db:
|
||||||
await fundamental_service.store_fundamental(
|
await fundamental_service.store_fundamental(
|
||||||
db,
|
db,
|
||||||
@@ -593,6 +589,14 @@ async def collect_fundamentals() -> None:
|
|||||||
market_cap=data.market_cap,
|
market_cap=data.market_cap,
|
||||||
unavailable_fields=data.unavailable_fields,
|
unavailable_fields=data.unavailable_fields,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
for symbol in symbols:
|
||||||
|
_runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol)
|
||||||
|
attempt = 0
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
data = await provider.fetch_fundamentals(symbol)
|
||||||
|
await _store(symbol, data)
|
||||||
_last_successful[job_name] = symbol
|
_last_successful[job_name] = symbol
|
||||||
processed += 1
|
processed += 1
|
||||||
_runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol)
|
_runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol)
|
||||||
@@ -627,23 +631,29 @@ async def collect_fundamentals() -> None:
|
|||||||
await asyncio.sleep(wait_seconds)
|
await asyncio.sleep(wait_seconds)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# Retries exhausted: store whatever partial data we can
|
||||||
|
# still get (e.g. FMP market cap) and move on, rather than
|
||||||
|
# aborting the whole run and leaving every later ticker
|
||||||
|
# untouched.
|
||||||
logger.warning(json.dumps({
|
logger.warning(json.dumps({
|
||||||
"event": "rate_limited",
|
"event": "rate_limited_partial",
|
||||||
"job": job_name,
|
"job": job_name,
|
||||||
"ticker": symbol,
|
"ticker": symbol,
|
||||||
"processed": processed,
|
"processed": processed,
|
||||||
}))
|
}))
|
||||||
_runtime_finish(
|
try:
|
||||||
job_name,
|
data = await provider.fetch_fundamentals(symbol, allow_partial=True)
|
||||||
"rate_limited",
|
await _store(symbol, data)
|
||||||
processed=processed,
|
processed += 1
|
||||||
total=total,
|
except Exception as exc2:
|
||||||
message=f"Rate limited at {symbol} after {attempt} retries",
|
_log_job_error(job_name, symbol, exc2)
|
||||||
)
|
break
|
||||||
return
|
|
||||||
_log_job_error(job_name, symbol, exc)
|
_log_job_error(job_name, symbol, exc)
|
||||||
break
|
break
|
||||||
|
|
||||||
|
if spacing:
|
||||||
|
await asyncio.sleep(spacing)
|
||||||
|
|
||||||
_last_successful[job_name] = None
|
_last_successful[job_name] = None
|
||||||
logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": processed}))
|
logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": processed}))
|
||||||
_runtime_finish(job_name, "completed", processed=processed, total=total, message=f"Processed {processed} tickers")
|
_runtime_finish(job_name, "completed", processed=processed, total=total, message=f"Processed {processed} tickers")
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ from datetime import datetime, timezone
|
|||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from app.exceptions import ProviderError
|
from app.exceptions import ProviderError, RateLimitError
|
||||||
from app.providers.fundamentals_chain import ChainedFundamentalProvider
|
from app.providers.fundamentals_chain import ChainedFundamentalProvider
|
||||||
from app.providers.protocol import FundamentalData
|
from app.providers.protocol import FundamentalData
|
||||||
|
|
||||||
@@ -19,6 +19,11 @@ class _FailProvider:
|
|||||||
raise ProviderError(f"{self._message} ({ticker})")
|
raise ProviderError(f"{self._message} ({ticker})")
|
||||||
|
|
||||||
|
|
||||||
|
class _RateLimitedProvider:
|
||||||
|
async def fetch_fundamentals(self, ticker: str) -> FundamentalData:
|
||||||
|
raise RateLimitError(f"rate limit hit for {ticker}")
|
||||||
|
|
||||||
|
|
||||||
class _DataProvider:
|
class _DataProvider:
|
||||||
def __init__(self, data: FundamentalData) -> None:
|
def __init__(self, data: FundamentalData) -> None:
|
||||||
self._data = data
|
self._data = data
|
||||||
@@ -98,3 +103,53 @@ async def test_chained_provider_raises_when_all_providers_fail():
|
|||||||
await provider.fetch_fundamentals("MSFT")
|
await provider.fetch_fundamentals("MSFT")
|
||||||
|
|
||||||
assert "All fundamentals providers failed" in str(exc.value)
|
assert "All fundamentals providers failed" in str(exc.value)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_rate_limited_fallback_raises_when_incomplete():
|
||||||
|
"""FMP gives market cap; the fallback is rate-limited → chain signals it so
|
||||||
|
the collector can back off instead of storing a degraded record."""
|
||||||
|
primary_data = FundamentalData(
|
||||||
|
ticker="AAPL", pe_ratio=None, revenue_growth=None, earnings_surprise=None,
|
||||||
|
market_cap=2_000_000.0, fetched_at=datetime.now(timezone.utc), unavailable_fields={},
|
||||||
|
)
|
||||||
|
provider = ChainedFundamentalProvider([
|
||||||
|
("fmp", _DataProvider(primary_data)),
|
||||||
|
("finnhub", _RateLimitedProvider()),
|
||||||
|
])
|
||||||
|
|
||||||
|
with pytest.raises(RateLimitError):
|
||||||
|
await provider.fetch_fundamentals("AAPL")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_rate_limited_fallback_allows_partial():
|
||||||
|
"""With allow_partial=True the chain returns the market cap it did get."""
|
||||||
|
primary_data = FundamentalData(
|
||||||
|
ticker="AAPL", pe_ratio=None, revenue_growth=None, earnings_surprise=None,
|
||||||
|
market_cap=2_000_000.0, fetched_at=datetime.now(timezone.utc), unavailable_fields={},
|
||||||
|
)
|
||||||
|
provider = ChainedFundamentalProvider([
|
||||||
|
("fmp", _DataProvider(primary_data)),
|
||||||
|
("finnhub", _RateLimitedProvider()),
|
||||||
|
])
|
||||||
|
|
||||||
|
result = await provider.fetch_fundamentals("AAPL", allow_partial=True)
|
||||||
|
assert result.market_cap == 2_000_000.0
|
||||||
|
assert result.pe_ratio is None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_rate_limited_but_complete_does_not_raise():
|
||||||
|
"""If every field is filled, a rate limit on a later (unused) provider is moot."""
|
||||||
|
full = FundamentalData(
|
||||||
|
ticker="AAPL", pe_ratio=20.0, revenue_growth=10.0, earnings_surprise=2.0,
|
||||||
|
market_cap=5.0, fetched_at=datetime.now(timezone.utc), unavailable_fields={},
|
||||||
|
)
|
||||||
|
provider = ChainedFundamentalProvider([
|
||||||
|
("fmp", _DataProvider(full)),
|
||||||
|
("finnhub", _RateLimitedProvider()),
|
||||||
|
])
|
||||||
|
|
||||||
|
result = await provider.fetch_fundamentals("AAPL")
|
||||||
|
assert result.pe_ratio == 20.0
|
||||||
|
|||||||
Reference in New Issue
Block a user