Gazebo Transport

API Reference

8.5.0
gz/transport/Discovery.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2014 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef GZ_TRANSPORT_DISCOVERY_HH_
19 #define GZ_TRANSPORT_DISCOVERY_HH_
20 #include <errno.h>
21 #include <string.h>
22 
23 #ifdef _WIN32
24  // For socket(), connect(), send(), and recv().
25  #include <Winsock2.h>
26  #include <Ws2def.h>
27  #include <Ws2ipdef.h>
28  #include <Ws2tcpip.h>
29  // Type used for raw data on this platform.
30  using raw_type = char;
31 #else
32  // For data types
33  #include <sys/types.h>
34  // For socket(), connect(), send(), and recv()
35  #include <sys/socket.h>
36  // For gethostbyname()
37  #include <netdb.h>
38  // For inet_addr()
39  #include <arpa/inet.h>
40  // For close()
41  #include <unistd.h>
42  // For sockaddr_in
43  #include <netinet/in.h>
44  // Type used for raw data on this platform
45  using raw_type = void;
46 #endif
47 
48 #ifdef _WIN32
49  #pragma warning(push, 0)
50 #endif
51 #ifdef _WIN32
52  #pragma warning(pop)
53  // Suppress "decorated name length exceed" warning in STL.
54  #pragma warning(disable: 4503)
55  // Suppress "depreted API warnings" in WINSOCK.
56  #pragma warning(disable: 4996)
57 #endif
58 
59 #include <gz/msgs/discovery.pb.h>
60 
61 #include <algorithm>
62 #include <condition_variable>
63 #include <limits>
64 #include <map>
65 #include <memory>
66 #include <mutex>
67 #include <string>
68 #include <thread>
69 #include <vector>
70 
71 #include <gz/msgs/Utility.hh>
72 
73 #include "gz/transport/config.hh"
74 #include "gz/transport/Export.hh"
75 #include "gz/transport/Helpers.hh"
76 #include "gz/transport/NetUtils.hh"
80 
81 namespace ignition
82 {
83  namespace transport
84  {
85  // Inline bracket to help doxygen filtering.
86  inline namespace IGNITION_TRANSPORT_VERSION_NAMESPACE {
88  enum class DestinationType
89  {
91  UNICAST,
93  MULTICAST,
95  ALL
96  };
97 
98  //
104  bool IGNITION_TRANSPORT_VISIBLE pollSockets(
105  const std::vector<int> &_sockets,
106  const int _timeout);
107 
116  template<typename Pub>
117  class Discovery
118  {
124  public: Discovery(const std::string &_pUuid,
125  const int _port,
126  const bool _verbose = false)
127  : port(_port),
128  hostAddr(determineHost()),
129  pUuid(_pUuid),
130  silenceInterval(kDefSilenceInterval),
131  activityInterval(kDefActivityInterval),
132  heartbeatInterval(kDefHeartbeatInterval),
133  connectionCb(nullptr),
134  disconnectionCb(nullptr),
135  verbose(_verbose),
136  initialized(false),
137  numHeartbeatsUninitialized(0),
138  exit(false),
139  enabled(false)
140  {
141  std::string ignIp;
142  if (env("IGN_IP", ignIp) && !ignIp.empty())
143  this->hostInterfaces = {ignIp};
144  else
145  {
146  // Get the list of network interfaces in this host.
147  this->hostInterfaces = determineInterfaces();
148  }
149 
150 #ifdef _WIN32
151  WORD wVersionRequested;
152  WSADATA wsaData;
153 
154  // Request WinSock v2.2.
155  wVersionRequested = MAKEWORD(2, 2);
156  // Load WinSock DLL.
157  if (WSAStartup(wVersionRequested, &wsaData) != 0)
158  {
159  std::cerr << "Unable to load WinSock DLL" << std::endl;
160  return;
161  }
162 #endif
163  for (const auto &netIface : this->hostInterfaces)
164  {
165  auto succeed = this->RegisterNetIface(netIface);
166 
167  // If the IP address that we're selecting as the main IP address of
168  // the host is invalid, we change it to 127.0.0.1 .
169  // This is probably because IGN_IP is set to a wrong value.
170  if (netIface == this->hostAddr && !succeed)
171  {
172  this->RegisterNetIface("127.0.0.1");
173  std::cerr << "Did you set the environment variable IGN_IP with a "
174  << "correct IP address? " << std::endl
175  << " [" << netIface << "] seems an invalid local IP "
176  << "address." << std::endl
177  << " Using 127.0.0.1 as hostname." << std::endl;
178  this->hostAddr = "127.0.0.1";
179  }
180  }
181 
182  // Socket option: SO_REUSEADDR. This options is used only for receiving
183  // data. We can reuse the same socket for receiving multicast data from
184  // multiple interfaces. We will use the socket at position 0 for
185  // receiving data.
186  int reuseAddr = 1;
187  if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
188  reinterpret_cast<const char *>(&reuseAddr), sizeof(reuseAddr)) != 0)
189  {
190  std::cerr << "Error setting socket option (SO_REUSEADDR)."
191  << std::endl;
192  return;
193  }
194 
195 #ifdef SO_REUSEPORT
196  // Socket option: SO_REUSEPORT. This options is used only for receiving
197  // data. We can reuse the same socket for receiving multicast data from
198  // multiple interfaces. We will use the socket at position 0 for
199  // receiving data.
200  int reusePort = 1;
201  // cppcheck-suppress ConfigurationNotChecked
202  if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
203  reinterpret_cast<const char *>(&reusePort), sizeof(reusePort)) != 0)
204  {
205  std::cerr << "Error setting socket option (SO_REUSEPORT)."
206  << std::endl;
207  return;
208  }
209 #endif
210  // Bind the first socket to the discovery port.
211  sockaddr_in localAddr;
212  memset(&localAddr, 0, sizeof(localAddr));
213  localAddr.sin_family = AF_INET;
214  localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
215  localAddr.sin_port = htons(static_cast<u_short>(this->port));
216 
217  if (bind(this->sockets.at(0),
218  reinterpret_cast<sockaddr *>(&localAddr), sizeof(sockaddr_in)) < 0)
219  {
220  std::cerr << "Binding to a local port failed." << std::endl;
221  return;
222  }
223 
224  // Set 'mcastAddr' to the multicast discovery group.
225  memset(&this->mcastAddr, 0, sizeof(this->mcastAddr));
226  this->mcastAddr.sin_family = AF_INET;
227  this->mcastAddr.sin_addr.s_addr =
228  inet_addr(this->kMulticastGroup.c_str());
229  this->mcastAddr.sin_port = htons(static_cast<u_short>(this->port));
230 
232  std::string ignRelay = "";
233  if (env("IGN_RELAY", ignRelay) && !ignRelay.empty())
234  {
235  relays = transport::split(ignRelay, ':');
236  }
237 
238  // Register all unicast relays.
239  for (auto const &relayAddr : relays)
240  this->AddRelayAddress(relayAddr);
241 
242  if (this->verbose)
243  this->PrintCurrentState();
244  }
245 
247  public: virtual ~Discovery()
248  {
249  // Tell the service thread to terminate.
250  this->exitMutex.lock();
251  this->exit = true;
252  this->exitMutex.unlock();
253 
254  // Wait for the service threads to finish before exit.
255  if (this->threadReception.joinable())
256  this->threadReception.join();
257 
258  // Broadcast a BYE message to trigger the remote cancellation of
259  // all our advertised topics.
260  this->SendMsg(DestinationType::ALL, msgs::Discovery::BYE,
261  Publisher("", "", this->pUuid, "", AdvertiseOptions()));
262 
263  // Close sockets.
264  for (const auto &sock : this->sockets)
265  {
266 #ifdef _WIN32
267  closesocket(sock);
268  WSACleanup();
269 #else
270  close(sock);
271 #endif
272  }
273  }
274 
278  public: void Start()
279  {
280  {
281  std::lock_guard<std::mutex> lock(this->mutex);
282 
283  // The service is already running.
284  if (this->enabled)
285  return;
286 
287  this->enabled = true;
288  }
289 
290  auto now = std::chrono::steady_clock::now();
291  this->timeNextHeartbeat = now;
292  this->timeNextActivity = now;
293 
294  // Start the thread that receives discovery information.
295  this->threadReception = std::thread(&Discovery::RecvMessages, this);
296  }
297 
302  public: bool Advertise(const Pub &_publisher)
303  {
304  {
305  std::lock_guard<std::mutex> lock(this->mutex);
306 
307  if (!this->enabled)
308  return false;
309 
310  // Add the addressing information (local publisher).
311  if (!this->info.AddPublisher(_publisher))
312  return false;
313  }
314 
315  // Only advertise a message outside this process if the scope
316  // is not 'Process'
317  if (_publisher.Options().Scope() != Scope_t::PROCESS)
318  this->SendMsg(DestinationType::ALL, msgs::Discovery::ADVERTISE,
319  _publisher);
320 
321  return true;
322  }
323 
334  public: bool Discover(const std::string &_topic) const
335  {
337  bool found;
338  Addresses_M<Pub> addresses;
339 
340  {
341  std::lock_guard<std::mutex> lock(this->mutex);
342 
343  if (!this->enabled)
344  return false;
345 
346  cb = this->connectionCb;
347  }
348 
349  Pub pub;
350  pub.SetTopic(_topic);
351  pub.SetPUuid(this->pUuid);
352 
353  // Send a discovery request.
354  this->SendMsg(DestinationType::ALL, msgs::Discovery::SUBSCRIBE, pub);
355 
356  {
357  std::lock_guard<std::mutex> lock(this->mutex);
358  found = this->info.Publishers(_topic, addresses);
359  }
360 
361  if (found)
362  {
363  // I already have information about this topic.
364  for (const auto &proc : addresses)
365  {
366  for (const auto &node : proc.second)
367  {
368  if (cb)
369  {
370  // Execute the user's callback for a service request. Notice
371  // that we only execute one callback for preventing receive
372  // multiple service responses for a single request.
373  cb(node);
374  }
375  }
376  }
377  }
378 
379  return true;
380  }
381 
384  public: void Register(const MessagePublisher &_pub) const
385  {
386  this->SendMsg(
387  DestinationType::ALL, msgs::Discovery::NEW_CONNECTION, _pub);
388  }
389 
392  public: void Unregister(const MessagePublisher &_pub) const
393  {
394  this->SendMsg(
395  DestinationType::ALL, msgs::Discovery::END_CONNECTION, _pub);
396  }
397 
400  public: const TopicStorage<Pub> &Info() const
401  {
402  std::lock_guard<std::mutex> lock(this->mutex);
403  return this->info;
404  }
405 
410  public: bool Publishers(const std::string &_topic,
411  Addresses_M<Pub> &_publishers) const
412  {
413  std::lock_guard<std::mutex> lock(this->mutex);
414  return this->info.Publishers(_topic, _publishers);
415  }
416 
424  public: bool Unadvertise(const std::string &_topic,
425  const std::string &_nUuid)
426  {
427  Pub inf;
428  {
429  std::lock_guard<std::mutex> lock(this->mutex);
430 
431  if (!this->enabled)
432  return false;
433 
434  // Don't do anything if the topic is not advertised by any of my nodes
435  if (!this->info.Publisher(_topic, this->pUuid, _nUuid, inf))
436  return true;
437 
438  // Remove the topic information.
439  this->info.DelPublisherByNode(_topic, this->pUuid, _nUuid);
440  }
441 
442  // Only unadvertise a message outside this process if the scope
443  // is not 'Process'.
444  if (inf.Options().Scope() != Scope_t::PROCESS)
445  {
446  this->SendMsg(DestinationType::ALL,
447  msgs::Discovery::UNADVERTISE, inf);
448  }
449 
450  return true;
451  }
452 
455  public: std::string HostAddr() const
456  {
457  std::lock_guard<std::mutex> lock(this->mutex);
458  return this->hostAddr;
459  }
460 
465  public: unsigned int ActivityInterval() const
466  {
467  std::lock_guard<std::mutex> lock(this->mutex);
468  return this->activityInterval;
469  }
470 
476  public: unsigned int HeartbeatInterval() const
477  {
478  std::lock_guard<std::mutex> lock(this->mutex);
479  return this->heartbeatInterval;
480  }
481 
486  public: unsigned int SilenceInterval() const
487  {
488  std::lock_guard<std::mutex> lock(this->mutex);
489  return this->silenceInterval;
490  }
491 
495  public: void SetActivityInterval(const unsigned int _ms)
496  {
497  std::lock_guard<std::mutex> lock(this->mutex);
498  this->activityInterval = _ms;
499  }
500 
504  public: void SetHeartbeatInterval(const unsigned int _ms)
505  {
506  std::lock_guard<std::mutex> lock(this->mutex);
507  this->heartbeatInterval = _ms;
508  }
509 
513  public: void SetSilenceInterval(const unsigned int _ms)
514  {
515  std::lock_guard<std::mutex> lock(this->mutex);
516  this->silenceInterval = _ms;
517  }
518 
523  public: void ConnectionsCb(const DiscoveryCallback<Pub> &_cb)
524  {
525  std::lock_guard<std::mutex> lock(this->mutex);
526  this->connectionCb = _cb;
527  }
528 
533  public: void DisconnectionsCb(const DiscoveryCallback<Pub> &_cb)
534  {
535  std::lock_guard<std::mutex> lock(this->mutex);
536  this->disconnectionCb = _cb;
537  }
538 
542  public: void RegistrationsCb(const DiscoveryCallback<Pub> &_cb)
543  {
544  std::lock_guard<std::mutex> lock(this->mutex);
545  this->registrationCb = _cb;
546  }
547 
551  public: void UnregistrationsCb(const DiscoveryCallback<Pub> &_cb)
552  {
553  std::lock_guard<std::mutex> lock(this->mutex);
554  this->unregistrationCb = _cb;
555  }
556 
558  public: void PrintCurrentState() const
559  {
560  std::lock_guard<std::mutex> lock(this->mutex);
561 
562  std::cout << "---------------" << std::endl;
563  std::cout << std::boolalpha << "Enabled: "
564  << this->enabled << std::endl;
565  std::cout << "Discovery state" << std::endl;
566  std::cout << "\tUUID: " << this->pUuid << std::endl;
567  std::cout << "Settings" << std::endl;
568  std::cout << "\tActivity: " << this->activityInterval
569  << " ms." << std::endl;
570  std::cout << "\tHeartbeat: " << this->heartbeatInterval
571  << "ms." << std::endl;
572  std::cout << "\tSilence: " << this->silenceInterval
573  << " ms." << std::endl;
574  std::cout << "Known information:" << std::endl;
575  this->info.Print();
576 
577  // Used to calculate the elapsed time.
579 
580  std::cout << "Activity" << std::endl;
581  if (this->activity.empty())
582  std::cout << "\t<empty>" << std::endl;
583  else
584  {
585  for (auto &proc : this->activity)
586  {
587  // Elapsed time since the last update from this publisher.
588  std::chrono::duration<double> elapsed = now - proc.second;
589 
590  std::cout << "\t" << proc.first << std::endl;
591  std::cout << "\t\t" << "Since: " << std::chrono::duration_cast<
592  std::chrono::milliseconds>(elapsed).count() << " ms. ago. "
593  << std::endl;
594  }
595  }
596  std::cout << "---------------" << std::endl;
597  }
598 
601  public: void TopicList(std::vector<std::string> &_topics) const
602  {
603  this->WaitForInit();
604  std::lock_guard<std::mutex> lock(this->mutex);
605  this->info.TopicList(_topics);
606  }
607 
610  public: void WaitForInit() const
611  {
612  std::unique_lock<std::mutex> lk(this->mutex);
613 
614  if (!this->initialized)
615  {
616  this->initializedCv.wait(lk, [this]{return this->initialized;});
617  }
618  }
619 
623  private: void UpdateActivity()
624  {
625  // The UUIDs of the processes that have expired.
627 
628  // A copy of the disconnection callback.
629  DiscoveryCallback<Pub> disconnectCb;
630 
632 
633  {
634  std::lock_guard<std::mutex> lock(this->mutex);
635 
636  if (now < this->timeNextActivity)
637  return;
638 
639  disconnectCb = this->disconnectionCb;
640 
641  for (auto it = this->activity.cbegin(); it != this->activity.cend();)
642  {
643  // Elapsed time since the last update from this publisher.
644  auto elapsed = now - it->second;
645 
646  // This publisher has expired.
647  if (std::chrono::duration_cast<std::chrono::milliseconds>
648  (elapsed).count() > this->silenceInterval)
649  {
650  // Remove all the info entries for this process UUID.
651  this->info.DelPublishersByProc(it->first);
652 
653  uuids.push_back(it->first);
654 
655  // Remove the activity entry.
656  this->activity.erase(it++);
657  }
658  else
659  ++it;
660  }
661 
662  this->timeNextActivity = std::chrono::steady_clock::now() +
663  std::chrono::milliseconds(this->activityInterval);
664  }
665 
666  if (!disconnectCb)
667  return;
668 
669  // Notify without topic information. This is useful to inform the
670  // client that a remote node is gone, even if we were not
671  // interested in its topics.
672  for (auto const &uuid : uuids)
673  {
674  Pub publisher;
675  publisher.SetPUuid(uuid);
676  disconnectCb(publisher);
677  }
678  }
679 
681  private: void UpdateHeartbeat()
682  {
684 
685  {
686  std::lock_guard<std::mutex> lock(this->mutex);
687 
688  if (now < this->timeNextHeartbeat)
689  return;
690  }
691 
692  Publisher pub("", "", this->pUuid, "", AdvertiseOptions());
693  this->SendMsg(DestinationType::ALL, msgs::Discovery::HEARTBEAT, pub);
694 
696  {
697  std::lock_guard<std::mutex> lock(this->mutex);
698 
699  // Re-advertise topics that are advertised inside this process.
700  this->info.PublishersByProc(this->pUuid, nodes);
701  }
702 
703  for (const auto &topic : nodes)
704  {
705  for (const auto &node : topic.second)
706  {
707  this->SendMsg(DestinationType::ALL,
708  msgs::Discovery::ADVERTISE, node);
709  }
710  }
711 
712  {
713  std::lock_guard<std::mutex> lock(this->mutex);
714  if (!this->initialized)
715  {
716  if (this->numHeartbeatsUninitialized == 2u)
717  {
718  // We consider discovery initialized after two heartbeat cycles.
719  this->initialized = true;
720 
721  // Notify anyone waiting for the initialization phase to finish.
722  this->initializedCv.notify_all();
723  }
724  ++this->numHeartbeatsUninitialized;
725  }
726 
727  this->timeNextHeartbeat = std::chrono::steady_clock::now() +
728  std::chrono::milliseconds(this->heartbeatInterval);
729  }
730  }
731 
741  private: int NextTimeout() const
742  {
743  auto now = std::chrono::steady_clock::now();
744  auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
745  auto timeUntilNextActivity = this->timeNextActivity - now;
746 
747  int t = static_cast<int>(
748  std::chrono::duration_cast<std::chrono::milliseconds>
749  (std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
750  int t2 = std::min(t, this->kTimeout);
751  return std::max(t2, 0);
752  }
753 
755  private: void RecvMessages()
756  {
757  bool timeToExit = false;
758  while (!timeToExit)
759  {
760  // Calculate the timeout.
761  int timeout = this->NextTimeout();
762 
763  if (pollSockets(this->sockets, timeout))
764  {
765  this->RecvDiscoveryUpdate();
766 
767  if (this->verbose)
768  this->PrintCurrentState();
769  }
770 
771  this->UpdateHeartbeat();
772  this->UpdateActivity();
773 
774  // Is it time to exit?
775  {
776  std::lock_guard<std::mutex> lock(this->exitMutex);
777  if (this->exit)
778  timeToExit = true;
779  }
780  }
781  }
782 
784  private: void RecvDiscoveryUpdate()
785  {
786  char rcvStr[Discovery::kMaxRcvStr];
787  sockaddr_in clntAddr;
788  socklen_t addrLen = sizeof(clntAddr);
789 
790  int64_t received = recvfrom(this->sockets.at(0),
791  reinterpret_cast<raw_type *>(rcvStr),
792  this->kMaxRcvStr, 0,
793  reinterpret_cast<sockaddr *>(&clntAddr),
794  reinterpret_cast<socklen_t *>(&addrLen));
795  if (received > 0)
796  {
797  uint16_t len = 0;
798  memcpy(&len, &rcvStr[0], sizeof(len));
799 
800  // Ignition Transport delimits each discovery message with a
801  // frame_delimiter that contains byte size information.
802  // A discovery message has the form:
803  //
804  // <frame_delimiter><frame_body>
805  //
806  // Ignition Transport version < 8 sends a frame delimiter that
807  // contains the value of sizeof(frame_delimiter)
808  // + sizeof(frame_body). In other words, the frame_delimiter
809  // contains a value that represents the total size of the
810  // frame_body and frame_delimiter in bytes.
811  //
812  // Ignition Transport version >= 8 sends a frame_delimiter
813  // that contains the value of sizeof(frame_body). In other
814  // words, the frame_delimiter contains a value that represents
815  // the total size of only the frame_body.
816  //
817  // It is possible that two incompatible versions of Ignition
818  // Transport exist on the same network. If we receive an
819  // unexpected size, then we ignore the message.
820 
821  // If-condition for version 8+
822  if (len + sizeof(len) == static_cast<uint16_t>(received))
823  {
824  std::string srcAddr = inet_ntoa(clntAddr.sin_addr);
825  uint16_t srcPort = ntohs(clntAddr.sin_port);
826 
827  if (this->verbose)
828  {
829  std::cout << "\nReceived discovery update from "
830  << srcAddr << ": " << srcPort << std::endl;
831  }
832 
833  this->DispatchDiscoveryMsg(srcAddr, rcvStr + sizeof(len), len);
834  }
835  }
836  else if (received < 0)
837  {
838  std::cerr << "Discovery::RecvDiscoveryUpdate() recvfrom error"
839  << std::endl;
840  }
841  }
842 
847  private: void DispatchDiscoveryMsg(const std::string &_fromIp,
848  char *_msg, uint16_t _len)
849  {
850  gz::msgs::Discovery msg;
851 
852  // Parse the message, and return if parsing failed. Parsing could
853  // fail when another discovery node is publishing messages using an
854  // older (or newer) format.
855  if (!msg.ParseFromArray(_msg, _len))
856  return;
857 
858  // Discard the message if the wire protocol is different than mine.
859  if (this->Version() != msg.version())
860  return;
861 
862  std::string recvPUuid = msg.process_uuid();
863 
864  // Discard our own discovery messages.
865  if (recvPUuid == this->pUuid)
866  return;
867 
868  // Forwarding summary:
869  // - From a unicast peer -> to multicast group (with NO_RELAY flag).
870  // - From multicast group -> to unicast peers (with RELAY flag).
871 
872  // If the RELAY flag is set, this discovery message is coming via a
873  // unicast transmission. In this case, we don't process it, we just
874  // forward it to the multicast group, and it will be dispatched once
875  // received there. Note that we also unset the RELAY flag and set the
876  // NO_RELAY flag, to avoid forwarding the message anymore.
877  if (msg.has_flags() && msg.flags().relay())
878  {
879  // Unset the RELAY flag in the header and set the NO_RELAY.
880  msg.mutable_flags()->set_relay(false);
881  msg.mutable_flags()->set_no_relay(true);
882  this->SendMulticast(msg);
883 
884  // A unicast peer contacted me. I need to save its address for
885  // sending future messages in the future.
886  this->AddRelayAddress(_fromIp);
887  return;
888  }
889  // If we are receiving this discovery message via the multicast channel
890  // and the NO_RELAY flag is not set, we forward this message via unicast
891  // to all our relays. Note that this is the most common case, where we
892  // receive a regular multicast message and we forward it to any remote
893  // relays.
894  else if (!msg.has_flags() || !msg.flags().no_relay())
895  {
896  msg.mutable_flags()->set_relay(true);
897  this->SendUnicast(msg);
898  }
899 
900  // Update timestamp and cache the callbacks.
901  DiscoveryCallback<Pub> connectCb;
902  DiscoveryCallback<Pub> disconnectCb;
903  DiscoveryCallback<Pub> registerCb;
904  DiscoveryCallback<Pub> unregisterCb;
905  {
906  std::lock_guard<std::mutex> lock(this->mutex);
907  this->activity[recvPUuid] = std::chrono::steady_clock::now();
908  connectCb = this->connectionCb;
909  disconnectCb = this->disconnectionCb;
910  registerCb = this->registrationCb;
911  unregisterCb = this->unregistrationCb;
912  }
913 
914  switch (msg.type())
915  {
916  case msgs::Discovery::ADVERTISE:
917  {
918  // Read the rest of the fields.
919  Pub publisher;
920  publisher.SetFromDiscovery(msg);
921 
922  // Check scope of the topic.
923  if ((publisher.Options().Scope() == Scope_t::PROCESS) ||
924  (publisher.Options().Scope() == Scope_t::HOST &&
925  _fromIp != this->hostAddr))
926  {
927  return;
928  }
929 
930  // Register an advertised address for the topic.
931  bool added;
932  {
933  std::lock_guard<std::mutex> lock(this->mutex);
934  added = this->info.AddPublisher(publisher);
935  }
936 
937  if (added && connectCb)
938  {
939  // Execute the client's callback.
940  connectCb(publisher);
941  }
942 
943  break;
944  }
945  case msgs::Discovery::SUBSCRIBE:
946  {
947  std::string recvTopic;
948  // Read the topic information.
949  if (msg.has_sub())
950  {
951  recvTopic = msg.sub().topic();
952  }
953  else
954  {
955  std::cerr << "Subscription discovery message is missing "
956  << "Subscriber information.\n";
957  break;
958  }
959 
960  // Check if at least one of my nodes advertises the topic requested.
961  Addresses_M<Pub> addresses;
962  {
963  std::lock_guard<std::mutex> lock(this->mutex);
964  if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
965  {
966  break;
967  }
968 
969  if (!this->info.Publishers(recvTopic, addresses))
970  break;
971  }
972 
973  for (const auto &nodeInfo : addresses[this->pUuid])
974  {
975  // Check scope of the topic.
976  if ((nodeInfo.Options().Scope() == Scope_t::PROCESS) ||
977  (nodeInfo.Options().Scope() == Scope_t::HOST &&
978  _fromIp != this->hostAddr))
979  {
980  continue;
981  }
982 
983  // Answer an ADVERTISE message.
984  this->SendMsg(DestinationType::ALL,
985  msgs::Discovery::ADVERTISE, nodeInfo);
986  }
987 
988  break;
989  }
990  case msgs::Discovery::NEW_CONNECTION:
991  {
992  // Read the rest of the fields.
993  Pub publisher;
994  publisher.SetFromDiscovery(msg);
995 
996  if (registerCb)
997  registerCb(publisher);
998 
999  break;
1000  }
1001  case msgs::Discovery::END_CONNECTION:
1002  {
1003  // Read the rest of the fields.
1004  Pub publisher;
1005  publisher.SetFromDiscovery(msg);
1006 
1007  if (unregisterCb)
1008  unregisterCb(publisher);
1009 
1010  break;
1011  }
1012  case msgs::Discovery::HEARTBEAT:
1013  {
1014  // The timestamp has already been updated.
1015  break;
1016  }
1017  case msgs::Discovery::BYE:
1018  {
1019  // Remove the activity entry for this publisher.
1020  {
1021  std::lock_guard<std::mutex> lock(this->mutex);
1022  this->activity.erase(recvPUuid);
1023  }
1024 
1025  if (disconnectCb)
1026  {
1027  Pub pub;
1028  pub.SetPUuid(recvPUuid);
1029  // Notify the new disconnection.
1030  disconnectCb(pub);
1031  }
1032 
1033  // Remove the address entry for this topic.
1034  {
1035  std::lock_guard<std::mutex> lock(this->mutex);
1036  this->info.DelPublishersByProc(recvPUuid);
1037  }
1038 
1039  break;
1040  }
1041  case msgs::Discovery::UNADVERTISE:
1042  {
1043  // Read the address.
1044  Pub publisher;
1045  publisher.SetFromDiscovery(msg);
1046 
1047  // Check scope of the topic.
1048  if ((publisher.Options().Scope() == Scope_t::PROCESS) ||
1049  (publisher.Options().Scope() == Scope_t::HOST &&
1050  _fromIp != this->hostAddr))
1051  {
1052  return;
1053  }
1054 
1055  if (disconnectCb)
1056  {
1057  // Notify the new disconnection.
1058  disconnectCb(publisher);
1059  }
1060 
1061  // Remove the address entry for this topic.
1062  {
1063  std::lock_guard<std::mutex> lock(this->mutex);
1064  this->info.DelPublisherByNode(publisher.Topic(),
1065  publisher.PUuid(), publisher.NUuid());
1066  }
1067 
1068  break;
1069  }
1070  default:
1071  {
1072  std::cerr << "Unknown message type [" << msg.type() << "].\n";
1073  break;
1074  }
1075  }
1076  }
1077 
1084  private: template<typename T>
1085  void SendMsg(const DestinationType &_destType,
1086  const msgs::Discovery::Type _type,
1087  const T &_pub) const
1088  {
1089  gz::msgs::Discovery discoveryMsg;
1090  discoveryMsg.set_version(this->Version());
1091  discoveryMsg.set_type(_type);
1092  discoveryMsg.set_process_uuid(this->pUuid);
1093 
1094  switch (_type)
1095  {
1096  case msgs::Discovery::ADVERTISE:
1097  case msgs::Discovery::UNADVERTISE:
1098  case msgs::Discovery::NEW_CONNECTION:
1099  case msgs::Discovery::END_CONNECTION:
1100  {
1101  _pub.FillDiscovery(discoveryMsg);
1102  break;
1103  }
1104  case msgs::Discovery::SUBSCRIBE:
1105  {
1106  discoveryMsg.mutable_sub()->set_topic(_pub.Topic());
1107  break;
1108  }
1109  case msgs::Discovery::HEARTBEAT:
1110  case msgs::Discovery::BYE:
1111  break;
1112  default:
1113  std::cerr << "Discovery::SendMsg() error: Unrecognized message"
1114  << " type [" << _type << "]" << std::endl;
1115  return;
1116  }
1117 
1118  if (_destType == DestinationType::MULTICAST ||
1119  _destType == DestinationType::ALL)
1120  {
1121  this->SendMulticast(discoveryMsg);
1122  }
1123 
1124  // Send the discovery message to the unicast relays.
1125  if (_destType == DestinationType::UNICAST ||
1126  _destType == DestinationType::ALL)
1127  {
1128  // Set the RELAY flag in the header.
1129  discoveryMsg.mutable_flags()->set_relay(true);
1130  this->SendUnicast(discoveryMsg);
1131  }
1132 
1133  if (this->verbose)
1134  {
1135  std::cout << "\t* Sending " << msgs::ToString(_type)
1136  << " msg [" << _pub.Topic() << "]" << std::endl;
1137  }
1138  }
1139 
1142  private: void SendUnicast(const msgs::Discovery &_msg) const
1143  {
1144  uint16_t msgSize;
1145 
1146 #if GOOGLE_PROTOBUF_VERSION >= 3004000
1147  size_t msgSizeFull = _msg.ByteSizeLong();
1148 #else
1149  int msgSizeFull = _msg.ByteSize();
1150 #endif
1151  if (msgSizeFull + sizeof(msgSize) > this->kMaxRcvStr)
1152  {
1153  std::cerr << "Discovery message too large to send. Discovery won't "
1154  << "work. This shouldn't happen.\n";
1155  return;
1156  }
1157  msgSize = msgSizeFull;
1158 
1159  uint16_t totalSize = sizeof(msgSize) + msgSize;
1160  char *buffer = static_cast<char *>(new char[totalSize]);
1161  memcpy(&buffer[0], &msgSize, sizeof(msgSize));
1162 
1163  if (_msg.SerializeToArray(buffer + sizeof(msgSize), msgSize))
1164  {
1165  // Send the discovery message to the unicast relays.
1166  for (const auto &sockAddr : this->relayAddrs)
1167  {
1168  auto sent = sendto(this->sockets.at(0),
1169  reinterpret_cast<const raw_type *>(
1170  reinterpret_cast<const unsigned char*>(buffer)),
1171  totalSize, 0,
1172  reinterpret_cast<const sockaddr *>(&sockAddr),
1173  sizeof(sockAddr));
1174 
1175  if (sent != totalSize)
1176  {
1177  std::cerr << "Exception sending a unicast message" << std::endl;
1178  break;
1179  }
1180  }
1181  }
1182  else
1183  {
1184  std::cerr << "Discovery::SendUnicast: Error serializing data."
1185  << std::endl;
1186  }
1187 
1188  delete [] buffer;
1189  }
1190 
1193  private: void SendMulticast(const msgs::Discovery &_msg) const
1194  {
1195  uint16_t msgSize;
1196 
1197 #if GOOGLE_PROTOBUF_VERSION >= 3004000
1198  size_t msgSizeFull = _msg.ByteSizeLong();
1199 #else
1200  int msgSizeFull = _msg.ByteSize();
1201 #endif
1202  if (msgSizeFull + sizeof(msgSize) > this->kMaxRcvStr)
1203  {
1204  std::cerr << "Discovery message too large to send. Discovery won't "
1205  << "work. This shouldn't happen.\n";
1206  return;
1207  }
1208 
1209  msgSize = msgSizeFull;
1210  uint16_t totalSize = sizeof(msgSize) + msgSize;
1211  char *buffer = static_cast<char *>(new char[totalSize]);
1212  memcpy(&buffer[0], &msgSize, sizeof(msgSize));
1213 
1214  if (_msg.SerializeToArray(buffer + sizeof(msgSize), msgSize))
1215  {
1216  // Send the discovery message to the multicast group through all the
1217  // sockets.
1218  for (const auto &sock : this->Sockets())
1219  {
1220  errno = 0;
1221  if (sendto(sock, reinterpret_cast<const raw_type *>(
1222  reinterpret_cast<const unsigned char*>(buffer)),
1223  totalSize, 0,
1224  reinterpret_cast<const sockaddr *>(this->MulticastAddr()),
1225  sizeof(*(this->MulticastAddr()))) != totalSize)
1226  {
1227  // Ignore EPERM and ENOBUFS errors.
1228  //
1229  // See issue #106
1230  //
1231  // Rationale drawn from:
1232  //
1233  // * https://groups.google.com/forum/#!topic/comp.protocols.tcp-ip/Qou9Sfgr77E
1234  // * https://stackoverflow.com/questions/16555101/sendto-dgrams-do-not-block-for-enobufs-on-osx
1235  if (errno != EPERM && errno != ENOBUFS)
1236  {
1237  std::cerr << "Exception sending a multicast message:"
1238  << strerror(errno) << std::endl;
1239  }
1240  break;
1241  }
1242  }
1243  }
1244  else
1245  {
1246  std::cerr << "Discovery::SendMulticast: Error serializing data."
1247  << std::endl;
1248  }
1249 
1250  delete [] buffer;
1251  }
1252 
1255  private: const std::vector<int> &Sockets() const
1256  {
1257  return this->sockets;
1258  }
1259 
1262  private: const sockaddr_in *MulticastAddr() const
1263  {
1264  return &this->mcastAddr;
1265  }
1266 
1269  private: uint8_t Version() const
1270  {
1271  static std::string ignStats;
1272  static int topicStats =
1273  (env("IGN_TRANSPORT_TOPIC_STATISTICS", ignStats) && ignStats == "1");
1274  return this->kWireVersion + (topicStats * 100);
1275  }
1276 
1281  private: bool RegisterNetIface(const std::string &_ip)
1282  {
1283  // Make a new socket for sending discovery information.
1284  int sock = static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1285  if (sock < 0)
1286  {
1287  std::cerr << "Socket creation failed." << std::endl;
1288  return false;
1289  }
1290 
1291  // Socket option: IP_MULTICAST_IF.
1292  // This socket option needs to be applied to each socket used to send
1293  // data. This option selects the source interface for outgoing messages.
1294  struct in_addr ifAddr;
1295  ifAddr.s_addr = inet_addr(_ip.c_str());
1296  if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1297  reinterpret_cast<const char*>(&ifAddr), sizeof(ifAddr)) != 0)
1298  {
1299  std::cerr << "Error setting socket option (IP_MULTICAST_IF)."
1300  << std::endl;
1301  return false;
1302  }
1303 
1304  this->sockets.push_back(sock);
1305 
1306  // Join the multicast group. We have to do it for each network interface
1307  // but we can do it on the same socket. We will use the socket at
1308  // position 0 for receiving multicast information.
1309  struct ip_mreq group;
1310  group.imr_multiaddr.s_addr =
1311  inet_addr(this->kMulticastGroup.c_str());
1312  group.imr_interface.s_addr = inet_addr(_ip.c_str());
1313  if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1314  reinterpret_cast<const char*>(&group), sizeof(group)) != 0)
1315  {
1316  std::cerr << "Error setting socket option (IP_ADD_MEMBERSHIP)."
1317  << std::endl;
1318  return false;
1319  }
1320 
1321  return true;
1322  }
1323 
1326  private: void AddRelayAddress(const std::string &_ip)
1327  {
1328  // Sanity check: Make sure that this IP address is not already saved.
1329  for (auto const &addr : this->relayAddrs)
1330  {
1331  if (addr.sin_addr.s_addr == inet_addr(_ip.c_str()))
1332  return;
1333  }
1334 
1335  sockaddr_in addr;
1336  memset(&addr, 0, sizeof(addr));
1337  addr.sin_family = AF_INET;
1338  addr.sin_addr.s_addr = inet_addr(_ip.c_str());
1339  addr.sin_port = htons(static_cast<u_short>(this->port));
1340 
1341  this->relayAddrs.push_back(addr);
1342  }
1343 
1347  private: static const unsigned int kDefActivityInterval = 100;
1348 
1352  private: static const unsigned int kDefHeartbeatInterval = 1000;
1353 
1357  private: static const unsigned int kDefSilenceInterval = 3000;
1358 
1360  private: const std::string kMulticastGroup = "224.0.0.7";
1361 
1363  private: const int kTimeout = 250;
1364 
1366  private: static const uint16_t kMaxRcvStr =
1368 
1371  private: static const uint8_t kWireVersion = 10;
1372 
1374  private: int port;
1375 
1377  private: std::string hostAddr;
1378 
1380  private: std::vector<std::string> hostInterfaces;
1381 
1383  private: std::string pUuid;
1384 
1388  private: unsigned int silenceInterval;
1389 
1393  private: unsigned int activityInterval;
1394 
1398  private: unsigned int heartbeatInterval;
1399 
1401  private: DiscoveryCallback<Pub> connectionCb;
1402 
1404  private: DiscoveryCallback<Pub> disconnectionCb;
1405 
1407  private: DiscoveryCallback<Pub> registrationCb;
1408 
1410  private: DiscoveryCallback<Pub> unregistrationCb;
1411 
1413  private: TopicStorage<Pub> info;
1414 
1420 
1422  private: bool verbose;
1423 
1425  private: std::vector<int> sockets;
1426 
1428  private: sockaddr_in mcastAddr;
1429 
1431  private: std::vector<sockaddr_in> relayAddrs;
1432 
1434  private: mutable std::mutex mutex;
1435 
1437  private: std::thread threadReception;
1438 
1440  private: Timestamp timeNextHeartbeat;
1441 
1443  private: Timestamp timeNextActivity;
1444 
1446  private: std::mutex exitMutex;
1447 
1452  private: bool initialized;
1453 
1455  private: unsigned int numHeartbeatsUninitialized;
1456 
1458  private: mutable std::condition_variable initializedCv;
1459 
1461  private: bool exit;
1462 
1464  private: bool enabled;
1465  };
1466 
1470 
1474  }
1475  }
1476 }
1477 
1478 #endif
void Register(const MessagePublisher &_pub) const
Register a node from this process as a remote subscriber.
Definition: gz/transport/Discovery.hh:384
void UnregistrationsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive an event when a remote node unsubscribes to a topic within this proces...
Definition: gz/transport/Discovery.hh:551
void ConnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery connection events. Each time a new topic is connected,...
Definition: gz/transport/Discovery.hh:523
bool Publishers(const std::string &_topic, std::map< std::string, std::vector< T >> &_info) const
Get the map of publishers stored for a given topic.
Definition: gz/transport/TopicStorage.hh:207
T lock(T... args)
bool DelPublisherByNode(const std::string &_topic, const std::string &_pUuid, const std::string &_nUuid)
Remove a publisher associated to a given topic and UUID pair.
Definition: gz/transport/TopicStorage.hh:222
@ HOST
Topic/service only available to subscribers in the same machine as the publisher.
Definition: gz/transport/AdvertiseOptions.hh:28
STL class.
This class stores all the information about a publisher. It stores the topic name that publishes,...
Definition: gz/transport/Publisher.hh:44
std::map< std::string, Timestamp > activity
Activity information. Every time there is a message from a remote node, its activity information is u...
Definition: gz/transport/Discovery.hh:1419
void SetActivityInterval(const unsigned int _ms)
Set the activity interval.
Definition: gz/transport/Discovery.hh:495
unsigned int ActivityInterval() const
The discovery checks the validity of the topic information every 'activity interval' milliseconds.
Definition: gz/transport/Discovery.hh:465
@ ALL
Topic/service available to any subscriber (default scope).
bool pollSockets(const std::vector< int > &_sockets, const int _timeout)
std::vector< std::string > split(const std::string &_orig, char _delim)
split at a one character delimiter to get a vector of something
STL class.
void WaitForInit() const
Check if ready/initialized. If not, then wait on the initializedCv condition variable.
Definition: gz/transport/Discovery.hh:610
void TopicList(std::vector< std::string > &_topics) const
Get the list of topics currently stored.
Definition: gz/transport/TopicStorage.hh:339
T strerror(T... args)
const TopicStorage< Pub > & Info() const
Get the discovery information.
Definition: gz/transport/Discovery.hh:400
This class stores all the information about a message publisher.
Definition: gz/transport/Publisher.hh:222
A class for customizing the publication options for a topic or service advertised....
Definition: gz/transport/AdvertiseOptions.hh:59
unsigned int SilenceInterval() const
Get the maximum time allowed without receiving any discovery information from a node before canceling...
Definition: gz/transport/Discovery.hh:486
T unlock(T... args)
std::string determineHost()
Determine IP or hostname. Reference: https://github.com/ros/ros_comm/blob/hydro-devel/clients/ roscpp...
virtual ~Discovery()
Destructor.
Definition: gz/transport/Discovery.hh:247
T push_back(T... args)
unsigned int HeartbeatInterval() const
Each node broadcasts periodic heartbeats to keep its topic information alive in other nodes....
Definition: gz/transport/Discovery.hh:476
void Unregister(const MessagePublisher &_pub) const
Unregister a node from this process as a remote subscriber.
Definition: gz/transport/Discovery.hh:392
T joinable(T... args)
void PrintCurrentState() const
Print the current discovery state.
Definition: gz/transport/Discovery.hh:558
std::chrono::steady_clock::time_point Timestamp
Definition: gz/transport/TransportTypes.hh:155
STL class.
T at(T... args)
bool DelPublishersByProc(const std::string &_pUuid)
Remove all the publishers associated to a given process.
Definition: gz/transport/TopicStorage.hh:262
bool AddPublisher(const T &_publisher)
Add a new address associated to a given topic and node UUID.
Definition: gz/transport/TopicStorage.hh:53
T c_str(T... args)
bool Unadvertise(const std::string &_topic, const std::string &_nUuid)
Unadvertise a new message. Broadcast a discovery message that will cancel all the discovery informati...
Definition: gz/transport/Discovery.hh:424
T erase(T... args)
STL class.
@ ALL
Send data via unicast and multicast.
bool env(const std::string &_name, std::string &_value)
Find the environment variable '_name' and return its value.
A discovery class that implements a distributed topic discovery protocol. It uses UDP multicast for s...
Definition: gz/transport/Discovery.hh:117
T min(T... args)
DestinationType
Options for sending discovery messages.
Definition: gz/transport/Discovery.hh:88
void SetSilenceInterval(const unsigned int _ms)
Set the maximum silence interval.
Definition: gz/transport/Discovery.hh:513
bool HasAnyPublishers(const std::string &_topic, const std::string &_pUuid) const
Return if there is any publisher stored for the given topic and process UUID.
Definition: gz/transport/TopicStorage.hh:134
T endl(T... args)
void TopicList(std::vector< std::string > &_topics) const
Get the list of topics currently advertised in the network.
Definition: gz/transport/Discovery.hh:601
T duration_cast(T... args)
std::string HostAddr() const
Get the IP address of this host.
Definition: gz/transport/Discovery.hh:455
T boolalpha(T... args)
T cbegin(T... args)
void PublishersByProc(const std::string &_pUuid, std::map< std::string, std::vector< T >> &_pubs) const
Given a process UUID, the function returns the list of publishers contained in this process UUID with...
Definition: gz/transport/TopicStorage.hh:286
void DisconnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery disconnection events. Each time a topic is no longer active,...
Definition: gz/transport/Discovery.hh:533
void Print() const
Print all the information for debugging purposes.
Definition: gz/transport/TopicStorage.hh:346
T empty(T... args)
@ UNICAST
Send data via unicast only.
STL class.
Discovery(const std::string &_pUuid, const int _port, const bool _verbose=false)
Constructor.
Definition: gz/transport/Discovery.hh:124
T memcpy(T... args)
void SetHeartbeatInterval(const unsigned int _ms)
Set the heartbeat interval.
Definition: gz/transport/Discovery.hh:504
T max(T... args)
void Start()
Start the discovery service. You probably want to register the callbacks for receiving discovery noti...
Definition: gz/transport/Discovery.hh:278
bool Discover(const std::string &_topic) const
Request discovery information about a topic. When using this method, the user might want to use SetCo...
Definition: gz/transport/Discovery.hh:334
@ PROCESS
Topic/service only available to subscribers in the same process as the publisher.
void RegistrationsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive an event when a new remote node subscribes to a topic within this proc...
Definition: gz/transport/Discovery.hh:542
bool Advertise(const Pub &_publisher)
Advertise a new message.
Definition: gz/transport/Discovery.hh:302
T join(T... args)
T memset(T... args)
void raw_type
Definition: gz/transport/Discovery.hh:45
bool Publisher(const std::string &_topic, const std::string &_pUuid, const std::string &_nUuid, T &_publisher) const
Get the address information for a given topic and node UUID.
Definition: gz/transport/TopicStorage.hh:169
@ MULTICAST
Send data via multicast only.
std::vector< std::string > determineInterfaces()
Determine the list of network interfaces for this machine. Reference: https://github....
bool Publishers(const std::string &_topic, Addresses_M< Pub > &_publishers) const
Get all the publishers' information known for a given topic.
Definition: gz/transport/Discovery.hh:410