from __future__ import annotations import json import logging import math from types import SimpleNamespace from typing import Any from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.models.settings import SystemSetting from app.models.sr_level import SRLevel from app.models.ticker import Ticker from app.models.trade_setup import TradeSetup from app.services.sr_service import cluster_sr_zones logger = logging.getLogger(__name__) DEFAULT_RECOMMENDATION_CONFIG: dict[str, float] = { "recommendation_high_confidence_threshold": 70.0, "recommendation_moderate_confidence_threshold": 50.0, "recommendation_confidence_diff_threshold": 20.0, "recommendation_signal_alignment_weight": 0.15, "recommendation_sr_strength_weight": 0.20, "recommendation_momentum_technical_divergence_threshold": 30.0, "recommendation_fundamental_technical_divergence_threshold": 40.0, } # Horizon (trading days) over which a target's reach-probability is estimated. # Kept in step with the outcome evaluator's window so probability predicts the # metric we actually measure. _TARGET_HORIZON_DAYS = 30.0 # Distance bands (in ATR) used to spread targets across Conservative / Moderate # / Aggressive. Aligned with where the touch-probability crosses 60% / 40% over # the horizon, so a target from each band tends to land in the matching label. _CONSERVATIVE_MAX_ATR = 2.9 _MODERATE_MAX_ATR = 4.6 # Merge S/R levels within this fraction into one zone before generating targets — # the same tolerance the chart and alerts use, so S/R is one model app-wide. _SR_ZONE_TOLERANCE = 0.02 def _clamp(value: float, low: float, high: float) -> float: return max(low, min(high, value)) def _zone_representative_levels(sr_levels: list[SRLevel], entry_price: float) -> list[Any]: """Collapse near-duplicate S/R levels into one representative per zone. Targets are generated from these representatives, so a clustered wall (e.g. 
183 + 185) becomes a single target carrying the zone's COMBINED strength (capped at 100) instead of two near-identical targets, each undervaluing the wall. Same clusterer as the chart and alerts → one S/R model everywhere. The representative price is the zone's near edge (the reachable side of the wall) and it keeps the strongest constituent's id for reference. Singleton levels pass through unchanged. """ if not sr_levels or entry_price <= 0: return list(sr_levels) level_dicts = [ {"price_level": float(lv.price_level), "strength": int(lv.strength), "type": lv.type} for lv in sr_levels ] zones = cluster_sr_zones(level_dicts, entry_price, tolerance=_SR_ZONE_TOLERANCE) reps: list[Any] = [] for zone in zones: constituents = [ lv for lv in sr_levels if zone["low"] <= float(lv.price_level) <= zone["high"] ] if not constituents: continue strongest = max(constituents, key=lambda lv: lv.strength) # Near edge: bottom of a resistance wall (above entry), top of a support # wall (below entry) — the first price the move reaches. 
near_edge = zone["low"] if zone["type"] == "resistance" else zone["high"] reps.append( SimpleNamespace( id=int(strongest.id), price_level=float(near_edge), type=zone["type"], strength=int(zone["strength"]), ) ) return reps def _norm_cdf(x: float) -> float: """Standard normal CDF via erf (no SciPy dependency).""" return 0.5 * (1.0 + math.erf(x / math.sqrt(2.0))) def _classify_by_probability(probability: float) -> str: """Label a target by how likely it is to be reached (derived, not assumed).""" if probability >= 60.0: return "Conservative" if probability >= 40.0: return "Moderate" return "Aggressive" def _sentiment_value(sentiment_classification: str | None) -> str | None: if sentiment_classification is None: return None return sentiment_classification.strip().lower() def check_signal_alignment( direction: str, dimension_scores: dict[str, float], sentiment_classification: str | None, ) -> tuple[bool, str]: technical = float(dimension_scores.get("technical", 50.0)) momentum = float(dimension_scores.get("momentum", 50.0)) sentiment = _sentiment_value(sentiment_classification) if direction == "long": aligned_count = sum([ technical > 60, momentum > 60, sentiment == "bullish", ]) if aligned_count >= 2: return True, "Technical, momentum, and/or sentiment align with LONG direction." return False, "Signals are mixed for LONG direction." aligned_count = sum([ technical < 40, momentum < 40, sentiment == "bearish", ]) if aligned_count >= 2: return True, "Technical, momentum, and/or sentiment align with SHORT direction." return False, "Signals are mixed for SHORT direction." 
class SignalConflictDetector:
    """Finds contradictions between the score dimensions and the sentiment label."""

    def detect_conflicts(
        self,
        dimension_scores: dict[str, float],
        sentiment_classification: str | None,
        config: dict[str, float] | None = None,
    ) -> list[str]:
        """Return one ``"<kind>: <explanation>"`` string per detected conflict.

        Args:
            dimension_scores: Scores keyed by ``"technical"``, ``"momentum"``
                and ``"fundamental"``; a missing key is treated as 50.
            sentiment_classification: Sentiment label, compared
                case-insensitively against ``"bullish"`` / ``"bearish"``.
            config: Divergence thresholds. A falsy value (``None`` or an empty
                dict) selects ``DEFAULT_RECOMMENDATION_CONFIG``.

        Returns:
            Conflict strings in a fixed order: sentiment-technical,
            momentum-technical, sentiment-momentum, fundamental-technical.
            The prefix before the colon is what downstream code matches on.
        """
        cfg = config or DEFAULT_RECOMMENDATION_CONFIG

        technical = float(dimension_scores.get("technical", 50.0))
        momentum = float(dimension_scores.get("momentum", 50.0))
        fundamental = float(dimension_scores.get("fundamental", 50.0))
        sentiment = _sentiment_value(sentiment_classification)

        mt_threshold = float(cfg.get("recommendation_momentum_technical_divergence_threshold", 30.0))
        ft_threshold = float(cfg.get("recommendation_fundamental_technical_divergence_threshold", 40.0))

        conflicts: list[str] = []

        if sentiment == "bearish" and technical > 60:
            conflicts.append(
                f"sentiment-technical: Bearish sentiment conflicts with bullish technical ({technical:.0f})"
            )
        if sentiment == "bullish" and technical < 40:
            conflicts.append(
                f"sentiment-technical: Bullish sentiment conflicts with bearish technical ({technical:.0f})"
            )

        mt_diff = abs(momentum - technical)
        if mt_diff > mt_threshold:
            conflicts.append(
                "momentum-technical: "
                f"Momentum ({momentum:.0f}) diverges from technical ({technical:.0f}) by {mt_diff:.0f} points"
            )

        if sentiment == "bearish" and momentum > 60:
            conflicts.append(
                f"sentiment-momentum: Bearish sentiment conflicts with momentum ({momentum:.0f})"
            )
        if sentiment == "bullish" and momentum < 40:
            conflicts.append(
                f"sentiment-momentum: Bullish sentiment conflicts with momentum ({momentum:.0f})"
            )

        ft_diff = abs(fundamental - technical)
        if ft_diff > ft_threshold:
            conflicts.append(
                "fundamental-technical: "
                f"Fundamental ({fundamental:.0f}) diverges significantly from technical ({technical:.0f})"
            )

        return conflicts


