Files
signal-platform/app/scheduler.py
T
dennisthiessen 30effa89b7
Deploy / lint (push) Successful in 6s
Deploy / test (push) Failing after 12s
Deploy / deploy (push) Has been skipped
feat: ticker search, watchlist momentum column, alpha vs S&P 500
Three usability fixes:

1. Global ticker search in the sidebar (TickerSearch) — typeahead over the
   tracked universe that opens a ticker's detail page without adding it to the
   watchlist. Also wired into the mobile nav.

2. Watchlist table shows the ticker's 12-1 momentum percentile (the top-pick
   selector) instead of the noisy full S/R-level list. Enriched from the setup
   already loaded in watchlist_service._enrich_entry — no extra query.

3. Alpha vs the S&P 500 on paper trades (open + closed). New benchmark_prices
   table + benchmark_service store SPY daily closes (a standalone series, not a
   Ticker, so it never enters the scanner / momentum ranking / rankings) via a
   new daily-pipeline step. paper_trade_service computes per-trade
   benchmark_return / alpha_pct / alpha_usd over each holding period; the open-
   trades table, dashboard, and closed-trades panel surface per-trade and total
   alpha. The list read path never makes a provider call.

Deploy: alembic upgrade head, then run the benchmark/daily job once to populate
SPY closes (alpha shows "—" until then).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-28 08:44:40 +02:00

1276 lines
55 KiB
Python

"""APScheduler job definitions and FastAPI lifespan integration.
Defines four scheduled jobs:
- Data Collector (OHLCV fetch for all tickers)
- Sentiment Collector (sentiment for all tickers)
- Fundamental Collector (fundamentals for all tickers)
- R:R Scanner (trade setup scan for all tickers)
Each job processes tickers independently, logs errors as structured JSON,
handles rate limits by recording the last successful ticker, and checks
SystemSetting for enabled/disabled state.
"""
from __future__ import annotations
import json
import logging
import asyncio
from datetime import date, datetime, timedelta, timezone
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from sqlalchemy import and_, case, func, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.database import async_session_factory
from app.models.fundamental import FundamentalData
from app.models.ohlcv import OHLCVRecord
from app.models.sentiment import SentimentScore
from app.models.ticker import Ticker
from app.exceptions import ProviderError
from app.providers.alpaca import AlpacaOHLCVProvider
from app.providers.fundamentals_chain import build_fundamental_provider_chain
from app.providers.protocol import SentimentData
from app.services import fundamental_service, ingestion_service, sentiment_service, settings_store
from app.services.alert_service import dispatch_alerts
from app.services.backtest_service import run_and_store as run_backtest_and_store
from app.services.benchmark_service import refresh_benchmark_prices
from app.services.market_regime_service import update_market_regime
from app.services.regime_monitor_service import update_regime_monitor
from app.services.event_study_service import run_and_store as run_event_study_and_store
from app.services.outcome_service import evaluate_pending_setups
from app.services.rr_scanner_service import scan_all_tickers
from app.services.sentiment_provider_service import build_sentiment_provider
from app.services.ticker_universe_service import bootstrap_universe
# Module logger; _log_event and the error paths below write through it.
logger = logging.getLogger(__name__)
# Module-level scheduler instance.
#
# job_defaults matter a lot here: this is a single-process app, so the scheduler
# shares one event loop with the API and every other job. APScheduler's default
# misfire_grace_time is just 1 second — if the loop is busy at the instant a
# daily job is due (e.g. the scanner is mid-run), the fire is processed late,
# flagged a misfire, and SILENTLY SKIPPED while next_run still advances 24h. So
# we grant a generous grace window, coalesce missed runs into one catch-up, and
# cap each job at a single concurrent instance.
scheduler = AsyncIOScheduler(
    job_defaults={
        "coalesce": True,  # collapse a backlog of missed fires into one run
        "max_instances": 1,  # never overlap two runs of the same job
        "misfire_grace_time": 3600,  # tolerate a busy loop; a daily job up to 1h late is fine
    }
)
# Rate-limit resume pointer per collector job: the symbol of the last ticker the
# job finished successfully, or None when there is nothing to resume from.
_last_successful: dict[str, str | None] = dict.fromkeys(
    ("data_collector", "data_backfill", "sentiment_collector", "fundamental_collector")
)
# Jobs whose per-run progress is surfaced to Admin → Jobs; each name gets an idle
# runtime record at import time. Jobs absent from this list (outcome_evaluator,
# benchmark_collector) get their record created lazily on first run via
# _runtime_start.
_JOB_NAMES = [
    "data_collector",
    "data_backfill",
    "sentiment_collector",
    "fundamental_collector",
    "rr_scanner",
    "ticker_universe_sync",
    "alerts",
    "market_regime",
    "regime_monitor",
    "event_study",
    "backtest",
    "daily_pipeline",
    "intraday_pipeline",
]
def _idle_runtime() -> dict[str, object]:
    """Build a fresh runtime-status record for a job that is not running.

    Every call returns a brand-new dict, so callers may mutate it freely.
    """
    return dict(
        running=False,
        status="idle",
        processed=0,
        total=None,
        progress_pct=None,
        current_ticker=None,
        started_at=None,
        finished_at=None,
        message=None,
    )
# Live runtime record per job name, seeded with an idle record for every name in
# _JOB_NAMES. The _runtime_* helpers replace or update these entries;
# get_job_runtime_snapshot hands out copies of them.
_job_runtime: dict[str, dict[str, object]] = {name: _idle_runtime() for name in _JOB_NAMES}
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _log_event(level: int, event: str, **fields: object) -> None:
    """Emit a structured JSON log line: ``{"event": ..., **fields}``.

    Values that ``json`` cannot encode natively (dates, datetimes, Decimals,
    exceptions, ...) are rendered with ``str()``. Logging must never raise here:
    the job functions call this inside their ``try`` blocks, sometimes right
    after recording a "completed" runtime status. A serialization ``TypeError``
    would then be caught by their error handler and flip a finished job to
    "error".
    """
    logger.log(level, json.dumps({"event": event, **fields}, default=str))
def _log_job_error(job_name: str, ticker: str, error: Exception) -> None:
    """Record that ``job_name`` failed on a single ``ticker``.

    Emits a structured ``job_error`` event at ERROR level carrying the
    exception's class name and message.
    """
    details = {
        "job": job_name,
        "ticker": ticker,
        "error_type": type(error).__name__,
        "message": str(error),
    }
    _log_event(logging.ERROR, "job_error", **details)
def _runtime_start(job_name: str, total: int | None = None, message: str | None = None) -> None:
    """Reset ``job_name``'s runtime record and mark it as running.

    Any previous record is replaced by a fresh idle one, which is then stamped
    with the start time (UTC, ISO-8601), the optional expected ``total`` and
    ``message``. Progress starts at 0% only when a positive total is known.
    """
    record = _idle_runtime()
    record.update(
        running=True,
        status="running",
        total=total,
        progress_pct=(0.0 if total and total > 0 else None),
        started_at=datetime.now(timezone.utc).isoformat(),
        message=message,
    )
    _job_runtime[job_name] = record
def _runtime_progress(
    job_name: str,
    processed: int,
    total: int | None,
    current_ticker: str | None = None,
    message: str | None = None,
) -> None:
    """Update ``job_name``'s runtime record with in-flight progress.

    Marks the job running and sets processed/total, the ticker in flight and
    ``message`` (overwritten on every call, so omitting it clears the previous
    one). ``progress_pct`` is a percentage rounded to one decimal when ``total``
    is positive, else None. Fields not listed here (e.g. ``started_at``) are
    kept. An unknown job gets a new record holding only these fields.
    """
    pct = round(processed / total * 100.0, 1) if total and total > 0 else None
    record = _job_runtime.setdefault(job_name, {})
    record.update(
        running=True,
        status="running",
        processed=processed,
        total=total,
        progress_pct=pct,
        current_ticker=current_ticker,
        message=message,
    )
