From 36c5d7ec0c56a6dda0836a9e042141c976208860 Mon Sep 17 00:00:00 2001 From: osaajani <> Date: Thu, 31 Oct 2024 21:22:56 +0100 Subject: [PATCH] Add support for Redis in addition to System V queues --- controllers/internals/Mailer.php | 7 +- controllers/internals/Queue.php | 86 ++++++++++++++++++++++ controllers/internals/Webhook.php | 7 +- daemons/Mailer.php | 25 ++----- daemons/Phone.php | 28 +++----- daemons/Sender.php | 37 +++++----- daemons/Webhook.php | 41 ++++------- env.prod.php.dist | 7 +- models/Queue.php | 39 ++++++++++ models/RedisQueue.php | 108 ++++++++++++++++++++++++++++ models/SystemVQueue.php | 115 ++++++++++++++++++++++++++++++ 11 files changed, 407 insertions(+), 93 deletions(-) create mode 100644 controllers/internals/Queue.php create mode 100644 models/Queue.php create mode 100644 models/RedisQueue.php create mode 100644 models/SystemVQueue.php diff --git a/controllers/internals/Mailer.php b/controllers/internals/Mailer.php index 04d7204..7c8fa90 100644 --- a/controllers/internals/Mailer.php +++ b/controllers/internals/Mailer.php @@ -117,11 +117,10 @@ class Mailer extends \descartes\Controller 'attachments' => $attachments, ]; - $error_code = null; - $queue = msg_get_queue(QUEUE_ID_EMAIL); - $success = msg_send($queue, QUEUE_TYPE_EMAIL, $message, true, true, $error_code); + $queue = new Queue(QUEUE_ID_EMAIL); + $queue->push(json_encode($message), QUEUE_TYPE_EMAIL); - return (bool) $success; + return true; } /** diff --git a/controllers/internals/Queue.php b/controllers/internals/Queue.php new file mode 100644 index 0000000..2275cb9 --- /dev/null +++ b/controllers/internals/Queue.php @@ -0,0 +1,86 @@ + + * + * This source file is subject to the GPL-3.0 license that is bundled + * with this source code in the file LICENSE. + */ + +namespace controllers\internals; + +use Exception; +use models\RedisQueue; +use models\SystemVQueue; + +class Queue extends \descartes\InternalController +{ + private $queue; + + /** + * A class to interact with queue, the class is in charge to choose the type of queue (redis/system v) to use + */ + public function __construct($id) + { + if (USE_REDIS_QUEUES ?? false) + { + $params = []; + if (REDIS_HOST ?? false) + { + $params['host'] = REDIS_HOST; + } + + if (REDIS_PORT ?? false) + { + $params['port'] = REDIS_PORT; + } + + if (REDIS_PASSWORD ?? false) + { + $params['auth'] = REDIS_PASSWORD; + } + + $this->queue = new RedisQueue($id, $params, 'raspisms', 'raspisms'); + } + else + { + $this->queue = new SystemVQueue($id); + } + } + + /** + * Add a message to the queue + * + * @param string $message : The message to add to the queue + * @param ?string $tag : A tag to associate to the message for routing purposes, if null will add to general queue + */ + public function push($message, ?string $tag = null) + { + return $this->queue->push($message, $tag); + } + + /** + * Read the older message in the queue + * + * @return mixed $message : The oldest message or null if no message found, can be anything + * @param ?string $tag : A tag to associate to the message for routing purposes, if null will read from general queue + * @param mixed : The message to add to the queue, can be anything, the queue will have to treat it by itself + */ + public function read(?string $tag = null) + { + return $this->queue->read($tag); + } + + /** + * Function to close system V queue for cleaning resources, usefull only if system V queue + */ + public function close() + { + if ($this->queue instanceof SystemVQueue) + { + $this->queue->close(); + } + } +} diff --git a/controllers/internals/Webhook.php b/controllers/internals/Webhook.php index 6a2e8cf..fc9cd24 100644 --- a/controllers/internals/Webhook.php +++ b/controllers/internals/Webhook.php @@ -135,12 +135,11 @@ class Webhook extends StandardController ], ]; - $error_code = null; - $queue = msg_get_queue(QUEUE_ID_WEBHOOK); - msg_send($queue, QUEUE_TYPE_WEBHOOK, $message, true, true, $error_code); + $queue = new Queue(QUEUE_ID_WEBHOOK); + $success = $queue->push(json_encode($message), QUEUE_TYPE_WEBHOOK); } - return true; + return (bool) $success; } /** diff --git a/daemons/Mailer.php b/daemons/Mailer.php index a25b253..cc0cf34 100644 --- a/daemons/Mailer.php +++ b/daemons/Mailer.php @@ -11,6 +11,7 @@ namespace daemons; +use controllers\internals\Queue; use Monolog\Handler\StreamHandler; use Monolog\Logger; @@ -19,7 +20,7 @@ use Monolog\Logger; */ class Mailer extends AbstractDaemon { - private $mailer_queue; + private ?Queue $mailer_queue; private $last_message_at; private $bdd; @@ -49,27 +50,15 @@ class Mailer extends AbstractDaemon $find_message = true; while ($find_message) { - //Call message - $msgtype = null; - $maxsize = 409600; - $message = null; + $message = $this->mailer_queue->read(QUEUE_TYPE_EMAIL); - $error_code = null; - $success = msg_receive($this->mailer_queue, QUEUE_TYPE_EMAIL, $msgtype, $maxsize, $message, true, MSG_IPC_NOWAIT, $error_code); //MSG_IPC_NOWAIT == dont wait if no message found - if (!$success && MSG_ENOMSG !== $error_code) + if ($message === null) { - $this->logger->critical('Error for mailer queue reading, error code : ' . $error_code); $find_message = false; - continue; } - if (!$message) - { - $find_message = false; - - continue; - } + $message = json_decode($message, true); $this->logger->info('Try sending email : ' . json_encode($message)); @@ -92,7 +81,7 @@ class Mailer extends AbstractDaemon public function on_start() { //Set last message at to construct time - $this->mailer_queue = msg_get_queue(QUEUE_ID_EMAIL); + $this->mailer_queue = new Queue(QUEUE_ID_EMAIL); $this->logger->info('Starting Mailer daemon with pid ' . getmypid()); } @@ -101,8 +90,6 @@ class Mailer extends AbstractDaemon { //Delete queue on daemon close $this->logger->info('Closing queue : ' . QUEUE_ID_EMAIL); - msg_remove_queue($this->mailer_queue); - $this->logger->info('Stopping Mailer daemon with pid ' . getmypid()); } diff --git a/daemons/Phone.php b/daemons/Phone.php index 1348a9e..c792513 100644 --- a/daemons/Phone.php +++ b/daemons/Phone.php @@ -11,6 +11,7 @@ namespace daemons; +use controllers\internals\Queue; use Monolog\Handler\StreamHandler; use Monolog\Logger; @@ -22,7 +23,7 @@ class Phone extends AbstractDaemon private $max_inactivity = 5 * 60; private $read_delay = 20 / 0.5; private $read_tick = 0; - private $msg_queue; + private ?Queue $queue; private $webhook_queue; private $last_message_at; private $phone; @@ -85,7 +86,7 @@ class Phone extends AbstractDaemon //Set last message at to construct time $this->last_message_at = microtime(true); - $this->msg_queue = msg_get_queue(QUEUE_ID_PHONE); + $this->queue = new Queue(QUEUE_ID_PHONE); //Instanciate adapter $adapter_class = $this->phone['adapter']; @@ -96,7 +97,7 @@ class Phone extends AbstractDaemon public function on_stop() { - $this->logger->info('Stopping Phone daemon with pid ' . getmypid()); + $this->logger->info('Stopping Phone daemon with pid ' . getmypid()); } public function handle_other_signals($signal) @@ -114,30 +115,17 @@ class Phone extends AbstractDaemon $find_message = true; while ($find_message) { - //Call message - $msgtype = null; - $maxsize = 409600; - $message = null; - - // Message type is forged from a prefix concat with the phone ID $message_type = (int) QUEUE_TYPE_SEND_MSG_PREFIX . $this->phone['id']; - $error_code = null; - $success = msg_receive($this->msg_queue, $message_type, $msgtype, $maxsize, $message, true, MSG_IPC_NOWAIT, $error_code); //MSG_IPC_NOWAIT == dont wait if no message found + $message = $this->queue->read($message_type); - if (!$success && MSG_ENOMSG !== $error_code) - { - $this->logger->critical('Error reading MSG SEND Queue, error code : ' . $error_code); - - return false; - } - - if (!$message) + if ($message === null) { $find_message = false; - continue; } + $message = json_decode($message, true); + //Update last message time $this->last_message_at = microtime(true); diff --git a/daemons/Sender.php b/daemons/Sender.php index 9acb00a..d740323 100644 --- a/daemons/Sender.php +++ b/daemons/Sender.php @@ -11,6 +11,8 @@ namespace daemons; +use controllers\internals\Queue; +use Exception; use Monolog\Handler\StreamHandler; use Monolog\Logger; @@ -19,12 +21,9 @@ use Monolog\Logger; */ class Sender extends AbstractDaemon { - private $internal_phone; private $internal_scheduled; - private $internal_received; - private $internal_sended; private $bdd; - private $msg_queue; + private ?Queue $queue; public function __construct() { @@ -44,9 +43,7 @@ class Sender extends AbstractDaemon public function run() { - //Create the internal controllers $this->internal_scheduled = new \controllers\internals\Scheduled($this->bdd); - $this->internal_sended = new \controllers\internals\Sended($this->bdd); //Get smss and transmit order to send to appropriate phone daemon $smss_per_scheduled = $this->internal_scheduled->get_smss_to_send(); @@ -64,12 +61,6 @@ class Sender extends AbstractDaemon { foreach ($smss_per_scheduled as $id_scheduled => $smss) { - //If queue not already exists - if (!msg_queue_exists(QUEUE_ID_PHONE) || !isset($this->msg_queue)) - { - $this->msg_queue = msg_get_queue(QUEUE_ID_PHONE); - } - foreach ($smss as $sms) { $msg = [ @@ -84,9 +75,10 @@ class Sender extends AbstractDaemon 'medias' => $sms['medias'] ?? [], ]; + // Message type is forged from a prefix concat with the phone ID $message_type = (int) QUEUE_TYPE_SEND_MSG_PREFIX . $sms['id_phone']; - msg_send($this->msg_queue, $message_type, $msg); + $this->queue->push(json_encode($msg), $message_type); $this->logger->info('Transmit sms send signal to phone ' . $sms['id_phone'] . ' on queue ' . QUEUE_ID_PHONE . ' with message type ' . $message_type . '.'); } @@ -97,16 +89,25 @@ class Sender extends AbstractDaemon public function on_start() { - $this->logger->info('Starting Sender with pid ' . getmypid()); - $this->bdd = \descartes\Model::_connect(DATABASE_HOST, DATABASE_NAME, DATABASE_USER, DATABASE_PASSWORD); + try + { + $this->logger->info('Starting Sender with pid ' . getmypid()); + $this->bdd = \descartes\Model::_connect(DATABASE_HOST, DATABASE_NAME, DATABASE_USER, DATABASE_PASSWORD); + $this->queue = new Queue(QUEUE_ID_PHONE); + } + catch (Exception $e) + { + $this->logger->error('Failed to start sender daemon : ' . $e->getMessage()); + } + } public function on_stop() { //Delete queue on daemon close - $this->logger->info('Closing queue : ' . $this->msg_queue); - msg_remove_queue($this->msg_queue); - + $this->logger->info('Closing queue : ' . QUEUE_ID_PHONE); + $this->queue->close(); + $this->logger->info('Stopping Sender with pid ' . getmypid()); } diff --git a/daemons/Webhook.php b/daemons/Webhook.php index 180d8cf..4bce044 100644 --- a/daemons/Webhook.php +++ b/daemons/Webhook.php @@ -11,6 +11,7 @@ namespace daemons; +use controllers\internals\Queue; use GuzzleHttp\Promise\Utils; use Monolog\Handler\StreamHandler; use Monolog\Logger; @@ -20,11 +21,7 @@ use Monolog\Logger; */ class Webhook extends AbstractDaemon { - private $webhook_queue; - private $last_message_at; - private $phone; - private $adapter; - private $bdd; + private ?Queue $webhook_queue; private $guzzle_client; /** @@ -56,30 +53,17 @@ class Webhook extends AbstractDaemon $promises = []; while ($find_message) { - //Call message - $msgtype = null; - $maxsize = 409600; - $message = null; + $message = $this->webhook_queue->read(QUEUE_TYPE_WEBHOOK); - $error_code = null; - $success = msg_receive($this->webhook_queue, QUEUE_TYPE_WEBHOOK, $msgtype, $maxsize, $message, true, MSG_IPC_NOWAIT, $error_code); //MSG_IPC_NOWAIT == dont wait if no message found - if (!$success && MSG_ENOMSG !== $error_code) + if ($message === null) { - $this->logger->critical('Error for webhook queue reading, error code : ' . $error_code); $find_message = false; - continue; } - if (!$message) - { - $find_message = false; - - continue; - } - - $this->logger->info('Trigger webhook : ' . json_encode($message)); + $this->logger->info('Trigger webhook : ' . $message); + $message = json_decode($message, true); $promises[] = $this->guzzle_client->postAsync($message['url'], ['form_params' => $message['data']]); } @@ -97,10 +81,13 @@ class Webhook extends AbstractDaemon public function on_start() { - //Set last message at to construct time - $this->last_message_at = microtime(true); - - $this->webhook_queue = msg_get_queue(QUEUE_ID_WEBHOOK); + try{ + $this->webhook_queue = new Queue(QUEUE_ID_WEBHOOK); + } + catch (\Exception $e) + { + $this->logger->info('Webhook : failed with ' . $e->getMessage()); + } $this->logger->info('Starting Webhook daemon with pid ' . getmypid()); } @@ -109,7 +96,7 @@ class Webhook extends AbstractDaemon { //Delete queue on daemon close $this->logger->info('Closing queue : ' . QUEUE_ID_WEBHOOK); - msg_remove_queue($this->webhook_queue); + unset($this->webhook_queue); $this->logger->info('Stopping Webhook daemon with pid ' . getmypid()); } diff --git a/env.prod.php.dist b/env.prod.php.dist index 628b500..bf6b165 100644 --- a/env.prod.php.dist +++ b/env.prod.php.dist @@ -26,6 +26,11 @@ 'HOST' => '%APP_URL_SHORTENER_HOST%', 'USER' => '%APP_URL_SHORTENER_USER%', 'PASS' => '%APP_URL_SHORTENER_PASS%', - ] + ], + // Define if we should use a Redis instance instead of System V Queues + 'USE_REDIS_QUEUES' => %APP_USE_REDIS_QUEUES%, + 'REDIS_HOST' => '%APP_REDIS_HOST%', + 'REDIS_PORT' => '%APP_REDIS_PORT%', + 'REDIS_PASSWORD' => '%APP_REDIS_PASSWORD%', ]; diff --git a/models/Queue.php b/models/Queue.php new file mode 100644 index 0000000..d3ff025 --- /dev/null +++ b/models/Queue.php @@ -0,0 +1,39 @@ + + * + * This source file is subject to the GPL-3.0 license that is bundled + * with this source code in the file LICENSE. + */ + +namespace models; + +/** + * + */ +interface Queue +{ + /** + * A FIFO Queue to exchange messages, the backend mechanism can be whatever we want, but the queue take message, tag for routing is optionnal + * @param string $id : A unique identifier for the queue + */ + public function __construct($id); + + /** + * Add a message to the queue + * + * @param string $message : The message to add to the queue, must be a string, for complex data just use json + * @param ?string $tag : A tag to associate to the message for routing purposes, if not set will add to general queue + */ + public function push($message, ?string $tag = null); + + /** + * Read the older message in the queue (non-blocking) + * @param ?string $tag : A tag to associate to the message for routing purposes, if not set will read from general queue + * @return ?string $message : The oldest message or null if no message found, can be anything + */ + public function read(?string $tag = null); +} diff --git a/models/RedisQueue.php b/models/RedisQueue.php new file mode 100644 index 0000000..95ac92d --- /dev/null +++ b/models/RedisQueue.php @@ -0,0 +1,108 @@ + + * + * This source file is subject to the GPL-3.0 license that is bundled + * with this source code in the file LICENSE. + */ + +namespace models; + +use Exception; + +/** + * + */ +class RedisQueue implements Queue +{ + private \Redis $redis; + private $group; + private $consumer; + private $id; + + /** + * A Redis queue to store and exchange messages using redis streams + * routing is based on queue uniq id as stream name, combined with ':tag' if routing is needed, messages are stored as json + * @param string $id : A unique identifier for the queue + * @param array $redis_parameters : Parameters for the redis server, such as host, port, etc. Default to a basic local redis on port 6379 + * @param string $group : Name to use for the redis group that must read this queue, default to 'default' + * @param string $consumer : Name to use for the redis consumer in the group that must read this queue, default to 'default' + */ + public function __construct($id, $redis_parameters = [], $group = 'default', $consumer = 'default') + { + $this->id = $id; + $this->redis = new \Redis(); + $success = $this->redis->connect($redis_parameters['host'], intval($redis_parameters['port']), 1, '', 0, 0, ['auth' => $redis_parameters['auth']]); + + if (!$success) + { + throw new \Exception('Failed to connect to redis server !'); + } + + $this->group = $group; + $this->consumer = $consumer; + } + + /** + * Add a message to the queue + * + * @param string $message : The message to add to the queue + * @param ?string $tag : A tag to associate to the message for routing purposes, if null will add to general queue + */ + public function push($message, ?string $tag = null) + { + $stream = $this->id . ($tag !== null ? ":$tag" : ''); + $success = $this->redis->xAdd($stream, '*', ['message' => $message]); + + if (!$success) + { + throw new \Exception('Failed to push a message !'); + } + + return true; + } + + /** + * Read the older message in the queue + * + * @return mixed $message : The oldest message or null if no message found, can be anything + * @param ?string $tag : A tag to associate to the message for routing purposes, if null will read from general queue + * @param mixed : The message to add to the queue, can be anything, the queue will have to treat it by itself + */ + public function read(?string $tag = null) + { + $stream = $this->id . ($tag !== null ? ":$tag" : ''); + + // Create the consumer group if it doesn't already exist + try + { + $this->redis->xGroup('CREATE', $stream, $this->group, '$', true); + } + catch (Exception $e) + { + // Ignore error if the group already exists + } + + // Read a single message starting from the oldest (>) + $messages = $this->redis->xReadGroup($this->group, $this->consumer, [$stream => '>'], 1); + if (!count($messages)) + { + return null; + } + + // Find the message, acknowledge it and return it + foreach ($messages as $stream_name => $entries) + { + foreach ($entries as $message_id => $message) + { + $success = $this->redis->xAck($stream, $this->group, [$message_id]); + return $message['message']; + } + } + + return null; + } +} diff --git a/models/SystemVQueue.php b/models/SystemVQueue.php new file mode 100644 index 0000000..8a9a782 --- /dev/null +++ b/models/SystemVQueue.php @@ -0,0 +1,115 @@ + + * + * This source file is subject to the GPL-3.0 license that is bundled + * with this source code in the file LICENSE. + */ + +namespace models; + +/** + * + */ +class SystemVQueue implements Queue +{ + private $id; + private $queue; + + /** + * A queue using System V message queues to store and exchange messages + * routing is based on queue id and message type + * + * ** Attention : Instead of string, all ids and tags must be numbers, its the system v queues works, no reliable way arround it** + * @param int $id : A unique identifier for the queue, *this must be generated with ftok* + + */ + public function __construct($id) + { + $this->id = (int) $id; + } + + /** + * Function to close the system v queue on destruction + */ + public function close() + { + if ($this->queue) + { + msg_remove_queue($this->queue); + } + } + + /** + * Function to get the message queue and ensure it is open, we should always call it during push/read just to + * make sure another process didn't close the queue + */ + private function get_queue() + { + $this->queue = msg_get_queue($this->id); + + if (!$this->queue) + { + throw new \Exception('Impossible to get a System V message queue for id ' . $this->id); + } + } + + /** + * Add a message to the queue + * + * @param string $message : The message to add to the queue + * @param ?string $tag : A tag to associate to the message for routing purposes. + * Though this is a string, we MUST pass a valid number, its the way System V queue works + */ + public function push($message, ?string $tag = '0') + { + $tag = (int) $tag; + + $this->get_queue(); + $error_code = null; + $success = msg_send($this->queue, $tag, $message, true, false, $error_code); + if (!$success) + { + throw new \Exception('Impossible to send the message on system V queue, error code : ' . $error_code); + } + + return true; + } + + /** + * Read the older message in the queue + * + * @param ?string $tag : A tag to associate to the message for routing purposes + * Though this is a string, we MUST pass a valid number, its the way System V queue works + * + * @return mixed $message : The oldest message or null if no message found, can be anything + */ + public function read(?string $tag = '0') + { + $tag = (int) $tag; + + $msgtype = null; + $maxsize = 409600; + $message = null; + + // Message type is forged from a prefix concat with the phone ID + $error_code = null; + $this->get_queue(); + $success = msg_receive($this->queue, $tag, $msgtype, $maxsize, $message, true, MSG_IPC_NOWAIT, $error_code); //MSG_IPC_NOWAIT == dont wait if no message found + + if (!$success && MSG_ENOMSG !== $error_code) + { + throw new \Exception('Impossible to read messages on system V queue, error code : ' . $error_code); + } + + if (!$message) + { + return null; + } + + return $message; + } +}