diff --git a/app/scheduler.py b/app/scheduler.py
index aed1a6f..ee13fcd 100644
--- a/app/scheduler.py
+++ b/app/scheduler.py
@@ -157,6 +157,17 @@ _job_runtime: dict[str, dict[str, object]] = {
"finished_at": None,
"message": None,
},
+ "daily_pipeline": {
+ "running": False,
+ "status": "idle",
+ "processed": 0,
+ "total": None,
+ "progress_pct": None,
+ "current_ticker": None,
+ "started_at": None,
+ "finished_at": None,
+ "message": None,
+ },
}
@@ -982,6 +993,55 @@ async def sync_ticker_universe() -> None:
}))
+# ---------------------------------------------------------------------------
+# Job: Daily Pipeline (orchestrator)
+# ---------------------------------------------------------------------------
+
+# Steps run in dependency order: each uses fresh output from the previous one.
+# (job name, function name) — the function is resolved by name at run time; the job
+# names match the individual jobs so each step still updates its own runtime status.
+_PIPELINE_STEPS = [
+ ("data_collector", "collect_ohlcv"),
+ ("fundamental_collector", "collect_fundamentals"),
+ ("sentiment_collector", "collect_sentiment"),
+ ("rr_scanner", "scan_rr"),
+ ("outcome_evaluator", "evaluate_outcomes"),
+ ("market_regime", "compute_market_regime"),
+]
+
+
+async def run_daily_pipeline() -> None:
+ """Run the daily data→signal flow in dependency order.
+
+ OHLCV → fundamentals → sentiment → R:R scan → outcome eval (+paper close) →
+ market regime. Each step respects its own enable flag and manages its own runtime
+ status; a failing step is logged, the pipeline continues and still ends 'completed'.
+ """
+ job_name = "daily_pipeline"
+ logger.info(json.dumps({"event": "job_start", "job": job_name}))
+ total = len(_PIPELINE_STEPS)
+ _runtime_start(job_name, total=total)
+
+ funcs = globals()  # steps are stored as function names; resolve them in this module
+ done = 0
+ try:
+ for step_name, func_name in _PIPELINE_STEPS:
+ _runtime_progress(job_name, processed=done, total=total, current_ticker=step_name)
+ try:
+ await funcs[func_name]()
+ except Exception:  # also catches a KeyError from an unknown function name
+ logger.exception("Daily pipeline step %s failed", step_name)
+ done += 1
+ _runtime_finish(job_name, "completed", processed=done, total=total, message="Pipeline complete")
+ logger.info(json.dumps({"event": "job_complete", "job": job_name}))
+ except Exception as exc:  # step failures are handled above; this covers the remaining calls
+ _runtime_finish(job_name, "error", processed=done, total=total, message=str(exc))
+ logger.error(json.dumps({
+ "event": "job_error", "job": job_name,
+ "error_type": type(exc).__name__, "message": str(exc),
+ }))
+
+
# ---------------------------------------------------------------------------
# Frequency helpers
# ---------------------------------------------------------------------------
@@ -1010,109 +1070,49 @@ def configure_scheduler() -> None:
"""
scheduler.remove_all_jobs()
- # Data Collector — configurable frequency (default: hourly)
- ohlcv_interval = _parse_frequency(settings.data_collector_frequency)
+ # Pipeline members: registered but PAUSED (next_run_time=None) so they never
+ # auto-fire on their own timer — the daily_pipeline drives them in order. The
+ # 520-week (~10-year) interval is just a backstop after a manual trigger (which
+ # re-arms an interval job). They stay manually triggerable from Admin → Jobs.
+ _members = [
+ (collect_ohlcv, "data_collector", "Data Collector (OHLCV)"),
+ (collect_fundamentals, "fundamental_collector", "Fundamental Collector"),
+ (collect_sentiment, "sentiment_collector", "Sentiment Collector"),
+ (scan_rr, "rr_scanner", "R:R Scanner"),
+ (evaluate_outcomes, "outcome_evaluator", "Outcome Evaluator"),
+ (compute_market_regime, "market_regime", "Market Regime"),
+ ]
+ for fn, job_id, job_name in _members:
+ scheduler.add_job(
+ fn, "interval", weeks=520, id=job_id, name=job_name,
+ replace_existing=True, next_run_time=None,
+ )
+
+ # Daily Pipeline — the single ordered daily flow
scheduler.add_job(
- collect_ohlcv,
- "interval",
- **ohlcv_interval,
- id="data_collector",
- name="Data Collector (OHLCV)",
- replace_existing=True,
+ run_daily_pipeline, "interval", hours=24,
+ id="daily_pipeline", name="Daily Pipeline", replace_existing=True,
)
- # Sentiment Collector — default 30 min
+ # Independent jobs (own cadence, no ordering dependency)
scheduler.add_job(
- collect_sentiment,
- "interval",
- minutes=settings.sentiment_poll_interval_minutes,
- id="sentiment_collector",
- name="Sentiment Collector",
- replace_existing=True,
+ sync_ticker_universe, "interval", hours=24,
+ id="ticker_universe_sync", name="Ticker Universe Sync", replace_existing=True,
)
-
- # Fundamental Collector — configurable frequency (default: daily)
- fund_interval = _parse_frequency(settings.fundamental_fetch_frequency)
- scheduler.add_job(
- collect_fundamentals,
- "interval",
- **fund_interval,
- id="fundamental_collector",
- name="Fundamental Collector",
- replace_existing=True,
- )
-
- # R:R Scanner — configurable frequency (default: hourly)
- rr_interval = _parse_frequency(settings.rr_scan_frequency)
- scheduler.add_job(
- scan_rr,
- "interval",
- **rr_interval,
- id="rr_scanner",
- name="R:R Scanner",
- replace_existing=True,
- )
-
- # Universe Sync — nightly
- scheduler.add_job(
- sync_ticker_universe,
- "interval",
- hours=24,
- id="ticker_universe_sync",
- name="Ticker Universe Sync",
- replace_existing=True,
- )
-
- # Outcome Evaluator — nightly, after fresh OHLCV has been collected
- scheduler.add_job(
- evaluate_outcomes,
- "interval",
- hours=24,
- id="outcome_evaluator",
- name="Outcome Evaluator",
- replace_existing=True,
- )
-
- # Alerts Dispatcher — configurable frequency (default: hourly)
alerts_interval = _parse_frequency(settings.alerts_frequency)
scheduler.add_job(
- dispatch_alerts_job,
- "interval",
- **alerts_interval,
- id="alerts",
- name="Alerts Dispatcher",
- replace_existing=True,
+ dispatch_alerts_job, "interval", **alerts_interval,
+ id="alerts", name="Alerts Dispatcher", replace_existing=True,
)
-
- # Market Regime — nightly benchmark trend refresh
scheduler.add_job(
- compute_market_regime,
- "interval",
- hours=24,
- id="market_regime",
- name="Market Regime",
- replace_existing=True,
- )
-
- # Backtest — weekly historical replay (expensive; mostly run on demand)
- scheduler.add_job(
- run_backtest_job,
- "interval",
- hours=168,
- id="backtest",
- name="Backtest",
- replace_existing=True,
+ run_backtest_job, "interval", hours=168,
+ id="backtest", name="Backtest", replace_existing=True,
)
logger.info(
json.dumps({
"event": "scheduler_configured",
- "jobs": {
- "data_collector": ohlcv_interval,
- "sentiment_collector": {"minutes": settings.sentiment_poll_interval_minutes},
- "fundamental_collector": fund_interval,
- "rr_scanner": rr_interval,
- "ticker_universe_sync": {"hours": 24},
- },
+ "daily_pipeline": [name for name, _ in _PIPELINE_STEPS],
+ "independent": ["ticker_universe_sync", "alerts", "backtest"],
})
)
diff --git a/app/services/admin_service.py b/app/services/admin_service.py
index a5cbe27..63545d2 100644
--- a/app/services/admin_service.py
+++ b/app/services/admin_service.py
@@ -485,6 +485,7 @@ VALID_JOB_NAMES = {
"alerts",
"market_regime",
"backtest",
+ "daily_pipeline",
}
JOB_LABELS = {
@@ -497,6 +498,17 @@ JOB_LABELS = {
"alerts": "Alerts Dispatcher",
"market_regime": "Market Regime",
"backtest": "Backtest",
+ "daily_pipeline": "Daily Pipeline",
+}
+
+# Jobs driven by the daily_pipeline rather than their own timer; order is set in _PIPELINE_STEPS.
+PIPELINE_MEMBERS = {
+ "data_collector",
+ "fundamental_collector",
+ "sentiment_collector",
+ "rr_scanner",
+ "outcome_evaluator",
+ "market_regime",
}
@@ -527,6 +539,7 @@ async def list_jobs(db: AsyncSession) -> list[dict]:
"label": JOB_LABELS.get(name, name),
"enabled": enabled,
"next_run_at": next_run,
+ "via_pipeline": name in PIPELINE_MEMBERS,
"registered": job is not None,
"running": bool(runtime.get("running", False)),
"runtime_status": runtime.get("status"),
diff --git a/frontend/src/api/admin.ts b/frontend/src/api/admin.ts
index a892540..d66a2eb 100644
--- a/frontend/src/api/admin.ts
+++ b/frontend/src/api/admin.ts
@@ -162,6 +162,7 @@ export interface JobStatus {
label: string;
enabled: boolean;
next_run_at: string | null;
+ via_pipeline?: boolean;
registered: boolean;
running?: boolean;
runtime_status?: string | null;
diff --git a/frontend/src/components/admin/JobControls.tsx b/frontend/src/components/admin/JobControls.tsx
index 4faa2ee..68356b3 100644
--- a/frontend/src/components/admin/JobControls.tsx
+++ b/frontend/src/components/admin/JobControls.tsx
@@ -146,10 +146,14 @@ export function JobControls() {
? 'Active'
: 'Inactive'}
- {job.enabled && job.next_run_at && (
-
- Next run {formatNextRun(job.next_run_at)}
-
+ {job.via_pipeline ? (
+ runs in daily pipeline
+ ) : (
+ job.enabled && job.next_run_at && (
+
+ Next run {formatNextRun(job.next_run_at)}
+
+ )
)}
{!job.registered && (
Not registered
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index ad4fe6c..d82bb44 100644
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -84,6 +84,7 @@ class TestConfigureScheduler:
"alerts",
"market_regime",
"backtest",
+ "daily_pipeline",
}
def test_configure_is_idempotent(self):
@@ -95,6 +96,7 @@ class TestConfigureScheduler:
assert sorted(job_ids) == sorted([
"alerts",
"backtest",
+ "daily_pipeline",
"data_collector",
"fundamental_collector",
"market_regime",