Compare commits

...

7 Commits

Author SHA1 Message Date
Ezra Taimuty-Loomis c8e88e8be2 Fix sync issues with unsubscribe 2022-01-28 18:28:38 -05:00
Ezra Taimuty-Loomis 2beffedeea Pretty event printing 2022-01-28 16:40:44 -05:00
Ezra Taimuty-Loomis 111e443ec3 Event unsubscribe support 2022-01-28 15:36:45 -05:00
Ezra Taimuty-Loomis d6e1ba6a5f Replace context updates with events 2022-01-27 19:04:33 -05:00
Ezra Taimuty-Loomis 41a1b7ea3f Initial event system implementation 2022-01-27 16:37:05 -05:00
Ezra Taimuty-Loomis 52d21935f9 Thread convenience class 2022-01-27 16:36:42 -05:00
Ezra Taimuty-Loomis 181b4eee2b Initial API definition 2022-01-26 20:30:38 -05:00
23 changed files with 940 additions and 14 deletions

View File

@ -130,6 +130,7 @@ target_include_directories(common PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/cubic/sdr)
target_include_directories(scan PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/cubic/sdr)
target_sources(piscan_server PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}/EventBroker.cpp
${CMAKE_CURRENT_SOURCE_DIR}/ScannerSM.cpp
${CMAKE_CURRENT_SOURCE_DIR}/ServerManager.cpp
)

96
src/EventBroker.cpp Normal file
View File

@ -0,0 +1,96 @@
#include <tuple>
#include "EventBroker.h"
#include "events.h"
#include "loguru.hpp"
namespace piscan {
namespace events {
void publish(EventPtr event) {
EventBroker::instance()->publish(event);
}
void subscribe(std::string topic, int subscriber, EventHandler handler){
EventBroker::instance()->subscribe(topic, subscriber, std::move(handler));
}
void unsubscribe(std::string topic, int subscriber){
EventBroker::instance()->unsubscribe(topic, subscriber);
}
}
std::shared_ptr<EventBroker> EventBroker::_instance = nullptr;
std::shared_ptr<EventBroker> EventBroker::instance() {
if(!_instance){
_instance = std::make_shared<EventBroker>();
}
return _instance;
}
EventBroker::EventBroker() : WorkerThread("Event Broker") {
}
void EventBroker::publish(events::EventPtr event) {
_eventQueue.enqueue(event);
postWorkAvailable();
}
void EventBroker::subscribe(std::string topic, int subscriber, events::EventHandler handler) {
std::unique_lock<std::mutex> lock(_handler_mutex);
if (_handlers.find(topic) == _handlers.end()) {
_handlers[topic] = std::map<int, events::EventHandler>();
}
_handlers[topic][subscriber] = std::move(handler);
}
void EventBroker::unsubscribe(std::string topic, int subscriber) {
std::unique_lock<std::mutex> lock(_handler_mutex);
_handlers[topic].erase(subscriber);
}
void EventBroker::main() {
events::EventPtr event;
if(!_eventQueue.try_dequeue(event)) {
workAvailable = false;
return;
}
std::string& topic = event->topic;
std::unique_lock<std::mutex> lock(_handler_mutex);
// Support for regex subscriptions may be added later
auto found = _handlers.find(topic);
if (found != _handlers.end()){
for (auto handler = _handlers[topic].begin(); handler != _handlers[topic].end(); handler++){
try {
if ((*handler).second == nullptr) {
_handlers[topic].erase((*handler).first);
continue;
}
(*handler).second(event);
} catch (std::exception& e) {
LOG_F(WARNING, "Invalid event handler encountered, deleting");
_handlers[topic].erase((*handler).first);
}
}
}
// 'All events' subscription
found = _handlers.find("*");
if (found != _handlers.end()){
for (auto handler = _handlers["*"].begin(); handler != _handlers["*"].end(); handler++){
try {
if ((*handler).second == nullptr) {
_handlers[topic].erase((*handler).first);
continue;
}
(*handler).second(event);
} catch (std::exception& e) {
LOG_F(WARNING, "Invalid event handler encountered, deleting");
_handlers[topic].erase((*handler).first);
}
}
}
}
}

42
src/EventBroker.h Normal file
View File

