Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
concurrent_queue.cpp
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 
16 
17 
18 
19 */
20 
21 #include "tbb/tbb_stddef.h"
22 #include "tbb/tbb_machine.h"
23 #include "tbb/tbb_exception.h"
24 // Define required to satisfy test in internal file.
25 #define __TBB_concurrent_queue_H
27 #include "concurrent_monitor.h"
28 #include "itt_notify.h"
29 #include <new>
30 #include <cstring> // for memset()
31 
32 #if defined(_MSC_VER) && defined(_Wp64)
33  // Workaround for overzealous compiler warnings in /Wp64 mode
34  #pragma warning (disable: 4267)
35 #endif
36 
37 #define RECORD_EVENTS 0
38 
39 
40 namespace tbb {
41 
42 namespace internal {
43 
45 
46 typedef size_t ticket;
47 
49 
51 struct micro_queue {
53 
55 
58 
61 
63 
 64  void push( const void* item, ticket k, concurrent_queue_base& base,
 65  concurrent_queue_base::copy_specifics op_type );
 66 
67  void abort_push( ticket k, concurrent_queue_base& base );
68 
69  bool pop( void* dst, ticket k, concurrent_queue_base& base );
70 
 71  micro_queue& assign( const micro_queue& src, concurrent_queue_base& base,
 72  concurrent_queue_base::copy_specifics op_type );
 73 
74  page* make_copy ( concurrent_queue_base& base, const page* src_page, size_t begin_in_page,
75  size_t end_in_page, ticket& g_index, concurrent_queue_base::copy_specifics op_type ) ;
76 
77  void make_invalid( ticket k );
78 };
79 
80 // we need to yank it out of micro_queue because of concurrent_queue_base::deallocate_page being virtual.
 81 class micro_queue_pop_finalizer: no_copy {
 82  typedef concurrent_queue_base::page page;
 83  ticket my_ticket;
 84  micro_queue& my_queue;
 85  page* my_page;
 86  concurrent_queue_base& base;
 87 public:
 88  micro_queue_pop_finalizer( micro_queue& queue, concurrent_queue_base& b, ticket k, page* p ) :
 89  my_ticket(k), my_queue(queue), my_page(p), base(b)
90  {}
 91  ~micro_queue_pop_finalizer() {
 92  page* p = my_page;
93  if( p ) {
 94  spin_mutex::scoped_lock lock( my_queue.page_mutex );
 95  page* q = p->next;
96  my_queue.head_page = q;
97  if( !q ) {
98  my_queue.tail_page = NULL;
99  }
100  }
 101  my_queue.head_counter = my_ticket;
 102  if( p )
 103  base.deallocate_page( p );
 104  }
105 };
106 
109  predicate_leq( ticket t_ ) : t(t_) {}
110  bool operator() ( uintptr_t p ) const {return (ticket)p<=t;}
111 };
112 
 113 //! Internal representation of a ConcurrentQueue.
 114 
 115 class concurrent_queue_rep {
 117 public:
118 private:
119  friend struct micro_queue;
120 
122  static const size_t phi = 3;
123 
124 public:
126  static const size_t n_queue = 8;
127 
129  static size_t index( ticket k ) {
130  return k*phi%n_queue;
131  }
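 // Worked example: with phi==3 and n_queue==8, tickets 0,1,2,...,7 map to
 // micro-queues 0,3,6,1,4,7,2,5, so consecutive tickets are spread across
 // all eight micro-queues rather than contending on a single one.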
132 
137 
142 
 143  micro_queue& choose( ticket k ) {
 144  // The formula here approximates LRU in a cache-oblivious way.
145  return array[index(k)];
146  }
147 
149 
151  static const ptrdiff_t infinite_capacity = ptrdiff_t(~size_t(0)/2);
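 // Note: ~size_t(0)/2 equals SIZE_MAX/2, i.e. the largest positive value a
 // ptrdiff_t can hold on typical platforms, so an "unbounded" queue is simply
 // one whose capacity can never be reached in practice.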
152 };
153 
154 #if _MSC_VER && !defined(__INTEL_COMPILER)
155  // unary minus operator applied to unsigned type, result still unsigned
156  #pragma warning( push )
157  #pragma warning( disable: 4146 )
158 #endif
159 
160 static void* static_invalid_page;
161 
162 //------------------------------------------------------------------------
163 // micro_queue
164 //------------------------------------------------------------------------
 165 void micro_queue::push( const void* item, ticket k, concurrent_queue_base& base,
 166  concurrent_queue_base::copy_specifics op_type ) {
 167  k &= -concurrent_queue_rep::n_queue;
168  page* p = NULL;
 169  // find index on page where we would put the data
 170  size_t index = modulo_power_of_two( k/concurrent_queue_rep::n_queue, base.items_per_page );
 171  if( !index ) { // make a new page
172  __TBB_TRY {
173  p = base.allocate_page();
174  } __TBB_CATCH(...) {
175  ++base.my_rep->n_invalid_entries;
176  make_invalid( k );
177  __TBB_RETHROW();
178  }
179  p->mask = 0;
180  p->next = NULL;
181  }
182 
183  // wait for my turn
184  if( tail_counter!=k ) // The developer insisted on keeping first check out of the backoff loop
185  for( atomic_backoff b(true);;b.pause() ) {
 186  ticket tail = tail_counter;
 187  if( tail==k ) break;
188  else if( tail&0x1 ) {
189  // no memory. throws an exception; assumes concurrent_queue_rep::n_queue>1
190  ++base.my_rep->n_invalid_entries;
 191  throw_exception( eid_bad_last_alloc );
 192  }
193  }
194 
195  if( p ) { // page is newly allocated; insert in micro_queue
 196  spin_mutex::scoped_lock lock( page_mutex );
 197  if( page* q = tail_page )
198  q->next = p;
199  else
200  head_page = p;
201  tail_page = p;
202  }
203 
204  if (item) {
205  p = tail_page;
206  ITT_NOTIFY( sync_acquired, p );
207  __TBB_TRY {
208  if( concurrent_queue_base::copy == op_type ) {
209  base.copy_item( *p, index, item );
210  } else {
211  __TBB_ASSERT( concurrent_queue_base::move == op_type, NULL );
212  static_cast<concurrent_queue_base_v8&>(base).move_item( *p, index, item );
213  }
214  } __TBB_CATCH(...) {
215  ++base.my_rep->n_invalid_entries;
217  __TBB_RETHROW();
218  }
220  // If no exception was thrown, mark item as present.
221  p->mask |= uintptr_t(1)<<index;
222  }
223  else // no item; this was called from abort_push
224  ++base.my_rep->n_invalid_entries;
225 
 226  tail_counter += concurrent_queue_rep::n_queue;
 227 }
228 
229 
231  push(NULL, k, base, concurrent_queue_base::copy);
232 }
233 
234 bool micro_queue::pop( void* dst, ticket k, concurrent_queue_base& base ) {
 235  k &= -concurrent_queue_rep::n_queue;
 236  spin_wait_until_eq( head_counter, k );
 238  page *p = head_page;
239  __TBB_ASSERT( p, NULL );
 240  size_t index = modulo_power_of_two( k/concurrent_queue_rep::n_queue, base.items_per_page );
 241  bool success = false;
242  {
243  micro_queue_pop_finalizer finalizer( *this, base, k+concurrent_queue_rep::n_queue, index==base.items_per_page-1 ? p : NULL );
244  if( p->mask & uintptr_t(1)<<index ) {
245  success = true;
246  ITT_NOTIFY( sync_acquired, dst );
247  ITT_NOTIFY( sync_acquired, head_page );
248  base.assign_and_destroy_item( dst, *p, index );
250  } else {
251  --base.my_rep->n_invalid_entries;
252  }
253  }
254  return success;
255 }
256 
 257 micro_queue& micro_queue::assign( const micro_queue& src, concurrent_queue_base& base,
 258  concurrent_queue_base::copy_specifics op_type )
 259 {
262 
263  const page* srcp = src.head_page;
264  if( srcp ) {
265  ticket g_index = head_counter;
266  __TBB_TRY {
269  size_t end_in_first_page = (index+n_items<base.items_per_page)?(index+n_items):base.items_per_page;
270 
271  head_page = make_copy( base, srcp, index, end_in_first_page, g_index, op_type );
272  page* cur_page = head_page;
273 
274  if( srcp != src.tail_page ) {
275  for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
276  cur_page->next = make_copy( base, srcp, 0, base.items_per_page, g_index, op_type );
277  cur_page = cur_page->next;
278  }
279 
280  __TBB_ASSERT( srcp==src.tail_page, NULL );
281 
283  if( last_index==0 ) last_index = base.items_per_page;
284 
285  cur_page->next = make_copy( base, srcp, 0, last_index, g_index, op_type );
286  cur_page = cur_page->next;
287  }
288  tail_page = cur_page;
289  } __TBB_CATCH(...) {
290  make_invalid( g_index );
291  __TBB_RETHROW();
292  }
293  } else {
294  head_page = tail_page = NULL;
295  }
296  return *this;
297 }
298 
 299 concurrent_queue_base::page* micro_queue::make_copy( concurrent_queue_base& base,
 300  const concurrent_queue_base::page* src_page, size_t begin_in_page, size_t end_in_page,
 301  ticket& g_index, concurrent_queue_base::copy_specifics op_type )
 302 {
303  page* new_page = base.allocate_page();
304  new_page->next = NULL;
305  new_page->mask = src_page->mask;
306  for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
307  if( new_page->mask & uintptr_t(1)<<begin_in_page ) {
308  if( concurrent_queue_base::copy == op_type ) {
309  base.copy_page_item( *new_page, begin_in_page, *src_page, begin_in_page );
310  } else {
311  __TBB_ASSERT( concurrent_queue_base::move == op_type, NULL );
312  static_cast<concurrent_queue_base_v8&>(base).move_page_item( *new_page, begin_in_page, *src_page, begin_in_page );
313  }
314  }
315  return new_page;
316 }
317 
 318 void micro_queue::make_invalid( ticket k )
 319 {
320  static concurrent_queue_base::page dummy = {static_cast<page*>((void*)1), 0};
321  // mark it so that no more pushes are allowed.
322  static_invalid_page = &dummy;
323  {
326  if( page* q = tail_page )
327  q->next = static_cast<page*>(static_invalid_page);
328  else
329  head_page = static_cast<page*>(static_invalid_page);
330  tail_page = static_cast<page*>(static_invalid_page);
331  }
332 }
333 
334 #if _MSC_VER && !defined(__INTEL_COMPILER)
335  #pragma warning( pop )
336 #endif // warning 4146 is back
337 
338 //------------------------------------------------------------------------
339 // concurrent_queue_base
340 //------------------------------------------------------------------------
 341 concurrent_queue_base_v3::concurrent_queue_base_v3( size_t item_sz ) {
 342  items_per_page = item_sz<= 8 ? 32 :
343  item_sz<= 16 ? 16 :
344  item_sz<= 32 ? 8 :
345  item_sz<= 64 ? 4 :
346  item_sz<=128 ? 2 :
347  1;
348  my_capacity = size_t(-1)/(item_sz>1 ? item_sz : 2);
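 // Worked example: a queue of 24-byte items gets items_per_page==8 (24<=32),
 // and the default my_capacity of size_t(-1)/24 is so large that the queue is
 // effectively unbounded until internal_set_capacity() installs a real bound.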
 349  my_rep = cache_aligned_allocator<concurrent_queue_rep>().allocate(1);
 350  __TBB_ASSERT( is_aligned(my_rep, NFS_GetLineSize()), "alignment error" );
351  __TBB_ASSERT( is_aligned(&my_rep->head_counter, NFS_GetLineSize()), "alignment error" );
352  __TBB_ASSERT( is_aligned(&my_rep->tail_counter, NFS_GetLineSize()), "alignment error" );
353  __TBB_ASSERT( is_aligned(&my_rep->array, NFS_GetLineSize()), "alignment error" );
354  std::memset(static_cast<void*>(my_rep),0,sizeof(concurrent_queue_rep));
357  this->item_size = item_sz;
358 }
359 
 360 concurrent_queue_base_v3::~concurrent_queue_base_v3() {
 361  size_t nq = my_rep->n_queue;
362  for( size_t i=0; i<nq; i++ )
363  __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
365 }
366 
 367 void concurrent_queue_base_v3::internal_push( const void* src ) {
 368  internal_insert_item( src, copy );
369 }
370 
 371 void concurrent_queue_base_v8::internal_push_move( const void* src ) {
 372  internal_insert_item( src, move );
373 }
374 
 375 void concurrent_queue_base_v3::internal_insert_item( const void* src, copy_specifics op_type ) {
 376  concurrent_queue_rep& r = *my_rep;
 377  unsigned old_abort_counter = r.abort_counter;
378  ticket k = r.tail_counter++;
379  ptrdiff_t e = my_capacity;
380 #if DO_ITT_NOTIFY
381  bool sync_prepare_done = false;
382 #endif
383  if( (ptrdiff_t)(k-r.head_counter)>=e ) { // queue is full
384 #if DO_ITT_NOTIFY
385  if( !sync_prepare_done ) {
386  ITT_NOTIFY( sync_prepare, &sync_prepare_done );
387  sync_prepare_done = true;
388  }
389 #endif
390  bool slept = false;
 391  concurrent_monitor::thread_context thr_ctx;
 392  r.slots_avail.prepare_wait( thr_ctx, ((ptrdiff_t)(k-e)) );
393  while( (ptrdiff_t)(k-r.head_counter)>=const_cast<volatile ptrdiff_t&>(e = my_capacity) ) {
394  __TBB_TRY {
395  if( r.abort_counter!=old_abort_counter ) {
396  r.slots_avail.cancel_wait( thr_ctx );
 397  throw_exception( eid_user_abort );
 398  }
399  slept = r.slots_avail.commit_wait( thr_ctx );
401  r.choose(k).abort_push(k, *this);
402  __TBB_RETHROW();
403  } __TBB_CATCH(...) {
404  __TBB_RETHROW();
405  }
406  if (slept == true) break;
407  r.slots_avail.prepare_wait( thr_ctx, ((ptrdiff_t)(k-e)) );
408  }
409  if( !slept )
410  r.slots_avail.cancel_wait( thr_ctx );
411  }
412  ITT_NOTIFY( sync_acquired, &sync_prepare_done );
413  __TBB_ASSERT( (ptrdiff_t)(k-r.head_counter)<my_capacity, NULL);
414  r.choose( k ).push( src, k, *this, op_type );
 415  r.items_avail.notify( predicate_leq(k) );
 416 }
417 
 418 void concurrent_queue_base_v3::internal_pop( void* dst ) {
 419  concurrent_queue_rep& r = *my_rep;
 420  ticket k;
421 #if DO_ITT_NOTIFY
422  bool sync_prepare_done = false;
423 #endif
424  unsigned old_abort_counter = r.abort_counter;
425  // This loop is a single pop operation; abort_counter should not be re-read inside
426  do {
427  k=r.head_counter++;
428  if ( (ptrdiff_t)(r.tail_counter-k)<=0 ) { // queue is empty
429 #if DO_ITT_NOTIFY
430  if( !sync_prepare_done ) {
431  ITT_NOTIFY( sync_prepare, dst );
432  sync_prepare_done = true;
433  }
434 #endif
435  bool slept = false;
 436  concurrent_monitor::thread_context thr_ctx;
 437  r.items_avail.prepare_wait( thr_ctx, k );
438  while( (ptrdiff_t)(r.tail_counter-k)<=0 ) {
439  __TBB_TRY {
440  if( r.abort_counter!=old_abort_counter ) {
441  r.items_avail.cancel_wait( thr_ctx );
 442  throw_exception( eid_user_abort );
 443  }
444  slept = r.items_avail.commit_wait( thr_ctx );
446  r.head_counter--;
447  __TBB_RETHROW();
448  } __TBB_CATCH(...) {
449  __TBB_RETHROW();
450  }
451  if (slept == true) break;
452  r.items_avail.prepare_wait( thr_ctx, k );
453  }
454  if( !slept )
455  r.items_avail.cancel_wait( thr_ctx );
456  }
457  __TBB_ASSERT((ptrdiff_t)(r.tail_counter-k)>0, NULL);
458  } while( !r.choose(k).pop(dst,k,*this) );
459 
460  // wake up a producer..
 461  r.slots_avail.notify( predicate_leq(k) );
 462 }
463 
 464 void concurrent_queue_base_v3::internal_abort() {
 465  concurrent_queue_rep& r = *my_rep;
 466  ++r.abort_counter;
 467  r.items_avail.abort_all();
 468  r.slots_avail.abort_all();
 469 }
470 
 471 bool concurrent_queue_base_v3::internal_pop_if_present( void* dst ) {
 472  concurrent_queue_rep& r = *my_rep;
 473  ticket k;
474  do {
475  k = r.head_counter;
476  for(;;) {
477  if( (ptrdiff_t)(r.tail_counter-k)<=0 ) {
478  // Queue is empty
479  return false;
480  }
481  // Queue had item with ticket k when we looked. Attempt to get that item.
482  ticket tk=k;
483  k = r.head_counter.compare_and_swap( tk+1, tk );
484  if( k==tk )
485  break;
486  // Another thread snatched the item, retry.
487  }
488  } while( !r.choose( k ).pop( dst, k, *this ) );
489 
 490  r.slots_avail.notify( predicate_leq(k) );
 491 
492  return true;
493 }
494 
 495 bool concurrent_queue_base_v3::internal_push_if_not_full( const void* src ) {
 496  return internal_insert_if_not_full( src, copy );
497 }
498 
 499 bool concurrent_queue_base_v8::internal_push_move_if_not_full( const void* src ) {
 500  return internal_insert_if_not_full( src, move );
501 }
502 
 503 bool concurrent_queue_base_v3::internal_insert_if_not_full( const void* src, copy_specifics op_type ) {
 504  concurrent_queue_rep& r = *my_rep;
 505  ticket k = r.tail_counter;
506  for(;;) {
507  if( (ptrdiff_t)(k-r.head_counter)>=my_capacity ) {
508  // Queue is full
509  return false;
510  }
511  // Queue had empty slot with ticket k when we looked. Attempt to claim that slot.
512  ticket tk=k;
513  k = r.tail_counter.compare_and_swap( tk+1, tk );
514  if( k==tk )
515  break;
516  // Another thread claimed the slot, so retry.
517  }
518  r.choose(k).push(src, k, *this, op_type);
 519  r.items_avail.notify( predicate_leq(k) );
 520  return true;
521 }
522 
 523 ptrdiff_t concurrent_queue_base_v3::internal_size() const {
 524  __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
 525  return ptrdiff_t(my_rep->tail_counter-my_rep->head_counter-my_rep->n_invalid_entries);
526 }
527 
 528 bool concurrent_queue_base_v3::internal_empty() const {
 529  ticket tc = my_rep->tail_counter;
530  ticket hc = my_rep->head_counter;
531  // if tc!=r.tail_counter, the queue was not empty at some point between the two reads.
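 // Example: if another thread pushes (and perhaps pops) between the two reads,
 // tc will differ from the re-read tail_counter and the queue is conservatively
 // reported as non-empty.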
532  return ( tc==my_rep->tail_counter && ptrdiff_t(tc-hc-my_rep->n_invalid_entries)<=0 );
533 }
534 
535 void concurrent_queue_base_v3::internal_set_capacity( ptrdiff_t capacity, size_t /*item_sz*/ ) {
536  my_capacity = capacity<0 ? concurrent_queue_rep::infinite_capacity : capacity;
537 }
538 
 539 void concurrent_queue_base_v3::internal_finish_clear() {
 540  size_t nq = my_rep->n_queue;
541  for( size_t i=0; i<nq; ++i ) {
542  page* tp = my_rep->array[i].tail_page;
543  __TBB_ASSERT( my_rep->array[i].head_page==tp, "at most one page should remain" );
544  if( tp!=NULL) {
545  if( tp!=static_invalid_page ) deallocate_page( tp );
546  my_rep->array[i].tail_page = NULL;
547  }
548  }
549 }
550 
 551 void concurrent_queue_base_v3::internal_throw_exception() const {
 552  throw_exception( eid_bad_alloc );
 553 }
554 
 555 void concurrent_queue_base_v3::internal_assign( const concurrent_queue_base_v3& src, copy_specifics op_type ) {
 557  my_capacity = src.my_capacity;
558 
559  // copy concurrent_queue_rep.
564 
565  // copy micro_queues
566  for( size_t i = 0; i<my_rep->n_queue; ++i )
567  my_rep->array[i].assign( src.my_rep->array[i], *this, op_type );
568 
570  "the source concurrent queue should not be concurrently modified." );
571 }
572 
 573 void concurrent_queue_base_v3::assign( const concurrent_queue_base_v3& src ) {
 574  internal_assign( src, copy );
575 }
576 
 577 void concurrent_queue_base_v8::move_content( concurrent_queue_base_v8& src ) {
 578  internal_assign( src, move );
579 }
580 
581 //------------------------------------------------------------------------
582 // concurrent_queue_iterator_rep
583 //------------------------------------------------------------------------
585 public:
 586  ticket head_counter;
 587  const concurrent_queue_base& my_queue;
 588  const size_t offset_of_last;
590  concurrent_queue_iterator_rep( const concurrent_queue_base& queue, size_t offset_of_last_ ) :
591  head_counter(queue.my_rep->head_counter),
592  my_queue(queue),
593  offset_of_last(offset_of_last_)
594  {
595  const concurrent_queue_rep& rep = *queue.my_rep;
596  for( size_t k=0; k<concurrent_queue_rep::n_queue; ++k )
597  array[k] = rep.array[k].head_page;
598  }
600  bool get_item( void*& item, size_t k ) {
601  if( k==my_queue.my_rep->tail_counter ) {
602  item = NULL;
603  return true;
604  } else {
 605  concurrent_queue_base::page* p = array[concurrent_queue_rep::index(k)];
 606  __TBB_ASSERT(p,NULL);
 607  size_t i = modulo_power_of_two( k/concurrent_queue_rep::n_queue, my_queue.items_per_page );
 608  item = static_cast<unsigned char*>(static_cast<void*>(p)) + offset_of_last + my_queue.item_size*i;
609  return (p->mask & uintptr_t(1)<<i)!=0;
610  }
611  }
612 };
613 
614 //------------------------------------------------------------------------
615 // concurrent_queue_iterator_base
616 //------------------------------------------------------------------------
617 
618 void concurrent_queue_iterator_base_v3::initialize( const concurrent_queue_base& queue, size_t offset_of_last ) {
620  new( my_rep ) concurrent_queue_iterator_rep(queue,offset_of_last);
621  size_t k = my_rep->head_counter;
622  if( !my_rep->get_item(my_item, k) ) advance();
623 }
624 
626  initialize(queue,0);
627 }
628 
630  initialize(queue,offset_of_last);
631 }
632 
 633 void concurrent_queue_iterator_base_v3::assign( const concurrent_queue_iterator_base_v3& other ) {
 634  if( my_rep!=other.my_rep ) {
635  if( my_rep ) {
637  my_rep = NULL;
638  }
639  if( other.my_rep ) {
642  }
643  }
644  my_item = other.my_item;
645 }
646 
 647 void concurrent_queue_iterator_base_v3::advance() {
 648  __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
649  size_t k = my_rep->head_counter;
650  const concurrent_queue_base& queue = my_rep->my_queue;
651 #if TBB_USE_ASSERT
652  void* tmp;
653  my_rep->get_item(tmp,k);
654  __TBB_ASSERT( my_item==tmp, NULL );
655 #endif /* TBB_USE_ASSERT */
 656  size_t i = modulo_power_of_two( k/concurrent_queue_rep::n_queue, queue.items_per_page );
 657  if( i==queue.items_per_page-1 ) {
 658  concurrent_queue_base::page*& root = my_rep->array[concurrent_queue_rep::index(k)];
 659  root = root->next;
660  }
661  // advance k
662  my_rep->head_counter = ++k;
663  if( !my_rep->get_item(my_item, k) ) advance();
664 }
665 
 666 concurrent_queue_iterator_base_v3::~concurrent_queue_iterator_base_v3() {
 667  //delete my_rep;
669  my_rep = NULL;
670 }
671 
672 } // namespace internal
673 
674 } // namespace tbb
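
The internal_* entry points above are not meant to be called directly; they sit behind the public queue templates declared in tbb/concurrent_queue.h. A minimal usage sketch (an illustration, assuming the tbb::concurrent_bounded_queue wrapper shipped with this release, whose push, try_pop and set_capacity forward to internal_push, internal_pop_if_present and internal_set_capacity respectively):

#include <iostream>
#include "tbb/concurrent_queue.h"

int main() {
    tbb::concurrent_bounded_queue<int> q;
    q.set_capacity(4);             // forwards to internal_set_capacity()

    for (int i = 0; i < 4; ++i)
        q.push(i);                 // forwards to internal_push(); blocks when the queue is full

    int value;
    while (q.try_pop(value))       // forwards to internal_pop_if_present()
        std::cout << value << '\n';
    return 0;
}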