from __future__ import annotations

import json
import logging
from typing import Any

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.settings import SystemSetting
from app.models.sr_level import SRLevel
from app.models.ticker import Ticker
from app.models.trade_setup import TradeSetup

logger = logging.getLogger(__name__)

# Fallback tuning values used for any recommendation_* setting that is not
# overridden in the database (see get_recommendation_config below).
DEFAULT_RECOMMENDATION_CONFIG: dict[str, float] = {
    "recommendation_high_confidence_threshold": 70.0,
    "recommendation_moderate_confidence_threshold": 50.0,
    "recommendation_confidence_diff_threshold": 20.0,
    "recommendation_signal_alignment_weight": 0.15,
    "recommendation_sr_strength_weight": 0.20,
    "recommendation_distance_penalty_factor": 0.10,
    "recommendation_momentum_technical_divergence_threshold": 30.0,
    "recommendation_fundamental_technical_divergence_threshold": 40.0,
}


def _clamp(value: float, low: float, high: float) -> float:
    """Restrict *value* to the closed interval [low, high]."""
    bounded_above = min(high, value)
    return max(low, bounded_above)


def _sentiment_value(sentiment_classification: str | None) -> str | None:
    """Normalise a sentiment label to trimmed lowercase; pass None through."""
    if sentiment_classification is None:
        return None
    return sentiment_classification.strip().lower()


def check_signal_alignment(
    direction: str,
    dimension_scores: dict[str, float],
    sentiment_classification: str | None,
) -> tuple[bool, str]:
    """Report whether the signals agree with the requested trade direction.

    Counts three "votes" (technical score, momentum score, sentiment label);
    at least two must favour *direction* for the signals to count as aligned.
    Any direction other than "long" is treated as short.
    Returns (aligned, human-readable explanation).
    """
    technical_score = float(dimension_scores.get("technical", 50.0))
    momentum_score = float(dimension_scores.get("momentum", 50.0))
    mood = _sentiment_value(sentiment_classification)

    if direction == "long":
        votes = (
            technical_score > 60,
            momentum_score > 60,
            mood == "bullish",
        )
        if sum(votes) >= 2:
            return True, "Technical, momentum, and/or sentiment align with LONG direction."
        return False, "Signals are mixed for LONG direction."

    votes = (
        technical_score < 40,
        momentum_score < 40,
        mood == "bearish",
    )
    if sum(votes) >= 2:
        return True, "Technical, momentum, and/or sentiment align with SHORT direction."
    return False, "Signals are mixed for SHORT direction."
class SignalConflictDetector:
    """Detects pairwise disagreements between score dimensions and sentiment."""

    def detect_conflicts(
        self,
        dimension_scores: dict[str, float],
        sentiment_classification: str | None,
        config: dict[str, float] | None = None,
    ) -> list[str]:
        """Return human-readable conflict descriptions (empty when signals agree).

        Each entry starts with a "<dim>-<dim>:" tag that downstream code
        matches on by substring, so those prefixes must stay stable.
        """
        # Falsy configs (None or {}) fall back to the module defaults.
        settings = config or DEFAULT_RECOMMENDATION_CONFIG
        tech = float(dimension_scores.get("technical", 50.0))
        mom = float(dimension_scores.get("momentum", 50.0))
        fund = float(dimension_scores.get("fundamental", 50.0))
        mood = _sentiment_value(sentiment_classification)

        mt_limit = float(
            settings.get("recommendation_momentum_technical_divergence_threshold", 30.0)
        )
        ft_limit = float(
            settings.get("recommendation_fundamental_technical_divergence_threshold", 40.0)
        )

        found: list[str] = []

        # Sentiment vs. technical score.
        if mood == "bearish" and tech > 60:
            found.append(
                f"sentiment-technical: Bearish sentiment conflicts with bullish technical ({tech:.0f})"
            )
        if mood == "bullish" and tech < 40:
            found.append(
                f"sentiment-technical: Bullish sentiment conflicts with bearish technical ({tech:.0f})"
            )

        # Momentum vs. technical: flag divergence beyond the configured gap.
        momentum_gap = abs(mom - tech)
        if momentum_gap > mt_limit:
            found.append(
                "momentum-technical: "
                f"Momentum ({mom:.0f}) diverges from technical ({tech:.0f}) by {momentum_gap:.0f} points"
            )

        # Sentiment vs. momentum.
        if mood == "bearish" and mom > 60:
            found.append(
                f"sentiment-momentum: Bearish sentiment conflicts with momentum ({mom:.0f})"
            )
        if mood == "bullish" and mom < 40:
            found.append(
                f"sentiment-momentum: Bullish sentiment conflicts with momentum ({mom:.0f})"
            )

        # Fundamental vs. technical divergence.
        fundamental_gap = abs(fund - tech)
        if fundamental_gap > ft_limit:
            found.append(
                "fundamental-technical: "
                f"Fundamental ({fund:.0f}) diverges significantly from technical ({tech:.0f})"
            )

        return found


class DirectionAnalyzer:
    """Scores how confidently the signals support a LONG or SHORT thesis."""

    def calculate_confidence(
        self,
        direction: str,
        dimension_scores: dict[str, float],
        sentiment_classification: str | None,
        conflicts: list[str] | None = None,
    ) -> float:
        """Return a 0-100 confidence for *direction* ("long"; otherwise short).

        Starts from a neutral 50, adds tiered bonuses for supportive
        technical/momentum/sentiment/fundamental readings, then subtracts a
        fixed penalty per detected conflict (first matching tag wins).
        """
        score = 50.0
        tech = float(dimension_scores.get("technical", 50.0))
        mom = float(dimension_scores.get("momentum", 50.0))
        fund = float(dimension_scores.get("fundamental", 50.0))
        mood = _sentiment_value(sentiment_classification)

        if direction == "long":
            if tech > 70:
                score += 25.0
            elif tech > 60:
                score += 15.0
            if mom > 70:
                score += 20.0
            elif mom > 60:
                score += 15.0
            if mood == "bullish":
                score += 15.0
            elif mood == "neutral":
                score += 5.0
            if fund > 60:
                score += 10.0
        else:
            # Mirror image: low scores and bearish sentiment support SHORT.
            if tech < 30:
                score += 25.0
            elif tech < 40:
                score += 15.0
            if mom < 30:
                score += 20.0
            elif mom < 40:
                score += 15.0
            if mood == "bearish":
                score += 15.0
            elif mood == "neutral":
                score += 5.0
            if fund < 40:
                score += 10.0

        # One penalty per conflict, keyed on its tag substring; ordering
        # matches the original elif chain so the first hit is the one applied.
        penalty_by_tag = (
            ("sentiment-technical", 20.0),
            ("momentum-technical", 15.0),
            ("sentiment-momentum", 20.0),
            ("fundamental-technical", 10.0),
        )
        for conflict in conflicts or []:
            for tag, penalty in penalty_by_tag:
                if tag in conflict:
                    score -= penalty
                    break

        return _clamp(score, 0.0, 100.0)