class DirectionAnalyzer:
    """Scores how strongly the available signals support a trade direction."""

    def calculate_confidence(
        self,
        direction: str,
        dimension_scores: dict[str, float],
        sentiment_classification: str | None,
        conflicts: list[str] | None = None,
    ) -> float:
        """Directional-agreement confidence around a 50 baseline.

        Each dimension contributes in proportion to how strongly it agrees with
        the proposed direction: a bullish dimension RAISES long confidence and
        LOWERS short confidence (and vice-versa). Signals that oppose the
        direction push confidence below 50 — so a short on a strongly bullish
        stock scores near zero, not 55.

        Args:
            direction: ``"long"``; any other value is scored as short.
            dimension_scores: Scores keyed by ``"technical"``, ``"momentum"``
                and ``"fundamental"``; a missing key is treated as 50.
            sentiment_classification: Sentiment label; anything other than
                bullish/bearish contributes nothing.
            conflicts: Conflict strings as produced by
                ``SignalConflictDetector.detect_conflicts``; each one subtracts
                a fixed penalty selected by its kind prefix.

        Returns:
            Confidence clamped to ``[0, 100]``.
        """
        technical = float(dimension_scores.get("technical", 50.0))
        momentum = float(dimension_scores.get("momentum", 50.0))
        fundamental = float(dimension_scores.get("fundamental", 50.0))
        sentiment = _sentiment_value(sentiment_classification)

        dir_sign = 1.0 if direction == "long" else -1.0

        def agree(score: float) -> float:
            # -1 (fully against) .. +1 (fully for) the proposed direction
            return ((score - 50.0) / 50.0) * dir_sign

        sentiment_val = {"bullish": 1.0, "bearish": -1.0}.get(sentiment or "", 0.0)
        sentiment_agree = sentiment_val * dir_sign

        # Weights: technical 25, momentum 20, sentiment 15, fundamental 10.
        confidence = 50.0 + (
            agree(technical) * 25.0
            + agree(momentum) * 20.0
            + sentiment_agree * 15.0
            + agree(fundamental) * 10.0
        )

        # Explicit conflict patterns trim a little more (the agreement terms
        # already capture most disagreement, so penalties are modest).
        for conflict in conflicts or []:
            if "sentiment-technical" in conflict:
                confidence -= 12.0
            elif "momentum-technical" in conflict:
                confidence -= 10.0
            elif "sentiment-momentum" in conflict:
                confidence -= 12.0
            elif "fundamental-technical" in conflict:
                confidence -= 6.0

        return _clamp(confidence, 0.0, 100.0)


class TargetGenerator:
    """Turns S/R levels on the profitable side of the entry into price targets."""

    def generate_targets(
        self,
        direction: str,
        entry_price: float,
        stop_loss: float,
        sr_levels: list[SRLevel],
        atr_value: float,
    ) -> list[dict[str, Any]]:
        """Build up to five targets that span the near-to-far distance range.

        Args:
            direction: ``"long"`` uses resistance levels above the entry; any
                other value uses support levels below it.
            entry_price: Entry price; must be positive.
            stop_loss: Stop price; only its distance from the entry is used.
            sr_levels: Objects exposing ``id``, ``type``, ``price_level`` and
                ``strength``.
            atr_value: ATR in price units; must be positive.

        Returns:
            Target dicts sorted by ascending distance from the entry, with keys
            ``price``, ``distance_from_entry``, ``distance_atr_multiple``,
            ``rr_ratio``, ``classification`` (placeholder ``"Moderate"``),
            ``sr_level_id`` and ``sr_strength``. An empty list when ATR, risk
            or entry price is not positive, or when no level qualifies.
        """
        if atr_value <= 0:
            return []
        # A non-positive entry price would divide by zero (or produce a
        # proximity above 1) in the quality score below, so treat it as
        # "no valid targets" like the other degenerate inputs.
        if entry_price <= 0:
            return []

        risk = abs(entry_price - stop_loss)
        if risk <= 0:
            return []

        candidates: list[dict[str, Any]] = []

        # Volatility-dependent cap on how far (in ATR) a target may sit:
        # high-volatility names allow 10 ATR, low-volatility names only 3 ATR,
        # everything in between is uncapped.
        atr_pct = atr_value / entry_price
        max_atr_multiple: float | None = None
        if atr_pct > 0.05:
            max_atr_multiple = 10.0
        elif atr_pct < 0.02:
            max_atr_multiple = 3.0

        for level in sr_levels:
            if direction == "long":
                is_candidate = level.type == "resistance" and level.price_level > entry_price
            else:
                is_candidate = level.type == "support" and level.price_level < entry_price

            if not is_candidate:
                continue

            distance = abs(level.price_level - entry_price)
            distance_atr_multiple = distance / atr_value

            # Closer than one ATR is noise, not a target.
            if distance_atr_multiple < 1.0:
                continue
            if max_atr_multiple is not None and distance_atr_multiple > max_atr_multiple:
                continue

            # Reward is the same entry-to-level distance measured against risk.
            rr_ratio = distance / risk

            norm_rr = min(rr_ratio / 10.0, 1.0)
            norm_strength = _clamp(level.strength, 0, 100) / 100.0
            norm_proximity = 1.0 - min(distance / entry_price, 1.0)
            quality = 0.35 * norm_rr + 0.35 * norm_strength + 0.30 * norm_proximity

            candidates.append(
                {
                    "price": float(level.price_level),
                    "distance_from_entry": float(distance),
                    "distance_atr_multiple": float(distance_atr_multiple),
                    "rr_ratio": float(rr_ratio),
                    "classification": "Moderate",
                    "sr_level_id": int(level.id),
                    "sr_strength": float(level.strength),
                    "quality": float(quality),
                }
            )

        if not candidates:
            return []

        # Select up to 5 targets that SPAN the distance range, instead of the
        # top-5 by quality (which biases toward far, high-R:R levels and buries
        # every nearby target). Guarantees the nearest level plus a
        # representative from each distance band when they exist, so the table
        # offers a real mix of Conservative / Moderate / Aggressive.
        conservative = [c for c in candidates if c["distance_atr_multiple"] <= _CONSERVATIVE_MAX_ATR]
        moderate = [
            c
            for c in candidates
            if _CONSERVATIVE_MAX_ATR < c["distance_atr_multiple"] <= _MODERATE_MAX_ATR
        ]
        aggressive = [c for c in candidates if c["distance_atr_multiple"] > _MODERATE_MAX_ATR]

        selected: list[dict[str, Any]] = []
        selected_ids: set[int] = set()

        def _add(candidate: dict[str, Any] | None) -> None:
            # De-duplicate on the originating S/R level id.
            if candidate is not None and candidate["sr_level_id"] not in selected_ids:
                selected.append(candidate)
                selected_ids.add(candidate["sr_level_id"])

        # Nearest overall (the most likely / Conservative anchor)
        _add(min(candidates, key=lambda c: c["distance_atr_multiple"]))
        # Best-quality representative from each band → spread across labels
        for bucket in (conservative, moderate, aggressive):
            if bucket:
                _add(max(bucket, key=lambda c: c["quality"]))
        # Fill remaining slots with the next-best by quality
        for candidate in sorted(candidates, key=lambda c: c["quality"], reverse=True):
            if len(selected) >= 5:
                break
            _add(candidate)

        selected.sort(key=lambda row: row["distance_from_entry"])

        # "quality" is an internal ranking aid and is not part of the output.
        for target in selected:
            target.pop("quality", None)

        return selected


