Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
100.00% |
176 / 176 |
|
100.00% |
26 / 26 |
CRAP | |
100.00% |
1 / 1 |
Database | |
100.00% |
176 / 176 |
|
100.00% |
26 / 26 |
48 | |
100.00% |
1 / 1 |
__construct | |
100.00% |
5 / 5 |
|
100.00% |
1 / 1 |
2 | |||
create | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getDb | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
db | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getTable | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getStart | |
100.00% |
5 / 5 |
|
100.00% |
1 / 1 |
2 | |||
getEnd | |
100.00% |
5 / 5 |
|
100.00% |
1 / 1 |
2 | |||
getStatus | |
100.00% |
5 / 5 |
|
100.00% |
1 / 1 |
2 | |||
push | |
100.00% |
26 / 26 |
|
100.00% |
1 / 1 |
4 | |||
pop | |
100.00% |
15 / 15 |
|
100.00% |
1 / 1 |
5 | |||
hasJobs | |
100.00% |
5 / 5 |
|
100.00% |
1 / 1 |
2 | |||
hasFailedJob | |
100.00% |
5 / 5 |
|
100.00% |
1 / 1 |
1 | |||
getFailedJob | |
100.00% |
8 / 8 |
|
100.00% |
1 / 1 |
3 | |||
hasFailedJobs | |
100.00% |
5 / 5 |
|
100.00% |
1 / 1 |
2 | |||
getFailedJobs | |
100.00% |
8 / 8 |
|
100.00% |
1 / 1 |
3 | |||
clearFailed | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
1 | |||
schedule | |
100.00% |
16 / 16 |
|
100.00% |
1 / 1 |
2 | |||
getTasks | |
100.00% |
8 / 8 |
|
100.00% |
1 / 1 |
2 | |||
getTask | |
100.00% |
7 / 7 |
|
100.00% |
1 / 1 |
2 | |||
updateTask | |
100.00% |
14 / 14 |
|
100.00% |
1 / 1 |
2 | |||
removeTask | |
100.00% |
6 / 6 |
|
100.00% |
1 / 1 |
1 | |||
getTaskCount | |
100.00% |
5 / 5 |
|
100.00% |
1 / 1 |
2 | |||
hasTasks | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
clearTasks | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
1 | |||
clear | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
1 | |||
createTable | |
100.00% |
11 / 11 |
|
100.00% |
1 / 1 |
1 |
1 | <?php |
2 | /** |
3 | * Pop PHP Framework (http://www.popphp.org/) |
4 | * |
5 | * @link https://github.com/popphp/popphp-framework |
6 | * @author Nick Sagona, III <dev@nolainteractive.com> |
7 | * @copyright Copyright (c) 2009-2024 NOLA Interactive, LLC. (http://www.nolainteractive.com) |
8 | * @license http://www.popphp.org/license New BSD License |
9 | */ |
10 | |
11 | /** |
12 | * @namespace |
13 | */ |
14 | namespace Pop\Queue\Adapter; |
15 | |
16 | use Pop\Db\Adapter\AbstractAdapter as DbAdapter; |
17 | use Pop\Queue\Process\AbstractJob; |
18 | use Pop\Queue\Process\Task; |
19 | |
/**
 * Database adapter class
 *
 * Stores queue jobs and scheduled tasks as rows of a single database table.
 * Rows are told apart by the `type` column ('job' or 'task'). Job rows carry
 * an integer `index` giving their position in the queue and a `status`:
 * 1 = queued, 2 = failed, 0 = currently being popped. Task rows are written
 * without an index, so their `index` column is NULL.
 *
 * @category   Pop
 * @package    Pop\Queue
 * @author     Nick Sagona, III <dev@nolainteractive.com>
 * @copyright  Copyright (c) 2009-2024 NOLA Interactive, LLC. (http://www.nolainteractive.com)
 * @license    http://www.popphp.org/license     New BSD License
 * @version    2.0.0
 */
class Database extends AbstractTaskAdapter
{

    /**
     * Database adapter
     * @var ?DbAdapter
     */
    protected ?DbAdapter $db = null;

    /**
     * Database table
     * @var ?string
     */
    protected ?string $table = null;

