Files
signal-platform/app/services/recommendation_service.py
T
dennisthiessen e355368748
Deploy / lint (push) Successful in 6s
Deploy / test (push) Successful in 35s
Deploy / deploy (push) Successful in 23s
generate targets from S/R zones, not raw levels (consistency + strength)
Trade-setup targets now pre-merge near-duplicate S/R levels into zone
representatives (same 2% clusterer as chart + alerts) before generate_targets
runs. A clustered wall (e.g. 183 + 185) becomes one target carrying the zone's
COMBINED strength (capped 100) instead of two near-identical targets that each
undervalue the wall — which also feeds a more honest reach-probability via the
S/R-strength magnet. Representative price is the zone's near edge; the strongest
constituent's id is retained. Singleton levels pass through unchanged, so the
downstream band-spreading / probability / primary-selection pipeline and its
tests are untouched.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-15 10:20:15 +02:00

637 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import json
import logging
import math
from types import SimpleNamespace
from typing import Any
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.settings import SystemSetting
from app.models.sr_level import SRLevel
from app.models.ticker import Ticker
from app.models.trade_setup import TradeSetup
from app.services.sr_service import cluster_sr_zones
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Built-in defaults for the recommendation tunables. get_recommendation_config()
# copies this dict and overlays any matching SystemSetting rows on top of it, and
# SignalConflictDetector falls back to it when no config is passed.
DEFAULT_RECOMMENDATION_CONFIG: dict[str, float] = {
    # Confidence cut-offs read by _choose_recommended_action for the *_HIGH and
    # *_MODERATE actions, plus the minimum long/short confidence gap it requires.
    "recommendation_high_confidence_threshold": 70.0,
    "recommendation_moderate_confidence_threshold": 50.0,
    "recommendation_confidence_diff_threshold": 20.0,
    # Weights read by ProbabilityEstimator for the signal-alignment and
    # S/R-strength adjustments to the base touch probability.
    "recommendation_signal_alignment_weight": 0.15,
    "recommendation_sr_strength_weight": 0.20,
    # Score gaps above which SignalConflictDetector reports a divergence.
    "recommendation_momentum_technical_divergence_threshold": 30.0,
    "recommendation_fundamental_technical_divergence_threshold": 40.0,
}
# Horizon (trading days) over which a target's reach-probability is estimated.
# Kept in step with the outcome evaluator's window so probability predicts the
# metric we actually measure.
_TARGET_HORIZON_DAYS = 30.0
# Distance bands (in ATR) used to spread targets across Conservative / Moderate
# / Aggressive. Aligned with where the touch-probability crosses 60% / 40% over
# the horizon, so a target from each band tends to land in the matching label.
_CONSERVATIVE_MAX_ATR = 2.9
_MODERATE_MAX_ATR = 4.6
# Merge S/R levels within this fraction into one zone before generating targets —
# the same tolerance the chart and alerts use, so S/R is one model app-wide.
_SR_ZONE_TOLERANCE = 0.02
def _clamp(value: float, low: float, high: float) -> float:
return max(low, min(high, value))
def _zone_representative_levels(sr_levels: list[SRLevel], entry_price: float) -> list[Any]:
"""Collapse near-duplicate S/R levels into one representative per zone.
Targets are generated from these representatives, so a clustered wall (e.g.
183 + 185) becomes a single target carrying the zone's COMBINED strength
(capped at 100) instead of two near-identical targets, each undervaluing the
wall. Same clusterer as the chart and alerts → one S/R model everywhere.
The representative price is the zone's near edge (the reachable side of the
wall) and it keeps the strongest constituent's id for reference. Singleton
levels pass through unchanged.
"""
if not sr_levels or entry_price <= 0:
return list(sr_levels)
level_dicts = [
{"price_level": float(lv.price_level), "strength": int(lv.strength), "type": lv.type}
for lv in sr_levels
]
zones = cluster_sr_zones(level_dicts, entry_price, tolerance=_SR_ZONE_TOLERANCE)
reps: list[Any] = []
for zone in zones:
constituents = [
lv for lv in sr_levels if zone["low"] <= float(lv.price_level) <= zone["high"]
]
if not constituents:
continue
strongest = max(constituents, key=lambda lv: lv.strength)
# Near edge: bottom of a resistance wall (above entry), top of a support
# wall (below entry) — the first price the move reaches.
near_edge = zone["low"] if zone["type"] == "resistance" else zone["high"]
reps.append(
SimpleNamespace(
id=int(strongest.id),
price_level=float(near_edge),
type=zone["type"],
strength=int(zone["strength"]),
)
)
return reps
def _norm_cdf(x: float) -> float:
"""Standard normal CDF via erf (no SciPy dependency)."""
return 0.5 * (1.0 + math.erf(x / math.sqrt(2.0)))
def _classify_by_probability(probability: float) -> str:
"""Label a target by how likely it is to be reached (derived, not assumed)."""
if probability >= 60.0:
return "Conservative"
if probability >= 40.0:
return "Moderate"
return "Aggressive"
def _sentiment_value(sentiment_classification: str | None) -> str | None:
if sentiment_classification is None:
return None
return sentiment_classification.strip().lower()
def check_signal_alignment(
    direction: str,
    dimension_scores: dict[str, float],
    sentiment_classification: str | None,
) -> tuple[bool, str]:
    """Report whether at least two of three signals back ``direction``.

    The three signals are the technical score, the momentum score (both default
    to 50 when missing) and the normalised sentiment label. For "long" a score
    counts when it is above 60 and sentiment when it is "bullish"; for any
    other direction a score counts when it is below 40 and sentiment when it
    is "bearish".

    Returns:
        ``(aligned, message)`` where ``message`` is a human-readable summary.
    """
    technical = float(dimension_scores.get("technical", 50.0))
    momentum = float(dimension_scores.get("momentum", 50.0))
    sentiment = _sentiment_value(sentiment_classification)

    if direction == "long":
        votes = int(technical > 60) + int(momentum > 60) + int(sentiment == "bullish")
        if votes < 2:
            return False, "Signals are mixed for LONG direction."
        return True, "Technical, momentum, and/or sentiment align with LONG direction."

    # Every non-"long" direction is evaluated as a short.
    votes = int(technical < 40) + int(momentum < 40) + int(sentiment == "bearish")
    if votes < 2:
        return False, "Signals are mixed for SHORT direction."
    return True, "Technical, momentum, and/or sentiment align with SHORT direction."
