coordinate jobs: daily pipeline orchestrator runs the flow in order
Deploy / lint (push) Successful in 7s
Deploy / test (push) Successful in 39s
Deploy / deploy (push) Successful in 25s

Jobs were independent 24h timers with no ordering, so the scanner could run on
stale OHLCV, and manual runs desynced the offsets. New daily_pipeline job runs
the data→signal flow in dependency order: OHLCV → fundamentals → sentiment →
R:R scan → outcome eval (+paper close) → market regime. Each step keeps its own
enable flag and runtime status; a failing step is logged and the pipeline
continues.

The member jobs are registered PAUSED (no auto-fire) so they only run via the
pipeline — but stay manually triggerable from Admin → Jobs (shown as "runs in
daily pipeline"). Alerts (hourly), ticker universe sync, and backtest keep their
own independent cadence.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-17 10:16:41 +02:00
parent fb3b8d18d7
commit e982487abd
5 changed files with 114 additions and 94 deletions
+90 -90
View File
@@ -157,6 +157,17 @@ _job_runtime: dict[str, dict[str, object]] = {
"finished_at": None,
"message": None,
},
"daily_pipeline": {
"running": False,
"status": "idle",
"processed": 0,
"total": None,
"progress_pct": None,
"current_ticker": None,
"started_at": None,
"finished_at": None,
"message": None,
},
}
@@ -982,6 +993,55 @@ async def sync_ticker_universe() -> None:
}))
# ---------------------------------------------------------------------------
# Job: Daily Pipeline (orchestrator)
# ---------------------------------------------------------------------------
# Steps run in dependency order: each uses fresh output from the previous one.
# (name, coroutine) — the names match the individual jobs so each step still
# updates its own runtime status while the pipeline runs.
_PIPELINE_STEPS = [
("data_collector", "collect_ohlcv"),
("fundamental_collector", "collect_fundamentals"),
("sentiment_collector", "collect_sentiment"),
("rr_scanner", "scan_rr"),
("outcome_evaluator", "evaluate_outcomes"),
("market_regime", "compute_market_regime"),
]
async def run_daily_pipeline() -> None:
"""Run the daily data→signal flow in dependency order.
OHLCV → fundamentals → sentiment → R:R scan → outcome eval (+paper close) →
market regime. Each step respects its own enable flag and manages its own
runtime status; a failing step is logged and the pipeline continues.
"""
job_name = "daily_pipeline"
logger.info(json.dumps({"event": "job_start", "job": job_name}))
total = len(_PIPELINE_STEPS)
_runtime_start(job_name, total=total)
funcs = globals()
done = 0
try:
for step_name, func_name in _PIPELINE_STEPS:
_runtime_progress(job_name, processed=done, total=total, current_ticker=step_name)
try:
await funcs[func_name]()
except Exception:
logger.exception("Daily pipeline step %s failed", step_name)
done += 1
_runtime_finish(job_name, "completed", processed=done, total=total, message="Pipeline complete")
logger.info(json.dumps({"event": "job_complete", "job": job_name}))
except Exception as exc:
_runtime_finish(job_name, "error", processed=done, total=total, message=str(exc))
logger.error(json.dumps({
"event": "job_error", "job": job_name,
"error_type": type(exc).__name__, "message": str(exc),
}))
# ---------------------------------------------------------------------------
# Frequency helpers
# ---------------------------------------------------------------------------
@@ -1010,109 +1070,49 @@ def configure_scheduler() -> None:
"""
scheduler.remove_all_jobs()
# Data Collector — configurable frequency (default: hourly)
ohlcv_interval = _parse_frequency(settings.data_collector_frequency)
# Pipeline members: registered but PAUSED (next_run_time=None) so they never
# auto-fire on their own timer — the daily_pipeline drives them in order. The
# long interval is just a backstop after a manual trigger (which re-arms an
# interval job). They stay manually triggerable from Admin → Jobs.
_members = [
(collect_ohlcv, "data_collector", "Data Collector (OHLCV)"),
(collect_fundamentals, "fundamental_collector", "Fundamental Collector"),
(collect_sentiment, "sentiment_collector", "Sentiment Collector"),
(scan_rr, "rr_scanner", "R:R Scanner"),
(evaluate_outcomes, "outcome_evaluator", "Outcome Evaluator"),
(compute_market_regime, "market_regime", "Market Regime"),
]
for fn, job_id, job_name in _members:
scheduler.add_job(
fn, "interval", weeks=520, id=job_id, name=job_name,
replace_existing=True, next_run_time=None,
)
# Daily Pipeline — the single ordered daily flow
scheduler.add_job(
collect_ohlcv,
"interval",
**ohlcv_interval,
id="data_collector",
name="Data Collector (OHLCV)",
replace_existing=True,
run_daily_pipeline, "interval", hours=24,
id="daily_pipeline", name="Daily Pipeline", replace_existing=True,
)
# Sentiment Collector — default 30 min
# Independent jobs (own cadence, no ordering dependency)
scheduler.add_job(
collect_sentiment,
"interval",
minutes=settings.sentiment_poll_interval_minutes,
id="sentiment_collector",
name="Sentiment Collector",
replace_existing=True,
sync_ticker_universe, "interval", hours=24,
id="ticker_universe_sync", name="Ticker Universe Sync", replace_existing=True,
)
# Fundamental Collector — configurable frequency (default: daily)
fund_interval = _parse_frequency(settings.fundamental_fetch_frequency)
scheduler.add_job(
collect_fundamentals,
"interval",
**fund_interval,
id="fundamental_collector",
name="Fundamental Collector",
replace_existing=True,
)
# R:R Scanner — configurable frequency (default: hourly)
rr_interval = _parse_frequency(settings.rr_scan_frequency)
scheduler.add_job(
scan_rr,
"interval",
**rr_interval,
id="rr_scanner",
name="R:R Scanner",
replace_existing=True,
)
# Universe Sync — nightly
scheduler.add_job(
sync_ticker_universe,
"interval",
hours=24,
id="ticker_universe_sync",
name="Ticker Universe Sync",
replace_existing=True,
)
# Outcome Evaluator — nightly, after fresh OHLCV has been collected
scheduler.add_job(
evaluate_outcomes,
"interval",
hours=24,
id="outcome_evaluator",
name="Outcome Evaluator",
replace_existing=True,
)
# Alerts Dispatcher — configurable frequency (default: hourly)
alerts_interval = _parse_frequency(settings.alerts_frequency)
scheduler.add_job(
dispatch_alerts_job,
"interval",
**alerts_interval,
id="alerts",
name="Alerts Dispatcher",
replace_existing=True,
dispatch_alerts_job, "interval", **alerts_interval,
id="alerts", name="Alerts Dispatcher", replace_existing=True,
)
# Market Regime — nightly benchmark trend refresh
scheduler.add_job(
compute_market_regime,
"interval",
hours=24,
id="market_regime",
name="Market Regime",
replace_existing=True,
)
# Backtest — weekly historical replay (expensive; mostly run on demand)
scheduler.add_job(
run_backtest_job,
"interval",
hours=168,
id="backtest",
name="Backtest",
replace_existing=True,
run_backtest_job, "interval", hours=168,
id="backtest", name="Backtest", replace_existing=True,
)
logger.info(
json.dumps({
"event": "scheduler_configured",
"jobs": {
"data_collector": ohlcv_interval,
"sentiment_collector": {"minutes": settings.sentiment_poll_interval_minutes},
"fundamental_collector": fund_interval,
"rr_scanner": rr_interval,
"ticker_universe_sync": {"hours": 24},
},
"daily_pipeline": [name for name, _ in _PIPELINE_STEPS],
"independent": ["ticker_universe_sync", "alerts", "backtest"],
})
)
+13
View File
@@ -485,6 +485,7 @@ VALID_JOB_NAMES = {
"alerts",
"market_regime",
"backtest",
"daily_pipeline",
}
JOB_LABELS = {
@@ -497,6 +498,17 @@ JOB_LABELS = {
"alerts": "Alerts Dispatcher",
"market_regime": "Market Regime",
"backtest": "Backtest",
"daily_pipeline": "Daily Pipeline",
}
# Jobs driven by the daily_pipeline (in order) rather than their own timer.
PIPELINE_MEMBERS = {
"data_collector",
"fundamental_collector",
"sentiment_collector",
"rr_scanner",
"outcome_evaluator",
"market_regime",
}
@@ -527,6 +539,7 @@ async def list_jobs(db: AsyncSession) -> list[dict]:
"label": JOB_LABELS.get(name, name),
"enabled": enabled,
"next_run_at": next_run,
"via_pipeline": name in PIPELINE_MEMBERS,
"registered": job is not None,
"running": bool(runtime.get("running", False)),
"runtime_status": runtime.get("status"),
+1
View File
@@ -162,6 +162,7 @@ export interface JobStatus {
label: string;
enabled: boolean;
next_run_at: string | null;
via_pipeline?: boolean;
registered: boolean;
running?: boolean;
runtime_status?: string | null;
@@ -146,10 +146,14 @@ export function JobControls() {
? 'Active'
: 'Inactive'}
</span>
{job.enabled && job.next_run_at && (
<span className="text-[11px] text-gray-500">
Next run {formatNextRun(job.next_run_at)}
</span>
{job.via_pipeline ? (
<span className="text-[11px] text-gray-500">runs in daily pipeline</span>
) : (
job.enabled && job.next_run_at && (
<span className="text-[11px] text-gray-500">
Next run {formatNextRun(job.next_run_at)}
</span>
)
)}
{!job.registered && (
<span className="text-[11px] text-red-400">Not registered</span>
+2
View File
@@ -84,6 +84,7 @@ class TestConfigureScheduler:
"alerts",
"market_regime",
"backtest",
"daily_pipeline",
}
def test_configure_is_idempotent(self):
@@ -95,6 +96,7 @@ class TestConfigureScheduler:
assert sorted(job_ids) == sorted([
"alerts",
"backtest",
"daily_pipeline",
"data_collector",
"fundamental_collector",
"market_regime",