Separate daemons and responsabilities insides thoses daemons. Add webhook & commands, still to test

This commit is contained in:
osaajani 2020-01-07 01:31:34 +01:00
parent 78b3ded31d
commit b5a36b1169
9 changed files with 530 additions and 184 deletions

View File

@ -57,9 +57,8 @@
/**
* Method called to read SMSs of the number
* @param float $since : Unix microtime representation of the date from wich we want to read the SMSs
* Method called to read unread SMSs of the number
* @return array : Array of the sms reads
*/
public function read (float $since) : array;
public function read () : array;
}

View File

@ -76,10 +76,9 @@
/**
* Method called to read SMSs of the number
* @param float $since : Unix microtime representation of the date from wich we want to read the SMSs
* @return array : Array of the sms reads
*/
public function read (float $since) : array
public function read () : array
{
return [];
}

View File

@ -76,10 +76,9 @@
/**
* Method called to read SMSs of the number
* @param float $since : Unix microtime representation of the date from wich we want to read the SMSs
* @return array : Array of the sms reads
*/
public function read (float $since) : array
public function read () : array
{
return [];
}

View File

@ -16,14 +16,48 @@ namespace controllers\internals;
*/
class Console extends \descartes\InternalController
{
public function server ()
/**
* Start manager daemon
*/
public function manager ()
{
$server = new \daemons\Server();
new \daemons\Manager();
}
public function phone ($number)
/**
* Start sender daemon
*/
public function sender ()
{
$server = new \daemons\Phone($number);
new \daemons\Sender();
}
/**
* Start webhook daemon
*/
public function webhook ()
{
new \daemons\Webhook();
}
/**
* Start a phone daemon
* @param $id_phone : Phone id
*/
public function phone ($id_phone)
{
$bdd = \descartes\Model::_connect(DATABASE_HOST, DATABASE_NAME, DATABASE_USER, DATABASE_PASSWORD, 'UTF8');
$internal_phone = new \controllers\internals\Phone($bdd);
$phone = $internal_phone->get($id_phone);
if (!$phone)
{
return false;
}
new \daemons\Phone($phone);
}
}

129
daemons/Manager.php Normal file
View File

@ -0,0 +1,129 @@
<?php
namespace daemons;
use \Monolog\Logger;
use \Monolog\Handler\StreamHandler;
/**
* Main daemon class
*/
class Manager extends AbstractDaemon
{
private $internal_phone;
private $internal_scheduled;
private $internal_received;
private $bdd;
public function __construct()
{
$logger = new Logger('Daemon Manager');
$logger->pushHandler(new StreamHandler(PWD_LOGS . '/raspisms.log', Logger::DEBUG));
$name = "RaspiSMS Daemon Manager";
$pid_dir = PWD_PID;
$additional_signals = [];
$uniq = true; //Main server should be uniq
//Construct the server and add SIGUSR1 and SIGUSR2
parent::__construct($name, $logger, $pid_dir, $additional_signals, $uniq);
parent::start();
}
public function run()
{
//Create the internal controllers
$this->bdd = \descartes\Model::_connect(DATABASE_HOST, DATABASE_NAME, DATABASE_USER, DATABASE_PASSWORD, 'UTF8');
$this->internal_phone = new \controllers\internals\Phone($this->bdd);
$this->start_sender_daemon();
$this->start_webhook_daemon();
$phones = $this->internal_phone->get_all();
$this->start_phones_daemons($phones);
sleep(1);
}
/**
* Function to start sender daemon
* @return void
*/
public function start_sender_daemon ()
{
$name = 'RaspiSMS Daemon Sender';
$pid_file = PWD_PID . '/' . $name . '.pid';
if (file_exists($pid_file))
{
return false;
}
//Create a new daemon for sender
exec('php ' . PWD . '/console.php controllers/internals/Console.php sender > /dev/null &');
}
/**
* Function to start webhook daemon
* @return void
*/
public function start_webhook_daemon ()
{
$name = 'RaspiSMS Daemon Webhook';
$pid_file = PWD_PID . '/' . $name . '.pid';
if (file_exists($pid_file))
{
return false;
}
//Create a new daemon for webhook
exec('php ' . PWD . '/console.php controllers/internals/Console.php webhook > /dev/null &');
}
/**
* Function to start phones daemons
* @param array $phones : Phones to start daemon for if the daemon is not already started
* @return void
*/
public function start_phones_daemons (array $phones)
{
foreach ($phones as $phone)
{
$phone_name = 'RaspiSMS Daemon Phone ' . $phone['number'];
$pid_file = PWD_PID . '/' . $phone_name . '.pid';
if (file_exists($pid_file))
{
continue;
}
//Create a new daemon for the phone
exec('php ' . PWD . '/console.php controllers/internals/Console.php phone --id_phone=\'' . $phone['id'] . '\' > /dev/null &');
}
}
public function on_start()
{
$this->logger->info("Starting Manager with pid " . getmypid());
}
public function on_stop()
{
$this->logger->info("Stopping Manager with pid " . getmypid ());
}
public function handle_other_signals($signal)
{
$this->logger->info("Signal not handled by " . $this->name . " Daemon : " . $signal);
}
}

