Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
97.98% |
97 / 99 |
|
96.00% |
24 / 25 |
CRAP | |
0.00% |
0 / 1 |
Redis | |
97.98% |
97 / 99 |
|
96.00% |
24 / 25 |
53 | |
0.00% |
0 / 1 |
__construct | |
71.43% |
5 / 7 |
|
0.00% |
0 / 1 |
3.21 | |||
create | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getRedis | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
redis | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getPrefix | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getStart | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getEnd | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getStatus | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
push | |
100.00% |
8 / 8 |
|
100.00% |
1 / 1 |
7 | |||
pop | |
100.00% |
14 / 14 |
|
100.00% |
1 / 1 |
5 | |||
hasJobs | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
hasFailedJob | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getFailedJob | |
100.00% |
6 / 6 |
|
100.00% |
1 / 1 |
3 | |||
hasFailedJobs | |
100.00% |
8 / 8 |
|
100.00% |
1 / 1 |
4 | |||
getFailedJobs | |
100.00% |
7 / 7 |
|
100.00% |
1 / 1 |
4 | |||
clearFailed | |
100.00% |
7 / 7 |
|
100.00% |
1 / 1 |
4 | |||
schedule | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
2 | |||
getTasks | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
1 | |||
getTask | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
2 | |||
updateTask | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
2 | |||
removeTask | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
getTaskCount | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
1 | |||
hasTasks | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
1 | |||
clearTasks | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
2 | |||
clear | |
100.00% |
6 / 6 |
|
100.00% |
1 / 1 |
2 |
1 | <?php |
2 | /** |
3 | * Pop PHP Framework (http://www.popphp.org/) |
4 | * |
5 | * @link https://github.com/popphp/popphp-framework |
6 | * @author Nick Sagona, III <dev@nolainteractive.com> |
7 | * @copyright Copyright (c) 2009-2024 NOLA Interactive, LLC. (http://www.nolainteractive.com) |
8 | * @license http://www.popphp.org/license New BSD License |
9 | */ |
10 | |
11 | /** |
12 | * @namespace |
13 | */ |
14 | namespace Pop\Queue\Adapter; |
15 | |
16 | use Pop\Queue\Process\AbstractJob; |
17 | use Pop\Queue\Process\Task; |
18 | |
19 | /** |
20 | * Redis adapter class |
21 | * |
22 | * @category Pop |
23 | * @package Pop\Queue |
24 | * @author Nick Sagona, III <dev@nolainteractive.com> |
25 | * @copyright Copyright (c) 2009-2024 NOLA Interactive, LLC. (http://www.nolainteractive.com) |
26 | * @license http://www.popphp.org/license New BSD License |
27 | * @version 2.0.0 |
28 | */ |
29 | class Redis extends AbstractTaskAdapter |
30 | { |
31 | |
32 | /** |
33 | * Redis object |
34 | * @var \Redis|null |
35 | */ |
36 | protected \Redis|null $redis = null; |
37 | |
38 | |
39 | /** |
40 | * Queue prefix |
41 | * @var string |
42 | */ |
43 | protected string $prefix = 'pop-queue'; |
44 | |
45 | /** |
46 | * Constructor |
47 | * |
48 | * Instantiate the redis adapter |
49 | * |
50 | * @param string $host |
51 | * @param int|string $port |
52 | * @param string $prefix |
53 | * @param ?string $priority |
54 | * @throws Exception|\RedisException |
55 | */ |
56 | public function __construct( |
57 | string $host = 'localhost', int|string $port = 6379, string $prefix = 'pop-queue', ?string $priority = null |
58 | ) |
59 | { |
60 | if (!class_exists('Redis', false)) { |
61 | throw new Exception('Error: Redis is not available.'); |
62 | } |
63 | |
64 | $this->redis = new \Redis(); |
65 | $this->prefix = $prefix; |
66 | if (!$this->redis->connect($host, (int)$port)) { |
67 | throw new Exception('Error: Unable to connect to the redis server.'); |
68 | } |
69 | |
70 | parent::__construct($priority); |
71 | } |
72 | |
73 | /** |
74 | * Create Redis adapter |
75 | * |
76 | * @param string $host |
77 | * @param int|string $port |
78 | * @param string $prefix |
79 | * @param ?string $priority |
80 | * @throws Exception|\RedisException |
81 | * @return Redis |
82 | */ |
83 | public static function create( |
84 | string $host = 'localhost', int|string $port = 6379, string $prefix = 'pop-queue', ?string $priority = null |
85 | ): Redis |
86 | { |
87 | return new self($host, $port, $prefix, $priority); |
88 | } |
89 | |
90 | /** |
91 | * Get Redis object |
92 | * |
93 | * @return \Redis|null |
94 | */ |
95 | public function getRedis(): \Redis|null |
96 | { |
97 | return $this->redis; |
98 | } |
99 | |
100 | /** |
101 | * Get Redis object (alias) |
102 | * |
103 | * @return \Redis|null |
104 | */ |
105 | public function redis(): \Redis|null |
106 | { |
107 | return $this->redis; |
108 | } |
109 | |
110 | /** |
111 | * Get prefix |
112 | * |
113 | * @return string |
114 | */ |
115 | public function getPrefix(): string |
116 | { |
117 | return $this->prefix; |
118 | } |
119 | |
120 | /** |
121 | * Get queue start index |
122 | * |
123 | * @return int |
124 | */ |
125 | public function getStart(): int |
126 | { |
127 | return 0; |
128 | } |
129 | |
130 | /** |
131 | * Get queue length |
132 | * |
133 | * @return int |
134 | */ |
135 | public function getEnd(): int |
136 | { |
137 | return $this->redis->lLen($this->prefix); |
138 | } |
139 | |
140 | /** |
141 | * Get queue job status |
142 | * |
143 | * @param int $index |
144 | * @return int |
145 | */ |
146 | public function getStatus(int $index): int |
147 | { |
148 | return (int)$this->redis->lIndex($this->prefix . ':status', $index); |
149 | } |
150 | |
151 | /** |
152 | * Push job on to queue |
153 | * |
154 | * @param AbstractJob $job |
155 | * @return Redis |
156 | */ |
157 | public function push(AbstractJob $job): Redis |
158 | { |
159 | $status = ($job->hasFailed()) ? 2 : 1; |
160 | if ($job->isValid()) { |
161 | if (($job->hasFailed()) && ($this->isFilo())) { |
162 | if (($this->redis->rPush($this->prefix, serialize(clone $job))) !== false) { |
163 | $this->redis->rPush($this->prefix . ':status', $status); |
164 | } |
165 | } else { |
166 | if (($this->redis->lPush($this->prefix, serialize(clone $job))) !== false) { |
167 | $this->redis->lPush($this->prefix . ':status', $status); |
168 | } |
169 | } |
170 | } |
171 | |
172 | return $this; |
173 | } |
174 | |
175 | /** |
176 | * Pop job off of queue |
177 | * |
178 | * @return ?AbstractJob |
179 | */ |
180 | public function pop(): ?AbstractJob |
181 | { |
182 | $job = false; |
183 | $length = $this->getEnd(); |
184 | |
185 | if ($this->isFilo()) { |
186 | $status = $this->getStatus(0); |
187 | if ($status != 0) { |
188 | $this->redis->lSet($this->prefix . ':status', 0, 0); |
189 | $job = $this->redis->lPop($this->prefix); |
190 | $this->redis->lPop($this->prefix . ':status'); |
191 | } |
192 | } else { |
193 | $status = $this->getStatus($length - 1); |
194 | if ($status != 0) { |
195 | $this->redis->lSet($this->prefix . ':status', $length - 1, 0); |
196 | $job = $this->redis->rPop($this->prefix); |
197 | $this->redis->rPop($this->prefix . ':status'); |
198 | } |
199 | } |
200 | |
201 | return ($job !== false) ? unserialize($job) : null; |
202 | } |
203 | |
204 | /** |
205 | * Check if adapter has jobs |
206 | * |
207 | * @return bool |
208 | */ |
209 | public function hasJobs(): bool |
210 | { |
211 | return ($this->redis->lLen($this->prefix) > 0); |
212 | } |
213 | |
214 | /** |
215 | * Check if adapter has failed job |
216 | * |
217 | * @param int $index |
218 | * @return bool |
219 | */ |
220 | public function hasFailedJob(int $index): bool |
221 | { |
222 | return ($this->getStatus($index) == 2); |
223 | } |
224 | |
225 | /** |
226 | * Get failed job |
227 | * |
228 | * @param int $index |
229 | * @param bool $unserialize |
230 | * @return mixed |
231 | */ |
232 | public function getFailedJob(int $index, bool $unserialize = true): mixed |
233 | { |
234 | $job = null; |
235 | |
236 | if ($this->getStatus($index) == 2) { |
237 | $job = $this->redis->lIndex($this->prefix, $index); |
238 | if ($unserialize) { |
239 | $job = unserialize($job); |
240 | } |
241 | } |
242 | |
243 | return $job; |
244 | } |
245 | |
246 | /** |
247 | * Check if adapter has failed jobs |
248 | * |
249 | * @return bool |
250 | */ |
251 | public function hasFailedJobs(): bool |
252 | { |
253 | $result = false; |
254 | $length = $this->redis->lLen($this->prefix); |
255 | |
256 | if ($length > 0) { |
257 | for ($i = 0; $i < $length; $i++) { |
258 | if ($this->getStatus($i) == 2) { |
259 | $result = true; |
260 | break; |
261 | } |
262 | } |
263 | } |
264 | |
265 | return $result; |
266 | } |
267 | |
268 | /** |
269 | * Get adapter failed jobs |
270 | * |
271 | * @param bool $unserialize |
272 | * @return array |
273 | */ |
274 | public function getFailedJobs(bool $unserialize = true): array |
275 | { |
276 | $jobs = []; |
277 | $length = $this->redis->lLen($this->prefix); |
278 | |
279 | if ($length > 0) { |
280 | for ($i = 0; $i < $length; $i++) { |
281 | if ($this->getStatus($i) == 2) { |
282 | $jobs[$i] = $this->getFailedJob($i, $unserialize); |
283 | } |
284 | } |
285 | } |
286 | |
287 | return $jobs; |
288 | } |
289 | |
290 | /** |
291 | * Clear failed jobs out of the queue |
292 | * |
293 | * @return Redis |
294 | */ |
295 | public function clearFailed(): Redis |
296 | { |
297 | $length = $this->redis->lLen($this->prefix); |
298 | |
299 | if ($length > 0) { |
300 | for ($i = 0; $i < $length; $i++) { |
301 | if ($this->getStatus($i) == 2) { |
302 | $this->redis->lRem($this->prefix, $this->redis->lIndex($this->prefix, $i)); |
303 | $this->redis->lRem($this->prefix . ':status', $this->redis->lIndex($this->prefix . ':status', $i)); |
304 | } |
305 | } |
306 | } |
307 | return $this; |
308 | } |
309 | |
310 | /** |
311 | * Push job on to queue |
312 | * |
313 | * @param Task $task |
314 | * @return Redis |
315 | */ |
316 | public function schedule(Task $task): Redis |
317 | { |
318 | if ($task->isValid()) { |
319 | $this->redis->set($this->prefix . ':task-' . $task->getJobId(), serialize(clone $task)); |
320 | } |
321 | return $this; |
322 | } |
323 | |
324 | /** |
325 | * Get scheduled tasks |
326 | * |
327 | * @return array |
328 | */ |
329 | public function getTasks(): array |
330 | { |
331 | $taskIds = $this->redis->keys($this->prefix . ':task-*'); |
332 | return array_map(function($value) { |
333 | return substr($value, (strpos($value, ':task-') + 6)); |
334 | }, $taskIds); |
335 | } |
336 | |
337 | /** |
338 | * Get scheduled task |
339 | * |
340 | * @param string $taskId |
341 | * @return ?Task |
342 | */ |
343 | public function getTask(string $taskId): ?Task |
344 | { |
345 | $task = $this->redis->get($this->prefix . ':task-' . $taskId); |
346 | return ($task !== false) ? unserialize($task) : null; |
347 | } |
348 | |
349 | /** |
350 | * Update scheduled task |
351 | * |
352 | * @param Task $task |
353 | * @return Redis |
354 | */ |
355 | public function updateTask(Task $task): Redis |
356 | { |
357 | if ($task->isValid()) { |
358 | $this->redis->set($this->prefix . ':task-' . $task->getJobId(), serialize(clone $task)); |
359 | } else { |
360 | $this->removeTask($task->getJobId()); |
361 | } |
362 | return $this; |
363 | } |
364 | |
365 | /** |
366 | * Remove scheduled task |
367 | * |
368 | * @param string $taskId |
369 | * @return Redis |
370 | */ |
371 | public function removeTask(string $taskId): Redis |
372 | { |
373 | $this->redis->del($this->prefix . ':task-' . $taskId); |
374 | return $this; |
375 | } |
376 | |
377 | /** |
378 | * Get scheduled tasks count |
379 | * |
380 | * @return int |
381 | */ |
382 | public function getTaskCount(): int |
383 | { |
384 | $taskIds = $this->redis->keys($this->prefix . ':task-*'); |
385 | return count(array_map(function($value) { |
386 | return substr($value, (strpos($value, ':task-') + 6)); |
387 | }, $taskIds)); |
388 | } |
389 | |
390 | /** |
391 | * Has scheduled tasks |
392 | * |
393 | * @return bool |
394 | */ |
395 | public function hasTasks(): bool |
396 | { |
397 | $taskIds = $this->redis->keys($this->prefix . ':task-*'); |
398 | return !empty(array_map(function($value) { |
399 | return substr($value, (strpos($value, ':task-') + 6)); |
400 | }, $taskIds)); |
401 | } |
402 | |
403 | /** |
404 | * Clear all scheduled task |
405 | * |
406 | * @return Redis |
407 | */ |
408 | public function clearTasks(): Redis |
409 | { |
410 | $taskIds = $this->getTasks(); |
411 | |
412 | foreach ($taskIds as $taskId) { |
413 | $this->removeTask($taskId); |
414 | } |
415 | return $this; |
416 | } |
417 | |
418 | |
419 | /** |
420 | * Clear jobs out of queue |
421 | * |
422 | * @return Redis |
423 | */ |
424 | public function clear(): Redis |
425 | { |
426 | $taskIds = $this->redis->keys($this->prefix . ':task-*'); |
427 | foreach ($taskIds as $taskId) { |
428 | $this->redis->del($taskId); |
429 | } |
430 | |
431 | $this->redis->del($this->prefix . ':status'); |
432 | $this->redis->del($this->prefix); |
433 | |
434 | return $this; |
435 | } |
436 | |
437 | } |