Gazebo Transport

API Reference

14.0.0
SubscriptionHandler.hh
Go to the documentation of this file.
1/*
2 * Copyright (C) 2014 Open Source Robotics Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16*/
17
18#ifndef GZ_TRANSPORT_SUBSCRIPTIONHANDLER_HH_
19#define GZ_TRANSPORT_SUBSCRIPTIONHANDLER_HH_
20
21#ifdef _MSC_VER
22#pragma warning(push)
23#pragma warning(disable: 4251)
24#endif
25#include <google/protobuf/message.h>
26#include <google/protobuf/stubs/common.h>
27#if GOOGLE_PROTOBUF_VERSION >= 3000000 && GOOGLE_PROTOBUF_VERSION < 4022000
28#include <google/protobuf/stubs/casts.h>
29#endif
30#ifdef _MSC_VER
31#pragma warning(pop)
32#endif
33
34#include <chrono>
35#include <iostream>
36#include <memory>
37#include <string>
38#include <utility>
39
40#include <gz/msgs/Factory.hh>
41
42#include "gz/transport/config.hh"
43#include "gz/transport/Export.hh"
47#include "gz/transport/Uuid.hh"
48
49namespace gz
50{
51 namespace transport
52 {
53 // Inline bracket to help doxygen filtering.
54 inline namespace GZ_TRANSPORT_VERSION_NAMESPACE {
55 //
// Base class declaring the state and queries shared by the subscription
// handlers in this header. Only declarations are visible here, so notes
// marked "presumably" are inferred from names and must be confirmed against
// the implementation file.
// NOTE(review): the listing's own numbering skips lines inside this class
// (for example 91 -> 94 and 107 -> 110), so some member declarations may be
// absent from this view.
58 class GZ_TRANSPORT_VISIBLE SubscriptionHandlerBase
59 {
// Constructor taking a node UUID string and the subscription options; the
// options default to a default-constructed SubscribeOptions.
63 public: explicit SubscriptionHandlerBase(
64 const std::string &_nUuid,
65 const SubscribeOptions &_opts = SubscribeOptions());
66
// Virtual destructor so handlers can be destroyed through a base pointer.
68 public: virtual ~SubscriptionHandlerBase() = default;
69
// Pure virtual: every concrete handler supplies its type name as a string.
73 public: virtual std::string TypeName() = 0;
74
// Presumably returns the UUID of the owning node (see nUuid) -- confirm.
77 public: std::string NodeUuid() const;
78
// Presumably returns this handler's own UUID (see hUuid) -- confirm.
81 public: std::string HandlerUuid() const;
82
// Presumably reports whether locally published messages are ignored;
// the deciding option is not visible in this listing -- confirm.
85 public: bool IgnoreLocalMessages() const;
86
// Throttling check used by the handlers below: when it returns false they
// skip the user callback and still report success.
90 protected: bool UpdateThrottling();
91
94
// Presumably the throttling period in nanoseconds (from the name) -- confirm.
97 protected: double periodNs;
98
99#ifdef _WIN32
100// Disable warning C4251 which is triggered by
101// std::*
102#pragma warning(push)
103#pragma warning(disable: 4251)
104#endif
// Presumably the handler UUID returned by HandlerUuid() -- confirm.
106 protected: std::string hUuid;
107
110
// Presumably the node UUID passed to the constructor -- confirm.
112 private: std::string nUuid;
113#ifdef _WIN32
114#pragma warning(pop)
115#endif
116 };
117
// Interface implemented by the typed handlers below (SubscriptionHandler<T>
// and its ProtoMsg specialization both derive from it).
// NOTE(review): the listing skips source line 127; the handlers below call
// UpdateThrottling(), so a base-class clause is presumably on that line.
126 class GZ_TRANSPORT_VISIBLE ISubscriptionHandler
128 {
// Constructor taking a node UUID string and the subscription options; the
// options default to a default-constructed SubscribeOptions.
132 public: explicit ISubscriptionHandler(
133 const std::string &_nUuid,
134 const SubscribeOptions &_opts = SubscribeOptions());
135
// Virtual destructor so handlers can be destroyed through this interface.
137 public: virtual ~ISubscriptionHandler() = default;
138
// Pure virtual delivery hook. The implementations in this file return false
// when no callback is stored (or the message cannot be cast to the expected
// type) and true otherwise, including when throttling drops the message.
143 public: virtual bool RunLocalCallback(
144 const ProtoMsg &_msg,
145 const MessageInfo &_info) = 0;
146
// NOTE(review): source line 151, which should carry the return type and name
// of this second pure virtual, is absent from the listing. Its parameters are
// the serialized payload (_data) and a message type name (_type); the
// matching definitions below log under the name "CreateMsg".
152 const std::string &_data,
153 const std::string &_type) const = 0;
154 };
155
160 template <typename T> class SubscriptionHandler
161 : public ISubscriptionHandler
162 {
163 // Documentation inherited.
164 public: explicit SubscriptionHandler(const std::string &_nUuid,
165 const SubscribeOptions &_opts = SubscribeOptions())
166 : ISubscriptionHandler(_nUuid, _opts)
167 {
168 }
169
170 // Documentation inherited.
// NOTE(review): source line 171, which should carry this member's return
// type and name, is absent from the listing; the error text below refers to
// it as CreateMsg.
// Builds a new T and fills it from the serialized bytes in _data. The second
// argument (the type name) is left unnamed because it is not used here.
172 const std::string &_data,
173 const std::string &/*_type*/) const
174 {
175 // Instantiate a specific protobuf message
176 auto msgPtr = std::make_shared<T>();
177
178 // Create the message using some serialized data
179 if (!msgPtr->ParseFromString(_data))
180 {
181 std::cerr << "SubscriptionHandler::CreateMsg() error: ParseFromString"
182 << " failed" << std::endl;
183 }
184
// NOTE(review): a parse failure is only logged; the message is still
// returned. The ProtoMsg specialization later in this file returns nullptr
// in the same situation -- confirm the difference is intended.
185 return msgPtr;
186 }
187
188 // Documentation inherited.
// NOTE(review): source line 189 (the declaration head of this member) is
// absent from the listing. The body default-constructs a T and returns the
// result of its GetTypeName() call.
190 {
191 return T().GetTypeName();
192 }
193
196 public: void SetCallback(const MsgCallback<T> &_cb)
197 {
198 this->cb = _cb;
199 }
200
201 // Documentation inherited.
202 public: bool RunLocalCallback(const ProtoMsg &_msg,
203 const MessageInfo &_info)
204 {
205 // No callback stored.
206 if (!this->cb)
207 {
208 std::cerr << "SubscriptionHandler::RunLocalCallback() error: "
209 << "Callback is NULL" << std::endl;
210 return false;
211 }
212
213 // Check the subscription throttling option.
214 if (!this->UpdateThrottling())
215 return true;
216
217#if GOOGLE_PROTOBUF_VERSION >= 5028000
218 auto msgPtr = google::protobuf::DynamicCastMessage<T>(&_msg);
219#elif GOOGLE_PROTOBUF_VERSION >= 4022000
220 auto msgPtr = google::protobuf::internal::DownCast<const T*>(&_msg);
221#elif GOOGLE_PROTOBUF_VERSION >= 3000000
222 auto msgPtr = google::protobuf::down_cast<const T*>(&_msg);
223#else
224 auto msgPtr = google::protobuf::internal::down_cast<const T*>(&_msg);
225#endif
226
227 // Verify the dynamically casted message is valid
228 if (msgPtr == nullptr)
229 {
230 if (_msg.GetDescriptor() != nullptr)
231 {
232 std::cerr << "SubscriptionHandler::RunLocalCallback() error: "
233 << "Failed to cast the message of the type "
234 << _msg.GetDescriptor()->full_name()
235 << " to the specified type" << '\n';
236 }
237 else
238 {
239 std::cerr << "SubscriptionHandler::RunLocalCallback() error: "
240 << "Failed to cast the message of an unknown type"
241 << " to the specified type" << '\n';
242 }
243 std::cerr.flush();
244 return false;
245 }
246
247 this->cb(*msgPtr, _info);
248 return true;
249 }
250
252 private: MsgCallback<T> cb;
253 };
254
257 template <> class SubscriptionHandler<ProtoMsg>
258 : public ISubscriptionHandler
259 {
260 // Documentation inherited.
261 public: explicit SubscriptionHandler(const std::string &_nUuid,
262 const SubscribeOptions &_opts = SubscribeOptions())
263 : ISubscriptionHandler(_nUuid, _opts)
264 {
265 }
266
267 // Documentation inherited.
// NOTE(review): source lines 268 (return type and name of this member) and
// 272 (presumably the declaration of msgPtr used below) are absent from the
// listing.
// Creates a message for the runtime type name _type and fills it from the
// serialized bytes in _data. Returns nullptr when no message could be
// created for _type or when parsing fails.
269 const std::string &_data,
270 const std::string &_type) const
271 {
273
// Look the type up among the descriptors of the compiled-in proto classes.
274 const google::protobuf::Descriptor *desc =
275 google::protobuf::DescriptorPool::generated_pool()
276 ->FindMessageTypeByName(_type);
277
278 // First, check if we have the descriptor from the generated proto
279 // classes.
280 if (desc)
281 {
// New() on the prototype yields a fresh, empty instance of that type.
282 msgPtr.reset(google::protobuf::MessageFactory::generated_factory()
283 ->GetPrototype(desc)->New());
284 }
285 else
286 {
287 // Fallback on Gazebo Msgs if the message type is not found.
288 msgPtr = gz::msgs::Factory::New(_type);
289 }
290
// Neither source produced a message for _type.
291 if (!msgPtr)
292 return nullptr;
293
294 // Create the message using some serialized data
295 if (!msgPtr->ParseFromString(_data))
296 {
297 std::cerr << "CreateMsg() error: ParseFromString failed" << std::endl;
298 return nullptr;
299 }
300
301 return msgPtr;
302 }
303
304 // Documentation inherited.
// NOTE(review): source line 305 (the declaration head of this member) is
// absent from the listing. The generic handler reports kGenericMessageType
// rather than the name of a concrete message type.
306 {
307 return kGenericMessageType;
308 }
309
312 public: void SetCallback(const MsgCallback<ProtoMsg> &_cb)
313 {
314 this->cb = _cb;
315 }
316
317 // Documentation inherited.
318 public: bool RunLocalCallback(const ProtoMsg &_msg,
319 const MessageInfo &_info)
320 {
321 // No callback stored.
322 if (!this->cb)
323 {
324 std::cerr << "SubscriptionHandler::RunLocalCallback() "
325 << "error: Callback is NULL" << std::endl;
326 return false;
327 }
328
329 // Check the subscription throttling option.
330 if (!this->UpdateThrottling())
331 return true;
332
333 this->cb(_msg, _info);
334 return true;
335 }
336
338 private: MsgCallback<ProtoMsg> cb;
339 };
340
// NOTE(review): source line 344, which should carry the class head for
// RawSubscriptionHandler (name taken from the constructor below), is absent
// from the listing. TypeName() is marked override, so the class derives from
// a base declaring it -- presumably SubscriptionHandlerBase; confirm.
345 {
// Constructor taking a node UUID string, a message type name that defaults
// to kGenericMessageType, and subscription options that default to a
// default-constructed SubscribeOptions.
352 public: explicit RawSubscriptionHandler(
353 const std::string &_nUuid,
354 const std::string &_msgType = kGenericMessageType,
355 const SubscribeOptions &_opts = SubscribeOptions());
356
357 // Documentation inherited
358 public: std::string TypeName() override;
359
// Takes a RawCallback by const reference; presumably stores it for use by
// RunRawCallback() -- the definition is not visible here.
363 public: void SetCallback(const RawCallback &_callback);
364
// Takes a raw byte buffer (_msgData, _size bytes) plus the message info.
// Presumably invokes the stored callback and reports success via the bool
// return -- the definition is not visible here.
371 public: bool RunRawCallback(const char *_msgData, const size_t _size,
372 const MessageInfo &_info);
373
376
// Private implementation type, defined outside this header.
377 private: class Implementation;
378
379#ifdef _WIN32
380// Disable warning C4251 which is triggered by
381// std::unique_ptr
382#pragma warning(push)
383#pragma warning(disable: 4251)
384#endif
// Owning pointer to the private implementation object.
387 private: std::unique_ptr<Implementation> pimpl;
388#ifdef _WIN32
389#pragma warning(pop)
390#endif
391 };
392 }
393 }
394}
395
396#endif