24 #include <core/threading/thread_list.h> 25 #include <core/threading/thread.h> 26 #include <core/threading/mutex.h> 27 #include <core/threading/mutex_locker.h> 28 #include <core/threading/barrier.h> 29 #include <core/threading/interruptible_barrier.h> 30 #include <core/exceptions/software.h> 31 #include <core/exceptions/system.h> 56 append(
"Operation '%s' is not allowed on a sealed thread list", operation);
96 __name = strdup(tlname);
98 __finalize_mutex =
new Mutex();
112 __name = strdup(tlname);
114 __finalize_mutex =
new Mutex();
115 __wnw_barrier = NULL;
117 if ( maintain_barrier) update_barrier();
127 __name = strdup(tl.__name);
128 __sealed = tl.__sealed;
129 __finalize_mutex =
new Mutex();
130 __wnw_barrier = NULL;
131 if ( tl.__wnw_barrier != NULL ) update_barrier();
139 delete __finalize_mutex;
140 delete __wnw_barrier;
152 __name = strdup(tl.__name);
153 __sealed = tl.__sealed;
154 __finalize_mutex =
new Mutex();
155 __wnw_barrier = NULL;
156 if ( tl.__wnw_barrier != NULL ) update_barrier();
168 for (iterator i = begin(); i != end(); ++i) {
181 for (iterator i = begin(); i != end(); ++i) {
195 for (iterator i = begin(); i != end(); ++i) {
196 (*i)->wakeup(barrier);
210 unsigned int count = 1;
211 for (iterator i = begin(); i != end(); ++i) {
212 if ( ! (*i)->flagged_bad() ) {
214 (*i)->wakeup(barrier);
230 if (count != barrier->
count()) {
231 throw Exception(
"ThreadList(%s)::wakeup(): barrier has count (%u) different " 232 "from number of unflagged threads (%u)", __name, barrier->
count(), count);
249 if ( ! __wnw_barrier ) {
251 "barrier is maintained");
261 if ( ! __wnw_barrier->
wait(timeout_sec, timeout_nanosec) ) {
265 for (iterator i = begin(); i != end(); ++i) {
266 if ((*i)->flagged_bad()) {
271 for (iterator j = passed_threads->begin(); j != passed_threads->end(); ++j) {
283 __wnw_bad_barriers.push_back(make_pair(__wnw_barrier, bad_threads));
285 __wnw_barrier = NULL;
290 if ( bad_threads.size() > 1 ) {
291 s =
"Multiple threads did not finish in time, flagging as bad: ";
292 for (iterator i = bad_threads.begin(); i != bad_threads.end(); ++i) {
293 s += std::string((*i)->name()) +
" ";
295 }
else if (bad_threads.size() == 0) {
296 s =
"Timeout happened, but no bad threads recorded.";
298 throw Exception(
"Thread %s did not finish in time (max %f), flagging as bad",
299 bad_threads.front()->
name(),
300 (float)timeout_sec + (
float)timeout_nanosec / 1000000000.);
317 throw Exception(
"InterruptibleBarrier cannot be destroyed " 318 "when there still are threads in the wait() function");
320 delete __wnw_barrier;
321 __wnw_barrier = NULL;
322 if ( maintain_barrier ) update_barrier();
338 bool changed =
false;
339 __wnw_bbit = __wnw_bad_barriers.begin();
340 while (__wnw_bbit != __wnw_bad_barriers.end()) {
341 iterator i = __wnw_bbit->second.begin();
342 while (i != __wnw_bbit->second.end()) {
343 if ( (*i)->cancelled() ) {
345 i = __wnw_bbit->second.erase(i);
347 }
else if ( (*i)->waiting() ) {
349 recovered_threads.push_back((*i)->name());
352 i = __wnw_bbit->second.erase(i);
358 if ( __wnw_bbit->second.empty() && __wnw_bbit->first->no_threads_in_wait()) {
359 delete __wnw_bbit->first;
360 __wnw_bbit = __wnw_bad_barriers.erase(__wnw_bbit);
365 if ( changed ) update_barrier();
385 for (ThreadList::iterator i = begin(); i != end(); ++i) {
388 initializer->
init(*i);
390 cite.
append(
"Initialized failed to initialize thread '%s'", (*i)->name());
402 notify_of_failed_init();
403 cite.
append(
"Initializing thread '%s' in list '%s' failed",
404 (*i)->name(), __name);
410 notify_of_failed_init();
411 cite.
append(
"Could not initialize thread '%s'", (*i)->name());
416 }
catch (std::exception &e) {
417 notify_of_failed_init();
418 cite.
append(
"Could not initialize thread '%s'", (*i)->name());
419 cite.
append(
"Caught std::exception or derivative: %s", e.what());
424 notify_of_failed_init();
425 cite.
append(
"Could not initialize thread '%s'", (*i)->name());
426 cite.
append(
"Unknown exception caught");
434 initialized_threads.
finalize(finalizer);
449 for (iterator i = begin(); i != end(); ++i) {
475 for (iterator i = begin(); i != end(); ++i) {
501 for (iterator i = begin(); i != end(); ++i) {
516 for (reverse_iterator i = rbegin(); i != rend(); ++i) {
540 bool can_finalize =
true;
542 bool threw_exception =
false;
543 for (reverse_iterator i = rbegin(); i != rend(); ++i) {
549 can_finalize =
false;
551 if ( ! (*i)->prepare_finalize() ) {
552 can_finalize =
false;
555 cfte.
append(
"Thread '%s' threw an exception while preparing finalization of " 556 "ThreadList '%s' (IGNORED)", (*i)->name(), __name);
558 threw_exception =
true;
560 cfte.
append(
"Thread '%s' threw a generic exception while preparing finalization of " 561 "ThreadList '%s' (IGNORED)", (*i)->name(), __name);
563 threw_exception =
true;
566 if ( threw_exception ) {
586 Exception me(
"One or more threads failed to finalize");
587 for (reverse_iterator i = rbegin(); i != rend(); ++i) {
592 me.
append(
"AspectIniFin called Thread[%s]::finalize() which failed", (*i)->name());
596 me.
append(
"AspectIniFin called Thread[%s]::finalize() which failed", (*i)->name());
600 me.
append(
"Thread[%s]::finalize() threw unsupported exception", (*i)->name());
606 me.
append(
"Could not finalize thread '%s' in list '%s'", (*i)->name(), __name);
623 for (reverse_iterator i = rbegin(); i != rend(); ++i) {
624 (*i)->cancel_finalize();
641 for (i = begin(); i != end(); ++i) {
642 (*i)->set_prepfin_hold(hold);
647 for (iterator j = begin(); j != i; ++j) {
648 (*j)->set_prepfin_hold(
false);
663 bool caught_exception =
false;
664 Exception exc(
"Forced thread finalization failed");;
668 caught_exception =
true;
674 caught_exception =
true;
680 caught_exception =
true;
684 if (caught_exception) {
710 va_start(va, format);
713 if (vasprintf(&tmpname, format, va) != -1) {
753 if ( __wnw_barrier) update_barrier();
772 if ( __wnw_barrier) update_barrier();
786 if ( __wnw_barrier) update_barrier();
805 if ( __wnw_barrier) update_barrier();
818 if ( __wnw_barrier) update_barrier();
831 if ( __wnw_barrier) update_barrier();
845 if ( __wnw_barrier) update_barrier();
856 if ( __wnw_barrier) update_barrier();
867 if ( __wnw_barrier) update_barrier();
881 if ( __wnw_barrier) update_barrier();
888 ThreadList::update_barrier()
890 unsigned int num = 1;
891 for (iterator i = begin(); i != end(); ++i) {
892 if (! (*i)->flagged_bad() ) ++num;
895 delete __wnw_barrier;
900 __wnw_bad_barriers.push_back(make_pair(__wnw_barrier, empty_list));
908 ThreadList::notify_of_failed_init()
910 for (ThreadList::iterator i = begin(); i != end(); ++i) {
911 (*i)->notify_of_failed_init();
bool sealed()
Check if list is sealed.
ThreadListSealedException(const char *operation)
Constructor.
ThreadListNotSealedException(const char *format,...)
Constructor.
LockList< Type > & operator=(const LockList< Type > &ll)
Copy values from another LockList.
const char * name()
Name of the thread list.
unsigned int count()
Get number of threads this barrier will wait for.
virtual void init(Thread *thread)=0
This method is called by the ThreadManager for each newly added Thread.
void clear()
Clear the list.
void start()
Start threads.
bool no_threads_in_wait()
Checks if there are no more threads in the wait() function.
virtual void lock() const
Lock list.
void cancel_finalize()
Cancel finalization on all threads.
Fawkes library namespace.
void push_back_locked(Thread *thread)
Add thread to the end with lock protection.
void seal()
Seal the list.
virtual void finalize(Thread *thread)=0
Finalize a thread.
void set_prepfin_hold(bool hold)
Set prepfin hold on all threads.
void force_stop(ThreadFinalizer *finalizer)
Force stop of all threads.
A NULL pointer was supplied where not allowed.
Thread class encapsulation of pthreads.
void pop_back()
Remove last element.
bool prepare_finalize(ThreadFinalizer *finalizer)
Prepare finalize.
RefPtr< Mutex > mutex() const
Get access to the internal mutex.
A barrier is a synchronization tool which blocks until a given number of threads have reached the barrier.
bool wait(unsigned int timeout_sec, unsigned int timeout_nanosec)
Wait for other threads.
Thread list sealed exception.
void push_front_locked(Thread *thread)
Add thread to the front with lock protection.
virtual bool prepare_finalize(Thread *thread)=0
Prepare finalization of a thread.
Thread cannot be initialized.
ThreadList & operator=(const ThreadList &tl)
Assignment operator.
ThreadList(const char *tlname="")
Constructor.
Base class for exceptions in Fawkes.
void remove_locked(Thread *thread)
Remove with lock protection.
ThreadList::iterator erase(iterator pos)
Erase element at given position.
void init(ThreadInitializer *initializer, ThreadFinalizer *finalizer)
Initialize threads.
Thread initializer interface.
void wakeup()
Wakeup all threads in list.
void finalize(ThreadFinalizer *finalizer)
Finalize Threads.
void pop_front()
Remove first element.
void append_va(const char *format, va_list va)
Append messages to the message list.
static const unsigned int FLAG_BAD
Standard thread flag: "thread is bad".
void push_back(Thread *thread)
Add thread to the end.
RefPtr< ThreadList > passed_threads()
Get a list of threads that passed the barrier.
RefPtr<> is a reference-counting shared smartpointer.
void wakeup_unlocked()
Wakeup all threads in list.
Thread cannot be finalized.
void set_name(const char *format,...)
Set name of thread.
void try_recover(std::list< std::string > &recovered_threads)
Check if any of the bad barriers recovered.
void set_maintain_barrier(bool maintain_barrier)
Set if this thread list should maintain a barrier.
Mutex mutual exclusion lock.
void push_front(Thread *thread)
Add thread to the front.
void wakeup_and_wait(unsigned int timeout_sec=0, unsigned int timeout_nanosec=0)
Wakeup threads and wait for them to finish.
A barrier is a synchronization tool which blocks until a given number of threads have reached the barrier.
System ran out of memory and desired operation could not be fulfilled.
void cancel()
Cancel threads.
void append(const char *format,...)
Append messages to the message list.
void remove(Thread *thread)
Remove with lock protection.
Thread finalizer interface.