506 lines
18 KiB
Python
506 lines
18 KiB
Python
"""Admin service: user management, system settings, data cleanup, job control."""
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from passlib.hash import bcrypt
|
|
from sqlalchemy import delete, func, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.exceptions import DuplicateError, NotFoundError, ValidationError
|
|
from app.models.fundamental import FundamentalData
|
|
from app.models.ohlcv import OHLCVRecord
|
|
from app.models.score import CompositeScore, DimensionScore
|
|
from app.models.sentiment import SentimentScore
|
|
from app.models.sr_level import SRLevel
|
|
from app.models.settings import SystemSetting
|
|
from app.models.ticker import Ticker
|
|
from app.models.trade_setup import TradeSetup
|
|
from app.models.user import User
|
|
|
|
# Default values for the recommendation engine's tunable parameters.
# Each key is the SystemSetting storage key (public name with the
# "recommendation_" prefix); get_recommendation_config() overlays any
# DB-stored overrides on top of these defaults.
RECOMMENDATION_CONFIG_DEFAULTS: dict[str, float] = {
    "recommendation_high_confidence_threshold": 70.0,
    "recommendation_moderate_confidence_threshold": 50.0,
    "recommendation_confidence_diff_threshold": 20.0,
    "recommendation_signal_alignment_weight": 0.15,
    "recommendation_sr_strength_weight": 0.20,
    "recommendation_distance_penalty_factor": 0.10,
    "recommendation_momentum_technical_divergence_threshold": 30.0,
    "recommendation_fundamental_technical_divergence_threshold": 40.0,
}

# Universe used when no "ticker_universe_default" setting exists (or the
# stored value is invalid).
DEFAULT_TICKER_UNIVERSE = "sp500"
# Values accepted by update_ticker_universe_default().
SUPPORTED_TICKER_UNIVERSES = {"sp500", "nasdaq100", "nasdaq_all"}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# User management
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def list_users(db: AsyncSession) -> list[User]:
    """Fetch every user account, sorted by primary key."""
    rows = await db.execute(select(User).order_by(User.id))
    return list(rows.scalars().all())
|
|
|
|
|
|
async def create_user(
    db: AsyncSession,
    username: str,
    password: str,
    role: str = "user",
    has_access: bool = False,
) -> User:
    """Create a new user account (admin action).

    The password is stored as a bcrypt hash, never in plain text.

    Raises:
        DuplicateError: if the username is already taken.
    """
    existing = await db.execute(select(User).where(User.username == username))
    if existing.scalar_one_or_none() is not None:
        raise DuplicateError(f"Username already exists: {username}")

    new_user = User(
        username=username,
        password_hash=bcrypt.hash(password),
        role=role,
        has_access=has_access,
    )
    db.add(new_user)
    await db.commit()
    # Refresh so DB-generated fields (e.g. the id) are populated.
    await db.refresh(new_user)
    return new_user
|
|
|
|
|
|
async def set_user_access(db: AsyncSession, user_id: int, has_access: bool) -> User:
    """Grant or revoke API access for a user.

    Raises:
        NotFoundError: if no user exists with the given id.
    """
    lookup = await db.execute(select(User).where(User.id == user_id))
    target = lookup.scalar_one_or_none()
    if target is None:
        raise NotFoundError(f"User not found: {user_id}")

    target.has_access = has_access
    await db.commit()
    await db.refresh(target)
    return target
|
|
|
|
|
|
async def reset_password(db: AsyncSession, user_id: int, new_password: str) -> User:
    """Reset a user's password (stored as a bcrypt hash).

    Raises:
        NotFoundError: if no user exists with the given id.
    """
    lookup = await db.execute(select(User).where(User.id == user_id))
    target = lookup.scalar_one_or_none()
    if target is None:
        raise NotFoundError(f"User not found: {user_id}")

    target.password_hash = bcrypt.hash(new_password)
    await db.commit()
    await db.refresh(target)
    return target
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Registration toggle
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def toggle_registration(db: AsyncSession, enabled: bool) -> SystemSetting:
    """Enable or disable user registration via SystemSetting.

    The flag is stored under the "registration_enabled" key as the lowercase
    string "true"/"false". Delegates to update_setting() so the
    select-then-upsert logic lives in exactly one place instead of being
    duplicated here.

    Returns:
        The persisted SystemSetting row.
    """
    return await update_setting(db, "registration_enabled", str(enabled).lower())
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# System settings CRUD
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def list_settings(db: AsyncSession) -> list[SystemSetting]:
    """Return every system setting, sorted alphabetically by key."""
    rows = await db.execute(select(SystemSetting).order_by(SystemSetting.key))
    return list(rows.scalars().all())
