mirror of
https://github.com/RaspbianFrance/raspisms.git
synced 2025-08-28 15:00:05 +02:00
Improve redis queue handling of error
This commit is contained in:
parent
08cbb0f12e
commit
9e2bd47e34
1 changed files with 19 additions and 11 deletions
|
@ -86,21 +86,29 @@ class RedisQueue implements Queue
|
|||
// Ignore error if the group already exists
|
||||
}
|
||||
|
||||
// Read a single message starting from the oldest (>)
|
||||
$messages = $this->redis->xReadGroup($this->group, $this->consumer, [$stream => '>'], 1);
|
||||
if (!count($messages))
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
// Find the message, acknowledge it and return it
|
||||
foreach ($messages as $stream_name => $entries)
|
||||
try
|
||||
{
|
||||
foreach ($entries as $message_id => $message)
|
||||
// Read a single message starting from the oldest (>)
|
||||
$messages = $this->redis->xReadGroup($this->group, $this->consumer, [$stream => '>'], 1);
|
||||
if (!count($messages))
|
||||
{
|
||||
$success = $this->redis->xAck($stream, $this->group, [$message_id]);
|
||||
return $message['message'];
|
||||
return null;
|
||||
}
|
||||
|
||||
// Find the message, acknowledge it and return it
|
||||
foreach ($messages as $stream_name => $entries)
|
||||
{
|
||||
foreach ($entries as $message_id => $message)
|
||||
{
|
||||
$success = $this->redis->xAck($stream, $this->group, [$message_id]);
|
||||
return $message['message'];
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception $e)
|
||||
{
|
||||
throw new \Exception('Redis server failed to answer !');
|
||||
}
|
||||
|
||||
return null;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue