Gazebo Transport

API Reference

12.2.2
gz/transport/Discovery.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2014 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef GZ_TRANSPORT_DISCOVERY_HH_
19 #define GZ_TRANSPORT_DISCOVERY_HH_
20 #include <errno.h>
21 #include <string.h>
22 
23 #ifdef _WIN32
24  // For socket(), connect(), send(), and recv().
25  #include <Winsock2.h>
26  #include <Ws2def.h>
27  #include <Ws2ipdef.h>
28  #include <Ws2tcpip.h>
29  // Type used for raw data on this platform.
30  using raw_type = char;
31 #else
32  // For data types
33  #include <sys/types.h>
34  // For socket(), connect(), send(), and recv()
35  #include <sys/socket.h>
36  // For gethostbyname()
37  #include <netdb.h>
38  // For inet_addr()
39  #include <arpa/inet.h>
40  // For close()
41  #include <unistd.h>
42  // For sockaddr_in
43  #include <netinet/in.h>
44  // Type used for raw data on this platform
45  using raw_type = void;
46 #endif
47 
48 #ifdef _WIN32
49  #pragma warning(push, 0)
50 #endif
51 #ifdef _WIN32
52  #pragma warning(pop)
53  // Suppress "decorated name length exceed" warning in STL.
54  #pragma warning(disable: 4503)
55  // Suppress "depreted API warnings" in WINSOCK.
56  #pragma warning(disable: 4996)
57 #endif
58 
59 #include <gz/msgs/discovery.pb.h>
60 
61 #include <algorithm>
62 #include <condition_variable>
63 #include <limits>
64 #include <map>
65 #include <memory>
66 #include <mutex>
67 #include <string>
68 #include <thread>
69 #include <vector>
70 
71 #include <gz/msgs/Utility.hh>
72 
73 #include "gz/transport/config.hh"
74 #include "gz/transport/Export.hh"
75 #include "gz/transport/Helpers.hh"
76 #include "gz/transport/NetUtils.hh"
80 
81 namespace gz
82 {
83  namespace transport
84  {
85  // Inline bracket to help doxygen filtering.
86  inline namespace GZ_TRANSPORT_VERSION_NAMESPACE {
88  enum class DestinationType
89  {
91  UNICAST,
93  MULTICAST,
95  ALL
96  };
97 
98  //
104  bool GZ_TRANSPORT_VISIBLE pollSockets(
105  const std::vector<int> &_sockets,
106  const int _timeout);
107 
116  template<typename Pub>
117  class Discovery
118  {
125  public: Discovery(const std::string &_pUuid,
126  const std::string &_ip,
127  const int _port,
128  const bool _verbose = false)
129  : multicastGroup(_ip),
130  port(_port),
131  hostAddr(determineHost()),
132  pUuid(_pUuid),
133  silenceInterval(kDefSilenceInterval),
134  activityInterval(kDefActivityInterval),
135  heartbeatInterval(kDefHeartbeatInterval),
136  connectionCb(nullptr),
137  disconnectionCb(nullptr),
138  verbose(_verbose),
139  initialized(false),
140  numHeartbeatsUninitialized(0),
141  exit(false),
142  enabled(false)
143  {
144  std::string gzIp;
145  if (env("GZ_IP", gzIp) && !gzIp.empty())
146  {
147  this->hostInterfaces = {gzIp};
148  }
149  // TODO(CH3): Deprecated. Remove on tock.
150  else if (env("IGN_IP", gzIp) && !gzIp.empty())
151  {
152  std::cerr << "IGN_IP is deprecated and will be removed! "
153  << "Use GZ_IP instead!" << std::endl;
154  this->hostInterfaces = {gzIp};
155  }
156  else
157  {
158  // Get the list of network interfaces in this host.
159  this->hostInterfaces = determineInterfaces();
160  }
161 
162 #ifdef _WIN32
163  WORD wVersionRequested;
164  WSADATA wsaData;
165 
166  // Request WinSock v2.2.
167  wVersionRequested = MAKEWORD(2, 2);
168  // Load WinSock DLL.
169  if (WSAStartup(wVersionRequested, &wsaData) != 0)
170  {
171  std::cerr << "Unable to load WinSock DLL" << std::endl;
172  return;
173  }
174 #endif
175  for (const auto &netIface : this->hostInterfaces)
176  {
177  auto succeed = this->RegisterNetIface(netIface);
178 
179  // If the IP address that we're selecting as the main IP address of
180  // the host is invalid, we change it to 127.0.0.1 .
181  // This is probably because IGN_IP is set to a wrong value.
182  if (netIface == this->hostAddr && !succeed)
183  {
184  this->RegisterNetIface("127.0.0.1");
185  std::cerr << "Did you set the environment variable IGN_IP with a "
186  << "correct IP address? " << std::endl
187  << " [" << netIface << "] seems an invalid local IP "
188  << "address." << std::endl
189  << " Using 127.0.0.1 as hostname." << std::endl;
190  this->hostAddr = "127.0.0.1";
191  }
192  }
193 
194  // Socket option: SO_REUSEADDR. This options is used only for receiving
195  // data. We can reuse the same socket for receiving multicast data from
196  // multiple interfaces. We will use the socket at position 0 for
197  // receiving data.
198  int reuseAddr = 1;
199  if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
200  reinterpret_cast<const char *>(&reuseAddr), sizeof(reuseAddr)) != 0)
201  {
202  std::cerr << "Error setting socket option (SO_REUSEADDR)."
203  << std::endl;
204  return;
205  }
206 
207 #ifdef SO_REUSEPORT
208  // Socket option: SO_REUSEPORT. This options is used only for receiving
209  // data. We can reuse the same socket for receiving multicast data from
210  // multiple interfaces. We will use the socket at position 0 for
211  // receiving data.
212  int reusePort = 1;
213  // cppcheck-suppress ConfigurationNotChecked
214  if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
215  reinterpret_cast<const char *>(&reusePort), sizeof(reusePort)) != 0)
216  {
217  std::cerr << "Error setting socket option (SO_REUSEPORT)."
218  << std::endl;
219  return;
220  }
221 #endif
222  // Bind the first socket to the discovery port.
223  sockaddr_in localAddr;
224  memset(&localAddr, 0, sizeof(localAddr));
225  localAddr.sin_family = AF_INET;
226  localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
227  localAddr.sin_port = htons(static_cast<u_short>(this->port));
228 
229  if (bind(this->sockets.at(0),
230  reinterpret_cast<sockaddr *>(&localAddr), sizeof(sockaddr_in)) < 0)
231  {
232  std::cerr << "Binding to a local port failed." << std::endl;
233  return;
234  }
235 
236  // Set 'mcastAddr' to the multicast discovery group.
237  memset(&this->mcastAddr, 0, sizeof(this->mcastAddr));
238  this->mcastAddr.sin_family = AF_INET;
239  this->mcastAddr.sin_addr.s_addr =
240  inet_addr(this->multicastGroup.c_str());
241  this->mcastAddr.sin_port = htons(static_cast<u_short>(this->port));
242 
244  std::string gzRelay = "";
245  if (env("GZ_RELAY", gzRelay) && !gzRelay.empty())
246  {
247  relays = transport::split(gzRelay, ':');
248  }
249  // TODO(CH3): Deprecated. Remove on tock.
250  else if (env("IGN_RELAY", gzRelay) && !gzRelay.empty())
251  {
252  std::cout << "IGN_RELAY is deprecated and will be removed! "
253  << "Use GZ_RELAY instead!" << std::endl;
254  relays = transport::split(gzRelay, ':');
255  }
256 
257  // Register all unicast relays.
258  for (auto const &relayAddr : relays)
259  this->AddRelayAddress(relayAddr);
260 
261  if (this->verbose)
262  this->PrintCurrentState();
263  }
264 
266  public: virtual ~Discovery()
267  {
268  // Tell the service thread to terminate.
269  this->exitMutex.lock();
270  this->exit = true;
271  this->exitMutex.unlock();
272 
273  // Wait for the service threads to finish before exit.
274  if (this->threadReception.joinable())
275  this->threadReception.join();
276 
277  // Broadcast a BYE message to trigger the remote cancellation of
278  // all our advertised topics.
279  this->SendMsg(DestinationType::ALL, msgs::Discovery::BYE,
280  Publisher("", "", this->pUuid, "", AdvertiseOptions()));
281 
282  // Close sockets.
283  for (const auto &sock : this->sockets)
284  {
285 #ifdef _WIN32
286  closesocket(sock);
287  WSACleanup();
288 #else
289  close(sock);
290 #endif
291  }
292  }
293 
297  public: void Start()
298  {
299  {
300  std::lock_guard<std::mutex> lock(this->mutex);
301 
302  // The service is already running.
303  if (this->enabled)
304  return;
305 
306  this->enabled = true;
307  }
308 
309  auto now = std::chrono::steady_clock::now();
310  this->timeNextHeartbeat = now;
311  this->timeNextActivity = now;
312 
313  // Start the thread that receives discovery information.
314  this->threadReception = std::thread(&Discovery::RecvMessages, this);
315  }
316 
321  public: bool Advertise(const Pub &_publisher)
322  {
323  {
324  std::lock_guard<std::mutex> lock(this->mutex);
325 
326  if (!this->enabled)
327  return false;
328 
329  // Add the addressing information (local publisher).
330  if (!this->info.AddPublisher(_publisher))
331  return false;
332  }
333 
334  // Only advertise a message outside this process if the scope
335  // is not 'Process'
336  if (_publisher.Options().Scope() != Scope_t::PROCESS)
337  this->SendMsg(DestinationType::ALL, msgs::Discovery::ADVERTISE,
338  _publisher);
339 
340  return true;
341  }
342 
353  public: bool Discover(const std::string &_topic) const
354  {
356  bool found;
357  Addresses_M<Pub> addresses;
358 
359  {
360  std::lock_guard<std::mutex> lock(this->mutex);
361 
362  if (!this->enabled)
363  return false;
364 
365  cb = this->connectionCb;
366  }
367 
368  Pub pub;
369  pub.SetTopic(_topic);
370  pub.SetPUuid(this->pUuid);
371 
372  // Send a discovery request.
373  this->SendMsg(DestinationType::ALL, msgs::Discovery::SUBSCRIBE, pub);
374 
375  {
376  std::lock_guard<std::mutex> lock(this->mutex);
377  found = this->info.Publishers(_topic, addresses);
378  }
379 
380  if (found)
381  {
382  // I already have information about this topic.
383  for (const auto &proc : addresses)
384  {
385  for (const auto &node : proc.second)
386  {
387  if (cb)
388  {
389  // Execute the user's callback for a service request. Notice
390  // that we only execute one callback for preventing receive
391  // multiple service responses for a single request.
392  cb(node);
393  }
394  }
395  }
396  }
397 
398  return true;
399  }
400 
403  public: void Register(const MessagePublisher &_pub) const
404  {
405  this->SendMsg(
406  DestinationType::ALL, msgs::Discovery::NEW_CONNECTION, _pub);
407  }
408 
411  public: void Unregister(const MessagePublisher &_pub) const
412  {
413  this->SendMsg(
414  DestinationType::ALL, msgs::Discovery::END_CONNECTION, _pub);
415  }
416 
419  public: const TopicStorage<Pub> &Info() const
420  {
421  std::lock_guard<std::mutex> lock(this->mutex);
422  return this->info;
423  }
424 
429  public: bool Publishers(const std::string &_topic,
430  Addresses_M<Pub> &_publishers) const
431  {
432  std::lock_guard<std::mutex> lock(this->mutex);
433  return this->info.Publishers(_topic, _publishers);
434  }
435 
443  public: bool Unadvertise(const std::string &_topic,
444  const std::string &_nUuid)
445  {
446  Pub inf;
447  {
448  std::lock_guard<std::mutex> lock(this->mutex);
449 
450  if (!this->enabled)
451  return false;
452 
453  // Don't do anything if the topic is not advertised by any of my nodes
454  if (!this->info.Publisher(_topic, this->pUuid, _nUuid, inf))
455  return true;
456 
457  // Remove the topic information.
458  this->info.DelPublisherByNode(_topic, this->pUuid, _nUuid);
459  }
460 
461  // Only unadvertise a message outside this process if the scope
462  // is not 'Process'.
463  if (inf.Options().Scope() != Scope_t::PROCESS)
464  {
465  this->SendMsg(DestinationType::ALL,
466  msgs::Discovery::UNADVERTISE, inf);
467  }
468 
469  return true;
470  }
471 
474  public: std::string HostAddr() const
475  {
476  std::lock_guard<std::mutex> lock(this->mutex);
477  return this->hostAddr;
478  }
479 
484  public: unsigned int ActivityInterval() const
485  {
486  std::lock_guard<std::mutex> lock(this->mutex);
487  return this->activityInterval;
488  }
489 
495  public: unsigned int HeartbeatInterval() const
496  {
497  std::lock_guard<std::mutex> lock(this->mutex);
498  return this->heartbeatInterval;
499  }
500 
505  public: unsigned int SilenceInterval() const
506  {
507  std::lock_guard<std::mutex> lock(this->mutex);
508  return this->silenceInterval;
509  }
510 
514  public: void SetActivityInterval(const unsigned int _ms)
515  {
516  std::lock_guard<std::mutex> lock(this->mutex);
517  this->activityInterval = _ms;
518  }
519 
523  public: void SetHeartbeatInterval(const unsigned int _ms)
524  {
525  std::lock_guard<std::mutex> lock(this->mutex);
526  this->heartbeatInterval = _ms;
527  }
528 
532  public: void SetSilenceInterval(const unsigned int _ms)
533  {
534  std::lock_guard<std::mutex> lock(this->mutex);
535  this->silenceInterval = _ms;
536  }
537 
542  public: void ConnectionsCb(const DiscoveryCallback<Pub> &_cb)
543  {
544  std::lock_guard<std::mutex> lock(this->mutex);
545  this->connectionCb = _cb;
546  }
547 
552  public: void DisconnectionsCb(const DiscoveryCallback<Pub> &_cb)
553  {
554  std::lock_guard<std::mutex> lock(this->mutex);
555  this->disconnectionCb = _cb;
556  }
557 
561  public: void RegistrationsCb(const DiscoveryCallback<Pub> &_cb)
562  {
563  std::lock_guard<std::mutex> lock(this->mutex);
564  this->registrationCb = _cb;
565  }
566 
570  public: void UnregistrationsCb(const DiscoveryCallback<Pub> &_cb)
571  {
572  std::lock_guard<std::mutex> lock(this->mutex);
573  this->unregistrationCb = _cb;
574  }
575 
577  public: void PrintCurrentState() const
578  {
579  std::lock_guard<std::mutex> lock(this->mutex);
580 
581  std::cout << "---------------" << std::endl;
582  std::cout << std::boolalpha << "Enabled: "
583  << this->enabled << std::endl;
584  std::cout << "Discovery state" << std::endl;
585  std::cout << "\tUUID: " << this->pUuid << std::endl;
586  std::cout << "Settings" << std::endl;
587  std::cout << "\tActivity: " << this->activityInterval
588  << " ms." << std::endl;
589  std::cout << "\tHeartbeat: " << this->heartbeatInterval
590  << "ms." << std::endl;
591  std::cout << "\tSilence: " << this->silenceInterval
592  << " ms." << std::endl;
593  std::cout << "Known information:" << std::endl;
594  this->info.Print();
595 
596  // Used to calculate the elapsed time.
598 
599  std::cout << "Activity" << std::endl;
600  if (this->activity.empty())
601  std::cout << "\t<empty>" << std::endl;
602  else
603  {
604  for (auto &proc : this->activity)
605  {
606  // Elapsed time since the last update from this publisher.
607  std::chrono::duration<double> elapsed = now - proc.second;
608 
609  std::cout << "\t" << proc.first << std::endl;
610  std::cout << "\t\t" << "Since: " << std::chrono::duration_cast<
611  std::chrono::milliseconds>(elapsed).count() << " ms. ago. "
612  << std::endl;
613  }
614  }
615  std::cout << "---------------" << std::endl;
616  }
617 
620  public: void TopicList(std::vector<std::string> &_topics) const
621  {
622  this->WaitForInit();
623  std::lock_guard<std::mutex> lock(this->mutex);
624  this->info.TopicList(_topics);
625  }
626 
629  public: void WaitForInit() const
630  {
631  std::unique_lock<std::mutex> lk(this->mutex);
632 
633  if (!this->initialized)
634  {
635  this->initializedCv.wait(lk, [this]{return this->initialized;});
636  }
637  }
638 
642  private: void UpdateActivity()
643  {
644  // The UUIDs of the processes that have expired.
646 
647  // A copy of the disconnection callback.
648  DiscoveryCallback<Pub> disconnectCb;
649 
651 
652  {
653  std::lock_guard<std::mutex> lock(this->mutex);
654 
655  if (now < this->timeNextActivity)
656  return;
657 
658  disconnectCb = this->disconnectionCb;
659 
660  for (auto it = this->activity.cbegin(); it != this->activity.cend();)
661  {
662  // Elapsed time since the last update from this publisher.
663  auto elapsed = now - it->second;
664 
665  // This publisher has expired.
666  if (std::chrono::duration_cast<std::chrono::milliseconds>
667  (elapsed).count() > this->silenceInterval)
668  {
669  // Remove all the info entries for this process UUID.
670  this->info.DelPublishersByProc(it->first);
671 
672  uuids.push_back(it->first);
673 
674  // Remove the activity entry.
675  this->activity.erase(it++);
676  }
677  else
678  ++it;
679  }
680 
681  this->timeNextActivity = std::chrono::steady_clock::now() +
682  std::chrono::milliseconds(this->activityInterval);
683  }
684 
685  if (!disconnectCb)
686  return;
687 
688  // Notify without topic information. This is useful to inform the
689  // client that a remote node is gone, even if we were not
690  // interested in its topics.
691  for (auto const &uuid : uuids)
692  {
693  Pub publisher;
694  publisher.SetPUuid(uuid);
695  disconnectCb(publisher);
696  }
697  }
698 
700  private: void UpdateHeartbeat()
701  {
703 
704  {
705  std::lock_guard<std::mutex> lock(this->mutex);
706 
707  if (now < this->timeNextHeartbeat)
708  return;
709  }
710 
711  Publisher pub("", "", this->pUuid, "", AdvertiseOptions());
712  this->SendMsg(DestinationType::ALL, msgs::Discovery::HEARTBEAT, pub);
713 
715  {
716  std::lock_guard<std::mutex> lock(this->mutex);
717 
718  // Re-advertise topics that are advertised inside this process.
719  this->info.PublishersByProc(this->pUuid, nodes);
720  }
721 
722  for (const auto &topic : nodes)
723  {
724  for (const auto &node : topic.second)
725  {
726  this->SendMsg(DestinationType::ALL,
727  msgs::Discovery::ADVERTISE, node);
728  }
729  }
730 
731  {
732  std::lock_guard<std::mutex> lock(this->mutex);
733  if (!this->initialized)
734  {
735  if (this->numHeartbeatsUninitialized == 2u)
736  {
737  // We consider discovery initialized after two heartbeat cycles.
738  this->initialized = true;
739 
740  // Notify anyone waiting for the initialization phase to finish.
741  this->initializedCv.notify_all();
742  }
743  ++this->numHeartbeatsUninitialized;
744  }
745 
746  this->timeNextHeartbeat = std::chrono::steady_clock::now() +
747  std::chrono::milliseconds(this->heartbeatInterval);
748  }
749  }
750 
760  private: int NextTimeout() const
761  {
762  auto now = std::chrono::steady_clock::now();
763  auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
764  auto timeUntilNextActivity = this->timeNextActivity - now;
765 
766  int t = static_cast<int>(
767  std::chrono::duration_cast<std::chrono::milliseconds>
768  (std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
769  int t2 = std::min(t, this->kTimeout);
770  return std::max(t2, 0);
771  }
772 
774  private: void RecvMessages()
775  {
776  bool timeToExit = false;
777  while (!timeToExit)
778  {
779  // Calculate the timeout.
780  int timeout = this->NextTimeout();
781 
782  if (pollSockets(this->sockets, timeout))
783  {
784  this->RecvDiscoveryUpdate();
785 
786  if (this->verbose)
787  this->PrintCurrentState();
788  }
789 
790  this->UpdateHeartbeat();
791  this->UpdateActivity();
792 
793  // Is it time to exit?
794  {
795  std::lock_guard<std::mutex> lock(this->exitMutex);
796  if (this->exit)
797  timeToExit = true;
798  }
799  }
800  }
801 
803  private: void RecvDiscoveryUpdate()
804  {
805  char rcvStr[Discovery::kMaxRcvStr];
806  sockaddr_in clntAddr;
807  socklen_t addrLen = sizeof(clntAddr);
808 
809  int64_t received = recvfrom(this->sockets.at(0),
810  reinterpret_cast<raw_type *>(rcvStr),
811  this->kMaxRcvStr, 0,
812  reinterpret_cast<sockaddr *>(&clntAddr),
813  reinterpret_cast<socklen_t *>(&addrLen));
814  if (received > 0)
815  {
816  uint16_t len = 0;
817  memcpy(&len, &rcvStr[0], sizeof(len));
818 
819  // Gazebo Transport delimits each discovery message with a
820  // frame_delimiter that contains byte size information.
821  // A discovery message has the form:
822  //
823  // <frame_delimiter><frame_body>
824  //
825  // Gazebo Transport version < 8 sends a frame delimiter that
826  // contains the value of sizeof(frame_delimiter)
827  // + sizeof(frame_body). In other words, the frame_delimiter
828  // contains a value that represents the total size of the
829  // frame_body and frame_delimiter in bytes.
830  //
831  // Gazebo Transport version >= 8 sends a frame_delimiter
832  // that contains the value of sizeof(frame_body). In other
833  // words, the frame_delimiter contains a value that represents
834  // the total size of only the frame_body.
835  //
836  // It is possible that two incompatible versions of Gazebo
837  // Transport exist on the same network. If we receive an
838  // unexpected size, then we ignore the message.
839 
840  // If-condition for version 8+
841  if (len + sizeof(len) == static_cast<uint16_t>(received))
842  {
843  std::string srcAddr = inet_ntoa(clntAddr.sin_addr);
844  uint16_t srcPort = ntohs(clntAddr.sin_port);
845 
846  if (this->verbose)
847  {
848  std::cout << "\nReceived discovery update from "
849  << srcAddr << ": " << srcPort << std::endl;
850  }
851 
852  this->DispatchDiscoveryMsg(srcAddr, rcvStr + sizeof(len), len);
853  }
854  }
855  else if (received < 0)
856  {
857  std::cerr << "Discovery::RecvDiscoveryUpdate() recvfrom error"
858  << std::endl;
859  }
860  }
861 
866  private: void DispatchDiscoveryMsg(const std::string &_fromIp,
867  char *_msg, uint16_t _len)
868  {
869  gz::msgs::Discovery msg;
870 
871  // Parse the message, and return if parsing failed. Parsing could
872  // fail when another discovery node is publishing messages using an
873  // older (or newer) format.
874  if (!msg.ParseFromArray(_msg, _len))
875  return;
876 
877  // Discard the message if the wire protocol is different than mine.
878  if (this->Version() != msg.version())
879  return;
880 
881  std::string recvPUuid = msg.process_uuid();
882 
883  // Discard our own discovery messages.
884  if (recvPUuid == this->pUuid)
885  return;
886 
887  // Forwarding summary:
888  // - From a unicast peer -> to multicast group (with NO_RELAY flag).
889  // - From multicast group -> to unicast peers (with RELAY flag).
890 
891  // If the RELAY flag is set, this discovery message is coming via a
892  // unicast transmission. In this case, we don't process it, we just
893  // forward it to the multicast group, and it will be dispatched once
894  // received there. Note that we also unset the RELAY flag and set the
895  // NO_RELAY flag, to avoid forwarding the message anymore.
896  if (msg.has_flags() && msg.flags().relay())
897  {
898  // Unset the RELAY flag in the header and set the NO_RELAY.
899  msg.mutable_flags()->set_relay(false);
900  msg.mutable_flags()->set_no_relay(true);
901  this->SendMulticast(msg);
902 
903  // A unicast peer contacted me. I need to save its address for
904  // sending future messages in the future.
905  this->AddRelayAddress(_fromIp);
906  return;
907  }
908  // If we are receiving this discovery message via the multicast channel
909  // and the NO_RELAY flag is not set, we forward this message via unicast
910  // to all our relays. Note that this is the most common case, where we
911  // receive a regular multicast message and we forward it to any remote
912  // relays.
913  else if (!msg.has_flags() || !msg.flags().no_relay())
914  {
915  msg.mutable_flags()->set_relay(true);
916  this->SendUnicast(msg);
917  }
918 
919  bool isSenderLocal = (std::find(this->hostInterfaces.begin(),
920  this->hostInterfaces.end(), _fromIp) != this->hostInterfaces.end()) ||
921  (_fromIp.find("127.") == 0);
922 
923  // Update timestamp and cache the callbacks.
924  DiscoveryCallback<Pub> connectCb;
925  DiscoveryCallback<Pub> disconnectCb;
926  DiscoveryCallback<Pub> registerCb;
927  DiscoveryCallback<Pub> unregisterCb;
928  {
929  std::lock_guard<std::mutex> lock(this->mutex);
930  this->activity[recvPUuid] = std::chrono::steady_clock::now();
931  connectCb = this->connectionCb;
932  disconnectCb = this->disconnectionCb;
933  registerCb = this->registrationCb;
934  unregisterCb = this->unregistrationCb;
935  }
936 
937  switch (msg.type())
938  {
939  case msgs::Discovery::ADVERTISE:
940  {
941  // Read the rest of the fields.
942  Pub publisher;
943  publisher.SetFromDiscovery(msg);
944 
945  // Check scope of the topic.
946  if ((publisher.Options().Scope() == Scope_t::PROCESS) ||
947  (publisher.Options().Scope() == Scope_t::HOST &&
948  !isSenderLocal))
949  {
950  return;
951  }
952 
953  // Register an advertised address for the topic.
954  bool added;
955  {
956  std::lock_guard<std::mutex> lock(this->mutex);
957  added = this->info.AddPublisher(publisher);
958  }
959 
960  if (added && connectCb)
961  {
962  // Execute the client's callback.
963  connectCb(publisher);
964  }
965 
966  break;
967  }
968  case msgs::Discovery::SUBSCRIBE:
969  {
970  std::string recvTopic;
971  // Read the topic information.
972  if (msg.has_sub())
973  {
974  recvTopic = msg.sub().topic();
975  }
976  else
977  {
978  std::cerr << "Subscription discovery message is missing "
979  << "Subscriber information.\n";
980  break;
981  }
982 
983  // Check if at least one of my nodes advertises the topic requested.
984  Addresses_M<Pub> addresses;
985  {
986  std::lock_guard<std::mutex> lock(this->mutex);
987  if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
988  {
989  break;
990  }
991 
992  if (!this->info.Publishers(recvTopic, addresses))
993  break;
994  }
995 
996  for (const auto &nodeInfo : addresses[this->pUuid])
997  {
998  // Check scope of the topic.
999  if ((nodeInfo.Options().Scope() == Scope_t::PROCESS) ||
1000  (nodeInfo.Options().Scope() == Scope_t::HOST &&
1001  !isSenderLocal))
1002  {
1003  continue;
1004  }
1005 
1006  // Answer an ADVERTISE message.
1007  this->SendMsg(DestinationType::ALL,
1008  msgs::Discovery::ADVERTISE, nodeInfo);
1009  }
1010 
1011  break;
1012  }
1013  case msgs::Discovery::NEW_CONNECTION:
1014  {
1015  // Read the rest of the fields.
1016  Pub publisher;
1017  publisher.SetFromDiscovery(msg);
1018 
1019  if (registerCb)
1020  registerCb(publisher);
1021 
1022  break;
1023  }
1024  case msgs::Discovery::END_CONNECTION:
1025  {
1026  // Read the rest of the fields.
1027  Pub publisher;
1028  publisher.SetFromDiscovery(msg);
1029 
1030  if (unregisterCb)
1031  unregisterCb(publisher);
1032 
1033  break;
1034  }
1035  case msgs::Discovery::HEARTBEAT:
1036  {
1037  // The timestamp has already been updated.
1038  break;
1039  }
1040  case msgs::Discovery::BYE:
1041  {
1042  // Remove the activity entry for this publisher.
1043  {
1044  std::lock_guard<std::mutex> lock(this->mutex);
1045  this->activity.erase(recvPUuid);
1046  }
1047 
1048  if (disconnectCb)
1049  {
1050  Pub pub;
1051  pub.SetPUuid(recvPUuid);
1052  // Notify the new disconnection.
1053  disconnectCb(pub);
1054  }
1055 
1056  // Remove the address entry for this topic.
1057  {
1058  std::lock_guard<std::mutex> lock(this->mutex);
1059  this->info.DelPublishersByProc(recvPUuid);
1060  }
1061 
1062  break;
1063  }
1064  case msgs::Discovery::UNADVERTISE:
1065  {
1066  // Read the address.
1067  Pub publisher;
1068  publisher.SetFromDiscovery(msg);
1069 
1070  // Check scope of the topic.
1071  if ((publisher.Options().Scope() == Scope_t::PROCESS) ||
1072  (publisher.Options().Scope() == Scope_t::HOST &&
1073  !isSenderLocal))
1074  {
1075  return;
1076  }
1077 
1078  if (disconnectCb)
1079  {
1080  // Notify the new disconnection.
1081  disconnectCb(publisher);
1082  }
1083 
1084  // Remove the address entry for this topic.
1085  {
1086  std::lock_guard<std::mutex> lock(this->mutex);
1087  this->info.DelPublisherByNode(publisher.Topic(),
1088  publisher.PUuid(), publisher.NUuid());
1089  }
1090 
1091  break;
1092  }
1093  default:
1094  {
1095  std::cerr << "Unknown message type [" << msg.type() << "].\n";
1096  break;
1097  }
1098  }
1099  }
1100 
1107  private: template<typename T>
1108  void SendMsg(const DestinationType &_destType,
1109  const msgs::Discovery::Type _type,
1110  const T &_pub) const
1111  {
1112  gz::msgs::Discovery discoveryMsg;
1113  discoveryMsg.set_version(this->Version());
1114  discoveryMsg.set_type(_type);
1115  discoveryMsg.set_process_uuid(this->pUuid);
1116 
1117  switch (_type)
1118  {
1119  case msgs::Discovery::ADVERTISE:
1120  case msgs::Discovery::UNADVERTISE:
1121  case msgs::Discovery::NEW_CONNECTION:
1122  case msgs::Discovery::END_CONNECTION:
1123  {
1124  _pub.FillDiscovery(discoveryMsg);
1125  break;
1126  }
1127  case msgs::Discovery::SUBSCRIBE:
1128  {
1129  discoveryMsg.mutable_sub()->set_topic(_pub.Topic());
1130  break;
1131  }
1132  case msgs::Discovery::HEARTBEAT:
1133  case msgs::Discovery::BYE:
1134  break;
1135  default:
1136  std::cerr << "Discovery::SendMsg() error: Unrecognized message"
1137  << " type [" << _type << "]" << std::endl;
1138  return;
1139  }
1140 
1141  if (_destType == DestinationType::MULTICAST ||
1142  _destType == DestinationType::ALL)
1143  {
1144  this->SendMulticast(discoveryMsg);
1145  }
1146 
1147  // Send the discovery message to the unicast relays.
1148  if (_destType == DestinationType::UNICAST ||
1149  _destType == DestinationType::ALL)
1150  {
1151  // Set the RELAY flag in the header.
1152  discoveryMsg.mutable_flags()->set_relay(true);
1153  this->SendUnicast(discoveryMsg);
1154  }
1155 
1156  if (this->verbose)
1157  {
1158  std::cout << "\t* Sending " << msgs::ToString(_type)
1159  << " msg [" << _pub.Topic() << "]" << std::endl;
1160  }
1161  }
1162 
1165  private: void SendUnicast(const msgs::Discovery &_msg) const
1166  {
1167  uint16_t msgSize;
1168 
1169 #if GOOGLE_PROTOBUF_VERSION >= 3004000
1170  size_t msgSizeFull = _msg.ByteSizeLong();
1171 #else
1172  int msgSizeFull = _msg.ByteSize();
1173 #endif
1174  if (msgSizeFull + sizeof(msgSize) > this->kMaxRcvStr)
1175  {
1176  std::cerr << "Discovery message too large to send. Discovery won't "
1177  << "work. This shouldn't happen.\n";
1178  return;
1179  }
1180  msgSize = msgSizeFull;
1181 
1182  uint16_t totalSize = sizeof(msgSize) + msgSize;
1183  char *buffer = static_cast<char *>(new char[totalSize]);
1184  memcpy(&buffer[0], &msgSize, sizeof(msgSize));
1185 
1186  if (_msg.SerializeToArray(buffer + sizeof(msgSize), msgSize))
1187  {
1188  // Send the discovery message to the unicast relays.
1189  for (const auto &sockAddr : this->relayAddrs)
1190  {
1191  errno = 0;
1192  auto sent = sendto(this->sockets.at(0),
1193  reinterpret_cast<const raw_type *>(
1194  reinterpret_cast<const unsigned char*>(buffer)),
1195  totalSize, 0,
1196  reinterpret_cast<const sockaddr *>(&sockAddr),
1197  sizeof(sockAddr));
1198 
1199  if (sent != totalSize)
1200  {
1201  std::cerr << "Exception sending a unicast message:" << std::endl;
1202  std::cerr << " Return value: " << sent << std::endl;
1203  std::cerr << " Error code: " << strerror(errno) << std::endl;
1204  break;
1205  }
1206  }
1207  }
1208  else
1209  {
1210  std::cerr << "Discovery::SendUnicast: Error serializing data."
1211  << std::endl;
1212  }
1213 
1214  delete [] buffer;
1215  }
1216 
1219  private: void SendMulticast(const msgs::Discovery &_msg) const
1220  {
1221  uint16_t msgSize;
1222 
1223 #if GOOGLE_PROTOBUF_VERSION >= 3004000
1224  size_t msgSizeFull = _msg.ByteSizeLong();
1225 #else
1226  int msgSizeFull = _msg.ByteSize();
1227 #endif
1228  if (msgSizeFull + sizeof(msgSize) > this->kMaxRcvStr)
1229  {
1230  std::cerr << "Discovery message too large to send. Discovery won't "
1231  << "work. This shouldn't happen.\n";
1232  return;
1233  }
1234 
1235  msgSize = msgSizeFull;
1236  uint16_t totalSize = sizeof(msgSize) + msgSize;
1237  char *buffer = static_cast<char *>(new char[totalSize]);
1238  memcpy(&buffer[0], &msgSize, sizeof(msgSize));
1239 
1240  if (_msg.SerializeToArray(buffer + sizeof(msgSize), msgSize))
1241  {
1242  // Send the discovery message to the multicast group through all the
1243  // sockets.
1244  for (const auto &sock : this->Sockets())
1245  {
1246  errno = 0;
1247  if (sendto(sock, reinterpret_cast<const raw_type *>(
1248  reinterpret_cast<const unsigned char*>(buffer)),
1249  totalSize, 0,
1250  reinterpret_cast<const sockaddr *>(this->MulticastAddr()),
1251  sizeof(*(this->MulticastAddr()))) != totalSize)
1252  {
1253  // Ignore EPERM and ENOBUFS errors.
1254  //
1255  // See issue #106
1256  //
1257  // Rationale drawn from:
1258  //
1259  // * https://groups.google.com/forum/#!topic/comp.protocols.tcp-ip/Qou9Sfgr77E
1260  // * https://stackoverflow.com/questions/16555101/sendto-dgrams-do-not-block-for-enobufs-on-osx
1261  if (errno != EPERM && errno != ENOBUFS)
1262  {
1263  std::cerr << "Exception sending a multicast message:"
1264  << strerror(errno) << std::endl;
1265  }
1266  break;
1267  }
1268  }
1269  }
1270  else
1271  {
1272  std::cerr << "Discovery::SendMulticast: Error serializing data."
1273  << std::endl;
1274  }
1275 
1276  delete [] buffer;
1277  }
1278 
1281  private: const std::vector<int> &Sockets() const
1282  {
1283  return this->sockets;
1284  }
1285 
1288  private: const sockaddr_in *MulticastAddr() const
1289  {
1290  return &this->mcastAddr;
1291  }
1292 
1295  private: uint8_t Version() const
1296  {
1297  static std::string gzStats;
1298  static int topicStats;
1299 
1300  if (env("GZ_TRANSPORT_TOPIC_STATISTICS", gzStats) && !gzStats.empty())
1301  {
1302  topicStats = (gzStats == "1");
1303  }
1304  // TODO(CH3): Deprecated. Remove on tock.
1305  else if (env("IGN_TRANSPORT_TOPIC_STATISTICS", gzStats)
1306  && !gzStats.empty())
1307  {
1308  std::cout << "IGN_TRANSPORT_TOPIC_STATISTICS is deprecated! "
1309  << "Use GZ_TRANSPORT_TOPIC_STATISTICS instead!"
1310  << std::endl;
1311  topicStats = (gzStats == "1");
1312  }
1313 
1314  return this->kWireVersion + (topicStats * 100);
1315  }
1316 
1321  private: bool RegisterNetIface(const std::string &_ip)
1322  {
1323  // Make a new socket for sending discovery information.
1324  int sock = static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1325  if (sock < 0)
1326  {
1327  std::cerr << "Socket creation failed." << std::endl;
1328  return false;
1329  }
1330 
1331  // Socket option: IP_MULTICAST_IF.
1332  // This socket option needs to be applied to each socket used to send
1333  // data. This option selects the source interface for outgoing messages.
1334  struct in_addr ifAddr;
1335  ifAddr.s_addr = inet_addr(_ip.c_str());
1336  if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1337  reinterpret_cast<const char*>(&ifAddr), sizeof(ifAddr)) != 0)
1338  {
1339  std::cerr << "Error setting socket option (IP_MULTICAST_IF)."
1340  << std::endl;
1341  return false;
1342  }
1343 
1344  this->sockets.push_back(sock);
1345 
1346  // Join the multicast group. We have to do it for each network interface
1347  // but we can do it on the same socket. We will use the socket at
1348  // position 0 for receiving multicast information.
1349  struct ip_mreq group;
1350  group.imr_multiaddr.s_addr =
1351  inet_addr(this->multicastGroup.c_str());
1352  group.imr_interface.s_addr = inet_addr(_ip.c_str());
1353  if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1354  reinterpret_cast<const char*>(&group), sizeof(group)) != 0)
1355  {
1356  std::cerr << "Error setting socket option (IP_ADD_MEMBERSHIP)."
1357  << std::endl;
1358  return false;
1359  }
1360 
1361  return true;
1362  }
1363 
1366  private: void AddRelayAddress(const std::string &_ip)
1367  {
1368  // Sanity check: Make sure that this IP address is not already saved.
1369  for (auto const &addr : this->relayAddrs)
1370  {
1371  if (addr.sin_addr.s_addr == inet_addr(_ip.c_str()))
1372  return;
1373  }
1374 
1375  sockaddr_in addr;
1376  memset(&addr, 0, sizeof(addr));
1377  addr.sin_family = AF_INET;
1378  addr.sin_addr.s_addr = inet_addr(_ip.c_str());
1379  addr.sin_port = htons(static_cast<u_short>(this->port));
1380 
1381  this->relayAddrs.push_back(addr);
1382  }
1383 
1387  private: static const unsigned int kDefActivityInterval = 100;
1388 
1392  private: static const unsigned int kDefHeartbeatInterval = 1000;
1393 
1397  private: static const unsigned int kDefSilenceInterval = 3000;
1398 
1400  private: std::string multicastGroup;
1401 
1403  private: const int kTimeout = 250;
1404 
1406  private: static const uint16_t kMaxRcvStr =
1408 
1411  private: static const uint8_t kWireVersion = 10;
1412 
1414  private: int port;
1415 
1417  private: std::string hostAddr;
1418 
1420  private: std::vector<std::string> hostInterfaces;
1421 
1423  private: std::string pUuid;
1424 
1428  private: unsigned int silenceInterval;
1429 
1433  private: unsigned int activityInterval;
1434 
1438  private: unsigned int heartbeatInterval;
1439 
1441  private: DiscoveryCallback<Pub> connectionCb;
1442 
1444  private: DiscoveryCallback<Pub> disconnectionCb;
1445 
1447  private: DiscoveryCallback<Pub> registrationCb;
1448 
1450  private: DiscoveryCallback<Pub> unregistrationCb;
1451 
1453  private: TopicStorage<Pub> info;
1454 
1460 
1462  private: bool verbose;
1463 
1465  private: std::vector<int> sockets;
1466 
1468  private: sockaddr_in mcastAddr;
1469 
1471  private: std::vector<sockaddr_in> relayAddrs;
1472 
1474  private: mutable std::mutex mutex;
1475 
1477  private: std::thread threadReception;
1478 
1480  private: Timestamp timeNextHeartbeat;
1481 
1483  private: Timestamp timeNextActivity;
1484 
1486  private: std::mutex exitMutex;
1487 
1492  private: bool initialized;
1493 
1495  private: unsigned int numHeartbeatsUninitialized;
1496 
1498  private: mutable std::condition_variable initializedCv;
1499 
1501  private: bool exit;
1502 
1504  private: bool enabled;
1505  };
1506 
1510 
1514  }
1515  }
1516 }
1517 
1518 #endif