• Main Page
  • Related Pages
  • Modules
  • Namespaces
  • Classes
  • Files
  • Examples
  • File List

priority_queue.h

00001 /***************************************************************************
00002  *  include/stxxl/bits/containers/priority_queue.h
00003  *
00004  *  Part of the STXXL. See http://stxxl.sourceforge.net
00005  *
00006  *  Copyright (C) 1999 Peter Sanders <sanders@mpi-sb.mpg.de>
00007  *  Copyright (C) 2003, 2004, 2007 Roman Dementiev <dementiev@mpi-sb.mpg.de>
00008  *  Copyright (C) 2007, 2009 Johannes Singler <singler@ira.uka.de>
00009  *  Copyright (C) 2007-2009 Andreas Beckmann <beckmann@cs.uni-frankfurt.de>
00010  *
00011  *  Distributed under the Boost Software License, Version 1.0.
00012  *  (See accompanying file LICENSE_1_0.txt or copy at
00013  *  http://www.boost.org/LICENSE_1_0.txt)
00014  **************************************************************************/
00015 
00016 #ifndef STXXL_PRIORITY_QUEUE_HEADER
00017 #define STXXL_PRIORITY_QUEUE_HEADER
00018 
00019 #include <vector>
00020 
00021 #include <stxxl/bits/mng/typed_block.h>
00022 #include <stxxl/bits/mng/block_alloc.h>
00023 #include <stxxl/bits/mng/read_write_pool.h>
00024 #include <stxxl/bits/mng/prefetch_pool.h>
00025 #include <stxxl/bits/mng/write_pool.h>
00026 #include <stxxl/bits/common/tmeta.h>
00027 #include <stxxl/bits/algo/sort_base.h>
00028 #include <stxxl/bits/parallel.h>
00029 #include <stxxl/bits/common/is_sorted.h>
00030 
// Older libstdc++ parallel mode (_GLIBCXX_PARALLEL with GCC < 4.4): force all
// three parallel multiway-merge paths of the priority queue off, overriding any
// value the user may have defined before including this header.
00031 #if defined(_GLIBCXX_PARALLEL) && ((__GNUC__ * 10000 + __GNUC_MINOR__ * 100) < 40400)
00032 #undef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL
00033 #undef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
00034 #undef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
00035 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL 0
00036 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL 0
00037 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 0
00038 #endif
00039
// Defaults: each parallel multiway-merge path (internal mergers, external
// mergers, delete-buffer refill) is enabled unless configured otherwise.
00040 #ifndef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL
00041 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL 1
00042 #endif
00043 #ifndef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
00044 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL 1
00045 #endif
00046 #ifndef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
00047 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 1
00048 #endif
00049
// A loser tree is used for the external (resp. internal) sequences only when
// the parallel multiway merge for that stage is not in effect.
00050 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
00051 #define STXXL_PQ_EXTERNAL_LOSER_TREE 0 // no loser tree for the external sequences
00052 #else
00053 #define STXXL_PQ_EXTERNAL_LOSER_TREE 1
00054 #endif
00055
00056 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL
00057 #define STXXL_PQ_INTERNAL_LOSER_TREE 0 // no loser tree for the internal sequences
00058 #else
00059 #define STXXL_PQ_INTERNAL_LOSER_TREE 1
00060 #endif
00061 
00062 #include <stxxl/bits/containers/pq_helpers.h>
00063 #include <stxxl/bits/containers/pq_mergers.h>
00064 #include <stxxl/bits/containers/pq_ext_merger.h>
00065 #include <stxxl/bits/containers/pq_losertree.h>
00066 
00067 __STXXL_BEGIN_NAMESPACE
00068 
00069 /*
00070    KNBufferSize1 = 32;
00071    KNN = 512; // length of group 1 sequences
00072    KNKMAX = 64;  // maximal arity
00073    LogKNKMAX = 6;  // ceil(log KNKMAX)
00074    KNLevels = 4; // overall capacity >= KNN*KNKMAX^KNLevels
00075  */
00076 
00077 // internal memory consumption >= N_*(KMAX_^IntLevels_) + ext
00078 
00079 template <
00080     class Tp_,
00081     class Cmp_,
00082     unsigned BufferSize1_ = 32,                    // equalize procedure call overheads etc.
00083     unsigned N_ = 512,                             // length of group 1 sequences
00084     unsigned IntKMAX_ = 64,                        // maximal arity for internal mergers
00085     unsigned IntLevels_ = 4,                       // number of internal groups
00086     unsigned BlockSize_ = (2 * 1024 * 1024),       // external block size
00087     unsigned ExtKMAX_ = 64,                        // maximal arity for external mergers
00088     unsigned ExtLevels_ = 2,                       // number of external groups
00089     class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY
00090     >
00091 struct priority_queue_config
00092 {
00093     typedef Tp_ value_type;
00094     typedef Cmp_ comparator_type;
00095     typedef AllocStr_ alloc_strategy_type;
00096     enum
00097     {
00098         delete_buffer_size = BufferSize1_,
00099         N = N_,
00100         IntKMAX = IntKMAX_,
00101         num_int_groups = IntLevels_,
00102         num_ext_groups = ExtLevels_,
00103         BlockSize = BlockSize_,
00104         ExtKMAX = ExtKMAX_,
00105         element_size = sizeof(Tp_)
00106     };
00107 };
00108 
00109 __STXXL_END_NAMESPACE
00110 
00111 namespace std
00112 {
00113     template <class BlockType_,
00114               class Cmp_,
00115               unsigned Arity_,
00116               class AllocStr_>
00117     void swap(stxxl::priority_queue_local::ext_merger<BlockType_, Cmp_, Arity_, AllocStr_> & a,
00118               stxxl::priority_queue_local::ext_merger<BlockType_, Cmp_, Arity_, AllocStr_> & b)
00119     {
00120         a.swap(b);
00121     }
00122     template <class ValTp_, class Cmp_, unsigned KNKMAX>
00123     void swap(stxxl::priority_queue_local::loser_tree<ValTp_, Cmp_, KNKMAX> & a,
00124               stxxl::priority_queue_local::loser_tree<ValTp_, Cmp_, KNKMAX> & b)
00125     {
00126         a.swap(b);
00127     }
00128 }
00129 
00130 __STXXL_BEGIN_NAMESPACE
00131 
00133 template <class Config_>
00134 class priority_queue : private noncopyable
00135 {
00136 public:
00137     typedef Config_ Config;
00138     enum
00139     {
00140         delete_buffer_size = Config::delete_buffer_size,
00141         N = Config::N,
00142         IntKMAX = Config::IntKMAX,
00143         num_int_groups = Config::num_int_groups,
00144         num_ext_groups = Config::num_ext_groups,
00145         total_num_groups = Config::num_int_groups + Config::num_ext_groups,
00146         BlockSize = Config::BlockSize,
00147         ExtKMAX = Config::ExtKMAX
00148     };
00149 
00151     typedef typename Config::value_type value_type;
00153     typedef typename Config::comparator_type comparator_type;
00154     typedef typename Config::alloc_strategy_type alloc_strategy_type;
00156     typedef stxxl::uint64 size_type;
00157     typedef typed_block<BlockSize, value_type> block_type;
00158     typedef read_write_pool<block_type> pool_type;
00159 
00160 protected:
00161     typedef priority_queue_local::internal_priority_queue<value_type, std::vector<value_type>, comparator_type>
00162     insert_heap_type;
00163 
00164     typedef priority_queue_local::loser_tree<
00165         value_type,
00166         comparator_type,
00167         IntKMAX> int_merger_type;
00168 
00169     typedef priority_queue_local::ext_merger<
00170         block_type,
00171         comparator_type,
00172         ExtKMAX,
00173         alloc_strategy_type> ext_merger_type;
00174 
00175 
00176     int_merger_type int_mergers[num_int_groups];
00177     pool_type * pool;
00178     bool pool_owned;
00179     ext_merger_type * ext_mergers;
00180 
00181     // one delete buffer for each tree => group buffer
00182     value_type group_buffers[total_num_groups][N + 1];          // tree->group_buffers->delete_buffer (extra space for sentinel)
00183     value_type * group_buffer_current_mins[total_num_groups];   // group_buffer_current_mins[i] is current start of group_buffers[i], end is group_buffers[i] + N
00184 
00185     // overall delete buffer
00186     value_type delete_buffer[delete_buffer_size + 1];
00187     value_type * delete_buffer_current_min;                     // current start of delete_buffer
00188     value_type * delete_buffer_end;                             // end of delete_buffer
00189 
00190     comparator_type cmp;
00191 
00192     // insert buffer
00193     insert_heap_type insert_heap;
00194 
00195     // how many groups are active
00196     unsigned_type num_active_groups;
00197 
00198     // total size not counting insert_heap and delete_buffer
00199     size_type size_;
00200 
00201 private:
00202     void init();
00203 
00204     void refill_delete_buffer();
00205     unsigned_type refill_group_buffer(unsigned_type k);
00206 
00207     unsigned_type make_space_available(unsigned_type level);
00208     void empty_insert_heap();
00209 
00210     value_type get_supremum() const { return cmp.min_value(); } //{ return group_buffers[0][KNN].key; }
00211     unsigned_type current_delete_buffer_size() const { return delete_buffer_end - delete_buffer_current_min; }
00212     unsigned_type current_group_buffer_size(unsigned_type i) const { return &(group_buffers[i][N]) - group_buffer_current_mins[i]; }
00213 
00214 public:
00220     priority_queue(pool_type & pool_);
00221 
00231     _STXXL_DEPRECATED(priority_queue(prefetch_pool<block_type> & p_pool_, write_pool<block_type> & w_pool_));
00232 
00242     priority_queue(unsigned_type p_pool_mem, unsigned_type w_pool_mem);
00243 
00246     void swap(priority_queue & obj)
00247     {
00248         //swap_1D_arrays(int_mergers,obj.int_mergers,num_int_groups); // does not work in g++ 3.4.3 :( bug?
00249         for (unsigned_type i = 0; i < num_int_groups; ++i)
00250             std::swap(int_mergers[i], obj.int_mergers[i]);
00251 
00252         //std::swap(pool,obj.pool);
00253         //std::swap(pool_owned, obj.pool_owned);
00254         std::swap(ext_mergers, obj.ext_mergers);
00255         for (unsigned_type i1 = 0; i1 < total_num_groups; ++i1)
00256             for (unsigned_type i2 = 0; i2 < (N + 1); ++i2)
00257                 std::swap(group_buffers[i1][i2], obj.group_buffers[i1][i2]);
00258 
00259         swap_1D_arrays(group_buffer_current_mins, obj.group_buffer_current_mins, total_num_groups);
00260         swap_1D_arrays(delete_buffer, obj.delete_buffer, delete_buffer_size + 1);
00261         std::swap(delete_buffer_current_min, obj.delete_buffer_current_min);
00262         std::swap(delete_buffer_end, obj.delete_buffer_end);
00263         std::swap(cmp, obj.cmp);
00264         std::swap(insert_heap, obj.insert_heap);
00265         std::swap(num_active_groups, obj.num_active_groups);
00266         std::swap(size_, obj.size_);
00267     }
00268 
00269     virtual ~priority_queue();
00270 
00273     size_type size() const;
00274 
00277     bool empty() const { return (size() == 0); }
00278 
00290     const value_type & top() const;
00291 
00298     void pop();
00299 
00304     void push(const value_type & obj);
00305 
00310     unsigned_type mem_cons() const
00311     {
00312         unsigned_type dynam_alloc_mem = 0;
00313         //dynam_alloc_mem += w_pool.mem_cons();
00314         //dynam_alloc_mem += p_pool.mem_cons();
00315         for (int i = 0; i < num_int_groups; ++i)
00316             dynam_alloc_mem += int_mergers[i].mem_cons();
00317 
00318         for (int i = 0; i < num_ext_groups; ++i)
00319             dynam_alloc_mem += ext_mergers[i].mem_cons();
00320 
00321 
00322         return (sizeof(*this) +
00323                 sizeof(ext_merger_type) * num_ext_groups +
00324                 dynam_alloc_mem);
00325     }
00326 };
00327 
00328 
00329 template <class Config_>
00330 inline typename priority_queue<Config_>::size_type priority_queue<Config_>::size() const
00331 {
00332     return size_ +
00333            insert_heap.size() - 1 +
00334            (delete_buffer_end - delete_buffer_current_min);
00335 }
00336 
00337 
00338 template <class Config_>
00339 inline const typename priority_queue<Config_>::value_type & priority_queue<Config_>::top() const
00340 {
00341     assert(!insert_heap.empty());
00342 
00343     const typename priority_queue<Config_>::value_type & t = insert_heap.top();
00344     if (/*(!insert_heap.empty()) && */ cmp(*delete_buffer_current_min, t))
00345         return t;
00346     else
00347         return *delete_buffer_current_min;
00348 }
00349 
00350 template <class Config_>
00351 inline void priority_queue<Config_>::pop()
00352 {
00353     //STXXL_VERBOSE1("priority_queue::pop()");
00354     assert(!insert_heap.empty());
00355 
00356     if (/*(!insert_heap.empty()) && */ cmp(*delete_buffer_current_min, insert_heap.top()))
00357         insert_heap.pop();
00358     else
00359     {
00360         assert(delete_buffer_current_min < delete_buffer_end);
00361         ++delete_buffer_current_min;
00362         if (delete_buffer_current_min == delete_buffer_end)
00363             refill_delete_buffer();
00364     }
00365 }
00366 
00367 template <class Config_>
00368 inline void priority_queue<Config_>::push(const value_type & obj)
00369 {
00370     //STXXL_VERBOSE3("priority_queue::push("<< obj <<")");
00371     assert(int_mergers->not_sentinel(obj));
00372     if (insert_heap.size() == N + 1)
00373         empty_insert_heap();
00374 
00375 
00376     assert(!insert_heap.empty());
00377 
00378     insert_heap.push(obj);
00379 }
00380 
00381 
00383 
00384 template <class Config_>
00385 priority_queue<Config_>::priority_queue(pool_type & pool_) :
00386     pool(&pool_),
00387     pool_owned(false),
00388     delete_buffer_end(delete_buffer + delete_buffer_size),
00389     insert_heap(N + 2),
00390     num_active_groups(0), size_(0)
00391 {
00392     STXXL_VERBOSE2("priority_queue::priority_queue(pool)");
00393     init();
00394 }
00395 
00396 // DEPRECATED
00397 template <class Config_>
00398 priority_queue<Config_>::priority_queue(prefetch_pool<block_type> & p_pool_, write_pool<block_type> & w_pool_) :
00399     pool(new pool_type(p_pool_, w_pool_)),
00400     pool_owned(true),
00401     delete_buffer_end(delete_buffer + delete_buffer_size),
00402     insert_heap(N + 2),
00403     num_active_groups(0), size_(0)
00404 {
00405     STXXL_VERBOSE2("priority_queue::priority_queue(p_pool, w_pool)");
00406     init();
00407 }
00408 
00409 template <class Config_>
00410 priority_queue<Config_>::priority_queue(unsigned_type p_pool_mem, unsigned_type w_pool_mem) :
00411     pool(new pool_type(p_pool_mem / BlockSize, w_pool_mem / BlockSize)),
00412     pool_owned(true),
00413     delete_buffer_end(delete_buffer + delete_buffer_size),
00414     insert_heap(N + 2),
00415     num_active_groups(0), size_(0)
00416 {
00417     STXXL_VERBOSE2("priority_queue::priority_queue(pool sizes)");
00418     init();
00419 }
00420 
00421 template <class Config_>
00422 void priority_queue<Config_>::init()
00423 {
00424     assert(!cmp(cmp.min_value(), cmp.min_value())); // verify strict weak ordering
00425 
00426     ext_mergers = new ext_merger_type[num_ext_groups];
00427     for (unsigned_type j = 0; j < num_ext_groups; ++j)
00428         ext_mergers[j].set_pool(pool);
00429 
00430     value_type sentinel = cmp.min_value();
00431     insert_heap.push(sentinel);                                // always keep the sentinel
00432     delete_buffer[delete_buffer_size] = sentinel;              // sentinel
00433     delete_buffer_current_min = delete_buffer_end;             // empty
00434     for (unsigned_type i = 0; i < total_num_groups; i++)
00435     {
00436         group_buffers[i][N] = sentinel;                        // sentinel
00437         group_buffer_current_mins[i] = &(group_buffers[i][N]); // empty
00438     }
00439 }
00440 
00441 template <class Config_>
00442 priority_queue<Config_>::~priority_queue()
00443 {
00444     STXXL_VERBOSE2("priority_queue::~priority_queue()");
00445     if (pool_owned)
00446         delete pool;
00447 
00448     delete[] ext_mergers;
00449 }
00450 
00451 //--------------------- Buffer refilling -------------------------------
00452 
00453 // refill group_buffers[j] and return number of elements found
00454 template <class Config_>
00455 unsigned_type priority_queue<Config_>::refill_group_buffer(unsigned_type group)
00456 {
00457     STXXL_VERBOSE2("priority_queue::refill_group_buffer(" << group << ")");
00458 
00459     value_type * target;
00460     unsigned_type length;
00461     size_type group_size = (group < num_int_groups) ?
00462                            int_mergers[group].size() :
00463                            ext_mergers[group - num_int_groups].size();                         //elements left in segments
00464     unsigned_type left_elements = group_buffers[group] + N - group_buffer_current_mins[group]; //elements left in target buffer
00465     if (group_size + left_elements >= size_type(N))
00466     {                                                                                          // buffer will be filled completely
00467         target = group_buffers[group];
00468         length = N - left_elements;
00469     }
00470     else
00471     {
00472         target = group_buffers[group] + N - group_size - left_elements;
00473         length = group_size;
00474     }
00475 
00476     if (length > 0)
00477     {
00478         // shift remaininig elements to front
00479         memmove(target, group_buffer_current_mins[group], left_elements * sizeof(value_type));
00480         group_buffer_current_mins[group] = target;
00481 
00482         // fill remaining space from group
00483         if (group < num_int_groups)
00484             int_mergers[group].multi_merge(target + left_elements, length);
00485         else
00486             ext_mergers[group - num_int_groups].multi_merge(
00487                 target + left_elements,
00488                 target + left_elements + length);
00489     }
00490 
00491     //STXXL_MSG(length + left_elements);
00492     //std::copy(target,target + length + left_elements,std::ostream_iterator<value_type>(std::cout, "\n"));
00493 #if STXXL_CHECK_ORDER_IN_SORTS
00494     priority_queue_local::invert_order<typename Config::comparator_type, value_type, value_type> inv_cmp(cmp);
00495     if (!stxxl::is_sorted(group_buffer_current_mins[group], group_buffers[group] + N, inv_cmp))
00496     {
00497         STXXL_VERBOSE2("length: " << length << " left_elements: " << left_elements);
00498         for (value_type * v = group_buffer_current_mins[group] + 1; v < group_buffer_current_mins[group] + left_elements; ++v)
00499         {
00500             if (inv_cmp(*v, *(v - 1)))
00501             {
00502                 STXXL_MSG("Error in buffer " << group << " at position " << (v - group_buffer_current_mins[group] - 1) << "/" << (v - group_buffer_current_mins[group]) << "   " << *(v - 2) << " " << *(v - 1) << " " << *v << " " << *(v + 1));
00503             }
00504         }
00505         assert(false);
00506     }
00507 #endif
00508 
00509     return length + left_elements;
00510 }
00511 
00512 
// Refill the delete buffer from the group buffers of the active groups.
// Step 1: every active group buffer holding fewer than delete_buffer_size
//         elements is refilled from its merger via refill_group_buffer(); if
//         the highest active group comes back empty, num_active_groups shrinks.
// Step 2: up to delete_buffer_size elements are merged from group buffers
//         0 .. num_active_groups-1 into the tail [delete_buffer_current_min,
//         delete_buffer_end) of delete_buffer, and size_ is reduced accordingly.
// More than 4 active groups is not supported: the default: branch throws
// std::runtime_error through STXXL_THROW.
00513 template <class Config_>
00514 void priority_queue<Config_>::refill_delete_buffer()
00515 {
00516     STXXL_VERBOSE2("priority_queue::refill_delete_buffer()");
00517
00518     size_type total_group_size = 0;
00519     //num_active_groups is <= 4
    // NOTE(review): make_space_available() can raise num_active_groups up to
    // total_num_groups; values above 4 end up in the default: branch below.
    // Walk from the highest active group down so that only the top group can
    // be retired when it runs dry.
00520     for (int i = num_active_groups - 1; i >= 0; i--)
00521     {
00522         if ((group_buffers[i] + N) - group_buffer_current_mins[i] < delete_buffer_size)
00523         {
00524             unsigned_type length = refill_group_buffer(i);
00525             // max active level dry now?
00526             if (length == 0 && unsigned(i) == num_active_groups - 1)
00527                 --num_active_groups;
00528
00529             total_group_size += length;
00530         }
00531         else
00532             total_group_size += delete_buffer_size;  // actually only a sufficient lower bound
00533     }
00534
    // If any group was not refilled, it alone contributed delete_buffer_size,
    // so the test below succeeds; otherwise total_group_size is exact.
00535     unsigned_type length;
00536     if (total_group_size >= delete_buffer_size)      // buffer can be filled completely
00537     {
00538         length = delete_buffer_size;                 // amount to be copied
00539         size_ -= size_type(delete_buffer_size);      // amount left in group_buffers
00540     }
00541     else
00542     {
00543         length = total_group_size;
00544         assert(size_ == size_type(length)); // trees and group_buffers get empty
00545         size_ = 0;
00546     }
00547
    // inverted comparator for the parallel multiway merge and the order check
00548     priority_queue_local::invert_order<typename Config::comparator_type, value_type, value_type> inv_cmp(cmp);
00549
00550     // now call simplified refill routines
00551     // which can make the assumption that
00552     // they find all they are asked in the buffers
    // The merged output occupies the last `length` slots before delete_buffer_end.
    // In the parallel branches the group cursors are updated from seqs[*].first;
    // the merge*_iterator helpers presumably advance the cursors they are given
    // (TODO confirm in pq_mergers.h).
00553     delete_buffer_current_min = delete_buffer_end - length;
00554     STXXL_VERBOSE2("Active groups = " << num_active_groups);
00555     switch (num_active_groups)
00556     {
00557     case 0:
00558         break;
00559     case 1:
00560         std::copy(group_buffer_current_mins[0], group_buffer_current_mins[0] + length, delete_buffer_current_min);
00561         group_buffer_current_mins[0] += length;
00562         break;
00563     case 2:
00564 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
00565         {
00566             std::pair<value_type *, value_type *> seqs[2] =
00567             {
00568                 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
00569                 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N)
00570             };
00571             parallel::multiway_merge_sentinel(seqs, seqs + 2, delete_buffer_current_min, inv_cmp, length); //sequence iterators are progressed appropriately
00572
00573             group_buffer_current_mins[0] = seqs[0].first;
00574             group_buffer_current_mins[1] = seqs[1].first;
00575         }
00576 #else
00577         priority_queue_local::merge_iterator(
00578             group_buffer_current_mins[0],
00579             group_buffer_current_mins[1], delete_buffer_current_min, length, cmp);
00580 #endif
00581         break;
00582     case 3:
00583 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
00584         {
00585             std::pair<value_type *, value_type *> seqs[3] =
00586             {
00587                 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
00588                 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N),
00589                 std::make_pair(group_buffer_current_mins[2], group_buffers[2] + N)
00590             };
00591             parallel::multiway_merge_sentinel(seqs, seqs + 3, delete_buffer_current_min, inv_cmp, length); //sequence iterators are progressed appropriately
00592
00593             group_buffer_current_mins[0] = seqs[0].first;
00594             group_buffer_current_mins[1] = seqs[1].first;
00595             group_buffer_current_mins[2] = seqs[2].first;
00596         }
00597 #else
00598         priority_queue_local::merge3_iterator(
00599             group_buffer_current_mins[0],
00600             group_buffer_current_mins[1],
00601             group_buffer_current_mins[2], delete_buffer_current_min, length, cmp);
00602 #endif
00603         break;
00604     case 4:
00605 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
00606         {
00607             std::pair<value_type *, value_type *> seqs[4] =
00608             {
00609                 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
00610                 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N),
00611                 std::make_pair(group_buffer_current_mins[2], group_buffers[2] + N),
00612                 std::make_pair(group_buffer_current_mins[3], group_buffers[3] + N)
00613             };
00614             parallel::multiway_merge_sentinel(seqs, seqs + 4, delete_buffer_current_min, inv_cmp, length); //sequence iterators are progressed appropriately
00615
00616             group_buffer_current_mins[0] = seqs[0].first;
00617             group_buffer_current_mins[1] = seqs[1].first;
00618             group_buffer_current_mins[2] = seqs[2].first;
00619             group_buffer_current_mins[3] = seqs[3].first;
00620         }
00621 #else
00622         priority_queue_local::merge4_iterator(
00623             group_buffer_current_mins[0],
00624             group_buffer_current_mins[1],
00625             group_buffer_current_mins[2],
00626             group_buffer_current_mins[3], delete_buffer_current_min, length, cmp); //side effect free
00627 #endif
00628         break;
00629     default:
00630         STXXL_THROW(std::runtime_error, "priority_queue<...>::refill_delete_buffer()",
00631                     "Overflow! The number of buffers on 2nd level in stxxl::priority_queue is currently limited to 4");
00632     }
00633
    // debug-only check: the refilled delete buffer must be ordered w.r.t. inv_cmp
