Intel(R) Threading Building Blocks Doxygen Documentation, version 4.2.3
private_server.cpp
/*
    Copyright (c) 2005-2019 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "../rml/include/rml_tbb.h"
#include "../rml/server/thread_monitor.h"
#include "tbb/atomic.h"
#include "tbb/cache_aligned_allocator.h"
#include "scheduler_common.h"
#include "governor.h"
#include "tbb_misc.h"
28 
29 using rml::internal::thread_monitor;
30 
31 namespace tbb {
32 namespace internal {
33 namespace rml {
34 
35 typedef thread_monitor::handle_type thread_handle;
36 
37 class private_server;
38 
40 private:
    //! State in finite-state machine that controls the worker.
    enum state_t {
        //! *this is initialized
        st_init,
        //! *this has associated thread that is starting up.
        st_starting,
        //! Associated thread is doing normal life sequence.
        st_normal,
        //! Associated thread has ended normal life sequence and promises to never touch *this again.
        st_quit
    };
    atomic<state_t> my_state;

    //! Associated server
    private_server& my_server;

    //! Associated client
    tbb_client& my_client;

    //! index used for avoiding the 64K aliasing problem
    const size_t my_index;

    //! Monitor for sleeping when there is no work to do.
    thread_monitor my_thread_monitor;

    //! Handle of the OS thread associated with this worker
    thread_handle my_handle;

    //! Link for list of workers that are sleeping or have no associated thread.
    private_worker* my_next;

    friend class private_server;

    //! Actions executed by the associated thread
    void run();

    //! Wake up associated thread (or launch a thread if there is none)
    void wake_or_launch();

    //! Called by a thread (usually not the associated thread) to commence termination.
    void start_shutdown();

    static __RML_DECL_THREAD_ROUTINE thread_routine( void* arg );

    static void release_handle(thread_handle my_handle, bool join);

protected:
    private_worker( private_server& server, tbb_client& client, const size_t i ) :
        my_server(server), my_client(client), my_index(i),
        my_thread_monitor(), my_handle(), my_next()
    {
        my_state = st_init;
    }
};
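// Life cycle of a private_worker, as implemented below: my_state starts at
// st_init; wake_or_launch() moves it to st_starting while it creates the OS
// thread, then to st_normal; start_shutdown() forces st_quit from any state.
// A worker that never started (st_init) has no thread, so the shutdown path
// must do the bookkeeping the thread would otherwise do when it quits.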

static const size_t cache_line_size = tbb::internal::NFS_MaxLineSize;

#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress overzealous compiler warnings about uninstantiable class
    #pragma warning(push)
    #pragma warning(disable:4510 4610)
#endif
class padded_private_worker: public private_worker {
    char pad[cache_line_size - sizeof(private_worker)%cache_line_size];
public:
    padded_private_worker( private_server& server, tbb_client& client, const size_t i )
        : private_worker(server,client,i) { suppress_unused_warning(pad); }
};
#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning(pop)
#endif
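// Each private_worker is padded out to a multiple of the cache line size so
// that adjacent workers in my_thread_array never share a cache line;
// presumably this avoids false sharing between threads updating their own
// worker state.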

class private_server: public tbb_server, no_copy {
private:
    tbb_client& my_client;

    //! Maximum number of threads to be created.
    const tbb_client::size_type my_n_thread;

    //! Stack size for each thread.
    const size_t my_stack_size;

    //! Number of jobs that could use their associated thread minus number of active threads.
    atomic<int> my_slack;

    //! Counter used to determine when to delete this.
    atomic<int> my_ref_count;

    padded_private_worker* my_thread_array;

    //! List of workers that are asleep or committed to sleeping until notified by another thread.
    tbb::atomic<private_worker*> my_asleep_list_root;

    //! Protects my_asleep_list_root
    typedef scheduler_mutex_type asleep_list_mutex_type;
    asleep_list_mutex_type my_asleep_list_mutex;

#if TBB_USE_ASSERT
    atomic<int> my_net_slack_requests;
#endif /* TBB_USE_ASSERT */

    //! Wake up to two sleeping workers, if there are any sleeping.
    void propagate_chain_reaction() {
        // First test of a double-check idiom. Second test is inside wake_some(0).
        if( my_asleep_list_root )
            wake_some(0);
    }
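    // Note: the first test above reads my_asleep_list_root without holding
    // my_asleep_list_mutex, which is safe only because wake_some(0) re-checks
    // the list under the lock; the unlocked read merely avoids taking the
    // mutex on the common path where nobody is asleep.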

    //! Try to add t to list of sleeping workers
    bool try_insert_in_asleep_list( private_worker& t );

    //! Equivalent of adding additional_slack to my_slack and waking up to 2 threads if my_slack permits.
    void wake_some( int additional_slack );

    virtual ~private_server();

    void remove_server_ref() {
        if( --my_ref_count==0 ) {
            my_client.acknowledge_close_connection();
            this->~private_server();
            tbb::cache_aligned_allocator<private_server>().deallocate( this, 1 );
        }
    }

    friend class private_worker;
public:
    private_server( tbb_client& client );

    version_type version() const __TBB_override {
        return 0;
    }

    void request_close_connection( bool /*exiting*/ ) __TBB_override {
        for( size_t i=0; i<my_n_thread; ++i )
            my_thread_array[i].start_shutdown();
        remove_server_ref();
    }

    void yield() __TBB_override { __TBB_Yield(); }

    void independent_thread_number_changed( int ) __TBB_override { __TBB_ASSERT(false,NULL); }

    unsigned default_concurrency() const __TBB_override { return governor::default_num_threads() - 1; }

    void adjust_job_count_estimate( int delta ) __TBB_override;

#if _WIN32||_WIN64
    void register_master ( ::rml::server::execution_resource_t& ) __TBB_override {}
    void unregister_master ( ::rml::server::execution_resource_t ) __TBB_override {}
#endif /* _WIN32||_WIN64 */
};
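// Reference-counting scheme, as reconstructed from the code below: the
// constructor sets my_ref_count to my_n_thread+1; each worker releases one
// reference when it quits (or start_shutdown() releases it on behalf of a
// never-started worker), and request_close_connection() releases the extra
// reference held by the server itself. The last release runs
// acknowledge_close_connection() and frees the server.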

//------------------------------------------------------------------------
// Methods of private_worker
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
    #pragma warning(push)
    #pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 && __GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned for TBB threads
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE private_worker::thread_routine( void* arg ) {
    private_worker* self = static_cast<private_worker*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning(pop)
#endif

void private_worker::release_handle(thread_handle handle, bool join) {
    if (join)
        thread_monitor::join(handle);
    else
        thread_monitor::detach_thread(handle);
}

void private_worker::start_shutdown() {
    state_t s;

    // Transition to st_quit, remembering the predecessor state in s.
    do {
        s = my_state;
        __TBB_ASSERT( s!=st_quit, NULL );
    } while( my_state.compare_and_swap( st_quit, s )!=s );
    if( s==st_normal || s==st_starting ) {
        // May have invalidated invariant for sleeping, so wake up the thread.
        // Note that the notify() here occurs without maintaining invariants for my_slack.
        // It does not matter, because my_state==st_quit overrides checking of my_slack.
        my_thread_monitor.notify();
        // No need to release the handle in the st_init state,
        // because in that case the thread was never started.
        // For st_starting, the release is done at the launch site.
        if (s==st_normal)
            release_handle( my_handle, governor::does_client_join_workers(my_client) );
    } else if( s==st_init ) {
        // Perform action that otherwise would be performed by associated thread when it quits.
        my_server.remove_server_ref();
    }
}
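// The CAS loop above is needed because the worker may concurrently move
// itself from st_starting to st_normal: start_shutdown() keeps re-reading
// my_state until its transition to st_quit lands, and the saved predecessor
// state s determines which cleanup action is owed.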

void private_worker::run() {
    my_server.propagate_chain_reaction();

    // Transiting to st_normal here would require setting my_handle,
    // which would create a race with the launching thread and
    // complications in handle management on Windows.

    ::rml::job& j = *my_client.create_one_job();
    while( my_state!=st_quit ) {
        if( my_server.my_slack>=0 ) {
            my_client.process(j);
        } else {
            thread_monitor::cookie c;
            // Prepare to wait
            my_thread_monitor.prepare_wait(c);
            // Check/set the invariant for sleeping
            if( my_state!=st_quit && my_server.try_insert_in_asleep_list(*this) ) {
                my_thread_monitor.commit_wait(c);
                __TBB_ASSERT( my_state==st_quit || !my_next, "Thread monitor missed a spurious wakeup?" );
                my_server.propagate_chain_reaction();
            } else {
                // Invariant broken
                my_thread_monitor.cancel_wait();
            }
        }
    }
    my_client.cleanup(j);

    ++my_server.my_slack;
    my_server.remove_server_ref();
}
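// The sleep path above follows thread_monitor's prepare/commit/cancel
// protocol: prepare_wait() registers intent to sleep, then the worker
// re-checks the conditions (not st_quit, and room on the asleep list via
// try_insert_in_asleep_list). Only if both still hold does it commit_wait();
// otherwise cancel_wait() backs out, so a wakeup that races with going to
// sleep cannot be lost.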

inline void private_worker::wake_or_launch() {
    if( my_state==st_init && my_state.compare_and_swap( st_starting, st_init )==st_init ) {
        // after this point, remove_server_ref() must be done by created thread
#if USE_WINTHREAD
        my_handle = thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
        affinity_helper fpa;
        fpa.protect_affinity_mask( /*restore_process_mask=*/true );
        my_handle = thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
        // Implicit destruction of fpa resets original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = my_state.compare_and_swap( st_normal, st_starting );
        if (st_starting != s) {
            // Shutdown was requested during startup. my_handle can't be released
            // by start_shutdown, because the my_handle value might not be set yet
            // at the time of the transition from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, NULL );
            release_handle( my_handle, governor::does_client_join_workers(my_client) );
        }
    }
    else {
        __TBB_ASSERT( !my_next, "Should not wake a thread while it's still in the asleep list" );
        my_thread_monitor.notify();
    }
}
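// wake_or_launch() uses two compare-and-swaps as a handshake with
// start_shutdown(): the first (st_init -> st_starting) ensures only one
// caller launches the thread; the second (st_starting -> st_normal) detects
// whether start_shutdown() fired in between, in which case the launcher, not
// start_shutdown(), owns releasing the just-created handle.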

//------------------------------------------------------------------------
// Methods of private_server
//------------------------------------------------------------------------
private_server::private_server( tbb_client& client ) :
    my_client(client),
    my_n_thread(client.max_job_count()),
    my_stack_size(client.min_stack_size()),
    my_thread_array(NULL)
{
    my_ref_count = my_n_thread+1;
    my_slack = 0;
#if TBB_USE_ASSERT
    my_net_slack_requests = 0;
#endif /* TBB_USE_ASSERT */
    my_asleep_list_root = NULL;
    my_thread_array = tbb::cache_aligned_allocator<padded_private_worker>().allocate( my_n_thread );
    for( size_t i=0; i<my_n_thread; ++i ) {
        private_worker* t = new( &my_thread_array[i] ) padded_private_worker( *this, client, i );
        t->my_next = my_asleep_list_root;
        my_asleep_list_root = t;
    }
}

private_server::~private_server() {
    __TBB_ASSERT( my_net_slack_requests==0, NULL );
    for( size_t i=my_n_thread; i--; )
        my_thread_array[i].~padded_private_worker();
    tbb::cache_aligned_allocator<padded_private_worker>().deallocate( my_thread_array, my_n_thread );
    tbb::internal::poison_pointer( my_thread_array );
}

inline bool private_server::try_insert_in_asleep_list( private_worker& t ) {
    asleep_list_mutex_type::scoped_lock lock;
    if( !lock.try_acquire(my_asleep_list_mutex) )
        return false;
    // Contribute to slack under lock so that if another takes that unit of slack,
    // it sees us sleeping on the list and wakes us up.
    int k = ++my_slack;
    if( k<=0 ) {
        t.my_next = my_asleep_list_root;
        my_asleep_list_root = &t;
        return true;
    } else {
        --my_slack;
        return false;
    }
}
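// Note: try_acquire() rather than a blocking acquire keeps the worker
// responsive; on contention the function simply reports failure, and the
// caller (run()) cancels its wait and retries on the next loop iteration.
// Incrementing my_slack under the same lock that guards the list is what
// lets wake_some() pair each claimed unit of slack with a visible sleeper.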

void private_server::wake_some( int additional_slack ) {
    __TBB_ASSERT( additional_slack>=0, NULL );
    private_worker* wakee[2];
    private_worker** w = wakee;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        while( my_asleep_list_root && w<wakee+2 ) {
            if( additional_slack>0 ) {
                if( additional_slack+my_slack<=0 ) // additional demand does not exceed surplus supply
                    break;
                --additional_slack;
            } else {
                // Chain reaction; try to claim unit of slack
                int old;
                do {
                    old = my_slack;
                    if( old<=0 ) goto done;
                } while( my_slack.compare_and_swap(old-1,old)!=old );
            }
            // Pop sleeping worker to combine with claimed unit of slack
            my_asleep_list_root = (*w++ = my_asleep_list_root)->my_next;
        }
        if( additional_slack ) {
            // Contribute our unused slack to my_slack.
            my_slack += additional_slack;
        }
    }
done:
    while( w>wakee ) {
        private_worker* ww = *--w;
        ww->my_next = NULL;
        ww->wake_or_launch();
    }
}
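// wake_some() deliberately splits its work into two phases: wakees are popped
// from the list while holding my_asleep_list_mutex, but wake_or_launch() and
// the underlying notify() run after the lock is released, keeping the
// critical section short. The cap of two wakees per call is what makes the
// chain reaction fan out without any single thread doing O(n) wakeups.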

void private_server::adjust_job_count_estimate( int delta ) {
#if TBB_USE_ASSERT
    my_net_slack_requests += delta;
#endif /* TBB_USE_ASSERT */
    if( delta<0 ) {
        my_slack += delta;
    } else if( delta>0 ) {
        wake_some( delta );
    }
}
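// The asymmetry above appears to mirror the constraint on my_slack: lowering
// it is safe to do asynchronously, but raising it affects the invariant that
// sleeping workers rely on, so increases are funneled through wake_some(),
// which raises slack only in coordination with the asleep list.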

//! Factory method called from task.cpp to create a private_server.
tbb_server* make_private_server( tbb_client& client ) {
    return new( tbb::cache_aligned_allocator<private_server>().allocate(1) ) private_server(client);
}

} // namespace rml
} // namespace internal
} // namespace tbb
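
The chain-reaction wakeup is easiest to see in isolation. Below is a minimal standalone C++11 sketch; it is not TBB code. All names (sleeper, wake_some, worker_body) are illustrative, the slack bookkeeping is omitted, and a plain std::condition_variable stands in for thread_monitor. It mimics only the two-phase structure of private_server::wake_some and the "each woken thread wakes up to two more" fan-out.

// chain_reaction_demo.cpp -- illustrative sketch, not part of TBB.
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>
#include <cstdio>

namespace demo {

struct sleeper {
    std::mutex m;
    std::condition_variable cv;
    bool woken = false;
    sleeper* next = nullptr;   // plays the role of private_worker::my_next
};

std::mutex list_mutex;          // plays the role of my_asleep_list_mutex
sleeper* asleep_root = nullptr; // plays the role of my_asleep_list_root

// Pop up to two sleepers under the lock, then notify them after releasing it,
// mirroring the structure of private_server::wake_some.
void wake_some() {
    sleeper* wakee[2];
    sleeper** w = wakee;
    {
        std::lock_guard<std::mutex> lock(list_mutex);
        while (asleep_root && w < wakee + 2)
            asleep_root = (*w++ = asleep_root)->next;
    }
    while (w > wakee) {
        sleeper* s = *--w;
        s->next = nullptr;
        {
            std::lock_guard<std::mutex> l(s->m);
            s->woken = true;   // predicate prevents lost or spurious wakeups
        }
        s->cv.notify_one();
    }
}

void worker_body(sleeper* self) {
    std::unique_lock<std::mutex> l(self->m);
    self->cv.wait(l, [self] { return self->woken; });
    l.unlock();
    wake_some();   // chain reaction: each woken worker wakes up to two more
}

} // namespace demo

int main() {
    using namespace demo;
    const int n = 8;
    std::vector<sleeper> s(n);
    std::vector<std::thread> t;
    for (int i = 0; i < n; ++i) {
        s[i].next = asleep_root;   // push each worker onto the asleep list
        asleep_root = &s[i];
        t.emplace_back(worker_body, &s[i]);
    }
    wake_some();                   // seed the chain reaction (wakes up to 2)
    for (auto& th : t) th.join();
    std::puts("all workers woke via chain reaction");
    return 0;
}

Waking at most two threads per call gives a binary fan-out: the seed call wakes two workers, each of those wakes two more, and so on, so n sleepers are woken in roughly log2(n) rounds without any single thread paying for all n notifications.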