Fawkes API  Fawkes Development Version
notifier.cpp
00001  
00002 /***************************************************************************
00003  *  notifier.cpp - BlackBoard notifier
00004  *
00005  *  Created: Mon Mar 03 23:28:18 2008
00006  *  Copyright  2006-2008  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009 
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version. A runtime exception applies to
00014  *  this software (see LICENSE.GPL_WRE file mentioned below for details).
00015  *
00016  *  This program is distributed in the hope that it will be useful,
00017  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  *  GNU Library General Public License for more details.
00020  *
00021  *  Read the full text in the LICENSE.GPL_WRE file in the doc directory.
00022  */
00023 
00024 #include <blackboard/internal/notifier.h>
00025 #include <blackboard/blackboard.h>
00026 #include <blackboard/interface_listener.h>
00027 #include <blackboard/interface_observer.h>
00028 
00029 #include <core/threading/mutex.h>
00030 #include <core/threading/mutex_locker.h>
00031 #include <core/utils/lock_hashset.h>
00032 #include <core/utils/lock_hashmap.h>
00033 #include <logging/liblogger.h>
00034 #include <interface/interface.h>
00035 
00036 #include <algorithm>
00037 #include <functional>
00038 #include <cstdlib>
00039 #include <cstring>
00040 #include <fnmatch.h>
00041 
00042 namespace fawkes {
00043 
00044 /** @class BlackBoardNotifier <blackboard/internal/notifier.h>
00045  * BlackBoard notifier.
00046  * This class is used by the BlackBoard to notify listeners and observers
00047  * of changes. 
00048  *
00049  * @author Tim Niemueller
00050  */
00051 
00052 
00053 /** Constructor. */
00054 BlackBoardNotifier::BlackBoardNotifier()
00055 {
00056   __bbil_writer_events     = 0;
00057   __bbil_writer_mutex      = new Mutex();
00058 
00059   __bbil_reader_events     = 0;
00060   __bbil_reader_mutex      = new Mutex();
00061 
00062   __bbil_data_events       = 0;
00063   __bbil_data_mutex        = new Mutex();
00064 
00065   __bbil_messages_events   = 0;
00066   __bbil_messages_mutex    = new Mutex();
00067 
00068   __bbio_events            = 0;
00069   __bbio_mutex             = new Mutex();
00070 }
00071 
00072 
00073 /** Destructor */
00074 BlackBoardNotifier::~BlackBoardNotifier()
00075 {
00076   delete __bbil_writer_mutex;
00077   delete __bbil_reader_mutex;
00078   delete __bbil_data_mutex;
00079   delete __bbil_messages_mutex;
00080 
00081   delete __bbio_mutex;
00082 }
00083 
00084 /** Register BB event listener.
00085  * @param listener BlackBoard event listener to register
00086  * @param flag concatenation of flags denoting which queue entries should be
00087  * processed
00088  */
00089 void
00090 BlackBoardNotifier::register_listener(BlackBoardInterfaceListener *listener,
00091                                       BlackBoard::ListenerRegisterFlag flag)
00092 {
00093   update_listener(listener, flag);
00094 }
00095 
00096 
00097 /** Update BB event listener.
00098  * @param listener BlackBoard event listener to update subscriptions of
00099  * @param flag concatenation of flags denoting which queue entries should be
00100  * processed
00101  */
00102 void
00103 BlackBoardNotifier::update_listener(BlackBoardInterfaceListener *listener,
00104                                     BlackBoard::ListenerRegisterFlag flag)
00105 {
00106   const BlackBoardInterfaceListener::InterfaceQueue & queue =
00107     listener->bbil_acquire_queue();
00108 
00109   BlackBoardInterfaceListener::InterfaceQueue::const_iterator i = queue.begin();
00110 
00111   for (i = queue.begin(); i != queue.end(); ++i) {
00112     switch (i->type) {
00113     case BlackBoardInterfaceListener::DATA:
00114       if (flag & BlackBoard::BBIL_FLAG_DATA) {
00115         proc_listener_maybe_queue(i->op, i->interface, listener,
00116                                   __bbil_data_mutex, __bbil_data_events,
00117                                   __bbil_data, __bbil_data_queue, "data");
00118       }
00119       break;
00120     case BlackBoardInterfaceListener::MESSAGES:
00121       if (flag & BlackBoard::BBIL_FLAG_MESSAGES) {
00122         proc_listener_maybe_queue(i->op, i->interface, listener,
00123                                   __bbil_messages_mutex, __bbil_messages_events,
00124                                   __bbil_messages, __bbil_messages_queue,
00125                                   "messages");
00126       }
00127       break;
00128     case BlackBoardInterfaceListener::READER:
00129       if (flag & BlackBoard::BBIL_FLAG_READER) {
00130       proc_listener_maybe_queue(i->op, i->interface, listener,
00131                                 __bbil_reader_mutex, __bbil_reader_events,
00132                                 __bbil_reader, __bbil_reader_queue, "reader");
00133       }
00134       break;
00135     case BlackBoardInterfaceListener::WRITER:
00136       if (flag & BlackBoard::BBIL_FLAG_WRITER) {
00137       proc_listener_maybe_queue(i->op, i->interface, listener,
00138                                 __bbil_writer_mutex, __bbil_writer_events,
00139                                 __bbil_writer, __bbil_writer_queue, "writer");
00140       }
00141       break;
00142     default: break;
00143     }
00144   }
00145 
00146   listener->bbil_release_queue(flag);
00147 }
00148 
00149 void
00150 BlackBoardNotifier::proc_listener_maybe_queue(bool op,
00151                                               Interface *interface,
00152                                               BlackBoardInterfaceListener *listener,
00153                                               Mutex *mutex, unsigned int &events,
00154                                               BBilMap &map, BBilQueue &queue,
00155                                               const char *hint)
00156 {
00157   MutexLocker lock(mutex);
00158   if (events > 0) {
00159     LibLogger::log_warn("BlackBoardNotifier", "Registering interface "
00160                         "listener %s for %s events (queued)",
00161                         listener->bbil_name(), hint);
00162 
00163     queue_listener(op, interface, listener, queue);
00164   } else {
00165     if (op) { // add
00166       add_listener(interface, listener, map);
00167     } else {
00168       remove_listener(interface, listener, map);
00169     }
00170   }
00171 }
00172 
00173 
00174 /** Unregister BB interface listener.
00175  * This will remove the given BlackBoard interface listener from any
00176  * event that it was previously registered for.
00177  * @param listener BlackBoard event listener to remove
00178  */
00179 void
00180 BlackBoardNotifier::unregister_listener(BlackBoardInterfaceListener *listener)
00181 {
00182   const BlackBoardInterfaceListener::InterfaceMaps maps =
00183     listener->bbil_acquire_maps();
00184 
00185   BlackBoardInterfaceListener::InterfaceMap::const_iterator i;
00186   for (i = maps.data.begin(); i != maps.data.end(); ++i) {
00187     proc_listener_maybe_queue(false, i->second, listener,
00188                               __bbil_data_mutex, __bbil_data_events,
00189                               __bbil_data, __bbil_data_queue, "data");
00190   }
00191 
00192   for (i = maps.messages.begin(); i != maps.messages.end(); ++i) {
00193     proc_listener_maybe_queue(false, i->second, listener,
00194                               __bbil_messages_mutex, __bbil_messages_events,
00195                               __bbil_messages, __bbil_messages_queue,
00196                               "messages");
00197   }
00198 
00199   for (i = maps.reader.begin(); i != maps.reader.end(); ++i) {
00200     proc_listener_maybe_queue(false, i->second, listener,
00201                               __bbil_reader_mutex, __bbil_reader_events,
00202                               __bbil_reader, __bbil_reader_queue, "reader");
00203   }
00204 
00205   for (i = maps.writer.begin(); i != maps.writer.end(); ++i) {
00206     proc_listener_maybe_queue(false, i->second, listener,
00207                               __bbil_writer_mutex, __bbil_writer_events,
00208                               __bbil_writer, __bbil_writer_queue, "writer");
00209   }
00210 
00211   listener->bbil_release_maps();
00212 }
00213 
00214 /** Add listener for specified map.
00215  * @param listener interface listener for events
00216  * @param im map of interfaces to listen for
00217  * @param ilmap internal map to add listener to
00218  */
00219 void
00220 BlackBoardNotifier::add_listener(Interface *interface,
00221                                  BlackBoardInterfaceListener *listener,
00222                                  BBilMap &ilmap)
00223 {
00224   std::pair<BBilMap::iterator, BBilMap::iterator> ret =
00225     ilmap.equal_range(interface->uid());
00226 
00227   BBilMap::value_type v = std::make_pair(interface->uid(), listener);
00228   BBilMap::iterator f = std::find(ret.first, ret.second, v);
00229 
00230   if (f == ret.second) {
00231     ilmap.insert(std::make_pair(interface->uid(), listener));
00232   }
00233 }
00234 
00235 void
00236 BlackBoardNotifier::remove_listener(Interface *interface,
00237                                     BlackBoardInterfaceListener *listener,
00238                                     BBilMap &ilmap)
00239 {
00240   std::pair<BBilMap::iterator, BBilMap::iterator> ret =
00241     ilmap.equal_range(interface->uid());
00242   for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
00243     if (j->second == listener) {
00244       ilmap.erase(j);
00245       break;
00246     }
00247   }
00248 }
00249 
00250 
00251 bool
00252 BlackBoardNotifier::is_in_queue(bool op, BBilQueue &queue, const char *uid,
00253                                 BlackBoardInterfaceListener *bbil)
00254 {
00255   BBilQueue::iterator q;
00256   for (q = queue.begin(); q != queue.end(); ++q) {
00257     if ((q->op == op) && (q->uid == uid) && (q->listener == bbil)) {
00258       return true;
00259     }
00260   }
00261   return false;
00262 }
00263 
00264 void
00265 BlackBoardNotifier::queue_listener(bool op, Interface *interface,
00266                                    BlackBoardInterfaceListener *listener,
00267                                    BBilQueue &queue)
00268 {
00269   BBilQueueEntry qe = { op, interface->uid(), interface, listener };
00270   queue.push_back(qe);
00271 }
00272 
00273 
00274 
00275 /** Register BB interface observer.
00276  * @param observer BlackBoard interface observer to register
00277  */
00278 void
00279 BlackBoardNotifier::register_observer(BlackBoardInterfaceObserver *observer)
00280 {
00281   __bbio_mutex->lock();
00282   if (__bbio_events > 0) {
00283     __bbio_queue.push_back(std::make_pair(1, observer));
00284   } else {
00285     add_observer(observer, observer->bbio_get_observed_create(), __bbio_created);
00286     add_observer(observer, observer->bbio_get_observed_destroy(), __bbio_destroyed);
00287   }
00288   __bbio_mutex->unlock();
00289 }
00290 
00291 
00292 void
00293 BlackBoardNotifier::add_observer(BlackBoardInterfaceObserver *observer,
00294                                  BlackBoardInterfaceObserver::ObservedInterfaceLockMap *its,
00295                                  BBioMap &bbiomap)
00296 {
00297   BlackBoardInterfaceObserver::ObservedInterfaceLockMapIterator i;
00298   its->lock();
00299   for (i = its->begin(); i != its->end(); ++i) {
00300     bbiomap[i->first].push_back(make_pair(observer, i->second));
00301   }
00302   its->unlock();
00303 }
00304 
00305 
00306 /** Remove observer from map.
00307  * @param iomap interface observer map to remove the observer from
00308  * @param observer observer to remove
00309  */
00310 void
00311 BlackBoardNotifier::remove_observer(BBioMap &iomap, BlackBoardInterfaceObserver *observer)
00312 {
00313   BBioMapIterator i, tmp;
00314 
00315   i = iomap.begin();
00316   while (i != iomap.end()) {
00317     BBioListIterator j = i->second.begin();
00318     while (j != i->second.end()) {
00319       if ( j->first == observer ) {
00320         j = i->second.erase(j);
00321       } else {
00322         ++j;
00323       }
00324     }
00325     if ( i->second.empty() ) {
00326       tmp = i;
00327       ++i;
00328       iomap.erase(tmp);
00329     } else {
00330       ++i;
00331     }
00332   }
00333 }
00334 
00335 /** Unregister BB interface observer.
00336  * This will remove the given BlackBoard event listener from any event that it was
00337  * previously registered for.
00338  * @param observer BlackBoard event listener to remove
00339  */
00340 void
00341 BlackBoardNotifier::unregister_observer(BlackBoardInterfaceObserver *observer)
00342 {
00343   MutexLocker lock(__bbio_mutex);
00344   if ( __bbio_events > 0) {
00345     BBioQueueEntry e = std::make_pair((unsigned int)0, observer);
00346     BBioQueue::iterator re;
00347     while ( (re = find_if(__bbio_queue.begin(), __bbio_queue.end(),
00348                           bind2nd(std::not_equal_to<BBioQueueEntry>(), e)))
00349             != __bbio_queue.end()) {
00350       // if there is an entry in the register queue, remove it!
00351       if (re->second == observer) {
00352         __bbio_queue.erase(re);
00353       }
00354     }
00355     __bbio_queue.push_back(std::make_pair(0, observer));
00356 
00357   } else {
00358     remove_observer(__bbio_created, observer);
00359     remove_observer(__bbio_destroyed, observer);
00360   }
00361 }
00362 
00363 /** Notify that an interface has been created.
00364  * @param type type of the interface
00365  * @param id ID of the interface
00366  */
00367 void
00368 BlackBoardNotifier::notify_of_interface_created(const char *type, const char *id) throw()
00369 {
00370   __bbio_mutex->lock();
00371   __bbio_events += 1;
00372   __bbio_mutex->unlock();
00373 
00374   BBioMapIterator lhmi;
00375   BBioListIterator i, l;
00376   for (lhmi = __bbio_created.begin(); lhmi != __bbio_created.end(); ++lhmi) {
00377     if (fnmatch(lhmi->first.c_str(), type, 0) != 0) continue;
00378 
00379     BBioList &list = lhmi->second;
00380     for (i = list.begin(); i != list.end(); ++i) {
00381       BlackBoardInterfaceObserver *bbio = i->first;
00382       for (std::list<std::string>::iterator pi = i->second.begin(); pi != i->second.end(); ++pi) {
00383         if (fnmatch(pi->c_str(), id, 0) == 0) {
00384           bbio->bb_interface_created(type, id);
00385           break;
00386         }
00387       }
00388     }
00389   }
00390 
00391   __bbio_mutex->lock();
00392   __bbio_events -= 1;
00393   process_bbio_queue();
00394   __bbio_mutex->unlock();
00395 }
00396 
00397 
00398 /** Notify that an interface has been destroyed.
00399  * @param type type of the interface
00400  * @param id ID of the interface
00401  */
00402 void
00403 BlackBoardNotifier::notify_of_interface_destroyed(const char *type, const char *id) throw()
00404 {
00405   __bbio_mutex->lock();
00406   __bbio_events += 1;
00407   __bbio_mutex->unlock();
00408 
00409   BBioMapIterator lhmi;
00410   BBioListIterator i, l;
00411   for (lhmi = __bbio_destroyed.begin(); lhmi != __bbio_destroyed.end(); ++lhmi) {
00412     if (fnmatch(lhmi->first.c_str(), type, 0) != 0) continue;
00413 
00414     BBioList &list = (*lhmi).second;
00415     for (i = list.begin(); i != list.end(); ++i) {
00416       BlackBoardInterfaceObserver *bbio = i->first;
00417       for (std::list<std::string>::iterator pi = i->second.begin(); pi != i->second.end(); ++pi) {
00418         if (fnmatch(pi->c_str(), id, 0) == 0) {
00419           bbio->bb_interface_destroyed(type, id);
00420           break;
00421         }
00422       }
00423     }
00424   }
00425 
00426   __bbio_mutex->lock();
00427   __bbio_events -= 1;
00428   process_bbio_queue();
00429   __bbio_mutex->unlock();
00430 }
00431 
00432 
00433 void
00434 BlackBoardNotifier::process_bbio_queue()
00435 {
00436   if ( ! __bbio_queue.empty() ) {
00437     if (__bbio_events > 0 ) {
00438       return;
00439     } else {
00440       while (! __bbio_queue.empty()) {
00441         BBioQueueEntry &e = __bbio_queue.front();
00442         if (e.first) { // register
00443           add_observer(e.second, e.second->bbio_get_observed_create(), __bbio_created);
00444           add_observer(e.second, e.second->bbio_get_observed_destroy(), __bbio_destroyed);
00445         } else {       // unregister
00446           remove_observer(__bbio_created, e.second);
00447           remove_observer(__bbio_destroyed, e.second);
00448         }
00449         __bbio_queue.pop_front();
00450       }
00451     }
00452   }
00453 }
00454 
00455 
00456 /** Notify that writer has been added.
00457  * @param interface the interface for which the event happened. It is not necessarily the
00458  * instance which caused the event, but it must have the same mem serial.
00459  * @param event_instance_serial the instance serial of the interface that caused the event
00460  * @see BlackBoardInterfaceListener::bb_interface_writer_added()
00461  */
00462 void
00463 BlackBoardNotifier::notify_of_writer_added(const Interface *interface,
00464                                            unsigned int event_instance_serial) throw()
00465 {
00466   __bbil_writer_mutex->lock();
00467   __bbil_writer_events += 1;
00468   __bbil_writer_mutex->unlock();
00469 
00470   const char *uid = interface->uid();
00471   std::pair<BBilMap::iterator, BBilMap::iterator> ret =
00472     __bbil_writer.equal_range(uid);
00473   for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
00474     BlackBoardInterfaceListener *bbil = j->second;
00475     if (! is_in_queue(/* remove op*/ false, __bbil_writer_queue, uid, bbil)) {
00476       Interface *bbil_iface = bbil->bbil_writer_interface(uid);
00477       if (bbil_iface != NULL ) {
00478         bbil->bb_interface_writer_added(bbil_iface, event_instance_serial);
00479       } else {
00480         LibLogger::log_warn("BlackBoardNotifier",
00481                             "BBIL[%s] registered for writer events "
00482                             "(open) for '%s' but has no such interface",
00483                             bbil->bbil_name(), uid);
00484       }
00485     }
00486   }
00487 
00488   __bbil_writer_mutex->lock();
00489   __bbil_writer_events -= 1;
00490   process_writer_queue();
00491   __bbil_writer_mutex->unlock();
00492 }
00493 
00494 
00495 /** Notify that writer has been removed.
00496  * @param interface interface for which the writer has been removed
00497  * @param event_instance_serial instance serial of the interface that caused the event
00498  * @see BlackBoardInterfaceListener::bb_interface_writer_removed()
00499  */
00500 void
00501 BlackBoardNotifier::notify_of_writer_removed(const Interface *interface,
00502                                              unsigned int event_instance_serial) throw()
00503 {
00504   __bbil_writer_mutex->lock();
00505   __bbil_writer_events += 1;
00506   __bbil_writer_mutex->unlock();
00507 
00508   const char *uid = interface->uid();
00509   std::pair<BBilMap::iterator, BBilMap::iterator> ret =
00510     __bbil_writer.equal_range(uid);
00511   for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
00512     BlackBoardInterfaceListener *bbil = j->second;
00513     if (! is_in_queue(/* remove op*/ false, __bbil_data_queue, uid, bbil)) {
00514       Interface *bbil_iface = bbil->bbil_writer_interface(uid);
00515       if (bbil_iface != NULL ) {
00516         bbil->bb_interface_writer_removed(bbil_iface, event_instance_serial);
00517       } else {
00518         LibLogger::log_warn("BlackBoardNotifier",
00519                             "BBIL[%s] registered for writer events "
00520                             "(close) for '%s' but has no such interface",
00521                             bbil->bbil_name(), uid);
00522       }
00523     }
00524   }
00525 
00526   __bbil_writer_mutex->lock();
00527   __bbil_writer_events -= 1;
00528   process_writer_queue();
00529   __bbil_writer_mutex->unlock();
00530 }
00531 
00532 void
00533 BlackBoardNotifier::process_writer_queue()
00534 {
00535   if ( ! __bbil_writer_queue.empty() ) {
00536     if (__bbil_writer_events > 0 ) {
00537       return;
00538     } else {
00539       while (! __bbil_writer_queue.empty()) {
00540         BBilQueueEntry &e = __bbil_writer_queue.front();
00541         if (e.op) { // register
00542           add_listener(e.interface, e.listener, __bbil_writer);
00543         } else {    // unregister
00544           remove_listener(e.interface, e.listener, __bbil_writer);
00545         }
00546         __bbil_writer_queue.pop_front();
00547       }
00548     }
00549   }
00550 }
00551 
00552 
00553 /** Notify that reader has been added.
00554  * @param interface interface for which the reader has been added
00555  * @param event_instance_serial instance serial of the interface that caused the event
00556  * @see BlackBoardInterfaceListener::bb_interface_reader_added()
00557  */
00558 void
00559 BlackBoardNotifier::notify_of_reader_added(const Interface *interface,
00560                                            unsigned int event_instance_serial) throw()
00561 {
00562   __bbil_reader_mutex->lock();
00563   __bbil_reader_events += 1;
00564   __bbil_reader_mutex->unlock();
00565 
00566   const char *uid = interface->uid();
00567   std::pair<BBilMap::iterator, BBilMap::iterator> ret =
00568     __bbil_reader.equal_range(uid);
00569   for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
00570     BlackBoardInterfaceListener *bbil = j->second;
00571     if (! is_in_queue(/* remove op*/ false, __bbil_reader_queue, uid, bbil)) {
00572       Interface *bbil_iface = bbil->bbil_reader_interface(uid);
00573       if (bbil_iface != NULL ) {
00574         bbil->bb_interface_reader_added(bbil_iface, event_instance_serial);
00575       } else {
00576         LibLogger::log_warn("BlackBoardNotifier",
00577                             "BBIL[%s] registered for reader events "
00578                             "(open) for '%s' but has no such interface",
00579                             bbil->bbil_name(), uid);
00580       }
00581     }
00582   }
00583 
00584   __bbil_reader_mutex->lock();
00585   __bbil_reader_events -= 1;
00586   process_reader_queue();
00587   __bbil_reader_mutex->unlock();
00588 }
00589 
00590 
00591 /** Notify that reader has been removed.
00592  * @param interface interface for which the reader has been removed
00593  * @param event_instance_serial instance serial of the interface that caused the event
00594  * @see BlackBoardInterfaceListener::bb_interface_reader_removed()
00595  */
00596 void
00597 BlackBoardNotifier::notify_of_reader_removed(const Interface *interface,
00598                                              unsigned int event_instance_serial) throw()
00599 {
00600   __bbil_reader_mutex->lock();
00601   __bbil_reader_events += 1;
00602   __bbil_reader_mutex->unlock();
00603 
00604   const char *uid = interface->uid();
00605   std::pair<BBilMap::iterator, BBilMap::iterator> ret =
00606     __bbil_reader.equal_range(uid);
00607   for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
00608     BlackBoardInterfaceListener *bbil = j->second;
00609     if (! is_in_queue(/* remove op*/ false, __bbil_data_queue, uid, bbil)) {
00610       Interface *bbil_iface = bbil->bbil_reader_interface(uid);
00611       if (bbil_iface != NULL ) {
00612         bbil->bb_interface_reader_removed(bbil_iface, event_instance_serial);
00613       } else {
00614         LibLogger::log_warn("BlackBoardNotifier",
00615                             "BBIL[%s] registered for reader events "
00616                             "(close) for '%s' but has no such interface",
00617                             bbil->bbil_name(), uid);
00618       }
00619     }
00620   }
00621 
00622   __bbil_reader_mutex->lock();
00623   __bbil_reader_events -= 1;
00624   process_reader_queue();
00625   __bbil_reader_mutex->unlock();
00626 }
00627 
00628 
00629 void
00630 BlackBoardNotifier::process_reader_queue()
00631 {
00632   if ( ! __bbil_reader_queue.empty() ) {
00633     if (__bbil_reader_events > 0 ) {
00634       return;
00635     } else {
00636       while (! __bbil_reader_queue.empty()) {
00637         BBilQueueEntry &e = __bbil_reader_queue.front();
00638         if (e.op) { // register
00639           add_listener(e.interface, e.listener, __bbil_reader);
00640         } else {    // unregister
00641           remove_listener(e.interface, e.listener, __bbil_reader);
00642         }
00643         __bbil_reader_queue.pop_front();
00644       }
00645     }
00646   }
00647 }
00648 
00649 
00650 /** Notify of data change.
00651  * Notify all subscribers of the given interface of a data change.
00652  * This also influences logging and sending data over the network so it is
00653  * mandatory to call this function! The interface base class write method does
00654  * that for you.
00655  * @param interface interface whose subscribers to notify
00656  * @see Interface::write()
00657  * @see BlackBoardInterfaceListener::bb_interface_data_changed()
00658  */
00659 void
00660 BlackBoardNotifier::notify_of_data_change(const Interface *interface)
00661 {
00662   __bbil_data_mutex->lock();
00663   __bbil_data_events += 1;
00664   __bbil_data_mutex->unlock();
00665 
00666   const char *uid = interface->uid();
00667   std::pair<BBilMap::iterator, BBilMap::iterator> ret =
00668     __bbil_data.equal_range(uid);
00669   for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
00670     BlackBoardInterfaceListener *bbil = j->second;
00671     if (! is_in_queue(/* remove op*/ false, __bbil_data_queue, uid, bbil)) {
00672       Interface *bbil_iface = bbil->bbil_data_interface(uid);
00673       if (bbil_iface != NULL ) {
00674         bbil->bb_interface_data_changed(bbil_iface);
00675       } else {
00676         LibLogger::log_warn("BlackBoardNotifier",
00677                             "BBIL[%s] registered for data change events "
00678                             "for '%s' but has no such interface",
00679                             bbil->bbil_name(), uid);
00680       }
00681     }
00682   }
00683 
00684   __bbil_data_mutex->lock();
00685   __bbil_data_events -= 1;
00686   if ( ! __bbil_data_queue.empty() ) {
00687     if (__bbil_data_events == 0 ) {
00688       while (! __bbil_data_queue.empty()) {
00689         BBilQueueEntry &e = __bbil_data_queue.front();
00690         if (e.op) { // register
00691           add_listener(e.interface, e.listener,  __bbil_data);
00692         } else {    // unregister
00693           remove_listener(e.interface, e.listener,  __bbil_data);
00694         }
00695         __bbil_data_queue.pop_front();
00696       }
00697     }
00698   }
00699   __bbil_data_mutex->unlock();
00700 }
00701 
00702 
00703 /** Notify of message received
00704  * Notify all subscribers of the given interface of an incoming message
00705  * This also influences logging and sending data over the network so it is
00706  * mandatory to call this function! The interface base class write method does
00707  * that for you.
00708  * @param interface interface whose subscribers to notify
00709  * @param message message which is being received
00710  * @return false if any listener returned false, true otherwise
00711  * @see BlackBoardInterfaceListener::bb_interface_message_received()
00712  */
00713 bool
00714 BlackBoardNotifier::notify_of_message_received(const Interface *interface, Message *message)
00715 {
00716   __bbil_messages_mutex->lock();
00717   __bbil_messages_events += 1;
00718   __bbil_messages_mutex->unlock();
00719 
00720   bool done = true;
00721 
00722   const char *uid = interface->uid();
00723   std::pair<BBilMap::iterator, BBilMap::iterator> ret =
00724     __bbil_messages.equal_range(uid);
00725   for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
00726     BlackBoardInterfaceListener *bbil = j->second;
00727     if (! is_in_queue(/* remove op*/ false, __bbil_messages_queue, uid, bbil)) {
00728       Interface *bbil_iface = bbil->bbil_message_interface(uid);
00729       if (bbil_iface != NULL ) {
00730         bool abort = bbil->bb_interface_message_received(bbil_iface, message);
00731         if (abort) {
00732           done = true;
00733           break;
00734         }
00735       } else {
00736         LibLogger::log_warn("BlackBoardNotifier",
00737                           "BBIL[%s] registered for message events "
00738                             "for '%s' but has no such interface",
00739                             bbil->bbil_name(), uid);
00740       }
00741     }
00742   }
00743 
00744   __bbil_messages_mutex->lock();
00745   __bbil_messages_events -= 1;
00746   if ( ! __bbil_messages_queue.empty() ) {
00747     if (__bbil_messages_events == 0 ) {
00748       while (! __bbil_messages_queue.empty()) {
00749         BBilQueueEntry &e = __bbil_messages_queue.front();
00750         if (e.op) { // register
00751           add_listener(e.interface, e.listener,  __bbil_messages);
00752         } else {    // unregister
00753           remove_listener(e.interface, e.listener,  __bbil_messages);
00754         }
00755         __bbil_messages_queue.pop_front();
00756       }
00757     }
00758   }
00759   __bbil_messages_mutex->unlock();
00760 
00761   return done;
00762 }
00763 
00764 } // end namespace fawkes