Gazebo Transport

API Reference

13.4.1
SubscriptionHandler.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2014 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef GZ_TRANSPORT_SUBSCRIPTIONHANDLER_HH_
19 #define GZ_TRANSPORT_SUBSCRIPTIONHANDLER_HH_
20 
21 #ifdef _MSC_VER
22 #pragma warning(push)
23 #pragma warning(disable: 4251)
24 #endif
25 #include <google/protobuf/message.h>
26 #include <google/protobuf/stubs/common.h>
27 #if GOOGLE_PROTOBUF_VERSION >= 3000000 && GOOGLE_PROTOBUF_VERSION < 4022000
28 #include <google/protobuf/stubs/casts.h>
29 #endif
30 #ifdef _MSC_VER
31 #pragma warning(pop)
32 #endif
33 
34 #include <chrono>
35 #include <iostream>
36 #include <memory>
37 #include <string>
38 #include <utility>
39 
40 #include <gz/msgs/Factory.hh>
41 
#include "gz/transport/config.hh"
#include "gz/transport/Export.hh"
#include "gz/transport/MessageInfo.hh"
#include "gz/transport/SubscribeOptions.hh"
#include "gz/transport/TransportTypes.hh"
#include "gz/transport/Uuid.hh"
48 
49 namespace gz::transport
50 {
51  // Inline bracket to help doxygen filtering.
52  inline namespace GZ_TRANSPORT_VERSION_NAMESPACE {
53  //
56  class GZ_TRANSPORT_VISIBLE SubscriptionHandlerBase
57  {
61  public: explicit SubscriptionHandlerBase(
62  const std::string &_nUuid,
63  const SubscribeOptions &_opts = SubscribeOptions());
64 
66  public: virtual ~SubscriptionHandlerBase() = default;
67 
71  public: virtual std::string TypeName() = 0;
72 
75  public: std::string NodeUuid() const;
76 
79  public: std::string HandlerUuid() const;
80 
83  public: bool IgnoreLocalMessages() const;
84 
88  protected: bool UpdateThrottling();
89 
91  protected: SubscribeOptions opts;
92 
95  protected: double periodNs;
96 
97 #ifdef _WIN32
98 // Disable warning C4251 which is triggered by
99 // std::*
100 #pragma warning(push)
101 #pragma warning(disable: 4251)
102 #endif
104  protected: std::string hUuid;
105 
108 
110  private: std::string nUuid;
111 #ifdef _WIN32
112 #pragma warning(pop)
113 #endif
114  };
115 
124  class GZ_TRANSPORT_VISIBLE ISubscriptionHandler
125  : public SubscriptionHandlerBase
126  {
130  public: explicit ISubscriptionHandler(
131  const std::string &_nUuid,
132  const SubscribeOptions &_opts = SubscribeOptions());
133 
135  public: virtual ~ISubscriptionHandler() = default;
136 
141  public: virtual bool RunLocalCallback(
142  const ProtoMsg &_msg,
143  const MessageInfo &_info) = 0;
144 
149  public: virtual const std::shared_ptr<ProtoMsg> CreateMsg(
150  const std::string &_data,
151  const std::string &_type) const = 0;
152  };
153 
158  template <typename T> class SubscriptionHandler
159  : public ISubscriptionHandler
160  {
161  // Documentation inherited.
162  public: explicit SubscriptionHandler(const std::string &_nUuid,
163  const SubscribeOptions &_opts = SubscribeOptions())
164  : ISubscriptionHandler(_nUuid, _opts)
165  {
166  }
167 
168  // Documentation inherited.
170  const std::string &_data,
171  const std::string &/*_type*/) const
172  {
173  // Instantiate a specific protobuf message
174  auto msgPtr = std::make_shared<T>();
175 
176  // Create the message using some serialized data
177  if (!msgPtr->ParseFromString(_data))
178  {
179  std::cerr << "SubscriptionHandler::CreateMsg() error: ParseFromString"
180  << " failed" << std::endl;
181  }
182 
183  return msgPtr;
184  }
185 
186  // Documentation inherited.
188  {
189  return std::string(T().GetTypeName());
190  }
191 
194  public: void SetCallback(const MsgCallback<T> &_cb)
195  {
196  this->cb = _cb;
197  }
198 
199  // Documentation inherited.
200  public: bool RunLocalCallback(const ProtoMsg &_msg,
201  const MessageInfo &_info)
202  {
203  // No callback stored.
204  if (!this->cb)
205  {
206  std::cerr << "SubscriptionHandler::RunLocalCallback() error: "
207  << "Callback is NULL" << std::endl;
208  return false;
209  }
210 
211  // Check the subscription throttling option.
212  if (!this->UpdateThrottling())
213  return true;
214 
215 #if GOOGLE_PROTOBUF_VERSION >= 5028000
216  auto msgPtr = google::protobuf::DynamicCastMessage<T>(&_msg);
217 #elif GOOGLE_PROTOBUF_VERSION >= 4022000
218  auto msgPtr = google::protobuf::internal::DownCast<const T*>(&_msg);
219 #elif GOOGLE_PROTOBUF_VERSION >= 3000000
220  auto msgPtr = google::protobuf::down_cast<const T*>(&_msg);
221 #else
222  auto msgPtr = google::protobuf::internal::down_cast<const T*>(&_msg);
223 #endif
224 
225  // Verify the dynamically casted message is valid
226  if (msgPtr == nullptr)
227  {
228  if (_msg.GetDescriptor() != nullptr)
229  {
230  std::cerr << "SubscriptionHandler::RunLocalCallback() error: "
231  << "Failed to cast the message of the type "
232  << _msg.GetDescriptor()->full_name()
233  << " to the specified type" << '\n';
234  }
235  else
236  {
237  std::cerr << "SubscriptionHandler::RunLocalCallback() error: "
238  << "Failed to cast the message of an unknown type"
239  << " to the specified type" << '\n';
240  }
241  std::cerr.flush();
242  return false;
243  }
244 
245  this->cb(*msgPtr, _info);
246  return true;
247  }
248 
250  private: MsgCallback<T> cb;
251  };
252 
255  template <> class SubscriptionHandler<ProtoMsg>
256  : public ISubscriptionHandler
257  {
258  // Documentation inherited.
259  public: explicit SubscriptionHandler(const std::string &_nUuid,
260  const SubscribeOptions &_opts = SubscribeOptions())
261  : ISubscriptionHandler(_nUuid, _opts)
262  {
263  }
264 
265  // Documentation inherited.
267  const std::string &_data,
268  const std::string &_type) const
269  {
271 
272  const google::protobuf::Descriptor *desc =
273  google::protobuf::DescriptorPool::generated_pool()
274  ->FindMessageTypeByName(_type);
275 
276  // First, check if we have the descriptor from the generated proto
277  // classes.
278  if (desc)
279  {
280  msgPtr.reset(google::protobuf::MessageFactory::generated_factory()
281  ->GetPrototype(desc)->New());
282  }
283  else
284  {
285  // Fallback on Gazebo Msgs if the message type is not found.
286  msgPtr = gz::msgs::Factory::New(_type);
287  }
288 
289  if (!msgPtr)
290  return nullptr;
291 
292  // Create the message using some serialized data
293  if (!msgPtr->ParseFromString(_data))
294  {
295  std::cerr << "CreateMsg() error: ParseFromString failed" << std::endl;
296  return nullptr;
297  }
298 
299  return msgPtr;
300  }
301 
302  // Documentation inherited.
304  {
305  return kGenericMessageType;
306  }
307 
310  public: void SetCallback(const MsgCallback<ProtoMsg> &_cb)
311  {
312  this->cb = _cb;
313  }
314 
315  // Documentation inherited.
316  public: bool RunLocalCallback(const ProtoMsg &_msg,
317  const MessageInfo &_info)
318  {
319  // No callback stored.
320  if (!this->cb)
321  {
322  std::cerr << "SubscriptionHandler::RunLocalCallback() "
323  << "error: Callback is NULL" << std::endl;
324  return false;
325  }
326 
327  // Check the subscription throttling option.
328  if (!this->UpdateThrottling())
329  return true;
330 
331  this->cb(_msg, _info);
332  return true;
333  }
334 
336  private: MsgCallback<ProtoMsg> cb;
337  };
338 
343  {
350  public: explicit RawSubscriptionHandler(
351  const std::string &_nUuid,
352  const std::string &_msgType = kGenericMessageType,
353  const SubscribeOptions &_opts = SubscribeOptions());
354 
355  // Documentation inherited
356  public: std::string TypeName() override;
357 
361  public: void SetCallback(const RawCallback &_callback);
362 
369  public: bool RunRawCallback(const char *_msgData, const size_t _size,
370  const MessageInfo &_info);
371 
374 
375  private: class Implementation;
376 
377 #ifdef _WIN32
378 // Disable warning C4251 which is triggered by
379 // std::unique_ptr
380 #pragma warning(push)
381 #pragma warning(disable: 4251)
382 #endif
385  private: std::unique_ptr<Implementation> pimpl;
386 #ifdef _WIN32
387 #pragma warning(pop)
388 #endif
389  };
390  }
391 }
392 
393 #endif