Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
100.00% |
227 / 227 |
|
100.00% |
27 / 27 |
CRAP | |
100.00% |
1 / 1 |
Db | |
100.00% |
227 / 227 |
|
100.00% |
27 / 27 |
67 | |
100.00% |
1 / 1 |
__construct | |
100.00% |
7 / 7 |
|
100.00% |
1 / 1 |
3 | |||
hasJob | |
100.00% |
6 / 6 |
|
100.00% |
1 / 1 |
1 | |||
getJob | |
100.00% |
12 / 12 |
|
100.00% |
1 / 1 |
4 | |||
updateJob | |
100.00% |
18 / 18 |
|
100.00% |
1 / 1 |
6 | |||
hasJobs | |
100.00% |
9 / 9 |
|
100.00% |
1 / 1 |
2 | |||
getJobs | |
100.00% |
13 / 13 |
|
100.00% |
1 / 1 |
4 | |||
hasCompletedJob | |
100.00% |
8 / 8 |
|
100.00% |
1 / 1 |
1 | |||
hasCompletedJobs | |
100.00% |
9 / 9 |
|
100.00% |
1 / 1 |
2 | |||
getCompletedJob | |
100.00% |
14 / 14 |
|
100.00% |
1 / 1 |
4 | |||
getCompletedJobs | |
100.00% |
13 / 13 |
|
100.00% |
1 / 1 |
4 | |||
hasFailedJob | |
100.00% |
6 / 6 |
|
100.00% |
1 / 1 |
1 | |||
getFailedJob | |
100.00% |
12 / 12 |
|
100.00% |
1 / 1 |
3 | |||
hasFailedJobs | |
100.00% |
7 / 7 |
|
100.00% |
1 / 1 |
2 | |||
getFailedJobs | |
100.00% |
11 / 11 |
|
100.00% |
1 / 1 |
4 | |||
push | |
100.00% |
13 / 13 |
|
100.00% |
1 / 1 |
6 | |||
failed | |
100.00% |
12 / 12 |
|
100.00% |
1 / 1 |
5 | |||
pop | |
100.00% |
5 / 5 |
|
100.00% |
1 / 1 |
1 | |||
clear | |
100.00% |
9 / 9 |
|
100.00% |
1 / 1 |
3 | |||
clearFailed | |
100.00% |
7 / 7 |
|
100.00% |
1 / 1 |
2 | |||
flush | |
100.00% |
5 / 5 |
|
100.00% |
1 / 1 |
2 | |||
flushFailed | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
1 | |||
flushAll | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
db | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getTable | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getFailedTable | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
createTable | |
100.00% |
12 / 12 |
|
100.00% |
1 / 1 |
1 | |||
createFailedTable | |
100.00% |
11 / 11 |
|
100.00% |
1 / 1 |
1 |
1 | <?php |
2 | /** |
3 | * Pop PHP Framework (http://www.popphp.org/) |
4 | * |
5 | * @link https://github.com/popphp/popphp-framework |
6 | * @author Nick Sagona, III <dev@nolainteractive.com> |
7 | * @copyright Copyright (c) 2009-2023 NOLA Interactive, LLC. (http://www.nolainteractive.com) |
8 | * @license http://www.popphp.org/license New BSD License |
9 | */ |
10 | |
11 | /** |
12 | * @namespace |
13 | */ |
14 | namespace Pop\Queue\Adapter; |
15 | |
16 | use Pop\Db\Adapter\AbstractAdapter as DbAdapter; |
17 | use Pop\Queue\Queue; |
18 | use Pop\Queue\Processor\Jobs; |
19 | |
/**
 * Database queue adapter class
 *
 * Stores queue jobs in one table and failed jobs in a second table, using the
 * injected database adapter for all queries. Job payloads are persisted as
 * base64-encoded serialized values.
 *
 * @category   Pop
 * @package    Pop\Queue
 * @author     Nick Sagona, III <dev@nolainteractive.com>
 * @copyright  Copyright (c) 2009-2023 NOLA Interactive, LLC. (http://www.nolainteractive.com)
 * @license    http://www.popphp.org/license     New BSD License
 * @version    1.2.0
 */