View File

@ -10,15 +10,24 @@ use \Monolog\Handler\StreamHandler;
class Phone extends AbstractDaemon
{
private $msg_queue;
private $queue_id;
private $msg_queue_id;
private $webhook_queue;
private $last_message_at;
private $phone;
private $adapter;
private $bdd;
public function __construct($phone_number)
/**
* Constructor
* @param array $phone : A phone table entry
*/
public function __construct(array $phone)
{
$this->queue_id = (int) mb_substr($phone_number, 1);
$this->phone = $phone;
$this->msg_queue_id = (int) mb_substr($this->phone['number'], 1);
$name = 'RaspiSMS Daemon Phone ' . $this->phone['number'];
$name = 'RaspiSMS Phone ' . $phone_number;
$logger = new Logger($name);
$logger->pushHandler(new StreamHandler(PWD_LOGS . '/raspisms.log', Logger::DEBUG));
@ -35,51 +44,162 @@ class Phone extends AbstractDaemon
public function run()
{
//Stop after 5 minutes of inactivity to avoid useless daemon
if ( (microtime(true) - $this->last_message_at) > 5 * 60 )
{
$this->is_running = false;
$this->logger->info("End running");
return true;
}
$this->bdd = \descartes\Model::_connect(DATABASE_HOST, DATABASE_NAME, DATABASE_USER, DATABASE_PASSWORD, 'UTF8');
//Send smss in queue
$this->send_smss();
//Send a sms
$this->send_sms();
//Read received smss
$this->read_smss();
usleep(0.5 * 1000000);
}
/**
* Send sms
*/
public function send_sms () : bool
private function send_smss ()
{
//Call message
$msgtype = null;
$maxsize = 409600;
$message = null;
msg_receive($this->msg_queue, SEND_MSG, $msgtype, $maxsize, $message);
$find_message = true;
while ($find_message)
{
msg_receive($this->msg_queue, QUEUE_TYPE_SEND_MSG, $msgtype, $maxsize, $message);
if (!$message)
if (!$message)
{
$find_message = false;
continue;
}
//Update last message time
$this->last_message_at = microtime(true);
$now = new \DateTime();
$at = $now->format('Y-m-d H:i:s');
$message['at'] = $at;
$this->logger->info('Try send message : ' . json_encode($message));
$sended_sms_uid = $this->adapter->send($message['destination'], $message['text'], $message['flash']);
if (!$sended_sms_uid)
{
$this->logger->info('Failed send message : ' . json_encode($message));
$internal_sended->create($at, $message['text'], $message['origin'], $message['destination'], $sended_sms_uid, $this->phone['adapter'], $message['flash'], 'failed');
continue;
}
//Run webhook
$internal_setting = new \controllers\internals\Setting($this->bdd);
$user_settings = $internal_setting->gets_for_user($this->phone['id_user']);
process_for_webhook($message, 'send_sms', $user_settings);
$this->logger->info('Successfully send message : ' . json_encode($message));
$internal_sended = new \controllers\internals\Sended($this->bdd);
$internal_sended->create($at, $message['text'], $message['origin'], $message['destination'], $sended_sms_uid, $this->phone['adapter'], $message['flash']);
}
}
/**
* Read smss for a number
*/
private function read_smss ()
{
$internal_received = new \controllers\internals\Received($this->bdd);
$internal_command = new \controllers\internals\Command($this->bdd);
$internal_setting = new \controllers\internals\Setting($this->bdd);
$smss = $this->adapter->read();
if (!$smss)
{
return true;
}
//Get users settings
$user_settings = $internal_setting->gets_for_user($this->phone['id_user']);
//Process smss
foreach ($smss as $sms)
{
$this->logger->info('Receive message : ' . json_encode($sms));
$command_result = $this->process_for_command($sms);
$sms['text'] = $command_result['text'];
$is_command = $command_result['is_command'];
$this->process_for_webhook($sms, 'receive_sms', $user_settings);
$this->internal_received->create($sms['at'], $sms['text'], $sms['origin'], $sms['destination'], 'unread', $is_command);
}
}
/**
* Process a sms to find if its a command and so execute it
* @param array $sms : The sms
* @return array : ['text' => new sms text, 'is_command' => bool]
*/
private function process_for_command (array $sms)
{
$is_command = false;
$command = $internal_command->check_for_command($this->phone['id_user'], $sms['text']);
if ($command)
{
$is_command = true;
exec($command['command']);
}
return ['text' => $command['updated_text'], 'is_command' => $is_command];
}
/**
* Process a sms to transmit a webhook query to webhook daemon if needed
* @param array $sms : The sms
* @param string $webhook_type : Type of webhook to trigger
* @param array $user_settings : Use settings
*/
private function process_for_webhook (array $sms, string $webhook_type, array $user_settings)
{
if (!$user_settings['webhook'])
{
return false;
}
//If message received, update last message time
$this->last_message_at = microtime(true);
$now = new \DateTime();
$at = $now->format('Y-m-d H:i:s');
$bdd = \descartes\Model::_connect(DATABASE_HOST, DATABASE_NAME, DATABASE_USER, DATABASE_PASSWORD, 'UTF8');
$internal_sended = new \controllers\internals\Sended($bdd);
$internal_sended->create($at, $message['text'], $message['origin'], $message['destination'], $message['flash']);
//Close bdd
$bdd = null;
$internal_scheduled = null;
$this->logger->info('Send message : ' . json_encode($message));
return true;
$internal_webhook = new \controllers\internals\Webhook($this->bdd);
$webhooks = $internal_webhook->gets_for_type_and_user($this->phone['id_user'], $webhook_type);
foreach ($webhooks as $webhook)
{
$message = [
'url' => $webhook['url'],
'datas' => [
'webhook_type' => $webhook['type'],
'at' => $sms['at'],
'text' => $sms['text'],
'origin' => $sms['origin'],
'destination' => $sms['destination'],
],
];
msg_send($this->webhook_queue, QUEUE_TYPE_WEBHOOK, $webhook);
}
}
@ -87,19 +207,24 @@ class Phone extends AbstractDaemon
{
//Set last message at to construct time
$this->last_message_at = microtime(true);
$this->msg_queue = msg_get_queue($this->queue_id);
$this->logger->info("Starting Phone with pid " . getmypid());
$this->msg_queue = msg_get_queue($this->msg_queue_id);
$this->webhook_queue = msg_get_queue(QUEUE_ID_WEBHOOK);
//Instanciate adapter
$this->adapter = new \adapters\TestAdapter($this->phone['number'], $this->phone['adapter_datas']);
$this->logger->info("Starting Phone daemon with pid " . getmypid());
}
public function on_stop()
{
$this->logger->info("Closing queue : " . $this->queue_id);
msg_remove_queue($this->msg_queue); //Delete queue on daemon close
//Delete queue on daemon close
$this->logger->info("Closing queue : " . $this->msg_queue_id);
msg_remove_queue($this->msg_queue);
$this->logger->info("Stopping Phone with pid " . getmypid ());
$this->logger->info("Stopping Phone daemon with pid " . getmypid ());
}

103
daemons/Sender.php Normal file
View File

@ -0,0 +1,103 @@
<?php
namespace daemons;
use \Monolog\Logger;
use \Monolog\Handler\StreamHandler;
/**
* Main daemon class
*/
class Sender extends AbstractDaemon
{
private $internal_phone;
private $internal_scheduled;
private $internal_received;
private $bdd;
private $queues = [];
public function __construct()
{
$logger = new Logger('Daemon Sender');
$logger->pushHandler(new StreamHandler(PWD_LOGS . '/raspisms.log', Logger::DEBUG));
$name = "RaspiSMS Daemon Sender";
$pid_dir = PWD_PID;
$additional_signals = [];
$uniq = true; //Sender should be uniq
//Construct the server and add SIGUSR1 and SIGUSR2
parent::__construct($name, $logger, $pid_dir, $additional_signals, $uniq);
parent::start();
}
public function run()
{
//Create the internal controllers
$this->internal_scheduled = new \controllers\internals\Scheduled($this->bdd);
//Get smss and transmit order to send to appropriate phone daemon
$smss = $this->internal_scheduled->get_smss_to_send();
$this->transmit_smss($smss); //Add new queues to array of queues
usleep(0.5 * 1000000);
}
/**
* Function to get messages to send and transfer theme to phones daemons
* @param array $smss : Smss to send
*/
public function transmit_smss (array $smss) : void
{
foreach ($smss as $sms)
{
//If queue not already exists
$queue_id = (int) mb_substr($sms['origin'], 1);
if (!msg_queue_exists($queue_id) || !isset($queues[$queue_id]))
{
$this->queues[$queue_id] = msg_get_queue($queue_id);
}
$msg = [
'id_user' => $sms['id_user'],
'id_scheduled' => $sms['id_scheduled'],
'text' => $sms['text'],
'origin' => $sms['origin'],
'destination' => $sms['destination'],
'flash' => $sms['flash'],
];
msg_send($this->queues[$queue_id], QUEUE_TYPE_SEND_MSG, $msg);
$this->internal_scheduled->delete($sms['id_scheduled']);
}
}
public function on_start()
{
$this->logger->info("Starting Sender with pid " . getmypid());
$this->bdd = \descartes\Model::_connect(DATABASE_HOST, DATABASE_NAME, DATABASE_USER, DATABASE_PASSWORD, 'UTF8');
}
public function on_stop()
{
$this->logger->info("Stopping Sender with pid " . getmypid ());
//Delete queues on daemon close
foreach ($this->queues as $queue_id => $queue)
{
$this->logger->info("Closing queue : " . $queue_id);
msg_remove_queue($queue);
}
}
public function handle_other_signals($signal)
{
$this->logger->info("Signal not handled by " . $this->name . " Daemon : " . $signal);
}
}

View File

@ -1,141 +0,0 @@
<?php
namespace daemons;
use \Monolog\Logger;
use \Monolog\Handler\StreamHandler;
/**
* Main daemon class
*/
class Server extends AbstractDaemon
{
private $internal_phone;
private $internal_scheduled;
private $internal_received;
private $bdd;
private $daemons_queues = [];
public function __construct()
{
$logger = new Logger('server');
$logger->pushHandler(new StreamHandler(PWD_LOGS . '/raspisms.log', Logger::DEBUG));
$name = "RaspiSMS Server";
$pid_dir = PWD_PID;
$additional_signals = [];
$uniq = true; //Main server should be uniq
//Construct the server and add SIGUSR1 and SIGUSR2
parent::__construct($name, $logger, $pid_dir, $additional_signals, $uniq);
parent::start();
}
public function run()
{
//Create the internal controllers
$this->bdd = \descartes\Model::_connect(DATABASE_HOST, DATABASE_NAME, DATABASE_USER, DATABASE_PASSWORD, 'UTF8');
$this->internal_phone = new \controllers\internals\Phone($this->bdd);
$this->internal_scheduled = new \controllers\internals\Scheduled($this->bdd);
$this->internal_received = new \controllers\internals\Received($this->bdd);
//Start all phones daemons
$phones = $this->internal_phone->get_all();
$this->start_daemons($phones);
//Send smss
$smss = $this->internal_scheduled->get_smss_to_send();
$this->daemons_queues = $this->send_smss($this->daemons_queues, $smss, $this->internal_scheduled); //Add new queues to array of queues
//Read smss
//$this->read_smss($this->internal_received);
sleep(0.5);
}
/**
* Function to start phones daemons
* @param array $phones : Phones to start daemon for if the daemon is not already started
* @return void
*/
public function start_daemons (array $phones) : void
{
foreach ($phones as $phone)
{
$phone_name = 'RaspiSMS Phone ' . $phone['number'];
$pid_file = PWD_PID . '/' . $phone_name . '.pid';
if (file_exists($pid_file))
{
continue;
}
//Create a new daemon for the phone
exec('php ' . PWD . '/console.php controllers/internals/Console.php phone number=\'' . $phone['number'] . '\' > /dev/null &');
}
}
/**
* Function to get messages to send and transfer theme to daemons
* @param array $queues : Queues for phones
* @param array $smss : Smss to send
* @param \controllers\internals\Scheduled $internal_scheduled : Internal Scheduled
* @return array : array of queues with new queues appened
*/
public function send_smss (array $queues, array $smss, \controllers\internals\Scheduled $internal_scheduled) : array
{
foreach ($smss as $sms)
{
//If the queue has been deleted or does not exist, create a new one
$queue_id = (int) mb_substr($sms['origin'], 1);
if (!msg_queue_exists($queue_id))
{
$queues[$queue_id] = msg_get_queue($queue_id);
}
elseif (!isset($queues[$queue_id]))
{
$queues[$queue_id] = msg_get_queue($queue_id);
}
$queue = $queues[$queue_id];
$msg = [
'id_scheduled' => $sms['id_scheduled'],
'text' => $sms['text'],
'origin' => $sms['origin'],
'destination' => $sms['destination'],
'flash' => $sms['flash'],
];
msg_send($queue, SEND_MSG, $msg);
$internal_scheduled->delete($sms['id_scheduled']);
}
return $queues;
}
public function on_start()
{
$this->logger->info("Starting Server with pid " . getmypid());
}
public function on_stop()
{
$this->logger->info("Stopping Server with pid " . getmypid ());
}
public function handle_other_signals($signal)
{
$this->logger->info("Signal not handled by " . $this->name . " Daemon : " . $signal);
}
}

99
daemons/Webhook.php Normal file
View File

@ -0,0 +1,99 @@
<?php
namespace daemons;
use \Monolog\Logger;
use \Monolog\Handler\StreamHandler;
/**
* Phone daemon class
*/
class Webhook extends AbstractDaemon
{
private $webhook_queue;
private $last_message_at;
private $phone;
private $adapter;
private $bdd;
/**
* Constructor
* @param array $phone : A phone table entry
*/
public function __construct()
{
$name = 'RaspiSMS Daemon Webhook';
$logger = new Logger($name);
$logger->pushHandler(new StreamHandler(PWD_LOGS . '/raspisms.log', Logger::DEBUG));
$pid_dir = PWD_PID;
$additional_signals = [];
$uniq = true; //Main server should be uniq
//Construct the server and add SIGUSR1 and SIGUSR2
parent::__construct($name, $logger, $pid_dir, $additional_signals, $uniq);
parent::start();
}
public function run()
{
//Call message
$msgtype = null;
$maxsize = 409600;
$message = null;
$find_message = true;
while ($find_message)
{
msg_receive($this->webhook_queue, QUEUE_ID_WEBHOOK, $msgtype, $maxsize, $message);
if (!$message)
{
$find_message = false;
continue;
}
$this->logger->info('Trigger webhook : ' . json_encode($message));
//Do the webhook http query
$curl = curl_init();
curl_setopt($curl, CURLOPT_URL, $message['url']);
curl_setopt($curl, CURLOPT_FOLLOWLOCATION, true);
curl_setopt($curl, CURLOPT_POST, true);
curl_setopt($curl, CURLOPT_POSTFIELDS, $message['datas']);
curl_exec($curl);
curl_close($curl);
}
usleep(0.5 * 1000000);
}
public function on_start()
{
//Set last message at to construct time
$this->last_message_at = microtime(true);
$this->webhook_queue = msg_get_queue(QUEUE_ID_WEBHOOK);
$this->logger->info("Starting Webhook daemon with pid " . getmypid());
}
public function on_stop()
{
//Delete queue on daemon close
$this->logger->info("Closing queue : " . QUEUE_ID_WEBHOOK);
msg_remove_queue($this->webhook_queue);
$this->logger->info("Stopping Webhook daemon with pid " . getmypid ());
}
public function handle_other_signals($signal)
{
$this->logger->info("Signal not handled by " . $this->name . " Daemon : " . $signal);
}
}