Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
88.31% |
68 / 77 |
|
88.24% |
15 / 17 |
CRAP | |
0.00% |
0 / 1 |
Queue | |
88.31% |
68 / 77 |
|
88.24% |
15 / 17 |
41.43 | |
0.00% |
0 / 1 |
__construct | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
2 | |||
create | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
setPriority | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
getPriority | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
isFifo | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
isFilo | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
isLilo | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
isLifo | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
addJob | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
2 | |||
addJobs | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
2 | |||
addTask | |
83.33% |
5 / 6 |
|
0.00% |
0 / 1 |
3.04 | |||
addTasks | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
2 | |||
work | |
100.00% |
8 / 8 |
|
100.00% |
1 / 1 |
4 | |||
run | |
76.47% |
26 / 34 |
|
0.00% |
0 / 1 |
15.20 | |||
clear | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
clearFailed | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
clearTasks | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
2 |
1 | <?php |
2 | /** |
3 | * Pop PHP Framework (http://www.popphp.org/) |
4 | * |
5 | * @link https://github.com/popphp/popphp-framework |
6 | * @author Nick Sagona, III <dev@nolainteractive.com> |
7 | * @copyright Copyright (c) 2009-2024 NOLA Interactive, LLC. (http://www.nolainteractive.com) |
8 | * @license http://www.popphp.org/license New BSD License |
9 | */ |
10 | |
11 | /** |
12 | * @namespace |
13 | */ |
14 | namespace Pop\Queue; |
15 | |
16 | use Pop\Application; |
17 | use Pop\Queue\Adapter\AdapterInterface; |
18 | use Pop\Queue\Adapter\TaskAdapterInterface; |
19 | use Pop\Queue\Process\AbstractJob; |
20 | use Pop\Queue\Process\Task; |
21 | |
/**
 * Queue class
 *
 * Front-end for a queue adapter. Jobs are pushed to and popped from the adapter and,
 * when the adapter implements TaskAdapterInterface, scheduled tasks are stored on and
 * run from it as well. Priority handling is delegated entirely to the adapter.
 *
 * @category   Pop
 * @package    Pop\Queue
 * @author     Nick Sagona, III <dev@nolainteractive.com>
 * @copyright  Copyright (c) 2009-2024 NOLA Interactive, LLC. (http://www.nolainteractive.com)
 * @license    http://www.popphp.org/license     New BSD License
 * @version    2.0.0
 */
class Queue extends AbstractQueue
{

    /**
     * Queue priority constants
     */
    public const FIFO = 'FIFO'; // Same as LILO
    public const FILO = 'FILO'; // Same as LIFO

    /**
     * Constructor
     *
     * Instantiate the queue object. The priority is only forwarded to the adapter
     * when one is explicitly given; otherwise the adapter keeps its own setting.
     *
     * @param string                                $name
     * @param AdapterInterface|TaskAdapterInterface $adapter
     * @param ?string                               $priority
     */
    public function __construct(string $name, AdapterInterface|TaskAdapterInterface $adapter, ?string $priority = null)
    {
        $this->setName($name);
        $this->setAdapter($adapter);

        if ($priority !== null) {
            $this->setPriority($priority);
        }
    }

    /**
     * Create the queue object
     *
     * Static factory; takes the same arguments as the constructor.
     *
     * @param  string                                $name
     * @param  AdapterInterface|TaskAdapterInterface $adapter
     * @param  ?string                               $priority
     * @return Queue
     */
    public static function create(
        string $name, AdapterInterface|TaskAdapterInterface $adapter, ?string $priority = null
    ): Queue
    {
        return new self($name, $adapter, $priority);
    }

    /**
     * Set queue priority
     *
     * The value is passed straight through to the adapter.
     *
     * @param  string $priority
     * @return Queue
     */
    public function setPriority(string $priority = self::FIFO): Queue
    {
        $this->adapter->setPriority($priority);
        return $this;
    }

