Ignition Transport

API Reference

11.0.0
Discovery.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2014 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef IGNITION_TRANSPORT_DISCOVERY_HH_
19 #define IGNITION_TRANSPORT_DISCOVERY_HH_
20 #include <errno.h>
21 #include <string.h>
22 
23 #ifdef _WIN32
24  // For socket(), connect(), send(), and recv().
25  #include <Winsock2.h>
26  #include <Ws2def.h>
27  #include <Ws2ipdef.h>
28  #include <Ws2tcpip.h>
29  // Type used for raw data on this platform.
30  using raw_type = char;
31 #else
32  // For data types
33  #include <sys/types.h>
34  // For socket(), connect(), send(), and recv()
35  #include <sys/socket.h>
36  // For gethostbyname()
37  #include <netdb.h>
38  // For inet_addr()
39  #include <arpa/inet.h>
40  // For close()
41  #include <unistd.h>
42  // For sockaddr_in
43  #include <netinet/in.h>
44  // Type used for raw data on this platform
45  using raw_type = void;
46 #endif
47 
48 #ifdef _WIN32
49  #pragma warning(push, 0)
50 #endif
51 #ifdef _WIN32
52  #pragma warning(pop)
53  // Suppress "decorated name length exceed" warning in STL.
54  #pragma warning(disable: 4503)
55  // Suppress "depreted API warnings" in WINSOCK.
56  #pragma warning(disable: 4996)
57 #endif
58 
59 #include <ignition/msgs/discovery.pb.h>
60 
61 #include <algorithm>
62 #include <condition_variable>
63 #include <limits>
64 #include <map>
65 #include <memory>
66 #include <mutex>
67 #include <string>
68 #include <thread>
69 #include <vector>
70 
71 #include <ignition/msgs/Utility.hh>
72 
73 #include "ignition/transport/config.hh"
74 #include "ignition/transport/Export.hh"
80 
81 namespace ignition
82 {
83  namespace transport
84  {
85  // Inline bracket to help doxygen filtering.
86  inline namespace IGNITION_TRANSPORT_VERSION_NAMESPACE {
88  enum class DestinationType
89  {
91  UNICAST,
93  MULTICAST,
95  ALL
96  };
97 
98  //
104  bool IGNITION_TRANSPORT_VISIBLE pollSockets(
105  const std::vector<int> &_sockets,
106  const int _timeout);
107 
116  template<typename Pub>
117  class Discovery
118  {
125  public: Discovery(const std::string &_pUuid,
126  const std::string &_ip,
127  const int _port,
128  const bool _verbose = false)
129  : multicastGroup(_ip),
130  port(_port),
131  hostAddr(determineHost()),
132  pUuid(_pUuid),
133  silenceInterval(kDefSilenceInterval),
134  activityInterval(kDefActivityInterval),
135  heartbeatInterval(kDefHeartbeatInterval),
136  connectionCb(nullptr),
137  disconnectionCb(nullptr),
138  verbose(_verbose),
139  initialized(false),
140  numHeartbeatsUninitialized(0),
141  exit(false),
142  enabled(false)
143  {
144  std::string ignIp;
145  if (env("IGN_IP", ignIp) && !ignIp.empty())
146  this->hostInterfaces = {ignIp};
147  else
148  {
149  // Get the list of network interfaces in this host.
150  this->hostInterfaces = determineInterfaces();
151  }
152 
153 #ifdef _WIN32
154  WORD wVersionRequested;
155  WSADATA wsaData;
156 
157  // Request WinSock v2.2.
158  wVersionRequested = MAKEWORD(2, 2);
159  // Load WinSock DLL.
160  if (WSAStartup(wVersionRequested, &wsaData) != 0)
161  {
162  std::cerr << "Unable to load WinSock DLL" << std::endl;
163  return;
164  }
165 #endif
166  for (const auto &netIface : this->hostInterfaces)
167  {
168  auto succeed = this->RegisterNetIface(netIface);
169 
170  // If the IP address that we're selecting as the main IP address of
171  // the host is invalid, we change it to 127.0.0.1 .
172  // This is probably because IGN_IP is set to a wrong value.
173  if (netIface == this->hostAddr && !succeed)
174  {
175  this->RegisterNetIface("127.0.0.1");
176  std::cerr << "Did you set the environment variable IGN_IP with a "
177  << "correct IP address? " << std::endl
178  << " [" << netIface << "] seems an invalid local IP "
179  << "address." << std::endl
180  << " Using 127.0.0.1 as hostname." << std::endl;
181  this->hostAddr = "127.0.0.1";
182  }
183  }
184 
185  // Socket option: SO_REUSEADDR. This options is used only for receiving
186  // data. We can reuse the same socket for receiving multicast data from
187  // multiple interfaces. We will use the socket at position 0 for
188  // receiving data.
189  int reuseAddr = 1;
190  if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
191  reinterpret_cast<const char *>(&reuseAddr), sizeof(reuseAddr)) != 0)
192  {
193  std::cerr << "Error setting socket option (SO_REUSEADDR)."
194  << std::endl;
195  return;
196  }
197 
198 #ifdef SO_REUSEPORT
199  // Socket option: SO_REUSEPORT. This options is used only for receiving
200  // data. We can reuse the same socket for receiving multicast data from
201  // multiple interfaces. We will use the socket at position 0 for
202  // receiving data.
203  int reusePort = 1;
204  // cppcheck-suppress ConfigurationNotChecked
205  if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
206  reinterpret_cast<const char *>(&reusePort), sizeof(reusePort)) != 0)
207  {
208  std::cerr << "Error setting socket option (SO_REUSEPORT)."
209  << std::endl;
210  return;
211  }
212 #endif
213  // Bind the first socket to the discovery port.
214  sockaddr_in localAddr;
215  memset(&localAddr, 0, sizeof(localAddr));
216  localAddr.sin_family = AF_INET;
217  localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
218  localAddr.sin_port = htons(static_cast<u_short>(this->port));
219 
220  if (bind(this->sockets.at(0),
221  reinterpret_cast<sockaddr *>(&localAddr), sizeof(sockaddr_in)) < 0)
222  {
223  std::cerr << "Binding to a local port failed." << std::endl;
224  return;
225  }
226 
227  // Set 'mcastAddr' to the multicast discovery group.
228  memset(&this->mcastAddr, 0, sizeof(this->mcastAddr));
229  this->mcastAddr.sin_family = AF_INET;
230  this->mcastAddr.sin_addr.s_addr =
231  inet_addr(this->multicastGroup.c_str());
232  this->mcastAddr.sin_port = htons(static_cast<u_short>(this->port));
233 
235  std::string ignRelay = "";
236  if (env("IGN_RELAY", ignRelay) && !ignRelay.empty())
237  {
238  relays = transport::split(ignRelay, ':');
239  }
240 
241  // Register all unicast relays.
242  for (auto const &relayAddr : relays)
243  this->AddRelayAddress(relayAddr);
244 
245  if (this->verbose)
246  this->PrintCurrentState();
247  }
248 
250  public: virtual ~Discovery()
251  {
252  // Tell the service thread to terminate.
253  this->exitMutex.lock();
254  this->exit = true;
255  this->exitMutex.unlock();
256 
257  // Wait for the service threads to finish before exit.
258  if (this->threadReception.joinable())
259  this->threadReception.join();
260 
261  // Broadcast a BYE message to trigger the remote cancellation of
262  // all our advertised topics.
263  this->SendMsg(DestinationType::ALL, msgs::Discovery::BYE,
264  Publisher("", "", this->pUuid, "", AdvertiseOptions()));
265 
266  // Close sockets.
267  for (const auto &sock : this->sockets)
268  {
269 #ifdef _WIN32
270  closesocket(sock);
271  WSACleanup();
272 #else
273  close(sock);
274 #endif
275  }
276  }
277 
281  public: void Start()
282  {
283  {
284  std::lock_guard<std::mutex> lock(this->mutex);
285 
286  // The service is already running.
287  if (this->enabled)
288  return;
289 
290  this->enabled = true;
291  }
292 
293  auto now = std::chrono::steady_clock::now();
294  this->timeNextHeartbeat = now;
295  this->timeNextActivity = now;
296 
297  // Start the thread that receives discovery information.
298  this->threadReception = std::thread(&Discovery::RecvMessages, this);
299  }
300 
305  public: bool Advertise(const Pub &_publisher)
306  {
307  {
308  std::lock_guard<std::mutex> lock(this->mutex);
309 
310  if (!this->enabled)
311  return false;
312 
313  // Add the addressing information (local publisher).
314  if (!this->info.AddPublisher(_publisher))
315  return false;
316  }
317 
318  // Only advertise a message outside this process if the scope
319  // is not 'Process'
320  if (_publisher.Options().Scope() != Scope_t::PROCESS)
321  this->SendMsg(DestinationType::ALL, msgs::Discovery::ADVERTISE,
322  _publisher);
323 
324  return true;
325  }
326 
337  public: bool Discover(const std::string &_topic) const
338  {
340  bool found;
341  Addresses_M<Pub> addresses;
342 
343  {
344  std::lock_guard<std::mutex> lock(this->mutex);
345 
346  if (!this->enabled)
347  return false;
348 
349  cb = this->connectionCb;
350  }
351 
352  Pub pub;
353  pub.SetTopic(_topic);
354  pub.SetPUuid(this->pUuid);
355 
356  // Send a discovery request.
357  this->SendMsg(DestinationType::ALL, msgs::Discovery::SUBSCRIBE, pub);
358 
359  {
360  std::lock_guard<std::mutex> lock(this->mutex);
361  found = this->info.Publishers(_topic, addresses);
362  }
363 
364  if (found)
365  {
366  // I already have information about this topic.
367  for (const auto &proc : addresses)
368  {
369  for (const auto &node : proc.second)
370  {
371  if (cb)
372  {
373  // Execute the user's callback for a service request. Notice
374  // that we only execute one callback for preventing receive
375  // multiple service responses for a single request.
376  cb(node);
377  }
378  }
379  }
380  }
381 
382  return true;
383  }
384 
387  public: void Register(const MessagePublisher &_pub) const
388  {
389  this->SendMsg(
390  DestinationType::ALL, msgs::Discovery::NEW_CONNECTION, _pub);
391  }
392 
395  public: void Unregister(const MessagePublisher &_pub) const
396  {
397  this->SendMsg(
398  DestinationType::ALL, msgs::Discovery::END_CONNECTION, _pub);
399  }
400 
403  public: const TopicStorage<Pub> &Info() const
404  {
405  std::lock_guard<std::mutex> lock(this->mutex);
406  return this->info;
407  }
408 
413  public: bool Publishers(const std::string &_topic,
414  Addresses_M<Pub> &_publishers) const
415  {
416  std::lock_guard<std::mutex> lock(this->mutex);
417  return this->info.Publishers(_topic, _publishers);
418  }
419 
427  public: bool Unadvertise(const std::string &_topic,
428  const std::string &_nUuid)
429  {
430  Pub inf;
431  {
432  std::lock_guard<std::mutex> lock(this->mutex);
433 
434  if (!this->enabled)
435  return false;
436 
437  // Don't do anything if the topic is not advertised by any of my nodes
438  if (!this->info.Publisher(_topic, this->pUuid, _nUuid, inf))
439  return true;
440 
441  // Remove the topic information.
442  this->info.DelPublisherByNode(_topic, this->pUuid, _nUuid);
443  }
444 
445  // Only unadvertise a message outside this process if the scope
446  // is not 'Process'.
447  if (inf.Options().Scope() != Scope_t::PROCESS)
448  {
449  this->SendMsg(DestinationType::ALL,
450  msgs::Discovery::UNADVERTISE, inf);
451  }
452 
453  return true;
454  }
455 
458  public: std::string HostAddr() const
459  {
460  std::lock_guard<std::mutex> lock(this->mutex);
461  return this->hostAddr;
462  }
463 
468  public: unsigned int ActivityInterval() const
469  {
470  std::lock_guard<std::mutex> lock(this->mutex);
471  return this->activityInterval;
472  }
473 
479  public: unsigned int HeartbeatInterval() const
480  {
481  std::lock_guard<std::mutex> lock(this->mutex);
482  return this->heartbeatInterval;
483  }
484 
489  public: unsigned int SilenceInterval() const
490  {
491  std::lock_guard<std::mutex> lock(this->mutex);
492  return this->silenceInterval;
493  }
494 
498  public: void SetActivityInterval(const unsigned int _ms)
499  {
500  std::lock_guard<std::mutex> lock(this->mutex);
501  this->activityInterval = _ms;
502  }
503 
507  public: void SetHeartbeatInterval(const unsigned int _ms)
508  {
509  std::lock_guard<std::mutex> lock(this->mutex);
510  this->heartbeatInterval = _ms;
511  }
512 
516  public: void SetSilenceInterval(const unsigned int _ms)
517  {
518  std::lock_guard<std::mutex> lock(this->mutex);
519  this->silenceInterval = _ms;
520  }
521 
526  public: void ConnectionsCb(const DiscoveryCallback<Pub> &_cb)
527  {
528  std::lock_guard<std::mutex> lock(this->mutex);
529  this->connectionCb = _cb;
530  }
531 
536  public: void DisconnectionsCb(const DiscoveryCallback<Pub> &_cb)
537  {
538  std::lock_guard<std::mutex> lock(this->mutex);
539  this->disconnectionCb = _cb;
540  }
541 
545  public: void RegistrationsCb(const DiscoveryCallback<Pub> &_cb)
546  {
547  std::lock_guard<std::mutex> lock(this->mutex);
548  this->registrationCb = _cb;
549  }
550 
554  public: void UnregistrationsCb(const DiscoveryCallback<Pub> &_cb)
555  {
556  std::lock_guard<std::mutex> lock(this->mutex);
557  this->unregistrationCb = _cb;
558  }
559 
561  public: void PrintCurrentState() const
562  {
563  std::lock_guard<std::mutex> lock(this->mutex);
564 
565  std::cout << "---------------" << std::endl;
566  std::cout << std::boolalpha << "Enabled: "
567  << this->enabled << std::endl;
568  std::cout << "Discovery state" << std::endl;
569  std::cout << "\tUUID: " << this->pUuid << std::endl;
570  std::cout << "Settings" << std::endl;
571  std::cout << "\tActivity: " << this->activityInterval
572  << " ms." << std::endl;
573  std::cout << "\tHeartbeat: " << this->heartbeatInterval
574  << "ms." << std::endl;
575  std::cout << "\tSilence: " << this->silenceInterval
576  << " ms." << std::endl;
577  std::cout << "Known information:" << std::endl;
578  this->info.Print();
579 
580  // Used to calculate the elapsed time.
582 
583  std::cout << "Activity" << std::endl;
584  if (this->activity.empty())
585  std::cout << "\t<empty>" << std::endl;
586  else
587  {
588  for (auto &proc : this->activity)
589  {
590  // Elapsed time since the last update from this publisher.
591  std::chrono::duration<double> elapsed = now - proc.second;
592 
593  std::cout << "\t" << proc.first << std::endl;
594  std::cout << "\t\t" << "Since: " << std::chrono::duration_cast<
595  std::chrono::milliseconds>(elapsed).count() << " ms. ago. "
596  << std::endl;
597  }
598  }
599  std::cout << "---------------" << std::endl;
600  }
601 
604  public: void TopicList(std::vector<std::string> &_topics) const
605  {
606  this->WaitForInit();
607  std::lock_guard<std::mutex> lock(this->mutex);
608  this->info.TopicList(_topics);
609  }
610 
613  public: void WaitForInit() const
614  {
615  std::unique_lock<std::mutex> lk(this->mutex);
616 
617  if (!this->initialized)
618  {
619  this->initializedCv.wait(lk, [this]{return this->initialized;});
620  }
621  }
622 
626  private: void UpdateActivity()
627  {
628  // The UUIDs of the processes that have expired.
630 
631  // A copy of the disconnection callback.
632  DiscoveryCallback<Pub> disconnectCb;
633 
635 
636  {
637  std::lock_guard<std::mutex> lock(this->mutex);
638 
639  if (now < this->timeNextActivity)
640  return;
641 
642  disconnectCb = this->disconnectionCb;
643 
644  for (auto it = this->activity.cbegin(); it != this->activity.cend();)
645  {
646  // Elapsed time since the last update from this publisher.
647  auto elapsed = now - it->second;
648 
649  // This publisher has expired.
650  if (std::chrono::duration_cast<std::chrono::milliseconds>
651  (elapsed).count() > this->silenceInterval)
652  {
653  // Remove all the info entries for this process UUID.
654  this->info.DelPublishersByProc(it->first);
655 
656  uuids.push_back(it->first);
657 
658  // Remove the activity entry.
659  this->activity.erase(it++);
660  }
661  else
662  ++it;
663  }
664 
665  this->timeNextActivity = std::chrono::steady_clock::now() +
666  std::chrono::milliseconds(this->activityInterval);
667  }
668 
669  if (!disconnectCb)
670  return;
671 
672  // Notify without topic information. This is useful to inform the
673  // client that a remote node is gone, even if we were not
674  // interested in its topics.
675  for (auto const &uuid : uuids)
676  {
677  Pub publisher;
678  publisher.SetPUuid(uuid);
679  disconnectCb(publisher);
680  }
681  }
682 
684  private: void UpdateHeartbeat()
685  {
687 
688  {
689  std::lock_guard<std::mutex> lock(this->mutex);
690 
691  if (now < this->timeNextHeartbeat)
692  return;
693  }
694 
695  Publisher pub("", "", this->pUuid, "", AdvertiseOptions());
696  this->SendMsg(DestinationType::ALL, msgs::Discovery::HEARTBEAT, pub);
697 
699  {
700  std::lock_guard<std::mutex> lock(this->mutex);
701 
702  // Re-advertise topics that are advertised inside this process.
703  this->info.PublishersByProc(this->pUuid, nodes);
704  }
705 
706  for (const auto &topic : nodes)
707  {
708  for (const auto &node : topic.second)
709  {
710  this->SendMsg(DestinationType::ALL,
711  msgs::Discovery::ADVERTISE, node);
712  }
713  }
714 
715  {
716  std::lock_guard<std::mutex> lock(this->mutex);
717  if (!this->initialized)
718  {
719  ++this->numHeartbeatsUninitialized;
720  if (this->numHeartbeatsUninitialized == 2)
721  {
722  // We consider the discovery initialized after two cycles of
723  // heartbeats sent.
724  this->initialized = true;
725 
726  // Notify anyone waiting for the initialization phase to finish.
727  this->initializedCv.notify_all();
728  }
729  }
730 
731  this->timeNextHeartbeat = std::chrono::steady_clock::now() +
732  std::chrono::milliseconds(this->heartbeatInterval);
733  }
734  }
735 
745  private: int NextTimeout() const
746  {
747  auto now = std::chrono::steady_clock::now();
748  auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
749  auto timeUntilNextActivity = this->timeNextActivity - now;
750 
751  int t = static_cast<int>(
753  (std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
754  int t2 = std::min(t, this->kTimeout);
755  return std::max(t2, 0);
756  }
757 
759  private: void RecvMessages()
760  {
761  bool timeToExit = false;
762  while (!timeToExit)
763  {
764  // Calculate the timeout.
765  int timeout = this->NextTimeout();
766 
767  if (pollSockets(this->sockets, timeout))
768  {
769  this->RecvDiscoveryUpdate();
770 
771  if (this->verbose)
772  this->PrintCurrentState();
773  }
774 
775  this->UpdateHeartbeat();
776  this->UpdateActivity();
777 
778  // Is it time to exit?
779  {
780  std::lock_guard<std::mutex> lock(this->exitMutex);
781  if (this->exit)
782  timeToExit = true;
783  }
784  }
785  }
786 
788  private: void RecvDiscoveryUpdate()
789  {
790  char rcvStr[Discovery::kMaxRcvStr];
791  sockaddr_in clntAddr;
792  socklen_t addrLen = sizeof(clntAddr);
793 
794  uint16_t received = recvfrom(this->sockets.at(0),
795  reinterpret_cast<raw_type *>(rcvStr),
796  this->kMaxRcvStr, 0,
797  reinterpret_cast<sockaddr *>(&clntAddr),
798  reinterpret_cast<socklen_t *>(&addrLen));
799  if (received > 0)
800  {
801  uint16_t len = 0;
802  memcpy(&len, &rcvStr[0], sizeof(len));
803 
804  // Ignition Transport delimits each discovery message with a
805  // frame_delimiter that contains byte size information.
806  // A discovery message has the form:
807  //
808  // <frame_delimiter><frame_body>
809  //
810  // Ignition Transport version < 8 sends a frame delimiter that
811  // contains the value of sizeof(frame_delimiter)
812  // + sizeof(frame_body). In other words, the frame_delimiter
813  // contains a value that represents the total size of the
814  // frame_body and frame_delimiter in bytes.
815  //
816  // Ignition Transport version >= 8 sends a frame_delimiter
817  // that contains the value of sizeof(frame_body). In other
818  // words, the frame_delimiter contains a value that represents
819  // the total size of only the frame_body.
820  //
821  // It is possible that two incompatible versions of Ignition
822  // Transport exist on the same network. If we receive an
823  // unexpected size, then we ignore the message.
824 
825  // If-condition for version 8+
826  if (len + sizeof(len) == received)
827  {
828  std::string srcAddr = inet_ntoa(clntAddr.sin_addr);
829  uint16_t srcPort = ntohs(clntAddr.sin_port);
830 
831  if (this->verbose)
832  {
833  std::cout << "\nReceived discovery update from "
834  << srcAddr << ": " << srcPort << std::endl;
835  }
836 
837  this->DispatchDiscoveryMsg(srcAddr, rcvStr + sizeof(len), len);
838  }
839  }
840  else if (received < 0)
841  {
842  std::cerr << "Discovery::RecvDiscoveryUpdate() recvfrom error"
843  << std::endl;
844  }
845  }
846 
851  private: void DispatchDiscoveryMsg(const std::string &_fromIp,
852  char *_msg, uint16_t _len)
853  {
854  ignition::msgs::Discovery msg;
855 
856  // Parse the message, and return if parsing failed. Parsing could
857  // fail when another discovery node is publishing messages using an
858  // older (or newer) format.
859  if (!msg.ParseFromArray(_msg, _len))
860  return;
861 
862  // Discard the message if the wire protocol is different than mine.
863  if (this->Version() != msg.version())
864  return;
865 
866  std::string recvPUuid = msg.process_uuid();
867 
868  // Discard our own discovery messages.
869  if (recvPUuid == this->pUuid)
870  return;
871 
872  // Forwarding summary:
873  // - From a unicast peer -> to multicast group (with NO_RELAY flag).
874  // - From multicast group -> to unicast peers (with RELAY flag).
875 
876  // If the RELAY flag is set, this discovery message is coming via a
877  // unicast transmission. In this case, we don't process it, we just
878  // forward it to the multicast group, and it will be dispatched once
879  // received there. Note that we also unset the RELAY flag and set the
880  // NO_RELAY flag, to avoid forwarding the message anymore.
881  if (msg.has_flags() && msg.flags().relay())
882  {
883  // Unset the RELAY flag in the header and set the NO_RELAY.
884  msg.mutable_flags()->set_relay(false);
885  msg.mutable_flags()->set_no_relay(true);
886  this->SendMulticast(msg);
887 
888  // A unicast peer contacted me. I need to save its address for
889  // sending future messages in the future.
890  this->AddRelayAddress(_fromIp);
891  return;
892  }
893  // If we are receiving this discovery message via the multicast channel
894  // and the NO_RELAY flag is not set, we forward this message via unicast
895  // to all our relays. Note that this is the most common case, where we
896  // receive a regular multicast message and we forward it to any remote
897  // relays.
898  else if (!msg.has_flags() || !msg.flags().no_relay())
899  {
900  msg.mutable_flags()->set_relay(true);
901  this->SendUnicast(msg);
902  }
903 
904  bool isSenderLocal = (std::find(this->hostInterfaces.begin(),
905  this->hostInterfaces.end(), _fromIp) != this->hostInterfaces.end()) ||
906  (_fromIp.find("127.") == 0);
907 
908  // Update timestamp and cache the callbacks.
909  DiscoveryCallback<Pub> connectCb;
910  DiscoveryCallback<Pub> disconnectCb;
911  DiscoveryCallback<Pub> registerCb;
912  DiscoveryCallback<Pub> unregisterCb;
913  {
914  std::lock_guard<std::mutex> lock(this->mutex);
915  this->activity[recvPUuid] = std::chrono::steady_clock::now();
916  connectCb = this->connectionCb;
917  disconnectCb = this->disconnectionCb;
918  registerCb = this->registrationCb;
919  unregisterCb = this->unregistrationCb;
920  }
921 
922  switch (msg.type())
923  {
924  case msgs::Discovery::ADVERTISE:
925  {
926  // Read the rest of the fields.
927  Pub publisher;
928  publisher.SetFromDiscovery(msg);
929 
930  // Check scope of the topic.
931  if ((publisher.Options().Scope() == Scope_t::PROCESS) ||
932  (publisher.Options().Scope() == Scope_t::HOST &&
933  !isSenderLocal))
934  {
935  return;
936  }
937 
938  // Register an advertised address for the topic.
939  bool added;
940  {
941  std::lock_guard<std::mutex> lock(this->mutex);
942  added = this->info.AddPublisher(publisher);
943  }
944 
945  if (added && connectCb)
946  {
947  // Execute the client's callback.
948  connectCb(publisher);
949  }
950 
951  break;
952  }
953  case msgs::Discovery::SUBSCRIBE:
954  {
955  std::string recvTopic;
956  // Read the topic information.
957  if (msg.has_sub())
958  {
959  recvTopic = msg.sub().topic();
960  }
961  else
962  {
963  std::cerr << "Subscription discovery message is missing "
964  << "Subscriber information.\n";
965  break;
966  }
967 
968  // Check if at least one of my nodes advertises the topic requested.
969  Addresses_M<Pub> addresses;
970  {
971  std::lock_guard<std::mutex> lock(this->mutex);
972  if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
973  {
974  break;
975  }
976 
977  if (!this->info.Publishers(recvTopic, addresses))
978  break;
979  }
980 
981  for (const auto &nodeInfo : addresses[this->pUuid])
982  {
983  // Check scope of the topic.
984  if ((nodeInfo.Options().Scope() == Scope_t::PROCESS) ||
985  (nodeInfo.Options().Scope() == Scope_t::HOST &&
986  !isSenderLocal))
987  {
988  continue;
989  }
990 
991  // Answer an ADVERTISE message.
992  this->SendMsg(DestinationType::ALL,
993  msgs::Discovery::ADVERTISE, nodeInfo);
994  }
995 
996  break;
997  }
998  case msgs::Discovery::NEW_CONNECTION:
999  {
1000  // Read the rest of the fields.
1001  Pub publisher;
1002  publisher.SetFromDiscovery(msg);
1003 
1004  if (registerCb)
1005  registerCb(publisher);
1006 
1007  break;
1008  }
1009  case msgs::Discovery::END_CONNECTION:
1010  {
1011  // Read the rest of the fields.
1012  Pub publisher;
1013  publisher.SetFromDiscovery(msg);
1014 
1015  if (unregisterCb)
1016  unregisterCb(publisher);
1017 
1018  break;
1019  }
1020  case msgs::Discovery::HEARTBEAT:
1021  {
1022  // The timestamp has already been updated.
1023  break;
1024  }
1025  case msgs::Discovery::BYE:
1026  {
1027  // Remove the activity entry for this publisher.
1028  {
1029  std::lock_guard<std::mutex> lock(this->mutex);
1030  this->activity.erase(recvPUuid);
1031  }
1032 
1033  if (disconnectCb)
1034  {
1035  Pub pub;
1036  pub.SetPUuid(recvPUuid);
1037  // Notify the new disconnection.
1038  disconnectCb(pub);
1039  }
1040 
1041  // Remove the address entry for this topic.
1042  {
1043  std::lock_guard<std::mutex> lock(this->mutex);
1044  this->info.DelPublishersByProc(recvPUuid);
1045  }
1046 
1047  break;
1048  }
1049  case msgs::Discovery::UNADVERTISE:
1050  {
1051  // Read the address.
1052  Pub publisher;
1053  publisher.SetFromDiscovery(msg);
1054 
1055  // Check scope of the topic.
1056  if ((publisher.Options().Scope() == Scope_t::PROCESS) ||
1057  (publisher.Options().Scope() == Scope_t::HOST &&
1058  !isSenderLocal))
1059  {
1060  return;
1061  }
1062 
1063  if (disconnectCb)
1064  {
1065  // Notify the new disconnection.
1066  disconnectCb(publisher);
1067  }
1068 
1069  // Remove the address entry for this topic.
1070  {
1071  std::lock_guard<std::mutex> lock(this->mutex);
1072  this->info.DelPublisherByNode(publisher.Topic(),
1073  publisher.PUuid(), publisher.NUuid());
1074  }
1075 
1076  break;
1077  }
1078  default:
1079  {
1080  std::cerr << "Unknown message type [" << msg.type() << "].\n";
1081  break;
1082  }
1083  }
1084  }
1085 
1092  private: template<typename T>
1093  void SendMsg(const DestinationType &_destType,
1094  const msgs::Discovery::Type _type,
1095  const T &_pub) const
1096  {
1097  ignition::msgs::Discovery discoveryMsg;
1098  discoveryMsg.set_version(this->Version());
1099  discoveryMsg.set_type(_type);
1100  discoveryMsg.set_process_uuid(this->pUuid);
1101 
1102  switch (_type)
1103  {
1104  case msgs::Discovery::ADVERTISE:
1105  case msgs::Discovery::UNADVERTISE:
1106  case msgs::Discovery::NEW_CONNECTION:
1107  case msgs::Discovery::END_CONNECTION:
1108  {
1109  _pub.FillDiscovery(discoveryMsg);
1110  break;
1111  }
1112  case msgs::Discovery::SUBSCRIBE:
1113  {
1114  discoveryMsg.mutable_sub()->set_topic(_pub.Topic());
1115  break;
1116  }
1117  case msgs::Discovery::HEARTBEAT:
1118  case msgs::Discovery::BYE:
1119  break;
1120  default:
1121  std::cerr << "Discovery::SendMsg() error: Unrecognized message"
1122  << " type [" << _type << "]" << std::endl;
1123  return;
1124  }
1125 
1126  if (_destType == DestinationType::MULTICAST ||
1127  _destType == DestinationType::ALL)
1128  {
1129  this->SendMulticast(discoveryMsg);
1130  }
1131 
1132  // Send the discovery message to the unicast relays.
1133  if (_destType == DestinationType::UNICAST ||
1134  _destType == DestinationType::ALL)
1135  {
1136  // Set the RELAY flag in the header.
1137  discoveryMsg.mutable_flags()->set_relay(true);
1138  this->SendUnicast(discoveryMsg);
1139  }
1140 
1141  if (this->verbose)
1142  {
1143  std::cout << "\t* Sending " << msgs::ToString(_type)
1144  << " msg [" << _pub.Topic() << "]" << std::endl;
1145  }
1146  }
1147 
1150  private: void SendUnicast(const msgs::Discovery &_msg) const
1151  {
1152  uint16_t msgSize;
1153 
1154 #if GOOGLE_PROTOBUF_VERSION >= 3004000
1155  size_t msgSizeFull = _msg.ByteSizeLong();
1156 #else
1157  int msgSizeFull = _msg.ByteSize();
1158 #endif
1159  if (msgSizeFull + sizeof(msgSize) > this->kMaxRcvStr)
1160  {
1161  std::cerr << "Discovery message too large to send. Discovery won't "
1162  << "work. This shouldn't happen.\n";
1163  return;
1164  }
1165  msgSize = msgSizeFull;
1166 
1167  uint16_t totalSize = sizeof(msgSize) + msgSize;
1168  char *buffer = static_cast<char *>(new char[totalSize]);
1169  memcpy(&buffer[0], &msgSize, sizeof(msgSize));
1170 
1171  if (_msg.SerializeToArray(buffer + sizeof(msgSize), msgSize))
1172  {
1173  // Send the discovery message to the unicast relays.
1174  for (const auto &sockAddr : this->relayAddrs)
1175  {
1176  errno = 0;
1177  auto sent = sendto(this->sockets.at(0),
1178  reinterpret_cast<const raw_type *>(
1179  reinterpret_cast<const unsigned char*>(buffer)),
1180  totalSize, 0,
1181  reinterpret_cast<const sockaddr *>(&sockAddr),
1182  sizeof(sockAddr));
1183 
1184  if (sent != totalSize)
1185  {
1186  std::cerr << "Exception sending a unicast message:" << std::endl;
1187  std::cerr << " Return value: " << sent << std::endl;
1188  std::cerr << " Error code: " << strerror(errno) << std::endl;
1189  break;
1190  }
1191  }
1192  }
1193  else
1194  {
1195  std::cerr << "Discovery::SendUnicast: Error serializing data."
1196  << std::endl;
1197  }
1198 
1199  delete [] buffer;
1200  }
1201 
1204  private: void SendMulticast(const msgs::Discovery &_msg) const
1205  {
1206  uint16_t msgSize;
1207 
1208 #if GOOGLE_PROTOBUF_VERSION >= 3004000
1209  size_t msgSizeFull = _msg.ByteSizeLong();
1210 #else
1211  int msgSizeFull = _msg.ByteSize();
1212 #endif
1213  if (msgSizeFull + sizeof(msgSize) > this->kMaxRcvStr)
1214  {
1215  std::cerr << "Discovery message too large to send. Discovery won't "
1216  << "work. This shouldn't happen.\n";
1217  return;
1218  }
1219 
1220  msgSize = msgSizeFull;
1221  uint16_t totalSize = sizeof(msgSize) + msgSize;
1222  char *buffer = static_cast<char *>(new char[totalSize]);
1223  memcpy(&buffer[0], &msgSize, sizeof(msgSize));
1224 
1225  if (_msg.SerializeToArray(buffer + sizeof(msgSize), msgSize))
1226  {
1227  // Send the discovery message to the multicast group through all the
1228  // sockets.
1229  for (const auto &sock : this->Sockets())
1230  {
1231  errno = 0;
1232  if (sendto(sock, reinterpret_cast<const raw_type *>(
1233  reinterpret_cast<const unsigned char*>(buffer)),
1234  totalSize, 0,
1235  reinterpret_cast<const sockaddr *>(this->MulticastAddr()),
1236  sizeof(*(this->MulticastAddr()))) != totalSize)
1237  {
1238  // Ignore EPERM and ENOBUFS errors.
1239  //
1240  // See issue #106
1241  //
1242  // Rationale drawn from:
1243  //
1244  // * https://groups.google.com/forum/#!topic/comp.protocols.tcp-ip/Qou9Sfgr77E
1245  // * https://stackoverflow.com/questions/16555101/sendto-dgrams-do-not-block-for-enobufs-on-osx
1246  if (errno != EPERM && errno != ENOBUFS)
1247  {
1248  std::cerr << "Exception sending a multicast message:"
1249  << strerror(errno) << std::endl;
1250  }
1251  break;
1252  }
1253  }
1254  }
1255  else
1256  {
1257  std::cerr << "Discovery::SendMulticast: Error serializing data."
1258  << std::endl;
1259  }
1260 
1261  delete [] buffer;
1262  }
1263 
1266  private: const std::vector<int> &Sockets() const
1267  {
1268  return this->sockets;
1269  }
1270 
1273  private: const sockaddr_in *MulticastAddr() const
1274  {
1275  return &this->mcastAddr;
1276  }
1277 
1280  private: uint8_t Version() const
1281  {
1282  static std::string ignStats;
1283  static int topicStats =
1284  (env("IGN_TRANSPORT_TOPIC_STATISTICS", ignStats) && ignStats == "1");
1285  return this->kWireVersion + (topicStats * 100);
1286  }
1287 
1292  private: bool RegisterNetIface(const std::string &_ip)
1293  {
1294  // Make a new socket for sending discovery information.
1295  int sock = static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1296  if (sock < 0)
1297  {
1298  std::cerr << "Socket creation failed." << std::endl;
1299  return false;
1300  }
1301 
1302  // Socket option: IP_MULTICAST_IF.
1303  // This socket option needs to be applied to each socket used to send
1304  // data. This option selects the source interface for outgoing messages.
1305  struct in_addr ifAddr;
1306  ifAddr.s_addr = inet_addr(_ip.c_str());
1307  if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1308  reinterpret_cast<const char*>(&ifAddr), sizeof(ifAddr)) != 0)
1309  {
1310  std::cerr << "Error setting socket option (IP_MULTICAST_IF)."
1311  << std::endl;
1312  return false;
1313  }
1314 
1315  this->sockets.push_back(sock);
1316 
1317  // Join the multicast group. We have to do it for each network interface
1318  // but we can do it on the same socket. We will use the socket at
1319  // position 0 for receiving multicast information.
1320  struct ip_mreq group;
1321  group.imr_multiaddr.s_addr =
1322  inet_addr(this->multicastGroup.c_str());
1323  group.imr_interface.s_addr = inet_addr(_ip.c_str());
1324  if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1325  reinterpret_cast<const char*>(&group), sizeof(group)) != 0)
1326  {
1327  std::cerr << "Error setting socket option (IP_ADD_MEMBERSHIP)."
1328  << std::endl;
1329  return false;
1330  }
1331 
1332  return true;
1333  }
1334 
1337  private: void AddRelayAddress(const std::string &_ip)
1338  {
1339  // Sanity check: Make sure that this IP address is not already saved.
1340  for (auto const &addr : this->relayAddrs)
1341  {
1342  if (addr.sin_addr.s_addr == inet_addr(_ip.c_str()))
1343  return;
1344  }
1345 
1346  sockaddr_in addr;
1347  memset(&addr, 0, sizeof(addr));
1348  addr.sin_family = AF_INET;
1349  addr.sin_addr.s_addr = inet_addr(_ip.c_str());
1350  addr.sin_port = htons(static_cast<u_short>(this->port));
1351 
1352  this->relayAddrs.push_back(addr);
1353  }
1354 
1358  private: static const unsigned int kDefActivityInterval = 100;
1359 
1363  private: static const unsigned int kDefHeartbeatInterval = 1000;
1364 
1368  private: static const unsigned int kDefSilenceInterval = 3000;
1369 
1371  private: std::string multicastGroup;
1372 
1374  private: const int kTimeout = 250;
1375 
1377  private: static const uint16_t kMaxRcvStr =
1379 
1382  private: static const uint8_t kWireVersion = 10;
1383 
1385  private: int port;
1386 
1388  private: std::string hostAddr;
1389 
1391  private: std::vector<std::string> hostInterfaces;
1392 
1394  private: std::string pUuid;
1395 
1399  private: unsigned int silenceInterval;
1400 
1404  private: unsigned int activityInterval;
1405 
1409  private: unsigned int heartbeatInterval;
1410 
1412  private: DiscoveryCallback<Pub> connectionCb;
1413 
1415  private: DiscoveryCallback<Pub> disconnectionCb;
1416 
1418  private: DiscoveryCallback<Pub> registrationCb;
1419 
1421  private: DiscoveryCallback<Pub> unregistrationCb;
1422 
1424  private: TopicStorage<Pub> info;
1425 
1431 
1433  private: bool verbose;
1434 
1436  private: std::vector<int> sockets;
1437 
1439  private: sockaddr_in mcastAddr;
1440 
1442  private: std::vector<sockaddr_in> relayAddrs;
1443 
1445  private: mutable std::mutex mutex;
1446 
1448  private: std::thread threadReception;
1449 
1451  private: Timestamp timeNextHeartbeat;
1452 
1454  private: Timestamp timeNextActivity;
1455 
1457  private: std::mutex exitMutex;
1458 
1463  private: bool initialized;
1464 
1466  private: unsigned int numHeartbeatsUninitialized;
1467 
1469  private: mutable std::condition_variable initializedCv;
1470 
1472  private: bool exit;
1473 
1475  private: bool enabled;
1476  };
1477 
1481 
1485  }
1486  }
1487 }
1488 
1489 #endif
Topic/service available to any subscriber (default scope).
bool Unadvertise(const std::string &_topic, const std::string &_nUuid)
Unadvertise a new message. Broadcast a discovery message that will cancel all the discovery informati...
Definition: Discovery.hh:427
T empty(T... args)
std::map< std::string, Timestamp > activity
Activity information. Every time there is a message from a remote node, its activity information is u...
Definition: Discovery.hh:1430
void Unregister(const MessagePublisher &_pub) const
Unregister a node from this process as a remote subscriber.
Definition: Discovery.hh:395
This class stores all the information about a publisher. It stores the topic name that publishes...
Definition: Publisher.hh:44
std::vector< std::string > determineInterfaces()
Determine the list of network interfaces for this machine. Reference: https://github.com/ros/ros_comm/blob/hydro-devel/clients/ roscpp/src/libros/network.cpp.
Topic/service only available to subscribers in the same machine as the publisher. ...
void Start()
Start the discovery service. You probably want to register the callbacks for receiving discovery noti...
Definition: Discovery.hh:281
void WaitForInit() const
Check if ready/initialized. If not, then wait on the initializedCv condition variable.
Definition: Discovery.hh:613
T endl(T... args)
unsigned int HeartbeatInterval() const
Each node broadcasts periodic heartbeats to keep its topic information alive in other nodes...
Definition: Discovery.hh:479
void raw_type
Definition: Discovery.hh:45
T duration_cast(T... args)
A discovery class that implements a distributed topic discovery protocol. It uses UDP multicast for s...
Definition: Discovery.hh:117
void TopicList(std::vector< std::string > &_topics) const
Get the list of topics currently advertised in the network.
Definition: Discovery.hh:604
T boolalpha(T... args)
void SetSilenceInterval(const unsigned int _ms)
Set the maximum silence interval.
Definition: Discovery.hh:516
STL class.
void UnregistrationsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive an event when a remote node unsubscribes to a topic within this proces...
Definition: Discovery.hh:554
void Register(const MessagePublisher &_pub) const
Register a node from this process as a remote subscriber.
Definition: Discovery.hh:387
std::string determineHost()
Determine IP or hostname. Reference: https://github.com/ros/ros_comm/blob/hydro-devel/clients/ roscpp...
STL class.
virtual ~Discovery()
Destructor.
Definition: Discovery.hh:250
T min(T... args)
A class for customizing the publication options for a topic or service advertised. E.g.: Set the scope of a topic/service.
Definition: AdvertiseOptions.hh:59
T push_back(T... args)
const TopicStorage< Pub > & Info() const
Get the discovery information.
Definition: Discovery.hh:403
bool Publishers(const std::string &_topic, Addresses_M< Pub > &_publishers) const
Get all the publishers&#39; information known for a given topic.
Definition: Discovery.hh:413
std::string HostAddr() const
Get the IP address of this host.
Definition: Discovery.hh:458
unsigned int ActivityInterval() const
The discovery checks the validity of the topic information every &#39;activity interval&#39; milliseconds...
Definition: Discovery.hh:468
T erase(T... args)
std::vector< std::string > split(const std::string &_orig, char _delim)
split at a one character delimiter to get a vector of something
unsigned int SilenceInterval() const
Get the maximum time allowed without receiving any discovery information from a node before canceling...
Definition: Discovery.hh:489
T max(T... args)
void PrintCurrentState() const
Print the current discovery state.
Definition: Discovery.hh:561
T find(T... args)
Discovery(const std::string &_pUuid, const std::string &_ip, const int _port, const bool _verbose=false)
Constructor.
Definition: Discovery.hh:125
This class stores all the information about a message publisher.
Definition: Publisher.hh:181
void DisconnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery disconnection events. Each time a topic is no longer active...
Definition: Discovery.hh:536
T c_str(T... args)
void ConnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery connection events. Each time a new topic is connected...
Definition: Discovery.hh:526
bool Advertise(const Pub &_publisher)
Advertise a new message.
Definition: Discovery.hh:305
Topic/service only available to subscribers in the same process as the publisher. ...
bool pollSockets(const std::vector< int > &_sockets, const int _timeout)
std::chrono::steady_clock::time_point Timestamp
Definition: TransportTypes.hh:155
DestinationType
Options for sending discovery messages.
Definition: Discovery.hh:88
void SetActivityInterval(const unsigned int _ms)
Set the activity interval.
Definition: Discovery.hh:498
void RegistrationsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive an event when a new remote node subscribes to a topic within this proc...
Definition: Discovery.hh:545
Definition: AdvertiseOptions.hh:28
void SetHeartbeatInterval(const unsigned int _ms)
Set the heartbeat interval.
Definition: Discovery.hh:507
STL class.
bool env(const std::string &_name, std::string &_value)
Find the environment variable &#39;_name&#39; and return its value.
bool Discover(const std::string &_topic) const
Request discovery information about a topic. When using this method, the user might want to use SetCo...
Definition: Discovery.hh:337