def _runtime_finish(
    job_name: str,
    status: str,
    processed: int,
    total: int | None,
    message: str | None = None,
) -> None:
    """Close out ``job_name``'s runtime record with a terminal ``status``.

    Clears the running flag and the ticker in flight, stamps ``finished_at``
    (UTC, ISO-8601) and stores the final counts and ``message``. Progress snaps
    to 100% only when a truthy ``total`` was fully processed; otherwise the last
    reported percentage is kept (e.g. for "rate_limited" or "error").
    """
    record = _job_runtime.setdefault(job_name, {})
    fully_done = bool(total) and processed >= total
    final_pct = 100.0 if fully_done else record.get("progress_pct")
    record.update(
        running=False,
        status=status,
        processed=processed,
        total=total,
        progress_pct=final_pct,
        current_ticker=None,
        finished_at=datetime.now(timezone.utc).isoformat(),
        message=message,
    )
def get_job_runtime_snapshot(job_name: str | None = None) -> dict[str, dict[str, object]] | dict[str, object]:
    """Return shallow copies of job runtime state.

    With ``job_name``: a copy of that job's record (``{}`` if unknown).
    Without it: ``{name: record_copy}`` for every job that has a record.
    Copies keep callers from mutating the live records.
    """
    if job_name is None:
        return {name: dict(record) for name, record in _job_runtime.items()}
    return dict(_job_runtime.get(job_name, {}))
async def _is_job_enabled(db: AsyncSession, job_name: str) -> bool:
    """Report whether ``job_name`` may run, per its SystemSetting flag.

    The flag is stored under ``job_<name>_enabled``. A missing setting means
    enabled; otherwise only the value "true" (case-insensitive) enables it.
    """
    key = f"job_{job_name}_enabled"
    setting = await settings_store.get_setting(db, key)
    if setting is None:
        return True
    return setting.value.lower() == "true"
async def _get_all_tickers(db: AsyncSession) -> list[str]:
    """List every tracked ticker symbol in alphabetical order."""
    stmt = select(Ticker.symbol).order_by(Ticker.symbol)
    rows = await db.execute(stmt)
    return list(rows.scalars().all())
async def _get_ohlcv_priority_tickers(db: AsyncSession) -> list[str]:
    """Return every ticker symbol in OHLCV-collection priority order.

    One row per ticker (GROUP BY over an outer join to the bars), ordered:
    1) tickers with no OHLCV bars at all,
    2) then tickers whose newest bar is oldest,
    3) then alphabetically by symbol as the tiebreaker.
    """
    newest_bar = func.max(OHLCVRecord.date)
    no_bars_rank = case((newest_bar.is_(None), 0), else_=1)
    stmt = (
        select(Ticker.symbol)
        .outerjoin(OHLCVRecord, OHLCVRecord.ticker_id == Ticker.id)
        .group_by(Ticker.id, Ticker.symbol)
        .order_by(no_bars_rank.asc(), newest_bar.asc(), Ticker.symbol.asc())
    )
    rows = await db.execute(stmt)
    return list(rows.scalars().all())
async def _get_top_pick_feeder_ids(db: AsyncSession) -> set[int]:
    """Ticker ids whose latest LONG setup makes them a top-pick feeder.

    A dashboard 'top pick' is the highest-momentum *qualified* setup. Sentiment
    can never move a ticker's momentum percentile (the gate's core axis) — only
    its confidence and EV ranking. So the only tickers that are, or could become
    with positive sentiment, a top pick are momentum leaders that already have a
    tradeable long setup clearing the R:R floor. That set is exactly:
    latest long setup with momentum_percentile >= gate AND rr_ratio >= floor.
    It contains both the currently-qualified setups and the near-miss ones held
    back only by a neutral/missing sentiment — the cases the user saw surface as
    top picks with no sentiment. Only meaningful with the momentum gate on
    (min_momentum_percentile > 0); off, there is no leader axis to anchor on and we
    defer to the filler set. Best-effort: a config failure must not stop collection.

    Returns:
        The matching ticker ids. The set is empty when the momentum gate is
        off or when loading the activation config raises.
    """
    from app.models.trade_setup import TradeSetup
    try:
        from app.services.admin_service import get_activation_config
        activation = await get_activation_config(db)
        min_pct = float(activation.get("min_momentum_percentile", 0.0))
        min_rr = float(activation.get("min_rr", 0.0))
    except Exception:
        # NOTE(review): the message says "filler set only", but the caller in
        # this module (_get_sentiment_priority_tickers) still adds the watchlist
        # and open paper trades to its priority set — only the feeders are lost.
        # NOTE(review): if get_activation_config failed on a DB error, the shared
        # session's transaction may need a rollback before the caller's next
        # query (PostgreSQL rejects statements in an aborted transaction) — confirm.
        logger.exception("Sentiment top-pick scoping failed; using filler set only")
        return set()
    if min_pct <= 0:
        # Momentum gate off: no leader axis to anchor on.
        return set()
    # Latest long setup per ticker, then keep those clearing the gate's momentum
    # percentile and R:R floor. (Sentiment runs before the day's scan, so this
    # reads the previous scan's setups — momentum is a slow, cross-sectional signal,
    # so yesterday's leaders are the right anchor.)
    latest_long = (
        select(TradeSetup.ticker_id, func.max(TradeSetup.detected_at).label("md"))
        .where(TradeSetup.direction == "long")
        .group_by(TradeSetup.ticker_id)
        .subquery()
    )
    # Join back on (ticker_id, max detected_at). Several long setups sharing the
    # same detected_at would yield several rows; the set below collapses them.
    rows = await db.execute(
        select(TradeSetup.ticker_id)
        .join(
            latest_long,
            and_(
                TradeSetup.ticker_id == latest_long.c.ticker_id,
                TradeSetup.detected_at == latest_long.c.md,
            ),
        )
        .where(
            TradeSetup.direction == "long",
            TradeSetup.rr_ratio >= min_rr,
            TradeSetup.momentum_percentile.is_not(None),
            TradeSetup.momentum_percentile >= min_pct,
        )
    )
    return {r[0] for r in rows.all()}
async def _stale_sentiment_symbols(
    db: AsyncSession, ticker_ids: set[int], cutoff: datetime
) -> list[str]:
    """Pick the symbols in ``ticker_ids`` whose sentiment needs refreshing.

    A ticker qualifies when it has no SentimentScore at all or its newest score
    is older than ``cutoff``. Results are ordered: never-scored first, then by
    oldest newest-score, then alphabetically. An empty ``ticker_ids``
    short-circuits to ``[]`` without querying.
    """
    if not ticker_ids:
        return []
    newest_score = func.max(SentimentScore.timestamp)
    never_scored_rank = case((newest_score.is_(None), 0), else_=1)
    is_stale = or_(newest_score.is_(None), newest_score < cutoff)
    stmt = (
        select(Ticker.symbol)
        .outerjoin(SentimentScore, SentimentScore.ticker_id == Ticker.id)
        .where(Ticker.id.in_(ticker_ids))
        .group_by(Ticker.id, Ticker.symbol)
        .having(is_stale)
        .order_by(never_scored_rank.asc(), newest_score.asc(), Ticker.symbol.asc())
    )
    rows = await db.execute(stmt)
    return list(rows.scalars().all())
