Fawkes API  Fawkes Development Version
filter_thread.cpp
1 
2 /***************************************************************************
3  * filter_thread.cpp - Thread that filters data in blackboard
4  *
5  * Created: Sun Mar 13 01:12:53 2011
6  * Copyright 2006-2011 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU Library General Public License for more details.
19  *
20  * Read the full text in the LICENSE.GPL file in the doc directory.
21  */
22 
23 #include "filter_thread.h"
24 #include "filters/max_circle.h"
25 #include "filters/720to360.h"
26 #include "filters/deadspots.h"
27 #include "filters/cascade.h"
28 #include "filters/reverse_angle.h"
29 #include "filters/min_circle.h"
30 #include "filters/circle_sector.h"
31 #include "filters/min_merge.h"
32 #ifdef HAVE_TF
33 # include "filters/projection.h"
34 #endif
35 
36 #include <core/threading/barrier.h>
37 #include <core/threading/mutex.h>
38 #include <core/threading/wait_condition.h>
39 
40 #include <interfaces/Laser360Interface.h>
41 #include <interfaces/Laser720Interface.h>
42 
43 #include <cstring>
44 #include <memory>
45 #include <cstdio>
46 
47 using namespace fawkes;
48 
49 /** @class LaserFilterThread "filter_thread.h"
50  * Laser filter thread.
51  * This thread integrates into the Fawkes main loop at the sensor processing
52  * hook, reads data from specified interfaces, filters it with a given
53  * cascade, and then writes it back to an interface.
54  * @author Tim Niemueller
55  */
56 
57 
58 /** Constructor.
59  * @param cfg_name short name of configuration group
60  * @param cfg_prefix configuration path prefix
61  */
63  std::string &cfg_prefix)
64  : Thread("LaserFilterThread", Thread::OPMODE_WAITFORWAKEUP),
65  BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_SENSOR_PROCESS)
66 {
67  set_name("LaserFilterThread(%s)", cfg_name.c_str());
68  __cfg_name = cfg_name;
69  __cfg_prefix = cfg_prefix;
70  __wait_barrier = NULL;
71 }
72 
73 
74 void
76 {
77  try {
78  open_interfaces(__cfg_prefix + "in/", __in, __in_bufs, false);
79  open_interfaces(__cfg_prefix + "out/", __out, __out_bufs, true);
80 
81  if (__in.empty()) {
82  throw Exception("No input interfaces defined for %s", __cfg_name.c_str());
83  }
84  if (__out.empty()) {
85  throw Exception("No output interfaces defined for %s", __cfg_name.c_str());
86  }
87 
88 
89  std::map<std::string, std::string> filters;
90 
91  std::string fpfx = __cfg_prefix + "filters/";
92  std::auto_ptr<Configuration::ValueIterator> i(config->search(fpfx.c_str()));
93  while (i->next()) {
94  std::string filter_name = std::string(i->path()).substr(fpfx.length());
95  if (filter_name.find("/") != std::string::npos) {
96  // If it contains a slash we assume it is a parameter for a filter
97  continue;
98  }
99 
100  if (! i->is_string()) {
101  throw Exception("Filter value %s is not a string", i->path());
102  }
103 
104  filters[filter_name] = i->get_string();
105  }
106  if (filters.empty()) {
107  throw Exception("No filters defined for %s", __cfg_name.c_str());
108  }
109 
110  if (filters.size() == 1) {
111  std::string filter_name = filters.begin()->first;
112  logger->log_debug(name(), "Adding filter %s (%s)",
113  filter_name.c_str(), filters[filter_name].c_str());
114  __filter = create_filter(filters[filter_name], fpfx + filter_name + "/",
115  __in[0].is_360 ? 360 : 720, __in_bufs);
116  } else {
117  LaserDataFilterCascade *cascade =
118  new LaserDataFilterCascade(__in[0].is_360 ? 360 : 720, __in_bufs);
119 
120  try {
121  std::map<std::string, std::string>::iterator f;
122  for (f = filters.begin(); f != filters.end(); ++f) {
123  logger->log_debug(name(), "Adding filter %s (%s) %zu %zu",
124  f->first.c_str(), f->second.c_str(), __in_bufs.size(),
125  cascade->get_out_vector().size());
126  cascade->add_filter(create_filter(f->second, fpfx + f->first + "/",
127  cascade->get_out_data_size(),
128  cascade->get_out_vector()));
129  }
130  } catch (Exception &e) {
131  delete cascade;
132  throw;
133  }
134 
135  __filter = cascade;
136  }
137 
138  if (__out[0].is_360 && (__filter->get_out_data_size() != 360)) {
139  Exception e("Output interface and filter data size for %s do not match (%u != 360)",
140  __cfg_name.c_str(), __filter->get_out_data_size());
141  delete __filter;
142  throw e;
143  } else if (!__out[0].is_360 && (__filter->get_out_data_size() != 720)) {
144  Exception e("Output interface and filter data size for %s do not match (%u != 720)",
145  __cfg_name.c_str(), __filter->get_out_data_size());
146  delete __filter;
147  throw e;
148  }
149 
150  __filter->set_out_vector(__out_bufs);
151 
152  } catch (Exception &e) {
153  for (unsigned int i = 0; i < __in.size(); ++i) {
154  blackboard->close(__in[i].interface);
155  }
156  for (unsigned int i = 0; i < __out.size(); ++i) {
157  blackboard->close(__out[i].interface);
158  }
159  throw;
160  }
161 
162  std::list<LaserFilterThread *>::iterator wt;
163  for (wt = __wait_threads.begin(); wt != __wait_threads.end(); ++wt) {
164  logger->log_debug(name(), "Depending on %s", (*wt)->name());
165  }
166 
167  __wait_done = true;
168  __wait_mutex = new Mutex();
169  __wait_cond = new WaitCondition(__wait_mutex);
170 }
171 
172 
173 void
175 {
176  delete __filter;
177  delete __wait_cond;
178  delete __wait_mutex;
179 
180  for (unsigned int i = 0; i < __in.size(); ++i) {
181  blackboard->close(__in[i].interface);
182  }
183  __in.clear();
184  for (unsigned int i = 0; i < __out.size(); ++i) {
185  blackboard->close(__out[i].interface);
186  }
187  __out.clear();
188 }
189 
190 void
192 {
193  // Wait for dependencies
194  if (__wait_barrier) {
195  std::list<LaserFilterThread *>::iterator wt;
196  for (wt = __wait_threads.begin(); wt != __wait_threads.end(); ++wt) {
197  (*wt)->wait_done();
198  }
199  }
200 
201  // Read input interfaces
202  const size_t in_num = __in.size();
203  for (size_t i = 0; i != in_num; ++i) {
204  __in[i].interface->read();
205  if (__in[i].is_360) {
206  __in_bufs[i]->frame = __in[i].interface_typed.as360->frame();
207  } else {
208  __in_bufs[i]->frame = __in[i].interface_typed.as720->frame();
209  }
210  }
211 
212  // Filter!
213  try {
214  __filter->filter();
215  } catch (Exception &e) {
216  logger->log_warn(name(), "Filtering failed, exception follows");
217  logger->log_warn(name(), e);
218  }
219 
220  // Write output interfaces
221  const size_t num = __out.size();
222  for (size_t i = 0; i < num; ++i) {
223  if (__out[i].is_360) {
224  __out[i].interface_typed.as360->set_frame(__out_bufs[i]->frame.c_str());
225  } else {
226  __out[i].interface_typed.as720->set_frame(__out_bufs[i]->frame.c_str());
227  }
228  __out[i].interface->write();
229  }
230 
231  if (__wait_barrier) {
232  __wait_mutex->lock();
233  __wait_done = false;
234  __wait_cond->wake_all();
235  __wait_mutex->unlock();
236  __wait_barrier->wait();
237  __wait_mutex->lock();
238  __wait_done = true;
239  __wait_mutex->unlock();
240  }
241 }
242 
243 
244 /** Wait until thread is done.
245  * This method blocks the calling thread until this instance's thread has
246  * finished filtering.
247  */
248 void
250 {
251  __wait_mutex->lock();
252  while (__wait_done) {
253  //logger->log_debug(name(), "%s is waiting", Thread::current_thread()->name());
254  __wait_cond->wait();
255  }
256  __wait_mutex->unlock();
257 }
258 
259 
260 void
261 LaserFilterThread::open_interfaces(std::string prefix,
262  std::vector<LaserInterface> &ifs,
263  std::vector<LaserDataFilter::Buffer *> &bufs, bool writing)
264 {
265  std::auto_ptr<Configuration::ValueIterator> in(config->search(prefix.c_str()));
266  while (in->next()) {
267  if (! in->is_string()) {
268  throw Exception("Config value %s is not of type string", in->path());
269  } else {
270  std::string uid = in->get_string();
271  size_t sf;
272 
273  if ((sf = uid.find("::")) == std::string::npos) {
274  throw Exception("Interface '%s' is not a UID", uid.c_str());
275  }
276  std::string type = uid.substr(0, sf);
277  std::string id = uid.substr(sf + 2);
278 
279  LaserInterface lif;
280  lif.interface = NULL;
281 
282  if (type == "Laser360Interface") {
283  lif.is_360 = true;
284  } else if (type == "Laser720Interface") {
285  lif.is_360 = false;
286  } else {
287  throw Exception("Interfaces must be of type Laser360Interface or "
288  "Laser720Interface, but it is '%s'", type.c_str());
289  }
290 
291  lif.id = id;
292  ifs.push_back(lif);
293  }
294  }
295 
296  if (ifs.empty()) {
297  throw Exception("No interfaces defined at %s", prefix.c_str());
298  }
299 
300  bufs.resize(ifs.size());
301 
302  bool must_360 = ifs[0].is_360;
303 
304  try {
305  if (writing) {
306  for (unsigned int i = 0; i < ifs.size(); ++i) {
307  if (ifs[i].is_360) {
308  if (! must_360) {
309  throw Exception("Interfaces of mixed sizes for %s",
310  __cfg_name.c_str());
311  }
312  logger->log_debug(name(), "Opening writing Laser360Interface::%s", ifs[i].id.c_str());
313  Laser360Interface *laser360 =
314  blackboard->open_for_writing<Laser360Interface>(ifs[i].id.c_str());
315 
316  ifs[i].interface_typed.as360 = laser360;
317  ifs[i].interface = laser360;
318  bufs[i] = new LaserDataFilter::Buffer();
319  bufs[i]->values = laser360->distances();
320 
321  } else {
322  if (must_360) {
323  throw Exception("Interfaces of mixed sizes for %s",
324  __cfg_name.c_str());
325  }
326 
327  logger->log_debug(name(), "Opening writing Laser720Interface::%s", ifs[i].id.c_str());
328  Laser720Interface *laser720 =
329  blackboard->open_for_writing<Laser720Interface>(ifs[i].id.c_str());
330 
331  ifs[i].interface_typed.as720 = laser720;
332  ifs[i].interface = laser720;
333  bufs[i] = new LaserDataFilter::Buffer();
334  bufs[i]->values = laser720->distances();
335  }
336  }
337  } else {
338  for (unsigned int i = 0; i < ifs.size(); ++i) {
339  if (ifs[i].is_360) {
340  logger->log_debug(name(), "Opening reading Laser360Interface::%s", ifs[i].id.c_str());
341  Laser360Interface *laser360 =
342  blackboard->open_for_reading<Laser360Interface>(ifs[i].id.c_str());
343 
344  ifs[i].interface_typed.as360 = laser360;
345  ifs[i].interface = laser360;
346  bufs[i] = new LaserDataFilter::Buffer();
347  bufs[i]->frame = laser360->frame();
348  bufs[i]->values = laser360->distances();
349 
350  } else {
351  logger->log_debug(name(), "Opening reading Laser720Interface::%s", ifs[i].id.c_str());
352  Laser720Interface *laser720 =
353  blackboard->open_for_reading<Laser720Interface>(ifs[i].id.c_str());
354 
355  ifs[i].interface_typed.as720 = laser720;
356  ifs[i].interface = laser720;
357  bufs[i] = new LaserDataFilter::Buffer();
358  bufs[i]->frame = laser720->frame();
359  bufs[i]->values = laser720->distances();
360  }
361  }
362  }
363  } catch (Exception &e) {
364  for (unsigned int i = 0; i < ifs.size(); ++i) {
365  blackboard->close(ifs[i].interface);
366  }
367  ifs.clear();
368  bufs.clear();
369  throw;
370  }
371 }
372 
373 
375 LaserFilterThread::create_filter(std::string filter_type, std::string prefix,
376  unsigned int in_data_size,
377  std::vector<LaserDataFilter::Buffer *> &inbufs)
378 {
379  if (filter_type == "720to360") {
380  bool average = false;
381  try {
382  average = config->get_bool((prefix + "average").c_str());
383  } catch (Exception &e) {} // ignore
384  return new Laser720to360DataFilter(average, in_data_size, inbufs);
385  } else if (filter_type == "reverse") {
386  return new LaserReverseAngleDataFilter(in_data_size, inbufs);
387  } else if (filter_type == "max_circle") {
388  float radius = config->get_float((prefix + "radius").c_str());
389  return new LaserMaxCircleDataFilter(radius, in_data_size, inbufs);
390  } else if (filter_type == "min_circle") {
391  float radius = config->get_float((prefix + "radius").c_str());
392  return new LaserMinCircleDataFilter(radius, in_data_size, inbufs);
393  } else if (filter_type == "circle_sector") {
394  unsigned int from = config->get_uint((prefix + "from").c_str());
395  unsigned int to = config->get_uint((prefix + "to").c_str());
396  return new LaserCircleSectorDataFilter(from, to, in_data_size, inbufs);
397  } else if (filter_type == "deadspots") {
398  return new LaserDeadSpotsDataFilter(config, logger, prefix, in_data_size, inbufs);
399  } else if (filter_type == "min_merge") {
400  return new LaserMinMergeDataFilter(in_data_size, inbufs);
401  } else if (filter_type == "projection") {
402 #ifdef HAVE_TF
403  const float not_from_x = config->get_float((prefix + "not_from_x").c_str());
404  const float not_to_x = config->get_float((prefix + "not_to_x").c_str());
405  const float not_from_y = config->get_float((prefix + "not_from_y").c_str());
406  const float not_to_y = config->get_float((prefix + "not_to_y").c_str());
407  const float only_from_z = config->get_float((prefix + "only_from_z").c_str());
408  const float only_to_z = config->get_float((prefix + "only_to_z").c_str());
409  const std::string frame =
410  config->get_string((prefix + "target_frame").c_str());
411  return new LaserProjectionDataFilter(tf_listener, frame,
412  not_from_x, not_to_x,
413  not_from_y, not_to_y,
414  only_from_z, only_to_z,
415  in_data_size, inbufs);
416 #else
417  throw Exception("Projection filter unavailable, tf missing");
418 #endif
419  } else {
420  throw Exception("Unknown filter type %s", filter_type.c_str());
421  }
422 }
423 
424 
425 /** Set threads to wait for in loop.
426  * The threads produce data this thread depends on as input, therefore this
427  * instance has to wait for these threads to get up to date data in each
428  * loop.
429  * @param threads threads this instance depends on
430  */
431 void
432 LaserFilterThread::set_wait_threads(std::list<LaserFilterThread *> &threads)
433 {
434  __wait_threads = threads;
435 }
436 
437 
438 /** Set wait barrier.
439  * If there are any dependencies between laser filter threads a common
440  * barrier is used to signal the end of filtering to reset internal
441  * variables for the next loop.
442  * @param barrier common "end of filtering" barrier
443  */
444 void
446 {
447  __wait_barrier = barrier;
448 }
Laser360Interface Fawkes BlackBoard Interface.
void set_wait_threads(std::list< LaserFilterThread * > &threads)
Set threads to wait for in loop.
Wait until a given condition holds.
virtual void finalize()
Finalize the thread.
Erase beams outside specified circle sector.
Definition: circle_sector.h:28
Fawkes library namespace.
virtual bool get_bool(const char *path)=0
Get value from configuration which is of type bool.
void unlock()
Unlock the mutex.
Definition: mutex.cpp:135
virtual void wait()
Wait for other threads.
Definition: barrier.cpp:157
void wake_all()
Wake up all waiting threads.
virtual std::string get_string() const =0
Get string value.
Erase dead spots (i.e.
Definition: deadspots.h:37
virtual bool is_string() const =0
Check if current value is a string.
virtual ValueIterator * search(const char *path)=0
Iterator with search results.
virtual bool next()=0
Check if there is another element and advance to this if possible.
Thread class encapsulation of pthreads.
Definition: thread.h:42
virtual const char * path() const =0
Path of value.
char * frame() const
Get frame value.
Downsample filter from 720 to 360 values.
Definition: 720to360.h:28
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:44
virtual Interface * open_for_writing(const char *interface_type, const char *identifier)=0
Open interface for writing.
Erase beams below a certain minimum distance.
Definition: min_circle.h:28
void set_wait_barrier(fawkes::Barrier *barrier)
Set wait barrier.
Thread aspect to use blocked timing.
char * frame() const
Get frame value.
virtual unsigned int get_out_data_size()
Get size of filtered data array.
Definition: filter.cpp:170
LaserFilterThread(std::string &cfg_name, std::string &cfg_prefix)
Constructor.
Cascade of several laser filters to one.
Definition: cascade.h:30
virtual void filter()=0
Filter the incoming data.
void set_name(const char *format,...)
Set name of thread.
Definition: thread.cpp:749
Base class for exceptions in Fawkes.
Definition: exception.h:36
Cut off laser data at max distance.
Definition: max_circle.h:28
Laser data buffer.
Definition: filter.h:31
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
Projects one laser into another laser's plane.
Definition: projection.h:41
void wait()
Wait for the condition forever.
const char * name() const
Get name of thread.
Definition: thread.h:95
float * distances() const
Get distances value.
void wait_done()
Wait until thread is done.
void add_filter(LaserDataFilter *filter)
Add a filter to the cascade.
Definition: cascade.cpp:70
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual void init()
Initialize the thread.
virtual Interface * open_for_reading(const char *interface_type, const char *identifier)=0
Open interface for reading.
void lock()
Lock this mutex.
Definition: mutex.cpp:89
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
virtual void set_out_vector(std::vector< Buffer * > &out)
Set filtered data array.
Definition: filter.cpp:123
float * distances() const
Get distances value.
Mutex mutual exclusion lock.
Definition: mutex.h:32
Reverse the angle of beams.
Definition: reverse_angle.h:28
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:44
Merge multiple laser data arrays into one.
Definition: min_merge.h:28
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
Laser720Interface Fawkes BlackBoard Interface.
virtual std::vector< Buffer * > & get_out_vector()
Get filtered data array.
Definition: filter.cpp:110
A barrier is a synchronization tool which blocks until a given number of threads have reached the bar...
Definition: barrier.h:32
virtual void loop()
Code to execute in the thread.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:43
Laser data filter.
Definition: filter.h:28
virtual void close(Interface *interface)=0
Close interface.