src/api/websocket.py

source-code api websocket real-time

File Path: src/api/websocket.py

Purpose: WebSocket handler for real-time Arabic sign language detection. Manages frame reception, motion detection, keypoint extraction, and sign classification.

Overview

This module implements the core real-time recognition pipeline:

  1. Receives video frames via WebSocket
  2. Buffers frames asynchronously
  3. Detects motion to trigger processing
  4. Extracts keypoints using MediaPipe
  5. Runs ONNX inference for sign classification
  6. Sends predictions back to client

State Machine

stateDiagram-v2
    [*] --> Connected: WebSocket Accept
    Connected --> Idle: No Motion (15 frames)
    Idle --> Active: Motion Detected
    Active --> Buffering: Keypoints Extracted
    Buffering --> Inferring: Buffer >= 15 frames
    Inferring --> Buffering: Continue Collecting
    Inferring --> Sending: Consensus Reached
    Sending --> Buffering: Prediction Sent
    Buffering --> Idle: No Motion (15 frames)
    Active --> Idle: No Motion (15 frames)
    Idle --> [*]: Disconnect
    Active --> [*]: Disconnect
    Buffering --> [*]: Disconnect

Dependencies

import asyncio
import gc
import time
from collections import Counter, deque
import fastapi
import numpy as np
import torch
from torch import nn

Internal Imports:

  • SEQ_LEN, AR_WORDS, EN_WORDS - Sequence length and label constants
  • FrameBuffer - Shared frame buffer between producer and consumer
  • producer_handler - Background frame-reception task
  • MotionDetector - Frame-difference motion detector
  • LandmarkerProcessor, get_frame_kps - MediaPipe keypoint extraction
  • onnx_inference - ONNX inference helper
  • logger - Module logger

Constants

NUM_IDLE_FRAMES = 15        # Frames without motion before idle state
HISTORY_LEN = 5             # Number of predictions to track
HISTORY_THRESHOLD = 4       # Minimum occurrences for consensus
MIN_SIGN_FRAMES = 15        # Minimum frames for inference
MAX_SIGN_FRAMES = SEQ_LEN   # Maximum frames (50)
CONFIDENCE_THRESHOLD = 0.4  # Minimum confidence to record prediction
EXT_FRAME = ".jpg"          # Frame extension (unused)

Related:

  • SEQ_LEN - Sequence length from constants

Router

websocket_router = fastapi.APIRouter()

Exported To:

  • main.py - Included in main FastAPI app

Functions

get_default_state()

function state-management

Purpose: Creates initial state dictionary for WebSocket client.

Parameters: None

Returns: dict - Client state dictionary

Implementation:

def get_default_state():
    return {
        "is_idle": False,
        "idle_frames_num": 0,
        "sign_history": deque(maxlen=5),
        "last_sent_sign": None,
    }

State Fields:

  • is_idle (bool): Whether client is in idle state (no motion)
  • idle_frames_num (int): Counter for consecutive idle frames
  • sign_history (deque): Last 5 predictions for consensus
  • last_sent_sign (int|None): Last sign sent to client (prevents duplicates)

Called By:

  • ws_live_signs() - During per-connection state initialization


ws_live_signs(websocket: fastapi.WebSocket)

function websocket async main-handler

Type: WebSocket route handler

Route: WebSocket /live-signs

Purpose: Main WebSocket handler for real-time sign language recognition. Manages the entire processing pipeline from frame reception to prediction delivery.

Parameters:

  • websocket (fastapi.WebSocket): WebSocket connection instance

Returns: None (coroutine; runs until the client disconnects)

Decorator:

@websocket_router.websocket("/live-signs")

Implementation Overview

Flowchart

graph LR
    A[Accept Connection] --> B[Initialize State]
    B --> C[Create Frame Buffer]
    C --> D[Start Producer Task]
    D --> E[Initialize Motion Detector]
    E --> F[Initialize MediaPipe]
    F --> G{Processing Loop}
    
    G --> H{Producer Done?}
    H -->|Yes| Z[Cleanup]
    H -->|No| I{Frame Available?}
    
    I -->|No| J[Sleep 1ms]
    J --> G
    
    I -->|Yes| K[Get Frame]
    K --> L{Motion Detected?}
    
    L -->|No| M[Increment Idle Counter]
    M --> N{Idle Threshold?}
    N -->|Yes| O[Send Idle Status]
    O --> G
    N -->|No| G
    
    L -->|Yes| P[Reset Idle State]
    P --> Q[Extract Keypoints]
    Q --> R[Add to Buffer]
    R --> S{Buffer >= MIN_FRAMES?}
    
    S -->|No| G
    S -->|Yes| T[Run ONNX Inference]
    T --> U[Apply Softmax]
    U --> V{Confidence > Threshold?}
    
    V -->|Yes| W[Add to History]
    W --> X{History Consensus?}
    X -->|Yes| Y[Send Prediction]
    Y --> G
    X -->|No| G
    V -->|No| G
    
    Z --> AA[Cancel Producer]
    AA --> AB[Clear Buffer]
    AB --> AC[Close MediaPipe]
    AC --> AD[Garbage Collect]
    
    style G fill:#e1f5ff
    style T fill:#ffe1e1
    style Y fill:#e1ffe1

Processing Pipeline:

1. Connection Setup

await websocket.accept()
client_id = websocket.client
logger.info(f"Connected client: {client_id}")

Calls:

  • websocket.accept() - Accepts WebSocket connection

2. State Initialization

client_buffer = []
client_state = get_default_state()
frame_buffer = FrameBuffer(MAX_SIGN_FRAMES)
producer_task = asyncio.create_task(producer_handler(websocket, frame_buffer))
motion_detector = MotionDetector()
mp_processor: LandmarkerProcessor = await LandmarkerProcessor.create(True)

Calls:

  • get_default_state() - Creates the per-client state dictionary
  • FrameBuffer(MAX_SIGN_FRAMES) - Allocates the shared frame buffer
  • asyncio.create_task() / producer_handler() - Starts frame reception in the background
  • MotionDetector() - Initializes motion detection
  • LandmarkerProcessor.create(True) - Initializes the MediaPipe landmarker

3. Main Processing Loop

while True:
    if producer_task.done():
        break
    
    if frame_buffer.latest_idx < current_proc_idx:
        await asyncio.sleep(0.001)
        continue
    
    # Get frame and process...

Loop Logic:

  • Checks whether the producer task is still running and breaks out when it finishes
  • Sleeps 1 ms and retries when no unprocessed frame is available
  • Processes frames sequentially, advancing current_proc_idx toward frame_buffer.latest_idx (a hypothetical FrameBuffer sketch follows)
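
FrameBuffer is defined elsewhere in the codebase; the sketch below is a hypothetical reconstruction consistent with the latest_idx usage above, not the actual implementation.

from collections import deque


class FrameBuffer:
    """Bounded frame store shared between the producer task and the processing loop."""

    def __init__(self, maxlen: int):
        self._frames = deque(maxlen=maxlen)  # oldest frames are dropped automatically
        self.latest_idx = -1                 # monotonically increasing index of the newest frame

    def put(self, frame) -> None:
        self._frames.append(frame)
        self.latest_idx += 1

    def latest(self):
        return self._frames[-1]

    def clear(self) -> None:
        self._frames.clear()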

4. Motion Detection

has_motion, gray = await asyncio.to_thread(
    motion_detector.detect, prev_gray, frame
)

Calls:

  • asyncio.to_thread() - Offloads detection to a worker thread
  • motion_detector.detect() - Frame-difference motion check against the previous gray frame

Behavior:

  • If no motion for NUM_IDLE_FRAMES (15): Send idle status, clear buffers
  • If motion detected: Reset idle counter, proceed to keypoint extraction (both branches are sketched below)
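
A minimal sketch of both branches, assuming the state fields from get_default_state(); the {"status": "idle"} payload shape is an assumption, since the document only says an idle status is sent.

if not has_motion:
    client_state["idle_frames_num"] += 1
    if client_state["idle_frames_num"] >= NUM_IDLE_FRAMES and not client_state["is_idle"]:
        client_state["is_idle"] = True
        client_buffer.clear()                          # drop stale keypoints
        client_state["sign_history"].clear()           # reset the consensus window
        await websocket.send_json({"status": "idle"})  # hypothetical payload shape
else:
    client_state["idle_frames_num"] = 0
    client_state["is_idle"] = False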

5. Keypoint Extraction

kps = await get_frame_kps(mp_processor, frame, now_ms)
client_buffer.append(kps)

Calls:

  • get_frame_kps() - Extracts MediaPipe keypoints for the current frame

Buffer Management:

  • Maintains buffer between MIN_SIGN_FRAMES (15) and MAX_SIGN_FRAMES (50)
  • Trims old frames if the buffer exceeds the maximum (see the sketch below)
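
A minimal sketch of the trimming step, assuming client_buffer is the plain list created during state initialization:

if len(client_buffer) > MAX_SIGN_FRAMES:
    # Keep only the most recent MAX_SIGN_FRAMES (50) keypoint frames
    client_buffer = client_buffer[-MAX_SIGN_FRAMES:]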

6. Inference

input_kps = np.array(client_buffer, dtype=np.float32)
input_kps = input_kps.reshape(1, input_kps.shape[0], -1)
raw_outputs = await asyncio.to_thread(
    onnx_inference, websocket.app.state.onnx_model, [input_kps]
)

Calls:

  • asyncio.to_thread() - Offloads inference to a worker thread
  • onnx_inference() - Runs the ONNX session on the keypoint sequence

Input Shape: (1, seq_len, features)

Output Shape: (1, 502) - Logits for 502 classes

7. Classification

logits = torch.tensor(raw_outputs).flatten()  # (1, 502) -> (502,) so indexing is valid
probs = nn.functional.softmax(logits, dim=0)
pred_idx = int(torch.argmax(probs).item())
confidence = probs[pred_idx].item()

if confidence > CONFIDENCE_THRESHOLD:
    client_state["sign_history"].append(pred_idx)

Calls:

  • torch.nn.functional.softmax() - Converts logits to probabilities
  • torch.argmax() - Gets predicted class

Confidence Filtering:

  • Only predictions with confidence > 0.4 are added to history

8. Consensus & Response

if len(client_state["sign_history"]) == HISTORY_LEN:
    most_common_sign, sign_count = Counter(
        client_state["sign_history"]
    ).most_common(1)[0]
    if (
        sign_count >= HISTORY_THRESHOLD
        and most_common_sign != client_state["last_sent_sign"]
    ):
        client_state["last_sent_sign"] = most_common_sign
        await websocket.send_json(
            {
                "detected_sign": {
                    "sign_ar": AR_WORDS[pred_idx],
                    "sign_en": EN_WORDS[pred_idx],
                },
                "confidence": confidence,
            }
        )

Consensus Logic:

  • Waits for 5 predictions (HISTORY_LEN)
  • Requires 4/5 agreement (HISTORY_THRESHOLD)
  • Prevents duplicate sends (checks last_sent_sign)

Response Format:

{
  "detected_sign": {
    "sign_ar": "Arabic word",
    "sign_en": "English translation"
  },
  "confidence": 0.95
}

Calls:

  • websocket.send_json() - Sends prediction to client

Received By:

  • The connected client on the /live-signs WebSocket (an illustrative consumer follows)

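An illustrative consumer, using the third-party websockets package and a local server URL; both are assumptions, as the document does not specify a client.

import asyncio
import json

import websockets  # third-party client library (assumed)


async def listen():
    # Hypothetical local URL; frames would be sent as binary messages
    # over the same connection (see the producer sketch under Concurrency).
    async with websockets.connect("ws://localhost:8000/live-signs") as ws:
        async for message in ws:
            payload = json.loads(message)
            if "detected_sign" in payload:
                sign = payload["detected_sign"]
                print(f'{sign["sign_en"]} ({payload["confidence"]:.2f})')


asyncio.run(listen())
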
9. Cleanup

finally:
    logger.info(f"Cleaning up resources for {client_id}")
    
    client_buffer = None
    client_state = None
    
    producer_task.cancel()
    try:
        await producer_task
    except asyncio.CancelledError:
        ...
    
    frame_buffer.clear()
    mp_processor.close()
    
    gc.collect()

Calls:

  • producer_task.cancel() - Stops the frame-reception task
  • frame_buffer.clear() - Releases buffered frames
  • mp_processor.close() - Releases MediaPipe resources
  • gc.collect() - Forces garbage collection

Exception Handling:

  • WebSocketDisconnect: Client disconnected normally
  • Generic Exception: Logged per frame; the loop continues, and the finally block still runs cleanup (see the combined skeleton below)
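
Putting the three handlers together, a skeleton consistent with the snippets above; the exact nesting is an assumption, not the verbatim source.

try:
    await websocket.accept()
    # ... setup (sections 1-2) ...
    while True:
        try:
            ...  # per-frame processing (sections 3-8)
        except Exception as e:
            logger.error(f"Error detecting sign: {e}")
            continue  # skip the bad frame, keep serving the client
except fastapi.WebSocketDisconnect:
    logger.info(f"Disconnected client (consumer): {client_id}")
finally:
    ...  # cleanup (section 9)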


Performance Characteristics

Timing

  • Frame Reception: Async, non-blocking
  • Motion Detection: ~5-10ms per frame
  • Keypoint Extraction: ~20-30ms per frame
  • ONNX Inference: ~10-20ms per sequence
  • Total Latency: ~50-80ms per prediction

Memory

  • Frame Buffer: ~50 frames × 640×480×3 bytes ≈ 46 MB
  • Keypoint Buffer: ~50 frames × 184 × 4 × 4 bytes ≈ 147 KB
  • MediaPipe Models: ~10 MB (loaded once)
  • ONNX Model: ~5 MB (loaded once)

Concurrency

  • Producer Task: Runs in parallel with the processing loop (a sketch follows this list)
  • Keypoint Extraction: Runs in thread pool executor
  • Inference: Runs in thread pool executor
  • Motion Detection: Runs in thread pool executor
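
producer_handler's body is not shown in this document; a plausible sketch, assuming JPEG-encoded binary frames and OpenCV decoding (both assumptions).

import cv2
import numpy as np


async def producer_handler(websocket, frame_buffer):
    # Receive frames as fast as the client sends them,
    # decoupled from the slower processing loop.
    while True:
        data = await websocket.receive_bytes()  # assumed binary JPEG payload
        frame = cv2.imdecode(np.frombuffer(data, np.uint8), cv2.IMREAD_COLOR)
        if frame is not None:
            frame_buffer.put(frame)  # put() as in the hypothetical FrameBuffer sketch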

Error Handling

Connection Errors

except fastapi.WebSocketDisconnect:
    logger.info(f"Disconnected client (consumer): {client_id}")

Processing Errors

except Exception as e:
    logger.error(f"Error detecting sign: {e}")
    continue  # Continue processing

Cleanup Errors

try:
    await producer_task
except asyncio.CancelledError:
    ...  # Expected when cancelling


File Location: src/api/websocket.py

Lines of Code: 182

Last Updated: 2026-01-27