diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 5bf77b8..0369dd4 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -130,6 +130,7 @@ target_include_directories(common PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/cubic/sdr) target_include_directories(scan PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/cubic/sdr) target_sources(piscan_server PUBLIC + ${CMAKE_CURRENT_SOURCE_DIR}/EventBroker.cpp ${CMAKE_CURRENT_SOURCE_DIR}/ScannerSM.cpp ${CMAKE_CURRENT_SOURCE_DIR}/ServerManager.cpp ) diff --git a/src/EventBroker.cpp b/src/EventBroker.cpp new file mode 100644 index 0000000..aabdfc9 --- /dev/null +++ b/src/EventBroker.cpp @@ -0,0 +1,63 @@ +#include "EventBroker.h" +#include "events.h" + +namespace piscan { + namespace events { + void publish(EventPtr event) { + EventBroker::instance()->publish(event); + } + + void subscribe(std::string topic, EventHandler handler){ + EventBroker::instance()->subscribe(topic, std::move(handler)); + } + } + + std::shared_ptr EventBroker::_instance = nullptr; + std::shared_ptr EventBroker::instance() { + if(!_instance){ + _instance = std::make_shared(); + } + return _instance; + } + + EventBroker::EventBroker() : WorkerThread("Event Broker") { + + } + + void EventBroker::publish(events::EventPtr event) { + _eventQueue.enqueue(event); + postWorkAvailable(); + } + + void EventBroker::subscribe(std::string topic, events::EventHandler handler) { + if (_handlers.find(topic) == _handlers.end()) { + _handlers[topic] = std::vector(); + } + + _handlers[topic].push_back(std::move(handler)); + } + + void EventBroker::main() { + events::EventPtr event; + if(!_eventQueue.try_dequeue(event)) { + return; + } + + std::string& topic = event->topic; + + // Support for regex subscriptions may be added later + auto found = _handlers.find(topic); + if (found != _handlers.end()){ + for (auto handler = _handlers[topic].begin(); handler != _handlers[topic].end(); handler++){ + (*handler)(event); + } + } + // 'All events' subscription + found = _handlers.find("*"); + if (found != _handlers.end()){ + for (auto handler = _handlers["*"].begin(); handler != _handlers["*"].end(); handler++){ + (*handler)(event); + } + } + } +} \ No newline at end of file diff --git a/src/EventBroker.h b/src/EventBroker.h new file mode 100644 index 0000000..4020782 --- /dev/null +++ b/src/EventBroker.h @@ -0,0 +1,36 @@ +#pragma once + +#include +#include +#include + +#include "Thread.h" +#include "events.h" +#include "concurrentqueue.h" + +namespace piscan { + +class EventBroker : public WorkerThread { +public: + static std::shared_ptr instance(); + + EventBroker(); + virtual ~EventBroker(){}; + + void publish(events::EventPtr event); + void subscribe(std::string topic, events::EventHandler handler); + +protected: + //EventBroker(); + + void main(); + +private: + static std::shared_ptr _instance; + + moodycamel::ConcurrentQueue _eventQueue; + + std::map> _handlers; +}; + +} diff --git a/src/events.h b/src/events.h new file mode 100644 index 0000000..c78be13 --- /dev/null +++ b/src/events.h @@ -0,0 +1,21 @@ +#pragma once + +#include +#include + +namespace piscan { + namespace events { + struct Event { + Event(std::string topic) : topic(topic) {}; + std::string topic; + }; + + typedef std::shared_ptr EventPtr; + typedef std::function)> EventHandler; + //typedef std::shared_ptr EventHandler; + + void publish(EventPtr event); + + void subscribe(std::string topic, EventHandler handler); + } +} \ No newline at end of file diff --git a/src/piscan_server.cpp b/src/piscan_server.cpp index 206ceaa..2fbc435 100644 --- a/src/piscan_server.cpp +++ b/src/piscan_server.cpp @@ -12,6 +12,7 @@ #include "sigproc_types.h" #include "Demodulator.h" #include "Entry.h" +#include "EventBroker.h" #include "loguru.hpp" #include "messages.h" #include "ServerManager.h" @@ -55,6 +56,7 @@ static piscan::scan::SystemList scanSystems; static ScannerSM scannerInst(scanSystems); static ServerManager connectionManager(io_service); static piscan::sigproc::Demodulator demodInst; +static std::shared_ptr eventBroker = nullptr; static std::atomic_bool steadyState(false); @@ -103,6 +105,10 @@ void exit(int code){ std::exit(code); } +void printEvent(events::EventPtr event) { + std::cerr << "**EVENT: " << event->topic << std::endl; +} + bool app::system::stopSystem(){ if(steadyState.load()){ return true; @@ -199,6 +205,8 @@ int main(int argc, char **argv) { LOG_F(INFO, "Starting PiScan, version %s", PISCAN_VERSION); + eventBroker = EventBroker::instance(); + piscan::config::Configuration& config = piscan::config::Configuration::getConfig(); piscan::config::State& state = piscan::config::State::getState(); bool useDebugConsole = false; @@ -207,7 +215,7 @@ int main(int argc, char **argv) { int logVerbosity = config.getGeneralConfig().logfileVerbosity; int c; - while((c = getopt(argc,argv,"dp:f:l")) != -1){ + while((c = getopt(argc,argv,"dp:f:le:")) != -1){ switch(c){ case 'd': useDebugConsole = true; @@ -223,6 +231,9 @@ int main(int argc, char **argv) { case 'l': spawnClient = true; break; + case 'e': + std::cerr << "**Subscribe to event " << optarg << std::endl; + events::subscribe(optarg, printEvent); } } @@ -234,6 +245,7 @@ int main(int argc, char **argv) { try { { + eventBroker->start(); scannerInst.start(); connectionManager.start(useDebugConsole, spawnClient); demodInst.start(); @@ -260,6 +272,8 @@ int main(int argc, char **argv) { steadyState.store(true); LOG_F(INFO, "System initialized"); + events::publish(std::make_shared("system_started")); + while(sysRun) usleep(100000); @@ -267,6 +281,7 @@ int main(int argc, char **argv) { steadyState.store(false); try { //sysControl.stop(); + events::publish(std::make_shared("system_stopping")); { LOG_F(INFO, "Stopping system"); @@ -281,6 +296,9 @@ int main(int argc, char **argv) { LOG_F(4, "demod wait"); demodInst.waitDeinit(); + eventBroker->stop(); + eventBroker->join(); + LOG_F(2, "All modules stopped"); } } catch (std::exception& e) {