Gazebo Transport

API Reference

14.0.0~pre1
Discovery.hh
Go to the documentation of this file.
1/*
2 * Copyright (C) 2014 Open Source Robotics Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16*/
17
18#ifndef GZ_TRANSPORT_DISCOVERY_HH_
19#define GZ_TRANSPORT_DISCOVERY_HH_
20#include <errno.h>
21#include <string.h>
22
23#ifdef _WIN32
24 // For socket(), connect(), send(), and recv().
25 #include <Winsock2.h>
26 #include <Ws2def.h>
27 #include <Ws2ipdef.h>
28 #include <Ws2tcpip.h>
29 // Type used for raw data on this platform.
30 using raw_type = char;
31#else
32 // For data types
33 #include <sys/types.h>
34 // For socket(), connect(), send(), and recv()
35 #include <sys/socket.h>
36 // For gethostbyname()
37 #include <netdb.h>
38 // For inet_addr()
39 #include <arpa/inet.h>
40 // For close()
41 #include <unistd.h>
42 // For sockaddr_in
43 #include <netinet/in.h>
44 // Type used for raw data on this platform
45 using raw_type = void;
46#endif
47
48#ifdef _WIN32
49 #pragma warning(push, 0)
50#endif
51#ifdef _WIN32
52 #pragma warning(pop)
53 // Suppress "decorated name length exceed" warning in STL.
54 #pragma warning(disable: 4503)
55 // Suppress "depreted API warnings" in WINSOCK.
56 #pragma warning(disable: 4996)
57#endif
58
59#include <gz/msgs/discovery.pb.h>
60
61#include <algorithm>
62#include <condition_variable>
63#include <limits>
64#include <map>
65#include <memory>
66#include <mutex>
67#include <string>
68#include <thread>
69#include <vector>
70
71#include <gz/msgs/Utility.hh>
72
73#include "gz/transport/config.hh"
74#include "gz/transport/Export.hh"
80
81namespace gz
82{
83 namespace transport
84 {
85 // Inline bracket to help doxygen filtering.
86 inline namespace GZ_TRANSPORT_VERSION_NAMESPACE {
88 enum class DestinationType
89 {
91 UNICAST,
95 ALL
96 };
97
98 //
104 bool GZ_TRANSPORT_VISIBLE pollSockets(
105 const std::vector<int> &_sockets,
106 const int _timeout);
107
116 template<typename Pub>
118 {
125 public: Discovery(const std::string &_pUuid,
126 const std::string &_ip,
127 const int _port,
128 const bool _verbose = false)
129 : multicastGroup(_ip),
130 port(_port),
131 hostAddr(determineHost()),
132 pUuid(_pUuid),
133 silenceInterval(kDefSilenceInterval),
134 activityInterval(kDefActivityInterval),
135 heartbeatInterval(kDefHeartbeatInterval),
136 connectionCb(nullptr),
137 disconnectionCb(nullptr),
138 verbose(_verbose),
139 initialized(false),
140 numHeartbeatsUninitialized(0),
141 exit(false),
142 enabled(false)
143 {
144 std::string gzIp;
145 if (env("GZ_IP", gzIp) && !gzIp.empty())
146 {
147 this->hostInterfaces = {gzIp};
148 }
149 else
150 {
151 // Get the list of network interfaces in this host.
152 this->hostInterfaces = determineInterfaces();
153 }
154
155#ifdef _WIN32
156 WORD wVersionRequested;
157 WSADATA wsaData;
158
159 // Request WinSock v2.2.
160 wVersionRequested = MAKEWORD(2, 2);
161 // Load WinSock DLL.
162 if (WSAStartup(wVersionRequested, &wsaData) != 0)
163 {
164 std::cerr << "Unable to load WinSock DLL" << std::endl;
165 return;
166 }
167#endif
168 for (const auto &netIface : this->hostInterfaces)
169 {
170 auto succeed = this->RegisterNetIface(netIface);
171
172 // If the IP address that we're selecting as the main IP address of
173 // the host is invalid, we change it to 127.0.0.1 .
174 // This is probably because GZ_IP is set to a wrong value.
175 if (netIface == this->hostAddr && !succeed)
176 {
177 this->RegisterNetIface("127.0.0.1");
178 std::cerr << "Did you set the environment variable GZ_IP with a "
179 << "correct IP address? " << std::endl
180 << " [" << netIface << "] seems an invalid local IP "
181 << "address." << std::endl
182 << " Using 127.0.0.1 as hostname." << std::endl;
183 this->hostAddr = "127.0.0.1";
184 }
185 }
186
187 // Socket option: SO_REUSEADDR. This options is used only for receiving
188 // data. We can reuse the same socket for receiving multicast data from
189 // multiple interfaces. We will use the socket at position 0 for
190 // receiving data.
191 int reuseAddr = 1;
192 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
193 reinterpret_cast<const char *>(&reuseAddr), sizeof(reuseAddr)) != 0)
194 {
195 std::cerr << "Error setting socket option (SO_REUSEADDR)."
196 << std::endl;
197 return;
198 }
199
200#ifdef SO_REUSEPORT
201 // Socket option: SO_REUSEPORT. This options is used only for receiving
202 // data. We can reuse the same socket for receiving multicast data from
203 // multiple interfaces. We will use the socket at position 0 for
204 // receiving data.
205 int reusePort = 1;
206 // cppcheck-suppress ConfigurationNotChecked
207 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
208 reinterpret_cast<const char *>(&reusePort), sizeof(reusePort)) != 0)
209 {
210 std::cerr << "Error setting socket option (SO_REUSEPORT)."
211 << std::endl;
212 return;
213 }
214#endif
215 // Bind the first socket to the discovery port.
216 sockaddr_in localAddr;
217 memset(&localAddr, 0, sizeof(localAddr));
218 localAddr.sin_family = AF_INET;
219 localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
220 localAddr.sin_port = htons(static_cast<u_short>(this->port));
221
222 if (bind(this->sockets.at(0),
223 reinterpret_cast<sockaddr *>(&localAddr), sizeof(sockaddr_in)) < 0)
224 {
225 std::cerr << "Binding to a local port failed." << std::endl;
226 return;
227 }
228
229 // Set 'mcastAddr' to the multicast discovery group.
230 memset(&this->mcastAddr, 0, sizeof(this->mcastAddr));
231 this->mcastAddr.sin_family = AF_INET;
232 this->mcastAddr.sin_addr.s_addr =
233 inet_addr(this->multicastGroup.c_str());
234 this->mcastAddr.sin_port = htons(static_cast<u_short>(this->port));
235
237 std::string gzRelay = "";
238 if (env("GZ_RELAY", gzRelay) && !gzRelay.empty())
239 {
240 relays = transport::split(gzRelay, ':');
241 }
242
243 // Register all unicast relays.
244 for (auto const &relayAddr : relays)
245 this->AddRelayAddress(relayAddr);
246
247 if (this->verbose)
248 this->PrintCurrentState();
249 }
250
252 public: virtual ~Discovery()
253 {
254 // Tell the service thread to terminate.
255 this->exitMutex.lock();
256 this->exit = true;
257 this->exitMutex.unlock();
258
259 // Wait for the service threads to finish before exit.
260 if (this->threadReception.joinable())
261 this->threadReception.join();
262
263 // Broadcast a BYE message to trigger the remote cancellation of
264 // all our advertised topics.
265 this->SendMsg(DestinationType::ALL, msgs::Discovery::BYE,
266 Publisher("", "", this->pUuid, "", AdvertiseOptions()));
267
268 // Close sockets.
269 for (const auto &sock : this->sockets)
270 {
271#ifdef _WIN32
272 closesocket(sock);
273 WSACleanup();
274#else
275 close(sock);
276#endif
277 }
278 }
279
283 public: void Start()
284 {
285 {
286 std::lock_guard<std::mutex> lock(this->mutex);
287
288 // The service is already running.
289 if (this->enabled)
290 return;
291
292 this->enabled = true;
293 }
294
296 this->timeNextHeartbeat = now;
297 this->timeNextActivity = now;
298
299 // Start the thread that receives discovery information.
300 this->threadReception = std::thread(&Discovery::RecvMessages, this);
301 }
302
307 public: bool Advertise(const Pub &_publisher)
308 {
310
311 {
312 std::lock_guard<std::mutex> lock(this->mutex);
313
314 if (!this->enabled)
315 return false;
316
317 // Add the addressing information (local publisher).
318 if (!this->info.AddPublisher(_publisher))
319 return false;
320
321 cb = this->connectionCb;
322 }
323
324 if (cb)
325 cb(_publisher);
326
327 // Only advertise a message outside this process if the scope
328 // is not 'Process'
329 if (_publisher.Options().Scope() != Scope_t::PROCESS)
330 this->SendMsg(DestinationType::ALL, msgs::Discovery::ADVERTISE,
331 _publisher);
332
333 return true;
334 }
335
346 public: bool Discover(const std::string &_topic) const
347 {
349 bool found;
350 Addresses_M<Pub> addresses;
351
352 {
353 std::lock_guard<std::mutex> lock(this->mutex);
354
355 if (!this->enabled)
356 return false;
357
358 cb = this->connectionCb;
359 }
360
361 Pub pub;
362 pub.SetTopic(_topic);
363 pub.SetPUuid(this->pUuid);
364
365 // Send a discovery request.
366 this->SendMsg(DestinationType::ALL, msgs::Discovery::SUBSCRIBE, pub);
367
368 {
369 std::lock_guard<std::mutex> lock(this->mutex);
370 found = this->info.Publishers(_topic, addresses);
371 }
372
373 if (found)
374 {
375 // I already have information about this topic.
376 for (const auto &proc : addresses)
377 {
378 for (const auto &node : proc.second)
379 {
380 if (cb)
381 {
382 // Execute the user's callback for a service request. Notice
383 // that we only execute one callback for preventing receive
384 // multiple service responses for a single request.
385 cb(node);
386 }
387 }
388 }
389 }
390
391 return true;
392 }
393
396 public: void SendSubscribersRep(const MessagePublisher &_pub) const
397 {
398 this->SendMsg(
399 DestinationType::ALL, msgs::Discovery::SUBSCRIBERS_REP, _pub);
400 }
401
404 public: void Register(const MessagePublisher &_pub) const
405 {
406 this->SendMsg(
407 DestinationType::ALL, msgs::Discovery::NEW_CONNECTION, _pub);
408 }
409
412 public: void Unregister(const MessagePublisher &_pub) const
413 {
414 this->SendMsg(
415 DestinationType::ALL, msgs::Discovery::END_CONNECTION, _pub);
416 }
417
420 public: const TopicStorage<Pub> &Info() const
421 {
422 std::lock_guard<std::mutex> lock(this->mutex);
423 return this->info;
424 }
425
430 public: bool Publishers(const std::string &_topic,
431 Addresses_M<Pub> &_publishers) const
432 {
433 std::lock_guard<std::mutex> lock(this->mutex);
434 return this->info.Publishers(_topic, _publishers);
435 }
436
441 public: bool RemoteSubscribers(const std::string &_topic,
442 Addresses_M<Pub> &_subscribers) const
443 {
444 std::lock_guard<std::mutex> lock(this->mutex);
445 return this->remoteSubscribers.Publishers(_topic, _subscribers);
446 }
447
455 public: bool Unadvertise(const std::string &_topic,
456 const std::string &_nUuid)
457 {
458 Pub inf;
459 {
460 std::lock_guard<std::mutex> lock(this->mutex);
461
462 if (!this->enabled)
463 return false;
464
465 // Don't do anything if the topic is not advertised by any of my nodes
466 if (!this->info.Publisher(_topic, this->pUuid, _nUuid, inf))
467 return true;
468
469 // Remove the topic information.
470 this->info.DelPublisherByNode(_topic, this->pUuid, _nUuid);
471 }
472
473 // Only unadvertise a message outside this process if the scope
474 // is not 'Process'.
475 if (inf.Options().Scope() != Scope_t::PROCESS)
476 {
477 this->SendMsg(DestinationType::ALL,
478 msgs::Discovery::UNADVERTISE, inf);
479 }
480
481 return true;
482 }
483
486 public: std::string HostAddr() const
487 {
488 std::lock_guard<std::mutex> lock(this->mutex);
489 return this->hostAddr;
490 }
491
496 public: unsigned int ActivityInterval() const
497 {
498 std::lock_guard<std::mutex> lock(this->mutex);
499 return this->activityInterval;
500 }
501
507 public: unsigned int HeartbeatInterval() const
508 {
509 std::lock_guard<std::mutex> lock(this->mutex);
510 return this->heartbeatInterval;
511 }
512
517 public: unsigned int SilenceInterval() const
518 {
519 std::lock_guard<std::mutex> lock(this->mutex);
520 return this->silenceInterval;
521 }
522
526 public: void SetActivityInterval(const unsigned int _ms)
527 {
528 std::lock_guard<std::mutex> lock(this->mutex);
529 this->activityInterval = _ms;
530 }
531
535 public: void SetHeartbeatInterval(const unsigned int _ms)
536 {
537 std::lock_guard<std::mutex> lock(this->mutex);
538 this->heartbeatInterval = _ms;
539 }
540
544 public: void SetSilenceInterval(const unsigned int _ms)
545 {
546 std::lock_guard<std::mutex> lock(this->mutex);
547 this->silenceInterval = _ms;
548 }
549
554 public: void ConnectionsCb(const DiscoveryCallback<Pub> &_cb)
555 {
556 std::lock_guard<std::mutex> lock(this->mutex);
557 this->connectionCb = _cb;
558 }
559
565 {
566 std::lock_guard<std::mutex> lock(this->mutex);
567 this->disconnectionCb = _cb;
568 }
569
574 {
575 std::lock_guard<std::mutex> lock(this->mutex);
576 this->registrationCb = _cb;
577 }
578
583 {
584 std::lock_guard<std::mutex> lock(this->mutex);
585 this->unregistrationCb = _cb;
586 }
587
591 public: void SubscribersCb(const std::function<void()> &_cb)
592 {
593 std::lock_guard<std::mutex> lock(this->mutex);
594 this->subscribersCb = _cb;
595 }
596
598 public: void PrintCurrentState() const
599 {
600 std::lock_guard<std::mutex> lock(this->mutex);
601
602 std::cout << "---------------" << std::endl;
603 std::cout << std::boolalpha << "Enabled: "
604 << this->enabled << std::endl;
605 std::cout << "Discovery state" << std::endl;
606 std::cout << "\tUUID: " << this->pUuid << std::endl;
607 std::cout << "Settings" << std::endl;
608 std::cout << "\tActivity: " << this->activityInterval
609 << " ms." << std::endl;
610 std::cout << "\tHeartbeat: " << this->heartbeatInterval
611 << "ms." << std::endl;
612 std::cout << "\tSilence: " << this->silenceInterval
613 << " ms." << std::endl;
614 std::cout << "Known information:" << std::endl;
615 this->info.Print();
616
617 // Used to calculate the elapsed time.
619
620 std::cout << "Activity" << std::endl;
621 if (this->activity.empty())
622 std::cout << "\t<empty>" << std::endl;
623 else
624 {
625 for (auto &proc : this->activity)
626 {
627 // Elapsed time since the last update from this publisher.
628 std::chrono::duration<double> elapsed = now - proc.second;
629
630 std::cout << "\t" << proc.first << std::endl;
631 std::cout << "\t\t" << "Since: " << std::chrono::duration_cast<
632 std::chrono::milliseconds>(elapsed).count() << " ms. ago. "
633 << std::endl;
634 }
635 }
636 std::cout << "---------------" << std::endl;
637 }
638
642 public: void TopicList(std::vector<std::string> &_topics)
643 {
644 {
645 std::lock_guard<std::mutex> lock(this->mutex);
646 this->remoteSubscribers.Clear();
647 }
648
649 // Request the list of subscribers.
650 Publisher pub("", "", this->pUuid, "", AdvertiseOptions());
651 this->SendMsg(
652 DestinationType::ALL, msgs::Discovery::SUBSCRIBERS_REQ, pub);
653
654 this->WaitForInit();
655 std::lock_guard<std::mutex> lock(this->mutex);
656 this->info.TopicList(_topics);
657
658 std::vector<std::string> remoteSubs;
659 this->remoteSubscribers.TopicList(remoteSubs);
660
661 // Add the remote subscribers
662 for (auto const &t : remoteSubs)
663 {
664 if (std::find(_topics.begin(), _topics.end(), t) == _topics.end())
665 {
666 _topics.push_back(t);
667 }
668 }
669 }
670
673 public: void WaitForInit() const
674 {
675 std::unique_lock<std::mutex> lk(this->mutex);
676
677 if (!this->initialized)
678 {
679 this->initializedCv.wait(lk, [this]{return this->initialized;});
680 }
681 }
682
686 private: void UpdateActivity()
687 {
688 // The UUIDs of the processes that have expired.
690
691 // A copy of the disconnection callback.
692 DiscoveryCallback<Pub> disconnectCb;
693
695
696 {
697 std::lock_guard<std::mutex> lock(this->mutex);
698
699 if (now < this->timeNextActivity)
700 return;
701
702 disconnectCb = this->disconnectionCb;
703
704 for (auto it = this->activity.cbegin(); it != this->activity.cend();)
705 {
706 // Elapsed time since the last update from this publisher.
707 auto elapsed = now - it->second;
708
709 // This publisher has expired.
710 if (std::chrono::duration_cast<std::chrono::milliseconds>
711 (elapsed).count() > this->silenceInterval)
712 {
713 // Remove all the info entries for this process UUID.
714 this->info.DelPublishersByProc(it->first);
715
716 uuids.push_back(it->first);
717
718 // Remove the activity entry.
719 this->activity.erase(it++);
720 }
721 else
722 ++it;
723 }
724
725 this->timeNextActivity = std::chrono::steady_clock::now() +
726 std::chrono::milliseconds(this->activityInterval);
727 }
728
729 if (!disconnectCb)
730 return;
731
732 // Notify without topic information. This is useful to inform the
733 // client that a remote node is gone, even if we were not
734 // interested in its topics.
735 for (auto const &uuid : uuids)
736 {
737 Pub publisher;
738 publisher.SetPUuid(uuid);
739 disconnectCb(publisher);
740 }
741 }
742
745 public: void AddRelayAddress(const std::string &_ip)
746 {
747 std::lock_guard<std::mutex> lock(this->mutex);
748 // Sanity check: Make sure that this IP address is not already saved.
749 for (auto const &addr : this->relayAddrs)
750 {
751 if (addr.sin_addr.s_addr == inet_addr(_ip.c_str()))
752 return;
753 }
754
755 sockaddr_in addr;
756 memset(&addr, 0, sizeof(addr));
757 addr.sin_family = AF_INET;
758 addr.sin_addr.s_addr = inet_addr(_ip.c_str());
759 addr.sin_port = htons(static_cast<u_short>(this->port));
760
761 this->relayAddrs.push_back(addr);
762 }
763
764 // \brief Gets this instance's relay addresses.
765 // \return The list of relay addresses.
767 {
769
770 std::lock_guard<std::mutex> lock(this->mutex);
771
772 for (auto const &addr : this->relayAddrs) {
773 result.push_back(inet_ntoa(addr.sin_addr));
774 }
775
776 return result;
777 }
778
780 private: void UpdateHeartbeat()
781 {
783
784 {
785 std::lock_guard<std::mutex> lock(this->mutex);
786
787 if (now < this->timeNextHeartbeat)
788 return;
789 }
790
791 Publisher pub("", "", this->pUuid, "", AdvertiseOptions());
792 this->SendMsg(DestinationType::ALL, msgs::Discovery::HEARTBEAT, pub);
793
795 {
796 std::lock_guard<std::mutex> lock(this->mutex);
797
798 // Re-advertise topics that are advertised inside this process.
799 this->info.PublishersByProc(this->pUuid, nodes);
800 }
801
802 for (const auto &topic : nodes)
803 {
804 for (const auto &node : topic.second)
805 {
806 this->SendMsg(DestinationType::ALL,
807 msgs::Discovery::ADVERTISE, node);
808 }
809 }
810
811 {
813 if (!this->initialized)
814 {
815 if (this->numHeartbeatsUninitialized == 2u)
816 {
817 // We consider discovery initialized after two heartbeat cycles.
818 this->initialized = true;
819
820 // Notify anyone waiting for the initialization phase to finish.
821 this->initializedCv.notify_all();
822 }
823 ++this->numHeartbeatsUninitialized;
824 }
825
826 this->timeNextHeartbeat = std::chrono::steady_clock::now() +
827 std::chrono::milliseconds(this->heartbeatInterval);
828 }
829 }
830
840 private: int NextTimeout() const
841 {
843 auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
844 auto timeUntilNextActivity = this->timeNextActivity - now;
845
846 int t = static_cast<int>(
847 std::chrono::duration_cast<std::chrono::milliseconds>
848 (std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
849 int t2 = std::min(t, this->kTimeout);
850 return std::max(t2, 0);
851 }
852
854 private: void RecvMessages()
855 {
856 bool timeToExit = false;
857 while (!timeToExit)
858 {
859 // Calculate the timeout.
860 int timeout = this->NextTimeout();
861
862 if (pollSockets(this->sockets, timeout))
863 {
864 this->RecvDiscoveryUpdate();
865
866 if (this->verbose)
867 this->PrintCurrentState();
868 }
869
870 this->UpdateHeartbeat();
871 this->UpdateActivity();
872
873 // Is it time to exit?
874 {
875 std::lock_guard<std::mutex> lock(this->exitMutex);
876 if (this->exit)
877 timeToExit = true;
878 }
879 }
880 }
881
883 private: void RecvDiscoveryUpdate()
884 {
885 char rcvStr[Discovery::kMaxRcvStr];
886 sockaddr_in clntAddr;
887 socklen_t addrLen = sizeof(clntAddr);
888
889 int64_t received = recvfrom(this->sockets.at(0),
890 reinterpret_cast<raw_type *>(rcvStr),
891 this->kMaxRcvStr, 0,
892 reinterpret_cast<sockaddr *>(&clntAddr),
893 reinterpret_cast<socklen_t *>(&addrLen));
894 if (received > 0)
895 {
896 uint16_t len = 0;
897 memcpy(&len, &rcvStr[0], sizeof(len));
898
899 // Gazebo Transport delimits each discovery message with a
900 // frame_delimiter that contains byte size information.
901 // A discovery message has the form:
902 //
903 // <frame_delimiter><frame_body>
904 //
905 // Gazebo Transport version < 8 sends a frame delimiter that
906 // contains the value of sizeof(frame_delimiter)
907 // + sizeof(frame_body). In other words, the frame_delimiter
908 // contains a value that represents the total size of the
909 // frame_body and frame_delimiter in bytes.
910 //
911 // Gazebo Transport version >= 8 sends a frame_delimiter
912 // that contains the value of sizeof(frame_body). In other
913 // words, the frame_delimiter contains a value that represents
914 // the total size of only the frame_body.
915 //
916 // It is possible that two incompatible versions of Gazebo
917 // Transport exist on the same network. If we receive an
918 // unexpected size, then we ignore the message.
919
920 // If-condition for version 8+
921 if (len + sizeof(len) == static_cast<uint16_t>(received))
922 {
923 std::string srcAddr = inet_ntoa(clntAddr.sin_addr);
924 uint16_t srcPort = ntohs(clntAddr.sin_port);
925
926 if (this->verbose)
927 {
928 std::cout << "\nReceived discovery update from "
929 << srcAddr << ": " << srcPort << std::endl;
930 }
931
932 this->DispatchDiscoveryMsg(srcAddr, rcvStr + sizeof(len), len);
933 }
934 }
935 else if (received < 0)
936 {
937 std::cerr << "Discovery::RecvDiscoveryUpdate() recvfrom error"
938 << std::endl;
939 }
940 }
941
946 private: void DispatchDiscoveryMsg(const std::string &_fromIp,
947 char *_msg, uint16_t _len)
948 {
949 gz::msgs::Discovery msg;
950
951 // Parse the message, and return if parsing failed. Parsing could
952 // fail when another discovery node is publishing messages using an
953 // older (or newer) format.
954 if (!msg.ParseFromArray(_msg, _len))
955 return;
956
957 // Discard the message if the wire protocol is different than mine.
958 if (this->Version() != msg.version())
959 return;
960
961 std::string recvPUuid = msg.process_uuid();
962
963 // Discard our own discovery messages.
964 if (recvPUuid == this->pUuid)
965 return;
966
967 // Forwarding summary:
968 // - From a unicast peer -> to multicast group (with NO_RELAY flag).
969 // - From multicast group -> to unicast peers (with RELAY flag).
970
971 // If the RELAY flag is set, this discovery message is coming via a
972 // unicast transmission. In this case, we don't process it, we just
973 // forward it to the multicast group, and it will be dispatched once
974 // received there. Note that we also unset the RELAY flag and set the
975 // NO_RELAY flag, to avoid forwarding the message anymore.
976 if (msg.has_flags() && msg.flags().relay())
977 {
978 // Unset the RELAY flag in the header and set the NO_RELAY.
979 msg.mutable_flags()->set_relay(false);
980 msg.mutable_flags()->set_no_relay(true);
981 this->SendMulticast(msg);
982
983 // A unicast peer contacted me. I need to save its address for
984 // sending future messages in the future.
985 this->AddRelayAddress(_fromIp);
986 return;
987 }
988 // If we are receiving this discovery message via the multicast channel
989 // and the NO_RELAY flag is not set, we forward this message via unicast
990 // to all our relays. Note that this is the most common case, where we
991 // receive a regular multicast message and we forward it to any remote
992 // relays.
993 else if (!msg.has_flags() || !msg.flags().no_relay())
994 {
995 msg.mutable_flags()->set_relay(true);
996 this->SendUnicast(msg);
997 }
998
999 bool isSenderLocal = (std::find(this->hostInterfaces.begin(),
1000 this->hostInterfaces.end(), _fromIp) != this->hostInterfaces.end()) ||
1001 (_fromIp.find("127.") == 0);
1002
1003 // Update timestamp and cache the callbacks.
1004 DiscoveryCallback<Pub> connectCb;
1005 DiscoveryCallback<Pub> disconnectCb;
1006 DiscoveryCallback<Pub> registerCb;
1007 DiscoveryCallback<Pub> unregisterCb;
1008 std::function<void()> subscribersReqCb;
1009 {
1010 std::lock_guard<std::mutex> lock(this->mutex);
1011 this->activity[recvPUuid] = std::chrono::steady_clock::now();
1012 connectCb = this->connectionCb;
1013 disconnectCb = this->disconnectionCb;
1014 registerCb = this->registrationCb;
1015 unregisterCb = this->unregistrationCb;
1016 subscribersReqCb = this->subscribersCb;
1017 }
1018
1019 switch (msg.type())
1020 {
1021 case msgs::Discovery::ADVERTISE:
1022 {
1023 // Read the rest of the fields.
1024 Pub publisher;
1025 publisher.SetFromDiscovery(msg);
1026
1027 // Check scope of the topic.
1028 if ((publisher.Options().Scope() == Scope_t::PROCESS) ||
1029 (publisher.Options().Scope() == Scope_t::HOST &&
1030 !isSenderLocal))
1031 {
1032 return;
1033 }
1034
1035 // Register an advertised address for the topic.
1036 bool added;
1037 {
1038 std::lock_guard<std::mutex> lock(this->mutex);
1039 added = this->info.AddPublisher(publisher);
1040 }
1041
1042 if (added && connectCb)
1043 {
1044 // Execute the client's callback.
1045 connectCb(publisher);
1046 }
1047
1048 break;
1049 }
1050 case msgs::Discovery::SUBSCRIBE:
1051 {
1052 std::string recvTopic;
1053 // Read the topic information.
1054 if (msg.has_sub())
1055 {
1056 recvTopic = msg.sub().topic();
1057 }
1058 else
1059 {
1060 std::cerr << "Subscription discovery message is missing "
1061 << "Subscriber information.\n";
1062 break;
1063 }
1064
1065 // Check if at least one of my nodes advertises the topic requested.
1066 Addresses_M<Pub> addresses;
1067 {
1068 std::lock_guard<std::mutex> lock(this->mutex);
1069 if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
1070 {
1071 break;
1072 }
1073
1074 if (!this->info.Publishers(recvTopic, addresses))
1075 break;
1076 }
1077
1078 for (const auto &nodeInfo : addresses[this->pUuid])
1079 {
1080 // Check scope of the topic.
1081 if ((nodeInfo.Options().Scope() == Scope_t::PROCESS) ||
1082 (nodeInfo.Options().Scope() == Scope_t::HOST &&
1083 !isSenderLocal))
1084 {
1085 continue;
1086 }
1087
1088 // Answer an ADVERTISE message.
1089 this->SendMsg(DestinationType::ALL,
1090 msgs::Discovery::ADVERTISE, nodeInfo);
1091 }
1092
1093 break;
1094 }
1095 case msgs::Discovery::SUBSCRIBERS_REQ:
1096 {
1097 if (subscribersReqCb)
1098 subscribersReqCb();
1099
1100 break;
1101 }
1102 case msgs::Discovery::SUBSCRIBERS_REP:
1103 {
1104 // Save the remote subscriber.
1105 Pub publisher;
1106 publisher.SetFromDiscovery(msg);
1107
1108 {
1109 std::lock_guard<std::mutex> lock(this->mutex);
1110 this->remoteSubscribers.AddPublisher(publisher);
1111 }
1112 break;
1113 }
1114 case msgs::Discovery::NEW_CONNECTION:
1115 {
1116 // Read the rest of the fields.
1117 Pub publisher;
1118 publisher.SetFromDiscovery(msg);
1119
1120 if (registerCb)
1121 registerCb(publisher);
1122
1123 break;
1124 }
1125 case msgs::Discovery::END_CONNECTION:
1126 {
1127 // Read the rest of the fields.
1128 Pub publisher;
1129 publisher.SetFromDiscovery(msg);
1130
1131 if (unregisterCb)
1132 unregisterCb(publisher);
1133
1134 break;
1135 }
1136 case msgs::Discovery::HEARTBEAT:
1137 {
1138 // The timestamp has already been updated.
1139 break;
1140 }
1141 case msgs::Discovery::BYE:
1142 {
1143 // Remove the activity entry for this publisher.
1144 {
1145 std::lock_guard<std::mutex> lock(this->mutex);
1146 this->activity.erase(recvPUuid);
1147 }
1148
1149 if (disconnectCb)
1150 {
1151 Pub pub;
1152 pub.SetPUuid(recvPUuid);
1153 // Notify the new disconnection.
1154 disconnectCb(pub);
1155 }
1156
1157 // Remove the address entry for this topic.
1158 {
1159 std::lock_guard<std::mutex> lock(this->mutex);
1160 this->info.DelPublishersByProc(recvPUuid);
1161 }
1162
1163 break;
1164 }
1165 case msgs::Discovery::UNADVERTISE:
1166 {
1167 // Read the address.
1168 Pub publisher;
1169 publisher.SetFromDiscovery(msg);
1170
1171 // Check scope of the topic.
1172 if ((publisher.Options().Scope() == Scope_t::PROCESS) ||
1173 (publisher.Options().Scope() == Scope_t::HOST &&
1174 !isSenderLocal))
1175 {
1176 return;
1177 }
1178
1179 if (disconnectCb)
1180 {
1181 // Notify the new disconnection.
1182 disconnectCb(publisher);
1183 }
1184
1185 // Remove the address entry for this topic.
1186 {
1187 std::lock_guard<std::mutex> lock(this->mutex);
1188 this->info.DelPublisherByNode(publisher.Topic(),
1189 publisher.PUuid(), publisher.NUuid());
1190 }
1191
1192 break;
1193 }
1194 default:
1195 {
1196 std::cerr << "Unknown message type [" << msg.type() << "].\n";
1197 break;
1198 }
1199 }
1200 }
1201
1208 private: template<typename T>
1209 void SendMsg(const DestinationType &_destType,
1210 const msgs::Discovery::Type _type,
1211 const T &_pub) const
1212 {
1213 gz::msgs::Discovery discoveryMsg;
1214 discoveryMsg.set_version(this->Version());
1215 discoveryMsg.set_type(_type);
1216 discoveryMsg.set_process_uuid(this->pUuid);
1217 _pub.FillDiscovery(discoveryMsg);
1218
1219 switch (_type)
1220 {
1221 case msgs::Discovery::ADVERTISE:
1222 case msgs::Discovery::UNADVERTISE:
1223 case msgs::Discovery::NEW_CONNECTION:
1224 case msgs::Discovery::END_CONNECTION:
1225 {
1226 _pub.FillDiscovery(discoveryMsg);
1227 break;
1228 }
1229 case msgs::Discovery::SUBSCRIBE:
1230 {
1231 discoveryMsg.mutable_sub()->set_topic(_pub.Topic());
1232 break;
1233 }
1234 case msgs::Discovery::HEARTBEAT:
1235 case msgs::Discovery::BYE:
1236 case msgs::Discovery::SUBSCRIBERS_REQ:
1237 case msgs::Discovery::SUBSCRIBERS_REP:
1238 break;
1239 default:
1240 std::cerr << "Discovery::SendMsg() error: Unrecognized message"
1241 << " type [" << _type << "]" << std::endl;
1242 return;
1243 }
1244
1245 if (_destType == DestinationType::MULTICAST ||
1246 _destType == DestinationType::ALL)
1247 {
1248 this->SendMulticast(discoveryMsg);
1249 }
1250
1251 // Send the discovery message to the unicast relays.
1252 if (_destType == DestinationType::UNICAST ||
1253 _destType == DestinationType::ALL)
1254 {
1255 // Set the RELAY flag in the header.
1256 discoveryMsg.mutable_flags()->set_relay(true);
1257 this->SendUnicast(discoveryMsg);
1258 }
1259
1260 if (this->verbose)
1261 {
1262 std::cout << "\t* Sending " << msgs::ToString(_type)
1263 << " msg [" << _pub.Topic() << "]" << std::endl;
1264 }
1265 }
1266
1269 private: void SendUnicast(const msgs::Discovery &_msg) const
1270 {
1271 uint16_t msgSize;
1272
1273#if GOOGLE_PROTOBUF_VERSION >= 3004000
1274 size_t msgSizeFull = _msg.ByteSizeLong();
1275#else
1276 int msgSizeFull = _msg.ByteSize();
1277#endif
1278 if (msgSizeFull + sizeof(msgSize) > this->kMaxRcvStr)
1279 {
1280 std::cerr << "Discovery message too large to send. Discovery won't "
1281 << "work. This shouldn't happen.\n";
1282 return;
1283 }
1284 msgSize = msgSizeFull;
1285
1286 uint16_t totalSize = sizeof(msgSize) + msgSize;
1287 char *buffer = static_cast<char *>(new char[totalSize]);
1288 memcpy(&buffer[0], &msgSize, sizeof(msgSize));
1289
1290 if (_msg.SerializeToArray(buffer + sizeof(msgSize), msgSize))
1291 {
1292 // Send the discovery message to the unicast relays.
1293 std::lock_guard<std::mutex> lock(this->mutex);
1294
1295 for (const auto &sockAddr : this->relayAddrs)
1296 {
1297 errno = 0;
1298 auto sent = sendto(this->sockets.at(0),
1299 reinterpret_cast<const raw_type *>(
1300 reinterpret_cast<const unsigned char*>(buffer)),
1301 totalSize, 0,
1302 reinterpret_cast<const sockaddr *>(&sockAddr),
1303 sizeof(sockAddr));
1304
1305 if (sent != totalSize)
1306 {
1307 std::cerr << "Exception sending a unicast message:" << std::endl;
1308 std::cerr << " Return value: " << sent << std::endl;
1309 std::cerr << " Error code: " << strerror(errno) << std::endl;
1310 break;
1311 }
1312 }
1313 }
1314 else
1315 {
1316 std::cerr << "Discovery::SendUnicast: Error serializing data."
1317 << std::endl;
1318 }
1319
1320 delete [] buffer;
1321 }
1322
1325 private: void SendMulticast(const msgs::Discovery &_msg) const
1326 {
1327 uint16_t msgSize;
1328
1329#if GOOGLE_PROTOBUF_VERSION >= 3004000
1330 size_t msgSizeFull = _msg.ByteSizeLong();
1331#else
1332 int msgSizeFull = _msg.ByteSize();
1333#endif
1334 if (msgSizeFull + sizeof(msgSize) > this->kMaxRcvStr)
1335 {
1336 std::cerr << "Discovery message too large to send. Discovery won't "
1337 << "work. This shouldn't happen.\n";
1338 return;
1339 }
1340
1341 msgSize = msgSizeFull;
1342 uint16_t totalSize = sizeof(msgSize) + msgSize;
1343 char *buffer = static_cast<char *>(new char[totalSize]);
1344 memcpy(&buffer[0], &msgSize, sizeof(msgSize));
1345
1346 if (_msg.SerializeToArray(buffer + sizeof(msgSize), msgSize))
1347 {
1348 // Send the discovery message to the multicast group through all the
1349 // sockets.
1350 for (const auto &sock : this->Sockets())
1351 {
1352 errno = 0;
1353 if (sendto(sock, reinterpret_cast<const raw_type *>(
1354 reinterpret_cast<const unsigned char*>(buffer)),
1355 totalSize, 0,
1356 reinterpret_cast<const sockaddr *>(this->MulticastAddr()),
1357 sizeof(*(this->MulticastAddr()))) != totalSize)
1358 {
1359 // Ignore EPERM and ENOBUFS errors.
1360 //
1361 // See issue #106
1362 //
1363 // Rationale drawn from:
1364 //
1365 // * https://groups.google.com/forum/#!topic/comp.protocols.tcp-ip/Qou9Sfgr77E
1366 // * https://stackoverflow.com/questions/16555101/sendto-dgrams-do-not-block-for-enobufs-on-osx
1367 if (errno != EPERM && errno != ENOBUFS)
1368 {
1369 std::cerr << "Exception sending a multicast message:"
1370 << strerror(errno) << std::endl;
1371 }
1372 break;
1373 }
1374 }
1375 }
1376 else
1377 {
1378 std::cerr << "Discovery::SendMulticast: Error serializing data."
1379 << std::endl;
1380 }
1381
1382 delete [] buffer;
1383 }
1384
1387 private: const std::vector<int> &Sockets() const
1388 {
1389 return this->sockets;
1390 }
1391
1394 private: const sockaddr_in *MulticastAddr() const
1395 {
1396 return &this->mcastAddr;
1397 }
1398
1401 private: uint8_t Version() const
1402 {
1403 static std::string gzStats;
1404 static int topicStats;
1405
1406 if (env("GZ_TRANSPORT_TOPIC_STATISTICS", gzStats) && !gzStats.empty())
1407 {
1408 topicStats = (gzStats == "1");
1409 }
1410
1411 return this->kWireVersion + (topicStats * 100);
1412 }
1413
1418 private: bool RegisterNetIface(const std::string &_ip)
1419 {
1420 // Make a new socket for sending discovery information.
1421 int sock = static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1422 if (sock < 0)
1423 {
1424 std::cerr << "Socket creation failed." << std::endl;
1425 return false;
1426 }
1427
1428 // Socket option: IP_MULTICAST_IF.
1429 // This socket option needs to be applied to each socket used to send
1430 // data. This option selects the source interface for outgoing messages.
1431 struct in_addr ifAddr;
1432 ifAddr.s_addr = inet_addr(_ip.c_str());
1433 if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1434 reinterpret_cast<const char*>(&ifAddr), sizeof(ifAddr)) != 0)
1435 {
1436 std::cerr << "Error setting socket option (IP_MULTICAST_IF)."
1437 << std::endl;
1438 return false;
1439 }
1440
1441 this->sockets.push_back(sock);
1442
1443 // Join the multicast group. We have to do it for each network interface
1444 // but we can do it on the same socket. We will use the socket at
1445 // position 0 for receiving multicast information.
1446 struct ip_mreq group;
1447 group.imr_multiaddr.s_addr =
1448 inet_addr(this->multicastGroup.c_str());
1449 group.imr_interface.s_addr = inet_addr(_ip.c_str());
1450 if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1451 reinterpret_cast<const char*>(&group), sizeof(group)) != 0)
1452 {
1453 std::cerr << "Error setting socket option (IP_ADD_MEMBERSHIP)."
1454 << std::endl;
1455 return false;
1456 }
1457
1458 return true;
1459 }
1460
1464 private: static const unsigned int kDefActivityInterval = 100;
1465
1469 private: static const unsigned int kDefHeartbeatInterval = 1000;
1470
1474 private: static const unsigned int kDefSilenceInterval = 3000;
1475
1477 private: std::string multicastGroup;
1478
1480 private: const int kTimeout = 250;
1481
1483 private: static const uint16_t kMaxRcvStr =
1485
1488 private: static const uint8_t kWireVersion = 10;
1489
1491 private: int port;
1492
1494 private: std::string hostAddr;
1495
1497 private: std::vector<std::string> hostInterfaces;
1498
1500 private: std::string pUuid;
1501
1505 private: unsigned int silenceInterval;
1506
1510 private: unsigned int activityInterval;
1511
1515 private: unsigned int heartbeatInterval;
1516
1518 private: DiscoveryCallback<Pub> connectionCb;
1519
1521 private: DiscoveryCallback<Pub> disconnectionCb;
1522
1524 private: DiscoveryCallback<Pub> registrationCb;
1525
1527 private: DiscoveryCallback<Pub> unregistrationCb;
1528
1530 private: std::function<void()> subscribersCb;
1531
1533 private: TopicStorage<Pub> info;
1534
1536 private: TopicStorage<Pub> remoteSubscribers;
1537
1543
1545 private: bool verbose;
1546
1548 private: std::vector<int> sockets;
1549
1551 private: sockaddr_in mcastAddr;
1552
1554 private: std::vector<sockaddr_in> relayAddrs;
1555
1557 private: mutable std::mutex mutex;
1558
1560 private: std::thread threadReception;
1561
1563 private: Timestamp timeNextHeartbeat;
1564
1566 private: Timestamp timeNextActivity;
1567
1569 private: std::mutex exitMutex;
1570
1575 private: bool initialized;
1576
1578 private: unsigned int numHeartbeatsUninitialized;
1579
1581 private: mutable std::condition_variable initializedCv;
1582
1584 private: bool exit;
1585
1587 private: bool enabled;
1588 };
1589
1593
1597 }
1598 }
1599}
1600
1601#endif