class SignalConflictDetector:
    """Finds contradictions between sentiment and the dimension scores."""

    def detect_conflicts(
        self,
        dimension_scores: dict[str, float],
        sentiment_classification: str | None,
        config: dict[str, float] | None = None,
    ) -> list[str]:
        """Return one message per detected conflict, each prefixed with a tag.

        Tags, in the order they are checked:

        * ``sentiment-technical``   - sentiment opposes a technical score
          above 60 / below 40
        * ``momentum-technical``    - |momentum - technical| exceeds the
          configured divergence threshold (default 30)
        * ``sentiment-momentum``    - sentiment opposes a momentum score
          above 60 / below 40
        * ``fundamental-technical`` - |fundamental - technical| exceeds the
          configured divergence threshold (default 40)

        Missing scores default to 50. A falsy ``config`` (``None`` or empty)
        falls back to ``DEFAULT_RECOMMENDATION_CONFIG``.
        """
        settings = config or DEFAULT_RECOMMENDATION_CONFIG
        technical = float(dimension_scores.get("technical", 50.0))
        momentum = float(dimension_scores.get("momentum", 50.0))
        fundamental = float(dimension_scores.get("fundamental", 50.0))
        sentiment = _sentiment_value(sentiment_classification)
        mt_threshold = float(
            settings.get("recommendation_momentum_technical_divergence_threshold", 30.0)
        )
        ft_threshold = float(
            settings.get("recommendation_fundamental_technical_divergence_threshold", 40.0)
        )

        is_bearish = sentiment == "bearish"
        is_bullish = sentiment == "bullish"
        mt_diff = abs(momentum - technical)
        ft_diff = abs(fundamental - technical)

        found: list[str] = []

        # Sentiment can be only one of bearish/bullish, so the pairs below are
        # mutually exclusive.
        if is_bearish and technical > 60:
            found.append(
                f"sentiment-technical: Bearish sentiment conflicts with bullish technical ({technical:.0f})"
            )
        elif is_bullish and technical < 40:
            found.append(
                f"sentiment-technical: Bullish sentiment conflicts with bearish technical ({technical:.0f})"
            )

        if mt_diff > mt_threshold:
            found.append(
                "momentum-technical: "
                f"Momentum ({momentum:.0f}) diverges from technical ({technical:.0f}) by {mt_diff:.0f} points"
            )

        if is_bearish and momentum > 60:
            found.append(
                f"sentiment-momentum: Bearish sentiment conflicts with momentum ({momentum:.0f})"
            )
        elif is_bullish and momentum < 40:
            found.append(
                f"sentiment-momentum: Bullish sentiment conflicts with momentum ({momentum:.0f})"
            )

        if ft_diff > ft_threshold:
            found.append(
                "fundamental-technical: "
                f"Fundamental ({fundamental:.0f}) diverges significantly from technical ({technical:.0f})"
            )
        return found
