Refactor interthread communication

This commit is contained in:
ezratl 2019-11-02 12:18:06 -04:00
parent 93b94dbcb8
commit fefdfc2b0f
13 changed files with 342 additions and 64 deletions

View File

@ -34,12 +34,14 @@ struct GeneralConfig {
int logfileVerbosity = DEFAULT_LOGFILE_VERBOSITY;
};
#define DEFAULT_TCP_PORT 1234
#define MAX_TCP_CONNECTIONS 5
#define DEFAULT_TCP_PORT 1234
#define MAX_TCP_CONNECTIONS 5
#define DEFAULT_SPAWN_CLIENT false
struct SocketServerConfig {
int tcpPort = DEFAULT_TCP_PORT;
int maxConnections = MAX_TCP_CONNECTIONS;
bool spawnLocalClient = DEFAULT_SPAWN_CLIENT;
};
#define DEFAULT_SQUELCH 0

28
src/core/PiScan.h Normal file
View File

@ -0,0 +1,28 @@
#pragma once
#include <vector>
#include "clientmessage.h"
namespace piscan {
/* system functions */
bool stopSystem();
/* scanner functions */
void startScan();
void holdScan(std::vector<int> index = std::vector<int>());
void stopScanner();
void manualEntry(uint32_t* freq);
ScannerContext getScannerContext();
/* demod functions */
void setTunerGain(float gain);
void setDemodSquelch(float level);
DemodContext getDemodContext();
void audioMute(bool mute = true);
long long getTunerSampleRate();
/* server functions */
void scannerContextUpdate(ScannerContext ctx);
void demodContextUpdate(DemodContext ctx);
}

View File

