Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
100.00% covered (success)
100.00%
176 / 176
100.00% covered (success)
100.00%
26 / 26
CRAP
100.00% covered (success)
100.00%
1 / 1
Database
100.00% covered (success)
100.00%
176 / 176
100.00% covered (success)
100.00%
26 / 26
48
100.00% covered (success)
100.00%
1 / 1
 __construct
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
2
 create
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getDb
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 db
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getTable
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getStart
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
2
 getEnd
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
2
 getStatus
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
2
 push
100.00% covered (success)
100.00%
26 / 26
100.00% covered (success)
100.00%
1 / 1
4
 pop
100.00% covered (success)
100.00%
15 / 15
100.00% covered (success)
100.00%
1 / 1
5
 hasJobs
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
2
 hasFailedJob
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
1
 getFailedJob
100.00% covered (success)
100.00%
8 / 8
100.00% covered (success)
100.00%
1 / 1
3
 hasFailedJobs
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
2
 getFailedJobs
100.00% covered (success)
100.00%
8 / 8
100.00% covered (success)
100.00%
1 / 1
3
 clearFailed
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
1
 schedule
100.00% covered (success)
100.00%
16 / 16
100.00% covered (success)
100.00%
1 / 1
2
 getTasks
100.00% covered (success)
100.00%
8 / 8
100.00% covered (success)
100.00%
1 / 1
2
 getTask
100.00% covered (success)
100.00%
7 / 7
100.00% covered (success)
100.00%
1 / 1
2
 updateTask
100.00% covered (success)
100.00%
14 / 14
100.00% covered (success)
100.00%
1 / 1
2
 removeTask
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
1
 getTaskCount
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
2
 hasTasks
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 clearTasks
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
1
 clear
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
1
 createTable
