Gazebo Transport

API Reference

13.4.0
SubscriptionHandler.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2014 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef GZ_TRANSPORT_SUBSCRIPTIONHANDLER_HH_
19 #define GZ_TRANSPORT_SUBSCRIPTIONHANDLER_HH_
20 
21 #ifdef _MSC_VER
22 #pragma warning(push)
23 #pragma warning(disable: 4251)
24 #endif
25 #include <google/protobuf/message.h>
26 #include <google/protobuf/stubs/common.h>
27 #if GOOGLE_PROTOBUF_VERSION >= 3000000 && GOOGLE_PROTOBUF_VERSION < 4022000
28 #include <google/protobuf/stubs/casts.h>
29 #endif
30 #ifdef _MSC_VER
31 #pragma warning(pop)
32 #endif
33 
34 #include <chrono>
35 #include <iostream>
36 #include <memory>
37 #include <string>
38 #include <utility>
39 
40 #include <gz/msgs/Factory.hh>
41 
42 #include "gz/transport/config.hh"
43 #include "gz/transport/Export.hh"
47 #include "gz/transport/Uuid.hh"
48 
49 namespace gz
50 {
51  namespace transport
52  {
53  // Inline bracket to help doxygen filtering.
54  inline namespace GZ_TRANSPORT_VERSION_NAMESPACE {
55  //
58  class GZ_TRANSPORT_VISIBLE SubscriptionHandlerBase
59  {
63  public: explicit SubscriptionHandlerBase(
64  const std::string &_nUuid,
65  const SubscribeOptions &_opts = SubscribeOptions());
66 
68  public: virtual ~SubscriptionHandlerBase() = default;
69 
73  public: virtual std::string TypeName() = 0;
74 
77  public: std::string NodeUuid() const;
78 
81  public: std::string HandlerUuid() const;
82 
85  public: bool IgnoreLocalMessages() const;
86 
90  protected: bool UpdateThrottling();
91 
93  protected: SubscribeOptions opts;
94 
97  protected: double periodNs;
98 
99 #ifdef _WIN32
100 // Disable warning C4251 which is triggered by
101 // std::*
102 #pragma warning(push)
103 #pragma warning(disable: 4251)
104 #endif
106  protected: std::string hUuid;
107 
110 
112  private: std::string nUuid;
113 #ifdef _WIN32
114 #pragma warning(pop)
115 #endif
116  };
117 
126  class GZ_TRANSPORT_VISIBLE ISubscriptionHandler
127  : public SubscriptionHandlerBase
128  {
132  public: explicit ISubscriptionHandler(
133  const std::string &_nUuid,
134  const SubscribeOptions &_opts = SubscribeOptions());
135 
137  public: virtual ~ISubscriptionHandler() = default;
138 
143  public: virtual bool RunLocalCallback(
144  const ProtoMsg &_msg,
145  const MessageInfo &_info) = 0;
146 
151  public: virtual const std::shared_ptr<ProtoMsg> CreateMsg(
152  const std::string &_data,
153  const std::string &_type) const = 0;
154  };
155 
160  template <typename T> class SubscriptionHandler
161  : public ISubscriptionHandler
162  {
163  // Documentation inherited.
164  public: explicit SubscriptionHandler(const std::string &_nUuid,
165  const SubscribeOptions &_opts = SubscribeOptions())
166  : ISubscriptionHandler(_nUuid, _opts)
167  {
168  }
169 
170  // Documentation inherited.
172  const std::string &_data,
173  const std::string &/*_type*/) const
174  {
175  // Instantiate a specific protobuf message
176  auto msgPtr = std::make_shared<T>();
177 
178  // Create the message using some serialized data
179  if (!msgPtr->ParseFromString(_data))
180  {
181  std::cerr << "SubscriptionHandler::CreateMsg() error: ParseFromString"
182  << " failed" << std::endl;
183  }
184 
185  return msgPtr;
186  }
187 
188  // Documentation inherited.
190  {
191  return T().GetTypeName();
192  }
193 
196  public: void SetCallback(const MsgCallback<T> &_cb)
197  {
198  this->cb = _cb;
199  }
200 
201  // Documentation inherited.
202  public: bool RunLocalCallback(const ProtoMsg &_msg,
203  const MessageInfo &_info)
204  {
205  // No callback stored.
206  if (!this->cb)
207  {
208  std::cerr << "SubscriptionHandler::RunLocalCallback() error: "
209  << "Callback is NULL" << std::endl;
210  return false;
211  }
212 
213  // Check the subscription throttling option.
214  if (!this->UpdateThrottling())
215  return true;
216 
217 #if GOOGLE_PROTOBUF_VERSION >= 4022000
218  auto msgPtr = google::protobuf::internal::DownCast<const T*>(&_msg);
219 #elif GOOGLE_PROTOBUF_VERSION >= 3000000
220  auto msgPtr = google::protobuf::down_cast<const T*>(&_msg);
221 #else
222  auto msgPtr = google::protobuf::internal::down_cast<const T*>(&_msg);
223 #endif
224 
225  this->cb(*msgPtr, _info);
226  return true;
227  }
228 
230  private: MsgCallback<T> cb;
231  };
232 
235  template <> class SubscriptionHandler<ProtoMsg>
236  : public ISubscriptionHandler
237  {
238  // Documentation inherited.
239  public: explicit SubscriptionHandler(const std::string &_nUuid,
240  const SubscribeOptions &_opts = SubscribeOptions())
241  : ISubscriptionHandler(_nUuid, _opts)
242  {
243  }
244 
245  // Documentation inherited.
247  const std::string &_data,
248  const std::string &_type) const
249  {
251 
252  const google::protobuf::Descriptor *desc =
253  google::protobuf::DescriptorPool::generated_pool()
254  ->FindMessageTypeByName(_type);
255 
256  // First, check if we have the descriptor from the generated proto
257  // classes.
258  if (desc)
259  {
260  msgPtr.reset(google::protobuf::MessageFactory::generated_factory()
261  ->GetPrototype(desc)->New());
262  }
263  else
264  {
265  // Fallback on Gazebo Msgs if the message type is not found.
266  msgPtr = gz::msgs::Factory::New(_type);
267  }
268 
269  if (!msgPtr)
270  return nullptr;
271 
272  // Create the message using some serialized data
273  if (!msgPtr->ParseFromString(_data))
274  {
275  std::cerr << "CreateMsg() error: ParseFromString failed" << std::endl;
276  return nullptr;
277  }
278 
279  return msgPtr;
280  }
281 
282  // Documentation inherited.
284  {
285  return kGenericMessageType;
286  }
287 
290  public: void SetCallback(const MsgCallback<ProtoMsg> &_cb)
291  {
292  this->cb = _cb;
293  }
294 
295  // Documentation inherited.
296  public: bool RunLocalCallback(const ProtoMsg &_msg,
297  const MessageInfo &_info)
298  {
299  // No callback stored.
300  if (!this->cb)
301  {
302  std::cerr << "SubscriptionHandler::RunLocalCallback() "
303  << "error: Callback is NULL" << std::endl;
304  return false;
305  }
306 
307  // Check the subscription throttling option.
308  if (!this->UpdateThrottling())
309  return true;
310 
311  this->cb(_msg, _info);
312  return true;
313  }
314 
316  private: MsgCallback<ProtoMsg> cb;
317  };
318 
323  {
330  public: explicit RawSubscriptionHandler(
331  const std::string &_nUuid,
332  const std::string &_msgType = kGenericMessageType,
333  const SubscribeOptions &_opts = SubscribeOptions());
334 
335  // Documentation inherited
336  public: std::string TypeName() override;
337 
341  public: void SetCallback(const RawCallback &_callback);
342 
349  public: bool RunRawCallback(const char *_msgData, const size_t _size,
350  const MessageInfo &_info);
351 
354 
355  private: class Implementation;
356 
357 #ifdef _WIN32
358 // Disable warning C4251 which is triggered by
359 // std::unique_ptr
360 #pragma warning(push)
361 #pragma warning(disable: 4251)
362 #endif
365  private: std::unique_ptr<Implementation> pimpl;
366 #ifdef _WIN32
367 #pragma warning(pop)
368 #endif
369  };
370  }
371  }
372 }
373 
374 #endif