Initial version of TG reflector support

This commit is contained in:
Tobias Blomberg 2019-07-26 13:06:15 +02:00
parent 0ea6db972e
commit 797dc982e0
12 changed files with 947 additions and 122 deletions

View File

@ -19,7 +19,7 @@ add_version_target(SVXREFLECTOR VERSION_DEPENDS)
# Build the executable
add_executable(svxreflector
svxreflector.cpp Reflector.cpp ReflectorClient.cpp
svxreflector.cpp Reflector.cpp ReflectorClient.cpp TGHandler.cpp
${VERSION_DEPENDS}
)
target_link_libraries(svxreflector ${LIBS})

View File

@ -0,0 +1,213 @@
/**
@file ProtoVer.h
@brief A couple of classes for doing protocol version manipulations
@author Tobias Blomberg / SM0SVX
@date 2019-07-25
\verbatim
SvxReflector - An audio reflector for connecting SvxLink Servers
Copyright (C) 2003-2019 Tobias Blomberg / SM0SVX
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
\endverbatim
*/
#ifndef PROTO_VER_INCLUDED
#define PROTO_VER_INCLUDED
/****************************************************************************
*
* System Includes
*
****************************************************************************/
#include <limits>
/****************************************************************************
*
* Project Includes
*
****************************************************************************/
/****************************************************************************
*
* Local Includes
*
****************************************************************************/
/****************************************************************************
*
* Forward declarations
*
****************************************************************************/
/****************************************************************************
*
* Namespace
*
****************************************************************************/
//namespace MyNameSpace
//{
/****************************************************************************
*
* Forward declarations of classes inside of the declared namespace
*
****************************************************************************/
/****************************************************************************
*
* Defines & typedefs
*
****************************************************************************/
/****************************************************************************
*
* Exported Global Variables
*
****************************************************************************/
/****************************************************************************
*
* Class definitions
*
****************************************************************************/
/**
@brief A class for doing protocol version manipulations
@author Tobias Blomberg / SM0SVX
@date 2019-07-25
A_detailed_class_description
*/
class ProtoVer
{
public:
static ProtoVer max(void)
{
return ProtoVer(
std::numeric_limits<uint16_t>::max(),
std::numeric_limits<uint16_t>::max());
}
/**
* @brief Default constructor
*/
ProtoVer(void) : m_major_ver(0), m_minor_ver(0) {}
ProtoVer(uint16_t major_ver, uint16_t minor_ver)
: m_major_ver(major_ver), m_minor_ver(minor_ver) {}
void set(uint16_t major_ver, uint16_t minor_ver)
{
m_major_ver = major_ver;
m_minor_ver = minor_ver;
}
uint16_t major(void) const { return m_major_ver; }
uint16_t minor(void) const { return m_minor_ver; }
bool isValid(void) const { return (m_major_ver > 0) || (m_minor_ver > 0); }
bool operator ==(const ProtoVer& rhs) const
{
return (m_major_ver == rhs.m_major_ver) &&
(m_minor_ver == rhs.m_minor_ver);
}
bool operator !=(const ProtoVer& rhs) const
{
return !(m_major_ver == rhs.m_major_ver);
}
bool operator <(const ProtoVer& rhs) const
{
return (m_major_ver < rhs.m_major_ver) ||
((m_major_ver == rhs.m_major_ver) &&
(m_minor_ver < rhs.m_minor_ver));
}
bool operator >(const ProtoVer& rhs) const
{
return (m_major_ver > rhs.m_major_ver) ||
((m_major_ver == rhs.m_major_ver) &&
(m_minor_ver > rhs.m_minor_ver));
}
bool operator <=(const ProtoVer& rhs) const
{
return (*this == rhs) || (*this < rhs);
}
bool operator >=(const ProtoVer& rhs) const
{
return (*this == rhs) || (*this > rhs);
}
/**
* @brief A_brief_member_function_description
* @param param1 Description_of_param1
* @return Return_value_of_this_member_function
*/
private:
uint16_t m_major_ver;
uint16_t m_minor_ver;
}; /* class ProtoVer */
/**
@brief A class for doing protocol version manipulations on ranges
@author Tobias Blomberg / SM0SVX
@date 2019-07-25
A_detailed_class_description
*/
class ProtoVerRange
{
public:
ProtoVerRange(void) {}
ProtoVerRange(const ProtoVer& min, const ProtoVer& max)
: m_min(min), m_max(max)
{
}
bool isValid(void) const { return m_min.isValid() && m_max.isValid(); }
bool isWithinRange(const ProtoVer& ver) const
{
return (ver >= m_min) && (ver <= m_max);
}
private:
ProtoVer m_min;
ProtoVer m_max;
}; /* class ProtoVerRange */
//} /* namespace */
#endif /* PROTO_VER_INCLUDED */
/*
* This file has not been truncated
*/

View File