class ProbabilityEstimator:
    """Estimates how likely a target is to be hit before the stop."""

    def estimate_probability(
        self,
        target: dict[str, Any],
        dimension_scores: dict[str, float],
        sentiment_classification: str | None,
        direction: str,
        config: dict[str, float],
    ) -> float:
        """Probability the target is hit BEFORE the stop, within the horizon.

        Two factors (backtest-calibrated 2026-06-15 — the old touch-only model
        was ~2× over-confident because it ignored the competing stop):

          reach = P(price touches the target within T) — driftless random walk,
                  reflection principle: 2·(1 − Φ(d / (ATR·√T))). Falls with
                  distance, so a far target is inherently unlikely.
          ruin  = P(target before stop | both reachable) — the two-barrier
                  gambler's-ruin ratio stop/(target+stop) = 1/(R:R + 1). A 3:1
                  setup wins the race ~25% of the time, not ~70%.

        base = reach · ruin. Strength and signal alignment (drift toward
        target) then modulate it.

        Args:
            target: Target dict; reads ``sr_strength`` (default 50),
                ``distance_atr_multiple`` (default 1) and ``rr_ratio``
                (default 0).
            dimension_scores: Reads ``"technical"`` and ``"momentum"``
                (default 50 each).
            sentiment_classification: Sentiment label, normalised before use.
            direction: ``"long"``; any other value is treated as short.
            config: Reads ``recommendation_sr_strength_weight`` (default 0.20)
                and ``recommendation_signal_alignment_weight`` (default 0.15).

        Returns:
            Probability in percent, clamped to ``[3, 95]`` and rounded to two
            decimals.
        """
        strength = float(target.get("sr_strength", 50.0))
        atr_multiple = float(target.get("distance_atr_multiple", 1.0))
        rr = float(target.get("rr_ratio", 0.0))

        expected_move_atr = math.sqrt(_TARGET_HORIZON_DAYS)  # ≈ 5.48 ATR over 30d
        z = atr_multiple / expected_move_atr if expected_move_atr > 0 else 99.0
        reach = 2.0 * (1.0 - _norm_cdf(z))  # 0..1, P(touch target in horizon)
        # P(target before stop): stop distance / (target + stop) = 1/(rr+1).
        # Without a known rr (e.g. isolated probability checks), assume an even race.
        ruin = 1.0 / (rr + 1.0) if rr > 0 else 0.5
        probability = reach * ruin * 100.0

        technical = float(dimension_scores.get("technical", 50.0))
        momentum = float(dimension_scores.get("momentum", 50.0))
        sentiment = _sentiment_value(sentiment_classification)

        # Drift toward the target raises touch probability; against it lowers.
        if direction == "long":
            aligned = technical > 60 and (sentiment == "bullish" or momentum > 60)
            opposed = technical < 40 or (sentiment == "bearish" and momentum < 40)
        else:
            aligned = technical < 40 and (sentiment == "bearish" or momentum < 40)
            opposed = technical > 60 or (sentiment == "bullish" and momentum > 60)

        sr_weight = float(config.get("recommendation_sr_strength_weight", 0.20))
        signal_weight = float(config.get("recommendation_signal_alignment_weight", 0.15))

        # Strength magnet: ±(sr_weight·50) at the extremes (±10 by default)
        probability += ((strength - 50.0) / 50.0) * (sr_weight * 50.0)
        # Alignment: ±(signal_weight·100) (±15 by default)
        if aligned:
            probability += signal_weight * 100.0
        elif opposed:
            probability -= signal_weight * 100.0

        return round(_clamp(probability, 3.0, 95.0), 2)


