Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
97.98% covered (success)
97.98%
97 / 99
96.00% covered (success)
96.00%
24 / 25
CRAP
0.00% covered (danger)
0.00%
0 / 1
Redis
97.98% covered (success)
97.98%
97 / 99
96.00% covered (success)
96.00%
24 / 25
53
0.00% covered (danger)
0.00%
0 / 1
 __construct
71.43% covered (success)
71.43%
5 / 7
0.00% covered (danger)
0.00%
0 / 1
3.21
 create
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getRedis
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 redis
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getPrefix
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getStart
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getEnd
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getStatus
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 push
100.00% covered (success)
100.00%
8 / 8
100.00% covered (success)
100.00%
1 / 1
7
 pop
100.00% covered (success)
100.00%
14 / 14
100.00% covered (success)
100.00%
1 / 1
5
 hasJobs
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 hasFailedJob
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getFailedJob
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
3
 hasFailedJobs
100.00% covered (success)
100.00%
8 / 8
100.00% covered (success)
100.00%
1 / 1
4
 getFailedJobs
100.00% covered (success)
100.00%
7 / 7
100.00% covered (success)
100.00%
1 / 1
4
 clearFailed
100.00% covered (success)
100.00%
7 / 7
100.00% covered (success)
100.00%
1 / 1
4
 schedule
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
 getTasks
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
1
 getTask
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
2
 updateTask
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
2
 removeTask
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 getTaskCount
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
1
 hasTasks
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
1
 clearTasks
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
2
 clear
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
2
1<?php
2/**
3 * Pop PHP Framework (http://www.popphp.org/)
4 *
5 * @link       https://github.com/popphp/popphp-framework
6 * @author     Nick Sagona, III <dev@nolainteractive.com>
7 * @copyright  Copyright (c) 2009-2024 NOLA Interactive, LLC. (http://www.nolainteractive.com)
8 * @license    http://www.popphp.org/license     New BSD License
9 */
10
11/**
12 * @namespace
13 */
14namespace Pop\Queue\Adapter;
15
16use Pop\Queue\Process\AbstractJob;
17use Pop\Queue\Process\Task;
18
19/**
20 * Redis adapter class
21 *
22 * @category   Pop
23 * @package    Pop\Queue
24 * @author     Nick Sagona, III <dev@nolainteractive.com>
25 * @copyright  Copyright (c) 2009-2024 NOLA Interactive, LLC. (http://www.nolainteractive.com)
26 * @license    http://www.popphp.org/license     New BSD License
27 * @version    2.0.0
28 */
29class Redis extends AbstractTaskAdapter
30{
31
32    /**
33     * Redis object
34     * @var \Redis|null
35     */
36    protected \Redis|null $redis = null;
37
38
39    /**
40     * Queue prefix
41     * @var string
42     */
43    protected string $prefix = 'pop-queue';
44
45    /**
46     * Constructor
47     *
48     * Instantiate the redis adapter
49     *
50     * @param  string     $host
51     * @param  int|string $port
52     * @param  string     $prefix
53     * @param  ?string    $priority
54     * @throws Exception|\RedisException
55     */
56    public function __construct(
57        string $host = 'localhost', int|string $port = 6379, string $prefix = 'pop-queue', ?string $priority = null
58    )
59    {
60        if (!class_exists('Redis', false)) {
61            throw new Exception('Error: Redis is not available.');
62        }
63
64        $this->redis  = new \Redis();
65        $this->prefix = $prefix;
66        if (!$this->redis->connect($host, (int)$port)) {
67            throw new Exception('Error: Unable to connect to the redis server.');
68        }
69
70        parent::__construct($priority);
71    }
72
73    /**
74     * Create Redis adapter
75     *
76     * @param  string     $host
77     * @param  int|string $port
78     * @param  string     $prefix
79     * @param  ?string    $priority
80     * @throws Exception|\RedisException
81     * @return Redis
82     */
83    public static function create(
84        string $host = 'localhost', int|string $port = 6379, string $prefix = 'pop-queue', ?string $priority = null
85    ): Redis
86    {
87        return new self($host, $port, $prefix, $priority);
88    }
89
90    /**
91     * Get Redis object
92     *
93     * @return \Redis|null
94     */
95    public function getRedis(): \Redis|null
96    {
97        return $this->redis;
98    }
99
100    /**
101     * Get Redis object (alias)
102     *
103     * @return \Redis|null
104     */
105    public function redis(): \Redis|null
106    {
107        return $this->redis;
108    }
109
110    /**
111     * Get prefix
112     *
113     * @return string
114     */
115    public function getPrefix(): string
116    {
117        return $this->prefix;
118    }
119
120    /**
121     * Get queue start index
122     *
123     * @return int
124     */
125    public function getStart(): int
126    {
127        return 0;
128    }
129
130    /**
131     * Get queue length
132     *
133     * @return int
134     */
135    public function getEnd(): int
136    {
137        return $this->redis->lLen($this->prefix);
138    }
139
140    /**
141     * Get queue job status
142     *
143     * @param  int $index
144     * @return int
145     */
146    public function getStatus(int $index): int
147    {
148        return (int)$this->redis->lIndex($this->prefix . ':status', $index);
149    }
150
151    /**
152     * Push job on to queue
153     *
154     * @param  AbstractJob $job
155     * @return Redis
156     */
157    public function push(AbstractJob $job): Redis
158    {
159        $status = ($job->hasFailed()) ? 2 : 1;
160        if ($job->isValid()) {
161            if (($job->hasFailed()) && ($this->isFilo())) {
162                if (($this->redis->rPush($this->prefix, serialize(clone $job))) !== false) {
163                    $this->redis->rPush($this->prefix . ':status', $status);
164                }
165            } else {
166                if (($this->redis->lPush($this->prefix, serialize(clone $job))) !== false) {
167                    $this->redis->lPush($this->prefix . ':status', $status);
168                }
169            }
170        }
171
172        return $this;
173    }
174
175    /**
176     * Pop job off of queue
177     *
178     * @return ?AbstractJob
179     */
180    public function pop(): ?AbstractJob
181    {
182        $job    = false;
183        $length = $this->getEnd();
184
185        if ($this->isFilo()) {
186            $status = $this->getStatus(0);
187            if ($status != 0) {
188                $this->redis->lSet($this->prefix . ':status', 0, 0);
189                $job = $this->redis->lPop($this->prefix);
190                $this->redis->lPop($this->prefix . ':status');
191            }
192        } else {
193            $status = $this->getStatus($length - 1);
194            if ($status != 0) {
195                $this->redis->lSet($this->prefix . ':status', $length - 1, 0);
196                $job = $this->redis->rPop($this->prefix);
197                $this->redis->rPop($this->prefix . ':status');
198            }
199        }
200
201        return ($job !== false) ? unserialize($job) : null;
202    }
203
204    /**
205     * Check if adapter has jobs
206     *
207     * @return bool
208     */
209    public function hasJobs(): bool
210    {
211        return ($this->redis->lLen($this->prefix) > 0);
212    }
213
214    /**
215     * Check if adapter has failed job
216     *
217     * @param int $index
218     * @return bool
219     */
220    public function hasFailedJob(int $index): bool
221    {
222        return ($this->getStatus($index) == 2);
223    }
224
225    /**
226     * Get failed job
227     *
228     * @param  int  $index
229     * @param  bool $unserialize
230     * @return mixed
231     */
232    public function getFailedJob(int $index, bool $unserialize = true): mixed
233    {
234        $job = null;
235
236        if ($this->getStatus($index) == 2) {
237            $job = $this->redis->lIndex($this->prefix, $index);
238            if ($unserialize) {
239                $job = unserialize($job);
240            }
241        }
242
243        return $job;
244    }
245
246    /**
247     * Check if adapter has failed jobs
248     *
249     * @return bool
250     */
251    public function hasFailedJobs(): bool
252    {
253        $result = false;
254        $length = $this->redis->lLen($this->prefix);
255
256        if ($length > 0) {
257            for ($i = 0; $i < $length; $i++) {
258                if ($this->getStatus($i) == 2) {
259                    $result = true;
260                    break;
261                }
262            }
263        }
264
265        return $result;
266    }
267
268    /**
269     * Get adapter failed jobs
270     *
271     * @param  bool $unserialize
272     * @return array
273     */
274    public function getFailedJobs(bool $unserialize = true): array
275    {
276        $jobs   = [];
277        $length = $this->redis->lLen($this->prefix);
278
279        if ($length > 0) {
280            for ($i = 0; $i < $length; $i++) {
281                if ($this->getStatus($i) == 2) {
282                    $jobs[$i] = $this->getFailedJob($i, $unserialize);
283                }
284            }
285        }
286
287        return $jobs;
288    }
289
290    /**
291     * Clear failed jobs out of the queue
292     *
293     * @return Redis
294     */
295    public function clearFailed(): Redis
296    {
297        $length = $this->redis->lLen($this->prefix);
298
299        if ($length > 0) {
300            for ($i = 0; $i < $length; $i++) {
301                if ($this->getStatus($i) == 2) {
302                    $this->redis->lRem($this->prefix, $this->redis->lIndex($this->prefix, $i));
303                    $this->redis->lRem($this->prefix . ':status', $this->redis->lIndex($this->prefix . ':status', $i));
304                }
305            }
306        }
307        return $this;
308    }
309
310    /**
311     * Push job on to queue
312     *
313     * @param  Task $task
314     * @return Redis
315     */
316    public function schedule(Task $task): Redis
317    {
318        if ($task->isValid()) {
319            $this->redis->set($this->prefix . ':task-' . $task->getJobId(), serialize(clone $task));
320        }
321        return $this;
322    }
323
324    /**
325     * Get scheduled tasks
326     *
327     * @return array
328     */
329    public function getTasks(): array
330    {
331        $taskIds = $this->redis->keys($this->prefix . ':task-*');
332        return array_map(function($value) {
333            return substr($value, (strpos($value, ':task-') + 6));
334        }, $taskIds);
335    }
336
337    /**
338     * Get scheduled task
339     *
340     * @param  string $taskId
341     * @return ?Task
342     */
343    public function getTask(string $taskId): ?Task
344    {
345        $task = $this->redis->get($this->prefix . ':task-' . $taskId);
346        return ($task !== false) ? unserialize($task) : null;
347    }
348
349    /**
350     * Update scheduled task
351     *
352     * @param  Task $task
353     * @return Redis
354     */
355    public function updateTask(Task $task): Redis
356    {
357        if ($task->isValid()) {
358            $this->redis->set($this->prefix . ':task-' . $task->getJobId(), serialize(clone $task));
359        } else {
360            $this->removeTask($task->getJobId());
361        }
362        return $this;
363    }
364
365    /**
366     * Remove scheduled task
367     *
368     * @param  string $taskId
369     * @return Redis
370     */
371    public function removeTask(string $taskId): Redis
372    {
373        $this->redis->del($this->prefix . ':task-' . $taskId);
374        return $this;
375    }
376
377    /**
378     * Get scheduled tasks count
379     *
380     * @return int
381     */
382    public function getTaskCount(): int
383    {
384        $taskIds = $this->redis->keys($this->prefix . ':task-*');
385        return count(array_map(function($value) {
386            return substr($value, (strpos($value, ':task-') + 6));
387        }, $taskIds));
388    }
389
390    /**
391     * Has scheduled tasks
392     *
393     * @return bool
394     */
395    public function hasTasks(): bool
396    {
397        $taskIds = $this->redis->keys($this->prefix . ':task-*');
398        return !empty(array_map(function($value) {
399            return substr($value, (strpos($value, ':task-') + 6));
400        }, $taskIds));
401    }
402
403    /**
404     * Clear all scheduled task
405     *
406     * @return Redis
407     */
408    public function clearTasks(): Redis
409    {
410        $taskIds = $this->getTasks();
411
412        foreach ($taskIds as $taskId) {
413            $this->removeTask($taskId);
414        }
415        return $this;
416    }
417
418
419    /**
420     * Clear jobs out of queue
421     *
422     * @return Redis
423     */
424    public function clear(): Redis
425    {
426        $taskIds = $this->redis->keys($this->prefix . ':task-*');
427        foreach ($taskIds as $taskId) {
428            $this->redis->del($taskId);
429        }
430
431        $this->redis->del($this->prefix . ':status');
432        $this->redis->del($this->prefix);
433
434        return $this;
435    }
436
437}