refactor: dedupe scheduler logging/runtime, centralize SystemSetting access, fix rankings N+1
Deploy / lint (push) Successful in 7s
Deploy / test (push) Successful in 42s
Deploy / deploy (push) Successful in 27s

Behavior-preserving cleanup (345 tests pass, ruff clean):

- scheduler: replace 62 inline logger.x(json.dumps({...})) calls with a
  _log_event helper, and collapse 11 identical _job_runtime dicts into an
  _idle_runtime() factory over _JOB_NAMES.
- settings: add app/services/settings_store.py (get_setting/get_value/get_map/
  upsert_setting) and route ~13 hand-rolled SystemSetting queries + two
  identical _settings_map helpers through it.
- scoring.get_rankings: collapse the per-ticker N+1 (3-4 queries + a commit each)
  into 2 bulk reads + a single conditional commit; drop the redundant re-fetch.
  Lazy recompute-on-read is preserved. Adds first tests for get_rankings.

Net ~ -245 lines across the touched modules.
This commit is contained in:
2026-06-24 11:23:39 +02:00
parent f48d8705de
commit 437ceacfc1
11 changed files with 341 additions and 465 deletions
+103 -318
View File
@@ -27,14 +27,13 @@ from app.config import settings
from app.database import async_session_factory
from app.models.fundamental import FundamentalData
from app.models.ohlcv import OHLCVRecord
from app.models.settings import SystemSetting
from app.models.sentiment import SentimentScore
from app.models.ticker import Ticker
from app.exceptions import ProviderError
from app.providers.alpaca import AlpacaOHLCVProvider
from app.providers.fundamentals_chain import build_fundamental_provider_chain
from app.providers.protocol import SentimentData
from app.services import fundamental_service, ingestion_service, sentiment_service
from app.services import fundamental_service, ingestion_service, sentiment_service, settings_store
from app.services.alert_service import dispatch_alerts
from app.services.backtest_service import run_and_store as run_backtest_and_store
from app.services.market_regime_service import update_market_regime
@@ -70,8 +69,25 @@ _last_successful: dict[str, str | None] = {
"fundamental_collector": None,
}
_job_runtime: dict[str, dict[str, object]] = {
"data_collector": {
# Jobs whose per-run progress is surfaced to Admin → Jobs. (outcome_evaluator is
# created lazily on first run via _runtime_start.)
_JOB_NAMES = [
"data_collector",
"data_backfill",
"sentiment_collector",
"fundamental_collector",
"rr_scanner",
"ticker_universe_sync",
"alerts",
"market_regime",
"backtest",
"daily_pipeline",
"intraday_pipeline",
]
def _idle_runtime() -> dict[str, object]:
return {
"running": False,
"status": "idle",
"processed": 0,
@@ -81,118 +97,10 @@ _job_runtime: dict[str, dict[str, object]] = {
"started_at": None,
"finished_at": None,
"message": None,
},
"data_backfill": {
"running": False,
"status": "idle",
"processed": 0,
"total": None,
"progress_pct": None,
"current_ticker": None,
"started_at": None,
"finished_at": None,
"message": None,
},
"sentiment_collector": {
"running": False,
"status": "idle",
"processed": 0,
"total": None,
"progress_pct": None,
"current_ticker": None,
"started_at": None,
"finished_at": None,
"message": None,
},
"fundamental_collector": {
"running": False,
"status": "idle",
"processed": 0,
"total": None,
"progress_pct": None,
"current_ticker": None,
"started_at": None,
"finished_at": None,
"message": None,
},
"rr_scanner": {
"running": False,
"status": "idle",
"processed": 0,
"total": None,
"progress_pct": None,
"current_ticker": None,
"started_at": None,
"finished_at": None,
"message": None,
},
"ticker_universe_sync": {
"running": False,
"status": "idle",
"processed": 0,
"total": None,
"progress_pct": None,
"current_ticker": None,
"started_at": None,
"finished_at": None,
"message": None,
},
"alerts": {
"running": False,
"status": "idle",
"processed": 0,
"total": None,
"progress_pct": None,
"current_ticker": None,
"started_at": None,
"finished_at": None,
"message": None,
},
"market_regime": {
"running": False,
"status": "idle",
"processed": 0,
"total": None,
"progress_pct": None,
"current_ticker": None,
"started_at": None,
"finished_at": None,
"message": None,
},
"backtest": {
"running": False,
"status": "idle",
"processed": 0,
"total": None,
"progress_pct": None,
"current_ticker": None,
"started_at": None,
"finished_at": None,
"message": None,
},
"daily_pipeline": {
"running": False,
"status": "idle",
"processed": 0,
"total": None,
"progress_pct": None,
"current_ticker": None,
"started_at": None,
"finished_at": None,
"message": None,
},
"intraday_pipeline": {
"running": False,
"status": "idle",
"processed": 0,
"total": None,
"progress_pct": None,
"current_ticker": None,
"started_at": None,
"finished_at": None,
"message": None,
},
}
}
_job_runtime: dict[str, dict[str, object]] = {name: _idle_runtime() for name in _JOB_NAMES}
# ---------------------------------------------------------------------------
@@ -200,30 +108,27 @@ _job_runtime: dict[str, dict[str, object]] = {
# ---------------------------------------------------------------------------
def _log_event(level: int, event: str, **fields: object) -> None:
"""Emit a structured JSON log line: {"event": ..., **fields}."""
logger.log(level, json.dumps({"event": event, **fields}))
def _log_job_error(job_name: str, ticker: str, error: Exception) -> None:
"""Log a job error as structured JSON."""
logger.error(
json.dumps({
"event": "job_error",
"job": job_name,
"ticker": ticker,
"error_type": type(error).__name__,
"message": str(error),
})
"""Log a per-ticker job error as structured JSON."""
_log_event(
logging.ERROR, "job_error", job=job_name, ticker=ticker,
error_type=type(error).__name__, message=str(error),
)
def _runtime_start(job_name: str, total: int | None = None, message: str | None = None) -> None:
now = datetime.now(timezone.utc).isoformat()
_job_runtime[job_name] = {
**_idle_runtime(),
"running": True,
"status": "running",
"processed": 0,
"total": total,
"progress_pct": 0.0 if total and total > 0 else None,
"current_ticker": None,
"started_at": now,
"finished_at": None,
"started_at": datetime.now(timezone.utc).isoformat(),
"message": message,
}
@@ -280,14 +185,8 @@ def get_job_runtime_snapshot(job_name: str | None = None) -> dict[str, dict[str,
async def _is_job_enabled(db: AsyncSession, job_name: str) -> bool:
"""Check SystemSetting for job enabled state. Defaults to True."""
key = f"job_{job_name}_enabled"
result = await db.execute(
select(SystemSetting).where(SystemSetting.key == key)
)
setting = result.scalar_one_or_none()
if setting is None:
return True
return setting.value.lower() == "true"
setting = await settings_store.get_setting(db, f"job_{job_name}_enabled")
return setting is None or setting.value.lower() == "true"
async def _get_all_tickers(db: AsyncSession) -> list[str]:
@@ -418,7 +317,7 @@ async def collect_ohlcv(full_backfill: bool = False, job_name: str = "data_colle
the manual data_backfill job to deepen shallow histories. ``job_name`` lets the
backfill report its own runtime/resume state separate from data_collector.
"""
logger.info(json.dumps({"event": "job_start", "job": job_name}))
_log_event(logging.INFO, "job_start", job=job_name)
_runtime_start(job_name)
processed = 0
total: int | None = None
@@ -426,13 +325,13 @@ async def collect_ohlcv(full_backfill: bool = False, job_name: str = "data_colle
try:
async with async_session_factory() as db:
if not await _is_job_enabled(db, job_name):
logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"}))
_log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled")
_runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled")
return
symbols = await _get_ohlcv_priority_tickers(db)
if not symbols:
logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": 0}))
_log_event(logging.INFO, "job_complete", job=job_name, tickers=0)
_runtime_finish(job_name, "completed", processed=0, total=0, message="No tickers")
return
@@ -441,14 +340,14 @@ async def collect_ohlcv(full_backfill: bool = False, job_name: str = "data_colle
# Build provider (skip if keys not configured)
if not settings.alpaca_api_key or not settings.alpaca_api_secret:
logger.warning(json.dumps({"event": "job_skipped", "job": job_name, "reason": "alpaca keys not configured"}))
_log_event(logging.WARNING, "job_skipped", job=job_name, reason="alpaca keys not configured")
_runtime_finish(job_name, "skipped", processed=0, total=total, message="Alpaca keys not configured")
return
try:
provider = AlpacaOHLCVProvider(settings.alpaca_api_key, settings.alpaca_api_secret)
except Exception as exc:
logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)}))
_log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
_runtime_finish(job_name, "error", processed=0, total=total, message=str(exc))
return
@@ -469,21 +368,10 @@ async def collect_ohlcv(full_backfill: bool = False, job_name: str = "data_colle
_last_successful[job_name] = symbol
processed += 1
_runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol)
logger.info(json.dumps({
"event": "ticker_collected",
"job": job_name,
"ticker": symbol,
"status": result.status,
"records": result.records_ingested,
}))
_log_event(logging.INFO, "ticker_collected", job=job_name, ticker=symbol, status=result.status, records=result.records_ingested)
if result.status == "partial":
# Rate limited — stop and resume next run
logger.warning(json.dumps({
"event": "rate_limited",
"job": job_name,
"ticker": symbol,
"processed": processed,
}))
_log_event(logging.WARNING, "rate_limited", job=job_name, ticker=symbol, processed=processed)
_runtime_finish(job_name, "rate_limited", processed=processed, total=total, message=f"Rate limited at {symbol}")
return
except Exception as exc:
@@ -491,10 +379,10 @@ async def collect_ohlcv(full_backfill: bool = False, job_name: str = "data_colle
# Reset resume pointer on full completion
_last_successful[job_name] = None
logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": processed}))
_log_event(logging.INFO, "job_complete", job=job_name, tickers=processed)
_runtime_finish(job_name, "completed", processed=processed, total=total, message=f"Processed {processed} tickers")
except Exception as exc:
logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)}))
_log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
_runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc))
@@ -521,7 +409,7 @@ async def collect_sentiment() -> None:
successful ticker for resume.
"""
job_name = "sentiment_collector"
logger.info(json.dumps({"event": "job_start", "job": job_name}))
_log_event(logging.INFO, "job_start", job=job_name)
_runtime_start(job_name)
processed = 0
total: int | None = None
@@ -529,13 +417,13 @@ async def collect_sentiment() -> None:
try:
async with async_session_factory() as db:
if not await _is_job_enabled(db, job_name):
logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"}))
_log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled")
_runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled")
return
symbols = await _get_sentiment_priority_tickers(db)
if not symbols:
logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": 0}))
_log_event(logging.INFO, "job_complete", job=job_name, tickers=0)
_runtime_finish(job_name, "completed", processed=0, total=0, message="No tickers")
return
@@ -546,11 +434,11 @@ async def collect_sentiment() -> None:
async with async_session_factory() as cfg_db:
provider = await build_sentiment_provider(cfg_db)
except ProviderError as exc:
logger.warning(json.dumps({"event": "job_skipped", "job": job_name, "reason": str(exc)}))
_log_event(logging.WARNING, "job_skipped", job=job_name, reason=str(exc))
_runtime_finish(job_name, "skipped", processed=0, total=total, message=str(exc))
return
except Exception as exc:
logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)}))
_log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
_runtime_finish(job_name, "error", processed=0, total=total, message=str(exc))
return
@@ -568,20 +456,10 @@ async def collect_sentiment() -> None:
except Exception as exc:
msg = str(exc).lower()
if "rate" in msg or "quota" in msg or "429" in msg:
logger.warning(json.dumps({
"event": "rate_limited",
"job": job_name,
"ticker": batch[0],
"processed": processed,
}))
_log_event(logging.WARNING, "rate_limited", job=job_name, ticker=batch[0], processed=processed)
_runtime_finish(job_name, "rate_limited", processed=processed, total=total, message=f"Rate limited at {batch[0]}")
return
logger.warning(json.dumps({
"event": "batch_fallback",
"job": job_name,
"batch": batch,
"reason": str(exc),
}))
_log_event(logging.WARNING, "batch_fallback", job=job_name, batch=batch, reason=str(exc))
for symbol in batch:
_runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol)
@@ -593,12 +471,7 @@ async def collect_sentiment() -> None:
except Exception as exc:
msg = str(exc).lower()
if "rate" in msg or "quota" in msg or "429" in msg:
logger.warning(json.dumps({
"event": "rate_limited",
"job": job_name,
"ticker": symbol,
"processed": processed,
}))
_log_event(logging.WARNING, "rate_limited", job=job_name, ticker=symbol, processed=processed)
_runtime_finish(job_name, "rate_limited", processed=processed, total=total, message=f"Rate limited at {symbol}")
return
_log_job_error(job_name, symbol, exc)
@@ -620,21 +493,15 @@ async def collect_sentiment() -> None:
_last_successful[job_name] = symbol
processed += 1
_runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol)
logger.info(json.dumps({
"event": "ticker_collected",
"job": job_name,
"ticker": symbol,
"classification": data.classification,
"confidence": data.confidence,
}))
_log_event(logging.INFO, "ticker_collected", job=job_name, ticker=symbol, classification=data.classification, confidence=data.confidence)
except Exception as exc:
_log_job_error(job_name, symbol, exc)
_last_successful[job_name] = None
logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": processed}))
_log_event(logging.INFO, "job_complete", job=job_name, tickers=processed)
_runtime_finish(job_name, "completed", processed=processed, total=total, message=f"Processed {processed} tickers")
except Exception as exc:
logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)}))
_log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
_runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc))
@@ -650,7 +517,7 @@ async def collect_fundamentals() -> None:
successful ticker for resume.
"""
job_name = "fundamental_collector"
logger.info(json.dumps({"event": "job_start", "job": job_name}))
_log_event(logging.INFO, "job_start", job=job_name)
_runtime_start(job_name)
processed = 0
total: int | None = None
@@ -658,13 +525,13 @@ async def collect_fundamentals() -> None:
try:
async with async_session_factory() as db:
if not await _is_job_enabled(db, job_name):
logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"}))
_log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled")
_runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled")
return
symbols = await _get_fundamental_priority_tickers(db)
if not symbols:
logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": 0}))
_log_event(logging.INFO, "job_complete", job=job_name, tickers=0)
_runtime_finish(job_name, "completed", processed=0, total=0, message="No tickers")
return
@@ -672,14 +539,14 @@ async def collect_fundamentals() -> None:
_runtime_progress(job_name, processed=0, total=total)
if not (settings.fmp_api_key or settings.finnhub_api_key or settings.alpha_vantage_api_key):
logger.warning(json.dumps({"event": "job_skipped", "job": job_name, "reason": "no fundamentals provider keys configured"}))
_log_event(logging.WARNING, "job_skipped", job=job_name, reason="no fundamentals provider keys configured")
_runtime_finish(job_name, "skipped", processed=0, total=total, message="No fundamentals provider keys configured")
return
try:
provider = build_fundamental_provider_chain()
except Exception as exc:
logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)}))
_log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
_runtime_finish(job_name, "error", processed=0, total=total, message=str(exc))
return
@@ -710,11 +577,7 @@ async def collect_fundamentals() -> None:
_last_successful[job_name] = symbol
processed += 1
_runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol)
logger.info(json.dumps({
"event": "ticker_collected",
"job": job_name,
"ticker": symbol,
}))
_log_event(logging.INFO, "ticker_collected", job=job_name, ticker=symbol)
break
except Exception as exc:
msg = str(exc).lower()
@@ -722,15 +585,7 @@ async def collect_fundamentals() -> None:
if attempt < max_retries:
wait_seconds = base_backoff * (2 ** attempt)
attempt += 1
logger.warning(json.dumps({
"event": "rate_limited_retry",
"job": job_name,
"ticker": symbol,
"attempt": attempt,
"max_retries": max_retries,
"wait_seconds": wait_seconds,
"processed": processed,
}))
_log_event(logging.WARNING, "rate_limited_retry", job=job_name, ticker=symbol, attempt=attempt, max_retries=max_retries, wait_seconds=wait_seconds, processed=processed)
_runtime_progress(
job_name,
processed=processed,
@@ -745,12 +600,7 @@ async def collect_fundamentals() -> None:
# still get (e.g. FMP market cap) and move on, rather than
# aborting the whole run and leaving every later ticker
# untouched.
logger.warning(json.dumps({
"event": "rate_limited_partial",
"job": job_name,
"ticker": symbol,
"processed": processed,
}))
_log_event(logging.WARNING, "rate_limited_partial", job=job_name, ticker=symbol, processed=processed)
try:
data = await provider.fetch_fundamentals(symbol, allow_partial=True)
await _store(symbol, data)
@@ -765,10 +615,10 @@ async def collect_fundamentals() -> None:
await asyncio.sleep(spacing)
_last_successful[job_name] = None
logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": processed}))
_log_event(logging.INFO, "job_complete", job=job_name, tickers=processed)
_runtime_finish(job_name, "completed", processed=processed, total=total, message=f"Processed {processed} tickers")
except Exception as exc:
logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)}))
_log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
_runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc))
@@ -784,7 +634,7 @@ async def scan_rr() -> None:
per-ticker error isolation internally.
"""
job_name = "rr_scanner"
logger.info(json.dumps({"event": "job_start", "job": job_name}))
_log_event(logging.INFO, "job_start", job=job_name)
_runtime_start(job_name)
processed = 0
total: int | None = None
@@ -792,7 +642,7 @@ async def scan_rr() -> None:
try:
async with async_session_factory() as db:
if not await _is_job_enabled(db, job_name):
logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"}))
_log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled")
_runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled")
return
@@ -812,21 +662,12 @@ async def scan_rr() -> None:
)
processed = total or 0
_runtime_finish(job_name, "completed", processed=processed, total=total, message=f"Found {len(setups)} setups")
logger.info(json.dumps({
"event": "job_complete",
"job": job_name,
"setups_found": len(setups),
}))
_log_event(logging.INFO, "job_complete", job=job_name, setups_found=len(setups))
except Exception as exc:
_runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc))
logger.error(json.dumps({
"event": "job_error",
"job": job_name,
"error_type": type(exc).__name__,
"message": str(exc),
}))
_log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
except Exception as exc:
logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)}))
_log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
_runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc))
@@ -842,13 +683,13 @@ async def evaluate_outcomes() -> None:
Undecided setups stay pending and are re-checked on the next run.
"""
job_name = "outcome_evaluator"
logger.info(json.dumps({"event": "job_start", "job": job_name}))
_log_event(logging.INFO, "job_start", job=job_name)
_runtime_start(job_name, total=1)
try:
async with async_session_factory() as db:
if not await _is_job_enabled(db, job_name):
logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"}))
_log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled")
_runtime_finish(job_name, "skipped", processed=0, total=1, message="Disabled")
return
@@ -864,19 +705,10 @@ async def evaluate_outcomes() -> None:
message=f"Evaluated {summary['evaluated']}, pending {summary['still_pending']}, "
f"{closed_trades} paper trade(s) closed",
)
logger.info(json.dumps({
"event": "job_complete",
"job": job_name,
"summary": summary,
}))
_log_event(logging.INFO, "job_complete", job=job_name, summary=summary)
except Exception as exc:
_runtime_finish(job_name, "error", processed=0, total=1, message=str(exc))
logger.error(json.dumps({
"event": "job_error",
"job": job_name,
"error_type": type(exc).__name__,
"message": str(exc),
}))
_log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
# ---------------------------------------------------------------------------
@@ -887,13 +719,13 @@ async def evaluate_outcomes() -> None:
async def dispatch_alerts_job() -> None:
"""Push Telegram alerts for qualified setups, S/R proximity, score drops, digest."""
job_name = "alerts"
logger.info(json.dumps({"event": "job_start", "job": job_name}))
_log_event(logging.INFO, "job_start", job=job_name)
_runtime_start(job_name, total=1)
try:
async with async_session_factory() as db:
if not await _is_job_enabled(db, job_name):
logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"}))
_log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled")
_runtime_finish(job_name, "skipped", processed=0, total=1, message="Disabled")
return
@@ -904,15 +736,10 @@ async def dispatch_alerts_job() -> None:
job_name, "completed", processed=1, total=1,
message=f"{result.get('status')}, sent {result.get('sent', 0)}",
)
logger.info(json.dumps({"event": "job_complete", "job": job_name, "result": result}))
_log_event(logging.INFO, "job_complete", job=job_name, result=result)
except Exception as exc:
_runtime_finish(job_name, "error", processed=0, total=1, message=str(exc))
logger.error(json.dumps({
"event": "job_error",
"job": job_name,
"error_type": type(exc).__name__,
"message": str(exc),
}))
_log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
# ---------------------------------------------------------------------------
@@ -923,13 +750,13 @@ async def dispatch_alerts_job() -> None:
async def compute_market_regime() -> None:
"""Refresh the cached benchmark (SPY) trend regime."""
job_name = "market_regime"
logger.info(json.dumps({"event": "job_start", "job": job_name}))
_log_event(logging.INFO, "job_start", job=job_name)
_runtime_start(job_name, total=1)
try:
async with async_session_factory() as db:
if not await _is_job_enabled(db, job_name):
logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"}))
_log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled")
_runtime_finish(job_name, "skipped", processed=0, total=1, message="Disabled")
return
@@ -940,15 +767,10 @@ async def compute_market_regime() -> None:
job_name, "completed", processed=1, total=1,
message=f"Regime: {regime.get('label')}",
)
logger.info(json.dumps({"event": "job_complete", "job": job_name, "label": regime.get("label")}))
_log_event(logging.INFO, "job_complete", job=job_name, label=regime.get("label"))
except Exception as exc:
_runtime_finish(job_name, "error", processed=0, total=1, message=str(exc))
logger.error(json.dumps({
"event": "job_error",
"job": job_name,
"error_type": type(exc).__name__,
"message": str(exc),
}))
_log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
# ---------------------------------------------------------------------------
@@ -959,7 +781,7 @@ async def compute_market_regime() -> None:
async def run_backtest_job() -> None:
"""Replay the price-derived engine over history and cache the report."""
job_name = "backtest"
logger.info(json.dumps({"event": "job_start", "job": job_name}))
_log_event(logging.INFO, "job_start", job=job_name)
_runtime_start(job_name)
def _on_progress(done: int, count: int, symbol: str) -> None:
@@ -968,7 +790,7 @@ async def run_backtest_job() -> None:
try:
async with async_session_factory() as db:
if not await _is_job_enabled(db, job_name):
logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"}))
_log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled")
_runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled")
return
@@ -979,15 +801,10 @@ async def run_backtest_job() -> None:
processed=report.get("tickers", 0), total=report.get("tickers", 0),
message=f"{report.get('candidates', 0)} setups, {report.get('qualified', 0)} qualified",
)
logger.info(json.dumps({"event": "job_complete", "job": job_name, "candidates": report.get("candidates")}))
_log_event(logging.INFO, "job_complete", job=job_name, candidates=report.get("candidates"))
except Exception as exc:
_runtime_finish(job_name, "error", processed=0, total=None, message=str(exc))
logger.error(json.dumps({
"event": "job_error",
"job": job_name,
"error_type": type(exc).__name__,
"message": str(exc),
}))
_log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
# ---------------------------------------------------------------------------
@@ -1001,40 +818,26 @@ async def sync_ticker_universe() -> None:
Setting key: ticker_universe_default (sp500 | nasdaq100 | nasdaq_all)
"""
job_name = "ticker_universe_sync"
logger.info(json.dumps({"event": "job_start", "job": job_name}))
_log_event(logging.INFO, "job_start", job=job_name)
_runtime_start(job_name, total=1)
try:
async with async_session_factory() as db:
if not await _is_job_enabled(db, job_name):
logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"}))
_log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled")
_runtime_finish(job_name, "skipped", processed=0, total=1, message="Disabled")
return
result = await db.execute(
select(SystemSetting).where(SystemSetting.key == "ticker_universe_default")
)
setting = result.scalar_one_or_none()
universe = (setting.value if setting else "sp500").strip().lower()
universe = (await settings_store.get_value(db, "ticker_universe_default", "sp500")).strip().lower()
async with async_session_factory() as db:
summary = await bootstrap_universe(db, universe, prune_missing=False)
_runtime_progress(job_name, processed=1, total=1)
_runtime_finish(job_name, "completed", processed=1, total=1, message=f"Synced {universe}")
logger.info(json.dumps({
"event": "job_complete",
"job": job_name,
"universe": universe,
"summary": summary,
}))
_log_event(logging.INFO, "job_complete", job=job_name, universe=universe, summary=summary)
except Exception as exc:
_runtime_finish(job_name, "error", processed=0, total=1, message=str(exc))
logger.error(json.dumps({
"event": "job_error",
"job": job_name,
"error_type": type(exc).__name__,
"message": str(exc),
}))
_log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
# ---------------------------------------------------------------------------
@@ -1070,10 +873,10 @@ async def _run_pipeline(job_name: str, steps: list[tuple[str, str]]) -> None:
Each step respects its own enable flag and manages its own runtime status; a
failing step is logged and the pipeline continues with the next one.
"""
logger.info(json.dumps({"event": "job_start", "job": job_name}))
_log_event(logging.INFO, "job_start", job=job_name)
async with async_session_factory() as db:
if not await _is_job_enabled(db, job_name):
logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"}))
_log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled")
_runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled")
return
@@ -1091,13 +894,10 @@ async def _run_pipeline(job_name: str, steps: list[tuple[str, str]]) -> None:
logger.exception("%s step %s failed", job_name, step_name)
done += 1
_runtime_finish(job_name, "completed", processed=done, total=total, message="Pipeline complete")
logger.info(json.dumps({"event": "job_complete", "job": job_name}))
_log_event(logging.INFO, "job_complete", job=job_name)
except Exception as exc:
_runtime_finish(job_name, "error", processed=done, total=total, message=str(exc))
logger.error(json.dumps({
"event": "job_error", "job": job_name,
"error_type": type(exc).__name__, "message": str(exc),
}))
_log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
async def run_daily_pipeline() -> None:
@@ -1162,19 +962,13 @@ def _cron_trigger(expr: str, timezone: str, fallback_key: str) -> CronTrigger:
try:
return CronTrigger.from_crontab(expr.strip(), timezone=timezone.strip())
except Exception:
logger.warning(json.dumps({
"event": "invalid_cron", "expr": expr, "timezone": timezone,
"fallback": SCHEDULE_DEFAULTS[fallback_key],
}))
_log_event(logging.WARNING, "invalid_cron", expr=expr, timezone=timezone, fallback=SCHEDULE_DEFAULTS[fallback_key])
return CronTrigger.from_crontab(SCHEDULE_DEFAULTS[fallback_key], timezone="UTC")
async def load_schedule_config(db: AsyncSession) -> dict[str, str]:
"""Read the cron schedule config from SystemSettings, defaults for any unset."""
result = await db.execute(
select(SystemSetting).where(SystemSetting.key.in_(list(SCHEDULE_DEFAULTS)))
)
stored = {s.key: s.value for s in result.scalars().all()}
stored = await settings_store.get_map(db, SCHEDULE_DEFAULTS)
return {key: (stored.get(key) or default) for key, default in SCHEDULE_DEFAULTS.items()}
@@ -1188,7 +982,7 @@ def reschedule_jobs(schedule_config: dict[str, str]) -> dict[str, str]:
expr = schedule_config.get(key) or SCHEDULE_DEFAULTS[key]
scheduler.reschedule_job(job_id, trigger=_cron_trigger(expr, tz, key))
applied[job_id] = expr
logger.info(json.dumps({"event": "jobs_rescheduled", "applied": applied, "timezone": tz}))
_log_event(logging.INFO, "jobs_rescheduled", applied=applied, timezone=tz)
return applied
@@ -1267,19 +1061,10 @@ def configure_scheduler(schedule_config: dict[str, str] | None = None) -> None:
replace_existing=True, next_run_time=None,
)
logger.info(
json.dumps({
"event": "scheduler_configured",
"timezone": tz,
"daily_pipeline": {
_log_event(logging.INFO, "scheduler_configured", timezone=tz, daily_pipeline={
"cron": cfg["schedule_daily_pipeline_cron"],
"steps": [name for name, _ in _DAILY_PIPELINE_STEPS],
},
"intraday_pipeline": {
}, intraday_pipeline={
"cron": cfg["schedule_intraday_pipeline_cron"],
"steps": [name for name, _ in _INTRADAY_PIPELINE_STEPS],
},
"fundamental_collector": {"cron": cfg["schedule_fundamentals_cron"]},
"independent": ["ticker_universe_sync", "alerts", "backtest"],
})
)
}, fundamental_collector={"cron": cfg["schedule_fundamentals_cron"]}, independent=["ticker_universe_sync", "alerts", "backtest"])