Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
100.00% covered (success)
100.00%
92 / 92
100.00% covered (success)
100.00%
40 / 40
CRAP
100.00% covered (success)
100.00%
1 / 1
Queue
100.00% covered (success)
100.00%
92 / 92
100.00% covered (success)
100.00%
40 / 40
64
100.00% covered (success)
100.00%
1 / 1
 __construct
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
1
 load
100.00% covered (success)
100.00%
19 / 19
100.00% covered (success)
100.00%
1 / 1
8
 getName
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 adapter
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 application
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 hasApplication
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 addWorker
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 addWorkers
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
 getWorkers
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 hasWorkers
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 addScheduler
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 addSchedulers
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
 getSchedulers
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 hasSchedulers
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 pushSchedulers
100.00% covered (success)
100.00%
8 / 8
100.00% covered (success)
100.00%
1 / 1
5
 pushWorkers
100.00% covered (success)
100.00%
8 / 8
100.00% covered (success)
100.00%
1 / 1
5
 pushAll
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
1
 processSchedulers
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
3
 processWorkers
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
4
 processAll
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
1
 isQueued
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
3
 isCompleted
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 hasFailed
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 hasJob
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getJob
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 hasJobs
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getJobs
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 hasCompletedJob
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getCompletedJob
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 hasCompletedJobs
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getCompletedJobs
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 hasFailedJob
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getFailedJob
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 hasFailedJobs
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getFailedJobs
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 clear
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 clearFailed
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 flush
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 flushFailed
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 flushAll
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
1<?php
2/**
3 * Pop PHP Framework (http://www.popphp.org/)
4 *
5 * @link       https://github.com/popphp/popphp-framework
6 * @author     Nick Sagona, III <dev@nolainteractive.com>
7 * @copyright  Copyright (c) 2009-2023 NOLA Interactive, LLC. (http://www.nolainteractive.com)
8 * @license    http://www.popphp.org/license     New BSD License
9 */
10
11/**
12 * @namespace
13 */
14namespace Pop\Queue;
15
16use Pop\Queue\Adapter\AdapterInterface;
17use Pop\Queue\Processor\Jobs;
18use Pop\Application;
19
20/**
21 * Queue class
22 *
23 * @category   Pop
24 * @package    Pop\Queue
25 * @author     Nick Sagona, III <dev@nolainteractive.com>
26 * @copyright  Copyright (c) 2009-2023 NOLA Interactive, LLC. (http://www.nolainteractive.com)
27 * @license    http://www.popphp.org/license     New BSD License
28 * @version    1.2.0
29 */
30class Queue
31{
32
33    /**
34     * Queue name
35     * @var string
36     */
37    protected $name = null;
38
39    /**
40     * Queue adapter
41     * @var AdapterInterface
42     */
43    protected $adapter = null;
44
45    /**
46     * Application object
47     * @var Application
48     */
49    protected $application = null;
50
51    /**
52     * Queue workers
53     * @var Processor\Worker[]
54     */
55    protected $workers = [];
56
57    /**
58     * Queue schedulers
59     * @var Processor\Scheduler[]
60     */
61    protected $schedulers = [];
62
63    /**
64     * Constructor
65     *
66     * Instantiate the queue object
67     *
68     * @param  string                   $name
69     * @param  Adapter\AdapterInterface $adapter
70     * @param  Application              $application
71     */
72    public function __construct($name, Adapter\AdapterInterface $adapter, Application $application = null)
73    {
74        $this->name        = $name;
75        $this->adapter     = $adapter;
76        $this->application = $application;
77    }
78
79    /**
80     * Load queue from adapter
81     *
82     * @param  string                   $name
83     * @param  Adapter\AdapterInterface $adapter
84     * @param  Application              $application
85     * @return Queue
86     */
87    public static function load($name, Adapter\AdapterInterface $adapter, Application $application = null)
88    {
89        $queue = new static($name, $adapter, $application);
90
91        if ($adapter->hasJobs($name)) {
92            $jobs       = $adapter->getJobs($name);
93            $fifoWorker = new Processor\Worker();
94            $filoWorker = new Processor\Worker(Processor\Worker::FILO);
95            $scheduler  = new Processor\Scheduler();
96
97            foreach ($jobs as $job) {
98                if ($job['payload'] instanceof Jobs\Schedule) {
99                    $scheduler->addSchedule($job['payload']);
100                } else if ($job['priority'] == Processor\Worker::FILO) {
101                    $filoWorker->addJob($job['payload']);
102                } else {
103                    $fifoWorker->addJob($job['payload']);
104                }
105            }
106
107            if ($scheduler->hasSchedules()) {
108                $queue->addScheduler($scheduler);
109            }
110            if ($fifoWorker->hasJobs()) {
111                $queue->addWorker($fifoWorker);
112            }
113            if ($filoWorker->hasJobs()) {
114                $queue->addWorker($filoWorker);
115            }
116        }
117
118        return $queue;
119    }
120
121    /**
122     * Get the queue name
123     *
124     * @return string
125     */
126    public function getName()
127    {
128        return $this->name;
129    }
130
131    /**
132     * Get the adapter
133     *
134     * @return AdapterInterface
135     */
136    public function adapter()
137    {
138        return $this->adapter;
139    }
140
141    /**
142     * Get the application
143     *
144     * @return Application
145     */
146    public function application()
147    {
148        return $this->application;
149    }
150
151    /**
152     * Has application
153     *
154     * @return boolean
155     */
156    public function hasApplication()
157    {
158        return (null !== $this->application);
159    }
160
161    /**
162     * Add a worker
163     *
164     * @param  Processor\Worker $worker
165     * @return Queue
166     */
167    public function addWorker(Processor\Worker $worker)
168    {
169        $this->workers[] = $worker;
170        return $this;
171    }
172
173    /**
174     * Add workers
175     *
176     * @param  array $workers
177     * @return Queue
178     */
179    public function addWorkers(array $workers)
180    {
181        foreach ($workers as $worker) {
182            $this->addWorker($worker);
183        }
184
185        return $this;
186    }
187
188    /**
189     * Get workers
190     *
191     * @return array
192     */
193    public function getWorkers()
194    {
195        return $this->workers;
196    }
197
198    /**
199     * Has workers
200     *
201     * @return boolean
202     */
203    public function hasWorkers()
204    {
205        return !empty($this->workers);
206    }
207
208    /**
209     * Add a scheduler
210     *
211     * @param  Processor\Scheduler $scheduler
212     * @return Queue
213     */
214    public function addScheduler(Processor\Scheduler $scheduler)
215    {
216        $this->schedulers[] = $scheduler;
217        return $this;
218    }
219
220    /**
221     * Add schedulers
222     *
223     * @param  array $schedulers
224     * @return Queue
225     */
226    public function addSchedulers(array $schedulers)
227    {
228        foreach ($schedulers as $scheduler) {
229            $this->addScheduler($scheduler);
230        }
231
232        return $this;
233    }
234
235    /**
236     * Get schedulers
237     *
238     * @return array
239     */
240    public function getSchedulers()
241    {
242        return $this->schedulers;
243    }
244
245    /**
246     * Has schedulers
247     *
248     * @return boolean
249     */
250    public function hasSchedulers()
251    {
252        return !empty($this->schedulers);
253    }
254
255    /**
256     * Push scheduled jobs to queue adapter
257     *
258     * @return array
259     */
260    public function pushSchedulers()
261    {
262        $pushed = [];
263
264        foreach ($this->schedulers as $scheduler) {
265            if ($scheduler->hasSchedules()) {
266                foreach ($scheduler->getSchedules() as $schedule) {
267                    $jobId = $this->adapter->push($this, $schedule);
268                    if (!empty($jobId)) {
269                        $pushed[$jobId] = $schedule->getJob()->getJobDescription();
270                    }
271                }
272            }
273        }
274
275        return $pushed;
276    }
277
278    /**
279     * Push worker jobs to queue adapter
280     *
281     * @return array
282     */
283    public function pushWorkers()
284    {
285        $pushed = [];
286
287        foreach ($this->workers as $worker) {
288            if ($worker->hasJobs()) {
289                foreach ($worker->getJobs() as $job) {
290                    $jobId = $this->adapter->push($this, $job, $worker->getPriority());
291                    if (!empty($jobId)) {
292                        $pushed[$jobId] = $job->getJobDescription();
293                    }
294                }
295            }
296        }
297
298        return $pushed;
299    }
300
301    /**
302     * Push all jobs to queue adapter
303     *
304     * @return array
305     */
306    public function pushAll()
307    {
308        $pushedScheduled = $this->pushSchedulers();
309        $pushedProcessed = $this->pushWorkers();
310
311        return $pushedScheduled + $pushedProcessed;
312    }
313
314    /**
315     * Process schedulers in the queue
316     *
317     * @return Queue
318     */
319    public function processSchedulers()
320    {
321        if ($this->hasSchedulers()) {
322            foreach ($this->schedulers as $scheduler) {
323                $scheduler->processNext($this);
324            }
325        }
326
327        return $this;
328    }
329
330    /**
331     * Process schedulers in the queue
332     *
333     * @return Queue
334     */
335    public function processWorkers()
336    {
337        if ($this->hasWorkers()) {
338            foreach ($this->workers as $worker) {
339                while ($worker->hasNextJob()) {
340                    $worker->processNext($this);
341                }
342            }
343        }
344
345        return $this;
346    }
347
348    /**
349     * Process all schedulers and workers in the queue
350     *
351     * @return Queue
352     */
353    public function processAll()
354    {
355        $this->processSchedulers();
356        $this->processWorkers();
357
358        return $this;
359    }
360
361    /**
362     * Check if job is queued, but hasn't run yet
363     *
364     * @param  mixed  $jobId
365     * @return boolean
366     */
367    public function isQueued($jobId)
368    {
369        return (($this->adapter->hasJob($jobId)) && (!$this->adapter->hasCompletedJob($jobId)) &&
370            (!$this->adapter->hasFailedJob($jobId)));
371    }
372
373    /**
374     * Check if job is completed (alias)
375     *
376     * @param  mixed  $jobId
377     * @return boolean
378     */
379    public function isCompleted($jobId)
380    {
381        return $this->adapter->hasCompletedJob($jobId);
382    }
383
384    /**
385     * Check if job has failed (alias)
386     *
387     * @param  mixed  $jobId
388     * @return boolean
389     */
390    public function hasFailed($jobId)
391    {
392        return $this->adapter->hasFailedJob($jobId);
393    }
394
395    /**
396     * Check if queue has job
397     *
398     * @param  mixed $jobId
399     * @return boolean
400     */
401    public function hasJob($jobId)
402    {
403        return $this->adapter->hasJob($jobId);
404    }
405
406    /**
407     * Get job
408     *
409     * @param  mixed  $jobId
410     * @param  boolean $unserialize
411     * @return array
412     */
413    public function getJob($jobId, $unserialize = true)
414    {
415        return $this->adapter->getJob($jobId, $unserialize);
416    }
417
418    /**
419     * Check if queue has jobs
420     *
421     * @return boolean
422     */
423    public function hasJobs()
424    {
425        return $this->adapter->hasJobs($this->name);
426    }
427
428    /**
429     * Get queue jobs
430     *
431     * @return array
432     */
433    public function getJobs()
434    {
435        return $this->adapter->getJobs($this->name);
436    }
437
438    /**
439     * Check if queue has completed job
440     *
441     * @param  mixed $jobId
442     * @return boolean
443     */
444    public function hasCompletedJob($jobId)
445    {
446        return $this->adapter->hasCompletedJob($jobId);
447    }
448
449    /**
450     * Get completed job
451     *
452     * @param  mixed  $jobId
453     * @param  boolean $unserialize
454     * @return array
455     */
456    public function getCompletedJob($jobId, $unserialize = true)
457    {
458        return $this->adapter->getCompletedJob($jobId, $unserialize);
459    }
460
461    /**
462     * Check if queue has completed jobs
463     *
464     * @return boolean
465     */
466    public function hasCompletedJobs()
467    {
468        return $this->adapter->hasCompletedJobs($this->name);
469    }
470
471    /**
472     * Get queue completed jobs
473     *
474     * @return array
475     */
476    public function getCompletedJobs()
477    {
478        return $this->adapter->getCompletedJobs($this->name);
479    }
480
481    /**
482     * Check if queue has failed job
483     *
484     * @param  mixed $jobId
485     * @return boolean
486     */
487    public function hasFailedJob($jobId)
488    {
489        return $this->adapter->hasFailedJob($jobId);
490    }
491
492    /**
493     * Get failed job
494     *
495     * @param  mixed  $jobId
496     * @param  boolean $unserialize
497     * @return array
498     */
499    public function getFailedJob($jobId, $unserialize = true)
500    {
501        return $this->adapter->getFailedJob($jobId, $unserialize);
502    }
503
504    /**
505     * Check if queue adapter has failed jobs
506     *
507     * @return boolean
508     */
509    public function hasFailedJobs()
510    {
511        return $this->adapter->hasFailedJobs($this->name);
512    }
513
514    /**
515     * Get queue failed jobs
516     *
517     * @return array
518     */
519    public function getFailedJobs()
520    {
521        return $this->adapter->getFailedJobs($this->name);
522    }
523
524    /**
525     * Clear jobs off of the queue stack
526     *
527     * @param  boolean $all
528     * @return void
529     */
530    public function clear($all = false)
531    {
532        $this->adapter->clear($this->name, $all);
533    }
534
535    /**
536     * Clear failed jobs off of the queue stack
537     *
538     * @return void
539     */
540    public function clearFailed()
541    {
542        $this->adapter->clearFailed($this->name);
543    }
544
545    /**
546     * Flush all jobs off of the queue stack
547     *
548     * @param  boolean $all
549     * @return void
550     */
551    public function flush($all = false)
552    {
553        $this->adapter->flush($all);
554    }
555
556    /**
557     * Flush all failed jobs off of the queue stack
558     *
559     * @return void
560     */
561    public function flushFailed()
562    {
563        $this->adapter->flushFailed();
564    }
565
566    /**
567     * Flush all pop queue items
568     *
569     * @return void
570     */
571    public function flushAll()
572    {
573        $this->adapter->flushAll();
574    }
575
576}