From 6df67ad7ae779b79399586ba6764401c9aa87e1a Mon Sep 17 00:00:00 2001 From: Dennis Thiessen Date: Mon, 15 Jun 2026 20:14:07 +0200 Subject: [PATCH] add backtest harness (Phase 1): historical replay + hit-rate & calibration reports MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replays the price-derived engine over stored OHLCV: at each weekly as-of date, rebuild the setup from bars <= D (no lookahead) and walk the actual forward bars for the realized outcome. Reports realized hit-rate/expectancy of qualified setups (overall and by direction) and of all setups, plus a probability calibration curve over all setups (predicted target prob vs realized hit rate). Reuses pure functions throughout; extracted compute_technical_from_arrays / compute_momentum_from_closes from scoring_service so live and backtest stay in sync. Runs as a weekly/triggerable 'backtest' job caching the report in a SystemSetting; GET /backtest/report serves it. Sentiment/fundamentals held neutral (no point-in-time history) — calibrates the price/S-R/probability machinery. 
Co-Authored-By: Claude Opus 4.8 --- app/routers/market.py | 11 + app/scheduler.py | 61 ++++++ app/services/admin_service.py | 2 + app/services/backtest_service.py | 322 ++++++++++++++++++++++++++++ app/services/scoring_service.py | 44 ++-- tests/unit/test_backtest_service.py | 118 ++++++++++ tests/unit/test_scheduler.py | 2 + 7 files changed, 548 insertions(+), 12 deletions(-) create mode 100644 app/services/backtest_service.py create mode 100644 tests/unit/test_backtest_service.py diff --git a/app/routers/market.py b/app/routers/market.py index 0e0ffdc..1626134 100644 --- a/app/routers/market.py +++ b/app/routers/market.py @@ -6,6 +6,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.dependencies import get_db, require_access from app.models.user import User from app.schemas.common import APIEnvelope +from app.services.backtest_service import get_backtest_report from app.services.market_regime_service import get_market_regime router = APIRouter(tags=["market"]) @@ -19,3 +20,13 @@ async def market_regime( """Current benchmark (SPY) trend regime: bullish / bearish / neutral.""" data = await get_market_regime(db) return APIEnvelope(status="success", data=data) + + +@router.get("/backtest/report", response_model=APIEnvelope) +async def backtest_report( + _user: User = Depends(require_access), + db: AsyncSession = Depends(get_db), +) -> APIEnvelope: + """Latest cached historical backtest report (None until the job runs).""" + data = await get_backtest_report(db) + return APIEnvelope(status="success", data=data) diff --git a/app/scheduler.py b/app/scheduler.py index 7901b0d..2058dda 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -35,6 +35,7 @@ from app.providers.fundamentals_chain import build_fundamental_provider_chain from app.providers.protocol import SentimentData from app.services import fundamental_service, ingestion_service, sentiment_service from app.services.alert_service import dispatch_alerts +from app.services.backtest_service import 
run_and_store as run_backtest_and_store from app.services.market_regime_service import update_market_regime from app.services.outcome_service import evaluate_pending_setups from app.services.rr_scanner_service import scan_all_tickers @@ -145,6 +146,17 @@ _job_runtime: dict[str, dict[str, object]] = { "finished_at": None, "message": None, }, + "backtest": { + "running": False, + "status": "idle", + "processed": 0, + "total": None, + "progress_pct": None, + "current_ticker": None, + "started_at": None, + "finished_at": None, + "message": None, + }, } @@ -851,6 +863,45 @@ async def compute_market_regime() -> None: })) +# --------------------------------------------------------------------------- +# Job: Backtest +# --------------------------------------------------------------------------- + + +async def run_backtest_job() -> None: + """Replay the price-derived engine over history and cache the report.""" + job_name = "backtest" + logger.info(json.dumps({"event": "job_start", "job": job_name})) + _runtime_start(job_name) + + def _on_progress(done: int, count: int, symbol: str) -> None: + _runtime_progress(job_name, processed=done, total=count, current_ticker=symbol or None) + + try: + async with async_session_factory() as db: + if not await _is_job_enabled(db, job_name): + logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"})) + _runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled") + return + + report = await run_backtest_and_store(db, _on_progress) + + _runtime_finish( + job_name, "completed", + processed=report.get("tickers", 0), total=report.get("tickers", 0), + message=f"{report.get('candidates', 0)} setups, {report.get('qualified', 0)} qualified", + ) + logger.info(json.dumps({"event": "job_complete", "job": job_name, "candidates": report.get("candidates")})) + except Exception as exc: + _runtime_finish(job_name, "error", processed=0, total=None, message=str(exc)) + logger.error(json.dumps({ + "event": 
"job_error", + "job": job_name, + "error_type": type(exc).__name__, + "message": str(exc), + })) + + # --------------------------------------------------------------------------- # Job: Ticker Universe Sync # --------------------------------------------------------------------------- @@ -1010,6 +1061,16 @@ def configure_scheduler() -> None: replace_existing=True, ) + # Backtest — weekly historical replay (expensive; mostly run on demand) + scheduler.add_job( + run_backtest_job, + "interval", + hours=168, + id="backtest", + name="Backtest", + replace_existing=True, + ) + logger.info( json.dumps({ "event": "scheduler_configured", diff --git a/app/services/admin_service.py b/app/services/admin_service.py index 2dfbacd..a5cbe27 100644 --- a/app/services/admin_service.py +++ b/app/services/admin_service.py @@ -484,6 +484,7 @@ VALID_JOB_NAMES = { "outcome_evaluator", "alerts", "market_regime", + "backtest", } JOB_LABELS = { @@ -495,6 +496,7 @@ JOB_LABELS = { "outcome_evaluator": "Outcome Evaluator", "alerts": "Alerts Dispatcher", "market_regime": "Market Regime", + "backtest": "Backtest", } diff --git a/app/services/backtest_service.py b/app/services/backtest_service.py new file mode 100644 index 0000000..80773da --- /dev/null +++ b/app/services/backtest_service.py @@ -0,0 +1,322 @@ +"""Historical backtest (Phase 1): replay the price-derived engine over stored +OHLCV and measure how the CURRENT config would have performed. + +For each ticker we step through history (weekly), and at each as-of date D we +rebuild the setup using only bars ≤ D (no lookahead), then walk the actual bars +after D to record the realized outcome. Two reports come out: + + - realized hit-rate / expectancy of qualified setups (and of all setups) + - a probability calibration curve: do "60% likely" targets hit ~60% of the time? + +Limitation: sentiment and fundamentals have no point-in-time history, so they're +held neutral here — this calibrates the price/S-R/probability machinery only. 
+""" + +from __future__ import annotations + +import json +import logging +from collections.abc import Callable +from datetime import datetime, timezone +from types import SimpleNamespace +from typing import Any + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.settings import SystemSetting +from app.models.ticker import Ticker +from app.services.admin_service import get_activation_config, update_setting +from app.services.indicator_service import _extract_ohlcv, compute_atr +from app.services.outcome_service import ( + OUTCOME_AMBIGUOUS, + OUTCOME_STOP_HIT, + OUTCOME_TARGET_HIT, + Bar, + evaluate_setup_against_bars, +) +from app.services.price_service import query_ohlcv +from app.services.qualification import best_target_probability, setup_qualifies +from app.services.recommendation_service import ( + _choose_recommended_action, + _classify_by_probability, + _risk_level_from_conflicts, + _select_primary_target, + _zone_representative_levels, + direction_analyzer, + get_recommendation_config, + probability_estimator, + signal_conflict_detector, + target_generator, +) +from app.services.scoring_service import ( + compute_momentum_from_closes, + compute_technical_from_arrays, +) +from app.services.sr_service import detect_sr_levels + +logger = logging.getLogger(__name__) + +KEY_REPORT = "backtest_report" + +STEP_DAYS = 5 # weekly cadence (≈ 5 trading days) +MIN_LOOKBACK = 60 # bars needed before D for indicators (EMA cross needs 51) +HORIZON = 30 # trading days to resolve an outcome (matches the evaluator) +ATR_MULTIPLIER = 1.5 + +_CAL_BUCKETS = [(0, 20), (20, 40), (40, 60), (60, 80), (80, 100.01)] + + +def _wrap_levels(level_dicts: list[dict]) -> list[Any]: + return [ + SimpleNamespace( + id=i, + price_level=float(d["price_level"]), + type=d["type"], + strength=int(d["strength"]), + ) + for i, d in enumerate(level_dicts) + ] + + +def _window_setups( + window_records: list, + config: dict, + activation: dict, +) -> 
list[dict]: + """Rebuild the setup(s) at the last bar of ``window_records`` (the as-of date), + using only those bars. Returns one dict per tradeable direction.""" + if len(window_records) < MIN_LOOKBACK: + return [] + + _, highs, lows, closes, volumes = _extract_ohlcv(window_records) + entry = closes[-1] + if entry <= 0: + return [] + + try: + atr = compute_atr(highs, lows, closes)["atr"] + except Exception: + return [] + if atr <= 0: + return [] + + sr_levels = _wrap_levels(detect_sr_levels(highs, lows, closes, volumes)) + if not sr_levels: + return [] + + technical = (compute_technical_from_arrays(highs, lows, closes, volumes)[0]) or 50.0 + momentum = (compute_momentum_from_closes(closes)[0]) or 50.0 + dim_scores = {"technical": technical, "momentum": momentum} + + conflicts = signal_conflict_detector.detect_conflicts(dim_scores, None, config) + confidences = { + "long": direction_analyzer.calculate_confidence("long", dim_scores, None, conflicts), + "short": direction_analyzer.calculate_confidence("short", dim_scores, None, conflicts), + } + + # First pass: build targets per direction + per_dir: dict[str, dict] = {} + for direction in ("long", "short"): + stop = entry - atr * ATR_MULTIPLIER if direction == "long" else entry + atr * ATR_MULTIPLIER + zone_levels = _zone_representative_levels(sr_levels, entry) + targets = target_generator.generate_targets(direction, entry, stop, zone_levels, atr) + if not targets: + continue + for t in targets: + t["probability"] = probability_estimator.estimate_probability( + t, dim_scores, None, direction, config + ) + t["classification"] = _classify_by_probability(t["probability"]) + primary = _select_primary_target(targets) + if primary is None: + continue + per_dir[direction] = {"stop": stop, "targets": targets, "primary": primary} + + available = set(per_dir.keys()) + if not available: + return [] + + action = _choose_recommended_action(confidences["long"], confidences["short"], config, available) + + out: list[dict] = [] + 
for direction, data in per_dir.items(): + targets, primary, stop = data["targets"], data["primary"], data["stop"] + setup_conflicts = list(conflicts) + if len(targets) < 3: + setup_conflicts.append("target-availability: Fewer than 3 valid S/R targets available") + risk_level = _risk_level_from_conflicts(setup_conflicts) + rr = float(primary["rr_ratio"]) + target_price = float(primary["price"]) + + setup_ns = SimpleNamespace( + rr_ratio=rr, + confidence_score=confidences[direction], + recommended_action=action, + risk_level=risk_level, + targets=targets, + direction=direction, + target=target_price, + stop_loss=stop, + entry_price=entry, + ) + out.append({ + "direction": direction, + "entry": entry, + "stop": stop, + "target": target_price, + "rr": rr, + "confidence": confidences[direction], + "primary_prob": float(primary["probability"]), + "best_prob": best_target_probability(setup_ns), + "action": action, + "risk_level": risk_level, + "qualified": setup_qualifies(setup_ns, activation), + }) + return out + + +def _replay_ticker(symbol: str, records: list, config: dict, activation: dict) -> list[dict]: + """Walk one ticker's history weekly, building setups and their realized outcomes.""" + candidates: list[dict] = [] + n = len(records) + if n < MIN_LOOKBACK + HORIZON: + return candidates + + for i in range(MIN_LOOKBACK - 1, n - HORIZON, STEP_DAYS): + window = records[: i + 1] + forward = records[i + 1 :] + forward_bars = [Bar(date=r.date, high=r.high, low=r.low) for r in forward] + + for s in _window_setups(window, config, activation): + outcome, _ = evaluate_setup_against_bars( + s["direction"], s["stop"], s["target"], forward_bars, HORIZON + ) + if outcome is None: + continue + target_hit = outcome == OUTCOME_TARGET_HIT + if outcome == OUTCOME_TARGET_HIT: + realized_r = s["rr"] + elif outcome in (OUTCOME_STOP_HIT, OUTCOME_AMBIGUOUS): + realized_r = -1.0 + else: # expired + realized_r = 0.0 + candidates.append({ + "symbol": symbol, + "date": 
records[i].date.isoformat(), + "direction": s["direction"], + "rr": s["rr"], + "confidence": s["confidence"], + "primary_prob": s["primary_prob"], + "qualified": s["qualified"], + "outcome": outcome, + "target_hit": target_hit, + "realized_r": realized_r, + }) + return candidates + + +def _bucket_stats(cands: list[dict]) -> dict: + wins = sum(1 for c in cands if c["target_hit"]) + losses = sum(1 for c in cands if c["outcome"] in (OUTCOME_STOP_HIT, OUTCOME_AMBIGUOUS)) + expired = sum(1 for c in cands if c["outcome"] not in (OUTCOME_TARGET_HIT, OUTCOME_STOP_HIT, OUTCOME_AMBIGUOUS)) + decided = wins + losses + rs = [c["realized_r"] for c in cands] + return { + "total": len(cands), + "wins": wins, + "losses": losses, + "expired": expired, + "hit_rate": round(wins / decided * 100, 1) if decided else None, + "avg_r": round(sum(rs) / len(rs), 3) if rs else None, + "total_r": round(sum(rs), 2) if rs else None, + } + + +def _calibration(cands: list[dict]) -> list[dict]: + """Predicted target probability vs realized hit rate, per probability bucket.""" + rows: list[dict] = [] + for lo, hi in _CAL_BUCKETS: + bucket = [c for c in cands if lo <= c["primary_prob"] < hi] + if not bucket: + continue + hits = sum(1 for c in bucket if c["target_hit"]) + rows.append({ + "bucket": f"{int(lo)}-{int(min(hi, 100))}%", + "n": len(bucket), + "predicted_avg": round(sum(c["primary_prob"] for c in bucket) / len(bucket), 1), + "realized_hit_rate": round(hits / len(bucket) * 100, 1), + }) + return rows + + +async def run_backtest( + db: AsyncSession, + progress_cb: Callable[[int, int, str], None] | None = None, +) -> dict: + """Replay every ticker and aggregate the Phase-1 reports for the current config.""" + config = await get_recommendation_config(db) + activation = await get_activation_config(db) + + result = await db.execute(select(Ticker).order_by(Ticker.symbol)) + tickers = list(result.scalars().all()) + total = len(tickers) + + candidates: list[dict] = [] + for index, ticker in 
enumerate(tickers): + if progress_cb is not None: + progress_cb(index, total, ticker.symbol) + try: + records = await query_ohlcv(db, ticker.symbol) + candidates.extend(_replay_ticker(ticker.symbol, records, config, activation)) + except Exception: + logger.exception("Backtest replay failed for %s", ticker.symbol) + + if progress_cb is not None and total: + progress_cb(total, total, "") + + qualified = [c for c in candidates if c["qualified"]] + longs = [c for c in qualified if c["direction"] == "long"] + shorts = [c for c in qualified if c["direction"] == "short"] + + return { + "generated_at": datetime.now(timezone.utc).isoformat(), + "tickers": total, + "candidates": len(candidates), + "qualified": len(qualified), + "params": {"step_days": STEP_DAYS, "horizon_days": HORIZON, "min_lookback": MIN_LOOKBACK}, + "activation": activation, + "overall_qualified": _bucket_stats(qualified), + "overall_all": _bucket_stats(candidates), + "by_direction": { + "long": _bucket_stats(longs), + "short": _bucket_stats(shorts), + }, + "calibration": _calibration(candidates), + "note": ( + "Sentiment & fundamentals held neutral (no point-in-time history). " + "~6 months ≈ one market regime — treat as directional, not gospel." + ), + } + + +async def run_and_store( + db: AsyncSession, + progress_cb: Callable[[int, int, str], None] | None = None, +) -> dict: + """Run the backtest and cache the report in a SystemSetting. 
Job entrypoint.""" + report = await run_backtest(db, progress_cb) + await update_setting(db, KEY_REPORT, json.dumps(report)) + return report + + +async def get_backtest_report(db: AsyncSession) -> dict | None: + """Return the last cached backtest report, or None if never run.""" + result = await db.execute(select(SystemSetting).where(SystemSetting.key == KEY_REPORT)) + setting = result.scalar_one_or_none() + if setting is None: + return None + try: + return json.loads(setting.value) + except (TypeError, ValueError): + return None diff --git a/app/services/scoring_service.py b/app/services/scoring_service.py index 2bbb63d..16f05b4 100644 --- a/app/services/scoring_service.py +++ b/app/services/scoring_service.py @@ -88,11 +88,28 @@ async def _save_weights(db: AsyncSession, weights: dict[str, float]) -> None: async def _compute_technical_score( db: AsyncSession, symbol: str ) -> tuple[float | None, dict | None]: - """Compute technical dimension score from ADX, EMA, RSI, EMA Cross, - Volume Profile and Pivot Points. + """Compute technical dimension score from stored OHLCV (DB wrapper).""" + from app.services.indicator_service import _extract_ohlcv + from app.services.price_service import query_ohlcv - Returns (score, breakdown) where breakdown follows the ScoreBreakdown - TypedDict shape: {sub_scores, formula, unavailable}. + records = await query_ohlcv(db, symbol) + if not records: + return None, None + + _, highs, lows, closes, volumes = _extract_ohlcv(records) + return compute_technical_from_arrays(highs, lows, closes, volumes) + + +def compute_technical_from_arrays( + highs: list[float], + lows: list[float], + closes: list[float], + volumes: list[int], +) -> tuple[float | None, dict | None]: + """Technical score from raw OHLCV arrays — ADX, EMA, RSI, EMA Cross, Volume + Profile, Pivot Points. Pure (no DB) so the backtest can compute it as-of-date. + + Returns (score, breakdown). 
""" from app.services.indicator_service import ( compute_adx, @@ -101,16 +118,11 @@ async def _compute_technical_score( compute_pivot_points, compute_rsi, compute_volume_profile, - _extract_ohlcv, ) - from app.services.price_service import query_ohlcv - records = await query_ohlcv(db, symbol) - if not records: + if not closes: return None, None - _, highs, lows, closes, volumes = _extract_ohlcv(records) - formula = ( "Weighted average: 0.30*ADX + 0.20*EMA + 0.20*RSI + 0.15*EMA_Cross " "+ 0.10*Volume_Profile + 0.05*Pivot_Points, re-normalized if any " @@ -514,13 +526,21 @@ async def _compute_momentum_score( """ from app.services.price_service import query_ohlcv - formula = "Weighted average: 0.5 * ROC_5 + 0.5 * ROC_20, re-normalized if any sub-score unavailable." - records = await query_ohlcv(db, symbol) if not records or len(records) < 6: return None, None closes = [float(r.close) for r in records] + return compute_momentum_from_closes(closes) + + +def compute_momentum_from_closes(closes: list[float]) -> tuple[float | None, dict | None]: + """Momentum score (5- and 20-day ROC) from a close series. Pure (no DB).""" + formula = "Weighted average: 0.5 * ROC_5 + 0.5 * ROC_20, re-normalized if any sub-score unavailable." 
+ + if not closes or len(closes) < 6: + return None, None + latest = closes[-1] scores: list[tuple[float, float]] = [] # (weight, score) diff --git a/tests/unit/test_backtest_service.py b/tests/unit/test_backtest_service.py new file mode 100644 index 0000000..7770715 --- /dev/null +++ b/tests/unit/test_backtest_service.py @@ -0,0 +1,118 @@ +"""Tests for the historical backtest harness.""" + +from __future__ import annotations + +import math +from datetime import date, timedelta + +import pytest + +from app.models.ohlcv import OHLCVRecord +from app.models.ticker import Ticker +from app.services import backtest_service as bt +from app.services.outcome_service import ( + OUTCOME_EXPIRED, + OUTCOME_STOP_HIT, + OUTCOME_TARGET_HIT, +) +from tests.conftest import _test_session_factory # type: ignore + + +@pytest.fixture +async def session(): + async with _test_session_factory() as s: + yield s + + +def _cand(prob: float, outcome: str, rr: float, qualified: bool = True, direction: str = "long") -> dict: + target_hit = outcome == OUTCOME_TARGET_HIT + realized = rr if target_hit else (0.0 if outcome == OUTCOME_EXPIRED else -1.0) + return { + "primary_prob": prob, + "outcome": outcome, + "target_hit": target_hit, + "rr": rr, + "realized_r": realized, + "qualified": qualified, + "direction": direction, + } + + +def test_bucket_stats_counts_and_expectancy(): + cands = [ + _cand(70, OUTCOME_TARGET_HIT, 3.0), # +3R win + _cand(60, OUTCOME_TARGET_HIT, 2.0), # +2R win + _cand(40, OUTCOME_STOP_HIT, 3.0), # -1R loss + _cand(30, OUTCOME_EXPIRED, 3.0), # 0R expired + ] + s = bt._bucket_stats(cands) + assert s["total"] == 4 + assert s["wins"] == 2 + assert s["losses"] == 1 + assert s["expired"] == 1 + # hit rate is over decided (wins+losses) only + assert s["hit_rate"] == round(2 / 3 * 100, 1) + # avg R = (3 + 2 - 1 + 0) / 4 = 1.0 + assert s["avg_r"] == 1.0 + assert s["total_r"] == 4.0 + + +def test_bucket_stats_empty(): + s = bt._bucket_stats([]) + assert s["total"] == 0 + assert 
s["hit_rate"] is None + assert s["avg_r"] is None + + +def test_calibration_buckets(): + cands = [ + _cand(65, OUTCOME_TARGET_HIT, 2.0), + _cand(62, OUTCOME_STOP_HIT, 2.0), + _cand(15, OUTCOME_STOP_HIT, 2.0), + ] + rows = bt._calibration(cands) + by_bucket = {r["bucket"]: r for r in rows} + assert by_bucket["60-80%"]["n"] == 2 + assert by_bucket["60-80%"]["realized_hit_rate"] == 50.0 # 1 of 2 hit + assert by_bucket["0-20%"]["n"] == 1 + assert by_bucket["0-20%"]["realized_hit_rate"] == 0.0 + + +def test_window_setups_too_short_returns_empty(): + assert bt._window_setups([], {}, {}) == [] + + +async def _seed_oscillating_ticker(session, symbol: str, n: int = 160) -> None: + t = Ticker(symbol=symbol) + session.add(t) + await session.flush() + base = date(2025, 1, 1) + for i in range(n): + close = 100.0 + 8.0 * math.sin(i / 6.0) + session.add(OHLCVRecord( + ticker_id=t.id, + date=base + timedelta(days=i), + open=close, + high=close + 1.5, + low=close - 1.5, + close=close, + volume=1_000_000 + (i % 5) * 1000, + )) + await session.commit() + + +async def test_run_backtest_smoke(session): + await _seed_oscillating_ticker(session, "OSC") + report = await bt.run_backtest(session) + + # well-formed report + assert report["tickers"] == 1 + assert isinstance(report["candidates"], int) + for key in ("overall_qualified", "overall_all", "by_direction", "calibration"): + assert key in report + # the oscillating series should yield at least some resolved setups + assert report["candidates"] >= 1 + # every calibration row is internally consistent + for row in report["calibration"]: + assert 0 <= row["realized_hit_rate"] <= 100 + assert row["n"] >= 1 diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index 051738c..ad4fe6c 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -83,6 +83,7 @@ class TestConfigureScheduler: "outcome_evaluator", "alerts", "market_regime", + "backtest", } def test_configure_is_idempotent(self): @@ -93,6 +94,7 
@@ class TestConfigureScheduler: # Each ID should appear exactly once assert sorted(job_ids) == sorted([ "alerts", + "backtest", "data_collector", "fundamental_collector", "market_regime",