async def _get_sentiment_priority_tickers(db: AsyncSession) -> list[str]:
    """Return the symbols whose sentiment should be (re)fetched this run.

    Symbols refreshed within ``settings.sentiment_fresh_hours`` are skipped.
    Two tiers decide ORDER only; nothing is capped. The candidate set is small
    by construction (curated watchlist <= 20, a handful of open trades and
    top-pick feeders, top-N composite), so refreshing all of it stays well
    inside the free search tier:

    - Priority: top-pick feeders (momentum leaders with a tradeable long setup;
      see ``_get_top_pick_feeder_ids``), every non-dismissed watchlist entry and
      every open paper trade. These are names that should never be shown
      without sentiment.
    - Filler: the top ``settings.sentiment_top_composite`` tickers by composite
      score that are not already priority. This is a cheap discovery net.

    Priority symbols come first, so a provider rate limit mid-run still covers
    them. Once everything is fresh the list is empty, and runs make no grounded
    searches until entries age out.
    """
    from app.models.paper_trade import PaperTrade
    from app.models.score import CompositeScore
    from app.models.watchlist import WatchlistEntry

    fresh_window = timedelta(hours=settings.sentiment_fresh_hours)
    cutoff = datetime.now(timezone.utc) - fresh_window

    # Priority tier: top-pick feeders + curated watchlist + open positions.
    must_cover = await _get_top_pick_feeder_ids(db)
    watchlist_rows = await db.execute(
        select(WatchlistEntry.ticker_id)
        .where(WatchlistEntry.entry_type != "dismissed")
        .distinct()
    )
    must_cover |= {row[0] for row in watchlist_rows.all()}
    open_trade_rows = await db.execute(
        select(PaperTrade.ticker_id).where(PaperTrade.status == "open").distinct()
    )
    must_cover |= {row[0] for row in open_trade_rows.all()}

    # Filler tier: composite leaders not already covered above.
    leader_rows = await db.execute(
        select(CompositeScore.ticker_id)
        .order_by(CompositeScore.score.desc())
        .limit(settings.sentiment_top_composite)
    )
    discovery = {row[0] for row in leader_rows.all()}
    discovery -= must_cover
    if not (must_cover or discovery):
        return []

    stale_priority = await _stale_sentiment_symbols(db, must_cover, cutoff)
    stale_discovery = await _stale_sentiment_symbols(db, discovery, cutoff)
    return [*stale_priority, *stale_discovery]
async def _get_fundamental_priority_tickers(db: AsyncSession) -> list[str]:
    """Return symbols prioritized for fundamentals refresh, each exactly once.

    Priority:
    1) Tickers with no fundamentals snapshot yet
    2) Tickers with existing fundamentals, oldest (newest-per-ticker) fetched_at first
    3) Alphabetical tiebreaker

    The outer join is collapsed to one row per ticker (newest ``fetched_at``
    wins), matching the OHLCV and sentiment priority queries. Without the
    GROUP BY, a ticker with several FundamentalData rows would be listed, and
    therefore fetched, once per row.
    """
    latest_fetch = func.max(FundamentalData.fetched_at)
    missing_first = case((latest_fetch.is_(None), 0), else_=1)
    result = await db.execute(
        select(Ticker.symbol)
        .outerjoin(FundamentalData, FundamentalData.ticker_id == Ticker.id)
        .group_by(Ticker.id, Ticker.symbol)
        .order_by(missing_first.asc(), latest_fetch.asc(), Ticker.symbol.asc())
    )
    return list(result.scalars().all())
def _resume_tickers(symbols: list[str], job_name: str) -> list[str]:
    """Rotate ``symbols`` so a rate-limited job resumes where it stopped.

    If ``job_name`` has a recorded last-successful ticker that appears in
    ``symbols``, return a new list starting just after it, with the already
    processed prefix wrapped to the end. Otherwise return ``symbols`` unchanged
    (the same list object).
    """
    last = _last_successful.get(job_name)
    if last is not None and last in symbols:
        cut = symbols.index(last) + 1
        return symbols[cut:] + symbols[:cut]
    return symbols
def _chunked(symbols: list[str], chunk_size: int) -> list[list[str]]:
    """Split ``symbols`` into consecutive chunks of at most ``chunk_size`` items.

    A non-positive ``chunk_size`` is treated as 1; the final chunk may be short.
    """
    step = max(1, chunk_size)
    return [symbols[start:start + step] for start in range(0, len(symbols), step)]
# ---------------------------------------------------------------------------
# Job: Data Collector (OHLCV)
# ---------------------------------------------------------------------------
async def collect_ohlcv(full_backfill: bool = False, job_name: str = "data_collector") -> None:
    """Fetch latest daily OHLCV for all tracked tickers.

    Uses AlpacaOHLCVProvider and visits tickers in ``_get_ohlcv_priority_tickers``
    order (tickers with no bars first, then oldest latest bar). Each ticker is
    ingested in its own session; a per-ticker exception is logged and the loop
    moves on. When ``fetch_and_ingest`` reports status "partial" (rate limited),
    that ticker is recorded in ``_last_successful`` and the run stops with
    runtime status "rate_limited". Tickers not reached keep their older bars,
    so the priority order puts them first next run.

    Start date is resolved by ingestion progress:
    - existing ticker: resume from last_ingested_date + 1
    - new ticker: backfill the configured history window

    ``full_backfill`` forces every ticker to re-fetch the full
    ``settings.ohlcv_history_days`` window (ignoring incremental resume). The
    manual data_backfill job uses it to deepen shallow histories. ``job_name``
    lets the backfill report its own runtime/resume state, separate from
    data_collector.

    The job is skipped when disabled or when Alpaca keys are not configured.
    """
    _log_event(logging.INFO, "job_start", job=job_name)
    _runtime_start(job_name)
    processed = 0
    total: int | None = None
    try:
        # Short-lived session used only for the enabled flag and the ticker list.
        async with async_session_factory() as db:
            if not await _is_job_enabled(db, job_name):
                _log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled")
                _runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled")
                return
            symbols = await _get_ohlcv_priority_tickers(db)
        if not symbols:
            _log_event(logging.INFO, "job_complete", job=job_name, tickers=0)
            _runtime_finish(job_name, "completed", processed=0, total=0, message="No tickers")
            return
        total = len(symbols)
        _runtime_progress(job_name, processed=0, total=total)
        # Build provider (skip if keys not configured)
        if not settings.alpaca_api_key or not settings.alpaca_api_secret:
            _log_event(logging.WARNING, "job_skipped", job=job_name, reason="alpaca keys not configured")
            _runtime_finish(job_name, "skipped", processed=0, total=total, message="Alpaca keys not configured")
            return
        try:
            provider = AlpacaOHLCVProvider(settings.alpaca_api_key, settings.alpaca_api_secret)
        except Exception as exc:
            _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
            _runtime_finish(job_name, "error", processed=0, total=total, message=str(exc))
            return
        end_date = date.today()
        # Full backfill: pass an explicit start_date so fetch_and_ingest re-pulls
        # the whole window instead of resuming from the last stored bar.
        backfill_start = (
            end_date - timedelta(days=settings.ohlcv_history_days) if full_backfill else None
        )
        for symbol in symbols:
            _runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol)
            # Separate session per ticker, isolating each ticker's DB work.
            async with async_session_factory() as db:
                try:
                    result = await ingestion_service.fetch_and_ingest(
                        db, provider, symbol, start_date=backfill_start, end_date=end_date,
                    )
                    # A "partial" result still counts as processed and becomes
                    # the resume pointer before the run stops below.
                    _last_successful[job_name] = symbol
                    processed += 1
                    _runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol)
                    _log_event(logging.INFO, "ticker_collected", job=job_name, ticker=symbol, status=result.status, records=result.records_ingested)
                    if result.status == "partial":
                        # Rate limited — stop and resume next run
                        _log_event(logging.WARNING, "rate_limited", job=job_name, ticker=symbol, processed=processed)
                        _runtime_finish(job_name, "rate_limited", processed=processed, total=total, message=f"Rate limited at {symbol}")
                        return
                except Exception as exc:
                    # Per-ticker failure: log it and continue with the next ticker.
                    _log_job_error(job_name, symbol, exc)
        # Reset resume pointer on full completion
        _last_successful[job_name] = None
        _log_event(logging.INFO, "job_complete", job=job_name, tickers=processed)
        _runtime_finish(job_name, "completed", processed=processed, total=total, message=f"Processed {processed} tickers")
    except Exception as exc:
        # Unexpected failure outside the per-ticker handler: mark the run errored.
        _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
        _runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc))
