From c8e88e8be2793f261e23f2fb1188abd5fbdf9a18 Mon Sep 17 00:00:00 2001 From: Ezra Taimuty-Loomis Date: Fri, 28 Jan 2022 18:28:38 -0500 Subject: [PATCH] Fix sync issues with unsubscribe --- src/EventBroker.cpp | 26 ++++---------------------- src/EventBroker.h | 3 +-- src/sigproc/Demodulator.cpp | 6 +++++- 3 files changed, 10 insertions(+), 25 deletions(-) diff --git a/src/EventBroker.cpp b/src/EventBroker.cpp index 4d17f60..07882fd 100644 --- a/src/EventBroker.cpp +++ b/src/EventBroker.cpp @@ -35,13 +35,9 @@ namespace piscan { _eventQueue.enqueue(event); postWorkAvailable(); } - - void EventBroker::subscribe(std::string topic, int subscriber, events::EventHandler handler) { - _subscribeQueue.enqueue(std::make_tuple(topic, subscriber, handler)); - postWorkAvailable(); - } - void EventBroker::_subscribe(std::string topic, int subscriber, events::EventHandler handler) { + void EventBroker::subscribe(std::string topic, int subscriber, events::EventHandler handler) { + std::unique_lock lock(_handler_mutex); if (_handlers.find(topic) == _handlers.end()) { _handlers[topic] = std::map(); } @@ -50,26 +46,11 @@ namespace piscan { } void EventBroker::unsubscribe(std::string topic, int subscriber) { - _unsubscribeQueue.enqueue(std::make_tuple(topic, subscriber)); - postWorkAvailable(); - } - - void EventBroker::_unsubscribe(std::string topic, int subscriber) { + std::unique_lock lock(_handler_mutex); _handlers[topic].erase(subscriber); } void EventBroker::main() { - - std::tuple subParams; - if(_subscribeQueue.try_dequeue(subParams)) { - _subscribe(std::get<0>(subParams), std::get<1>(subParams), std::move(std::get<2>(subParams))); - } - - std::tuple unsubParams; - if(_unsubscribeQueue.try_dequeue(unsubParams)) { - _unsubscribe(std::get<0>(unsubParams), std::get<1>(unsubParams)); - } - events::EventPtr event; if(!_eventQueue.try_dequeue(event)) { workAvailable = false; @@ -78,6 +59,7 @@ namespace piscan { std::string& topic = event->topic; + std::unique_lock lock(_handler_mutex); // Support for regex subscriptions may be added later auto found = _handlers.find(topic); if (found != _handlers.end()){ diff --git a/src/EventBroker.h b/src/EventBroker.h index 9b0fa30..65e9630 100644 --- a/src/EventBroker.h +++ b/src/EventBroker.h @@ -31,10 +31,9 @@ private: static std::shared_ptr _instance; moodycamel::ConcurrentQueue _eventQueue; - moodycamel::ConcurrentQueue> _subscribeQueue; - moodycamel::ConcurrentQueue> _unsubscribeQueue; std::map> _handlers; + std::mutex _handler_mutex; void _subscribe(std::string topic, int subscriber, events::EventHandler handler); void _unsubscribe(std::string topic, int subscriber); diff --git a/src/sigproc/Demodulator.cpp b/src/sigproc/Demodulator.cpp index 5b9ff16..e580ec6 100644 --- a/src/sigproc/Demodulator.cpp +++ b/src/sigproc/Demodulator.cpp @@ -147,6 +147,8 @@ bool Demodulator::setFrequency(long long freq) { DLOG_F(9, "Frequency already set"); return true; }*/ + if(freq == _currentFreq) + return true; _demodMgr.getCurrentModem()->setFrequency(freq); @@ -161,11 +163,12 @@ bool Demodulator::setFrequency(long long freq) { _demodMgr.getCurrentModem()->setFrequency(freq); //this is totally arbitrary //usleep(DEMOD_BUFFER_TIME); + events::publish(std::make_shared("demod_frequency_set", freq)); std::this_thread::sleep_for(std::chrono::microseconds(app::system::getConfig().getDemodConfig().demodDelay)); _currentFreq = freq; - + return true; } @@ -174,6 +177,7 @@ bool Demodulator::setTunerFrequency(long long freq){ _cubic->setFrequency(freq); _demodMgr.getCurrentModem()->setFrequency(freq); //usleep(200000); + events::publish(std::make_shared("demod_frequency_set", freq)); std::this_thread::sleep_for(std::chrono::microseconds(app::system::getConfig().getDemodConfig().retuneDelay)); return true; }