Back to Rq

RQ: Workers

docs/docs/workers.md

2.918.4 KB
Original Source

A worker is a Python process that typically runs in the background and exists solely as a work horse to perform lengthy or blocking tasks that you don't want to perform inside web processes.

Starting Workers

To start crunching work, simply start a worker from the root of your project directory:

console
$ rq worker high default low
*** Listening for work on high, default, low
Got send_newsletter('[email protected]') from default
Job ended normally without result
*** Listening for work on high, default, low
...

Workers will read jobs from the given queues (the order is important) in an endless loop, waiting for new work to arrive when all jobs are done.

Each worker will process a single job at a time. Within a worker, there is no concurrent processing going on. If you want to perform jobs concurrently, simply start more workers.

You should use process managers like Supervisor or systemd to run RQ workers in production.

Burst Mode

By default, workers will start working immediately and will block and wait for new work when they run out of work. Workers can also be started in burst mode to finish all currently available work and quit as soon as all given queues are emptied.

console
$ rq worker --burst high default low
*** Listening for work on high, default, low
Got send_newsletter('[email protected]') from default
Job ended normally without result
No more work, burst finished.
Registering death.

This can be useful for batch work that needs to be processed periodically, or just to scale up your workers temporarily during peak periods.

Worker Arguments

In addition to --burst, rq worker also accepts these arguments:

  • --url or -u: URL describing Redis connection details (e.g rq worker --url redis://:[email protected]:1234/9 or rq worker --url unix:///var/run/redis/redis.sock)
  • --burst or -b: run worker in burst mode (stops after all jobs in queue have been processed).
  • --path or -P: multiple import paths are supported (e.g rq worker --path foo --path bar)
  • --config or -c: path to module containing RQ settings.
  • --results-ttl: job results will be kept for this number of seconds (defaults to 500).
  • --worker-class or -w: RQ Worker class to use (e.g rq worker --worker-class 'foo.bar.MyWorker')
  • --job-class or -j: RQ Job class to use.
  • --queue-class: RQ Queue class to use.
  • --connection-class: Redis connection class to use, defaults to redis.StrictRedis.
  • --log-format: Format for the worker logs, defaults to '%(asctime)s %(message)s'
  • --date-format: Datetime format for the worker logs, defaults to '%H:%M:%S'
  • --disable-job-desc-logging: Turn off job description logging.
  • --max-jobs: Maximum number of jobs to execute.
  • --serializer: Serializer to use. Accepts json or pickle, or a dotted import path to your own serializer (e.g. rq.serializers.JSONSerializer).

New in version 1.14.0.

  • --dequeue-strategy: The strategy to dequeue jobs from multiple queues (one of default, random or round_robin, defaults to default)
  • --max-idle-time: if specified, worker will wait for X seconds for a job to arrive before shutting down.
  • --maintenance-interval: defaults to 600 seconds. Runs maintenance tasks every X seconds.

Inside the Worker

The Worker Lifecycle

The life-cycle of a worker consists of a few phases:

  1. Boot. Loading the Python environment.
  2. Birth registration. The worker registers itself to the system so it knows of this worker.
  3. Start listening. A job is popped from any of the given Redis queues. If all queues are empty and the worker is running in burst mode, quit now. Else, wait until jobs arrive.
  4. Prepare job execution. The worker tells the system that it will begin work by setting its status to busy and registers job in the StartedJobRegistry.
  5. Fork a child process. A child process (the "work horse") is forked off to do the actual work in a fail-safe context.
  6. Process work. This performs the actual job work in the work horse.
  7. Cleanup job execution. The worker sets its status to idle and sets both the job and its result to expire based on result_ttl. Job is also removed from StartedJobRegistry and added to to FinishedJobRegistry in the case of successful execution, or FailedJobRegistry in the case of failure.
  8. Loop. Repeat from step 3.

Performance Notes

Basically the rq worker shell script is a simple fetch-fork-execute loop. When a lot of your jobs do lengthy setups, or they all depend on the same set of modules, you pay this overhead each time you run a job (since you're doing the import after the moment of forking). This is clean, because RQ won't ever leak memory this way, but also slow.

A pattern you can use to improve the throughput performance for these kind of jobs can be to import the necessary modules before the fork. There is no way of telling RQ workers to perform this set up for you, but you can do it yourself before starting the work loop.

To do this, provide your own worker script (instead of using rq worker). A simple implementation example:

python
#!/usr/bin/env python
from redis import Redis
from rq import Worker

# Preload libraries
import library_that_you_want_preloaded

# Provide the worker with the list of queues (str) to listen to.
w = Worker(['default'], connection=Redis())
w.work()

Worker Names

Workers are registered to the system under their names, which are generated randomly during instantiation (see monitoring). To override this default, specify the name when starting the worker, or use the --name cli option.

python
from redis import Redis
from rq import Queue, Worker

redis = Redis()
queue = Queue('queue_name')

# Start a worker with a custom name
worker = Worker([queue], connection=redis, name='foo')

Retrieving Worker Information

Worker instances store their runtime information in Redis. Here's how to retrieve them:

python
from redis import Redis
from rq import Queue, Worker

# Returns all workers registered in this connection
redis = Redis()
workers = Worker.all(connection=redis)

# Returns all workers in this queue (new in version 0.10.0)
queue = Queue('queue_name')
workers = Worker.all(queue=queue)
worker = workers[0]
print(worker.name)

print('Successful jobs: ' + worker.successful_job_count)
print('Failed jobs: ' + worker.failed_job_count)
print('Total working time: '+ worker.total_working_time)  # In seconds

worker also have the following properties:

  • hostname - the host where this worker is run
  • pid - worker's process ID
  • queues - queues on which this worker is listening for jobs
  • state - possible states are suspended, started, busy and idle
  • current_job - the job it's currently executing (if any)
  • last_heartbeat - the last time this worker was seen
  • birth_date - time of worker's instantiation
  • successful_job_count - number of jobs finished successfully
  • failed_job_count - number of failed jobs processed
  • total_working_time - amount of time spent executing jobs, in seconds

If you only want to know the number of workers for monitoring purposes, Worker.count() is much more performant.

python
from redis import Redis
from rq import Worker

redis = Redis()

# Count the number of workers in this Redis connection
workers = Worker.count(connection=redis)

# Count the number of workers for a specific queue
queue = Queue('queue_name', connection=redis)
workers = Worker.count(queue=queue)

Worker with Custom Serializer

When creating a worker, you can pass in a custom serializer that will be used when loading jobs from Redis. Serializers must implement loads and dumps; see rq.serializers.JSONSerializer for an example. The default serializer is pickle.

Warning: RQ uses pickle as its default serializer, which is not secure. Only run RQ against Redis instances that you trust. It is possible to construct malicious pickle data that will execute arbitrary code during unpickling.

To process jobs that were enqueued with the JSON serializer, pass either the 'json' shorthand or the JSONSerializer class itself:

python
from rq import Worker
from rq.serializers import JSONSerializer

worker = Worker('foo', serializer='json')  # or: serializer=JSONSerializer

You can also use the same serializer when creating the queue object:

python
from rq import Queue, Worker
from rq.serializers import JSONSerializer

queue = Queue('foo', serializer='json')        # or: serializer=JSONSerializer
worker = Worker([queue], serializer='json')    # or: serializer=JSONSerializer

When starting workers from the command line, pass the serializer (the shorthand json resolves to rq.serializers.JSONSerializer; a dotted import path also works):

console
$ rq worker --serializer json

Better worker process title

Worker process will have a better title (as displayed by system tools such as ps and top) after you installed a third-party package setproctitle:

sh
pip install setproctitle

Taking Down Workers

If, at any time, the worker receives SIGINT (via Ctrl+C) or SIGTERM (via kill), the worker wait until the currently running task is finished, stop the work loop and gracefully register its own death.

If, during this takedown phase, SIGINT or SIGTERM is received again, the worker will forcefully terminate the child process (sending it SIGKILL), but will still try to register its own death.

Using a Config File

If you'd like to configure rq worker via a configuration file instead of through command line arguments, you can do this by creating a Python file like settings.py:

python
REDIS_URL = 'redis://localhost:6379/1'

# You can also specify the Redis DB to use
# REDIS_HOST = 'redis.example.com'
# REDIS_PORT = 6380
# REDIS_DB = 3
# REDIS_PASSWORD = 'very secret'

# Queues to listen on
QUEUES = ['high', 'default', 'low']

# If you're using Sentry to collect your runtime exceptions, you can use this
# to configure RQ for it in a single step
# The 'sync+' prefix is required for raven: https://github.com/nvie/rq/issues/350#issuecomment-43592410
SENTRY_DSN = 'sync+http://public:[email protected]/1'

# If you want custom worker name
# NAME = 'worker-1024'

# If you want to use a dictConfig <https://docs.python.org/3/library/logging.config.html#logging-config-dictschema>
# for more complex/consistent logging requirements.
DICT_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'standard': {
            'format': '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
        },
    },
    'handlers': {
        'default': {
            'level': 'INFO',
            'formatter': 'standard',
            'class': 'logging.StreamHandler',
            'stream': 'ext://sys.stderr',  # Default is stderr
        },
    },
    'loggers': {
        'root': {  # root logger
            'handlers': ['default'],
            'level': 'INFO',
            'propagate': False
        },
    }
}

