18 #ifndef GZ_TRANSPORT_DISCOVERY_HH_
19 #define GZ_TRANSPORT_DISCOVERY_HH_
33 #include <sys/types.h>
35 #include <sys/socket.h>
39 #include <arpa/inet.h>
43 #include <netinet/in.h>
49 #pragma warning(push, 0)
54 #pragma warning(disable: 4503)
56 #pragma warning(disable: 4996)
59 #include <gz/msgs/discovery.pb.h>
62 #include <condition_variable>
71 #include <gz/msgs/Utility.hh>
73 #include "gz/transport/config.hh"
74 #include "gz/transport/Export.hh"
86 inline namespace GZ_TRANSPORT_VERSION_NAMESPACE {
116 template<
typename Pub>
// Fragment of what appears to be the class constructor: only the trailing
// `_verbose = false` parameter and the member-initializer list are visible,
// the signature line itself is not. From the visible statements it selects
// the host interface from the environment, registers every host interface,
// configures and binds the first socket, fills in the multicast address and
// registers relay addresses.
// NOTE(review): this extract drops lines between the fragments below, so
// braces, declarations and some statements cannot be seen here.
128 const bool _verbose =
false)
129 : multicastGroup(_ip),
133 silenceInterval(kDefSilenceInterval),
134 activityInterval(kDefActivityInterval),
135 heartbeatInterval(kDefHeartbeatInterval),
136 connectionCb(nullptr),
137 disconnectionCb(nullptr),
140 numHeartbeatsUninitialized(0),
// GZ_IP is consulted first; IGN_IP is only tried when GZ_IP is unset or
// empty, and is reported as deprecated. Either one replaces hostInterfaces
// with that single address.
145 if (
env(
"GZ_IP", gzIp) && !gzIp.
empty())
147 this->hostInterfaces = {gzIp};
150 else if (
env(
"IGN_IP", gzIp) && !gzIp.
empty())
152 std::cerr <<
"IGN_IP is deprecated and will be removed! "
154 this->hostInterfaces = {gzIp};
// Winsock start-up requesting version 2.2 — presumably inside a Windows-only
// preprocessor guard that is not visible in this extract (TODO confirm).
163 WORD wVersionRequested;
167 wVersionRequested = MAKEWORD(2, 2);
169 if (WSAStartup(wVersionRequested, &wsaData) != 0)
// Register every host interface. When the interface that equals hostAddr
// fails to register, 127.0.0.1 is registered instead and becomes hostAddr.
175 for (
const auto &netIface : this->hostInterfaces)
177 auto succeed = this->RegisterNetIface(netIface);
182 if (netIface == this->hostAddr && !succeed)
184 this->RegisterNetIface(
"127.0.0.1");
// NOTE(review): this message names IGN_IP although GZ_IP is the variable
// checked first earlier in this block — confirm whether the text should
// mention GZ_IP.
185 std::cerr <<
"Did you set the environment variable IGN_IP with a "
187 <<
" [" << netIface <<
"] seems an invalid local IP "
189 <<
" Using 127.0.0.1 as hostname." <<
std::endl;
190 this->hostAddr =
"127.0.0.1";
// Address/port reuse options are applied to the first registered socket only.
199 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
200 reinterpret_cast<const char *
>(&reuseAddr),
sizeof(reuseAddr)) != 0)
202 std::cerr <<
"Error setting socket option (SO_REUSEADDR)."
214 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
215 reinterpret_cast<const char *
>(&reusePort),
sizeof(reusePort)) != 0)
217 std::cerr <<
"Error setting socket option (SO_REUSEPORT)."
// Bind the first socket to INADDR_ANY on this->port.
223 sockaddr_in localAddr;
224 memset(&localAddr, 0,
sizeof(localAddr));
225 localAddr.sin_family = AF_INET;
226 localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
227 localAddr.sin_port = htons(
static_cast<u_short
>(this->port));
229 if (bind(this->sockets.at(0),
230 reinterpret_cast<sockaddr *
>(&localAddr),
sizeof(sockaddr_in)) < 0)
// Fill mcastAddr with the multicast group address and this->port.
237 memset(&this->mcastAddr, 0,
sizeof(this->mcastAddr));
238 this->mcastAddr.sin_family = AF_INET;
239 this->mcastAddr.sin_addr.s_addr =
240 inet_addr(this->multicastGroup.c_str());
241 this->mcastAddr.sin_port = htons(
static_cast<u_short
>(this->port));
// Relay configuration: GZ_RELAY first, deprecated IGN_RELAY as fallback.
// NOTE(review): this deprecation notice is written to std::cout while the
// IGN_IP one in this block goes to std::cerr — confirm which is intended.
245 if (
env(
"GZ_RELAY", gzRelay) && !gzRelay.
empty())
250 else if (
env(
"IGN_RELAY", gzRelay) && !gzRelay.
empty())
252 std::cout <<
"IGN_RELAY is deprecated and will be removed! "
// Every entry of `relays` (its construction is not visible) is registered.
258 for (
auto const &relayAddr : relays)
259 this->AddRelayAddress(relayAddr);
// Presumably guarded by a verbosity check that is not visible — TODO confirm.
262 this->PrintCurrentState();
// Fragment, apparently from the destructor (signature not visible — TODO
// confirm). Visible steps: exitMutex is locked and unlocked around a
// statement that this extract does not show, the reception thread is joined
// when joinable, and then every socket is visited (loop body not shown).
269 this->exitMutex.lock();
271 this->exitMutex.unlock();
274 if (this->threadReception.joinable())
275 this->threadReception.join();
283 for (
const auto &sock : this->sockets)
// Fragment of a start-up method (signature not visible; presumably Start() —
// TODO confirm). Marks discovery as enabled, schedules both the next
// heartbeat and the next activity check for `now` (its definition is not
// visible), and launches the reception thread running RecvMessages().
306 this->enabled =
true;
310 this->timeNextHeartbeat = now;
311 this->timeNextActivity = now;
314 this->threadReception =
std::thread(&Discovery::RecvMessages,
this);
// Fragment (signature not visible; presumably Advertise() — TODO confirm):
// tries to store _publisher in this->info. What happens when AddPublisher()
// returns false is not visible in this extract.
330 if (!this->info.AddPublisher(_publisher))
// Fragment (signature not visible; presumably Discover() — TODO confirm).
// Takes a local copy of the connection callback, prepares a publisher
// carrying the requested topic and this process UUID, looks up the known
// publishers of _topic and iterates over them per process and per node.
// The bodies of both loops are not visible in this extract.
365 cb = this->connectionCb;
369 pub.SetTopic(_topic);
370 pub.SetPUuid(this->pUuid);
377 found = this->info.Publishers(_topic, addresses);
383 for (
const auto &proc : addresses)
385 for (
const auto &node : proc.second)
// Fragment (signature not visible): forwards the query to
// this->info.Publishers(), which fills _publishers, and returns its result.
433 return this->info.Publishers(_topic, _publishers);
// Fragment (signature not visible; presumably Unadvertise() — TODO confirm).
// Looks up the publisher of _topic owned by this process and node _nUuid,
// removes it from this->info, and passes the looked-up record `inf` to a call
// using msgs::Discovery::UNADVERTISE (the call's first line is not visible).
454 if (!this->info.Publisher(_topic, this->pUuid, _nUuid, inf))
458 this->info.DelPublisherByNode(_topic, this->pUuid, _nUuid);
466 msgs::Discovery::UNADVERTISE, inf);
// Return statements of four separate getters whose signatures are not
// visible in this extract: host address, activity interval, heartbeat
// interval and silence interval, each returned as stored.
477 return this->hostAddr;
487 return this->activityInterval;
498 return this->heartbeatInterval;
508 return this->silenceInterval;
// Assignment statements of seven separate setters whose signatures are not
// visible in this extract. The first three store a new interval value (_ms);
// the last four store a callback (_cb) for connection, disconnection,
// registration and unregistration events respectively.
// NOTE(review): no locking is visible on these lines — confirm whether the
// hidden lines take a mutex, since the reception thread reads these members.
517 this->activityInterval = _ms;
526 this->heartbeatInterval = _ms;
535 this->silenceInterval = _ms;
545 this->connectionCb = _cb;
555 this->disconnectionCb = _cb;
564 this->registrationCb = _cb;
573 this->unregistrationCb = _cb;
// Fragment of a state-dumping method (signature not visible; presumably
// PrintCurrentState() — TODO confirm). Prints the three interval settings to
// std::cout, then handles the activity table: one branch for an empty table
// and a loop over its entries (neither body is visible in this extract).
587 std::cout <<
"\tActivity: " << this->activityInterval
589 std::cout <<
"\tHeartbeat: " << this->heartbeatInterval
591 std::cout <<
"\tSilence: " << this->silenceInterval
600 if (this->activity.empty())
604 for (
auto &proc : this->activity)
// Fragment (signature not visible): delegates to this->info.TopicList(),
// passing _topics as the output argument.
624 this->info.TopicList(_topics);
// Fragment (signature not visible; presumably WaitForInit() — TODO confirm).
// When discovery is not yet initialized, blocks on initializedCv until
// `initialized` becomes true. The lock `lk` is created on a line that is not
// visible in this extract.
633 if (!this->initialized)
635 this->initializedCv.wait(lk, [
this]{
return this->initialized;});
// Expires remote processes that have been silent for too long.
// Visible behaviour: nothing is expired while `now` is earlier than
// timeNextActivity (the branch body is not visible). Otherwise the
// disconnection callback is copied locally and the activity table is walked;
// an entry whose age in milliseconds exceeds silenceInterval has its
// publishers removed from this->info and is erased from the table. Finally
// the disconnection callback is invoked once per UUID in `uuids`.
// NOTE(review): the lines that fill `uuids`, advance `it` for non-expired
// entries and reschedule timeNextActivity are not visible in this extract.
642 private:
void UpdateActivity()
655 if (now < this->timeNextActivity)
658 disconnectCb = this->disconnectionCb;
660 for (
auto it = this->activity.cbegin(); it != this->activity.cend();)
663 auto elapsed = now - it->second;
666 if (std::chrono::duration_cast<std::chrono::milliseconds>
667 (elapsed).count() > this->silenceInterval)
670 this->info.DelPublishersByProc(it->first);
// Post-increment hands erase() the current iterator while `it` already
// points at the next element.
675 this->activity.
erase(it++);
// The callback receives a publisher that only has its process UUID set.
691 for (
auto const &uuid : uuids)
694 publisher.SetPUuid(uuid);
695 disconnectCb(publisher);
// Periodic heartbeat work.
// Visible behaviour: gated on timeNextHeartbeat (branch body not visible).
// It builds a publisher that carries only this process UUID, collects the
// publishers belonging to this process from this->info, and issues a call
// using msgs::Discovery::ADVERTISE for every node of every topic (the first
// line of that call is not visible). It also drives the "initialized" state:
// once numHeartbeatsUninitialized equals 2 the object is marked initialized
// and all waiters on initializedCv are woken; the counter is incremented on
// a separate visible line.
// NOTE(review): the heartbeat send itself and the rescheduling of
// timeNextHeartbeat are not visible in this extract.
700 private:
void UpdateHeartbeat()
707 if (now < this->timeNextHeartbeat)
711 Publisher pub(
"",
"", this->pUuid,
"", AdvertiseOptions());
719 this->info.PublishersByProc(this->pUuid, nodes);
722 for (
const auto &topic : nodes)
724 for (
const auto &node : topic.second)
727 msgs::Discovery::ADVERTISE, node);
733 if (!this->initialized)
735 if (this->numHeartbeatsUninitialized == 2u)
738 this->initialized =
true;
741 this->initializedCv.notify_all();
743 ++this->numHeartbeatsUninitialized;
// Computes a timeout for the reception loop.
// `t` is the time in whole milliseconds until the earlier of the next
// heartbeat and the next activity check; `t2` caps that value at kTimeout.
// NOTE(review): the return statement and the definition of `now` are not
// visible in this extract, so the handling of a negative `t` cannot be seen.
760 private:
int NextTimeout()
const
763 auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
764 auto timeUntilNextActivity = this->timeNextActivity - now;
766 int t =
static_cast<int>(
767 std::chrono::duration_cast<std::chrono::milliseconds>
768 (
std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
769 int t2 =
std::min(t, this->kTimeout);
// Body of the reception thread (started with &Discovery::RecvMessages).
// Visible steps: obtain a timeout from NextTimeout(), process an incoming
// discovery datagram, optionally print the current state, then run the
// heartbeat and activity maintenance in that order.
// NOTE(review): the loop construct, the polling call that consumes `timeout`,
// the conditions guarding RecvDiscoveryUpdate()/PrintCurrentState() and the
// update of `timeToExit` are not visible in this extract.
774 private:
void RecvMessages()
776 bool timeToExit =
false;
780 int timeout = this->NextTimeout();
784 this->RecvDiscoveryUpdate();
787 this->PrintCurrentState();
790 this->UpdateHeartbeat();
791 this->UpdateActivity();
// Reads one datagram from the first socket and dispatches it.
// Wire layout as used here: a length prefix `len` at the start of the
// buffer, followed by `len` bytes of payload. The payload is handed to
// DispatchDiscoveryMsg() only when prefix size + len matches the number of
// bytes received; a negative recvfrom() result is reported on std::cerr.
803 private:
void RecvDiscoveryUpdate()
805 char rcvStr[Discovery::kMaxRcvStr];
806 sockaddr_in clntAddr;
807 socklen_t addrLen =
sizeof(clntAddr);
809 int64_t received = recvfrom(this->sockets.at(0),
810 reinterpret_cast<raw_type *
>(rcvStr),
812 reinterpret_cast<sockaddr *
>(&clntAddr),
813 reinterpret_cast<socklen_t *
>(&addrLen));
// NOTE(review): in the visible lines `len` is copied out of rcvStr before
// `received` is examined, and the comparison below narrows `received` to
// uint16_t. Confirm that a failed or short read cannot satisfy the equality
// with stale buffer contents (the lines in between are not visible here).
817 memcpy(&len, &rcvStr[0],
sizeof(len));
841 if (len +
sizeof(len) ==
static_cast<uint16_t
>(received))
843 std::string srcAddr = inet_ntoa(clntAddr.sin_addr);
844 uint16_t srcPort = ntohs(clntAddr.sin_port);
// Presumably printed only in verbose mode — the guard is not visible.
848 std::cout <<
"\nReceived discovery update from "
849 << srcAddr <<
": " << srcPort <<
std::endl;
// The payload starts right after the length prefix.
852 this->DispatchDiscoveryMsg(srcAddr, rcvStr +
sizeof(len), len);
855 else if (received < 0)
857 std::cerr <<
"Discovery::RecvDiscoveryUpdate() recvfrom error"
// Parses one received discovery payload and reacts to it.
// _fromIp: textual source address of the datagram.
// _msg/_len: serialized gz::msgs::Discovery bytes and their length.
// Visible flow: parse the message, compare its version with Version(),
// compare the sender's process UUID with our own, apply the relay
// forwarding rules, classify the sender as local or remote, snapshot the
// four callbacks and then switch on the message type.
// NOTE(review): the bodies of the early checks (parse failure, version
// mismatch, own-UUID), the null checks around callback invocations and the
// use of `isSenderLocal` are not visible in this extract.
866 private:
void DispatchDiscoveryMsg(
const std::string &_fromIp,
867 char *_msg, uint16_t _len)
869 gz::msgs::Discovery msg;
874 if (!msg.ParseFromArray(_msg, _len))
878 if (this->Version() != msg.version())
884 if (recvPUuid == this->pUuid)
// Message arrived with the relay flag set: clear it, set no_relay,
// re-send on multicast and remember the sender as a relay address.
896 if (msg.has_flags() && msg.flags().relay())
899 msg.mutable_flags()->set_relay(
false);
900 msg.mutable_flags()->set_no_relay(
true);
901 this->SendMulticast(msg);
905 this->AddRelayAddress(_fromIp);
// Otherwise, unless no_relay is set, set the relay flag and forward the
// message by unicast.
913 else if (!msg.has_flags() || !msg.flags().no_relay())
915 msg.mutable_flags()->set_relay(
true);
916 this->SendUnicast(msg);
// The sender counts as local when its address is one of hostInterfaces or
// its textual form begins with "127.".
919 bool isSenderLocal = (
std::find(this->hostInterfaces.begin(),
920 this->hostInterfaces.end(), _fromIp) != this->hostInterfaces.end()) ||
921 (_fromIp.
find(
"127.") == 0);
// Local copies of the callbacks; presumably taken under a mutex whose lock
// line is not visible here (TODO confirm).
924 DiscoveryCallback<Pub> connectCb;
925 DiscoveryCallback<Pub> disconnectCb;
926 DiscoveryCallback<Pub> registerCb;
927 DiscoveryCallback<Pub> unregisterCb;
931 connectCb = this->connectionCb;
932 disconnectCb = this->disconnectionCb;
933 registerCb = this->registrationCb;
934 unregisterCb = this->unregistrationCb;
// ADVERTISE: store the publisher; the connection callback fires only when
// the publisher was newly added and a callback is set.
939 case msgs::Discovery::ADVERTISE:
943 publisher.SetFromDiscovery(msg);
957 added = this->info.AddPublisher(publisher);
960 if (added && connectCb)
963 connectCb(publisher);
// SUBSCRIBE: read the requested topic and, for publishers of that topic in
// this process, issue a call using msgs::Discovery::ADVERTISE.
968 case msgs::Discovery::SUBSCRIBE:
974 recvTopic = msg.sub().topic();
978 std::cerr <<
"Subscription discovery message is missing "
979 <<
"Subscriber information.\n";
984 Addresses_M<Pub> addresses;
987 if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
992 if (!this->info.Publishers(recvTopic, addresses))
996 for (
const auto &nodeInfo : addresses[this->pUuid])
1008 msgs::Discovery::ADVERTISE, nodeInfo);
// NEW_CONNECTION / END_CONNECTION: hand the publisher described by the
// message to the registration / unregistration callback.
1013 case msgs::Discovery::NEW_CONNECTION:
1017 publisher.SetFromDiscovery(msg);
1020 registerCb(publisher);
1024 case msgs::Discovery::END_CONNECTION:
1028 publisher.SetFromDiscovery(msg);
1031 unregisterCb(publisher);
// HEARTBEAT: no handling is visible in this extract.
1035 case msgs::Discovery::HEARTBEAT:
// BYE: drop the sender from the activity table and remove all of its
// publishers from this->info.
1040 case msgs::Discovery::BYE:
1045 this->activity.erase(recvPUuid);
1051 pub.SetPUuid(recvPUuid);
1059 this->info.DelPublishersByProc(recvPUuid);
// UNADVERTISE: invoke the disconnection callback, then remove that specific
// publisher (topic, process UUID, node UUID).
1064 case msgs::Discovery::UNADVERTISE:
1068 publisher.SetFromDiscovery(msg);
1081 disconnectCb(publisher);
1087 this->info.DelPublisherByNode(publisher.Topic(),
1088 publisher.PUuid(), publisher.NUuid());
// Any other type is reported on std::cerr.
1095 std::cerr <<
"Unknown message type [" << msg.type() <<
"].\n";
// Builds and sends a discovery message of the given type. The function name
// line is not visible in this extract; the error string below identifies it
// as Discovery::SendMsg().
// _type: discovery message type to send.
// _pub:  object supplying the payload; its FillDiscovery() and Topic()
//        members are used below.
// Every message carries the wire version, the type and this process UUID.
// Per type: the four advertise/connection types copy the publisher into the
// message; SUBSCRIBE only sets the topic; HEARTBEAT and BYE add nothing in
// the visible lines; the error message for an unrecognized type follows.
// The message is then sent by multicast and, with the relay flag set, by
// unicast.
// NOTE(review): the conditions guarding the two sends and the final verbose
// print are not visible in this extract.
1107 private:
template<
typename T>
1109 const msgs::Discovery::Type _type,
1110 const T &_pub)
const
1112 gz::msgs::Discovery discoveryMsg;
1113 discoveryMsg.set_version(this->Version());
1114 discoveryMsg.set_type(_type);
1115 discoveryMsg.set_process_uuid(this->pUuid);
1119 case msgs::Discovery::ADVERTISE:
1120 case msgs::Discovery::UNADVERTISE:
1121 case msgs::Discovery::NEW_CONNECTION:
1122 case msgs::Discovery::END_CONNECTION:
1124 _pub.FillDiscovery(discoveryMsg);
1127 case msgs::Discovery::SUBSCRIBE:
1129 discoveryMsg.mutable_sub()->set_topic(_pub.Topic());
1132 case msgs::Discovery::HEARTBEAT:
1133 case msgs::Discovery::BYE:
1136 std::cerr <<
"Discovery::SendMsg() error: Unrecognized message"
1137 <<
" type [" << _type <<
"]" <<
std::endl;
1144 this->SendMulticast(discoveryMsg);
1152 discoveryMsg.mutable_flags()->set_relay(
true);
1153 this->SendUnicast(discoveryMsg);
1158 std::cout <<
"\t* Sending " << msgs::ToString(_type)
1159 <<
" msg [" << _pub.Topic() <<
"]" <<
std::endl;
// Serializes _msg and sends it to every known relay address through the
// first socket.
// Framing: a length prefix (`msgSize`) followed by the serialized message.
// A message whose size plus the prefix exceeds kMaxRcvStr is rejected with
// an error on std::cerr.
1165 private:
void SendUnicast(
const msgs::Discovery &_msg)
const
// The size query depends on the protobuf version; the #else/#endif lines of
// this conditional are not visible in this extract.
1169 #if GOOGLE_PROTOBUF_VERSION >= 3004000
1170 size_t msgSizeFull = _msg.ByteSizeLong();
1172 int msgSizeFull = _msg.ByteSize();
1174 if (msgSizeFull +
sizeof(msgSize) > this->kMaxRcvStr)
1176 std::cerr <<
"Discovery message too large to send. Discovery won't "
1177 <<
"work. This shouldn't happen.\n";
1180 msgSize = msgSizeFull;
1182 uint16_t totalSize =
sizeof(msgSize) + msgSize;
// NOTE(review): raw new[]; the matching delete[] is not visible in this
// extract — confirm it is released on every path (an owning buffer such as
// std::vector<char> would make that automatic).
1183 char *buffer =
static_cast<char *
>(
new char[totalSize]);
1184 memcpy(&buffer[0], &msgSize,
sizeof(msgSize));
1186 if (_msg.SerializeToArray(buffer +
sizeof(msgSize), msgSize))
// One sendto() per relay address; a result different from totalSize is
// treated specially (handling not visible).
1189 for (
const auto &sockAddr : this->relayAddrs)
1192 auto sent = sendto(this->sockets.at(0),
1193 reinterpret_cast<const raw_type *
>(
1194 reinterpret_cast<const unsigned char*
>(buffer)),
1196 reinterpret_cast<const sockaddr *
>(&sockAddr),
1199 if (sent != totalSize)
1210 std::cerr <<
"Discovery::SendUnicast: Error serializing data."
// Serializes _msg and sends it to the multicast address through every
// registered socket.
// Framing: a length prefix (`msgSize`) followed by the serialized message.
// A message whose size plus the prefix exceeds kMaxRcvStr is rejected with
// an error on std::cerr.
1219 private:
void SendMulticast(
const msgs::Discovery &_msg)
const
// The size query depends on the protobuf version; the #else/#endif lines of
// this conditional are not visible in this extract.
1223 #if GOOGLE_PROTOBUF_VERSION >= 3004000
1224 size_t msgSizeFull = _msg.ByteSizeLong();
1226 int msgSizeFull = _msg.ByteSize();
1228 if (msgSizeFull +
sizeof(msgSize) > this->kMaxRcvStr)
1230 std::cerr <<
"Discovery message too large to send. Discovery won't "
1231 <<
"work. This shouldn't happen.\n";
1235 msgSize = msgSizeFull;
1236 uint16_t totalSize =
sizeof(msgSize) + msgSize;
// NOTE(review): raw new[]; the matching delete[] is not visible in this
// extract — confirm it is released on every path, including early exits
// from the send loop.
1237 char *buffer =
static_cast<char *
>(
new char[totalSize]);
1238 memcpy(&buffer[0], &msgSize,
sizeof(msgSize));
1240 if (_msg.SerializeToArray(buffer +
sizeof(msgSize), msgSize))
// One sendto() per socket, always addressed to MulticastAddr().
1244 for (
const auto &sock : this->Sockets())
1247 if (sendto(sock,
reinterpret_cast<const raw_type *
>(
1248 reinterpret_cast<const unsigned char*
>(buffer)),
1250 reinterpret_cast<const sockaddr *
>(this->MulticastAddr()),
1251 sizeof(*(this->MulticastAddr()))) != totalSize)
// EPERM and ENOBUFS are excluded from the error report below.
1261 if (errno != EPERM && errno != ENOBUFS)
1263 std::cerr <<
"Exception sending a multicast message:"
1272 std::cerr <<
"Discovery::SendMulticast: Error serializing data."
// Return statement of an accessor whose signature is not visible in this
// extract (presumably Sockets() — TODO confirm): exposes the socket list.
1283 return this->sockets;
// Returns a pointer to the stored multicast address (mcastAddr). The pointer
// refers to a data member, so it stays valid only as long as this object.
1288 private:
const sockaddr_in *MulticastAddr()
const
1290 return &this->mcastAddr;
// Returns the discovery wire version advertised by this process:
// kWireVersion, plus 100 when topic statistics are enabled via the
// environment.
// GZ_TRANSPORT_TOPIC_STATISTICS is consulted first; the deprecated
// IGN_TRANSPORT_TOPIC_STATISTICS is the fallback and triggers a notice on
// std::cout. Statistics count as enabled only when the value is exactly "1".
// NOTE(review): topicStats is a function-local static and is only assigned
// when one of the variables is set and non-empty; otherwise the value from a
// previous call (initially zero) is reused. The environment is queried on
// every call. The declaration of gzStats is not visible in this extract.
1295 private: uint8_t Version()
const
1298 static int topicStats;
1300 if (
env(
"GZ_TRANSPORT_TOPIC_STATISTICS", gzStats) && !gzStats.
empty())
1302 topicStats = (gzStats ==
"1");
1305 else if (
env(
"IGN_TRANSPORT_TOPIC_STATISTICS", gzStats)
1306 && !gzStats.
empty())
1308 std::cout <<
"IGN_TRANSPORT_TOPIC_STATISTICS is deprecated! "
1309 <<
"Use GZ_TRANSPORT_TOPIC_STATISTICS instead!"
1311 topicStats = (gzStats ==
"1");
1314 return this->kWireVersion + (topicStats * 100);
// Creates a UDP socket for the network interface with address _ip and joins
// the multicast group on that interface.
// _ip: textual IPv4 address of the local interface.
// Returns a bool; the return statements are not visible in this extract
// (the constructor treats the result as a success flag).
// Visible steps: create the socket, select _ip as its outgoing multicast
// interface (IP_MULTICAST_IF), append it to this->sockets, then add a group
// membership for multicastGroup on interface _ip.
1321 private:
bool RegisterNetIface(
const std::string &_ip)
1324 int sock =
static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1334 struct in_addr ifAddr;
1335 ifAddr.s_addr = inet_addr(_ip.
c_str());
1336 if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1337 reinterpret_cast<const char*
>(&ifAddr),
sizeof(ifAddr)) != 0)
1339 std::cerr <<
"Error setting socket option (IP_MULTICAST_IF)."
1344 this->sockets.push_back(sock);
// The membership is always added on the first socket in the list, not on
// the socket created above.
1349 struct ip_mreq group;
1350 group.imr_multiaddr.s_addr =
1351 inet_addr(this->multicastGroup.c_str());
1352 group.imr_interface.s_addr = inet_addr(_ip.
c_str());
1353 if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1354 reinterpret_cast<const char*
>(&group),
sizeof(group)) != 0)
1356 std::cerr <<
"Error setting socket option (IP_ADD_MEMBERSHIP)."
// Adds _ip to the list of relay addresses.
// _ip: textual IPv4 address of the relay.
// The existing entries are scanned for the same address first; what happens
// on a match is not visible in this extract (presumably an early return so
// duplicates are not stored — TODO confirm). A new entry uses AF_INET, the
// parsed address and this->port.
1366 private:
void AddRelayAddress(
const std::string &_ip)
1369 for (
auto const &addr : this->relayAddrs)
1371 if (addr.sin_addr.s_addr == inet_addr(_ip.
c_str()))
// Build the new sockaddr_in (its declaration line is not visible here).
1376 memset(&addr, 0,
sizeof(addr));
1377 addr.sin_family = AF_INET;
1378 addr.sin_addr.s_addr = inet_addr(_ip.
c_str());
1379 addr.sin_port = htons(
static_cast<u_short
>(this->port));
1381 this->relayAddrs.push_back(addr);
// Constants and data members. Several members used elsewhere in the class
// are declared on lines that this extract does not show.