|
|
|
|
|
|
async def update_setting(db: AsyncSession, key: str, value: str) -> SystemSetting:
    """Upsert a system setting and return the persisted row."""
    lookup = await db.execute(select(SystemSetting).where(SystemSetting.key == key))
    row = lookup.scalar_one_or_none()

    if row is not None:
        # Key exists — overwrite the value in place.
        row.value = value
    else:
        # First write for this key — insert a fresh row.
        row = SystemSetting(key=key, value=value)
        db.add(row)

    await db.commit()
    await db.refresh(row)
    return row
|
|
|
|
|
|
def _recommendation_public_to_storage_key(key: str) -> str:
|
|
return f"recommendation_{key}"
|
|
|
|
|
|
async def get_recommendation_config(db: AsyncSession) -> dict[str, float]:
    """Return the effective recommendation config.

    Starts from RECOMMENDATION_CONFIG_DEFAULTS, overlays any DB-stored
    "recommendation_*" overrides (rows whose value does not parse as a float
    are ignored), and returns the result keyed by the public names, i.e.
    with the "recommendation_" prefix stripped.
    """
    result = await db.execute(
        select(SystemSetting).where(SystemSetting.key.like("recommendation_%"))
    )

    merged = dict(RECOMMENDATION_CONFIG_DEFAULTS)
    for setting in result.scalars().all():
        try:
            merged[setting.key] = float(setting.value)
        except (TypeError, ValueError):
            # Unparseable override — keep the default.
            continue

    # Only known parameters are exposed; unrelated "recommendation_*" rows
    # that may exist in the table are ignored.
    prefix_len = len("recommendation_")
    return {
        storage_key[prefix_len:]: merged[storage_key]
        for storage_key in RECOMMENDATION_CONFIG_DEFAULTS
    }
|
|
|
|
|
|
async def update_recommendation_config(
    db: AsyncSession,
    payload: dict[str, float],
) -> dict[str, float]:
    """Persist recommendation config overrides and return the effective config.

    The whole payload is validated before any write, so a bad entry cannot
    leave a partially-applied update. Previously, unknown keys were silently
    stored as dead "recommendation_*" settings and non-numeric values were
    stored and then silently ignored by get_recommendation_config().

    Raises:
        ValidationError: if payload contains a key that is not a known
            recommendation parameter, or a value that is not numeric.
    """
    known_public_keys = {
        storage_key.removeprefix("recommendation_")
        for storage_key in RECOMMENDATION_CONFIG_DEFAULTS
    }

    # Validate everything up front — no partial writes on bad input.
    for public_key, public_value in payload.items():
        if public_key not in known_public_keys:
            supported = ", ".join(sorted(known_public_keys))
            raise ValidationError(
                f"Unknown recommendation setting '{public_key}'. Supported: {supported}"
            )
        try:
            float(public_value)
        except (TypeError, ValueError) as exc:
            raise ValidationError(
                f"Recommendation setting '{public_key}' must be numeric"
            ) from exc

    for public_key, public_value in payload.items():
        storage_key = _recommendation_public_to_storage_key(public_key)
        await update_setting(db, storage_key, str(public_value))

    return await get_recommendation_config(db)
|
|
|
|
|
|
async def get_ticker_universe_default(db: AsyncSession) -> dict[str, str]:
    """Return the configured default ticker universe as {"universe": name}."""
    lookup = await db.execute(
        select(SystemSetting).where(SystemSetting.key == "ticker_universe_default")
    )
    row = lookup.scalar_one_or_none()

    chosen = row.value if row is not None else DEFAULT_TICKER_UNIVERSE
    # Guard against stale or invalid values left in the settings table.
    if chosen not in SUPPORTED_TICKER_UNIVERSES:
        chosen = DEFAULT_TICKER_UNIVERSE

    return {"universe": chosen}