async def backfill_ohlcv() -> None:
    """Run the manual deep-history OHLCV backfill (Admin → Jobs).

    Delegates to ``collect_ohlcv`` with ``full_backfill=True`` under the
    ``data_backfill`` job name. Every ticker therefore re-fetches the full
    ``settings.ohlcv_history_days`` window instead of resuming incrementally,
    and runtime/resume state is tracked separately from data_collector. Run it
    once to deepen the ~1-year histories so long-lookback factors (12-month
    momentum, 52-week high) and multi-regime backtests become computable.
    Idempotent (upsert); resumes after rate limits.
    """
    await collect_ohlcv(job_name="data_backfill", full_backfill=True)
# ---------------------------------------------------------------------------
# Job: Sentiment Collector
# ---------------------------------------------------------------------------
async def collect_sentiment() -> None:
    """Fetch and store sentiment for the tickers that currently need it.

    The symbol list comes from ``_get_sentiment_priority_tickers``: the priority
    set first, then the composite-score filler, skipping anything refreshed
    within ``settings.sentiment_fresh_hours``. The provider comes from
    ``build_sentiment_provider``.

    Symbols are grouped into batches of ``settings.openai_sentiment_batch_size``.
    Multi-symbol batches use the provider's ``fetch_sentiment_batch`` when it
    has one; any symbol the batch did not return is fetched individually. Each
    result is stored in its own session, and per-ticker errors are logged and
    skipped.

    An exception whose message mentions "rate", "quota" or "429" ends the run
    with runtime status "rate_limited". A ``ProviderError`` while building the
    provider marks the run "skipped".
    """
    job_name = "sentiment_collector"
    _log_event(logging.INFO, "job_start", job=job_name)
    _runtime_start(job_name)
    processed = 0
    total: int | None = None
    try:
        # Short-lived session used only for the enabled flag and the symbol list.
        async with async_session_factory() as db:
            if not await _is_job_enabled(db, job_name):
                _log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled")
                _runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled")
                return
            symbols = await _get_sentiment_priority_tickers(db)
        if not symbols:
            # Nothing stale (or nothing tracked): finish without provider calls.
            _log_event(logging.INFO, "job_complete", job=job_name, tickers=0)
            _runtime_finish(job_name, "completed", processed=0, total=0, message="No tickers")
            return
        total = len(symbols)
        _runtime_progress(job_name, processed=0, total=total)
        try:
            async with async_session_factory() as cfg_db:
                provider = await build_sentiment_provider(cfg_db)
        except ProviderError as exc:
            # Provider not usable (per ProviderError): treated as a skip, not an error.
            _log_event(logging.WARNING, "job_skipped", job=job_name, reason=str(exc))
            _runtime_finish(job_name, "skipped", processed=0, total=total, message=str(exc))
            return
        except Exception as exc:
            _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
            _runtime_finish(job_name, "error", processed=0, total=total, message=str(exc))
            return
        batch_size = max(1, settings.openai_sentiment_batch_size)
        batches = _chunked(symbols, batch_size)
        for batch in batches:
            # Progress hint: "SYM" for a single symbol, "SYM (+N)" for a batch.
            current_hint = batch[0] if len(batch) == 1 else f"{batch[0]} (+{len(batch) - 1})"
            _runtime_progress(job_name, processed=processed, total=total, current_ticker=current_hint)
            batch_results: dict[str, SentimentData] = {}
            if len(batch) > 1 and hasattr(provider, "fetch_sentiment_batch"):
                try:
                    batch_results = await provider.fetch_sentiment_batch(batch)
                except Exception as exc:
                    msg = str(exc).lower()
                    # NOTE(review): plain substring match — "rate" also occurs in
                    # words like "generate"/"accurate", so such errors would be
                    # treated as rate limits and end the run.
                    if "rate" in msg or "quota" in msg or "429" in msg:
                        _log_event(logging.WARNING, "rate_limited", job=job_name, ticker=batch[0], processed=processed)
                        _runtime_finish(job_name, "rate_limited", processed=processed, total=total, message=f"Rate limited at {batch[0]}")
                        return
                    # Any other batch failure: fall back to per-symbol fetches below.
                    _log_event(logging.WARNING, "batch_fallback", job=job_name, batch=batch, reason=str(exc))
            for symbol in batch:
                _runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol)
                # Use the batch result when present; otherwise fetch this symbol alone.
                data = batch_results.get(symbol) if batch_results else None
                if data is None:
                    try:
                        data = await provider.fetch_sentiment(symbol)
                    except Exception as exc:
                        msg = str(exc).lower()
                        if "rate" in msg or "quota" in msg or "429" in msg:
                            _log_event(logging.WARNING, "rate_limited", job=job_name, ticker=symbol, processed=processed)
                            _runtime_finish(job_name, "rate_limited", processed=processed, total=total, message=f"Rate limited at {symbol}")
                            return
                        _log_job_error(job_name, symbol, exc)
                        continue
                # Separate session per stored result.
                async with async_session_factory() as db:
                    try:
                        await sentiment_service.store_sentiment(
                            db,
                            symbol=symbol,
                            classification=data.classification,
                            confidence=data.confidence,
                            source=data.source,
                            timestamp=data.timestamp,
                            reasoning=data.reasoning,
                            citations=data.citations,
                            recommendation=data.recommendation,
                        )
                        _last_successful[job_name] = symbol
                        processed += 1
                        _runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol)
                        _log_event(logging.INFO, "ticker_collected", job=job_name, ticker=symbol, classification=data.classification, confidence=data.confidence)
                    except Exception as exc:
                        _log_job_error(job_name, symbol, exc)
        # Full pass completed: clear the resume pointer.
        _last_successful[job_name] = None
        _log_event(logging.INFO, "job_complete", job=job_name, tickers=processed)
        _runtime_finish(job_name, "completed", processed=processed, total=total, message=f"Processed {processed} tickers")
    except Exception as exc:
        _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
        _runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc))
