Fawkes API  Fawkes Development Version
sick_tim55x_ethernet_aqt.cpp
1 
2 /***************************************************************************
3  * sick_tim55x_ethernet_aqt.cpp - Retrieve data from Sick TiM55x via Ethernet
4  *
5  * Created: Sun Jun 15 20:45:42 2014
6  * Copyright 2008-2014 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU Library General Public License for more details.
19  *
20  * Read the full text in the LICENSE.GPL file in the doc directory.
21  */
22 
23 #include "sick_tim55x_ethernet_aqt.h"
24 
25 #include <core/threading/mutex.h>
26 #include <core/threading/mutex_locker.h>
27 #include <utils/misc/string_split.h>
28 #include <utils/math/angle.h>
29 
30 #include <boost/lexical_cast.hpp>
31 #include <boost/lambda/bind.hpp>
32 #include <boost/lambda/lambda.hpp>
33 #if BOOST_VERSION < 104800
34 # include <boost/bind.hpp>
35 #endif
36 #include <cstdlib>
37 #include <cstdio>
38 #include <cstring>
39 #include <unistd.h>
40 
41 using namespace fawkes;
42 
43 #define RECONNECT_INTERVAL 1000
44 #define RECEIVE_TIMEOUT 500
45 
46 /** @class SickTiM55xEthernetAcquisitionThread "sick_tim55x_ethernet_aqt.h"
47  * Laser acquisition thread for Sick TiM55x laser range finders.
48  * This thread fetches the data from the laser.
49  * @author Tim Niemueller
50  */
51 
52 
53 /** Constructor.
54  * @param cfg_name short name of configuration group
55  * @param cfg_prefix configuration path prefix
56  */
58  std::string &cfg_prefix)
59  : SickTiM55xCommonAcquisitionThread(cfg_name, cfg_prefix),
60  socket_(io_service_), deadline_(io_service_), soft_deadline_(io_service_)
61 {
62  set_name("SickTiM55x(%s)", cfg_name.c_str());
63 }
64 
65 void
67 {
69 
70  cfg_host_ = config->get_string((cfg_prefix_ + "host").c_str());
71  cfg_port_ = config->get_string((cfg_prefix_ + "port").c_str());
72 
73  socket_mutex_ = new Mutex();
74 
75  deadline_.expires_at(boost::posix_time::pos_infin);
76  check_deadline();
77 
78  soft_deadline_.expires_at(boost::posix_time::pos_infin);
79  check_soft_timeout();
80 
81  init_device();
82 
84 }
85 
86 
87 void
89 {
90  free(_distances);
91  _distances = NULL;
92 
93  delete socket_mutex_;
94 }
95 
96 
97 void
99 {
100  if (socket_.is_open()) {
101  try {
102  deadline_.expires_from_now(boost::posix_time::milliseconds(RECEIVE_TIMEOUT));
103 
104  ec_ = boost::asio::error::would_block;
105  bytes_read_ = 0;
106 
107  boost::asio::async_read_until(socket_, input_buffer_, '\03',
108 #if BOOST_VERSION >= 104800
109  (boost::lambda::var(ec_) = boost::lambda::_1,
110  boost::lambda::var(bytes_read_) = boost::lambda::_2));
111 #else
112  boost::bind(
113  &SickTiM55xEthernetAcquisitionThread::handle_read,
114  this,
115  boost::asio::placeholders::error,
116  boost::asio::placeholders::bytes_transferred
117  ));
118 #endif
119 
120  do io_service_.run_one(); while (ec_ == boost::asio::error::would_block);
121 
122  reset_distances();
123  reset_echoes();
124 
125  if (ec_) {
126  if (ec_.value() == boost::system::errc::operation_canceled) {
127  logger->log_error(name(), "Data timeout, will try to reconnect");
128  } else {
129  logger->log_warn(name(), "Data read error: %s\n", ec_.message().c_str());
130  }
131  _data_mutex->lock();
132  _timestamp->stamp();
133  _new_data = true;
134  _data_mutex->unlock();
135  close_device();
136 
137  } else {
138  deadline_.expires_at(boost::posix_time::pos_infin);
139 
140  unsigned char recv_buf[bytes_read_];
141  std::istream in_stream(&input_buffer_);
142  in_stream.read((char *)recv_buf, bytes_read_);
143 
144  if (bytes_read_ > 0) {
145  try {
146  parse_datagram(recv_buf, bytes_read_);
147  } catch (Exception &e) {
148  logger->log_warn(name(), "Failed to parse datagram, resyncing, exception follows");
149  logger->log_warn(name(), e);
150  resync();
151  }
152  }
153  }
154  } catch (boost::system::system_error &e) {
155  if (e.code() == boost::asio::error::eof) {
156  close_device();
157  logger->log_warn(name(),
158  "Sick TiM55x/Ethernet connection lost, trying to reconnect");
159  } else {
160  logger->log_warn(name(), "Sick TiM55x/Ethernet failed read: %s", e.what());
161  }
162  }
163  } else {
164  try {
165  init_device();
166  logger->log_warn(name(), "Reconnected to device");
167  } catch (Exception &e) {
168  // ignore, keep trying
169  usleep(RECONNECT_INTERVAL * 1000);
170  }
171  }
172 
173  yield();
174 }
175 
176 
177 void
178 SickTiM55xEthernetAcquisitionThread::open_device()
179 {
180  try {
181  boost::asio::ip::tcp::resolver resolver(io_service_);
182  boost::asio::ip::tcp::resolver::query
183  query(cfg_host_, cfg_port_);
184  boost::asio::ip::tcp::resolver::iterator iter = resolver.resolve(query);
185 
186  // this is just the overly complicated way to get a timeout on
187  // a synchronous connect, cf.
188  // http://www.boost.org/doc/libs/1_55_0/doc/html/boost_asio/example/cpp03/timeouts/blocking_tcp_client.cpp
189 
190  deadline_.expires_from_now(boost::posix_time::seconds(5));
191 
192  boost::system::error_code ec;
193 
194  for (; iter != boost::asio::ip::tcp::resolver::iterator(); ++iter) {
195  socket_.close();
196  ec_ = boost::asio::error::would_block;
197 #if BOOST_VERSION >= 104800
198  socket_.async_connect(iter->endpoint(), boost::lambda::var(ec_) = boost::lambda::_1);
199 #else
200  socket_.async_connect(iter->endpoint(),
201  boost::bind(&SickTiM55xEthernetAcquisitionThread::handle_read, this,
202  boost::asio::placeholders::error, 0));
203 #endif
204 
205  // Block until the asynchronous operation has completed.
206  do io_service_.run_one(); while (ec_ == boost::asio::error::would_block);
207 
208  // Determine whether a connection was successfully established.
209  if (ec_ || ! socket_.is_open()) {
210  if (ec_.value() == boost::system::errc::operation_canceled) {
211  throw Exception("Sick TiM55X Ethernet: connection timed out");
212  } else {
213  throw Exception("Connection failed: %s", ec_.message().c_str());
214  }
215  }
216  deadline_.expires_at(boost::posix_time::pos_infin);
217  }
218  } catch (boost::system::system_error &e) {
219  throw Exception("Connection failed: %s", e.what());
220  }
221 }
222 
223 
224 void
225 SickTiM55xEthernetAcquisitionThread::close_device()
226 {
227  boost::system::error_code err;
228  if (socket_.is_open()) {
229  socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, err);
230  socket_.close();
231  }
232 }
233 
234 
235 void
236 SickTiM55xEthernetAcquisitionThread::flush_device()
237 {
238  if (socket_.is_open()) {
239  try {
240  soft_deadline_.expires_from_now(boost::posix_time::milliseconds(RECEIVE_TIMEOUT));
241  do {
242  ec_ = boost::asio::error::would_block;
243  bytes_read_ = 0;
244 
245  boost::asio::async_read_until(socket_, input_buffer_, '\03',
246 #if BOOST_VERSION >= 104800
247  (boost::lambda::var(ec_) = boost::lambda::_1,
248  boost::lambda::var(bytes_read_) = boost::lambda::_2));
249 #else
250  boost::bind(
251  &SickTiM55xEthernetAcquisitionThread::handle_read,
252  this,
253  boost::asio::placeholders::error,
254  boost::asio::placeholders::bytes_transferred
255  ));
256 #endif
257 
258  do io_service_.run_one(); while (ec_ == boost::asio::error::would_block);
259 
260  } while (bytes_read_ > 0);
261  soft_deadline_.expires_from_now(boost::posix_time::pos_infin);
262  } catch (boost::system::system_error &e) {
263  // ignore, just assume done, if there really is an error we'll
264  // catch it later on
265  }
266  }
267 }
268 
269 void
270 SickTiM55xEthernetAcquisitionThread::send_with_reply(const char *request,
271  std::string *reply)
272 {
273  MutexLocker lock(socket_mutex_);
274 
275  int request_length = strlen(request);
276 
277  try {
278  boost::asio::write(socket_, boost::asio::buffer(request, request_length));
279 
280  deadline_.expires_from_now(boost::posix_time::milliseconds(RECEIVE_TIMEOUT));
281 
282  ec_ = boost::asio::error::would_block;
283  bytes_read_ = 0;
284  boost::asio::async_read_until(socket_, input_buffer_, '\03',
285 #if BOOST_VERSION >= 104800
286  (boost::lambda::var(ec_) = boost::lambda::_1,
287  boost::lambda::var(bytes_read_) = boost::lambda::_2));
288 #else
289  boost::bind(
290  &SickTiM55xEthernetAcquisitionThread::handle_read,
291  this,
292  boost::asio::placeholders::error,
293  boost::asio::placeholders::bytes_transferred
294  ));
295 #endif
296 
297  do io_service_.run_one(); while (ec_ == boost::asio::error::would_block);
298 
299  if (ec_) {
300  if (ec_.value() == boost::system::errc::operation_canceled) {
301  throw Exception("Timeout waiting for message reply");
302  } else {
303  throw Exception("Failed to read reply: %s", ec_.message().c_str());
304  }
305  }
306 
307  deadline_.expires_at(boost::posix_time::pos_infin);
308 
309  if (reply) {
310  char recv_buf[bytes_read_];
311  std::istream in_stream(&input_buffer_);
312  in_stream.read(recv_buf, bytes_read_);
313  *reply = std::string(recv_buf, bytes_read_);
314  } else {
315  input_buffer_.consume(bytes_read_);
316  }
317  } catch (boost::system::system_error &e) {
318  throw Exception("Sick TiM55x/Ethernet failed I/O: %s", e.what());
319  }
320 }
321 
322 
323 /** Check whether the deadline has passed.
324  * We compare the deadline against
325  * the current time since a new asynchronous operation may have moved the
326  * deadline before this actor had a chance to run.
327  */
328 void
329 SickTiM55xEthernetAcquisitionThread::check_deadline()
330 {
331  if (deadline_.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
332  socket_.close();
333  deadline_.expires_at(boost::posix_time::pos_infin);
334  }
335 
336 #if BOOST_VERSION >= 104800
337  deadline_.async_wait(boost::lambda::bind(&SickTiM55xEthernetAcquisitionThread::check_deadline, this));
338 #else
339  deadline_.async_wait(boost::bind(&SickTiM55xEthernetAcquisitionThread::check_deadline, this));
340 #endif
341 }
342 
343 /** Check whether the soft timeout deadline has passed.
344  * We compare the deadline against the current time since a new
345  * asynchronous operation may have moved the deadline before this
346  * actor had a chance to run.
347  */
348 void
349 SickTiM55xEthernetAcquisitionThread::check_soft_timeout()
350 {
351  if (soft_deadline_.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
352  socket_.cancel();
353  soft_deadline_.expires_at(boost::posix_time::pos_infin);
354  }
355 
356 #if BOOST_VERSION >= 104800
357  soft_deadline_.async_wait(boost::lambda::bind(&SickTiM55xEthernetAcquisitionThread::check_soft_timeout, this));
358 #else
359  soft_deadline_.async_wait(boost::bind(&SickTiM55xEthernetAcquisitionThread::check_soft_timeout, this));
360 #endif
361 }
Laser acquisition thread for Sick TiM55x laser range finders.
void resync()
Resynchronize to laser data.
Fawkes library namespace.
void unlock()
Unlock the mutex.
Definition: mutex.cpp:135
Mutex locking helper.
Definition: mutex_locker.h:33
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:44
void reset_distances()
Reset all distance values to NaN.
virtual void finalize()
Finalize the thread.
virtual void pre_init(fawkes::Configuration *config, fawkes::Logger *logger)
Pre initialization.
fawkes::Time * _timestamp
Time when the most recent data was received.
fawkes::Mutex * _data_mutex
Lock while writing to distances or echoes array or marking new data.
void parse_datagram(const unsigned char *datagram, size_t datagram_length)
Parse incoming message from device.
void reset_echoes()
Reset all distance values to NaN.
void set_name(const char *format,...)
Set name of thread.
Definition: thread.cpp:761
Base class for exceptions in Fawkes.
Definition: exception.h:36
float * _distances
Allocate a float array and copy your distance values measured in meters here.
const char * name() const
Get name of thread.
Definition: thread.h:95
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_error(const char *component, const char *format,...)=0
Log error message.
bool _new_data
Set to true in your loop if new data is available.
SickTiM55xEthernetAcquisitionThread(std::string &cfg_name, std::string &cfg_prefix)
Constructor.
virtual void init()
Initialize the thread.
void yield()
Yield the processor to another thread or process.
Definition: thread.cpp:902
void read_common_config()
Read common configuration parameters.
virtual void loop()
Code to execute in the thread.
void lock()
Lock this mutex.
Definition: mutex.cpp:89
Time & stamp()
Set this time to the current time.
Definition: time.cpp:783
Mutex mutual exclusion lock.
Definition: mutex.h:32
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:44
std::string cfg_prefix_
Configuration path prefix for this configuration.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.