00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef __TBB_concurrent_queue_internal_H
00022 #define __TBB_concurrent_queue_internal_H
00023
00024 #include "tbb_stddef.h"
00025 #include "tbb_machine.h"
00026 #include "atomic.h"
00027 #include "spin_mutex.h"
00028 #include "cache_aligned_allocator.h"
00029 #include "tbb_exception.h"
00030 #include "tbb_profiling.h"
00031 #include <new>
00032
00033 #if !TBB_USE_EXCEPTIONS && _MSC_VER
00034
00035 #pragma warning (push)
00036 #pragma warning (disable: 4530)
00037 #endif
00038
00039 #include <iterator>
00040
00041 #if !TBB_USE_EXCEPTIONS && _MSC_VER
00042 #pragma warning (pop)
00043 #endif
00044
00045 namespace tbb {
00046
00047 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
00048
00049
00050 namespace strict_ppl {
00051 template<typename T, typename A> class concurrent_queue;
00052 }
00053
00054 template<typename T, typename A> class concurrent_bounded_queue;
00055
00056 namespace deprecated {
00057 template<typename T, typename A> class concurrent_queue;
00058 }
00059 #endif
00060
00062 namespace strict_ppl {
00063
00065 namespace internal {
00066
00067 using namespace tbb::internal;
00068
00069 typedef size_t ticket;
00070
00071 template<typename T> class micro_queue ;
00072 template<typename T> class micro_queue_pop_finalizer ;
00073 template<typename T> class concurrent_queue_base_v3;
00074
00076
//! Type-independent part of the queue representation shared by all micro-queues.
/** The head counter, the tail counter, and the remaining metadata are each
    padded out (pad1/pad2/pad3) to a full cache line (NFS_MaxLineSize) so that
    producers and consumers do not falsely share. */
struct concurrent_queue_rep_base : no_copy {
    template<typename T> friend class micro_queue;
    template<typename T> friend class concurrent_queue_base_v3;

protected:
    //! Multiplier used by concurrent_queue_rep::index to scatter consecutive
    //! tickets across the micro-queues (3 is coprime with n_queue==8).
    static const size_t phi = 3;

public:
    //! Number of micro-queues; must be a power of 2 (push/pop round tickets
    //! down with "k &= -n_queue").
    static const size_t n_queue = 8;

    //! Prefix on every page of items.
    struct page {
        page* next;      // next page of this micro-queue, or an invalid sentinel value
        uintptr_t mask;  // bit i set <=> slot i of this page holds a committed item
    };

    atomic<ticket> head_counter;  // next ticket to be popped
    char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)];
    atomic<ticket> tail_counter;  // next ticket to be pushed
    char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)];

    //! Number of items per page; always a power of two (see the
    //! concurrent_queue_base_v3 constructor), so slot = k/n_queue & (items_per_page-1).
    size_t items_per_page;

    //! Size of an item in bytes.
    size_t item_size;

    //! Count of entries whose construction failed (producer threw) and that
    //! consumers must skip.
    atomic<size_t> n_invalid_entries;

    char pad3[NFS_MaxLineSize-sizeof(size_t)-sizeof(size_t)-sizeof(atomic<size_t>)];
} ;
00113
00114 inline bool is_valid_page(const concurrent_queue_rep_base::page* p) {
00115 return uintptr_t(p)>1;
00116 }
00117
00119
//! Abstract class that defines the interface for page allocation/deallocation.
/** Implemented by concurrent_queue_base_v3<T>; micro_queue and
    micro_queue_pop_finalizer call through this interface so that page
    management need not know the concrete queue type. */
class concurrent_queue_page_allocator
{
    template<typename T> friend class micro_queue ;
    template<typename T> friend class micro_queue_pop_finalizer ;
protected:
    virtual ~concurrent_queue_page_allocator() {}
private:
    //! Allocate one page sized for the owning queue's items_per_page items.
    virtual concurrent_queue_rep_base::page* allocate_page() = 0;
    //! Return a page obtained from allocate_page.
    virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0;
} ;
00132
00133 #if _MSC_VER && !defined(__INTEL_COMPILER)
00134
00135 #pragma warning( push )
00136 #pragma warning( disable: 4146 )
00137 #endif
00138
00140
//! One of the n_queue sub-queues that together form the concurrent queue.
/** This class deliberately has no constructor: the owning
    concurrent_queue_base_v3 zero-initializes the whole representation with
    memset (see its constructor). */
template<typename T>
class micro_queue : no_copy {
    typedef concurrent_queue_rep_base::page page;

    //! Scope guard that runs the item's destructor on exit, even if an
    //! intervening operation throws.
    class destroyer: no_copy {
        T& my_value;
    public:
        destroyer( T& value ) : my_value(value) {}
        ~destroyer() {my_value.~T();}
    };