class Db extends AbstractAdapter
{

    /**
     * Database adapter
     * @var DbAdapter
     */
    protected $db = null;

    /**
     * Job table
     * @var string
     */
    protected $table = null;

    /**
     * Failed job table
     * @var string
     */
    protected $failedTable = null;

    /**
     * Constructor
     *
     * Instantiate the database queue object. Either table is created on the
     * fly when the database adapter reports that it does not exist yet.
     *
     * @param  DbAdapter $db
     * @param  string    $table
     * @param  string    $failedTable
     */
    public function __construct(DbAdapter $db, $table = 'pop_queue_jobs', $failedTable = 'pop_queue_failed_jobs')
    {
        $this->db          = $db;
        $this->table       = $table;
        $this->failedTable = $failedTable;

        if (!$this->db->hasTable($table)) {
            $this->createTable($table);
        }
        if (!$this->db->hasTable($failedTable)) {
            $this->createFailedTable($failedTable);
        }
    }

    /**
     * Check if queue stack has job
     *
     * @param  mixed $jobId
     * @return boolean
     */
    public function hasJob($jobId)
    {
        return $this->rowExists($this->table, ['job_id = :job_id'], ['job_id' => $jobId]);
    }

    /**
     * Get job from queue stack by job ID
     *
     * @param  mixed   $jobId
     * @param  boolean $unserialize  Decode the stored payload before returning
     * @return array|null  The job row, or null when no row matches
     */
    public function getJob($jobId, $unserialize = true)
    {
        return $this->fetchOne($this->table, ['job_id = :job_id'], ['job_id' => $jobId], $unserialize);
    }

    /**
     * Update job from queue stack by job ID
     *
     * @param  mixed $jobId
     * @param  mixed $completed  false to leave untouched, true to stamp the current
     *                           date/time, or an explicit value to store
     * @param  mixed $increment  false to leave untouched, true to add one to the stored
     *                           attempts, or an explicit attempts count
     * @return void
     */
    public function updateJob($jobId, $completed = false, $increment = false)
    {
        $values = [];
        $params = [];

        if ($completed !== false) {
            $values['completed'] = ':completed';
            $params['completed'] = ($completed === true) ? date('Y-m-d H:i:s') : $completed;
        }

        if ($increment !== false) {
            // (int)true is 1, which is the fallback when no stored count is found
            $attempts = (int)$increment;

            if ($increment === true) {
                // Only the attempts column is needed, so skip decoding the payload
                $jobRecord = $this->getJob($jobId, false);
                if (isset($jobRecord['attempts'])) {
                    $attempts = (int)$jobRecord['attempts'] + 1;
                }
            }

            $values['attempts'] = ':attempts';
            $params['attempts'] = $attempts;
        }

        // Nothing to change: an UPDATE with an empty SET list is not valid SQL
        if (empty($values)) {
            return;
        }

        // Keep job_id last so the parameter order follows the placeholders in the statement
        $params['job_id'] = $jobId;

        $sql = $this->db->createSql();
        $sql->update($this->table)->values($values)->where('job_id = :job_id');

        $this->runStatement($sql, $params);
    }

    /**
     * Check if queue has jobs that are not completed
     *
     * @param  mixed $queue  Queue object or queue name
     * @return boolean
     */
    public function hasJobs($queue)
    {
        return $this->rowExists(
            $this->table,
            ['queue = :queue', 'completed IS NULL'],
            ['queue' => $this->queueNameOf($queue)]
        );
    }

    /**
     * Get queue jobs that are not completed
     *
     * @param  mixed   $queue        Queue object or queue name
     * @param  boolean $unserialize  Decode the stored payloads before returning
     * @return array
     */
    public function getJobs($queue, $unserialize = true)
    {
        return $this->fetchMany(
            $this->table,
            ['queue = :queue', 'completed IS NULL'],
            ['queue' => $this->queueNameOf($queue)],
            $unserialize
        );
    }

    /**
     * Check if queue stack has completed job
     *
     * @param  mixed $jobId
     * @return boolean
     */
    public function hasCompletedJob($jobId)
    {
        return $this->rowExists(
            $this->table,
            ['job_id = :job_id', 'completed IS NOT NULL'],
            ['job_id' => $jobId]
        );
    }