# Shared module-level instances used by enhance_trade_setup.
signal_conflict_detector = SignalConflictDetector()
direction_analyzer = DirectionAnalyzer()
target_generator = TargetGenerator()
probability_estimator = ProbabilityEstimator()


async def get_recommendation_config(db: AsyncSession) -> dict[str, float]:
    """Load ``recommendation_*`` settings layered over the built-in defaults.

    Every ``SystemSetting`` row whose key matches ``recommendation_%`` is
    converted with ``float()``. Rows that cannot be converted are logged at
    WARNING level and skipped, leaving the default (if any) in place.

    Args:
        db: Async SQLAlchemy session used for the query.

    Returns:
        A new dict seeded from ``DEFAULT_RECOMMENDATION_CONFIG`` and then
        overridden / extended by the stored settings.
    """
    result = await db.execute(
        select(SystemSetting).where(SystemSetting.key.like("recommendation_%"))
    )
    rows = result.scalars().all()

    config: dict[str, float] = dict(DEFAULT_RECOMMENDATION_CONFIG)
    for setting in rows:
        try:
            config[setting.key] = float(setting.value)
        except (TypeError, ValueError):
            logger.warning("Invalid recommendation setting value for %s: %s", setting.key, setting.value)

    return config


def _risk_level_from_conflicts(conflicts: list[str]) -> str:
    """Grade risk from the conflict list.

    No conflicts is "Low". Two or more sentiment-related conflicts
    (sentiment-technical / sentiment-momentum), or three or more conflicts of
    any kind, is "High". Everything else is "Medium".
    """
    if not conflicts:
        return "Low"

    severe = [c for c in conflicts if "sentiment-technical" in c or "sentiment-momentum" in c]
    if len(severe) >= 2 or len(conflicts) >= 3:
        return "High"

    return "Medium"