    /**
     * Get queue priority
     *
     * @return string The priority as reported by the adapter
     */
    public function getPriority(): string
    {
        return $this->adapter->getPriority();
    }

    /**
     * Is FIFO
     *
     * @return bool
     */
    public function isFifo(): bool
    {
        return $this->adapter->isFifo();
    }

    /**
     * Is FILO
     *
     * @return bool
     */
    public function isFilo(): bool
    {
        return $this->adapter->isFilo();
    }

    /**
     * Is LILO (alias to FIFO)
     *
     * @return bool
     */
    public function isLilo(): bool
    {
        return $this->adapter->isLilo();
    }

    /**
     * Is LIFO (alias to FILO)
     *
     * @return bool
     */
    public function isLifo(): bool
    {
        return $this->adapter->isLifo();
    }

    /**
     * Add job
     *
     * Pushes the job onto the adapter, first overriding the job's max attempts
     * when a value is provided.
     *
     * @param  AbstractJob $job
     * @param  ?int        $maxAttempts
     * @return Queue
     */
    public function addJob(AbstractJob $job, ?int $maxAttempts = null): Queue
    {
        if ($maxAttempts !== null) {
            $job->setMaxAttempts($maxAttempts);
        }

        $this->adapter->push($job);

        return $this;
    }

    /**
     * Add jobs
     *
     * Adds each element through addJob(), so every element must be an AbstractJob.
     *
     * @param  array $jobs
     * @param  ?int  $maxAttempts Applied to every job when not null
     * @return Queue
     */
    public function addJobs(array $jobs, ?int $maxAttempts = null): Queue
    {
        foreach ($jobs as $job) {
            $this->addJob($job, $maxAttempts);
        }

        return $this;
    }

    /**
     * Add task (alias)
     *
     * Schedules the task on the adapter, first overriding the task's max attempts
     * when a value is provided.
     *
     * @param  Task $task
     * @param  ?int $maxAttempts
     * @throws Exception When the adapter does not implement TaskAdapterInterface
     * @return Queue
     */
    public function addTask(Task $task, ?int $maxAttempts = null): Queue
    {
        if (!($this->adapter instanceof TaskAdapterInterface)) {
            throw new Exception('Error: That queue adapter does not support scheduled tasks');
        }

        if ($maxAttempts !== null) {
            $task->setMaxAttempts($maxAttempts);
        }

        $this->adapter->schedule($task);

        return $this;
    }

    /**
     * Add tasks
     *
     * Adds each element through addTask(). With an empty array nothing is checked,
     * so no exception is thrown even if the adapter does not support tasks.
     *
     * @param  array $tasks
     * @param  ?int  $maxAttempts Applied to every task when not null
     * @throws Exception When the adapter does not implement TaskAdapterInterface
     * @return Queue
     */
    public function addTasks(array $tasks, ?int $maxAttempts = null): Queue
    {
        foreach ($tasks as $task) {
            $this->addTask($task, $maxAttempts);
        }

        return $this;
    }

    /**
     * Work next job
     *
     * Pops one job from the adapter. A valid job is run and marked complete; if it
     * throws an \Exception it is marked failed with the exception message and pushed
     * back onto the adapter. A job that is not valid is returned without being run
     * or re-queued.
     *
     * Only \Exception is caught: an \Error raised by the job propagates to the caller
     * and the popped job is not pushed back.
     *
     * @param  ?Application $application
     * @return ?AbstractJob The popped job, or null if the adapter returned none
     */
    public function work(?Application $application = null): ?AbstractJob
    {
        $job = $this->adapter->pop();

        // Nothing to run: no job was popped, or the job reports itself as not valid
        if (!($job instanceof AbstractJob) || !$job->isValid()) {
            return $job;
        }

        try {
            $job->run($application);
            $job->complete();
        } catch (\Exception $e) {
            $job->failed($e->getMessage());
            // Put the failed job back on the queue
            $this->adapter->push($job);
        }

        return $job;
    }