class TargetGenerator:
    """Builds take-profit targets from S/R levels beyond the entry price."""

    def generate_targets(
        self,
        direction: str,
        entry_price: float,
        stop_loss: float,
        sr_levels: list[SRLevel],
        atr_value: float,
    ) -> list[dict[str, Any]]:
        """Return up to five targets, nearest-first, each classified
        Conservative / Moderate / Aggressive.

        Returns [] when the ATR or the entry/stop risk is non-positive, or
        when no S/R level qualifies.
        """
        if atr_value <= 0:
            return []
        risk = abs(entry_price - stop_loss)
        if risk <= 0:
            return []

        # Cap how far away (in ATRs) a target may sit, scaled by volatility:
        # volatile names may reach further, quiet names must stay close.
        atr_fraction = atr_value / entry_price if entry_price > 0 else 0.0
        atr_cap: float | None = None
        if atr_fraction > 0.05:
            atr_cap = 10.0
        elif atr_fraction < 0.02:
            atr_cap = 3.0

        scored: list[dict[str, Any]] = []
        for level in sr_levels:
            # LONG targets are resistances above entry; SHORT are supports below.
            if direction == "long":
                usable = level.type == "resistance" and level.price_level > entry_price
            else:
                usable = level.type == "support" and level.price_level < entry_price
            if not usable:
                continue

            distance = abs(level.price_level - entry_price)
            atr_multiple = distance / atr_value
            # Require at least one ATR of room; enforce the volatility cap.
            if atr_multiple < 1.0:
                continue
            if atr_cap is not None and atr_multiple > atr_cap:
                continue

            rr_ratio = distance / risk  # reward equals the distance to the level
            # Quality blends reward/risk, level strength, and proximity.
            rr_component = min(rr_ratio / 10.0, 1.0)
            strength_component = _clamp(level.strength, 0, 100) / 100.0
            proximity_component = 1.0 - min(distance / entry_price, 1.0)
            quality = (
                0.35 * rr_component
                + 0.35 * strength_component
                + 0.30 * proximity_component
            )
            scored.append(
                {
                    "price": float(level.price_level),
                    "distance_from_entry": float(distance),
                    "distance_atr_multiple": float(atr_multiple),
                    "rr_ratio": float(rr_ratio),
                    "classification": "Moderate",
                    "sr_level_id": int(level.id),
                    "sr_strength": float(level.strength),
                    "quality": float(quality),
                }
            )

        # Keep the five best by quality, then present them nearest-first.
        scored.sort(key=lambda row: row["quality"], reverse=True)
        chosen = scored[:5]
        chosen.sort(key=lambda row: row["distance_from_entry"])
        if not chosen:
            return []

        count = len(chosen)
        for position, target in enumerate(chosen):
            # First two are Conservative, last two Aggressive, middle Moderate;
            # with <=2 targets the nearest is Conservative, the rest Aggressive.
            if count <= 2:
                target["classification"] = "Conservative" if position == 0 else "Aggressive"
            elif position <= 1:
                target["classification"] = "Conservative"
            elif position >= count - 2:
                target["classification"] = "Aggressive"
            else:
                target["classification"] = "Moderate"
            target.pop("quality", None)  # internal ranking field, not exposed
        return chosen


