mirror of
https://github.com/OHV-IT/collabrix.git
synced 2025-12-16 09:08:36 +01:00
- Implement unread message indicators with Material-UI icons - Add BlinkingEnvelope component with theme-compatible colors - Create UnreadMessagesContext for managing unread states - Integrate WebSocket message handling for real-time notifications - Icons only appear for inactive channels/DMs, disappear when opened - Add test functionality (double-click to mark as unread) - Fix WebSocket URL handling for production deployment - Unify WebSocket architecture using presence connection for all messages
122 lines
4.8 KiB
Python
122 lines
4.8 KiB
Python
from fastapi import WebSocket, WebSocketDisconnect
|
|
from typing import Dict, List, Optional
|
|
import json
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
|
|
|
|
class ConnectionManager:
|
|
def __init__(self):
|
|
# Maps channel_id to list of WebSocket connections
|
|
self.active_connections: Dict[int, List[WebSocket]] = {}
|
|
# Maps user_id to their connection info
|
|
self.user_connections: Dict[int, Dict] = {}
|
|
|
|
async def connect(self, websocket: WebSocket, channel_id: int, user_id: int):
|
|
"""Accept a new WebSocket connection for a channel"""
|
|
await websocket.accept()
|
|
if channel_id not in self.active_connections:
|
|
self.active_connections[channel_id] = []
|
|
self.active_connections[channel_id].append(websocket)
|
|
|
|
# Track user connection
|
|
self.user_connections[user_id] = {
|
|
'websocket': websocket,
|
|
'channel_id': channel_id,
|
|
'last_activity': time.time(),
|
|
'connected_at': time.time()
|
|
}
|
|
|
|
def disconnect(self, websocket: WebSocket, channel_id: int, user_id: int):
|
|
"""Remove a WebSocket connection"""
|
|
if channel_id in self.active_connections:
|
|
if websocket in self.active_connections[channel_id]:
|
|
self.active_connections[channel_id].remove(websocket)
|
|
|
|
# Clean up empty channel lists
|
|
if not self.active_connections[channel_id]:
|
|
del self.active_connections[channel_id]
|
|
|
|
# Remove user connection
|
|
if user_id in self.user_connections:
|
|
del self.user_connections[user_id]
|
|
|
|
def update_activity(self, user_id: int):
|
|
"""Update last activity time for a user"""
|
|
if user_id in self.user_connections:
|
|
self.user_connections[user_id]['last_activity'] = time.time()
|
|
|
|
def get_user_status(self, user_id: int) -> str:
|
|
"""Get user online status"""
|
|
if user_id not in self.user_connections:
|
|
return 'offline'
|
|
|
|
# User is online as long as they have an active connection
|
|
return 'online'
|
|
|
|
def get_all_user_statuses(self) -> Dict[int, str]:
|
|
"""Get status for all users"""
|
|
statuses = {}
|
|
for user_id in self.user_connections:
|
|
statuses[user_id] = self.get_user_status(user_id)
|
|
return statuses
|
|
|
|
async def send_personal_message(self, message: str, websocket: WebSocket):
|
|
"""Send a message to a specific WebSocket"""
|
|
await websocket.send_text(message)
|
|
|
|
async def broadcast_to_channel(self, message: dict, channel_id: int):
|
|
"""Broadcast a message to all connections in a channel"""
|
|
if channel_id in self.active_connections:
|
|
message_str = json.dumps(message)
|
|
disconnected = []
|
|
|
|
for connection in self.active_connections[channel_id]:
|
|
try:
|
|
await connection.send_text(message_str)
|
|
except Exception:
|
|
# Mark for removal if send fails
|
|
disconnected.append(connection)
|
|
|
|
# Also broadcast to channel 0 (global listeners) for messages
|
|
if message.get("type") in ["message", "direct_message"] and 0 in self.active_connections:
|
|
for connection in self.active_connections[0]:
|
|
try:
|
|
await connection.send_text(message_str)
|
|
except Exception:
|
|
pass
|
|
|
|
async def broadcast_user_status_update(self, user_id: int, status: str):
|
|
"""Broadcast user status update to all connected clients"""
|
|
message = {
|
|
"type": "user_status_update",
|
|
"user_id": user_id,
|
|
"status": status,
|
|
"timestamp": time.time()
|
|
}
|
|
|
|
# Broadcast to all channels (presence connections are on channel 0)
|
|
for channel_id in self.active_connections:
|
|
message_str = json.dumps(message)
|
|
disconnected = []
|
|
|
|
for connection in self.active_connections[channel_id]:
|
|
try:
|
|
await connection.send_text(message_str)
|
|
except Exception:
|
|
disconnected.append(connection)
|
|
|
|
# Clean up disconnected clients
|
|
for connection in disconnected:
|
|
user_id_to_remove = None
|
|
for uid, conn_info in self.user_connections.items():
|
|
if conn_info['websocket'] == connection:
|
|
user_id_to_remove = uid
|
|
break
|
|
if user_id_to_remove:
|
|
self.disconnect(connection, channel_id, user_id_to_remove)
|
|
|
|
|
|
# Global connection manager instance
|
|
manager = ConnectionManager()
|