|
|
|
|
|
|
async def update_ticker_universe_default(db: AsyncSession, universe: str) -> dict[str, str]:
    """Persist the default ticker universe (case/whitespace normalised).

    Raises:
        ValidationError: if the universe is not one of the supported values.
    """
    normalised = universe.strip().lower()
    if normalised not in SUPPORTED_TICKER_UNIVERSES:
        supported = ", ".join(sorted(SUPPORTED_TICKER_UNIVERSES))
        raise ValidationError(f"Unsupported ticker universe '{universe}'. Supported: {supported}")

    await update_setting(db, "ticker_universe_default", normalised)
    return {"universe": normalised}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Data cleanup
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def cleanup_data(db: AsyncSession, older_than_days: int) -> dict[str, int]:
    """Delete OHLCV, sentiment, and fundamental records older than N days.

    Preserves tickers, users, and latest scores.
    Returns a dict with counts of deleted records per table.
    """
    cutoff = datetime.now(timezone.utc) - timedelta(days=older_than_days)

    # (result key, delete statement) pairs. OHLCV's date column is a plain
    # date, so it is compared against the cutoff's date component; the other
    # two columns are datetimes compared against the full cutoff.
    statements = [
        ("ohlcv", delete(OHLCVRecord).where(OHLCVRecord.date < cutoff.date())),
        ("sentiment", delete(SentimentScore).where(SentimentScore.timestamp < cutoff)),
        ("fundamentals", delete(FundamentalData).where(FundamentalData.fetched_at < cutoff)),
    ]

    counts: dict[str, int] = {}
    for label, statement in statements:
        outcome = await db.execute(statement)
        counts[label] = outcome.rowcount  # type: ignore[assignment]

    # Single commit so the cleanup is applied atomically.
    await db.commit()
    return counts
