23 #include "mongodb_log_image_thread.h" 25 #include <core/threading/mutex_locker.h> 26 #include <fvutils/ipc/shm_image.h> 27 #include <fvutils/color/colorspaces.h> 28 #include <utils/time/wait.h> 31 #include <mongo/client/dbclient.h> 32 #include <mongo/client/gridfs.h> 38 using namespace mongo;
48 :
Thread(
"MongoLogImagesThread",
Thread::OPMODE_CONTINUOUS)
71 cfg_storage_interval_ =
74 cfg_chunk_size_ = 2097152;
76 cfg_chunk_size_ =
config->
get_uint(
"/plugins/mongodb-log/images/chunk-size");
88 gridfs_ =
new GridFS(*mongodb_, database_);
92 wait_ =
new TimeWait(
clock, cfg_storage_interval_ * 1000000.);
108 std::map<std::string, ImageInfo>::iterator p;
109 for (p = imgs_.begin(); p != imgs_.end(); ++p) {
110 delete p->second.img;
127 unsigned int num_stored = 0;
130 if (*now_ - last_update_ >= 5.0) {
131 *last_update_ = now_;
135 std::map<std::string, ImageInfo>::iterator p;
136 for (p = imgs_.begin(); p != imgs_.end(); ++p) {
137 ImageInfo &imginfo = p->second;
141 if ((imginfo.last_sent != cap_time)) {
142 BSONObjBuilder document;
143 imginfo.last_sent = cap_time;
144 document.append(
"timestamp", (
long long) cap_time.
in_msec());
146 BSONObjBuilder subb(document.subobjStart(
"image"));
147 subb.append(
"image_id", imginfo.img->image_id());
148 subb.append(
"width", imginfo.img->width());
149 subb.append(
"height", imginfo.img->height());
150 subb.append(
"colorspace", colorspace_to_string(imginfo.img->colorspace()));
152 std::stringstream
name;
153 name << imginfo.topic_name <<
"_" << cap_time.
in_msec();
154 subb.append(
"data", gridfs_->storeFile((
char*) imginfo.img->buffer(), imginfo.img->data_size(), name.str()));
157 collection_ = database_ +
"." + imginfo.topic_name;
159 mongodb_->insert(collection_, document.obj());
161 }
catch (mongo::DBException &e) {
163 imginfo.img->image_id(), collection_.c_str(), e.what());
171 num_stored, imgs_.size(), (loop_end - &loop_start) * 1000.);
177 MongoLogImagesThread::update_images()
179 std::set<std::string> missing_images;
180 std::set<std::string> unbacked_images;
181 get_sets(missing_images, unbacked_images);
183 if (! unbacked_images.empty()) {
184 std::set<std::string>::iterator i;
185 for (i = unbacked_images.begin(); i != unbacked_images.end(); ++i) {
188 ImageInfo &imginfo = imgs_[*i];
194 if (! missing_images.empty()) {
195 std::set<std::string>::iterator i;
196 for (i = missing_images.begin(); i != missing_images.end(); ++i) {
198 std::vector<std::string>::iterator f;
199 bool include = includes_.empty();
201 for (f = includes_.begin(); f != includes_.end(); ++f) {
202 if (fnmatch(f->c_str(), i->c_str(), 0) != FNM_NOMATCH) {
209 for (f = excludes_.begin(); f != excludes_.end(); ++f) {
210 if (fnmatch(f->c_str(), i->c_str(), 0) != FNM_NOMATCH) {
223 std::string topic_name = std::string(
"Images.") + *i;
225 while ((pos = topic_name.find_first_of(
" -", pos)) != std::string::npos) {
226 topic_name.replace(pos, 1,
"_");
230 ImageInfo &imginfo = imgs_[*i];
231 imginfo.topic_name = topic_name;
239 MongoLogImagesThread::get_sets(std::set<std::string> &missing_images,
240 std::set<std::string> &unbacked_images)
242 std::set<std::string> published_images;
243 std::map<std::string, ImageInfo>::iterator p;
244 for (p = imgs_.begin(); p != imgs_.end(); ++p) {
245 if (p->second.img->num_attached() > 1) {
246 published_images.insert(p->first);
250 std::set<std::string> image_buffers;
253 SharedMemory::find(FIREVISION_SHM_IMAGE_MAGIC_TOKEN, h);
256 while ( i != endi ) {
260 image_buffers.insert(ih->
image_id());
266 missing_images.clear();
267 unbacked_images.clear();
269 std::set_difference(image_buffers.begin(), image_buffers.end(),
270 published_images.begin(), published_images.end(),
271 std::inserter(missing_images, missing_images.end()));
273 std::set_difference(published_images.begin(), published_images.end(),
274 image_buffers.begin(), image_buffers.end(),
275 std::inserter(unbacked_images, unbacked_images.end()));
void wait()
Wait until minimum loop time has been reached.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
MongoLogImagesThread()
Constructor.
Fawkes library namespace.
void unlock()
Unlock the mutex.
mongo::DBClientBase * mongodb_client
MongoDB client to use to interact with the database.
virtual ~MongoLogImagesThread()
Destructor.
A class for handling time.
Thread class encapsulation of pthreads.
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
virtual void finalize()
Finalize the thread.
Logger * logger
This is the Logger member used to access the logger.
Clock * clock
By means of this member access to the clock is given.
long in_msec() const
Convert the stored time into milli-seconds.
virtual void init()
Initialize the thread.
Base class for exceptions in Fawkes.
virtual void loop()
Code to execute in the thread.
Shared memory image buffer.
virtual bool prepare_finalize_user()
Prepare finalization user implementation.
const char * name() const
Get name of thread.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
void mark_start()
Mark start of loop.
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual std::vector< std::string > get_strings(const char *path)=0
Get list of values from configuration which is of type string.
void lock()
Lock this mutex.
Time & stamp()
Set this time to the current time.
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
Mutex mutual exclusion lock.
Configuration * config
This is the Configuration member used to access the configuration.
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.