From e982487abd6646fc64bbcebf19004eac49f80f44 Mon Sep 17 00:00:00 2001 From: Dennis Thiessen Date: Wed, 17 Jun 2026 10:16:41 +0200 Subject: [PATCH] coordinate jobs: daily pipeline orchestrator runs the flow in order MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Jobs were independent 24h timers with no ordering, so the scanner could run on stale OHLCV, and manual runs desynced the offsets. New daily_pipeline job runs the data→signal flow in dependency order: OHLCV → fundamentals → sentiment → R:R scan → outcome eval (+paper close) → market regime. Each step keeps its own enable flag and runtime status; a failing step is logged and the pipeline continues. The member jobs are registered PAUSED (no auto-fire) so they only run via the pipeline — but stay manually triggerable from Admin → Jobs (shown as "runs in daily pipeline"). Alerts (hourly), ticker universe sync, and backtest keep their own independent cadence. Co-Authored-By: Claude Opus 4.8 --- app/scheduler.py | 180 +++++++++--------- app/services/admin_service.py | 13 ++ frontend/src/api/admin.ts | 1 + frontend/src/components/admin/JobControls.tsx | 12 +- tests/unit/test_scheduler.py | 2 + 5 files changed, 114 insertions(+), 94 deletions(-) diff --git a/app/scheduler.py b/app/scheduler.py index aed1a6f..ee13fcd 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -157,6 +157,17 @@ _job_runtime: dict[str, dict[str, object]] = { "finished_at": None, "message": None, }, + "daily_pipeline": { + "running": False, + "status": "idle", + "processed": 0, + "total": None, + "progress_pct": None, + "current_ticker": None, + "started_at": None, + "finished_at": None, + "message": None, + }, } @@ -982,6 +993,55 @@ async def sync_ticker_universe() -> None: })) +# --------------------------------------------------------------------------- +# Job: Daily Pipeline (orchestrator) +# --------------------------------------------------------------------------- + +# Steps run in dependency order: each uses fresh output from the previous one. +# (name, coroutine) — the names match the individual jobs so each step still +# updates its own runtime status while the pipeline runs. +_PIPELINE_STEPS = [ + ("data_collector", "collect_ohlcv"), + ("fundamental_collector", "collect_fundamentals"), + ("sentiment_collector", "collect_sentiment"), + ("rr_scanner", "scan_rr"), + ("outcome_evaluator", "evaluate_outcomes"), + ("market_regime", "compute_market_regime"), +] + + +async def run_daily_pipeline() -> None: + """Run the daily data→signal flow in dependency order. + + OHLCV → fundamentals → sentiment → R:R scan → outcome eval (+paper close) → + market regime. Each step respects its own enable flag and manages its own + runtime status; a failing step is logged and the pipeline continues. + """ + job_name = "daily_pipeline" + logger.info(json.dumps({"event": "job_start", "job": job_name})) + total = len(_PIPELINE_STEPS) + _runtime_start(job_name, total=total) + + funcs = globals() + done = 0 + try: + for step_name, func_name in _PIPELINE_STEPS: + _runtime_progress(job_name, processed=done, total=total, current_ticker=step_name) + try: + await funcs[func_name]() + except Exception: + logger.exception("Daily pipeline step %s failed", step_name) + done += 1 + _runtime_finish(job_name, "completed", processed=done, total=total, message="Pipeline complete") + logger.info(json.dumps({"event": "job_complete", "job": job_name})) + except Exception as exc: + _runtime_finish(job_name, "error", processed=done, total=total, message=str(exc)) + logger.error(json.dumps({ + "event": "job_error", "job": job_name, + "error_type": type(exc).__name__, "message": str(exc), + })) + + # --------------------------------------------------------------------------- # Frequency helpers # --------------------------------------------------------------------------- @@ -1010,109 +1070,49 @@ def configure_scheduler() -> None: """ scheduler.remove_all_jobs() - # Data Collector — configurable frequency (default: hourly) - ohlcv_interval = _parse_frequency(settings.data_collector_frequency) + # Pipeline members: registered but PAUSED (next_run_time=None) so they never + # auto-fire on their own timer — the daily_pipeline drives them in order. The + # long interval is just a backstop after a manual trigger (which re-arms an + # interval job). They stay manually triggerable from Admin → Jobs. + _members = [ + (collect_ohlcv, "data_collector", "Data Collector (OHLCV)"), + (collect_fundamentals, "fundamental_collector", "Fundamental Collector"), + (collect_sentiment, "sentiment_collector", "Sentiment Collector"), + (scan_rr, "rr_scanner", "R:R Scanner"), + (evaluate_outcomes, "outcome_evaluator", "Outcome Evaluator"), + (compute_market_regime, "market_regime", "Market Regime"), + ] + for fn, job_id, job_name in _members: + scheduler.add_job( + fn, "interval", weeks=520, id=job_id, name=job_name, + replace_existing=True, next_run_time=None, + ) + + # Daily Pipeline — the single ordered daily flow scheduler.add_job( - collect_ohlcv, - "interval", - **ohlcv_interval, - id="data_collector", - name="Data Collector (OHLCV)", - replace_existing=True, + run_daily_pipeline, "interval", hours=24, + id="daily_pipeline", name="Daily Pipeline", replace_existing=True, ) - # Sentiment Collector — default 30 min + # Independent jobs (own cadence, no ordering dependency) scheduler.add_job( - collect_sentiment, - "interval", - minutes=settings.sentiment_poll_interval_minutes, - id="sentiment_collector", - name="Sentiment Collector", - replace_existing=True, + sync_ticker_universe, "interval", hours=24, + id="ticker_universe_sync", name="Ticker Universe Sync", replace_existing=True, ) - - # Fundamental Collector — configurable frequency (default: daily) - fund_interval = _parse_frequency(settings.fundamental_fetch_frequency) - scheduler.add_job( - collect_fundamentals, - "interval", - **fund_interval, - id="fundamental_collector", - name="Fundamental Collector", - replace_existing=True, - ) - - # R:R Scanner — configurable frequency (default: hourly) - rr_interval = _parse_frequency(settings.rr_scan_frequency) - scheduler.add_job( - scan_rr, - "interval", - **rr_interval, - id="rr_scanner", - name="R:R Scanner", - replace_existing=True, - ) - - # Universe Sync — nightly - scheduler.add_job( - sync_ticker_universe, - "interval", - hours=24, - id="ticker_universe_sync", - name="Ticker Universe Sync", - replace_existing=True, - ) - - # Outcome Evaluator — nightly, after fresh OHLCV has been collected - scheduler.add_job( - evaluate_outcomes, - "interval", - hours=24, - id="outcome_evaluator", - name="Outcome Evaluator", - replace_existing=True, - ) - - # Alerts Dispatcher — configurable frequency (default: hourly) alerts_interval = _parse_frequency(settings.alerts_frequency) scheduler.add_job( - dispatch_alerts_job, - "interval", - **alerts_interval, - id="alerts", - name="Alerts Dispatcher", - replace_existing=True, + dispatch_alerts_job, "interval", **alerts_interval, + id="alerts", name="Alerts Dispatcher", replace_existing=True, ) - - # Market Regime — nightly benchmark trend refresh scheduler.add_job( - compute_market_regime, - "interval", - hours=24, - id="market_regime", - name="Market Regime", - replace_existing=True, - ) - - # Backtest — weekly historical replay (expensive; mostly run on demand) - scheduler.add_job( - run_backtest_job, - "interval", - hours=168, - id="backtest", - name="Backtest", - replace_existing=True, + run_backtest_job, "interval", hours=168, + id="backtest", name="Backtest", replace_existing=True, ) logger.info( json.dumps({ "event": "scheduler_configured", - "jobs": { - "data_collector": ohlcv_interval, - "sentiment_collector": {"minutes": settings.sentiment_poll_interval_minutes}, - "fundamental_collector": fund_interval, - "rr_scanner": rr_interval, - "ticker_universe_sync": {"hours": 24}, - }, + "daily_pipeline": [name for name, _ in _PIPELINE_STEPS], + "independent": ["ticker_universe_sync", "alerts", "backtest"], }) ) diff --git a/app/services/admin_service.py b/app/services/admin_service.py index a5cbe27..63545d2 100644 --- a/app/services/admin_service.py +++ b/app/services/admin_service.py @@ -485,6 +485,7 @@ VALID_JOB_NAMES = { "alerts", "market_regime", "backtest", + "daily_pipeline", } JOB_LABELS = { @@ -497,6 +498,17 @@ JOB_LABELS = { "alerts": "Alerts Dispatcher", "market_regime": "Market Regime", "backtest": "Backtest", + "daily_pipeline": "Daily Pipeline", +} + +# Jobs driven by the daily_pipeline (in order) rather than their own timer. +PIPELINE_MEMBERS = { + "data_collector", + "fundamental_collector", + "sentiment_collector", + "rr_scanner", + "outcome_evaluator", + "market_regime", } @@ -527,6 +539,7 @@ async def list_jobs(db: AsyncSession) -> list[dict]: "label": JOB_LABELS.get(name, name), "enabled": enabled, "next_run_at": next_run, + "via_pipeline": name in PIPELINE_MEMBERS, "registered": job is not None, "running": bool(runtime.get("running", False)), "runtime_status": runtime.get("status"), diff --git a/frontend/src/api/admin.ts b/frontend/src/api/admin.ts index a892540..d66a2eb 100644 --- a/frontend/src/api/admin.ts +++ b/frontend/src/api/admin.ts @@ -162,6 +162,7 @@ export interface JobStatus { label: string; enabled: boolean; next_run_at: string | null; + via_pipeline?: boolean; registered: boolean; running?: boolean; runtime_status?: string | null; diff --git a/frontend/src/components/admin/JobControls.tsx b/frontend/src/components/admin/JobControls.tsx index 4faa2ee..68356b3 100644 --- a/frontend/src/components/admin/JobControls.tsx +++ b/frontend/src/components/admin/JobControls.tsx @@ -146,10 +146,14 @@ export function JobControls() { ? 'Active' : 'Inactive'} - {job.enabled && job.next_run_at && ( - - Next run {formatNextRun(job.next_run_at)} - + {job.via_pipeline ? ( + runs in daily pipeline + ) : ( + job.enabled && job.next_run_at && ( + + Next run {formatNextRun(job.next_run_at)} + + ) )} {!job.registered && ( Not registered diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index ad4fe6c..d82bb44 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -84,6 +84,7 @@ class TestConfigureScheduler: "alerts", "market_regime", "backtest", + "daily_pipeline", } def test_configure_is_idempotent(self): @@ -95,6 +96,7 @@ class TestConfigureScheduler: assert sorted(job_ids) == sorted([ "alerts", "backtest", + "daily_pipeline", "data_collector", "fundamental_collector", "market_regime",