Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
97.62% |
164 / 168 |
|
86.96% |
20 / 23 |
CRAP | |
0.00% |
0 / 1 |
Redis | |
97.62% |
164 / 168 |
|
86.96% |
20 / 23 |
89 | |
0.00% |
0 / 1 |
__construct | |
60.00% |
3 / 5 |
|
0.00% |
0 / 1 |
3.58 | |||
hasJob | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getJob | |
100.00% |
7 / 7 |
|
100.00% |
1 / 1 |
4 | |||
updateJob | |
100.00% |
22 / 22 |
|
100.00% |
1 / 1 |
12 | |||
hasJobs | |
100.00% |
10 / 10 |
|
100.00% |
1 / 1 |
6 | |||
getJobs | |
100.00% |
8 / 8 |
|
100.00% |
1 / 1 |
4 | |||
hasCompletedJob | |
87.50% |
7 / 8 |
|
0.00% |
0 / 1 |
4.03 | |||
hasCompletedJobs | |
100.00% |
6 / 6 |
|
100.00% |
1 / 1 |
3 | |||
getCompletedJob | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
2 | |||
getCompletedJobs | |
100.00% |
8 / 8 |
|
100.00% |
1 / 1 |
4 | |||
hasFailedJob | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getFailedJob | |
100.00% |
7 / 7 |
|
100.00% |
1 / 1 |
4 | |||
hasFailedJobs | |
100.00% |
6 / 6 |
|
100.00% |
1 / 1 |
3 | |||
getFailedJobs | |
100.00% |
8 / 8 |
|
100.00% |
1 / 1 |
4 | |||
push | |
100.00% |
21 / 21 |
|
100.00% |
1 / 1 |
10 | |||
failed | |
100.00% |
12 / 12 |
|
100.00% |
1 / 1 |
6 | |||
pop | |
100.00% |
9 / 9 |
|
100.00% |
1 / 1 |
4 | |||
clear | |
100.00% |
8 / 8 |
|
100.00% |
1 / 1 |
5 | |||
clearFailed | |
100.00% |
7 / 7 |
|
100.00% |
1 / 1 |
4 | |||
flush | |
100.00% |
5 / 5 |
|
100.00% |
1 / 1 |
2 | |||
flushFailed | |
75.00% |
3 / 4 |
|
0.00% |
0 / 1 |
1.02 | |||
flushAll | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
redis | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 |
1 | <?php |
2 | /** |
3 | * Pop PHP Framework (http://www.popphp.org/) |
4 | * |
5 | * @link https://github.com/popphp/popphp-framework |
6 | * @author Nick Sagona, III <dev@nolainteractive.com> |
7 | * @copyright Copyright (c) 2009-2023 NOLA Interactive, LLC. (http://www.nolainteractive.com) |
8 | * @license http://www.popphp.org/license New BSD License |
9 | */ |
10 | |
11 | /** |
12 | * @namespace |
13 | */ |
14 | namespace Pop\Queue\Adapter; |
15 | |
16 | use Pop\Queue\Queue; |
17 | use Pop\Queue\Processor\Jobs; |
18 | |
19 | /** |
20 | * Redis queue adapter class |
21 | * |
22 | * @category Pop |
23 | * @package Pop\Queue |
24 | * @author Nick Sagona, III <dev@nolainteractive.com> |
25 | * @copyright Copyright (c) 2009-2023 NOLA Interactive, LLC. (http://www.nolainteractive.com) |
26 | * @license http://www.popphp.org/license New BSD License |
27 | * @version 1.2.0 |
28 | */ |
29 | class Redis extends AbstractAdapter |
30 | { |
31 | |
32 | /** |
33 | * Redis object |
34 | * @var \Redis |
35 | */ |
36 | protected $redis = null; |
37 | |
38 | /** |
39 | * Constructor |
40 | * |
41 | * Instantiate the redis queue object |
42 | * |
43 | * @param string $host |
44 | * @param int $port |
45 | * @throws Exception |
46 | */ |
47 | public function __construct($host = 'localhost', $port = 6379) |
48 | { |
49 | if (!class_exists('Redis', false)) { |
50 | throw new Exception('Error: Redis is not available.'); |
51 | } |
52 | |
53 | $this->redis = new \Redis(); |
54 | if (!$this->redis->connect($host, (int)$port)) { |
55 | throw new Exception('Error: Unable to connect to the redis server.'); |
56 | } |
57 | } |
58 | |
59 | /** |
60 | * Check if queue stack has job |
61 | * |
62 | * @param mixed $jobId |
63 | * @return boolean |
64 | */ |
65 | public function hasJob($jobId) |
66 | { |
67 | return ($this->redis->get('pop-queue-' . $jobId) !== false); |
68 | } |
69 | |
70 | /** |
71 | * Get job from queue stack by job ID |
72 | * |
73 | * @param mixed $jobId |
74 | * @param boolean $unserialize |
75 | * @return array |
76 | */ |
77 | public function getJob($jobId, $unserialize = true) |
78 | { |
79 | $job = $this->redis->get('pop-queue-' . $jobId); |
80 | if ($job !== false) { |
81 | $job = unserialize($job); |
82 | $jobPayload = $this->redis->get('pop-queue-' . $jobId . '-payload'); |
83 | if ($jobPayload !== false) { |
84 | $job['payload'] = ($unserialize) ? unserialize($jobPayload) : $jobPayload; |
85 | } |
86 | } |
87 | |
88 | return $job; |
89 | } |
90 | |
91 | /** |
92 | * Update job from queue stack by job ID |
93 | * |
94 | * @param mixed $jobId |
95 | * @param mixed $completed |
96 | * @param mixed $increment |
97 | * @return void |
98 | */ |
99 | public function updateJob($jobId, $completed = false, $increment = false) |
100 | { |
101 | $jobData = $this->getJob($jobId); |
102 | |
103 | if ($jobData !== false) { |
104 | if ($completed !== false) { |
105 | $jobData['completed'] = ($completed === true) ? date('Y-m-d H:i:s') : $completed; |
106 | $queueJobs = $this->redis->get('pop-queue-' . $jobData['queue']); |
107 | $queueJobs = ($queueJobs !== false) ? unserialize($queueJobs) : []; |
108 | $queueCompletedJobs = $this->redis->get('pop-queue-' . $jobData['queue'] . '-completed'); |
109 | $queueCompletedJobs = ($queueCompletedJobs !== false) ? unserialize($queueCompletedJobs) : []; |
110 | |
111 | if (in_array($jobId, $queueJobs)) { |
112 | unset($queueJobs[array_search($jobId, $queueJobs)]); |
113 | $queueJobs = array_values($queueJobs); |
114 | } |
115 | if (!in_array($jobId, $queueCompletedJobs)) { |
116 | $queueCompletedJobs[] = $jobId; |
117 | } |
118 | |
119 | $this->redis->set('pop-queue-' . $jobData['queue'], serialize($queueJobs)); |
120 | $this->redis->set('pop-queue-' . $jobData['queue'] . '-completed', serialize($queueCompletedJobs)); |
121 | } |
122 | if ($increment !== false) { |
123 | if (($increment === true) && isset($jobData['attempts'])) { |
124 | $jobData['attempts']++; |
125 | } else { |
126 | $jobData['attempts'] = (int)$increment; |
127 | } |
128 | } |
129 | |
130 | if (isset($jobData['payload'])) { |
131 | unset($jobData['payload']); |
132 | } |
133 | |
134 | $this->redis->set('pop-queue-' . $jobId, serialize($jobData)); |
135 | } |
136 | } |
137 | |
138 | /** |
139 | * Check if queue has jobs |
140 | * |
141 | * @param mixed $queue |
142 | * @return boolean |
143 | */ |
144 | public function hasJobs($queue) |
145 | { |
146 | $queueName = ($queue instanceof Queue) ? $queue->getName() : $queue; |
147 | $queueJobs = $this->redis->get('pop-queue-' . $queueName); |
148 | |
149 | if ($queueJobs !== false) { |
150 | $queueJobs = unserialize($queueJobs); |
151 | if (!empty($queueJobs)) { |
152 | foreach ($queueJobs as $jobId) { |
153 | if ($this->hasJob($jobId)) { |
154 | return true; |
155 | } |
156 | } |
157 | } |
158 | return false; |
159 | } else { |
160 | return false; |
161 | } |
162 | } |
163 | |
164 | /** |
165 | * Get queue jobs |
166 | * |
167 | * @param mixed $queue |
168 | * @param boolean $unserialize |
169 | * @return array |
170 | */ |
171 | public function getJobs($queue, $unserialize = true) |
172 | { |
173 | $queueName = ($queue instanceof Queue) ? $queue->getName() : $queue; |
174 | $queueJobs = $this->redis->get('pop-queue-' . $queueName); |
175 | $jobs = []; |
176 | |
177 | if ($queueJobs !== false) { |
178 | $queueJobs = unserialize($queueJobs); |
179 | foreach ($queueJobs as $jobId) { |
180 | $jobs[$jobId] = $this->getJob($jobId, $unserialize); |
181 | } |
182 | } |
183 | |
184 | return $jobs; |
185 | } |
186 | |
187 | /** |
188 | * Check if queue stack has completed job |
189 | * |
190 | * @param mixed $jobId |
191 | * @return boolean |
192 | */ |
193 | public function hasCompletedJob($jobId) |
194 | { |
195 | $job = $this->getJob($jobId, false); |
196 | |
197 | if ($job !== false) { |
198 | $queueCompletedJobs = $this->redis->get('pop-queue-' . $job['queue'] . '-completed'); |
199 | if ($queueCompletedJobs !== false) { |
200 | $queueCompletedJobs = unserialize($queueCompletedJobs); |
201 | return (in_array($jobId, $queueCompletedJobs) && !empty($job['completed'])); |
202 | } else { |
203 | return false; |
204 | } |
205 | } else { |
206 | return false; |
207 | } |
208 | } |
209 | |
210 | /** |
211 | * Check if queue has completed jobs |
212 | * |
213 | * @param mixed $queue |
214 | * @return boolean |
215 | */ |
216 | public function hasCompletedJobs($queue) |
217 | { |
218 | $queueName = ($queue instanceof Queue) ? $queue->getName() : $queue; |
219 | $queueCompletedJobs = $this->redis->get('pop-queue-' . $queueName . '-completed'); |
220 | |
221 | if ($queueCompletedJobs !== false) { |
222 | $queueCompletedJobs = unserialize($queueCompletedJobs); |
223 | return (count($queueCompletedJobs) > 0); |
224 | } else { |
225 | return false; |
226 | } |
227 | } |
228 | |
229 | /** |
230 | * Get queue completed job |
231 | * |
232 | * @param mixed $jobId |
233 | * @param boolean $unserialize |
234 | * @return array |
235 | */ |
236 | public function getCompletedJob($jobId, $unserialize = true) |
237 | { |
238 | if ($this->hasCompletedJob($jobId)) { |
239 | return $this->getJob($jobId, $unserialize); |
240 | } else { |
241 | return null; |
242 | } |
243 | } |
244 | |
245 | /** |
246 | * Get queue completed jobs |
247 | * |
248 | * @param mixed $queue |
249 | * @param boolean $unserialize |
250 | * @return array |
251 | */ |
252 | public function getCompletedJobs($queue, $unserialize = true) |
253 | { |
254 | $queueName = ($queue instanceof Queue) ? $queue->getName() : $queue; |
255 | $queueCompletedJobs = $this->redis->get('pop-queue-' . $queueName . '-completed'); |
256 | $completedJobs = []; |
257 | |
258 | if ($queueCompletedJobs !== false) { |
259 | $queueCompletedJobs = unserialize($queueCompletedJobs); |
260 | foreach ($queueCompletedJobs as $jobId) { |
261 | $completedJobs[$jobId] = $this->getJob($jobId, $unserialize); |
262 | } |
263 | } |
264 | |
265 | return $completedJobs; |
266 | } |
267 | |
268 | /** |
269 | * Check if queue stack has failed job |
270 | * |
271 | * @param mixed $jobId |
272 | * @return boolean |
273 | */ |
274 | public function hasFailedJob($jobId) |
275 | { |
276 | return ($this->redis->get('pop-queue-' . $jobId . '-failed') !== false); |
277 | } |
278 | |
279 | /** |
280 | * Get failed job from queue stack by job ID |
281 | * |
282 | * @param mixed $jobId |
283 | * @param boolean $unserialize |
284 | * @return array |
285 | */ |
286 | public function getFailedJob($jobId, $unserialize = true) |
287 | { |
288 | $job = $this->redis->get('pop-queue-' . $jobId . '-failed'); |
289 | if ($job !== false) { |
290 | $job = unserialize($job); |
291 | $jobPayload = $this->redis->get('pop-queue-' . $jobId . '-payload'); |
292 | if ($jobPayload !== false) { |
293 | $job['payload'] = ($unserialize) ? unserialize($jobPayload) : $jobPayload; |
294 | } |
295 | } |
296 | return $job; |
297 | } |
298 | |
299 | /** |
300 | * Check if queue adapter has failed jobs |
301 | * |
302 | * @param mixed $queue |
303 | * @return boolean |
304 | */ |
305 | public function hasFailedJobs($queue) |
306 | { |
307 | $queueName = ($queue instanceof Queue) ? $queue->getName() : $queue; |
308 | $queueFailedJobs = $this->redis->get('pop-queue-' . $queueName . '-failed'); |
309 | |
310 | if ($queueFailedJobs !== false) { |
311 | $queueFailedJobs = unserialize($queueFailedJobs); |
312 | return (count($queueFailedJobs) > 0); |
313 | } else { |
314 | return false; |
315 | } |
316 | } |
317 | |
318 | /** |
319 | * Get queue jobs |
320 | * |
321 | * @param mixed $queue |
322 | * @param boolean $unserialize |
323 | * @return array |
324 | */ |
325 | public function getFailedJobs($queue, $unserialize = true) |
326 | { |
327 | $queueName = ($queue instanceof Queue) ? $queue->getName() : $queue; |
328 | $queueFailedJobs = $this->redis->get('pop-queue-' . $queueName . '-failed'); |
329 | $failedJobs = []; |
330 | |
331 | if ($queueFailedJobs !== false) { |
332 | $queueFailedJobs = unserialize($queueFailedJobs); |
333 | foreach ($queueFailedJobs as $jobId) { |
334 | $failedJobs[$jobId] = $this->getFailedJob($jobId, $unserialize); |
335 | } |
336 | } |
337 | |
338 | return $failedJobs; |
339 | } |
340 | |
341 | /** |
342 | * Push job onto queue stack |
343 | * |
344 | * @param mixed $queue |
345 | * @param mixed $job |
346 | * @param mixed $priority |
347 | * @return string |
348 | */ |
349 | public function push($queue, $job, $priority = null) |
350 | { |
351 | $queueName = ($queue instanceof Queue) ? $queue->getName() : $queue; |
352 | $jobId = null; |
353 | |
354 | if ($job instanceof Jobs\Schedule) { |
355 | $jobId = ($job->getJob()->hasJobId()) ? $job->getJob()->getJobId() :$job->getJob()->generateJobId(); |
356 | } else if ($job instanceof Jobs\Job) { |
357 | $jobId = ($job->hasJobId()) ? $job->getJobId() : $job->generateJobId(); |
358 | } |
359 | |
360 | $queueJobs = $this->redis->get('pop-queue-' . $queueName); |
361 | $queueJobsCompleted = $this->redis->get('pop-queue-' . $queueName . '-completed'); |
362 | $queueJobsFailed = $this->redis->get('pop-queue-' . $queueName . '-failed'); |
363 | |
364 | $queueJobs = ($queueJobs !== false) ? unserialize($queueJobs) : []; |
365 | $queueJobsCompleted = ($queueJobsCompleted !== false) ? unserialize($queueJobsCompleted) : []; |
366 | $queueJobsFailed = ($queueJobsFailed !== false) ? unserialize($queueJobsFailed) : []; |
367 | |
368 | if (!in_array($jobId, $queueJobs)) { |
369 | $queueJobs[] = $jobId; |
370 | } |
371 | |
372 | $jobData = [ |
373 | 'job_id' => $jobId, |
374 | 'queue' => $queueName, |
375 | 'priority' => $priority, |
376 | 'attempts' => 0, |
377 | 'completed' => null |
378 | ]; |
379 | |
380 | $this->redis->set('pop-queue-' . $queueName, serialize($queueJobs)); |
381 | $this->redis->set('pop-queue-' . $queueName . '-completed', serialize($queueJobsCompleted)); |
382 | $this->redis->set('pop-queue-' . $queueName . '-failed', serialize($queueJobsFailed)); |
383 | $this->redis->set('pop-queue-' . $jobId, serialize($jobData)); |
384 | $this->redis->set('pop-queue-' . $jobId . '-payload', serialize(clone $job)); |
385 | |
386 | return $jobId; |
387 | } |
388 | |
389 | /** |
390 | * Move failed job to failed queue stack |
391 | * |
392 | * @param mixed $queue |
393 | * @param mixed $jobId |
394 | * @param \Exception $exception |
395 | * @return void |
396 | */ |
397 | public function failed($queue, $jobId, \Exception $exception = null) |
398 | { |
399 | $queueName = ($queue instanceof Queue) ? $queue->getName() : $queue; |
400 | |
401 | $failedJobData = [ |
402 | 'job_id' => $jobId, |
403 | 'queue' => $queueName, |
404 | 'exception' => (null !== $exception) ? $exception->getMessage() : null, |
405 | 'failed' => date('Y-m-d H:i:s') |
406 | ]; |
407 | |
408 | $queueJobsFailed = $this->redis->get('pop-queue-' . $queueName . '-failed'); |
409 | $queueJobsFailed = ($queueJobsFailed !== false) ? unserialize($queueJobsFailed) : []; |
410 | |
411 | if (!in_array($jobId, $queueJobsFailed)) { |
412 | $queueJobsFailed[] = $jobId; |
413 | } |
414 | |
415 | $this->redis->set('pop-queue-' . $jobId . '-failed', serialize($failedJobData)); |
416 | $this->redis->set('pop-queue-' . $queueName . '-failed', serialize($queueJobsFailed)); |
417 | |
418 | if (!empty($jobId)) { |
419 | $this->pop($jobId); |
420 | } |
421 | } |
422 | |
423 | /** |
424 | * Pop job off of queue stack |
425 | * |
426 | * @param mixed $jobId |
427 | * @return void |
428 | */ |
429 | public function pop($jobId) |
430 | { |
431 | $jobData = $this->getJob($jobId); |
432 | |
433 | if ($jobData !== false) { |
434 | $queueJobs = $this->redis->get('pop-queue-' . $jobData['queue']); |
435 | $queueJobs = ($queueJobs !== false) ? unserialize($queueJobs) : []; |
436 | |
437 | if (in_array($jobId, $queueJobs)) { |
438 | unset($queueJobs[array_search($jobId, $queueJobs)]); |
439 | $queueJobs = array_values($queueJobs); |
440 | } |
441 | |
442 | $this->redis->set('pop-queue-' . $jobData['queue'], serialize($queueJobs)); |
443 | } |
444 | |
445 | $this->redis->del('pop-queue-' . $jobId); |
446 | } |
447 | |
448 | /** |
449 | * Clear jobs off of the queue stack |
450 | * |
451 | * @param mixed $queue |
452 | * @param boolean $all |
453 | * @return void |
454 | */ |
455 | public function clear($queue, $all = false) |
456 | { |
457 | $queueName = ($queue instanceof Queue) ? $queue->getName() : $queue; |
458 | $queueJobs = $this->redis->get('pop-queue-' . $queueName); |
459 | |
460 | if ($queueJobs !== false) { |
461 | $queueJobs = unserialize($queueJobs); |
462 | foreach ($queueJobs as $jobId) { |
463 | $this->redis->del('pop-queue-' . $jobId); |
464 | } |
465 | } |
466 | |
467 | if ($all) { |
468 | $this->redis->del('pop-queue-' . $queueName . '-completed'); |
469 | } |
470 | } |
471 | |
472 | /** |
473 | * Clear failed jobs off of the queue stack |
474 | * |
475 | * @param mixed $queue |
476 | * @return void |
477 | */ |
478 | public function clearFailed($queue) |
479 | { |
480 | $queueName = ($queue instanceof Queue) ? $queue->getName() : $queue; |
481 | $queueFailedJobs = $this->redis->get('pop-queue-' . $queueName . '-failed'); |
482 | |
483 | if ($queueFailedJobs !== false) { |
484 | $queueFailedJobs = unserialize($queueFailedJobs); |
485 | foreach ($queueFailedJobs as $failedJobId) { |
486 | $this->redis->del('pop-queue-' . $failedJobId . '-failed'); |
487 | } |
488 | } |
489 | |
490 | $this->redis->del('pop-queue-' . $queueName . '-failed'); |
491 | } |
492 | |
493 | /** |
494 | * Flush all jobs off of the queue stack |
495 | * |
496 | * @param boolean $all |
497 | * @return void |
498 | */ |
499 | public function flush($all = false) |
500 | { |
501 | $keys = $this->redis->keys('pop-queue-*'); |
502 | $keys = array_filter($keys, function($value) { |
503 | return (strpos($value, 'failed') === false); |
504 | }); |
505 | if ($all) { |
506 | $this->redis->del($keys); |
507 | } |
508 | } |
509 | |
510 | /** |
511 | * Flush all failed jobs off of the queue stack |
512 | * |
513 | * @return void |
514 | */ |
515 | public function flushFailed() |
516 | { |
517 | $keys = $this->redis->keys('pop-queue-*'); |
518 | $keys = array_filter($keys, function($value) { |
519 | return (strpos($value, 'failed') !== false); |
520 | }); |
521 | $this->redis->del($keys); |
522 | } |
523 | |
524 | /** |
525 | * Flush all pop queue items |
526 | * |
527 | * @return void |
528 | */ |
529 | public function flushAll() |
530 | { |
531 | $this->redis->del($this->redis->keys('pop-queue-*')); |
532 | } |
533 | |
534 | /** |
535 | * Get the redis object. |
536 | * |
537 | * @return \Redis |
538 | */ |
539 | public function redis() |
540 | { |
541 | return $this->redis; |
542 | } |
543 | |
544 | } |