00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #ifndef STXXL_SORT_STREAM_HEADER
00016 #define STXXL_SORT_STREAM_HEADER
00017
00018 #ifdef STXXL_BOOST_CONFIG
00019 #include <boost/config.hpp>
00020 #endif
00021
00022 #include <stxxl/bits/stream/stream.h>
00023 #include <stxxl/bits/mng/mng.h>
00024 #include <stxxl/bits/algo/sort_base.h>
00025 #include <stxxl/bits/algo/sort_helper.h>
00026 #include <stxxl/bits/algo/adaptor.h>
00027 #include <stxxl/bits/algo/run_cursor.h>
00028 #include <stxxl/bits/algo/losertree.h>
00029 #include <stxxl/bits/stream/sorted_runs.h>
00030
00031
00032 __STXXL_BEGIN_NAMESPACE
00033
00034 namespace stream
00035 {
00038
00039
00041
00043
00051 template <
00052 class Input_,
00053 class Cmp_,
00054 unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
00055 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
00056 class basic_runs_creator : private noncopyable
00057 {
00058 protected:
00059 Input_ & input;
00060 Cmp_ cmp;
00061
00062 public:
00063 typedef Cmp_ cmp_type;
00064 typedef typename Input_::value_type value_type;
00065 typedef typed_block<BlockSize_, value_type> block_type;
00066 typedef sort_helper::trigger_entry<block_type> trigger_entry_type;
00067 typedef sorted_runs<trigger_entry_type> sorted_runs_type;
00068
00069 private:
00070 typedef typename sorted_runs_type::run_type run_type;
00071 sorted_runs_type result_;
00072 unsigned_type m_;
00073 bool result_computed;
00074
00076 unsigned_type fetch(block_type * blocks, unsigned_type first_idx, unsigned_type last_idx)
00077 {
00078 typename element_iterator_traits<block_type>::element_iterator output =
00079 make_element_iterator(blocks, first_idx);
00080 unsigned_type curr_idx = first_idx;
00081 while (!input.empty() && curr_idx != last_idx) {
00082 *output = *input;
00083 ++input;
00084 ++output;
00085 ++curr_idx;
00086 }
00087 return curr_idx;
00088 }
00089
00090 void fill_with_max_value(block_type * blocks, unsigned_type num_blocks, unsigned_type first_idx)
00091 {
00092 unsigned_type last_idx = num_blocks * block_type::size;
00093 if (first_idx < last_idx) {
00094 typename element_iterator_traits<block_type>::element_iterator curr =
00095 make_element_iterator(blocks, first_idx);
00096 while (first_idx != last_idx) {
00097 *curr = cmp.max_value();
00098 ++curr;
00099 ++first_idx;
00100 }
00101 }
00102 }
00103
00105 void sort_run(block_type * run, unsigned_type elements)
00106 {
00107 std::sort(make_element_iterator(run, 0),
00108 make_element_iterator(run, elements),
00109 cmp);
00110 }
00111
00112 void compute_result();
00113
00114 public:
00119 basic_runs_creator(Input_ & i, Cmp_ c, unsigned_type memory_to_use) :
00120 input(i), cmp(c), m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()), result_computed(false)
00121 {
00122 sort_helper::verify_sentinel_strict_weak_ordering(cmp);
00123 if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) {
00124 throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
00125 }
00126 assert(m_ > 0);
00127 }
00128
00132 const sorted_runs_type & result()
00133 {
00134 if (!result_computed)
00135 {
00136 compute_result();
00137 result_computed = true;
00138 #ifdef STXXL_PRINT_STAT_AFTER_RF
00139 STXXL_MSG(*stats::get_instance());
00140 #endif //STXXL_PRINT_STAT_AFTER_RF
00141 }
00142 return result_;
00143 }
00144 };
00145
//! Reads the whole input stream and forms the sorted runs in \c result_.
//!
//! An array of m2 * 2 blocks is allocated and used as two halves,
//! Blocks1 and Blocks2 (= Blocks1 + m2), each holding one run of up to
//! el_in_run elements. A half is filled via fetch(), sorted, padded with
//! cmp.max_value() and written asynchronously. The next half is filled and
//! sorted while those writes are pending, and write_reqs[i] is waited on
//! before block slot i is reused.
//!
//! Special cases:
//!  - the input fits into one block: elements are stored in result_.small_
//!    and no blocks are allocated or written;
//!  - the input ends while filling the second half: both halves are sorted
//!    together and stored as a single run that replaces the first one.
template <class Input_, class Cmp_, unsigned BlockSize_, class AllocStr_>
void basic_runs_creator<Input_, Cmp_, BlockSize_, AllocStr_>::compute_result()
{
    unsigned_type i = 0;
    unsigned_type m2 = m_ / 2;                              // blocks per buffer half
    const unsigned_type el_in_run = m2 * block_type::size;  // elements per buffer half
    STXXL_VERBOSE1("basic_runs_creator::compute_result m2=" << m2);
    unsigned_type blocks1_length = 0, blocks2_length = 0;   // elements stored in each half
    block_type * Blocks1 = NULL;

#ifndef STXXL_SMALL_INPUT_PSORT_OPT
    Blocks1 = new block_type[m2 * 2];
#else
    // Read up to one block of elements into result_.small_ before allocating
    // the large buffer.
    while (!input.empty() && blocks1_length != block_type::size)
    {
        result_.small_.push_back(*input);
        ++input;
        ++blocks1_length;
    }

    if (blocks1_length == block_type::size && !input.empty())
    {
        // More than one block of input: move what was read into the buffer.
        Blocks1 = new block_type[m2 * 2];
        std::copy(result_.small_.begin(), result_.small_.end(), Blocks1[0].begin());
        result_.small_.clear();
    }
    else
    {
        // The whole input fits into one block: keep it in memory only.
        STXXL_VERBOSE1("basic_runs_creator: Small input optimization, input length: " << blocks1_length);
        result_.elements = blocks1_length;
        std::sort(result_.small_.begin(), result_.small_.end(), cmp);
        return;
    }
#endif //STXXL_SMALL_INPUT_PSORT_OPT

    // Fill the first half (continuing after any elements already copied).
    blocks1_length = fetch(Blocks1, blocks1_length, el_in_run);

    sort_run(Blocks1, blocks1_length);
    if (blocks1_length <= block_type::size && input.empty())
    {
        // Whole input fits into a single block: store it in result_.small_.
        STXXL_VERBOSE1("basic_runs_creator: Small input optimization, input length: " << blocks1_length);
        assert(result_.small_.empty());
        result_.small_.insert(result_.small_.end(), Blocks1[0].begin(), Blocks1[0].begin() + blocks1_length);
        result_.elements = blocks1_length;
        delete[] Blocks1;
        return;
    }

    block_type * Blocks2 = Blocks1 + m2;   // second half of the same allocation
    block_manager * bm = block_manager::get_instance();
    request_ptr * write_reqs = new request_ptr[m2];   // one outstanding write per block slot
    run_type run;

    unsigned_type cur_run_size = div_ceil(blocks1_length, block_type::size);
    run.resize(cur_run_size);
    bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));

    disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);

    // Pad the partially filled last block with the sentinel.
    fill_with_max_value(Blocks1, cur_run_size, blocks1_length);

    // Record each block's first element as its trigger value and start writing.
    for (i = 0; i < cur_run_size; ++i)
    {
        run[i].value = Blocks1[i][0];
        write_reqs[i] = Blocks1[i].write(run[i].bid);
    }
    result_.runs.push_back(run);
    result_.runs_sizes.push_back(blocks1_length);
    result_.elements += blocks1_length;

    if (input.empty())
    {
        // The whole input formed exactly one run.
        wait_all(write_reqs, write_reqs + cur_run_size);
        delete[] write_reqs;
        delete[] Blocks1;
        return;
    }

    STXXL_VERBOSE1("Filling the second part of the allocated blocks");
    blocks2_length = fetch(Blocks2, 0, el_in_run);

    if (input.empty())
    {
        // The input fits into both halves: sort the contiguous Blocks1..Blocks2
        // range as one run and replace the run written above.
        // NOTE(review): Blocks1 is re-sorted here before its pending writes are
        // waited for. The blocks those writes target are deleted right after,
        // so their content is irrelevant, but the buffer is still being read by
        // in-flight I/O while it is modified.
        blocks2_length += el_in_run;
        sort_run(Blocks1, blocks2_length);
        wait_all(write_reqs, write_reqs + cur_run_size);
        bm->delete_blocks(make_bid_iterator(run.begin()), make_bid_iterator(run.end()));

        cur_run_size = div_ceil(blocks2_length, block_type::size);
        run.resize(cur_run_size);
        bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));

        fill_with_max_value(Blocks1, cur_run_size, blocks2_length);

        // The first half was full and the second received at least one element.
        assert(cur_run_size > m2);

        // The first m2 blocks reuse the write_reqs slots.
        for (i = 0; i < m2; ++i)
        {
            run[i].value = Blocks1[i][0];
            write_reqs[i]->wait();
            write_reqs[i] = Blocks1[i].write(run[i].bid);
        }

        // write_reqs has only m2 slots; the remaining blocks need their own array.
        request_ptr * write_reqs1 = new request_ptr[cur_run_size - m2];

        for ( ; i < cur_run_size; ++i)
        {
            run[i].value = Blocks1[i][0];
            write_reqs1[i - m2] = Blocks1[i].write(run[i].bid);
        }

        result_.runs[0] = run;
        result_.runs_sizes[0] = blocks2_length;
        result_.elements = blocks2_length;

        wait_all(write_reqs, write_reqs + m2);
        delete[] write_reqs;
        wait_all(write_reqs1, write_reqs1 + cur_run_size - m2);
        delete[] write_reqs1;

        delete[] Blocks1;

        return;
    }

    // Regular case: the second half is full and more input follows.
    sort_run(Blocks2, blocks2_length);

    cur_run_size = div_ceil(blocks2_length, block_type::size);
    run.resize(cur_run_size);
    bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));

    for (i = 0; i < cur_run_size; ++i)
    {
        run[i].value = Blocks2[i][0];
        write_reqs[i]->wait();   // the Blocks1 write in this slot must finish first
        write_reqs[i] = Blocks2[i].write(run[i].bid);
    }
    assert((blocks2_length % el_in_run) == 0);

    result_.runs.push_back(run);
    result_.runs_sizes.push_back(blocks2_length);
    result_.elements += blocks2_length;

    // Alternate halves: fill and sort Blocks1 while Blocks2 is being written,
    // then swap roles.
    while (!input.empty())
    {
        blocks1_length = fetch(Blocks1, 0, el_in_run);
        sort_run(Blocks1, blocks1_length);
        cur_run_size = div_ceil(blocks1_length, block_type::size);
        run.resize(cur_run_size);
        bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));

        // Only the last (possibly partial) run needs padding.
        fill_with_max_value(Blocks1, cur_run_size, blocks1_length);

        for (i = 0; i < cur_run_size; ++i)
        {
            run[i].value = Blocks1[i][0];
            write_reqs[i]->wait();
            write_reqs[i] = Blocks1[i].write(run[i].bid);
        }
        result_.runs.push_back(run);
        result_.runs_sizes.push_back(blocks1_length);
        result_.elements += blocks1_length;

        std::swap(Blocks1, Blocks2);
        std::swap(blocks1_length, blocks2_length);
    }

    wait_all(write_reqs, write_reqs + m2);
    delete[] write_reqs;
    // After the swaps, the lower of the two pointers is the start of the
    // array allocated with new[].
    delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2);
}
00331
00339 template <
00340 class Input_,
00341 class Cmp_,
00342 unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
00343 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
00344 class runs_creator : public basic_runs_creator<Input_, Cmp_, BlockSize_, AllocStr_>
00345 {
00346 private:
00347 typedef basic_runs_creator<Input_, Cmp_, BlockSize_, AllocStr_> base;
00348
00349 public:
00350 typedef typename base::block_type block_type;
00351
00352 private:
00353 using base::input;
00354
00355 public:
00360 runs_creator(Input_ & i, Cmp_ c, unsigned_type memory_to_use) : base(i, c, memory_to_use)
00361 { }
00362 };
00363
00364
//! Tag type selecting the push-based runs_creator specialization.
//! The elements are handed over one at a time through push().
//! \tparam ElementType type of the pushed elements
template <typename ElementType>
struct use_push
{
    typedef ElementType value_type;
};
00377
00389 template <
00390 class ValueType_,
00391 class Cmp_,
00392 unsigned BlockSize_,
00393 class AllocStr_>
00394 class runs_creator<
00395 use_push<ValueType_>,
00396 Cmp_,
00397 BlockSize_,
00398 AllocStr_>
00399 : private noncopyable
00400 {
00401 Cmp_ cmp;
00402
00403 public:
00404 typedef Cmp_ cmp_type;
00405 typedef ValueType_ value_type;
00406 typedef typed_block<BlockSize_, value_type> block_type;
00407 typedef sort_helper::trigger_entry<block_type> trigger_entry_type;
00408 typedef sorted_runs<trigger_entry_type> sorted_runs_type;
00409 typedef sorted_runs_type result_type;
00410
00411 private:
00412 typedef typename sorted_runs_type::run_type run_type;
00413 sorted_runs_type result_;
00414 unsigned_type m_;
00415
00416 bool result_computed;
00417
00418 const unsigned_type m2;
00419 const unsigned_type el_in_run;
00420 unsigned_type cur_el;
00421 block_type * Blocks1;
00422 block_type * Blocks2;
00423 request_ptr * write_reqs;
00424 run_type run;
00425
00426 void fill_with_max_value(block_type * blocks, unsigned_type num_blocks, unsigned_type first_idx)
00427 {
00428 unsigned_type last_idx = num_blocks * block_type::size;
00429 if (first_idx < last_idx) {
00430 typename element_iterator_traits<block_type>::element_iterator curr =
00431 make_element_iterator(blocks, first_idx);
00432 while (first_idx != last_idx) {
00433 *curr = cmp.max_value();
00434 ++curr;
00435 ++first_idx;
00436 }
00437 }
00438 }
00439
00440 void sort_run(block_type * run, unsigned_type elements)
00441 {
00442 std::sort(make_element_iterator(run, 0),
00443 make_element_iterator(run, elements),
00444 cmp);
00445 }
00446
00447 void compute_result()
00448 {
00449 if (cur_el == 0)
00450 return;
00451
00452 unsigned_type cur_el_reg = cur_el;
00453 sort_run(Blocks1, cur_el_reg);
00454 result_.elements += cur_el_reg;
00455 if (cur_el_reg <= block_type::size && result_.elements == cur_el_reg)
00456 {
00457
00458 STXXL_VERBOSE1("runs_creator(use_push): Small input optimization, input length: " << cur_el_reg);
00459 result_.small_.resize(cur_el_reg);
00460 std::copy(Blocks1[0].begin(), Blocks1[0].begin() + cur_el_reg, result_.small_.begin());
00461 return;
00462 }
00463
00464 const unsigned_type cur_run_size = div_ceil(cur_el_reg, block_type::size);
00465 run.resize(cur_run_size);
00466 block_manager * bm = block_manager::get_instance();
00467 bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00468
00469 disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
00470
00471 result_.runs_sizes.push_back(cur_el_reg);
00472
00473
00474 fill_with_max_value(Blocks1, cur_run_size, cur_el_reg);
00475
00476 unsigned_type i = 0;
00477 for ( ; i < cur_run_size; ++i)
00478 {
00479 run[i].value = Blocks1[i][0];
00480 if (write_reqs[i].get())
00481 write_reqs[i]->wait();
00482
00483 write_reqs[i] = Blocks1[i].write(run[i].bid);
00484 }
00485 result_.runs.push_back(run);
00486
00487 for (i = 0; i < m2; ++i)
00488 if (write_reqs[i].get())
00489 write_reqs[i]->wait();
00490 }
00491
00492 void cleanup()
00493 {
00494 delete[] write_reqs;
00495 delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2);
00496 write_reqs = NULL;
00497 Blocks1 = Blocks2 = NULL;
00498 }
00499
00500 public:
00504 runs_creator(Cmp_ c, unsigned_type memory_to_use) :
00505 cmp(c), m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()), result_computed(false),
00506 m2(m_ / 2),
00507 el_in_run(m2 * block_type::size),
00508 cur_el(0),
00509 Blocks1(new block_type[m2 * 2]),
00510 Blocks2(Blocks1 + m2),
00511 write_reqs(new request_ptr[m2])
00512 {
00513 sort_helper::verify_sentinel_strict_weak_ordering(cmp);
00514 if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) {
00515 throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
00516 }
00517 assert(m2 > 0);
00518 }
00519
00520 ~runs_creator()
00521 {
00522 if (!result_computed)
00523 cleanup();
00524 }
00525
00528 void push(const value_type & val)
00529 {
00530 assert(result_computed == false);
00531 unsigned_type cur_el_reg = cur_el;
00532 if (cur_el_reg < el_in_run)
00533 {
00534 Blocks1[cur_el_reg / block_type::size][cur_el_reg % block_type::size] = val;
00535 ++cur_el;
00536 return;
00537 }
00538
00539 assert(el_in_run == cur_el);
00540 cur_el = 0;
00541
00542
00543 sort_run(Blocks1, el_in_run);
00544 result_.elements += el_in_run;
00545
00546 const unsigned_type cur_run_size = div_ceil(el_in_run, block_type::size);
00547 run.resize(cur_run_size);
00548 block_manager * bm = block_manager::get_instance();
00549 bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00550
00551 disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
00552
00553 result_.runs_sizes.push_back(el_in_run);
00554
00555 for (unsigned_type i = 0; i < cur_run_size; ++i)
00556 {
00557 run[i].value = Blocks1[i][0];
00558 if (write_reqs[i].get())
00559 write_reqs[i]->wait();
00560
00561 write_reqs[i] = Blocks1[i].write(run[i].bid);
00562 }
00563
00564 result_.runs.push_back(run);
00565
00566 std::swap(Blocks1, Blocks2);
00567
00568 push(val);
00569 }
00570
00574 const sorted_runs_type & result()
00575 {
00576 if (1)
00577 {
00578 if (!result_computed)
00579 {
00580 compute_result();
00581 result_computed = true;
00582 cleanup();
00583 #ifdef STXXL_PRINT_STAT_AFTER_RF
00584 STXXL_MSG(*stats::get_instance());
00585 #endif //STXXL_PRINT_STAT_AFTER_RF
00586 }
00587 }
00588 return result_;
00589 }
00590 };
00591
00592
//! Tag type selecting the runs_creator specialization that receives
//! already sorted sequences element by element (each finished with finish()).
//! \tparam ElementType type of the pushed elements
template <typename ElementType>
struct from_sorted_sequences
{
    typedef ElementType value_type;
};
00604
00616 template <
00617 class ValueType_,
00618 class Cmp_,
00619 unsigned BlockSize_,
00620 class AllocStr_>
00621 class runs_creator<
00622 from_sorted_sequences<ValueType_>,
00623 Cmp_,
00624 BlockSize_,
00625 AllocStr_>
00626 : private noncopyable
00627 {
00628 typedef ValueType_ value_type;
00629 typedef typed_block<BlockSize_, value_type> block_type;
00630 typedef sort_helper::trigger_entry<block_type> trigger_entry_type;
00631 typedef AllocStr_ alloc_strategy_type;
00632 Cmp_ cmp;
00633
00634 public:
00635 typedef Cmp_ cmp_type;
00636 typedef sorted_runs<trigger_entry_type> sorted_runs_type;
00637 typedef sorted_runs_type result_type;
00638
00639 private:
00640 typedef typename sorted_runs_type::run_type run_type;
00641
00642 sorted_runs_type result_;
00643 unsigned_type m_;
00644 buffered_writer<block_type> writer;
00645 block_type * cur_block;
00646 unsigned_type offset;
00647 unsigned_type iblock;
00648 unsigned_type irun;
00649 alloc_strategy_type alloc_strategy;
00650
00651 public:
00656 runs_creator(Cmp_ c, unsigned_type memory_to_use) :
00657 cmp(c),
00658 m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()),
00659 writer(m_, m_ / 2),
00660 cur_block(writer.get_free_block()),
00661 offset(0),
00662 iblock(0),
00663 irun(0)
00664 {
00665 sort_helper::verify_sentinel_strict_weak_ordering(cmp);
00666 assert(m_ > 0);
00667 if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) {
00668 throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
00669 }
00670 }
00671
00674 void push(const value_type & val)
00675 {
00676 assert(offset < block_type::size);
00677
00678 (*cur_block)[offset] = val;
00679 ++offset;
00680
00681 if (offset == block_type::size)
00682 {
00683
00684
00685 block_manager * bm = block_manager::get_instance();
00686
00687 result_.runs.resize(irun + 1);
00688 result_.runs[irun].resize(iblock + 1);
00689 bm->new_blocks(
00690 alloc_strategy,
00691 make_bid_iterator(result_.runs[irun].begin() + iblock),
00692 make_bid_iterator(result_.runs[irun].end()),
00693 iblock
00694 );
00695
00696 result_.runs[irun][iblock].value = (*cur_block)[0];
00697 cur_block = writer.write(cur_block, result_.runs[irun][iblock].bid);
00698 ++iblock;
00699
00700 offset = 0;
00701 }
00702
00703 ++result_.elements;
00704 }
00705
00707 void finish()
00708 {
00709 if (offset == 0 && iblock == 0)
00710 return;
00711
00712
00713 result_.runs_sizes.resize(irun + 1);
00714 result_.runs_sizes.back() = iblock * block_type::size + offset;
00715
00716 if (offset)
00717 {
00718 while (offset != block_type::size)
00719 {
00720 (*cur_block)[offset] = cmp.max_value();
00721 ++offset;
00722 }
00723 offset = 0;
00724
00725 block_manager * bm = block_manager::get_instance();
00726
00727 result_.runs.resize(irun + 1);
00728 result_.runs[irun].resize(iblock + 1);
00729 bm->new_blocks(
00730 alloc_strategy,
00731 make_bid_iterator(result_.runs[irun].begin() + iblock),
00732 make_bid_iterator(result_.runs[irun].end()),
00733 iblock
00734 );
00735
00736 result_.runs[irun][iblock].value = (*cur_block)[0];
00737 cur_block = writer.write(cur_block, result_.runs[irun][iblock].bid);
00738 }
00739 else
00740 { }
00741
00742 alloc_strategy = alloc_strategy_type();
00743 iblock = 0;
00744 ++irun;
00745 }
00746
00750 const sorted_runs_type & result()
00751 {
00752 finish();
00753 writer.flush();
00754
00755 return result_;
00756 }
00757 };
00758
00759
00764 template <class RunsType_, class Cmp_>
00765 bool check_sorted_runs(const RunsType_ & sruns, Cmp_ cmp)
00766 {
00767 sort_helper::verify_sentinel_strict_weak_ordering(cmp);
00768 typedef typename RunsType_::block_type block_type;
00769 typedef typename block_type::value_type value_type;
00770 STXXL_VERBOSE2("Elements: " << sruns.elements);
00771 unsigned_type nruns = sruns.runs.size();
00772 STXXL_VERBOSE2("Runs: " << nruns);
00773 unsigned_type irun = 0;
00774 for (irun = 0; irun < nruns; ++irun)
00775 {
00776 const unsigned_type nblocks = sruns.runs[irun].size();
00777 block_type * blocks = new block_type[nblocks];
00778 request_ptr * reqs = new request_ptr[nblocks];
00779 for (unsigned_type j = 0; j < nblocks; ++j)
00780 {
00781 reqs[j] = blocks[j].read(sruns.runs[irun][j].bid);
00782 }
00783 wait_all(reqs, reqs + nblocks);
00784 for (unsigned_type j = 0; j < nblocks; ++j)
00785 {
00786 if (cmp(blocks[j][0], sruns.runs[irun][j].value) ||
00787 cmp(sruns.runs[irun][j].value, blocks[j][0]))
00788 {
00789 STXXL_ERRMSG("check_sorted_runs wrong trigger in the run");
00790 return false;
00791 }
00792 }
00793 if (!stxxl::is_sorted(make_element_iterator(blocks, 0),
00794 make_element_iterator(blocks, sruns.runs_sizes[irun]),
00795 cmp))
00796 {
00797 STXXL_ERRMSG("check_sorted_runs wrong order in the run");
00798 return false;
00799 }
00800
00801 delete[] reqs;
00802 delete[] blocks;
00803 }
00804
00805 STXXL_MSG("Checking runs finished successfully");
00806
00807 return true;
00808 }
00809
00810
00812
00814
//! Merges sorted runs and exposes the result as a stream.
//!
//! Input blocks of all runs are read through a block_prefetcher in the order
//! given by consume_seq. Merged output is produced one block (current_block) at
//! a time, by a loser tree or, when do_parallel_merge() is true, by
//! stxxl::parallel::multiway_merge. If the runs do not fit into the memory
//! budget, merge_recursively() first reduces their number.
//! \tparam RunsType_ type of the sorted runs, e.g. \c sorted_runs
//! \tparam Cmp_ type of comparison object used for merging
//! \tparam AllocStr_ allocation strategy used to allocate the blocks for
//!         storing intermediate results if several merge passes are required
template <class RunsType_,
          class Cmp_,
          class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
