• Main Page
  • Related Pages
  • Modules
  • Namespaces
  • Classes
  • Files
  • Examples
  • File List

sort.h

00001 /***************************************************************************
00002  *  include/stxxl/bits/algo/sort.h
00003  *
00004  *  Part of the STXXL. See http://stxxl.sourceforge.net
00005  *
00006  *  Copyright (C) 2002-2003 Roman Dementiev <dementiev@mpi-sb.mpg.de>
00007  *  Copyright (C) 2006 Johannes Singler <singler@ira.uka.de>
00008  *  Copyright (C) 2008 Andreas Beckmann <beckmann@cs.uni-frankfurt.de>
00009  *
00010  *  Distributed under the Boost Software License, Version 1.0.
00011  *  (See accompanying file LICENSE_1_0.txt or copy at
00012  *  http://www.boost.org/LICENSE_1_0.txt)
00013  **************************************************************************/
00014 
00015 #ifndef STXXL_SORT_HEADER
00016 #define STXXL_SORT_HEADER
00017 
00018 #include <functional>
00019 
00020 #include <stxxl/bits/mng/mng.h>
00021 #include <stxxl/bits/common/rand.h>
00022 #include <stxxl/bits/mng/adaptor.h>
00023 #include <stxxl/bits/common/simple_vector.h>
00024 #include <stxxl/bits/common/switch.h>
00025 #include <stxxl/bits/common/settings.h>
00026 #include <stxxl/bits/mng/block_alloc_interleaved.h>
00027 #include <stxxl/bits/algo/sort_base.h>
00028 #include <stxxl/bits/algo/sort_helper.h>
00029 #include <stxxl/bits/algo/intksort.h>
00030 #include <stxxl/bits/algo/adaptor.h>
00031 #include <stxxl/bits/algo/async_schedule.h>
00032 #include <stxxl/bits/mng/block_prefetcher.h>
00033 #include <stxxl/bits/mng/buf_writer.h>
00034 #include <stxxl/bits/algo/run_cursor.h>
00035 #include <stxxl/bits/algo/losertree.h>
00036 #include <stxxl/bits/algo/inmemsort.h>
00037 #include <stxxl/bits/parallel.h>
00038 #include <stxxl/bits/common/is_sorted.h>
00039 
00040 
00041 __STXXL_BEGIN_NAMESPACE
00042 
00045 
00046 
00049 namespace sort_local
00050 {
00051     template <typename block_type, typename bid_type>
00052     struct read_next_after_write_completed
00053     {
00054         block_type * block;
00055         bid_type bid;
00056         request_ptr * req;
00057         void operator () (request * /*completed_req*/)
00058         {
00059             *req = block->read(bid);
00060         }
00061     };
00062 
00063 
00064     template <
00065         typename block_type,
00066         typename run_type,
00067         typename input_bid_iterator,
00068         typename value_cmp>
00069     void
00070     create_runs(
00071         input_bid_iterator it,
00072         run_type ** runs,
00073         int_type nruns,
00074         int_type _m,
00075         value_cmp cmp)
00076     {
00077         typedef typename block_type::value_type type;
00078         typedef typename block_type::bid_type bid_type;
00079         STXXL_VERBOSE1("stxxl::create_runs nruns=" << nruns << " m=" << _m);
00080 
00081         int_type m2 = _m / 2;
00082         block_manager * bm = block_manager::get_instance();
00083         block_type * Blocks1 = new block_type[m2];
00084         block_type * Blocks2 = new block_type[m2];
00085         bid_type * bids1 = new bid_type[m2];
00086         bid_type * bids2 = new bid_type[m2];
00087         request_ptr * read_reqs1 = new request_ptr[m2];
00088         request_ptr * read_reqs2 = new request_ptr[m2];
00089         request_ptr * write_reqs = new request_ptr[m2];
00090         read_next_after_write_completed<block_type, bid_type> * next_run_reads =
00091             new read_next_after_write_completed<block_type, bid_type>[m2];
00092 
00093         disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
00094 
00095         int_type i;
00096         int_type run_size = 0;
00097 
00098         assert(nruns >= 2);
00099 
00100         run_size = runs[0]->size();
00101         assert(run_size == m2);
00102 
00103         for (i = 0; i < run_size; ++i)
00104         {
00105             STXXL_VERBOSE1("stxxl::create_runs posting read " << long(Blocks1[i].elem));
00106             bids1[i] = *(it++);
00107             read_reqs1[i] = Blocks1[i].read(bids1[i]);
00108         }
00109 
00110         run_size = runs[1]->size();
00111 
00112         for (i = 0; i < run_size; ++i)
00113         {
00114             STXXL_VERBOSE1("stxxl::create_runs posting read " << long(Blocks2[i].elem));
00115             bids2[i] = *(it++);
00116             read_reqs2[i] = Blocks2[i].read(bids2[i]);
00117         }
00118 
00119         for (int_type k = 0; k < nruns - 1; ++k)
00120         {
00121             run_type * run = runs[k];
00122             run_size = run->size();
00123             assert(run_size == m2);
00124             #ifndef NDEBUG
00125             int_type next_run_size = runs[k + 1]->size();
00126             #endif
00127             assert((next_run_size == m2) || (next_run_size <= m2 && k == nruns - 2));
00128 
00129             STXXL_VERBOSE1("stxxl::create_runs start waiting read_reqs1");
00130             wait_all(read_reqs1, run_size);
00131             STXXL_VERBOSE1("stxxl::create_runs finish waiting read_reqs1");
00132             for (i = 0; i < run_size; ++i)
00133                 bm->delete_block(bids1[i]);
00134 
00135             std::sort(make_element_iterator(Blocks1, 0),
00136                       make_element_iterator(Blocks1, run_size * block_type::size),
00137                       cmp);
00138 
00139             STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs");
00140             if (k > 0)
00141                 wait_all(write_reqs, m2);
00142             STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs");
00143 
00144             int_type runplus2size = (k < nruns - 2) ? runs[k + 2]->size() : 0;
00145             for (i = 0; i < m2; ++i)
00146             {
00147                 STXXL_VERBOSE1("stxxl::create_runs posting write " << long(Blocks1[i].elem));
00148                 (*run)[i].value = Blocks1[i][0];
00149                 if (i >= runplus2size) {
00150                     write_reqs[i] = Blocks1[i].write((*run)[i].bid);
00151                 }
00152                 else
00153                 {
00154                     next_run_reads[i].block = Blocks1 + i;
00155                     next_run_reads[i].req = read_reqs1 + i;
00156                     bids1[i] = next_run_reads[i].bid = *(it++);
00157                     write_reqs[i] = Blocks1[i].write((*run)[i].bid, next_run_reads[i]);
00158                 }
00159             }
00160             std::swap(Blocks1, Blocks2);
00161             std::swap(bids1, bids2);
00162             std::swap(read_reqs1, read_reqs2);
00163         }
00164 
00165         run_type * run = runs[nruns - 1];
00166         run_size = run->size();
00167         STXXL_VERBOSE1("stxxl::create_runs start waiting read_reqs1");
00168         wait_all(read_reqs1, run_size);
00169         STXXL_VERBOSE1("stxxl::create_runs finish waiting read_reqs1");
00170         for (i = 0; i < run_size; ++i)
00171             bm->delete_block(bids1[i]);
00172 
00173         std::sort(make_element_iterator(Blocks1, 0),
00174                   make_element_iterator(Blocks1, run_size * block_type::size),
00175                   cmp);
00176 
00177         STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs");
00178         wait_all(write_reqs, m2);
00179         STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs");
00180 
00181         for (i = 0; i < run_size; ++i)
00182         {
00183             STXXL_VERBOSE1("stxxl::create_runs posting write " << long(Blocks1[i].elem));
00184             (*run)[i].value = Blocks1[i][0];
00185             write_reqs[i] = Blocks1[i].write((*run)[i].bid);
00186         }
00187 
00188         STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs");
00189         wait_all(write_reqs, run_size);
00190         STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs");
00191 
00192         delete[] Blocks1;
00193         delete[] Blocks2;
00194         delete[] bids1;
00195         delete[] bids2;
00196         delete[] read_reqs1;
00197         delete[] read_reqs2;
00198         delete[] write_reqs;
00199         delete[] next_run_reads;
00200     }
00201 
00202 
00203     template <typename block_type, typename run_type, typename value_cmp>
00204     bool check_sorted_runs(run_type ** runs,
00205                            unsigned_type nruns,
00206                            unsigned_type m,
00207                            value_cmp cmp)
00208     {
00209         typedef typename block_type::value_type value_type;
00210 
00211         STXXL_MSG("check_sorted_runs  Runs: " << nruns);
00212         unsigned_type irun = 0;
00213         for (irun = 0; irun < nruns; ++irun)
00214         {
00215             const unsigned_type nblocks_per_run = runs[irun]->size();
00216             unsigned_type blocks_left = nblocks_per_run;
00217             block_type * blocks = new block_type[m];
00218             request_ptr * reqs = new request_ptr[m];
00219             value_type last = cmp.min_value();
00220 
00221             for (unsigned_type off = 0; off < nblocks_per_run; off += m)
00222             {
00223                 const unsigned_type nblocks = STXXL_MIN(blocks_left, m);
00224                 const unsigned_type nelements = nblocks * block_type::size;
00225                 blocks_left -= nblocks;
00226 
00227                 for (unsigned_type j = 0; j < nblocks; ++j)
00228                 {
00229                     reqs[j] = blocks[j].read((*runs[irun])[j + off].bid);
00230                 }
00231                 wait_all(reqs, reqs + nblocks);
00232 
00233                 if (off && cmp(blocks[0][0], last))
00234                 {
00235                     STXXL_MSG("check_sorted_runs  wrong first value in the run " << irun);
00236                     STXXL_MSG(" first value: " << blocks[0][0]);
00237                     STXXL_MSG(" last  value: " << last);
00238                     for (unsigned_type k = 0; k < block_type::size; ++k)
00239                         STXXL_MSG("Element " << k << " in the block is :" << blocks[0][k]);
00240 
00241                     return false;
00242                 }
00243 
00244                 for (unsigned_type j = 0; j < nblocks; ++j)
00245                 {
00246                     if (!(blocks[j][0] == (*runs[irun])[j + off].value))
00247                     {
00248                         STXXL_MSG("check_sorted_runs  wrong trigger in the run " << irun << " block " << (j + off));
00249                         STXXL_MSG("                   trigger value: " << (*runs[irun])[j + off].value);
00250                         STXXL_MSG("Data in the block:");
00251                         for (unsigned_type k = 0; k < block_type::size; ++k)
00252                             STXXL_MSG("Element " << k << " in the block is :" << blocks[j][k]);
00253 
00254                         STXXL_MSG("BIDS:");
00255                         for (unsigned_type k = 0; k < nblocks; ++k)
00256                         {
00257                             if (k == j)
00258                                 STXXL_MSG("Bad one comes next.");
00259                             STXXL_MSG("BID " << (k + off) << " is: " << ((*runs[irun])[k + off].bid));
00260                         }
00261 
00262                         return false;
00263                     }
00264                 }
00265                 if (!stxxl::is_sorted(make_element_iterator(blocks, 0),
00266                                       make_element_iterator(blocks, nelements),
00267                                       cmp))
00268                 {
00269                     STXXL_MSG("check_sorted_runs  wrong order in the run " << irun);
00270                     STXXL_MSG("Data in blocks:");
00271                     for (unsigned_type j = 0; j < nblocks; ++j)
00272                     {
00273                         for (unsigned_type k = 0; k < block_type::size; ++k)
00274                             STXXL_MSG("     Element " << k << " in block " << (j + off) << " is :" << blocks[j][k]);
00275                     }
00276                     STXXL_MSG("BIDS:");
00277                     for (unsigned_type k = 0; k < nblocks; ++k)
00278                     {
00279                         STXXL_MSG("BID " << (k + off) << " is: " << ((*runs[irun])[k + off].bid));
00280                     }
00281 
00282                     return false;
00283                 }
00284 
00285                 last = blocks[nblocks - 1][block_type::size - 1];
00286             }
00287 
00288             assert(blocks_left == 0);
00289             delete[] reqs;
00290             delete[] blocks;
00291         }
00292 
00293         return true;
00294     }
00295 
00296 
00297     template <typename block_type, typename run_type, typename value_cmp>
00298     void merge_runs(run_type ** in_runs, int_type nruns, run_type * out_run, unsigned_type _m, value_cmp cmp)
00299     {
00300         typedef typename block_type::value_type value_type;
00301         typedef typename run_type::value_type trigger_entry_type;
00302         typedef block_prefetcher<block_type, typename run_type::iterator> prefetcher_type;
00303         typedef run_cursor2<block_type, prefetcher_type> run_cursor_type;
00304         typedef sort_helper::run_cursor2_cmp<block_type, prefetcher_type, value_cmp> run_cursor2_cmp_type;
00305 
00306         run_type consume_seq(out_run->size());
00307 
00308         int_type * prefetch_seq = new int_type[out_run->size()];
00309 
00310         typename run_type::iterator copy_start = consume_seq.begin();
00311         for (int_type i = 0; i < nruns; i++)
00312         {
00313             // TODO: try to avoid copy
00314             copy_start = std::copy(
00315                 in_runs[i]->begin(),
00316                 in_runs[i]->end(),
00317                 copy_start);
00318         }
00319 
00320         std::stable_sort(consume_seq.begin(), consume_seq.end(),
00321                          sort_helper::trigger_entry_cmp<trigger_entry_type, value_cmp>(cmp) _STXXL_SORT_TRIGGER_FORCE_SEQUENTIAL);
00322 
00323         int_type disks_number = config::get_instance()->disks_number();
00324 
00325 #ifdef PLAY_WITH_OPT_PREF
00326         const int_type n_write_buffers = 4 * disks_number;
00327 #else
00328         const int_type n_prefetch_buffers = STXXL_MAX(2 * disks_number, (3 * (int_type(_m) - nruns) / 4));
00329         const int_type n_write_buffers = STXXL_MAX(2 * disks_number, int_type(_m) - nruns - n_prefetch_buffers);
00330  #if STXXL_SORT_OPTIMAL_PREFETCHING
00331         // heuristic
00332         const int_type n_opt_prefetch_buffers = 2 * disks_number + (3 * (n_prefetch_buffers - 2 * disks_number)) / 10;
00333  #endif
00334 #endif
00335 
00336 #if STXXL_SORT_OPTIMAL_PREFETCHING
00337         compute_prefetch_schedule(
00338             consume_seq,
00339             prefetch_seq,
00340             n_opt_prefetch_buffers,
00341             disks_number);
00342 #else
00343         for (unsigned_type i = 0; i < out_run->size(); i++)
00344             prefetch_seq[i] = i;
00345 
00346 #endif
00347 
00348         prefetcher_type prefetcher(consume_seq.begin(),
00349                                    consume_seq.end(),
00350                                    prefetch_seq,
00351                                    nruns + n_prefetch_buffers);
00352 
00353         buffered_writer<block_type> writer(n_write_buffers, n_write_buffers / 2);
00354 
00355         int_type out_run_size = out_run->size();
00356 
00357         block_type * out_buffer = writer.get_free_block();
00358 
00359 //If parallelism is activated, one can still fall back to the
00360 //native merge routine by setting stxxl::SETTINGS::native_merge= true, //otherwise, it is used anyway.
00361 
00362         if (do_parallel_merge())
00363         {
00364 #if STXXL_PARALLEL_MULTIWAY_MERGE
00365 
00366 // begin of STL-style merging
00367 
00368             typedef stxxl::int64 diff_type;
00369             typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence;
00370             typedef typename std::vector<sequence>::size_type seqs_size_type;
00371             std::vector<sequence> seqs(nruns);
00372             std::vector<block_type *> buffers(nruns);
00373 
00374             for (int_type i = 0; i < nruns; i++)                // initialize sequences
00375             {
00376                 buffers[i] = prefetcher.pull_block();           // get first block of each run
00377                 seqs[i] = std::make_pair(buffers[i]->begin(), buffers[i]->end());
00378                 // this memory location stays the same, only the data is exchanged
00379             }
00380 
00381  #if STXXL_CHECK_ORDER_IN_SORTS
00382             value_type last_elem = cmp.min_value();
00383  #endif
00384             diff_type num_currently_mergeable = 0;
00385 
00386             for (int_type j = 0; j < out_run_size; ++j)                 // for the whole output run, out_run_size is in blocks
00387             {
00388                 diff_type rest = block_type::size;                      // elements still to merge for this output block
00389 
00390                 STXXL_VERBOSE1("output block " << j);
00391                 do {
00392                     if (num_currently_mergeable < rest)
00393                     {
00394                         if (prefetcher.empty())
00395                         {
00396                             // anything remaining is already in memory
00397                             num_currently_mergeable = (out_run_size - j) * block_type::size
00398                                                       - (block_type::size - rest);
00399                         }
00400                         else
00401                         {
00402                             num_currently_mergeable = sort_helper::count_elements_less_equal(
00403                                 seqs, consume_seq[prefetcher.pos()].value, cmp);
00404                         }
00405                     }
00406 
00407                     diff_type output_size = STXXL_MIN(num_currently_mergeable, rest);   // at most rest elements
00408 
00409                     STXXL_VERBOSE1("before merge " << output_size);
00410 
00411                     stxxl::parallel::multiway_merge(seqs.begin(), seqs.end(), out_buffer->end() - rest, cmp, output_size);
00412                     // sequence iterators are progressed appropriately
00413 
00414                     rest -= output_size;
00415                     num_currently_mergeable -= output_size;
00416 
00417                     STXXL_VERBOSE1("after merge");
00418 
00419                     sort_helper::refill_or_remove_empty_sequences(seqs, buffers, prefetcher);
00420                 } while (rest > 0 && seqs.size() > 0);
00421 
00422  #if STXXL_CHECK_ORDER_IN_SORTS
00423                 if (!stxxl::is_sorted(out_buffer->begin(), out_buffer->end(), cmp))
00424                 {
00425                     for (value_type * i = out_buffer->begin() + 1; i != out_buffer->end(); i++)
00426                         if (cmp(*i, *(i - 1)))
00427                         {
00428                             STXXL_VERBOSE1("Error at position " << (i - out_buffer->begin()));
00429                         }
00430                     assert(false);
00431                 }
00432 
00433                 if (j > 0) // do not check in first iteration
00434                     assert(cmp((*out_buffer)[0], last_elem) == false);
00435 
00436                 last_elem = (*out_buffer)[block_type::size - 1];
00437  #endif
00438 
00439                 (*out_run)[j].value = (*out_buffer)[0];                              // save smallest value
00440 
00441                 out_buffer = writer.write(out_buffer, (*out_run)[j].bid);
00442             }
00443 
00444 // end of STL-style merging
00445 
00446 #else
00447             STXXL_THROW_UNREACHABLE();
00448 #endif
00449         }
00450         else
00451         {
00452 // begin of native merging procedure
00453 
00454             loser_tree<run_cursor_type, run_cursor2_cmp_type>
00455             losers(&prefetcher, nruns, run_cursor2_cmp_type(cmp));
00456 
00457 #if STXXL_CHECK_ORDER_IN_SORTS
00458             value_type last_elem = cmp.min_value();
00459 #endif
00460 
00461             for (int_type i = 0; i < out_run_size; ++i)
00462             {
00463                 losers.multi_merge(out_buffer->elem, out_buffer->elem + block_type::size);
00464                 (*out_run)[i].value = *(out_buffer->elem);
00465 
00466 #if STXXL_CHECK_ORDER_IN_SORTS
00467                 assert(stxxl::is_sorted(
00468                            out_buffer->begin(),
00469                            out_buffer->end(),
00470                            cmp));
00471 
00472                 if (i)
00473                     assert(cmp(*(out_buffer->elem), last_elem) == false);
00474 
00475                 last_elem = (*out_buffer).elem[block_type::size - 1];
00476 #endif
00477 
00478                 out_buffer = writer.write(out_buffer, (*out_run)[i].bid);
00479             }
00480 
00481 // end of native merging procedure
00482         }
00483 
00484         delete[] prefetch_seq;
00485 
00486         block_manager * bm = block_manager::get_instance();
00487         for (int_type i = 0; i < nruns; ++i)
00488         {
00489             unsigned_type sz = in_runs[i]->size();
00490             for (unsigned_type j = 0; j < sz; ++j)
00491                 bm->delete_block((*in_runs[i])[j].bid);
00492 
00493 
00494             delete in_runs[i];
00495         }
00496     }
00497 
00498 
00499     template <typename block_type,
00500               typename alloc_strategy,
00501               typename input_bid_iterator,
00502               typename value_cmp>
00503     simple_vector<sort_helper::trigger_entry<block_type> > *
00504     sort_blocks(input_bid_iterator input_bids,
00505                 unsigned_type _n,
00506                 unsigned_type _m,
00507                 value_cmp cmp
00508                 )
00509     {
00510         typedef typename block_type::value_type type;
00511         typedef typename block_type::bid_type bid_type;
00512         typedef sort_helper::trigger_entry<block_type> trigger_entry_type;
00513         typedef simple_vector<trigger_entry_type> run_type;
00514         typedef typename interleaved_alloc_traits<alloc_strategy>::strategy interleaved_alloc_strategy;
00515 
00516         unsigned_type m2 = _m / 2;
00517         unsigned_type full_runs = _n / m2;
00518         unsigned_type partial_runs = ((_n % m2) ? 1 : 0);
00519         unsigned_type nruns = full_runs + partial_runs;
00520         unsigned_type i;
00521 
00522         block_manager * mng = block_manager::get_instance();
00523 
00524         //STXXL_VERBOSE ("n=" << _n << " nruns=" << nruns << "=" << full_runs << "+" << partial_runs);
00525 
00526         double begin = timestamp(), after_runs_creation, end;
00527 
00528         run_type ** runs = new run_type *[nruns];
00529 
00530         for (i = 0; i < full_runs; i++)
00531             runs[i] = new run_type(m2);
00532 
00533 
00534         if (partial_runs)
00535             runs[i] = new run_type(_n - full_runs * m2);
00536 
00537         for (i = 0; i < nruns; ++i)
00538             mng->new_blocks(alloc_strategy(), make_bid_iterator(runs[i]->begin()), make_bid_iterator(runs[i]->end()));
00539 
00540         sort_local::create_runs<block_type,
00541                                 run_type,
00542                                 input_bid_iterator,
00543                                 value_cmp>(input_bids, runs, nruns, _m, cmp);
00544 
00545         after_runs_creation = timestamp();
00546 
00547         double io_wait_after_rf = stats::get_instance()->get_io_wait_time();
00548 
00549         disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
00550 
00551         const int_type merge_factor = optimal_merge_factor(nruns, _m);
00552         run_type ** new_runs;
00553 
00554         while (nruns > 1)
00555         {
00556             int_type new_nruns = div_ceil(nruns, merge_factor);
00557             STXXL_VERBOSE("Starting new merge phase: nruns: " << nruns <<
00558                           " opt_merge_factor: " << merge_factor << " m:" << _m << " new_nruns: " << new_nruns);
00559 
00560             new_runs = new run_type *[new_nruns];
00561 
00562             int_type runs_left = nruns;
00563             int_type cur_out_run = 0;
00564             int_type blocks_in_new_run = 0;
00565 
00566             while (runs_left > 0)
00567             {
00568                 int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
00569                 blocks_in_new_run = 0;
00570                 for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); i++)
00571                     blocks_in_new_run += runs[i]->size();
00572 
00573                 // allocate run
00574                 new_runs[cur_out_run++] = new run_type(blocks_in_new_run);
00575                 runs_left -= runs2merge;
00576             }
00577             // allocate blocks for the new runs
00578             if (cur_out_run == 1 && blocks_in_new_run == int_type(_n) && !input_bids->is_managed())
00579             {
00580                 // if we sort a file we can reuse the input bids for the output
00581                 input_bid_iterator cur = input_bids;
00582                 for (int_type i = 0; cur != (input_bids + _n); ++cur)
00583                 {
00584                     (*new_runs[0])[i++].bid = *cur;
00585                 }
00586 
00587                 bid_type & firstBID = (*new_runs[0])[0].bid;
00588                 if (firstBID.is_managed())
00589                 {
00590                     // the first block does not belong to the file
00591                     // need to reallocate it
00592                     mng->new_block(FR(), firstBID);
00593                 }
00594                 bid_type & lastBID = (*new_runs[0])[_n - 1].bid;
00595                 if (lastBID.is_managed())
00596                 {
00597                     // the first block does not belong to the file
00598                     // need to reallocate it
00599                     mng->new_block(FR(), lastBID);
00600                 }
00601             }
00602             else
00603             {
00604                 mng->new_blocks(interleaved_alloc_strategy(new_nruns, alloc_strategy()),
00605                                 RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, 0, new_nruns, blocks_in_new_run),
00606                                 RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, _n, new_nruns, blocks_in_new_run));
00607             }
00608             // merge all
00609             runs_left = nruns;
00610             cur_out_run = 0;
00611             while (runs_left > 0)
00612             {
00613                 int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
00614 #if STXXL_CHECK_ORDER_IN_SORTS
00615                 assert((check_sorted_runs<block_type, run_type, value_cmp>(runs + nruns - runs_left, runs2merge, m2, cmp)));
00616 #endif
00617                 STXXL_VERBOSE("Merging " << runs2merge << " runs");
00618                 merge_runs<block_type, run_type>(runs + nruns - runs_left,
00619                                                  runs2merge, *(new_runs + (cur_out_run++)), _m, cmp
00620                                                  );
00621                 runs_left -= runs2merge;
00622             }
00623 
00624             nruns = new_nruns;
00625             delete[] runs;
00626             runs = new_runs;
00627         }
00628 
00629         run_type * result = *runs;
00630         delete[] runs;
00631 
00632         end = timestamp();
00633 
00634         STXXL_VERBOSE("Elapsed time        : " << end - begin << " s. Run creation time: " <<
00635                       after_runs_creation - begin << " s");
00636         STXXL_VERBOSE("Time in I/O wait(rf): " << io_wait_after_rf << " s");
00637         STXXL_VERBOSE(*stats::get_instance());
00638         STXXL_UNUSED(begin + after_runs_creation + end + io_wait_after_rf);
00639 
00640         return result;
00641     }
00642 }
00643 
00644 
00687 
00688 
00689 
00690 
00691 
00692 
00693 
00694 template <typename ExtIterator_, typename StrictWeakOrdering_>
00695 void sort(ExtIterator_ first, ExtIterator_ last, StrictWeakOrdering_ cmp, unsigned_type M)
00696 {
00697     sort_helper::verify_sentinel_strict_weak_ordering(cmp);
00698 
00699     typedef simple_vector<sort_helper::trigger_entry<typename ExtIterator_::block_type> > run_type;
00700 
00701     typedef typename ExtIterator_::vector_type::value_type value_type;
00702     typedef typename ExtIterator_::block_type block_type;
00703 
00704     unsigned_type n = 0;
00705 
00706     block_manager * mng = block_manager::get_instance();
00707 
00708     first.flush();
00709 
00710     if ((last - first) * sizeof(value_type) * sort_memory_usage_factor() < M)
00711     {
00712         stl_in_memory_sort(first, last, cmp);
00713     }
00714     else
00715     {
00716         if (!(2 * block_type::raw_size * sort_memory_usage_factor() <= M)) {
00717             throw bad_parameter("stxxl::sort(): INSUFFICIENT MEMORY provided, please increase parameter 'M'");
00718         }
00719 
00720         if (first.block_offset())
00721         {
00722             if (last.block_offset())              // first and last element are
00723             // not the first elements of their block
00724             {
00725                 typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type;
00726                 typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type;
00727                 typename ExtIterator_::bid_type first_bid, last_bid;
00728                 request_ptr req;
00729 
00730                 req = first_block->read(*first.bid());
00731                 mng->new_block(FR(), first_bid);                // try to overlap
00732                 mng->new_block(FR(), last_bid);
00733                 req->wait();
00734 
00735 
00736                 req = last_block->read(*last.bid());
00737 
00738                 unsigned_type i = 0;
00739                 for ( ; i < first.block_offset(); ++i)
00740                 {
00741                     first_block->elem[i] = cmp.min_value();
00742                 }
00743 
00744                 req->wait();
00745 
00746 
00747                 req = first_block->write(first_bid);
00748                 for (i = last.block_offset(); i < block_type::size; ++i)
00749                 {
00750                     last_block->elem[i] = cmp.max_value();
00751                 }
00752 
00753                 req->wait();
00754 
00755 
00756                 req = last_block->write(last_bid);
00757 
00758                 n = last.bid() - first.bid() + 1;
00759 
00760                 std::swap(first_bid, *first.bid());
00761                 std::swap(last_bid, *last.bid());
00762 
00763                 req->wait();
00764 
00765 
00766                 delete first_block;
00767                 delete last_block;
00768 
00769                 run_type * out =
00770                     sort_local::sort_blocks<
00771                         typename ExtIterator_::block_type,
00772                         typename ExtIterator_::vector_type::alloc_strategy_type,
00773                         typename ExtIterator_::bids_container_iterator>
00774                         (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp);
00775 
00776 
00777                 first_block = new typename ExtIterator_::block_type;
00778                 last_block = new typename ExtIterator_::block_type;
00779                 typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type;
00780                 typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type;
00781                 request_ptr * reqs = new request_ptr[2];
00782 
00783                 reqs[0] = first_block->read(first_bid);
00784                 reqs[1] = sorted_first_block->read((*(out->begin())).bid);
00785 
00786                 reqs[0]->wait();
00787                 reqs[1]->wait();
00788 
00789                 reqs[0] = last_block->read(last_bid);
00790                 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
00791 
00792                 for (i = first.block_offset(); i < block_type::size; i++)
00793                 {
00794                     first_block->elem[i] = sorted_first_block->elem[i];
00795                 }
00796 
00797                 reqs[0]->wait();
00798                 reqs[1]->wait();
00799 
00800                 req = first_block->write(first_bid);
00801 
00802                 for (i = 0; i < last.block_offset(); ++i)
00803                 {
00804                     last_block->elem[i] = sorted_last_block->elem[i];
00805                 }
00806 
00807                 req->wait();
00808 
00809                 req = last_block->write(last_bid);
00810 
00811                 mng->delete_block(out->begin()->bid);
00812                 mng->delete_block((*out)[out->size() - 1].bid);
00813 
00814                 *first.bid() = first_bid;
00815                 *last.bid() = last_bid;
00816 
00817                 typename run_type::iterator it = out->begin();
00818                 ++it;
00819                 typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
00820                 ++cur_bid;
00821 
00822                 for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
00823                 {
00824                     *cur_bid = (*it).bid;
00825                 }
00826 
00827                 delete first_block;
00828                 delete sorted_first_block;
00829                 delete sorted_last_block;
00830                 delete[] reqs;
00831                 delete out;
00832 
00833                 req->wait();
00834 
00835 
00836                 delete last_block;
00837             }
00838             else
00839             {
00840                 // first element is
00841                 // not the first element of its block
00842                 typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type;
00843                 typename ExtIterator_::bid_type first_bid;
00844                 request_ptr req;
00845 
00846                 req = first_block->read(*first.bid());
00847                 mng->new_block(FR(), first_bid);                // try to overlap
00848                 req->wait();
00849 
00850 
00851                 unsigned_type i = 0;
00852                 for ( ; i < first.block_offset(); ++i)
00853                 {
00854                     first_block->elem[i] = cmp.min_value();
00855                 }
00856 
00857                 req = first_block->write(first_bid);
00858 
00859                 n = last.bid() - first.bid();
00860 
00861                 std::swap(first_bid, *first.bid());
00862 
00863                 req->wait();
00864 
00865 
00866                 delete first_block;
00867 
00868                 run_type * out =
00869                     sort_local::sort_blocks<
00870                         typename ExtIterator_::block_type,
00871                         typename ExtIterator_::vector_type::alloc_strategy_type,
00872                         typename ExtIterator_::bids_container_iterator>
00873                         (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp);
00874 
00875 
00876                 first_block = new typename ExtIterator_::block_type;
00877 
00878                 typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type;
00879 
00880                 request_ptr * reqs = new request_ptr[2];
00881 
00882                 reqs[0] = first_block->read(first_bid);
00883                 reqs[1] = sorted_first_block->read((*(out->begin())).bid);
00884 
00885                 reqs[0]->wait();
00886                 reqs[1]->wait();
00887 
00888                 for (i = first.block_offset(); i < block_type::size; ++i)
00889                 {
00890                     first_block->elem[i] = sorted_first_block->elem[i];
00891                 }
00892 
00893                 req = first_block->write(first_bid);
00894 
00895                 mng->delete_block(out->begin()->bid);
00896 
00897                 *first.bid() = first_bid;
00898 
00899                 typename run_type::iterator it = out->begin();
00900                 ++it;
00901                 typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
00902                 ++cur_bid;
00903 
00904                 for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
00905                 {
00906                     *cur_bid = (*it).bid;
00907                 }
00908 
00909                 *cur_bid = (*it).bid;
00910 
00911                 delete sorted_first_block;
00912                 delete[] reqs;
00913                 delete out;
00914 
00915                 req->wait();
00916 
00917                 delete first_block;
00918             }
00919         }
00920         else
00921         {
00922             if (last.block_offset())            // last is
00923             // not the first element of its block
00924             {
00925                 typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type;
00926                 typename ExtIterator_::bid_type last_bid;
00927                 request_ptr req;
00928                 unsigned_type i;
00929 
00930                 req = last_block->read(*last.bid());
00931                 mng->new_block(FR(), last_bid);
00932                 req->wait();
00933 
00934 
00935                 for (i = last.block_offset(); i < block_type::size; ++i)
00936                 {
00937                     last_block->elem[i] = cmp.max_value();
00938                 }
00939 
00940                 req = last_block->write(last_bid);
00941 
00942                 n = last.bid() - first.bid() + 1;
00943 
00944                 std::swap(last_bid, *last.bid());
00945 
00946                 req->wait();
00947 
00948 
00949                 delete last_block;
00950 
00951                 run_type * out =
00952                     sort_local::sort_blocks<
00953                         typename ExtIterator_::block_type,
00954                         typename ExtIterator_::vector_type::alloc_strategy_type,
00955                         typename ExtIterator_::bids_container_iterator>
00956                         (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp);
00957 
00958 
00959                 last_block = new typename ExtIterator_::block_type;
00960                 typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type;
00961                 request_ptr * reqs = new request_ptr[2];
00962 
00963                 reqs[0] = last_block->read(last_bid);
00964                 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
00965 
00966                 reqs[0]->wait();
00967                 reqs[1]->wait();
00968 
00969                 for (i = 0; i < last.block_offset(); ++i)
00970                 {
00971                     last_block->elem[i] = sorted_last_block->elem[i];
00972                 }
00973 
00974                 req = last_block->write(last_bid);
00975 
00976                 mng->delete_block((*out)[out->size() - 1].bid);
00977 
00978                 *last.bid() = last_bid;
00979 
00980                 typename run_type::iterator it = out->begin();
00981                 typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
00982 
00983                 for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
00984                 {
00985                     *cur_bid = (*it).bid;
00986                 }
00987 
00988                 delete sorted_last_block;
00989                 delete[] reqs;
00990                 delete out;
00991 
00992                 req->wait();
00993 
00994                 delete last_block;
00995             }
00996             else
00997             {
00998                 // first and last element are first elements of their of blocks
00999                 n = last.bid() - first.bid();
01000 
01001                 run_type * out =
01002                     sort_local::sort_blocks<typename ExtIterator_::block_type,
01003                                             typename ExtIterator_::vector_type::alloc_strategy_type,
01004                                             typename ExtIterator_::bids_container_iterator>
01005                         (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp);
01006 
01007                 typename run_type::iterator it = out->begin();
01008                 typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
01009 
01010                 for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
01011                 {
01012                     *cur_bid = (*it).bid;
01013                 }
01014 
01015                 delete out;
01016             }
01017         }
01018     }
01019 
01020 #if STXXL_CHECK_ORDER_IN_SORTS
01021     assert(stxxl::is_sorted(first, last, cmp));
01022 #endif
01023 }
01024 
01026 
01027 __STXXL_END_NAMESPACE
01028 
01029 #endif // !STXXL_SORT_HEADER
01030 // vim: et:ts=4:sw=4

Generated on Sun Oct 17 2010 06:13:44 for Stxxl by  doxygen 1.7.1