Files
signal-platform/app/services/recommendation_service.py
Dennis Thiessen 0a011d4ce9
Some checks failed
Deploy / lint (push) Failing after 21s
Deploy / test (push) Has been skipped
Deploy / deploy (push) Has been skipped
Big refactoring
2026-03-03 15:20:18 +01:00

500 lines
17 KiB
Python

from __future__ import annotations
import json
import logging
from typing import Any
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.settings import SystemSetting
from app.models.sr_level import SRLevel
from app.models.ticker import Ticker
from app.models.trade_setup import TradeSetup
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Built-in values for every tunable recommendation parameter.
# get_recommendation_config() copies this mapping and overlays any values
# stored as SystemSetting rows, so these act as fallbacks.
DEFAULT_RECOMMENDATION_CONFIG: dict[str, float] = {
    # Confidence cut-offs read by _choose_recommended_action().
    "recommendation_high_confidence_threshold": 70.0,
    "recommendation_moderate_confidence_threshold": 50.0,
    "recommendation_confidence_diff_threshold": 20.0,
    # Scaling factors read by ProbabilityEstimator.estimate_probability().
    "recommendation_signal_alignment_weight": 0.15,
    "recommendation_sr_strength_weight": 0.20,
    "recommendation_distance_penalty_factor": 0.10,
    # Divergence limits read by SignalConflictDetector.detect_conflicts().
    "recommendation_momentum_technical_divergence_threshold": 30.0,
    "recommendation_fundamental_technical_divergence_threshold": 40.0,
}
def _clamp(value: float, low: float, high: float) -> float:
return max(low, min(high, value))
def _sentiment_value(sentiment_classification: str | None) -> str | None:
if sentiment_classification is None:
return None
return sentiment_classification.strip().lower()
def check_signal_alignment(
    direction: str,
    dimension_scores: dict[str, float],
    sentiment_classification: str | None,
) -> tuple[bool, str]:
    """Report whether at least two of three signals support *direction*.

    The three signals are the "technical" score, the "momentum" score and the
    normalised sentiment label. For ``"long"`` a score counts when it is above
    60 and sentiment counts when it is "bullish"; for any other direction a
    score counts when it is below 40 and sentiment counts when it is
    "bearish". Missing scores default to 50.0 and therefore never count.

    Returns:
        ``(aligned, message)`` where *aligned* is True when two or more
        signals agree and *message* is a human-readable summary.
    """
    technical_score = float(dimension_scores.get("technical", 50.0))
    momentum_score = float(dimension_scores.get("momentum", 50.0))
    mood = _sentiment_value(sentiment_classification)

    if direction == "long":
        votes = (technical_score > 60, momentum_score > 60, mood == "bullish")
        aligned_message = "Technical, momentum, and/or sentiment align with LONG direction."
        mixed_message = "Signals are mixed for LONG direction."
    else:
        # Every direction other than "long" is evaluated as a short.
        votes = (technical_score < 40, momentum_score < 40, mood == "bearish")
        aligned_message = "Technical, momentum, and/or sentiment align with SHORT direction."
        mixed_message = "Signals are mixed for SHORT direction."

    if sum(votes) >= 2:
        return True, aligned_message
    return False, mixed_message