@ -53,7 +53,7 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#include "Reflector.h"
#include "ReflectorClient.h"
#include "TGHandler.h"
/****************************************************************************
@ -118,13 +118,10 @@ void delete_client(ReflectorClient *client);
****************************************************************************/
Reflector::Reflector(void)
: m_srv(0), m_udp_sock(0), m_talker(0),
m_talker_timeout_timer(1000, Timer::TYPE_PERIODIC),
m_sql_timeout(0), m_sql_timeout_cnt(0), m_sql_timeout_blocktime(60)
: m_srv(0), m_udp_sock(0)
{
timerclear(&m_last_talker_timestamp);
m_talker_timeout_timer.expired.connect(
mem_fun(*this, &Reflector::checkTalkerTimeout));
TGHandler::instance()->talkerUpdated.connect(
mem_fun(*this, &Reflector::onTalkerUpdated));
} /* Reflector::Reflector */
@ -138,6 +135,8 @@ Reflector::~Reflector(void)
{
delete (*it).second;
}
delete TGHandler::instance();
} /* Reflector::~Reflector */
@ -186,9 +185,13 @@ bool Reflector::initialize(Async::Config &cfg)
m_udp_sock->dataReceived.connect(
mem_fun(*this, &Reflector::udpDatagramReceived));
cfg.getValue("GLOBAL", "SQL_TIMEOUT", m_sql_timeout);
cfg.getValue("GLOBAL", "SQL_TIMEOUT_BLOCKTIME", m_sql_timeout_blocktime);
m_sql_timeout_blocktime = max(m_sql_timeout_blocktime, 1U);
unsigned sql_timeout = 0;
cfg.getValue("GLOBAL", "SQL_TIMEOUT", sql_timeout);
TGHandler::instance()->setSqlTimeout(sql_timeout);
unsigned sql_timeout_blocktime = 60;
cfg.getValue("GLOBAL", "SQL_TIMEOUT_BLOCKTIME", sql_timeout_blocktime);
TGHandler::instance()->setSqlTimeoutBlocktime(sql_timeout_blocktime);
return true;
} /* Reflector::initialize */
@ -264,6 +267,8 @@ void Reflector::clientDisconnected(Async::FramedTcpConnection *con,
assert(it != m_client_con_map.end());
ReflectorClient *client = (*it).second;
TGHandler::instance()->removeClient(client);
if (!client->callsign().empty())
{
cout << client->callsign() << ": ";
@ -278,11 +283,6 @@ void Reflector::clientDisconnected(Async::FramedTcpConnection *con,
m_client_map.erase(client->clientId());
m_client_con_map.erase(it);
if (client == m_talker)
{
setTalker(0);
}
if (!client->callsign().empty())
{
broadcastMsgExcept(MsgNodeLeft(client->callsign()), client);
@ -362,32 +362,70 @@ void Reflector::udpDatagramReceived(const IpAddress& addr, uint16_t port,
if (!msg.unpack(ss))
{
cerr << "*** WARNING[" << client->callsign()
<< "]: Could not unpack incoming MsgUdpAudio message" << endl;
<< "]: Could not unpack incoming MsgUdpAudioV1 message" << endl;
return;
}
if (!msg.audioData().empty())
uint32_t tg = TGHandler::instance()->TGForClient(client);
if (!msg.audioData().empty() && (tg > 0))
{
if (m_talker == 0)
TGHandler::instance()->setTalkerForTG(tg, client);
ReflectorClient* talker = TGHandler::instance()->talkerForTG(tg);
if (talker == client)
{
setTalker(client);
cout << m_talker->callsign() << ": Talker start" << endl;
}
if (m_talker == client)
{
gettimeofday(&m_last_talker_timestamp, NULL);
broadcastUdpMsgExcept(client, msg);
broadcastUdpMsgExcept(tg, client, msg);
//broadcastUdpMsgExcept(tg, client, msg,
// ProtoVerRange(ProtoVer(0, 6),
// ProtoVer(1, ProtoVer::max().minor())));
//MsgUdpAudio msg_v2(msg);
//broadcastUdpMsgExcept(tg, client, msg_v2,
// ProtoVerRange(ProtoVer(2, 0), ProtoVer::max()));
}
}
}
break;
}
//case MsgUdpAudio::TYPE:
//{
// if (!client->isBlocked())
// {
// MsgUdpAudio msg;
// if (!msg.unpack(ss))
// {
// cerr << "*** WARNING[" << client->callsign()
// << "]: Could not unpack incoming MsgUdpAudio message" << endl;
// return;
// }
// if (!msg.audioData().empty())
// {
// if (m_talker == 0)
// {
// setTalker(client);
// cout << m_talker->callsign() << ": Talker start on TG #"
// << msg.tg() << endl;
// }
// if (m_talker == client)
// {
// gettimeofday(&m_last_talker_timestamp, NULL);
// broadcastUdpMsgExcept(tg, client, msg,
// ProtoVerRange(ProtoVer(2, 0), ProtoVer::max()));
// MsgUdpAudioV1 msg_v1(msg.audioData());
// broadcastUdpMsgExcept(tg, client, msg_v1,
// ProtoVerRange(ProtoVer(0, 6),
// ProtoVer(1, ProtoVer::max().minor())));
// }
// }
// }
// break;
//}
case MsgUdpFlushSamples::TYPE:
{
if (client == m_talker)
uint32_t tg = TGHandler::instance()->TGForClient(client);
ReflectorClient* talker = TGHandler::instance()->talkerForTG(tg);
if ((tg > 0) && (client == talker))
{
cout << m_talker->callsign() << ": Talker stop" << endl;
setTalker(0);
TGHandler::instance()->setTalkerForTG(tg, 0);
}
// To be 100% correct the reflector should wait for all connected
// clients to send a MsgUdpAllSamplesFlushed message but that will
@ -415,13 +453,15 @@ void Reflector::udpDatagramReceived(const IpAddress& addr, uint16_t port,
void Reflector::broadcastUdpMsgExcept(const ReflectorClient *except,
const ReflectorUdpMsg& msg)
const ReflectorUdpMsg& msg,
const ProtoVerRange& pv_range)
{
for (ReflectorClientMap::iterator it = m_client_map.begin();
it != m_client_map.end(); ++it)
{
ReflectorClient *client = (*it).second;
if ((client != except) &&
(!pv_range.isValid() || pv_range.isWithinRange(client->protoVer())) &&
(client->conState() == ReflectorClient::STATE_CONNECTED))
{
(*it).second->sendUdpMsg(msg);
@ -430,51 +470,39 @@ void Reflector::broadcastUdpMsgExcept(const ReflectorClient *except,
} /* Reflector::broadcastUdpMsgExcept */
void Reflector::checkTalkerTimeout(Async::Timer *t)
void Reflector::broadcastUdpMsgExcept(uint32_t tg,
const ReflectorClient *except,
const ReflectorUdpMsg& msg,
const ProtoVerRange& pv_range)
{
if (m_talker != 0)
const TGHandler::ClientSet& clients = TGHandler::instance()->clientsForTG(tg);
for (TGHandler::ClientSet::iterator it = clients.begin();
it != clients.end(); ++it)
{
struct timeval now, diff;
gettimeofday(&now, NULL);
timersub(&now, &m_last_talker_timestamp, &diff);
if (diff.tv_sec > TALKER_AUDIO_TIMEOUT)
ReflectorClient *client = *it;
if ((client != except) &&
(!pv_range.isValid() || pv_range.isWithinRange(client->protoVer())) &&
(client->conState() == ReflectorClient::STATE_CONNECTED))
{
cout << m_talker->callsign() << ": Talker audio timeout"
<< endl;
setTalker(0);
}
if ((m_sql_timeout_cnt > 0) && (--m_sql_timeout_cnt == 0))
{
cout << m_talker->callsign() << ": Talker squelch timeout"
<< endl;
m_talker->setBlock(m_sql_timeout_blocktime);
setTalker(0);
client->sendUdpMsg(msg);
}
}
} /* Reflector::checkTalkerTimeout */
} /* Reflector::broadcastUdpMsgExcept */
void Reflector::setTalker(ReflectorClient *client)
void Reflector::onTalkerUpdated(uint32_t tg, ReflectorClient* old_talker,
ReflectorClient *new_talker)
{
if (client == m_talker)
if (old_talker != 0)
{
return;
cout << old_talker->callsign() << ": Talker stop on TG #" << tg << endl;
broadcastMsgExcept(MsgTalkerStop(old_talker->callsign()));
broadcastUdpMsgExcept(tg, old_talker, MsgUdpFlushSamples());
}
if (client == 0)
if (new_talker != 0)
{
broadcastMsgExcept(MsgTalkerStop(m_talker->callsign()));
broadcastUdpMsgExcept(m_talker, MsgUdpFlushSamples());
m_sql_timeout_cnt = 0;
m_talker = 0;
}
else
{
assert(m_talker == 0);
m_sql_timeout_cnt = m_sql_timeout;
m_talker = client;
broadcastMsgExcept(MsgTalkerStart(m_talker->callsign()));
cout << new_talker->callsign() << ": Talker start on TG #" << tg << endl;
broadcastMsgExcept(MsgTalkerStart(new_talker->callsign()));
}
} /* Reflector::setTalker */

View File

@ -57,6 +57,7 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
****************************************************************************/
#include "ProtoVer.h"
/****************************************************************************
@ -165,8 +166,6 @@ class Reflector : public sigc::trackable
bool sendUdpDatagram(ReflectorClient *client, const void *buf, size_t count);
private:
static const time_t TALKER_AUDIO_TIMEOUT = 3; // Max three seconds gap
typedef std::map<uint32_t, ReflectorClient*> ReflectorClientMap;
typedef std::map<Async::FramedTcpConnection*,
ReflectorClient*> ReflectorClientConMap;
@ -175,13 +174,7 @@ class Reflector : public sigc::trackable
FramedTcpServer* m_srv;
Async::UdpSocket* m_udp_sock;
ReflectorClientMap m_client_map;
ReflectorClient* m_talker;
Async::Timer m_talker_timeout_timer;
struct timeval m_last_talker_timestamp;
ReflectorClientConMap m_client_con_map;
unsigned m_sql_timeout;
unsigned m_sql_timeout_cnt;
unsigned m_sql_timeout_blocktime;
Async::Config* m_cfg;
Reflector(const Reflector&);
@ -192,9 +185,14 @@ class Reflector : public sigc::trackable
void udpDatagramReceived(const Async::IpAddress& addr, uint16_t port,
void *buf, int count);
void broadcastUdpMsgExcept(const ReflectorClient *except,
const ReflectorUdpMsg& msg);
void checkTalkerTimeout(Async::Timer *t);
void setTalker(ReflectorClient *client);
const ReflectorUdpMsg& msg,
const ProtoVerRange& pv_range=ProtoVerRange());
void broadcastUdpMsgExcept(uint32_t tg,
const ReflectorClient *except,
const ReflectorUdpMsg& msg,
const ProtoVerRange& pv_range=ProtoVerRange());
void onTalkerUpdated(uint32_t tg, ReflectorClient* old_talker,
ReflectorClient *new_talker);
}; /* class Reflector */

View File

@ -6,7 +6,7 @@
\verbatim
SvxReflector - An audio reflector for connecting SvxLink Servers
Copyright (C) 2003-2017 Tobias Blomberg / SM0SVX
Copyright (C) 2003-2019 Tobias Blomberg / SM0SVX
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -57,7 +57,7 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#include "ReflectorClient.h"
#include "Reflector.h"
#include "TGHandler.h"
/****************************************************************************
@ -173,6 +173,7 @@ ReflectorClient::ReflectorClient(Reflector *ref, Async::FramedTcpConnection *con
ReflectorClient::~ReflectorClient(void)
{
TGHandler::instance()->removeClient(this);
} /* ReflectorClient::~ReflectorClient */
@ -295,6 +296,9 @@ void ReflectorClient::onFrameReceived(FramedTcpConnection *con,
case MsgAuthResponse::TYPE:
handleMsgAuthResponse(ss);
break;
case MsgSwitchTG::TYPE:
handleSwitchTG(ss);
break;
case MsgError::TYPE:
handleMsgError(ss);
break;
@ -325,10 +329,11 @@ void ReflectorClient::handleMsgProtoVer(std::istream& is)
sendError("Illegal MsgProtoVer protocol message received");
return;
}
m_client_proto_ver.major_ver = msg.majorVer();
m_client_proto_ver.minor_ver = msg.minorVer();
if (m_client_proto_ver < ProtoVer(MIN_MAJOR_VER, MIN_MINOR_VER) ||
m_client_proto_ver > ProtoVer(MsgProtoVer::MAJOR, MsgProtoVer::MINOR))
m_client_proto_ver.set(msg.majorVer(), msg.minorVer());
ProtoVerRange valid_proto_ver_range(
ProtoVer(MIN_MAJOR_VER, MIN_MINOR_VER),
ProtoVer(MsgProtoVer::MAJOR, MsgProtoVer::MINOR));
if (!valid_proto_ver_range.isWithinRange(m_client_proto_ver))
{
cout << "Client " << m_con->remoteHost() << ":" << m_con->remotePort()
<< " Incompatible protocol version: "
@ -381,8 +386,8 @@ void ReflectorClient::handleMsgAuthResponse(std::istream& is)
sendMsg(MsgAuthOk());
cout << m_callsign << ": Login OK from "
<< m_con->remoteHost() << ":" << m_con->remotePort()
<< " with protocol version " << m_client_proto_ver.major_ver
<< "." << m_client_proto_ver.minor_ver
<< " with protocol version " << m_client_proto_ver.major()
<< "." << m_client_proto_ver.minor()
<< endl;
m_con_state = STATE_CONNECTED;
MsgServerInfo msg_srv_info(m_client_id, m_supported_codecs);
@ -393,6 +398,10 @@ void ReflectorClient::handleMsgAuthResponse(std::istream& is)
MsgNodeList msg_node_list(msg_srv_info.nodes());
sendMsg(msg_node_list);
}
if (m_client_proto_ver < ProtoVer(2, 0))
{
TGHandler::instance()->switchTo(this, 1);
}
m_reflector->broadcastMsgExcept(MsgNodeJoined(m_callsign), this);
}
else
@ -411,6 +420,22 @@ void ReflectorClient::handleMsgAuthResponse(std::istream& is)
} /* ReflectorClient::handleMsgAuthResponse */
void ReflectorClient::handleSwitchTG(std::istream& is)
{
MsgSwitchTG msg;
if (!msg.unpack(is))
{
cout << "Client " << m_con->remoteHost() << ":" << m_con->remotePort()
<< " ERROR: Could not unpack MsgSwitchTG" << endl;
sendError("Illegal MsgSwitchTG protocol message received");
return;
}
cout << m_callsign << ": Switch to TG #" << msg.tg() << endl;
m_current_tg = msg.tg();
TGHandler::instance()->switchTo(this, msg.tg());
} /* ReflectorClient::handleSwitchTG */
void ReflectorClient::handleMsgError(std::istream& is)
{
MsgError msg;

View File

@ -55,6 +55,7 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
****************************************************************************/
#include "ReflectorMsg.h"
#include "ProtoVer.h"
/****************************************************************************
@ -234,43 +235,13 @@ class ReflectorClient
*/
ConState conState(void) const { return m_con_state; }
/**
* @brief Get the protocol version of the client
* @return Returns the protocol version of the client
*/
const ProtoVer& protoVer(void) const { return m_client_proto_ver; }
private:
struct ProtoVer
{
uint16_t major_ver;
uint16_t minor_ver;
ProtoVer(void) : major_ver(0), minor_ver(0) {}
ProtoVer(uint16_t major_ver, uint16_t minor_ver)
: major_ver(major_ver), minor_ver(minor_ver) {}
bool operator ==(const ProtoVer& rhs) const
{
return (major_ver == rhs.major_ver) && (minor_ver == rhs.minor_ver);
}
bool operator !=(const ProtoVer& rhs) const
{
return !(major_ver == rhs.major_ver);
}
bool operator <(const ProtoVer& rhs) const
{
return (major_ver < rhs.major_ver) ||
((major_ver == rhs.major_ver) && (minor_ver < rhs.minor_ver));
}
bool operator >(const ProtoVer& rhs) const
{
return (major_ver > rhs.major_ver) ||
((major_ver == rhs.major_ver) && (minor_ver > rhs.minor_ver));
}
bool operator <=(const ProtoVer& rhs) const
{
return (*this == rhs) || (*this < rhs);
}
bool operator >=(const ProtoVer& rhs) const
{
return (*this == rhs) || (*this > rhs);
}
};
static const uint16_t MIN_MAJOR_VER = 0;
static const uint16_t MIN_MINOR_VER = 6;
static uint32_t next_client_id;
@ -301,6 +272,7 @@ class ReflectorClient
unsigned m_remaining_blocktime;
ProtoVer m_client_proto_ver;
std::vector<std::string> m_supported_codecs;
uint32_t m_current_tg;
ReflectorClient(const ReflectorClient&);
ReflectorClient& operator=(const ReflectorClient&);
@ -308,6 +280,7 @@ class ReflectorClient
std::vector<uint8_t>& data);
void handleMsgProtoVer(std::istream& is);
void handleMsgAuthResponse(std::istream& is);
void handleSwitchTG(std::istream& is);
void handleMsgError(std::istream& is);
void sendError(const std::string& msg);
void onDiscTimeout(Async::Timer *t);

View File

@ -236,7 +236,7 @@ protocol version that the server does not support, the client is denied access.
class MsgProtoVer : public ReflectorMsgBase<5>
{
public:
static const uint16_t MAJOR = 1;
static const uint16_t MAJOR = 2;
static const uint16_t MINOR = 0;
MsgProtoVer(void) : m_major(MAJOR), m_minor(MINOR) {}
uint16_t majorVer(void) const { return m_major; }
@ -572,6 +572,27 @@ class MsgTalkerStop : public ReflectorMsgBase<105>
}; /* MsgTalkerStop */
/**
@brief Choose talk group TCP network message
@author Tobias Blomberg / SM0SVX
@date 2019-07-25
This message is sent by the client to choose which talk group to use for
communication.
*/
class MsgSwitchTG : public ReflectorMsgBase<106>
{
public:
MsgSwitchTG(void) : m_tg(0) {}
MsgSwitchTG(uint32_t tg) : m_tg(tg) {}
uint32_t tg(void) const { return m_tg; }
ASYNC_MSG_MEMBERS(m_tg);
private:
uint32_t m_tg;
}; /* MsgSwitchTG */
@ -594,7 +615,7 @@ class MsgUdpHeartbeat : public ReflectorUdpMsgBase<1>
/**
@brief Audio UDP network message
@brief Audio UDP network message V1
@author Tobias Blomberg / SM0SVX
@date 2017-02-12
@ -604,6 +625,8 @@ class MsgUdpAudio : public ReflectorUdpMsgBase<101>
{
public:
MsgUdpAudio(void) {}
MsgUdpAudio(const std::vector<uint8_t>& audio_data)
: m_audio_data(audio_data) {}
MsgUdpAudio(const void *buf, int count)
{
if (count > 0)
@ -613,6 +636,7 @@ class MsgUdpAudio : public ReflectorUdpMsgBase<101>
}
}
std::vector<uint8_t>& audioData(void) { return m_audio_data; }
const std::vector<uint8_t>& audioData(void) const { return m_audio_data; }
ASYNC_MSG_MEMBERS(m_audio_data)
@ -650,6 +674,39 @@ class MsgUdpAllSamplesFlushed : public ReflectorUdpMsgBase<103>
}; /* MsgUdpAllSamplesFlushed */
/**
@brief Audio UDP network message V2
@author Tobias Blomberg / SM0SVX
@date 2019-07-25
This is the message used to transmit audio to the other side.
*/
//class MsgUdpAudio : public ReflectorUdpMsgBase<104>
//{
// public:
// MsgUdpAudio(void) : m_tg(0) {}
// MsgUdpAudio(const MsgUdpAudioV1& msg_v1)
// : m_tg(0), m_audio_data(msg_v1.audioData()) {}
// MsgUdpAudio(uint32_t tg, const void *buf, int count)
// : m_tg(tg)
// {
// if (count > 0)
// {
// const uint8_t *bbuf = reinterpret_cast<const uint8_t*>(buf);
// m_audio_data.assign(bbuf, bbuf+count);
// }
// }
// uint32_t tg(void) { return m_tg; }
// std::vector<uint8_t>& audioData(void) { return m_audio_data; }
//
// ASYNC_MSG_MEMBERS(m_tg, m_audio_data)
//
// private:
// uint32_t m_tg;
// std::vector<uint8_t> m_audio_data;
//}; /* MsgUdpAudio */
#endif /* REFLECTOR_MSG_INCLUDED */

View File

@ -0,0 +1,325 @@
/**
@file TGHandler.cpp
@brief A class for handling talk groups
@author Tobias Blomberg / SM0SVX
@date 2019-07-26
\verbatim
SvxReflector - An audio reflector for connecting SvxLink Servers
Copyright (C) 2003-2019 Tobias Blomberg / SM0SVX
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
\endverbatim
*/
/****************************************************************************
*
* System Includes
*
****************************************************************************/
#include <iostream>
#include <cassert>
#include <algorithm>
/****************************************************************************
*
* Project Includes
*
****************************************************************************/
/****************************************************************************
*
* Local Includes
*
****************************************************************************/
#include "TGHandler.h"
/****************************************************************************
*
* Namespaces to use
*
****************************************************************************/
using namespace std;
/****************************************************************************
*
* Defines & typedefs
*
****************************************************************************/
/****************************************************************************
*
* Local class definitions
*
****************************************************************************/
/****************************************************************************
*
* Prototypes
*
****************************************************************************/
/****************************************************************************
*
* Exported Global Variables
*
****************************************************************************/
/****************************************************************************
*
* Local Global Variables
*
****************************************************************************/
/****************************************************************************
*
* Public member functions
*
****************************************************************************/
TGHandler::TGHandler(void)
: m_timeout_timer(1000, Async::Timer::TYPE_PERIODIC),
m_sql_timeout(0), m_sql_timeout_blocktime(60)
{
m_timeout_timer.expired.connect(
mem_fun(*this, &TGHandler::checkTalkerTimeout));
} /* TGHandler::TGHandler */
TGHandler::~TGHandler(void)
{
for (IdMap::iterator it = m_id_map.begin(); it != m_id_map.end(); ++it)
{
TGInfo* tg_info = it->second;
delete tg_info;
}
} /* TGHandler::~TGHandler */
void TGHandler::setSqlTimeoutBlocktime(unsigned sql_timeout_blocktime)
{
m_sql_timeout_blocktime = std::max(sql_timeout_blocktime, 1U);
} /* TGHandler::setSqlTimeoutBlocktime */
void TGHandler::switchTo(ReflectorClient *client, uint32_t tg)
{
TGInfo *tg_info = 0;
ClientMap::iterator client_map_it = m_client_map.find(client);
if (client_map_it != m_client_map.end())
{
tg_info = client_map_it->second;
assert(tg_info != 0);
if (tg_info->id == tg)
{
return;
}
removeClientP(tg_info, client);
}
IdMap::iterator id_map_it = m_id_map.find(tg);
if (id_map_it != m_id_map.end())
{
tg_info = id_map_it->second;
}
else
{
tg_info = new TGInfo(tg);
m_id_map[tg] = tg_info;
}
tg_info->clients.insert(client);
m_client_map[client] = tg_info;
printTGStatus();
} /* TGHandler::switchTo */
void TGHandler::removeClient(ReflectorClient* client)
{
ClientMap::iterator client_map_it = m_client_map.find(client);
if (client_map_it != m_client_map.end())
{
TGInfo* tg_info = client_map_it->second;
if (tg_info->talker == client)
{
setTalkerForTG(tg_info->id, 0);
}
removeClientP(tg_info, client);
printTGStatus();
}
} /* TGHandler::removeClient */
const TGHandler::ClientSet& TGHandler::clientsForTG(uint32_t tg) const
{
static TGHandler::ClientSet empty_set;
IdMap::const_iterator id_map_it = m_id_map.find(tg);
if (id_map_it == m_id_map.end())
{
return empty_set;
}
return id_map_it->second->clients;
} /* TGHandler::clientsForTG */
void TGHandler::setTalkerForTG(uint32_t tg, ReflectorClient* new_talker)
{
IdMap::const_iterator id_map_it = m_id_map.find(tg);
if (id_map_it == m_id_map.end())
{
return;
}
TGInfo* tg_info = id_map_it->second;
ReflectorClient* old_talker = tg_info->talker;
if (new_talker == old_talker)
{
gettimeofday(&tg_info->last_talker_timestamp, NULL);
return;
}
tg_info->sql_timeout_cnt = (new_talker != 0) ? m_sql_timeout : 0;
id_map_it->second->talker = new_talker;
talkerUpdated(tg, old_talker, new_talker);
printTGStatus();
} /* TGHandler::setTalkerForTG */
ReflectorClient* TGHandler::talkerForTG(uint32_t tg) const
{
IdMap::const_iterator id_map_it = m_id_map.find(tg);
if (id_map_it == m_id_map.end())
{
return 0;
}
return id_map_it->second->talker;
} /* TGHandler::talkerForTG */
uint32_t TGHandler::TGForClient(ReflectorClient* client)
{
ClientMap::iterator client_map_it = m_client_map.find(client);
if (client_map_it == m_client_map.end())
{
return 0;
}
return client_map_it->second->id;
} /* TGHandler::TGForClient */
/****************************************************************************
*
* Protected member functions
*
****************************************************************************/
/****************************************************************************
*
* Private member functions
*
****************************************************************************/
void TGHandler::checkTalkerTimeout(Async::Timer *t)
{
for (IdMap::iterator it = m_id_map.begin(); it != m_id_map.end(); ++it)
{
TGInfo *tg_info = it->second;
assert(tg_info != 0);
if (tg_info->talker != 0)
{
struct timeval now, diff;
gettimeofday(&now, NULL);
timersub(&now, &tg_info->last_talker_timestamp, &diff);
if (diff.tv_sec > TALKER_AUDIO_TIMEOUT)
{
cout << tg_info->talker->callsign() << ": Talker audio timeout on TG #"
<< tg_info->id << endl;
setTalkerForTG(tg_info->id, 0);
}
if ((tg_info->sql_timeout_cnt > 0) && (--tg_info->sql_timeout_cnt == 0))
{
cout << tg_info->talker->callsign() << ": Talker audio timeout on TG #"
<< tg_info->id << endl;
tg_info->talker->setBlock(m_sql_timeout_blocktime);
setTalkerForTG(tg_info->id, 0);
}
}
}
} /* TGHandler::checkTalkerTimeout */
void TGHandler::removeClientP(TGInfo *tg_info, ReflectorClient* client)
{
assert(tg_info != 0);
assert(client != 0);
if (client == tg_info->talker)
{
tg_info->talker = 0;
}
tg_info->clients.erase(client);
m_client_map.erase(client);
if (tg_info->clients.empty())
{
m_id_map.erase(tg_info->id);
delete tg_info;
}
} /* TGHandler::removeClientP */
void TGHandler::printTGStatus(void)
{
std::cout << "### ----------- BEGIN ----------------" << std::endl;
for (IdMap::const_iterator it = m_id_map.begin();
it != m_id_map.end(); ++it)
{
TGInfo *tg_info = it->second;
std::cout << "### " << tg_info->id << ": ";
for (ClientSet::const_iterator it = tg_info->clients.begin();
it != tg_info->clients.end(); ++it)
{
ReflectorClient* client = *it;
if (client == tg_info->talker)
{
std::cout << "*";
}
std::cout << client->callsign() << " ";
}
std::cout << std::endl;
}
std::cout << "### ------------ END -----------------" << std::endl;
} /* TGHandler::printTGStatus */
/*
* This file has not been truncated
*/

View File

@ -0,0 +1,201 @@
/**
@file TGHandler.h
@brief A class for handling talk groups
@author Tobias Blomberg / SM0SVX
@date 2019-07-25
\verbatim
SvxReflector - An audio reflector for connecting SvxLink Servers
Copyright (C) 2003-2019 Tobias Blomberg / SM0SVX
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
\endverbatim
*/
#ifndef TG_HANDLER_INCLUDED
#define TG_HANDLER_INCLUDED
/****************************************************************************
*
* System Includes
*
****************************************************************************/
#include <map>
#include <set>
#include <sigc++/sigc++.h>
#include <sys/time.h>
/****************************************************************************
*
* Project Includes
*
****************************************************************************/
/****************************************************************************
*
* Local Includes
*
****************************************************************************/
#include "ReflectorClient.h"
/****************************************************************************
*
* Forward declarations
*
****************************************************************************/
/****************************************************************************
*
* Namespace
*
****************************************************************************/
//namespace MyNameSpace
//{
/****************************************************************************
*
* Forward declarations of classes inside of the declared namespace
*
****************************************************************************/
/****************************************************************************
*
* Defines & typedefs
*
****************************************************************************/
/****************************************************************************
*
* Exported Global Variables
*
****************************************************************************/
/****************************************************************************
*
* Class definitions
*
****************************************************************************/
/**
@brief Handle talk groups
@author Tobias Blomberg / SM0SVX
@date 2019-07-25
This class is responsible for keeping track of all talk groups that are used in
the system.
*/
class TGHandler : public sigc::trackable
{
public:
typedef std::set<ReflectorClient*> ClientSet;
static TGHandler* instance(void)
{
static TGHandler *tg_handler = new TGHandler;
return tg_handler;
}
/**
* @brief Default constructor
*/
TGHandler(void);
/**
* @brief Destructor
*/
~TGHandler(void);
/**
* @brief A_brief_member_function_description
* @param param1 Description_of_param1
* @return Return_value_of_this_member_function
*/
void setSqlTimeout(unsigned sql_timeout) { m_sql_timeout = sql_timeout; }
void setSqlTimeoutBlocktime(unsigned sql_timeout_blocktime);
void switchTo(ReflectorClient *client, uint32_t tg);
void removeClient(ReflectorClient* client);
const ClientSet& clientsForTG(uint32_t tg) const;
void setTalkerForTG(uint32_t tg, ReflectorClient* client);
ReflectorClient* talkerForTG(uint32_t tg) const;
uint32_t TGForClient(ReflectorClient* client);
sigc::signal<void, uint32_t,
ReflectorClient*, ReflectorClient*> talkerUpdated;
private:
static const time_t TALKER_AUDIO_TIMEOUT = 3; // Max three seconds gap
struct TGInfo
{
uint32_t id;
ClientSet clients;
ReflectorClient* talker;
struct timeval last_talker_timestamp;
unsigned sql_timeout_cnt;
TGInfo(uint32_t tg)
: id(tg), talker(0), sql_timeout_cnt(0)
{
timerclear(&last_talker_timestamp);
}
};
typedef std::map<uint32_t, TGInfo*> IdMap;
typedef std::map<const ReflectorClient*, TGInfo*> ClientMap;
IdMap m_id_map;
ClientMap m_client_map;
Async::Timer m_timeout_timer;
unsigned m_sql_timeout;
unsigned m_sql_timeout_blocktime;
TGHandler(const TGHandler&);
TGHandler& operator=(const TGHandler&);
void checkTalkerTimeout(Async::Timer *t);
void removeClientP(TGInfo *tg_info, ReflectorClient* client);
void printTGStatus(void);
}; /* class TGHandler */
//} /* namespace */
#endif /* TG_HANDLER_INCLUDED */
/*
* This file has not been truncated
*/

View File

@ -125,7 +125,7 @@ ReflectorLogic::ReflectorLogic(Async::Config& cfg, const std::string& name)
m_flush_timeout_timer(3000, Timer::TYPE_ONESHOT, false),
m_udp_heartbeat_tx_cnt(0), m_udp_heartbeat_rx_cnt(0),
m_tcp_heartbeat_tx_cnt(0), m_tcp_heartbeat_rx_cnt(0),
m_con_state(STATE_DISCONNECTED), m_enc(0)
m_con_state(STATE_DISCONNECTED), m_enc(0), m_default_tg(0)
{
m_reconnect_timer.expired.connect(
sigc::hide(mem_fun(*this, &ReflectorLogic::reconnect)));
@ -224,6 +224,8 @@ bool ReflectorLogic::initialize(void)
}
m_logic_con_out = prev_src;
cfg().getValue(name(), "DEFAULT_TG", m_default_tg);
if (!LogicBase::initialize())
{
return false;
@ -491,6 +493,7 @@ void ReflectorLogic::handleMsgServerInfo(std::istream& is)
m_con_state = STATE_CONNECTED;
sendMsg(MsgSwitchTG(m_default_tg));
sendUdpMsg(MsgUdpHeartbeat());
} /* ReflectorLogic::handleMsgAuthChallenge */

View File

@ -184,6 +184,7 @@ class ReflectorLogic : public LogicBase
struct timeval m_last_talker_timestamp;
ConState m_con_state;
Async::AudioEncoder* m_enc;
uint32_t m_default_tg;
ReflectorLogic(const ReflectorLogic&);
ReflectorLogic& operator=(const ReflectorLogic&);

View File

@ -87,6 +87,7 @@ HOST=reflector.example.com
CALLSIGN="MYCALL"
AUTH_KEY="Change this key now!"
#JITTER_BUFFER_DELAY=0
DEFAULT_TG=24099
[LinkToR4]
CONNECT_LOGICS=RepeaterLogic:94:SK3AB,SimplexLogic:92:SK3CD