Fawkes API  Fawkes Development Version
pcl_db_store_thread.cpp
1 
2 /***************************************************************************
3  * pcl_db_store_thread.cpp - Store point clouds to MongoDB
4  *
5  * Created: Mon May 05 14:26:22 2014
6  * Copyright 2012-2014 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "pcl_db_store_thread.h"
23 #include <interfaces/PclDatabaseStoreInterface.h>
24 
25 #include <blackboard/utils/on_message_waker.h>
26 #include <pcl_utils/pcl_adapter.h>
27 
28 // from MongoDB
29 #include <mongo/client/dbclient.h>
30 #include <mongo/client/gridfs.h>
31 
32 #define CFG_PREFIX "/perception/pcl-db/"
33 
34 using namespace fawkes;
35 using namespace mongo;
36 
37 /** @class PointCloudDBStoreThread "pcl_db_store_thread.h"
38  * Thread to store point clouds from database on request.
39  * @author Tim Niemueller
40  */
41 
42 
43 /** Constructor. */
45  : Thread("PointCloudDBStoreThread", Thread::OPMODE_WAITFORWAKEUP)
46 {
47 }
48 
49 
50 /** Destructor. */
52 {
53 }
54 
55 
56 void
58 {
59  store_if_ = NULL;
60  adapter_ = NULL;
61  msg_waker_ = NULL;
62 
63  cfg_database_ = config->get_string(CFG_PREFIX"database-name");
64 
65  adapter_ = new PointCloudAdapter(pcl_manager, logger);
66 
67  try {
68  store_if_ =
70 
71  msg_waker_ = new BlackBoardOnMessageWaker(blackboard, store_if_, this);
72  } catch (Exception &e) {
73  finalize();
74  throw;
75  }
76 }
77 
78 void
80 {
81  delete msg_waker_;
82  blackboard->close(store_if_);
83  delete adapter_;
84 }
85 
86 
87 void
89 {
90  if (store_if_->msgq_empty()) return;
91 
93  store_if_->msgq_first_safe(msg))
94  {
95  store_if_->set_final(false);
96  store_if_->set_msgid(msg->id());
97  store_if_->set_error("");
98  store_if_->write();
99 
100  std::string msg_database = msg->database();
101  std::string msg_collection = msg->collection();
102 
103  bool store = true;
104  std::string errmsg;
105  std::string database = (msg_database != "") ? msg_database : cfg_database_;
106  std::string collection = database + ".";
107  if (msg_collection == "") {
108  collection += "pcls";
109  } else if (msg_collection.find("fs.") == 0) {
110  errmsg = "Passed in collection uses GridFS namespace";
111  store = false;
112  } else {
113  collection += msg->collection();
114  }
115 
116  if (store) store_pointcloud(msg->pcl_id(), database, collection, errmsg);
117 
118  store_if_->set_error(errmsg.c_str());
119  store_if_->set_final(true);
120  store_if_->write();
121  } else {
122  logger->log_warn(name(), "Unhandled message received");
123  }
124  store_if_->msgq_pop();
125 }
126 
127 
128 bool
129 PointCloudDBStoreThread::store_pointcloud(std::string pcl_id, std::string database,
130  std::string collection, std::string &errmsg)
131 {
132  if ( ! pcl_manager->exists_pointcloud(pcl_id.c_str())) {
133  errmsg = "PointCloud does not exist";
134  return false;
135  }
136 
137  std::string frame_id;
138  unsigned int width, height;
139  bool is_dense;
140  void *point_data;
141  size_t point_size, num_points;
142  fawkes::Time time;
144 
145  adapter_->get_data_and_info(pcl_id, frame_id, is_dense, width, height, time,
146  fieldinfo, &point_data, point_size, num_points);
147 
148  size_t data_size = point_size * num_points;
149 
150  BSONObjBuilder document;
151  document.append("timestamp", (long long) time.in_msec());
152  BSONObjBuilder subb(document.subobjStart("pointcloud"));
153  subb.append("frame_id", frame_id);
154  subb.append("is_dense", is_dense);
155  subb.append("width", width);
156  subb.append("height", height);
157  subb.append("point_size", (unsigned int)point_size);
158  subb.append("num_points", (unsigned int)num_points);
159 
160  GridFS gridfs(*mongodb_client, database);
161 
162  std::stringstream name;
163  name << "pcl_" << time.in_msec();
164  subb.append("data", gridfs.storeFile((char*) point_data, data_size, name.str()));
165 
166  BSONArrayBuilder subb2(subb.subarrayStart("field_info"));
167  for (unsigned int i = 0; i < fieldinfo.size(); i++) {
168  BSONObjBuilder fi(subb2.subobjStart());
169  fi.append("name", fieldinfo[i].name);
170  fi.append("offset", fieldinfo[i].offset);
171  fi.append("datatype", fieldinfo[i].datatype);
172  fi.append("count", fieldinfo[i].count);
173  fi.doneFast();
174  }
175  subb2.doneFast();
176  subb.doneFast();
177  try {
178  mongodb_client->insert(collection, document.obj());
179  } catch (mongo::DBException &e) {
180  logger->log_warn(this->name(), "Failed to insert into %s: %s",
181  collection.c_str(), e.what());
182  errmsg = e.what();
183  return false;
184  }
185 
186  return true;
187 }
std::vector< PointFieldInfo > V_PointFieldInfo
Vector of PointFieldInfo.
Definition: pcl_adapter.h:62
void set_msgid(const uint32_t new_msgid)
Set msgid value.
virtual void loop()
Code to execute in the thread.
void set_final(const bool new_final)
Set final value.
bool msgq_empty()
Check if queue is empty.
Definition: interface.cpp:1048
void set_error(const char *new_error)
Set error value.
Fawkes library namespace.
Definition: mongodb.h:29
mongo::DBClientBase * mongodb_client
MongoDB client to use to interact with the database.
Definition: mongodb.h:51
Wake threads on receiving a blackboard message.
A class for handling time.
Definition: time.h:91
PointCloudDBStoreThread()
Constructor.
Thread class encapsulation of pthreads.
Definition: thread.h:42
void write()
Write from local copy into BlackBoard memory.
Definition: interface.cpp:500
virtual void init()
Initialize the thread.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:44
void get_data_and_info(const std::string &id, std::string &frame_id, bool &is_dense, unsigned int &width, unsigned int &height, fawkes::Time &time, V_PointFieldInfo &pfi, void **data_ptr, size_t &point_size, size_t &num_points)
Get data and info of point cloud.
long in_msec() const
Convert the stored time into milli-seconds.
Definition: time.cpp:242
void msgq_pop()
Erase first message from queue.
Definition: interface.cpp:1193
PointCloudManager * pcl_manager
Manager to distribute and access point clouds.
Definition: pointcloud.h:50
Base class for exceptions in Fawkes.
Definition: exception.h:36
const char * name() const
Get name of thread.
Definition: thread.h:95
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
MessageType * msgq_first_safe(MessageType *&msg)
Get first message casted to the desired type without exceptions.
Definition: interface.h:302
Point cloud adapter class.
Definition: pcl_adapter.h:38
StoreMessage Fawkes BlackBoard Interface Message.
PclDatabaseStoreInterface Fawkes BlackBoard Interface.
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:44
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
virtual ~PointCloudDBStoreThread()
Destructor.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
virtual void finalize()
Finalize the thread.
bool exists_pointcloud(const char *id)
Check if point cloud exists.
virtual void close(Interface *interface)=0
Close interface.