class DirectionAnalyzer:
    """Scores how strongly the available signals back a trade direction."""

    # Penalty applied per conflict message, keyed by the tag it contains.
    # Checked in this order; only the first matching tag applies per message.
    _CONFLICT_PENALTIES = (
        ("sentiment-technical", 12.0),
        ("momentum-technical", 10.0),
        ("sentiment-momentum", 12.0),
        ("fundamental-technical", 6.0),
    )

    def calculate_confidence(
        self,
        direction: str,
        dimension_scores: dict[str, float],
        sentiment_classification: str | None,
        conflicts: list[str] | None = None,
    ) -> float:
        """Return a 0-100 confidence for ``direction`` around a 50 baseline.

        Each dimension score (default 50) is mapped to an agreement in
        [-1, +1] relative to the direction, then weighted: technical 25,
        momentum 20, sentiment 15, fundamental 10. Signals that oppose the
        direction therefore pull confidence below 50. Every conflict message
        then subtracts a modest extra penalty, and the result is clamped to
        [0, 100].

        Any direction other than "long" is scored as a short.
        """
        technical = float(dimension_scores.get("technical", 50.0))
        momentum = float(dimension_scores.get("momentum", 50.0))
        fundamental = float(dimension_scores.get("fundamental", 50.0))
        sentiment = _sentiment_value(sentiment_classification)

        sign = 1.0 if direction == "long" else -1.0

        def agreement(score: float) -> float:
            # -1 (fully against) .. +1 (fully for) the proposed direction
            return ((score - 50.0) / 50.0) * sign

        if sentiment == "bullish":
            sentiment_raw = 1.0
        elif sentiment == "bearish":
            sentiment_raw = -1.0
        else:
            sentiment_raw = 0.0

        technical_term = agreement(technical) * 25.0
        momentum_term = agreement(momentum) * 20.0
        sentiment_term = (sentiment_raw * sign) * 15.0
        fundamental_term = agreement(fundamental) * 10.0
        score = 50.0 + (technical_term + momentum_term + sentiment_term + fundamental_term)

        # The agreement terms already capture most disagreement, so explicit
        # conflict patterns only trim a little more.
        for message in conflicts or []:
            for tag, penalty in self._CONFLICT_PENALTIES:
                if tag in message:
                    score -= penalty
                    break

        return _clamp(score, 0.0, 100.0)
