Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
concurrent_queue.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 
16 
17 
18 
19 */
20 
21 #ifndef __TBB_concurrent_queue_H
22 #define __TBB_concurrent_queue_H
23 
26 
27 namespace tbb {
28 
29 namespace strict_ppl {
30 
32 
35 template<typename T, typename A = cache_aligned_allocator<T> >
36 class concurrent_queue: public internal::concurrent_queue_base_v3<T> {
37  template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
38 
42 
44  virtual void *allocate_block( size_t n ) __TBB_override {
45  void *b = reinterpret_cast<void*>(my_allocator.allocate( n ));
46  if( !b )
48  return b;
49  }
50 
// Deallocates a block created by allocate_block.
// b is the block and n its size as passed to the allocator; the pointer is
// cast to char* before being handed to my_allocator.deallocate.
52  virtual void deallocate_block( void *b, size_t n ) __TBB_override {
53  my_allocator.deallocate( reinterpret_cast<char*>(b), n );
54  }
55 
// Item constructor used for copying: placement-constructs a T at location
// as a copy of the T object that src points to.
56  static void copy_construct_item(T* location, const void* src){
57  new (location) T(*static_cast<const T*>(src));
58  }
59 
60 #if __TBB_CPP11_RVALUE_REF_PRESENT
// Item constructor used for moving: placement-constructs a T at location
// by moving from the T object that src points to.
// constness is cast away from src so that the source object can be moved from.
61  static void move_construct_item(T* location, const void* src) {
62  new (location) T( std::move(*static_cast<T*>(const_cast<void*>(src))) );
63  }
64 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
65 public:
67  typedef T value_type;
68 
70  typedef T& reference;
71 
73  typedef const T& const_reference;
74 
76  typedef size_t size_type;
77 
79  typedef ptrdiff_t difference_type;
80 
82  typedef A allocator_type;
83 
86  my_allocator( a )
87  {
88  }
89 
// [begin,end) constructor.
// Stores a copy of allocator a, then pushes every element of the range
// onto the queue in iteration order.
91  template<typename InputIterator>
92  concurrent_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
93  my_allocator( a )
94  {
95  for( ; begin != end; ++begin )
96  this->push(*begin);
97  }
98 
101  internal::concurrent_queue_base_v3<T>(), my_allocator( a )
102  {
103  this->assign( src, copy_construct_item );
104  }
105 
106 #if __TBB_CPP11_RVALUE_REF_PRESENT
109  internal::concurrent_queue_base_v3<T>(), my_allocator( std::move(src.my_allocator) )
110  {
111  this->internal_swap( src );
112  }
113 
115  internal::concurrent_queue_base_v3<T>(), my_allocator( a )
116  {
117  // checking that memory allocated by one instance of allocator can be deallocated
118  // with another
119  if( my_allocator == src.my_allocator) {
120  this->internal_swap( src );
121  } else {
122  // allocators are different => performing per-element move
123  this->assign( src, move_construct_item );
124  src.clear();
125  }
126  }
127 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
128 
131 
// Enqueue an item at tail of queue.
// Forwards to internal_push with copy_construct_item as the item constructor,
// so the queued element is a copy of source.
133  void push( const T& source ) {
134  this->internal_push( &source, copy_construct_item );
135  }
136 
137 #if __TBB_CPP11_RVALUE_REF_PRESENT
// Rvalue overload of push: forwards to internal_push with move_construct_item
// as the item constructor, so the queued element is move-constructed from source.
138  void push( T&& source ) {
139  this->internal_push( &source, move_construct_item );
140  }
141 
142 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
// Builds a temporary T from the perfectly-forwarded arguments and passes it
// to push; the temporary is an rvalue, so the push( T&& ) overload is used.
143  template<typename... Arguments>
144  void emplace( Arguments&&... args ) {
145  push( T(std::forward<Arguments>( args )...) );
146  }
147 #endif //__TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
148 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
149 
151 
// Attempt to dequeue an item from head of queue.
// result is passed by address to internal_try_pop as the destination;
// the return value is whatever internal_try_pop reports.
153  bool try_pop( T& result ) {
154  return this->internal_try_pop( &result );
155  }
156 
// Return the number of items in the queue; thread unsafe.
// The value comes straight from internal_size().
158  size_type unsafe_size() const {return this->internal_size();}
159 
// Reports whether the queue is empty; the answer comes from internal_empty().
161  bool empty() const {return this->internal_empty();}
162 
164  void clear() ;
165 
// Return allocator object (returned by value, converted from my_allocator).
167  allocator_type get_allocator() const { return this->my_allocator; }
168 
169  typedef internal::concurrent_queue_iterator<concurrent_queue,T> iterator;
170  typedef internal::concurrent_queue_iterator<concurrent_queue,const T> const_iterator;
171 
172  //------------------------------------------------------------------------
173  // The iterators are intended only for debugging. They are slow and not thread safe.
174  //------------------------------------------------------------------------
// Debug-only iteration entry points: each returns an iterator constructed
// from this queue. Slow and not thread safe.
175  iterator unsafe_begin() {return iterator(*this);}
177  const_iterator unsafe_begin() const {return const_iterator(*this);}
179 } ;
180 
181 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
182 // Deduction guide for the constructor from two iterators
183 template<typename InputIterator,
184  typename T = typename std::iterator_traits<InputIterator>::value_type,
185  typename A = cache_aligned_allocator<T>
186 > concurrent_queue(InputIterator, InputIterator, const A& = A())
188 #endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
189 
190 template<typename T, class A>
192  clear();
193  this->internal_finish_clear();
194 }
195 
196 template<typename T, class A>
198  T value;
199  while( !empty() ) try_pop(value);
200 }
201 
202 } // namespace strict_ppl
203 
205 
210 template<typename T, class A = cache_aligned_allocator<T> >
211 class concurrent_bounded_queue: public internal::concurrent_queue_base_v8 {
212  template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
214 
217 
220 
222  class destroyer: internal::no_copy {
224  public:
227  };
228 
// Returns a reference to the item stored at position index of page p.
// index must be below items_per_page (checked with __TBB_ASSERT).
// The page is viewed as a padded_page and items are addressed as an array
// that starts at its `last` member.
229  T& get_ref( page& p, size_t index ) {
230  __TBB_ASSERT( index<items_per_page, NULL );
231  return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
232  }
233 
// Placement-constructs, in slot index of page dst, a copy of the T object
// that src points to.
234  virtual void copy_item( page& dst, size_t index, const void* src ) __TBB_override {
235  new( &get_ref(dst,index) ) T(*static_cast<const T*>(src));
236  }
237 
238 #if __TBB_CPP11_RVALUE_REF_PRESENT
// Placement-constructs, in slot index of page dst, a T moved from the object
// that src points to. constness is cast away from src so it can be moved from.
239  virtual void move_item( page& dst, size_t index, const void* src ) __TBB_override {
240  new( &get_ref(dst,index) ) T( std::move(*static_cast<T*>(const_cast<void*>(src))) );
241  }
242 #else
// Variant used when __TBB_CPP11_RVALUE_REF_PRESENT is not set: it only
// asserts, because this path is not expected to be reached.
243  virtual void move_item( page&, size_t, const void* ) __TBB_override {
244  __TBB_ASSERT( false, "Unreachable code" );
245  }
246 #endif
247 
// Copy-constructs slot dindex of page dst from slot sindex of page src.
// const is cast away from src only because get_ref takes a non-const page;
// the source item is read as a copy-constructor argument.
248  virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) __TBB_override {
249  new( &get_ref(dst,dindex) ) T( get_ref( const_cast<page&>(src), sindex ) );
250  }
251 
252 #if __TBB_CPP11_RVALUE_REF_PRESENT
// Move-constructs slot dindex of page dst from slot sindex of page src.
// const is cast away from src so that the source item can be moved from.
253  virtual void move_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) __TBB_override {
254  new( &get_ref(dst,dindex) ) T( std::move(get_ref( const_cast<page&>(src), sindex )) );
255  }
256 #else
// Variant used when __TBB_CPP11_RVALUE_REF_PRESENT is not set: it only
// asserts, because this path is not expected to be reached.
257  virtual void move_page_item( page&, size_t, const page&, size_t ) __TBB_override {
258  __TBB_ASSERT( false, "Unreachable code" );
259  }
260 #endif
261 
// Move-assigns the item in slot index of page src into the T that dst points to.
// A destroyer is bound to the source item before the assignment.
// NOTE(review): destroyer's body is not visible here; presumably it destroys
// the source item when it goes out of scope, even if the assignment throws - confirm.
262  virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) __TBB_override {
263  T& from = get_ref(src,index);
264  destroyer d(from);
265  *static_cast<T*>(dst) = tbb::internal::move( from );
266  }
267 
269  size_t n = sizeof(padded_page) + (items_per_page-1)*sizeof(T);
270  page *p = reinterpret_cast<page*>(my_allocator.allocate( n ));
271  if( !p )
273  return p;
274  }
275 
277  size_t n = sizeof(padded_page) + (items_per_page-1)*sizeof(T);
278  my_allocator.deallocate( reinterpret_cast<char*>(p), n );
279  }
280 
281 public:
283  typedef T value_type;
284 
286  typedef A allocator_type;
287 
289  typedef T& reference;
290 
292  typedef const T& const_reference;
293 
295 
297  typedef std::ptrdiff_t size_type;
298 
300  typedef std::ptrdiff_t difference_type;
301 
304  concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
305  {
306  }
307 
310  : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
311  {
312  assign( src );
313  }
314 
315 #if __TBB_CPP11_RVALUE_REF_PRESENT
318  : concurrent_queue_base_v8( sizeof(T) ), my_allocator( std::move(src.my_allocator) )
319  {
320  internal_swap( src );
321  }
322 
324  : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
325  {
326  // checking that memory allocated by one instance of allocator can be deallocated
327  // with another
328  if( my_allocator == src.my_allocator) {
329  this->internal_swap( src );
330  } else {
331  // allocators are different => performing per-element move
332  this->move_content( src );
333  src.clear();
334  }
335  }
336 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
337 
339  template<typename InputIterator>
340  concurrent_bounded_queue( InputIterator begin, InputIterator end,
341  const allocator_type& a = allocator_type())
342  : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
343  {
344  for( ; begin != end; ++begin )
346  }
347 
350 
// Enqueue an item at tail of queue.
// Forwards the address of source to internal_push, which enqueues using
// the copy operation.
352  void push( const T& source ) {
353  internal_push( &source );
354  }
355 
356 #if __TBB_CPP11_RVALUE_REF_PRESENT
// Rvalue overload of push: forwards the address of source to
// internal_push_move, which enqueues using the move operation.
357  void push( T&& source ) {
359  internal_push_move( &source );
360  }
361 
362 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
// Builds a temporary T from the perfectly-forwarded arguments and passes it
// to push; the temporary is an rvalue, so the push( T&& ) overload is used.
363  template<typename... Arguments>
364  void emplace( Arguments&&... args ) {
365  push( T(std::forward<Arguments>( args )...) );
366  }
367 #endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
368 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
369 
371 
// Dequeue item from head of queue.
// destination is passed by address to internal_pop, which receives the item.
372  void pop( T& destination ) {
373  internal_pop( &destination );
374  }
375 
376 #if TBB_USE_EXCEPTIONS
// Abort all pending queue operations by calling internal_abort().
// Only compiled when TBB_USE_EXCEPTIONS is set.
377  void abort() {
379  internal_abort();
380  }
381 #endif
382 
384 
// Enqueue an item at tail of queue if queue is not already full.
// Returns the result of internal_push_if_not_full, which attempts the
// enqueue using the copy operation.
386  bool try_push( const T& source ) {
387  return internal_push_if_not_full( &source );
388  }
389 
390 #if __TBB_CPP11_RVALUE_REF_PRESENT
391 
// Move an item at tail of queue if queue is not already full.
// Returns the result of internal_push_move_if_not_full, which attempts the
// enqueue using the move operation.
394  bool try_push( T&& source ) {
395  return internal_push_move_if_not_full( &source );
396  }
397 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
// Builds a temporary T from the perfectly-forwarded arguments and passes it
// to try_push; the temporary is an rvalue, so the try_push( T&& ) overload
// is used. Returns what try_push returns.
398  template<typename... Arguments>
399  bool try_emplace( Arguments&&... args ) {
400  return try_push( T(std::forward<Arguments>( args )...) );
401  }
402 #endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
403 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
404 
406 
// Attempt to dequeue an item from head of queue.
// destination is passed by address to internal_pop_if_present; the return
// value is whatever that call reports.
408  bool try_pop( T& destination ) {
409  return internal_pop_if_present( &destination );
410  }
411 
413 
// Return number of pushes minus number of pops, as reported by internal_size().
416  size_type size() const {return internal_size();}
417 
// Reports whether the queue is empty; the answer comes from internal_empty().
419  bool empty() const {return internal_empty();}
420 
// Maximum number of allowed elements: returns the current my_capacity.
422  size_type capacity() const {
423  return my_capacity;
424  }
425 
427 
// Set the capacity.
// Forwards new_capacity to internal_set_capacity together with sizeof(T)
// as the element size.
429  void set_capacity( size_type new_capacity ) {
430  internal_set_capacity( new_capacity, sizeof(T) );
431  }
432 
// Return allocator object (returned by value, converted from my_allocator).
434  allocator_type get_allocator() const { return this->my_allocator; }
435 
437  void clear() ;
438 
439  typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,T> iterator;
440  typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,const T> const_iterator;
441 
442  //------------------------------------------------------------------------
443  // The iterators are intended only for debugging. They are slow and not thread safe.
444  //------------------------------------------------------------------------
// Debug-only iteration entry points: each returns an iterator constructed
// from this queue. Slow and not thread safe.
445  iterator unsafe_begin() {return iterator(*this);}
447  const_iterator unsafe_begin() const {return const_iterator(*this);}
449 
450 };
451 
452 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
453 // guide for concurrent_bounded_queue(InputIterator, InputIterator, ...)
454 template<typename InputIterator,
455  typename T = typename std::iterator_traits<InputIterator>::value_type,
456  typename A = cache_aligned_allocator<T>
457 > concurrent_bounded_queue(InputIterator, InputIterator, const A& = A())
458 -> concurrent_bounded_queue<T, A>;
459 #endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
460 
461 template<typename T, class A>
463  clear();
464  internal_finish_clear();
465 }
466 
467 template<typename T, class A>
469  T value;
470  while( try_pop(value) ) /*noop*/;
471 }
472 
474 
475 } // namespace tbb
476 
477 #endif /* __TBB_concurrent_queue_H */
T value_type
Element type in the queue.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp end
void clear()
Clear the queue. Not thread-safe.
const_iterator unsafe_begin() const
void pop(T &destination)
Dequeue item from head of queue.
void emplace(Arguments &&... args)
#define __TBB_override
Definition: tbb_stddef.h:244
bool __TBB_EXPORTED_METHOD internal_push_move_if_not_full(const void *src)
Attempt to enqueue item onto queue using move operation.
concurrent_bounded_queue(const concurrent_bounded_queue &src, const allocator_type &a=allocator_type())
Copy constructor.
concurrent_queue(const concurrent_queue &src, const allocator_type &a=allocator_type())
Copy constructor.
T & get_ref(page &p, size_t index)
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
concurrent_bounded_queue(InputIterator begin, InputIterator end, const allocator_type &a=allocator_type())
[begin,end) constructor
T value_type
Element type in the queue.
const_iterator unsafe_end() const
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:169
page_allocator_type my_allocator
Allocator type.
std::ptrdiff_t size_type
Integral type for representing size of the queue.
allocator_type get_allocator() const
Return allocator object.
friend class internal::concurrent_queue_iterator
internal::concurrent_queue_iterator< concurrent_bounded_queue, T > iterator
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp begin
A high-performance thread-safe non-blocking concurrent queue.
virtual void copy_item(page &dst, size_t index, const void *src) __TBB_override
virtual void copy_page_item(page &dst, size_t dindex, const page &src, size_t sindex) __TBB_override
void __TBB_EXPORTED_METHOD internal_abort()
Abort all pending queue operations.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d
void emplace(Arguments &&... args)
ptrdiff_t my_capacity
Capacity of the queue.
internal::concurrent_queue_iterator< concurrent_bounded_queue, const T > const_iterator
size_t internal_size() const
Get size of queue; result may be invalid if queue is modified concurrently.
allocator_traits< Alloc >::template rebind_alloc< T >::other type
bool __TBB_EXPORTED_METHOD internal_push_if_not_full(const void *src)
Attempt to enqueue item onto queue using copy operation.
bool __TBB_EXPORTED_METHOD internal_empty() const
Check if the queue is empty.
const_iterator unsafe_end() const
Class used to ensure exception-safety of method "pop".
bool try_pop(T &destination)
Attempt to dequeue an item from head of queue.
const T & const_reference
Const reference type.
void internal_swap(concurrent_queue_base_v3 &src)
swap queues
size_type size() const
Return number of pushes minus number of pops.
void assign(const concurrent_queue_base_v3 &src, item_constructor_t construct_item)
copy or move internal representation
void const char const char int ITT_FORMAT __itt_group_sync p
void __TBB_EXPORTED_METHOD internal_push_move(const void *src)
Enqueue item at tail of queue using move operation.
virtual page * allocate_page() __TBB_override
custom allocator
void throw_exception(exception_id eid)
Versionless convenience wrapper for throw_exception_v4()
auto last(Container &c) -> decltype(begin(c))
const_iterator unsafe_begin() const
concurrent_queue(InputIterator begin, InputIterator end, const allocator_type &a=allocator_type())
[begin,end) constructor
virtual void deallocate_block(void *b, size_t n) __TBB_override
Deallocates block created by allocate_block.
void __TBB_EXPORTED_METHOD internal_set_capacity(ptrdiff_t capacity, size_t element_size)
Set the queue capacity.
bool empty() const
Equivalent to size()==0.
~concurrent_bounded_queue()
Destroy queue.
bool internal_empty() const
check if the queue is empty; thread safe
void clear()
Clear the queue. Not thread-safe.
tbb::internal::allocator_rebind< A, char >::type page_allocator_type
concurrent_queue(concurrent_queue &&src, const allocator_type &a)
virtual void deallocate_page(page *p) __TBB_override
custom de-allocator
The graph class.
allocator_type get_allocator() const
Return allocator object.
concurrent_queue_base_v3::copy_specifics copy_specifics
virtual void * allocate_block(size_t n) __TBB_override
Allocates a block of size n (bytes)
const T & const_reference
Const reference type.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
bool __TBB_EXPORTED_METHOD internal_pop_if_present(void *dst)
Attempt to dequeue item from queue.
void __TBB_EXPORTED_METHOD move_content(concurrent_queue_base_v8 &src)
move items
void set_capacity(size_type new_capacity)
Set the capacity.
size_type capacity() const
Maximum number of allowed elements.
concurrent_queue(const allocator_type &a=allocator_type())
Construct empty queue.
ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const
Get size of queue.
concurrent_queue_base_v3::padded_page< T > padded_page
concurrent_bounded_queue(const allocator_type &a=allocator_type())
Construct empty queue.
virtual void assign_and_destroy_item(void *dst, page &src, size_t index) __TBB_override
void __TBB_EXPORTED_METHOD internal_push(const void *src)
Enqueue item at tail of queue using copy operation.
virtual void move_item(page &dst, size_t index, const void *src) __TBB_override
virtual void move_page_item(page &dst, size_t dindex, const page &src, size_t sindex) __TBB_override
void move(tbb_thread &t1, tbb_thread &t2)
Definition: tbb_thread.h:309
bool internal_try_pop(void *dst)
Attempt to dequeue item from queue.
void __TBB_EXPORTED_METHOD internal_pop(void *dst)
Dequeue item from head of queue.
internal::concurrent_queue_iterator< concurrent_queue, const T > const_iterator
std::ptrdiff_t difference_type
Difference type for iterator.
static void move_construct_item(T *location, const void *src)
size_t size_type
Integral type for representing size of the queue.
bool try_pop(T &result)
Attempt to dequeue an item from head of queue.
void push(const T &source)
Enqueue an item at tail of queue.
bool try_emplace(Arguments &&... args)
void internal_push(const void *src, item_constructor_t construct_item)
Enqueue item at tail of queue.
size_type unsafe_size() const
Return the number of items in the queue; thread unsafe.
concurrent_bounded_queue(concurrent_bounded_queue &&src, const allocator_type &a)
ptrdiff_t difference_type
Difference type for iterator.
A high-performance thread-safe blocking concurrent bounded queue.
bool try_push(T &&source)
Move an item at tail of queue if queue is not already full.
friend class internal::concurrent_queue_iterator
tbb::internal::allocator_rebind< A, char >::type page_allocator_type
Allocator type.
bool empty() const
Equivalent to size()<=0.
void internal_swap(concurrent_queue_base_v3 &src)
swap internal representation
void push(const T &source)
Enqueue an item at tail of queue.
void __TBB_EXPORTED_METHOD assign(const concurrent_queue_base_v3 &src)
copy internal representation
internal::concurrent_queue_iterator< concurrent_queue, T > iterator
static void copy_construct_item(T *location, const void *src)
bool try_push(const T &source)
Enqueue an item at tail of queue if queue is not already full.

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.