100.00% covered (success)
100.00%
11 / 11
100.00% covered (success)
100.00%
1 / 1
1
1<?php
2/**
3 * Pop PHP Framework (http://www.popphp.org/)
4 *
5 * @link       https://github.com/popphp/popphp-framework
6 * @author     Nick Sagona, III <dev@nolainteractive.com>
7 * @copyright  Copyright (c) 2009-2024 NOLA Interactive, LLC. (http://www.nolainteractive.com)
8 * @license    http://www.popphp.org/license     New BSD License
9 */
10
11/**
12 * @namespace
13 */
14namespace Pop\Queue\Adapter;
15
16use Pop\Db\Adapter\AbstractAdapter as DbAdapter;
17use Pop\Queue\Process\AbstractJob;
18use Pop\Queue\Process\Task;
19
20/**
21 * Database adapter class
22 *
23 * @category   Pop
24 * @package    Pop\Queue
25 * @author     Nick Sagona, III <dev@nolainteractive.com>
26 * @copyright  Copyright (c) 2009-2024 NOLA Interactive, LLC. (http://www.nolainteractive.com)
27 * @license    http://www.popphp.org/license     New BSD License
28 * @version    2.0.0
29 */
30class Database extends AbstractTaskAdapter
31{
32    /**
33     * Database adapter
34     * @var ?DbAdapter
35     */
36    protected ?DbAdapter $db = null;
37
38    /**
39     * Database table
40     * @var ?string
41     */
42    protected ?string $table = null;
43
44    /**
45     * Constructor
46     *
47     * Instantiate the database adapter object
48     *
49     * @param DbAdapter $db
50     * @param string    $table
51     */
52    public function __construct(DbAdapter $db, string $table = 'pop_queue', ?string $priority = null)
53    {
54        $this->db    = $db;
55        $this->table = $table;
56
57        if (!$this->db->hasTable($table)) {
58            $this->createTable($table);
59        }
60
61        parent::__construct($priority);
62    }
63
64    /**
65     * Create database adapter
66     *
67     * @param  DbAdapter $db
68     * @param  string    $table
69     * @return Database
70     */
71    public static function create(DbAdapter $db, string $table = 'pop_queue', ?string $priority = null): Database
72    {
73        return new self($db, $table);
74    }
75
76    /**
77     * Get database adapter
78     *
79     * @return ?DbAdapter
80     */
81    public function getDb(): ?DbAdapter
82    {
83        return $this->db;
84    }
85
86    /**
87     * Get database adapter (alias)
88     *
89     * @return ?DbAdapter
90     */
91    public function db(): ?DbAdapter
92    {
93        return $this->db;
94    }
95
96    /**
97     * Get database table
98     *
99     * @return ?string
100     */
101    public function getTable(): ?string
102    {
103        return $this->table;
104    }
105
106    /**
107     * Get queue length
108     *
109     * @return int
110     */
111    public function getStart(): int
112    {
113        $sql = $this->db->createSql();
114        $sql->select('index')->from($this->table)->where('index IS NOT NULL')->orderBy('index')->limit(1);
115        $this->db->query($sql);
116
117        $rows = $this->db->fetchAll();
118        return (isset($rows[0]['index'])) ? (int)$rows[0]['index'] : 0;
119    }
120
121    /**
122     * Get queue length
123     *
124     * @return int
125     */
126    public function getEnd(): int
127    {
128        $sql = $this->db->createSql();
129        $sql->select('index')->from($this->table)->orderBy('index', 'DESC')->limit(1);
130        $this->db->query($sql);
131
132        $rows = $this->db->fetchAll();
133        return (isset($rows[0]['index'])) ? (int)$rows[0]['index'] : 0;
134    }
135
136    /**
137     * Get queue job status
138     *
139     * @param  int $index
140     * @return int
141     */
142    public function getStatus(int $index): int
143    {
144        $sql = $this->db->createSql();
145        $sql->select('status')->from($this->table)->where('index = ' . (int)$index);
146        $this->db->query($sql);
147
148        $rows = $this->db->fetchAll();
149        return (isset($rows[0]['status'])) ? (int)$rows[0]['status'] : 0;
150    }
151
152    /**
153     * Push job on to queue
154     *
155     * @param  AbstractJob $job
156     * @return Database
157     */
158    public function push(AbstractJob $job): Database
159    {
160        $status = 1;
161        $index  = ($this->getEnd() + 1);
162
163        if ($job->hasFailed()) {
164            $status = 2;
165            if ($this->isFilo()) {
166                $index = ($this->getStart() - 1);
167            }
168        }
169
170        if ($job->isValid()) {
171            $sql = $this->db->createSql();
172            $sql->insert($this->table)->values([
173                'index'   => ':index',
174                'type'    => ':type',
175                'job_id'  => ':job_id',
176                'payload' => ':payload',
177                'status'  => ':status'
178            ]);
179
180            $jobData = [
181                'index'   => $index,
182                'type'    => 'job',
183                'job_id'  => $job->getJobId(),
184                'payload' => base64_encode(serialize(clone $job)),
185                'status'  => $status
186            ];
187
188            $this->db->prepare($sql);
189            $this->db->bindParams($jobData);
190            $this->db->execute();
191        }
192
193        return $this;
194    }
195
196    /**
197     * Pop job off of queue
198     *
199     * @return ?AbstractJob
200     */
201    public function pop(): ?AbstractJob
202    {
203        $job    = false;
204        $index  = ($this->isFifo()) ? $this->getStart() : $this->getEnd();
205        $status = $this->getStatus($index);
206
207        if ($status != 0) {
208            $sql = $this->db->createSql();
209            $sql->update($this->table)->values(['status' => 0])->where('index = ' . (int)$index);
210            $this->db->query($sql);
211
212            $sql->select('payload')->from($this->table)->where('index = ' . (int)$index);
213            $this->db->query($sql);
214            $rows = $this->db->fetchAll();
215            if (isset($rows[0]['payload'])) {
216                $job = $rows[0]['payload'];
217            }
218            $sql->delete()->from($this->table)->where('index = ' . (int)$index);
219            $this->db->query($sql);
220        }
221
222        return ($job !== false) ? unserialize(base64_decode($job)) : null;
223    }
224
225    /**
226     * Check if adapter has jobs
227     *
228     * @return bool
229     */
230    public function hasJobs(): bool
231    {
232        $sql = $this->db->createSql();
233        $sql->select(['total' => 'COUNT(1)'])->from($this->table)->where("type = 'job'");
234        $this->db->query($sql);
235        $rows = $this->db->fetchAll();
236
237        return (isset($rows[0]['total'])) ? (int)$rows[0]['total'] : 0;
238    }
239
240    /**
241     * Check if adapter has failed job
242     *
243     * @param  int $index
244     * @return bool
245     */
246    public function hasFailedJob(int $index): bool
247    {
248        $sql = $this->db->createSql();
249        $sql->select()->from($this->table)->where('index = ' . (int)$index);
250        $this->db->query($sql);
251        $rows = $this->db->fetchAll();
252
253        return (isset($rows[0]));
254    }
255
256    /**
257     * Get failed job from worker by job ID
258     *
259     * @param  int $index
260     * @param  bool $unserialize
261     * @return mixed
262     */
263    public function getFailedJob(int $index, bool $unserialize = true): mixed
264    {
265        $sql = $this->db->createSql();
266        $sql->select()->from($this->table)->where('index = ' . (int)$index);
267        $this->db->query($sql);
268        $rows = $this->db->fetchAll();
269        $job  = null;
270        if (isset($rows[0])) {
271            $job = ($unserialize) ? unserialize(base64_decode($rows[0]['payload'])) : $rows[0];
272        }
273
274        return $job;
275    }
276
277    /**
278     * Check if adapter has failed jobs
279     *
280     * @return bool
281     */
282    public function hasFailedJobs(): bool
283    {
284        $sql = $this->db->createSql();
285        $sql->select(['total' => 'COUNT(1)'])->from($this->table)->where("status = 2");
286        $this->db->query($sql);
287        $rows = $this->db->fetchAll();
288
289        return (isset($rows[0]['total'])) ? (int)$rows[0]['total'] : 0;
290    }
291
292    /**
293     * Get adapter failed jobs
294     *
295     * @param  bool $unserialize
296     * @return array
297     */
298    public function getFailedJobs(bool $unserialize = true): array
299    {
300        $sql = $this->db->createSql();
301        $sql->select()->from($this->table)->where("status = 2");
302        $this->db->query($sql);
303        $rows = $this->db->fetchAll();
304        $jobs = [];
305
306        foreach ($rows as $row) {
307            $jobs[$row['index']] = ($unserialize) ? unserialize(base64_decode($row['payload'])) : $row;
308        }
309
310        return $jobs;
311    }
312
313    /**
314     * Clear failed jobs out of the queue
315     *
316     * @return Database
317     */
318    public function clearFailed(): Database
319    {
320        $sql = $this->db->createSql();
321        $sql->delete()->from($this->table)->where("status = 2");
322        $this->db->query($sql);
323
324        return $this;
325    }
326
327    /**
328     * Schedule job with queue
329     *
330     * @param  Task $task
331     * @return Database
332     */
333    public function schedule(Task $task): Database
334    {
335        if ($task->isValid()) {
336            $sql = $this->db->createSql();
337            $sql->insert($this->table)->values([
338                'type'    => ':type',
339                'job_id'  => ':job_id',
340                'payload' => ':payload'
341            ]);
342
343            $jobData = [
344                'type'    => 'task',
345                'job_id'  => $task->getJobId(),
346                'payload' => base64_encode(serialize(clone $task))
347            ];
348
349            $this->db->prepare($sql);
350            $this->db->bindParams($jobData);
351            $this->db->execute();
352        }
353
354        return $this;
355    }
356
357    /**
358     * Get scheduled tasks
359     *
360     * @return array
361     */
362    public function getTasks(): array
363    {
364        $sql = $this->db->createSql();
365        $sql->select('job_id')->from($this->table)->where("type = 'task'");
366        $this->db->query($sql);
367        $rows = $this->db->fetchAll();
368
369        $tasks = [];
370
371        foreach ($rows as $row) {
372            $tasks[] = $row['job_id'];
373        }
374
375        return $tasks;
376    }
377
378    /**
379     * Get scheduled task
380     *
381     * @param  string $taskId
382     * @return ?Task
383     */
384    public function getTask(string $taskId): ?Task
385    {
386        $sql = $this->db->createSql();
387        $sql->select('payload')->from($this->table)->where('job_id = :job_id');
388        $this->db->prepare($sql);
389        $this->db->bindParams(['job_id' => $taskId]);
390        $this->db->execute();
391        $rows = $this->db->fetchAll();
392
393        return (isset($rows[0]['payload'])) ? unserialize(base64_decode($rows[0]['payload'])) : null;
394    }
395
396    /**
397     * Update scheduled task
398     *
399     * @param  Task $task
400     * @return Database
401     */
402    public function updateTask(Task $task): Database
403    {
404        if ($task->isValid()) {
405            $sql = $this->db->createSql();
406            $sql->update($this->table)->values([
407                'payload' => ':payload'
408            ])->where('job_id = :job_id');
409
410            $jobData = [
411                'payload' => base64_encode(serialize(clone $task)),
412                'job_id'  => $task->getJobId()
413            ];
414
415            $this->db->prepare($sql);
416            $this->db->bindParams($jobData);
417            $this->db->execute();
418        } else {
419            $this->removeTask($task->getJobId());
420        }
421
422        return $this;
423    }
424
425    /**
426     * Remove scheduled task
427     *
428     * @param  string $taskId
429     * @return Database
430     */
431    public function removeTask(string $taskId): Database
432    {
433        $sql = $this->db->createSql();
434        $sql->delete()->from($this->table)->where('job_id = :job_id');
435        $this->db->prepare($sql);
436        $this->db->bindParams(['job_id' => $taskId]);
437        $this->db->execute();
438
439        return $this;
440    }
441
442    /**
443     * Get scheduled tasks count
444     *
445     * @return int
446     */
447    public function getTaskCount(): int
448    {
449        $sql = $this->db->createSql();
450        $sql->select(['total' => 'COUNT(1)'])->from($this->table)->where("type = 'task'");
451        $this->db->query($sql);
452        $rows = $this->db->fetchAll();
453
454        return (isset($rows[0]['total'])) ? (int)$rows[0]['total'] : 0;
455    }
456
457    /**
458     * Has scheduled tasks
459     *
460     * @return bool
461     */
462    public function hasTasks(): bool
463    {
464        return ($this->getTaskCount() > 0);
465    }
466
467    /**
468     * Clear all scheduled task
469     *
470     * @return Database
471     */
472    public function clearTasks(): Database
473    {
474        $sql = $this->db->createSql();
475        $sql->delete()->from($this->table)->where("type = 'task'");
476        $this->db->query($sql);
477
478        return $this;
479    }
480
481    /**
482     * Clear jobs out of queue
483     *
484     * @return Database
485     */
486    public function clear(): Database
487    {
488        $sql = $this->db->createSql();
489        $sql->delete()->from($this->table);
490        $this->db->query($sql);
491
492        return $this;
493    }
494
495    /**
496     * Create the database table
497     *
498     * @param  string $table
499     * @return Database
500     */
501    public function createTable(string $table): Database
502    {
503        $schema = $this->db->createSchema();
504
505        $schema->create($table)
506            ->int('id', 16)->increment()
507            ->int('index', 16)->nullable()
508            ->varchar('type', 255)
509            ->varchar('job_id', 255)
510            ->text('payload')
511            ->int('status', 1)->defaultIs(1)
512            ->primary('id');
513
514        $this->db->query($schema);
515
516        return $this;
517    }
518
519}