00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #ifndef STXXL_PRIORITY_QUEUE_HEADER
00017 #define STXXL_PRIORITY_QUEUE_HEADER
00018
00019 #include <vector>
00020
00021 #include <stxxl/bits/mng/typed_block.h>
00022 #include <stxxl/bits/mng/block_alloc.h>
00023 #include <stxxl/bits/mng/read_write_pool.h>
00024 #include <stxxl/bits/mng/prefetch_pool.h>
00025 #include <stxxl/bits/mng/write_pool.h>
00026 #include <stxxl/bits/common/tmeta.h>
00027 #include <stxxl/bits/algo/sort_base.h>
00028 #include <stxxl/bits/parallel.h>
00029 #include <stxxl/bits/common/is_sorted.h>
00030
// Compile-time switches for the priority queue's parallel multiway merging.
//
// When libstdc++ parallel mode is active (_GLIBCXX_PARALLEL is defined) and the
// compiler reports a GCC version below 4.4, any previous setting of the three
// options is discarded and all of them are forced to 0.
00031 #if defined(_GLIBCXX_PARALLEL) && ((__GNUC__ * 10000 + __GNUC_MINOR__ * 100) < 40400)
00032 #undef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL
00033 #undef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
00034 #undef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
00035 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL 0
00036 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL 0
00037 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 0
00038 #endif
00039
// Each option defaults to 1 unless it has already been defined
// (by the user or by the override above).
00040 #ifndef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL
00041 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL 1
00042 #endif
00043 #ifndef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
00044 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL 1
00045 #endif
00046 #ifndef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
00047 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 1
00048 #endif
00049
// STXXL_PQ_EXTERNAL_LOSER_TREE is 1 exactly when parallel merging is NOT used
// for the external sequences (STXXL_PARALLEL off, or the EXTERNAL option is 0).
00050 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
00051 #define STXXL_PQ_EXTERNAL_LOSER_TREE 0 // no loser tree for the external sequences
00052 #else
00053 #define STXXL_PQ_EXTERNAL_LOSER_TREE 1
00054 #endif
00055
// Same rule for the internal sequences: the loser tree is selected whenever
// parallel merging of internal sequences is not enabled.
00056 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL
00057 #define STXXL_PQ_INTERNAL_LOSER_TREE 0 // no loser tree for the internal sequences
00058 #else
00059 #define STXXL_PQ_INTERNAL_LOSER_TREE 1
00060 #endif
00061
00062 #include <stxxl/bits/containers/pq_helpers.h>
00063 #include <stxxl/bits/containers/pq_mergers.h>
00064 #include <stxxl/bits/containers/pq_ext_merger.h>
00065 #include <stxxl/bits/containers/pq_losertree.h>
00066
00067 __STXXL_BEGIN_NAMESPACE
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079 template <
00080 class Tp_,
00081 class Cmp_,
00082 unsigned BufferSize1_ = 32,
00083 unsigned N_ = 512,
00084 unsigned IntKMAX_ = 64,
00085 unsigned IntLevels_ = 4,
00086 unsigned BlockSize_ = (2 * 1024 * 1024),
00087 unsigned ExtKMAX_ = 64,
00088 unsigned ExtLevels_ = 2,
00089 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY
00090 >
00091 struct priority_queue_config
00092 {
00093 typedef Tp_ value_type;
00094 typedef Cmp_ comparator_type;
00095 typedef AllocStr_ alloc_strategy_type;
00096 enum
00097 {
00098 delete_buffer_size = BufferSize1_,
00099 N = N_,
00100 IntKMAX = IntKMAX_,
00101 num_int_groups = IntLevels_,
00102 num_ext_groups = ExtLevels_,
00103 BlockSize = BlockSize_,
00104 ExtKMAX = ExtKMAX_,
00105 element_size = sizeof(Tp_)
00106 };
00107 };
00108
00109 __STXXL_END_NAMESPACE
00110
00111 namespace std
00112 {
00113 template <class BlockType_,
00114 class Cmp_,
00115 unsigned Arity_,
00116 class AllocStr_>
00117 void swap(stxxl::priority_queue_local::ext_merger<BlockType_, Cmp_, Arity_, AllocStr_> & a,
00118 stxxl::priority_queue_local::ext_merger<BlockType_, Cmp_, Arity_, AllocStr_> & b)
00119 {
00120 a.swap(b);
00121 }
00122 template <class ValTp_, class Cmp_, unsigned KNKMAX>
00123 void swap(stxxl::priority_queue_local::loser_tree<ValTp_, Cmp_, KNKMAX> & a,
00124 stxxl::priority_queue_local::loser_tree<ValTp_, Cmp_, KNKMAX> & b)
00125 {
00126 a.swap(b);
00127 }
00128 }
00129
00130 __STXXL_BEGIN_NAMESPACE
00131
00133 template <class Config_>
00134 class priority_queue : private noncopyable
00135 {
00136 public:
00137 typedef Config_ Config;
00138 enum
00139 {
00140 delete_buffer_size = Config::delete_buffer_size,
00141 N = Config::N,
00142 IntKMAX = Config::IntKMAX,
00143 num_int_groups = Config::num_int_groups,
00144 num_ext_groups = Config::num_ext_groups,
00145 total_num_groups = Config::num_int_groups + Config::num_ext_groups,
00146 BlockSize = Config::BlockSize,
00147 ExtKMAX = Config::ExtKMAX
00148 };
00149
00151 typedef typename Config::value_type value_type;
00153 typedef typename Config::comparator_type comparator_type;
00154 typedef typename Config::alloc_strategy_type alloc_strategy_type;
00156 typedef stxxl::uint64 size_type;
00157 typedef typed_block<BlockSize, value_type> block_type;
00158 typedef read_write_pool<block_type> pool_type;
00159
00160 protected:
00161 typedef priority_queue_local::internal_priority_queue<value_type, std::vector<value_type>, comparator_type>
00162 insert_heap_type;
00163
00164 typedef priority_queue_local::loser_tree<
00165 value_type,
00166 comparator_type,
00167 IntKMAX> int_merger_type;
00168
00169 typedef priority_queue_local::ext_merger<
00170 block_type,
00171 comparator_type,
00172 ExtKMAX,
00173 alloc_strategy_type> ext_merger_type;
00174
00175
00176 int_merger_type int_mergers[num_int_groups];
00177 pool_type * pool;
00178 bool pool_owned;
00179 ext_merger_type * ext_mergers;
00180
00181
00182 value_type group_buffers[total_num_groups][N + 1];
00183 value_type * group_buffer_current_mins[total_num_groups];
00184
00185
00186 value_type delete_buffer[delete_buffer_size + 1];
00187 value_type * delete_buffer_current_min;
00188 value_type * delete_buffer_end;
00189
00190 comparator_type cmp;
00191
00192
00193 insert_heap_type insert_heap;
00194
00195
00196 unsigned_type num_active_groups;
00197
00198
00199 size_type size_;
00200
00201 private:
00202 void init();
00203
00204 void refill_delete_buffer();
00205 unsigned_type refill_group_buffer(unsigned_type k);
00206
00207 unsigned_type make_space_available(unsigned_type level);
00208 void empty_insert_heap();
00209
00210 value_type get_supremum() const { return cmp.min_value(); }
00211 unsigned_type current_delete_buffer_size() const { return delete_buffer_end - delete_buffer_current_min; }
00212 unsigned_type current_group_buffer_size(unsigned_type i) const { return &(group_buffers[i][N]) - group_buffer_current_mins[i]; }
00213
00214 public:
00220 priority_queue(pool_type & pool_);
00221
00231 _STXXL_DEPRECATED(priority_queue(prefetch_pool<block_type> & p_pool_, write_pool<block_type> & w_pool_));
00232
00242 priority_queue(unsigned_type p_pool_mem, unsigned_type w_pool_mem);
00243
00246 void swap(priority_queue & obj)
00247 {
00248
00249 for (unsigned_type i = 0; i < num_int_groups; ++i)
00250 std::swap(int_mergers[i], obj.int_mergers[i]);
00251
00252
00253
00254 std::swap(ext_mergers, obj.ext_mergers);
00255 for (unsigned_type i1 = 0; i1 < total_num_groups; ++i1)
00256 for (unsigned_type i2 = 0; i2 < (N + 1); ++i2)
00257 std::swap(group_buffers[i1][i2], obj.group_buffers[i1][i2]);
00258
00259 swap_1D_arrays(group_buffer_current_mins, obj.group_buffer_current_mins, total_num_groups);
00260 swap_1D_arrays(delete_buffer, obj.delete_buffer, delete_buffer_size + 1);
00261 std::swap(delete_buffer_current_min, obj.delete_buffer_current_min);
00262 std::swap(delete_buffer_end, obj.delete_buffer_end);
00263 std::swap(cmp, obj.cmp);
00264 std::swap(insert_heap, obj.insert_heap);
00265 std::swap(num_active_groups, obj.num_active_groups);
00266 std::swap(size_, obj.size_);
00267 }
00268
00269 virtual ~priority_queue();
00270
00273 size_type size() const;
00274
00277 bool empty() const { return (size() == 0); }
00278
00290 const value_type & top() const;
00291
00298 void pop();
00299
00304 void push(const value_type & obj);
00305
00310 unsigned_type mem_cons() const
00311 {
00312 unsigned_type dynam_alloc_mem = 0;
00313
00314
00315 for (int i = 0; i < num_int_groups; ++i)
00316 dynam_alloc_mem += int_mergers[i].mem_cons();
00317
00318 for (int i = 0; i < num_ext_groups; ++i)
00319 dynam_alloc_mem += ext_mergers[i].mem_cons();
00320
00321
00322 return (sizeof(*this) +
00323 sizeof(ext_merger_type) * num_ext_groups +
00324 dynam_alloc_mem);
00325 }
00326 };
00327
00328
00329 template <class Config_>
00330 inline typename priority_queue<Config_>::size_type priority_queue<Config_>::size() const
00331 {
00332 return size_ +
00333 insert_heap.size() - 1 +
00334 (delete_buffer_end - delete_buffer_current_min);
00335 }
00336
00337
00338 template <class Config_>
00339 inline const typename priority_queue<Config_>::value_type & priority_queue<Config_>::top() const
00340 {
00341 assert(!insert_heap.empty());
00342
00343 const typename priority_queue<Config_>::value_type & t = insert_heap.top();
00344 if ( cmp(*delete_buffer_current_min, t))
00345 return t;
00346 else
00347 return *delete_buffer_current_min;
00348 }
00349
00350 template <class Config_>
00351 inline void priority_queue<Config_>::pop()
00352 {
00353
00354 assert(!insert_heap.empty());
00355
00356 if ( cmp(*delete_buffer_current_min, insert_heap.top()))
00357 insert_heap.pop();
00358 else
00359 {
00360 assert(delete_buffer_current_min < delete_buffer_end);
00361 ++delete_buffer_current_min;
00362 if (delete_buffer_current_min == delete_buffer_end)
00363 refill_delete_buffer();
00364 }
00365 }
00366
00367 template <class Config_>
00368 inline void priority_queue<Config_>::push(const value_type & obj)
00369 {
00370
00371 assert(int_mergers->not_sentinel(obj));
00372 if (insert_heap.size() == N + 1)
00373 empty_insert_heap();
00374
00375
00376 assert(!insert_heap.empty());
00377
00378 insert_heap.push(obj);
00379 }
00380
00381
00383
00384 template <class Config_>
00385 priority_queue<Config_>::priority_queue(pool_type & pool_) :
00386 pool(&pool_),
00387 pool_owned(false),
00388 delete_buffer_end(delete_buffer + delete_buffer_size),
00389 insert_heap(N + 2),
00390 num_active_groups(0), size_(0)
00391 {
00392 STXXL_VERBOSE2("priority_queue::priority_queue(pool)");
00393 init();
00394 }
00395
00396
00397 template <class Config_>
00398 priority_queue<Config_>::priority_queue(prefetch_pool<block_type> & p_pool_, write_pool<block_type> & w_pool_) :
00399 pool(new pool_type(p_pool_, w_pool_)),
00400 pool_owned(true),
00401 delete_buffer_end(delete_buffer + delete_buffer_size),
00402 insert_heap(N + 2),
00403 num_active_groups(0), size_(0)
00404 {
00405 STXXL_VERBOSE2("priority_queue::priority_queue(p_pool, w_pool)");
00406 init();
00407 }
00408
00409 template <class Config_>
00410 priority_queue<Config_>::priority_queue(unsigned_type p_pool_mem, unsigned_type w_pool_mem) :
00411 pool(new pool_type(p_pool_mem / BlockSize, w_pool_mem / BlockSize)),
00412 pool_owned(true),
00413 delete_buffer_end(delete_buffer + delete_buffer_size),
00414 insert_heap(N + 2),
00415 num_active_groups(0), size_(0)
00416 {
00417 STXXL_VERBOSE2("priority_queue::priority_queue(pool sizes)");
00418 init();
00419 }
00420
00421 template <class Config_>
00422 void priority_queue<Config_>::init()
00423 {
00424 assert(!cmp(cmp.min_value(), cmp.min_value()));
00425
00426 ext_mergers = new ext_merger_type[num_ext_groups];
00427 for (unsigned_type j = 0; j < num_ext_groups; ++j)
00428 ext_mergers[j].set_pool(pool);
00429
00430 value_type sentinel = cmp.min_value();
00431 insert_heap.push(sentinel);
00432 delete_buffer[delete_buffer_size] = sentinel;
00433 delete_buffer_current_min = delete_buffer_end;
00434 for (unsigned_type i = 0; i < total_num_groups; i++)
00435 {
00436 group_buffers[i][N] = sentinel;
00437 group_buffer_current_mins[i] = &(group_buffers[i][N]);
00438 }
00439 }
00440
00441 template <class Config_>
00442 priority_queue<Config_>::~priority_queue()
00443 {
00444 STXXL_VERBOSE2("priority_queue::~priority_queue()");
00445 if (pool_owned)
00446 delete pool;
00447
00448 delete[] ext_mergers;
00449 }
00450
00451
00452
00453
00454 template <class Config_>
00455 unsigned_type priority_queue<Config_>::refill_group_buffer(unsigned_type group)
00456 {
00457 STXXL_VERBOSE2("priority_queue::refill_group_buffer(" << group << ")");
00458
00459 value_type * target;
00460 unsigned_type length;
00461 size_type group_size = (group < num_int_groups) ?
00462 int_mergers[group].size() :
00463 ext_mergers[group - num_int_groups].size();
00464 unsigned_type left_elements = group_buffers[group] + N - group_buffer_current_mins[group];
00465 if (group_size + left_elements >= size_type(N))
00466 {
00467 target = group_buffers[group];
00468 length = N - left_elements;
00469 }
00470 else
00471 {
00472 target = group_buffers[group] + N - group_size - left_elements;
00473 length = group_size;
00474 }
00475
00476 if (length > 0)
00477 {
00478
00479 memmove(target, group_buffer_current_mins[group], left_elements * sizeof(value_type));
00480 group_buffer_current_mins[group] = target;
00481
00482
00483 if (group < num_int_groups)
00484 int_mergers[group].multi_merge(target + left_elements, length);
00485 else
00486 ext_mergers[group - num_int_groups].multi_merge(
00487 target + left_elements,
00488 target + left_elements + length);
00489 }
00490
00491
00492
00493 #if STXXL_CHECK_ORDER_IN_SORTS
00494 priority_queue_local::invert_order<typename Config::comparator_type, value_type, value_type> inv_cmp(cmp);
00495 if (!stxxl::is_sorted(group_buffer_current_mins[group], group_buffers[group] + N, inv_cmp))
00496 {
00497 STXXL_VERBOSE2("length: " << length << " left_elements: " << left_elements);
00498 for (value_type * v = group_buffer_current_mins[group] + 1; v < group_buffer_current_mins[group] + left_elements; ++v)
00499 {
00500 if (inv_cmp(*v, *(v - 1)))
00501 {
00502 STXXL_MSG("Error in buffer " << group << " at position " << (v - group_buffer_current_mins[group] - 1) << "/" << (v - group_buffer_current_mins[group]) << " " << *(v - 2) << " " << *(v - 1) << " " << *v << " " << *(v + 1));
00503 }
00504 }
00505 assert(false);
00506 }
00507 #endif
00508
00509 return length + left_elements;
00510 }
00511
00512
// Refills the delete buffer with up to delete_buffer_size elements taken from
// the group buffers of the active groups.
//
// Steps:
//  1. make sure every active group buffer holds enough elements,
//  2. decide how many elements are moved and adjust size_ accordingly,
//  3. copy (one group) or merge (two to four groups) them into the tail of
//     the delete buffer.
// Throws std::runtime_error (via STXXL_THROW) if more than four groups are active.
00513 template <class Config_>
00514 void priority_queue<Config_>::refill_delete_buffer()
00515 {
00516 STXXL_VERBOSE2("priority_queue::refill_delete_buffer()");
00517
// number of elements the group buffers can contribute
// (capped at delete_buffer_size per group that is not refilled)
00518 size_type total_group_size = 0;
00519
// walk from the highest active group down to group 0
00520 for (int i = num_active_groups - 1; i >= 0; i--)
00521 {
// refill only buffers holding fewer than delete_buffer_size elements
00522 if ((group_buffers[i] + N) - group_buffer_current_mins[i] < delete_buffer_size)
00523 {
00524 unsigned_type length = refill_group_buffer(i);
00525
// the topmost active group delivered nothing: deactivate it
00526 if (length == 0 && unsigned(i) == num_active_groups - 1)
00527 --num_active_groups;
00528
00529 total_group_size += length;
00530 }
00531 else
00532 total_group_size += delete_buffer_size;
00533 }
00534
// number of elements that will be moved into the delete buffer
00535 unsigned_type length;
00536 if (total_group_size >= delete_buffer_size)
00537 {
00538 length = delete_buffer_size;
// these elements are from now on counted through the delete buffer (see size())
00539 size_ -= size_type(delete_buffer_size);
00540 }
00541 else
00542 {
// fewer elements than the buffer can hold: everything that is left is moved
00543 length = total_group_size;
00544 assert(size_ == size_type(length));
00545 size_ = 0;
00546 }
00547
// comparator with inverted order, used by the parallel merge and the order check
00548 priority_queue_local::invert_order<typename Config::comparator_type, value_type, value_type> inv_cmp(cmp);
00549
00550
00551
00552
// the new content is right-aligned so that it ends at delete_buffer_end
00553 delete_buffer_current_min = delete_buffer_end - length;
00554 STXXL_VERBOSE2("Active groups = " << num_active_groups);
00555 switch (num_active_groups)
00556 {
00557 case 0:
00558 break;
00559 case 1:
// single source: plain copy, then advance the group cursor
00560 std::copy(group_buffer_current_mins[0], group_buffer_current_mins[0] + length, delete_buffer_current_min);
00561 group_buffer_current_mins[0] += length;
00562 break;
00563 case 2:
00564 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
00565 {
// each sequence runs from the group cursor up to (excluding) the sentinel slot
00566 std::pair<value_type *, value_type *> seqs[2] =
00567 {
00568 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
00569 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N)
00570 };
00571 parallel::multiway_merge_sentinel(seqs, seqs + 2, delete_buffer_current_min, inv_cmp, length);
00572
// write the begin positions stored in seqs back to the group cursors
00573 group_buffer_current_mins[0] = seqs[0].first;
00574 group_buffer_current_mins[1] = seqs[1].first;
00575 }
00576 #else
// NOTE(review): presumably merge_iterator advances the cursors it is given
// by reference -- confirm in pq_mergers.h
00577 priority_queue_local::merge_iterator(
00578 group_buffer_current_mins[0],
00579 group_buffer_current_mins[1], delete_buffer_current_min, length, cmp);
00580 #endif
00581 break;
00582 case 3:
00583 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
00584 {
00585 std::pair<value_type *, value_type *> seqs[3] =
00586 {
00587 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
00588 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N),
00589 std::make_pair(group_buffer_current_mins[2], group_buffers[2] + N)
00590 };
00591 parallel::multiway_merge_sentinel(seqs, seqs + 3, delete_buffer_current_min, inv_cmp, length);
00592
00593 group_buffer_current_mins[0] = seqs[0].first;
00594 group_buffer_current_mins[1] = seqs[1].first;
00595 group_buffer_current_mins[2] = seqs[2].first;
00596 }
00597 #else
00598 priority_queue_local::merge3_iterator(
00599 group_buffer_current_mins[0],
00600 group_buffer_current_mins[1],
00601 group_buffer_current_mins[2], delete_buffer_current_min, length, cmp);
00602 #endif
00603 break;
00604 case 4:
00605 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
00606 {
00607 std::pair<value_type *, value_type *> seqs[4] =
00608 {
00609 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
00610 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N),
00611 std::make_pair(group_buffer_current_mins[2], group_buffers[2] + N),
00612 std::make_pair(group_buffer_current_mins[3], group_buffers[3] + N)
00613 };
00614 parallel::multiway_merge_sentinel(seqs, seqs + 4, delete_buffer_current_min, inv_cmp, length);
00615
00616 group_buffer_current_mins[0] = seqs[0].first;
00617 group_buffer_current_mins[1] = seqs[1].first;
00618 group_buffer_current_mins[2] = seqs[2].first;
00619 group_buffer_current_mins[3] = seqs[3].first;
00620 }
00621 #else
00622 priority_queue_local::merge4_iterator(
00623 group_buffer_current_mins[0],
00624 group_buffer_current_mins[1],
00625 group_buffer_current_mins[2],
00626 group_buffer_current_mins[3], delete_buffer_current_min, length, cmp);
00627 #endif
00628 break;
00629 default:
// only the cases for up to four active groups are implemented above
00630 STXXL_THROW(std::runtime_error, "priority_queue<...>::refill_delete_buffer()",
00631 "Overflow! The number of buffers on 2nd level in stxxl::priority_queue is currently limited to 4");
00632 }
00633
// optional self-check: report every out-of-order pair, then abort via assert
00634 #if STXXL_CHECK_ORDER_IN_SORTS
00635 if (!stxxl::is_sorted(delete_buffer_current_min, delete_buffer_end, inv_cmp))
00636 {
00637 for (value_type * v = delete_buffer_current_min + 1; v < delete_buffer_end; ++v)
00638 {
00639 if (inv_cmp(*v, *(v - 1)))
00640 {
00641 STXXL_MSG("Error at position " << (v - delete_buffer_current_min - 1) << "/" << (v - delete_buffer_current_min) << " " << *(v - 1) << " " << *v);
00642 }
00643 }
00644 assert(false);
00645 }
00646 #endif
00647
00648 }
00649
00650
00651
00652
00653
00654
00655 template <class Config_>
00656 unsigned_type priority_queue<Config_>::make_space_available(unsigned_type level)
00657 {
00658 STXXL_VERBOSE2("priority_queue::make_space_available(" << level << ")");
00659 unsigned_type finalLevel;
00660 assert(level < total_num_groups);
00661 assert(level <= num_active_groups);
00662
00663 if (level == num_active_groups)
00664 ++num_active_groups;
00665
00666 const bool spaceIsAvailable_ =
00667 (level < num_int_groups) ? int_mergers[level].is_space_available()
00668 : ((level == total_num_groups - 1) ? true : (ext_mergers[level - num_int_groups].is_space_available()));
00669
00670 if (spaceIsAvailable_)
00671 {
00672 finalLevel = level;
00673 }
00674 else
00675 {
00676 finalLevel = make_space_available(level + 1);
00677
00678 if (level < num_int_groups - 1)
00679 {
00680 unsigned_type segmentSize = int_mergers[level].size();
00681 value_type * newSegment = new value_type[segmentSize + 1];
00682 int_mergers[level].multi_merge(newSegment, segmentSize);
00683
00684 newSegment[segmentSize] = delete_buffer[delete_buffer_size];
00685
00686
00687
00688 int_mergers[level + 1].insert_segment(newSegment, segmentSize);
00689 }
00690 else
00691 {
00692 if (level == num_int_groups - 1)
00693 {
00694 const unsigned_type segmentSize = int_mergers[num_int_groups - 1].size();
00695 STXXL_VERBOSE1("Inserting segment into first level external: " << level << " " << segmentSize);
00696 ext_mergers[0].insert_segment(int_mergers[num_int_groups - 1], segmentSize);
00697 }
00698 else
00699 {
00700 const size_type segmentSize = ext_mergers[level - num_int_groups].size();
00701 STXXL_VERBOSE1("Inserting segment into second level external: " << level << " " << segmentSize);
00702 ext_mergers[level - num_int_groups + 1].insert_segment(ext_mergers[level - num_int_groups], segmentSize);
00703 }
00704 }
00705 }
00706 return finalLevel;
00707 }
00708
00709
00710
// Moves the content of the full insert heap into the first internal merger.
//
// The heap content is sorted into a new segment and merged with the current
// content of the delete buffer and of group buffer 0; the merge output goes
// back, in this order, to the delete buffer, group buffer 0 and the new
// segment, which is then inserted into int_mergers[0].
// Precondition (asserted): the heap holds exactly N + 1 entries.
00711 template <class Config_>
00712 void priority_queue<Config_>::empty_insert_heap()
00713 {
00714 STXXL_VERBOSE2("priority_queue::empty_insert_heap()");
00715 assert(insert_heap.size() == (N + 1));
00716
00717 const value_type sup = get_supremum();
00718
00719
// segment that receives the heap content: N + 1 slots
00720 value_type * newSegment = new value_type[N + 1];
00721 value_type * newPos = newSegment;
00722
00723
00724
00725 value_type * SortTo = newSegment;
00726
// NOTE(review): presumably writes all N + 1 heap entries in sorted order
// starting at SortTo -- confirm in pq_helpers.h
00727 insert_heap.sort_to(SortTo);
00728
// re-seed the emptied heap with the entry found in the last slot of the segment
00729 SortTo = newSegment + N;
00730 insert_heap.clear();
00731 insert_heap.push(*SortTo);
00732
00733 assert(insert_heap.size() == 1);
00734
// terminate the segment with the sentinel
00735 newSegment[N] = sup;
00736
00737
00738
// temp takes a contiguous, right-aligned copy of the delete buffer content
// followed by the content of group buffer 0, terminated by the sentinel
00739 const unsigned_type tempSize = N + delete_buffer_size;
00740 value_type temp[tempSize + 1];
00741 unsigned_type sz1 = current_delete_buffer_size();
00742 unsigned_type sz2 = current_group_buffer_size(0);
00743 value_type * pos = temp + tempSize - sz1 - sz2;
00744 std::copy(delete_buffer_current_min, delete_buffer_current_min + sz1, pos);
00745 std::copy(group_buffer_current_mins[0], group_buffer_current_mins[0] + sz2, pos + sz1);
00746 temp[tempSize] = sup;
00747
00748
00749
00750
// NOTE(review): the three merges below presumably advance pos and newPos
// (passed by reference), so each call continues where the previous one
// stopped -- confirm in pq_mergers.h
// first sz1 merged elements go back into the delete buffer
00751 priority_queue_local::merge_iterator(pos, newPos, delete_buffer_current_min, sz1, cmp);
00752
00753
00754
00755
// next sz2 merged elements go back into group buffer 0
00756 priority_queue_local::merge_iterator(pos, newPos, group_buffer_current_mins[0], sz2, cmp);
00757
00758
00759
00760
// the remaining N merged elements form the new segment (written in place)
00761 priority_queue_local::merge_iterator(pos, newPos, newSegment, N, cmp);
00762
00763
// make room in the first internal merger and hand the segment over
00764 unsigned_type freeLevel = make_space_available(0);
00765 assert(freeLevel == 0 || int_mergers[0].size() == 0);
00766 int_mergers[0].insert_segment(newSegment, N);
00767
00768
00769
// if space had to be made in higher groups, the content of the group buffers
// 0..freeLevel is re-inserted into the first internal merger as segments
00770 if (freeLevel > 0)
00771 {
00772 for (int_type i = freeLevel; i >= 0; i--)
00773 {
00774
00775
00776
// copy the buffer content including its trailing sentinel slot
00777 newSegment = new value_type[current_group_buffer_size(i) + 1];
00778 std::copy(group_buffer_current_mins[i], group_buffer_current_mins[i] + current_group_buffer_size(i) + 1, newSegment);
00779 int_mergers[0].insert_segment(newSegment, current_group_buffer_size(i));
// mark the group buffer as empty
00780 group_buffer_current_mins[i] = group_buffers[i] + N;
00781 }
00782 }
00783
00784
// N elements left the insert heap and are now counted by size_
00785 size_ += size_type(N);
00786
00787
// an empty delete buffer is refilled right away
00788 if (delete_buffer_current_min == delete_buffer_end)
00789 refill_delete_buffer();
00790 }
00791
00792 namespace priority_queue_local
00793 {
00794 struct Parameters_for_priority_queue_not_found_Increase_IntM
00795 {
00796 enum { fits = false };
00797 typedef Parameters_for_priority_queue_not_found_Increase_IntM result;
00798 };
00799
00800 struct dummy
00801 {
00802 enum { fits = false };
00803 typedef dummy result;
00804 };
00805
00806 template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_, unsigned_type B_, unsigned_type m_, bool stop = false>
00807 struct find_B_m
00808 {
00809 typedef find_B_m<E_, IntM_, MaxS_, B_, m_, stop> Self;
00810 enum {
00811 k = IntM_ / B_,
00812 element_size = E_,
00813 IntM = IntM_,
00814 B = B_,
00815 m = m_,
00816 c = k - m_,
00817
00818
00819 fits = c > 10 && ((k - m) * (m) * (m * B / (element_size * 4 * 1024))) >= MaxS_
00820 && ((MaxS_ < ((k - m) * m / (2 * element_size)) * 1024) || m >= 128),
00821 step = 1
00822 };
00823
00824 typedef typename find_B_m<element_size, IntM, MaxS_, B, m + step, fits || (m >= k - step)>::result candidate1;
00825 typedef typename find_B_m<element_size, IntM, MaxS_, B / 2, 1, fits || candidate1::fits>::result candidate2;
00826 typedef typename IF<fits, Self, typename IF<candidate1::fits, candidate1, candidate2>::result>::result result;
00827 };
00828
00829
00830 template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_, bool stop>
00831 struct find_B_m<E_, IntM_, MaxS_, 2048, 1, stop>
00832 {
00833 enum { fits = false };
00834 typedef Parameters_for_priority_queue_not_found_Increase_IntM result;
00835 };
00836
00837
00838 template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_, unsigned_type B_, unsigned_type m_>
00839 struct find_B_m<E_, IntM_, MaxS_, B_, m_, true>
00840 {
00841 enum { fits = false };
00842 typedef dummy result;
00843 };
00844
00845
00846 template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_>
00847 struct find_settings
00848 {
00849
00850 typedef typename find_B_m<E_, IntM_, MaxS_, (8 * 1024 * 1024), 1>::result result;
00851 };
00852
00853 struct Parameters_not_found_Try_to_change_the_Tune_parameter
00854 {
00855 typedef Parameters_not_found_Try_to_change_the_Tune_parameter result;
00856 };
00857
00858
00859 template <unsigned_type AI_, unsigned_type X_, unsigned_type CriticalSize_>
00860 struct compute_N
00861 {
00862 typedef compute_N<AI_, X_, CriticalSize_> Self;
00863 enum
00864 {
00865 X = X_,
00866 AI = AI_,
00867 N = X / (AI * AI)
00868 };
00869 typedef typename IF<(N >= CriticalSize_), Self, typename compute_N<AI / 2, X, CriticalSize_>::result>::result result;
00870 };
00871
00872 template <unsigned_type X_, unsigned_type CriticalSize_>
00873 struct compute_N<1, X_, CriticalSize_>
00874 {
00875 typedef Parameters_not_found_Try_to_change_the_Tune_parameter result;
00876 };
00877 }
00878
00880
00883
00885
// Derives a complete priority_queue type from a few high-level parameters.
//
//   Tp_   element type
//   Cmp_  comparator type
//   IntM_ internal memory budget handed to the parameter search
//   MaxS_ size bound handed to the parameter search
//   Tune_ the search for N starts with AI = 2^Tune_
//
// The resulting queue type is available as the member typedef `result`.
00949 template <class Tp_, class Cmp_, unsigned_type IntM_, unsigned MaxS_, unsigned Tune_ = 6>
00950 class PRIORITY_QUEUE_GENERATOR
00951 {
00952 public:
00953
// outcome of the compile-time search for the block size B and the value m
00954 typedef typename priority_queue_local::find_settings<sizeof(Tp_), IntM_, MaxS_>::result settings;
00955 enum {
00956 B = settings::B,
00957 m = settings::m,
// X is the numerator from which compute_N derives N
00958 X = B * (settings::k - m) / settings::element_size,
// size of the delete buffer
00959 Buffer1Size = 32
00960 };
00961
// choose AI (starting at 2^Tune_ and halving) so that N = X / (AI * AI)
// is at least 4 * Buffer1Size
00962 typedef typename priority_queue_local::compute_N<(1 << Tune_), X, 4 * Buffer1Size>::result ComputeN;
00963 enum
00964 {
00965 N = ComputeN::N,
00966 AI = ComputeN::AI,
// AE is m / 2, but never less than 2
00967 AE = (m / 2 < 2) ? 2 : (m / 2)
00968 };
00969 enum {
00970
// not used by anything else in this class
00971 EConsumption = X * settings::element_size + settings::B * AE + ((MaxS_ / X) / AE) * settings::B * 1024
00972 };
00973
00974
00975
00976
00977
00978
00979
00980
00981
// Argument order follows priority_queue_config: delete buffer size, N,
// IntKMAX (= AI), 2 internal groups, block size, ExtKMAX (= AE), 2 external groups;
// the allocation strategy keeps its default.
00982 typedef priority_queue<priority_queue_config<Tp_, Cmp_, Buffer1Size, N, AI, 2, B, AE, 2> > result;
00983 };
00984
00986
00987 __STXXL_END_NAMESPACE
00988
00989
00990 namespace std
00991 {
00992 template <class Config_>
00993 void swap(stxxl::priority_queue<Config_> & a,
00994 stxxl::priority_queue<Config_> & b)
00995 {
00996 a.swap(b);
00997 }
00998 }
00999
01000 #endif // !STXXL_PRIORITY_QUEUE_HEADER
01001