Fawkes API  Fawkes Development Version
laserscan_thread.cpp
00001 
00002 /***************************************************************************
00003  *  laserscan_thread.cpp - Thread to exchange laser scans
00004  *
00005  *  Created: Tue May 29 19:41:18 2012
00006  *  Copyright  2011-2012  Tim Niemueller [www.niemueller.de]
00007  ****************************************************************************/
00008 
00009 /*  This program is free software; you can redistribute it and/or modify
00010  *  it under the terms of the GNU General Public License as published by
00011  *  the Free Software Foundation; either version 2 of the License, or
00012  *  (at your option) any later version.
00013  *
00014  *  This program is distributed in the hope that it will be useful,
00015  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00016  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017  *  GNU Library General Public License for more details.
00018  *
00019  *  Read the full text in the LICENSE.GPL file in the doc directory.
00020  */
00021 
00022 #include "laserscan_thread.h"
00023 
00024 #include <core/threading/mutex_locker.h>
00025 #include <utils/math/angle.h>
00026 
00027 #include <ros/this_node.h>
00028 #include <sensor_msgs/LaserScan.h>
00029 
00030 #include <fnmatch.h>
00031 
00032 using namespace fawkes;
00033 
00034 /** @class RosLaserScanThread "pcl_thread.h"
00035  * Thread to exchange point clouds between Fawkes and ROS.
00036  * @author Tim Niemueller
00037  */
00038 
00039 /** Constructor. */
00040 RosLaserScanThread::RosLaserScanThread()
00041   : Thread("RosLaserScanThread", Thread::OPMODE_WAITFORWAKEUP),
00042     BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_SENSOR_PROCESS),
00043     BlackBoardInterfaceListener("RosLaserScanThread")
00044 {
00045   __ls_msg_queue_mutex = new Mutex();
00046   __seq_num_mutex = new Mutex();
00047 }
00048 
00049 /** Destructor. */
00050 RosLaserScanThread::~RosLaserScanThread()
00051 {
00052   delete __ls_msg_queue_mutex;
00053   delete __seq_num_mutex;
00054 }
00055 
00056 
00057 std::string
00058 RosLaserScanThread::topic_name(const char *if_id, const char *suffix)
00059 {
00060   std::string topic_name = std::string("fawkes_scans/") + if_id + "_" + suffix;
00061   std::string::size_type pos = 0;
00062   while ((pos = topic_name.find("-", pos)) != std::string::npos) {
00063     topic_name.replace(pos, 1, "_");
00064   }
00065   pos = 0;
00066   while ((pos = topic_name.find(" ", pos)) != std::string::npos) {
00067     topic_name.replace(pos, 1, "_");
00068   }
00069   return topic_name;
00070 }
00071 
00072 
00073 void
00074 RosLaserScanThread::init()
00075 {
00076   __active_queue = 0;
00077   __seq_num = 0;
00078 
00079   // Must do that before registering listener because we might already
00080   // get events right away
00081   __sub_ls = rosnode->subscribe("scan", 100,
00082                                 &RosLaserScanThread::laser_scan_message_cb, this);
00083 
00084   __ls360_ifs =
00085     blackboard->open_multiple_for_reading<Laser360Interface>("*");
00086   __ls720_ifs =
00087     blackboard->open_multiple_for_reading<Laser720Interface>("*");
00088 
00089   std::list<Laser360Interface *>::iterator i360;
00090   for (i360 = __ls360_ifs.begin(); i360 != __ls360_ifs.end(); ++i360) {
00091     (*i360)->read();
00092     logger->log_info(name(), "Opened %s", (*i360)->uid());
00093     bbil_add_data_interface(*i360);
00094     bbil_add_reader_interface(*i360);
00095     bbil_add_writer_interface(*i360);
00096 
00097     std::string topname = topic_name((*i360)->id(), "360");
00098 
00099     PublisherInfo pi;
00100     pi.pub =
00101       rosnode->advertise<sensor_msgs::LaserScan>(topname, 1);
00102 
00103     logger->log_info(name(), "Publishing laser scan %s at %s",
00104                      (*i360)->uid(), topname.c_str());
00105 
00106     pi.msg.header.frame_id = (*i360)->frame();
00107     pi.msg.angle_min = 0;
00108     pi.msg.angle_max = 2*M_PI;
00109     pi.msg.angle_increment = deg2rad(1);
00110     pi.msg.ranges.resize(360);
00111 
00112     __pubs[(*i360)->uid()] = pi;
00113   }
00114   std::list<Laser720Interface *>::iterator i720;
00115   for (i720 = __ls720_ifs.begin(); i720 != __ls720_ifs.end(); ++i720) {
00116     logger->log_info(name(), "Opened %s", (*i720)->uid());
00117     bbil_add_data_interface(*i720);
00118     bbil_add_reader_interface(*i720);
00119     bbil_add_writer_interface(*i720);
00120 
00121     std::string topname = topic_name((*i720)->id(), "720");
00122 
00123     PublisherInfo pi;
00124     pi.pub =
00125       rosnode->advertise<sensor_msgs::LaserScan>(topname, 1);
00126 
00127     logger->log_info(name(), "Publishing laser scan %s at %s",
00128                      (*i720)->uid(), topname.c_str());
00129 
00130     pi.msg.header.frame_id = (*i720)->frame();
00131     pi.msg.angle_min = 0;
00132     pi.msg.angle_max = 2*M_PI;
00133     pi.msg.angle_increment = deg2rad(0.5);
00134     pi.msg.ranges.resize(720);
00135 
00136     __pubs[(*i720)->uid()] = pi;
00137   }
00138   blackboard->register_listener(this);
00139 
00140   bbio_add_observed_create("Laser360Interface", "*");
00141   bbio_add_observed_create("Laser720Interface", "*");
00142   blackboard->register_observer(this);
00143 }
00144 
00145 
00146 void
00147 RosLaserScanThread::finalize()
00148 {
00149   blackboard->unregister_listener(this);
00150   blackboard->unregister_observer(this);
00151 
00152   __sub_ls.shutdown();
00153 
00154   std::map<std::string, PublisherInfo>::iterator p;
00155   for (p = __pubs.begin(); p != __pubs.end(); ++p) {
00156     p->second.pub.shutdown();
00157   }
00158 
00159   std::list<Laser360Interface *>::iterator i360;
00160   for (i360 = __ls360_ifs.begin(); i360 != __ls360_ifs.end(); ++i360) {
00161     blackboard->close(*i360);
00162   }
00163   __ls360_ifs.clear();
00164   std::list<Laser720Interface *>::iterator i720;
00165   for (i720 = __ls720_ifs.begin(); i720 != __ls720_ifs.end(); ++i720) {
00166     blackboard->close(*i720);
00167   }
00168   __ls720_ifs.clear();
00169 }
00170 
00171 
00172 void
00173 RosLaserScanThread::loop()
00174 {
00175   __ls_msg_queue_mutex->lock();
00176   unsigned int queue = __active_queue;
00177   __active_queue = 1 - __active_queue;
00178   __ls_msg_queue_mutex->unlock();
00179 
00180   while (! __ls_msg_queues[queue].empty()) {
00181     const sensor_msgs::LaserScan::ConstPtr &msg = __ls_msg_queues[queue].front();
00182 
00183     // Check if interface exists, open if it does not
00184     std::map<std::string, std::string> *msg_header_map =
00185       msg->__connection_header.get();
00186     std::map<std::string, std::string>::iterator it =
00187       msg_header_map->find("callerid");
00188     const std::string &callerid = it->second;
00189 
00190     // for now we only create 360 interfaces, might add on that later
00191     if (it == msg_header_map->end()) {
00192       logger->log_warn(name(), "Received laser scan from ROS without caller ID,"
00193                        "ignoring");
00194     } else {
00195       bool have_interface = true;
00196       if (__ls360_wifs.find(callerid) == __ls360_wifs.end()) {
00197         try {
00198           std::string id = std::string("ROS Laser ") + callerid;
00199           Laser360Interface *ls360if =
00200             blackboard->open_for_writing<Laser360Interface>(id.c_str());
00201           __ls360_wifs[callerid] = ls360if;
00202         } catch (Exception &e) {
00203           logger->log_warn(name(), "Failed to open ROS laser interface for "
00204                            "message from node %s, exception follows",
00205                            callerid.c_str());
00206           logger->log_warn(name(), e);
00207           have_interface = false;
00208         }
00209       }
00210 
00211       if (have_interface) {
00212         // update interface with laser data
00213         Laser360Interface *ls360if = __ls360_wifs[it->second];
00214         ls360if->set_frame(msg->header.frame_id.c_str());
00215         float distances[360];
00216         for (unsigned int a = 0; a < 360; ++a) {
00217           float a_rad = deg2rad(a);
00218           if ((a_rad < msg->angle_min) || (a_rad > msg->angle_max)) {
00219             distances[a] = 0.;
00220           } else {
00221             // get closest ray from message
00222             int idx =
00223               (int)roundf((a_rad - msg->angle_min) / msg->angle_increment);
00224             distances[a] = msg->ranges[idx];
00225           }
00226         }
00227         ls360if->set_distances(distances);
00228         ls360if->write();
00229       }
00230     }
00231 
00232     __ls_msg_queues[queue].pop();
00233   }
00234 }
00235 
00236 
00237 void
00238 RosLaserScanThread::bb_interface_data_changed(fawkes::Interface *interface) throw()
00239 {
00240   Laser360Interface *ls360if = dynamic_cast<Laser360Interface *>(interface);
00241   Laser720Interface *ls720if = dynamic_cast<Laser720Interface *>(interface);
00242 
00243   PublisherInfo &pi = __pubs[interface->uid()];
00244   sensor_msgs::LaserScan &msg = pi.msg;
00245 
00246   if (ls360if) {
00247     ls360if->read();
00248 
00249     const Time *time = ls360if->timestamp();
00250 
00251     __seq_num_mutex->lock();
00252     msg.header.seq = ++__seq_num;
00253     __seq_num_mutex->unlock();
00254     msg.header.stamp = ros::Time(time->get_sec(), time->get_nsec());
00255     msg.header.frame_id = ls360if->frame();
00256 
00257     msg.angle_min = 0;
00258     msg.angle_max = 2*M_PI;
00259     msg.angle_increment = deg2rad(1);
00260     msg.range_min = 0.;
00261     msg.range_max = 1000.;
00262     msg.ranges.resize(360);
00263     memcpy(&msg.ranges[0], ls360if->distances(), 360*sizeof(float));
00264 
00265     pi.pub.publish(pi.msg);
00266 
00267   } else if (ls720if) {
00268     ls720if->read();
00269 
00270     const Time *time = ls720if->timestamp();
00271 
00272     sensor_msgs::LaserScan msg;
00273     __seq_num_mutex->lock();
00274     msg.header.seq = ++__seq_num;
00275     __seq_num_mutex->unlock();
00276     msg.header.stamp = ros::Time(time->get_sec(), time->get_nsec());
00277     msg.header.frame_id = ls720if->frame();
00278 
00279     msg.angle_min = 0;
00280     msg.angle_max = 2*M_PI;
00281     msg.angle_increment = deg2rad(1);
00282     msg.range_min = 0.;
00283     msg.range_max = 1000.;
00284     msg.ranges.resize(720);
00285     memcpy(&msg.ranges[0], ls720if->distances(), 720*sizeof(float));
00286 
00287     pi.pub.publish(pi.msg);
00288   }
00289 
00290 }
00291 
00292 
00293 void
00294 RosLaserScanThread::bb_interface_created(const char *type, const char *id) throw()
00295 {
00296   // Ignore ID pattern of our own interfaces
00297   if (fnmatch("ROS *", id, FNM_NOESCAPE) == 0) return;
00298 
00299   if (strncmp(type, "Laser360Interface", __INTERFACE_TYPE_SIZE) == 0) {
00300 
00301     Laser360Interface *ls360if;
00302     try {
00303       logger->log_info(name(), "Opening %s:%s", type, id);
00304       ls360if = blackboard->open_for_reading<Laser360Interface>(id);
00305     } catch (Exception &e) {
00306       // ignored
00307       logger->log_warn(name(), "Failed to open %s:%s: %s", type, id, e.what());
00308       return;
00309     }
00310 
00311     try {
00312       bbil_add_data_interface(ls360if);
00313       bbil_add_reader_interface(ls360if);
00314       bbil_add_writer_interface(ls360if);
00315 
00316       std::string topname = topic_name(ls360if->id(), "360");
00317 
00318       PublisherInfo pi;
00319       pi.pub = rosnode->advertise<sensor_msgs::LaserScan>(topname, 1);
00320 
00321       logger->log_info(name(), "Publishing laser scan %s at %s",
00322                        ls360if->uid(), topname.c_str());
00323 
00324       pi.msg.header.frame_id = ls360if->frame();
00325       pi.msg.angle_min = 0;
00326       pi.msg.angle_max = 2*M_PI;
00327       pi.msg.angle_increment = deg2rad(1);
00328       pi.msg.ranges.resize(360);
00329 
00330       __pubs[ls360if->uid()] = pi;    
00331 
00332       blackboard->update_listener(this);
00333       __ls360_ifs.push_back(ls360if);
00334     } catch (Exception &e) {
00335       blackboard->close(ls360if);
00336       logger->log_warn(name(), "Failed to register for %s:%s: %s",
00337                        type, id, e.what());
00338       return;
00339     }
00340   } else if (strncmp(type, "Laser720Interface", __INTERFACE_TYPE_SIZE) == 0) {
00341 
00342     Laser720Interface *ls720if;
00343     try {
00344       logger->log_info(name(), "Opening %s:%s", type, id);
00345       ls720if = blackboard->open_for_reading<Laser720Interface>(id);
00346     } catch (Exception &e) {
00347       // ignored
00348       logger->log_warn(name(), "Failed to open %s:%s: %s", type, id, e.what());
00349       return;
00350     }
00351 
00352     try {
00353       bbil_add_data_interface(ls720if);
00354       bbil_add_reader_interface(ls720if);
00355       bbil_add_writer_interface(ls720if);
00356 
00357       std::string topname = topic_name(ls720if->id(), "720");
00358 
00359       PublisherInfo pi;
00360       pi.pub = rosnode->advertise<sensor_msgs::LaserScan>(topname, 1);
00361 
00362       logger->log_info(name(), "Publishing laser scan %s at %s",
00363                        ls720if->uid(), topname.c_str());
00364 
00365       pi.msg.header.frame_id = ls720if->frame();
00366       pi.msg.angle_min = 0;
00367       pi.msg.angle_max = 2*M_PI;
00368       pi.msg.angle_increment = deg2rad(0.5);
00369       pi.msg.ranges.resize(720);
00370 
00371       __pubs[ls720if->uid()] = pi;
00372 
00373       blackboard->update_listener(this);
00374       __ls720_ifs.push_back(ls720if);
00375     } catch (Exception &e) {
00376       blackboard->close(ls720if);
00377       logger->log_warn(name(), "Failed to register for %s:%s: %s",
00378                        type, id, e.what());
00379       return;
00380     }
00381   }
00382 }
00383 
00384 void
00385 RosLaserScanThread::bb_interface_writer_removed(fawkes::Interface *interface,
00386                                          unsigned int instance_serial) throw()
00387 {
00388   conditional_close(interface);
00389 }
00390 
00391 void
00392 RosLaserScanThread::bb_interface_reader_removed(fawkes::Interface *interface,
00393                                          unsigned int instance_serial) throw()
00394 {
00395   conditional_close(interface);
00396 }
00397 
00398 void
00399 RosLaserScanThread::conditional_close(Interface *interface) throw()
00400 {
00401   // Verify it's a laser interface
00402   Laser360Interface *ls360if = dynamic_cast<Laser360Interface *>(interface);
00403   Laser720Interface *ls720if = dynamic_cast<Laser720Interface *>(interface);
00404 
00405   if (ls360if) {
00406     std::list<Laser360Interface *>::iterator i;
00407     for (i = __ls360_ifs.begin(); i != __ls360_ifs.end(); ++i) {
00408       if (*ls360if == **i) {
00409         if (! ls360if->has_writer() && (ls360if->num_readers() == 1)) {
00410           // It's only us
00411           logger->log_info(name(), "Last on %s, closing", ls360if->uid());
00412           bbil_remove_data_interface(*i);
00413           bbil_remove_reader_interface(*i);
00414           bbil_remove_writer_interface(*i);
00415           blackboard->update_listener(this);
00416           blackboard->close(*i);
00417           __ls360_ifs.erase(i);
00418           break;
00419         }
00420       }
00421     }
00422   } else if (ls720if) {
00423     std::list<Laser720Interface *>::iterator i;
00424     for (i = __ls720_ifs.begin(); i != __ls720_ifs.end(); ++i) {
00425       if (*ls720if == **i) {
00426         if (! ls720if->has_writer() && (ls720if->num_readers() == 1)) {
00427           // It's only us
00428           logger->log_info(name(), "Last on %s, closing", ls720if->uid());
00429           bbil_remove_data_interface(*i);
00430           bbil_remove_reader_interface(*i);
00431           bbil_remove_writer_interface(*i);
00432           blackboard->update_listener(this);
00433           blackboard->close(*i);
00434           __ls720_ifs.erase(i);
00435           break;
00436         }
00437       }
00438     }
00439   }
00440 }
00441 
00442 
00443 /** Callback function for ROS laser scan message subscription.
00444  * @param msg incoming message
00445  */
00446 void
00447 RosLaserScanThread::laser_scan_message_cb(const sensor_msgs::LaserScan::ConstPtr &msg)
00448 {
00449   MutexLocker lock(__ls_msg_queue_mutex);
00450 
00451   std::map<std::string, std::string> *msg_header_map =
00452     msg->__connection_header.get();
00453   std::map<std::string, std::string>::iterator it =
00454     msg_header_map->find("callerid");
00455 
00456   if (it == msg_header_map->end()) {
00457     logger->log_warn(name(), "Message received without callerid");
00458   } else if (it->second != ros::this_node::getName()) {
00459     __ls_msg_queues[__active_queue].push(msg);
00460   }
00461 }