doc/source/serve/tutorials/video-analysis/README.ipynb
This notebook demonstrates how to build a production-grade video analysis pipeline with Ray Serve. The pipeline processes videos from S3 and extracts content tags, a best-matching caption, and scene-change timestamps for each video.
The system uses SigLIP (google/siglip-so400m-patch14-384) as the vision-language backbone. SigLIP provides a unified embedding space for both images and text, enabling zero-shot classification and retrieval without task-specific fine-tuning.
The pipeline splits work across three Ray Serve deployments, each optimized for its workload:
                              ┌─────────────────────┐
                         ┌───▶│ VideoEncoder        │
                         │    │ (GPU)               │
                         │    ├─────────────────────┤
┌─────────────────────┐  │    │ • SigLIP embedding  │
│ VideoAnalyzer       │──┤    │ • 16 frames/chunk   │
│ (Ingress)           │  │    │ • L2 normalization  │
├─────────────────────┤  │    └─────────────────────┘
│ • S3 download       │  │       num_gpus=1
│ • FFmpeg chunking   │  │
│ • Request routing   │  │    ┌─────────────────────┐
└─────────────────────┘  └───▶│ MultiDecoder        │
   num_cpus=6                 │ (CPU)               │
                              ├─────────────────────┤
                              │ • Tag classification│
                              │ • Caption retrieval │
                              │ • Scene detection   │
                              └─────────────────────┘
                                 num_cpus=1
Request lifecycle:
1. VideoAnalyzer receives an HTTP request with an S3 video URI
2. VideoAnalyzer downloads the video, splits it into chunks with FFmpeg, and sends the chunks to VideoEncoder concurrently
3. VideoAnalyzer sends the resulting embeddings to MultiDecoder serially (for EMA state continuity)

This pipeline has three distinct workloads: a GPU-bound encoder running SigLIP, a CPU-bound decoder doing cosine similarity, and a CPU-heavy ingress running FFmpeg. Traditional serving frameworks force you to bundle these into a single container with fixed resources, wasting GPU when the decoder runs or starving FFmpeg when the encoder dominates.
Ray Serve solves this with heterogeneous resource allocation per deployment. The encoder requests 1 GPU, the decoder requests 1 CPU, and the ingress requests 6 CPUs for parallel FFmpeg. Each deployment scales independently based on its own queue depth—GPU replicas scale with encoding demand while CPU replicas scale separately with decoding demand. The load test demonstrates this: throughput scales near-linearly from 2.4 to 67.5 requests/second as the system provisions replicas to match load.
The pipeline also benefits from zero-copy data transfer. The ingress passes encoder results directly to the decoder as unawaited DeploymentResponse references rather than serialized data. Ray stores the embeddings in its object store, and the decoder retrieves them directly without routing through the ingress. When encoder and decoder land on the same node, this transfer is zero-copy.
Request pipelining keeps the GPU saturated. By allowing two concurrent requests per encoder replica via max_ongoing_requests, one request prepares data on CPU while another computes on GPU. This achieves 100% GPU utilization without batching, which would add latency from waiting for requests to accumulate.
Finally, deployment composition lets you define the encoder, decoder, and ingress as separate classes, then wire them together with .bind(). Ray Serve handles deployment ordering, health checks, and request routing. The ingress maintains explicit state (EMA for scene detection) across chunks, which works correctly even when autoscaling routes requests to different decoder replicas—no sticky sessions required.
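To make the configuration concrete, here is a minimal sketch of how these choices map onto Ray Serve decorators. The class names, method signatures, and autoscaling bounds are illustrative; the real definitions live in deployments/encoder.py, deployments/decoder.py, and app.py and are shown later in this notebook.

```python
from ray import serve

@serve.deployment(
    ray_actor_options={"num_gpus": 1},            # dedicated GPU per encoder replica
    max_ongoing_requests=2,                       # overlap CPU prep with GPU compute
    autoscaling_config={"min_replicas": 1, "max_replicas": 4},
)
class Encoder:
    async def encode_frames(self, frames):
        ...

@serve.deployment(
    ray_actor_options={"num_cpus": 1},            # lightweight numpy work
    autoscaling_config={"min_replicas": 1, "max_replicas": 8},
)
class Decoder:
    async def decode(self, embeddings, ema_state=None):
        ...

@serve.deployment(ray_actor_options={"num_cpus": 6})  # headroom for parallel FFmpeg
class Ingress:
    def __init__(self, encoder, decoder):
        self.encoder, self.decoder = encoder, decoder

# Composition: .bind() wires the graph; Serve handles deployment ordering,
# health checks, and request routing.
app = Ingress.bind(encoder=Encoder.bind(), decoder=Decoder.bind())
```

Calling serve.run(app) would then deploy all three as a single application, which is exactly what this notebook does below with the real classes.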
Before running this notebook, ensure you have:
| Requirement | Purpose |
|---|---|
| Pexels API key | Download sample video (free at https://www.pexels.com/api/) |
| S3 bucket | Store videos and text embeddings |
| AWS credentials | Read/write access to your S3 bucket |
| ffmpeg | Video processing and frame extraction |
| GPU | Run SigLIP model for encoding (1 GPU minimum) |
Set these environment variables before running:
export PEXELS_API_KEY="your-pexels-api-key"
export S3_BUCKET="your-bucket-name"
export AWS_ACCESS_KEY_ID="..."
export AWS_SECRET_ACCESS_KEY="..."
Note on GPU type: The benchmarks, design choices, and hyperparameters in this notebook were tuned for NVIDIA L4 GPUs. Different GPU types (A10G, T4, A100, etc.) have different memory bandwidth, compute throughput, and batch characteristics. You may need to adjust
max_ongoing_requests, chunk sizes, and concurrency limits for optimal performance on other hardware.
import os
PEXELS_API_KEY = os.environ.get("PEXELS_API_KEY") # Or set directly: "your-api-key"
S3_BUCKET = os.environ.get("S3_BUCKET") # Or set directly: "your-bucket"
Before running the pipeline, we need a sample video in S3. This section downloads a video from Pexels, normalizes it, and uploads it to S3.
Why normalize videos? We re-encode all videos to a consistent format before uploading, so the pipeline sees uniform input regardless of the source encoding.
import asyncio
# Ensure an event loop exists for the run_until_complete call below.
try:
    asyncio.get_running_loop()
except RuntimeError:
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
from scripts.download_stock_videos import download_sample_videos
# Download sample videos (checks for existing manifest first, skips Pexels API if found)
S3_PREFIX = "anyscale-example/stock-videos/"
video_paths = asyncio.get_event_loop().run_until_complete(download_sample_videos(
    api_key=PEXELS_API_KEY,
    bucket=S3_BUCKET,
    total=1,  # Just need one sample video
    s3_prefix=S3_PREFIX,
    overwrite=False,
))
if not video_paths:
    raise RuntimeError("No videos downloaded")
SAMPLE_VIDEO_URI = video_paths[0]
print(f"\nSample video ready: {SAMPLE_VIDEO_URI}")
The decoder matches video embeddings against precomputed text embeddings for tags and descriptions. We define the text banks here and use a Ray task to compute embeddings on GPU and upload to S3.
from textbanks import TAGS, DESCRIPTIONS
print(f"Tags: {TAGS}")
print(f"Descriptions: {DESCRIPTIONS}")
import ray
from jobs.generate_text_embeddings import generate_embeddings_task
S3_EMBEDDINGS_PREFIX = "anyscale-example/embeddings/"
# Run the Ray task (uses TAGS and DESCRIPTIONS from textbanks module)
# Note: runtime_env ships local modules to worker nodes (job working_dir only applies to driver)
ray.init(runtime_env={"working_dir": "."}, ignore_reinit_error=True)
result = ray.get(generate_embeddings_task.remote(S3_BUCKET, S3_EMBEDDINGS_PREFIX))
print(f"\nText embeddings ready:")
print(f" Tags: {result['tag_embeddings']['s3_uri']}")
print(f" Descriptions: {result['description_embeddings']['s3_uri']}")
This section walks through building the video analysis pipeline step by step, introducing Ray Serve concepts as we go.
The VideoEncoder deployment runs on GPU and converts video frames to embeddings using SigLIP. Key configuration:
- num_gpus=1: Each replica requires a dedicated GPU
- max_ongoing_requests=2: Allows pipelining; while one request computes on GPU, another prepares data on CPU

Why no request batching? A single chunk (16 frames @ 384×384) already saturates GPU compute. Batching multiple requests would require holding them until a batch forms, adding latency without throughput gain. Instead, we use max_ongoing_requests=2 to pipeline preparation and computation.
Why asyncio.to_thread? Ray Serve deployments run in an async event loop. The encode_frames method is CPU/GPU-bound (PyTorch inference), which would block the event loop and prevent concurrent request handling. Wrapping it in asyncio.to_thread offloads the blocking work to a thread pool, keeping the event loop free to accept new requests.
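Here is a self-contained sketch of that pattern. The blocking function stands in for the real SigLIP forward pass; only the asyncio.to_thread call mirrors the actual deployment.

```python
import asyncio
import time

def blocking_inference(frames):
    # Stand-in for the SigLIP forward pass: CPU/GPU-bound work that would
    # block the replica's event loop if called directly from an async handler.
    time.sleep(0.1)
    return [0.0] * len(frames)

class EncoderSketch:
    async def encode_frames(self, frames):
        # Offload the blocking call to a worker thread so the event loop stays
        # free to accept (and prepare) the next request while this one computes.
        return await asyncio.to_thread(blocking_inference, frames)
```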
Why pass DeploymentResponse to decoder? Instead of awaiting the encoder result in VideoAnalyzer and passing raw data to the decoder, we pass the unawaited DeploymentResponse directly. Ray Serve automatically resolves this reference when the decoder needs it, storing the embeddings in the object store. This avoids an unnecessary serialize/deserialize round-trip through VideoAnalyzer—the decoder retrieves data directly from the object store, enabling zero-copy transfer if encoder and decoder are on the same node.
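A sketch of the call pattern, with illustrative handle and method names (the real ingress is shown in a later section):

```python
from ray import serve

@serve.deployment
class AnalyzerSketch:
    def __init__(self, encoder, decoder):
        self.encoder = encoder  # DeploymentHandle to the encoder
        self.decoder = decoder  # DeploymentHandle to the decoder

    async def analyze_chunk(self, chunk):
        # Don't await the encoder: keep the DeploymentResponse as a reference.
        embeddings_ref = self.encoder.encode_frames.remote(chunk)
        # Serve resolves the reference when the decoder needs it, pulling the
        # embeddings from the Ray object store instead of through this replica.
        return await self.decoder.decode.remote(embeddings_ref)
```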
from constants import MODEL_NAME
print(f"MODEL_NAME: {MODEL_NAME}")
from deployments.encoder import VideoEncoder
import inspect
print(inspect.getsource(VideoEncoder.func_or_class))
The MultiDecoder deployment runs on CPU and performs three tasks:
- Tag classification
- Caption retrieval
- Scene detection
The decoder loads precomputed text embeddings from S3 at startup.
Why separate GPU/CPU deployments? The encoder needs GPU for neural network inference; the decoder only does numpy dot products. Separating them allows independent scaling—encoders are limited by GPU count, decoders scale cheaply with CPU cores. This avoids tying expensive GPU resources to lightweight CPU work.
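As a sketch of what this CPU work amounts to, the snippet below scores tags with a plain dot product, assuming both the video embedding and the tag bank are L2-normalized. The function and argument names are illustrative, not the decoder's actual API.

```python
import numpy as np

def score_tags(video_embedding, tag_embeddings, tag_names, top_k=5):
    # Both embedding sets are L2-normalized, so a dot product is the cosine
    # similarity; argsort picks the highest-scoring tags.
    scores = tag_embeddings @ video_embedding          # shape: (num_tags,)
    top = np.argsort(scores)[::-1][:top_k]
    return [(tag_names[i], float(scores[i])) for i in top]
```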
Why EMA for scene detection? Exponential Moving Average reuses existing SigLIP embeddings without an extra model. The algorithm computes score = 1 - cosine(frame, EMA) where EMA updates as ema = 0.9*ema + 0.1*frame. A simple threshold (score > 0.15) detects abrupt scene changes while smoothing noise.
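A minimal sketch of that update rule, assuming L2-normalized frame embeddings; the real thresholding and state handling live in deployments/decoder.py:

```python
import numpy as np

def scene_change_score(frame_embedding, ema, alpha=0.1):
    # score = 1 - cosine(frame, EMA); ema = 0.9*ema + 0.1*frame (alpha = 0.1).
    # frame_embedding is assumed L2-normalized; the EMA is re-normalized for the cosine.
    if ema is None:
        return 0.0, frame_embedding.copy()             # first frame: no change signal
    score = 1.0 - float(np.dot(frame_embedding, ema / np.linalg.norm(ema)))
    ema = (1.0 - alpha) * ema + alpha * frame_embedding
    return score, ema                                  # flag a scene change when score > 0.15
```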
from deployments.decoder import MultiDecoder
print(inspect.getsource(MultiDecoder.func_or_class))
Before we can process a video, we need to split it into fixed-duration chunks and extract frames. The chunking process:
- Probes the video's duration with ffprobe to compute chunk boundaries
- Extracts frames with ffmpeg processes
- Bounds concurrency with an asyncio.Semaphore to avoid CPU oversubscription

Each chunk extracts 16 frames uniformly sampled across its duration, resized to 384×384 (matching SigLIP's input size).
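For illustration, here is a sketch of uniform sampling within one chunk; the repository's chunker may align sample times slightly differently:

```python
def frame_timestamps(chunk_start: float, chunk_duration: float, num_frames: int = 16):
    # Evenly spaced sample times (frame centers) within one chunk.
    step = chunk_duration / num_frames
    return [chunk_start + (i + 0.5) * step for i in range(num_frames)]

# A 10 s chunk starting at 20 s samples every 0.625 s (~1.6 fps): 20.3125 ... 29.6875 s.
print(frame_timestamps(20.0, 10.0, 16))
```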
Direct S3 download vs presigned URLs: We download the video to local disk before processing. An alternative is generating a presigned S3 URL and passing it directly to FFmpeg. Benchmarks show direct download is faster—FFmpeg's HTTP client doesn't handle S3's chunked responses as efficiently as aioboto3, and network latency compounds across multiple seeks.
Single FFmpeg vs parallel FFmpeg: Two approaches for extracting frames from multiple chunks:
- Single FFmpeg: one pass over the whole video that uses a select filter to pick frames at specific timestamps
- Parallel FFmpeg: a separate ffmpeg process per chunk, each extracting its own frames

We chose the single-FFmpeg approach: it outperforms parallel FFmpeg on longer videos, yields similar performance for typical 10-second chunks, and stays efficient as the chunk count grows.
Chunk duration: We use 10-second chunks. Shorter chunks increase overhead (more FFmpeg calls, more encoder/decoder round-trips). Longer chunks increase processing efficiency but degrade inference quality—SigLIP processes exactly 16 frames per chunk, so a 60-second chunk samples one frame every 3.75 seconds, missing fast scene changes. The 10-second sweet spot balances throughput with temporal resolution (~1.6 fps sampling).
The VideoAnalyzer ingress deployment orchestrates the encoder and decoder. It uses FastAPI integration with @serve.ingress for HTTP endpoints.
Why num_cpus=6? The analyzer runs FFmpeg for frame extraction. Each FFmpeg process uses FFMPEG_THREADS=2, and we run up to NUM_WORKERS=3 concurrent processes. So 2 × 3 = 6 CPUs ensures FFmpeg doesn't contend for CPU during parallel chunk extraction.
Why max_ongoing_requests=4? The encoder has max_ongoing_requests=2. We want the analyzer to stay ahead: while 2 videos are encoding, we download and chunk 2 more videos so they're ready when encoder slots free up. This keeps the GPU pipeline saturated without excessive memory from queued requests.
Why cache the S3 client? Creating a new aioboto3 client per request adds overhead (connection setup, credential resolution). Caching the client in __init__ and reusing it across requests amortizes this cost. The client is thread-safe and handles connection pooling internally.
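A sketch of the caching pattern with aioboto3: the exit stack keeps the client's context open for the replica's lifetime. Class and attribute names here are illustrative.

```python
import contextlib
import aioboto3

class S3ClientCacheSketch:
    def __init__(self):
        self._session = aioboto3.Session()
        self._exit_stack = contextlib.AsyncExitStack()
        self._s3 = None

    async def _get_s3(self):
        # Create the S3 client once and keep its context open; subsequent
        # requests reuse the same client and its pooled connections.
        if self._s3 is None:
            self._s3 = await self._exit_stack.enter_async_context(
                self._session.client("s3")
            )
        return self._s3
```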
Why encode in parallel but decode serially? Encoding is stateless—each chunk's frames go through SigLIP independently, so we fire all chunks concurrently with asyncio.gather. Decoding requires temporal ordering—the EMA for scene detection must process chunks in order (chunk 0's EMA state feeds into chunk 1). The VideoAnalyzer calls the decoder serially, passing EMA state from each response to the next request. This explicit state passing ensures correct scene detection even when multiple decoder replicas exist under autoscaling.
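A condensed sketch of that orchestration, with illustrative names (the ema_state request/response field is an assumption for this example):

```python
class OrchestrationSketch:
    def __init__(self, encoder, decoder):
        self.encoder, self.decoder = encoder, decoder  # DeploymentHandles

    async def analyze(self, chunks):
        # Encoding is stateless: send every chunk to the encoder up front so
        # the GPU replicas can work on them concurrently.
        embedding_refs = [self.encoder.encode_frames.remote(c) for c in chunks]

        # Decoding is stateful: walk the chunks in temporal order, threading
        # the EMA state from each response into the next request.
        results, ema_state = [], None
        for ref in embedding_refs:
            result = await self.decoder.decode.remote(ref, ema_state=ema_state)
            ema_state = result["ema_state"]    # illustrative response field
            results.append(result)
        return results
```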
from app import VideoAnalyzer
print(inspect.getsource(VideoAnalyzer.func_or_class))
encoder = VideoEncoder.bind()
decoder = MultiDecoder.bind(bucket=S3_BUCKET, s3_prefix=S3_EMBEDDINGS_PREFIX)
app = VideoAnalyzer.bind(encoder=encoder, decoder=decoder)
from ray import serve
serve.run(app, name="video-analyzer", route_prefix="/", blocking=False)
import httpx
import time
# Wait for deployment to be ready
print("Waiting for deployment to be healthy...")
start = time.time()
while True:
    try:
        with httpx.Client(timeout=5.0) as client:
            response = client.get("http://localhost:8000/health")
            if response.status_code == 200:
                print(f"Deployment ready in {time.time() - start:.1f}s")
                break
    except httpx.RequestError:
        pass
    time.sleep(1.0)
    if time.time() - start > 120:
        raise TimeoutError("Deployment did not become healthy within 120s")
import httpx
import time
import uuid
# Send a sample request to the deployed service
payload = {
    "stream_id": uuid.uuid4().hex[:8],
    "video_path": SAMPLE_VIDEO_URI,
    "num_frames": 16,
    "chunk_duration": 10.0,
}
print(f"Analyzing video: {SAMPLE_VIDEO_URI}")
print(f"Stream ID: {payload['stream_id']}\n")
start = time.perf_counter()
with httpx.Client(timeout=300.0) as client:
    response = client.post("http://localhost:8000/analyze", json=payload)
latency_ms = (time.perf_counter() - start) * 1000
result = response.json()
# Print results
print("=" * 60)
print(f"Video duration: {result['video_duration']:.1f}s")
print(f"Chunks processed: {result['num_chunks']}")
print()
print("Top Tags:")
for tag in result["tags"]:
    print(f" {tag['score']:.3f} {tag['text']}")
print()
print("Best Caption:")
print(f" {result['retrieval_caption']['score']:.3f} {result['retrieval_caption']['text']}")
print()
print(f"Scene Changes: {result['num_scene_changes']}")
for sc in result["scene_changes"][:5]:  # Show first 5
    print(f" {sc['timestamp']:6.2f}s score={sc['score']:.3f}")
print()
print("Timing:")
timing = result["timing_ms"]
print(f" S3 download: {timing['s3_download_ms']:7.1f} ms")
print(f" Video decode: {timing['decode_video_ms']:7.1f} ms")
print(f" Encode (GPU): {timing['encode_ms']:7.1f} ms")
print(f" Decode (CPU): {timing['decode_ms']:7.1f} ms")
print(f" Total server: {timing['total_ms']:7.1f} ms")
print(f" Round-trip: {latency_ms:7.1f} ms")
print("=" * 60)
serve.shutdown()
To evaluate the pipeline's performance under realistic conditions, we ran load tests using the client/load_test.py script with concurrency levels ranging from 2 to 64 concurrent requests. The Ray Serve application was configured with autoscaling enabled, allowing replicas to scale dynamically based on demand.
The load test was executed using:
python -m client.load_test --video s3://bucket/video.mp4 --concurrency <N>
Test parameters:
- Autoscaling enabled for each deployment (replicas scale to hold the target queue depth, target_num_ongoing_requests)

The charts below show autoscaling in action during the concurrency=2 to 64 load test:
As load increases, replicas scale up to maintain target queue depth. The system reaches steady state once enough replicas are provisioned to handle the request rate.
To ensure fair comparison, we discarded the first half of each test run to exclude the autoscaling warm-up period where latencies are elevated due to replica initialization.
| Concurrency | Requests | P50 (ms) | P95 (ms) | P99 (ms) | Throughput (req/s) |
|---|---|---|---|---|---|
| 2 | 152 | 838 | 843 | 844 | 2.37 |
| 4 | 590 | 858 | 984 | 1,025 | 4.68 |
| 8 | 1,103 | 885 | 1,065 | 1,120 | 8.97 |
| 16 | 2,240 | 900 | 1,074 | 1,128 | 17.78 |
| 32 | 4,141 | 928 | 1,095 | 1,151 | 34.75 |
| 64 | 17,283 | 967 | 1,128 | 1,188 | 67.55 |
Key findings:
The chart below shows how processing time is distributed across pipeline stages at each concurrency level:
At concurrency 64 (the best-throughput configuration), the CPU decoding stage (tag/caption generation) dominates processing time, while S3 download and video decoding are relatively fast due to local caching and optimized FFmpeg settings.
The charts below show throughput scaling and the latency-throughput tradeoff:
Observations:
Linear scaling: Throughput scales almost linearly with concurrency, indicating that autoscaling successfully provisions enough replicas to handle increased load.
Latency-throughput tradeoff: The right chart shows that P95 latency increases slightly as throughput grows (from ~843ms to ~1,128ms), but remains within acceptable bounds. This ~34% latency increase enables a ~28x throughput improvement.
No saturation point: Even at concurrency 64, throughput continues to scale. The system could likely handle higher concurrency with additional GPU resources.
The near-linear scaling demonstrates that the pipeline architecture—with separate GPU encoder, CPU decoder, and CPU-bound ingress—allows each component to scale independently based on its resource requirements.