    /**
     * Run schedule
     *
     * Walks every task ID stored on the adapter and runs the tasks that are valid and
     * whose cron schedule evaluates as due. Tasks are handled one after another; a task
     * whose schedule has a seconds component is polled once per second for 60 iterations
     * before the next task is looked at, so this call blocks for about a minute per
     * sub-minute task.
     *
     * Returns an empty array when the adapter does not support tasks or has none.
     *
     * @param  ?Application $application
     * @throws Process\Exception
     * @return array Tasks that were run (successfully or not), keyed by job ID
     */
    public function run(?Application $application = null): array
    {
        $tasks = [];

        if (!($this->adapter instanceof TaskAdapterInterface) || !$this->adapter->hasTasks()) {
            return $tasks;
        }

        foreach ($this->adapter->getTasks() as $taskId) {
            $task = $this->adapter->getTask($taskId);

            if (!($task instanceof Task)) {
                continue;
            }

            $ran = ($task->cron()->hasSeconds())
                ? $this->runSubMinuteTask($task, $taskId, $application)
                : $this->runScheduledTask($task, $application);

            foreach ($ran as $jobId => $ranTask) {
                $tasks[$jobId] = $ranTask;
            }
        }

        return $tasks;
    }

    /**
     * Clear jobs from queue
     *
     * @return Queue
     */
    public function clear(): Queue
    {
        $this->adapter->clear();
        return $this;
    }

    /**
     * Clear failed jobs from queue
     *
     * @return Queue
     */
    public function clearFailed(): Queue
    {
        $this->adapter->clearFailed();
        return $this;
    }

    /**
     * Clear tasks from queue
     *
     * Does nothing when the adapter does not implement TaskAdapterInterface.
     *
     * @return Queue
     */
    public function clearTasks(): Queue
    {
        if ($this->adapter instanceof TaskAdapterInterface) {
            $this->adapter->clearTasks();
        }

        return $this;
    }

    /**
     * Poll a sub-minute task once per second for 60 iterations, running it each time it is due
     *
     * A failed run is recorded on the task, and the task is removed from and
     * re-scheduled on the adapter. A successful run is not written back to the adapter.
     *
     * @param  Task         $task
     * @param  mixed        $taskId      ID the task is stored under on the adapter
     * @param  ?Application $application
     * @return array The task keyed by its job ID if it ran at least once, otherwise empty
     */
    private function runSubMinuteTask(Task $task, $taskId, ?Application $application): array
    {
        $ran           = [];
        $scheduleCheck = $task->cron()->evaluate();

        for ($second = 0; $second < 60; $second++) {
            if (($task->isValid()) && ($scheduleCheck)) {
                // NOTE(review): presumably restores state changed when a previous failed
                // iteration re-scheduled (serialized) the task - confirm against Task
                $task->__wakeup();

                try {
                    $task->run($application);
                    $task->complete();
                } catch (\Exception $e) {
                    $task->failed($e->getMessage());
                    $this->adapter->removeTask($taskId);
                    $this->adapter->schedule($task);
                }

                $ran[$task->getJobId()] = $task;
            }

            // Wait for the next second, then re-check the schedule
            sleep(1);
            $scheduleCheck = $task->cron()->evaluate();
        }

        return $ran;
    }

    /**
     * Run a minute-resolution task once if it is valid and currently due
     *
     * The task is written back to the adapter after the run, whether it completed or failed.
     *
     * @param  Task         $task
     * @param  ?Application $application
     * @return array The task keyed by its job ID if it ran, otherwise empty
     */
    private function runScheduledTask(Task $task, ?Application $application): array
    {
        $scheduleCheck = $task->cron()->evaluate();

        if (!($task->isValid()) || !($scheduleCheck)) {
            return [];
        }

        try {
            $task->run($application);
            $task->complete();
            $this->adapter->updateTask($task);
        } catch (\Exception $e) {
            $task->failed($e->getMessage());
            $this->adapter->updateTask($task);
        }

        return [$task->getJobId() => $task];
    }

}