    //! Copy-construct *src into slot `index` of page dst.
    void copy_item( page& dst, size_t index, const void* src ) {
        new( &get_ref(dst,index) ) T(*static_cast<const T*>(src));
    }

    //! Copy-construct slot sindex of page src into slot dindex of page dst.
    void copy_item( page& dst, size_t dindex, const page& src, size_t sindex ) {
        new( &get_ref(dst,dindex) ) T( get_ref(const_cast<page&>(src),sindex) );
    }

    //! Copy-assign slot `index` of src into *dst, destroying the source slot
    //! afterwards (destroyer fires even if the assignment throws).
    void assign_and_destroy_item( void* dst, page& src, size_t index ) {
        T& from = get_ref(src,index);
        destroyer d(from);
        *static_cast<T*>(dst) = from;
    }

    //! Spin until counter==k; throws bad_last_alloc if the queue is poisoned
    //! (odd counter value) in the meantime.
    void spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const ;

public:
    friend class micro_queue_pop_finalizer<T>;

    //! Layout of a page: the page header immediately followed by the items.
    struct padded_page: page {
        //! Not defined anywhere - exists to quiet warnings.
        padded_page();
        //! Not defined anywhere - exists to quiet warnings.
        void operator=( const padded_page& );
        //! Must be last field (the item array starts here and extends past the struct).
        T last;
    };

    //! Reference to the item in slot `index` of page p.
    /** Items live at offset offsetof(padded_page,last); the cast goes through
        void* because page is not a padded_page. */
    static T& get_ref( page& p, size_t index ) {
        return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
    }

    atomic<page*> head_page;       // oldest page still holding unpopped items
    atomic<ticket> head_counter;   // next ticket to be popped from this micro-queue

    atomic<page*> tail_page;       // page being filled by producers
    atomic<ticket> tail_counter;   // next ticket to be pushed into this micro-queue

    //! Protects the head_page/tail_page links.
    spin_mutex page_mutex;

    void push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) ;

    bool pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) ;

    micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3<T>& base ) ;

    page* make_copy( concurrent_queue_base_v3<T>& base, const page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) ;

    void invalidate_page_and_rethrow( ticket k ) ;
};
00204
//! Spin until `counter` reaches ticket k.
/** invalidate_page_and_rethrow stores an odd value into tail_counter when a
    push fails; an odd counter therefore means the micro-queue was poisoned,
    and we throw bad_last_alloc instead of waiting forever. */
template<typename T>
void micro_queue<T>::spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const {
    atomic_backoff backoff;
    do {
        backoff.pause();
        if( counter&1 ) {
            // Queue was invalidated; account for the entry we will never get.
            ++rb.n_invalid_entries;
            throw_exception( eid_bad_last_alloc );
        }
    } while( counter!=k ) ;
}
00216
//! Push an item claimed under global ticket k onto this micro-queue.
/** Tickets of one micro-queue differ by multiples of n_queue, so k is first
    rounded down to this queue's ticket series. The slot's mask bit is set
    before tail_counter is advanced, which is what publishes the item to
    consumers; on a failed copy the ticket is still advanced (with the mask bit
    clear) so that later producers and consumers are not blocked. */
template<typename T>
void micro_queue<T>::push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) {
    k &= -concurrent_queue_rep_base::n_queue;  // negate-unsigned intended; warning 4146 disabled above
    page* p = NULL;
    // Slot within the current page; items_per_page is a power of two.
    size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
    if( !index ) {
        // First slot of a new page: allocate it before waiting for our turn.
        __TBB_TRY {
            concurrent_queue_page_allocator& pa = base;
            p = pa.allocate_page();
        } __TBB_CATCH (...) {
            // Count the never-to-be-filled entry and poison the queue so that
            // spinning producers/consumers throw instead of hanging.
            ++base.my_rep->n_invalid_entries;
            invalidate_page_and_rethrow( k );
        }
        p->mask = 0;
        p->next = NULL;
    }

    // Wait until it is ticket k's turn to append.
    if( tail_counter!=k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep );
    call_itt_notify(acquired, &tail_counter);

    if( p ) {
        // Link the freshly allocated page at the tail under the page mutex.
        spin_mutex::scoped_lock lock( page_mutex );
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = p;
        else
            head_page = p;
        tail_page = p;
    } else {
        p = tail_page;
    }
    __TBB_TRY {
        copy_item( *p, index, item );
        // Mark the slot valid, then hand the turn to the next ticket.
        itt_hide_store_word(p->mask, p->mask | uintptr_t(1)<<index);
        call_itt_notify(releasing, &tail_counter);
        tail_counter += concurrent_queue_rep_base::n_queue;
    } __TBB_CATCH (...) {
        // Copy construction failed: leave the mask bit clear, record the
        // invalid entry, but still advance the ticket before rethrowing.
        ++base.my_rep->n_invalid_entries;
        call_itt_notify(releasing, &tail_counter);
        tail_counter += concurrent_queue_rep_base::n_queue;
        __TBB_RETHROW();
    }
}
00261
//! Pop the item with global ticket k from this micro-queue into *dst.
/** Returns false if the slot was invalid (its producer threw); the caller
    then retries with the next ticket. */
