--- a/busui/owa/eventQueue.php +++ b/busui/owa/eventQueue.php @@ -1,1 +1,366 @@ - + + * @copyright Copyright © 2006 Peter Adams + * @license http://www.gnu.org/copyleft/gpl.html GPL v2.0 + * @category owa + * @package owa + * @version $Revision$ + * @since owa 1.0.0 + */ +class eventQueue { + + /** + * Stores listeners + * + */ + var $listeners = array(); + + /** + * Stores listener IDs by event type + * + */ + var $listenersByEventType = array(); + + /** + * Stores listener IDs by event type + * + */ + var $listenersByFilterType = array(); + + /** + * Constructor + * + */ + function __construct() { + + } + + /** + * Attach + * + * Attaches observers by event type. + * Takes a valid user defined callback function for use by PHP's call_user_func_array + * + * @param $event_name string + * @param $observer mixed can be a function name or function array + * @return bool + */ + + function attach($event_name, $observer) { + + $id = md5(microtime()); + + // Register event names for this handler + if(is_array($event_name)) { + + foreach ($event_name as $k => $name) { + + $this->listenersByEventType[$name][] = $id; + } + + } else { + + $this->listenersByEventType[$event_name][] = $id; + } + + $this->listeners[$id] = $observer; + + return true; + } + + /** + * Attach + * + * Attaches observers by filter type. + * Takes a valid user defined callback function for use by PHP's call_user_func_array + * + * @param $filter_name string + * @param $observer mixed can be a function name or function array + * @return bool + */ + + function attachFilter($filter_name, $observer, $priority = 10) { + + $id = md5(microtime()); + + $this->listenersByFilterType[$filter_name][$priority][] = $id; + + $this->listeners[$id] = $observer; + + } + + /** + * Notify + * + * Notifies all handlers of events in order that they were registered + * + * @param $event_type string + * @param $event array + * @return bool + */ + function notify($event) { + + $responses = array(); + owa_coreAPI::debug("Notifying listeners of ".$event->getEventType()); + //print_r($this->listenersByEventType[$event_type] ); + //print $event->getEventType(); + if (array_key_exists($event->getEventType(), $this->listenersByEventType)) { + $list = $this->listenersByEventType[$event->getEventType()]; + //print_r($list); + if (!empty($list)) { + foreach ($this->listenersByEventType[$event->getEventType()] as $k => $observer_id) { + //print_r($list); + $class = get_class( $this->listeners[$observer_id][0] ); + $responses[ $class ] = call_user_func_array($this->listeners[$observer_id], array($event)); + //owa_coreAPI::debug(print_r($event, true)); + owa_coreAPI::debug(sprintf("%s event handled by %s.",$event->getEventType(), get_class($this->listeners[$observer_id][0]))); + } + } + } else { + owa_coreAPI::debug("no listeners registered for this event type."); + } + + owa_coreAPI::debug('EHS: Responses - '.print_r($responses, true)); + + if ( in_array( OWA_EHS_EVENT_FAILED, $responses, true ) ) { + owa_coreAPI::debug("EHS: Event was not handled successfully by some handlers."); + //$q = $this->getAsyncEventQueue(owa_coreAPI::getSetting('base', 'event_queue_type')); + //$q->addToQueue($event); + return OWA_EHS_EVENT_FAILED; + } else { + owa_coreAPI::debug("EHS: Event was handled successfully by all handlers."); + return OWA_EHS_EVENT_HANDLED; + } + + } + + /** + * Notify Untill + * + * Notifies all handlers of events in order that they were registered + * Stops notifying after first handler returns true + * + * @param $event_type string + * @param $event array + * @return bool + */ + + function notifyUntill() { + owa_coreAPI::debug("Notifying Until listener for $event_type answers"); + } + + /** + * Filter + * + * Filters event by handlers in order that they were registered + * + * @param $filter_name string + * @param $value array + * @return $new_value mixed + */ + function filter($filter_name, $value = '') { + owa_coreAPI::debug("Filtering $filter_name"); + + if (array_key_exists($filter_name, $this->listenersByFilterType)) { + // sort the filter list by priority + ksort($this->listenersByFilterType[$filter_name]); + //get the function arguments + $args = func_get_args(); + // outer priority loop + foreach ($this->listenersByFilterType[$filter_name] as $priority) { + // inner filter class/function loop + foreach ($priority as $observer_id) { + // pass args to filter + owa_coreAPI::debug(sprintf("Filter: %s::%s. Value passed: %s", get_class($this->listeners[$observer_id][0]),$this->listeners[$observer_id][1], print_r($value, true))); + $value = call_user_func_array($this->listeners[$observer_id], array_slice($args,1)); + owa_coreAPI::debug(sprintf("Filter: %s::%s. Value returned: %s", get_class($this->listeners[$observer_id][0]),$this->listeners[$observer_id][1], print_r($value, true))); + // set filterred value as value in args for next filter + $args[1] = $value; + // debug whats going on + owa_coreAPI::debug(sprintf("%s filtered by %s.", $filter_name, get_class($this->listeners[$observer_id][0]))); + } + } + } + + return $value; + } + + /** + * Log + * + * Notifies handlers of tracking events + * Provides switch for async notification + * + * @param $event_params array + * @param $event_type string + */ + function log($event_params, $event_type = '') { + //owa_coreAPI::debug("Notifying listeners of tracking event type: $event_type"); + + if (!is_a($event_params,'owa_event')) { + $event = owa_coreAPI::supportClassFactory('base', 'event'); + $event->setProperties($event_params); + $event->setEventType($event_type); + } else { + $event = $event_params; + } + + $this->asyncNotify($event); + + } + + /** + * Async Notify + * + * Adds event to async notiication queue for notification by another process. + * + * @param $event array + * @return bool + */ + function asyncNotify($event) { + + // check config to see if async mode is enabled, if not fall back to realtime notification + if (owa_coreAPI::getSetting('base', 'queue_events')) { + owa_coreAPI::debug(sprintf("Adding event of %s to async %s queue.", $event->getEventType(), owa_coreAPI::getSetting('base', 'event_queue_type'))); + // check to see first if OWA is not already acting as a remote event queue, + // then check to see if we are configured to use a remote or local event queue + // then see if we have an endpoint + if (!owa_coreAPI::getSetting('base', 'is_remote_event_queue') && + owa_coreAPI::getSetting('base', 'use_remote_event_queue') && + owa_coreAPI::getSetting('base', 'remote_event_queue_type') && + owa_coreAPI::getSetting('base', 'remote_event_queue_endpoint')) { + // get a network queue + $q = $this->getAsyncEventQueue(owa_coreAPI::getSetting('base', 'remote_event_queue_type')); + // use a local event queue + } else { + // get a local event queue + $q = $this->getAsyncEventQueue(owa_coreAPI::getSetting('base', 'event_queue_type')); + } + + // if an event queue is returned then pass it the event + if ($q) { + + return $q->addToQueue($event); + // otherwise skip the queue and just notify the listeners immeadiately. + } else { + return $this->notify($event); + } + + // otherwise skip the queue and just notify the listeners immeadiately. + } else { + return $this->notify($event); + } + } + + function getAsyncEventQueue($type) { + + static $q = array(); + + if ( ! array_key_exists( $type, $q ) ) { + + switch( $type ) { + + case 'http': + $q['http'] = owa_coreAPI::supportClassFactory( 'base', 'httpEventQueue' ); + break; + case 'database': + $q['database'] = owa_coreAPI::supportClassFactory( 'base', 'dbEventQueue' ); + break; + case 'file': + $q['file'] = owa_coreAPI::supportClassFactory( 'base', 'fileEventQueue' ); + break; + } + } + + if ( array_key_exists( $type, $q ) ) { + return $q[$type]; + } else { + owa_coreAPI::debug('No event queue of that type exists.'); + return false; + } + } + + function eventFactory() { + + return owa_coreAPI::supportClassFactory('base', 'event'); + } + + function makeEvent($type = '') { + + $event = $this->eventFactory(); + + if ($type) { + $event->setEventType($type); + } + + return $event; + } + +/* + function processEventQueue($processing_queue_type = '') { + + // get the primary async queue + + // get an item from the queue + + // send to the notify method + + // check return status + + // mark item accordingly + } +*/ + + /** + * Singleton + * + * @static + * @return object + * @access public + */ + public static function &get_instance() { + + static $eq; + + if (empty($eq)) { + $eq = new eventQueue(); + } + + return $eq; + } + +} + +?>