Fix sync issues with unsubscribe
This commit is contained in:
parent
2beffedeea
commit
c8e88e8be2
|
|
@ -35,13 +35,9 @@ namespace piscan {
|
|||
_eventQueue.enqueue(event);
|
||||
postWorkAvailable();
|
||||
}
|
||||
|
||||
void EventBroker::subscribe(std::string topic, int subscriber, events::EventHandler handler) {
|
||||
_subscribeQueue.enqueue(std::make_tuple(topic, subscriber, handler));
|
||||
postWorkAvailable();
|
||||
}
|
||||
|
||||
void EventBroker::_subscribe(std::string topic, int subscriber, events::EventHandler handler) {
|
||||
void EventBroker::subscribe(std::string topic, int subscriber, events::EventHandler handler) {
|
||||
std::unique_lock<std::mutex> lock(_handler_mutex);
|
||||
if (_handlers.find(topic) == _handlers.end()) {
|
||||
_handlers[topic] = std::map<int, events::EventHandler>();
|
||||
}
|
||||
|
|
@ -50,26 +46,11 @@ namespace piscan {
|
|||
}
|
||||
|
||||
void EventBroker::unsubscribe(std::string topic, int subscriber) {
|
||||
_unsubscribeQueue.enqueue(std::make_tuple(topic, subscriber));
|
||||
postWorkAvailable();
|
||||
}
|
||||
|
||||
void EventBroker::_unsubscribe(std::string topic, int subscriber) {
|
||||
std::unique_lock<std::mutex> lock(_handler_mutex);
|
||||
_handlers[topic].erase(subscriber);
|
||||
}
|
||||
|
||||
void EventBroker::main() {
|
||||
|
||||
std::tuple<std::string, int, events::EventHandler> subParams;
|
||||
if(_subscribeQueue.try_dequeue(subParams)) {
|
||||
_subscribe(std::get<0>(subParams), std::get<1>(subParams), std::move(std::get<2>(subParams)));
|
||||
}
|
||||
|
||||
std::tuple<std::string, int> unsubParams;
|
||||
if(_unsubscribeQueue.try_dequeue(unsubParams)) {
|
||||
_unsubscribe(std::get<0>(unsubParams), std::get<1>(unsubParams));
|
||||
}
|
||||
|
||||
events::EventPtr event;
|
||||
if(!_eventQueue.try_dequeue(event)) {
|
||||
workAvailable = false;
|
||||
|
|
@ -78,6 +59,7 @@ namespace piscan {
|
|||
|
||||
std::string& topic = event->topic;
|
||||
|
||||
std::unique_lock<std::mutex> lock(_handler_mutex);
|
||||
// Support for regex subscriptions may be added later
|
||||
auto found = _handlers.find(topic);
|
||||
if (found != _handlers.end()){
|
||||
|
|
|
|||
|
|
@ -31,10 +31,9 @@ private:
|
|||
static std::shared_ptr<EventBroker> _instance;
|
||||
|
||||
moodycamel::ConcurrentQueue<events::EventPtr> _eventQueue;
|
||||
moodycamel::ConcurrentQueue<std::tuple<std::string, int, events::EventHandler>> _subscribeQueue;
|
||||
moodycamel::ConcurrentQueue<std::tuple<std::string, int>> _unsubscribeQueue;
|
||||
|
||||
std::map<std::string, std::map<int, events::EventHandler>> _handlers;
|
||||
std::mutex _handler_mutex;
|
||||
|
||||
void _subscribe(std::string topic, int subscriber, events::EventHandler handler);
|
||||
void _unsubscribe(std::string topic, int subscriber);
|
||||
|
|
|
|||
|
|
@ -147,6 +147,8 @@ bool Demodulator::setFrequency(long long freq) {
|
|||
DLOG_F(9, "Frequency already set");
|
||||
return true;
|
||||
}*/
|
||||
if(freq == _currentFreq)
|
||||
return true;
|
||||
|
||||
_demodMgr.getCurrentModem()->setFrequency(freq);
|
||||
|
||||
|
|
@ -161,11 +163,12 @@ bool Demodulator::setFrequency(long long freq) {
|
|||
_demodMgr.getCurrentModem()->setFrequency(freq);
|
||||
//this is totally arbitrary
|
||||
//usleep(DEMOD_BUFFER_TIME);
|
||||
events::publish(std::make_shared<events::GenericNumberEvent>("demod_frequency_set", freq));
|
||||
std::this_thread::sleep_for(std::chrono::microseconds(app::system::getConfig().getDemodConfig().demodDelay));
|
||||
|
||||
_currentFreq = freq;
|
||||
|
||||
|
||||
|
||||
|
||||
return true;
|
||||
}
|
||||
|
|
@ -174,6 +177,7 @@ bool Demodulator::setTunerFrequency(long long freq){
|
|||
_cubic->setFrequency(freq);
|
||||
_demodMgr.getCurrentModem()->setFrequency(freq);
|
||||
//usleep(200000);
|
||||
events::publish(std::make_shared<events::GenericNumberEvent>("demod_frequency_set", freq));
|
||||
std::this_thread::sleep_for(std::chrono::microseconds(app::system::getConfig().getDemodConfig().retuneDelay));
|
||||
return true;
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue