Fawkes API  Fawkes Development Version
remote.cpp
00001 
00002 /***************************************************************************
00003  *  remote.h - Remote BlackBoard access via Fawkes network protocol
00004  *
00005  *  Created: Mon Mar 03 10:53:00 2008
00006  *  Copyright  2006-2008  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009 
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version. A runtime exception applies to
00014  *  this software (see LICENSE.GPL_WRE file mentioned below for details).
00015  *
00016  *  This program is distributed in the hope that it will be useful,
00017  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  *  GNU Library General Public License for more details.
00020  *
00021  *  Read the full text in the LICENSE.GPL_WRE file in the doc directory.
00022  */
00023 
00024 #include <blackboard/remote.h>
00025 #include <blackboard/exceptions.h>
00026 #include <blackboard/net/messages.h>
00027 #include <blackboard/net/ilist_content.h>
00028 #include <blackboard/net/interface_proxy.h>
00029 #include <blackboard/internal/notifier.h>
00030 #include <blackboard/internal/instance_factory.h>
00031 
00032 #include <interface/interface_info.h>
00033 
00034 #include <core/threading/thread.h>
00035 #include <core/threading/mutex.h>
00036 #include <core/threading/mutex_locker.h>
00037 #include <core/threading/wait_condition.h>
00038 #include <netcomm/fawkes/client.h>
00039 
00040 #include <string>
00041 #include <cstring>
00042 #include <fnmatch.h>
00043 #include <arpa/inet.h>
00044 
00045 namespace fawkes {
00046 
00047 /** @class RemoteBlackBoard <blackboard/remote.h>
00048  * Remote BlackBoard.
00049  * This class implements the access to a remote BlackBoard using the Fawkes
00050  * network protocol.
00051  *
00052  * @author Tim Niemueller
00053  */
00054 
00055 /** Constructor.
00056  * @param client Fawkes network client to use.
00057  */
00058 RemoteBlackBoard::RemoteBlackBoard(FawkesNetworkClient *client)
00059 {
00060   __fnc = client;
00061   __fnc_owner = false;
00062 
00063   if ( ! __fnc->connected() ) {
00064     throw Exception("Cannot instantiate RemoteBlackBoard on unconnected client");
00065   }
00066 
00067   __fnc->register_handler(this, FAWKES_CID_BLACKBOARD);
00068 
00069   __mutex = new Mutex();
00070   __instance_factory = new BlackBoardInstanceFactory();
00071 
00072   __wait_mutex = new Mutex();
00073   __wait_cond  = new WaitCondition(__wait_mutex);
00074 
00075   __inbound_thread = NULL;
00076   __m = NULL;
00077 }
00078 
00079 
00080 /** Constructor.
00081  * This will internall create a fawkes network client that is used to communicate
00082  * with the remote BlackBoard.
00083  * @param hostname hostname to connect to
00084  * @param port port to connect to
00085  */
00086 RemoteBlackBoard::RemoteBlackBoard(const char *hostname, unsigned short int port)
00087 {
00088   __fnc = new FawkesNetworkClient(hostname, port);
00089   try {
00090     __fnc->connect();
00091   } catch (Exception &e) {
00092     delete __fnc;
00093     throw;
00094   }
00095 
00096   __fnc_owner = true;
00097 
00098   if ( ! __fnc->connected() ) {
00099     throw Exception("Cannot instantiate RemoteBlackBoard on unconnected client");
00100   }
00101 
00102   __fnc->register_handler(this, FAWKES_CID_BLACKBOARD);
00103 
00104   __mutex = new Mutex();
00105   __instance_factory = new BlackBoardInstanceFactory();
00106 
00107   __wait_mutex = new Mutex();
00108   __wait_cond  = new WaitCondition(__wait_mutex);
00109 
00110   __inbound_thread = NULL;
00111   __m = NULL;
00112 }
00113 
00114 
00115 /** Destructor. */
00116 RemoteBlackBoard::~RemoteBlackBoard()
00117 {
00118   __fnc->deregister_handler(FAWKES_CID_BLACKBOARD);
00119   delete __mutex;
00120   delete __instance_factory;
00121 
00122   for ( __pit = __proxies.begin(); __pit != __proxies.end(); ++__pit) {
00123     delete __pit->second;
00124   }
00125 
00126   if (__fnc_owner) {
00127     __fnc->disconnect();
00128     delete __fnc;
00129   }
00130 
00131   delete __wait_cond;
00132   delete __wait_mutex;
00133 }
00134 
00135 
00136 bool
00137 RemoteBlackBoard::is_alive() const throw()
00138 {
00139   return __fnc->connected();
00140 }
00141 
00142 
00143 void
00144 RemoteBlackBoard::reopen_interfaces()
00145 {
00146   __proxies.lock();
00147   __ipit = __invalid_proxies.begin();
00148   while ( __ipit != __invalid_proxies.end() ) {
00149     try {
00150       Interface *iface = (*__ipit)->interface();
00151       open_interface(iface->type(), iface->id(), iface->is_writer(), iface);
00152       iface->set_validity(true);
00153       __ipit = __invalid_proxies.erase(__ipit);
00154     } catch (Exception &e) {
00155           // we failed to re-establish validity for the given interface, bad luck
00156       ++__ipit;
00157     }
00158   }
00159   __proxies.unlock();
00160 }
00161 
00162 bool
00163 RemoteBlackBoard::try_aliveness_restore() throw()
00164 {
00165   bool rv = true;
00166   try {
00167     if ( ! __fnc->connected() ) {
00168       __fnc->connect();
00169 
00170       reopen_interfaces();
00171     }
00172   } catch (...) {
00173     rv = false;
00174   }
00175   return rv;
00176 }
00177 
00178 
00179 void
00180 RemoteBlackBoard::open_interface(const char *type, const char *identifier,
00181                                  bool writer, Interface *iface)
00182 {
00183   if ( ! __fnc->connected() ) {
00184     throw Exception("Cannot instantiate remote interface, connection is dead");
00185   }
00186 
00187   __mutex->lock();
00188   if (__inbound_thread != NULL &&
00189       Thread::current_thread() &&
00190       strcmp(Thread::current_thread()->name(), __inbound_thread) == 0)
00191   {
00192     throw Exception("Cannot call open_interface() from inbound handler");
00193   }
00194   __mutex->unlock();
00195 
00196   bb_iopen_msg_t *om = (bb_iopen_msg_t *)calloc(1, sizeof(bb_iopen_msg_t));
00197   strncpy(om->type, type, __INTERFACE_TYPE_SIZE);
00198   strncpy(om->id, identifier, __INTERFACE_ID_SIZE);
00199   memcpy(om->hash, iface->hash(), __INTERFACE_HASH_SIZE);
00200 
00201   FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD,
00202                                                         writer ? MSG_BB_OPEN_FOR_WRITING : MSG_BB_OPEN_FOR_READING,
00203                                                         om, sizeof(bb_iopen_msg_t));
00204 
00205   __wait_mutex->lock();
00206   __fnc->enqueue(omsg);
00207   while (is_alive() &&
00208          (! __m ||
00209            ((__m->msgid() != MSG_BB_OPEN_SUCCESS) &&
00210            (__m->msgid() != MSG_BB_OPEN_FAILURE))))
00211   {
00212     if ( __m ) {
00213       __m->unref();
00214       __m = NULL;
00215     }
00216     __wait_cond->wait();
00217   }
00218   __wait_mutex->unlock();
00219 
00220   if (!is_alive()) {
00221     throw Exception("Connection died while trying to open %s::%s",
00222                     type, identifier);
00223   }
00224 
00225   if ( __m->msgid() == MSG_BB_OPEN_SUCCESS ) {
00226     // We got the interface, create internal storage and prepare instance for return
00227     BlackBoardInterfaceProxy *proxy = new BlackBoardInterfaceProxy(__fnc, __m, __notifier,
00228                                                                    iface, writer);
00229     __proxies[proxy->serial()] = proxy;
00230   } else if ( __m->msgid() == MSG_BB_OPEN_FAILURE ) {
00231     bb_iopenfail_msg_t *fm = __m->msg<bb_iopenfail_msg_t>();
00232     unsigned int error = ntohl(fm->errno);
00233     __m->unref();
00234     __m = NULL;
00235     if ( error == BB_ERR_WRITER_EXISTS ) {
00236       throw BlackBoardWriterActiveException(identifier, type);
00237     } else if ( error == BB_ERR_HASH_MISMATCH ) {
00238       throw Exception("Hash mismatch for interface %s:%s", type, identifier);
00239     } else if ( error == BB_ERR_UNKNOWN_TYPE ) {
00240       throw Exception("Type %s unknown (%s::%s)", type, type, identifier);
00241     } else if ( error == BB_ERR_WRITER_EXISTS ) {
00242       throw BlackBoardWriterActiveException(identifier, type);
00243     } else {
00244       throw Exception("Could not open interface");
00245     }
00246   }
00247 
00248   __m->unref();
00249   __m = NULL;
00250 }
00251 
00252 Interface *
00253 RemoteBlackBoard::open_interface(const char *type, const char *identifier, bool writer)
00254 {
00255   if ( ! __fnc->connected() ) {
00256     throw Exception("Cannot instantiate remote interface, connection is dead");
00257   }
00258 
00259   Interface *iface = __instance_factory->new_interface_instance(type, identifier);
00260   try {
00261     open_interface(type, identifier, writer, iface);
00262   } catch (Exception &e) {
00263     __instance_factory->delete_interface_instance(iface);
00264     throw;
00265   }
00266 
00267   return iface;
00268 }
00269 
00270 
00271 Interface *
00272 RemoteBlackBoard::open_for_reading(const char *type, const char *identifier)
00273 {
00274   return open_interface(type, identifier, /* writer? */ false);
00275 }
00276 
00277 
00278 Interface *
00279 RemoteBlackBoard::open_for_writing(const char *type, const char *identifier)
00280 {
00281   return open_interface(type, identifier, /* writer? */ true);
00282 }
00283 
00284 
00285 std::list<Interface *>
00286 RemoteBlackBoard::open_multiple_for_reading(const char *type_pattern,
00287                                             const char *id_pattern)
00288 {
00289   std::list<Interface *> rv;
00290 
00291   InterfaceInfoList *infl = list_all();
00292   for (InterfaceInfoList::iterator i = infl->begin(); i != infl->end(); ++i) {
00293     // ensure 0-termination
00294     char type[__INTERFACE_TYPE_SIZE + 1];
00295     char id[__INTERFACE_ID_SIZE + 1];
00296     type[__INTERFACE_TYPE_SIZE] = 0;
00297     id[__INTERFACE_TYPE_SIZE] = 0;
00298     strncpy(type, i->type(), __INTERFACE_TYPE_SIZE);
00299     strncpy(id, i->id(), __INTERFACE_ID_SIZE);
00300 
00301     if ((fnmatch(type_pattern, type, 0) == FNM_NOMATCH) ||
00302         (fnmatch(id_pattern, id, 0) == FNM_NOMATCH) ) {
00303       // type or ID prefix does not match, go on
00304       continue;
00305     }
00306 
00307     try {
00308       Interface *iface = open_for_reading((*i).type(), (*i).id());
00309       rv.push_back(iface);
00310     } catch (Exception &e) {
00311       for (std::list<Interface *>::iterator j = rv.begin(); j != rv.end(); ++j) {
00312         close(*j);
00313       }
00314       throw;
00315     }
00316   }
00317 
00318   return rv;
00319 }
00320 
00321 
00322 /** Close interface.
00323  * @param interface interface to close
00324  */
00325 void
00326 RemoteBlackBoard::close(Interface *interface)
00327 {
00328   if ( interface == NULL )  return;
00329 
00330   unsigned int serial = interface->serial();
00331 
00332   if ( __proxies.find(serial) != __proxies.end() ) {
00333     delete __proxies[serial];
00334     __proxies.erase(serial);
00335   }
00336 
00337   if ( __fnc->connected() ) {
00338     // We cannot "officially" close it, if we are disconnected it cannot be used anyway
00339     bb_iserial_msg_t *sm = (bb_iserial_msg_t *)calloc(1, sizeof(bb_iserial_msg_t));
00340     sm->serial = htonl(interface->serial());
00341 
00342     FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD,
00343                                                           MSG_BB_CLOSE,
00344                                                           sm, sizeof(bb_iserial_msg_t));
00345     __fnc->enqueue(omsg);
00346   }
00347 
00348   __instance_factory->delete_interface_instance(interface);
00349 }
00350 
00351 
00352 InterfaceInfoList *
00353 RemoteBlackBoard::list_all()
00354 {
00355   __mutex->lock();
00356   if (__inbound_thread != NULL &&
00357       strcmp(Thread::current_thread()->name(), __inbound_thread) == 0)
00358   {
00359     throw Exception("Cannot call list_all() from inbound handler");
00360   }
00361   __mutex->unlock();
00362 
00363   InterfaceInfoList *infl = new InterfaceInfoList();
00364 
00365   FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD,
00366                                                         MSG_BB_LIST_ALL);
00367   __wait_mutex->lock();
00368   __fnc->enqueue(omsg);
00369   while (! __m ||
00370          (__m->msgid() != MSG_BB_INTERFACE_LIST)) {
00371     if ( __m ) {
00372       __m->unref();
00373       __m = NULL;
00374     }
00375     __wait_cond->wait();
00376   }
00377   __wait_mutex->unlock();
00378 
00379   BlackBoardInterfaceListContent *bbilc = __m->msgc<BlackBoardInterfaceListContent>();
00380   while ( bbilc->has_next() ) {
00381     size_t iisize;
00382     bb_iinfo_msg_t *ii = bbilc->next(&iisize);
00383     infl->append(ii->type, ii->id, ii->hash,  ii->serial,
00384                  ii->has_writer, ii->num_readers);
00385   }
00386 
00387   __m->unref();
00388   __m = NULL;
00389 
00390   return infl;
00391 }
00392 
00393 
00394 InterfaceInfoList *
00395 RemoteBlackBoard::list(const char *type_pattern, const char *id_pattern)
00396 {
00397   __mutex->lock();
00398   if (__inbound_thread != NULL &&
00399       strcmp(Thread::current_thread()->name(), __inbound_thread) == 0)
00400   {
00401     throw Exception("Cannot call list() from inbound handler");
00402   }
00403   __mutex->unlock();
00404 
00405   InterfaceInfoList *infl = new InterfaceInfoList();
00406 
00407   bb_ilistreq_msg_t *om =
00408     (bb_ilistreq_msg_t *)calloc(1, sizeof(bb_ilistreq_msg_t));
00409   strncpy(om->type_pattern, type_pattern, __INTERFACE_TYPE_SIZE);
00410   strncpy(om->id_pattern, id_pattern, __INTERFACE_ID_SIZE);
00411 
00412   FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD,
00413                                                         MSG_BB_LIST,
00414                                                         om,
00415                                                         sizeof(bb_ilistreq_msg_t));
00416 
00417   __wait_mutex->lock();
00418   __fnc->enqueue(omsg);
00419   while (! __m ||
00420          (__m->msgid() != MSG_BB_INTERFACE_LIST)) {
00421     if ( __m ) {
00422       __m->unref();
00423       __m = NULL;
00424     }
00425     __wait_cond->wait();
00426   }
00427   __wait_mutex->unlock();
00428 
00429   BlackBoardInterfaceListContent *bbilc =
00430     __m->msgc<BlackBoardInterfaceListContent>();
00431   while ( bbilc->has_next() ) {
00432     size_t iisize;
00433     bb_iinfo_msg_t *ii = bbilc->next(&iisize);
00434     infl->append(ii->type, ii->id, ii->hash,  ii->serial,
00435                  ii->has_writer, ii->num_readers);
00436   }
00437 
00438   __m->unref();
00439   __m = NULL;
00440 
00441   return infl;
00442 }
00443 
00444 
00445 /** We are no longer registered in Fawkes network client.
00446  * Ignored.
00447  * @param id the id of the calling client
00448  */
00449 void
00450 RemoteBlackBoard::deregistered(unsigned int id) throw()
00451 {
00452 }
00453 
00454 
00455 void
00456 RemoteBlackBoard::inbound_received(FawkesNetworkMessage *m,
00457                                    unsigned int id) throw()
00458 {
00459   __mutex->lock();
00460   __inbound_thread = Thread::current_thread()->name();
00461   __mutex->unlock();
00462 
00463   if ( m->cid() == FAWKES_CID_BLACKBOARD ) {
00464     unsigned int msgid = m->msgid();
00465     try {
00466       if ( msgid == MSG_BB_DATA_CHANGED ) {
00467         unsigned int serial = ntohl(((unsigned int *)m->payload())[0]);
00468         if ( __proxies.find(serial) != __proxies.end() ) {
00469           __proxies[serial]->process_data_changed(m);
00470         }
00471       } else if (msgid == MSG_BB_INTERFACE_MESSAGE) {
00472         unsigned int serial = ntohl(((unsigned int *)m->payload())[0]);
00473         if ( __proxies.find(serial) != __proxies.end() ) {
00474           __proxies[serial]->process_interface_message(m);
00475         }
00476       } else if (msgid == MSG_BB_READER_ADDED) {
00477         bb_ieventserial_msg_t *esm = m->msg<bb_ieventserial_msg_t>();
00478         if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) {
00479           __proxies[ntohl(esm->serial)]->reader_added(ntohl(esm->event_serial));
00480         }
00481       } else if (msgid == MSG_BB_READER_REMOVED) {
00482         bb_ieventserial_msg_t *esm = m->msg<bb_ieventserial_msg_t>();
00483         if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) {
00484           __proxies[ntohl(esm->serial)]->reader_removed(ntohl(esm->event_serial));
00485         }
00486       } else if (msgid == MSG_BB_WRITER_ADDED) {
00487         bb_ieventserial_msg_t *esm = m->msg<bb_ieventserial_msg_t>();
00488         if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) {
00489           __proxies[ntohl(esm->serial)]->writer_added(ntohl(esm->event_serial));
00490         }
00491       } else if (msgid == MSG_BB_WRITER_REMOVED) {
00492         bb_ieventserial_msg_t *esm = m->msg<bb_ieventserial_msg_t>();
00493         if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) {
00494           __proxies[ntohl(esm->serial)]->writer_removed(ntohl(esm->event_serial));
00495         }
00496       } else if (msgid == MSG_BB_INTERFACE_CREATED) {
00497         bb_ievent_msg_t *em = m->msg<bb_ievent_msg_t>();
00498         __notifier->notify_of_interface_created(em->type, em->id);
00499       } else if (msgid == MSG_BB_INTERFACE_DESTROYED) {
00500         bb_ievent_msg_t *em = m->msg<bb_ievent_msg_t>();
00501         __notifier->notify_of_interface_destroyed(em->type, em->id);
00502       } else {
00503         __wait_mutex->lock();
00504         __m = m;
00505         __m->ref();
00506         __wait_cond->wake_all();
00507         __wait_mutex->unlock();
00508       }
00509     } catch (Exception &e) {
00510       // Bam, you're dead. Ok, not now, we just ignore that this shit happened...
00511     }
00512   }
00513 
00514   __mutex->lock();
00515   __inbound_thread = NULL;
00516   __mutex->unlock();
00517 }
00518 
00519 
00520 void
00521 RemoteBlackBoard::connection_died(unsigned int id) throw()
00522 {
00523   // mark all assigned interfaces as invalid
00524   __proxies.lock();
00525   for (__pit = __proxies.begin(); __pit != __proxies.end(); ++__pit) {
00526     __pit->second->interface()->set_validity(false);
00527     __invalid_proxies.push_back(__pit->second);
00528   }
00529   __proxies.clear();
00530   __proxies.unlock();
00531   __wait_cond->wake_all();
00532 }
00533 
00534 
00535 void
00536 RemoteBlackBoard::connection_established(unsigned int id) throw()
00537 {
00538 }
00539 
00540 } // end namespace fawkes