22 #include "mongodb_log_tf_thread.h" 24 #include <core/threading/mutex_locker.h> 25 #include <tf/time_cache.h> 26 #include <utils/time/wait.h> 27 #include <plugins/mongodb/aspect/mongodb_conncreator.h> 30 #include <mongo/client/dbclient.h> 34 using namespace mongo;
48 :
Thread(
"MongoLogTransformsThread",
Thread::OPMODE_CONTINUOUS),
74 collection_ =
config->
get_string(
"/plugins/mongodb-log/transforms/collection");
79 collection_ = database_ +
"." + collection_;
81 cfg_storage_interval_ =
85 if (cfg_storage_interval_ <= 0.) {
90 wait_ =
new TimeWait(
clock, cfg_storage_interval_ * 1000000.);
115 std::vector<fawkes::Time> tf_range_start;
116 std::vector<fawkes::Time> tf_range_end;;
120 std::vector<tf::TimeCacheInterfacePtr> copies(caches.size(), tf::TimeCacheInterfacePtr());
122 const size_t n_caches = caches.size();
125 if (last_tf_range_end_.size() != n_caches) {
129 unsigned int num_transforms = 0;
130 unsigned int num_upd_caches = 0;
132 for (
size_t i = 0; i < n_caches; ++i) {
134 tf_range_end[i] = caches[i]->get_latest_timestamp();
135 if (last_tf_range_end_[i] != tf_range_end[i]) {
137 if (! tf_range_end[i].is_zero()) {
138 tf_range_start[i] = tf_range_end[i] - cfg_storage_interval_;
139 if (last_tf_range_end_[i] > tf_range_start[i]) {
140 tf_range_start[i] = last_tf_range_end_[i];
143 copies[i] = caches[i]->clone(tf_range_start[i]);
144 last_tf_range_end_[i] = tf_range_end[i];
146 num_transforms += copies[i]->get_list_length();
152 store(copies, tf_range_start, tf_range_end);
158 num_transforms, num_upd_caches,
159 (loop_end - &loop_start) * 1000.);
165 MongoLogTransformsThread::store(std::vector<tf::TimeCacheInterfacePtr> &caches,
166 std::vector<fawkes::Time> &from,
167 std::vector<fawkes::Time> &to)
171 for (
size_t i = 0; i < caches.size(); ++i) {
172 tf::TimeCacheInterfacePtr tc = caches[i];
175 BSONObjBuilder document;
176 document.append(
"timestamp", (
long long) from[i].in_msec());
177 document.append(
"timestamp_from", (
long long) from[i].in_msec());
178 document.append(
"timestamp_to", (
long long) to[i].in_msec());
179 const tf::TimeCache::L_TransformStorage &storage = tc->get_storage();
181 if (storage.empty()) {
191 document.append(
"frame", frame_map[storage.front().frame_id]);
192 document.append(
"child_frame", frame_map[storage.front().child_frame_id]);
194 BSONArrayBuilder tfl_array(document.subarrayStart(
"transforms"));
200 tf::TimeCache::L_TransformStorage::const_iterator s;
201 for (s = storage.begin(); s != storage.end(); ++s) {
202 BSONObjBuilder tf_doc(tfl_array.subobjStart());
220 tf_doc.append(
"timestamp", (
long long)s->stamp.in_msec());
221 tf_doc.append(
"frame", frame_map[s->frame_id]);
222 tf_doc.append(
"child_frame", frame_map[s->child_frame_id]);
224 BSONArrayBuilder rot_arr(tf_doc.subarrayStart(
"rotation"));
225 rot_arr.append(s->rotation.x());
226 rot_arr.append(s->rotation.y());
227 rot_arr.append(s->rotation.z());
228 rot_arr.append(s->rotation.w());
231 BSONArrayBuilder trans_arr(tf_doc.subarrayStart(
"translation"));
232 trans_arr.append(s->translation.x());
233 trans_arr.append(s->translation.y());
234 trans_arr.append(s->translation.z());
235 trans_arr.doneFast();
240 tfl_array.doneFast();
244 }
catch (mongo::DBException &e) {
247 }
catch (std::exception &e) {
void wait()
Wait until minimum loop time has been reached.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Fawkes library namespace.
void unlock()
Unlock the mutex.
mongo::DBClientBase * mongodb_client
MongoDB client to use to interact with the database.
A class for handling time.
Thread class encapsulation of pthreads.
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Logger * logger
This is the Logger member used to access the logger.
Clock * clock
This member provides access to the clock.
Base class for exceptions in Fawkes.
const char * name() const
Get name of thread.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
void mark_start()
Mark start of loop.
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
void lock()
Lock this mutex.
Mutex: a mutual exclusion lock.
Configuration * config
This is the Configuration member used to access the configuration.
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.