class TargetGenerator:
    """Builds a short, distance-spread list of price targets from S/R levels."""

    def generate_targets(
        self,
        direction: str,
        entry_price: float,
        stop_loss: float,
        sr_levels: list[SRLevel],
        atr_value: float,
    ) -> list[dict[str, Any]]:
        """Return up to five targets on the profitable side of ``entry_price``.

        Candidate levels are resistances above entry for "long" and supports
        below entry for any other direction. A candidate must sit at least
        1 ATR away, and - depending on volatility - no further than a cap:
        10 ATR when ATR is more than 5% of price, 3 ATR when it is less than
        2%, no cap in between.

        Selection favours spread over raw quality: the nearest candidate is
        always kept, then the best-quality candidate of each distance band
        (conservative / moderate / aggressive), then the remaining best by
        quality until five are chosen.

        Args:
            direction: "long" or anything else (treated as short).
            entry_price: Planned entry price; must be positive.
            stop_loss: Stop price; must differ from ``entry_price``.
            sr_levels: Objects exposing ``id``, ``type``, ``price_level`` and
                ``strength``.
            atr_value: Average true range in price units; must be positive.

        Returns:
            Target dicts sorted by distance from entry, each with ``price``,
            ``distance_from_entry``, ``distance_atr_multiple``, ``rr_ratio``,
            ``classification`` (placeholder "Moderate"), ``sr_level_id`` and
            ``sr_strength``. Empty when inputs are unusable or nothing
            qualifies.
        """
        atr = float(atr_value)
        if atr <= 0:
            return []
        # distance / entry is computed below, so a non-positive entry price
        # would divide by zero (or invert the proximity score). Treat it as
        # unusable input, matching _zone_representative_levels.
        entry = float(entry_price)
        if entry <= 0:
            return []
        risk = abs(entry - float(stop_loss))
        if risk <= 0:
            return []

        atr_pct = atr / entry
        max_atr_multiple: float | None = None
        if atr_pct > 0.05:
            max_atr_multiple = 10.0
        elif atr_pct < 0.02:
            max_atr_multiple = 3.0

        is_long = direction == "long"
        candidates: list[dict[str, Any]] = []
        for level in sr_levels:
            # Coerce once so non-float numerics (e.g. Decimal) cannot raise on
            # the mixed arithmetic below.
            price = float(level.price_level)
            if is_long:
                is_candidate = level.type == "resistance" and price > entry
            else:
                is_candidate = level.type == "support" and price < entry
            if not is_candidate:
                continue

            distance = abs(price - entry)
            distance_atr_multiple = distance / atr
            if distance_atr_multiple < 1.0:
                continue
            if max_atr_multiple is not None and distance_atr_multiple > max_atr_multiple:
                continue

            strength = float(level.strength)
            # Reward equals the distance to the level, so R:R is distance / risk.
            rr_ratio = distance / risk
            norm_rr = min(rr_ratio / 10.0, 1.0)
            norm_strength = _clamp(strength, 0, 100) / 100.0
            norm_proximity = 1.0 - min(distance / entry, 1.0)
            quality = 0.35 * norm_rr + 0.35 * norm_strength + 0.30 * norm_proximity

            candidates.append(
                {
                    "price": price,
                    "distance_from_entry": float(distance),
                    "distance_atr_multiple": float(distance_atr_multiple),
                    "rr_ratio": float(rr_ratio),
                    "classification": "Moderate",
                    "sr_level_id": int(level.id),
                    "sr_strength": strength,
                    "quality": float(quality),
                }
            )

        if not candidates:
            return []

        # Select up to 5 targets that SPAN the distance range, instead of the
        # top-5 by quality (which biases toward far, high-R:R levels and buries
        # every nearby target). Guarantees the nearest level plus a
        # representative from each distance band when they exist, so the table
        # offers a real mix of Conservative / Moderate / Aggressive.
        conservative = [
            c for c in candidates if c["distance_atr_multiple"] <= _CONSERVATIVE_MAX_ATR
        ]
        moderate = [
            c
            for c in candidates
            if _CONSERVATIVE_MAX_ATR < c["distance_atr_multiple"] <= _MODERATE_MAX_ATR
        ]
        aggressive = [
            c for c in candidates if c["distance_atr_multiple"] > _MODERATE_MAX_ATR
        ]

        selected: list[dict[str, Any]] = []
        selected_ids: set[int] = set()

        def _add(candidate: dict[str, Any] | None) -> None:
            # De-duplicate on the S/R level id so one level is never listed twice.
            if candidate is not None and candidate["sr_level_id"] not in selected_ids:
                selected.append(candidate)
                selected_ids.add(candidate["sr_level_id"])

        # Nearest overall (the most likely / Conservative anchor)
        _add(min(candidates, key=lambda c: c["distance_atr_multiple"]))
        # Best-quality representative from each band -> spread across labels
        for bucket in (conservative, moderate, aggressive):
            if bucket:
                _add(max(bucket, key=lambda c: c["quality"]))
        # Fill remaining slots with the next-best by quality
        for candidate in sorted(candidates, key=lambda c: c["quality"], reverse=True):
            if len(selected) >= 5:
                break
            _add(candidate)

        selected.sort(key=lambda row: row["distance_from_entry"])
        # "quality" is an internal ranking aid and is not part of the output.
        for target in selected:
            target.pop("quality", None)
        return selected