class SignalConflictDetector:
    """Find disagreements between sentiment and the dimension scores."""

    def detect_conflicts(
        self,
        dimension_scores: dict[str, float],
        sentiment_classification: str | None,
        config: dict[str, float] | None = None,
    ) -> list[str]:
        """Return one descriptive string per detected signal conflict.

        Each entry starts with a tag ("sentiment-technical",
        "momentum-technical", "sentiment-momentum" or "fundamental-technical")
        followed by a human-readable explanation. Entries are appended in that
        tag order.

        Args:
            dimension_scores: Scores keyed by "technical", "momentum" and
                "fundamental"; a missing key is treated as 50.0.
            sentiment_classification: Sentiment label, or None when unknown.
            config: Optional settings supplying the two divergence
                thresholds. A falsy value (None or an empty dict) falls back
                to DEFAULT_RECOMMENDATION_CONFIG.

        Returns:
            A possibly empty list of conflict descriptions.
        """
        settings = config or DEFAULT_RECOMMENDATION_CONFIG

        technical = float(dimension_scores.get("technical", 50.0))
        momentum = float(dimension_scores.get("momentum", 50.0))
        fundamental = float(dimension_scores.get("fundamental", 50.0))
        mood = _sentiment_value(sentiment_classification)
        momentum_gap_limit = float(
            settings.get("recommendation_momentum_technical_divergence_threshold", 30.0)
        )
        fundamental_gap_limit = float(
            settings.get("recommendation_fundamental_technical_divergence_threshold", 40.0)
        )

        is_bearish = mood == "bearish"
        is_bullish = mood == "bullish"
        flagged: list[str] = []

        # Sentiment pointing the opposite way to the technical score.
        if is_bearish and technical > 60:
            flagged.append(
                f"sentiment-technical: Bearish sentiment conflicts with bullish technical ({technical:.0f})"
            )
        elif is_bullish and technical < 40:
            flagged.append(
                f"sentiment-technical: Bullish sentiment conflicts with bearish technical ({technical:.0f})"
            )

        # Momentum and technical scores further apart than the configured limit.
        mt_diff = abs(momentum - technical)
        if mt_diff > momentum_gap_limit:
            flagged.append(
                "momentum-technical: "
                f"Momentum ({momentum:.0f}) diverges from technical ({technical:.0f}) by {mt_diff:.0f} points"
            )

        # Sentiment pointing the opposite way to the momentum score.
        if is_bearish and momentum > 60:
            flagged.append(
                f"sentiment-momentum: Bearish sentiment conflicts with momentum ({momentum:.0f})"
            )
        elif is_bullish and momentum < 40:
            flagged.append(
                f"sentiment-momentum: Bullish sentiment conflicts with momentum ({momentum:.0f})"
            )

        # Fundamental and technical scores further apart than the configured limit.
        if abs(fundamental - technical) > fundamental_gap_limit:
            flagged.append(
                "fundamental-technical: "
                f"Fundamental ({fundamental:.0f}) diverges significantly from technical ({technical:.0f})"
            )

        return flagged
class DirectionAnalyzer:
    """Score how strongly the available signals support a trade direction."""

    def calculate_confidence(
        self,
        direction: str,
        dimension_scores: dict[str, float],
        sentiment_classification: str | None,
        conflicts: list[str] | None = None,
    ) -> float:
        """Compute a confidence score between 0 and 100 for *direction*.

        The score starts at 50 and gains points for supportive technical,
        momentum, sentiment and fundamental readings. Each conflict string
        then subtracts a penalty chosen by the first known tag it contains.
        Penalties are applied identically for both directions.

        Args:
            direction: ``"long"``; any other value is scored as a short.
            dimension_scores: Scores keyed by "technical", "momentum" and
                "fundamental"; a missing key is treated as 50.0.
            sentiment_classification: Sentiment label, or None when unknown.
            conflicts: Conflict descriptions as produced by
                SignalConflictDetector.detect_conflicts; None means none.

        Returns:
            The confidence clamped to the range 0.0 to 100.0.
        """
        technical = float(dimension_scores.get("technical", 50.0))
        momentum = float(dimension_scores.get("momentum", 50.0))
        fundamental = float(dimension_scores.get("fundamental", 50.0))
        mood = _sentiment_value(sentiment_classification)

        # Resolve the direction-specific tests once, then apply shared bonuses.
        if direction == "long":
            technical_strong, technical_mild = technical > 70, technical > 60
            momentum_strong, momentum_mild = momentum > 70, momentum > 60
            favoured_mood = "bullish"
            fundamentals_agree = fundamental > 60
        else:
            technical_strong, technical_mild = technical < 30, technical < 40
            momentum_strong, momentum_mild = momentum < 30, momentum < 40
            favoured_mood = "bearish"
            fundamentals_agree = fundamental < 40

        score = 50.0
        if technical_strong:
            score += 25.0
        elif technical_mild:
            score += 15.0
        if momentum_strong:
            score += 20.0
        elif momentum_mild:
            score += 15.0
        if mood == favoured_mood:
            score += 15.0
        elif mood == "neutral":
            score += 5.0
        if fundamentals_agree:
            score += 10.0

        # Checked in order; only the first matching tag penalises a conflict.
        penalty_by_tag = (
            ("sentiment-technical", 20.0),
            ("momentum-technical", 15.0),
            ("sentiment-momentum", 20.0),
            ("fundamental-technical", 10.0),
        )
        for conflict in conflicts or []:
            for tag, penalty in penalty_by_tag:
                if tag in conflict:
                    score -= penalty
                    break

        return _clamp(score, 0.0, 100.0)
