Fawkes API  Fawkes Development Version
laser_filter_plugin.cpp
00001 
00002 /***************************************************************************
00003  *  laser_filter_plugin.cpp - Fawkes Laser Filter Plugin
00004  *
00005  *  Created: Sun Mar 13 01:06:51 2011
00006  *  Copyright  2006-2011  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009 
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version.
00014  *
00015  *  This program is distributed in the hope that it will be useful,
00016  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018  *  GNU Library General Public License for more details.
00019  *
00020  *  Read the full text in the LICENSE.GPL file in the doc directory.
00021  */
00022 
00023 #include "laser_filter_plugin.h"
00024 
00025 #include "filter_thread.h"
00026 
00027 #include <core/threading/barrier.h>
00028 #include <map>
00029 #include <set>
00030 #include <memory>
00031 
00032 using namespace fawkes;
00033 
00034 /** @class LaserFilterPlugin "laser_filter_plugin.h"
00035  * Laser filter plugin for Fawkes.
00036  * This plugin filters laser data. It reads laser data from one or more
00037  * interfaces, filters it, and writes to an output interface. It supports
00038  * a virtually arbitrary number of active filters.
00039  * @author Tim Niemueller
00040  */
00041 
00042 /** Constructor.
00043  * @param config Fawkes configuration
00044  */
00045 LaserFilterPlugin::LaserFilterPlugin(Configuration *config)
00046   : Plugin(config)
00047 {
00048   __barrier = NULL;
00049 
00050   std::set<std::string> configs;
00051   std::set<std::string> ignored_configs;
00052   std::map<std::string, LaserFilterThread *> threads;
00053 
00054   std::string prefix = "/plugins/laser-filter/";
00055 
00056   // Read configurations and spawn LaserFilterThreads
00057   std::auto_ptr<Configuration::ValueIterator> i(config->search(prefix.c_str()));
00058   while (i->next()) {
00059     std::string cfg_name = std::string(i->path()).substr(prefix.length());
00060     cfg_name = cfg_name.substr(0, cfg_name.find("/"));
00061 
00062     if ( (configs.find(cfg_name) == configs.end()) &&
00063          (ignored_configs.find(cfg_name) == ignored_configs.end()) ) {
00064 
00065       std::string cfg_prefix = prefix + cfg_name + "/";
00066 
00067       bool active = true;
00068       try {
00069         active = config->get_bool((cfg_prefix + "active").c_str());
00070       } catch (Exception &e) {} // ignored, assume enabled
00071 
00072       try {
00073         if (active) {
00074           LaserFilterThread *thread = new LaserFilterThread(cfg_name, cfg_prefix);
00075           thread_list.push_back(thread);
00076           threads[cfg_name] = thread;
00077           configs.insert(cfg_name);
00078         } else {
00079           //printf("Ignoring laser config %s\n", cfg_name.c_str());
00080           ignored_configs.insert(cfg_name);
00081         }
00082       } catch(Exception &e) {
00083         for (ThreadList::iterator i = thread_list.begin();
00084              i != thread_list.end(); ++i) {
00085           delete *i;
00086         }
00087         throw;
00088       }
00089     }
00090   }
00091 
00092   if ( thread_list.empty() ) {
00093     throw Exception("No active laser filters configured, aborting");
00094   }
00095 
00096   // Read input and output information for spawned configurations
00097   // for dependency detection
00098   std::map<std::string, std::list<std::string> > inputs;
00099   std::map<std::string, std::list<std::string> > outputs;
00100   std::set<std::string>::iterator c, d;
00101 
00102   for (c = configs.begin(); c != configs.end(); ++c) {
00103     std::string cinp = prefix + *c + "/in/";
00104     std::list<std::string> cinputs;
00105     std::auto_ptr<Configuration::ValueIterator> in(config->search(cinp.c_str()));
00106     while (in->next()) {
00107       if (in->is_string()) {
00108         cinputs.push_back(in->get_string());
00109       }
00110     }
00111 
00112     std::string coutp = prefix + *c + "/out/";
00113     std::list<std::string> coutputs;
00114     std::auto_ptr<Configuration::ValueIterator> out(config->search(coutp.c_str()));
00115     while (out->next()) {
00116       if (out->is_string()) {
00117         coutputs.push_back(out->get_string());
00118       }
00119     }
00120 
00121     inputs[*c] = cinputs;
00122     outputs[*c] = coutputs;
00123   }
00124 
00125   // Detect inter-thread dependencies, setup proper serialization by
00126   // create a list of threads that one threads depends on and setting
00127   // it. Setup common "end of filtering" barrier.
00128   try {
00129     bool has_deps = false;
00130     for (c = configs.begin(); c != configs.end(); ++c) {
00131 
00132       //printf("Config %s\n", c->c_str());
00133 
00134       std::list<LaserFilterThread *> depthreads;
00135 
00136       std::list<std::string>::iterator i, o;
00137       std::list<std::string> &cinputs = inputs[*c];
00138       for (i = cinputs.begin(); i != cinputs.end(); ++i) {
00139         //printf("  Input %s\n", i->c_str());
00140 
00141         for (d = configs.begin(); d != configs.end(); ++d) {
00142           if (*c == *d)  continue;
00143           //printf("    Config %s\n", d->c_str());
00144 
00145           std::list<std::string> &coutputs = outputs[*d];
00146           for (o = coutputs.begin(); o != coutputs.end(); ++o) {
00147             //printf("      Output %s\n", o->c_str());
00148             if (*i == *o) {
00149               has_deps = true;
00150               //printf("        *** Dep Thread matches %s for %s\n",
00151               //       d->c_str(), o->c_str());
00152               depthreads.push_back(threads[*d]);
00153               break;
00154             }
00155           }
00156         }
00157       }
00158 
00159       if (! depthreads.empty()) {
00160         depthreads.sort();
00161         depthreads.unique();
00162         threads[*c]->set_wait_threads(depthreads);
00163       }
00164     }
00165 
00166     // If any dependencies have been detected, have all threads wait at
00167     // a common "end of filtering" barrier, which allows for resetting
00168     // a "need to wait for done" flag.
00169     if (has_deps) {
00170       std::map<std::string, LaserFilterThread *>::iterator t;
00171       __barrier = new Barrier(threads.size());
00172       for (t = threads.begin(); t != threads.end(); ++t) {
00173         t->second->set_wait_barrier(__barrier);
00174       }
00175     }
00176 
00177   } catch (Exception &e) {
00178     ThreadList::iterator t;
00179     for (t = thread_list.begin(); t != thread_list.end(); ++t) {
00180       delete *t;
00181     }
00182     throw;
00183   }
00184 }
00185 
00186 
00187 LaserFilterPlugin::~LaserFilterPlugin()
00188 {
00189   delete __barrier;
00190 }
00191 
00192 
00193 PLUGIN_DESCRIPTION("Filter laser data in blackboard")
00194 EXPORT_PLUGIN(LaserFilterPlugin)