Fawkes API  Fawkes Development Version
tf_thread.cpp
00001 
00002 /***************************************************************************
00003  *  tf_thread.cpp - Thread to exchange transforms
00004  *
00005  *  Created: Wed Oct 26 01:02:59 2011
00006  *  Copyright  2011  Tim Niemueller [www.niemueller.de]
00007  ****************************************************************************/
00008 
00009 /*  This program is free software; you can redistribute it and/or modify
00010  *  it under the terms of the GNU General Public License as published by
00011  *  the Free Software Foundation; either version 2 of the License, or
00012  *  (at your option) any later version.
00013  *
00014  *  This program is distributed in the hope that it will be useful,
00015  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00016  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017  *  GNU Library General Public License for more details.
00018  *
00019  *  Read the full text in the LICENSE.GPL file in the doc directory.
00020  */
00021 
00022 #include "tf_thread.h"
00023 
00024 #include <core/threading/mutex_locker.h>
00025 #include <ros/this_node.h>
00026 
00027 using namespace fawkes;
00028 
00029 /** @class RosTfThread "tf_thread.h"
00030  * Thread to exchange transforms between Fawkes and ROS.
00031  * This threads connects to Fawkes and ROS to read and write transforms.
00032  * Transforms received on one end are republished to the other side. To
00033  * Fawkes new frames are published during the sensor hook.
00034  * @author Tim Niemueller
00035  */
00036 
00037 /** Constructor. */
00038 RosTfThread::RosTfThread()
00039   : Thread("RosTfThread", Thread::OPMODE_WAITFORWAKEUP),
00040     BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_SENSOR_ACQUIRE),
00041     TransformAspect(TransformAspect::ONLY_PUBLISHER, "ROS"),
00042     BlackBoardInterfaceListener("RosTfThread")
00043 {
00044   __tf_msg_queue_mutex = new Mutex();
00045   __seq_num_mutex = new Mutex();
00046 }
00047 
00048 /** Destructor. */
00049 RosTfThread::~RosTfThread()
00050 {
00051   delete __tf_msg_queue_mutex;
00052   delete __seq_num_mutex;
00053 }
00054 
00055 
00056 
00057 void
00058 RosTfThread::init()
00059 {
00060   __active_queue = 0;
00061   __seq_num = 0;
00062 
00063   // Must do that before registering listener because we might already
00064   // get events right away
00065   __sub_tf = rosnode->subscribe("/tf", 100, &RosTfThread::tf_message_cb, this);
00066   __pub_tf = rosnode->advertise< ::tf::tfMessage >("/tf", 100);
00067 
00068   __tfifs = blackboard->open_multiple_for_reading<TransformInterface>("TF *");
00069 
00070   std::list<TransformInterface *>::iterator i;
00071   std::list<TransformInterface *>::iterator own_if = __tfifs.end();
00072   for (i = __tfifs.begin(); i != __tfifs.end(); ++i) {
00073     //logger->log_info(name(), "Opened %s", (*i)->uid());
00074     if (strcmp((*i)->id(), "TF ROS") == 0) {
00075       // that's our own Fawkes publisher, do NOT republish what we receive...
00076       own_if = i;
00077       blackboard->close(*i);
00078     } else {
00079       bbil_add_data_interface(*i);
00080       bbil_add_reader_interface(*i);
00081       bbil_add_writer_interface(*i);
00082     }
00083   }
00084   if (own_if != __tfifs.end()) __tfifs.erase(own_if);
00085   blackboard->register_listener(this);
00086 
00087   bbio_add_observed_create("TransformInterface", "TF *");
00088   blackboard->register_observer(this);
00089 
00090 }
00091 
00092 
00093 void
00094 RosTfThread::finalize()
00095 {
00096   blackboard->unregister_listener(this);
00097   blackboard->unregister_observer(this);
00098 
00099   __sub_tf.shutdown();
00100   __pub_tf.shutdown();
00101 
00102   std::list<TransformInterface *>::iterator i;
00103   for (i = __tfifs.begin(); i != __tfifs.end(); ++i) {
00104     blackboard->close(*i);
00105   }
00106   __tfifs.clear();
00107 }
00108 
00109 
00110 void
00111 RosTfThread::loop()
00112 {
00113   __tf_msg_queue_mutex->lock();
00114   unsigned int queue = __active_queue;
00115   __active_queue = 1 - __active_queue;
00116   __tf_msg_queue_mutex->unlock();
00117 
00118   while (! __tf_msg_queues[queue].empty()) {
00119     const ::tf::tfMessage::ConstPtr &msg = __tf_msg_queues[queue].front();
00120     const size_t tsize = msg->transforms.size();
00121     for (size_t i = 0; i < tsize; ++i) {
00122       const geometry_msgs::TransformStamped &ts = msg->transforms[i];
00123       const geometry_msgs::Vector3 &t = ts.transform.translation;
00124       const geometry_msgs::Quaternion &r = ts.transform.rotation;
00125 
00126       fawkes::Time time(ts.header.stamp.sec, ts.header.stamp.nsec / 1000);
00127 
00128       fawkes::tf::Transform tr(fawkes::tf::Quaternion(r.x, r.y, r.z, r.w),
00129                                fawkes::tf::Vector3(t.x, t.y, t.z));
00130       fawkes::tf::StampedTransform
00131         st(tr, time, ts.header.frame_id, ts.child_frame_id);
00132 
00133       tf_publisher->send_transform(st);
00134     }
00135     __tf_msg_queues[queue].pop();
00136   }
00137 }
00138 
00139 
00140 void
00141 RosTfThread::bb_interface_data_changed(fawkes::Interface *interface) throw()
00142 {
00143   TransformInterface *tfif = dynamic_cast<TransformInterface *>(interface);
00144   if (! tfif) return;
00145 
00146   tfif->read();
00147 
00148 
00149   double *translation = tfif->translation();
00150   double *rotation = tfif->rotation();
00151   const Time *time = tfif->timestamp();
00152 
00153   geometry_msgs::Vector3 t;
00154   t.x = translation[0]; t.y = translation[1]; t.z = translation[2];
00155   geometry_msgs::Quaternion r;
00156   r.x = rotation[0]; r.y = rotation[1]; r.z = rotation[2]; r.w = rotation[3];
00157   geometry_msgs::Transform tr;
00158   tr.translation = t;
00159   tr.rotation = r;
00160 
00161   geometry_msgs::TransformStamped ts;
00162   __seq_num_mutex->lock();
00163   ts.header.seq = ++__seq_num;
00164   __seq_num_mutex->unlock();
00165   ts.header.stamp = ros::Time(time->get_sec(), time->get_nsec());
00166   ts.header.frame_id = tfif->frame();
00167   ts.child_frame_id = tfif->child_frame();
00168   ts.transform = tr;
00169 
00170   ::tf::tfMessage tmsg;
00171   tmsg.transforms.push_back(ts);
00172 
00173   __pub_tf.publish(tmsg);
00174 }
00175 
00176 
00177 void
00178 RosTfThread::bb_interface_created(const char *type, const char *id) throw()
00179 {
00180   if (strncmp(type, "TransformInterface", __INTERFACE_TYPE_SIZE) != 0)  return;
00181 
00182   TransformInterface *tfif;
00183   try {
00184     //logger->log_info(name(), "Opening %s:%s", type, id);
00185     tfif = blackboard->open_for_reading<TransformInterface>(id);
00186   } catch (Exception &e) {
00187     // ignored
00188     logger->log_warn(name(), "Failed to open %s:%s: %s", type, id, e.what());
00189     return;
00190   }
00191 
00192   try {
00193     bbil_add_data_interface(tfif);
00194     bbil_add_reader_interface(tfif);
00195     bbil_add_writer_interface(tfif);
00196     blackboard->update_listener(this);
00197     __tfifs.push_back(tfif);
00198   } catch (Exception &e) {
00199     blackboard->close(tfif);
00200     logger->log_warn(name(), "Failed to register for %s:%s: %s", type, id, e.what());
00201     return;
00202   }
00203 }
00204 
00205 void
00206 RosTfThread::bb_interface_writer_removed(fawkes::Interface *interface,
00207                                          unsigned int instance_serial) throw()
00208 {
00209   conditional_close(interface);
00210 }
00211 
00212 void
00213 RosTfThread::bb_interface_reader_removed(fawkes::Interface *interface,
00214                                          unsigned int instance_serial) throw()
00215 {
00216   conditional_close(interface);
00217 }
00218 
00219 void
00220 RosTfThread::conditional_close(Interface *interface) throw()
00221 {
00222   // Verify it's a TransformInterface
00223   TransformInterface *tfif = dynamic_cast<TransformInterface *>(interface);
00224   if (! tfif) return;
00225   
00226   std::list<TransformInterface *>::iterator i;
00227   for (i = __tfifs.begin(); i != __tfifs.end(); ++i) {
00228     if (*interface == **i) {
00229       if (! interface->has_writer() && (interface->num_readers() == 1)) {
00230         // It's only us
00231         logger->log_info(name(), "Last on %s, closing", interface->uid());
00232         bbil_remove_data_interface(*i);
00233         bbil_remove_reader_interface(*i);
00234         bbil_remove_writer_interface(*i);
00235         blackboard->update_listener(this);
00236         blackboard->close(*i);
00237         __tfifs.erase(i);
00238         break;
00239       }
00240     }
00241   }
00242 }
00243 
00244 
00245 /** Callback function for ROS tf message subscription.
00246  * @param msg incoming message
00247  */
00248 void
00249 RosTfThread::tf_message_cb(const ::tf::tfMessage::ConstPtr &msg)
00250 {
00251   MutexLocker lock(__tf_msg_queue_mutex);
00252 
00253   std::map<std::string, std::string> *msg_header_map =
00254     msg->__connection_header.get();
00255   std::map<std::string, std::string>::iterator it =
00256     msg_header_map->find("callerid");
00257 
00258   if (it == msg_header_map->end()) {
00259     logger->log_warn(name(), "Message received without callerid");
00260   } else if (it->second != ros::this_node::getName()) {
00261     __tf_msg_queues[__active_queue].push(msg);
00262   }
00263 }