128 const bool _verbose =
false)
129 : multicastGroup(_ip),
133 silenceInterval(kDefSilenceInterval),
134 activityInterval(kDefActivityInterval),
135 heartbeatInterval(kDefHeartbeatInterval),
136 connectionCb(nullptr),
137 disconnectionCb(nullptr),
140 numHeartbeatsUninitialized(0),
145 if (
env(
"GZ_IP", gzIp) && !gzIp.
empty())
147 this->hostInterfaces = {gzIp};
156 WORD wVersionRequested;
160 wVersionRequested = MAKEWORD(2, 2);
162 if (WSAStartup(wVersionRequested, &wsaData) != 0)
168 for (
const auto &netIface : this->hostInterfaces)
170 auto succeed = this->RegisterNetIface(netIface);
175 if (netIface == this->hostAddr && !succeed)
177 this->RegisterNetIface(
"127.0.0.1");
178 std::cerr <<
"Did you set the environment variable GZ_IP with a "
180 <<
" [" << netIface <<
"] seems an invalid local IP "
182 <<
" Using 127.0.0.1 as hostname." <<
std::endl;
183 this->hostAddr =
"127.0.0.1";
192 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
193 reinterpret_cast<const char *
>(&reuseAddr),
sizeof(reuseAddr)) != 0)
195 std::cerr <<
"Error setting socket option (SO_REUSEADDR)."
207 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
208 reinterpret_cast<const char *
>(&reusePort),
sizeof(reusePort)) != 0)
210 std::cerr <<
"Error setting socket option (SO_REUSEPORT)."
216 sockaddr_in localAddr;
217 memset(&localAddr, 0,
sizeof(localAddr));
218 localAddr.sin_family = AF_INET;
219 localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
220 localAddr.sin_port = htons(
static_cast<u_short
>(this->port));
222 if (bind(this->sockets.at(0),
223 reinterpret_cast<sockaddr *
>(&localAddr),
sizeof(sockaddr_in)) < 0)
230 memset(&this->mcastAddr, 0,
sizeof(this->mcastAddr));
231 this->mcastAddr.sin_family = AF_INET;
232 this->mcastAddr.sin_addr.s_addr =
233 inet_addr(this->multicastGroup.c_str());
234 this->mcastAddr.sin_port = htons(
static_cast<u_short
>(this->port));
238 if (
env(
"GZ_RELAY", gzRelay) && !gzRelay.
empty())
240 relays = transport::split(gzRelay,
':');
244 for (
auto const &relayAddr : relays)
245 this->AddRelayAddress(relayAddr);
248 this->PrintCurrentState();
255 this->exitMutex.lock();
257 this->exitMutex.unlock();
260 if (this->threadReception.joinable())
261 this->threadReception.join();
265 this->SendMsg(DestinationType::ALL, msgs::Discovery::BYE,
269 for (
const auto &sock : this->sockets)
292 this->enabled =
true;
296 this->timeNextHeartbeat = now;
297 this->timeNextActivity = now;
300 this->threadReception =
std::thread(&Discovery::RecvMessages,
this);
318 if (!this->info.AddPublisher(_publisher))
321 cb = this->connectionCb;
329 if (_publisher.Options().Scope() != Scope_t::PROCESS)
330 this->SendMsg(DestinationType::ALL, msgs::Discovery::ADVERTISE,
358 cb = this->connectionCb;
362 pub.SetTopic(_topic);
363 pub.SetPUuid(this->pUuid);
366 this->SendMsg(DestinationType::ALL, msgs::Discovery::SUBSCRIBE, pub);
370 found = this->info.Publishers(_topic, addresses);
376 for (
const auto &proc : addresses)
378 for (
const auto &node : proc.second)
399 DestinationType::ALL, msgs::Discovery::SUBSCRIBERS_REP, _pub);
407 DestinationType::ALL, msgs::Discovery::NEW_CONNECTION, _pub);
415 DestinationType::ALL, msgs::Discovery::END_CONNECTION, _pub);
434 return this->info.Publishers(_topic, _publishers);
445 return this->remoteSubscribers.Publishers(_topic, _subscribers);
466 if (!this->info.Publisher(_topic, this->pUuid, _nUuid, inf))
470 this->info.DelPublisherByNode(_topic, this->pUuid, _nUuid);
475 if (inf.Options().Scope() != Scope_t::PROCESS)
477 this->SendMsg(DestinationType::ALL,
478 msgs::Discovery::UNADVERTISE, inf);
489 return this->hostAddr;
499 return this->activityInterval;
510 return this->heartbeatInterval;
520 return this->silenceInterval;
529 this->activityInterval = _ms;
538 this->heartbeatInterval = _ms;
547 this->silenceInterval = _ms;
557 this->connectionCb = _cb;
567 this->disconnectionCb = _cb;
576 this->registrationCb = _cb;
585 this->unregistrationCb = _cb;
594 this->subscribersCb = _cb;
608 std::cout <<
"\tActivity: " << this->activityInterval
610 std::cout <<
"\tHeartbeat: " << this->heartbeatInterval
612 std::cout <<
"\tSilence: " << this->silenceInterval
621 if (this->activity.empty())
625 for (
auto &proc : this->activity)
646 this->remoteSubscribers.Clear();
652 DestinationType::ALL, msgs::Discovery::SUBSCRIBERS_REQ, pub);
656 this->info.TopicList(_topics);
659 this->remoteSubscribers.TopicList(remoteSubs);
662 for (
auto const &t : remoteSubs)
677 if (!this->initialized)
679 this->initializedCv.wait(lk, [
this]{
return this->initialized;});
686 private:
void UpdateActivity()
699 if (now < this->timeNextActivity)
702 disconnectCb = this->disconnectionCb;
704 for (
auto it = this->activity.cbegin(); it != this->activity.cend();)
707 auto elapsed = now - it->second;
710 if (std::chrono::duration_cast<std::chrono::milliseconds>
711 (elapsed).count() > this->silenceInterval)
714 this->info.DelPublishersByProc(it->first);
719 this->activity.erase(it++);
735 for (
auto const &uuid : uuids)
738 publisher.SetPUuid(uuid);
739 disconnectCb(publisher);
749 for (
auto const &addr : this->relayAddrs)
751 if (addr.sin_addr.s_addr == inet_addr(_ip.
c_str()))
756 memset(&addr, 0,
sizeof(addr));
757 addr.sin_family = AF_INET;
758 addr.sin_addr.s_addr = inet_addr(_ip.
c_str());
759 addr.sin_port = htons(
static_cast<u_short
>(this->port));
761 this->relayAddrs.push_back(addr);
772 for (
auto const &addr : this->relayAddrs) {
773 result.
push_back(inet_ntoa(addr.sin_addr));
780 private:
void UpdateHeartbeat()
787 if (now < this->timeNextHeartbeat)
791 Publisher pub(
"",
"", this->pUuid,
"", AdvertiseOptions());
792 this->SendMsg(DestinationType::ALL, msgs::Discovery::HEARTBEAT, pub);
799 this->info.PublishersByProc(this->pUuid, nodes);
802 for (
const auto &topic : nodes)
804 for (
const auto &node : topic.second)
806 this->SendMsg(DestinationType::ALL,
807 msgs::Discovery::ADVERTISE, node);
813 if (!this->initialized)
815 if (this->numHeartbeatsUninitialized == 2u)
818 this->initialized =
true;
821 this->initializedCv.notify_all();
823 ++this->numHeartbeatsUninitialized;
840 private:
int NextTimeout()
const
843 auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
844 auto timeUntilNextActivity = this->timeNextActivity - now;
846 int t =
static_cast<int>(
847 std::chrono::duration_cast<std::chrono::milliseconds>
848 (
std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
849 int t2 =
std::min(t, this->kTimeout);
854 private:
void RecvMessages()
856 bool timeToExit =
false;
860 int timeout = this->NextTimeout();
864 this->RecvDiscoveryUpdate();
867 this->PrintCurrentState();
870 this->UpdateHeartbeat();
871 this->UpdateActivity();
883 private:
void RecvDiscoveryUpdate()
885 char rcvStr[Discovery::kMaxRcvStr];
886 sockaddr_in clntAddr;
887 socklen_t addrLen =
sizeof(clntAddr);
889 int64_t received = recvfrom(this->sockets.at(0),
890 reinterpret_cast<raw_type *
>(rcvStr),
892 reinterpret_cast<sockaddr *
>(&clntAddr),
893 reinterpret_cast<socklen_t *
>(&addrLen));
897 memcpy(&len, &rcvStr[0],
sizeof(len));
921 if (len +
sizeof(len) ==
static_cast<uint16_t
>(received))
923 std::string srcAddr = inet_ntoa(clntAddr.sin_addr);
924 uint16_t srcPort = ntohs(clntAddr.sin_port);
928 std::cout <<
"\nReceived discovery update from "
929 << srcAddr <<
": " << srcPort <<
std::endl;
932 this->DispatchDiscoveryMsg(srcAddr, rcvStr +
sizeof(len), len);
935 else if (received < 0)
937 std::cerr <<
"Discovery::RecvDiscoveryUpdate() recvfrom error"
946 private:
void DispatchDiscoveryMsg(
const std::string &_fromIp,
947 char *_msg, uint16_t _len)
949 gz::msgs::Discovery msg;
954 if (!msg.ParseFromArray(_msg, _len))
958 if (this->Version() != msg.version())
964 if (recvPUuid == this->pUuid)
976 if (msg.has_flags() && msg.flags().relay())
979 msg.mutable_flags()->set_relay(
false);
980 msg.mutable_flags()->set_no_relay(
true);
981 this->SendMulticast(msg);
985 this->AddRelayAddress(_fromIp);
993 else if (!msg.has_flags() || !msg.flags().no_relay())
995 msg.mutable_flags()->set_relay(
true);
996 this->SendUnicast(msg);
999 bool isSenderLocal = (
std::find(this->hostInterfaces.begin(),
1000 this->hostInterfaces.end(), _fromIp) != this->hostInterfaces.end()) ||
1001 (_fromIp.
find(
"127.") == 0);
1004 DiscoveryCallback<Pub> connectCb;
1005 DiscoveryCallback<Pub> disconnectCb;
1006 DiscoveryCallback<Pub> registerCb;
1007 DiscoveryCallback<Pub> unregisterCb;
1012 connectCb = this->connectionCb;
1013 disconnectCb = this->disconnectionCb;
1014 registerCb = this->registrationCb;
1015 unregisterCb = this->unregistrationCb;
1016 subscribersReqCb = this->subscribersCb;
1021 case msgs::Discovery::ADVERTISE:
1025 publisher.SetFromDiscovery(msg);
1028 if ((publisher.Options().Scope() == Scope_t::PROCESS) ||
1029 (publisher.Options().Scope() == Scope_t::HOST &&
1039 added = this->info.AddPublisher(publisher);
1042 if (added && connectCb)
1045 connectCb(publisher);
1050 case msgs::Discovery::SUBSCRIBE:
1056 recvTopic = msg.sub().topic();
1060 std::cerr <<
"Subscription discovery message is missing "
1061 <<
"Subscriber information.\n";
1066 Addresses_M<Pub> addresses;
1069 if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
1074 if (!this->info.Publishers(recvTopic, addresses))
1078 for (
const auto &nodeInfo : addresses[this->pUuid])
1081 if ((nodeInfo.Options().Scope() == Scope_t::PROCESS) ||
1082 (nodeInfo.Options().Scope() == Scope_t::HOST &&
1089 this->SendMsg(DestinationType::ALL,
1090 msgs::Discovery::ADVERTISE, nodeInfo);
1095 case msgs::Discovery::SUBSCRIBERS_REQ:
1097 if (subscribersReqCb)
1102 case msgs::Discovery::SUBSCRIBERS_REP:
1106 publisher.SetFromDiscovery(msg);
1110 this->remoteSubscribers.AddPublisher(publisher);
1114 case msgs::Discovery::NEW_CONNECTION:
1118 publisher.SetFromDiscovery(msg);
1121 registerCb(publisher);
1125 case msgs::Discovery::END_CONNECTION:
1129 publisher.SetFromDiscovery(msg);
1132 unregisterCb(publisher);
1136 case msgs::Discovery::HEARTBEAT:
1141 case msgs::Discovery::BYE:
1146 this->activity.erase(recvPUuid);
1152 pub.SetPUuid(recvPUuid);
1160 this->info.DelPublishersByProc(recvPUuid);
1165 case msgs::Discovery::UNADVERTISE:
1169 publisher.SetFromDiscovery(msg);
1172 if ((publisher.Options().Scope() == Scope_t::PROCESS) ||
1173 (publisher.Options().Scope() == Scope_t::HOST &&
1182 disconnectCb(publisher);
1188 this->info.DelPublisherByNode(publisher.Topic(),
1189 publisher.PUuid(), publisher.NUuid());
1196 std::cerr <<
"Unknown message type [" << msg.type() <<
"].\n";
1208 private:
template<
typename T>
1209 void SendMsg(
const DestinationType &_destType,
1210 const msgs::Discovery::Type _type,
1211 const T &_pub)
const
1213 gz::msgs::Discovery discoveryMsg;
1214 discoveryMsg.set_version(this->Version());
1215 discoveryMsg.set_type(_type);
1216 discoveryMsg.set_process_uuid(this->pUuid);
1217 _pub.FillDiscovery(discoveryMsg);
1221 case msgs::Discovery::ADVERTISE:
1222 case msgs::Discovery::UNADVERTISE:
1223 case msgs::Discovery::NEW_CONNECTION:
1224 case msgs::Discovery::END_CONNECTION:
1226 _pub.FillDiscovery(discoveryMsg);
1229 case msgs::Discovery::SUBSCRIBE:
1231 discoveryMsg.mutable_sub()->set_topic(_pub.Topic());
1234 case msgs::Discovery::HEARTBEAT:
1235 case msgs::Discovery::BYE:
1236 case msgs::Discovery::SUBSCRIBERS_REQ:
1237 case msgs::Discovery::SUBSCRIBERS_REP:
1240 std::cerr <<
"Discovery::SendMsg() error: Unrecognized message"
1241 <<
" type [" << _type <<
"]" <<
std::endl;
1245 if (_destType == DestinationType::MULTICAST ||
1246 _destType == DestinationType::ALL)
1248 this->SendMulticast(discoveryMsg);
1252 if (_destType == DestinationType::UNICAST ||
1253 _destType == DestinationType::ALL)
1256 discoveryMsg.mutable_flags()->set_relay(
true);
1257 this->SendUnicast(discoveryMsg);
1262 std::cout <<
"\t* Sending " << msgs::ToString(_type)
1263 <<
" msg [" << _pub.Topic() <<
"]" <<
std::endl;
1269 private:
void SendUnicast(
const msgs::Discovery &_msg)
const
1273#if GOOGLE_PROTOBUF_VERSION >= 3004000
1274 size_t msgSizeFull = _msg.ByteSizeLong();
1276 int msgSizeFull = _msg.ByteSize();
1278 if (msgSizeFull +
sizeof(msgSize) > this->kMaxRcvStr)
1280 std::cerr <<
"Discovery message too large to send. Discovery won't "
1281 <<
"work. This shouldn't happen.\n";
1284 msgSize = msgSizeFull;
1286 uint16_t totalSize =
sizeof(msgSize) + msgSize;
1287 char *buffer =
static_cast<char *
>(
new char[totalSize]);
1288 memcpy(&buffer[0], &msgSize,
sizeof(msgSize));
1290 if (_msg.SerializeToArray(buffer +
sizeof(msgSize), msgSize))
1295 for (
const auto &sockAddr : this->relayAddrs)
1298 auto sent = sendto(this->sockets.at(0),
1299 reinterpret_cast<const raw_type *
>(
1300 reinterpret_cast<const unsigned char*
>(buffer)),
1302 reinterpret_cast<const sockaddr *
>(&sockAddr),
1305 if (sent != totalSize)
1316 std::cerr <<
"Discovery::SendUnicast: Error serializing data."
1325 private:
void SendMulticast(
const msgs::Discovery &_msg)
const
1329#if GOOGLE_PROTOBUF_VERSION >= 3004000
1330 size_t msgSizeFull = _msg.ByteSizeLong();
1332 int msgSizeFull = _msg.ByteSize();
1334 if (msgSizeFull +
sizeof(msgSize) > this->kMaxRcvStr)
1336 std::cerr <<
"Discovery message too large to send. Discovery won't "
1337 <<
"work. This shouldn't happen.\n";
1341 msgSize = msgSizeFull;
1342 uint16_t totalSize =
sizeof(msgSize) + msgSize;
1343 char *buffer =
static_cast<char *
>(
new char[totalSize]);
1344 memcpy(&buffer[0], &msgSize,
sizeof(msgSize));
1346 if (_msg.SerializeToArray(buffer +
sizeof(msgSize), msgSize))
1350 for (
const auto &sock : this->Sockets())
1353 if (sendto(sock,
reinterpret_cast<const raw_type *
>(
1354 reinterpret_cast<const unsigned char*
>(buffer)),
1356 reinterpret_cast<const sockaddr *
>(this->MulticastAddr()),
1357 sizeof(*(this->MulticastAddr()))) != totalSize)
1367 if (errno != EPERM && errno != ENOBUFS)
1369 std::cerr <<
"Exception sending a multicast message:"
1378 std::cerr <<
"Discovery::SendMulticast: Error serializing data."
1389 return this->sockets;
1394 private:
const sockaddr_in *MulticastAddr()
const
1396 return &this->mcastAddr;
1401 private: uint8_t Version()
const
1404 static int topicStats;
1406 if (
env(
"GZ_TRANSPORT_TOPIC_STATISTICS", gzStats) && !gzStats.
empty())
1408 topicStats = (gzStats ==
"1");
1411 return this->kWireVersion + (topicStats * 100);
1418 private:
bool RegisterNetIface(
const std::string &_ip)
1421 int sock =
static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1431 struct in_addr ifAddr;
1432 ifAddr.s_addr = inet_addr(_ip.
c_str());
1433 if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1434 reinterpret_cast<const char*
>(&ifAddr),
sizeof(ifAddr)) != 0)
1436 std::cerr <<
"Error setting socket option (IP_MULTICAST_IF)."
1441 this->sockets.push_back(sock);
1446 struct ip_mreq group;
1447 group.imr_multiaddr.s_addr =
1448 inet_addr(this->multicastGroup.c_str());
1449 group.imr_interface.s_addr = inet_addr(_ip.
c_str());
1450 if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1451 reinterpret_cast<const char*
>(&group),
sizeof(group)) != 0)
1453 std::cerr <<
"Error setting socket option (IP_ADD_MEMBERSHIP)."
1464 private:
static const unsigned int kDefActivityInterval = 100;
1469 private:
static const unsigned int kDefHeartbeatInterval = 1000;
1474 private:
static const unsigned int kDefSilenceInterval = 3000;
1480 private:
const int kTimeout = 250;
1483 private:
static const uint16_t kMaxRcvStr =
1488 private:
static const uint8_t kWireVersion = 10;
1505 private:
unsigned int silenceInterval;
1510 private:
unsigned int activityInterval;
1515 private:
unsigned int heartbeatInterval;
1518 private: DiscoveryCallback<Pub> connectionCb;
1521 private: DiscoveryCallback<Pub> disconnectionCb;
1524 private: DiscoveryCallback<Pub> registrationCb;
1527 private: DiscoveryCallback<Pub> unregistrationCb;
1533 private: TopicStorage<Pub> info;
1536 private: TopicStorage<Pub> remoteSubscribers;
1545 private:
bool verbose;
1551 private: sockaddr_in mcastAddr;
1575 private:
bool initialized;
1578 private:
unsigned int numHeartbeatsUninitialized;
1587 private:
bool enabled;