Gazebo Transport

API Reference

14.1.0
SubscriptionHandler.hh
Go to the documentation of this file.
1/*
2 * Copyright (C) 2014 Open Source Robotics Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16*/
17
18#ifndef GZ_TRANSPORT_SUBSCRIPTIONHANDLER_HH_
19#define GZ_TRANSPORT_SUBSCRIPTIONHANDLER_HH_
20
21#ifdef _MSC_VER
22#pragma warning(push)
23#pragma warning(disable: 4251)
24#endif
25#include <google/protobuf/message.h>
26#include <google/protobuf/stubs/common.h>
27#if GOOGLE_PROTOBUF_VERSION >= 3000000 && GOOGLE_PROTOBUF_VERSION < 4022000
28#include <google/protobuf/stubs/casts.h>
29#endif
30#ifdef _MSC_VER
31#pragma warning(pop)
32#endif
33
34#include <chrono>
35#include <iostream>
36#include <memory>
37#include <string>
38#include <utility>
39
40#include <gz/msgs/Factory.hh>
41
42#include "gz/transport/config.hh"
43#include "gz/transport/Export.hh"
47#include "gz/transport/Uuid.hh"
48
49namespace gz::transport
50{
51 // Inline bracket to help doxygen filtering.
52 inline namespace GZ_TRANSPORT_VERSION_NAMESPACE {
53 //
/// \brief Abstract base of the subscription handlers declared in this file.
/// It declares accessors for a node UUID and a handler UUID, a throttling
/// helper for derived classes, and the pure virtual TypeName().
/// NOTE(review): this listing skips some source lines inside the class
/// (the numbering has gaps); compare with the real header before editing.
56 class GZ_TRANSPORT_VISIBLE SubscriptionHandlerBase
57 {
/// \brief Constructor.
/// \param[in] _nUuid UUID string; presumably the UUID of the node that owns
/// this handler (see NodeUuid()) -- confirm against the implementation.
/// \param[in] _opts Subscription options. A default-constructed
/// SubscribeOptions is used when the argument is omitted.
61 public: explicit SubscriptionHandlerBase(
62 const std::string &_nUuid,
63 const SubscribeOptions &_opts = SubscribeOptions());
64
/// \brief Virtual destructor, so derived handlers can be destroyed through
/// a pointer to this base.
66 public: virtual ~SubscriptionHandlerBase() = default;
67
/// \brief Name of the message type handled. Pure virtual: every concrete
/// handler must provide it.
/// \return The type name as a string.
71 public: virtual std::string TypeName() = 0;
72
/// \brief Node UUID accessor.
/// \return A string by value; presumably the stored nUuid -- confirm.
75 public: std::string NodeUuid() const;
76
/// \brief Handler UUID accessor.
/// \return A string by value; presumably the stored hUuid -- confirm.
79 public: std::string HandlerUuid() const;
80
/// \brief NOTE(review): the name suggests this reports whether messages
/// published from the same process should be ignored; the definition is
/// not in this file -- confirm.
83 public: bool IgnoreLocalMessages() const;
84
/// \brief Throttling check used by derived handlers before a callback.
/// \return The handlers in this file skip their callback (and still report
/// success) when this returns false.
88 protected: bool UpdateThrottling();
89
92
/// NOTE(review): the name suggests a throttling period in nanoseconds;
/// the code that reads or writes it is not in this file -- confirm.
95 protected: double periodNs;
96
97#ifdef _WIN32
98// Disable warning C4251 which is triggered by
99// std::*
100#pragma warning(push)
101#pragma warning(disable: 4251)
102#endif
/// Handler UUID string; presumably what HandlerUuid() returns -- confirm.
104 protected: std::string hUuid;
105
108
/// Node UUID string; presumably what NodeUuid() returns -- confirm.
110 private: std::string nUuid;
111#ifdef _WIN32
112#pragma warning(pop)
113#endif
114 };
115
/// \brief Interface implemented by the typed SubscriptionHandler templates
/// in this file. It declares the local-callback entry point and a second
/// pure virtual that receives serialized data plus a type name.
/// NOTE(review): the listing skips source line 125 (where a base-class
/// clause would be) and line 149 (return type and name of the second pure
/// virtual). The derived classes log the name "CreateMsg()"; confirm both
/// against the real header.
124 class GZ_TRANSPORT_VISIBLE ISubscriptionHandler
126 {
/// \brief Constructor.
/// \param[in] _nUuid UUID string; presumably the owning node's UUID --
/// confirm.
/// \param[in] _opts Subscription options. A default-constructed
/// SubscribeOptions is used when the argument is omitted.
130 public: explicit ISubscriptionHandler(
131 const std::string &_nUuid,
132 const SubscribeOptions &_opts = SubscribeOptions());
133
/// \brief Virtual destructor.
135 public: virtual ~ISubscriptionHandler() = default;
136
/// \brief Hand a message to the handler's callback.
/// \param[in] _msg The message, as a generic protobuf message reference.
/// \param[in] _info Metadata passed along to the callback.
/// \return In the implementations in this file: false when no callback is
/// stored or the message cannot be cast to the expected type, true
/// otherwise (including when the message is skipped by throttling).
141 public: virtual bool RunLocalCallback(
142 const ProtoMsg &_msg,
143 const MessageInfo &_info) = 0;
144
/// \brief Pure virtual taking serialized data and a type name.
/// \param[in] _data Serialized message bytes.
/// \param[in] _type Message type name.
150 const std::string &_data,
151 const std::string &_type) const = 0;
152 };
153
158 template <typename T> class SubscriptionHandler
159 : public ISubscriptionHandler
160 {
161 // Documentation inherited.
162 public: explicit SubscriptionHandler(const std::string &_nUuid,
163 const SubscribeOptions &_opts = SubscribeOptions())
164 : ISubscriptionHandler(_nUuid, _opts)
165 {
166 }
167
// Builds a message of type T from serialized bytes.
// A default T is allocated with std::make_shared and filled with
// ParseFromString(_data). The type-name argument is unused (its name is
// commented out).
// When parsing fails an error is written to std::cerr, but the message
// pointer is still returned; it is never null on this path.
// NOTE(review): the listing skips source line 169, which holds the return
// type and the function name; confirm against the real header.
168 // Documentation inherited.
170 const std::string &_data,
171 const std::string &/*_type*/) const
172 {
173 // Instantiate a specific protobuf message
174 auto msgPtr = std::make_shared<T>();
175
176 // Create the message using some serialized data
177 if (!msgPtr->ParseFromString(_data))
178 {
// Parse failure is only reported; execution continues to the return below.
179 std::cerr << "SubscriptionHandler::CreateMsg() error: ParseFromString"
180 << " failed" << std::endl;
181 }
182
183 return msgPtr;
184 }
185
// Returns the protobuf type name of T, read from a default-constructed T
// and copied into a std::string.
// NOTE(review): the explicit std::string(...) conversion is presumably
// there because GetTypeName() does not return std::string in every
// protobuf release -- confirm.
// NOTE(review): the listing skips source line 187, which holds the
// signature; confirm against the real header.
186 // Documentation inherited.
188 {
189 return std::string(T().GetTypeName());
190 }
191
194 public: void SetCallback(const MsgCallback<T> &_cb)
195 {
196 this->cb = _cb;
197 }
198
199 // Documentation inherited.
200 public: bool RunLocalCallback(const ProtoMsg &_msg,
201 const MessageInfo &_info)
202 {
203 // No callback stored.
204 if (!this->cb)
205 {
206 std::cerr << "SubscriptionHandler::RunLocalCallback() error: "
207 << "Callback is NULL" << std::endl;
208 return false;
209 }
210
211 // Check the subscription throttling option.
212 if (!this->UpdateThrottling())
213 return true;
214
215#if GOOGLE_PROTOBUF_VERSION >= 5028000
216 auto msgPtr = google::protobuf::DynamicCastMessage<T>(&_msg);
217#elif GOOGLE_PROTOBUF_VERSION >= 4022000
218 auto msgPtr = google::protobuf::internal::DownCast<const T*>(&_msg);
219#elif GOOGLE_PROTOBUF_VERSION >= 3000000
220 auto msgPtr = google::protobuf::down_cast<const T*>(&_msg);
221#else
222 auto msgPtr = google::protobuf::internal::down_cast<const T*>(&_msg);
223#endif
224
225 // Verify the dynamically casted message is valid
226 if (msgPtr == nullptr)
227 {
228 if (_msg.GetDescriptor() != nullptr)
229 {
230 std::cerr << "SubscriptionHandler::RunLocalCallback() error: "
231 << "Failed to cast the message of the type "
232 << _msg.GetDescriptor()->full_name()
233 << " to the specified type" << '\n';
234 }
235 else
236 {
237 std::cerr << "SubscriptionHandler::RunLocalCallback() error: "
238 << "Failed to cast the message of an unknown type"
239 << " to the specified type" << '\n';
240 }
241 std::cerr.flush();
242 return false;
243 }
244
245 this->cb(*msgPtr, _info);
246 return true;
247 }
248
/// Callback assigned by SetCallback() and invoked by RunLocalCallback().
250 private: MsgCallback<T> cb;
251 };
252
255 template <> class SubscriptionHandler<ProtoMsg>
256 : public ISubscriptionHandler
257 {
258 // Documentation inherited.
259 public: explicit SubscriptionHandler(const std::string &_nUuid,
260 const SubscribeOptions &_opts = SubscribeOptions())
261 : ISubscriptionHandler(_nUuid, _opts)
262 {
263 }
264
// Builds a message whose concrete type is chosen at run time from _type,
// then fills it from the serialized bytes in _data.
// The type is first looked up in protobuf's generated descriptor pool. If
// no descriptor is found, gz::msgs::Factory::New(_type) is tried instead.
// Returns nullptr when no message object could be created or when
// ParseFromString(_data) fails (the latter also logs to std::cerr).
// NOTE(review): the listing skips source line 266 (return type and function
// name) and line 270 (presumably the declaration of msgPtr); confirm both
// against the real header.
265 // Documentation inherited.
267 const std::string &_data,
268 const std::string &_type) const
269 {
271
272 const google::protobuf::Descriptor *desc =
273 google::protobuf::DescriptorPool::generated_pool()
274 ->FindMessageTypeByName(_type);
275
276 // First, check if we have the descriptor from the generated proto
277 // classes.
278 if (desc)
279 {
// Instantiate a fresh message from the prototype registered for desc.
280 msgPtr.reset(google::protobuf::MessageFactory::generated_factory()
281 ->GetPrototype(desc)->New());
282 }
283 else
284 {
285 // Fallback on Gazebo Msgs if the message type is not found.
286 msgPtr = gz::msgs::Factory::New(_type);
287 }
288
// Neither source produced a message object for this type name.
289 if (!msgPtr)
290 return nullptr;
291
292 // Create the message using some serialized data
293 if (!msgPtr->ParseFromString(_data))
294 {
295 std::cerr << "CreateMsg() error: ParseFromString failed" << std::endl;
296 return nullptr;
297 }
298
299 return msgPtr;
300 }
301
// Returns kGenericMessageType rather than a concrete protobuf type name.
// NOTE(review): the listing skips source line 303, which holds the
// signature; confirm against the real header.
302 // Documentation inherited.
304 {
305 return kGenericMessageType;
306 }
307
310 public: void SetCallback(const MsgCallback<ProtoMsg> &_cb)
311 {
312 this->cb = _cb;
313 }
314
315 // Documentation inherited.
316 public: bool RunLocalCallback(const ProtoMsg &_msg,
317 const MessageInfo &_info)
318 {
319 // No callback stored.
320 if (!this->cb)
321 {
322 std::cerr << "SubscriptionHandler::RunLocalCallback() "
323 << "error: Callback is NULL" << std::endl;
324 return false;
325 }
326
327 // Check the subscription throttling option.
328 if (!this->UpdateThrottling())
329 return true;
330
331 this->cb(_msg, _info);
332 return true;
333 }
334
/// Callback assigned by SetCallback() and invoked by RunLocalCallback().
336 private: MsgCallback<ProtoMsg> cb;
337 };
338
// Body of the RawSubscriptionHandler class (name taken from the
// constructor below). Its callback entry point receives a raw byte buffer
// and a size instead of a protobuf message object.
// NOTE(review): the listing skips source lines 339-342, where the class
// head and its base-class clause would be, and line 373; confirm against
// the real header.
343 {
/// \brief Constructor.
/// \param[in] _nUuid UUID string; presumably the owning node's UUID --
/// confirm.
/// \param[in] _msgType Message type name; kGenericMessageType when omitted.
/// \param[in] _opts Subscription options. A default-constructed
/// SubscribeOptions is used when the argument is omitted.
350 public: explicit RawSubscriptionHandler(
351 const std::string &_nUuid,
352 const std::string &_msgType = kGenericMessageType,
353 const SubscribeOptions &_opts = SubscribeOptions());
354
355 // Documentation inherited
356 public: std::string TypeName() override;
357
/// \brief Set the callback used by this handler.
/// \param[in] _callback The raw-data callback.
361 public: void SetCallback(const RawCallback &_callback);
362
/// \brief Run the callback on a raw buffer.
/// \param[in] _msgData Pointer to the message bytes.
/// \param[in] _size Number of bytes at _msgData.
/// \param[in] _info Metadata passed along with the data.
/// \return A bool; presumably true on success, mirroring
/// RunLocalCallback() -- the definition is not in this file, confirm.
369 public: bool RunRawCallback(const char *_msgData, const size_t _size,
370 const MessageInfo &_info);
371
374
/// Private implementation type, only forward-declared here.
375 private: class Implementation;
376
377#ifdef _WIN32
378// Disable warning C4251 which is triggered by
379// std::unique_ptr
380#pragma warning(push)
381#pragma warning(disable: 4251)
382#endif
/// Owning pointer to the private implementation object.
385 private: std::unique_ptr<Implementation> pimpl;
386#ifdef _WIN32
387#pragma warning(pop)
388#endif
389 };
390 }
391}
392
393#endif