concurrent_priority_queue.h

/*
    Copyright 2005-2011 Intel Corporation.  All Rights Reserved.

    The source code contained or described herein and all documents related
    to the source code ("Material") are owned by Intel Corporation or its
    suppliers or licensors.  Title to the Material remains with Intel
    Corporation or its suppliers and licensors.  The Material is protected
    by worldwide copyright laws and treaty provisions.  No part of the
    Material may be used, copied, reproduced, modified, published, uploaded,
    posted, transmitted, distributed, or disclosed in any way without
    Intel's prior express written permission.

    No license under any patent, copyright, trade secret or other
    intellectual property right is granted to or conferred upon you by
    disclosure or delivery of the Materials, either expressly, by
    implication, inducement, estoppel or otherwise.  Any license under such
    intellectual property rights must be express and approved by Intel in
    writing.
*/

#ifndef __TBB_concurrent_priority_queue_H
#define __TBB_concurrent_priority_queue_H

#if !TBB_PREVIEW_CONCURRENT_PRIORITY_QUEUE
#error Set TBB_PREVIEW_CONCURRENT_PRIORITY_QUEUE to include concurrent_priority_queue.h
#endif

#include "atomic.h"
#include "cache_aligned_allocator.h"
#include "tbb_exception.h"
#include "tbb_stddef.h"
#include "tbb_profiling.h"
#include "_aggregator_internal.h"
#include <vector>
#include <iterator>
#include <functional>

namespace tbb {
namespace interface5 {

using namespace tbb::internal;

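//! Concurrent priority queue.
/** Stores elements of type T ordered by Compare (std::less<T> by default, so
    try_pop returns the largest remaining element first). Concurrent calls to
    push and try_pop are funneled through an aggregator, so a single thread
    applies a whole batch of pending operations to the underlying heap at a time. */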
template <typename T, typename Compare=std::less<T>, typename A=cache_aligned_allocator<T> >
class concurrent_priority_queue {
 public:
    typedef T value_type;

    typedef T& reference;

    typedef const T& const_reference;

    typedef size_t size_type;

    typedef ptrdiff_t difference_type;

    typedef A allocator_type;

    //! Constructs a new concurrent_priority_queue with default capacity.
    explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), data(a) {
        my_aggregator.initialize_handler(my_functor_t(this));
    }

    //! Constructs a new concurrent_priority_queue with init_capacity elements of storage reserved.
    explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) : mark(0), data(a) {
        data.reserve(init_capacity);
        my_aggregator.initialize_handler(my_functor_t(this));
    }

    //! Constructs a new concurrent_priority_queue containing copies of the elements in [begin, end).
    template<typename InputIterator>
    concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) : data(begin, end, a)
    {
        mark = data.size();
        my_aggregator.initialize_handler(my_functor_t(this));
        heapify();
    }

    //! Copy constructor.
    explicit concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark), data(src.data.begin(), src.data.end(), src.data.get_allocator())
    {
        my_aggregator.initialize_handler(my_functor_t(this));
        heapify();
    }

    //! Copy constructor with a specified allocator.
    concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark), data(src.data.begin(), src.data.end(), a)
    {
        my_aggregator.initialize_handler(my_functor_t(this));
        heapify();
    }

    //! Copy assignment. Not safe to call concurrently with other operations on either queue.
    concurrent_priority_queue& operator=(const concurrent_priority_queue& src) {
        if (this != &src) {
            std::vector<value_type, allocator_type>(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
            mark = src.mark;
        }
        return *this;
    }

    //! Returns true if the queue contains no elements.
    /** The result is a snapshot and may be stale under concurrent modification. */
    bool empty() const { return data.empty(); }

    //! Returns the current number of elements in the queue.
    /** The result is a snapshot and may be stale under concurrent modification. */
    size_type size() const { return data.size(); }

    //! Returns the current capacity of the underlying storage.
    size_type capacity() const { return data.capacity(); }

    //! Pushes a copy of elem onto the queue; throws bad_alloc if the element cannot be appended.
    void push(const_reference elem) {
        cpq_operation op_data(elem, PUSH_OP);
        my_aggregator.execute(&op_data);
        if (op_data.status == FAILED) // exception thrown
            throw_exception(eid_bad_alloc);
    }

    //! Attempts to pop the highest-priority element into elem.
    /** Returns true and assigns the popped element to elem on success; returns
        false, leaving elem unchanged, if the queue was empty. */
    bool try_pop(reference elem) {
        cpq_operation op_data(POP_OP);
        op_data.elem = &elem;
        my_aggregator.execute(&op_data);
        return op_data.status==SUCCEEDED;
    }

    //! Reserves storage for at least new_cap elements; throws bad_alloc on allocation failure.
    void reserve(size_type new_cap) {
        cpq_operation op_data(RESERVE_OP);
        op_data.sz = new_cap;
        my_aggregator.execute(&op_data);
        if (op_data.status == FAILED) // exception thrown
            throw_exception(eid_bad_alloc);
    }

    //! Removes all elements. Not safe to call concurrently with other operations.
    void clear() {
        data.clear();
        mark = 0;
    }

    //! Shrinks the underlying storage to fit the current contents. Not safe to call concurrently with other operations.
    void shrink_to_fit() {
        std::vector<value_type, allocator_type>(data.begin(), data.end(), data.get_allocator()).swap(data);
    }

    //! Swaps contents with q. Not safe to call concurrently with other operations on either queue.
    void swap(concurrent_priority_queue& q) {
        data.swap(q.data);
        std::swap(mark, q.mark);
    }

    //! Returns a copy of the allocator used to construct the underlying storage.
    allocator_type get_allocator() const { return data.get_allocator(); }

 private:
    enum operation_type {INVALID_OP, PUSH_OP, POP_OP, RESERVE_OP};
    enum operation_status { WAIT=0, SUCCEEDED, FAILED };

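    //! Operation record passed through the aggregator.
    /** One record is created per push/try_pop/reserve call; the handling thread
        signals completion back to the caller by releasing the status field. */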
    class cpq_operation : public aggregated_operation<cpq_operation> {
     public:
        operation_type type;
        union {
            value_type *elem;
            size_type sz;
        };
        cpq_operation(const_reference e, operation_type t) :
            type(t), elem(const_cast<value_type*>(&e)) {}
        cpq_operation(operation_type t) : type(t) {}
    };

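    //! Handler functor given to the aggregator; forwards each batch of queued operations to handle_operations.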
    class my_functor_t {
        concurrent_priority_queue<T, Compare, A> *cpq;
     public:
        my_functor_t() {}
        my_functor_t(concurrent_priority_queue<T, Compare, A> *cpq_) : cpq(cpq_) {}
        void operator()(cpq_operation* op_list) {
            cpq->handle_operations(op_list);
        }
    };

    aggregator< my_functor_t, cpq_operation> my_aggregator;
    //! Padding to avoid false sharing between the aggregator and the fields below.
    char padding1[NFS_MaxLineSize - sizeof(aggregator< my_functor_t, cpq_operation >)];
    //! Index of the first unheapified element in data; [0, mark) is maintained as a binary heap.
    size_type mark;
    Compare compare;
    //! Padding to avoid false sharing between mark/compare and the storage vector.
    char padding2[NFS_MaxLineSize - sizeof(size_type) - sizeof(Compare)];

    //! Storage for the queue.
    /** Elements in [0, mark) form a binary heap ordered by compare; elements in
        [mark, data.size()) are recently pushed items not yet merged into the heap. */
    std::vector<value_type, allocator_type> data;

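    //! Aggregator handler: applies a batch of queued operations to the heap.
    /** Runs on whichever thread currently owns the aggregator. The first pass
        handles pushes, reserves, and any pops that can be satisfied from the
        unheapified tail in constant time; remaining pops are deferred to a
        second pass that extracts from the heap proper. */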
    void handle_operations(cpq_operation *op_list) {
        cpq_operation *tmp, *pop_list=NULL;

        __TBB_ASSERT(mark == data.size(), NULL);

        // first pass processes all constant time operations: pushes,
        // tops, some pops. Also reserve.
        while (op_list) {
            // ITT note: &(op_list->status) tag is used to cover accesses to op_list
            // node. This thread is going to handle the operation, and so will acquire it
            // and perform the associated operation w/o triggering a race condition; the
            // thread that created the operation is waiting on the status field, so when
            // this thread is done with the operation, it will perform a
            // store_with_release to give control back to the waiting thread in
            // aggregator::insert_operation.
            call_itt_notify(acquired, &(op_list->status));
            __TBB_ASSERT(op_list->type != INVALID_OP, NULL);
            tmp = op_list;
            op_list = itt_hide_load_word(op_list->next);
            if (tmp->type == PUSH_OP) {
                __TBB_TRY {
                    data.push_back(*(tmp->elem));
                    itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
                } __TBB_CATCH(...) {
                    itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
                }
            }
            else if (tmp->type == POP_OP) {
                if (mark < data.size() &&
                    compare(data[0], data[data.size()-1])) {
                    // there are newly pushed elems and the last one
                    // is higher than top
                    *(tmp->elem) = data[data.size()-1]; // copy the data
                    itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
                    data.pop_back();
                    __TBB_ASSERT(mark<=data.size(), NULL);
                }
                else { // no convenient item to pop; postpone
                    itt_hide_store_word(tmp->next, pop_list);
                    pop_list = tmp;
                }
            }
            else {
                __TBB_ASSERT(tmp->type == RESERVE_OP, NULL);
                __TBB_TRY {
                    data.reserve(tmp->sz);
                    itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
                } __TBB_CATCH(...) {
                    itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
                }
            }
        }

        // second pass processes pop operations
        while (pop_list) {
            tmp = pop_list;
            pop_list = itt_hide_load_word(pop_list->next);
            __TBB_ASSERT(tmp->type == POP_OP, NULL);
            if (data.empty()) {
                itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
            }
            else {
                __TBB_ASSERT(mark<=data.size(), NULL);
                if (mark < data.size() &&
                    compare(data[0], data[data.size()-1])) {
                    // there are newly pushed elems and the last one is
                    // higher than top
                    *(tmp->elem) = data[data.size()-1]; // copy the data
                    itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
                    data.pop_back();
                }
                else { // extract top and push last element down heap
                    *(tmp->elem) = data[0]; // copy the data
                    itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
                    reheap();
                }
            }
        }

        // heapify any leftover pushed elements before doing the next
        // batch of operations
        if (mark<data.size()) heapify();
        __TBB_ASSERT(mark == data.size(), NULL);
    }

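    //! Merges unheapified elements into the heap.
    /** Sifts each element in [mark, data.size()) up toward the root until the
        heap property is restored, then advances mark to data.size(). */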
    void heapify() {
        // Guard against an empty queue: without the size check, mark could be
        // advanced past data.size() (e.g. when copy-constructing an empty queue),
        // violating the mark == data.size() invariant asserted in handle_operations.
        if (!mark && data.size()>0) mark = 1;
        for (; mark<data.size(); ++mark) {
            // for each unheapified element under size
            size_type cur_pos = mark;
            value_type to_place = data[mark];
            do { // push to_place up the heap
                size_type parent = (cur_pos-1)>>1;
                if (!compare(data[parent], to_place)) break;
                data[cur_pos] = data[parent];
                cur_pos = parent;
            } while( cur_pos );
            data[cur_pos] = to_place;
        }
    }

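    //! Re-establishes the heap after the top element has been copied out.
    /** Moves the last element in data into the hole left at the root, sifting it
        down to its proper position, then removes the vacated last slot. */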
    void reheap() {
        size_type cur_pos=0, child=1;

        while (child < mark) {
            size_type target = child;
            if (child+1 < mark && compare(data[child], data[child+1]))
                ++target;
            // target now has the higher priority child
            if (compare(data[target], data[data.size()-1])) break;
            data[cur_pos] = data[target];
            cur_pos = target;
            child = (cur_pos<<1)+1;
        }
        data[cur_pos] = data[data.size()-1];
        data.pop_back();
        if (mark > data.size()) mark = data.size();
    }
};

} // namespace interface5

using interface5::concurrent_priority_queue;

} // namespace tbb

#endif /* __TBB_concurrent_priority_queue_H */
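
A minimal usage sketch, assuming TBB is installed and the header is reachable as
"tbb/concurrent_priority_queue.h" (the include path and program structure are
illustrative, not taken from the header itself). The preview macro must be defined
before inclusion, as enforced by the #error directive above. push and try_pop are
routed through the aggregator, so they may be called concurrently from multiple
threads; with the default std::less<T> comparator, try_pop returns the largest
remaining element first.

    #define TBB_PREVIEW_CONCURRENT_PRIORITY_QUEUE 1
    #include "tbb/concurrent_priority_queue.h"
    #include <cstdio>

    int main() {
        // Max-heap of ints under the default std::less<int> comparator.
        tbb::concurrent_priority_queue<int> q;
        q.reserve(8);              // optional: pre-allocate storage
        q.push(3);
        q.push(7);
        q.push(5);

        int value;
        while (q.try_pop(value))   // pops 7, then 5, then 3
            std::printf("%d\n", value);
        return 0;
    }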
