Fawkes API
Fawkes Development Version
|
00001 00002 /*************************************************************************** 00003 * thread_manager.cpp - Thread manager 00004 * 00005 * Created: Thu Nov 3 19:11:31 2006 (on train to Cologne) 00006 * Copyright 2006-2009 Tim Niemueller [www.niemueller.de] 00007 * 00008 ****************************************************************************/ 00009 00010 /* This program is free software; you can redistribute it and/or modify 00011 * it under the terms of the GNU General Public License as published by 00012 * the Free Software Foundation; either version 2 of the License, or 00013 * (at your option) any later version. A runtime exception applies to 00014 * this software (see LICENSE.GPL_WRE file mentioned below for details). 00015 * 00016 * This program is distributed in the hope that it will be useful, 00017 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00019 * GNU Library General Public License for more details. 00020 * 00021 * Read the full text in the LICENSE.GPL_WRE file in the doc directory. 00022 */ 00023 00024 #include <baseapp/thread_manager.h> 00025 #include <core/threading/thread.h> 00026 #include <core/threading/mutex_locker.h> 00027 #include <core/threading/wait_condition.h> 00028 #include <core/threading/thread_initializer.h> 00029 #include <core/threading/thread_finalizer.h> 00030 #include <core/exceptions/software.h> 00031 #include <core/exceptions/system.h> 00032 00033 #include <aspect/blocked_timing.h> 00034 00035 namespace fawkes { 00036 #if 0 /* just to make Emacs auto-indent happy */ 00037 } 00038 #endif 00039 00040 /** @class ThreadManager <baseapp/thread_manager.h> 00041 * Base application thread manager. 00042 * This class provides a manager for the threads. Threads are memorized by 00043 * their wakeup hook. When the thread manager is deleted, all threads are 00044 * appropriately cancelled, joined and deleted. Thus the thread manager 00045 * can be used for "garbage collection" of threads. 00046 * 00047 * The thread manager allows easy wakeup of threads of a given wakeup hook. 00048 * 00049 * The thread manager needs a thread initializer. Each thread that is added 00050 * to the thread manager is initialized with this. The runtime type information 00051 * (RTTI) supplied by C++ can be used to initialize threads if appropriate 00052 * (if the thread has certain aspects that need special treatment). 00053 * 00054 * @author Tim Niemueller 00055 */ 00056 00057 /** Constructor. 00058 * @param parent_manager parent thread manager 00059 */ 00060 ThreadManager::ThreadManagerAspectCollector::ThreadManagerAspectCollector(ThreadManager *parent_manager) 00061 { 00062 __parent_manager = parent_manager; 00063 } 00064 00065 00066 void 00067 ThreadManager::ThreadManagerAspectCollector::add(ThreadList &tl) 00068 { 00069 BlockedTimingAspect *timed_thread; 00070 00071 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) { 00072 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(*i)) != NULL ) { 00073 throw IllegalArgumentException("ThreadProducerAspect may not add threads with BlockedTimingAspect"); 00074 } 00075 } 00076 00077 __parent_manager->add_maybelocked(tl, /* lock */ false); 00078 } 00079 00080 00081 void 00082 ThreadManager::ThreadManagerAspectCollector::add(Thread *t) 00083 { 00084 BlockedTimingAspect *timed_thread; 00085 00086 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) { 00087 throw IllegalArgumentException("ThreadProducerAspect may not add threads with BlockedTimingAspect"); 00088 } 00089 00090 __parent_manager->add_maybelocked(t, /* lock */ false); 00091 } 00092 00093 00094 void 00095 ThreadManager::ThreadManagerAspectCollector::remove(ThreadList &tl) 00096 { 00097 BlockedTimingAspect *timed_thread; 00098 00099 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) { 00100 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(*i)) != NULL ) { 00101 throw IllegalArgumentException("ThreadProducerAspect may not remove threads with BlockedTimingAspect"); 00102 } 00103 } 00104 00105 __parent_manager->remove_maybelocked(tl, /* lock */ false); 00106 } 00107 00108 00109 void 00110 ThreadManager::ThreadManagerAspectCollector::remove(Thread *t) 00111 { 00112 BlockedTimingAspect *timed_thread; 00113 00114 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) { 00115 throw IllegalArgumentException("ThreadProducerAspect may not remove threads with BlockedTimingAspect"); 00116 } 00117 00118 __parent_manager->remove_maybelocked(t, /* lock */ false); 00119 } 00120 00121 00122 void 00123 ThreadManager::ThreadManagerAspectCollector::force_remove(fawkes::ThreadList &tl) 00124 { 00125 throw AccessViolationException("ThreadManagerAspect threads may not force removal of threads"); 00126 } 00127 00128 void 00129 ThreadManager::ThreadManagerAspectCollector::force_remove(fawkes::Thread *t) 00130 { 00131 throw AccessViolationException("ThreadManagerAspect threads may not force removal of threads"); 00132 } 00133 00134 00135 /** Constructor. 00136 * When using this constructor you need to make sure to call set_inifin() 00137 * before any thread is added. 00138 */ 00139 ThreadManager::ThreadManager() 00140 { 00141 __initializer = NULL; 00142 __finalizer = NULL; 00143 __threads.clear(); 00144 __waitcond_timedthreads = new WaitCondition(); 00145 __interrupt_timed_thread_wait = false; 00146 __aspect_collector = new ThreadManagerAspectCollector(this); 00147 } 00148 00149 /** Constructor. 00150 * This contsructor is equivalent to the one without parameters followed 00151 * by a call to set_inifins(). 00152 * @param initializer thread initializer 00153 * @param finalizer thread finalizer 00154 */ 00155 ThreadManager::ThreadManager(ThreadInitializer *initializer, 00156 ThreadFinalizer *finalizer) 00157 { 00158 __initializer = NULL; 00159 __finalizer = NULL; 00160 __threads.clear(); 00161 __waitcond_timedthreads = new WaitCondition(); 00162 __interrupt_timed_thread_wait = false; 00163 __aspect_collector = new ThreadManagerAspectCollector(this); 00164 set_inifin(initializer, finalizer); 00165 } 00166 00167 00168 /** Destructor. */ 00169 ThreadManager::~ThreadManager() 00170 { 00171 // stop all threads, we call finalize, and we run through it as long as there are 00172 // still running threads, after that, we force the thread's death. 00173 for (__tit = __threads.begin(); __tit != __threads.end(); ++__tit) { 00174 __tit->second.force_stop(__finalizer); 00175 } 00176 __untimed_threads.force_stop(__finalizer); 00177 __threads.clear(); 00178 00179 delete __waitcond_timedthreads; 00180 delete __aspect_collector; 00181 } 00182 00183 00184 /** Set initializer/finalizer. 00185 * This method has to be called before any thread is added/removed. 00186 * @param initializer thread initializer 00187 * @param finalizer thread finalizer 00188 */ 00189 void 00190 ThreadManager::set_inifin(ThreadInitializer *initializer, ThreadFinalizer *finalizer) 00191 { 00192 __initializer = initializer; 00193 __finalizer = finalizer; 00194 } 00195 00196 00197 /** Remove the given thread from internal structures. 00198 * Thread is removed from the internal structures. If the thread has the 00199 * BlockedTimingAspect then the hook is added to the changed list. 00200 * 00201 * @param t thread to remove 00202 * @param changed list of changed hooks, appropriate hook is added if necessary 00203 */ 00204 void 00205 ThreadManager::internal_remove_thread(Thread *t) 00206 { 00207 BlockedTimingAspect *timed_thread; 00208 00209 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) { 00210 // find thread and remove 00211 BlockedTimingAspect::WakeupHook hook = timed_thread->blockedTimingAspectHook(); 00212 if ( __threads.find(hook) != __threads.end() ) { 00213 __threads[hook].remove_locked(t); 00214 if (__threads[hook].empty()) __threads.erase(hook); 00215 } 00216 } else { 00217 __untimed_threads.remove_locked(t); 00218 } 00219 } 00220 00221 00222 /** Add the given thread to internal structures. 00223 * Thread is added to the internal structures. If the thread has the 00224 * BlockedTimingAspect then the hook is added to the changed list. 00225 * 00226 * @param t thread to add 00227 * @param changed list of changed hooks, appropriate hook is added if necessary 00228 */ 00229 void 00230 ThreadManager::internal_add_thread(Thread *t) 00231 { 00232 BlockedTimingAspect *timed_thread; 00233 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) { 00234 BlockedTimingAspect::WakeupHook hook = timed_thread->blockedTimingAspectHook(); 00235 00236 if ( __threads.find(hook) == __threads.end() ) { 00237 __threads[hook].set_name("ThreadManagerList Hook %i", hook); 00238 __threads[hook].set_maintain_barrier(true); 00239 } 00240 __threads[hook].push_back_locked(t); 00241 00242 __waitcond_timedthreads->wake_all(); 00243 } else { 00244 __untimed_threads.push_back_locked(t); 00245 } 00246 } 00247 00248 00249 /** Add threads. 00250 * Add the given threads to the thread manager. The threads are initialised 00251 * as appropriate and started. See the class documentation for supported 00252 * specialisations of threads and the performed initialisation steps. 00253 * If the thread initializer cannot initalize one or more threads no thread 00254 * is added. In this regard the operation is atomic, either all threads are 00255 * added or none. 00256 * @param tl thread list with threads to add 00257 * @exception CannotInitializeThreadException thrown if at least one of the 00258 * threads could not be initialised 00259 */ 00260 void 00261 ThreadManager::add_maybelocked(ThreadList &tl, bool lock) 00262 { 00263 if ( ! (__initializer && __finalizer) ) { 00264 throw NullPointerException("ThreadManager: initializer/finalizer not set"); 00265 } 00266 00267 if ( tl.sealed() ) { 00268 throw Exception("Not accepting new threads from list that is not fresh, " 00269 "list '%s' already sealed", tl.name()); 00270 } 00271 00272 tl.lock(); 00273 00274 // Try to initialise all threads 00275 try { 00276 tl.init(__initializer, __finalizer); 00277 } catch (Exception &e) { 00278 tl.unlock(); 00279 throw; 00280 } 00281 00282 tl.seal(); 00283 tl.start(); 00284 00285 // All thread initialized, now add threads to internal structure 00286 MutexLocker locker(__threads.mutex(), lock); 00287 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) { 00288 internal_add_thread(*i); 00289 } 00290 00291 tl.unlock(); 00292 } 00293 00294 00295 /** Add one thread. 00296 * Add the given thread to the thread manager. The thread is initialized 00297 * as appropriate and started. See the class documentation for supported 00298 * specialisations of threads and the performed initialisation steps. 00299 * If the thread initializer cannot initalize the thread it is not added. 00300 * @param thread thread to add 00301 * @param lock if true the environment is locked before adding the thread 00302 * @exception CannotInitializeThreadException thrown if at least the 00303 * thread could not be initialised 00304 */ 00305 void 00306 ThreadManager::add_maybelocked(Thread *thread, bool lock) 00307 { 00308 if ( thread == NULL ) { 00309 throw NullPointerException("FawkesThreadMananger: cannot add NULL as thread"); 00310 } 00311 00312 if ( ! (__initializer && __finalizer) ) { 00313 throw NullPointerException("ThreadManager: initializer/finalizer not set"); 00314 } 00315 00316 try { 00317 __initializer->init(thread); 00318 } catch (CannotInitializeThreadException &e) { 00319 e.append("Adding thread in ThreadManager failed"); 00320 throw; 00321 } 00322 00323 thread->start(); 00324 MutexLocker locker(__threads.mutex(), lock); 00325 internal_add_thread(thread); 00326 } 00327 00328 00329 /** Remove the given threads. 00330 * The thread manager tries to finalize and stop the threads and then removes the 00331 * threads from the internal structures. 00332 * 00333 * This may fail if at least one thread of the given list cannot be finalized, for 00334 * example if prepare_finalize() returns false or if the thread finalizer cannot 00335 * finalize the thread. In this case a CannotFinalizeThreadException is thrown. 00336 * 00337 * @param tl threads to remove. 00338 * @exception CannotFinalizeThreadException At least one thread cannot be safely 00339 * finalized 00340 * @exception ThreadListNotSealedException if the given thread lits tl is not 00341 * sealed the thread manager will refuse to remove it 00342 */ 00343 void 00344 ThreadManager::remove_maybelocked(ThreadList &tl, bool lock) 00345 { 00346 if ( ! (__initializer && __finalizer) ) { 00347 throw NullPointerException("ThreadManager: initializer/finalizer not set"); 00348 } 00349 00350 00351 if ( ! tl.sealed() ) { 00352 throw ThreadListNotSealedException("(ThreadManager) Cannot remove unsealed thread " 00353 "list. Not accepting unsealed list '%s' for removal", 00354 tl.name()); 00355 } 00356 00357 tl.lock(); 00358 MutexLocker locker(__threads.mutex(), lock); 00359 00360 try { 00361 if ( ! tl.prepare_finalize(__finalizer) ) { 00362 tl.cancel_finalize(); 00363 tl.unlock(); 00364 throw CannotFinalizeThreadException("One or more threads in list '%s' cannot be " 00365 "finalized", tl.name()); 00366 } 00367 } catch (CannotFinalizeThreadException &e) { 00368 tl.unlock(); 00369 throw; 00370 } catch (Exception &e) { 00371 tl.unlock(); 00372 e.append("One or more threads in list '%s' cannot be finalized", tl.name()); 00373 throw CannotFinalizeThreadException(e); 00374 } 00375 00376 tl.stop(); 00377 try { 00378 tl.finalize(__finalizer); 00379 } catch (Exception &e) { 00380 tl.unlock(); 00381 throw; 00382 } 00383 00384 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) { 00385 internal_remove_thread(*i); 00386 } 00387 00388 tl.unlock(); 00389 } 00390 00391 00392 /** Remove the given thread. 00393 * The thread manager tries to finalize and stop the thread and then removes the 00394 * thread from the internal structures. 00395 * 00396 * This may fail if the thread cannot be finalized, for 00397 * example if prepare_finalize() returns false or if the thread finalizer cannot 00398 * finalize the thread. In this case a CannotFinalizeThreadException is thrown. 00399 * 00400 * @param thread thread to remove. 00401 * @exception CannotFinalizeThreadException At least one thread cannot be safely 00402 * finalized 00403 */ 00404 void 00405 ThreadManager::remove_maybelocked(Thread *thread, bool lock) 00406 { 00407 if ( thread == NULL ) return; 00408 00409 if ( ! (__initializer && __finalizer) ) { 00410 throw NullPointerException("ThreadManager: initializer/finalizer not set"); 00411 } 00412 00413 MutexLocker locker(__threads.mutex(), lock); 00414 try { 00415 if ( ! thread->prepare_finalize() ) { 00416 thread->cancel_finalize(); 00417 throw CannotFinalizeThreadException("Thread '%s'cannot be finalized", thread->name()); 00418 } 00419 } catch (CannotFinalizeThreadException &e) { 00420 e.append("ThreadManager cannot stop thread '%s'", thread->name()); 00421 thread->cancel_finalize(); 00422 throw; 00423 } 00424 00425 thread->cancel(); 00426 thread->join(); 00427 __finalizer->finalize(thread); 00428 thread->finalize(); 00429 00430 internal_remove_thread(thread); 00431 } 00432 00433 00434 00435 00436 /** Force removal of the given threads. 00437 * The thread manager tries to finalize and stop the threads and then removes the 00438 * threads from the internal structures. 00439 * 00440 * This will succeed even if a thread of the given list cannot be finalized, for 00441 * example if prepare_finalize() returns false or if the thread finalizer cannot 00442 * finalize the thread. 00443 * 00444 * <b>Caution, using this function may damage your robot.</b> 00445 * 00446 * @param tl threads to remove. 00447 * @exception ThreadListNotSealedException if the given thread lits tl is not 00448 * sealed the thread manager will refuse to remove it 00449 * The threads are removed from thread manager control. The threads will be stopped 00450 * before they are removed (may cause unpredictable results otherwise). 00451 */ 00452 void 00453 ThreadManager::force_remove(ThreadList &tl) 00454 { 00455 if ( ! tl.sealed() ) { 00456 throw ThreadListNotSealedException("Not accepting unsealed list '%s' for removal", 00457 tl.name()); 00458 } 00459 00460 tl.lock(); 00461 __threads.mutex()->stopby(); 00462 tl.force_stop(__finalizer); 00463 00464 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) { 00465 internal_remove_thread(*i); 00466 } 00467 00468 tl.unlock(); 00469 } 00470 00471 00472 /** Force removal of the given thread. 00473 * The thread manager tries to finalize and stop the thread and then removes the 00474 * thread from the internal structures. 00475 * 00476 * This will succeed even if the thread cannot be finalized, for 00477 * example if prepare_finalize() returns false or if the thread finalizer cannot 00478 * finalize the thread. 00479 * 00480 * <b>Caution, using this function may damage your robot.</b> 00481 * 00482 * @param thread thread to remove. 00483 * @exception ThreadListNotSealedException if the given thread lits tl is not 00484 * sealed the thread manager will refuse to remove it 00485 * The threads are removed from thread manager control. The threads will be stopped 00486 * before they are removed (may cause unpredictable results otherwise). 00487 */ 00488 void 00489 ThreadManager::force_remove(fawkes::Thread *thread) 00490 { 00491 MutexLocker lock(__threads.mutex()); 00492 try { 00493 thread->prepare_finalize(); 00494 } catch (Exception &e) { 00495 // ignore 00496 } 00497 00498 thread->cancel(); 00499 thread->join(); 00500 if (__finalizer) __finalizer->finalize(thread); 00501 thread->finalize(); 00502 00503 internal_remove_thread(thread); 00504 } 00505 00506 00507 void 00508 ThreadManager::wakeup_and_wait(BlockedTimingAspect::WakeupHook hook, 00509 unsigned int timeout_usec) 00510 { 00511 MutexLocker lock(__threads.mutex()); 00512 00513 unsigned int timeout_sec = 0; 00514 if (timeout_usec >= 1000000) { 00515 timeout_sec = timeout_usec / 1000000; 00516 timeout_usec -= timeout_sec * 1000000; 00517 } 00518 00519 // Note that the following lines might throw an exception, we just pass it on 00520 if ( __threads.find(hook) != __threads.end() ) { 00521 __threads[hook].wakeup_and_wait(timeout_sec, timeout_usec * 1000); 00522 } 00523 } 00524 00525 00526 void 00527 ThreadManager::wakeup(BlockedTimingAspect::WakeupHook hook, Barrier *barrier) 00528 { 00529 MutexLocker lock(__threads.mutex()); 00530 00531 if ( __threads.find(hook) != __threads.end() ) { 00532 if ( barrier ) { 00533 __threads[hook].wakeup(barrier); 00534 } else { 00535 __threads[hook].wakeup(); 00536 } 00537 if ( __threads[hook].size() == 0 ) { 00538 __threads.erase(hook); 00539 } 00540 } 00541 } 00542 00543 00544 void 00545 ThreadManager::try_recover(std::list<std::string> &recovered_threads) 00546 { 00547 __threads.lock(); 00548 for (__tit = __threads.begin(); __tit != __threads.end(); ++__tit) { 00549 __tit->second.try_recover(recovered_threads); 00550 } 00551 __threads.unlock(); 00552 } 00553 00554 00555 bool 00556 ThreadManager::timed_threads_exist() 00557 { 00558 return (__threads.size() > 0); 00559 } 00560 00561 00562 void 00563 ThreadManager::wait_for_timed_threads() 00564 { 00565 __interrupt_timed_thread_wait = false; 00566 __waitcond_timedthreads->wait(); 00567 if ( __interrupt_timed_thread_wait ) { 00568 __interrupt_timed_thread_wait = false; 00569 throw InterruptedException("Waiting for timed threads was interrupted"); 00570 } 00571 } 00572 00573 void 00574 ThreadManager::interrupt_timed_thread_wait() 00575 { 00576 __interrupt_timed_thread_wait = true; 00577 __waitcond_timedthreads->wake_all(); 00578 } 00579 00580 00581 00582 /** Get a thread collector to be used for an aspect initializer. 00583 * @return thread collector instance to use for ThreadProducerAspect. 00584 */ 00585 ThreadCollector * 00586 ThreadManager::aspect_collector() const 00587 { 00588 return __aspect_collector; 00589 } 00590 00591 } // end namespace fawkes