Gazebo Transport

API Reference

11.4.1
gz/transport/Discovery.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2014 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef GZ_TRANSPORT_DISCOVERY_HH_
19 #define GZ_TRANSPORT_DISCOVERY_HH_
20 #include <errno.h>
21 #include <string.h>
22 
23 #ifdef _WIN32
24  // For socket(), connect(), send(), and recv().
25  #include <Winsock2.h>
26  #include <Ws2def.h>
27  #include <Ws2ipdef.h>
28  #include <Ws2tcpip.h>
29  // Type used for raw data on this platform.
30  using raw_type = char;
31 #else
32  // For data types
33  #include <sys/types.h>
34  // For socket(), connect(), send(), and recv()
35  #include <sys/socket.h>
36  // For gethostbyname()
37  #include <netdb.h>
38  // For inet_addr()
39  #include <arpa/inet.h>
40  // For close()
41  #include <unistd.h>
42  // For sockaddr_in
43  #include <netinet/in.h>
44  // Type used for raw data on this platform
45  using raw_type = void;
46 #endif
47 
48 #ifdef _WIN32
49  #pragma warning(push, 0)
50 #endif
51 #ifdef _WIN32
52  #pragma warning(pop)
53  // Suppress "decorated name length exceed" warning in STL.
54  #pragma warning(disable: 4503)
55  // Suppress "depreted API warnings" in WINSOCK.
56  #pragma warning(disable: 4996)
57 #endif
58 
59 #include <gz/msgs/discovery.pb.h>
60 
61 #include <algorithm>
62 #include <condition_variable>
63 #include <limits>
64 #include <map>
65 #include <memory>
66 #include <mutex>
67 #include <string>
68 #include <thread>
69 #include <vector>
70 
71 #include <gz/msgs/Utility.hh>
72 
73 #include "gz/transport/config.hh"
74 #include "gz/transport/Export.hh"
75 #include "gz/transport/Helpers.hh"
76 #include "gz/transport/NetUtils.hh"
80 
81 namespace ignition
82 {
83  namespace transport
84  {
85  // Inline bracket to help doxygen filtering.
86  inline namespace IGNITION_TRANSPORT_VERSION_NAMESPACE {
88  enum class DestinationType
89  {
91  UNICAST,
93  MULTICAST,
95  ALL
96  };
97 
98  //
104  bool IGNITION_TRANSPORT_VISIBLE pollSockets(
105  const std::vector<int> &_sockets,
106  const int _timeout);
107 
116  template<typename Pub>
117  class Discovery
118  {
125  public: Discovery(const std::string &_pUuid,
126  const std::string &_ip,
127  const int _port,
128  const bool _verbose = false)
129  : multicastGroup(_ip),
130  port(_port),
131  hostAddr(determineHost()),
132  pUuid(_pUuid),
133  silenceInterval(kDefSilenceInterval),
134  activityInterval(kDefActivityInterval),
135  heartbeatInterval(kDefHeartbeatInterval),
136  connectionCb(nullptr),
137  disconnectionCb(nullptr),
138  verbose(_verbose),
139  initialized(false),
140  numHeartbeatsUninitialized(0),
141  exit(false),
142  enabled(false)
143  {
144  std::string ignIp;
145  if (env("IGN_IP", ignIp) && !ignIp.empty())
146  this->hostInterfaces = {ignIp};
147  else
148  {
149  // Get the list of network interfaces in this host.
150  this->hostInterfaces = determineInterfaces();
151  }
152 
153 #ifdef _WIN32
154  WORD wVersionRequested;
155  WSADATA wsaData;
156 
157  // Request WinSock v2.2.
158  wVersionRequested = MAKEWORD(2, 2);
159  // Load WinSock DLL.
160  if (WSAStartup(wVersionRequested, &wsaData) != 0)
161  {
162  std::cerr << "Unable to load WinSock DLL" << std::endl;
163  return;
164  }
165 #endif
166  for (const auto &netIface : this->hostInterfaces)
167  {
168  auto succeed = this->RegisterNetIface(netIface);
169 
170  // If the IP address that we're selecting as the main IP address of
171  // the host is invalid, we change it to 127.0.0.1 .
172  // This is probably because IGN_IP is set to a wrong value.
173  if (netIface == this->hostAddr && !succeed)
174  {
175  this->RegisterNetIface("127.0.0.1");
176  std::cerr << "Did you set the environment variable IGN_IP with a "
177  << "correct IP address? " << std::endl
178  << " [" << netIface << "] seems an invalid local IP "
179  << "address." << std::endl
180  << " Using 127.0.0.1 as hostname." << std::endl;
181  this->hostAddr = "127.0.0.1";
182  }
183  }
184 
185  // Socket option: SO_REUSEADDR. This options is used only for receiving
186  // data. We can reuse the same socket for receiving multicast data from
187  // multiple interfaces. We will use the socket at position 0 for
188  // receiving data.
189  int reuseAddr = 1;
190  if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
191  reinterpret_cast<const char *>(&reuseAddr), sizeof(reuseAddr)) != 0)
192  {
193  std::cerr << "Error setting socket option (SO_REUSEADDR)."
194  << std::endl;
195  return;
196  }
197 
198 #ifdef SO_REUSEPORT
199  // Socket option: SO_REUSEPORT. This options is used only for receiving
200  // data. We can reuse the same socket for receiving multicast data from
201  // multiple interfaces. We will use the socket at position 0 for
202  // receiving data.
203  int reusePort = 1;
204  // cppcheck-suppress ConfigurationNotChecked
205  if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
206  reinterpret_cast<const char *>(&reusePort), sizeof(reusePort)) != 0)
207  {
208  std::cerr << "Error setting socket option (SO_REUSEPORT)."
209  << std::endl;
210  return;
211  }
212 #endif
213  // Bind the first socket to the discovery port.
214  sockaddr_in localAddr;
215  memset(&localAddr, 0, sizeof(localAddr));
216  localAddr.sin_family = AF_INET;
217  localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
218  localAddr.sin_port = htons(static_cast<u_short>(this->port));
219 
220  if (bind(this->sockets.at(0),
221  reinterpret_cast<sockaddr *>(&localAddr), sizeof(sockaddr_in)) < 0)
222  {
223  std::cerr << "Binding to a local port failed." << std::endl;
224  return;
225  }
226 
227  // Set 'mcastAddr' to the multicast discovery group.
228  memset(&this->mcastAddr, 0, sizeof(this->mcastAddr));
229  this->mcastAddr.sin_family = AF_INET;
230  this->mcastAddr.sin_addr.s_addr =
231  inet_addr(this->multicastGroup.c_str());
232  this->mcastAddr.sin_port = htons(static_cast<u_short>(this->port));
233 
235  std::string ignRelay = "";
236  if (env("IGN_RELAY", ignRelay) && !ignRelay.empty())
237  {
238  relays = transport::split(ignRelay, ':');
239  }
240 
241  // Register all unicast relays.
242  for (auto const &relayAddr : relays)
243  this->AddRelayAddress(relayAddr);
244 
245  if (this->verbose)
246  this->PrintCurrentState();
247  }
248 
250  public: virtual ~Discovery()
251  {
252  // Tell the service thread to terminate.
253  this->exitMutex.lock();
254  this->exit = true;
255  this->exitMutex.unlock();
256 
257  // Wait for the service threads to finish before exit.
258  if (this->threadReception.joinable())
259  this->threadReception.join();
260 
261  // Broadcast a BYE message to trigger the remote cancellation of
262  // all our advertised topics.
263  this->SendMsg(DestinationType::ALL, msgs::Discovery::BYE,
264  Publisher("", "", this->pUuid, "", AdvertiseOptions()));
265 
266  // Close sockets.
267  for (const auto &sock : this->sockets)
268  {
269 #ifdef _WIN32
270  closesocket(sock);
271  WSACleanup();
272 #else
273  close(sock);
274 #endif
275  }
276  }
277 
281  public: void Start()
282  {
283  {
284  std::lock_guard<std::mutex> lock(this->mutex);
285 
286  // The service is already running.
287  if (this->enabled)
288  return;
289 
290  this->enabled = true;
291  }
292 
293  auto now = std::chrono::steady_clock::now();
294  this->timeNextHeartbeat = now;
295  this->timeNextActivity = now;
296 
297  // Start the thread that receives discovery information.
298  this->threadReception = std::thread(&Discovery::RecvMessages, this);
299  }
300 
305  public: bool Advertise(const Pub &_publisher)
306  {
307  {
308  std::lock_guard<std::mutex> lock(this->mutex);
309 
310  if (!this->enabled)
311  return false;
312 
313  // Add the addressing information (local publisher).
314  if (!this->info.AddPublisher(_publisher))
315  return false;
316  }
317 
318  // Only advertise a message outside this process if the scope
319  // is not 'Process'
320  if (_publisher.Options().Scope() != Scope_t::PROCESS)
321  this->SendMsg(DestinationType::ALL, msgs::Discovery::ADVERTISE,
322  _publisher);
323 
324  return true;
325  }
326 
337  public: bool Discover(const std::string &_topic) const
338  {
340  bool found;
341  Addresses_M<Pub> addresses;
342 
343  {
344  std::lock_guard<std::mutex> lock(this->mutex);
345 
346  if (!this->enabled)
347  return false;
348 
349  cb = this->connectionCb;
350  }
351 
352  Pub pub;
353  pub.SetTopic(_topic);
354  pub.SetPUuid(this->pUuid);
355 
356  // Send a discovery request.
357  this->SendMsg(DestinationType::ALL, msgs::Discovery::SUBSCRIBE, pub);
358 
359  {
360  std::lock_guard<std::mutex> lock(this->mutex);
361  found = this->info.Publishers(_topic, addresses);
362  }
363 
364  if (found)
365  {
366  // I already have information about this topic.
367  for (const auto &proc : addresses)
368  {
369  for (const auto &node : proc.second)
370  {
371  if (cb)
372  {
373  // Execute the user's callback for a service request. Notice
374  // that we only execute one callback for preventing receive
375  // multiple service responses for a single request.
376  cb(node);
377  }
378  }
379  }
380  }
381 
382  return true;
383  }
384 
387  public: void Register(const MessagePublisher &_pub) const
388  {
389  this->SendMsg(
390  DestinationType::ALL, msgs::Discovery::NEW_CONNECTION, _pub);
391  }
392 
395  public: void Unregister(const MessagePublisher &_pub) const
396  {
397  this->SendMsg(
398  DestinationType::ALL, msgs::Discovery::END_CONNECTION, _pub);
399  }
400 
403  public: const TopicStorage<Pub> &Info() const
404  {
405  std::lock_guard<std::mutex> lock(this->mutex);
406  return this->info;
407  }
408 
413  public: bool Publishers(const std::string &_topic,
414  Addresses_M<Pub> &_publishers) const
415  {
416  std::lock_guard<std::mutex> lock(this->mutex);
417  return this->info.Publishers(_topic, _publishers);
418  }
419 
427  public: bool Unadvertise(const std::string &_topic,
428  const std::string &_nUuid)
429  {
430  Pub inf;
431  {
432  std::lock_guard<std::mutex> lock(this->mutex);
433 
434  if (!this->enabled)
435  return false;
436 
437  // Don't do anything if the topic is not advertised by any of my nodes
438  if (!this->info.Publisher(_topic, this->pUuid, _nUuid, inf))
439  return true;
440 
441  // Remove the topic information.
442  this->info.DelPublisherByNode(_topic, this->pUuid, _nUuid);
443  }
444 
445  // Only unadvertise a message outside this process if the scope
446  // is not 'Process'.
447  if (inf.Options().Scope() != Scope_t::PROCESS)
448  {
449  this->SendMsg(DestinationType::ALL,
450  msgs::Discovery::UNADVERTISE, inf);
451  }
452 
453  return true;
454  }
455 
458  public: std::string HostAddr() const
459  {
460  std::lock_guard<std::mutex> lock(this->mutex);
461  return this->hostAddr;
462  }
463 
468  public: unsigned int ActivityInterval() const
469  {
470  std::lock_guard<std::mutex> lock(this->mutex);
471  return this->activityInterval;
472  }
473 
479  public: unsigned int HeartbeatInterval() const
480  {
481  std::lock_guard<std::mutex> lock(this->mutex);
482  return this->heartbeatInterval;
483  }
484 
489  public: unsigned int SilenceInterval() const
490  {
491  std::lock_guard<std::mutex> lock(this->mutex);
492  return this->silenceInterval;
493  }
494 
498  public: void SetActivityInterval(const unsigned int _ms)
499  {
500  std::lock_guard<std::mutex> lock(this->mutex);
501  this->activityInterval = _ms;
502  }
503 
507  public: void SetHeartbeatInterval(const unsigned int _ms)
508  {
509  std::lock_guard<std::mutex> lock(this->mutex);
510  this->heartbeatInterval = _ms;
511  }
512 
516  public: void SetSilenceInterval(const unsigned int _ms)
517  {
518  std::lock_guard<std::mutex> lock(this->mutex);
519  this->silenceInterval = _ms;
520  }
521 
526  public: void ConnectionsCb(const DiscoveryCallback<Pub> &_cb)
527  {
528  std::lock_guard<std::mutex> lock(this->mutex);
529  this->connectionCb = _cb;
530  }
531 
536  public: void DisconnectionsCb(const DiscoveryCallback<Pub> &_cb)
537  {
538  std::lock_guard<std::mutex> lock(this->mutex);
539  this->disconnectionCb = _cb;
540  }
541 
545  public: void RegistrationsCb(const DiscoveryCallback<Pub> &_cb)
546  {
547  std::lock_guard<std::mutex> lock(this->mutex);
548  this->registrationCb = _cb;
549  }
550 
554  public: void UnregistrationsCb(const DiscoveryCallback<Pub> &_cb)
555  {
556  std::lock_guard<std::mutex> lock(this->mutex);
557  this->unregistrationCb = _cb;
558  }
559 
561  public: void PrintCurrentState() const
562  {
563  std::lock_guard<std::mutex> lock(this->mutex);
564 
565  std::cout << "---------------" << std::endl;
566  std::cout << std::boolalpha << "Enabled: "
567  << this->enabled << std::endl;
568  std::cout << "Discovery state" << std::endl;
569  std::cout << "\tUUID: " << this->pUuid << std::endl;
570  std::cout << "Settings" << std::endl;
571  std::cout << "\tActivity: " << this->activityInterval
572  << " ms." << std::endl;
573  std::cout << "\tHeartbeat: " << this->heartbeatInterval
574  << "ms." << std::endl;
575  std::cout << "\tSilence: " << this->silenceInterval
576  << " ms." << std::endl;
577  std::cout << "Known information:" << std::endl;
578  this->info.Print();
579 
580  // Used to calculate the elapsed time.
582 
583  std::cout << "Activity" << std::endl;
584  if (this->activity.empty())
585  std::cout << "\t<empty>" << std::endl;
586  else
587  {
588  for (auto &proc : this->activity)
589  {
590  // Elapsed time since the last update from this publisher.
591  std::chrono::duration<double> elapsed = now - proc.second;
592 
593  std::cout << "\t" << proc.first << std::endl;
594  std::cout << "\t\t" << "Since: " << std::chrono::duration_cast<
595  std::chrono::milliseconds>(elapsed).count() << " ms. ago. "
596  << std::endl;
597  }
598  }
599  std::cout << "---------------" << std::endl;
600  }
601 
604  public: void TopicList(std::vector<std::string> &_topics) const
605  {
606  this->WaitForInit();
607  std::lock_guard<std::mutex> lock(this->mutex);
608  this->info.TopicList(_topics);
609  }
610 
613  public: void WaitForInit() const
614  {
615  std::unique_lock<std::mutex> lk(this->mutex);
616 
617  if (!this->initialized)
618  {
619  this->initializedCv.wait(lk, [this]{return this->initialized;});
620  }
621  }
622 
626  private: void UpdateActivity()
627  {
628  // The UUIDs of the processes that have expired.
630 
631  // A copy of the disconnection callback.
632  DiscoveryCallback<Pub> disconnectCb;
633 
635 
636  {
637  std::lock_guard<std::mutex> lock(this->mutex);
638 
639  if (now < this->timeNextActivity)
640  return;
641 
642  disconnectCb = this->disconnectionCb;
643 
644  for (auto it = this->activity.cbegin(); it != this->activity.cend();)
645  {
646  // Elapsed time since the last update from this publisher.
647  auto elapsed = now - it->second;
648 
649  // This publisher has expired.
650  if (std::chrono::duration_cast<std::chrono::milliseconds>
651  (elapsed).count() > this->silenceInterval)
652  {
653  // Remove all the info entries for this process UUID.
654  this->info.DelPublishersByProc(it->first);
655 
656  uuids.push_back(it->first);
657 
658  // Remove the activity entry.
659  this->activity.erase(it++);
660  }
661  else
662  ++it;
663  }
664 
665  this->timeNextActivity = std::chrono::steady_clock::now() +
666  std::chrono::milliseconds(this->activityInterval);
667  }
668 
669  if (!disconnectCb)
670  return;
671 
672  // Notify without topic information. This is useful to inform the
673  // client that a remote node is gone, even if we were not
674  // interested in its topics.
675  for (auto const &uuid : uuids)
676  {
677  Pub publisher;
678  publisher.SetPUuid(uuid);
679  disconnectCb(publisher);
680  }
681  }
682 
684  private: void UpdateHeartbeat()
685  {
687 
688  {
689  std::lock_guard<std::mutex> lock(this->mutex);
690 
691  if (now < this->timeNextHeartbeat)
692  return;
693  }
694 
695  Publisher pub("", "", this->pUuid, "", AdvertiseOptions());
696  this->SendMsg(DestinationType::ALL, msgs::Discovery::HEARTBEAT, pub);
697 
699  {
700  std::lock_guard<std::mutex> lock(this->mutex);
701 
702  // Re-advertise topics that are advertised inside this process.
703  this->info.PublishersByProc(this->pUuid, nodes);
704  }
705 
706  for (const auto &topic : nodes)
707  {
708  for (const auto &node : topic.second)
709  {
710  this->SendMsg(DestinationType::ALL,
711  msgs::Discovery::ADVERTISE, node);
712  }
713  }
714 
715  {
716  std::lock_guard<std::mutex> lock(this->mutex);
717  if (!this->initialized)
718  {
719  if (this->numHeartbeatsUninitialized == 2u)
720  {
721  // We consider discovery initialized after two heartbeat cycles.
722  this->initialized = true;
723 
724  // Notify anyone waiting for the initialization phase to finish.
725  this->initializedCv.notify_all();
726  }
727  ++this->numHeartbeatsUninitialized;
728  }
729 
730  this->timeNextHeartbeat = std::chrono::steady_clock::now() +
731  std::chrono::milliseconds(this->heartbeatInterval);
732  }
733  }
734 
744  private: int NextTimeout() const
745  {
746  auto now = std::chrono::steady_clock::now();
747  auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
748  auto timeUntilNextActivity = this->timeNextActivity - now;
749 
750  int t = static_cast<int>(
751  std::chrono::duration_cast<std::chrono::milliseconds>
752  (std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
753  int t2 = std::min(t, this->kTimeout);
754  return std::max(t2, 0);
755  }
756 
758  private: void RecvMessages()
759  {
760  bool timeToExit = false;
761  while (!timeToExit)
762  {
763  // Calculate the timeout.
764  int timeout = this->NextTimeout();
765 
766  if (pollSockets(this->sockets, timeout))
767  {
768  this->RecvDiscoveryUpdate();
769 
770  if (this->verbose)
771  this->PrintCurrentState();
772  }
773 
774  this->UpdateHeartbeat();
775  this->UpdateActivity();
776 
777  // Is it time to exit?
778  {
779  std::lock_guard<std::mutex> lock(this->exitMutex);
780  if (this->exit)
781  timeToExit = true;
782  }
783  }
784  }
785 
787  private: void RecvDiscoveryUpdate()
788  {
789  char rcvStr[Discovery::kMaxRcvStr];
790  sockaddr_in clntAddr;
791  socklen_t addrLen = sizeof(clntAddr);
792 
793  int64_t received = recvfrom(this->sockets.at(0),
794  reinterpret_cast<raw_type *>(rcvStr),
795  this->kMaxRcvStr, 0,
796  reinterpret_cast<sockaddr *>(&clntAddr),
797  reinterpret_cast<socklen_t *>(&addrLen));
798  if (received > 0)
799  {
800  uint16_t len = 0;
801  memcpy(&len, &rcvStr[0], sizeof(len));
802 
803  // Ignition Transport delimits each discovery message with a
804  // frame_delimiter that contains byte size information.
805  // A discovery message has the form:
806  //
807  // <frame_delimiter><frame_body>
808  //
809  // Ignition Transport version < 8 sends a frame delimiter that
810  // contains the value of sizeof(frame_delimiter)
811  // + sizeof(frame_body). In other words, the frame_delimiter
812  // contains a value that represents the total size of the
813  // frame_body and frame_delimiter in bytes.
814  //
815  // Ignition Transport version >= 8 sends a frame_delimiter
816  // that contains the value of sizeof(frame_body). In other
817  // words, the frame_delimiter contains a value that represents
818  // the total size of only the frame_body.
819  //
820  // It is possible that two incompatible versions of Ignition
821  // Transport exist on the same network. If we receive an
822  // unexpected size, then we ignore the message.
823 
824  // If-condition for version 8+
825  if (len + sizeof(len) == static_cast<uint16_t>(received))
826  {
827  std::string srcAddr = inet_ntoa(clntAddr.sin_addr);
828  uint16_t srcPort = ntohs(clntAddr.sin_port);
829 
830  if (this->verbose)
831  {
832  std::cout << "\nReceived discovery update from "
833  << srcAddr << ": " << srcPort << std::endl;
834  }
835 
836  this->DispatchDiscoveryMsg(srcAddr, rcvStr + sizeof(len), len);
837  }
838  }
839  else if (received < 0)
840  {
841  std::cerr << "Discovery::RecvDiscoveryUpdate() recvfrom error"
842  << std::endl;
843  }
844  }
845 
850  private: void DispatchDiscoveryMsg(const std::string &_fromIp,
851  char *_msg, uint16_t _len)
852  {
853  gz::msgs::Discovery msg;
854 
855  // Parse the message, and return if parsing failed. Parsing could
856  // fail when another discovery node is publishing messages using an
857  // older (or newer) format.
858  if (!msg.ParseFromArray(_msg, _len))
859  return;
860 
861  // Discard the message if the wire protocol is different than mine.
862  if (this->Version() != msg.version())
863  return;
864 
865  std::string recvPUuid = msg.process_uuid();
866 
867  // Discard our own discovery messages.
868  if (recvPUuid == this->pUuid)
869  return;
870 
871  // Forwarding summary:
872  // - From a unicast peer -> to multicast group (with NO_RELAY flag).
873  // - From multicast group -> to unicast peers (with RELAY flag).
874 
875  // If the RELAY flag is set, this discovery message is coming via a
876  // unicast transmission. In this case, we don't process it, we just
877  // forward it to the multicast group, and it will be dispatched once
878  // received there. Note that we also unset the RELAY flag and set the
879  // NO_RELAY flag, to avoid forwarding the message anymore.
880  if (msg.has_flags() && msg.flags().relay())
881  {
882  // Unset the RELAY flag in the header and set the NO_RELAY.
883  msg.mutable_flags()->set_relay(false);
884  msg.mutable_flags()->set_no_relay(true);
885  this->SendMulticast(msg);
886 
887  // A unicast peer contacted me. I need to save its address for
888  // sending future messages in the future.
889  this->AddRelayAddress(_fromIp);
890  return;
891  }
892  // If we are receiving this discovery message via the multicast channel
893  // and the NO_RELAY flag is not set, we forward this message via unicast
894  // to all our relays. Note that this is the most common case, where we
895  // receive a regular multicast message and we forward it to any remote
896  // relays.
897  else if (!msg.has_flags() || !msg.flags().no_relay())
898  {
899  msg.mutable_flags()->set_relay(true);
900  this->SendUnicast(msg);
901  }
902 
903  bool isSenderLocal = (std::find(this->hostInterfaces.begin(),
904  this->hostInterfaces.end(), _fromIp) != this->hostInterfaces.end()) ||
905  (_fromIp.find("127.") == 0);
906 
907  // Update timestamp and cache the callbacks.
908  DiscoveryCallback<Pub> connectCb;
909  DiscoveryCallback<Pub> disconnectCb;
910  DiscoveryCallback<Pub> registerCb;
911  DiscoveryCallback<Pub> unregisterCb;
912  {
913  std::lock_guard<std::mutex> lock(this->mutex);
914  this->activity[recvPUuid] = std::chrono::steady_clock::now();
915  connectCb = this->connectionCb;
916  disconnectCb = this->disconnectionCb;
917  registerCb = this->registrationCb;
918  unregisterCb = this->unregistrationCb;
919  }
920 
921  switch (msg.type())
922  {
923  case msgs::Discovery::ADVERTISE:
924  {
925  // Read the rest of the fields.
926  Pub publisher;
927  publisher.SetFromDiscovery(msg);
928 
929  // Check scope of the topic.
930  if ((publisher.Options().Scope() == Scope_t::PROCESS) ||
931  (publisher.Options().Scope() == Scope_t::HOST &&
932  !isSenderLocal))
933  {
934  return;
935  }
936 
937  // Register an advertised address for the topic.
938  bool added;
939  {
940  std::lock_guard<std::mutex> lock(this->mutex);
941  added = this->info.AddPublisher(publisher);
942  }
943 
944  if (added && connectCb)
945  {
946  // Execute the client's callback.
947  connectCb(publisher);
948  }
949 
950  break;
951  }
952  case msgs::Discovery::SUBSCRIBE:
953  {
954  std::string recvTopic;
955  // Read the topic information.
956  if (msg.has_sub())
957  {
958  recvTopic = msg.sub().topic();
959  }
960  else
961  {
962  std::cerr << "Subscription discovery message is missing "
963  << "Subscriber information.\n";
964  break;
965  }
966 
967  // Check if at least one of my nodes advertises the topic requested.
968  Addresses_M<Pub> addresses;
969  {
970  std::lock_guard<std::mutex> lock(this->mutex);
971  if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
972  {
973  break;
974  }
975 
976  if (!this->info.Publishers(recvTopic, addresses))
977  break;
978  }
979 
980  for (const auto &nodeInfo : addresses[this->pUuid])
981  {
982  // Check scope of the topic.
983  if ((nodeInfo.Options().Scope() == Scope_t::PROCESS) ||
984  (nodeInfo.Options().Scope() == Scope_t::HOST &&
985  !isSenderLocal))
986  {
987  continue;
988  }
989 
990  // Answer an ADVERTISE message.
991  this->SendMsg(DestinationType::ALL,
992  msgs::Discovery::ADVERTISE, nodeInfo);
993  }
994 
995  break;
996  }
997  case msgs::Discovery::NEW_CONNECTION:
998  {
999  // Read the rest of the fields.
1000  Pub publisher;
1001  publisher.SetFromDiscovery(msg);
1002 
1003  if (registerCb)
1004  registerCb(publisher);
1005 
1006  break;
1007  }
1008  case msgs::Discovery::END_CONNECTION:
1009  {
1010  // Read the rest of the fields.
1011  Pub publisher;
1012  publisher.SetFromDiscovery(msg);
1013 
1014  if (unregisterCb)
1015  unregisterCb(publisher);
1016 
1017  break;
1018  }
1019  case msgs::Discovery::HEARTBEAT:
1020  {
1021  // The timestamp has already been updated.
1022  break;
1023  }
1024  case msgs::Discovery::BYE:
1025  {
1026  // Remove the activity entry for this publisher.
1027  {
1028  std::lock_guard<std::mutex> lock(this->mutex);
1029  this->activity.erase(recvPUuid);
1030  }
1031 
1032  if (disconnectCb)
1033  {
1034  Pub pub;
1035  pub.SetPUuid(recvPUuid);
1036  // Notify the new disconnection.
1037  disconnectCb(pub);
1038  }
1039 
1040  // Remove the address entry for this topic.
1041  {
1042  std::lock_guard<std::mutex> lock(this->mutex);
1043  this->info.DelPublishersByProc(recvPUuid);
1044  }
1045 
1046  break;
1047  }
1048  case msgs::Discovery::UNADVERTISE:
1049  {
1050  // Read the address.
1051  Pub publisher;
1052  publisher.SetFromDiscovery(msg);
1053 
1054  // Check scope of the topic.
1055  if ((publisher.Options().Scope() == Scope_t::PROCESS) ||
1056  (publisher.Options().Scope() == Scope_t::HOST &&
1057  !isSenderLocal))
1058  {
1059  return;
1060  }
1061 
1062  if (disconnectCb)
1063  {
1064  // Notify the new disconnection.
1065  disconnectCb(publisher);
1066  }
1067 
1068  // Remove the address entry for this topic.
1069  {
1070  std::lock_guard<std::mutex> lock(this->mutex);
1071  this->info.DelPublisherByNode(publisher.Topic(),
1072  publisher.PUuid(), publisher.NUuid());
1073  }
1074 
1075  break;
1076  }
1077  default:
1078  {
1079  std::cerr << "Unknown message type [" << msg.type() << "].\n";
1080  break;
1081  }
1082  }
1083  }
1084 
1091  private: template<typename T>
1092  void SendMsg(const DestinationType &_destType,
1093  const msgs::Discovery::Type _type,
1094  const T &_pub) const
1095  {
1096  gz::msgs::Discovery discoveryMsg;
1097  discoveryMsg.set_version(this->Version());
1098  discoveryMsg.set_type(_type);
1099  discoveryMsg.set_process_uuid(this->pUuid);
1100 
1101  switch (_type)
1102  {
1103  case msgs::Discovery::ADVERTISE:
1104  case msgs::Discovery::UNADVERTISE:
1105  case msgs::Discovery::NEW_CONNECTION:
1106  case msgs::Discovery::END_CONNECTION:
1107  {
1108  _pub.FillDiscovery(discoveryMsg);
1109  break;
1110  }
1111  case msgs::Discovery::SUBSCRIBE:
1112  {
1113  discoveryMsg.mutable_sub()->set_topic(_pub.Topic());
1114  break;
1115  }
1116  case msgs::Discovery::HEARTBEAT:
1117  case msgs::Discovery::BYE:
1118  break;
1119  default:
1120  std::cerr << "Discovery::SendMsg() error: Unrecognized message"
1121  << " type [" << _type << "]" << std::endl;
1122  return;
1123  }
1124 
1125  if (_destType == DestinationType::MULTICAST ||
1126  _destType == DestinationType::ALL)
1127  {
1128  this->SendMulticast(discoveryMsg);
1129  }
1130 
1131  // Send the discovery message to the unicast relays.
1132  if (_destType == DestinationType::UNICAST ||
1133  _destType == DestinationType::ALL)
1134  {
1135  // Set the RELAY flag in the header.
1136  discoveryMsg.mutable_flags()->set_relay(true);
1137  this->SendUnicast(discoveryMsg);
1138  }
1139 
1140  if (this->verbose)
1141  {
1142  std::cout << "\t* Sending " << msgs::ToString(_type)
1143  << " msg [" << _pub.Topic() << "]" << std::endl;
1144  }
1145  }
1146 
1149  private: void SendUnicast(const msgs::Discovery &_msg) const
1150  {
1151  uint16_t msgSize;
1152 
1153 #if GOOGLE_PROTOBUF_VERSION >= 3004000
1154  size_t msgSizeFull = _msg.ByteSizeLong();
1155 #else
1156  int msgSizeFull = _msg.ByteSize();
1157 #endif
1158  if (msgSizeFull + sizeof(msgSize) > this->kMaxRcvStr)
1159  {
1160  std::cerr << "Discovery message too large to send. Discovery won't "
1161  << "work. This shouldn't happen.\n";
1162  return;
1163  }
1164  msgSize = msgSizeFull;
1165 
1166  uint16_t totalSize = sizeof(msgSize) + msgSize;
1167  char *buffer = static_cast<char *>(new char[totalSize]);
1168  memcpy(&buffer[0], &msgSize, sizeof(msgSize));
1169 
1170  if (_msg.SerializeToArray(buffer + sizeof(msgSize), msgSize))
1171  {
1172  // Send the discovery message to the unicast relays.
1173  for (const auto &sockAddr : this->relayAddrs)
1174  {
1175  errno = 0;
1176  auto sent = sendto(this->sockets.at(0),
1177  reinterpret_cast<const raw_type *>(
1178  reinterpret_cast<const unsigned char*>(buffer)),
1179  totalSize, 0,
1180  reinterpret_cast<const sockaddr *>(&sockAddr),
1181  sizeof(sockAddr));
1182 
1183  if (sent != totalSize)
1184  {
1185  std::cerr << "Exception sending a unicast message:" << std::endl;
1186  std::cerr << " Return value: " << sent << std::endl;
1187  std::cerr << " Error code: " << strerror(errno) << std::endl;
1188  break;
1189  }
1190  }
1191  }
1192  else
1193  {
1194  std::cerr << "Discovery::SendUnicast: Error serializing data."
1195  << std::endl;
1196  }
1197 
1198  delete [] buffer;
1199  }
1200 
1203  private: void SendMulticast(const msgs::Discovery &_msg) const
1204  {
1205  uint16_t msgSize;
1206 
1207 #if GOOGLE_PROTOBUF_VERSION >= 3004000
1208  size_t msgSizeFull = _msg.ByteSizeLong();
1209 #else
1210  int msgSizeFull = _msg.ByteSize();
1211 #endif
1212  if (msgSizeFull + sizeof(msgSize) > this->kMaxRcvStr)
1213  {
1214  std::cerr << "Discovery message too large to send. Discovery won't "
1215  << "work. This shouldn't happen.\n";
1216  return;
1217  }
1218 
1219  msgSize = msgSizeFull;
1220  uint16_t totalSize = sizeof(msgSize) + msgSize;
1221  char *buffer = static_cast<char *>(new char[totalSize]);
1222  memcpy(&buffer[0], &msgSize, sizeof(msgSize));
1223 
1224  if (_msg.SerializeToArray(buffer + sizeof(msgSize), msgSize))
1225  {
1226  // Send the discovery message to the multicast group through all the
1227  // sockets.
1228  for (const auto &sock : this->Sockets())
1229  {
1230  errno = 0;
1231  if (sendto(sock, reinterpret_cast<const raw_type *>(
1232  reinterpret_cast<const unsigned char*>(buffer)),
1233  totalSize, 0,
1234  reinterpret_cast<const sockaddr *>(this->MulticastAddr()),
1235  sizeof(*(this->MulticastAddr()))) != totalSize)
1236  {
1237  // Ignore EPERM and ENOBUFS errors.
1238  //
1239  // See issue #106
1240  //
1241  // Rationale drawn from:
1242  //
1243  // * https://groups.google.com/forum/#!topic/comp.protocols.tcp-ip/Qou9Sfgr77E
1244  // * https://stackoverflow.com/questions/16555101/sendto-dgrams-do-not-block-for-enobufs-on-osx
1245  if (errno != EPERM && errno != ENOBUFS)
1246  {
1247  std::cerr << "Exception sending a multicast message:"
1248  << strerror(errno) << std::endl;
1249  }
1250  break;
1251  }
1252  }
1253  }
1254  else
1255  {
1256  std::cerr << "Discovery::SendMulticast: Error serializing data."
1257  << std::endl;
1258  }
1259 
1260  delete [] buffer;
1261  }
1262 
1265  private: const std::vector<int> &Sockets() const
1266  {
1267  return this->sockets;
1268  }
1269 
1272  private: const sockaddr_in *MulticastAddr() const
1273  {
1274  return &this->mcastAddr;
1275  }
1276 
1279  private: uint8_t Version() const
1280  {
1281  static std::string ignStats;
1282  static int topicStats =
1283  (env("IGN_TRANSPORT_TOPIC_STATISTICS", ignStats) && ignStats == "1");
1284  return this->kWireVersion + (topicStats * 100);
1285  }
1286 
1291  private: bool RegisterNetIface(const std::string &_ip)
1292  {
1293  // Make a new socket for sending discovery information.
1294  int sock = static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1295  if (sock < 0)
1296  {
1297  std::cerr << "Socket creation failed." << std::endl;
1298  return false;
1299  }
1300 
1301  // Socket option: IP_MULTICAST_IF.
1302  // This socket option needs to be applied to each socket used to send
1303  // data. This option selects the source interface for outgoing messages.
1304  struct in_addr ifAddr;
1305  ifAddr.s_addr = inet_addr(_ip.c_str());
1306  if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1307  reinterpret_cast<const char*>(&ifAddr), sizeof(ifAddr)) != 0)
1308  {
1309  std::cerr << "Error setting socket option (IP_MULTICAST_IF)."
1310  << std::endl;
1311  return false;
1312  }
1313 
1314  this->sockets.push_back(sock);
1315 
1316  // Join the multicast group. We have to do it for each network interface
1317  // but we can do it on the same socket. We will use the socket at
1318  // position 0 for receiving multicast information.
1319  struct ip_mreq group;
1320  group.imr_multiaddr.s_addr =
1321  inet_addr(this->multicastGroup.c_str());
1322  group.imr_interface.s_addr = inet_addr(_ip.c_str());
1323  if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1324  reinterpret_cast<const char*>(&group), sizeof(group)) != 0)
1325  {
1326  std::cerr << "Error setting socket option (IP_ADD_MEMBERSHIP)."
1327  << std::endl;
1328  return false;
1329  }
1330 
1331  return true;
1332  }
1333 
1336  private: void AddRelayAddress(const std::string &_ip)
1337  {
1338  // Sanity check: Make sure that this IP address is not already saved.
1339  for (auto const &addr : this->relayAddrs)
1340  {
1341  if (addr.sin_addr.s_addr == inet_addr(_ip.c_str()))
1342  return;
1343  }
1344 
1345  sockaddr_in addr;
1346  memset(&addr, 0, sizeof(addr));
1347  addr.sin_family = AF_INET;
1348  addr.sin_addr.s_addr = inet_addr(_ip.c_str());
1349  addr.sin_port = htons(static_cast<u_short>(this->port));
1350 
1351  this->relayAddrs.push_back(addr);
1352  }
1353 
1357  private: static const unsigned int kDefActivityInterval = 100;
1358 
1362  private: static const unsigned int kDefHeartbeatInterval = 1000;
1363 
1367  private: static const unsigned int kDefSilenceInterval = 3000;
1368 
1370  private: std::string multicastGroup;
1371 
1373  private: const int kTimeout = 250;
1374 
1376  private: static const uint16_t kMaxRcvStr =
1378 
1381  private: static const uint8_t kWireVersion = 10;
1382 
1384  private: int port;
1385 
1387  private: std::string hostAddr;
1388 
1390  private: std::vector<std::string> hostInterfaces;
1391 
1393  private: std::string pUuid;
1394 
1398  private: unsigned int silenceInterval;
1399 
1403  private: unsigned int activityInterval;
1404 
1408  private: unsigned int heartbeatInterval;
1409 
1411  private: DiscoveryCallback<Pub> connectionCb;
1412 
1414  private: DiscoveryCallback<Pub> disconnectionCb;
1415 
1417  private: DiscoveryCallback<Pub> registrationCb;
1418 
1420  private: DiscoveryCallback<Pub> unregistrationCb;
1421 
1423  private: TopicStorage<Pub> info;
1424 
1430 
1432  private: bool verbose;
1433 
1435  private: std::vector<int> sockets;
1436 
1438  private: sockaddr_in mcastAddr;
1439 
1441  private: std::vector<sockaddr_in> relayAddrs;
1442 
1444  private: mutable std::mutex mutex;
1445 
1447  private: std::thread threadReception;
1448 
1450  private: Timestamp timeNextHeartbeat;
1451 
1453  private: Timestamp timeNextActivity;
1454 
1456  private: std::mutex exitMutex;
1457 
1462  private: bool initialized;
1463 
1465  private: unsigned int numHeartbeatsUninitialized;
1466 
1468  private: mutable std::condition_variable initializedCv;
1469 
1471  private: bool exit;
1472 
1474  private: bool enabled;
1475  };
1476 
1480 
1484  }
1485  }
1486 }
1487 
1488 #endif
void Register(const MessagePublisher &_pub) const
Register a node from this process as a remote subscriber.
Definition: gz/transport/Discovery.hh:387
void UnregistrationsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive an event when a remote node unsubscribes to a topic within this proces...
Definition: gz/transport/Discovery.hh:554
void ConnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery connection events. Each time a new topic is connected,...
Definition: gz/transport/Discovery.hh:526
bool Publishers(const std::string &_topic, std::map< std::string, std::vector< T >> &_info) const
Get the map of publishers stored for a given topic.
Definition: gz/transport/TopicStorage.hh:207
T lock(T... args)
bool DelPublisherByNode(const std::string &_topic, const std::string &_pUuid, const std::string &_nUuid)
Remove a publisher associated to a given topic and UUID pair.
Definition: gz/transport/TopicStorage.hh:222
@ HOST
Topic/service only available to subscribers in the same machine as the publisher.
Definition: gz/transport/AdvertiseOptions.hh:28
STL class.
This class stores all the information about a publisher. It stores the topic name that publishes,...
Definition: gz/transport/Publisher.hh:44
std::map< std::string, Timestamp > activity
Activity information. Every time there is a message from a remote node, its activity information is u...
Definition: gz/transport/Discovery.hh:1429
T find(T... args)
void SetActivityInterval(const unsigned int _ms)
Set the activity interval.
Definition: gz/transport/Discovery.hh:498
unsigned int ActivityInterval() const
The discovery checks the validity of the topic information every 'activity interval' milliseconds.
Definition: gz/transport/Discovery.hh:468
@ ALL
Topic/service available to any subscriber (default scope).
bool pollSockets(const std::vector< int > &_sockets, const int _timeout)
std::vector< std::string > split(const std::string &_orig, char _delim)
split at a one character delimiter to get a vector of something
Discovery(const std::string &_pUuid, const std::string &_ip, const int _port, const bool _verbose=false)
Constructor.
Definition: gz/transport/Discovery.hh:125
STL class.
void WaitForInit() const
Check if ready/initialized. If not, then wait on the initializedCv condition variable.
Definition: gz/transport/Discovery.hh:613
void TopicList(std::vector< std::string > &_topics) const
Get the list of topics currently stored.
Definition: gz/transport/TopicStorage.hh:339
T strerror(T... args)
const TopicStorage< Pub > & Info() const
Get the discovery information.
Definition: gz/transport/Discovery.hh:403
This class stores all the information about a message publisher.
Definition: gz/transport/Publisher.hh:181
A class for customizing the publication options for a topic or service advertised....
Definition: gz/transport/AdvertiseOptions.hh:59
unsigned int SilenceInterval() const
Get the maximum time allowed without receiving any discovery information from a node before canceling...
Definition: gz/transport/Discovery.hh:489
T unlock(T... args)
std::string determineHost()
Determine IP or hostname. Reference: https://github.com/ros/ros_comm/blob/hydro-devel/clients/ roscpp...
virtual ~Discovery()
Destructor.
Definition: gz/transport/Discovery.hh:250
T push_back(T... args)
unsigned int HeartbeatInterval() const
Each node broadcasts periodic heartbeats to keep its topic information alive in other nodes....
Definition: gz/transport/Discovery.hh:479
void Unregister(const MessagePublisher &_pub) const
Unregister a node from this process as a remote subscriber.
Definition: gz/transport/Discovery.hh:395
T joinable(T... args)
void PrintCurrentState() const
Print the current discovery state.
Definition: gz/transport/Discovery.hh:561
std::chrono::steady_clock::time_point Timestamp
Definition: gz/transport/TransportTypes.hh:155
STL class.
T at(T... args)
bool DelPublishersByProc(const std::string &_pUuid)
Remove all the publishers associated to a given process.
Definition: gz/transport/TopicStorage.hh:262
bool AddPublisher(const T &_publisher)
Add a new address associated to a given topic and node UUID.
Definition: gz/transport/TopicStorage.hh:53
T c_str(T... args)
bool Unadvertise(const std::string &_topic, const std::string &_nUuid)
Unadvertise a new message. Broadcast a discovery message that will cancel all the discovery informati...
Definition: gz/transport/Discovery.hh:427
T erase(T... args)
STL class.
@ ALL
Send data via unicast and multicast.
bool env(const std::string &_name, std::string &_value)
Find the environment variable '_name' and return its value.
A discovery class that implements a distributed topic discovery protocol. It uses UDP multicast for s...
Definition: gz/transport/Discovery.hh:117
T min(T... args)
DestinationType
Options for sending discovery messages.
Definition: gz/transport/Discovery.hh:88
void SetSilenceInterval(const unsigned int _ms)
Set the maximum silence interval.
Definition: gz/transport/Discovery.hh:516
bool HasAnyPublishers(const std::string &_topic, const std::string &_pUuid) const
Return if there is any publisher stored for the given topic and process UUID.
Definition: gz/transport/TopicStorage.hh:134
T endl(T... args)
void TopicList(std::vector< std::string > &_topics) const
Get the list of topics currently advertised in the network.
Definition: gz/transport/Discovery.hh:604
T duration_cast(T... args)
std::string HostAddr() const
Get the IP address of this host.
Definition: gz/transport/Discovery.hh:458
T boolalpha(T... args)
T cbegin(T... args)
void PublishersByProc(const std::string &_pUuid, std::map< std::string, std::vector< T >> &_pubs) const
Given a process UUID, the function returns the list of publishers contained in this process UUID with...
Definition: gz/transport/TopicStorage.hh:286
void DisconnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery disconnection events. Each time a topic is no longer active,...
Definition: gz/transport/Discovery.hh:536
void Print() const
Print all the information for debugging purposes.
Definition: gz/transport/TopicStorage.hh:346
T empty(T... args)
@ UNICAST
Send data via unicast only.
STL class.
T memcpy(T... args)
T end(T... args)
void SetHeartbeatInterval(const unsigned int _ms)
Set the heartbeat interval.
Definition: gz/transport/Discovery.hh:507
T max(T... args)
void Start()
Start the discovery service. You probably want to register the callbacks for receiving discovery noti...
Definition: gz/transport/Discovery.hh:281
bool Discover(const std::string &_topic) const
Request discovery information about a topic. When using this method, the user might want to use SetCo...
Definition: gz/transport/Discovery.hh:337
@ PROCESS
Topic/service only available to subscribers in the same process as the publisher.
void RegistrationsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive an event when a new remote node subscribes to a topic within this proc...
Definition: gz/transport/Discovery.hh:545
bool Advertise(const Pub &_publisher)
Advertise a new message.
Definition: gz/transport/Discovery.hh:305
T join(T... args)
T memset(T... args)
void raw_type
Definition: gz/transport/Discovery.hh:45
bool Publisher(const std::string &_topic, const std::string &_pUuid, const std::string &_nUuid, T &_publisher) const
Get the address information for a given topic and node UUID.
Definition: gz/transport/TopicStorage.hh:169
@ MULTICAST
Send data via multicast only.
std::vector< std::string > determineInterfaces()
Determine the list of network interfaces for this machine. Reference: https://github....
bool Publishers(const std::string &_topic, Addresses_M< Pub > &_publishers) const
Get all the publishers' information known for a given topic.
Definition: gz/transport/Discovery.hh:413