class ProbabilityEstimator:
    """Heuristically estimates the probability of a target being reached."""

    def estimate_probability(
        self,
        target: dict[str, Any],
        dimension_scores: dict[str, float],
        sentiment_classification: str | None,
        direction: str,
        config: dict[str, float],
    ) -> float:
        """Return a probability (percent, rounded to 2 decimals) for *target*.

        Combines a classification-based base rate with adjustments for S/R
        strength, signal alignment, volatility, and distance, then bounds the
        result inside the band allowed for the target's classification.
        """
        label = str(target.get("classification", "Moderate"))
        sr_strength = float(target.get("sr_strength", 50.0))
        atr_multiple = float(target.get("distance_atr_multiple", 1.0))

        if label == "Conservative":
            base = 70.0
        elif label == "Aggressive":
            base = 40.0
        else:
            base = 55.0

        # Stronger S/R levels warrant a higher reach probability.
        if sr_strength >= 80:
            strength_adj = 15.0
        elif sr_strength >= 60:
            strength_adj = 10.0
        elif sr_strength >= 40:
            strength_adj = 5.0
        else:
            strength_adj = -10.0

        tech = float(dimension_scores.get("technical", 50.0))
        mom = float(dimension_scores.get("momentum", 50.0))
        mood = _sentiment_value(sentiment_classification)

        alignment_adj = 0.0
        if direction == "long":
            if tech > 60 and (mood == "bullish" or mom > 60):
                alignment_adj = 15.0
            elif tech < 40 or (mood == "bearish" and mom < 40):
                alignment_adj = -15.0
        else:
            if tech < 40 and (mood == "bearish" or mom < 40):
                alignment_adj = 15.0
            elif tech > 60 or (mood == "bullish" and mom > 60):
                alignment_adj = -15.0

        # NOTE(review): both the far (>5 ATR) and near (<2 ATR) branches add
        # +5.0 — if the far branch was meant as a penalty, confirm the sign.
        volatility_adj = 0.0
        if atr_multiple > 5:
            volatility_adj = 5.0
        elif atr_multiple < 2:
            volatility_adj = 5.0

        signal_weight = float(config.get("recommendation_signal_alignment_weight", 0.15))
        sr_weight = float(config.get("recommendation_sr_strength_weight", 0.20))
        distance_penalty = float(config.get("recommendation_distance_penalty_factor", 0.10))

        # Weights are expressed relative to their defaults (0.15 / 0.20), so a
        # default config leaves the raw adjustments untouched.
        scaled_alignment = alignment_adj * (signal_weight / 0.15)
        scaled_strength = strength_adj * (sr_weight / 0.20)
        distance_adj = -distance_penalty * max(atr_multiple - 1.0, 0.0) * 2.0

        probability = base + scaled_strength + scaled_alignment + volatility_adj + distance_adj
        probability = _clamp(probability, 10.0, 90.0)

        # Keep each classification inside its own probability band.
        if label == "Conservative":
            probability = max(probability, 61.0)
        elif label == "Moderate":
            probability = _clamp(probability, 40.0, 70.0)
        elif label == "Aggressive":
            probability = min(probability, 49.0)
        return round(probability, 2)


# Module-level singletons shared by the service layer.
signal_conflict_detector = SignalConflictDetector()
direction_analyzer = DirectionAnalyzer()
target_generator = TargetGenerator()
probability_estimator = ProbabilityEstimator()


async def get_recommendation_config(db: AsyncSession) -> dict[str, float]:
    """Load recommendation_* settings from the DB, layered over the defaults.

    Values that cannot be parsed as floats are logged and skipped, leaving
    the default for that key in place.
    """
    result = await db.execute(
        select(SystemSetting).where(SystemSetting.key.like("recommendation_%"))
    )
    overrides = result.scalars().all()

    merged: dict[str, float] = dict(DEFAULT_RECOMMENDATION_CONFIG)
    for setting in overrides:
        try:
            merged[setting.key] = float(setting.value)
        except (TypeError, ValueError):
            logger.warning(
                "Invalid recommendation setting value for %s: %s",
                setting.key,
                setting.value,
            )
    return merged


def _risk_level_from_conflicts(conflicts: list[str]) -> str:
    """Map conflict count and severity to "Low" / "Medium" / "High"."""
    if not conflicts:
        return "Low"
    # Sentiment-driven conflicts are the severe kind.
    severe_count = sum(
        1
        for conflict in conflicts
        if "sentiment-technical" in conflict or "sentiment-momentum" in conflict
    )
    if severe_count >= 2 or len(conflicts) >= 3:
        return "High"
    return "Medium"


def _choose_recommended_action(
    long_confidence: float,
    short_confidence: float,
    config: dict[str, float],
) -> str:
    """Pick an action label from the two directional confidences.

    A direction must clear its confidence bar AND beat the opposite direction
    by the configured margin; otherwise the call is NEUTRAL.
    """
    high_bar = float(config.get("recommendation_high_confidence_threshold", 70.0))
    moderate_bar = float(config.get("recommendation_moderate_confidence_threshold", 50.0))
    margin = float(config.get("recommendation_confidence_diff_threshold", 20.0))
    edge = long_confidence - short_confidence

    if long_confidence >= high_bar and edge >= margin:
        return "LONG_HIGH"
    if short_confidence >= high_bar and -edge >= margin:
        return "SHORT_HIGH"
    if long_confidence >= moderate_bar and edge >= margin:
        return "LONG_MODERATE"
    if short_confidence >= moderate_bar and -edge >= margin:
        return "SHORT_MODERATE"
    return "NEUTRAL"