|
|
|
|
|
|
async def get_pipeline_readiness(db: AsyncSession) -> list[dict]:
    """Return per-ticker readiness snapshot for ingestion/scoring/scanner pipeline.

    For every ticker, aggregates counts and latest timestamps from OHLCV,
    dimension scores, S/R levels, sentiment, fundamentals, composite scores,
    and trade setups, then derives a ``missing_reasons`` list and a
    ``ready_for_scanner`` flag. All aggregates are fetched up front (one
    query per table) and joined in Python, avoiding per-ticker round trips.
    """
    tickers_result = await db.execute(select(Ticker).order_by(Ticker.symbol.asc()))
    tickers = list(tickers_result.scalars().all())

    if not tickers:
        return []

    ticker_ids = [ticker.id for ticker in tickers]

    # Per-ticker OHLCV bar count and most recent bar date.
    ohlcv_stats_result = await db.execute(
        select(
            OHLCVRecord.ticker_id,
            func.count(OHLCVRecord.id),
            func.max(OHLCVRecord.date),
        )
        .where(OHLCVRecord.ticker_id.in_(ticker_ids))
        .group_by(OHLCVRecord.ticker_id)
    )
    ohlcv_stats = {
        ticker_id: {
            "bars": int(count or 0),
            "last_date": max_date.isoformat() if max_date else None,
        }
        for ticker_id, count, max_date in ohlcv_stats_result.all()
    }

    # (score, is_stale) per dimension per ticker.
    # NOTE(review): if several rows exist for the same (ticker, dimension),
    # the last row returned wins — presumably at most one row exists per
    # pair; verify against the DimensionScore model's constraints.
    dim_rows_result = await db.execute(
        select(DimensionScore).where(DimensionScore.ticker_id.in_(ticker_ids))
    )
    dim_map_by_ticker: dict[int, dict[str, tuple[float | None, bool]]] = {}
    for row in dim_rows_result.scalars().all():
        dim_map_by_ticker.setdefault(row.ticker_id, {})[row.dimension] = (row.score, row.is_stale)

    # Support/resistance level counts per ticker.
    sr_counts_result = await db.execute(
        select(SRLevel.ticker_id, func.count(SRLevel.id))
        .where(SRLevel.ticker_id.in_(ticker_ids))
        .group_by(SRLevel.ticker_id)
    )
    sr_counts = {ticker_id: int(count or 0) for ticker_id, count in sr_counts_result.all()}

    # Sentiment record count and most recent timestamp per ticker.
    sentiment_stats_result = await db.execute(
        select(
            SentimentScore.ticker_id,
            func.count(SentimentScore.id),
            func.max(SentimentScore.timestamp),
        )
        .where(SentimentScore.ticker_id.in_(ticker_ids))
        .group_by(SentimentScore.ticker_id)
    )
    sentiment_stats = {
        ticker_id: {
            "count": int(count or 0),
            "last_at": max_ts.isoformat() if max_ts else None,
        }
        for ticker_id, count, max_ts in sentiment_stats_result.all()
    }

    # Fundamentals fetch timestamp per ticker (last row wins if several
    # exist — presumably one row per ticker; verify).
    fundamentals_result = await db.execute(
        select(FundamentalData.ticker_id, FundamentalData.fetched_at)
        .where(FundamentalData.ticker_id.in_(ticker_ids))
    )
    fundamentals_map = {
        ticker_id: fetched_at.isoformat() if fetched_at else None
        for ticker_id, fetched_at in fundamentals_result.all()
    }

    # Composite score staleness flag per ticker.
    composites_result = await db.execute(
        select(CompositeScore.ticker_id, CompositeScore.is_stale)
        .where(CompositeScore.ticker_id.in_(ticker_ids))
    )
    composites_map = {
        ticker_id: is_stale
        for ticker_id, is_stale in composites_result.all()
    }

    # Trade setup counts per ticker.
    setup_counts_result = await db.execute(
        select(TradeSetup.ticker_id, func.count(TradeSetup.id))
        .where(TradeSetup.ticker_id.in_(ticker_ids))
        .group_by(TradeSetup.ticker_id)
    )
    setup_counts = {ticker_id: int(count or 0) for ticker_id, count in setup_counts_result.all()}

    readiness: list[dict] = []
    for ticker in tickers:
        # Tickers with no rows in a table fall back to zero/None defaults.
        ohlcv = ohlcv_stats.get(ticker.id, {"bars": 0, "last_date": None})
        ohlcv_bars = int(ohlcv["bars"])
        ohlcv_last_date = ohlcv["last_date"]

        dim_map = dim_map_by_ticker.get(ticker.id, {})

        sr_count = int(sr_counts.get(ticker.id, 0))

        sentiment = sentiment_stats.get(ticker.id, {"count": 0, "last_at": None})
        sentiment_count = int(sentiment["count"])
        sentiment_last_at = sentiment["last_at"]

        fundamentals_fetched_at = fundamentals_map.get(ticker.id)
        has_fundamentals = ticker.id in fundamentals_map

        has_composite = ticker.id in composites_map
        # None when no composite exists (distinct from False = fresh).
        composite_stale = composites_map.get(ticker.id)

        setup_count = int(setup_counts.get(ticker.id, 0))

        # Collect every reason the full scoring pipeline is not ready for
        # this ticker; an empty list means fully ready.
        missing_reasons: list[str] = []
        if ohlcv_bars < 30:
            missing_reasons.append("insufficient_ohlcv_bars(<30)")
        if "technical" not in dim_map or dim_map["technical"][0] is None:
            missing_reasons.append("missing_technical")
        if "momentum" not in dim_map or dim_map["momentum"][0] is None:
            missing_reasons.append("missing_momentum")
        if "sr_quality" not in dim_map or dim_map["sr_quality"][0] is None:
            missing_reasons.append("missing_sr_quality")
        if sentiment_count == 0:
            missing_reasons.append("missing_sentiment")
        if not has_fundamentals:
            missing_reasons.append("missing_fundamentals")
        if not has_composite:
            missing_reasons.append("missing_composite")
        if setup_count == 0:
            missing_reasons.append("missing_trade_setup")

        readiness.append(
            {
                "symbol": ticker.symbol,
                "ohlcv_bars": ohlcv_bars,
                "ohlcv_last_date": ohlcv_last_date,
                "dimensions": {
                    # (None, True) default => score None for absent dims.
                    "technical": dim_map.get("technical", (None, True))[0],
                    "sr_quality": dim_map.get("sr_quality", (None, True))[0],
                    "sentiment": dim_map.get("sentiment", (None, True))[0],
                    "fundamental": dim_map.get("fundamental", (None, True))[0],
                    "momentum": dim_map.get("momentum", (None, True))[0],
                },
                "sentiment_count": sentiment_count,
                "sentiment_last_at": sentiment_last_at,
                "has_fundamentals": has_fundamentals,
                "fundamentals_fetched_at": fundamentals_fetched_at,
                "sr_level_count": sr_count,
                "has_composite": has_composite,
                "composite_stale": composite_stale,
                "trade_setup_count": setup_count,
                "missing_reasons": missing_reasons,
                # NOTE(review): the scanner readiness threshold (15 bars) is
                # lower than the missing_reasons threshold (30 bars) above —
                # confirm this asymmetry is intentional.
                "ready_for_scanner": ohlcv_bars >= 15 and sr_count > 0,
            }
        )

    return readiness
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Job control (placeholder — scheduler is Task 12.1)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Canonical set of scheduler job names accepted by list_jobs / trigger_job /
# toggle_job. Must match the job ids registered in app.scheduler.
VALID_JOB_NAMES = {
    "data_collector",
    "sentiment_collector",
    "fundamental_collector",
    "rr_scanner",
    "ticker_universe_sync",
}

