Fix score refresh, add granular fetch and live job status
Deploy / lint (push) Successful in 6s
Deploy / test (push) Successful in 35s
Deploy / deploy (push) Successful in 22s

Scores never updated ("101d ago"): get_score only recomputes stale/
missing dimensions, but nothing marked them stale on new data, and there
was no scheduled scoring job.
- Fetch endpoint force-recomputes dimensions + composite.
- Scheduled scan (scan_all_tickers) refreshes scores per ticker, so
  scores stay current globally, not just on manual fetch.

Granular fetch: /ingestion/fetch accepts a sources filter; the freshness
bar gets a per-row refresh button (OHLCV/Sentiment/Fundamentals fetch
that provider only — marked paid; S/R/Scores recompute for free). Header
button is now "Fetch All".

Job visibility: GET /jobs/running (any user) + sidebar live indicator
showing running scheduled jobs with progress, polled every 10s.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
2026-06-14 13:10:15 +02:00
parent 3aebfd72d3
commit 316226096b
10 changed files with 296 additions and 94 deletions
+108 -76
View File
@@ -39,6 +39,9 @@ logger = logging.getLogger(__name__)
router = APIRouter(tags=["ingestion"])
_PROVIDER_SOURCES = {"ohlcv", "sentiment", "fundamentals"}
def _get_provider() -> AlpacaOHLCVProvider:
"""Build the OHLCV provider from current settings."""
if not settings.alpaca_api_key or not settings.alpaca_api_secret:
@@ -46,26 +49,47 @@ def _get_provider() -> AlpacaOHLCVProvider:
return AlpacaOHLCVProvider(settings.alpaca_api_key, settings.alpaca_api_secret)
def _parse_requested_sources(sources: str | None) -> set[str]:
"""Which provider sources to fetch. None/'all' → every provider source.
Anything else is parsed as a comma list; an empty/recompute-only request
fetches no providers (just refreshes the free derived pipeline).
"""
if sources is None:
return set(_PROVIDER_SOURCES)
parts = {p.strip().lower() for p in sources.split(",") if p.strip()}
if "all" in parts:
return set(_PROVIDER_SOURCES)
return parts & _PROVIDER_SOURCES
@router.post("/ingestion/fetch/{symbol}", response_model=APIEnvelope)
async def fetch_symbol(
symbol: str,
start_date: date | None = Query(None, description="Start date (YYYY-MM-DD)"),
end_date: date | None = Query(None, description="End date (YYYY-MM-DD)"),
force_refetch: bool = Query(False, description="Delete existing OHLCV data and re-fetch split-adjusted history"),
sources: str | None = Query(
None,
description="Comma list of provider sources to fetch (ohlcv,sentiment,fundamentals). "
"Omit for all. The derived pipeline (S/R, scores, scanner) always recomputes — it's free.",
),
_user: User = Depends(require_access),
db: AsyncSession = Depends(get_db),
):
"""Fetch all data sources for a ticker: OHLCV, sentiment, and fundamentals.
"""Fetch selected data sources for a ticker, then recompute derived data.
Returns a per-source breakdown so the frontend can show exactly what
succeeded and what failed.
Provider calls (which may cost money/quota — sentiment especially) are
limited to ``sources``; the free derived pipeline (S/R, scores, scanner)
always runs so everything stays consistent. Returns a per-source breakdown.
"""
symbol_upper = symbol.strip().upper()
sources: dict[str, dict] = {}
requested = _parse_requested_sources(sources)
sources_out: dict[str, dict] = {}
# If force_refetch is requested, clear old OHLCV and ingestion progress
# so the backfill logic pulls a full year of fresh split-adjusted data.
if force_refetch:
if force_refetch and "ohlcv" in requested:
try:
result = await db.execute(
select(Ticker).where(Ticker.symbol == symbol_upper)
@@ -84,90 +108,98 @@ async def fetch_symbol(
logger.error("force_refetch cleanup failed for %s: %s", symbol_upper, exc)
# --- OHLCV ---
try:
provider = _get_provider()
result = await ingestion_service.fetch_and_ingest(
db, provider, symbol_upper, start_date, end_date
)
sources["ohlcv"] = {
"status": "ok" if result.status in ("complete", "partial") else "error",
"records": result.records_ingested,
"message": result.message,
}
except Exception as exc:
logger.error("OHLCV fetch failed for %s: %s", symbol_upper, exc)
sources["ohlcv"] = {"status": "error", "records": 0, "message": str(exc)}
# --- Sentiment ---
try:
sent_provider = await build_sentiment_provider(db)
except ProviderError as exc:
sent_provider = None
sources["sentiment"] = {"status": "skipped", "message": str(exc)}
if sent_provider is not None:
if "ohlcv" in requested:
try:
data = await sent_provider.fetch_sentiment(symbol_upper)
await sentiment_service.store_sentiment(
db,
symbol=symbol_upper,
classification=data.classification,
confidence=data.confidence,
source=data.source,
timestamp=data.timestamp,
reasoning=data.reasoning,
citations=data.citations,
provider = _get_provider()
result = await ingestion_service.fetch_and_ingest(
db, provider, symbol_upper, start_date, end_date
)
sources["sentiment"] = {
"status": "ok",
"classification": data.classification,
"confidence": data.confidence,
"message": None,
sources_out["ohlcv"] = {
"status": "ok" if result.status in ("complete", "partial") else "error",
"records": result.records_ingested,
"message": result.message,
}
except Exception as exc:
logger.error("Sentiment fetch failed for %s: %s", symbol_upper, exc)
sources["sentiment"] = {"status": "error", "message": str(exc)}
logger.error("OHLCV fetch failed for %s: %s", symbol_upper, exc)
sources_out["ohlcv"] = {"status": "error", "records": 0, "message": str(exc)}
# --- Sentiment ---
if "sentiment" in requested:
try:
sent_provider = await build_sentiment_provider(db)
except ProviderError as exc:
sent_provider = None
sources_out["sentiment"] = {"status": "skipped", "message": str(exc)}
if sent_provider is not None:
try:
data = await sent_provider.fetch_sentiment(symbol_upper)
await sentiment_service.store_sentiment(
db,
symbol=symbol_upper,
classification=data.classification,
confidence=data.confidence,
source=data.source,
timestamp=data.timestamp,
reasoning=data.reasoning,
citations=data.citations,
)
sources_out["sentiment"] = {
"status": "ok",
"classification": data.classification,
"confidence": data.confidence,
"message": None,
}
except Exception as exc:
logger.error("Sentiment fetch failed for %s: %s", symbol_upper, exc)
sources_out["sentiment"] = {"status": "error", "message": str(exc)}
# --- Fundamentals ---
if settings.fmp_api_key or settings.finnhub_api_key or settings.alpha_vantage_api_key:
try:
fundamentals_provider = build_fundamental_provider_chain()
fdata = await fundamentals_provider.fetch_fundamentals(symbol_upper)
await fundamental_service.store_fundamental(
db,
symbol=symbol_upper,
pe_ratio=fdata.pe_ratio,
revenue_growth=fdata.revenue_growth,
earnings_surprise=fdata.earnings_surprise,
market_cap=fdata.market_cap,
unavailable_fields=fdata.unavailable_fields,
)
sources["fundamentals"] = {"status": "ok", "message": None}
except Exception as exc:
logger.error("Fundamentals fetch failed for %s: %s", symbol_upper, exc)
sources["fundamentals"] = {"status": "error", "message": str(exc)}
else:
sources["fundamentals"] = {
"status": "skipped",
"message": "No fundamentals provider key configured",
}
if "fundamentals" in requested:
if settings.fmp_api_key or settings.finnhub_api_key or settings.alpha_vantage_api_key:
try:
fundamentals_provider = build_fundamental_provider_chain()
fdata = await fundamentals_provider.fetch_fundamentals(symbol_upper)
await fundamental_service.store_fundamental(
db,
symbol=symbol_upper,
pe_ratio=fdata.pe_ratio,
revenue_growth=fdata.revenue_growth,
earnings_surprise=fdata.earnings_surprise,
market_cap=fdata.market_cap,
unavailable_fields=fdata.unavailable_fields,
)
sources_out["fundamentals"] = {"status": "ok", "message": None}
except Exception as exc:
logger.error("Fundamentals fetch failed for %s: %s", symbol_upper, exc)
sources_out["fundamentals"] = {"status": "error", "message": str(exc)}
else:
sources_out["fundamentals"] = {
"status": "skipped",
"message": "No fundamentals provider key configured",
}
# --- Derived pipeline: S/R levels ---
# --- Derived pipeline: S/R levels (free, always) ---
try:
levels = await sr_service.recalculate_sr_levels(db, symbol_upper)
sources["sr_levels"] = {
sources_out["sr_levels"] = {
"status": "ok",
"count": len(levels),
"message": None,
}
except Exception as exc:
logger.error("S/R recalc failed for %s: %s", symbol_upper, exc)
sources["sr_levels"] = {"status": "error", "message": str(exc)}
sources_out["sr_levels"] = {"status": "error", "message": str(exc)}
# --- Derived pipeline: scores ---
# --- Derived pipeline: scores (free, always) ---
# Force a full recompute — fetched data doesn't mark old scores stale, so
# get_score alone would keep returning the previously computed values.
try:
await scoring_service.compute_all_dimensions(db, symbol_upper)
await scoring_service.compute_composite_score(db, symbol_upper)
await db.commit()
score_payload = await scoring_service.get_score(db, symbol_upper)
sources["scores"] = {
sources_out["scores"] = {
"status": "ok",
"composite_score": score_payload.get("composite_score"),
"missing_dimensions": score_payload.get("missing_dimensions", []),
@@ -175,27 +207,27 @@ async def fetch_symbol(
}
except Exception as exc:
logger.error("Score recompute failed for %s: %s", symbol_upper, exc)
sources["scores"] = {"status": "error", "message": str(exc)}
sources_out["scores"] = {"status": "error", "message": str(exc)}
# --- Derived pipeline: scanner ---
# --- Derived pipeline: scanner (free, always) ---
try:
setups = await scan_ticker(
db,
symbol_upper,
rr_threshold=settings.default_rr_threshold,
)
sources["scanner"] = {
sources_out["scanner"] = {
"status": "ok",
"setups_found": len(setups),
"message": None,
}
except Exception as exc:
logger.error("Scanner run failed for %s: %s", symbol_upper, exc)
sources["scanner"] = {"status": "error", "message": str(exc)}
sources_out["scanner"] = {"status": "error", "message": str(exc)}
# Always return success — per-source breakdown tells the full story
return APIEnvelope(
status="success",
data={"symbol": symbol_upper, "sources": sources},
data={"symbol": symbol_upper, "sources": sources_out},
error=None,
)
+37
View File
@@ -0,0 +1,37 @@
"""Lightweight job status for any authenticated user.
The admin Jobs page has full control; this exposes only which scheduled jobs
are currently running (name + progress) so the UI can show a live activity
indicator without admin rights.
"""
from __future__ import annotations
from fastapi import APIRouter, Depends
from app.dependencies import require_access
from app.schemas.common import APIEnvelope
from app.services.admin_service import JOB_LABELS
router = APIRouter(tags=["jobs"])
@router.get("/jobs/running", response_model=APIEnvelope)
async def list_running_jobs(_user=Depends(require_access)) -> APIEnvelope:
"""Return scheduled jobs that are currently running, with progress."""
from app.scheduler import get_job_runtime_snapshot
snapshot = get_job_runtime_snapshot()
running = []
for name, meta in snapshot.items():
if meta.get("running"):
running.append({
"name": name,
"label": JOB_LABELS.get(name, name),
"progress_pct": meta.get("progress_pct"),
"processed": meta.get("processed"),
"total": meta.get("total"),
"current_ticker": meta.get("current_ticker"),
})
running.sort(key=lambda j: j["name"])
return APIEnvelope(status="success", data={"running": running})