Fawkes API  Fawkes Development Version
mongodb_log_image_thread.cpp
1 
2 /***************************************************************************
3  * mongodb_log_image_thread.cpp - Thread to log images to MongoDB
4  *
5  * Created: Tue Apr 10 22:12:38 2012
6  * Copyright 2011-2012 Tim Niemueller [www.niemueller.de]
7  * 2012 Bastian Klingen
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU Library General Public License for more details.
19  *
20  * Read the full text in the LICENSE.GPL file in the doc directory.
21  */
22 
23 #include "mongodb_log_image_thread.h"
24 
25 #include <core/threading/mutex_locker.h>
26 #include <fvutils/ipc/shm_image.h>
27 #include <fvutils/color/colorspaces.h>
28 #include <utils/time/wait.h>
29 
30 // from MongoDB
31 #include <mongo/client/dbclient.h>
32 #include <mongo/client/gridfs.h>
33 
34 #include <fnmatch.h>
35 
36 using namespace fawkes;
37 using namespace firevision;
38 using namespace mongo;
39 
40 /** @class MongoLogImagesThread "mongodb_log_image_thread.h"
41  * Thread to export Fawkes images to MongoDB.
42  * @author Tim Niemueller
43  * @author Bastian Klingen
44  */
45 
46 /** Constructor.
 * Hands the thread name "MongoLogImagesThread" and the continuous operation
 * mode to the Thread base class.
 * NOTE(review): the declarator line and one body line (47 and 50 in the
 * embedded numbering) are absent from this listing; only the visible part is
 * described here.
 */
48  : Thread("MongoLogImagesThread", Thread::OPMODE_CONTINUOUS)
49 {
51 }
52 
53 /** Destructor.
 * The body is empty; nothing is released here.
 * NOTE(review): the declarator line (54 in the embedded numbering) is absent
 * from this listing.
 */
55 {
56 }
57 
58 
59 
/** Read the configuration and allocate everything the thread works with.
 *
 * Reads the database name, storage interval, chunk size and the include and
 * exclude pattern lists from the configuration, creates the GridFS handle on
 * top of mongodb_client, allocates the time stamps, the TimeWait helper and
 * the mutex, and finally runs update_images() once.
 *
 * NOTE(review): the signature line (61 in the embedded numbering) is absent
 * from this listing; presumably this is MongoLogImagesThread::init() --
 * confirm against the header.
 */
60 void
62 {
 // Default database name, used when the configuration has no entry.
63  database_ = "fflog";
64  try {
65  database_ = config->get_string("/plugins/mongodb-log/database");
66  } catch (Exception &e) {
67  logger->log_info(name(), "No database configured, writing to %s",
68  database_.c_str());
69  }
70 
 // Not wrapped in try/catch: an exception raised by get_float() leaves this
 // function, so there is no built-in default for the storage interval.
71  cfg_storage_interval_ =
72  config->get_float("/plugins/mongodb-log/images/storage-interval");
73 
 // NOTE(review): within this listing cfg_chunk_size_ is only logged; nothing
 // visible hands it to gridfs_ -- confirm whether it is meant to be applied.
74  cfg_chunk_size_ = 2097152; // 2 MB
75  try {
76  cfg_chunk_size_ = config->get_uint("/plugins/mongodb-log/images/chunk-size");
77  } catch (Exception &e) {} // ignored, use default
78  logger->log_info(name(), "Chunk size: %u", cfg_chunk_size_);
79 
80  try {
81  includes_ = config->get_strings("/plugins/mongodb-log/images/includes");
82  } catch (Exception &e) {} // ignored, no include rules
83  try {
84  excludes_ = config->get_strings("/plugins/mongodb-log/images/excludes");
85  } catch (Exception &e) {} // ignored, no exclude rules
86 
 // mongodb_ is only borrowed from mongodb_client; gridfs_ is allocated here.
87  mongodb_ = mongodb_client;
88  gridfs_ = new GridFS(*mongodb_, database_);
89 
90  last_update_ = new Time(clock);
91  now_ = new Time(clock);
 // presumably converts the interval from seconds to microseconds -- TODO confirm
 // the unit TimeWait expects.
92  wait_ = new TimeWait(clock, cfg_storage_interval_ * 1000000.);
93  mutex_ = new Mutex();
 // Initial scan so that imgs_ is populated before the first pass.
94  update_images();
95 }
96 
/** Lock mutex_ and report success.
 *
 * The mutex is locked here and not unlocked anywhere in this block.
 *
 * NOTE(review): the signature line (98 in the embedded numbering) is absent
 * from this listing; presumably this is prepare_finalize_user(), the only
 * bool-returning member named in the listing's cross references -- confirm.
 *
 * @return always true
 */
