Issue #1374000 by pwolanin, rb2k, msonnabaum: Use rpoplpush or brpoplpush for reliabl...
[project/redis_queue.git] / redis.queue.inc
1 <?php
2
/**
 * Queue backend that stores Drupal queue items in Redis through PhpRedis.
 *
 * Keys used for a queue named NAME:
 * - drupal:queue:NAME:avail: list of item IDs waiting to be claimed.
 * - drupal:queue:NAME:avail_hash: hash mapping item ID to serialized record.
 * - drupal:queue:NAME:avail_counter: counter used to generate item IDs.
 * - drupal:queue:NAME:claimed: list of item IDs that are currently claimed.
 * - drupal:queue:NAME:claimed_lease:ID: key that exists for as long as the
 *   lease on item ID is valid.
 *
 * Items are pushed on the left of the available list and claimed from the
 * right with RPOPLPUSH/BRPOPLPUSH, which moves the ID to the claimed list in
 * one step.
 */
class RedisQueue implements DrupalQueueInterface {

  /**
   * The name of the Redis list that holds available item IDs.
   */
  protected $avail;

  /**
   * The name of the Redis list that holds claimed item IDs.
   */
  protected $claimed;

  /**
   * Key prefix for per-item lease keys; the item ID is appended to it.
   */
  protected $lease;

  /**
   * The PhpRedis object which connects to the redis server.
   */
  protected $redis;

  /**
   * Timeout handed to BRPOPLPUSH when claiming.
   *
   * When this is numeric claimItem() uses the blocking claim; otherwise (the
   * default is NULL) it uses the non-blocking one.
   */
  protected $reserve_timeout;

  /**
   * The queue name passed to the constructor.
   */
  protected $name;

  /**
   * Start working with a queue.
   *
   * Builds the key names for the queue and opens the Redis connection using
   * the host and port from getOptions(). The result of connect() is not
   * checked here.
   *
   * @param $name
   *   Arbitrary string. The name of the queue to work with.
   */
  public function __construct($name) {
    $this->name = $name;
    $this->avail = 'drupal:queue:' . $name . ':avail';
    $this->claimed = 'drupal:queue:' . $name . ':claimed';
    $this->lease = $this->claimed . '_lease:';
    $this->redis = new Redis();
    $options = self::getOptions($name);
    $this->reserve_timeout = $options['reserve_timeout'];
    $this->redis->connect($options['host'], $options['port']);
  }

  /**
   * Resolve the connection options for a queue.
   *
   * Precedence, highest first: the 'redis_queue_NAME' variable, the
   * 'redis_default_queue' variable, then the 'redis_client_host' and
   * 'redis_client_port' variables (127.0.0.1:6379 when unset).
   *
   * @param $name
   *   The queue name.
   *
   * @return
   *   An array with at least the keys 'host', 'port' and 'reserve_timeout'.
   */
  public static function getOptions($name) {
    $defaults = variable_get('redis_default_queue', array()) + array(
      'host' => variable_get('redis_client_host', '127.0.0.1'),
      'port' => variable_get('redis_client_port', 6379),
      'reserve_timeout' => NULL,
    );
    // Array union keeps the left-hand value, so queue-specific settings win.
    return variable_get('redis_queue_' . $name, array()) + $defaults;
  }

  /**
   * Add a queue item and store it directly to the queue.
   *
   * @param $data
   *   Arbitrary data to be associated with the new task in the queue.
   *
   * @return
   *   TRUE if the item was successfully created and was (best effort) added
   *   to the queue, otherwise FALSE. We don't guarantee the item was
   *   committed to disk, that your disk wasn't hit by a meteor, etc, but as
   *   far as we know, the item is now in the queue.
   */
  public function createItem($data) {
    $qid = $this->incrementId();
    if (!$qid) {
      // No ID could be allocated, so there is nothing to queue under.
      return FALSE;
    }

    $record = new stdClass();
    $record->data = $data;
    $record->qid = $qid;
    // We cannot rely on REQUEST_TIME because many items might be created
    // by a single request which takes longer than 1 second.
    $record->timestamp = time();

    // Store the payload and publish the ID in one transaction so a claimer
    // never sees an ID whose payload has not been written yet.
    $result = $this->redis->multi()
      ->hsetnx($this->avail . '_hash', $record->qid, serialize($record))
      ->llen($this->avail)
      ->lpush($this->avail, $record->qid)
      ->exec();

    // exec() does not return an array when the transaction failed.
    if (!is_array($result) || count($result) < 3) {
      return FALSE;
    }
    // Success means the payload was stored and the list grew.
    return $result[0] && $result[2] > $result[1];
  }

