Fawkes API  Fawkes Development Version
interruptible_barrier.cpp
00001 
00002 /***************************************************************************
00003  *  interruptible_barrier.cpp - Interruptible Barrier
00004  *
00005  *  Created: Sat Jan 31 12:30:32 2009
00006  *  Copyright  2006-2009  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009 
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version. A runtime exception applies to
00014  *  this software (see LICENSE.GPL_WRE file mentioned below for details).
00015  *
00016  *  This program is distributed in the hope that it will be useful,
00017  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  *  GNU Library General Public License for more details.
00020  *
00021  *  Read the full text in the LICENSE.GPL_WRE file in the doc directory.
00022  */
00023 
00024 #include <core/threading/interruptible_barrier.h>
00025 #include <core/threading/thread_list.h>
00026 #include <core/exceptions/system.h>
00027 #include <core/macros.h>
00028 
00029 #include <core/threading/mutex.h>
00030 #include <core/threading/wait_condition.h>
00031 
00032 namespace fawkes {
00033 #if 0 /* just to make Emacs auto-indent happy */
00034 }
00035 #endif
00036 
00037 
00038 /// @cond INTERNALS
00039 class InterruptibleBarrierData
00040 {
00041  public:
00042   unsigned int   threads_left;
00043   Mutex         *mutex;
00044   WaitCondition *waitcond;
00045   bool           own_mutex;
00046 
00047   InterruptibleBarrierData(Mutex *mutex)
00048   {
00049     if (mutex) {
00050       this->mutex = mutex;
00051       own_mutex   = false;
00052     } else {
00053       this->mutex = new Mutex();
00054       own_mutex   = true;
00055     }
00056     waitcond = new WaitCondition(this->mutex);
00057   }
00058 
00059   ~InterruptibleBarrierData()
00060   {
00061     if (own_mutex)  delete mutex;
00062     delete waitcond;
00063   }
00064 };
00065 /// @endcond
00066 
00067 
00068 /** @class InterruptibleBarrier <core/threading/interruptible_barrier.h>
00069  * A barrier is a synchronization tool which blocks until a given number of
00070  * threads have reached the barrier. This particular implementation allows for
00071  * giving a timeout after which the waiting is aborted.
00072  *
00073  * For general information when a barrier is useful see the Barrier class.
00074  *
00075  * In addition to the general barrier features, InterruptibleBarrier::wait()
00076  * can be given a timeout after which the waiting is aborted.
00077  * Since the POSIX standard does not provide a timed wait for barriers, this
00078  * implementation uses a Mutex and WaitCondition internally to achieve the
00079  * desired result.
00080  *
00081  * @see Barrier
00082  * @ingroup Threading
00083  * @author Tim Niemueller
00084  */
00085 
00086 
00087 /** Constructor.
00088  * @param count the number of threads to wait for
00089  */
00090 InterruptibleBarrier::InterruptibleBarrier(unsigned int count)
00091   : Barrier(count)
00092 {
00093   _count = count;
00094   if ( _count == 0 ) {
00095     throw Exception("Barrier count must be at least 1");
00096   }
00097   __data = new InterruptibleBarrierData(NULL);
00098   __data->threads_left = 0;
00099   __passed_threads = RefPtr<ThreadList>(new ThreadList());
00100 
00101   __interrupted = false;
00102   __timeout     = false;
00103 }
00104 
00105 
00106 /** Constructor with custom mutex.
00107  * Use this constructor only if you really know what you are doing. This constructor
00108  * allows to pass a mutex that is used internally for the barrier. Note that in
00109  * this case it is your duty to lock the mutex before the wait() and unlock
00110  * afterwards! It combines features of a barrier and a wait condition.
00111  * @param mutex Mutex to use
00112  * @param count the number of threads to wait for
00113  */
00114 InterruptibleBarrier::InterruptibleBarrier(Mutex *mutex, unsigned int count)
00115   : Barrier(count)
00116 {
00117   _count = count;
00118   if ( _count == 0 ) {
00119     throw Exception("Barrier count must be at least 1");
00120   }
00121   __data = new InterruptibleBarrierData(mutex);
00122   __data->threads_left = 0;
00123   __passed_threads = RefPtr<ThreadList>(new ThreadList());
00124 
00125   __interrupted = false;
00126   __timeout     = false;
00127 }
00128 
00129 /** Invalid constructor.
00130  * This will throw an exception if called as it is illegal to copy
00131  * a barrier.
00132  * @param barrier to copy
00133  */
00134 InterruptibleBarrier::InterruptibleBarrier(const InterruptibleBarrier &b)
00135   : Barrier()
00136 {
00137   throw Exception("Barriers cannot be copied");
00138 }
00139 
00140 
00141 /** Invalid constructor.
00142  * This will throw an exception if called as it is illegal to copy
00143  * a barrier.
00144  * @param barrier to copy
00145  */
00146 InterruptibleBarrier::InterruptibleBarrier(const InterruptibleBarrier *b)
00147   : Barrier()
00148 {
00149   throw Exception("Barriers cannot be copied");
00150 }
00151 
00152 
00153 /** Invalid assignment operator.
00154  * This will throw an exception if called as it is illegal to assign
00155  * a barrier.
00156  * @param barrier to copy
00157  */
00158 InterruptibleBarrier &
00159 InterruptibleBarrier::operator=(const InterruptibleBarrier &b)
00160 {
00161   throw Exception("Barriers cannot be assigned");
00162 }
00163 
00164 /** Invalid assignment operator.
00165  * This will throw an exception if called as it is illegal to assign
00166  * a barrier.
00167  * @param barrier to copy
00168  */
00169 InterruptibleBarrier &
00170 InterruptibleBarrier::operator=(const InterruptibleBarrier *b)
00171 {
00172   throw Exception("Barriers cannot be assigned");
00173 }
00174 
00175 
00176 /** Destructor */
00177 InterruptibleBarrier::~InterruptibleBarrier()
00178 {
00179   delete __data;
00180 }
00181 
00182 
00183 /** Get a list of threads that passed the barrier.
00184  * The list contains the threads that passed the barrier. With some book keeping
00185  * outside of the barrier you can determine which threads you expected at the
00186  * barrier but did not pass it.
00187  * @return refptr to list of threads that passed the barrier.
00188  */
00189 RefPtr<ThreadList>
00190 InterruptibleBarrier::passed_threads()
00191 {
00192   return __passed_threads;
00193 }
00194 
00195 
00196 /** Interrupt the barrier.
00197  * This will cause all threads currently waiting on the barrier to
00198  * throw an exception and no further thread will wait.
00199  * You have to call reset() the before you use this barrier
00200  * the next time.
00201  */
00202 void
00203 InterruptibleBarrier::interrupt() throw()
00204 {
00205   if (likely(__data->own_mutex))  __data->mutex->lock();
00206   __interrupted = true;
00207   __data->waitcond->wake_all();
00208   if (likely(__data->own_mutex))  __data->mutex->unlock();
00209 }
00210 
00211 
00212 /** Clears the barrier.
00213  * Call this method when you want to use the barrier the next time after
00214  * an interrupt or timeout occured. Make sure all threads that should have
00215  * passed the barrier the last time did pass it.
00216  */
00217 void
00218 InterruptibleBarrier::reset() throw()
00219 {
00220   if (likely(__data->own_mutex))  __data->mutex->lock();
00221   __interrupted        = false;
00222   __timeout            = false;
00223   __data->threads_left = _count;
00224   __passed_threads.clear();
00225   if (likely(__data->own_mutex))  __data->mutex->unlock();  
00226 }
00227 
00228 
00229 /** Wait for other threads.
00230  * This method will block until as many threads have called wait as you have
00231  * given count to the constructor. Note that if the barrier is interrupted or
00232  * times out you need to call reset() to get the barrier into a re-usable state.
00233  * It is your duty to make sure that all threads using the barrier are in a
00234  * cohesive state.
00235  * @param timeout_sec relative timeout in seconds, added to timeout_nanosec
00236  * @param timeout_nanosec timeout in nanoseconds
00237  * @return true, if the barrier was properly reached, false if the barrier timeout
00238  * was reached and the wait did not finish properly.
00239  * @exception InterruptedException thrown if the barrier was forcefully interrupted
00240  * by calling interrupt().
00241  */
00242 bool
00243 InterruptibleBarrier::wait(unsigned int timeout_sec, unsigned int timeout_nanosec)
00244 {
00245   if (likely(__data->own_mutex))  __data->mutex->lock();
00246 
00247   if ( __data->threads_left == 0 ) {
00248     // first to come
00249     __timeout = __interrupted = __wait_at_barrier = false;
00250     __data->threads_left = _count;
00251     __passed_threads->clear();
00252   } else {
00253     if ( __interrupted || __timeout ) {
00254       // interrupted or timed out threads need to be reset if they should be reused
00255       if (likely(__data->own_mutex))  __data->mutex->unlock();
00256       return true;
00257     }
00258   }
00259 
00260   --__data->threads_left;
00261   try {
00262     __passed_threads->push_back_locked(Thread::current_thread());
00263   } catch (Exception &e) {
00264     // Cannot do anything more useful :-/
00265     // to stay fully compatible with Barrier we do *not* re-throw
00266     e.print_trace();
00267   }
00268 
00269   bool local_timeout = false;
00270   bool waker = (__data->threads_left == 0);
00271 
00272   while ( __data->threads_left && !__interrupted && !__timeout && ! local_timeout) {
00273     local_timeout = ! __data->waitcond->reltimed_wait(timeout_sec, timeout_nanosec);
00274   }
00275   if (local_timeout)  __timeout = true;
00276 
00277   if ( __interrupted ) {
00278     if (likely(__data->own_mutex))  __data->mutex->unlock();
00279     throw InterruptedException("InterruptibleBarrier forcefully interrupted, only "
00280                                "%u of %u threads reached the barrier",
00281                                _count - __data->threads_left, _count);
00282   }
00283 
00284   if (waker || local_timeout) {
00285     __wait_at_barrier = waker;
00286     __data->waitcond->wake_all();
00287   }
00288 
00289   if (likely(__data->own_mutex))  __data->mutex->unlock();
00290 
00291   if (__wait_at_barrier) {
00292     Barrier::wait();
00293   }
00294 
00295   return ! __timeout;
00296 }
00297 
00298 } // end namespace fawkes