"""Auth service: registration, login, and JWT token generation.""" from datetime import datetime, timedelta, timezone from jose import jwt from passlib.hash import bcrypt from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.config import settings from app.dependencies import JWT_ALGORITHM from app.exceptions import AuthenticationError, AuthorizationError, DuplicateError from app.models.settings import SystemSetting from app.models.user import User async def register(db: AsyncSession, username: str, password: str) -> User: """Register a new user. Checks if registration is enabled via SystemSetting, rejects duplicates, and creates a user with role='user' and has_access=False. """ # Check registration toggle result = await db.execute( select(SystemSetting).where(SystemSetting.key == "registration_enabled") ) setting = result.scalar_one_or_none() if setting is not None and setting.value.lower() == "false": raise AuthorizationError("Registration is closed") # Check duplicate username result = await db.execute(select(User).where(User.username == username)) if result.scalar_one_or_none() is not None: raise DuplicateError(f"Username already exists: {username}") user = User( username=username, password_hash=bcrypt.hash(password), role="user", has_access=False, ) db.add(user) await db.commit() await db.refresh(user) return user async def login(db: AsyncSession, username: str, password: str) -> str: """Authenticate user and return a JWT access token. Returns the same error message for wrong username or wrong password to avoid leaking which field is incorrect. """ result = await db.execute(select(User).where(User.username == username)) user = result.scalar_one_or_none() if user is None or not bcrypt.verify(password, user.password_hash): raise AuthenticationError("Invalid credentials") payload = { "sub": str(user.id), "role": user.role, "exp": datetime.now(timezone.utc) + timedelta(minutes=settings.jwt_expiry_minutes), } token = jwt.encode(payload, settings.jwt_secret, algorithm=JWT_ALGORITHM) return token