Fawkes API  Fawkes Development Version
log_thread.cpp
1 
2 /***************************************************************************
3  * log_thread.cpp - BB Logger Thread
4  *
5  * Created: Sun Nov 08 00:02:09 2009
6  * Copyright 2006-2009 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU Library General Public License for more details.
19  *
20  * Read the full text in the LICENSE.GPL file in the doc directory.
21  */
22 
23 #include "log_thread.h"
24 #include "file.h"
25 
26 #include <blackboard/blackboard.h>
27 #include <logging/logger.h>
28 #include <core/exceptions/system.h>
29 #include <interfaces/SwitchInterface.h>
30 
31 #include <memory>
32 #include <cstring>
33 #include <cstdlib>
34 #include <cstdio>
35 #include <cerrno>
36 #include <fcntl.h>
37 #ifdef __FreeBSD__
38 # include <sys/endian.h>
39 #elif defined(__MACH__) && defined(__APPLE__)
40 # include <sys/_endian.h>
41 #else
42 # include <endian.h>
43 #endif
44 #include <arpa/inet.h>
45 #include <sys/stat.h>
46 #include <sys/mman.h>
47 
48 using namespace fawkes;
49 
50 /** @class BBLoggerThread "log_thread.h"
51  * BlackBoard logger thread.
52  * One instance of this thread handles logging of one specific interface.
53  * The plugin will spawn as many threads as there are interfaces to log. This
54  * allows for maximum concurrency of the writers and avoids a serialization
55  * bottle neck.
56  * The log thread can operate in buffering mode. If this mode is disabled, the
57  * data is written to the file within the blackboard data changed event, and
58  * thus the writing operation can slow down the overall system, but memory
59  * requirements are low. This is useful if a lot of data is written or if the
60  * storage device is slow. If the mode is enabled, during the event the BB data
61  * will be copied into another memory segment and the thread will be woken up.
62  * Once the thread is running it stores all of the BB data segments bufferd
63  * up to then.
64  * The interface listener listens for events for a particular interface and
65  * then writes the changes to the file.
66  * @author Tim Niemueller
67  */
68 
69 /** Constructor.
70  * @param iface_uid interface UID which to log
71  * @param logdir directory to store config files, must exist
72  * @param buffering enable log buffering?
73  * @param flushing true to flush after each written chunk
74  * @param scenario ID of the log scenario
75  * @param start_time time to use as start time for the log
76  */
77 BBLoggerThread::BBLoggerThread(const char *iface_uid,
78  const char *logdir, bool buffering, bool flushing,
79  const char *scenario, fawkes::Time *start_time)
80  : Thread("BBLoggerThread", Thread::OPMODE_WAITFORWAKEUP),
81  BlackBoardInterfaceListener("BBLoggerThread(%s)", iface_uid)
82 {
84  set_name("BBLoggerThread(%s)", iface_uid);
85 
86  __buffering = buffering;
87  __flushing = flushing;
88  __uid = strdup(iface_uid);
89  __logdir = strdup(logdir);
90  __scenario = strdup(scenario);
91  __start = new Time(start_time);
92  __filename = NULL;
93  __queue_mutex = new Mutex();
94  __data_size = 0;
95  __is_master = false;
96  __enabled = true;
97 
98  __now = NULL;
99 
100  // Parse UID
101  Interface::parse_uid(__uid, __type, __id);
102 
103  char date[21];
104  Time now;
105  struct tm *tmp = localtime(&(now.get_timeval()->tv_sec));
106  strftime(date, 21, "%F-%H-%M-%S", tmp);
107 
108  if (asprintf(&__filename, "%s/%s-%s-%s-%s.log", LOGDIR, __scenario,
109  __type.c_str(), __id.c_str(), date) == -1) {
110  throw OutOfMemoryException("Cannot generate log name");
111  }
112 }
113 
114 
115 /** Destructor. */
117 {
118  free(__uid);
119  free(__logdir);
120  free(__scenario);
121  free(__filename);
122  delete __queue_mutex;
123  delete __start;
124 }
125 
126 
127 void
129 {
130  __queues[0].clear();
131  __queues[1].clear();
132  __act_queue = 0;
133 
134  __queue_mutex = new Mutex();
135  __data_size = 0;
136 
137  __now = NULL;
138  __num_data_items = 0;
139  __session_start = 0;
140 
141  // use open because fopen does not provide O_CREAT | O_EXCL
142  // open read/write because of usage of mmap
143  mode_t m = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
144  int fd = open(__filename, O_RDWR | O_CREAT | O_EXCL, m);
145  if ( ! fd ) {
146  throw CouldNotOpenFileException(__filename, errno, "Failed to open log 1");
147  } else {
148  __f_data = fdopen(fd, "w+");
149  if ( ! __f_data ) {
150  throw CouldNotOpenFileException(__filename, errno, "Failed to open log 2");
151  }
152  }
153 
154  try {
155  __iface = blackboard->open_for_reading(__type.c_str(), __id.c_str());
156  __data_size = __iface->datasize();
157  } catch (Exception &e) {
158  fclose(__f_data);
159  throw;
160  }
161 
162  try {
163  write_header();
164  } catch (FileWriteException &e) {
165  blackboard->close(__iface);
166  fclose(__f_data);
167  throw;
168  }
169 
170  __now = new Time(clock);
171 
172  if (__is_master) {
173  try {
174  __switch_if = blackboard->open_for_writing<SwitchInterface>("BBLogger");
175  __switch_if->set_enabled(__enabled);
176  __switch_if->write();
177  bbil_add_message_interface(__switch_if);
178  } catch (Exception &e) {
179  fclose(__f_data);
180  throw;
181  }
182  }
183 
184  bbil_add_data_interface(__iface);
185  bbil_add_writer_interface(__iface);
186 
188 
189  logger->log_info(name(), "Logging %s to %s%s", __iface->uid(), __filename,
190  __is_master ? " as master" : "");
191 }
192 
193 
194 void
196 {
198  if (__is_master) {
199  blackboard->close(__switch_if);
200  }
201  update_header();
202  fclose(__f_data);
203  for (unsigned int q = 0; q < 2; ++q) {
204  while (!__queues[q].empty()) {
205  void *t = __queues[q].front();
206  free(t);
207  __queues[q].pop();
208  }
209  }
210  delete __now;
211  __now = NULL;
212 }
213 
214 
215 /** Get filename.
216  * @return file name, valid after object instantiated, but before init() does not
217  * mean that the file has been or can actually be opened
218  */
219 const char *
221 {
222  return __filename;
223 }
224 
225 
226 /** Enable or disable logging.
227  * @param enabled true to enable logging, false to disable
228  */
229 void
231 {
232  if (enabled && !__enabled) {
233  logger->log_info(name(), "Logging enabled",
234  (__num_data_items - __session_start));
235  __session_start = __num_data_items;
236  } else if (!enabled && __enabled) {
237  logger->log_info(name(), "Logging disabled (wrote %u entries), flushing",
238  (__num_data_items - __session_start));
239  update_header();
240  fflush(__f_data);
241  }
242 
243  __enabled = enabled;
244 }
245 
246 
247 /** Set threadlist and master status.
248  * This copies the thread list and sets this thread as master thread.
249  * If you intend to use this method you must do so before the thread is
250  * initialized. You may only ever declare one thread as master.
251  * @param thread_list list of threads to notify on enable/disable events
252  */
253 void
255 {
256  __is_master = true;
257  __threads = thread_list;
258 }
259 
260 void
261 BBLoggerThread::write_header()
262 {
263  bblog_file_header header;
264  memset(&header, 0, sizeof(header));
265  header.file_magic = htonl(BBLOGGER_FILE_MAGIC);
266  header.file_version = htonl(BBLOGGER_FILE_VERSION);
267 #if __BYTE_ORDER == __BIG_ENDIAN
268  header.endianess = BBLOG_BIG_ENDIAN;
269 #else
270  header.endianess = BBLOG_LITTLE_ENDIAN;
271 #endif
272  header.num_data_items = __num_data_items;
273  strncpy(header.scenario, (const char *)__scenario, BBLOG_SCENARIO_SIZE);
274  strncpy(header.interface_type, __iface->type(), BBLOG_INTERFACE_TYPE_SIZE);
275  strncpy(header.interface_id, __iface->id(), BBLOG_INTERFACE_ID_SIZE);
276  memcpy(header.interface_hash, __iface->hash(), BBLOG_INTERFACE_HASH_SIZE);
277  header.data_size = __iface->datasize();
278  long start_time_sec, start_time_usec;
279  __start->get_timestamp(start_time_sec, start_time_usec);
280  header.start_time_sec = start_time_sec;
281  header.start_time_usec = start_time_usec;
282  if (fwrite(&header, sizeof(header), 1, __f_data) != 1) {
283  throw FileWriteException(__filename, "Failed to write header");
284  }
285  fflush(__f_data);
286 }
287 
288 /** Updates the num_data_items field in the header. */
289 void
290 BBLoggerThread::update_header()
291 {
292  // write updated num_data_items field
293 #if _POSIX_MAPPED_FILES
294  void *h = mmap(NULL, sizeof(bblog_file_header), PROT_WRITE, MAP_SHARED,
295  fileno(__f_data), 0);
296  if (h == MAP_FAILED) {
297  logger->log_warn(name(), "Failed to mmap log (%s), "
298  "not updating number of data items",
299  strerror(errno));
300  } else {
301  bblog_file_header *header = (bblog_file_header *)h;
302  header->num_data_items = __num_data_items;
303  munmap(h, sizeof(bblog_file_header));
304  }
305 #else
306  logger->log_warn(name(), "Memory mapped files not available, "
307  "not updating number of data items on close");
308 #endif
309 }
310 
311 void
312 BBLoggerThread::write_chunk(const void *chunk)
313 {
314  bblog_entry_header ehead;
315  __now->stamp();
316  Time d = *__now - *__start;
317  long rel_time_sec, rel_time_usec;
318  d.get_timestamp(rel_time_sec, rel_time_usec);
319  ehead.rel_time_sec = rel_time_sec;
320  ehead.rel_time_usec = rel_time_usec;
321  if ( (fwrite(&ehead, sizeof(ehead), 1, __f_data) == 1) &&
322  (fwrite(chunk, __data_size, 1, __f_data) == 1) ) {
323  if (__flushing) fflush(__f_data);
324  __num_data_items += 1;
325  } else {
326  logger->log_warn(name(), "Failed to write chunk");
327  }
328 }
329 
330 
331 void
333 {
334  unsigned int write_queue = __act_queue;
335  __queue_mutex->lock();
336  __act_queue = 1 - __act_queue;
337  __queue_mutex->unlock();
338  LockQueue<void *> &queue = __queues[write_queue];
339  //logger->log_debug(name(), "Writing %zu entries", queue.size());
340  while (! queue.empty() ) {
341  void *c = queue.front();
342  write_chunk(c);
343  free(c);
344  queue.pop();
345  }
346 }
347 
348 bool
350  Message *message) throw()
351 {
354 
355  bool enabled = true;
356  if ((enm = dynamic_cast<SwitchInterface::EnableSwitchMessage *>(message)) != NULL) {
357  enabled = true;
358  } else if ((dism = dynamic_cast<SwitchInterface::DisableSwitchMessage *>(message)) != NULL) {
359  enabled = false;
360  } else {
361  logger->log_debug(name(), "Unhandled message type: %s via %s",
362  message->type(), interface->uid());
363  }
364 
365  for (ThreadList::iterator i = __threads.begin(); i != __threads.end(); ++i) {
366  BBLoggerThread *bblt = dynamic_cast<BBLoggerThread *>(*i);
367  bblt->set_enabled(enabled);
368  }
369 
370  __switch_if->set_enabled(__enabled);
371  __switch_if->write();
372 
373  return false;
374 }
375 
376 
377 void
379 {
380  if (!__enabled) return;
381 
382  try {
383  __iface->read();
384 
385  if ( __buffering ) {
386  void *c = malloc(__iface->datasize());
387  memcpy(c, __iface->datachunk(), __iface->datasize());
388  __queue_mutex->lock();
389  __queues[__act_queue].push_locked(c);
390  __queue_mutex->unlock();
391  wakeup();
392  } else {
393  __queue_mutex->lock();
394  write_chunk(__iface->datachunk());
395  __queue_mutex->unlock();
396  }
397 
398  } catch (Exception &e) {
399  logger->log_error(name(), "Exception when data changed");
400  logger->log_error(name(), e);
401  }
402 }
403 
404 void
406  unsigned int instance_serial) throw()
407 {
408  __session_start = __num_data_items;
409 }
410 
411 void
413  unsigned int instance_serial) throw()
414 {
415  logger->log_info(name(), "Writer removed (wrote %u entries), flushing",
416  (__num_data_items - __session_start));
417  update_header();
418  fflush(__f_data);
419 }
uint32_t rel_time_sec
time since start time, seconds
Definition: file.h:76
uint64_t start_time_sec
Start time, timestamp seconds.
Definition: file.h:68
File could not be opened.
Definition: system.h:53
char interface_id[BBLOG_INTERFACE_ID_SIZE]
Interface ID.
Definition: file.h:65
virtual bool bb_interface_message_received(fawkes::Interface *interface, fawkes::Message *message)
BlackBoard message received notification.
Definition: log_thread.cpp:349
BBLogger file header definition.
Definition: file.h:53
void clear()
Clear the queue.
Definition: lock_queue.h:158
unsigned int datasize() const
Get data size.
Definition: interface.cpp:534
Base class for all messages passed through interfaces in Fawkes BlackBoard.
Definition: message.h:44
const timeval * get_timeval() const
Obtain the timeval where the time is stored.
Definition: time.h:109
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
void get_timestamp(long &sec, long &usec) const
Get time stamp.
Definition: time.h:114
const char * get_filename() const
Get filename.
Definition: log_thread.cpp:220
char scenario[BBLOG_SCENARIO_SIZE]
Scenario as defined in config.
Definition: file.h:62
Fawkes library namespace.
void unlock()
Unlock the mutex.
Definition: mutex.cpp:135
void bbil_add_writer_interface(Interface *interface)
Add an interface to the writer addition/removal watch list.
const char * id() const
Get identifier of interface.
Definition: interface.cpp:661
virtual void finalize()
Finalize the thread.
Definition: log_thread.cpp:195
A class for handling time.
Definition: time.h:91
virtual void init()
Initialize the thread.
Definition: log_thread.cpp:128
virtual void unregister_listener(BlackBoardInterfaceListener *listener)
Unregister BB interface listener.
Definition: blackboard.cpp:218
Thread class encapsulation of pthreads.
Definition: thread.h:42
uint32_t endianess
Endianess, 0 little endian, 1 big endian.
Definition: file.h:58
void write()
Write from local copy into BlackBoard memory.
Definition: interface.cpp:500
Base class for all Fawkes BlackBoard interfaces.
Definition: interface.h:79
virtual ~BBLoggerThread()
Destructor.
Definition: log_thread.cpp:116
BlackBoard logger thread.
Definition: log_thread.h:46
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:44
virtual void bb_interface_writer_added(fawkes::Interface *interface, unsigned int instance_serial)
A writing instance has been opened for a watched interface.
Definition: log_thread.cpp:405
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:45
const unsigned char * hash() const
Get interface hash.
Definition: interface.cpp:294
virtual void bb_interface_data_changed(fawkes::Interface *interface)
BlackBoard data changed notification.
Definition: log_thread.cpp:378
virtual void register_listener(BlackBoardInterfaceListener *listener, ListenerRegisterFlag flag=BBIL_FLAG_ALL)
Register BB event listener.
Definition: blackboard.cpp:190
List of threads.
Definition: thread_list.h:57
SwitchInterface Fawkes BlackBoard Interface.
unsigned char interface_hash[BBLOG_INTERFACE_HASH_SIZE]
Interface Hash.
Definition: file.h:66
void wakeup()
Wake up thread.
Definition: thread.cpp:1000
const char * type() const
Get type of interface.
Definition: interface.cpp:651
uint32_t file_magic
Magic value to identify file, must be 0xFFBBFFBB (big endian)
Definition: file.h:54
void set_name(const char *format,...)
Set name of thread.
Definition: thread.cpp:761
Base class for exceptions in Fawkes.
Definition: exception.h:36
void read()
Read from BlackBoard into local copy.
Definition: interface.cpp:477
virtual void bb_interface_writer_removed(fawkes::Interface *interface, unsigned int instance_serial)
A writing instance has been closed for a watched interface.
Definition: log_thread.cpp:412
void set_enabled(const bool new_enabled)
Set enabled value.
Could not write to file.
Definition: system.h:68
BBLogger entry header.
Definition: file.h:75
DisableSwitchMessage Fawkes BlackBoard Interface Message.
const char * name() const
Get name of thread.
Definition: thread.h:95
const char * uid() const
Get unique identifier of interface.
Definition: interface.cpp:687
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_error(const char *component, const char *format,...)=0
Log error message.
uint32_t num_data_items
Number of data items in file, if set to zero reader must scan the file for this number.
Definition: file.h:60
uint32_t data_size
size of one interface data block
Definition: file.h:67
void set_coalesce_wakeups(bool coalesce=true)
Set wakeup coalescing.
Definition: thread.cpp:741
void set_enabled(bool enabled)
Enable or disable logging.
Definition: log_thread.cpp:230
Queue with a lock.
Definition: lock_queue.h:43
virtual void loop()
Code to execute in the thread.
Definition: log_thread.cpp:332
BBLoggerThread(const char *iface_uid, const char *logdir, bool buffering, bool flushing, const char *scenario, fawkes::Time *start_time)
Constructor.
Definition: log_thread.cpp:77
void set_threadlist(fawkes::ThreadList &thread_list)
Set threadlist and master status.
Definition: log_thread.cpp:254
EnableSwitchMessage Fawkes BlackBoard Interface Message.
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
void push_locked(const Type &x)
Push element to queue with lock protection.
Definition: lock_queue.h:139
virtual Interface * open_for_reading(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for reading.
uint32_t rel_time_usec
time since start time, microseconds
Definition: file.h:77
uint64_t start_time_usec
Start time, timestamp microseconds.
Definition: file.h:69
void lock()
Lock this mutex.
Definition: mutex.cpp:89
Time & stamp()
Set this time to the current time.
Definition: time.cpp:783
const void * datachunk() const
Get data chunk.
Definition: interface.cpp:430
Mutex mutual exclusion lock.
Definition: mutex.h:32
uint32_t file_version
File version, set to BBLOGGER_FILE_VERSION on write and verify on read (big endian) ...
Definition: file.h:56
void bbil_add_message_interface(Interface *interface)
Add an interface to the message received watch list.
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
char interface_type[BBLOG_INTERFACE_TYPE_SIZE]
Interface type.
Definition: file.h:64
System ran out of memory and desired operation could not be fulfilled.
Definition: system.h:32
BlackBoard interface listener.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
void bbil_add_data_interface(Interface *interface)
Add an interface to the data modification watch list.
virtual void close(Interface *interface)=0
Close interface.