Connection.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 #ifndef _CONNECTION_HH_
18 #define _CONNECTION_HH_
19 
20 #include <tbb/task.h>
21 #include <google/protobuf/message.h>
22 
23 #include <boost/asio.hpp>
24 #include <boost/bind.hpp>
25 #include <boost/function.hpp>
26 #include <boost/thread.hpp>
27 #include <boost/tuple/tuple.hpp>
28 
29 #include <string>
30 #include <vector>
31 #include <iostream>
32 #include <iomanip>
33 #include <deque>
34 #include <utility>
35 
36 #include "gazebo/common/Event.hh"
37 #include "gazebo/common/Console.hh"
39 #include "gazebo/util/system.hh"
40 
41 #define HEADER_LENGTH 8
42 
43 namespace gazebo
44 {
45  namespace transport
46  {
47  extern GZ_TRANSPORT_VISIBLE bool is_stopped();
48 
49  class IOManager;
50  class Connection;
51  typedef boost::shared_ptr<Connection> ConnectionPtr;
52 
56  class GZ_TRANSPORT_VISIBLE ConnectionReadTask : public tbb::task
57  {
62  public: ConnectionReadTask(
63  boost::function<void (const std::string &)> _func,
64  const std::string &_data) :
65  func(_func),
66  data(_data)
67  {
68  }
69 
72  public: tbb::task *execute()
73  {
74  this->func(this->data);
75  return NULL;
76  }
77 
79  private: boost::function<void (const std::string &)> func;
80 
82  private: std::string data;
83  };
85 
100  class GZ_TRANSPORT_VISIBLE Connection :
101  public boost::enable_shared_from_this<Connection>
102  {
104  public: Connection();
105 
107  public: virtual ~Connection();
108 
113  public: bool Connect(const std::string &_host, unsigned int _port);
114 
116  typedef boost::function<void(const ConnectionPtr&)> AcceptCallback;
117 
122  public: void Listen(unsigned int _port, const AcceptCallback &_acceptCB);
123 
125  typedef boost::function<void(const std::string &_data)> ReadCallback;
126 
130  public: void StartRead(const ReadCallback &_cb);
131 
133  public: void StopRead();
134 
136  public: void Shutdown();
137 
140  public: bool IsOpen() const;
141 
143  private: void Close();
144 
146  public: void Cancel();
147 
151  public: bool Read(std::string &_data);
152 
160  public: void EnqueueMsg(const std::string &_buffer,
161  boost::function<void(uint32_t)> _cb, uint32_t _id,
162  bool _force = false);
163 
168  public: void EnqueueMsg(const std::string &_buffer, bool _force = false);
169 
172  public: std::string GetLocalURI() const;
173 
176  public: std::string GetRemoteURI() const;
177 
180  public: std::string GetLocalAddress() const;
181 
184  public: unsigned int GetLocalPort() const;
185 
188  public: std::string GetRemoteAddress() const;
189 
192  public: unsigned int GetRemotePort() const;
193 
196  public: std::string GetRemoteHostname() const;
197 
200  public: static std::string GetLocalHostname();
201 
204  public: template<typename Handler>
205  void AsyncRead(Handler _handler)
206  {
207  boost::mutex::scoped_lock lock(this->socketMutex);
208  if (!this->IsOpen())
209  {
210  gzerr << "AsyncRead on a closed socket\n";
211  return;
212  }
213 
214  void (Connection::*f)(const boost::system::error_code &,
215  boost::tuple<Handler>) = &Connection::OnReadHeader<Handler>;
216 
217  this->inboundHeader.resize(HEADER_LENGTH);
218  boost::asio::async_read(*this->socket,
219  boost::asio::buffer(this->inboundHeader),
220  boost::bind(f, this,
221  boost::asio::placeholders::error,
222  boost::make_tuple(_handler)));
223  }
224 
232  private: template<typename Handler>
233  void OnReadHeader(const boost::system::error_code &_e,
234  boost::tuple<Handler> _handler)
235  {
236  if (_e)
237  {
238  if (_e.message() == "End of file")
239  this->isOpen = false;
240  }
241  else
242  {
243  std::size_t inboundData_size = 0;
244  std::string header(&this->inboundHeader[0],
245  this->inboundHeader.size());
246  this->inboundHeader.clear();
247 
248  inboundData_size = this->ParseHeader(header);
249 
250  if (inboundData_size > 0)
251  {
252  // Start the asynchronous call to receive data
253  this->inboundData.resize(inboundData_size);
254 
255  void (Connection::*f)(const boost::system::error_code &e,
256  boost::tuple<Handler>) =
257  &Connection::OnReadData<Handler>;
258 
259  boost::asio::async_read(*this->socket,
260  boost::asio::buffer(this->inboundData),
261  boost::bind(f, this,
262  boost::asio::placeholders::error,
263  _handler));
264  }
265  else
266  {
267  gzerr << "Header is empty\n";
268  boost::get<0>(_handler)("");
269  // This code tries to read the header again. We should
270  // never get here.
271  // this->inboundHeader.resize(HEADER_LENGTH);
272 
273  // void (Connection::*f)(const boost::system::error_code &,
274  // boost::tuple<Handler>) =
275  // &Connection::OnReadHeader<Handler>;
276 
277  // boost::asio::async_read(*this->socket,
278  // boost::asio::buffer(this->inboundHeader),
279  // boost::bind(f, this,
280  // boost::asio::placeholders::error, _handler));
281  }
282  }
283  }
284 
292  private: template<typename Handler>
293  void OnReadData(const boost::system::error_code &_e,
294  boost::tuple<Handler> _handler)
295  {
296  if (_e)
297  {
298  if (_e.message() == "End of file")
299  this->isOpen = false;
300  }
301 
302  // Inform caller that data has been received
303  std::string data(&this->inboundData[0],
304  this->inboundData.size());
305  this->inboundData.clear();
306 
307  if (data.empty())
308  gzerr << "OnReadData got empty data!!!\n";
309 
310  if (!_e && !transport::is_stopped())
311  {
312  ConnectionReadTask *task = new(tbb::task::allocate_root())
313  ConnectionReadTask(boost::get<0>(_handler), data);
314  tbb::task::enqueue(*task);
315 
316  // Non-tbb version:
317  // boost::get<0>(_handler)(data);
318  }
319  }
320 
324  public: event::ConnectionPtr ConnectToShutdown(boost::function<void()>
325  _subscriber)
326  { return this->shutdown.Connect(_subscriber); }
327 
333  public: void DisconnectShutdown(event::ConnectionPtr _subscriber)
334  GAZEBO_DEPRECATED(8.0)
335  {this->shutdown.Disconnect(_subscriber->Id());}
336 
338  public: void ProcessWriteQueue(bool _blocking = false);
339 
342  public: unsigned int GetId() const;
343 
347  public: static bool ValidateIP(const std::string &_ip);
348 
352  public: std::string GetIPWhiteList() const;
353 
357  private: void OnWrite(const boost::system::error_code &_e);
358 
361  private: void OnAccept(const boost::system::error_code &_e);
362 
365  private: std::size_t ParseHeader(const std::string &_header);
366 
368  private: void ReadLoop(const ReadCallback &_cb);
369 
372  private: static boost::asio::ip::tcp::endpoint GetLocalEndpoint();
373 
376  private: boost::asio::ip::tcp::endpoint GetRemoteEndpoint() const;
377 
380  private: static std::string GetHostname(
381  boost::asio::ip::tcp::endpoint _ep);
382 
386  private: void OnConnect(const boost::system::error_code &_error,
387  boost::asio::ip::tcp::resolver::iterator _endPointIter);
388 
390  private: boost::asio::ip::tcp::socket *socket;
391 
393  private: boost::asio::ip::tcp::acceptor *acceptor;
394 
396  private: std::deque<std::string> writeQueue;
397 
400  private: std::deque<
401  std::pair<boost::function<void(uint32_t)>, uint32_t> > callbacks;
402 
404  private: boost::mutex connectMutex;
405 
407  private: boost::recursive_mutex writeMutex;
408 
410  private: boost::recursive_mutex readMutex;
411 
413  private: mutable boost::mutex socketMutex;
414 
416  private: boost::condition_variable connectCondition;
417 
419  private: AcceptCallback acceptCB;
420 
422  private: std::vector<char> inboundHeader;
423 
425  private: std::vector<char> inboundData;
426 
428  private: bool readQuit;
429 
431  private: unsigned int id;
432 
434  private: static unsigned int idCounter;
435 
437  private: ConnectionPtr acceptConn;
438 
441 
443  private: static IOManager *iomanager;
444 
446  private: unsigned int writeCount;
447 
449  private: std::string localURI;
450 
452  private: std::string localAddress;
453 
455  private: std::string remoteURI;
456 
458  private: std::string remoteAddress;
459 
461  private: bool connectError;
462 
464  private: std::string ipWhiteList;
465 
467  private: char *headerBuffer;
468 
470  private: bool dropMsgLogged;
471 
474  private: unsigned int callbackIndex;
475 
477  private: bool isOpen;
478  };
480  }
481 }
482 #endif
boost::function< void(const ConnectionPtr &)> AcceptCallback
The signature of a connection accept callback.
Definition: Connection.hh:116
Forward declarations for the common classes.
Definition: Animation.hh:33
boost::function< void(const std::string &_data)> ReadCallback
The signature of a connection read callback.
Definition: Connection.hh:125
#define HEADER_LENGTH
Definition: Connection.hh:41
#define gzerr
Output an error message.
Definition: Console.hh:50
Manages boost::asio IO.
Definition: IOManager.hh:33
void DisconnectShutdown(event::ConnectionPtr _subscriber) GAZEBO_DEPRECATED(8.0)
Unregister a function to be called when the connection is shut down.
Definition: Connection.hh:333
event::ConnectionPtr ConnectToShutdown(boost::function< void()> _subscriber)
Register a function to be called when the connection is shut down.
Definition: Connection.hh:324
boost::shared_ptr< Connection > ConnectionPtr
Definition: Connection.hh:50
GAZEBO_VISIBLE bool shutdown()
Stop and cleanup simulation.
boost::shared_ptr< Connection > ConnectionPtr
Definition: CommonTypes.hh:134
#define NULL
Definition: CommonTypes.hh:31
#define GAZEBO_DEPRECATED(version)
Definition: system.hh:302
void AsyncRead(Handler _handler)
Perform an asynchronous read. \param[in] _handler Callback to invoke on received data.
Definition: Connection.hh:205
bool is_stopped()
Is the transport system stopped?
Single TCP/IP connection manager.
Definition: Connection.hh:100