Easy Python job queues with RQ

Job queueing is an important consideration for a web application, especially one that needs to play nice and share resources with other web applications. There are lots of options out there with varying levels of complexity and power, but for a simple pure Python job queue that just works, RQ is quick and easy to get up and running.

RQ is a Python job queueing package designed to work out of the box, using a Redis database as a message broker (the bit that allows the app and workers to exchange information about jobs). To use it, you just need a redis-server installation and the rq module in your Python environment.
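
To make the broker idea concrete, here is a toy sketch using only the standard library. It is an analogy, not how RQ is implemented: an in-process queue.Queue stands in for Redis, a thread stands in for a worker process, and run_toy_broker is a made-up name for illustration.

```python
import queue
import threading


def run_toy_broker(jobs):
    """Push (job_id, func, args) tuples through a queue to one worker thread."""
    broker = queue.Queue()  # stands in for Redis in this analogy
    results = {}

    def worker():
        # Pull jobs until the None sentinel says there is nothing left
        while True:
            item = broker.get()
            if item is None:
                break
            job_id, func, args = item
            results[job_id] = func(*args)

    thread = threading.Thread(target=worker)
    thread.start()
    for job in jobs:
        broker.put(job)
    broker.put(None)  # tell the worker to stop
    thread.join()
    return results


print(run_toy_broker([('job-1', pow, (2, 10))]))  # {'job-1': 1024}
```

The app only ever talks to the broker, and so does the worker. With RQ the same separation holds, except that the two sides are separate processes and the broker is a Redis server.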

Installation

You can either build redis-server from source, or install it using your package manager.

From source:

wget https://download.redis.io/redis-stable.tar.gz
tar -xzvf redis-stable.tar.gz
cd redis-stable
make

Then, optionally, install the binaries into /usr/local/bin:

sudo make install

Using apt on Debian or Ubuntu:

curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg

echo "deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/redis.list

sudo apt-get update
sudo apt-get install redis

To start redis-server, simply run:

redis-server

To install rq, just use pip:

pip install rq

Set up a job queue

To run a job, we just need a function to define the work to be done:

def do_work():
    return 'Productive work'
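
Real jobs usually take inputs. As a small sketch, here is a job function with a positional and a keyword argument; count_words is a hypothetical example, not part of RQ.

```python
def count_words(text, min_length=1):
    """Count the words in `text` that have at least `min_length` characters."""
    return sum(1 for word in text.split() if len(word) >= min_length)


print(count_words('a bb ccc', min_length=2))  # 2
```

Once a queue exists, a call such as queue.enqueue(count_words, 'a bb ccc', min_length=2) passes the extra arguments through to the function when a worker runs it. The function needs to live in a module the workers can import, so it should not be defined in the script you run as __main__.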

With redis-server running, we connect to the database and set up a queue. By default, redis-server listens on port 6379, but you can specify a different port either as a command-line option (--port) or in the Redis config file. You can also run multiple Redis instances on different ports or hosts if you want to split work across multiple connections.

from rq import Queue
from redis import Redis

from my_module import do_work

conn = Redis('localhost', 6379)
queue = Queue(connection=conn)
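
If the host or port differs between environments, one option is to read them from environment variables instead of hard-coding them. This is a minimal sketch: the variable names REDIS_HOST, REDIS_PORT and REDIS_DB and the helper redis_url_from_env are assumptions for illustration, not something RQ or Redis defines.

```python
import os


def redis_url_from_env(environ=None):
    """Build a redis:// URL from (hypothetical) environment variables."""
    env = os.environ if environ is None else environ
    host = env.get('REDIS_HOST', 'localhost')
    port = int(env.get('REDIS_PORT', '6379'))  # Redis's default port
    db = int(env.get('REDIS_DB', '0'))
    return f'redis://{host}:{port}/{db}'


print(redis_url_from_env({'REDIS_PORT': '6380'}))  # redis://localhost:6380/0
```

The resulting URL can be handed to Redis.from_url() from the redis package to create the connection that the queue uses.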

We also need some workers listening on the queue. You can start these using RQ’s command-line interface (rq worker), or by writing your own worker script. The latter lets you pre-import the modules your jobs need, which can save a lot of time if imports account for a significant proportion of each job’s execution time:

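from redis import Redis
from rq import Queue, Worker

# Importing the job module here loads it once, before any job runs
from my_module import do_work

conn = Redis('localhost', 6379)
queue = Queue(connection=conn)

# Listen on the queue and block, processing jobs as they arrive
worker = Worker([queue], connection=conn)
worker.work()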

You can use your worker script to start as many workers as you need.
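
One way to start several workers at once is a small launcher that runs the worker script in separate processes. This is a sketch under assumptions: the script is assumed to be saved as worker.py in the current directory, and start_workers is a hypothetical helper, not part of RQ.

```python
import subprocess
import sys


def start_workers(count, command=None):
    """Start `count` processes running `command` and return their handles."""
    if command is None:
        # Assumed file name for the worker script
        command = [sys.executable, 'worker.py']
    return [subprocess.Popen(command) for _ in range(count)]
```

Calling start_workers(4) would launch four worker processes. Each returned handle has wait() and terminate() methods, so the launcher can shut the workers down again when it exits.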

With redis-server running and workers listening on our queue, we’re ready to add some tasks. Once a job is enqueued, we can check its status and handle results and errors as needed. Here’s a silly toy example:

from rq import Queue
from redis import Redis
from time import sleep

from my_module import do_work

conn = Redis('localhost', 6379)
queue = Queue(connection=conn)

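job = queue.enqueue(do_work)

# Poll until the job has either finished or failed
while True:
    if job.is_finished:
        # return_value() needs a recent RQ release; older ones expose job.result
        print(job.return_value())
        break
    elif job.is_failed:  # a property, not a method
        print('A very helpful error message')
        break
    else:
        sleep(5)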
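
A polling loop like this runs forever if the job never leaves the queue, for example because no worker is listening. A variation with a deadline might look like the sketch below. wait_for_job is a hypothetical helper; it relies only on the id, is_finished, is_failed and return_value() members of the job object, and its defaults are arbitrary.

```python
import time


def wait_for_job(job, timeout=60.0, poll_interval=1.0):
    """Poll `job` until it finishes, fails, or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        if job.is_finished:
            return job.return_value()
        if job.is_failed:
            raise RuntimeError(f'Job {job.id} failed')
        if time.monotonic() >= deadline:
            raise TimeoutError(f'Job {job.id} still pending after {timeout} seconds')
        time.sleep(poll_interval)
```

With this in place, result = wait_for_job(queue.enqueue(do_work), timeout=30) either returns the job’s result or raises, so the caller can decide what to do about a failed or stuck job.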

A more practical way of tracking jobs is to use RQ’s job registries. For example, this loop reports on the jobs in the finished job registry:

from time import sleep

from redis import Redis
from rq import Queue
from rq.job import Job

conn = Redis('localhost', 6379)
queue = Queue(connection=conn)
finished = queue.finished_job_registry

while True:
    job_ids = finished.get_job_ids()

    if not job_ids:
        print('Nothing to do, checking again in 60 seconds')
        sleep(60)
        continue

    jobs = Job.fetch_many(job_ids, connection=conn)
    for job in jobs:
        if job is None:  # the job expired between the two calls
            continue
        print(f'Job finished: {job.id} {job.func_name}')
        print(job.return_value())

    # Wait before the next pass instead of spinning
    sleep(60)
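
A loop like the one above sees the same finished jobs on every pass for as long as they stay in the registry. One way to handle each job only once is to remember the IDs that have already been processed. The sketch below is plain Python and take_new_ids is a hypothetical helper, not part of RQ.

```python
def take_new_ids(job_ids, seen):
    """Return the IDs not yet in `seen`, and record them as seen."""
    new_ids = [job_id for job_id in job_ids if job_id not in seen]
    seen.update(new_ids)
    return new_ids


seen = set()
print(take_new_ids(['a', 'b'], seen))  # ['a', 'b']
print(take_new_ids(['a', 'b', 'c'], seen))  # ['c']
```

Inside the loop, the list from queue.finished_job_registry.get_job_ids() would be passed through take_new_ids before fetching the jobs. Finished jobs are only kept for a limited time, so a long-running script may also want to prune the seen set occasionally.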

If you need to keep track of specific jobs, you can assign a custom job ID on creation and fetch the job later:

job = queue.enqueue(do_work, job_id='my-very-important-job')
sleep(60)
# Job.fetch needs the Redis connection to look the job up
job = Job.fetch('my-very-important-job', connection=conn)
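
Custom IDs are most useful when they can be rebuilt later without storing them anywhere. As a sketch, an ID can be derived from a prefix and the job’s arguments. make_job_id is a hypothetical helper and the scheme (a truncated SHA-256 digest of the JSON-encoded arguments) is just one possible choice.

```python
import hashlib
import json


def make_job_id(prefix, *args, **kwargs):
    """Derive a repeatable job ID from a prefix and the job's arguments."""
    # default=str keeps non-JSON values (dates, paths) from raising
    payload = json.dumps([args, kwargs], sort_keys=True, default=str)
    digest = hashlib.sha256(payload.encode('utf-8')).hexdigest()[:16]
    return f'{prefix}-{digest}'
```

The same prefix and arguments always give the same ID, so the code that later fetches the job can recompute the ID instead of looking it up.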

For a library that is intended to be simple, with a low barrier to entry, there’s a lot you can do with RQ. That said, the documentation is… lacking. If there isn’t an example or pattern in the docs that does what you need, you’ll probably end up scouring the source to figure out what options are available to you or what an object looks like.

If you just need a basic job queue for a Python application that works out of the box with minimal dependencies, RQ might just be the tool for you. If, however, you find yourself wanting to do more, or you want to use a different message broker, you may find Celery more useful. I’ll probably write about it in the future, once I find something I can’t do with RQ.