00634 #if STXXL_CHECK_ORDER_IN_SORTS
00635     if (!stxxl::is_sorted(delete_buffer_current_min, delete_buffer_end, inv_cmp))
00636     {
00637         for (value_type * v = delete_buffer_current_min + 1; v < delete_buffer_end; ++v)
00638         {
00639             if (inv_cmp(*v, *(v - 1)))
00640             {
00641                 STXXL_MSG("Error at position " << (v - delete_buffer_current_min - 1) << "/" << (v - delete_buffer_current_min) << "   " << *(v - 1) << " " << *v);
00642             }
00643         }
00644         assert(false);
00645     }
00646 #endif
00647     //std::copy(delete_buffer_current_min,delete_buffer_current_min + length,std::ostream_iterator<value_type>(std::cout, "\n"));
00648 }
00649 
00650 //--------------------------------------------------------------------
00651 
00652 // check if space is available in the merger of group `level` and
00653 // empty this level if necessary leading to a recursive call.
00654 // return the level where space was finally available
// Details:
// - level must be < total_num_groups and <= num_active_groups; if it equals
//   num_active_groups, the group becomes active.
// - The last group (total_num_groups - 1) is always treated as having space.
// - Otherwise space is first made at level + 1 (recursively), then the whole
//   content of this level is moved there:
//   internal -> internal: merged into a new heap array with a sentinel;
//   last internal -> ext_mergers[0]; external -> next external merger.
00655 template <class Config_>
00656 unsigned_type priority_queue<Config_>::make_space_available(unsigned_type level)
00657 {
00658     STXXL_VERBOSE2("priority_queue::make_space_available(" << level << ")");
00659     unsigned_type finalLevel;
00660     assert(level < total_num_groups);
00661     assert(level <= num_active_groups);
00662
00663     if (level == num_active_groups)
00664         ++num_active_groups;
00665
00666     const bool spaceIsAvailable_ =
00667         (level < num_int_groups) ? int_mergers[level].is_space_available()
00668         : ((level == total_num_groups - 1) ? true : (ext_mergers[level - num_int_groups].is_space_available()));
00669
00670     if (spaceIsAvailable_)
00671     {
00672         finalLevel = level;
00673     }
00674     else
00675     {
        // make room one level up first, so this level can be emptied into it
00676         finalLevel = make_space_available(level + 1);
00677
00678         if (level < num_int_groups - 1)                                  // from internal to internal tree
00679         {
00680             unsigned_type segmentSize = int_mergers[level].size();
00681             value_type * newSegment = new value_type[segmentSize + 1];
00682             int_mergers[level].multi_merge(newSegment, segmentSize);     // empty this level
00683
            // delete_buffer[delete_buffer_size] holds the sentinel written by init()
00684             newSegment[segmentSize] = delete_buffer[delete_buffer_size]; // sentinel
00685             // for queues where size << #inserts
00686             // it might make sense to stay in this level if
00687             // segmentSize < alpha * KNN * k^level for some alpha < 1
00688             int_mergers[level + 1].insert_segment(newSegment, segmentSize);
00689         }
00690         else
00691         {
00692             if (level == num_int_groups - 1) // from internal to external tree
00693             {
00694                 const unsigned_type segmentSize = int_mergers[num_int_groups - 1].size();
00695                 STXXL_VERBOSE1("Inserting segment into first level external: " << level << " " << segmentSize);
00696                 ext_mergers[0].insert_segment(int_mergers[num_int_groups - 1], segmentSize);
00697             }
00698             else // from external to external tree
00699             {
00700                 const size_type segmentSize = ext_mergers[level - num_int_groups].size();
00701                 STXXL_VERBOSE1("Inserting segment into second level external: " << level << " " << segmentSize);
00702                 ext_mergers[level - num_int_groups + 1].insert_segment(ext_mergers[level - num_int_groups], segmentSize);
00703             }
00704         }
00705     }
00706     return finalLevel;
00707 }
00708 
00709 
00710 // empty the insert heap into the main data structure
// Precondition: the insert heap holds N elements plus its sentinel.
// The N elements are sorted into a new segment. That segment is merged with a
// copy of the current delete buffer and group buffer 0: both buffers are
// refilled in place to their previous fill levels, and the remaining N
// elements go back into the segment, which is inserted into int_mergers[0]
// after make_space_available(0). size_ grows by N.
00711 template <class Config_>
00712 void priority_queue<Config_>::empty_insert_heap()
00713 {
00714     STXXL_VERBOSE2("priority_queue::empty_insert_heap()");
00715     assert(insert_heap.size() == (N + 1));
00716
    // sentinel value (cmp.min_value()) used to terminate the arrays below
00717     const value_type sup = get_supremum();
00718
00719     // build new segment
00720     value_type * newSegment = new value_type[N + 1];
00721     value_type * newPos = newSegment;
00722
00723     // put the new data there for now
00724     //insert_heap.sortTo(newSegment);
00725     value_type * SortTo = newSegment;
00726
00727     insert_heap.sort_to(SortTo);
00728
    // the value sorted into slot N is pushed back as the heap's sole element
    // (the sentinel, per the size-1 assertion and the invariant kept by init())
00729     SortTo = newSegment + N;
00730     insert_heap.clear();
00731     insert_heap.push(*SortTo);
00732
00733     assert(insert_heap.size() == 1);
00734
00735     newSegment[N] = sup; // sentinel
00736
00737     // copy the delete_buffer and group_buffers[0] to temporary storage
00738     // (the temporary can be eliminated using some dirty tricks)
    // The copy is right-aligned in temp, ending right before temp's sentinel,
    // so both buffers can then be overwritten in place by the merges below.
00739     const unsigned_type tempSize = N + delete_buffer_size;
00740     value_type temp[tempSize + 1];
00741     unsigned_type sz1 = current_delete_buffer_size();
00742     unsigned_type sz2 = current_group_buffer_size(0);
00743     value_type * pos = temp + tempSize - sz1 - sz2;
00744     std::copy(delete_buffer_current_min, delete_buffer_current_min + sz1, pos);
00745     std::copy(group_buffer_current_mins[0], group_buffer_current_mins[0] + sz2, pos + sz1);
00746     temp[tempSize] = sup; // sentinel
00747
00748     // refill delete_buffer
00749     // (using more complicated code it could be made somewhat fuller
00750     // in certain circumstances)
00751     priority_queue_local::merge_iterator(pos, newPos, delete_buffer_current_min, sz1, cmp);
00752
00753     // refill group_buffers[0]
00754     // (as above we might want to take the opportunity
00755     // to make group_buffers[0] fuller)
00756     priority_queue_local::merge_iterator(pos, newPos, group_buffer_current_mins[0], sz2, cmp);
00757
00758     // merge the rest to the new segment
00759     // note that merge exactly trips into the footsteps
00760     // of itself
    // (At this point temp still holds as many elements as were already taken
    // from newSegment, so the write position in newSegment never passes the
    // read position newPos.)
00761     priority_queue_local::merge_iterator(pos, newPos, newSegment, N, cmp);
00762
00763     // and insert it
00764     unsigned_type freeLevel = make_space_available(0);
00765     assert(freeLevel == 0 || int_mergers[0].size() == 0);
00766     int_mergers[0].insert_segment(newSegment, N);
00767
00768     // get rid of invalid level 2 buffers
00769     // by inserting them into tree 0 (which is almost empty in this case)
    // Group buffers 0..freeLevel are copied (including their sentinel at index
    // N) into new segments of int_mergers[0] and then marked empty; presumably
    // because the mergers behind them have just received data from below.
00770     if (freeLevel > 0)
00771     {
00772         for (int_type i = freeLevel; i >= 0; i--)
00773         {
00774             // reverse order not needed
00775             // but would allow immediate refill
00776
00777             newSegment = new value_type[current_group_buffer_size(i) + 1]; // with sentinel
00778             std::copy(group_buffer_current_mins[i], group_buffer_current_mins[i] + current_group_buffer_size(i) + 1, newSegment);
00779             int_mergers[0].insert_segment(newSegment, current_group_buffer_size(i));
00780             group_buffer_current_mins[i] = group_buffers[i] + N;           // empty
00781         }
00782     }
00783
00784     // update size
00785     size_ += size_type(N);
00786
00787     // special case if the tree was empty before
00788     if (delete_buffer_current_min == delete_buffer_end)
00789         refill_delete_buffer();
00790 }
00791 
00792 namespace priority_queue_local
00793 {
00794     struct Parameters_for_priority_queue_not_found_Increase_IntM
00795     {
00796         enum { fits = false };
00797         typedef Parameters_for_priority_queue_not_found_Increase_IntM result;
00798     };
00799 
00800     struct dummy
00801     {
00802         enum { fits = false };
00803         typedef dummy result;
00804     };
00805 
    // Compile-time search for an external block size B (starting at B_) and a
    // number m of blocks used for buffers (starting at m_) for which `fits`
    // holds, given element size E_, internal memory IntM_ and the bound MaxS_.
    // From each (B, m) it tries (B, m + step) as candidate1 and (B / 2, 1) as
    // candidate2. The `stop` flag routes a branch to the cheap
    // find_B_m<..., true> specialization once a fit is already known, or when
    // m has reached k - step. result is Self if this (B, m) fits, otherwise
    // candidate1's result if that fits, otherwise candidate2's result.
00806     template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_, unsigned_type B_, unsigned_type m_, bool stop = false>
00807     struct find_B_m
00808     {
00809         typedef find_B_m<E_, IntM_, MaxS_, B_, m_, stop> Self;
00810         enum {
00811             k = IntM_ / B_,    // number of blocks that fit into M
00812             element_size = E_, // element size
00813             IntM = IntM_,
00814             B = B_,            // block size
00815             m = m_,            // number of blocks fitting into buffers
            // blocks of internal memory left after the m buffer blocks
00816             c = k - m_,
00817             // memory occ. by block must be at least 10 times larger than size of ext sequence
00818             // && satisfy memory req && if we have two ext mergers their degree must be at least 64=m/2
00819             fits = c > 10 && ((k - m) * (m) * (m * B / (element_size * 4 * 1024))) >= MaxS_
00820                    && ((MaxS_ < ((k - m) * m / (2 * element_size)) * 1024) || m >= 128),
00821             step = 1
00822         };
00823
        // NOTE(review): these enumerators are int-valued; for very large IntM_
        // the products in `fits` may overflow — confirm for the intended ranges.
00824         typedef typename find_B_m<element_size, IntM, MaxS_, B, m + step, fits || (m >= k - step)>::result candidate1;
00825         typedef typename find_B_m<element_size, IntM, MaxS_, B / 2, 1, fits || candidate1::fits>::result candidate2;
00826         typedef typename IF<fits, Self, typename IF<candidate1::fits, candidate1, candidate2>::result>::result result;
00827     };
00828 
00829     // specialization for the case when no valid parameters are found
00830     template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_, bool stop>
00831     struct find_B_m<E_, IntM_, MaxS_, 2048, 1, stop>
00832     {
00833         enum { fits = false };
00834         typedef Parameters_for_priority_queue_not_found_Increase_IntM result;
00835     };
00836 
00837     // to speedup search
00838     template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_, unsigned_type B_, unsigned_type m_>
00839     struct find_B_m<E_, IntM_, MaxS_, B_, m_, true>
00840     {
00841         enum { fits = false };
00842         typedef dummy result;
00843     };
00844 
00845     // E_ size of element in bytes
00846     template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_>
00847     struct find_settings
00848     {
00849         // start from block size (8*1024*1024) bytes
00850         typedef typename find_B_m<E_, IntM_, MaxS_, (8 * 1024 * 1024), 1>::result result;
00851     };
00852 
00853     struct Parameters_not_found_Try_to_change_the_Tune_parameter
00854     {
00855         typedef Parameters_not_found_Try_to_change_the_Tune_parameter result;
00856     };
00857 
00858 
00859     template <unsigned_type AI_, unsigned_type X_, unsigned_type CriticalSize_>
00860     struct compute_N
00861     {
00862         typedef compute_N<AI_, X_, CriticalSize_> Self;
00863         enum
00864         {
00865             X = X_,
00866             AI = AI_,
00867             N = X / (AI * AI) // two stage internal
00868         };
00869         typedef typename IF<(N >= CriticalSize_), Self, typename compute_N<AI / 2, X, CriticalSize_>::result>::result result;
00870     };
00871 
00872     template <unsigned_type X_, unsigned_type CriticalSize_>
00873     struct compute_N<1, X_, CriticalSize_>
00874     {
00875         typedef Parameters_not_found_Try_to_change_the_Tune_parameter result;
00876     };
00877 }
00878 
00880 
00883 
00885 
00949 template <class Tp_, class Cmp_, unsigned_type IntM_, unsigned MaxS_, unsigned Tune_ = 6>
00950 class PRIORITY_QUEUE_GENERATOR
00951 {
00952 public:
00953     // actual calculation of B, m, k and element_size
00954     typedef typename priority_queue_local::find_settings<sizeof(Tp_), IntM_, MaxS_>::result settings;
00955     enum {
00956         B = settings::B,
00957         m = settings::m,
00958         X = B * (settings::k - m) / settings::element_size,  // interpretation of result
00959         Buffer1Size = 32                                     // fixed
00960     };
00961     // derivation of N, AI, AE
00962     typedef typename priority_queue_local::compute_N<(1 << Tune_), X, 4 * Buffer1Size>::result ComputeN;
00963     enum
00964     {
00965         N = ComputeN::N,
00966         AI = ComputeN::AI,
00967         AE = (m / 2 < 2) ? 2 : (m / 2)            // at least 2
00968     };
00969     enum {
00970         // Estimation of maximum internal memory consumption (in bytes)
00971         EConsumption = X * settings::element_size + settings::B * AE + ((MaxS_ / X) / AE) * settings::B * 1024
00972     };
00973     /*
00974         unsigned BufferSize1_ = 32, // equalize procedure call overheads etc.
00975         unsigned N_ = 512,          // bandwidth
00976         unsigned IntKMAX_ = 64,     // maximal arity for internal mergers
00977         unsigned IntLevels_ = 4,
00978         unsigned BlockSize_ = (2*1024*1024),
00979         unsigned ExtKMAX_ = 64,     // maximal arity for external mergers
00980         unsigned ExtLevels_ = 2,
00981      */
00982     typedef priority_queue<priority_queue_config<Tp_, Cmp_, Buffer1Size, N, AI, 2, B, AE, 2> > result;
00983 };
00984 
00986 
00987 __STXXL_END_NAMESPACE
00988 
00989 
00990 namespace std
00991 {
00992     template <class Config_>
00993     void swap(stxxl::priority_queue<Config_> & a,
00994               stxxl::priority_queue<Config_> & b)
00995     {
00996         a.swap(b);
00997     }
00998 }
00999 
01000 #endif // !STXXL_PRIORITY_QUEUE_HEADER
01001 // vim: et:ts=4:sw=4

Generated on Sun Oct 17 2010 06:13:44 for Stxxl by  doxygen 1.7.1