template<typename T>
bool micro_queue<T>::pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) {
    k &= -concurrent_queue_rep_base::n_queue;  // round down to this queue's ticket series
    // Wait for our turn among consumers of this micro-queue ...
    if( head_counter!=k ) spin_wait_until_eq( head_counter, k );
    call_itt_notify(acquired, &head_counter);
    // ... and for the producer of ticket k to have advanced tail_counter.
    if( tail_counter==k ) spin_wait_while_eq( tail_counter, k );
    call_itt_notify(acquired, &tail_counter);
    page& p = *head_page;
    __TBB_ASSERT( &p, NULL );
    size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
    bool success = false;
    {
        // On scope exit the finalizer publishes the new head ticket and, when
        // this was the last slot of the page, retires the page.
        micro_queue_pop_finalizer<T> finalizer( *this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? &p : NULL );
        if( p.mask & uintptr_t(1)<<index ) {
            // Slot holds a committed item: move it out and destroy the source.
            success = true;
            assign_and_destroy_item( dst, p, index );
        } else {
            // Producer of this slot failed; consume one invalid-entry credit.
            --base.my_rep->n_invalid_entries;
        }
    }
    return success;
}
00284
//! Deep-copy micro-queue src into this micro-queue.
/** Called from concurrent_queue_base_v3<T>::assign; the source must not be
    concurrently modified. If copying throws, the partially built page list is
    poisoned via invalidate_page_and_rethrow. */
template<typename T>
micro_queue<T>& micro_queue<T>::assign( const micro_queue<T>& src, concurrent_queue_base_v3<T>& base ) {
    head_counter = src.head_counter;
    tail_counter = src.tail_counter;
    page_mutex = src.page_mutex;

    const page* srcp = src.head_page;
    if( is_valid_page(srcp) ) {
        ticket g_index = head_counter;
        __TBB_TRY {
            size_t n_items = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue;
            size_t index = head_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
            size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page;

            // Copy the (possibly partial) first page ...
            head_page = make_copy( base, srcp, index, end_in_first_page, g_index );
            page* cur_page = head_page;

            if( srcp != src.tail_page ) {
                // ... then every full middle page ...
                for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
                    cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index );
                    cur_page = cur_page->next;
                }

                // ... and finally the (possibly partial) last page.
                __TBB_ASSERT( srcp==src.tail_page, NULL );
                size_t last_index = tail_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
                if( last_index==0 ) last_index = base.my_rep->items_per_page;  // tail ends exactly on a page boundary

                cur_page->next = make_copy( base, srcp, 0, last_index, g_index );
                cur_page = cur_page->next;
            }
            tail_page = cur_page;
        } __TBB_CATCH (...) {
            invalidate_page_and_rethrow( g_index );
        }
    } else {
        // Source micro-queue has no pages: nothing to copy.
        head_page = tail_page = NULL;
    }
    return *this;
}
00324
//! Poison this micro-queue after a failed operation, then rethrow.
/** Appends the sentinel page (address 1) and stores an odd value into
    tail_counter (k is a multiple of n_queue, so k+n_queue+1 is odd);
    spin_wait_until_my_turn interprets an odd counter as "invalidated" and
    throws instead of spinning forever. */
template<typename T>
void micro_queue<T>::invalidate_page_and_rethrow( ticket k ) {
    // Append an invalid page at the tail of the queue.
    page* invalid_page = (page*)uintptr_t(1);
    {
        spin_mutex::scoped_lock lock( page_mutex );
        itt_store_word_with_release(tail_counter, k+concurrent_queue_rep_base::n_queue+1);
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = invalid_page;
        else
            head_page = invalid_page;
        tail_page = invalid_page;
    }
    __TBB_RETHROW();
}
00341
00342 template<typename T>
00343 concurrent_queue_rep_base::page* micro_queue<T>::make_copy( concurrent_queue_base_v3<T>& base, const concurrent_queue_rep_base::page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) {
00344 concurrent_queue_page_allocator& pa = base;
00345 page* new_page = pa.allocate_page();
00346 new_page->next = NULL;
00347 new_page->mask = src_page->mask;
00348 for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
00349 if( new_page->mask & uintptr_t(1)<<begin_in_page )
00350 copy_item( *new_page, begin_in_page, *src_page, begin_in_page );
00351 return new_page;
00352 }
00353
//! Scope guard that finishes a pop on exit.
/** Publishes the new head ticket and, when the popped slot was the last one
    on its page, retires the page. Doing this in a destructor makes the
    bookkeeping happen even if moving the item out throws. */