class TargetGenerator:
    """Derive price targets from support/resistance levels."""

    def generate_targets(
        self,
        direction: str,
        entry_price: float,
        stop_loss: float,
        sr_levels: list[SRLevel],
        atr_value: float,
    ) -> list[dict[str, Any]]:
        """Pick up to five targets beyond the entry and classify them.

        For ``"long"`` only "resistance" levels above the entry qualify; for
        any other direction only "support" levels below it. A level is
        dropped when it is closer than 1 ATR, or further than the cap: 10 ATR
        when ATR exceeds 5% of the entry price, 3 ATR when ATR is below 2% of
        it, and no cap in between.

        Remaining levels are ranked by a quality score (35% reward/risk, 35%
        level strength, 30% proximity to entry). The best five are kept,
        ordered nearest first and labelled "Conservative", "Moderate" or
        "Aggressive" by position.

        Args:
            direction: ``"long"``; any other value is handled as a short.
            entry_price: Planned entry price.
            stop_loss: Stop price; its distance from the entry is the risk.
            sr_levels: Objects exposing ``id``, ``type``, ``price_level`` and
                ``strength``.
            atr_value: ATR in price units.

        Returns:
            Target dicts with keys "price", "distance_from_entry",
            "distance_atr_multiple", "rr_ratio", "classification",
            "sr_level_id" and "sr_strength". Empty when ATR or risk is not
            positive, or when no level qualifies.
        """
        if atr_value <= 0:
            return []

        risk = abs(entry_price - stop_loss)
        if risk <= 0:
            return []

        entry_is_positive = entry_price > 0
        atr_pct = atr_value / entry_price if entry_is_positive else 0.0
        max_atr_multiple: float | None = None
        if atr_pct > 0.05:
            max_atr_multiple = 10.0
        elif atr_pct < 0.02:
            max_atr_multiple = 3.0

        is_long = direction == "long"
        candidates: list[dict[str, Any]] = []
        for level in sr_levels:
            if is_long:
                is_candidate = level.type == "resistance" and level.price_level > entry_price
            else:
                is_candidate = level.type == "support" and level.price_level < entry_price
            if not is_candidate:
                continue

            distance = abs(level.price_level - entry_price)
            distance_atr_multiple = distance / atr_value
            if distance_atr_multiple < 1.0:
                continue
            if max_atr_multiple is not None and distance_atr_multiple > max_atr_multiple:
                continue

            # Reward equals the distance to the level; risk is the stop distance.
            rr_ratio = distance / risk
            norm_rr = min(rr_ratio / 10.0, 1.0)
            norm_strength = max(0.0, min(100.0, float(level.strength))) / 100.0
            # Relative distance needs a positive entry price. Without one,
            # proximity contributes nothing instead of dividing by zero or
            # producing a value above 1.
            if entry_is_positive:
                norm_proximity = 1.0 - min(distance / entry_price, 1.0)
            else:
                norm_proximity = 0.0
            quality = 0.35 * norm_rr + 0.35 * norm_strength + 0.30 * norm_proximity

            candidates.append(
                {
                    "price": float(level.price_level),
                    "distance_from_entry": float(distance),
                    "distance_atr_multiple": float(distance_atr_multiple),
                    "rr_ratio": float(rr_ratio),
                    "classification": "Moderate",
                    "sr_level_id": int(level.id),
                    "sr_strength": float(level.strength),
                    "quality": float(quality),
                }
            )

        # Keep the five best by quality, then present them nearest first.
        candidates.sort(key=lambda row: row["quality"], reverse=True)
        selected = candidates[:5]
        selected.sort(key=lambda row: row["distance_from_entry"])
        if not selected:
            return []

        n = len(selected)
        for idx, target in enumerate(selected):
            if n <= 2:
                target["classification"] = "Conservative" if idx == 0 else "Aggressive"
            elif idx <= 1:
                target["classification"] = "Conservative"
            elif idx >= n - 2:
                target["classification"] = "Aggressive"
            else:
                target["classification"] = "Moderate"
            # The ranking score is internal and not part of the returned data.
            target.pop("quality", None)

        return selected
