Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
100.00% covered (success)
100.00%
227 / 227
100.00% covered (success)
100.00%
27 / 27
CRAP
100.00% covered (success)
100.00%
1 / 1
Db
100.00% covered (success)
100.00%
227 / 227
100.00% covered (success)
100.00%
27 / 27
67
100.00% covered (success)
100.00%
1 / 1
 __construct
100.00% covered (success)
100.00%
7 / 7
100.00% covered (success)
100.00%
1 / 1
3
 hasJob
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
1
 getJob
100.00% covered (success)
100.00%
12 / 12
100.00% covered (success)
100.00%
1 / 1
4
 updateJob
100.00% covered (success)
100.00%
18 / 18
100.00% covered (success)
100.00%
1 / 1
6
 hasJobs
100.00% covered (success)
100.00%
9 / 9
100.00% covered (success)
100.00%
1 / 1
2
 getJobs
100.00% covered (success)
100.00%
13 / 13
100.00% covered (success)
100.00%
1 / 1
4
 hasCompletedJob
100.00% covered (success)
100.00%
8 / 8
100.00% covered (success)
100.00%
1 / 1
1
 hasCompletedJobs
100.00% covered (success)
100.00%
9 / 9
100.00% covered (success)
100.00%
1 / 1
2
 getCompletedJob
100.00% covered (success)
100.00%
14 / 14
100.00% covered (success)
100.00%
1 / 1
4
 getCompletedJobs
100.00% covered (success)
100.00%
13 / 13
100.00% covered (success)
100.00%
1 / 1
4
 hasFailedJob
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
1
 getFailedJob
100.00% covered (success)
100.00%
12 / 12
100.00% covered (success)
100.00%
1 / 1
3
 hasFailedJobs
100.00% covered (success)
100.00%
7 / 7
100.00% covered (success)
100.00%
1 / 1
2
 getFailedJobs
100.00% covered (success)
100.00%
11 / 11
100.00% covered (success)
100.00%
1 / 1
4
 push
100.00% covered (success)
100.00%
13 / 13
100.00% covered (success)
100.00%
1 / 1
6
 failed
100.00% covered (success)
100.00%
12 / 12
100.00% covered (success)
100.00%
1 / 1
5
 pop
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
1
 clear
100.00% covered (success)
100.00%
9 / 9
100.00% covered (success)
100.00%
1 / 1
3
 clearFailed
100.00% covered (success)
100.00%
7 / 7
100.00% covered (success)
100.00%
1 / 1
2
 flush
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
2
 flushFailed
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
1
 flushAll
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 db
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getTable
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getFailedTable
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 createTable
100.00% covered (success)
100.00%
12 / 12
100.00% covered (success)
100.00%
1 / 1
1
 createFailedTable
100.00% covered (success)
100.00%
11 / 11
100.00% covered (success)
100.00%
1 / 1
1
1<?php
2/**
3 * Pop PHP Framework (http://www.popphp.org/)
4 *
5 * @link       https://github.com/popphp/popphp-framework
6 * @author     Nick Sagona, III <dev@nolainteractive.com>
7 * @copyright  Copyright (c) 2009-2023 NOLA Interactive, LLC. (http://www.nolainteractive.com)
8 * @license    http://www.popphp.org/license     New BSD License
9 */
10
11/**
12 * @namespace
13 */
14namespace Pop\Queue\Adapter;
15
16use Pop\Db\Adapter\AbstractAdapter as DbAdapter;
17use Pop\Queue\Queue;
18use Pop\Queue\Processor\Jobs;
19
20/**
21 * Database queue adapter class
22 *
23 * @category   Pop
24 * @package    Pop\Queue
25 * @author     Nick Sagona, III <dev@nolainteractive.com>
26 * @copyright  Copyright (c) 2009-2023 NOLA Interactive, LLC. (http://www.nolainteractive.com)
27 * @license    http://www.popphp.org/license     New BSD License
28 * @version    1.2.0
29 */
class Db extends AbstractAdapter
{

    /**
     * Database adapter
     * @var DbAdapter
     */
    protected $db = null;

    /**
     * Job table
     * @var string
     */
    protected $table = null;