    /**
     * Check if queue has completed jobs
     *
     * @param  mixed $queue  Queue object or queue name
     * @return boolean
     */
    public function hasCompletedJobs($queue)
    {
        return $this->rowExists(
            $this->table,
            ['queue = :queue', 'completed IS NOT NULL'],
            ['queue' => $this->queueNameOf($queue)]
        );
    }

    /**
     * Get queue completed job
     *
     * @param  mixed   $jobId
     * @param  boolean $unserialize  Decode the stored payload before returning
     * @return array|null  The job row, or null when no row matches
     */
    public function getCompletedJob($jobId, $unserialize = true)
    {
        return $this->fetchOne(
            $this->table,
            ['job_id = :job_id', 'completed IS NOT NULL'],
            ['job_id' => $jobId],
            $unserialize
        );
    }

    /**
     * Get queue completed jobs
     *
     * @param  mixed   $queue        Queue object or queue name
     * @param  boolean $unserialize  Decode the stored payloads before returning
     * @return array
     */
    public function getCompletedJobs($queue, $unserialize = true)
    {
        return $this->fetchMany(
            $this->table,
            ['queue = :queue', 'completed IS NOT NULL'],
            ['queue' => $this->queueNameOf($queue)],
            $unserialize
        );
    }

    /**
     * Check if queue stack has failed job
     *
     * @param  mixed $jobId
     * @return boolean
     */
    public function hasFailedJob($jobId)
    {
        return $this->rowExists($this->failedTable, ['job_id = :job_id'], ['job_id' => $jobId]);
    }

    /**
     * Get failed job from queue stack by job ID
     *
     * @param  mixed   $jobId
     * @param  boolean $unserialize  Decode the stored payload before returning
     * @return array|null  The failed job row, or null when no row matches
     */
    public function getFailedJob($jobId, $unserialize = true)
    {
        return $this->fetchOne($this->failedTable, ['job_id = :job_id'], ['job_id' => $jobId], $unserialize);
    }

    /**
     * Check if queue adapter has failed jobs
     *
     * @param  mixed $queue  Queue object or queue name
     * @return boolean
     */
    public function hasFailedJobs($queue)
    {
        return $this->rowExists(
            $this->failedTable,
            ['queue = :queue'],
            ['queue' => $this->queueNameOf($queue)]
        );
    }

    /**
     * Get queue failed jobs
     *
     * @param  mixed   $queue        Queue object or queue name
     * @param  boolean $unserialize  Decode the stored payloads before returning
     * @return array
     */
    public function getFailedJobs($queue, $unserialize = true)
    {
        return $this->fetchMany(
            $this->failedTable,
            ['queue = :queue'],
            ['queue' => $this->queueNameOf($queue)],
            $unserialize
        );
    }

    /**
     * Push job onto queue stack
     *
     * The job ID is taken from (or generated by) the job when it is a
     * Jobs\Job, or a Jobs\Schedule wrapping one; for anything else no ID is
     * available and the row is stored with a null job ID.
     *
     * @param  mixed $queue     Queue object or queue name
     * @param  mixed $job
     * @param  mixed $priority
     * @return string|null  The job ID stored with the row
     */
    public function push($queue, $job, $priority = null)
    {
        $jobId = null;

        if ($job instanceof Jobs\Schedule) {
            $scheduled = $job->getJob();
            $jobId     = ($scheduled->hasJobId()) ? $scheduled->getJobId() : $scheduled->generateJobId();
        } elseif ($job instanceof Jobs\Job) {
            $jobId = ($job->hasJobId()) ? $job->getJobId() : $job->generateJobId();
        }

        $sql = $this->db->createSql();
        $sql->insert($this->table)->values([
            'job_id'    => ':job_id',
            'queue'     => ':queue',
            'payload'   => ':payload',
            'priority'  => ':priority',
            'attempts'  => ':attempts',
            'completed' => ':completed'
        ]);

        // The ID is resolved above so that a freshly generated ID is part of the serialized copy
        $this->runStatement($sql, [
            'job_id'    => $jobId,
            'queue'     => $this->queueNameOf($queue),
            'payload'   => base64_encode(serialize(clone $job)),
            'priority'  => $priority,
            'attempts'  => 0,
            'completed' => null
        ]);

        return $jobId;
    }

