Fawkes API
Fawkes Development Version
|
00001 00002 /*************************************************************************** 00003 * log_thread.cpp - BB Logger Thread 00004 * 00005 * Created: Sun Nov 08 00:02:09 2009 00006 * Copyright 2006-2009 Tim Niemueller [www.niemueller.de] 00007 * 00008 ****************************************************************************/ 00009 00010 /* This program is free software; you can redistribute it and/or modify 00011 * it under the terms of the GNU General Public License as published by 00012 * the Free Software Foundation; either version 2 of the License, or 00013 * (at your option) any later version. 00014 * 00015 * This program is distributed in the hope that it will be useful, 00016 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00017 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00018 * GNU Library General Public License for more details. 00019 * 00020 * Read the full text in the LICENSE.GPL file in the doc directory. 00021 */ 00022 00023 #include "log_thread.h" 00024 #include "file.h" 00025 00026 #include <blackboard/blackboard.h> 00027 #include <logging/logger.h> 00028 #include <core/exceptions/system.h> 00029 #include <interfaces/SwitchInterface.h> 00030 00031 #include <memory> 00032 #include <cstring> 00033 #include <cstdlib> 00034 #include <cstdio> 00035 #include <cerrno> 00036 #include <fcntl.h> 00037 #ifdef __FreeBSD__ 00038 # include <sys/endian.h> 00039 #elif defined(__MACH__) && defined(__APPLE__) 00040 # include <sys/_endian.h> 00041 #else 00042 # include <endian.h> 00043 #endif 00044 #include <arpa/inet.h> 00045 #include <sys/stat.h> 00046 #include <sys/mman.h> 00047 00048 using namespace fawkes; 00049 00050 /** @class BBLoggerThread "log_thread.h" 00051 * BlackBoard logger thread. 00052 * One instance of this thread handles logging of one specific interface. 00053 * The plugin will spawn as many threads as there are interfaces to log. This 00054 * allows for maximum concurrency of the writers and avoids a serialization 00055 * bottle neck. 00056 * The log thread can operate in buffering mode. If this mode is disabled, the 00057 * data is written to the file within the blackboard data changed event, and 00058 * thus the writing operation can slow down the overall system, but memory 00059 * requirements are low. This is useful if a lot of data is written or if the 00060 * storage device is slow. If the mode is enabled, during the event the BB data 00061 * will be copied into another memory segment and the thread will be woken up. 00062 * Once the thread is running it stores all of the BB data segments bufferd 00063 * up to then. 00064 * The interface listener listens for events for a particular interface and 00065 * then writes the changes to the file. 00066 * @author Tim Niemueller 00067 */ 00068 00069 /** Constructor. 00070 * @param iface_uid interface UID which to log 00071 * @param logdir directory to store config files, must exist 00072 * @param buffering enable log buffering? 00073 * @param flushing true to flush after each written chunk 00074 * @param scenario ID of the log scenario 00075 * @param start_time time to use as start time for the log 00076 */ 00077 BBLoggerThread::BBLoggerThread(const char *iface_uid, 00078 const char *logdir, bool buffering, bool flushing, 00079 const char *scenario, fawkes::Time *start_time) 00080 : Thread("BBLoggerThread", Thread::OPMODE_WAITFORWAKEUP), 00081 BlackBoardInterfaceListener("BBLoggerThread(%s)", iface_uid) 00082 { 00083 set_coalesce_wakeups(true); 00084 set_name("BBLoggerThread(%s)", iface_uid); 00085 00086 __buffering = buffering; 00087 __flushing = flushing; 00088 __uid = strdup(iface_uid); 00089 __logdir = strdup(logdir); 00090 __scenario = strdup(scenario); 00091 __start = new Time(start_time); 00092 __filename = NULL; 00093 __queue_mutex = new Mutex(); 00094 __data_size = 0; 00095 __is_master = false; 00096 __enabled = true; 00097 00098 __now = NULL; 00099 00100 // Parse UID 00101 Interface::parse_uid(__uid, &__type, &__id); 00102 00103 char date[21]; 00104 Time now; 00105 struct tm *tmp = localtime(&(now.get_timeval()->tv_sec)); 00106 strftime(date, 21, "%F-%H-%M-%S", tmp); 00107 00108 if (asprintf(&__filename, "%s/%s-%s-%s-%s.log", LOGDIR, __scenario, 00109 __type, __id, date) == -1) { 00110 throw OutOfMemoryException("Cannot generate log name"); 00111 } 00112 } 00113 00114 00115 /** Destructor. */ 00116 BBLoggerThread::~BBLoggerThread() 00117 { 00118 free(__uid); 00119 free(__type); 00120 free(__id); 00121 free(__logdir); 00122 free(__scenario); 00123 free(__filename); 00124 delete __queue_mutex; 00125 delete __start; 00126 } 00127 00128 00129 void 00130 BBLoggerThread::init() 00131 { 00132 __queues[0].clear(); 00133 __queues[1].clear(); 00134 __act_queue = 0; 00135 00136 __queue_mutex = new Mutex(); 00137 __data_size = 0; 00138 00139 __now = NULL; 00140 __num_data_items = 0; 00141 __session_start = 0; 00142 00143 // use open because fopen does not provide O_CREAT | O_EXCL 00144 // open read/write because of usage of mmap 00145 mode_t m = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; 00146 int fd = open(__filename, O_RDWR | O_CREAT | O_EXCL, m); 00147 if ( ! fd ) { 00148 throw CouldNotOpenFileException(__filename, errno, "Failed to open log 1"); 00149 } else { 00150 __f_data = fdopen(fd, "w+"); 00151 if ( ! __f_data ) { 00152 throw CouldNotOpenFileException(__filename, errno, "Failed to open log 2"); 00153 } 00154 } 00155 00156 try { 00157 __iface = blackboard->open_for_reading(__type, __id); 00158 __data_size = __iface->datasize(); 00159 } catch (Exception &e) { 00160 fclose(__f_data); 00161 throw; 00162 } 00163 00164 try { 00165 write_header(); 00166 } catch (FileWriteException &e) { 00167 blackboard->close(__iface); 00168 fclose(__f_data); 00169 throw; 00170 } 00171 00172 __now = new Time(clock); 00173 00174 if (__is_master) { 00175 try { 00176 __switch_if = blackboard->open_for_writing<SwitchInterface>("BBLogger"); 00177 __switch_if->set_enabled(__enabled); 00178 __switch_if->write(); 00179 bbil_add_message_interface(__switch_if); 00180 } catch (Exception &e) { 00181 fclose(__f_data); 00182 throw; 00183 } 00184 } 00185 00186 bbil_add_data_interface(__iface); 00187 bbil_add_writer_interface(__iface); 00188 00189 blackboard->register_listener(this); 00190 00191 logger->log_info(name(), "Logging %s to %s%s", __iface->uid(), __filename, 00192 __is_master ? " as master" : ""); 00193 } 00194 00195 00196 void 00197 BBLoggerThread::finalize() 00198 { 00199 blackboard->unregister_listener(this); 00200 if (__is_master) { 00201 blackboard->close(__switch_if); 00202 } 00203 update_header(); 00204 fclose(__f_data); 00205 for (unsigned int q = 0; q < 2; ++q) { 00206 while (!__queues[q].empty()) { 00207 void *t = __queues[q].front(); 00208 free(t); 00209 __queues[q].pop(); 00210 } 00211 } 00212 delete __now; 00213 __now = NULL; 00214 } 00215 00216 00217 /** Get filename. 00218 * @return file name, valid after object instantiated, but before init() does not 00219 * mean that the file has been or can actually be opened 00220 */ 00221 const char * 00222 BBLoggerThread::get_filename() const 00223 { 00224 return __filename; 00225 } 00226 00227 00228 /** Enable or disable logging. 00229 * @param enabled true to enable logging, false to disable 00230 */ 00231 void 00232 BBLoggerThread::set_enabled(bool enabled) 00233 { 00234 if (enabled && !__enabled) { 00235 logger->log_info(name(), "Logging enabled", 00236 (__num_data_items - __session_start)); 00237 __session_start = __num_data_items; 00238 } else if (!enabled && __enabled) { 00239 logger->log_info(name(), "Logging disabled (wrote %u entries), flushing", 00240 (__num_data_items - __session_start)); 00241 update_header(); 00242 fflush(__f_data); 00243 } 00244 00245 __enabled = enabled; 00246 } 00247 00248 00249 /** Set threadlist and master status. 00250 * This copies the thread list and sets this thread as master thread. 00251 * If you intend to use this method you must do so before the thread is 00252 * initialized. You may only ever declare one thread as master. 00253 * @param thread_list list of threads to notify on enable/disable events 00254 */ 00255 void 00256 BBLoggerThread::set_threadlist(fawkes::ThreadList &thread_list) 00257 { 00258 __is_master = true; 00259 __threads = thread_list; 00260 } 00261 00262 void 00263 BBLoggerThread::write_header() 00264 { 00265 bblog_file_header header; 00266 memset(&header, 0, sizeof(header)); 00267 header.file_magic = htonl(BBLOGGER_FILE_MAGIC); 00268 header.file_version = htonl(BBLOGGER_FILE_VERSION); 00269 #if __BYTE_ORDER == __BIG_ENDIAN 00270 header.endianess = BBLOG_BIG_ENDIAN; 00271 #else 00272 header.endianess = BBLOG_LITTLE_ENDIAN; 00273 #endif 00274 header.num_data_items = __num_data_items; 00275 strncpy(header.scenario, (const char *)__scenario, BBLOG_SCENARIO_SIZE); 00276 strncpy(header.interface_type, __iface->type(), BBLOG_INTERFACE_TYPE_SIZE); 00277 strncpy(header.interface_id, __iface->id(), BBLOG_INTERFACE_ID_SIZE); 00278 memcpy(header.interface_hash, __iface->hash(), BBLOG_INTERFACE_HASH_SIZE); 00279 header.data_size = __iface->datasize(); 00280 long start_time_sec, start_time_usec; 00281 __start->get_timestamp(start_time_sec, start_time_usec); 00282 header.start_time_sec = start_time_sec; 00283 header.start_time_usec = start_time_usec; 00284 if (fwrite(&header, sizeof(header), 1, __f_data) != 1) { 00285 throw FileWriteException(__filename, "Failed to write header"); 00286 } 00287 fflush(__f_data); 00288 } 00289 00290 /** Updates the num_data_items field in the header. */ 00291 void 00292 BBLoggerThread::update_header() 00293 { 00294 // write updated num_data_items field 00295 #if _POSIX_MAPPED_FILES 00296 void *h = mmap(NULL, sizeof(bblog_file_header), PROT_WRITE, MAP_SHARED, 00297 fileno(__f_data), 0); 00298 if (h == MAP_FAILED) { 00299 logger->log_warn(name(), "Failed to mmap log (%s), " 00300 "not updating number of data items", 00301 strerror(errno)); 00302 } else { 00303 bblog_file_header *header = (bblog_file_header *)h; 00304 header->num_data_items = __num_data_items; 00305 munmap(h, sizeof(bblog_file_header)); 00306 } 00307 #else 00308 logger->log_warn(name(), "Memory mapped files not available, " 00309 "not updating number of data items on close"); 00310 #endif 00311 } 00312 00313 void 00314 BBLoggerThread::write_chunk(const void *chunk) 00315 { 00316 bblog_entry_header ehead; 00317 __now->stamp(); 00318 Time d = *__now - *__start; 00319 long rel_time_sec, rel_time_usec; 00320 d.get_timestamp(rel_time_sec, rel_time_usec); 00321 ehead.rel_time_sec = rel_time_sec; 00322 ehead.rel_time_usec = rel_time_usec; 00323 if ( (fwrite(&ehead, sizeof(ehead), 1, __f_data) == 1) && 00324 (fwrite(chunk, __data_size, 1, __f_data) == 1) ) { 00325 if (__flushing) fflush(__f_data); 00326 __num_data_items += 1; 00327 } else { 00328 logger->log_warn(name(), "Failed to write chunk"); 00329 } 00330 } 00331 00332 00333 void 00334 BBLoggerThread::loop() 00335 { 00336 unsigned int write_queue = __act_queue; 00337 __queue_mutex->lock(); 00338 __act_queue = 1 - __act_queue; 00339 __queue_mutex->unlock(); 00340 LockQueue<void *> &queue = __queues[write_queue]; 00341 //logger->log_debug(name(), "Writing %zu entries", queue.size()); 00342 while (! queue.empty() ) { 00343 void *c = queue.front(); 00344 write_chunk(c); 00345 free(c); 00346 queue.pop(); 00347 } 00348 } 00349 00350 bool 00351 BBLoggerThread::bb_interface_message_received(Interface *interface, 00352 Message *message) throw() 00353 { 00354 SwitchInterface::EnableSwitchMessage *enm; 00355 SwitchInterface::DisableSwitchMessage *dism; 00356 00357 bool enabled = true; 00358 if ((enm = dynamic_cast<SwitchInterface::EnableSwitchMessage *>(message)) != NULL) { 00359 enabled = true; 00360 } else if ((dism = dynamic_cast<SwitchInterface::DisableSwitchMessage *>(message)) != NULL) { 00361 enabled = false; 00362 } else { 00363 logger->log_debug(name(), "Unhandled message type: %s via %s", 00364 message->type(), interface->uid()); 00365 } 00366 00367 for (ThreadList::iterator i = __threads.begin(); i != __threads.end(); ++i) { 00368 BBLoggerThread *bblt = dynamic_cast<BBLoggerThread *>(*i); 00369 bblt->set_enabled(enabled); 00370 } 00371 00372 __switch_if->set_enabled(__enabled); 00373 __switch_if->write(); 00374 00375 return false; 00376 } 00377 00378 00379 void 00380 BBLoggerThread::bb_interface_data_changed(Interface *interface) throw() 00381 { 00382 if (!__enabled) return; 00383 00384 try { 00385 __iface->read(); 00386 00387 if ( __buffering ) { 00388 void *c = malloc(__iface->datasize()); 00389 memcpy(c, __iface->datachunk(), __iface->datasize()); 00390 __queue_mutex->lock(); 00391 __queues[__act_queue].push_locked(c); 00392 __queue_mutex->unlock(); 00393 wakeup(); 00394 } else { 00395 __queue_mutex->lock(); 00396 write_chunk(__iface->datachunk()); 00397 __queue_mutex->unlock(); 00398 } 00399 00400 } catch (Exception &e) { 00401 logger->log_error(name(), "Exception when data changed"); 00402 logger->log_error(name(), e); 00403 } 00404 } 00405 00406 void 00407 BBLoggerThread::bb_interface_writer_added(Interface *interface, 00408 unsigned int instance_serial) throw() 00409 { 00410 __session_start = __num_data_items; 00411 } 00412 00413 void 00414 BBLoggerThread::bb_interface_writer_removed(Interface *interface, 00415 unsigned int instance_serial) throw() 00416 { 00417 logger->log_info(name(), "Writer removed (wrote %u entries), flushing", 00418 (__num_data_items - __session_start)); 00419 update_header(); 00420 fflush(__f_data); 00421 }