23 #include "mongodb_log_bb_thread.h" 25 #include <core/threading/mutex_locker.h> 26 #include <plugins/mongodb/aspect/mongodb_conncreator.h> 30 #include <mongo/client/dbclient.h> 34 using namespace mongo;
48 :
Thread(
"MongoLogBlackboardThread",
Thread::OPMODE_WAITFORWAKEUP)
71 std::vector<std::string> includes;
79 if (includes.empty()) {
80 includes.push_back(
"*");
83 std::vector<std::string>::iterator i;
84 std::vector<std::string>::iterator e;
85 for (i = includes.begin(); i != includes.end(); ++i) {
88 std::list<Interface *> current_interfaces =
91 std::list<Interface *>::iterator i;
92 for (i = current_interfaces.begin(); i != current_interfaces.end(); ++i) {
94 for (e = excludes_.begin(); e != excludes_.end(); ++e) {
95 if (fnmatch(e->c_str(), (*i)->id(), 0) != FNM_NOMATCH) {
102 if (exclude)
continue;
106 listeners_[(*i)->uid()] =
new InterfaceListener(
blackboard, *i, mc, database_,
107 collections_,
logger, now_);
120 std::map<std::string, InterfaceListener *>::iterator i;
121 for (i = listeners_.begin(); i != listeners_.end(); ++i) {
122 mongo::DBClientBase *mc = i->second->mongodb_client();
141 std::vector<std::string>::iterator e;
142 for (e = excludes_.begin(); e != excludes_.end(); ++e) {
143 if (fnmatch(e->c_str(), id, 0) != FNM_NOMATCH) {
151 if (listeners_.find(interface->uid()) == listeners_.end()) {
154 listeners_[interface->uid()] =
new InterfaceListener(
blackboard, interface, mc,
155 database_, collections_,
182 mongo::DBClientBase *mongodb,
183 std::string &database,
187 database_(database), collections_(colls)
190 interface_ = interface;
196 std::string
id = interface->
id();
198 while((pos =
id.find_first_of(
" -", pos)) != std::string::npos) {
199 id.replace(pos, 1,
"_");
202 collection_ = database_ +
"." + interface->
type() +
"." + id;
203 if (collections_.find(collection_) != collections_.end()) {
204 throw Exception(
"Collection named %s already used, cannot log %s",
205 collection_.c_str(), interface->
uid());
208 bbil_add_data_interface(interface);
209 blackboard_->register_listener(
this, BlackBoard::BBIL_FLAG_DATA);
214 MongoLogBlackboardThread::InterfaceListener::~InterfaceListener()
216 blackboard_->unregister_listener(
this);
220 MongoLogBlackboardThread::InterfaceListener::bb_interface_data_changed(
Interface *interface)
228 BSONObjBuilder document;
229 document.append(
"timestamp", (
long long) now_->
in_msec());
233 bool is_array = (length > 1);
239 BSONArrayBuilder subb(document.subarrayStart(i.
get_name()));
240 for (
size_t l = 0; l < length; ++l) {
241 subb.append(bools[l]);
252 BSONArrayBuilder subb(document.subarrayStart(i.
get_name()));
253 for (
size_t l = 0; l < length; ++l) {
254 subb.append(ints[l]);
265 BSONArrayBuilder subb(document.subarrayStart(i.
get_name()));
266 for (
size_t l = 0; l < length; ++l) {
267 subb.append(ints[l]);
278 BSONArrayBuilder subb(document.subarrayStart(i.
get_name()));
279 for (
size_t l = 0; l < length; ++l) {
280 subb.append(ints[l]);
291 BSONArrayBuilder subb(document.subarrayStart(i.
get_name()));
292 for (
size_t l = 0; l < length; ++l) {
293 subb.append(ints[l]);
304 BSONArrayBuilder subb(document.subarrayStart(i.
get_name()));
305 for (
size_t l = 0; l < length; ++l) {
306 subb.append(ints[l]);
317 BSONArrayBuilder subb(document.subarrayStart(i.
get_name()));
318 for (
size_t l = 0; l < length; ++l) {
319 subb.append(ints[l]);
330 BSONArrayBuilder subb(document.subarrayStart(i.
get_name()));
331 for (
size_t l = 0; l < length; ++l) {
332 subb.append((
long long int)ints[l]);
343 BSONArrayBuilder subb(document.subarrayStart(i.
get_name()));
344 for (
size_t l = 0; l < length; ++l) {
345 subb.append((
long long int)ints[l]);
356 BSONArrayBuilder subb(document.subarrayStart(i.
get_name()));
357 for (
size_t l = 0; l < length; ++l) {
358 subb.append(floats[l]);
369 BSONArrayBuilder subb(document.subarrayStart(i.
get_name()));
370 for (
size_t l = 0; l < length; ++l) {
371 subb.append(doubles[l]);
385 document.appendBinData(i.
get_name(), length,
395 BSONArrayBuilder subb(document.subarrayStart(i.
get_name()));
396 for (
size_t l = 0; l < length; ++l) {
397 subb.append(ints[l]);
407 mongodb_->insert(collection_, document.obj());
408 }
catch (mongo::DBException &e) {
409 logger_->log_warn(bbil_name(),
"Failed to log to %s: %s",
410 collection_.c_str(), e.what());
411 }
catch (std::exception &e) {
412 logger_->log_warn(bbil_name(),
"Failed to log to %s: %s (*)",
413 collection_.c_str(), e.what());
Interface field iterator.
virtual void register_observer(BlackBoardInterfaceObserver *observer)
Register BB interface observer.
virtual mongo::DBClientBase * create_client(const char *config_name=0)=0
Create a new MongoDB client.
uint8_t * get_bytes() const
Get value of current field as byte array.
uint16_t get_uint16(unsigned int index=0) const
Get value of current field as unsigned integer.
int32_t * get_enums() const
Get value of current enum field as integer array.
int32_t get_enum(unsigned int index=0) const
Get value of current enum field as integer.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
double get_double(unsigned int index=0) const
Get value of current field as double.
uint16_t * get_uint16s() const
Get value of current field as unsigned integer array.
Fawkes library namespace.
bool get_bool(unsigned int index=0) const
Get value of current field as bool.
8 bit unsigned integer field
float * get_floats() const
Get value of current field as float array.
16 bit unsigned integer field
const char * id() const
Get identifier of interface.
virtual void loop()
Code to execute in the thread.
interface_fieldtype_t get_type() const
Get type of current field.
A class for handling time.
virtual ~MongoLogBlackboardThread()
Destructor.
byte field, alias for uint8
MongoDBConnCreator * mongodb_connmgr
Connection manager to retrieve more client connections from if necessary.
Thread class encapsulation of pthreads.
float get_float(unsigned int index=0) const
Get value of current field as float.
Base class for all Fawkes BlackBoard interfaces.
Logger * logger
This is the Logger member used to access the logger.
uint8_t get_byte(unsigned int index=0) const
Get value of current field as byte.
Clock * clock
By means of this member access to the clock is given.
int16_t get_int16(unsigned int index=0) const
Get value of current field as integer.
long in_msec() const
Convert the stored time into milli-seconds.
int64_t * get_int64s() const
Get value of current field as integer array.
void bbio_add_observed_create(const char *type_pattern, const char *id_pattern="*")
Add interface creation type to watch list.
uint8_t * get_uint8s() const
Get value of current field as unsigned integer array.
const char * type() const
Get type of interface.
int8_t get_int8(unsigned int index=0) const
Get value of current field as integer.
Base class for exceptions in Fawkes.
void read()
Read from BlackBoard into local copy.
int8_t * get_int8s() const
Get value of current field as integer array.
const char * get_name() const
Get name of current field.
uint8_t get_uint8(unsigned int index=0) const
Get value of current field as unsigned integer.
double * get_doubles() const
Get value of current field as double array.
virtual void unregister_observer(BlackBoardInterfaceObserver *observer)
Unregister BB interface observer.
const char * name() const
Get name of thread.
const char * uid() const
Get unique identifier of interface.
uint64_t get_uint64(unsigned int index=0) const
Get value of current field as unsigned integer.
64 bit unsigned integer field
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
const char * get_string() const
Get value of current field as string.
uint64_t * get_uint64s() const
Get value of current field as unsigned integer array.
uint32_t get_uint32(unsigned int index=0) const
Get value of current field as unsigned integer.
bool * get_bools() const
Get value of current field as bool array.
virtual std::list< Interface * > open_multiple_for_reading(const char *type_pattern, const char *id_pattern="*", const char *owner=NULL)=0
Open multiple interfaces for reading.
size_t get_length() const
Get length of current field.
InterfaceFieldIterator fields_end()
Invalid iterator.
uint32_t * get_uint32s() const
Get value of current field as unsigned integer array.
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual std::vector< std::string > get_strings(const char *path)=0
Get list of values from configuration which is of type string.
virtual void finalize()
Finalize the thread.
virtual void init()
Initialize the thread.
int32_t * get_int32s() const
Get value of current field as integer array.
Time & stamp()
Set this time to the current time.
The BlackBoard abstract class.
InterfaceFieldIterator fields()
Get iterator over all fields of this interface instance.
virtual void bb_interface_created(const char *type, const char *id)
BlackBoard interface created notification.
MongoLogBlackboardThread()
Constructor.
int32_t get_int32(unsigned int index=0) const
Get value of current field as integer.
int16_t * get_int16s() const
Get value of current field as integer array.
Configuration * config
This is the Configuration member used to access the configuration.
RefPtr< Mutex > mutex() const
Get access to the internal mutex.
32 bit unsigned integer field
field with interface specific enum type
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
BlackBoard interface listener.
virtual void delete_client(mongo::DBClientBase *client)=0
Delete a client.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
virtual void close(Interface *interface)=0
Close interface.
int64_t get_int64(unsigned int index=0) const
Get value of current field as integer.