Fawkes API  Fawkes Development Version
logreplay_thread.cpp
1 
2 /***************************************************************************
3  * logreplay_thread.cpp - BB Log Replay Thread
4  *
5  * Created: Wed Feb 17 01:53:00 2010
6  * Copyright 2010 Tim Niemueller [www.niemueller.de]
7  * 2010 Masrur Doostdar <doostdar@kbsg.rwth-aachen.de>
8  *
9  ****************************************************************************/
10 
11 /* This program is free software; you can redistribute it and/or modify
12  * it under the terms of the GNU General Public License as published by
13  * the Free Software Foundation; either version 2 of the License, or
14  * (at your option) any later version.
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU Library General Public License for more details.
20  *
21  * Read the full text in the LICENSE.GPL file in the doc directory.
22  */
23 
24 #include "logreplay_thread.h"
25 #include "file.h"
26 
27 #include <blackboard/blackboard.h>
28 #include <logging/logger.h>
29 #include <core/threading/wait_condition.h>
30 #include <core/exceptions/system.h>
31 #include <utils/misc/autofree.h>
32 
33 #include <blackboard/internal/instance_factory.h>
34 
35 
36 #include <memory>
37 #include <cstring>
38 #include <cstdlib>
39 #include <cstdio>
40 #include <cerrno>
41 #include <fcntl.h>
42 #ifdef __FreeBSD__
43 # include <sys/endian.h>
44 #elif defined(__MACH__) && defined(__APPLE__)
45 # include <sys/_endian.h>
46 #else
47 # include <endian.h>
48 #endif
49 #include <arpa/inet.h>
50 #include <sys/mman.h>
51 
52 using namespace fawkes;
53 
54 /** @class BBLogReplayThread "logreplay_thread.h"
55  * BlackBoard log Replay thread.
56  * Writes the data of the logfile into a blackboard interface, considering the
57  * time-step differences between the data.
58  * @author Masrur Doostdar
59  * @author Tim Niemueller
60  */
61 
62 /** Constructor.
63  * @param logfile_name filename of the log to be replayed
64  * @param logdir directory containing the logfile
65  * @param scenario ID of the log scenario
66  * @param grace_period time in seconds that desired offset and loop offset may
67  * diverge to still write the new data
68  * @param loop_replay specifies if the replay should be looped
69  * @param non_blocking do not block the main loop if not enough time has elapsed
70  * to replay new data but just wait for the next cycle. This is ignored in
71  * continuous thread mode as it could cause busy waiting.
72  * @param thread_name initial thread name
73  * @param th_opmode thread operation mode
74  */
75 BBLogReplayThread::BBLogReplayThread(const char *logfile_name,
76  const char *logdir,
77  const char *scenario,
78  float grace_period,
79  bool loop_replay,
80  bool non_blocking,
81  const char *thread_name,
82  fawkes::Thread::OpMode th_opmode)
83  : Thread(thread_name, th_opmode)
84 {
85  set_name("BBLogReplayThread(%s)", logfile_name);
87 
88  __logfile_name= strdup(logfile_name);
89  __logdir = strdup(logdir);
90  __scenario = strdup(scenario); // dont need this!?
91  __filename = NULL;
92  __cfg_grace_period = grace_period;
93  __cfg_loop_replay = loop_replay;
94  if (th_opmode == OPMODE_WAITFORWAKEUP) {
95  __cfg_non_blocking = non_blocking;
96  } else {
97  // would cause busy waiting
98  __cfg_non_blocking = false;
99  }
100 }
101 
102 
103 /** Destructor. */
105 {
106  free(__logfile_name);
107  free(__logdir);
108  free(__scenario);
109 }
110 
111 
112 
113 
114 void
116 {
117  __logfile = NULL;
118  __interface = NULL;
119  __filename = NULL;
120 
121  if (asprintf(&__filename, "%s/%s", __logdir, __logfile_name) == -1) {
122  throw OutOfMemoryException("Cannot re-generate logfile-path");
123  }
124 
125  try {
126  __logfile = new BBLogFile(__filename, true);
127  } catch (Exception &e) {
128  finalize();
129  throw;
130  }
131 
132  if (! __logfile->has_next()) {
133  finalize();
134  throw Exception("Log file %s does not have any entries", __filename);
135  }
136 
137  __interface = blackboard->open_for_writing(__logfile->interface_type(),
138  __logfile->interface_id());
139 
140  try {
141  __logfile->set_interface(__interface);
142  } catch (Exception &e) {
143  finalize();
144  throw;
145  }
146 
147  logger->log_info(name(), "Replaying from %s:", __filename);
148 }
149 
150 
151 void
153 {
154  delete __logfile;
155  if (__filename) free(__filename);
156  blackboard->close(__interface);
157 }
158 
159 
160 void
162 {
163  // Write first immediately, skip first offset
164  __logfile->read_next();
165  __interface->write();
166  __last_offset = __logfile->entry_offset();
167  if (__logfile->has_next()) {
168  __logfile->read_next();
169  __offsetdiff = __logfile->entry_offset() - __last_offset;
170  __last_offset = __logfile->entry_offset();
171  }
172  __last_loop.stamp();
173 }
174 
175 void
177 {
178  if (__logfile->has_next()) {
179 
180  // check if there is time left to wait
181  __now.stamp();
182  __loopdiff = __now - __last_loop;
183  if ((__offsetdiff.in_sec() - __loopdiff.in_sec()) > __cfg_grace_period) {
184  if (__cfg_non_blocking) {
185  // need to keep waiting before posting, but in non-blocking mode
186  // just wait for next loop
187  return;
188  } else {
189  __waittime = __offsetdiff - __loopdiff;
190  __waittime.wait();
191  }
192  }
193 
194  __interface->write();
195  __logfile->read_next();
196 
197  __last_loop.stamp();
198  __offsetdiff = __logfile->entry_offset() - __last_offset;
199  __last_offset = __logfile->entry_offset();
200 
201  } else {
202  if(__cfg_loop_replay){
203  logger->log_info(name(), "replay finished, looping");
204  __logfile->rewind();
205  } else {
206  if (opmode() == OPMODE_CONTINUOUS) {
207  // block
208  logger->log_info(name(), "replay finished, sleeping");
209  WaitCondition waitcond;
210  waitcond.wait();
211  } // else wait will just run once per loop
212  }
213  }
214 }
double in_sec() const
Convet time to seconds.
Definition: time.cpp:232
virtual void init()
Initialize the thread.
Wait until a given condition holds.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
virtual ~BBLogReplayThread()
Destructor.
void rewind()
Rewind file to start.
Definition: bblogfile.cpp:254
Fawkes library namespace.
OpMode
Thread operation mode.
Definition: thread.h:52
const char * interface_id() const
Get interface ID.
Definition: bblogfile.cpp:573
Thread class encapsulation of pthreads.
Definition: thread.h:42
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Definition: thread.cpp:727
void write()
Write from local copy into BlackBoard memory.
Definition: interface.cpp:500
bool has_next()
Check if another entry is available.
Definition: bblogfile.cpp:267
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:44
void read_next()
Read next entry.
Definition: bblogfile.cpp:284
void wait()
Wait (sleep) for this time.
Definition: time.cpp:817
const fawkes::Time & entry_offset() const
Get current entry offset.
Definition: bblogfile.cpp:514
OpMode opmode() const
Get operation mode.
Definition: thread.cpp:678
void set_interface(fawkes::Interface *interface)
Set the internal interface.
Definition: bblogfile.cpp:491
void set_name(const char *format,...)
Set name of thread.
Definition: thread.cpp:761
Base class for exceptions in Fawkes.
Definition: exception.h:36
operate in continuous mode (default)
Definition: thread.h:53
BBLogReplayThread(const char *logfile_name, const char *logdir, const char *scenario, float grace_period, bool loop_replay, bool non_blocking=false, const char *thread_name="BBLogReplayThread", fawkes::Thread::OpMode th_opmode=Thread::OPMODE_CONTINUOUS)
Constructor.
const char * name() const
Get name of thread.
Definition: thread.h:95
virtual void once()
Execute an action exactly once.
void wait()
Wait for the condition forever.
const char * interface_type() const
Get interface type.
Definition: bblogfile.cpp:563
virtual void loop()
Code to execute in the thread.
Time & stamp()
Set this time to the current time.
Definition: time.cpp:783
operate in wait-for-wakeup mode
Definition: thread.h:54
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
virtual void finalize()
Finalize the thread.
Class to easily access bblogger log files.
Definition: bblogfile.h:38
System ran out of memory and desired operation could not be fulfilled.
Definition: system.h:32
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
virtual void close(Interface *interface)=0
Close interface.