Add analytics
[bus.git] / busui / owa / modules / base / classes / daemon.php
blob:a/busui/owa/modules/base/classes/daemon.php -> blob:b/busui/owa/modules/base/classes/daemon.php
--- a/busui/owa/modules/base/classes/daemon.php
+++ b/busui/owa/modules/base/classes/daemon.php
@@ -1,1 +1,225 @@
-
+<?php
+
+if ( ! class_exists( 'Daemon' ) ) {
+	require_once( OWA_INCLUDE_DIR.'Daemon.class.php' );
+}
+
+if ( ! class_exists( 'CronParser.php' ) ) {
+	require_once(OWA_INCLUDE_DIR.'CronParser.php');
+}
+
+class owa_daemon extends Daemon {
+	
+	var $pids = array();
+	var $params = array();
+	var $max_workers = 5;
+	var $job_scheduling_interval = 30;
+	var $eq;
+	var $workerCountByJob = array();
+	var $lastExecutionTimeByJob = array();
+	var $jobsByPid = array();
+	var $defaultMaxWorkersPerJob = 3;
+	var $jobs;
+	
+	function __construct() {
+		
+		$this->params = $this->getArgs();
+		
+		if (isset($this->params['interval'])) {
+			$this->job_scheduling_interval = $this->params['interval'];
+		}
+		
+		if (isset($this->params['max_workers'])) {
+			$this->max_workers = $this->params['max_workers'];
+		}
+		
+		if (isset($this->params['pid_file_location'])) {
+			$this->pidFileLocation = $this->params['pid_file_location'];
+		}
+		
+		if (isset($this->params['uid'])) {
+			$this->userID = $this->params['uid'];
+		}
+		
+		if (isset($this->params['gid'])) {
+			$this->groupID = $this->params['gid'];
+		}
+
+		if (isset($this->params['pid_file_location'])) {
+			$this->pidFileLocation = $this->params['pid_file_location'];
+		}
+		
+		$s = owa_coreAPI::serviceSingleton();
+		$this->jobs = $s->getMap('backgound_jobs');
+		
+		$this->eq = owa_coreAPI::getEventDispatch();
+		
+		return parent::__construct();
+	}
+	
+	function getArgs() {
+		
+		$params = array();
+		// get params from the command line args
+		// $argv is a php super global variable
+		global $argv;
+		for ( $i=1; $i < count( $argv ); $i++ ) {
+			$it = split("=",$argv[$i]);
+			$params[$it[0]] = $it[1];
+		}
+		
+		return $params;
+	}
+
+	function _logMessage($msg, $status = DLOG_NOTICE) {
+		
+		if ($status & DLOG_TO_CONSOLE) {
+        	echo $msg."\n";
+        }
+        
+		owa_coreAPI::notice("Daemon: $msg");
+	}
+	
+	function isWorkerAvailable() {
+		
+		$active_workers = count( $this->pids );
+		$available_workers = $this->max_workers - $active_workers;
+		if ( $available_workers >= 1 ) {
+			return true;
+		} else {
+			return false;
+		}
+	}
+	
+	function isAnotherWorkerAllowed($job_name, $job_max_workers = '') {
+		
+		if ( ! $job_max_workers ) {
+			$job_max_workers = $this->defaultMaxWorkersPerJob;
+		}
+		
+		if ( array_key_exists($job_name, $this->workerCountByJob ) ) {
+			if ( $this->workerCountByJob[$job_name]	< $job_max_workers) {
+				owa_coreAPI::debug(sprintf(
+						"New worker processes is allowed for job: %s. %d of %d processes are active.", 
+						$job_name, 
+						$this->workerCountByJob[$job_name], $job_max_workers 
+				));
+				return true;
+			} else {
+				owa_coreAPI::debug(sprintf(
+						"New worker processes not allowed for job: %s. %d of %d processes are active.", 
+						$job_name, 
+						$this->workerCountByJob[$job_name], $job_max_workers 
+				));
+				return false;
+			}
+		} else {
+			owa_coreAPI::debug(sprintf(
+					"New worker processes is allowed for job: %s. %d of %d processes are active.", 
+					$job_name, 
+					$this->workerCountByJob[$job_name], $job_max_workers 
+			));
+			return true;
+		}	
+	}
+	
+	function isTimeForJob($cron_tab, $last_execution_time) {
+		
+		$cron = new CronParser();
+		$cron->calcLastRan($cron_tab);
+		$last_due = $cron->getLastRanUnix();
+		
+		if ($last_due > $last_execution_time) {
+			return true;
+		} else {
+			return false;
+		}
+	}
+	
+	function getLastExecutionTime($job_name) {
+		
+		if ( array_key_exists( $job_name, $this->lastExecutionTimeByJob ) ) {
+			return $this->lastExecutionTimeByJob[$job_name];
+		} else {
+			return 0;
+		}
+	}
+	
+	/**
+	 * This function is happening in a while loop
+	 */
+	function _doTask() {
+				
+		if ( $this->isWorkerAvailable() ) {
+			
+			$jobs = $this->jobs;
+			
+			if ( $jobs ) {
+				$i = 0;
+				//for ($i = 0; $i < $available_workers; $i++) {
+				foreach ($jobs as $k => $job) {
+					
+					if ( $this->isAnotherWorkerAllowed( $job['name'], $job['max_processes'] ) && 
+						 $this->isTimeForJob( $job['cron_tab'], $this->getLastExecutionTime( $job['name'] ) ) ) {
+						// fork a new child
+						$pid = pcntl_fork();
+						if ( ! $pid ) {
+							// this part is executed in the child
+			 				owa_coreAPI::debug( 'New child process executing job ' . print_r( $job, true ) );
+			 				pcntl_exec( OWA_DIR.'cli.php', $job['cmd'] ); // takes an array of arguments
+			 				exit();
+			 			} elseif ($pid == -1) {
+			 				// happens when something goes wrong and fork fails (handle errors here)
+			 				owa_coreAPI::debug( 'Could not fork new child' );
+			 			} else {
+			 				// this part is executed in the parent
+							// We add pids to a global array, so that when we get a kill signal
+							// we tell the kids to flush and exit.
+							if ( array_key_exists( $k, $this->workerCountByJob ) ) {
+								$this->workerCountByJob[$k]++;
+							} else {
+								$this->workerCountByJob[$k] = 1;
+								$this->lastExecutionTimeByJob[$k] = time();
+								$this->jobsByPid[$pid] = $k;
+							}
+							
+							$this->pids[] = $pid;	
+						}
+					}									
+				}
+			}
+		}
+
+		// Collect any children which have exited on their own. pcntl_waitpid will
+		// return the PID that exited or 0 or ERROR
+		// WNOHANG means we won't sit here waiting if there's not a child ready
+		// for us to reap immediately
+		// -1 means any child
+		$dead_and_gone = pcntl_waitpid( -1, $status, WNOHANG );
+		
+		while( $dead_and_gone > 0 ) {
+			// Remove the gone pid from the array
+			unset( $this->pids[array_search( $dead_and_gone, $this->pids )] );
+			$past_job = $this->jobsByPid[$dead_and_gone];
+			// decrement worker count
+			--$this->workerCountByJob[$past_job];
+			unset($this->jobsByPid[$dead_and_gone]);
+		
+			// Look for another one
+			$dead_and_gone = pcntl_waitpid( -1, $status, WNOHANG);
+		}
+		
+		owa_coreAPI::debug(sprintf(
+				"Daemon Statistics -- pidsByJob: %s, workerCountByJob: %s, lastExecutionTimeByJob: %s",
+				print_r( $this->pidsByJob, true),
+				print_r( $this->workerCountByJob, true),
+				print_r( $this->lastExecutiontimeByJob, true)
+		));
+		
+		// Sleep for some interval
+		sleep($this->job_scheduling_interval);
+	}
+}
+
+?>
+