"""Tests for the cross-sectional signal-evaluation (factor IC) pass."""

from __future__ import annotations

import os
import pickle
import random
from collections import defaultdict
from datetime import date, timedelta
from types import SimpleNamespace

from app.services import backtest_service as bt

# ---------------------------------------------------------------------------
# Rank-correlation primitives
# ---------------------------------------------------------------------------


def test_spearman_monotonic_is_one():
    """A strictly increasing pairing yields a rank correlation of exactly 1."""
    xs = [1.0, 2.0, 3.0, 4.0, 5.0]
    ys = [10.0, 20.0, 30.0, 40.0, 50.0]
    assert bt._spearman(xs, ys) == 1.0


def test_spearman_inverse_is_minus_one():
    """A strictly decreasing pairing yields a rank correlation of exactly -1."""
    xs = [1.0, 2.0, 3.0, 4.0, 5.0]
    ys = [5.0, 4.0, 3.0, 2.0, 1.0]
    assert bt._spearman(xs, ys) == -1.0


def test_spearman_handles_ties_without_crashing():
    """Tied values still produce a defined, positive correlation."""
    xs = [1.0, 1.0, 2.0, 2.0, 3.0]
    ys = [1.0, 2.0, 2.0, 3.0, 3.0]
    ic = bt._spearman(xs, ys)
    assert ic is not None and 0.0 < ic <= 1.0


def test_spearman_none_when_degenerate():
    """Zero-variance or single-point input returns None."""
    # A flat array has zero variance → correlation undefined.
    assert bt._spearman([1.0, 1.0, 1.0, 1.0], [1.0, 2.0, 3.0, 4.0]) is None
    assert bt._spearman([1.0], [2.0]) is None


def test_quintile_spread_sign_follows_signal():
    """When the signal equals the forward return the spread is positive."""
    # signal == fwd return: top quintile clearly beats bottom → positive spread.
    pairs = [(float(i), float(i)) for i in range(20)]
    spread = bt._quintile_spread(pairs)
    assert spread is not None and spread > 0
    # Top quintile mean (17,18,19,16) - bottom (0,1,2,3) = 16.0
    assert spread == (17 + 18 + 19 + 16) / 4 - (0 + 1 + 2 + 3) / 4


def test_quintile_spread_none_when_too_few():
    """Nine observations are not enough to form a quintile spread."""
    assert bt._quintile_spread([(1.0, 1.0)] * 9) is None


# ---------------------------------------------------------------------------
# Signal value extraction (point-in-time, price-only)
# ---------------------------------------------------------------------------


def test_signal_values_momentum_and_trend():
    """A steadily rising series gives positive momentum, trend and vol values."""
    # Steadily rising series so every lookback is positive and trend is above SMA.
    closes = [100.0 * (1.01 ** k) for k in range(300)]
    i = 299
    vals = bt._signal_values(closes, closes, i)
    assert vals["mom_12_1"] > 0  # up over the 12→1 month window
    assert vals["trend_200"] > 0  # price above its 200-bar SMA in an uptrend
    # 12-1 momentum skips the last month: close[i-21] / close[i-252] - 1
    assert vals["mom_12_1"] == closes[i - 21] / closes[i - 252] - 1.0
    # Strictly rising → today IS the 52-week high (highs==closes here) → ratio 1.0
    assert vals["high_52w"] == 1.0
    assert vals["vol_6m"] > 0  # realized vol is defined and positive


def test_signal_values_drops_signals_without_enough_history():
    """With only 80 bars, only the shortest-lookback signal is emitted."""
    closes = [100.0 + k for k in range(80)]  # only 80 bars
    vals = bt._signal_values(closes, closes, 79)
    assert "mom_3_1" in vals  # needs 63 bars of lookback — present
    assert "mom_6_1" not in vals  # needs 126 — absent
    assert "mom_12_1" not in vals  # needs 252 — absent
    assert "trend_200" not in vals  # needs 200 — absent
    assert "high_52w" not in vals  # needs 252 — absent
    assert "vol_6m" not in vals  # needs 126 — absent


# ---------------------------------------------------------------------------
# End-to-end aggregation: a predictive signal scores, noise does not
# ---------------------------------------------------------------------------


def _records(closes: list[float]) -> list[SimpleNamespace]:
    """Wrap closes as daily bar-like objects starting 2020-01-01.

    Each record exposes ``date``, ``close`` and ``high``; ``high`` is set
    equal to ``close`` so high-based signals see the same series.
    """
    start = date(2020, 1, 1)
    return [
        SimpleNamespace(date=start + timedelta(days=k), close=c, high=c)
        for k, c in enumerate(closes)
    ]