# ---------------------------------------------------------------------------
# Job: Fundamental Collector
# ---------------------------------------------------------------------------
async def collect_fundamentals() -> None:
    """Fetch and store fundamentals for all tracked tickers.

    Tickers are visited in ``_get_fundamental_priority_tickers`` order and
    fetched through ``build_fundamental_provider_chain()``. The job is skipped
    when none of the FMP / Finnhub / Alpha Vantage keys is configured. Each
    ticker is stored in its own session.

    An exception whose message mentions "rate" or "429" is retried up to
    ``settings.fundamental_rate_limit_retries`` times, with exponential backoff
    from ``settings.fundamental_rate_limit_backoff_seconds``. Once retries are
    exhausted, a partial fetch (``allow_partial=True``) is stored instead and the
    run continues. Other per-ticker errors are logged and skipped.
    ``settings.fundamental_request_spacing_seconds`` adds a pause after every
    ticker.
    """
    job_name = "fundamental_collector"
    _log_event(logging.INFO, "job_start", job=job_name)
    _runtime_start(job_name)
    processed = 0
    total: int | None = None
    try:
        # Short-lived session used only for the enabled flag and the ticker list.
        async with async_session_factory() as db:
            if not await _is_job_enabled(db, job_name):
                _log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled")
                _runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled")
                return
            symbols = await _get_fundamental_priority_tickers(db)
        if not symbols:
            _log_event(logging.INFO, "job_complete", job=job_name, tickers=0)
            _runtime_finish(job_name, "completed", processed=0, total=0, message="No tickers")
            return
        total = len(symbols)
        _runtime_progress(job_name, processed=0, total=total)
        if not (settings.fmp_api_key or settings.finnhub_api_key or settings.alpha_vantage_api_key):
            _log_event(logging.WARNING, "job_skipped", job=job_name, reason="no fundamentals provider keys configured")
            _runtime_finish(job_name, "skipped", processed=0, total=total, message="No fundamentals provider keys configured")
            return
        try:
            provider = build_fundamental_provider_chain()
        except Exception as exc:
            _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
            _runtime_finish(job_name, "error", processed=0, total=total, message=str(exc))
            return
        # Clamp settings to sane minimums (no negative retries / zero backoff).
        max_retries = max(0, settings.fundamental_rate_limit_retries)
        base_backoff = max(1, settings.fundamental_rate_limit_backoff_seconds)
        spacing = max(0.0, settings.fundamental_request_spacing_seconds)

        async def _store(symbol: str, data) -> None:
            # Persist one ticker's fundamentals in its own session.
            async with async_session_factory() as db:
                await fundamental_service.store_fundamental(
                    db,
                    symbol=symbol,
                    pe_ratio=data.pe_ratio,
                    revenue_growth=data.revenue_growth,
                    earnings_surprise=data.earnings_surprise,
                    market_cap=data.market_cap,
                    next_earnings_date=data.next_earnings_date,
                    unavailable_fields=data.unavailable_fields,
                )

        for symbol in symbols:
            _runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol)
            attempt = 0
            # Retry loop for one ticker; every path ends in `break` except a
            # rate-limit retry, which sleeps and `continue`s.
            while True:
                try:
                    data = await provider.fetch_fundamentals(symbol)
                    await _store(symbol, data)
                    _last_successful[job_name] = symbol
                    processed += 1
                    _runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol)
                    _log_event(logging.INFO, "ticker_collected", job=job_name, ticker=symbol)
                    break
                except Exception as exc:
                    msg = str(exc).lower()
                    # NOTE(review): plain substring match — "rate" also occurs in
                    # words like "generate"/"accurate"; unlike the sentiment job,
                    # "quota" is not checked here.
                    if "rate" in msg or "429" in msg:
                        if attempt < max_retries:
                            # Exponential backoff: base, 2*base, 4*base, ...
                            wait_seconds = base_backoff * (2 ** attempt)
                            attempt += 1
                            _log_event(logging.WARNING, "rate_limited_retry", job=job_name, ticker=symbol, attempt=attempt, max_retries=max_retries, wait_seconds=wait_seconds, processed=processed)
                            _runtime_progress(
                                job_name,
                                processed=processed,
                                total=total,
                                current_ticker=symbol,
                                message=f"Rate-limited at {symbol}; retry {attempt}/{max_retries} in {wait_seconds}s",
                            )
                            await asyncio.sleep(wait_seconds)
                            continue
                        # Retries exhausted: store whatever partial data we can
                        # still get (e.g. FMP market cap) and move on, rather than
                        # aborting the whole run and leaving every later ticker
                        # untouched.
                        _log_event(logging.WARNING, "rate_limited_partial", job=job_name, ticker=symbol, processed=processed)
                        try:
                            data = await provider.fetch_fundamentals(symbol, allow_partial=True)
                            await _store(symbol, data)
                            processed += 1
                        except Exception as exc2:
                            _log_job_error(job_name, symbol, exc2)
                        break
                    # Non-rate-limit failure: log and move to the next ticker.
                    _log_job_error(job_name, symbol, exc)
                    break
            # Optional fixed pause between tickers to stay under provider limits.
            if spacing:
                await asyncio.sleep(spacing)
        # Full pass completed: clear the resume pointer.
        _last_successful[job_name] = None
        _log_event(logging.INFO, "job_complete", job=job_name, tickers=processed)
        _runtime_finish(job_name, "completed", processed=processed, total=total, message=f"Processed {processed} tickers")
    except Exception as exc:
        _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
        _runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc))
# ---------------------------------------------------------------------------
# Job: R:R Scanner
# ---------------------------------------------------------------------------
async def scan_rr() -> None:
    """Run the R:R trade-setup scan over every tracked ticker.

    Delegates to ``rr_scanner_service.scan_all_tickers`` (which isolates
    per-ticker failures itself) with ``settings.default_rr_threshold``, and
    mirrors the scanner's progress callbacks into the job runtime record. A
    scanner failure marks the run "error"; otherwise the run completes with the
    number of setups found.
    """
    job_name = "rr_scanner"
    _log_event(logging.INFO, "job_start", job=job_name)
    _runtime_start(job_name)
    processed = 0
    total: int | None = None
    try:
        async with async_session_factory() as db:
            if not await _is_job_enabled(db, job_name):
                _log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled")
                _runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled")
                return
            total = len(await _get_all_tickers(db))
            _runtime_progress(job_name, processed=0, total=total)

            def report(done: int, count: int, symbol: str) -> None:
                # Scanner callback; an empty symbol clears the ticker in flight.
                _runtime_progress(
                    job_name, processed=done, total=count, current_ticker=symbol or None
                )

            try:
                found = await scan_all_tickers(
                    db,
                    rr_threshold=settings.default_rr_threshold,
                    progress_callback=report,
                )
                processed = total or 0
                summary = f"Found {len(found)} setups"
                _runtime_finish(job_name, "completed", processed=processed, total=total, message=summary)
                _log_event(logging.INFO, "job_complete", job=job_name, setups_found=len(found))
            except Exception as exc:
                _runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc))
                _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
    except Exception as exc:
        _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
        _runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc))
# ---------------------------------------------------------------------------
# Job: Outcome Evaluator
# ---------------------------------------------------------------------------
async def evaluate_outcomes() -> None:
    """Resolve pending trade setups and open paper trades against new OHLCV.

    ``evaluate_pending_setups`` (called with
    ``max_bars=settings.outcome_evaluation_max_bars``) writes actual_outcome /
    outcome_date / evaluated_at on each decided setup. Undecided setups stay
    pending for the next run. ``paper_trade_service.resolve_open_trades`` then
    reports how many paper trades it closed. The run counts as one unit of work.
    """
    job_name = "outcome_evaluator"
    _log_event(logging.INFO, "job_start", job=job_name)
    _runtime_start(job_name, total=1)
    try:
        async with async_session_factory() as db:
            if not await _is_job_enabled(db, job_name):
                _log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled")
                _runtime_finish(job_name, "skipped", processed=0, total=1, message="Disabled")
                return
            summary = await evaluate_pending_setups(
                db, max_bars=settings.outcome_evaluation_max_bars
            )
            from app.services import paper_trade_service

            n_closed = await paper_trade_service.resolve_open_trades(db)
            _runtime_progress(job_name, processed=1, total=1)
            outcome_msg = (
                f"Evaluated {summary['evaluated']}, pending {summary['still_pending']}, "
                f"{n_closed} paper trade(s) closed"
            )
            _runtime_finish(job_name, "completed", processed=1, total=1, message=outcome_msg)
            _log_event(logging.INFO, "job_complete", job=job_name, summary=summary)
    except Exception as exc:
        _runtime_finish(job_name, "error", processed=0, total=1, message=str(exc))
        _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
