Fawkes API  Fawkes Development Version
interruptible_barrier.cpp
1 
2 /***************************************************************************
3  * interruptible_barrier.cpp - Interruptible Barrier
4  *
5  * Created: Sat Jan 31 12:30:32 2009
6  * Copyright 2006-2009 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version. A runtime exception applies to
14  * this software (see LICENSE.GPL_WRE file mentioned below for details).
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU Library General Public License for more details.
20  *
21  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22  */
23 
24 #include <core/threading/interruptible_barrier.h>
25 #include <core/threading/thread_list.h>
26 #include <core/exceptions/system.h>
27 #include <core/macros.h>
28 
29 #include <core/threading/mutex.h>
30 #include <core/threading/wait_condition.h>
31 
32 namespace fawkes {
33 #if 0 /* just to make Emacs auto-indent happy */
34 }
35 #endif
36 
37 
38 /// @cond INTERNALS
39 class InterruptibleBarrierData
40 {
41  public:
42  unsigned int threads_left;
43  Mutex *mutex;
44  WaitCondition *waitcond;
45  bool own_mutex;
46 
47  InterruptibleBarrierData(Mutex *mutex)
48  {
49  if (mutex) {
50  this->mutex = mutex;
51  own_mutex = false;
52  } else {
53  this->mutex = new Mutex();
54  own_mutex = true;
55  }
56  waitcond = new WaitCondition(this->mutex);
57  }
58 
59  ~InterruptibleBarrierData()
60  {
61  if (own_mutex) delete mutex;
62  delete waitcond;
63  }
64 };
65 /// @endcond
66 
67 
68 /** @class InterruptibleBarrier <core/threading/interruptible_barrier.h>
69  * A barrier is a synchronization tool which blocks until a given number of
70  * threads have reached the barrier. This particular implementation allows for
71  * giving a timeout after which the waiting is aborted.
72  *
73  * For general information when a barrier is useful see the Barrier class.
74  *
75  * In addition to the general barrier features, InterruptibleBarrier::wait()
76  * can be given a timeout after which the waiting is aborted.
77  * Since the POSIX standard does not provide a timed wait for barriers, this
78  * implementation uses a Mutex and WaitCondition internally to achieve the
79  * desired result.
80  *
81  * @see Barrier
82  * @ingroup Threading
83  * @author Tim Niemueller
84  */
85 
86 
87 /** Constructor.
88  * @param count the number of threads to wait for
89  */
91  : Barrier(count)
92 {
93  _count = count;
94  if ( _count == 0 ) {
95  throw Exception("Barrier count must be at least 1");
96  }
97  __data = new InterruptibleBarrierData(NULL);
98  __data->threads_left = 0;
99  __passed_threads = RefPtr<ThreadList>(new ThreadList());
100 
101  __interrupted = false;
102  __timeout = false;
103  __num_threads_in_wait_function = 0;
104 }
105 
106 
107 /** Constructor with custom mutex.
108  * Use this constructor only if you really know what you are doing. This constructor
109  * allows passing a mutex that is used internally for the barrier. Note that in
110  * this case it is your duty to lock the mutex before calling wait() and to unlock
111  * it afterwards! It combines features of a barrier and a wait condition.
112  * @param mutex Mutex to use
113  * @param count the number of threads to wait for
114  */
116  : Barrier(count)
117 {
118  _count = count;
119  if ( _count == 0 ) {
120  throw Exception("Barrier count must be at least 1");
121  }
122  __data = new InterruptibleBarrierData(mutex);
123  __data->threads_left = 0;
124  __passed_threads = RefPtr<ThreadList>(new ThreadList());
125 
126  __interrupted = false;
127  __timeout = false;
128  __num_threads_in_wait_function = 0;
129 }
130 
131 /** Invalid constructor.
132  * This will throw an exception if called as it is illegal to copy
133  * a barrier.
134  * @param barrier to copy
135  */
137  : Barrier()
138 {
139  throw Exception("Barriers cannot be copied");
140 }
141 
142 
143 /** Invalid constructor.
144  * This will throw an exception if called as it is illegal to copy
145  * a barrier.
146  * @param barrier to copy
147  */
149  : Barrier()
150 {
151  throw Exception("Barriers cannot be copied");
152 }
153 
154 
155 /** Invalid assignment operator.
156  * This will throw an exception if called as it is illegal to assign
157  * a barrier.
158  * @param b barrier to assign from
159  */
161 InterruptibleBarrier::operator=(const InterruptibleBarrier &b)
162 {
163  throw Exception("Barriers cannot be assigned");
164 }
165 
166 /** Invalid assignment operator.
167  * This will throw an exception if called as it is illegal to assign
168  * a barrier.
169  * @param b barrier to assign from
170  */
172 InterruptibleBarrier::operator=(const InterruptibleBarrier *b)
173 {
174  throw Exception("Barriers cannot be assigned");
175 }
176 
177 
178 /** Destructor */
180 {
181  delete __data;
182 }
183 
184 
185 /** Get a list of threads that passed the barrier.
186  * The list contains the threads that passed the barrier. With some book keeping
187  * outside of the barrier you can determine which threads you expected at the
188  * barrier but did not pass it.
189  * @return refptr to list of threads that passed the barrier.
190  */
193 {
194  return __passed_threads;
195 }
196 
197 
198 /** Interrupt the barrier.
199  * This will cause all threads currently waiting on the barrier to
200  * throw an exception and no further thread will wait.
201  * You have to call reset() before you use this barrier
202  * the next time.
203  */
204 void
206 {
207  if (likely(__data->own_mutex)) __data->mutex->lock();
208  __interrupted = true;
209  __data->waitcond->wake_all();
210  if (likely(__data->own_mutex)) __data->mutex->unlock();
211 }
212 
213 
214 /** Clears the barrier.
215  * Call this method when you want to use the barrier the next time after
216  * an interrupt or timeout occurred. Make sure all threads that should have
217  * passed the barrier the last time did pass it.
218  */
219 void
221 {
222  if (likely(__data->own_mutex)) __data->mutex->lock();
223  __interrupted = false;
224  __timeout = false;
225  __data->threads_left = _count;
226  __passed_threads.clear();
227  if (likely(__data->own_mutex)) __data->mutex->unlock();
228 }
229 
230 
231 /** Wait for other threads.
232  * This method will block until as many threads have called wait as you have
233  * given count to the constructor. Note that if the barrier is interrupted or
234  * times out you need to call reset() to get the barrier into a re-usable state.
235  * It is your duty to make sure that all threads using the barrier are in a
236  * cohesive state.
237  * @param timeout_sec relative timeout in seconds, added to timeout_nanosec
238  * @param timeout_nanosec timeout in nanoseconds
239  * @return true, if the barrier was properly reached, false if the barrier timeout
240  * was reached and the wait did not finish properly.
241  * @exception InterruptedException thrown if the barrier was forcefully interrupted
242  * by calling interrupt().
243  */
244 bool
245 InterruptibleBarrier::wait(unsigned int timeout_sec, unsigned int timeout_nanosec)
246 {
247  if (likely(__data->own_mutex)) __data->mutex->lock();
248  __num_threads_in_wait_function++;
249 
250  if ( __data->threads_left == 0 ) {
251  // first to come
252  __timeout = __interrupted = __wait_at_barrier = false;
253  __data->threads_left = _count;
254  __passed_threads->clear();
255  } else {
256  if ( __interrupted || __timeout ) {
257  // interrupted or timed out threads need to be reset if they should be reused
258  __num_threads_in_wait_function--;
259  if (likely(__data->own_mutex)) __data->mutex->unlock();
260  return true;
261  }
262  }
263 
264  --__data->threads_left;
265  try {
266  __passed_threads->push_back_locked(Thread::current_thread());
267  } catch (Exception &e) {
268  // Cannot do anything more useful :-/
269  // to stay fully compatible with Barrier we do *not* re-throw
270  e.print_trace();
271  }
272 
273  bool local_timeout = false;
274 
275  //Am I the last thread the interruptable barrier is waiting for? Then I can wake the others up.
276  bool waker = (__data->threads_left == 0);
277 
278  while ( __data->threads_left && !__interrupted && !__timeout && ! local_timeout) {
279  //Here, the threads are waiting for the barrier
280  //pthread_cond_timedwait releases __data->mutex if it is not external
281  local_timeout = ! __data->waitcond->reltimed_wait(timeout_sec, timeout_nanosec);
282  //before continuing, pthread_cond_timedwait locks __data->mutex again if it is not external
283  }
284 
285  if (local_timeout){
286  //set timeout flag of the interruptable barrier so the other threads can continue
287  __timeout = true;
288  }
289 
290  if ( __interrupted ) {
291  if (likely(__data->own_mutex)) __data->mutex->unlock();
292  throw InterruptedException("InterruptibleBarrier forcefully interrupted, only "
293  "%u of %u threads reached the barrier",
294  _count - __data->threads_left, _count);
295  }
296 
297  if (waker){
298  //all threads of this barrier have to synchronize at the standard Barrier
299  __wait_at_barrier = true;
300  }
301 
302  if (waker || local_timeout) {
303  //the other threads can stop waiting in th while-loop
304  __data->waitcond->wake_all();
305  }
306 
307  if (likely(__data->own_mutex)) __data->mutex->unlock();
308 
309  if (__wait_at_barrier) {
310  //hard synchronization
311  Barrier::wait();
312  }
313 
314  if (likely(__data->own_mutex)) __data->mutex->lock();
315  //increment is not threadsafe
316  __num_threads_in_wait_function--;
317  if (likely(__data->own_mutex)) __data->mutex->unlock();
318 
319  return ! __timeout;
320 }
321 
322 /** Checks if there are no more threads in the wait() function.
323  * This method is used to prevent the destruction of the barrier
324  * while there are threads in wait().
325  * @return true, if no thread currently is in wait()
326  */
328  if (likely(__data->own_mutex)) __data->mutex->lock();
329  bool res = __num_threads_in_wait_function == 0;
330  if (likely(__data->own_mutex)) __data->mutex->unlock();
331 
332  return res;
333 }
334 
335 
336 } // end namespace fawkes
unsigned int count()
Get number of threads this barrier will wait for.
Definition: barrier.cpp:181
void interrupt()
Interrupt the barrier.
virtual void wait()
Wait for other threads.
bool no_threads_in_wait()
Checks if there are no more threads in the wait() function.
Fawkes library namespace.
virtual void wait()
Wait for other threads.
Definition: barrier.cpp:157
virtual ~InterruptibleBarrier()
Destructor.
A barrier is a synchronization tool which blocks until a given number of threads have reached the bar...
List of threads.
Definition: thread_list.h:57
Barrier()
Protected Constructor.
Definition: barrier.cpp:133
Base class for exceptions in Fawkes.
Definition: exception.h:36
InterruptibleBarrier(unsigned int count)
Constructor.
unsigned int _count
Number of threads that are expected to wait for the barrier.
Definition: barrier.h:48
static Thread * current_thread()
Get the Thread instance of the currently running thread.
Definition: thread.cpp:1318
The current system call has been interrupted (for instance by a signal).
Definition: system.h:39
void print_trace()
Prints trace to stderr.
Definition: exception.cpp:619
RefPtr< ThreadList > passed_threads()
Get a list of threads that passed the barrier.
RefPtr<> is a reference-counting shared smartpointer.
Definition: refptr.h:49
void reset()
Clears the barrier.
Mutex mutual exclusion lock.
Definition: mutex.h:32
A barrier is a synchronization tool which blocks until a given number of threads have reached the bar...
Definition: barrier.h:32