Add multi-factor conviction gate to activation
Deploy / lint (push) Successful in 8s
Deploy / test (push) Successful in 35s
Deploy / deploy (push) Successful in 26s

Make "qualified" mean an edge candidate, not just R:R + confidence.
The gate now also requires (all admin-configurable, defaults on):
- high conviction: recommended_action LONG_HIGH / SHORT_HIGH only
- clean read: risk_level Low (no contradicting signals)
- probable primary target: best target probability >= min (default 60)

- Shared predicate: app/services/qualification.py +
  frontend/src/lib/qualification.ts (mirrored)
- Activation config extended (min_target_probability,
  require_high_conviction, exclude_conflicts) with bool-aware
  get/update + validation
- /trades/performance switched to ?qualified_only=true, applying
  the full gate server-side; confidence breakdown stays unfiltered
- Dashboard "Qualified", Signals "Qualified only" toggle, and
  Track Record all use the one gate; Admin gains the new controls

Sentiment provider runtime config (prior change) included.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
2026-06-13 11:50:42 +02:00
parent 6da65b8d8f
commit d53ed972d1
25 changed files with 924 additions and 110 deletions
+45 -27
View File
@@ -31,14 +31,29 @@ RECOMMENDATION_CONFIG_DEFAULTS: dict[str, float] = {
DEFAULT_TICKER_UNIVERSE = "sp500"
SUPPORTED_TICKER_UNIVERSES = {"sp500", "nasdaq100", "nasdaq_all"}
# Activation thresholds: what counts as a signal worth acting on.
# Used as Signals-page default filters, the Dashboard's qualified-setup
# metrics, and the Track Record's "qualified only" view. The outcome
# evaluator deliberately ignores these — every setup gets evaluated so the
# thresholds themselves can be validated against outcomes.
ACTIVATION_DEFAULTS: dict[str, float] = {
"activation_min_rr": 2.0,
"activation_min_confidence": 70.0,
# Activation gate: what counts as a signal worth acting on. Used by the
# Dashboard's "Qualified" metric, the Signals "Qualified only" view, and the
# Track Record's qualified stats. The outcome evaluator deliberately ignores
# these — every setup is evaluated so the gate itself can be validated.
#
# Beyond raw R:R and confidence, the gate demands conviction: a high-conviction
# action (LONG_HIGH / SHORT_HIGH), a clean read (risk Low / no conflicts), and a
# probable primary target.
_ACTIVATION_FLOAT_KEYS: dict[str, str] = {
"min_rr": "activation_min_rr",
"min_confidence": "activation_min_confidence",
"min_target_probability": "activation_min_target_probability",
}
_ACTIVATION_BOOL_KEYS: dict[str, str] = {
"require_high_conviction": "activation_require_high_conviction",
"exclude_conflicts": "activation_exclude_conflicts",
}
ACTIVATION_DEFAULTS: dict[str, float | bool] = {
"min_rr": 2.0,
"min_confidence": 70.0,
"min_target_probability": 60.0,
"require_high_conviction": True,
"exclude_conflicts": True,
}
@@ -157,40 +172,43 @@ async def update_setting(db: AsyncSession, key: str, value: str) -> SystemSettin
# Activation thresholds
# ---------------------------------------------------------------------------
async def get_activation_config(db: AsyncSession) -> dict[str, float]:
"""Return activation thresholds with public keys (min_rr, min_confidence)."""
async def get_activation_config(db: AsyncSession) -> dict[str, float | bool]:
"""Return the activation gate config with public keys."""
result = await db.execute(
select(SystemSetting).where(SystemSetting.key.like("activation_%"))
)
config = dict(ACTIVATION_DEFAULTS)
for setting in result.scalars().all():
if setting.key in config:
stored = {s.key: s.value for s in result.scalars().all()}
config: dict[str, float | bool] = dict(ACTIVATION_DEFAULTS)
for public_key, storage_key in _ACTIVATION_FLOAT_KEYS.items():
if storage_key in stored:
try:
config[setting.key] = float(setting.value)
config[public_key] = float(stored[storage_key])
except (TypeError, ValueError):
pass
return {
"min_rr": config["activation_min_rr"],
"min_confidence": config["activation_min_confidence"],
}
for public_key, storage_key in _ACTIVATION_BOOL_KEYS.items():
if storage_key in stored:
config[public_key] = str(stored[storage_key]).strip().lower() == "true"
return config
async def update_activation_config(
db: AsyncSession, updates: dict[str, float]
) -> dict[str, float]:
"""Update activation thresholds. Accepts public keys min_rr / min_confidence."""
db: AsyncSession, updates: dict[str, float | bool]
) -> dict[str, float | bool]:
"""Update the activation gate. Accepts public keys; only supplied keys change."""
if "min_rr" in updates and updates["min_rr"] < 0:
raise ValidationError("min_rr must be >= 0")
if "min_confidence" in updates and not 0 <= updates["min_confidence"] <= 100:
raise ValidationError("min_confidence must be between 0 and 100")
if "min_target_probability" in updates and not 0 <= updates["min_target_probability"] <= 100:
raise ValidationError("min_target_probability must be between 0 and 100")
key_map = {
"min_rr": "activation_min_rr",
"min_confidence": "activation_min_confidence",
}
for public_key, storage_key in key_map.items():
if public_key in updates:
for public_key, storage_key in _ACTIVATION_FLOAT_KEYS.items():
if public_key in updates and updates[public_key] is not None:
await update_setting(db, storage_key, str(float(updates[public_key])))
for public_key, storage_key in _ACTIVATION_BOOL_KEYS.items():
if public_key in updates and updates[public_key] is not None:
await update_setting(db, storage_key, "true" if updates[public_key] else "false")
return await get_activation_config(db)
+10 -13
View File
@@ -23,6 +23,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.models.ohlcv import OHLCVRecord
from app.models.trade_setup import TradeSetup
from app.services.qualification import setup_qualifies
logger = logging.getLogger(__name__)
@@ -180,8 +181,7 @@ def _confidence_bucket(score: float | None) -> str | None:
async def get_performance_stats(
db: AsyncSession,
min_rr: float | None = None,
min_confidence: float | None = None,
config: dict | None = None,
) -> dict:
"""Aggregate outcome statistics over all evaluated trade setups.
@@ -189,9 +189,10 @@ async def get_performance_stats(
loss = -1R, expired = 0R). A positive avg_r means the signals have
been profitable on a risk-adjusted basis.
min_rr / min_confidence filter the overall, direction and action
breakdowns. The confidence breakdown deliberately stays unfiltered:
it is the instrument for validating the thresholds themselves.
When ``config`` (an activation-gate dict) is supplied, the overall,
direction and action breakdowns cover only qualified setups. The
confidence breakdown deliberately stays unfiltered: it is the
instrument for validating the gate itself.
"""
result = await db.execute(
select(TradeSetup).where(TradeSetup.actual_outcome.is_not(None))
@@ -203,14 +204,10 @@ async def get_performance_stats(
)
pending_count = len(pending_result.scalars().all())
def qualifies(setup: TradeSetup) -> bool:
if min_rr is not None and setup.rr_ratio < min_rr:
return False
if min_confidence is not None and (setup.confidence_score or 0.0) < min_confidence:
return False
return True
qualified = [s for s in evaluated if qualifies(s)]
if config is not None:
qualified = [s for s in evaluated if setup_qualifies(s, config)]
else:
qualified = evaluated
by_direction: dict[str, list[TradeSetup]] = {}
by_action: dict[str, list[TradeSetup]] = {}
+42
View File
@@ -0,0 +1,42 @@
"""Shared definition of a 'qualified' (actionable) trade setup.
A single predicate, driven by the admin activation config, used by the
performance stats (server) and mirrored on the frontend. Beyond raw R:R and
confidence, an actionable setup must show genuine conviction: a high-conviction
recommended action, a clean (conflict-free) read, and a probable primary target.
"""
from __future__ import annotations
from typing import Any
HIGH_CONVICTION_ACTIONS = {"LONG_HIGH", "SHORT_HIGH"}
def best_target_probability(setup: Any) -> float:
"""Highest probability among a setup's targets, 0 if none."""
targets = getattr(setup, "targets", None) or []
probs = [float(t.get("probability", 0.0)) for t in targets if isinstance(t, dict)]
return max(probs, default=0.0)
def setup_qualifies(setup: Any, config: dict) -> bool:
"""Whether a setup clears the activation gate.
``setup`` is duck-typed: any object exposing rr_ratio, confidence_score,
recommended_action, risk_level and a ``targets`` list of dicts.
"""
if setup.rr_ratio < config["min_rr"]:
return False
if (setup.confidence_score or 0.0) < config["min_confidence"]:
return False
if config.get("require_high_conviction"):
if (setup.recommended_action or "") not in HIGH_CONVICTION_ACTIONS:
return False
if config.get("exclude_conflicts"):
if (setup.risk_level or "") != "Low":
return False
min_tp = float(config.get("min_target_probability", 0.0))
if min_tp > 0 and best_target_probability(setup) < min_tp:
return False
return True
+164
View File
@@ -0,0 +1,164 @@
"""Runtime-configurable sentiment provider.
Lets an admin switch the sentiment LLM (provider, model, API key) at runtime
via SystemSetting, without a redeploy. Precedence for each value:
DB setting > environment variable > hardcoded default.
The API key is write-only: it is persisted but never returned through any
read path. Reads report only whether a key is configured and its source.
"""
from __future__ import annotations
import logging
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.exceptions import ProviderError, ValidationError
from app.models.settings import SystemSetting
from app.services.admin_service import update_setting
logger = logging.getLogger(__name__)
VALID_PROVIDERS = {"openai", "gemini"}
PROVIDER_DEFAULT_MODELS: dict[str, str] = {
"openai": "gpt-4o-mini",
"gemini": "gemini-2.0-flash",
}
# SystemSetting keys
KEY_PROVIDER = "sentiment_provider"
KEY_MODEL = "sentiment_model"
KEY_API_KEY = "sentiment_api_key"
async def _get_settings_map(db: AsyncSession) -> dict[str, str]:
result = await db.execute(
select(SystemSetting).where(
SystemSetting.key.in_([KEY_PROVIDER, KEY_MODEL, KEY_API_KEY])
)
)
return {s.key: s.value for s in result.scalars().all()}
def _env_key_for(provider: str) -> str:
if provider == "openai":
return settings.openai_api_key or ""
if provider == "gemini":
return settings.gemini_api_key or ""
return ""
def _env_model_for(provider: str) -> str:
if provider == "openai":
return settings.openai_model or PROVIDER_DEFAULT_MODELS["openai"]
if provider == "gemini":
return settings.gemini_model or PROVIDER_DEFAULT_MODELS["gemini"]
return PROVIDER_DEFAULT_MODELS.get(provider, "")
async def _resolve(db: AsyncSession) -> tuple[str, str, str, str]:
"""Resolve (provider, model, api_key, api_key_source) from DB > env > default."""
stored = await _get_settings_map(db)
provider = (stored.get(KEY_PROVIDER) or "").strip().lower()
if provider not in VALID_PROVIDERS:
# Default to whichever env key is present, else openai
provider = "openai" if settings.openai_api_key or not settings.gemini_api_key else "gemini"
model = (stored.get(KEY_MODEL) or "").strip() or _env_model_for(provider)
db_key = (stored.get(KEY_API_KEY) or "").strip()
if db_key:
return provider, model, db_key, "database"
env_key = _env_key_for(provider)
if env_key:
return provider, model, env_key, "environment"
return provider, model, "", "none"
async def get_sentiment_config(db: AsyncSession) -> dict:
"""Public config — never includes the raw API key."""
provider, model, api_key, source = await _resolve(db)
return {
"provider": provider,
"model": model,
"api_key_configured": bool(api_key),
"api_key_source": source,
"valid_providers": sorted(VALID_PROVIDERS),
"default_models": PROVIDER_DEFAULT_MODELS,
}
async def update_sentiment_config(
db: AsyncSession,
provider: str | None = None,
model: str | None = None,
api_key: str | None = None,
) -> dict:
"""Persist provider/model/key. An empty/omitted api_key leaves the stored
key untouched (so saving other fields does not wipe credentials)."""
if provider is not None:
provider = provider.strip().lower()
if provider not in VALID_PROVIDERS:
raise ValidationError(
f"Unknown sentiment provider '{provider}'. Valid: {', '.join(sorted(VALID_PROVIDERS))}"
)
await update_setting(db, KEY_PROVIDER, provider)
if model is not None:
model = model.strip()
if model:
await update_setting(db, KEY_MODEL, model)
if api_key: # only overwrite when a non-empty key is supplied
await update_setting(db, KEY_API_KEY, api_key.strip())
return await get_sentiment_config(db)
async def build_sentiment_provider(db: AsyncSession):
"""Construct the active sentiment provider from current config.
Raises ProviderError if no API key is available for the chosen provider.
"""
provider, model, api_key, _source = await _resolve(db)
if not api_key:
raise ProviderError(f"No API key configured for sentiment provider '{provider}'")
if provider == "openai":
from app.providers.openai_sentiment import OpenAISentimentProvider
return OpenAISentimentProvider(api_key, model)
if provider == "gemini":
from app.providers.gemini_sentiment import GeminiSentimentProvider
return GeminiSentimentProvider(api_key, model)
raise ProviderError(f"Unsupported sentiment provider '{provider}'")
async def test_sentiment_provider(db: AsyncSession, ticker: str = "AAPL") -> dict:
"""Build the active provider and fetch one ticker as a live credentials check."""
provider, model, _key, _source = await _resolve(db)
try:
prov = await build_sentiment_provider(db)
data = await prov.fetch_sentiment(ticker.strip().upper())
return {
"ok": True,
"provider": provider,
"model": model,
"ticker": ticker.strip().upper(),
"classification": data.classification,
"confidence": data.confidence,
"reasoning": data.reasoning or None,
}
except Exception as exc:
logger.warning("Sentiment provider test failed: %s", exc)
return {
"ok": False,
"provider": provider,
"model": model,
"error": str(exc),
}