Compare commits

..

7 Commits

Author SHA1 Message Date
dennisthiessen 66ef0564c1 Add local backtest snapshot runner
Deploy / lint (push) Successful in 8s
Deploy / test (push) Successful in 1m22s
Deploy / deploy (push) Successful in 43s
2026-07-03 18:35:07 +02:00
dennisthiessen 14327ab25a Require aligned action for qualified setups
Deploy / lint (push) Successful in 7s
Deploy / test (push) Successful in 1m7s
Deploy / deploy (push) Successful in 39s
2026-07-03 16:13:27 +02:00
dennisthiessen eaad935a2a Bundle signal alert notifications
Deploy / lint (push) Successful in 7s
Deploy / test (push) Successful in 1m4s
Deploy / deploy (push) Successful in 35s
2026-07-03 13:32:59 +02:00
dennisthiessen d4ccea2d69 Normalize persisted test timestamps
Deploy / lint (push) Successful in 9s
Deploy / test (push) Successful in 1m12s
Deploy / deploy (push) Successful in 37s
2026-07-03 13:01:45 +02:00
dennisthiessen 8c36cfcef1 Make live signal reads non-mutating
Deploy / lint (push) Successful in 6s
Deploy / test (push) Failing after 48s
Deploy / deploy (push) Has been skipped
2026-07-03 10:09:46 +02:00
dennisthiessen ac51e23949 Serve live recommendation context on trade setup APIs and alerts
Deploy / lint (push) Successful in 6s
Deploy / test (push) Successful in 1m0s
Deploy / deploy (push) Successful in 32s
Stored TradeSetup rows are point-in-time snapshots from the RR scan, so
the ticker page could show stale confidence/reasoning/composite (e.g.
sentiment=neutral in the setup card while the sentiment panel showed
bullish). Overlay current score/sentiment context onto the API payload
for GET /trades and GET /trades/{symbol}, gate and format Telegram
qualified-setup alerts on the same live values, and apply the
min_confidence/recommended_action filters after the overlay so they
judge what the caller actually sees. Stored setups stay frozen for
outcome analysis and backtests.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-03 09:17:27 +02:00
dennisthiessen 2b0068ae08 Add volume pane to ticker chart
Deploy / lint (push) Successful in 6s
Deploy / test (push) Successful in 58s
Deploy / deploy (push) Successful in 32s
2026-07-03 08:09:27 +02:00
26 changed files with 1508 additions and 367 deletions
+4
View File
@@ -33,3 +33,7 @@ alembic/versions/__pycache__/
# Generated SSL bundle
combined-ca-bundle.pem
# Local research artifacts
backtest_snapshots/
reports/backtest-*.json
+44
View File
@@ -247,6 +247,50 @@ cd frontend
npm run build
```
### Local Backtest Snapshots
For research loops, run the production backtest locally from a SQLite snapshot
instead of deploying and clicking the Admin job. The snapshot contains only the
tables needed by `run_backtest`: tickers, OHLCV bars, SPY benchmark closes, and
activation/recommendation settings. Secrets and cached reports are not copied.
1. Open an SSH tunnel to the production Postgres instance:
```bash
ssh -N -L 15432:127.0.0.1:5432 deploy@your-server
```
2. In another terminal, create or refresh the snapshot:
```bash
# macOS/Linux
python scripts/create_backtest_snapshot.py \
--database-url "postgresql+asyncpg://stock_backend:PASSWORD@127.0.0.1:15432/stock_data_backend" \
--output backtest_snapshots/prod.sqlite \
--force
# Windows PowerShell
.venv\Scripts\python.exe scripts\create_backtest_snapshot.py `
--database-url "postgresql+asyncpg://stock_backend:PASSWORD@127.0.0.1:15432/stock_data_backend" `
--output backtest_snapshots\prod.sqlite `
--force
```
3. Run the backtest fully offline from the snapshot:
```bash
# macOS/Linux
python scripts/run_backtest_snapshot.py backtest_snapshots/prod.sqlite --workers 8
# Windows PowerShell
.venv\Scripts\python.exe scripts\run_backtest_snapshot.py backtest_snapshots\prod.sqlite --workers 12 --allow-spawn
```
The runner writes `reports/backtest-<timestamp>.json` and prints the headline
metrics. Keep the SSH tunnel open only while creating the snapshot; the backtest
run itself is local/offline. `backtest_snapshots/` and generated backtest reports
are git-ignored.
## Environment Variables
Configure in `.env` (copy from `.env.example`):
+2 -1
View File
@@ -54,7 +54,7 @@ async def read_score(
_user=Depends(require_access),
db: AsyncSession = Depends(get_db),
) -> APIEnvelope:
"""Get composite + dimension scores for a symbol. Recomputes stale scores."""
"""Get the latest persisted composite + dimension scores for a symbol."""
result = await get_score(db, symbol)
data = ScoreResponse(
@@ -94,6 +94,7 @@ async def read_rankings(
RankingEntry(
symbol=r["symbol"],
composite_score=r["composite_score"],
composite_stale=r.get("composite_stale", False),
dimensions=[
DimensionScoreResponse(**d) for d in r["dimensions"]
],
+6 -1
View File
@@ -34,6 +34,7 @@ async def list_trade_setups(
direction=direction,
min_confidence=min_confidence,
recommended_action=recommended_action,
live_recommendation=True,
)
data = []
@@ -92,7 +93,11 @@ async def get_ticker_trade_setups(
_user=Depends(require_access),
db: AsyncSession = Depends(get_db),
) -> APIEnvelope:
rows = await get_trade_setups(db, symbol=symbol)
rows = await get_trade_setups(
db,
symbol=symbol,
live_recommendation=True,
)
data = []
for row in rows:
summary = RecommendationSummaryResponse(
+1
View File
@@ -78,6 +78,7 @@ class RankingEntry(BaseModel):
symbol: str
composite_score: float
composite_stale: bool = False
dimensions: list[DimensionScoreResponse] = []
+9
View File
@@ -26,6 +26,14 @@ class RecommendationSummaryResponse(BaseModel):
composite_score: float
class TradeSetupContextAsOfResponse(BaseModel):
setup_detected_at: datetime
score_computed_at: datetime | None = None
sentiment_at: datetime | None = None
price_date: date | None = None
price_updated_at: datetime | None = None
class TradeSetupResponse(BaseModel):
"""A single trade setup detected by the R:R scanner."""
@@ -49,4 +57,5 @@ class TradeSetupResponse(BaseModel):
evaluated_at: datetime | None = None
current_price: float | None = None
momentum_percentile: float | None = None
context_as_of: TradeSetupContextAsOfResponse | None = None
recommendation_summary: RecommendationSummaryResponse | None = None
+123 -22
View File
@@ -75,6 +75,13 @@ COOLDOWN_HOURS = 72 # don't re-send the same key within this window
DIGEST_HOUR_UTC = 22 # send the daily digest on the first run at/after this hour
WATERMARK_TYPE = "score_watermark"
SIGNAL_BUNDLE_ALERT_TYPES = ("qualified", "sr_proximity", "score_drop")
SIGNAL_BUNDLE_SECTIONS = (
("qualified", "Qualified setups"),
("sr_proximity", "Near support/resistance"),
("score_drop", "Score drops"),
)
SIGNAL_BUNDLE_MAX_CHARS = 3900 # Telegram limit is 4096; keep room for HTML parsing
# Regime quadrant-change alert: (regime index x early-warning) quadrant.
# Hysteresis (a deadband around each divider) stops a point sitting on a boundary
@@ -91,6 +98,9 @@ QUAD_LABELS = {
"4": "④ Real downturn",
}
AlertItem = tuple[str, str, str] # alert_type, dedup_key, text
AlertLogRef = tuple[str, str] # alert_type, dedup_key
def _as_bool(value: str | None, default: bool) -> bool:
if value is None:
@@ -254,19 +264,37 @@ async def _watchlist_tickers(db: AsyncSession) -> list[tuple[int, str]]:
async def _qualified_setups(db: AsyncSession) -> list[dict]:
setups = await get_trade_setups(db)
# live_recommendation: gate and format on current score/sentiment context,
# not the values frozen into the setup at scan time.
setups = await get_trade_setups(db, live_recommendation=True)
config = await get_activation_config(db)
return [s for s in setups if setup_qualifies(SimpleNamespace(**s), config)]
def _fmt_price(value: float | int | None) -> str:
return "n/a" if value is None else f"{float(value):.2f}"
def _fmt_signed_move(from_price: float | int | None, to_price: float | int | None) -> str:
if from_price is None or to_price is None:
return "n/a"
from_float = float(from_price)
if from_float == 0:
return "n/a"
pct = (float(to_price) - from_float) / from_float * 100.0
return f"{pct:+.1f}%"
def _format_qualified(s: dict) -> str:
prob = best_target_probability(SimpleNamespace(**s))
arrow = "🟢" if s["direction"] == "long" else "🔴"
current = s.get("current_price") or s.get("entry_price")
return (
f"{arrow} <b>{s['symbol']} {s['direction'].upper()}</b> — qualified setup\n"
f"entry {s['entry_price']:.2f} → target {s['target']:.2f} "
f"(R:R {s['rr_ratio']:.1f}:1)\n"
f"confidence {(s.get('confidence_score') or 0):.0f}% · P(target) {prob:.0f}%"
f"{arrow} <b>{s['symbol']} {s['direction'].upper()}</b> | "
f"now {_fmt_price(current)} | entry {_fmt_price(s['entry_price'])} | "
f"target {_fmt_price(s['target'])} ({_fmt_signed_move(current, s['target'])}) | "
f"stop {_fmt_price(s['stop_loss'])} | R:R {s['rr_ratio']:.1f} | "
f"conf {(s.get('confidence_score') or 0):.0f}% | P(target) {prob:.0f}%"
)
@@ -289,6 +317,34 @@ async def _latest_close(db: AsyncSession, ticker_id: int) -> float | None:
return float(row[0]) if row else None
def _sr_zone_label(zone: dict) -> str:
return (
f"{zone['low']:.2f}{zone['high']:.2f}"
if zone["level_count"] > 1
else f"{zone['midpoint']:.2f}"
)
def _sr_touch_price(zone: dict, current_price: float) -> float:
low = float(zone["low"])
high = float(zone["high"])
if current_price < low:
return low
if current_price > high:
return high
return float(zone["midpoint"])
def _format_sr_proximity(symbol: str, zone: dict, current_price: float) -> str:
touch_price = _sr_touch_price(zone, current_price)
return (
f"📍 <b>{symbol}</b> {zone['type']} | "
f"now {_fmt_price(current_price)} -> {_sr_zone_label(zone)} "
f"({_fmt_signed_move(current_price, touch_price)}) | "
f"strength {float(zone['strength']):.0f}"
)
async def _collect_sr_proximity(db: AsyncSession) -> list[tuple[str, str]]:
"""One alert per watchlist ticker for the NEAREST strong S/R zone within range.
@@ -318,21 +374,12 @@ async def _collect_sr_proximity(db: AsyncSession) -> list[tuple[str, str]]:
# Nearest strong zone only.
nearest = min(strong, key=lambda z: abs(price - z["midpoint"]))
dist_pct = abs(price - nearest["midpoint"]) / price * 100
dist_pct = abs(price - _sr_touch_price(nearest, price)) / price * 100
if dist_pct > SR_PROXIMITY_PCT:
continue
label = (
f"{nearest['low']:.2f}{nearest['high']:.2f}"
if nearest["level_count"] > 1
else f"{nearest['midpoint']:.2f}"
)
key = f"sr:{symbol}:{nearest['type']}" # one per side per ticker per cooldown
out.append((
key,
f"📍 <b>{symbol}</b> approaching {nearest['type']} {label} "
f"(now {price:.2f}, {dist_pct:.1f}% away)",
))
out.append((key, _format_sr_proximity(symbol, nearest, price)))
return out
@@ -550,6 +597,49 @@ async def _collect_regime_quadrant(db: AsyncSession) -> list[tuple[str, str]]:
# Dispatch
# ---------------------------------------------------------------------------
def _signal_bundle_messages(items: list[AlertItem]) -> list[tuple[list[AlertLogRef], str]]:
if not items:
return []
by_type: dict[str, list[AlertItem]] = {key: [] for key in SIGNAL_BUNDLE_ALERT_TYPES}
for item in items:
by_type.setdefault(item[0], []).append(item)
total = sum(len(group) for group in by_type.values())
header = f"📣 <b>Signal run</b> — {total} new alert(s)"
bundles: list[tuple[list[AlertLogRef], str]] = []
lines = [header]
logs: list[AlertLogRef] = []
current_section: str | None = None
def flush() -> None:
nonlocal lines, logs, current_section
if logs:
bundles.append((logs.copy(), "\n".join(lines)))
lines = [f"{header} (continued)"]
logs = []
current_section = None
for alert_type, section_title in SIGNAL_BUNDLE_SECTIONS:
for item_type, key, text in by_type.get(alert_type, []):
block: list[str] = []
if current_section != alert_type:
block.extend(["", f"<b>{section_title}</b>"])
block.append(text)
if logs and len("\n".join(lines + block)) > SIGNAL_BUNDLE_MAX_CHARS:
flush()
block = ["", f"<b>{section_title}</b>", text]
lines.extend(block)
logs.append((item_type, key))
current_section = alert_type
if logs:
bundles.append((logs.copy(), "\n".join(lines)))
return bundles
async def dispatch_alerts(db: AsyncSession) -> dict:
"""Gather all enabled triggers, dedup, and push to Telegram. Job entrypoint."""
cfg = await _resolve(db)
@@ -558,22 +648,23 @@ async def dispatch_alerts(db: AsyncSession) -> dict:
if not cfg["token"] or not cfg["chat_id"]:
return {"status": "no_credentials", "sent": 0}
outgoing: list[tuple[str, str, str]] = [] # (alert_type, key, text)
signal_outgoing: list[AlertItem] = []
outgoing: list[AlertItem] = []
if cfg["qualified"]:
for key, text in await _collect_qualified(db):
if not await _recently_alerted(db, "qualified", key):
outgoing.append(("qualified", key, text))
signal_outgoing.append(("qualified", key, text))
if cfg["sr"]:
for key, text in await _collect_sr_proximity(db):
if not await _recently_alerted(db, "sr_proximity", key):
outgoing.append(("sr_proximity", key, text))
signal_outgoing.append(("sr_proximity", key, text))
if cfg["score_drop"]:
# also seeds/advances watermarks as a side effect
for key, text in await _collect_score_drops(db):
outgoing.append(("score_drop", key, text))
signal_outgoing.append(("score_drop", key, text))
if cfg["digest"]:
digest = await _collect_digest(db)
@@ -591,8 +682,18 @@ async def dispatch_alerts(db: AsyncSession) -> dict:
outgoing.append((TRADE_CLOSED_TYPE, key, text))
sent = 0
if outgoing:
candidates = len(signal_outgoing) + len(outgoing)
if signal_outgoing or outgoing:
async with httpx.AsyncClient(timeout=15) as client:
for log_refs, text in _signal_bundle_messages(signal_outgoing):
try:
await _send(client, cfg["token"], cfg["chat_id"], text)
for alert_type, key in log_refs:
_log_alert(db, alert_type, key)
sent += 1
except Exception:
logger.exception("Failed to send signal alert bundle")
for alert_type, key, text in outgoing:
try:
await _send(client, cfg["token"], cfg["chat_id"], text)
@@ -602,7 +703,7 @@ async def dispatch_alerts(db: AsyncSession) -> dict:
logger.exception("Failed to send alert %s", key)
await db.commit() # persist watermark seeds/advances and sent-logs
return {"status": "ok", "sent": sent, "candidates": len(outgoing)}
return {"status": "ok", "sent": sent, "candidates": candidates}
async def send_test_alert(db: AsyncSession) -> dict:
+36 -13
View File
@@ -54,6 +54,7 @@ from app.services.outcome_service import (
from app.services.price_service import query_ohlcv
from app.services.qualification import (
HIGH_CONVICTION_ACTIONS,
_action_direction,
best_target_probability,
setup_qualifies,
)
@@ -790,6 +791,23 @@ def _backtest_worker_count() -> int:
return max(1, min(configured, cpu - 1))
def _offline_snapshot_mode() -> bool:
return os.getenv("BACKTEST_SNAPSHOT_OFFLINE", "").strip().lower() in {"1", "true", "yes", "on"}
async def _load_benchmark_closes_for_backtest(
db: AsyncSession, *, days: int | None = None, refresh: bool = True
) -> dict[date, float]:
from app.services.benchmark_service import load_benchmark_closes, refresh_benchmark_prices
if refresh and not _offline_snapshot_mode():
if days is None:
await refresh_benchmark_prices(db)
else:
await refresh_benchmark_prices(db, days=days)
return await load_benchmark_closes(db)
def _mp_context():
"""A start method safe to use from the threaded asyncio server: ``forkserver``
(workers forked from a clean, single-threaded server — avoids the
@@ -799,6 +817,12 @@ def _mp_context():
for method in ("forkserver", "fork"):
if method in methods:
return multiprocessing.get_context(method)
if (
_offline_snapshot_mode()
and os.getenv("BACKTEST_ALLOW_SPAWN", "").strip().lower() in {"1", "true", "yes", "on"}
and "spawn" in methods
):
return multiprocessing.get_context("spawn")
return None
@@ -917,7 +941,10 @@ def _gate_ablation(candidates: list[dict], activation: dict, threshold: float) -
return (c["confidence"] or 0.0) >= min_conf
def neutral_ok(c: dict) -> bool:
return not exclude_neutral or (c.get("action") or "NEUTRAL") != "NEUTRAL"
if not exclude_neutral:
return True
action_direction = _action_direction(c.get("action"))
return action_direction != "neutral" and action_direction == c["direction"]
def tighteners_ok(c: dict) -> bool:
if require_high and (c.get("action") or "") not in HIGH_CONVICTION_ACTIONS:
@@ -1564,10 +1591,9 @@ async def run_backtest(
# emitted and the rest of the report remains valid.
benchmark_closes: dict[date, float] = {}
try:
from app.services.benchmark_service import load_benchmark_closes, refresh_benchmark_prices
await refresh_benchmark_prices(db, days=settings.ohlcv_history_days + 365)
benchmark_closes = await load_benchmark_closes(db)
benchmark_closes = await _load_benchmark_closes_for_backtest(
db, days=settings.ohlcv_history_days + 365
)
except Exception:
logger.exception("Benchmark load for residual momentum failed")
@@ -1689,16 +1715,13 @@ async def run_backtest(
spy_closes: dict | None = None
try:
from app.services.benchmark_service import (
load_benchmark_closes,
refresh_benchmark_prices,
)
oldest = min((cols[0][0] for cols in price_columns.values()), default=None)
if oldest is not None:
days_needed = None
if oldest is not None and not _offline_snapshot_mode():
days_needed = (date.today() - date.fromordinal(oldest)).days + 30
await refresh_benchmark_prices(db, days=days_needed)
spy_closes = await load_benchmark_closes(db)
spy_closes = await _load_benchmark_closes_for_backtest(
db, days=days_needed, refresh=oldest is not None
)
except Exception:
logger.exception("Benchmark load for the portfolio sim failed")
+17 -5
View File
@@ -17,6 +17,16 @@ from typing import Any
HIGH_CONVICTION_ACTIONS = {"LONG_HIGH", "SHORT_HIGH"}
def _action_direction(action: str | None) -> str:
if not action or action == "NEUTRAL":
return "neutral"
if action.startswith("LONG"):
return "long"
if action.startswith("SHORT"):
return "short"
return "neutral"
def best_target_probability(setup: Any) -> float:
"""Highest probability among a setup's targets, 0 if none."""
targets = getattr(setup, "targets", None) or []
@@ -78,12 +88,14 @@ def setup_qualifies(setup: Any, config: dict) -> bool:
momentum_percentile = getattr(setup, "momentum_percentile", None)
if momentum_percentile is not None and momentum_percentile < min_pct:
return False
# A NEUTRAL recommendation means the engine found no clear directional setup —
# not an actionable signal, so by default it doesn't qualify (and can't be a
# top pick). ``exclude_neutral`` defaults on; turn it off to also count
# no-clear-direction residual momentum leaders.
# A setup is actionable only when the live ticker action points in the same
# direction. NEUTRAL means no clear signal; an opposite action means the
# setup is counter-bias. ``exclude_neutral`` defaults on; callers that omit
# it keep legacy floor-only behavior.
if config.get("exclude_neutral"):
if (setup.recommended_action or "NEUTRAL") == "NEUTRAL":
action_direction = _action_direction(getattr(setup, "recommended_action", None))
setup_direction = (getattr(setup, "direction", "long") or "long").lower()
if action_direction == "neutral" or action_direction != setup_direction:
return False
if config.get("require_high_conviction"):
if (setup.recommended_action or "") not in HIGH_CONVICTION_ACTIONS:
+57 -27
View File
@@ -524,6 +524,56 @@ def _build_reasoning(
)
def build_recommendation_snapshot(
dimension_scores: dict[str, float],
sentiment_classification: str | None,
config: dict[str, float],
available_directions: set[str] | None = None,
) -> dict[str, Any]:
"""Build the ticker-level recommendation from the supplied live context."""
conflicts = signal_conflict_detector.detect_conflicts(
dimension_scores=dimension_scores,
sentiment_classification=sentiment_classification,
config=config,
)
long_confidence = direction_analyzer.calculate_confidence(
direction="long",
dimension_scores=dimension_scores,
sentiment_classification=sentiment_classification,
conflicts=conflicts,
)
short_confidence = direction_analyzer.calculate_confidence(
direction="short",
dimension_scores=dimension_scores,
sentiment_classification=sentiment_classification,
conflicts=conflicts,
)
action = _choose_recommended_action(
long_confidence, short_confidence, config, available_directions
)
reasoning = _build_reasoning(
action=action,
long_confidence=long_confidence,
short_confidence=short_confidence,
conflicts=conflicts,
dimension_scores=dimension_scores,
sentiment_classification=sentiment_classification,
config=config,
available_directions=available_directions,
)
return {
"action": action,
"reasoning": reasoning,
"risk_level": _risk_level_from_conflicts(conflicts),
"long_confidence": long_confidence,
"short_confidence": short_confidence,
"conflicts": conflicts,
}
PRIMARY_TARGET_MIN_RR = 1.5
@@ -559,24 +609,15 @@ async def enhance_trade_setup(
) -> TradeSetup:
config = await get_recommendation_config(db)
conflicts = signal_conflict_detector.detect_conflicts(
snapshot = build_recommendation_snapshot(
dimension_scores=dimension_scores,
sentiment_classification=sentiment_classification,
config=config,
available_directions=available_directions,
)
long_confidence = direction_analyzer.calculate_confidence(
direction="long",
dimension_scores=dimension_scores,
sentiment_classification=sentiment_classification,
conflicts=conflicts,
)
short_confidence = direction_analyzer.calculate_confidence(
direction="short",
dimension_scores=dimension_scores,
sentiment_classification=sentiment_classification,
conflicts=conflicts,
)
conflicts = list(snapshot["conflicts"])
long_confidence = float(snapshot["long_confidence"])
short_confidence = float(snapshot["short_confidence"])
direction = setup.direction.lower()
confidence = long_confidence if direction == "long" else short_confidence
@@ -622,19 +663,8 @@ async def enhance_trade_setup(
# Action and reasoning are ticker-level: they consider both directions and
# which directions are actually tradeable, and are identical on every setup.
action = _choose_recommended_action(
long_confidence, short_confidence, config, available_directions
)
reasoning = _build_reasoning(
action=action,
long_confidence=long_confidence,
short_confidence=short_confidence,
conflicts=conflicts,
dimension_scores=dimension_scores,
sentiment_classification=sentiment_classification,
config=config,
available_directions=available_directions,
)
action = str(snapshot["action"])
reasoning = str(snapshot["reasoning"])
setup.confidence_score = round(confidence, 2)
setup.targets_json = json.dumps(targets)
+202 -11
View File
@@ -27,7 +27,12 @@ from app.models.ticker import Ticker
from app.models.trade_setup import TradeSetup
from app.services.indicator_service import _extract_ohlcv, compute_atr
from app.services.price_service import query_ohlcv
from app.services.recommendation_service import enhance_trade_setup
from app.services.recommendation_service import (
_risk_level_from_conflicts,
build_recommendation_snapshot,
enhance_trade_setup,
get_recommendation_config,
)
logger = logging.getLogger(__name__)
@@ -80,6 +85,133 @@ async def _get_latest_sentiment(db: AsyncSession, ticker_id: int) -> str | None:
return row.classification if row else None
async def _apply_live_recommendation_context(
db: AsyncSession,
setup_rows: list[tuple[TradeSetup, str]],
rows: list[dict],
) -> list[dict]:
"""Decorate latest setup rows with current score/sentiment recommendation data.
This intentionally updates only the API payload. Stored trade setups and
history remain point-in-time records for outcome analysis.
"""
if not rows or not setup_rows:
return rows
ticker_ids = {setup.ticker_id for setup, _ in setup_rows}
setups_by_id = {setup.id: setup for setup, _ in setup_rows}
directions_by_ticker = await _latest_available_directions_by_ticker(db, ticker_ids)
dim_result = await db.execute(
select(DimensionScore).where(DimensionScore.ticker_id.in_(ticker_ids))
)
dims_by_ticker: dict[int, dict[str, float]] = {}
for ds in dim_result.scalars().all():
dims_by_ticker.setdefault(ds.ticker_id, {})[ds.dimension] = float(ds.score)
comp_result = await db.execute(
select(CompositeScore)
.where(CompositeScore.ticker_id.in_(ticker_ids))
.order_by(CompositeScore.ticker_id, CompositeScore.computed_at.desc())
)
composites: dict[int, CompositeScore] = {}
for comp in comp_result.scalars().all():
composites.setdefault(comp.ticker_id, comp)
sent_result = await db.execute(
select(SentimentScore)
.where(SentimentScore.ticker_id.in_(ticker_ids))
.order_by(SentimentScore.ticker_id, SentimentScore.timestamp.desc())
)
sentiments: dict[int, SentimentScore] = {}
for sent in sent_result.scalars().all():
sentiments.setdefault(sent.ticker_id, sent)
config = await get_recommendation_config(db)
live_rows: list[dict] = []
for row in rows:
setup = setups_by_id.get(row["id"])
if setup is None:
live_rows.append(row)
continue
ticker_id = setup.ticker_id
live_row = dict(row)
comp = composites.get(ticker_id)
if comp is not None:
live_row["composite_score"] = float(comp.score)
live_row["context_as_of"]["score_computed_at"] = comp.computed_at
dimension_scores = dims_by_ticker.get(ticker_id)
sentiment = sentiments.get(ticker_id)
if sentiment is not None:
live_row["context_as_of"]["sentiment_at"] = sentiment.timestamp
if dimension_scores:
snapshot = build_recommendation_snapshot(
dimension_scores=dimension_scores,
sentiment_classification=sentiment.classification if sentiment else None,
config=config,
available_directions=directions_by_ticker.get(ticker_id),
)
direction = setup.direction.lower()
confidence_key = "long_confidence" if direction == "long" else "short_confidence"
live_row["confidence_score"] = round(float(snapshot[confidence_key]), 2)
live_row["recommended_action"] = snapshot["action"]
live_row["reasoning"] = snapshot["reasoning"]
setup_conflicts = _setup_specific_conflicts(live_row.get("conflict_flags", []))
live_conflicts = [str(item) for item in snapshot["conflicts"]]
live_row["conflict_flags"] = live_conflicts + setup_conflicts
live_row["risk_level"] = _risk_level_from_conflicts(live_row["conflict_flags"])
live_rows.append(live_row)
return live_rows
def _setup_specific_conflicts(conflicts: list[str]) -> list[str]:
signal_prefixes = (
"sentiment-technical:",
"sentiment-momentum:",
"momentum-technical:",
"fundamental-technical:",
)
return [
str(conflict)
for conflict in conflicts
if not str(conflict).startswith(signal_prefixes)
]
async def _latest_available_directions_by_ticker(
db: AsyncSession,
ticker_ids: set[int],
) -> dict[int, set[str]]:
if not ticker_ids:
return {}
result = await db.execute(
select(TradeSetup)
.where(TradeSetup.ticker_id.in_(ticker_ids))
.order_by(
TradeSetup.ticker_id,
TradeSetup.direction,
TradeSetup.detected_at.desc(),
TradeSetup.id.desc(),
)
)
latest_by_key: set[tuple[int, str]] = set()
directions: dict[int, set[str]] = {}
for setup in result.scalars().all():
direction = setup.direction.lower()
key = (setup.ticker_id, direction)
if key in latest_by_key:
continue
latest_by_key.add(key)
directions.setdefault(setup.ticker_id, set()).add(direction)
return directions
def _json_default(value):
if isinstance(value, (datetime, date)):
return value.isoformat()
@@ -441,6 +573,7 @@ async def get_trade_setups(
min_confidence: float | None = None,
recommended_action: str | None = None,
symbol: str | None = None,
live_recommendation: bool = False,
) -> list[dict]:
"""Get latest stored trade setups, optionally filtered."""
stmt = (
@@ -451,9 +584,11 @@ async def get_trade_setups(
stmt = stmt.where(TradeSetup.direction == direction.lower())
if symbol is not None:
stmt = stmt.where(Ticker.symbol == symbol.strip().upper())
if min_confidence is not None:
# With live_recommendation these fields are overlaid with current values
# below, so filtering happens there instead of against the stored columns.
if min_confidence is not None and not live_recommendation:
stmt = stmt.where(TradeSetup.confidence_score >= min_confidence)
if recommended_action is not None:
if recommended_action is not None and not live_recommendation:
stmt = stmt.where(TradeSetup.recommended_action == recommended_action)
stmt = stmt.order_by(TradeSetup.detected_at.desc(), TradeSetup.id.desc())
@@ -477,15 +612,37 @@ async def get_trade_setups(
reverse=True,
)
prices = await _latest_closes(db, {s.ticker_id for s, _ in latest_rows})
return [
prices = await _latest_price_context(db, {s.ticker_id for s, _ in latest_rows})
rows_out = [
_trade_setup_to_dict(setup, ticker_symbol, prices.get(setup.ticker_id))
for setup, ticker_symbol in latest_rows
]
if live_recommendation:
rows_out = await _apply_live_recommendation_context(db, latest_rows, rows_out)
if min_confidence is not None:
rows_out = [
row for row in rows_out
if row["confidence_score"] is not None
and row["confidence_score"] >= min_confidence
]
if recommended_action is not None:
rows_out = [
row for row in rows_out
if row["recommended_action"] == recommended_action
]
rows_out.sort(
key=lambda row: (
row["confidence_score"] if row["confidence_score"] is not None else -1.0,
row["rr_ratio"],
row["composite_score"],
),
reverse=True,
)
return rows_out
async def _latest_closes(db: AsyncSession, ticker_ids: set[int]) -> dict[int, float]:
"""Most recent close per ticker — used to judge a setup's current relevance."""
async def _latest_price_context(db: AsyncSession, ticker_ids: set[int]) -> dict[int, dict]:
"""Most recent daily OHLCV row per ticker for live price context."""
if not ticker_ids:
return {}
latest = (
@@ -494,7 +651,12 @@ async def _latest_closes(db: AsyncSession, ticker_ids: set[int]) -> dict[int, fl
.group_by(OHLCVRecord.ticker_id)
.subquery()
)
stmt = select(OHLCVRecord.ticker_id, OHLCVRecord.close).join(
stmt = select(
OHLCVRecord.ticker_id,
OHLCVRecord.close,
OHLCVRecord.date,
OHLCVRecord.created_at,
).join(
latest,
and_(
OHLCVRecord.ticker_id == latest.c.ticker_id,
@@ -502,7 +664,23 @@ async def _latest_closes(db: AsyncSession, ticker_ids: set[int]) -> dict[int, fl
),
)
result = await db.execute(stmt)
return {tid: float(close) for tid, close in result.all()}
return {
tid: {
"current_price": float(close),
"price_date": price_date,
"price_updated_at": created_at,
}
for tid, close, price_date, created_at in result.all()
}
async def _latest_closes(db: AsyncSession, ticker_ids: set[int]) -> dict[int, float]:
"""Most recent close per ticker, kept for callers that only need price."""
price_context = await _latest_price_context(db, ticker_ids)
return {
ticker_id: context["current_price"]
for ticker_id, context in price_context.items()
}
async def get_trade_setup_history(
@@ -519,16 +697,28 @@ async def get_trade_setup_history(
result = await db.execute(stmt)
rows = result.all()
prices = await _latest_closes(db, {s.ticker_id for s, _ in rows})
prices = await _latest_price_context(db, {s.ticker_id for s, _ in rows})
return [
_trade_setup_to_dict(setup, ticker_symbol, prices.get(setup.ticker_id))
for setup, ticker_symbol in rows
]
def _trade_setup_to_dict(setup: TradeSetup, symbol: str, current_price: float | None = None) -> dict:
def _trade_setup_to_dict(setup: TradeSetup, symbol: str, price_context: dict | None = None) -> dict:
targets: list[dict] = []
conflicts: list[str] = []
current_price = (
float(price_context["current_price"])
if price_context and price_context.get("current_price") is not None
else None
)
context_as_of = {
"setup_detected_at": setup.detected_at,
"score_computed_at": None,
"sentiment_at": None,
"price_date": price_context.get("price_date") if price_context else None,
"price_updated_at": price_context.get("price_updated_at") if price_context else None,
}
if setup.targets_json:
try:
@@ -567,4 +757,5 @@ def _trade_setup_to_dict(setup: TradeSetup, symbol: str, current_price: float |
"evaluated_at": setup.evaluated_at,
"current_price": current_price,
"momentum_percentile": setup.momentum_percentile,
"context_as_of": context_as_of,
}
+10 -64
View File
@@ -2,8 +2,8 @@
Computes dimension scores (technical, sr_quality, sentiment, fundamental,
momentum) each 0-100, composite score as weighted average of available
dimensions with re-normalized weights, staleness marking/recomputation
on demand, and weight update triggers full recomputation.
dimensions with re-normalized weights, staleness marking, explicit refresh
paths, and weight update triggers full recomputation.
"""
from __future__ import annotations
@@ -765,73 +765,37 @@ async def compute_composite_score(
async def get_score(
db: AsyncSession, symbol: str
) -> dict:
"""Get composite + all dimension scores for a ticker.
"""Read composite + dimension scores for a ticker without recomputing.
Recomputes stale dimensions on demand, then recomputes composite.
Returns a dict suitable for ScoreResponse, including dimension breakdowns
and composite breakdown with re-normalization info.
GET endpoints use this path, so it must not mutate persisted score context.
Scheduled/manual write paths are responsible for refreshing scores.
"""
ticker = await _get_ticker(db, symbol)
weights = await _get_weights(db)
# Check for stale dimension scores and recompute them
result = await db.execute(
select(DimensionScore).where(DimensionScore.ticker_id == ticker.id)
)
dim_scores = {ds.dimension: ds for ds in result.scalars().all()}
for dim in DIMENSIONS:
ds = dim_scores.get(dim)
if ds is None or ds.is_stale:
await compute_dimension_score(db, symbol, dim)
# Check composite staleness
comp_result = await db.execute(
select(CompositeScore).where(CompositeScore.ticker_id == ticker.id)
)
comp = comp_result.scalar_one_or_none()
if comp is None or comp.is_stale:
await compute_composite_score(db, symbol, weights)
await db.commit()
# Re-fetch everything fresh
result = await db.execute(
select(DimensionScore).where(DimensionScore.ticker_id == ticker.id)
)
dim_scores_list = list(result.scalars().all())
dim_scores = {ds.dimension: ds for ds in dim_scores_list}
comp_result = await db.execute(
select(CompositeScore).where(CompositeScore.ticker_id == ticker.id)
)
comp = comp_result.scalar_one_or_none()
# Compute breakdowns for each dimension by calling the dimension computers
breakdowns: dict[str, dict | None] = {}
for dim in DIMENSIONS:
try:
raw_result = await _DIMENSION_COMPUTERS[dim](db, symbol)
if isinstance(raw_result, tuple) and len(raw_result) == 2:
breakdowns[dim] = raw_result[1]
else:
breakdowns[dim] = None
except Exception:
breakdowns[dim] = None
# Build dimension entries with breakdowns
dimensions = []
missing = []
available_dims: list[str] = []
for dim in DIMENSIONS:
found = next((ds for ds in dim_scores_list if ds.dimension == dim), None)
found = dim_scores.get(dim)
if found is not None and not found.is_stale and found.score is not None:
dimensions.append({
"dimension": found.dimension,
"score": found.score,
"is_stale": found.is_stale,
"computed_at": found.computed_at,
"breakdown": breakdowns.get(dim),
"breakdown": None,
})
w = weights.get(dim, 0.0)
if w > 0:
@@ -845,7 +809,7 @@ async def get_score(
"score": found.score,
"is_stale": found.is_stale,
"computed_at": found.computed_at,
"breakdown": breakdowns.get(dim),
"breakdown": None,
})
# Build composite breakdown: the non-sentiment base (re-normalized weighted
@@ -925,31 +889,13 @@ async def get_rankings(db: AsyncSession) -> dict:
dims[ds.ticker_id][ds.dimension] = ds
return comps, dims
# Two bulk reads instead of ~4 queries per ticker.
comps, dims_by_ticker = await _load_scores()
# Lazily recompute any stale/missing scores (kept fresh by the daily scan;
# this self-heals tickers that aged out between scans), committing once.
recomputed = False
for ticker in tickers:
comp = comps.get(ticker.id)
if comp is None or comp.is_stale:
dim_scores = dims_by_ticker.get(ticker.id, {})
for dim in DIMENSIONS:
ds = dim_scores.get(dim)
if ds is None or ds.is_stale:
await compute_dimension_score(db, ticker.symbol, dim)
await compute_composite_score(db, ticker.symbol, weights)
recomputed = True
if recomputed:
await db.commit()
comps, dims_by_ticker = await _load_scores()
rankings = [
{
"symbol": ticker.symbol,
"composite_score": comp.score,
"composite_stale": comp.is_stale,
"dimensions": [
{
"dimension": ds.dimension,
@@ -1,6 +1,6 @@
import { useRef, useEffect, useCallback, useState } from 'react';
import type { OHLCVBar, SRLevel, SRZone, TradeSetup } from '../../lib/types';
import { formatPrice, formatDate } from '../../lib/format';
import { formatPrice, formatDate, formatLargeNumber } from '../../lib/format';
interface CandlestickChartProps {
data: OHLCVBar[];
@@ -50,6 +50,9 @@ interface TooltipState {
}
const MIN_VISIBLE_BARS = 10;
const CHART_HEIGHT = 440;
const VOLUME_PANE_HEIGHT = 72;
const PANE_GAP = 18;
type RangePreset = '1M' | '3M' | '6M' | 'YTD' | '1Y' | '3Y' | '5Y' | 'All';
const RANGE_PRESETS: RangePreset[] = ['1M', '3M', '6M', 'YTD', '1Y', '3Y', '5Y', 'All'];
@@ -109,7 +112,7 @@ export function CandlestickChart({ data, srLevels = [], zones = [], tradeSetup,
const dpr = window.devicePixelRatio || 1;
const rect = container.getBoundingClientRect();
const W = rect.width;
const H = 400;
const H = CHART_HEIGHT;
canvas.width = W * dpr;
canvas.height = H * dpr;
@@ -124,7 +127,11 @@ export function CandlestickChart({ data, srLevels = [], zones = [], tradeSetup,
// Margins
const ml = 12, mr = 70, mt = 12, mb = 32;
const cw = W - ml - mr;
const ch = H - mt - mb;
const volumeH = VOLUME_PANE_HEIGHT;
const ch = H - mt - mb - volumeH - PANE_GAP;
const priceBottom = mt + ch;
const volumeTop = priceBottom + PANE_GAP;
const volumeBottom = volumeTop + volumeH;
// Current price = explicit prop, else latest close
const livePrice = currentPrice ?? visibleData[visibleData.length - 1].close;
@@ -145,6 +152,9 @@ export function CandlestickChart({ data, srLevels = [], zones = [], tradeSetup,
const yScale = (v: number) => mt + ch - ((v - lo) / (hi - lo)) * ch;
const barW = cw / visibleData.length;
const candleW = Math.max(barW * 0.65, 1);
const volumeW = Math.max(barW * 0.65, 1);
const maxVolume = Math.max(...visibleData.map((b) => Math.max(0, b.volume)), 1);
const volumeScale = (v: number) => volumeTop + volumeH - (Math.max(0, v) / maxVolume) * volumeH;
// Grid lines (horizontal)
const nTicks = 6;
@@ -172,6 +182,34 @@ export function CandlestickChart({ data, srLevels = [], zones = [], tradeSetup,
ctx.fillText(formatDate(visibleData[i].date), x, H - 6);
}
// Volume pane
ctx.strokeStyle = 'rgba(255,255,255,0.06)';
ctx.lineWidth = 1;
ctx.beginPath();
ctx.moveTo(ml, volumeTop - 9);
ctx.lineTo(ml + cw, volumeTop - 9);
ctx.stroke();
ctx.beginPath();
ctx.moveTo(ml, volumeBottom);
ctx.lineTo(ml + cw, volumeBottom);
ctx.stroke();
ctx.font = '10px "IBM Plex Mono", ui-monospace, monospace';
ctx.fillStyle = '#6b7280';
ctx.textAlign = 'left';
ctx.fillText('Volume', ml, volumeTop - 13);
ctx.textAlign = 'right';
ctx.fillText(formatLargeNumber(maxVolume), W - 8, volumeTop + 4);
visibleData.forEach((bar, i) => {
const x = ml + i * barW + barW / 2;
const bullish = bar.close >= bar.open;
const yVolume = volumeScale(bar.volume);
const hVolume = Math.max(volumeBottom - yVolume, bar.volume > 0 ? 1 : 0);
ctx.fillStyle = bullish ? 'rgba(16, 185, 129, 0.32)' : 'rgba(239, 68, 68, 0.28)';
ctx.fillRect(x - volumeW / 2, yVolume, volumeW, hVolume);
});
// Nearest support/resistance only (band if it came from a zone)
markers.forEach((m) => {
const isSupport = m.role === 'support';
@@ -312,7 +350,22 @@ export function CandlestickChart({ data, srLevels = [], zones = [], tradeSetup,
});
// Store geometry for hit testing (includes visibleRange offset)
(canvas as any).__chartMeta = { ml, mr, mt, mb, cw, ch, barW, lo, hi, yScale, visibleStart: start };
(canvas as any).__chartMeta = {
ml,
mr,
mt,
mb,
cw,
ch,
barW,
lo,
hi,
yScale,
visibleStart: start,
volumeTop,
volumeH,
volumeBottom,
};
// Size the overlay canvas to match
const overlay = overlayCanvasRef.current;
@@ -342,12 +395,14 @@ export function CandlestickChart({ data, srLevels = [], zones = [], tradeSetup,
const meta = (canvas as any).__chartMeta;
if (!meta) return;
const { ml, mt, mb, cw, ch, barW, lo, hi, visibleStart } = meta;
const { ml, mt, mb, cw, ch, barW, lo, hi, visibleStart, volumeBottom } = meta;
const H = overlay.height / dpr;
const priceBottom = mt + ch;
const chartBottom = volumeBottom ?? priceBottom;
// Clamp crosshair to chart area
const cx = Math.max(ml, Math.min(ml + cw, pos.x));
const cy = Math.max(mt, Math.min(mt + ch, pos.y));
const cy = Math.max(mt, Math.min(chartBottom, pos.y));
// Dashed crosshair lines
ctx.strokeStyle = 'rgba(255, 255, 255, 0.4)';
@@ -357,37 +412,44 @@ export function CandlestickChart({ data, srLevels = [], zones = [], tradeSetup,
// Vertical line
ctx.beginPath();
ctx.moveTo(cx, mt);
ctx.lineTo(cx, mt + ch);
ctx.stroke();
// Horizontal line
ctx.beginPath();
ctx.moveTo(ml, cy);
ctx.lineTo(ml + cw, cy);
ctx.lineTo(cx, chartBottom);
ctx.stroke();
ctx.setLineDash([]);
// Price label on y-axis (right side)
const price = hi - ((cy - mt) / ch) * (hi - lo);
const priceText = formatPrice(price);
ctx.font = '11px "IBM Plex Mono", ui-monospace, monospace';
const priceMetrics = ctx.measureText(priceText);
const labelPadX = 5;
const labelPadY = 3;
const labelW = priceMetrics.width + labelPadX * 2;
const labelH = 16 + labelPadY * 2;
const labelX = ml + cw + 2;
const labelY = cy - labelH / 2;
ctx.fillStyle = 'rgba(55, 65, 81, 0.9)';
ctx.beginPath();
ctx.roundRect(labelX, labelY, labelW, labelH, 3);
ctx.fill();
ctx.fillStyle = '#e5e7eb';
ctx.textAlign = 'left';
ctx.textBaseline = 'middle';
ctx.fillText(priceText, labelX + labelPadX, cy);
if (cy <= priceBottom) {
// Horizontal price crosshair only belongs in the price pane.
ctx.strokeStyle = 'rgba(255, 255, 255, 0.4)';
ctx.lineWidth = 0.75;
ctx.setLineDash([4, 3]);
ctx.beginPath();
ctx.moveTo(ml, cy);
ctx.lineTo(ml + cw, cy);
ctx.stroke();
ctx.setLineDash([]);
// Price label on y-axis (right side)
const price = hi - ((cy - mt) / ch) * (hi - lo);
const priceText = formatPrice(price);
const priceMetrics = ctx.measureText(priceText);
const labelW = priceMetrics.width + labelPadX * 2;
const labelH = 16 + labelPadY * 2;
const labelX = ml + cw + 2;
const labelY = cy - labelH / 2;
ctx.fillStyle = 'rgba(55, 65, 81, 0.9)';
ctx.beginPath();
ctx.roundRect(labelX, labelY, labelW, labelH, 3);
ctx.fill();
ctx.fillStyle = '#e5e7eb';
ctx.textAlign = 'left';
ctx.textBaseline = 'middle';
ctx.fillText(priceText, labelX + labelPadX, cy);
}
// Date label on x-axis (bottom)
const localIdx = Math.floor((cx - ml) / barW);
@@ -619,7 +681,7 @@ export function CandlestickChart({ data, srLevels = [], zones = [], tradeSetup,
<span>High</span><span class="text-right text-gray-200">${formatPrice(bar.high)}</span>
<span>Low</span><span class="text-right text-gray-200">${formatPrice(bar.low)}</span>
<span>Close</span><span class="text-right text-gray-200">${formatPrice(bar.close)}</span>
<span>Vol</span><span class="text-right text-gray-200">${bar.volume.toLocaleString()}</span>
<span>Vol</span><span class="text-right text-gray-200" title="${bar.volume.toLocaleString()}">${formatLargeNumber(bar.volume)}</span>
</div>${tradeTooltipHtml}`;
} else {
tip.style.display = 'none';
@@ -670,16 +732,16 @@ export function CandlestickChart({ data, srLevels = [], zones = [], tradeSetup,
))}
<span className="ml-1 text-[10px] text-gray-600">scroll to zoom · drag to pan</span>
</div>
<div ref={containerRef} className="relative w-full" style={{ height: 400 }}>
<div ref={containerRef} className="relative w-full" style={{ height: CHART_HEIGHT }}>
<canvas
ref={canvasRef}
className="w-full"
style={{ height: 400 }}
style={{ height: CHART_HEIGHT }}
/>
<canvas
ref={overlayCanvasRef}
className="absolute top-0 left-0 w-full cursor-crosshair"
style={{ height: 400 }}
style={{ height: CHART_HEIGHT }}
onMouseDown={handleMouseDown}
onMouseMove={handleMouseMove}
onMouseUp={handleMouseUp}
+1 -1
View File
@@ -5,7 +5,7 @@ import type { DimensionScoreDetail, CompositeBreakdown } from '../../lib/types';
interface ScoreCardProps {
compositeScore: number | null;
dimensions: DimensionScoreDetail[];
compositeBreakdown?: CompositeBreakdown;
compositeBreakdown?: CompositeBreakdown | null;
/** Hide the composite ring/header when the composite is shown elsewhere
* (e.g. the Standing matrix) and this card only carries the dimension detail. */
showComposite?: boolean;
+16 -7
View File
@@ -2,6 +2,13 @@ import type { ActivationConfig, TradeSetup } from './types';
const HIGH_CONVICTION_ACTIONS = new Set(['LONG_HIGH', 'SHORT_HIGH']);
function actionDirection(action: TradeSetup['recommended_action']): 'long' | 'short' | 'neutral' {
if (!action || action === 'NEUTRAL') return 'neutral';
if (action.startsWith('LONG')) return 'long';
if (action.startsWith('SHORT')) return 'short';
return 'neutral';
}
export function bestTargetProbability(setup: TradeSetup): number {
return setup.targets?.length ? Math.max(...setup.targets.map((t) => t.probability)) : 0;
}
@@ -42,8 +49,11 @@ export function qualifiesSetup(setup: TradeSetup, config: ActivationConfig): boo
return false;
}
}
// NEUTRAL = "no clear setup" — not actionable, so by default it doesn't qualify.
if (config.exclude_neutral && (setup.recommended_action ?? 'NEUTRAL') === 'NEUTRAL') return false;
// NEUTRAL = "no clear setup"; an opposite action means this setup is counter-bias.
if (config.exclude_neutral) {
const actionDir = actionDirection(setup.recommended_action);
if (actionDir === 'neutral' || actionDir !== setup.direction) return false;
}
if (config.require_high_conviction && !HIGH_CONVICTION_ACTIONS.has(setup.recommended_action ?? '')) {
return false;
}
@@ -53,9 +63,9 @@ export function qualifiesSetup(setup: TradeSetup, config: ActivationConfig): boo
/**
* Symbol of the current single 'top pick' — the #1 row the dashboard highlights:
* the highest residual 12-1 momentum percentile among qualified setups (or among all
* setups when none qualify). Returns null when there are no setups. Keep in step
* with the Top Setups ranking in DashboardPage.
* the highest residual 12-1 momentum percentile among qualified setups. Returns
* null when there are no actionable setups. Keep in step with the Top Setups
* ranking in DashboardPage.
*/
export function topPickSymbol(
trades: TradeSetup[] | undefined,
@@ -64,8 +74,7 @@ export function topPickSymbol(
const all = trades ?? [];
if (all.length === 0) return null;
const qualified = activation ? all.filter((t) => qualifiesSetup(t, activation)) : [];
const pool = qualified.length > 0 ? qualified : all;
const top = [...pool].sort(
const top = [...qualified].sort(
(a, b) => (b.momentum_percentile ?? -Infinity) - (a.momentum_percentile ?? -Infinity),
)[0];
return top?.symbol ?? null;
+12 -2
View File
@@ -96,7 +96,7 @@ export interface ScoreResponse {
dimensions: DimensionScoreDetail[];
missing_dimensions: string[];
computed_at: string | null;
composite_breakdown?: CompositeBreakdown;
composite_breakdown?: CompositeBreakdown | null;
}
export interface DimensionScoreDetail {
@@ -104,12 +104,13 @@ export interface DimensionScoreDetail {
score: number;
is_stale: boolean;
computed_at: string | null;
breakdown?: ScoreBreakdown;
breakdown?: ScoreBreakdown | null;
}
export interface RankingEntry {
symbol: string;
composite_score: number;
composite_stale: boolean;
dimensions: DimensionScoreDetail[];
}
@@ -140,9 +141,18 @@ export interface TradeSetup {
evaluated_at: string | null;
current_price: number | null;
momentum_percentile?: number | null;
context_as_of?: TradeSetupContextAsOf | null;
recommendation_summary?: RecommendationSummary;
}
export interface TradeSetupContextAsOf {
setup_detected_at: string;
score_computed_at: string | null;
sentiment_at: string | null;
price_date: string | null;
price_updated_at: string | null;
}
// Performance / outcome statistics
export interface OutcomeBucketStats {
total: number;
+5 -8
View File
@@ -76,15 +76,12 @@ export default function DashboardPage() {
[trades.data, activation.data],
);
// Show qualified setups first; fall back to the full list when none qualify.
// Rank by residual 12-1 momentum percentile so the strongest names sit at the top.
const showingQualified = qualifiedSetups.length > 0;
// Rank only actionable/qualified setups by residual 12-1 momentum percentile.
const topSetups: TradeSetup[] = useMemo(() => {
const pool = showingQualified ? qualifiedSetups : trades.data ?? [];
return [...pool]
return [...qualifiedSetups]
.sort((a, b) => (b.momentum_percentile ?? -Infinity) - (a.momentum_percentile ?? -Infinity))
.slice(0, 5);
}, [showingQualified, qualifiedSetups, trades.data]);
}, [qualifiedSetups]);
const topWatchlist = useMemo(
() =>
@@ -197,12 +194,12 @@ export default function DashboardPage() {
<div className="xl:col-span-3">
<Section
title="Top Setups"
hint={showingQualified ? 'ranked by expected value' : 'none qualified — showing all'}
hint="qualified and ranked by residual momentum"
>
{trades.isLoading && <SkeletonTable rows={5} cols={5} />}
{trades.isError && <Callout variant="error">Failed to load setups</Callout>}
{trades.data && topSetups.length === 0 && (
<Callout variant="empty">No active setups. Run the scanner from the Signals page.</Callout>
<Callout variant="empty">No qualified actionable setups right now.</Callout>
)}
{topSetups.length > 0 && (
<div className="glass overflow-x-auto">
+2 -2
View File
@@ -346,8 +346,8 @@ export default function TickerDetailPage() {
/>
{/* Chart — always visible */}
<Section title="Price Chart">
{ohlcv.isLoading && <SkeletonCard className="h-[400px]" />}
<Section title="Price & Volume">
{ohlcv.isLoading && <SkeletonCard className="h-[440px]" />}
{ohlcv.isError && (
<SectionError
message={ohlcv.error instanceof Error ? ohlcv.error.message : 'Failed to load OHLCV data'}
+162
View File
@@ -0,0 +1,162 @@
"""Create a minimal local SQLite snapshot for offline backtest research.
Copies only the data required by app.services.backtest_service.run_backtest:
tickers, OHLCV bars, SPY benchmark closes, and activation/recommendation
settings. Other system settings are intentionally skipped to avoid copying
secrets into local snapshot files.
"""
from __future__ import annotations
import argparse
import asyncio
import os
import sys
from pathlib import Path
from sqlalchemy import func, insert, or_, select
from sqlalchemy.engine import make_url
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
def _normalize_postgres_url(url: str) -> str:
if url.startswith("postgresql+asyncpg://"):
return url
if url.startswith("postgresql://"):
return "postgresql+asyncpg://" + url[len("postgresql://") :]
if url.startswith("postgres://"):
return "postgresql+asyncpg://" + url[len("postgres://") :]
return url
def _sqlite_url(path: Path) -> str:
return f"sqlite+aiosqlite:///{path.resolve().as_posix()}"
def _hide_password(url: str) -> str:
return make_url(url).render_as_string(hide_password=True)
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--database-url",
default=os.getenv("DATABASE_URL"),
help="Source Postgres URL. Defaults to DATABASE_URL, then app .env database_url.",
)
parser.add_argument(
"--output",
default="backtest_snapshots/prod-backtest.sqlite",
help="SQLite snapshot path to create.",
)
parser.add_argument("--batch-size", type=int, default=5000)
parser.add_argument("--force", action="store_true", help="Overwrite an existing snapshot file.")
return parser.parse_args()
async def _copy_table(
source: AsyncSession,
dest: AsyncSession,
model: type,
*,
batch_size: int,
where=None,
) -> int:
table = model.__table__
columns = list(table.columns)
count_stmt = select(func.count()).select_from(table)
stmt = select(*columns)
if where is not None:
count_stmt = count_stmt.where(where)
stmt = stmt.where(where)
primary_key_columns = list(table.primary_key.columns)
if primary_key_columns:
stmt = stmt.order_by(*primary_key_columns)
expected = int((await source.execute(count_stmt)).scalar_one())
if expected == 0:
print(f"{table.name}: 0 rows")
return 0
copied = 0
stream = await source.stream(stmt.execution_options(yield_per=batch_size))
async for partition in stream.partitions(batch_size):
rows = [dict(row._mapping) for row in partition]
if not rows:
continue
await dest.execute(insert(table), rows)
await dest.commit()
copied += len(rows)
print(f"{table.name}: {copied}/{expected}", end="\r")
print(f"{table.name}: {copied} rows")
return copied
async def _main() -> None:
args = _parse_args()
from app.config import settings
from app.database import Base
import app.models # noqa: F401 - registers all metadata tables
from app.models.benchmark_price import BenchmarkPrice
from app.models.ohlcv import OHLCVRecord
from app.models.settings import SystemSetting
from app.models.ticker import Ticker
source_url = _normalize_postgres_url(args.database_url or settings.database_url)
output = Path(args.output)
if output.exists():
if not args.force:
raise SystemExit(f"{output} already exists. Pass --force to overwrite it.")
output.unlink()
output.parent.mkdir(parents=True, exist_ok=True)
source_engine = create_async_engine(
source_url,
pool_pre_ping=True,
connect_args={"server_settings": {"default_transaction_read_only": "on"}},
)
dest_engine = create_async_engine(_sqlite_url(output))
SourceSession = async_sessionmaker(source_engine, class_=AsyncSession, expire_on_commit=False)
DestSession = async_sessionmaker(dest_engine, class_=AsyncSession, expire_on_commit=False)
print(f"Source: {_hide_password(source_url)}")
print(f"Snapshot: {output}")
try:
async with dest_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async with SourceSession() as source, DestSession() as dest:
counts = {
"tickers": await _copy_table(source, dest, Ticker, batch_size=args.batch_size),
"system_settings": await _copy_table(
source,
dest,
SystemSetting,
batch_size=args.batch_size,
where=or_(
SystemSetting.key.like("activation_%"),
SystemSetting.key.like("recommendation_%"),
),
),
"benchmark_prices": await _copy_table(source, dest, BenchmarkPrice, batch_size=args.batch_size),
"ohlcv_records": await _copy_table(source, dest, OHLCVRecord, batch_size=args.batch_size),
}
finally:
await source_engine.dispose()
await dest_engine.dispose()
print("Done:")
for name, count in counts.items():
print(f" {name}: {count}")
if __name__ == "__main__":
asyncio.run(_main())
+139
View File
@@ -0,0 +1,139 @@
"""Run the existing backtest service against a local SQLite snapshot.
The runner is offline/read-only: it does not refresh benchmark prices and does
not cache the report back to any database. It writes a local JSON report.
"""
from __future__ import annotations
import argparse
import asyncio
import json
import os
import sys
from datetime import datetime
from pathlib import Path
from typing import Any
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
def _sqlite_url(path: Path) -> str:
return f"sqlite+aiosqlite:///{path.resolve().as_posix()}"
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("snapshot", help="SQLite snapshot created by create_backtest_snapshot.py.")
parser.add_argument(
"--out",
default=None,
help="JSON report path. Defaults to reports/backtest-<timestamp>.json.",
)
parser.add_argument("--workers", type=int, default=None, help="Override backtest worker count.")
parser.add_argument(
"--allow-spawn",
action="store_true",
help="Allow spawn multiprocessing for offline CLI runs, useful on Windows.",
)
parser.add_argument("--quiet", action="store_true", help="Hide progress output.")
return parser.parse_args()
def _default_output_path() -> Path:
stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
return Path("reports") / f"backtest-{stamp}.json"
def _pct(value: Any) -> str:
return "-" if value is None else f"{float(value):+.1f}%"
def _r(value: Any) -> str:
return "-" if value is None else f"{float(value):+.2f}R"
def _print_summary(report: dict) -> None:
qualified = report.get("overall_qualified") or {}
all_setups = report.get("overall_all") or {}
time_exit = {row.get("hold_days"): row for row in report.get("time_exit_sweep") or []}
hold_30 = time_exit.get(30) or {}
policies = {
row.get("policy"): row
for row in ((report.get("portfolio_sim") or {}).get("policies") or [])
}
hold_policy = policies.get("hold") or {}
print("")
print("Backtest summary")
print(f" candidates: {report.get('candidates')}")
print(f" qualified: {report.get('qualified')}")
print(f" all setups net avg R: {_r(all_setups.get('net_avg_r'))}")
print(f" qualified net avg R: {_r(qualified.get('net_avg_r'))}")
print(f" qualified total R: {_r(qualified.get('total_r'))}")
print(f" 30d hold net avg R: {_r(hold_30.get('net_avg_r'))}")
print(f" 30d hold total R: {_r(hold_30.get('total_r'))}")
if hold_policy:
print(f" hold CAGR: {_pct(hold_policy.get('cagr_pct'))}")
print(f" hold max drawdown: {_pct(hold_policy.get('max_drawdown_pct'))}")
print(f" hold Sharpe: {hold_policy.get('sharpe')}")
print(f" hold trades: {hold_policy.get('trades')}")
async def _main() -> None:
args = _parse_args()
snapshot = Path(args.snapshot)
if not snapshot.exists():
raise SystemExit(f"Snapshot not found: {snapshot}")
os.environ["BACKTEST_SNAPSHOT_OFFLINE"] = "1"
if args.allow_spawn:
os.environ["BACKTEST_ALLOW_SPAWN"] = "1"
from app.config import settings
from app.services.backtest_service import run_backtest
if args.workers is not None:
settings.backtest_workers = args.workers
output = Path(args.out) if args.out else _default_output_path()
output.parent.mkdir(parents=True, exist_ok=True)
engine = create_async_engine(_sqlite_url(snapshot), pool_pre_ping=True)
Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
last_progress: tuple[int, int] | None = None
def progress(done: int, total: int, symbol: str) -> None:
nonlocal last_progress
if args.quiet:
return
marker = (done, total)
if marker == last_progress:
return
last_progress = marker
label = f" {symbol}" if symbol else ""
print(f"progress: {done}/{total}{label}", end="\r")
try:
async with Session() as db:
report = await run_backtest(db, progress_cb=progress)
finally:
await engine.dispose()
if not args.quiet:
print("")
with output.open("w", encoding="utf-8") as fh:
json.dump(report, fh, indent=2)
fh.write("\n")
print(f"Report written: {output}")
_print_summary(report)
if __name__ == "__main__":
asyncio.run(_main())
+78
View File
@@ -6,6 +6,7 @@ from datetime import date, datetime, timedelta, timezone
from types import SimpleNamespace
import pytest
from sqlalchemy import select
from app.models.alert import AlertLog
from app.models.ohlcv import OHLCVRecord
@@ -113,6 +114,26 @@ async def test_score_drop_seeds_then_alerts(session):
assert await svc._watermark(session, "AAA") == 60.0
def test_format_qualified_includes_current_price_and_target_move():
text = svc._format_qualified({
"symbol": "AAPL",
"direction": "long",
"current_price": 196.42,
"entry_price": 195.80,
"target": 207.50,
"stop_loss": 190.20,
"rr_ratio": 2.1,
"confidence_score": 76.0,
"targets": [{"probability": 63.0}],
})
assert "now 196.42" in text
assert "entry 195.80" in text
assert "target 207.50 (+5.6%)" in text
assert "stop 190.20" in text
assert "P(target) 63%" in text
async def _add_ticker(session, symbol: str, *, watchlisted: bool, close: float,
levels: list[tuple[float, str, int]]) -> int:
user = await session.get(User, 1)
@@ -142,6 +163,8 @@ async def test_sr_proximity_merges_close_levels_to_one_alert(session):
cvx = [m for m in msgs if "CVX" in m[1]]
assert len(cvx) == 1
assert "183.00185.00" in cvx[0][1]
assert "now 182.00 -> 183.00185.00 (+0.5%)" in cvx[0][1]
assert "strength 100" in cvx[0][1]
async def test_sr_proximity_skips_non_watchlist_unqualified(session):
@@ -172,6 +195,61 @@ async def test_dispatch_no_credentials(session):
assert res["status"] == "no_credentials"
async def test_dispatch_bundles_discovery_alerts_and_logs_each_item(session, monkeypatch):
async def fake_collect_qualified(_db):
return [
("qualified:AAPL:long", "🟢 <b>AAPL LONG</b> | now 196.42 | target 207.50 (+5.6%)"),
("qualified:TSLA:short", "🔴 <b>TSLA SHORT</b> | now 292.10 | target 276.50 (-5.3%)"),
]
async def fake_collect_sr(_db):
return [
("sr:MSFT:resistance", "📍 <b>MSFT</b> resistance | now 508.20 -> 512.00 (+0.7%)"),
]
sent: list[str] = []
async def fake_send(_client, _token, _chat_id, text):
sent.append(text)
monkeypatch.setattr(svc, "_collect_qualified", fake_collect_qualified)
monkeypatch.setattr(svc, "_collect_sr_proximity", fake_collect_sr)
monkeypatch.setattr(svc, "_send", fake_send)
await svc.update_alert_config(
session,
enabled=True,
bot_token="token",
telegram_chat_id="chat",
score_drop_enabled=False,
digest_enabled=False,
regime_quadrant_enabled=False,
trade_closed_enabled=False,
)
res = await svc.dispatch_alerts(session)
assert res == {"status": "ok", "sent": 1, "candidates": 3}
assert len(sent) == 1
assert "<b>Signal run</b> — 3 new alert(s)" in sent[0]
assert "<b>Qualified setups</b>" in sent[0]
assert "<b>Near support/resistance</b>" in sent[0]
assert "AAPL LONG" in sent[0]
assert "MSFT" in sent[0]
rows = (
await session.execute(
select(AlertLog.alert_type, AlertLog.dedup_key)
.order_by(AlertLog.alert_type, AlertLog.dedup_key)
)
).all()
assert rows == [
("qualified", "qualified:AAPL:long"),
("qualified", "qualified:TSLA:short"),
("sr_proximity", "sr:MSFT:resistance"),
]
async def _add_closed_trade(session, symbol: str, reason: str, *,
close: float = 110.0, closed_hours_ago: float = 1.0) -> None:
if await session.get(User, 1) is None:
+4 -3
View File
@@ -313,7 +313,8 @@ def _acand(
) -> dict:
"""Ablation candidate: meets_core mirrors the default floors (min_rr 1.2,
min_confidence 55, exclude_neutral on)."""
meets = rr >= 1.2 and conf >= 55.0 and action != "NEUTRAL"
action_dir = "long" if action.startswith("LONG") else "short" if action.startswith("SHORT") else "neutral"
meets = rr >= 1.2 and conf >= 55.0 and action_dir != "neutral" and action_dir == direction
return {
"rr": rr,
"confidence": conf,
@@ -347,7 +348,7 @@ class TestGateAblation:
_acand(rr=1.0), # fails R:R floor
_acand(action="NEUTRAL"), # fails NEUTRAL exclusion
_acand(mp=50.0), # fails the momentum cutoff
_acand(direction="short", mp=95.0), # short — gated out
_acand(direction="short", action="SHORT_MODERATE", mp=95.0), # short — gated out
]
rows = {r["variant"]: r for r in bt._gate_ablation(cands, self.ACTIVATION, 80.0)}
assert rows["all_floors"]["total"] == 1
@@ -364,7 +365,7 @@ class TestGateAblation:
def test_threshold_zero_disables_momentum_gate(self):
# Floors only: the short and the low-momentum long both pass all_floors.
cands = [_acand(mp=50.0), _acand(direction="short", mp=None)]
cands = [_acand(mp=50.0), _acand(direction="short", action="SHORT_MODERATE", mp=None)]
rows = {r["variant"]: r for r in bt._gate_ablation(cands, self.ACTIVATION, 0.0)}
assert rows["all_floors"]["total"] == 2
+10
View File
@@ -31,6 +31,7 @@ STRICT_GATE = {
def _setup(**kwargs):
base = dict(
direction="long",
rr_ratio=3.0,
confidence_score=80.0,
recommended_action="LONG_HIGH",
@@ -124,6 +125,15 @@ class TestExcludeNeutral:
def test_directional_passes_when_on(self):
assert setup_qualifies(_setup(recommended_action="LONG_MODERATE"), NEUTRAL_GATE) is True
def test_opposing_short_action_fails_for_long_setup(self):
assert setup_qualifies(_setup(direction="long", recommended_action="SHORT_MODERATE"), NEUTRAL_GATE) is False
def test_matching_short_action_still_fails_long_only_momentum_gate(self):
assert setup_qualifies(
_setup(direction="short", recommended_action="SHORT_MODERATE", momentum_percentile=95.0),
{**NEUTRAL_GATE, "min_momentum_percentile": 80.0},
) is False
def test_neutral_allowed_when_off(self):
# Flag absent from the config → NEUTRAL still qualifies (backward compatible).
assert setup_qualifies(_setup(recommended_action="NEUTRAL"), DEFAULT_GATE) is True
+396 -1
View File
@@ -11,20 +11,29 @@ Zero-candidate and single-candidate scenarios must produce identical results.
from __future__ import annotations
import json
from datetime import date, datetime, timedelta, timezone
from unittest.mock import AsyncMock, patch
import pytest
from hypothesis import given, settings, HealthCheck, strategies as st
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.ohlcv import OHLCVRecord
from app.models.signal_context_snapshot import SignalContextSnapshot
from app.models.sr_level import SRLevel
from app.models.ticker import Ticker
from app.models.trade_setup import TradeSetup
from app.models.score import CompositeScore
from app.models.score import CompositeScore, DimensionScore
from app.models.sentiment import SentimentScore
from app.services.rr_scanner_service import scan_ticker, get_trade_setups
def _as_utc(value: datetime) -> datetime:
return value if value.tzinfo is not None else value.replace(tzinfo=timezone.utc)
# ---------------------------------------------------------------------------
# Session fixtures
# ---------------------------------------------------------------------------
@@ -431,3 +440,389 @@ async def test_get_trade_setups_sorting_rr_desc_composite_desc(db_session: Async
f"Expected symbol order ['SORTD', 'SORTC', 'SORTB', 'SORTA'], "
f"got {symbols}"
)
async def _seed_stale_setup_with_current_scores(db_session: AsyncSession) -> TradeSetup:
"""Stored setup frozen at scan time (conf 82, neutral) vs. current context
(bullish sentiment, composite 96) that yields live confidence 97."""
old_scan = datetime(2026, 7, 1, tzinfo=timezone.utc)
current = datetime(2026, 7, 3, tzinfo=timezone.utc)
old_reasoning = (
"LONG (high confidence): 82% with aligned signals "
"(technical=88, momentum=60, sentiment=neutral)."
)
ticker = Ticker(symbol="TTWO")
db_session.add(ticker)
await db_session.flush()
stale_setup = TradeSetup(
ticker_id=ticker.id,
direction="long",
entry_price=235.0,
stop_loss=220.0,
target=265.0,
rr_ratio=2.0,
composite_score=71.8,
detected_at=old_scan,
confidence_score=82.0,
recommended_action="LONG_HIGH",
reasoning=old_reasoning,
risk_level="High",
)
db_session.add(stale_setup)
db_session.add_all([
DimensionScore(
ticker_id=ticker.id,
dimension="technical",
score=88.0,
is_stale=False,
computed_at=current,
),
DimensionScore(
ticker_id=ticker.id,
dimension="momentum",
score=60.0,
is_stale=False,
computed_at=current,
),
DimensionScore(
ticker_id=ticker.id,
dimension="fundamental",
score=95.0,
is_stale=False,
computed_at=current,
),
DimensionScore(
ticker_id=ticker.id,
dimension="sentiment",
score=85.0,
is_stale=False,
computed_at=current,
),
CompositeScore(
ticker_id=ticker.id,
score=96.0,
is_stale=False,
weights_json="{}",
computed_at=current,
),
SentimentScore(
ticker_id=ticker.id,
classification="bullish",
confidence=85,
source="test",
timestamp=current,
reasoning="",
citations_json="[]",
),
])
await db_session.flush()
return stale_setup
@pytest.mark.asyncio
async def test_live_recommendation_payload_uses_current_score_and_sentiment(
db_session: AsyncSession,
):
"""Latest setup payload should not show stale scan text when score context moved."""
stale_setup = await _seed_stale_setup_with_current_scores(db_session)
old_reasoning = stale_setup.reasoning
rows = await get_trade_setups(
db_session,
symbol="TTWO",
live_recommendation=True,
)
assert len(rows) == 1
row = rows[0]
assert row["composite_score"] == pytest.approx(96.0)
assert row["confidence_score"] == pytest.approx(97.0)
assert row["recommended_action"] == "LONG_HIGH"
assert "sentiment=bullish" in row["reasoning"]
assert "sentiment=neutral" not in row["reasoning"]
persisted = await db_session.get(TradeSetup, stale_setup.id)
assert persisted is not None
assert persisted.composite_score == pytest.approx(71.8)
assert persisted.reasoning == old_reasoning
@pytest.mark.asyncio
async def test_live_recommendation_filters_apply_to_live_values(
db_session: AsyncSession,
):
"""min_confidence must judge the overlaid live confidence, not the stored one."""
await _seed_stale_setup_with_current_scores(db_session)
# Stored confidence is 82 — a stored-column filter would drop this row.
# Live confidence is 97, so it must pass.
rows = await get_trade_setups(
db_session,
symbol="TTWO",
min_confidence=90.0,
live_recommendation=True,
)
assert len(rows) == 1
assert rows[0]["confidence_score"] == pytest.approx(97.0)
# And a floor above the live value must drop it.
rows = await get_trade_setups(
db_session,
symbol="TTWO",
min_confidence=98.0,
live_recommendation=True,
)
assert rows == []
async def _seed_two_direction_setup(db_session: AsyncSession) -> None:
current = datetime(2026, 7, 3, tzinfo=timezone.utc)
ticker = Ticker(symbol="BOTH")
db_session.add(ticker)
await db_session.flush()
db_session.add_all([
TradeSetup(
ticker_id=ticker.id,
direction="long",
entry_price=100.0,
stop_loss=95.0,
target=112.0,
rr_ratio=2.4,
composite_score=30.0,
detected_at=current,
confidence_score=25.0,
recommended_action="NEUTRAL",
risk_level="Low",
),
TradeSetup(
ticker_id=ticker.id,
direction="short",
entry_price=100.0,
stop_loss=105.0,
target=88.0,
rr_ratio=2.4,
composite_score=30.0,
detected_at=current,
confidence_score=90.0,
recommended_action="SHORT_HIGH",
risk_level="Low",
),
DimensionScore(
ticker_id=ticker.id,
dimension="technical",
score=10.0,
is_stale=False,
computed_at=current,
),
DimensionScore(
ticker_id=ticker.id,
dimension="momentum",
score=10.0,
is_stale=False,
computed_at=current,
),
DimensionScore(
ticker_id=ticker.id,
dimension="fundamental",
score=10.0,
is_stale=False,
computed_at=current,
),
CompositeScore(
ticker_id=ticker.id,
score=30.0,
is_stale=False,
weights_json="{}",
computed_at=current,
),
SentimentScore(
ticker_id=ticker.id,
classification="bearish",
confidence=90,
source="test",
timestamp=current,
reasoning="",
citations_json="[]",
),
])
await db_session.flush()
@pytest.mark.asyncio
async def test_live_recommendation_action_independent_of_direction_filter(
db_session: AsyncSession,
):
await _seed_two_direction_setup(db_session)
all_rows = await get_trade_setups(
db_session,
symbol="BOTH",
live_recommendation=True,
)
filtered_rows = await get_trade_setups(
db_session,
symbol="BOTH",
direction="long",
live_recommendation=True,
)
long_from_all = next(row for row in all_rows if row["direction"] == "long")
assert len(filtered_rows) == 1
assert long_from_all["recommended_action"] == "SHORT_HIGH"
assert filtered_rows[0]["recommended_action"] == "SHORT_HIGH"
@pytest.mark.asyncio
async def test_live_overlay_preserves_setup_specific_risk_and_context(
db_session: AsyncSession,
):
current = datetime(2026, 7, 3, tzinfo=timezone.utc)
ticker = Ticker(symbol="RISK")
db_session.add(ticker)
await db_session.flush()
db_session.add_all([
TradeSetup(
ticker_id=ticker.id,
direction="long",
entry_price=100.0,
stop_loss=95.0,
target=112.0,
rr_ratio=2.4,
composite_score=50.0,
detected_at=current,
confidence_score=50.0,
recommended_action="NEUTRAL",
risk_level="Medium",
conflict_flags_json=json.dumps([
"target-availability: Fewer than 3 valid S/R targets available"
]),
),
DimensionScore(
ticker_id=ticker.id,
dimension="technical",
score=50.0,
is_stale=False,
computed_at=current,
),
DimensionScore(
ticker_id=ticker.id,
dimension="momentum",
score=50.0,
is_stale=False,
computed_at=current,
),
CompositeScore(
ticker_id=ticker.id,
score=50.0,
is_stale=False,
weights_json="{}",
computed_at=current,
),
SentimentScore(
ticker_id=ticker.id,
classification="neutral",
confidence=50,
source="test",
timestamp=current,
reasoning="",
citations_json="[]",
),
OHLCVRecord(
ticker_id=ticker.id,
date=date(2026, 7, 3),
open=101.0,
high=102.0,
low=100.0,
close=101.0,
volume=1000,
created_at=current,
),
])
await db_session.flush()
rows = await get_trade_setups(
db_session,
symbol="RISK",
live_recommendation=True,
)
assert len(rows) == 1
row = rows[0]
assert row["risk_level"] == "Medium"
assert row["conflict_flags"] == [
"target-availability: Fewer than 3 valid S/R targets available"
]
assert row["current_price"] == pytest.approx(101.0)
assert _as_utc(row["context_as_of"]["score_computed_at"]) == current
assert _as_utc(row["context_as_of"]["sentiment_at"]) == current
assert row["context_as_of"]["price_date"] == date(2026, 7, 3)
assert _as_utc(row["context_as_of"]["price_updated_at"]) == current
@pytest.mark.asyncio
async def test_live_trade_setup_read_does_not_recompute_scores(db_session: AsyncSession):
await _seed_stale_setup_with_current_scores(db_session)
with patch(
"app.services.scoring_service.compute_all_dimensions",
new=AsyncMock(side_effect=AssertionError("GET must not recompute dimensions")),
), patch(
"app.services.scoring_service.compute_composite_score",
new=AsyncMock(side_effect=AssertionError("GET must not recompute composite")),
):
rows = await get_trade_setups(
db_session,
symbol="TTWO",
live_recommendation=True,
)
assert len(rows) == 1
@pytest.mark.asyncio
async def test_intraday_price_update_changes_live_price_without_new_signal_rows(
db_session: AsyncSession,
):
current = datetime(2026, 7, 3, tzinfo=timezone.utc)
ticker = Ticker(symbol="LIVEP")
db_session.add(ticker)
await db_session.flush()
setup = TradeSetup(
ticker_id=ticker.id,
direction="long",
entry_price=100.0,
stop_loss=95.0,
target=112.0,
rr_ratio=2.4,
composite_score=50.0,
detected_at=current,
)
price = OHLCVRecord(
ticker_id=ticker.id,
date=date(2026, 7, 3),
open=100.0,
high=101.0,
low=99.0,
close=100.0,
volume=1000,
created_at=current,
)
db_session.add_all([setup, price])
await db_session.flush()
rows = await get_trade_setups(db_session, symbol="LIVEP", live_recommendation=True)
assert rows[0]["current_price"] == pytest.approx(100.0)
price.close = 102.0
await db_session.flush()
rows = await get_trade_setups(db_session, symbol="LIVEP", live_recommendation=True)
assert rows[0]["current_price"] == pytest.approx(102.0)
setup_count = await db_session.scalar(select(func.count()).select_from(TradeSetup))
snapshot_count = await db_session.scalar(select(func.count()).select_from(SignalContextSnapshot))
assert setup_count == 1
assert snapshot_count == 0
+65 -140
View File
@@ -1,24 +1,24 @@
"""Unit tests for get_score composite breakdown and dimension breakdown wiring."""
"""Unit tests for read-only get_score composite breakdown wiring."""
from __future__ import annotations
from datetime import date
from types import SimpleNamespace
from datetime import datetime, timezone
from unittest.mock import AsyncMock, patch
import pytest
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from app.database import Base
from app.models.score import CompositeScore, DimensionScore
from app.models.ticker import Ticker
from app.services.scoring_service import get_score, _DIMENSION_COMPUTERS
from app.services.scoring_service import get_score
TEST_DATABASE_URL = "sqlite+aiosqlite://"
@pytest.fixture
async def fresh_db():
"""Provide a non-transactional session so get_score can commit."""
"""Provide a non-transactional session for persisted score reads."""
engine = create_async_engine(TEST_DATABASE_URL, echo=False)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
@@ -30,176 +30,101 @@ async def fresh_db():
await engine.dispose()
def _make_ohlcv_records(n: int, base_close: float = 100.0) -> list:
"""Create n mock OHLCV records with realistic price data."""
records = []
for i in range(n):
price = base_close + (i * 0.5)
records.append(
SimpleNamespace(
date=date(2024, 1, 1),
open=price - 0.5,
high=price + 1.0,
low=price - 1.0,
close=price,
volume=1000000,
)
)
return records
def _mock_none_computer():
"""Return an AsyncMock that returns (None, None) — simulates missing dimension data."""
return AsyncMock(return_value=(None, None))
def _mock_score_computer(score: float, breakdown: dict | None = None):
"""Return an AsyncMock that returns a fixed (score, breakdown) tuple."""
bd = breakdown or {
"sub_scores": [{"name": "mock", "score": score, "weight": 1.0, "raw_value": score, "description": "mock"}],
"formula": "mock formula",
"unavailable": [],
}
return AsyncMock(return_value=(score, bd))
async def _seed_ticker(session: AsyncSession, symbol: str = "AAPL") -> Ticker:
"""Insert a ticker row and return it."""
ticker = Ticker(symbol=symbol)
session.add(ticker)
await session.commit()
return ticker
async def _seed_scores(session: AsyncSession, ticker: Ticker, *, stale: bool = False) -> None:
now = datetime(2026, 7, 3, tzinfo=timezone.utc)
session.add_all([
DimensionScore(
ticker_id=ticker.id,
dimension="technical",
score=70.0,
is_stale=stale,
computed_at=now,
),
DimensionScore(
ticker_id=ticker.id,
dimension="momentum",
score=60.0,
is_stale=False,
computed_at=now,
),
CompositeScore(
ticker_id=ticker.id,
score=66.0,
is_stale=stale,
weights_json="{}",
computed_at=now,
),
])
await session.commit()
@pytest.mark.asyncio
async def test_get_score_returns_composite_breakdown(fresh_db):
"""get_score should include a composite_breakdown dict with weights and re-normalization info."""
await _seed_ticker(fresh_db, "AAPL")
original = dict(_DIMENSION_COMPUTERS)
try:
_DIMENSION_COMPUTERS["technical"] = _mock_score_computer(70.0)
_DIMENSION_COMPUTERS["momentum"] = _mock_score_computer(60.0)
_DIMENSION_COMPUTERS["sentiment"] = _mock_none_computer()
_DIMENSION_COMPUTERS["fundamental"] = _mock_none_computer()
_DIMENSION_COMPUTERS["sr_quality"] = _mock_none_computer()
async def test_get_score_returns_composite_breakdown_without_recomputing(fresh_db):
ticker = await _seed_ticker(fresh_db, "AAPL")
await _seed_scores(fresh_db, ticker)
with patch(
"app.services.scoring_service.compute_dimension_score",
new=AsyncMock(side_effect=AssertionError("GET must not recompute dimensions")),
), patch(
"app.services.scoring_service.compute_composite_score",
new=AsyncMock(side_effect=AssertionError("GET must not recompute composite")),
):
result = await get_score(fresh_db, "AAPL")
finally:
_DIMENSION_COMPUTERS.update(original)
assert "composite_breakdown" in result
assert result["composite_score"] == 66.0
cb = result["composite_breakdown"]
assert cb is not None
assert "weights" in cb
assert "available_dimensions" in cb
assert "missing_dimensions" in cb
assert "renormalized_weights" in cb
assert "formula" in cb
@pytest.mark.asyncio
async def test_get_score_composite_breakdown_has_correct_available_missing(fresh_db):
"""Composite breakdown should correctly list available and missing dimensions."""
await _seed_ticker(fresh_db, "AAPL")
original = dict(_DIMENSION_COMPUTERS)
try:
_DIMENSION_COMPUTERS["technical"] = _mock_score_computer(70.0)
_DIMENSION_COMPUTERS["momentum"] = _mock_score_computer(60.0)
_DIMENSION_COMPUTERS["sentiment"] = _mock_none_computer()
_DIMENSION_COMPUTERS["fundamental"] = _mock_none_computer()
_DIMENSION_COMPUTERS["sr_quality"] = _mock_none_computer()
result = await get_score(fresh_db, "AAPL")
finally:
_DIMENSION_COMPUTERS.update(original)
cb = result["composite_breakdown"]
assert "technical" in cb["available_dimensions"]
assert "momentum" in cb["available_dimensions"]
assert "sentiment" in cb["missing_dimensions"]
assert "fundamental" in cb["missing_dimensions"]
assert "sr_quality" in cb["missing_dimensions"]
@pytest.mark.asyncio
async def test_get_score_renormalized_weights_sum_to_one(fresh_db):
"""Re-normalized weights should sum to 1.0 when at least one dimension is available."""
await _seed_ticker(fresh_db, "AAPL")
original = dict(_DIMENSION_COMPUTERS)
try:
_DIMENSION_COMPUTERS["technical"] = _mock_score_computer(70.0)
_DIMENSION_COMPUTERS["momentum"] = _mock_score_computer(60.0)
_DIMENSION_COMPUTERS["sentiment"] = _mock_none_computer()
_DIMENSION_COMPUTERS["fundamental"] = _mock_none_computer()
_DIMENSION_COMPUTERS["sr_quality"] = _mock_none_computer()
result = await get_score(fresh_db, "AAPL")
finally:
_DIMENSION_COMPUTERS.update(original)
cb = result["composite_breakdown"]
assert cb["renormalized_weights"]
total = sum(cb["renormalized_weights"].values())
assert abs(total - 1.0) < 1e-9
assert abs(sum(cb["renormalized_weights"].values()) - 1.0) < 1e-9
@pytest.mark.asyncio
async def test_get_score_dimensions_include_breakdowns(fresh_db):
"""Each available dimension entry should include a breakdown dict."""
await _seed_ticker(fresh_db, "AAPL")
async def test_get_score_dimensions_do_not_recompute_breakdowns(fresh_db):
ticker = await _seed_ticker(fresh_db, "AAPL")
await _seed_scores(fresh_db, ticker)
tech_breakdown = {
"sub_scores": [
{"name": "ADX", "score": 72.0, "weight": 0.4, "raw_value": 72.0, "description": "ADX value"},
{"name": "EMA", "score": 65.0, "weight": 0.3, "raw_value": 1.5, "description": "EMA diff"},
{"name": "RSI", "score": 62.0, "weight": 0.3, "raw_value": 62.0, "description": "RSI value"},
],
"formula": "Weighted average: 0.4*ADX + 0.3*EMA + 0.3*RSI",
"unavailable": [],
}
original = dict(_DIMENSION_COMPUTERS)
try:
_DIMENSION_COMPUTERS["technical"] = _mock_score_computer(68.2, tech_breakdown)
_DIMENSION_COMPUTERS["momentum"] = _mock_score_computer(55.0)
_DIMENSION_COMPUTERS["sentiment"] = _mock_none_computer()
_DIMENSION_COMPUTERS["fundamental"] = _mock_none_computer()
_DIMENSION_COMPUTERS["sr_quality"] = _mock_none_computer()
result = await get_score(fresh_db, "AAPL")
finally:
_DIMENSION_COMPUTERS.update(original)
result = await get_score(fresh_db, "AAPL")
tech_dim = next((d for d in result["dimensions"] if d["dimension"] == "technical"), None)
assert tech_dim is not None
assert "breakdown" in tech_dim
assert tech_dim["breakdown"] is not None
assert len(tech_dim["breakdown"]["sub_scores"]) == 3
names = [s["name"] for s in tech_dim["breakdown"]["sub_scores"]]
assert "ADX" in names
assert "EMA" in names
assert "RSI" in names
assert tech_dim["breakdown"] is None
@pytest.mark.asyncio
async def test_get_score_all_dimensions_missing(fresh_db):
"""When all dimensions return None, composite_breakdown should list all as missing."""
await _seed_ticker(fresh_db, "AAPL")
original = dict(_DIMENSION_COMPUTERS)
try:
for dim in _DIMENSION_COMPUTERS:
_DIMENSION_COMPUTERS[dim] = _mock_none_computer()
result = await get_score(fresh_db, "AAPL")
finally:
_DIMENSION_COMPUTERS.update(original)
result = await get_score(fresh_db, "AAPL")
cb = result["composite_breakdown"]
assert cb["available_dimensions"] == []
assert len(cb["missing_dimensions"]) == 5
assert cb["renormalized_weights"] == {}
assert result["composite_score"] is None
@pytest.mark.asyncio
async def test_get_score_reports_stale_without_refreshing(fresh_db):
ticker = await _seed_ticker(fresh_db, "AAPL")
await _seed_scores(fresh_db, ticker, stale=True)
result = await get_score(fresh_db, "AAPL")
assert result["composite_stale"] is True
assert "technical" in result["missing_dimensions"]
tech_dim = next((d for d in result["dimensions"] if d["dimension"] == "technical"), None)
assert tech_dim is not None
assert tech_dim["is_stale"] is True
+12 -26
View File
@@ -1,5 +1,4 @@
"""Unit tests for get_rankings: bulk-load fast path, sorting, exclusion, and
lazy recompute of stale scores."""
"""Unit tests for read-only get_rankings: bulk-load, sorting, and staleness."""
from __future__ import annotations
@@ -7,7 +6,6 @@ from datetime import datetime, timezone
from unittest.mock import AsyncMock, patch
import pytest
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from app.database import Base
@@ -20,7 +18,7 @@ TEST_DATABASE_URL = "sqlite+aiosqlite://"
@pytest.fixture
async def fresh_db():
"""Non-transactional session so get_rankings can commit recomputes."""
"""Non-transactional session for persisted ranking reads."""
engine = create_async_engine(TEST_DATABASE_URL, echo=False)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
@@ -84,46 +82,34 @@ async def test_fast_path_sorts_and_does_not_recompute(fresh_db: AsyncSession):
@pytest.mark.asyncio
async def test_ticker_without_computable_composite_is_excluded(fresh_db: AsyncSession):
"""A ticker whose composite can't be computed (recompute yields no row) is
omitted from the rankings rather than appearing with a null score."""
"""A ticker without a persisted composite is omitted from rankings."""
fresh = await _seed_ticker(fresh_db, "OK")
await _seed_ticker(fresh_db, "NONE") # no composite; recompute can't make one
await _seed_ticker(fresh_db, "NONE")
fresh_db.add_all([_composite(fresh.id, 50.0), _dimension(fresh.id, "technical", 50.0)])
await fresh_db.commit()
# Recompute is a no-op that produces no composite row for NONE.
with patch("app.services.scoring_service.compute_dimension_score",
new=AsyncMock(return_value=None)), \
new=AsyncMock(side_effect=AssertionError("should not recompute"))), \
patch("app.services.scoring_service.compute_composite_score",
new=AsyncMock(return_value=(None, ["technical"]))):
new=AsyncMock(side_effect=AssertionError("should not recompute"))):
result = await get_rankings(fresh_db)
assert [r["symbol"] for r in result["rankings"]] == ["OK"]
@pytest.mark.asyncio
async def test_stale_composite_is_recomputed(fresh_db: AsyncSession):
"""A stale composite triggers a recompute and then appears in the rankings."""
async def test_stale_composite_is_reported_without_recompute(fresh_db: AsyncSession):
"""A stale composite appears with its stale flag and is not recomputed."""
ticker = await _seed_ticker(fresh_db, "STALE")
fresh_db.add(_composite(ticker.id, 10.0, stale=True))
await fresh_db.commit()
async def _fake_recompute(db, symbol, weights=None):
# Mirror the real upsert: refresh the existing row in place.
existing = (await db.execute(
select(CompositeScore).where(CompositeScore.ticker_id == ticker.id)
)).scalar_one()
existing.score = 77.0
existing.is_stale = False
return 77.0, []
# Dimension recompute is a no-op; composite recompute refreshes the score.
with patch("app.services.scoring_service.compute_dimension_score",
new=AsyncMock(return_value=55.0)), \
new=AsyncMock(side_effect=AssertionError("should not recompute"))), \
patch("app.services.scoring_service.compute_composite_score",
new=AsyncMock(side_effect=_fake_recompute)) as comp_mock:
new=AsyncMock(side_effect=AssertionError("should not recompute"))):
result = await get_rankings(fresh_db)
comp_mock.assert_awaited() # recompute path was taken
assert [r["symbol"] for r in result["rankings"]] == ["STALE"]
assert result["rankings"][0]["composite_score"] == 77.0 # reflects the recompute
assert result["rankings"][0]["composite_score"] == 10.0
assert result["rankings"][0]["composite_stale"] is True