from __future__ import annotations import json import logging import math from typing import Any from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.models.settings import SystemSetting from app.models.sr_level import SRLevel from app.models.ticker import Ticker from app.models.trade_setup import TradeSetup logger = logging.getLogger(__name__) DEFAULT_RECOMMENDATION_CONFIG: dict[str, float] = { "recommendation_high_confidence_threshold": 70.0, "recommendation_moderate_confidence_threshold": 50.0, "recommendation_confidence_diff_threshold": 20.0, "recommendation_signal_alignment_weight": 0.15, "recommendation_sr_strength_weight": 0.20, "recommendation_distance_penalty_factor": 0.10, "recommendation_momentum_technical_divergence_threshold": 30.0, "recommendation_fundamental_technical_divergence_threshold": 40.0, } # Horizon (trading days) over which a target's reach-probability is estimated. # Kept in step with the outcome evaluator's window so probability predicts the # metric we actually measure. _TARGET_HORIZON_DAYS = 30.0 # Distance bands (in ATR) used to spread targets across Conservative / Moderate # / Aggressive. Aligned with where the touch-probability crosses 60% / 40% over # the horizon, so a target from each band tends to land in the matching label. _CONSERVATIVE_MAX_ATR = 2.9 _MODERATE_MAX_ATR = 4.6 def _clamp(value: float, low: float, high: float) -> float: return max(low, min(high, value)) def _norm_cdf(x: float) -> float: """Standard normal CDF via erf (no SciPy dependency).""" return 0.5 * (1.0 + math.erf(x / math.sqrt(2.0))) def _classify_by_probability(probability: float) -> str: """Label a target by how likely it is to be reached (derived, not assumed).""" if probability >= 60.0: return "Conservative" if probability >= 40.0: return "Moderate" return "Aggressive" def _sentiment_value(sentiment_classification: str | None) -> str | None: if sentiment_classification is None: return None return sentiment_classification.strip().lower() def check_signal_alignment( direction: str, dimension_scores: dict[str, float], sentiment_classification: str | None, ) -> tuple[bool, str]: technical = float(dimension_scores.get("technical", 50.0)) momentum = float(dimension_scores.get("momentum", 50.0)) sentiment = _sentiment_value(sentiment_classification) if direction == "long": aligned_count = sum([ technical > 60, momentum > 60, sentiment == "bullish", ]) if aligned_count >= 2: return True, "Technical, momentum, and/or sentiment align with LONG direction." return False, "Signals are mixed for LONG direction." aligned_count = sum([ technical < 40, momentum < 40, sentiment == "bearish", ]) if aligned_count >= 2: return True, "Technical, momentum, and/or sentiment align with SHORT direction." return False, "Signals are mixed for SHORT direction." class SignalConflictDetector: def detect_conflicts( self, dimension_scores: dict[str, float], sentiment_classification: str | None, config: dict[str, float] | None = None, ) -> list[str]: cfg = config or DEFAULT_RECOMMENDATION_CONFIG technical = float(dimension_scores.get("technical", 50.0)) momentum = float(dimension_scores.get("momentum", 50.0)) fundamental = float(dimension_scores.get("fundamental", 50.0)) sentiment = _sentiment_value(sentiment_classification) mt_threshold = float(cfg.get("recommendation_momentum_technical_divergence_threshold", 30.0)) ft_threshold = float(cfg.get("recommendation_fundamental_technical_divergence_threshold", 40.0)) conflicts: list[str] = [] if sentiment == "bearish" and technical > 60: conflicts.append( f"sentiment-technical: Bearish sentiment conflicts with bullish technical ({technical:.0f})" ) if sentiment == "bullish" and technical < 40: conflicts.append( f"sentiment-technical: Bullish sentiment conflicts with bearish technical ({technical:.0f})" ) mt_diff = abs(momentum - technical) if mt_diff > mt_threshold: conflicts.append( "momentum-technical: " f"Momentum ({momentum:.0f}) diverges from technical ({technical:.0f}) by {mt_diff:.0f} points" ) if sentiment == "bearish" and momentum > 60: conflicts.append( f"sentiment-momentum: Bearish sentiment conflicts with momentum ({momentum:.0f})" ) if sentiment == "bullish" and momentum < 40: conflicts.append( f"sentiment-momentum: Bullish sentiment conflicts with momentum ({momentum:.0f})" ) ft_diff = abs(fundamental - technical) if ft_diff > ft_threshold: conflicts.append( "fundamental-technical: " f"Fundamental ({fundamental:.0f}) diverges significantly from technical ({technical:.0f})" ) return conflicts class DirectionAnalyzer: def calculate_confidence( self, direction: str, dimension_scores: dict[str, float], sentiment_classification: str | None, conflicts: list[str] | None = None, ) -> float: """Directional-agreement confidence around a 50 baseline. Each dimension contributes in proportion to how strongly it agrees with the proposed direction: a bullish dimension RAISES long confidence and LOWERS short confidence (and vice-versa). Signals that oppose the direction push confidence below 50 — so a short on a strongly bullish stock scores near zero, not 55. """ technical = float(dimension_scores.get("technical", 50.0)) momentum = float(dimension_scores.get("momentum", 50.0)) fundamental = float(dimension_scores.get("fundamental", 50.0)) sentiment = _sentiment_value(sentiment_classification) dir_sign = 1.0 if direction == "long" else -1.0 def agree(score: float) -> float: # -1 (fully against) .. +1 (fully for) the proposed direction return ((score - 50.0) / 50.0) * dir_sign sentiment_val = {"bullish": 1.0, "bearish": -1.0}.get(sentiment or "", 0.0) sentiment_agree = sentiment_val * dir_sign confidence = 50.0 + ( agree(technical) * 25.0 + agree(momentum) * 20.0 + sentiment_agree * 15.0 + agree(fundamental) * 10.0 ) # Explicit conflict patterns trim a little more (the agreement terms # already capture most disagreement, so penalties are modest). for conflict in conflicts or []: if "sentiment-technical" in conflict: confidence -= 12.0 elif "momentum-technical" in conflict: confidence -= 10.0 elif "sentiment-momentum" in conflict: confidence -= 12.0 elif "fundamental-technical" in conflict: confidence -= 6.0 return _clamp(confidence, 0.0, 100.0) class TargetGenerator: def generate_targets( self, direction: str, entry_price: float, stop_loss: float, sr_levels: list[SRLevel], atr_value: float, ) -> list[dict[str, Any]]: if atr_value <= 0: return [] risk = abs(entry_price - stop_loss) if risk <= 0: return [] candidates: list[dict[str, Any]] = [] atr_pct = atr_value / entry_price if entry_price > 0 else 0.0 max_atr_multiple: float | None = None if atr_pct > 0.05: max_atr_multiple = 10.0 elif atr_pct < 0.02: max_atr_multiple = 3.0 for level in sr_levels: is_candidate = False if direction == "long": is_candidate = level.type == "resistance" and level.price_level > entry_price else: is_candidate = level.type == "support" and level.price_level < entry_price if not is_candidate: continue distance = abs(level.price_level - entry_price) distance_atr_multiple = distance / atr_value if distance_atr_multiple < 1.0: continue if max_atr_multiple is not None and distance_atr_multiple > max_atr_multiple: continue reward = abs(level.price_level - entry_price) rr_ratio = reward / risk norm_rr = min(rr_ratio / 10.0, 1.0) norm_strength = _clamp(level.strength, 0, 100) / 100.0 norm_proximity = 1.0 - min(distance / entry_price, 1.0) quality = 0.35 * norm_rr + 0.35 * norm_strength + 0.30 * norm_proximity candidates.append( { "price": float(level.price_level), "distance_from_entry": float(distance), "distance_atr_multiple": float(distance_atr_multiple), "rr_ratio": float(rr_ratio), "classification": "Moderate", "sr_level_id": int(level.id), "sr_strength": float(level.strength), "quality": float(quality), } ) if not candidates: return [] # Select up to 5 targets that SPAN the distance range, instead of the # top-5 by quality (which biases toward far, high-R:R levels and buries # every nearby target). Guarantees the nearest level plus a # representative from each distance band when they exist, so the table # offers a real mix of Conservative / Moderate / Aggressive. conservative = [c for c in candidates if c["distance_atr_multiple"] <= _CONSERVATIVE_MAX_ATR] moderate = [c for c in candidates if _CONSERVATIVE_MAX_ATR < c["distance_atr_multiple"] <= _MODERATE_MAX_ATR] aggressive = [c for c in candidates if c["distance_atr_multiple"] > _MODERATE_MAX_ATR] selected: list[dict[str, Any]] = [] selected_ids: set[int] = set() def _add(candidate: dict[str, Any] | None) -> None: if candidate is not None and candidate["sr_level_id"] not in selected_ids: selected.append(candidate) selected_ids.add(candidate["sr_level_id"]) # Nearest overall (the most likely / Conservative anchor) _add(min(candidates, key=lambda c: c["distance_atr_multiple"])) # Best-quality representative from each band → spread across labels for bucket in (conservative, moderate, aggressive): if bucket: _add(max(bucket, key=lambda c: c["quality"])) # Fill remaining slots with the next-best by quality for candidate in sorted(candidates, key=lambda c: c["quality"], reverse=True): if len(selected) >= 5: break _add(candidate) selected.sort(key=lambda row: row["distance_from_entry"]) for target in selected: target.pop("quality", None) return selected class ProbabilityEstimator: def estimate_probability( self, target: dict[str, Any], dimension_scores: dict[str, float], sentiment_classification: str | None, direction: str, config: dict[str, float], ) -> float: """Probability the target is reached within the outcome horizon. Base = probability of price *touching* a level at the target's distance within the evaluation window, under a driftless random walk (reflection principle): 2·(1 − Φ(d / (ATR·√T))). Distance is in ATR multiples and T is the horizon in trading days, so a far target is inherently unlikely — no more 90% on a +39% move. Strength and signal alignment (drift toward the target) then modulate it modestly. """ strength = float(target.get("sr_strength", 50.0)) atr_multiple = float(target.get("distance_atr_multiple", 1.0)) expected_move_atr = math.sqrt(_TARGET_HORIZON_DAYS) # ≈ 5.48 ATR over 30d z = atr_multiple / expected_move_atr if expected_move_atr > 0 else 99.0 touch_prob = 2.0 * (1.0 - _norm_cdf(z)) # 0..1 probability = touch_prob * 100.0 technical = float(dimension_scores.get("technical", 50.0)) momentum = float(dimension_scores.get("momentum", 50.0)) sentiment = _sentiment_value(sentiment_classification) # Drift toward the target raises touch probability; against it lowers. if direction == "long": aligned = technical > 60 and (sentiment == "bullish" or momentum > 60) opposed = technical < 40 or (sentiment == "bearish" and momentum < 40) else: aligned = technical < 40 and (sentiment == "bearish" or momentum < 40) opposed = technical > 60 or (sentiment == "bullish" and momentum > 60) sr_weight = float(config.get("recommendation_sr_strength_weight", 0.20)) signal_weight = float(config.get("recommendation_signal_alignment_weight", 0.15)) # Strength magnet: ±(sr_weight·50) at the extremes (±10 by default) probability += ((strength - 50.0) / 50.0) * (sr_weight * 50.0) # Alignment: ±(signal_weight·100) (±15 by default) if aligned: probability += signal_weight * 100.0 elif opposed: probability -= signal_weight * 100.0 return round(_clamp(probability, 3.0, 95.0), 2) signal_conflict_detector = SignalConflictDetector() direction_analyzer = DirectionAnalyzer() target_generator = TargetGenerator() probability_estimator = ProbabilityEstimator() async def get_recommendation_config(db: AsyncSession) -> dict[str, float]: result = await db.execute( select(SystemSetting).where(SystemSetting.key.like("recommendation_%")) ) rows = result.scalars().all() config: dict[str, float] = dict(DEFAULT_RECOMMENDATION_CONFIG) for setting in rows: try: config[setting.key] = float(setting.value) except (TypeError, ValueError): logger.warning("Invalid recommendation setting value for %s: %s", setting.key, setting.value) return config def _risk_level_from_conflicts(conflicts: list[str]) -> str: if not conflicts: return "Low" severe = [c for c in conflicts if "sentiment-technical" in c or "sentiment-momentum" in c] if len(severe) >= 2 or len(conflicts) >= 3: return "High" return "Medium" def _choose_recommended_action( long_confidence: float, short_confidence: float, config: dict[str, float], available_directions: set[str] | None = None, ) -> str: """Pick the ticker action — but only recommend a direction you can trade. A direction is recommendable only if a tradeable setup exists for it (``available_directions``). So a strong LONG bias on a stock at all-time highs — where the scanner can build no long target — does NOT yield LONG_HIGH; it falls through to NEUTRAL, and the reasoning explains why. """ high = float(config.get("recommendation_high_confidence_threshold", 70.0)) moderate = float(config.get("recommendation_moderate_confidence_threshold", 50.0)) diff = float(config.get("recommendation_confidence_diff_threshold", 20.0)) long_ok = available_directions is None or "long" in available_directions short_ok = available_directions is None or "short" in available_directions if long_ok and long_confidence >= high and (long_confidence - short_confidence) >= diff: return "LONG_HIGH" if short_ok and short_confidence >= high and (short_confidence - long_confidence) >= diff: return "SHORT_HIGH" if long_ok and long_confidence >= moderate and (long_confidence - short_confidence) >= diff: return "LONG_MODERATE" if short_ok and short_confidence >= moderate and (short_confidence - long_confidence) >= diff: return "SHORT_MODERATE" return "NEUTRAL" def _build_reasoning( action: str, long_confidence: float, short_confidence: float, conflicts: list[str], dimension_scores: dict[str, float], sentiment_classification: str | None, config: dict[str, float], available_directions: set[str] | None = None, ) -> str: """Ticker-level reasoning that always matches the recommended action. Stored identically on both setups so the displayed summary can never mix a SHORT setup's reasoning under a LONG action. """ sentiment = _sentiment_value(sentiment_classification) or "unknown" technical = float(dimension_scores.get("technical", 50.0)) momentum = float(dimension_scores.get("momentum", 50.0)) signals = f"technical={technical:.0f}, momentum={momentum:.0f}, sentiment={sentiment}" conflict_note = f" {len(conflicts)} conflict(s) detected, risk-adjusted." if conflicts else "" if action != "NEUTRAL": direction = "long" if action.startswith("LONG") else "short" tier = "high" if action.endswith("HIGH") else "moderate" confidence = long_confidence if direction == "long" else short_confidence aligned, _ = check_signal_alignment(direction, dimension_scores, sentiment_classification) return ( f"{direction.upper()} ({tier} confidence): {confidence:.0f}% with " f"{'aligned' if aligned else 'mixed'} signals ({signals}).{conflict_note}" ) # NEUTRAL — explain whether it's a missing setup or genuinely mixed signals. moderate = float(config.get("recommendation_moderate_confidence_threshold", 50.0)) avail = available_directions if available_directions is not None else {"long", "short"} bias_dir = "long" if long_confidence >= short_confidence else "short" bias_conf = max(long_confidence, short_confidence) if bias_conf >= moderate and bias_dir not in avail: other = "short" if bias_dir == "long" else "long" extreme = "highs (no resistance target above)" if bias_dir == "long" else "lows (no support target below)" return ( f"Ticker bias is {bias_dir.upper()} (confidence {bias_conf:.0f}%, {signals}) but price is " f"extended near {extreme}, so no high-conviction {bias_dir} setup is available. " f"The available {other.upper()} setup is counter-trend.{conflict_note}" ) return ( f"No high-conviction setup: LONG {long_confidence:.0f}%, SHORT {short_confidence:.0f}% " f"({signals}).{conflict_note}" ) PRIMARY_TARGET_MIN_RR = 1.5 def _select_primary_target(targets: list[dict], min_rr: float = PRIMARY_TARGET_MIN_RR) -> dict | None: """Primary = the most LIKELY target that still offers real asymmetry. Among targets clearing a minimal R:R floor, pick the highest probability (tie-break by R:R). This fixes the old pick, which ignored probability and could land on the furthest, least-likely 'lottery' level. Stronger-reward levels remain in the table as stretch targets. Falls back to the highest-R:R target if nothing clears the floor. """ if not targets: return None worthwhile = [t for t in targets if float(t.get("rr_ratio", 0.0)) >= min_rr] pool = worthwhile or targets return max( pool, key=lambda t: (float(t.get("probability", 0.0)), float(t.get("rr_ratio", 0.0))), ) async def enhance_trade_setup( db: AsyncSession, ticker: Ticker, setup: TradeSetup, dimension_scores: dict[str, float], sr_levels: list[SRLevel], sentiment_classification: str | None, atr_value: float, available_directions: set[str] | None = None, ) -> TradeSetup: config = await get_recommendation_config(db) conflicts = signal_conflict_detector.detect_conflicts( dimension_scores=dimension_scores, sentiment_classification=sentiment_classification, config=config, ) long_confidence = direction_analyzer.calculate_confidence( direction="long", dimension_scores=dimension_scores, sentiment_classification=sentiment_classification, conflicts=conflicts, ) short_confidence = direction_analyzer.calculate_confidence( direction="short", dimension_scores=dimension_scores, sentiment_classification=sentiment_classification, conflicts=conflicts, ) direction = setup.direction.lower() confidence = long_confidence if direction == "long" else short_confidence targets = target_generator.generate_targets( direction=direction, entry_price=setup.entry_price, stop_loss=setup.stop_loss, sr_levels=sr_levels, atr_value=atr_value, ) for target in targets: target["probability"] = probability_estimator.estimate_probability( target=target, dimension_scores=dimension_scores, sentiment_classification=sentiment_classification, direction=direction, config=config, ) # Label follows from the reach-probability: high prob = Conservative. target["classification"] = _classify_by_probability(target["probability"]) # Primary target = most-likely target with real asymmetry (see # _select_primary_target), not the old quality-score pick that ignored # probability. Sync the setup's headline target/rr_ratio so the chart, gate # and outcome eval all agree with the table's starred row. primary = _select_primary_target(targets) if primary is not None: for target in targets: target["is_primary"] = target is primary setup.target = round(float(primary["price"]), 4) setup.rr_ratio = round(float(primary["rr_ratio"]), 4) # Per-setup conflicts (target availability is specific to this setup) setup_conflicts = list(conflicts) if len(targets) < 3: setup_conflicts.append("target-availability: Fewer than 3 valid S/R targets available") # Action and reasoning are ticker-level: they consider both directions and # which directions are actually tradeable, and are identical on every setup. action = _choose_recommended_action( long_confidence, short_confidence, config, available_directions ) reasoning = _build_reasoning( action=action, long_confidence=long_confidence, short_confidence=short_confidence, conflicts=conflicts, dimension_scores=dimension_scores, sentiment_classification=sentiment_classification, config=config, available_directions=available_directions, ) setup.confidence_score = round(confidence, 2) setup.targets_json = json.dumps(targets) setup.conflict_flags_json = json.dumps(setup_conflicts) setup.recommended_action = action setup.reasoning = reasoning setup.risk_level = _risk_level_from_conflicts(setup_conflicts) return setup