Gazebo Transport

API Reference

14.0.0~pre1
SubscriptionHandler.hh
Go to the documentation of this file.
1/*
2 * Copyright (C) 2014 Open Source Robotics Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16*/
17
18#ifndef GZ_TRANSPORT_SUBSCRIPTIONHANDLER_HH_
19#define GZ_TRANSPORT_SUBSCRIPTIONHANDLER_HH_
20
21#ifdef _MSC_VER
22#pragma warning(push)
23#pragma warning(disable: 4251)
24#endif
25#include <google/protobuf/message.h>
26#include <google/protobuf/stubs/common.h>
27#if GOOGLE_PROTOBUF_VERSION >= 3000000 && GOOGLE_PROTOBUF_VERSION < 4022000
28#include <google/protobuf/stubs/casts.h>
29#endif
30#ifdef _MSC_VER
31#pragma warning(pop)
32#endif
33
34#include <chrono>
35#include <iostream>
36#include <memory>
37#include <string>
38#include <utility>
39
40#include <gz/msgs/Factory.hh>
41
42#include "gz/transport/config.hh"
43#include "gz/transport/Export.hh"
47#include "gz/transport/Uuid.hh"
48
49namespace gz
50{
51 namespace transport
52 {
53 // Inline bracket to help doxygen filtering.
54 inline namespace GZ_TRANSPORT_VERSION_NAMESPACE {
55 //
/// \brief Common base for subscription handlers: declares the node/handler
/// UUID accessors, the local-message filter query, the throttling hook and
/// the pure-virtual TypeName() that every concrete handler must provide.
/// Only declarations are visible here; the member definitions live elsewhere.
58 class GZ_TRANSPORT_VISIBLE SubscriptionHandlerBase
59 {
/// \brief Constructor.
/// \param[in] _nUuid UUID of the node (presumably the value later returned
/// by NodeUuid() - definition not shown here, TODO confirm).
/// \param[in] _opts Subscription options; defaults to a default-constructed
/// SubscribeOptions.
63 public: explicit SubscriptionHandlerBase(
64 const std::string &_nUuid,
65 const SubscribeOptions &_opts = SubscribeOptions());
66
/// \brief Virtual destructor so handlers can be destroyed through a base
/// pointer.
68 public: virtual ~SubscriptionHandlerBase() = default;
69
/// \brief Name of the message type this handler deals with.
/// \return The type name as a string. Implemented by derived classes.
73 public: virtual std::string TypeName() = 0;
74
/// \brief Get the node UUID.
/// \return The node UUID (presumably the nUuid member - TODO confirm).
77 public: std::string NodeUuid() const;
78
/// \brief Get the handler UUID.
/// \return The handler UUID (presumably the hUuid member - TODO confirm).
81 public: std::string HandlerUuid() const;
82
/// \brief Whether this handler wants to ignore local messages.
/// \return Presumably derived from the subscribe options passed to the
/// constructor - definition not shown, verify against the implementation.
85 public: bool IgnoreLocalMessages() const;
86
/// \brief Throttling check used by the derived handlers before invoking a
/// user callback.
/// \return The derived classes in this file run the callback only when this
/// returns true; when it returns false they skip the callback and still
/// report success.
90 protected: bool UpdateThrottling();
91
// NOTE(review): listing lines 92-96 are absent from this rendering; a
// member declaration may have been dropped here - check the real header.
94
/// \brief Throttling period. The name suggests nanoseconds - TODO confirm
/// against UpdateThrottling()'s definition.
97 protected: double periodNs;
98
99#ifdef _WIN32
100// Disable warning C4251 which is triggered by
101// std::*
102#pragma warning(push)
103#pragma warning(disable: 4251)
104#endif
/// \brief Handler UUID string.
106 protected: std::string hUuid;
107
// NOTE(review): listing lines 108-111 are absent from this rendering; a
// member declaration may have been dropped here - check the real header.
110
/// \brief Node UUID string.
112 private: std::string nUuid;
113#ifdef _WIN32
114#pragma warning(pop)
115#endif
116 };
117
/// \brief Interface for typed subscription handlers: adds the pure-virtual
/// operations for running a local callback and for building a message from
/// serialized data.
// NOTE(review): listing line 127 is absent from this rendering. Derived
// classes in this file call this->UpdateThrottling(), which is only declared
// in SubscriptionHandlerBase, so a ": public SubscriptionHandlerBase" clause
// has presumably been dropped here - confirm against the real header.
126 class GZ_TRANSPORT_VISIBLE ISubscriptionHandler
128 {
/// \brief Constructor.
/// \param[in] _nUuid UUID of the node.
/// \param[in] _opts Subscription options; defaults to a default-constructed
/// SubscribeOptions.
132 public: explicit ISubscriptionHandler(
133 const std::string &_nUuid,
134 const SubscribeOptions &_opts = SubscribeOptions());
135
/// \brief Virtual destructor.
137 public: virtual ~ISubscriptionHandler() = default;
138
/// \brief Execute the stored callback for a received message.
/// \param[in] _msg The protobuf message to hand to the callback.
/// \param[in] _info Message information passed through to the callback.
/// \return In the implementations visible in this file: false when no
/// callback has been set, true otherwise (including when the message is
/// skipped by throttling).
143 public: virtual bool RunLocalCallback(
144 const ProtoMsg &_msg,
145 const MessageInfo &_info) = 0;
146
/// \brief Build a protobuf message from its serialized form.
/// \param[in] _data The serialized message data.
/// \param[in] _type The message type name.
// NOTE(review): the line that opens this declaration (listing line 151,
// carrying the return type and method name) is missing from this rendering.
// The implementations' error strings name the method CreateMsg().
152 const std::string &_data,
153 const std::string &_type) const = 0;
154 };
155
/// \brief Subscription handler for a concrete protobuf message type T.
/// Stores a user callback of type MsgCallback<T> and invokes it with the
/// received message down-cast to T.
160 template <typename T> class SubscriptionHandler
161 : public ISubscriptionHandler
162 {
/// \brief Constructor; forwards both arguments to ISubscriptionHandler.
/// \param[in] _nUuid UUID of the node.
/// \param[in] _opts Subscription options.
163 // Documentation inherited.
164 public: explicit SubscriptionHandler(const std::string &_nUuid,
165 const SubscribeOptions &_opts = SubscribeOptions())
166 : ISubscriptionHandler(_nUuid, _opts)
167 {
168 }
169
/// \brief Create a message of type T and fill it from serialized data.
/// \param[in] _data The serialized message data.
/// The second parameter (the type name) is unused because T fixes the type.
/// \return Shared pointer to the newly created T.
// NOTE(review): the line that opens this definition (listing line 171) is
// missing from this rendering. Also note that a ParseFromString() failure is
// only logged to std::cerr - the message pointer is still returned, whereas
// the ProtoMsg specialization below returns nullptr in that case.
170 // Documentation inherited.
172 const std::string &_data,
173 const std::string &/*_type*/) const
174 {
175 // Instantiate a specific protobuf message
176 auto msgPtr = std::make_shared<T>();
177
178 // Create the message using some serialized data
179 if (!msgPtr->ParseFromString(_data))
180 {
181 std::cerr << "SubscriptionHandler::CreateMsg() error: ParseFromString"
182 << " failed" << std::endl;
183 }
184
185 return msgPtr;
186 }
187
/// \brief Type name of T, obtained from a default-constructed instance.
// NOTE(review): the signature line (listing line 189) is missing from this
// rendering; the body matches the TypeName() pure virtual declared in
// SubscriptionHandlerBase.
188 // Documentation inherited.
190 {
191 return T().GetTypeName();
192 }
193
/// \brief Store the callback to run when a message is received.
/// \param[in] _cb The callback; it is copied into this handler.
196 public: void SetCallback(const MsgCallback<T> &_cb)
197 {
198 this->cb = _cb;
199 }
200
/// \brief Run the stored callback on a received message.
/// \param[in] _msg The received message; it is down-cast to const T*.
/// \param[in] _info Message information forwarded to the callback.
/// \return false if no callback is stored; true if the callback ran or the
/// message was skipped because UpdateThrottling() returned false.
201 // Documentation inherited.
202 public: bool RunLocalCallback(const ProtoMsg &_msg,
203 const MessageInfo &_info)
204 {
205 // No callback stored.
206 if (!this->cb)
207 {
208 std::cerr << "SubscriptionHandler::RunLocalCallback() error: "
209 << "Callback is NULL" << std::endl;
210 return false;
211 }
212
213 // Check the subscription throttling option.
214 if (!this->UpdateThrottling())
215 return true;
216
// The down-cast helper has a different name/namespace depending on the
// protobuf version, hence the three-way selection below.
// NOTE(review): no type check is done here before the cast; callers are
// presumably expected to pass a message whose dynamic type is T.
217#if GOOGLE_PROTOBUF_VERSION >= 4022000
218 auto msgPtr = google::protobuf::internal::DownCast<const T*>(&_msg);
219#elif GOOGLE_PROTOBUF_VERSION >= 3000000
220 auto msgPtr = google::protobuf::down_cast<const T*>(&_msg);
221#else
222 auto msgPtr = google::protobuf::internal::down_cast<const T*>(&_msg);
223#endif
224
225 this->cb(*msgPtr, _info);
226 return true;
227 }
228
/// \brief User callback invoked by RunLocalCallback(); empty until
/// SetCallback() is called.
230 private: MsgCallback<T> cb;
231 };
232
/// \brief Specialization for the generic ProtoMsg type. The concrete message
/// class is not known at compile time, so messages are created at run time
/// from the type name, and the callback receives the message as ProtoMsg.
235 template <> class SubscriptionHandler<ProtoMsg>
236 : public ISubscriptionHandler
237 {
/// \brief Constructor; forwards both arguments to ISubscriptionHandler.
/// \param[in] _nUuid UUID of the node.
/// \param[in] _opts Subscription options.
238 // Documentation inherited.
239 public: explicit SubscriptionHandler(const std::string &_nUuid,
240 const SubscribeOptions &_opts = SubscribeOptions())
241 : ISubscriptionHandler(_nUuid, _opts)
242 {
243 }
244
/// \brief Create a message of the type named by _type and fill it from
/// serialized data.
/// \param[in] _data The serialized message data.
/// \param[in] _type The message type name used to look up the descriptor.
/// \return The parsed message, or nullptr if no message of that type could
/// be created or if ParseFromString() failed.
// NOTE(review): the line that opens this definition (listing line 246) and
// the declaration of msgPtr (listing line 250) are missing from this
// rendering; msgPtr is used below with reset() and compared against null,
// so it is presumably a smart pointer to a protobuf message - confirm.
245 // Documentation inherited.
247 const std::string &_data,
248 const std::string &_type) const
249 {
251
252 const google::protobuf::Descriptor *desc =
253 google::protobuf::DescriptorPool::generated_pool()
254 ->FindMessageTypeByName(_type);
255
256 // First, check if we have the descriptor from the generated proto
257 // classes.
258 if (desc)
259 {
260 msgPtr.reset(google::protobuf::MessageFactory::generated_factory()
261 ->GetPrototype(desc)->New());
262 }
263 else
264 {
265 // Fallback on Gazebo Msgs if the message type is not found.
266 msgPtr = gz::msgs::Factory::New(_type);
267 }
268
// Neither lookup produced a message for this type name.
269 if (!msgPtr)
270 return nullptr;
271
272 // Create the message using some serialized data
273 if (!msgPtr->ParseFromString(_data))
274 {
275 std::cerr << "CreateMsg() error: ParseFromString failed" << std::endl;
276 return nullptr;
277 }
278
279 return msgPtr;
280 }
281
/// \brief Type name for the generic handler: always kGenericMessageType.
// NOTE(review): the signature line (listing line 283) is missing from this
// rendering; the body matches the TypeName() pure virtual declared in
// SubscriptionHandlerBase.
282 // Documentation inherited.
284 {
285 return kGenericMessageType;
286 }
287
/// \brief Store the callback to run when a message is received.
/// \param[in] _cb The callback; it is copied into this handler.
290 public: void SetCallback(const MsgCallback<ProtoMsg> &_cb)
291 {
292 this->cb = _cb;
293 }
294
/// \brief Run the stored callback on a received message (no down-cast is
/// needed because the callback takes ProtoMsg directly).
/// \param[in] _msg The received message.
/// \param[in] _info Message information forwarded to the callback.
/// \return false if no callback is stored; true if the callback ran or the
/// message was skipped because UpdateThrottling() returned false.
295 // Documentation inherited.
296 public: bool RunLocalCallback(const ProtoMsg &_msg,
297 const MessageInfo &_info)
298 {
299 // No callback stored.
300 if (!this->cb)
301 {
302 std::cerr << "SubscriptionHandler::RunLocalCallback() "
303 << "error: Callback is NULL" << std::endl;
304 return false;
305 }
306
307 // Check the subscription throttling option.
308 if (!this->UpdateThrottling())
309 return true;
310
311 this->cb(_msg, _info);
312 return true;
313 }
314
/// \brief User callback invoked by RunLocalCallback(); empty until
/// SetCallback() is called.
316 private: MsgCallback<ProtoMsg> cb;
317 };
318
/// \brief Subscription handler whose callback receives the raw message
/// bytes (pointer + size) instead of a deserialized protobuf message.
// NOTE(review): the class header (listing lines 321-322) is missing from this
// rendering; the class name RawSubscriptionHandler is taken from the
// constructor below. TypeName() is marked override, so the class presumably
// derives from SubscriptionHandlerBase - confirm against the real header.
323 {
/// \brief Constructor.
/// \param[in] _nUuid UUID of the node.
/// \param[in] _msgType Message type name; defaults to kGenericMessageType.
/// \param[in] _opts Subscription options; defaults to a default-constructed
/// SubscribeOptions.
330 public: explicit RawSubscriptionHandler(
331 const std::string &_nUuid,
332 const std::string &_msgType = kGenericMessageType,
333 const SubscribeOptions &_opts = SubscribeOptions());
334
/// \return The message type name (presumably the _msgType given to the
/// constructor - definition not shown here, TODO confirm).
335 // Documentation inherited
336 public: std::string TypeName() override;
337
/// \brief Set the callback that receives raw message data.
/// \param[in] _callback The raw callback to store.
341 public: void SetCallback(const RawCallback &_callback);
342
/// \brief Run the raw callback on a received message.
/// \param[in] _msgData Pointer to the raw message bytes.
/// \param[in] _size Number of bytes at _msgData.
/// \param[in] _info Message information.
/// \return A success flag; the exact conditions are defined out of line -
/// presumably mirrors RunLocalCallback() in the typed handlers, verify.
349 public: bool RunRawCallback(const char *_msgData, const size_t _size,
350 const MessageInfo &_info);
351
// NOTE(review): listing lines 352-353 are absent from this rendering. A
// std::unique_ptr to the incomplete type Implementation normally needs an
// out-of-line destructor, so a destructor declaration may have been dropped
// here - check the real header.
354
/// \brief Forward declaration of the private implementation type; its
/// definition is not part of this header.
355 private: class Implementation;
356
357#ifdef _WIN32
358// Disable warning C4251 which is triggered by
359// std::unique_ptr
360#pragma warning(push)
361#pragma warning(disable: 4251)
362#endif
/// \brief Owning pointer to the private implementation.
365 private: std::unique_ptr<Implementation> pimpl;
366#ifdef _WIN32
367#pragma warning(pop)
368#endif
369 };
370 }
371 }
372}
373
374#endif