Fawkes API  Fawkes Development Version
mongodb_log_bb_thread.cpp
1 
2 /***************************************************************************
3  * mongodb_log_bb_thread.cpp - MongoDB blackboard logging Thread
4  *
5  * Created: Wed Dec 08 23:09:29 2010
6  * Copyright 2010-2012 Tim Niemueller [www.niemueller.de]
7  * 2012 Bastian Klingen
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU Library General Public License for more details.
19  *
20  * Read the full text in the LICENSE.GPL file in the doc directory.
21  */
22 
23 #include "mongodb_log_bb_thread.h"
24 
25 #include <core/threading/mutex_locker.h>
26 #include <plugins/mongodb/aspect/mongodb_conncreator.h>
27 #include <cstdlib>
28 
29 // from MongoDB
30 #include <mongo/client/dbclient.h>
31 
32 #include <fnmatch.h>
33 
34 using namespace mongo;
35 using namespace fawkes;
36 
37 
38 /** @class MongoLogBlackboardThread "mongodb_thread.h"
39  * MongoDB Logging Thread.
40  * This thread registers to interfaces specified with patterns in the
41  * configurationa and logs any changes to MongoDB.
42  *
43  * @author Tim Niemueller
44  */
45 
46 /** Constructor. */
48  : Thread("MongoLogBlackboardThread", Thread::OPMODE_WAITFORWAKEUP)
49 {
50 }
51 
52 
53 /** Destructor. */
55 {
56 }
57 
58 
59 void
61 {
62  now_ = new Time(clock);
63  database_ = "fflog";
64  try {
65  database_ = config->get_string("/plugins/mongodb-log/database");
66  } catch (Exception &e) {
67  logger->log_info(name(), "No database configured, writing to %s",
68  database_.c_str());
69  }
70 
71  std::vector<std::string> includes;
72  try {
73  includes = config->get_strings("/plugins/mongodb-log/blackboard/includes");
74  } catch (Exception &e) {} // ignored, no include rules
75  try {
76  excludes_ = config->get_strings("/plugins/mongodb-log/blackboard/excludes");
77  } catch (Exception &e) {} // ignored, no include rules
78 
79  if (includes.empty()) {
80  includes.push_back("*");
81  }
82 
83  std::vector<std::string>::iterator i;
84  std::vector<std::string>::iterator e;
85  for (i = includes.begin(); i != includes.end(); ++i) {
86  bbio_add_observed_create("*", i->c_str());
87 
88  std::list<Interface *> current_interfaces =
89  blackboard->open_multiple_for_reading("*", i->c_str());
90 
91  std::list<Interface *>::iterator i;
92  for (i = current_interfaces.begin(); i != current_interfaces.end(); ++i) {
93  bool exclude = false;
94  for (e = excludes_.begin(); e != excludes_.end(); ++e) {
95  if (fnmatch(e->c_str(), (*i)->id(), 0) != FNM_NOMATCH) {
96  logger->log_debug(name(), "Excluding '%s' by config rule", (*i)->uid());
97  blackboard->close(*i);
98  exclude = true;
99  break;
100  }
101  }
102  if (exclude) continue;
103 
104  logger->log_debug(name(), "Adding %s", (*i)->uid());
105  mongo::DBClientBase *mc = mongodb_connmgr->create_client();
106  listeners_[(*i)->uid()] = new InterfaceListener(blackboard, *i, mc, database_,
107  collections_, logger, now_);
108  }
109  }
110 
112 }
113 
114 
115 void
117 {
119 
120  std::map<std::string, InterfaceListener *>::iterator i;
121  for (i = listeners_.begin(); i != listeners_.end(); ++i) {
122  mongo::DBClientBase *mc = i->second->mongodb_client();
123  delete i->second;
125  }
126  listeners_.clear();
127 }
128 
129 
130 void
132 {
133 }
134 
135 // for BlackBoardInterfaceObserver
136 void
137 MongoLogBlackboardThread::bb_interface_created(const char *type, const char *id) throw()
138 {
139  MutexLocker lock(listeners_.mutex());
140 
141  std::vector<std::string>::iterator e;
142  for (e = excludes_.begin(); e != excludes_.end(); ++e) {
143  if (fnmatch(e->c_str(), id, 0) != FNM_NOMATCH) {
144  logger->log_debug(name(), "Ignoring excluded interface '%s::%s'", type, id);
145  return;
146  }
147  }
148 
149  try {
150  Interface *interface = blackboard->open_for_reading(type, id);
151  if (listeners_.find(interface->uid()) == listeners_.end()) {
152  logger->log_debug(name(), "Opening new %s", interface->uid());
153  mongo::DBClientBase *mc = mongodb_connmgr->create_client();
154  listeners_[interface->uid()] = new InterfaceListener(blackboard, interface, mc,
155  database_, collections_,
156  logger, now_);
157  } else {
158  logger->log_warn(name(), "Interface %s already opened", interface->uid());
159  blackboard->close(interface);
160  }
161  } catch (Exception &e) {
162  logger->log_warn(name(), "Failed to open interface %s::%s, exception follows",
163  type, id);
164  logger->log_warn(name(), e);
165  }
166 }
167 
168 
169 
170 
171 /** Constructor.
172  * @param blackboard blackboard
173  * @param interface interface to listen for
174  * @param mongodb MongoDB client to write to
175  * @param database name of database to write to
176  * @param colls collections
177  * @param logger logger
178  * @param now Time
179  */
180 MongoLogBlackboardThread::InterfaceListener::InterfaceListener(BlackBoard *blackboard,
181  Interface *interface,
182  mongo::DBClientBase *mongodb,
183  std::string &database,
184  LockSet<std::string> &colls,
185  Logger *logger, Time *now)
186  : BlackBoardInterfaceListener("MongoLogListener-%s", interface->uid()),
187  database_(database), collections_(colls)
188 {
189  blackboard_ = blackboard;
190  interface_ = interface;
191  mongodb_ = mongodb;
192  logger_ = logger;
193  now_ = now;
194 
195  // sanitize interface ID to be suitable for MongoDB
196  std::string id = interface->id();
197  size_t pos = 0;
198  while((pos = id.find_first_of(" -", pos)) != std::string::npos) {
199  id.replace(pos, 1, "_");
200  pos = pos + 1;
201  }
202  collection_ = database_ + "." + interface->type() + "." + id;
203  if (collections_.find(collection_) != collections_.end()) {
204  throw Exception("Collection named %s already used, cannot log %s",
205  collection_.c_str(), interface->uid());
206  }
207 
208  bbil_add_data_interface(interface);
209  blackboard_->register_listener(this, BlackBoard::BBIL_FLAG_DATA);
210 }
211 
212 
213 /** Destructor. */
214 MongoLogBlackboardThread::InterfaceListener::~InterfaceListener()
215 {
216  blackboard_->unregister_listener(this);
217 }
218 
219 void
220 MongoLogBlackboardThread::InterfaceListener::bb_interface_data_changed(Interface *interface)
221  throw()
222 {
223  now_->stamp();
224  interface->read();
225 
226  try {
227  // write interface data
228  BSONObjBuilder document;
229  document.append("timestamp", (long long) now_->in_msec());
231  for (i = interface->fields(); i != interface->fields_end(); ++i) {
232  size_t length = i.get_length();
233  bool is_array = (length > 1);
234 
235  switch (i.get_type()) {
236  case IFT_BOOL:
237  if (is_array) {
238  bool *bools = i.get_bools();
239  BSONArrayBuilder subb(document.subarrayStart(i.get_name()));
240  for (size_t l = 0; l < length; ++l) {
241  subb.append(bools[l]);
242  }
243  subb.doneFast();
244  } else {
245  document.append(i.get_name(), i.get_bool());
246  }
247  break;
248 
249  case IFT_INT8:
250  if (is_array) {
251  int8_t *ints = i.get_int8s();
252  BSONArrayBuilder subb(document.subarrayStart(i.get_name()));
253  for (size_t l = 0; l < length; ++l) {
254  subb.append(ints[l]);
255  }
256  subb.doneFast();
257  } else {
258  document.append(i.get_name(), i.get_int8());
259  }
260  break;
261 
262  case IFT_UINT8:
263  if (is_array) {
264  uint8_t *ints = i.get_uint8s();
265  BSONArrayBuilder subb(document.subarrayStart(i.get_name()));
266  for (size_t l = 0; l < length; ++l) {
267  subb.append(ints[l]);
268  }
269  subb.doneFast();
270  } else {
271  document.append(i.get_name(), i.get_uint8());
272  }
273  break;
274 
275  case IFT_INT16:
276  if (is_array) {
277  int16_t *ints = i.get_int16s();
278  BSONArrayBuilder subb(document.subarrayStart(i.get_name()));
279  for (size_t l = 0; l < length; ++l) {
280  subb.append(ints[l]);
281  }
282  subb.doneFast();
283  } else {
284  document.append(i.get_name(), i.get_int16());
285  }
286  break;
287 
288  case IFT_UINT16:
289  if (is_array) {
290  uint16_t *ints = i.get_uint16s();
291  BSONArrayBuilder subb(document.subarrayStart(i.get_name()));
292  for (size_t l = 0; l < length; ++l) {
293  subb.append(ints[l]);
294  }
295  subb.doneFast();
296  } else {
297  document.append(i.get_name(), i.get_uint16());
298  }
299  break;
300 
301  case IFT_INT32:
302  if (is_array) {
303  int32_t *ints = i.get_int32s();
304  BSONArrayBuilder subb(document.subarrayStart(i.get_name()));
305  for (size_t l = 0; l < length; ++l) {
306  subb.append(ints[l]);
307  }
308  subb.doneFast();
309  } else {
310  document.append(i.get_name(), i.get_int32());
311  }
312  break;
313 
314  case IFT_UINT32:
315  if (is_array) {
316  uint32_t *ints = i.get_uint32s();
317  BSONArrayBuilder subb(document.subarrayStart(i.get_name()));
318  for (size_t l = 0; l < length; ++l) {
319  subb.append(ints[l]);
320  }
321  subb.doneFast();
322  } else {
323  document.append(i.get_name(), i.get_uint32());
324  }
325  break;
326 
327  case IFT_INT64:
328  if (is_array) {
329  int64_t *ints = i.get_int64s();
330  BSONArrayBuilder subb(document.subarrayStart(i.get_name()));
331  for (size_t l = 0; l < length; ++l) {
332  subb.append((long long int)ints[l]);
333  }
334  subb.doneFast();
335  } else {
336  document.append(i.get_name(), (long long int)i.get_int64());
337  }
338  break;
339 
340  case IFT_UINT64:
341  if (is_array) {
342  uint64_t *ints = i.get_uint64s();
343  BSONArrayBuilder subb(document.subarrayStart(i.get_name()));
344  for (size_t l = 0; l < length; ++l) {
345  subb.append((long long int)ints[l]);
346  }
347  subb.doneFast();
348  } else {
349  document.append(i.get_name(), (long long int)i.get_uint64());
350  }
351  break;
352 
353  case IFT_FLOAT:
354  if (is_array) {
355  float *floats = i.get_floats();
356  BSONArrayBuilder subb(document.subarrayStart(i.get_name()));
357  for (size_t l = 0; l < length; ++l) {
358  subb.append(floats[l]);
359  }
360  subb.doneFast();
361  } else {
362  document.append(i.get_name(), i.get_float());
363  }
364  break;
365 
366  case IFT_DOUBLE:
367  if (is_array) {
368  double *doubles = i.get_doubles();
369  BSONArrayBuilder subb(document.subarrayStart(i.get_name()));
370  for (size_t l = 0; l < length; ++l) {
371  subb.append(doubles[l]);
372  }
373  subb.doneFast();
374  } else {
375  document.append(i.get_name(), i.get_double());
376  }
377  break;
378 
379  case IFT_STRING:
380  document.append(i.get_name(), i.get_string());
381  break;
382 
383  case IFT_BYTE:
384  if (is_array) {
385  document.appendBinData(i.get_name(), length,
386  BinDataGeneral, i.get_bytes());
387  } else {
388  document.append(i.get_name(), i.get_byte());
389  }
390  break;
391 
392  case IFT_ENUM:
393  if (is_array) {
394  int32_t *ints = i.get_enums();
395  BSONArrayBuilder subb(document.subarrayStart(i.get_name()));
396  for (size_t l = 0; l < length; ++l) {
397  subb.append(ints[l]);
398  }
399  subb.doneFast();
400  } else {
401  document.append(i.get_name(), i.get_enum());
402  }
403  break;
404  }
405  }
406 
407  mongodb_->insert(collection_, document.obj());
408  } catch (mongo::DBException &e) {
409  logger_->log_warn(bbil_name(), "Failed to log to %s: %s",
410  collection_.c_str(), e.what());
411  } catch (std::exception &e) {
412  logger_->log_warn(bbil_name(), "Failed to log to %s: %s (*)",
413  collection_.c_str(), e.what());
414  }
415 }
64 bit integer field
Definition: types.h:43
Interface field iterator.
virtual void register_observer(BlackBoardInterfaceObserver *observer)
Register BB interface observer.
Definition: blackboard.cpp:230
virtual mongo::DBClientBase * create_client(const char *config_name=0)=0
Create a new MongoDB client.
uint8_t * get_bytes() const
Get value of current field as byte array.
uint16_t get_uint16(unsigned int index=0) const
Get value of current field as unsigned integer.
int32_t * get_enums() const
Get value of current enum field as integer array.
int32_t get_enum(unsigned int index=0) const
Get value of current enum field as integer.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
double get_double(unsigned int index=0) const
Get value of current field as double.
uint16_t * get_uint16s() const
Get value of current field as unsigned integer array.
Fawkes library namespace.
Definition: mongodb.h:29
bool get_bool(unsigned int index=0) const
Get value of current field as bool.
8 bit unsigned integer field
Definition: types.h:38
Mutex locking helper.
Definition: mutex_locker.h:33
float * get_floats() const
Get value of current field as float array.
16 bit unsigned integer field
Definition: types.h:40
const char * id() const
Get identifier of interface.
Definition: interface.cpp:661
virtual void loop()
Code to execute in the thread.
interface_fieldtype_t get_type() const
Get type of current field.
string field
Definition: types.h:47
A class for handling time.
Definition: time.h:91
virtual ~MongoLogBlackboardThread()
Destructor.
byte field, alias for uint8
Definition: types.h:48
MongoDBConnCreator * mongodb_connmgr
Connection manager to retrieve more client connections from if necessary.
Definition: mongodb.h:52
Thread class encapsulation of pthreads.
Definition: thread.h:42
float get_float(unsigned int index=0) const
Get value of current field as float.
Base class for all Fawkes BlackBoard interfaces.
Definition: interface.h:79
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:44
16 bit integer field
Definition: types.h:39
uint8_t get_byte(unsigned int index=0) const
Get value of current field as byte.
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:45
int16_t get_int16(unsigned int index=0) const
Get value of current field as integer.
long in_msec() const
Convert the stored time into milli-seconds.
Definition: time.cpp:242
int64_t * get_int64s() const
Get value of current field as integer array.
void bbio_add_observed_create(const char *type_pattern, const char *id_pattern="*")
Add interface creation type to watch list.
uint8_t * get_uint8s() const
Get value of current field as unsigned integer array.
const char * type() const
Get type of interface.
Definition: interface.cpp:651
int8_t get_int8(unsigned int index=0) const
Get value of current field as integer.
Base class for exceptions in Fawkes.
Definition: exception.h:36
void read()
Read from BlackBoard into local copy.
Definition: interface.cpp:477
int8_t * get_int8s() const
Get value of current field as integer array.
const char * get_name() const
Get name of current field.
uint8_t get_uint8(unsigned int index=0) const
Get value of current field as unsigned integer.
double * get_doubles() const
Get value of current field as double array.
virtual void unregister_observer(BlackBoardInterfaceObserver *observer)
Unregister BB interface observer.
Definition: blackboard.cpp:244
const char * name() const
Get name of thread.
Definition: thread.h:95
const char * uid() const
Get unique identifier of interface.
Definition: interface.cpp:687
uint64_t get_uint64(unsigned int index=0) const
Get value of current field as unsigned integer.
64 bit unsigned integer field
Definition: types.h:44
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
const char * get_string() const
Get value of current field as string.
uint64_t * get_uint64s() const
Get value of current field as unsigned integer array.
uint32_t get_uint32(unsigned int index=0) const
Get value of current field as unsigned integer.
float field
Definition: types.h:45
bool * get_bools() const
Get value of current field as bool array.
virtual std::list< Interface * > open_multiple_for_reading(const char *type_pattern, const char *id_pattern="*", const char *owner=NULL)=0
Open multiple interfaces for reading.
size_t get_length() const
Get length of current field.
32 bit integer field
Definition: types.h:41
InterfaceFieldIterator fields_end()
Invalid iterator.
Definition: interface.cpp:1218
uint32_t * get_uint32s() const
Get value of current field as unsigned integer array.
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual std::vector< std::string > get_strings(const char *path)=0
Get list of values from configuration which is of type string.
virtual void finalize()
Finalize the thread.
virtual void init()
Initialize the thread.
int32_t * get_int32s() const
Get value of current field as integer array.
Time & stamp()
Set this time to the current time.
Definition: time.cpp:783
The BlackBoard abstract class.
Definition: blackboard.h:48
InterfaceFieldIterator fields()
Get iterator over all fields of this interface instance.
Definition: interface.cpp:1208
virtual void bb_interface_created(const char *type, const char *id)
BlackBoard interface created notification.
boolean field
Definition: types.h:36
int32_t get_int32(unsigned int index=0) const
Get value of current field as integer.
int16_t * get_int16s() const
Get value of current field as integer array.
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:44
RefPtr< Mutex > mutex() const
Get access to the internal mutex.
Definition: lock_map.h:146
32 bit unsigned integer field
Definition: types.h:42
field with interface specific enum type
Definition: types.h:49
8 bit integer field
Definition: types.h:37
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
BlackBoard interface listener.
double field
Definition: types.h:46
virtual void delete_client(mongo::DBClientBase *client)=0
Delete a client.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
virtual void close(Interface *interface)=0
Close interface.
Interface for logging.
Definition: logger.h:34
int64_t get_int64(unsigned int index=0) const
Get value of current field as integer.