Fawkes API
Fawkes Development Version
|
/***************************************************************************
 *  interruptible_barrier.cpp - Interruptible Barrier
 *
 *  Created: Sat Jan 31 12:30:32 2009
 *  Copyright 2006-2009 Tim Niemueller [www.niemueller.de]
 *
 ****************************************************************************/

/*  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version. A runtime exception applies to
 *  this software (see LICENSE.GPL_WRE file mentioned below for details).
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *  GNU Library General Public License for more details.
 *
 *  Read the full text in the LICENSE.GPL_WRE file in the doc directory.
00022 */ 00023 00024 #include <core/threading/interruptible_barrier.h> 00025 #include <core/threading/thread_list.h> 00026 #include <core/exceptions/system.h> 00027 #include <core/macros.h> 00028 00029 #include <core/threading/mutex.h> 00030 #include <core/threading/wait_condition.h> 00031 00032 namespace fawkes { 00033 #if 0 /* just to make Emacs auto-indent happy */ 00034 } 00035 #endif 00036 00037 00038 /// @cond INTERNALS 00039 class InterruptibleBarrierData 00040 { 00041 public: 00042 unsigned int threads_left; 00043 Mutex *mutex; 00044 WaitCondition *waitcond; 00045 bool own_mutex; 00046 00047 InterruptibleBarrierData(Mutex *mutex) 00048 { 00049 if (mutex) { 00050 this->mutex = mutex; 00051 own_mutex = false; 00052 } else { 00053 this->mutex = new Mutex(); 00054 own_mutex = true; 00055 } 00056 waitcond = new WaitCondition(this->mutex); 00057 } 00058 00059 ~InterruptibleBarrierData() 00060 { 00061 if (own_mutex) delete mutex; 00062 delete waitcond; 00063 } 00064 }; 00065 /// @endcond 00066 00067 00068 /** @class InterruptibleBarrier <core/threading/barrier.h> 00069 * A barrier is a synchronization tool which blocks until a given number of 00070 * threads have reached the barrier. This particular implementations allows for 00071 * giving a timeout after which the waiting is aborted. 00072 * 00073 * For general information when a barrier is useful see the Barrier class. 00074 * 00075 * Additionally to the general barrier features the InterruptibleBarrier::wait() 00076 * can be given a timeout after which the waiting is aborted. 00077 * Since the POSIX standard does not provide a timed wait for barriers this 00078 * implementation uses a Mutex and WaitCondition internally to achieve the 00079 * desired result. 00080 * 00081 * @see Barrier 00082 * @ingroup Threading 00083 * @author Tim Niemueller 00084 */ 00085 00086 00087 /** Constructor. 
00088 * @param count the number of threads to wait for 00089 */ 00090 InterruptibleBarrier::InterruptibleBarrier(unsigned int count) 00091 : Barrier(count) 00092 { 00093 _count = count; 00094 if ( _count == 0 ) { 00095 throw Exception("Barrier count must be at least 1"); 00096 } 00097 __data = new InterruptibleBarrierData(NULL); 00098 __data->threads_left = 0; 00099 __passed_threads = RefPtr<ThreadList>(new ThreadList()); 00100 00101 __interrupted = false; 00102 __timeout = false; 00103 } 00104 00105 00106 /** Constructor with custom mutex. 00107 * Use this constructor only if you really know what you are doing. This constructor 00108 * allows to pass a mutex that is used internally for the barrier. Note that in 00109 * this case it is your duty to lock the mutex before the wait() and unlock 00110 * afterwards! It combines features of a barrier and a wait condition. 00111 * @param mutex Mutex to use 00112 * @param count the number of threads to wait for 00113 */ 00114 InterruptibleBarrier::InterruptibleBarrier(Mutex *mutex, unsigned int count) 00115 : Barrier(count) 00116 { 00117 _count = count; 00118 if ( _count == 0 ) { 00119 throw Exception("Barrier count must be at least 1"); 00120 } 00121 __data = new InterruptibleBarrierData(mutex); 00122 __data->threads_left = 0; 00123 __passed_threads = RefPtr<ThreadList>(new ThreadList()); 00124 00125 __interrupted = false; 00126 __timeout = false; 00127 } 00128 00129 /** Invalid constructor. 00130 * This will throw an exception if called as it is illegal to copy 00131 * a barrier. 00132 * @param barrier to copy 00133 */ 00134 InterruptibleBarrier::InterruptibleBarrier(const InterruptibleBarrier &b) 00135 : Barrier() 00136 { 00137 throw Exception("Barriers cannot be copied"); 00138 } 00139 00140 00141 /** Invalid constructor. 00142 * This will throw an exception if called as it is illegal to copy 00143 * a barrier. 
00144 * @param barrier to copy 00145 */ 00146 InterruptibleBarrier::InterruptibleBarrier(const InterruptibleBarrier *b) 00147 : Barrier() 00148 { 00149 throw Exception("Barriers cannot be copied"); 00150 } 00151 00152 00153 /** Invalid assignment operator. 00154 * This will throw an exception if called as it is illegal to assign 00155 * a barrier. 00156 * @param barrier to copy 00157 */ 00158 InterruptibleBarrier & 00159 InterruptibleBarrier::operator=(const InterruptibleBarrier &b) 00160 { 00161 throw Exception("Barriers cannot be assigned"); 00162 } 00163 00164 /** Invalid assignment operator. 00165 * This will throw an exception if called as it is illegal to assign 00166 * a barrier. 00167 * @param barrier to copy 00168 */ 00169 InterruptibleBarrier & 00170 InterruptibleBarrier::operator=(const InterruptibleBarrier *b) 00171 { 00172 throw Exception("Barriers cannot be assigned"); 00173 } 00174 00175 00176 /** Destructor */ 00177 InterruptibleBarrier::~InterruptibleBarrier() 00178 { 00179 delete __data; 00180 } 00181 00182 00183 /** Get a list of threads that passed the barrier. 00184 * The list contains the threads that passed the barrier. With some book keeping 00185 * outside of the barrier you can determine which threads you expected at the 00186 * barrier but did not pass it. 00187 * @return refptr to list of threads that passed the barrier. 00188 */ 00189 RefPtr<ThreadList> 00190 InterruptibleBarrier::passed_threads() 00191 { 00192 return __passed_threads; 00193 } 00194 00195 00196 /** Interrupt the barrier. 00197 * This will cause all threads currently waiting on the barrier to 00198 * throw an exception and no further thread will wait. 00199 * You have to call reset() the before you use this barrier 00200 * the next time. 
00201 */ 00202 void 00203 InterruptibleBarrier::interrupt() throw() 00204 { 00205 if (likely(__data->own_mutex)) __data->mutex->lock(); 00206 __interrupted = true; 00207 __data->waitcond->wake_all(); 00208 if (likely(__data->own_mutex)) __data->mutex->unlock(); 00209 } 00210 00211 00212 /** Clears the barrier. 00213 * Call this method when you want to use the barrier the next time after 00214 * an interrupt or timeout occured. Make sure all threads that should have 00215 * passed the barrier the last time did pass it. 00216 */ 00217 void 00218 InterruptibleBarrier::reset() throw() 00219 { 00220 if (likely(__data->own_mutex)) __data->mutex->lock(); 00221 __interrupted = false; 00222 __timeout = false; 00223 __data->threads_left = _count; 00224 __passed_threads.clear(); 00225 if (likely(__data->own_mutex)) __data->mutex->unlock(); 00226 } 00227 00228 00229 /** Wait for other threads. 00230 * This method will block until as many threads have called wait as you have 00231 * given count to the constructor. Note that if the barrier is interrupted or 00232 * times out you need to call reset() to get the barrier into a re-usable state. 00233 * It is your duty to make sure that all threads using the barrier are in a 00234 * cohesive state. 00235 * @param timeout_sec relative timeout in seconds, added to timeout_nanosec 00236 * @param timeout_nanosec timeout in nanoseconds 00237 * @return true, if the barrier was properly reached, false if the barrier timeout 00238 * was reached and the wait did not finish properly. 00239 * @exception InterruptedException thrown if the barrier was forcefully interrupted 00240 * by calling interrupt(). 
00241 */ 00242 bool 00243 InterruptibleBarrier::wait(unsigned int timeout_sec, unsigned int timeout_nanosec) 00244 { 00245 if (likely(__data->own_mutex)) __data->mutex->lock(); 00246 00247 if ( __data->threads_left == 0 ) { 00248 // first to come 00249 __timeout = __interrupted = __wait_at_barrier = false; 00250 __data->threads_left = _count; 00251 __passed_threads->clear(); 00252 } else { 00253 if ( __interrupted || __timeout ) { 00254 // interrupted or timed out threads need to be reset if they should be reused 00255 if (likely(__data->own_mutex)) __data->mutex->unlock(); 00256 return true; 00257 } 00258 } 00259 00260 --__data->threads_left; 00261 try { 00262 __passed_threads->push_back_locked(Thread::current_thread()); 00263 } catch (Exception &e) { 00264 // Cannot do anything more useful :-/ 00265 // to stay fully compatible with Barrier we do *not* re-throw 00266 e.print_trace(); 00267 } 00268 00269 bool local_timeout = false; 00270 bool waker = (__data->threads_left == 0); 00271 00272 while ( __data->threads_left && !__interrupted && !__timeout && ! local_timeout) { 00273 local_timeout = ! __data->waitcond->reltimed_wait(timeout_sec, timeout_nanosec); 00274 } 00275 if (local_timeout) __timeout = true; 00276 00277 if ( __interrupted ) { 00278 if (likely(__data->own_mutex)) __data->mutex->unlock(); 00279 throw InterruptedException("InterruptibleBarrier forcefully interrupted, only " 00280 "%u of %u threads reached the barrier", 00281 _count - __data->threads_left, _count); 00282 } 00283 00284 if (waker || local_timeout) { 00285 __wait_at_barrier = waker; 00286 __data->waitcond->wake_all(); 00287 } 00288 00289 if (likely(__data->own_mutex)) __data->mutex->unlock(); 00290 00291 if (__wait_at_barrier) { 00292 Barrier::wait(); 00293 } 00294 00295 return ! __timeout; 00296 } 00297 00298 } // end namespace fawkes