# ---------------------------------------------------------------------------
# Job: Alerts Dispatcher
# ---------------------------------------------------------------------------
async def dispatch_alerts_job() -> None:
    """Run one alerts pass through ``dispatch_alerts``.

    The pass pushes Telegram alerts for qualified setups, S/R proximity, score
    drops and the digest. It counts as one unit of work, and the runtime
    message carries the dispatcher's status and sent count.
    """
    job_name = "alerts"
    _log_event(logging.INFO, "job_start", job=job_name)
    _runtime_start(job_name, total=1)
    try:
        async with async_session_factory() as db:
            if not await _is_job_enabled(db, job_name):
                _log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled")
                _runtime_finish(job_name, "skipped", processed=0, total=1, message="Disabled")
                return
            outcome = await dispatch_alerts(db)
            _runtime_progress(job_name, processed=1, total=1)
            status_text = outcome.get("status")
            sent_count = outcome.get("sent", 0)
            _runtime_finish(
                job_name, "completed", processed=1, total=1,
                message=f"{status_text}, sent {sent_count}",
            )
            _log_event(logging.INFO, "job_complete", job=job_name, result=outcome)
    except Exception as exc:
        _runtime_finish(job_name, "error", processed=0, total=1, message=str(exc))
        _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
# ---------------------------------------------------------------------------
# Job: Market Regime
# ---------------------------------------------------------------------------
async def compute_market_regime() -> None:
    """Recompute and cache the SPY benchmark trend regime.

    Ends with status "skipped" when the ``market_regime`` job is disabled; any
    failure is captured into the runtime status and the structured log.
    """
    this_job = "market_regime"
    _log_event(logging.INFO, "job_start", job=this_job)
    _runtime_start(this_job, total=1)
    try:
        async with async_session_factory() as session:
            if not (await _is_job_enabled(session, this_job)):
                _log_event(logging.INFO, "job_skipped", job=this_job, reason="disabled")
                _runtime_finish(this_job, "skipped", processed=0, total=1, message="Disabled")
                return
            regime = await update_market_regime(session)
            _runtime_progress(this_job, processed=1, total=1)
            label = regime.get("label")
            _runtime_finish(this_job, "completed", processed=1, total=1, message=f"Regime: {label}")
            _log_event(logging.INFO, "job_complete", job=this_job, label=label)
    except Exception as err:
        _runtime_finish(this_job, "error", processed=0, total=1, message=str(err))
        _log_event(logging.ERROR, "job_error", job=this_job, error_type=type(err).__name__, message=str(err))
# ---------------------------------------------------------------------------
# Job: Benchmark Collector (SPY closes for paper-trade alpha)
# ---------------------------------------------------------------------------
async def collect_benchmark() -> None:
    """Pull and store the SPY daily closes that paper-trade alpha is measured against.

    Skips itself when ``benchmark_collector`` is disabled; failures are recorded
    as a job error rather than propagated.
    """
    key = "benchmark_collector"
    _log_event(logging.INFO, "job_start", job=key)
    _runtime_start(key, total=1)
    try:
        async with async_session_factory() as session:
            enabled = await _is_job_enabled(session, key)
            if not enabled:
                _log_event(logging.INFO, "job_skipped", job=key, reason="disabled")
                _runtime_finish(key, "skipped", processed=0, total=1, message="Disabled")
                return
            row_count = await refresh_benchmark_prices(session)
            _runtime_progress(key, processed=1, total=1)
            _runtime_finish(key, "completed", processed=1, total=1, message=f"{row_count} rows")
            _log_event(logging.INFO, "job_complete", job=key, rows=row_count)
    except Exception as err:
        _runtime_finish(key, "error", processed=0, total=1, message=str(err))
        _log_event(logging.ERROR, "job_error", job=key, error_type=type(err).__name__, message=str(err))
# ---------------------------------------------------------------------------
# Job: Regime Monitor
# ---------------------------------------------------------------------------
async def compute_regime_monitor() -> None:
    """Recompute the standalone AI/Tech regime-change index (observational only).

    Sector/benchmark prices come from Alpaca and VIX/credit spreads from FRED;
    the resulting 0-100 index is stored as a daily snapshot. Nothing else
    consumes it — it only backs its own tab — and it sits in the pipeline purely
    for scheduling.
    """
    name = "regime_monitor"
    _log_event(logging.INFO, "job_start", job=name)
    _runtime_start(name, total=1)
    try:
        async with async_session_factory() as session:
            enabled = await _is_job_enabled(session, name)
            if not enabled:
                _log_event(logging.INFO, "job_skipped", job=name, reason="disabled")
                _runtime_finish(name, "skipped", processed=0, total=1, message="Disabled")
                return
            snapshot = await update_regime_monitor(session)
            _runtime_progress(name, processed=1, total=1)
            score = snapshot.get("total_score")
            _runtime_finish(
                name,
                "completed",
                processed=1,
                total=1,
                message=f"Index: {score} ({snapshot.get('band')})",
            )
            _log_event(logging.INFO, "job_complete", job=name, score=score)
    except Exception as err:
        _runtime_finish(name, "error", processed=0, total=1, message=str(err))
        _log_event(logging.ERROR, "job_error", job=name, error_type=type(err).__name__, message=str(err))
# ---------------------------------------------------------------------------
# Job: Backtest
# ---------------------------------------------------------------------------
async def run_backtest_job() -> None:
    """Replay the price-derived engine over history and cache the resulting report.

    No total is known at start; the backtest's progress callback supplies the
    running count and current symbol, which are forwarded to the runtime status.
    """
    name = "backtest"
    _log_event(logging.INFO, "job_start", job=name)
    _runtime_start(name)

    def _report_progress(done: int, count: int, symbol: str) -> None:
        # An empty symbol is recorded as None rather than "".
        _runtime_progress(name, processed=done, total=count, current_ticker=symbol or None)

    try:
        async with async_session_factory() as session:
            if not await _is_job_enabled(session, name):
                _log_event(logging.INFO, "job_skipped", job=name, reason="disabled")
                _runtime_finish(name, "skipped", processed=0, total=0, message="Disabled")
                return
            report = await run_backtest_and_store(session, _report_progress)
            ticker_count = report.get("tickers", 0)
            summary_text = f"{report.get('candidates', 0)} setups, {report.get('qualified', 0)} qualified"
            _runtime_finish(name, "completed", processed=ticker_count, total=ticker_count, message=summary_text)
            _log_event(logging.INFO, "job_complete", job=name, candidates=report.get("candidates"))
    except Exception as err:
        _runtime_finish(name, "error", processed=0, total=None, message=str(err))
        _log_event(logging.ERROR, "job_error", job=name, error_type=type(err).__name__, message=str(err))
# ---------------------------------------------------------------------------
# Job: Event Study (manual)
# ---------------------------------------------------------------------------
async def run_event_study_job() -> None:
    """Measure how far indicators lead historical drawdowns and cache the report.

    Never auto-fires: it scans OHLCV across the whole universe, so it is only
    started manually from Admin → Jobs when the early-warning measurement needs
    refreshing.
    """
    name = "event_study"
    _log_event(logging.INFO, "job_start", job=name)
    _runtime_start(name, total=1)
    try:
        async with async_session_factory() as session:
            enabled = await _is_job_enabled(session, name)
            if not enabled:
                _log_event(logging.INFO, "job_skipped", job=name, reason="disabled")
                _runtime_finish(name, "skipped", processed=0, total=1, message="Disabled")
                return
            report = await run_event_study_and_store(session)
            _runtime_progress(name, processed=1, total=1)
            # Report availability decides between a result summary and the reason it is missing.
            summary_text = (
                f"{len(report.get('events', []))} events, lead Δ {report.get('lead_delta_days')}d"
                if report.get("available")
                else report.get("reason", "no data")
            )
            _runtime_finish(name, "completed", processed=1, total=1, message=summary_text)
            _log_event(logging.INFO, "job_complete", job=name, events=len(report.get("events", [])))
    except Exception as err:
        _runtime_finish(name, "error", processed=0, total=1, message=str(err))
        _log_event(logging.ERROR, "job_error", job=name, error_type=type(err).__name__, message=str(err))
