Video analysis inference pipeline with Ray Serve

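This notebook demonstrates how to build a production-grade video analysis pipeline with Ray Serve. The pipeline processes videos from S3 and extracts:

  • Tags: zero-shot classification against precomputed tag embeddings
  • Captions: the best-matching description retrieved from a text bank
  • Scene changes: EMA-based detection of abrupt changes between frames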

The system uses SigLIP (google/siglip-so400m-patch14-384) as the vision-language backbone. SigLIP provides a unified embedding space for both images and text, enabling zero-shot classification and retrieval without task-specific fine-tuning.

Architecture

The pipeline splits work across three Ray Serve deployments, each optimized for its workload:

                              ┌─────────────────────┐
                         ┌───▶│  VideoEncoder       │
                         │    │      (GPU)          │
                         │    ├─────────────────────┤
┌─────────────────────┐  │    │ • SigLIP embedding  │
│  VideoAnalyzer      │──┤    │ • 16 frames/chunk   │
│     (Ingress)       │  │    │ • L2 normalization  │
├─────────────────────┤  │    └─────────────────────┘
│ • S3 download       │  │          num_gpus=1
│ • FFmpeg chunking   │  │
│ • Request routing   │  │    ┌─────────────────────┐
└─────────────────────┘  └───▶│  MultiDecoder       │
      num_cpus=6              │      (CPU)          │
                              ├─────────────────────┤
                              │ • Tag classification│
                              │ • Caption retrieval │
                              │ • Scene detection   │
                              └─────────────────────┘
                                    num_cpus=1

Request lifecycle:

  1. VideoAnalyzer receives HTTP request with S3 video URI
  2. Downloads video from S3, splits into fixed-duration chunks using FFmpeg
  3. Sends all chunks to VideoEncoder concurrently
  4. Encoder returns embedding references (stored in Ray object store)
  5. VideoAnalyzer sends embeddings to MultiDecoder serially (for EMA state continuity)
  6. Aggregates results and returns tags, captions, and scene changes

Why Ray Serve?

This pipeline has three distinct workloads: a GPU-bound encoder running SigLIP, a CPU-bound decoder doing cosine similarity, and a CPU-heavy ingress running FFmpeg. Traditional serving frameworks force you to bundle these into a single container with fixed resources, wasting GPU when the decoder runs or starving FFmpeg when the encoder dominates.

Ray Serve solves this with heterogeneous resource allocation per deployment. The encoder requests 1 GPU, the decoder requests 1 CPU, and the ingress requests 6 CPUs for parallel FFmpeg. Each deployment scales independently based on its own queue depth: GPU replicas scale with encoding demand, while CPU replicas scale separately with decoding demand. The load test demonstrates this: throughput scales near-linearly from 2.37 to 67.55 requests/second as concurrency rises from 2 to 64 and the system provisions replicas to match load.

The pipeline also benefits from zero-copy data transfer. The ingress passes encoder results directly to the decoder as unawaited DeploymentResponse references rather than serialized data. Ray stores the embeddings in its object store, and the decoder retrieves them directly without routing through the ingress. When encoder and decoder land on the same node, this transfer is zero-copy.

Request pipelining keeps the GPU saturated. By allowing two concurrent requests per encoder replica via max_ongoing_requests, one request prepares data on CPU while another computes on GPU. This achieves 100% GPU utilization without batching, which would add latency from waiting for requests to accumulate.

Finally, deployment composition lets you define the encoder, decoder, and ingress as separate classes, then wire them together with .bind(). Ray Serve handles deployment ordering, health checks, and request routing. The ingress maintains explicit state (EMA for scene detection) across chunks, which works correctly even when autoscaling routes requests to different decoder replicas—no sticky sessions required.


Setup

Prerequisites

Before running this notebook, ensure you have:

| Requirement | Purpose |
| --- | --- |
| Pexels API key | Download sample video (free at https://www.pexels.com/api/) |
| S3 bucket | Store videos and text embeddings |
| AWS credentials | Read/write access to your S3 bucket |
| ffmpeg | Video processing and frame extraction |
| GPU | Run SigLIP model for encoding (1 GPU minimum) |

Set these environment variables before running:

bash
export PEXELS_API_KEY="your-pexels-api-key"
export S3_BUCKET="your-bucket-name"
export AWS_ACCESS_KEY_ID="..."
export AWS_SECRET_ACCESS_KEY="..."

Note on GPU type: The benchmarks, design choices, and hyperparameters in this notebook were tuned for NVIDIA L4 GPUs. Different GPU types (A10G, T4, A100, etc.) have different memory bandwidth, compute throughput, and batch characteristics. You may need to adjust max_ongoing_requests, chunk sizes, and concurrency limits for optimal performance on other hardware.

python
import os

PEXELS_API_KEY = os.environ.get("PEXELS_API_KEY")  # Or set directly: "your-api-key"
S3_BUCKET = os.environ.get("S3_BUCKET")  # Or set directly: "your-bucket"

Download sample video

Before running the pipeline, we need a sample video in S3. This section downloads a video from Pexels, normalizes it, and uploads it to S3.

Why normalize videos? We re-encode all videos to a consistent format:

  • 384×384 resolution: Matches SigLIP's input size exactly, eliminating resize during inference
  • 30 fps: Standardizes frame timing for consistent chunk boundaries
  • H.264 codec (libx264): Fast seeking—FFmpeg can jump directly to any timestamp without decoding preceding frames. Some source codecs (VP9, HEVC) require sequential decoding, adding latency for chunk extraction
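
The normalization settings above map onto a short FFmpeg invocation. The following is a minimal sketch, not the command used by `scripts.download_stock_videos`: the function name `build_normalize_command`, the file names, and the choices to drop audio and to scale without preserving aspect ratio are all assumptions made for illustration.

```python
from typing import List


def build_normalize_command(src: str, dst: str, size: int = 384, fps: int = 30) -> List[str]:
    """Build an ffmpeg argument list that re-encodes `src` into the normalized format."""
    return [
        "ffmpeg",
        "-y",                            # overwrite the output file if it exists
        "-i", src,                       # input video
        "-vf", f"scale={size}:{size}",   # resize to size x size (assumption: ignore aspect ratio)
        "-r", str(fps),                  # constant output frame rate
        "-c:v", "libx264",               # H.264 video codec
        "-an",                           # drop audio (assumption: not needed for analysis)
        dst,
    ]


cmd = build_normalize_command("input.mp4", "normalized.mp4")
print(" ".join(cmd))
```

The list form can be passed directly to `subprocess.run` without shell quoting.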
python
import asyncio

try:
    asyncio.get_running_loop()
except RuntimeError:
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
python
from scripts.download_stock_videos import download_sample_videos

# Download sample videos (checks for existing manifest first, skips Pexels API if found)
S3_PREFIX = "anyscale-example/stock-videos/"
video_paths = asyncio.get_event_loop().run_until_complete(download_sample_videos(
    api_key=PEXELS_API_KEY,
    bucket=S3_BUCKET,
    total=1,  # Just need one sample video
    s3_prefix=S3_PREFIX,
    overwrite=False
))

if not video_paths:
    raise RuntimeError("No videos downloaded")

SAMPLE_VIDEO_URI = video_paths[0]
print(f"\nSample video ready: {SAMPLE_VIDEO_URI}")

Generate embeddings for text bank

The decoder matches video embeddings against precomputed text embeddings for tags and descriptions. We define the text banks here and use a Ray task to compute the embeddings on GPU and upload them to S3.

python
from textbanks import TAGS, DESCRIPTIONS

print(f"Tags: {TAGS}")
print(f"Descriptions: {DESCRIPTIONS}")
python
import ray

from jobs.generate_text_embeddings import generate_embeddings_task

S3_EMBEDDINGS_PREFIX = "anyscale-example/embeddings/"

# Run the Ray task (uses TAGS and DESCRIPTIONS from textbanks module)
# Note: runtime_env ships local modules to worker nodes (job working_dir only applies to driver)
ray.init(runtime_env={"working_dir": "."}, ignore_reinit_error=True)
result = ray.get(generate_embeddings_task.remote(S3_BUCKET, S3_EMBEDDINGS_PREFIX))

print(f"\nText embeddings ready:")
print(f"  Tags: {result['tag_embeddings']['s3_uri']}")
print(f"  Descriptions: {result['description_embeddings']['s3_uri']}")

Build the Ray Serve application

This section walks through building the video analysis pipeline step by step, introducing Ray Serve concepts as we go.

GPU encoder

The VideoEncoder deployment runs on GPU and converts video frames to embeddings using SigLIP. Key configuration:

  • num_gpus=1: Each replica requires a dedicated GPU
  • max_ongoing_requests=2: Allows pipelining—while one request computes on GPU, another prepares data on CPU

Why no request batching? A single chunk (16 frames @ 384×384) already saturates GPU compute. Batching multiple requests would require holding them until a batch forms, adding latency without throughput gain. Instead, we use max_ongoing_requests=2 to pipeline preparation and computation.
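
The pipelining effect can be illustrated without Ray. The sketch below is a plain asyncio simulation, not Ray Serve code: a semaphore stands in for max_ongoing_requests, a lock stands in for the single GPU, and the timings `PREP_S` and `COMPUTE_S` are invented. With one slot, preparation and compute alternate. With two slots, the next request prepares while the current one computes.

```python
import asyncio
import time

PREP_S = 0.05     # simulated CPU-side preparation per request (made-up value)
COMPUTE_S = 0.05  # simulated GPU compute per request (made-up value)


async def handle_request(slots: asyncio.Semaphore, gpu: asyncio.Lock) -> None:
    async with slots:                    # at most `max_ongoing` requests in flight
        await asyncio.sleep(PREP_S)      # CPU preparation, can overlap with GPU work
        async with gpu:                  # only one request computes on the GPU at a time
            await asyncio.sleep(COMPUTE_S)


async def run(num_requests: int, max_ongoing: int) -> float:
    slots = asyncio.Semaphore(max_ongoing)
    gpu = asyncio.Lock()
    start = time.perf_counter()
    await asyncio.gather(*(handle_request(slots, gpu) for _ in range(num_requests)))
    return time.perf_counter() - start


serial_s = asyncio.run(run(num_requests=6, max_ongoing=1))
pipelined_s = asyncio.run(run(num_requests=6, max_ongoing=2))
print(f"max_ongoing=1: {serial_s:.2f}s, max_ongoing=2: {pipelined_s:.2f}s")
```

In this toy model a third slot would not help, because the simulated GPU is already busy back to back with two.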

Why asyncio.to_thread? Ray Serve deployments run in an async event loop. The encode_frames method is CPU/GPU-bound (PyTorch inference), which would block the event loop and prevent concurrent request handling. Wrapping it in asyncio.to_thread offloads the blocking work to a thread pool, keeping the event loop free to accept new requests.
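
A small standalone sketch of the asyncio.to_thread pattern follows. `encode_frames_blocking` is a hypothetical stand-in that sleeps instead of running PyTorch, and the heartbeat coroutine exists only to show that the event loop keeps running while the blocking call is in progress.

```python
import asyncio
import time


def encode_frames_blocking(num_frames: int) -> int:
    """Stand-in for blocking model inference (hypothetical; sleeps instead of running a model)."""
    time.sleep(0.2)
    return num_frames


async def heartbeat(stop: asyncio.Event) -> int:
    """Count how often the event loop gets to run until `stop` is set."""
    ticks = 0
    while not stop.is_set():
        ticks += 1
        await asyncio.sleep(0.01)
    return ticks


async def main() -> tuple:
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(stop))
    # Offload the blocking call to a worker thread so the event loop stays responsive.
    encoded = await asyncio.to_thread(encode_frames_blocking, 16)
    stop.set()
    return encoded, await beat


encoded, ticks = asyncio.run(main())
print(f"encoded {encoded} frames; event loop ticked {ticks} times meanwhile")
```

Calling `encode_frames_blocking(16)` directly inside `main` would instead freeze the loop for the full 0.2 seconds, and the heartbeat would not tick during that time.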

Why pass DeploymentResponse to decoder? Instead of awaiting the encoder result in VideoAnalyzer and passing raw data to the decoder, we pass the unawaited DeploymentResponse directly. Ray Serve automatically resolves this reference when the decoder needs it, storing the embeddings in the object store. This avoids an unnecessary serialize/deserialize round-trip through VideoAnalyzer—the decoder retrieves data directly from the object store, enabling zero-copy transfer if encoder and decoder are on the same node.

python
from constants import MODEL_NAME

print(f"MODEL_NAME: {MODEL_NAME}")
python
from deployments.encoder import VideoEncoder
import inspect

print(inspect.getsource(VideoEncoder.func_or_class))

CPU decoder

The MultiDecoder deployment runs on CPU and performs three tasks:

  1. Tag classification: Cosine similarity between video embedding and precomputed tag embeddings
  2. Caption retrieval: Find the best-matching description from a text bank
  3. Scene detection: EMA-based anomaly detection comparing each frame to recent history

The decoder loads precomputed text embeddings from S3 at startup.
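
Tag classification and caption retrieval both reduce to ranking text embeddings by cosine similarity. The sketch below uses only the standard library and made-up 3-dimensional vectors so that it runs anywhere. The real decoder operates on SigLIP embeddings with numpy, and `top_k_tags` is a hypothetical helper, not a function from `deployments.decoder`.

```python
import math
from typing import List, Sequence, Tuple


def l2_normalize(vec: Sequence[float]) -> List[float]:
    """Scale a vector to unit length."""
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / norm for x in vec]


def top_k_tags(
    video_emb: Sequence[float],
    tag_embs: Sequence[Sequence[float]],
    tags: Sequence[str],
    k: int = 2,
) -> List[Tuple[str, float]]:
    """Rank tags by cosine similarity to the video embedding."""
    v = l2_normalize(video_emb)
    scored = []
    for tag, emb in zip(tags, tag_embs):
        t = l2_normalize(emb)
        # For unit vectors, cosine similarity is just the dot product.
        scored.append((tag, sum(a * b for a, b in zip(v, t))))
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]


# Toy embeddings, invented for illustration.
tags = ["beach", "city", "forest"]
tag_embs = [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.0, 0.0, 1.0]]
video_emb = [0.9, 0.1, 0.4]

ranked = top_k_tags(video_emb, tag_embs, tags, k=2)
print(ranked)
```

Caption retrieval is the same ranking with `k=1` over the description bank.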

Why separate GPU/CPU deployments? The encoder needs GPU for neural network inference; the decoder only does numpy dot products. Separating them allows independent scaling—encoders are limited by GPU count, decoders scale cheaply with CPU cores. This avoids tying expensive GPU resources to lightweight CPU work.

Why EMA for scene detection? Exponential Moving Average reuses existing SigLIP embeddings without an extra model. The algorithm computes score = 1 - cosine(frame, EMA) where EMA updates as ema = 0.9*ema + 0.1*frame. A simple threshold (score > 0.15) detects abrupt scene changes while smoothing noise.
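
The formula above can be sketched in a few lines. This is an illustration, not the decoder's implementation: seeding the EMA with the first frame and scoring each frame before updating the EMA are assumptions, and the 2-dimensional "embeddings" are invented.

```python
import math
from typing import List, Optional, Sequence, Tuple

ALPHA = 0.9        # weight on the previous EMA (from the formula above)
THRESHOLD = 0.15   # a score above this counts as a scene change


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


def detect_scene_changes(
    frames: Sequence[Sequence[float]],
    ema: Optional[List[float]] = None,
) -> Tuple[List[int], Optional[List[float]]]:
    """Return (indices of frames flagged as scene changes, final EMA state)."""
    changes = []
    for i, frame in enumerate(frames):
        if ema is None:
            ema = list(frame)              # assumption: seed the EMA with the first frame
            continue
        score = 1.0 - cosine(frame, ema)   # compare against history before updating it
        if score > THRESHOLD:
            changes.append(i)
        ema = [ALPHA * e + (1 - ALPHA) * f for e, f in zip(ema, frame)]
    return changes, ema


# Three frames of one "scene" followed by three frames of a very different one.
frames = [[1.0, 0.0]] * 3 + [[0.0, 1.0]] * 3
changes, ema = detect_scene_changes(frames)
print(changes)
```

Returning the final EMA lets a caller pass it into the next chunk's call, which is the kind of state hand-off the request lifecycle relies on. Because the EMA moves only 10% toward each new frame, this bare version keeps flagging frames after a cut until the average catches up.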

python
from deployments.decoder import MultiDecoder

print(inspect.getsource(MultiDecoder.func_or_class))

Video chunking

Before we can process a video, we need to split it into fixed-duration chunks and extract frames. The chunking process:

  1. Get video duration using ffprobe
  2. Define chunk boundaries (e.g., 0-10s, 10-20s, 20-30s for a 30s video)
  3. Extract frames in parallel using multiple concurrent ffmpeg processes
  4. Limit concurrency with asyncio.Semaphore to avoid CPU oversubscription

Each chunk extracts 16 frames uniformly sampled across its duration, resized to 384×384 (matching SigLIP's input size).
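
Steps 1 and 2 and the uniform sampling can be sketched as two small helpers. Both names are hypothetical, and sampling at the midpoint of each sub-interval is an assumption: the pipeline may place its 16 timestamps differently.

```python
import math
from typing import List, Tuple


def chunk_boundaries(duration: float, chunk_duration: float = 10.0) -> List[Tuple[float, float]]:
    """Split [0, duration] into fixed-length chunks; the last chunk may be shorter."""
    num_chunks = math.ceil(duration / chunk_duration)
    return [
        (i * chunk_duration, min((i + 1) * chunk_duration, duration))
        for i in range(num_chunks)
    ]


def sample_timestamps(start: float, end: float, num_frames: int = 16) -> List[float]:
    """Pick `num_frames` evenly spaced timestamps at the midpoints of equal sub-intervals."""
    step = (end - start) / num_frames
    return [start + (i + 0.5) * step for i in range(num_frames)]


chunks = chunk_boundaries(30.0)
timestamps = sample_timestamps(*chunks[0])
print(chunks)
print(timestamps[:3])
```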

Design choices

Direct S3 download vs presigned URLs: We download the video to local disk before processing. An alternative is generating a presigned S3 URL and passing it directly to FFmpeg. Benchmarks show direct download is faster—FFmpeg's HTTP client doesn't handle S3's chunked responses as efficiently as aioboto3, and network latency compounds across multiple seeks.

Single FFmpeg vs parallel FFmpeg: Two approaches for extracting frames from multiple chunks:

  • Single FFmpeg: One process reads the entire video, using select filter to pick frames at specific timestamps
  • Parallel FFmpeg: Multiple concurrent processes, each extracting one chunk

We chose the single-FFmpeg approach because it outperforms parallel FFmpeg on longer videos and performs similarly on typical 10s chunks. This method is both efficient and scalable as chunk counts grow.
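
For the single-process variant, one way to pick frames is FFmpeg's `select` filter with an `eq(n\,N)` term per wanted frame index. The helper below only builds that filter string. It is a sketch under assumptions: `build_select_filter` is a hypothetical name, the 30 fps default relies on the normalization step, and the pipeline's actual filter expression is not shown in this notebook.

```python
from typing import Iterable


def build_select_filter(timestamps: Iterable[float], fps: int = 30) -> str:
    """Build an ffmpeg `select` filter expression keeping the frames nearest to `timestamps`."""
    frame_indices = sorted({round(t * fps) for t in timestamps})
    # Commas are escaped because a bare comma separates filters in an ffmpeg filtergraph.
    terms = "+".join(f"eq(n\\,{idx})" for idx in frame_indices)
    return f"select='{terms}'"


select_expr = build_select_filter([0.5, 1.0, 10.5])
print(select_expr)
```

The resulting string is meant to be passed as the value of `-vf` in an argument list, so no extra shell quoting is applied here.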

Chunk duration: We use 10-second chunks. Shorter chunks increase overhead (more FFmpeg calls, more encoder/decoder round-trips). Longer chunks increase processing efficiency but degrade inference quality: SigLIP processes exactly 16 frames per chunk, so a 60-second chunk samples one frame every 3.75 seconds and can miss fast scene changes. The 10-second sweet spot balances throughput with temporal resolution (~1.6 fps sampling).
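
The sampling arithmetic is easy to reproduce for other chunk lengths. This is a small sketch, and `sampling_stats` is a hypothetical helper. It assumes a fixed 16 frames per chunk, as stated above.

```python
from typing import Tuple


def sampling_stats(chunk_duration_s: float, num_frames: int = 16) -> Tuple[float, float]:
    """Return (seconds between sampled frames, effective sampled frames per second)."""
    return chunk_duration_s / num_frames, num_frames / chunk_duration_s


for duration in (5, 10, 30, 60):
    interval, fps = sampling_stats(duration)
    print(f"{duration:>3}s chunk: one frame every {interval:.2f}s ({fps:.2f} fps)")
```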

Deployment composition

The VideoAnalyzer ingress deployment orchestrates the encoder and decoder. It uses FastAPI integration with @serve.ingress for HTTP endpoints.

Design choices

Why num_cpus=6? The analyzer runs FFmpeg for frame extraction. Each FFmpeg process uses FFMPEG_THREADS=2, and we run up to NUM_WORKERS=3 concurrent processes. So 2 × 3 = 6 CPUs ensures FFmpeg doesn't contend for CPU during parallel chunk extraction.
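
The concurrency cap behind this CPU budget can be sketched with asyncio.Semaphore. The code below does not run FFmpeg: `extract_chunk` is a hypothetical stand-in that sleeps, and the `state` dictionary exists only to record the peak number of simultaneous extractions.

```python
import asyncio

NUM_WORKERS = 3      # concurrent FFmpeg processes (value from the text above)
FFMPEG_THREADS = 2   # threads per FFmpeg process (value from the text above)


async def extract_chunk(chunk_id: int, sem: asyncio.Semaphore, state: dict) -> int:
    async with sem:                                  # wait for a free worker slot
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)                    # stand-in for running ffmpeg
        state["active"] -= 1
    return chunk_id


async def extract_all(num_chunks: int) -> dict:
    sem = asyncio.Semaphore(NUM_WORKERS)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(extract_chunk(i, sem, state) for i in range(num_chunks)))
    return state


state = asyncio.run(extract_all(num_chunks=10))
print(f"peak concurrent extractions: {state['peak']}")
print(f"CPUs needed at peak: {state['peak'] * FFMPEG_THREADS}")
```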

Why max_ongoing_requests=4? The encoder has max_ongoing_requests=2. We want the analyzer to stay ahead: while 2 videos are encoding, we download and chunk 2 more videos so they're ready when encoder slots free up. This keeps the GPU pipeline saturated without excessive memory from queued requests.

Why cache the S3 client? Creating a new aioboto3 client per request adds overhead (connection setup, credential resolution). Caching the client in __init__ and reusing it across requests amortizes this cost. The client is thread-safe and handles connection pooling internally.

Why encode in parallel but decode serially? Encoding is stateless—each chunk's frames go through SigLIP independently, so we fire all chunks concurrently with asyncio.gather. Decoding requires temporal ordering—the EMA for scene detection must process chunks in order (chunk 0's EMA state feeds into chunk 1). The VideoAnalyzer calls the decoder serially, passing EMA state from each response to the next request. This explicit state passing ensures correct scene detection even when multiple decoder replicas exist under autoscaling.
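
The control flow can be sketched with plain asyncio. Everything here is a stand-in: `encode` and `decode` are hypothetical coroutines that use a single float as the "embedding" and as the EMA state. Unlike the real ingress, the sketch awaits the encoder results itself rather than forwarding unawaited responses to the decoder.

```python
import asyncio
from typing import List, Optional, Tuple


async def encode(chunk_id: int) -> float:
    """Stand-in encoder: stateless, so chunks may finish in any order."""
    await asyncio.sleep(0.01 * (3 - chunk_id % 3))
    return float(chunk_id)   # fake "embedding"


async def decode(embedding: float, ema: Optional[float]) -> Tuple[float, float]:
    """Stand-in decoder: takes the previous EMA state and returns (score, new state)."""
    new_ema = embedding if ema is None else 0.9 * ema + 0.1 * embedding
    score = 0.0 if ema is None else abs(embedding - ema)
    return score, new_ema


async def analyze(num_chunks: int) -> List[float]:
    # Encode every chunk concurrently; gather returns results in submission order.
    embeddings = await asyncio.gather(*(encode(i) for i in range(num_chunks)))
    scores: List[float] = []
    ema: Optional[float] = None
    for emb in embeddings:                    # decode strictly in chunk order
        score, ema = await decode(emb, ema)   # thread the state through each call
        scores.append(score)
    return scores


scores = asyncio.run(analyze(4))
print(scores)
```

Because the state travels with each call, it does not matter which decoder replica handles a given chunk.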

python
from app import VideoAnalyzer

print(inspect.getsource(VideoAnalyzer.func_or_class))
python
encoder = VideoEncoder.bind()
decoder = MultiDecoder.bind(bucket=S3_BUCKET, s3_prefix=S3_EMBEDDINGS_PREFIX)
app = VideoAnalyzer.bind(encoder=encoder, decoder=decoder)
python
from ray import serve

serve.run(app, name="video-analyzer", route_prefix="/", blocking=False)
python
import httpx
import time

# Wait for deployment to be ready
print("Waiting for deployment to be healthy...")
start = time.time()
while True:
    try:
        with httpx.Client(timeout=5.0) as client:
            response = client.get("http://localhost:8000/health")
            if response.status_code == 200:
                print(f"Deployment ready in {time.time() - start:.1f}s")
                break
    except httpx.RequestError:
        pass
    time.sleep(1.0)
    if time.time() - start > 120:
        raise TimeoutError("Deployment did not become healthy within 120s")

python
import httpx
import time
import uuid

# Send a sample request to the deployed service
payload = {
    "stream_id": uuid.uuid4().hex[:8],
    "video_path": SAMPLE_VIDEO_URI,
    "num_frames": 16,
    "chunk_duration": 10.0,
}

print(f"Analyzing video: {SAMPLE_VIDEO_URI}")
print(f"Stream ID: {payload['stream_id']}\n")

start = time.perf_counter()
with httpx.Client(timeout=300.0) as client:
    response = client.post("http://localhost:8000/analyze", json=payload)
latency_ms = (time.perf_counter() - start) * 1000

response.raise_for_status()  # Surface HTTP errors here rather than as a KeyError below
result = response.json()

# Print results
print("=" * 60)
print(f"Video duration: {result['video_duration']:.1f}s")
print(f"Chunks processed: {result['num_chunks']}")
print()

print("Top Tags:")
for tag in result["tags"]:
    print(f"  {tag['score']:.3f}  {tag['text']}")
print()

print("Best Caption:")
print(f"  {result['retrieval_caption']['score']:.3f}  {result['retrieval_caption']['text']}")
print()

print(f"Scene Changes: {result['num_scene_changes']}")
for sc in result["scene_changes"][:5]:  # Show first 5
    print(f"  {sc['timestamp']:6.2f}s  score={sc['score']:.3f}")
print()

print("Timing:")
timing = result["timing_ms"]
print(f"  S3 download:  {timing['s3_download_ms']:7.1f} ms")
print(f"  Video decode: {timing['decode_video_ms']:7.1f} ms")
print(f"  Encode (GPU): {timing['encode_ms']:7.1f} ms")
print(f"  Decode (CPU): {timing['decode_ms']:7.1f} ms")
print(f"  Total server: {timing['total_ms']:7.1f} ms")
print(f"  Round-trip:   {latency_ms:7.1f} ms")
print("=" * 60)

python
serve.shutdown()

Load Test Results

To evaluate the pipeline's performance under realistic conditions, we ran load tests using the client/load_test.py script with concurrency levels ranging from 2 to 64 concurrent requests. The Ray Serve application was configured with autoscaling enabled, allowing replicas to scale dynamically based on demand.

Methodology

The load test was executed using:

bash
python -m client.load_test --video s3://bucket/video.mp4 --concurrency <N>

Test parameters:

  • Concurrency levels: 2, 4, 8, 16, 32, 64
  • Chunk duration: 10 seconds
  • Frames per chunk: 16
  • Autoscaling: Enabled (replicas scale based on target_ongoing_requests)

The charts below show autoscaling in action during the concurrency=2 to 64 load test:

As load increases, replicas scale up to maintain target queue depth. The system reaches steady state once enough replicas are provisioned to handle the request rate.

To ensure fair comparison, we discarded the first half of each test run to exclude the autoscaling warm-up period where latencies are elevated due to replica initialization.
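
The warm-up filtering and percentile summary can be sketched as follows. This is not the code in client/load_test.py: the helper names are hypothetical, the nearest-rank percentile definition is one of several in common use, and splitting the run by request count rather than by elapsed time is an assumption.

```python
import math
from typing import Dict, List, Sequence


def percentile(sorted_values: Sequence[float], pct: int) -> float:
    """Nearest-rank percentile of an already sorted sequence."""
    rank = max(1, math.ceil(pct * len(sorted_values) / 100))
    return sorted_values[rank - 1]


def steady_state_percentiles(latencies_ms: List[float]) -> Dict[int, float]:
    """Drop the first half of the run (warm-up), then summarize the rest."""
    steady = sorted(latencies_ms[len(latencies_ms) // 2:])
    return {p: percentile(steady, p) for p in (50, 95, 99)}


# Synthetic run: slow warm-up requests followed by steady-state requests.
latencies = [3000.0] * 100 + [float(800 + i) for i in range(100)]
summary = steady_state_percentiles(latencies)
print(summary)
```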

Results Summary

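| Concurrency | Requests | P50 (ms) | P95 (ms) | P99 (ms) | Throughput (req/s) |
| --- | --- | --- | --- | --- | --- |
| 2 | 152 | 838 | 843 | 844 | 2.37 |
| 4 | 590 | 858 | 984 | 1,025 | 4.68 |
| 8 | 1,103 | 885 | 1,065 | 1,120 | 8.97 |
| 16 | 2,240 | 900 | 1,074 | 1,128 | 17.78 |
| 32 | 4,141 | 928 | 1,095 | 1,151 | 34.75 |
| 64 | 17,283 | 967 | 1,128 | 1,188 | 67.55 |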

Key findings:

  • 100% success rate across all 25,509 requests analyzed
  • Near-linear throughput scaling: Throughput increased from 2.37 req/s at concurrency 2 to 67.55 req/s at concurrency 64 (~28x improvement)
  • Stable latencies under load: P95 latency remained between 843ms and 1,128ms across all concurrency levels, demonstrating effective autoscaling
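
The headline ratios follow directly from the first and last rows of the results table. The sketch below recomputes them. The "scaling efficiency" figure (throughput ratio divided by concurrency ratio) is an extra derived number added here for illustration, not a metric reported by the load test.

```python
# (concurrency, p95_ms, throughput_rps) for the first and last rows of the results table
low = (2, 843, 2.37)
high = (64, 1128, 67.55)

concurrency_ratio = high[0] / low[0]                        # how much more load was offered
throughput_ratio = high[2] / low[2]                         # how much more work completed
scaling_efficiency = throughput_ratio / concurrency_ratio   # 1.0 would be perfectly linear
p95_increase = (high[1] - low[1]) / low[1]

print(f"concurrency x{concurrency_ratio:.0f}, throughput x{throughput_ratio:.1f}")
print(f"scaling efficiency: {scaling_efficiency:.0%}, P95 increase: {p95_increase:.0%}")
```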

Processing Time Breakdown

The chart below shows how processing time is distributed across pipeline stages at each concurrency level:

At concurrency 64 (best throughput):

  • S3 Download: 77ms (8%)
  • Video Decode (FFmpeg): 98ms (10%)
  • Encode (GPU): <1ms (<1%) — Note: This number is artificially low due to how timing is measured. Because the output of VideoEncoder is passed directly into MultiDecoder as an argument, the instrumentation mistakenly attributes most of the computational time to the downstream stage. In reality, VideoEncoder performs the bulk of the work, so its true processing time is significantly higher than reported here.
  • Decode (CPU): 757ms (81%)

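The Decode (CPU) stage dominates the measured processing time, but as noted above that figure also absorbs the encoder's work, so it overstates the cost of tag and caption generation itself. S3 download and video decoding are relatively fast due to local caching and optimized FFmpeg settings.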
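
The timing caveat on the Encode (GPU) figure can be reproduced with a small asyncio analogy. It does not use Ray: an asyncio task plays the role of the unawaited encoder response, and the sleep durations are invented. Creating the task returns almost immediately, so a timer around it records roughly zero, while the timer around the decoder call absorbs the wait for the encoder.

```python
import asyncio
import time


async def encoder() -> str:
    await asyncio.sleep(0.10)   # the expensive stage (made-up duration)
    return "embedding"


async def decoder(pending_embedding: "asyncio.Task[str]") -> str:
    embedding = await pending_embedding   # the wait for the encoder happens here
    await asyncio.sleep(0.02)             # the decoder's own, cheaper work
    return f"decoded({embedding})"


async def main() -> dict:
    t0 = time.perf_counter()
    pending = asyncio.create_task(encoder())   # returns at once, like an unawaited response
    t1 = time.perf_counter()
    await decoder(pending)
    t2 = time.perf_counter()
    return {"encode_ms": (t1 - t0) * 1000, "decode_ms": (t2 - t1) * 1000}


timing = asyncio.run(main())
print(timing)
```

Measuring the encoder accurately would require timing inside the encoder itself, or awaiting its result before starting the decode timer.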

Throughput Analysis

The charts below show throughput scaling and the latency-throughput tradeoff:

Observations:

  1. Linear scaling: Throughput scales almost linearly with concurrency, indicating that autoscaling successfully provisions enough replicas to handle increased load.

  2. Latency-throughput tradeoff: The right chart shows that P95 latency increases slightly as throughput grows (from ~843ms to ~1,128ms), but remains within acceptable bounds. This ~34% latency increase enables a ~28x throughput improvement.

  3. No saturation point: Even at concurrency 64, throughput continues to scale. The system could likely handle higher concurrency with additional GPU resources.

The near-linear scaling demonstrates that the pipeline architecture—with separate GPU encoder, CPU decoder, and CPU-bound ingress—allows each component to scale independently based on its resource requirements.