    /**
     * Move failed job to failed queue stack
     *
     * Copies the still-encoded payload of the job (null when the job row is
     * not found) into the failed table together with the exception message,
     * then removes the job from the job table.
     *
     * @param  mixed      $queue      Queue object or queue name
     * @param  mixed      $jobId
     * @param  \Exception $exception
     * @return void
     */
    public function failed($queue, $jobId, \Exception $exception = null)
    {
        // Read the raw row: the payload is copied across without being decoded
        $jobRecord = $this->getJob($jobId, false);

        $sql = $this->db->createSql();
        $sql->insert($this->failedTable)->values([
            'job_id'    => ':job_id',
            'queue'     => ':queue',
            'payload'   => ':payload',
            'exception' => ':exception',
            'failed'    => ':failed'
        ]);

        $this->runStatement($sql, [
            'job_id'    => $jobId,
            'queue'     => $this->queueNameOf($queue),
            'payload'   => (isset($jobRecord['payload'])) ? $jobRecord['payload'] : null,
            'exception' => (null !== $exception) ? $exception->getMessage() : null,
            'failed'    => date('Y-m-d H:i:s')
        ]);

        if (!empty($jobId)) {
            $this->pop($jobId);
        }
    }

    /**
     * Pop job off of queue stack
     *
     * @param  mixed $jobId
     * @return void
     */
    public function pop($jobId)
    {
        $sql = $this->db->createSql();
        $sql->delete($this->table)->where('job_id = :job_id');

        $this->runStatement($sql, ['job_id' => $jobId]);
    }

    /**
     * Clear jobs off of the queue stack
     *
     * @param  mixed   $queue  Queue object or queue name
     * @param  boolean $all    true removes every job of the queue, false only the completed ones
     * @return void
     */
    public function clear($queue, $all = false)
    {
        $sql    = $this->db->createSql();
        $delete = $sql->delete($this->table)->where('queue = :queue');

        if (!$all) {
            $delete->where('completed IS NOT NULL');
        }

        $this->runStatement($sql, ['queue' => $this->queueNameOf($queue)]);
    }

    /**
     * Clear failed jobs off of the queue stack
     *
     * @param  mixed $queue  Queue object or queue name
     * @return void
     */
    public function clearFailed($queue)
    {
        $sql = $this->db->createSql();
        $sql->delete($this->failedTable)->where('queue = :queue');

        $this->runStatement($sql, ['queue' => $this->queueNameOf($queue)]);
    }

    /**
     * Flush all jobs off of the queue stack
     *
     * @param  boolean $all  true removes every job, false only the completed ones
     * @return void
     */
    public function flush($all = false)
    {
        $sql    = $this->db->createSql();
        $delete = $sql->delete($this->table);

        if (!$all) {
            $delete->where('completed IS NOT NULL');
        }

        $this->db->query($sql);
    }

    /**
     * Flush all failed jobs off of the queue stack
     *
     * @return void
     */
    public function flushFailed()
    {
        $sql = $this->db->createSql();
        $sql->delete($this->failedTable);

        $this->db->query($sql);
    }

    /**
     * Flush all pop queue items
     *
     * @return void
     */
    public function flushAll()
    {
        $this->flush(true);
        $this->flushFailed();
    }

    /**
     * Get the database object
     *
     * @return DbAdapter
     */
    public function db()
    {
        return $this->db;
    }

    /**
     * Get the job table
     *
     * @return string
     */
    public function getTable()
    {
        return $this->table;
    }

    /**
     * Get the failed job table
     *
     * @return string
     */
    public function getFailedTable()
    {
        return $this->failedTable;
    }