class ProbabilityEstimator:
    """Estimates how likely a target is to be touched within the horizon."""

    def estimate_probability(
        self,
        target: dict[str, Any],
        dimension_scores: dict[str, float],
        sentiment_classification: str | None,
        direction: str,
        config: dict[str, float],
    ) -> float:
        """Return the reach-probability of ``target`` as a percentage.

        The base is the chance of price *touching* a level at the target's
        distance under a driftless random walk (reflection principle):
        ``2 * (1 - Phi(d / sqrt(T)))`` with ``d`` the distance in ATR
        multiples and ``T`` the horizon in trading days. A far target is
        therefore inherently unlikely.

        Two modest adjustments follow:

        * S/R strength: up to +/-(sr_weight * 50) as strength moves from 50
          toward 100 / 0 (+/-10 with the default weight).
        * Signal alignment: +(signal_weight * 100) when technical plus either
          sentiment or momentum point toward the target, minus the same amount
          when they point away (+/-15 with the default weight).

        The result is clamped to [3, 95] and rounded to two decimals.
        """
        strength = float(target.get("sr_strength", 50.0))
        atr_multiple = float(target.get("distance_atr_multiple", 1.0))

        horizon_move_atr = math.sqrt(_TARGET_HORIZON_DAYS)  # about 5.48 ATR over 30d
        if horizon_move_atr > 0:
            z = atr_multiple / horizon_move_atr
        else:
            z = 99.0
        touch_prob = 2.0 * (1.0 - _norm_cdf(z))  # 0..1
        probability = touch_prob * 100.0

        technical = float(dimension_scores.get("technical", 50.0))
        momentum = float(dimension_scores.get("momentum", 50.0))
        sentiment = _sentiment_value(sentiment_classification)

        tech_bull, tech_bear = technical > 60, technical < 40
        mom_bull, mom_bear = momentum > 60, momentum < 40
        sent_bull, sent_bear = sentiment == "bullish", sentiment == "bearish"

        # Drift toward the target raises touch probability; against it lowers.
        if direction == "long":
            aligned = tech_bull and (sent_bull or mom_bull)
            opposed = tech_bear or (sent_bear and mom_bear)
        else:
            aligned = tech_bear and (sent_bear or mom_bear)
            opposed = tech_bull or (sent_bull and mom_bull)

        sr_weight = float(config.get("recommendation_sr_strength_weight", 0.20))
        signal_weight = float(config.get("recommendation_signal_alignment_weight", 0.15))

        # Strength magnet
        probability += ((strength - 50.0) / 50.0) * (sr_weight * 50.0)

        # Alignment bonus / penalty
        if aligned:
            probability += signal_weight * 100.0
        elif opposed:
            probability -= signal_weight * 100.0

        bounded = _clamp(probability, 3.0, 95.0)
        return round(bounded, 2)
