Multithreading fixes and rtl_fm integration

This commit is contained in:
ezratl 2019-03-05 00:12:39 -05:00
parent 6ebeab508d
commit a4eeb6053e
24 changed files with 12173 additions and 129 deletions

View File

@ -19,6 +19,6 @@ add_subdirectory(server)
add_subdirectory(sigproc)
add_subdirectory(tests)
target_link_libraries(piScan_backend core drivers external scan server sigproc pthread dl)
target_link_libraries(piScan_backend core drivers external scan server sigproc pthread dl rtlsdr)

View File

@ -3,8 +3,8 @@ add_library(core
Configuration.h
constants.h
messages.h
ScannerStateMachine.cpp
ScannerStateMachine.h
ScannerSM.cpp
ScannerSM.h
StateMachine.cpp
StateMachine.h
)

View File

@ -5,12 +5,12 @@
* Author: ezra
*/
#include <ScannerSM.h>
#include "loguru.hpp"
#include "ScannerStateMachine.h"
#define DELAY_TIMEOUT 2.0
ScannerStateMachine::ScannerStateMachine(MessageReceiver& central, SystemList& dataSource) :
ScannerSM::ScannerSM(MessageReceiver& central, SystemList& dataSource) :
StateMachine(7), _centralQueue(central), _systems(dataSource), _currentSystem(nullptr), _currentEntry(nullptr) {
}
@ -23,7 +23,7 @@ ScannerStateMachine::ScannerStateMachine(MessageReceiver& central, SystemList& d
// ST_STOPPED,
// };
void ScannerStateMachine::startScan(){
void ScannerSM::startScan(){
LOG_F(1, "ExtEvent: startScan");
BEGIN_TRANSITION_MAP
TRANSITION_MAP_ENTRY(ST_SCAN)
@ -36,7 +36,7 @@ void ScannerStateMachine::startScan(){
END_TRANSITION_MAP(NULL)
}
void ScannerStateMachine::holdScan(){
void ScannerSM::holdScan(){
_externalHold = true;
LOG_F(1, "ExtEvent: holdScan");
BEGIN_TRANSITION_MAP
@ -50,7 +50,7 @@ void ScannerStateMachine::holdScan(){
END_TRANSITION_MAP(NULL)
}
void ScannerStateMachine::stopScanner(){
void ScannerSM::stopScanner(){
LOG_F(1, "ExtEvent: stopScanner");
BEGIN_TRANSITION_MAP
TRANSITION_MAP_ENTRY(ST_SAVEALL)
@ -63,7 +63,8 @@ void ScannerStateMachine::stopScanner(){
END_TRANSITION_MAP(NULL)
}
void ScannerStateMachine::ST_Load(EventData* data){
void ScannerSM::ST_Load(EventData* data){
DLOG_F(9, "ST_Load");
//file read and system tree population
// do not issue event - SM will wait until an event is generated before proceeding
@ -73,7 +74,8 @@ void ScannerStateMachine::ST_Load(EventData* data){
LOG_F(1, "ScannerSM ready");
}
void ScannerStateMachine::ST_Scan(EventData* data){
void ScannerSM::ST_Scan(EventData* data){
DLOG_F(9, "ST_Scan");
_enableAudioOut(false);
// incremental scan pattern
@ -98,7 +100,8 @@ void ScannerStateMachine::ST_Scan(EventData* data){
}
void ScannerStateMachine::ST_Hold(EventData* data){
void ScannerSM::ST_Hold(EventData* data){
DLOG_F(9, "ST_Hold");
_enableAudioOut(false);
_broadcastEntryContext(_currentSystem, _currentEntry);
@ -129,7 +132,8 @@ void ScannerStateMachine::ST_Hold(EventData* data){
}
void ScannerStateMachine::ST_Receive(EventData* data){
void ScannerSM::ST_Receive(EventData* data){
DLOG_F(9, "ST_Receive");
_enableAudioOut(true);
_broadcastEntryContext(_currentSystem, _currentEntry);
@ -143,23 +147,26 @@ void ScannerStateMachine::ST_Receive(EventData* data){
}
}
void ScannerStateMachine::ST_Manual(EventData* data){
void ScannerSM::ST_Manual(EventData* data){
//TODO state for later implementation
}
void ScannerStateMachine::ST_SaveAll(EventData* data){
void ScannerSM::ST_SaveAll(EventData* data){
DLOG_F(9, "ST_SaveAll");
LOG_F(1, "Saving database");
InternalEvent(ST_STOPPED);
}
void ScannerStateMachine::ST_Stopped(EventData* data){
void ScannerSM::ST_Stopped(EventData* data){
DLOG_F(9, "ST_Stopped");
stop(false);
Message* message = new ControllerMessage(SCANNER_SM, ControllerMessage::NOTIFY_STOPPED);
_centralQueue.giveMessage(*message);
LOG_F(1, "ScannerSM stopped");
}
void ScannerStateMachine::_broadcastSystemContext(RadioSystem* sys){
void ScannerSM::_broadcastSystemContext(RadioSystem* sys){
assert(sys != NULL);
//TODO not thread safe
_currentContext.state = static_cast<States>(currentState);
@ -169,7 +176,7 @@ void ScannerStateMachine::_broadcastSystemContext(RadioSystem* sys){
_centralQueue.giveMessage(*message);
}
void ScannerStateMachine::_broadcastEntryContext(RadioSystem* sys, Entry* entry){
void ScannerSM::_broadcastEntryContext(RadioSystem* sys, Entry* entry){
assert(sys != NULL);
assert(entry != NULL);
//TODO not thread safe
@ -180,7 +187,7 @@ void ScannerStateMachine::_broadcastEntryContext(RadioSystem* sys, Entry* entry)
_centralQueue.giveMessage(*message);
}
void ScannerStateMachine::_enableAudioOut(bool en){
void ScannerSM::_enableAudioOut(bool en){
Message* message;
if(en){
message = new AudioMessage(SCANNER_SM, AudioMessage::ENABLE_OUTPUT);
@ -191,7 +198,7 @@ void ScannerStateMachine::_enableAudioOut(bool en){
_centralQueue.giveMessage(*message);
}
void ScannerStateMachine::giveMessage(Message& message) {
void ScannerSM::giveMessage(Message& message) {
//assert(message != NULL);
auto msg = dynamic_cast<ScannerMessage&>(message);

View File

@ -16,10 +16,10 @@
#include "messages.h"
class ScannerStateMachine: public MessageReceiver, public StateMachine {
class ScannerSM: public MessageReceiver, public StateMachine {
public:
ScannerStateMachine(MessageReceiver& central, SystemList& dataSource);
~ScannerStateMachine() {};
ScannerSM(MessageReceiver& central, SystemList& dataSource);
~ScannerSM() {};
void startScan();
void holdScan();
@ -35,13 +35,13 @@ private:
void ST_Stopped(EventData* data);
BEGIN_STATE_MAP
STATE_MAP_ENTRY(&ScannerStateMachine::ST_Load)
STATE_MAP_ENTRY(&ScannerStateMachine::ST_Scan)
STATE_MAP_ENTRY(&ScannerStateMachine::ST_Hold)
STATE_MAP_ENTRY(&ScannerStateMachine::ST_Receive)
STATE_MAP_ENTRY(&ScannerStateMachine::ST_Manual)
STATE_MAP_ENTRY(&ScannerStateMachine::ST_SaveAll)
STATE_MAP_ENTRY(&ScannerStateMachine::ST_Stopped)
STATE_MAP_ENTRY(&ScannerSM::ST_Load)
STATE_MAP_ENTRY(&ScannerSM::ST_Scan)
STATE_MAP_ENTRY(&ScannerSM::ST_Hold)
STATE_MAP_ENTRY(&ScannerSM::ST_Receive)
STATE_MAP_ENTRY(&ScannerSM::ST_Manual)
STATE_MAP_ENTRY(&ScannerSM::ST_SaveAll)
STATE_MAP_ENTRY(&ScannerSM::ST_Stopped)
END_STATE_MAP
enum States {

View File

@ -20,6 +20,7 @@ StateMachine::StateMachine(int maxStates) :
void StateMachine::start() {
_run = true;
InternalEvent(0);
_stateMachineThread = std::thread(&StateMachine::StateThreadFunc, this);
}
@ -44,7 +45,7 @@ void StateMachine::ExternalEvent(unsigned char newState,
std::lock_guard<std::mutex> lock(_eventMutex);
// generate the event and execute the state engine
InternalEvent(newState, pData);
//StateEngine();
_cv.notify_one();
}
}
@ -65,13 +66,15 @@ void StateMachine::StateEngine(void)
EventData* pDataTemp = NULL;
// TBD - lock semaphore here
std::lock_guard<std::mutex> lock(_eventMutex);
std::unique_lock<std::mutex> lock(_eventMutex);
_cv.wait(lock, [this]{return this->_eventGenerated;});
// while events are being generated keep executing states
if (_eventGenerated) {
while (_eventGenerated) {
pDataTemp = _pEventData; // copy of event data pointer
_pEventData = NULL; // event data used up, reset ptr
_eventGenerated = false; // event used up, reset flag
lock.unlock();
assert(currentState < _maxStates);
@ -84,7 +87,12 @@ void StateMachine::StateEngine(void)
delete pDataTemp;
pDataTemp = NULL;
}
if(!_run)
return;
if(_eventGenerated)
lock.lock();
}
// TBD - unlock semaphore here
}

View File

@ -8,6 +8,7 @@
#ifndef SERVER_STATEMACHINE_H_
#define SERVER_STATEMACHINE_H_
#include <condition_variable>
#include <stdio.h>
#include <thread>
#include <mutex>
@ -26,7 +27,10 @@ class StateMachine
{
public:
StateMachine(int maxStates);
virtual ~StateMachine() {}
virtual ~StateMachine() {
_run = false;
_stateMachineThread.join();
}
void start();
void stop(bool);
protected:
@ -45,6 +49,7 @@ private:
void StateThreadFunc(void);
std::thread _stateMachineThread;
std::mutex _eventMutex;
std::condition_variable _cv;
bool _run = false;
};

View File

@ -237,6 +237,8 @@ struct demod_state demod2;
struct output_state output;
struct controller_state controller;
int sq_rms;
void usage(void)
{
fprintf(stderr,
@ -834,6 +836,24 @@ int squelch_to_rms(int db, struct dongle_state *dongle, struct demod_state *demo
return (int)linear + 1;
}
double rms_to_db(int rms, struct dongle_state *dongle, struct demod_state *demod)
{
double linear, gain, downsample;
gain = 50.0;
if (dongle->gain != AUTO_GAIN) {
gain = (double) (dongle->gain) / 10.0;
}
gain = 50.0 - gain;
gain = pow(10.0, gain / 20.0);
downsample = 1024.0 / (double) demod->downsample;
linear = rms - 1.0;
linear = linear * downsample;
linear = linear * gain;
return 20.0 * log10(linear);
}
void software_agc(struct demod_state *d)
{
int i = 0;
@ -907,11 +927,12 @@ void full_demod(struct demod_state *d)
low_pass(d);
}
/* power squelch */
if (d->squelch_level) {
//if (d->squelch_level) {
sr = rms(d->lowpassed, d->lp_len, 1);
if (sr < d->squelch_level) {
sq_rms = sr;
/* if (sr < d->squelch_level) {
do_squelch = 1;}
}
}*/
if (do_squelch) {
d->squelch_hits++;
for (i=0; i<d->lp_len; i++) {
@ -1299,6 +1320,7 @@ int rtl_fm_setfreq(uint32_t freq){
optimal_settings(freq, demod.rate_in);
//r = rtlsdr_set_center_freq(dongle.dev, freq);
r = verbose_set_frequency(dongle.dev, dongle.freq);
dongle.mute = BUFFER_DUMP;
return r;
}
@ -1331,8 +1353,14 @@ int rtl_fm_setmode(enum mode_demod newMode) {
return 0;
}
int rtl_fm_get_rssi(){
return 0;
float rtl_fm_get_rssi(){
//struct demod_state* d = &demod;
//safe_cond_wait(&d->ready, &d->ready_m);
//pthread_rwlock_wrlock(&d->rw);
//full_demod(d);
int sr = sq_rms;
//pthread_rwlock_unlock(&d->rw);
return rms_to_db(sr, &dongle, &demod);
}
void frequency_range(struct controller_state *s, char *arg)

View File

@ -3,6 +3,9 @@
#ifndef DRIVERS_RTL_FM_H_
#define DRIVERS_RTL_FM_H_
#define RTL_MIN_FREQ 24000000
#define RTL_MAX_FREQ 1450000000
enum custom_atan {
STD,
FAST,
@ -27,7 +30,7 @@ extern "C" {
int rtl_fm_setfreq(uint32_t freq);
int rtl_fm_setmode(enum mode_demod newMode);
int rtl_fm_get_rssi();
float rtl_fm_get_rssi();
#ifdef __cplusplus
}
#endif

10626
piScan_backend/src/external/RtAudio.cpp vendored Normal file

File diff suppressed because it is too large Load Diff

1201
piScan_backend/src/external/RtAudio.h vendored Normal file

File diff suppressed because it is too large Load Diff

View File

@ -1,5 +1,8 @@
#include <condition_variable>
#include <csignal>
#include <iostream>
#include <thread>
#include <unistd.h>
using namespace std;
#include "constants.h"
@ -8,7 +11,7 @@ using namespace std;
#include "loguru.hpp"
#include "messages.h"
#include "ServerManager.h"
#include "ScannerStateMachine.h"
#include "ScannerSM.h"
#include "SystemList.h"
//enum {
@ -26,11 +29,13 @@ using namespace std;
#define AUDIO_FLAG 0x08
//#define ALL_FLAG 0x0F
#define ALL_FLAG 0x03
#define ALL_FLAG 0x07
static bool sysRun;
class MessageManager : public MessageReceiver {
public:
MessageManager() : _run(false) {};
MessageManager() {};
~MessageManager() {};
void setReceiver(unsigned char id, MessageReceiver* ptr){
@ -49,19 +54,29 @@ public:
void stop(bool block) {
_run = false;
std::unique_lock<std::mutex> lock(_msgMutex);
_msgAvailable = true;
lock.unlock();
_cv.notify_one();
if(block)
_workThread.join();
}
private:
moodycamel::ConcurrentQueue<Message*> _queue;
std::thread _workThread;
bool _run;
std::mutex _msgMutex;
std::condition_variable _cv;
bool _msgAvailable = false;
bool _run = false;
MessageReceiver* _receivers[MESSAGE_RECEIVERS];
void _handlerThreadFunc(void){
LOG_F(2, "MessageManager started");
while(_run){
std::unique_lock<std::mutex> lock(_msgMutex);
_cv.wait(lock, [this]{return this->_msgAvailable;});
Message* message;
if(_queue.try_dequeue(message)){
DCHECK_F(message != nullptr);
@ -82,20 +97,29 @@ private:
DLOG_F(ERROR, "Message has invalid destination | dst:%d | src:%d", message->destination, message->source);
delete &message;
}
_msgAvailable = false;
}
lock.unlock();
}
LOG_F(2, "MessageManager stopped");
}
void giveMessage(Message& message){
_queue.enqueue(&message);
std::unique_lock<std::mutex> lock(_msgMutex);
_msgAvailable = true;
lock.unlock();
_cv.notify_one();
}
};
class SystemController : public MessageReceiver {
public:
SystemController(MessageReceiver& central, SystemList& syslist,
ScannerStateMachine& scan, ServerManager& conmgr, Demodulator& dm) :
ScannerSM& scan, ServerManager& conmgr, Demodulator& dm) :
_centralQueue(central), _systemList(syslist), _scanner(scan), _connectionManager(
conmgr), _demod(dm), _flagLock(_flagsMutex, std::defer_lock) {
//_flagLock(_flagsMutex);
@ -107,6 +131,7 @@ public:
LOG_F(1, "System Control start");
_scanner.start();
_connectionManager.start();
_demod.start();
while(1){
//_flagLock.lock();
@ -117,15 +142,18 @@ public:
//_flagLock.unlock();
}
LOG_F(2, "All modules started");
LOG_F(INFO, "System initialized");
_connectionManager.allowConnections();
sysRun = true;
}
void stop(){
LOG_F(1, "Stopping system");
LOG_F(INFO, "Stopping system");
_scanner.stopScanner();
_centralQueue.giveMessage(*(new ServerMessage(SYSTEM_CONTROL, ServerMessage::STOP, nullptr)));
_demod.stop();
while(1){
//_flagLock.lock();
@ -142,7 +170,7 @@ public:
private:
MessageReceiver& _centralQueue;
SystemList& _systemList;
ScannerStateMachine& _scanner;
ScannerSM& _scanner;
ServerManager& _connectionManager;
Demodulator& _demod;
@ -182,12 +210,15 @@ private:
case ControllerMessage::NOTIFY_STOPPED:
switch (msg.source) {
case SCANNER_SM:
DLOG_F(8, "scanner stopped");
_activeModules &= ~SCANNER_FLAG;
break;
case DEMOD:
_activeModules &= ~DEMOD_FLAG;
DLOG_F(8, "demod stopped");
break;
case SERVER_MAN:
DLOG_F(8, "conmgr stopped");
_activeModules &= ~CONNMGR_FLAG;
break;
case AUDIO_MAN:
@ -207,7 +238,8 @@ private:
case SYSTEM_FUNCTION:
switch(request.rqInfo.subType){
case SYSTEM_STOP:
LOG_F(1, "Stop request from client %i", request.source);
sysRun = false;
break;
default:
break;
@ -224,11 +256,16 @@ private:
static MessageManager messageManager;
static SystemList scanSystems;
static ScannerStateMachine scanner(messageManager, scanSystems);
static ScannerSM scanner(messageManager, scanSystems);
static ServerManager connectionManager(messageManager);
static Demodulator demod;
static Demodulator demod(messageManager);
static SystemController sysControl(messageManager, scanSystems, scanner, connectionManager, demod);
void sigHandler(int signal){
LOG_F(INFO, "Stop triggered by interrupt");
sysRun = false;
}
void setDemodulator(DemodInterface* demod) {
DCHECK_F(demod != nullptr);
Entry::demod = demod;
@ -238,7 +275,11 @@ int main(int argc, char **argv) {
loguru::init(argc, argv);
loguru::add_file(LOG_PATH, loguru::Truncate, loguru::Verbosity_2);
loguru::g_stderr_verbosity = loguru::Verbosity_MAX;
LOG_F(INFO, "PiScan started");
signal(SIGINT, sigHandler);
signal(SIGTERM, sigHandler);
LOG_F(INFO, "Starting PiScan");
messageManager.setReceiver(SYSTEM_CONTROL, &sysControl);
messageManager.setReceiver(SCANNER_SM, &scanner);
@ -251,12 +292,13 @@ int main(int argc, char **argv) {
messageManager.start();
sysControl.start();
for(unsigned int i = 0; i > 0; i++);
while(sysRun)
usleep(100000);
sysControl.stop();
messageManager.stop(true);
LOG_F(INFO, "PiScan stopped");
LOG_F(INFO, "PiScan stopped, exiting");
if(activeMessages > 0)
DLOG_F(WARNING, "Memory leak: %i messages not deleted!", activeMessages);

View File

@ -14,19 +14,19 @@
class Entry {
public:
Entry(char* tag, bool lo, bool del);
Entry(std::string tag, bool lo, bool del);
virtual ~Entry() {};
char* getTag() { return &tag[0]; }
virtual char* getModulation() = 0;
virtual char* getIdentity() = 0;
std::string* getTag() { return &tag; }
virtual std::string getModulation() = 0;
virtual std::string* getIdentity() = 0;
bool isLockedOut() { return lockedOut; }
bool useDelay() { return scanDelay; }
void lockout(bool val) { lockedOut = val; }
virtual bool hasSignal() = 0;
protected:
char tag[TAG_LENGTH];
std::string tag;
bool lockedOut;
bool scanDelay;
@ -47,7 +47,7 @@ public:
FMChannel(unsigned long freq, char* tag, bool lo, bool del) : Channel(freq, tag, lo, del){}
~FMChannel() {};
char* getModulation() {
std::string getModulation() {
return "FM";
}

View File

@ -10,6 +10,7 @@
#include <stdint.h>
#include <string>
#include <vector>
#include "Entry.h"
@ -36,7 +37,10 @@ public:
AnalogSystem() : RadioSystem() {};
~AnalogSystem() {};
Entry* operator[](size_t pos) { return nullptr; };
Entry* operator[](size_t pos) { return &entries[pos]; };
protected:
std::vector<Entry> entries;
};
#endif /* RADIOSYSTEM_H_ */

View File

@ -13,17 +13,18 @@
#include "DebugServer.h"
#include "loguru.hpp"
void DebugConsole::connect(){
std::cout << "Connecting...";
bool DebugConsole::connect(){
std::cerr << "Connecting...";
_run = true;
_requestThread = std::thread(&DebugConsole::_consoleInputFunc, this);
return true;
}
void DebugConsole::disconnect(){
_run = false;
std::fprintf(stdin, "\n");
std::cout << "\nConsole disconnected\n";
std::cerr << "\nConsole disconnected\n";
notifyDisconnected();
}
@ -33,7 +34,7 @@ void DebugConsole::giveMessage(Message& message){
void DebugConsole::_consoleInputFunc() {
std::string input = "";
std::cout << "\nConsole connected\n";
std::cerr << "\nConsole connected\n";
while(_run){
input.clear();
std::getline(std::cin, input);
@ -41,11 +42,11 @@ void DebugConsole::_consoleInputFunc() {
if(input.compare("stop") == 0){
_run = false;
ClientRequest::RequestParams params;
ClientRequest::RequestParams params = {.type = SYSTEM_FUNCTION, .subType = SYSTEM_STOP};
issueRequest(params);
}
}
std::cout << "\nConsole thread exited\n";
std::cerr << "\nConsole thread exited\n";
}
void DebugServer::start(){

View File

@ -22,7 +22,7 @@ public:
_requestThread.join();
};
void connect();
bool connect();
void disconnect();
void giveMessage(Message& message);

View File

@ -5,6 +5,8 @@
* Author: ezra
*/
//#include <unistd.h>
#include "ServerManager.h"
#include "loguru.hpp"
#include "DebugServer.h"
@ -49,24 +51,31 @@ void ServerManager::disconnectClients(){
void ServerManager::_queueThreadFunc(void){
while(_run){
std::unique_lock<std::mutex> lock(_msgMutex);
_cv.wait(lock, [this]{return this->_msgAvailable;});
Message* message;
if(_queue.try_dequeue(message)){
assert(message != nullptr);
if(message->destination != SERVER_MAN && message->source != CLIENT){
if(message->destination != SERVER_MAN){
_centralQueue.giveMessage(*message);
}
else{
_handleMessage(*message);
}
_msgAvailable = false;
}
Connection* newCon = nullptr;
if(_allowConnections && _connectionQueue.try_dequeue(newCon)){
_addConnection(*newCon);
_msgAvailable = false;
}
lock.unlock();
}
for(int i = 0; i < _servers.size(); i++)
for(unsigned int i = 0; i < _servers.size(); i++)
_servers[i]->stop();
// empty queue
@ -80,12 +89,20 @@ void ServerManager::_queueThreadFunc(void){
void ServerManager::giveMessage(Message& message){
_queue.enqueue(&message);
std::unique_lock<std::mutex> lock(_msgMutex);
_msgAvailable = true;
lock.unlock();
_cv.notify_one();
}
int ServerManager::requestConnection(void* client){
if(_activeConnections < MAX_CONNECTIONS){
DLOG_F(8, "New connection request");
_connectionQueue.enqueue(static_cast<Connection*>(client));
std::unique_lock<std::mutex> lock(_msgMutex);
_msgAvailable = true;
lock.unlock();
_cv.notify_one();
return RQ_ACCEPTED;
}
else{
@ -109,6 +126,7 @@ int ServerManager::giveRequest(void* request){
Message* message;
switch(rq->rqInfo.type){
case NOTIFY_DISCONNECTED:
LOG_F(INFO, "Connection %i disconnected", rq->source);
delete _connections[rq->source];
break;
case SYSTEM_FUNCTION:
@ -137,40 +155,29 @@ int ServerManager::giveRequest(void* request){
break;
}
this->giveMessage(*message);
if(message != nullptr)
this->giveMessage(*message);
return rq->rqHandle;
}
void ServerManager::_handleMessage(Message& message){
//TODO
if(message.source == CLIENT){
if(_allowConnections){
_addConnection(*(static_cast<Connection*>(message.pData)));
}
else {
// connections not yet allowed, put request back on queue
giveMessage(message);
return;
}
}
else{
auto msg = dynamic_cast<ServerMessage&>(message);
switch(msg.type){
case ServerMessage::CONTEXT_UPDATE:
break;
case ServerMessage::NOTIFY_ALL_CLIENTS:
break;
case ServerMessage::NOTIFY_USERS:
break;
case ServerMessage::NOTIFY_VIEWERS:
break;
case ServerMessage::STOP:
disconnectClients();
_run = false;
break;
default:
break;
}
assert(message.destination == SERVER_MAN);
auto msg = dynamic_cast<ServerMessage&>(message);
switch (msg.type) {
case ServerMessage::CONTEXT_UPDATE:
break;
case ServerMessage::NOTIFY_ALL_CLIENTS:
break;
case ServerMessage::NOTIFY_USERS:
break;
case ServerMessage::NOTIFY_VIEWERS:
break;
case ServerMessage::STOP:
disconnectClients();
_run = false;
break;
default:
break;
}
delete &message;
@ -182,10 +189,17 @@ void ServerManager::_addConnection(Connection& client){
if(_connections[i] == nullptr){
LOG_F(1, "Initiating connection with handle %i", i);
_connections[i] = &client;
client._handle = i;
client._serverManager = this;
client.connect();
if(client.connect()){
_connections[i] = &client;
LOG_F(INFO, "Client %i connected", i);
}
else{
LOG_F(INFO, "Connection attempt failed");
delete &client;
}
break;
}
}

View File

@ -8,6 +8,8 @@
#ifndef SERVER_SERVERMANAGER_H_
#define SERVER_SERVERMANAGER_H_
#include <condition_variable>
#include <thread>
#include <vector>
#include "messages.h"
@ -39,6 +41,9 @@ private:
std::vector<Connection*> _connections;
std::vector<BackendServer*> _servers;
std::thread _queueThread;
std::mutex _msgMutex;
std::condition_variable _cv;
bool _msgAvailable = false;
bool _allowConnections = false;
bool _run = false;

View File

@ -44,7 +44,7 @@ public:
virtual ~Connection() {};
virtual void giveMessage(Message& message) = 0;
virtual void connect() = 0;
virtual bool connect() = 0;
virtual void disconnect() = 0;
private:
@ -60,7 +60,7 @@ protected:
void notifyDisconnected() {
ClientRequest::RequestParams params = {.type = static_cast<RequestType>(ClientRequest::NOTIFY_DISCONNECTED)};
ClientRequest::RequestParams params = {.type = NOTIFY_DISCONNECTED};
issueRequest(params);
}

View File

@ -27,14 +27,14 @@ public:
};
ClientRequest(unsigned char handle, RequestParams info, void (*callback)(void*) = 0) :
ClientRequest(unsigned char handle, RequestParams info, void (*callback)(int, void*) = 0) :
Message(handle, 0), rqInfo(info), _callback(callback) {}
~ClientRequest() {};
int rqHandle = 0;
RequestParams rqInfo;
void (*_callback)(void*);
void (*_callback)(int, void*);
private:

View File

@ -0,0 +1,20 @@
#include "loguru.hpp"
#include "messages.h"
#include "RtAudio.h"
class AudioManager : public MessageReceiver {
public:
AudioManager();
~AudioManager() {};
private:
bool _outputLocally;
RtAudio _audioDriver;
void giveMessage(Message& message);
bool _startAudioOutput();
bool _stopAudioOutput();
void _muteAudio();
void _unmuteAudio();
};

View File

@ -6,4 +6,39 @@
*/
#include "Demodulator.h"
#include "loguru.hpp"
void Demodulator::start(){
if(_tuner.init() != TUNER_SUCCESS){
LOG_F(ERROR, "Error starting tuner!");
}
Message* message = new ControllerMessage(DEMOD, ControllerMessage::NOTIFY_READY);
_centralQueue.giveMessage(*message);
LOG_F(1, "Demodulator started");
}
void Demodulator::stop(){
if(_tuner.stop() != TUNER_SUCCESS){
LOG_F(ERROR, "Error stopping tuner!");
}
Message* message = new ControllerMessage(DEMOD, ControllerMessage::NOTIFY_STOPPED);
_centralQueue.giveMessage(*message);
LOG_F(1, "Demodulator stopped");
}
bool Demodulator::setFrequency(unsigned long freq) {
if(_tuner.setFrequency(freq) == TunerStatus::TUNER_SUCCESS)
return true;
return false;
}
float Demodulator::getRssi() {
return _tuner.rssi();
}
float Demodulator::getDecodedPL() { return 0; }
unsigned int Demodulator::getDecodedDC() { return 0; }
bool Demodulator::squelchThresholdMet() {
return (getRssi() >= _squelchLevel);
}

View File

@ -9,13 +9,16 @@
#define SIGPROC_DEMODULATOR_H_
#include "messages.h"
#include "Tuner.h"
#define DEFAULT_SQUELCH 65.0
class DemodInterface {
public:
virtual ~DemodInterface() {};
virtual bool setFrequency(unsigned long freq) = 0;
virtual int getRssi() = 0;
virtual float getRssi() = 0;
virtual float getDecodedPL() = 0;
virtual unsigned int getDecodedDC() = 0;
virtual bool squelchThresholdMet() = 0;
@ -23,16 +26,27 @@ public:
class Demodulator : public MessageReceiver, public DemodInterface {
public:
Demodulator() {};
~Demodulator() {};
Demodulator(MessageReceiver& central) : _centralQueue(central), _tuner(*(new RtlFmTuner())) {
//_tuner = *(new RtlFmTuner());
};
~Demodulator() {
delete &_tuner;
};
void start();
void stop();
private:
MessageReceiver& _centralQueue;
Tuner& _tuner;
float _squelchLevel = DEFAULT_SQUELCH;
void giveMessage(Message& message) {};
bool setFrequency(unsigned long freq) { return false; };
int getRssi() { return 0; };
float getDecodedPL() { return 0; };
unsigned int getDecodedDC() { return 0; };
bool squelchThresholdMet() { return false; };
bool setFrequency(unsigned long freq);
float getRssi();
float getDecodedPL();
unsigned int getDecodedDC();
bool squelchThresholdMet();
};
#endif /* SIGPROC_DEMODULATOR_H_ */

View File

@ -7,12 +7,4 @@
#include "Tuner.h"
Tuner::Tuner() {
// TODO Auto-generated constructor stub
}
Tuner::~Tuner() {
// TODO Auto-generated destructor stub
}

View File

@ -9,13 +9,19 @@
#define DRIVERS_TUNER_H_
#include <stdint.h>
#include <unistd.h>
typedef enum {
#include "rtl_fm.h"
#include "loguru.hpp"
#define RTL_DEMOD_WAIT 50000
enum TunerStatus {
TUNER_ERROR = -1,
TUNER_SUCCESS,
TUNER_BUSY,
TUNER_UNSUPPORTED,
} TunerStatus;
};
typedef enum {
MODE_FM,
@ -29,18 +35,51 @@ typedef struct {
int8_t gain;
} Frequency;
class RtlFmTuner;
class Tuner {
public:
Tuner();
virtual ~Tuner();
Tuner() {};
virtual ~Tuner() {};
TunerStatus setFrequency(Frequency* newFreq);
TunerStatus setSquelchLevel(int8_t newLevel);
virtual TunerStatus init() { return TUNER_UNSUPPORTED; };
virtual TunerStatus stop() { return TUNER_UNSUPPORTED; };
virtual TunerStatus setFrequency(uint32_t freq) { return TUNER_UNSUPPORTED; };
virtual TunerStatus setSquelchLevel(int8_t newLevel) { return TUNER_UNSUPPORTED; };
virtual float rssi() { return 0; };
};
class RtlTuner : public Tuner {
class RtlFmTuner : public Tuner {
public:
RtlFmTuner() {};
~RtlFmTuner() {};
TunerStatus init() {
LOG_F(2, "Starting rtl_fm");
if(rtl_fm_init(nullptr, 0, 12000))
return TUNER_ERROR;
return TUNER_SUCCESS;
}
TunerStatus stop() {
LOG_F(2, "Stopping rtl_fm");
if(rtl_fm_deinit())
return TUNER_ERROR;
return TUNER_SUCCESS;
}
TunerStatus setFrequency(uint32_t freq){
if(freq >= RTL_MIN_FREQ && freq <= RTL_MAX_FREQ){
if(rtl_fm_setfreq(freq))
return TUNER_ERROR;
usleep(RTL_DEMOD_WAIT);
return TUNER_SUCCESS;
}
else
return TUNER_UNSUPPORTED;
}
float rssi() { return rtl_fm_get_rssi(); }
private:
};