def _choose_recommended_action(
    long_confidence: float,
    short_confidence: float,
    config: dict[str, float],
    available_directions: set[str] | None = None,
) -> str:
    """Pick the ticker action — but only recommend a direction you can trade.

    A direction is recommendable only if a tradeable setup exists for it
    (``available_directions``). So a strong LONG bias on a stock at all-time
    highs — where the scanner can build no long target — does NOT yield
    LONG_HIGH; it falls through to NEUTRAL, and the reasoning explains why.

    Args:
        long_confidence: Confidence for the long side (0-100).
        short_confidence: Confidence for the short side (0-100).
        config: Reads the high / moderate confidence thresholds and the
            minimum long-vs-short confidence gap.
        available_directions: Directions with a tradeable setup; ``None``
            means both are allowed.

    Returns:
        One of ``"LONG_HIGH"``, ``"SHORT_HIGH"``, ``"LONG_MODERATE"``,
        ``"SHORT_MODERATE"`` or ``"NEUTRAL"``; high tiers are checked first.
    """
    high = float(config.get("recommendation_high_confidence_threshold", 70.0))
    moderate = float(config.get("recommendation_moderate_confidence_threshold", 50.0))
    diff = float(config.get("recommendation_confidence_diff_threshold", 20.0))

    long_ok = available_directions is None or "long" in available_directions
    short_ok = available_directions is None or "short" in available_directions

    if long_ok and long_confidence >= high and (long_confidence - short_confidence) >= diff:
        return "LONG_HIGH"
    if short_ok and short_confidence >= high and (short_confidence - long_confidence) >= diff:
        return "SHORT_HIGH"
    if long_ok and long_confidence >= moderate and (long_confidence - short_confidence) >= diff:
        return "LONG_MODERATE"
    if short_ok and short_confidence >= moderate and (short_confidence - long_confidence) >= diff:
        return "SHORT_MODERATE"
    return "NEUTRAL"


