From b5a36b1169376f358be449704a773a561a3dd442 Mon Sep 17 00:00:00 2001 From: osaajani Date: Tue, 7 Jan 2020 01:31:34 +0100 Subject: [PATCH] Separate daemons and responsabilities insides thoses daemons. Add webhook & commands, still to test --- adapters/AdapterInterface.php | 5 +- adapters/OvhSmsAdapter.php | 3 +- adapters/TestAdapter.php | 3 +- controllers/internals/Console.php | 42 ++++++- daemons/Manager.php | 129 ++++++++++++++++++++ daemons/Phone.php | 189 +++++++++++++++++++++++++----- daemons/Sender.php | 103 ++++++++++++++++ daemons/Server.php | 141 ---------------------- daemons/Webhook.php | 99 ++++++++++++++++ 9 files changed, 530 insertions(+), 184 deletions(-) create mode 100644 daemons/Manager.php create mode 100644 daemons/Sender.php delete mode 100644 daemons/Server.php create mode 100644 daemons/Webhook.php diff --git a/adapters/AdapterInterface.php b/adapters/AdapterInterface.php index 01ecc5a..d3429f9 100644 --- a/adapters/AdapterInterface.php +++ b/adapters/AdapterInterface.php @@ -57,9 +57,8 @@ /** - * Method called to read SMSs of the number - * @param float $since : Unix microtime representation of the date from wich we want to read the SMSs + * Method called to read unread SMSs of the number * @return array : Array of the sms reads */ - public function read (float $since) : array; + public function read () : array; } diff --git a/adapters/OvhSmsAdapter.php b/adapters/OvhSmsAdapter.php index 0592697..fb04367 100644 --- a/adapters/OvhSmsAdapter.php +++ b/adapters/OvhSmsAdapter.php @@ -76,10 +76,9 @@ /** * Method called to read SMSs of the number - * @param float $since : Unix microtime representation of the date from wich we want to read the SMSs * @return array : Array of the sms reads */ - public function read (float $since) : array + public function read () : array { return []; } diff --git a/adapters/TestAdapter.php b/adapters/TestAdapter.php index 2c96206..760d313 100644 --- a/adapters/TestAdapter.php +++ b/adapters/TestAdapter.php @@ -76,10 +76,9 @@ /** * Method called to read SMSs of the number - * @param float $since : Unix microtime representation of the date from wich we want to read the SMSs * @return array : Array of the sms reads */ - public function read (float $since) : array + public function read () : array { return []; } diff --git a/controllers/internals/Console.php b/controllers/internals/Console.php index 9577164..23e9ade 100755 --- a/controllers/internals/Console.php +++ b/controllers/internals/Console.php @@ -16,14 +16,48 @@ namespace controllers\internals; */ class Console extends \descartes\InternalController { - public function server () + /** + * Start manager daemon + */ + public function manager () { - $server = new \daemons\Server(); + new \daemons\Manager(); } - public function phone ($number) + /** + * Start sender daemon + */ + public function sender () { - $server = new \daemons\Phone($number); + new \daemons\Sender(); + } + + + /** + * Start webhook daemon + */ + public function webhook () + { + new \daemons\Webhook(); + } + + + /** + * Start a phone daemon + * @param $id_phone : Phone id + */ + public function phone ($id_phone) + { + $bdd = \descartes\Model::_connect(DATABASE_HOST, DATABASE_NAME, DATABASE_USER, DATABASE_PASSWORD, 'UTF8'); + $internal_phone = new \controllers\internals\Phone($bdd); + + $phone = $internal_phone->get($id_phone); + if (!$phone) + { + return false; + } + + new \daemons\Phone($phone); } } diff --git a/daemons/Manager.php b/daemons/Manager.php new file mode 100644 index 0000000..c6bf9b0 --- /dev/null +++ b/daemons/Manager.php @@ -0,0 +1,129 @@ +pushHandler(new StreamHandler(PWD_LOGS . '/raspisms.log', Logger::DEBUG)); + + $name = "RaspiSMS Daemon Manager"; + $pid_dir = PWD_PID; + $additional_signals = []; + $uniq = true; //Main server should be uniq + + //Construct the server and add SIGUSR1 and SIGUSR2 + parent::__construct($name, $logger, $pid_dir, $additional_signals, $uniq); + + parent::start(); + } + + + public function run() + { + //Create the internal controllers + $this->bdd = \descartes\Model::_connect(DATABASE_HOST, DATABASE_NAME, DATABASE_USER, DATABASE_PASSWORD, 'UTF8'); + $this->internal_phone = new \controllers\internals\Phone($this->bdd); + + + $this->start_sender_daemon(); + + $this->start_webhook_daemon(); + + $phones = $this->internal_phone->get_all(); + $this->start_phones_daemons($phones); + + sleep(1); + } + + + /** + * Function to start sender daemon + * @return void + */ + public function start_sender_daemon () + { + $name = 'RaspiSMS Daemon Sender'; + $pid_file = PWD_PID . '/' . $name . '.pid'; + + if (file_exists($pid_file)) + { + return false; + } + + //Create a new daemon for sender + exec('php ' . PWD . '/console.php controllers/internals/Console.php sender > /dev/null &'); + } + + + /** + * Function to start webhook daemon + * @return void + */ + public function start_webhook_daemon () + { + $name = 'RaspiSMS Daemon Webhook'; + $pid_file = PWD_PID . '/' . $name . '.pid'; + + if (file_exists($pid_file)) + { + return false; + } + + //Create a new daemon for webhook + exec('php ' . PWD . '/console.php controllers/internals/Console.php webhook > /dev/null &'); + } + + + /** + * Function to start phones daemons + * @param array $phones : Phones to start daemon for if the daemon is not already started + * @return void + */ + public function start_phones_daemons (array $phones) + { + foreach ($phones as $phone) + { + $phone_name = 'RaspiSMS Daemon Phone ' . $phone['number']; + $pid_file = PWD_PID . '/' . $phone_name . '.pid'; + + if (file_exists($pid_file)) + { + continue; + } + + //Create a new daemon for the phone + exec('php ' . PWD . '/console.php controllers/internals/Console.php phone --id_phone=\'' . $phone['id'] . '\' > /dev/null &'); + } + } + + + public function on_start() + { + $this->logger->info("Starting Manager with pid " . getmypid()); + } + + + public function on_stop() + { + $this->logger->info("Stopping Manager with pid " . getmypid ()); + } + + + public function handle_other_signals($signal) + { + $this->logger->info("Signal not handled by " . $this->name . " Daemon : " . $signal); + } +} diff --git a/daemons/Phone.php b/daemons/Phone.php index deb003a..430324b 100644 --- a/daemons/Phone.php +++ b/daemons/Phone.php @@ -10,15 +10,24 @@ use \Monolog\Handler\StreamHandler; class Phone extends AbstractDaemon { private $msg_queue; - private $queue_id; + private $msg_queue_id; + private $webhook_queue; private $last_message_at; + private $phone; + private $adapter; + private $bdd; - public function __construct($phone_number) + /** + * Constructor + * @param array $phone : A phone table entry + */ + public function __construct(array $phone) { - $this->queue_id = (int) mb_substr($phone_number, 1); + $this->phone = $phone; + $this->msg_queue_id = (int) mb_substr($this->phone['number'], 1); + + $name = 'RaspiSMS Daemon Phone ' . $this->phone['number']; - $name = 'RaspiSMS Phone ' . $phone_number; - $logger = new Logger($name); $logger->pushHandler(new StreamHandler(PWD_LOGS . '/raspisms.log', Logger::DEBUG)); @@ -35,51 +44,162 @@ class Phone extends AbstractDaemon public function run() { + //Stop after 5 minutes of inactivity to avoid useless daemon if ( (microtime(true) - $this->last_message_at) > 5 * 60 ) { $this->is_running = false; $this->logger->info("End running"); return true; } + + $this->bdd = \descartes\Model::_connect(DATABASE_HOST, DATABASE_NAME, DATABASE_USER, DATABASE_PASSWORD, 'UTF8'); + + //Send smss in queue + $this->send_smss(); - //Send a sms - $this->send_sms(); + //Read received smss + $this->read_smss(); + + usleep(0.5 * 1000000); } /** * Send sms */ - public function send_sms () : bool + private function send_smss () { //Call message $msgtype = null; $maxsize = 409600; $message = null; - msg_receive($this->msg_queue, SEND_MSG, $msgtype, $maxsize, $message); + $find_message = true; + while ($find_message) + { + msg_receive($this->msg_queue, QUEUE_TYPE_SEND_MSG, $msgtype, $maxsize, $message); - if (!$message) + if (!$message) + { + $find_message = false; + continue; + } + + //Update last message time + $this->last_message_at = microtime(true); + + $now = new \DateTime(); + $at = $now->format('Y-m-d H:i:s'); + + $message['at'] = $at; + + $this->logger->info('Try send message : ' . json_encode($message)); + + $sended_sms_uid = $this->adapter->send($message['destination'], $message['text'], $message['flash']); + if (!$sended_sms_uid) + { + $this->logger->info('Failed send message : ' . json_encode($message)); + $internal_sended->create($at, $message['text'], $message['origin'], $message['destination'], $sended_sms_uid, $this->phone['adapter'], $message['flash'], 'failed'); + continue; + } + + //Run webhook + $internal_setting = new \controllers\internals\Setting($this->bdd); + $user_settings = $internal_setting->gets_for_user($this->phone['id_user']); + process_for_webhook($message, 'send_sms', $user_settings); + + $this->logger->info('Successfully send message : ' . json_encode($message)); + + $internal_sended = new \controllers\internals\Sended($this->bdd); + $internal_sended->create($at, $message['text'], $message['origin'], $message['destination'], $sended_sms_uid, $this->phone['adapter'], $message['flash']); + } + } + + + /** + * Read smss for a number + */ + private function read_smss () + { + $internal_received = new \controllers\internals\Received($this->bdd); + $internal_command = new \controllers\internals\Command($this->bdd); + $internal_setting = new \controllers\internals\Setting($this->bdd); + + $smss = $this->adapter->read(); + if (!$smss) + { + return true; + } + + //Get users settings + $user_settings = $internal_setting->gets_for_user($this->phone['id_user']); + + + //Process smss + foreach ($smss as $sms) + { + $this->logger->info('Receive message : ' . json_encode($sms)); + + $command_result = $this->process_for_command($sms); + $sms['text'] = $command_result['text']; + $is_command = $command_result['is_command']; + + $this->process_for_webhook($sms, 'receive_sms', $user_settings); + + $this->internal_received->create($sms['at'], $sms['text'], $sms['origin'], $sms['destination'], 'unread', $is_command); + } + } + + + /** + * Process a sms to find if its a command and so execute it + * @param array $sms : The sms + * @return array : ['text' => new sms text, 'is_command' => bool] + */ + private function process_for_command (array $sms) + { + $is_command = false; + $command = $internal_command->check_for_command($this->phone['id_user'], $sms['text']); + if ($command) + { + $is_command = true; + exec($command['command']); + } + + return ['text' => $command['updated_text'], 'is_command' => $is_command]; + } + + + /** + * Process a sms to transmit a webhook query to webhook daemon if needed + * @param array $sms : The sms + * @param string $webhook_type : Type of webhook to trigger + * @param array $user_settings : Use settings + */ + private function process_for_webhook (array $sms, string $webhook_type, array $user_settings) + { + if (!$user_settings['webhook']) { return false; } - //If message received, update last message time - $this->last_message_at = microtime(true); - - $now = new \DateTime(); - $at = $now->format('Y-m-d H:i:s'); - - $bdd = \descartes\Model::_connect(DATABASE_HOST, DATABASE_NAME, DATABASE_USER, DATABASE_PASSWORD, 'UTF8'); - $internal_sended = new \controllers\internals\Sended($bdd); - $internal_sended->create($at, $message['text'], $message['origin'], $message['destination'], $message['flash']); - - //Close bdd - $bdd = null; - $internal_scheduled = null; - - $this->logger->info('Send message : ' . json_encode($message)); - return true; + $internal_webhook = new \controllers\internals\Webhook($this->bdd); + + $webhooks = $internal_webhook->gets_for_type_and_user($this->phone['id_user'], $webhook_type); + foreach ($webhooks as $webhook) + { + $message = [ + 'url' => $webhook['url'], + 'datas' => [ + 'webhook_type' => $webhook['type'], + 'at' => $sms['at'], + 'text' => $sms['text'], + 'origin' => $sms['origin'], + 'destination' => $sms['destination'], + ], + ]; + msg_send($this->webhook_queue, QUEUE_TYPE_WEBHOOK, $webhook); + } } @@ -87,19 +207,24 @@ class Phone extends AbstractDaemon { //Set last message at to construct time $this->last_message_at = microtime(true); - - $this->msg_queue = msg_get_queue($this->queue_id); - $this->logger->info("Starting Phone with pid " . getmypid()); + $this->msg_queue = msg_get_queue($this->msg_queue_id); + $this->webhook_queue = msg_get_queue(QUEUE_ID_WEBHOOK); + + //Instanciate adapter + $this->adapter = new \adapters\TestAdapter($this->phone['number'], $this->phone['adapter_datas']); + + $this->logger->info("Starting Phone daemon with pid " . getmypid()); } public function on_stop() { - $this->logger->info("Closing queue : " . $this->queue_id); - msg_remove_queue($this->msg_queue); //Delete queue on daemon close + //Delete queue on daemon close + $this->logger->info("Closing queue : " . $this->msg_queue_id); + msg_remove_queue($this->msg_queue); - $this->logger->info("Stopping Phone with pid " . getmypid ()); + $this->logger->info("Stopping Phone daemon with pid " . getmypid ()); } diff --git a/daemons/Sender.php b/daemons/Sender.php new file mode 100644 index 0000000..cb54138 --- /dev/null +++ b/daemons/Sender.php @@ -0,0 +1,103 @@ +pushHandler(new StreamHandler(PWD_LOGS . '/raspisms.log', Logger::DEBUG)); + + $name = "RaspiSMS Daemon Sender"; + $pid_dir = PWD_PID; + $additional_signals = []; + $uniq = true; //Sender should be uniq + + //Construct the server and add SIGUSR1 and SIGUSR2 + parent::__construct($name, $logger, $pid_dir, $additional_signals, $uniq); + + parent::start(); + } + + + public function run() + { + //Create the internal controllers + $this->internal_scheduled = new \controllers\internals\Scheduled($this->bdd); + + //Get smss and transmit order to send to appropriate phone daemon + $smss = $this->internal_scheduled->get_smss_to_send(); + $this->transmit_smss($smss); //Add new queues to array of queues + + usleep(0.5 * 1000000); + } + + + /** + * Function to get messages to send and transfer theme to phones daemons + * @param array $smss : Smss to send + */ + public function transmit_smss (array $smss) : void + { + foreach ($smss as $sms) + { + //If queue not already exists + $queue_id = (int) mb_substr($sms['origin'], 1); + if (!msg_queue_exists($queue_id) || !isset($queues[$queue_id])) + { + $this->queues[$queue_id] = msg_get_queue($queue_id); + } + + $msg = [ + 'id_user' => $sms['id_user'], + 'id_scheduled' => $sms['id_scheduled'], + 'text' => $sms['text'], + 'origin' => $sms['origin'], + 'destination' => $sms['destination'], + 'flash' => $sms['flash'], + ]; + + msg_send($this->queues[$queue_id], QUEUE_TYPE_SEND_MSG, $msg); + + $this->internal_scheduled->delete($sms['id_scheduled']); + } + } + + + public function on_start() + { + $this->logger->info("Starting Sender with pid " . getmypid()); + $this->bdd = \descartes\Model::_connect(DATABASE_HOST, DATABASE_NAME, DATABASE_USER, DATABASE_PASSWORD, 'UTF8'); + } + + + public function on_stop() + { + $this->logger->info("Stopping Sender with pid " . getmypid ()); + + //Delete queues on daemon close + foreach ($this->queues as $queue_id => $queue) + { + $this->logger->info("Closing queue : " . $queue_id); + msg_remove_queue($queue); + } + } + + + public function handle_other_signals($signal) + { + $this->logger->info("Signal not handled by " . $this->name . " Daemon : " . $signal); + } +} diff --git a/daemons/Server.php b/daemons/Server.php deleted file mode 100644 index 5cebdac..0000000 --- a/daemons/Server.php +++ /dev/null @@ -1,141 +0,0 @@ -pushHandler(new StreamHandler(PWD_LOGS . '/raspisms.log', Logger::DEBUG)); - - $name = "RaspiSMS Server"; - $pid_dir = PWD_PID; - $additional_signals = []; - $uniq = true; //Main server should be uniq - - //Construct the server and add SIGUSR1 and SIGUSR2 - parent::__construct($name, $logger, $pid_dir, $additional_signals, $uniq); - - parent::start(); - } - - - public function run() - { - //Create the internal controllers - $this->bdd = \descartes\Model::_connect(DATABASE_HOST, DATABASE_NAME, DATABASE_USER, DATABASE_PASSWORD, 'UTF8'); - $this->internal_phone = new \controllers\internals\Phone($this->bdd); - $this->internal_scheduled = new \controllers\internals\Scheduled($this->bdd); - $this->internal_received = new \controllers\internals\Received($this->bdd); - - - //Start all phones daemons - $phones = $this->internal_phone->get_all(); - $this->start_daemons($phones); - - - //Send smss - $smss = $this->internal_scheduled->get_smss_to_send(); - $this->daemons_queues = $this->send_smss($this->daemons_queues, $smss, $this->internal_scheduled); //Add new queues to array of queues - - - //Read smss - //$this->read_smss($this->internal_received); - - sleep(0.5); - } - - - /** - * Function to start phones daemons - * @param array $phones : Phones to start daemon for if the daemon is not already started - * @return void - */ - public function start_daemons (array $phones) : void - { - foreach ($phones as $phone) - { - $phone_name = 'RaspiSMS Phone ' . $phone['number']; - $pid_file = PWD_PID . '/' . $phone_name . '.pid'; - - if (file_exists($pid_file)) - { - continue; - } - - //Create a new daemon for the phone - exec('php ' . PWD . '/console.php controllers/internals/Console.php phone number=\'' . $phone['number'] . '\' > /dev/null &'); - } - } - - - /** - * Function to get messages to send and transfer theme to daemons - * @param array $queues : Queues for phones - * @param array $smss : Smss to send - * @param \controllers\internals\Scheduled $internal_scheduled : Internal Scheduled - * @return array : array of queues with new queues appened - */ - public function send_smss (array $queues, array $smss, \controllers\internals\Scheduled $internal_scheduled) : array - { - foreach ($smss as $sms) - { - //If the queue has been deleted or does not exist, create a new one - $queue_id = (int) mb_substr($sms['origin'], 1); - if (!msg_queue_exists($queue_id)) - { - $queues[$queue_id] = msg_get_queue($queue_id); - } - elseif (!isset($queues[$queue_id])) - { - $queues[$queue_id] = msg_get_queue($queue_id); - } - - $queue = $queues[$queue_id]; - - $msg = [ - 'id_scheduled' => $sms['id_scheduled'], - 'text' => $sms['text'], - 'origin' => $sms['origin'], - 'destination' => $sms['destination'], - 'flash' => $sms['flash'], - ]; - - msg_send($queue, SEND_MSG, $msg); - - $internal_scheduled->delete($sms['id_scheduled']); - } - - return $queues; - } - - - public function on_start() - { - $this->logger->info("Starting Server with pid " . getmypid()); - } - - - public function on_stop() - { - $this->logger->info("Stopping Server with pid " . getmypid ()); - } - - - public function handle_other_signals($signal) - { - $this->logger->info("Signal not handled by " . $this->name . " Daemon : " . $signal); - } -} diff --git a/daemons/Webhook.php b/daemons/Webhook.php new file mode 100644 index 0000000..188bd19 --- /dev/null +++ b/daemons/Webhook.php @@ -0,0 +1,99 @@ +pushHandler(new StreamHandler(PWD_LOGS . '/raspisms.log', Logger::DEBUG)); + + $pid_dir = PWD_PID; + $additional_signals = []; + $uniq = true; //Main server should be uniq + + //Construct the server and add SIGUSR1 and SIGUSR2 + parent::__construct($name, $logger, $pid_dir, $additional_signals, $uniq); + + parent::start(); + } + + + public function run() + { + //Call message + $msgtype = null; + $maxsize = 409600; + $message = null; + + $find_message = true; + while ($find_message) + { + msg_receive($this->webhook_queue, QUEUE_ID_WEBHOOK, $msgtype, $maxsize, $message); + + if (!$message) + { + $find_message = false; + continue; + } + + $this->logger->info('Trigger webhook : ' . json_encode($message)); + + //Do the webhook http query + $curl = curl_init(); + curl_setopt($curl, CURLOPT_URL, $message['url']); + curl_setopt($curl, CURLOPT_FOLLOWLOCATION, true); + curl_setopt($curl, CURLOPT_POST, true); + curl_setopt($curl, CURLOPT_POSTFIELDS, $message['datas']); + curl_exec($curl); + curl_close($curl); + } + + usleep(0.5 * 1000000); + } + + + public function on_start() + { + //Set last message at to construct time + $this->last_message_at = microtime(true); + + $this->webhook_queue = msg_get_queue(QUEUE_ID_WEBHOOK); + + $this->logger->info("Starting Webhook daemon with pid " . getmypid()); + } + + + public function on_stop() + { + //Delete queue on daemon close + $this->logger->info("Closing queue : " . QUEUE_ID_WEBHOOK); + msg_remove_queue($this->webhook_queue); + + $this->logger->info("Stopping Webhook daemon with pid " . getmypid ()); + } + + + public function handle_other_signals($signal) + { + $this->logger->info("Signal not handled by " . $this->name . " Daemon : " . $signal); + } +}