Files
signal-platform/app/services/watchlist_service.py
T
dennisthiessen 6e06f51bb6
Deploy / lint (push) Successful in 6s
Deploy / test (push) Successful in 35s
Deploy / deploy (push) Successful in 24s
make watchlist fully manual; add price + day-change, two-block overview
Per design decision: the watchlist is now purely user-curated (no auto-seeding
of the top-10), so the auto_populate/dismissed machinery is removed and removals
are plain deletes. Each entry is enriched with latest close + day-over-day move.

Overview now shows two clear blocks: Top Setups (what to trade) and My Watchlist
(my names with current price and today's %). Market watchlist table drops the
now-meaningless auto/manual Type column in favour of Price and Day columns.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-14 14:25:04 +02:00

234 lines
6.8 KiB
Python

"""Watchlist service.
A purely user-curated watchlist: the user adds and removes tickers, capped at
WATCHLIST_MAX entries. Each entry is enriched on read with its composite score,
best trade setup, active S/R levels, and latest price + day-over-day move.
"""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.exceptions import DuplicateError, NotFoundError, ValidationError
from app.models.ohlcv import OHLCVRecord
from app.models.score import CompositeScore, DimensionScore
from app.models.sr_level import SRLevel
from app.models.ticker import Ticker
from app.models.trade_setup import TradeSetup
from app.models.watchlist import WatchlistEntry
logger = logging.getLogger(__name__)
WATCHLIST_MAX = 20
async def _get_ticker(db: AsyncSession, symbol: str) -> Ticker:
normalised = symbol.strip().upper()
result = await db.execute(select(Ticker).where(Ticker.symbol == normalised))
ticker = result.scalar_one_or_none()
if ticker is None:
raise NotFoundError(f"Ticker not found: {normalised}")
return ticker
async def add_manual_entry(
db: AsyncSession,
user_id: int,
symbol: str,
) -> WatchlistEntry:
"""Add a ticker to the user's watchlist.
Raises DuplicateError if already on the watchlist.
Raises ValidationError if the watchlist cap is reached.
"""
ticker = await _get_ticker(db, symbol)
existing = await db.execute(
select(WatchlistEntry).where(
WatchlistEntry.user_id == user_id,
WatchlistEntry.ticker_id == ticker.id,
)
)
if existing.scalar_one_or_none() is not None:
raise DuplicateError(f"Ticker already on watchlist: {ticker.symbol}")
count_result = await db.execute(
select(func.count()).select_from(WatchlistEntry).where(
WatchlistEntry.user_id == user_id,
)
)
total = count_result.scalar() or 0
if total >= WATCHLIST_MAX:
raise ValidationError(
f"Watchlist cap reached ({WATCHLIST_MAX}). "
"Remove an entry before adding a new one."
)
entry = WatchlistEntry(
user_id=user_id,
ticker_id=ticker.id,
entry_type="manual",
added_at=datetime.now(timezone.utc),
)
db.add(entry)
await db.commit()
await db.refresh(entry)
return entry
async def remove_entry(
db: AsyncSession,
user_id: int,
symbol: str,
) -> None:
"""Remove a ticker from the user's watchlist."""
ticker = await _get_ticker(db, symbol)
result = await db.execute(
select(WatchlistEntry).where(
WatchlistEntry.user_id == user_id,
WatchlistEntry.ticker_id == ticker.id,
)
)
entry = result.scalar_one_or_none()
if entry is None:
raise NotFoundError(f"Ticker not on watchlist: {ticker.symbol}")
await db.delete(entry)
await db.commit()
async def _enrich_entry(
db: AsyncSession,
entry: WatchlistEntry,
symbol: str,
) -> dict:
"""Build enriched watchlist entry dict with scores, R:R, SR levels, price."""
ticker_id = entry.ticker_id
# Composite score
comp_result = await db.execute(
select(CompositeScore).where(CompositeScore.ticker_id == ticker_id)
)
comp = comp_result.scalar_one_or_none()
# Dimension scores
dim_result = await db.execute(
select(DimensionScore).where(DimensionScore.ticker_id == ticker_id)
)
dims = [
{"dimension": ds.dimension, "score": ds.score}
for ds in dim_result.scalars().all()
]
# Best trade setup (highest R:R) for this ticker
setup_result = await db.execute(
select(TradeSetup)
.where(TradeSetup.ticker_id == ticker_id)
.order_by(TradeSetup.rr_ratio.desc())
.limit(1)
)
setup = setup_result.scalar_one_or_none()
# Active SR levels
sr_result = await db.execute(
select(SRLevel)
.where(SRLevel.ticker_id == ticker_id)
.order_by(SRLevel.strength.desc())
)
sr_levels = [
{
"price_level": lv.price_level,
"type": lv.type,
"strength": lv.strength,
}
for lv in sr_result.scalars().all()
]
# Latest two daily closes → current price + day-over-day move
price_result = await db.execute(
select(OHLCVRecord.close, OHLCVRecord.date)
.where(OHLCVRecord.ticker_id == ticker_id)
.order_by(OHLCVRecord.date.desc())
.limit(2)
)
bars = price_result.all()
last_close = bars[0].close if bars else None
prev_close = bars[1].close if len(bars) > 1 else None
change_pct = (
(last_close - prev_close) / prev_close * 100
if last_close is not None and prev_close
else None
)
price_date = bars[0].date if bars else None
return {
"symbol": symbol,
"entry_type": entry.entry_type,
"composite_score": comp.score if comp else None,
"dimensions": dims,
"rr_ratio": setup.rr_ratio if setup else None,
"rr_direction": setup.direction if setup else None,
"sr_levels": sr_levels,
"last_close": last_close,
"change_pct": change_pct,
"price_date": price_date,
"added_at": entry.added_at,
}
async def get_watchlist(
db: AsyncSession,
user_id: int,
sort_by: str = "composite",
) -> list[dict]:
"""Get the user's watchlist with enriched data.
sort_by: "composite", "rr", "change", or a dimension name
(e.g. "technical", "sr_quality", "sentiment", "fundamental", "momentum").
"""
stmt = (
select(WatchlistEntry, Ticker.symbol)
.join(Ticker, WatchlistEntry.ticker_id == Ticker.id)
.where(WatchlistEntry.user_id == user_id)
)
result = await db.execute(stmt)
rows = result.all()
entries: list[dict] = []
for entry, symbol in rows:
enriched = await _enrich_entry(db, entry, symbol)
entries.append(enriched)
# Sort
if sort_by == "composite":
entries.sort(
key=lambda e: e["composite_score"] if e["composite_score"] is not None else -1,
reverse=True,
)
elif sort_by == "rr":
entries.sort(
key=lambda e: e["rr_ratio"] if e["rr_ratio"] is not None else -1,
reverse=True,
)
elif sort_by == "change":
entries.sort(
key=lambda e: e["change_pct"] if e["change_pct"] is not None else float("-inf"),
reverse=True,
)
else:
# Sort by a specific dimension score
def _dim_sort_key(e: dict) -> float:
for d in e["dimensions"]:
if d["dimension"] == sort_by:
return d["score"]
return -1.0
entries.sort(key=_dim_sort_key, reverse=True)
return entries