97 bool
99 {
100  mutex_->lock();
101  return true;
102 }
103 
/** Release everything that was allocated for logging.
 *
 * Deletes the image buffer of every entry in imgs_, clears the map, and then
 * deletes gridfs_, wait_, mutex_, now_ and last_update_. mongodb_ is not
 * deleted here.
 *
 * NOTE(review): the signature line (105 in the embedded numbering) is absent
 * from this listing; presumably this is MongoLogImagesThread::finalize().
 * NOTE(review): the bool-returning block above locks mutex_ without unlocking
 * it; if that block ran before this one, mutex_ is deleted while still locked
 * -- confirm that this is acceptable for fawkes::Mutex.
 */
104 void
106 {
107  logger->log_debug(name(), "Finalizing MongoLogImagesThread");
108  std::map<std::string, ImageInfo>::iterator p;
109  for (p = imgs_.begin(); p != imgs_.end(); ++p) {
110  delete p->second.img;
111  }
112  imgs_.clear();
113  delete gridfs_;
114  delete wait_;
115  delete mutex_;
116  delete now_;
117  delete last_update_;
118 }
119 
120 
121 void
123 {
124  MutexLocker lock(mutex_);
125  fawkes::Time loop_start(clock);
126  wait_->mark_start();
127  unsigned int num_stored = 0;
128 
129  now_->stamp();
130  if (*now_ - last_update_ >= 5.0) {
131  *last_update_ = now_;
132  update_images();
133  }
134 
135  std::map<std::string, ImageInfo>::iterator p;
136  for (p = imgs_.begin(); p != imgs_.end(); ++p) {
137  ImageInfo &imginfo = p->second;
138 
139  fawkes::Time cap_time = imginfo.img->capture_time();
140 
141  if ((imginfo.last_sent != cap_time)) {
142  BSONObjBuilder document;
143  imginfo.last_sent = cap_time;
144  document.append("timestamp", (long long) cap_time.in_msec());
145 
146  BSONObjBuilder subb(document.subobjStart("image"));
147  subb.append("image_id", imginfo.img->image_id());
148  subb.append("width", imginfo.img->width());
149  subb.append("height", imginfo.img->height());
150  subb.append("colorspace", colorspace_to_string(imginfo.img->colorspace()));
151 
152  std::stringstream name;
153  name << imginfo.topic_name << "_" << cap_time.in_msec();
154  subb.append("data", gridfs_->storeFile((char*) imginfo.img->buffer(), imginfo.img->data_size(), name.str()));
155 
156  subb.doneFast();
157  collection_ = database_ + "." + imginfo.topic_name;
158  try {
159  mongodb_->insert(collection_, document.obj());
160  ++num_stored;
161  } catch (mongo::DBException &e) {
162  logger->log_warn(this->name(), "Failed to insert image %s into %s: %s",
163  imginfo.img->image_id(), collection_.c_str(), e.what());
164  }
165  }
166  }
167 
168  mutex_->unlock();
169  fawkes::Time loop_end(clock);
170  logger->log_debug(name(), "Stored %u of %zu images in %.1f ms",
171  num_stored, imgs_.size(), (loop_end - &loop_start) * 1000.);
172  wait_->wait();
173 }
174 
175 
176 void
177 MongoLogImagesThread::update_images()
178 {
179  std::set<std::string> missing_images;
180  std::set<std::string> unbacked_images;
181  get_sets(missing_images, unbacked_images);
182 
183  if (! unbacked_images.empty()) {
184  std::set<std::string>::iterator i;
185  for (i = unbacked_images.begin(); i != unbacked_images.end(); ++i) {
186  logger->log_info(name(), "Shutting down MongoLog for no longer available image %s",
187  i->c_str());
188  ImageInfo &imginfo = imgs_[*i];
189  delete imginfo.img;
190  imgs_.erase(*i);
191  }
192  }
193 
194  if (! missing_images.empty()) {
195  std::set<std::string>::iterator i;
196  for (i = missing_images.begin(); i != missing_images.end(); ++i) {
197 
198  std::vector<std::string>::iterator f;
199  bool include = includes_.empty();
200  if (! include) {
201  for (f = includes_.begin(); f != includes_.end(); ++f) {
202  if (fnmatch(f->c_str(), i->c_str(), 0) != FNM_NOMATCH) {
203  include = true;
204  break;
205  }
206  }
207  }
208  if (include) {
209  for (f = excludes_.begin(); f != excludes_.end(); ++f) {
210  if (fnmatch(f->c_str(), i->c_str(), 0) != FNM_NOMATCH) {
211  include = false;
212  break;
213  }
214  }
215  }
216  if (! include) {
217  //logger->log_info(name(), "Excluding image %s", i->c_str());
218  continue;
219  }
220 
221  logger->log_info(name(), "Starting to log image %s", i->c_str());
222 
223  std::string topic_name = std::string("Images.") + *i;
224  size_t pos = 0;
225  while ((pos = topic_name.find_first_of(" -", pos)) != std::string::npos) {
226  topic_name.replace(pos, 1, "_");
227  pos = pos + 1;
228  }
229 
230  ImageInfo &imginfo = imgs_[*i];
231  imginfo.topic_name = topic_name;
232  imginfo.img = new SharedMemoryImageBuffer(i->c_str());
233  }
234  }
235 }
236 
237 
/** Compare the tracked images with the image buffers that currently exist.
 *
 * Both output sets are cleared before they are filled.
 *
 * NOTE(review): the declarations of h, i and ih (251, 252 and 257 in the
 * embedded numbering) are absent from this listing; the comments below
 * describe only what the visible lines show.
 *
 * @param missing_images filled with the ids of existing image buffers that
 * are not in the "published" set built below. That can include an id that is
 * already a key of imgs_ when its buffer reports num_attached() <= 1.
 * @param unbacked_images filled with the ids from the "published" set for
 * which no image buffer was found.
 */