# Shared module-level instances; enhance_trade_setup() calls these instead of
# constructing the helpers per call. None of these classes defines instance
# state in this file.
signal_conflict_detector = SignalConflictDetector()
direction_analyzer = DirectionAnalyzer()
target_generator = TargetGenerator()
probability_estimator = ProbabilityEstimator()
async def get_recommendation_config(db: AsyncSession) -> dict[str, float]:
    """Load the recommendation tunables, with stored settings over defaults.

    Starts from a copy of ``DEFAULT_RECOMMENDATION_CONFIG`` and overlays every
    ``SystemSetting`` row whose key matches the LIKE pattern
    ``recommendation_%``. A row whose value cannot be converted to ``float``
    is logged as a warning and skipped, leaving the default (if any) in place.
    """
    query = select(SystemSetting).where(SystemSetting.key.like("recommendation_%"))
    stored = (await db.execute(query)).scalars().all()

    merged: dict[str, float] = dict(DEFAULT_RECOMMENDATION_CONFIG)
    for row in stored:
        try:
            merged[row.key] = float(row.value)
        except (TypeError, ValueError):
            logger.warning("Invalid recommendation setting value for %s: %s", row.key, row.value)
    return merged
def _risk_level_from_conflicts(conflicts: list[str]) -> str:
if not conflicts:
return "Low"
severe = [c for c in conflicts if "sentiment-technical" in c or "sentiment-momentum" in c]
if len(severe) >= 2 or len(conflicts) >= 3:
return "High"
return "Medium"
def _choose_recommended_action(
long_confidence: float,
short_confidence: float,
config: dict[str, float],
available_directions: set[str] | None = None,
) -> str:
"""Pick the ticker action — but only recommend a direction you can trade.
A direction is recommendable only if a tradeable setup exists for it
(``available_directions``). So a strong LONG bias on a stock at all-time
highs — where the scanner can build no long target — does NOT yield
LONG_HIGH; it falls through to NEUTRAL, and the reasoning explains why.
"""
high = float(config.get("recommendation_high_confidence_threshold", 70.0))
moderate = float(config.get("recommendation_moderate_confidence_threshold", 50.0))
diff = float(config.get("recommendation_confidence_diff_threshold", 20.0))
long_ok = available_directions is None or "long" in available_directions
short_ok = available_directions is None or "short" in available_directions
if long_ok and long_confidence >= high and (long_confidence - short_confidence) >= diff:
return "LONG_HIGH"
if short_ok and short_confidence >= high and (short_confidence - long_confidence) >= diff:
return "SHORT_HIGH"
if long_ok and long_confidence >= moderate and (long_confidence - short_confidence) >= diff:
return "LONG_MODERATE"
if short_ok and short_confidence >= moderate and (short_confidence - long_confidence) >= diff:
return "SHORT_MODERATE"
return "NEUTRAL"
def _build_reasoning(
    action: str,
    long_confidence: float,
    short_confidence: float,
    conflicts: list[str],
    dimension_scores: dict[str, float],
    sentiment_classification: str | None,
    config: dict[str, float],
    available_directions: set[str] | None = None,
) -> str:
    """Compose the ticker-level explanation that accompanies ``action``.

    The text is derived only from ticker-level inputs, so every setup of the
    ticker can carry the same summary and it always matches the action.

    * Directional action: states direction, tier, confidence and whether the
      signals are aligned or mixed.
    * NEUTRAL with a bias of at least the moderate threshold in a direction
      that is not available: explains that price is extended and the other
      setup is counter-trend.
    * Any other NEUTRAL: reports both confidences.

    A note about detected conflicts is appended when there are any.
    """
    sentiment = _sentiment_value(sentiment_classification) or "unknown"
    technical = float(dimension_scores.get("technical", 50.0))
    momentum = float(dimension_scores.get("momentum", 50.0))
    signals = f"technical={technical:.0f}, momentum={momentum:.0f}, sentiment={sentiment}"

    conflict_note = ""
    if conflicts:
        conflict_note = f" {len(conflicts)} conflict(s) detected, risk-adjusted."

    if action == "NEUTRAL":
        # Explain whether it's a missing setup or genuinely mixed signals.
        moderate = float(config.get("recommendation_moderate_confidence_threshold", 50.0))
        if available_directions is None:
            avail = {"long", "short"}
        else:
            avail = available_directions

        bias_dir = "long" if long_confidence >= short_confidence else "short"
        bias_conf = max(long_confidence, short_confidence)

        if bias_conf < moderate or bias_dir in avail:
            return (
                f"No high-conviction setup: LONG {long_confidence:.0f}%, SHORT {short_confidence:.0f}% "
                f"({signals}).{conflict_note}"
            )

        if bias_dir == "long":
            other = "short"
            extreme = "highs (no resistance target above)"
        else:
            other = "long"
            extreme = "lows (no support target below)"
        return (
            f"Ticker bias is {bias_dir.upper()} (confidence {bias_conf:.0f}%, {signals}) but price is "
            f"extended near {extreme}, so no high-conviction {bias_dir} setup is available. "
            f"The available {other.upper()} setup is counter-trend.{conflict_note}"
        )

    direction = "long" if action.startswith("LONG") else "short"
    tier = "high" if action.endswith("HIGH") else "moderate"
    confidence = long_confidence if direction == "long" else short_confidence
    aligned, _ = check_signal_alignment(direction, dimension_scores, sentiment_classification)
    return (
        f"{direction.upper()} ({tier} confidence): {confidence:.0f}% with "
        f"{'aligned' if aligned else 'mixed'} signals ({signals}).{conflict_note}"
    )