class basic_runs_merger : private noncopyable
{
protected:
    typedef RunsType_ sorted_runs_type;
    typedef AllocStr_ alloc_strategy;
    typedef typename sorted_runs_type::size_type size_type;
    typedef Cmp_ value_cmp;
    typedef typename sorted_runs_type::run_type run_type;
    typedef typename sorted_runs_type::block_type block_type;
    typedef block_type out_block_type;
    typedef typename run_type::value_type trigger_entry_type;
    typedef block_prefetcher<block_type, typename run_type::iterator> prefetcher_type;
    typedef run_cursor2<block_type, prefetcher_type> run_cursor_type;
    typedef sort_helper::run_cursor2_cmp<block_type, prefetcher_type, value_cmp> run_cursor2_cmp_type;
    typedef loser_tree<run_cursor_type, run_cursor2_cmp_type> loser_tree_type;
    typedef stxxl::int64 diff_type;
    typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence;
    typedef typename std::vector<sequence>::size_type seqs_size_type;

public:
    //! Standard stream typedef.
    typedef typename sorted_runs_type::value_type value_type;

private:
    sorted_runs_type sruns;            // runs being merged (replaced by merge_recursively)
    value_cmp cmp;
    size_type elements_remaining;      // elements not yet consumed via operator++

