Fawkes API  Fawkes Development Version
logreplay_thread.cpp
00001 
00002 /***************************************************************************
00003  *  logreplay_thread.cpp - BB Log Replay Thread
00004  *
00005  *  Created: Wed Feb 17 01:53:00 2010
00006  *  Copyright  2010  Tim Niemueller [www.niemueller.de]
00007  *             2010  Masrur Doostdar <doostdar@kbsg.rwth-aachen.de>
00008  *
00009  ****************************************************************************/
00010 
00011 /*  This program is free software; you can redistribute it and/or modify
00012  *  it under the terms of the GNU General Public License as published by
00013  *  the Free Software Foundation; either version 2 of the License, or
00014  *  (at your option) any later version.
00015  *
00016  *  This program is distributed in the hope that it will be useful,
00017  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  *  GNU Library General Public License for more details.
00020  *
00021  *  Read the full text in the LICENSE.GPL file in the doc directory.
00022  */
00023 
00024 #include "logreplay_thread.h"
00025 #include "file.h"
00026 
00027 #include <blackboard/blackboard.h>
00028 #include <logging/logger.h>
00029 #include <core/threading/wait_condition.h>
00030 #include <core/exceptions/system.h>
00031 #include <utils/misc/autofree.h>
00032 
00033 #include <blackboard/internal/instance_factory.h>
00034 
00035 
00036 #include <memory>
00037 #include <cstring>
00038 #include <cstdlib>
00039 #include <cstdio>
00040 #include <cerrno>
00041 #include <fcntl.h>
00042 #ifdef __FreeBSD__
00043 #  include <sys/endian.h>
00044 #elif defined(__MACH__) && defined(__APPLE__)
00045 #  include <sys/_endian.h>
00046 #else
00047 #  include <endian.h>
00048 #endif
00049 #include <arpa/inet.h>
00050 #include <sys/mman.h>
00051 
00052 using namespace fawkes;
00053 
00054 /** @class BBLogReplayThread "logreplay_thread.h"
00055  * BlackBoard log Replay thread.
00056  * Writes the data of the logfile into a blackboard interface, considering the
00057  * time-step differences between the data.
00058  * @author Masrur Doostdar
00059  * @author Tim Niemueller
00060  */
00061 
00062 /** Constructor.
00063  * @param logfile_name filename of the log to be replayed
00064  * @param logdir directory containing the logfile
00065  * @param scenario ID of the log scenario
00066  * @param grace_period time in seconds that desired offset and loop offset may
00067  * diverge to still write the new data
00068  * @param loop_replay specifies if the replay should be looped
00069  * @param non_blocking do not block the main loop if not enough time has elapsed
00070  * to replay new data but just wait for the next cycle. This is ignored in
00071  * continuous thread mode as it could cause busy waiting.
00072  * @param thread_name initial thread name
00073  * @param th_opmode thread operation mode
00074  */
00075 BBLogReplayThread::BBLogReplayThread(const char *logfile_name,
00076                                      const char *logdir,
00077                                      const char *scenario,
00078                                      float grace_period,
00079                                      bool loop_replay,
00080                                      bool non_blocking,
00081                                      const char *thread_name,
00082                                      fawkes::Thread::OpMode th_opmode)
00083   : Thread(thread_name, th_opmode)
00084 {
00085   set_name("BBLogReplayThread(%s)", logfile_name);
00086   set_prepfin_conc_loop(true);
00087 
00088   __logfile_name= strdup(logfile_name);
00089   __logdir      = strdup(logdir);
00090   __scenario    = strdup(scenario); // dont need this!?
00091   __filename    = NULL;
00092   __cfg_grace_period = grace_period;
00093   __cfg_loop_replay  = loop_replay;
00094   if (th_opmode == OPMODE_WAITFORWAKEUP) {
00095     __cfg_non_blocking = non_blocking;
00096   } else {
00097     // would cause busy waiting
00098     __cfg_non_blocking = false;
00099   }
00100 }
00101 
00102 
00103 /** Destructor. */
00104 BBLogReplayThread::~BBLogReplayThread()
00105 {
00106   free(__logfile_name);
00107   free(__logdir);
00108   free(__scenario);
00109 }
00110 
00111 
00112 
00113 
00114 void
00115 BBLogReplayThread::init()
00116 {
00117   __logfile = NULL;
00118   __interface = NULL;
00119   __filename = NULL;
00120 
00121   if (asprintf(&__filename, "%s/%s", __logdir, __logfile_name) == -1) {
00122     throw OutOfMemoryException("Cannot re-generate logfile-path");
00123   }
00124 
00125   try {
00126     __logfile = new BBLogFile(__filename, true);
00127   } catch (Exception &e) {
00128     finalize();
00129     throw;
00130   }
00131 
00132   if (! __logfile->has_next()) {
00133     finalize();
00134     throw Exception("Log file %s does not have any entries", __filename);
00135   }
00136 
00137   __interface = blackboard->open_for_writing(__logfile->interface_type(),
00138                                              __logfile->interface_id());
00139 
00140   try {
00141     __logfile->set_interface(__interface);
00142   } catch (Exception &e) {
00143     finalize();
00144     throw;
00145   }
00146 
00147   logger->log_info(name(), "Replaying from %s:", __filename);
00148 }
00149 
00150 
00151 void
00152 BBLogReplayThread::finalize()
00153 {
00154   delete __logfile;
00155   if (__filename)  free(__filename);
00156   blackboard->close(__interface);
00157 }
00158 
00159 
00160 void
00161 BBLogReplayThread::once()
00162 {
00163   // Write first immediately, skip first offset
00164   __logfile->read_next();
00165   __interface->write();
00166   __last_offset = __logfile->entry_offset();
00167   if (__logfile->has_next()) {
00168     __logfile->read_next();
00169     __offsetdiff  = __logfile->entry_offset() - __last_offset;
00170     __last_offset = __logfile->entry_offset();
00171   }
00172   __last_loop.stamp();
00173 }
00174 
00175 void
00176 BBLogReplayThread::loop()
00177 {
00178   if (__logfile->has_next()) {
00179 
00180     // check if there is time left to wait
00181     __now.stamp();
00182     __loopdiff = __now - __last_loop;
00183     if ((__offsetdiff.in_sec() - __loopdiff.in_sec()) > __cfg_grace_period) {
00184       if (__cfg_non_blocking) {
00185         // need to keep waiting before posting, but in non-blocking mode
00186         // just wait for next loop
00187         return;
00188       } else {
00189         __waittime = __offsetdiff - __loopdiff;
00190         __waittime.wait();
00191       }
00192     }
00193 
00194     __interface->write();
00195     __logfile->read_next();
00196 
00197     __last_loop.stamp();
00198     __offsetdiff  = __logfile->entry_offset() - __last_offset;
00199     __last_offset = __logfile->entry_offset();
00200 
00201   } else {
00202     if(__cfg_loop_replay){
00203       logger->log_info(name(), "replay finished, looping");
00204       __logfile->rewind();
00205     } else {
00206       if (opmode() == OPMODE_CONTINUOUS) {
00207         // block
00208         logger->log_info(name(), "replay finished, sleeping");
00209         WaitCondition waitcond;
00210         waitcond.wait();
00211       } // else wait will just run once per loop
00212     }
00213   }
00214 }