diff --git a/alembic/versions/009_reset_activation_gate.py b/alembic/versions/009_reset_activation_gate.py new file mode 100644 index 0000000..00c48aa --- /dev/null +++ b/alembic/versions/009_reset_activation_gate.py @@ -0,0 +1,40 @@ +"""reset activation gate settings for the EV-based redesign + +The activation gate was redesigned around expected value (R): the core test is +now ``min_expected_value`` instead of the old, self-contradicting pair of a high +``min_rr`` AND a high ``min_target_probability`` (with the post-recalibration +probability model the two could not be satisfied together, so nothing qualified). +The conviction / conflict toggles are now optional tighteners that default off. + +Stored ``activation_*`` rows from the old gate no longer map cleanly onto the new +one, so they are cleared here and the redesigned code defaults take effect. Re-tune +in Admin → Activation (validated against the Track Record's backtest EV sweep). + +Revision ID: 009 +Revises: 008 +Create Date: 2026-06-23 00:00:00.000000 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = "009" +down_revision: Union[str, None] = "008" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.execute( + sa.text("DELETE FROM system_settings WHERE key LIKE 'activation\\_%' ESCAPE '\\'") + ) + + +def downgrade() -> None: + # One-way data reset — the old per-key values aren't recoverable. Code defaults + # apply until re-tuned, so there is nothing to restore. + pass diff --git a/app/main.py b/app/main.py index 0463a1a..9bc5f8f 100644 --- a/app/main.py +++ b/app/main.py @@ -68,7 +68,7 @@ from app.config import settings from app.database import async_session_factory, engine from app.middleware import register_exception_handlers from app.models.user import User -from app.scheduler import configure_scheduler, scheduler +from app.scheduler import configure_scheduler, load_schedule_config, scheduler from app.routers.admin import router as admin_router from app.routers.auth import router as auth_router from app.routers.health import router as health_router @@ -128,8 +128,9 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: async with async_session_factory() as session: await _create_default_admin(session) + schedule_config = await load_schedule_config(session) - configure_scheduler() + configure_scheduler(schedule_config) scheduler.start() logger.info("Scheduler started") diff --git a/app/routers/admin.py b/app/routers/admin.py index 600ec1c..b2a0ddd 100644 --- a/app/routers/admin.py +++ b/app/routers/admin.py @@ -15,6 +15,7 @@ from app.schemas.admin import ( DataCleanupRequest, JobToggle, RecommendationConfigUpdate, + ScheduleConfigUpdate, SentimentConfigUpdate, SentimentTestRequest, PasswordReset, @@ -176,6 +177,28 @@ async def update_activation_settings( return APIEnvelope(status="success", data=updated) +@router.get("/admin/settings/schedule", response_model=APIEnvelope) +async def get_schedule_settings( + _admin: User = Depends(require_admin), + db: AsyncSession = Depends(get_db), +): + config = await admin_service.get_schedule_config(db) + return APIEnvelope(status="success", data=config) + + +@router.put("/admin/settings/schedule", response_model=APIEnvelope) +async def update_schedule_settings( + body: ScheduleConfigUpdate, + _admin: User = Depends(require_admin), + db: AsyncSession = Depends(get_db), +): + updated = await admin_service.update_schedule_config( + db, + body.model_dump(exclude_unset=True, exclude_none=True), + ) + return APIEnvelope(status="success", data=updated) + + @router.get("/admin/settings/sentiment", response_model=APIEnvelope) async def get_sentiment_settings( _admin: User = Depends(require_admin), diff --git a/app/scheduler.py b/app/scheduler.py index c3a4418..a7ebe35 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -19,6 +19,7 @@ import asyncio from datetime import date, datetime, timedelta, timezone from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.cron import CronTrigger from sqlalchemy import case, func, or_, select from sqlalchemy.ext.asyncio import AsyncSession @@ -168,6 +169,17 @@ _job_runtime: dict[str, dict[str, object]] = { "finished_at": None, "message": None, }, + "intraday_pipeline": { + "running": False, + "status": "idle", + "processed": 0, + "total": None, + "progress_pct": None, + "current_ticker": None, + "started_at": None, + "finished_at": None, + "message": None, + }, } @@ -1000,7 +1012,9 @@ async def sync_ticker_universe() -> None: # Steps run in dependency order: each uses fresh output from the previous one. # (name, coroutine) — the names match the individual jobs so each step still # updates its own runtime status while the pipeline runs. -_PIPELINE_STEPS = [ +# +# Daily (full): the complete data→signal refresh, once a day. +_DAILY_PIPELINE_STEPS = [ ("data_collector", "collect_ohlcv"), ("sentiment_collector", "collect_sentiment"), ("rr_scanner", "scan_rr"), @@ -1008,28 +1022,41 @@ _PIPELINE_STEPS = [ ("market_regime", "compute_market_regime"), ] +# Intraday (light): keep prices current and resolve outcomes through the day, +# without the expensive scan/sentiment. The dashboard recomputes live R:R from +# the latest price, so refreshing OHLCV is enough to stop prices lagging; the +# outcome step also closes paper trades that hit their stop/target intraday. +_INTRADAY_PIPELINE_STEPS = [ + ("data_collector", "collect_ohlcv"), + ("outcome_evaluator", "evaluate_outcomes"), +] -async def run_daily_pipeline() -> None: - """Run the daily data→signal flow in dependency order. - OHLCV → fundamentals → sentiment → R:R scan → outcome eval (+paper close) → - market regime. Each step respects its own enable flag and manages its own - runtime status; a failing step is logged and the pipeline continues. +async def _run_pipeline(job_name: str, steps: list[tuple[str, str]]) -> None: + """Run an ordered list of (step_name, coroutine_name) steps. + + Each step respects its own enable flag and manages its own runtime status; a + failing step is logged and the pipeline continues with the next one. """ - job_name = "daily_pipeline" logger.info(json.dumps({"event": "job_start", "job": job_name})) - total = len(_PIPELINE_STEPS) + async with async_session_factory() as db: + if not await _is_job_enabled(db, job_name): + logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"})) + _runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled") + return + + total = len(steps) _runtime_start(job_name, total=total) funcs = globals() done = 0 try: - for step_name, func_name in _PIPELINE_STEPS: + for step_name, func_name in steps: _runtime_progress(job_name, processed=done, total=total, current_ticker=step_name) try: await funcs[func_name]() except Exception: - logger.exception("Daily pipeline step %s failed", step_name) + logger.exception("%s step %s failed", job_name, step_name) done += 1 _runtime_finish(job_name, "completed", processed=done, total=total, message="Pipeline complete") logger.info(json.dumps({"event": "job_complete", "job": job_name})) @@ -1041,6 +1068,17 @@ async def run_daily_pipeline() -> None: })) +async def run_daily_pipeline() -> None: + """Full daily flow: OHLCV → sentiment → R:R scan → outcome eval (+paper + close) → market regime.""" + await _run_pipeline("daily_pipeline", _DAILY_PIPELINE_STEPS) + + +async def run_intraday_pipeline() -> None: + """Light intraday flow: refresh OHLCV → evaluate outcomes (+paper close).""" + await _run_pipeline("intraday_pipeline", _INTRADAY_PIPELINE_STEPS) + + # --------------------------------------------------------------------------- # Frequency helpers # --------------------------------------------------------------------------- @@ -1057,22 +1095,91 @@ def _parse_frequency(freq: str) -> dict[str, int]: return _FREQUENCY_MAP.get(freq.lower(), {"hours": 24}) +# --------------------------------------------------------------------------- +# Schedule config (cron, admin-configurable) +# --------------------------------------------------------------------------- +# +# The cron-driven jobs read their schedule from SystemSettings so it can be +# tuned from Admin → Jobs without a redeploy. A wall-clock CronTrigger also fixes +# the interval-trigger pitfall: an interval job resets its countdown to now+N on +# every process restart, so on a box that's redeployed often it can keep being +# deferred and never fire. Cron fires at a fixed local time regardless. + +SCHEDULE_DEFAULTS: dict[str, str] = { + "schedule_timezone": "Europe/Berlin", + "schedule_daily_pipeline_cron": "0 7 * * *", # full refresh, ready by ~8am + "schedule_intraday_pipeline_cron": "0 14-22 * * 1-5", # hourly across the US session + "schedule_fundamentals_cron": "0 4 * * 1", # weekly, early Monday (slow job) +} + +# job id -> schedule setting key +_CRON_JOBS: dict[str, str] = { + "daily_pipeline": "schedule_daily_pipeline_cron", + "intraday_pipeline": "schedule_intraday_pipeline_cron", + "fundamental_collector": "schedule_fundamentals_cron", +} + + +def validate_cron(expr: str, timezone: str) -> None: + """Raise ValueError if the cron expression or timezone is invalid.""" + CronTrigger.from_crontab((expr or "").strip(), timezone=(timezone or "").strip()) + + +def _cron_trigger(expr: str, timezone: str, fallback_key: str) -> CronTrigger: + """Build a CronTrigger, falling back to the default (UTC) on a bad value.""" + try: + return CronTrigger.from_crontab(expr.strip(), timezone=timezone.strip()) + except Exception: + logger.warning(json.dumps({ + "event": "invalid_cron", "expr": expr, "timezone": timezone, + "fallback": SCHEDULE_DEFAULTS[fallback_key], + })) + return CronTrigger.from_crontab(SCHEDULE_DEFAULTS[fallback_key], timezone="UTC") + + +async def load_schedule_config(db: AsyncSession) -> dict[str, str]: + """Read the cron schedule config from SystemSettings, defaults for any unset.""" + result = await db.execute( + select(SystemSetting).where(SystemSetting.key.in_(list(SCHEDULE_DEFAULTS))) + ) + stored = {s.key: s.value for s in result.scalars().all()} + return {key: (stored.get(key) or default) for key, default in SCHEDULE_DEFAULTS.items()} + + +def reschedule_jobs(schedule_config: dict[str, str]) -> dict[str, str]: + """Re-apply cron triggers to the running scheduler after a settings change.""" + tz = schedule_config.get("schedule_timezone") or SCHEDULE_DEFAULTS["schedule_timezone"] + applied: dict[str, str] = {} + for job_id, key in _CRON_JOBS.items(): + if scheduler.get_job(job_id) is None: + continue + expr = schedule_config.get(key) or SCHEDULE_DEFAULTS[key] + scheduler.reschedule_job(job_id, trigger=_cron_trigger(expr, tz, key)) + applied[job_id] = expr + logger.info(json.dumps({"event": "jobs_rescheduled", "applied": applied, "timezone": tz})) + return applied + + # --------------------------------------------------------------------------- # Scheduler setup # --------------------------------------------------------------------------- -def configure_scheduler() -> None: - """Add all jobs to the scheduler with configured intervals. +def configure_scheduler(schedule_config: dict[str, str] | None = None) -> None: + """Add all jobs to the scheduler. - Call this once before scheduler.start(). Removes any existing jobs first - to ensure idempotency. + Call this once before scheduler.start(). Removes any existing jobs first to + ensure idempotency. ``schedule_config`` supplies the cron strings + timezone + for the cron-driven jobs (daily/intraday pipelines, fundamentals); defaults + are used for anything missing. """ + cfg = {**SCHEDULE_DEFAULTS, **(schedule_config or {})} + tz = cfg["schedule_timezone"] scheduler.remove_all_jobs() # Pipeline members: registered but PAUSED (next_run_time=None) so they never - # auto-fire on their own timer — the daily_pipeline drives them in order. The - # long interval is just a backstop after a manual trigger (which re-arms an + # auto-fire on their own timer — the pipelines drive them in order. The long + # interval is just a backstop after a manual trigger (which re-arms an # interval job). They stay manually triggerable from Admin → Jobs. _members = [ (collect_ohlcv, "data_collector", "Data Collector (OHLCV)"), @@ -1087,23 +1194,30 @@ def configure_scheduler() -> None: replace_existing=True, next_run_time=None, ) - # Daily Pipeline — the single ordered daily flow + # Cron-driven jobs (admin-configurable times) scheduler.add_job( - run_daily_pipeline, "interval", hours=24, + run_daily_pipeline, + _cron_trigger(cfg["schedule_daily_pipeline_cron"], tz, "schedule_daily_pipeline_cron"), id="daily_pipeline", name="Daily Pipeline", replace_existing=True, ) + scheduler.add_job( + run_intraday_pipeline, + _cron_trigger(cfg["schedule_intraday_pipeline_cron"], tz, "schedule_intraday_pipeline_cron"), + id="intraday_pipeline", name="Intraday Pipeline", replace_existing=True, + ) + # Fundamentals — quarterly-ish data; weekly by default (conserves API quota). + # Its own early cron so the slow, rate-limited fetch finishes before the day. + scheduler.add_job( + collect_fundamentals, + _cron_trigger(cfg["schedule_fundamentals_cron"], tz, "schedule_fundamentals_cron"), + id="fundamental_collector", name="Fundamental Collector", replace_existing=True, + ) - # Independent jobs (own cadence, no ordering dependency) + # Independent interval jobs (own cadence, no ordering dependency) scheduler.add_job( sync_ticker_universe, "interval", hours=24, id="ticker_universe_sync", name="Ticker Universe Sync", replace_existing=True, ) - # Fundamentals — quarterly-ish data; weekly by default (conserves API quota) - fund_interval = _parse_frequency(settings.fundamental_fetch_frequency) - scheduler.add_job( - collect_fundamentals, "interval", **fund_interval, - id="fundamental_collector", name="Fundamental Collector", replace_existing=True, - ) alerts_interval = _parse_frequency(settings.alerts_frequency) scheduler.add_job( dispatch_alerts_job, "interval", **alerts_interval, @@ -1117,7 +1231,16 @@ def configure_scheduler() -> None: logger.info( json.dumps({ "event": "scheduler_configured", - "daily_pipeline": [name for name, _ in _PIPELINE_STEPS], + "timezone": tz, + "daily_pipeline": { + "cron": cfg["schedule_daily_pipeline_cron"], + "steps": [name for name, _ in _DAILY_PIPELINE_STEPS], + }, + "intraday_pipeline": { + "cron": cfg["schedule_intraday_pipeline_cron"], + "steps": [name for name, _ in _INTRADAY_PIPELINE_STEPS], + }, + "fundamental_collector": {"cron": cfg["schedule_fundamentals_cron"]}, "independent": ["ticker_universe_sync", "alerts", "backtest"], }) ) diff --git a/app/schemas/admin.py b/app/schemas/admin.py index f6869c9..1bd5a3f 100644 --- a/app/schemas/admin.py +++ b/app/schemas/admin.py @@ -59,6 +59,7 @@ class TickerUniverseUpdate(BaseModel): class ActivationConfigUpdate(BaseModel): """Activation gate: what counts as an actionable signal.""" + min_expected_value: float | None = Field(default=None, ge=-1, le=10) min_rr: float | None = Field(default=None, ge=0) min_confidence: float | None = Field(default=None, ge=0, le=100) min_target_probability: float | None = Field(default=None, ge=0, le=100) @@ -66,6 +67,15 @@ class ActivationConfigUpdate(BaseModel): exclude_conflicts: bool | None = None +class ScheduleConfigUpdate(BaseModel): + """Cron schedule for the pipelines + fundamentals. Crons are 5-field + (min hour dom month dow); timezone is an IANA name (e.g. Europe/Berlin).""" + schedule_timezone: str | None = Field(default=None, max_length=64) + schedule_daily_pipeline_cron: str | None = Field(default=None, max_length=120) + schedule_intraday_pipeline_cron: str | None = Field(default=None, max_length=120) + schedule_fundamentals_cron: str | None = Field(default=None, max_length=120) + + class SentimentConfigUpdate(BaseModel): """Runtime sentiment LLM config. api_key is write-only; omit/empty to keep the stored key.""" diff --git a/app/services/admin_service.py b/app/services/admin_service.py index b71b71a..e8a5049 100644 --- a/app/services/admin_service.py +++ b/app/services/admin_service.py @@ -1,5 +1,6 @@ """Admin service: user management, system settings, data cleanup, job control.""" +import logging from datetime import datetime, timedelta, timezone from passlib.hash import bcrypt @@ -17,6 +18,8 @@ from app.models.ticker import Ticker from app.models.trade_setup import TradeSetup from app.models.user import User +logger = logging.getLogger(__name__) + RECOMMENDATION_CONFIG_DEFAULTS: dict[str, float] = { "recommendation_high_confidence_threshold": 70.0, "recommendation_moderate_confidence_threshold": 50.0, @@ -35,10 +38,12 @@ SUPPORTED_TICKER_UNIVERSES = {"sp500", "nasdaq100", "nasdaq_all"} # Track Record's qualified stats. The outcome evaluator deliberately ignores # these — every setup is evaluated so the gate itself can be validated. # -# Beyond raw R:R and confidence, the gate demands conviction: a high-conviction -# action (LONG_HIGH / SHORT_HIGH), a clean read (risk Low / no conflicts), and a -# probable primary target. +# The core test is expected value (in R): probability-weighted asymmetry, so a +# fat-but-improbable target and a likely-but-thin one are both rejected. R:R and +# confidence are floors; high-conviction / clean-read / target-probability are +# optional tighteners (off by default — turn on to be more selective). _ACTIVATION_FLOAT_KEYS: dict[str, str] = { + "min_expected_value": "activation_min_expected_value", "min_rr": "activation_min_rr", "min_confidence": "activation_min_confidence", "min_target_probability": "activation_min_target_probability", @@ -48,11 +53,12 @@ _ACTIVATION_BOOL_KEYS: dict[str, str] = { "exclude_conflicts": "activation_exclude_conflicts", } ACTIVATION_DEFAULTS: dict[str, float | bool] = { - "min_rr": 2.0, - "min_confidence": 70.0, - "min_target_probability": 60.0, - "require_high_conviction": True, - "exclude_conflicts": True, + "min_expected_value": 0.15, + "min_rr": 1.2, + "min_confidence": 55.0, + "min_target_probability": 0.0, + "require_high_conviction": False, + "exclude_conflicts": False, } @@ -195,6 +201,8 @@ async def update_activation_config( db: AsyncSession, updates: dict[str, float | bool] ) -> dict[str, float | bool]: """Update the activation gate. Accepts public keys; only supplied keys change.""" + if "min_expected_value" in updates and not -1.0 <= updates["min_expected_value"] <= 10.0: + raise ValidationError("min_expected_value must be between -1 and 10 (R units)") if "min_rr" in updates and updates["min_rr"] < 0: raise ValidationError("min_rr must be >= 0") if "min_confidence" in updates and not 0 <= updates["min_confidence"] <= 100: @@ -212,6 +220,59 @@ async def update_activation_config( return await get_activation_config(db) +# --------------------------------------------------------------------------- +# Pipeline schedule (cron) +# --------------------------------------------------------------------------- + +async def get_schedule_config(db: AsyncSession) -> dict[str, str]: + """Cron schedule for the daily/intraday pipelines and fundamentals.""" + from app.scheduler import load_schedule_config + + return await load_schedule_config(db) + + +async def update_schedule_config( + db: AsyncSession, updates: dict[str, str] +) -> dict[str, str]: + """Validate, persist, and apply cron schedule changes to the running scheduler.""" + from app.scheduler import ( + SCHEDULE_DEFAULTS, + load_schedule_config, + reschedule_jobs, + validate_cron, + ) + + current = await load_schedule_config(db) + tz = (updates.get("schedule_timezone") or current["schedule_timezone"]).strip() + + for key, value in updates.items(): + if key not in SCHEDULE_DEFAULTS: + raise ValidationError(f"Unknown schedule key: {key}") + if key == "schedule_timezone": + # Validate the timezone against an existing cron expression. + try: + validate_cron(current["schedule_daily_pipeline_cron"], value) + except Exception as exc: + raise ValidationError(f"Invalid timezone: {value}") from exc + else: + try: + validate_cron(value, tz) + except Exception as exc: + raise ValidationError(f"Invalid cron for {key}: {value!r}") from exc + + for key, value in updates.items(): + await update_setting(db, key, str(value).strip()) + + new_config = await load_schedule_config(db) + try: + reschedule_jobs(new_config) + except Exception: + # Scheduler may not be running (e.g. unit tests) — the config is saved + # regardless and applied on next startup. + logger.warning("Could not reschedule jobs after config update", exc_info=True) + return new_config + + def _recommendation_public_to_storage_key(key: str) -> str: return f"recommendation_{key}" @@ -486,6 +547,7 @@ VALID_JOB_NAMES = { "market_regime", "backtest", "daily_pipeline", + "intraday_pipeline", } JOB_LABELS = { @@ -499,6 +561,7 @@ JOB_LABELS = { "market_regime": "Market Regime", "backtest": "Backtest", "daily_pipeline": "Daily Pipeline", + "intraday_pipeline": "Intraday Pipeline", } # Jobs driven by the daily_pipeline (in order) rather than their own timer. diff --git a/app/services/backtest_service.py b/app/services/backtest_service.py index c7a62b1..f1f5206 100644 --- a/app/services/backtest_service.py +++ b/app/services/backtest_service.py @@ -36,7 +36,11 @@ from app.services.outcome_service import ( evaluate_setup_against_bars, ) from app.services.price_service import query_ohlcv -from app.services.qualification import best_target_probability, setup_qualifies +from app.services.qualification import ( + best_target_probability, + expected_value_r, + setup_qualifies, +) from app.services.recommendation_service import ( _choose_recommended_action, _classify_by_probability, @@ -131,6 +135,10 @@ def _window_setups( primary = _select_primary_target(targets) if primary is None: continue + # Flag the primary so qualification's EV uses the primary target's + # probability (matching production's enhance_trade_setup). + for t in targets: + t["is_primary"] = t is primary per_dir[direction] = {"stop": stop, "targets": targets, "primary": primary} available = set(per_dir.keys()) @@ -160,12 +168,13 @@ def _window_setups( stop_loss=stop, entry_price=entry, ) - # meets_core = clears every gate EXCEPT target probability, so the report - # can sweep the min_target_probability threshold without re-replaying. - core_config = {**activation, "min_target_probability": 0.0} + # meets_core = clears every gate EXCEPT the expected-value floor, so the + # report can sweep the min_expected_value threshold without re-replaying. + core_config = {**activation, "min_expected_value": float("-inf")} meets_core = setup_qualifies(setup_ns, core_config) + ev = expected_value_r(setup_ns) best_prob = best_target_probability(setup_ns) - min_tp = float(activation.get("min_target_probability", 0.0)) + min_ev = float(activation.get("min_expected_value", 0.0)) out.append({ "direction": direction, "entry": entry, @@ -175,10 +184,11 @@ def _window_setups( "confidence": confidences[direction], "primary_prob": float(primary["probability"]), "best_prob": best_prob, + "ev": ev, "meets_core": meets_core, "action": action, "risk_level": risk_level, - "qualified": meets_core and best_prob >= min_tp, + "qualified": meets_core and ev is not None and ev >= min_ev, }) return out @@ -216,6 +226,7 @@ def _replay_ticker(symbol: str, records: list, config: dict, activation: dict) - "confidence": s["confidence"], "primary_prob": s["primary_prob"], "best_prob": s["best_prob"], + "ev": s["ev"], "meets_core": s["meets_core"], "qualified": s["qualified"], "outcome": outcome, @@ -288,14 +299,17 @@ async def run_backtest( longs = [c for c in qualified if c["direction"] == "long"] shorts = [c for c in qualified if c["direction"] == "short"] - # Threshold sweep: re-apply the gate at several min_target_probability values + # Threshold sweep: re-apply the gate at several min_expected_value values # (holding the other conditions fixed) so the trade-off between how many # setups qualify and their expectancy is visible without re-replaying. - current_min_tp = float(activation.get("min_target_probability", 60.0)) + current_min_ev = float(activation.get("min_expected_value", 0.15)) sweep = [] - for threshold in (60, 55, 50, 45, 40, 35, 30): - cands = [c for c in candidates if c["meets_core"] and c["best_prob"] >= threshold] - sweep.append({"min_target_probability": threshold, **_bucket_stats(cands)}) + for threshold in (0.4, 0.3, 0.25, 0.2, 0.15, 0.1, 0.05, 0.0): + cands = [ + c for c in candidates + if c["meets_core"] and c["ev"] is not None and c["ev"] >= threshold + ] + sweep.append({"min_expected_value": threshold, **_bucket_stats(cands)}) return { "generated_at": datetime.now(timezone.utc).isoformat(), @@ -310,7 +324,7 @@ async def run_backtest( "long": _bucket_stats(longs), "short": _bucket_stats(shorts), }, - "min_target_probability": current_min_tp, + "min_expected_value": current_min_ev, "sweep": sweep, "calibration": _calibration(candidates), "note": ( diff --git a/app/services/qualification.py b/app/services/qualification.py index 8413f25..292fe21 100644 --- a/app/services/qualification.py +++ b/app/services/qualification.py @@ -1,9 +1,11 @@ """Shared definition of a 'qualified' (actionable) trade setup. A single predicate, driven by the admin activation config, used by the -performance stats (server) and mirrored on the frontend. Beyond raw R:R and -confidence, an actionable setup must show genuine conviction: a high-conviction -recommended action, a clean (conflict-free) read, and a probable primary target. +performance stats (server) and mirrored on the frontend. The core gate is +expected value (in R): a setup must promise positive, probability-weighted +asymmetry, not just a fat-but-improbable target or a likely-but-thin one. R:R +and confidence remain as floors, and conviction/conflict/target-probability +survive as optional tighteners (off by default). """ from __future__ import annotations @@ -20,6 +22,37 @@ def best_target_probability(setup: Any) -> float: return max(probs, default=0.0) +def primary_target_probability(setup: Any) -> float | None: + """Probability of the starred primary target (the one the headline R:R refers + to). Falls back to the best target's probability when none is flagged primary, + and None when there are no targets at all (probability unknowable). + """ + targets = getattr(setup, "targets", None) or [] + primary = next( + (t for t in targets if isinstance(t, dict) and t.get("is_primary")), None + ) + if primary is not None: + return float(primary.get("probability", 0.0)) + probs = [float(t.get("probability", 0.0)) for t in targets if isinstance(t, dict)] + return max(probs) if probs else None + + +def expected_value_r(setup: Any) -> float | None: + """Expected value per unit of risk, in R: ``p·(R:R) − (1 − p)``. + + ``p`` is the primary target's hit probability. This single number captures + "is this worth taking": it rewards both a good payoff ratio and a likely + target, so a fat-but-improbable target can't outrank a solid, probable one — + and a high R:R no longer fights a high probability the way the old separate + gates did. Returns None when no target probability is known. + """ + p = primary_target_probability(setup) + if p is None: + return None + p = p / 100.0 + return p * setup.rr_ratio - (1.0 - p) + + def live_risk_reward(setup: Any, current_price: float) -> float | None: """R:R recomputed from the CURRENT price, not the (possibly stale) entry. @@ -43,6 +76,11 @@ def setup_qualifies(setup: Any, config: dict) -> bool: ``setup`` is duck-typed: any object exposing rr_ratio, confidence_score, recommended_action, risk_level and a ``targets`` list of dicts. + + Gate order: R:R floor → freshness (live R:R) → confidence floor → expected + value (the core test) → optional conviction / conflict / target-probability + tighteners. ``min_expected_value`` defaults to -inf for callers that pass a + legacy config without the key, so they behave exactly as before. """ if setup.rr_ratio < config["min_rr"]: return False @@ -56,6 +94,13 @@ def setup_qualifies(setup: Any, config: dict) -> bool: return False if (setup.confidence_score or 0.0) < config["min_confidence"]: return False + # Expected value (R): the core gate. Only enforced when computable — setups + # without target probabilities (e.g. legacy historical rows) defer to the + # R:R + confidence floors above rather than being silently dropped. + min_ev = float(config.get("min_expected_value", float("-inf"))) + ev = expected_value_r(setup) + if ev is not None and ev < min_ev: + return False if config.get("require_high_conviction"): if (setup.recommended_action or "") not in HIGH_CONVICTION_ACTIONS: return False diff --git a/frontend/src/api/admin.ts b/frontend/src/api/admin.ts index ec4a9a3..88f4b02 100644 --- a/frontend/src/api/admin.ts +++ b/frontend/src/api/admin.ts @@ -6,6 +6,7 @@ import type { AlertTestResult, PipelineReadiness, RecommendationConfig, + ScheduleConfig, SentimentProviderConfig, SentimentTestResult, SystemSetting, @@ -85,6 +86,18 @@ export function updateActivationSettings(payload: Partial) { .then((r) => r.data); } +export function getScheduleSettings() { + return apiClient + .get('admin/settings/schedule') + .then((r) => r.data); +} + +export function updateScheduleSettings(payload: Partial) { + return apiClient + .put('admin/settings/schedule', payload) + .then((r) => r.data); +} + export function getSentimentSettings() { return apiClient .get('admin/settings/sentiment') diff --git a/frontend/src/components/admin/ActivationSettings.tsx b/frontend/src/components/admin/ActivationSettings.tsx index 4ac21d6..25d57a5 100644 --- a/frontend/src/components/admin/ActivationSettings.tsx +++ b/frontend/src/components/admin/ActivationSettings.tsx @@ -4,11 +4,12 @@ import { useActivationSettings, useUpdateActivationSettings } from '../../hooks/ import { SkeletonTable } from '../ui/Skeleton'; const DEFAULTS: ActivationConfig = { - min_rr: 2, - min_confidence: 70, - min_target_probability: 60, - require_high_conviction: true, - exclude_conflicts: true, + min_expected_value: 0.15, + min_rr: 1.2, + min_confidence: 55, + min_target_probability: 0, + require_high_conviction: false, + exclude_conflicts: false, }; export function ActivationSettings() { @@ -39,13 +40,27 @@ export function ActivationSettings() {

Activation Gate

What counts as a signal worth acting on. Drives the Dashboard's "Qualified" metric, the - Signals "Qualified only" view, and the Track Record's qualified stats. All setups are - still evaluated regardless — tighten the gate, then watch qualified expectancy in the - Track Record to find what actually wins. + Signals "Qualified only" view, and the Track Record's qualified stats. The core test is + expected value — probability-weighted asymmetry — + so R:R and target probability no longer fight each other. All setups are still evaluated + regardless; tune the EV floor against the Track Record's EV sweep to see what actually wins.

+ -
-
-