Fawkes API
Fawkes Development Version
|
00001 00002 /*************************************************************************** 00003 * logreplay_thread.cpp - BB Log Replay Thread 00004 * 00005 * Created: Wed Feb 17 01:53:00 2010 00006 * Copyright 2010 Tim Niemueller [www.niemueller.de] 00007 * 2010 Masrur Doostdar <doostdar@kbsg.rwth-aachen.de> 00008 * 00009 ****************************************************************************/ 00010 00011 /* This program is free software; you can redistribute it and/or modify 00012 * it under the terms of the GNU General Public License as published by 00013 * the Free Software Foundation; either version 2 of the License, or 00014 * (at your option) any later version. 00015 * 00016 * This program is distributed in the hope that it will be useful, 00017 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00019 * GNU Library General Public License for more details. 00020 * 00021 * Read the full text in the LICENSE.GPL file in the doc directory. 00022 */ 00023 00024 #include "logreplay_thread.h" 00025 #include "file.h" 00026 00027 #include <blackboard/blackboard.h> 00028 #include <logging/logger.h> 00029 #include <core/threading/wait_condition.h> 00030 #include <core/exceptions/system.h> 00031 #include <utils/misc/autofree.h> 00032 00033 #include <blackboard/internal/instance_factory.h> 00034 00035 00036 #include <memory> 00037 #include <cstring> 00038 #include <cstdlib> 00039 #include <cstdio> 00040 #include <cerrno> 00041 #include <fcntl.h> 00042 #ifdef __FreeBSD__ 00043 # include <sys/endian.h> 00044 #elif defined(__MACH__) && defined(__APPLE__) 00045 # include <sys/_endian.h> 00046 #else 00047 # include <endian.h> 00048 #endif 00049 #include <arpa/inet.h> 00050 #include <sys/mman.h> 00051 00052 using namespace fawkes; 00053 00054 /** @class BBLogReplayThread "logreplay_thread.h" 00055 * BlackBoard log Replay thread. 00056 * Writes the data of the logfile into a blackboard interface, considering the 00057 * time-step differences between the data. 00058 * @author Masrur Doostdar 00059 * @author Tim Niemueller 00060 */ 00061 00062 /** Constructor. 00063 * @param logfile_name filename of the log to be replayed 00064 * @param logdir directory containing the logfile 00065 * @param scenario ID of the log scenario 00066 * @param grace_period time in seconds that desired offset and loop offset may 00067 * diverge to still write the new data 00068 * @param loop_replay specifies if the replay should be looped 00069 * @param non_blocking do not block the main loop if not enough time has elapsed 00070 * to replay new data but just wait for the next cycle. This is ignored in 00071 * continuous thread mode as it could cause busy waiting. 00072 * @param thread_name initial thread name 00073 * @param th_opmode thread operation mode 00074 */ 00075 BBLogReplayThread::BBLogReplayThread(const char *logfile_name, 00076 const char *logdir, 00077 const char *scenario, 00078 float grace_period, 00079 bool loop_replay, 00080 bool non_blocking, 00081 const char *thread_name, 00082 fawkes::Thread::OpMode th_opmode) 00083 : Thread(thread_name, th_opmode) 00084 { 00085 set_name("BBLogReplayThread(%s)", logfile_name); 00086 set_prepfin_conc_loop(true); 00087 00088 __logfile_name= strdup(logfile_name); 00089 __logdir = strdup(logdir); 00090 __scenario = strdup(scenario); // dont need this!? 00091 __filename = NULL; 00092 __cfg_grace_period = grace_period; 00093 __cfg_loop_replay = loop_replay; 00094 if (th_opmode == OPMODE_WAITFORWAKEUP) { 00095 __cfg_non_blocking = non_blocking; 00096 } else { 00097 // would cause busy waiting 00098 __cfg_non_blocking = false; 00099 } 00100 } 00101 00102 00103 /** Destructor. */ 00104 BBLogReplayThread::~BBLogReplayThread() 00105 { 00106 free(__logfile_name); 00107 free(__logdir); 00108 free(__scenario); 00109 } 00110 00111 00112 00113 00114 void 00115 BBLogReplayThread::init() 00116 { 00117 __logfile = NULL; 00118 __interface = NULL; 00119 __filename = NULL; 00120 00121 if (asprintf(&__filename, "%s/%s", __logdir, __logfile_name) == -1) { 00122 throw OutOfMemoryException("Cannot re-generate logfile-path"); 00123 } 00124 00125 try { 00126 __logfile = new BBLogFile(__filename, true); 00127 } catch (Exception &e) { 00128 finalize(); 00129 throw; 00130 } 00131 00132 if (! __logfile->has_next()) { 00133 finalize(); 00134 throw Exception("Log file %s does not have any entries", __filename); 00135 } 00136 00137 __interface = blackboard->open_for_writing(__logfile->interface_type(), 00138 __logfile->interface_id()); 00139 00140 try { 00141 __logfile->set_interface(__interface); 00142 } catch (Exception &e) { 00143 finalize(); 00144 throw; 00145 } 00146 00147 logger->log_info(name(), "Replaying from %s:", __filename); 00148 } 00149 00150 00151 void 00152 BBLogReplayThread::finalize() 00153 { 00154 delete __logfile; 00155 if (__filename) free(__filename); 00156 blackboard->close(__interface); 00157 } 00158 00159 00160 void 00161 BBLogReplayThread::once() 00162 { 00163 // Write first immediately, skip first offset 00164 __logfile->read_next(); 00165 __interface->write(); 00166 __last_offset = __logfile->entry_offset(); 00167 if (__logfile->has_next()) { 00168 __logfile->read_next(); 00169 __offsetdiff = __logfile->entry_offset() - __last_offset; 00170 __last_offset = __logfile->entry_offset(); 00171 } 00172 __last_loop.stamp(); 00173 } 00174 00175 void 00176 BBLogReplayThread::loop() 00177 { 00178 if (__logfile->has_next()) { 00179 00180 // check if there is time left to wait 00181 __now.stamp(); 00182 __loopdiff = __now - __last_loop; 00183 if ((__offsetdiff.in_sec() - __loopdiff.in_sec()) > __cfg_grace_period) { 00184 if (__cfg_non_blocking) { 00185 // need to keep waiting before posting, but in non-blocking mode 00186 // just wait for next loop 00187 return; 00188 } else { 00189 __waittime = __offsetdiff - __loopdiff; 00190 __waittime.wait(); 00191 } 00192 } 00193 00194 __interface->write(); 00195 __logfile->read_next(); 00196 00197 __last_loop.stamp(); 00198 __offsetdiff = __logfile->entry_offset() - __last_offset; 00199 __last_offset = __logfile->entry_offset(); 00200 00201 } else { 00202 if(__cfg_loop_replay){ 00203 logger->log_info(name(), "replay finished, looping"); 00204 __logfile->rewind(); 00205 } else { 00206 if (opmode() == OPMODE_CONTINUOUS) { 00207 // block 00208 logger->log_info(name(), "replay finished, sleeping"); 00209 WaitCondition waitcond; 00210 waitcond.wait(); 00211 } // else wait will just run once per loop 00212 } 00213 } 00214 }