apps/opik-backend/docs/JAVA_PYTHON_RQ_INTEGRATION.md
Complete guide for the Redis Queue (RQ) integration between Opik's Java backend and Python workers using the official RQ library.
Status: ✅ Working end-to-end (Plain JSON contract; no custom serializer)
Last Updated: 2025-10-15

Key points:
- `data` field: UTF-8 JSON (no compression)
- Queues live at RQ-native keys (`rq:queue:<queue>`)
- Worker: `MetricsWorker` using RQ's default `JSONSerializer` and default `Job`
- Redis password: `opik`

Quick start (Python worker):

cd apps/opik-python-backend
source venv/bin/activate
export REDIS_HOST=localhost REDIS_PORT=6379 REDIS_DB=0 REDIS_PASSWORD=opik
python src/opik_backend/rq_worker.py
cd apps/opik-backend
java -jar target/opik-backend-1.0-SNAPSHOT.jar server config.yml
# Send message
curl -X POST "http://localhost:8080/v1/internal/hello-world?message=Test"
# Check queue
curl http://localhost:8080/v1/internal/hello-world/queue-size
This integration enables the Java backend to enqueue jobs that are processed asynchronously by Python workers using Redis Queue (RQ). The production path uses RQ-native contracts (plain JSON) with no Python bridges or custom serializers.
Java directly creates RQ-compatible job structures in Redis for processing by Python RQ workers. The data field is plain JSON (UTF-8). The worker uses RQ's default JSONSerializer and default Job.
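The shape of that payload can be sketched in plain Python (the `build_rq_data` helper is illustrative, not part of the codebase; the array layout `[func, null, args, kwargs]` is what the doc's contract specifies):

```python
import json

def build_rq_data(func: str, args: list, kwargs: dict) -> bytes:
    """Serialize an RQ job payload as the plain-JSON array [func, instance, args, kwargs].

    The second element (the bound instance) is None for plain functions.
    """
    return json.dumps([func, None, args, kwargs]).encode("utf-8")

payload = build_rq_data(
    "opik_backend.rq_worker.process_optimizer_job",
    ["Hello from Java"],
    {},
)

# The worker side decodes it straight back with json.loads
func, instance, args, kwargs = json.loads(payload)
```

Because the bytes are plain UTF-8 JSON, they survive RQ's hash-restore path without any custom decoding step.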
┌───────────────────────────────────────────────────────────────────┐
│ Java Backend (Redisson)                                           │
│ - Creates RQ-compatible Redis HASH                                │
│ - Stores: created_at, enqueued_at, status, origin, timeout        │
│ - Stores: data (plain JSON [func, null, args, {}])                │
│ - Adds job ID to Redis list (queue)                               │
└──────────────────────────┬────────────────────────────────────────┘
                           │
                           ▼
┌───────────────────────────────────────────────────────────────────┐
│ Redis Server                                                      │
│ - Job data: rq:job:{id} (Redis HASH, RQ format)                   │
│ - Queue list: rq:queue:opik:optimizer-cloud                       │
└──────────────────────────┬────────────────────────────────────────┘
                           │
                           ▼
┌───────────────────────────────────────────────────────────────────┐
│ RQ Worker (Python)                                                │
│ - Uses JSONSerializer (default)                                   │
│ - Default Job class                                               │
│ - Configured with decode_responses=False                          │
│ - ✅ Processes jobs end-to-end                                    │
└───────────────────────────────────────────────────────────────────┘
What Works:
- Plain JSON `data` array `[func, null, args, kwargs]`
- `Job.fetch()` and RQ worker processing succeed with `JSONSerializer`
- `RqWorkerManager` in production

┌─────────────────┐          ┌─────────┐          ┌──────────────────┐
│  Java Backend   │ ───────▶ │  Redis  │ ───────▶ │  Python Worker   │
│  (Producer)     │          │  Queue  │          │  (Consumer)      │
│                 │          │         │          │                  │
│  RqPublisher    │  RPUSH   │  List   │   LPOP   │  RQ Worker       │
│  QueueProducer  │ ───────▶ │ +Bucket │ ───────▶ │  process_xxx()   │
└─────────────────┘          └─────────┘          └──────────────────┘
RQ uses a two-tier storage approach:
- Job hash: `rq:job:{job-id}` stores the full job details
- Queue list: holds only job IDs

Redis Storage:

┌──────────────────────────────────────┐
│ rq:job:123abc (Hash)                 │
│ ├─ func: "process_optimizer_job"     │
│ ├─ args: ["data"]                    │
│ ├─ status: "queued"                  │
│ └─ enqueued_at: "2025-10-14..."      │
└──────────────────────────────────────┘

┌──────────────────────────────────────┐
│ rq:queue:opik:optimizer-cloud (List) │
│ ├─ "123abc"                          │
│ ├─ "456def"                          │
│ └─ "789ghi"                          │
└──────────────────────────────────────┘
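The two-tier layout can be modeled with plain dictionaries (an illustrative in-memory sketch of the key structure, not the real Redisson or redis-py calls):

```python
import json
import uuid

# store["rq:job:<id>"] -> dict of hash fields; store["rq:queue:<name>"] -> list of job IDs
store: dict = {}

def enqueue(queue: str, func: str, args: list) -> str:
    """Mimic the two writes: job HASH under rq:job:<id>, then job ID appended to the queue list."""
    job_id = str(uuid.uuid4())
    store[f"rq:job:{job_id}"] = {
        "data": json.dumps([func, None, args, {}]),  # plain-JSON payload
        "status": "queued",
        "origin": queue,
    }
    store.setdefault(f"rq:queue:{queue}", []).append(job_id)
    return job_id

jid = enqueue("opik:optimizer-cloud",
              "opik_backend.rq_worker.process_optimizer_job", ["data"])
```

Workers pop an ID from the list and then read the matching `rq:job:<id>` entry; the list stays small regardless of payload size.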
com.comet.opik.infrastructure
├── queues/                    # Queue abstractions
│   ├── QueueProducer.java     # Interface for queue producers
│   ├── Queue.java             # Enum of available queues
│   ├── RqMessage.java         # Immutable message record
│   ├── RqQueueConfig.java     # Queue configuration
│   └── JobStatus.java         # Job status enum
├── redis/                     # Redis implementation
│   └── RqPublisher.java       # RQ implementation of QueueProducer
└── QueuesConfig.java          # Configuration class
# Using Docker
docker run -d -p 6379:6379 --name opik-redis redis:7.2-alpine
# Or use existing Docker Compose
cd deployment/docker-compose
docker-compose up -d redis
Edit apps/opik-backend/config.yml:
queues:
enabled: true
defaultJobTtl: 1 day
queues:
opik:optimizer-cloud:
jobTTl: 1 day
cd apps/opik-backend
mvn clean package -DskipTests
cd apps/opik-python-backend
# Install dependencies
pip install -r requirements.txt
# Set environment variables
export REDIS_HOST=localhost
export REDIS_PORT=6379
export REDIS_DB=0
# Start worker
python src/opik_backend/rq_worker.py
Expected output:
2025-10-14 10:00:00 INFO [opik_backend.rq_worker] - Starting RQ worker...
2025-10-14 10:00:00 INFO [opik_backend.rq_worker] - Connecting to Redis at localhost:6379 (db=0)
2025-10-14 10:00:00 INFO [opik_backend.rq_worker] - Listening on queues: ['opik:hello_world_queue', 'opik:optimizer-cloud']
2025-10-14 10:00:00 INFO [opik_backend.rq_worker] - RQ Worker started successfully
cd apps/opik-backend
java -jar target/opik-backend-1.0-SNAPSHOT.jar server config.yml
# Send a test message
curl -X POST "http://localhost:8080/v1/internal/hello-world?message=Hello%20from%20Java"
# Response:
{
"status": "success",
"message": "Message enqueued successfully",
"jobId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"queue": "opik:optimizer-cloud",
"sentMessage": "Hello from Java"
}
# Check queue size
curl http://localhost:8080/v1/internal/hello-world/queue-size
# Response:
{
"queue": "opik:optimizer-cloud",
"size": 0
}
Check Python worker logs:
2025-10-14 10:01:00 INFO [opik_backend.rq_worker] - Processing optimizer job: Hello from Java
2025-10-14 10:01:00 INFO [opik_backend.rq_worker] - Optimizer job processed successfully: {...}
Location: com.comet.opik.infrastructure.queues.QueueProducer
public interface QueueProducer {
/**
* Enqueue a message using a predefined Queue enum
*/
Mono<String> enqueue(Queue queue, Object message);
/**
* Enqueue a full RQ message to a specific queue
*/
Mono<String> enqueueJob(String queueName, RqMessage message);
/**
* Get the current size of a queue
*/
Mono<Integer> getQueueSize(String queueName);
}
Benefits:
Location: com.comet.opik.infrastructure.queues.Queue
public enum Queue {
OPTIMIZER_CLOUD("opik:optimizer-cloud", "opik_backend.rq_worker.process_optimizer_job");
private final String queueName;
private final String functionName; // Python function to call
}
Usage:
queueProducer.enqueue(Queue.OPTIMIZER_CLOUD, myData);
Benefits:
Location: com.comet.opik.infrastructure.queues.RqMessage
public record RqMessage(
String id, // UUID
String func, // Python function name
List<Object> args, // Positional arguments
Map<String, Object> kwargs, // Keyword arguments
String description, // Job description
JobStatus status, // Job status (enum)
String origin, // Origin queue
Instant createdAt, // Creation timestamp
Instant enqueuedAt // Enqueued timestamp
) {
public static Builder builder() { ... }
}
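A Python-side view of the same message shape (an illustrative dataclass for clarity; it mirrors the Java record and is not part of the actual worker code):

```python
import json
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone

@dataclass
class RqMessagePy:
    """Mirror of the Java RqMessage record, for illustration only."""
    func: str
    args: list
    kwargs: dict
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    status: str = "queued"
    created_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())

    def data(self) -> str:
        """The plain-JSON payload stored in the job hash's `data` field."""
        return json.dumps([self.func, None, self.args, self.kwargs])

msg = RqMessagePy("opik_backend.rq_worker.process_optimizer_job", ["hello"], {})
```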
Benefits:
- Immutable, with type-safe `Instant` timestamps

Location: com.comet.opik.infrastructure.queues.JobStatus
public enum JobStatus {
QUEUED, // Job has been queued but not started
STARTED, // Job is currently being executed
FINISHED, // Job finished successfully
FAILED; // Job failed during execution
}
Location: com.comet.opik.infrastructure.redis.RqPublisher
Key methods:
class RqPublisher implements QueueProducer {
// Enqueue with type-safe Queue enum
public Mono<String> enqueue(Queue queue, Object message) {
RqMessage rqMessage = RqMessage.builder()
.func(queue.getFunctionName())
.args(List.of(message))
.origin(queue.toString())
.status(JobStatus.QUEUED)
.build();
return enqueueJob(queue.toString(), rqMessage);
}
// Low-level enqueue with full message control
public Mono<String> enqueueJob(String queueName, RqMessage message) {
String jobId = message.id();
String jobKey = "rq:job:" + jobId;
// Get TTL from configuration
Duration ttl = config.getQueues().getQueue(queueName)
.map(RqQueueConfig::getJobTTl)
.orElse(config.getQueues().getDefaultJobTtl());
// Store job data with TTL
return redisClient.getBucket(jobKey)
.set(message, ttl.toJavaDuration())
.then(redisClient.getQueue(queueName).offer(jobId));
}
}
Location: apps/opik-python-backend/src/opik_backend/rq_worker.py
def process_optimizer_job(message: str):
"""Process an optimizer job from Java."""
logger.info(f"Processing optimizer job: {message}")
# Your processing logic here
result = {
"status": "success",
"message": f"Optimizer job processed: {message}",
"processed_by": "Python RQ Worker - Optimizer"
}
return result
def start_worker():
"""Start RQ worker listening on multiple queues."""
redis_conn = get_redis_connection()
queues = [
Queue("opik:hello_world_queue", connection=redis_conn),
Queue("opik:optimizer-cloud", connection=redis_conn),
]
worker = Worker(queues, connection=redis_conn)
worker.work()
This section previously documented a zlib-based custom serializer and job class. The production path now uses RQ's native JSONSerializer and the default Job with plain JSON data. All custom serializer/job code has been removed.
The RQ worker includes comprehensive OpenTelemetry metrics for monitoring and observability. All metrics are automatically collected by the MetricsWorker class.
| Metric Name | Type | Description | Dimensions |
|---|---|---|---|
| `rq_worker.jobs.processed` | Counter | Total number of jobs processed (success + failure) | `queue`, `function` |
| `rq_worker.jobs.succeeded` | Counter | Number of successfully completed jobs | `queue`, `function` |
| `rq_worker.jobs.failed` | Counter | Number of failed jobs | `queue`, `function`, `error_type` |
| Metric Name | Type | Description | Unit | Dimensions |
|---|---|---|---|---|
| `rq_worker.job.processing_time` | Histogram | Time spent executing the job | milliseconds | `queue`, `function` |
| `rq_worker.job.queue_wait_time` | Histogram | Time job spent waiting in queue | milliseconds | `queue`, `function` |
| `rq_worker.job.total_time` | Histogram | Total time from creation to completion | milliseconds | `queue`, `function` |
All metrics include contextual dimensions for filtering and aggregation:
- `queue`: queue name (e.g. `opik:hello_world_queue`, `opik:optimizer-cloud`)
- `function`: fully qualified function name (e.g. `opik_backend.rq_worker.process_hello_world`)
- `error_type`: exception class for failed jobs (e.g. `ValueError`, `ConnectionError`)

The `MetricsWorker` extends RQ's standard `Worker` class and overrides `perform_job()` to collect metrics:
class MetricsWorker(Worker):
    """Custom RQ Worker that emits OpenTelemetry metrics."""

    def perform_job(self, job, queue):
        func_name = job.func_name

        # Calculate queue wait time (how long the job sat in Redis)
        if job.created_at and job.started_at:
            queue_wait_ms = (job.started_at - job.created_at).total_seconds() * 1000
            queue_wait_time_histogram.record(queue_wait_ms, {"queue": queue.name, "function": func_name})

        # Execute job and measure processing time
        job_start_time = time.time()
        result = super().perform_job(job, queue)
        processing_time_ms = (time.time() - job_start_time) * 1000

        # Record success metrics
        jobs_processed_counter.add(1, {"queue": queue.name, "function": func_name})
        jobs_succeeded_counter.add(1, {"queue": queue.name, "function": func_name})
        processing_time_histogram.record(processing_time_ms, {"queue": queue.name, "function": func_name})
        return result
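The wait-time arithmetic above reduces to a simple timestamp delta; a standalone sketch (function and variable names are illustrative):

```python
from datetime import datetime, timezone, timedelta

def queue_wait_ms(created_at: datetime, started_at: datetime) -> float:
    """Milliseconds a job sat in the queue before a worker picked it up."""
    return (started_at - created_at).total_seconds() * 1000

# A job created at 10:00:00.000 and started 250 ms later
created = datetime(2025, 10, 14, 10, 0, 0, tzinfo=timezone.utc)
started = created + timedelta(milliseconds=250)
wait = queue_wait_ms(created, started)
```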
Successful Job Processing:
rq_worker.jobs.processed{queue="opik:hello_world_queue", function="process_hello_world"} = 10
rq_worker.jobs.succeeded{queue="opik:hello_world_queue", function="process_hello_world"} = 10
rq_worker.job.processing_time{queue="opik:hello_world_queue", function="process_hello_world"} = [100ms, 102ms, 98ms, ...]
rq_worker.job.queue_wait_time{queue="opik:hello_world_queue", function="process_hello_world"} = [5ms, 3ms, 7ms, ...]
Failed Job Processing:
rq_worker.jobs.processed{queue="opik:optimizer-cloud", function="process_optimizer_job"} = 5
rq_worker.jobs.failed{queue="opik:optimizer-cloud", function="process_optimizer_job", error_type="ValueError"} = 1
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader
# Setup metric export (console exporter, exporting every 10 seconds)
reader = PeriodicExportingMetricReader(ConsoleMetricExporter(), export_interval_millis=10_000)
provider = MeterProvider(metric_readers=[reader])
metrics.set_meter_provider(provider)
# Metrics are now exported to the console every 10 seconds
The metrics can be exported to various backends:
- Prometheus (via `opentelemetry-exporter-prometheus`)

Set up alerts for:
- Failure rate: `rq_worker.jobs.failed / rq_worker.jobs.processed > 0.05`
- Queue wait time: `rq_worker.job.queue_wait_time > 5000ms`
- Processing time: `rq_worker.job.processing_time > 10000ms`

Create dashboards showing:
Track SLOs based on:
✅ Fully Implemented and Tested
queues:
# Enable/disable queue functionality
enabled: ${QUEUES_ENABLED:-true}
# Default TTL for all jobs (if not specified per-queue)
defaultJobTtl: ${QUEUES_DEFAULT_JOB_TTL:-1 day}
# Per-queue specific configurations
queues:
# Optimizer cloud queue
opik:optimizer-cloud:
jobTTl: ${OPTIMIZER_QUEUE_JOB_TTL:-1 day}
# Add more queue configs here
# opik:another-queue:
# jobTTl: 2 hours
# Queue Configuration
QUEUES_ENABLED=true # Enable queue functionality
QUEUES_DEFAULT_JOB_TTL="1 day" # Default job TTL
OPTIMIZER_QUEUE_JOB_TTL="1 day" # Optimizer queue TTL
# Redis Connection
REDIS_URL="redis://:opik@localhost:6379/0"
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
REDIS_PASSWORD=opik
TTL resolution order:
1. Per-queue: `config.yml` under `queues.queues.<queue-name>.jobTTl`
2. Default: `config.yml` under `queues.defaultJobTtl`

Duration ttl = config.getQueues()
    .getQueue(queueName)              // 1. Try queue-specific
    .map(RqQueueConfig::getJobTTl)
    .orElse(config.getQueues()        // 2. Fallback to default
        .getDefaultJobTtl());
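The same two-step fallback, mirrored in Python for clarity (the config dict and the `opik:short-lived` queue are hypothetical):

```python
from datetime import timedelta

config = {
    "defaultJobTtl": timedelta(days=1),
    "queues": {
        "opik:optimizer-cloud": {"jobTTl": timedelta(days=1)},
        "opik:short-lived": {"jobTTl": timedelta(hours=2)},  # hypothetical queue
    },
}

def resolve_ttl(queue_name: str) -> timedelta:
    """1. Try the per-queue TTL; 2. fall back to the global default."""
    per_queue = config["queues"].get(queue_name, {})
    return per_queue.get("jobTTl", config["defaultJobTtl"])

ttl_known = resolve_ttl("opik:short-lived")
ttl_unknown = resolve_ttl("opik:not-configured")
```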
@Inject
private QueueProducer queueProducer;
public void sendOptimizationJob(String data) {
queueProducer.enqueue(Queue.OPTIMIZER_CLOUD, data)
.subscribe(
jobId -> log.info("Job enqueued: {}", jobId),
error -> log.error("Failed to enqueue", error)
);
}
@Inject
private QueueProducer queueProducer;
public void sendCustomJob() {
RqMessage message = RqMessage.builder()
.func("opik_backend.rq_worker.process_custom_job")
.args(List.of("arg1", "arg2"))
.kwargs(Map.of("key1", "value1", "key2", "value2"))
.description("Custom job description")
.status(JobStatus.QUEUED)
.build();
queueProducer.enqueueJob("opik:custom-queue", message)
.subscribe(
jobId -> log.info("Custom job enqueued: {}", jobId),
error -> log.error("Failed to enqueue custom job", error)
);
}
public Mono<Integer> getQueueDepth(Queue queue) {
return queueProducer.getQueueSize(queue.toString())
.doOnSuccess(size -> log.info("Queue {} size: {}", queue, size));
}
public Mono<ProcessingResult> processWithQueue(String data) {
return validateData(data)
.flatMap(validated -> queueProducer.enqueue(Queue.OPTIMIZER_CLOUD, validated))
.flatMap(jobId -> waitForJobCompletion(jobId))
.map(result -> new ProcessingResult(result));
}
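`waitForJobCompletion` is not shown above; one way to sketch the idea is a polling loop (written here in Python, with an injected `get_status` callable so the loop can be exercised without Redis — in practice the status would come from `HGET rq:job:<id> status`):

```python
import time

def wait_for_completion(get_status, timeout_s: float = 5.0, poll_interval_s: float = 0.01) -> str:
    """Poll a job's status until it is terminal ('finished'/'failed') or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        status = get_status()
        if status in ("finished", "failed"):
            return status
        time.sleep(poll_interval_s)
    raise TimeoutError("job did not complete in time")

# Fake status source: 'queued' twice, then 'finished'
statuses = iter(["queued", "queued", "finished"])
result = wait_for_completion(lambda: next(statuses))
```

For production use, a callback or pub/sub notification would avoid the busy-wait, but polling is the simplest correct baseline.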
File: com.comet.opik.infrastructure.queues.Queue
public enum Queue {
OPTIMIZER_CLOUD("opik:optimizer-cloud", "opik_backend.rq_worker.process_optimizer_job"),
// Add your new queue
MY_NEW_QUEUE("opik:my-new-queue", "opik_backend.rq_worker.process_my_new_job"),
;
}
File: apps/opik-python-backend/src/opik_backend/rq_worker.py
def process_my_new_job(data: dict):
"""
Process my new job type.
Args:
data: The job data to process
Returns:
dict: Processing result
"""
logger.info(f"Processing my new job: {data}")
# Your processing logic
result = {
"status": "success",
"data": data,
"processed_at": datetime.now().isoformat()
}
logger.info("Job processed successfully")
return result
File: apps/opik-python-backend/src/opik_backend/rq_worker.py
def start_worker():
redis_conn = get_redis_connection()
queues = [
Queue("opik:hello_world_queue", connection=redis_conn),
Queue("opik:optimizer-cloud", connection=redis_conn),
Queue("opik:my-new-queue", connection=redis_conn), # Add here
]
worker = Worker(queues, connection=redis_conn)
worker.work()
File: apps/opik-backend/config.yml
queues:
queues:
opik:my-new-queue:
jobTTl: 2 hours # Custom TTL for this queue
// In your service or resource
queueProducer.enqueue(Queue.MY_NEW_QUEUE, myData)
.subscribe(jobId -> log.info("Job enqueued: {}", jobId));
# Clear Redis
redis-cli -a opik FLUSHDB
# Send test message
curl -X POST "http://localhost:8080/v1/internal/hello-world?message=test"
# Wait 2-3 seconds, then check status
redis-cli -a opik HGET rq:job:<job-id> status
# Expected: "finished"
Test Results (2025-10-15):
- ✅ 10/10 messages sent (HTTP 200)
- ✅ 10/10 jobs finished successfully
- ✅ 0 failed jobs
- ✅ 100% success rate
- Processing time: ~6 seconds for 10 jobs
- Average: ~600ms per job (includes 500ms simulated processing)
Test Command:
# Clear and send 10 messages
redis-cli -a opik FLUSHDB
for i in {1..10}; do
curl -s -X POST "http://localhost:8080/v1/internal/hello-world?message=Test_${i}"
done
# Wait and check results
sleep 6
redis-cli -a opik KEYS 'rq:job:*' | wc -l
Verified Features:
- Plain JSON `data` (UTF-8) with `[func, null, args, kwargs]`
- RQ-native keys (`rq:job:<id>`, `rq:queue:<queue>`)
- `Job.fetch()` and worker processing succeed with `JSONSerializer`

@ExtendWith(MockitoExtension.class)
class MyServiceTest {
@Mock
private QueueProducer queueProducer;
@InjectMocks
private MyService myService;
@Test
void shouldEnqueueJobSuccessfully() {
// Given
String expectedJobId = "test-job-123";
when(queueProducer.enqueue(any(Queue.class), any()))
.thenReturn(Mono.just(expectedJobId));
// When
String result = myService.processData("test-data").block();
// Then
assertThat(result).isEqualTo(expectedJobId);
verify(queueProducer).enqueue(Queue.OPTIMIZER_CLOUD, "test-data");
}
}
@Test
void shouldEnqueueAndProcessJob() throws InterruptedException {
// Given
String testMessage = "Integration test message";
// When - Enqueue job
String jobId = queueProducer.enqueue(Queue.OPTIMIZER_CLOUD, testMessage)
.block();
// Then - Verify job was enqueued
assertThat(jobId).isNotNull();
// Wait for Python worker to process (in real test, use polling or callbacks)
Thread.sleep(2000);
// Verify job was processed (check Redis or application state)
Integer queueSize = queueProducer.getQueueSize(Queue.OPTIMIZER_CLOUD.toString())
.block();
assertThat(queueSize).isZero();
}
# Check job data (hash fields)
redis-cli HGETALL "rq:job:<job-id>"
# Check queue contents (RQ list)
redis-cli LRANGE "rq:queue:opik:optimizer-cloud" 0 -1
# Check queue length
redis-cli LLEN "rq:queue:opik:optimizer-cloud"
# Monitor Redis commands
redis-cli MONITOR
None at the moment.
Symptom:
'utf-8' codec can't decode byte 0x9c in position 1: invalid start byte
Root cause:
- `data` was zlib-compressed; RQ restores jobs via `HGETALL` and attempts UTF-8 decoding of hash values before the serializer runs.
- The zlib magic bytes (`0x78 0x9c`) triggered decode errors in that pre-serializer path.

Solution implemented:
- Switched `data` to a plain JSON (UTF-8) array: `[func, null, args, kwargs]`.
- Use RQ's `JSONSerializer` and the default `Job` everywhere (custom serializer/job removed).
- Keep RQ-native keys: `rq:job:<id>` and `rq:queue:<queue>`.
- Always write a `description` (prevents RQ logging issues).

Result: jobs are processed end-to-end with no decode errors.
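The failure mode is easy to reproduce with the standard library: zlib output begins with the `0x78 0x9c` magic bytes and is not valid UTF-8, while the plain-JSON payload decodes cleanly:

```python
import json
import zlib

payload = json.dumps(
    ["opik_backend.rq_worker.process_optimizer_job", None, ["hi"], {}]
).encode("utf-8")
compressed = zlib.compress(payload)

decode_failed = False
try:
    # What RQ's pre-serializer HGETALL path effectively does with hash values
    compressed.decode("utf-8")
except UnicodeDecodeError:
    decode_failed = True

# The plain-JSON bytes, by contrast, round-trip without issue
plain_roundtrip = json.loads(payload.decode("utf-8"))
```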
Symptoms: Jobs enqueued but never processed by Python worker
Checks:
# 1. Verify Python worker is running
ps aux | grep rq_worker
# 2. Check Redis queue
redis-cli -a opik LRANGE "rq:queue:opik:optimizer-cloud" 0 -1
# 3. Check job data exists
redis-cli -a opik KEYS "rq:job:*"
# 4. Check Python worker logs
tail -f /tmp/gunicorn.log
Solutions:
Error: 'utf-8' codec can't decode byte 0x9c
Check:
# Verify job structure
redis-cli -a opik HGETALL "rq:job:<job-id>"
# Check if data field is binary
redis-cli -a opik HGET "rq:job:<job-id>" data | xxd | head
Solution: This error indicates a stale job written in the old zlib-compressed format. Delete the offending job (`redis-cli -a opik DEL rq:job:<job-id>`) and re-enqueue it; the current producer writes plain JSON, which decodes cleanly.
Error: AttributeError: module 'opik_backend.rq_worker' has no attribute 'process_xxx'
Solution:
- Verify the function name in the `Queue` enum matches the Python function name exactly
- Ensure the function is defined in `rq_worker.py`

Symptoms: Jobs disappear from Redis before being processed
Solution:
# Increase TTL in config.yml
queues:
defaultJobTtl: 7 days # Increase default
queues:
opik:my-queue:
jobTTl: 2 days # Or per-queue
Error: redis.exceptions.ConnectionError: Error connecting to Redis
Checks:
# Test Redis connectivity
redis-cli -h localhost -p 6379 PING
# Check Redis is running
docker ps | grep redis
# Test from Python
python -c "import redis; r = redis.Redis(); print(r.ping())"
Solutions:
- Verify `REDIS_HOST` and `REDIS_PORT` environment variables

Error: `TypeError: Object of type X is not JSON serializable`
Solution:
// Bad - custom objects not serializable
MyCustomObject obj = new MyCustomObject();
queueProducer.enqueue(Queue.OPTIMIZER_CLOUD, obj); // ❌ Fails

// Good - use JSON-friendly types
Map<String, Object> data = Map.of(
    "field1", obj.getField1(),
    "field2", obj.getField2()
);
queueProducer.enqueue(Queue.OPTIMIZER_CLOUD, data); // ✅ Works
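The same rule can be checked up front on the Python side — only JSON-friendly types survive serialization (the `MyCustomObject` class here is illustrative):

```python
import json

class MyCustomObject:
    def __init__(self):
        self.field1 = "a"
        self.field2 = 2

obj = MyCustomObject()

serializable = True
try:
    json.dumps(obj)  # custom objects raise TypeError
except TypeError:
    serializable = False

# Flatten to JSON-friendly types first
data = {"field1": obj.field1, "field2": obj.field2}
encoded = json.dumps(data)  # works
```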
Java (config.yml):
logging:
loggers:
com.comet.opik.infrastructure.redis: DEBUG
com.comet.opik.infrastructure.queues: DEBUG
Python:
logging.basicConfig(level=logging.DEBUG)
redis-cli MONITOR | grep "opik:"
# Get all job IDs
redis-cli KEYS "rq:job:*"
# Check specific job (hash)
redis-cli HGETALL "rq:job:<job-id>"
# Check queue (RQ list)
redis-cli LRANGE "rq:queue:opik:optimizer-cloud" 0 -1
Decision: Use Java records instead of Lombok @Data classes
Reasons:
Decision: Use java.time.Instant instead of Long (epoch millis/seconds)
Reasons:
Decision: Use JobStatus enum instead of String
Reasons:
Decision: Configure TTL at queue level, not per message
Reasons:
Decision: Create QueueProducer interface instead of using RqPublisher directly
Reasons:
Decision: Store full job data in bucket, only job ID in queue
Reasons:
Original Structure:
infrastructure/rq/
├── RqPublisher.java      (concrete class)
├── RqMessage.java        (Lombok @Data)
├── RqQueueConfig.java    (with factory methods)
└── JobStatus.java        (not enum)
Issues:
Changes:
- Converted `RqMessage` from a Lombok class to a record
- Changed timestamps from `Long` to `Instant`
- Introduced the `JobStatus` enum

Benefits:
Changes:
- Introduced the `QueueProducer` interface
- Added the `Queue` enum for type-safe queue definitions
- Split packages into abstractions (`queues/`) and implementation (`redis/`)
- Added `QueuesConfig` for configuration

Benefits:
infrastructure/
├── queues/                    # Abstractions
│   ├── QueueProducer.java     # Interface
│   ├── Queue.java             # Enum
│   ├── RqMessage.java         # Record
│   ├── RqQueueConfig.java     # Config
│   └── JobStatus.java         # Enum
├── redis/                     # Implementation
│   └── RqPublisher.java       # Concrete class
└── QueuesConfig.java          # Configuration
SOLID Principles:
- Liskov substitution: `RqPublisher` can be substituted for any `QueueProducer`
- Interface segregation: clients depend only on the `QueueProducer` interface
- Dependency inversion: callers depend on `QueueProducer`, not `RqPublisher`

DRY (Don't Repeat Yourself):
- Queue names and function names are defined once (`Queue` enum)

KISS (Keep It Simple):
Immutability:
# Queue operations (RQ list keys)
RPUSH rq:queue:opik:optimizer-cloud <job-id>   # Add job to queue
LPOP rq:queue:opik:optimizer-cloud             # Remove job from queue
LLEN rq:queue:opik:optimizer-cloud             # Get queue length
LRANGE rq:queue:opik:optimizer-cloud 0 -1      # View all jobs

# Job data operations (jobs are Redis HASHes)
HGETALL rq:job:<job-id>                        # Get all job fields
HGET rq:job:<job-id> data                      # Get the serialized payload
DEL rq:job:<job-id>                            # Delete job data
TTL rq:job:<job-id>                            # Check TTL

# Monitoring
KEYS rq:job:*                                  # List all jobs
KEYS rq:queue:*                                # List all queues
MONITOR                                        # Watch all commands
# Start worker
python src/opik_backend/rq_worker.py
# Start with custom Redis
REDIS_HOST=custom-host REDIS_PORT=6380 python src/opik_backend/rq_worker.py
# View job status (using RQ CLI; include the password in the URL)
rq info --url redis://:opik@localhost:6379/0

# Empty queue
rq empty opik:optimizer-cloud --url redis://:opik@localhost:6379/0
| Variable | Default | Description |
|---|---|---|
| `QUEUES_ENABLED` | `true` | Enable queue functionality |
| `QUEUES_DEFAULT_JOB_TTL` | `1 day` | Default job TTL |
| `OPTIMIZER_QUEUE_JOB_TTL` | `1 day` | Optimizer queue job TTL |
| `REDIS_HOST` | `localhost` | Redis host |
| `REDIS_PORT` | `6379` | Redis port |
| `REDIS_DB` | `0` | Redis database number |
| `REDIS_PASSWORD` | `opik` | Redis password |
| `REDIS_URL` | `redis://:opik@localhost:6379/0` | Full Redis connection string |
For issues or questions:
Last Updated: 2025-10-15
Version: 2.0 (Post-Refactoring)