Fawkes API  Fawkes Development Version
sync_thread.cpp
00001 
00002 /***************************************************************************
00003  *  sync_thread.cpp - Fawkes BlackBoard Synchronization Thread
00004  *
00005  *  Created: Thu Jun 04 18:13:06 2009
00006  *  Copyright  2006-2009  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009 
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version.
00014  *
00015  *  This program is distributed in the hope that it will be useful,
00016  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018  *  GNU Library General Public License for more details.
00019  *
00020  *  Read the full text in the LICENSE.GPL file in the doc directory.
00021  */
00022 
00023 #include "sync_thread.h"
00024 
00025 #include <blackboard/remote.h>
00026 #include <core/threading/mutex_locker.h>
00027 #include <utils/time/wait.h>
00028 
00029 #include <cstring>
00030 
00031 using namespace std;
00032 using namespace fawkes;
00033 
00034 /** @class BlackBoardSynchronizationThread "sync_thread.h"
00035  * Thread to synchronize two BlackBoards.
00036  * @author Tim Niemueller
00037  */
00038 
00039 /** Constructor.
00040  * @param bbsync_cfg_prefix Configuration prefix for the whole bbsync plugin
00041  * @param peer_cfg_prefix The configuration prefix for the peer this sync thread
00042  * has been created for.
00043  * @param peer name of the peer configuration for this thread
00044  */
00045 BlackBoardSynchronizationThread::BlackBoardSynchronizationThread(std::string &bbsync_cfg_prefix,
00046                                                                  std::string &peer_cfg_prefix,
00047                                                                  std::string &peer)
00048   : Thread("", Thread::OPMODE_CONTINUOUS)
00049 {
00050   set_name("BBSyncThread[%s]", peer.c_str());
00051   set_prepfin_conc_loop(true);
00052 
00053   __bbsync_cfg_prefix = bbsync_cfg_prefix;
00054   __peer_cfg_prefix   = peer_cfg_prefix;
00055   __peer              = peer;
00056 
00057   __remote_bb = NULL;
00058 }
00059 
00060 
00061 /** Destructor. */
00062 BlackBoardSynchronizationThread::~BlackBoardSynchronizationThread()
00063 {
00064 }
00065 
00066 void
00067 BlackBoardSynchronizationThread::init()
00068 {
00069   logger->log_debug(name(), "Initializing");
00070   unsigned int check_interval = 0;
00071   try {
00072     __host = config->get_string((__peer_cfg_prefix + "host").c_str());
00073     __port = config->get_uint((__peer_cfg_prefix + "port").c_str());
00074 
00075     check_interval = config->get_uint((__bbsync_cfg_prefix + "check_interval").c_str());
00076   } catch (Exception &e) {
00077     e.append("Host or port not specified for peer");
00078     throw;
00079   }
00080 
00081   try {
00082     check_interval = config->get_uint((__peer_cfg_prefix + "check_interval").c_str());
00083     logger->log_debug(name(), "Peer check interval set, overriding default.");
00084   } catch (Exception &e) {
00085     logger->log_debug(name(), "No per-peer check interval set, using default");
00086   }
00087 
00088   read_config_combos(__peer_cfg_prefix + "reading/", /* writing */ false);
00089   read_config_combos(__peer_cfg_prefix + "writing/", /* writing */ true);
00090 
00091   for (ComboMap::iterator i = __combos.begin(); i != __combos.end(); ++i) {
00092     logger->log_debug(name(), "Combo: %s, %s (%s, R) -> %s (%s, W)", i->second.type.c_str(),
00093                       i->second.reader_id.c_str(), i->second.remote_writer ? "local" : "remote",
00094                       i->second.writer_id.c_str(), i->second.remote_writer ? "remote" : "local");
00095   }
00096 
00097   __wsl_local  = new SyncWriterInterfaceListener(this, logger, (__peer + "/local").c_str());
00098   __wsl_remote = new SyncWriterInterfaceListener(this, logger, (__peer + "/remote").c_str());
00099 
00100   if (! check_connection()) {
00101     logger->log_warn(name(), "Remote peer not reachable, will keep trying");
00102   }
00103 
00104   logger->log_debug(name(), "Checking for remote aliveness every %u ms", check_interval);
00105   __timewait = new TimeWait(clock, check_interval * 1000);
00106 }
00107 
00108 
00109 void
00110 BlackBoardSynchronizationThread::finalize()
00111 {
00112 
00113   delete __timewait;
00114 
00115   close_interfaces();
00116 
00117   delete __wsl_local;
00118   delete __wsl_remote;
00119   delete __remote_bb;
00120   __remote_bb = NULL;
00121 }
00122 
00123 
00124 void
00125 BlackBoardSynchronizationThread::loop()
00126 {
00127   __timewait->mark_start();
00128   check_connection();
00129   __timewait->wait_systime();
00130 }
00131 
00132 
00133 bool
00134 BlackBoardSynchronizationThread::check_connection()
00135 {
00136   if (! __remote_bb || ! __remote_bb->is_alive()) {
00137     if (__remote_bb) {
00138       logger->log_warn(name(), "Lost connection via remote BB to %s (%s:%u), will try to re-establish",
00139                        __peer.c_str(), __host.c_str(), __port);
00140       blackboard->unregister_listener(__wsl_local);
00141       __remote_bb->unregister_listener(__wsl_remote);
00142       close_interfaces();
00143       delete __remote_bb;
00144       __remote_bb = NULL;
00145     }
00146 
00147     try {
00148       __remote_bb = new RemoteBlackBoard(__host.c_str(), __port);
00149       logger->log_info(name(), "Successfully connected via remote BB to %s (%s:%u)",
00150                        __peer.c_str(), __host.c_str(), __port);
00151 
00152       open_interfaces();
00153       blackboard->register_listener(__wsl_local, BlackBoard::BBIL_FLAG_WRITER);
00154       __remote_bb->register_listener(__wsl_remote, BlackBoard::BBIL_FLAG_WRITER);
00155     } catch (Exception &e) {
00156       e.print_trace();
00157       return false;
00158     }
00159   }
00160   return true;
00161 }
00162 
00163 void
00164 BlackBoardSynchronizationThread::read_config_combos(std::string prefix, bool writing)
00165 {
00166   Configuration::ValueIterator *i = config->search(prefix.c_str());
00167   while (i->next()) {
00168     if (strcmp(i->type(), "string") != 0) {
00169       TypeMismatchException e("Only values of type string may occur in %s, "
00170                               "but found value of type %s",
00171                               prefix.c_str(), i->type());
00172       delete i;
00173       throw e;
00174     }
00175 
00176     std::string varname = std::string(i->path()).substr(prefix.length());
00177     std::string uid     = i->get_string();
00178     size_t sf;
00179 
00180     if ((sf = uid.find("::")) == std::string::npos) {
00181       delete i;
00182       throw Exception("Interface UID '%s' at %s is not valid, missing double colon",
00183                       uid.c_str(), i->path());
00184     }
00185 
00186     std::string type = uid.substr(0, sf);
00187     std::string id = uid.substr(sf + 2);
00188     combo_t combo = {  type, id, id, writing };
00189 
00190     if ( (sf = id.find("=")) != std::string::npos) {
00191       // we got a mapping
00192       combo.reader_id = id.substr(0, sf);
00193       combo.writer_id = id.substr(sf + 1);
00194     }
00195 
00196     __combos[varname] = combo;
00197   }
00198   delete i;
00199 }
00200 
00201 
00202 void
00203 BlackBoardSynchronizationThread::open_interfaces()
00204 {
00205   logger->log_debug(name(), "Opening interfaces");
00206   MutexLocker lock(__interfaces.mutex());
00207 
00208   ComboMap::iterator i;
00209   for (i = __combos.begin(); i != __combos.end(); ++i) {
00210     Interface *iface_reader = NULL, *iface_writer = NULL;
00211 
00212     BlackBoard *writer_bb = i->second.remote_writer ? __remote_bb : blackboard;
00213     BlackBoard *reader_bb = i->second.remote_writer ? blackboard  : __remote_bb;
00214 
00215     try {
00216       logger->log_debug(name(), "Opening reading %s (%s:%s)",
00217                         i->second.remote_writer ? "locally" : "remotely",
00218                         i->second.type.c_str(), i->second.reader_id.c_str());
00219       iface_reader = reader_bb->open_for_reading(i->second.type.c_str(),
00220                                                  i->second.reader_id.c_str());
00221 
00222       if (iface_reader->has_writer()) {
00223         logger->log_debug(name(), "Opening writing on %s (%s:%s)",
00224                           i->second.remote_writer ? "remotely" : "locally",
00225                           i->second.type.c_str(), i->second.writer_id.c_str());
00226         iface_writer = writer_bb->open_for_writing(i->second.type.c_str(),
00227                                                    i->second.writer_id.c_str());
00228       }
00229 
00230       InterfaceInfo ii(&i->second, iface_writer, reader_bb, writer_bb);
00231       __interfaces[iface_reader] = ii;
00232 
00233     } catch (Exception &e) {
00234       reader_bb->close(iface_reader);
00235       writer_bb->close(iface_writer);
00236       throw;
00237     }
00238 
00239     SyncInterfaceListener *sync_listener = NULL;
00240     if (iface_writer) {
00241       logger->log_debug(name(), "Creating sync listener");
00242       sync_listener = new SyncInterfaceListener(logger, iface_reader, iface_writer,
00243                                                 reader_bb, writer_bb);
00244     }
00245     __sync_listeners[iface_reader] = sync_listener;
00246 
00247     if (i->second.remote_writer) {
00248       __wsl_local->add_interface(iface_reader);
00249     } else {
00250       __wsl_remote->add_interface(iface_reader);
00251     }
00252   }
00253 }
00254 
00255 
00256 void
00257 BlackBoardSynchronizationThread::close_interfaces()
00258 {
00259   SyncListenerMap::iterator s;
00260   for (s = __sync_listeners.begin(); s != __sync_listeners.end(); ++s) {
00261     if (s->second) {
00262       logger->log_debug(name(), "Closing sync listener %s", s->second->bbil_name());
00263       delete s->second;
00264     }
00265   }
00266   MutexLocker lock(__interfaces.mutex());
00267   InterfaceMap::iterator i;
00268   for (i = __interfaces.begin(); i != __interfaces.end(); ++i) {
00269     logger->log_debug(name(), "Closing %s reading interface %s",
00270                       i->second.combo->remote_writer ? "local" : "remote",
00271                       i->first->uid());
00272     if (i->second.combo->remote_writer) {
00273       __wsl_local->remove_interface(i->first);
00274       blackboard->close(i->first);
00275     } else {
00276       __wsl_remote->remove_interface(i->first);
00277       __remote_bb->close(i->first);
00278     }
00279     if (i->second.writer) {
00280       logger->log_debug(name(), "Closing %s writing interface %s",
00281                         i->second.combo->remote_writer ? "remote" : "local",
00282                         i->second.writer->uid());
00283       if (i->second.combo->remote_writer) {
00284         __remote_bb->close(i->second.writer);
00285       } else {
00286         blackboard->close(i->second.writer);
00287       }
00288     }
00289   }
00290   __interfaces.clear();
00291   __sync_listeners.clear();
00292 }
00293 
00294 
00295 /** A writer has been added for an interface.
00296  * To be called only by SyncWriterInterfaceListener.
00297  * @param interface the interface a writer has been added for.
00298  */
00299 void
00300 BlackBoardSynchronizationThread::writer_added(fawkes::Interface *interface) throw()
00301 {
00302   MutexLocker lock(__interfaces.mutex());
00303 
00304   if (__interfaces[interface].writer) {
00305     // There exists a writer!?
00306     logger->log_warn(name(), "Writer added for %s, but relay exists already. Bug?", interface->uid());
00307   } else {
00308     logger->log_warn(name(), "Writer added for %s, opening relay writer", interface->uid());
00309 
00310     Interface *iface = NULL;
00311     SyncInterfaceListener *sync_listener = NULL;
00312     InterfaceInfo &ii = __interfaces[interface];
00313     try {
00314       iface = ii.writer_bb->open_for_writing(ii.combo->type.c_str(),
00315                                              ii.combo->writer_id.c_str());
00316       
00317       logger->log_debug(name(), "Creating sync listener for %s:%s-%s",
00318                         ii.combo->type.c_str(), ii.combo->reader_id.c_str(),
00319                         ii.combo->writer_id.c_str());
00320 
00321       sync_listener = new SyncInterfaceListener(logger, interface, iface,
00322                                                 ii.reader_bb, ii.writer_bb);
00323 
00324       __sync_listeners[interface] = sync_listener;
00325       ii.writer = iface;
00326 
00327     } catch (Exception &e) {
00328       delete sync_listener;
00329       ii.writer_bb->close(iface);
00330       logger->log_error(name(), "Failed to open writer for %s:%s-%s, sync broken",
00331                         ii.combo->type.c_str(), ii.combo->reader_id.c_str(),
00332                         ii.combo->writer_id.c_str());
00333       logger->log_error(name(), e);
00334     }
00335   }
00336 }
00337 
00338 
00339 /** A writer has been removed for an interface.
00340  * To be called only by SyncWriterInterfaceListener.
00341  * @param interface the interface a writer has been removed for.
00342  */
00343 void
00344 BlackBoardSynchronizationThread::writer_removed(fawkes::Interface *interface) throw()
00345 {
00346   MutexLocker lock(__interfaces.mutex());
00347 
00348   if (! __interfaces[interface].writer) {
00349     // We do not have a writer!?
00350     logger->log_warn(name(), "Writer removed for %s, but no relay exists. Bug?", interface->uid());
00351   } else {
00352     logger->log_warn(name(), "Writer removed for %s, closing relay writer", interface->uid());
00353 
00354     InterfaceInfo &ii = __interfaces[interface];
00355     try {
00356       delete __sync_listeners[interface];
00357       __sync_listeners[interface] = NULL;
00358 
00359       ii.writer_bb->close(ii.writer);
00360       ii.writer = NULL;
00361 
00362     } catch (Exception &e) {
00363       logger->log_error(name(), "Failed to close writer for %s:%s-%s, sync broken",
00364                         ii.combo->type.c_str(), ii.combo->reader_id.c_str(),
00365                         ii.combo->writer_id.c_str());
00366       logger->log_error(name(), e);
00367     }
00368   }
00369 }