Fawkes API
Fawkes Development Version
|
00001 00002 /*************************************************************************** 00003 * tf_thread.cpp - Thread to exchange transforms 00004 * 00005 * Created: Wed Oct 26 01:02:59 2011 00006 * Copyright 2011 Tim Niemueller [www.niemueller.de] 00007 ****************************************************************************/ 00008 00009 /* This program is free software; you can redistribute it and/or modify 00010 * it under the terms of the GNU General Public License as published by 00011 * the Free Software Foundation; either version 2 of the License, or 00012 * (at your option) any later version. 00013 * 00014 * This program is distributed in the hope that it will be useful, 00015 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 * GNU Library General Public License for more details. 00018 * 00019 * Read the full text in the LICENSE.GPL file in the doc directory. 00020 */ 00021 00022 #include "tf_thread.h" 00023 00024 #include <core/threading/mutex_locker.h> 00025 #include <ros/this_node.h> 00026 00027 using namespace fawkes; 00028 00029 /** @class RosTfThread "tf_thread.h" 00030 * Thread to exchange transforms between Fawkes and ROS. 00031 * This threads connects to Fawkes and ROS to read and write transforms. 00032 * Transforms received on one end are republished to the other side. To 00033 * Fawkes new frames are published during the sensor hook. 00034 * @author Tim Niemueller 00035 */ 00036 00037 /** Constructor. */ 00038 RosTfThread::RosTfThread() 00039 : Thread("RosTfThread", Thread::OPMODE_WAITFORWAKEUP), 00040 BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_SENSOR_ACQUIRE), 00041 TransformAspect(TransformAspect::ONLY_PUBLISHER, "ROS"), 00042 BlackBoardInterfaceListener("RosTfThread") 00043 { 00044 __tf_msg_queue_mutex = new Mutex(); 00045 __seq_num_mutex = new Mutex(); 00046 } 00047 00048 /** Destructor. */ 00049 RosTfThread::~RosTfThread() 00050 { 00051 delete __tf_msg_queue_mutex; 00052 delete __seq_num_mutex; 00053 } 00054 00055 00056 00057 void 00058 RosTfThread::init() 00059 { 00060 __active_queue = 0; 00061 __seq_num = 0; 00062 00063 // Must do that before registering listener because we might already 00064 // get events right away 00065 __sub_tf = rosnode->subscribe("/tf", 100, &RosTfThread::tf_message_cb, this); 00066 __pub_tf = rosnode->advertise< ::tf::tfMessage >("/tf", 100); 00067 00068 __tfifs = blackboard->open_multiple_for_reading<TransformInterface>("TF *"); 00069 00070 std::list<TransformInterface *>::iterator i; 00071 std::list<TransformInterface *>::iterator own_if = __tfifs.end(); 00072 for (i = __tfifs.begin(); i != __tfifs.end(); ++i) { 00073 //logger->log_info(name(), "Opened %s", (*i)->uid()); 00074 if (strcmp((*i)->id(), "TF ROS") == 0) { 00075 // that's our own Fawkes publisher, do NOT republish what we receive... 00076 own_if = i; 00077 blackboard->close(*i); 00078 } else { 00079 bbil_add_data_interface(*i); 00080 bbil_add_reader_interface(*i); 00081 bbil_add_writer_interface(*i); 00082 } 00083 } 00084 if (own_if != __tfifs.end()) __tfifs.erase(own_if); 00085 blackboard->register_listener(this); 00086 00087 bbio_add_observed_create("TransformInterface", "TF *"); 00088 blackboard->register_observer(this); 00089 00090 } 00091 00092 00093 void 00094 RosTfThread::finalize() 00095 { 00096 blackboard->unregister_listener(this); 00097 blackboard->unregister_observer(this); 00098 00099 __sub_tf.shutdown(); 00100 __pub_tf.shutdown(); 00101 00102 std::list<TransformInterface *>::iterator i; 00103 for (i = __tfifs.begin(); i != __tfifs.end(); ++i) { 00104 blackboard->close(*i); 00105 } 00106 __tfifs.clear(); 00107 } 00108 00109 00110 void 00111 RosTfThread::loop() 00112 { 00113 __tf_msg_queue_mutex->lock(); 00114 unsigned int queue = __active_queue; 00115 __active_queue = 1 - __active_queue; 00116 __tf_msg_queue_mutex->unlock(); 00117 00118 while (! __tf_msg_queues[queue].empty()) { 00119 const ::tf::tfMessage::ConstPtr &msg = __tf_msg_queues[queue].front(); 00120 const size_t tsize = msg->transforms.size(); 00121 for (size_t i = 0; i < tsize; ++i) { 00122 const geometry_msgs::TransformStamped &ts = msg->transforms[i]; 00123 const geometry_msgs::Vector3 &t = ts.transform.translation; 00124 const geometry_msgs::Quaternion &r = ts.transform.rotation; 00125 00126 fawkes::Time time(ts.header.stamp.sec, ts.header.stamp.nsec / 1000); 00127 00128 fawkes::tf::Transform tr(fawkes::tf::Quaternion(r.x, r.y, r.z, r.w), 00129 fawkes::tf::Vector3(t.x, t.y, t.z)); 00130 fawkes::tf::StampedTransform 00131 st(tr, time, ts.header.frame_id, ts.child_frame_id); 00132 00133 tf_publisher->send_transform(st); 00134 } 00135 __tf_msg_queues[queue].pop(); 00136 } 00137 } 00138 00139 00140 void 00141 RosTfThread::bb_interface_data_changed(fawkes::Interface *interface) throw() 00142 { 00143 TransformInterface *tfif = dynamic_cast<TransformInterface *>(interface); 00144 if (! tfif) return; 00145 00146 tfif->read(); 00147 00148 00149 double *translation = tfif->translation(); 00150 double *rotation = tfif->rotation(); 00151 const Time *time = tfif->timestamp(); 00152 00153 geometry_msgs::Vector3 t; 00154 t.x = translation[0]; t.y = translation[1]; t.z = translation[2]; 00155 geometry_msgs::Quaternion r; 00156 r.x = rotation[0]; r.y = rotation[1]; r.z = rotation[2]; r.w = rotation[3]; 00157 geometry_msgs::Transform tr; 00158 tr.translation = t; 00159 tr.rotation = r; 00160 00161 geometry_msgs::TransformStamped ts; 00162 __seq_num_mutex->lock(); 00163 ts.header.seq = ++__seq_num; 00164 __seq_num_mutex->unlock(); 00165 ts.header.stamp = ros::Time(time->get_sec(), time->get_nsec()); 00166 ts.header.frame_id = tfif->frame(); 00167 ts.child_frame_id = tfif->child_frame(); 00168 ts.transform = tr; 00169 00170 ::tf::tfMessage tmsg; 00171 tmsg.transforms.push_back(ts); 00172 00173 __pub_tf.publish(tmsg); 00174 } 00175 00176 00177 void 00178 RosTfThread::bb_interface_created(const char *type, const char *id) throw() 00179 { 00180 if (strncmp(type, "TransformInterface", __INTERFACE_TYPE_SIZE) != 0) return; 00181 00182 TransformInterface *tfif; 00183 try { 00184 //logger->log_info(name(), "Opening %s:%s", type, id); 00185 tfif = blackboard->open_for_reading<TransformInterface>(id); 00186 } catch (Exception &e) { 00187 // ignored 00188 logger->log_warn(name(), "Failed to open %s:%s: %s", type, id, e.what()); 00189 return; 00190 } 00191 00192 try { 00193 bbil_add_data_interface(tfif); 00194 bbil_add_reader_interface(tfif); 00195 bbil_add_writer_interface(tfif); 00196 blackboard->update_listener(this); 00197 __tfifs.push_back(tfif); 00198 } catch (Exception &e) { 00199 blackboard->close(tfif); 00200 logger->log_warn(name(), "Failed to register for %s:%s: %s", type, id, e.what()); 00201 return; 00202 } 00203 } 00204 00205 void 00206 RosTfThread::bb_interface_writer_removed(fawkes::Interface *interface, 00207 unsigned int instance_serial) throw() 00208 { 00209 conditional_close(interface); 00210 } 00211 00212 void 00213 RosTfThread::bb_interface_reader_removed(fawkes::Interface *interface, 00214 unsigned int instance_serial) throw() 00215 { 00216 conditional_close(interface); 00217 } 00218 00219 void 00220 RosTfThread::conditional_close(Interface *interface) throw() 00221 { 00222 // Verify it's a TransformInterface 00223 TransformInterface *tfif = dynamic_cast<TransformInterface *>(interface); 00224 if (! tfif) return; 00225 00226 std::list<TransformInterface *>::iterator i; 00227 for (i = __tfifs.begin(); i != __tfifs.end(); ++i) { 00228 if (*interface == **i) { 00229 if (! interface->has_writer() && (interface->num_readers() == 1)) { 00230 // It's only us 00231 logger->log_info(name(), "Last on %s, closing", interface->uid()); 00232 bbil_remove_data_interface(*i); 00233 bbil_remove_reader_interface(*i); 00234 bbil_remove_writer_interface(*i); 00235 blackboard->update_listener(this); 00236 blackboard->close(*i); 00237 __tfifs.erase(i); 00238 break; 00239 } 00240 } 00241 } 00242 } 00243 00244 00245 /** Callback function for ROS tf message subscription. 00246 * @param msg incoming message 00247 */ 00248 void 00249 RosTfThread::tf_message_cb(const ::tf::tfMessage::ConstPtr &msg) 00250 { 00251 MutexLocker lock(__tf_msg_queue_mutex); 00252 00253 std::map<std::string, std::string> *msg_header_map = 00254 msg->__connection_header.get(); 00255 std::map<std::string, std::string>::iterator it = 00256 msg_header_map->find("callerid"); 00257 00258 if (it == msg_header_map->end()) { 00259 logger->log_warn(name(), "Message received without callerid"); 00260 } else if (it->second != ros::this_node::getName()) { 00261 __tf_msg_queues[__active_queue].push(msg); 00262 } 00263 }