    /**
     * Constructor
     *
     * Instantiate the database adapter object. The queue table is created
     * on the fly when the database does not already have it.
     *
     * @param DbAdapter $db
     * @param string    $table
     * @param ?string   $priority  Passed through unchanged to the parent adapter constructor
     */
    public function __construct(DbAdapter $db, string $table = 'pop_queue', ?string $priority = null)
    {
        $this->db    = $db;
        $this->table = $table;

        if (!$this->db->hasTable($table)) {
            $this->createTable($table);
        }

        parent::__construct($priority);
    }

    /**
     * Create database adapter
     *
     * Static factory equivalent of the constructor.
     *
     * @param  DbAdapter $db
     * @param  string    $table
     * @param  ?string   $priority
     * @return Database
     */
    public static function create(DbAdapter $db, string $table = 'pop_queue', ?string $priority = null): Database
    {
        // The priority must be forwarded; otherwise the factory silently
        // ignores the caller's choice and always uses the default.
        return new self($db, $table, $priority);
    }

    /**
     * Get database adapter
     *
     * @return ?DbAdapter
     */
    public function getDb(): ?DbAdapter
    {
        return $this->db;
    }

    /**
     * Get database adapter (alias)
     *
     * @return ?DbAdapter
     */
    public function db(): ?DbAdapter
    {
        return $this->db;
    }

    /**
     * Get database table
     *
     * @return ?string
     */
    public function getTable(): ?string
    {
        return $this->table;
    }

    /**
     * Get queue start index
     *
     * Returns the lowest non-NULL index stored in the table,
     * or 0 when no row has an index.
     *
     * @return int
     */
    public function getStart(): int
    {
        $sql = $this->db->createSql();
        $sql->select('index')->from($this->table)->where('index IS NOT NULL')->orderBy('index')->limit(1);
        $this->db->query($sql);

        $rows = $this->db->fetchAll();
        return (isset($rows[0]['index'])) ? (int)$rows[0]['index'] : 0;
    }

    /**
     * Get queue end index
     *
     * Returns the highest non-NULL index stored in the table,
     * or 0 when no row has an index.
     *
     * @return int
     */
    public function getEnd(): int
    {
        // Task rows have a NULL index. Some database engines sort NULLs first
        // in descending order, which would hide the real highest index behind
        // a task row, so NULLs are excluded exactly as getStart() does.
        $sql = $this->db->createSql();
        $sql->select('index')->from($this->table)->where('index IS NOT NULL')->orderBy('index', 'DESC')->limit(1);
        $this->db->query($sql);

        $rows = $this->db->fetchAll();
        return (isset($rows[0]['index'])) ? (int)$rows[0]['index'] : 0;
    }

    /**
     * Get queue job status
     *
     * Returns the stored status of the row at the given index,
     * or 0 when there is no such row.
     *
     * @param  int $index
     * @return int
     */
    public function getStatus(int $index): int
    {
        $sql = $this->db->createSql();
        $sql->select('status')->from($this->table)->where('index = ' . $index);
        $this->db->query($sql);

        $rows = $this->db->fetchAll();
        return (isset($rows[0]['status'])) ? (int)$rows[0]['status'] : 0;
    }

    /**
     * Push job on to queue
     *
     * Invalid jobs are ignored. A valid job is stored after the current end
     * of the queue with status 1. A job that has already failed is stored
     * with status 2 and, for a FILO queue, is placed before the current
     * start of the queue instead.
     *
     * @param  AbstractJob $job
     * @return Database
     */
    public function push(AbstractJob $job): Database
    {
        // Nothing is written for an invalid job, so skip the index queries too.
        if (!$job->isValid()) {
            return $this;
        }

        $status = 1;
        $index  = ($this->getEnd() + 1);

        if ($job->hasFailed()) {
            $status = 2;
            if ($this->isFilo()) {
                $index = ($this->getStart() - 1);
            }
        }

        $sql = $this->db->createSql();
        $sql->insert($this->table)->values([
            'index'   => ':index',
            'type'    => ':type',
            'job_id'  => ':job_id',
            'payload' => ':payload',
            'status'  => ':status'
        ]);

        $jobData = [
            'index'   => $index,
            'type'    => 'job',
            'job_id'  => $job->getJobId(),
            'payload' => base64_encode(serialize(clone $job)),
            'status'  => $status
        ];

        $this->db->prepare($sql);
        $this->db->bindParams($jobData);
        $this->db->execute();

        return $this;
    }

