23 #include "mongodb_log_pcl_thread.h" 26 #include <core/threading/mutex_locker.h> 27 #include <utils/time/wait.h> 30 #include <mongo/client/dbclient.h> 31 #include <mongo/client/gridfs.h> 37 using namespace mongo;
47 :
Thread(
"MongoLogPointCloudThread",
Thread::OPMODE_CONTINUOUS)
70 cfg_storage_interval_ =
73 cfg_chunk_size_ = 2097152;
75 cfg_chunk_size_ =
config->
get_uint(
"/plugins/mongodb-log/pointclouds/chunk-size");
79 cfg_flush_after_write_ =
false;
81 cfg_flush_after_write_ =
82 config->
get_uint(
"/plugins/mongodb-log/pointclouds/flush-after-write");
85 std::vector<std::string> includes;
89 std::vector<std::string> excludes;
96 gridfs_ =
new GridFS(*mongodb_, database_);
103 std::vector<std::string>::iterator p;
104 std::vector<std::string>::iterator f;
105 for (p = pcls.begin(); p != pcls.end(); ++p) {
106 bool include = includes.empty();
108 for (f = includes.begin(); f != includes.end(); ++f) {
109 if (fnmatch(f->c_str(), p->c_str(), 0) != FNM_NOMATCH) {
117 for (f = excludes.begin(); f != excludes.end(); ++f) {
118 if (fnmatch(f->c_str(), p->c_str(), 0) != FNM_NOMATCH) {
132 std::string topic_name = std::string(
"PointClouds.") + *p;
134 while ((pos = topic_name.find_first_of(
" -", pos)) != std::string::npos) {
135 topic_name.replace(pos, 1,
"_");
139 pi.topic_name = topic_name;
143 std::string frame_id;
144 unsigned int width, height;
147 adapter_->
get_info(*p, width, height, frame_id, is_dense, fieldinfo);
148 pi.msg.header.frame_id = frame_id;
149 pi.msg.width = width;
150 pi.msg.height = height;
151 pi.msg.is_dense = is_dense;
152 pi.msg.fields.clear();
153 pi.msg.fields.resize(fieldinfo.size());
154 for (
unsigned int i = 0; i < fieldinfo.size(); ++i) {
155 pi.msg.fields[i].name = fieldinfo[i].name;
156 pi.msg.fields[i].offset = fieldinfo[i].offset;
157 pi.msg.fields[i].datatype = fieldinfo[i].datatype;
158 pi.msg.fields[i].count = fieldinfo[i].count;
164 wait_ =
new TimeWait(
clock, cfg_storage_interval_ * 1000000.);
165 mutex_ =
new Mutex();
193 std::map<std::string, PointCloudInfo>::iterator p;
194 unsigned int num_stored = 0;
195 for (p = pcls_.begin(); p != pcls_.end(); ++p) {
196 PointCloudInfo &pi = p->second;
197 std::string frame_id;
198 unsigned int width, height;
200 size_t point_size, num_points;
202 adapter_->
get_data(p->first, frame_id, width, height, time,
203 &point_data, point_size, num_points);
204 size_t data_size = point_size * num_points;
206 if (pi.last_sent != time) {
211 BSONObjBuilder document;
212 document.append(
"timestamp", (
long long) time.
in_msec());
213 BSONObjBuilder subb(document.subobjStart(
"pointcloud"));
214 subb.append(
"frame_id", pi.msg.header.frame_id);
215 subb.append(
"is_dense", pi.msg.is_dense);
216 subb.append(
"width", width);
217 subb.append(
"height", height);
218 subb.append(
"point_size", (
unsigned int)point_size);
219 subb.append(
"num_points", (
unsigned int)num_points);
221 std::stringstream
name;
222 name << pi.topic_name <<
"_" << time.
in_msec();
223 subb.append(
"data", gridfs_->storeFile((
char*) point_data, data_size, name.str()));
225 BSONArrayBuilder subb2(subb.subarrayStart(
"field_info"));
226 for (
unsigned int i = 0; i < pi.msg.fields.size(); i++) {
227 BSONObjBuilder fi(subb2.subobjStart());
228 fi.append(
"name", pi.msg.fields[i].name);
229 fi.append(
"offset", pi.msg.fields[i].offset);
230 fi.append(
"datatype", pi.msg.fields[i].datatype);
231 fi.append(
"count", pi.msg.fields[i].count);
236 collection_ = database_ +
"." + pi.topic_name;
238 mongodb_->insert(collection_, document.obj());
240 }
catch (mongo::DBException &e) {
242 collection_.c_str(), e.what());
246 float diff = (end - &
start) * 1000.;
248 p->first.c_str(), time.
in_msec(), diff);
261 num_stored, pcls_.size(), (loop_end - &loop_start) * 1000.);
263 if (cfg_flush_after_write_) {
265 BSONObjBuilder flush_cmd;
267 flush_cmd.append(
"fsync", 1);
268 flush_cmd.append(
"async", 1);
270 if (reply.hasField(
"ok")) {
271 if (! reply[
"ok"].trueValue()) {
std::vector< PointFieldInfo > V_PointFieldInfo
Vector of PointFieldInfo.
void wait()
Wait until minimum loop time has been reached.
std::vector< std::string > get_pointcloud_list() const
Get list of point cloud IDs.
MongoLogPointCloudThread()
Constructor.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Fawkes library namespace.
void unlock()
Unlock the mutex.
mongo::DBClientBase * mongodb_client
MongoDB client to use to interact with the database.
A class for handling time.
Thread class encapsulation of pthreads.
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Logger * logger
This is the Logger member used to access the logger.
Clock * clock
This member provides access to the clock.
long in_msec() const
Convert the stored time into milliseconds.
PointCloudManager * pcl_manager
Manager to distribute and access point clouds.
Base class for exceptions in Fawkes.
const char * name() const
Get name of thread.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
void get_info(const std::string &id, unsigned int &width, unsigned int &height, std::string &frame_id, bool &is_dense, V_PointFieldInfo &pfi)
Get info about point cloud.
void mark_start()
Mark start of loop.
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual std::vector< std::string > get_strings(const char *path)=0
Get list of values from configuration which are of type string.
Point cloud adapter class.
virtual ~MongoLogPointCloudThread()
Destructor.
void lock()
Lock this mutex.
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
Mutex: mutual exclusion lock.
virtual void finalize()
Finalize the thread.
virtual bool prepare_finalize_user()
Prepare finalization user implementation.
Configuration * config
This is the Configuration member used to access the configuration.
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
virtual void loop()
Code to execute in the thread.
void get_data(const std::string &id, std::string &frame_id, unsigned int &width, unsigned int &height, fawkes::Time &time, void **data_ptr, size_t &point_size, size_t &num_points)
Get current data of point cloud.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
void start(bool wait=true)
Call this method to start the thread.
virtual void init()
Initialize the thread.