    /**
     * Failed job table
     * @var string
     */
    protected $failedTable = null;

    /**
     * Constructor
     *
     * Instantiate the database queue object. Either table is created on the
     * fly when the database adapter reports that it does not exist yet.
     *
     * @param DbAdapter $db
     * @param string    $table
     * @param string    $failedTable
     */
    public function __construct(DbAdapter $db, $table = 'pop_queue_jobs', $failedTable = 'pop_queue_failed_jobs')
    {
        $this->db          = $db;
        $this->table       = $table;
        $this->failedTable = $failedTable;

        if (!$this->db->hasTable($table)) {
            $this->createTable($table);
        }
        if (!$this->db->hasTable($failedTable)) {
            $this->createFailedTable($failedTable);
        }
    }

    /**
     * Check if queue stack has job
     *
     * @param  mixed $jobId
     * @return boolean
     */
    public function hasJob($jobId)
    {
        // One row is enough to answer an existence check
        $rows = $this->selectRows($this->table, ['job_id = :job_id'], ['job_id' => $jobId], 1);

        return (count($rows) > 0);
    }

    /**
     * Get job from queue stack by job ID
     *
     * @param  mixed   $jobId
     * @param  boolean $unserialize Decode the stored payload when true
     * @return array|null The job row, or null when no job matches
     */
    public function getJob($jobId, $unserialize = true)
    {
        $rows = $this->selectRows($this->table, ['job_id = :job_id'], ['job_id' => $jobId], 1);

        return $this->firstRow($rows, $unserialize);
    }

    /**
     * Update job from queue stack by job ID
     *
     * Nothing is written when both $completed and $increment are false,
     * since there would be no column to set.
     *
     * @param  mixed $jobId
     * @param  mixed $completed False to leave untouched, true to stamp the current
     *                          date/time, any other value is stored as given
     * @param  mixed $increment False to leave untouched, true to add one to the stored
     *                          attempt count, any other value is cast to int and stored
     * @return void
     */
    public function updateJob($jobId, $completed = false, $increment = false)
    {
        $values = [];
        $params = [];

        if ($completed !== false) {
            $values['completed'] = ':completed';
            $params['completed'] = ($completed === true) ? date('Y-m-d H:i:s') : $completed;
        }
        if ($increment !== false) {
            $values['attempts'] = ':attempts';

            // The stored record is only needed to compute the next attempt count,
            // and only its raw 'attempts' column is read, so skip the payload decode
            $jobRecord = ($increment === true) ? $this->getJob($jobId, false) : null;

            if (($increment === true) && isset($jobRecord['attempts'])) {
                $jobRecord['attempts']++;
                // The incremented counter replaces the placeholder directly
                $values['attempts'] = $jobRecord['attempts'];
            } else {
                $params['attempts'] = (int)$increment;
            }
        }

        // Guard against building an UPDATE statement with an empty SET list
        if (empty($values)) {
            return;
        }

        $params['job_id'] = $jobId;

        $sql = $this->db->createSql();
        $sql->update($this->table)->values($values)->where('job_id = :job_id');

        $this->db->prepare($sql);
        $this->db->bindParams($params);
        $this->db->execute();
    }

    /**
     * Check if queue has jobs that are not yet completed
     *
     * @param  mixed $queue Queue object or queue name
     * @return boolean
     */
    public function hasJobs($queue)
    {
        $rows = $this->selectRows(
            $this->table,
            ['queue = :queue', 'completed IS NULL'],
            ['queue' => $this->resolveQueueName($queue)],
            1
        );

        return (count($rows) > 0);
    }

    /**
     * Get queue jobs that are not yet completed
     *
     * @param  mixed   $queue       Queue object or queue name
     * @param  boolean $unserialize Decode the stored payloads when true
     * @return array
     */
    public function getJobs($queue, $unserialize = true)
    {
        $rows = $this->selectRows(
            $this->table,
            ['queue = :queue', 'completed IS NULL'],
            ['queue' => $this->resolveQueueName($queue)]
        );

        return ($unserialize) ? $this->decodePayloads($rows) : $rows;
    }

