live_processing.py
source api real-time async inference
File Path: src/api/live_processing.py
Purpose: Orchestrates real-time video frame ingestion, motion detection, keypoint extraction, and sign language inference.
Overview
This module implements a dual-handler architecture (Producer/Consumer) using asyncio.Queue to process video streams from WebSockets. It handles:
- Frame Ingestion: Decoding binary bytes from WebSockets into OpenCV images.
- Motion Detection: Skipping static frames to save compute.
- Feature Extraction: Concurrent MediaPipe landmarking.
- Inference Orchestration: Sliding window based classification using ONNX models.
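The producer/consumer split can be illustrated with a minimal, dependency-free sketch. Everything here is an assumption made for illustration: the function names, the `None` sentinel, and the use of frame length as a stand-in for real processing are not taken from the module.

```python
import asyncio


async def producer(queue: asyncio.Queue, frames: list[bytes]) -> None:
    """Push incoming frames into the shared queue, then signal completion."""
    for frame in frames:
        await queue.put(frame)
    # Sentinel marking the end of the stream (an assumption of this sketch).
    await queue.put(None)


async def consumer(queue: asyncio.Queue, results: list[int]) -> None:
    """Pull frames until the sentinel arrives."""
    while True:
        frame = await queue.get()
        if frame is None:
            break
        # Stand-in for motion detection, landmarking, and inference.
        results.append(len(frame))


async def run_pipeline(frames: list[bytes]) -> list[int]:
    """Run both handlers concurrently over one bounded queue."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=4)
    results: list[int] = []
    await asyncio.gather(producer(queue, frames), consumer(queue, results))
    return results


if __name__ == "__main__":
    print(asyncio.run(run_pipeline([b"ab", b"cde"])))
```

Because the two coroutines share only the queue, a slow consumer never blocks the socket read loop for longer than it takes to enqueue a frame.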
Handlers
producer_handler(client_id, websocket, buffer)
Purpose: Receives raw bytes from the client and decodes them.
- Protocol: Receives `bytes`. The first byte is `draw_mode` (toggle); the remaining bytes are the JPEG-encoded frame.
- Decoding: Uses `cv2.imdecode` via `asyncio.to_thread`.
- Queueing: Drops the oldest frames if the buffer is full to maintain low latency.
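As a rough sketch of the message layout and the drop-oldest policy described above: `split_message` and `put_drop_oldest` are hypothetical helper names, and treating any non-zero first byte as "draw mode on" is an assumption. The JPEG payload would then go to `cv2.imdecode` via `asyncio.to_thread`; that step is omitted to keep the example free of third-party dependencies.

```python
import asyncio


def split_message(data: bytes) -> tuple[bool, bytes]:
    """Split one WebSocket message into (draw_mode, jpeg_bytes)."""
    if len(data) < 2:
        raise ValueError("message needs a mode byte and a frame payload")
    # Assumption: a non-zero first byte means draw mode is enabled.
    return bool(data[0]), data[1:]


def put_drop_oldest(queue: asyncio.Queue, item) -> None:
    """Enqueue without blocking, discarding the oldest entry when full."""
    if queue.full():
        try:
            queue.get_nowait()
        except asyncio.QueueEmpty:
            pass
    queue.put_nowait(item)


async def demo() -> list[bytes]:
    """Feed three messages into a two-slot buffer and return what survives."""
    buffer: asyncio.Queue = asyncio.Queue(maxsize=2)
    for message in (b"\x01first", b"\x00second", b"\x01third"):
        _draw_mode, jpeg_bytes = split_message(message)
        put_drop_oldest(buffer, jpeg_bytes)
    return [buffer.get_nowait() for _ in range(buffer.qsize())]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Dropping from the head of the queue keeps the consumer working on the most recent frames, which trades completeness for latency.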
consumer_handler(client_id, websocket, buffer)
Purpose: The main processing engine for each client connection.
- Initialization: Creates `LandmarkerProcessor` asynchronously.
- Workflow:
  - Polls frames from the buffer.
  - Performs motion detection via `MotionDetector`.
  - If motion detected:
    - Extracts keypoints via `get_frame_kps`.
    - Updates `client_buffer` (size: `MAX_SIGN_FRAMES`).
  - If buffer meets `MIN_SIGN_FRAMES`:
    - Runs `onnx_inference`.
    - Applies Softmax and checks `CONFIDENCE_THRESHOLD`.
    - Uses a `sign_history` deque and `HISTORY_THRESHOLD` to stabilize predictions.
  - Sends JSON response (landmarks, detected sign, confidence).
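The post-inference steps in the workflow above might look roughly like the following. This is a sketch under stated assumptions, not the module's code: `stable_prediction` is a hypothetical helper, the history length of 5 is invented, and "stabilize" is interpreted as "emit a label once it appears at least `HISTORY_THRESHOLD` times in the recent history". The constant values are taken from the configuration table.

```python
import math
from collections import Counter, deque

MIN_SIGN_FRAMES = 15
MAX_SIGN_FRAMES = 50
CONFIDENCE_THRESHOLD = 0.7
HISTORY_THRESHOLD = 2


def softmax(logits: list[float]) -> list[float]:
    """Numerically stable softmax over raw model outputs."""
    peak = max(logits)
    exps = [math.exp(x - peak) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def stable_prediction(logits, labels, history):
    """Return (label, confidence) once a confident label repeats, else None."""
    probs = softmax(logits)
    best = max(range(len(probs)), key=probs.__getitem__)
    if probs[best] < CONFIDENCE_THRESHOLD:
        return None  # Low-confidence results never enter the history.
    history.append(labels[best])
    if Counter(history)[labels[best]] >= HISTORY_THRESHOLD:
        return labels[best], probs[best]
    return None


if __name__ == "__main__":
    # Sliding window: a bounded deque discards the oldest keypoints itself.
    client_buffer = deque(maxlen=MAX_SIGN_FRAMES)
    for frame_index in range(60):
        client_buffer.append(frame_index)
    print(len(client_buffer) >= MIN_SIGN_FRAMES, client_buffer[0])

    sign_history = deque(maxlen=5)  # Hypothetical history length.
    labels = ["hello", "thanks", "yes"]
    print(stable_prediction([4.0, 0.0, 0.0], labels, sign_history))
    print(stable_prediction([4.0, 0.0, 0.0], labels, sign_history))
```

Requiring a repeat before emitting a sign suppresses one-off misclassifications at the cost of a short delay before the first output.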
Core Functions
get_frame_kps(mp_processor, frame, timestamp_ms=-1)
Purpose: Wrapper for thread-pool based keypoint extraction.
- Returns: `tuple(adjusted_kps, raw_kps)`.
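A wrapper of this shape could be sketched as follows. The interface of `LandmarkerProcessor` is not documented here, so `StubProcessor` and its `extract` method are hypothetical stand-ins, and the "adjusted" keypoints (shifted relative to the first point) are invented purely to make the example runnable. Using `asyncio.to_thread` for the offload is also an assumption.

```python
import asyncio


class StubProcessor:
    """Hypothetical stand-in for LandmarkerProcessor."""

    def extract(self, frame, timestamp_ms):
        # Treat the frame as a list of (x, y) points for illustration only.
        raw = list(frame)
        x0, y0 = raw[0]
        adjusted = [(x - x0, y - y0) for x, y in raw]
        return adjusted, raw


async def get_frame_kps_sketch(mp_processor, frame, timestamp_ms=-1):
    """Run the blocking extraction in a worker thread.

    Returns a tuple (adjusted_kps, raw_kps).
    """
    return await asyncio.to_thread(mp_processor.extract, frame, timestamp_ms)


if __name__ == "__main__":
    points = [(2, 3), (5, 7)]
    print(asyncio.run(get_frame_kps_sketch(StubProcessor(), points)))
```

Offloading the call keeps the event loop free to service other clients while landmarking runs.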
Constants & Configuration
| Constant | Value | Description |
|---|---|---|
| `NUM_IDLE_FRAMES` | 15 | Frames before switching back to idle state. |
| `MIN_SIGN_FRAMES` | 15 | Minimum frames required for inference. |
| `MAX_SIGN_FRAMES` | 50 | Maximum temporal window (matches `SEQ_LEN`). |
| `CONFIDENCE_THRESHOLD` | 0.7 | Minimum probability for valid detection. |
| `HISTORY_THRESHOLD` | 2 | Required repetitions in history for output. |
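One plausible reading of `NUM_IDLE_FRAMES` is a counter of consecutive motionless frames that returns the stream to idle once it reaches the limit. The sketch below encodes that reading; `IdleTracker` is a hypothetical class, and the module's actual state handling may differ.

```python
NUM_IDLE_FRAMES = 15


class IdleTracker:
    """Tracks whether a stream is idle from per-frame motion flags."""

    def __init__(self, idle_after: int = NUM_IDLE_FRAMES) -> None:
        self.idle_after = idle_after
        self.still_frames = 0
        self.idle = True  # Assumption: a new connection starts idle.

    def update(self, motion: bool) -> bool:
        """Record one frame and return True while the stream is idle."""
        if motion:
            self.still_frames = 0
            self.idle = False
        else:
            self.still_frames += 1
            if self.still_frames >= self.idle_after:
                self.idle = True
        return self.idle


if __name__ == "__main__":
    tracker = IdleTracker()
    tracker.update(True)  # Motion starts a signing segment.
    states = [tracker.update(False) for _ in range(NUM_IDLE_FRAMES)]
    print(states[-2], states[-1])
```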
Related Documentation
Depends On:
- mediapipe_utils.py - `LandmarkerProcessor`.
- cv2_utils.py - `MotionDetector`.
- model.py - `onnx_inference`.
Used By:
- websocket.py - Spawns the handlers.