template<typename T>
class micro_queue_pop_finalizer: no_copy {
    typedef concurrent_queue_rep_base::page page;
    ticket my_ticket;                          // value to store into head_counter on exit
    micro_queue<T>& my_queue;
    page* my_page;                             // page to retire, or NULL if the page is not exhausted
    concurrent_queue_page_allocator& allocator;
public:
    micro_queue_pop_finalizer( micro_queue<T>& queue, concurrent_queue_base_v3<T>& b, ticket k, page* p ) :
        my_ticket(k), my_queue(queue), my_page(p), allocator(b)
    {}
    ~micro_queue_pop_finalizer() ;
};
00367
//! Complete the pop: unlink an exhausted page, publish the head ticket,
//! then deallocate the page outside the critical section.
template<typename T>
micro_queue_pop_finalizer<T>::~micro_queue_pop_finalizer() {
    page* p = my_page;
    if( is_valid_page(p) ) {
        // Unlink the exhausted page under the page mutex.
        spin_mutex::scoped_lock lock( my_queue.page_mutex );
        page* q = p->next;
        my_queue.head_page = q;
        if( !is_valid_page(q) ) {
            // That was the last page; the micro-queue is now empty.
            my_queue.tail_page = NULL;
        }
    }
    // Publish the new head ticket with a release store ...
    itt_store_word_with_release(my_queue.head_counter, my_ticket);
    // ... and only then free the page, off the critical path.
    if( is_valid_page(p) ) {
        allocator.deallocate_page( p );
    }
}
00384
00385 #if _MSC_VER && !defined(__INTEL_COMPILER)
00386 #pragma warning( pop )
00387 #endif // warning 4146 is back
00388
00389 template<typename T> class concurrent_queue_iterator_rep ;
00390 template<typename T> class concurrent_queue_iterator_base_v3;
00391
00393
00396 template<typename T>
00397 struct concurrent_queue_rep : public concurrent_queue_rep_base {
00398 micro_queue<T> array[n_queue];
00399
00401 static size_t index( ticket k ) {
00402 return k*phi%n_queue;
00403 }
00404
00405 micro_queue<T>& choose( ticket k ) {
00406
00407 return array[index(k)];
00408 }
00409 };
00410
00412
//! Base class of the strict_ppl concurrent_queue.
/** Owns the cache-aligned concurrent_queue_rep<T> and implements the page
    allocator interface on top of the block allocator supplied (as pure
    virtuals) by the derived class. */
template<typename T>
class concurrent_queue_base_v3: public concurrent_queue_page_allocator {
    //! Internal representation.
    concurrent_queue_rep<T>* my_rep;

    friend struct concurrent_queue_rep<T>;
    friend class micro_queue<T>;
    friend class concurrent_queue_iterator_rep<T>;
    friend class concurrent_queue_iterator_base_v3<T>;

protected:
    typedef typename concurrent_queue_rep<T>::page page;

private:
    typedef typename micro_queue<T>::padded_page padded_page;

    //! A page is a padded_page (which already contains one item) plus
    //! items_per_page-1 further items.
    virtual page *allocate_page() {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
        return reinterpret_cast<page*>(allocate_block ( n ));
    }

    virtual void deallocate_page( concurrent_queue_rep_base::page *p ) {
        concurrent_queue_rep<T>& r = *my_rep;
        // Size must match the computation in allocate_page.
        size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
        deallocate_block( reinterpret_cast<void*>(p), n );
    }

    //! Custom allocator (supplied by the derived class).
    virtual void *allocate_block( size_t n ) = 0;

    //! Custom de-allocator (supplied by the derived class).
    virtual void deallocate_block( void *p, size_t n ) = 0;

protected:
    concurrent_queue_base_v3();

    virtual ~concurrent_queue_base_v3() {
#if TBB_USE_ASSERT
        size_t nq = my_rep->n_queue;
        for( size_t i=0; i<nq; i++ )
            __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
#endif /* TBB_USE_ASSERT */
        cache_aligned_allocator<concurrent_queue_rep<T> >().deallocate(my_rep,1);
    }

