Java-Python RQ Integration Guide

Complete guide for the Redis Queue (RQ) integration between Opik's Java backend and Python workers using the official RQ library.

Status: βœ… Working end-to-end (Plain JSON contract; no custom serializer)
Last Updated: 2025-10-15


πŸ“Š Current Status

βœ… Completed Components

  • βœ… Java RqPublisher - Creates RQ-compatible Redis HASH structures
  • βœ… Plain JSON data field - UTF-8 JSON (no compression)
  • βœ… RQ-native Redis structure - Keys and lists match RQ defaults (e.g., rq:queue:<queue>)
  • βœ… Python RQ Worker via RqWorkerManager - Starts under Gunicorn with JSONSerializer + default Job
  • βœ… OpenTelemetry Metrics - Metrics emitted from MetricsWorker
  • βœ… Robust Connection Management - Exponential backoff retry logic
  • βœ… Aligned Logging - Unified format with pid/process and thread info

ℹ️ Recent Changes (Oct 15, 2025)

  • Switched from zlib-compressed data to plain JSON (UTF-8)
  • Removed custom serializer/job; using RQ's JSONSerializer and default Job
  • Pre-consume "func injection" removed (RQ restores from data payload)
  • No-op death penalty used to avoid signals in background thread
  • Queue key corrected to rq:queue:<queue-name>

πŸš€ Quick Run Guide

Prerequisites (Already Running)

  • βœ… Redis: localhost:6379 (password: opik)
  • βœ… MySQL: localhost:3306
  • βœ… ClickHouse: localhost:8123

Start Python Worker (Terminal 1)

bash
cd apps/opik-python-backend
source venv/bin/activate
export REDIS_HOST=localhost REDIS_PORT=6379 REDIS_DB=0 REDIS_PASSWORD=opik
python src/opik_backend/rq_worker.py

Start Java Backend (Terminal 2)

bash
cd apps/opik-backend
java -jar target/opik-backend-1.0-SNAPSHOT.jar server config.yml

Test Integration (Terminal 3)

bash
# Send message
curl -X POST "http://localhost:8080/v1/internal/hello-world?message=Test"

# Check queue
curl http://localhost:8080/v1/internal/hello-world/queue-size

Table of Contents

  1. Overview
  2. Architecture
  3. Detailed Setup
  4. Components
  5. OpenTelemetry Metrics
  6. Configuration
  7. Usage Guide
  8. Adding New Queues
  9. Testing
  10. Troubleshooting
  11. Design Decisions
  12. Refactoring History

Overview

This integration enables the Java backend to enqueue jobs that are processed asynchronously by Python workers using Redis Queue (RQ). The production path uses RQ-native contracts (plain JSON) with no Python bridges or custom serializers. This is useful for:

  • CPU-intensive Python tasks (ML inference, data processing)
  • Python-specific libraries (optimizer, analytics)
  • Async job processing (background tasks, scheduled jobs)
  • Scaling independently (Java services and Python workers)

Key Features

  • βœ… Type-safe queue definitions using Java enums
  • βœ… Immutable message format using Java records
  • βœ… Configuration-driven TTL management
  • βœ… Interface-based design for testability
  • βœ… Full RQ protocol compatibility
  • βœ… Multiple queue support

Architecture

Overview

Java directly creates RQ-compatible job structures in Redis for processing by Python RQ workers. The data field is plain JSON (UTF-8). The worker uses RQ's default JSONSerializer and default Job.

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                     Java Backend (Redisson)                     β”‚
β”‚  - Creates RQ-compatible Redis HASH                            β”‚
β”‚  - Stores: created_at, enqueued_at, status, origin, timeout    β”‚
β”‚  - Stores: data (plain JSON [func, null, args, {}])            β”‚
β”‚  - Adds job ID to Redis list (queue)                           β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                           β”‚
                           β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                      Redis Server                                β”‚
β”‚  - Job data: rq:job:{id} (Redis HASH, RQ format)                β”‚
β”‚  - Queue list: rq:queue:opik:optimizer-cloud                    β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                           β”‚
                           β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                  RQ Worker (Python)                             β”‚
β”‚  - Uses JSONSerializer (default)                               β”‚
β”‚  - Default Job class                                           β”‚
β”‚  - Configured with decode_responses=False                      β”‚
β”‚  - βœ… Processes jobs end-to-end                                β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Current Implementation Status

What Works:

  • βœ… Java creates RQ-compatible Redis HASH structures
  • βœ… Plain JSON data array [func, null, args, kwargs]
  • βœ… Redis structure identical to Python-created jobs
  • βœ… Job.fetch() and RQ worker processing succeed with JSONSerializer
  • βœ… End-to-end processing via RqWorkerManager in production

High-Level Flow

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Java Backend   │────────▢│  Redis  │◀────────│  Python Worker   β”‚
β”‚  (Producer)     β”‚         β”‚  Queue  β”‚         β”‚  (Consumer)      β”‚
β”‚                 β”‚         β”‚         β”‚         β”‚                  β”‚
β”‚ RqPublisher     β”‚  RPUSH  β”‚  List   β”‚  LPOP   β”‚  RQ Worker       β”‚
β”‚ QueueProducer   │────────▢│  +Bucket│◀────────│  process_xxx()   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜         β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜         β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Two-Tier Storage Model

RQ uses a two-tier storage approach:

  1. Job Metadata: Stored in rq:job:{job-id} as a hash with full job details
  2. Queue List: Contains only job IDs in a Redis list for FIFO processing

Redis Storage:
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ rq:job:123abc (Hash)                 β”‚
β”‚  β”œβ”€ func: "process_optimizer_job"    β”‚
β”‚  β”œβ”€ args: ["data"]                   β”‚
β”‚  β”œβ”€ status: "queued"                 β”‚
β”‚  └─ enqueued_at: "2025-10-14..."     β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ rq:queue:opik:optimizer-cloud (List) β”‚
β”‚  β”œβ”€ "123abc"                         β”‚
β”‚  β”œβ”€ "456def"                         β”‚
β”‚  └─ "789ghi"                         β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
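
Both tiers can be inspected directly from Python with redis-py. A minimal sketch, assuming the local setup from the Quick Run Guide (password opik):

python
import json

import redis

# Connection settings from the Quick Run Guide; raw bytes are fine here
conn = redis.Redis(host="localhost", port=6379, db=0, password="opik")

# Tier 2: the queue list holds only job IDs (FIFO)
job_ids = conn.lrange("rq:queue:opik:optimizer-cloud", 0, -1)
print("queued job IDs:", job_ids)

# Tier 1: each job is a hash; the data field is a plain-JSON array
if job_ids:
    raw = conn.hget(f"rq:job:{job_ids[0].decode()}", "data")
    func, instance, args, kwargs = json.loads(raw)  # [func, null, args, kwargs]
    print(func, args, kwargs)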

Package Structure

com.comet.opik.infrastructure
β”œβ”€β”€ queues/                    # Queue abstractions
β”‚   β”œβ”€β”€ QueueProducer.java    # Interface for queue producers
β”‚   β”œβ”€β”€ Queue.java            # Enum of available queues
β”‚   β”œβ”€β”€ RqMessage.java        # Immutable message record
β”‚   β”œβ”€β”€ RqQueueConfig.java    # Queue configuration
β”‚   └── JobStatus.java        # Job status enum
β”œβ”€β”€ redis/                     # Redis implementation
β”‚   └── RqPublisher.java      # RQ implementation of QueueProducer
└── QueuesConfig.java         # Configuration class

Detailed Setup

Prerequisites

  • Java 21+
  • Python 3.8+
  • Redis 7.x
  • Maven 3.x

1. Start Redis

bash
# Using Docker
docker run -d -p 6379:6379 --name opik-redis redis:7.2-alpine

# Or use existing Docker Compose
cd deployment/docker-compose
docker-compose up -d redis

2. Configure Application

Edit apps/opik-backend/config.yml:

yaml
queues:
  enabled: true
  defaultJobTtl: 1 day
  queues:
    opik:optimizer-cloud:
      jobTTl: 1 day

3. Build Java Backend

bash
cd apps/opik-backend
mvn clean package -DskipTests

4. Start Python RQ Worker

bash
cd apps/opik-python-backend

# Install dependencies
pip install -r requirements.txt

# Set environment variables
export REDIS_HOST=localhost
export REDIS_PORT=6379
export REDIS_DB=0

# Start worker
python src/opik_backend/rq_worker.py

Expected output:

2025-10-14 10:00:00 INFO [opik_backend.rq_worker] - Starting RQ worker...
2025-10-14 10:00:00 INFO [opik_backend.rq_worker] - Connecting to Redis at localhost:6379 (db=0)
2025-10-14 10:00:00 INFO [opik_backend.rq_worker] - Listening on queues: ['opik:hello_world_queue', 'opik:optimizer-cloud']
2025-10-14 10:00:00 INFO [opik_backend.rq_worker] - RQ Worker started successfully

5. Start Java Backend

bash
cd apps/opik-backend
java -jar target/opik-backend-1.0-SNAPSHOT.jar server config.yml

6. Test the Integration

bash
# Send a test message
curl -X POST "http://localhost:8080/v1/internal/hello-world?message=Hello%20from%20Java"

# Response:
{
  "status": "success",
  "message": "Message enqueued successfully",
  "jobId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "queue": "opik:optimizer-cloud",
  "sentMessage": "Hello from Java"
}

# Check queue size
curl http://localhost:8080/v1/internal/hello-world/queue-size

# Response:
{
  "queue": "opik:optimizer-cloud",
  "size": 0
}

7. Verify Python Worker Processing

Check Python worker logs:

2025-10-14 10:01:00 INFO [opik_backend.rq_worker] - Processing optimizer job: Hello from Java
2025-10-14 10:01:00 INFO [opik_backend.rq_worker] - Optimizer job processed successfully: {...}

Components

1. QueueProducer Interface

Location: com.comet.opik.infrastructure.queues.QueueProducer

java
public interface QueueProducer {
    /**
     * Enqueue a message using a predefined Queue enum
     */
    Mono<String> enqueue(Queue queue, Object message);
    
    /**
     * Enqueue a full RQ message to a specific queue
     */
    Mono<String> enqueueJob(String queueName, RqMessage message);
    
    /**
     * Get the current size of a queue
     */
    Mono<Integer> getQueueSize(String queueName);
}

Benefits:

  • Abstraction over queue implementation
  • Easy to mock for testing
  • Can be swapped with other implementations (Kafka, RabbitMQ)

2. Queue Enum

Location: com.comet.opik.infrastructure.queues.Queue

java
public enum Queue {
    OPTIMIZER_CLOUD("opik:optimizer-cloud", "opik_backend.rq_worker.process_optimizer_job");
    
    private final String queueName;
    private final String functionName;  // Python function to call
    
    Queue(String queueName, String functionName) {
        this.queueName = queueName;
        this.functionName = functionName;
    }
}

Usage:

java
queueProducer.enqueue(Queue.OPTIMIZER_CLOUD, myData);

Benefits:

  • Type-safe queue references
  • Compile-time validation
  • IDE autocomplete
  • Queue name and function name coupled

3. RqMessage Record

Location: com.comet.opik.infrastructure.queues.RqMessage

java
public record RqMessage(
    String id,              // UUID
    String func,            // Python function name
    List<Object> args,      // Positional arguments
    Map<String, Object> kwargs,  // Keyword arguments
    String description,     // Job description
    JobStatus status,       // Job status (enum)
    String origin,          // Origin queue
    Instant createdAt,      // Creation timestamp
    Instant enqueuedAt      // Enqueued timestamp
) {
    public static Builder builder() { ... }
}

Benefits:

  • Immutable by design
  • Thread-safe
  • Clear time semantics with Instant
  • Type-safe status with enum

4. JobStatus Enum

Location: com.comet.opik.infrastructure.queues.JobStatus

java
public enum JobStatus {
    QUEUED,    // Job has been queued but not started
    STARTED,   // Job is currently being executed
    FINISHED,  // Job finished successfully
    FAILED;    // Job failed during execution
}

5. RqPublisher Implementation

Location: com.comet.opik.infrastructure.redis.RqPublisher

Key methods:

java
class RqPublisher implements QueueProducer {
    // Enqueue with type-safe Queue enum
    public Mono<String> enqueue(Queue queue, Object message) {
        RqMessage rqMessage = RqMessage.builder()
            .func(queue.getFunctionName())
            .args(List.of(message))
            .origin(queue.toString())
            .status(JobStatus.QUEUED)
            .build();
        return enqueueJob(queue.toString(), rqMessage);
    }
    
    // Low-level enqueue with full message control
    public Mono<String> enqueueJob(String queueName, RqMessage message) {
        String jobId = message.id();
        String jobKey = "rq:job:" + jobId;
        
        // Get TTL from configuration
        Duration ttl = config.getQueues().getQueue(queueName)
            .map(RqQueueConfig::getJobTTl)
            .orElse(config.getQueues().getDefaultJobTtl());
        
        // Store job data with TTL, then push the job ID onto the queue
        return redisClient.getBucket(jobKey)
            .set(message, ttl.toJavaDuration())
            .then(redisClient.getQueue(queueName).offer(jobId))
            .thenReturn(jobId);
    }
}
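
Because the resulting structure is RQ-native, a Java-produced job can be validated from Python with RQ's own Job.fetch and the JSONSerializer. A sketch (the job ID is a placeholder; use the jobId returned by the enqueue endpoint):

python
import redis
from rq.job import Job
from rq.serializers import JSONSerializer

conn = redis.Redis(host="localhost", port=6379, db=0, password="opik")

# Placeholder job ID from an enqueue response
job = Job.fetch("a1b2c3d4-e5f6-7890-abcd-ef1234567890",
                connection=conn, serializer=JSONSerializer)
print(job.func_name, job.args, job.kwargs, job.get_status())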

6. Python Worker

Location: apps/opik-python-backend/src/opik_backend/rq_worker.py

python
import logging

from rq import Queue, Worker
from rq.serializers import JSONSerializer  # matches the plain-JSON contract

logger = logging.getLogger(__name__)

def process_optimizer_job(message: str):
    """Process an optimizer job from Java."""
    logger.info(f"Processing optimizer job: {message}")
    
    # Your processing logic here
    result = {
        "status": "success",
        "message": f"Optimizer job processed: {message}",
        "processed_by": "Python RQ Worker - Optimizer"
    }
    
    return result

def start_worker():
    """Start an RQ worker listening on multiple queues."""
    redis_conn = get_redis_connection()  # module-local helper in rq_worker.py
    
    # JSONSerializer is passed explicitly so the worker reads the plain-JSON
    # payload produced by the Java backend
    queues = [
        Queue("opik:hello_world_queue", connection=redis_conn, serializer=JSONSerializer),
        Queue("opik:optimizer-cloud", connection=redis_conn, serializer=JSONSerializer),
    ]
    
    worker = Worker(queues, connection=redis_conn, serializer=JSONSerializer)
    worker.work()
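
For local testing without the Java backend, the same contract can be exercised by enqueuing from Python with the JSONSerializer. A sketch, assuming the worker above is running:

python
import redis
from rq import Queue
from rq.serializers import JSONSerializer

conn = redis.Redis(host="localhost", port=6379, db=0, password="opik")
q = Queue("opik:optimizer-cloud", connection=conn, serializer=JSONSerializer)

# Enqueue by dotted path so the payload matches what the Java publisher writes
job = q.enqueue("opik_backend.rq_worker.process_optimizer_job", "Hello from Python")
print(f"Enqueued {job.id} on {q.name}")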

Removed: Custom Serializer Implementation (Deprecated)

This section previously documented a zlib-based custom serializer and job class. The production path now uses RQ's native JSONSerializer and the default Job with plain JSON data. All custom serializer/job code has been removed.


OpenTelemetry Metrics

Overview

The RQ worker includes comprehensive OpenTelemetry metrics for monitoring and observability. All metrics are automatically collected by the MetricsWorker class.

Implemented Metrics

Counters

| Metric Name | Type | Description | Dimensions |
|---|---|---|---|
| rq_worker.jobs.processed | Counter | Total number of jobs processed (success + failure) | queue, function |
| rq_worker.jobs.succeeded | Counter | Number of successfully completed jobs | queue, function |
| rq_worker.jobs.failed | Counter | Number of failed jobs | queue, function, error_type |

Histograms

| Metric Name | Type | Description | Unit | Dimensions |
|---|---|---|---|---|
| rq_worker.job.processing_time | Histogram | Time spent executing the job | milliseconds | queue, function |
| rq_worker.job.queue_wait_time | Histogram | Time job spent waiting in queue | milliseconds | queue, function |
| rq_worker.job.total_time | Histogram | Total time from creation to completion | milliseconds | queue, function |

Metric Dimensions

All metrics include contextual dimensions for filtering and aggregation:

  • queue: Queue name (e.g., opik:hello_world_queue, opik:optimizer-cloud)
  • function: Python function name (e.g., opik_backend.rq_worker.process_hello_world)
  • error_type: Exception class name (only for failed jobs, e.g., ValueError, ConnectionError)

Implementation Details

MetricsWorker Class

The MetricsWorker extends RQ's standard Worker class and overrides perform_job() to collect metrics:

python
class MetricsWorker(Worker):
    """Custom RQ Worker that emits OpenTelemetry metrics."""
    
    def perform_job(self, job, queue):
        func_name = job.func_name
        
        # Calculate queue wait time
        if job.created_at and job.started_at:
            queue_wait_ms = (job.started_at - job.created_at).total_seconds() * 1000
            queue_wait_time_histogram.record(queue_wait_ms, {"queue": queue.name, "function": func_name})
        
        # Execute job and measure processing time
        job_start_time = time.time()
        result = super().perform_job(job, queue)
        processing_time_ms = (time.time() - job_start_time) * 1000
        
        # Record success metrics
        jobs_processed_counter.add(1, {"queue": queue.name, "function": func_name})
        jobs_succeeded_counter.add(1, {"queue": queue.name, "function": func_name})
        processing_time_histogram.record(processing_time_ms, {"queue": queue.name, "function": func_name})
        return result
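
The module-level instruments referenced above could be created once at import time. A sketch using the metric names from the tables above (the meter name is an assumption):

python
from opentelemetry import metrics

meter = metrics.get_meter("opik_backend.rq_worker")  # meter name is an assumption

jobs_processed_counter = meter.create_counter(
    "rq_worker.jobs.processed",
    description="Total number of jobs processed (success + failure)")
jobs_succeeded_counter = meter.create_counter(
    "rq_worker.jobs.succeeded",
    description="Number of successfully completed jobs")
jobs_failed_counter = meter.create_counter(
    "rq_worker.jobs.failed",
    description="Number of failed jobs")
processing_time_histogram = meter.create_histogram(
    "rq_worker.job.processing_time", unit="ms",
    description="Time spent executing the job")
queue_wait_time_histogram = meter.create_histogram(
    "rq_worker.job.queue_wait_time", unit="ms",
    description="Time job spent waiting in queue")
total_time_histogram = meter.create_histogram(
    "rq_worker.job.total_time", unit="ms",
    description="Total time from creation to completion")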

Example Metrics Output

Successful Job Processing:

rq_worker.jobs.processed{queue="opik:hello_world_queue", function="process_hello_world"} = 10
rq_worker.jobs.succeeded{queue="opik:hello_world_queue", function="process_hello_world"} = 10
rq_worker.job.processing_time{queue="opik:hello_world_queue", function="process_hello_world"} = [100ms, 102ms, 98ms, ...]
rq_worker.job.queue_wait_time{queue="opik:hello_world_queue", function="process_hello_world"} = [5ms, 3ms, 7ms, ...]

Failed Job Processing:

rq_worker.jobs.processed{queue="opik:optimizer-cloud", function="process_optimizer_job"} = 5
rq_worker.jobs.failed{queue="opik:optimizer-cloud", function="process_optimizer_job", error_type="ValueError"} = 1

Viewing Metrics

Python Script

python
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader

# Setup metric export
reader = PeriodicExportingMetricReader(ConsoleMetricExporter(), export_interval_millis=10_000)
provider = MeterProvider(metric_readers=[reader])
metrics.set_meter_provider(provider)

# Metrics are exported to the console every 10 seconds

Integration with Observability Platforms

The metrics can be exported to various backends:

  • Prometheus: Using opentelemetry-exporter-prometheus (see the sketch after this list)
  • Jaeger: For distributed tracing
  • Grafana: For visualization dashboards
  • Cloud Providers: AWS CloudWatch, GCP Cloud Monitoring, Azure Monitor
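
For Prometheus specifically, wiring the worker metrics into a scrapeable endpoint could look like this sketch (assumes the opentelemetry-exporter-prometheus and prometheus-client packages; port 9464 is an arbitrary choice):

python
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from prometheus_client import start_http_server

# Serve a /metrics endpoint for Prometheus to scrape
start_http_server(9464)

reader = PrometheusMetricReader()
metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))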

Monitoring Best Practices

  1. Set up alerts for:

    • High failure rate: rq_worker.jobs.failed / rq_worker.jobs.processed > 0.05
    • Long queue wait times: rq_worker.job.queue_wait_time > 5000ms
    • Slow processing: rq_worker.job.processing_time > 10000ms
  2. Create dashboards showing:

    • Jobs processed over time (throughput)
    • Success vs failure rates
    • Processing time percentiles (p50, p95, p99)
    • Queue wait time trends
  3. Track SLOs based on:

    • 99.9% of jobs complete successfully
    • 95% of jobs process within 1 second
    • Queue wait time < 500ms for 99% of jobs

Metrics Status

βœ… Fully Implemented and Tested

  • All 6 metrics defined and collecting data
  • Dimensional data properly attached
  • Integrated with RQ's job lifecycle
  • No performance impact on job processing
  • Ready for production observability platforms

Configuration

Queue Configuration (config.yml)

yaml
queues:
  # Enable/disable queue functionality
  enabled: ${QUEUES_ENABLED:-true}
  
  # Default TTL for all jobs (if not specified per-queue)
  defaultJobTtl: ${QUEUES_DEFAULT_JOB_TTL:-1 day}
  
  # Per-queue specific configurations
  queues:
    # Optimizer cloud queue
    opik:optimizer-cloud:
      jobTTl: ${OPTIMIZER_QUEUE_JOB_TTL:-1 day}
    
    # Add more queue configs here
    # opik:another-queue:
    #   jobTTl: 2 hours

Environment Variables

bash
# Queue Configuration
QUEUES_ENABLED=true                    # Enable queue functionality
QUEUES_DEFAULT_JOB_TTL="1 day"         # Default job TTL
OPTIMIZER_QUEUE_JOB_TTL="1 day"        # Optimizer queue TTL

# Redis Connection
REDIS_URL="redis://:opik@localhost:6379/0"
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
REDIS_PASSWORD=opik

TTL Configuration Hierarchy

  1. Queue-specific TTL: Defined in config.yml under queues.queues.<queue-name>.jobTTl
  2. Default TTL: Defined in config.yml under queues.defaultJobTtl
  3. Fallback: If neither is set, uses 1 day

java
Duration ttl = config.getQueues()
    .getQueue(queueName)              // 1. Try queue-specific
    .map(RqQueueConfig::getJobTTl)
    .orElse(config.getQueues()         // 2. Fallback to default
        .getDefaultJobTtl());
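
To confirm which TTL was actually applied, inspect the job key directly; a minimal sketch with a placeholder job ID:

python
import redis

conn = redis.Redis(host="localhost", port=6379, db=0, password="opik")

# Placeholder job ID; ~86400 seconds corresponds to the 1 day default TTL
print(conn.ttl("rq:job:a1b2c3d4-e5f6-7890-abcd-ef1234567890"))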

Usage Guide

Basic Usage - Type-Safe Enqueue

java
@Inject
private QueueProducer queueProducer;

public void sendOptimizationJob(String data) {
    queueProducer.enqueue(Queue.OPTIMIZER_CLOUD, data)
        .subscribe(
            jobId -> log.info("Job enqueued: {}", jobId),
            error -> log.error("Failed to enqueue", error)
        );
}

Advanced Usage - Custom RQ Message

java
@Inject
private QueueProducer queueProducer;

public void sendCustomJob() {
    RqMessage message = RqMessage.builder()
        .func("opik_backend.rq_worker.process_custom_job")
        .args(List.of("arg1", "arg2"))
        .kwargs(Map.of("key1", "value1", "key2", "value2"))
        .description("Custom job description")
        .status(JobStatus.QUEUED)
        .build();
    
    queueProducer.enqueueJob("opik:custom-queue", message)
        .subscribe(
            jobId -> log.info("Custom job enqueued: {}", jobId),
            error -> log.error("Failed to enqueue custom job", error)
        );
}

Monitoring Queue Size

java
public Mono<Integer> getQueueDepth(Queue queue) {
    return queueProducer.getQueueSize(queue.toString())
        .doOnSuccess(size -> log.info("Queue {} size: {}", queue, size));
}

Reactive Chaining

java
public Mono<ProcessingResult> processWithQueue(String data) {
    return validateData(data)
        .flatMap(validated -> queueProducer.enqueue(Queue.OPTIMIZER_CLOUD, validated))
        .flatMap(jobId -> waitForJobCompletion(jobId))
        .map(result -> new ProcessingResult(result));
}

Adding New Queues

Step-by-Step Guide

1. Add Queue to Java Enum

File: com.comet.opik.infrastructure.queues.Queue

java
public enum Queue {
    OPTIMIZER_CLOUD("opik:optimizer-cloud", "opik_backend.rq_worker.process_optimizer_job"),
    
    // Add your new queue
    MY_NEW_QUEUE("opik:my-new-queue", "opik_backend.rq_worker.process_my_new_job"),
    ;
}

2. Add Python Worker Function

File: apps/opik-python-backend/src/opik_backend/rq_worker.py

python
def process_my_new_job(data: dict):
    """
    Process my new job type.
    
    Args:
        data: The job data to process
        
    Returns:
        dict: Processing result
    """
    logger.info(f"Processing my new job: {data}")
    
    # Your processing logic
    result = {
        "status": "success",
        "data": data,
        "processed_at": datetime.now().isoformat()
    }
    
    logger.info("Job processed successfully")
    return result

3. Register Queue in Worker

File: apps/opik-python-backend/src/opik_backend/rq_worker.py

python
def start_worker():
    redis_conn = get_redis_connection()
    
    queues = [
        Queue("opik:hello_world_queue", connection=redis_conn),
        Queue("opik:optimizer-cloud", connection=redis_conn),
        Queue("opik:my-new-queue", connection=redis_conn),  # Add here
    ]
    
    worker = Worker(queues, connection=redis_conn)
    worker.work()

4. (Optional) Configure Queue-Specific TTL

File: apps/opik-backend/config.yml

yaml
queues:
  queues:
    opik:my-new-queue:
      jobTTl: 2 hours  # Custom TTL for this queue

5. Use the New Queue

java
// In your service or resource
queueProducer.enqueue(Queue.MY_NEW_QUEUE, myData)
    .subscribe(jobId -> log.info("Job enqueued: {}", jobId));

Testing

Manual Testing

Quick Single Message Test

bash
# Clear Redis
redis-cli -a opik FLUSHDB

# Send test message
curl -X POST "http://localhost:8080/v1/internal/hello-world?message=test"

# Wait 2-3 seconds, then check status
redis-cli -a opik HGET rq:job:<job-id> status
# Expected: "finished"
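
Rather than a fixed sleep, a short polling loop is more reliable; a sketch with a placeholder job ID:

python
import time

import redis

conn = redis.Redis(host="localhost", port=6379, password="opik", decode_responses=True)

def wait_for(job_id: str, timeout_s: float = 10.0) -> str:
    """Poll the job hash until it reaches a terminal status."""
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        status = conn.hget(f"rq:job:{job_id}", "status")
        if status in ("finished", "failed"):
            return status
        time.sleep(0.2)
    return "timeout"

print(wait_for("a1b2c3d4-e5f6-7890-abcd-ef1234567890"))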

Load Test with 10 Messages

Test Results (2025-10-15):

βœ… 10/10 messages sent (HTTP 200)
βœ… 10/10 jobs finished successfully  
βœ… 0 failed jobs
βœ… 100% success rate

Processing time: ~6 seconds for 10 jobs
Average: ~600ms per job (includes 500ms simulated processing)

Test Command:

bash
# Clear and send 10 messages
redis-cli -a opik FLUSHDB
for i in {1..10}; do
  curl -s -X POST "http://localhost:8080/v1/internal/hello-world?message=Test_${i}"
done

# Wait and check results
sleep 6
redis-cli -a opik KEYS 'rq:job:*' | wc -l

Verified Features:

  • βœ… Java creates RQ-compatible Redis HASH structures
  • βœ… Plain JSON data (UTF-8) with [func, null, args, kwargs]
  • βœ… RQ-native Redis keys (rq:job:<id>, rq:queue:<queue>)
  • βœ… Job.fetch() and worker processing succeed with JSONSerializer
  • βœ… OpenTelemetry metrics infrastructure ready

Unit Testing with Mocks

java
@ExtendWith(MockitoExtension.class)
class MyServiceTest {
    
    @Mock
    private QueueProducer queueProducer;
    
    @InjectMocks
    private MyService myService;
    
    @Test
    void shouldEnqueueJobSuccessfully() {
        // Given
        String expectedJobId = "test-job-123";
        when(queueProducer.enqueue(any(Queue.class), any()))
            .thenReturn(Mono.just(expectedJobId));
        
        // When
        String result = myService.processData("test-data").block();
        
        // Then
        assertThat(result).isEqualTo(expectedJobId);
        verify(queueProducer).enqueue(Queue.OPTIMIZER_CLOUD, "test-data");
    }
}

Integration Testing

java
@Test
void shouldEnqueueAndProcessJob() throws InterruptedException {
    // Given
    String testMessage = "Integration test message";
    
    // When - Enqueue job
    String jobId = queueProducer.enqueue(Queue.OPTIMIZER_CLOUD, testMessage)
        .block();
    
    // Then - Verify job was enqueued
    assertThat(jobId).isNotNull();
    
    // Wait for Python worker to process (in real test, use polling or callbacks)
    Thread.sleep(2000);
    
    // Verify job was processed (check Redis or application state)
    Integer queueSize = queueProducer.getQueueSize(Queue.OPTIMIZER_CLOUD.toString())
        .block();
    assertThat(queueSize).isZero();
}

Manual Testing with Redis CLI

bash
# Check job data (hash fields)
redis-cli HGETALL "rq:job:<job-id>"

# Check queue contents (RQ list)
redis-cli LRANGE "rq:queue:opik:optimizer-cloud" 0 -1

# Check queue length
redis-cli LLEN "rq:queue:opik:optimizer-cloud"

# Monitor Redis commands
redis-cli MONITOR

Troubleshooting

Current Limitations

None at the moment.

Historical issue (resolved): UTF-8 decode error with Java-created jobs

Symptom:

'utf-8' codec can't decode byte 0x9c in position 1: invalid start byte

Root cause:

  • data was zlib-compressed; RQ restores jobs by HGETALL and attempts UTF-8 decoding of hash values before serializer runs.
  • The zlib header (0x78 0x9c) triggered decode errors in that pre-serializer path (reproduced below).
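
The 0x9c byte is the second byte of zlib's standard two-byte header, which is easy to reproduce:

python
import zlib

# zlib output starts with the header bytes 0x78 0x9c at the default level
print(zlib.compress(b'["func", null, [], {}]')[:2].hex())  # -> 789c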

Solution implemented:

  • Switched data to plain JSON (UTF-8) array: [func, null, args, kwargs].
  • Use RQ's JSONSerializer and default Job everywhere (removed custom serializer/job).
  • Standardized Redis keys to RQ-native: rq:job:<id> and rq:queue:<queue>.
  • Ensure a non-null description is written (prevents RQ logging issues).

Result:

  • RQ worker processes Java-created jobs end-to-end reliably. Contract validated by tests and manual runs.

Common Issues

1. Jobs Not Being Processed

Symptoms: Jobs enqueued but never processed by Python worker

Checks:

bash
# 1. Verify Python worker is running
ps aux | grep rq_worker

# 2. Check Redis queue
redis-cli -a opik LRANGE "rq:queue:opik:optimizer-cloud" 0 -1

# 3. Check job data exists
redis-cli -a opik KEYS "rq:job:*"

# 4. Check Python worker logs
tail -f /tmp/gunicorn.log

Solutions:

  • Ensure Python worker is started (via Gunicorn)
  • Verify queue names match between Java and Python
  • Check function names are correct
  • Verify Redis connection in Python worker

1.1 UTF-8 Decode Error

Error: 'utf-8' codec can't decode byte 0x9c

Check:

bash
# Verify job structure
redis-cli -a opik HGETALL "rq:job:<job-id>"

# Check if data field is binary
redis-cli -a opik HGET "rq:job:<job-id>" data | xxd | head

Solution: This error came from the old zlib-compressed data field and is resolved; the data field is now plain UTF-8 JSON. If you still see it, verify that the data field is not binary (see the historical issue above).

2. Function Not Found Error

Error: AttributeError: module 'opik_backend.rq_worker' has no attribute 'process_xxx'

Solution:

  • Ensure function name in Queue enum matches Python function name exactly
  • Check function is defined in rq_worker.py
  • Verify Python module path is correct

3. Jobs Expiring Too Quickly

Symptoms: Jobs disappear from Redis before being processed

Solution:

yaml
# Increase TTL in config.yml
queues:
  defaultJobTtl: 7 days  # Increase default
  queues:
    opik:my-queue:
      jobTTl: 2 days     # Or per-queue

4. Redis Connection Issues

Error: redis.exceptions.ConnectionError: Error connecting to Redis

Checks:

bash
# Test Redis connectivity
redis-cli -h localhost -p 6379 PING

# Check Redis is running
docker ps | grep redis

# Test from Python (password matches the local setup)
python -c "import redis; r = redis.Redis(password='opik'); print(r.ping())"

Solutions:

  • Verify Redis is running
  • Check REDIS_HOST and REDIS_PORT environment variables
  • Verify firewall rules allow Redis connection
  • Check Redis authentication if configured

5. Serialization Errors

Error: TypeError: Object of type X is not JSON serializable

Solution:

  • Ensure message data is JSON-serializable
  • Convert complex objects to dictionaries
  • Use strings, numbers, lists, and dictionaries only

java
// Bad - custom objects not serializable
MyCustomObject obj = new MyCustomObject();
queueProducer.enqueue(Queue.OPTIMIZER_CLOUD, obj);  // ❌ Fails

// Good - use JSON-friendly types
Map<String, Object> data = Map.of(
    "field1", obj.getField1(),
    "field2", obj.getField2()
);
queueProducer.enqueue(Queue.OPTIMIZER_CLOUD, data);  // βœ… Works

Debugging Tips

Enable Debug Logging

Java (config.yml):

yaml
logging:
  loggers:
    com.comet.opik.infrastructure.redis: DEBUG
    com.comet.opik.infrastructure.queues: DEBUG

Python:

python
logging.basicConfig(level=logging.DEBUG)

Monitor Redis Commands

bash
redis-cli MONITOR | grep "opik:"

Check Job Status in Redis

bash
# Get all job IDs
redis-cli KEYS "rq:job:*"

# Check specific job (hash)
redis-cli HGETALL "rq:job:<job-id>"

# Check queue (RQ list)
redis-cli LRANGE "rq:queue:opik:optimizer-cloud" 0 -1

Design Decisions

Why Java Records for RqMessage?

Decision: Use Java records instead of Lombok @Data classes

Reasons:

  1. Immutability: Records are immutable by default - thread-safe
  2. Less Boilerplate: No need for equals/hashCode/toString
  3. Modern Java: Idiomatic Java 16+ feature
  4. Clear Intent: Records signal immutable data carriers

Why Instant Instead of Long for Timestamps?

Decision: Use java.time.Instant instead of Long (epoch millis/seconds)

Reasons:

  1. Type Safety: Strong typing prevents mixing seconds/millis
  2. Rich API: Built-in time manipulation methods
  3. ISO 8601: Standard serialization format
  4. Timezone Awareness: Better handling of time zones
  5. Clarity: Clear semantics - no guessing units

Why Enum for Job Status?

Decision: Use JobStatus enum instead of String

Reasons:

  1. Type Safety: Compile-time validation
  2. IDE Support: Autocomplete prevents typos
  3. Exhaustiveness: Switch statements warn if cases missing
  4. Documentation: Self-documenting valid states

Why Queue-Level TTL Configuration?

Decision: Configure TTL at queue level, not per message

Reasons:

  1. Consistency: All jobs in a queue behave the same
  2. Separation of Concerns: Infrastructure config vs. message data
  3. Easier Management: Configure once per queue
  4. Flexibility: Different queues can have different policies

Why Interface-Based Design (QueueProducer)?

Decision: Create QueueProducer interface instead of using RqPublisher directly

Reasons:

  1. Dependency Inversion: Depend on abstraction, not implementation
  2. Testability: Easy to mock for unit tests
  3. Flexibility: Can swap implementations (Kafka, RabbitMQ)
  4. SOLID Principles: Interface Segregation Principle

Why Two-Tier Storage (Bucket + Queue)?

Decision: Store full job data in bucket, only job ID in queue

Reasons:

  1. RQ Protocol: Required by Python RQ for job lifecycle management
  2. Separation: Queue for ordering, bucket for storage
  3. Efficiency: Only job IDs in queue (smaller memory footprint)
  4. Flexibility: Job data can be updated without touching queue

Refactoring History

Initial Implementation

Original Structure:

infrastructure/rq/
β”œβ”€β”€ RqPublisher.java (concrete class)
β”œβ”€β”€ RqMessage.java (Lombok @Data)
β”œβ”€β”€ RqQueueConfig.java (with factory methods)
└── JobStatus.java (not enum)

Issues:

  • Tight coupling to concrete class
  • Hardcoded TTL values
  • String-based status (error-prone)
  • Long timestamps (unit confusion)
  • Complex factory methods

Refactoring Phase 1: Records and Enums

Changes:

  • βœ… Converted RqMessage from Lombok to record
  • βœ… Changed timestamps from Long to Instant
  • βœ… Created JobStatus enum
  • βœ… Removed TTL from message, moved to queue config

Benefits:

  • Immutability and thread safety
  • Clear time semantics
  • Type-safe status handling
  • Consistent TTL per queue

Refactoring Phase 2: Architecture Improvements

Changes:

  • βœ… Created QueueProducer interface
  • βœ… Created Queue enum for type-safe queue definitions
  • βœ… Moved classes to proper packages (queues/ and redis/)
  • βœ… Added QueuesConfig for configuration
  • βœ… Integrated with Dropwizard config system

Benefits:

  • Interface segregation
  • Better package structure
  • Configuration-driven design
  • Easier to add new queues

Final Architecture

└── infrastructure/
    β”œβ”€β”€ queues/                    # Abstractions
    β”‚   β”œβ”€β”€ QueueProducer.java    # Interface
    β”‚   β”œβ”€β”€ Queue.java            # Enum
    β”‚   β”œβ”€β”€ RqMessage.java        # Record
    β”‚   β”œβ”€β”€ RqQueueConfig.java    # Config
    β”‚   └── JobStatus.java        # Enum
    β”œβ”€β”€ redis/                     # Implementation
    β”‚   └── RqPublisher.java      # Concrete class
    └── QueuesConfig.java         # Configuration

Design Principles Applied

  1. SOLID Principles:

    • Single Responsibility: Each class has one job
    • Open/Closed: Open for extension (add queues), closed for modification
    • Liskov Substitution: RqPublisher can be substituted with any QueueProducer
    • Interface Segregation: Small, focused QueueProducer interface
    • Dependency Inversion: Depend on QueueProducer, not RqPublisher
  2. DRY (Don't Repeat Yourself):

    • Queue names and functions in one place (Queue enum)
    • TTL logic centralized in configuration
  3. KISS (Keep It Simple):

    • Simple interface with clear methods
    • Minimal configuration required
    • Sensible defaults
  4. Immutability:

    • Records are immutable
    • Enums are constants
    • Thread-safe by design

Appendix

Redis Commands Reference

bash
# Queue operations
RPUSH rq:queue:opik:optimizer-cloud <job-id>   # Add job ID to queue
LPOP rq:queue:opik:optimizer-cloud             # Remove job ID from queue
LLEN rq:queue:opik:optimizer-cloud             # Get queue length
LRANGE rq:queue:opik:optimizer-cloud 0 -1      # View all job IDs

# Job data operations (jobs are Redis hashes)
HSET rq:job:<job-id> <field> <value>  # Store a job data field
HGETALL rq:job:<job-id>               # Get job data
DEL rq:job:<job-id>                   # Delete job data
TTL rq:job:<job-id>                   # Check TTL

# Monitoring
KEYS rq:job:*                         # List all jobs
KEYS rq:queue:*                       # List all queues
MONITOR                               # Watch all commands

Python RQ Worker Commands

bash
# Start worker
python src/opik_backend/rq_worker.py

# Start with custom Redis
REDIS_HOST=custom-host REDIS_PORT=6380 python src/opik_backend/rq_worker.py

# View job status (using RQ CLI)
rq info --url redis://localhost:6379

# Empty queue
rq empty opik:optimizer-cloud --url redis://localhost:6379

Environment Variables Reference

| Variable | Default | Description |
|---|---|---|
| QUEUES_ENABLED | true | Enable queue functionality |
| QUEUES_DEFAULT_JOB_TTL | 1 day | Default job TTL |
| OPTIMIZER_QUEUE_JOB_TTL | 1 day | Optimizer queue job TTL |
| REDIS_HOST | localhost | Redis host |
| REDIS_PORT | 6379 | Redis port |
| REDIS_DB | 0 | Redis database number |
| REDIS_PASSWORD | opik | Redis password |
| REDIS_URL | redis://:opik@localhost:6379/0 | Full Redis connection string |

Support

For issues or questions:

  1. Check the Troubleshooting section
  2. Review logs in Java backend and Python worker
  3. Verify Redis connectivity and data
  4. Consult the Design Decisions for architecture rationale

Last Updated: 2025-10-15
Version: 2.0 (Post-Refactoring)