Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
88.31% covered (success)
88.31%
68 / 77
88.24% covered (success)
88.24%
15 / 17
CRAP
0.00% covered (danger)
0.00%
0 / 1
Queue
88.31% covered (success)
88.31%
68 / 77
88.24% covered (success)
88.24%
15 / 17
41.43
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
2
 create
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 setPriority
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 getPriority
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 isFifo
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 isFilo
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 isLilo
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 isLifo
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 addJob
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
2
 addJobs
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
 addTask
83.33% covered (success)
83.33%
5 / 6
0.00% covered (danger)
0.00%
0 / 1
3.04
 addTasks
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
 work
100.00% covered (success)
100.00%
8 / 8
100.00% covered (success)
100.00%
1 / 1
4
 run
76.47% covered (success)
76.47%
26 / 34
0.00% covered (danger)
0.00%
0 / 1
15.20
 clear
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 clearFailed
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 clearTasks
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
1<?php
2/**
3 * Pop PHP Framework (http://www.popphp.org/)
4 *
5 * @link       https://github.com/popphp/popphp-framework
6 * @author     Nick Sagona, III <dev@nolainteractive.com>
7 * @copyright  Copyright (c) 2009-2024 NOLA Interactive, LLC. (http://www.nolainteractive.com)
8 * @license    http://www.popphp.org/license     New BSD License
9 */
10
11/**
12 * @namespace
13 */
14namespace Pop\Queue;
15
16use Pop\Application;
17use Pop\Queue\Adapter\AdapterInterface;
18use Pop\Queue\Adapter\TaskAdapterInterface;
19use Pop\Queue\Process\AbstractJob;
20use Pop\Queue\Process\Task;
21
22/**
23 * Queue class
24 *
25 * @category   Pop
26 * @package    Pop\Queue
27 * @author     Nick Sagona, III <dev@nolainteractive.com>
28 * @copyright  Copyright (c) 2009-2024 NOLA Interactive, LLC. (http://www.nolainteractive.com)
29 * @license    http://www.popphp.org/license     New BSD License
30 * @version    2.0.0
31 */
class Queue extends AbstractQueue
{

    /**
     * Queue priority constants
     */
    const FIFO = 'FIFO'; // First in, first out (same as LILO)
    const FILO = 'FILO'; // First in, last out (same as LIFO)

    /**
     * Constructor
     *
     * Build the queue: the name and adapter are always stored through their
     * setters; the priority is only forwarded when a non-null value is given.
     *
     * @param string $name
     * @param AdapterInterface|TaskAdapterInterface $adapter
     * @param ?string $priority
     */
    public function __construct(string $name, AdapterInterface|TaskAdapterInterface $adapter, ?string $priority = null)
    {
        $this->setName($name);
        $this->setAdapter($adapter);

        // Leave the adapter's own priority untouched unless one was requested
        if ($priority === null) {
            return;
        }

        $this->setPriority($priority);
    }

    /**
     * Static factory: shorthand for calling the constructor
     *
     * @param  string $name
     * @param  AdapterInterface|TaskAdapterInterface $adapter
     * @param  ?string $priority
     * @return Queue
     */
    public static function create(
        string $name, AdapterInterface|TaskAdapterInterface $adapter, ?string $priority = null
    ): Queue
    {
        $queue = new self($name, $adapter, $priority);
        return $queue;
    }

    /**
     * Set queue priority (forwarded to the adapter)
     *
     * @param  string $priority
     * @return Queue
     */
    public function setPriority(string $priority = 'FIFO'): Queue
    {
        $adapter = $this->adapter;
        $adapter->setPriority($priority);

        return $this;
    }

    /**
     * Get queue priority (read from the adapter)
     *
     * @return string
     */
    public function getPriority(): string
    {
        $priority = $this->adapter->getPriority();
        return $priority;
    }

    /**
     * Is FIFO (answered by the adapter)
     *
     * @return bool
     */
    public function isFifo(): bool
    {
        $result = $this->adapter->isFifo();
        return $result;
    }

    /**
     * Is FILO (answered by the adapter)
     *
     * @return bool
     */
    public function isFilo(): bool
    {
        $result = $this->adapter->isFilo();
        return $result;
    }

    /**
     * Is LILO (alias to FIFO; answered by the adapter)
     *
     * @return bool
     */
    public function isLilo(): bool
    {
        $result = $this->adapter->isLilo();
        return $result;
    }

    /**
     * Is LIFO (alias to FILO; answered by the adapter)
     *
     * @return bool
     */
    public function isLifo(): bool
    {
        $result = $this->adapter->isLifo();
        return $result;
    }

    /**
     * Add job
     *
     * Optionally overrides the job's max attempts, then pushes the job
     * onto the adapter.
     *
     * @param  AbstractJob $job
     * @param  ?int        $maxAttempts
     * @return Queue
     */
    public function addJob(AbstractJob $job, ?int $maxAttempts = null): Queue
    {
        // A null value means "keep whatever the job already has"
        if (null !== $maxAttempts) {
            $job->setMaxAttempts($maxAttempts);
        }

        $this->adapter->push($job);
        return $this;
    }

