18 #ifndef IGNITION_TRANSPORT_DISCOVERY_HH_ 19 #define IGNITION_TRANSPORT_DISCOVERY_HH_ 33 #include <sys/types.h> 35 #include <sys/socket.h> 39 #include <arpa/inet.h> 43 #include <netinet/in.h> 49 #pragma warning(push, 0) 54 #pragma warning(disable: 4503) 56 #pragma warning(disable: 4996) 59 #include <ignition/msgs/discovery.pb.h> 62 #include <condition_variable> 71 #include <ignition/msgs/Utility.hh> 73 #include "ignition/transport/config.hh" 74 #include "ignition/transport/Export.hh" 86 inline namespace IGNITION_TRANSPORT_VERSION_NAMESPACE {
116 template<
typename Pub>
128 const bool _verbose =
false)
129 : multicastGroup(_ip),
133 silenceInterval(kDefSilenceInterval),
134 activityInterval(kDefActivityInterval),
135 heartbeatInterval(kDefHeartbeatInterval),
136 connectionCb(nullptr),
137 disconnectionCb(nullptr),
140 numHeartbeatsUninitialized(0),
145 if (
env(
"IGN_IP", ignIp) && !ignIp.
empty())
146 this->hostInterfaces = {ignIp};
154 WORD wVersionRequested;
158 wVersionRequested = MAKEWORD(2, 2);
160 if (WSAStartup(wVersionRequested, &wsaData) != 0)
166 for (
const auto &netIface : this->hostInterfaces)
168 auto succeed = this->RegisterNetIface(netIface);
173 if (netIface == this->hostAddr && !succeed)
175 this->RegisterNetIface(
"127.0.0.1");
176 std::cerr <<
"Did you set the environment variable IGN_IP with a " 178 <<
" [" << netIface <<
"] seems an invalid local IP " 180 <<
" Using 127.0.0.1 as hostname." <<
std::endl;
181 this->hostAddr =
"127.0.0.1";
190 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
191 reinterpret_cast<const char *
>(&reuseAddr),
sizeof(reuseAddr)) != 0)
193 std::cerr <<
"Error setting socket option (SO_REUSEADDR)." 205 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
206 reinterpret_cast<const char *
>(&reusePort),
sizeof(reusePort)) != 0)
208 std::cerr <<
"Error setting socket option (SO_REUSEPORT)." 214 sockaddr_in localAddr;
215 memset(&localAddr, 0,
sizeof(localAddr));
216 localAddr.sin_family = AF_INET;
217 localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
218 localAddr.sin_port = htons(static_cast<u_short>(this->port));
220 if (bind(this->sockets.at(0),
221 reinterpret_cast<sockaddr *
>(&localAddr),
sizeof(sockaddr_in)) < 0)
228 memset(&this->mcastAddr, 0,
sizeof(this->mcastAddr));
229 this->mcastAddr.sin_family = AF_INET;
230 this->mcastAddr.sin_addr.s_addr =
231 inet_addr(this->multicastGroup.c_str());
232 this->mcastAddr.sin_port = htons(static_cast<u_short>(this->port));
236 if (
env(
"IGN_RELAY", ignRelay) && !ignRelay.
empty())
242 for (
auto const &relayAddr : relays)
243 this->AddRelayAddress(relayAddr);
246 this->PrintCurrentState();
253 this->exitMutex.lock();
255 this->exitMutex.unlock();
258 if (this->threadReception.joinable())
259 this->threadReception.join();
267 for (
const auto &sock : this->sockets)
290 this->enabled =
true;
294 this->timeNextHeartbeat = now;
295 this->timeNextActivity = now;
298 this->threadReception =
std::thread(&Discovery::RecvMessages,
this);
314 if (!this->info.AddPublisher(_publisher))
349 cb = this->connectionCb;
353 pub.SetTopic(_topic);
354 pub.SetPUuid(this->pUuid);
361 found = this->info.Publishers(_topic, addresses);
367 for (
const auto &proc : addresses)
369 for (
const auto &node : proc.second)
417 return this->info.Publishers(_topic, _publishers);
438 if (!this->info.Publisher(_topic, this->pUuid, _nUuid, inf))
442 this->info.DelPublisherByNode(_topic, this->pUuid, _nUuid);
450 msgs::Discovery::UNADVERTISE, inf);
461 return this->hostAddr;
471 return this->activityInterval;
482 return this->heartbeatInterval;
492 return this->silenceInterval;
501 this->activityInterval = _ms;
510 this->heartbeatInterval = _ms;
519 this->silenceInterval = _ms;
529 this->connectionCb = _cb;
539 this->disconnectionCb = _cb;
548 this->registrationCb = _cb;
557 this->unregistrationCb = _cb;
571 std::cout <<
"\tActivity: " << this->activityInterval
573 std::cout <<
"\tHeartbeat: " << this->heartbeatInterval
575 std::cout <<
"\tSilence: " << this->silenceInterval
584 if (this->activity.empty())
588 for (
auto &proc : this->activity)
608 this->info.TopicList(_topics);
617 if (!this->initialized)
619 this->initializedCv.wait(lk, [
this]{
return this->initialized;});
626 private:
void UpdateActivity()
639 if (now < this->timeNextActivity)
642 disconnectCb = this->disconnectionCb;
644 for (
auto it = this->activity.cbegin(); it != this->activity.cend();)
647 auto elapsed = now - it->second;
650 if (std::chrono::duration_cast<std::chrono::milliseconds>
651 (elapsed).count() > this->silenceInterval)
654 this->info.DelPublishersByProc(it->first);
659 this->activity.
erase(it++);
675 for (
auto const &uuid : uuids)
678 publisher.SetPUuid(uuid);
679 disconnectCb(publisher);
684 private:
void UpdateHeartbeat()
691 if (now < this->timeNextHeartbeat)
703 this->info.PublishersByProc(this->pUuid, nodes);
706 for (
const auto &topic : nodes)
708 for (
const auto &node : topic.second)
711 msgs::Discovery::ADVERTISE, node);
717 if (!this->initialized)
719 ++this->numHeartbeatsUninitialized;
720 if (this->numHeartbeatsUninitialized == 2)
724 this->initialized =
true;
727 this->initializedCv.notify_all();
745 private:
int NextTimeout()
const 748 auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
749 auto timeUntilNextActivity = this->timeNextActivity - now;
751 int t =
static_cast<int>(
753 (
std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
754 int t2 =
std::min(t, this->kTimeout);
759 private:
void RecvMessages()
761 bool timeToExit =
false;
765 int timeout = this->NextTimeout();
769 this->RecvDiscoveryUpdate();
772 this->PrintCurrentState();
775 this->UpdateHeartbeat();
776 this->UpdateActivity();
788 private:
void RecvDiscoveryUpdate()
790 char rcvStr[Discovery::kMaxRcvStr];
791 sockaddr_in clntAddr;
792 socklen_t addrLen =
sizeof(clntAddr);
794 uint16_t received = recvfrom(this->sockets.at(0),
795 reinterpret_cast<raw_type *
>(rcvStr),
797 reinterpret_cast<sockaddr *>(&clntAddr),
798 reinterpret_cast<socklen_t *
>(&addrLen));
802 memcpy(&len, &rcvStr[0],
sizeof(len));
826 if (len +
sizeof(len) == received)
828 std::string srcAddr = inet_ntoa(clntAddr.sin_addr);
829 uint16_t srcPort = ntohs(clntAddr.sin_port);
833 std::cout <<
"\nReceived discovery update from " 834 << srcAddr <<
": " << srcPort <<
std::endl;
837 this->DispatchDiscoveryMsg(srcAddr, rcvStr +
sizeof(len), len);
840 else if (received < 0)
842 std::cerr <<
"Discovery::RecvDiscoveryUpdate() recvfrom error" 851 private:
void DispatchDiscoveryMsg(
const std::string &_fromIp,
852 char *_msg, uint16_t _len)
854 ignition::msgs::Discovery msg;
859 if (!msg.ParseFromArray(_msg, _len))
863 if (this->Version() != msg.version())
869 if (recvPUuid == this->pUuid)
881 if (msg.has_flags() && msg.flags().relay())
884 msg.mutable_flags()->set_relay(
false);
885 msg.mutable_flags()->set_no_relay(
true);
886 this->SendMulticast(msg);
890 this->AddRelayAddress(_fromIp);
898 else if (!msg.has_flags() || !msg.flags().no_relay())
900 msg.mutable_flags()->set_relay(
true);
901 this->SendUnicast(msg);
904 bool isSenderLocal = (
std::find(this->hostInterfaces.begin(),
905 this->hostInterfaces.end(), _fromIp) != this->hostInterfaces.end()) ||
906 (_fromIp.
find(
"127.") == 0);
916 connectCb = this->connectionCb;
917 disconnectCb = this->disconnectionCb;
918 registerCb = this->registrationCb;
919 unregisterCb = this->unregistrationCb;
924 case msgs::Discovery::ADVERTISE:
928 publisher.SetFromDiscovery(msg);
942 added = this->info.AddPublisher(publisher);
945 if (added && connectCb)
948 connectCb(publisher);
953 case msgs::Discovery::SUBSCRIBE:
959 recvTopic = msg.sub().topic();
963 std::cerr <<
"Subscription discovery message is missing " 964 <<
"Subscriber information.\n";
972 if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
977 if (!this->info.Publishers(recvTopic, addresses))
981 for (
const auto &nodeInfo : addresses[this->pUuid])
993 msgs::Discovery::ADVERTISE, nodeInfo);
998 case msgs::Discovery::NEW_CONNECTION:
1002 publisher.SetFromDiscovery(msg);
1005 registerCb(publisher);
1009 case msgs::Discovery::END_CONNECTION:
1013 publisher.SetFromDiscovery(msg);
1016 unregisterCb(publisher);
1020 case msgs::Discovery::HEARTBEAT:
1025 case msgs::Discovery::BYE:
1030 this->activity.erase(recvPUuid);
1036 pub.SetPUuid(recvPUuid);
1044 this->info.DelPublishersByProc(recvPUuid);
1049 case msgs::Discovery::UNADVERTISE:
1053 publisher.SetFromDiscovery(msg);
1066 disconnectCb(publisher);
1072 this->info.DelPublisherByNode(publisher.Topic(),
1073 publisher.PUuid(), publisher.NUuid());
1080 std::cerr <<
"Unknown message type [" << msg.type() <<
"].\n";
1092 private:
template<
typename T>
1094 const msgs::Discovery::Type _type,
1095 const T &_pub)
const 1097 ignition::msgs::Discovery discoveryMsg;
1098 discoveryMsg.set_version(this->Version());
1099 discoveryMsg.set_type(_type);
1100 discoveryMsg.set_process_uuid(this->pUuid);
1104 case msgs::Discovery::ADVERTISE:
1105 case msgs::Discovery::UNADVERTISE:
1106 case msgs::Discovery::NEW_CONNECTION:
1107 case msgs::Discovery::END_CONNECTION:
1109 _pub.FillDiscovery(discoveryMsg);
1112 case msgs::Discovery::SUBSCRIBE:
1114 discoveryMsg.mutable_sub()->set_topic(_pub.Topic());
1117 case msgs::Discovery::HEARTBEAT:
1118 case msgs::Discovery::BYE:
1121 std::cerr <<
"Discovery::SendMsg() error: Unrecognized message" 1122 <<
" type [" << _type <<
"]" <<
std::endl;
1129 this->SendMulticast(discoveryMsg);
1137 discoveryMsg.mutable_flags()->set_relay(
true);
1138 this->SendUnicast(discoveryMsg);
1143 std::cout <<
"\t* Sending " << msgs::ToString(_type)
1144 <<
" msg [" << _pub.Topic() <<
"]" <<
std::endl;
1150 private:
void SendUnicast(
const msgs::Discovery &_msg)
const 1154 #if GOOGLE_PROTOBUF_VERSION >= 3004000 1155 size_t msgSizeFull = _msg.ByteSizeLong();
1157 int msgSizeFull = _msg.ByteSize();
1159 if (msgSizeFull +
sizeof(msgSize) > this->kMaxRcvStr)
1161 std::cerr <<
"Discovery message too large to send. Discovery won't " 1162 <<
"work. This shouldn't happen.\n";
1165 msgSize = msgSizeFull;
1167 uint16_t totalSize =
sizeof(msgSize) + msgSize;
1168 char *buffer =
static_cast<char *
>(
new char[totalSize]);
1169 memcpy(&buffer[0], &msgSize,
sizeof(msgSize));
1171 if (_msg.SerializeToArray(buffer +
sizeof(msgSize), msgSize))
1174 for (
const auto &sockAddr : this->relayAddrs)
1177 auto sent = sendto(this->sockets.at(0),
1178 reinterpret_cast<const raw_type *
>(
1179 reinterpret_cast<const unsigned char*
>(buffer)),
1181 reinterpret_cast<const sockaddr *
>(&sockAddr),
1184 if (sent != totalSize)
1195 std::cerr <<
"Discovery::SendUnicast: Error serializing data." 1204 private:
void SendMulticast(
const msgs::Discovery &_msg)
const 1208 #if GOOGLE_PROTOBUF_VERSION >= 3004000 1209 size_t msgSizeFull = _msg.ByteSizeLong();
1211 int msgSizeFull = _msg.ByteSize();
1213 if (msgSizeFull +
sizeof(msgSize) > this->kMaxRcvStr)
1215 std::cerr <<
"Discovery message too large to send. Discovery won't " 1216 <<
"work. This shouldn't happen.\n";
1220 msgSize = msgSizeFull;
1221 uint16_t totalSize =
sizeof(msgSize) + msgSize;
1222 char *buffer =
static_cast<char *
>(
new char[totalSize]);
1223 memcpy(&buffer[0], &msgSize,
sizeof(msgSize));
1225 if (_msg.SerializeToArray(buffer +
sizeof(msgSize), msgSize))
1229 for (
const auto &sock : this->Sockets())
1232 if (sendto(sock, reinterpret_cast<const raw_type *>(
1233 reinterpret_cast<const unsigned char*>(buffer)),
1235 reinterpret_cast<const sockaddr *>(this->MulticastAddr()),
1236 sizeof(*(this->MulticastAddr()))) != totalSize)
1246 if (errno != EPERM && errno != ENOBUFS)
1248 std::cerr <<
"Exception sending a multicast message:" 1257 std::cerr <<
"Discovery::SendMulticast: Error serializing data." 1268 return this->sockets;
1273 private:
const sockaddr_in *MulticastAddr()
const 1275 return &this->mcastAddr;
1280 private: uint8_t Version()
const 1283 static int topicStats =
1284 (
env(
"IGN_TRANSPORT_TOPIC_STATISTICS", ignStats) && ignStats ==
"1");
1285 return this->kWireVersion + (topicStats * 100);
1292 private:
bool RegisterNetIface(
const std::string &_ip)
1295 int sock =
static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1305 struct in_addr ifAddr;
1306 ifAddr.s_addr = inet_addr(_ip.
c_str());
1307 if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1308 reinterpret_cast<const char*>(&ifAddr),
sizeof(ifAddr)) != 0)
1310 std::cerr <<
"Error setting socket option (IP_MULTICAST_IF)." 1315 this->sockets.push_back(sock);
1320 struct ip_mreq group;
1321 group.imr_multiaddr.s_addr =
1322 inet_addr(this->multicastGroup.c_str());
1323 group.imr_interface.s_addr = inet_addr(_ip.
c_str());
1324 if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1325 reinterpret_cast<const char*
>(&group),
sizeof(group)) != 0)
1327 std::cerr <<
"Error setting socket option (IP_ADD_MEMBERSHIP)." 1337 private:
void AddRelayAddress(
const std::string &_ip)
1340 for (
auto const &addr : this->relayAddrs)
1342 if (addr.sin_addr.s_addr == inet_addr(_ip.
c_str()))
1347 memset(&addr, 0,
sizeof(addr));
1348 addr.sin_family = AF_INET;
1349 addr.sin_addr.s_addr = inet_addr(_ip.
c_str());
1350 addr.sin_port = htons(static_cast<u_short>(this->port));
1352 this->relayAddrs.push_back(addr);
1358 private:
static const unsigned int kDefActivityInterval = 100;
1363 private:
static const unsigned int kDefHeartbeatInterval = 1000;
1368 private:
static const unsigned int kDefSilenceInterval = 3000;
1374 private:
const int kTimeout = 250;
1377 private:
static const uint16_t kMaxRcvStr =
1382 private:
static const uint8_t kWireVersion = 10;
1399 private:
unsigned int silenceInterval;
1404 private:
unsigned int activityInterval;
1409 private:
unsigned int heartbeatInterval;
1435 private:
bool verbose;
1441 private: sockaddr_in mcastAddr;
1465 private:
bool initialized;
1468 private:
unsigned int numHeartbeatsUninitialized;
1477 private:
bool enabled;
Topic/service available to any subscriber (default scope).
bool Unadvertise(const std::string &_topic, const std::string &_nUuid)
Unadvertise a new message. Broadcast a discovery message that will cancel all the discovery informati...
Definition: Discovery.hh:427
std::map< std::string, Timestamp > activity
Activity information. Every time there is a message from a remote node, its activity information is u...
Definition: Discovery.hh:1432
void Unregister(const MessagePublisher &_pub) const
Unregister a node from this process as a remote subscriber.
Definition: Discovery.hh:395
This class stores all the information about a publisher. It stores the topic name that publishes...
Definition: Publisher.hh:44
std::vector< std::string > determineInterfaces()
Determine the list of network interfaces for this machine. Reference: https://github.com/ros/ros_comm/blob/hydro-devel/clients/ roscpp/src/libros/network.cpp.
Send data via multicast only.
Topic/service only available to subscribers in the same machine as the publisher. ...
void Start()
Start the discovery service. You probably want to register the callbacks for receiving discovery noti...
Definition: Discovery.hh:281
void WaitForInit() const
Check if ready/initialized. If not, then wait on the initializedCv condition variable.
Definition: Discovery.hh:613
unsigned int HeartbeatInterval() const
Each node broadcasts periodic heartbeats to keep its topic information alive in other nodes...
Definition: Discovery.hh:479
void raw_type
Definition: Discovery.hh:45
T duration_cast(T... args)
A discovery class that implements a distributed topic discovery protocol. It uses UDP multicast for s...
Definition: Discovery.hh:117
void TopicList(std::vector< std::string > &_topics) const
Get the list of topics currently advertised in the network.
Definition: Discovery.hh:604
void SetSilenceInterval(const unsigned int _ms)
Set the maximum silence interval.
Definition: Discovery.hh:516
void UnregistrationsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive an event when a remote node unsubscribes to a topic within this proces...
Definition: Discovery.hh:554
void Register(const MessagePublisher &_pub) const
Register a node from this process as a remote subscriber.
Definition: Discovery.hh:387
std::string determineHost()
Determine IP or hostname. Reference: https://github.com/ros/ros_comm/blob/hydro-devel/clients/ roscpp...
virtual ~Discovery()
Destructor.
Definition: Discovery.hh:250
A class for customizing the publication options for a topic or service advertised. E.g.: Set the scope of a topic/service.
Definition: AdvertiseOptions.hh:59
const TopicStorage< Pub > & Info() const
Get the discovery information.
Definition: Discovery.hh:403
bool Publishers(const std::string &_topic, Addresses_M< Pub > &_publishers) const
Get all the publishers' information known for a given topic.
Definition: Discovery.hh:413
std::string HostAddr() const
Get the IP address of this host.
Definition: Discovery.hh:458
unsigned int ActivityInterval() const
The discovery checks the validity of the topic information every 'activity interval' milliseconds...
Definition: Discovery.hh:468
std::vector< std::string > split(const std::string &_orig, char _delim)
split at a one character delimiter to get a vector of something
unsigned int SilenceInterval() const
Get the maximum time allowed without receiving any discovery information from a node before canceling...
Definition: Discovery.hh:489
void PrintCurrentState() const
Print the current discovery state.
Definition: Discovery.hh:561
Discovery(const std::string &_pUuid, const std::string &_ip, const int _port, const bool _verbose=false)
Constructor.
Definition: Discovery.hh:125
This class stores all the information about a message publisher.
Definition: Publisher.hh:190
void DisconnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery disconnection events. Each time a topic is no longer active...
Definition: Discovery.hh:536
void ConnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery connection events. Each time a new topic is connected...
Definition: Discovery.hh:526
bool Advertise(const Pub &_publisher)
Advertise a new message.
Definition: Discovery.hh:305
Topic/service only available to subscribers in the same process as the publisher. ...
bool pollSockets(const std::vector< int > &_sockets, const int _timeout)
std::chrono::steady_clock::time_point Timestamp
Definition: TransportTypes.hh:155
DestinationType
Options for sending discovery messages.
Definition: Discovery.hh:88
void SetActivityInterval(const unsigned int _ms)
Set the activity interval.
Definition: Discovery.hh:498
void RegistrationsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive an event when a new remote node subscribes to a topic within this proc...
Definition: Discovery.hh:545
Send data via unicast only.
Definition: AdvertiseOptions.hh:28
void SetHeartbeatInterval(const unsigned int _ms)
Set the heartbeat interval.
Definition: Discovery.hh:507
bool env(const std::string &_name, std::string &_value)
Find the environment variable '_name' and return its value.
bool Discover(const std::string &_topic) const
Request discovery information about a topic. When using this method, the user might want to use SetCo...
Definition: Discovery.hh:337