    //! Enqueue item at tail of queue.
    void internal_push( const void* src ) {
        concurrent_queue_rep<T>& r = *my_rep;
        ticket k = r.tail_counter++;  // atomically claim a ticket, then publish into its micro-queue
        r.choose(k).push( src, k, *this );
    }

    //! Attempt to dequeue item from queue.
    /** Returns false if there was no item to dequeue. */
    bool internal_try_pop( void* dst ) ;

    //! Get size of queue; the result may be stale if the queue is modified concurrently.
    size_t internal_size() const ;

    //! Check if the queue is empty.
    bool internal_empty() const ;

    //! Free any remaining pages.
    /** Assumes that no more pushes or pops will occur. */
    void internal_finish_clear() ;

    //! Throw bad_alloc via the library's throw_exception helper.
    void internal_throw_exception() const {
        throw_exception( eid_bad_alloc );
    }

    //! Copy the internal representation of src into this queue.
    void assign( const concurrent_queue_base_v3& src ) ;
};
00491
//! Allocate and zero-initialize the cache-aligned representation.
template<typename T>
concurrent_queue_base_v3<T>::concurrent_queue_base_v3() {
    const size_t item_size = sizeof(T);
    my_rep = cache_aligned_allocator<concurrent_queue_rep<T> >().allocate(1);
    __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" );
    // micro_queue has no constructor by design; zeroing the whole rep is the
    // required initialization of all counters and page pointers.
    memset(my_rep,0,sizeof(concurrent_queue_rep<T>));
    my_rep->item_size = item_size;
    // Power-of-two page capacity, sized so a page holds roughly 256 bytes of items.
    my_rep->items_per_page = item_size<=8 ? 32 :
                             item_size<=16 ? 16 :
                             item_size<=32 ? 8 :
                             item_size<=64 ? 4 :
                             item_size<=128 ? 2 :
                             1;
}
00509
//! Attempt to dequeue an item; returns false if the queue appeared empty.
/** Head tickets are claimed with compare_and_swap. If the claimed slot turns
    out to be invalid (its producer threw), the outer loop retries with a
    fresh ticket. */
template<typename T>
bool concurrent_queue_base_v3<T>::internal_try_pop( void* dst ) {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket k;
    do {
        k = r.head_counter;
        for(;;) {
            if( r.tail_counter<=k ) {
                // Queue was empty at this instant; nothing to pop.
                return false;
            }
            // Queue had an item with ticket k when we looked; try to claim it.
            ticket tk=k;
#if defined(_MSC_VER) && defined(_Wp64)
#pragma warning (push)
#pragma warning (disable: 4267)
#endif
            k = r.head_counter.compare_and_swap( tk+1, tk );
#if defined(_MSC_VER) && defined(_Wp64)
#pragma warning (pop)
#endif
            if( k==tk )
                break;  // we claimed ticket tk
            // Another thread claimed it first; k now holds the fresher head_counter.
        }
    } while( !r.choose( k ).pop( dst, k, *this ) );
    return true;
}
00538
//! Number of items in the queue; may be stale under concurrent modification.
template<typename T>
size_t concurrent_queue_base_v3<T>::internal_size() const {
    concurrent_queue_rep<T>& r = *my_rep;
    __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
    ticket hc = r.head_counter;
    size_t nie = r.n_invalid_entries;
    ticket tc = r.tail_counter;
    __TBB_ASSERT( hc!=tc || !nie, NULL );
    ptrdiff_t sz = tc-hc-nie;       // pushed minus popped minus invalidated
    return sz<0 ? 0 : size_t(sz);   // clamp transient negative snapshots to 0
}
00550
//! True iff the queue was observed empty between the two tail_counter reads.
template<typename T>
bool concurrent_queue_base_v3<T>::internal_empty() const {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket tc = r.tail_counter;
    ticket hc = r.head_counter;
    // If tc!=r.tail_counter here, the queue was certainly non-empty at some
    // point between the two reads, so report non-empty.
    return tc==r.tail_counter && tc==hc+r.n_invalid_entries ;
}
00559
//! Free any remaining pages.
/** Assumes that no more pushes or pops will occur; by this point each
    micro-queue holds at most one page. */
template<typename T>
void concurrent_queue_base_v3<T>::internal_finish_clear() {
    concurrent_queue_rep<T>& r = *my_rep;
    size_t nq = r.n_queue;
    for( size_t i=0; i<nq; ++i ) {
        page* tp = r.array[i].tail_page;
        if( is_valid_page(tp) ) {
            __TBB_ASSERT( r.array[i].head_page==tp, "at most one page should remain" );
            deallocate_page( tp );
            r.array[i].tail_page = NULL;
        } else
            __TBB_ASSERT( !is_valid_page(r.array[i].head_page), "head page pointer corrupt?" );
    }
}
00574
//! Copy src's representation into this queue.
/** The source must not be concurrently modified while the copy runs
    (verified by the trailing assertion). */