# Human-readable labels for the jobs above, used in list_jobs output.
JOB_LABELS = {
    "data_collector": "Data Collector (OHLCV)",
    "sentiment_collector": "Sentiment Collector",
    "fundamental_collector": "Fundamental Collector",
    "rr_scanner": "R:R Scanner",
    "ticker_universe_sync": "Ticker Universe Sync",
}
|
|
|
|
|
|
async def list_jobs(db: AsyncSession) -> list[dict]:
    """Return status of all scheduled jobs.

    Combines three sources per job:
      * the "job_<name>_enabled" SystemSetting (absent => enabled),
      * APScheduler registration and next run time,
      * the in-process runtime snapshot (status, progress, timestamps).

    Previously this issued one SystemSetting query per job inside the loop
    (N+1 pattern); all enabled-flags are now fetched in a single query.
    """
    from app.scheduler import get_job_runtime_snapshot, scheduler

    # Fetch every enabled-flag setting in one round trip.
    key_to_name = {f"job_{name}_enabled": name for name in VALID_JOB_NAMES}
    result = await db.execute(
        select(SystemSetting).where(SystemSetting.key.in_(key_to_name))
    )
    enabled_by_name = {
        key_to_name[setting.key]: setting.value == "true"
        for setting in result.scalars().all()
    }

    jobs_out = []
    for name in sorted(VALID_JOB_NAMES):
        # Jobs with no stored setting default to enabled.
        enabled = enabled_by_name.get(name, True)

        # Scheduler registration and next scheduled run, if any.
        job = scheduler.get_job(name)
        next_run = None
        if job and job.next_run_time:
            next_run = job.next_run_time.isoformat()

        runtime = get_job_runtime_snapshot(name)

        jobs_out.append({
            "name": name,
            "label": JOB_LABELS.get(name, name),
            "enabled": enabled,
            "next_run_at": next_run,
            "registered": job is not None,
            "running": bool(runtime.get("running", False)),
            "runtime_status": runtime.get("status"),
            "runtime_processed": runtime.get("processed"),
            "runtime_total": runtime.get("total"),
            "runtime_progress_pct": runtime.get("progress_pct"),
            "runtime_current_ticker": runtime.get("current_ticker"),
            "runtime_started_at": runtime.get("started_at"),
            "runtime_finished_at": runtime.get("finished_at"),
            "runtime_message": runtime.get("message"),
        })

    return jobs_out
|
|
|
|
|
|
async def trigger_job(db: AsyncSession, job_name: str) -> dict[str, str]:
    """Trigger a manual job run via the scheduler.

    Runs the job immediately (in addition to its regular schedule). Returns
    a status dict: "busy" if the job is already running, "blocked" if any
    other job is running, "not_found" if the scheduler has no such job, and
    "triggered" on success.

    Raises:
        ValidationError: if job_name is not a known job.
    """
    if job_name not in VALID_JOB_NAMES:
        raise ValidationError(f"Unknown job: {job_name}. Valid jobs: {', '.join(sorted(VALID_JOB_NAMES))}")

    from app.scheduler import get_job_runtime_snapshot, scheduler

    runtime_target = get_job_runtime_snapshot(job_name)
    if runtime_target.get("running"):
        return {
            "job": job_name,
            "status": "busy",
            "message": f"Job '{job_name}' is already running",
        }

    # Refuse to trigger while any other job is running.
    all_runtime = get_job_runtime_snapshot()
    for running_name, runtime in all_runtime.items():
        if running_name == job_name:
            continue
        if runtime.get("running"):
            return {
                "job": job_name,
                "status": "blocked",
                "message": f"Cannot trigger '{job_name}' while '{running_name}' is running",
            }

    job = scheduler.get_job(job_name)
    if job is None:
        return {"job": job_name, "status": "not_found", "message": f"Job '{job_name}' is not registered in the scheduler"}

    job.modify(next_run_time=None)  # Reset, then trigger immediately
    # Uses the module-level datetime/timezone imports; the previous local
    # re-import shadowed them for no reason.
    job.modify(next_run_time=datetime.now(timezone.utc))

    return {"job": job_name, "status": "triggered", "message": f"Job '{job_name}' triggered for immediate execution"}
|
|
|
|
|
|
async def toggle_job(db: AsyncSession, job_name: str, enabled: bool) -> SystemSetting:
    """Enable or disable a scheduled job by storing state in SystemSetting.

    Actual scheduler integration happens in Task 12.1.

    Raises:
        ValidationError: if job_name is not a known job.
    """
    if job_name not in VALID_JOB_NAMES:
        valid = ", ".join(sorted(VALID_JOB_NAMES))
        raise ValidationError(f"Unknown job: {job_name}. Valid jobs: {valid}")

    return await update_setting(db, f"job_{job_name}_enabled", str(enabled).lower())
|