def _build_reasoning(
    action: str,
    long_confidence: float,
    short_confidence: float,
    conflicts: list[str],
    dimension_scores: dict[str, float],
    sentiment_classification: str | None,
    config: dict[str, float],
    available_directions: set[str] | None = None,
) -> str:
    """Ticker-level reasoning that always matches the recommended action.

    Stored identically on both setups so the displayed summary can never mix a
    SHORT setup's reasoning under a LONG action.

    Args:
        action: Result of ``_choose_recommended_action``.
        long_confidence: Confidence for the long side.
        short_confidence: Confidence for the short side.
        conflicts: Ticker-level conflict strings; only their count is shown.
        dimension_scores: Reads ``"technical"`` and ``"momentum"``.
        sentiment_classification: Sentiment label; shown as ``"unknown"`` when
            missing or blank.
        config: Reads the moderate-confidence threshold.
        available_directions: Directions with a tradeable setup; ``None``
            means both.

    Returns:
        A single human-readable sentence (or two) explaining the action.
    """
    sentiment = _sentiment_value(sentiment_classification) or "unknown"
    technical = float(dimension_scores.get("technical", 50.0))
    momentum = float(dimension_scores.get("momentum", 50.0))
    signals = f"technical={technical:.0f}, momentum={momentum:.0f}, sentiment={sentiment}"
    conflict_note = f" {len(conflicts)} conflict(s) detected, risk-adjusted." if conflicts else ""

    if action != "NEUTRAL":
        direction = "long" if action.startswith("LONG") else "short"
        tier = "high" if action.endswith("HIGH") else "moderate"
        confidence = long_confidence if direction == "long" else short_confidence
        aligned, _ = check_signal_alignment(direction, dimension_scores, sentiment_classification)
        return (
            f"{direction.upper()} ({tier} confidence): {confidence:.0f}% with "
            f"{'aligned' if aligned else 'mixed'} signals ({signals}).{conflict_note}"
        )

    # NEUTRAL — explain whether it's a missing setup or genuinely mixed signals.
    moderate = float(config.get("recommendation_moderate_confidence_threshold", 50.0))
    avail = available_directions if available_directions is not None else {"long", "short"}
    bias_dir = "long" if long_confidence >= short_confidence else "short"
    bias_conf = max(long_confidence, short_confidence)

    if bias_conf >= moderate and bias_dir not in avail:
        other = "short" if bias_dir == "long" else "long"
        extreme = "highs (no resistance target above)" if bias_dir == "long" else "lows (no support target below)"
        # Only mention the opposite setup when it really exists; otherwise the
        # text would advertise a setup the caller said is unavailable.
        counter_note = (
            f" The available {other.upper()} setup is counter-trend." if other in avail else ""
        )
        return (
            f"Ticker bias is {bias_dir.upper()} (confidence {bias_conf:.0f}%, {signals}) but price is "
            f"extended near {extreme}, so no high-conviction {bias_dir} setup is available."
            f"{counter_note}{conflict_note}"
        )

    return (
        f"No high-conviction setup: LONG {long_confidence:.0f}%, SHORT {short_confidence:.0f}% "
        f"({signals}).{conflict_note}"
    )


# Minimum reward:risk a target needs to be preferred as the primary target.
PRIMARY_TARGET_MIN_RR = 1.5


def _select_primary_target(targets: list[dict], min_rr: float = PRIMARY_TARGET_MIN_RR) -> dict | None:
    """Primary = the most LIKELY target that still offers real asymmetry.

    Among targets clearing a minimal R:R floor, pick the highest probability
    (tie-break by R:R). This fixes the old pick, which ignored probability and
    could land on the furthest, least-likely 'lottery' level. Stronger-reward
    levels remain in the table as stretch targets.

    If nothing clears the floor, the same ranking (probability first, then
    R:R) is applied to ALL targets.
    NOTE(review): an earlier version of this docstring promised a fallback to
    the highest-R:R target; the code has never done that — confirm which
    behaviour is intended.

    Args:
        targets: Target dicts carrying ``probability`` and ``rr_ratio``
            (missing values count as 0).
        min_rr: R:R floor a target must reach to be preferred.

    Returns:
        The chosen target dict (the same object from ``targets``), or ``None``
        when ``targets`` is empty.
    """
    if not targets:
        return None
    worthwhile = [t for t in targets if float(t.get("rr_ratio", 0.0)) >= min_rr]
    pool = worthwhile or targets
    return max(
        pool,
        key=lambda t: (float(t.get("probability", 0.0)), float(t.get("rr_ratio", 0.0))),
    )


