Connection manager implementation and fix circ dependencies

This commit is contained in:
ezratl 2019-03-02 01:24:22 -05:00
parent 440084f0a7
commit 6ebeab508d
12 changed files with 488 additions and 146 deletions

View File

@ -16,5 +16,23 @@
#define CONFIG_PATH "~/piscan.config"
#endif
enum ConnectionLevel {
RECEIVE_ONLY, VIEWER, FULL_CONTROL,
};
enum RequestType {
NOTIFY_DISCONNECTED = 0,
SYSTEM_FUNCTION,
SCANNER_FUNCTION,
DATABASE_RETRIEVE,
DATABASE_MODIFY,
CONFIG_RETRIEVE,
CONFIG_MODIFY,
};
enum {
SYSTEM_STOP,
};
#endif /* CORE_CONSTANTS_H_ */

View File

@ -21,14 +21,17 @@ enum {
SERVER_MAN,
AUDIO_MAN,
CLIENT = 255,
CLIENT = 254,
ALL = 255
};
static int activeMessages = 0;
/* basic interthread message structure */
class Message {
public:
Message(unsigned char src, unsigned char dst, void* data = 0) : source(src), destination(dst), pData(data) {}
virtual ~Message() {};
Message(unsigned char src, unsigned char dst, void* data = 0) : source(src), destination(dst), pData(data) { activeMessages++; }
virtual ~Message() { activeMessages--; };
const unsigned char source;
const unsigned char destination;
void* const pData;

View File

@ -5,8 +5,7 @@ using namespace std;
#include "constants.h"
#include "Demodulator.h"
#include "Entry.h"
//#include "loguru.hpp"
#include "loguru.cpp"
#include "loguru.hpp"
#include "messages.h"
#include "ServerManager.h"
#include "ScannerStateMachine.h"
@ -26,6 +25,9 @@ using namespace std;
#define DEMOD_FLAG 0x04
#define AUDIO_FLAG 0x08
//#define ALL_FLAG 0x0F
#define ALL_FLAG 0x03
class MessageManager : public MessageReceiver {
public:
MessageManager() : _run(false) {};
@ -40,6 +42,16 @@ public:
}
}
void start() {
_run = true;
_workThread = std::thread(&MessageManager::_handlerThreadFunc, this);
}
void stop(bool block) {
_run = false;
if(block)
_workThread.join();
}
private:
moodycamel::ConcurrentQueue<Message*> _queue;
std::thread _workThread;
@ -47,11 +59,14 @@ private:
MessageReceiver* _receivers[MESSAGE_RECEIVERS];
void _handlerThreadFunc(void){
LOG_F(2, "MessageManager started");
while(_run){
Message* message;
if(_queue.try_dequeue(message)){
DCHECK_F(message != nullptr);
DCHECK_F(message->destination != message->source);
DLOG_F(9, "Message receive | dst:%d | src:%d", message->destination, message->source);
if(message->destination < MESSAGE_RECEIVERS){
MessageReceiver* receiver = _receivers[message->destination];
@ -69,6 +84,7 @@ private:
}
}
}
LOG_F(2, "MessageManager stopped");
}
void giveMessage(Message& message){
@ -78,28 +94,131 @@ private:
class SystemController : public MessageReceiver {
public:
SystemController(MessageReceiver& central) : _centralQueue(central) {}
SystemController(MessageReceiver& central, SystemList& syslist,
ScannerStateMachine& scan, ServerManager& conmgr, Demodulator& dm) :
_centralQueue(central), _systemList(syslist), _scanner(scan), _connectionManager(
conmgr), _demod(dm), _flagLock(_flagsMutex, std::defer_lock) {
//_flagLock(_flagsMutex);
//_flagLock.unlock();
}
~SystemController() {};
void start(){
LOG_F(1, "System Control start");
_scanner.start();
_connectionManager.start();
while(1){
//_flagLock.lock();
if(_activeModules == ALL_FLAG){
//_flagLock.unlock();
break;
}
//_flagLock.unlock();
}
LOG_F(2, "All modules started");
_connectionManager.allowConnections();
}
void stop(){
LOG_F(1, "Stopping system");
_scanner.stopScanner();
_centralQueue.giveMessage(*(new ServerMessage(SYSTEM_CONTROL, ServerMessage::STOP, nullptr)));
while(1){
//_flagLock.lock();
if(_activeModules == 0){
//_flagLock.unlock();
break;
}
//_flagLock.unlock();
}
LOG_F(2, "All modules stopped");
}
private:
MessageReceiver& _centralQueue;
SystemList& _systemList;
ScannerStateMachine& _scanner;
ServerManager& _connectionManager;
Demodulator& _demod;
std::mutex _flagsMutex;
std::unique_lock<std::mutex> _flagLock;
unsigned char _activeModules = 0;
void giveMessage(Message& message){
auto msg = dynamic_cast<ControllerMessage&>(message);
ClientRequest* request;
switch(msg.type){
case ControllerMessage::CLIENT_REQUEST:
//TODO request handling
request = reinterpret_cast<ClientRequest*>(message.pData);
processRequest(*request);
break;
case ControllerMessage::NOTIFY_READY:
//TODO module start
switch(msg.source){
case SCANNER_SM:
_activeModules |= SCANNER_FLAG;
break;
case DEMOD:
_activeModules |= DEMOD_FLAG;
break;
case SERVER_MAN:
_activeModules |= CONNMGR_FLAG;
break;
case AUDIO_MAN:
_activeModules |= AUDIO_FLAG;
break;
default:
break;
}
break;
case ControllerMessage::NOTIFY_STOPPED:
//TODO module delete
switch (msg.source) {
case SCANNER_SM:
_activeModules &= ~SCANNER_FLAG;
break;
case DEMOD:
_activeModules &= ~DEMOD_FLAG;
break;
case SERVER_MAN:
_activeModules &= ~CONNMGR_FLAG;
break;
case AUDIO_MAN:
_activeModules &= ~AUDIO_FLAG;
break;
default:
break;
}
break;
}
delete &message;
}
void processRequest(ClientRequest& request){
switch(request.rqInfo.type){
case SYSTEM_FUNCTION:
switch(request.rqInfo.subType){
case SYSTEM_STOP:
break;
default:
break;
}
break;
default:
break;
}
delete &request;
}
};
@ -107,8 +226,8 @@ static MessageManager messageManager;
static SystemList scanSystems;
static ScannerStateMachine scanner(messageManager, scanSystems);
static ServerManager connectionManager(messageManager);
static SystemController sysControl(messageManager);
static Demodulator demod;
static SystemController sysControl(messageManager, scanSystems, scanner, connectionManager, demod);
void setDemodulator(DemodInterface* demod) {
DCHECK_F(demod != nullptr);
@ -117,7 +236,7 @@ void setDemodulator(DemodInterface* demod) {
int main(int argc, char **argv) {
loguru::init(argc, argv);
loguru::add_file(LOG_PATH, loguru::Truncate, loguru::Verbosity_MAX);
loguru::add_file(LOG_PATH, loguru::Truncate, loguru::Verbosity_2);
loguru::g_stderr_verbosity = loguru::Verbosity_MAX;
LOG_F(INFO, "PiScan started");
@ -129,5 +248,18 @@ int main(int argc, char **argv) {
setDemodulator(&demod);
messageManager.start();
sysControl.start();
for(unsigned int i = 0; i > 0; i++);
sysControl.stop();
messageManager.stop(true);
LOG_F(INFO, "PiScan stopped");
if(activeMessages > 0)
DLOG_F(WARNING, "Memory leak: %i messages not deleted!", activeMessages);
return 0;
}

View File

@ -9,7 +9,7 @@
#define BACKENDSERVER_H_
#include "messages.h"
#include "ServerManager.h"
#include "connection.h"
class BackendServer : public MessageReceiver {
public:

View File

@ -5,15 +5,9 @@
* Author: ezra
*/
#include "ServerManager.h"
Connection::ConnectionLevel ClientRequest::permissionMap[] = {
[SYSTEM_FUNCTION] = Connection::ConnectionLevel::FULL_CONTROL,
[SCANNER_FUNCTION] = Connection::ConnectionLevel::FULL_CONTROL,
[DATABASE_RETRIEVE] = Connection::ConnectionLevel::VIEWER,
[DATABASE_MODIFY] = Connection::ConnectionLevel::FULL_CONTROL,
[CONFIG_RETRIEVE] = Connection::ConnectionLevel::VIEWER,
[CONFIG_MODIFY] = Connection::ConnectionLevel::FULL_CONTROL,
};
//#include "connection.h"
//#include "request.h"

View File

@ -6,17 +6,24 @@
*/
#include <iostream>
#include <string>
#include <sstream>
#include "constants.h"
#include "DebugServer.h"
#include "loguru.hpp"
void DebugConsole::connect(){
std::cout << "Console connected";
std::cout << "Connecting...";
_run = true;
_requestThread = std::thread(&DebugConsole::_consoleInputFunc, this);
}
void DebugConsole::disconnect(){
_run = false;
std::cout << "Console disconnected";
std::fprintf(stdin, "\n");
std::cout << "\nConsole disconnected\n";
notifyDisconnected();
}
@ -24,15 +31,33 @@ void DebugConsole::giveMessage(Message& message){
delete &message;
}
void DebugConsole::_consoleInputFunc() {
std::string input = "";
std::cout << "\nConsole connected\n";
while(_run){
input.clear();
std::getline(std::cin, input);
if(input.compare("stop") == 0){
_run = false;
ClientRequest::RequestParams params;
issueRequest(params);
}
}
std::cout << "\nConsole thread exited\n";
}
void DebugServer::start(){
this->_connection = new DebugConsole();
this->_host.requestConnection(*_connection);
this->_host.requestConnection(_connection);
if(_connection == nullptr)
DLOG_F(WARNING, "Debug connection failed");
}
void DebugServer::stop(){
if (_connection->_run)
_connection->disconnect();
}
void DebugServer::giveMessage(Message& message){

View File

@ -8,21 +8,31 @@
#ifndef SERVERDEBUGOUTPUT_H_
#define SERVERDEBUGOUTPUT_H_
#include <thread>
#include "BackendServer.h"
#include "ServerManager.h"
#include "connection.h"
class DebugServer;
class DebugConsole : public Connection {
public:
DebugConsole() : Connection(FULL_CONTROL, AUDIO_NONE) {}
~DebugConsole() {};
~DebugConsole() {
_requestThread.join();
};
void connect();
void disconnect();
void giveMessage(Message& message);
friend DebugServer;
private:
bool _run = false;
std::thread _requestThread;
void _consoleInputFunc();
};
class DebugServer : public BackendServer {

View File

@ -6,6 +6,8 @@
*/
#include "ServerManager.h"
#include "loguru.hpp"
#include "DebugServer.h"
#define MAX_CONNECTIONS 5
#define QUEUE_SIZE 64
@ -13,50 +15,77 @@
ServerManager::ServerManager(MessageReceiver& central) :
_centralQueue(central), _queue(QUEUE_SIZE), _activeConnections(0), _connections(
MAX_CONNECTIONS) {
_servers.push_back(new DebugServer(*this));
}
void ServerManager::start(){
//TODO
_run = true;
_queueThread = std::thread(&ServerManager::_queueThreadFunc, this);
for(unsigned int i = 0; i < _servers.size(); i++)
_servers[i]->start();
Message* message = new ControllerMessage(SERVER_MAN, ControllerMessage::NOTIFY_READY);
_centralQueue.giveMessage(*message);
LOG_F(1, "Connection Manager started");
}
void ServerManager::allowConnections(){
//TODO
_allowConnections = true;
LOG_F(2, "Begin accepting connections");
}
void ServerManager::disconnectClients(){
LOG_F(1, "Disconnecting all clients");
//TODO might need locks for array
_allowConnections = false;
for(int i = 0; i < MAX_CONNECTIONS; ++i){
Connection* con = _connections[i];
if(con != NULL){
if(con != nullptr){
con->disconnect();
}
}
}
void ServerManager::_queueThreadFunc(void){
//todo break condition
while(1){
while(_run){
Message* message;
if(_queue.try_dequeue(message)){
assert(message != nullptr);
if(message->destination != SERVER_MAN){
if(message->destination != SERVER_MAN && message->source != CLIENT){
_centralQueue.giveMessage(*message);
}
else{
_handleMessage(*message);
}
}
Connection* newCon = nullptr;
if(_allowConnections && _connectionQueue.try_dequeue(newCon)){
_addConnection(*newCon);
}
}
for(int i = 0; i < _servers.size(); i++)
_servers[i]->stop();
// empty queue
Message* m;
while(_queue.try_dequeue(m))
delete m;
_centralQueue.giveMessage(*(new ControllerMessage(SERVER_MAN, ControllerMessage::NOTIFY_STOPPED)));
LOG_F(1, "Connection Manager stopped");
}
void ServerManager::giveMessage(Message& message){
_queue.enqueue(&message);
}
ServerInterface::RequestResponse ServerManager::requestConnection(Connection& client){
int ServerManager::requestConnection(void* client){
if(_activeConnections < MAX_CONNECTIONS){
_addConnection(client);
_connectionQueue.enqueue(static_cast<Connection*>(client));
return RQ_ACCEPTED;
}
else{
@ -65,58 +94,109 @@ ServerInterface::RequestResponse ServerManager::requestConnection(Connection& cl
}
}
ServerInterface::RequestResponse ServerManager::giveRequest(ClientRequest& request){
if(_connections[request.src->_handle] != request.src){
delete &request;
int ServerManager::giveRequest(void* request){
assert(request != nullptr);
auto rq = static_cast<ClientRequest*>(request);
/*if(_connections[rq->src]._handle != rq->src){
delete &rq;
return RQ_INVALID_HANDLE;
}
else if(request.src->_level < ClientRequest::permissionMap[request.requestType]){
delete &request;
else if(rq->src->_level < Connection::permissionMap[rq->rqInfo.type]){
delete &rq;
return RQ_INSUFFICIENT_PERMISSION;
}
}*/
//todo new message, request passed as data
//unsigned char dest = 0;
Message* message;
switch(request.requestType){
case ClientRequest::SYSTEM_FUNCTION:
//dest = SYSTEM_CONTROL;
switch(rq->rqInfo.type){
case NOTIFY_DISCONNECTED:
delete _connections[rq->source];
break;
case ClientRequest::SCANNER_FUNCTION:
case SYSTEM_FUNCTION:
//dest = SYSTEM_CONTROL;
message = new ControllerMessage(CLIENT, ControllerMessage::CLIENT_REQUEST, rq);
break;
case SCANNER_FUNCTION:
//dest = SCANNER_SM;
message = new ScannerMessage(CLIENT, ScannerMessage::CLIENT_REQUEST, &request);
message = new ScannerMessage(CLIENT, ScannerMessage::CLIENT_REQUEST, rq);
break;
case ClientRequest::DATABASE_RETRIEVE:
case DATABASE_RETRIEVE:
//dest = SYSTEM_CONTROL;
delete rq;
break;
case ClientRequest::DATABASE_MODIFY:
case DATABASE_MODIFY:
//dest = SYSTEM_CONTROL;
delete rq;
break;
case ClientRequest::CONFIG_RETRIEVE:
case CONFIG_RETRIEVE:
//dest = SYSTEM_CONTROL;
delete rq;
break;
case ClientRequest::CONFIG_MODIFY:
case CONFIG_MODIFY:
//dest = SYSTEM_CONTROL;
delete rq;
break;
}
this->giveMessage(*message);
//delete request;
return RQ_ACCEPTED;
return rq->rqHandle;
}
void ServerManager::_handleMessage(Message& message){
//TODO
if(message.source == CLIENT){
if(_allowConnections){
_addConnection(*(static_cast<Connection*>(message.pData)));
}
else {
// connections not yet allowed, put request back on queue
giveMessage(message);
return;
}
}
else{
auto msg = dynamic_cast<ServerMessage&>(message);
switch(msg.type){
case ServerMessage::CONTEXT_UPDATE:
break;
case ServerMessage::NOTIFY_ALL_CLIENTS:
break;
case ServerMessage::NOTIFY_USERS:
break;
case ServerMessage::NOTIFY_VIEWERS:
break;
case ServerMessage::STOP:
disconnectClients();
_run = false;
break;
default:
break;
}
}
delete &message;
}
void ServerManager::_addConnection(Connection& client){
//TODO
for(int i = 0; i < MAX_CONNECTIONS; ++i){
if(_connections[i] == NULL){
if(_connections[i] == nullptr){
LOG_F(1, "Initiating connection with handle %i", i);
_connections[i] = &client;
client._handle = i;
client.serverManager = this;
client._serverManager = this;
client.connect();
break;
}
}
}
/*ConnectionLevel Connection::permissionMap[] = {
[SYSTEM_FUNCTION] = FULL_CONTROL,
[SCANNER_FUNCTION] = FULL_CONTROL,
[DATABASE_RETRIEVE] = VIEWER,
[DATABASE_MODIFY] = FULL_CONTROL,
[CONFIG_RETRIEVE] = VIEWER,
[CONFIG_MODIFY] = FULL_CONTROL,
};*/

View File

@ -11,95 +11,19 @@
#include <vector>
#include "messages.h"
#define HANDLE_NULL -1
#include "request.h"
#include "connection.h"
#include "BackendServer.h"
class Connection;
class ClientRequest;
class ServerInterface;
class ServerManager;
class Connection : public MessageReceiver {
public:
enum ConnectionLevel {
RECEIVE_ONLY,
VIEWER,
FULL_CONTROL,
};
enum AudioReceive {
AUDIO_NONE,
AUDIO_RECEIVE,
};
Connection(ConnectionLevel lvl, AudioReceive aud) :
_level(lvl), _audio(aud), serverManager(nullptr), _handle(HANDLE_NULL) {}
virtual ~Connection() {};
virtual void giveMessage(Message& message) = 0;
virtual void connect() = 0;
virtual void disconnect() = 0;
private:
friend class ServerManager;
int _handle;
ConnectionLevel _level;
AudioReceive _audio;
protected:
ServerInterface* serverManager;
void notifyDisconnected() {
}
};
class ClientRequest : public Message {
public:
enum {
REQUEST,
NOTIFY_DISCONNECTED,
};
enum RequestType {
SYSTEM_FUNCTION = 0,
SCANNER_FUNCTION,
DATABASE_RETRIEVE,
DATABASE_MODIFY,
CONFIG_RETRIEVE,
CONFIG_MODIFY,
};
ClientRequest(Connection& client, unsigned char dst);
~ClientRequest() {};
private:
static Connection::ConnectionLevel permissionMap[];
friend class ServerManager;
Connection* src;
RequestType requestType;
void (*_callback)(void*);
};
class ServerInterface {
public:
virtual ~ServerInterface() {};
enum RequestResponse {
RQ_ACCEPTED,
RQ_DENIED,
RQ_INSUFFICIENT_PERMISSION,
RQ_INVALID_HANDLE,
};
virtual RequestResponse requestConnection(Connection& client) = 0;
virtual RequestResponse giveRequest(ClientRequest& request) = 0;
};
class ServerManager : public MessageReceiver, public ServerInterface {
public:
ServerManager(MessageReceiver& central);
~ServerManager() {};
~ServerManager() {
for(unsigned int i = 0; i < _servers.size(); i++)
delete _servers[i];
_queueThread.join(); };
void start();
void allowConnections();
@ -110,15 +34,20 @@ protected:
private:
MessageReceiver& _centralQueue;
moodycamel::ConcurrentQueue<Message*> _queue;
moodycamel::ReaderWriterQueue<Connection*> _connectionQueue;
int _activeConnections;
std::vector<Connection*> _connections;
std::vector<BackendServer*> _servers;
std::thread _queueThread;
bool _allowConnections = false;
bool _run = false;
void _queueThreadFunc(void);
void _handleMessage(Message& message);
void _addConnection(Connection& client);
RequestResponse requestConnection(Connection& client);
RequestResponse giveRequest(ClientRequest& request);
int requestConnection(void* client);
int giveRequest(void* request);
};

View File

@ -0,0 +1,73 @@
/*
* connection.h
*
* Created on: Mar 1, 2019
* Author: ezra
*/
#ifndef SERVER_CONNECTION_H_
#define SERVER_CONNECTION_H_
#include "constants.h"
#include "messages.h"
#include "request.h"
#define HANDLE_NULL -1
class ServerInterface {
public:
virtual ~ServerInterface() {};
enum RequestResponse {
RQ_ACCEPTED,
RQ_DENIED,
RQ_INSUFFICIENT_PERMISSION,
RQ_INVALID_HANDLE,
};
virtual int requestConnection(void* client) = 0;
virtual int giveRequest(void* request) = 0;
};
class Connection : public MessageReceiver {
public:
enum AudioReceive {
AUDIO_NONE,
AUDIO_RECEIVE,
};
Connection(ConnectionLevel lvl, AudioReceive aud) :
_level(lvl), _audio(aud), _serverManager(nullptr), _handle(HANDLE_NULL) {}
virtual ~Connection() {};
virtual void giveMessage(Message& message) = 0;
virtual void connect() = 0;
virtual void disconnect() = 0;
private:
friend class ServerManager;
friend class ClientRequest;
//static ConnectionLevel permissionMap[];
ConnectionLevel _level;
AudioReceive _audio;
ServerInterface* _serverManager;
int _handle;
protected:
void notifyDisconnected() {
ClientRequest::RequestParams params = {.type = static_cast<RequestType>(ClientRequest::NOTIFY_DISCONNECTED)};
issueRequest(params);
}
int issueRequest(ClientRequest::RequestParams params){
ClientRequest* rq = new ClientRequest(_handle, params, nullptr);
return _serverManager->giveRequest(rq);
}
};
#endif /* SERVER_CONNECTION_H_ */

View File

@ -0,0 +1,46 @@
/*
* request.h
*
* Created on: Mar 1, 2019
* Author: ezra
*/
#ifndef SERVER_REQUEST_H_
#define SERVER_REQUEST_H_
#include "messages.h"
#include "constants.h"
class ServerManager;
class ClientRequest : public Message {
public:
enum {
REQUEST,
NOTIFY_DISCONNECTED,
};
struct RequestParams {
RequestType type;
int subType;
};
ClientRequest(unsigned char handle, RequestParams info, void (*callback)(void*) = 0) :
Message(handle, 0), rqInfo(info), _callback(callback) {}
~ClientRequest() {};
int rqHandle = 0;
RequestParams rqInfo;
void (*_callback)(void*);
private:
friend class ServerManager;
};
#endif /* SERVER_REQUEST_H_ */

View File

@ -10,12 +10,36 @@
#include <ctime>
#include <iostream>
#include <unistd.h>
#include <thread>
#include "rtl_fm.h"
using namespace std;
static float lastval, curval;
static int millis;
void printrssi(){
while(1){
curval = rtl_fm_get_rssi();
//cerr << "rssi:" << curval << "\t\t" << (curval - lastval) << "\r";
if(abs(curval - lastval) > 0.4){
millis += 10;
}
else if(millis > 0){
cerr << curval << " settled after " << millis << "ms\n";
millis = 0;
}
lastval = curval;
usleep(10000);
}
}
int main(int argc, char** argv){
cerr << "Starting rtl_fm test\n";
@ -33,7 +57,7 @@ int main(int argc, char** argv){
timespec_get(&time2, TIME_UTC);
fprintf(stderr, "%li\n", (time2.tv_nsec - time1.tv_nsec)/1000);
usleep(3000000);
//usleep(3000000);
timespec_get(&time1, TIME_UTC);
while(rtl_fm_setfreq(472087500));
timespec_get(&time2, TIME_UTC);
@ -45,13 +69,21 @@ int main(int argc, char** argv){
timespec_get(&time2, TIME_UTC);
fprintf(stderr, "%li\n", (time2.tv_nsec - time1.tv_nsec)/1000);
//thread rssit(printrssi);
int freq = 0;
while(1){
cin >> freq;
if(freq == 0)
break;
else
else{
timespec_get(&time1, TIME_UTC);
while(rtl_fm_setfreq(freq));
timespec_get(&time2, TIME_UTC);
fprintf(stderr, "%li us\n", (time2.tv_nsec - time1.tv_nsec)/1000);
}
usleep(30000);
cerr << "rssi:" << rtl_fm_get_rssi() << "\n";
}
rtl_fm_deinit();