// Default values copied into activityInterval, heartbeatInterval and
// silenceInterval by the constructor's initializer list. Presumably
// milliseconds, as silenceInterval is compared against a millisecond count.
1387 private:
static const unsigned int kDefActivityInterval = 100;
1392 private:
static const unsigned int kDefHeartbeatInterval = 1000;
1397 private:
static const unsigned int kDefSilenceInterval = 3000;
// Upper bound, in milliseconds, applied by NextTimeout().
1403 private:
const int kTimeout = 250;
// Size of the receive buffer and maximum framed message size; the
// initializer is on a line that is not visible in this extract.
1406 private:
static const uint16_t kMaxRcvStr =
// Base wire-protocol version returned by Version().
1411 private:
static const uint8_t kWireVersion = 10;
// Age, in milliseconds, after which an entry of the activity table is
// expired by UpdateActivity().
1428 private:
unsigned int silenceInterval;
// Interval settings exposed through the getters/setters; presumably
// milliseconds like silenceInterval — TODO confirm.
1433 private:
unsigned int activityInterval;
1438 private:
unsigned int heartbeatInterval;
// User callbacks invoked from DispatchDiscoveryMsg()/UpdateActivity().
1441 private: DiscoveryCallback<Pub> connectionCb;
1444 private: DiscoveryCallback<Pub> disconnectionCb;
1447 private: DiscoveryCallback<Pub> registrationCb;
1450 private: DiscoveryCallback<Pub> unregistrationCb;
// Storage of known publishers, queried and modified throughout the class.
1453 private: TopicStorage<Pub> info;
// Verbosity flag; its uses are not visible in this extract.
1462 private:
bool verbose;
// Multicast destination address filled in by the constructor.
1468 private: sockaddr_in mcastAddr;
// Set to true by UpdateHeartbeat() once numHeartbeatsUninitialized reaches 2.
1492 private:
bool initialized;
// Counter incremented by UpdateHeartbeat() while not yet initialized.
1495 private:
unsigned int numHeartbeatsUninitialized;
// Set to true by the start-up code that launches the reception thread.
1504 private:
bool enabled;