238 void
239 MongoLogImagesThread::get_sets(std::set<std::string> &missing_images,
240  std::set<std::string> &unbacked_images)
241 {
 // "Published" = tracked images whose buffer has more than one attachment.
 // presumably one attachment is this thread's own reader -- TODO confirm.
242  std::set<std::string> published_images;
243  std::map<std::string, ImageInfo>::iterator p;
244  for (p = imgs_.begin(); p != imgs_.end(); ++p) {
245  if (p->second.img->num_attached() > 1) {
246  published_images.insert(p->first);
247  }
248  }
249 
 // Collect the image id of every segment returned by SharedMemory::find()
 // for the FireVision image magic token whose header is a
 // SharedMemoryImageBufferHeader.
250  std::set<std::string> image_buffers;
253  SharedMemory::find(FIREVISION_SHM_IMAGE_MAGIC_TOKEN, h);
254  SharedMemory::SharedMemoryIterator endi = SharedMemory::end();
255 
256  while ( i != endi ) {
258  dynamic_cast<const SharedMemoryImageBufferHeader *>(*i);
259  if ( ih ) {
260  image_buffers.insert(ih->image_id());
261  }
262  ++i;
263  }
264  delete h;
265 
266  missing_images.clear();
267  unbacked_images.clear();
268 
 // std::set iterates in sorted order, which std::set_difference requires.
 // missing = existing buffers minus published images
269  std::set_difference(image_buffers.begin(), image_buffers.end(),
270  published_images.begin(), published_images.end(),
271  std::inserter(missing_images, missing_images.end()));
272 
 // unbacked = published images minus existing buffers
273  std::set_difference(published_images.begin(), published_images.end(),
274  image_buffers.begin(), image_buffers.end(),
275  std::inserter(unbacked_images, unbacked_images.end()));
276 }
void wait()
Wait until minimum loop time has been reached.
Definition: wait.cpp:81
Shared memory image buffer header.
Definition: shm_image.h:69
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Fawkes library namespace.
Definition: mongodb.h:29
void unlock()
Unlock the mutex.
Definition: mutex.cpp:135
Mutex locking helper.
Definition: mutex_locker.h:33
Shared Memory iterator.
Definition: shm.h:114
mongo::DBClientBase * mongodb_client
MongoDB client to use to interact with the database.
Definition: mongodb.h:51
virtual ~MongoLogImagesThread()
Destructor.
A class for handling time.
Definition: time.h:91
Thread class encapsulation of pthreads.
Definition: thread.h:42
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Definition: thread.cpp:727
virtual void finalize()
Finalize the thread.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:44
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:45
long in_msec() const
Convert the stored time into milli-seconds.
Definition: time.cpp:242
virtual void init()
Initialize the thread.
Base class for exceptions in Fawkes.
Definition: exception.h:36
const char * image_id() const
Get image number.
Definition: shm_image.cpp:881
virtual void loop()
Code to execute in the thread.
Shared memory image buffer.
Definition: shm_image.h:181
virtual bool prepare_finalize_user()
Prepare finalization user implementation.
const char * name() const
Get name of thread.
Definition: thread.h:95
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
void mark_start()
Mark start of loop.
Definition: wait.cpp:70
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual std::vector< std::string > get_strings(const char *path)=0
Get list of values from configuration which is of type string.
void lock()
Lock this mutex.
Definition: mutex.cpp:89
Time & stamp()
Set this time to the current time.
Definition: time.cpp:783
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
Mutex mutual exclusion lock.
Definition: mutex.h:32
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:44
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
Time wait utility.
Definition: wait.h:32
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.