• Main Page
  • Related Pages
  • Modules
  • Namespaces
  • Classes
  • Files
  • Examples
  • File List

sort_stream.h

00001 /***************************************************************************
00002  *  include/stxxl/bits/stream/sort_stream.h
00003  *
00004  *  Part of the STXXL. See http://stxxl.sourceforge.net
00005  *
00006  *  Copyright (C) 2002-2005 Roman Dementiev <dementiev@mpi-sb.mpg.de>
00007  *  Copyright (C) 2006 Johannes Singler <singler@ira.uka.de>
00008  *  Copyright (C) 2009 Andreas Beckmann <beckmann@cs.uni-frankfurt.de>
00009  *
00010  *  Distributed under the Boost Software License, Version 1.0.
00011  *  (See accompanying file LICENSE_1_0.txt or copy at
00012  *  http://www.boost.org/LICENSE_1_0.txt)
00013  **************************************************************************/
00014 
00015 #ifndef STXXL_SORT_STREAM_HEADER
00016 #define STXXL_SORT_STREAM_HEADER
00017 
00018 #ifdef STXXL_BOOST_CONFIG
00019  #include <boost/config.hpp>
00020 #endif
00021 
00022 #include <stxxl/bits/stream/stream.h>
00023 #include <stxxl/bits/mng/mng.h>
00024 #include <stxxl/bits/algo/sort_base.h>
00025 #include <stxxl/bits/algo/sort_helper.h>
00026 #include <stxxl/bits/algo/adaptor.h>
00027 #include <stxxl/bits/algo/run_cursor.h>
00028 #include <stxxl/bits/algo/losertree.h>
00029 #include <stxxl/bits/stream/sorted_runs.h>
00030 
00031 
00032 __STXXL_BEGIN_NAMESPACE
00033 
00034 namespace stream
00035 {
00038 
00039 
00041     //     CREATE RUNS                                                    //
00043 
00051     template <
00052         class Input_,
00053         class Cmp_,
00054         unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
00055         class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
00056     class basic_runs_creator : private noncopyable
00057     {
00058     protected:
00059         Input_ & input;
00060         Cmp_ cmp;
00061 
00062     public:
00063         typedef Cmp_ cmp_type;
00064         typedef typename Input_::value_type value_type;
00065         typedef typed_block<BlockSize_, value_type> block_type;
00066         typedef sort_helper::trigger_entry<block_type> trigger_entry_type;
00067         typedef sorted_runs<trigger_entry_type> sorted_runs_type;
00068 
00069     private:
00070         typedef typename sorted_runs_type::run_type run_type;
00071         sorted_runs_type result_; // stores the result (sorted runs)
00072         unsigned_type m_;         // memory for internal use in blocks
00073         bool result_computed;     // true iff result is already computed (used in 'result' method)
00074 
00076         unsigned_type fetch(block_type * blocks, unsigned_type first_idx, unsigned_type last_idx)
00077         {
00078             typename element_iterator_traits<block_type>::element_iterator output =
00079                 make_element_iterator(blocks, first_idx);
00080             unsigned_type curr_idx = first_idx;
00081             while (!input.empty() && curr_idx != last_idx) {
00082                 *output = *input;
00083                 ++input;
00084                 ++output;
00085                 ++curr_idx;
00086             }
00087             return curr_idx;
00088         }
00089 
00090         void fill_with_max_value(block_type * blocks, unsigned_type num_blocks, unsigned_type first_idx)
00091         {
00092             unsigned_type last_idx = num_blocks * block_type::size;
00093             if (first_idx < last_idx) {
00094                 typename element_iterator_traits<block_type>::element_iterator curr =
00095                     make_element_iterator(blocks, first_idx);
00096                 while (first_idx != last_idx) {
00097                     *curr = cmp.max_value();
00098                     ++curr;
00099                     ++first_idx;
00100                 }
00101             }
00102         }
00103 
00105         void sort_run(block_type * run, unsigned_type elements)
00106         {
00107             std::sort(make_element_iterator(run, 0),
00108                       make_element_iterator(run, elements),
00109                       cmp);
00110         }
00111 
00112         void compute_result();
00113 
00114     public:
00119         basic_runs_creator(Input_ & i, Cmp_ c, unsigned_type memory_to_use) :
00120             input(i), cmp(c), m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()), result_computed(false)
00121         {
00122             sort_helper::verify_sentinel_strict_weak_ordering(cmp);
00123             if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) {
00124                 throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
00125             }
00126             assert(m_ > 0);
00127         }
00128 
00132         const sorted_runs_type & result()
00133         {
00134             if (!result_computed)
00135             {
00136                 compute_result();
00137                 result_computed = true;
00138 #ifdef STXXL_PRINT_STAT_AFTER_RF
00139                 STXXL_MSG(*stats::get_instance());
00140 #endif //STXXL_PRINT_STAT_AFTER_RF
00141             }
00142             return result_;
00143         }
00144     };
00145 
00149     template <class Input_, class Cmp_, unsigned BlockSize_, class AllocStr_>
00150     void basic_runs_creator<Input_, Cmp_, BlockSize_, AllocStr_>::compute_result()
00151     {
00152         unsigned_type i = 0;
00153         unsigned_type m2 = m_ / 2;
00154         const unsigned_type el_in_run = m2 * block_type::size; // # el in a run
00155         STXXL_VERBOSE1("basic_runs_creator::compute_result m2=" << m2);
00156         unsigned_type blocks1_length = 0, blocks2_length = 0;
00157         block_type * Blocks1 = NULL;
00158 
00159 #ifndef STXXL_SMALL_INPUT_PSORT_OPT
00160         Blocks1 = new block_type[m2 * 2];
00161 #else
00162         while (!input.empty() && blocks1_length != block_type::size)
00163         {
00164             result_.small_.push_back(*input);
00165             ++input;
00166             ++blocks1_length;
00167         }
00168 
00169         if (blocks1_length == block_type::size && !input.empty())
00170         {
00171             Blocks1 = new block_type[m2 * 2];
00172             std::copy(result_.small_.begin(), result_.small_.end(), Blocks1[0].begin());
00173             result_.small_.clear();
00174         }
00175         else
00176         {
00177             STXXL_VERBOSE1("basic_runs_creator: Small input optimization, input length: " << blocks1_length);
00178             result_.elements = blocks1_length;
00179             std::sort(result_.small_.begin(), result_.small_.end(), cmp);
00180             return;
00181         }
00182 #endif //STXXL_SMALL_INPUT_PSORT_OPT
00183 
00184         // the first block may be there already
00185         blocks1_length = fetch(Blocks1, blocks1_length, el_in_run);
00186 
00187         // sort first run
00188         sort_run(Blocks1, blocks1_length);
00189         if (blocks1_length <= block_type::size && input.empty())
00190         {
00191             // small input, do not flush it on the disk(s)
00192             STXXL_VERBOSE1("basic_runs_creator: Small input optimization, input length: " << blocks1_length);
00193             assert(result_.small_.empty());
00194             result_.small_.insert(result_.small_.end(), Blocks1[0].begin(), Blocks1[0].begin() + blocks1_length);
00195             result_.elements = blocks1_length;
00196             delete[] Blocks1;
00197             return;
00198         }
00199 
00200         block_type * Blocks2 = Blocks1 + m2;
00201         block_manager * bm = block_manager::get_instance();
00202         request_ptr * write_reqs = new request_ptr[m2];
00203         run_type run;
00204 
00205         unsigned_type cur_run_size = div_ceil(blocks1_length, block_type::size);  // in blocks
00206         run.resize(cur_run_size);
00207         bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00208 
00209         disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
00210 
00211         // fill the rest of the last block with max values
00212         fill_with_max_value(Blocks1, cur_run_size, blocks1_length);
00213 
00214         for (i = 0; i < cur_run_size; ++i)
00215         {
00216             run[i].value = Blocks1[i][0];
00217             write_reqs[i] = Blocks1[i].write(run[i].bid);
00218         }
00219         result_.runs.push_back(run);
00220         result_.runs_sizes.push_back(blocks1_length);
00221         result_.elements += blocks1_length;
00222 
00223         if (input.empty())
00224         {
00225             // return
00226             wait_all(write_reqs, write_reqs + cur_run_size);
00227             delete[] write_reqs;
00228             delete[] Blocks1;
00229             return;
00230         }
00231 
00232         STXXL_VERBOSE1("Filling the second part of the allocated blocks");
00233         blocks2_length = fetch(Blocks2, 0, el_in_run);
00234 
00235         if (input.empty())
00236         {
00237             // optimization if the whole set fits into both halves
00238             // (re)sort internally and return
00239             blocks2_length += el_in_run;
00240             sort_run(Blocks1, blocks2_length);  // sort first an second run together
00241             wait_all(write_reqs, write_reqs + cur_run_size);
00242             bm->delete_blocks(make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00243 
00244             cur_run_size = div_ceil(blocks2_length, block_type::size);
00245             run.resize(cur_run_size);
00246             bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00247 
00248             // fill the rest of the last block with max values
00249             fill_with_max_value(Blocks1, cur_run_size, blocks2_length);
00250 
00251             assert(cur_run_size > m2);
00252 
00253             for (i = 0; i < m2; ++i)
00254             {
00255                 run[i].value = Blocks1[i][0];
00256                 write_reqs[i]->wait();
00257                 write_reqs[i] = Blocks1[i].write(run[i].bid);
00258             }
00259 
00260             request_ptr * write_reqs1 = new request_ptr[cur_run_size - m2];
00261 
00262             for ( ; i < cur_run_size; ++i)
00263             {
00264                 run[i].value = Blocks1[i][0];
00265                 write_reqs1[i - m2] = Blocks1[i].write(run[i].bid);
00266             }
00267 
00268             result_.runs[0] = run;
00269             result_.runs_sizes[0] = blocks2_length;
00270             result_.elements = blocks2_length;
00271 
00272             wait_all(write_reqs, write_reqs + m2);
00273             delete[] write_reqs;
00274             wait_all(write_reqs1, write_reqs1 + cur_run_size - m2);
00275             delete[] write_reqs1;
00276 
00277             delete[] Blocks1;
00278 
00279             return;
00280         }
00281 
00282         // more than 2 runs can be filled, i. e. the general case
00283 
00284         sort_run(Blocks2, blocks2_length);
00285 
00286         cur_run_size = div_ceil(blocks2_length, block_type::size);  // in blocks
00287         run.resize(cur_run_size);
00288         bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00289 
00290         for (i = 0; i < cur_run_size; ++i)
00291         {
00292             run[i].value = Blocks2[i][0];
00293             write_reqs[i]->wait();
00294             write_reqs[i] = Blocks2[i].write(run[i].bid);
00295         }
00296         assert((blocks2_length % el_in_run) == 0);
00297 
00298         result_.runs.push_back(run);
00299         result_.runs_sizes.push_back(blocks2_length);
00300         result_.elements += blocks2_length;
00301 
00302         while (!input.empty())
00303         {
00304             blocks1_length = fetch(Blocks1, 0, el_in_run);
00305             sort_run(Blocks1, blocks1_length);
00306             cur_run_size = div_ceil(blocks1_length, block_type::size);  // in blocks
00307             run.resize(cur_run_size);
00308             bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00309 
00310             // fill the rest of the last block with max values (occurs only on the last run)
00311             fill_with_max_value(Blocks1, cur_run_size, blocks1_length);
00312 
00313             for (i = 0; i < cur_run_size; ++i)
00314             {
00315                 run[i].value = Blocks1[i][0];
00316                 write_reqs[i]->wait();
00317                 write_reqs[i] = Blocks1[i].write(run[i].bid);
00318             }
00319             result_.runs.push_back(run);
00320             result_.runs_sizes.push_back(blocks1_length);
00321             result_.elements += blocks1_length;
00322 
00323             std::swap(Blocks1, Blocks2);
00324             std::swap(blocks1_length, blocks2_length);
00325         }
00326 
00327         wait_all(write_reqs, write_reqs + m2);
00328         delete[] write_reqs;
00329         delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2);
00330     }
00331 
00339     template <
00340         class Input_,
00341         class Cmp_,
00342         unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
00343         class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
00344     class runs_creator : public basic_runs_creator<Input_, Cmp_, BlockSize_, AllocStr_>
00345     {
00346     private:
00347         typedef basic_runs_creator<Input_, Cmp_, BlockSize_, AllocStr_> base;
00348 
00349     public:
00350         typedef typename base::block_type block_type;
00351 
00352     private:
00353         using base::input;
00354 
00355     public:
00360         runs_creator(Input_ & i, Cmp_ c, unsigned_type memory_to_use) : base(i, c, memory_to_use)
00361         { }
00362     };
00363 
00364 
00372     template <class ValueType_>
00373     struct use_push
00374     {
00375         typedef ValueType_ value_type;
00376     };
00377 
00389     template <
00390         class ValueType_,
00391         class Cmp_,
00392         unsigned BlockSize_,
00393         class AllocStr_>
00394     class runs_creator<
00395         use_push<ValueType_>,
00396         Cmp_,
00397         BlockSize_,
00398         AllocStr_>
00399         : private noncopyable
00400     {
00401         Cmp_ cmp;
00402 
00403     public:
00404         typedef Cmp_ cmp_type;
00405         typedef ValueType_ value_type;
00406         typedef typed_block<BlockSize_, value_type> block_type;
00407         typedef sort_helper::trigger_entry<block_type> trigger_entry_type;
00408         typedef sorted_runs<trigger_entry_type> sorted_runs_type;
00409         typedef sorted_runs_type result_type;
00410 
00411     private:
00412         typedef typename sorted_runs_type::run_type run_type;
00413         sorted_runs_type result_; // stores the result (sorted runs)
00414         unsigned_type m_;         // memory for internal use in blocks
00415 
00416         bool result_computed;     // true after the result() method was called for the first time
00417 
00418         const unsigned_type m2;
00419         const unsigned_type el_in_run;
00420         unsigned_type cur_el;
00421         block_type * Blocks1;
00422         block_type * Blocks2;
00423         request_ptr * write_reqs;
00424         run_type run;
00425 
00426         void fill_with_max_value(block_type * blocks, unsigned_type num_blocks, unsigned_type first_idx)
00427         {
00428             unsigned_type last_idx = num_blocks * block_type::size;
00429             if (first_idx < last_idx) {
00430                 typename element_iterator_traits<block_type>::element_iterator curr =
00431                     make_element_iterator(blocks, first_idx);
00432                 while (first_idx != last_idx) {
00433                     *curr = cmp.max_value();
00434                     ++curr;
00435                     ++first_idx;
00436                 }
00437             }
00438         }
00439 
00440         void sort_run(block_type * run, unsigned_type elements)
00441         {
00442             std::sort(make_element_iterator(run, 0),
00443                       make_element_iterator(run, elements),
00444                       cmp);
00445         }
00446 
00447         void compute_result()
00448         {
00449             if (cur_el == 0)
00450                 return;
00451 
00452             unsigned_type cur_el_reg = cur_el;
00453             sort_run(Blocks1, cur_el_reg);
00454             result_.elements += cur_el_reg;
00455             if (cur_el_reg <= block_type::size && result_.elements == cur_el_reg)
00456             {
00457                 // small input, do not flush it on the disk(s)
00458                 STXXL_VERBOSE1("runs_creator(use_push): Small input optimization, input length: " << cur_el_reg);
00459                 result_.small_.resize(cur_el_reg);
00460                 std::copy(Blocks1[0].begin(), Blocks1[0].begin() + cur_el_reg, result_.small_.begin());
00461                 return;
00462             }
00463 
00464             const unsigned_type cur_run_size = div_ceil(cur_el_reg, block_type::size);     // in blocks
00465             run.resize(cur_run_size);
00466             block_manager * bm = block_manager::get_instance();
00467             bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00468 
00469             disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
00470 
00471             result_.runs_sizes.push_back(cur_el_reg);
00472 
00473             // fill the rest of the last block with max values
00474             fill_with_max_value(Blocks1, cur_run_size, cur_el_reg);
00475 
00476             unsigned_type i = 0;
00477             for ( ; i < cur_run_size; ++i)
00478             {
00479                 run[i].value = Blocks1[i][0];
00480                 if (write_reqs[i].get())
00481                     write_reqs[i]->wait();
00482 
00483                 write_reqs[i] = Blocks1[i].write(run[i].bid);
00484             }
00485             result_.runs.push_back(run);
00486 
00487             for (i = 0; i < m2; ++i)
00488                 if (write_reqs[i].get())
00489                     write_reqs[i]->wait();
00490         }
00491 
00492         void cleanup()
00493         {
00494             delete[] write_reqs;
00495             delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2);
00496             write_reqs = NULL;
00497             Blocks1 = Blocks2 = NULL;
00498         }
00499 
00500     public:
00504         runs_creator(Cmp_ c, unsigned_type memory_to_use) :
00505             cmp(c), m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()), result_computed(false),
00506             m2(m_ / 2),
00507             el_in_run(m2 * block_type::size),
00508             cur_el(0),
00509             Blocks1(new block_type[m2 * 2]),
00510             Blocks2(Blocks1 + m2),
00511             write_reqs(new request_ptr[m2])
00512         {
00513             sort_helper::verify_sentinel_strict_weak_ordering(cmp);
00514             if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) {
00515                 throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
00516             }
00517             assert(m2 > 0);
00518         }
00519 
00520         ~runs_creator()
00521         {
00522             if (!result_computed)
00523                 cleanup();
00524         }
00525 
00528         void push(const value_type & val)
00529         {
00530             assert(result_computed == false);
00531             unsigned_type cur_el_reg = cur_el;
00532             if (cur_el_reg < el_in_run)
00533             {
00534                 Blocks1[cur_el_reg / block_type::size][cur_el_reg % block_type::size] = val;
00535                 ++cur_el;
00536                 return;
00537             }
00538 
00539             assert(el_in_run == cur_el);
00540             cur_el = 0;
00541 
00542             //sort and store Blocks1
00543             sort_run(Blocks1, el_in_run);
00544             result_.elements += el_in_run;
00545 
00546             const unsigned_type cur_run_size = div_ceil(el_in_run, block_type::size);    // in blocks
00547             run.resize(cur_run_size);
00548             block_manager * bm = block_manager::get_instance();
00549             bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00550 
00551             disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
00552 
00553             result_.runs_sizes.push_back(el_in_run);
00554 
00555             for (unsigned_type i = 0; i < cur_run_size; ++i)
00556             {
00557                 run[i].value = Blocks1[i][0];
00558                 if (write_reqs[i].get())
00559                     write_reqs[i]->wait();
00560 
00561                 write_reqs[i] = Blocks1[i].write(run[i].bid);
00562             }
00563 
00564             result_.runs.push_back(run);
00565 
00566             std::swap(Blocks1, Blocks2);
00567 
00568             push(val);
00569         }
00570 
00574         const sorted_runs_type & result()
00575         {
00576             if (1)
00577             {
00578                 if (!result_computed)
00579                 {
00580                     compute_result();
00581                     result_computed = true;
00582                     cleanup();
00583 #ifdef STXXL_PRINT_STAT_AFTER_RF
00584                     STXXL_MSG(*stats::get_instance());
00585 #endif //STXXL_PRINT_STAT_AFTER_RF
00586                 }
00587             }
00588             return result_;
00589         }
00590     };
00591 
00592 
00599     template <class ValueType_>
00600     struct from_sorted_sequences
00601     {
00602         typedef ValueType_ value_type;
00603     };
00604 
00616     template <
00617         class ValueType_,
00618         class Cmp_,
00619         unsigned BlockSize_,
00620         class AllocStr_>
00621     class runs_creator<
00622         from_sorted_sequences<ValueType_>,
00623         Cmp_,
00624         BlockSize_,
00625         AllocStr_>
00626         : private noncopyable
00627     {
00628         typedef ValueType_ value_type;
00629         typedef typed_block<BlockSize_, value_type> block_type;
00630         typedef sort_helper::trigger_entry<block_type> trigger_entry_type;
00631         typedef AllocStr_ alloc_strategy_type;
00632         Cmp_ cmp;
00633 
00634     public:
00635         typedef Cmp_ cmp_type;
00636         typedef sorted_runs<trigger_entry_type> sorted_runs_type;
00637         typedef sorted_runs_type result_type;
00638 
00639     private:
00640         typedef typename sorted_runs_type::run_type run_type;
00641 
00642         sorted_runs_type result_; // stores the result (sorted runs)
00643         unsigned_type m_;         // memory for internal use in blocks
00644         buffered_writer<block_type> writer;
00645         block_type * cur_block;
00646         unsigned_type offset;
00647         unsigned_type iblock;
00648         unsigned_type irun;
00649         alloc_strategy_type alloc_strategy;  // needs to be reset after each run
00650 
00651     public:
00656         runs_creator(Cmp_ c, unsigned_type memory_to_use) :
00657             cmp(c),
00658             m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()),
00659             writer(m_, m_ / 2),
00660             cur_block(writer.get_free_block()),
00661             offset(0),
00662             iblock(0),
00663             irun(0)
00664         {
00665             sort_helper::verify_sentinel_strict_weak_ordering(cmp);
00666             assert(m_ > 0);
00667             if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) {
00668                 throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
00669             }
00670         }
00671 
00674         void push(const value_type & val)
00675         {
00676             assert(offset < block_type::size);
00677 
00678             (*cur_block)[offset] = val;
00679             ++offset;
00680 
00681             if (offset == block_type::size)
00682             {
00683                 // write current block
00684 
00685                 block_manager * bm = block_manager::get_instance();
00686                 // allocate space for the block
00687                 result_.runs.resize(irun + 1);
00688                 result_.runs[irun].resize(iblock + 1);
00689                 bm->new_blocks(
00690                     alloc_strategy,
00691                     make_bid_iterator(result_.runs[irun].begin() + iblock),
00692                     make_bid_iterator(result_.runs[irun].end()),
00693                     iblock
00694                     );
00695 
00696                 result_.runs[irun][iblock].value = (*cur_block)[0];         // init trigger
00697                 cur_block = writer.write(cur_block, result_.runs[irun][iblock].bid);
00698                 ++iblock;
00699 
00700                 offset = 0;
00701             }
00702 
00703             ++result_.elements;
00704         }
00705 
00707         void finish()
00708         {
00709             if (offset == 0 && iblock == 0) // current run is empty
00710                 return;
00711 
00712 
00713             result_.runs_sizes.resize(irun + 1);
00714             result_.runs_sizes.back() = iblock * block_type::size + offset;
00715 
00716             if (offset)    // if current block is partially filled
00717             {
00718                 while (offset != block_type::size)
00719                 {
00720                     (*cur_block)[offset] = cmp.max_value();
00721                     ++offset;
00722                 }
00723                 offset = 0;
00724 
00725                 block_manager * bm = block_manager::get_instance();
00726                 // allocate space for the block
00727                 result_.runs.resize(irun + 1);
00728                 result_.runs[irun].resize(iblock + 1);
00729                 bm->new_blocks(
00730                     alloc_strategy,
00731                     make_bid_iterator(result_.runs[irun].begin() + iblock),
00732                     make_bid_iterator(result_.runs[irun].end()),
00733                     iblock
00734                     );
00735 
00736                 result_.runs[irun][iblock].value = (*cur_block)[0];         // init trigger
00737                 cur_block = writer.write(cur_block, result_.runs[irun][iblock].bid);
00738             }
00739             else
00740             { }
00741 
00742             alloc_strategy = alloc_strategy_type();  // reinitialize block allocator for the next run
00743             iblock = 0;
00744             ++irun;
00745         }
00746 
00750         const sorted_runs_type & result()
00751         {
00752             finish();
00753             writer.flush();
00754 
00755             return result_;
00756         }
00757     };
00758 
00759 
00764     template <class RunsType_, class Cmp_>
00765     bool check_sorted_runs(const RunsType_ & sruns, Cmp_ cmp)
00766     {
00767         sort_helper::verify_sentinel_strict_weak_ordering(cmp);
00768         typedef typename RunsType_::block_type block_type;
00769         typedef typename block_type::value_type value_type;
00770         STXXL_VERBOSE2("Elements: " << sruns.elements);
00771         unsigned_type nruns = sruns.runs.size();
00772         STXXL_VERBOSE2("Runs: " << nruns);
00773         unsigned_type irun = 0;
00774         for (irun = 0; irun < nruns; ++irun)
00775         {
00776             const unsigned_type nblocks = sruns.runs[irun].size();
00777             block_type * blocks = new block_type[nblocks];
00778             request_ptr * reqs = new request_ptr[nblocks];
00779             for (unsigned_type j = 0; j < nblocks; ++j)
00780             {
00781                 reqs[j] = blocks[j].read(sruns.runs[irun][j].bid);
00782             }
00783             wait_all(reqs, reqs + nblocks);
00784             for (unsigned_type j = 0; j < nblocks; ++j)
00785             {
00786                 if (cmp(blocks[j][0], sruns.runs[irun][j].value) ||
00787                     cmp(sruns.runs[irun][j].value, blocks[j][0])) 
00788                 {
00789                     STXXL_ERRMSG("check_sorted_runs  wrong trigger in the run");
00790                     return false;
00791                 }
00792             }
00793             if (!stxxl::is_sorted(make_element_iterator(blocks, 0),
00794                                   make_element_iterator(blocks, sruns.runs_sizes[irun]),
00795                                   cmp))
00796             {
00797                 STXXL_ERRMSG("check_sorted_runs  wrong order in the run");
00798                 return false;
00799             }
00800 
00801             delete[] reqs;
00802             delete[] blocks;
00803         }
00804 
00805         STXXL_MSG("Checking runs finished successfully");
00806 
00807         return true;
00808     }
00809 
00810 
00812     //     MERGE RUNS                                                     //
00814 
00822     template <class RunsType_,
00823               class Cmp_,
00824               class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
00825     class basic_runs_merger : private noncopyable
00826     {
00827     protected:
00828         typedef RunsType_ sorted_runs_type;
00829         typedef AllocStr_ alloc_strategy;
00830         typedef typename sorted_runs_type::size_type size_type;
00831         typedef Cmp_ value_cmp;
00832         typedef typename sorted_runs_type::run_type run_type;
00833         typedef typename sorted_runs_type::block_type block_type;
00834         typedef block_type out_block_type;
00835         typedef typename run_type::value_type trigger_entry_type;
00836         typedef block_prefetcher<block_type, typename run_type::iterator> prefetcher_type;
00837         typedef run_cursor2<block_type, prefetcher_type> run_cursor_type;
00838         typedef sort_helper::run_cursor2_cmp<block_type, prefetcher_type, value_cmp> run_cursor2_cmp_type;
00839         typedef loser_tree<run_cursor_type, run_cursor2_cmp_type> loser_tree_type;
00840         typedef stxxl::int64 diff_type;
00841         typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence;
00842         typedef typename std::vector<sequence>::size_type seqs_size_type;
00843 
00844     public:
00846         typedef typename sorted_runs_type::value_type value_type;
00847 
00848     private:
00849         sorted_runs_type sruns;
00850         value_cmp cmp;
00851         size_type elements_remaining;
00852 
00853         out_block_type * current_block;
00854         unsigned_type buffer_pos;
00855         value_type current_value;               // cache for the current value
00856 
00857         run_type consume_seq;
00858         int_type * prefetch_seq;
00859         prefetcher_type * prefetcher;
00860         loser_tree_type * losers;
00861 #if STXXL_PARALLEL_MULTIWAY_MERGE
00862         std::vector<sequence> * seqs;
00863         std::vector<block_type *> * buffers;
00864         diff_type num_currently_mergeable;
00865 #endif
00866 
00867 #if STXXL_CHECK_ORDER_IN_SORTS
00868         value_type last_element;
00869 #endif //STXXL_CHECK_ORDER_IN_SORTS
00870         
00872 
00873         void merge_recursively(unsigned_type memory_to_use);
00874 
00875         void deallocate_prefetcher()
00876         {
00877             if (prefetcher)
00878             {
00879                 delete losers;
00880 #if STXXL_PARALLEL_MULTIWAY_MERGE
00881                 delete seqs;
00882                 delete buffers;
00883 #endif
00884                 delete prefetcher;
00885                 delete[] prefetch_seq;
00886                 prefetcher = NULL;
00887             }
00888             // free blocks in runs , (or the user should do it?)
00889             sruns.deallocate_blocks();
00890         }
00891 
00892         void fill_current_block()
00893         {
                  // Merges the next batch of elements into current_block. Uses the
                  // parallel multiway merge when do_parallel_merge() is true and the
                  // loser tree otherwise. When elements_remaining no longer exceeds one
                  // output block, the merge helpers are released at the end.
00894             STXXL_VERBOSE1("fill_current_block");
00895             if (do_parallel_merge())
00896             {
00897 #if STXXL_PARALLEL_MULTIWAY_MERGE
00898 // begin of STL-style merging
00899                 diff_type rest = out_block_type::size;          // elements still to merge for this output block
00900 
00901                 do                                              // while rest > 0 and still elements available
00902                 {
00903                     if (num_currently_mergeable < rest)
00904                     {
                              // the current budget does not cover the rest of the block: recompute it
00905                         if (!prefetcher || prefetcher->empty())
00906                         {
00907                             // anything remaining is already in memory
00908                             num_currently_mergeable = elements_remaining;
00909                         }
00910                         else
00911                         {
                                  // count the in-memory elements that are <= the value stored in
                                  // consume_seq at the prefetcher's position
                                  // NOTE(review): presumably that is the first value of the next
                                  // block to arrive from disk -- confirm against block_prefetcher::pos()
00912                             num_currently_mergeable = sort_helper::count_elements_less_equal(
00913                                 *seqs, consume_seq[prefetcher->pos()].value, cmp);
00914                         }
00915                     }
00916 
00917                     diff_type output_size = STXXL_MIN(num_currently_mergeable, rest);     // at most rest elements
00918 
00919                     STXXL_VERBOSE1("before merge " << output_size);
00920 
                          // output goes to the still unfilled tail of the block, which starts at end() - rest
00921                     stxxl::parallel::multiway_merge((*seqs).begin(), (*seqs).end(), current_block->end() - rest, cmp, output_size);
00922                     // sequence iterators are progressed appropriately
00923 
00924                     rest -= output_size;
00925                     num_currently_mergeable -= output_size;
00926 
00927                     STXXL_VERBOSE1("after merge");
00928 
00929                     sort_helper::refill_or_remove_empty_sequences(*seqs, *buffers, *prefetcher);
00930                 } while (rest > 0 && (*seqs).size() > 0);
00931 
00932 #if STXXL_CHECK_ORDER_IN_SORTS
                      // debugging aid: report every out-of-order position of the block, then abort
00933                 if (!stxxl::is_sorted(current_block->begin(), current_block->end(), cmp))
00934                 {
00935                     for (value_type * i = current_block->begin() + 1; i != current_block->end(); ++i)
00936                         if (cmp(*i, *(i - 1)))
00937                         {
00938                             STXXL_VERBOSE1("Error at position " << (i - current_block->begin()));
00939                         }
00940                     assert(false);
00941                 }
00942 #endif //STXXL_CHECK_ORDER_IN_SORTS
00943 
00944 // end of STL-style merging
00945 #else
00946                 STXXL_THROW_UNREACHABLE();
00947 #endif //STXXL_PARALLEL_MULTIWAY_MERGE
00948             }
00949             else
00950             {
00951 // begin of native merging procedure
                      // writes one full block, or only elements_remaining elements if fewer are left
00952                 losers->multi_merge(current_block->elem, current_block->elem + STXXL_MIN<size_type>(out_block_type::size, elements_remaining));
00953 // end of native merging procedure
00954             }
00955             STXXL_VERBOSE1("current block filled");
00956 
                  // this was the last block that had to be merged: the helpers are no longer needed
00957             if (elements_remaining <= out_block_type::size)
00958                 deallocate_prefetcher();
00959         }
00960 
00961     public:
00966         basic_runs_merger(const sorted_runs_type & r, value_cmp c, unsigned_type memory_to_use) :
00967             sruns(r),
00968             cmp(c),
00969             elements_remaining(sruns.elements),
00970             current_block(NULL),
00971             buffer_pos(0),
00972             prefetch_seq(NULL),
00973             prefetcher(NULL),
00974             losers(NULL)
00975 #if STXXL_PARALLEL_MULTIWAY_MERGE
00976             , seqs(NULL),
00977             buffers(NULL),
00978             num_currently_mergeable(0)
00979 #endif
00980 #if STXXL_CHECK_ORDER_IN_SORTS
00981             , last_element(cmp.min_value())
00982 #endif //STXXL_CHECK_ORDER_IN_SORTS
00983         {
                  // Creates a merger over the sorted runs r, compared with c.
                  // All helper pointers start out as NULL; initialize() does the real
                  // set-up and is handed memory_to_use unchanged.
00984             initialize(r, memory_to_use);
00985         }
00986 
00987     protected:
00988         void initialize(const sorted_runs_type & r, unsigned_type memory_to_use)
00989         {
                  // Prepares the merger to deliver the elements of r in sorted order.
                  //
                  // Steps: copy r into sruns; return early for empty input; serve a
                  // small in-memory run directly from current_block; otherwise fall
                  // back to merge_recursively() if memory_to_use cannot hold one block
                  // per run plus the prefetch buffers; then build the prefetch schedule,
                  // the prefetcher and the merge structure, and fill the first output
                  // block.
                  //
                  // Throws bad_parameter if memory_to_use is too small even for the
                  // recursive merge.
00990             sort_helper::verify_sentinel_strict_weak_ordering(cmp);
00991 
00992             sruns = r;
00993             elements_remaining = r.elements;
00994 
00995             if (empty())
00996                 return;
00997 
00998             if (!sruns.small_run().empty())
00999             {
01000                 // we have a small input <= B, that is kept in the main memory
01001                 STXXL_VERBOSE1("basic_runs_merger: small input optimization, input length: " << elements_remaining);
01002                 assert(elements_remaining == size_type(sruns.small_run().size()));
01003                 assert(sruns.small_run().size() <= out_block_type::size);
01004                 current_block = new out_block_type;
01005                 std::copy(sruns.small_run().begin(), sruns.small_run().end(), current_block->begin());
01006                 current_value = current_block->elem[0];
01007                 buffer_pos = 1;
01008 
01009                 return;
01010             }
01011 
01012 #if STXXL_CHECK_ORDER_IN_SORTS
01013             assert(check_sorted_runs(r, cmp));
01014 #endif //STXXL_CHECK_ORDER_IN_SORTS
01015 
01016             disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
01017 
01018             int_type disks_number = config::get_instance()->disks_number();
01019             unsigned_type min_prefetch_buffers = 2 * disks_number;
                  // number of input blocks that fit once the output block has been set aside
01020             unsigned_type input_buffers = (memory_to_use > sizeof(out_block_type) ? memory_to_use - sizeof(out_block_type) : 0) / block_type::raw_size;
01021             unsigned_type nruns = sruns.runs.size();
01022 
01023             if (input_buffers < nruns + min_prefetch_buffers)
01024             {
01025                 // can not merge runs in one pass
01026                 // merge recursively:
01027                 STXXL_WARNMSG_RECURSIVE_SORT("The implementation of sort requires more than one merge pass, therefore for a better");
01028                 STXXL_WARNMSG_RECURSIVE_SORT("efficiency decrease block size of run storage (a parameter of the run_creator)");
01029                 STXXL_WARNMSG_RECURSIVE_SORT("or increase the amount memory dedicated to the merger.");
01030                 STXXL_WARNMSG_RECURSIVE_SORT("m=" << input_buffers << " nruns=" << nruns << " prefetch_blocks=" << min_prefetch_buffers);
01031 
01032                 // check whether we have enough memory to merge recursively
01033                 unsigned_type recursive_merge_buffers = memory_to_use / block_type::raw_size;
01034                 if (recursive_merge_buffers < 2 * min_prefetch_buffers + 1 + 2) {
01035                     // recursive merge uses min_prefetch_buffers for input buffering and min_prefetch_buffers output buffering
01036                     // as well as 1 current output block and at least 2 input blocks
01037                     STXXL_ERRMSG("There are only m=" << recursive_merge_buffers << " blocks available for recursive merging, but "
01038                                  << min_prefetch_buffers << "+" << min_prefetch_buffers << "+1 are needed read-ahead/write-back/output, and");
01039                     STXXL_ERRMSG("the merger requires memory to store at least two input blocks internally. Aborting.");
01040                     throw bad_parameter("basic_runs_merger::sort(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
01041                 }
01042 
01043                 merge_recursively(memory_to_use);
01044 
                      // merge_recursively() replaced sruns by fewer, longer runs
01045                 nruns = sruns.runs.size();
01046             }
01047 
01048             assert(nruns + min_prefetch_buffers <= input_buffers);
01049 
                  // total number of blocks over all runs
01050             unsigned_type i;
01051             unsigned_type prefetch_seq_size = 0;
01052             for (i = 0; i < nruns; ++i)
01053             {
01054                 prefetch_seq_size += sruns.runs[i].size();
01055             }
01056 
01057             consume_seq.resize(prefetch_seq_size);
01058             prefetch_seq = new int_type[prefetch_seq_size];
01059 
                  // concatenate the block entries of all runs into consume_seq ...
01060             typename run_type::iterator copy_start = consume_seq.begin();
01061             for (i = 0; i < nruns; ++i)
01062             {
01063                 copy_start = std::copy(
01064                     sruns.runs[i].begin(),
01065                     sruns.runs[i].end(),
01066                     copy_start);
01067             }
01068 
                  // ... and order them with trigger_entry_cmp
                  // NOTE(review): presumably this yields the order in which the merge
                  // will need the blocks -- confirm against trigger_entry_cmp
01069             std::stable_sort(consume_seq.begin(), consume_seq.end(),
01070                              sort_helper::trigger_entry_cmp<trigger_entry_type, value_cmp>(cmp) _STXXL_SORT_TRIGGER_FORCE_SEQUENTIAL);
01071 
01072             const unsigned_type n_prefetch_buffers = STXXL_MAX(min_prefetch_buffers, input_buffers - nruns);
01073 
01074 #if STXXL_SORT_OPTIMAL_PREFETCHING
01075             // heuristic
01076             const int_type n_opt_prefetch_buffers = min_prefetch_buffers + (3 * (n_prefetch_buffers - min_prefetch_buffers)) / 10;
01077 
01078             compute_prefetch_schedule(
01079                 consume_seq,
01080                 prefetch_seq,
01081                 n_opt_prefetch_buffers,
01082                 disks_number);
01083 #else
                  // no schedule computation: prefetch_seq is the identity mapping
01084             for (i = 0; i < prefetch_seq_size; ++i)
01085                 prefetch_seq[i] = i;
01086 #endif //STXXL_SORT_OPTIMAL_PREFETCHING
01087 
01088             prefetcher = new prefetcher_type(
01089                 consume_seq.begin(),
01090                 consume_seq.end(),
01091                 prefetch_seq,
01092                 STXXL_MIN(nruns + n_prefetch_buffers, prefetch_seq_size));
01093 
01094             if (do_parallel_merge())
01095             {
01096 #if STXXL_PARALLEL_MULTIWAY_MERGE
01097 // begin of STL-style merging
01098                 seqs = new std::vector<sequence>(nruns);
01099                 buffers = new std::vector<block_type *>(nruns);
01100 
01101                 for (unsigned_type i = 0; i < nruns; ++i)                                       //initialize sequences
01102                 {
01103                     (*buffers)[i] = prefetcher->pull_block();                                   //get first block of each run
01104                     (*seqs)[i] = std::make_pair((*buffers)[i]->begin(), (*buffers)[i]->end());  //this memory location stays the same, only the data is exchanged
01105                 }
01106 // end of STL-style merging
01107 #else
01108                 STXXL_THROW_UNREACHABLE();
01109 #endif //STXXL_PARALLEL_MULTIWAY_MERGE
01110             }
01111             else
01112             {
01113 // begin of native merging procedure
01114                 losers = new loser_tree_type(prefetcher, nruns, run_cursor2_cmp_type(cmp));
01115 // end of native merging procedure
01116             }
01117 
01118             current_block = new out_block_type;
01119             fill_current_block();
01120 
                  // expose the first merged element; buffer_pos refers to the one after it
01121             current_value = current_block->elem[0];
01122             buffer_pos = 1;
01123         }
01124 
01125     public:
01127         bool empty() const
01128         {
01129             return elements_remaining == 0;
01130         }
01131 
01133         const value_type & operator * () const
01134         {
01135             assert(!empty());
01136             return current_value;
01137         }
01138 
01140         const value_type * operator -> () const
01141         {
01142             return &(operator * ());
01143         }
01144 
01146         basic_runs_merger & operator ++ ()  // preincrement operator
01147         {
01148             assert(!empty());
01149 
01150             --elements_remaining;
01151 
01152             if (buffer_pos != out_block_type::size)
01153             {
01154                 current_value = current_block->elem[buffer_pos];
01155                 ++buffer_pos;
01156             }
01157             else
01158             {
01159                 if (!empty())
01160                 {
01161                     fill_current_block();
01162 
01163 #if STXXL_CHECK_ORDER_IN_SORTS
01164                     assert(stxxl::is_sorted(current_block->elem, current_block->elem + current_block->size, cmp));
01165                     assert(!cmp(current_block->elem[0], current_value));
01166 #endif //STXXL_CHECK_ORDER_IN_SORTS
01167                     current_value = current_block->elem[0];
01168                     buffer_pos = 1;
01169                 }
01170             }
01171 
01172 #if STXXL_CHECK_ORDER_IN_SORTS
01173             if (!empty())
01174             {
01175                 assert(!cmp(current_value, last_element));
01176                 last_element = current_value;
01177             }
01178 #endif //STXXL_CHECK_ORDER_IN_SORTS
01179 
01180             return *this;
01181         }
01182 
01185         virtual ~basic_runs_merger()
01186         {
                  // Releases the merge helpers and the blocks of the runs first,
                  // then the output block owned by this object.
01187             deallocate_prefetcher();
01188             delete current_block;
01189         }
01190     };
01191 
01192 
01193     template <class RunsType_, class Cmp_, class AllocStr_>
01194     void basic_runs_merger<RunsType_, Cmp_, AllocStr_>::merge_recursively(unsigned_type memory_to_use)
01195     {
              // Reduces the number of runs in sruns to at most max_arity so that
              // initialize() can finish with a single merge pass. Each phase merges
              // groups of up to merge_factor runs into new, longer runs: every group
              // is merged by a nested basic_runs_merger and written out through a
              // buf_ostream. sruns is replaced by the new runs after each phase.
01196         block_manager * bm = block_manager::get_instance();
01197         unsigned_type ndisks = config::get_instance()->disks_number();
01198         unsigned_type nwrite_buffers = 2 * ndisks;
01199         unsigned_type memory_for_write_buffers = nwrite_buffers * sizeof(block_type);
01200 
01201         // memory consumption of the recursive merger (uses block_type as out_block_type)
01202         unsigned_type recursive_merger_memory_prefetch_buffers = 2 * ndisks * sizeof(block_type);
01203         unsigned_type recursive_merger_memory_out_block = sizeof(block_type);
01204         unsigned_type memory_for_buffers = memory_for_write_buffers
01205                                            + recursive_merger_memory_prefetch_buffers
01206                                            + recursive_merger_memory_out_block;
01207         // maximum arity in the recursive merger
01208         unsigned_type max_arity = (memory_to_use > memory_for_buffers ? memory_to_use - memory_for_buffers : 0) / block_type::raw_size;
01209 
01210         unsigned_type nruns = sruns.runs.size();
              // computed once from the initial number of runs and reused in every phase
01211         const unsigned_type merge_factor = optimal_merge_factor(nruns, max_arity);
01212         assert(merge_factor > 1);
01213         assert(merge_factor <= max_arity);
01214         while (nruns > max_arity)
01215         {
01216             unsigned_type new_nruns = div_ceil(nruns, merge_factor);
01217             STXXL_VERBOSE("Starting new merge phase: nruns: " << nruns <<
01218                           " opt_merge_factor: " << merge_factor << " max_arity: " << max_arity << " new_nruns: " << new_nruns);
01219 
01220             sorted_runs_type new_runs;
01221             new_runs.runs.resize(new_nruns);
01222             new_runs.runs_sizes.resize(new_nruns);
01223             new_runs.elements = sruns.elements;
01224 
01225             unsigned_type runs_left = nruns;
01226             unsigned_type cur_out_run = 0;
01227             unsigned_type elements_in_new_run = 0;
01228 
                  // first pass: determine element count and block count of every output run
01229             while (runs_left > 0)
01230             {
01231                 int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
01232                 elements_in_new_run = 0;
01233                 for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); ++i)
01234                 {
01235                     elements_in_new_run += sruns.runs_sizes[i];
01236                 }
01237                 const unsigned_type blocks_in_new_run1 = div_ceil(elements_in_new_run, block_type::size);
01238 
01239                 new_runs.runs_sizes[cur_out_run] = elements_in_new_run;
01240                 // allocate run
01241                 new_runs.runs[cur_out_run++].resize(blocks_in_new_run1);
01242                 runs_left -= runs2merge;
01243             }
01244 
01245             // allocate blocks for the new runs
01246             for (unsigned_type i = 0; i < new_runs.runs.size(); ++i)
01247                 bm->new_blocks(alloc_strategy(), make_bid_iterator(new_runs.runs[i].begin()), make_bid_iterator(new_runs.runs[i].end()));
01248 
01249             // merge all
01250             runs_left = nruns;
01251             cur_out_run = 0;
01252             size_type elements_left = sruns.elements;
01253 
                  // second pass: walk over the same groups again and produce the output runs
01254             while (runs_left > 0)
01255             {
01256                 unsigned_type runs2merge = STXXL_MIN(runs_left, merge_factor);
01257                 STXXL_VERBOSE("Merging " << runs2merge << " runs");
01258 
                      // cur_runs describes just the runs of the current group
01259                 sorted_runs_type cur_runs;
01260                 cur_runs.runs.resize(runs2merge);
01261                 cur_runs.runs_sizes.resize(runs2merge);
01262 
01263                 std::copy(sruns.runs.begin() + nruns - runs_left,
01264                           sruns.runs.begin() + nruns - runs_left + runs2merge,
01265                           cur_runs.runs.begin());
01266                 std::copy(sruns.runs_sizes.begin() + nruns - runs_left,
01267                           sruns.runs_sizes.begin() + nruns - runs_left + runs2merge,
01268                           cur_runs.runs_sizes.begin());
01269 
01270                 runs_left -= runs2merge;
01271                 /*
01272                    cur_runs.elements = (runs_left)?
01273                    (new_runs.runs[cur_out_run].size()*block_type::size):
01274                    (elements_left);
01275                  */
01276                 cur_runs.elements = new_runs.runs_sizes[cur_out_run];
01277                 elements_left -= cur_runs.elements;
01278 
01279                 if (runs2merge > 1)
01280                 {
                          // the nested merger gets the memory that is left after the write buffers
01281                     basic_runs_merger<RunsType_, Cmp_, AllocStr_> merger(cur_runs, cmp, memory_to_use - memory_for_write_buffers);
01282 
01283                     {   // make sure everything is being destroyed in right time
01284                         buf_ostream<block_type, typename run_type::iterator> out(
01285                             new_runs.runs[cur_out_run].begin(),
01286                             nwrite_buffers);
01287 
01288                         size_type cnt = 0;
01289                         const size_type cnt_max = cur_runs.elements;
01290 
01291                         while (cnt != cnt_max)
01292                         {
01293                             *out = *merger;
                                  // the first element of each block is also stored in the run's entry for that block
01294                             if ((cnt % block_type::size) == 0) // have to write the trigger value
01295                                 new_runs.runs[cur_out_run][cnt / size_type(block_type::size)].value = *merger;
01296 
01297                             ++cnt;
01298                             ++out;
01299                             ++merger;
01300                         }
01301                         assert(merger.empty());
01302 
                              // pad the last block of the new run with max_value sentinels
01303                         while (cnt % block_type::size)
01304                         {
01305                             *out = cmp.max_value();
01306                             ++out;
01307                             ++cnt;
01308                         }
01309                     }
01310                 }
01311                 else
01312                 {
                          // a single leftover run needs no merging: give back the blocks that
                          // were allocated for the last output run and take over the entries
                          // and size of the existing run instead
01313                     bm->delete_blocks(
01314                         make_bid_iterator(new_runs.runs.back().begin()),
01315                         make_bid_iterator(new_runs.runs.back().end()));
01316 
01317                     assert(cur_runs.runs.size() == 1);
01318 
01319                     std::copy(cur_runs.runs.front().begin(),
01320                               cur_runs.runs.front().end(),
01321                               new_runs.runs.back().begin());
01322                     new_runs.runs_sizes.back() = cur_runs.runs_sizes.front();
01323                 }
01324 
01325                 ++cur_out_run;
01326             }
01327             assert(elements_left == 0);
01328 
                  // the next phase (or the caller) continues with the merged runs
01329             nruns = new_nruns;
01330             sruns = new_runs;
01331         }
01332     }
01333 
01334 
01342     template <class RunsType_,
01343               class Cmp_,
01344               class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
01345     class runs_merger : public basic_runs_merger<RunsType_, Cmp_, AllocStr_>
01346     {
              // Publicly derived from basic_runs_merger with the same template
              // arguments. Adds only a constructor that forwards to the base class;
              // the stream interface is inherited unchanged.
01347     private:
01348         typedef RunsType_ sorted_runs_type;
01349         typedef basic_runs_merger<RunsType_, Cmp_, AllocStr_> base;
01350         typedef typename base::value_cmp value_cmp;
01351         typedef typename base::block_type block_type;
01352 
01353     public:
              // r: sorted runs to merge, c: comparison object,
              // memory_to_use: passed on unchanged to the base class constructor
01358         runs_merger(const sorted_runs_type & r, value_cmp c, unsigned_type memory_to_use) :
01359             base(r, c, memory_to_use)
01360         {
01361         }
01362     };
01363 
01364 
01366     //     SORT                                                           //
01368 
01377     template <class Input_,
01378               class Cmp_,
01379               unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
01380               class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY,
01381               class runs_creator_type = runs_creator<Input_, Cmp_, BlockSize_, AllocStr_> >
01382     class sort : public noncopyable
01383     {
01384         typedef typename runs_creator_type::sorted_runs_type sorted_runs_type;
01385         typedef runs_merger<sorted_runs_type, Cmp_, AllocStr_> runs_merger_type;
01386 
01387         runs_creator_type creator;
01388         runs_merger_type merger;
01389 
01390     public:
01392         typedef typename Input_::value_type value_type;
01393 
01398         sort(Input_ & in, Cmp_ c, unsigned_type memory_to_use) :
01399             creator(in, c, memory_to_use),
01400             merger(creator.result(), c, memory_to_use)
01401         {
01402             sort_helper::verify_sentinel_strict_weak_ordering(c);
01403         }
01404 
01410         sort(Input_ & in, Cmp_ c, unsigned_type memory_to_use_rc, unsigned_type memory_to_use_m) :
01411             creator(in, c, memory_to_use_rc),
01412             merger(creator.result(), c, memory_to_use_m)
01413         {
01414             sort_helper::verify_sentinel_strict_weak_ordering(c);
01415         }
01416 
01417 
01419         bool empty() const
01420         {
01421             return merger.empty();
01422         }
01423 
01425         const value_type & operator * () const
01426         {
01427             assert(!empty());
01428             return *merger;
01429         }
01430 
01431         const value_type * operator -> () const
01432         {
01433             assert(!empty());
01434             return merger.operator -> ();
01435         }
01436 
01438         sort & operator ++ ()
01439         {
01440             ++merger;
01441             return *this;
01442         }
01443     };
01444 
01450     template <
01451         class ValueType_,
01452         unsigned BlockSize_>
01453     class compute_sorted_runs_type
01454     {
01455         typedef ValueType_ value_type;
01456         typedef BID<BlockSize_> bid_type;
01457         typedef sort_helper::trigger_entry<bid_type, value_type> trigger_entry_type;
01458 
01459     public:
01460         typedef sorted_runs<trigger_entry_type> result;
01461     };
01462 
01464 }
01465 
01468 
01470 
01479 template <unsigned BlockSize,
01480           class RandomAccessIterator,
01481           class CmpType,
01482           class AllocStr>
01483 void sort(RandomAccessIterator begin,
01484           RandomAccessIterator end,
01485           CmpType cmp,
01486           unsigned_type MemSize,
01487           AllocStr AS)
01488 {
01489     STXXL_UNUSED(AS);
01490 #ifdef BOOST_MSVC
01491     typedef typename streamify_traits<RandomAccessIterator>::stream_type InputType;
01492 #else
01493     typedef __typeof__(stream::streamify(begin, end)) InputType;
01494 #endif //BOOST_MSVC
01495     InputType Input(begin, end);
01496     typedef stream::sort<InputType, CmpType, BlockSize, AllocStr> sorter_type;
01497     sorter_type Sort(Input, cmp, MemSize);
01498     stream::materialize(Sort, begin);
01499 }
01500 
01502 
01503 __STXXL_END_NAMESPACE
01504 
01505 #endif // !STXXL_SORT_STREAM_HEADER
01506 // vim: et:ts=4:sw=4

Generated on Sun Oct 17 2010 06:13:44 for Stxxl by  doxygen 1.7.1