@ -0,0 +1,42 @@
#pragma once
#include <memory>
#include <map>
#include <vector>
#include <tuple>
#include "Thread.h"
#include "events.h"
#include "concurrentqueue.h"
namespace piscan {
class EventBroker : public WorkerThread {
public:
static std::shared_ptr<EventBroker> instance();
EventBroker();
virtual ~EventBroker(){};
void publish(events::EventPtr event);
void subscribe(std::string topic, int subscriber, events::EventHandler handler);
void unsubscribe(std::string topic, int subscriber);
protected:
//EventBroker();
void main();
private:
static std::shared_ptr<EventBroker> _instance;
moodycamel::ConcurrentQueue<events::EventPtr> _eventQueue;
std::map<std::string, std::map<int, events::EventHandler>> _handlers;
std::mutex _handler_mutex;
void _subscribe(std::string topic, int subscriber, events::EventHandler handler);
void _unsubscribe(std::string topic, int subscriber);
};
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <vector>
#include <tuple>
#include "messages/context.h"
#include "Configuration.h"
@ -15,6 +16,13 @@ class DemodInterface;
namespace piscan {
namespace app {
enum ReturnStatus {
SUCCESS,
INVALID,
NOT_IMPLEMENTED,
};
typedef std::tuple<ReturnStatus, void*> BasicReturnTuple;
struct ManualEntryData {
public:
ManualEntryData(ManualEntryData& copy) : freq(copy.freq), modulation(copy.modulation){};
@ -55,9 +63,9 @@ long long getTunerSampleRate();
/* server functions */
namespace server {
void scannerContextUpdate(piscan::server::context::ScannerContext ctx);
/*void scannerContextUpdate(piscan::server::context::ScannerContext ctx);
void demodContextUpdate(piscan::server::context::DemodContext ctx);
void signalLevelUpdate(int level);
void signalLevelUpdate(int level);*/
}
/* audio related */
@ -65,5 +73,140 @@ namespace audio {
AudioThread* getAudioController();
}
/* database */
namespace data {
/*
Retrieve the entire System tree
*/
BasicReturnTuple getScanList(); //TODO
/*
Retrieve list of Systems and their indices, tags, and types
*/
BasicReturnTuple getSystemList();
/*
Retrieve tree of System and its Entries at index
*/
BasicReturnTuple getSystemByIndex(size_t sys_index); //TODO
/*
Retrieve list of Entries within indexed System and their indices and descriptors
*/
BasicReturnTuple getEntryList(size_t sys_index);
/*
Retrieve Entry at index
*/
BasicReturnTuple getEntryByIndex(size_t sys_index, size_t entry_index); //TODO
namespace system {
/*
Create a new Radio System
*/
BasicReturnTuple create(/*TODO data*/);
/*
Replace the Radio System header at index. Entries will be retained unless the system type is changed.
*/
BasicReturnTuple replace(size_t sys_index /*, TODO new*/);
/*
Remove the Radio System and its Entries at index. Indices of succeeding Systems will be updated upon success
*/
BasicReturnTuple remove(size_t sys_index);
/*
Set lockout status of System at index.
- '0' for unlocked
- '-1' for persistent lock
- '>1' lock for duration in seconds
*/
BasicReturnTuple setLockout(size_t sys_index, int duration_seconds);
/*
Move Radio System from original index to new index. All other indices are updated upon success
*/
BasicReturnTuple setIndex(size_t original_sys_index, size_t new_sys_index);
namespace entry {
/*
Create a new Entry within the indexed System
*/
BasicReturnTuple create(size_t sys_index /*,TODO data*/);
/*
Replace the Entry at index
*/
BasicReturnTuple replace(size_t sys_index, size_t entry_index /*, TODO new*/);
/*
Remove the Entry at index. Succeeding indices within the System are updated upon success
*/
BasicReturnTuple remove(size_t sys_index, size_t entry_index);
/*
Set lockout status of Entry at index.
- '0' for unlocked
- '-1' for persistent lock
- '>1' lock for duration in seconds
*/
BasicReturnTuple setLockout(size_t sys_index, size_t entry_index, int duration_seconds);
/*
Move Entry within System from original index to new index. All other indices are updated upon success
*/
BasicReturnTuple setIndex(size_t sys_index, size_t original_entry_index, size_t new_entry_index);
}
}
}
namespace configuration {
/*
Retrieve the full system configuration
*/
BasicReturnTuple getFullConfig();
/*
Set the full system configuration. Requires restart
*/
BasicReturnTuple setConfig(/*TODO*/);
/*
Retrieve general configuration
*/
BasicReturnTuple getGeneralConfig();
/*
Set the general configuration
*/
BasicReturnTuple setGeneralConfig(/*TODO*/);
/*
Retrieve configuration for demodulators
*/
BasicReturnTuple getDemodConfig();
/*
Set the configuration for demodulators. Restart bit required
*/
BasicReturnTuple setDemodConfig(/*TODO*/);
/*
Retrieve configuration for RTSP server
*/
BasicReturnTuple getAudioServerConfig();
/*
Set the configuration for RTSP server. Requires restart
*/
BasicReturnTuple setAudioServerConfig(/*TODO*/);
/*
Retrieve a list of configured tuners
*/
BasicReturnTuple getTunerList();
}
} // namespace app
} // namespace piscan

View File

@ -7,6 +7,7 @@
#include <unistd.h>
#include <mutex>
#include <map>
#include "PiScan.h"
#include "ScannerSM.h"
@ -18,7 +19,7 @@
#include "RadioSystem.h"
#include "request.h"
#include "Configuration.h"
#include "events.h"
#define DELAY_TIMEOUT 2.0
@ -334,6 +335,7 @@ void ScannerSM::ST_Stopped(EventData* /* data */){
void ScannerSM::_broadcastContextUpdate() {
DLOG_F(6, "Broadcasting context");
// TODO old context remain until new request system setup
lock_guard<mutex> lock(_contextMutex);
if (_currentContext.state != piscan::server::context::ScannerContext::SCAN)
{
@ -359,7 +361,38 @@ void ScannerSM::_broadcastContextUpdate() {
_currentContext.clearFields();
}
app::server::scannerContextUpdate(_currentContext);
/*app::server::scannerContextUpdate(_currentContext);*/
/* EVENT BASED IMPLEMENTATION */
events::ScannerStateEvent event;
std::map<unsigned char, events::ScannerStateEvent::ScannerState> states = {
std::make_pair(States::ST_HOLD, events::ScannerStateEvent::ScannerState::HOLD),
std::make_pair(States::ST_SCAN, events::ScannerStateEvent::ScannerState::SCAN),
std::make_pair(States::ST_RECEIVE, events::ScannerStateEvent::ScannerState::RECEIVE),
std::make_pair(States::ST_LOAD, events::ScannerStateEvent::ScannerState::OTHER_STATE),
std::make_pair(States::ST_INVALID, events::ScannerStateEvent::ScannerState::OTHER_STATE),
};
event.state = states[currentState];
if (currentState != States::ST_SCAN) {
if (_manualMode)
{
event.systemTag = "Manual";
event.entryTag = "Manual entry";
event.entryIndex = "MAN";
}
else
{
//event.systemTag = _currentSystem->tag();
event.systemTag = _systems[_currentEntry->getSysIndex()]->tag();
event.entryTag = _currentEntry->tag();
event.entryIndex = to_string(_currentEntry->getSysIndex()) + "-" + to_string(_currentEntry->getEntryIndex());
}
event.frequency = _currentEntry->freq();
event.modulation = _currentEntry->modulation();
event.delayMS = _currentEntry->delayMS();
event.lockout = _currentEntry->isLockedOut();
}
events::publish(std::make_shared<events::ScannerStateEvent>(std::move(event)));
}
void ScannerSM::_enableAudioOut(bool en){

View File

@ -220,7 +220,7 @@ int ServerManager::giveRequest(void* request){
}
void ServerManager::_handleMessage(std::shared_ptr<Message> message){
assert(message->destination == SERVER_MAN);
//assert(message->destination == SERVER_MAN);
auto msg = std::dynamic_pointer_cast<ServerMessage>(message);
int* level = nullptr;
switch (msg->type) {

View File

@ -1,4 +1,5 @@
add_library(common
${CMAKE_CURRENT_SOURCE_DIR}/config_api.cpp
${CMAKE_CURRENT_SOURCE_DIR}/Configuration.cpp
${CMAKE_CURRENT_SOURCE_DIR}/State.cpp
)

66
src/common/config_api.cpp Normal file
View File

@ -0,0 +1,66 @@
#include "PiScan.h"
namespace piscan::app::configuration {
/*
*/
BasicReturnTuple getFullConfig()
{
return std::make_tuple(NOT_IMPLEMENTED, nullptr);
}
/*
*/
BasicReturnTuple setConfig(/*TODO*/)
{
return std::make_tuple(NOT_IMPLEMENTED, nullptr);
}
/*
*/
BasicReturnTuple getGeneralConfig()
{
return std::make_tuple(NOT_IMPLEMENTED, nullptr);
}
/*
*/
BasicReturnTuple setGeneralConfig(/*TODO*/)
{
return std::make_tuple(NOT_IMPLEMENTED, nullptr);
}
/*
*/
BasicReturnTuple getDemodConfig()
{
return std::make_tuple(NOT_IMPLEMENTED, nullptr);
}
/*
*/
BasicReturnTuple setDemodConfig(/*TODO*/)
{
return std::make_tuple(NOT_IMPLEMENTED, nullptr);
}
/*
*/
BasicReturnTuple getAudioServerConfig()
{
return std::make_tuple(NOT_IMPLEMENTED, nullptr);
}
/*
*/
BasicReturnTuple setAudioServerConfig(/*TODO*/)
{
return std::make_tuple(NOT_IMPLEMENTED, nullptr);
}
/*
*/
BasicReturnTuple getTunerList()
{
return std::make_tuple(NOT_IMPLEMENTED, nullptr);
}
}

View File

@ -13,6 +13,7 @@
#include "loguru.hpp"
#include "threadname.h"
#include "PiScan.h"
#include "events.h"
#define SDR_THREAD_NAME "SDR Interface"
@ -116,10 +117,13 @@ bool SDRThread::init() {
//4. Apply other settings: Frequency, PPM correction, Gains, Device-specific settings:
device->setFrequency(SOAPY_SDR_RX,0,"RF",frequency - offset.load());
//TEMP
piscan::events::publish(std::make_shared<piscan::events::GenericNumberEvent>("tuner_frequency_set", frequency - offset.load()));
if (devInfo->hasCORR(SOAPY_SDR_RX, 0)) {
hasPPM.store(true);
device->setFrequency(SOAPY_SDR_RX,0,"CORR",ppm.load());
piscan::events::publish(std::make_shared<piscan::events::GenericNumberEvent>("tuner_ppm_set", ppm.load()));
} else {
hasPPM.store(false);
}
@ -514,6 +518,7 @@ void SDRThread::updateSettings() {
if (ppm_changed.load() && hasPPM.load()) {
device->setFrequency(SOAPY_SDR_RX,0,"CORR",ppm.load());
piscan::events::publish(std::make_shared<piscan::events::GenericNumberEvent>("tuner_ppm_set", ppm.load()));
ppm_changed.store(false);
}
@ -521,8 +526,10 @@ void SDRThread::updateSettings() {
if (frequency_locked.load() && !frequency_lock_init.load()) {
device->setFrequency(SOAPY_SDR_RX,0,"RF",lock_freq.load());
frequency_lock_init.store(true);
piscan::events::publish(std::make_shared<piscan::events::GenericNumberEvent>("tuner_frequency_set", lock_freq.load()));
} else if (!frequency_locked.load()) {
device->setFrequency(SOAPY_SDR_RX,0,"RF",frequency.load() - offset.load());
piscan::events::publish(std::make_shared<piscan::events::GenericNumberEvent>("tuner_frequency_set", frequency.load() - offset.load()));
}
freq_changed.store(false);

125
src/events.h Normal file
View File

@ -0,0 +1,125 @@
#pragma once
#include <memory>
#include <functional>
namespace piscan {
namespace events {
struct Event {
Event(std::string topic) : topic(topic) {};
virtual ~Event(){};
inline virtual std::string to_string() {
return "Event type: " + topic + "\n";
}
std::string topic;
};
typedef std::shared_ptr<Event> EventPtr;
typedef std::function<void(std::shared_ptr<Event>)> EventHandler;
//typedef std::shared_ptr<EventHandler> EventHandler;
void publish(EventPtr event);
void subscribe(std::string topic, int subscriber, EventHandler handler);
void unsubscribe(std::string topic, int subscriber);
/*---------------------------------*/
struct GenericStringEvent : public Event {
GenericStringEvent(std::string topic, std::string data) : Event(topic), data(data) {};
virtual ~GenericStringEvent() {};
inline virtual std::string to_string() {
return Event::to_string() + "\tData:\t" + data + "\n";
}
std::string data;
};
struct GenericNumberEvent : public Event {
GenericNumberEvent(std::string topic, long long data) : Event(topic), data(data) {};
virtual ~GenericNumberEvent() {};
inline virtual std::string to_string() {
return Event::to_string() + "\tData:\t" + std::to_string(data) + "\n";
}
long long data;
};
struct GenericBoolEvent : public Event {
GenericBoolEvent(std::string topic, bool data) : Event(topic), data(data) {};
virtual ~GenericBoolEvent() {};
inline virtual std::string to_string() {
return Event::to_string() + "\tData:\t" + std::to_string(data) + "\n";
}
bool data;
};
struct ScannerStateEvent : public Event {
virtual ~ScannerStateEvent() {};
enum ScannerState {
OTHER_STATE,
SCAN,
HOLD,
RECEIVE,
};
ScannerStateEvent() : Event("scanner_state_change") {};
inline virtual std::string to_string() {
std::string tail = (state == HOLD || state == RECEIVE)?
"\tSystem:\t" + systemTag + "\n" +
"\tEntry:\t" + entryTag + "\n" +
"\tFrequency:\t" + std::to_string(frequency) + "\n" +
"\tModulation:\t" + modulation + "\n" +
"\tIndex:\t" + entryIndex + "\n" +
"\tDelay:\t" + std::to_string(delayMS) + "\n" +
"\tLockout:\t" + std::to_string(lockout) + "\n"
: "";
return Event::to_string() +
"\tState:\t" + std::to_string(state) + "\n" + tail;
}
//TODO ported from context messages
ScannerState state = OTHER_STATE;
std::string systemTag = "";
std::string entryTag = "";
long long frequency = 0;
std::string modulation = "";
std::string entryIndex = "";
int delayMS = 0;
bool lockout = false;
};
struct DemodStateEvent : public Event {
DemodStateEvent() : Event("demod_state_change") {};
virtual ~DemodStateEvent() {};
inline virtual std::string to_string() {
return Event::to_string() +
"\tTuner gain:\t" + std::to_string(tunerGainState) + "\n"
"\tSquelch:\t" + std::to_string(squelchState) + "\n";
}
float tunerGainState;
int squelchState;
};
struct SignalLevelEvent : public Event {
SignalLevelEvent(int level) : Event("signal_level"), level(level) {};
virtual ~SignalLevelEvent(){};
inline virtual std::string to_string() {
return Event::to_string() + "\tSignal level:\t" + std::to_string(level) + "\n";
}
int level;
};
struct LoggingEvent : public Event {
LoggingEvent(int level, std::string line) : Event("log_write"), level(level), line(line) {};
virtual ~LoggingEvent(){};
inline virtual std::string to_string() {
return Event::to_string() + "\t> " + line + "\n";
}
int level;
std::string line;
};
}
}

View File

@ -12,6 +12,7 @@
#include "sigproc_types.h"
#include "Demodulator.h"
#include "Entry.h"
#include "EventBroker.h"
#include "loguru.hpp"
#include "messages.h"
#include "ServerManager.h"
@ -55,6 +56,7 @@ static piscan::scan::SystemList scanSystems;
static ScannerSM scannerInst(scanSystems);
static ServerManager connectionManager(io_service);
static piscan::sigproc::Demodulator demodInst;
static std::shared_ptr<piscan::EventBroker> eventBroker = nullptr;
static std::atomic_bool steadyState(false);
@ -103,10 +105,15 @@ void exit(int code){
std::exit(code);
}
void printEvent(events::EventPtr event) {
std::cerr << event->to_string();
}
bool app::system::stopSystem(){
if(steadyState.load()){
return true;
}
sysRun = false;
return false;
}
@ -173,7 +180,7 @@ const piscan::server::context::SystemInfo app::system::getSystemInfo(){
return info;
}
void app::server::scannerContextUpdate(piscan::server::context::ScannerContext ctx){
/*void app::server::scannerContextUpdate(piscan::server::context::ScannerContext ctx){
connectionManager.giveMessage(make_shared<ServerMessage>(SCANNER_SM, ServerMessage::CONTEXT_UPDATE, new piscan::server::context::ScannerContext(ctx)));
}
@ -183,7 +190,7 @@ void app::server::demodContextUpdate(piscan::server::context::DemodContext ctx){
void app::server::signalLevelUpdate(int level){
connectionManager.giveMessage(make_shared<ServerMessage>(DEMOD, ServerMessage::SIGNAL_LEVEL, new int(level)));
}
}*/
}
@ -199,6 +206,8 @@ int main(int argc, char **argv) {
LOG_F(INFO, "Starting PiScan, version %s", PISCAN_VERSION);
eventBroker = EventBroker::instance();
piscan::config::Configuration& config = piscan::config::Configuration::getConfig();
piscan::config::State& state = piscan::config::State::getState();
bool useDebugConsole = false;
@ -207,7 +216,7 @@ int main(int argc, char **argv) {
int logVerbosity = config.getGeneralConfig().logfileVerbosity;
int c;
while((c = getopt(argc,argv,"dp:f:l")) != -1){
while((c = getopt(argc,argv,"dp:f:le:")) != -1){
switch(c){
case 'd':
useDebugConsole = true;
@ -223,6 +232,9 @@ int main(int argc, char **argv) {
case 'l':
spawnClient = true;
break;
case 'e':
std::cerr << "**Subscribe to event " << optarg << std::endl;
events::subscribe(optarg, 1, printEvent);
}
}
@ -234,6 +246,7 @@ int main(int argc, char **argv) {
try {
{
eventBroker->start();
scannerInst.start();
connectionManager.start(useDebugConsole, spawnClient);
demodInst.start();
@ -260,6 +273,8 @@ int main(int argc, char **argv) {
steadyState.store(true);
LOG_F(INFO, "System initialized");
events::publish(std::make_shared<events::Event>("system_started"));
while(sysRun)
usleep(100000);
@ -267,6 +282,7 @@ int main(int argc, char **argv) {
steadyState.store(false);
try {
//sysControl.stop();
events::publish(std::make_shared<events::Event>("system_stopping"));
{
LOG_F(INFO, "Stopping system");
@ -281,6 +297,9 @@ int main(int argc, char **argv) {
LOG_F(4, "demod wait");
demodInst.waitDeinit();
eventBroker->stop();
eventBroker->join();
LOG_F(2, "All modules stopped");
}
} catch (std::exception& e) {

View File

@ -1,4 +1,5 @@
add_library(scan
${CMAKE_CURRENT_SOURCE_DIR}/api.cpp
${CMAKE_CURRENT_SOURCE_DIR}/Entry.cpp
${CMAKE_CURRENT_SOURCE_DIR}/RadioSystem.cpp
${CMAKE_CURRENT_SOURCE_DIR}/SystemList.cpp

138
src/scan/api.cpp Normal file
View File

@ -0,0 +1,138 @@
#include <tuple>
#include "PiScan.h"
namespace piscan::app::data
{
/*
Retrieve the entire System tree
*/
BasicReturnTuple getScanList()
{
return std::make_tuple(NOT_IMPLEMENTED, nullptr);
}
/*
Retrieve list of Systems and their indices, tags, and types
*/
BasicReturnTuple getSystemList()
{
return std::make_tuple(NOT_IMPLEMENTED, nullptr);
}
/*
Retrieve tree of System and its Entries at index
*/
BasicReturnTuple getSystemByIndex(size_t sys_index)
{
return std::make_tuple(NOT_IMPLEMENTED, nullptr);
} //TODO
/*
Retrieve list of Entries within indexed System and their indices and descriptors
*/
BasicReturnTuple getEntryList(size_t sys_index)
{
return std::make_tuple(NOT_IMPLEMENTED, nullptr);
}
/*
Retrieve Entry at index
*/
BasicReturnTuple getEntryByIndex(size_t sys_index, size_t entry_index)
{
return std::make_tuple(NOT_IMPLEMENTED, nullptr);
} //TODO
namespace system
{
/*
Create a new Radio System
*/
BasicReturnTuple create(/*TODO data*/)
{
return std::make_tuple(NOT_IMPLEMENTED, nullptr);
}
/*
Replace the Radio System header at index. Entries will be retained unless the system type is changed.
*/
BasicReturnTuple replace(size_t sys_index /*, TODO new*/)
{
return std::make_tuple(NOT_IMPLEMENTED, nullptr);
}
/*
Remove the Radio System and its Entries at index. Indices of succeeding Systems will be updated upon success
*/
BasicReturnTuple remove(size_t sys_index)
{
return std::make_tuple(NOT_IMPLEMENTED, nullptr);
}
/*
Set lockout status of System at index.
- '0' for unlocked
- '-1' for persistent lock
- '>1' lock for duration in seconds
*/
BasicReturnTuple setLockout(size_t sys_index, int duration_seconds)
{
return std::make_tuple(NOT_IMPLEMENTED, nullptr);
}
/*
Move Radio System from original index to new index. All other indices are updated upon success
*/
BasicReturnTuple setIndex(size_t original_sys_index, size_t new_sys_index)
{
return std::make_tuple(NOT_IMPLEMENTED, nullptr);
}
namespace entry
{
/*
Create a new Entry within the indexed System
*/
BasicReturnTuple create(size_t sys_index /*,TODO data*/)
{
return std::make_tuple(NOT_IMPLEMENTED, nullptr);
}
/*
Replace the Entry at index
*/
BasicReturnTuple replace(size_t sys_index, size_t entry_index /*, TODO new*/)
{
return std::make_tuple(NOT_IMPLEMENTED, nullptr);
}
/*
Remove the Entry at index. Succeeding indices within the System are updated upon success
*/
BasicReturnTuple remove(size_t sys_index, size_t entry_index)
{
return std::make_tuple(NOT_IMPLEMENTED, nullptr);
}
/*
Set lockout status of Entry at index.
- '0' for unlocked
- '-1' for persistent lock
- '>1' lock for duration in seconds
*/
BasicReturnTuple setLockout(size_t sys_index, size_t entry_index, int duration_seconds)
{
return std::make_tuple(NOT_IMPLEMENTED, nullptr);
}
/*
Move Entry within System from original index to new index. All other indices are updated upon success
*/
BasicReturnTuple setIndex(size_t sys_index, size_t original_entry_index, size_t new_entry_index)
{
return std::make_tuple(NOT_IMPLEMENTED, nullptr);
}
}
}
}

View File

@ -5,9 +5,14 @@
* Author: ezra
*/
#include <memory>
#include <iostream>
#include "PiScan.h"
#include "connection.h"
#include "request.h"
#include "events.h"
#include "messages/context.h"
namespace piscan {
namespace server {
@ -74,6 +79,43 @@ int Connection::systemFunction(SystemFunction function) {
return issueRequest(params);
}*/
// TODO temporary workaround until new connection interfaces built
bool Connection::connect() {
events::subscribe("scanner_state_change", (1000+_handle), [this](events::EventPtr event){
auto evt = std::dynamic_pointer_cast<events::ScannerStateEvent>(event);
piscan::server::context::ScannerContext ctx;
ctx.state = static_cast<context::ScannerContext::ScannerState>(evt->state);
ctx.systemTag = evt->systemTag;
ctx.entryTag = evt->entryTag;
ctx.frequency = evt->frequency;
ctx.modulation = evt->modulation;
ctx.entryIndex = evt->entryIndex;
ctx.delayMS = evt->delayMS;
ctx.lockout = evt->lockout;
contextUpdate(ctx);
});
events::subscribe("demod_state_change", (1000+_handle), [this](events::EventPtr event){
auto evt = std::dynamic_pointer_cast<events::DemodStateEvent>(event);
piscan::server::context::DemodContext ctx(evt->tunerGainState, evt->squelchState);
contextUpdate(ctx);
});
events::subscribe("signal_level", (1000+_handle), [this](events::EventPtr event){
auto evt = std::dynamic_pointer_cast<events::SignalLevelEvent>(event);
handleSignalLevel(evt->level);
});
return true;
}
void Connection::disconnect() {
events::unsubscribe("scanner_state_change", (1000+_handle));
events::unsubscribe("demod_state_change", (1000+_handle));
events::unsubscribe("signal_level", (1000+_handle));
}
int Connection::scanStart() {
//ClientRequest::RequestParams params = { .type = SCANNER_FUNCTION, .subType = SCANNER_STATE_SCAN };
//return issueRequest(params);

View File

@ -11,6 +11,7 @@
#include <vector>
#include <map>
#include "PiScan.h"
#include "DebugServer.h"
#include "loguru.hpp"
#include "threadname.h"
@ -24,8 +25,10 @@ namespace connection {
#define DS_THREAD_NAME "DebugConsole"
using namespace piscan;
using piscan::app::BasicReturnTuple;
bool DebugConsole::connect(){
Connection::connect();
std::cerr << "\nConnecting...\n";
_run = true;
_requestThread = std::thread(&DebugConsole::_consoleInputFunc, this);
@ -33,6 +36,7 @@ bool DebugConsole::connect(){
}
void DebugConsole::disconnect(){
Connection::disconnect();
_run = false;
std::fprintf(stdin, "\n");
@ -49,6 +53,37 @@ void DebugConsole::_consoleInputFunc() {
std::string intermediate;
std::cerr << "\nConsole connected\n";
std::map<app::ReturnStatus, std::string> friendlyReturnCodes;
friendlyReturnCodes[app::SUCCESS] = "Success";
friendlyReturnCodes[app::INVALID] = "Invalid parameters";
friendlyReturnCodes[app::NOT_IMPLEMENTED] = "Function not yet implemented";
std::map<std::string, std::function<piscan::app::BasicReturnTuple()>> apiMap;
apiMap["scanlist"] = []() { return app::data::getScanList(); };
apiMap["systems"] = []() { return app::data::getSystemList(); };
apiMap["systemat"] = [tokens]() { return app::data::getSystemByIndex(std::stoi(tokens[1])); };
apiMap["entrylist"] = [tokens]() { return app::data::getEntryList(std::stoi(tokens[1])); };
apiMap["entryat"] = [tokens]() { return app::data::getEntryByIndex(std::stoi(tokens[1]), std::stoi(tokens[2])); };
apiMap["createsystem"] = [tokens]() { return app::data::system::create(/*TODO*/); };
apiMap["replacesystem"] = [tokens]() { return app::data::system::replace(std::stoi(tokens[1])/*, TODO*/); };
apiMap["removesystem"] = [tokens]() { return app::data::system::remove(std::stoi(tokens[1])); };
apiMap["locksystem"] = [tokens]() { return app::data::system::setLockout(std::stoi(tokens[1]), std::stoi(tokens[2])); };
apiMap["setsystemindex"] = [tokens]() { return app::data::system::setIndex(std::stoi(tokens[1]), std::stoi(tokens[2])); };
apiMap["createentry"] = [tokens]() { return app::data::system::entry::create(std::stoi(tokens[1])/*,TODO*/); };
apiMap["replaceentry"] = [tokens]() { return app::data::system::entry::replace(std::stoi(tokens[1]), std::stoi(tokens[2])/*, TODO*/); };
apiMap["removeentry"] = [tokens]() { return app::data::system::entry::remove(std::stoi(tokens[1]), std::stoi(tokens[2])); };
apiMap["lockentry"] = [tokens]() { return app::data::system::entry::setLockout(std::stoi(tokens[1]), std::stoi(tokens[2]), std::stoi(tokens[3])); };
apiMap["setentryindex"] = [tokens]() { return app::data::system::entry::setIndex(std::stoi(tokens[1]), std::stoi(tokens[2]), std::stoi(tokens[3])); };
apiMap["config"] = []() { return app::configuration::getFullConfig(); };
apiMap["setconfig"] = [tokens]() { return app::configuration::setConfig(); };
apiMap["generalcfg"] = []() { return app::configuration::getGeneralConfig(); };
apiMap["setgeneralcfg"] = [tokens]() { return app::configuration::setGeneralConfig(); };
apiMap["demodcfg"] = []() { return app::configuration::getDemodConfig(); };
apiMap["setdemodcfg"] = [tokens]() { return app::configuration::setDemodConfig(); };
apiMap["rtspcfg"] = []() { return app::configuration::getAudioServerConfig(); };
apiMap["setrtspcfg"] = [tokens]() { return app::configuration::setAudioServerConfig(); };
apiMap["tunerlist"] = []() { return app::configuration::getTunerList(); };
getSystemInfo();
getScannerContext();
getDemodContext();
@ -64,7 +99,8 @@ void DebugConsole::_consoleInputFunc() {
try {
if (!tokens[0].compare("exit")) {
_run = false;
systemFunction(SystemFunction::STOP);
//systemFunction(SystemFunction::STOP);
app::system::stopSystem();
}
else if (!tokens[0].compare("verbosity")){
loguru::g_stderr_verbosity = std::stoi(tokens[1]);
@ -122,8 +158,16 @@ void DebugConsole::_consoleInputFunc() {
<< "\n\tget [subcommand]"
<< "\n\t\tcontext\t\tReturns scanner status"
<< "\n";
for (auto cmd = apiMap.begin(); cmd != apiMap.end(); cmd++) {
std::cerr << "\t" << cmd->first << "\n";
}
} else
try {
auto ret = apiMap[tokens[0]]();
std::cerr << "Status: " << friendlyReturnCodes[std::get<0>(ret)] << "\n";
} catch (std::exception& e) {
std::cerr << "Invalid command\n";
}
} catch (std::exception& e) {
std::cerr << "Argument missing or typo in the command\n";
}

View File

@ -21,11 +21,13 @@ namespace server {
namespace connection {
bool SocketConnection::connect(){
Connection::connect();
_startRead();
return true;
}
void SocketConnection::disconnect(){
Connection::disconnect();
if (_socket.is_open()) {
_socket.close();
notifyDisconnected();

View File

@ -42,8 +42,8 @@ public:
_level(lvl), _audio(aud), _serverManager(nullptr), _handle(HANDLE_NULL) {}
virtual ~Connection() {};
virtual bool connect() = 0;
virtual void disconnect() = 0;
virtual bool connect();
virtual void disconnect();
virtual void contextUpdate(const piscan::server::context::ScannerContext context) = 0;
virtual void contextUpdate(const piscan::server::context::DemodContext context) = 0;
virtual void handleSystemMessage(const piscan::server::context::GeneralMessage message) = 0;

View File

@ -11,6 +11,7 @@
#include "PiScan.h"
#include "Demodulator.h"
#include "loguru.hpp"
#include "events.h"
#define INIT_FREQUENCY 100000000
#define NUM_RATES_DEFAULT 4
@ -115,7 +116,8 @@ void Demodulator::start(){
int level = getSignalStrength();
LOG_F(7, "Signal strength %i", level);
app::server::signalLevelUpdate(level);
//app::server::signalLevelUpdate(level);
events::publish(std::make_shared<events::SignalLevelEvent>(level));
});
//_sigLevelRefresher = new IntervalTimer();
_sigLevelRefresher.create(SIGLEVEL_REFRESH_INTERVAL, func);
@ -145,6 +147,8 @@ bool Demodulator::setFrequency(long long freq) {
DLOG_F(9, "Frequency already set");
return true;
}*/
if(freq == _currentFreq)
return true;
_demodMgr.getCurrentModem()->setFrequency(freq);
@ -159,11 +163,12 @@ bool Demodulator::setFrequency(long long freq) {
_demodMgr.getCurrentModem()->setFrequency(freq);
//this is totally arbitrary
//usleep(DEMOD_BUFFER_TIME);
events::publish(std::make_shared<events::GenericNumberEvent>("demod_frequency_set", freq));
std::this_thread::sleep_for(std::chrono::microseconds(app::system::getConfig().getDemodConfig().demodDelay));
_currentFreq = freq;
return true;
}
@ -172,6 +177,7 @@ bool Demodulator::setTunerFrequency(long long freq){
_cubic->setFrequency(freq);
_demodMgr.getCurrentModem()->setFrequency(freq);
//usleep(200000);
events::publish(std::make_shared<events::GenericNumberEvent>("demod_frequency_set", freq));
std::this_thread::sleep_for(std::chrono::microseconds(app::system::getConfig().getDemodConfig().retuneDelay));
return true;
}
@ -290,7 +296,11 @@ void Demodulator::_handleRequest(ClientRequest& request){
}
void Demodulator::_contextUpdate(){
app::server::demodContextUpdate(piscan::server::context::DemodContext(_gain, _squelchLevel));
//app::server::demodContextUpdate(piscan::server::context::DemodContext(_gain, _squelchLevel));
events::DemodStateEvent event;
event.squelchState = _squelchLevel;
event.tunerGainState = _gain;
events::publish(std::make_shared<events::DemodStateEvent>(std::move(event)));
}
void Demodulator::setTunerGain(float gain){

View File

@ -1,4 +1,5 @@
add_library(util
${CMAKE_CURRENT_SOURCE_DIR}/Thread.cpp
${CMAKE_CURRENT_SOURCE_DIR}/StateMachine.cpp
)

95
src/util/Thread.cpp Normal file
View File

@ -0,0 +1,95 @@
#include "Thread.h"
#include "threadname.h"
#include "loguru.hpp"
namespace piscan {
ThreadBase::ThreadBase(std::string name) : name(name), _run(true) {
}
void ThreadBase::initialize() {
LOG_F(1, "Initializing");
}
void ThreadBase::deinitialize() {
LOG_F(1, "Deinitializing");
}
void ThreadBase::start() {
LOG_F(1, "Starting %s", name.c_str());
_run = true;
_thread = std::thread([this]() { (*this).run(); });
}
void ThreadBase::stop() {
LOG_F(1, "Stopping %s", name.c_str());
_run = false;
}
void ThreadBase::join() {
if (_thread.joinable()) {
_thread.join();
}
}
void ThreadBase::run() {
setThreadName(name.c_str());
initialize();
LOG_F(1, "Started");
while(isRunning()) {
main();
}
deinitialize();
LOG_F(1, "Stopped");
}
void WorkerThread::start() {
_stopping = false;
ThreadBase::start();
}
void WorkerThread::stop() {
ThreadBase::stop();
std::unique_lock<std::mutex> lock(_mtx);
_stopping = true;
_cv.notify_all();
}
void WorkerThread::run() {
setThreadName(name.c_str());
initialize();
LOG_F(1, "Started");
while (isRunning())
{
std::unique_lock<std::mutex> lock(_mtx);
while(!workAvailable && !_stopping) {
_cv.wait(lock);
}
if(workAvailable){
main();
}
else if(_stopping){
break;
}
}
deinitialize();
LOG_F(1, "Stopped");
}
bool ThreadBase::isRunning() {
return _run.load();
}
void WorkerThread::postWorkAvailable() {
std::unique_lock<std::mutex> lock(_mtx);
workAvailable = true;
_cv.notify_all();
}
}

59
src/util/Thread.h Normal file
View File

@ -0,0 +1,59 @@
#pragma once
#include <thread>
#include <atomic>
#include <mutex>
#include <condition_variable>
namespace piscan {
class ThreadBase {
public:
virtual ~ThreadBase() {};
virtual void initialize();
virtual void deinitialize();
virtual void start();
virtual void stop();
virtual void join();
const std::string name;
protected:
ThreadBase(std::string name);
virtual void run();
virtual void main() = 0;
bool isRunning();
//Logger logger();
private:
std::atomic_bool _run;
std::thread _thread;
};
class WorkerThread : public ThreadBase {
public:
virtual void start();
virtual void stop();
protected:
WorkerThread(std::string name) : ThreadBase(name) {};
virtual ~WorkerThread(){};
virtual void run();
virtual void main() = 0;
void postWorkAvailable();
bool workAvailable;
private:
std::mutex _mtx;
std::condition_variable _cv;
bool _stopping;
};
}

View File

@ -2,4 +2,5 @@ package_add_test(util_tests
${CMAKE_CURRENT_SOURCE_DIR}/IntervalTimer_test.cpp
${CMAKE_CURRENT_SOURCE_DIR}/StateMachine_test.cpp
${CMAKE_CURRENT_SOURCE_DIR}/synchronize_test.cpp
${CMAKE_CURRENT_SOURCE_DIR}/Thread_test.cpp
)

View File