Add support for Redis in addition to System V queues
This commit is contained in:
parent
5c697b5240
commit
36c5d7ec0c
|
@ -117,11 +117,10 @@ class Mailer extends \descartes\Controller
|
||||||
'attachments' => $attachments,
|
'attachments' => $attachments,
|
||||||
];
|
];
|
||||||
|
|
||||||
$error_code = null;
|
$queue = new Queue(QUEUE_ID_EMAIL);
|
||||||
$queue = msg_get_queue(QUEUE_ID_EMAIL);
|
$queue->push(json_encode($message), QUEUE_TYPE_EMAIL);
|
||||||
$success = msg_send($queue, QUEUE_TYPE_EMAIL, $message, true, true, $error_code);
|
|
||||||
|
|
||||||
return (bool) $success;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -0,0 +1,86 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of RaspiSMS.
|
||||||
|
*
|
||||||
|
* (c) Pierre-Lin Bonnemaison <plebwebsas@gmail.com>
|
||||||
|
*
|
||||||
|
* This source file is subject to the GPL-3.0 license that is bundled
|
||||||
|
* with this source code in the file LICENSE.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace controllers\internals;
|
||||||
|
|
||||||
|
use Exception;
|
||||||
|
use models\RedisQueue;
|
||||||
|
use models\SystemVQueue;
|
||||||
|
|
||||||
|
class Queue extends \descartes\InternalController
|
||||||
|
{
|
||||||
|
private $queue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A class to interact with queue, the class is in charge to choose the type of queue (redis/system v) to use
|
||||||
|
*/
|
||||||
|
public function __construct($id)
|
||||||
|
{
|
||||||
|
if (USE_REDIS_QUEUES ?? false)
|
||||||
|
{
|
||||||
|
$params = [];
|
||||||
|
if (REDIS_HOST ?? false)
|
||||||
|
{
|
||||||
|
$params['host'] = REDIS_HOST;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (REDIS_PORT ?? false)
|
||||||
|
{
|
||||||
|
$params['port'] = REDIS_PORT;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (REDIS_PASSWORD ?? false)
|
||||||
|
{
|
||||||
|
$params['auth'] = REDIS_PASSWORD;
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->queue = new RedisQueue($id, $params, 'raspisms', 'raspisms');
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
$this->queue = new SystemVQueue($id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a message to the queue
|
||||||
|
*
|
||||||
|
* @param string $message : The message to add to the queue
|
||||||
|
* @param ?string $tag : A tag to associate to the message for routing purposes, if null will add to general queue
|
||||||
|
*/
|
||||||
|
public function push($message, ?string $tag = null)
|
||||||
|
{
|
||||||
|
return $this->queue->push($message, $tag);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read the older message in the queue
|
||||||
|
*
|
||||||
|
* @return mixed $message : The oldest message or null if no message found, can be anything
|
||||||
|
* @param ?string $tag : A tag to associate to the message for routing purposes, if null will read from general queue
|
||||||
|
* @param mixed : The message to add to the queue, can be anything, the queue will have to treat it by itself
|
||||||
|
*/
|
||||||
|
public function read(?string $tag = null)
|
||||||
|
{
|
||||||
|
return $this->queue->read($tag);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Function to close system V queue for cleaning resources, usefull only if system V queue
|
||||||
|
*/
|
||||||
|
public function close()
|
||||||
|
{
|
||||||
|
if ($this->queue instanceof SystemVQueue)
|
||||||
|
{
|
||||||
|
$this->queue->close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -135,12 +135,11 @@ class Webhook extends StandardController
|
||||||
],
|
],
|
||||||
];
|
];
|
||||||
|
|
||||||
$error_code = null;
|
$queue = new Queue(QUEUE_ID_WEBHOOK);
|
||||||
$queue = msg_get_queue(QUEUE_ID_WEBHOOK);
|
$success = $queue->push(json_encode($message), QUEUE_TYPE_WEBHOOK);
|
||||||
msg_send($queue, QUEUE_TYPE_WEBHOOK, $message, true, true, $error_code);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return (bool) $success;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -11,6 +11,7 @@
|
||||||
|
|
||||||
namespace daemons;
|
namespace daemons;
|
||||||
|
|
||||||
|
use controllers\internals\Queue;
|
||||||
use Monolog\Handler\StreamHandler;
|
use Monolog\Handler\StreamHandler;
|
||||||
use Monolog\Logger;
|
use Monolog\Logger;
|
||||||
|
|
||||||
|
@ -19,7 +20,7 @@ use Monolog\Logger;
|
||||||
*/
|
*/
|
||||||
class Mailer extends AbstractDaemon
|
class Mailer extends AbstractDaemon
|
||||||
{
|
{
|
||||||
private $mailer_queue;
|
private ?Queue $mailer_queue;
|
||||||
private $last_message_at;
|
private $last_message_at;
|
||||||
private $bdd;
|
private $bdd;
|
||||||
|
|
||||||
|
@ -49,27 +50,15 @@ class Mailer extends AbstractDaemon
|
||||||
$find_message = true;
|
$find_message = true;
|
||||||
while ($find_message)
|
while ($find_message)
|
||||||
{
|
{
|
||||||
//Call message
|
$message = $this->mailer_queue->read(QUEUE_TYPE_EMAIL);
|
||||||
$msgtype = null;
|
|
||||||
$maxsize = 409600;
|
|
||||||
$message = null;
|
|
||||||
|
|
||||||
$error_code = null;
|
if ($message === null)
|
||||||
$success = msg_receive($this->mailer_queue, QUEUE_TYPE_EMAIL, $msgtype, $maxsize, $message, true, MSG_IPC_NOWAIT, $error_code); //MSG_IPC_NOWAIT == dont wait if no message found
|
|
||||||
if (!$success && MSG_ENOMSG !== $error_code)
|
|
||||||
{
|
{
|
||||||
$this->logger->critical('Error for mailer queue reading, error code : ' . $error_code);
|
|
||||||
$find_message = false;
|
$find_message = false;
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!$message)
|
$message = json_decode($message, true);
|
||||||
{
|
|
||||||
$find_message = false;
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->logger->info('Try sending email : ' . json_encode($message));
|
$this->logger->info('Try sending email : ' . json_encode($message));
|
||||||
|
|
||||||
|
@ -92,7 +81,7 @@ class Mailer extends AbstractDaemon
|
||||||
public function on_start()
|
public function on_start()
|
||||||
{
|
{
|
||||||
//Set last message at to construct time
|
//Set last message at to construct time
|
||||||
$this->mailer_queue = msg_get_queue(QUEUE_ID_EMAIL);
|
$this->mailer_queue = new Queue(QUEUE_ID_EMAIL);
|
||||||
|
|
||||||
$this->logger->info('Starting Mailer daemon with pid ' . getmypid());
|
$this->logger->info('Starting Mailer daemon with pid ' . getmypid());
|
||||||
}
|
}
|
||||||
|
@ -101,8 +90,6 @@ class Mailer extends AbstractDaemon
|
||||||
{
|
{
|
||||||
//Delete queue on daemon close
|
//Delete queue on daemon close
|
||||||
$this->logger->info('Closing queue : ' . QUEUE_ID_EMAIL);
|
$this->logger->info('Closing queue : ' . QUEUE_ID_EMAIL);
|
||||||
msg_remove_queue($this->mailer_queue);
|
|
||||||
|
|
||||||
$this->logger->info('Stopping Mailer daemon with pid ' . getmypid());
|
$this->logger->info('Stopping Mailer daemon with pid ' . getmypid());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -11,6 +11,7 @@
|
||||||
|
|
||||||
namespace daemons;
|
namespace daemons;
|
||||||
|
|
||||||
|
use controllers\internals\Queue;
|
||||||
use Monolog\Handler\StreamHandler;
|
use Monolog\Handler\StreamHandler;
|
||||||
use Monolog\Logger;
|
use Monolog\Logger;
|
||||||
|
|
||||||
|
@ -22,7 +23,7 @@ class Phone extends AbstractDaemon
|
||||||
private $max_inactivity = 5 * 60;
|
private $max_inactivity = 5 * 60;
|
||||||
private $read_delay = 20 / 0.5;
|
private $read_delay = 20 / 0.5;
|
||||||
private $read_tick = 0;
|
private $read_tick = 0;
|
||||||
private $msg_queue;
|
private ?Queue $queue;
|
||||||
private $webhook_queue;
|
private $webhook_queue;
|
||||||
private $last_message_at;
|
private $last_message_at;
|
||||||
private $phone;
|
private $phone;
|
||||||
|
@ -85,7 +86,7 @@ class Phone extends AbstractDaemon
|
||||||
//Set last message at to construct time
|
//Set last message at to construct time
|
||||||
$this->last_message_at = microtime(true);
|
$this->last_message_at = microtime(true);
|
||||||
|
|
||||||
$this->msg_queue = msg_get_queue(QUEUE_ID_PHONE);
|
$this->queue = new Queue(QUEUE_ID_PHONE);
|
||||||
|
|
||||||
//Instanciate adapter
|
//Instanciate adapter
|
||||||
$adapter_class = $this->phone['adapter'];
|
$adapter_class = $this->phone['adapter'];
|
||||||
|
@ -96,7 +97,7 @@ class Phone extends AbstractDaemon
|
||||||
|
|
||||||
public function on_stop()
|
public function on_stop()
|
||||||
{
|
{
|
||||||
$this->logger->info('Stopping Phone daemon with pid ' . getmypid());
|
$this->logger->info('Stopping Phone daemon with pid ' . getmypid());
|
||||||
}
|
}
|
||||||
|
|
||||||
public function handle_other_signals($signal)
|
public function handle_other_signals($signal)
|
||||||
|
@ -114,30 +115,17 @@ class Phone extends AbstractDaemon
|
||||||
$find_message = true;
|
$find_message = true;
|
||||||
while ($find_message)
|
while ($find_message)
|
||||||
{
|
{
|
||||||
//Call message
|
|
||||||
$msgtype = null;
|
|
||||||
$maxsize = 409600;
|
|
||||||
$message = null;
|
|
||||||
|
|
||||||
// Message type is forged from a prefix concat with the phone ID
|
|
||||||
$message_type = (int) QUEUE_TYPE_SEND_MSG_PREFIX . $this->phone['id'];
|
$message_type = (int) QUEUE_TYPE_SEND_MSG_PREFIX . $this->phone['id'];
|
||||||
$error_code = null;
|
$message = $this->queue->read($message_type);
|
||||||
$success = msg_receive($this->msg_queue, $message_type, $msgtype, $maxsize, $message, true, MSG_IPC_NOWAIT, $error_code); //MSG_IPC_NOWAIT == dont wait if no message found
|
|
||||||
|
|
||||||
if (!$success && MSG_ENOMSG !== $error_code)
|
if ($message === null)
|
||||||
{
|
|
||||||
$this->logger->critical('Error reading MSG SEND Queue, error code : ' . $error_code);
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!$message)
|
|
||||||
{
|
{
|
||||||
$find_message = false;
|
$find_message = false;
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$message = json_decode($message, true);
|
||||||
|
|
||||||
//Update last message time
|
//Update last message time
|
||||||
$this->last_message_at = microtime(true);
|
$this->last_message_at = microtime(true);
|
||||||
|
|
||||||
|
|
|
@ -11,6 +11,8 @@
|
||||||
|
|
||||||
namespace daemons;
|
namespace daemons;
|
||||||
|
|
||||||
|
use controllers\internals\Queue;
|
||||||
|
use Exception;
|
||||||
use Monolog\Handler\StreamHandler;
|
use Monolog\Handler\StreamHandler;
|
||||||
use Monolog\Logger;
|
use Monolog\Logger;
|
||||||
|
|
||||||
|
@ -19,12 +21,9 @@ use Monolog\Logger;
|
||||||
*/
|
*/
|
||||||
class Sender extends AbstractDaemon
|
class Sender extends AbstractDaemon
|
||||||
{
|
{
|
||||||
private $internal_phone;
|
|
||||||
private $internal_scheduled;
|
private $internal_scheduled;
|
||||||
private $internal_received;
|
|
||||||
private $internal_sended;
|
|
||||||
private $bdd;
|
private $bdd;
|
||||||
private $msg_queue;
|
private ?Queue $queue;
|
||||||
|
|
||||||
public function __construct()
|
public function __construct()
|
||||||
{
|
{
|
||||||
|
@ -44,9 +43,7 @@ class Sender extends AbstractDaemon
|
||||||
|
|
||||||
public function run()
|
public function run()
|
||||||
{
|
{
|
||||||
//Create the internal controllers
|
|
||||||
$this->internal_scheduled = new \controllers\internals\Scheduled($this->bdd);
|
$this->internal_scheduled = new \controllers\internals\Scheduled($this->bdd);
|
||||||
$this->internal_sended = new \controllers\internals\Sended($this->bdd);
|
|
||||||
|
|
||||||
//Get smss and transmit order to send to appropriate phone daemon
|
//Get smss and transmit order to send to appropriate phone daemon
|
||||||
$smss_per_scheduled = $this->internal_scheduled->get_smss_to_send();
|
$smss_per_scheduled = $this->internal_scheduled->get_smss_to_send();
|
||||||
|
@ -64,12 +61,6 @@ class Sender extends AbstractDaemon
|
||||||
{
|
{
|
||||||
foreach ($smss_per_scheduled as $id_scheduled => $smss)
|
foreach ($smss_per_scheduled as $id_scheduled => $smss)
|
||||||
{
|
{
|
||||||
//If queue not already exists
|
|
||||||
if (!msg_queue_exists(QUEUE_ID_PHONE) || !isset($this->msg_queue))
|
|
||||||
{
|
|
||||||
$this->msg_queue = msg_get_queue(QUEUE_ID_PHONE);
|
|
||||||
}
|
|
||||||
|
|
||||||
foreach ($smss as $sms)
|
foreach ($smss as $sms)
|
||||||
{
|
{
|
||||||
$msg = [
|
$msg = [
|
||||||
|
@ -84,9 +75,10 @@ class Sender extends AbstractDaemon
|
||||||
'medias' => $sms['medias'] ?? [],
|
'medias' => $sms['medias'] ?? [],
|
||||||
];
|
];
|
||||||
|
|
||||||
|
|
||||||
// Message type is forged from a prefix concat with the phone ID
|
// Message type is forged from a prefix concat with the phone ID
|
||||||
$message_type = (int) QUEUE_TYPE_SEND_MSG_PREFIX . $sms['id_phone'];
|
$message_type = (int) QUEUE_TYPE_SEND_MSG_PREFIX . $sms['id_phone'];
|
||||||
msg_send($this->msg_queue, $message_type, $msg);
|
$this->queue->push(json_encode($msg), $message_type);
|
||||||
$this->logger->info('Transmit sms send signal to phone ' . $sms['id_phone'] . ' on queue ' . QUEUE_ID_PHONE . ' with message type ' . $message_type . '.');
|
$this->logger->info('Transmit sms send signal to phone ' . $sms['id_phone'] . ' on queue ' . QUEUE_ID_PHONE . ' with message type ' . $message_type . '.');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -97,16 +89,25 @@ class Sender extends AbstractDaemon
|
||||||
|
|
||||||
public function on_start()
|
public function on_start()
|
||||||
{
|
{
|
||||||
$this->logger->info('Starting Sender with pid ' . getmypid());
|
try
|
||||||
$this->bdd = \descartes\Model::_connect(DATABASE_HOST, DATABASE_NAME, DATABASE_USER, DATABASE_PASSWORD);
|
{
|
||||||
|
$this->logger->info('Starting Sender with pid ' . getmypid());
|
||||||
|
$this->bdd = \descartes\Model::_connect(DATABASE_HOST, DATABASE_NAME, DATABASE_USER, DATABASE_PASSWORD);
|
||||||
|
$this->queue = new Queue(QUEUE_ID_PHONE);
|
||||||
|
}
|
||||||
|
catch (Exception $e)
|
||||||
|
{
|
||||||
|
$this->logger->error('Failed to start sender daemon : ' . $e->getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public function on_stop()
|
public function on_stop()
|
||||||
{
|
{
|
||||||
//Delete queue on daemon close
|
//Delete queue on daemon close
|
||||||
$this->logger->info('Closing queue : ' . $this->msg_queue);
|
$this->logger->info('Closing queue : ' . QUEUE_ID_PHONE);
|
||||||
msg_remove_queue($this->msg_queue);
|
$this->queue->close();
|
||||||
|
|
||||||
$this->logger->info('Stopping Sender with pid ' . getmypid());
|
$this->logger->info('Stopping Sender with pid ' . getmypid());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -11,6 +11,7 @@
|
||||||
|
|
||||||
namespace daemons;
|
namespace daemons;
|
||||||
|
|
||||||
|
use controllers\internals\Queue;
|
||||||
use GuzzleHttp\Promise\Utils;
|
use GuzzleHttp\Promise\Utils;
|
||||||
use Monolog\Handler\StreamHandler;
|
use Monolog\Handler\StreamHandler;
|
||||||
use Monolog\Logger;
|
use Monolog\Logger;
|
||||||
|
@ -20,11 +21,7 @@ use Monolog\Logger;
|
||||||
*/
|
*/
|
||||||
class Webhook extends AbstractDaemon
|
class Webhook extends AbstractDaemon
|
||||||
{
|
{
|
||||||
private $webhook_queue;
|
private ?Queue $webhook_queue;
|
||||||
private $last_message_at;
|
|
||||||
private $phone;
|
|
||||||
private $adapter;
|
|
||||||
private $bdd;
|
|
||||||
private $guzzle_client;
|
private $guzzle_client;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -56,30 +53,17 @@ class Webhook extends AbstractDaemon
|
||||||
$promises = [];
|
$promises = [];
|
||||||
while ($find_message)
|
while ($find_message)
|
||||||
{
|
{
|
||||||
//Call message
|
$message = $this->webhook_queue->read(QUEUE_TYPE_WEBHOOK);
|
||||||
$msgtype = null;
|
|
||||||
$maxsize = 409600;
|
|
||||||
$message = null;
|
|
||||||
|
|
||||||
$error_code = null;
|
if ($message === null)
|
||||||
$success = msg_receive($this->webhook_queue, QUEUE_TYPE_WEBHOOK, $msgtype, $maxsize, $message, true, MSG_IPC_NOWAIT, $error_code); //MSG_IPC_NOWAIT == dont wait if no message found
|
|
||||||
if (!$success && MSG_ENOMSG !== $error_code)
|
|
||||||
{
|
{
|
||||||
$this->logger->critical('Error for webhook queue reading, error code : ' . $error_code);
|
|
||||||
$find_message = false;
|
$find_message = false;
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!$message)
|
$this->logger->info('Trigger webhook : ' . $message);
|
||||||
{
|
|
||||||
$find_message = false;
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->logger->info('Trigger webhook : ' . json_encode($message));
|
|
||||||
|
|
||||||
|
$message = json_decode($message, true);
|
||||||
$promises[] = $this->guzzle_client->postAsync($message['url'], ['form_params' => $message['data']]);
|
$promises[] = $this->guzzle_client->postAsync($message['url'], ['form_params' => $message['data']]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -97,10 +81,13 @@ class Webhook extends AbstractDaemon
|
||||||
|
|
||||||
public function on_start()
|
public function on_start()
|
||||||
{
|
{
|
||||||
//Set last message at to construct time
|
try{
|
||||||
$this->last_message_at = microtime(true);
|
$this->webhook_queue = new Queue(QUEUE_ID_WEBHOOK);
|
||||||
|
}
|
||||||
$this->webhook_queue = msg_get_queue(QUEUE_ID_WEBHOOK);
|
catch (\Exception $e)
|
||||||
|
{
|
||||||
|
$this->logger->info('Webhook : failed with ' . $e->getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
$this->logger->info('Starting Webhook daemon with pid ' . getmypid());
|
$this->logger->info('Starting Webhook daemon with pid ' . getmypid());
|
||||||
}
|
}
|
||||||
|
@ -109,7 +96,7 @@ class Webhook extends AbstractDaemon
|
||||||
{
|
{
|
||||||
//Delete queue on daemon close
|
//Delete queue on daemon close
|
||||||
$this->logger->info('Closing queue : ' . QUEUE_ID_WEBHOOK);
|
$this->logger->info('Closing queue : ' . QUEUE_ID_WEBHOOK);
|
||||||
msg_remove_queue($this->webhook_queue);
|
unset($this->webhook_queue);
|
||||||
|
|
||||||
$this->logger->info('Stopping Webhook daemon with pid ' . getmypid());
|
$this->logger->info('Stopping Webhook daemon with pid ' . getmypid());
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,6 +26,11 @@
|
||||||
'HOST' => '%APP_URL_SHORTENER_HOST%',
|
'HOST' => '%APP_URL_SHORTENER_HOST%',
|
||||||
'USER' => '%APP_URL_SHORTENER_USER%',
|
'USER' => '%APP_URL_SHORTENER_USER%',
|
||||||
'PASS' => '%APP_URL_SHORTENER_PASS%',
|
'PASS' => '%APP_URL_SHORTENER_PASS%',
|
||||||
]
|
],
|
||||||
|
|
||||||
|
// Define if we should use a Redis instance instead of System V Queues
|
||||||
|
'USE_REDIS_QUEUES' => %APP_USE_REDIS_QUEUES%,
|
||||||
|
'REDIS_HOST' => '%APP_REDIS_HOST%',
|
||||||
|
'REDIS_PORT' => '%APP_REDIS_PORT%',
|
||||||
|
'REDIS_PASSWORD' => '%APP_REDIS_PASSWORD%',
|
||||||
];
|
];
|
||||||
|
|
|
@ -0,0 +1,39 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of RaspiSMS.
|
||||||
|
*
|
||||||
|
* (c) Pierre-Lin Bonnemaison <plebwebsas@gmail.com>
|
||||||
|
*
|
||||||
|
* This source file is subject to the GPL-3.0 license that is bundled
|
||||||
|
* with this source code in the file LICENSE.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace models;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
interface Queue
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* A FIFO Queue to exchange messages, the backend mechanism can be whatever we want, but the queue take message, tag for routing is optionnal
|
||||||
|
* @param string $id : A unique identifier for the queue
|
||||||
|
*/
|
||||||
|
public function __construct($id);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a message to the queue
|
||||||
|
*
|
||||||
|
* @param string $message : The message to add to the queue, must be a string, for complex data just use json
|
||||||
|
* @param ?string $tag : A tag to associate to the message for routing purposes, if not set will add to general queue
|
||||||
|
*/
|
||||||
|
public function push($message, ?string $tag = null);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read the older message in the queue (non-blocking)
|
||||||
|
* @param ?string $tag : A tag to associate to the message for routing purposes, if not set will read from general queue
|
||||||
|
* @return ?string $message : The oldest message or null if no message found, can be anything
|
||||||
|
*/
|
||||||
|
public function read(?string $tag = null);
|
||||||
|
}
|
|
@ -0,0 +1,108 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of RaspiSMS.
|
||||||
|
*
|
||||||
|
* (c) Pierre-Lin Bonnemaison <plebwebsas@gmail.com>
|
||||||
|
*
|
||||||
|
* This source file is subject to the GPL-3.0 license that is bundled
|
||||||
|
* with this source code in the file LICENSE.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace models;
|
||||||
|
|
||||||
|
use Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
class RedisQueue implements Queue
|
||||||
|
{
|
||||||
|
private \Redis $redis;
|
||||||
|
private $group;
|
||||||
|
private $consumer;
|
||||||
|
private $id;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A Redis queue to store and exchange messages using redis streams
|
||||||
|
* routing is based on queue uniq id as stream name, combined with ':tag' if routing is needed, messages are stored as json
|
||||||
|
* @param string $id : A unique identifier for the queue
|
||||||
|
* @param array $redis_parameters : Parameters for the redis server, such as host, port, etc. Default to a basic local redis on port 6379
|
||||||
|
* @param string $group : Name to use for the redis group that must read this queue, default to 'default'
|
||||||
|
* @param string $consumer : Name to use for the redis consumer in the group that must read this queue, default to 'default'
|
||||||
|
*/
|
||||||
|
public function __construct($id, $redis_parameters = [], $group = 'default', $consumer = 'default')
|
||||||
|
{
|
||||||
|
$this->id = $id;
|
||||||
|
$this->redis = new \Redis();
|
||||||
|
$success = $this->redis->connect($redis_parameters['host'], intval($redis_parameters['port']), 1, '', 0, 0, ['auth' => $redis_parameters['auth']]);
|
||||||
|
|
||||||
|
if (!$success)
|
||||||
|
{
|
||||||
|
throw new \Exception('Failed to connect to redis server !');
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->group = $group;
|
||||||
|
$this->consumer = $consumer;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a message to the queue
|
||||||
|
*
|
||||||
|
* @param string $message : The message to add to the queue
|
||||||
|
* @param ?string $tag : A tag to associate to the message for routing purposes, if null will add to general queue
|
||||||
|
*/
|
||||||
|
public function push($message, ?string $tag = null)
|
||||||
|
{
|
||||||
|
$stream = $this->id . ($tag !== null ? ":$tag" : '');
|
||||||
|
$success = $this->redis->xAdd($stream, '*', ['message' => $message]);
|
||||||
|
|
||||||
|
if (!$success)
|
||||||
|
{
|
||||||
|
throw new \Exception('Failed to push a message !');
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read the older message in the queue
|
||||||
|
*
|
||||||
|
* @return mixed $message : The oldest message or null if no message found, can be anything
|
||||||
|
* @param ?string $tag : A tag to associate to the message for routing purposes, if null will read from general queue
|
||||||
|
* @param mixed : The message to add to the queue, can be anything, the queue will have to treat it by itself
|
||||||
|
*/
|
||||||
|
public function read(?string $tag = null)
|
||||||
|
{
|
||||||
|
$stream = $this->id . ($tag !== null ? ":$tag" : '');
|
||||||
|
|
||||||
|
// Create the consumer group if it doesn't already exist
|
||||||
|
try
|
||||||
|
{
|
||||||
|
$this->redis->xGroup('CREATE', $stream, $this->group, '$', true);
|
||||||
|
}
|
||||||
|
catch (Exception $e)
|
||||||
|
{
|
||||||
|
// Ignore error if the group already exists
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read a single message starting from the oldest (>)
|
||||||
|
$messages = $this->redis->xReadGroup($this->group, $this->consumer, [$stream => '>'], 1);
|
||||||
|
if (!count($messages))
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Find the message, acknowledge it and return it
|
||||||
|
foreach ($messages as $stream_name => $entries)
|
||||||
|
{
|
||||||
|
foreach ($entries as $message_id => $message)
|
||||||
|
{
|
||||||
|
$success = $this->redis->xAck($stream, $this->group, [$message_id]);
|
||||||
|
return $message['message'];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,115 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of RaspiSMS.
|
||||||
|
*
|
||||||
|
* (c) Pierre-Lin Bonnemaison <plebwebsas@gmail.com>
|
||||||
|
*
|
||||||
|
* This source file is subject to the GPL-3.0 license that is bundled
|
||||||
|
* with this source code in the file LICENSE.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace models;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
class SystemVQueue implements Queue
|
||||||
|
{
|
||||||
|
private $id;
|
||||||
|
private $queue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A queue using System V message queues to store and exchange messages
|
||||||
|
* routing is based on queue id and message type
|
||||||
|
*
|
||||||
|
* ** Attention : Instead of string, all ids and tags must be numbers, its the system v queues works, no reliable way arround it**
|
||||||
|
* @param int $id : A unique identifier for the queue, *this must be generated with ftok*
|
||||||
|
|
||||||
|
*/
|
||||||
|
public function __construct($id)
|
||||||
|
{
|
||||||
|
$this->id = (int) $id;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Function to close the system v queue on destruction
|
||||||
|
*/
|
||||||
|
public function close()
|
||||||
|
{
|
||||||
|
if ($this->queue)
|
||||||
|
{
|
||||||
|
msg_remove_queue($this->queue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Function to get the message queue and ensure it is open, we should always call it during push/read just to
|
||||||
|
* make sure another process didn't close the queue
|
||||||
|
*/
|
||||||
|
private function get_queue()
|
||||||
|
{
|
||||||
|
$this->queue = msg_get_queue($this->id);
|
||||||
|
|
||||||
|
if (!$this->queue)
|
||||||
|
{
|
||||||
|
throw new \Exception('Impossible to get a System V message queue for id ' . $this->id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a message to the queue
|
||||||
|
*
|
||||||
|
* @param string $message : The message to add to the queue
|
||||||
|
* @param ?string $tag : A tag to associate to the message for routing purposes.
|
||||||
|
* Though this is a string, we MUST pass a valid number, its the way System V queue works
|
||||||
|
*/
|
||||||
|
public function push($message, ?string $tag = '0')
|
||||||
|
{
|
||||||
|
$tag = (int) $tag;
|
||||||
|
|
||||||
|
$this->get_queue();
|
||||||
|
$error_code = null;
|
||||||
|
$success = msg_send($this->queue, $tag, $message, true, false, $error_code);
|
||||||
|
if (!$success)
|
||||||
|
{
|
||||||
|
throw new \Exception('Impossible to send the message on system V queue, error code : ' . $error_code);
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read the older message in the queue
|
||||||
|
*
|
||||||
|
* @param ?string $tag : A tag to associate to the message for routing purposes
|
||||||
|
* Though this is a string, we MUST pass a valid number, its the way System V queue works
|
||||||
|
*
|
||||||
|
* @return mixed $message : The oldest message or null if no message found, can be anything
|
||||||
|
*/
|
||||||
|
public function read(?string $tag = '0')
|
||||||
|
{
|
||||||
|
$tag = (int) $tag;
|
||||||
|
|
||||||
|
$msgtype = null;
|
||||||
|
$maxsize = 409600;
|
||||||
|
$message = null;
|
||||||
|
|
||||||
|
// Message type is forged from a prefix concat with the phone ID
|
||||||
|
$error_code = null;
|
||||||
|
$this->get_queue();
|
||||||
|
$success = msg_receive($this->queue, $tag, $msgtype, $maxsize, $message, true, MSG_IPC_NOWAIT, $error_code); //MSG_IPC_NOWAIT == dont wait if no message found
|
||||||
|
|
||||||
|
if (!$success && MSG_ENOMSG !== $error_code)
|
||||||
|
{
|
||||||
|
throw new \Exception('Impossible to read messages on system V queue, error code : ' . $error_code);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!$message)
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return $message;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue