Fawkes API  Fawkes Development Version
sync_thread.cpp
1 
2 /***************************************************************************
3  * sync_thread.cpp - Fawkes BlackBoard Synchronization Thread
4  *
5  * Created: Thu Jun 04 18:13:06 2009
6  * Copyright 2006-2009 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU Library General Public License for more details.
19  *
20  * Read the full text in the LICENSE.GPL file in the doc directory.
21  */
22 
23 #include "sync_thread.h"
24 
25 #include <blackboard/remote.h>
26 #include <core/threading/mutex_locker.h>
27 #include <utils/time/wait.h>
28 
29 #include <cstring>
30 
31 using namespace std;
32 using namespace fawkes;
33 
34 /** @class BlackBoardSynchronizationThread "sync_thread.h"
35  * Thread to synchronize two BlackBoards.
36  * @author Tim Niemueller
37  */
38 
39 /** Constructor.
40  * @param bbsync_cfg_prefix Configuration prefix for the whole bbsync plugin
41  * @param peer_cfg_prefix The configuration prefix for the peer this sync thread
42  * has been created for.
43  * @param peer name of the peer configuration for this thread
44  */
46  std::string &peer_cfg_prefix,
47  std::string &peer)
48  : Thread("", Thread::OPMODE_CONTINUOUS)
49 {
50  set_name("BBSyncThread[%s]", peer.c_str());
52 
53  __bbsync_cfg_prefix = bbsync_cfg_prefix;
54  __peer_cfg_prefix = peer_cfg_prefix;
55  __peer = peer;
56 
57  __remote_bb = NULL;
58 }
59 
60 
61 /** Destructor. */
63 {
64 }
65 
66 void
68 {
69  logger->log_debug(name(), "Initializing");
70  unsigned int check_interval = 0;
71  try {
72  __host = config->get_string((__peer_cfg_prefix + "host").c_str());
73  __port = config->get_uint((__peer_cfg_prefix + "port").c_str());
74 
75  check_interval = config->get_uint((__bbsync_cfg_prefix + "check_interval").c_str());
76  } catch (Exception &e) {
77  e.append("Host or port not specified for peer");
78  throw;
79  }
80 
81  try {
82  check_interval = config->get_uint((__peer_cfg_prefix + "check_interval").c_str());
83  logger->log_debug(name(), "Peer check interval set, overriding default.");
84  } catch (Exception &e) {
85  logger->log_debug(name(), "No per-peer check interval set, using default");
86  }
87 
88  read_config_combos(__peer_cfg_prefix + "reading/", /* writing */ false);
89  read_config_combos(__peer_cfg_prefix + "writing/", /* writing */ true);
90 
91  for (ComboMap::iterator i = __combos.begin(); i != __combos.end(); ++i) {
92  logger->log_debug(name(), "Combo: %s, %s (%s, R) -> %s (%s, W)", i->second.type.c_str(),
93  i->second.reader_id.c_str(), i->second.remote_writer ? "local" : "remote",
94  i->second.writer_id.c_str(), i->second.remote_writer ? "remote" : "local");
95  }
96 
97  __wsl_local = new SyncWriterInterfaceListener(this, logger, (__peer + "/local").c_str());
98  __wsl_remote = new SyncWriterInterfaceListener(this, logger, (__peer + "/remote").c_str());
99 
100  if (! check_connection()) {
101  logger->log_warn(name(), "Remote peer not reachable, will keep trying");
102  }
103 
104  logger->log_debug(name(), "Checking for remote aliveness every %u ms", check_interval);
105  __timewait = new TimeWait(clock, check_interval * 1000);
106 }
107 
108 
109 void
111 {
112 
113  delete __timewait;
114 
115  close_interfaces();
116 
117  delete __wsl_local;
118  delete __wsl_remote;
119  delete __remote_bb;
120  __remote_bb = NULL;
121 }
122 
123 
124 void
126 {
127  __timewait->mark_start();
128  check_connection();
129  __timewait->wait_systime();
130 }
131 
132 
133 bool
134 BlackBoardSynchronizationThread::check_connection()
135 {
136  if (! __remote_bb || ! __remote_bb->is_alive()) {
137  if (__remote_bb) {
138  logger->log_warn(name(), "Lost connection via remote BB to %s (%s:%u), will try to re-establish",
139  __peer.c_str(), __host.c_str(), __port);
140  blackboard->unregister_listener(__wsl_local);
141  __remote_bb->unregister_listener(__wsl_remote);
142  close_interfaces();
143  delete __remote_bb;
144  __remote_bb = NULL;
145  }
146 
147  try {
148  __remote_bb = new RemoteBlackBoard(__host.c_str(), __port);
149  logger->log_info(name(), "Successfully connected via remote BB to %s (%s:%u)",
150  __peer.c_str(), __host.c_str(), __port);
151 
152  open_interfaces();
153  blackboard->register_listener(__wsl_local, BlackBoard::BBIL_FLAG_WRITER);
154  __remote_bb->register_listener(__wsl_remote, BlackBoard::BBIL_FLAG_WRITER);
155  } catch (Exception &e) {
156  e.print_trace();
157  return false;
158  }
159  }
160  return true;
161 }
162 
163 void
164 BlackBoardSynchronizationThread::read_config_combos(std::string prefix, bool writing)
165 {
166  Configuration::ValueIterator *i = config->search(prefix.c_str());
167  while (i->next()) {
168  if (strcmp(i->type(), "string") != 0) {
169  TypeMismatchException e("Only values of type string may occur in %s, "
170  "but found value of type %s",
171  prefix.c_str(), i->type());
172  delete i;
173  throw e;
174  }
175 
176  std::string varname = std::string(i->path()).substr(prefix.length());
177  std::string uid = i->get_string();
178  size_t sf;
179 
180  if ((sf = uid.find("::")) == std::string::npos) {
181  delete i;
182  throw Exception("Interface UID '%s' at %s is not valid, missing double colon",
183  uid.c_str(), i->path());
184  }
185 
186  std::string type = uid.substr(0, sf);
187  std::string id = uid.substr(sf + 2);
188  combo_t combo = { type, id, id, writing };
189 
190  if ( (sf = id.find("=")) != std::string::npos) {
191  // we got a mapping
192  combo.reader_id = id.substr(0, sf);
193  combo.writer_id = id.substr(sf + 1);
194  }
195 
196  __combos[varname] = combo;
197  }
198  delete i;
199 }
200 
201 
202 void
203 BlackBoardSynchronizationThread::open_interfaces()
204 {
205  logger->log_debug(name(), "Opening interfaces");
206  MutexLocker lock(__interfaces.mutex());
207 
208  ComboMap::iterator i;
209  for (i = __combos.begin(); i != __combos.end(); ++i) {
210  Interface *iface_reader = NULL, *iface_writer = NULL;
211 
212  BlackBoard *writer_bb = i->second.remote_writer ? __remote_bb : blackboard;
213  BlackBoard *reader_bb = i->second.remote_writer ? blackboard : __remote_bb;
214 
215  try {
216  logger->log_debug(name(), "Opening reading %s (%s:%s)",
217  i->second.remote_writer ? "locally" : "remotely",
218  i->second.type.c_str(), i->second.reader_id.c_str());
219  iface_reader = reader_bb->open_for_reading(i->second.type.c_str(),
220  i->second.reader_id.c_str());
221 
222  if (iface_reader->has_writer()) {
223  logger->log_debug(name(), "Opening writing on %s (%s:%s)",
224  i->second.remote_writer ? "remotely" : "locally",
225  i->second.type.c_str(), i->second.writer_id.c_str());
226  iface_writer = writer_bb->open_for_writing(i->second.type.c_str(),
227  i->second.writer_id.c_str());
228  }
229 
230  InterfaceInfo ii(&i->second, iface_writer, reader_bb, writer_bb);
231  __interfaces[iface_reader] = ii;
232 
233  } catch (Exception &e) {
234  reader_bb->close(iface_reader);
235  writer_bb->close(iface_writer);
236  throw;
237  }
238 
239  SyncInterfaceListener *sync_listener = NULL;
240  if (iface_writer) {
241  logger->log_debug(name(), "Creating sync listener");
242  sync_listener = new SyncInterfaceListener(logger, iface_reader, iface_writer,
243  reader_bb, writer_bb);
244  }
245  __sync_listeners[iface_reader] = sync_listener;
246 
247  if (i->second.remote_writer) {
248  __wsl_local->add_interface(iface_reader);
249  } else {
250  __wsl_remote->add_interface(iface_reader);
251  }
252  }
253 }
254 
255 
256 void
257 BlackBoardSynchronizationThread::close_interfaces()
258 {
259  SyncListenerMap::iterator s;
260  for (s = __sync_listeners.begin(); s != __sync_listeners.end(); ++s) {
261  if (s->second) {
262  logger->log_debug(name(), "Closing sync listener %s", s->second->bbil_name());
263  delete s->second;
264  }
265  }
266  MutexLocker lock(__interfaces.mutex());
267  InterfaceMap::iterator i;
268  for (i = __interfaces.begin(); i != __interfaces.end(); ++i) {
269  logger->log_debug(name(), "Closing %s reading interface %s",
270  i->second.combo->remote_writer ? "local" : "remote",
271  i->first->uid());
272  if (i->second.combo->remote_writer) {
273  __wsl_local->remove_interface(i->first);
274  blackboard->close(i->first);
275  } else {
276  __wsl_remote->remove_interface(i->first);
277  __remote_bb->close(i->first);
278  }
279  if (i->second.writer) {
280  logger->log_debug(name(), "Closing %s writing interface %s",
281  i->second.combo->remote_writer ? "remote" : "local",
282  i->second.writer->uid());
283  if (i->second.combo->remote_writer) {
284  __remote_bb->close(i->second.writer);
285  } else {
286  blackboard->close(i->second.writer);
287  }
288  }
289  }
290  __interfaces.clear();
291  __sync_listeners.clear();
292 }
293 
294 
295 /** A writer has been added for an interface.
296  * To be called only by SyncWriterInterfaceListener.
297  * @param interface the interface a writer has been added for.
298  */
299 void
301 {
302  MutexLocker lock(__interfaces.mutex());
303 
304  if (__interfaces[interface].writer) {
305  // There exists a writer!?
306  logger->log_warn(name(), "Writer added for %s, but relay exists already. Bug?", interface->uid());
307  } else {
308  logger->log_warn(name(), "Writer added for %s, opening relay writer", interface->uid());
309 
310  Interface *iface = NULL;
311  SyncInterfaceListener *sync_listener = NULL;
312  InterfaceInfo &ii = __interfaces[interface];
313  try {
314  iface = ii.writer_bb->open_for_writing(ii.combo->type.c_str(),
315  ii.combo->writer_id.c_str());
316 
317  logger->log_debug(name(), "Creating sync listener for %s:%s-%s",
318  ii.combo->type.c_str(), ii.combo->reader_id.c_str(),
319  ii.combo->writer_id.c_str());
320 
321  sync_listener = new SyncInterfaceListener(logger, interface, iface,
322  ii.reader_bb, ii.writer_bb);
323 
324  __sync_listeners[interface] = sync_listener;
325  ii.writer = iface;
326 
327  } catch (Exception &e) {
328  delete sync_listener;
329  ii.writer_bb->close(iface);
330  logger->log_error(name(), "Failed to open writer for %s:%s-%s, sync broken",
331  ii.combo->type.c_str(), ii.combo->reader_id.c_str(),
332  ii.combo->writer_id.c_str());
333  logger->log_error(name(), e);
334  }
335  }
336 }
337 
338 
339 /** A writer has been removed for an interface.
340  * To be called only by SyncWriterInterfaceListener.
341  * @param interface the interface a writer has been removed for.
342  */
343 void
345 {
346  MutexLocker lock(__interfaces.mutex());
347 
348  if (! __interfaces[interface].writer) {
349  // We do not have a writer!?
350  logger->log_warn(name(), "Writer removed for %s, but no relay exists. Bug?", interface->uid());
351  } else {
352  logger->log_warn(name(), "Writer removed for %s, closing relay writer", interface->uid());
353 
354  InterfaceInfo &ii = __interfaces[interface];
355  try {
356  delete __sync_listeners[interface];
357  __sync_listeners[interface] = NULL;
358 
359  ii.writer_bb->close(ii.writer);
360  ii.writer = NULL;
361 
362  } catch (Exception &e) {
363  logger->log_error(name(), "Failed to close writer for %s:%s-%s, sync broken",
364  ii.combo->type.c_str(), ii.combo->reader_id.c_str(),
365  ii.combo->writer_id.c_str());
366  logger->log_error(name(), e);
367  }
368  }
369 }
virtual void finalize()
Finalize the thread.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
virtual ~BlackBoardSynchronizationThread()
Destructor.
Definition: sync_thread.cpp:62
virtual const char * type() const =0
Type of value.
Fawkes library namespace.
virtual void init()
Initialize the thread.
Definition: sync_thread.cpp:67
Mutex locking helper.
Definition: mutex_locker.h:33
STL namespace.
virtual ValueIterator * search(const char *path)=0
Iterator with search results.
virtual bool next()=0
Check if there is another element and advance to this if possible.
virtual void unregister_listener(BlackBoardInterfaceListener *listener)
Unregister BB interface listener.
Definition: blackboard.cpp:218
Thread class encapsulation of pthreads.
Definition: thread.h:42
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Definition: thread.cpp:727
Base class for all Fawkes BlackBoard interfaces.
Definition: interface.h:79
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:44
void wait_systime()
Wait until minimum loop time has been reached in real time.
Definition: wait.cpp:100
BlackBoardSynchronizationThread(std::string &bbsync_cfg_prefix, std::string &peer_cfg_prefix, std::string &peer)
Constructor.
Definition: sync_thread.cpp:45
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:45
virtual void register_listener(BlackBoardInterfaceListener *listener, ListenerRegisterFlag flag=BBIL_FLAG_ALL)
Register BB event listener.
Definition: blackboard.cpp:190
void set_name(const char *format,...)
Set name of thread.
Definition: thread.cpp:761
Base class for exceptions in Fawkes.
Definition: exception.h:36
void remove_interface(fawkes::Interface *interface)
Remove an interface to listen to.
virtual bool is_alive() const =0
Check if the BlackBoard is still alive.
bool has_writer() const
Check if there is a writer for the interface.
Definition: interface.cpp:834
virtual std::string get_string() const =0
Get string value.
const char * name() const
Get name of thread.
Definition: thread.h:95
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_error(const char *component, const char *format,...)=0
Log error message.
void add_interface(fawkes::Interface *interface)
Add an interface to listen to.
virtual const char * path() const =0
Path of value.
void print_trace()
Prints trace to stderr.
Definition: exception.cpp:619
void mark_start()
Mark start of loop.
Definition: wait.cpp:70
Listener for writer events in bbsync plugin.
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
Remote BlackBoard.
Definition: remote.h:48
Iterator interface to iterate over config values.
Definition: config.h:72
virtual Interface * open_for_reading(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for reading.
void writer_added(fawkes::Interface *interface)
A writer has been added for an interface.
void writer_removed(fawkes::Interface *interface)
A writer has been removed for an interface.
The BlackBoard abstract class.
Definition: blackboard.h:48
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
virtual void loop()
Code to execute in the thread.
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:44
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
Time wait utility.
Definition: wait.h:32
RefPtr< Mutex > mutex() const
Get access to the internal mutex.
Definition: lock_map.h:146
void append(const char *format,...)
Append messages to the message list.
Definition: exception.cpp:341
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
Synchronize two interfaces.
Definition: sync_listener.h:33
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
virtual void close(Interface *interface)=0
Close interface.