async def enhance_trade_setup(
    db: AsyncSession,
    ticker: Ticker,
    setup: TradeSetup,
    dimension_scores: dict[str, float],
    sr_levels: list[SRLevel],
    sentiment_classification: str | None,
    atr_value: float,
    available_directions: set[str] | None = None,
) -> TradeSetup:
    """Enrich ``setup`` in place with confidence, targets, action and reasoning.

    Steps: load config, detect conflicts, score both directions, cluster the
    S/R levels into zone representatives, generate targets and their
    probabilities, pick the primary target, then derive the ticker-level action
    and reasoning.

    Args:
        db: Async session, used to load the recommendation config.
        ticker: Not read by this function.
        setup: Trade setup to enrich; reads ``direction``, ``entry_price`` and
            ``stop_loss``.
        dimension_scores: Per-dimension scores.
        sr_levels: Raw S/R levels for the ticker.
        sentiment_classification: Sentiment label.
        atr_value: ATR in price units.
        available_directions: Directions with a tradeable setup; ``None``
            means both.

    Returns:
        The same ``setup`` object with ``confidence_score``, ``targets_json``,
        ``conflict_flags_json``, ``recommended_action``, ``reasoning`` and
        ``risk_level`` assigned. ``target`` and ``rr_ratio`` are overwritten
        only when at least one target was generated. Nothing is committed here.
    """
    config = await get_recommendation_config(db)

    conflicts = signal_conflict_detector.detect_conflicts(
        dimension_scores=dimension_scores,
        sentiment_classification=sentiment_classification,
        config=config,
    )

    long_confidence = direction_analyzer.calculate_confidence(
        direction="long",
        dimension_scores=dimension_scores,
        sentiment_classification=sentiment_classification,
        conflicts=conflicts,
    )
    short_confidence = direction_analyzer.calculate_confidence(
        direction="short",
        dimension_scores=dimension_scores,
        sentiment_classification=sentiment_classification,
        conflicts=conflicts,
    )

    direction = setup.direction.lower()
    confidence = long_confidence if direction == "long" else short_confidence

    # Merge near-duplicate levels into zone representatives first, so a clustered
    # wall yields one strength-combined target instead of several near-identical
    # ones — consistent with the chart and alerts.
    zone_levels = _zone_representative_levels(sr_levels, setup.entry_price)

    targets = target_generator.generate_targets(
        direction=direction,
        entry_price=setup.entry_price,
        stop_loss=setup.stop_loss,
        sr_levels=zone_levels,
        atr_value=atr_value,
    )

    for target in targets:
        target["probability"] = probability_estimator.estimate_probability(
            target=target,
            dimension_scores=dimension_scores,
            sentiment_classification=sentiment_classification,
            direction=direction,
            config=config,
        )
        # Label follows from the reach-probability: high prob = Conservative.
        target["classification"] = _classify_by_probability(target["probability"])

    # Primary target = most-likely target with real asymmetry (see
    # _select_primary_target), not the old quality-score pick that ignored
    # probability. Sync the setup's headline target/rr_ratio so the chart, gate
    # and outcome eval all agree with the table's starred row.
    primary = _select_primary_target(targets)
    if primary is not None:
        for target in targets:
            target["is_primary"] = target is primary
        setup.target = round(float(primary["price"]), 4)
        setup.rr_ratio = round(float(primary["rr_ratio"]), 4)

    # Per-setup conflicts (target availability is specific to this setup)
    setup_conflicts = list(conflicts)
    if len(targets) < 3:
        setup_conflicts.append("target-availability: Fewer than 3 valid S/R targets available")

    # Action and reasoning are ticker-level: they consider both directions and
    # which directions are actually tradeable, and are identical on every setup.
    action = _choose_recommended_action(
        long_confidence, short_confidence, config, available_directions
    )
    reasoning = _build_reasoning(
        action=action,
        long_confidence=long_confidence,
        short_confidence=short_confidence,
        conflicts=conflicts,
        dimension_scores=dimension_scores,
        sentiment_classification=sentiment_classification,
        config=config,
        available_directions=available_directions,
    )

    setup.confidence_score = round(confidence, 2)
    setup.targets_json = json.dumps(targets)
    setup.conflict_flags_json = json.dumps(setup_conflicts)
    setup.recommended_action = action
    setup.reasoning = reasoning
    setup.risk_level = _risk_level_from_conflicts(setup_conflicts)

    return setup