# ---------------------------------------------------------------------------
# Job: Ticker Universe Sync
# ---------------------------------------------------------------------------
async def sync_ticker_universe() -> None:
    """Bring the tracked tickers in line with the configured default universe.

    Reads the ``ticker_universe_default`` setting (sp500 | nasdaq100 |
    nasdaq_all; ``"sp500"`` when unset), normalizes it to stripped lowercase,
    then runs the bootstrap with ``prune_missing=False`` in a second session.
    """
    name = "ticker_universe_sync"
    _log_event(logging.INFO, "job_start", job=name)
    _runtime_start(name, total=1)
    try:
        async with async_session_factory() as session:
            if not await _is_job_enabled(session, name):
                _log_event(logging.INFO, "job_skipped", job=name, reason="disabled")
                _runtime_finish(name, "skipped", processed=0, total=1, message="Disabled")
                return
            configured = await settings_store.get_value(session, "ticker_universe_default", "sp500")
            universe = configured.strip().lower()
        # The bootstrap itself runs in its own, separate session.
        async with async_session_factory() as session:
            summary = await bootstrap_universe(session, universe, prune_missing=False)
            _runtime_progress(name, processed=1, total=1)
            _runtime_finish(name, "completed", processed=1, total=1, message=f"Synced {universe}")
            _log_event(logging.INFO, "job_complete", job=name, universe=universe, summary=summary)
    except Exception as err:
        _runtime_finish(name, "error", processed=0, total=1, message=str(err))
        _log_event(logging.ERROR, "job_error", job=name, error_type=type(err).__name__, message=str(err))
# ---------------------------------------------------------------------------
# Job: Daily Pipeline (orchestrator)
# ---------------------------------------------------------------------------
# Pipeline step tables: ordered (job_name, coroutine_name) pairs. Order matters —
# each step works on what the earlier steps just produced. Step names are the
# individual job ids, so every step keeps updating its own runtime status while a
# pipeline drives it; coroutine names are looked up in module globals at run time.
#
# Daily (full): the complete data→signal refresh, once a day.
_DAILY_PIPELINE_STEPS: list[tuple[str, str]] = [
    ("data_collector", "collect_ohlcv"),
    ("benchmark_collector", "collect_benchmark"),
    ("sentiment_collector", "collect_sentiment"),
    ("rr_scanner", "scan_rr"),
    ("outcome_evaluator", "evaluate_outcomes"),
    ("market_regime", "compute_market_regime"),
    # Observational only: scheduled here, but nothing downstream reads its output.
    ("regime_monitor", "compute_regime_monitor"),
]
# Intraday (light): keeps prices fresh and resolves outcomes during the day while
# skipping the expensive scan and sentiment steps. Live R:R on the dashboard is
# recomputed from the latest price, so an OHLCV refresh is enough to keep prices
# current; the outcome step also closes paper trades whose stop/target was hit.
_INTRADAY_PIPELINE_STEPS: list[tuple[str, str]] = [
    ("data_collector", "collect_ohlcv"),
    ("outcome_evaluator", "evaluate_outcomes"),
]
async def _run_pipeline(job_name: str, steps: list[tuple[str, str]]) -> None:
    """Run an ordered list of ``(step_name, coroutine_name)`` steps.

    ``coroutine_name`` is looked up in this module's globals when the pipeline
    runs. Each step respects its own enable flag and manages its own runtime
    status. A failing step is logged and the pipeline continues with the next
    one; the names of failed steps are included in the pipeline's completion
    message and log event. The pipeline's own enable check runs inside the
    error handler, so a failure there is recorded as a job error.
    """
    _log_event(logging.INFO, "job_start", job=job_name)
    total = len(steps)
    done = 0
    try:
        # Checked inside the try (as every other job does) so a DB error here is
        # recorded as a job error instead of escaping to the scheduler.
        async with async_session_factory() as db:
            enabled = await _is_job_enabled(db, job_name)
        if not enabled:
            _log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled")
            _runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled")
            return
        _runtime_start(job_name, total=total)
        funcs = globals()
        failed: list[str] = []
        for step_name, func_name in steps:
            _runtime_progress(job_name, processed=done, total=total, current_ticker=step_name)
            try:
                await funcs[func_name]()
            except Exception:
                # Best-effort: one broken step must not block the remaining ones.
                logger.exception("%s step %s failed", job_name, step_name)
                failed.append(step_name)
            done += 1
        message = "Pipeline complete"
        if failed:
            message += f" ({len(failed)} step(s) failed: {', '.join(failed)})"
        _runtime_finish(job_name, "completed", processed=done, total=total, message=message)
        _log_event(logging.INFO, "job_complete", job=job_name, failed_steps=failed)
    except Exception as exc:
        _runtime_finish(job_name, "error", processed=done, total=total, message=str(exc))
        _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc))
async def run_daily_pipeline() -> None:
    """Run the full daily flow in order.

    Steps: OHLCV → benchmark → sentiment → R:R scan → outcome eval (+paper
    close) → market regime → regime monitor (observational only).
    """
    await _run_pipeline("daily_pipeline", _DAILY_PIPELINE_STEPS)
async def run_intraday_pipeline() -> None:
    """Run the light intraday flow: OHLCV refresh, then outcome evaluation (+paper close)."""
    return await _run_pipeline("intraday_pipeline", _INTRADAY_PIPELINE_STEPS)
# ---------------------------------------------------------------------------
# Frequency helpers
# ---------------------------------------------------------------------------
_FREQUENCY_MAP: dict[str, dict[str, int]] = {
"hourly": {"hours": 1},
"daily": {"hours": 24},
"weekly": {"weeks": 1},
}
def _parse_frequency(freq: str) -> dict[str, int]:
"""Convert a frequency string to APScheduler interval kwargs."""
return _FREQUENCY_MAP.get(freq.lower(), {"hours": 24})
# ---------------------------------------------------------------------------
# Schedule config (cron, admin-configurable)
# ---------------------------------------------------------------------------
#
# Cron-driven jobs take their schedule from SystemSettings, so Admin → Jobs can
# retune them without a redeploy. Wall-clock cron triggers are used on purpose:
# an interval trigger restarts its countdown at now+N whenever the process
# starts, so on a frequently redeployed box such a job can keep being pushed out
# and never run. A cron trigger fires at a fixed local time regardless.
# Hours below are in ``schedule_timezone``.
SCHEDULE_DEFAULTS: dict[str, str] = dict(
    schedule_timezone="Europe/Berlin",
    schedule_daily_pipeline_cron="0 7 * * *",  # full refresh, ready by ~8am
    schedule_intraday_pipeline_cron="0 14-22 * * 1-5",  # hourly across the US session
    schedule_fundamentals_cron="0 4 * * 1",  # weekly, early Monday (slow job)
)
# Scheduler job id -> SystemSettings key holding that job's cron expression.
_CRON_JOBS: dict[str, str] = dict(
    daily_pipeline="schedule_daily_pipeline_cron",
    intraday_pipeline="schedule_intraday_pipeline_cron",
    fundamental_collector="schedule_fundamentals_cron",
)
def validate_cron(expr: str, timezone: str) -> None:
    """Check a cron expression/timezone pair by building a trigger from it.

    Nothing is returned on success; an invalid expression or timezone surfaces
    as the exception raised by ``CronTrigger.from_crontab``. ``None`` inputs
    are treated as empty strings, and surrounding whitespace is ignored.
    """
    cleaned_expr = (expr or "").strip()
    cleaned_tz = (timezone or "").strip()
    CronTrigger.from_crontab(cleaned_expr, timezone=cleaned_tz)
