Integrate unused indicators into technical scoring; fix indicator dropdown
Deploy / lint (push) Successful in 5s
Deploy / test (push) Successful in 28s
Deploy / deploy (push) Successful in 22s

- Technical dimension now uses all directional indicators:
  0.30*ADX + 0.20*EMA + 0.20*RSI + 0.15*EMA_Cross (bullish=80 /
  neutral=50 / bearish=20) + 0.10*Volume_Profile (POC proximity)
  + 0.05*Pivot_Points (structure confluence); weights re-normalize
  when data is insufficient, as before
- ATR stays out of scoring (volatility input for scanner stops,
  not a directional signal)
- IndicatorSelector uses the shared Select so the option list is
  dark instead of the native white popup
- Update technical scoring tests for the six-component breakdown

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
2026-06-12 17:17:07 +02:00
parent 9c6a0a72fa
commit d139dd0390
3 changed files with 96 additions and 27 deletions
+67 -13
View File
@@ -88,7 +88,8 @@ async def _save_weights(db: AsyncSession, weights: dict[str, float]) -> None:
async def _compute_technical_score(
db: AsyncSession, symbol: str
) -> tuple[float | None, dict | None]:
"""Compute technical dimension score from ADX, EMA, RSI.
"""Compute technical dimension score from ADX, EMA, RSI, EMA Cross,
Volume Profile and Pivot Points.
Returns (score, breakdown) where breakdown follows the ScoreBreakdown
TypedDict shape: {sub_scores, formula, unavailable}.
@@ -96,7 +97,10 @@ async def _compute_technical_score(
from app.services.indicator_service import (
compute_adx,
compute_ema,
compute_ema_cross,
compute_pivot_points,
compute_rsi,
compute_volume_profile,
_extract_ohlcv,
)
from app.services.price_service import query_ohlcv
@@ -105,27 +109,33 @@ async def _compute_technical_score(
if not records:
return None, None
_, highs, lows, closes, _ = _extract_ohlcv(records)
_, highs, lows, closes, volumes = _extract_ohlcv(records)
formula = (
"Weighted average: 0.30*ADX + 0.20*EMA + 0.20*RSI + 0.15*EMA_Cross "
"+ 0.10*Volume_Profile + 0.05*Pivot_Points, re-normalized if any "
"sub-score unavailable."
)
scores: list[tuple[float, float]] = [] # (weight, score)
sub_scores: list[dict] = []
unavailable: list[dict[str, str]] = []
# ADX (weight 0.4) — needs 28+ bars
# ADX (weight 0.30) — needs 28+ bars
try:
adx_result = compute_adx(highs, lows, closes)
scores.append((0.4, adx_result["score"]))
scores.append((0.30, adx_result["score"]))
sub_scores.append({
"name": "ADX",
"score": adx_result["score"],
"weight": 0.4,
"weight": 0.30,
"raw_value": adx_result["adx"],
"description": "ADX value (0-100). Higher = stronger trend.",
})
except Exception as exc:
unavailable.append({"name": "ADX", "reason": str(exc) or "Insufficient data for ADX"})
# EMA (weight 0.3) — needs period+1 bars
# EMA (weight 0.20) — needs period+1 bars
try:
ema_result = compute_ema(closes)
pct_diff = (
@@ -138,35 +148,79 @@ async def _compute_technical_score(
if ema_result["ema"] != 0
else 0.0
)
scores.append((0.3, ema_result["score"]))
scores.append((0.20, ema_result["score"]))
sub_scores.append({
"name": "EMA",
"score": ema_result["score"],
"weight": 0.3,
"weight": 0.20,
"raw_value": pct_diff,
"description": f"Price {pct_diff}% {'above' if pct_diff >= 0 else 'below'} EMA(20). Score: 50 + pct_diff * 10.",
})
except Exception as exc:
unavailable.append({"name": "EMA", "reason": str(exc) or "Insufficient data for EMA"})
# RSI (weight 0.3) — needs 15+ bars
# RSI (weight 0.20) — needs 15+ bars
try:
rsi_result = compute_rsi(closes)
scores.append((0.3, rsi_result["score"]))
scores.append((0.20, rsi_result["score"]))
sub_scores.append({
"name": "RSI",
"score": rsi_result["score"],
"weight": 0.3,
"weight": 0.20,
"raw_value": rsi_result["rsi"],
"description": "RSI(14) value. Score equals RSI.",
})
except Exception as exc:
unavailable.append({"name": "RSI", "reason": str(exc) or "Insufficient data for RSI"})
# EMA Cross (weight 0.15) — needs 51+ bars. Directional trend signal.
try:
cross_result = compute_ema_cross(closes)
cross_score = {"bullish": 80.0, "neutral": 50.0, "bearish": 20.0}[cross_result["signal"]]
scores.append((0.15, cross_score))
sub_scores.append({
"name": "EMA_Cross",
"score": cross_score,
"weight": 0.15,
"raw_value": cross_result["signal"],
"description": "EMA(20) vs EMA(50): bullish=80, neutral=50, bearish=20.",
})
except Exception as exc:
unavailable.append({"name": "EMA_Cross", "reason": str(exc) or "Insufficient data for EMA Cross"})
# Volume Profile (weight 0.10) — needs 20+ bars. Price near POC = trading
# at accepted value; far from POC = extended.
try:
vp_result = compute_volume_profile(highs, lows, closes, volumes)
scores.append((0.10, vp_result["score"]))
sub_scores.append({
"name": "Volume_Profile",
"score": vp_result["score"],
"weight": 0.10,
"raw_value": vp_result["poc"],
"description": "Proximity of price to the Point of Control (volume-accepted value).",
})
except Exception as exc:
unavailable.append({"name": "Volume_Profile", "reason": str(exc) or "Insufficient data for Volume Profile"})
# Pivot Points (weight 0.05) — needs 5+ bars. Price near swing structure.
try:
pivot_result = compute_pivot_points(highs, lows, closes)
scores.append((0.05, pivot_result["score"]))
sub_scores.append({
"name": "Pivot_Points",
"score": pivot_result["score"],
"weight": 0.05,
"raw_value": pivot_result["pivot_count"],
"description": "Share of swing pivots within 2% of price (structure confluence).",
})
except Exception as exc:
unavailable.append({"name": "Pivot_Points", "reason": str(exc) or "Insufficient data for Pivot Points"})
if not scores:
breakdown: dict = {
"sub_scores": [],
"formula": "Weighted average: 0.4*ADX + 0.3*EMA + 0.3*RSI, re-normalized if any sub-score unavailable.",
"formula": formula,
"unavailable": unavailable,
}
return None, breakdown
@@ -179,7 +233,7 @@ async def _compute_technical_score(
breakdown = {
"sub_scores": sub_scores,
"formula": "Weighted average: 0.4*ADX + 0.3*EMA + 0.3*RSI, re-normalized if any sub-score unavailable.",
"formula": formula,
"unavailable": unavailable,
}