The example above shows all the options that are currently supported.

To specify which module to read settings from, use the -c option:

console
$ rq worker -c settings

Alternatively, you can also pass in these options via environment variables.

Custom Worker Classes

There are times when you want to customize the worker's behavior. Some of the more common requests so far are:

  1. Managing database connectivity prior to running a job.
  2. Using a job execution model that does not require os.fork.
  3. The ability to use different concurrency models such as multiprocessing or gevent.
  4. Using a custom strategy for dequeuing jobs from different queues.

You can use the -w option to specify a different worker class to use:

console
$ rq worker -w 'path.to.GeventWorker'

By default, RQ also ships with a few worker classes.

SimpleWorker

A worker implementation that executes jobs in the same process, without using fork(), useful for:

  • Testing and debugging jobs
  • Environments where fork() is not available or desired

SimpleWorker does not provide periodic heartbeats during job execution. This means long-running jobs may appear to be "stuck" from monitoring tools' perspective. SimpleWorker is not recommended for production use unless you fully understand these limitations.

Usage:

python
from rq import SimpleWorker, Queue

queue = Queue('default')
worker = SimpleWorker([queue])
worker.work()

Or via CLI:

bash
rq worker -w rq.worker.SimpleWorker

RoundRobinWorker

The RoundRobinWorker dequeues jobs from multiple queues in a round-robin fashion. For example, if you have queues q1, q2, q3:

  • First job is taken from q1
  • Second job from q2
  • Third job from q3
  • Fourth job from q1 again
  • And so on...