class ProbabilityEstimator:
    """Estimate the likelihood that a generated target is reached."""

    def estimate_probability(
        self,
        target: dict[str, Any],
        dimension_scores: dict[str, float],
        sentiment_classification: str | None,
        direction: str,
        config: dict[str, float],
    ) -> float:
        """Return a probability (in percent) for one target.

        A base value chosen by the target's classification is adjusted for
        level strength, signal alignment, an ATR-distance bonus and a distance
        penalty. The sum is clamped to 10-90 and then forced into the band of
        its classification: at least 61 for "Conservative", 40-70 for
        "Moderate", at most 49 for "Aggressive". Any other classification
        starts from the "Moderate" base and gets no band.

        Args:
            target: Dict providing "classification", "sr_strength" and
                "distance_atr_multiple"; missing keys default to "Moderate",
                50.0 and 1.0.
            dimension_scores: Scores keyed by "technical" and "momentum"; a
                missing key is treated as 50.0.
            sentiment_classification: Sentiment label, or None when unknown.
            direction: ``"long"``; any other value is handled as a short.
            config: Settings supplying the alignment weight, strength weight
                and distance penalty factor.

        Returns:
            The probability rounded to two decimals.
        """
        classification = str(target.get("classification", "Moderate"))
        strength = float(target.get("sr_strength", 50.0))
        atr_multiple = float(target.get("distance_atr_multiple", 1.0))

        base_by_class = {"Conservative": 70.0, "Aggressive": 40.0}
        base_prob = base_by_class.get(classification, 55.0)

        # First threshold the strength reaches wins; weaker levels are penalised.
        strength_steps = ((80, 15.0), (60, 10.0), (40, 5.0))
        strength_adj = next(
            (bonus for floor, bonus in strength_steps if strength >= floor),
            -10.0,
        )

        technical = float(dimension_scores.get("technical", 50.0))
        momentum = float(dimension_scores.get("momentum", 50.0))
        mood = _sentiment_value(sentiment_classification)

        if direction == "long":
            supportive = technical > 60 and (mood == "bullish" or momentum > 60)
            opposing = technical < 40 or (mood == "bearish" and momentum < 40)
        else:
            supportive = technical < 40 and (mood == "bearish" or momentum < 40)
            opposing = technical > 60 or (mood == "bullish" and momentum > 60)

        if supportive:
            alignment_adj = 15.0
        elif opposing:
            alignment_adj = -15.0
        else:
            alignment_adj = 0.0

        # NOTE(review): targets beyond 5 ATR and targets within 2 ATR receive
        # the same +5.0 bonus; confirm the far case was not meant to differ.
        volatility_adj = 5.0 if (atr_multiple > 5 or atr_multiple < 2) else 0.0

        signal_weight = float(config.get("recommendation_signal_alignment_weight", 0.15))
        sr_weight = float(config.get("recommendation_sr_strength_weight", 0.20))
        distance_penalty = float(config.get("recommendation_distance_penalty_factor", 0.10))

        # Weights are expressed relative to 0.15 / 0.20, so those values leave
        # the raw adjustments unchanged.
        scaled_alignment_adj = alignment_adj * (signal_weight / 0.15)
        scaled_strength_adj = strength_adj * (sr_weight / 0.20)
        distance_adj = -distance_penalty * max(atr_multiple - 1.0, 0.0) * 2.0

        raw_total = base_prob + scaled_strength_adj + scaled_alignment_adj + volatility_adj + distance_adj
        probability = _clamp(raw_total, 10.0, 90.0)

        # Force each classification into its own probability band.
        if classification == "Conservative":
            probability = max(probability, 61.0)
        elif classification == "Moderate":
            probability = _clamp(probability, 40.0, 70.0)
        elif classification == "Aggressive":
            probability = min(probability, 49.0)

        return round(probability, 2)
