Fawkes API
Fawkes Development Version
|
00001 00002 /*************************************************************************** 00003 * remote.h - Remote BlackBoard access via Fawkes network protocol 00004 * 00005 * Created: Mon Mar 03 10:53:00 2008 00006 * Copyright 2006-2008 Tim Niemueller [www.niemueller.de] 00007 * 00008 ****************************************************************************/ 00009 00010 /* This program is free software; you can redistribute it and/or modify 00011 * it under the terms of the GNU General Public License as published by 00012 * the Free Software Foundation; either version 2 of the License, or 00013 * (at your option) any later version. A runtime exception applies to 00014 * this software (see LICENSE.GPL_WRE file mentioned below for details). 00015 * 00016 * This program is distributed in the hope that it will be useful, 00017 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00019 * GNU Library General Public License for more details. 00020 * 00021 * Read the full text in the LICENSE.GPL_WRE file in the doc directory. 00022 */ 00023 00024 #include <blackboard/remote.h> 00025 #include <blackboard/exceptions.h> 00026 #include <blackboard/net/messages.h> 00027 #include <blackboard/net/ilist_content.h> 00028 #include <blackboard/net/interface_proxy.h> 00029 #include <blackboard/internal/notifier.h> 00030 #include <blackboard/internal/instance_factory.h> 00031 00032 #include <interface/interface_info.h> 00033 00034 #include <core/threading/thread.h> 00035 #include <core/threading/mutex.h> 00036 #include <core/threading/mutex_locker.h> 00037 #include <core/threading/wait_condition.h> 00038 #include <netcomm/fawkes/client.h> 00039 00040 #include <string> 00041 #include <cstring> 00042 #include <fnmatch.h> 00043 #include <arpa/inet.h> 00044 00045 namespace fawkes { 00046 00047 /** @class RemoteBlackBoard <blackboard/remote.h> 00048 * Remote BlackBoard. 00049 * This class implements the access to a remote BlackBoard using the Fawkes 00050 * network protocol. 00051 * 00052 * @author Tim Niemueller 00053 */ 00054 00055 /** Constructor. 00056 * @param client Fawkes network client to use. 00057 */ 00058 RemoteBlackBoard::RemoteBlackBoard(FawkesNetworkClient *client) 00059 { 00060 __fnc = client; 00061 __fnc_owner = false; 00062 00063 if ( ! __fnc->connected() ) { 00064 throw Exception("Cannot instantiate RemoteBlackBoard on unconnected client"); 00065 } 00066 00067 __fnc->register_handler(this, FAWKES_CID_BLACKBOARD); 00068 00069 __mutex = new Mutex(); 00070 __instance_factory = new BlackBoardInstanceFactory(); 00071 00072 __wait_mutex = new Mutex(); 00073 __wait_cond = new WaitCondition(__wait_mutex); 00074 00075 __inbound_thread = NULL; 00076 __m = NULL; 00077 } 00078 00079 00080 /** Constructor. 00081 * This will internall create a fawkes network client that is used to communicate 00082 * with the remote BlackBoard. 00083 * @param hostname hostname to connect to 00084 * @param port port to connect to 00085 */ 00086 RemoteBlackBoard::RemoteBlackBoard(const char *hostname, unsigned short int port) 00087 { 00088 __fnc = new FawkesNetworkClient(hostname, port); 00089 try { 00090 __fnc->connect(); 00091 } catch (Exception &e) { 00092 delete __fnc; 00093 throw; 00094 } 00095 00096 __fnc_owner = true; 00097 00098 if ( ! __fnc->connected() ) { 00099 throw Exception("Cannot instantiate RemoteBlackBoard on unconnected client"); 00100 } 00101 00102 __fnc->register_handler(this, FAWKES_CID_BLACKBOARD); 00103 00104 __mutex = new Mutex(); 00105 __instance_factory = new BlackBoardInstanceFactory(); 00106 00107 __wait_mutex = new Mutex(); 00108 __wait_cond = new WaitCondition(__wait_mutex); 00109 00110 __inbound_thread = NULL; 00111 __m = NULL; 00112 } 00113 00114 00115 /** Destructor. */ 00116 RemoteBlackBoard::~RemoteBlackBoard() 00117 { 00118 __fnc->deregister_handler(FAWKES_CID_BLACKBOARD); 00119 delete __mutex; 00120 delete __instance_factory; 00121 00122 for ( __pit = __proxies.begin(); __pit != __proxies.end(); ++__pit) { 00123 delete __pit->second; 00124 } 00125 00126 if (__fnc_owner) { 00127 __fnc->disconnect(); 00128 delete __fnc; 00129 } 00130 00131 delete __wait_cond; 00132 delete __wait_mutex; 00133 } 00134 00135 00136 bool 00137 RemoteBlackBoard::is_alive() const throw() 00138 { 00139 return __fnc->connected(); 00140 } 00141 00142 00143 void 00144 RemoteBlackBoard::reopen_interfaces() 00145 { 00146 __proxies.lock(); 00147 __ipit = __invalid_proxies.begin(); 00148 while ( __ipit != __invalid_proxies.end() ) { 00149 try { 00150 Interface *iface = (*__ipit)->interface(); 00151 open_interface(iface->type(), iface->id(), iface->is_writer(), iface); 00152 iface->set_validity(true); 00153 __ipit = __invalid_proxies.erase(__ipit); 00154 } catch (Exception &e) { 00155 // we failed to re-establish validity for the given interface, bad luck 00156 ++__ipit; 00157 } 00158 } 00159 __proxies.unlock(); 00160 } 00161 00162 bool 00163 RemoteBlackBoard::try_aliveness_restore() throw() 00164 { 00165 bool rv = true; 00166 try { 00167 if ( ! __fnc->connected() ) { 00168 __fnc->connect(); 00169 00170 reopen_interfaces(); 00171 } 00172 } catch (...) { 00173 rv = false; 00174 } 00175 return rv; 00176 } 00177 00178 00179 void 00180 RemoteBlackBoard::open_interface(const char *type, const char *identifier, 00181 bool writer, Interface *iface) 00182 { 00183 if ( ! __fnc->connected() ) { 00184 throw Exception("Cannot instantiate remote interface, connection is dead"); 00185 } 00186 00187 __mutex->lock(); 00188 if (__inbound_thread != NULL && 00189 Thread::current_thread() && 00190 strcmp(Thread::current_thread()->name(), __inbound_thread) == 0) 00191 { 00192 throw Exception("Cannot call open_interface() from inbound handler"); 00193 } 00194 __mutex->unlock(); 00195 00196 bb_iopen_msg_t *om = (bb_iopen_msg_t *)calloc(1, sizeof(bb_iopen_msg_t)); 00197 strncpy(om->type, type, __INTERFACE_TYPE_SIZE); 00198 strncpy(om->id, identifier, __INTERFACE_ID_SIZE); 00199 memcpy(om->hash, iface->hash(), __INTERFACE_HASH_SIZE); 00200 00201 FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD, 00202 writer ? MSG_BB_OPEN_FOR_WRITING : MSG_BB_OPEN_FOR_READING, 00203 om, sizeof(bb_iopen_msg_t)); 00204 00205 __wait_mutex->lock(); 00206 __fnc->enqueue(omsg); 00207 while (is_alive() && 00208 (! __m || 00209 ((__m->msgid() != MSG_BB_OPEN_SUCCESS) && 00210 (__m->msgid() != MSG_BB_OPEN_FAILURE)))) 00211 { 00212 if ( __m ) { 00213 __m->unref(); 00214 __m = NULL; 00215 } 00216 __wait_cond->wait(); 00217 } 00218 __wait_mutex->unlock(); 00219 00220 if (!is_alive()) { 00221 throw Exception("Connection died while trying to open %s::%s", 00222 type, identifier); 00223 } 00224 00225 if ( __m->msgid() == MSG_BB_OPEN_SUCCESS ) { 00226 // We got the interface, create internal storage and prepare instance for return 00227 BlackBoardInterfaceProxy *proxy = new BlackBoardInterfaceProxy(__fnc, __m, __notifier, 00228 iface, writer); 00229 __proxies[proxy->serial()] = proxy; 00230 } else if ( __m->msgid() == MSG_BB_OPEN_FAILURE ) { 00231 bb_iopenfail_msg_t *fm = __m->msg<bb_iopenfail_msg_t>(); 00232 unsigned int error = ntohl(fm->errno); 00233 __m->unref(); 00234 __m = NULL; 00235 if ( error == BB_ERR_WRITER_EXISTS ) { 00236 throw BlackBoardWriterActiveException(identifier, type); 00237 } else if ( error == BB_ERR_HASH_MISMATCH ) { 00238 throw Exception("Hash mismatch for interface %s:%s", type, identifier); 00239 } else if ( error == BB_ERR_UNKNOWN_TYPE ) { 00240 throw Exception("Type %s unknown (%s::%s)", type, type, identifier); 00241 } else if ( error == BB_ERR_WRITER_EXISTS ) { 00242 throw BlackBoardWriterActiveException(identifier, type); 00243 } else { 00244 throw Exception("Could not open interface"); 00245 } 00246 } 00247 00248 __m->unref(); 00249 __m = NULL; 00250 } 00251 00252 Interface * 00253 RemoteBlackBoard::open_interface(const char *type, const char *identifier, bool writer) 00254 { 00255 if ( ! __fnc->connected() ) { 00256 throw Exception("Cannot instantiate remote interface, connection is dead"); 00257 } 00258 00259 Interface *iface = __instance_factory->new_interface_instance(type, identifier); 00260 try { 00261 open_interface(type, identifier, writer, iface); 00262 } catch (Exception &e) { 00263 __instance_factory->delete_interface_instance(iface); 00264 throw; 00265 } 00266 00267 return iface; 00268 } 00269 00270 00271 Interface * 00272 RemoteBlackBoard::open_for_reading(const char *type, const char *identifier) 00273 { 00274 return open_interface(type, identifier, /* writer? */ false); 00275 } 00276 00277 00278 Interface * 00279 RemoteBlackBoard::open_for_writing(const char *type, const char *identifier) 00280 { 00281 return open_interface(type, identifier, /* writer? */ true); 00282 } 00283 00284 00285 std::list<Interface *> 00286 RemoteBlackBoard::open_multiple_for_reading(const char *type_pattern, 00287 const char *id_pattern) 00288 { 00289 std::list<Interface *> rv; 00290 00291 InterfaceInfoList *infl = list_all(); 00292 for (InterfaceInfoList::iterator i = infl->begin(); i != infl->end(); ++i) { 00293 // ensure 0-termination 00294 char type[__INTERFACE_TYPE_SIZE + 1]; 00295 char id[__INTERFACE_ID_SIZE + 1]; 00296 type[__INTERFACE_TYPE_SIZE] = 0; 00297 id[__INTERFACE_TYPE_SIZE] = 0; 00298 strncpy(type, i->type(), __INTERFACE_TYPE_SIZE); 00299 strncpy(id, i->id(), __INTERFACE_ID_SIZE); 00300 00301 if ((fnmatch(type_pattern, type, 0) == FNM_NOMATCH) || 00302 (fnmatch(id_pattern, id, 0) == FNM_NOMATCH) ) { 00303 // type or ID prefix does not match, go on 00304 continue; 00305 } 00306 00307 try { 00308 Interface *iface = open_for_reading((*i).type(), (*i).id()); 00309 rv.push_back(iface); 00310 } catch (Exception &e) { 00311 for (std::list<Interface *>::iterator j = rv.begin(); j != rv.end(); ++j) { 00312 close(*j); 00313 } 00314 throw; 00315 } 00316 } 00317 00318 return rv; 00319 } 00320 00321 00322 /** Close interface. 00323 * @param interface interface to close 00324 */ 00325 void 00326 RemoteBlackBoard::close(Interface *interface) 00327 { 00328 if ( interface == NULL ) return; 00329 00330 unsigned int serial = interface->serial(); 00331 00332 if ( __proxies.find(serial) != __proxies.end() ) { 00333 delete __proxies[serial]; 00334 __proxies.erase(serial); 00335 } 00336 00337 if ( __fnc->connected() ) { 00338 // We cannot "officially" close it, if we are disconnected it cannot be used anyway 00339 bb_iserial_msg_t *sm = (bb_iserial_msg_t *)calloc(1, sizeof(bb_iserial_msg_t)); 00340 sm->serial = htonl(interface->serial()); 00341 00342 FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD, 00343 MSG_BB_CLOSE, 00344 sm, sizeof(bb_iserial_msg_t)); 00345 __fnc->enqueue(omsg); 00346 } 00347 00348 __instance_factory->delete_interface_instance(interface); 00349 } 00350 00351 00352 InterfaceInfoList * 00353 RemoteBlackBoard::list_all() 00354 { 00355 __mutex->lock(); 00356 if (__inbound_thread != NULL && 00357 strcmp(Thread::current_thread()->name(), __inbound_thread) == 0) 00358 { 00359 throw Exception("Cannot call list_all() from inbound handler"); 00360 } 00361 __mutex->unlock(); 00362 00363 InterfaceInfoList *infl = new InterfaceInfoList(); 00364 00365 FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD, 00366 MSG_BB_LIST_ALL); 00367 __wait_mutex->lock(); 00368 __fnc->enqueue(omsg); 00369 while (! __m || 00370 (__m->msgid() != MSG_BB_INTERFACE_LIST)) { 00371 if ( __m ) { 00372 __m->unref(); 00373 __m = NULL; 00374 } 00375 __wait_cond->wait(); 00376 } 00377 __wait_mutex->unlock(); 00378 00379 BlackBoardInterfaceListContent *bbilc = __m->msgc<BlackBoardInterfaceListContent>(); 00380 while ( bbilc->has_next() ) { 00381 size_t iisize; 00382 bb_iinfo_msg_t *ii = bbilc->next(&iisize); 00383 infl->append(ii->type, ii->id, ii->hash, ii->serial, 00384 ii->has_writer, ii->num_readers); 00385 } 00386 00387 __m->unref(); 00388 __m = NULL; 00389 00390 return infl; 00391 } 00392 00393 00394 InterfaceInfoList * 00395 RemoteBlackBoard::list(const char *type_pattern, const char *id_pattern) 00396 { 00397 __mutex->lock(); 00398 if (__inbound_thread != NULL && 00399 strcmp(Thread::current_thread()->name(), __inbound_thread) == 0) 00400 { 00401 throw Exception("Cannot call list() from inbound handler"); 00402 } 00403 __mutex->unlock(); 00404 00405 InterfaceInfoList *infl = new InterfaceInfoList(); 00406 00407 bb_ilistreq_msg_t *om = 00408 (bb_ilistreq_msg_t *)calloc(1, sizeof(bb_ilistreq_msg_t)); 00409 strncpy(om->type_pattern, type_pattern, __INTERFACE_TYPE_SIZE); 00410 strncpy(om->id_pattern, id_pattern, __INTERFACE_ID_SIZE); 00411 00412 FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD, 00413 MSG_BB_LIST, 00414 om, 00415 sizeof(bb_ilistreq_msg_t)); 00416 00417 __wait_mutex->lock(); 00418 __fnc->enqueue(omsg); 00419 while (! __m || 00420 (__m->msgid() != MSG_BB_INTERFACE_LIST)) { 00421 if ( __m ) { 00422 __m->unref(); 00423 __m = NULL; 00424 } 00425 __wait_cond->wait(); 00426 } 00427 __wait_mutex->unlock(); 00428 00429 BlackBoardInterfaceListContent *bbilc = 00430 __m->msgc<BlackBoardInterfaceListContent>(); 00431 while ( bbilc->has_next() ) { 00432 size_t iisize; 00433 bb_iinfo_msg_t *ii = bbilc->next(&iisize); 00434 infl->append(ii->type, ii->id, ii->hash, ii->serial, 00435 ii->has_writer, ii->num_readers); 00436 } 00437 00438 __m->unref(); 00439 __m = NULL; 00440 00441 return infl; 00442 } 00443 00444 00445 /** We are no longer registered in Fawkes network client. 00446 * Ignored. 00447 * @param id the id of the calling client 00448 */ 00449 void 00450 RemoteBlackBoard::deregistered(unsigned int id) throw() 00451 { 00452 } 00453 00454 00455 void 00456 RemoteBlackBoard::inbound_received(FawkesNetworkMessage *m, 00457 unsigned int id) throw() 00458 { 00459 __mutex->lock(); 00460 __inbound_thread = Thread::current_thread()->name(); 00461 __mutex->unlock(); 00462 00463 if ( m->cid() == FAWKES_CID_BLACKBOARD ) { 00464 unsigned int msgid = m->msgid(); 00465 try { 00466 if ( msgid == MSG_BB_DATA_CHANGED ) { 00467 unsigned int serial = ntohl(((unsigned int *)m->payload())[0]); 00468 if ( __proxies.find(serial) != __proxies.end() ) { 00469 __proxies[serial]->process_data_changed(m); 00470 } 00471 } else if (msgid == MSG_BB_INTERFACE_MESSAGE) { 00472 unsigned int serial = ntohl(((unsigned int *)m->payload())[0]); 00473 if ( __proxies.find(serial) != __proxies.end() ) { 00474 __proxies[serial]->process_interface_message(m); 00475 } 00476 } else if (msgid == MSG_BB_READER_ADDED) { 00477 bb_ieventserial_msg_t *esm = m->msg<bb_ieventserial_msg_t>(); 00478 if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) { 00479 __proxies[ntohl(esm->serial)]->reader_added(ntohl(esm->event_serial)); 00480 } 00481 } else if (msgid == MSG_BB_READER_REMOVED) { 00482 bb_ieventserial_msg_t *esm = m->msg<bb_ieventserial_msg_t>(); 00483 if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) { 00484 __proxies[ntohl(esm->serial)]->reader_removed(ntohl(esm->event_serial)); 00485 } 00486 } else if (msgid == MSG_BB_WRITER_ADDED) { 00487 bb_ieventserial_msg_t *esm = m->msg<bb_ieventserial_msg_t>(); 00488 if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) { 00489 __proxies[ntohl(esm->serial)]->writer_added(ntohl(esm->event_serial)); 00490 } 00491 } else if (msgid == MSG_BB_WRITER_REMOVED) { 00492 bb_ieventserial_msg_t *esm = m->msg<bb_ieventserial_msg_t>(); 00493 if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) { 00494 __proxies[ntohl(esm->serial)]->writer_removed(ntohl(esm->event_serial)); 00495 } 00496 } else if (msgid == MSG_BB_INTERFACE_CREATED) { 00497 bb_ievent_msg_t *em = m->msg<bb_ievent_msg_t>(); 00498 __notifier->notify_of_interface_created(em->type, em->id); 00499 } else if (msgid == MSG_BB_INTERFACE_DESTROYED) { 00500 bb_ievent_msg_t *em = m->msg<bb_ievent_msg_t>(); 00501 __notifier->notify_of_interface_destroyed(em->type, em->id); 00502 } else { 00503 __wait_mutex->lock(); 00504 __m = m; 00505 __m->ref(); 00506 __wait_cond->wake_all(); 00507 __wait_mutex->unlock(); 00508 } 00509 } catch (Exception &e) { 00510 // Bam, you're dead. Ok, not now, we just ignore that this shit happened... 00511 } 00512 } 00513 00514 __mutex->lock(); 00515 __inbound_thread = NULL; 00516 __mutex->unlock(); 00517 } 00518 00519 00520 void 00521 RemoteBlackBoard::connection_died(unsigned int id) throw() 00522 { 00523 // mark all assigned interfaces as invalid 00524 __proxies.lock(); 00525 for (__pit = __proxies.begin(); __pit != __proxies.end(); ++__pit) { 00526 __pit->second->interface()->set_validity(false); 00527 __invalid_proxies.push_back(__pit->second); 00528 } 00529 __proxies.clear(); 00530 __proxies.unlock(); 00531 __wait_cond->wake_all(); 00532 } 00533 00534 00535 void 00536 RemoteBlackBoard::connection_established(unsigned int id) throw() 00537 { 00538 } 00539 00540 } // end namespace fawkes