DGSoft 93b98cfb5c Initial commit: Team Chat System with Code Snippet Library
- Complete chat application similar to Microsoft Teams
- Code snippet library with syntax highlighting
- Real-time messaging with WebSockets
- File upload with Office integration
- Department-based permissions
- Dark/Light theme support
- Production deployment with SSL/Reverse Proxy
- Docker containerization
- PostgreSQL database with SQLModel ORM
2025-12-09 22:25:03 +01:00

105 lines
3.2 KiB
Python

from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Depends, HTTPException, Query, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlmodel import Session, select

from app.config import get_settings
from app.database import get_session
from app.models import User

# Application settings, resolved once when this module is imported.
settings = get_settings()

# Password hashing context; Argon2 is the only configured scheme.
pwd_context = CryptContext(
    schemes=["argon2"],
    deprecated="auto",
)

# HTTP Bearer credential extractor. auto_error=False is set so that
# get_current_user receives an Optional value and can fall back to the
# ``token`` query parameter itself.
security = HTTPBearer(auto_error=False)
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored hash.

    Args:
        plain_password: The password as supplied by the user.
        hashed_password: The hash to compare against.

    Returns:
        The result of ``pwd_context.verify`` for the given pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password: str) -> str:
    """Hash a plaintext password with the module's ``pwd_context``.

    Args:
        password: The plaintext password to hash.

    Returns:
        The hash string produced by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(password)
    return hashed
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token. The dict is copied, so the
            caller's object is not mutated.
        expires_delta: Lifetime of the token. When ``None``, the lifetime is
            ``settings.access_token_expire_minutes`` minutes.

    Returns:
        The encoded JWT, signed with ``settings.secret_key`` using
        ``settings.algorithm``.
    """
    # Compare against None rather than truthiness: timedelta(0) is falsy and
    # would otherwise silently be replaced by the default lifetime.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.access_token_expire_minutes)

    # Timezone-aware "now"; datetime.utcnow() is deprecated since Python 3.12.
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode = data.copy()
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm)
def decode_access_token(token: str) -> Optional[str]:
    """Decode a JWT and return its ``sub`` claim.

    Args:
        token: The encoded JWT.

    Returns:
        The value of the ``sub`` claim, or ``None`` when decoding raises
        ``JWTError`` or the payload has no ``sub`` claim.
    """
    try:
        claims = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
    except JWTError:
        return None
    # dict.get already yields None for a missing claim.
    return claims.get("sub")
async def get_current_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
    token_query: Optional[str] = Query(None, alias="token"),
    session: Session = Depends(get_session),
) -> User:
    """Resolve the authenticated user for the current request.

    The token is taken from the Bearer credentials when present, otherwise
    from the ``token`` query parameter.

    Args:
        credentials: Bearer credentials from the ``security`` dependency,
            or ``None``.
        token_query: Value of the ``token`` query parameter, or ``None``.
        session: Database session from ``get_session``.

    Returns:
        The ``User`` whose ``username`` equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when no token is supplied, the token cannot be
            decoded to a username, or no matching user exists.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Header credentials take precedence; the query parameter is the fallback.
    token = credentials.credentials if credentials else token_query
    if not token:
        raise unauthorized

    username = decode_access_token(token)
    if username is None:
        raise unauthorized

    user = session.exec(select(User).where(User.username == username)).first()
    if user is None:
        raise unauthorized

    # Refresh the instance through the session before handing it back.
    session.refresh(user)
    return user
def authenticate_user(session: Session, username: str, password: str) -> Optional[User]:
    """Look up a user by username and check the supplied password.

    Args:
        session: Database session used for the lookup.
        username: Username to search for.
        password: Plaintext password to verify against the stored hash.

    Returns:
        The matching ``User`` when one exists and ``verify_password``
        succeeds against its ``hashed_password``; otherwise ``None``.
    """
    candidate = session.exec(select(User).where(User.username == username)).first()
    if candidate and verify_password(password, candidate.hashed_password):
        return candidate
    return None