    out_block_type * current_block;    // block of merged output being served
    unsigned_type buffer_pos;          // index of the next element in current_block
    value_type current_value;          // element returned by operator*

    run_type consume_seq;              // trigger entries of all blocks, in consumption order
    int_type * prefetch_seq;           // order in which blocks are prefetched
    prefetcher_type * prefetcher;      // owns the read buffers; NULL once deallocated
    loser_tree_type * losers;          // sequential merger (NULL in parallel mode)
#if STXXL_PARALLEL_MULTIWAY_MERGE
    std::vector<sequence> * seqs;      // current [begin, end) range of each run's buffered block
    std::vector<block_type *> * buffers;
    diff_type num_currently_mergeable; // elements that can be merged without the next block
#endif

#if STXXL_CHECK_ORDER_IN_SORTS
    value_type last_element;
#endif //STXXL_CHECK_ORDER_IN_SORTS

    //! Merges groups of runs until their number fits into \c memory_to_use.
    void merge_recursively(unsigned_type memory_to_use);

    //! Releases the merge structures (when still allocated) and then the
    //! external blocks of the runs via sruns.deallocate_blocks().
    void deallocate_prefetcher()
    {
        if (prefetcher)
        {
            delete losers;
#if STXXL_PARALLEL_MULTIWAY_MERGE
            delete seqs;
            delete buffers;
#endif
            delete prefetcher;
            delete[] prefetch_seq;
            prefetcher = NULL;
        }

        sruns.deallocate_blocks();
    }

