Ignition Transport

API Reference

8.1.0
Discovery.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2014 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef IGNITION_TRANSPORT_DISCOVERY_HH_
19 #define IGNITION_TRANSPORT_DISCOVERY_HH_
20 #include <errno.h>
21 #include <string.h>
22 
23 #ifdef _WIN32
24  // For socket(), connect(), send(), and recv().
25  #include <Winsock2.h>
26  #include <Ws2def.h>
27  #include <Ws2ipdef.h>
28  #include <Ws2tcpip.h>
29  // Type used for raw data on this platform.
30  using raw_type = char;
31 #else
32  // For data types
33  #include <sys/types.h>
34  // For socket(), connect(), send(), and recv()
35  #include <sys/socket.h>
36  // For gethostbyname()
37  #include <netdb.h>
38  // For inet_addr()
39  #include <arpa/inet.h>
40  // For close()
41  #include <unistd.h>
42  // For sockaddr_in
43  #include <netinet/in.h>
44  // Type used for raw data on this platform
45  using raw_type = void;
46 #endif
47 
48 #ifdef _WIN32
49  #pragma warning(push, 0)
50 #endif
51 #ifdef _WIN32
52  #pragma warning(pop)
53  // Suppress "decorated name length exceed" warning in STL.
54  #pragma warning(disable: 4503)
55  // Suppress "depreted API warnings" in WINSOCK.
56  #pragma warning(disable: 4996)
57 #endif
58 
59 #include <ignition/msgs/discovery.pb.h>
60 
61 #include <algorithm>
62 #include <condition_variable>
63 #include <limits>
64 #include <map>
65 #include <memory>
66 #include <mutex>
67 #include <string>
68 #include <thread>
69 #include <vector>
70 
71 #include <ignition/msgs/Utility.hh>
72 
73 #include "ignition/transport/config.hh"
74 #include "ignition/transport/Export.hh"
80 
81 namespace ignition
82 {
83  namespace transport
84  {
85  // Inline bracket to help doxygen filtering.
86  inline namespace IGNITION_TRANSPORT_VERSION_NAMESPACE {
88  enum class DestinationType
89  {
91  UNICAST,
93  MULTICAST,
95  ALL
96  };
97 
98  //
104  bool IGNITION_TRANSPORT_VISIBLE pollSockets(
105  const std::vector<int> &_sockets,
106  const int _timeout);
107 
116  template<typename Pub>
117  class Discovery
118  {
124  public: Discovery(const std::string &_pUuid,
125  const int _port,
126  const bool _verbose = false)
127  : port(_port),
128  hostAddr(determineHost()),
129  pUuid(_pUuid),
130  silenceInterval(kDefSilenceInterval),
131  activityInterval(kDefActivityInterval),
132  heartbeatInterval(kDefHeartbeatInterval),
133  connectionCb(nullptr),
134  disconnectionCb(nullptr),
135  verbose(_verbose),
136  initialized(false),
137  numHeartbeatsUninitialized(0),
138  exit(false),
139  enabled(false)
140  {
141  std::string ignIp;
142  if (env("IGN_IP", ignIp) && !ignIp.empty())
143  this->hostInterfaces = {ignIp};
144  else
145  {
146  // Get the list of network interfaces in this host.
147  this->hostInterfaces = determineInterfaces();
148  }
149 
150 #ifdef _WIN32
151  WORD wVersionRequested;
152  WSADATA wsaData;
153 
154  // Request WinSock v2.2.
155  wVersionRequested = MAKEWORD(2, 2);
156  // Load WinSock DLL.
157  if (WSAStartup(wVersionRequested, &wsaData) != 0)
158  {
159  std::cerr << "Unable to load WinSock DLL" << std::endl;
160  return;
161  }
162 #endif
163  for (const auto &netIface : this->hostInterfaces)
164  {
165  auto succeed = this->RegisterNetIface(netIface);
166 
167  // If the IP address that we're selecting as the main IP address of
168  // the host is invalid, we change it to 127.0.0.1 .
169  // This is probably because IGN_IP is set to a wrong value.
170  if (netIface == this->hostAddr && !succeed)
171  {
172  this->RegisterNetIface("127.0.0.1");
173  std::cerr << "Did you set the environment variable IGN_IP with a "
174  << "correct IP address? " << std::endl
175  << " [" << netIface << "] seems an invalid local IP "
176  << "address." << std::endl
177  << " Using 127.0.0.1 as hostname." << std::endl;
178  this->hostAddr = "127.0.0.1";
179  }
180  }
181 
182  // Socket option: SO_REUSEADDR. This options is used only for receiving
183  // data. We can reuse the same socket for receiving multicast data from
184  // multiple interfaces. We will use the socket at position 0 for
185  // receiving data.
186  int reuseAddr = 1;
187  if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
188  reinterpret_cast<const char *>(&reuseAddr), sizeof(reuseAddr)) != 0)
189  {
190  std::cerr << "Error setting socket option (SO_REUSEADDR)."
191  << std::endl;
192  return;
193  }
194 
195 #ifdef SO_REUSEPORT
196  // Socket option: SO_REUSEPORT. This options is used only for receiving
197  // data. We can reuse the same socket for receiving multicast data from
198  // multiple interfaces. We will use the socket at position 0 for
199  // receiving data.
200  int reusePort = 1;
201  // cppcheck-suppress ConfigurationNotChecked
202  if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
203  reinterpret_cast<const char *>(&reusePort), sizeof(reusePort)) != 0)
204  {
205  std::cerr << "Error setting socket option (SO_REUSEPORT)."
206  << std::endl;
207  return;
208  }
209 #endif
210  // Bind the first socket to the discovery port.
211  sockaddr_in localAddr;
212  memset(&localAddr, 0, sizeof(localAddr));
213  localAddr.sin_family = AF_INET;
214  localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
215  localAddr.sin_port = htons(static_cast<u_short>(this->port));
216 
217  if (bind(this->sockets.at(0),
218  reinterpret_cast<sockaddr *>(&localAddr), sizeof(sockaddr_in)) < 0)
219  {
220  std::cerr << "Binding to a local port failed." << std::endl;
221  return;
222  }
223 
224  // Set 'mcastAddr' to the multicast discovery group.
225  memset(&this->mcastAddr, 0, sizeof(this->mcastAddr));
226  this->mcastAddr.sin_family = AF_INET;
227  this->mcastAddr.sin_addr.s_addr =
228  inet_addr(this->kMulticastGroup.c_str());
229  this->mcastAddr.sin_port = htons(static_cast<u_short>(this->port));
230 
232  std::string ignRelay = "";
233  if (env("IGN_RELAY", ignRelay) && !ignRelay.empty())
234  {
235  relays = transport::split(ignRelay, ':');
236  }
237 
238  // Register all unicast relays.
239  for (auto const &relayAddr : relays)
240  this->AddRelayAddress(relayAddr);
241 
242  if (this->verbose)
243  this->PrintCurrentState();
244  }
245 
247  public: virtual ~Discovery()
248  {
249  // Tell the service thread to terminate.
250  this->exitMutex.lock();
251  this->exit = true;
252  this->exitMutex.unlock();
253 
254  // Wait for the service threads to finish before exit.
255  if (this->threadReception.joinable())
256  this->threadReception.join();
257 
258  // Broadcast a BYE message to trigger the remote cancellation of
259  // all our advertised topics.
260  this->SendMsg(DestinationType::ALL, msgs::Discovery::BYE,
261  Publisher("", "", this->pUuid, "", AdvertiseOptions()));
262 
263  // Close sockets.
264  for (const auto &sock : this->sockets)
265  {
266 #ifdef _WIN32
267  closesocket(sock);
268  WSACleanup();
269 #else
270  close(sock);
271 #endif
272  }
273  }
274 
278  public: void Start()
279  {
280  {
281  std::lock_guard<std::mutex> lock(this->mutex);
282 
283  // The service is already running.
284  if (this->enabled)
285  return;
286 
287  this->enabled = true;
288  }
289 
290  auto now = std::chrono::steady_clock::now();
291  this->timeNextHeartbeat = now;
292  this->timeNextActivity = now;
293 
294  // Start the thread that receives discovery information.
295  this->threadReception = std::thread(&Discovery::RecvMessages, this);
296  }
297 
302  public: bool Advertise(const Pub &_publisher)
303  {
304  {
305  std::lock_guard<std::mutex> lock(this->mutex);
306 
307  if (!this->enabled)
308  return false;
309 
310  // Add the addressing information (local publisher).
311  if (!this->info.AddPublisher(_publisher))
312  return false;
313  }
314 
315  // Only advertise a message outside this process if the scope
316  // is not 'Process'
317  if (_publisher.Options().Scope() != Scope_t::PROCESS)
318  this->SendMsg(DestinationType::ALL, msgs::Discovery::ADVERTISE,
319  _publisher);
320 
321  return true;
322  }
323 
334  public: bool Discover(const std::string &_topic) const
335  {
337  bool found;
338  Addresses_M<Pub> addresses;
339 
340  {
341  std::lock_guard<std::mutex> lock(this->mutex);
342 
343  if (!this->enabled)
344  return false;
345 
346  cb = this->connectionCb;
347  }
348 
349  Pub pub;
350  pub.SetTopic(_topic);
351  pub.SetPUuid(this->pUuid);
352 
353  // Send a discovery request.
354  this->SendMsg(DestinationType::ALL, msgs::Discovery::SUBSCRIBE, pub);
355 
356  {
357  std::lock_guard<std::mutex> lock(this->mutex);
358  found = this->info.Publishers(_topic, addresses);
359  }
360 
361  if (found)
362  {
363  // I already have information about this topic.
364  for (const auto &proc : addresses)
365  {
366  for (const auto &node : proc.second)
367  {
368  if (cb)
369  {
370  // Execute the user's callback for a service request. Notice
371  // that we only execute one callback for preventing receive
372  // multiple service responses for a single request.
373  cb(node);
374  }
375  }
376  }
377  }
378 
379  return true;
380  }
381 
384  public: void Register(const MessagePublisher &_pub) const
385  {
386  this->SendMsg(
387  DestinationType::ALL, msgs::Discovery::NEW_CONNECTION, _pub);
388  }
389 
392  public: void Unregister(const MessagePublisher &_pub) const
393  {
394  this->SendMsg(
395  DestinationType::ALL, msgs::Discovery::END_CONNECTION, _pub);
396  }
397 
400  public: const TopicStorage<Pub> &Info() const
401  {
402  std::lock_guard<std::mutex> lock(this->mutex);
403  return this->info;
404  }
405 
410  public: bool Publishers(const std::string &_topic,
411  Addresses_M<Pub> &_publishers) const
412  {
413  std::lock_guard<std::mutex> lock(this->mutex);
414  return this->info.Publishers(_topic, _publishers);
415  }
416 
424  public: bool Unadvertise(const std::string &_topic,
425  const std::string &_nUuid)
426  {
427  Pub inf;
428  {
429  std::lock_guard<std::mutex> lock(this->mutex);
430 
431  if (!this->enabled)
432  return false;
433 
434  // Don't do anything if the topic is not advertised by any of my nodes
435  if (!this->info.Publisher(_topic, this->pUuid, _nUuid, inf))
436  return true;
437 
438  // Remove the topic information.
439  this->info.DelPublisherByNode(_topic, this->pUuid, _nUuid);
440  }
441 
442  // Only unadvertise a message outside this process if the scope
443  // is not 'Process'.
444  if (inf.Options().Scope() != Scope_t::PROCESS)
445  {
446  this->SendMsg(DestinationType::ALL,
447  msgs::Discovery::UNADVERTISE, inf);
448  }
449 
450  return true;
451  }
452 
455  public: std::string HostAddr() const
456  {
457  std::lock_guard<std::mutex> lock(this->mutex);
458  return this->hostAddr;
459  }
460 
465  public: unsigned int ActivityInterval() const
466  {
467  std::lock_guard<std::mutex> lock(this->mutex);
468  return this->activityInterval;
469  }
470 
476  public: unsigned int HeartbeatInterval() const
477  {
478  std::lock_guard<std::mutex> lock(this->mutex);
479  return this->heartbeatInterval;
480  }
481 
486  public: unsigned int SilenceInterval() const
487  {
488  std::lock_guard<std::mutex> lock(this->mutex);
489  return this->silenceInterval;
490  }
491 
495  public: void SetActivityInterval(const unsigned int _ms)
496  {
497  std::lock_guard<std::mutex> lock(this->mutex);
498  this->activityInterval = _ms;
499  }
500 
504  public: void SetHeartbeatInterval(const unsigned int _ms)
505  {
506  std::lock_guard<std::mutex> lock(this->mutex);
507  this->heartbeatInterval = _ms;
508  }
509 
513  public: void SetSilenceInterval(const unsigned int _ms)
514  {
515  std::lock_guard<std::mutex> lock(this->mutex);
516  this->silenceInterval = _ms;
517  }
518 
523  public: void ConnectionsCb(const DiscoveryCallback<Pub> &_cb)
524  {
525  std::lock_guard<std::mutex> lock(this->mutex);
526  this->connectionCb = _cb;
527  }
528 
533  public: void DisconnectionsCb(const DiscoveryCallback<Pub> &_cb)
534  {
535  std::lock_guard<std::mutex> lock(this->mutex);
536  this->disconnectionCb = _cb;
537  }
538 
542  public: void RegistrationsCb(const DiscoveryCallback<Pub> &_cb)
543  {
544  std::lock_guard<std::mutex> lock(this->mutex);
545  this->registrationCb = _cb;
546  }
547 
551  public: void UnregistrationsCb(const DiscoveryCallback<Pub> &_cb)
552  {
553  std::lock_guard<std::mutex> lock(this->mutex);
554  this->unregistrationCb = _cb;
555  }
556 
558  public: void PrintCurrentState() const
559  {
560  std::lock_guard<std::mutex> lock(this->mutex);
561 
562  std::cout << "---------------" << std::endl;
563  std::cout << std::boolalpha << "Enabled: "
564  << this->enabled << std::endl;
565  std::cout << "Discovery state" << std::endl;
566  std::cout << "\tUUID: " << this->pUuid << std::endl;
567  std::cout << "Settings" << std::endl;
568  std::cout << "\tActivity: " << this->activityInterval
569  << " ms." << std::endl;
570  std::cout << "\tHeartbeat: " << this->heartbeatInterval
571  << "ms." << std::endl;
572  std::cout << "\tSilence: " << this->silenceInterval
573  << " ms." << std::endl;
574  std::cout << "Known information:" << std::endl;
575  this->info.Print();
576 
577  // Used to calculate the elapsed time.
579 
580  std::cout << "Activity" << std::endl;
581  if (this->activity.empty())
582  std::cout << "\t<empty>" << std::endl;
583  else
584  {
585  for (auto &proc : this->activity)
586  {
587  // Elapsed time since the last update from this publisher.
588  std::chrono::duration<double> elapsed = now - proc.second;
589 
590  std::cout << "\t" << proc.first << std::endl;
591  std::cout << "\t\t" << "Since: " << std::chrono::duration_cast<
592  std::chrono::milliseconds>(elapsed).count() << " ms. ago. "
593  << std::endl;
594  }
595  }
596  std::cout << "---------------" << std::endl;
597  }
598 
601  public: void TopicList(std::vector<std::string> &_topics) const
602  {
603  this->WaitForInit();
604  std::lock_guard<std::mutex> lock(this->mutex);
605  this->info.TopicList(_topics);
606  }
607 
610  public: void WaitForInit() const
611  {
612  std::unique_lock<std::mutex> lk(this->mutex);
613 
614  if (!this->initialized)
615  {
616  this->initializedCv.wait(lk, [this]{return this->initialized;});
617  }
618  }
619 
623  private: void UpdateActivity()
624  {
625  // The UUIDs of the processes that have expired.
627 
628  // A copy of the disconnection callback.
629  DiscoveryCallback<Pub> disconnectCb;
630 
632 
633  {
634  std::lock_guard<std::mutex> lock(this->mutex);
635 
636  if (now < this->timeNextActivity)
637  return;
638 
639  disconnectCb = this->disconnectionCb;
640 
641  for (auto it = this->activity.cbegin(); it != this->activity.cend();)
642  {
643  // Elapsed time since the last update from this publisher.
644  auto elapsed = now - it->second;
645 
646  // This publisher has expired.
647  if (std::chrono::duration_cast<std::chrono::milliseconds>
648  (elapsed).count() > this->silenceInterval)
649  {
650  // Remove all the info entries for this process UUID.
651  this->info.DelPublishersByProc(it->first);
652 
653  uuids.push_back(it->first);
654 
655  // Remove the activity entry.
656  this->activity.erase(it++);
657  }
658  else
659  ++it;
660  }
661 
662  this->timeNextActivity = std::chrono::steady_clock::now() +
663  std::chrono::milliseconds(this->activityInterval);
664  }
665 
666  if (!disconnectCb)
667  return;
668 
669  // Notify without topic information. This is useful to inform the
670  // client that a remote node is gone, even if we were not
671  // interested in its topics.
672  for (auto const &uuid : uuids)
673  {
674  Pub publisher;
675  publisher.SetPUuid(uuid);
676  disconnectCb(publisher);
677  }
678  }
679 
681  private: void UpdateHeartbeat()
682  {
684 
685  {
686  std::lock_guard<std::mutex> lock(this->mutex);
687 
688  if (now < this->timeNextHeartbeat)
689  return;
690  }
691 
692  Publisher pub("", "", this->pUuid, "", AdvertiseOptions());
693  this->SendMsg(DestinationType::ALL, msgs::Discovery::HEARTBEAT, pub);
694 
696  {
697  std::lock_guard<std::mutex> lock(this->mutex);
698 
699  // Re-advertise topics that are advertised inside this process.
700  this->info.PublishersByProc(this->pUuid, nodes);
701  }
702 
703  for (const auto &topic : nodes)
704  {
705  for (const auto &node : topic.second)
706  {
707  this->SendMsg(DestinationType::ALL,
708  msgs::Discovery::ADVERTISE, node);
709  }
710  }
711 
712  {
713  std::lock_guard<std::mutex> lock(this->mutex);
714  if (!this->initialized)
715  {
716  ++this->numHeartbeatsUninitialized;
717  if (this->numHeartbeatsUninitialized == 2)
718  {
719  // We consider the discovery initialized after two cycles of
720  // heartbeats sent.
721  this->initialized = true;
722 
723  // Notify anyone waiting for the initialization phase to finish.
724  this->initializedCv.notify_all();
725  }
726  }
727 
728  this->timeNextHeartbeat = std::chrono::steady_clock::now() +
729  std::chrono::milliseconds(this->heartbeatInterval);
730  }
731  }
732 
742  private: int NextTimeout() const
743  {
744  auto now = std::chrono::steady_clock::now();
745  auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
746  auto timeUntilNextActivity = this->timeNextActivity - now;
747 
748  int t = static_cast<int>(
750  (std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
751  int t2 = std::min(t, this->kTimeout);
752  return std::max(t2, 0);
753  }
754 
756  private: void RecvMessages()
757  {
758  bool timeToExit = false;
759  while (!timeToExit)
760  {
761  // Calculate the timeout.
762  int timeout = this->NextTimeout();
763 
764  if (pollSockets(this->sockets, timeout))
765  {
766  this->RecvDiscoveryUpdate();
767 
768  if (this->verbose)
769  this->PrintCurrentState();
770  }
771 
772  this->UpdateHeartbeat();
773  this->UpdateActivity();
774 
775  // Is it time to exit?
776  {
777  std::lock_guard<std::mutex> lock(this->exitMutex);
778  if (this->exit)
779  timeToExit = true;
780  }
781  }
782  }
783 
785  private: void RecvDiscoveryUpdate()
786  {
787  char rcvStr[Discovery::kMaxRcvStr];
788  sockaddr_in clntAddr;
789  socklen_t addrLen = sizeof(clntAddr);
790 
791  uint16_t received = recvfrom(this->sockets.at(0),
792  reinterpret_cast<raw_type *>(rcvStr),
793  this->kMaxRcvStr, 0,
794  reinterpret_cast<sockaddr *>(&clntAddr),
795  reinterpret_cast<socklen_t *>(&addrLen));
796  if (received > 0)
797  {
798  uint16_t len = 0;
799  memcpy(&len, &rcvStr[0], sizeof(len));
800 
801  // Ignition Transport delimits each discovery message with a
802  // frame_delimiter that contains byte size information.
803  // A discovery message has the form:
804  //
805  // <frame_delimiter><frame_body>
806  //
807  // Ignition Transport version < 8 sends a frame delimiter that
808  // contains the value of sizeof(frame_delimiter)
809  // + sizeof(frame_body). In other words, the frame_delimiter
810  // contains a value that represents the total size of the
811  // frame_body and frame_delimiter in bytes.
812  //
813  // Ignition Transport version >= 8 sends a frame_delimiter
814  // that contains the value of sizeof(frame_body). In other
815  // words, the frame_delimiter contains a value that represents
816  // the total size of only the frame_body.
817  //
818  // It is possible that two incompatible versions of Ignition
819  // Transport exist on the same network. If we receive an
820  // unexpected size, then we ignore the message.
821 
822  // If-condition for version 8+
823  if (len + sizeof(len) == received)
824  {
825  std::string srcAddr = inet_ntoa(clntAddr.sin_addr);
826  uint16_t srcPort = ntohs(clntAddr.sin_port);
827 
828  if (this->verbose)
829  {
830  std::cout << "\nReceived discovery update from "
831  << srcAddr << ": " << srcPort << std::endl;
832  }
833 
834  this->DispatchDiscoveryMsg(srcAddr, rcvStr + sizeof(len), len);
835  }
836  }
837  else if (received < 0)
838  {
839  std::cerr << "Discovery::RecvDiscoveryUpdate() recvfrom error"
840  << std::endl;
841  }
842  }
843 
848  private: void DispatchDiscoveryMsg(const std::string &_fromIp,
849  char *_msg, uint16_t _len)
850  {
851  ignition::msgs::Discovery msg;
852 
853  // Parse the message, and return if parsing failed. Parsing could
854  // fail when another discovery node is publishing messages using an
855  // older (or newer) format.
856  if (!msg.ParseFromArray(_msg, _len))
857  return;
858 
859  // Discard the message if the wire protocol is different than mine.
860  if (this->Version() != msg.version())
861  return;
862 
863  std::string recvPUuid = msg.process_uuid();
864 
865  // Discard our own discovery messages.
866  if (recvPUuid == this->pUuid)
867  return;
868 
869  // Forwarding summary:
870  // - From a unicast peer -> to multicast group (with NO_RELAY flag).
871  // - From multicast group -> to unicast peers (with RELAY flag).
872 
873  // If the RELAY flag is set, this discovery message is coming via a
874  // unicast transmission. In this case, we don't process it, we just
875  // forward it to the multicast group, and it will be dispatched once
876  // received there. Note that we also unset the RELAY flag and set the
877  // NO_RELAY flag, to avoid forwarding the message anymore.
878  if (msg.has_flags() && msg.flags().relay())
879  {
880  // Unset the RELAY flag in the header and set the NO_RELAY.
881  msg.mutable_flags()->set_relay(false);
882  msg.mutable_flags()->set_no_relay(true);
883  this->SendMulticast(msg);
884 
885  // A unicast peer contacted me. I need to save its address for
886  // sending future messages in the future.
887  this->AddRelayAddress(_fromIp);
888  return;
889  }
890  // If we are receiving this discovery message via the multicast channel
891  // and the NO_RELAY flag is not set, we forward this message via unicast
892  // to all our relays. Note that this is the most common case, where we
893  // receive a regular multicast message and we forward it to any remote
894  // relays.
895  else if (!msg.has_flags() || !msg.flags().no_relay())
896  {
897  msg.mutable_flags()->set_relay(true);
898  this->SendUnicast(msg);
899  }
900 
901  // Update timestamp and cache the callbacks.
902  DiscoveryCallback<Pub> connectCb;
903  DiscoveryCallback<Pub> disconnectCb;
904  DiscoveryCallback<Pub> registerCb;
905  DiscoveryCallback<Pub> unregisterCb;
906  {
907  std::lock_guard<std::mutex> lock(this->mutex);
908  this->activity[recvPUuid] = std::chrono::steady_clock::now();
909  connectCb = this->connectionCb;
910  disconnectCb = this->disconnectionCb;
911  registerCb = this->registrationCb;
912  unregisterCb = this->unregistrationCb;
913  }
914 
915  switch (msg.type())
916  {
917  case msgs::Discovery::ADVERTISE:
918  {
919  // Read the rest of the fields.
920  Pub publisher;
921  publisher.SetFromDiscovery(msg);
922 
923  // Check scope of the topic.
924  if ((publisher.Options().Scope() == Scope_t::PROCESS) ||
925  (publisher.Options().Scope() == Scope_t::HOST &&
926  _fromIp != this->hostAddr))
927  {
928  return;
929  }
930 
931  // Register an advertised address for the topic.
932  bool added;
933  {
934  std::lock_guard<std::mutex> lock(this->mutex);
935  added = this->info.AddPublisher(publisher);
936  }
937 
938  if (added && connectCb)
939  {
940  // Execute the client's callback.
941  connectCb(publisher);
942  }
943 
944  break;
945  }
946  case msgs::Discovery::SUBSCRIBE:
947  {
948  std::string recvTopic;
949  // Read the topic information.
950  if (msg.has_sub())
951  {
952  recvTopic = msg.sub().topic();
953  }
954  else
955  {
956  std::cerr << "Subscription discovery message is missing "
957  << "Subscriber information.\n";
958  break;
959  }
960 
961  // Check if at least one of my nodes advertises the topic requested.
962  Addresses_M<Pub> addresses;
963  {
964  std::lock_guard<std::mutex> lock(this->mutex);
965  if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
966  {
967  break;
968  }
969 
970  if (!this->info.Publishers(recvTopic, addresses))
971  break;
972  }
973 
974  for (const auto &nodeInfo : addresses[this->pUuid])
975  {
976  // Check scope of the topic.
977  if ((nodeInfo.Options().Scope() == Scope_t::PROCESS) ||
978  (nodeInfo.Options().Scope() == Scope_t::HOST &&
979  _fromIp != this->hostAddr))
980  {
981  continue;
982  }
983 
984  // Answer an ADVERTISE message.
985  this->SendMsg(DestinationType::ALL,
986  msgs::Discovery::ADVERTISE, nodeInfo);
987  }
988 
989  break;
990  }
991  case msgs::Discovery::NEW_CONNECTION:
992  {
993  // Read the rest of the fields.
994  Pub publisher;
995  publisher.SetFromDiscovery(msg);
996 
997  if (registerCb)
998  registerCb(publisher);
999 
1000  break;
1001  }
1002  case msgs::Discovery::END_CONNECTION:
1003  {
1004  // Read the rest of the fields.
1005  Pub publisher;
1006  publisher.SetFromDiscovery(msg);
1007 
1008  if (unregisterCb)
1009  unregisterCb(publisher);
1010 
1011  break;
1012  }
1013  case msgs::Discovery::HEARTBEAT:
1014  {
1015  // The timestamp has already been updated.
1016  break;
1017  }
1018  case msgs::Discovery::BYE:
1019  {
1020  // Remove the activity entry for this publisher.
1021  {
1022  std::lock_guard<std::mutex> lock(this->mutex);
1023  this->activity.erase(recvPUuid);
1024  }
1025 
1026  if (disconnectCb)
1027  {
1028  Pub pub;
1029  pub.SetPUuid(recvPUuid);
1030  // Notify the new disconnection.
1031  disconnectCb(pub);
1032  }
1033 
1034  // Remove the address entry for this topic.
1035  {
1036  std::lock_guard<std::mutex> lock(this->mutex);
1037  this->info.DelPublishersByProc(recvPUuid);
1038  }
1039 
1040  break;
1041  }
1042  case msgs::Discovery::UNADVERTISE:
1043  {
1044  // Read the address.
1045  Pub publisher;
1046  publisher.SetFromDiscovery(msg);
1047 
1048  // Check scope of the topic.
1049  if ((publisher.Options().Scope() == Scope_t::PROCESS) ||
1050  (publisher.Options().Scope() == Scope_t::HOST &&
1051  _fromIp != this->hostAddr))
1052  {
1053  return;
1054  }
1055 
1056  if (disconnectCb)
1057  {
1058  // Notify the new disconnection.
1059  disconnectCb(publisher);
1060  }
1061 
1062  // Remove the address entry for this topic.
1063  {
1064  std::lock_guard<std::mutex> lock(this->mutex);
1065  this->info.DelPublisherByNode(publisher.Topic(),
1066  publisher.PUuid(), publisher.NUuid());
1067  }
1068 
1069  break;
1070  }
1071  default:
1072  {
1073  std::cerr << "Unknown message type [" << msg.type() << "].\n";
1074  break;
1075  }
1076  }
1077  }
1078 
1085  private: template<typename T>
1086  void SendMsg(const DestinationType &_destType,
1087  const msgs::Discovery::Type _type,
1088  const T &_pub) const
1089  {
1090  ignition::msgs::Discovery discoveryMsg;
1091  discoveryMsg.set_version(this->Version());
1092  discoveryMsg.set_type(_type);
1093  discoveryMsg.set_process_uuid(this->pUuid);
1094 
1095  switch (_type)
1096  {
1097  case msgs::Discovery::ADVERTISE:
1098  case msgs::Discovery::UNADVERTISE:
1099  case msgs::Discovery::NEW_CONNECTION:
1100  case msgs::Discovery::END_CONNECTION:
1101  {
1102  _pub.FillDiscovery(discoveryMsg);
1103  break;
1104  }
1105  case msgs::Discovery::SUBSCRIBE:
1106  {
1107  discoveryMsg.mutable_sub()->set_topic(_pub.Topic());
1108  break;
1109  }
1110  case msgs::Discovery::HEARTBEAT:
1111  case msgs::Discovery::BYE:
1112  break;
1113  default:
1114  std::cerr << "Discovery::SendMsg() error: Unrecognized message"
1115  << " type [" << _type << "]" << std::endl;
1116  return;
1117  }
1118 
1119  if (_destType == DestinationType::MULTICAST ||
1120  _destType == DestinationType::ALL)
1121  {
1122  this->SendMulticast(discoveryMsg);
1123  }
1124 
1125  // Send the discovery message to the unicast relays.
1126  if (_destType == DestinationType::UNICAST ||
1127  _destType == DestinationType::ALL)
1128  {
1129  // Set the RELAY flag in the header.
1130  discoveryMsg.mutable_flags()->set_relay(true);
1131  this->SendUnicast(discoveryMsg);
1132  }
1133 
1134  if (this->verbose)
1135  {
1136  std::cout << "\t* Sending " << msgs::ToString(_type)
1137  << " msg [" << _pub.Topic() << "]" << std::endl;
1138  }
1139  }
1140 
1143  private: void SendUnicast(const msgs::Discovery &_msg) const
1144  {
1145  uint16_t msgSize;
1146 
1147 #if GOOGLE_PROTOBUF_VERSION >= 3004000
1148  size_t msgSizeFull = _msg.ByteSizeLong();
1149 #else
1150  int msgSizeFull = _msg.ByteSize();
1151 #endif
1152  if (msgSizeFull + sizeof(msgSize) > this->kMaxRcvStr)
1153  {
1154  std::cerr << "Discovery message too large to send. Discovery won't "
1155  << "work. This shouldn't happen.\n";
1156  return;
1157  }
1158  msgSize = msgSizeFull;
1159 
1160  uint16_t totalSize = sizeof(msgSize) + msgSize;
1161  char *buffer = static_cast<char *>(new char[totalSize]);
1162  memcpy(&buffer[0], &msgSize, sizeof(msgSize));
1163 
1164  if (_msg.SerializeToArray(buffer + sizeof(msgSize), msgSize))
1165  {
1166  // Send the discovery message to the unicast relays.
1167  for (const auto &sockAddr : this->relayAddrs)
1168  {
1169  auto sent = sendto(this->sockets.at(0),
1170  reinterpret_cast<const raw_type *>(
1171  reinterpret_cast<const unsigned char*>(buffer)),
1172  totalSize, 0,
1173  reinterpret_cast<const sockaddr *>(&sockAddr),
1174  sizeof(sockAddr));
1175 
1176  if (sent != totalSize)
1177  {
1178  std::cerr << "Exception sending a unicast message" << std::endl;
1179  break;
1180  }
1181  }
1182  }
1183  else
1184  {
1185  std::cerr << "Discovery::SendUnicast: Error serializing data."
1186  << std::endl;
1187  }
1188 
1189  delete [] buffer;
1190  }
1191 
1194  private: void SendMulticast(const msgs::Discovery &_msg) const
1195  {
1196  uint16_t msgSize;
1197 
1198 #if GOOGLE_PROTOBUF_VERSION >= 3004000
1199  size_t msgSizeFull = _msg.ByteSizeLong();
1200 #else
1201  int msgSizeFull = _msg.ByteSize();
1202 #endif
1203  if (msgSizeFull + sizeof(msgSize) > this->kMaxRcvStr)
1204  {
1205  std::cerr << "Discovery message too large to send. Discovery won't "
1206  << "work. This shouldn't happen.\n";
1207  return;
1208  }
1209 
1210  msgSize = msgSizeFull;
1211  uint16_t totalSize = sizeof(msgSize) + msgSize;
1212  char *buffer = static_cast<char *>(new char[totalSize]);
1213  memcpy(&buffer[0], &msgSize, sizeof(msgSize));
1214 
1215  if (_msg.SerializeToArray(buffer + sizeof(msgSize), msgSize))
1216  {
1217  // Send the discovery message to the multicast group through all the
1218  // sockets.
1219  for (const auto &sock : this->Sockets())
1220  {
1221  errno = 0;
1222  if (sendto(sock, reinterpret_cast<const raw_type *>(
1223  reinterpret_cast<const unsigned char*>(buffer)),
1224  totalSize, 0,
1225  reinterpret_cast<const sockaddr *>(this->MulticastAddr()),
1226  sizeof(*(this->MulticastAddr()))) != totalSize)
1227  {
1228  // Ignore EPERM and ENOBUFS errors.
1229  //
1230  // See issue #106
1231  //
1232  // Rationale drawn from:
1233  //
1234  // * https://groups.google.com/forum/#!topic/comp.protocols.tcp-ip/Qou9Sfgr77E
1235  // * https://stackoverflow.com/questions/16555101/sendto-dgrams-do-not-block-for-enobufs-on-osx
1236  if (errno != EPERM && errno != ENOBUFS)
1237  {
1238  std::cerr << "Exception sending a multicast message:"
1239  << strerror(errno) << std::endl;
1240  }
1241  break;
1242  }
1243  }
1244  }
1245  else
1246  {
1247  std::cerr << "Discovery::SendMulticast: Error serializing data."
1248  << std::endl;
1249  }
1250 
1251  delete [] buffer;
1252  }
1253 
1256  private: const std::vector<int> &Sockets() const
1257  {
1258  return this->sockets;
1259  }
1260 
1263  private: const sockaddr_in *MulticastAddr() const
1264  {
1265  return &this->mcastAddr;
1266  }
1267 
1270  private: uint8_t Version() const
1271  {
1272  static std::string ignStats;
1273  static int topicStats =
1274  (env("IGN_TRANSPORT_TOPIC_STATISTICS", ignStats) && ignStats == "1");
1275  return this->kWireVersion + (topicStats * 100);
1276  }
1277 
1282  private: bool RegisterNetIface(const std::string &_ip)
1283  {
1284  // Make a new socket for sending discovery information.
1285  int sock = static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1286  if (sock < 0)
1287  {
1288  std::cerr << "Socket creation failed." << std::endl;
1289  return false;
1290  }
1291 
1292  // Socket option: IP_MULTICAST_IF.
1293  // This socket option needs to be applied to each socket used to send
1294  // data. This option selects the source interface for outgoing messages.
1295  struct in_addr ifAddr;
1296  ifAddr.s_addr = inet_addr(_ip.c_str());
1297  if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1298  reinterpret_cast<const char*>(&ifAddr), sizeof(ifAddr)) != 0)
1299  {
1300  std::cerr << "Error setting socket option (IP_MULTICAST_IF)."
1301  << std::endl;
1302  return false;
1303  }
1304 
1305  this->sockets.push_back(sock);
1306 
1307  // Join the multicast group. We have to do it for each network interface
1308  // but we can do it on the same socket. We will use the socket at
1309  // position 0 for receiving multicast information.
1310  struct ip_mreq group;
1311  group.imr_multiaddr.s_addr =
1312  inet_addr(this->kMulticastGroup.c_str());
1313  group.imr_interface.s_addr = inet_addr(_ip.c_str());
1314  if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1315  reinterpret_cast<const char*>(&group), sizeof(group)) != 0)
1316  {
1317  std::cerr << "Error setting socket option (IP_ADD_MEMBERSHIP)."
1318  << std::endl;
1319  return false;
1320  }
1321 
1322  return true;
1323  }
1324 
1327  private: void AddRelayAddress(const std::string &_ip)
1328  {
1329  // Sanity check: Make sure that this IP address is not already saved.
1330  for (auto const &addr : this->relayAddrs)
1331  {
1332  if (addr.sin_addr.s_addr == inet_addr(_ip.c_str()))
1333  return;
1334  }
1335 
1336  sockaddr_in addr;
1337  memset(&addr, 0, sizeof(addr));
1338  addr.sin_family = AF_INET;
1339  addr.sin_addr.s_addr = inet_addr(_ip.c_str());
1340  addr.sin_port = htons(static_cast<u_short>(this->port));
1341 
1342  this->relayAddrs.push_back(addr);
1343  }
1344 
1348  private: static const unsigned int kDefActivityInterval = 100;
1349 
1353  private: static const unsigned int kDefHeartbeatInterval = 1000;
1354 
1358  private: static const unsigned int kDefSilenceInterval = 3000;
1359 
1361  private: const std::string kMulticastGroup = "224.0.0.7";
1362 
1364  private: const int kTimeout = 250;
1365 
1367  private: static const uint16_t kMaxRcvStr =
1369 
1372  private: static const uint8_t kWireVersion = 10;
1373 
1375  private: int port;
1376 
1378  private: std::string hostAddr;
1379 
1381  private: std::vector<std::string> hostInterfaces;
1382 
1384  private: std::string pUuid;
1385 
1389  private: unsigned int silenceInterval;
1390 
1394  private: unsigned int activityInterval;
1395 
1399  private: unsigned int heartbeatInterval;
1400 
1402  private: DiscoveryCallback<Pub> connectionCb;
1403 
1405  private: DiscoveryCallback<Pub> disconnectionCb;
1406 
1409  private: inline static DiscoveryCallback<Pub> registrationCb;
1410 
1413  private: inline static DiscoveryCallback<Pub> unregistrationCb;
1414 
1416  private: TopicStorage<Pub> info;
1417 
1423 
1425  private: bool verbose;
1426 
1428  private: std::vector<int> sockets;
1429 
1431  private: sockaddr_in mcastAddr;
1432 
1434  private: std::vector<sockaddr_in> relayAddrs;
1435 
1437  private: mutable std::mutex mutex;
1438 
1440  private: std::thread threadReception;
1441 
1443  private: Timestamp timeNextHeartbeat;
1444 
1446  private: Timestamp timeNextActivity;
1447 
1449  private: std::mutex exitMutex;
1450 
1455  private: bool initialized;
1456 
1458  private: unsigned int numHeartbeatsUninitialized;
1459 
1461  private: mutable std::condition_variable initializedCv;
1462 
1464  private: bool exit;
1465 
1467  private: bool enabled;
1468  };
1469 
1473 
1477  }
1478  }
1479 }
1480 
1481 #endif
Topic/service available to any subscriber (default scope).
bool Unadvertise(const std::string &_topic, const std::string &_nUuid)
Unadvertise a new message. Broadcast a discovery message that will cancel all the discovery informati...
Definition: Discovery.hh:424
T empty(T... args)
std::map< std::string, Timestamp > activity
Activity information. Every time there is a message from a remote node, its activity information is u...
Definition: Discovery.hh:1422
void Unregister(const MessagePublisher &_pub) const
Unregister a node from this process as a remote subscriber.
Definition: Discovery.hh:392
This class stores all the information about a publisher. It stores the topic name that publishes...
Definition: Publisher.hh:44
std::vector< std::string > determineInterfaces()
Determine the list of network interfaces for this machine. Reference: https://github.com/ros/ros_comm/blob/hydro-devel/clients/ roscpp/src/libros/network.cpp.
Topic/service only available to subscribers in the same machine as the publisher. ...
void Start()
Start the discovery service. You probably want to register the callbacks for receiving discovery noti...
Definition: Discovery.hh:278
void WaitForInit() const
Check if ready/initialized. If not, then wait on the initializedCv condition variable.
Definition: Discovery.hh:610
T endl(T... args)
unsigned int HeartbeatInterval() const
Each node broadcasts periodic heartbeats to keep its topic information alive in other nodes...
Definition: Discovery.hh:476
void raw_type
Definition: Discovery.hh:45
T duration_cast(T... args)
A discovery class that implements a distributed topic discovery protocol. It uses UDP multicast for s...
Definition: Discovery.hh:117
void TopicList(std::vector< std::string > &_topics) const
Get the list of topics currently advertised in the network.
Definition: Discovery.hh:601
T boolalpha(T... args)
void SetSilenceInterval(const unsigned int _ms)
Set the maximum silence interval.
Definition: Discovery.hh:513
STL class.
void UnregistrationsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive an event when a remote node unsubscribes to a topic within this proces...
Definition: Discovery.hh:551
void Register(const MessagePublisher &_pub) const
Register a node from this process as a remote subscriber.
Definition: Discovery.hh:384
std::string determineHost()
Determine IP or hostname. Reference: https://github.com/ros/ros_comm/blob/hydro-devel/clients/ roscpp...
STL class.
virtual ~Discovery()
Destructor.
Definition: Discovery.hh:247
T min(T... args)
A class for customizing the publication options for a topic or service advertised. E.g.: Set the scope of a topic/service.
Definition: AdvertiseOptions.hh:59
Discovery(const std::string &_pUuid, const int _port, const bool _verbose=false)
Constructor.
Definition: Discovery.hh:124
T push_back(T... args)
const TopicStorage< Pub > & Info() const
Get the discovery information.
Definition: Discovery.hh:400
bool Publishers(const std::string &_topic, Addresses_M< Pub > &_publishers) const
Get all the publishers&#39; information known for a given topic.
Definition: Discovery.hh:410
std::string HostAddr() const
Get the IP address of this host.
Definition: Discovery.hh:455
unsigned int ActivityInterval() const
The discovery checks the validity of the topic information every &#39;activity interval&#39; milliseconds...
Definition: Discovery.hh:465
T erase(T... args)
std::vector< std::string > split(const std::string &_orig, char _delim)
split at a one character delimiter to get a vector of something
unsigned int SilenceInterval() const
Get the maximum time allowed without receiving any discovery information from a node before canceling...
Definition: Discovery.hh:486
T max(T... args)
void PrintCurrentState() const
Print the current discovery state.
Definition: Discovery.hh:558
This class stores all the information about a message publisher.
Definition: Publisher.hh:222
void DisconnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery disconnection events. Each time a topic is no longer active...
Definition: Discovery.hh:533
T c_str(T... args)
void ConnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery connection events. Each time a new topic is connected...
Definition: Discovery.hh:523
bool Advertise(const Pub &_publisher)
Advertise a new message.
Definition: Discovery.hh:302
Topic/service only available to subscribers in the same process as the publisher. ...
bool pollSockets(const std::vector< int > &_sockets, const int _timeout)
std::chrono::steady_clock::time_point Timestamp
Definition: TransportTypes.hh:155
DestinationType
Options for sending discovery messages.
Definition: Discovery.hh:88
void SetActivityInterval(const unsigned int _ms)
Set the activity interval.
Definition: Discovery.hh:495
void RegistrationsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive an event when a new remote node subscribes to a topic within this proc...
Definition: Discovery.hh:542
Definition: AdvertiseOptions.hh:28
void SetHeartbeatInterval(const unsigned int _ms)
Set the heartbeat interval.
Definition: Discovery.hh:504
STL class.
bool env(const std::string &_name, std::string &_value)
Find the environment variable &#39;_name&#39; and return its value.
bool Discover(const std::string &_topic) const
Request discovery information about a topic. When using this method, the user might want to use SetCo...
Definition: Discovery.hh:334