ebff19940b
A new /regime tab scoring how far the AI/Tech bull regime has deteriorated toward a re-rating as a single 0-100 index with per-signal breakdown and a 7/30-day trend. Intentionally decoupled: nothing reads its output to gate or score trades — the daily-pipeline membership is scheduling only. - regime_monitor_service: price sub-scores (P1-P6 via Alpaca, like market_regime), VIX + HY credit spreads via a small FRED helper, weighted aggregation over available signals (missing source -> n/a, dropped from the denominator), one snapshot row/day, and a ~90-day history backfill by replaying the already-fetched series as-of each past day. - F1/F3 fundamentals proposed by the configured grounded LLM (reuses sentiment_provider_service config resolution), with a manual override + lock. - regime_snapshots table (migration 011); endpoints on the existing market router; admin-editable weights/threshold; standalone /regime page. Data needs: prices via Alpaca, VIX/credit via FRED (optional key — signals show n/a without it). No LLM needed for history. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
117 lines
3.6 KiB
Python
117 lines
3.6 KiB
Python
"""Unit tests for app.scheduler module."""
|
|
|
|
import pytest
|
|
|
|
from app.scheduler import (
|
|
_is_job_enabled,
|
|
_parse_frequency,
|
|
_resume_tickers,
|
|
_last_successful,
|
|
configure_scheduler,
|
|
scheduler,
|
|
)
|
|
|
|
|
|
class TestParseFrequency:
|
|
def test_hourly(self):
|
|
assert _parse_frequency("hourly") == {"hours": 1}
|
|
|
|
def test_daily(self):
|
|
assert _parse_frequency("daily") == {"hours": 24}
|
|
|
|
def test_case_insensitive(self):
|
|
assert _parse_frequency("Hourly") == {"hours": 1}
|
|
assert _parse_frequency("DAILY") == {"hours": 24}
|
|
|
|
def test_weekly_maps_to_one_week(self):
|
|
assert _parse_frequency("weekly") == {"weeks": 1}
|
|
|
|
def test_unknown_defaults_to_daily(self):
|
|
assert _parse_frequency("monthly") == {"hours": 24}
|
|
assert _parse_frequency("") == {"hours": 24}
|
|
|
|
|
|
class TestResumeTickers:
|
|
def test_no_previous_returns_full_list(self):
|
|
symbols = ["AAPL", "GOOG", "MSFT"]
|
|
_last_successful["test_job"] = None
|
|
result = _resume_tickers(symbols, "test_job")
|
|
assert result == ["AAPL", "GOOG", "MSFT"]
|
|
|
|
def test_resume_after_first(self):
|
|
symbols = ["AAPL", "GOOG", "MSFT"]
|
|
_last_successful["test_job"] = "AAPL"
|
|
result = _resume_tickers(symbols, "test_job")
|
|
# Should start from GOOG, then wrap around
|
|
assert result == ["GOOG", "MSFT", "AAPL"]
|
|
|
|
def test_resume_after_middle(self):
|
|
symbols = ["AAPL", "GOOG", "MSFT", "TSLA"]
|
|
_last_successful["test_job"] = "GOOG"
|
|
result = _resume_tickers(symbols, "test_job")
|
|
assert result == ["MSFT", "TSLA", "AAPL", "GOOG"]
|
|
|
|
def test_resume_after_last(self):
|
|
symbols = ["AAPL", "GOOG", "MSFT"]
|
|
_last_successful["test_job"] = "MSFT"
|
|
result = _resume_tickers(symbols, "test_job")
|
|
# All already processed, wraps to full list
|
|
assert result == ["AAPL", "GOOG", "MSFT"]
|
|
|
|
def test_unknown_last_returns_full_list(self):
|
|
symbols = ["AAPL", "GOOG", "MSFT"]
|
|
_last_successful["test_job"] = "NVDA"
|
|
result = _resume_tickers(symbols, "test_job")
|
|
assert result == ["AAPL", "GOOG", "MSFT"]
|
|
|
|
def test_empty_list(self):
|
|
_last_successful["test_job"] = "AAPL"
|
|
result = _resume_tickers([], "test_job")
|
|
assert result == []
|
|
|
|
|
|
class TestConfigureScheduler:
|
|
def test_configure_adds_all_jobs(self):
|
|
# Remove any existing jobs first
|
|
scheduler.remove_all_jobs()
|
|
configure_scheduler()
|
|
jobs = scheduler.get_jobs()
|
|
job_ids = {j.id for j in jobs}
|
|
assert job_ids == {
|
|
"data_collector",
|
|
"data_backfill",
|
|
"sentiment_collector",
|
|
"fundamental_collector",
|
|
"rr_scanner",
|
|
"ticker_universe_sync",
|
|
"outcome_evaluator",
|
|
"alerts",
|
|
"market_regime",
|
|
"regime_monitor",
|
|
"backtest",
|
|
"daily_pipeline",
|
|
"intraday_pipeline",
|
|
}
|
|
|
|
def test_configure_is_idempotent(self):
|
|
scheduler.remove_all_jobs()
|
|
configure_scheduler()
|
|
configure_scheduler() # Should replace, not duplicate
|
|
job_ids = [j.id for j in scheduler.get_jobs()]
|
|
# Each ID should appear exactly once
|
|
assert sorted(job_ids) == sorted([
|
|
"alerts",
|
|
"backtest",
|
|
"daily_pipeline",
|
|
"intraday_pipeline",
|
|
"data_collector",
|
|
"data_backfill",
|
|
"fundamental_collector",
|
|
"market_regime",
|
|
"regime_monitor",
|
|
"outcome_evaluator",
|
|
"rr_scanner",
|
|
"sentiment_collector",
|
|
"ticker_universe_sync",
|
|
])
|