    /**
     * Check if queue stack has completed job
     *
     * @param  mixed $jobId
     * @return boolean
     */
    public function hasCompletedJob($jobId)
    {
        $rows = $this->selectRows(
            $this->table,
            ['job_id = :job_id', 'completed IS NOT NULL'],
            ['job_id' => $jobId],
            1
        );

        return (count($rows) > 0);
    }

    /**
     * Check if queue has completed jobs
     *
     * @param  mixed $queue Queue object or queue name
     * @return boolean
     */
    public function hasCompletedJobs($queue)
    {
        $rows = $this->selectRows(
            $this->table,
            ['queue = :queue', 'completed IS NOT NULL'],
            ['queue' => $this->resolveQueueName($queue)],
            1
        );

        return (count($rows) > 0);
    }

    /**
     * Get queue completed job
     *
     * @param  mixed   $jobId
     * @param  boolean $unserialize Decode the stored payload when true
     * @return array|null The job row, or null when no completed job matches
     */
    public function getCompletedJob($jobId, $unserialize = true)
    {
        $rows = $this->selectRows(
            $this->table,
            ['job_id = :job_id', 'completed IS NOT NULL'],
            ['job_id' => $jobId],
            1
        );

        return $this->firstRow($rows, $unserialize);
    }

    /**
     * Get queue completed jobs
     *
     * @param  mixed   $queue       Queue object or queue name
     * @param  boolean $unserialize Decode the stored payloads when true
     * @return array
     */
    public function getCompletedJobs($queue, $unserialize = true)
    {
        $rows = $this->selectRows(
            $this->table,
            ['queue = :queue', 'completed IS NOT NULL'],
            ['queue' => $this->resolveQueueName($queue)]
        );

        return ($unserialize) ? $this->decodePayloads($rows) : $rows;
    }

    /**
     * Check if queue stack has failed job
     *
     * @param  mixed $jobId
     * @return boolean
     */
    public function hasFailedJob($jobId)
    {
        $rows = $this->selectRows($this->failedTable, ['job_id = :job_id'], ['job_id' => $jobId], 1);

        return (count($rows) > 0);
    }

    /**
     * Get failed job from queue stack by job ID
     *
     * @param  mixed   $jobId
     * @param  boolean $unserialize Decode the stored payload when true
     * @return array|null The failed job row, or null when no failed job matches
     */
    public function getFailedJob($jobId, $unserialize = true)
    {
        $rows = $this->selectRows($this->failedTable, ['job_id = :job_id'], ['job_id' => $jobId], 1);

        return $this->firstRow($rows, $unserialize);
    }

    /**
     * Check if queue adapter has failed jobs
     *
     * @param  mixed $queue Queue object or queue name
     * @return boolean
     */
    public function hasFailedJobs($queue)
    {
        $rows = $this->selectRows(
            $this->failedTable,
            ['queue = :queue'],
            ['queue' => $this->resolveQueueName($queue)],
            1
        );

        return (count($rows) > 0);
    }

    /**
     * Get queue failed jobs
     *
     * @param  mixed   $queue       Queue object or queue name
     * @param  boolean $unserialize Decode the stored payloads when true
     * @return array
     */
    public function getFailedJobs($queue, $unserialize = true)
    {
        $rows = $this->selectRows(
            $this->failedTable,
            ['queue = :queue'],
            ['queue' => $this->resolveQueueName($queue)]
        );

        return ($unserialize) ? $this->decodePayloads($rows) : $rows;
    }

