""" SSE - Async Implementation for ASGI Server Side Events, opens connection and keeps it alive, broadcasting new/changed objects to all connected clients """ import json import time import asyncio from django.http import StreamingHttpResponse, JsonResponse from django.views.decorators.csrf import csrf_exempt from django.shortcuts import render from rest_framework_simplejwt.authentication import JWTAuthentication from rest_framework_simplejwt.exceptions import InvalidToken, TokenError from asgiref.sync import sync_to_async import threading # Global updates queue for all objects updates_queue = [] updates_lock = threading.Lock() # Global event ID counter event_id_counter = 0 event_id_lock = threading.Lock() # called from views when an object is changed to broadcast the change to all connected clients def sse_broadcast_update(object_name, action, data): """ Broadcast object update to all connected SSE clients object_name: 'agent', 'vessel', 'cargo', and others.. action: 'created', 'updated', or 'deleted' data: serialized object data """ global event_id_counter # Map action to operation operation_map = { 'created': 'insert', 'updated': 'update', 'deleted': 'delete' } with event_id_lock: event_id_counter += 1 current_event_id = event_id_counter with updates_lock: updates_queue.append({ 'event_id': current_event_id, # Add event ID 'type': 'data_update', 'object': object_name, 'operation': operation_map.get(action, action), 'data': data, 'timestamp': time.time() }) # Keep only last 100 updates while len(updates_queue) > 100: updates_queue.pop(0) async def sse_authenticate_request(request): """ Called from sse_connect, Authenticate SSE request via JWT token from Authorization header or query parameter. Returns (user, error_response) - if error_response is not None, return it immediately. """ # Try to get token from Authorization header auth_header = request.headers.get('Authorization', '') token_key = None if auth_header.startswith('Bearer '): token_key = auth_header[7:] # Fallback: try query parameter (for browsers' EventSource that can't set headers) if not token_key: token_key = request.GET.get('token') if not token_key: return None, JsonResponse({'error': 'Authentication required'}, status=401) try: # Validate JWT token - wrap in sync_to_async for DB queries jwt_auth = JWTAuthentication() validated_token = jwt_auth.get_validated_token(token_key) user = await sync_to_async(jwt_auth.get_user)(validated_token) return user, None except (InvalidToken, TokenError) as e: return None, JsonResponse({'error': 'Invalid token'}, status=401) # User calls update_sse to establish a sse connection, then he receives updates until disconnect or error @csrf_exempt async def sse_connect(request): """SSE endpoint for all object updates (requires authentication)""" # Authenticate the request user, error_response = await sse_authenticate_request(request) if error_response: return error_response # Check for Last-Event-ID header (browser sends this on reconnect) last_event_id_header = request.headers.get('Last-Event-ID', None) # On fresh connection (no Last-Event-ID), use current event counter as baseline # On reconnection (has Last-Event-ID), use that to replay missed events if last_event_id_header: last_sent_event_id = int(last_event_id_header) print(f"SSE reconnection: Last-Event-ID={last_event_id_header}") else: # Fresh connection - use current event counter as baseline with event_id_lock: last_sent_event_id = event_id_counter print(f"SSE new connection: Starting from event #{last_sent_event_id}") # Async generator for ASGI compatibility async def event_generator(): nonlocal last_sent_event_id # Send connection established message with event ID connected_msg = { "type": "connected", "message": "Connected to updates stream", "current_event_id": last_sent_event_id } yield f'id: {last_sent_event_id}\n'.encode('utf-8') yield f'data: {json.dumps(connected_msg)}\n\n'.encode('utf-8') # Keep connection alive and send new updates while True: try: with updates_lock: # Get updates since last event ID missed_updates = [u for u in updates_queue if u['event_id'] > last_sent_event_id] if missed_updates: print(f"Sending {len(missed_updates)} missed updates (after event #{last_sent_event_id})") for update in missed_updates: event_id = update['event_id'] data = json.dumps(update) # Send event with ID in SSE format yield f'id: {event_id}\n'.encode('utf-8') yield f'data: {data}\n\n'.encode('utf-8') last_sent_event_id = event_id # Send keepalive comment to prevent timeout yield b': keepalive\n\n' # Use async sleep for proper ASGI compatibility await asyncio.sleep(1) except Exception as e: print(f"Updates SSE error: {e}") break response = StreamingHttpResponse( event_generator(), content_type='text/event-stream' ) response['Cache-Control'] = 'no-cache' response['X-Accel-Buffering'] = 'no' return response # @csrf_exempt # def send_message(request): # """Endpoint to send a message""" # if request.method != 'POST': # return JsonResponse({'error': 'POST only'}, status=405) # try: # data = json.loads(request.body) # message_text = data.get('message', '').strip() # sender = data.get('sender', 'Anonymous') # if not message_text: # return JsonResponse({'error': 'Empty message'}, status=400) # # Add message to global queue # with messages_lock: # messages.append({ # 'type': 'message', # 'sender': sender, # 'message': message_text, # 'timestamp': time.time() # }) # # Keep only last 100 messages # while len(messages) > 100: # messages.pop(0) # return JsonResponse({'success': True}) # except Exception as e: # return JsonResponse({'error': str(e)}, status=500)