Files
signal-platform/app/scheduler.py
T
dennisthiessen f0b92a9718
Deploy / lint (push) Successful in 5s
Deploy / test (push) Successful in 36s
Deploy / deploy (push) Successful in 25s
add earnings-date guard — warn when a report falls in the target horizon
Finnhub's earnings calendar now supplies next_earnings_date through the
fundamentals chain; persisted on fundamental_data (migration 006) and exposed in
the fundamentals API. The recommendation panel warns when earnings fall within
the ~30-day target horizon (a report can gap price through stop/target) and
otherwise shows the next date. Informational only.

Deploy: run alembic upgrade (new fundamental_data.next_earnings_date column).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-15 12:44:08 +02:00

1025 lines
40 KiB
Python

"""APScheduler job definitions and FastAPI lifespan integration.
Defines four scheduled jobs:
- Data Collector (OHLCV fetch for all tickers)
- Sentiment Collector (sentiment for all tickers)
- Fundamental Collector (fundamentals for all tickers)
- R:R Scanner (trade setup scan for all tickers)
Each job processes tickers independently, logs errors as structured JSON,
handles rate limits by recording the last successful ticker, and checks
SystemSetting for enabled/disabled state.
"""
from __future__ import annotations
import json
import logging
import asyncio
from datetime import date, datetime, timezone
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from sqlalchemy import case, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.database import async_session_factory
from app.models.fundamental import FundamentalData
from app.models.ohlcv import OHLCVRecord
from app.models.settings import SystemSetting
from app.models.sentiment import SentimentScore
from app.models.ticker import Ticker
from app.exceptions import ProviderError
from app.providers.alpaca import AlpacaOHLCVProvider
from app.providers.fundamentals_chain import build_fundamental_provider_chain
from app.providers.protocol import SentimentData
from app.services import fundamental_service, ingestion_service, sentiment_service
from app.services.alert_service import dispatch_alerts
from app.services.market_regime_service import update_market_regime
from app.services.outcome_service import evaluate_pending_setups
from app.services.rr_scanner_service import scan_all_tickers
from app.services.sentiment_provider_service import build_sentiment_provider
from app.services.ticker_universe_service import bootstrap_universe
logger = logging.getLogger(__name__)
# Module-level scheduler instance.
#
# job_defaults matter a lot here: this is a single-process app, so the scheduler
# shares one event loop with the API and every other job. APScheduler's default
# misfire_grace_time is just 1 second — if the loop is busy at the instant a
# daily job is due (e.g. the scanner is mid-run), the fire is processed late,
# flagged a misfire, and SILENTLY SKIPPED while next_run still advances 24h. So
# we grant a generous grace window, coalesce missed runs into one catch-up, and
# cap each job at a single concurrent instance.
scheduler = AsyncIOScheduler(
job_defaults={
"coalesce": True,
"max_instances": 1,
"misfire_grace_time": 3600, # tolerate a busy loop; a daily job up to 1h late is fine
}
)
# Track last successful ticker per job for rate-limit resume
_last_successful: dict[str, str | None] = {
"data_collector": None,
"sentiment_collector": None,
"fundamental_collector": None,
}
_job_runtime: dict[str, dict[str, object]] = {
"data_collector": {
"running": False,
"status": "idle",
"processed": 0,
"total": None,
"progress_pct": None,
"current_ticker": None,
"started_at": None,
"finished_at": None,
"message": None,
},
"sentiment_collector": {
"running": False,
"status": "idle",
"processed": 0,
"total": None,
"progress_pct": None,
"current_ticker": None,
"started_at": None,
"finished_at": None,
"message": None,
},
"fundamental_collector": {
"running": False,
"status": "idle",
"processed": 0,
"total": None,
"progress_pct": None,
"current_ticker": None,
"started_at": None,
"finished_at": None,
"message": None,
},
"rr_scanner": {
"running": False,
"status": "idle",
"processed": 0,
"total": None,
"progress_pct": None,
"current_ticker": None,
"started_at": None,
"finished_at": None,
"message": None,
},
"ticker_universe_sync": {
"running": False,
"status": "idle",
"processed": 0,
"total": None,
"progress_pct": None,
"current_ticker": None,
"started_at": None,
"finished_at": None,
"message": None,
},
"alerts": {
"running": False,
"status": "idle",
"processed": 0,
"total": None,
"progress_pct": None,
"current_ticker": None,
"started_at": None,
"finished_at": None,
"message": None,
},
"market_regime": {
"running": False,
"status": "idle",
"processed": 0,
"total": None,
"progress_pct": None,
"current_ticker": None,
"started_at": None,
"finished_at": None,
"message": None,
},
}
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _log_job_error(job_name: str, ticker: str, error: Exception) -> None:
"""Log a job error as structured JSON."""
logger.error(
json.dumps({
"event": "job_error",
"job": job_name,
"ticker": ticker,
"error_type": type(error).__name__,
"message": str(error),
})
)
def _runtime_start(job_name: str, total: int | None = None, message: str | None = None) -> None:
now = datetime.now(timezone.utc).isoformat()
_job_runtime[job_name] = {
"running": True,
"status": "running",
"processed": 0,
"total": total,
"progress_pct": 0.0 if total and total > 0 else None,
"current_ticker": None,
"started_at": now,
"finished_at": None,
"message": message,
}
def _runtime_progress(
job_name: str,
processed: int,
total: int | None,
current_ticker: str | None = None,
message: str | None = None,
) -> None:
progress_pct: float | None = None
if total and total > 0:
progress_pct = round((processed / total) * 100.0, 1)
runtime = _job_runtime.get(job_name, {})
runtime.update({
"running": True,
"status": "running",
"processed": processed,
"total": total,
"progress_pct": progress_pct,
"current_ticker": current_ticker,
"message": message,
})
_job_runtime[job_name] = runtime
def _runtime_finish(
job_name: str,
status: str,
processed: int,
total: int | None,
message: str | None = None,
) -> None:
runtime = _job_runtime.get(job_name, {})
runtime.update({
"running": False,
"status": status,
"processed": processed,
"total": total,
"progress_pct": 100.0 if total and processed >= total else runtime.get("progress_pct"),
"current_ticker": None,
"finished_at": datetime.now(timezone.utc).isoformat(),
"message": message,
})
_job_runtime[job_name] = runtime
def get_job_runtime_snapshot(job_name: str | None = None) -> dict[str, dict[str, object]] | dict[str, object]:
if job_name is not None:
return dict(_job_runtime.get(job_name, {}))
return {name: dict(meta) for name, meta in _job_runtime.items()}
async def _is_job_enabled(db: AsyncSession, job_name: str) -> bool:
"""Check SystemSetting for job enabled state. Defaults to True."""
key = f"job_{job_name}_enabled"
result = await db.execute(
select(SystemSetting).where(SystemSetting.key == key)
)
setting = result.scalar_one_or_none()
if setting is None:
return True
return setting.value.lower() == "true"
async def _get_all_tickers(db: AsyncSession) -> list[str]:
"""Return all tracked ticker symbols sorted alphabetically."""
result = await db.execute(select(Ticker.symbol).order_by(Ticker.symbol))
return list(result.scalars().all())
async def _get_ohlcv_priority_tickers(db: AsyncSession) -> list[str]:
"""Return symbols prioritized for OHLCV collection.
Priority:
1) Tickers with no OHLCV bars
2) Tickers with data, oldest latest OHLCV date first
3) Alphabetical tiebreaker
"""
latest_date = func.max(OHLCVRecord.date)
missing_first = case((latest_date.is_(None), 0), else_=1)
result = await db.execute(
select(Ticker.symbol)
.outerjoin(OHLCVRecord, OHLCVRecord.ticker_id == Ticker.id)
.group_by(Ticker.id, Ticker.symbol)
.order_by(missing_first.asc(), latest_date.asc(), Ticker.symbol.asc())
)
return list(result.scalars().all())
async def _get_sentiment_priority_tickers(db: AsyncSession) -> list[str]:
"""Return symbols prioritized for sentiment collection.
Priority:
1) Tickers with no sentiment records
2) Tickers with records, oldest latest sentiment timestamp first
3) Alphabetical tiebreaker
"""
latest_ts = func.max(SentimentScore.timestamp)
missing_first = case((latest_ts.is_(None), 0), else_=1)
result = await db.execute(
select(Ticker.symbol)
.outerjoin(SentimentScore, SentimentScore.ticker_id == Ticker.id)
.group_by(Ticker.id, Ticker.symbol)
.order_by(missing_first.asc(), latest_ts.asc(), Ticker.symbol.asc())
)
return list(result.scalars().all())
async def _get_fundamental_priority_tickers(db: AsyncSession) -> list[str]:
"""Return symbols prioritized for fundamentals refresh.
Priority:
1) Tickers with no fundamentals snapshot yet
2) Tickers with existing fundamentals, oldest fetched_at first
3) Alphabetical tiebreaker
"""
missing_first = case((FundamentalData.fetched_at.is_(None), 0), else_=1)
result = await db.execute(
select(Ticker.symbol)
.outerjoin(FundamentalData, FundamentalData.ticker_id == Ticker.id)
.order_by(missing_first.asc(), FundamentalData.fetched_at.asc(), Ticker.symbol.asc())
)
return list(result.scalars().all())
def _resume_tickers(symbols: list[str], job_name: str) -> list[str]:
"""Reorder tickers to resume after the last successful one (rate-limit resume).
If a previous run was rate-limited, start from the ticker after the last
successful one. Otherwise return the full list.
"""
last = _last_successful.get(job_name)
if last is None or last not in symbols:
return symbols
idx = symbols.index(last)
# Start from the next ticker, then wrap around
return symbols[idx + 1:] + symbols[:idx + 1]
def _chunked(symbols: list[str], chunk_size: int) -> list[list[str]]:
size = max(1, chunk_size)
return [symbols[i:i + size] for i in range(0, len(symbols), size)]
# ---------------------------------------------------------------------------
# Job: Data Collector (OHLCV)
# ---------------------------------------------------------------------------
async def collect_ohlcv() -> None:
"""Fetch latest daily OHLCV for all tracked tickers.
Uses AlpacaOHLCVProvider. Processes each ticker independently.
On rate limit, records last successful ticker for resume.
Start date is resolved by ingestion progress:
- existing ticker: resume from last_ingested_date + 1
- new ticker: backfill ~1 year by default
"""
job_name = "data_collector"
logger.info(json.dumps({"event": "job_start", "job": job_name}))
_runtime_start(job_name)
processed = 0
total: int | None = None
try:
async with async_session_factory() as db:
if not await _is_job_enabled(db, job_name):
logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"}))
_runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled")
return
symbols = await _get_ohlcv_priority_tickers(db)
if not symbols:
logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": 0}))
_runtime_finish(job_name, "completed", processed=0, total=0, message="No tickers")
return
total = len(symbols)
_runtime_progress(job_name, processed=0, total=total)
# Build provider (skip if keys not configured)
if not settings.alpaca_api_key or not settings.alpaca_api_secret:
logger.warning(json.dumps({"event": "job_skipped", "job": job_name, "reason": "alpaca keys not configured"}))
_runtime_finish(job_name, "skipped", processed=0, total=total, message="Alpaca keys not configured")
return
try:
provider = AlpacaOHLCVProvider(settings.alpaca_api_key, settings.alpaca_api_secret)
except Exception as exc:
logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)}))
_runtime_finish(job_name, "error", processed=0, total=total, message=str(exc))
return
end_date = date.today()
for symbol in symbols:
_runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol)
async with async_session_factory() as db:
try:
result = await ingestion_service.fetch_and_ingest(
db, provider, symbol, start_date=None, end_date=end_date,
)
_last_successful[job_name] = symbol
processed += 1
_runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol)
logger.info(json.dumps({
"event": "ticker_collected",
"job": job_name,
"ticker": symbol,
"status": result.status,
"records": result.records_ingested,
}))
if result.status == "partial":
# Rate limited — stop and resume next run
logger.warning(json.dumps({
"event": "rate_limited",
"job": job_name,
"ticker": symbol,
"processed": processed,
}))
_runtime_finish(job_name, "rate_limited", processed=processed, total=total, message=f"Rate limited at {symbol}")
return
except Exception as exc:
_log_job_error(job_name, symbol, exc)
# Reset resume pointer on full completion
_last_successful[job_name] = None
logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": processed}))
_runtime_finish(job_name, "completed", processed=processed, total=total, message=f"Processed {processed} tickers")
except Exception as exc:
logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)}))
_runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc))
# ---------------------------------------------------------------------------
# Job: Sentiment Collector
# ---------------------------------------------------------------------------
async def collect_sentiment() -> None:
"""Fetch sentiment for all tracked tickers via OpenAI.
Processes each ticker independently. On rate limit, records last
successful ticker for resume.
"""
job_name = "sentiment_collector"
logger.info(json.dumps({"event": "job_start", "job": job_name}))
_runtime_start(job_name)
processed = 0
total: int | None = None
try:
async with async_session_factory() as db:
if not await _is_job_enabled(db, job_name):
logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"}))
_runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled")
return
symbols = await _get_sentiment_priority_tickers(db)
if not symbols:
logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": 0}))
_runtime_finish(job_name, "completed", processed=0, total=0, message="No tickers")
return
total = len(symbols)
_runtime_progress(job_name, processed=0, total=total)
try:
async with async_session_factory() as cfg_db:
provider = await build_sentiment_provider(cfg_db)
except ProviderError as exc:
logger.warning(json.dumps({"event": "job_skipped", "job": job_name, "reason": str(exc)}))
_runtime_finish(job_name, "skipped", processed=0, total=total, message=str(exc))
return
except Exception as exc:
logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)}))
_runtime_finish(job_name, "error", processed=0, total=total, message=str(exc))
return
batch_size = max(1, settings.openai_sentiment_batch_size)
batches = _chunked(symbols, batch_size)
for batch in batches:
current_hint = batch[0] if len(batch) == 1 else f"{batch[0]} (+{len(batch) - 1})"
_runtime_progress(job_name, processed=processed, total=total, current_ticker=current_hint)
batch_results: dict[str, SentimentData] = {}
if len(batch) > 1 and hasattr(provider, "fetch_sentiment_batch"):
try:
batch_results = await provider.fetch_sentiment_batch(batch)
except Exception as exc:
msg = str(exc).lower()
if "rate" in msg or "quota" in msg or "429" in msg:
logger.warning(json.dumps({
"event": "rate_limited",
"job": job_name,
"ticker": batch[0],
"processed": processed,
}))
_runtime_finish(job_name, "rate_limited", processed=processed, total=total, message=f"Rate limited at {batch[0]}")
return
logger.warning(json.dumps({
"event": "batch_fallback",
"job": job_name,
"batch": batch,
"reason": str(exc),
}))
for symbol in batch:
_runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol)
data = batch_results.get(symbol) if batch_results else None
if data is None:
try:
data = await provider.fetch_sentiment(symbol)
except Exception as exc:
msg = str(exc).lower()
if "rate" in msg or "quota" in msg or "429" in msg:
logger.warning(json.dumps({
"event": "rate_limited",
"job": job_name,
"ticker": symbol,
"processed": processed,
}))
_runtime_finish(job_name, "rate_limited", processed=processed, total=total, message=f"Rate limited at {symbol}")
return
_log_job_error(job_name, symbol, exc)
continue
async with async_session_factory() as db:
try:
await sentiment_service.store_sentiment(
db,
symbol=symbol,
classification=data.classification,
confidence=data.confidence,
source=data.source,
timestamp=data.timestamp,
reasoning=data.reasoning,
citations=data.citations,
)
_last_successful[job_name] = symbol
processed += 1
_runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol)
logger.info(json.dumps({
"event": "ticker_collected",
"job": job_name,
"ticker": symbol,
"classification": data.classification,
"confidence": data.confidence,
}))
except Exception as exc:
_log_job_error(job_name, symbol, exc)
_last_successful[job_name] = None
logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": processed}))
_runtime_finish(job_name, "completed", processed=processed, total=total, message=f"Processed {processed} tickers")
except Exception as exc:
logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)}))
_runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc))
# ---------------------------------------------------------------------------
# Job: Fundamental Collector
# ---------------------------------------------------------------------------
async def collect_fundamentals() -> None:
"""Fetch fundamentals for all tracked tickers via FMP.
Processes each ticker independently. On rate limit, records last
successful ticker for resume.
"""
job_name = "fundamental_collector"
logger.info(json.dumps({"event": "job_start", "job": job_name}))
_runtime_start(job_name)
processed = 0
total: int | None = None
try:
async with async_session_factory() as db:
if not await _is_job_enabled(db, job_name):
logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"}))
_runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled")
return
symbols = await _get_fundamental_priority_tickers(db)
if not symbols:
logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": 0}))
_runtime_finish(job_name, "completed", processed=0, total=0, message="No tickers")
return
total = len(symbols)
_runtime_progress(job_name, processed=0, total=total)
if not (settings.fmp_api_key or settings.finnhub_api_key or settings.alpha_vantage_api_key):
logger.warning(json.dumps({"event": "job_skipped", "job": job_name, "reason": "no fundamentals provider keys configured"}))
_runtime_finish(job_name, "skipped", processed=0, total=total, message="No fundamentals provider keys configured")
return
try:
provider = build_fundamental_provider_chain()
except Exception as exc:
logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)}))
_runtime_finish(job_name, "error", processed=0, total=total, message=str(exc))
return
max_retries = max(0, settings.fundamental_rate_limit_retries)
base_backoff = max(1, settings.fundamental_rate_limit_backoff_seconds)
spacing = max(0.0, settings.fundamental_request_spacing_seconds)
async def _store(symbol: str, data) -> None:
async with async_session_factory() as db:
await fundamental_service.store_fundamental(
db,
symbol=symbol,
pe_ratio=data.pe_ratio,
revenue_growth=data.revenue_growth,
earnings_surprise=data.earnings_surprise,
market_cap=data.market_cap,
next_earnings_date=data.next_earnings_date,
unavailable_fields=data.unavailable_fields,
)
for symbol in symbols:
_runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol)
attempt = 0
while True:
try:
data = await provider.fetch_fundamentals(symbol)
await _store(symbol, data)
_last_successful[job_name] = symbol
processed += 1
_runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol)
logger.info(json.dumps({
"event": "ticker_collected",
"job": job_name,
"ticker": symbol,
}))
break
except Exception as exc:
msg = str(exc).lower()
if "rate" in msg or "429" in msg:
if attempt < max_retries:
wait_seconds = base_backoff * (2 ** attempt)
attempt += 1
logger.warning(json.dumps({
"event": "rate_limited_retry",
"job": job_name,
"ticker": symbol,
"attempt": attempt,
"max_retries": max_retries,
"wait_seconds": wait_seconds,
"processed": processed,
}))
_runtime_progress(
job_name,
processed=processed,
total=total,
current_ticker=symbol,
message=f"Rate-limited at {symbol}; retry {attempt}/{max_retries} in {wait_seconds}s",
)
await asyncio.sleep(wait_seconds)
continue
# Retries exhausted: store whatever partial data we can
# still get (e.g. FMP market cap) and move on, rather than
# aborting the whole run and leaving every later ticker
# untouched.
logger.warning(json.dumps({
"event": "rate_limited_partial",
"job": job_name,
"ticker": symbol,
"processed": processed,
}))
try:
data = await provider.fetch_fundamentals(symbol, allow_partial=True)
await _store(symbol, data)
processed += 1
except Exception as exc2:
_log_job_error(job_name, symbol, exc2)
break
_log_job_error(job_name, symbol, exc)
break
if spacing:
await asyncio.sleep(spacing)
_last_successful[job_name] = None
logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": processed}))
_runtime_finish(job_name, "completed", processed=processed, total=total, message=f"Processed {processed} tickers")
except Exception as exc:
logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)}))
_runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc))
# ---------------------------------------------------------------------------
# Job: R:R Scanner
# ---------------------------------------------------------------------------
async def scan_rr() -> None:
"""Scan all tickers for trade setups meeting the R:R threshold.
Uses rr_scanner_service.scan_all_tickers which already handles
per-ticker error isolation internally.
"""
job_name = "rr_scanner"
logger.info(json.dumps({"event": "job_start", "job": job_name}))
_runtime_start(job_name)
processed = 0
total: int | None = None
try:
async with async_session_factory() as db:
if not await _is_job_enabled(db, job_name):
logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"}))
_runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled")
return
symbols = await _get_all_tickers(db)
total = len(symbols)
_runtime_progress(job_name, processed=0, total=total)
def _on_progress(done: int, count: int, symbol: str) -> None:
_runtime_progress(
job_name, processed=done, total=count, current_ticker=symbol or None
)
try:
setups = await scan_all_tickers(
db, rr_threshold=settings.default_rr_threshold,
progress_callback=_on_progress,
)
processed = total or 0
_runtime_finish(job_name, "completed", processed=processed, total=total, message=f"Found {len(setups)} setups")
logger.info(json.dumps({
"event": "job_complete",
"job": job_name,
"setups_found": len(setups),
}))
except Exception as exc:
_runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc))
logger.error(json.dumps({
"event": "job_error",
"job": job_name,
"error_type": type(exc).__name__,
"message": str(exc),
}))
except Exception as exc:
logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)}))
_runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc))
# ---------------------------------------------------------------------------
# Job: Outcome Evaluator
# ---------------------------------------------------------------------------
async def evaluate_outcomes() -> None:
"""Evaluate unresolved trade setups against OHLCV data collected since.
Writes actual_outcome / outcome_date / evaluated_at on each decided setup.
Undecided setups stay pending and are re-checked on the next run.
"""
job_name = "outcome_evaluator"
logger.info(json.dumps({"event": "job_start", "job": job_name}))
_runtime_start(job_name, total=1)
try:
async with async_session_factory() as db:
if not await _is_job_enabled(db, job_name):
logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"}))
_runtime_finish(job_name, "skipped", processed=0, total=1, message="Disabled")
return
summary = await evaluate_pending_setups(
db, max_bars=settings.outcome_evaluation_max_bars
)
_runtime_progress(job_name, processed=1, total=1)
_runtime_finish(
job_name, "completed", processed=1, total=1,
message=f"Evaluated {summary['evaluated']}, pending {summary['still_pending']}",
)
logger.info(json.dumps({
"event": "job_complete",
"job": job_name,
"summary": summary,
}))
except Exception as exc:
_runtime_finish(job_name, "error", processed=0, total=1, message=str(exc))
logger.error(json.dumps({
"event": "job_error",
"job": job_name,
"error_type": type(exc).__name__,
"message": str(exc),
}))
# ---------------------------------------------------------------------------
# Job: Alerts Dispatcher
# ---------------------------------------------------------------------------
async def dispatch_alerts_job() -> None:
"""Push Telegram alerts for qualified setups, S/R proximity, score drops, digest."""
job_name = "alerts"
logger.info(json.dumps({"event": "job_start", "job": job_name}))
_runtime_start(job_name, total=1)
try:
async with async_session_factory() as db:
if not await _is_job_enabled(db, job_name):
logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"}))
_runtime_finish(job_name, "skipped", processed=0, total=1, message="Disabled")
return
result = await dispatch_alerts(db)
_runtime_progress(job_name, processed=1, total=1)
_runtime_finish(
job_name, "completed", processed=1, total=1,
message=f"{result.get('status')}, sent {result.get('sent', 0)}",
)
logger.info(json.dumps({"event": "job_complete", "job": job_name, "result": result}))
except Exception as exc:
_runtime_finish(job_name, "error", processed=0, total=1, message=str(exc))
logger.error(json.dumps({
"event": "job_error",
"job": job_name,
"error_type": type(exc).__name__,
"message": str(exc),
}))
# ---------------------------------------------------------------------------
# Job: Market Regime
# ---------------------------------------------------------------------------
async def compute_market_regime() -> None:
"""Refresh the cached benchmark (SPY) trend regime."""
job_name = "market_regime"
logger.info(json.dumps({"event": "job_start", "job": job_name}))
_runtime_start(job_name, total=1)
try:
async with async_session_factory() as db:
if not await _is_job_enabled(db, job_name):
logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"}))
_runtime_finish(job_name, "skipped", processed=0, total=1, message="Disabled")
return
regime = await update_market_regime(db)
_runtime_progress(job_name, processed=1, total=1)
_runtime_finish(
job_name, "completed", processed=1, total=1,
message=f"Regime: {regime.get('label')}",
)
logger.info(json.dumps({"event": "job_complete", "job": job_name, "label": regime.get("label")}))
except Exception as exc:
_runtime_finish(job_name, "error", processed=0, total=1, message=str(exc))
logger.error(json.dumps({
"event": "job_error",
"job": job_name,
"error_type": type(exc).__name__,
"message": str(exc),
}))
# ---------------------------------------------------------------------------
# Job: Ticker Universe Sync
# ---------------------------------------------------------------------------
async def sync_ticker_universe() -> None:
"""Sync tracked tickers from configured default universe.
Setting key: ticker_universe_default (sp500 | nasdaq100 | nasdaq_all)
"""
job_name = "ticker_universe_sync"
logger.info(json.dumps({"event": "job_start", "job": job_name}))
_runtime_start(job_name, total=1)
try:
async with async_session_factory() as db:
if not await _is_job_enabled(db, job_name):
logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"}))
_runtime_finish(job_name, "skipped", processed=0, total=1, message="Disabled")
return
result = await db.execute(
select(SystemSetting).where(SystemSetting.key == "ticker_universe_default")
)
setting = result.scalar_one_or_none()
universe = (setting.value if setting else "sp500").strip().lower()
async with async_session_factory() as db:
summary = await bootstrap_universe(db, universe, prune_missing=False)
_runtime_progress(job_name, processed=1, total=1)
_runtime_finish(job_name, "completed", processed=1, total=1, message=f"Synced {universe}")
logger.info(json.dumps({
"event": "job_complete",
"job": job_name,
"universe": universe,
"summary": summary,
}))
except Exception as exc:
_runtime_finish(job_name, "error", processed=0, total=1, message=str(exc))
logger.error(json.dumps({
"event": "job_error",
"job": job_name,
"error_type": type(exc).__name__,
"message": str(exc),
}))
# ---------------------------------------------------------------------------
# Frequency helpers
# ---------------------------------------------------------------------------
_FREQUENCY_MAP: dict[str, dict[str, int]] = {
"hourly": {"hours": 1},
"daily": {"hours": 24},
}
def _parse_frequency(freq: str) -> dict[str, int]:
"""Convert a frequency string to APScheduler interval kwargs."""
return _FREQUENCY_MAP.get(freq.lower(), {"hours": 24})
# ---------------------------------------------------------------------------
# Scheduler setup
# ---------------------------------------------------------------------------
def configure_scheduler() -> None:
"""Add all jobs to the scheduler with configured intervals.
Call this once before scheduler.start(). Removes any existing jobs first
to ensure idempotency.
"""
scheduler.remove_all_jobs()
# Data Collector — configurable frequency (default: hourly)
ohlcv_interval = _parse_frequency(settings.data_collector_frequency)
scheduler.add_job(
collect_ohlcv,
"interval",
**ohlcv_interval,
id="data_collector",
name="Data Collector (OHLCV)",
replace_existing=True,
)
# Sentiment Collector — default 30 min
scheduler.add_job(
collect_sentiment,
"interval",
minutes=settings.sentiment_poll_interval_minutes,
id="sentiment_collector",
name="Sentiment Collector",
replace_existing=True,
)
# Fundamental Collector — configurable frequency (default: daily)
fund_interval = _parse_frequency(settings.fundamental_fetch_frequency)
scheduler.add_job(
collect_fundamentals,
"interval",
**fund_interval,
id="fundamental_collector",
name="Fundamental Collector",
replace_existing=True,
)
# R:R Scanner — configurable frequency (default: hourly)
rr_interval = _parse_frequency(settings.rr_scan_frequency)
scheduler.add_job(
scan_rr,
"interval",
**rr_interval,
id="rr_scanner",
name="R:R Scanner",
replace_existing=True,
)
# Universe Sync — nightly
scheduler.add_job(
sync_ticker_universe,
"interval",
hours=24,
id="ticker_universe_sync",
name="Ticker Universe Sync",
replace_existing=True,
)
# Outcome Evaluator — nightly, after fresh OHLCV has been collected
scheduler.add_job(
evaluate_outcomes,
"interval",
hours=24,
id="outcome_evaluator",
name="Outcome Evaluator",
replace_existing=True,
)
# Alerts Dispatcher — configurable frequency (default: hourly)
alerts_interval = _parse_frequency(settings.alerts_frequency)
scheduler.add_job(
dispatch_alerts_job,
"interval",
**alerts_interval,
id="alerts",
name="Alerts Dispatcher",
replace_existing=True,
)
# Market Regime — nightly benchmark trend refresh
scheduler.add_job(
compute_market_regime,
"interval",
hours=24,
id="market_regime",
name="Market Regime",
replace_existing=True,
)
logger.info(
json.dumps({
"event": "scheduler_configured",
"jobs": {
"data_collector": ohlcv_interval,
"sentiment_collector": {"minutes": settings.sentiment_poll_interval_minutes},
"fundamental_collector": fund_interval,
"rr_scanner": rr_interval,
"ticker_universe_sync": {"hours": 24},
},
})
)