    /**
     * Push job onto queue stack
     *
     * The job is cloned, serialized and base64-encoded before being stored.
     *
     * @param  mixed $queue    Queue object or queue name
     * @param  mixed $job
     * @param  mixed $priority
     * @return string|null The job ID, or null when $job is neither a Jobs\Schedule nor a Jobs\Job
     */
    public function push($queue, $job, $priority = null)
    {
        $queueName = $this->resolveQueueName($queue);
        $jobId     = null;

        // Reuse the job's existing ID when it has one, otherwise have the job generate one
        if ($job instanceof Jobs\Schedule) {
            $jobId = ($job->getJob()->hasJobId()) ? $job->getJob()->getJobId() : $job->getJob()->generateJobId();
        } else if ($job instanceof Jobs\Job) {
            $jobId = ($job->hasJobId()) ? $job->getJobId() : $job->generateJobId();
        }

        $sql = $this->db->createSql();
        $sql->insert($this->table)->values([
            'job_id'    => ':job_id',
            'queue'     => ':queue',
            'payload'   => ':payload',
            'priority'  => ':priority',
            'attempts'  => ':attempts',
            'completed' => ':completed'
        ]);

        $this->db->prepare($sql);
        $this->db->bindParams([
            'job_id'    => $jobId,
            'queue'     => $queueName,
            'payload'   => base64_encode(serialize(clone $job)),
            'priority'  => $priority,
            'attempts'  => 0,
            'completed' => null
        ]);

        $this->db->execute();

        return $jobId;
    }

    /**
     * Move failed job to failed queue stack
     *
     * The stored (still encoded) payload of the job is copied into the failed
     * table, or NULL when the job record cannot be found. The job is then
     * removed from the job table when a non-empty job ID was given.
     *
     * @param  mixed      $queue     Queue object or queue name
     * @param  mixed      $jobId
     * @param  \Exception $exception
     * @return void
     */
    public function failed($queue, $jobId, \Exception $exception = null)
    {
        $queueName = $this->resolveQueueName($queue);

        // Fetch the raw row so the payload is carried over without a decode/encode round trip
        $jobRecord = $this->getJob($jobId, false);

        $sql = $this->db->createSql();
        $sql->insert($this->failedTable)->values([
            'job_id'    => ':job_id',
            'queue'     => ':queue',
            'payload'   => ':payload',
            'exception' => ':exception',
            'failed'    => ':failed'
        ]);

        $this->db->prepare($sql);
        $this->db->bindParams([
            'job_id'    => $jobId,
            'queue'     => $queueName,
            'payload'   => (isset($jobRecord['payload'])) ? $jobRecord['payload'] : null,
            'exception' => (null !== $exception) ? $exception->getMessage() : null,
            'failed'    => date('Y-m-d H:i:s')
        ]);

        $this->db->execute();

        if (!empty($jobId)) {
            $this->pop($jobId);
        }
    }

    /**
     * Pop job off of queue stack
     *
     * @param  mixed $jobId
     * @return void
     */
    public function pop($jobId)
    {
        $sql = $this->db->createSql();
        $sql->delete($this->table)->where('job_id = :job_id');

        $this->db->prepare($sql);
        $this->db->bindParams(['job_id' => $jobId]);
        $this->db->execute();
    }

    /**
     * Clear jobs off of the queue stack
     *
     * @param  mixed   $queue Queue object or queue name
     * @param  boolean $all   When false, only completed jobs of the queue are deleted
     * @return void
     */
    public function clear($queue, $all = false)
    {
        $queueName = $this->resolveQueueName($queue);

        $sql = $this->db->createSql();
        $sql->delete($this->table)
            ->where('queue = :queue');

        if (!$all) {
            $sql->delete()->where('completed IS NOT NULL');
        }

        $this->db->prepare($sql);
        $this->db->bindParams(['queue' => $queueName]);
        $this->db->execute();
    }

    /**
     * Clear failed jobs off of the queue stack
     *
     * @param  mixed $queue Queue object or queue name
     * @return void
     */
    public function clearFailed($queue)
    {
        $queueName = $this->resolveQueueName($queue);

        $sql = $this->db->createSql();
        $sql->delete($this->failedTable)
            ->where('queue = :queue');

        $this->db->prepare($sql);
        $this->db->bindParams(['queue' => $queueName]);
        $this->db->execute();
    }

    /**
     * Flush all jobs off of the queue stack
     *
     * @param  boolean $all When false, only completed jobs are deleted
     * @return void
     */
    public function flush($all = false)
    {
        $sql = $this->db->createSql();
        $sql->delete($this->table);

        if (!$all) {
            $sql->delete()->where('completed IS NOT NULL');
        }

        $this->db->query($sql);
    }