    /**
     * Pop job off of queue
     *
     * Takes the row at the start of the queue (FIFO) or at the end of the
     * queue (otherwise). A row whose status is 0, or a missing row, yields
     * null. Otherwise the row is flagged with status 0, its payload is read,
     * the row is deleted and the unserialized job is returned.
     *
     * @return ?AbstractJob
     */
    public function pop(): ?AbstractJob
    {
        $index = ($this->isFifo()) ? $this->getStart() : $this->getEnd();

        if ($this->getStatus($index) == 0) {
            return null;
        }

        $where   = 'index = ' . $index;
        $payload = null;

        // Flag the row before reading it
        $update = $this->db->createSql();
        $update->update($this->table)->values(['status' => 0])->where($where);
        $this->db->query($update);

        $select = $this->db->createSql();
        $select->select('payload')->from($this->table)->where($where);
        $this->db->query($select);
        $rows = $this->db->fetchAll();
        if (isset($rows[0]['payload'])) {
            $payload = $rows[0]['payload'];
        }

        $delete = $this->db->createSql();
        $delete->delete()->from($this->table)->where($where);
        $this->db->query($delete);

        return ($payload !== null) ? unserialize(base64_decode($payload)) : null;
    }

    /**
     * Check if adapter has jobs
     *
     * @return bool
     */
    public function hasJobs(): bool
    {
        return ($this->countRows("type = 'job'") > 0);
    }

    /**
     * Check if adapter has failed job
     *
     * True only when a row exists at the given index and its status is 2.
     *
     * @param  int $index
     * @return bool
     */
    public function hasFailedJob(int $index): bool
    {
        return ($this->getFailedJob($index, false) !== null);
    }

    /**
     * Get failed job from the queue by index
     *
     * Returns null when there is no row at the index or when the row is not
     * marked as failed (status 2).
     *
     * @param  int  $index
     * @param  bool $unserialize  True to return the unserialized payload, false to return the raw row
     * @return mixed
     */
    public function getFailedJob(int $index, bool $unserialize = true): mixed
    {
        $sql = $this->db->createSql();
        $sql->select()->from($this->table)->where('index = ' . $index);
        $this->db->query($sql);
        $rows = $this->db->fetchAll();

        // Only rows flagged as failed qualify, matching the "status = 2"
        // filter used by hasFailedJobs(), getFailedJobs() and clearFailed().
        if (!isset($rows[0]['status']) || ((int)$rows[0]['status'] !== 2)) {
            return null;
        }

        return ($unserialize) ? unserialize(base64_decode($rows[0]['payload'])) : $rows[0];
    }

    /**
     * Check if adapter has failed jobs
     *
     * @return bool
     */
    public function hasFailedJobs(): bool
    {
        return ($this->countRows('status = 2') > 0);
    }

    /**
     * Get adapter failed jobs
     *
     * The returned array is keyed by each row's index.
     *
     * @param  bool $unserialize  True to return unserialized payloads, false to return the raw rows
     * @return array
     */
    public function getFailedJobs(bool $unserialize = true): array
    {
        $sql = $this->db->createSql();
        $sql->select()->from($this->table)->where("status = 2");
        $this->db->query($sql);
        $rows = $this->db->fetchAll();
        $jobs = [];

        foreach ($rows as $row) {
            $jobs[$row['index']] = ($unserialize) ? unserialize(base64_decode($row['payload'])) : $row;
        }

        return $jobs;
    }

    /**
     * Clear failed jobs out of the queue
     *
     * @return Database
     */
    public function clearFailed(): Database
    {
        $sql = $this->db->createSql();
        $sql->delete()->from($this->table)->where("status = 2");
        $this->db->query($sql);

        return $this;
    }

    /**
     * Schedule job with queue
     *
     * Stores a valid task as a 'task' row; an invalid task is ignored.
     * No index is written, so the row's index stays NULL.
     *
     * @param  Task $task
     * @return Database
     */
    public function schedule(Task $task): Database
    {
        if (!$task->isValid()) {
            return $this;
        }

        $sql = $this->db->createSql();
        $sql->insert($this->table)->values([
            'type'    => ':type',
            'job_id'  => ':job_id',
            'payload' => ':payload'
        ]);

        $jobData = [
            'type'    => 'task',
            'job_id'  => $task->getJobId(),
            'payload' => base64_encode(serialize(clone $task))
        ];

        $this->db->prepare($sql);
        $this->db->bindParams($jobData);
        $this->db->execute();

        return $this;
    }