This provides fair scheduling across queues rather than strict priority ordering.

Usage:

python
from rq import RoundRobinWorker, Queue

queues = [Queue('q1'), Queue('q2'), Queue('q3')]
worker = RoundRobinWorker(queues)
worker.work()

Or via CLI:

bash
rq worker -w rq.worker.RoundRobinWorker q1 q2 q3

RandomWorker

The RandomWorker dequeues jobs from queues randomly rather than in a fixed order. This can be useful for load balancing across queues.

Usage:

python
from rq import RandomWorker, Queue

queues = [Queue('q1'), Queue('q2'), Queue('q3')]
worker = RandomWorker(queues)
worker.work()

Or via CLI:

bash
rq worker -w rq.worker.RandomWorker q1 q2 q3

SpawnWorker

New in version 2.2.0.

The SpawnWorker uses os.spawn() instead of os.fork() to run jobs. This is useful in environments where fork() is not available, like Windows or newer versions of MacOS.

Usage:

python
from rq import SpawnWorker, Queue

queue = Queue('default')
worker = SpawnWorker([queue])
worker.work()

Or via CLI:

bash
rq worker -w rq.worker.SpawnWorker

The main differences between these workers are:

  • SimpleWorker: No process forking, simpler but less isolated execution. Also no periodic heartbeats during job executions.
  • RoundRobinWorker: Fair scheduling across queues
  • RandomWorker: Random queue selection for load balancing
  • SpawnWorker: Windows compatibility using spawn() instead of fork()