# Shared module-level instances of the analysis helpers. enhance_trade_setup()
# calls these rather than constructing new objects on every invocation.
signal_conflict_detector = SignalConflictDetector()
direction_analyzer = DirectionAnalyzer()
target_generator = TargetGenerator()
probability_estimator = ProbabilityEstimator()
async def get_recommendation_config(db: AsyncSession) -> dict[str, float]:
    """Load recommendation settings, layering stored values over the defaults.

    Starts from a copy of DEFAULT_RECOMMENDATION_CONFIG and overwrites or adds
    every SystemSetting row whose key begins with "recommendation_" and whose
    value converts to float. Rows with unconvertible values are logged at
    warning level and skipped, leaving any default for that key in place.

    Args:
        db: Async session used to read the SystemSetting rows.

    Returns:
        A new dict of setting name to float value.
    """
    # In a raw LIKE pattern "_" matches any single character, so
    # "recommendation_%" would also select keys such as "recommendations...".
    # startswith(..., autoescape=True) escapes it to match the literal prefix.
    result = await db.execute(
        select(SystemSetting).where(
            SystemSetting.key.startswith("recommendation_", autoescape=True)
        )
    )
    rows = result.scalars().all()

    config: dict[str, float] = dict(DEFAULT_RECOMMENDATION_CONFIG)
    for setting in rows:
        try:
            config[setting.key] = float(setting.value)
        except (TypeError, ValueError):
            logger.warning("Invalid recommendation setting value for %s: %s", setting.key, setting.value)
    return config
def _risk_level_from_conflicts(conflicts: list[str]) -> str:
if not conflicts:
return "Low"
severe = [c for c in conflicts if "sentiment-technical" in c or "sentiment-momentum" in c]
if len(severe) >= 2 or len(conflicts) >= 3:
return "High"
return "Medium"
def _choose_recommended_action(
long_confidence: float,
short_confidence: float,
config: dict[str, float],
) -> str:
high = float(config.get("recommendation_high_confidence_threshold", 70.0))
moderate = float(config.get("recommendation_moderate_confidence_threshold", 50.0))
diff = float(config.get("recommendation_confidence_diff_threshold", 20.0))
if long_confidence >= high and (long_confidence - short_confidence) >= diff:
return "LONG_HIGH"
if short_confidence >= high and (short_confidence - long_confidence) >= diff:
return "SHORT_HIGH"
if long_confidence >= moderate and (long_confidence - short_confidence) >= diff:
return "LONG_MODERATE"
if short_confidence >= moderate and (short_confidence - long_confidence) >= diff:
return "SHORT_MODERATE"
return "NEUTRAL"
def _build_reasoning(
    direction: str,
    confidence: float,
    conflicts: list[str],
    dimension_scores: dict[str, float],
    sentiment_classification: str | None,
    action: str,
) -> str:
    """Compose the human-readable explanation stored with a trade setup.

    The text states the direction and confidence, whether the signals are
    aligned or mixed (per check_signal_alignment), the technical and momentum
    scores, and the sentiment ("unknown" when missing or empty). It ends with
    either the number of detected conflicts or a no-conflict note, followed by
    the chosen action.
    """
    technical = float(dimension_scores.get("technical", 50.0))
    momentum = float(dimension_scores.get("momentum", 50.0))
    sentiment = _sentiment_value(sentiment_classification) or "unknown"

    aligned, alignment_text = check_signal_alignment(
        direction,
        dimension_scores,
        sentiment_classification,
    )
    if aligned:
        alignment_summary = "aligned"
    else:
        alignment_summary = "mixed"

    direction_text = direction.upper()
    base = (
        f"{direction_text} confidence {confidence:.1f}% with {alignment_summary} signals "
        f"(technical={technical:.0f}, momentum={momentum:.0f}, sentiment={sentiment})."
    )

    if not conflicts:
        return f"{base} {alignment_text} No major conflicts detected. Action={action}."
    return (
        f"{base} {alignment_text} Detected {len(conflicts)} conflict(s), "
        f"so recommendation is risk-adjusted. Action={action}."
    )
