24 #include <blackboard/internal/notifier.h> 25 #include <blackboard/blackboard.h> 26 #include <blackboard/interface_listener.h> 27 #include <blackboard/interface_observer.h> 29 #include <core/threading/mutex.h> 30 #include <core/threading/mutex_locker.h> 31 #include <core/utils/lock_hashset.h> 32 #include <core/utils/lock_hashmap.h> 33 #include <logging/liblogger.h> 34 #include <interface/interface.h> 56 __bbil_writer_events = 0;
57 __bbil_writer_mutex =
new Mutex();
59 __bbil_reader_events = 0;
60 __bbil_reader_mutex =
new Mutex();
62 __bbil_data_events = 0;
63 __bbil_data_mutex =
new Mutex();
65 __bbil_messages_events = 0;
66 __bbil_messages_mutex =
new Mutex();
69 __bbio_mutex =
new Mutex();
76 delete __bbil_writer_mutex;
77 delete __bbil_reader_mutex;
78 delete __bbil_data_mutex;
79 delete __bbil_messages_mutex;
107 listener->bbil_acquire_queue();
109 BlackBoardInterfaceListener::InterfaceQueue::const_iterator i = queue.begin();
111 for (i = queue.begin(); i != queue.end(); ++i) {
115 proc_listener_maybe_queue(i->op, i->interface, listener,
116 __bbil_data_mutex, __bbil_data_events,
117 __bbil_data, __bbil_data_queue,
"data");
122 proc_listener_maybe_queue(i->op, i->interface, listener,
123 __bbil_messages_mutex, __bbil_messages_events,
124 __bbil_messages, __bbil_messages_queue,
130 proc_listener_maybe_queue(i->op, i->interface, listener,
131 __bbil_reader_mutex, __bbil_reader_events,
132 __bbil_reader, __bbil_reader_queue,
"reader");
137 proc_listener_maybe_queue(i->op, i->interface, listener,
138 __bbil_writer_mutex, __bbil_writer_events,
139 __bbil_writer, __bbil_writer_queue,
"writer");
146 listener->bbil_release_queue(flag);
150 BlackBoardNotifier::proc_listener_maybe_queue(
bool op,
153 Mutex *mutex,
unsigned int &events,
154 BBilMap &map, BBilQueue &queue,
160 "listener %s for %s events (queued)",
163 queue_listener(op, interface, listener, queue);
166 add_listener(interface, listener, map);
168 remove_listener(interface, listener, map);
183 listener->bbil_acquire_maps();
185 BlackBoardInterfaceListener::InterfaceMap::const_iterator i;
186 for (i = maps.
data.begin(); i != maps.
data.end(); ++i) {
187 proc_listener_maybe_queue(
false, i->second, listener,
188 __bbil_data_mutex, __bbil_data_events,
189 __bbil_data, __bbil_data_queue,
"data");
193 proc_listener_maybe_queue(
false, i->second, listener,
194 __bbil_messages_mutex, __bbil_messages_events,
195 __bbil_messages, __bbil_messages_queue,
199 for (i = maps.
reader.begin(); i != maps.
reader.end(); ++i) {
200 proc_listener_maybe_queue(
false, i->second, listener,
201 __bbil_reader_mutex, __bbil_reader_events,
202 __bbil_reader, __bbil_reader_queue,
"reader");
205 for (i = maps.
writer.begin(); i != maps.
writer.end(); ++i) {
206 proc_listener_maybe_queue(
false, i->second, listener,
207 __bbil_writer_mutex, __bbil_writer_events,
208 __bbil_writer, __bbil_writer_queue,
"writer");
211 listener->bbil_release_maps();
220 BlackBoardNotifier::add_listener(
Interface *interface,
224 std::pair<BBilMap::iterator, BBilMap::iterator> ret =
225 ilmap.equal_range(interface->
uid());
227 BBilMap::value_type v = std::make_pair(interface->
uid(), listener);
228 BBilMap::iterator f = std::find(ret.first, ret.second, v);
230 if (f == ret.second) {
231 ilmap.insert(std::make_pair(interface->
uid(), listener));
236 BlackBoardNotifier::remove_listener(
Interface *interface,
240 std::pair<BBilMap::iterator, BBilMap::iterator> ret =
241 ilmap.equal_range(interface->
uid());
242 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
243 if (j->second == listener) {
252 BlackBoardNotifier::is_in_queue(
bool op, BBilQueue &queue,
const char *uid,
255 BBilQueue::iterator q;
256 for (q = queue.begin(); q != queue.end(); ++q) {
257 if ((q->op == op) && (q->uid == uid) && (q->listener == bbil)) {
265 BlackBoardNotifier::queue_listener(
bool op,
Interface *interface,
269 BBilQueueEntry qe = { op, interface->
uid(), interface, listener };
281 __bbio_mutex->
lock();
282 if (__bbio_events > 0) {
283 __bbio_queue.push_back(std::make_pair(1, observer));
299 for (i = its->begin(); i != its->end(); ++i) {
300 bbiomap[i->first].push_back(make_pair(observer, i->second));
313 BBioMapIterator i, tmp;
316 while (i != iomap.end()) {
317 BBioListIterator j = i->second.begin();
318 while (j != i->second.end()) {
319 if ( j->first == observer ) {
320 j = i->second.erase(j);
325 if ( i->second.empty() ) {
344 if ( __bbio_events > 0) {
345 BBioQueueEntry e = std::make_pair((
unsigned int)0, observer);
346 BBioQueue::iterator re;
347 while ( (re = find_if(__bbio_queue.begin(), __bbio_queue.end(),
348 bind2nd(std::not_equal_to<BBioQueueEntry>(), e)))
349 != __bbio_queue.end()) {
351 if (re->second == observer) {
352 __bbio_queue.erase(re);
355 __bbio_queue.push_back(std::make_pair(0, observer));
358 remove_observer(__bbio_created, observer);
359 remove_observer(__bbio_destroyed, observer);
370 __bbio_mutex->
lock();
374 BBioMapIterator lhmi;
375 BBioListIterator i, l;
376 for (lhmi = __bbio_created.begin(); lhmi != __bbio_created.end(); ++lhmi) {
377 if (fnmatch(lhmi->first.c_str(), type, 0) != 0)
continue;
379 BBioList &list = lhmi->second;
380 for (i = list.begin(); i != list.end(); ++i) {
382 for (std::list<std::string>::iterator pi = i->second.begin(); pi != i->second.end(); ++pi) {
383 if (fnmatch(pi->c_str(), id, 0) == 0) {
391 __bbio_mutex->
lock();
393 process_bbio_queue();
405 __bbio_mutex->
lock();
409 BBioMapIterator lhmi;
410 BBioListIterator i, l;
411 for (lhmi = __bbio_destroyed.begin(); lhmi != __bbio_destroyed.end(); ++lhmi) {
412 if (fnmatch(lhmi->first.c_str(), type, 0) != 0)
continue;
414 BBioList &list = (*lhmi).second;
415 for (i = list.begin(); i != list.end(); ++i) {
417 for (std::list<std::string>::iterator pi = i->second.begin(); pi != i->second.end(); ++pi) {
418 if (fnmatch(pi->c_str(), id, 0) == 0) {
426 __bbio_mutex->
lock();
428 process_bbio_queue();
434 BlackBoardNotifier::process_bbio_queue()
436 if ( ! __bbio_queue.empty() ) {
437 if (__bbio_events > 0 ) {
440 while (! __bbio_queue.empty()) {
441 BBioQueueEntry &e = __bbio_queue.front();
443 add_observer(e.second, e.second->bbio_get_observed_create(), __bbio_created);
444 add_observer(e.second, e.second->bbio_get_observed_destroy(), __bbio_destroyed);
446 remove_observer(__bbio_created, e.second);
447 remove_observer(__bbio_destroyed, e.second);
449 __bbio_queue.pop_front();
464 unsigned int event_instance_serial)
throw()
466 __bbil_writer_mutex->
lock();
467 __bbil_writer_events += 1;
468 __bbil_writer_mutex->
unlock();
470 const char *uid = interface->
uid();
471 std::pair<BBilMap::iterator, BBilMap::iterator> ret =
472 __bbil_writer.equal_range(uid);
473 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
475 if (! is_in_queue(
false, __bbil_writer_queue, uid, bbil)) {
477 if (bbil_iface != NULL ) {
481 "BBIL[%s] registered for writer events " 482 "(open) for '%s' but has no such interface",
488 __bbil_writer_mutex->
lock();
489 __bbil_writer_events -= 1;
490 process_writer_queue();
491 __bbil_writer_mutex->
unlock();
502 unsigned int event_instance_serial)
throw()
504 __bbil_writer_mutex->
lock();
505 __bbil_writer_events += 1;
506 __bbil_writer_mutex->
unlock();
508 const char *uid = interface->
uid();
509 std::pair<BBilMap::iterator, BBilMap::iterator> ret =
510 __bbil_writer.equal_range(uid);
511 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
513 if (! is_in_queue(
false, __bbil_data_queue, uid, bbil)) {
515 if (bbil_iface != NULL ) {
519 "BBIL[%s] registered for writer events " 520 "(close) for '%s' but has no such interface",
526 __bbil_writer_mutex->
lock();
527 __bbil_writer_events -= 1;
528 process_writer_queue();
529 __bbil_writer_mutex->
unlock();
533 BlackBoardNotifier::process_writer_queue()
535 if ( ! __bbil_writer_queue.empty() ) {
536 if (__bbil_writer_events > 0 ) {
539 while (! __bbil_writer_queue.empty()) {
540 BBilQueueEntry &e = __bbil_writer_queue.front();
542 add_listener(e.interface, e.listener, __bbil_writer);
544 remove_listener(e.interface, e.listener, __bbil_writer);
546 __bbil_writer_queue.pop_front();
560 unsigned int event_instance_serial)
throw()
562 __bbil_reader_mutex->
lock();
563 __bbil_reader_events += 1;
564 __bbil_reader_mutex->
unlock();
566 const char *uid = interface->
uid();
567 std::pair<BBilMap::iterator, BBilMap::iterator> ret =
568 __bbil_reader.equal_range(uid);
569 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
571 if (! is_in_queue(
false, __bbil_reader_queue, uid, bbil)) {
573 if (bbil_iface != NULL ) {
577 "BBIL[%s] registered for reader events " 578 "(open) for '%s' but has no such interface",
584 __bbil_reader_mutex->
lock();
585 __bbil_reader_events -= 1;
586 process_reader_queue();
587 __bbil_reader_mutex->
unlock();
598 unsigned int event_instance_serial)
throw()
600 __bbil_reader_mutex->
lock();
601 __bbil_reader_events += 1;
602 __bbil_reader_mutex->
unlock();
604 const char *uid = interface->
uid();
605 std::pair<BBilMap::iterator, BBilMap::iterator> ret =
606 __bbil_reader.equal_range(uid);
607 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
609 if (! is_in_queue(
false, __bbil_data_queue, uid, bbil)) {
611 if (bbil_iface != NULL ) {
615 "BBIL[%s] registered for reader events " 616 "(close) for '%s' but has no such interface",
622 __bbil_reader_mutex->
lock();
623 __bbil_reader_events -= 1;
624 process_reader_queue();
625 __bbil_reader_mutex->
unlock();
630 BlackBoardNotifier::process_reader_queue()
632 if ( ! __bbil_reader_queue.empty() ) {
633 if (__bbil_reader_events > 0 ) {
636 while (! __bbil_reader_queue.empty()) {
637 BBilQueueEntry &e = __bbil_reader_queue.front();
639 add_listener(e.interface, e.listener, __bbil_reader);
641 remove_listener(e.interface, e.listener, __bbil_reader);
643 __bbil_reader_queue.pop_front();
662 __bbil_data_mutex->
lock();
663 __bbil_data_events += 1;
664 __bbil_data_mutex->
unlock();
666 const char *uid = interface->
uid();
667 std::pair<BBilMap::iterator, BBilMap::iterator> ret =
668 __bbil_data.equal_range(uid);
669 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
671 if (! is_in_queue(
false, __bbil_data_queue, uid, bbil)) {
673 if (bbil_iface != NULL ) {
677 "BBIL[%s] registered for data change events " 678 "for '%s' but has no such interface",
684 __bbil_data_mutex->
lock();
685 __bbil_data_events -= 1;
686 if ( ! __bbil_data_queue.empty() ) {
687 if (__bbil_data_events == 0 ) {
688 while (! __bbil_data_queue.empty()) {
689 BBilQueueEntry &e = __bbil_data_queue.front();
691 add_listener(e.interface, e.listener, __bbil_data);
693 remove_listener(e.interface, e.listener, __bbil_data);
695 __bbil_data_queue.pop_front();
699 __bbil_data_mutex->
unlock();
716 __bbil_messages_mutex->
lock();
717 __bbil_messages_events += 1;
718 __bbil_messages_mutex->
unlock();
722 const char *uid = interface->
uid();
723 std::pair<BBilMap::iterator, BBilMap::iterator> ret =
724 __bbil_messages.equal_range(uid);
725 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
727 if (! is_in_queue(
false, __bbil_messages_queue, uid, bbil)) {
729 if (bbil_iface != NULL ) {
737 "BBIL[%s] registered for message events " 738 "for '%s' but has no such interface",
744 __bbil_messages_mutex->
lock();
745 __bbil_messages_events -= 1;
746 if ( ! __bbil_messages_queue.empty() ) {
747 if (__bbil_messages_events == 0 ) {
748 while (! __bbil_messages_queue.empty()) {
749 BBilQueueEntry &e = __bbil_messages_queue.front();
751 add_listener(e.interface, e.listener, __bbil_messages);
753 remove_listener(e.interface, e.listener, __bbil_messages);
755 __bbil_messages_queue.pop_front();
759 __bbil_messages_mutex->
unlock();
ObservedInterfaceLockMap::iterator ObservedInterfaceLockMapIterator
Type for iterator of lockable interface type hash sets.
void notify_of_reader_added(const Interface *interface, unsigned int event_instance_serial)
Notify that reader has been added.
ListenerRegisterFlag
Flags to constrain listener registration/updates.
Interface * bbil_reader_interface(const char *iuid)
Get interface instance for given UID.
virtual bool bb_interface_message_received(Interface *interface, Message *message)
BlackBoard message received notification.
Base class for all messages passed through interfaces in Fawkes BlackBoard.
ObservedInterfaceLockMap * bbio_get_observed_destroy()
Get interface destruction type watch list.
void notify_of_interface_created(const char *type, const char *id)
Notify that an interface has been created.
void lock() const
Lock list.
virtual void bb_interface_destroyed(const char *type, const char *id)
BlackBoard interface destroyed notification.
void update_listener(BlackBoardInterfaceListener *listener, BlackBoard::ListenerRegisterFlag flag)
Update BB event listener.
InterfaceMap messages
Message received event subscriptions.
InterfaceMap writer
Writer event subscriptions.
void notify_of_writer_added(const Interface *interface, unsigned int event_instance_serial)
Notify that writer has been added.
Fawkes library namespace.
void unlock()
Unlock the mutex.
BlackBoardNotifier()
Constructor.
Structure to hold maps for active subscriptions.
void notify_of_writer_removed(const Interface *interface, unsigned int event_instance_serial)
Notify that writer has been removed.
virtual void bb_interface_reader_removed(Interface *interface, unsigned int instance_serial)
A reading instance has been closed for a watched interface.
void unregister_observer(BlackBoardInterfaceObserver *observer)
Unregister BB interface observer.
Interface * bbil_message_interface(const char *iuid)
Get interface instance for given UID.
void notify_of_data_change(const Interface *interface)
Notify of data change.
Message received event entry.
InterfaceMap data
Data event subscriptions.
Base class for all Fawkes BlackBoard interfaces.
consider message received events
void unregister_listener(BlackBoardInterfaceListener *listener)
Unregister BB interface listener.
virtual void bb_interface_data_changed(Interface *interface)
BlackBoard data changed notification.
void notify_of_interface_destroyed(const char *type, const char *id)
Notify that an interface has been destroyed.
InterfaceMap reader
Reader event subscriptions.
void unlock() const
Unlock list.
const char * uid() const
Get unique identifier of interface.
static void log_warn(const char *component, const char *format,...)
Log warning message.
BlackBoard interface observer.
std::list< QueueEntry > InterfaceQueue
Queue of additions/removals of interfaces.
Interface * bbil_data_interface(const char *iuid)
Get interface instance for given UID.
Interface * bbil_writer_interface(const char *iuid)
Get interface instance for given UID.
ObservedInterfaceLockMap * bbio_get_observed_create()
Get interface creation type watch list.
virtual void bb_interface_reader_added(Interface *interface, unsigned int instance_serial)
A reading instance has been opened for a watched interface.
virtual void bb_interface_writer_added(Interface *interface, unsigned int instance_serial)
A writing instance has been opened for a watched interface.
bool notify_of_message_received(const Interface *interface, Message *message)
Notify of message received. Notify all subscribers of the given interface of an incoming message. This ...
Data changed event entry.
void lock()
Lock this mutex.
virtual void bb_interface_writer_removed(Interface *interface, unsigned int instance_serial)
A writing instance has been closed for a watched interface.
const char * bbil_name() const
Get BBIL name.
virtual ~BlackBoardNotifier()
Destructor.
void register_observer(BlackBoardInterfaceObserver *observer)
Register BB interface observer.
Mutex mutual exclusion lock.
virtual void bb_interface_created(const char *type, const char *id)
BlackBoard interface created notification.
void notify_of_reader_removed(const Interface *interface, unsigned int event_instance_serial)
Notify that reader has been removed.
void register_listener(BlackBoardInterfaceListener *listener, BlackBoard::ListenerRegisterFlag flag)
Register BB event listener.
BlackBoard interface listener.