You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
197 lines
6.7 KiB
Python
197 lines
6.7 KiB
Python
"""
|
|
SSE - Async Implementation for ASGI
|
|
Server Side Events, opens connection and keeps it alive, broadcasting new/changed objects to all connected clients
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
import asyncio
|
|
from django.http import StreamingHttpResponse, JsonResponse
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
from django.shortcuts import render
|
|
from rest_framework_simplejwt.authentication import JWTAuthentication
|
|
from rest_framework_simplejwt.exceptions import InvalidToken, TokenError
|
|
from asgiref.sync import sync_to_async
|
|
import threading
|
|
|
|
# Global updates queue for all objects
|
|
updates_queue = []
|
|
updates_lock = threading.Lock()
|
|
|
|
# Global event ID counter
|
|
event_id_counter = 0
|
|
event_id_lock = threading.Lock()
|
|
|
|
# called from views when an object is changed to broadcast the change to all connected clients
|
|
def sse_broadcast_update(object_name, action, data):
|
|
"""
|
|
Broadcast object update to all connected SSE clients
|
|
object_name: 'agent', 'vessel', 'cargo', and others..
|
|
action: 'created', 'updated', or 'deleted'
|
|
data: serialized object data
|
|
"""
|
|
global event_id_counter
|
|
|
|
# Map action to operation
|
|
operation_map = {
|
|
'created': 'insert',
|
|
'updated': 'update',
|
|
'deleted': 'delete'
|
|
}
|
|
|
|
with event_id_lock:
|
|
event_id_counter += 1
|
|
current_event_id = event_id_counter
|
|
|
|
with updates_lock:
|
|
updates_queue.append({
|
|
'event_id': current_event_id, # Add event ID
|
|
'type': 'data_update',
|
|
'object': object_name,
|
|
'operation': operation_map.get(action, action),
|
|
'data': data,
|
|
'timestamp': time.time()
|
|
})
|
|
|
|
# Keep only last 100 updates
|
|
while len(updates_queue) > 100:
|
|
updates_queue.pop(0)
|
|
|
|
|
|
async def sse_authenticate_request(request):
|
|
"""
|
|
Called from sse_connect, Authenticate SSE request via JWT token from Authorization header or query parameter.
|
|
Returns (user, error_response) - if error_response is not None, return it immediately.
|
|
"""
|
|
# Try to get token from Authorization header
|
|
auth_header = request.headers.get('Authorization', '')
|
|
token_key = None
|
|
|
|
if auth_header.startswith('Bearer '):
|
|
token_key = auth_header[7:]
|
|
|
|
# Fallback: try query parameter (for browsers' EventSource that can't set headers)
|
|
if not token_key:
|
|
token_key = request.GET.get('token')
|
|
|
|
if not token_key:
|
|
return None, JsonResponse({'error': 'Authentication required'}, status=401)
|
|
|
|
try:
|
|
# Validate JWT token - wrap in sync_to_async for DB queries
|
|
jwt_auth = JWTAuthentication()
|
|
validated_token = jwt_auth.get_validated_token(token_key)
|
|
user = await sync_to_async(jwt_auth.get_user)(validated_token)
|
|
return user, None
|
|
except (InvalidToken, TokenError) as e:
|
|
return None, JsonResponse({'error': 'Invalid token'}, status=401)
|
|
|
|
|
|
|
|
# User calls update_sse to establish a sse connection, then he receives updates until disconnect or error
|
|
|
|
@csrf_exempt
|
|
async def sse_connect(request):
|
|
"""SSE endpoint for all object updates (requires authentication)"""
|
|
|
|
# Authenticate the request
|
|
user, error_response = await sse_authenticate_request(request)
|
|
if error_response:
|
|
return error_response
|
|
|
|
# Check for Last-Event-ID header (browser sends this on reconnect)
|
|
last_event_id_header = request.headers.get('Last-Event-ID', None)
|
|
|
|
# On fresh connection (no Last-Event-ID), use current event counter as baseline
|
|
# On reconnection (has Last-Event-ID), use that to replay missed events
|
|
if last_event_id_header:
|
|
last_sent_event_id = int(last_event_id_header)
|
|
print(f"SSE reconnection: Last-Event-ID={last_event_id_header}")
|
|
else:
|
|
# Fresh connection - use current event counter as baseline
|
|
with event_id_lock:
|
|
last_sent_event_id = event_id_counter
|
|
print(f"SSE new connection: Starting from event #{last_sent_event_id}")
|
|
|
|
# Async generator for ASGI compatibility
|
|
async def event_generator():
|
|
nonlocal last_sent_event_id
|
|
|
|
# Send connection established message with event ID
|
|
connected_msg = {
|
|
"type": "connected",
|
|
"message": "Connected to updates stream",
|
|
"current_event_id": last_sent_event_id
|
|
}
|
|
yield f'id: {last_sent_event_id}\n'.encode('utf-8')
|
|
yield f'data: {json.dumps(connected_msg)}\n\n'.encode('utf-8')
|
|
|
|
# Keep connection alive and send new updates
|
|
while True:
|
|
try:
|
|
with updates_lock:
|
|
# Get updates since last event ID
|
|
missed_updates = [u for u in updates_queue if u['event_id'] > last_sent_event_id]
|
|
|
|
if missed_updates:
|
|
print(f"Sending {len(missed_updates)} missed updates (after event #{last_sent_event_id})")
|
|
|
|
for update in missed_updates:
|
|
event_id = update['event_id']
|
|
data = json.dumps(update)
|
|
# Send event with ID in SSE format
|
|
yield f'id: {event_id}\n'.encode('utf-8')
|
|
yield f'data: {data}\n\n'.encode('utf-8')
|
|
last_sent_event_id = event_id
|
|
|
|
# Send keepalive comment to prevent timeout
|
|
yield b': keepalive\n\n'
|
|
|
|
# Use async sleep for proper ASGI compatibility
|
|
await asyncio.sleep(1)
|
|
|
|
except Exception as e:
|
|
print(f"Updates SSE error: {e}")
|
|
break
|
|
|
|
response = StreamingHttpResponse(
|
|
event_generator(),
|
|
content_type='text/event-stream'
|
|
)
|
|
response['Cache-Control'] = 'no-cache'
|
|
response['X-Accel-Buffering'] = 'no'
|
|
|
|
return response
|
|
|
|
# @csrf_exempt
|
|
# def send_message(request):
|
|
# """Endpoint to send a message"""
|
|
# if request.method != 'POST':
|
|
# return JsonResponse({'error': 'POST only'}, status=405)
|
|
|
|
# try:
|
|
# data = json.loads(request.body)
|
|
# message_text = data.get('message', '').strip()
|
|
# sender = data.get('sender', 'Anonymous')
|
|
|
|
# if not message_text:
|
|
# return JsonResponse({'error': 'Empty message'}, status=400)
|
|
|
|
# # Add message to global queue
|
|
# with messages_lock:
|
|
# messages.append({
|
|
# 'type': 'message',
|
|
# 'sender': sender,
|
|
# 'message': message_text,
|
|
# 'timestamp': time.time()
|
|
# })
|
|
|
|
# # Keep only last 100 messages
|
|
# while len(messages) > 100:
|
|
# messages.pop(0)
|
|
|
|
# return JsonResponse({'success': True})
|
|
|
|
# except Exception as e:
|
|
# return JsonResponse({'error': str(e)}, status=500)
|