Choose the appropriate worker type based on your needs for job isolation, queue scheduling, and platform compatibility.

Custom Job and Queue Classes

You can tell the worker to use a custom class for jobs and queues using --job-class and/or --queue-class.

console
$ rq worker --job-class 'custom.JobClass' --queue-class 'custom.QueueClass'

Don't forget to use those same classes when enqueueing the jobs.

For example:

python
from rq import Queue
from rq.job import Job

class CustomJob(Job):
    pass

class CustomQueue(Queue):
    job_class = CustomJob

queue = CustomQueue('default', connection=redis_conn)
queue.enqueue(some_func)

Custom DeathPenalty Classes

When a Job times-out, the worker will try to kill it using the supplied death_penalty_class (default: UnixSignalDeathPenalty). This can be overridden if you wish to attempt to kill jobs in an application specific or 'cleaner' manner.

DeathPenalty classes are constructed with the following arguments BaseDeathPenalty(timeout, JobTimeoutException, job_id=job.id)

Custom Exception Handlers

If you need to handle errors differently for different types of jobs, or simply want to customize RQ's default error handling behavior, run rq worker using the --exception-handler option:

console
$ rq worker --exception-handler 'path.to.my.ErrorHandler'

# Multiple exception handlers is also supported
$ rq worker --exception-handler 'path.to.my.ErrorHandler' --exception-handler 'another.ErrorHandler'

If you want to disable RQ's default exception handler, use the --disable-default-exception-handler option:

console
$ rq worker --exception-handler 'path.to.my.ErrorHandler' --disable-default-exception-handler

Sending Commands to Worker

New in version 1.6.0.

Starting in version 1.6.0, workers use Redis' pubsub mechanism to listen to external commands while they're working. Two commands are currently implemented:

Shutting Down a Worker

send_shutdown_command() instructs a worker to shutdown. This is similar to sending a SIGINT signal to a worker.

python
from redis import Redis
from rq.command import send_shutdown_command
from rq.worker import Worker

redis = Redis()

workers = Worker.all(redis)
for worker in workers:
   send_shutdown_command(redis, worker.name)  # Tells worker to shutdown

Killing a Horse

send_kill_horse_command() tells a worker to cancel a currently executing job. If worker is not currently working, this command will be ignored.

python
from redis import Redis
from rq.command import send_kill_horse_command
from rq.worker import Worker, WorkerStatus

redis = Redis()

workers = Worker.all(redis)
for worker in workers:
   if worker.state == WorkerStatus.BUSY:
      send_kill_horse_command(redis, worker.name)

Stopping a Job

New in version 1.7.0.

You can use send_stop_job_command() to tell a worker to immediately stop a currently executing job. A job that's stopped will be sent to FailedJobRegistry.

python
from redis import Redis
from rq.command import send_stop_job_command

redis = Redis()

# This will raise an exception if job is invalid or not currently executing
send_stop_job_command(redis, job_id)

Worker Pool

New in version 1.14.0.

WorkerPool allows you to run multiple workers in a single CLI command.

Usage:

shell
rq worker-pool high default low -n 3

Options:

  • -u or --url <Redis connection URL>: as defined in redis-py's docs.
  • -w or --worker-class <path.to.Worker>: defaults to rq.worker.Worker. rq.worker.SimpleWorker is also an option.
  • -n or --num-workers <number of worker>: defaults to 2.
  • -b or --burst: run workers in burst mode (stops after all jobs in queue have been processed).
  • -l or --logging-level <level>: defaults to INFO. DEBUG, WARNING, ERROR and CRITICAL are supported.
  • -S or --serializer <serializer>: accepts the shorthand json or pickle, or a dotted import path. Defaults to rq.serializers.PickleSerializer.
  • -P or --path <path>: multiple import paths are supported (e.g rq worker --path foo --path bar).
  • -j or --job-class <path.to.Job>: defaults to rq.job.Job.