async def enhance_trade_setup(
    db: AsyncSession,
    ticker: Ticker,
    setup: TradeSetup,
    dimension_scores: dict[str, float],
    sr_levels: list[SRLevel],
    sentiment_classification: str | None,
    atr_value: float,
) -> TradeSetup:
    """Attach recommendation data to *setup* and return it.

    Loads the recommendation config, detects signal conflicts, scores both
    directions, generates targets for the setup's own direction with a
    probability each, and writes the results onto *setup*:
    ``confidence_score``, ``targets_json``, ``conflict_flags_json``,
    ``recommended_action``, ``reasoning`` and ``risk_level``. When fewer than
    three targets are found an extra "target-availability" conflict is
    recorded.

    The session is only used to read settings; this function does not itself
    flush or commit. *ticker* is accepted but not used here.

    Args:
        db: Async session for reading recommendation settings.
        ticker: Ticker the setup belongs to (unused in this function).
        setup: Trade setup to enrich; mutated in place.
        dimension_scores: Scores keyed by dimension name.
        sr_levels: Candidate support/resistance levels for targets.
        sentiment_classification: Sentiment label, or None when unknown.
        atr_value: ATR in price units.

    Returns:
        The same *setup* object that was passed in.
    """
    config = await get_recommendation_config(db)

    conflicts = signal_conflict_detector.detect_conflicts(
        dimension_scores=dimension_scores,
        sentiment_classification=sentiment_classification,
        config=config,
    )

    # Both sides are scored because the recommended action compares them.
    confidence_by_side = {
        side: direction_analyzer.calculate_confidence(
            direction=side,
            dimension_scores=dimension_scores,
            sentiment_classification=sentiment_classification,
            conflicts=conflicts,
        )
        for side in ("long", "short")
    }

    direction = setup.direction.lower()
    # Any direction other than "long" uses the short-side confidence.
    side_key = "long" if direction == "long" else "short"
    confidence = confidence_by_side[side_key]

    targets = target_generator.generate_targets(
        direction=direction,
        entry_price=setup.entry_price,
        stop_loss=setup.stop_loss,
        sr_levels=sr_levels,
        atr_value=atr_value,
    )
    for candidate in targets:
        candidate["probability"] = probability_estimator.estimate_probability(
            target=candidate,
            dimension_scores=dimension_scores,
            sentiment_classification=sentiment_classification,
            direction=direction,
            config=config,
        )

    # Added after scoring, so it affects risk level and reasoning but not
    # the confidence values computed above.
    if len(targets) < 3:
        conflicts = conflicts + ["target-availability: Fewer than 3 valid S/R targets available"]

    action = _choose_recommended_action(
        confidence_by_side["long"],
        confidence_by_side["short"],
        config,
    )
    risk_level = _risk_level_from_conflicts(conflicts)
    reasoning = _build_reasoning(
        direction=direction,
        confidence=confidence,
        conflicts=conflicts,
        dimension_scores=dimension_scores,
        sentiment_classification=sentiment_classification,
        action=action,
    )

    setup.confidence_score = round(confidence, 2)
    setup.targets_json = json.dumps(targets)
    setup.conflict_flags_json = json.dumps(conflicts)
    setup.recommended_action = action
    setup.reasoning = reasoning
    setup.risk_level = risk_level
    return setup