    /**
     * Create the queue job table
     *
     * @param  string $table
     * @return Db
     */
    public function createTable($table)
    {
        $schema = $this->db->createSchema();

        $schema->create($table)
            ->int('id')->increment()
            ->varchar('job_id', 255)
            ->varchar('queue', 255)
            ->varchar('priority', 255)
            ->text('payload')
            ->int('attempts', 16)
            ->datetime('completed')
            ->primary('id');

        $this->db->query($schema);

        return $this;
    }

    /**
     * Create the queue failed job table
     *
     * @param  string $failedTable
     * @return Db
     */
    public function createFailedTable($failedTable)
    {
        $schema = $this->db->createSchema();

        $schema->create($failedTable)
            ->int('id', 16)->increment()
            ->varchar('job_id', 255)
            ->varchar('queue', 255)
            ->text('payload')
            ->text('exception')
            ->datetime('failed')
            ->primary('id');

        $this->db->query($schema);

        return $this;
    }

    /**
     * Resolve a queue argument to its name
     *
     * @param  mixed $queue  Queue object or queue name
     * @return mixed  The queue's name when given a Queue object, otherwise the argument as-is
     */
    private function queueNameOf($queue)
    {
        return ($queue instanceof Queue) ? $queue->getName() : $queue;
    }

    /**
     * Prepare a statement, bind its parameters and execute it
     *
     * @param  mixed $sql     SQL builder object created by the database adapter
     * @param  array $params  Named parameters, in the order of their placeholders
     * @return void
     */
    private function runStatement($sql, array $params)
    {
        $this->db->prepare($sql);
        $this->db->bindParams($params);
        $this->db->execute();
    }

    /**
     * Select rows from a table
     *
     * @param  string   $table
     * @param  array    $conditions  WHERE predicates, applied in the order given
     * @param  array    $params      Named parameters for the predicates
     * @param  int|null $limit       Maximum number of rows, or null for no limit
     * @return array
     */
    private function selectRows($table, array $conditions, array $params, $limit = null)
    {
        $sql    = $this->db->createSql();
        $select = $sql->select()->from($table);

        foreach ($conditions as $condition) {
            $select->where($condition);
        }
        if (null !== $limit) {
            $select->limit($limit);
        }

        $this->runStatement($sql, $params);

        return $this->db->fetchAll();
    }

    /**
     * Check whether at least one row matches the given predicates
     *
     * @param  string $table
     * @param  array  $conditions
     * @param  array  $params
     * @return boolean
     */
    private function rowExists($table, array $conditions, array $params)
    {
        // One row is enough to answer the question, so do not fetch the whole result set
        return (count($this->selectRows($table, $conditions, $params, 1)) > 0);
    }

    /**
     * Fetch the first row matching the given predicates
     *
     * @param  string  $table
     * @param  array   $conditions
     * @param  array   $params
     * @param  boolean $unserialize  Decode the stored payload before returning
     * @return array|null
     */
    private function fetchOne($table, array $conditions, array $params, $unserialize)
    {
        $rows = $this->selectRows($table, $conditions, $params, 1);

        if (!isset($rows[0])) {
            return null;
        }

        return ($unserialize) ? $this->decodePayload($rows[0]) : $rows[0];
    }

    /**
     * Fetch every row matching the given predicates
     *
     * @param  string  $table
     * @param  array   $conditions
     * @param  array   $params
     * @param  boolean $unserialize  Decode the stored payloads before returning
     * @return array
     */
    private function fetchMany($table, array $conditions, array $params, $unserialize)
    {
        $rows = $this->selectRows($table, $conditions, $params);

        if ($unserialize) {
            foreach ($rows as $i => $row) {
                $rows[$i] = $this->decodePayload($row);
            }
        }

        return $rows;
    }

    /**
     * Decode the payload column of a row
     *
     * Rows without a payload (failed() stores null when the job row is not
     * found) are returned unchanged instead of being passed to base64_decode().
     *
     * NOTE(review): unserialize() instantiates whatever the stored payload
     * describes; this presumes the job tables are only written through
     * push() and failed() - confirm before exposing the tables to other writers.
     *
     * @param  mixed $row
     * @return mixed
     */
    private function decodePayload($row)
    {
        if (isset($row['payload'])) {
            $row['payload'] = unserialize(base64_decode($row['payload']));
        }

        return $row;
    }

}