Fawkes API
Fawkes Development Version
|
00001 00002 /*************************************************************************** 00003 * filter_thread.cpp - Thread that filters data in blackboard 00004 * 00005 * Created: Sun Mar 13 01:12:53 2011 00006 * Copyright 2006-2011 Tim Niemueller [www.niemueller.de] 00007 * 00008 ****************************************************************************/ 00009 00010 /* This program is free software; you can redistribute it and/or modify 00011 * it under the terms of the GNU General Public License as published by 00012 * the Free Software Foundation; either version 2 of the License, or 00013 * (at your option) any later version. 00014 * 00015 * This program is distributed in the hope that it will be useful, 00016 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00017 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00018 * GNU Library General Public License for more details. 00019 * 00020 * Read the full text in the LICENSE.GPL file in the doc directory. 00021 */ 00022 00023 #include "filter_thread.h" 00024 #include "filters/max_circle.h" 00025 #include "filters/720to360.h" 00026 #include "filters/deadspots.h" 00027 #include "filters/cascade.h" 00028 #include "filters/reverse_angle.h" 00029 #include "filters/min_circle.h" 00030 #include "filters/circle_sector.h" 00031 #include "filters/min_merge.h" 00032 #ifdef HAVE_TF 00033 # include "filters/projection.h" 00034 #endif 00035 00036 #include <core/threading/barrier.h> 00037 #include <core/threading/mutex.h> 00038 #include <core/threading/wait_condition.h> 00039 00040 #include <interfaces/Laser360Interface.h> 00041 #include <interfaces/Laser720Interface.h> 00042 00043 #include <cstring> 00044 #include <memory> 00045 #include <cstdio> 00046 00047 using namespace fawkes; 00048 00049 /** @class LaserFilterThread "filter_thread.h" 00050 * Laser filter thread. 00051 * This thread integrates into the Fawkes main loop at the sensor processing 00052 * hook, reads data from specified interfaces, filters it with a given 00053 * cascade, and then writes it back to an interface. 00054 * @author Tim Niemueller 00055 */ 00056 00057 00058 /** Constructor. 00059 * @param cfg_name short name of configuration group 00060 * @param cfg_prefix configuration path prefix 00061 */ 00062 LaserFilterThread::LaserFilterThread(std::string &cfg_name, 00063 std::string &cfg_prefix) 00064 : Thread("LaserFilterThread", Thread::OPMODE_WAITFORWAKEUP), 00065 BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_SENSOR_PROCESS) 00066 { 00067 set_name("LaserFilterThread(%s)", cfg_name.c_str()); 00068 __cfg_name = cfg_name; 00069 __cfg_prefix = cfg_prefix; 00070 __wait_barrier = NULL; 00071 } 00072 00073 00074 void 00075 LaserFilterThread::init() 00076 { 00077 try { 00078 open_interfaces(__cfg_prefix + "in/", __in, __in_bufs, false); 00079 open_interfaces(__cfg_prefix + "out/", __out, __out_bufs, true); 00080 00081 if (__in.empty()) { 00082 throw Exception("No input interfaces defined for %s", __cfg_name.c_str()); 00083 } 00084 if (__out.empty()) { 00085 throw Exception("No output interfaces defined for %s", __cfg_name.c_str()); 00086 } 00087 00088 00089 std::map<std::string, std::string> filters; 00090 00091 std::string fpfx = __cfg_prefix + "filters/"; 00092 std::auto_ptr<Configuration::ValueIterator> i(config->search(fpfx.c_str())); 00093 while (i->next()) { 00094 std::string filter_name = std::string(i->path()).substr(fpfx.length()); 00095 if (filter_name.find("/") != std::string::npos) { 00096 // If it contains a slash we assume it is a parameter for a filter 00097 continue; 00098 } 00099 00100 if (! i->is_string()) { 00101 throw Exception("Filter value %s is not a string", i->path()); 00102 } 00103 00104 filters[filter_name] = i->get_string(); 00105 } 00106 if (filters.empty()) { 00107 throw Exception("No filters defined for %s", __cfg_name.c_str()); 00108 } 00109 00110 if (filters.size() == 1) { 00111 std::string filter_name = filters.begin()->first; 00112 logger->log_debug(name(), "Adding filter %s (%s)", 00113 filter_name.c_str(), filters[filter_name].c_str()); 00114 __filter = create_filter(filters[filter_name], fpfx + filter_name + "/", 00115 __in[0].is_360 ? 360 : 720, __in_bufs); 00116 } else { 00117 LaserDataFilterCascade *cascade = 00118 new LaserDataFilterCascade(__in[0].is_360 ? 360 : 720, __in_bufs); 00119 00120 try { 00121 std::map<std::string, std::string>::iterator f; 00122 for (f = filters.begin(); f != filters.end(); ++f) { 00123 logger->log_debug(name(), "Adding filter %s (%s) %zu %zu", 00124 f->first.c_str(), f->second.c_str(), __in_bufs.size(), 00125 cascade->get_out_vector().size()); 00126 cascade->add_filter(create_filter(f->second, fpfx + f->first + "/", 00127 cascade->get_out_data_size(), 00128 cascade->get_out_vector())); 00129 } 00130 } catch (Exception &e) { 00131 delete cascade; 00132 throw; 00133 } 00134 00135 __filter = cascade; 00136 } 00137 00138 if (__out[0].is_360 && (__filter->get_out_data_size() != 360)) { 00139 Exception e("Output interface and filter data size for %s do not match (%u != 360)", 00140 __cfg_name.c_str(), __filter->get_out_data_size()); 00141 delete __filter; 00142 throw e; 00143 } else if (!__out[0].is_360 && (__filter->get_out_data_size() != 720)) { 00144 Exception e("Output interface and filter data size for %s do not match (%u != 720)", 00145 __cfg_name.c_str(), __filter->get_out_data_size()); 00146 delete __filter; 00147 throw e; 00148 } 00149 00150 __filter->set_out_vector(__out_bufs); 00151 00152 } catch (Exception &e) { 00153 for (unsigned int i = 0; i < __in.size(); ++i) { 00154 blackboard->close(__in[i].interface); 00155 } 00156 for (unsigned int i = 0; i < __out.size(); ++i) { 00157 blackboard->close(__out[i].interface); 00158 } 00159 throw; 00160 } 00161 00162 std::list<LaserFilterThread *>::iterator wt; 00163 for (wt = __wait_threads.begin(); wt != __wait_threads.end(); ++wt) { 00164 logger->log_debug(name(), "Depending on %s", (*wt)->name()); 00165 } 00166 00167 __wait_done = true; 00168 __wait_mutex = new Mutex(); 00169 __wait_cond = new WaitCondition(__wait_mutex); 00170 } 00171 00172 00173 void 00174 LaserFilterThread::finalize() 00175 { 00176 delete __filter; 00177 delete __wait_cond; 00178 delete __wait_mutex; 00179 00180 for (unsigned int i = 0; i < __in.size(); ++i) { 00181 blackboard->close(__in[i].interface); 00182 } 00183 __in.clear(); 00184 for (unsigned int i = 0; i < __out.size(); ++i) { 00185 blackboard->close(__out[i].interface); 00186 } 00187 __out.clear(); 00188 } 00189 00190 void 00191 LaserFilterThread::loop() 00192 { 00193 // Wait for dependencies 00194 if (__wait_barrier) { 00195 std::list<LaserFilterThread *>::iterator wt; 00196 for (wt = __wait_threads.begin(); wt != __wait_threads.end(); ++wt) { 00197 (*wt)->wait_done(); 00198 } 00199 } 00200 00201 // Read input interfaces 00202 const size_t in_num = __in.size(); 00203 for (size_t i = 0; i != in_num; ++i) { 00204 __in[i].interface->read(); 00205 if (__in[i].is_360) { 00206 __in_bufs[i]->frame = __in[i].interface_typed.as360->frame(); 00207 } else { 00208 __in_bufs[i]->frame = __in[i].interface_typed.as720->frame(); 00209 } 00210 } 00211 00212 // Filter! 00213 try { 00214 __filter->filter(); 00215 } catch (Exception &e) { 00216 logger->log_warn(name(), "Filtering failed, exception follows"); 00217 logger->log_warn(name(), e); 00218 } 00219 00220 // Write output interfaces 00221 const size_t num = __out.size(); 00222 for (size_t i = 0; i < num; ++i) { 00223 if (__out[i].is_360) { 00224 __out[i].interface_typed.as360->set_frame(__out_bufs[i]->frame.c_str()); 00225 } else { 00226 __out[i].interface_typed.as720->set_frame(__out_bufs[i]->frame.c_str()); 00227 } 00228 __out[i].interface->write(); 00229 } 00230 00231 if (__wait_barrier) { 00232 __wait_mutex->lock(); 00233 __wait_done = false; 00234 __wait_cond->wake_all(); 00235 __wait_mutex->unlock(); 00236 __wait_barrier->wait(); 00237 __wait_mutex->lock(); 00238 __wait_done = true; 00239 __wait_mutex->unlock(); 00240 } 00241 } 00242 00243 00244 /** Wait until thread is done. 00245 * This method blocks the calling thread until this instance's thread has 00246 * finished filtering. 00247 */ 00248 void 00249 LaserFilterThread::wait_done() 00250 { 00251 __wait_mutex->lock(); 00252 while (__wait_done) { 00253 //logger->log_debug(name(), "%s is waiting", Thread::current_thread()->name()); 00254 __wait_cond->wait(); 00255 } 00256 __wait_mutex->unlock(); 00257 } 00258 00259 00260 void 00261 LaserFilterThread::open_interfaces(std::string prefix, 00262 std::vector<LaserInterface> &ifs, 00263 std::vector<LaserDataFilter::Buffer *> &bufs, bool writing) 00264 { 00265 std::auto_ptr<Configuration::ValueIterator> in(config->search(prefix.c_str())); 00266 while (in->next()) { 00267 if (! in->is_string()) { 00268 throw Exception("Config value %s is not of type string", in->path()); 00269 } else { 00270 std::string uid = in->get_string(); 00271 size_t sf; 00272 00273 if ((sf = uid.find("::")) == std::string::npos) { 00274 throw Exception("Interface '%s' is not a UID", uid.c_str()); 00275 } 00276 std::string type = uid.substr(0, sf); 00277 std::string id = uid.substr(sf + 2); 00278 00279 LaserInterface lif; 00280 lif.interface = NULL; 00281 00282 if (type == "Laser360Interface") { 00283 lif.is_360 = true; 00284 } else if (type == "Laser720Interface") { 00285 lif.is_360 = false; 00286 } else { 00287 throw Exception("Interfaces must be of type Laser360Interface or " 00288 "Laser720Interface, but it is '%s'", type.c_str()); 00289 } 00290 00291 lif.id = id; 00292 ifs.push_back(lif); 00293 } 00294 } 00295 00296 if (ifs.empty()) { 00297 throw Exception("No interfaces defined at %s", prefix.c_str()); 00298 } 00299 00300 bufs.resize(ifs.size()); 00301 00302 bool must_360 = ifs[0].is_360; 00303 00304 try { 00305 if (writing) { 00306 for (unsigned int i = 0; i < ifs.size(); ++i) { 00307 if (ifs[i].is_360) { 00308 if (! must_360) { 00309 throw Exception("Interfaces of mixed sizes for %s", 00310 __cfg_name.c_str()); 00311 } 00312 logger->log_debug(name(), "Opening writing Laser360Interface::%s", ifs[i].id.c_str()); 00313 Laser360Interface *laser360 = 00314 blackboard->open_for_writing<Laser360Interface>(ifs[i].id.c_str()); 00315 00316 ifs[i].interface_typed.as360 = laser360; 00317 ifs[i].interface = laser360; 00318 bufs[i] = new LaserDataFilter::Buffer(); 00319 bufs[i]->values = laser360->distances(); 00320 00321 } else { 00322 if (must_360) { 00323 throw Exception("Interfaces of mixed sizes for %s", 00324 __cfg_name.c_str()); 00325 } 00326 00327 logger->log_debug(name(), "Opening writing Laser720Interface::%s", ifs[i].id.c_str()); 00328 Laser720Interface *laser720 = 00329 blackboard->open_for_writing<Laser720Interface>(ifs[i].id.c_str()); 00330 00331 ifs[i].interface_typed.as720 = laser720; 00332 ifs[i].interface = laser720; 00333 bufs[i] = new LaserDataFilter::Buffer(); 00334 bufs[i]->values = laser720->distances(); 00335 } 00336 } 00337 } else { 00338 for (unsigned int i = 0; i < ifs.size(); ++i) { 00339 if (ifs[i].is_360) { 00340 logger->log_debug(name(), "Opening reading Laser360Interface::%s", ifs[i].id.c_str()); 00341 Laser360Interface *laser360 = 00342 blackboard->open_for_reading<Laser360Interface>(ifs[i].id.c_str()); 00343 00344 ifs[i].interface_typed.as360 = laser360; 00345 ifs[i].interface = laser360; 00346 bufs[i] = new LaserDataFilter::Buffer(); 00347 bufs[i]->frame = laser360->frame(); 00348 bufs[i]->values = laser360->distances(); 00349 00350 } else { 00351 logger->log_debug(name(), "Opening reading Laser720Interface::%s", ifs[i].id.c_str()); 00352 Laser720Interface *laser720 = 00353 blackboard->open_for_reading<Laser720Interface>(ifs[i].id.c_str()); 00354 00355 ifs[i].interface_typed.as720 = laser720; 00356 ifs[i].interface = laser720; 00357 bufs[i] = new LaserDataFilter::Buffer(); 00358 bufs[i]->frame = laser720->frame(); 00359 bufs[i]->values = laser720->distances(); 00360 } 00361 } 00362 } 00363 } catch (Exception &e) { 00364 for (unsigned int i = 0; i < ifs.size(); ++i) { 00365 blackboard->close(ifs[i].interface); 00366 } 00367 ifs.clear(); 00368 bufs.clear(); 00369 throw; 00370 } 00371 } 00372 00373 00374 LaserDataFilter * 00375 LaserFilterThread::create_filter(std::string filter_type, std::string prefix, 00376 unsigned int in_data_size, 00377 std::vector<LaserDataFilter::Buffer *> &inbufs) 00378 { 00379 if (filter_type == "720to360") { 00380 bool average = false; 00381 try { 00382 average = config->get_bool((prefix + "average").c_str()); 00383 } catch (Exception &e) {} // ignore 00384 return new Laser720to360DataFilter(average, in_data_size, inbufs); 00385 } else if (filter_type == "reverse") { 00386 return new LaserReverseAngleDataFilter(in_data_size, inbufs); 00387 } else if (filter_type == "max_circle") { 00388 float radius = config->get_float((prefix + "radius").c_str()); 00389 return new LaserMaxCircleDataFilter(radius, in_data_size, inbufs); 00390 } else if (filter_type == "min_circle") { 00391 float radius = config->get_float((prefix + "radius").c_str()); 00392 return new LaserMinCircleDataFilter(radius, in_data_size, inbufs); 00393 } else if (filter_type == "circle_sector") { 00394 unsigned int from = config->get_uint((prefix + "from").c_str()); 00395 unsigned int to = config->get_uint((prefix + "to").c_str()); 00396 return new LaserCircleSectorDataFilter(from, to, in_data_size, inbufs); 00397 } else if (filter_type == "deadspots") { 00398 return new LaserDeadSpotsDataFilter(config, logger, prefix, in_data_size, inbufs); 00399 } else if (filter_type == "min_merge") { 00400 return new LaserMinMergeDataFilter(in_data_size, inbufs); 00401 } else if (filter_type == "projection") { 00402 #ifdef HAVE_TF 00403 const float not_from_x = config->get_float((prefix + "not_from_x").c_str()); 00404 const float not_to_x = config->get_float((prefix + "not_to_x").c_str()); 00405 const float not_from_y = config->get_float((prefix + "not_from_y").c_str()); 00406 const float not_to_y = config->get_float((prefix + "not_to_y").c_str()); 00407 const float only_from_z = config->get_float((prefix + "only_from_z").c_str()); 00408 const float only_to_z = config->get_float((prefix + "only_to_z").c_str()); 00409 const std::string frame = 00410 config->get_string((prefix + "target_frame").c_str()); 00411 return new LaserProjectionDataFilter(tf_listener, frame, 00412 not_from_x, not_to_x, 00413 not_from_y, not_to_y, 00414 only_from_z, only_to_z, 00415 in_data_size, inbufs); 00416 #else 00417 throw Exception("Projection filter unavailable, tf missing"); 00418 #endif 00419 } else { 00420 throw Exception("Unknown filter type %s", filter_type.c_str()); 00421 } 00422 } 00423 00424 00425 /** Set threads to wait for in loop. 00426 * The threads produce data this thread depends on as input, therefore this 00427 * instance has to wait for these threads to get up to date data in each 00428 * loop. 00429 * @param threads threads this instance depends on 00430 */ 00431 void 00432 LaserFilterThread::set_wait_threads(std::list<LaserFilterThread *> &threads) 00433 { 00434 __wait_threads = threads; 00435 } 00436 00437 00438 /** Set wait barrier. 00439 * If there are any dependencies between laser filter threads a common 00440 * barrier is used to signal the end of filtering to reset internal 00441 * variables for the next loop. 00442 * @param barrier common "end of filtering" barrier 00443 */ 00444 void 00445 LaserFilterThread::set_wait_barrier(fawkes::Barrier *barrier) 00446 { 00447 __wait_barrier = barrier; 00448 }