Fawkes API  Fawkes Development Version
pcl_db_retrieve_thread.cpp
1 
2 /***************************************************************************
3  * pcl_db_retrieve_thread.cpp - Restore and retrieve point clouds from MongoDB
4  *
5  * Created: Thu Aug 22 12:04:09 2013
6  * Copyright 2012 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "pcl_db_retrieve_thread.h"
23 #include <interfaces/PclDatabaseRetrieveInterface.h>
24 
25 #include <blackboard/utils/on_message_waker.h>
26 
27 using namespace fawkes;
28 
29 /** @class PointCloudDBRetrieveThread "pcl_db_retrieve_thread.h"
30  * Thread to retrieve point clouds from database on request.
31  * @author Tim Niemueller
32  */
33 
34 
35 /** Constructor. */
37  : Thread("PointCloudDBRetrieveThread", Thread::OPMODE_WAITFORWAKEUP)
38 {
39 }
40 
41 
42 /** Destructor. */
44 {
45 }
46 
47 
48 void
50 {
51  pl_xyz_ = NULL;
52  pl_xyzrgb_ = NULL;
53  retrieve_if_ = NULL;
54  msg_waker_ = NULL;
55 
56  cfg_database_ = config->get_string(CFG_PREFIX"database-name");
57  cfg_output_id_ = config->get_string(CFG_PREFIX_RETRV"output-pcl-id");
58  cfg_original_id_ = config->get_string(CFG_PREFIX_RETRV"original-pcl-id");
59 
60  foutput_ = new pcl::PointCloud<pcl::PointXYZRGB>();
61  foutput_->is_dense = false;
62  pcl_manager->add_pointcloud<pcl::PointXYZRGB>(cfg_output_id_.c_str(), foutput_);
63  output_ = pcl_utils::cloudptr_from_refptr(foutput_);
64 
65  foriginal_ = new pcl::PointCloud<pcl::PointXYZRGB>();
66  foriginal_->is_dense = false;
67  pcl_manager->add_pointcloud<pcl::PointXYZRGB>(cfg_original_id_.c_str(), foriginal_);
68  original_ = pcl_utils::cloudptr_from_refptr(foriginal_);
69 
70 
71  pl_xyz_ =
74  original_, output_);
75 
76  pl_xyzrgb_ =
79  original_, output_);
80 
81  try {
82  retrieve_if_ =
83  blackboard->open_for_writing<PclDatabaseRetrieveInterface>("PCL Database Retrieve");
84 
85  msg_waker_ = new BlackBoardOnMessageWaker(blackboard, retrieve_if_, this);
86  } catch (Exception &e) {
87  finalize();
88  throw;
89  }
90 }
91 
92 void
94 {
95  delete msg_waker_;
96  blackboard->close(retrieve_if_);
97 
98  delete pl_xyz_;
99  delete pl_xyzrgb_;
100 
101  output_.reset();
102  pcl_manager->remove_pointcloud(cfg_output_id_.c_str());
103  foutput_.reset();
104 
105  original_.reset();
106  pcl_manager->remove_pointcloud(cfg_original_id_.c_str());
107  foriginal_.reset();
108 }
109 
110 
111 void
113 {
114  long long timestamp = 0;
115  std::vector<long long> times(1);
116  std::string database;
117  std::string collection;
118  std::string target_frame;
119  bool original_timestamp;
120 
121  if (retrieve_if_->msgq_empty()) return;
122 
124  retrieve_if_->msgq_first_safe(msg))
125  {
126  retrieve_if_->set_final(false);
127  retrieve_if_->set_msgid(msg->id());
128  retrieve_if_->set_error("");
129  retrieve_if_->write();
130 
131  timestamp = msg->timestamp();
132  times[0] = timestamp;
133  database =
134  (strcmp(msg->database(), "") != 0) ? msg->database() : cfg_database_;
135  collection = msg->collection();
136  target_frame = msg->target_frame();
137  original_timestamp = msg->is_original_timestamp();
138  } else {
139  logger->log_warn(name(), "Unhandled message received");
140  retrieve_if_->msgq_pop();
141  return;
142  }
143  retrieve_if_->msgq_pop();
144 
145  logger->log_info(name(), "Restoring from '%s' for the time %lli",
146  collection.c_str(), timestamp);
147 
148  ApplicabilityStatus st_xyz, st_xyzrgb;
149  long long actual_time = 0;
150 
151  pl_xyz_->applicable(times, database, collection);
152  if ((st_xyz = pl_xyz_->applicable(times, database, collection)) == APPLICABLE) {
153  logger->log_info(name(), "Restoring XYZ");
154  pl_xyz_->retrieve(timestamp, database, collection, target_frame, actual_time);
155  } else if ((st_xyzrgb = pl_xyzrgb_->applicable(times, database, collection)) == APPLICABLE) {
156  logger->log_info(name(), "Restoring XYZRGB");
157  pl_xyzrgb_->retrieve(timestamp, database, collection, target_frame, actual_time);
158  if (! original_timestamp) {
159  Time now(clock);
160  pcl_utils::set_time(foutput_, now);
161  }
162  } else {
163  logger->log_warn(name(), "No applicable merging pipeline known:");
164  logger->log_warn(name(), " XYZ: %s", to_string(st_xyz));
165  logger->log_warn(name(), " XYZ/RGB: %s", to_string(st_xyzrgb));
166  retrieve_if_->set_error("No applicable merging pipeline known (for details see log)");
167  }
168 
169  if (actual_time != 0) {
170  if (original_timestamp) {
171  Time now((long)actual_time);
172  Time last;
173  pcl_utils::get_time(foutput_, last);
174  // force sending with one microsecond offset if same than last time
175  if (last == now) now += 1L;
176  pcl_utils::set_time(foutput_, now);
177  } else {
178  Time now(clock);
179  pcl_utils::set_time(foutput_, now);
180  }
181  }
182 
183  retrieve_if_->set_final(true);
184  retrieve_if_->write();
185 }
186 
virtual void loop()
Code to execute in the thread.
RetrieveMessage Fawkes BlackBoard Interface Message.
bool msgq_empty()
Check if queue is empty.
Definition: interface.cpp:1048
void set_final(const bool new_final)
Set final value.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
void set_error(const char *new_error)
Set error value.
void remove_pointcloud(const char *id)
Remove the point cloud.
PclDatabaseRetrieveInterface Fawkes BlackBoard Interface.
Fawkes library namespace.
mongo::DBClientBase * mongodb_client
MongoDB client to use to interact with the database.
Definition: mongodb.h:51
Wake threads on receiving a blackboard message.
A class for handling time.
Definition: time.h:91
void retrieve(long long timestamp, std::string &database, std::string &collection, std::string &target_frame, long long &actual_time)
Retrieve point clouds.
void add_pointcloud(const char *id, RefPtr< pcl::PointCloud< PointT > > cloud)
Add point cloud.
Thread class encapsulation of pthreads.
Definition: thread.h:42
void write()
Write from local copy into BlackBoard memory.
Definition: interface.cpp:500
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:44
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:45
void reset()
Reset pointer.
Definition: refptr.h:464
void msgq_pop()
Erase first message from queue.
Definition: interface.cpp:1193
void set_msgid(const uint32_t new_msgid)
Set msgid value.
PointCloudManager * pcl_manager
Manager to distribute and access point clouds.
Definition: pointcloud.h:50
Base class for exceptions in Fawkes.
Definition: exception.h:36
virtual ~PointCloudDBRetrieveThread()
Destructor.
const char * name() const
Get name of thread.
Definition: thread.h:95
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void finalize()
Finalize the thread.
MessageType * msgq_first_safe(MessageType *&msg)
Get first message casted to the desired type without exceptions.
Definition: interface.h:302
tf::Transformer * tf_listener
This is the transform listener which saves transforms published by other threads in the system...
Definition: tf.h:70
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:44
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
ApplicabilityStatus applicable(std::vector< long long > &times, std::string &database, std::string &collection)
Check if this pipeline instance is suitable for the given times.
virtual void init()
Initialize the thread.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
virtual void close(Interface *interface)=0
Close interface.