Gazebo Transport

API Reference

13.4.0
SubscriptionHandler.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2014 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef GZ_TRANSPORT_SUBSCRIPTIONHANDLER_HH_
19 #define GZ_TRANSPORT_SUBSCRIPTIONHANDLER_HH_
20 
21 #ifdef _MSC_VER
22 #pragma warning(push)
23 #pragma warning(disable: 4251)
24 #endif
25 #include <google/protobuf/message.h>
26 #include <google/protobuf/stubs/common.h>
27 #if GOOGLE_PROTOBUF_VERSION >= 3000000 && GOOGLE_PROTOBUF_VERSION < 4022000
28 #include <google/protobuf/stubs/casts.h>
29 #endif
30 #ifdef _MSC_VER
31 #pragma warning(pop)
32 #endif
33 
34 #include <chrono>
35 #include <iostream>
36 #include <memory>
37 #include <string>
38 #include <utility>
39 
40 #include <gz/msgs/Factory.hh>
41 
42 #include "gz/transport/config.hh"
43 #include "gz/transport/Export.hh"
47 #include "gz/transport/Uuid.hh"
48 
49 namespace gz
50 {
51  namespace transport
52  {
53  // Inline bracket to help doxygen filtering.
54  inline namespace GZ_TRANSPORT_VERSION_NAMESPACE {
55  //
/// \brief Abstract base class for the subscription handlers declared in
/// this header. It stores the subscribe options, a throttling period and
/// two UUID strings, and requires subclasses to report a message type name
/// through TypeName().
58  class GZ_TRANSPORT_VISIBLE SubscriptionHandlerBase
59  {
    /// \brief Constructor. Declared here only; the definition is not part
    /// of this header.
    /// \param[in] _nUuid UUID string (presumably the UUID of the node that
    /// owns this handler, kept in `nUuid` - confirm in the definition).
    /// \param[in] _opts Subscribe options. A default-constructed
    /// SubscribeOptions is used when the argument is omitted.
63  public: explicit SubscriptionHandlerBase(
64  const std::string &_nUuid,
65  const SubscribeOptions &_opts = SubscribeOptions());
66 
    /// \brief Virtual destructor, so handlers can be destroyed through a
    /// pointer to this base class.
68  public: virtual ~SubscriptionHandlerBase() = default;
69 
    /// \brief Name of the message type this handler deals with.
    /// The implementations visible in this header return either
    /// T().GetTypeName() or kGenericMessageType.
    /// \return The type name as a string.
73  public: virtual std::string TypeName() = 0;
74 
    /// \brief Node UUID accessor (presumably returns `nUuid`; the
    /// definition is not visible here).
    /// \return A copy of the UUID string.
77  public: std::string NodeUuid() const;
78 
    /// \brief Handler UUID accessor (presumably returns `hUuid`; the
    /// definition is not visible here).
    /// \return A copy of the UUID string.
81  public: std::string HandlerUuid() const;
82 
    /// \brief Query whether local messages should be ignored. Presumably
    /// derived from `opts` - confirm in the definition.
    /// \return True when local messages are to be ignored.
85  public: bool IgnoreLocalMessages() const;
86 
    /// \brief Throttling check used by the RunLocalCallback()
    /// implementations in this header: when it returns false they return
    /// early without invoking the user callback.
    /// \return True when the callback should be executed.
90  protected: bool UpdateThrottling();
91 
    /// \brief Subscribe options for this handler.
93  protected: SubscribeOptions opts;
94 
    /// \brief Throttling period. The name suggests nanoseconds; note that
    /// it is stored as a double - TODO confirm units in the definition.
97  protected: double periodNs;
98 
99 #ifdef _WIN32
100 // Disable warning C4251 which is triggered by
101 // std::*
102 #pragma warning(push)
103 #pragma warning(disable: 4251)
104 #endif
    /// \brief UUID string of this handler (see HandlerUuid()).
106  protected: std::string hUuid;
107 
    // NOTE(review): the listing numbers skip 108-109 and 111 around here;
    // only documentation comments are expected to be missing, but confirm
    // against the original header.
110 
    /// \brief UUID string received through the constructor's `_nUuid`
    /// parameter (presumably; the constructor body is not visible here).
112  private: std::string nUuid;
113 #ifdef _WIN32
114 #pragma warning(pop)
115 #endif
116  };
117 
/// \brief Interface that extends SubscriptionHandlerBase with the two
/// operations needed to deliver protobuf messages: building a message from
/// serialized data and running the user callback on a message.
/// Both SubscriptionHandler variants in this header implement it.
126  class GZ_TRANSPORT_VISIBLE ISubscriptionHandler
127  : public SubscriptionHandlerBase
128  {
    /// \brief Constructor. Declared here only; the definition is not part
    /// of this header.
    /// \param[in] _nUuid UUID string (presumably forwarded to
    /// SubscriptionHandlerBase - confirm in the definition).
    /// \param[in] _opts Subscribe options. A default-constructed
    /// SubscribeOptions is used when the argument is omitted.
132  public: explicit ISubscriptionHandler(
133  const std::string &_nUuid,
134  const SubscribeOptions &_opts = SubscribeOptions());
135 
    /// \brief Virtual destructor.
137  public: virtual ~ISubscriptionHandler() = default;
138 
    /// \brief Run the user callback on an already-built message.
    /// The implementations in this header return false when no callback
    /// has been set (and, for the typed handler, when the message cannot be
    /// cast to the expected type); they return true otherwise, including
    /// when the call is skipped because of throttling.
    /// \param[in] _msg Protobuf message to hand to the callback.
    /// \param[in] _info Message information passed along to the callback.
    /// \return See above.
143  public: virtual bool RunLocalCallback(
144  const ProtoMsg &_msg,
145  const MessageInfo &_info) = 0;
146 
    /// \brief Create a protobuf message from serialized data.
    /// \param[in] _data Serialized message; the implementations in this
    /// header feed it to ParseFromString().
    /// \param[in] _type Message type name. Only the ProtoMsg specialization
    /// in this header uses it; the typed handler ignores it.
    /// \return Shared pointer to the created message. The ProtoMsg
    /// specialization may return nullptr.
151  public: virtual const std::shared_ptr<ProtoMsg> CreateMsg(
152  const std::string &_data,
153  const std::string &_type) const = 0;
154  };
155 
/// \brief Subscription handler for a concrete protobuf message type T.
/// It stores a MsgCallback<T>, can build a T from serialized data, and runs
/// the callback after casting the incoming generic message to T.
160  template <typename T> class SubscriptionHandler
161  : public ISubscriptionHandler
162  {
    /// \brief Constructor; forwards both arguments to ISubscriptionHandler.
    /// \param[in] _nUuid UUID string passed to the base class.
    /// \param[in] _opts Subscribe options passed to the base class.
163  // Documentation inherited.
164  public: explicit SubscriptionHandler(const std::string &_nUuid,
165  const SubscribeOptions &_opts = SubscribeOptions())
166  : ISubscriptionHandler(_nUuid, _opts)
167  {
168  }
169 
170  // Documentation inherited.
    // NOTE(review): this listing skips its own line 171. The parameter list
    // below belongs to the CreateMsg() implementation (see the pure virtual
    // ISubscriptionHandler::CreateMsg); the line carrying the return type
    // and function name is missing here.
    //
    // Behaviour of the visible body: a new T is allocated and parsed from
    // _data. A parse failure is only reported on std::cerr; the message is
    // still returned, so this function never returns nullptr. The ProtoMsg
    // specialization in this header returns nullptr in that case instead.
172  const std::string &_data,
173  const std::string &/*_type*/) const
174  {
175  // Instantiate a specific protobuf message
176  auto msgPtr = std::make_shared<T>();
177 
178  // Create the message using some serialized data
179  if (!msgPtr->ParseFromString(_data))
180  {
181  std::cerr << "SubscriptionHandler::CreateMsg() error: ParseFromString"
182  << " failed" << std::endl;
183  }
184 
185  return msgPtr;
186  }
187 
188  // Documentation inherited.
    // NOTE(review): this listing skips its own line 189. The body below
    // belongs to the TypeName() implementation (see the pure virtual
    // SubscriptionHandlerBase::TypeName); it returns the type name reported
    // by a default-constructed T.
190  {
191  return T().GetTypeName();
192  }
193 
    /// \brief Store the callback to be run by RunLocalCallback().
    /// \param[in] _cb Callback to copy into this handler.
196  public: void SetCallback(const MsgCallback<T> &_cb)
197  {
198  this->cb = _cb;
199  }
200 
    /// \brief Cast _msg to T and invoke the stored callback with it.
    /// \param[in] _msg Generic protobuf message, expected to be a T.
    /// \param[in] _info Message information forwarded to the callback.
    /// \return False when no callback is stored or the cast yields nullptr;
    /// true when the callback ran or the call was skipped by throttling.
201  // Documentation inherited.
202  public: bool RunLocalCallback(const ProtoMsg &_msg,
203  const MessageInfo &_info)
204  {
205  // No callback stored.
206  if (!this->cb)
207  {
208  std::cerr << "SubscriptionHandler::RunLocalCallback() error: "
209  << "Callback is NULL" << std::endl;
210  return false;
211  }
212 
213  // Check the subscription throttling option.
    // A throttled message is not an error, hence the `true` result.
214  if (!this->UpdateThrottling())
215  return true;
216 
    // The cast helper is selected at compile time from
    // GOOGLE_PROTOBUF_VERSION; every branch declares the same local,
    // msgPtr, which is checked against nullptr right after.
217 #if GOOGLE_PROTOBUF_VERSION >= 5028000
218  auto msgPtr = google::protobuf::DynamicCastMessage<T>(&_msg);
219 #elif GOOGLE_PROTOBUF_VERSION >= 4022000
220  auto msgPtr = google::protobuf::internal::DownCast<const T*>(&_msg);
221 #elif GOOGLE_PROTOBUF_VERSION >= 3000000
222  auto msgPtr = google::protobuf::down_cast<const T*>(&_msg);
223 #else
224  auto msgPtr = google::protobuf::internal::down_cast<const T*>(&_msg);
225 #endif
226 
227  // Verify the dynamically cast message is valid
228  if (msgPtr == nullptr)
229  {
    // Name the source type in the diagnostic when a descriptor exists.
230  if (_msg.GetDescriptor() != nullptr)
231  {
232  std::cerr << "SubscriptionHandler::RunLocalCallback() error: "
233  << "Failed to cast the message of the type "
234  << _msg.GetDescriptor()->full_name()
235  << " to the specified type" << '\n';
236  }
237  else
238  {
239  std::cerr << "SubscriptionHandler::RunLocalCallback() error: "
240  << "Failed to cast the message of an unknown type"
241  << " to the specified type" << '\n';
242  }
    // Both branches end with '\n', so the stream is flushed once here.
243  std::cerr.flush();
244  return false;
245  }
246 
247  this->cb(*msgPtr, _info);
248  return true;
249  }
250 
    /// \brief Callback set through SetCallback(); tested for emptiness
    /// before every invocation.
252  private: MsgCallback<T> cb;
253  };
254 
/// \brief Specialization of SubscriptionHandler for generic messages
/// (ProtoMsg). The concrete message class is resolved at run time from the
/// type name handed to CreateMsg(), and the callback receives the message
/// without any cast.
257  template <> class SubscriptionHandler<ProtoMsg>
258  : public ISubscriptionHandler
259  {
    /// \brief Constructor; forwards both arguments to ISubscriptionHandler.
    /// \param[in] _nUuid UUID string passed to the base class.
    /// \param[in] _opts Subscribe options passed to the base class.
260  // Documentation inherited.
261  public: explicit SubscriptionHandler(const std::string &_nUuid,
262  const SubscribeOptions &_opts = SubscribeOptions())
263  : ISubscriptionHandler(_nUuid, _opts)
264  {
265  }
266 
267  // Documentation inherited.
    // NOTE(review): this listing skips its own line 268. The parameter list
    // below belongs to the CreateMsg() implementation (see the pure virtual
    // ISubscriptionHandler::CreateMsg); the line carrying the return type
    // and function name is missing here.
    //
    // Behaviour of the visible body: the message type named by _type is
    // looked up in protobuf's generated descriptor pool and, when absent
    // there, requested from gz::msgs::Factory. nullptr is returned when no
    // message object could be created or when parsing _data fails.
269  const std::string &_data,
270  const std::string &_type) const
271  {
    // NOTE(review): listing line 272 is also missing; it presumably
    // declares the local `msgPtr` that is reset, assigned and returned
    // below - confirm against the original header.
273 
274  const google::protobuf::Descriptor *desc =
275  google::protobuf::DescriptorPool::generated_pool()
276  ->FindMessageTypeByName(_type);
277 
278  // First, check if we have the descriptor from the generated proto
279  // classes.
280  if (desc)
281  {
    // Create a fresh instance from the prototype for this descriptor.
282  msgPtr.reset(google::protobuf::MessageFactory::generated_factory()
283  ->GetPrototype(desc)->New());
284  }
285  else
286  {
287  // Fallback on Gazebo Msgs if the message type is not found.
288  msgPtr = gz::msgs::Factory::New(_type);
289  }
290 
    // Neither source produced a message object.
291  if (!msgPtr)
292  return nullptr;
293 
294  // Create the message using some serialized data
295  if (!msgPtr->ParseFromString(_data))
296  {
297  std::cerr << "CreateMsg() error: ParseFromString failed" << std::endl;
298  return nullptr;
299  }
300 
301  return msgPtr;
302  }
303 
304  // Documentation inherited.
    // NOTE(review): this listing skips its own line 305. The body below
    // belongs to the TypeName() implementation (see the pure virtual
    // SubscriptionHandlerBase::TypeName); it always reports
    // kGenericMessageType.
306  {
307  return kGenericMessageType;
308  }
309 
    /// \brief Store the callback to be run by RunLocalCallback().
    /// \param[in] _cb Callback to copy into this handler.
312  public: void SetCallback(const MsgCallback<ProtoMsg> &_cb)
313  {
314  this->cb = _cb;
315  }
316 
    /// \brief Invoke the stored callback with _msg as received (no cast).
    /// \param[in] _msg Generic protobuf message.
    /// \param[in] _info Message information forwarded to the callback.
    /// \return False when no callback is stored; true when the callback ran
    /// or the call was skipped by throttling.
317  // Documentation inherited.
318  public: bool RunLocalCallback(const ProtoMsg &_msg,
319  const MessageInfo &_info)
320  {
321  // No callback stored.
322  if (!this->cb)
323  {
324  std::cerr << "SubscriptionHandler::RunLocalCallback() "
325  << "error: Callback is NULL" << std::endl;
326  return false;
327  }
328 
329  // Check the subscription throttling option.
    // A throttled message is not an error, hence the `true` result.
330  if (!this->UpdateThrottling())
331  return true;
332 
333  this->cb(_msg, _info);
334  return true;
335  }
336 
    /// \brief Callback set through SetCallback(); tested for emptiness
    /// before every invocation.
338  private: MsgCallback<ProtoMsg> cb;
339  };
340 
// NOTE(review): the class header that opens this body (listing line 344) is
// missing from this listing. The members below show that the class is named
// RawSubscriptionHandler and overrides a virtual TypeName(); its base class
// cannot be determined from what is visible here.
345  {
    /// \brief Constructor. Declared here only; the definition is not part
    /// of this header.
    /// \param[in] _nUuid UUID string (presumably the UUID of the owning
    /// node - confirm in the definition).
    /// \param[in] _msgType Message type name; defaults to
    /// kGenericMessageType.
    /// \param[in] _opts Subscribe options. A default-constructed
    /// SubscribeOptions is used when the argument is omitted.
352  public: explicit RawSubscriptionHandler(
353  const std::string &_nUuid,
354  const std::string &_msgType = kGenericMessageType,
355  const SubscribeOptions &_opts = SubscribeOptions());
356 
357  // Documentation inherited
358  public: std::string TypeName() override;
359 
    /// \brief Set the callback of this handler (definition not visible
    /// here).
    /// \param[in] _callback Callback taking raw message data; presumably
    /// the one run by RunRawCallback() - confirm in the definition.
363  public: void SetCallback(const RawCallback &_callback);
364 
    /// \brief Run the callback on a raw message buffer (definition not
    /// visible here).
    /// \param[in] _msgData Pointer to the message bytes.
    /// \param[in] _size Size of the buffer; presumably in bytes.
    /// \param[in] _info Message information.
    /// \return Boolean status; the exact meaning is defined out of line.
371  public: bool RunRawCallback(const char *_msgData, const size_t _size,
372  const MessageInfo &_info);
373 
    // NOTE(review): the listing numbers skip 374-375 here, so a member
    // declaration may have been dropped - confirm against the original
    // header.
376 
    /// \brief Forward declaration of the private implementation class.
377  private: class Implementation;
378 
379 #ifdef _WIN32
380 // Disable warning C4251 which is triggered by
381 // std::unique_ptr
382 #pragma warning(push)
383 #pragma warning(disable: 4251)
384 #endif
    /// \brief Owning pointer to the private implementation.
387  private: std::unique_ptr<Implementation> pimpl;
388 #ifdef _WIN32
389 #pragma warning(pop)
390 #endif
391  };
392  }
393  }
394 }
395 
396 #endif