template<typename T>
void concurrent_queue_base_v3<T>::assign( const concurrent_queue_base_v3& src ) {
    concurrent_queue_rep<T>& r = *my_rep;
    r.items_per_page = src.my_rep->items_per_page;

    // Copy the scalar counters of the representation ...
    r.head_counter = src.my_rep->head_counter;
    r.tail_counter = src.my_rep->tail_counter;
    r.n_invalid_entries = src.my_rep->n_invalid_entries;

    // ... then deep-copy each micro-queue's pages and items.
    for( size_t i = 0; i<r.n_queue; ++i )
        r.array[i].assign( src.my_rep->array[i], *this);

    __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter,
        "the source concurrent queue should not be concurrently modified." );
}
00592
00593 template<typename Container, typename Value> class concurrent_queue_iterator;
00594
//! Snapshot of a queue used by iterators.
/** Captures the head ticket and the head page of every micro-queue at
    construction time; iteration walks this snapshot. */
template<typename T>
class concurrent_queue_iterator_rep: no_assign {
    typedef typename micro_queue<T>::padded_page padded_page;
public:
    //! Ticket of the first item to visit.
    ticket head_counter;
    const concurrent_queue_base_v3<T>& my_queue;
    //! Current page of each micro-queue; advanced as iteration crosses page boundaries.
    typename concurrent_queue_base_v3<T>::page* array[concurrent_queue_rep<T>::n_queue];
    concurrent_queue_iterator_rep( const concurrent_queue_base_v3<T>& queue ) :
        head_counter(queue.my_rep->head_counter),
        my_queue(queue)
    {
        for( size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
            array[k] = queue.my_rep->array[k].head_page;
    }

    //! Set item to point to the kth element.
    /** Returns true if k is the end-of-queue ticket or the slot is valid. */
    bool get_item( T*& item, size_t k ) ;
};
00613
00614 template<typename T>
00615 bool concurrent_queue_iterator_rep<T>::get_item( T*& item, size_t k ) {
00616 if( k==my_queue.my_rep->tail_counter ) {
00617 item = NULL;
00618 return true;
00619 } else {
00620 typename concurrent_queue_base_v3<T>::page* p = array[concurrent_queue_rep<T>::index(k)];
00621 __TBB_ASSERT(p,NULL);
00622 size_t i = k/concurrent_queue_rep<T>::n_queue & (my_queue.my_rep->items_per_page-1);
00623 item = µ_queue<T>::get_ref(*p,i);
00624 return (p->mask & uintptr_t(1)<<i)!=0;
00625 }
00626 }
00627
00629
//! Type-independent portion of concurrent_queue_iterator.
template<typename Value>
class concurrent_queue_iterator_base_v3 : no_assign {
    //! Snapshot of the concurrent_queue over which we are iterating.
    /** NULL for a default-constructed (end) iterator. */
    concurrent_queue_iterator_rep<Value>* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
protected:
    //! Pointer to the current item; NULL when one past the last element.
    Value* my_item;

    //! Default constructor.
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {
#if __TBB_GCC_OPTIMIZER_ORDERING_BROKEN
        // Compiler barrier: prevents the broken optimizer from reordering
        // around the NULL stores above.
        __asm__ __volatile__("": : :"memory");
#endif
    }

