Gazebo Transport

API Reference

13.4.1
Discovery.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2014 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef GZ_TRANSPORT_DISCOVERY_HH_
19 #define GZ_TRANSPORT_DISCOVERY_HH_
20 #include <errno.h>
21 #include <string.h>
22 
23 #ifdef _WIN32
24  // For socket(), connect(), send(), and recv().
25  #include <Winsock2.h>
26  #include <Ws2def.h>
27  #include <Ws2ipdef.h>
28  #include <Ws2tcpip.h>
29  // Type used for raw data on this platform.
30  using raw_type = char;
31 #else
32  // For data types
33  #include <sys/types.h>
34  // For socket(), connect(), send(), and recv()
35  #include <sys/socket.h>
36  // For gethostbyname()
37  #include <netdb.h>
38  // For inet_addr()
39  #include <arpa/inet.h>
40  // For close()
41  #include <unistd.h>
42  // For sockaddr_in
43  #include <netinet/in.h>
44  // Type used for raw data on this platform
45  using raw_type = void;
46 #endif
47 
48 #ifdef _WIN32
49  #pragma warning(push, 0)
50 #endif
51 #ifdef _WIN32
52  #pragma warning(pop)
53  // Suppress "decorated name length exceed" warning in STL.
54  #pragma warning(disable: 4503)
55  // Suppress "depreted API warnings" in WINSOCK.
56  #pragma warning(disable: 4996)
57 #endif
58 
59 #include <gz/msgs/discovery.pb.h>
60 
61 #include <algorithm>
62 #include <condition_variable>
63 #include <limits>
64 #include <map>
65 #include <memory>
66 #include <mutex>
67 #include <string>
68 #include <thread>
69 #include <vector>
70 
71 #include <gz/msgs/Utility.hh>
72 
73 #include "gz/transport/config.hh"
74 #include "gz/transport/Export.hh"
75 #include "gz/transport/Helpers.hh"
76 #include "gz/transport/NetUtils.hh"
80 
81 namespace gz
82 {
83  namespace transport
84  {
85  // Inline bracket to help doxygen filtering.
86  inline namespace GZ_TRANSPORT_VERSION_NAMESPACE {
88  enum class DestinationType
89  {
91  UNICAST,
93  MULTICAST,
95  ALL
96  };
97 
98  //
104  bool GZ_TRANSPORT_VISIBLE pollSockets(
105  const std::vector<int> &_sockets,
106  const int _timeout);
107 
116  template<typename Pub>
117  class Discovery
118  {
125  public: Discovery(const std::string &_pUuid,
126  const std::string &_ip,
127  const int _port,
128  const bool _verbose = false)
129  : multicastGroup(_ip),
130  port(_port),
131  hostAddr(determineHost()),
132  pUuid(_pUuid),
133  silenceInterval(kDefSilenceInterval),
134  activityInterval(kDefActivityInterval),
135  heartbeatInterval(kDefHeartbeatInterval),
136  connectionCb(nullptr),
137  disconnectionCb(nullptr),
138  verbose(_verbose),
139  initialized(false),
140  numHeartbeatsUninitialized(0),
141  exit(false),
142  enabled(false)
143  {
144  std::string gzIp;
145  if (env("GZ_IP", gzIp) && !gzIp.empty())
146  {
147  this->hostInterfaces = {gzIp};
148  }
149  else
150  {
151  // Get the list of network interfaces in this host.
152  this->hostInterfaces = determineInterfaces();
153  }
154 
155 #ifdef _WIN32
156  WORD wVersionRequested;
157  WSADATA wsaData;
158 
159  // Request WinSock v2.2.
160  wVersionRequested = MAKEWORD(2, 2);
161  // Load WinSock DLL.
162  if (WSAStartup(wVersionRequested, &wsaData) != 0)
163  {
164  std::cerr << "Unable to load WinSock DLL" << std::endl;
165  return;
166  }
167 #endif
168  for (const auto &netIface : this->hostInterfaces)
169  {
170  auto succeed = this->RegisterNetIface(netIface);
171 
172  // If the IP address that we're selecting as the main IP address of
173  // the host is invalid, we change it to 127.0.0.1 .
174  // This is probably because GZ_IP is set to a wrong value.
175  if (netIface == this->hostAddr && !succeed)
176  {
177  this->RegisterNetIface("127.0.0.1");
178  std::cerr << "Did you set the environment variable GZ_IP with a "
179  << "correct IP address? " << std::endl
180  << " [" << netIface << "] seems an invalid local IP "
181  << "address." << std::endl
182  << " Using 127.0.0.1 as hostname." << std::endl;
183  this->hostAddr = "127.0.0.1";
184  }
185  }
186 
187  // Socket option: SO_REUSEADDR. This options is used only for receiving
188  // data. We can reuse the same socket for receiving multicast data from
189  // multiple interfaces. We will use the socket at position 0 for
190  // receiving data.
191  int reuseAddr = 1;
192  if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
193  reinterpret_cast<const char *>(&reuseAddr), sizeof(reuseAddr)) != 0)
194  {
195  std::cerr << "Error setting socket option (SO_REUSEADDR)."
196  << std::endl;
197  return;
198  }
199 
200 #ifdef SO_REUSEPORT
201  // Socket option: SO_REUSEPORT. This options is used only for receiving
202  // data. We can reuse the same socket for receiving multicast data from
203  // multiple interfaces. We will use the socket at position 0 for
204  // receiving data.
205  int reusePort = 1;
206  // cppcheck-suppress ConfigurationNotChecked
207  if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
208  reinterpret_cast<const char *>(&reusePort), sizeof(reusePort)) != 0)
209  {
210  std::cerr << "Error setting socket option (SO_REUSEPORT)."
211  << std::endl;
212  return;
213  }
214 #endif
215  // Bind the first socket to the discovery port.
216  sockaddr_in localAddr;
217  memset(&localAddr, 0, sizeof(localAddr));
218  localAddr.sin_family = AF_INET;
219  localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
220  localAddr.sin_port = htons(static_cast<u_short>(this->port));
221 
222  if (bind(this->sockets.at(0),
223  reinterpret_cast<sockaddr *>(&localAddr), sizeof(sockaddr_in)) < 0)
224  {
225  std::cerr << "Binding to a local port failed." << std::endl;
226  return;
227  }
228 
229  // Set 'mcastAddr' to the multicast discovery group.
230  memset(&this->mcastAddr, 0, sizeof(this->mcastAddr));
231  this->mcastAddr.sin_family = AF_INET;
232  this->mcastAddr.sin_addr.s_addr =
233  inet_addr(this->multicastGroup.c_str());
234  this->mcastAddr.sin_port = htons(static_cast<u_short>(this->port));
235 
237  std::string gzRelay;
238  if (env("GZ_RELAY", gzRelay) && !gzRelay.empty())
239  {
240  relays = transport::split(gzRelay, ':');
241  }
242 
243  // Register all unicast relays.
244  for (auto const &relayAddr : relays)
245  this->AddRelayAddress(relayAddr);
246 
247  if (this->verbose)
248  this->PrintCurrentState();
249  }
250 
252  public: virtual ~Discovery()
253  {
254  // Tell the service thread to terminate.
255  this->exitMutex.lock();
256  this->exit = true;
257  this->exitMutex.unlock();
258 
259  // Wait for the service threads to finish before exit.
260  if (this->threadReception.joinable())
261  this->threadReception.join();
262 
263  // Broadcast a BYE message to trigger the remote cancellation of
264  // all our advertised topics.
265  this->SendMsg(DestinationType::ALL, msgs::Discovery::BYE,
266  Publisher("", "", this->pUuid, "", AdvertiseOptions()));
267 
268  // Close sockets.
269  for (const auto &sock : this->sockets)
270  {
271 #ifdef _WIN32
272  closesocket(sock);
273  WSACleanup();
274 #else
275  close(sock);
276 #endif
277  }
278  }
279 
283  public: void Start()
284  {
285  {
286  std::lock_guard<std::mutex> lock(this->mutex);
287 
288  // The service is already running.
289  if (this->enabled)
290  return;
291 
292  this->enabled = true;
293  }
294 
295  auto now = std::chrono::steady_clock::now();
296  this->timeNextHeartbeat = now;
297  this->timeNextActivity = now;
298 
299  // Start the thread that receives discovery information.
300  this->threadReception = std::thread(&Discovery::RecvMessages, this);
301  }
302 
307  public: bool Advertise(const Pub &_publisher)
308  {
310 
311  {
312  std::lock_guard<std::mutex> lock(this->mutex);
313 
314  if (!this->enabled)
315  return false;
316 
317  // Add the addressing information (local publisher).
318  if (!this->info.AddPublisher(_publisher))
319  return false;
320 
321  cb = this->connectionCb;
322  }
323 
324  if (cb)
325  cb(_publisher);
326 
327  // Only advertise a message outside this process if the scope
328  // is not 'Process'
329  if (_publisher.Options().Scope() != Scope_t::PROCESS)
330  this->SendMsg(DestinationType::ALL, msgs::Discovery::ADVERTISE,
331  _publisher);
332 
333  return true;
334  }
335 
346  public: bool Discover(const std::string &_topic) const
347  {
349  bool found;
350  Addresses_M<Pub> addresses;
351 
352  {
353  std::lock_guard<std::mutex> lock(this->mutex);
354 
355  if (!this->enabled)
356  return false;
357 
358  cb = this->connectionCb;
359  }
360 
361  Pub pub;
362  pub.SetTopic(_topic);
363  pub.SetPUuid(this->pUuid);
364 
365  // Send a discovery request.
366  this->SendMsg(DestinationType::ALL, msgs::Discovery::SUBSCRIBE, pub);
367 
368  {
369  std::lock_guard<std::mutex> lock(this->mutex);
370  found = this->info.Publishers(_topic, addresses);
371  }
372 
373  if (found)
374  {
375  // I already have information about this topic.
376  for (const auto &proc : addresses)
377  {
378  for (const auto &node : proc.second)
379  {
380  if (cb)
381  {
382  // Execute the user's callback for a service request. Notice
383  // that we only execute one callback for preventing receive
384  // multiple service responses for a single request.
385  cb(node);
386  }
387  }
388  }
389  }
390 
391  return true;
392  }
393 
396  public: void SendSubscribersRep(const MessagePublisher &_pub) const
397  {
398  this->SendMsg(
399  DestinationType::ALL, msgs::Discovery::SUBSCRIBERS_REP, _pub);
400  }
401 
404  public: void Register(const MessagePublisher &_pub) const
405  {
406  this->SendMsg(
407  DestinationType::ALL, msgs::Discovery::NEW_CONNECTION, _pub);
408  }
409 
412  public: void Unregister(const MessagePublisher &_pub) const
413  {
414  this->SendMsg(
415  DestinationType::ALL, msgs::Discovery::END_CONNECTION, _pub);
416  }
417 
420  public: const TopicStorage<Pub> &Info() const
421  {
422  std::lock_guard<std::mutex> lock(this->mutex);
423  return this->info;
424  }
425 
430  public: bool Publishers(const std::string &_topic,
431  Addresses_M<Pub> &_publishers) const
432  {
433  std::lock_guard<std::mutex> lock(this->mutex);
434  return this->info.Publishers(_topic, _publishers);
435  }
436 
441  public: bool RemoteSubscribers(const std::string &_topic,
442  Addresses_M<Pub> &_subscribers) const
443  {
444  std::lock_guard<std::mutex> lock(this->mutex);
445  return this->remoteSubscribers.Publishers(_topic, _subscribers);
446  }
447 
455  public: bool Unadvertise(const std::string &_topic,
456  const std::string &_nUuid)
457  {
458  Pub inf;
459  {
460  std::lock_guard<std::mutex> lock(this->mutex);
461 
462  if (!this->enabled)
463  return false;
464 
465  // Don't do anything if the topic is not advertised by any of my nodes
466  if (!this->info.Publisher(_topic, this->pUuid, _nUuid, inf))
467  return true;
468 
469  // Remove the topic information.
470  this->info.DelPublisherByNode(_topic, this->pUuid, _nUuid);
471  }
472 
473  // Only unadvertise a message outside this process if the scope
474  // is not 'Process'.
475  if (inf.Options().Scope() != Scope_t::PROCESS)
476  {
477  this->SendMsg(DestinationType::ALL,
478  msgs::Discovery::UNADVERTISE, inf);
479  }
480 
481  return true;
482  }
483 
486  public: std::string HostAddr() const
487  {
488  std::lock_guard<std::mutex> lock(this->mutex);
489  return this->hostAddr;
490  }
491 
496  public: unsigned int ActivityInterval() const
497  {
498  std::lock_guard<std::mutex> lock(this->mutex);
499  return this->activityInterval;
500  }
501 
507  public: unsigned int HeartbeatInterval() const
508  {
509  std::lock_guard<std::mutex> lock(this->mutex);
510  return this->heartbeatInterval;
511  }
512 
517  public: unsigned int SilenceInterval() const
518  {
519  std::lock_guard<std::mutex> lock(this->mutex);
520  return this->silenceInterval;
521  }
522 
526  public: void SetActivityInterval(const unsigned int _ms)
527  {
528  std::lock_guard<std::mutex> lock(this->mutex);
529  this->activityInterval = _ms;
530  }
531 
535  public: void SetHeartbeatInterval(const unsigned int _ms)
536  {
537  std::lock_guard<std::mutex> lock(this->mutex);
538  this->heartbeatInterval = _ms;
539  }
540 
544  public: void SetSilenceInterval(const unsigned int _ms)
545  {
546  std::lock_guard<std::mutex> lock(this->mutex);
547  this->silenceInterval = _ms;
548  }
549 
554  public: void ConnectionsCb(const DiscoveryCallback<Pub> &_cb)
555  {
556  std::lock_guard<std::mutex> lock(this->mutex);
557  this->connectionCb = _cb;
558  }
559 
564  public: void DisconnectionsCb(const DiscoveryCallback<Pub> &_cb)
565  {
566  std::lock_guard<std::mutex> lock(this->mutex);
567  this->disconnectionCb = _cb;
568  }
569 
573  public: void RegistrationsCb(const DiscoveryCallback<Pub> &_cb)
574  {
575  std::lock_guard<std::mutex> lock(this->mutex);
576  this->registrationCb = _cb;
577  }
578 
582  public: void UnregistrationsCb(const DiscoveryCallback<Pub> &_cb)
583  {
584  std::lock_guard<std::mutex> lock(this->mutex);
585  this->unregistrationCb = _cb;
586  }
587 
591  public: void SubscribersCb(const std::function<void()> &_cb)
592  {
593  std::lock_guard<std::mutex> lock(this->mutex);
594  this->subscribersCb = _cb;
595  }
596 
598  public: void PrintCurrentState() const
599  {
600  std::lock_guard<std::mutex> lock(this->mutex);
601 
602  std::cout << "---------------" << std::endl;
603  std::cout << std::boolalpha << "Enabled: "
604  << this->enabled << std::endl;
605  std::cout << "Discovery state" << std::endl;
606  std::cout << "\tUUID: " << this->pUuid << std::endl;
607  std::cout << "Settings" << std::endl;
608  std::cout << "\tActivity: " << this->activityInterval
609  << " ms." << std::endl;
610  std::cout << "\tHeartbeat: " << this->heartbeatInterval
611  << "ms." << std::endl;
612  std::cout << "\tSilence: " << this->silenceInterval
613  << " ms." << std::endl;
614  std::cout << "Known information:" << std::endl;
615  this->info.Print();
616 
617  // Used to calculate the elapsed time.
619 
620  std::cout << "Activity" << std::endl;
621  if (this->activity.empty())
622  std::cout << "\t<empty>" << std::endl;
623  else
624  {
625  for (auto &proc : this->activity)
626  {
627  // Elapsed time since the last update from this publisher.
628  std::chrono::duration<double> elapsed = now - proc.second;
629 
630  std::cout << "\t" << proc.first << std::endl;
631  std::cout << "\t\t" << "Since: " << std::chrono::duration_cast<
632  std::chrono::milliseconds>(elapsed).count() << " ms. ago. "
633  << std::endl;
634  }
635  }
636  std::cout << "---------------" << std::endl;
637  }
638 
642  public: void TopicList(std::vector<std::string> &_topics)
643  {
644  {
645  std::lock_guard<std::mutex> lock(this->mutex);
646  this->remoteSubscribers.Clear();
647  }
648 
649  // Request the list of subscribers.
650  Publisher pub("", "", this->pUuid, "", AdvertiseOptions());
651  this->SendMsg(
652  DestinationType::ALL, msgs::Discovery::SUBSCRIBERS_REQ, pub);
653 
654  this->WaitForInit();
655  std::lock_guard<std::mutex> lock(this->mutex);
656  this->info.TopicList(_topics);
657 
658  std::vector<std::string> remoteSubs;
659  this->remoteSubscribers.TopicList(remoteSubs);
660 
661  // Add the remote subscribers
662  for (auto const &t : remoteSubs)
663  {
664  if (std::find(_topics.begin(), _topics.end(), t) == _topics.end())
665  {
666  _topics.push_back(t);
667  }
668  }
669  }
670 
673  public: void WaitForInit() const
674  {
675  std::unique_lock<std::mutex> lk(this->mutex);
676 
677  if (!this->initialized)
678  {
679  this->initializedCv.wait(lk, [this]{return this->initialized;});
680  }
681  }
682 
686  private: void UpdateActivity()
687  {
688  // The UUIDs of the processes that have expired.
690 
691  // A copy of the disconnection callback.
692  DiscoveryCallback<Pub> disconnectCb;
693 
695 
696  {
697  std::lock_guard<std::mutex> lock(this->mutex);
698 
699  if (now < this->timeNextActivity)
700  return;
701 
702  disconnectCb = this->disconnectionCb;
703 
704  for (auto it = this->activity.cbegin(); it != this->activity.cend();)
705  {
706  // Elapsed time since the last update from this publisher.
707  auto elapsed = now - it->second;
708 
709  // This publisher has expired.
710  if (std::chrono::duration_cast<std::chrono::milliseconds>
711  (elapsed).count() > this->silenceInterval)
712  {
713  // Remove all the info entries for this process UUID.
714  this->info.DelPublishersByProc(it->first);
715 
716  uuids.push_back(it->first);
717 
718  // Remove the activity entry.
719  this->activity.erase(it++);
720  }
721  else
722  ++it;
723  }
724 
725  this->timeNextActivity = std::chrono::steady_clock::now() +
726  std::chrono::milliseconds(this->activityInterval);
727  }
728 
729  if (!disconnectCb)
730  return;
731 
732  // Notify without topic information. This is useful to inform the
733  // client that a remote node is gone, even if we were not
734  // interested in its topics.
735  for (auto const &uuid : uuids)
736  {
737  Pub publisher;
738  publisher.SetPUuid(uuid);
739  disconnectCb(publisher);
740  }
741  }
742 
745  public: void AddRelayAddress(const std::string &_ip)
746  {
747  std::lock_guard<std::mutex> lock(this->mutex);
748  // Sanity check: Make sure that this IP address is not already saved.
749  for (auto const &addr : this->relayAddrs)
750  {
751  if (addr.sin_addr.s_addr == inet_addr(_ip.c_str()))
752  return;
753  }
754 
755  sockaddr_in addr;
756  memset(&addr, 0, sizeof(addr));
757  addr.sin_family = AF_INET;
758  addr.sin_addr.s_addr = inet_addr(_ip.c_str());
759  addr.sin_port = htons(static_cast<u_short>(this->port));
760 
761  this->relayAddrs.push_back(addr);
762  }
763 
764  // \brief Gets this instance's relay addresses.
765  // \return The list of relay addresses.
767  {
769 
770  std::lock_guard<std::mutex> lock(this->mutex);
771 
772  for (auto const &addr : this->relayAddrs) {
773  result.push_back(inet_ntoa(addr.sin_addr));
774  }
775 
776  return result;
777  }
778 
780  private: void UpdateHeartbeat()
781  {
783 
784  {
785  std::lock_guard<std::mutex> lock(this->mutex);
786 
787  if (now < this->timeNextHeartbeat)
788  return;
789  }
790 
791  Publisher pub("", "", this->pUuid, "", AdvertiseOptions());
792  this->SendMsg(DestinationType::ALL, msgs::Discovery::HEARTBEAT, pub);
793 
795  {
796  std::lock_guard<std::mutex> lock(this->mutex);
797 
798  // Re-advertise topics that are advertised inside this process.
799  this->info.PublishersByProc(this->pUuid, nodes);
800  }
801 
802  for (const auto &topic : nodes)
803  {
804  for (const auto &node : topic.second)
805  {
806  this->SendMsg(DestinationType::ALL,
807  msgs::Discovery::ADVERTISE, node);
808  }
809  }
810 
811  {
812  std::lock_guard<std::mutex> lock(this->mutex);
813  if (!this->initialized)
814  {
815  if (this->numHeartbeatsUninitialized == 2u)
816  {
817  // We consider discovery initialized after two heartbeat cycles.
818  this->initialized = true;
819 
820  // Notify anyone waiting for the initialization phase to finish.
821  this->initializedCv.notify_all();
822  }
823  ++this->numHeartbeatsUninitialized;
824  }
825 
826  this->timeNextHeartbeat = std::chrono::steady_clock::now() +
827  std::chrono::milliseconds(this->heartbeatInterval);
828  }
829  }
830 
840  private: int NextTimeout() const
841  {
842  auto now = std::chrono::steady_clock::now();
843  auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
844  auto timeUntilNextActivity = this->timeNextActivity - now;
845 
846  int t = static_cast<int>(
847  std::chrono::duration_cast<std::chrono::milliseconds>
848  (std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
849  int t2 = std::min(t, this->kTimeout);
850  return std::max(t2, 0);
851  }
852 
854  private: void RecvMessages()
855  {
856  bool timeToExit = false;
857  while (!timeToExit)
858  {
859  // Calculate the timeout.
860  int timeout = this->NextTimeout();
861 
862  if (pollSockets(this->sockets, timeout))
863  {
864  this->RecvDiscoveryUpdate();
865 
866  if (this->verbose)
867  this->PrintCurrentState();
868  }
869 
870  this->UpdateHeartbeat();
871  this->UpdateActivity();
872 
873  // Is it time to exit?
874  {
875  std::lock_guard<std::mutex> lock(this->exitMutex);
876  if (this->exit)
877  timeToExit = true;
878  }
879  }
880  }
881 
883  private: void RecvDiscoveryUpdate()
884  {
885  char rcvStr[Discovery::kMaxRcvStr];
886  sockaddr_in clntAddr;
887  socklen_t addrLen = sizeof(clntAddr);
888 
889  int64_t received = recvfrom(this->sockets.at(0),
890  reinterpret_cast<raw_type *>(rcvStr),
891  this->kMaxRcvStr, 0,
892  reinterpret_cast<sockaddr *>(&clntAddr),
893  reinterpret_cast<socklen_t *>(&addrLen));
894  if (received > 0)
895  {
896  uint16_t len = 0;
897  memcpy(&len, &rcvStr[0], sizeof(len));
898 
899  // Gazebo Transport delimits each discovery message with a
900  // frame_delimiter that contains byte size information.
901  // A discovery message has the form:
902  //
903  // <frame_delimiter><frame_body>
904  //
905  // Gazebo Transport version < 8 sends a frame delimiter that
906  // contains the value of sizeof(frame_delimiter)
907  // + sizeof(frame_body). In other words, the frame_delimiter
908  // contains a value that represents the total size of the
909  // frame_body and frame_delimiter in bytes.
910  //
911  // Gazebo Transport version >= 8 sends a frame_delimiter
912  // that contains the value of sizeof(frame_body). In other
913  // words, the frame_delimiter contains a value that represents
914  // the total size of only the frame_body.
915  //
916  // It is possible that two incompatible versions of Gazebo
917  // Transport exist on the same network. If we receive an
918  // unexpected size, then we ignore the message.
919 
920  // If-condition for version 8+
921  if (len + sizeof(len) == static_cast<uint16_t>(received))
922  {
923  std::string srcAddr = inet_ntoa(clntAddr.sin_addr);
924  uint16_t srcPort = ntohs(clntAddr.sin_port);
925 
926  if (this->verbose)
927  {
928  std::cout << "\nReceived discovery update from "
929  << srcAddr << ": " << srcPort << std::endl;
930  }
931 
932  this->DispatchDiscoveryMsg(srcAddr, rcvStr + sizeof(len), len);
933  }
934  }
935  else if (received < 0)
936  {
937  std::cerr << "Discovery::RecvDiscoveryUpdate() recvfrom error"
938  << std::endl;
939  }
940  }
941 
946  private: void DispatchDiscoveryMsg(const std::string &_fromIp,
947  char *_msg, uint16_t _len)
948  {
949  gz::msgs::Discovery msg;
950 
951  // Parse the message, and return if parsing failed. Parsing could
952  // fail when another discovery node is publishing messages using an
953  // older (or newer) format.
954  if (!msg.ParseFromArray(_msg, _len))
955  return;
956 
957  // Discard the message if the wire protocol is different than mine.
958  if (this->Version() != msg.version())
959  return;
960 
961  std::string recvPUuid = msg.process_uuid();
962 
963  // Discard our own discovery messages.
964  if (recvPUuid == this->pUuid)
965  return;
966 
967  // Forwarding summary:
968  // - From a unicast peer -> to multicast group (with NO_RELAY flag).
969  // - From multicast group -> to unicast peers (with RELAY flag).
970 
971  // If the RELAY flag is set, this discovery message is coming via a
972  // unicast transmission. In this case, we don't process it, we just
973  // forward it to the multicast group, and it will be dispatched once
974  // received there. Note that we also unset the RELAY flag and set the
975  // NO_RELAY flag, to avoid forwarding the message anymore.
976  if (msg.has_flags() && msg.flags().relay())
977  {
978  // Unset the RELAY flag in the header and set the NO_RELAY.
979  msg.mutable_flags()->set_relay(false);
980  msg.mutable_flags()->set_no_relay(true);
981  this->SendMulticast(msg);
982 
983  // A unicast peer contacted me. I need to save its address for
984  // sending future messages in the future.
985  this->AddRelayAddress(_fromIp);
986  return;
987  }
988  // If we are receiving this discovery message via the multicast channel
989  // and the NO_RELAY flag is not set, we forward this message via unicast
990  // to all our relays. Note that this is the most common case, where we
991  // receive a regular multicast message and we forward it to any remote
992  // relays.
993  else if (!msg.has_flags() || !msg.flags().no_relay())
994  {
995  msg.mutable_flags()->set_relay(true);
996  this->SendUnicast(msg);
997  }
998 
999  bool isSenderLocal = (std::find(this->hostInterfaces.begin(),
1000  this->hostInterfaces.end(), _fromIp) != this->hostInterfaces.end()) ||
1001  (_fromIp.find("127.") == 0);
1002 
1003  // Update timestamp and cache the callbacks.
1004  DiscoveryCallback<Pub> connectCb;
1005  DiscoveryCallback<Pub> disconnectCb;
1006  DiscoveryCallback<Pub> registerCb;
1007  DiscoveryCallback<Pub> unregisterCb;
1008  std::function<void()> subscribersReqCb;
1009  {
1010  std::lock_guard<std::mutex> lock(this->mutex);
1011  this->activity[recvPUuid] = std::chrono::steady_clock::now();
1012  connectCb = this->connectionCb;
1013  disconnectCb = this->disconnectionCb;
1014  registerCb = this->registrationCb;
1015  unregisterCb = this->unregistrationCb;
1016  subscribersReqCb = this->subscribersCb;
1017  }
1018 
1019  switch (msg.type())
1020  {
1021  case msgs::Discovery::ADVERTISE:
1022  {
1023  // Read the rest of the fields.
1024  Pub publisher;
1025  publisher.SetFromDiscovery(msg);
1026 
1027  // Check scope of the topic.
1028  if ((publisher.Options().Scope() == Scope_t::PROCESS) ||
1029  (publisher.Options().Scope() == Scope_t::HOST &&
1030  !isSenderLocal))
1031  {
1032  return;
1033  }
1034 
1035  // Register an advertised address for the topic.
1036  bool added;
1037  {
1038  std::lock_guard<std::mutex> lock(this->mutex);
1039  added = this->info.AddPublisher(publisher);
1040  }
1041 
1042  if (added && connectCb)
1043  {
1044  // Execute the client's callback.
1045  connectCb(publisher);
1046  }
1047 
1048  break;
1049  }
1050  case msgs::Discovery::SUBSCRIBE:
1051  {
1052  std::string recvTopic;
1053  // Read the topic information.
1054  if (msg.has_sub())
1055  {
1056  recvTopic = msg.sub().topic();
1057  }
1058  else
1059  {
1060  std::cerr << "Subscription discovery message is missing "
1061  << "Subscriber information.\n";
1062  break;
1063  }
1064 
1065  // Check if at least one of my nodes advertises the topic requested.
1066  Addresses_M<Pub> addresses;
1067  {
1068  std::lock_guard<std::mutex> lock(this->mutex);
1069  if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
1070  {
1071  break;
1072  }
1073 
1074  if (!this->info.Publishers(recvTopic, addresses))
1075  break;
1076  }
1077 
1078  for (const auto &nodeInfo : addresses[this->pUuid])
1079  {
1080  // Check scope of the topic.
1081  if ((nodeInfo.Options().Scope() == Scope_t::PROCESS) ||
1082  (nodeInfo.Options().Scope() == Scope_t::HOST &&
1083  !isSenderLocal))
1084  {
1085  continue;
1086  }
1087 
1088  // Answer an ADVERTISE message.
1089  this->SendMsg(DestinationType::ALL,
1090  msgs::Discovery::ADVERTISE, nodeInfo);
1091  }
1092 
1093  break;
1094  }
1095  case msgs::Discovery::SUBSCRIBERS_REQ:
1096  {
1097  if (subscribersReqCb)
1098  subscribersReqCb();
1099 
1100  break;
1101  }
1102  case msgs::Discovery::SUBSCRIBERS_REP:
1103  {
1104  // Save the remote subscriber.
1105  Pub publisher;
1106  publisher.SetFromDiscovery(msg);
1107 
1108  {
1109  std::lock_guard<std::mutex> lock(this->mutex);
1110  this->remoteSubscribers.AddPublisher(publisher);
1111  }
1112  break;
1113  }
1114  case msgs::Discovery::NEW_CONNECTION:
1115  {
1116  // Read the rest of the fields.
1117  Pub publisher;
1118  publisher.SetFromDiscovery(msg);
1119 
1120  if (registerCb)
1121  registerCb(publisher);
1122 
1123  break;
1124  }
1125  case msgs::Discovery::END_CONNECTION:
1126  {
1127  // Read the rest of the fields.
1128  Pub publisher;
1129  publisher.SetFromDiscovery(msg);
1130 
1131  if (unregisterCb)
1132  unregisterCb(publisher);
1133 
1134  break;
1135  }
1136  case msgs::Discovery::HEARTBEAT:
1137  {
1138  // The timestamp has already been updated.
1139  break;
1140  }
1141  case msgs::Discovery::BYE:
1142  {
1143  // Remove the activity entry for this publisher.
1144  {
1145  std::lock_guard<std::mutex> lock(this->mutex);
1146  this->activity.erase(recvPUuid);
1147  }
1148 
1149  if (disconnectCb)
1150  {
1151  Pub pub;
1152  pub.SetPUuid(recvPUuid);
1153  // Notify the new disconnection.
1154  disconnectCb(pub);
1155  }
1156 
1157  // Remove the address entry for this topic.
1158  {
1159  std::lock_guard<std::mutex> lock(this->mutex);
1160  this->info.DelPublishersByProc(recvPUuid);
1161  }
1162 
1163  break;
1164  }
1165  case msgs::Discovery::UNADVERTISE:
1166  {
1167  // Read the address.
1168  Pub publisher;
1169  publisher.SetFromDiscovery(msg);
1170 
1171  // Check scope of the topic.
1172  if ((publisher.Options().Scope() == Scope_t::PROCESS) ||
1173  (publisher.Options().Scope() == Scope_t::HOST &&
1174  !isSenderLocal))
1175  {
1176  return;
1177  }
1178 
1179  if (disconnectCb)
1180  {
1181  // Notify the new disconnection.
1182  disconnectCb(publisher);
1183  }
1184 
1185  // Remove the address entry for this topic.
1186  {
1187  std::lock_guard<std::mutex> lock(this->mutex);
1188  this->info.DelPublisherByNode(publisher.Topic(),
1189  publisher.PUuid(), publisher.NUuid());
1190  }
1191 
1192  break;
1193  }
1194  default:
1195  {
1196  std::cerr << "Unknown message type [" << msg.type() << "].\n";
1197  break;
1198  }
1199  }
1200  }
1201 
1208  private: template<typename T>
1209  void SendMsg(const DestinationType &_destType,
1210  const msgs::Discovery::Type _type,
1211  const T &_pub) const
1212  {
1213  gz::msgs::Discovery discoveryMsg;
1214  discoveryMsg.set_version(this->Version());
1215  discoveryMsg.set_type(_type);
1216  discoveryMsg.set_process_uuid(this->pUuid);
1217  _pub.FillDiscovery(discoveryMsg);
1218 
1219  switch (_type)
1220  {
1221  case msgs::Discovery::ADVERTISE:
1222  case msgs::Discovery::UNADVERTISE:
1223  case msgs::Discovery::NEW_CONNECTION:
1224  case msgs::Discovery::END_CONNECTION:
1225  {
1226  _pub.FillDiscovery(discoveryMsg);
1227  break;
1228  }
1229  case msgs::Discovery::SUBSCRIBE:
1230  {
1231  discoveryMsg.mutable_sub()->set_topic(_pub.Topic());
1232  break;
1233  }
1234  case msgs::Discovery::HEARTBEAT:
1235  case msgs::Discovery::BYE:
1236  case msgs::Discovery::SUBSCRIBERS_REQ:
1237  case msgs::Discovery::SUBSCRIBERS_REP:
1238  break;
1239  default:
1240  std::cerr << "Discovery::SendMsg() error: Unrecognized message"
1241  << " type [" << _type << "]" << std::endl;
1242  return;
1243  }
1244 
1245  if (_destType == DestinationType::MULTICAST ||
1246  _destType == DestinationType::ALL)
1247  {
1248  this->SendMulticast(discoveryMsg);
1249  }
1250 
1251  // Send the discovery message to the unicast relays.
1252  if (_destType == DestinationType::UNICAST ||
1253  _destType == DestinationType::ALL)
1254  {
1255  // Set the RELAY flag in the header.
1256  discoveryMsg.mutable_flags()->set_relay(true);
1257  this->SendUnicast(discoveryMsg);
1258  }
1259 
1260  if (this->verbose)
1261  {
1262  std::cout << "\t* Sending " << msgs::ToString(_type)
1263  << " msg [" << _pub.Topic() << "]" << std::endl;
1264  }
1265  }
1266 
1269  private: void SendUnicast(const msgs::Discovery &_msg) const
1270  {
1271  uint16_t msgSize;
1272 
1273 #if GOOGLE_PROTOBUF_VERSION >= 3004000
1274  size_t msgSizeFull = _msg.ByteSizeLong();
1275 #else
1276  int msgSizeFull = _msg.ByteSize();
1277 #endif
1278  if (msgSizeFull + sizeof(msgSize) > this->kMaxRcvStr)
1279  {
1280  std::cerr << "Discovery message too large to send. Discovery won't "
1281  << "work. This shouldn't happen.\n";
1282  return;
1283  }
1284  msgSize = msgSizeFull;
1285 
1286  uint16_t totalSize = sizeof(msgSize) + msgSize;
1287  char *buffer = static_cast<char *>(new char[totalSize]);
1288  memcpy(&buffer[0], &msgSize, sizeof(msgSize));
1289 
1290  if (_msg.SerializeToArray(buffer + sizeof(msgSize), msgSize))
1291  {
1292  // Send the discovery message to the unicast relays.
1293  std::lock_guard<std::mutex> lock(this->mutex);
1294 
1295  for (const auto &sockAddr : this->relayAddrs)
1296  {
1297  errno = 0;
1298  auto sent = sendto(this->sockets.at(0),
1299  reinterpret_cast<const raw_type *>(
1300  reinterpret_cast<const unsigned char*>(buffer)),
1301  totalSize, 0,
1302  reinterpret_cast<const sockaddr *>(&sockAddr),
1303  sizeof(sockAddr));
1304 
1305  if (sent != totalSize)
1306  {
1307  std::cerr << "Exception sending a unicast message:" << std::endl;
1308  std::cerr << " Return value: " << sent << std::endl;
1309  std::cerr << " Error code: " << strerror(errno) << std::endl;
1310  break;
1311  }
1312  }
1313  }
1314  else
1315  {
1316  std::cerr << "Discovery::SendUnicast: Error serializing data."
1317  << std::endl;
1318  }
1319 
1320  delete [] buffer;
1321  }
1322 
1325  private: void SendMulticast(const msgs::Discovery &_msg) const
1326  {
1327  uint16_t msgSize;
1328 
1329 #if GOOGLE_PROTOBUF_VERSION >= 3004000
1330  size_t msgSizeFull = _msg.ByteSizeLong();
1331 #else
1332  int msgSizeFull = _msg.ByteSize();
1333 #endif
1334  if (msgSizeFull + sizeof(msgSize) > this->kMaxRcvStr)
1335  {
1336  std::cerr << "Discovery message too large to send. Discovery won't "
1337  << "work. This shouldn't happen.\n";
1338  return;
1339  }
1340 
1341  msgSize = msgSizeFull;
1342  uint16_t totalSize = sizeof(msgSize) + msgSize;
1343  char *buffer = static_cast<char *>(new char[totalSize]);
1344  memcpy(&buffer[0], &msgSize, sizeof(msgSize));
1345 
1346  if (_msg.SerializeToArray(buffer + sizeof(msgSize), msgSize))
1347  {
1348  // Send the discovery message to the multicast group through all the
1349  // sockets.
1350  for (const auto &sock : this->Sockets())
1351  {
1352  errno = 0;
1353  if (sendto(sock, reinterpret_cast<const raw_type *>(
1354  reinterpret_cast<const unsigned char*>(buffer)),
1355  totalSize, 0,
1356  reinterpret_cast<const sockaddr *>(this->MulticastAddr()),
1357  sizeof(*(this->MulticastAddr()))) != totalSize)
1358  {
1359  // Ignore EPERM and ENOBUFS errors.
1360  //
1361  // See issue #106
1362  //
1363  // Rationale drawn from:
1364  //
1365  // * https://groups.google.com/forum/#!topic/comp.protocols.tcp-ip/Qou9Sfgr77E
1366  // * https://stackoverflow.com/questions/16555101/sendto-dgrams-do-not-block-for-enobufs-on-osx
1367  if (errno != EPERM && errno != ENOBUFS)
1368  {
1369  std::cerr << "Exception sending a multicast message:"
1370  << strerror(errno) << std::endl;
1371  }
1372  break;
1373  }
1374  }
1375  }
1376  else
1377  {
1378  std::cerr << "Discovery::SendMulticast: Error serializing data."
1379  << std::endl;
1380  }
1381 
1382  delete [] buffer;
1383  }
1384 
1387  private: const std::vector<int> &Sockets() const
1388  {
1389  return this->sockets;
1390  }
1391 
1394  private: const sockaddr_in *MulticastAddr() const
1395  {
1396  return &this->mcastAddr;
1397  }
1398 
1401  private: uint8_t Version() const
1402  {
1403  static std::string gzStats;
1404  static int topicStats;
1405 
1406  if (env("GZ_TRANSPORT_TOPIC_STATISTICS", gzStats) && !gzStats.empty())
1407  {
1408  topicStats = (gzStats == "1");
1409  }
1410 
1411  return this->kWireVersion + (topicStats * 100);
1412  }
1413 
1418  private: bool RegisterNetIface(const std::string &_ip)
1419  {
1420  // Make a new socket for sending discovery information.
1421  int sock = static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1422  if (sock < 0)
1423  {
1424  std::cerr << "Socket creation failed." << std::endl;
1425  return false;
1426  }
1427 
1428  // Socket option: IP_MULTICAST_IF.
1429  // This socket option needs to be applied to each socket used to send
1430  // data. This option selects the source interface for outgoing messages.
1431  struct in_addr ifAddr;
1432  ifAddr.s_addr = inet_addr(_ip.c_str());
1433  if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1434  reinterpret_cast<const char*>(&ifAddr), sizeof(ifAddr)) != 0)
1435  {
1436  std::cerr << "Error setting socket option (IP_MULTICAST_IF)."
1437  << std::endl;
1438  return false;
1439  }
1440 
1441  this->sockets.push_back(sock);
1442 
1443  // Join the multicast group. We have to do it for each network interface
1444  // but we can do it on the same socket. We will use the socket at
1445  // position 0 for receiving multicast information.
1446  struct ip_mreq group;
1447  group.imr_multiaddr.s_addr =
1448  inet_addr(this->multicastGroup.c_str());
1449  group.imr_interface.s_addr = inet_addr(_ip.c_str());
1450  if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1451  reinterpret_cast<const char*>(&group), sizeof(group)) != 0)
1452  {
1453  std::cerr << "Error setting socket option (IP_ADD_MEMBERSHIP)."
1454  << std::endl;
1455  return false;
1456  }
1457 
1458  return true;
1459  }
1460 
1464  private: static const unsigned int kDefActivityInterval = 100;
1465 
1469  private: static const unsigned int kDefHeartbeatInterval = 1000;
1470 
1474  private: static const unsigned int kDefSilenceInterval = 3000;
1475 
1477  private: std::string multicastGroup;
1478 
1480  private: const int kTimeout = 250;
1481 
1483  private: static const uint16_t kMaxRcvStr =
1485 
1488  private: static const uint8_t kWireVersion = 10;
1489 
1491  private: int port;
1492 
1494  private: std::string hostAddr;
1495 
1497  private: std::vector<std::string> hostInterfaces;
1498 
1500  private: std::string pUuid;
1501 
1505  private: unsigned int silenceInterval;
1506 
1510  private: unsigned int activityInterval;
1511 
1515  private: unsigned int heartbeatInterval;
1516 
1518  private: DiscoveryCallback<Pub> connectionCb;
1519 
1521  private: DiscoveryCallback<Pub> disconnectionCb;
1522 
1524  private: DiscoveryCallback<Pub> registrationCb;
1525 
1527  private: DiscoveryCallback<Pub> unregistrationCb;
1528 
1530  private: std::function<void()> subscribersCb;
1531 
1533  private: TopicStorage<Pub> info;
1534 
1536  private: TopicStorage<Pub> remoteSubscribers;
1537 
1543 
1545  private: bool verbose;
1546 
1548  private: std::vector<int> sockets;
1549 
1551  private: sockaddr_in mcastAddr;
1552 
1554  private: std::vector<sockaddr_in> relayAddrs;
1555 
1557  private: mutable std::mutex mutex;
1558 
1560  private: std::thread threadReception;
1561 
1563  private: Timestamp timeNextHeartbeat;
1564 
1566  private: Timestamp timeNextActivity;
1567 
1569  private: std::mutex exitMutex;
1570 
1575  private: bool initialized;
1576 
1578  private: unsigned int numHeartbeatsUninitialized;
1579 
1581  private: mutable std::condition_variable initializedCv;
1582 
1584  private: bool exit;
1585 
1587  private: bool enabled;
1588  };
1589 
1593 
1597  }
1598  }
1599 }
1600 
1601 #endif