/[drupal]/contributions/modules/datasync/datasync.module
ViewVC logotype

Contents of /contributions/modules/datasync/datasync.module

Parent Directory Parent Directory | Revision Log Revision Log | View Revision Graph Revision Graph


Revision 1.1 - (show annotations) (download) (as text)
Mon Jul 21 22:49:59 2008 UTC (16 months, 1 week ago) by andrewlevine
Branch: MAIN
CVS Tags: HEAD
Branch point for: DRUPAL-6--1, DRUPAL-5
File MIME type: text/x-php
Committing actual files
1 <?php
2 // $Id$
3
4 /*
5 * TODO: There should be some way for administrators to see the statuses of all
6 * the running/completed jobs
7 * TODO: Statistics on jobs
8 */
9 include_once(drupal_get_path('module','datasync') . '/datasync_constants.inc');
10
11
12 /**************************
13 * DRUPAL HOOKS
14 ***************************/
15 function datasync_menu($may_cache) {
16 $items = array();
17 if ($may_cache) {
18 $items[] = array(
19 'path' => 'datasync/queue',
20 'title' => '',
21 'access' => TRUE,
22 'callback' => 'datasync_queue_callback',
23 'type' => MENU_CALLBACK,
24 );
25 $items[] = array(
26 'path' => 'datasync/jobs',
27 'title' => '',
28 'access' => TRUE,
29 'callback' => 'datasync_jobs_callback',
30 'type' => MENU_CALLBACK,
31 );
32 $items[] = array(
33 'path' => 'admin/settings/datasync',
34 'title' => t('DataSync'),
35 'description' => t('Manage DataSync Settings'),
36 'access' => user_access('administer site configuration'),
37 'callback' => 'drupal_get_form',
38 'callback arguments' => array('datasync_admin_settings'),
39 'type' => MENU_NORMAL_ITEM,
40 );
41 $items[] = array(
42 'path' => 'admin/settings/datasync/general',
43 'title' => t('DataSync'),
44 'type' => MENU_DEFAULT_LOCAL_TASK,
45 'weight' => -10
46 );
47 }
48 else {
49
50 }
51 return $items;
52 }
53
54 /**********************************
55 * DATASYNC CORE FUNCTIONS
56 * -All these functions except take_jobs should probably only be called from within transactions (D6)
57 * -They all return FALSE on error
58 * -All except take_jobs return TRUE on success
59 * -take_jobs returns an array of job_ids on success and runs inside of its own transactions (D6)
60 **********************************/
61
62 /**
63 * Reserve $num_jobs jobs as running and return job IDs to be worked on.
64 *
65 * @param $num_jobs
66 * Number of jobs to reserve and return.
67 * @param $valid_status
68 * Only get jobs with these statuses.
69 * @param $consumers
70 * Only get jobs of this consumer type.
71 * @return
72 * An array of job IDs
73 */
74 function datasync_take_jobs($num_jobs, $valid_status, $consumers) {
75 $consumers = datasync_enabled_consumers($consumers);
76 $status_in_string = implode(', ', $valid_status);
77 $consumer_in_string = "'" . implode("', '", $consumers) . "'";
78 //db_query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
79 //db_query('BEGIN;');
80 $result = db_query_range("SELECT job_id FROM {datasync_jobs} WHERE consumer IN ($consumer_in_string) AND status IN ($status_in_string) AND completed=%d AND started=%d ORDER BY job_id ASC", DATASYNC_INCOMPLETE, DATASYNC_UNSTARTED, 0, $num_jobs);
81 $job_ids = array();
82 while ($row = db_fetch_object($result)) {
83 $job_ids[] = $row->job_id;
84 }
85 if (empty($job_ids)) {
86 //db_query('ROLLBACK;');
87 return array();
88 }
89 $in_string = implode(', ', $job_ids);
90 db_query("UPDATE {datasync_jobs} SET started=%d WHERE job_id IN ($in_string)", time());
91
92 if (db_affected_rows() == count($job_ids)) {
93 //db_query('COMMIT;');
94 return $job_ids;
95 }
96 //db_query('ROLLBACK;');
97 return FALSE;
98 }
99
100 /**
101 * Change a job's status to $current_status and set its next status to $next_status
102 *
103 * @param $job_id
104 * Job ID of the job that we are modifying
105 * @param $current_status
106 * status that the job will have after this switch.
107 * @param $next_status
108 * next_status the job will have after this switch. NULL means just increment status by one.
109 * @param $started
110 * unix time value of when the job was started (usually DATASYNC_UNSTARTED).
111 * @param $completed
112 * unix time value of when the job was completed (usually DATASYNC_INCOMPLETE)
113 * @return
114 * TRUE on success, FALSE on failure
115 */
116 function datasync_switch_status($job_id, $current_status, $next_status=NULL, $started=DATASYNC_UNSTARTED, $completed=DATASYNC_INCOMPLETE) {
117 $c = datasync_enabled_consumers(array(datasync_consumer_by_job_id($job_id)));
118 if (empty($c)) {
119 return FALSE;
120 }
121 if ($next_status !== NULL) {
122 db_query("UPDATE {datasync_jobs} SET started=%d, completed=%d, status=%d, next_status=%d WHERE job_id=%d", $started, $completed, $current_status, $next_status, $job_id);
123 }
124 else {
125 db_query("UPDATE {datasync_jobs} SET started=%d, completed=%d, status=%d, next_status=NULL WHERE job_id=%d", $started, $completed, $current_status, $job_id);
126 }
127 return TRUE;
128 }
129
130 /**
131 * Add a job of type $consumer to the job queue
132 *
133 * @param $consumer
134 * Type of job we are adding
135 * @return
136 * Not FALSE on success, FALSE on failure
137 */
138 function datasync_add_job($consumer) {
139 $c = datasync_enabled_consumers(array($consumer));
140 if (empty($c)) {
141 return FALSE;
142 }
143 $job_id = db_next_id('{datasync_jobs}_job_id');
144 db_query("INSERT INTO {datasync_jobs} (job_id, consumer, status, next_status, started, completed) VALUES (%d, '%s', %d, %d, %d, %d)",
145 $job_id, $consumer, DATASYNC_ADDED, DATASYNC_INITIALIZE, time(), time());
146 return db_error();
147 }
148
149 /**
150 * Remove a job from the jobs queue
151 *
152 * @param $job_id
153 * Job ID of the job to delete
154 * @return
155 * TRUE on success, FALSE on failure
156 */
157 function datasync_remove_job($job_id) {
158 $c = datasync_enabled_consumers(array(datasync_consumer_by_job_id($job_id)));
159 if (empty($c)) {
160 return FALSE;
161 }
162 db_query("DELETE FROM {datasync_queue} WHERE job_id=%d", $job_id);
163 if (db_affected_rows() == 1) {
164 return TRUE;
165 }
166 return FALSE;
167 }
168
169 /**
170 * Return information from the datasync_jobs table on a job
171 *
172 * @param $job_id
173 * Job ID of the job we are returning info on.
174 * @return
175 * An object of the row of the datasync_jobs table.
176 */
177 function datasync_get_job_info($job_id) {
178 return db_fetch_object(db_query("SELECT * FROM {datasync_jobs} WHERE job_id=%d", $job_id));
179 }
180
181 /**
182 * Enqueue's an item to be versioned and inserted at a later date.
183 *
184 * @param $guid
185 * GUID of the item we are inserting.
186 * @param $job_id
187 * Job ID we are enqueueing an item for.
188 * @param $action
189 * Action we are performing (normally DATASYNC_INSERT, DATASYING_UPDATE or DATASYNC_DELETE)
190 * @param $data,
191 * Data to be inserted in the datasync_queue table.
192 * @param $version
193 * Version of the data we are inserting (must be integer)
194 * @return
195 * TRUE on success, FALSE on failure.
196 */
197 function datasync_enqueue($guid, $job_id, $action, $data="", $version=NULL) {
198 if ($consumer = datasync_consumer_by_job_id($job_id)) {
199 $c = datasync_enabled_consumers(array($consumer));
200 if (empty($c)) {
201 return FALSE;
202 }
203 $qid = db_next_id("{datasync_queue}_qid");
204 if ($version === NULL) {
205 $version = time();
206 }
207 //print_r(array($qid, $action, $c[0], $guid, $job_id, $version, $data));
208 db_query("INSERT INTO {datasync_queue} (qid, action, consumer, guid, job_id, version, data) VALUES (%d, '%s', '%s', '%s', %d, %d, '%s')",
209 $qid, $action, $c[0], $guid, $job_id, $version, $data);
210 if (db_affected_rows() == 1) {
211 return TRUE;
212 }
213 }
214 return FALSE;
215 }
216
217 /**
218 * Dequeue an item that has already had its action take place.
219 *
220 * @param $qid
221 * The QID of the item we are deuqueuing
222 * @return
223 * TRUE on success, FALSE on failure
224 */
225 function datasync_dequeue($qid) {
226 $c = datasync_enabled_consumers(array(datasync_consumer_by_qid($qid)));
227 if (empty($c)) {
228 return FALSE;
229 }
230 //TODO we can make a queue_deleted table to take care of myISAM's
231 //speed when using just INSERT and SELECT
232 db_query("DELETE FROM {datasync_queue} WHERE qid=%d", $qid);
233 if (db_affected_rows() == 1) {
234 return TRUE;
235 }
236 return FALSE;
237 }
238
239 /************************************
240 * DATASYNC CALLBACKS
241 * //WARNING: AS OF YET, THESE CALLBACKS ARE UNTESTED
242 * //TODO add data encryption if necessary
243 **************************************/
244 function datasync_jobs_callback() {
245 $input = $_POST['datasync'];
246 //string returned means error, array means success
247 if (!is_array(($result = datasync_callback_data($input)))) {
248 print 'error|' . $result;
249 exit;
250 }
251
252 switch($result['api_function']) {
253 case 'switch_status':
254 datasync_switch_status_callback_helper($result);
255 break;
256 case 'add_job':
257 datasync_add_job_callback_helper($result);
258 break;
259 case 'remove_job':
260 datasync_remove_job_callback_helper($result);
261 break;
262 case 'take_jobs':
263 datasync_take_jobs_callback_helper($result);
264 break;
265 default:
266 print 'error|' . t('No such API function @f', array('@f', $result['api_function']));
267 break;
268 }
269 exit;
270 }
271
272 function datasync_switch_status_callback_helper($result) {
273 if(!isset($result['job_id']) || !ctype_digit($result['job_id'])) {
274 print 'error|' . t('Invalid job_id @jid given', array('@jid' => $result['job_id']));
275 return;
276 }
277 if (isset($result['current_status']) && !ctype_digit($result['current_status'])) {
278 print 'error|' . t('Invalid current_status: @str', array('@str' => $result['current_status']));
279 return;
280 }
281 if (isset($result['next_status']) && !ctype_digit($result['next_status'])) {
282 print 'error|' . t('Invalid next_status: @str', array('@str' => $result['next_status']));
283 return;
284 }
285 //db_query('BEGIN;');
286 $success = datasync_switch_status($result['job_id'], $result['current_status'], $result['next_status']);
287 if ($success) {
288 //db_query('COMMIT;');
289 print 'success';
290 }
291 else {
292 //db_query('ROLLBACK;');
293 print 'error|' . t('status switch of "@jid" failed.', array('@jid' => $result['job_id']));
294 }
295 }
296
297 function datasync_add_job_callback_helper($result) {
298 if (!isset($result['consumer']) || strlen($result['consumer'] > 50)) {
299 print 'error|' . t('Invalid consumer: @str', array('@str' => $result['consumer']));
300 return;
301 }
302 //db_query('BEGIN;');
303 $success = datasync_add_job($result['consumer']);
304 if ($success) {
305 //db_query('COMMIT;');
306 print 'success';
307 }
308 else {
309 //db_query('ROLLBACK;');
310 print 'error|' . t('addition of job "@jid" failed.', array('@jid' => $result['job_id']));
311 }
312 }
313
314 function datasync_remove_job_callback_helper($result) {
315 if(!isset($result['job_id']) || !ctype_digit($result['job_id'])) {
316 print 'error|' . t('Invalid job_id @jid given', array('@jid' => $result['job_id']));
317 return;
318 }
319 //db_query('BEGIN;');
320 $success = datasync_remove_job($result['job_id']);
321 if ($success) {
322 //db_query('COMMIT;');
323 print 'success';
324 }
325 else {
326 //db_query('ROLLBACK;');
327 print 'error|' . t('addition of job "@jid" failed.', array('@jid' => $result['job_id']));
328 }
329 }
330
331 function datasync_take_jobs_callback_helper($result) {
332 if (!isset($result['num_jobs']) || !ctype_digit($result['num_jobs'])) {
333 print 'error|' . t('Invalid num_jobs: @str', array('@str' => $result['num_jobs']));
334 return;
335 }
336 if(!isset($result['valid_status']) || !($vs = base64_decode($result['valid_status']))) {
337 print 'error|' . t('Invalid valid_status: @str', array('@str' => $result['valid_status']));
338 return;
339 }
340 if(!isset($result['consumers']) || !($c = base64_decode($result['consumers']))) {
341 print 'error|' . t('Invalid consumers: @str', array('@str' => $result['consumers']));
342 return;
343 }
344
345 $job_ids = datasync_take_jobs($result['num_jobs'], $vs, $c);
346 if ($job_ids !== FALSE) {
347 print 'success|' . base64_encode(implode('|', $job_ids));
348 }
349 else {
350 print 'error|' . t('Taking of jobs failed.');
351 }
352 }
353
354 function datasync_queue_callback() {
355 $input = $_POST['datasync'];
356 //string returned means error, array means success
357 if (!is_array(($result = datasync_callback_data($input)))) {
358 print 'error|' . $result;
359 exit;
360 }
361
362 switch($result['api_function']) {
363 case 'enqueue':
364 datasync_enqueue_callback_helper($result);
365 break;
366 case 'dequeue':
367 datasync_dequeue_callback_helper($result);
368 break;
369 default:
370 print 'error|' . t('No such API function @f', array('@f', $result['api_function']));
371 break;
372 }
373 exit;
374 }
375
376 function datasync_enqueue_callback_helper($result) {
377 if (!isset($result['guid']) || strlen($result['guid'] > 150)) {
378 print 'error|' . t('Invalid guid: @str', array('@str' => $result['guid']));
379 return;
380 }
381 else if (!isset($result['action']) || !ctype_digit($result['action'])) {
382 print 'error|' . t('Invalid action: @str', array('@str' => $result['action']));
383 return;
384 }
385 else if (isset($result['version']) && !ctype_digit($result['version'])) {
386 print 'error|' . t('Invalid version: @str', array('@str' => $result['version']));
387 return;
388 }
389 else if (!isset($result['version'])) {
390 $result['version'] = NULL;
391 }
392 if (!isset($result['data'])) {
393 $result['data'] = '';
394 }
395
396 //db_query('BEGIN;');
397 $success = datasync_enqueue($result['guid'], $result['job_id'], $result['action'], $result['data'], $result['version']);
398 if ($success) {
399 //db_query('COMMIT;');
400 print 'success';
401 }
402 else {
403 //db_query('ROLLBACK;');
404 print 'error|' . t('enqueue of "@jid" failed.', array('@jid' => $result['job_id']));
405 }
406 }
407
408 function datasync_dequeue_callback_helper($result) {
409 if (!isset($result['qid']) || !ctype_digit($result['qid'])) {
410 print 'error|' . t('Invalid qid: @str', array('@str' => $result['guid']));
411 return;
412 }
413 //db_query('BEGIN;');
414 $success = datasync_dequeue($result['qid']);
415 if ($success) {
416 //db_query('COMMIT;');
417 print 'success';
418 }
419 else {
420 //db_query('ROLLBACK;');
421 print 'error|' . t('dequeue of "@qid" failed.', array('@qid' => $result['qid']));
422 }
423 }
424 /*
425 * Returns a string with the error message on error.
426 * Returns an array of data on success.
427 */
428 function datasync_callback_data($input) {
429 $input = explode('|', $input);
430 if (count($input) != 2) {
431 return t('Bad input format.');
432 }
433 $key = sha1(ds_variable_get('datasync_hmac', ''));
434 if ($input[1] != datasync_hmacsha1($key, $input[0], 128, 'sha1')) {
435 return t('Invalid hmacsha1.');
436 }
437 $data = base64_decode($input[0]);
438 $data = explode('|', $data);
439 if ((count($data) % 2) != 0) {
440 return t('Invalid number of items in data array.');
441 }
442 $items = array();
443 for ($i=0; $i<count($data);$i+=2) {
444 $items[$data[$i]] = $data[$i+1];
445 }
446 if(!isset($items['time']) || !ctype_digit($items['time']) || abs(time() - $items['time']) > 15) {
447 return t('Invalid time given');
448 }
449 return $items;
450 }
451
452 function datasync_hmacsha1($key, $data, $blocksize, $hashfunc) {
453 if (strlen($key)>$blocksize)
454 $key=pack('H*', $hashfunc($key));
455 $key=str_pad($key,$blocksize,chr(0x00));
456 $ipad=str_repeat(chr(0x36),$blocksize);
457 $opad=str_repeat(chr(0x5c),$blocksize);
458 $hmac = pack(
459 'H*',$hashfunc(
460 ($key^$opad).pack(
461 'H*',$hashfunc(
462 ($key^$ipad).$data
463 )
464 )
465 )
466 );
467 return bin2hex($hmac);
468 }
469
470 /**************************************
471 * INTERFACE STUFF
472 **************************************/
473 function datasync_admin_settings() {
474 $form = array();
475
476 $form['hmac'] = array(
477 '#type' => 'textfield',
478 '#title' => t('HMAC Key'),
479 '#description' => t('Secret key that is required to use callbacks.'),
480 '#default_value' => ds_variable_get('datasync_hmac', ''),
481 );
482
483 $producers = datasync_get_producers(FALSE, FALSE);
484 $options = array();
485 foreach ($producers as $key => $producer) {
486 $options[$key] = $producer['name'];
487 }
488 $form['producers'] = array(
489 '#type' => 'checkboxes',
490 '#title' => t('Enable these Producers'),
491 '#default_value' => ds_variable_get('datasync_producers', array()),
492 '#options' => $options,
493 );
494 $form['consumers'] = array(
495 '#type' => 'checkboxes',
496 '#title' => t('Enable these Consumers'),
497 '#default_value' => ds_variable_get('datasync_consumers', array()),
498 '#options' => $options,
499 );
500
501 $consumers = datasync_get_consumers(FALSE, FALSE);
502 $options = array();
503 foreach ($consumers as $key => $consumer) {
504 $options[$key] = $consumer['name'];
505 }
506
507 $form['submit'] = array(
508 '#type' => 'submit',
509 '#value' => t('Save'),
510 );
511
512 return $form;
513 }
514
515 function datasync_admin_settings_submit($form_id, $form_values) {
516 ds_variable_set('datasync_producers', $form_values['producers']);
517 ds_variable_set('datasync_consumers', $form_values['consumers']);
518 ds_variable_set('datasync_hmac', $form_values['hmac']);
519 }
520
521 /**************************************
522 * MISC HELPER FUNCTIONS
523 **************************************/
524 function datasync_get_producers($enabled_only, $generic_only) {
525 static $producers;
526 static $producers_enabled;
527 static $producers_generic;
528 static $result;
529 $key = (int)$enabled_only . (int)$generic_only;
530 if(!is_array($result) || !isset($result[$key])) {
531 if (!is_array($producers)) {
532 $producers = module_invoke_all('ds_producers');
533 //sort producers by weight
534 uasort($producers, 'datasync_sort_by_weight');
535 }
536 //always fill with producers
537 $result[$key] = $producers;
538 if ($enabled_only) {
539 if (!is_array($producers_enabled)) {
540 $enabled = ds_variable_get('datasync_producers', array());
541 $producers_enabled = $producers;
542 foreach ($producers as $name => $producer) {
543 if (!$enabled[$name]) {
544 unset($producers_enabled[$name]);
545 }
546 }
547 }
548 $result[$key] = array_intersect_key($result[$key], $producers_enabled);
549 }
550 if ($generic_only) {
551 if (!is_array($producers_generic)) {
552 $enabled = ds_variable_get('datasync_producers', array());
553 $producers_generic = $producers;
554 foreach ($producers as $name => $producer) {
555 if (!isset($producer['generic']['consumers'])) {
556 unset($producers_generic[$name]);
557 }
558 }
559 }
560 $result[$key] = array_intersect_key($result[$key], $producers_generic);
561 }
562 }
563 return $result[$key];
564 }
565
566 function datasync_enabled_producers($producers) {
567 $enabled = ds_variable_get('datasync_producers', array());
568 foreach ($producers as $k => $name) {
569 if (!$enabled[$name]) {
570 unset($producers[$k]);
571 }
572 }
573 return $producers;
574 }
575
576 function datasync_enabled_consumers($consumers) {
577 $enabled = ds_variable_get('datasync_consumers', array());
578 foreach ($consumers as $k => $name) {
579 if (!$enabled[$name]) {
580 unset($consumers[$k]);
581 }
582 }
583 return $consumers;
584 }
585
586 function datasync_get_consumers($enabled_only, $generic_only) {
587 static $consumers;
588 static $consumers_enabled;
589 static $consumers_generic;
590 static $result;
591 $key = (int)$enabled_only . (int)$generic_only;
592 if(!is_array($result) || !isset($result[$key])) {
593 if (!is_array($consumers)) {
594 $consumers = module_invoke_all('ds_consumers');
595 //sort consumers by weight
596 uasort($consumers, 'datasync_sort_by_weight');
597 }
598 //always fill with consumers
599 $result[$key] = $consumers;
600 if ($enabled_only) {
601 if (!is_array($consumers_enabled)) {
602 $enabled = ds_variable_get('datasync_consumers', array());
603 $consumers_enabled = $consumers;
604 foreach ($consumers as $name => $consumer) {
605 if (!$enabled[$name]) {
606 unset($consumers_enabled[$name]);
607 }
608 }
609 }
610 $result[$key] = array_intersect_key($result[$key], $consumers_enabled);
611 }
612 if ($generic_only) {
613 if (!is_array($consumers_generic)) {
614 $enabled = ds_variable_get('datasync_consumers', array());
615 $consumers_generic = $consumers;
616 foreach ($consumers as $name => $consumer) {
617 if (!isset($consumer['generic'])) {
618 unset($consumers_generic[$name]);
619 }
620 }
621 }
622 $result[$key] = array_intersect_key($result[$key], $consumers_generic);
623 }
624 }
625 return $result[$key];
626 }
627
628 function datasync_sort_by_weight($a, $b) {
629 $aw = array_key_exists('weight', $a) ? $a['weight'] : 0;
630 $bw = array_key_exists('weight', $b) ? $b['weight'] : 0;
631 if ($aw == $bw) {
632 return 0;
633 }
634 return ($aw < $bw) ? -1 : 1;
635 }
636
637 function datasync_consumer_by_job_id($job_id) {
638 //TODO static here
639 return db_result(db_query("SELECT consumer FROM {datasync_jobs} WHERE job_id=%d", $job_id));
640 }
641
642 function datasync_consumer_by_qid($qid) {
643 //TODO static here
644 return db_result(db_query("SELECT consumer FROM {datasync_queue} WHERE qid=%d", $qid));
645 }
646
647 //Verifies if a process is running in linux
648 function datasync_process_running($pid) {
649 exec("ps $pid", $state);
650 return(count($state) >= 2);
651 }
652
653 //Checks if the current host is in the given list
654 function datasync_host_in_list($hosts) {
655 exec("hostname", $current);
656 trim($current[0]);
657 return in_array($current[0], $hosts) ? $current[0] : FALSE;
658 }
659
660 /**
661 * Copied from bootstrap.inc and modified to make sure
662 * transactions aren't autocommitted and that we always get
663 * the latest values (D6)
664 */
665 function ds_variable_get($name, $default) {
666 $result = db_result(db_query("SELECT value FROM {datasync_variable} WHERE name='%s'", $name));
667 return $result === FALSE ? $default : unserialize($result);
668 }
669
670 function ds_variable_set($name, $value) {
671 db_query("DELETE FROM {datasync_variable} WHERE name = '%s'", $name);
672 db_query("INSERT INTO {datasync_variable} (name, value) VALUES ('%s', '%s')", $name, serialize($value));
673 if (db_affected_rows() != 1) {
674 return FALSE;
675 }
676 return TRUE;
677 }
678
679 function ds_variable_del($name) {
680 db_query("DELETE FROM {datasync_variable} WHERE name = '%s'", $name);
681 if (db_error != 0) {
682 return FALSE;
683 }
684 return TRUE;
685 }
686
687 function datasync_log_failure($message) {
688 db_query("INSERT INTO {datasync_failures} (value, timestamp) VALUES ('%s', %d)", $message, time());
689 }
690

  ViewVC Help
Powered by ViewVC 1.1.2