355 lines
12 KiB
Python
355 lines
12 KiB
Python
"""R:R Scanner service.

Scans tracked tickers for asymmetric risk-reward trade setups.
Long: target = best-scoring SR level above entry, stop = entry - ATR × multiplier.
Short: target = best-scoring SR level below entry, stop = entry + ATR × multiplier.
Filters by configurable R:R threshold (default 1.5).
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
from datetime import datetime, timezone
|
||
|
||
from sqlalchemy import select
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from app.exceptions import NotFoundError
|
||
from app.models.score import CompositeScore, DimensionScore
|
||
from app.models.sentiment import SentimentScore
|
||
from app.models.sr_level import SRLevel
|
||
from app.models.ticker import Ticker
|
||
from app.models.trade_setup import TradeSetup
|
||
from app.services.indicator_service import _extract_ohlcv, compute_atr
|
||
from app.services.price_service import query_ohlcv
|
||
from app.services.recommendation_service import enhance_trade_setup
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
async def _get_ticker(db: AsyncSession, symbol: str) -> Ticker:
    """Look up the ticker row for *symbol*.

    The symbol is stripped of surrounding whitespace and upper-cased before
    it is compared against ``Ticker.symbol``.

    Args:
        db: Session used to run the lookup.
        symbol: Ticker symbol as supplied by the caller.

    Returns:
        The matching ``Ticker`` row.

    Raises:
        NotFoundError: If no row matches the normalised symbol.
    """
    wanted = symbol.strip().upper()
    query = select(Ticker).where(Ticker.symbol == wanted)
    found = (await db.execute(query)).scalar_one_or_none()
    if found is not None:
        return found
    raise NotFoundError(f"Ticker not found: {wanted}")
|
||
|
||
|
||
def _compute_quality_score(
|
||
rr: float,
|
||
strength: int,
|
||
distance: float,
|
||
entry_price: float,
|
||
*,
|
||
w_rr: float = 0.35,
|
||
w_strength: float = 0.35,
|
||
w_proximity: float = 0.30,
|
||
rr_cap: float = 10.0,
|
||
) -> float:
|
||
"""Compute a quality score for a candidate S/R level."""
|
||
norm_rr = min(rr / rr_cap, 1.0)
|
||
norm_strength = strength / 100.0
|
||
norm_proximity = 1.0 - min(distance / entry_price, 1.0)
|
||
return w_rr * norm_rr + w_strength * norm_strength + w_proximity * norm_proximity
|
||
|
||
|
||
async def _get_dimension_scores(db: AsyncSession, ticker_id: int) -> dict[str, float]:
    """Return the stored dimension scores of a ticker.

    Args:
        db: Session used to run the query.
        ticker_id: Primary key of the ticker whose scores are wanted.

    Returns:
        A mapping from each row's ``dimension`` to its ``score`` as a float.
        The mapping is empty when the ticker has no ``DimensionScore`` rows.
    """
    query = select(DimensionScore).where(DimensionScore.ticker_id == ticker_id)
    fetched = await db.execute(query)
    scores: dict[str, float] = {}
    for entry in fetched.scalars().all():
        scores[entry.dimension] = float(entry.score)
    return scores
|
||
|
||
|
||
async def _get_latest_sentiment(db: AsyncSession, ticker_id: int) -> str | None:
    """Return the classification of the ticker's newest sentiment row.

    Args:
        db: Session used to run the query.
        ticker_id: Primary key of the ticker.

    Returns:
        The ``classification`` of the ``SentimentScore`` row with the greatest
        ``timestamp``, or ``None`` when the ticker has no sentiment rows.
    """
    newest_first = (
        select(SentimentScore)
        .where(SentimentScore.ticker_id == ticker_id)
        .order_by(SentimentScore.timestamp.desc())
        .limit(1)
    )
    latest = (await db.execute(newest_first)).scalar_one_or_none()
    if not latest:
        return None
    return latest.classification
|
||
|
||
|
||
def _select_best_level(
    levels: list[SRLevel],
    entry_price: float,
    risk: float,
    rr_threshold: float,
    *,
    is_long: bool,
) -> tuple[float, float] | None:
    """Pick the highest-quality S/R level that meets the R:R threshold.

    Args:
        levels: Candidate S/R levels on the target side of the entry price.
        entry_price: Price at which the trade would be entered.
        risk: Positive distance between the entry price and the stop.
        rr_threshold: Minimum reward-to-risk ratio a level must offer.
        is_long: ``True`` when targets lie above entry, ``False`` when below.

    Returns:
        ``(target_price, rr)`` for the best-scoring level, or ``None`` when no
        level yields a positive reward, meets the threshold and scores above
        zero.
    """
    best: tuple[float, float] | None = None
    best_quality = 0.0
    for lv in levels:
        reward = lv.price_level - entry_price if is_long else entry_price - lv.price_level
        if reward <= 0:
            continue
        rr = reward / risk
        if rr < rr_threshold:
            continue
        # The reward is also the price distance between entry and the level.
        quality = _compute_quality_score(rr, lv.strength, reward, entry_price)
        if quality > best_quality:
            best_quality = quality
            best = (lv.price_level, rr)
    return best


async def scan_ticker(
    db: AsyncSession,
    symbol: str,
    rr_threshold: float = 1.5,
    atr_multiplier: float = 1.5,
) -> list[TradeSetup]:
    """Scan a single ticker for trade setups meeting the R:R threshold.

    The entry price is the latest close. The stop sits ``ATR * atr_multiplier``
    away from entry (below for longs, above for shorts). For each direction
    the S/R level with the best quality score among those meeting
    ``rr_threshold`` becomes the target. Each resulting setup is passed
    through ``enhance_trade_setup``; if enhancement raises, the error is
    logged and the un-enhanced setup is kept. All setups are then added to
    the session, committed and refreshed.

    Args:
        db: Session used for all queries and for persisting the setups.
        symbol: Ticker symbol to scan.
        rr_threshold: Minimum reward-to-risk ratio a setup must offer.
        atr_multiplier: Multiple of the ATR used as the stop distance.

    Returns:
        The persisted setups (at most one long and one short). Empty when
        there are fewer than 15 OHLCV bars, the ATR cannot be computed or is
        not positive, or the ticker has no S/R levels.

    Raises:
        NotFoundError: If the symbol does not match a tracked ticker.
    """
    ticker = await _get_ticker(db, symbol)

    records = await query_ohlcv(db, symbol)
    # Count defensively: a falsy result (None or empty) is zero bars, so the
    # log call below never runs len() on None.
    bar_count = len(records) if records else 0
    if bar_count < 15:
        logger.info(
            "Skipping %s: insufficient OHLCV data (%d bars, need 15+)",
            symbol, bar_count,
        )
        return []

    _, highs, lows, closes, _ = _extract_ohlcv(records)
    entry_price = closes[-1]

    try:
        atr_result = compute_atr(highs, lows, closes)
        atr_value = atr_result["atr"]
    except Exception:
        # Best effort: a ticker whose ATR cannot be computed is skipped.
        logger.info("Skipping %s: cannot compute ATR", symbol)
        return []

    if atr_value <= 0:
        logger.info("Skipping %s: ATR is zero or negative", symbol)
        return []

    sr_result = await db.execute(
        select(SRLevel).where(SRLevel.ticker_id == ticker.id)
    )
    sr_levels = list(sr_result.scalars().all())

    if not sr_levels:
        logger.info("Skipping %s: no SR levels available", symbol)
        return []

    # Both lists are ordered nearest-to-entry first.
    levels_above = sorted(
        [lv for lv in sr_levels if lv.price_level > entry_price],
        key=lambda lv: lv.price_level,
    )
    levels_below = sorted(
        [lv for lv in sr_levels if lv.price_level < entry_price],
        key=lambda lv: lv.price_level,
        reverse=True,
    )

    comp_result = await db.execute(
        select(CompositeScore).where(CompositeScore.ticker_id == ticker.id)
    )
    comp = comp_result.scalar_one_or_none()
    composite_score = comp.score if comp else 0.0

    dimension_scores = await _get_dimension_scores(db, ticker.id)
    sentiment_classification = await _get_latest_sentiment(db, ticker.id)

    now = datetime.now(timezone.utc)
    setups: list[TradeSetup] = []

    stop_offset = atr_value * atr_multiplier
    candidates = (
        ("long", levels_above, entry_price - stop_offset),
        ("short", levels_below, entry_price + stop_offset),
    )
    for direction, levels, stop in candidates:
        is_long = direction == "long"
        # Risk is derived from the stop so it matches what gets stored.
        risk = entry_price - stop if is_long else stop - entry_price
        if not (levels and risk > 0):
            continue
        best = _select_best_level(
            levels, entry_price, risk, rr_threshold, is_long=is_long
        )
        if best is None:
            continue
        target, rr = best
        setups.append(TradeSetup(
            ticker_id=ticker.id,
            direction=direction,
            entry_price=round(entry_price, 4),
            stop_loss=round(stop, 4),
            target=round(target, 4),
            rr_ratio=round(rr, 4),
            composite_score=round(composite_score, 4),
            detected_at=now,
        ))

    enhanced_setups: list[TradeSetup] = []
    for setup in setups:
        try:
            enhanced = await enhance_trade_setup(
                db=db,
                ticker=ticker,
                setup=setup,
                dimension_scores=dimension_scores,
                sr_levels=sr_levels,
                sentiment_classification=sentiment_classification,
                atr_value=atr_value,
            )
            enhanced_setups.append(enhanced)
        except Exception:
            # Enhancement is optional: keep the plain setup if it fails.
            logger.exception("Error enhancing setup for %s (%s)", ticker.symbol, setup.direction)
            enhanced_setups.append(setup)

    for setup in enhanced_setups:
        db.add(setup)

    await db.commit()

    for s in enhanced_setups:
        await db.refresh(s)

    return enhanced_setups
|
||
|
||
|
||
async def scan_all_tickers(
    db: AsyncSession,
    rr_threshold: float = 1.5,
    atr_multiplier: float = 1.5,
) -> list[TradeSetup]:
    """Scan all tracked tickers for trade setups.

    Tickers are scanned one at a time in symbol order. A failure while
    scanning one ticker is logged and does not stop the remaining scans.

    Args:
        db: Session shared by every per-ticker scan.
        rr_threshold: Minimum reward-to-risk ratio passed to ``scan_ticker``.
        atr_multiplier: ATR multiple passed to ``scan_ticker``.

    Returns:
        The setups from every ticker that scanned successfully.
    """
    # Load plain symbol strings rather than ORM rows: scan_ticker() commits
    # this session for every ticker, and a commit may expire loaded ORM
    # instances (depending on the session's expire_on_commit setting). Reading
    # ``ticker.symbol`` afterwards would then need an implicit reload.
    result = await db.execute(select(Ticker.symbol).order_by(Ticker.symbol))
    symbols = list(result.scalars().all())

    all_setups: list[TradeSetup] = []
    for symbol in symbols:
        try:
            setups = await scan_ticker(db, symbol, rr_threshold, atr_multiplier)
        except Exception:
            logger.exception("Error scanning ticker %s", symbol)
        else:
            all_setups.extend(setups)

    return all_setups
|
||
|
||
|
||
async def get_trade_setups(
    db: AsyncSession,
    direction: str | None = None,
    min_confidence: float | None = None,
    recommended_action: str | None = None,
    symbol: str | None = None,
) -> list[dict]:
    """Get latest stored trade setups, optionally filtered.

    Only the newest setup per (symbol, direction) pair is kept. Results are
    ordered by confidence score, then R:R ratio, then composite score, all
    descending; a missing confidence score ranks as -1.0.

    Args:
        db: Session used to run the query.
        direction: Keep only this direction (compared lower-cased).
        min_confidence: Keep only setups with at least this confidence score.
        recommended_action: Keep only setups with this recommended action.
        symbol: Keep only this ticker (compared stripped and upper-cased).

    Returns:
        One dictionary per retained setup, as built by
        ``_trade_setup_to_dict``.
    """
    filters = []
    if direction is not None:
        filters.append(TradeSetup.direction == direction.lower())
    if symbol is not None:
        filters.append(Ticker.symbol == symbol.strip().upper())
    if min_confidence is not None:
        filters.append(TradeSetup.confidence_score >= min_confidence)
    if recommended_action is not None:
        filters.append(TradeSetup.recommended_action == recommended_action)

    query = select(TradeSetup, Ticker.symbol).join(
        Ticker, TradeSetup.ticker_id == Ticker.id
    )
    for condition in filters:
        query = query.where(condition)
    query = query.order_by(TradeSetup.detected_at.desc(), TradeSetup.id.desc())

    fetched = (await db.execute(query)).all()

    # Rows arrive newest first, so the first row seen for a key is the latest.
    newest: dict[tuple[str, str], tuple[TradeSetup, str]] = {}
    for setup, ticker_symbol in fetched:
        newest.setdefault((ticker_symbol, setup.direction), (setup, ticker_symbol))

    def _rank(pair):
        item = pair[0]
        confidence = -1.0 if item.confidence_score is None else item.confidence_score
        return (confidence, item.rr_ratio, item.composite_score)

    ranked = sorted(newest.values(), key=_rank, reverse=True)
    return [_trade_setup_to_dict(setup, ticker_symbol) for setup, ticker_symbol in ranked]
|
||
|
||
|
||
async def get_trade_setup_history(
    db: AsyncSession,
    symbol: str,
) -> list[dict]:
    """Get full recommendation history for a symbol (newest first).

    Args:
        db: Session used to run the query.
        symbol: Ticker symbol; stripped and upper-cased before matching.

    Returns:
        One dictionary per stored setup of the symbol, ordered by
        ``detected_at`` and then ``id``, both descending.
    """
    wanted = symbol.strip().upper()
    newest_first = (TradeSetup.detected_at.desc(), TradeSetup.id.desc())
    query = (
        select(TradeSetup, Ticker.symbol)
        .join(Ticker, TradeSetup.ticker_id == Ticker.id)
        .where(Ticker.symbol == wanted)
        .order_by(*newest_first)
    )
    fetched = (await db.execute(query)).all()
    return [
        _trade_setup_to_dict(setup, ticker_symbol)
        for setup, ticker_symbol in fetched
    ]
|
||
|
||
|
||
def _trade_setup_to_dict(setup: TradeSetup, symbol: str) -> dict:
|
||
targets: list[dict] = []
|
||
conflicts: list[str] = []
|
||
|
||
if setup.targets_json:
|
||
try:
|
||
parsed_targets = json.loads(setup.targets_json)
|
||
if isinstance(parsed_targets, list):
|
||
targets = parsed_targets
|
||
except (TypeError, ValueError):
|
||
targets = []
|
||
|
||
if setup.conflict_flags_json:
|
||
try:
|
||
parsed_conflicts = json.loads(setup.conflict_flags_json)
|
||
if isinstance(parsed_conflicts, list):
|
||
conflicts = [str(item) for item in parsed_conflicts]
|
||
except (TypeError, ValueError):
|
||
conflicts = []
|
||
|
||
return {
|
||
"id": setup.id,
|
||
"symbol": symbol,
|
||
"direction": setup.direction,
|
||
"entry_price": setup.entry_price,
|
||
"stop_loss": setup.stop_loss,
|
||
"target": setup.target,
|
||
"rr_ratio": setup.rr_ratio,
|
||
"composite_score": setup.composite_score,
|
||
"detected_at": setup.detected_at,
|
||
"confidence_score": setup.confidence_score,
|
||
"targets": targets,
|
||
"conflict_flags": conflicts,
|
||
"recommended_action": setup.recommended_action,
|
||
"reasoning": setup.reasoning,
|
||
"risk_level": setup.risk_level,
|
||
"actual_outcome": setup.actual_outcome,
|
||
}
|