Fawkes API  Fawkes Development Version
thread_manager.cpp
00001 
00002 /***************************************************************************
00003  *  thread_manager.cpp - Thread manager
00004  *
00005  *  Created: Thu Nov  3 19:11:31 2006 (on train to Cologne)
00006  *  Copyright  2006-2009  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009 
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version. A runtime exception applies to
00014  *  this software (see LICENSE.GPL_WRE file mentioned below for details).
00015  *
00016  *  This program is distributed in the hope that it will be useful,
00017  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  *  GNU Library General Public License for more details.
00020  *
00021  *  Read the full text in the LICENSE.GPL_WRE file in the doc directory.
00022  */
00023 
00024 #include <baseapp/thread_manager.h>
00025 #include <core/threading/thread.h>
00026 #include <core/threading/mutex_locker.h>
00027 #include <core/threading/wait_condition.h>
00028 #include <core/threading/thread_initializer.h>
00029 #include <core/threading/thread_finalizer.h>
00030 #include <core/exceptions/software.h>
00031 #include <core/exceptions/system.h>
00032 
00033 #include <aspect/blocked_timing.h>
00034 
00035 namespace fawkes {
00036 #if 0 /* just to make Emacs auto-indent happy */
00037 }
00038 #endif
00039 
00040 /** @class ThreadManager <baseapp/thread_manager.h>
00041  * Base application thread manager.
00042  * This class provides a manager for the threads. Threads are memorized by
00043  * their wakeup hook. When the thread manager is deleted, all threads are
00044  * appropriately cancelled, joined and deleted. Thus the thread manager
00045  * can be used for "garbage collection" of threads.
00046  *
00047  * The thread manager allows easy wakeup of threads of a given wakeup hook.
00048  *
00049  * The thread manager needs a thread initializer. Each thread that is added
00050  * to the thread manager is initialized with this. The runtime type information
00051  * (RTTI) supplied by C++ can be used to initialize threads if appropriate
00052  * (if the thread has certain aspects that need special treatment).
00053  *
00054  * @author Tim Niemueller
00055  */
00056 
00057 /** Constructor.
00058  * @param parent_manager parent thread manager
00059  */
00060 ThreadManager::ThreadManagerAspectCollector::ThreadManagerAspectCollector(ThreadManager *parent_manager)
00061 {
00062   __parent_manager = parent_manager;
00063 }
00064 
00065 
00066 void
00067 ThreadManager::ThreadManagerAspectCollector::add(ThreadList &tl)
00068 {
00069   BlockedTimingAspect *timed_thread;
00070 
00071   for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
00072     if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(*i)) != NULL ) {
00073       throw IllegalArgumentException("ThreadProducerAspect may not add threads with BlockedTimingAspect");
00074     }
00075   }
00076 
00077   __parent_manager->add_maybelocked(tl, /* lock */ false);
00078 }
00079 
00080 
00081 void
00082 ThreadManager::ThreadManagerAspectCollector::add(Thread *t)
00083 {
00084   BlockedTimingAspect *timed_thread;
00085 
00086   if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
00087     throw IllegalArgumentException("ThreadProducerAspect may not add threads with BlockedTimingAspect");
00088   }
00089 
00090   __parent_manager->add_maybelocked(t, /* lock */ false);
00091 }
00092 
00093 
00094 void
00095 ThreadManager::ThreadManagerAspectCollector::remove(ThreadList &tl)
00096 {
00097   BlockedTimingAspect *timed_thread;
00098 
00099   for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
00100     if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(*i)) != NULL ) {
00101       throw IllegalArgumentException("ThreadProducerAspect may not remove threads with BlockedTimingAspect");
00102     }
00103   }
00104 
00105   __parent_manager->remove_maybelocked(tl, /* lock */ false);
00106 }
00107 
00108 
00109 void
00110 ThreadManager::ThreadManagerAspectCollector::remove(Thread *t)
00111 {
00112   BlockedTimingAspect *timed_thread;
00113 
00114   if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
00115     throw IllegalArgumentException("ThreadProducerAspect may not remove threads with BlockedTimingAspect");
00116   }
00117 
00118   __parent_manager->remove_maybelocked(t, /* lock */ false);
00119 }
00120 
00121 
00122 void
00123 ThreadManager::ThreadManagerAspectCollector::force_remove(fawkes::ThreadList &tl)
00124 {
00125   throw AccessViolationException("ThreadManagerAspect threads may not force removal of threads");
00126 }
00127 
00128 void
00129 ThreadManager::ThreadManagerAspectCollector::force_remove(fawkes::Thread *t)
00130 {
00131   throw AccessViolationException("ThreadManagerAspect threads may not force removal of threads");
00132 }
00133 
00134 
00135 /** Constructor.
00136  * When using this constructor you need to make sure to call set_inifin()
00137  * before any thread is added.
00138  */
00139 ThreadManager::ThreadManager()
00140 {
00141   __initializer = NULL;
00142   __finalizer   = NULL;
00143   __threads.clear();
00144   __waitcond_timedthreads = new WaitCondition();
00145   __interrupt_timed_thread_wait = false;
00146   __aspect_collector = new ThreadManagerAspectCollector(this);
00147 }
00148 
00149 /** Constructor.
00150  * This contsructor is equivalent to the one without parameters followed
00151  * by a call to set_inifins().
00152  * @param initializer thread initializer
00153  * @param finalizer thread finalizer
00154  */
00155 ThreadManager::ThreadManager(ThreadInitializer *initializer,
00156                              ThreadFinalizer *finalizer)
00157 {
00158   __initializer = NULL;
00159   __finalizer   = NULL;
00160   __threads.clear();
00161   __waitcond_timedthreads = new WaitCondition();
00162   __interrupt_timed_thread_wait = false;
00163   __aspect_collector = new ThreadManagerAspectCollector(this);
00164   set_inifin(initializer, finalizer);
00165 }
00166 
00167 
00168 /** Destructor. */
00169 ThreadManager::~ThreadManager()
00170 {
00171   // stop all threads, we call finalize, and we run through it as long as there are
00172   // still running threads, after that, we force the thread's death.
00173   for (__tit = __threads.begin(); __tit != __threads.end(); ++__tit) {
00174     __tit->second.force_stop(__finalizer);
00175   }
00176   __untimed_threads.force_stop(__finalizer);
00177   __threads.clear();
00178 
00179   delete __waitcond_timedthreads;
00180   delete __aspect_collector;
00181 }
00182 
00183 
00184 /** Set initializer/finalizer.
00185  * This method has to be called before any thread is added/removed.
00186  * @param initializer thread initializer
00187  * @param finalizer thread finalizer
00188  */
00189 void
00190 ThreadManager::set_inifin(ThreadInitializer *initializer, ThreadFinalizer *finalizer)
00191 {
00192   __initializer = initializer;
00193   __finalizer   = finalizer;
00194 }
00195 
00196 
00197 /** Remove the given thread from internal structures.
00198  * Thread is removed from the internal structures. If the thread has the
00199  * BlockedTimingAspect then the hook is added to the changed list.
00200  *
00201  * @param t thread to remove
00202  * @param changed list of changed hooks, appropriate hook is added if necessary
00203  */
00204 void
00205 ThreadManager::internal_remove_thread(Thread *t)
00206 {
00207   BlockedTimingAspect *timed_thread;
00208 
00209   if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
00210     // find thread and remove
00211     BlockedTimingAspect::WakeupHook hook = timed_thread->blockedTimingAspectHook();
00212     if ( __threads.find(hook) != __threads.end() ) {
00213       __threads[hook].remove_locked(t);
00214       if (__threads[hook].empty())  __threads.erase(hook);
00215     }
00216   } else {
00217     __untimed_threads.remove_locked(t);
00218   }
00219 }
00220 
00221 
00222 /** Add the given thread to internal structures.
00223  * Thread is added to the internal structures. If the thread has the
00224  * BlockedTimingAspect then the hook is added to the changed list.
00225  *
00226  * @param t thread to add
00227  * @param changed list of changed hooks, appropriate hook is added if necessary
00228  */
00229 void
00230 ThreadManager::internal_add_thread(Thread *t)
00231 {
00232   BlockedTimingAspect *timed_thread;
00233   if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
00234     BlockedTimingAspect::WakeupHook hook = timed_thread->blockedTimingAspectHook();
00235 
00236     if ( __threads.find(hook) == __threads.end() ) {
00237       __threads[hook].set_name("ThreadManagerList Hook %i", hook);
00238       __threads[hook].set_maintain_barrier(true);
00239     }
00240     __threads[hook].push_back_locked(t);
00241 
00242     __waitcond_timedthreads->wake_all();
00243   } else {
00244     __untimed_threads.push_back_locked(t);
00245   }
00246 }
00247 
00248 
00249 /** Add threads.
00250  * Add the given threads to the thread manager. The threads are initialised
00251  * as appropriate and started. See the class documentation for supported
00252  * specialisations of threads and the performed initialisation steps.
00253  * If the thread initializer cannot initalize one or more threads no thread
00254  * is added. In this regard the operation is atomic, either all threads are
00255  * added or none.
00256  * @param tl thread list with threads to add
00257  * @exception CannotInitializeThreadException thrown if at least one of the
00258  * threads could not be initialised
00259  */
00260 void
00261 ThreadManager::add_maybelocked(ThreadList &tl, bool lock)
00262 {
00263   if ( ! (__initializer && __finalizer) ) {
00264     throw NullPointerException("ThreadManager: initializer/finalizer not set");
00265   }
00266 
00267   if ( tl.sealed() ) {
00268     throw Exception("Not accepting new threads from list that is not fresh, "
00269                     "list '%s' already sealed", tl.name());
00270   }
00271 
00272   tl.lock();
00273 
00274   // Try to initialise all threads
00275   try {
00276     tl.init(__initializer, __finalizer);
00277   } catch (Exception &e) {
00278     tl.unlock();
00279     throw;
00280   }
00281 
00282   tl.seal();
00283   tl.start();
00284 
00285   // All thread initialized, now add threads to internal structure
00286   MutexLocker locker(__threads.mutex(), lock);
00287   for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
00288     internal_add_thread(*i);
00289   }
00290 
00291   tl.unlock();
00292 }
00293 
00294 
00295 /** Add one thread.
00296  * Add the given thread to the thread manager. The thread is initialized
00297  * as appropriate and started. See the class documentation for supported
00298  * specialisations of threads and the performed initialisation steps.
00299  * If the thread initializer cannot initalize the thread it is not added.
00300  * @param thread thread to add
00301  * @param lock if true the environment is locked before adding the thread
00302  * @exception CannotInitializeThreadException thrown if at least the
00303  * thread could not be initialised
00304  */
00305 void
00306 ThreadManager::add_maybelocked(Thread *thread, bool lock)
00307 {
00308   if ( thread == NULL ) {
00309     throw NullPointerException("FawkesThreadMananger: cannot add NULL as thread");
00310   }
00311 
00312   if ( ! (__initializer && __finalizer) ) {
00313     throw NullPointerException("ThreadManager: initializer/finalizer not set");
00314   }
00315 
00316   try {
00317     __initializer->init(thread);
00318   } catch (CannotInitializeThreadException &e) {
00319     e.append("Adding thread in ThreadManager failed");
00320     throw;
00321   }
00322 
00323   thread->start();
00324   MutexLocker locker(__threads.mutex(), lock);
00325   internal_add_thread(thread);
00326 }
00327 
00328 
00329 /** Remove the given threads.
00330  * The thread manager tries to finalize and stop the threads and then removes the
00331  * threads from the internal structures.
00332  *
00333  * This may fail if at least one thread of the given list cannot be finalized, for
00334  * example if prepare_finalize() returns false or if the thread finalizer cannot
00335  * finalize the thread. In this case a CannotFinalizeThreadException is thrown.
00336  *
00337  * @param tl threads to remove.
00338  * @exception CannotFinalizeThreadException At least one thread cannot be safely
00339  * finalized
00340  * @exception ThreadListNotSealedException if the given thread lits tl is not
00341  * sealed the thread manager will refuse to remove it
00342  */
00343 void
00344 ThreadManager::remove_maybelocked(ThreadList &tl, bool lock)
00345 {
00346   if ( ! (__initializer && __finalizer) ) {
00347     throw NullPointerException("ThreadManager: initializer/finalizer not set");
00348   }
00349 
00350 
00351   if ( ! tl.sealed() ) {
00352     throw ThreadListNotSealedException("(ThreadManager) Cannot remove unsealed thread "
00353                                        "list. Not accepting unsealed list '%s' for removal",
00354                                        tl.name());
00355   }
00356 
00357   tl.lock();
00358   MutexLocker locker(__threads.mutex(), lock);
00359 
00360   try {
00361     if ( ! tl.prepare_finalize(__finalizer) ) {
00362       tl.cancel_finalize();
00363       tl.unlock();
00364       throw CannotFinalizeThreadException("One or more threads in list '%s' cannot be "
00365                                           "finalized", tl.name());
00366     }
00367   } catch (CannotFinalizeThreadException &e) {
00368     tl.unlock();
00369     throw;
00370   } catch (Exception &e) {
00371     tl.unlock();
00372     e.append("One or more threads in list '%s' cannot be finalized", tl.name());
00373     throw CannotFinalizeThreadException(e);
00374   }
00375 
00376   tl.stop();
00377   try {
00378     tl.finalize(__finalizer);
00379   } catch (Exception &e) {
00380     tl.unlock();
00381     throw;
00382   }
00383 
00384   for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
00385     internal_remove_thread(*i);
00386   }
00387 
00388   tl.unlock();
00389 }
00390 
00391 
00392 /** Remove the given thread.
00393  * The thread manager tries to finalize and stop the thread and then removes the
00394  * thread from the internal structures.
00395  *
00396  * This may fail if the thread cannot be finalized, for
00397  * example if prepare_finalize() returns false or if the thread finalizer cannot
00398  * finalize the thread. In this case a CannotFinalizeThreadException is thrown.
00399  *
00400  * @param thread thread to remove.
00401  * @exception CannotFinalizeThreadException At least one thread cannot be safely
00402  * finalized
00403  */
00404 void
00405 ThreadManager::remove_maybelocked(Thread *thread, bool lock)
00406 {
00407   if ( thread == NULL ) return;
00408 
00409   if ( ! (__initializer && __finalizer) ) {
00410     throw NullPointerException("ThreadManager: initializer/finalizer not set");
00411   }
00412 
00413   MutexLocker locker(__threads.mutex(), lock);
00414   try {
00415     if ( ! thread->prepare_finalize() ) {
00416       thread->cancel_finalize();
00417       throw CannotFinalizeThreadException("Thread '%s'cannot be finalized", thread->name());
00418     }
00419   } catch (CannotFinalizeThreadException &e) {
00420     e.append("ThreadManager cannot stop thread '%s'", thread->name());
00421     thread->cancel_finalize();
00422     throw;
00423   }
00424 
00425   thread->cancel();
00426   thread->join();
00427   __finalizer->finalize(thread);
00428   thread->finalize();
00429 
00430   internal_remove_thread(thread);
00431 }
00432 
00433 
00434 
00435 
00436 /** Force removal of the given threads.
00437  * The thread manager tries to finalize and stop the threads and then removes the
00438  * threads from the internal structures.
00439  *
00440  * This will succeed even if a thread of the given list cannot be finalized, for
00441  * example if prepare_finalize() returns false or if the thread finalizer cannot
00442  * finalize the thread.
00443  *
00444  * <b>Caution, using this function may damage your robot.</b>
00445  *
00446  * @param tl threads to remove.
00447  * @exception ThreadListNotSealedException if the given thread lits tl is not
00448  * sealed the thread manager will refuse to remove it
00449  * The threads are removed from thread manager control. The threads will be stopped
00450  * before they are removed (may cause unpredictable results otherwise).
00451  */
00452 void
00453 ThreadManager::force_remove(ThreadList &tl)
00454 {
00455   if ( ! tl.sealed() ) {
00456     throw ThreadListNotSealedException("Not accepting unsealed list '%s' for removal",
00457                                        tl.name());
00458   }
00459 
00460   tl.lock();
00461   __threads.mutex()->stopby();
00462   tl.force_stop(__finalizer);
00463 
00464   for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
00465     internal_remove_thread(*i);
00466   }
00467 
00468   tl.unlock();
00469 }
00470 
00471 
00472 /** Force removal of the given thread.
00473  * The thread manager tries to finalize and stop the thread and then removes the
00474  * thread from the internal structures.
00475  *
00476  * This will succeed even if the thread cannot be finalized, for
00477  * example if prepare_finalize() returns false or if the thread finalizer cannot
00478  * finalize the thread.
00479  *
00480  * <b>Caution, using this function may damage your robot.</b>
00481  *
00482  * @param thread thread to remove.
00483  * @exception ThreadListNotSealedException if the given thread lits tl is not
00484  * sealed the thread manager will refuse to remove it
00485  * The threads are removed from thread manager control. The threads will be stopped
00486  * before they are removed (may cause unpredictable results otherwise).
00487  */
00488 void
00489 ThreadManager::force_remove(fawkes::Thread *thread)
00490 {
00491   MutexLocker lock(__threads.mutex());
00492   try {
00493     thread->prepare_finalize();
00494   } catch (Exception &e) {
00495     // ignore
00496   }
00497 
00498   thread->cancel();
00499   thread->join();
00500   if (__finalizer) __finalizer->finalize(thread);
00501   thread->finalize();
00502 
00503   internal_remove_thread(thread);
00504 }
00505 
00506 
00507 void
00508 ThreadManager::wakeup_and_wait(BlockedTimingAspect::WakeupHook hook,
00509                                unsigned int timeout_usec)
00510 {
00511   MutexLocker lock(__threads.mutex());
00512 
00513   unsigned int timeout_sec = 0;
00514   if (timeout_usec >= 1000000) {
00515     timeout_sec   = timeout_usec / 1000000;
00516     timeout_usec -= timeout_sec  * 1000000;
00517   }
00518 
00519   // Note that the following lines might throw an exception, we just pass it on
00520   if ( __threads.find(hook) != __threads.end() ) {
00521     __threads[hook].wakeup_and_wait(timeout_sec, timeout_usec * 1000);
00522   }
00523 }
00524 
00525 
00526 void
00527 ThreadManager::wakeup(BlockedTimingAspect::WakeupHook hook, Barrier *barrier)
00528 {
00529   MutexLocker lock(__threads.mutex());
00530 
00531   if ( __threads.find(hook) != __threads.end() ) {
00532     if ( barrier ) {
00533       __threads[hook].wakeup(barrier);
00534     } else {
00535       __threads[hook].wakeup();
00536     }
00537     if ( __threads[hook].size() == 0 ) {
00538       __threads.erase(hook);
00539     }
00540   }
00541 }
00542 
00543 
00544 void
00545 ThreadManager::try_recover(std::list<std::string> &recovered_threads)
00546 {
00547   __threads.lock();
00548   for (__tit = __threads.begin(); __tit != __threads.end(); ++__tit) {
00549     __tit->second.try_recover(recovered_threads);
00550   }
00551   __threads.unlock();
00552 }
00553 
00554 
00555 bool
00556 ThreadManager::timed_threads_exist()
00557 {
00558   return (__threads.size() > 0);
00559 }
00560 
00561 
00562 void
00563 ThreadManager::wait_for_timed_threads()
00564 {
00565   __interrupt_timed_thread_wait = false;
00566   __waitcond_timedthreads->wait();
00567   if ( __interrupt_timed_thread_wait ) {
00568     __interrupt_timed_thread_wait = false;
00569     throw InterruptedException("Waiting for timed threads was interrupted");
00570   }
00571 }
00572 
00573 void
00574 ThreadManager::interrupt_timed_thread_wait()
00575 {
00576   __interrupt_timed_thread_wait = true;
00577   __waitcond_timedthreads->wake_all();
00578 }
00579 
00580 
00581 
00582 /** Get a thread collector to be used for an aspect initializer.
00583  * @return thread collector instance to use for ThreadProducerAspect.
00584  */
00585 ThreadCollector *
00586 ThreadManager::aspect_collector() const
00587 {
00588   return __aspect_collector;
00589 }
00590 
00591 } // end namespace fawkes