    /**
     * Get scheduled tasks
     *
     * @return array  List of the job IDs of all stored tasks
     */
    public function getTasks(): array
    {
        $sql = $this->db->createSql();
        $sql->select('job_id')->from($this->table)->where("type = 'task'");
        $this->db->query($sql);

        return array_column($this->db->fetchAll(), 'job_id');
    }

    /**
     * Get scheduled task
     *
     * Looks the row up by job ID only and returns its unserialized payload,
     * or null when no row matches.
     *
     * @param  string $taskId
     * @return ?Task
     */
    public function getTask(string $taskId): ?Task
    {
        $sql = $this->db->createSql();
        $sql->select('payload')->from($this->table)->where('job_id = :job_id');
        $this->db->prepare($sql);
        $this->db->bindParams(['job_id' => $taskId]);
        $this->db->execute();
        $rows = $this->db->fetchAll();

        return (isset($rows[0]['payload'])) ? unserialize(base64_decode($rows[0]['payload'])) : null;
    }

    /**
     * Update scheduled task
     *
     * Rewrites the stored payload of a valid task. A task that is no longer
     * valid is removed from the table instead.
     *
     * @param  Task $task
     * @return Database
     */
    public function updateTask(Task $task): Database
    {
        if (!$task->isValid()) {
            return $this->removeTask($task->getJobId());
        }

        $sql = $this->db->createSql();
        $sql->update($this->table)->values([
            'payload' => ':payload'
        ])->where('job_id = :job_id');

        $jobData = [
            'payload' => base64_encode(serialize(clone $task)),
            'job_id'  => $task->getJobId()
        ];

        $this->db->prepare($sql);
        $this->db->bindParams($jobData);
        $this->db->execute();

        return $this;
    }

    /**
     * Remove scheduled task
     *
     * Deletes every row whose job ID matches.
     *
     * @param  string $taskId
     * @return Database
     */
    public function removeTask(string $taskId): Database
    {
        $sql = $this->db->createSql();
        $sql->delete()->from($this->table)->where('job_id = :job_id');
        $this->db->prepare($sql);
        $this->db->bindParams(['job_id' => $taskId]);
        $this->db->execute();

        return $this;
    }

    /**
     * Get scheduled tasks count
     *
     * @return int
     */
    public function getTaskCount(): int
    {
        return $this->countRows("type = 'task'");
    }

    /**
     * Has scheduled tasks
     *
     * @return bool
     */
    public function hasTasks(): bool
    {
        return ($this->getTaskCount() > 0);
    }

    /**
     * Clear all scheduled tasks
     *
     * @return Database
     */
    public function clearTasks(): Database
    {
        $sql = $this->db->createSql();
        $sql->delete()->from($this->table)->where("type = 'task'");
        $this->db->query($sql);

        return $this;
    }

    /**
     * Clear the queue table
     *
     * Deletes every row in the table without any filter, which removes
     * scheduled tasks as well as jobs.
     *
     * @return Database
     */
    public function clear(): Database
    {
        $sql = $this->db->createSql();
        $sql->delete()->from($this->table);
        $this->db->query($sql);

        return $this;
    }

    /**
     * Create the database table
     *
     * Columns: auto-increment primary key `id`, nullable `index`, `type`,
     * `job_id`, `payload` and `status` (defaults to 1).
     *
     * @param  string $table
     * @return Database
     */
    public function createTable(string $table): Database
    {
        $schema = $this->db->createSchema();

        $schema->create($table)
            ->int('id', 16)->increment()
            ->int('index', 16)->nullable()
            ->varchar('type', 255)
            ->varchar('job_id', 255)
            ->text('payload')
            ->int('status', 1)->defaultIs(1)
            ->primary('id');

        $this->db->query($schema);

        return $this;
    }

    /**
     * Count the rows of the queue table that match a predicate
     *
     * @param  string $where  Predicate handed to the SQL builder's where() clause
     * @return int    Number of matching rows, or 0 when no count came back
     */
    private function countRows(string $where): int
    {
        $sql = $this->db->createSql();
        $sql->select(['total' => 'COUNT(1)'])->from($this->table)->where($where);
        $this->db->query($sql);
        $rows = $this->db->fetchAll();

        return (isset($rows[0]['total'])) ? (int)$rows[0]['total'] : 0;
    }

}