  /**
   * Allocate the next item ID from the queue's counter key.
   *
   * @return
   *   The value returned by INCR on the counter key.
   */
  protected function incrementId() {
    return $this->redis->incr($this->avail . '_counter');
  }

  /**
   * Retrieve the number of items in the queue.
   *
   * This is intended to provide a "best guess" count of the number of items in
   * the queue. Depending on the implementation and the setup, the accuracy of
   * the results of this function may vary.
   *
   * e.g. On a busy system with a large number of consumers and items, the
   * result might only be valid for a fraction of a second and not provide an
   * accurate representation.
   *
   * Only the available list is counted; claimed items are not included.
   *
   * @return
   *   An integer estimate of the number of items in the queue.
   */
  public function numberOfItems() {
    // Cast so a failed LLEN (FALSE) is still reported as an integer.
    return (int) $this->redis->lLen($this->avail);
  }

  /**
   * Claim an item in the queue for processing.
   *
   * When a numeric reserve timeout is configured this delegates to
   * claimItemBlocking(); otherwise it returns immediately when the queue is
   * empty.
   *
   * @param $lease_time
   *   How long the processing is expected to take in seconds, defaults to
   *   30 seconds. After this lease expires, the item can be returned to the
   *   queue by expire() and claimed by another consumer. For idempotent
   *   tasks (which can be run multiple times without side effects), shorter
   *   lease times would result in lower latency in case a consumer fails.
   *   For tasks that should not be run more than once (non-idempotent), a
   *   larger lease time will make it more rare for a given task to run
   *   multiple times in cases of failure, at the cost of higher latency.
   *
   * @return
   *   On success we return an item object. If the queue is unable to claim an
   *   item it returns false. This implies a best effort to retrieve an item
   *   and either the queue is empty or there is some other non-recoverable
   *   problem.
   */
  public function claimItem($lease_time = 30) {
    if (is_numeric($this->reserve_timeout)) {
      return $this->claimItemBlocking($lease_time);
    }
    while ($qid = $this->redis->rpoplpush($this->avail, $this->claimed)) {
      $item = $this->leaseClaimedItem($qid, $lease_time);
      // NULL means the ID had no payload and was dropped; try the next one
      // instead of reporting an empty queue.
      if ($item !== NULL) {
        return $item;
      }
    }
    return FALSE;
  }

  /**
   * A blocking version of claimItem to be used with long-running queue workers
   * like waiting_queue.
   *
   * @param $lease_time
   *   Lease duration in seconds for the claimed item.
   *
   * @return
   *   An item object, or FALSE if no item could be claimed.
   */
  public function claimItemBlocking($lease_time) {
    while ($qid = $this->redis->brpoplpush($this->avail, $this->claimed, $this->reserve_timeout)) {
      $item = $this->leaseClaimedItem($qid, $lease_time);
      if ($item !== NULL) {
        return $item;
      }
    }
    return FALSE;
  }

  /**
   * Load the payload for a freshly claimed ID and start its lease.
   *
   * NOTE(review): the lease key is written after the ID was moved to the
   * claimed list, in a separate call. An expire() run that lands in between
   * sees a claimed ID without a lease and re-queues it.
   *
   * @param $qid
   *   An item ID that was just moved to the claimed list.
   * @param $lease_time
   *   Lease duration in seconds.
   *
   * @return
   *   The unserialized item, or NULL when the ID has no stored payload. In
   *   that case the ID is removed from the claimed list, because it can never
   *   be processed and would otherwise be re-queued by expire() forever.
   */
  protected function leaseClaimedItem($qid, $lease_time) {
    $job = $this->redis->hget($this->avail . '_hash', $qid);
    if ($job === FALSE) {
      $this->redis->lrem($this->claimed, $qid, -1);
      return NULL;
    }
    // NOTE(review): unserialize() is only safe while nothing but createItem()
    // writes to this hash.
    $item = unserialize($job);
    $this->redis->setex($this->lease . $qid, $lease_time, '1');
    return $item;
  }

  /**
   * Delete a finished item from the queue.
   *
   * Removes the ID from the claimed list, the payload from the hash and the
   * lease key, all in one transaction.
   *
   * @param $item
   *   The item returned by DrupalQueueInterface::claimItem().
   */
  public function deleteItem($item) {
    $this->redis->multi()
      ->lrem($this->claimed, $item->qid, -1)
      ->hdel($this->avail . '_hash', $item->qid)
      ->del($this->lease . $item->qid)
      ->exec();
  }

  /**
   * Create a queue.
   *
   * Nothing to do: the Redis keys come into existence on first use.
   */
  public function createQueue() {}

