Fawkes API  Fawkes Development Version
tf_thread.cpp
1 
2 /***************************************************************************
3  * tf_thread.cpp - Thread to exchange transforms
4  *
5  * Created: Wed Oct 26 01:02:59 2011
6  * Copyright 2011 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "tf_thread.h"
23 
24 #include <core/threading/mutex_locker.h>
25 #include <ros/this_node.h>
26 
27 using namespace fawkes;
28 
29 /** @class RosTfThread "tf_thread.h"
30  * Thread to exchange transforms between Fawkes and ROS.
31  * This threads connects to Fawkes and ROS to read and write transforms.
32  * Transforms received on one end are republished to the other side. To
33  * Fawkes new frames are published during the sensor hook.
34  * @author Tim Niemueller
35  */
36 
37 /** Constructor. */
39  : Thread("RosTfThread", Thread::OPMODE_WAITFORWAKEUP),
40  BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_SENSOR_ACQUIRE),
41  TransformAspect(TransformAspect::DEFER_PUBLISHER),
42  BlackBoardInterfaceListener("RosTfThread")
43 {
44  __tf_msg_queue_mutex = new Mutex();
45  __seq_num_mutex = new Mutex();
46  __last_update = new Time();
47 }
48 
49 /** Destructor. */
51 {
52  delete __tf_msg_queue_mutex;
53  delete __seq_num_mutex;
54  delete __last_update;
55 }
56 
57 
58 
59 void
61 {
62  __active_queue = 0;
63  __seq_num = 0;
64  __last_update->set_clock(clock);
65  __last_update->set_time(0, 0);
66 
67  __cfg_use_tf2 = config->get_bool("/ros/tf/use_tf2");
68  __cfg_update_interval = 1.0;
69  try {
70  __cfg_update_interval = config->get_float("/ros/tf/static-update-interval");
71  } catch (Exception &e) {} // ignored, use default
72 
73  // Must do that before registering listener because we might already
74  // get events right away
75  if (__cfg_use_tf2) {
76 #ifndef HAVE_TF2_MSGS
77  throw Exception("tf2 enabled in config but not available at compile time");
78 #else
79  __sub_tf = rosnode->subscribe("tf", 100, &RosTfThread::tf_message_cb_dynamic, this);
80  __sub_static_tf = rosnode->subscribe("tf_static", 100, &RosTfThread::tf_message_cb_static, this);
81  __pub_tf = rosnode->advertise< tf2_msgs::TFMessage >("tf", 100);
82  __pub_static_tf = rosnode->advertise< tf2_msgs::TFMessage >("tf_static", 100, /* latch */ true);
83 #endif
84  } else {
85  __sub_tf = rosnode->subscribe("tf", 100, &RosTfThread::tf_message_cb, this);
86  __pub_tf = rosnode->advertise< ::tf::tfMessage >("tf", 100);
87  }
88 
90  std::list<TransformInterface *>::iterator i;
91  for (i = __tfifs.begin(); i != __tfifs.end(); ++i) {
95  }
97 
98  publish_static_transforms_to_ros();
99 
100  bbio_add_observed_create("TransformInterface", "/tf*");
102 }
103 
104 
105 void
107 {
110 
111  __sub_tf.shutdown();
112  __pub_tf.shutdown();
113 
114  std::list<TransformInterface *>::iterator i;
115  for (i = __tfifs.begin(); i != __tfifs.end(); ++i) {
116  blackboard->close(*i);
117  }
118  __tfifs.clear();
119 }
120 
121 
122 void
124 {
125  __tf_msg_queue_mutex->lock();
126  unsigned int queue = __active_queue;
127  __active_queue = 1 - __active_queue;
128  __tf_msg_queue_mutex->unlock();
129 
130  if (__cfg_use_tf2) {
131 #ifdef HAVE_TF2_MSGS
132  while (! __tf2_msg_queues[queue].empty()) {
133  const std::pair<bool, tf2_msgs::TFMessage::ConstPtr> &q = __tf2_msg_queues[queue].front();
134  const tf2_msgs::TFMessage::ConstPtr &msg = q.second;
135  const size_t tsize = msg->transforms.size();
136  for (size_t i = 0; i < tsize; ++i) {
137  publish_transform_to_fawkes(msg->transforms[i], q.first);
138  }
139  __tf2_msg_queues[queue].pop();
140  }
141 #endif
142  } else {
143  while (! __tf_msg_queues[queue].empty()) {
144  const ::tf::tfMessage::ConstPtr &msg = __tf_msg_queues[queue].front();
145  const size_t tsize = msg->transforms.size();
146  for (size_t i = 0; i < tsize; ++i) {
147  geometry_msgs::TransformStamped ts = msg->transforms[i];
148  if (! ts.header.frame_id.empty() && ts.header.frame_id[0] == '/') {
149  ts.header.frame_id = ts.header.frame_id.substr(1);
150  }
151  if (! ts.child_frame_id.empty() && ts.child_frame_id[0] == '/') {
152  ts.child_frame_id = ts.child_frame_id.substr(1);
153  }
154  publish_transform_to_fawkes(ts);
155  }
156  __tf_msg_queues[queue].pop();
157  }
158 
159  fawkes::Time now(clock);
160  if ((now - __last_update) > __cfg_update_interval) {
161  __last_update->stamp();
162 
163  publish_static_transforms_to_ros();
164  }
165  }
166 }
167 
168 
169 void
171 {
172  TransformInterface *tfif = dynamic_cast<TransformInterface *>(interface);
173  if (! tfif) return;
174 
175  tfif->read();
176 
177  if (__cfg_use_tf2 && tfif->is_static_transform()) {
178  publish_static_transforms_to_ros();
179  } else {
180  if (__cfg_use_tf2) {
181 #ifdef HAVE_TF2_MSGS
182  tf2_msgs::TFMessage tmsg;
183  tmsg.transforms.push_back(create_transform_stamped(tfif));
184  __pub_tf.publish(tmsg);
185 #endif
186  } else {
187  ::tf::tfMessage tmsg;
188  if (tfif->is_static_transform()) {
189  // date time stamps slightly into the future so they are valid
190  // for longer and need less frequent updates.
191  fawkes::Time timestamp = fawkes::Time(clock) + (__cfg_update_interval * 1.1);
192 
193  tmsg.transforms.push_back(create_transform_stamped(tfif, &timestamp));
194  } else {
195  tmsg.transforms.push_back(create_transform_stamped(tfif));
196  }
197  __pub_tf.publish(tmsg);
198  }
199  }
200 }
201 
202 
203 void
204 RosTfThread::bb_interface_created(const char *type, const char *id) throw()
205 {
206  if (strncmp(type, "TransformInterface", __INTERFACE_TYPE_SIZE) != 0) return;
207 
208  for (const auto &f : __ros_frames) {
209  // ignore interfaces that we publish ourself
210  if (f == id) return;
211  }
212 
213  TransformInterface *tfif;
214  try {
215  //logger->log_info(name(), "Opening %s:%s", type, id);
217  } catch (Exception &e) {
218  // ignored
219  logger->log_warn(name(), "Failed to open %s:%s: %s", type, id, e.what());
220  return;
221  }
222 
223  try {
228  __tfifs.push_back(tfif);
229  } catch (Exception &e) {
230  blackboard->close(tfif);
231  logger->log_warn(name(), "Failed to register for %s:%s: %s", type, id, e.what());
232  return;
233  }
234 }
235 
236 void
238  unsigned int instance_serial) throw()
239 {
240  conditional_close(interface);
241 }
242 
243 void
245  unsigned int instance_serial) throw()
246 {
247  conditional_close(interface);
248 }
249 
250 void
251 RosTfThread::conditional_close(Interface *interface) throw()
252 {
253  // Verify it's a TransformInterface
254  TransformInterface *tfif = dynamic_cast<TransformInterface *>(interface);
255  if (! tfif) return;
256 
257  std::list<TransformInterface *>::iterator i;
258  for (i = __tfifs.begin(); i != __tfifs.end(); ++i) {
259  if (*interface == **i) {
260  if (! interface->has_writer() && (interface->num_readers() == 1)) {
261  // It's only us
262  logger->log_info(name(), "Last on %s, closing", interface->uid());
267  blackboard->close(*i);
268  __tfifs.erase(i);
269  break;
270  }
271  }
272  }
273 }
274 
275 
276 geometry_msgs::TransformStamped
277 RosTfThread::create_transform_stamped(TransformInterface *tfif, const Time *time)
278 {
279  double *translation = tfif->translation();
280  double *rotation = tfif->rotation();
281  if (! time) time = tfif->timestamp();
282 
283  geometry_msgs::Vector3 t;
284  t.x = translation[0]; t.y = translation[1]; t.z = translation[2];
285  geometry_msgs::Quaternion r;
286  r.x = rotation[0]; r.y = rotation[1]; r.z = rotation[2]; r.w = rotation[3];
287  geometry_msgs::Transform tr;
288  tr.translation = t;
289  tr.rotation = r;
290 
291  geometry_msgs::TransformStamped ts;
292  __seq_num_mutex->lock();
293  ts.header.seq = ++__seq_num;
294  __seq_num_mutex->unlock();
295  ts.header.stamp = ros::Time(time->get_sec(), time->get_nsec());
296  ts.header.frame_id = tfif->frame();
297  ts.child_frame_id = tfif->child_frame();
298  ts.transform = tr;
299 
300  return ts;
301 }
302 
303 void
304 RosTfThread::publish_static_transforms_to_ros()
305 {
306  std::list<fawkes::TransformInterface *>::iterator t;
307  fawkes::Time now(clock);
308 
309  if (__cfg_use_tf2) {
310 #ifdef HAVE_TF2_MSGS
311  tf2_msgs::TFMessage tmsg;
312  for (t = __tfifs.begin(); t != __tfifs.end(); ++t) {
313  fawkes::TransformInterface *tfif = *t;
314  tfif->read();
315  if (tfif->is_static_transform()) {
316  tmsg.transforms.push_back(create_transform_stamped(tfif, &now));
317  }
318  }
319  __pub_static_tf.publish(tmsg);
320 #endif
321  } else {
322  // date time stamps slightly into the future so they are valid
323  // for longer and need less frequent updates.
324  fawkes::Time timestamp = now + (__cfg_update_interval * 1.1);
325 
326  ::tf::tfMessage tmsg;
327  for (t = __tfifs.begin(); t != __tfifs.end(); ++t) {
328  fawkes::TransformInterface *tfif = *t;
329  tfif->read();
330  if (tfif->is_static_transform()) {
331  tmsg.transforms.push_back(create_transform_stamped(tfif, &timestamp));
332  }
333  }
334  __pub_tf.publish(tmsg);
335  }
336 }
337 
338 
339 void
340 RosTfThread::publish_transform_to_fawkes(const geometry_msgs::TransformStamped &ts, bool static_tf)
341 {
342  const geometry_msgs::Vector3 &t = ts.transform.translation;
343  const geometry_msgs::Quaternion &r = ts.transform.rotation;
344 
345  fawkes::Time time(ts.header.stamp.sec, ts.header.stamp.nsec / 1000);
346 
347  fawkes::tf::Transform tr(fawkes::tf::Quaternion(r.x, r.y, r.z, r.w),
348  fawkes::tf::Vector3(t.x, t.y, t.z));
350  st(tr, time, ts.header.frame_id, ts.child_frame_id);
351 
352  if (tf_publishers.find(ts.child_frame_id) == tf_publishers.end()) {
353  try {
354  __ros_frames.push_back(std::string("/tf/") + ts.child_frame_id);
355  tf_add_publisher("%s", ts.child_frame_id.c_str());
356  tf_publishers[ts.child_frame_id]->send_transform(st, static_tf);
357  } catch (Exception &e) {
358  __ros_frames.pop_back();
359  logger->log_warn(name(), "Failed to create Fawkes transform publisher for frame %s from ROS",
360  ts.child_frame_id.c_str());
361  logger->log_warn(name(), e);
362  }
363  } else {
364  tf_publishers[ts.child_frame_id]->send_transform(st, static_tf);
365  }
366 }
367 
368 
369 /** Callback function for ROS tf message subscription.
370  * @param msg incoming message
371  */
372 void
373 RosTfThread::tf_message_cb(const ros::MessageEvent<::tf::tfMessage const> &msg_evt)
374 {
375  MutexLocker lock(__tf_msg_queue_mutex);
376 
377  const ::tf::tfMessage::ConstPtr &msg = msg_evt.getConstMessage();
378  std::string authority = msg_evt.getPublisherName();
379 
380  if (authority == "") {
381  logger->log_warn(name(), "Message received without callerid");
382  } else if (authority != ros::this_node::getName()) {
383  __tf_msg_queues[__active_queue].push(msg);
384  }
385 }
386 
387 #ifdef HAVE_TF2_MSGS
388 void
389 RosTfThread::tf_message_cb_static(const ros::MessageEvent<tf2_msgs::TFMessage const> &msg_evt)
390 {
391  tf_message_cb(msg_evt, true);
392 }
393 
394 void
395 RosTfThread::tf_message_cb_dynamic(const ros::MessageEvent<tf2_msgs::TFMessage const> &msg_evt)
396 {
397  tf_message_cb(msg_evt, false);
398 }
399 
400 void
401 RosTfThread::tf_message_cb(const ros::MessageEvent<tf2_msgs::TFMessage const> &msg_evt, bool static_tf)
402 {
403  MutexLocker lock(__tf_msg_queue_mutex);
404 
405  const tf2_msgs::TFMessage::ConstPtr &msg = msg_evt.getConstMessage();
406  std::string authority = msg_evt.getPublisherName();
407 
408  if (authority == "") {
409  logger->log_warn(name(), "Message received without callerid");
410  } else if (authority != ros::this_node::getName()) {
411  __tf2_msg_queues[__active_queue].push(std::make_pair(static_tf, msg));
412  }
413 }
414 #endif
virtual void register_observer(BlackBoardInterfaceObserver *observer)
Register BB interface observer.
Definition: blackboard.cpp:230
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
virtual void bb_interface_data_changed(fawkes::Interface *interface)
BlackBoard data changed notification.
Definition: tf_thread.cpp:170
Fawkes library namespace.
virtual bool get_bool(const char *path)=0
Get value from configuration which is of type bool.
void unlock()
Unlock the mutex.
Definition: mutex.cpp:135
Mutex locking helper.
Definition: mutex_locker.h:33
void bbil_add_writer_interface(Interface *interface)
Add an interface to the writer addition/removal watch list.
A class for handling time.
Definition: time.h:91
virtual const char * what() const
Get primary string.
Definition: exception.cpp:661
virtual void unregister_listener(BlackBoardInterfaceListener *listener)
Unregister BB interface listener.
Definition: blackboard.cpp:218
Thread class encapsulation of pthreads.
Definition: thread.h:42
virtual void update_listener(BlackBoardInterfaceListener *listener, ListenerRegisterFlag flag=BBIL_FLAG_ALL)
Update BB event listener.
Definition: blackboard.cpp:203
TransformInterface Fawkes BlackBoard Interface.
Base class for all Fawkes BlackBoard interfaces.
Definition: interface.h:79
void bbil_remove_writer_interface(Interface *interface)
Remove an interface to the writer addition/removal watch list.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:44
Thread aspect to access the transform system.
Definition: tf.h:42
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:45
Thread aspect to use blocked timing.
virtual void register_listener(BlackBoardInterfaceListener *listener, ListenerRegisterFlag flag=BBIL_FLAG_ALL)
Register BB event listener.
Definition: blackboard.cpp:190
std::map< std::string, tf::TransformPublisher * > tf_publishers
Map of transform publishers created through the aspect.
Definition: tf.h:73
void bbio_add_observed_create(const char *type_pattern, const char *id_pattern="*")
Add interface creation type to watch list.
char * child_frame() const
Get child_frame value.
Base class for exceptions in Fawkes.
Definition: exception.h:36
void set_clock(Clock *clock)
Set clock for this instance.
Definition: time.cpp:329
void read()
Read from BlackBoard into local copy.
Definition: interface.cpp:477
bool is_static_transform() const
Get static_transform value.
virtual void finalize()
Finalize the thread.
Definition: tf_thread.cpp:106
long get_nsec() const
Get nanoseconds.
Definition: time.h:113
Transform that contains a timestamp and frame IDs.
Definition: types.h:96
virtual void unregister_observer(BlackBoardInterfaceObserver *observer)
Unregister BB interface observer.
Definition: blackboard.cpp:244
bool has_writer() const
Check if there is a writer for the interface.
Definition: interface.cpp:834
const char * name() const
Get name of thread.
Definition: thread.h:95
double * translation() const
Get translation value.
const char * uid() const
Get unique identifier of interface.
Definition: interface.cpp:687
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
const Time * timestamp() const
Get timestamp of last write.
Definition: interface.cpp:718
virtual std::list< Interface * > open_multiple_for_reading(const char *type_pattern, const char *id_pattern="*", const char *owner=NULL)=0
Open multiple interfaces for reading.
void bbil_remove_data_interface(Interface *interface)
Remove an interface to the data modification watch list.
long get_sec() const
Get seconds.
Definition: time.h:110
void bbil_add_reader_interface(Interface *interface)
Add an interface to the reader addition/removal watch list.
RosTfThread()
Constructor.
Definition: tf_thread.cpp:38
void tf_add_publisher(const char *frame_id_format,...)
Late add of publisher.
Definition: tf.cpp:198
virtual void loop()
Code to execute in the thread.
Definition: tf_thread.cpp:123
double * rotation() const
Get rotation value.
virtual void bb_interface_reader_removed(fawkes::Interface *interface, unsigned int instance_serial)
A reading instance has been closed for a watched interface.
Definition: tf_thread.cpp:244
void set_time(const timeval *tv)
Sets the time.
Definition: time.cpp:262
virtual Interface * open_for_reading(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for reading.
LockPtr< ros::NodeHandle > rosnode
Central ROS node handle.
Definition: ros.h:48
void lock()
Lock this mutex.
Definition: mutex.cpp:89
Time & stamp()
Set this time to the current time.
Definition: time.cpp:783
unsigned int num_readers() const
Get the number of readers.
Definition: interface.cpp:863
virtual void bb_interface_writer_removed(fawkes::Interface *interface, unsigned int instance_serial)
A writing instance has been closed for a watched interface.
Definition: tf_thread.cpp:237
Mutex mutual exclusion lock.
Definition: mutex.h:32
virtual void init()
Initialize the thread.
Definition: tf_thread.cpp:60
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:44
char * frame() const
Get frame value.
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
virtual void bb_interface_created(const char *type, const char *id)
BlackBoard interface created notification.
Definition: tf_thread.cpp:204
void bbil_remove_reader_interface(Interface *interface)
Remove an interface to the reader addition/removal watch list.
virtual ~RosTfThread()
Destructor.
Definition: tf_thread.cpp:50
BlackBoard interface listener.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
void bbil_add_data_interface(Interface *interface)
Add an interface to the data modification watch list.
virtual void close(Interface *interface)=0
Close interface.