Big refactoring
This commit is contained in:
746
app/scheduler.py
746
app/scheduler.py
@@ -15,21 +15,27 @@ from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from datetime import date, timedelta
|
||||
import asyncio
|
||||
from datetime import date, datetime, timedelta, timezone
|
||||
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy import case, func, select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.config import settings
|
||||
from app.database import async_session_factory
|
||||
from app.models.fundamental import FundamentalData
|
||||
from app.models.ohlcv import OHLCVRecord
|
||||
from app.models.settings import SystemSetting
|
||||
from app.models.sentiment import SentimentScore
|
||||
from app.models.ticker import Ticker
|
||||
from app.providers.alpaca import AlpacaOHLCVProvider
|
||||
from app.providers.fmp import FMPFundamentalProvider
|
||||
from app.providers.fundamentals_chain import build_fundamental_provider_chain
|
||||
from app.providers.openai_sentiment import OpenAISentimentProvider
|
||||
from app.providers.protocol import SentimentData
|
||||
from app.services import fundamental_service, ingestion_service, sentiment_service
|
||||
from app.services.rr_scanner_service import scan_all_tickers
|
||||
from app.services.ticker_universe_service import bootstrap_universe
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -43,6 +49,64 @@ _last_successful: dict[str, str | None] = {
|
||||
"fundamental_collector": None,
|
||||
}
|
||||
|
||||
_job_runtime: dict[str, dict[str, object]] = {
|
||||
"data_collector": {
|
||||
"running": False,
|
||||
"status": "idle",
|
||||
"processed": 0,
|
||||
"total": None,
|
||||
"progress_pct": None,
|
||||
"current_ticker": None,
|
||||
"started_at": None,
|
||||
"finished_at": None,
|
||||
"message": None,
|
||||
},
|
||||
"sentiment_collector": {
|
||||
"running": False,
|
||||
"status": "idle",
|
||||
"processed": 0,
|
||||
"total": None,
|
||||
"progress_pct": None,
|
||||
"current_ticker": None,
|
||||
"started_at": None,
|
||||
"finished_at": None,
|
||||
"message": None,
|
||||
},
|
||||
"fundamental_collector": {
|
||||
"running": False,
|
||||
"status": "idle",
|
||||
"processed": 0,
|
||||
"total": None,
|
||||
"progress_pct": None,
|
||||
"current_ticker": None,
|
||||
"started_at": None,
|
||||
"finished_at": None,
|
||||
"message": None,
|
||||
},
|
||||
"rr_scanner": {
|
||||
"running": False,
|
||||
"status": "idle",
|
||||
"processed": 0,
|
||||
"total": None,
|
||||
"progress_pct": None,
|
||||
"current_ticker": None,
|
||||
"started_at": None,
|
||||
"finished_at": None,
|
||||
"message": None,
|
||||
},
|
||||
"ticker_universe_sync": {
|
||||
"running": False,
|
||||
"status": "idle",
|
||||
"processed": 0,
|
||||
"total": None,
|
||||
"progress_pct": None,
|
||||
"current_ticker": None,
|
||||
"started_at": None,
|
||||
"finished_at": None,
|
||||
"message": None,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
@@ -62,6 +126,71 @@ def _log_job_error(job_name: str, ticker: str, error: Exception) -> None:
|
||||
)
|
||||
|
||||
|
||||
def _runtime_start(job_name: str, total: int | None = None, message: str | None = None) -> None:
|
||||
now = datetime.now(timezone.utc).isoformat()
|
||||
_job_runtime[job_name] = {
|
||||
"running": True,
|
||||
"status": "running",
|
||||
"processed": 0,
|
||||
"total": total,
|
||||
"progress_pct": 0.0 if total and total > 0 else None,
|
||||
"current_ticker": None,
|
||||
"started_at": now,
|
||||
"finished_at": None,
|
||||
"message": message,
|
||||
}
|
||||
|
||||
|
||||
def _runtime_progress(
|
||||
job_name: str,
|
||||
processed: int,
|
||||
total: int | None,
|
||||
current_ticker: str | None = None,
|
||||
message: str | None = None,
|
||||
) -> None:
|
||||
progress_pct: float | None = None
|
||||
if total and total > 0:
|
||||
progress_pct = round((processed / total) * 100.0, 1)
|
||||
runtime = _job_runtime.get(job_name, {})
|
||||
runtime.update({
|
||||
"running": True,
|
||||
"status": "running",
|
||||
"processed": processed,
|
||||
"total": total,
|
||||
"progress_pct": progress_pct,
|
||||
"current_ticker": current_ticker,
|
||||
"message": message,
|
||||
})
|
||||
_job_runtime[job_name] = runtime
|
||||
|
||||
|
||||
def _runtime_finish(
|
||||
job_name: str,
|
||||
status: str,
|
||||
processed: int,
|
||||
total: int | None,
|
||||
message: str | None = None,
|
||||
) -> None:
|
||||
runtime = _job_runtime.get(job_name, {})
|
||||
runtime.update({
|
||||
"running": False,
|
||||
"status": status,
|
||||
"processed": processed,
|
||||
"total": total,
|
||||
"progress_pct": 100.0 if total and processed >= total else runtime.get("progress_pct"),
|
||||
"current_ticker": None,
|
||||
"finished_at": datetime.now(timezone.utc).isoformat(),
|
||||
"message": message,
|
||||
})
|
||||
_job_runtime[job_name] = runtime
|
||||
|
||||
|
||||
def get_job_runtime_snapshot(job_name: str | None = None) -> dict[str, dict[str, object]] | dict[str, object]:
|
||||
if job_name is not None:
|
||||
return dict(_job_runtime.get(job_name, {}))
|
||||
return {name: dict(meta) for name, meta in _job_runtime.items()}
|
||||
|
||||
|
||||
async def _is_job_enabled(db: AsyncSession, job_name: str) -> bool:
|
||||
"""Check SystemSetting for job enabled state. Defaults to True."""
|
||||
key = f"job_{job_name}_enabled"
|
||||
@@ -80,6 +209,61 @@ async def _get_all_tickers(db: AsyncSession) -> list[str]:
|
||||
return list(result.scalars().all())
|
||||
|
||||
|
||||
async def _get_ohlcv_priority_tickers(db: AsyncSession) -> list[str]:
|
||||
"""Return symbols prioritized for OHLCV collection.
|
||||
|
||||
Priority:
|
||||
1) Tickers with no OHLCV bars
|
||||
2) Tickers with data, oldest latest OHLCV date first
|
||||
3) Alphabetical tiebreaker
|
||||
"""
|
||||
latest_date = func.max(OHLCVRecord.date)
|
||||
missing_first = case((latest_date.is_(None), 0), else_=1)
|
||||
result = await db.execute(
|
||||
select(Ticker.symbol)
|
||||
.outerjoin(OHLCVRecord, OHLCVRecord.ticker_id == Ticker.id)
|
||||
.group_by(Ticker.id, Ticker.symbol)
|
||||
.order_by(missing_first.asc(), latest_date.asc(), Ticker.symbol.asc())
|
||||
)
|
||||
return list(result.scalars().all())
|
||||
|
||||
|
||||
async def _get_sentiment_priority_tickers(db: AsyncSession) -> list[str]:
|
||||
"""Return symbols prioritized for sentiment collection.
|
||||
|
||||
Priority:
|
||||
1) Tickers with no sentiment records
|
||||
2) Tickers with records, oldest latest sentiment timestamp first
|
||||
3) Alphabetical tiebreaker
|
||||
"""
|
||||
latest_ts = func.max(SentimentScore.timestamp)
|
||||
missing_first = case((latest_ts.is_(None), 0), else_=1)
|
||||
result = await db.execute(
|
||||
select(Ticker.symbol)
|
||||
.outerjoin(SentimentScore, SentimentScore.ticker_id == Ticker.id)
|
||||
.group_by(Ticker.id, Ticker.symbol)
|
||||
.order_by(missing_first.asc(), latest_ts.asc(), Ticker.symbol.asc())
|
||||
)
|
||||
return list(result.scalars().all())
|
||||
|
||||
|
||||
async def _get_fundamental_priority_tickers(db: AsyncSession) -> list[str]:
|
||||
"""Return symbols prioritized for fundamentals refresh.
|
||||
|
||||
Priority:
|
||||
1) Tickers with no fundamentals snapshot yet
|
||||
2) Tickers with existing fundamentals, oldest fetched_at first
|
||||
3) Alphabetical tiebreaker
|
||||
"""
|
||||
missing_first = case((FundamentalData.fetched_at.is_(None), 0), else_=1)
|
||||
result = await db.execute(
|
||||
select(Ticker.symbol)
|
||||
.outerjoin(FundamentalData, FundamentalData.ticker_id == Ticker.id)
|
||||
.order_by(missing_first.asc(), FundamentalData.fetched_at.asc(), Ticker.symbol.asc())
|
||||
)
|
||||
return list(result.scalars().all())
|
||||
|
||||
|
||||
def _resume_tickers(symbols: list[str], job_name: str) -> list[str]:
|
||||
"""Reorder tickers to resume after the last successful one (rate-limit resume).
|
||||
|
||||
@@ -94,6 +278,11 @@ def _resume_tickers(symbols: list[str], job_name: str) -> list[str]:
|
||||
return symbols[idx + 1:] + symbols[:idx + 1]
|
||||
|
||||
|
||||
def _chunked(symbols: list[str], chunk_size: int) -> list[list[str]]:
|
||||
size = max(1, chunk_size)
|
||||
return [symbols[i:i + size] for i in range(0, len(symbols), size)]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Job: Data Collector (OHLCV)
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -104,68 +293,84 @@ async def collect_ohlcv() -> None:
|
||||
|
||||
Uses AlpacaOHLCVProvider. Processes each ticker independently.
|
||||
On rate limit, records last successful ticker for resume.
|
||||
Start date is resolved by ingestion progress:
|
||||
- existing ticker: resume from last_ingested_date + 1
|
||||
- new ticker: backfill ~1 year by default
|
||||
"""
|
||||
job_name = "data_collector"
|
||||
logger.info(json.dumps({"event": "job_start", "job": job_name}))
|
||||
|
||||
async with async_session_factory() as db:
|
||||
if not await _is_job_enabled(db, job_name):
|
||||
logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"}))
|
||||
return
|
||||
|
||||
symbols = await _get_all_tickers(db)
|
||||
if not symbols:
|
||||
logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": 0}))
|
||||
return
|
||||
|
||||
# Reorder for rate-limit resume
|
||||
symbols = _resume_tickers(symbols, job_name)
|
||||
|
||||
# Build provider (skip if keys not configured)
|
||||
if not settings.alpaca_api_key or not settings.alpaca_api_secret:
|
||||
logger.warning(json.dumps({"event": "job_skipped", "job": job_name, "reason": "alpaca keys not configured"}))
|
||||
return
|
||||
_runtime_start(job_name)
|
||||
processed = 0
|
||||
total: int | None = None
|
||||
|
||||
try:
|
||||
provider = AlpacaOHLCVProvider(settings.alpaca_api_key, settings.alpaca_api_secret)
|
||||
except Exception as exc:
|
||||
logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)}))
|
||||
return
|
||||
|
||||
end_date = date.today()
|
||||
start_date = end_date - timedelta(days=5) # Fetch last 5 days to catch up
|
||||
processed = 0
|
||||
|
||||
for symbol in symbols:
|
||||
async with async_session_factory() as db:
|
||||
try:
|
||||
result = await ingestion_service.fetch_and_ingest(
|
||||
db, provider, symbol, start_date=start_date, end_date=end_date,
|
||||
)
|
||||
_last_successful[job_name] = symbol
|
||||
processed += 1
|
||||
logger.info(json.dumps({
|
||||
"event": "ticker_collected",
|
||||
"job": job_name,
|
||||
"ticker": symbol,
|
||||
"status": result.status,
|
||||
"records": result.records_ingested,
|
||||
}))
|
||||
if result.status == "partial":
|
||||
# Rate limited — stop and resume next run
|
||||
logger.warning(json.dumps({
|
||||
"event": "rate_limited",
|
||||
if not await _is_job_enabled(db, job_name):
|
||||
logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"}))
|
||||
_runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled")
|
||||
return
|
||||
|
||||
symbols = await _get_ohlcv_priority_tickers(db)
|
||||
if not symbols:
|
||||
logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": 0}))
|
||||
_runtime_finish(job_name, "completed", processed=0, total=0, message="No tickers")
|
||||
return
|
||||
|
||||
total = len(symbols)
|
||||
_runtime_progress(job_name, processed=0, total=total)
|
||||
|
||||
# Build provider (skip if keys not configured)
|
||||
if not settings.alpaca_api_key or not settings.alpaca_api_secret:
|
||||
logger.warning(json.dumps({"event": "job_skipped", "job": job_name, "reason": "alpaca keys not configured"}))
|
||||
_runtime_finish(job_name, "skipped", processed=0, total=total, message="Alpaca keys not configured")
|
||||
return
|
||||
|
||||
try:
|
||||
provider = AlpacaOHLCVProvider(settings.alpaca_api_key, settings.alpaca_api_secret)
|
||||
except Exception as exc:
|
||||
logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)}))
|
||||
_runtime_finish(job_name, "error", processed=0, total=total, message=str(exc))
|
||||
return
|
||||
|
||||
end_date = date.today()
|
||||
|
||||
for symbol in symbols:
|
||||
_runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol)
|
||||
async with async_session_factory() as db:
|
||||
try:
|
||||
result = await ingestion_service.fetch_and_ingest(
|
||||
db, provider, symbol, start_date=None, end_date=end_date,
|
||||
)
|
||||
_last_successful[job_name] = symbol
|
||||
processed += 1
|
||||
_runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol)
|
||||
logger.info(json.dumps({
|
||||
"event": "ticker_collected",
|
||||
"job": job_name,
|
||||
"ticker": symbol,
|
||||
"processed": processed,
|
||||
"status": result.status,
|
||||
"records": result.records_ingested,
|
||||
}))
|
||||
return
|
||||
except Exception as exc:
|
||||
_log_job_error(job_name, symbol, exc)
|
||||
if result.status == "partial":
|
||||
# Rate limited — stop and resume next run
|
||||
logger.warning(json.dumps({
|
||||
"event": "rate_limited",
|
||||
"job": job_name,
|
||||
"ticker": symbol,
|
||||
"processed": processed,
|
||||
}))
|
||||
_runtime_finish(job_name, "rate_limited", processed=processed, total=total, message=f"Rate limited at {symbol}")
|
||||
return
|
||||
except Exception as exc:
|
||||
_log_job_error(job_name, symbol, exc)
|
||||
|
||||
# Reset resume pointer on full completion
|
||||
_last_successful[job_name] = None
|
||||
logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": processed}))
|
||||
# Reset resume pointer on full completion
|
||||
_last_successful[job_name] = None
|
||||
logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": processed}))
|
||||
_runtime_finish(job_name, "completed", processed=processed, total=total, message=f"Processed {processed} tickers")
|
||||
except Exception as exc:
|
||||
logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)}))
|
||||
_runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -181,68 +386,119 @@ async def collect_sentiment() -> None:
|
||||
"""
|
||||
job_name = "sentiment_collector"
|
||||
logger.info(json.dumps({"event": "job_start", "job": job_name}))
|
||||
|
||||
async with async_session_factory() as db:
|
||||
if not await _is_job_enabled(db, job_name):
|
||||
logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"}))
|
||||
return
|
||||
|
||||
symbols = await _get_all_tickers(db)
|
||||
if not symbols:
|
||||
logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": 0}))
|
||||
return
|
||||
|
||||
symbols = _resume_tickers(symbols, job_name)
|
||||
|
||||
if not settings.openai_api_key:
|
||||
logger.warning(json.dumps({"event": "job_skipped", "job": job_name, "reason": "openai key not configured"}))
|
||||
return
|
||||
_runtime_start(job_name)
|
||||
processed = 0
|
||||
total: int | None = None
|
||||
|
||||
try:
|
||||
provider = OpenAISentimentProvider(settings.openai_api_key, settings.openai_model)
|
||||
async with async_session_factory() as db:
|
||||
if not await _is_job_enabled(db, job_name):
|
||||
logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"}))
|
||||
_runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled")
|
||||
return
|
||||
|
||||
symbols = await _get_sentiment_priority_tickers(db)
|
||||
if not symbols:
|
||||
logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": 0}))
|
||||
_runtime_finish(job_name, "completed", processed=0, total=0, message="No tickers")
|
||||
return
|
||||
|
||||
total = len(symbols)
|
||||
_runtime_progress(job_name, processed=0, total=total)
|
||||
|
||||
if not settings.openai_api_key:
|
||||
logger.warning(json.dumps({"event": "job_skipped", "job": job_name, "reason": "openai key not configured"}))
|
||||
_runtime_finish(job_name, "skipped", processed=0, total=total, message="OpenAI key not configured")
|
||||
return
|
||||
|
||||
try:
|
||||
provider = OpenAISentimentProvider(settings.openai_api_key, settings.openai_model)
|
||||
except Exception as exc:
|
||||
logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)}))
|
||||
_runtime_finish(job_name, "error", processed=0, total=total, message=str(exc))
|
||||
return
|
||||
|
||||
batch_size = max(1, settings.openai_sentiment_batch_size)
|
||||
batches = _chunked(symbols, batch_size)
|
||||
|
||||
for batch in batches:
|
||||
current_hint = batch[0] if len(batch) == 1 else f"{batch[0]} (+{len(batch) - 1})"
|
||||
_runtime_progress(job_name, processed=processed, total=total, current_ticker=current_hint)
|
||||
|
||||
batch_results: dict[str, SentimentData] = {}
|
||||
if len(batch) > 1 and hasattr(provider, "fetch_sentiment_batch"):
|
||||
try:
|
||||
batch_results = await provider.fetch_sentiment_batch(batch)
|
||||
except Exception as exc:
|
||||
msg = str(exc).lower()
|
||||
if "rate" in msg or "quota" in msg or "429" in msg:
|
||||
logger.warning(json.dumps({
|
||||
"event": "rate_limited",
|
||||
"job": job_name,
|
||||
"ticker": batch[0],
|
||||
"processed": processed,
|
||||
}))
|
||||
_runtime_finish(job_name, "rate_limited", processed=processed, total=total, message=f"Rate limited at {batch[0]}")
|
||||
return
|
||||
logger.warning(json.dumps({
|
||||
"event": "batch_fallback",
|
||||
"job": job_name,
|
||||
"batch": batch,
|
||||
"reason": str(exc),
|
||||
}))
|
||||
|
||||
for symbol in batch:
|
||||
_runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol)
|
||||
data = batch_results.get(symbol) if batch_results else None
|
||||
|
||||
if data is None:
|
||||
try:
|
||||
data = await provider.fetch_sentiment(symbol)
|
||||
except Exception as exc:
|
||||
msg = str(exc).lower()
|
||||
if "rate" in msg or "quota" in msg or "429" in msg:
|
||||
logger.warning(json.dumps({
|
||||
"event": "rate_limited",
|
||||
"job": job_name,
|
||||
"ticker": symbol,
|
||||
"processed": processed,
|
||||
}))
|
||||
_runtime_finish(job_name, "rate_limited", processed=processed, total=total, message=f"Rate limited at {symbol}")
|
||||
return
|
||||
_log_job_error(job_name, symbol, exc)
|
||||
continue
|
||||
|
||||
async with async_session_factory() as db:
|
||||
try:
|
||||
await sentiment_service.store_sentiment(
|
||||
db,
|
||||
symbol=symbol,
|
||||
classification=data.classification,
|
||||
confidence=data.confidence,
|
||||
source=data.source,
|
||||
timestamp=data.timestamp,
|
||||
reasoning=data.reasoning,
|
||||
citations=data.citations,
|
||||
)
|
||||
_last_successful[job_name] = symbol
|
||||
processed += 1
|
||||
_runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol)
|
||||
logger.info(json.dumps({
|
||||
"event": "ticker_collected",
|
||||
"job": job_name,
|
||||
"ticker": symbol,
|
||||
"classification": data.classification,
|
||||
"confidence": data.confidence,
|
||||
}))
|
||||
except Exception as exc:
|
||||
_log_job_error(job_name, symbol, exc)
|
||||
|
||||
_last_successful[job_name] = None
|
||||
logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": processed}))
|
||||
_runtime_finish(job_name, "completed", processed=processed, total=total, message=f"Processed {processed} tickers")
|
||||
except Exception as exc:
|
||||
logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)}))
|
||||
return
|
||||
|
||||
processed = 0
|
||||
|
||||
for symbol in symbols:
|
||||
async with async_session_factory() as db:
|
||||
try:
|
||||
data = await provider.fetch_sentiment(symbol)
|
||||
await sentiment_service.store_sentiment(
|
||||
db,
|
||||
symbol=symbol,
|
||||
classification=data.classification,
|
||||
confidence=data.confidence,
|
||||
source=data.source,
|
||||
timestamp=data.timestamp,
|
||||
reasoning=data.reasoning,
|
||||
citations=data.citations,
|
||||
)
|
||||
_last_successful[job_name] = symbol
|
||||
processed += 1
|
||||
logger.info(json.dumps({
|
||||
"event": "ticker_collected",
|
||||
"job": job_name,
|
||||
"ticker": symbol,
|
||||
"classification": data.classification,
|
||||
"confidence": data.confidence,
|
||||
}))
|
||||
except Exception as exc:
|
||||
msg = str(exc).lower()
|
||||
if "rate" in msg or "quota" in msg or "429" in msg:
|
||||
logger.warning(json.dumps({
|
||||
"event": "rate_limited",
|
||||
"job": job_name,
|
||||
"ticker": symbol,
|
||||
"processed": processed,
|
||||
}))
|
||||
return
|
||||
_log_job_error(job_name, symbol, exc)
|
||||
|
||||
_last_successful[job_name] = None
|
||||
logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": processed}))
|
||||
_runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -258,65 +514,114 @@ async def collect_fundamentals() -> None:
|
||||
"""
|
||||
job_name = "fundamental_collector"
|
||||
logger.info(json.dumps({"event": "job_start", "job": job_name}))
|
||||
|
||||
async with async_session_factory() as db:
|
||||
if not await _is_job_enabled(db, job_name):
|
||||
logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"}))
|
||||
return
|
||||
|
||||
symbols = await _get_all_tickers(db)
|
||||
if not symbols:
|
||||
logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": 0}))
|
||||
return
|
||||
|
||||
symbols = _resume_tickers(symbols, job_name)
|
||||
|
||||
if not settings.fmp_api_key:
|
||||
logger.warning(json.dumps({"event": "job_skipped", "job": job_name, "reason": "fmp key not configured"}))
|
||||
return
|
||||
_runtime_start(job_name)
|
||||
processed = 0
|
||||
total: int | None = None
|
||||
|
||||
try:
|
||||
provider = FMPFundamentalProvider(settings.fmp_api_key)
|
||||
except Exception as exc:
|
||||
logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)}))
|
||||
return
|
||||
|
||||
processed = 0
|
||||
|
||||
for symbol in symbols:
|
||||
async with async_session_factory() as db:
|
||||
try:
|
||||
data = await provider.fetch_fundamentals(symbol)
|
||||
await fundamental_service.store_fundamental(
|
||||
db,
|
||||
symbol=symbol,
|
||||
pe_ratio=data.pe_ratio,
|
||||
revenue_growth=data.revenue_growth,
|
||||
earnings_surprise=data.earnings_surprise,
|
||||
market_cap=data.market_cap,
|
||||
unavailable_fields=data.unavailable_fields,
|
||||
)
|
||||
_last_successful[job_name] = symbol
|
||||
processed += 1
|
||||
logger.info(json.dumps({
|
||||
"event": "ticker_collected",
|
||||
"job": job_name,
|
||||
"ticker": symbol,
|
||||
}))
|
||||
except Exception as exc:
|
||||
msg = str(exc).lower()
|
||||
if "rate" in msg or "429" in msg:
|
||||
logger.warning(json.dumps({
|
||||
"event": "rate_limited",
|
||||
if not await _is_job_enabled(db, job_name):
|
||||
logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"}))
|
||||
_runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled")
|
||||
return
|
||||
|
||||
symbols = await _get_fundamental_priority_tickers(db)
|
||||
if not symbols:
|
||||
logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": 0}))
|
||||
_runtime_finish(job_name, "completed", processed=0, total=0, message="No tickers")
|
||||
return
|
||||
|
||||
total = len(symbols)
|
||||
_runtime_progress(job_name, processed=0, total=total)
|
||||
|
||||
if not (settings.fmp_api_key or settings.finnhub_api_key or settings.alpha_vantage_api_key):
|
||||
logger.warning(json.dumps({"event": "job_skipped", "job": job_name, "reason": "no fundamentals provider keys configured"}))
|
||||
_runtime_finish(job_name, "skipped", processed=0, total=total, message="No fundamentals provider keys configured")
|
||||
return
|
||||
|
||||
try:
|
||||
provider = build_fundamental_provider_chain()
|
||||
except Exception as exc:
|
||||
logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)}))
|
||||
_runtime_finish(job_name, "error", processed=0, total=total, message=str(exc))
|
||||
return
|
||||
|
||||
max_retries = max(0, settings.fundamental_rate_limit_retries)
|
||||
base_backoff = max(1, settings.fundamental_rate_limit_backoff_seconds)
|
||||
|
||||
for symbol in symbols:
|
||||
_runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol)
|
||||
attempt = 0
|
||||
while True:
|
||||
try:
|
||||
data = await provider.fetch_fundamentals(symbol)
|
||||
async with async_session_factory() as db:
|
||||
await fundamental_service.store_fundamental(
|
||||
db,
|
||||
symbol=symbol,
|
||||
pe_ratio=data.pe_ratio,
|
||||
revenue_growth=data.revenue_growth,
|
||||
earnings_surprise=data.earnings_surprise,
|
||||
market_cap=data.market_cap,
|
||||
unavailable_fields=data.unavailable_fields,
|
||||
)
|
||||
_last_successful[job_name] = symbol
|
||||
processed += 1
|
||||
_runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol)
|
||||
logger.info(json.dumps({
|
||||
"event": "ticker_collected",
|
||||
"job": job_name,
|
||||
"ticker": symbol,
|
||||
"processed": processed,
|
||||
}))
|
||||
return
|
||||
_log_job_error(job_name, symbol, exc)
|
||||
break
|
||||
except Exception as exc:
|
||||
msg = str(exc).lower()
|
||||
if "rate" in msg or "429" in msg:
|
||||
if attempt < max_retries:
|
||||
wait_seconds = base_backoff * (2 ** attempt)
|
||||
attempt += 1
|
||||
logger.warning(json.dumps({
|
||||
"event": "rate_limited_retry",
|
||||
"job": job_name,
|
||||
"ticker": symbol,
|
||||
"attempt": attempt,
|
||||
"max_retries": max_retries,
|
||||
"wait_seconds": wait_seconds,
|
||||
"processed": processed,
|
||||
}))
|
||||
_runtime_progress(
|
||||
job_name,
|
||||
processed=processed,
|
||||
total=total,
|
||||
current_ticker=symbol,
|
||||
message=f"Rate-limited at {symbol}; retry {attempt}/{max_retries} in {wait_seconds}s",
|
||||
)
|
||||
await asyncio.sleep(wait_seconds)
|
||||
continue
|
||||
|
||||
_last_successful[job_name] = None
|
||||
logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": processed}))
|
||||
logger.warning(json.dumps({
|
||||
"event": "rate_limited",
|
||||
"job": job_name,
|
||||
"ticker": symbol,
|
||||
"processed": processed,
|
||||
}))
|
||||
_runtime_finish(
|
||||
job_name,
|
||||
"rate_limited",
|
||||
processed=processed,
|
||||
total=total,
|
||||
message=f"Rate limited at {symbol} after {attempt} retries",
|
||||
)
|
||||
return
|
||||
_log_job_error(job_name, symbol, exc)
|
||||
break
|
||||
|
||||
_last_successful[job_name] = None
|
||||
logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": processed}))
|
||||
_runtime_finish(job_name, "completed", processed=processed, total=total, message=f"Processed {processed} tickers")
|
||||
except Exception as exc:
|
||||
logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)}))
|
||||
_runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -332,28 +637,90 @@ async def scan_rr() -> None:
|
||||
"""
|
||||
job_name = "rr_scanner"
|
||||
logger.info(json.dumps({"event": "job_start", "job": job_name}))
|
||||
_runtime_start(job_name)
|
||||
processed = 0
|
||||
total: int | None = None
|
||||
|
||||
async with async_session_factory() as db:
|
||||
if not await _is_job_enabled(db, job_name):
|
||||
logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"}))
|
||||
return
|
||||
try:
|
||||
async with async_session_factory() as db:
|
||||
if not await _is_job_enabled(db, job_name):
|
||||
logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"}))
|
||||
_runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled")
|
||||
return
|
||||
|
||||
try:
|
||||
setups = await scan_all_tickers(
|
||||
db, rr_threshold=settings.default_rr_threshold,
|
||||
symbols = await _get_all_tickers(db)
|
||||
total = len(symbols)
|
||||
_runtime_progress(job_name, processed=0, total=total)
|
||||
|
||||
try:
|
||||
setups = await scan_all_tickers(
|
||||
db, rr_threshold=settings.default_rr_threshold,
|
||||
)
|
||||
processed = total or 0
|
||||
_runtime_finish(job_name, "completed", processed=processed, total=total, message=f"Found {len(setups)} setups")
|
||||
logger.info(json.dumps({
|
||||
"event": "job_complete",
|
||||
"job": job_name,
|
||||
"setups_found": len(setups),
|
||||
}))
|
||||
except Exception as exc:
|
||||
_runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc))
|
||||
logger.error(json.dumps({
|
||||
"event": "job_error",
|
||||
"job": job_name,
|
||||
"error_type": type(exc).__name__,
|
||||
"message": str(exc),
|
||||
}))
|
||||
except Exception as exc:
|
||||
logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)}))
|
||||
_runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Job: Ticker Universe Sync
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def sync_ticker_universe() -> None:
|
||||
"""Sync tracked tickers from configured default universe.
|
||||
|
||||
Setting key: ticker_universe_default (sp500 | nasdaq100 | nasdaq_all)
|
||||
"""
|
||||
job_name = "ticker_universe_sync"
|
||||
logger.info(json.dumps({"event": "job_start", "job": job_name}))
|
||||
_runtime_start(job_name, total=1)
|
||||
|
||||
try:
|
||||
async with async_session_factory() as db:
|
||||
if not await _is_job_enabled(db, job_name):
|
||||
logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"}))
|
||||
_runtime_finish(job_name, "skipped", processed=0, total=1, message="Disabled")
|
||||
return
|
||||
|
||||
result = await db.execute(
|
||||
select(SystemSetting).where(SystemSetting.key == "ticker_universe_default")
|
||||
)
|
||||
logger.info(json.dumps({
|
||||
"event": "job_complete",
|
||||
"job": job_name,
|
||||
"setups_found": len(setups),
|
||||
}))
|
||||
except Exception as exc:
|
||||
logger.error(json.dumps({
|
||||
"event": "job_error",
|
||||
"job": job_name,
|
||||
"error_type": type(exc).__name__,
|
||||
"message": str(exc),
|
||||
}))
|
||||
setting = result.scalar_one_or_none()
|
||||
universe = (setting.value if setting else "sp500").strip().lower()
|
||||
|
||||
async with async_session_factory() as db:
|
||||
summary = await bootstrap_universe(db, universe, prune_missing=False)
|
||||
_runtime_progress(job_name, processed=1, total=1)
|
||||
_runtime_finish(job_name, "completed", processed=1, total=1, message=f"Synced {universe}")
|
||||
logger.info(json.dumps({
|
||||
"event": "job_complete",
|
||||
"job": job_name,
|
||||
"universe": universe,
|
||||
"summary": summary,
|
||||
}))
|
||||
except Exception as exc:
|
||||
_runtime_finish(job_name, "error", processed=0, total=1, message=str(exc))
|
||||
logger.error(json.dumps({
|
||||
"event": "job_error",
|
||||
"job": job_name,
|
||||
"error_type": type(exc).__name__,
|
||||
"message": str(exc),
|
||||
}))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -427,6 +794,16 @@ def configure_scheduler() -> None:
|
||||
replace_existing=True,
|
||||
)
|
||||
|
||||
# Universe Sync — nightly
|
||||
scheduler.add_job(
|
||||
sync_ticker_universe,
|
||||
"interval",
|
||||
hours=24,
|
||||
id="ticker_universe_sync",
|
||||
name="Ticker Universe Sync",
|
||||
replace_existing=True,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
json.dumps({
|
||||
"event": "scheduler_configured",
|
||||
@@ -435,6 +812,7 @@ def configure_scheduler() -> None:
|
||||
"sentiment_collector": {"minutes": settings.sentiment_poll_interval_minutes},
|
||||
"fundamental_collector": fund_interval,
|
||||
"rr_scanner": rr_interval,
|
||||
"ticker_universe_sync": {"hours": 24},
|
||||
},
|
||||
})
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user