| 1 |
<?php
|
| 2 |
// $Id: system.queue.inc,v 1.6 2009/10/14 10:59:15 dries Exp $
|
| 3 |
|
| 4 |
/**
|
| 5 |
* @file
|
| 6 |
* Queue functionality.
|
| 7 |
*/
|
| 8 |
|
| 9 |
/**
|
| 10 |
* @defgroup queue Queue operations
|
| 11 |
* @{
|
| 12 |
* The queue system allows placing items in a queue and processing them later.
|
| 13 |
* The system tries to ensure that only one consumer can process an item.
|
| 14 |
*
|
| 15 |
* Before a queue can be used it needs to be created by
|
| 16 |
* DrupalQueueInterface::createQueue().
|
| 17 |
*
|
| 18 |
* Items can be added to the queue by passing an arbitrary data object to
|
| 19 |
* DrupalQueueInterface::createItem().
|
| 20 |
*
|
| 21 |
* To process an item, call DrupalQueueInterface::claimItem() and specify how
|
| 22 |
* long you want to have a lease for working on that item. When finished
|
| 23 |
* processing, the item needs to be deleted by calling
|
| 24 |
* DrupalQueueInterface::deleteItem(). If the consumer dies, the item will be
|
| 25 |
* made available again by the DrapalQueueInterface implementation once the
|
| 26 |
* lease expires. Another consumer will then be able to receive it when calling
|
| 27 |
* DrupalQueueInterface::claimItem().
|
| 28 |
*
|
| 29 |
* The $item object used by the DrupalQueueInterface can contain arbitrary
|
| 30 |
* metadata depending on the implementation. Systems using the interface should
|
| 31 |
* only rely on the data property which will contain the information passed to
|
| 32 |
* DrupalQueueInterface::createItem(). The full queue item returned by
|
| 33 |
* DrupalQueueInterface::claimItem() needs to be passed to
|
| 34 |
* DrupalQueueInterface::deleteItem() once processing is completed.
|
| 35 |
*
|
| 36 |
* While the queue system makes a best effort to preserve order in messages,
|
| 37 |
* due to the pluggable nature of the queue, there is no guarantee that items
|
| 38 |
* will be delivered on claim in the order they were sent. For example, some
|
| 39 |
* implementations like beanstalkd or others with distributed back-ends like
|
| 40 |
* Amazon SQS will be managing jobs for a large set of producers and consumers
|
| 41 |
* where a strict FIFO ordering will likely not be preserved.
|
| 42 |
*
|
| 43 |
* The system also makes no guarantees about a task only being executed once:
|
| 44 |
* callers that have non-idempotent tasks either need to live with the
|
| 45 |
* possiblity of the task being invoked multiple times in cases where a claim
|
| 46 |
* lease expires, or need to implement their own transactions to make their
|
| 47 |
* tasks idempotent.
|
| 48 |
*/
|
| 49 |
|
| 50 |
/**
|
| 51 |
* Factory class for interacting with queues.
|
| 52 |
*/
|
| 53 |
class DrupalQueue {
|
| 54 |
/**
|
| 55 |
* Get a queue object for a given name.
|
| 56 |
*
|
| 57 |
* @param $name
|
| 58 |
* Arbitrary string. The name of the queue to work with.
|
| 59 |
* @return
|
| 60 |
* The queue object for a given name.
|
| 61 |
*/
|
| 62 |
public static function get($name) {
|
| 63 |
static $queues;
|
| 64 |
if (!isset($queues[$name])) {
|
| 65 |
$class = variable_get('queue_module_' . $name, 'System') . 'Queue';
|
| 66 |
$queues[$name] = new $class($name);
|
| 67 |
}
|
| 68 |
return $queues[$name];
|
| 69 |
}
|
| 70 |
}
|
| 71 |
|
| 72 |
interface DrupalQueueInterface {
|
| 73 |
/**
|
| 74 |
* Start working with a queue.
|
| 75 |
*
|
| 76 |
* @param $name
|
| 77 |
* Arbitrary string. The name of the queue to work with.
|
| 78 |
*/
|
| 79 |
public function __construct($name);
|
| 80 |
|
| 81 |
/**
|
| 82 |
* Add a queue item and store it directly to the queue.
|
| 83 |
*
|
| 84 |
* @param $data
|
| 85 |
* Arbitrary data to be associated with the new task in the queue.
|
| 86 |
* @return
|
| 87 |
* TRUE if the item was successfully created and was (best effort) added
|
| 88 |
* to the queue, otherwise FALSE. We don't guarantee the item was
|
| 89 |
* committed to disk, that your disk wasn't hit by a meteor, etc, but as
|
| 90 |
* far as we know, the item is now in the queue.
|
| 91 |
*/
|
| 92 |
public function createItem($data);
|
| 93 |
|
| 94 |
/**
|
| 95 |
* Retrieve the number of items in the queue.
|
| 96 |
*
|
| 97 |
* This is intended to provide a "best guess" count of the number of items in
|
| 98 |
* the queue. Depending on the implementation and the setup, the accuracy of
|
| 99 |
* the results of this function may vary.
|
| 100 |
*
|
| 101 |
* e.g. On a busy system with a large number of consumers and items, the
|
| 102 |
* result might only be valid for a fraction of a second and not provide an
|
| 103 |
* accurate representation.
|
| 104 |
*
|
| 105 |
* @return
|
| 106 |
* An integer estimate of the number of items in the queue.
|
| 107 |
*/
|
| 108 |
public function numberOfItems();
|
| 109 |
|
| 110 |
/**
|
| 111 |
* Claim an item in the queue for processing.
|
| 112 |
*
|
| 113 |
* @param $lease_time
|
| 114 |
* How long the processing is expected to take in seconds, defaults to an
|
| 115 |
* hour. After this lease expires, the item will be reset and another
|
| 116 |
* consumer can claim the item. For idempotent tasks (which can be run
|
| 117 |
* multiple times without side effects), shorter lease times would result
|
| 118 |
* in lower latency in case a consumer fails. For tasks that should not be
|
| 119 |
* run more than once (non-idempotent), a larger lease time will make it
|
| 120 |
* more rare for a given task to run multiple times in cases of failure,
|
| 121 |
* at the cost of higher latency.
|
| 122 |
* @return
|
| 123 |
* On success we return an item object. If the queue is unable to claim an
|
| 124 |
* item it returns false. This implies a best effort to retrieve an item
|
| 125 |
* and either the queue is empty or there is some other non-recoverable
|
| 126 |
* problem.
|
| 127 |
*/
|
| 128 |
public function claimItem($lease_time = 3600);
|
| 129 |
|
| 130 |
/**
|
| 131 |
* Delete a finished item from the queue.
|
| 132 |
*
|
| 133 |
* @param $item
|
| 134 |
* The item returned by DrupalQueueInterface::claimItem().
|
| 135 |
*/
|
| 136 |
public function deleteItem($item);
|
| 137 |
|
| 138 |
/**
|
| 139 |
* Create a queue.
|
| 140 |
*
|
| 141 |
* Called during installation and should be used to perform any necessary
|
| 142 |
* initialization operations. This should not be confused with the
|
| 143 |
* constructor for these objects, which is called every time an object is
|
| 144 |
* instantiated to operate on a queue. This operation is only needed the
|
| 145 |
* first time a given queue is going to be initialized (for example, to make
|
| 146 |
* a new database table or directory to hold tasks for the queue -- it
|
| 147 |
* depends on the queue implementation if this is necessary at all).
|
| 148 |
*/
|
| 149 |
public function createQueue();
|
| 150 |
|
| 151 |
/**
|
| 152 |
* Delete a queue and every item in the queue.
|
| 153 |
*/
|
| 154 |
public function deleteQueue();
|
| 155 |
}
|
| 156 |
|
| 157 |
/**
|
| 158 |
* Default queue implementation.
|
| 159 |
*/
|
| 160 |
class SystemQueue implements DrupalQueueInterface {
|
| 161 |
/**
|
| 162 |
* The name of the queue this instance is working with.
|
| 163 |
*
|
| 164 |
* @var string
|
| 165 |
*/
|
| 166 |
protected $name;
|
| 167 |
|
| 168 |
public function __construct($name) {
|
| 169 |
$this->name = $name;
|
| 170 |
}
|
| 171 |
|
| 172 |
public function createItem($data) {
|
| 173 |
$record = new stdClass();
|
| 174 |
$record->name = $this->name;
|
| 175 |
$record->data = $data;
|
| 176 |
// We cannot rely on REQUEST_TIME because many items might be created by a
|
| 177 |
// single request which takes longer than 1 second.
|
| 178 |
$record->created = time();
|
| 179 |
return drupal_write_record('queue', $record) !== FALSE;
|
| 180 |
}
|
| 181 |
|
| 182 |
public function numberOfItems() {
|
| 183 |
return db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField();
|
| 184 |
}
|
| 185 |
|
| 186 |
public function claimItem($lease_time = 30) {
|
| 187 |
// Claim an item by updating its expire fields. If claim is not successful
|
| 188 |
// another thread may have claimed the item in the meantime. Therefore loop
|
| 189 |
// until an item is successfully claimed or we are reasonably sure there
|
| 190 |
// are no unclaimed items left.
|
| 191 |
while (TRUE) {
|
| 192 |
$item = db_query_range('SELECT data, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created ASC', 0, 1, array(':name' => $this->name))->fetchObject();
|
| 193 |
if ($item) {
|
| 194 |
// Try to update the item. Only one thread can succeed in UPDATEing the
|
| 195 |
// same row. We cannot rely on REQUEST_TIME because items might be
|
| 196 |
// claimed by a single consumer which runs longer than 1 second. If we
|
| 197 |
// continue to use REQUEST_TIME instead of the current time(), we steal
|
| 198 |
// time from the lease, and will tend to reset items before the lease
|
| 199 |
// should really expire.
|
| 200 |
$update = db_update('queue')
|
| 201 |
->fields(array(
|
| 202 |
'expire' => time() + $lease_time,
|
| 203 |
))
|
| 204 |
->condition('item_id', $item->item_id)
|
| 205 |
->condition('expire', 0);
|
| 206 |
// If there are affected rows, this update succeeded.
|
| 207 |
if ($update->execute()) {
|
| 208 |
$item->data = unserialize($item->data);
|
| 209 |
return $item;
|
| 210 |
}
|
| 211 |
}
|
| 212 |
else {
|
| 213 |
// No items currently available to claim.
|
| 214 |
return FALSE;
|
| 215 |
}
|
| 216 |
}
|
| 217 |
}
|
| 218 |
|
| 219 |
public function deleteItem($item) {
|
| 220 |
db_delete('queue')
|
| 221 |
->condition('item_id', $item->item_id)
|
| 222 |
->execute();
|
| 223 |
}
|
| 224 |
|
| 225 |
public function createQueue() {
|
| 226 |
// All tasks are stored in a single database table (which is created when
|
| 227 |
// Drupal is first installed) so there is nothing we need to do to create
|
| 228 |
// a new queue.
|
| 229 |
}
|
| 230 |
|
| 231 |
public function deleteQueue() {
|
| 232 |
db_delete('queue')
|
| 233 |
->condition('name', $this->name)
|
| 234 |
->execute();
|
| 235 |
}
|
| 236 |
}
|
| 237 |
|
| 238 |
/**
|
| 239 |
* @} End of "defgroup queue".
|
| 240 |
*/
|