ResQ Classes

class pyres.ResQ(server='localhost:6379', password=None)

The ResQ class defines the Redis server object through which we enqueue jobs into various queues.

The __init__ takes these keyword arguments:

server – IP address and port of the Redis server to which you want to connect. Default is localhost:6379.

password – The password, if required, of your Redis server. Default is “None”.

timeout – The timeout keyword is in the signature, but is unused. Default is “None”.

retry_connection – This keyword is in the signature but is deprecated. Default is “True”.

Both timeout and retry_connection will be removed, as the redis-py client no longer uses them.

Example usage:

>>> from pyres import ResQ
>>> r = ResQ(server="192.168.1.10:6379", password="some_pwd")
    # Connects to the Redis server at 192.168.1.10:6379 with a password

r is a ResQ object on which we can enqueue tasks:

>>> r.enqueue(SomeClass, *args)

SomeClass can be any Python class with a perform method and a queue attribute on it.
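For illustration, a minimal job class might look like the following (the class name, queue name, and argument are hypothetical, not part of pyres itself):

```python
class Archive:
    """A minimal pyres job: a queue attribute plus a perform method."""

    # Name of the queue this job is enqueued to (hypothetical name).
    queue = "archive"

    @staticmethod
    def perform(article_id):
        # A worker calls this in a child process with the enqueued args.
        return "archived %s" % article_id
```

With such a class defined, `r.enqueue(Archive, 42)` would place the job on the "archive" queue.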

close()

Close the underlying redis connection.

enqueue(klass, *args)

Enqueue a job into a specific queue. Make sure the class you are passing has a queue attribute and a perform method on it.

info()

Returns a dictionary describing the current status of the server: pending jobs, processed jobs, and the number of queues, workers, and failed jobs.

Job Classes

class pyres.job.Job(queue, payload, resq, worker=None)

Every job on the ResQ is an instance of the Job class.

The __init__ takes these keyword arguments:

queue – A string naming the queue to which this Job will be added.

payload – A dictionary which contains the string name of a class which extends this Job and a list of args which will be passed to that class.

resq – An instance of the ResQ class.

worker – The name of a specific worker if you’d like this Job to be done by that worker. Default is “None”.

fail(exception)

This method provides a way to fail a job and will use whatever failure backend you’ve provided. The default is the RedisBackend.

perform()

This method converts payload into args and calls the perform method on the payload class.

Before calling perform, a before_perform class method is called, if it exists. It takes a dictionary as an argument; currently the only things stored on the dictionary are the args passed into perform and a timestamp of when the job was enqueued.

Similarly, an after_perform class method is called after perform is finished. The metadata dictionary contains the same data, plus a timestamp of when the job was performed, a failed boolean value, and if it did fail, a retried boolean value. This method is called after retry, and is called regardless of whether an exception is ultimately thrown by the perform method.
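As a sketch of how these hooks might be used, the job class below (names and arguments are hypothetical) defines both before_perform and after_perform:

```python
class Notify:
    """A hypothetical job demonstrating the perform lifecycle hooks."""
    queue = "notifications"

    @staticmethod
    def perform(user_id):
        return "notified %s" % user_id

    @classmethod
    def before_perform(cls, metadata):
        # metadata holds the args passed to perform and an
        # enqueue timestamp.
        print("about to run with %r" % (metadata,))

    @classmethod
    def after_perform(cls, metadata):
        # metadata now also records when the job ran, whether it
        # failed, and (if it failed) whether it was retried.
        if metadata.get("failed"):
            print("job failed; retried=%r" % metadata.get("retried"))
```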

classmethod reserve(queues, res, worker=None, timeout=10)

Reserve a job on one of the queues. This marks this job so that other workers will not pick it up.

retry(payload_class, args)

This method provides a way to retry a job after a failure. If the job class defined by the payload contains a retry_every attribute, then pyres will attempt to retry the job until it succeeds or until the timeout defined by the retry_timeout attribute on the payload class is reached.
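A job class that opts in to this retry behaviour might look like the sketch below (the class name, queue, and values are hypothetical; retry_every and retry_timeout are the attribute names described above):

```python
class FlakyUpload:
    """A hypothetical job that pyres will retry on failure."""
    queue = "uploads"
    retry_every = 30     # seconds between retry attempts
    retry_timeout = 300  # stop retrying after five minutes

    @staticmethod
    def perform(path):
        # If this raises, pyres retries the job every retry_every
        # seconds until it succeeds or retry_timeout elapses.
        return "uploaded %s" % path
```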

static safe_str_to_class(s)

Helper function to map string class names to module classes.
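The idea behind such a helper can be sketched with the standard library's importlib; this is a simplified, hypothetical re-implementation, not pyres's actual code:

```python
import importlib

def str_to_class(s):
    """Map a dotted string such as 'tasks.Archive' to the class it
    names (simplified sketch of what safe_str_to_class does)."""
    # Split "package.module.ClassName" into a module path and a
    # class name, import the module, then look the class up on it.
    module_name, _, class_name = s.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)
```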

Worker Classes

class pyres.worker.Worker(queues=(), server='localhost:6379', password=None, timeout=None)

Defines a worker. The pyres_worker script instantiates this Worker class and passes a comma-separated list of queues to listen on:

>>> from pyres.worker import Worker
>>> Worker.run(["queue1", "queue2"], server="localhost:6379")

after_fork(job)

Hook for making changes immediately after forking to process a job.

before_fork(job)

Hook for making changes immediately before forking to process a job.
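Typical overrides of these hooks close or reopen resources around the fork. The sketch below shows hook bodies standalone so it runs on its own; in real use they would be methods on a subclass of pyres.worker.Worker:

```python
class LoggingWorkerHooks:
    """Sketch of before_fork/after_fork overrides (hypothetical;
    in practice defined on a pyres.worker.Worker subclass)."""

    def __init__(self):
        self.events = []

    def before_fork(self, job):
        # Parent process: e.g. flush logs or close connections that
        # must not be shared with the child.
        self.events.append("before_fork: %r" % (job,))

    def after_fork(self, job):
        # Child process: e.g. reopen log files or database
        # connections before the job's perform runs.
        self.events.append("after_fork: %r" % (job,))
```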

fork_worker(job)

Invoked by work method. fork_worker does the actual forking to create the child process that will process the job. It’s also responsible for monitoring the child process and handling hangs and crashes.

Finally, the process method actually processes the job by eventually calling the Job instance’s perform method.

job_class

alias of Job

validate_queues()

Checks if a worker is given at least one queue to work on.

work(interval=5)

Invoked by the run method. work listens on a list of queues and sleeps for interval seconds between polls.

interval – Number of seconds the worker will wait until processing the next job. Default is “5”.

Whenever a worker finds a job on the queue it first calls reserve on that job to make sure another worker won’t run it, then forks itself to work on that job.
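One iteration of that loop can be sketched as follows; work_once and the worker stub are hypothetical simplifications, not pyres's actual implementation:

```python
import time

def work_once(worker, interval=5):
    """One simplified iteration of the work loop described above."""
    job = worker.reserve()    # reserve marks the job for this worker
    if job is None:
        time.sleep(interval)  # queue empty: wait before polling again
        return False
    worker.fork_worker(job)   # the child process runs job.perform()
    return True
```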

worker_pids()

Returns a list of all pids (as strings) of the workers on this machine. Used when pruning dead workers.

Failure Classes

class pyres.failure.base.BaseBackend(exp, queue, payload, worker=None)

Provides a base class that custom backends can subclass. Also provides basic traceback and message parsing.

The __init__ takes these keyword arguments:

exp – The exception generated by your failure.

queue – The queue in which the Job was enqueued when it failed.

payload – The payload that was passed to the Job.

worker – The worker that was processing the Job when it failed.
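A custom backend with this interface might look like the sketch below; the class is hypothetical and shown standalone so it runs on its own (in real use you would subclass pyres.failure.base.BaseBackend):

```python
class LogBackend:
    """Sketch of a custom failure backend matching BaseBackend's
    constructor and save() interface (hypothetical)."""

    def __init__(self, exp, queue, payload, worker=None):
        self._exception = exp  # the exception raised by the job
        self._queue = queue    # queue the job was enqueued on
        self._payload = payload
        self._worker = worker

    def save(self, resq=None):
        # Record the failure somewhere durable instead of Redis.
        return "%s failed on %s: %s" % (
            self._payload.get("class"), self._queue, self._exception)
```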

class pyres.failure.RedisBackend(exp, queue, payload, worker=None)

Extends the BaseBackend to provide a Redis backend for failed jobs.

save(resq=None)

Saves the failed Job into a “failed” Redis queue, preserving all of its original enqueue information.