@ -8,6 +8,7 @@
#include <unistd.h>
#include <mutex>
#include "PiScan.h"
#include "ScannerSM.h"
#include "ListGenerator.h"
#include "loguru.hpp"
@ -90,9 +91,10 @@ void ScannerSM::ST_Load(EventData* data){
// do not issue event - SM will wait until an event is generated before proceeding
//InternalEvent(ST_SCAN);
auto message = std::make_shared<ControllerMessage>(SCANNER_SM, ControllerMessage::NOTIFY_READY);
_centralQueue.giveMessage(message);
//auto message = std::make_shared<ControllerMessage>(SCANNER_SM, ControllerMessage::NOTIFY_READY);
//_centralQueue.giveMessage(message);
LOG_F(1, "ScannerSM ready");
notifyReady();
}
void ScannerSM::ST_Scan(EventData* data){
@ -238,8 +240,6 @@ void ScannerSM::ST_SaveAll(EventData* data){
DLOG_F(9, "ST_SaveAll");
LOG_F(1, "Saving database");
//TODO scan list isn't modifiable yet
_systems.writeToFile();
InternalEvent(ST_STOPPED);
@ -248,9 +248,10 @@ void ScannerSM::ST_SaveAll(EventData* data){
void ScannerSM::ST_Stopped(EventData* data){
DLOG_F(9, "ST_Stopped");
stop(false);
auto message = std::make_shared<ControllerMessage>(SCANNER_SM, ControllerMessage::NOTIFY_STOPPED);
_centralQueue.giveMessage(message);
//auto message = std::make_shared<ControllerMessage>(SCANNER_SM, ControllerMessage::NOTIFY_STOPPED);
//_centralQueue.giveMessage(message);
LOG_F(1, "ScannerSM stopped");
notifyDeinit();
}
void ScannerSM::_broadcastContextUpdate() {
@ -295,9 +296,9 @@ void ScannerSM::_enableAudioOut(bool en){
//TODO temporary
//rtl_fm_mute((int)(!en));
auto message = std::make_shared<DemodMessage>(SCANNER_SM, DemodMessage::OPEN_AUDIO, (void*) !en);
_centralQueue.giveMessage(message);
//auto message = std::make_shared<DemodMessage>(SCANNER_SM, DemodMessage::OPEN_AUDIO, (void*) !en);
//_centralQueue.giveMessage(message);
piscan::audioMute(en);
}
void ScannerSM::giveMessage(std::shared_ptr<Message> message) {
@ -370,3 +371,8 @@ void ScannerSM::_handleRequest(ClientRequest& request) {
delete rq;
}
ScannerContext ScannerSM::getCurrentContext(){
std::unique_lock<std::mutex> lock(_contextMutex);
return ScannerContext(_currentContext);
}

View File

@ -15,12 +15,13 @@
#include "Entry.h"
#include "messages.h"
#include "clientmessage.h"
#include "synchronize.h"
#define SQUELCH_TRIGGER_HITS 25
namespace piscan {
class ScannerSM: public MessageReceiver, public StateMachine {
class ScannerSM: public MessageReceiver, public StateMachine, public Synchronizable {
public:
ScannerSM(MessageReceiver& central, SystemList& dataSource);
~ScannerSM() {};
@ -30,6 +31,8 @@ public:
void stopScanner();
void manualEntry(uint32_t* freq);
void giveMessage(std::shared_ptr<Message> message);
ScannerContext getCurrentContext();
private:
void ST_Load(EventData* data);
void ST_Scan(EventData* data);

45
src/core/synchronize.h Normal file
View File

@ -0,0 +1,45 @@
#pragma once
#include <mutex>
#include <condition_variable>
using namespace std;
class Synchronizable {
public:
Synchronizable(){};
virtual ~Synchronizable(){};
inline void waitReady(){
unique_lock<mutex> lock(_mtx);
while(!_ready)
_cv.wait(lock);
}
inline void waitDeinit(){
unique_lock<mutex> lock(_mtx);
while(!_stopped)
_cv.wait(lock);
}
protected:
inline void notifyReady(){
unique_lock<mutex> lock(_mtx);
_ready = true;
_stopped = false;
_cv.notify_all();
}
inline void notifyDeinit(){
unique_lock<mutex> lock(_mtx);
_ready = false;
_stopped = true;
_cv.notify_all();
}
private:
mutex _mtx;
condition_variable _cv;
bool _ready;
bool _stopped;
};

View File

@ -312,10 +312,10 @@ void DemodulatorPreThread::setFrequency(long long freq) {
frequencyChanged.store(true);
newFrequency = freq;
if(!stopping.load()){
/*if(!stopping.load()){
std::unique_lock<std::mutex> lock(_msgMutex);
_cv.wait(lock, [this]{return this->_msgAvailable;});
}
}*/
}
long long DemodulatorPreThread::getFrequency() {

View File

@ -6,6 +6,7 @@
#include <boost/asio.hpp>
//using namespace std;
#include "PiScan.h"
#include "constants.h"
#include "Configuration.h"
#include "Demodulator.h"
@ -146,7 +147,7 @@ public:
void start(){
LOG_F(1, "System Control start");
_scanner.start();
_connectionManager.start();
//_connectionManager.start();
_demod.start();
/* let a stop call break the loop */
@ -284,6 +285,8 @@ static ServerManager connectionManager(io_service, messageManager);
static Demodulator demod(messageManager);
static SystemController sysControl(messageManager, scanSystems, scanner, connectionManager, demod);
static std::atomic_bool steadyState(false);
void terminate(){
LOG_F(WARNING, "Terminating - resources may not be properly released");
std::terminate();
@ -326,6 +329,53 @@ void exit(int code){
std::exit(code);
}
bool stopSystem(){
if(steadyState.load()){
return true;
}
return false;
}
void startScan(){
scanner.startScan();
}
void holdScan(std::vector<int> index){
scanner.holdScan(index);
}
void stopScanner(){
scanner.stopScanner();
}
void manualEntry(uint32_t* freq){
scanner.manualEntry(freq);
}
ScannerContext getScannerContext(){
return scanner.getCurrentContext();
}
void setTunerGain(float gain){
demod.setTunerGain(gain);
}
void setDemodSquelch(float level){
demod.setSquelch(level);
}
DemodContext getDemodContext(){
return DemodContext(demod.getTunerGain(), demod.getSquelch());
}
void audioMute(bool mute){
demod.audioMute(mute);
}
long long getTunerSampleRate() {
return demod.getTunerSampleRate();
}
}
using namespace piscan;
@ -343,8 +393,10 @@ int main(int argc, char **argv) {
Configuration& config = Configuration::getConfig();
bool useDebugConsole = false;
int logVerbosity = config.getGeneralConfig().logfileVerbosity;
int c;
while((c = getopt(argc,argv,"d:p:")) != -1){
while((c = getopt(argc,argv,"dp:f:")) != -1){
switch(c){
case 'd':
useDebugConsole = true;
@ -353,6 +405,10 @@ int main(int argc, char **argv) {
if(optarg)
config.setWorkingPath(std::string(optarg));
break;
case 'f':
if(optarg)
logVerbosity = std::atoi(optarg);
break;
}
}
@ -360,7 +416,7 @@ int main(int argc, char **argv) {
config.loadConfig();
config.loadState();
loguru::add_file(config.getLogfilePath().c_str(), loguru::Truncate, config.getGeneralConfig().logfileVerbosity);
loguru::add_file(config.getLogfilePath().c_str(), loguru::Truncate, logVerbosity);
messageManager.setReceiver(SYSTEM_CONTROL, &sysControl);
messageManager.setReceiver(SCANNER_SM, &scanner);
@ -370,25 +426,82 @@ int main(int argc, char **argv) {
setDemodulator(&demod);
//connectionManager.useDebugServer(useDebugConsole);
try {
messageManager.start();
sysControl.start();
//sysControl.start();
{
scanner.start();
connectionManager.start(useDebugConsole);
demod.start();
/*while(sysRun){
//_flagLock.lock();
if(_activeModules == ALL_FLAG){
//_flagLock.unlock();
break;
}
//_flagLock.unlock();
}*/
LOG_F(4, "scanner wait");
scanner.waitReady();
LOG_F(4, "server wait");
connectionManager.waitReady();
LOG_F(4, "demod wait");
demod.waitReady();
connectionManager.allowConnections();
scanner.startScan();
sysRun = true;
}
ioThread = std::thread(runIO);
} catch (std::exception& e) {
LOG_F(ERROR, e.what());
goto sysexit;
}
steadyState.store(true);
LOG_F(INFO, "System initialized");
while(sysRun)
usleep(100000);
sysexit:
steadyState.store(false);
try {
sysControl.stop();
//sysControl.stop();
{
LOG_F(INFO, "Stopping system");
scanner.stopScanner();
//_centralQueue.giveMessage(std::make_shared<ServerMessage>(SYSTEM_CONTROL, ServerMessage::STOP, nullptr));
connectionManager.stop();
demod.stop();
/*while(1){
//_flagLock.lock();
if(_activeModules == 0){
//_flagLock.unlock();
break;
}
//_flagLock.unlock();
}*/
LOG_F(4, "scanner wait");
scanner.waitDeinit();
LOG_F(4, "server wait");
connectionManager.waitDeinit();
LOG_F(4, "demod wait");
demod.waitDeinit();
LOG_F(2, "All modules stopped");
}
messageManager.stop(true);
} catch (std::exception& e) {
LOG_F(ERROR, e.what());

View File

@ -5,6 +5,7 @@
* Author: ezra
*/
#include "PiScan.h"
#include "connection.h"
//#include "request.h"
@ -72,40 +73,54 @@ int Connection::systemFunction(SystemFunction function) {
}*/
int Connection::scanStart() {
ClientRequest::RequestParams params = { .type = SCANNER_FUNCTION, .subType = SCANNER_STATE_SCAN };
return issueRequest(params);
//ClientRequest::RequestParams params = { .type = SCANNER_FUNCTION, .subType = SCANNER_STATE_SCAN };
//return issueRequest(params);
startScan();
return 0;
}
int Connection::scanHold() {
ClientRequest::RequestParams params = { .type = SCANNER_FUNCTION, .subType = SCANNER_STATE_HOLD };
return issueRequest(params);
//ClientRequest::RequestParams params = { .type = SCANNER_FUNCTION, .subType = SCANNER_STATE_HOLD };
//return issueRequest(params);
holdScan();
return 0;
}
int Connection::scanHoldEntry(std::vector<int> index) {
ClientRequest::RequestParams params = { .type = SCANNER_FUNCTION, .subType = SCANNER_STATE_HOLD };
return issueRequest(params, new std::vector<int>(index));
//ClientRequest::RequestParams params = { .type = SCANNER_FUNCTION, .subType = SCANNER_STATE_HOLD };
//return issueRequest(params, new std::vector<int>(index));
holdScan(index);
return 0;
}
int Connection::scanManualEntry(long freq, Modulation mode) {
ClientRequest::RequestParams params = { .type = SCANNER_FUNCTION, .subType = SCANNER_STATE_MANUAL };
return issueRequest(params, new uint32_t(freq));
//ClientRequest::RequestParams params = { .type = SCANNER_FUNCTION, .subType = SCANNER_STATE_MANUAL };
//return issueRequest(params, new uint32_t(freq));
manualEntry(new uint32_t(freq));
return 0;
}
int Connection::setDemodSquelch(int level) {
ClientRequest::RequestParams params = { .type = DEMOD_CONFIGURE, .subType =
DEMOD_SET_SQUELCH };
return issueRequest(params, new int(level));
//ClientRequest::RequestParams params = { .type = DEMOD_CONFIGURE, .subType =
// DEMOD_SET_SQUELCH };
//return issueRequest(params, new int(level));
piscan::setDemodSquelch(level);
return 0;
}
int Connection::setDemodGain(int level) {
ClientRequest::RequestParams params = { .type = DEMOD_CONFIGURE, .subType =
DEMOD_SET_GAIN };
return issueRequest(params, new int(level));
int Connection::setDemodGain(float level) {
//ClientRequest::RequestParams params = { .type = DEMOD_CONFIGURE, .subType =
// DEMOD_SET_GAIN };
//return issueRequest(params, new int(level));
setTunerGain(level);
return 0;
}
int Connection::getScannerContext() {
ClientRequest::RequestParams params = { .type = GET_CONTEXT, .subType = SCANNER_CONTEXT };
return issueRequest(params);
//piscan::getScannerContext();
//return 0;
}
int Connection::getDemodContext(){

View File

@ -34,20 +34,35 @@ static ConnectionLevel const permissionMap[] = {
ServerManager::ServerManager(boost::asio::io_service& io_service, MessageReceiver& central) : _io_service(io_service),
_centralQueue(central), _queue(QUEUE_SIZE), _activeConnections(0)/*, _connections(
MAX_CONNECTIONS)*/ {
_servers.push_back(new DebugServer(*this));
_servers.push_back(new SocketServer(*this, _io_service));
}
void ServerManager::start(){
void ServerManager::start(bool useDebugServer){
if(useDebugServer)
_servers.push_back(new DebugServer(*this));
_servers.push_back(new SocketServer(*this, _io_service));
_run = true;
_queueThread = std::thread(&ServerManager::_queueThreadFunc, this);
for(unsigned int i = 0; i < _servers.size(); i++)
_servers[i]->start();
auto message = std::make_shared<ControllerMessage>(SERVER_MAN, ControllerMessage::NOTIFY_READY);
_centralQueue.giveMessage(message);
//auto message = std::make_shared<ControllerMessage>(SERVER_MAN, ControllerMessage::NOTIFY_READY);
//_centralQueue.giveMessage(message);
LOG_F(1, "Connection Manager started");
notifyReady();
}
void ServerManager::stop() {
disconnectClients();
_run = false;
std::unique_lock<std::mutex> lock(_msgMutex, std::defer_lock);
if (lock.try_lock()) {
_msgAvailable = true;
lock.unlock();
_cv.notify_one();
}
}
void ServerManager::allowConnections(){
@ -103,8 +118,9 @@ void ServerManager::_queueThreadFunc(void){
std::shared_ptr<Message> m;
while(_queue.try_dequeue(m));
_centralQueue.giveMessage(std::make_shared<ControllerMessage>(SERVER_MAN, ControllerMessage::NOTIFY_STOPPED));
//_centralQueue.giveMessage(std::make_shared<ControllerMessage>(SERVER_MAN, ControllerMessage::NOTIFY_STOPPED));
LOG_F(1, "Connection Manager stopped");
notifyDeinit();
}
void ServerManager::giveMessage(std::shared_ptr<Message> message){
@ -234,7 +250,7 @@ void ServerManager::_addConnection(boost::shared_ptr<Connection> client){
//TODO
for(unsigned int i = 0; i < MAX_CONNECTIONS; ++i){
if(_connections[i] == nullptr){
LOG_F(1, "Initiating connection with %s", client.get()->identifier().c_str());
LOG_F(1, "Initiating connection with %s", client->identifier().c_str());
client.get()->_handle = i;
@ -242,7 +258,7 @@ void ServerManager::_addConnection(boost::shared_ptr<Connection> client){
if(client.get()->connect()){
_connections[i] = client;
//_connections.assign(i, client);
LOG_F(INFO, "Client %s connected with handle %i", client.get()->identifier().c_str(), i);
LOG_F(INFO, "Client %s connected with handle %i", client->identifier().c_str(), i);
}
else{
LOG_F(1, "Connection attempt failed");

View File

@ -18,6 +18,7 @@
#include "connection.h"
#include "clientmessage.h"
#include "BackendServer.h"
#include "synchronize.h"
#define MAX_CONNECTIONS 5
@ -25,7 +26,7 @@ namespace piscan {
class Connection;
class ServerManager : public MessageReceiver, public ServerInterface {
class ServerManager : public MessageReceiver, public ServerInterface, public Synchronizable {
public:
ServerManager(boost::asio::io_service& io_service, MessageReceiver& central);
~ServerManager() {
@ -33,7 +34,8 @@ public:
delete _servers[i];
_queueThread.join(); };
void start();
void start(bool useDebugServer);
void stop();
void allowConnections();
void disconnectClients();
void giveMessage(std::shared_ptr<Message> message);

View File

@ -110,10 +110,13 @@ protected:
int scanHoldEntry(std::vector<int> index);
int scanManualEntry(long freq, Modulation mode = FM);
int setDemodSquelch(int level);
int setDemodGain(int level);
int setDemodGain(float level);
int getScannerContext();
int getDemodContext();
};
typedef boost::shared_ptr<Connection> ConnectionPtr;
}
#endif /* SERVER_CONNECTION_H_ */

View File

@ -142,9 +142,10 @@ void Demodulator::start(){
//setModem(NFM);
_demodMgr.setActiveDemodulator(_demods[NFM], false);
auto message = std::make_shared<ControllerMessage>(DEMOD, ControllerMessage::NOTIFY_READY);
_centralQueue.giveMessage(message);
//auto message = std::make_shared<ControllerMessage>(DEMOD, ControllerMessage::NOTIFY_READY);
//_centralQueue.giveMessage(message);
LOG_F(1, "Demodulator started");
notifyReady();
}
void Demodulator::stop(){
@ -155,9 +156,10 @@ void Demodulator::stop(){
state.gain = _gain;
state.squelch = _squelchLevel;
auto message = std::make_shared<ControllerMessage>(DEMOD, ControllerMessage::NOTIFY_STOPPED);
_centralQueue.giveMessage(message);
//auto message = std::make_shared<ControllerMessage>(DEMOD, ControllerMessage::NOTIFY_STOPPED);
//_centralQueue.giveMessage(message);
LOG_F(1, "Demodulator stopped");
notifyDeinit();
}
bool Demodulator::setFrequency(long long freq) {
@ -216,6 +218,8 @@ bool Demodulator::setModem(Modulation mode) {
void Demodulator::setSquelch(float level) {
_squelchLevel = level;
LOG_F(1, "Squelch set to %.1lf", level);
_contextUpdate();
}
float Demodulator::getSNR() {
@ -290,3 +294,36 @@ void Demodulator::_contextUpdate(){
DemodContext* context = new DemodContext(_gain, _squelchLevel);
_centralQueue.giveMessage(std::make_shared<ServerMessage>(DEMOD, ServerMessage::CONTEXT_UPDATE, context));
}
void Demodulator::setTunerGain(float gain){
if (gain < 0)
{
_cubic->setAGCMode(true);
_gain = AUTO_GAIN;
LOG_F(1, "Gain set to auto");
}
else
{
_cubic->setAGCMode(false);
_cubic->setGain("TUNER", gain);
_gain = _cubic->getGain("TUNER");
LOG_F(1, "Gain set to %.1lf", _cubic->getGain("TUNER"));
}
_contextUpdate();
}
float Demodulator::getTunerGain(){
return _gain;
}
float Demodulator::getSquelch(){
return _squelchLevel;
}
void Demodulator::audioMute(bool mute){
_demodMgr.getCurrentModem()->setMuted(!mute);
}
long long Demodulator::getTunerSampleRate(){
return _cubic->getSampleRate();
}

View File

@ -15,6 +15,7 @@
#include "DemodulatorMgr.h"
#include "SDRDeviceInfo.h"
#include "Configuration.h"
#include "synchronize.h"
namespace piscan {
@ -41,7 +42,7 @@ public:
virtual int getSignalStrength() = 0;
};
class Demodulator : public MessageReceiver, public DemodInterface {
class Demodulator : public MessageReceiver, public DemodInterface, public Synchronizable {
public:
Demodulator(MessageReceiver& central);
~Demodulator() {
@ -50,18 +51,6 @@ public:
void start();
void stop();
private:
MessageReceiver& _centralQueue;
Modulation _currentModem = NFM;
float _squelchLevel = DEFAULT_SQUELCH;
long long _currentFreq = 0;
float _gain = AUTO_GAIN;
std::shared_ptr<CubicSDR> _cubic;
DemodulatorMgr& _demodMgr;
std::map<Modulation, DemodulatorInstancePtr> _demods;
void giveMessage(std::shared_ptr<Message> message);
bool setFrequency(long long freq);
bool setTunerFrequency(long long freq);
@ -73,6 +62,25 @@ private:
void setSquelch(float level);
float getSNR();
int getSignalStrength();
void setTunerGain(float gain);
float getTunerGain();
float getSquelch();
void audioMute(bool mute);
long long getTunerSampleRate();
private:
MessageReceiver& _centralQueue;
Modulation _currentModem = NFM;
float _squelchLevel = DEFAULT_SQUELCH;
long long _currentFreq = 0;
std::atomic<float> _gain;
std::shared_ptr<CubicSDR> _cubic;
DemodulatorMgr& _demodMgr;
std::map<Modulation, DemodulatorInstancePtr> _demods;
void _handleMessage(std::shared_ptr<DemodMessage> message);
void _handleRequest(ClientRequest& request);