Fawkes API  Fawkes Development Version
thread_manager.cpp
1 
2 /***************************************************************************
3  * thread_manager.cpp - Thread manager
4  *
5  * Created: Thu Nov 3 19:11:31 2006 (on train to Cologne)
6  * Copyright 2006-2009 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version. A runtime exception applies to
14  * this software (see LICENSE.GPL_WRE file mentioned below for details).
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU Library General Public License for more details.
20  *
21  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22  */
23 
24 #include <baseapp/thread_manager.h>
25 #include <core/threading/thread.h>
26 #include <core/threading/mutex_locker.h>
27 #include <core/threading/wait_condition.h>
28 #include <core/threading/thread_initializer.h>
29 #include <core/threading/thread_finalizer.h>
30 #include <core/exceptions/software.h>
31 #include <core/exceptions/system.h>
32 
33 #include <aspect/blocked_timing.h>
34 
35 namespace fawkes {
36 #if 0 /* just to make Emacs auto-indent happy */
37 }
38 #endif
39 
40 /** @class ThreadManager <baseapp/thread_manager.h>
41  * Base application thread manager.
42  * This class provides a manager for the threads. Threads are memorized by
43  * their wakeup hook. When the thread manager is deleted, all threads are
44  * appropriately cancelled, joined and deleted. Thus the thread manager
45  * can be used for "garbage collection" of threads.
46  *
47  * The thread manager allows easy wakeup of threads of a given wakeup hook.
48  *
49  * The thread manager needs a thread initializer. Each thread that is added
50  * to the thread manager is initialized with this. The runtime type information
51  * (RTTI) supplied by C++ can be used to initialize threads if appropriate
52  * (if the thread has certain aspects that need special treatment).
53  *
54  * @author Tim Niemueller
55  */
56 
57 /** Constructor.
58  * @param parent_manager parent thread manager
59  */
60 ThreadManager::ThreadManagerAspectCollector::ThreadManagerAspectCollector(ThreadManager *parent_manager)
61 {
62  __parent_manager = parent_manager;
63 }
64 
65 
66 void
67 ThreadManager::ThreadManagerAspectCollector::add(ThreadList &tl)
68 {
69  BlockedTimingAspect *timed_thread;
70 
71  for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
72  if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(*i)) != NULL ) {
73  throw IllegalArgumentException("ThreadProducerAspect may not add threads with BlockedTimingAspect");
74  }
75  }
76 
77  __parent_manager->add_maybelocked(tl, /* lock */ false);
78 }
79 
80 
81 void
82 ThreadManager::ThreadManagerAspectCollector::add(Thread *t)
83 {
84  BlockedTimingAspect *timed_thread;
85 
86  if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
87  throw IllegalArgumentException("ThreadProducerAspect may not add threads with BlockedTimingAspect");
88  }
89 
90  __parent_manager->add_maybelocked(t, /* lock */ false);
91 }
92 
93 
94 void
95 ThreadManager::ThreadManagerAspectCollector::remove(ThreadList &tl)
96 {
97  BlockedTimingAspect *timed_thread;
98 
99  for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
100  if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(*i)) != NULL ) {
101  throw IllegalArgumentException("ThreadProducerAspect may not remove threads with BlockedTimingAspect");
102  }
103  }
104 
105  __parent_manager->remove_maybelocked(tl, /* lock */ false);
106 }
107 
108 
109 void
110 ThreadManager::ThreadManagerAspectCollector::remove(Thread *t)
111 {
112  BlockedTimingAspect *timed_thread;
113 
114  if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
115  throw IllegalArgumentException("ThreadProducerAspect may not remove threads with BlockedTimingAspect");
116  }
117 
118  __parent_manager->remove_maybelocked(t, /* lock */ false);
119 }
120 
121 
122 void
123 ThreadManager::ThreadManagerAspectCollector::force_remove(fawkes::ThreadList &tl)
124 {
125  throw AccessViolationException("ThreadManagerAspect threads may not force removal of threads");
126 }
127 
128 void
129 ThreadManager::ThreadManagerAspectCollector::force_remove(fawkes::Thread *t)
130 {
131  throw AccessViolationException("ThreadManagerAspect threads may not force removal of threads");
132 }
133 
134 
135 /** Constructor.
136  * When using this constructor you need to make sure to call set_inifin()
137  * before any thread is added.
138  */
140 {
141  __initializer = NULL;
142  __finalizer = NULL;
143  __threads.clear();
144  __waitcond_timedthreads = new WaitCondition();
145  __interrupt_timed_thread_wait = false;
146  __aspect_collector = new ThreadManagerAspectCollector(this);
147 }
148 
149 /** Constructor.
150  * This contsructor is equivalent to the one without parameters followed
151  * by a call to set_inifins().
152  * @param initializer thread initializer
153  * @param finalizer thread finalizer
154  */
156  ThreadFinalizer *finalizer)
157 {
158  __initializer = NULL;
159  __finalizer = NULL;
160  __threads.clear();
161  __waitcond_timedthreads = new WaitCondition();
162  __interrupt_timed_thread_wait = false;
163  __aspect_collector = new ThreadManagerAspectCollector(this);
164  set_inifin(initializer, finalizer);
165 }
166 
167 
168 /** Destructor. */
170 {
171  // stop all threads, we call finalize, and we run through it as long as there are
172  // still running threads, after that, we force the thread's death.
173  for (__tit = __threads.begin(); __tit != __threads.end(); ++__tit) {
174  try {
175  __tit->second.force_stop(__finalizer);
176  } catch (Exception &e) {} // ignore
177  }
178  try {
179  __untimed_threads.force_stop(__finalizer);
180  } catch (Exception &e) {} // ignore
181  __threads.clear();
182 
183  delete __waitcond_timedthreads;
184  delete __aspect_collector;
185 }
186 
187 
188 /** Set initializer/finalizer.
189  * This method has to be called before any thread is added/removed.
190  * @param initializer thread initializer
191  * @param finalizer thread finalizer
192  */
193 void
195 {
196  __initializer = initializer;
197  __finalizer = finalizer;
198 }
199 
200 
201 /** Remove the given thread from internal structures.
202  * Thread is removed from the internal structures. If the thread has the
203  * BlockedTimingAspect then the hook is added to the changed list.
204  *
205  * @param t thread to remove
206  * @param changed list of changed hooks, appropriate hook is added if necessary
207  */
208 void
209 ThreadManager::internal_remove_thread(Thread *t)
210 {
211  BlockedTimingAspect *timed_thread;
212 
213  if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
214  // find thread and remove
216  if ( __threads.find(hook) != __threads.end() ) {
217  __threads[hook].remove_locked(t);
218  if (__threads[hook].empty()) __threads.erase(hook);
219  }
220  } else {
221  __untimed_threads.remove_locked(t);
222  }
223 }
224 
225 
226 /** Add the given thread to internal structures.
227  * Thread is added to the internal structures. If the thread has the
228  * BlockedTimingAspect then the hook is added to the changed list.
229  *
230  * @param t thread to add
231  * @param changed list of changed hooks, appropriate hook is added if necessary
232  */
233 void
234 ThreadManager::internal_add_thread(Thread *t)
235 {
236  BlockedTimingAspect *timed_thread;
237  if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
239 
240  if ( __threads.find(hook) == __threads.end() ) {
241  __threads[hook].set_name("ThreadManagerList Hook %i", hook);
242  __threads[hook].set_maintain_barrier(true);
243  }
244  __threads[hook].push_back_locked(t);
245 
246  __waitcond_timedthreads->wake_all();
247  } else {
248  __untimed_threads.push_back_locked(t);
249  }
250 }
251 
252 
253 /** Add threads.
254  * Add the given threads to the thread manager. The threads are initialised
255  * as appropriate and started. See the class documentation for supported
256  * specialisations of threads and the performed initialisation steps.
257  * If the thread initializer cannot initalize one or more threads no thread
258  * is added. In this regard the operation is atomic, either all threads are
259  * added or none.
260  * @param tl thread list with threads to add
261  * @exception CannotInitializeThreadException thrown if at least one of the
262  * threads could not be initialised
263  */
264 void
265 ThreadManager::add_maybelocked(ThreadList &tl, bool lock)
266 {
267  if ( ! (__initializer && __finalizer) ) {
268  throw NullPointerException("ThreadManager: initializer/finalizer not set");
269  }
270 
271  if ( tl.sealed() ) {
272  throw Exception("Not accepting new threads from list that is not fresh, "
273  "list '%s' already sealed", tl.name());
274  }
275 
276  tl.lock();
277 
278  // Try to initialise all threads
279  try {
280  tl.init(__initializer, __finalizer);
281  } catch (Exception &e) {
282  tl.unlock();
283  throw;
284  }
285 
286  tl.seal();
287  tl.start();
288 
289  // All thread initialized, now add threads to internal structure
290  MutexLocker locker(__threads.mutex(), lock);
291  for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
292  internal_add_thread(*i);
293  }
294 
295  tl.unlock();
296 }
297 
298 
299 /** Add one thread.
300  * Add the given thread to the thread manager. The thread is initialized
301  * as appropriate and started. See the class documentation for supported
302  * specialisations of threads and the performed initialisation steps.
303  * If the thread initializer cannot initalize the thread it is not added.
304  * @param thread thread to add
305  * @param lock if true the environment is locked before adding the thread
306  * @exception CannotInitializeThreadException thrown if at least the
307  * thread could not be initialised
308  */
309 void
310 ThreadManager::add_maybelocked(Thread *thread, bool lock)
311 {
312  if ( thread == NULL ) {
313  throw NullPointerException("FawkesThreadMananger: cannot add NULL as thread");
314  }
315 
316  if ( ! (__initializer && __finalizer) ) {
317  throw NullPointerException("ThreadManager: initializer/finalizer not set");
318  }
319 
320  try {
321  __initializer->init(thread);
322  } catch (CannotInitializeThreadException &e) {
323  thread->notify_of_failed_init();
324  e.append("Adding thread in ThreadManager failed");
325  throw;
326  }
327 
328  // if the thread's init() method fails, we need to finalize that very
329  // thread only with the finalizer, already initialized threads muts be
330  // fully finalized
331  try {
332  thread->init();
333  } catch (CannotInitializeThreadException &e) {
334  thread->notify_of_failed_init();
335  __finalizer->finalize(thread);
336  throw;
337  } catch (Exception &e) {
338  thread->notify_of_failed_init();
340  cite("Could not initialize thread '%s'", thread->name());
341  cite.append(e);
342  __finalizer->finalize(thread);
343  throw cite;
344  } catch (std::exception &e) {
345  thread->notify_of_failed_init();
347  cite("Could not initialize thread '%s'", thread->name());
348  cite.append("Caught std::exception or derivative: %s", e.what());
349  __finalizer->finalize(thread);
350  throw cite;
351  } catch (...) {
352  thread->notify_of_failed_init();
354  cite("Could not initialize thread '%s'", thread->name());
355  cite.append("Unknown exception caught");
356  __finalizer->finalize(thread);
357  throw cite;
358  }
359 
360  thread->start();
361  MutexLocker locker(__threads.mutex(), lock);
362  internal_add_thread(thread);
363 }
364 
365 
366 /** Remove the given threads.
367  * The thread manager tries to finalize and stop the threads and then removes the
368  * threads from the internal structures.
369  *
370  * This may fail if at least one thread of the given list cannot be finalized, for
371  * example if prepare_finalize() returns false or if the thread finalizer cannot
372  * finalize the thread. In this case a CannotFinalizeThreadException is thrown.
373  *
374  * @param tl threads to remove.
375  * @exception CannotFinalizeThreadException At least one thread cannot be safely
376  * finalized
377  * @exception ThreadListNotSealedException if the given thread lits tl is not
378  * sealed the thread manager will refuse to remove it
379  */
380 void
381 ThreadManager::remove_maybelocked(ThreadList &tl, bool lock)
382 {
383  if ( ! (__initializer && __finalizer) ) {
384  throw NullPointerException("ThreadManager: initializer/finalizer not set");
385  }
386 
387 
388  if ( ! tl.sealed() ) {
389  throw ThreadListNotSealedException("(ThreadManager) Cannot remove unsealed thread "
390  "list. Not accepting unsealed list '%s' for removal",
391  tl.name());
392  }
393 
394  tl.lock();
395  MutexLocker locker(__threads.mutex(), lock);
396 
397  try {
398  if ( ! tl.prepare_finalize(__finalizer) ) {
399  tl.cancel_finalize();
400  tl.unlock();
401  throw CannotFinalizeThreadException("One or more threads in list '%s' cannot be "
402  "finalized", tl.name());
403  }
404  } catch (CannotFinalizeThreadException &e) {
405  tl.unlock();
406  throw;
407  } catch (Exception &e) {
408  tl.unlock();
409  e.append("One or more threads in list '%s' cannot be finalized", tl.name());
411  }
412 
413  tl.stop();
414  try {
415  tl.finalize(__finalizer);
416  } catch (Exception &e) {
417  tl.unlock();
418  throw;
419  }
420 
421  for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
422  internal_remove_thread(*i);
423  }
424 
425  tl.unlock();
426 }
427 
428 
429 /** Remove the given thread.
430  * The thread manager tries to finalize and stop the thread and then removes the
431  * thread from the internal structures.
432  *
433  * This may fail if the thread cannot be finalized, for
434  * example if prepare_finalize() returns false or if the thread finalizer cannot
435  * finalize the thread. In this case a CannotFinalizeThreadException is thrown.
436  *
437  * @param thread thread to remove.
438  * @exception CannotFinalizeThreadException At least one thread cannot be safely
439  * finalized
440  */
441 void
442 ThreadManager::remove_maybelocked(Thread *thread, bool lock)
443 {
444  if ( thread == NULL ) return;
445 
446  if ( ! (__initializer && __finalizer) ) {
447  throw NullPointerException("ThreadManager: initializer/finalizer not set");
448  }
449 
450  MutexLocker locker(__threads.mutex(), lock);
451  try {
452  if ( ! thread->prepare_finalize() ) {
453  thread->cancel_finalize();
454  throw CannotFinalizeThreadException("Thread '%s'cannot be finalized", thread->name());
455  }
456  } catch (CannotFinalizeThreadException &e) {
457  e.append("ThreadManager cannot stop thread '%s'", thread->name());
458  thread->cancel_finalize();
459  throw;
460  }
461 
462  thread->cancel();
463  thread->join();
464  __finalizer->finalize(thread);
465  thread->finalize();
466 
467  internal_remove_thread(thread);
468 }
469 
470 
471 
472 
473 /** Force removal of the given threads.
474  * The thread manager tries to finalize and stop the threads and then removes the
475  * threads from the internal structures.
476  *
477  * This will succeed even if a thread of the given list cannot be finalized, for
478  * example if prepare_finalize() returns false or if the thread finalizer cannot
479  * finalize the thread.
480  *
481  * <b>Caution, using this function may damage your robot.</b>
482  *
483  * @param tl threads to remove.
484  * @exception ThreadListNotSealedException if the given thread lits tl is not
485  * sealed the thread manager will refuse to remove it
486  * The threads are removed from thread manager control. The threads will be stopped
487  * before they are removed (may cause unpredictable results otherwise).
488  */
489 void
491 {
492  if ( ! tl.sealed() ) {
493  throw ThreadListNotSealedException("Not accepting unsealed list '%s' for removal",
494  tl.name());
495  }
496 
497  tl.lock();
498  __threads.mutex()->stopby();
499  bool caught_exception = false;
500  Exception exc("Forced removal of thread list %s failed", tl.name());
501  try {
502  tl.force_stop(__finalizer);
503  } catch (Exception &e) {
504  caught_exception = true;
505  exc = e;
506  }
507 
508  for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
509  internal_remove_thread(*i);
510  }
511 
512  tl.unlock();
513 
514  if (caught_exception) {
515  throw exc;
516  }
517 }
518 
519 
520 /** Force removal of the given thread.
521  * The thread manager tries to finalize and stop the thread and then removes the
522  * thread from the internal structures.
523  *
524  * This will succeed even if the thread cannot be finalized, for
525  * example if prepare_finalize() returns false or if the thread finalizer cannot
526  * finalize the thread.
527  *
528  * <b>Caution, using this function may damage your robot.</b>
529  *
530  * @param thread thread to remove.
531  * @exception ThreadListNotSealedException if the given thread lits tl is not
532  * sealed the thread manager will refuse to remove it
533  * The threads are removed from thread manager control. The threads will be stopped
534  * before they are removed (may cause unpredictable results otherwise).
535  */
536 void
538 {
539  MutexLocker lock(__threads.mutex());
540  try {
541  thread->prepare_finalize();
542  } catch (Exception &e) {
543  // ignore
544  }
545 
546  thread->cancel();
547  thread->join();
548  if (__finalizer) __finalizer->finalize(thread);
549  thread->finalize();
550 
551  internal_remove_thread(thread);
552 }
553 
554 
555 void
557  unsigned int timeout_usec)
558 {
559  MutexLocker lock(__threads.mutex());
560 
561  unsigned int timeout_sec = 0;
562  if (timeout_usec >= 1000000) {
563  timeout_sec = timeout_usec / 1000000;
564  timeout_usec -= timeout_sec * 1000000;
565  }
566 
567  // Note that the following lines might throw an exception, we just pass it on
568  if ( __threads.find(hook) != __threads.end() ) {
569  __threads[hook].wakeup_and_wait(timeout_sec, timeout_usec * 1000);
570  }
571 }
572 
573 
574 void
576 {
577  MutexLocker lock(__threads.mutex());
578 
579  if ( __threads.find(hook) != __threads.end() ) {
580  if ( barrier ) {
581  __threads[hook].wakeup(barrier);
582  } else {
583  __threads[hook].wakeup();
584  }
585  if ( __threads[hook].size() == 0 ) {
586  __threads.erase(hook);
587  }
588  }
589 }
590 
591 
592 void
593 ThreadManager::try_recover(std::list<std::string> &recovered_threads)
594 {
595  __threads.lock();
596  for (__tit = __threads.begin(); __tit != __threads.end(); ++__tit) {
597  __tit->second.try_recover(recovered_threads);
598  }
599  __threads.unlock();
600 }
601 
602 
603 bool
605 {
606  return (__threads.size() > 0);
607 }
608 
609 
610 void
612 {
613  __interrupt_timed_thread_wait = false;
614  __waitcond_timedthreads->wait();
615  if ( __interrupt_timed_thread_wait ) {
616  __interrupt_timed_thread_wait = false;
617  throw InterruptedException("Waiting for timed threads was interrupted");
618  }
619 }
620 
621 void
623 {
624  __interrupt_timed_thread_wait = true;
625  __waitcond_timedthreads->wake_all();
626 }
627 
628 
629 
630 /** Get a thread collector to be used for an aspect initializer.
631  * @return thread collector instance to use for ThreadProducerAspect.
632  */
635 {
636  return __aspect_collector;
637 }
638 
639 } // end namespace fawkes
bool sealed()
Check if list is sealed.
void set_inifin(ThreadInitializer *initializer, ThreadFinalizer *finalizer)
Set initializer/finalizer.
Wait until a given condition holds.
const char * name()
Name of the thread list.
virtual void init(Thread *thread)=0
This method is called by the ThreadManager for each newly added Thread.
void start()
Start threads.
virtual void lock() const
Lock list.
Definition: lock_list.h:128
void cancel_finalize()
Cancel finalization on all threads.
Fawkes library namespace.
void wake_all()
Wake up all waiting threads.
void push_back_locked(Thread *thread)
Add thread to the end with lock protection.
Mutex locking helper.
Definition: mutex_locker.h:33
void seal()
Seal the list.
virtual ~ThreadManager()
Destructor.
virtual void finalize(Thread *thread)=0
Finalize a thread.
void cancel_finalize()
Cancel finalization.
Definition: thread.cpp:492
Thread collector.
void force_stop(ThreadFinalizer *finalizer)
Force stop of all threads.
A NULL pointer was supplied where not allowed.
Definition: software.h:34
virtual bool timed_threads_exist()
Check if any timed threads exist.
Thread class encapsulation of pthreads.
Definition: thread.h:42
virtual void force_remove(ThreadList &tl)
Force removal of the given threads.
bool prepare_finalize(ThreadFinalizer *finalizer)
Prepare finalize.
ThreadManager()
Constructor.
Thread aspect to use blocked timing.
virtual void wait_for_timed_threads()
Wait for timed threads.
List of threads.
Definition: thread_list.h:57
Thread cannot be initialized.
WakeupHook
Type to define at which hook the thread is woken up.
Base class for exceptions in Fawkes.
Definition: exception.h:36
void remove_locked(Thread *thread)
Remove with lock protection.
void init(ThreadInitializer *initializer, ThreadFinalizer *finalizer)
Initialize threads.
void stop()
Stop threads.
Thread initializer interface.
virtual void finalize()
Finalize the thread.
Definition: thread.cpp:473
bool prepare_finalize()
Prepare finalization.
Definition: thread.cpp:383
virtual void unlock() const
Unlock list.
Definition: lock_list.h:144
void finalize(ThreadFinalizer *finalizer)
Finalize Threads.
const char * name() const
Get name of thread.
Definition: thread.h:95
void notify_of_failed_init()
Notify of failed init.
Definition: thread.cpp:1210
The current system call has been interrupted (for instance by a signal).
Definition: system.h:39
void wait()
Wait for the condition forever.
virtual void wakeup(BlockedTimingAspect::WakeupHook hook, Barrier *barrier=0)
Wakeup thread for given hook.
virtual void wakeup_and_wait(BlockedTimingAspect::WakeupHook hook, unsigned int timeout_usec=0)
Wakeup thread for given hook and wait for completion.
void cancel()
Cancel a thread.
Definition: thread.cpp:651
Thread list not sealed exception.
Definition: thread_list.h:50
virtual void interrupt_timed_thread_wait()
Interrupt any currently running wait_for_timed_threads() and cause it to throw an InterruptedExceptio...
Thread cannot be finalized.
void join()
Join the thread.
Definition: thread.cpp:610
virtual void try_recover(std::list< std::string > &recovered_threads)
Try to recover threads.
WakeupHook blockedTimingAspectHook() const
Get the wakeup hook.
virtual void init()
Initialize the thread.
Definition: thread.cpp:350
ThreadCollector * aspect_collector() const
Get a thread collector to be used for an aspect initializer.
A barrier is a synchronization tool which blocks until a given number of threads have reached the bar...
Definition: barrier.h:32
void append(const char *format,...)
Append messages to the message list.
Definition: exception.cpp:341
void start(bool wait=true)
Call this method to start the thread.
Definition: thread.cpp:511
Thread finalizer interface.