Gazebo Transport

API Reference

15.0.0~pre1
SubscriptionHandler.hh
Go to the documentation of this file.
1/*
2 * Copyright (C) 2014 Open Source Robotics Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16*/
17
18#ifndef GZ_TRANSPORT_SUBSCRIPTIONHANDLER_HH_
19#define GZ_TRANSPORT_SUBSCRIPTIONHANDLER_HH_
20
21#ifdef _MSC_VER
22#pragma warning(push)
23#pragma warning(disable: 4251)
24#endif
25#include <google/protobuf/message.h>
26#include <google/protobuf/stubs/common.h>
27#if GOOGLE_PROTOBUF_VERSION >= 3000000 && GOOGLE_PROTOBUF_VERSION < 4022000
28#include <google/protobuf/stubs/casts.h>
29#endif
30#ifdef _MSC_VER
31#pragma warning(pop)
32#endif
33
34#include <chrono>
35#include <iostream>
36#include <memory>
37#include <string>
38#include <utility>
39
40#include <gz/msgs/Factory.hh>
41
42#include "gz/transport/config.hh"
43#include "gz/transport/Export.hh"
48#include "gz/transport/Uuid.hh"
49
50namespace zenoh
51{
52 // Forward declaration.
53 class Session;
54}
55
56namespace gz::transport
57{
58 // Inline bracket to help doxygen filtering.
59 inline namespace GZ_TRANSPORT_VERSION_NAMESPACE {
60 //
62 class SubscriptionHandlerBasePrivate;
63
66 class GZ_TRANSPORT_VISIBLE SubscriptionHandlerBase
67 {
72 public: explicit SubscriptionHandlerBase(
73 const std::string &_pUuid,
74 const std::string &_nUuid,
75 const SubscribeOptions &_opts = SubscribeOptions());
76
78 public: virtual ~SubscriptionHandlerBase();
79
83 public: virtual std::string TypeName() = 0;
84
87 public: std::string ProcUuid() const;
88
91 public: std::string NodeUuid() const;
92
95 public: std::string HandlerUuid() const;
96
99 public: bool IgnoreLocalMessages() const;
100
104 protected: bool UpdateThrottling();
105
108
111 protected: double periodNs;
112
113#ifdef _WIN32
114// Disable warning C4251 which is triggered by
115// std::*
116#pragma warning(push)
117#pragma warning(disable: 4251)
118#endif
121#ifdef _WIN32
122#pragma warning(pop)
123#endif
124 };
125
134 class GZ_TRANSPORT_VISIBLE ISubscriptionHandler
136 {
141 public: explicit ISubscriptionHandler(
142 const std::string &_pUuid,
143 const std::string &_nUuid,
144 const SubscribeOptions &_opts = SubscribeOptions());
145
147 public: virtual ~ISubscriptionHandler() = default;
148
153 public: virtual bool RunLocalCallback(
154 const ProtoMsg &_msg,
155 const MessageInfo &_info) = 0;
156
162 const std::string &_data,
163 const std::string &_type) const = 0;
164
165#ifdef HAVE_ZENOH
169 public: void CreateGenericZenohSubscriber(
171 const FullyQualifiedTopic &_fullyQualifiedTopic);
172#endif
173 };
174
179 template <typename T> class SubscriptionHandler
180 : public ISubscriptionHandler
181 {
182 // Documentation inherited.
183 public: explicit SubscriptionHandler(const std::string &_pUuid,
184 const std::string &_nUuid,
185 const SubscribeOptions &_opts = SubscribeOptions())
186 : ISubscriptionHandler(_pUuid, _nUuid, _opts)
187 {
188 }
189
190 // Documentation inherited.
192 const std::string &_data,
193 const std::string &/*_type*/) const
194 {
195 // Instantiate a specific protobuf message
196 auto msgPtr = std::make_shared<T>();
197
198 // Create the message using some serialized data
199 if (!msgPtr->ParseFromString(_data))
200 {
201 std::cerr << "SubscriptionHandler::CreateMsg() error: ParseFromString"
202 << " failed" << std::endl;
203 }
204
205 return msgPtr;
206 }
207
208 // Documentation inherited.
210 {
211 return std::string(T().GetTypeName());
212 }
213
216 public: void SetCallback(const MsgCallback<T> &_cb)
217 {
218 this->cb = _cb;
219 }
220
221#ifdef HAVE_ZENOH
226 public: void SetCallback(const MsgCallback<T> &_cb,
228 const FullyQualifiedTopic &_fullyQualifiedTopic)
229 {
230 this->SetCallback(std::move(_cb));
231 this->CreateGenericZenohSubscriber(_session, _fullyQualifiedTopic);
232 }
233#endif
234
235 // Documentation inherited.
236 public: bool RunLocalCallback(const ProtoMsg &_msg,
237 const MessageInfo &_info)
238 {
239 // No callback stored.
240 if (!this->cb)
241 {
242 std::cerr << "SubscriptionHandler::RunLocalCallback() error: "
243 << "Callback is NULL" << std::endl;
244 return false;
245 }
246
247 // Check the subscription throttling option.
248 if (!this->UpdateThrottling())
249 return true;
250
251#if GOOGLE_PROTOBUF_VERSION >= 5028000
252 auto msgPtr = google::protobuf::DynamicCastMessage<T>(&_msg);
253#elif GOOGLE_PROTOBUF_VERSION >= 4022000
254 auto msgPtr = google::protobuf::internal::DownCast<const T*>(&_msg);
255#elif GOOGLE_PROTOBUF_VERSION >= 3000000
256 auto msgPtr = google::protobuf::down_cast<const T*>(&_msg);
257#else
258 auto msgPtr = google::protobuf::internal::down_cast<const T*>(&_msg);
259#endif
260
261 // Verify the dynamically casted message is valid
262 if (msgPtr == nullptr)
263 {
264 if (_msg.GetDescriptor() != nullptr)
265 {
266 std::cerr << "SubscriptionHandler::RunLocalCallback() error: "
267 << "Failed to cast the message of the type "
268 << _msg.GetDescriptor()->full_name()
269 << " to the specified type" << '\n';
270 }
271 else
272 {
273 std::cerr << "SubscriptionHandler::RunLocalCallback() error: "
274 << "Failed to cast the message of an unknown type"
275 << " to the specified type" << '\n';
276 }
277 std::cerr.flush();
278 return false;
279 }
280
281 this->cb(*msgPtr, _info);
282 return true;
283 }
284
286 private: MsgCallback<T> cb;
287 };
288
291 template <> class SubscriptionHandler<ProtoMsg>
292 : public ISubscriptionHandler
293 {
294 // Documentation inherited.
295 public: explicit SubscriptionHandler(const std::string &_pUuid,
296 const std::string &_nUuid,
297 const SubscribeOptions &_opts = SubscribeOptions())
298 : ISubscriptionHandler(_pUuid, _nUuid, _opts)
299 {
300 }
301
302 // Documentation inherited.
304 const std::string &_data,
305 const std::string &_type) const
306 {
308
309 const google::protobuf::Descriptor *desc =
310 google::protobuf::DescriptorPool::generated_pool()
311 ->FindMessageTypeByName(_type);
312
313 // First, check if we have the descriptor from the generated proto
314 // classes.
315 if (desc)
316 {
317 msgPtr.reset(google::protobuf::MessageFactory::generated_factory()
318 ->GetPrototype(desc)->New());
319 }
320 else
321 {
322 // Fallback on Gazebo Msgs if the message type is not found.
323 msgPtr = gz::msgs::Factory::New(_type);
324 }
325
326 if (!msgPtr)
327 return nullptr;
328
329 // Create the message using some serialized data
330 if (!msgPtr->ParseFromString(_data))
331 {
332 std::cerr << "CreateMsg() error: ParseFromString failed" << std::endl;
333 return nullptr;
334 }
335
336 return msgPtr;
337 }
338
339 // Documentation inherited.
341 {
342 return kGenericMessageType;
343 }
344
347 public: void SetCallback(const MsgCallback<ProtoMsg> &_cb)
348 {
349 this->cb = _cb;
350 }
351
352#ifdef HAVE_ZENOH
357 public: void SetCallback(const MsgCallback<ProtoMsg> &_cb,
359 const FullyQualifiedTopic &_fullyQualifiedTopic)
360 {
361 this->SetCallback(std::move(_cb));
362 this->CreateGenericZenohSubscriber(_session, _fullyQualifiedTopic);
363 }
364#endif
365
366 // Documentation inherited.
367 public: bool RunLocalCallback(const ProtoMsg &_msg,
368 const MessageInfo &_info)
369 {
370 // No callback stored.
371 if (!this->cb)
372 {
373 std::cerr << "SubscriptionHandler::RunLocalCallback() "
374 << "error: Callback is NULL" << std::endl;
375 return false;
376 }
377
378 // Check the subscription throttling option.
379 if (!this->UpdateThrottling())
380 return true;
381
382 this->cb(_msg, _info);
383 return true;
384 }
385
387 private: MsgCallback<ProtoMsg> cb;
388 };
389
394 {
402 public: explicit RawSubscriptionHandler(
403 const std::string &_pUuid,
404 const std::string &_nUuid,
405 const std::string &_msgType = kGenericMessageType,
406 const SubscribeOptions &_opts = SubscribeOptions());
407
408 // Documentation inherited
409 public: std::string TypeName() override;
410
414 public: void SetCallback(const RawCallback &_callback);
415
416#ifdef HAVE_ZENOH
421 public: void SetCallback(const RawCallback &_cb,
423 const FullyQualifiedTopic &_fullyQualifiedTopic);
424#endif
425
432 public: bool RunRawCallback(const char *_msgData, const size_t _size,
433 const MessageInfo &_info);
434
437
438 private: class Implementation;
439
440#ifdef _WIN32
441// Disable warning C4251 which is triggered by
442// std::unique_ptr
443#pragma warning(push)
444#pragma warning(disable: 4251)
445#endif
448 private: std::unique_ptr<Implementation> pimpl;
449#ifdef _WIN32
450#pragma warning(pop)
451#endif
452 };
453 }
454}
455
456#endif