Gazebo Transport

API Reference

13.4.0
Discovery.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2014 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef GZ_TRANSPORT_DISCOVERY_HH_
19 #define GZ_TRANSPORT_DISCOVERY_HH_
20 #include <errno.h>
21 #include <string.h>
22 
23 #ifdef _WIN32
24  // For socket(), connect(), send(), and recv().
25  #include <Winsock2.h>
26  #include <Ws2def.h>
27  #include <Ws2ipdef.h>
28  #include <Ws2tcpip.h>
29  // Type used for raw data on this platform.
30  using raw_type = char;
31 #else
32  // For data types
33  #include <sys/types.h>
34  // For socket(), connect(), send(), and recv()
35  #include <sys/socket.h>
36  // For gethostbyname()
37  #include <netdb.h>
38  // For inet_addr()
39  #include <arpa/inet.h>
40  // For close()
41  #include <unistd.h>
42  // For sockaddr_in
43  #include <netinet/in.h>
44  // Type used for raw data on this platform
45  using raw_type = void;
46 #endif
47 
48 #ifdef _WIN32
49  #pragma warning(push, 0)
50 #endif
51 #ifdef _WIN32
52  #pragma warning(pop)
53  // Suppress "decorated name length exceed" warning in STL.
54  #pragma warning(disable: 4503)
55  // Suppress "depreted API warnings" in WINSOCK.
56  #pragma warning(disable: 4996)
57 #endif
58 
59 #include <gz/msgs/discovery.pb.h>
60 
61 #include <algorithm>
62 #include <condition_variable>
63 #include <limits>
64 #include <map>
65 #include <memory>
66 #include <mutex>
67 #include <string>
68 #include <thread>
69 #include <vector>
70 
71 #include <gz/msgs/Utility.hh>
72 
73 #include "gz/transport/config.hh"
74 #include "gz/transport/Export.hh"
75 #include "gz/transport/Helpers.hh"
76 #include "gz/transport/NetUtils.hh"
80 
81 namespace gz
82 {
83  namespace transport
84  {
85  // Inline bracket to help doxygen filtering.
86  inline namespace GZ_TRANSPORT_VERSION_NAMESPACE {
88  enum class DestinationType
89  {
91  UNICAST,
93  MULTICAST,
95  ALL
96  };
97 
98  //
104  bool GZ_TRANSPORT_VISIBLE pollSockets(
105  const std::vector<int> &_sockets,
106  const int _timeout);
107 
116  template<typename Pub>
117  class Discovery
118  {
125  public: Discovery(const std::string &_pUuid,
126  const std::string &_ip,
127  const int _port,
128  const bool _verbose = false)
129  : multicastGroup(_ip),
130  port(_port),
131  hostAddr(determineHost()),
132  pUuid(_pUuid),
133  silenceInterval(kDefSilenceInterval),
134  activityInterval(kDefActivityInterval),
135  heartbeatInterval(kDefHeartbeatInterval),
136  connectionCb(nullptr),
137  disconnectionCb(nullptr),
138  verbose(_verbose),
139  initialized(false),
140  numHeartbeatsUninitialized(0),
141  exit(false),
142  enabled(false)
143  {
144  std::string gzIp;
145  if (env("GZ_IP", gzIp) && !gzIp.empty())
146  {
147  this->hostInterfaces = {gzIp};
148  }
149  else
150  {
151  // Get the list of network interfaces in this host.
152  this->hostInterfaces = determineInterfaces();
153  }
154 
155 #ifdef _WIN32
156  WORD wVersionRequested;
157  WSADATA wsaData;
158 
159  // Request WinSock v2.2.
160  wVersionRequested = MAKEWORD(2, 2);
161  // Load WinSock DLL.
162  if (WSAStartup(wVersionRequested, &wsaData) != 0)
163  {
164  std::cerr << "Unable to load WinSock DLL" << std::endl;
165  return;
166  }
167 #endif
168  for (const auto &netIface : this->hostInterfaces)
169  {
170  auto succeed = this->RegisterNetIface(netIface);
171 
172  // If the IP address that we're selecting as the main IP address of
173  // the host is invalid, we change it to 127.0.0.1 .
174  // This is probably because GZ_IP is set to a wrong value.
175  if (netIface == this->hostAddr && !succeed)
176  {
177  this->RegisterNetIface("127.0.0.1");
178  std::cerr << "Did you set the environment variable GZ_IP with a "
179  << "correct IP address? " << std::endl
180  << " [" << netIface << "] seems an invalid local IP "
181  << "address." << std::endl
182  << " Using 127.0.0.1 as hostname." << std::endl;
183  this->hostAddr = "127.0.0.1";
184  }
185  }
186 
187  // Socket option: SO_REUSEADDR. This options is used only for receiving
188  // data. We can reuse the same socket for receiving multicast data from
189  // multiple interfaces. We will use the socket at position 0 for
190  // receiving data.
191  int reuseAddr = 1;
192  if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
193  reinterpret_cast<const char *>(&reuseAddr), sizeof(reuseAddr)) != 0)
194  {
195  std::cerr << "Error setting socket option (SO_REUSEADDR)."
196  << std::endl;
197  return;
198  }
199 
200 #ifdef SO_REUSEPORT
201  // Socket option: SO_REUSEPORT. This options is used only for receiving
202  // data. We can reuse the same socket for receiving multicast data from
203  // multiple interfaces. We will use the socket at position 0 for
204  // receiving data.
205  int reusePort = 1;
206  // cppcheck-suppress ConfigurationNotChecked
207  if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
208  reinterpret_cast<const char *>(&reusePort), sizeof(reusePort)) != 0)
209  {
210  std::cerr << "Error setting socket option (SO_REUSEPORT)."
211  << std::endl;
212  return;
213  }
214 #endif
215  // Bind the first socket to the discovery port.
216  sockaddr_in localAddr;
217  memset(&localAddr, 0, sizeof(localAddr));
218  localAddr.sin_family = AF_INET;
219  localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
220  localAddr.sin_port = htons(static_cast<u_short>(this->port));
221 
222  if (bind(this->sockets.at(0),
223  reinterpret_cast<sockaddr *>(&localAddr), sizeof(sockaddr_in)) < 0)
224  {
225  std::cerr << "Binding to a local port failed." << std::endl;
226  return;
227  }
228 
229  // Set 'mcastAddr' to the multicast discovery group.
230  memset(&this->mcastAddr, 0, sizeof(this->mcastAddr));
231  this->mcastAddr.sin_family = AF_INET;
232  this->mcastAddr.sin_addr.s_addr =
233  inet_addr(this->multicastGroup.c_str());
234  this->mcastAddr.sin_port = htons(static_cast<u_short>(this->port));
235 
237  std::string gzRelay = "";
238  if (env("GZ_RELAY", gzRelay) && !gzRelay.empty())
239  {
240  relays = transport::split(gzRelay, ':');
241  }
242 
243  // Register all unicast relays.
244  for (auto const &relayAddr : relays)
245  this->AddRelayAddress(relayAddr);
246 
247  if (this->verbose)
248  this->PrintCurrentState();
249  }
250 
252  public: virtual ~Discovery()
253  {
254  // Tell the service thread to terminate.
255  this->exitMutex.lock();
256  this->exit = true;
257  this->exitMutex.unlock();
258 
259  // Wait for the service threads to finish before exit.
260  if (this->threadReception.joinable())
261  this->threadReception.join();
262 
263  // Broadcast a BYE message to trigger the remote cancellation of
264  // all our advertised topics.
265  this->SendMsg(DestinationType::ALL, msgs::Discovery::BYE,
266  Publisher("", "", this->pUuid, "", AdvertiseOptions()));
267 
268  // Close sockets.
269  for (const auto &sock : this->sockets)
270  {
271 #ifdef _WIN32
272  closesocket(sock);
273  WSACleanup();
274 #else
275  close(sock);
276 #endif
277  }
278  }
279 
283  public: void Start()
284  {
285  {
286  std::lock_guard<std::mutex> lock(this->mutex);
287 
288  // The service is already running.
289  if (this->enabled)
290  return;
291 
292  this->enabled = true;
293  }
294 
295  auto now = std::chrono::steady_clock::now();
296  this->timeNextHeartbeat = now;
297  this->timeNextActivity = now;
298 
299  // Start the thread that receives discovery information.
300  this->threadReception = std::thread(&Discovery::RecvMessages, this);
301  }
302 
307  public: bool Advertise(const Pub &_publisher)
308  {
310 
311  {
312  std::lock_guard<std::mutex> lock(this->mutex);
313 
314  if (!this->enabled)
315  return false;
316 
317  // Add the addressing information (local publisher).
318  if (!this->info.AddPublisher(_publisher))
319  return false;
320 
321  cb = this->connectionCb;
322  }
323 
324  if (cb)
325  cb(_publisher);
326 
327  // Only advertise a message outside this process if the scope
328  // is not 'Process'
329  if (_publisher.Options().Scope() != Scope_t::PROCESS)
330  this->SendMsg(DestinationType::ALL, msgs::Discovery::ADVERTISE,
331  _publisher);
332 
333  return true;
334  }
335 
346  public: bool Discover(const std::string &_topic) const
347  {
349  bool found;
350  Addresses_M<Pub> addresses;
351 
352  {
353  std::lock_guard<std::mutex> lock(this->mutex);
354 
355  if (!this->enabled)
356  return false;
357 
358  cb = this->connectionCb;
359  }
360 
361  Pub pub;
362  pub.SetTopic(_topic);
363  pub.SetPUuid(this->pUuid);
364 
365  // Send a discovery request.
366  this->SendMsg(DestinationType::ALL, msgs::Discovery::SUBSCRIBE, pub);
367 
368  {
369  std::lock_guard<std::mutex> lock(this->mutex);
370  found = this->info.Publishers(_topic, addresses);
371  }
372 
373  if (found)
374  {
375  // I already have information about this topic.
376  for (const auto &proc : addresses)
377  {
378  for (const auto &node : proc.second)
379  {
380  if (cb)
381  {
382  // Execute the user's callback for a service request. Notice
383  // that we only execute one callback for preventing receive
384  // multiple service responses for a single request.
385  cb(node);
386  }
387  }
388  }
389  }
390 
391  return true;
392  }
393 
396  public: void SendSubscribersRep(const MessagePublisher &_pub) const
397  {
398  this->SendMsg(
399  DestinationType::ALL, msgs::Discovery::SUBSCRIBERS_REP, _pub);
400  }
401 
404  public: void Register(const MessagePublisher &_pub) const
405  {
406  this->SendMsg(
407  DestinationType::ALL, msgs::Discovery::NEW_CONNECTION, _pub);
408  }
409 
412  public: void Unregister(const MessagePublisher &_pub) const
413  {
414  this->SendMsg(
415  DestinationType::ALL, msgs::Discovery::END_CONNECTION, _pub);
416  }
417 
420  public: const TopicStorage<Pub> &Info() const
421  {
422  std::lock_guard<std::mutex> lock(this->mutex);
423  return this->info;
424  }
425 
430  public: bool Publishers(const std::string &_topic,
431  Addresses_M<Pub> &_publishers) const
432  {
433  std::lock_guard<std::mutex> lock(this->mutex);
434  return this->info.Publishers(_topic, _publishers);
435  }
436 
441  public: bool RemoteSubscribers(const std::string &_topic,
442  Addresses_M<Pub> &_subscribers) const
443  {
444  std::lock_guard<std::mutex> lock(this->mutex);
445  return this->remoteSubscribers.Publishers(_topic, _subscribers);
446  }
447 
455  public: bool Unadvertise(const std::string &_topic,
456  const std::string &_nUuid)
457  {
458  Pub inf;
459  {
460  std::lock_guard<std::mutex> lock(this->mutex);
461 
462  if (!this->enabled)
463  return false;
464 
465  // Don't do anything if the topic is not advertised by any of my nodes
466  if (!this->info.Publisher(_topic, this->pUuid, _nUuid, inf))
467  return true;
468 
469  // Remove the topic information.
470  this->info.DelPublisherByNode(_topic, this->pUuid, _nUuid);
471  }
472 
473  // Only unadvertise a message outside this process if the scope
474  // is not 'Process'.
475  if (inf.Options().Scope() != Scope_t::PROCESS)
476  {
477  this->SendMsg(DestinationType::ALL,
478  msgs::Discovery::UNADVERTISE, inf);
479  }
480 
481  return true;
482  }
483 
486  public: std::string HostAddr() const
487  {
488  std::lock_guard<std::mutex> lock(this->mutex);
489  return this->hostAddr;
490  }
491 
496  public: unsigned int ActivityInterval() const
497  {
498  std::lock_guard<std::mutex> lock(this->mutex);
499  return this->activityInterval;
500  }
501 
507  public: unsigned int HeartbeatInterval() const
508  {
509  std::lock_guard<std::mutex> lock(this->mutex);
510  return this->heartbeatInterval;
511  }
512 
517  public: unsigned int SilenceInterval() const
518  {
519  std::lock_guard<std::mutex> lock(this->mutex);
520  return this->silenceInterval;
521  }
522 
526  public: void SetActivityInterval(const unsigned int _ms)
527  {
528  std::lock_guard<std::mutex> lock(this->mutex);
529  this->activityInterval = _ms;
530  }
531 
535  public: void SetHeartbeatInterval(const unsigned int _ms)
536  {
537  std::lock_guard<std::mutex> lock(this->mutex);
538  this->heartbeatInterval = _ms;
539  }
540 
544  public: void SetSilenceInterval(const unsigned int _ms)
545  {
546  std::lock_guard<std::mutex> lock(this->mutex);
547  this->silenceInterval = _ms;
548  }
549 
554  public: void ConnectionsCb(const DiscoveryCallback<Pub> &_cb)
555  {
556  std::lock_guard<std::mutex> lock(this->mutex);
557  this->connectionCb = _cb;
558  }
559 
564  public: void DisconnectionsCb(const DiscoveryCallback<Pub> &_cb)
565  {
566  std::lock_guard<std::mutex> lock(this->mutex);
567  this->disconnectionCb = _cb;
568  }
569 
573  public: void RegistrationsCb(const DiscoveryCallback<Pub> &_cb)
574  {
575  std::lock_guard<std::mutex> lock(this->mutex);
576  this->registrationCb = _cb;
577  }
578 
582  public: void UnregistrationsCb(const DiscoveryCallback<Pub> &_cb)
583  {
584  std::lock_guard<std::mutex> lock(this->mutex);
585  this->unregistrationCb = _cb;
586  }
587 
591  public: void SubscribersCb(const std::function<void()> &_cb)
592  {
593  std::lock_guard<std::mutex> lock(this->mutex);
594  this->subscribersCb = _cb;
595  }
596 
598  public: void PrintCurrentState() const
599  {
600  std::lock_guard<std::mutex> lock(this->mutex);
601 
602  std::cout << "---------------" << std::endl;
603  std::cout << std::boolalpha << "Enabled: "
604  << this->enabled << std::endl;
605  std::cout << "Discovery state" << std::endl;
606  std::cout << "\tUUID: " << this->pUuid << std::endl;
607  std::cout << "Settings" << std::endl;
608  std::cout << "\tActivity: " << this->activityInterval
609  << " ms." << std::endl;
610  std::cout << "\tHeartbeat: " << this->heartbeatInterval
611  << "ms." << std::endl;
612  std::cout << "\tSilence: " << this->silenceInterval
613  << " ms." << std::endl;
614  std::cout << "Known information:" << std::endl;
615  this->info.Print();
616 
617  // Used to calculate the elapsed time.
619 
620  std::cout << "Activity" << std::endl;
621  if (this->activity.empty())
622  std::cout << "\t<empty>" << std::endl;
623  else
624  {
625  for (auto &proc : this->activity)
626  {
627  // Elapsed time since the last update from this publisher.
628  std::chrono::duration<double> elapsed = now - proc.second;
629 
630  std::cout << "\t" << proc.first << std::endl;
631  std::cout << "\t\t" << "Since: " << std::chrono::duration_cast<
632  std::chrono::milliseconds>(elapsed).count() << " ms. ago. "
633  << std::endl;
634  }
635  }
636  std::cout << "---------------" << std::endl;
637  }
638 
642  public: void TopicList(std::vector<std::string> &_topics)
643  {
644  {
645  std::lock_guard<std::mutex> lock(this->mutex);
646  this->remoteSubscribers.Clear();
647  }
648 
649  // Request the list of subscribers.
650  Publisher pub("", "", this->pUuid, "", AdvertiseOptions());
651  this->SendMsg(
652  DestinationType::ALL, msgs::Discovery::SUBSCRIBERS_REQ, pub);
653 
654  this->WaitForInit();
655  std::lock_guard<std::mutex> lock(this->mutex);
656  this->info.TopicList(_topics);
657 
658  std::vector<std::string> remoteSubs;
659  this->remoteSubscribers.TopicList(remoteSubs);
660 
661  // Add the remote subscribers
662  for (auto const &t : remoteSubs)
663  {
664  if (std::find(_topics.begin(), _topics.end(), t) == _topics.end())
665  {
666  _topics.push_back(t);
667  }
668  }
669  }
670 
673  public: void WaitForInit() const
674  {
675  std::unique_lock<std::mutex> lk(this->mutex);
676 
677  if (!this->initialized)
678  {
679  this->initializedCv.wait(lk, [this]{return this->initialized;});
680  }
681  }
682 
686  private: void UpdateActivity()
687  {
688  // The UUIDs of the processes that have expired.
690 
691  // A copy of the disconnection callback.
692  DiscoveryCallback<Pub> disconnectCb;
693 
695 
696  {
697  std::lock_guard<std::mutex> lock(this->mutex);
698 
699  if (now < this->timeNextActivity)
700  return;
701 
702  disconnectCb = this->disconnectionCb;
703 
704  for (auto it = this->activity.cbegin(); it != this->activity.cend();)
705  {
706  // Elapsed time since the last update from this publisher.
707  auto elapsed = now - it->second;
708 
709  // This publisher has expired.
710  if (std::chrono::duration_cast<std::chrono::milliseconds>
711  (elapsed).count() > this->silenceInterval)
712  {
713  // Remove all the info entries for this process UUID.
714  this->info.DelPublishersByProc(it->first);
715 
716  uuids.push_back(it->first);
717 
718  // Remove the activity entry.
719  this->activity.erase(it++);
720  }
721  else
722  ++it;
723  }
724 
725  this->timeNextActivity = std::chrono::steady_clock::now() +
726  std::chrono::milliseconds(this->activityInterval);
727  }
728 
729  if (!disconnectCb)
730  return;
731 
732  // Notify without topic information. This is useful to inform the
733  // client that a remote node is gone, even if we were not
734  // interested in its topics.
735  for (auto const &uuid : uuids)
736  {
737  Pub publisher;
738  publisher.SetPUuid(uuid);
739  disconnectCb(publisher);
740  }
741  }
742 
744  private: void UpdateHeartbeat()
745  {
747 
748  {
749  std::lock_guard<std::mutex> lock(this->mutex);
750 
751  if (now < this->timeNextHeartbeat)
752  return;
753  }
754 
755  Publisher pub("", "", this->pUuid, "", AdvertiseOptions());
756  this->SendMsg(DestinationType::ALL, msgs::Discovery::HEARTBEAT, pub);
757 
759  {
760  std::lock_guard<std::mutex> lock(this->mutex);
761 
762  // Re-advertise topics that are advertised inside this process.
763  this->info.PublishersByProc(this->pUuid, nodes);
764  }
765 
766  for (const auto &topic : nodes)
767  {
768  for (const auto &node : topic.second)
769  {
770  this->SendMsg(DestinationType::ALL,
771  msgs::Discovery::ADVERTISE, node);
772  }
773  }
774 
775  {
776  std::lock_guard<std::mutex> lock(this->mutex);
777  if (!this->initialized)
778  {
779  if (this->numHeartbeatsUninitialized == 2u)
780  {
781  // We consider discovery initialized after two heartbeat cycles.
782  this->initialized = true;
783 
784  // Notify anyone waiting for the initialization phase to finish.
785  this->initializedCv.notify_all();
786  }
787  ++this->numHeartbeatsUninitialized;
788  }
789 
790  this->timeNextHeartbeat = std::chrono::steady_clock::now() +
791  std::chrono::milliseconds(this->heartbeatInterval);
792  }
793  }
794 
804  private: int NextTimeout() const
805  {
806  auto now = std::chrono::steady_clock::now();
807  auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
808  auto timeUntilNextActivity = this->timeNextActivity - now;
809 
810  int t = static_cast<int>(
811  std::chrono::duration_cast<std::chrono::milliseconds>
812  (std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
813  int t2 = std::min(t, this->kTimeout);
814  return std::max(t2, 0);
815  }
816 
818  private: void RecvMessages()
819  {
820  bool timeToExit = false;
821  while (!timeToExit)
822  {
823  // Calculate the timeout.
824  int timeout = this->NextTimeout();
825 
826  if (pollSockets(this->sockets, timeout))
827  {
828  this->RecvDiscoveryUpdate();
829 
830  if (this->verbose)
831  this->PrintCurrentState();
832  }
833 
834  this->UpdateHeartbeat();
835  this->UpdateActivity();
836 
837  // Is it time to exit?
838  {
839  std::lock_guard<std::mutex> lock(this->exitMutex);
840  if (this->exit)
841  timeToExit = true;
842  }
843  }
844  }
845 
847  private: void RecvDiscoveryUpdate()
848  {
849  char rcvStr[Discovery::kMaxRcvStr];
850  sockaddr_in clntAddr;
851  socklen_t addrLen = sizeof(clntAddr);
852 
853  int64_t received = recvfrom(this->sockets.at(0),
854  reinterpret_cast<raw_type *>(rcvStr),
855  this->kMaxRcvStr, 0,
856  reinterpret_cast<sockaddr *>(&clntAddr),
857  reinterpret_cast<socklen_t *>(&addrLen));
858  if (received > 0)
859  {
860  uint16_t len = 0;
861  memcpy(&len, &rcvStr[0], sizeof(len));
862 
863  // Gazebo Transport delimits each discovery message with a
864  // frame_delimiter that contains byte size information.
865  // A discovery message has the form:
866  //
867  // <frame_delimiter><frame_body>
868  //
869  // Gazebo Transport version < 8 sends a frame delimiter that
870  // contains the value of sizeof(frame_delimiter)
871  // + sizeof(frame_body). In other words, the frame_delimiter
872  // contains a value that represents the total size of the
873  // frame_body and frame_delimiter in bytes.
874  //
875  // Gazebo Transport version >= 8 sends a frame_delimiter
876  // that contains the value of sizeof(frame_body). In other
877  // words, the frame_delimiter contains a value that represents
878  // the total size of only the frame_body.
879  //
880  // It is possible that two incompatible versions of Gazebo
881  // Transport exist on the same network. If we receive an
882  // unexpected size, then we ignore the message.
883 
884  // If-condition for version 8+
885  if (len + sizeof(len) == static_cast<uint16_t>(received))
886  {
887  std::string srcAddr = inet_ntoa(clntAddr.sin_addr);
888  uint16_t srcPort = ntohs(clntAddr.sin_port);
889 
890  if (this->verbose)
891  {
892  std::cout << "\nReceived discovery update from "
893  << srcAddr << ": " << srcPort << std::endl;
894  }
895 
896  this->DispatchDiscoveryMsg(srcAddr, rcvStr + sizeof(len), len);
897  }
898  }
899  else if (received < 0)
900  {
901  std::cerr << "Discovery::RecvDiscoveryUpdate() recvfrom error"
902  << std::endl;
903  }
904  }
905 
910  private: void DispatchDiscoveryMsg(const std::string &_fromIp,
911  char *_msg, uint16_t _len)
912  {
913  gz::msgs::Discovery msg;
914 
915  // Parse the message, and return if parsing failed. Parsing could
916  // fail when another discovery node is publishing messages using an
917  // older (or newer) format.
918  if (!msg.ParseFromArray(_msg, _len))
919  return;
920 
921  // Discard the message if the wire protocol is different than mine.
922  if (this->Version() != msg.version())
923  return;
924 
925  std::string recvPUuid = msg.process_uuid();
926 
927  // Discard our own discovery messages.
928  if (recvPUuid == this->pUuid)
929  return;
930 
931  // Forwarding summary:
932  // - From a unicast peer -> to multicast group (with NO_RELAY flag).
933  // - From multicast group -> to unicast peers (with RELAY flag).
934 
935  // If the RELAY flag is set, this discovery message is coming via a
936  // unicast transmission. In this case, we don't process it, we just
937  // forward it to the multicast group, and it will be dispatched once
938  // received there. Note that we also unset the RELAY flag and set the
939  // NO_RELAY flag, to avoid forwarding the message anymore.
940  if (msg.has_flags() && msg.flags().relay())
941  {
942  // Unset the RELAY flag in the header and set the NO_RELAY.
943  msg.mutable_flags()->set_relay(false);
944  msg.mutable_flags()->set_no_relay(true);
945  this->SendMulticast(msg);
946 
947  // A unicast peer contacted me. I need to save its address for
948  // sending future messages in the future.
949  this->AddRelayAddress(_fromIp);
950  return;
951  }
952  // If we are receiving this discovery message via the multicast channel
953  // and the NO_RELAY flag is not set, we forward this message via unicast
954  // to all our relays. Note that this is the most common case, where we
955  // receive a regular multicast message and we forward it to any remote
956  // relays.
957  else if (!msg.has_flags() || !msg.flags().no_relay())
958  {
959  msg.mutable_flags()->set_relay(true);
960  this->SendUnicast(msg);
961  }
962 
963  bool isSenderLocal = (std::find(this->hostInterfaces.begin(),
964  this->hostInterfaces.end(), _fromIp) != this->hostInterfaces.end()) ||
965  (_fromIp.find("127.") == 0);
966 
967  // Update timestamp and cache the callbacks.
968  DiscoveryCallback<Pub> connectCb;
969  DiscoveryCallback<Pub> disconnectCb;
970  DiscoveryCallback<Pub> registerCb;
971  DiscoveryCallback<Pub> unregisterCb;
972  std::function<void()> subscribersReqCb;
973  {
974  std::lock_guard<std::mutex> lock(this->mutex);
975  this->activity[recvPUuid] = std::chrono::steady_clock::now();
976  connectCb = this->connectionCb;
977  disconnectCb = this->disconnectionCb;
978  registerCb = this->registrationCb;
979  unregisterCb = this->unregistrationCb;
980  subscribersReqCb = this->subscribersCb;
981  }
982 
983  switch (msg.type())
984  {
985  case msgs::Discovery::ADVERTISE:
986  {
987  // Read the rest of the fields.
988  Pub publisher;
989  publisher.SetFromDiscovery(msg);
990 
991  // Check scope of the topic.
992  if ((publisher.Options().Scope() == Scope_t::PROCESS) ||
993  (publisher.Options().Scope() == Scope_t::HOST &&
994  !isSenderLocal))
995  {
996  return;
997  }
998 
999  // Register an advertised address for the topic.
1000  bool added;
1001  {
1002  std::lock_guard<std::mutex> lock(this->mutex);
1003  added = this->info.AddPublisher(publisher);
1004  }
1005 
1006  if (added && connectCb)
1007  {
1008  // Execute the client's callback.
1009  connectCb(publisher);
1010  }
1011 
1012  break;
1013  }
1014  case msgs::Discovery::SUBSCRIBE:
1015  {
1016  std::string recvTopic;
1017  // Read the topic information.
1018  if (msg.has_sub())
1019  {
1020  recvTopic = msg.sub().topic();
1021  }
1022  else
1023  {
1024  std::cerr << "Subscription discovery message is missing "
1025  << "Subscriber information.\n";
1026  break;
1027  }
1028 
1029  // Check if at least one of my nodes advertises the topic requested.
1030  Addresses_M<Pub> addresses;
1031  {
1032  std::lock_guard<std::mutex> lock(this->mutex);
1033  if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
1034  {
1035  break;
1036  }
1037 
1038  if (!this->info.Publishers(recvTopic, addresses))
1039  break;
1040  }
1041 
1042  for (const auto &nodeInfo : addresses[this->pUuid])
1043  {
1044  // Check scope of the topic.
1045  if ((nodeInfo.Options().Scope() == Scope_t::PROCESS) ||
1046  (nodeInfo.Options().Scope() == Scope_t::HOST &&
1047  !isSenderLocal))
1048  {
1049  continue;
1050  }
1051 
1052  // Answer an ADVERTISE message.
1053  this->SendMsg(DestinationType::ALL,
1054  msgs::Discovery::ADVERTISE, nodeInfo);
1055  }
1056 
1057  break;
1058  }
1059  case msgs::Discovery::SUBSCRIBERS_REQ:
1060  {
1061  if (subscribersReqCb)
1062  subscribersReqCb();
1063 
1064  break;
1065  }
1066  case msgs::Discovery::SUBSCRIBERS_REP:
1067  {
1068  // Save the remote subscriber.
1069  Pub publisher;
1070  publisher.SetFromDiscovery(msg);
1071 
1072  {
1073  std::lock_guard<std::mutex> lock(this->mutex);
1074  this->remoteSubscribers.AddPublisher(publisher);
1075  }
1076  break;
1077  }
1078  case msgs::Discovery::NEW_CONNECTION:
1079  {
1080  // Read the rest of the fields.
1081  Pub publisher;
1082  publisher.SetFromDiscovery(msg);
1083 
1084  if (registerCb)
1085  registerCb(publisher);
1086 
1087  break;
1088  }
1089  case msgs::Discovery::END_CONNECTION:
1090  {
1091  // Read the rest of the fields.
1092  Pub publisher;
1093  publisher.SetFromDiscovery(msg);
1094 
1095  if (unregisterCb)
1096  unregisterCb(publisher);
1097 
1098  break;
1099  }
1100  case msgs::Discovery::HEARTBEAT:
1101  {
1102  // The timestamp has already been updated.
1103  break;
1104  }
1105  case msgs::Discovery::BYE:
1106  {
1107  // Remove the activity entry for this publisher.
1108  {
1109  std::lock_guard<std::mutex> lock(this->mutex);
1110  this->activity.erase(recvPUuid);
1111  }
1112 
1113  if (disconnectCb)
1114  {
1115  Pub pub;
1116  pub.SetPUuid(recvPUuid);
1117  // Notify the new disconnection.
1118  disconnectCb(pub);
1119  }
1120 
1121  // Remove the address entry for this topic.
1122  {
1123  std::lock_guard<std::mutex> lock(this->mutex);
1124  this->info.DelPublishersByProc(recvPUuid);
1125  }
1126 
1127  break;
1128  }
1129  case msgs::Discovery::UNADVERTISE:
1130  {
1131  // Read the address.
1132  Pub publisher;
1133  publisher.SetFromDiscovery(msg);
1134 
1135  // Check scope of the topic.
1136  if ((publisher.Options().Scope() == Scope_t::PROCESS) ||
1137  (publisher.Options().Scope() == Scope_t::HOST &&
1138  !isSenderLocal))
1139  {
1140  return;
1141  }
1142 
1143  if (disconnectCb)
1144  {
1145  // Notify the new disconnection.
1146  disconnectCb(publisher);
1147  }
1148 
1149  // Remove the address entry for this topic.
1150  {
1151  std::lock_guard<std::mutex> lock(this->mutex);
1152  this->info.DelPublisherByNode(publisher.Topic(),
1153  publisher.PUuid(), publisher.NUuid());
1154  }
1155 
1156  break;
1157  }
1158  default:
1159  {
1160  std::cerr << "Unknown message type [" << msg.type() << "].\n";
1161  break;
1162  }
1163  }
1164  }
1165 
1172  private: template<typename T>
1173  void SendMsg(const DestinationType &_destType,
1174  const msgs::Discovery::Type _type,
1175  const T &_pub) const
1176  {
1177  gz::msgs::Discovery discoveryMsg;
1178  discoveryMsg.set_version(this->Version());
1179  discoveryMsg.set_type(_type);
1180  discoveryMsg.set_process_uuid(this->pUuid);
1181  _pub.FillDiscovery(discoveryMsg);
1182 
1183  switch (_type)
1184  {
1185  case msgs::Discovery::ADVERTISE:
1186  case msgs::Discovery::UNADVERTISE:
1187  case msgs::Discovery::NEW_CONNECTION:
1188  case msgs::Discovery::END_CONNECTION:
1189  {
1190  _pub.FillDiscovery(discoveryMsg);
1191  break;
1192  }
1193  case msgs::Discovery::SUBSCRIBE:
1194  {
1195  discoveryMsg.mutable_sub()->set_topic(_pub.Topic());
1196  break;
1197  }
1198  case msgs::Discovery::HEARTBEAT:
1199  case msgs::Discovery::BYE:
1200  case msgs::Discovery::SUBSCRIBERS_REQ:
1201  case msgs::Discovery::SUBSCRIBERS_REP:
1202  break;
1203  default:
1204  std::cerr << "Discovery::SendMsg() error: Unrecognized message"
1205  << " type [" << _type << "]" << std::endl;
1206  return;
1207  }
1208 
1209  if (_destType == DestinationType::MULTICAST ||
1210  _destType == DestinationType::ALL)
1211  {
1212  this->SendMulticast(discoveryMsg);
1213  }
1214 
1215  // Send the discovery message to the unicast relays.
1216  if (_destType == DestinationType::UNICAST ||
1217  _destType == DestinationType::ALL)
1218  {
1219  // Set the RELAY flag in the header.
1220  discoveryMsg.mutable_flags()->set_relay(true);
1221  this->SendUnicast(discoveryMsg);
1222  }
1223 
1224  if (this->verbose)
1225  {
1226  std::cout << "\t* Sending " << msgs::ToString(_type)
1227  << " msg [" << _pub.Topic() << "]" << std::endl;
1228  }
1229  }
1230 
1233  private: void SendUnicast(const msgs::Discovery &_msg) const
1234  {
1235  uint16_t msgSize;
1236 
1237 #if GOOGLE_PROTOBUF_VERSION >= 3004000
1238  size_t msgSizeFull = _msg.ByteSizeLong();
1239 #else
1240  int msgSizeFull = _msg.ByteSize();
1241 #endif
1242  if (msgSizeFull + sizeof(msgSize) > this->kMaxRcvStr)
1243  {
1244  std::cerr << "Discovery message too large to send. Discovery won't "
1245  << "work. This shouldn't happen.\n";
1246  return;
1247  }
1248  msgSize = msgSizeFull;
1249 
1250  uint16_t totalSize = sizeof(msgSize) + msgSize;
1251  char *buffer = static_cast<char *>(new char[totalSize]);
1252  memcpy(&buffer[0], &msgSize, sizeof(msgSize));
1253 
1254  if (_msg.SerializeToArray(buffer + sizeof(msgSize), msgSize))
1255  {
1256  // Send the discovery message to the unicast relays.
1257  for (const auto &sockAddr : this->relayAddrs)
1258  {
1259  errno = 0;
1260  auto sent = sendto(this->sockets.at(0),
1261  reinterpret_cast<const raw_type *>(
1262  reinterpret_cast<const unsigned char*>(buffer)),
1263  totalSize, 0,
1264  reinterpret_cast<const sockaddr *>(&sockAddr),
1265  sizeof(sockAddr));
1266 
1267  if (sent != totalSize)
1268  {
1269  std::cerr << "Exception sending a unicast message:" << std::endl;
1270  std::cerr << " Return value: " << sent << std::endl;
1271  std::cerr << " Error code: " << strerror(errno) << std::endl;
1272  break;
1273  }
1274  }
1275  }
1276  else
1277  {
1278  std::cerr << "Discovery::SendUnicast: Error serializing data."
1279  << std::endl;
1280  }
1281 
1282  delete [] buffer;
1283  }
1284 
1287  private: void SendMulticast(const msgs::Discovery &_msg) const
1288  {
1289  uint16_t msgSize;
1290 
1291 #if GOOGLE_PROTOBUF_VERSION >= 3004000
1292  size_t msgSizeFull = _msg.ByteSizeLong();
1293 #else
1294  int msgSizeFull = _msg.ByteSize();
1295 #endif
1296  if (msgSizeFull + sizeof(msgSize) > this->kMaxRcvStr)
1297  {
1298  std::cerr << "Discovery message too large to send. Discovery won't "
1299  << "work. This shouldn't happen.\n";
1300  return;
1301  }
1302 
1303  msgSize = msgSizeFull;
1304  uint16_t totalSize = sizeof(msgSize) + msgSize;
1305  char *buffer = static_cast<char *>(new char[totalSize]);
1306  memcpy(&buffer[0], &msgSize, sizeof(msgSize));
1307 
1308  if (_msg.SerializeToArray(buffer + sizeof(msgSize), msgSize))
1309  {
1310  // Send the discovery message to the multicast group through all the
1311  // sockets.
1312  for (const auto &sock : this->Sockets())
1313  {
1314  errno = 0;
1315  if (sendto(sock, reinterpret_cast<const raw_type *>(
1316  reinterpret_cast<const unsigned char*>(buffer)),
1317  totalSize, 0,
1318  reinterpret_cast<const sockaddr *>(this->MulticastAddr()),
1319  sizeof(*(this->MulticastAddr()))) != totalSize)
1320  {
1321  // Ignore EPERM and ENOBUFS errors.
1322  //
1323  // See issue #106
1324  //
1325  // Rationale drawn from:
1326  //
1327  // * https://groups.google.com/forum/#!topic/comp.protocols.tcp-ip/Qou9Sfgr77E
1328  // * https://stackoverflow.com/questions/16555101/sendto-dgrams-do-not-block-for-enobufs-on-osx
1329  if (errno != EPERM && errno != ENOBUFS)
1330  {
1331  std::cerr << "Exception sending a multicast message:"
1332  << strerror(errno) << std::endl;
1333  }
1334  break;
1335  }
1336  }
1337  }
1338  else
1339  {
1340  std::cerr << "Discovery::SendMulticast: Error serializing data."
1341  << std::endl;
1342  }
1343 
1344  delete [] buffer;
1345  }
1346 
1349  private: const std::vector<int> &Sockets() const
1350  {
1351  return this->sockets;
1352  }
1353 
1356  private: const sockaddr_in *MulticastAddr() const
1357  {
1358  return &this->mcastAddr;
1359  }
1360 
1363  private: uint8_t Version() const
1364  {
1365  static std::string gzStats;
1366  static int topicStats;
1367 
1368  if (env("GZ_TRANSPORT_TOPIC_STATISTICS", gzStats) && !gzStats.empty())
1369  {
1370  topicStats = (gzStats == "1");
1371  }
1372 
1373  return this->kWireVersion + (topicStats * 100);
1374  }
1375 
1380  private: bool RegisterNetIface(const std::string &_ip)
1381  {
1382  // Make a new socket for sending discovery information.
1383  int sock = static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1384  if (sock < 0)
1385  {
1386  std::cerr << "Socket creation failed." << std::endl;
1387  return false;
1388  }
1389 
1390  // Socket option: IP_MULTICAST_IF.
1391  // This socket option needs to be applied to each socket used to send
1392  // data. This option selects the source interface for outgoing messages.
1393  struct in_addr ifAddr;
1394  ifAddr.s_addr = inet_addr(_ip.c_str());
1395  if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1396  reinterpret_cast<const char*>(&ifAddr), sizeof(ifAddr)) != 0)
1397  {
1398  std::cerr << "Error setting socket option (IP_MULTICAST_IF)."
1399  << std::endl;
1400  return false;
1401  }
1402 
1403  this->sockets.push_back(sock);
1404 
1405  // Join the multicast group. We have to do it for each network interface
1406  // but we can do it on the same socket. We will use the socket at
1407  // position 0 for receiving multicast information.
1408  struct ip_mreq group;
1409  group.imr_multiaddr.s_addr =
1410  inet_addr(this->multicastGroup.c_str());
1411  group.imr_interface.s_addr = inet_addr(_ip.c_str());
1412  if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1413  reinterpret_cast<const char*>(&group), sizeof(group)) != 0)
1414  {
1415  std::cerr << "Error setting socket option (IP_ADD_MEMBERSHIP)."
1416  << std::endl;
1417  return false;
1418  }
1419 
1420  return true;
1421  }
1422 
1425  private: void AddRelayAddress(const std::string &_ip)
1426  {
1427  // Sanity check: Make sure that this IP address is not already saved.
1428  for (auto const &addr : this->relayAddrs)
1429  {
1430  if (addr.sin_addr.s_addr == inet_addr(_ip.c_str()))
1431  return;
1432  }
1433 
1434  sockaddr_in addr;
1435  memset(&addr, 0, sizeof(addr));
1436  addr.sin_family = AF_INET;
1437  addr.sin_addr.s_addr = inet_addr(_ip.c_str());
1438  addr.sin_port = htons(static_cast<u_short>(this->port));
1439 
1440  this->relayAddrs.push_back(addr);
1441  }
1442 
1446  private: static const unsigned int kDefActivityInterval = 100;
1447 
1451  private: static const unsigned int kDefHeartbeatInterval = 1000;
1452 
1456  private: static const unsigned int kDefSilenceInterval = 3000;
1457 
1459  private: std::string multicastGroup;
1460 
1462  private: const int kTimeout = 250;
1463 
1465  private: static const uint16_t kMaxRcvStr =
1467 
1470  private: static const uint8_t kWireVersion = 10;
1471 
1473  private: int port;
1474 
1476  private: std::string hostAddr;
1477 
1479  private: std::vector<std::string> hostInterfaces;
1480 
1482  private: std::string pUuid;
1483 
1487  private: unsigned int silenceInterval;
1488 
1492  private: unsigned int activityInterval;
1493 
1497  private: unsigned int heartbeatInterval;
1498 
1500  private: DiscoveryCallback<Pub> connectionCb;
1501 
1503  private: DiscoveryCallback<Pub> disconnectionCb;
1504 
1506  private: DiscoveryCallback<Pub> registrationCb;
1507 
1509  private: DiscoveryCallback<Pub> unregistrationCb;
1510 
1512  private: std::function<void()> subscribersCb;
1513 
1515  private: TopicStorage<Pub> info;
1516 
1518  private: TopicStorage<Pub> remoteSubscribers;
1519 
1525 
1527  private: bool verbose;
1528 
1530  private: std::vector<int> sockets;
1531 
1533  private: sockaddr_in mcastAddr;
1534 
1536  private: std::vector<sockaddr_in> relayAddrs;
1537 
1539  private: mutable std::mutex mutex;
1540 
1542  private: std::thread threadReception;
1543 
1545  private: Timestamp timeNextHeartbeat;
1546 
1548  private: Timestamp timeNextActivity;
1549 
1551  private: std::mutex exitMutex;
1552 
1557  private: bool initialized;
1558 
1560  private: unsigned int numHeartbeatsUninitialized;
1561 
1563  private: mutable std::condition_variable initializedCv;
1564 
1566  private: bool exit;
1567 
1569  private: bool enabled;
1570  };
1571 
1575 
1579  }
1580  }
1581 }
1582 
1583 #endif