    /**
     * Flush all failed jobs off of the queue stack
     *
     * @return void
     */
    public function flushFailed()
    {
        $sql = $this->db->createSql();
        $sql->delete($this->failedTable);

        $this->db->query($sql);
    }

    /**
     * Flush all pop queue items
     *
     * Deletes every job (completed or not) and every failed job.
     *
     * @return void
     */
    public function flushAll()
    {
        $this->flush(true);
        $this->flushFailed();
    }

    /**
     * Get the database object
     *
     * @return DbAdapter
     */
    public function db()
    {
        return $this->db;
    }

    /**
     * Get the job table
     *
     * @return string
     */
    public function getTable()
    {
        return $this->table;
    }

    /**
     * Get the failed job table
     *
     * @return string
     */
    public function getFailedTable()
    {
        return $this->failedTable;
    }

    /**
     * Create the queue job table
     *
     * @param  string $table
     * @return Db
     */
    public function createTable($table)
    {
        $schema = $this->db->createSchema();

        $schema->create($table)
            ->int('id')->increment()
            ->varchar('job_id', 255)
            ->varchar('queue', 255)
            ->varchar('priority', 255)
            ->text('payload')
            ->int('attempts', 16)
            ->datetime('completed')
            ->primary('id');

        $this->db->query($schema);

        return $this;
    }

    /**
     * Create the queue failed job table
     *
     * @param  string $failedTable
     * @return Db
     */
    public function createFailedTable($failedTable)
    {
        $schema = $this->db->createSchema();

        $schema->create($failedTable)
            ->int('id', 16)->increment()
            ->varchar('job_id', 255)
            ->varchar('queue', 255)
            ->text('payload')
            ->text('exception')
            ->datetime('failed')
            ->primary('id');

        $this->db->query($schema);

        return $this;
    }

    /**
     * Resolve a queue argument to its name
     *
     * @param  mixed $queue Queue object or queue name
     * @return mixed The queue's name for a Queue object, otherwise the argument as given
     */
    private function resolveQueueName($queue)
    {
        return ($queue instanceof Queue) ? $queue->getName() : $queue;
    }

    /**
     * Run a prepared SELECT against a table and return all fetched rows
     *
     * @param  string $table      Table to select from
     * @param  array  $conditions WHERE conditions, applied in order
     * @param  array  $params     Parameters bound to the prepared statement
     * @param  int    $limit      Optional row limit
     * @return array
     */
    private function selectRows($table, array $conditions, array $params, $limit = null)
    {
        $sql    = $this->db->createSql();
        $select = $sql->select()->from($table);

        // Continue the fluent chain one call at a time, always using the returned object
        foreach ($conditions as $condition) {
            $select = $select->where($condition);
        }
        if (null !== $limit) {
            $select->limit($limit);
        }

        $this->db->prepare($sql);
        $this->db->bindParams($params);
        $this->db->execute();

        return $this->db->fetchAll();
    }

    /**
     * Get the first row of a result set, optionally decoding its payload
     *
     * @param  mixed   $rows
     * @param  boolean $unserialize
     * @return array|null Null when the result set has no first row
     */
    private function firstRow($rows, $unserialize)
    {
        if (!isset($rows[0])) {
            return null;
        }

        return ($unserialize) ? $this->decodePayload($rows[0]) : $rows[0];
    }

    /**
     * Decode the payload of every row in a result set
     *
     * @param  mixed $rows
     * @return mixed
     */
    private function decodePayloads($rows)
    {
        foreach ($rows as $i => $row) {
            $rows[$i] = $this->decodePayload($row);
        }

        return $rows;
    }

    /**
     * Decode the base64-encoded, serialized payload of a single row
     *
     * Rows without a payload (for example a failed job whose original record
     * could not be found) are returned untouched instead of being passed to
     * base64_decode()/unserialize().
     *
     * NOTE(review): unserialize() instantiates whatever objects the stored string
     * describes, so this relies on the job tables only being writable by trusted code.
     *
     * @param  mixed $row
     * @return mixed
     */
    private function decodePayload($row)
    {
        if (isset($row['payload'])) {
            $row['payload'] = unserialize(base64_decode($row['payload']));
        }

        return $row;
    }

}