3 class RedisQueue implements DrupalQueueInterface
{
/**
 * The name of the queue (Redis list key) that holds available item IDs.
 *
 * @var string
 */
protected $avail;

/**
 * The queue (Redis list key) that holds claimed item IDs.
 *
 * @var string
 */
protected $claimed;

/**
 * Key prefix of the per-item lease markers; the item ID is appended to it.
 *
 * Declared explicitly so the constructor does not create it as a dynamic
 * property.
 *
 * @var string
 */
protected $lease;

/**
 * The PhpRedis object which connects to the redis server.
 *
 * @var Redis
 */
protected $redis;

/**
 * The 'reserve_timeout' queue option.
 *
 * When numeric, claimItem() delegates to claimItemBlocking(), which passes
 * this value to brpoplpush() as its timeout argument.
 */
protected $reserve_timeout;
/**
 * Start working with a queue.
 *
 * Derives the Redis key names used by this queue, loads the queue options
 * and connects to the configured Redis server.
 *
 * @param $name
 *   Arbitrary string. The name of the queue to work with.
 */
public function __construct($name) {
  // All keys of one queue share the same 'drupal:queue:<name>' stem.
  $stem = 'drupal:queue:' . $name;
  $this->avail = $stem . ':avail';
  $this->claimed = $stem . ':claimed';
  $this->lease = $this->claimed . '_lease:';

  $options = self::getOptions($name);
  $this->reserve_timeout = $options['reserve_timeout'];

  $this->redis = new Redis();
  $this->redis->connect($options['host'], $options['port']);
}
/**
 * Build the connection options for a named queue.
 *
 * Precedence, highest first: the 'redis_queue_<name>' variable, the
 * 'redis_default_queue' variable, then the built-in fallbacks below.
 *
 * @param $name
 *   The name of the queue.
 *
 * @return
 *   An array containing at least the keys 'host', 'port' and
 *   'reserve_timeout'.
 */
public static function getOptions($name) {
  $options = variable_get('redis_queue_' . $name, array());
  $defaults = variable_get('redis_default_queue', array()) + array(
    'host' => variable_get('redis_client_host', '127.0.0.1'),
    'port' => variable_get('redis_client_port', 6379),
    'reserve_timeout' => NULL,
  );
  // Array union keeps existing keys, so per-queue settings win.
  $options += $defaults;
  // The constructor reads 'host', 'port' and 'reserve_timeout' from this.
  return $options;
}
/**
 * Add a queue item and store it directly to the queue.
 *
 * @param $data
 *   Arbitrary data to be associated with the new task in the queue.
 *
 * @return
 *   TRUE if the item was successfully created and was (best effort) added
 *   to the queue, otherwise FALSE. We don't guarantee the item was
 *   committed to disk, that your disk wasn't hit by a meteor, etc, but as
 *   far as we know, the item is now in the queue.
 */
public function createItem($data) {
  $record = new stdClass();
  $record->data = $data;
  $record->qid = $this->incrementId();
  // We cannot rely on REQUEST_TIME because many items might be created
  // by a single request which takes longer than 1 second.
  $record->timestamp = time();

  // One transaction: store the payload, sample the list length, then push
  // the ID. The length sample is what makes the growth check below possible.
  $result = $this->redis->multi()
    ->hsetnx($this->avail . '_hash', $record->qid, serialize($record))
    ->lLen($this->avail)
    ->lpush($this->avail, $record->qid)
    ->exec();

  // Anything other than three replies means the transaction did not run
  // as queued; report failure instead of indexing a non-array.
  if (!is_array($result) || count($result) < 3) {
    return FALSE;
  }

  // Success: the payload was newly stored and the list grew.
  return $result[0] && $result[2] > $result[1];
}
/**
 * Reserve the next item ID for this queue.
 *
 * @return
 *   The result of incrementing the queue's '_counter' key in Redis.
 */
protected function incrementId() {
  $counter_key = $this->avail . '_counter';
  return $this->redis->incr($counter_key);
}
/**
 * Retrieve the number of items in the queue.
 *
 * This is intended to provide a "best guess" count of the number of items in
 * the queue. Depending on the implementation and the setup, the accuracy of
 * the results of this function may vary.
 *
 * e.g. On a busy system with a large number of consumers and items, the
 * result might only be valid for a fraction of a second and not provide an
 * accurate representation.
 *
 * Only the list of available items is measured; claimed items are not
 * included.
 *
 * @return
 *   An integer estimate of the number of items in the queue.
 */
public function numberOfItems() {
  $available = $this->redis->lLen($this->avail);
  return $available;
}
/**
 * Claim an item in the queue for processing.
 *
 * @param $lease_time
 *   How long the processing is expected to take in seconds, defaults to 30
 *   seconds. After this lease expires, the item will be reset and another
 *   consumer can claim the item. For idempotent tasks (which can be run
 *   multiple times without side effects), shorter lease times would result
 *   in lower latency in case a consumer fails. For tasks that should not be
 *   run more than once (non-idempotent), a larger lease time will make it
 *   more rare for a given task to run multiple times in cases of failure,
 *   at the cost of higher latency.
 *
 * @return
 *   On success we return an item object. If the queue is unable to claim an
 *   item it returns false. This implies a best effort to retrieve an item
 *   and either the queue is empty or there is some other non-recoverable
 *   problem.
 */
public function claimItem($lease_time = 30) {
  // A numeric reserve timeout selects the blocking variant.
  if (is_numeric($this->reserve_timeout)) {
    return $this->claimItemBlocking($lease_time);
  }

  // Atomically move the oldest available ID onto the claimed list.
  $qid = $this->redis->rpoplpush($this->avail, $this->claimed);
  if ($qid === FALSE || $qid === NULL) {
    // Nothing available to claim.
    return FALSE;
  }

  $job = $this->redis->hget($this->avail . '_hash', $qid);
  if (!is_string($job)) {
    // The ID has no stored payload.
    return FALSE;
  }

  // The payload is what createItem() serialized; it must never come from
  // an untrusted source.
  $item = unserialize($job);
  if (!is_object($item)) {
    return FALSE;
  }

  // Mark the claim as live for $lease_time seconds; expire() treats a
  // claimed ID without this marker as abandoned.
  $this->redis->setex($this->lease . $item->qid, $lease_time, '1');
  return $item;
}
/**
 * A blocking version of claimItem to be used with long-running queue workers
 * like waiting_queue.
 *
 * Uses brpoplpush() with the configured reserve timeout instead of
 * returning immediately when no item is available.
 *
 * @param $lease_time
 *   How long the processing is expected to take, in seconds.
 *
 * @return
 *   The claimed item object, or FALSE if no item could be claimed.
 */
public function claimItemBlocking($lease_time) {
  $qid = $this->redis->brpoplpush($this->avail, $this->claimed, $this->reserve_timeout);
  if ($qid === FALSE || $qid === NULL) {
    // No item was popped.
    return FALSE;
  }

  $job = $this->redis->hget($this->avail . '_hash', $qid);
  if (!is_string($job)) {
    // The ID has no stored payload.
    return FALSE;
  }

  // The payload is what createItem() serialized; it must never come from
  // an untrusted source.
  $item = unserialize($job);
  if (!is_object($item)) {
    return FALSE;
  }

  // Mark the claim as live for $lease_time seconds.
  $this->redis->setex($this->lease . $item->qid, $lease_time, '1');
  return $item;
}
/**
 * Delete a finished item from the queue.
 *
 * Removes the item's ID from the claimed list, its payload from the hash
 * and its lease marker, all in one transaction.
 *
 * @param $item
 *   The item returned by DrupalQueueInterface::claimItem().
 */
public function deleteItem($item) {
  $this->redis->multi()
    ->lrem($this->claimed, $item->qid, -1)
    ->hdel($this->avail . '_hash', $item->qid)
    // The lease marker is no longer needed once the item is gone; drop it
    // now rather than waiting for its TTL.
    ->del($this->lease . $item->qid)
    ->exec();
}
/**
 * Create a queue.
 *
 * Intentionally empty: this implementation performs no setup here.
 */
public function createQueue() {
  // Nothing to do.
}
/**
 * Delete a queue and every item in the queue.
 *
 * Removes the claimed list, the available list, the payload hash, the ID
 * counter and every lease marker belonging to this queue.
 */
public function deleteQueue() {
  $keys = array(
    $this->claimed,
    $this->avail,
    $this->avail . '_hash',
    $this->avail . '_counter',
  );

  // Collect the lease markers so everything goes in a single DEL instead
  // of one round trip per key.
  $leases = $this->redis->keys($this->lease . '*');
  if (is_array($leases) && $leases) {
    $keys = array_merge($keys, $leases);
  }

  $this->redis->del($keys);
}
/**
 * Release a claimed item so that another worker can claim it.
 *
 * Moves the item's ID from the claimed list back onto the available list
 * and drops its lease marker, all in one transaction.
 *
 * @param $item
 *   The item returned by DrupalQueueInterface::claimItem().
 *
 * @return
 *   TRUE if the transaction ran and the ID was pushed back onto the
 *   available list, otherwise FALSE.
 */
public function releaseItem($item) {
  $result = $this->redis->multi()
    ->lrem($this->claimed, $item->qid, -1)
    ->lpush($this->avail, $item->qid)
    // The claim is over, so its lease marker is stale.
    ->del($this->lease . $item->qid)
    ->exec();

  // $result[1] is the reply to lpush(): the new list length on success.
  return is_array($result) && !empty($result[1]);
}
/**
 * Expire claims in this queue.
 *
 * Delegates to _expireArbitrary() with this queue's own keys.
 *
 * @return number of items returned to available.
 */
public function expire() {
  $returned = $this->_expireArbitrary($this->claimed, $this->lease, $this->avail);
  return $returned;
}
/**
 * Helper function to expire claims using arbitrary queue keys.
 *
 * Every ID on the claimed list whose lease marker no longer exists is moved
 * back onto the available list.
 *
 * @param $claimed
 *   Key of the list holding claimed item IDs.
 * @param $lease
 *   Key prefix of the lease markers; the item ID is appended to it.
 * @param $avail
 *   Key of the list holding available item IDs.
 *
 * @return number of items returned to available.
 */
protected function _expireArbitrary($claimed, $lease, $avail) {
  $expired = 0;

  $qids = $this->redis->lrange($claimed, 0, -1);
  if (!is_array($qids)) {
    // Nothing to iterate.
    return $expired;
  }

  foreach ($qids as $qid) {
    if ($this->redis->exists($lease . $qid)) {
      // Lease still live; leave the claim alone.
      continue;
    }
    // The lease expired for this ID.
    $result = $this->redis->multi()
      ->lrem($claimed, $qid, -1)
      ->lpush($avail, $qid)
      ->exec();
    // Only count moves whose transaction actually ran.
    if (is_array($result)) {
      $expired++;
    }
  }

  return $expired;
}
/**
 * Expire claims in all queues.
 *
 * Finds every key matching 'drupal:queue:*:claimed' and expires its claims,
 * deriving the matching lease prefix and available list from the key name.
 *
 * @return number of items returned to available.
 */
public function expireAll() {
  $expired = 0;

  $claimed_keys = $this->redis->keys('drupal:queue:*:claimed');
  if (!is_array($claimed_keys)) {
    return $expired;
  }

  foreach ($claimed_keys as $claimed) {
    $lease = $claimed . '_lease:';
    // Swap the ':claimed' suffix for ':avail' to get the sibling list.
    $avail = preg_replace('/^(drupal:queue:.*):claimed$/', '$1:avail', $claimed);
    $expired += $this->_expireArbitrary($claimed, $lease, $avail);
  }

  return $expired;
}
/**
 * Dumps items in the queue.
 *
 * Reads the whole payload hash; values are returned as stored, i.e. the
 * serialized records written by createItem().
 *
 * @return
 *   An array of queue items.
 */
public function dump() {
  $hash_key = $this->avail . '_hash';
  return $this->redis->hgetall($hash_key);
}
/**
 * Dump currently claimed queue items.
 *
 * @return
 *   An array of queue IDs, read from the full range of the claimed list.
 */
public function dumpClaimed() {
  $claimed_ids = $this->redis->lrange($this->claimed, 0, -1);
  return $claimed_ids;
}