def test_signal_evaluation_separates_edge_from_noise():
    """A perfectly predictive signal scores IC 1 and ranks above pure noise."""
    rng = random.Random(42)
    # 120 consecutive weeks, 40 names each. After non-overlapping thinning
    # (stride = HORIZON/5 = 6) that leaves 20 independent windows — above the
    # reliability bar. "edge" perfectly orders the forward return; "noise" is
    # independent of it.
    collected: dict = {"edge": {}, "noise": {}}
    for week in range(120):
        edge_recs = []
        noise_recs = []
        for _ in range(40):
            fwd = rng.gauss(0, 0.05)
            edge_recs.append((fwd, fwd))  # signal == fwd → IC = 1
            noise_recs.append((rng.gauss(0, 1), fwd))  # signal ⟂ fwd → IC ≈ 0
        collected["edge"][(2020, week)] = edge_recs
        collected["noise"][(2020, week)] = noise_recs

    # Evaluate once; the same result serves both the per-signal lookups and
    # the ordering check below.
    evaluated = bt._signal_evaluation(collected)
    rows = {r["signal"]: r for r in evaluated}
    assert rows["edge"]["mean_ic"] == 1.0
    assert rows["edge"]["weeks"] == 20  # 120 weeks thinned to non-overlapping
    assert rows["edge"]["reliable"] is True
    assert rows["edge"]["ic_positive_pct"] == 100.0
    assert rows["edge"]["mean_quintile_spread"] > 0
    assert abs(rows["noise"]["mean_ic"]) < 0.15  # indistinguishable from zero
    # Rows are sorted by mean_ic descending: the real signal ranks first.
    assert evaluated[0]["signal"] == "edge"


def test_signal_evaluation_flags_too_few_windows_unreliable():
    """A single surviving window is reported but flagged as not reliable."""
    # 5 adjacent weeks collapse to a single non-overlapping window → unreliable.
    collected: dict = {
        "edge": {
            (2020, w): [(float(i), float(i)) for i in range(40)]
            for w in range(5)
        }
    }
    row = bt._signal_evaluation(collected)[0]
    assert row["weeks"] == 1
    assert row["reliable"] is False


def test_nonoverlapping_weeks_thins_by_stride():
    """Thinning keeps every stride-th week and returns chronological order."""
    weeks = [(2020, w) for w in range(1, 13)]  # 12 consecutive ISO weeks
    kept = bt._nonoverlapping_weeks(weeks, stride=6)
    assert kept == [(2020, 1), (2020, 7)]  # 6 apart, no overlap
    # Stride 1 keeps everything; ordering is chronological.
    assert bt._nonoverlapping_weeks(list(reversed(weeks)), stride=1) == weeks


def test_signal_evaluation_skips_thin_weeks():
    """A cross-section one name short of the minimum produces no rows."""
    # A week with fewer than MIN_CROSS_SECTION names is ignored entirely.
    collected: dict = {
        "edge": {(2020, 1): [(1.0, 1.0)] * (bt.MIN_CROSS_SECTION - 1)}
    }
    assert bt._signal_evaluation(collected) == []


def test_accumulate_signal_series_emits_weekly_pairs():
    """Accumulating a long rising series records 2-element observations."""
    closes = [100.0 * (1.005 ** k) for k in range(400)]
    # Nested defaultdict: the accumulator fills signal → week → list in place.
    collected = defaultdict(lambda: defaultdict(list))
    bt._accumulate_signal_series(_records(closes), collected)
    # The long, rising series should yield momentum + trend observations...
    assert "mom_12_1" in collected and len(collected["mom_12_1"]) > 0
    # ...one per ISO week, with a forward return attached to each pair.
    sample = next(iter(collected["mom_12_1"].values()))
    assert all(len(pair) == 2 for pair in sample)


# ---------------------------------------------------------------------------
# Parallel-replay plumbing (process pool): plain/picklable results, worker count
# ---------------------------------------------------------------------------


def test_signal_series_is_plain_and_picklable():
    """The series result is plain dicts, pickles, and matches the accumulator."""
    closes = [100.0 * (1.003 ** k) for k in range(400)]
    series = bt._signal_series(_records(closes))
    # Must be plain dicts (no defaultdict/lambda) so it survives a process boundary.
    # Exact type checks are deliberate: isinstance() would accept a defaultdict.
    assert type(series) is dict
    assert all(type(weeks) is dict for weeks in series.values())
    pickle.dumps(series)  # the worker's return is pickled to the parent — must not raise
    # ...and equivalent to the in-place accumulator.
    acc = defaultdict(lambda: defaultdict(list))
    bt._accumulate_signal_series(_records(closes), acc)
    assert series == {name: dict(w) for name, w in acc.items()}


def test_worker_count_caps_to_cpu_minus_one(monkeypatch):
    """An oversized worker setting is capped at CPU count minus one (min 1)."""
    monkeypatch.setattr(bt.settings, "backtest_workers", 1000)
    assert bt._backtest_worker_count() == max(1, (os.cpu_count() or 1) - 1)


def test_worker_count_one_disables(monkeypatch):
    """A worker setting of 1 yields a worker count of 1."""
    monkeypatch.setattr(bt.settings, "backtest_workers", 1)
    assert bt._backtest_worker_count() == 1


def test_mp_context_is_none_or_posix():
    """The multiprocessing context is None or a fork/forkserver context."""
    ctx = bt._mp_context()
    # None on spawn-only platforms (Windows); a safe POSIX context otherwise.
    assert ctx is None or ctx.get_start_method() in ("fork", "forkserver")