Add analytics
[bus.git] / busui / owa / modules / base / classes / daemon.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
<?php
 
if ( ! class_exists( 'Daemon' ) ) {
        require_once( OWA_INCLUDE_DIR.'Daemon.class.php' );
}
 
if ( ! class_exists( 'CronParser.php' ) ) {
        require_once(OWA_INCLUDE_DIR.'CronParser.php');
}
 
class owa_daemon extends Daemon {
        
        var $pids = array();
        var $params = array();
        var $max_workers = 5;
        var $job_scheduling_interval = 30;
        var $eq;
        var $workerCountByJob = array();
        var $lastExecutionTimeByJob = array();
        var $jobsByPid = array();
        var $defaultMaxWorkersPerJob = 3;
        var $jobs;
        
        function __construct() {
                
                $this->params = $this->getArgs();
                
                if (isset($this->params['interval'])) {
                        $this->job_scheduling_interval = $this->params['interval'];
                }
                
                if (isset($this->params['max_workers'])) {
                        $this->max_workers = $this->params['max_workers'];
                }
                
                if (isset($this->params['pid_file_location'])) {
                        $this->pidFileLocation = $this->params['pid_file_location'];
                }
                
                if (isset($this->params['uid'])) {
                        $this->userID = $this->params['uid'];
                }
                
                if (isset($this->params['gid'])) {
                        $this->groupID = $this->params['gid'];
                }
 
                if (isset($this->params['pid_file_location'])) {
                        $this->pidFileLocation = $this->params['pid_file_location'];
                }
                
                $s = owa_coreAPI::serviceSingleton();
                $this->jobs = $s->getMap('backgound_jobs');
                
                $this->eq = owa_coreAPI::getEventDispatch();
                
                return parent::__construct();
        }
        
        function getArgs() {
                
                $params = array();
                // get params from the command line args
                // $argv is a php super global variable
                global $argv;
                for ( $i=1; $i < count( $argv ); $i++ ) {
                        $it = split("=",$argv[$i]);
                        $params[$it[0]] = $it[1];
                }
                
                return $params;
        }
 
        function _logMessage($msg, $status = DLOG_NOTICE) {
                
                if ($status & DLOG_TO_CONSOLE) {
                echo $msg."\n";
        }
        
                owa_coreAPI::notice("Daemon: $msg");
        }
        
        function isWorkerAvailable() {
                
                $active_workers = count( $this->pids );
                $available_workers = $this->max_workers - $active_workers;
                if ( $available_workers >= 1 ) {
                        return true;
                } else {
                        return false;
                }
        }
        
        function isAnotherWorkerAllowed($job_name, $job_max_workers = '') {
                
                if ( ! $job_max_workers ) {
                        $job_max_workers = $this->defaultMaxWorkersPerJob;
                }
                
                if ( array_key_exists($job_name, $this->workerCountByJob ) ) {
                        if ( $this->workerCountByJob[$job_name] < $job_max_workers) {
                                owa_coreAPI::debug(sprintf(
                                                "New worker processes is allowed for job: %s. %d of %d processes are active.", 
                                                $job_name, 
                                                $this->workerCountByJob[$job_name], $job_max_workers 
                                ));
                                return true;
                        } else {
                                owa_coreAPI::debug(sprintf(
                                                "New worker processes not allowed for job: %s. %d of %d processes are active.", 
                                                $job_name, 
                                                $this->workerCountByJob[$job_name], $job_max_workers 
                                ));
                                return false;
                        }
                } else {
                        owa_coreAPI::debug(sprintf(
                                        "New worker processes is allowed for job: %s. %d of %d processes are active.", 
                                        $job_name, 
                                        $this->workerCountByJob[$job_name], $job_max_workers 
                        ));
                        return true;
                }       
        }
        
        function isTimeForJob($cron_tab, $last_execution_time) {
                
                $cron = new CronParser();
                $cron->calcLastRan($cron_tab);
                $last_due = $cron->getLastRanUnix();
                
                if ($last_due > $last_execution_time) {
                        return true;
                } else {
                        return false;
                }
        }
        
        function getLastExecutionTime($job_name) {
                
                if ( array_key_exists( $job_name, $this->lastExecutionTimeByJob ) ) {
                        return $this->lastExecutionTimeByJob[$job_name];
                } else {
                        return 0;
                }
        }
        
        /**
         * This function is happening in a while loop
         */
        function _doTask() {
                                
                if ( $this->isWorkerAvailable() ) {
                        
                        $jobs = $this->jobs;
                        
                        if ( $jobs ) {
                                $i = 0;
                                //for ($i = 0; $i < $available_workers; $i++) {
                                foreach ($jobs as $k => $job) {
                                        
                                        if ( $this->isAnotherWorkerAllowed( $job['name'], $job['max_processes'] ) && 
                                                 $this->isTimeForJob( $job['cron_tab'], $this->getLastExecutionTime( $job['name'] ) ) ) {
                                                // fork a new child
                                                $pid = pcntl_fork();
                                                if ( ! $pid ) {
                                                        // this part is executed in the child
                                                        owa_coreAPI::debug( 'New child process executing job ' . print_r( $job, true ) );
                                                        pcntl_exec( OWA_DIR.'cli.php', $job['cmd'] ); // takes an array of arguments
                                                        exit();
                                                } elseif ($pid == -1) {
                                                        // happens when something goes wrong and fork fails (handle errors here)
                                                        owa_coreAPI::debug( 'Could not fork new child' );
                                                } else {
                                                        // this part is executed in the parent
                                                        // We add pids to a global array, so that when we get a kill signal
                                                        // we tell the kids to flush and exit.
                                                        if ( array_key_exists( $k, $this->workerCountByJob ) ) {
                                                                $this->workerCountByJob[$k]++;
                                                        } else {
                                                                $this->workerCountByJob[$k] = 1;
                                                                $this->lastExecutionTimeByJob[$k] = time();
                                                                $this->jobsByPid[$pid] = $k;
                                                        }
                                                        
                                                        $this->pids[] = $pid;   
                                                }
                                        }                                                                       
                                }
                        }
                }
 
                // Collect any children which have exited on their own. pcntl_waitpid will
                // return the PID that exited or 0 or ERROR
                // WNOHANG means we won't sit here waiting if there's not a child ready
                // for us to reap immediately
                // -1 means any child
                $dead_and_gone = pcntl_waitpid( -1, $status, WNOHANG );
                
                while( $dead_and_gone > 0 ) {
                        // Remove the gone pid from the array
                        unset( $this->pids[array_search( $dead_and_gone, $this->pids )] );
                        $past_job = $this->jobsByPid[$dead_and_gone];
                        // decrement worker count
                        --$this->workerCountByJob[$past_job];
                        unset($this->jobsByPid[$dead_and_gone]);
                
                        // Look for another one
                        $dead_and_gone = pcntl_waitpid( -1, $status, WNOHANG);
                }
                
                owa_coreAPI::debug(sprintf(
                                "Daemon Statistics -- pidsByJob: %s, workerCountByJob: %s, lastExecutionTimeByJob: %s",
                                print_r( $this->pidsByJob, true),
                                print_r( $this->workerCountByJob, true),
                                print_r( $this->lastExecutiontimeByJob, true)
                ));
                
                // Sleep for some interval
                sleep($this->job_scheduling_interval);
        }
}
 
?>