Fawkes API  Fawkes Development Version
mongodb_log_pcl_thread.cpp
1 
2 /***************************************************************************
3  * mongodb_log_pcl_thread.cpp - Thread to log point clouds to MongoDB
4  *
5  * Created: Mon Nov 07 02:58:40 2011
6  * Copyright 2011-2012 Tim Niemueller [www.niemueller.de]
7  * 2012 Bastian Klingen
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU Library General Public License for more details.
19  *
20  * Read the full text in the LICENSE.GPL file in the doc directory.
21  */
22 
23 #include "mongodb_log_pcl_thread.h"
24 
25 // Fawkes
26 #include <core/threading/mutex_locker.h>
27 #include <utils/time/wait.h>
28 
29 // from MongoDB
30 #include <mongo/client/dbclient.h>
31 #include <mongo/client/gridfs.h>
32 
33 #include <fnmatch.h>
34 #include <unistd.h>
35 
36 using namespace fawkes;
37 using namespace mongo;
38 
39 /** @class MongoLogPointCloudThread "mongodb_log_pcl_thread.h"
40  * Thread to store point clouds to MongoDB.
41  * @author Tim Niemueller
42  * @author Bastian Klingen
43  */
44 
45 /** Constructor. */
47  : Thread("MongoLogPointCloudThread", Thread::OPMODE_CONTINUOUS)
48 {
50 }
51 
52 /** Destructor. */
54 {
55 }
56 
57 
58 
59 void
61 {
62  database_ = "fflog";
63  try {
64  database_ = config->get_string("/plugins/mongodb-log/database");
65  } catch (Exception &e) {
66  logger->log_info(name(), "No database configured, writing to %s",
67  database_.c_str());
68  }
69 
70  cfg_storage_interval_ =
71  config->get_float("/plugins/mongodb-log/pointclouds/storage-interval");
72 
73  cfg_chunk_size_ = 2097152; // 2 MB
74  try {
75  cfg_chunk_size_ = config->get_uint("/plugins/mongodb-log/pointclouds/chunk-size");
76  } catch (Exception &e) {} // ignored, use default
77  logger->log_info(name(), "Chunk size: %u", cfg_chunk_size_);
78 
79  cfg_flush_after_write_ = false;
80  try {
81  cfg_flush_after_write_ =
82  config->get_uint("/plugins/mongodb-log/pointclouds/flush-after-write");
83  } catch (Exception &e) {} // ignored, use default
84 
85  std::vector<std::string> includes;
86  try {
87  includes = config->get_strings("/plugins/mongodb-log/pointclouds/includes");
88  } catch (Exception &e) {} // ignored, no include rules
89  std::vector<std::string> excludes;
90 
91  try {
92  excludes = config->get_strings("/plugins/mongodb-log/pointclouds/excludes");
93  } catch (Exception &e) {} // ignored, no include rules
94 
95  mongodb_ = mongodb_client;
96  gridfs_ = new GridFS(*mongodb_, database_);
97  //gridfs_->setChunkSize(cfg_chunk_size_);
98 
99  adapter_ = new PointCloudAdapter(pcl_manager, logger);
100 
101  std::vector<std::string> pcls = pcl_manager->get_pointcloud_list();
102 
103  std::vector<std::string>::iterator p;
104  std::vector<std::string>::iterator f;
105  for (p = pcls.begin(); p != pcls.end(); ++p) {
106  bool include = includes.empty();
107  if (! include) {
108  for (f = includes.begin(); f != includes.end(); ++f) {
109  if (fnmatch(f->c_str(), p->c_str(), 0) != FNM_NOMATCH) {
110  logger->log_debug(name(), "Include match %s to %s", f->c_str(), p->c_str());
111  include = true;
112  break;
113  }
114  }
115  }
116  if (include) {
117  for (f = excludes.begin(); f != excludes.end(); ++f) {
118  if (fnmatch(f->c_str(), p->c_str(), 0) != FNM_NOMATCH) {
119  logger->log_debug(name(), "Exclude match %s to %s", f->c_str(), p->c_str());
120  include = false;
121  break;
122  }
123  }
124  }
125  if (! include) {
126  logger->log_info(name(), "Excluding point cloud %s", p->c_str());
127  continue;
128  }
129 
130  PointCloudInfo pi;
131 
132  std::string topic_name = std::string("PointClouds.") + *p;
133  size_t pos = 0;
134  while ((pos = topic_name.find_first_of(" -", pos)) != std::string::npos) {
135  topic_name.replace(pos, 1, "_");
136  pos = pos + 1;
137  }
138 
139  pi.topic_name = topic_name;
140 
141  logger->log_info(name(), "MongoLog of point cloud %s", p->c_str());
142 
143  std::string frame_id;
144  unsigned int width, height;
145  bool is_dense;
147  adapter_->get_info(*p, width, height, frame_id, is_dense, fieldinfo);
148  pi.msg.header.frame_id = frame_id;
149  pi.msg.width = width;
150  pi.msg.height = height;
151  pi.msg.is_dense = is_dense;
152  pi.msg.fields.clear();
153  pi.msg.fields.resize(fieldinfo.size());
154  for (unsigned int i = 0; i < fieldinfo.size(); ++i) {
155  pi.msg.fields[i].name = fieldinfo[i].name;
156  pi.msg.fields[i].offset = fieldinfo[i].offset;
157  pi.msg.fields[i].datatype = fieldinfo[i].datatype;
158  pi.msg.fields[i].count = fieldinfo[i].count;
159  }
160 
161  pcls_[*p] = pi;
162  }
163 
164  wait_ = new TimeWait(clock, cfg_storage_interval_ * 1000000.);
165  mutex_ = new Mutex();
166 }
167 
168 
169 bool
171 {
172  mutex_->lock();
173  return true;
174 }
175 
176 
177 void
179 {
180  delete adapter_;
181  delete gridfs_;
182  delete wait_;
183  delete mutex_;
184 }
185 
186 
187 void
189 {
190  MutexLocker lock(mutex_);
191  fawkes::Time loop_start(clock);
192  wait_->mark_start();
193  std::map<std::string, PointCloudInfo>::iterator p;
194  unsigned int num_stored = 0;
195  for (p = pcls_.begin(); p != pcls_.end(); ++p) {
196  PointCloudInfo &pi = p->second;
197  std::string frame_id;
198  unsigned int width, height;
199  void *point_data;
200  size_t point_size, num_points;
201  fawkes::Time time;
202  adapter_->get_data(p->first, frame_id, width, height, time,
203  &point_data, point_size, num_points);
204  size_t data_size = point_size * num_points;
205 
206  if (pi.last_sent != time) {
207  pi.last_sent = time;
208 
210 
211  BSONObjBuilder document;
212  document.append("timestamp", (long long) time.in_msec());
213  BSONObjBuilder subb(document.subobjStart("pointcloud"));
214  subb.append("frame_id", pi.msg.header.frame_id);
215  subb.append("is_dense", pi.msg.is_dense);
216  subb.append("width", width);
217  subb.append("height", height);
218  subb.append("point_size", (unsigned int)point_size);
219  subb.append("num_points", (unsigned int)num_points);
220 
221  std::stringstream name;
222  name << pi.topic_name << "_" << time.in_msec();
223  subb.append("data", gridfs_->storeFile((char*) point_data, data_size, name.str()));
224 
225  BSONArrayBuilder subb2(subb.subarrayStart("field_info"));
226  for (unsigned int i = 0; i < pi.msg.fields.size(); i++) {
227  BSONObjBuilder fi(subb2.subobjStart());
228  fi.append("name", pi.msg.fields[i].name);
229  fi.append("offset", pi.msg.fields[i].offset);
230  fi.append("datatype", pi.msg.fields[i].datatype);
231  fi.append("count", pi.msg.fields[i].count);
232  fi.doneFast();
233  }
234  subb2.doneFast();
235  subb.doneFast();
236  collection_ = database_ + "." + pi.topic_name;
237  try {
238  mongodb_->insert(collection_, document.obj());
239  ++num_stored;
240  } catch (mongo::DBException &e) {
241  logger->log_warn(this->name(), "Failed to insert into %s: %s",
242  collection_.c_str(), e.what());
243  }
244 
245  fawkes::Time end(clock);
246  float diff = (end - &start) * 1000.;
247  logger->log_debug(this->name(), "Stored point cloud %s (time %li) in %.1f ms",
248  p->first.c_str(), time.in_msec(), diff);
249 
250  } else {
251  logger->log_debug(this->name(), "Point cloud %s did not change",
252  p->first.c_str());
253  //adapter_->close(p->first);
254  }
255 
256  }
257  mutex_->unlock();
258  // -1 to subtract "NO PARENT" pseudo cache
259  fawkes::Time loop_end(clock);
260  logger->log_debug(name(), "Stored %u of %zu point clouds in %.1f ms",
261  num_stored, pcls_.size(), (loop_end - &loop_start) * 1000.);
262 
263  if (cfg_flush_after_write_) {
264  // flush database
265  BSONObjBuilder flush_cmd;
266  BSONObj reply;
267  flush_cmd.append("fsync", 1);
268  flush_cmd.append("async", 1);
269  mongodb_client->runCommand("admin", flush_cmd.obj(), reply);
270  if (reply.hasField("ok")) {
271  if (! reply["ok"].trueValue()) {
272  logger->log_warn(name(), "fsync error: ", reply["errmsg"].String().c_str());
273  }
274  } else {
275  logger->log_warn(name(), "fsync reply has no ok field");
276  }
277  }
278  wait_->wait();
279 }
std::vector< PointFieldInfo > V_PointFieldInfo
Vector of PointFieldInfo.
Definition: pcl_adapter.h:62
void wait()
Wait until minimum loop time has been reached.
Definition: wait.cpp:81
std::vector< std::string > get_pointcloud_list() const
Get list of point cloud IDs.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Fawkes library namespace.
Definition: mongodb.h:29
void unlock()
Unlock the mutex.
Definition: mutex.cpp:135
Mutex locking helper.
Definition: mutex_locker.h:33
mongo::DBClientBase * mongodb_client
MongoDB client to use to interact with the database.
Definition: mongodb.h:51
A class for handling time.
Definition: time.h:91
Thread class encapsulation of pthreads.
Definition: thread.h:42
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Definition: thread.cpp:727
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:44
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:45
long in_msec() const
Convert the stored time into milli-seconds.
Definition: time.cpp:242
PointCloudManager * pcl_manager
Manager to distribute and access point clouds.
Definition: pointcloud.h:50
Base class for exceptions in Fawkes.
Definition: exception.h:36
const char * name() const
Get name of thread.
Definition: thread.h:95
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
void get_info(const std::string &id, unsigned int &width, unsigned int &height, std::string &frame_id, bool &is_dense, V_PointFieldInfo &pfi)
Get info about point cloud.
void mark_start()
Mark start of loop.
Definition: wait.cpp:70
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual std::vector< std::string > get_strings(const char *path)=0
Get list of values from configuration which is of type string.
Point cloud adapter class.
Definition: pcl_adapter.h:38
virtual ~MongoLogPointCloudThread()
Destructor.
void lock()
Lock this mutex.
Definition: mutex.cpp:89
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
Mutex mutual exclusion lock.
Definition: mutex.h:32
virtual void finalize()
Finalize the thread.
virtual bool prepare_finalize_user()
Prepare finalization user implementation.
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:44
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
Time wait utility.
Definition: wait.h:32
virtual void loop()
Code to execute in the thread.
void get_data(const std::string &id, std::string &frame_id, unsigned int &width, unsigned int &height, fawkes::Time &time, void **data_ptr, size_t &point_size, size_t &num_points)
Get current data of point cloud.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
void start(bool wait=true)
Call this method to start the thread.
Definition: thread.cpp:511
virtual void init()
Initialize the thread.