Files
signal-platform/app/providers/openai_compatible_sentiment.py
T
dennisthiessen 126c3b3c17
Deploy / lint (push) Successful in 5s
Deploy / test (push) Successful in 32s
Deploy / deploy (push) Successful in 22s
Add DeepSeek/xAI/OpenAI-compatible sentiment providers; custom dark dropdown
Providers (admin-switchable, no redeploy):
- DeepSeek and any OpenAI-compatible endpoint (OpenRouter, Together,
  Groq, local Ollama) via a generic Chat Completions adapter + base_url
- xAI Grok with Live Search (search_parameters web+X, citations) —
  grounded tier alongside OpenAI and Gemini
- DeepSeek / generic compatible endpoints are ungrounded (no web
  search); UI shows an amber warning and labels each provider's grounding
- Optional env fallbacks DEEPSEEK_API_KEY / XAI_API_KEY

UI: replace native <select> (unstyleable white popup on Windows) with a
custom dark Dropdown component everywhere — sentiment provider, scanner
filters, market sort, indicators, admin universe, user role.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-13 12:42:04 +02:00

172 lines
6.7 KiB
Python

"""Sentiment provider for any OpenAI-compatible Chat Completions endpoint.
Covers DeepSeek, OpenRouter, Together, Groq, Mistral, local Ollama, etc. — any
service exposing the OpenAI Chat Completions API at a custom base_url.
NOTE: Unlike the OpenAI Responses provider and Gemini, this path has NO web
search grounding. Sentiment reflects the model's training knowledge, not live
news. Cheap, but not real-time.
"""
from __future__ import annotations
import json
import logging
import os
from datetime import datetime, timezone
from pathlib import Path
import httpx
from openai import AsyncOpenAI
from app.exceptions import ProviderError, RateLimitError
from app.providers.protocol import SentimentData
logger = logging.getLogger(__name__)
_CA_BUNDLE = os.environ.get("SSL_CERT_FILE", "")
_SENTIMENT_PROMPT = """\
Assess the CURRENT market sentiment for the stock ticker {ticker} based on your \
knowledge of the company, its sector, and recent developments you are aware of.
Respond ONLY with a JSON object in this exact format (no markdown, no extra text):
{{"classification": "<bullish|bearish|neutral>", "confidence": <0-100>, "reasoning": "<brief explanation>"}}
Rules:
- classification must be exactly one of: bullish, bearish, neutral
- confidence must be an integer from 0 to 100
- reasoning should be a brief one-sentence explanation
"""
_SENTIMENT_PROMPT_SEARCH = """\
Search the web and X for the LATEST news, analyst opinions, and market developments \
about the stock ticker {ticker} from the past 24-48 hours.
Based on your search findings, analyze the CURRENT market sentiment.
Respond ONLY with a JSON object in this exact format (no markdown, no extra text):
{{"classification": "<bullish|bearish|neutral>", "confidence": <0-100>, "reasoning": "<brief explanation citing recent news>"}}
Rules:
- classification must be exactly one of: bullish, bearish, neutral
- confidence must be an integer from 0 to 100
- reasoning should cite specific recent news or events you found
"""
VALID_CLASSIFICATIONS = {"bullish", "bearish", "neutral"}
def _clean_json_text(raw: str) -> str:
clean = raw.strip()
if clean.startswith("```"):
clean = clean.split("\n", 1)[1] if "\n" in clean else clean[3:]
if clean.endswith("```"):
clean = clean[:-3]
return clean.strip()
class OpenAICompatibleSentimentProvider:
"""Sentiment via the OpenAI Chat Completions API at a configurable base_url."""
def __init__(
self,
api_key: str,
model: str,
base_url: str,
source: str = "openai_compatible",
live_search: bool = False,
extra_body: dict | None = None,
) -> None:
if not api_key:
raise ProviderError("API key is required")
if not base_url:
raise ProviderError("base_url is required for an OpenAI-compatible provider")
if not model:
raise ProviderError("model is required")
http_kwargs: dict = {}
if _CA_BUNDLE and Path(_CA_BUNDLE).exists():
http_kwargs["verify"] = _CA_BUNDLE
http_client = httpx.AsyncClient(**http_kwargs)
self._client = AsyncOpenAI(api_key=api_key, base_url=base_url, http_client=http_client)
self._model = model
self._source = source
self._live_search = live_search
self._extra_body = extra_body
@staticmethod
def _extract_citations(response: object) -> list[dict[str, str]]:
"""Best-effort extraction of xAI Live Search citations (list of URLs)."""
raw = getattr(response, "citations", None)
if not raw:
extra = getattr(response, "model_extra", None) or {}
raw = extra.get("citations") if isinstance(extra, dict) else None
citations: list[dict[str, str]] = []
for item in raw or []:
if isinstance(item, str):
citations.append({"url": item, "title": ""})
elif isinstance(item, dict) and item.get("url"):
citations.append({"url": str(item["url"]), "title": str(item.get("title", ""))})
return citations
async def fetch_sentiment(self, ticker: str) -> SentimentData:
prompt = _SENTIMENT_PROMPT_SEARCH if self._live_search else _SENTIMENT_PROMPT
kwargs: dict = {}
if self._extra_body:
kwargs["extra_body"] = self._extra_body
try:
response = await self._client.chat.completions.create(
model=self._model,
messages=[
{
"role": "system",
"content": "You are a financial sentiment analyst. Always respond with valid JSON only, no markdown fences.",
},
{"role": "user", "content": prompt.format(ticker=ticker)},
],
temperature=0.3,
**kwargs,
)
raw_text = (response.choices[0].message.content or "").strip()
if not raw_text:
raise ProviderError(f"Empty response from {self._source} for {ticker}")
parsed = json.loads(_clean_json_text(raw_text))
classification = str(parsed.get("classification", "")).lower()
if classification not in VALID_CLASSIFICATIONS:
raise ProviderError(
f"Invalid classification '{classification}' from {self._source} for {ticker}"
)
confidence = max(0, min(100, int(parsed.get("confidence", 50))))
reasoning = str(parsed.get("reasoning", ""))
if reasoning:
logger.info(
"%s sentiment for %s: %s (confidence=%d) — %s",
self._source, ticker, classification, confidence, reasoning,
)
return SentimentData(
ticker=ticker,
classification=classification,
confidence=confidence,
source=self._source,
timestamp=datetime.now(timezone.utc),
reasoning=reasoning,
citations=self._extract_citations(response) if self._live_search else [],
)
except json.JSONDecodeError as exc:
logger.error("Failed to parse %s JSON for %s: %s", self._source, ticker, exc)
raise ProviderError(f"Invalid JSON from {self._source} for {ticker}") from exc
except ProviderError:
raise
except Exception as exc:
msg = str(exc).lower()
if "429" in msg or "rate" in msg or "quota" in msg or "insufficient" in msg:
raise RateLimitError(f"{self._source} rate limit hit for {ticker}") from exc
logger.error("%s provider error for %s: %s", self._source, ticker, exc)
raise ProviderError(f"{self._source} provider error for {ticker}: {exc}") from exc