18 #ifndef GZ_TRANSPORT_DISCOVERY_HH_
19 #define GZ_TRANSPORT_DISCOVERY_HH_
33 #include <sys/types.h>
35 #include <sys/socket.h>
39 #include <arpa/inet.h>
43 #include <netinet/in.h>
49 #pragma warning(push, 0)
54 #pragma warning(disable: 4503)
56 #pragma warning(disable: 4996)
59 #include <gz/msgs/discovery.pb.h>
62 #include <condition_variable>
71 #include <gz/msgs/Utility.hh>
73 #include "gz/transport/config.hh"
74 #include "gz/transport/Export.hh"
86 inline namespace GZ_TRANSPORT_VERSION_NAMESPACE {
116 template<
typename Pub>
128 const bool _verbose =
false)
129 : multicastGroup(_ip),
133 silenceInterval(kDefSilenceInterval),
134 activityInterval(kDefActivityInterval),
135 heartbeatInterval(kDefHeartbeatInterval),
136 connectionCb(nullptr),
137 disconnectionCb(nullptr),
140 numHeartbeatsUninitialized(0),
145 if (
env(
"GZ_IP", gzIp) && !gzIp.
empty())
147 this->hostInterfaces = {gzIp};
156 WORD wVersionRequested;
160 wVersionRequested = MAKEWORD(2, 2);
162 if (WSAStartup(wVersionRequested, &wsaData) != 0)
168 for (
const auto &netIface : this->hostInterfaces)
170 auto succeed = this->RegisterNetIface(netIface);
175 if (netIface == this->hostAddr && !succeed)
177 this->RegisterNetIface(
"127.0.0.1");
178 std::cerr <<
"Did you set the environment variable GZ_IP with a "
180 <<
" [" << netIface <<
"] seems an invalid local IP "
182 <<
" Using 127.0.0.1 as hostname." <<
std::endl;
183 this->hostAddr =
"127.0.0.1";
192 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
193 reinterpret_cast<const char *
>(&reuseAddr),
sizeof(reuseAddr)) != 0)
195 std::cerr <<
"Error setting socket option (SO_REUSEADDR)."
207 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
208 reinterpret_cast<const char *
>(&reusePort),
sizeof(reusePort)) != 0)
210 std::cerr <<
"Error setting socket option (SO_REUSEPORT)."
216 sockaddr_in localAddr;
217 memset(&localAddr, 0,
sizeof(localAddr));
218 localAddr.sin_family = AF_INET;
219 localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
220 localAddr.sin_port = htons(
static_cast<u_short
>(this->port));
222 if (bind(this->sockets.at(0),
223 reinterpret_cast<sockaddr *
>(&localAddr),
sizeof(sockaddr_in)) < 0)
230 memset(&this->mcastAddr, 0,
sizeof(this->mcastAddr));
231 this->mcastAddr.sin_family = AF_INET;
232 this->mcastAddr.sin_addr.s_addr =
233 inet_addr(this->multicastGroup.c_str());
234 this->mcastAddr.sin_port = htons(
static_cast<u_short
>(this->port));
238 if (
env(
"GZ_RELAY", gzRelay) && !gzRelay.
empty())
244 for (
auto const &relayAddr : relays)
245 this->AddRelayAddress(relayAddr);
248 this->PrintCurrentState();
255 this->exitMutex.lock();
257 this->exitMutex.unlock();
260 if (this->threadReception.joinable())
261 this->threadReception.join();
269 for (
const auto &sock : this->sockets)
292 this->enabled =
true;
296 this->timeNextHeartbeat = now;
297 this->timeNextActivity = now;
300 this->threadReception =
std::thread(&Discovery::RecvMessages,
this);
318 if (!this->info.AddPublisher(_publisher))
321 cb = this->connectionCb;
358 cb = this->connectionCb;
362 pub.SetTopic(_topic);
363 pub.SetPUuid(this->pUuid);
370 found = this->info.Publishers(_topic, addresses);
376 for (
const auto &proc : addresses)
378 for (
const auto &node : proc.second)
434 return this->info.Publishers(_topic, _publishers);
445 return this->remoteSubscribers.Publishers(_topic, _subscribers);
466 if (!this->info.Publisher(_topic, this->pUuid, _nUuid, inf))
470 this->info.DelPublisherByNode(_topic, this->pUuid, _nUuid);
478 msgs::Discovery::UNADVERTISE, inf);
489 return this->hostAddr;
499 return this->activityInterval;
510 return this->heartbeatInterval;
520 return this->silenceInterval;
529 this->activityInterval = _ms;
538 this->heartbeatInterval = _ms;
547 this->silenceInterval = _ms;
557 this->connectionCb = _cb;
567 this->disconnectionCb = _cb;
576 this->registrationCb = _cb;
585 this->unregistrationCb = _cb;
594 this->subscribersCb = _cb;
608 std::cout <<
"\tActivity: " << this->activityInterval
610 std::cout <<
"\tHeartbeat: " << this->heartbeatInterval
612 std::cout <<
"\tSilence: " << this->silenceInterval
621 if (this->activity.empty())
625 for (
auto &proc : this->activity)
646 this->remoteSubscribers.Clear();
656 this->info.TopicList(_topics);
659 this->remoteSubscribers.TopicList(remoteSubs);
662 for (
auto const &t : remoteSubs)
677 if (!this->initialized)
679 this->initializedCv.wait(lk, [
this]{
return this->initialized;});
686 private:
void UpdateActivity()
699 if (now < this->timeNextActivity)
702 disconnectCb = this->disconnectionCb;
704 for (
auto it = this->activity.cbegin(); it != this->activity.cend();)
707 auto elapsed = now - it->second;
710 if (std::chrono::duration_cast<std::chrono::milliseconds>
711 (elapsed).count() > this->silenceInterval)
714 this->info.DelPublishersByProc(it->first);
719 this->activity.
erase(it++);
735 for (
auto const &uuid : uuids)
738 publisher.SetPUuid(uuid);
739 disconnectCb(publisher);
744 private:
void UpdateHeartbeat()
751 if (now < this->timeNextHeartbeat)
755 Publisher pub(
"",
"", this->pUuid,
"", AdvertiseOptions());
763 this->info.PublishersByProc(this->pUuid, nodes);
766 for (
const auto &topic : nodes)
768 for (
const auto &node : topic.second)
771 msgs::Discovery::ADVERTISE, node);
777 if (!this->initialized)
779 if (this->numHeartbeatsUninitialized == 2u)
782 this->initialized =
true;
785 this->initializedCv.notify_all();
787 ++this->numHeartbeatsUninitialized;
804 private:
int NextTimeout()
const
807 auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
808 auto timeUntilNextActivity = this->timeNextActivity - now;
810 int t =
static_cast<int>(
811 std::chrono::duration_cast<std::chrono::milliseconds>
812 (
std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
813 int t2 =
std::min(t, this->kTimeout);
818 private:
void RecvMessages()
820 bool timeToExit =
false;
824 int timeout = this->NextTimeout();
828 this->RecvDiscoveryUpdate();
831 this->PrintCurrentState();
834 this->UpdateHeartbeat();
835 this->UpdateActivity();
847 private:
void RecvDiscoveryUpdate()
849 char rcvStr[Discovery::kMaxRcvStr];
850 sockaddr_in clntAddr;
851 socklen_t addrLen =
sizeof(clntAddr);
853 int64_t received = recvfrom(this->sockets.at(0),
854 reinterpret_cast<raw_type *
>(rcvStr),
856 reinterpret_cast<sockaddr *
>(&clntAddr),
857 reinterpret_cast<socklen_t *
>(&addrLen));
861 memcpy(&len, &rcvStr[0],
sizeof(len));
885 if (len +
sizeof(len) ==
static_cast<uint16_t
>(received))
887 std::string srcAddr = inet_ntoa(clntAddr.sin_addr);
888 uint16_t srcPort = ntohs(clntAddr.sin_port);
892 std::cout <<
"\nReceived discovery update from "
893 << srcAddr <<
": " << srcPort <<
std::endl;
896 this->DispatchDiscoveryMsg(srcAddr, rcvStr +
sizeof(len), len);
899 else if (received < 0)
901 std::cerr <<
"Discovery::RecvDiscoveryUpdate() recvfrom error"
910 private:
void DispatchDiscoveryMsg(
const std::string &_fromIp,
911 char *_msg, uint16_t _len)
913 gz::msgs::Discovery msg;
918 if (!msg.ParseFromArray(_msg, _len))
922 if (this->Version() != msg.version())
928 if (recvPUuid == this->pUuid)
940 if (msg.has_flags() && msg.flags().relay())
943 msg.mutable_flags()->set_relay(
false);
944 msg.mutable_flags()->set_no_relay(
true);
945 this->SendMulticast(msg);
949 this->AddRelayAddress(_fromIp);
957 else if (!msg.has_flags() || !msg.flags().no_relay())
959 msg.mutable_flags()->set_relay(
true);
960 this->SendUnicast(msg);
963 bool isSenderLocal = (
std::find(this->hostInterfaces.begin(),
964 this->hostInterfaces.end(), _fromIp) != this->hostInterfaces.end()) ||
965 (_fromIp.
find(
"127.") == 0);
968 DiscoveryCallback<Pub> connectCb;
969 DiscoveryCallback<Pub> disconnectCb;
970 DiscoveryCallback<Pub> registerCb;
971 DiscoveryCallback<Pub> unregisterCb;
976 connectCb = this->connectionCb;
977 disconnectCb = this->disconnectionCb;
978 registerCb = this->registrationCb;
979 unregisterCb = this->unregistrationCb;
980 subscribersReqCb = this->subscribersCb;
985 case msgs::Discovery::ADVERTISE:
989 publisher.SetFromDiscovery(msg);
1003 added = this->info.AddPublisher(publisher);
1006 if (added && connectCb)
1009 connectCb(publisher);
1014 case msgs::Discovery::SUBSCRIBE:
1020 recvTopic = msg.sub().topic();
1024 std::cerr <<
"Subscription discovery message is missing "
1025 <<
"Subscriber information.\n";
1030 Addresses_M<Pub> addresses;
1033 if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
1038 if (!this->info.Publishers(recvTopic, addresses))
1042 for (
const auto &nodeInfo : addresses[this->pUuid])
1054 msgs::Discovery::ADVERTISE, nodeInfo);
1059 case msgs::Discovery::SUBSCRIBERS_REQ:
1061 if (subscribersReqCb)
1066 case msgs::Discovery::SUBSCRIBERS_REP:
1070 publisher.SetFromDiscovery(msg);
1074 this->remoteSubscribers.AddPublisher(publisher);
1078 case msgs::Discovery::NEW_CONNECTION:
1082 publisher.SetFromDiscovery(msg);
1085 registerCb(publisher);
1089 case msgs::Discovery::END_CONNECTION:
1093 publisher.SetFromDiscovery(msg);
1096 unregisterCb(publisher);
1100 case msgs::Discovery::HEARTBEAT:
1105 case msgs::Discovery::BYE:
1110 this->activity.erase(recvPUuid);
1116 pub.SetPUuid(recvPUuid);
1124 this->info.DelPublishersByProc(recvPUuid);
1129 case msgs::Discovery::UNADVERTISE:
1133 publisher.SetFromDiscovery(msg);
1146 disconnectCb(publisher);
1152 this->info.DelPublisherByNode(publisher.Topic(),
1153 publisher.PUuid(), publisher.NUuid());
1160 std::cerr <<
"Unknown message type [" << msg.type() <<
"].\n";
1172 private:
template<
typename T>
1174 const msgs::Discovery::Type _type,
1175 const T &_pub)
const
1177 gz::msgs::Discovery discoveryMsg;
1178 discoveryMsg.set_version(this->Version());
1179 discoveryMsg.set_type(_type);
1180 discoveryMsg.set_process_uuid(this->pUuid);
1181 _pub.FillDiscovery(discoveryMsg);
1185 case msgs::Discovery::ADVERTISE:
1186 case msgs::Discovery::UNADVERTISE:
1187 case msgs::Discovery::NEW_CONNECTION:
1188 case msgs::Discovery::END_CONNECTION:
1190 _pub.FillDiscovery(discoveryMsg);
1193 case msgs::Discovery::SUBSCRIBE:
1195 discoveryMsg.mutable_sub()->set_topic(_pub.Topic());
1198 case msgs::Discovery::HEARTBEAT:
1199 case msgs::Discovery::BYE:
1200 case msgs::Discovery::SUBSCRIBERS_REQ:
1201 case msgs::Discovery::SUBSCRIBERS_REP:
1204 std::cerr <<
"Discovery::SendMsg() error: Unrecognized message"
1205 <<
" type [" << _type <<
"]" <<
std::endl;
1212 this->SendMulticast(discoveryMsg);
1220 discoveryMsg.mutable_flags()->set_relay(
true);
1221 this->SendUnicast(discoveryMsg);
1226 std::cout <<
"\t* Sending " << msgs::ToString(_type)
1227 <<
" msg [" << _pub.Topic() <<
"]" <<
std::endl;
1233 private:
void SendUnicast(
const msgs::Discovery &_msg)
const
1237 #if GOOGLE_PROTOBUF_VERSION >= 3004000
1238 size_t msgSizeFull = _msg.ByteSizeLong();
1240 int msgSizeFull = _msg.ByteSize();
1242 if (msgSizeFull +
sizeof(msgSize) > this->kMaxRcvStr)
1244 std::cerr <<
"Discovery message too large to send. Discovery won't "
1245 <<
"work. This shouldn't happen.\n";
1248 msgSize = msgSizeFull;
1250 uint16_t totalSize =
sizeof(msgSize) + msgSize;
1251 char *buffer =
static_cast<char *
>(
new char[totalSize]);
1252 memcpy(&buffer[0], &msgSize,
sizeof(msgSize));
1254 if (_msg.SerializeToArray(buffer +
sizeof(msgSize), msgSize))
1257 for (
const auto &sockAddr : this->relayAddrs)
1260 auto sent = sendto(this->sockets.at(0),
1261 reinterpret_cast<const raw_type *
>(
1262 reinterpret_cast<const unsigned char*
>(buffer)),
1264 reinterpret_cast<const sockaddr *
>(&sockAddr),
1267 if (sent != totalSize)
1278 std::cerr <<
"Discovery::SendUnicast: Error serializing data."
1287 private:
void SendMulticast(
const msgs::Discovery &_msg)
const
1291 #if GOOGLE_PROTOBUF_VERSION >= 3004000
1292 size_t msgSizeFull = _msg.ByteSizeLong();
1294 int msgSizeFull = _msg.ByteSize();
1296 if (msgSizeFull +
sizeof(msgSize) > this->kMaxRcvStr)
1298 std::cerr <<
"Discovery message too large to send. Discovery won't "
1299 <<
"work. This shouldn't happen.\n";
1303 msgSize = msgSizeFull;
1304 uint16_t totalSize =
sizeof(msgSize) + msgSize;
1305 char *buffer =
static_cast<char *
>(
new char[totalSize]);
1306 memcpy(&buffer[0], &msgSize,
sizeof(msgSize));
1308 if (_msg.SerializeToArray(buffer +
sizeof(msgSize), msgSize))
1312 for (
const auto &sock : this->Sockets())
1315 if (sendto(sock,
reinterpret_cast<const raw_type *
>(
1316 reinterpret_cast<const unsigned char*
>(buffer)),
1318 reinterpret_cast<const sockaddr *
>(this->MulticastAddr()),
1319 sizeof(*(this->MulticastAddr()))) != totalSize)
1329 if (errno != EPERM && errno != ENOBUFS)
1331 std::cerr <<
"Exception sending a multicast message:"
1340 std::cerr <<
"Discovery::SendMulticast: Error serializing data."
1351 return this->sockets;
1356 private:
const sockaddr_in *MulticastAddr()
const
1358 return &this->mcastAddr;
1363 private: uint8_t Version()
const
1366 static int topicStats;
1368 if (
env(
"GZ_TRANSPORT_TOPIC_STATISTICS", gzStats) && !gzStats.
empty())
1370 topicStats = (gzStats ==
"1");
1373 return this->kWireVersion + (topicStats * 100);
1380 private:
bool RegisterNetIface(
const std::string &_ip)
1383 int sock =
static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1393 struct in_addr ifAddr;
1394 ifAddr.s_addr = inet_addr(_ip.
c_str());
1395 if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1396 reinterpret_cast<const char*
>(&ifAddr),
sizeof(ifAddr)) != 0)
1398 std::cerr <<
"Error setting socket option (IP_MULTICAST_IF)."
1403 this->sockets.push_back(sock);
1408 struct ip_mreq group;
1409 group.imr_multiaddr.s_addr =
1410 inet_addr(this->multicastGroup.c_str());
1411 group.imr_interface.s_addr = inet_addr(_ip.
c_str());
1412 if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1413 reinterpret_cast<const char*
>(&group),
sizeof(group)) != 0)
1415 std::cerr <<
"Error setting socket option (IP_ADD_MEMBERSHIP)."
1425 private:
void AddRelayAddress(
const std::string &_ip)
1428 for (
auto const &addr : this->relayAddrs)
1430 if (addr.sin_addr.s_addr == inet_addr(_ip.
c_str()))
1435 memset(&addr, 0,
sizeof(addr));
1436 addr.sin_family = AF_INET;
1437 addr.sin_addr.s_addr = inet_addr(_ip.
c_str());
1438 addr.sin_port = htons(
static_cast<u_short
>(this->port));
1440 this->relayAddrs.push_back(addr);
1446 private:
static const unsigned int kDefActivityInterval = 100;
1451 private:
static const unsigned int kDefHeartbeatInterval = 1000;
1456 private:
static const unsigned int kDefSilenceInterval = 3000;
1462 private:
const int kTimeout = 250;
1465 private:
static const uint16_t kMaxRcvStr =
1470 private:
static const uint8_t kWireVersion = 10;
1487 private:
unsigned int silenceInterval;
1492 private:
unsigned int activityInterval;
1497 private:
unsigned int heartbeatInterval;
1500 private: DiscoveryCallback<Pub> connectionCb;
1503 private: DiscoveryCallback<Pub> disconnectionCb;
1506 private: DiscoveryCallback<Pub> registrationCb;
1509 private: DiscoveryCallback<Pub> unregistrationCb;
1515 private: TopicStorage<Pub> info;
1518 private: TopicStorage<Pub> remoteSubscribers;
1527 private:
bool verbose;
1533 private: sockaddr_in mcastAddr;
1557 private:
bool initialized;
1560 private:
unsigned int numHeartbeatsUninitialized;
1569 private:
bool enabled;