From fefdfc2b0f6d84a5b12c2c5d88ef1a67af0358fe Mon Sep 17 00:00:00 2001 From: ezratl Date: Sat, 2 Nov 2019 12:18:06 -0400 Subject: [PATCH] Refactor interthread communication --- src/core/Configuration.h | 6 +- src/core/PiScan.h | 28 +++++ src/core/ScannerSM.cpp | 24 +++-- src/core/ScannerSM.h | 5 +- src/core/synchronize.h | 45 ++++++++ src/cubic/demod/DemodulatorPreThread.cpp | 4 +- src/piScan_backend.cpp | 127 +++++++++++++++++++++-- src/server/Connection.cpp | 45 +++++--- src/server/ServerManager.cpp | 32 ++++-- src/server/ServerManager.h | 6 +- src/server/connection.h | 5 +- src/sigproc/Demodulator.cpp | 45 +++++++- src/sigproc/Demodulator.h | 34 +++--- 13 files changed, 342 insertions(+), 64 deletions(-) create mode 100644 src/core/PiScan.h create mode 100644 src/core/synchronize.h diff --git a/src/core/Configuration.h b/src/core/Configuration.h index b3be8df..f0bf826 100644 --- a/src/core/Configuration.h +++ b/src/core/Configuration.h @@ -34,12 +34,14 @@ struct GeneralConfig { int logfileVerbosity = DEFAULT_LOGFILE_VERBOSITY; }; -#define DEFAULT_TCP_PORT 1234 -#define MAX_TCP_CONNECTIONS 5 +#define DEFAULT_TCP_PORT 1234 +#define MAX_TCP_CONNECTIONS 5 +#define DEFAULT_SPAWN_CLIENT false struct SocketServerConfig { int tcpPort = DEFAULT_TCP_PORT; int maxConnections = MAX_TCP_CONNECTIONS; + bool spawnLocalClient = DEFAULT_SPAWN_CLIENT; }; #define DEFAULT_SQUELCH 0 diff --git a/src/core/PiScan.h b/src/core/PiScan.h new file mode 100644 index 0000000..b3fd067 --- /dev/null +++ b/src/core/PiScan.h @@ -0,0 +1,28 @@ +#pragma once + +#include + +#include "clientmessage.h" + +namespace piscan { + /* system functions */ + bool stopSystem(); + + /* scanner functions */ + void startScan(); + void holdScan(std::vector index = std::vector()); + void stopScanner(); + void manualEntry(uint32_t* freq); + ScannerContext getScannerContext(); + + /* demod functions */ + void setTunerGain(float gain); + void setDemodSquelch(float level); + DemodContext getDemodContext(); + void audioMute(bool mute = true); + long long getTunerSampleRate(); + + /* server functions */ + void scannerContextUpdate(ScannerContext ctx); + void demodContextUpdate(DemodContext ctx); +} diff --git a/src/core/ScannerSM.cpp b/src/core/ScannerSM.cpp index 78bba31..f1395cd 100644 --- a/src/core/ScannerSM.cpp +++ b/src/core/ScannerSM.cpp @@ -8,6 +8,7 @@ #include #include +#include "PiScan.h" #include "ScannerSM.h" #include "ListGenerator.h" #include "loguru.hpp" @@ -90,9 +91,10 @@ void ScannerSM::ST_Load(EventData* data){ // do not issue event - SM will wait until an event is generated before proceeding //InternalEvent(ST_SCAN); - auto message = std::make_shared(SCANNER_SM, ControllerMessage::NOTIFY_READY); - _centralQueue.giveMessage(message); + //auto message = std::make_shared(SCANNER_SM, ControllerMessage::NOTIFY_READY); + //_centralQueue.giveMessage(message); LOG_F(1, "ScannerSM ready"); + notifyReady(); } void ScannerSM::ST_Scan(EventData* data){ @@ -238,8 +240,6 @@ void ScannerSM::ST_SaveAll(EventData* data){ DLOG_F(9, "ST_SaveAll"); LOG_F(1, "Saving database"); - //TODO scan list isn't modifiable yet - _systems.writeToFile(); InternalEvent(ST_STOPPED); @@ -248,9 +248,10 @@ void ScannerSM::ST_SaveAll(EventData* data){ void ScannerSM::ST_Stopped(EventData* data){ DLOG_F(9, "ST_Stopped"); stop(false); - auto message = std::make_shared(SCANNER_SM, ControllerMessage::NOTIFY_STOPPED); - _centralQueue.giveMessage(message); + //auto message = std::make_shared(SCANNER_SM, ControllerMessage::NOTIFY_STOPPED); + //_centralQueue.giveMessage(message); LOG_F(1, "ScannerSM stopped"); + notifyDeinit(); } void ScannerSM::_broadcastContextUpdate() { @@ -295,9 +296,9 @@ void ScannerSM::_enableAudioOut(bool en){ //TODO temporary //rtl_fm_mute((int)(!en)); - auto message = std::make_shared(SCANNER_SM, DemodMessage::OPEN_AUDIO, (void*) !en); - _centralQueue.giveMessage(message); - + //auto message = std::make_shared(SCANNER_SM, DemodMessage::OPEN_AUDIO, (void*) !en); + //_centralQueue.giveMessage(message); + piscan::audioMute(en); } void ScannerSM::giveMessage(std::shared_ptr message) { @@ -370,3 +371,8 @@ void ScannerSM::_handleRequest(ClientRequest& request) { delete rq; } + +ScannerContext ScannerSM::getCurrentContext(){ + std::unique_lock lock(_contextMutex); + return ScannerContext(_currentContext); +} diff --git a/src/core/ScannerSM.h b/src/core/ScannerSM.h index 558aea4..a4e3d04 100644 --- a/src/core/ScannerSM.h +++ b/src/core/ScannerSM.h @@ -15,12 +15,13 @@ #include "Entry.h" #include "messages.h" #include "clientmessage.h" +#include "synchronize.h" #define SQUELCH_TRIGGER_HITS 25 namespace piscan { -class ScannerSM: public MessageReceiver, public StateMachine { +class ScannerSM: public MessageReceiver, public StateMachine, public Synchronizable { public: ScannerSM(MessageReceiver& central, SystemList& dataSource); ~ScannerSM() {}; @@ -30,6 +31,8 @@ public: void stopScanner(); void manualEntry(uint32_t* freq); void giveMessage(std::shared_ptr message); + + ScannerContext getCurrentContext(); private: void ST_Load(EventData* data); void ST_Scan(EventData* data); diff --git a/src/core/synchronize.h b/src/core/synchronize.h new file mode 100644 index 0000000..3ada4e4 --- /dev/null +++ b/src/core/synchronize.h @@ -0,0 +1,45 @@ +#pragma once + +#include +#include + +using namespace std; + +class Synchronizable { + public: + Synchronizable(){}; + virtual ~Synchronizable(){}; + + inline void waitReady(){ + unique_lock lock(_mtx); + while(!_ready) + _cv.wait(lock); + } + + inline void waitDeinit(){ + unique_lock lock(_mtx); + while(!_stopped) + _cv.wait(lock); + } + + protected: + inline void notifyReady(){ + unique_lock lock(_mtx); + _ready = true; + _stopped = false; + _cv.notify_all(); + } + + inline void notifyDeinit(){ + unique_lock lock(_mtx); + _ready = false; + _stopped = true; + _cv.notify_all(); + } + + private: + mutex _mtx; + condition_variable _cv; + bool _ready; + bool _stopped; +}; diff --git a/src/cubic/demod/DemodulatorPreThread.cpp b/src/cubic/demod/DemodulatorPreThread.cpp index 5dee272..071082b 100644 --- a/src/cubic/demod/DemodulatorPreThread.cpp +++ b/src/cubic/demod/DemodulatorPreThread.cpp @@ -312,10 +312,10 @@ void DemodulatorPreThread::setFrequency(long long freq) { frequencyChanged.store(true); newFrequency = freq; - if(!stopping.load()){ + /*if(!stopping.load()){ std::unique_lock lock(_msgMutex); _cv.wait(lock, [this]{return this->_msgAvailable;}); - } + }*/ } long long DemodulatorPreThread::getFrequency() { diff --git a/src/piScan_backend.cpp b/src/piScan_backend.cpp index de81ace..da04351 100644 --- a/src/piScan_backend.cpp +++ b/src/piScan_backend.cpp @@ -6,6 +6,7 @@ #include //using namespace std; +#include "PiScan.h" #include "constants.h" #include "Configuration.h" #include "Demodulator.h" @@ -146,7 +147,7 @@ public: void start(){ LOG_F(1, "System Control start"); _scanner.start(); - _connectionManager.start(); + //_connectionManager.start(); _demod.start(); /* let a stop call break the loop */ @@ -284,6 +285,8 @@ static ServerManager connectionManager(io_service, messageManager); static Demodulator demod(messageManager); static SystemController sysControl(messageManager, scanSystems, scanner, connectionManager, demod); +static std::atomic_bool steadyState(false); + void terminate(){ LOG_F(WARNING, "Terminating - resources may not be properly released"); std::terminate(); @@ -326,6 +329,53 @@ void exit(int code){ std::exit(code); } +bool stopSystem(){ + if(steadyState.load()){ + return true; + } + return false; +} + +void startScan(){ + scanner.startScan(); +} + +void holdScan(std::vector index){ + scanner.holdScan(index); +} + +void stopScanner(){ + scanner.stopScanner(); +} + +void manualEntry(uint32_t* freq){ + scanner.manualEntry(freq); +} + +ScannerContext getScannerContext(){ + return scanner.getCurrentContext(); +} + +void setTunerGain(float gain){ + demod.setTunerGain(gain); +} + +void setDemodSquelch(float level){ + demod.setSquelch(level); +} + +DemodContext getDemodContext(){ + return DemodContext(demod.getTunerGain(), demod.getSquelch()); +} + +void audioMute(bool mute){ + demod.audioMute(mute); +} + +long long getTunerSampleRate() { + return demod.getTunerSampleRate(); +} + } using namespace piscan; @@ -343,8 +393,10 @@ int main(int argc, char **argv) { Configuration& config = Configuration::getConfig(); bool useDebugConsole = false; + int logVerbosity = config.getGeneralConfig().logfileVerbosity; + int c; - while((c = getopt(argc,argv,"d:p:")) != -1){ + while((c = getopt(argc,argv,"dp:f:")) != -1){ switch(c){ case 'd': useDebugConsole = true; @@ -353,6 +405,10 @@ int main(int argc, char **argv) { if(optarg) config.setWorkingPath(std::string(optarg)); break; + case 'f': + if(optarg) + logVerbosity = std::atoi(optarg); + break; } } @@ -360,7 +416,7 @@ int main(int argc, char **argv) { config.loadConfig(); config.loadState(); - loguru::add_file(config.getLogfilePath().c_str(), loguru::Truncate, config.getGeneralConfig().logfileVerbosity); + loguru::add_file(config.getLogfilePath().c_str(), loguru::Truncate, logVerbosity); messageManager.setReceiver(SYSTEM_CONTROL, &sysControl); messageManager.setReceiver(SCANNER_SM, &scanner); @@ -370,25 +426,82 @@ int main(int argc, char **argv) { setDemodulator(&demod); - + //connectionManager.useDebugServer(useDebugConsole); try { messageManager.start(); - sysControl.start(); + //sysControl.start(); + + { + scanner.start(); + connectionManager.start(useDebugConsole); + demod.start(); + + /*while(sysRun){ + //_flagLock.lock(); + if(_activeModules == ALL_FLAG){ + //_flagLock.unlock(); + break; + } + //_flagLock.unlock(); + }*/ + + LOG_F(4, "scanner wait"); + scanner.waitReady(); + LOG_F(4, "server wait"); + connectionManager.waitReady(); + LOG_F(4, "demod wait"); + demod.waitReady(); + + connectionManager.allowConnections(); + scanner.startScan(); + + sysRun = true; + } + ioThread = std::thread(runIO); } catch (std::exception& e) { LOG_F(ERROR, e.what()); goto sysexit; } - + steadyState.store(true); + LOG_F(INFO, "System initialized"); while(sysRun) usleep(100000); sysexit: + steadyState.store(false); try { - sysControl.stop(); + //sysControl.stop(); + + { + LOG_F(INFO, "Stopping system"); + scanner.stopScanner(); + //_centralQueue.giveMessage(std::make_shared(SYSTEM_CONTROL, ServerMessage::STOP, nullptr)); + connectionManager.stop(); + demod.stop(); + + /*while(1){ + //_flagLock.lock(); + if(_activeModules == 0){ + //_flagLock.unlock(); + break; + } + //_flagLock.unlock(); + }*/ + + LOG_F(4, "scanner wait"); + scanner.waitDeinit(); + LOG_F(4, "server wait"); + connectionManager.waitDeinit(); + LOG_F(4, "demod wait"); + demod.waitDeinit(); + + LOG_F(2, "All modules stopped"); + } + messageManager.stop(true); } catch (std::exception& e) { LOG_F(ERROR, e.what()); diff --git a/src/server/Connection.cpp b/src/server/Connection.cpp index d015f3c..b402369 100644 --- a/src/server/Connection.cpp +++ b/src/server/Connection.cpp @@ -5,6 +5,7 @@ * Author: ezra */ +#include "PiScan.h" #include "connection.h" //#include "request.h" @@ -72,40 +73,54 @@ int Connection::systemFunction(SystemFunction function) { }*/ int Connection::scanStart() { - ClientRequest::RequestParams params = { .type = SCANNER_FUNCTION, .subType = SCANNER_STATE_SCAN }; - return issueRequest(params); + //ClientRequest::RequestParams params = { .type = SCANNER_FUNCTION, .subType = SCANNER_STATE_SCAN }; + //return issueRequest(params); + startScan(); + return 0; } int Connection::scanHold() { - ClientRequest::RequestParams params = { .type = SCANNER_FUNCTION, .subType = SCANNER_STATE_HOLD }; - return issueRequest(params); + //ClientRequest::RequestParams params = { .type = SCANNER_FUNCTION, .subType = SCANNER_STATE_HOLD }; + //return issueRequest(params); + holdScan(); + return 0; } int Connection::scanHoldEntry(std::vector index) { - ClientRequest::RequestParams params = { .type = SCANNER_FUNCTION, .subType = SCANNER_STATE_HOLD }; - return issueRequest(params, new std::vector(index)); + //ClientRequest::RequestParams params = { .type = SCANNER_FUNCTION, .subType = SCANNER_STATE_HOLD }; + //return issueRequest(params, new std::vector(index)); + holdScan(index); + return 0; } int Connection::scanManualEntry(long freq, Modulation mode) { - ClientRequest::RequestParams params = { .type = SCANNER_FUNCTION, .subType = SCANNER_STATE_MANUAL }; - return issueRequest(params, new uint32_t(freq)); + //ClientRequest::RequestParams params = { .type = SCANNER_FUNCTION, .subType = SCANNER_STATE_MANUAL }; + //return issueRequest(params, new uint32_t(freq)); + manualEntry(new uint32_t(freq)); + return 0; } int Connection::setDemodSquelch(int level) { - ClientRequest::RequestParams params = { .type = DEMOD_CONFIGURE, .subType = - DEMOD_SET_SQUELCH }; - return issueRequest(params, new int(level)); + //ClientRequest::RequestParams params = { .type = DEMOD_CONFIGURE, .subType = + // DEMOD_SET_SQUELCH }; + //return issueRequest(params, new int(level)); + piscan::setDemodSquelch(level); + return 0; } -int Connection::setDemodGain(int level) { - ClientRequest::RequestParams params = { .type = DEMOD_CONFIGURE, .subType = - DEMOD_SET_GAIN }; - return issueRequest(params, new int(level)); +int Connection::setDemodGain(float level) { + //ClientRequest::RequestParams params = { .type = DEMOD_CONFIGURE, .subType = + // DEMOD_SET_GAIN }; + //return issueRequest(params, new int(level)); + setTunerGain(level); + return 0; } int Connection::getScannerContext() { ClientRequest::RequestParams params = { .type = GET_CONTEXT, .subType = SCANNER_CONTEXT }; return issueRequest(params); + //piscan::getScannerContext(); + //return 0; } int Connection::getDemodContext(){ diff --git a/src/server/ServerManager.cpp b/src/server/ServerManager.cpp index 6cf3af4..85b3f79 100644 --- a/src/server/ServerManager.cpp +++ b/src/server/ServerManager.cpp @@ -34,20 +34,35 @@ static ConnectionLevel const permissionMap[] = { ServerManager::ServerManager(boost::asio::io_service& io_service, MessageReceiver& central) : _io_service(io_service), _centralQueue(central), _queue(QUEUE_SIZE), _activeConnections(0)/*, _connections( MAX_CONNECTIONS)*/ { - _servers.push_back(new DebugServer(*this)); - _servers.push_back(new SocketServer(*this, _io_service)); + } -void ServerManager::start(){ +void ServerManager::start(bool useDebugServer){ + if(useDebugServer) + _servers.push_back(new DebugServer(*this)); + _servers.push_back(new SocketServer(*this, _io_service)); + _run = true; _queueThread = std::thread(&ServerManager::_queueThreadFunc, this); for(unsigned int i = 0; i < _servers.size(); i++) _servers[i]->start(); - auto message = std::make_shared(SERVER_MAN, ControllerMessage::NOTIFY_READY); - _centralQueue.giveMessage(message); + //auto message = std::make_shared(SERVER_MAN, ControllerMessage::NOTIFY_READY); + //_centralQueue.giveMessage(message); LOG_F(1, "Connection Manager started"); + notifyReady(); +} + +void ServerManager::stop() { + disconnectClients(); + _run = false; + std::unique_lock lock(_msgMutex, std::defer_lock); + if (lock.try_lock()) { + _msgAvailable = true; + lock.unlock(); + _cv.notify_one(); + } } void ServerManager::allowConnections(){ @@ -103,8 +118,9 @@ void ServerManager::_queueThreadFunc(void){ std::shared_ptr m; while(_queue.try_dequeue(m)); - _centralQueue.giveMessage(std::make_shared(SERVER_MAN, ControllerMessage::NOTIFY_STOPPED)); + //_centralQueue.giveMessage(std::make_shared(SERVER_MAN, ControllerMessage::NOTIFY_STOPPED)); LOG_F(1, "Connection Manager stopped"); + notifyDeinit(); } void ServerManager::giveMessage(std::shared_ptr message){ @@ -234,7 +250,7 @@ void ServerManager::_addConnection(boost::shared_ptr client){ //TODO for(unsigned int i = 0; i < MAX_CONNECTIONS; ++i){ if(_connections[i] == nullptr){ - LOG_F(1, "Initiating connection with %s", client.get()->identifier().c_str()); + LOG_F(1, "Initiating connection with %s", client->identifier().c_str()); client.get()->_handle = i; @@ -242,7 +258,7 @@ void ServerManager::_addConnection(boost::shared_ptr client){ if(client.get()->connect()){ _connections[i] = client; //_connections.assign(i, client); - LOG_F(INFO, "Client %s connected with handle %i", client.get()->identifier().c_str(), i); + LOG_F(INFO, "Client %s connected with handle %i", client->identifier().c_str(), i); } else{ LOG_F(1, "Connection attempt failed"); diff --git a/src/server/ServerManager.h b/src/server/ServerManager.h index 497ab38..292ad33 100644 --- a/src/server/ServerManager.h +++ b/src/server/ServerManager.h @@ -18,6 +18,7 @@ #include "connection.h" #include "clientmessage.h" #include "BackendServer.h" +#include "synchronize.h" #define MAX_CONNECTIONS 5 @@ -25,7 +26,7 @@ namespace piscan { class Connection; -class ServerManager : public MessageReceiver, public ServerInterface { +class ServerManager : public MessageReceiver, public ServerInterface, public Synchronizable { public: ServerManager(boost::asio::io_service& io_service, MessageReceiver& central); ~ServerManager() { @@ -33,7 +34,8 @@ public: delete _servers[i]; _queueThread.join(); }; - void start(); + void start(bool useDebugServer); + void stop(); void allowConnections(); void disconnectClients(); void giveMessage(std::shared_ptr message); diff --git a/src/server/connection.h b/src/server/connection.h index 7f34bfa..f7caacb 100644 --- a/src/server/connection.h +++ b/src/server/connection.h @@ -110,10 +110,13 @@ protected: int scanHoldEntry(std::vector index); int scanManualEntry(long freq, Modulation mode = FM); int setDemodSquelch(int level); - int setDemodGain(int level); + int setDemodGain(float level); int getScannerContext(); int getDemodContext(); }; + +typedef boost::shared_ptr ConnectionPtr; + } #endif /* SERVER_CONNECTION_H_ */ diff --git a/src/sigproc/Demodulator.cpp b/src/sigproc/Demodulator.cpp index 80d6cd4..e6e7da8 100644 --- a/src/sigproc/Demodulator.cpp +++ b/src/sigproc/Demodulator.cpp @@ -142,9 +142,10 @@ void Demodulator::start(){ //setModem(NFM); _demodMgr.setActiveDemodulator(_demods[NFM], false); - auto message = std::make_shared(DEMOD, ControllerMessage::NOTIFY_READY); - _centralQueue.giveMessage(message); + //auto message = std::make_shared(DEMOD, ControllerMessage::NOTIFY_READY); + //_centralQueue.giveMessage(message); LOG_F(1, "Demodulator started"); + notifyReady(); } void Demodulator::stop(){ @@ -155,9 +156,10 @@ void Demodulator::stop(){ state.gain = _gain; state.squelch = _squelchLevel; - auto message = std::make_shared(DEMOD, ControllerMessage::NOTIFY_STOPPED); - _centralQueue.giveMessage(message); + //auto message = std::make_shared(DEMOD, ControllerMessage::NOTIFY_STOPPED); + //_centralQueue.giveMessage(message); LOG_F(1, "Demodulator stopped"); + notifyDeinit(); } bool Demodulator::setFrequency(long long freq) { @@ -216,6 +218,8 @@ bool Demodulator::setModem(Modulation mode) { void Demodulator::setSquelch(float level) { _squelchLevel = level; + LOG_F(1, "Squelch set to %.1lf", level); + _contextUpdate(); } float Demodulator::getSNR() { @@ -290,3 +294,36 @@ void Demodulator::_contextUpdate(){ DemodContext* context = new DemodContext(_gain, _squelchLevel); _centralQueue.giveMessage(std::make_shared(DEMOD, ServerMessage::CONTEXT_UPDATE, context)); } + +void Demodulator::setTunerGain(float gain){ + if (gain < 0) + { + _cubic->setAGCMode(true); + _gain = AUTO_GAIN; + LOG_F(1, "Gain set to auto"); + } + else + { + _cubic->setAGCMode(false); + _cubic->setGain("TUNER", gain); + _gain = _cubic->getGain("TUNER"); + LOG_F(1, "Gain set to %.1lf", _cubic->getGain("TUNER")); + } + _contextUpdate(); +} + +float Demodulator::getTunerGain(){ + return _gain; +} + +float Demodulator::getSquelch(){ + return _squelchLevel; +} + +void Demodulator::audioMute(bool mute){ + _demodMgr.getCurrentModem()->setMuted(!mute); +} + +long long Demodulator::getTunerSampleRate(){ + return _cubic->getSampleRate(); +} diff --git a/src/sigproc/Demodulator.h b/src/sigproc/Demodulator.h index 6891ef3..d37493b 100644 --- a/src/sigproc/Demodulator.h +++ b/src/sigproc/Demodulator.h @@ -15,6 +15,7 @@ #include "DemodulatorMgr.h" #include "SDRDeviceInfo.h" #include "Configuration.h" +#include "synchronize.h" namespace piscan { @@ -41,7 +42,7 @@ public: virtual int getSignalStrength() = 0; }; -class Demodulator : public MessageReceiver, public DemodInterface { +class Demodulator : public MessageReceiver, public DemodInterface, public Synchronizable { public: Demodulator(MessageReceiver& central); ~Demodulator() { @@ -50,18 +51,6 @@ public: void start(); void stop(); -private: - MessageReceiver& _centralQueue; - Modulation _currentModem = NFM; - float _squelchLevel = DEFAULT_SQUELCH; - long long _currentFreq = 0; - float _gain = AUTO_GAIN; - - std::shared_ptr _cubic; - DemodulatorMgr& _demodMgr; - - std::map _demods; - void giveMessage(std::shared_ptr message); bool setFrequency(long long freq); bool setTunerFrequency(long long freq); @@ -73,6 +62,25 @@ private: void setSquelch(float level); float getSNR(); int getSignalStrength(); + void setTunerGain(float gain); + float getTunerGain(); + float getSquelch(); + void audioMute(bool mute); + long long getTunerSampleRate(); + +private: + MessageReceiver& _centralQueue; + Modulation _currentModem = NFM; + float _squelchLevel = DEFAULT_SQUELCH; + long long _currentFreq = 0; + std::atomic _gain; + + std::shared_ptr _cubic; + DemodulatorMgr& _demodMgr; + + std::map _demods; + + void _handleMessage(std::shared_ptr message); void _handleRequest(ClientRequest& request);