    //! Fills current_block with the next merged elements (at most one block,
    //! at most elements_remaining). Releases the merge structures once the
    //! remaining elements fit into this block.
    void fill_current_block()
    {
        STXXL_VERBOSE1("fill_current_block");
        if (do_parallel_merge())
        {
#if STXXL_PARALLEL_MULTIWAY_MERGE
            // Number of output positions still to be filled in current_block.
            diff_type rest = out_block_type::size;

            do
            {
                if (num_currently_mergeable < rest)
                {
                    if (!prefetcher || prefetcher->empty())
                    {
                        // No further blocks will arrive: everything left can be merged.
                        num_currently_mergeable = elements_remaining;
                    }
                    else
                    {
                        // Presumably counts the buffered elements that are <= the
                        // trigger of the next block to be consumed; those can be
                        // merged before that block is needed. TODO confirm in
                        // sort_helper::count_elements_less_equal.
                        num_currently_mergeable = sort_helper::count_elements_less_equal(
                            *seqs, consume_seq[prefetcher->pos()].value, cmp);
                    }
                }

                diff_type output_size = STXXL_MIN(num_currently_mergeable, rest);

                STXXL_VERBOSE1("before merge " << output_size);

                stxxl::parallel::multiway_merge((*seqs).begin(), (*seqs).end(), current_block->end() - rest, cmp, output_size);


                rest -= output_size;
                num_currently_mergeable -= output_size;

                STXXL_VERBOSE1("after merge");

                // Replace exhausted sequences with the next prefetched block or drop them.
                sort_helper::refill_or_remove_empty_sequences(*seqs, *buffers, *prefetcher);
            } while (rest > 0 && (*seqs).size() > 0);

#if STXXL_CHECK_ORDER_IN_SORTS
            if (!stxxl::is_sorted(current_block->begin(), current_block->end(), cmp))
            {
                for (value_type * i = current_block->begin() + 1; i != current_block->end(); ++i)
                    if (cmp(*i, *(i - 1)))
                    {
                        STXXL_VERBOSE1("Error at position " << (i - current_block->begin()));
                    }
                assert(false);
            }
#endif //STXXL_CHECK_ORDER_IN_SORTS


#else
            STXXL_THROW_UNREACHABLE();
#endif //STXXL_PARALLEL_MULTIWAY_MERGE
        }
        else
        {
            // Sequential merge of at most one block of output.
            losers->multi_merge(current_block->elem, current_block->elem + STXXL_MIN<size_type>(out_block_type::size, elements_remaining));

        }
        STXXL_VERBOSE1("current block filled");

        // This is the last output block: the input runs are no longer needed.
        if (elements_remaining <= out_block_type::size)
            deallocate_prefetcher();
    }

public:
    //! Creates a runs merger object.
    //! \param r input sorted runs object
    //! \param c comparison object
    //! \param memory_to_use amount of memory available for the merger in bytes
    //! \throws bad_parameter (from initialize) if the memory is insufficient
    //!         even for recursive merging
    basic_runs_merger(const sorted_runs_type & r, value_cmp c, unsigned_type memory_to_use) :
        sruns(r),
        cmp(c),
        elements_remaining(sruns.elements),
        current_block(NULL),
        buffer_pos(0),
        prefetch_seq(NULL),
        prefetcher(NULL),
        losers(NULL)
#if STXXL_PARALLEL_MULTIWAY_MERGE
        , seqs(NULL),
        buffers(NULL),
        num_currently_mergeable(0)
#endif
#if STXXL_CHECK_ORDER_IN_SORTS
        , last_element(cmp.min_value())
#endif
    {
        initialize(r, memory_to_use);
    }

protected:
    //! Sets up the merge and fills the first output block.
    //! NOTE(review): re-assigns sruns and elements_remaining, which the
    //! constructor already set. If called a second time on the same object,
    //! the previously allocated current_block / prefetcher are not released
    //! here — confirm callers only invoke it once.
    void initialize(const sorted_runs_type & r, unsigned_type memory_to_use)
    {
        sort_helper::verify_sentinel_strict_weak_ordering(cmp);

        sruns = r;
        elements_remaining = r.elements;

        if (empty())
            return;

        if (!sruns.small_run().empty())
        {
            // Small input optimization: the data is held in memory
            // (small_run); serve it directly from one output block.
            STXXL_VERBOSE1("basic_runs_merger: small input optimization, input length: " << elements_remaining);
            assert(elements_remaining == size_type(sruns.small_run().size()));
            assert(sruns.small_run().size() <= out_block_type::size);
            current_block = new out_block_type;
            std::copy(sruns.small_run().begin(), sruns.small_run().end(), current_block->begin());
            current_value = current_block->elem[0];
            buffer_pos = 1;

            return;
        }

#if STXXL_CHECK_ORDER_IN_SORTS
        assert(check_sorted_runs(r, cmp));
#endif //STXXL_CHECK_ORDER_IN_SORTS

        disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);

