Fawkes API
Fawkes Development Version
|
00001 00002 /*************************************************************************** 00003 * notifier.cpp - BlackBoard notifier 00004 * 00005 * Created: Mon Mar 03 23:28:18 2008 00006 * Copyright 2006-2008 Tim Niemueller [www.niemueller.de] 00007 * 00008 ****************************************************************************/ 00009 00010 /* This program is free software; you can redistribute it and/or modify 00011 * it under the terms of the GNU General Public License as published by 00012 * the Free Software Foundation; either version 2 of the License, or 00013 * (at your option) any later version. A runtime exception applies to 00014 * this software (see LICENSE.GPL_WRE file mentioned below for details). 00015 * 00016 * This program is distributed in the hope that it will be useful, 00017 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00019 * GNU Library General Public License for more details. 00020 * 00021 * Read the full text in the LICENSE.GPL_WRE file in the doc directory. 00022 */ 00023 00024 #include <blackboard/internal/notifier.h> 00025 #include <blackboard/blackboard.h> 00026 #include <blackboard/interface_listener.h> 00027 #include <blackboard/interface_observer.h> 00028 00029 #include <core/threading/mutex.h> 00030 #include <core/threading/mutex_locker.h> 00031 #include <core/utils/lock_hashset.h> 00032 #include <core/utils/lock_hashmap.h> 00033 #include <logging/liblogger.h> 00034 #include <interface/interface.h> 00035 00036 #include <algorithm> 00037 #include <functional> 00038 #include <cstdlib> 00039 #include <cstring> 00040 #include <fnmatch.h> 00041 00042 namespace fawkes { 00043 00044 /** @class BlackBoardNotifier <blackboard/internal/notifier.h> 00045 * BlackBoard notifier. 00046 * This class is used by the BlackBoard to notify listeners and observers 00047 * of changes. 00048 * 00049 * @author Tim Niemueller 00050 */ 00051 00052 00053 /** Constructor. */ 00054 BlackBoardNotifier::BlackBoardNotifier() 00055 { 00056 __bbil_writer_events = 0; 00057 __bbil_writer_mutex = new Mutex(); 00058 00059 __bbil_reader_events = 0; 00060 __bbil_reader_mutex = new Mutex(); 00061 00062 __bbil_data_events = 0; 00063 __bbil_data_mutex = new Mutex(); 00064 00065 __bbil_messages_events = 0; 00066 __bbil_messages_mutex = new Mutex(); 00067 00068 __bbio_events = 0; 00069 __bbio_mutex = new Mutex(); 00070 } 00071 00072 00073 /** Destructor */ 00074 BlackBoardNotifier::~BlackBoardNotifier() 00075 { 00076 delete __bbil_writer_mutex; 00077 delete __bbil_reader_mutex; 00078 delete __bbil_data_mutex; 00079 delete __bbil_messages_mutex; 00080 00081 delete __bbio_mutex; 00082 } 00083 00084 /** Register BB event listener. 00085 * @param listener BlackBoard event listener to register 00086 * @param flag concatenation of flags denoting which queue entries should be 00087 * processed 00088 */ 00089 void 00090 BlackBoardNotifier::register_listener(BlackBoardInterfaceListener *listener, 00091 BlackBoard::ListenerRegisterFlag flag) 00092 { 00093 update_listener(listener, flag); 00094 } 00095 00096 00097 /** Update BB event listener. 00098 * @param listener BlackBoard event listener to update subscriptions of 00099 * @param flag concatenation of flags denoting which queue entries should be 00100 * processed 00101 */ 00102 void 00103 BlackBoardNotifier::update_listener(BlackBoardInterfaceListener *listener, 00104 BlackBoard::ListenerRegisterFlag flag) 00105 { 00106 const BlackBoardInterfaceListener::InterfaceQueue & queue = 00107 listener->bbil_acquire_queue(); 00108 00109 BlackBoardInterfaceListener::InterfaceQueue::const_iterator i = queue.begin(); 00110 00111 for (i = queue.begin(); i != queue.end(); ++i) { 00112 switch (i->type) { 00113 case BlackBoardInterfaceListener::DATA: 00114 if (flag & BlackBoard::BBIL_FLAG_DATA) { 00115 proc_listener_maybe_queue(i->op, i->interface, listener, 00116 __bbil_data_mutex, __bbil_data_events, 00117 __bbil_data, __bbil_data_queue, "data"); 00118 } 00119 break; 00120 case BlackBoardInterfaceListener::MESSAGES: 00121 if (flag & BlackBoard::BBIL_FLAG_MESSAGES) { 00122 proc_listener_maybe_queue(i->op, i->interface, listener, 00123 __bbil_messages_mutex, __bbil_messages_events, 00124 __bbil_messages, __bbil_messages_queue, 00125 "messages"); 00126 } 00127 break; 00128 case BlackBoardInterfaceListener::READER: 00129 if (flag & BlackBoard::BBIL_FLAG_READER) { 00130 proc_listener_maybe_queue(i->op, i->interface, listener, 00131 __bbil_reader_mutex, __bbil_reader_events, 00132 __bbil_reader, __bbil_reader_queue, "reader"); 00133 } 00134 break; 00135 case BlackBoardInterfaceListener::WRITER: 00136 if (flag & BlackBoard::BBIL_FLAG_WRITER) { 00137 proc_listener_maybe_queue(i->op, i->interface, listener, 00138 __bbil_writer_mutex, __bbil_writer_events, 00139 __bbil_writer, __bbil_writer_queue, "writer"); 00140 } 00141 break; 00142 default: break; 00143 } 00144 } 00145 00146 listener->bbil_release_queue(flag); 00147 } 00148 00149 void 00150 BlackBoardNotifier::proc_listener_maybe_queue(bool op, 00151 Interface *interface, 00152 BlackBoardInterfaceListener *listener, 00153 Mutex *mutex, unsigned int &events, 00154 BBilMap &map, BBilQueue &queue, 00155 const char *hint) 00156 { 00157 MutexLocker lock(mutex); 00158 if (events > 0) { 00159 LibLogger::log_warn("BlackBoardNotifier", "Registering interface " 00160 "listener %s for %s events (queued)", 00161 listener->bbil_name(), hint); 00162 00163 queue_listener(op, interface, listener, queue); 00164 } else { 00165 if (op) { // add 00166 add_listener(interface, listener, map); 00167 } else { 00168 remove_listener(interface, listener, map); 00169 } 00170 } 00171 } 00172 00173 00174 /** Unregister BB interface listener. 00175 * This will remove the given BlackBoard interface listener from any 00176 * event that it was previously registered for. 00177 * @param listener BlackBoard event listener to remove 00178 */ 00179 void 00180 BlackBoardNotifier::unregister_listener(BlackBoardInterfaceListener *listener) 00181 { 00182 const BlackBoardInterfaceListener::InterfaceMaps maps = 00183 listener->bbil_acquire_maps(); 00184 00185 BlackBoardInterfaceListener::InterfaceMap::const_iterator i; 00186 for (i = maps.data.begin(); i != maps.data.end(); ++i) { 00187 proc_listener_maybe_queue(false, i->second, listener, 00188 __bbil_data_mutex, __bbil_data_events, 00189 __bbil_data, __bbil_data_queue, "data"); 00190 } 00191 00192 for (i = maps.messages.begin(); i != maps.messages.end(); ++i) { 00193 proc_listener_maybe_queue(false, i->second, listener, 00194 __bbil_messages_mutex, __bbil_messages_events, 00195 __bbil_messages, __bbil_messages_queue, 00196 "messages"); 00197 } 00198 00199 for (i = maps.reader.begin(); i != maps.reader.end(); ++i) { 00200 proc_listener_maybe_queue(false, i->second, listener, 00201 __bbil_reader_mutex, __bbil_reader_events, 00202 __bbil_reader, __bbil_reader_queue, "reader"); 00203 } 00204 00205 for (i = maps.writer.begin(); i != maps.writer.end(); ++i) { 00206 proc_listener_maybe_queue(false, i->second, listener, 00207 __bbil_writer_mutex, __bbil_writer_events, 00208 __bbil_writer, __bbil_writer_queue, "writer"); 00209 } 00210 00211 listener->bbil_release_maps(); 00212 } 00213 00214 /** Add listener for specified map. 00215 * @param listener interface listener for events 00216 * @param im map of interfaces to listen for 00217 * @param ilmap internal map to add listener to 00218 */ 00219 void 00220 BlackBoardNotifier::add_listener(Interface *interface, 00221 BlackBoardInterfaceListener *listener, 00222 BBilMap &ilmap) 00223 { 00224 std::pair<BBilMap::iterator, BBilMap::iterator> ret = 00225 ilmap.equal_range(interface->uid()); 00226 00227 BBilMap::value_type v = std::make_pair(interface->uid(), listener); 00228 BBilMap::iterator f = std::find(ret.first, ret.second, v); 00229 00230 if (f == ret.second) { 00231 ilmap.insert(std::make_pair(interface->uid(), listener)); 00232 } 00233 } 00234 00235 void 00236 BlackBoardNotifier::remove_listener(Interface *interface, 00237 BlackBoardInterfaceListener *listener, 00238 BBilMap &ilmap) 00239 { 00240 std::pair<BBilMap::iterator, BBilMap::iterator> ret = 00241 ilmap.equal_range(interface->uid()); 00242 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) { 00243 if (j->second == listener) { 00244 ilmap.erase(j); 00245 break; 00246 } 00247 } 00248 } 00249 00250 00251 bool 00252 BlackBoardNotifier::is_in_queue(bool op, BBilQueue &queue, const char *uid, 00253 BlackBoardInterfaceListener *bbil) 00254 { 00255 BBilQueue::iterator q; 00256 for (q = queue.begin(); q != queue.end(); ++q) { 00257 if ((q->op == op) && (q->uid == uid) && (q->listener == bbil)) { 00258 return true; 00259 } 00260 } 00261 return false; 00262 } 00263 00264 void 00265 BlackBoardNotifier::queue_listener(bool op, Interface *interface, 00266 BlackBoardInterfaceListener *listener, 00267 BBilQueue &queue) 00268 { 00269 BBilQueueEntry qe = { op, interface->uid(), interface, listener }; 00270 queue.push_back(qe); 00271 } 00272 00273 00274 00275 /** Register BB interface observer. 00276 * @param observer BlackBoard interface observer to register 00277 */ 00278 void 00279 BlackBoardNotifier::register_observer(BlackBoardInterfaceObserver *observer) 00280 { 00281 __bbio_mutex->lock(); 00282 if (__bbio_events > 0) { 00283 __bbio_queue.push_back(std::make_pair(1, observer)); 00284 } else { 00285 add_observer(observer, observer->bbio_get_observed_create(), __bbio_created); 00286 add_observer(observer, observer->bbio_get_observed_destroy(), __bbio_destroyed); 00287 } 00288 __bbio_mutex->unlock(); 00289 } 00290 00291 00292 void 00293 BlackBoardNotifier::add_observer(BlackBoardInterfaceObserver *observer, 00294 BlackBoardInterfaceObserver::ObservedInterfaceLockMap *its, 00295 BBioMap &bbiomap) 00296 { 00297 BlackBoardInterfaceObserver::ObservedInterfaceLockMapIterator i; 00298 its->lock(); 00299 for (i = its->begin(); i != its->end(); ++i) { 00300 bbiomap[i->first].push_back(make_pair(observer, i->second)); 00301 } 00302 its->unlock(); 00303 } 00304 00305 00306 /** Remove observer from map. 00307 * @param iomap interface observer map to remove the observer from 00308 * @param observer observer to remove 00309 */ 00310 void 00311 BlackBoardNotifier::remove_observer(BBioMap &iomap, BlackBoardInterfaceObserver *observer) 00312 { 00313 BBioMapIterator i, tmp; 00314 00315 i = iomap.begin(); 00316 while (i != iomap.end()) { 00317 BBioListIterator j = i->second.begin(); 00318 while (j != i->second.end()) { 00319 if ( j->first == observer ) { 00320 j = i->second.erase(j); 00321 } else { 00322 ++j; 00323 } 00324 } 00325 if ( i->second.empty() ) { 00326 tmp = i; 00327 ++i; 00328 iomap.erase(tmp); 00329 } else { 00330 ++i; 00331 } 00332 } 00333 } 00334 00335 /** Unregister BB interface observer. 00336 * This will remove the given BlackBoard event listener from any event that it was 00337 * previously registered for. 00338 * @param observer BlackBoard event listener to remove 00339 */ 00340 void 00341 BlackBoardNotifier::unregister_observer(BlackBoardInterfaceObserver *observer) 00342 { 00343 MutexLocker lock(__bbio_mutex); 00344 if ( __bbio_events > 0) { 00345 BBioQueueEntry e = std::make_pair((unsigned int)0, observer); 00346 BBioQueue::iterator re; 00347 while ( (re = find_if(__bbio_queue.begin(), __bbio_queue.end(), 00348 bind2nd(std::not_equal_to<BBioQueueEntry>(), e))) 00349 != __bbio_queue.end()) { 00350 // if there is an entry in the register queue, remove it! 00351 if (re->second == observer) { 00352 __bbio_queue.erase(re); 00353 } 00354 } 00355 __bbio_queue.push_back(std::make_pair(0, observer)); 00356 00357 } else { 00358 remove_observer(__bbio_created, observer); 00359 remove_observer(__bbio_destroyed, observer); 00360 } 00361 } 00362 00363 /** Notify that an interface has been created. 00364 * @param type type of the interface 00365 * @param id ID of the interface 00366 */ 00367 void 00368 BlackBoardNotifier::notify_of_interface_created(const char *type, const char *id) throw() 00369 { 00370 __bbio_mutex->lock(); 00371 __bbio_events += 1; 00372 __bbio_mutex->unlock(); 00373 00374 BBioMapIterator lhmi; 00375 BBioListIterator i, l; 00376 for (lhmi = __bbio_created.begin(); lhmi != __bbio_created.end(); ++lhmi) { 00377 if (fnmatch(lhmi->first.c_str(), type, 0) != 0) continue; 00378 00379 BBioList &list = lhmi->second; 00380 for (i = list.begin(); i != list.end(); ++i) { 00381 BlackBoardInterfaceObserver *bbio = i->first; 00382 for (std::list<std::string>::iterator pi = i->second.begin(); pi != i->second.end(); ++pi) { 00383 if (fnmatch(pi->c_str(), id, 0) == 0) { 00384 bbio->bb_interface_created(type, id); 00385 break; 00386 } 00387 } 00388 } 00389 } 00390 00391 __bbio_mutex->lock(); 00392 __bbio_events -= 1; 00393 process_bbio_queue(); 00394 __bbio_mutex->unlock(); 00395 } 00396 00397 00398 /** Notify that an interface has been destroyed. 00399 * @param type type of the interface 00400 * @param id ID of the interface 00401 */ 00402 void 00403 BlackBoardNotifier::notify_of_interface_destroyed(const char *type, const char *id) throw() 00404 { 00405 __bbio_mutex->lock(); 00406 __bbio_events += 1; 00407 __bbio_mutex->unlock(); 00408 00409 BBioMapIterator lhmi; 00410 BBioListIterator i, l; 00411 for (lhmi = __bbio_destroyed.begin(); lhmi != __bbio_destroyed.end(); ++lhmi) { 00412 if (fnmatch(lhmi->first.c_str(), type, 0) != 0) continue; 00413 00414 BBioList &list = (*lhmi).second; 00415 for (i = list.begin(); i != list.end(); ++i) { 00416 BlackBoardInterfaceObserver *bbio = i->first; 00417 for (std::list<std::string>::iterator pi = i->second.begin(); pi != i->second.end(); ++pi) { 00418 if (fnmatch(pi->c_str(), id, 0) == 0) { 00419 bbio->bb_interface_destroyed(type, id); 00420 break; 00421 } 00422 } 00423 } 00424 } 00425 00426 __bbio_mutex->lock(); 00427 __bbio_events -= 1; 00428 process_bbio_queue(); 00429 __bbio_mutex->unlock(); 00430 } 00431 00432 00433 void 00434 BlackBoardNotifier::process_bbio_queue() 00435 { 00436 if ( ! __bbio_queue.empty() ) { 00437 if (__bbio_events > 0 ) { 00438 return; 00439 } else { 00440 while (! __bbio_queue.empty()) { 00441 BBioQueueEntry &e = __bbio_queue.front(); 00442 if (e.first) { // register 00443 add_observer(e.second, e.second->bbio_get_observed_create(), __bbio_created); 00444 add_observer(e.second, e.second->bbio_get_observed_destroy(), __bbio_destroyed); 00445 } else { // unregister 00446 remove_observer(__bbio_created, e.second); 00447 remove_observer(__bbio_destroyed, e.second); 00448 } 00449 __bbio_queue.pop_front(); 00450 } 00451 } 00452 } 00453 } 00454 00455 00456 /** Notify that writer has been added. 00457 * @param interface the interface for which the event happened. It is not necessarily the 00458 * instance which caused the event, but it must have the same mem serial. 00459 * @param event_instance_serial the instance serial of the interface that caused the event 00460 * @see BlackBoardInterfaceListener::bb_interface_writer_added() 00461 */ 00462 void 00463 BlackBoardNotifier::notify_of_writer_added(const Interface *interface, 00464 unsigned int event_instance_serial) throw() 00465 { 00466 __bbil_writer_mutex->lock(); 00467 __bbil_writer_events += 1; 00468 __bbil_writer_mutex->unlock(); 00469 00470 const char *uid = interface->uid(); 00471 std::pair<BBilMap::iterator, BBilMap::iterator> ret = 00472 __bbil_writer.equal_range(uid); 00473 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) { 00474 BlackBoardInterfaceListener *bbil = j->second; 00475 if (! is_in_queue(/* remove op*/ false, __bbil_writer_queue, uid, bbil)) { 00476 Interface *bbil_iface = bbil->bbil_writer_interface(uid); 00477 if (bbil_iface != NULL ) { 00478 bbil->bb_interface_writer_added(bbil_iface, event_instance_serial); 00479 } else { 00480 LibLogger::log_warn("BlackBoardNotifier", 00481 "BBIL[%s] registered for writer events " 00482 "(open) for '%s' but has no such interface", 00483 bbil->bbil_name(), uid); 00484 } 00485 } 00486 } 00487 00488 __bbil_writer_mutex->lock(); 00489 __bbil_writer_events -= 1; 00490 process_writer_queue(); 00491 __bbil_writer_mutex->unlock(); 00492 } 00493 00494 00495 /** Notify that writer has been removed. 00496 * @param interface interface for which the writer has been removed 00497 * @param event_instance_serial instance serial of the interface that caused the event 00498 * @see BlackBoardInterfaceListener::bb_interface_writer_removed() 00499 */ 00500 void 00501 BlackBoardNotifier::notify_of_writer_removed(const Interface *interface, 00502 unsigned int event_instance_serial) throw() 00503 { 00504 __bbil_writer_mutex->lock(); 00505 __bbil_writer_events += 1; 00506 __bbil_writer_mutex->unlock(); 00507 00508 const char *uid = interface->uid(); 00509 std::pair<BBilMap::iterator, BBilMap::iterator> ret = 00510 __bbil_writer.equal_range(uid); 00511 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) { 00512 BlackBoardInterfaceListener *bbil = j->second; 00513 if (! is_in_queue(/* remove op*/ false, __bbil_data_queue, uid, bbil)) { 00514 Interface *bbil_iface = bbil->bbil_writer_interface(uid); 00515 if (bbil_iface != NULL ) { 00516 bbil->bb_interface_writer_removed(bbil_iface, event_instance_serial); 00517 } else { 00518 LibLogger::log_warn("BlackBoardNotifier", 00519 "BBIL[%s] registered for writer events " 00520 "(close) for '%s' but has no such interface", 00521 bbil->bbil_name(), uid); 00522 } 00523 } 00524 } 00525 00526 __bbil_writer_mutex->lock(); 00527 __bbil_writer_events -= 1; 00528 process_writer_queue(); 00529 __bbil_writer_mutex->unlock(); 00530 } 00531 00532 void 00533 BlackBoardNotifier::process_writer_queue() 00534 { 00535 if ( ! __bbil_writer_queue.empty() ) { 00536 if (__bbil_writer_events > 0 ) { 00537 return; 00538 } else { 00539 while (! __bbil_writer_queue.empty()) { 00540 BBilQueueEntry &e = __bbil_writer_queue.front(); 00541 if (e.op) { // register 00542 add_listener(e.interface, e.listener, __bbil_writer); 00543 } else { // unregister 00544 remove_listener(e.interface, e.listener, __bbil_writer); 00545 } 00546 __bbil_writer_queue.pop_front(); 00547 } 00548 } 00549 } 00550 } 00551 00552 00553 /** Notify that reader has been added. 00554 * @param interface interface for which the reader has been added 00555 * @param event_instance_serial instance serial of the interface that caused the event 00556 * @see BlackBoardInterfaceListener::bb_interface_reader_added() 00557 */ 00558 void 00559 BlackBoardNotifier::notify_of_reader_added(const Interface *interface, 00560 unsigned int event_instance_serial) throw() 00561 { 00562 __bbil_reader_mutex->lock(); 00563 __bbil_reader_events += 1; 00564 __bbil_reader_mutex->unlock(); 00565 00566 const char *uid = interface->uid(); 00567 std::pair<BBilMap::iterator, BBilMap::iterator> ret = 00568 __bbil_reader.equal_range(uid); 00569 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) { 00570 BlackBoardInterfaceListener *bbil = j->second; 00571 if (! is_in_queue(/* remove op*/ false, __bbil_reader_queue, uid, bbil)) { 00572 Interface *bbil_iface = bbil->bbil_reader_interface(uid); 00573 if (bbil_iface != NULL ) { 00574 bbil->bb_interface_reader_added(bbil_iface, event_instance_serial); 00575 } else { 00576 LibLogger::log_warn("BlackBoardNotifier", 00577 "BBIL[%s] registered for reader events " 00578 "(open) for '%s' but has no such interface", 00579 bbil->bbil_name(), uid); 00580 } 00581 } 00582 } 00583 00584 __bbil_reader_mutex->lock(); 00585 __bbil_reader_events -= 1; 00586 process_reader_queue(); 00587 __bbil_reader_mutex->unlock(); 00588 } 00589 00590 00591 /** Notify that reader has been removed. 00592 * @param interface interface for which the reader has been removed 00593 * @param event_instance_serial instance serial of the interface that caused the event 00594 * @see BlackBoardInterfaceListener::bb_interface_reader_removed() 00595 */ 00596 void 00597 BlackBoardNotifier::notify_of_reader_removed(const Interface *interface, 00598 unsigned int event_instance_serial) throw() 00599 { 00600 __bbil_reader_mutex->lock(); 00601 __bbil_reader_events += 1; 00602 __bbil_reader_mutex->unlock(); 00603 00604 const char *uid = interface->uid(); 00605 std::pair<BBilMap::iterator, BBilMap::iterator> ret = 00606 __bbil_reader.equal_range(uid); 00607 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) { 00608 BlackBoardInterfaceListener *bbil = j->second; 00609 if (! is_in_queue(/* remove op*/ false, __bbil_data_queue, uid, bbil)) { 00610 Interface *bbil_iface = bbil->bbil_reader_interface(uid); 00611 if (bbil_iface != NULL ) { 00612 bbil->bb_interface_reader_removed(bbil_iface, event_instance_serial); 00613 } else { 00614 LibLogger::log_warn("BlackBoardNotifier", 00615 "BBIL[%s] registered for reader events " 00616 "(close) for '%s' but has no such interface", 00617 bbil->bbil_name(), uid); 00618 } 00619 } 00620 } 00621 00622 __bbil_reader_mutex->lock(); 00623 __bbil_reader_events -= 1; 00624 process_reader_queue(); 00625 __bbil_reader_mutex->unlock(); 00626 } 00627 00628 00629 void 00630 BlackBoardNotifier::process_reader_queue() 00631 { 00632 if ( ! __bbil_reader_queue.empty() ) { 00633 if (__bbil_reader_events > 0 ) { 00634 return; 00635 } else { 00636 while (! __bbil_reader_queue.empty()) { 00637 BBilQueueEntry &e = __bbil_reader_queue.front(); 00638 if (e.op) { // register 00639 add_listener(e.interface, e.listener, __bbil_reader); 00640 } else { // unregister 00641 remove_listener(e.interface, e.listener, __bbil_reader); 00642 } 00643 __bbil_reader_queue.pop_front(); 00644 } 00645 } 00646 } 00647 } 00648 00649 00650 /** Notify of data change. 00651 * Notify all subscribers of the given interface of a data change. 00652 * This also influences logging and sending data over the network so it is 00653 * mandatory to call this function! The interface base class write method does 00654 * that for you. 00655 * @param interface interface whose subscribers to notify 00656 * @see Interface::write() 00657 * @see BlackBoardInterfaceListener::bb_interface_data_changed() 00658 */ 00659 void 00660 BlackBoardNotifier::notify_of_data_change(const Interface *interface) 00661 { 00662 __bbil_data_mutex->lock(); 00663 __bbil_data_events += 1; 00664 __bbil_data_mutex->unlock(); 00665 00666 const char *uid = interface->uid(); 00667 std::pair<BBilMap::iterator, BBilMap::iterator> ret = 00668 __bbil_data.equal_range(uid); 00669 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) { 00670 BlackBoardInterfaceListener *bbil = j->second; 00671 if (! is_in_queue(/* remove op*/ false, __bbil_data_queue, uid, bbil)) { 00672 Interface *bbil_iface = bbil->bbil_data_interface(uid); 00673 if (bbil_iface != NULL ) { 00674 bbil->bb_interface_data_changed(bbil_iface); 00675 } else { 00676 LibLogger::log_warn("BlackBoardNotifier", 00677 "BBIL[%s] registered for data change events " 00678 "for '%s' but has no such interface", 00679 bbil->bbil_name(), uid); 00680 } 00681 } 00682 } 00683 00684 __bbil_data_mutex->lock(); 00685 __bbil_data_events -= 1; 00686 if ( ! __bbil_data_queue.empty() ) { 00687 if (__bbil_data_events == 0 ) { 00688 while (! __bbil_data_queue.empty()) { 00689 BBilQueueEntry &e = __bbil_data_queue.front(); 00690 if (e.op) { // register 00691 add_listener(e.interface, e.listener, __bbil_data); 00692 } else { // unregister 00693 remove_listener(e.interface, e.listener, __bbil_data); 00694 } 00695 __bbil_data_queue.pop_front(); 00696 } 00697 } 00698 } 00699 __bbil_data_mutex->unlock(); 00700 } 00701 00702 00703 /** Notify of message received 00704 * Notify all subscribers of the given interface of an incoming message 00705 * This also influences logging and sending data over the network so it is 00706 * mandatory to call this function! The interface base class write method does 00707 * that for you. 00708 * @param interface interface whose subscribers to notify 00709 * @param message message which is being received 00710 * @return false if any listener returned false, true otherwise 00711 * @see BlackBoardInterfaceListener::bb_interface_message_received() 00712 */ 00713 bool 00714 BlackBoardNotifier::notify_of_message_received(const Interface *interface, Message *message) 00715 { 00716 __bbil_messages_mutex->lock(); 00717 __bbil_messages_events += 1; 00718 __bbil_messages_mutex->unlock(); 00719 00720 bool done = true; 00721 00722 const char *uid = interface->uid(); 00723 std::pair<BBilMap::iterator, BBilMap::iterator> ret = 00724 __bbil_messages.equal_range(uid); 00725 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) { 00726 BlackBoardInterfaceListener *bbil = j->second; 00727 if (! is_in_queue(/* remove op*/ false, __bbil_messages_queue, uid, bbil)) { 00728 Interface *bbil_iface = bbil->bbil_message_interface(uid); 00729 if (bbil_iface != NULL ) { 00730 bool abort = bbil->bb_interface_message_received(bbil_iface, message); 00731 if (abort) { 00732 done = true; 00733 break; 00734 } 00735 } else { 00736 LibLogger::log_warn("BlackBoardNotifier", 00737 "BBIL[%s] registered for message events " 00738 "for '%s' but has no such interface", 00739 bbil->bbil_name(), uid); 00740 } 00741 } 00742 } 00743 00744 __bbil_messages_mutex->lock(); 00745 __bbil_messages_events -= 1; 00746 if ( ! __bbil_messages_queue.empty() ) { 00747 if (__bbil_messages_events == 0 ) { 00748 while (! __bbil_messages_queue.empty()) { 00749 BBilQueueEntry &e = __bbil_messages_queue.front(); 00750 if (e.op) { // register 00751 add_listener(e.interface, e.listener, __bbil_messages); 00752 } else { // unregister 00753 remove_listener(e.interface, e.listener, __bbil_messages); 00754 } 00755 __bbil_messages_queue.pop_front(); 00756 } 00757 } 00758 } 00759 __bbil_messages_mutex->unlock(); 00760 00761 return done; 00762 } 00763 00764 } // end namespace fawkes