Fawkes API  Fawkes Development Version
filter_thread.cpp
00001 
00002 /***************************************************************************
00003  *  filter_thread.cpp - Thread that filters data in blackboard
00004  *
00005  *  Created: Sun Mar 13 01:12:53 2011
00006  *  Copyright  2006-2011  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009 
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version.
00014  *
00015  *  This program is distributed in the hope that it will be useful,
00016  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018  *  GNU Library General Public License for more details.
00019  *
00020  *  Read the full text in the LICENSE.GPL file in the doc directory.
00021  */
00022 
00023 #include "filter_thread.h"
00024 #include "filters/max_circle.h"
00025 #include "filters/720to360.h"
00026 #include "filters/deadspots.h"
00027 #include "filters/cascade.h"
00028 #include "filters/reverse_angle.h"
00029 #include "filters/min_circle.h"
00030 #include "filters/circle_sector.h"
00031 #include "filters/min_merge.h"
00032 #ifdef HAVE_TF
00033 #  include "filters/projection.h"
00034 #endif
00035 
00036 #include <core/threading/barrier.h>
00037 #include <core/threading/mutex.h>
00038 #include <core/threading/wait_condition.h>
00039 
00040 #include <interfaces/Laser360Interface.h>
00041 #include <interfaces/Laser720Interface.h>
00042 
00043 #include <cstring>
00044 #include <memory>
00045 #include <cstdio>
00046 
00047 using namespace fawkes;
00048 
00049 /** @class LaserFilterThread "filter_thread.h"
00050  * Laser filter thread.
00051  * This thread integrates into the Fawkes main loop at the sensor processing
00052  * hook, reads data from specified interfaces, filters it with a given
00053  * cascade, and then writes it back to an interface.
00054  * @author Tim Niemueller
00055  */
00056 
00057 
00058 /** Constructor.
00059  * @param cfg_name short name of configuration group
00060  * @param cfg_prefix configuration path prefix
00061  */
00062 LaserFilterThread::LaserFilterThread(std::string &cfg_name,
00063                                      std::string &cfg_prefix)
00064   : Thread("LaserFilterThread", Thread::OPMODE_WAITFORWAKEUP),
00065     BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_SENSOR_PROCESS)
00066 {
00067   set_name("LaserFilterThread(%s)", cfg_name.c_str());
00068   __cfg_name   = cfg_name;
00069   __cfg_prefix = cfg_prefix;
00070   __wait_barrier = NULL;
00071 }
00072 
00073 
00074 void
00075 LaserFilterThread::init()
00076 {
00077   try {
00078     open_interfaces(__cfg_prefix + "in/", __in, __in_bufs, false);
00079     open_interfaces(__cfg_prefix + "out/", __out, __out_bufs, true);
00080 
00081     if (__in.empty()) {
00082       throw Exception("No input interfaces defined for %s", __cfg_name.c_str());
00083     }
00084     if (__out.empty()) {
00085       throw Exception("No output interfaces defined for %s", __cfg_name.c_str());
00086     }
00087 
00088 
00089     std::map<std::string, std::string> filters;
00090 
00091     std::string fpfx = __cfg_prefix + "filters/";
00092     std::auto_ptr<Configuration::ValueIterator> i(config->search(fpfx.c_str()));
00093     while (i->next()) {
00094       std::string filter_name = std::string(i->path()).substr(fpfx.length());
00095       if (filter_name.find("/") != std::string::npos) {
00096         // If it contains a slash we assume it is a parameter for a filter
00097         continue;
00098       }
00099 
00100       if (! i->is_string()) {
00101         throw Exception("Filter value %s is not a string", i->path());
00102       }
00103 
00104       filters[filter_name] = i->get_string();
00105     }
00106     if (filters.empty()) {
00107       throw Exception("No filters defined for %s", __cfg_name.c_str());
00108     }
00109 
00110     if (filters.size() == 1) {
00111       std::string filter_name = filters.begin()->first;
00112       logger->log_debug(name(), "Adding filter %s (%s)",
00113                         filter_name.c_str(), filters[filter_name].c_str());
00114       __filter = create_filter(filters[filter_name], fpfx + filter_name + "/",
00115                                __in[0].is_360 ? 360 : 720, __in_bufs);
00116     } else {
00117       LaserDataFilterCascade *cascade =
00118         new LaserDataFilterCascade(__in[0].is_360 ? 360 : 720, __in_bufs);
00119 
00120       try {
00121         std::map<std::string, std::string>::iterator f;
00122         for (f = filters.begin(); f != filters.end(); ++f) {
00123           logger->log_debug(name(), "Adding filter %s (%s) %zu %zu",
00124                             f->first.c_str(), f->second.c_str(), __in_bufs.size(),
00125                             cascade->get_out_vector().size());
00126           cascade->add_filter(create_filter(f->second, fpfx + f->first + "/",
00127                                             cascade->get_out_data_size(),
00128                                             cascade->get_out_vector()));
00129         }
00130       } catch (Exception &e) {
00131         delete cascade;
00132         throw;
00133       }
00134 
00135       __filter = cascade;
00136     }
00137 
00138     if (__out[0].is_360 && (__filter->get_out_data_size() != 360)) {
00139       Exception e("Output interface and filter data size for %s do not match (%u != 360)",
00140                   __cfg_name.c_str(), __filter->get_out_data_size());
00141       delete __filter;
00142       throw e;
00143     } else if (!__out[0].is_360 && (__filter->get_out_data_size() != 720)) {
00144       Exception e("Output interface and filter data size for %s do not match (%u != 720)",
00145                   __cfg_name.c_str(), __filter->get_out_data_size());
00146       delete __filter;
00147       throw e;
00148     }
00149 
00150     __filter->set_out_vector(__out_bufs);
00151 
00152   } catch (Exception &e) {
00153     for (unsigned int i = 0; i < __in.size(); ++i) {
00154       blackboard->close(__in[i].interface);
00155     }
00156     for (unsigned int i = 0; i < __out.size(); ++i) {
00157       blackboard->close(__out[i].interface);
00158     }
00159     throw;
00160   }
00161 
00162   std::list<LaserFilterThread *>::iterator wt;
00163   for (wt = __wait_threads.begin(); wt != __wait_threads.end(); ++wt) {
00164     logger->log_debug(name(), "Depending on %s", (*wt)->name());
00165   }
00166 
00167   __wait_done  = true;
00168   __wait_mutex = new Mutex();
00169   __wait_cond  = new WaitCondition(__wait_mutex);
00170 }
00171 
00172 
00173 void
00174 LaserFilterThread::finalize()
00175 {
00176   delete __filter;
00177   delete __wait_cond;
00178   delete __wait_mutex;
00179 
00180   for (unsigned int i = 0; i < __in.size(); ++i) {
00181     blackboard->close(__in[i].interface);
00182   }
00183   __in.clear();
00184   for (unsigned int i = 0; i < __out.size(); ++i) {
00185     blackboard->close(__out[i].interface);
00186   }
00187   __out.clear();
00188 }
00189 
00190 void
00191 LaserFilterThread::loop()
00192 {
00193   // Wait for dependencies
00194   if (__wait_barrier) {
00195     std::list<LaserFilterThread *>::iterator wt;
00196     for (wt = __wait_threads.begin(); wt != __wait_threads.end(); ++wt) {
00197       (*wt)->wait_done();
00198     }
00199   }
00200 
00201   // Read input interfaces
00202   const size_t in_num = __in.size();
00203   for (size_t i = 0; i != in_num; ++i) {
00204     __in[i].interface->read();
00205     if (__in[i].is_360) {
00206       __in_bufs[i]->frame = __in[i].interface_typed.as360->frame();
00207     } else {
00208       __in_bufs[i]->frame = __in[i].interface_typed.as720->frame();
00209     }
00210   }
00211 
00212   // Filter!
00213   try {
00214     __filter->filter();
00215   } catch (Exception &e) {
00216     logger->log_warn(name(), "Filtering failed, exception follows");
00217     logger->log_warn(name(), e);
00218   }
00219 
00220   // Write output interfaces
00221   const size_t num = __out.size();
00222   for (size_t i = 0; i < num; ++i) {
00223     if (__out[i].is_360) {
00224       __out[i].interface_typed.as360->set_frame(__out_bufs[i]->frame.c_str());
00225     } else {
00226       __out[i].interface_typed.as720->set_frame(__out_bufs[i]->frame.c_str());
00227     }
00228     __out[i].interface->write();
00229   }
00230 
00231   if (__wait_barrier) {
00232     __wait_mutex->lock();
00233     __wait_done = false;
00234     __wait_cond->wake_all();
00235     __wait_mutex->unlock();
00236     __wait_barrier->wait();
00237     __wait_mutex->lock();
00238     __wait_done = true;
00239     __wait_mutex->unlock();
00240   }
00241 }
00242 
00243 
00244 /** Wait until thread is done.
00245  * This method blocks the calling thread until this instance's thread has
00246  * finished filtering.
00247  */
00248 void
00249 LaserFilterThread::wait_done()
00250 {
00251   __wait_mutex->lock();
00252   while (__wait_done) {
00253     //logger->log_debug(name(), "%s is waiting", Thread::current_thread()->name());
00254     __wait_cond->wait();
00255   }
00256   __wait_mutex->unlock();
00257 }
00258 
00259 
00260 void
00261 LaserFilterThread::open_interfaces(std::string prefix,
00262                                    std::vector<LaserInterface> &ifs,
00263                                    std::vector<LaserDataFilter::Buffer *> &bufs, bool writing)
00264 {
00265   std::auto_ptr<Configuration::ValueIterator> in(config->search(prefix.c_str()));
00266   while (in->next()) {
00267     if (! in->is_string()) {
00268       throw Exception("Config value %s is not of type string", in->path());
00269     } else {
00270       std::string uid = in->get_string();
00271       size_t sf;
00272 
00273       if ((sf = uid.find("::")) == std::string::npos) {
00274         throw Exception("Interface '%s' is not a UID", uid.c_str());
00275       }
00276       std::string type = uid.substr(0, sf);
00277       std::string id = uid.substr(sf + 2);
00278 
00279       LaserInterface lif;
00280       lif.interface = NULL;
00281 
00282       if (type == "Laser360Interface") {
00283         lif.is_360 = true;
00284       } else if (type == "Laser720Interface") {
00285         lif.is_360 = false;
00286       } else {
00287         throw Exception("Interfaces must be of type Laser360Interface or "
00288                         "Laser720Interface, but it is '%s'", type.c_str());
00289       }
00290 
00291       lif.id = id;
00292       ifs.push_back(lif);
00293     }
00294   }
00295 
00296   if (ifs.empty()) {
00297     throw Exception("No interfaces defined at %s", prefix.c_str());
00298   }
00299 
00300   bufs.resize(ifs.size());
00301 
00302   bool must_360 = ifs[0].is_360;
00303 
00304   try {
00305     if (writing) {
00306       for (unsigned int i = 0; i < ifs.size(); ++i) {
00307         if (ifs[i].is_360) {
00308           if (! must_360) {
00309             throw Exception("Interfaces of mixed sizes for %s",
00310                             __cfg_name.c_str());
00311           }
00312           logger->log_debug(name(), "Opening writing Laser360Interface::%s", ifs[i].id.c_str());
00313           Laser360Interface *laser360 = 
00314             blackboard->open_for_writing<Laser360Interface>(ifs[i].id.c_str());
00315 
00316           ifs[i].interface_typed.as360 = laser360;
00317           ifs[i].interface = laser360;
00318           bufs[i] = new LaserDataFilter::Buffer();
00319           bufs[i]->values = laser360->distances();
00320           
00321         } else {
00322           if (must_360) {
00323             throw Exception("Interfaces of mixed sizes for %s",
00324                             __cfg_name.c_str());
00325           }
00326 
00327           logger->log_debug(name(), "Opening writing Laser720Interface::%s", ifs[i].id.c_str());
00328           Laser720Interface *laser720 = 
00329             blackboard->open_for_writing<Laser720Interface>(ifs[i].id.c_str());
00330 
00331           ifs[i].interface_typed.as720 = laser720;
00332           ifs[i].interface = laser720;
00333           bufs[i] = new LaserDataFilter::Buffer();
00334           bufs[i]->values = laser720->distances();
00335         }
00336       }
00337     } else {
00338       for (unsigned int i = 0; i < ifs.size(); ++i) {
00339         if (ifs[i].is_360) {
00340           logger->log_debug(name(), "Opening reading Laser360Interface::%s", ifs[i].id.c_str());
00341           Laser360Interface *laser360 =
00342             blackboard->open_for_reading<Laser360Interface>(ifs[i].id.c_str());
00343 
00344           ifs[i].interface_typed.as360 = laser360;
00345           ifs[i].interface = laser360;
00346           bufs[i] = new LaserDataFilter::Buffer();
00347           bufs[i]->frame  = laser360->frame();
00348           bufs[i]->values = laser360->distances();
00349           
00350         } else {
00351           logger->log_debug(name(), "Opening reading Laser720Interface::%s", ifs[i].id.c_str());
00352           Laser720Interface *laser720 =
00353             blackboard->open_for_reading<Laser720Interface>(ifs[i].id.c_str());
00354 
00355           ifs[i].interface_typed.as720 = laser720;
00356           ifs[i].interface = laser720;
00357           bufs[i] = new LaserDataFilter::Buffer();
00358           bufs[i]->frame  = laser720->frame();
00359           bufs[i]->values = laser720->distances();
00360         }
00361       }
00362     }
00363   } catch (Exception &e) {
00364     for (unsigned int i = 0; i < ifs.size(); ++i) {
00365       blackboard->close(ifs[i].interface);
00366     }
00367     ifs.clear();
00368     bufs.clear();
00369     throw;
00370   }
00371 }
00372 
00373 
00374 LaserDataFilter *
00375 LaserFilterThread::create_filter(std::string filter_type, std::string prefix,
00376                                  unsigned int in_data_size,
00377                                  std::vector<LaserDataFilter::Buffer *> &inbufs)
00378 {
00379   if (filter_type == "720to360") {
00380     bool average = false;
00381     try {
00382       average = config->get_bool((prefix + "average").c_str());
00383     } catch (Exception &e) {} // ignore
00384     return new Laser720to360DataFilter(average, in_data_size, inbufs);
00385   } else if (filter_type == "reverse") {
00386     return new LaserReverseAngleDataFilter(in_data_size, inbufs);
00387   } else if (filter_type == "max_circle") {
00388     float radius = config->get_float((prefix + "radius").c_str());
00389     return new LaserMaxCircleDataFilter(radius, in_data_size, inbufs);
00390   } else if (filter_type == "min_circle") {
00391     float radius = config->get_float((prefix + "radius").c_str());
00392     return new LaserMinCircleDataFilter(radius, in_data_size, inbufs);
00393   } else if (filter_type == "circle_sector") {
00394     unsigned int from = config->get_uint((prefix + "from").c_str());
00395     unsigned int to   = config->get_uint((prefix + "to").c_str());
00396     return new LaserCircleSectorDataFilter(from, to, in_data_size, inbufs);
00397   } else if (filter_type == "deadspots") {
00398     return new LaserDeadSpotsDataFilter(config, logger, prefix, in_data_size, inbufs);
00399   } else if (filter_type == "min_merge") {
00400     return new LaserMinMergeDataFilter(in_data_size, inbufs);
00401   } else if (filter_type == "projection") {
00402 #ifdef HAVE_TF
00403     const float not_from_x = config->get_float((prefix + "not_from_x").c_str());
00404     const float not_to_x = config->get_float((prefix + "not_to_x").c_str());
00405     const float not_from_y = config->get_float((prefix + "not_from_y").c_str());
00406     const float not_to_y = config->get_float((prefix + "not_to_y").c_str());
00407     const float only_from_z = config->get_float((prefix + "only_from_z").c_str());
00408     const float only_to_z = config->get_float((prefix + "only_to_z").c_str());
00409     const std::string frame =
00410       config->get_string((prefix + "target_frame").c_str());
00411     return new LaserProjectionDataFilter(tf_listener, frame,
00412                                          not_from_x, not_to_x,
00413                                          not_from_y, not_to_y,
00414                                          only_from_z, only_to_z,
00415                                          in_data_size, inbufs);
00416 #else
00417     throw Exception("Projection filter unavailable, tf missing");
00418 #endif
00419   } else {
00420     throw Exception("Unknown filter type %s", filter_type.c_str());
00421   }
00422 }
00423 
00424 
00425 /** Set threads to wait for in loop.
00426  * The threads produce data this thread depends on as input, therefore this
00427  * instance has to wait for these threads to get up to date data in each
00428  * loop.
00429  * @param threads threads this instance depends on
00430  */
00431 void
00432 LaserFilterThread::set_wait_threads(std::list<LaserFilterThread *> &threads)
00433 {
00434   __wait_threads = threads;
00435 }
00436 
00437 
00438 /** Set wait barrier.
00439  * If there are any dependencies between laser filter threads a common
00440  * barrier is used to signal the end of filtering to reset internal
00441  * variables for the next loop.
00442  * @param barrier common "end of filtering" barrier
00443  */
00444 void
00445 LaserFilterThread::set_wait_barrier(fawkes::Barrier *barrier)
00446 {
00447   __wait_barrier = barrier;
00448 }