        int_type disks_number = config::get_instance()->disks_number();
        unsigned_type min_prefetch_buffers = 2 * disks_number;
        // Blocks that fit into the budget after reserving the output block.
        unsigned_type input_buffers = (memory_to_use > sizeof(out_block_type) ? memory_to_use - sizeof(out_block_type) : 0) / block_type::raw_size;
        unsigned_type nruns = sruns.runs.size();

        if (input_buffers < nruns + min_prefetch_buffers)
        {
            // Not enough buffers to merge all runs in one pass (one block per
            // run plus the minimum prefetch buffers): merge recursively first.
            STXXL_WARNMSG_RECURSIVE_SORT("The implementation of sort requires more than one merge pass, therefore for a better");
            STXXL_WARNMSG_RECURSIVE_SORT("efficiency decrease block size of run storage (a parameter of the run_creator)");
            STXXL_WARNMSG_RECURSIVE_SORT("or increase the amount memory dedicated to the merger.");
            STXXL_WARNMSG_RECURSIVE_SORT("m=" << input_buffers << " nruns=" << nruns << " prefetch_blocks=" << min_prefetch_buffers);


            unsigned_type recursive_merge_buffers = memory_to_use / block_type::raw_size;
            if (recursive_merge_buffers < 2 * min_prefetch_buffers + 1 + 2) {
                // Needs read-ahead + write-back + output buffers and at least
                // two input blocks.
                STXXL_ERRMSG("There are only m=" << recursive_merge_buffers << " blocks available for recursive merging, but "
                                                 << min_prefetch_buffers << "+" << min_prefetch_buffers << "+1 are needed read-ahead/write-back/output, and");
                STXXL_ERRMSG("the merger requires memory to store at least two input blocks internally. Aborting.");
                throw bad_parameter("basic_runs_merger::sort(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
            }

            merge_recursively(memory_to_use);

            nruns = sruns.runs.size();
        }

        assert(nruns + min_prefetch_buffers <= input_buffers);

        // Collect the trigger entries of all blocks of all runs.
        unsigned_type i;
        unsigned_type prefetch_seq_size = 0;
        for (i = 0; i < nruns; ++i)
        {
            prefetch_seq_size += sruns.runs[i].size();
        }

        consume_seq.resize(prefetch_seq_size);
        prefetch_seq = new int_type[prefetch_seq_size];

        typename run_type::iterator copy_start = consume_seq.begin();
        for (i = 0; i < nruns; ++i)
        {
            copy_start = std::copy(
                sruns.runs[i].begin(),
                sruns.runs[i].end(),
                copy_start);
        }

        // Order the blocks by trigger value (stable, so blocks of one run keep
        // their relative order); this is the order in which they are consumed.
        std::stable_sort(consume_seq.begin(), consume_seq.end(),
                         sort_helper::trigger_entry_cmp<trigger_entry_type, value_cmp>(cmp) _STXXL_SORT_TRIGGER_FORCE_SEQUENTIAL);

        const unsigned_type n_prefetch_buffers = STXXL_MAX(min_prefetch_buffers, input_buffers - nruns);

#if STXXL_SORT_OPTIMAL_PREFETCHING
        // Plan the schedule with the minimum plus 30% of the extra buffers.
        const int_type n_opt_prefetch_buffers = min_prefetch_buffers + (3 * (n_prefetch_buffers - min_prefetch_buffers)) / 10;

        compute_prefetch_schedule(
            consume_seq,
            prefetch_seq,
            n_opt_prefetch_buffers,
            disks_number);
#else
        // Prefetch in consumption order.
        for (i = 0; i < prefetch_seq_size; ++i)
            prefetch_seq[i] = i;
#endif //STXXL_SORT_OPTIMAL_PREFETCHING

        prefetcher = new prefetcher_type(
            consume_seq.begin(),
            consume_seq.end(),
            prefetch_seq,
            STXXL_MIN(nruns + n_prefetch_buffers, prefetch_seq_size));

        if (do_parallel_merge())
        {
#if STXXL_PARALLEL_MULTIWAY_MERGE
            // One sequence per run, initialized with the first block pulled
            // from the prefetcher for it.
            seqs = new std::vector<sequence>(nruns);
            buffers = new std::vector<block_type *>(nruns);

            for (unsigned_type i = 0; i < nruns; ++i)
            {
                (*buffers)[i] = prefetcher->pull_block();
                (*seqs)[i] = std::make_pair((*buffers)[i]->begin(), (*buffers)[i]->end());
            }

#else
            STXXL_THROW_UNREACHABLE();
#endif //STXXL_PARALLEL_MULTIWAY_MERGE
        }
        else
        {
            // Sequential merging with a loser tree over nruns cursors.
            losers = new loser_tree_type(prefetcher, nruns, run_cursor2_cmp_type(cmp));

        }

        current_block = new out_block_type;
        fill_current_block();

        current_value = current_block->elem[0];
        buffer_pos = 1;
    }

public:
    //! Standard stream method.
    bool empty() const
    {
        return elements_remaining == 0;
    }

