/[drupal]/drupal/modules/system/system.queue.inc
ViewVC logotype

Contents of /drupal/modules/system/system.queue.inc

Parent Directory Parent Directory | Revision Log Revision Log | View Revision Graph Revision Graph


Revision 1.7 - (show annotations) (download) (as text)
Sat Oct 31 13:53:48 2009 UTC (4 weeks ago) by dries
Branch: MAIN
CVS Tags: DRUPAL-7-0-UNSTABLE-10, HEAD
Changes since 1.6: +1 -10 lines
File MIME type: text/x-php
- Patch #602306 by alex_b: removed some unused code.
1 <?php
2 // $Id: system.queue.inc,v 1.6 2009/10/14 10:59:15 dries Exp $
3
4 /**
5 * @file
6 * Queue functionality.
7 */
8
9 /**
10 * @defgroup queue Queue operations
11 * @{
12 * The queue system allows placing items in a queue and processing them later.
13 * The system tries to ensure that only one consumer can process an item.
14 *
15 * Before a queue can be used it needs to be created by
16 * DrupalQueueInterface::createQueue().
17 *
18 * Items can be added to the queue by passing an arbitrary data object to
19 * DrupalQueueInterface::createItem().
20 *
21 * To process an item, call DrupalQueueInterface::claimItem() and specify how
22 * long you want to have a lease for working on that item. When finished
23 * processing, the item needs to be deleted by calling
24 * DrupalQueueInterface::deleteItem(). If the consumer dies, the item will be
25 * made available again by the DrapalQueueInterface implementation once the
26 * lease expires. Another consumer will then be able to receive it when calling
27 * DrupalQueueInterface::claimItem().
28 *
29 * The $item object used by the DrupalQueueInterface can contain arbitrary
30 * metadata depending on the implementation. Systems using the interface should
31 * only rely on the data property which will contain the information passed to
32 * DrupalQueueInterface::createItem(). The full queue item returned by
33 * DrupalQueueInterface::claimItem() needs to be passed to
34 * DrupalQueueInterface::deleteItem() once processing is completed.
35 *
36 * While the queue system makes a best effort to preserve order in messages,
37 * due to the pluggable nature of the queue, there is no guarantee that items
38 * will be delivered on claim in the order they were sent. For example, some
39 * implementations like beanstalkd or others with distributed back-ends like
40 * Amazon SQS will be managing jobs for a large set of producers and consumers
41 * where a strict FIFO ordering will likely not be preserved.
42 *
43 * The system also makes no guarantees about a task only being executed once:
44 * callers that have non-idempotent tasks either need to live with the
45 * possiblity of the task being invoked multiple times in cases where a claim
46 * lease expires, or need to implement their own transactions to make their
47 * tasks idempotent.
48 */
49
50 /**
51 * Factory class for interacting with queues.
52 */
53 class DrupalQueue {
54 /**
55 * Get a queue object for a given name.
56 *
57 * @param $name
58 * Arbitrary string. The name of the queue to work with.
59 * @return
60 * The queue object for a given name.
61 */
62 public static function get($name) {
63 static $queues;
64 if (!isset($queues[$name])) {
65 $class = variable_get('queue_module_' . $name, 'System') . 'Queue';
66 $queues[$name] = new $class($name);
67 }
68 return $queues[$name];
69 }
70 }
71
72 interface DrupalQueueInterface {
73 /**
74 * Start working with a queue.
75 *
76 * @param $name
77 * Arbitrary string. The name of the queue to work with.
78 */
79 public function __construct($name);
80
81 /**
82 * Add a queue item and store it directly to the queue.
83 *
84 * @param $data
85 * Arbitrary data to be associated with the new task in the queue.
86 * @return
87 * TRUE if the item was successfully created and was (best effort) added
88 * to the queue, otherwise FALSE. We don't guarantee the item was
89 * committed to disk, that your disk wasn't hit by a meteor, etc, but as
90 * far as we know, the item is now in the queue.
91 */
92 public function createItem($data);
93
94 /**
95 * Retrieve the number of items in the queue.
96 *
97 * This is intended to provide a "best guess" count of the number of items in
98 * the queue. Depending on the implementation and the setup, the accuracy of
99 * the results of this function may vary.
100 *
101 * e.g. On a busy system with a large number of consumers and items, the
102 * result might only be valid for a fraction of a second and not provide an
103 * accurate representation.
104 *
105 * @return
106 * An integer estimate of the number of items in the queue.
107 */
108 public function numberOfItems();
109
110 /**
111 * Claim an item in the queue for processing.
112 *
113 * @param $lease_time
114 * How long the processing is expected to take in seconds, defaults to an
115 * hour. After this lease expires, the item will be reset and another
116 * consumer can claim the item. For idempotent tasks (which can be run
117 * multiple times without side effects), shorter lease times would result
118 * in lower latency in case a consumer fails. For tasks that should not be
119 * run more than once (non-idempotent), a larger lease time will make it
120 * more rare for a given task to run multiple times in cases of failure,
121 * at the cost of higher latency.
122 * @return
123 * On success we return an item object. If the queue is unable to claim an
124 * item it returns false. This implies a best effort to retrieve an item
125 * and either the queue is empty or there is some other non-recoverable
126 * problem.
127 */
128 public function claimItem($lease_time = 3600);
129
130 /**
131 * Delete a finished item from the queue.
132 *
133 * @param $item
134 * The item returned by DrupalQueueInterface::claimItem().
135 */
136 public function deleteItem($item);
137
138 /**
139 * Create a queue.
140 *
141 * Called during installation and should be used to perform any necessary
142 * initialization operations. This should not be confused with the
143 * constructor for these objects, which is called every time an object is
144 * instantiated to operate on a queue. This operation is only needed the
145 * first time a given queue is going to be initialized (for example, to make
146 * a new database table or directory to hold tasks for the queue -- it
147 * depends on the queue implementation if this is necessary at all).
148 */
149 public function createQueue();
150
151 /**
152 * Delete a queue and every item in the queue.
153 */
154 public function deleteQueue();
155 }
156
157 /**
158 * Default queue implementation.
159 */
160 class SystemQueue implements DrupalQueueInterface {
161 /**
162 * The name of the queue this instance is working with.
163 *
164 * @var string
165 */
166 protected $name;
167
168 public function __construct($name) {
169 $this->name = $name;
170 }
171
172 public function createItem($data) {
173 $record = new stdClass();
174 $record->name = $this->name;
175 $record->data = $data;
176 // We cannot rely on REQUEST_TIME because many items might be created by a
177 // single request which takes longer than 1 second.
178 $record->created = time();
179 return drupal_write_record('queue', $record) !== FALSE;
180 }
181
182 public function numberOfItems() {
183 return db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField();
184 }
185
186 public function claimItem($lease_time = 30) {
187 // Claim an item by updating its expire fields. If claim is not successful
188 // another thread may have claimed the item in the meantime. Therefore loop
189 // until an item is successfully claimed or we are reasonably sure there
190 // are no unclaimed items left.
191 while (TRUE) {
192 $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created ASC', 0, 1, array(':name' => $this->name))->fetchObject();
193 if ($item) {
194 // Try to update the item. Only one thread can succeed in UPDATEing the
195 // same row. We cannot rely on REQUEST_TIME because items might be
196 // claimed by a single consumer which runs longer than 1 second. If we
197 // continue to use REQUEST_TIME instead of the current time(), we steal
198 // time from the lease, and will tend to reset items before the lease
199 // should really expire.
200 $update = db_update('queue')
201 ->fields(array(
202 'expire' => time() + $lease_time,
203 ))
204 ->condition('item_id', $item->item_id)
205 ->condition('expire', 0);
206 // If there are affected rows, this update succeeded.
207 if ($update->execute()) {
208 $item->data = unserialize($item->data);
209 return $item;
210 }
211 }
212 else {
213 // No items currently available to claim.
214 return FALSE;
215 }
216 }
217 }
218
219 public function deleteItem($item) {
220 db_delete('queue')
221 ->condition('item_id', $item->item_id)
222 ->execute();
223 }
224
225 public function createQueue() {
226 // All tasks are stored in a single database table (which is created when
227 // Drupal is first installed) so there is nothing we need to do to create
228 // a new queue.
229 }
230
231 public function deleteQueue() {
232 db_delete('queue')
233 ->condition('name', $this->name)
234 ->execute();
235 }
236 }
237
238 /**
239 * @} End of "defgroup queue".
240 */

  ViewVC Help
Powered by ViewVC 1.1.2