PRIMARY_TARGET_MIN_RR = 1.5
def _select_primary_target(targets: list[dict], min_rr: float = PRIMARY_TARGET_MIN_RR) -> dict | None:
"""Primary = the most LIKELY target that still offers real asymmetry.
Among targets clearing a minimal R:R floor, pick the highest probability
(tie-break by R:R). This fixes the old pick, which ignored probability and
could land on the furthest, least-likely 'lottery' level. Stronger-reward
levels remain in the table as stretch targets. Falls back to the highest-R:R
target if nothing clears the floor.
"""
if not targets:
return None
worthwhile = [t for t in targets if float(t.get("rr_ratio", 0.0)) >= min_rr]
pool = worthwhile or targets
return max(
pool,
key=lambda t: (float(t.get("probability", 0.0)), float(t.get("rr_ratio", 0.0))),
)
async def enhance_trade_setup(
    db: AsyncSession,
    ticker: Ticker,
    setup: TradeSetup,
    dimension_scores: dict[str, float],
    sr_levels: list[SRLevel],
    sentiment_classification: str | None,
    atr_value: float,
    available_directions: set[str] | None = None,
) -> TradeSetup:
    """Fill in the recommendation fields of ``setup`` and return it.

    Steps, in order: load the recommendation config, detect signal conflicts,
    compute long and short confidence, collapse S/R levels into zone
    representatives, generate targets and attach a probability and label to
    each, pick the primary target, then derive the ticker-level action,
    reasoning and risk level.

    Args:
        db: Session used to read the recommendation settings.
        ticker: Not referenced in this function body.
        setup: Trade setup to enrich; mutated in place.
        dimension_scores: Scores keyed by dimension name (e.g. "technical").
        sr_levels: S/R levels the targets are generated from.
        sentiment_classification: Sentiment label, or ``None`` if unknown.
        atr_value: ATR passed through to target generation.
        available_directions: Directions that have a tradeable setup; ``None``
            means no restriction.

    Returns:
        The same ``setup`` instance with ``confidence_score``,
        ``targets_json``, ``conflict_flags_json``, ``recommended_action``,
        ``reasoning`` and ``risk_level`` set. ``target`` and ``rr_ratio`` are
        overwritten only when at least one target was generated. No commit is
        issued here.
    """
    config = await get_recommendation_config(db)
    conflicts = signal_conflict_detector.detect_conflicts(
        dimension_scores=dimension_scores,
        sentiment_classification=sentiment_classification,
        config=config,
    )
    # Both directions are always scored: the action and reasoning below compare
    # them regardless of which direction this particular setup trades.
    long_confidence = direction_analyzer.calculate_confidence(
        direction="long",
        dimension_scores=dimension_scores,
        sentiment_classification=sentiment_classification,
        conflicts=conflicts,
    )
    short_confidence = direction_analyzer.calculate_confidence(
        direction="short",
        dimension_scores=dimension_scores,
        sentiment_classification=sentiment_classification,
        conflicts=conflicts,
    )
    direction = setup.direction.lower()
    # The setup's own score is the confidence of the direction it trades.
    confidence = long_confidence if direction == "long" else short_confidence
    # Merge near-duplicate levels into zone representatives first, so a clustered
    # wall yields one strength-combined target instead of several near-identical
    # ones — consistent with the chart and alerts.
    zone_levels = _zone_representative_levels(sr_levels, setup.entry_price)
    targets = target_generator.generate_targets(
        direction=direction,
        entry_price=setup.entry_price,
        stop_loss=setup.stop_loss,
        sr_levels=zone_levels,
        atr_value=atr_value,
    )
    for target in targets:
        target["probability"] = probability_estimator.estimate_probability(
            target=target,
            dimension_scores=dimension_scores,
            sentiment_classification=sentiment_classification,
            direction=direction,
            config=config,
        )
        # Label follows from the reach-probability: high prob = Conservative.
        target["classification"] = _classify_by_probability(target["probability"])
    # Primary target = most-likely target with real asymmetry (see
    # _select_primary_target), not the old quality-score pick that ignored
    # probability. Sync the setup's headline target/rr_ratio so the chart, gate
    # and outcome eval all agree with the table's starred row.
    primary = _select_primary_target(targets)
    if primary is not None:
        for target in targets:
            # Identity comparison: exactly one row (the chosen dict) is flagged.
            target["is_primary"] = target is primary
        setup.target = round(float(primary["price"]), 4)
        setup.rr_ratio = round(float(primary["rr_ratio"]), 4)
    # Per-setup conflicts (target availability is specific to this setup)
    setup_conflicts = list(conflicts)
    if len(targets) < 3:
        setup_conflicts.append("target-availability: Fewer than 3 valid S/R targets available")
    # Action and reasoning are ticker-level: they consider both directions and
    # which directions are actually tradeable, and are identical on every setup.
    action = _choose_recommended_action(
        long_confidence, short_confidence, config, available_directions
    )
    # Reasoning is given the ticker-level conflicts, not setup_conflicts, so the
    # target-availability flag does not change the shared summary text.
    reasoning = _build_reasoning(
        action=action,
        long_confidence=long_confidence,
        short_confidence=short_confidence,
        conflicts=conflicts,
        dimension_scores=dimension_scores,
        sentiment_classification=sentiment_classification,
        config=config,
        available_directions=available_directions,
    )
    setup.confidence_score = round(confidence, 2)
    setup.targets_json = json.dumps(targets)
    setup.conflict_flags_json = json.dumps(setup_conflicts)
    setup.recommended_action = action
    setup.reasoning = reasoning
    # Risk level, unlike reasoning, does include the per-setup availability flag.
    setup.risk_level = _risk_level_from_conflicts(setup_conflicts)
    return setup