    //! Standard stream method.
    const value_type & operator * () const
    {
        assert(!empty());
        return current_value;
    }

    //! Standard stream method.
    const value_type * operator -> () const
    {
        return &(operator * ());
    }

    //! Standard stream method: advances to the next merged element,
    //! refilling current_block when it is exhausted.
    basic_runs_merger & operator ++ ()
    {
        assert(!empty());

        --elements_remaining;

        if (buffer_pos != out_block_type::size)
        {
            current_value = current_block->elem[buffer_pos];
            ++buffer_pos;
        }
        else
        {
            if (!empty())
            {
                fill_current_block();

#if STXXL_CHECK_ORDER_IN_SORTS
                assert(stxxl::is_sorted(current_block->elem, current_block->elem + current_block->size, cmp));
                assert(!cmp(current_block->elem[0], current_value));
#endif //STXXL_CHECK_ORDER_IN_SORTS
                current_value = current_block->elem[0];
                buffer_pos = 1;
            }
        }

#if STXXL_CHECK_ORDER_IN_SORTS
        if (!empty())
        {
            assert(!cmp(current_value, last_element));
            last_element = current_value;
        }
#endif //STXXL_CHECK_ORDER_IN_SORTS

        return *this;
    }

    //! Destructor: releases the merge structures, the run blocks and the
    //! output block.
    virtual ~basic_runs_merger()
    {
        deallocate_prefetcher();
        delete current_block;
    }
};
01191
01192
01193 template <class RunsType_, class Cmp_, class AllocStr_>
01194 void basic_runs_merger<RunsType_, Cmp_, AllocStr_>::merge_recursively(unsigned_type memory_to_use)
01195 {
01196 block_manager * bm = block_manager::get_instance();
01197 unsigned_type ndisks = config::get_instance()->disks_number();
01198 unsigned_type nwrite_buffers = 2 * ndisks;
01199 unsigned_type memory_for_write_buffers = nwrite_buffers * sizeof(block_type);
01200
01201
01202 unsigned_type recursive_merger_memory_prefetch_buffers = 2 * ndisks * sizeof(block_type);
01203 unsigned_type recursive_merger_memory_out_block = sizeof(block_type);
01204 unsigned_type memory_for_buffers = memory_for_write_buffers
01205 + recursive_merger_memory_prefetch_buffers
01206 + recursive_merger_memory_out_block;
01207
01208 unsigned_type max_arity = (memory_to_use > memory_for_buffers ? memory_to_use - memory_for_buffers : 0) / block_type::raw_size;
01209
01210 unsigned_type nruns = sruns.runs.size();
01211 const unsigned_type merge_factor = optimal_merge_factor(nruns, max_arity);
01212 assert(merge_factor > 1);
01213 assert(merge_factor <= max_arity);
01214 while (nruns > max_arity)
01215 {
01216 unsigned_type new_nruns = div_ceil(nruns, merge_factor);
01217 STXXL_VERBOSE("Starting new merge phase: nruns: " << nruns <<
01218 " opt_merge_factor: " << merge_factor << " max_arity: " << max_arity << " new_nruns: " << new_nruns);
01219
01220 sorted_runs_type new_runs;
01221 new_runs.runs.resize(new_nruns);
01222 new_runs.runs_sizes.resize(new_nruns);
01223 new_runs.elements = sruns.elements;
01224
01225 unsigned_type runs_left = nruns;
01226 unsigned_type cur_out_run = 0;
01227 unsigned_type elements_in_new_run = 0;
01228
01229 while (runs_left > 0)
01230 {
01231 int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
01232 elements_in_new_run = 0;
01233 for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); ++i)
01234 {
01235 elements_in_new_run += sruns.runs_sizes[i];
01236 }
01237 const unsigned_type blocks_in_new_run1 = div_ceil(elements_in_new_run, block_type::size);
01238
01239 new_runs.runs_sizes[cur_out_run] = elements_in_new_run;
01240
01241 new_runs.runs[cur_out_run++].resize(blocks_in_new_run1);
01242 runs_left -= runs2merge;
01243 }
01244
01245
01246 for (unsigned_type i = 0; i < new_runs.runs.size(); ++i)
01247 bm->new_blocks(alloc_strategy(), make_bid_iterator(new_runs.runs[i].begin()), make_bid_iterator(new_runs.runs[i].end()));
01248
01249
01250 runs_left = nruns;
01251 cur_out_run = 0;
01252 size_type elements_left = sruns.elements;
01253
01254 while (runs_left > 0)
01255 {
01256 unsigned_type runs2merge = STXXL_MIN(runs_left, merge_factor);
01257 STXXL_VERBOSE("Merging " << runs2merge << " runs");
01258
01259 sorted_runs_type cur_runs;
01260 cur_runs.runs.resize(runs2merge);
01261 cur_runs.runs_sizes.resize(runs2merge);
01262
01263 std::copy(sruns.runs.begin() + nruns - runs_left,
01264 sruns.runs.begin() + nruns - runs_left + runs2merge,
01265 cur_runs.runs.begin());
01266 std::copy(sruns.runs_sizes.begin() + nruns - runs_left,
01267 sruns.runs_sizes.begin() + nruns - runs_left + runs2merge,
01268 cur_runs.runs_sizes.begin());
01269
01270 runs_left -= runs2merge;
01271
01272
01273
01274
01275
01276 cur_runs.elements = new_runs.runs_sizes[cur_out_run];
01277 elements_left -= cur_runs.elements;
01278
01279 if (runs2merge > 1)
01280 {
01281 basic_runs_merger<RunsType_, Cmp_, AllocStr_> merger(cur_runs, cmp, memory_to_use - memory_for_write_buffers);
01282
01283 {
01284 buf_ostream<block_type, typename run_type::iterator> out(
01285 new_runs.runs[cur_out_run].begin(),
01286 nwrite_buffers);
01287
01288 size_type cnt = 0;
01289 const size_type cnt_max = cur_runs.elements;
01290
01291 while (cnt != cnt_max)
01292 {
01293 *out = *merger;
01294 if ((cnt % block_type::size) == 0)
01295 new_runs.runs[cur_out_run][cnt / size_type(block_type::size)].value = *merger;
01296
01297 ++cnt;
01298 ++out;
01299 ++merger;
01300 }
01301 assert(merger.empty());
01302
01303 while (cnt % block_type::size)
01304 {
01305 *out = cmp.max_value();
01306 ++out;
01307 ++cnt;
01308 }
01309 }
01310 }
01311 else
01312 {
01313 bm->delete_blocks(
01314 make_bid_iterator(new_runs.runs.back().begin()),
01315 make_bid_iterator(new_runs.runs.back().end()));
01316
01317 assert(cur_runs.runs.size() == 1);
01318
01319 std::copy(cur_runs.runs.front().begin(),
01320 cur_runs.runs.front().end(),
01321 new_runs.runs.back().begin());
01322 new_runs.runs_sizes.back() = cur_runs.runs_sizes.front();
01323 }
01324
01325 ++cur_out_run;
01326 }
01327 assert(elements_left == 0);
01328
01329 nruns = new_nruns;
01330 sruns = new_runs;
01331 }
01332 }
01333
01334
01342 template <class RunsType_,
01343 class Cmp_,
01344 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
01345 class runs_merger : public basic_runs_merger<RunsType_, Cmp_, AllocStr_>
01346 {
01347 private:
01348 typedef RunsType_ sorted_runs_type;
01349 typedef basic_runs_merger<RunsType_, Cmp_, AllocStr_> base;
01350 typedef typename base::value_cmp value_cmp;
01351 typedef typename base::block_type block_type;
01352
01353 public:
01358 runs_merger(const sorted_runs_type & r, value_cmp c, unsigned_type memory_to_use) :
01359 base(r, c, memory_to_use)
01360 {
01361 }
01362 };
01363
01364
01366
01368
01377 template <class Input_,
01378 class Cmp_,
01379 unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
01380 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY,
01381 class runs_creator_type = runs_creator<Input_, Cmp_, BlockSize_, AllocStr_> >
01382 class sort : public noncopyable
01383 {
01384 typedef typename runs_creator_type::sorted_runs_type sorted_runs_type;
01385 typedef runs_merger<sorted_runs_type, Cmp_, AllocStr_> runs_merger_type;
01386
01387 runs_creator_type creator;
01388 runs_merger_type merger;
01389
01390 public:
01392 typedef typename Input_::value_type value_type;
01393
01398 sort(Input_ & in, Cmp_ c, unsigned_type memory_to_use) :
01399 creator(in, c, memory_to_use),
01400 merger(creator.result(), c, memory_to_use)
01401 {
01402 sort_helper::verify_sentinel_strict_weak_ordering(c);
01403 }
01404
01410 sort(Input_ & in, Cmp_ c, unsigned_type memory_to_use_rc, unsigned_type memory_to_use_m) :
01411 creator(in, c, memory_to_use_rc),
01412 merger(creator.result(), c, memory_to_use_m)
01413 {
01414 sort_helper::verify_sentinel_strict_weak_ordering(c);
01415 }
01416
01417
01419 bool empty() const
01420 {
01421 return merger.empty();
01422 }
01423
01425 const value_type & operator * () const
01426 {
01427 assert(!empty());
01428 return *merger;
01429 }
01430
01431 const value_type * operator -> () const
01432 {
01433 assert(!empty());
01434 return merger.operator -> ();
01435 }
01436
01438 sort & operator ++ ()
01439 {
01440 ++merger;
01441 return *this;
01442 }
01443 };
01444
01450 template <
01451 class ValueType_,
01452 unsigned BlockSize_>
01453 class compute_sorted_runs_type
01454 {
01455 typedef ValueType_ value_type;
01456 typedef BID<BlockSize_> bid_type;
01457 typedef sort_helper::trigger_entry<bid_type, value_type> trigger_entry_type;
01458
01459 public:
01460 typedef sorted_runs<trigger_entry_type> result;
01461 };
01462
01464 }
01465
01468
01470
01479 template <unsigned BlockSize,
01480 class RandomAccessIterator,
01481 class CmpType,
01482 class AllocStr>
01483 void sort(RandomAccessIterator begin,
01484 RandomAccessIterator end,
01485 CmpType cmp,
01486 unsigned_type MemSize,
01487 AllocStr AS)
01488 {
01489 STXXL_UNUSED(AS);
01490 #ifdef BOOST_MSVC
01491 typedef typename streamify_traits<RandomAccessIterator>::stream_type InputType;
01492 #else
01493 typedef __typeof__(stream::streamify(begin, end)) InputType;
01494 #endif //BOOST_MSVC
01495 InputType Input(begin, end);
01496 typedef stream::sort<InputType, CmpType, BlockSize, AllocStr> sorter_type;
01497 sorter_type Sort(Input, cmp, MemSize);
01498 stream::materialize(Sort, begin);
01499 }
01500
01502
01503 __STXXL_END_NAMESPACE
01504
01505 #endif // !STXXL_SORT_STREAM_HEADER
01506