/[drupal]/contributions/modules/hosting/hosting.queues.inc
ViewVC logotype

Contents of /contributions/modules/hosting/hosting.queues.inc

Parent Directory Parent Directory | Revision Log Revision Log | View Revision Graph Revision Graph


Revision 1.56 - (show annotations) (download) (as text)
Thu Nov 5 16:18:05 2009 UTC (3 weeks ago) by adrian
Branch: MAIN
CVS Tags: DRUPAL-6--0-4-ALPHA3, HEAD
Changes since 1.55: +8 -2 lines
File MIME type: text/x-php
Replace crontab entry so that it _will_ work on Ubuntu and other systems using Dash with the new drush 2.1
1 <?php
2
3
4 function _hosting_watchdog($entry) {
5 watchdog($entry['type'], $entry['message']);
6 }
7
8 /**
9 * Main queue processing command for hostmaster.
10 *
11 * This is a single command, which will (based on configuration) run all the other
12 * queue commands (cron, backup, tasks, stats). This is done so that there
13 * is only one cron job to configure, and allow the frequency of calls to be configured
14 * from the interface.
15 */
16 function hosting_dispatch() {
17 $now = mktime();
18 variable_set("hosting_dispatch_last_run", $now);
19 drush_log('hosting_dispatch', t("dispatching queues"));
20 $platform = node_load(HOSTING_OWN_PLATFORM);
21 $root = $platform->publish_path;
22
23 if (variable_get('hosting_dispatch_enabled', false)) {
24 $queues = hosting_get_queues();
25 foreach ($queues as $queue => $info) {
26 if ($info['enabled']) {
27 if (($now - $info["last"]) >= $info["calc_frequency"]) {
28 drush_backend_fork("hosting", array($queue, 'items' => $info['calc_items']));
29 } else {
30 drush_log(dt("too early for queue @queue", array('@queue' => $queue)));
31 }
32 } else {
33 drush_log(dt("queue @queue disabled", array('@queue' => $queue)));
34 }
35 }
36 } else {
37 drush_log(dt("dispatching disabled"));
38 }
39 }
40
41 /**
42 * Retrieve a list of queues that need to be dispatched
43 *
44 * Generate a list of queues, and the frequency / amount of items
45 * that need to be processed for each of them.
46 */
47 function hosting_get_queues($refresh = false) {
48 static $cache = null;
49
50 if (is_null($cache) || $refresh) {
51 $cache = array();
52 $defaults = array(
53 'type' => 'serial',
54 'max_threads' => 6,
55 'threshold' => '100',
56 'min_threads' => 1,
57 'timeout' => strtotime("10 minutes", 0),
58 'frequency' => strtotime("5 minutes", 0),
59 'items' => 5,
60 'enabled' => TRUE,
61 'singular' => t('item'),
62 'plural' => t('items')
63 );
64 $queues = module_invoke_all("hosting_queues");
65 foreach ($queues as $key => $queue) {
66 $queue = array_merge($defaults, $queue);
67
68 // Configurable settings.
69 $configured = array(
70 'frequency' => variable_get('hosting_queue_' . $key . '_frequency', $queue['frequency']),
71 'items' => variable_get('hosting_queue_' . $key . '_items', $queue['items']),
72 'enabled' => variable_get('hosting_queue_' . $key . '_enabled', $queue['enabled']),
73 'last_run' => variable_get('hosting_queue_' . $key . '_last_run', false),
74 'running' => variable_get('hosting_queue_' . $key . '_running', false),
75 'interval' => variable_get('hosting_queue_' . $key . '_interval', false),
76 );
77 $queue = array_merge($queue, $configured);
78
79 if ($queue['type'] == 'batch') {
80 $threads = $queue['total_items'] / $queue['threshold'];
81 if ($threads <= $queue['min_threads']) {
82 $threads = $queue['min_threads'];
83 } elseif ($thread > $queue['max_threads']) {
84 $threads = $queue['max_threads'];
85 }
86 $queue['calc_threads'] = $threads;
87 $queue['calc_frequency'] = ceil($queue['frequency'] / $threads);
88 $queue['calc_items'] = ceil($queue['total_items'] / $threads);
89 }
90 else {
91 $queue['calc_frequency'] = $queue['frequency'];
92 $queue['calc_items'] = $queue['items'];
93 }
94
95 $queue['last'] = variable_get('hosting_queue_' . $key . '_last_run', 0);
96 $queue['running'] = variable_get('hosting_queue_' . $key . '_running', 0);
97 $queues[$key] = $queue;
98 }
99 $cache = $queues;
100 }
101
102 return $cache;
103 }
104
105 /**
106 * Run a queue specified by hook_hosting_queues
107 *
108 * Run an instance of a queue processor. This function contains all the book keeping
109 * functionality needed to ensure that the queues are running as scheduled.
110 */
111 function hosting_run_queue() {
112 $cmd = drush_get_command();
113 $queue = $cmd['queue'];
114 $count = drush_get_option(array('i', 'items'), 5); # process a default of 5 items at a time.
115
116 variable_set('hosting_queue_' . $queue . '_last_run', $t = mktime());
117 variable_set('hosting_queue_' . $queue . '_running', $t);
118
119 $func = "hosting_" . $queue . "_queue";
120
121 if (function_exists($func)) {
122 $func($count);
123 }
124
125 variable_del('hosting_queue_' . $queue . '_running');
126 }
127
128 /**
129 * Retrieve a list of outstanding tasks.
130 *
131 * @param limit
132 * The amount of items to return.
133 * @return
134 * An associative array containing task nodes, indexed by node id.
135 */
136 function _hosting_get_new_tasks($limit = 20) {
137 $return = array();
138 $result = db_query("SELECT t.nid FROM {hosting_task} t INNER JOIN {node} n ON t.vid = n.vid WHERE t.task_status = %d ORDER BY n.changed, n.nid ASC LIMIT %d", 0, $limit);
139 while ($node = db_fetch_object($result)) {
140 $return[$node->nid] = node_load($node->nid);
141 }
142 return $return;
143 }
144
145 /**
146 * Process the hosting task queue.
147 *
148 * Iterates through the list of outstanding tasks, and execute the commands on the back end.
149 */
150 function hosting_tasks_queue($count = 20) {
151 global $provision_errors;
152
153 drush_log(dt("Running tasks queue"));
154 $tasks = _hosting_get_new_tasks($count);
155 foreach ($tasks as $task) {
156 $task->revision = FALSE;
157 // remove the task from the queue at the start of execution
158 // this is to avoid concurrent task runs
159 $task->task_status = HOSTING_TASK_PROCESSING;
160 node_save($task);
161 drush_backend_fork("hosting task", array($task->nid));
162 }
163 }
164
165 function hosting_queues_get_arguments($task) {
166 $data = module_invoke_all('provision_args', $task, $task->task_type);
167 foreach ($data as $key => $value) {
168 if (substr($key, 0, 1) == '#') {
169 $data[(int) str_replace('#', '', $key)] = $value;
170 unset($data[$key]);
171 }
172 }
173
174 ksort($data);
175 return $data;
176 }
177
178 function _hosting_backend_invoke($cmd, $task) {
179 $proc = _drush_proc_open($cmd, FALSE);
180 if ($proc['output']) {
181 $values = drush_backend_parse_output($proc['output'], FALSE);
182
183 }
184 return FALSE;
185 }
186
187 function _hosting_queues_clean_output($return) {
188 return filter_xss($return, array());
189 }
190
191
192 function _hosting_dispatch_cmd() {
193 $node = node_load(HOSTING_OWN_WEB_SERVER);
194 $cmd = sprintf("php %s hosting dispatch --root=%s", escapeshellarg($node->drush_path),
195 escapeshellarg(HOSTING_DEFAULT_DOCROOT_PATH));
196 if (function_exists('drush_get_option')) {
197 if ($uri = drush_get_option('uri')) {
198 $cmd .= ' --uri=' . escapeshellarg($uri);
199 }
200 }
201 return $cmd;
202 }
203
204 function hosting_queues_cron_cmd() {
205 $command = _hosting_dispatch_cmd();
206 $return = <<<END
207 SHELL=/bin/bash
208 PATH=$_SERVER[PATH]
209 */1 * * * * $command
210 END;
211 return $return;
212 }
213

  ViewVC Help
Powered by ViewVC 1.1.2