Gazebo Transport

API Reference

12.2.2
gz/transport/SubscriptionHandler.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2014 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef GZ_TRANSPORT_SUBSCRIPTIONHANDLER_HH_
19 #define GZ_TRANSPORT_SUBSCRIPTIONHANDLER_HH_
20 
21 #ifdef _MSC_VER
22 #pragma warning(push)
23 #pragma warning(disable: 4251)
24 #endif
25 #include <google/protobuf/message.h>
26 #include <google/protobuf/stubs/common.h>
27 #if GOOGLE_PROTOBUF_VERSION >= 3000000 && GOOGLE_PROTOBUF_VERSION < 4022000
28 #include <google/protobuf/stubs/casts.h>
29 #endif
30 #ifdef _MSC_VER
31 #pragma warning(pop)
32 #endif
33 
34 #include <chrono>
35 #include <iostream>
36 #include <memory>
37 #include <string>
38 #include <utility>
39 
40 #include <gz/msgs/Factory.hh>
41 
42 #include "gz/transport/config.hh"
43 #include "gz/transport/Export.hh"
47 #include "gz/transport/Uuid.hh"
48 
49 namespace gz
50 {
51  namespace transport
52  {
53  // Inline bracket to help doxygen filtering.
54  inline namespace GZ_TRANSPORT_VERSION_NAMESPACE {
55  //
58  class GZ_TRANSPORT_VISIBLE SubscriptionHandlerBase
59  {
63  public: explicit SubscriptionHandlerBase(
64  const std::string &_nUuid,
65  const SubscribeOptions &_opts = SubscribeOptions());
66 
68  public: virtual ~SubscriptionHandlerBase() = default;
69 
73  public: virtual std::string TypeName() = 0;
74 
77  public: std::string NodeUuid() const;
78 
81  public: std::string HandlerUuid() const;
82 
86  protected: bool UpdateThrottling();
87 
89  protected: SubscribeOptions opts;
90 
93  protected: double periodNs;
94 
95 #ifdef _WIN32
96 // Disable warning C4251 which is triggered by
97 // std::*
98 #pragma warning(push)
99 #pragma warning(disable: 4251)
100 #endif
101  protected: std::string hUuid;
103 
106 
108  private: std::string nUuid;
109 #ifdef _WIN32
110 #pragma warning(pop)
111 #endif
112  };
113 
122  class GZ_TRANSPORT_VISIBLE ISubscriptionHandler
123  : public SubscriptionHandlerBase
124  {
128  public: explicit ISubscriptionHandler(
129  const std::string &_nUuid,
130  const SubscribeOptions &_opts = SubscribeOptions());
131 
133  public: virtual ~ISubscriptionHandler() = default;
134 
139  public: virtual bool RunLocalCallback(
140  const ProtoMsg &_msg,
141  const MessageInfo &_info) = 0;
142 
147  public: virtual const std::shared_ptr<ProtoMsg> CreateMsg(
148  const std::string &_data,
149  const std::string &_type) const = 0;
150  };
151 
156  template <typename T> class SubscriptionHandler
157  : public ISubscriptionHandler
158  {
159  // Documentation inherited.
160  public: explicit SubscriptionHandler(const std::string &_nUuid,
161  const SubscribeOptions &_opts = SubscribeOptions())
162  : ISubscriptionHandler(_nUuid, _opts)
163  {
164  }
165 
166  // Documentation inherited.
168  const std::string &_data,
169  const std::string &/*_type*/) const
170  {
171  // Instantiate a specific protobuf message
172  auto msgPtr = std::make_shared<T>();
173 
174  // Create the message using some serialized data
175  if (!msgPtr->ParseFromString(_data))
176  {
177  std::cerr << "SubscriptionHandler::CreateMsg() error: ParseFromString"
178  << " failed" << std::endl;
179  }
180 
181  return msgPtr;
182  }
183 
184  // Documentation inherited.
186  {
187  return T().GetTypeName();
188  }
189 
192  public: void SetCallback(const MsgCallback<T> &_cb)
193  {
194  this->cb = _cb;
195  }
196 
197  // Documentation inherited.
198  public: bool RunLocalCallback(const ProtoMsg &_msg,
199  const MessageInfo &_info)
200  {
201  // No callback stored.
202  if (!this->cb)
203  {
204  std::cerr << "SubscriptionHandler::RunLocalCallback() error: "
205  << "Callback is NULL" << std::endl;
206  return false;
207  }
208 
209  // Check the subscription throttling option.
210  if (!this->UpdateThrottling())
211  return true;
212 
213 #if GOOGLE_PROTOBUF_VERSION >= 4022000
214  auto msgPtr = google::protobuf::internal::DownCast<const T*>(&_msg);
215 #elif GOOGLE_PROTOBUF_VERSION >= 3000000
216  auto msgPtr = google::protobuf::down_cast<const T*>(&_msg);
217 #else
218  auto msgPtr = google::protobuf::internal::down_cast<const T*>(&_msg);
219 #endif
220 
221  this->cb(*msgPtr, _info);
222  return true;
223  }
224 
226  private: MsgCallback<T> cb;
227  };
228 
231  template <> class SubscriptionHandler<ProtoMsg>
232  : public ISubscriptionHandler
233  {
234  // Documentation inherited.
235  public: explicit SubscriptionHandler(const std::string &_nUuid,
236  const SubscribeOptions &_opts = SubscribeOptions())
237  : ISubscriptionHandler(_nUuid, _opts)
238  {
239  }
240 
241  // Documentation inherited.
243  const std::string &_data,
244  const std::string &_type) const
245  {
247 
248  const google::protobuf::Descriptor *desc =
249  google::protobuf::DescriptorPool::generated_pool()
250  ->FindMessageTypeByName(_type);
251 
252  // First, check if we have the descriptor from the generated proto
253  // classes.
254  if (desc)
255  {
256  msgPtr.reset(google::protobuf::MessageFactory::generated_factory()
257  ->GetPrototype(desc)->New());
258  }
259  else
260  {
261  // Fallback on Gazebo Msgs if the message type is not found.
262  msgPtr = gz::msgs::Factory::New(_type);
263  }
264 
265  if (!msgPtr)
266  return nullptr;
267 
268  // Create the message using some serialized data
269  if (!msgPtr->ParseFromString(_data))
270  {
271  std::cerr << "CreateMsg() error: ParseFromString failed" << std::endl;
272  return nullptr;
273  }
274 
275  return msgPtr;
276  }
277 
278  // Documentation inherited.
280  {
281  return kGenericMessageType;
282  }
283 
286  public: void SetCallback(const MsgCallback<ProtoMsg> &_cb)
287  {
288  this->cb = _cb;
289  }
290 
291  // Documentation inherited.
292  public: bool RunLocalCallback(const ProtoMsg &_msg,
293  const MessageInfo &_info)
294  {
295  // No callback stored.
296  if (!this->cb)
297  {
298  std::cerr << "SubscriptionHandler::RunLocalCallback() "
299  << "error: Callback is NULL" << std::endl;
300  return false;
301  }
302 
303  // Check the subscription throttling option.
304  if (!this->UpdateThrottling())
305  return true;
306 
307  this->cb(_msg, _info);
308  return true;
309  }
310 
312  private: MsgCallback<ProtoMsg> cb;
313  };
314 
319  {
326  public: explicit RawSubscriptionHandler(
327  const std::string &_nUuid,
328  const std::string &_msgType = kGenericMessageType,
329  const SubscribeOptions &_opts = SubscribeOptions());
330 
331  // Documentation inherited
332  public: std::string TypeName() override;
333 
337  public: void SetCallback(const RawCallback &_callback);
338 
345  public: bool RunRawCallback(const char *_msgData, const size_t _size,
346  const MessageInfo &_info);
347 
349  public: ~RawSubscriptionHandler();
350 
351  private: class Implementation;
352 
353 #ifdef _WIN32
354 // Disable warning C4251 which is triggered by
355 // std::unique_ptr
356 #pragma warning(push)
357 #pragma warning(disable: 4251)
358 #endif
359  private: std::unique_ptr<Implementation> pimpl;
362 #ifdef _WIN32
363 #pragma warning(pop)
364 #endif
365  };
366  }
367  }
368 }
369 
370 #endif