def _build_reasoning(
    direction: str,
    confidence: float,
    conflicts: list[str],
    dimension_scores: dict[str, float],
    sentiment_classification: str | None,
    action: str,
) -> str:
    """Compose a one-paragraph explanation of the recommendation."""
    aligned, alignment_text = check_signal_alignment(
        direction,
        dimension_scores,
        sentiment_classification,
    )
    mood = _sentiment_value(sentiment_classification) or "unknown"
    tech = float(dimension_scores.get("technical", 50.0))
    mom = float(dimension_scores.get("momentum", 50.0))

    alignment_summary = "aligned" if aligned else "mixed"
    summary = (
        f"{direction.upper()} confidence {confidence:.1f}% with {alignment_summary} signals "
        f"(technical={tech:.0f}, momentum={mom:.0f}, sentiment={mood})."
    )
    if conflicts:
        return (
            f"{summary} {alignment_text} Detected {len(conflicts)} conflict(s), "
            f"so recommendation is risk-adjusted. Action={action}."
        )
    return f"{summary} {alignment_text} No major conflicts detected. Action={action}."


async def enhance_trade_setup(
    db: AsyncSession,
    ticker: Ticker,
    setup: TradeSetup,
    dimension_scores: dict[str, float],
    sr_levels: list[SRLevel],
    sentiment_classification: str | None,
    atr_value: float,
) -> TradeSetup:
    """Enrich *setup* in place with confidence, targets, conflicts, reasoning.

    Mutates and returns the same TradeSetup instance; persisting the change
    is the caller's responsibility. *ticker* is currently unused here but is
    part of the established call signature.
    """
    config = await get_recommendation_config(db)

    conflicts = signal_conflict_detector.detect_conflicts(
        dimension_scores=dimension_scores,
        sentiment_classification=sentiment_classification,
        config=config,
    )
    long_confidence = direction_analyzer.calculate_confidence(
        direction="long",
        dimension_scores=dimension_scores,
        sentiment_classification=sentiment_classification,
        conflicts=conflicts,
    )
    short_confidence = direction_analyzer.calculate_confidence(
        direction="short",
        dimension_scores=dimension_scores,
        sentiment_classification=sentiment_classification,
        conflicts=conflicts,
    )

    direction = setup.direction.lower()
    confidence = long_confidence if direction == "long" else short_confidence

    targets = target_generator.generate_targets(
        direction=direction,
        entry_price=setup.entry_price,
        stop_loss=setup.stop_loss,
        sr_levels=sr_levels,
        atr_value=atr_value,
    )
    for target in targets:
        target["probability"] = probability_estimator.estimate_probability(
            target=target,
            dimension_scores=dimension_scores,
            sentiment_classification=sentiment_classification,
            direction=direction,
            config=config,
        )

    # Thin target coverage is itself a risk flag. It is appended AFTER the
    # confidence calculations above, so it influences only risk level,
    # the stored conflict list, and the reasoning text.
    if len(targets) < 3:
        conflicts = [*conflicts, "target-availability: Fewer than 3 valid S/R targets available"]

    action = _choose_recommended_action(long_confidence, short_confidence, config)
    risk_level = _risk_level_from_conflicts(conflicts)

    setup.confidence_score = round(confidence, 2)
    setup.targets_json = json.dumps(targets)
    setup.conflict_flags_json = json.dumps(conflicts)
    setup.recommended_action = action
    setup.reasoning = _build_reasoning(
        direction=direction,
        confidence=confidence,
        conflicts=conflicts,
        dimension_scores=dimension_scores,
        sentiment_classification=sentiment_classification,
        action=action,
    )
    setup.risk_level = risk_level
    return setup