Fawkes API  Fawkes Development Version
pcl_db_roscomm_thread.cpp
1 
2 /***************************************************************************
3  * pcl_db_merge_roscomm_thread.cpp - ROS communication for pcl-db-merge
4  *
5  * Created: Thu Dec 06 13:54:45 2012
6  * Copyright 2012 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "pcl_db_roscomm_thread.h"
23 #include <interfaces/PclDatabaseMergeInterface.h>
24 #include <interfaces/PclDatabaseRetrieveInterface.h>
25 #include <interfaces/PclDatabaseStoreInterface.h>
26 
27 #include <core/threading/wait_condition.h>
28 #include <blackboard/utils/on_update_waker.h>
29 
30 #include <ros/ros.h>
31 
32 #include <pcl/pcl_config.h>
33 #if PCL_VERSION_COMPARE(>=,1,7,0)
34 # include <pcl/PCLPointCloud2.h>
35 # include <pcl/common/conversions.h>
36 # include <pcl_conversions/pcl_conversions.h>
37 #endif
38 
39 using namespace fawkes;
40 
41 #define CFG_PREFIX "/perception/pcl-db-merge/ros/"
42 
43 /** @class PointCloudDBROSCommThread "pcl_db_roscomm_thread.h"
44  * Thread to merge point clouds from database on request.
45  * @author Tim Niemueller
46  */
47 
48 
49 /** Constructor. */
51  : Thread("PointCloudDBROSCommThread", Thread::OPMODE_WAITFORWAKEUP),
52  BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_WORLDSTATE)
53 {
54 }
55 
56 
57 /** Destructor. */
59 {
60 }
61 
62 
63 void
65 {
66  cfg_store_pcl_id_ = config->get_string("/perception/pcl-db-roscomm/store-pcl-id");
67 
68  merge_if_ =
70 
71  merge_waitcond_ = new WaitCondition();
72  merge_update_waker_ = new BlackBoardOnUpdateWaker(blackboard, merge_if_, this);
73 
74  retrieve_if_ =
75  blackboard->open_for_reading<PclDatabaseRetrieveInterface>("PCL Database Retrieve");
76 
77  retrieve_waitcond_ = new WaitCondition();
78  retrieve_update_waker_ = new BlackBoardOnUpdateWaker(blackboard, retrieve_if_, this);
79 
80  store_if_ =
82 
83  store_waitcond_ = new WaitCondition();
84  store_update_waker_ = new BlackBoardOnUpdateWaker(blackboard, store_if_, this);
85 
86  srv_merge_ = new ros::ServiceServer();
87  srv_retrieve_ = new ros::ServiceServer();
88  srv_record_ = new ros::ServiceServer();
89  srv_store_ = new ros::ServiceServer();
90 
91  *srv_merge_ = rosnode->advertiseService("/pcl_db/merge",
92  &PointCloudDBROSCommThread::merge_cb, this);
93 
94  *srv_retrieve_ = rosnode->advertiseService("/pcl_db/retrieve",
95  &PointCloudDBROSCommThread::retrieve_cb, this);
96 
97  *srv_store_ = rosnode->advertiseService("/pcl_db/store",
98  &PointCloudDBROSCommThread::store_cb, this);
99 }
100 
101 void
103 {
104  srv_merge_->shutdown();
105  srv_retrieve_->shutdown();
106  srv_store_->shutdown();
107  srv_record_->shutdown();
108  delete srv_merge_;
109  delete srv_retrieve_;
110  delete srv_store_;
111  delete srv_record_;
112 
113  delete merge_update_waker_;
114  delete retrieve_update_waker_;
115  delete store_update_waker_;
116  delete merge_waitcond_;
117  delete retrieve_waitcond_;
118  delete store_waitcond_;
119 
120  blackboard->close(merge_if_);
121  blackboard->close(retrieve_if_);
122  blackboard->close(store_if_);
123 }
124 
125 
126 void
128 {
129  merge_if_->read();
130  if (merge_if_->changed()) {
131  logger->log_info(name(), "Merge interface has changed");
132 
133  logger->log_info(name(), "%u vs. %u final: %s",
134  merge_if_->msgid(), merge_msg_id_, merge_if_->is_final() ? "yes" : "no");
135 
136  if ((merge_if_->msgid() == merge_msg_id_) && merge_if_->is_final()) {
137  logger->log_info(name(), "Merge final");
138  merge_waitcond_->wake_all();
139  }
140  }
141 
142  retrieve_if_->read();
143  if (retrieve_if_->changed()) {
144  logger->log_info(name(), "Retrieve interface has changed");
145 
146  logger->log_info(name(), "%u vs. %u final: %s",
147  retrieve_if_->msgid(), retrieve_msg_id_,
148  retrieve_if_->is_final() ? "yes" : "no");
149 
150  if ((retrieve_if_->msgid() == retrieve_msg_id_) && retrieve_if_->is_final()) {
151  logger->log_info(name(), "Retrieve final");
152  retrieve_waitcond_->wake_all();
153  }
154  }
155 
156  store_if_->read();
157  if (store_if_->changed()) {
158  logger->log_info(name(), "Store interface has changed");
159 
160  logger->log_info(name(), "%u vs. %u final: %s",
161  store_if_->msgid(), store_msg_id_,
162  store_if_->is_final() ? "yes" : "no");
163 
164  if ((store_if_->msgid() == store_msg_id_) && store_if_->is_final()) {
165  logger->log_info(name(), "Store final");
166  store_waitcond_->wake_all();
167  }
168  }
169 }
170 
171 bool
172 PointCloudDBROSCommThread::merge_cb(hybris_c1_msgs::MergePointClouds::Request &req,
173  hybris_c1_msgs::MergePointClouds::Response &resp)
174 {
177 
178  if (req.timestamps.size() > mm->maxlenof_timestamps()) {
179  logger->log_warn(name(), "Number of requested timestamps (%zu) "
180  "exceeds maximum number allowed (%zu)",
181  req.timestamps.size(), mm->maxlenof_timestamps());
182  resp.ok = false;
183  resp.error = "Number of requested timestamps exceeds maximum number allowed";
184  delete mm;
185  return true;
186  }
187  if (req.timestamps.empty()) {
188  logger->log_warn(name(), "No times given in request");
189  resp.ok = false;
190  resp.error = "No times given in request";
191  delete mm;
192  return true;
193  }
194 
195  size_t num_timestamps = mm->maxlenof_timestamps();
196  std::vector<int64_t> timestamps(num_timestamps, 0);
197  for (size_t i = 0; i < req.timestamps.size(); ++i) {
198  timestamps[i] = (int64_t)req.timestamps[i].sec * 1000L
199  + (int64_t)req.timestamps[i].nsec / 1000000L;
200  }
201  sort(timestamps.begin(), timestamps.begin() + req.timestamps.size());
202  mm->set_timestamps(&timestamps[0]);
203  mm->set_database(req.database.c_str());
204  mm->set_collection(req.collection.c_str());
205 
206  mm->ref();
207  merge_if_->msgq_enqueue(mm);
208  merge_msg_id_ = mm->id();
209  mm->unref();
210 
211  // wait for result
212  merge_waitcond_->wait();
213 
214  // Check result
215  merge_if_->read();
216  if (merge_if_->is_final() && (std::string("") == merge_if_->error())) {
217  resp.ok = true;
218  } else {
219  resp.ok = false;
220  resp.error = merge_if_->error();
221  }
222  return true;
223 }
224 
225 
226 bool
227 PointCloudDBROSCommThread::retrieve_cb(hybris_c1_msgs::RetrievePointCloud::Request &req,
228  hybris_c1_msgs::RetrievePointCloud::Response &resp)
229 {
232 
233  int64_t timestamp = (int64_t)req.timestamp.sec * 1000L
234  + (int64_t)req.timestamp.nsec / 1000000L;
235 
236  logger->log_info(name(), "Restoring %lli from %s.%s", timestamp,
237  req.database.c_str(), req.collection.c_str());
238 
239  mm->set_timestamp(timestamp);
240  mm->set_database(req.database.c_str());
241  mm->set_collection(req.collection.c_str());
242  mm->set_target_frame(req.target_frame.c_str());
243  mm->set_original_timestamp(req.original_timestamp);
244 
245  mm->ref();
246  retrieve_if_->msgq_enqueue(mm);
247  retrieve_msg_id_ = mm->id();
248  mm->unref();
249 
250  // wait for result
251  retrieve_waitcond_->wait();
252 
253  // Check result
254  retrieve_if_->read();
255  if (retrieve_if_->is_final() && (std::string("") == retrieve_if_->error())) {
256  resp.ok = true;
257  } else {
258  resp.ok = false;
259  resp.error = retrieve_if_->error();
260  }
261  return true;
262 }
263 
264 
265 bool
266 PointCloudDBROSCommThread::store_cb(hybris_c1_msgs::StorePointCloud::Request &req,
267  hybris_c1_msgs::StorePointCloud::Response &resp)
268 {
269 #if PCL_VERSION_COMPARE(>=,1,7,0)
272 
273  logger->log_info(name(), "Storing to %s.%s",
274  (req.database == "") ? "<default>" : req.database.c_str(),
275  (req.collection == "") ? "<default>" : req.collection.c_str());
276 
277  pcl::PCLPointCloud2 pcl_in;
278  pcl_conversions::moveToPCL(req.pointcloud, pcl_in);
279 
281  pcl_xyz(new pcl::PointCloud<pcl::PointXYZ>());
283  pcl_xyzrgb(new pcl::PointCloud<pcl::PointXYZRGB>());
284 
285  std::string fields_in = pcl::getFieldsList(pcl_in);
286  std::string fields_xyz = pcl::getFieldsList(**pcl_xyz);
287  std::string fields_xyzrgb = pcl::getFieldsList(**pcl_xyzrgb);
288 
289  if (fields_in == fields_xyz) {
290  pcl::fromPCLPointCloud2(pcl_in, **pcl_xyz);
291  pcl_manager->add_pointcloud(cfg_store_pcl_id_.c_str(), pcl_xyz);
292 
293  } else if (fields_in == fields_xyzrgb) {
294  pcl::fromPCLPointCloud2(pcl_in, **pcl_xyzrgb);
295  pcl_manager->add_pointcloud(cfg_store_pcl_id_.c_str(), pcl_xyzrgb);
296 
297  } else {
298  resp.ok = false;
299  resp.error = "Unsupported point cloud type";
300  }
301 
302  mm->set_pcl_id(cfg_store_pcl_id_.c_str());
303  mm->set_database(req.database.c_str());
304  mm->set_collection(req.collection.c_str());
305 
306  mm->ref();
307  store_if_->msgq_enqueue(mm);
308  store_msg_id_ = mm->id();
309  mm->unref();
310 
311  // wait for result
312  store_waitcond_->wait();
313 
314  pcl_manager->remove_pointcloud(cfg_store_pcl_id_.c_str());
315 
316  // Check result
317  store_if_->read();
318  if (store_if_->is_final() && (std::string("") == store_if_->error())) {
319  resp.ok = true;
320  } else {
321  resp.ok = false;
322  resp.error = store_if_->error();
323  }
324  return true;
325 #else
326  logger->log_warn(name(), "Cannot store point clouds, PCL < 1.7.0");
327  resp.ok = false;
328  resp.error = "Storing only supported with PCL >= 1.7.0";
329  return true;
330 #endif
331 }
332 
333 
334 bool
335 PointCloudDBROSCommThread::record_cb(hybris_c1_msgs::RecordData::Request &req,
336  hybris_c1_msgs::RecordData::Response &resp)
337 {
338  logger->log_info(name(), "Recording ordered for %f sec", req.range.toSec());
339  ros::Time begin = ros::Time::now();
340  ros::Time end = begin + req.range;
341  ros::Time::sleepUntil(end);
342  resp.begin = begin;
343  resp.end = end;
344  resp.ok = true;
345  logger->log_info(name(), "Recording done");
346  return true;
347 }
348 
PclDatabaseMergeInterface Fawkes BlackBoard Interface.
virtual ~PointCloudDBROSCommThread()
Destructor.
Wait until a given condition holds.
unsigned int id() const
Get message ID.
Definition: message.cpp:197
RetrieveMessage Fawkes BlackBoard Interface Message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
void unref()
Decrement reference count and conditionally delete this instance.
Definition: refcount.cpp:99
void remove_pointcloud(const char *id)
Remove the point cloud.
Wake threads when a blackboard interface is updated.
void set_target_frame(const char *new_target_frame)
Set target_frame value.
PclDatabaseRetrieveInterface Fawkes BlackBoard Interface.
void set_database(const char *new_database)
Set database value.
Fawkes library namespace.
void wake_all()
Wake up all waiting threads.
void set_pcl_id(const char *new_pcl_id)
Set pcl_id value.
void set_collection(const char *new_collection)
Set collection value.
void add_pointcloud(const char *id, RefPtr< pcl::PointCloud< PointT > > cloud)
Add point cloud.
Thread class encapsulation of pthreads.
Definition: thread.h:42
char * error() const
Get error value.
void set_timestamp(const int64_t new_timestamp)
Set timestamp value.
void set_database(const char *new_database)
Set database value.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:44
MergeMessage Fawkes BlackBoard Interface Message.
size_t maxlenof_timestamps() const
Get maximum length of timestamps value.
Thread aspect to use blocked timing.
virtual void init()
Initialize the thread.
virtual void finalize()
Finalize the thread.
void set_collection(const char *new_collection)
Set collection value.
PointCloudManager * pcl_manager
Manager to distribute and access point clouds.
Definition: pointcloud.h:50
void set_collection(const char *new_collection)
Set collection value.
void read()
Read from BlackBoard into local copy.
Definition: interface.cpp:477
uint32_t msgid() const
Get msgid value.
void ref()
Increment reference count.
Definition: refcount.cpp:70
uint32_t msgid() const
Get msgid value.
const char * name() const
Get name of thread.
Definition: thread.h:95
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
void wait()
Wait for the condition forever.
bool changed() const
Check if data has been changed.
Definition: interface.cpp:796
unsigned int msgq_enqueue(Message *message)
Enqueue message at end of queue.
Definition: interface.cpp:903
RefPtr<> is a reference-counting shared smartpointer.
Definition: refptr.h:49
virtual Interface * open_for_reading(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for reading.
void set_timestamps(unsigned int index, const int64_t new_timestamps)
Set timestamps value at given index.
LockPtr< ros::NodeHandle > rosnode
Central ROS node handle.
Definition: ros.h:48
void set_database(const char *new_database)
Set database value.
StoreMessage Fawkes BlackBoard Interface Message.
void set_original_timestamp(const bool new_original_timestamp)
Set original_timestamp value.
PclDatabaseStoreInterface Fawkes BlackBoard Interface.
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:44
virtual void loop()
Code to execute in the thread.
char * error() const
Get error value.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
virtual void close(Interface *interface)=0
Close interface.