24 #include <baseapp/thread_manager.h> 25 #include <core/threading/thread.h> 26 #include <core/threading/mutex_locker.h> 27 #include <core/threading/wait_condition.h> 28 #include <core/threading/thread_initializer.h> 29 #include <core/threading/thread_finalizer.h> 30 #include <core/exceptions/software.h> 31 #include <core/exceptions/system.h> 33 #include <aspect/blocked_timing.h> 60 ThreadManager::ThreadManagerAspectCollector::ThreadManagerAspectCollector(ThreadManager *parent_manager)
62 __parent_manager = parent_manager;
/** Add a list of threads through the aspect collector.
 * Each entry is probed with dynamic_cast for BlockedTimingAspect; a non-NULL
 * result raises IllegalArgumentException. The list is then passed to the
 * parent manager's add_maybelocked() with false as second argument.
 * @param tl list of threads to add
 * @exception IllegalArgumentException an entry has the BlockedTimingAspect
 */
67 ThreadManager::ThreadManagerAspectCollector::add(ThreadList &tl)
// Only holds the cast result for the NULL test in the lines shown here.
69 BlockedTimingAspect *timed_thread;
// Scan the whole list for threads with blocked timing.
71 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
72 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(*i)) != NULL ) {
73 throw IllegalArgumentException(
"ThreadProducerAspect may not add threads with BlockedTimingAspect");
// Delegate to the parent manager; the second argument is add_maybelocked()'s lock flag.
77 __parent_manager->add_maybelocked(tl,
false);
/** Add a single thread through the aspect collector.
 * The thread is probed with dynamic_cast for BlockedTimingAspect; a non-NULL
 * result raises IllegalArgumentException. Otherwise the thread is passed to
 * the parent manager's add_maybelocked() with false as second argument.
 * @param t thread to add
 * @exception IllegalArgumentException the thread has the BlockedTimingAspect
 */
82 ThreadManager::ThreadManagerAspectCollector::add(Thread *t)
// Only holds the cast result for the NULL test in the lines shown here.
84 BlockedTimingAspect *timed_thread;
// Threads with blocked timing are refused.
86 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
87 throw IllegalArgumentException(
"ThreadProducerAspect may not add threads with BlockedTimingAspect");
// Delegate to the parent manager; the second argument is add_maybelocked()'s lock flag.
90 __parent_manager->add_maybelocked(t,
false);
/** Remove a list of threads through the aspect collector.
 * Each entry is probed with dynamic_cast for BlockedTimingAspect; a non-NULL
 * result raises IllegalArgumentException. The list is then passed to the
 * parent manager's remove_maybelocked() with false as second argument.
 * @param tl list of threads to remove
 * @exception IllegalArgumentException an entry has the BlockedTimingAspect
 */
95 ThreadManager::ThreadManagerAspectCollector::remove(ThreadList &tl)
// Only holds the cast result for the NULL test in the lines shown here.
97 BlockedTimingAspect *timed_thread;
// Scan the whole list for threads with blocked timing.
99 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
100 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(*i)) != NULL ) {
101 throw IllegalArgumentException(
"ThreadProducerAspect may not remove threads with BlockedTimingAspect");
// Delegate to the parent manager; the second argument is remove_maybelocked()'s lock flag.
105 __parent_manager->remove_maybelocked(tl,
false);
/** Remove a single thread through the aspect collector.
 * The thread is probed with dynamic_cast for BlockedTimingAspect; a non-NULL
 * result raises IllegalArgumentException. Otherwise the thread is passed to
 * the parent manager's remove_maybelocked() with false as second argument.
 * @param t thread to remove
 * @exception IllegalArgumentException the thread has the BlockedTimingAspect
 */
110 ThreadManager::ThreadManagerAspectCollector::remove(Thread *t)
// Only holds the cast result for the NULL test in the lines shown here.
112 BlockedTimingAspect *timed_thread;
// Threads with blocked timing are refused.
114 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
115 throw IllegalArgumentException(
"ThreadProducerAspect may not remove threads with BlockedTimingAspect");
// Delegate to the parent manager; the second argument is remove_maybelocked()'s lock flag.
118 __parent_manager->remove_maybelocked(t,
false);
125 throw AccessViolationException(
"ThreadManagerAspect threads may not force removal of threads");
/** Forced removal of a single thread via the aspect collector.
 * The only statement shown for this method throws AccessViolationException,
 * i.e. the request is refused.
 * @param t thread whose forced removal was requested
 * @exception AccessViolationException thrown by the statement shown below
 */
129 ThreadManager::ThreadManagerAspectCollector::force_remove(
fawkes::Thread *t)
131 throw AccessViolationException(
"ThreadManagerAspect threads may not force removal of threads");
141 __initializer = NULL;
145 __interrupt_timed_thread_wait =
false;
146 __aspect_collector =
new ThreadManagerAspectCollector(
this);
158 __initializer = NULL;
162 __interrupt_timed_thread_wait =
false;
163 __aspect_collector =
new ThreadManagerAspectCollector(
this);
173 for (__tit = __threads.begin(); __tit != __threads.end(); ++__tit) {
175 __tit->second.force_stop(__finalizer);
183 delete __waitcond_timedthreads;
184 delete __aspect_collector;
196 __initializer = initializer;
197 __finalizer = finalizer;
/** Drop a thread from the manager's per-hook bookkeeping.
 * In the lines shown, only threads that dynamic_cast to BlockedTimingAspect
 * are handled: the thread is removed from the list stored in __threads under
 * hook, and that map entry is erased once its list is empty.
 * NOTE(review): the declarations of timed_thread and hook are on lines not
 * shown here; hook is presumably the thread's wakeup hook
 * (blockedTimingAspectHook()) - confirm.
 * @param t thread to remove
 */
209 ThreadManager::internal_remove_thread(
Thread *t)
// Only threads with blocked timing are tracked per hook.
213 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
// Nothing to do when there is no list for this hook.
216 if ( __threads.find(hook) != __threads.end() ) {
217 __threads[hook].remove_locked(t);
// Do not keep empty per-hook lists in the map.
218 if (__threads[hook].empty()) __threads.erase(hook);
/** Enter a thread into the manager's per-hook bookkeeping.
 * In the lines shown, a thread that dynamic_casts to BlockedTimingAspect is
 * appended to the list stored in __threads under hook. A list that does not
 * exist yet is given a name and has set_maintain_barrier(true) called on it.
 * Finally everything waiting on __waitcond_timedthreads is woken.
 * NOTE(review): the declarations of timed_thread and hook, and the closing
 * braces, are on lines not shown; presumably push_back_locked() runs for
 * every timed thread and not only when the list was just created - confirm.
 * @param t thread to add
 */
234 ThreadManager::internal_add_thread(
Thread *t)
// Only threads with blocked timing are tracked per hook.
237 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
// First thread for this hook: set up the list that operator[] creates.
240 if ( __threads.find(hook) == __threads.end() ) {
241 __threads[hook].set_name(
"ThreadManagerList Hook %i", hook);
242 __threads[hook].set_maintain_barrier(
true);
244 __threads[hook].push_back_locked(t);
// Wake everything blocked on the timed-threads wait condition.
246 __waitcond_timedthreads->
wake_all();
/** Add a list of threads to the manager.
 * Steps visible in this excerpt: a guard on __initializer and __finalizer
 * both being set, an Exception reporting that the named list is already
 * sealed, initialization of the list via tl.init(), and registration of each
 * entry through internal_add_thread().
 * NOTE(review): the condition guarding the "already sealed" exception, the
 * body of the initializer/finalizer guard and every use of lock are on lines
 * not shown here.
 * @param tl list of threads to add
 * @param lock lock flag; its effect is not visible in this excerpt
 */
265 ThreadManager::add_maybelocked(
ThreadList &tl,
bool lock)
// Both the initializer and the finalizer have to be set.
267 if ( ! (__initializer && __finalizer) ) {
272 throw Exception(
"Not accepting new threads from list that is not fresh, " 273 "list '%s' already sealed", tl.
name());
// Initialize the threads of the list with the configured initializer/finalizer.
280 tl.
init(__initializer, __finalizer);
// Register every thread of the list in the internal bookkeeping.
291 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
292 internal_add_thread(*i);
/** Add a single thread to the manager.
 * Steps visible in this excerpt: a NULL test on the thread, a guard on
 * __initializer and __finalizer both being set, a call to
 * __initializer->init(thread), three failure paths that each build cite with
 * the message "Could not initialize thread '%s'", and finally registration
 * through internal_add_thread().
 * NOTE(review): the bodies of the two guards, the try/catch structure apart
 * from the std::exception clause, the declarations of e and cite, and every
 * use of lock are on lines not shown here.
 * @param thread thread to add
 * @param lock lock flag; its effect is not visible in this excerpt
 */
310 ThreadManager::add_maybelocked(
Thread *thread,
bool lock)
// NULL is special-cased; the handling itself is on a line not shown.
312 if ( thread == NULL ) {
// Both the initializer and the finalizer have to be set.
316 if ( ! (__initializer && __finalizer) ) {
// Run the configured initializer on the new thread.
321 __initializer->
init(thread);
// Add context to exception object e (where e comes from is not shown).
324 e.
append(
"Adding thread in ThreadManager failed");
// Failure path 1: cite carries the thread's name.
340 cite(
"Could not initialize thread '%s'", thread->
name());
// Failure path 2: a std::exception; its what() text is appended to cite.
344 }
catch (std::exception &e) {
347 cite(
"Could not initialize thread '%s'", thread->
name());
348 cite.
append(
"Caught std::exception or derivative: %s", e.what());
// Failure path 3: reported as an unknown exception.
354 cite(
"Could not initialize thread '%s'", thread->
name());
355 cite.
append(
"Unknown exception caught");
// Register the thread in the internal bookkeeping.
362 internal_add_thread(thread);
/** Remove a list of threads from the manager.
 * Steps visible in this excerpt: a guard on __initializer and __finalizer
 * both being set, message fragments about unsealed lists and about
 * finalization, an e.append() noting that one or more threads of the named
 * list cannot be finalized, and removal of each entry through
 * internal_remove_thread().
 * NOTE(review): the statements that the two message fragments belong to,
 * the declaration of e and every use of lock are on lines not shown here.
 * @param tl list of threads to remove
 * @param lock lock flag; its effect is not visible in this excerpt
 */
381 ThreadManager::remove_maybelocked(
ThreadList &tl,
bool lock)
// Both the initializer and the finalizer have to be set.
383 if ( ! (__initializer && __finalizer) ) {
390 "list. Not accepting unsealed list '%s' for removal",
402 "finalized", tl.
name());
// Add the list name to exception object e (where e comes from is not shown).
409 e.
append(
"One or more threads in list '%s' cannot be finalized", tl.
name());
// Drop every thread of the list from the internal bookkeeping.
421 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
422 internal_remove_thread(*i);
/** Remove a single thread from the manager.
 * Steps visible in this excerpt: an early return for a NULL thread, a guard
 * on __initializer and __finalizer both being set, an e.append() noting that
 * the named thread cannot be stopped, and removal through
 * internal_remove_thread().
 * NOTE(review): the body of the initializer/finalizer guard, the declaration
 * of e and every use of lock are on lines not shown here.
 * @param thread thread to remove; NULL is ignored
 * @param lock lock flag; its effect is not visible in this excerpt
 */
442 ThreadManager::remove_maybelocked(
Thread *thread,
bool lock)
// Nothing to remove.
444 if ( thread == NULL )
return;
// Both the initializer and the finalizer have to be set.
446 if ( ! (__initializer && __finalizer) ) {
// Add the thread name to exception object e (where e comes from is not shown).
457 e.
append(
"ThreadManager cannot stop thread '%s'", thread->
name());
// Drop the thread from the internal bookkeeping.
467 internal_remove_thread(thread);
498 __threads.mutex()->stopby();
499 bool caught_exception =
false;
500 Exception exc(
"Forced removal of thread list %s failed", tl.
name());
504 caught_exception =
true;
508 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
509 internal_remove_thread(*i);
514 if (caught_exception) {
548 if (__finalizer) __finalizer->
finalize(thread);
551 internal_remove_thread(thread);
557 unsigned int timeout_usec)
561 unsigned int timeout_sec = 0;
562 if (timeout_usec >= 1000000) {
563 timeout_sec = timeout_usec / 1000000;
564 timeout_usec -= timeout_sec * 1000000;
568 if ( __threads.find(hook) != __threads.end() ) {
569 __threads[hook].wakeup_and_wait(timeout_sec, timeout_usec * 1000);
579 if ( __threads.find(hook) != __threads.end() ) {
581 __threads[hook].wakeup(barrier);
583 __threads[hook].wakeup();
585 if ( __threads[hook].size() == 0 ) {
586 __threads.erase(hook);
596 for (__tit = __threads.begin(); __tit != __threads.end(); ++__tit) {
597 __tit->second.try_recover(recovered_threads);
606 return (__threads.size() > 0);
613 __interrupt_timed_thread_wait =
false;
614 __waitcond_timedthreads->
wait();
615 if ( __interrupt_timed_thread_wait ) {
616 __interrupt_timed_thread_wait =
false;
624 __interrupt_timed_thread_wait =
true;
625 __waitcond_timedthreads->
wake_all();
636 return __aspect_collector;
bool sealed()
Check if list is sealed.
void set_inifin(ThreadInitializer *initializer, ThreadFinalizer *finalizer)
Set initializer/finalizer.
Wait until a given condition holds.
const char * name()
Name of the thread list.
virtual void init(Thread *thread)=0
This method is called by the ThreadManager for each newly added Thread.
void start()
Start threads.
virtual void lock() const
Lock list.
void cancel_finalize()
Cancel finalization on all threads.
Fawkes library namespace.
void wake_all()
Wake up all waiting threads.
void push_back_locked(Thread *thread)
Add thread to the end with lock protection.
void seal()
Seal the list.
virtual ~ThreadManager()
Destructor.
virtual void finalize(Thread *thread)=0
Finalize a thread.
void cancel_finalize()
Cancel finalization.
void force_stop(ThreadFinalizer *finalizer)
Force stop of all threads.
A NULL pointer was supplied where not allowed.
virtual bool timed_threads_exist()
Check if any timed threads exist.
Thread class encapsulation of pthreads.
virtual void force_remove(ThreadList &tl)
Force removal of the given threads.
bool prepare_finalize(ThreadFinalizer *finalizer)
Prepare finalize.
ThreadManager()
Constructor.
Thread aspect to use blocked timing.
virtual void wait_for_timed_threads()
Wait for timed threads.
Thread cannot be initialized.
WakeupHook
Type to define at which hook the thread is woken up.
Base class for exceptions in Fawkes.
void remove_locked(Thread *thread)
Remove with lock protection.
void init(ThreadInitializer *initializer, ThreadFinalizer *finalizer)
Initialize threads.
Thread initializer interface.
virtual void finalize()
Finalize the thread.
bool prepare_finalize()
Prepare finalization.
virtual void unlock() const
Unlock list.
void finalize(ThreadFinalizer *finalizer)
Finalize Threads.
const char * name() const
Get name of thread.
void notify_of_failed_init()
Notify of failed init.
The current system call has been interrupted (for instance by a signal).
void wait()
Wait for the condition forever.
virtual void wakeup(BlockedTimingAspect::WakeupHook hook, Barrier *barrier=0)
Wakeup thread for given hook.
virtual void wakeup_and_wait(BlockedTimingAspect::WakeupHook hook, unsigned int timeout_usec=0)
Wakeup thread for given hook and wait for completion.
void cancel()
Cancel a thread.
Thread list not sealed exception.
virtual void interrupt_timed_thread_wait()
Interrupt any currently running wait_for_timed_threads() and cause it to throw an InterruptedException.
Thread cannot be finalized.
void join()
Join the thread.
virtual void try_recover(std::list< std::string > &recovered_threads)
Try to recover threads.
WakeupHook blockedTimingAspectHook() const
Get the wakeup hook.
virtual void init()
Initialize the thread.
ThreadCollector * aspect_collector() const
Get a thread collector to be used for an aspect initializer.
A barrier is a synchronization tool which blocks until a given number of threads have reached the barrier.
void append(const char *format,...)
Append messages to the message list.
void start(bool wait=true)
Call this method to start the thread.
Thread finalizer interface.