    //! Copy constructor.
    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i )
    : no_assign(), my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    //! Construct iterator pointing to the head of queue.
    concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) ;

    //! Assignment.
    void assign( const concurrent_queue_iterator_base_v3<Value>& other ) ;

    //! Advance iterator one step towards the tail of the queue.
    void advance() ;

    //! Destructor.
    ~concurrent_queue_iterator_base_v3() {
        // NOTE(review): my_rep is NULL for default-constructed iterators;
        // assumes cache_aligned_allocator::deallocate tolerates NULL -- confirm.
        cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
        my_rep = NULL;
    }
};
00673
//! Construct an iterator positioned at the queue's head.
template<typename Value>
concurrent_queue_iterator_base_v3<Value>::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) {
    my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
    new( my_rep ) concurrent_queue_iterator_rep<Value>(queue);
    size_t k = my_rep->head_counter;
    // Skip past invalidated slots so the iterator starts on a valid item (or end).
    if( !my_rep->get_item(my_item, k) ) advance();
}
00681
00682 template<typename Value>
00683 void concurrent_queue_iterator_base_v3<Value>::assign( const concurrent_queue_iterator_base_v3<Value>& other ) {
00684 if( my_rep!=other.my_rep ) {
00685 if( my_rep ) {
00686 cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
00687 my_rep = NULL;
00688 }
00689 if( other.my_rep ) {
00690 my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
00691 new( my_rep ) concurrent_queue_iterator_rep<Value>( *other.my_rep );
00692 }
00693 }
00694 my_item = other.my_item;
00695 }
00696
//! Advance the iterator to the next valid item (or to end of queue).
template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::advance() {
    __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
    size_t k = my_rep->head_counter;
    const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue;
#if TBB_USE_ASSERT
    Value* tmp;
    my_rep->get_item(tmp,k);
    __TBB_ASSERT( my_item==tmp, NULL );
#endif
    size_t i = k/concurrent_queue_rep<Value>::n_queue & (queue.my_rep->items_per_page-1);
    if( i==queue.my_rep->items_per_page-1 ) {
        // Last slot of this micro-queue's page: step the snapshot to the next page.
        typename concurrent_queue_base_v3<Value>::page*& root = my_rep->array[concurrent_queue_rep<Value>::index(k)];
        root = root->next;
    }
    // Advance the ticket, then recurse past any invalidated slots.
    my_rep->head_counter = ++k;
    if( !my_rep->get_item(my_item, k) ) advance();
}
00716
00718
//! Strip top-level const/volatile qualifiers from a type.
/** Pre-C++11 equivalent of std::remove_cv, implemented with partial specializations. */
template<typename T> struct tbb_remove_cv {
    typedef T type;
};
template<typename T> struct tbb_remove_cv<const T> {
    typedef T type;
};
template<typename T> struct tbb_remove_cv<volatile T> {
    typedef T type;
};
template<typename T> struct tbb_remove_cv<const volatile T> {
    typedef T type;
};
00723
00725
//! Meets the requirements of an STL forward iterator.
/** Value is either T or const T, where T is the container's value type. */
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>,
        public std::iterator<std::forward_iterator_tag,Value> {
#if !__TBB_TEMPLATE_FRIENDS_BROKEN
    template<typename T, class A>
    friend class ::tbb::strict_ppl::concurrent_queue;
#else
public:
#endif
    //! Construct iterator pointing to the head of the queue.
    concurrent_queue_iterator( const concurrent_queue_base_v3<Value>& queue ) :
        concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(queue)
    {
    }

public:
    concurrent_queue_iterator() {}

    //! Copy construction, or conversion from a mutable iterator when Value is const.
    concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
        concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(other)
    {}

    //! Iterator assignment.
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
        this->assign(other);
        return *this;
    }

    //! Reference to the current item.
    Value& operator*() const {
        return *static_cast<Value*>(this->my_item);
    }

    Value* operator->() const {return &operator*();}

    //! Advance to the next item in the queue.
    concurrent_queue_iterator& operator++() {
        this->advance();
        return *this;
    }

    //! Post increment => return address of the item the iterator pointed to
    //! before advancing.
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
};
00775
00776
00777 template<typename C, typename T, typename U>
00778 bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
00779 return i.my_item==j.my_item;
00780 }
00781
00782 template<typename C, typename T, typename U>
00783 bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
00784 return i.my_item!=j.my_item;
00785 }
00786
00787 }
00788
00790
00791 }
00792
00794 namespace internal {
00795
00796 class concurrent_queue_rep;
00797 class concurrent_queue_iterator_rep;
00798 class concurrent_queue_iterator_base_v3;
00799 template<typename Container, typename Value> class concurrent_queue_iterator;
00800
00802
//! For internal use only.
/** Type-independent portion of the old-ABI concurrent queue (used by
    concurrent_bounded_queue and deprecated::concurrent_queue). The
    __TBB_EXPORTED_METHOD members are implemented out of line in the TBB
    library binary. */
class concurrent_queue_base_v3: no_copy {
    //! Internal representation.
    concurrent_queue_rep* my_rep;

    friend class concurrent_queue_rep;
    friend struct micro_queue;
    friend class micro_queue_pop_finalizer;
    friend class concurrent_queue_iterator_rep;
    friend class concurrent_queue_iterator_base_v3;
protected:
    //! Prefix on a page of items.
    struct page {
        page* next;
        uintptr_t mask;   // bit i set <=> slot i of this page holds an item
    };

    //! Capacity of the queue.
    ptrdiff_t my_capacity;

    //! Number of items per page.
    size_t items_per_page;

    //! Size of an item in bytes.
    size_t item_size;

#if __TBB_GCC_3_3_PROTECTED_BROKEN
public:
#endif
    //! Page layout: header followed by the item array starting at `last`.
    template<typename T>
    struct padded_page: page {
        //! Not defined anywhere - exists to quiet warnings.
        padded_page();
        //! Not defined anywhere - exists to quiet warnings.
        void operator=( const padded_page& );
        //! Must be last field.
        T last;
    };

private:
    //! Type-aware copy of an item into a page slot; supplied by the derived class.
    virtual void copy_item( page& dst, size_t index, const void* src ) = 0;
    //! Type-aware move-out-and-destroy of a page slot; supplied by the derived class.
    virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0;
protected:
    __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size );
    virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3();

    //! Enqueue item at tail of queue.
    void __TBB_EXPORTED_METHOD internal_push( const void* src );

    //! Dequeue item from head of queue.
    void __TBB_EXPORTED_METHOD internal_pop( void* dst );

    //! Attempt to enqueue item at tail of queue.
    bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src );

    //! Attempt to dequeue item from queue.
    /** Returns false if there was no item to dequeue. */
    bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst );

    //! Get size of queue.
    ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const;

    //! Check if the queue is empty.
    bool __TBB_EXPORTED_METHOD internal_empty() const;

    //! Set the queue capacity.
    void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size );

    //! Custom allocator (supplied by the derived class).
    virtual page *allocate_page() = 0;

    //! Custom de-allocator (supplied by the derived class).
    virtual void deallocate_page( page *p ) = 0;

    //! Free any remaining pages.
    /** Assumes that no more pushes or pops will occur. */
    void __TBB_EXPORTED_METHOD internal_finish_clear() ;

    //! Throw an exception.
    void __TBB_EXPORTED_METHOD internal_throw_exception() const;

    //! Copy internal representation.
    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src ) ;

