redesign activation gate to expected value + make pipelines cron-configurable
Diagnosing "no qualified signals for 5 days": setups were generated but none qualified. The gate required BOTH a high min_rr (2.0) AND a high min_target_probability (60), which became contradictory after the Jun-15 probability recalibration — probability already embeds R:R via the 1/(rr+1) ruin term, so high-R:R targets are inherently low-probability and nothing cleared both. Gate is now expected value (R): p*rr - (1-p) from the primary target's probability. R:R and confidence stay as floors; high-conviction / exclude-conflicts / min-target-probability become optional tighteners (default off). Defaults: min_expected_value=0.15, min_rr=1.2, min_confidence=55. EV is only enforced when computable. Migration 009 clears stored activation_* rows so the new defaults apply. Backtest sweeps min_expected_value instead of target probability. Scheduling: pipelines are now cron-configurable in Admin -> Jobs. daily_pipeline (full, default 0 7 * * *) plus a new light intraday_pipeline (OHLCV + outcome eval, default hourly US session) that keeps prices/live-R:R current without setup churn. Fundamentals on its own early weekly cron. Timezone configurable (default Europe/Berlin). Moving interval->CronTrigger also fixes the restart-deferral bug where an interval job's countdown resets on every process restart. 319 backend unit tests pass; frontend tsc clean. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
+149
-26
@@ -19,6 +19,7 @@ import asyncio
|
||||
from datetime import date, datetime, timedelta, timezone
|
||||
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
from apscheduler.triggers.cron import CronTrigger
|
||||
from sqlalchemy import case, func, or_, select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
@@ -168,6 +169,17 @@ _job_runtime: dict[str, dict[str, object]] = {
|
||||
"finished_at": None,
|
||||
"message": None,
|
||||
},
|
||||
"intraday_pipeline": {
|
||||
"running": False,
|
||||
"status": "idle",
|
||||
"processed": 0,
|
||||
"total": None,
|
||||
"progress_pct": None,
|
||||
"current_ticker": None,
|
||||
"started_at": None,
|
||||
"finished_at": None,
|
||||
"message": None,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@@ -1000,7 +1012,9 @@ async def sync_ticker_universe() -> None:
|
||||
# Steps run in dependency order: each uses fresh output from the previous one.
|
||||
# (name, coroutine) — the names match the individual jobs so each step still
|
||||
# updates its own runtime status while the pipeline runs.
|
||||
_PIPELINE_STEPS = [
|
||||
#
|
||||
# Daily (full): the complete data→signal refresh, once a day.
|
||||
_DAILY_PIPELINE_STEPS = [
|
||||
("data_collector", "collect_ohlcv"),
|
||||
("sentiment_collector", "collect_sentiment"),
|
||||
("rr_scanner", "scan_rr"),
|
||||
@@ -1008,28 +1022,41 @@ _PIPELINE_STEPS = [
|
||||
("market_regime", "compute_market_regime"),
|
||||
]
|
||||
|
||||
# Intraday (light): keep prices current and resolve outcomes through the day,
|
||||
# without the expensive scan/sentiment. The dashboard recomputes live R:R from
|
||||
# the latest price, so refreshing OHLCV is enough to stop prices lagging; the
|
||||
# outcome step also closes paper trades that hit their stop/target intraday.
|
||||
_INTRADAY_PIPELINE_STEPS = [
|
||||
("data_collector", "collect_ohlcv"),
|
||||
("outcome_evaluator", "evaluate_outcomes"),
|
||||
]
|
||||
|
||||
async def run_daily_pipeline() -> None:
|
||||
"""Run the daily data→signal flow in dependency order.
|
||||
|
||||
OHLCV → fundamentals → sentiment → R:R scan → outcome eval (+paper close) →
|
||||
market regime. Each step respects its own enable flag and manages its own
|
||||
runtime status; a failing step is logged and the pipeline continues.
|
||||
async def _run_pipeline(job_name: str, steps: list[tuple[str, str]]) -> None:
|
||||
"""Run an ordered list of (step_name, coroutine_name) steps.
|
||||
|
||||
Each step respects its own enable flag and manages its own runtime status; a
|
||||
failing step is logged and the pipeline continues with the next one.
|
||||
"""
|
||||
job_name = "daily_pipeline"
|
||||
logger.info(json.dumps({"event": "job_start", "job": job_name}))
|
||||
total = len(_PIPELINE_STEPS)
|
||||
async with async_session_factory() as db:
|
||||
if not await _is_job_enabled(db, job_name):
|
||||
logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"}))
|
||||
_runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled")
|
||||
return
|
||||
|
||||
total = len(steps)
|
||||
_runtime_start(job_name, total=total)
|
||||
|
||||
funcs = globals()
|
||||
done = 0
|
||||
try:
|
||||
for step_name, func_name in _PIPELINE_STEPS:
|
||||
for step_name, func_name in steps:
|
||||
_runtime_progress(job_name, processed=done, total=total, current_ticker=step_name)
|
||||
try:
|
||||
await funcs[func_name]()
|
||||
except Exception:
|
||||
logger.exception("Daily pipeline step %s failed", step_name)
|
||||
logger.exception("%s step %s failed", job_name, step_name)
|
||||
done += 1
|
||||
_runtime_finish(job_name, "completed", processed=done, total=total, message="Pipeline complete")
|
||||
logger.info(json.dumps({"event": "job_complete", "job": job_name}))
|
||||
@@ -1041,6 +1068,17 @@ async def run_daily_pipeline() -> None:
|
||||
}))
|
||||
|
||||
|
||||
async def run_daily_pipeline() -> None:
|
||||
"""Full daily flow: OHLCV → sentiment → R:R scan → outcome eval (+paper
|
||||
close) → market regime."""
|
||||
await _run_pipeline("daily_pipeline", _DAILY_PIPELINE_STEPS)
|
||||
|
||||
|
||||
async def run_intraday_pipeline() -> None:
|
||||
"""Light intraday flow: refresh OHLCV → evaluate outcomes (+paper close)."""
|
||||
await _run_pipeline("intraday_pipeline", _INTRADAY_PIPELINE_STEPS)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Frequency helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -1057,22 +1095,91 @@ def _parse_frequency(freq: str) -> dict[str, int]:
|
||||
return _FREQUENCY_MAP.get(freq.lower(), {"hours": 24})
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Schedule config (cron, admin-configurable)
|
||||
# ---------------------------------------------------------------------------
|
||||
#
|
||||
# The cron-driven jobs read their schedule from SystemSettings so it can be
|
||||
# tuned from Admin → Jobs without a redeploy. A wall-clock CronTrigger also fixes
|
||||
# the interval-trigger pitfall: an interval job resets its countdown to now+N on
|
||||
# every process restart, so on a box that's redeployed often it can keep being
|
||||
# deferred and never fire. Cron fires at a fixed local time regardless.
|
||||
|
||||
SCHEDULE_DEFAULTS: dict[str, str] = {
|
||||
"schedule_timezone": "Europe/Berlin",
|
||||
"schedule_daily_pipeline_cron": "0 7 * * *", # full refresh, ready by ~8am
|
||||
"schedule_intraday_pipeline_cron": "0 14-22 * * 1-5", # hourly across the US session
|
||||
"schedule_fundamentals_cron": "0 4 * * 1", # weekly, early Monday (slow job)
|
||||
}
|
||||
|
||||
# job id -> schedule setting key
|
||||
_CRON_JOBS: dict[str, str] = {
|
||||
"daily_pipeline": "schedule_daily_pipeline_cron",
|
||||
"intraday_pipeline": "schedule_intraday_pipeline_cron",
|
||||
"fundamental_collector": "schedule_fundamentals_cron",
|
||||
}
|
||||
|
||||
|
||||
def validate_cron(expr: str, timezone: str) -> None:
|
||||
"""Raise ValueError if the cron expression or timezone is invalid."""
|
||||
CronTrigger.from_crontab((expr or "").strip(), timezone=(timezone or "").strip())
|
||||
|
||||
|
||||
def _cron_trigger(expr: str, timezone: str, fallback_key: str) -> CronTrigger:
|
||||
"""Build a CronTrigger, falling back to the default (UTC) on a bad value."""
|
||||
try:
|
||||
return CronTrigger.from_crontab(expr.strip(), timezone=timezone.strip())
|
||||
except Exception:
|
||||
logger.warning(json.dumps({
|
||||
"event": "invalid_cron", "expr": expr, "timezone": timezone,
|
||||
"fallback": SCHEDULE_DEFAULTS[fallback_key],
|
||||
}))
|
||||
return CronTrigger.from_crontab(SCHEDULE_DEFAULTS[fallback_key], timezone="UTC")
|
||||
|
||||
|
||||
async def load_schedule_config(db: AsyncSession) -> dict[str, str]:
|
||||
"""Read the cron schedule config from SystemSettings, defaults for any unset."""
|
||||
result = await db.execute(
|
||||
select(SystemSetting).where(SystemSetting.key.in_(list(SCHEDULE_DEFAULTS)))
|
||||
)
|
||||
stored = {s.key: s.value for s in result.scalars().all()}
|
||||
return {key: (stored.get(key) or default) for key, default in SCHEDULE_DEFAULTS.items()}
|
||||
|
||||
|
||||
def reschedule_jobs(schedule_config: dict[str, str]) -> dict[str, str]:
|
||||
"""Re-apply cron triggers to the running scheduler after a settings change."""
|
||||
tz = schedule_config.get("schedule_timezone") or SCHEDULE_DEFAULTS["schedule_timezone"]
|
||||
applied: dict[str, str] = {}
|
||||
for job_id, key in _CRON_JOBS.items():
|
||||
if scheduler.get_job(job_id) is None:
|
||||
continue
|
||||
expr = schedule_config.get(key) or SCHEDULE_DEFAULTS[key]
|
||||
scheduler.reschedule_job(job_id, trigger=_cron_trigger(expr, tz, key))
|
||||
applied[job_id] = expr
|
||||
logger.info(json.dumps({"event": "jobs_rescheduled", "applied": applied, "timezone": tz}))
|
||||
return applied
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Scheduler setup
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def configure_scheduler() -> None:
|
||||
"""Add all jobs to the scheduler with configured intervals.
|
||||
def configure_scheduler(schedule_config: dict[str, str] | None = None) -> None:
|
||||
"""Add all jobs to the scheduler.
|
||||
|
||||
Call this once before scheduler.start(). Removes any existing jobs first
|
||||
to ensure idempotency.
|
||||
Call this once before scheduler.start(). Removes any existing jobs first to
|
||||
ensure idempotency. ``schedule_config`` supplies the cron strings + timezone
|
||||
for the cron-driven jobs (daily/intraday pipelines, fundamentals); defaults
|
||||
are used for anything missing.
|
||||
"""
|
||||
cfg = {**SCHEDULE_DEFAULTS, **(schedule_config or {})}
|
||||
tz = cfg["schedule_timezone"]
|
||||
scheduler.remove_all_jobs()
|
||||
|
||||
# Pipeline members: registered but PAUSED (next_run_time=None) so they never
|
||||
# auto-fire on their own timer — the daily_pipeline drives them in order. The
|
||||
# long interval is just a backstop after a manual trigger (which re-arms an
|
||||
# auto-fire on their own timer — the pipelines drive them in order. The long
|
||||
# interval is just a backstop after a manual trigger (which re-arms an
|
||||
# interval job). They stay manually triggerable from Admin → Jobs.
|
||||
_members = [
|
||||
(collect_ohlcv, "data_collector", "Data Collector (OHLCV)"),
|
||||
@@ -1087,23 +1194,30 @@ def configure_scheduler() -> None:
|
||||
replace_existing=True, next_run_time=None,
|
||||
)
|
||||
|
||||
# Daily Pipeline — the single ordered daily flow
|
||||
# Cron-driven jobs (admin-configurable times)
|
||||
scheduler.add_job(
|
||||
run_daily_pipeline, "interval", hours=24,
|
||||
run_daily_pipeline,
|
||||
_cron_trigger(cfg["schedule_daily_pipeline_cron"], tz, "schedule_daily_pipeline_cron"),
|
||||
id="daily_pipeline", name="Daily Pipeline", replace_existing=True,
|
||||
)
|
||||
scheduler.add_job(
|
||||
run_intraday_pipeline,
|
||||
_cron_trigger(cfg["schedule_intraday_pipeline_cron"], tz, "schedule_intraday_pipeline_cron"),
|
||||
id="intraday_pipeline", name="Intraday Pipeline", replace_existing=True,
|
||||
)
|
||||
# Fundamentals — quarterly-ish data; weekly by default (conserves API quota).
|
||||
# Its own early cron so the slow, rate-limited fetch finishes before the day.
|
||||
scheduler.add_job(
|
||||
collect_fundamentals,
|
||||
_cron_trigger(cfg["schedule_fundamentals_cron"], tz, "schedule_fundamentals_cron"),
|
||||
id="fundamental_collector", name="Fundamental Collector", replace_existing=True,
|
||||
)
|
||||
|
||||
# Independent jobs (own cadence, no ordering dependency)
|
||||
# Independent interval jobs (own cadence, no ordering dependency)
|
||||
scheduler.add_job(
|
||||
sync_ticker_universe, "interval", hours=24,
|
||||
id="ticker_universe_sync", name="Ticker Universe Sync", replace_existing=True,
|
||||
)
|
||||
# Fundamentals — quarterly-ish data; weekly by default (conserves API quota)
|
||||
fund_interval = _parse_frequency(settings.fundamental_fetch_frequency)
|
||||
scheduler.add_job(
|
||||
collect_fundamentals, "interval", **fund_interval,
|
||||
id="fundamental_collector", name="Fundamental Collector", replace_existing=True,
|
||||
)
|
||||
alerts_interval = _parse_frequency(settings.alerts_frequency)
|
||||
scheduler.add_job(
|
||||
dispatch_alerts_job, "interval", **alerts_interval,
|
||||
@@ -1117,7 +1231,16 @@ def configure_scheduler() -> None:
|
||||
logger.info(
|
||||
json.dumps({
|
||||
"event": "scheduler_configured",
|
||||
"daily_pipeline": [name for name, _ in _PIPELINE_STEPS],
|
||||
"timezone": tz,
|
||||
"daily_pipeline": {
|
||||
"cron": cfg["schedule_daily_pipeline_cron"],
|
||||
"steps": [name for name, _ in _DAILY_PIPELINE_STEPS],
|
||||
},
|
||||
"intraday_pipeline": {
|
||||
"cron": cfg["schedule_intraday_pipeline_cron"],
|
||||
"steps": [name for name, _ in _INTRADAY_PIPELINE_STEPS],
|
||||
},
|
||||
"fundamental_collector": {"cron": cfg["schedule_fundamentals_cron"]},
|
||||
"independent": ["ticker_universe_sync", "alerts", "backtest"],
|
||||
})
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user