"""Authentication helpers: password hashing, JWT handling and user lookup."""

from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Depends, HTTPException, Query, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlmodel import Session, select

from app.config import get_settings
from app.database import get_session
from app.models import User

settings = get_settings()

# Password hashing with Argon2 (modern alternative to bcrypt)
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")

# HTTP Bearer token. auto_error=False makes a missing Authorization header
# yield None instead of an immediate error, so get_current_user can fall
# back to the ``token`` query parameter.
security = HTTPBearer(auto_error=False)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plain-text password against a stored hash.

    Args:
        plain_password: The password as supplied by the caller.
        hashed_password: The hash to verify against.

    Returns:
        True if the password matches the hash, False otherwise.
    """
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    """Hash a password with the module's ``pwd_context``.

    Args:
        password: The plain-text password.

    Returns:
        The encoded hash string.
    """
    return pwd_context.hash(password)


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token. The dict is copied, not mutated.
        expires_delta: Lifetime of the token. When None, the lifetime is
            ``settings.access_token_expire_minutes`` minutes. An explicit
            zero ``timedelta`` is honoured rather than replaced by the
            default.

    Returns:
        The encoded JWT, signed with ``settings.secret_key`` using
        ``settings.algorithm``.
    """
    to_encode = data.copy()
    # Compare against None: timedelta(0) is falsy, and a truthiness test
    # would silently swap an explicit zero lifetime for the default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.access_token_expire_minutes)
    # Timezone-aware "now"; datetime.utcnow() returns a naive value and is
    # deprecated since Python 3.12.
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm)


def decode_access_token(token: str) -> Optional[str]:
    """Decode a JWT and return the username stored in its ``sub`` claim.

    Args:
        token: The encoded JWT.

    Returns:
        The value of the ``sub`` claim, or None when the claim is absent or
        decoding raises ``JWTError``.
    """
    try:
        payload = jwt.decode(
            token, settings.secret_key, algorithms=[settings.algorithm]
        )
    except JWTError:
        return None
    username: Optional[str] = payload.get("sub")
    return username


async def get_current_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
    token_query: Optional[str] = Query(None, alias="token"),
    session: Session = Depends(get_session),
) -> User:
    """Resolve the authenticated user from a bearer header or query parameter.

    The Authorization header takes precedence; the ``token`` query parameter
    is only consulted when no header credentials were supplied.

    Args:
        credentials: Bearer credentials from the Authorization header, if any.
        token_query: Token passed as the ``token`` query parameter, if any.
        session: Database session used to look up the user.

    Returns:
        The ``User`` whose username matches the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when no token is supplied, the token cannot be
            decoded, it carries no ``sub`` claim, or no matching user exists.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Try to get token from header first, then from query parameter
    token = None
    if credentials:
        token = credentials.credentials
    elif token_query:
        token = token_query

    if not token:
        raise credentials_exception

    username = decode_access_token(token)
    if username is None:
        raise credentials_exception

    statement = select(User).where(User.username == username)
    user = session.exec(statement).first()
    if user is None:
        raise credentials_exception

    # Reload the instance's attributes from the database before returning it.
    session.refresh(user)
    return user


def authenticate_user(session: Session, username: str, password: str) -> Optional[User]:
    """Look up a user by username and verify the supplied password.

    Args:
        session: Database session used for the lookup.
        username: Username to search for.
        password: Plain-text password to check against the stored hash.

    Returns:
        The matching ``User`` when the password verifies, otherwise None
        (unknown username or wrong password).
    """
    statement = select(User).where(User.username == username)
    user = session.exec(statement).first()
    if not user:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user