From 437ceacfc14522c1402870fb376f159836534424 Mon Sep 17 00:00:00 2001 From: Dennis Thiessen Date: Wed, 24 Jun 2026 11:23:39 +0200 Subject: [PATCH] refactor: dedupe scheduler logging/runtime, centralize SystemSetting access, fix rankings N+1 Behavior-preserving cleanup (345 tests pass, ruff clean): - scheduler: replace 62 inline logger.x(json.dumps({...})) calls with a _log_event helper, and collapse 11 identical _job_runtime dicts into an _idle_runtime() factory over _JOB_NAMES. - settings: add app/services/settings_store.py (get_setting/get_value/get_map/ upsert_setting) and route ~13 hand-rolled SystemSetting queries + two identical _settings_map helpers through it. - scoring.get_rankings: collapse the per-ticker N+1 (3-4 queries + a commit each) into 2 bulk reads + a single conditional commit; drop the redundant re-fetch. Lazy recompute-on-read is preserved. Adds first tests for get_rankings. Net ~ -245 lines across the touched modules. --- app/scheduler.py | 421 +++++--------------- app/services/admin_service.py | 37 +- app/services/alert_service.py | 11 +- app/services/auth_service.py | 7 +- app/services/backtest_service.py | 5 +- app/services/market_regime_service.py | 6 +- app/services/scoring_service.py | 115 ++---- app/services/sentiment_provider_service.py | 14 +- app/services/settings_store.py | 46 +++ app/services/ticker_universe_service.py | 15 +- tests/unit/test_scoring_service_rankings.py | 129 ++++++ 11 files changed, 341 insertions(+), 465 deletions(-) create mode 100644 app/services/settings_store.py create mode 100644 tests/unit/test_scoring_service_rankings.py diff --git a/app/scheduler.py b/app/scheduler.py index 4ee2d5f..c232fff 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -27,14 +27,13 @@ from app.config import settings from app.database import async_session_factory from app.models.fundamental import FundamentalData from app.models.ohlcv import OHLCVRecord -from app.models.settings import SystemSetting from app.models.sentiment import SentimentScore from app.models.ticker import Ticker from app.exceptions import ProviderError from app.providers.alpaca import AlpacaOHLCVProvider from app.providers.fundamentals_chain import build_fundamental_provider_chain from app.providers.protocol import SentimentData -from app.services import fundamental_service, ingestion_service, sentiment_service +from app.services import fundamental_service, ingestion_service, sentiment_service, settings_store from app.services.alert_service import dispatch_alerts from app.services.backtest_service import run_and_store as run_backtest_and_store from app.services.market_regime_service import update_market_regime @@ -70,8 +69,25 @@ _last_successful: dict[str, str | None] = { "fundamental_collector": None, } -_job_runtime: dict[str, dict[str, object]] = { - "data_collector": { +# Jobs whose per-run progress is surfaced to Admin → Jobs. (outcome_evaluator is +# created lazily on first run via _runtime_start.) +_JOB_NAMES = [ + "data_collector", + "data_backfill", + "sentiment_collector", + "fundamental_collector", + "rr_scanner", + "ticker_universe_sync", + "alerts", + "market_regime", + "backtest", + "daily_pipeline", + "intraday_pipeline", +] + + +def _idle_runtime() -> dict[str, object]: + return { "running": False, "status": "idle", "processed": 0, @@ -81,118 +97,10 @@ _job_runtime: dict[str, dict[str, object]] = { "started_at": None, "finished_at": None, "message": None, - }, - "data_backfill": { - "running": False, - "status": "idle", - "processed": 0, - "total": None, - "progress_pct": None, - "current_ticker": None, - "started_at": None, - "finished_at": None, - "message": None, - }, - "sentiment_collector": { - "running": False, - "status": "idle", - "processed": 0, - "total": None, - "progress_pct": None, - "current_ticker": None, - "started_at": None, - "finished_at": None, - "message": None, - }, - "fundamental_collector": { - "running": False, - "status": "idle", - "processed": 0, - "total": None, - "progress_pct": None, - "current_ticker": None, - "started_at": None, - "finished_at": None, - "message": None, - }, - "rr_scanner": { - "running": False, - "status": "idle", - "processed": 0, - "total": None, - "progress_pct": None, - "current_ticker": None, - "started_at": None, - "finished_at": None, - "message": None, - }, - "ticker_universe_sync": { - "running": False, - "status": "idle", - "processed": 0, - "total": None, - "progress_pct": None, - "current_ticker": None, - "started_at": None, - "finished_at": None, - "message": None, - }, - "alerts": { - "running": False, - "status": "idle", - "processed": 0, - "total": None, - "progress_pct": None, - "current_ticker": None, - "started_at": None, - "finished_at": None, - "message": None, - }, - "market_regime": { - "running": False, - "status": "idle", - "processed": 0, - "total": None, - "progress_pct": None, - "current_ticker": None, - "started_at": None, - "finished_at": None, - "message": None, - }, - "backtest": { - "running": False, - "status": "idle", - "processed": 0, - "total": None, - "progress_pct": None, - "current_ticker": None, - "started_at": None, - "finished_at": None, - "message": None, - }, - "daily_pipeline": { - "running": False, - "status": "idle", - "processed": 0, - "total": None, - "progress_pct": None, - "current_ticker": None, - "started_at": None, - "finished_at": None, - "message": None, - }, - "intraday_pipeline": { - "running": False, - "status": "idle", - "processed": 0, - "total": None, - "progress_pct": None, - "current_ticker": None, - "started_at": None, - "finished_at": None, - "message": None, - }, -} + } + + +_job_runtime: dict[str, dict[str, object]] = {name: _idle_runtime() for name in _JOB_NAMES} # --------------------------------------------------------------------------- @@ -200,30 +108,27 @@ _job_runtime: dict[str, dict[str, object]] = { # --------------------------------------------------------------------------- +def _log_event(level: int, event: str, **fields: object) -> None: + """Emit a structured JSON log line: {"event": ..., **fields}.""" + logger.log(level, json.dumps({"event": event, **fields})) + + def _log_job_error(job_name: str, ticker: str, error: Exception) -> None: - """Log a job error as structured JSON.""" - logger.error( - json.dumps({ - "event": "job_error", - "job": job_name, - "ticker": ticker, - "error_type": type(error).__name__, - "message": str(error), - }) + """Log a per-ticker job error as structured JSON.""" + _log_event( + logging.ERROR, "job_error", job=job_name, ticker=ticker, + error_type=type(error).__name__, message=str(error), ) def _runtime_start(job_name: str, total: int | None = None, message: str | None = None) -> None: - now = datetime.now(timezone.utc).isoformat() _job_runtime[job_name] = { + **_idle_runtime(), "running": True, "status": "running", - "processed": 0, "total": total, "progress_pct": 0.0 if total and total > 0 else None, - "current_ticker": None, - "started_at": now, - "finished_at": None, + "started_at": datetime.now(timezone.utc).isoformat(), "message": message, } @@ -280,14 +185,8 @@ def get_job_runtime_snapshot(job_name: str | None = None) -> dict[str, dict[str, async def _is_job_enabled(db: AsyncSession, job_name: str) -> bool: """Check SystemSetting for job enabled state. Defaults to True.""" - key = f"job_{job_name}_enabled" - result = await db.execute( - select(SystemSetting).where(SystemSetting.key == key) - ) - setting = result.scalar_one_or_none() - if setting is None: - return True - return setting.value.lower() == "true" + setting = await settings_store.get_setting(db, f"job_{job_name}_enabled") + return setting is None or setting.value.lower() == "true" async def _get_all_tickers(db: AsyncSession) -> list[str]: @@ -418,7 +317,7 @@ async def collect_ohlcv(full_backfill: bool = False, job_name: str = "data_colle the manual data_backfill job to deepen shallow histories. ``job_name`` lets the backfill report its own runtime/resume state separate from data_collector. """ - logger.info(json.dumps({"event": "job_start", "job": job_name})) + _log_event(logging.INFO, "job_start", job=job_name) _runtime_start(job_name) processed = 0 total: int | None = None @@ -426,13 +325,13 @@ async def collect_ohlcv(full_backfill: bool = False, job_name: str = "data_colle try: async with async_session_factory() as db: if not await _is_job_enabled(db, job_name): - logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"})) + _log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled") _runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled") return symbols = await _get_ohlcv_priority_tickers(db) if not symbols: - logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": 0})) + _log_event(logging.INFO, "job_complete", job=job_name, tickers=0) _runtime_finish(job_name, "completed", processed=0, total=0, message="No tickers") return @@ -441,14 +340,14 @@ async def collect_ohlcv(full_backfill: bool = False, job_name: str = "data_colle # Build provider (skip if keys not configured) if not settings.alpaca_api_key or not settings.alpaca_api_secret: - logger.warning(json.dumps({"event": "job_skipped", "job": job_name, "reason": "alpaca keys not configured"})) + _log_event(logging.WARNING, "job_skipped", job=job_name, reason="alpaca keys not configured") _runtime_finish(job_name, "skipped", processed=0, total=total, message="Alpaca keys not configured") return try: provider = AlpacaOHLCVProvider(settings.alpaca_api_key, settings.alpaca_api_secret) except Exception as exc: - logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)})) + _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc)) _runtime_finish(job_name, "error", processed=0, total=total, message=str(exc)) return @@ -469,21 +368,10 @@ async def collect_ohlcv(full_backfill: bool = False, job_name: str = "data_colle _last_successful[job_name] = symbol processed += 1 _runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol) - logger.info(json.dumps({ - "event": "ticker_collected", - "job": job_name, - "ticker": symbol, - "status": result.status, - "records": result.records_ingested, - })) + _log_event(logging.INFO, "ticker_collected", job=job_name, ticker=symbol, status=result.status, records=result.records_ingested) if result.status == "partial": # Rate limited — stop and resume next run - logger.warning(json.dumps({ - "event": "rate_limited", - "job": job_name, - "ticker": symbol, - "processed": processed, - })) + _log_event(logging.WARNING, "rate_limited", job=job_name, ticker=symbol, processed=processed) _runtime_finish(job_name, "rate_limited", processed=processed, total=total, message=f"Rate limited at {symbol}") return except Exception as exc: @@ -491,10 +379,10 @@ async def collect_ohlcv(full_backfill: bool = False, job_name: str = "data_colle # Reset resume pointer on full completion _last_successful[job_name] = None - logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": processed})) + _log_event(logging.INFO, "job_complete", job=job_name, tickers=processed) _runtime_finish(job_name, "completed", processed=processed, total=total, message=f"Processed {processed} tickers") except Exception as exc: - logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)})) + _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc)) _runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc)) @@ -521,7 +409,7 @@ async def collect_sentiment() -> None: successful ticker for resume. """ job_name = "sentiment_collector" - logger.info(json.dumps({"event": "job_start", "job": job_name})) + _log_event(logging.INFO, "job_start", job=job_name) _runtime_start(job_name) processed = 0 total: int | None = None @@ -529,13 +417,13 @@ async def collect_sentiment() -> None: try: async with async_session_factory() as db: if not await _is_job_enabled(db, job_name): - logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"})) + _log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled") _runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled") return symbols = await _get_sentiment_priority_tickers(db) if not symbols: - logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": 0})) + _log_event(logging.INFO, "job_complete", job=job_name, tickers=0) _runtime_finish(job_name, "completed", processed=0, total=0, message="No tickers") return @@ -546,11 +434,11 @@ async def collect_sentiment() -> None: async with async_session_factory() as cfg_db: provider = await build_sentiment_provider(cfg_db) except ProviderError as exc: - logger.warning(json.dumps({"event": "job_skipped", "job": job_name, "reason": str(exc)})) + _log_event(logging.WARNING, "job_skipped", job=job_name, reason=str(exc)) _runtime_finish(job_name, "skipped", processed=0, total=total, message=str(exc)) return except Exception as exc: - logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)})) + _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc)) _runtime_finish(job_name, "error", processed=0, total=total, message=str(exc)) return @@ -568,20 +456,10 @@ async def collect_sentiment() -> None: except Exception as exc: msg = str(exc).lower() if "rate" in msg or "quota" in msg or "429" in msg: - logger.warning(json.dumps({ - "event": "rate_limited", - "job": job_name, - "ticker": batch[0], - "processed": processed, - })) + _log_event(logging.WARNING, "rate_limited", job=job_name, ticker=batch[0], processed=processed) _runtime_finish(job_name, "rate_limited", processed=processed, total=total, message=f"Rate limited at {batch[0]}") return - logger.warning(json.dumps({ - "event": "batch_fallback", - "job": job_name, - "batch": batch, - "reason": str(exc), - })) + _log_event(logging.WARNING, "batch_fallback", job=job_name, batch=batch, reason=str(exc)) for symbol in batch: _runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol) @@ -593,12 +471,7 @@ async def collect_sentiment() -> None: except Exception as exc: msg = str(exc).lower() if "rate" in msg or "quota" in msg or "429" in msg: - logger.warning(json.dumps({ - "event": "rate_limited", - "job": job_name, - "ticker": symbol, - "processed": processed, - })) + _log_event(logging.WARNING, "rate_limited", job=job_name, ticker=symbol, processed=processed) _runtime_finish(job_name, "rate_limited", processed=processed, total=total, message=f"Rate limited at {symbol}") return _log_job_error(job_name, symbol, exc) @@ -620,21 +493,15 @@ async def collect_sentiment() -> None: _last_successful[job_name] = symbol processed += 1 _runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol) - logger.info(json.dumps({ - "event": "ticker_collected", - "job": job_name, - "ticker": symbol, - "classification": data.classification, - "confidence": data.confidence, - })) + _log_event(logging.INFO, "ticker_collected", job=job_name, ticker=symbol, classification=data.classification, confidence=data.confidence) except Exception as exc: _log_job_error(job_name, symbol, exc) _last_successful[job_name] = None - logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": processed})) + _log_event(logging.INFO, "job_complete", job=job_name, tickers=processed) _runtime_finish(job_name, "completed", processed=processed, total=total, message=f"Processed {processed} tickers") except Exception as exc: - logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)})) + _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc)) _runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc)) @@ -650,7 +517,7 @@ async def collect_fundamentals() -> None: successful ticker for resume. """ job_name = "fundamental_collector" - logger.info(json.dumps({"event": "job_start", "job": job_name})) + _log_event(logging.INFO, "job_start", job=job_name) _runtime_start(job_name) processed = 0 total: int | None = None @@ -658,13 +525,13 @@ async def collect_fundamentals() -> None: try: async with async_session_factory() as db: if not await _is_job_enabled(db, job_name): - logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"})) + _log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled") _runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled") return symbols = await _get_fundamental_priority_tickers(db) if not symbols: - logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": 0})) + _log_event(logging.INFO, "job_complete", job=job_name, tickers=0) _runtime_finish(job_name, "completed", processed=0, total=0, message="No tickers") return @@ -672,14 +539,14 @@ async def collect_fundamentals() -> None: _runtime_progress(job_name, processed=0, total=total) if not (settings.fmp_api_key or settings.finnhub_api_key or settings.alpha_vantage_api_key): - logger.warning(json.dumps({"event": "job_skipped", "job": job_name, "reason": "no fundamentals provider keys configured"})) + _log_event(logging.WARNING, "job_skipped", job=job_name, reason="no fundamentals provider keys configured") _runtime_finish(job_name, "skipped", processed=0, total=total, message="No fundamentals provider keys configured") return try: provider = build_fundamental_provider_chain() except Exception as exc: - logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)})) + _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc)) _runtime_finish(job_name, "error", processed=0, total=total, message=str(exc)) return @@ -710,11 +577,7 @@ async def collect_fundamentals() -> None: _last_successful[job_name] = symbol processed += 1 _runtime_progress(job_name, processed=processed, total=total, current_ticker=symbol) - logger.info(json.dumps({ - "event": "ticker_collected", - "job": job_name, - "ticker": symbol, - })) + _log_event(logging.INFO, "ticker_collected", job=job_name, ticker=symbol) break except Exception as exc: msg = str(exc).lower() @@ -722,15 +585,7 @@ async def collect_fundamentals() -> None: if attempt < max_retries: wait_seconds = base_backoff * (2 ** attempt) attempt += 1 - logger.warning(json.dumps({ - "event": "rate_limited_retry", - "job": job_name, - "ticker": symbol, - "attempt": attempt, - "max_retries": max_retries, - "wait_seconds": wait_seconds, - "processed": processed, - })) + _log_event(logging.WARNING, "rate_limited_retry", job=job_name, ticker=symbol, attempt=attempt, max_retries=max_retries, wait_seconds=wait_seconds, processed=processed) _runtime_progress( job_name, processed=processed, @@ -745,12 +600,7 @@ async def collect_fundamentals() -> None: # still get (e.g. FMP market cap) and move on, rather than # aborting the whole run and leaving every later ticker # untouched. - logger.warning(json.dumps({ - "event": "rate_limited_partial", - "job": job_name, - "ticker": symbol, - "processed": processed, - })) + _log_event(logging.WARNING, "rate_limited_partial", job=job_name, ticker=symbol, processed=processed) try: data = await provider.fetch_fundamentals(symbol, allow_partial=True) await _store(symbol, data) @@ -765,10 +615,10 @@ async def collect_fundamentals() -> None: await asyncio.sleep(spacing) _last_successful[job_name] = None - logger.info(json.dumps({"event": "job_complete", "job": job_name, "tickers": processed})) + _log_event(logging.INFO, "job_complete", job=job_name, tickers=processed) _runtime_finish(job_name, "completed", processed=processed, total=total, message=f"Processed {processed} tickers") except Exception as exc: - logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)})) + _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc)) _runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc)) @@ -784,7 +634,7 @@ async def scan_rr() -> None: per-ticker error isolation internally. """ job_name = "rr_scanner" - logger.info(json.dumps({"event": "job_start", "job": job_name})) + _log_event(logging.INFO, "job_start", job=job_name) _runtime_start(job_name) processed = 0 total: int | None = None @@ -792,7 +642,7 @@ async def scan_rr() -> None: try: async with async_session_factory() as db: if not await _is_job_enabled(db, job_name): - logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"})) + _log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled") _runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled") return @@ -812,21 +662,12 @@ async def scan_rr() -> None: ) processed = total or 0 _runtime_finish(job_name, "completed", processed=processed, total=total, message=f"Found {len(setups)} setups") - logger.info(json.dumps({ - "event": "job_complete", - "job": job_name, - "setups_found": len(setups), - })) + _log_event(logging.INFO, "job_complete", job=job_name, setups_found=len(setups)) except Exception as exc: _runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc)) - logger.error(json.dumps({ - "event": "job_error", - "job": job_name, - "error_type": type(exc).__name__, - "message": str(exc), - })) + _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc)) except Exception as exc: - logger.error(json.dumps({"event": "job_error", "job": job_name, "error_type": type(exc).__name__, "message": str(exc)})) + _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc)) _runtime_finish(job_name, "error", processed=processed, total=total, message=str(exc)) @@ -842,13 +683,13 @@ async def evaluate_outcomes() -> None: Undecided setups stay pending and are re-checked on the next run. """ job_name = "outcome_evaluator" - logger.info(json.dumps({"event": "job_start", "job": job_name})) + _log_event(logging.INFO, "job_start", job=job_name) _runtime_start(job_name, total=1) try: async with async_session_factory() as db: if not await _is_job_enabled(db, job_name): - logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"})) + _log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled") _runtime_finish(job_name, "skipped", processed=0, total=1, message="Disabled") return @@ -864,19 +705,10 @@ async def evaluate_outcomes() -> None: message=f"Evaluated {summary['evaluated']}, pending {summary['still_pending']}, " f"{closed_trades} paper trade(s) closed", ) - logger.info(json.dumps({ - "event": "job_complete", - "job": job_name, - "summary": summary, - })) + _log_event(logging.INFO, "job_complete", job=job_name, summary=summary) except Exception as exc: _runtime_finish(job_name, "error", processed=0, total=1, message=str(exc)) - logger.error(json.dumps({ - "event": "job_error", - "job": job_name, - "error_type": type(exc).__name__, - "message": str(exc), - })) + _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc)) # --------------------------------------------------------------------------- @@ -887,13 +719,13 @@ async def evaluate_outcomes() -> None: async def dispatch_alerts_job() -> None: """Push Telegram alerts for qualified setups, S/R proximity, score drops, digest.""" job_name = "alerts" - logger.info(json.dumps({"event": "job_start", "job": job_name})) + _log_event(logging.INFO, "job_start", job=job_name) _runtime_start(job_name, total=1) try: async with async_session_factory() as db: if not await _is_job_enabled(db, job_name): - logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"})) + _log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled") _runtime_finish(job_name, "skipped", processed=0, total=1, message="Disabled") return @@ -904,15 +736,10 @@ async def dispatch_alerts_job() -> None: job_name, "completed", processed=1, total=1, message=f"{result.get('status')}, sent {result.get('sent', 0)}", ) - logger.info(json.dumps({"event": "job_complete", "job": job_name, "result": result})) + _log_event(logging.INFO, "job_complete", job=job_name, result=result) except Exception as exc: _runtime_finish(job_name, "error", processed=0, total=1, message=str(exc)) - logger.error(json.dumps({ - "event": "job_error", - "job": job_name, - "error_type": type(exc).__name__, - "message": str(exc), - })) + _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc)) # --------------------------------------------------------------------------- @@ -923,13 +750,13 @@ async def dispatch_alerts_job() -> None: async def compute_market_regime() -> None: """Refresh the cached benchmark (SPY) trend regime.""" job_name = "market_regime" - logger.info(json.dumps({"event": "job_start", "job": job_name})) + _log_event(logging.INFO, "job_start", job=job_name) _runtime_start(job_name, total=1) try: async with async_session_factory() as db: if not await _is_job_enabled(db, job_name): - logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"})) + _log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled") _runtime_finish(job_name, "skipped", processed=0, total=1, message="Disabled") return @@ -940,15 +767,10 @@ async def compute_market_regime() -> None: job_name, "completed", processed=1, total=1, message=f"Regime: {regime.get('label')}", ) - logger.info(json.dumps({"event": "job_complete", "job": job_name, "label": regime.get("label")})) + _log_event(logging.INFO, "job_complete", job=job_name, label=regime.get("label")) except Exception as exc: _runtime_finish(job_name, "error", processed=0, total=1, message=str(exc)) - logger.error(json.dumps({ - "event": "job_error", - "job": job_name, - "error_type": type(exc).__name__, - "message": str(exc), - })) + _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc)) # --------------------------------------------------------------------------- @@ -959,7 +781,7 @@ async def compute_market_regime() -> None: async def run_backtest_job() -> None: """Replay the price-derived engine over history and cache the report.""" job_name = "backtest" - logger.info(json.dumps({"event": "job_start", "job": job_name})) + _log_event(logging.INFO, "job_start", job=job_name) _runtime_start(job_name) def _on_progress(done: int, count: int, symbol: str) -> None: @@ -968,7 +790,7 @@ async def run_backtest_job() -> None: try: async with async_session_factory() as db: if not await _is_job_enabled(db, job_name): - logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"})) + _log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled") _runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled") return @@ -979,15 +801,10 @@ async def run_backtest_job() -> None: processed=report.get("tickers", 0), total=report.get("tickers", 0), message=f"{report.get('candidates', 0)} setups, {report.get('qualified', 0)} qualified", ) - logger.info(json.dumps({"event": "job_complete", "job": job_name, "candidates": report.get("candidates")})) + _log_event(logging.INFO, "job_complete", job=job_name, candidates=report.get("candidates")) except Exception as exc: _runtime_finish(job_name, "error", processed=0, total=None, message=str(exc)) - logger.error(json.dumps({ - "event": "job_error", - "job": job_name, - "error_type": type(exc).__name__, - "message": str(exc), - })) + _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc)) # --------------------------------------------------------------------------- @@ -1001,40 +818,26 @@ async def sync_ticker_universe() -> None: Setting key: ticker_universe_default (sp500 | nasdaq100 | nasdaq_all) """ job_name = "ticker_universe_sync" - logger.info(json.dumps({"event": "job_start", "job": job_name})) + _log_event(logging.INFO, "job_start", job=job_name) _runtime_start(job_name, total=1) try: async with async_session_factory() as db: if not await _is_job_enabled(db, job_name): - logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"})) + _log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled") _runtime_finish(job_name, "skipped", processed=0, total=1, message="Disabled") return - result = await db.execute( - select(SystemSetting).where(SystemSetting.key == "ticker_universe_default") - ) - setting = result.scalar_one_or_none() - universe = (setting.value if setting else "sp500").strip().lower() + universe = (await settings_store.get_value(db, "ticker_universe_default", "sp500")).strip().lower() async with async_session_factory() as db: summary = await bootstrap_universe(db, universe, prune_missing=False) _runtime_progress(job_name, processed=1, total=1) _runtime_finish(job_name, "completed", processed=1, total=1, message=f"Synced {universe}") - logger.info(json.dumps({ - "event": "job_complete", - "job": job_name, - "universe": universe, - "summary": summary, - })) + _log_event(logging.INFO, "job_complete", job=job_name, universe=universe, summary=summary) except Exception as exc: _runtime_finish(job_name, "error", processed=0, total=1, message=str(exc)) - logger.error(json.dumps({ - "event": "job_error", - "job": job_name, - "error_type": type(exc).__name__, - "message": str(exc), - })) + _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc)) # --------------------------------------------------------------------------- @@ -1070,10 +873,10 @@ async def _run_pipeline(job_name: str, steps: list[tuple[str, str]]) -> None: Each step respects its own enable flag and manages its own runtime status; a failing step is logged and the pipeline continues with the next one. """ - logger.info(json.dumps({"event": "job_start", "job": job_name})) + _log_event(logging.INFO, "job_start", job=job_name) async with async_session_factory() as db: if not await _is_job_enabled(db, job_name): - logger.info(json.dumps({"event": "job_skipped", "job": job_name, "reason": "disabled"})) + _log_event(logging.INFO, "job_skipped", job=job_name, reason="disabled") _runtime_finish(job_name, "skipped", processed=0, total=0, message="Disabled") return @@ -1091,13 +894,10 @@ async def _run_pipeline(job_name: str, steps: list[tuple[str, str]]) -> None: logger.exception("%s step %s failed", job_name, step_name) done += 1 _runtime_finish(job_name, "completed", processed=done, total=total, message="Pipeline complete") - logger.info(json.dumps({"event": "job_complete", "job": job_name})) + _log_event(logging.INFO, "job_complete", job=job_name) except Exception as exc: _runtime_finish(job_name, "error", processed=done, total=total, message=str(exc)) - logger.error(json.dumps({ - "event": "job_error", "job": job_name, - "error_type": type(exc).__name__, "message": str(exc), - })) + _log_event(logging.ERROR, "job_error", job=job_name, error_type=type(exc).__name__, message=str(exc)) async def run_daily_pipeline() -> None: @@ -1162,19 +962,13 @@ def _cron_trigger(expr: str, timezone: str, fallback_key: str) -> CronTrigger: try: return CronTrigger.from_crontab(expr.strip(), timezone=timezone.strip()) except Exception: - logger.warning(json.dumps({ - "event": "invalid_cron", "expr": expr, "timezone": timezone, - "fallback": SCHEDULE_DEFAULTS[fallback_key], - })) + _log_event(logging.WARNING, "invalid_cron", expr=expr, timezone=timezone, fallback=SCHEDULE_DEFAULTS[fallback_key]) return CronTrigger.from_crontab(SCHEDULE_DEFAULTS[fallback_key], timezone="UTC") async def load_schedule_config(db: AsyncSession) -> dict[str, str]: """Read the cron schedule config from SystemSettings, defaults for any unset.""" - result = await db.execute( - select(SystemSetting).where(SystemSetting.key.in_(list(SCHEDULE_DEFAULTS))) - ) - stored = {s.key: s.value for s in result.scalars().all()} + stored = await settings_store.get_map(db, SCHEDULE_DEFAULTS) return {key: (stored.get(key) or default) for key, default in SCHEDULE_DEFAULTS.items()} @@ -1188,7 +982,7 @@ def reschedule_jobs(schedule_config: dict[str, str]) -> dict[str, str]: expr = schedule_config.get(key) or SCHEDULE_DEFAULTS[key] scheduler.reschedule_job(job_id, trigger=_cron_trigger(expr, tz, key)) applied[job_id] = expr - logger.info(json.dumps({"event": "jobs_rescheduled", "applied": applied, "timezone": tz})) + _log_event(logging.INFO, "jobs_rescheduled", applied=applied, timezone=tz) return applied @@ -1267,19 +1061,10 @@ def configure_scheduler(schedule_config: dict[str, str] | None = None) -> None: replace_existing=True, next_run_time=None, ) - logger.info( - json.dumps({ - "event": "scheduler_configured", - "timezone": tz, - "daily_pipeline": { + _log_event(logging.INFO, "scheduler_configured", timezone=tz, daily_pipeline={ "cron": cfg["schedule_daily_pipeline_cron"], "steps": [name for name, _ in _DAILY_PIPELINE_STEPS], - }, - "intraday_pipeline": { + }, intraday_pipeline={ "cron": cfg["schedule_intraday_pipeline_cron"], "steps": [name for name, _ in _INTRADAY_PIPELINE_STEPS], - }, - "fundamental_collector": {"cron": cfg["schedule_fundamentals_cron"]}, - "independent": ["ticker_universe_sync", "alerts", "backtest"], - }) - ) + }, fundamental_collector={"cron": cfg["schedule_fundamentals_cron"]}, independent=["ticker_universe_sync", "alerts", "backtest"]) diff --git a/app/services/admin_service.py b/app/services/admin_service.py index 8af6b5c..8185d68 100644 --- a/app/services/admin_service.py +++ b/app/services/admin_service.py @@ -17,6 +17,7 @@ from app.models.settings import SystemSetting from app.models.ticker import Ticker from app.models.trade_setup import TradeSetup from app.models.user import User +from app.services import settings_store logger = logging.getLogger(__name__) @@ -126,18 +127,7 @@ async def reset_password(db: AsyncSession, user_id: int, new_password: str) -> U async def toggle_registration(db: AsyncSession, enabled: bool) -> SystemSetting: """Enable or disable user registration via SystemSetting.""" - result = await db.execute( - select(SystemSetting).where(SystemSetting.key == "registration_enabled") - ) - setting = result.scalar_one_or_none() - value = str(enabled).lower() - - if setting is None: - setting = SystemSetting(key="registration_enabled", value=value) - db.add(setting) - else: - setting.value = value - + setting = await settings_store.upsert_setting(db, "registration_enabled", str(enabled).lower()) await db.commit() await db.refresh(setting) return setting @@ -155,17 +145,7 @@ async def list_settings(db: AsyncSession) -> list[SystemSetting]: async def update_setting(db: AsyncSession, key: str, value: str) -> SystemSetting: """Create or update a system setting.""" - result = await db.execute( - select(SystemSetting).where(SystemSetting.key == key) - ) - setting = result.scalar_one_or_none() - - if setting is None: - setting = SystemSetting(key=key, value=value) - db.add(setting) - else: - setting.value = value - + setting = await settings_store.upsert_setting(db, key, value) await db.commit() await db.refresh(setting) return setting @@ -309,10 +289,7 @@ async def update_recommendation_config( async def get_ticker_universe_default(db: AsyncSession) -> dict[str, str]: - result = await db.execute( - select(SystemSetting).where(SystemSetting.key == "ticker_universe_default") - ) - setting = result.scalar_one_or_none() + setting = await settings_store.get_setting(db, "ticker_universe_default") universe = setting.value if setting else DEFAULT_TICKER_UNIVERSE if universe not in SUPPORTED_TICKER_UNIVERSES: universe = DEFAULT_TICKER_UNIVERSE @@ -579,11 +556,7 @@ async def list_jobs(db: AsyncSession) -> list[dict]: jobs_out = [] for name in sorted(VALID_JOB_NAMES): # Check enabled setting - key = f"job_{name}_enabled" - result = await db.execute( - select(SystemSetting).where(SystemSetting.key == key) - ) - setting = result.scalar_one_or_none() + setting = await settings_store.get_setting(db, f"job_{name}_enabled") enabled = setting.value == "true" if setting else True # default enabled # Get scheduler job info diff --git a/app/services/alert_service.py b/app/services/alert_service.py index 54c411e..9b06e60 100644 --- a/app/services/alert_service.py +++ b/app/services/alert_service.py @@ -27,10 +27,10 @@ from app.config import settings from app.models.alert import AlertLog from app.models.ohlcv import OHLCVRecord from app.models.score import CompositeScore -from app.models.settings import SystemSetting from app.models.sr_level import SRLevel from app.models.ticker import Ticker from app.models.watchlist import WatchlistEntry +from app.services import settings_store from app.services.admin_service import get_activation_config, update_setting from app.services.qualification import best_target_probability, setup_qualifies from app.services.rr_scanner_service import get_trade_setups @@ -72,14 +72,9 @@ def _as_bool(value: str | None, default: bool) -> bool: return value.strip().lower() == "true" -async def _settings_map(db: AsyncSession) -> dict[str, str]: - keys = [KEY_ENABLED, KEY_TOKEN, KEY_CHAT_ID, KEY_QUALIFIED, KEY_SR, KEY_SCORE_DROP, KEY_DIGEST] - result = await db.execute(select(SystemSetting).where(SystemSetting.key.in_(keys))) - return {s.key: s.value for s in result.scalars().all()} - - async def _resolve(db: AsyncSession) -> dict: - stored = await _settings_map(db) + keys = [KEY_ENABLED, KEY_TOKEN, KEY_CHAT_ID, KEY_QUALIFIED, KEY_SR, KEY_SCORE_DROP, KEY_DIGEST] + stored = await settings_store.get_map(db, keys) db_token = (stored.get(KEY_TOKEN) or "").strip() if db_token: diff --git a/app/services/auth_service.py b/app/services/auth_service.py index d0b0090..d55b51b 100644 --- a/app/services/auth_service.py +++ b/app/services/auth_service.py @@ -10,8 +10,8 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.config import settings from app.dependencies import JWT_ALGORITHM from app.exceptions import AuthenticationError, AuthorizationError, DuplicateError -from app.models.settings import SystemSetting from app.models.user import User +from app.services import settings_store async def register(db: AsyncSession, username: str, password: str) -> User: @@ -21,10 +21,7 @@ async def register(db: AsyncSession, username: str, password: str) -> User: and creates a user with role='user' and has_access=False. """ # Check registration toggle - result = await db.execute( - select(SystemSetting).where(SystemSetting.key == "registration_enabled") - ) - setting = result.scalar_one_or_none() + setting = await settings_store.get_setting(db, "registration_enabled") if setting is not None and setting.value.lower() == "false": raise AuthorizationError("Registration is closed") diff --git a/app/services/backtest_service.py b/app/services/backtest_service.py index 89251e3..07fd2bc 100644 --- a/app/services/backtest_service.py +++ b/app/services/backtest_service.py @@ -31,8 +31,8 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.config import settings -from app.models.settings import SystemSetting from app.models.ticker import Ticker +from app.services import settings_store from app.services.admin_service import get_activation_config, update_setting from app.services.indicator_service import _extract_ohlcv, compute_atr from app.services.outcome_service import ( @@ -741,8 +741,7 @@ async def run_and_store( async def get_backtest_report(db: AsyncSession) -> dict | None: """Return the last cached backtest report, or None if never run.""" - result = await db.execute(select(SystemSetting).where(SystemSetting.key == KEY_REPORT)) - setting = result.scalar_one_or_none() + setting = await settings_store.get_setting(db, KEY_REPORT) if setting is None: return None try: diff --git a/app/services/market_regime_service.py b/app/services/market_regime_service.py index 88122b7..271e71b 100644 --- a/app/services/market_regime_service.py +++ b/app/services/market_regime_service.py @@ -18,9 +18,8 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.config import settings from app.providers.alpaca import AlpacaOHLCVProvider +from app.services import settings_store from app.services.admin_service import update_setting -from app.models.settings import SystemSetting -from sqlalchemy import select logger = logging.getLogger(__name__) @@ -105,8 +104,7 @@ async def update_market_regime(db: AsyncSession) -> dict: async def get_market_regime(db: AsyncSession) -> dict: """Return the cached regime (computed by the daily job).""" - result = await db.execute(select(SystemSetting).where(SystemSetting.key == KEY_REGIME)) - setting = result.scalar_one_or_none() + setting = await settings_store.get_setting(db, KEY_REGIME) if setting is None: return {"label": "unknown", "benchmark": BENCHMARK, "reason": "not computed yet"} try: diff --git a/app/services/scoring_service.py b/app/services/scoring_service.py index 507176d..a238fa6 100644 --- a/app/services/scoring_service.py +++ b/app/services/scoring_service.py @@ -10,6 +10,7 @@ from __future__ import annotations import json import logging +from collections import defaultdict from datetime import datetime, timezone from sqlalchemy import select @@ -17,8 +18,8 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.exceptions import NotFoundError, ValidationError from app.models.score import CompositeScore, DimensionScore -from app.models.settings import SystemSetting from app.models.ticker import Ticker +from app.services import settings_store logger = logging.getLogger(__name__) @@ -50,10 +51,7 @@ async def _get_ticker(db: AsyncSession, symbol: str) -> Ticker: async def _get_weights(db: AsyncSession) -> dict[str, float]: """Load scoring weights from SystemSetting, falling back to defaults.""" - result = await db.execute( - select(SystemSetting).where(SystemSetting.key == SCORING_WEIGHTS_KEY) - ) - setting = result.scalar_one_or_none() + setting = await settings_store.get_setting(db, SCORING_WEIGHTS_KEY) if setting is not None: try: return json.loads(setting.value) @@ -64,21 +62,7 @@ async def _get_weights(db: AsyncSession) -> dict[str, float]: async def _save_weights(db: AsyncSession, weights: dict[str, float]) -> None: """Persist scoring weights to SystemSetting.""" - result = await db.execute( - select(SystemSetting).where(SystemSetting.key == SCORING_WEIGHTS_KEY) - ) - setting = result.scalar_one_or_none() - now = datetime.now(timezone.utc) - if setting is not None: - setting.value = json.dumps(weights) - setting.updated_at = now - else: - setting = SystemSetting( - key=SCORING_WEIGHTS_KEY, - value=json.dumps(weights), - updated_at=now, - ) - db.add(setting) + await settings_store.upsert_setting(db, SCORING_WEIGHTS_KEY, json.dumps(weights)) # --------------------------------------------------------------------------- @@ -875,73 +859,62 @@ async def get_rankings(db: AsyncSession) -> dict: Returns dict suitable for RankingResponse. """ weights = await _get_weights(db) + tickers = (await db.execute(select(Ticker).order_by(Ticker.symbol))).scalars().all() - # Get all tickers - result = await db.execute(select(Ticker).order_by(Ticker.symbol)) - tickers = list(result.scalars().all()) - - rankings: list[dict] = [] - for ticker in tickers: - # Get composite score - comp_result = await db.execute( - select(CompositeScore).where(CompositeScore.ticker_id == ticker.id) + async def _load_scores() -> tuple[dict[int, CompositeScore], dict[int, dict[str, DimensionScore]]]: + comps = { + c.ticker_id: c + for c in (await db.execute(select(CompositeScore))).scalars().all() + } + dims: dict[int, dict[str, DimensionScore]] = defaultdict(dict) + rows = await db.execute( + select(DimensionScore).order_by(DimensionScore.ticker_id, DimensionScore.id) ) - comp = comp_result.scalar_one_or_none() + for ds in rows.scalars().all(): + dims[ds.ticker_id][ds.dimension] = ds + return comps, dims - # If no composite or stale, recompute + # Two bulk reads instead of ~4 queries per ticker. + comps, dims_by_ticker = await _load_scores() + + # Lazily recompute any stale/missing scores (kept fresh by the daily scan; + # this self-heals tickers that aged out between scans), committing once. + recomputed = False + for ticker in tickers: + comp = comps.get(ticker.id) if comp is None or comp.is_stale: - # Recompute stale dimensions first - dim_result = await db.execute( - select(DimensionScore).where( - DimensionScore.ticker_id == ticker.id - ) - ) - dim_scores = {ds.dimension: ds for ds in dim_result.scalars().all()} + dim_scores = dims_by_ticker.get(ticker.id, {}) for dim in DIMENSIONS: ds = dim_scores.get(dim) if ds is None or ds.is_stale: await compute_dimension_score(db, ticker.symbol, dim) - await compute_composite_score(db, ticker.symbol, weights) + recomputed = True + if recomputed: await db.commit() + comps, dims_by_ticker = await _load_scores() - # Re-fetch - comp_result = await db.execute( - select(CompositeScore).where(CompositeScore.ticker_id == ticker.id) - ) - comp = comp_result.scalar_one_or_none() - if comp is None: - continue - - dim_result = await db.execute( - select(DimensionScore).where( - DimensionScore.ticker_id == ticker.id - ) - ) - dims = [ - { - "dimension": ds.dimension, - "score": ds.score, - "is_stale": ds.is_stale, - "computed_at": ds.computed_at, - } - for ds in dim_result.scalars().all() - ] - - rankings.append({ + rankings = [ + { "symbol": ticker.symbol, "composite_score": comp.score, - "dimensions": dims, - }) + "dimensions": [ + { + "dimension": ds.dimension, + "score": ds.score, + "is_stale": ds.is_stale, + "computed_at": ds.computed_at, + } + for ds in dims_by_ticker.get(ticker.id, {}).values() + ], + } + for ticker in tickers + if (comp := comps.get(ticker.id)) is not None + ] - # Sort by composite score descending rankings.sort(key=lambda r: r["composite_score"], reverse=True) - - return { - "rankings": rankings, - "weights": weights, - } + return {"rankings": rankings, "weights": weights} async def update_weights( diff --git a/app/services/sentiment_provider_service.py b/app/services/sentiment_provider_service.py index a8181b6..25e8d38 100644 --- a/app/services/sentiment_provider_service.py +++ b/app/services/sentiment_provider_service.py @@ -12,12 +12,11 @@ from __future__ import annotations import logging -from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.config import settings from app.exceptions import ProviderError, ValidationError -from app.models.settings import SystemSetting +from app.services import settings_store from app.services.admin_service import update_setting logger = logging.getLogger(__name__) @@ -53,15 +52,6 @@ KEY_API_KEY = "sentiment_api_key" KEY_BASE_URL = "sentiment_base_url" -async def _get_settings_map(db: AsyncSession) -> dict[str, str]: - result = await db.execute( - select(SystemSetting).where( - SystemSetting.key.in_([KEY_PROVIDER, KEY_MODEL, KEY_API_KEY, KEY_BASE_URL]) - ) - ) - return {s.key: s.value for s in result.scalars().all()} - - def _env_key_for(provider: str) -> str: if provider == "openai": return settings.openai_api_key or "" @@ -90,7 +80,7 @@ def _base_url_for(provider: str, stored_base_url: str) -> str: async def _resolve(db: AsyncSession) -> dict: """Resolve effective config from DB > env > default.""" - stored = await _get_settings_map(db) + stored = await settings_store.get_map(db, [KEY_PROVIDER, KEY_MODEL, KEY_API_KEY, KEY_BASE_URL]) provider = (stored.get(KEY_PROVIDER) or "").strip().lower() if provider not in VALID_PROVIDERS: diff --git a/app/services/settings_store.py b/app/services/settings_store.py new file mode 100644 index 0000000..97411c4 --- /dev/null +++ b/app/services/settings_store.py @@ -0,0 +1,46 @@ +"""Single source for SystemSetting reads/writes. + +Services used to hand-roll ``select(SystemSetting).where(key == ...)`` + +``scalar_one_or_none`` (plus a near-identical get-or-create upsert) in a dozen +places. These helpers centralise that. ``upsert_setting`` never commits — the +caller owns the transaction. ``updated_at`` is managed by the model's +``onupdate`` hook, so callers don't set it. +""" + +from __future__ import annotations + +from collections.abc import Iterable + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.settings import SystemSetting + + +async def get_setting(db: AsyncSession, key: str) -> SystemSetting | None: + """Return the SystemSetting row for ``key``, or None if unset.""" + result = await db.execute(select(SystemSetting).where(SystemSetting.key == key)) + return result.scalar_one_or_none() + + +async def get_value(db: AsyncSession, key: str, default: str | None = None) -> str | None: + """Return the stored value for ``key``, or ``default`` if unset.""" + setting = await get_setting(db, key) + return setting.value if setting is not None else default + + +async def get_map(db: AsyncSession, keys: Iterable[str]) -> dict[str, str]: + """Return a {key: value} map for the given keys that exist.""" + result = await db.execute(select(SystemSetting).where(SystemSetting.key.in_(list(keys)))) + return {s.key: s.value for s in result.scalars().all()} + + +async def upsert_setting(db: AsyncSession, key: str, value: str) -> SystemSetting: + """Create or update a setting. Does NOT commit; caller controls the transaction.""" + setting = await get_setting(db, key) + if setting is None: + setting = SystemSetting(key=key, value=value) + db.add(setting) + else: + setting.value = value + return setting diff --git a/app/services/ticker_universe_service.py b/app/services/ticker_universe_service.py index 702c87b..f8ef731 100644 --- a/app/services/ticker_universe_service.py +++ b/app/services/ticker_universe_service.py @@ -20,8 +20,8 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.config import settings from app.exceptions import ProviderError, ValidationError -from app.models.settings import SystemSetting from app.models.ticker import Ticker +from app.services import settings_store logger = logging.getLogger(__name__) @@ -268,8 +268,7 @@ async def _fetch_universe_symbols_from_public(universe: str) -> tuple[list[str], async def _read_cached_symbols(db: AsyncSession, universe: str) -> list[str]: key = f"ticker_universe_cache_{universe}" - result = await db.execute(select(SystemSetting).where(SystemSetting.key == key)) - setting = result.scalar_one_or_none() + setting = await settings_store.get_setting(db, key) if setting is None: return [] @@ -304,15 +303,7 @@ async def _write_cached_symbols( "updated_at": datetime.now(timezone.utc).isoformat(), } - result = await db.execute(select(SystemSetting).where(SystemSetting.key == key)) - setting = result.scalar_one_or_none() - value = json.dumps(payload) - - if setting is None: - db.add(SystemSetting(key=key, value=value)) - else: - setting.value = value - + await settings_store.upsert_setting(db, key, json.dumps(payload)) await db.commit() diff --git a/tests/unit/test_scoring_service_rankings.py b/tests/unit/test_scoring_service_rankings.py new file mode 100644 index 0000000..1d78137 --- /dev/null +++ b/tests/unit/test_scoring_service_rankings.py @@ -0,0 +1,129 @@ +"""Unit tests for get_rankings: bulk-load fast path, sorting, exclusion, and +lazy recompute of stale scores.""" + +from __future__ import annotations + +from datetime import datetime, timezone +from unittest.mock import AsyncMock, patch + +import pytest +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine + +from app.database import Base +from app.models.score import CompositeScore, DimensionScore +from app.models.ticker import Ticker +from app.services.scoring_service import get_rankings + +TEST_DATABASE_URL = "sqlite+aiosqlite://" + + +@pytest.fixture +async def fresh_db(): + """Non-transactional session so get_rankings can commit recomputes.""" + engine = create_async_engine(TEST_DATABASE_URL, echo=False) + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + async with session_factory() as session: + yield session + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.drop_all) + await engine.dispose() + + +async def _seed_ticker(session: AsyncSession, symbol: str) -> Ticker: + ticker = Ticker(symbol=symbol) + session.add(ticker) + await session.commit() + await session.refresh(ticker) + return ticker + + +def _composite(ticker_id: int, score: float, *, stale: bool = False) -> CompositeScore: + return CompositeScore( + ticker_id=ticker_id, score=score, is_stale=stale, + weights_json="{}", computed_at=datetime.now(timezone.utc), + ) + + +def _dimension(ticker_id: int, dimension: str, score: float) -> DimensionScore: + return DimensionScore( + ticker_id=ticker_id, dimension=dimension, score=score, + is_stale=False, computed_at=datetime.now(timezone.utc), + ) + + +@pytest.mark.asyncio +async def test_fast_path_sorts_and_does_not_recompute(fresh_db: AsyncSession): + """All composites fresh: result is sorted desc and no recompute is triggered + (the common steady-state path after the daily scan).""" + low = await _seed_ticker(fresh_db, "LOW") + high = await _seed_ticker(fresh_db, "HIGH") + + fresh_db.add_all([ + _composite(low.id, 40.0), + _composite(high.id, 90.0), + _dimension(high.id, "technical", 88.0), + _dimension(low.id, "technical", 42.0), + ]) + await fresh_db.commit() + + # If the fast path tries to recompute, these blow up. + with patch("app.services.scoring_service.compute_dimension_score", + new=AsyncMock(side_effect=AssertionError("should not recompute"))), \ + patch("app.services.scoring_service.compute_composite_score", + new=AsyncMock(side_effect=AssertionError("should not recompute"))): + result = await get_rankings(fresh_db) + + symbols = [r["symbol"] for r in result["rankings"]] + assert symbols == ["HIGH", "LOW"] # sorted desc + assert result["rankings"][0]["composite_score"] == 90.0 + assert result["rankings"][0]["dimensions"][0]["dimension"] == "technical" + + +@pytest.mark.asyncio +async def test_ticker_without_computable_composite_is_excluded(fresh_db: AsyncSession): + """A ticker whose composite can't be computed (recompute yields no row) is + omitted from the rankings rather than appearing with a null score.""" + fresh = await _seed_ticker(fresh_db, "OK") + await _seed_ticker(fresh_db, "NONE") # no composite; recompute can't make one + fresh_db.add_all([_composite(fresh.id, 50.0), _dimension(fresh.id, "technical", 50.0)]) + await fresh_db.commit() + + # Recompute is a no-op that produces no composite row for NONE. + with patch("app.services.scoring_service.compute_dimension_score", + new=AsyncMock(return_value=None)), \ + patch("app.services.scoring_service.compute_composite_score", + new=AsyncMock(return_value=(None, ["technical"]))): + result = await get_rankings(fresh_db) + + assert [r["symbol"] for r in result["rankings"]] == ["OK"] + + +@pytest.mark.asyncio +async def test_stale_composite_is_recomputed(fresh_db: AsyncSession): + """A stale composite triggers a recompute and then appears in the rankings.""" + ticker = await _seed_ticker(fresh_db, "STALE") + fresh_db.add(_composite(ticker.id, 10.0, stale=True)) + await fresh_db.commit() + + async def _fake_recompute(db, symbol, weights=None): + # Mirror the real upsert: refresh the existing row in place. + existing = (await db.execute( + select(CompositeScore).where(CompositeScore.ticker_id == ticker.id) + )).scalar_one() + existing.score = 77.0 + existing.is_stale = False + return 77.0, [] + + # Dimension recompute is a no-op; composite recompute refreshes the score. + with patch("app.services.scoring_service.compute_dimension_score", + new=AsyncMock(return_value=55.0)), \ + patch("app.services.scoring_service.compute_composite_score", + new=AsyncMock(side_effect=_fake_recompute)) as comp_mock: + result = await get_rankings(fresh_db) + + comp_mock.assert_awaited() # recompute path was taken + assert [r["symbol"] for r in result["rankings"]] == ["STALE"] + assert result["rankings"][0]["composite_score"] == 77.0 # reflects the recompute