def _cron_trigger(expr: str, timezone: str, fallback_key: str) -> CronTrigger:
    """Build a CronTrigger for ``expr`` in ``timezone``, falling back on bad input.

    If the expression or timezone is rejected, a warning is logged and the
    default expression for ``fallback_key`` is used instead. The default cron
    strings are paired with ``SCHEDULE_DEFAULTS["schedule_timezone"]``, so the
    fallback is built in the configured timezone when that is valid, otherwise
    in the default timezone, and only in UTC as a last resort.
    """
    default_expr = SCHEDULE_DEFAULTS[fallback_key]
    try:
        return CronTrigger.from_crontab((expr or "").strip(), timezone=(timezone or "").strip())
    except Exception:
        _log_event(logging.WARNING, "invalid_cron", expr=expr, timezone=timezone, fallback=default_expr)
    # Keep the default schedule on the operator's wall clock instead of silently
    # shifting it by the UTC offset.
    for candidate_tz in (timezone, SCHEDULE_DEFAULTS["schedule_timezone"]):
        try:
            return CronTrigger.from_crontab(default_expr, timezone=(candidate_tz or "").strip())
        except Exception:
            continue
    return CronTrigger.from_crontab(default_expr, timezone="UTC")
async def load_schedule_config(db: AsyncSession) -> dict[str, str]:
    """Return the effective cron schedule config.

    Values are read from SystemSettings; any key whose stored value is unset or
    empty falls back to its entry in ``SCHEDULE_DEFAULTS``.
    """
    stored = await settings_store.get_map(db, SCHEDULE_DEFAULTS)
    resolved: dict[str, str] = {}
    for setting_key, default_value in SCHEDULE_DEFAULTS.items():
        value = stored.get(setting_key)
        resolved[setting_key] = value if value else default_value
    return resolved
def reschedule_jobs(schedule_config: dict[str, str]) -> dict[str, str]:
    """Apply updated cron triggers to the running scheduler after a settings change.

    Only jobs from ``_CRON_JOBS`` that are currently registered are touched;
    empty values fall back to ``SCHEDULE_DEFAULTS``. Returns a mapping of
    ``job_id -> cron expression`` for every job that was rescheduled.
    """
    zone = schedule_config.get("schedule_timezone") or SCHEDULE_DEFAULTS["schedule_timezone"]
    applied: dict[str, str] = {}
    for job_id, setting_key in _CRON_JOBS.items():
        if scheduler.get_job(job_id) is not None:
            cron_expr = schedule_config.get(setting_key) or SCHEDULE_DEFAULTS[setting_key]
            scheduler.reschedule_job(job_id, trigger=_cron_trigger(cron_expr, zone, setting_key))
            applied[job_id] = cron_expr
    _log_event(logging.INFO, "jobs_rescheduled", applied=applied, timezone=zone)
    return applied
# ---------------------------------------------------------------------------
# Scheduler setup
# ---------------------------------------------------------------------------
def configure_scheduler(schedule_config: dict[str, str] | None = None) -> None:
    """Add all jobs to the scheduler.

    Call this once before ``scheduler.start()``. Removes any existing jobs
    first to ensure idempotency. ``schedule_config`` supplies the cron strings
    + timezone for the cron-driven jobs (daily/intraday pipelines,
    fundamentals); defaults are used for any key that is missing or empty —
    the same "falsy means unset" rule as ``load_schedule_config`` and
    ``reschedule_jobs``.
    """
    # Drop empty/None overrides so they fall back to the defaults instead of
    # replacing them (an empty timezone would otherwise push every cron job
    # onto its fallback schedule).
    overrides = {key: value for key, value in (schedule_config or {}).items() if value}
    cfg = {**SCHEDULE_DEFAULTS, **overrides}
    tz = cfg["schedule_timezone"]
    scheduler.remove_all_jobs()
    # Pipeline members: registered but PAUSED (next_run_time=None) so they never
    # auto-fire on their own timer — the pipelines drive them in order. The long
    # interval is just a backstop after a manual trigger (which re-arms an
    # interval job). They stay manually triggerable from Admin → Jobs.
    _members = [
        (collect_ohlcv, "data_collector", "Data Collector (OHLCV)"),
        (collect_benchmark, "benchmark_collector", "Benchmark Collector"),
        (collect_sentiment, "sentiment_collector", "Sentiment Collector"),
        (scan_rr, "rr_scanner", "R:R Scanner"),
        (evaluate_outcomes, "outcome_evaluator", "Outcome Evaluator"),
        (compute_market_regime, "market_regime", "Market Regime"),
        (compute_regime_monitor, "regime_monitor", "Regime Monitor"),
    ]
    for fn, job_id, job_name in _members:
        scheduler.add_job(
            fn, "interval", weeks=520, id=job_id, name=job_name,
            replace_existing=True, next_run_time=None,
        )
    # Cron-driven jobs (admin-configurable times)
    scheduler.add_job(
        run_daily_pipeline,
        _cron_trigger(cfg["schedule_daily_pipeline_cron"], tz, "schedule_daily_pipeline_cron"),
        id="daily_pipeline", name="Daily Pipeline", replace_existing=True,
    )
    scheduler.add_job(
        run_intraday_pipeline,
        _cron_trigger(cfg["schedule_intraday_pipeline_cron"], tz, "schedule_intraday_pipeline_cron"),
        id="intraday_pipeline", name="Intraday Pipeline", replace_existing=True,
    )
    # Fundamentals — quarterly-ish data; weekly by default (conserves API quota).
    # Its own early cron so the slow, rate-limited fetch finishes before the day.
    scheduler.add_job(
        collect_fundamentals,
        _cron_trigger(cfg["schedule_fundamentals_cron"], tz, "schedule_fundamentals_cron"),
        id="fundamental_collector", name="Fundamental Collector", replace_existing=True,
    )
    # Independent interval jobs (own cadence, no ordering dependency).
    # NOTE(review): interval triggers restart their countdown on every process
    # start, so on a frequently redeployed box these can fire late or not at all
    # (the weekly backtest most of all) — consider cron triggers for them too.
    scheduler.add_job(
        sync_ticker_universe, "interval", hours=24,
        id="ticker_universe_sync", name="Ticker Universe Sync", replace_existing=True,
    )
    alerts_interval = _parse_frequency(settings.alerts_frequency)
    scheduler.add_job(
        dispatch_alerts_job, "interval", **alerts_interval,
        id="alerts", name="Alerts Dispatcher", replace_existing=True,
    )
    scheduler.add_job(
        run_backtest_job, "interval", hours=168,
        id="backtest", name="Backtest", replace_existing=True,
    )
    # Deep history backfill: manual only (never auto-fires); triggered from
    # Admin → Jobs when histories need deepening.
    scheduler.add_job(
        backfill_ohlcv, "interval", weeks=520,
        id="data_backfill", name="Data Backfill (deep history)",
        replace_existing=True, next_run_time=None,
    )
    # Event study: manual only (universe-wide scan); triggered from Admin → Jobs.
    scheduler.add_job(
        run_event_study_job, "interval", weeks=520,
        id="event_study", name="Event Study",
        replace_existing=True, next_run_time=None,
    )
    _log_event(logging.INFO, "scheduler_configured", timezone=tz, daily_pipeline={
        "cron": cfg["schedule_daily_pipeline_cron"],
        "steps": [name for name, _ in _DAILY_PIPELINE_STEPS],
    }, intraday_pipeline={
        "cron": cfg["schedule_intraday_pipeline_cron"],
        "steps": [name for name, _ in _INTRADAY_PIPELINE_STEPS],
    }, fundamental_collector={"cron": cfg["schedule_fundamentals_cron"]}, independent=["ticker_universe_sync", "alerts", "backtest"])