Initial event system implementation

This commit is contained in:
Ezra Taimuty-Loomis 2022-01-27 16:37:05 -05:00
parent 52d21935f9
commit 41a1b7ea3f
5 changed files with 140 additions and 1 deletions

View File

@ -130,6 +130,7 @@ target_include_directories(common PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/cubic/sdr)
target_include_directories(scan PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/cubic/sdr)
target_sources(piscan_server PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}/EventBroker.cpp
${CMAKE_CURRENT_SOURCE_DIR}/ScannerSM.cpp
${CMAKE_CURRENT_SOURCE_DIR}/ServerManager.cpp
)

63
src/EventBroker.cpp Normal file
View File

@ -0,0 +1,63 @@
#include "EventBroker.h"
#include "events.h"
namespace piscan {
namespace events {
void publish(EventPtr event) {
EventBroker::instance()->publish(event);
}
void subscribe(std::string topic, EventHandler handler){
EventBroker::instance()->subscribe(topic, std::move(handler));
}
}
std::shared_ptr<EventBroker> EventBroker::_instance = nullptr;
std::shared_ptr<EventBroker> EventBroker::instance() {
if(!_instance){
_instance = std::make_shared<EventBroker>();
}
return _instance;
}
EventBroker::EventBroker() : WorkerThread("Event Broker") {
}
void EventBroker::publish(events::EventPtr event) {
_eventQueue.enqueue(event);
postWorkAvailable();
}
void EventBroker::subscribe(std::string topic, events::EventHandler handler) {
if (_handlers.find(topic) == _handlers.end()) {
_handlers[topic] = std::vector<events::EventHandler>();
}
_handlers[topic].push_back(std::move(handler));
}
void EventBroker::main() {
events::EventPtr event;
if(!_eventQueue.try_dequeue(event)) {
return;
}
std::string& topic = event->topic;
// Support for regex subscriptions may be added later
auto found = _handlers.find(topic);
if (found != _handlers.end()){
for (auto handler = _handlers[topic].begin(); handler != _handlers[topic].end(); handler++){
(*handler)(event);
}
}
// 'All events' subscription
found = _handlers.find("*");
if (found != _handlers.end()){
for (auto handler = _handlers["*"].begin(); handler != _handlers["*"].end(); handler++){
(*handler)(event);
}
}
}
}

36
src/EventBroker.h Normal file
View File

@ -0,0 +1,36 @@
#pragma once
#include <memory>
#include <map>
#include <vector>
#include "Thread.h"
#include "events.h"
#include "concurrentqueue.h"
namespace piscan {
class EventBroker : public WorkerThread {
public:
static std::shared_ptr<EventBroker> instance();
EventBroker();
virtual ~EventBroker(){};
void publish(events::EventPtr event);
void subscribe(std::string topic, events::EventHandler handler);
protected:
//EventBroker();
void main();
private:
static std::shared_ptr<EventBroker> _instance;
moodycamel::ConcurrentQueue<events::EventPtr> _eventQueue;
std::map<std::string, std::vector<events::EventHandler>> _handlers;
};
}

21
src/events.h Normal file
View File

@ -0,0 +1,21 @@
#pragma once
#include <memory>
#include <functional>
namespace piscan {
namespace events {
struct Event {
Event(std::string topic) : topic(topic) {};
std::string topic;
};
typedef std::shared_ptr<Event> EventPtr;
typedef std::function<void(std::shared_ptr<Event>)> EventHandler;
//typedef std::shared_ptr<EventHandler> EventHandler;
void publish(EventPtr event);
void subscribe(std::string topic, EventHandler handler);
}
}

View File

@ -12,6 +12,7 @@
#include "sigproc_types.h"
#include "Demodulator.h"
#include "Entry.h"
#include "EventBroker.h"
#include "loguru.hpp"
#include "messages.h"
#include "ServerManager.h"
@ -55,6 +56,7 @@ static piscan::scan::SystemList scanSystems;
static ScannerSM scannerInst(scanSystems);
static ServerManager connectionManager(io_service);
static piscan::sigproc::Demodulator demodInst;
static std::shared_ptr<piscan::EventBroker> eventBroker = nullptr;
static std::atomic_bool steadyState(false);
@ -103,6 +105,10 @@ void exit(int code){
std::exit(code);
}
void printEvent(events::EventPtr event) {
std::cerr << "**EVENT: " << event->topic << std::endl;
}
bool app::system::stopSystem(){
if(steadyState.load()){
return true;
@ -199,6 +205,8 @@ int main(int argc, char **argv) {
LOG_F(INFO, "Starting PiScan, version %s", PISCAN_VERSION);
eventBroker = EventBroker::instance();
piscan::config::Configuration& config = piscan::config::Configuration::getConfig();
piscan::config::State& state = piscan::config::State::getState();
bool useDebugConsole = false;
@ -207,7 +215,7 @@ int main(int argc, char **argv) {
int logVerbosity = config.getGeneralConfig().logfileVerbosity;
int c;
while((c = getopt(argc,argv,"dp:f:l")) != -1){
while((c = getopt(argc,argv,"dp:f:le:")) != -1){
switch(c){
case 'd':
useDebugConsole = true;
@ -223,6 +231,9 @@ int main(int argc, char **argv) {
case 'l':
spawnClient = true;
break;
case 'e':
std::cerr << "**Subscribe to event " << optarg << std::endl;
events::subscribe(optarg, printEvent);
}
}
@ -234,6 +245,7 @@ int main(int argc, char **argv) {
try {
{
eventBroker->start();
scannerInst.start();
connectionManager.start(useDebugConsole, spawnClient);
demodInst.start();
@ -260,6 +272,8 @@ int main(int argc, char **argv) {
steadyState.store(true);
LOG_F(INFO, "System initialized");
events::publish(std::make_shared<events::Event>("system_started"));
while(sysRun)
usleep(100000);
@ -267,6 +281,7 @@ int main(int argc, char **argv) {
steadyState.store(false);
try {
//sysControl.stop();
events::publish(std::make_shared<events::Event>("system_stopping"));
{
LOG_F(INFO, "Stopping system");
@ -281,6 +296,9 @@ int main(int argc, char **argv) {
LOG_F(4, "demod wait");
demodInst.waitDeinit();
eventBroker->stop();
eventBroker->join();
LOG_F(2, "All modules stopped");
}
} catch (std::exception& e) {