Fawkes API  Fawkes Development Version
mongodb_log_tf_thread.cpp
1 
2 /***************************************************************************
3  * mongodb_log_tf_thread.cpp - MongoDB transforms logging Thread
4  *
5  * Created: Tue Dec 11 14:58:00 2012 (Freiburg)
6  * Copyright 2010-2012 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "mongodb_log_tf_thread.h"
23 
24 #include <core/threading/mutex_locker.h>
25 #include <tf/time_cache.h>
26 #include <utils/time/wait.h>
27 #include <plugins/mongodb/aspect/mongodb_conncreator.h>
28 
29 // from MongoDB
30 #include <mongo/client/dbclient.h>
31 
32 #include <cstdlib>
33 
34 using namespace mongo;
35 using namespace fawkes;
36 
37 
38 /** @class MongoLogTransformsThread "mongodb_log_tf_thread.h"
39  * MongoDB transforms logging thread.
40  * This thread periodically queries the transformer for recent transforms
41  * and stores them to the database.
42  *
43  * @author Tim Niemueller
44  */
45 
46 /** Constructor. */
48  : Thread("MongoLogTransformsThread", Thread::OPMODE_CONTINUOUS),
49  TransformAspect(TransformAspect::ONLY_LISTENER)
50 {
52 }
53 
54 
55 /** Destructor. */
57 {
58 }
59 
60 
61 void
63 {
64  database_ = "fflog";
65  collection_ = "tf";
66  try {
67  database_ = config->get_string("/plugins/mongodb-log/database");
68  } catch (Exception &e) {
69  logger->log_info(name(), "No database configured, writing to %s",
70  database_.c_str());
71  }
72 
73  try {
74  collection_ = config->get_string("/plugins/mongodb-log/transforms/collection");
75  } catch (Exception &e) {
76  logger->log_info(name(), "No transforms collection configured, using %s",
77  collection_.c_str());
78  }
79  collection_ = database_ + "." + collection_;
80 
81  cfg_storage_interval_ =
82  config->get_float("/plugins/mongodb-log/transforms/storage-interval");
83 
84 
85  if (cfg_storage_interval_ <= 0.) {
86  // 50% of the cache time, assume 50% is enough to store the data
87  cfg_storage_interval_ = tf_listener->get_cache_time() * 0.5;
88  }
89 
90  wait_ = new TimeWait(clock, cfg_storage_interval_ * 1000000.);
91  mutex_ = new Mutex();
92 }
93 
94 bool
96 {
97  mutex_->lock();
98  return true;
99 }
100 
101 void
103 {
104  delete wait_;
105  delete mutex_;
106 }
107 
108 
109 void
111 {
112  mutex_->lock();
113  fawkes::Time loop_start(clock);
114  wait_->mark_start();
115  std::vector<fawkes::Time> tf_range_start;
116  std::vector<fawkes::Time> tf_range_end;;
117 
118  tf_listener->lock();
119  std::vector<tf::TimeCacheInterfacePtr> caches = tf_listener->get_frame_caches();
120  std::vector<tf::TimeCacheInterfacePtr> copies(caches.size(), tf::TimeCacheInterfacePtr());
121 
122  const size_t n_caches = caches.size();
123  tf_range_start.resize(n_caches, fawkes::Time(0,0));
124  tf_range_end.resize(n_caches, fawkes::Time(0,0));
125  if (last_tf_range_end_.size() != n_caches) {
126  last_tf_range_end_.resize(n_caches, fawkes::Time(0,0));
127  }
128 
129  unsigned int num_transforms = 0;
130  unsigned int num_upd_caches = 0;
131 
132  for (size_t i = 0; i < n_caches; ++i) {
133  if (caches[i]) {
134  tf_range_end[i] = caches[i]->get_latest_timestamp();
135  if (last_tf_range_end_[i] != tf_range_end[i]) {
136  // we have new data
137  if (! tf_range_end[i].is_zero()) {
138  tf_range_start[i] = tf_range_end[i] - cfg_storage_interval_;
139  if (last_tf_range_end_[i] > tf_range_start[i]) {
140  tf_range_start[i] = last_tf_range_end_[i];
141  }
142  }
143  copies[i] = caches[i]->clone(tf_range_start[i]);
144  last_tf_range_end_[i] = tf_range_end[i];
145  num_upd_caches += 1;
146  num_transforms += copies[i]->get_list_length();
147  }
148  }
149  }
150  tf_listener->unlock();
151 
152  store(copies, tf_range_start, tf_range_end);
153 
154  mutex_->unlock();
155  // -1 to subtract "NO PARENT" pseudo cache
156  fawkes::Time loop_end(clock);
157  logger->log_debug(name(), "%u transforms for %u updated frames stored in %.1f ms",
158  num_transforms, num_upd_caches,
159  (loop_end - &loop_start) * 1000.);
160  wait_->wait();
161 }
162 
163 
164 void
165 MongoLogTransformsThread::store(std::vector<tf::TimeCacheInterfacePtr> &caches,
166  std::vector<fawkes::Time> &from,
167  std::vector<fawkes::Time> &to)
168 {
169  std::vector<std::string> frame_map = tf_listener->get_frame_id_mappings();
170 
171  for (size_t i = 0; i < caches.size(); ++i) {
172  tf::TimeCacheInterfacePtr tc = caches[i];
173  if (! tc) continue;
174 
175  BSONObjBuilder document;
176  document.append("timestamp", (long long) from[i].in_msec());
177  document.append("timestamp_from", (long long) from[i].in_msec());
178  document.append("timestamp_to", (long long) to[i].in_msec());
179  const tf::TimeCache::L_TransformStorage &storage = tc->get_storage();
180 
181  if (storage.empty()) {
182  /*
183  fawkes::Time now(clock);
184  logger->log_warn(name(), "Empty storage for %s (start: %lu end: %lu now: %lu",
185  frame_map[i].c_str(), from[i].in_msec(), to[i].in_msec(),
186  now.in_msec());
187  */
188  continue;
189  }
190 
191  document.append("frame", frame_map[storage.front().frame_id]);
192  document.append("child_frame", frame_map[storage.front().child_frame_id]);
193 
194  BSONArrayBuilder tfl_array(document.subarrayStart("transforms"));
195  /*
196  logger->log_debug(name(), "Writing %zu transforms for child frame %s",
197  storage.size(), frame_map[i].c_str());
198  */
199 
200  tf::TimeCache::L_TransformStorage::const_iterator s;
201  for (s = storage.begin(); s != storage.end(); ++s) {
202  BSONObjBuilder tf_doc(tfl_array.subobjStart());
203 
204  /*
205  "frame" : "/bl_caster_rotation_link",
206  "child_frame" : "/bl_caster_l_wheel_link",
207  "translation" : [
208  0,
209  0.049,
210  0
211  ],
212  "rotation" : [
213  0,
214  0.607301261865804,
215  0,
216  0.7944716340664417
217  ]
218  */
219 
220  tf_doc.append("timestamp", (long long)s->stamp.in_msec());
221  tf_doc.append("frame", frame_map[s->frame_id]);
222  tf_doc.append("child_frame", frame_map[s->child_frame_id]);
223 
224  BSONArrayBuilder rot_arr(tf_doc.subarrayStart("rotation"));
225  rot_arr.append(s->rotation.x());
226  rot_arr.append(s->rotation.y());
227  rot_arr.append(s->rotation.z());
228  rot_arr.append(s->rotation.w());
229  rot_arr.doneFast();
230 
231  BSONArrayBuilder trans_arr(tf_doc.subarrayStart("translation"));
232  trans_arr.append(s->translation.x());
233  trans_arr.append(s->translation.y());
234  trans_arr.append(s->translation.z());
235  trans_arr.doneFast();
236 
237  tf_doc.doneFast();
238  }
239 
240  tfl_array.doneFast();
241 
242  try {
243  mongodb_client->insert(collection_, document.obj());
244  } catch (mongo::DBException &e) {
245  logger->log_warn(name(), "Inserting TF failed: %s", e.what());
246 
247  } catch (std::exception &e) {
248  // for old and broken compilers
249  logger->log_warn(name(), "Inserting TF failed: %s (*)", e.what());
250  }
251  }
252 }
void wait()
Wait until minimum loop time has been reached.
Definition: wait.cpp:81
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
std::vector< std::string > get_frame_id_mappings() const
Get mappings from frame ID to names.
Fawkes library namespace.
Definition: mongodb.h:29
void unlock()
Unlock the mutex.
Definition: mutex.cpp:135
mongo::DBClientBase * mongodb_client
MongoDB client to use to interact with the database.
Definition: mongodb.h:51
A class for handling time.
Definition: time.h:91
virtual void init()
Initialize the thread.
void lock()
Lock transformer.
Thread class encapsulation of pthreads.
Definition: thread.h:42
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Definition: thread.cpp:727
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:44
Thread aspect to access the transform system.
Definition: tf.h:42
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:45
float get_cache_time() const
Get cache time.
virtual void finalize()
Finalize the thread.
Base class for exceptions in Fawkes.
Definition: exception.h:36
virtual bool prepare_finalize_user()
Prepare finalization user implementation.
const char * name() const
Get name of thread.
Definition: thread.h:95
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void loop()
Code to execute in the thread.
void mark_start()
Mark start of loop.
Definition: wait.cpp:70
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
void lock()
Lock this mutex.
Definition: mutex.cpp:89
virtual ~MongoLogTransformsThread()
Destructor.
Mutex mutual exclusion lock.
Definition: mutex.h:32
tf::Transformer * tf_listener
This is the transform listener which saves transforms published by other threads in the system...
Definition: tf.h:70
std::vector< TimeCacheInterfacePtr > get_frame_caches() const
Get all currently existing caches.
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:44
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
Time wait utility.
Definition: wait.h:32
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.