private:
    //! Type-aware page-to-page copy of one slot; supplied by the derived class.
    virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
};
00890
00892
//! Type-independent portion of the old-ABI concurrent_queue_iterator.
class concurrent_queue_iterator_base_v3 {
    //! Snapshot of the concurrent_queue over which we are iterating.
    /** NULL for a default-constructed (end) iterator. */
    concurrent_queue_iterator_rep* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    //! Shared construction helper; implemented in the TBB library binary.
    void initialize( const concurrent_queue_base_v3& queue, size_t offset_of_data );
protected:
    //! Pointer to the current item (type-erased).
    void* my_item;

    //! Default constructor.
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {}

    //! Copy constructor.
    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    //! Construct iterator pointing to the head of the queue.
    /** Legacy overload without an item-data offset — presumably retained for
        binary compatibility with older clients; verify against the library. */
    __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue );

    //! Construct iterator pointing to the head of the queue, passing the byte
    //! offset of the item array within a padded_page.
    __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue, size_t offset_of_data );

    //! Assignment.
    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i );

    //! Advance iterator one step towards the tail of the queue.
    void __TBB_EXPORTED_METHOD advance();

    //! Destructor.
    __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3();
};
00933
00934 typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base;
00935
00937
//! Meets the requirements of an STL forward iterator.
/** Value is either T or const T, where T is the container's value type. */
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base,
        public std::iterator<std::forward_iterator_tag,Value> {

#if !defined(_MSC_VER) || defined(__INTEL_COMPILER)
    template<typename T, class A>
    friend class ::tbb::concurrent_bounded_queue;

    template<typename T, class A>
    friend class ::tbb::deprecated::concurrent_queue;
#else
public: // MSVC workaround: template friends are broken, so expose the constructor
#endif
    //! Construct iterator pointing to the head of the queue.
    /** Passes the offset of the item array within a padded_page so that the
        non-templated base can address items of type Value. */
    concurrent_queue_iterator( const concurrent_queue_base_v3& queue ) :
        concurrent_queue_iterator_base_v3(queue,__TBB_offsetof(concurrent_queue_base_v3::padded_page<Value>,last))
    {
    }

public:
    concurrent_queue_iterator() {}

    //! Copy construction, or conversion from a mutable iterator when Value is const.
    concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
        concurrent_queue_iterator_base_v3(other)
    {}

    //! Iterator assignment.
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
        assign(other);
        return *this;
    }

    //! Reference to the current item.
    Value& operator*() const {
        return *static_cast<Value*>(my_item);
    }

    Value* operator->() const {return &operator*();}

    //! Advance to the next item in the queue.
    concurrent_queue_iterator& operator++() {
        advance();
        return *this;
    }

    //! Post increment => return address of the item the iterator pointed to
    //! before advancing.
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
};
00993
00994
00995 template<typename C, typename T, typename U>
00996 bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
00997 return i.my_item==j.my_item;
00998 }
00999
01000 template<typename C, typename T, typename U>
01001 bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
01002 return i.my_item!=j.my_item;
01003 }
01004
01005 }
01006
01008
01009 }
01010
01011 #endif