mirror of
https://github.com/OHV-IT/collabrix.git
synced 2025-12-15 16:48:36 +01:00
- Added complete Kanban board functionality with drag-and-drop - Implemented auto-save for Kanban card editing (no more edit button) - Added route persistence to remember last visited page on reload - Improved Kanban UI design with slimmer borders and compact layout - Added checklist functionality for Kanban cards - Enhanced file upload and direct messaging features - Improved authentication and user management - Added toast notifications system - Various UI/UX improvements and bug fixes
141 lines
4.3 KiB
Python
141 lines
4.3 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from sqlmodel import Session, select
|
|
from typing import List
|
|
from app.database import get_session
|
|
from app.models import Channel, Department, User, KanbanBoard, KanbanColumn
|
|
from app.schemas import ChannelCreate, ChannelResponse
|
|
from app.auth import get_current_user
|
|
|
|
# Router for the channel endpoints: every route registered on it is served
# under the "/channels" prefix and carries the "Channels" tag.
router = APIRouter(prefix="/channels", tags=["Channels"])
|
|
|
|
|
|
@router.post("/", response_model=ChannelResponse, status_code=status.HTTP_201_CREATED)
def create_channel(
    channel_data: ChannelCreate,
    session: Session = Depends(get_session),
    current_user: User = Depends(get_current_user),
):
    """Create a channel together with its default Kanban board and columns.

    The channel, one board named "Kanban Board" and four columns
    ("ToDo", "In Progress", "Waiting", "Done" at positions 0-3) are written
    in a single transaction, so a failure part-way through leaves no
    half-initialised channel behind.

    Args:
        channel_data: Name, description and department id of the new channel.
        session: Database session supplied by ``get_session``.
        current_user: Resolved by ``get_current_user``; not read in the body,
            it is declared so the dependency runs for this route.

    Returns:
        The newly created channel, refreshed from the database.

    Raises:
        HTTPException: 404 when ``channel_data.department_id`` does not match
            an existing department.
    """
    # NOTE(review): unlike the read endpoints in this file, no department
    # membership check is made here - confirm that is intended.
    department = session.get(Department, channel_data.department_id)
    if not department:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Department not found",
        )

    new_channel = Channel(
        name=channel_data.name,
        description=channel_data.description,
        department_id=channel_data.department_id,
    )
    default_column_names = ("ToDo", "In Progress", "Waiting", "Done")

    try:
        session.add(new_channel)
        # flush() sends the INSERT so new_channel.id is populated, but keeps
        # the transaction open instead of committing a board-less channel.
        session.flush()

        kanban_board = KanbanBoard(
            channel_id=new_channel.id,
            name="Kanban Board",
        )
        session.add(kanban_board)
        session.flush()  # populate kanban_board.id for the columns below

        session.add_all(
            [
                KanbanColumn(board_id=kanban_board.id, name=name, position=position)
                for position, name in enumerate(default_column_names)
            ]
        )
        session.commit()
    except Exception:
        # Undo everything written so far, then let the error propagate.
        session.rollback()
        raise

    # Reload the attributes after the commit so the response is built from
    # the persisted state of the channel.
    session.refresh(new_channel)
    return new_channel
|
|
|
|
|
|
@router.get("/", response_model=List[ChannelResponse])
def get_my_channels(
    session: Session = Depends(get_session),
    current_user: User = Depends(get_current_user),
):
    """List the channels of every department the current user belongs to.

    Returns an empty list when the user row cannot be found or the user has
    no departments. Channels are collected department by department, in the
    order the departments appear on the user.
    """
    # Load the caller's row through this session, then read its departments.
    member = session.exec(select(User).where(User.id == current_user.id)).first()

    if member and member.departments:
        accessible = []
        for department in member.departments:
            per_department = select(Channel).where(
                Channel.department_id == department.id
            )
            accessible += session.exec(per_department).all()
        return accessible

    return []
|
|
|
|
|
|
@router.get("/{channel_id}", response_model=ChannelResponse)
def get_channel(
    channel_id: int,
    session: Session = Depends(get_session),
    current_user: User = Depends(get_current_user),
):
    """Return a single channel, if the caller belongs to its department.

    Raises:
        HTTPException: 404 when no channel has ``channel_id``; 403 when the
            channel's department is not one of the caller's departments
            (including the case where the caller's row cannot be loaded).
    """
    found = session.get(Channel, channel_id)
    if not found:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Channel not found",
        )

    # Access is granted through department membership: collect the ids of
    # the caller's departments and compare with the channel's department.
    member = session.exec(select(User).where(User.id == current_user.id)).first()
    allowed_department_ids = []
    if member:
        allowed_department_ids = [department.id for department in member.departments]

    if found.department_id in allowed_department_ids:
        return found

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="You don't have access to this channel",
    )
|
|
|
|
|
|
@router.get("/department/{department_id}", response_model=List[ChannelResponse])
def get_channels_by_department(
    department_id: int,
    session: Session = Depends(get_session),
    current_user: User = Depends(get_current_user),
):
    """List every channel of one department the caller is a member of.

    Raises:
        HTTPException: 403 when ``department_id`` is not among the caller's
            departments (including the case where the caller's row cannot
            be loaded).
    """
    # Membership is checked before any channel is queried.
    caller_lookup = select(User).where(User.id == current_user.id)
    member = session.exec(caller_lookup).first()

    member_department_ids = []
    if member:
        member_department_ids = [department.id for department in member.departments]

    if department_id not in member_department_ids:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have access to this department",
        )

    channel_lookup = select(Channel).where(Channel.department_id == department_id)
    return session.exec(channel_lookup).all()
|