Files
signal-platform/app/services/auth_service.py
T
dennisthiessen 5a0e8c8258
Deploy / lint (push) Successful in 7s
Deploy / test (push) Successful in 35s
Deploy / deploy (push) Successful in 24s
Fix sidebar username, Signals filter clarity and layout
- JWT now carries a username claim; sidebar shows "Signed in as <name>"
  instead of the bare user id (sub). Re-login required for the new claim.
- Signals: Min R:R / Min Confidence inputs reflect the effective filter —
  auto-filled from the activation gate when "Qualified only" is on, reset
  to 0 when off (no more misleading 0 while the gate is active).
- Signals layout: Run Scanner moved to its own action row (it's a job
  trigger, not a filter); qualified toggle grouped with the refinement
  filters under one Filters panel.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-14 12:11:39 +02:00

68 lines
2.3 KiB
Python

"""Auth service: registration, login, and JWT token generation."""
from datetime import datetime, timedelta, timezone
from jose import jwt
from passlib.hash import bcrypt
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.dependencies import JWT_ALGORITHM
from app.exceptions import AuthenticationError, AuthorizationError, DuplicateError
from app.models.settings import SystemSetting
from app.models.user import User
async def register(db: AsyncSession, username: str, password: str) -> User:
    """Register a new user.

    Checks whether registration is enabled via the ``registration_enabled``
    SystemSetting, rejects duplicate usernames, and creates a user with
    role='user' and has_access=False.

    Args:
        db: Async database session used for the lookups and the insert.
        username: Requested username.
        password: Plain-text password; only its bcrypt hash is stored.

    Returns:
        The newly created, refreshed ``User`` instance.

    Raises:
        AuthorizationError: If the ``registration_enabled`` setting exists and
            its value is "false" (case-insensitive).
        DuplicateError: If the username is already taken, whether detected by
            the up-front lookup or by the database when the insert commits.
    """
    # Imported locally so this function is self-contained; sqlalchemy is
    # already a dependency of this module.
    from sqlalchemy.exc import IntegrityError

    # Check registration toggle. A missing setting row means registration
    # stays open.
    result = await db.execute(
        select(SystemSetting).where(SystemSetting.key == "registration_enabled")
    )
    setting = result.scalar_one_or_none()
    if setting is not None and setting.value.lower() == "false":
        raise AuthorizationError("Registration is closed")

    # Check duplicate username (fast path with a clear error message).
    result = await db.execute(select(User).where(User.username == username))
    if result.scalar_one_or_none() is not None:
        raise DuplicateError(f"Username already exists: {username}")

    user = User(
        username=username,
        password_hash=bcrypt.hash(password),
        role="user",
        has_access=False,
    )
    db.add(user)
    try:
        await db.commit()
    except IntegrityError as exc:
        # Two concurrent registrations can both pass the lookup above; the
        # loser fails here. Roll back so the session stays usable and report
        # the same error as the up-front check.
        # NOTE(review): assumes a unique constraint on User.username and that
        # it is the only constraint this insert can violate -- confirm against
        # the User model. Without such a constraint this handler never fires.
        await db.rollback()
        raise DuplicateError(f"Username already exists: {username}") from exc
    await db.refresh(user)
    return user
# Lazily-created bcrypt hash that is verified against when the username is
# unknown, so that both failure paths pay the cost of one bcrypt verification.
_dummy_password_hash = None


def _get_dummy_password_hash() -> str:
    """Return a throwaway bcrypt hash, creating it on first use."""
    global _dummy_password_hash
    if _dummy_password_hash is None:
        _dummy_password_hash = bcrypt.hash("placeholder-password-for-unknown-users")
    return _dummy_password_hash


async def login(db: AsyncSession, username: str, password: str) -> str:
    """Authenticate user and return a JWT access token.

    Returns the same error message for wrong username or wrong password
    to avoid leaking which field is incorrect. For the same reason a bcrypt
    verification is always performed, even when the username does not exist,
    so the two failure cases are not distinguishable by response time.

    Args:
        db: Async database session used to look up the user.
        username: Username supplied by the client.
        password: Plain-text password supplied by the client.

    Returns:
        An encoded JWT carrying the claims ``sub`` (user id as a string),
        ``username``, ``role`` and ``exp``.

    Raises:
        AuthenticationError: If the username is unknown or the password does
            not match the stored hash.
    """
    result = await db.execute(select(User).where(User.username == username))
    user = result.scalar_one_or_none()

    # Always run exactly one bcrypt verification. Skipping it for unknown
    # usernames would make that path much faster and reveal which usernames
    # exist.
    stored_hash = user.password_hash if user is not None else _get_dummy_password_hash()
    password_ok = bcrypt.verify(password, stored_hash)

    # `user is None` is checked explicitly so a lucky match against the
    # placeholder hash can never authenticate anyone.
    if user is None or not password_ok:
        raise AuthenticationError("Invalid credentials")

    payload = {
        "sub": str(user.id),
        "username": user.username,
        "role": user.role,
        # Expiry is computed in UTC from the configured lifetime in minutes.
        "exp": datetime.now(timezone.utc) + timedelta(minutes=settings.jwt_expiry_minutes),
    }
    token = jwt.encode(payload, settings.jwt_secret, algorithm=JWT_ALGORITHM)
    return token