    /**
     * Add jobs
     *
     * Passes each element of the array to addJob() with the same max attempts.
     *
     * @param  array $jobs
     * @param  ?int  $maxAttempts
     * @return Queue
     */
    public function addJobs(array $jobs, ?int $maxAttempts = null): Queue
    {
        foreach ($jobs as $queuedJob) {
            $this->addJob($queuedJob, $maxAttempts);
        }

        return $this;
    }

    /**
     * Add task (alias)
     *
     * Schedules the task on the adapter. Only adapters implementing
     * TaskAdapterInterface are accepted; any other adapter raises an exception
     * before the task is touched.
     *
     * @param  Task $task
     * @param  ?int $maxAttempts
     * @throws Exception
     * @return Queue
     */
    public function addTask(Task $task, ?int $maxAttempts = null): Queue
    {
        $supportsTasks = ($this->adapter instanceof TaskAdapterInterface);
        if (!$supportsTasks) {
            throw new Exception('Error: That queue adapter does not support scheduled tasks');
        }

        // A null value means "keep whatever the task already has"
        if (null !== $maxAttempts) {
            $task->setMaxAttempts($maxAttempts);
        }

        $this->adapter->schedule($task);
        return $this;
    }

    /**
     * Add tasks
     *
     * Passes each element of the array to addTask() with the same max attempts.
     *
     * @param  array $tasks
     * @param  ?int  $maxAttempts
     * @throws Exception
     * @return Queue
     */
    public function addTasks(array $tasks, ?int $maxAttempts = null): Queue
    {
        foreach ($tasks as $scheduledTask) {
            $this->addTask($scheduledTask, $maxAttempts);
        }

        return $this;
    }

    /**
     * Work next job
     *
     * Pops one item from the adapter. If it is a valid AbstractJob, it is run
     * and marked complete; when an exception is thrown along the way, the job
     * is marked failed with the exception message and pushed back onto the
     * adapter. Whatever was popped is returned either way.
     *
     * @param  ?Application $application
     * @return ?AbstractJob
     */
    public function work(?Application $application = null): ?AbstractJob
    {
        $job = $this->adapter->pop();

        // Nothing runnable was popped: hand it back untouched
        if (!($job instanceof AbstractJob) || !$job->isValid()) {
            return $job;
        }

        try {
            $job->run($application);
            $job->complete();
        } catch (\Exception $exception) {
            $job->failed($exception->getMessage());
            // Re-queue the failed job on the adapter
            $this->adapter->push($job);
        }

        return $job;
    }

    /**
     * Run schedule
     *
     * Walks every task ID reported by the adapter and runs each task whose
     * cron expression currently evaluates as due. Tasks whose cron expression
     * has a seconds field are polled once per second for 60 iterations; all
     * other tasks are checked once. The returned array holds every task that
     * was attempted (completed or failed), keyed by its job ID.
     *
     * @param  ?Application $application
     * @throws Process\Exception
     * @return array
     */
    public function run(?Application $application = null): array
    {
        $processed = [];

        // Only task-capable adapters that actually hold tasks have anything to do
        if (!($this->adapter instanceof TaskAdapterInterface) || !$this->adapter->hasTasks()) {
            return $processed;
        }

        foreach ($this->adapter->getTasks() as $taskId) {
            $task = $this->adapter->getTask($taskId);
            if (!($task instanceof Task)) {
                continue;
            }

            $hasSeconds = $task->cron()->hasSeconds();
            $isDue      = $task->cron()->evaluate();

            if ($hasSeconds) {
                // Sub-minute schedule: re-evaluate the cron expression every second
                for ($tick = 0; $tick < 60; $tick++) {
                    if ($task->isValid() && $isDue) {
                        $task->__wakeup();
                        try {
                            $task->run($application);
                            $task->complete();
                            $processed[$task->getJobId()] = $task;
                        } catch (\Exception $exception) {
                            $task->failed($exception->getMessage());
                            // Replace the stored task with the failed one
                            $this->adapter->removeTask($taskId);
                            $this->adapter->schedule($task);
                            $processed[$task->getJobId()] = $task;
                        }
                    }
                    sleep(1);
                    $isDue = $task->cron()->evaluate();
                }
                continue;
            }

            // Regular schedule: a single check per invocation
            if ($task->isValid() && $isDue) {
                try {
                    $task->run($application);
                    $task->complete();
                    $this->adapter->updateTask($task);
                    $processed[$task->getJobId()] = $task;
                } catch (\Exception $exception) {
                    $task->failed($exception->getMessage());
                    $this->adapter->updateTask($task);
                    $processed[$task->getJobId()] = $task;
                }
            }
        }

        return $processed;
    }

    /**
     * Clear jobs from queue (delegates to the adapter)
     *
     * @return Queue
     */
    public function clear(): Queue
    {
        $adapter = $this->adapter;
        $adapter->clear();

        return $this;
    }

    /**
     * Clear failed jobs from queue (delegates to the adapter)
     *
     * @return Queue
     */
    public function clearFailed(): Queue
    {
        $adapter = $this->adapter;
        $adapter->clearFailed();

        return $this;
    }

    /**
     * Clear tasks from queue
     *
     * Does nothing when the adapter does not implement TaskAdapterInterface.
     *
     * @return Queue
     */
    public function clearTasks(): Queue
    {
        if (!($this->adapter instanceof TaskAdapterInterface)) {
            return $this;
        }

        $this->adapter->clearTasks();
        return $this;
    }

}