  /**
   * Delete a queue and every item in the queue.
   */
  public function deleteQueue() {
    $keys = array(
      $this->claimed,
      $this->avail,
      $this->avail . '_hash',
      $this->avail . '_counter',
    );
    // Escape glob characters so a queue name containing them cannot make the
    // pattern match the lease keys of other queues.
    $pattern = addcslashes($this->lease, '*?[]\\') . '*';
    $leases = $this->redis->keys($pattern);
    if (is_array($leases)) {
      $keys = array_merge($keys, $leases);
    }
    // One DEL for everything instead of one round trip per lease key.
    $this->redis->del($keys);
  }

  /**
   * Release an item that the worker could not process.
   *
   * The item goes back to the available list so another worker can claim it.
   *
   * @param $item
   *   The item returned by DrupalQueueInterface::claimItem().
   *
   * @return
   *   TRUE if the item was claimed and has been made available again, FALSE
   *   if it was not in the claimed list (for example because its lease had
   *   already been expired or the item was deleted).
   */
  public function releaseItem($item) {
    $released = $this->moveToAvailable($this->claimed, $this->avail, $item->qid);
    if ($released) {
      // The lease no longer applies once the item is available again.
      $this->redis->del($this->lease . $item->qid);
    }
    return $released;
  }

  /**
   * Move an item ID from a claimed list back to an available list.
   *
   * The removal and the push run in one transaction. If the ID turns out not
   * to have been in the claimed list, the push is undone so that no duplicate
   * or payload-less ID is left in the available list.
   *
   * @param $claimed
   *   Key of the claimed list.
   * @param $avail
   *   Key of the available list.
   * @param $qid
   *   The item ID to move.
   *
   * @return
   *   TRUE if the ID was moved, FALSE otherwise.
   */
  protected function moveToAvailable($claimed, $avail, $qid) {
    $result = $this->redis->multi()
      ->lrem($claimed, $qid, -1)
      ->lpush($avail, $qid)
      ->exec();
    if (!is_array($result)) {
      return FALSE;
    }
    if (empty($result[0])) {
      // Nothing was removed from the claimed list. A positive count scans
      // from the head, which is where LPUSH just placed the stray ID.
      $this->redis->lrem($avail, $qid, 1);
      return FALSE;
    }
    return TRUE;
  }

  /**
   * Expire claims in this queue.
   *
   * @return number of items returned to available.
   */
  public function expire() {
    return $this->_expireArbitrary($this->claimed, $this->lease, $this->avail);
  }

  /**
   * Helper function to expire claims using arbitrary queue keys.
   *
   * Every ID in the claimed list whose lease key no longer exists is moved
   * back to the available list.
   *
   * @param $claimed
   *   Key of the claimed list.
   * @param $lease
   *   Lease key prefix; the item ID is appended to it.
   * @param $avail
   *   Key of the available list.
   *
   * @return number of items returned to available.
   */
  protected function _expireArbitrary($claimed, $lease, $avail) {
    $expired = 0;
    $qids = $this->redis->lrange($claimed, 0, -1);
    if (!is_array($qids)) {
      return $expired;
    }
    foreach ($qids as $qid) {
      // The lease expired for this ID when its lease key is gone. Count it
      // only if it really was moved: a worker may have deleted or released
      // it since the list was read.
      if (!$this->redis->exists($lease . $qid) && $this->moveToAvailable($claimed, $avail, $qid)) {
        $expired++;
      }
    }
    return $expired;
  }

  /**
   * Expire claims in all queues.
   *
   * Finds every claimed list on the connected server by key pattern and
   * expires each one.
   *
   * @return number of items returned to available.
   */
  public function expireAll() {
    $expired = 0;
    $suffix = ':claimed';
    $lists = $this->redis->keys('drupal:queue:*' . $suffix);
    if (!is_array($lists)) {
      return $expired;
    }
    foreach ($lists as $claimed) {
      // Every matched key ends in ':claimed'; swap that suffix for ':avail'.
      $avail = substr($claimed, 0, -strlen($suffix)) . ':avail';
      $expired += $this->_expireArbitrary($claimed, $claimed . '_lease:', $avail);
    }
    return $expired;
  }

  /**
   * Dumps items in the queue.
   *
   * This returns the whole payload hash, so it covers items that are
   * available as well as items that are claimed but not yet deleted.
   *
   * @return
   *   An array of serialized queue item records, keyed by item ID.
   */
  public function dump() {
    return $this->redis->hgetall($this->avail . '_hash');
  }

  /**
   * Dump currently claimed queue items.
   *
   * @return
   *   An array of queue IDs.
   */
  public function dumpClaimed() {
    return $this->redis->lrange($this->claimed, 0, -1);
  }

}