// -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
// vi:set ts=4 sts=4 sw=4 noet :
// Copyright 2008, The TPIE development team
//
// This file is part of TPIE.
//
// TPIE is free software: you can redistribute it and/or modify it under
// the terms of the GNU Lesser General Public License as published by the
// Free Software Foundation, either version 3 of the License, or (at your
// option) any later version.
//
// TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
// WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
// License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with TPIE. If not, see <http://www.gnu.org/licenses/>

#ifndef _TPIE_AMI_SORT_MANAGER_H
#define _TPIE_AMI_SORT_MANAGER_H

// Get definitions for working with Unix and Windows
#include <tpie/portability.h>
#include <tpie/stream.h>
#include <tpie/tempname.h>
#include <tpie/array.h>
#include <tpie/merge.h>
#include <tpie/merge_sorted_runs.h>
#include <tpie/mergeheap.h>     // For templated heaps
#include <tpie/internal_sort.h> // Contains classes for sorting internal runs
                                // using different comparison types
#include <cmath>                // for log, ceil, etc.
#include <string>
#include <boost/filesystem.hpp>

#include <tpie/tpie_assert.h>

namespace tpie {

///////////////////////////////////////////////////////////////////////////
/// A class of manager objects for merge sorting objects of type T.
///////////////////////////////////////////////////////////////////////////
template <class T, class I, class M>
class sort_manager {

public:
    sort_manager(I* isort, M* mheap);

    ~sort_manager() {
        // No code in this destructor.
    };

    ///////////////////////////////////////////////////////////////////////
    /// Sort in stream to out stream and save in stream (uses 3x space).
    ///////////////////////////////////////////////////////////////////////
    void sort(file_stream<T>* in, file_stream<T>* out,
              progress_indicator_base* indicator = NULL);

    ///////////////////////////////////////////////////////////////////////
    /// Sort in stream in place (uses 2x space).
    ///////////////////////////////////////////////////////////////////////
    void sort(file_stream<T>* in, progress_indicator_base* indicator = NULL);

private:
    // *************
    // * Functions *
    // *************

    void start_sort();          // high level wrapper to full sort
    void compute_sort_params(); // compute nItemsPerRun, mrgArity, nRuns
    void partition_and_sort_runs(progress_indicator_base* indicator, tpie::array<temp_file> & temporaries); // make initial sorted runs
    void merge_to_output(progress_indicator_base* indicator, tpie::array<temp_file> & temporaries); // loop over merge tree, create output stream
    // Merge a single group of mrgArity streams to an output stream
    void single_merge(
        typename tpie::array<tpie::auto_ptr<file_stream<T> > >::iterator,
        typename tpie::array<tpie::auto_ptr<file_stream<T> > >::iterator,
        file_stream<T>*, TPIE_OS_OFFSET = -1, progress_indicator_base* indicator=0);

    // **************
    // * Attributes *
    // **************

    I* m_internalSorter;             // Method for sorting runs in memory
    M* m_mergeHeap;                  // Merge heap implementation
    file_stream<T>* inStream;
    file_stream<T>* outStream;
    TPIE_OS_OFFSET nInputItems;      // Number of items in inStream
    TPIE_OS_SIZE_T mmBytesAvail;     // Amount of spare memory we can use
    TPIE_OS_SIZE_T mmBytesPerStream; // Memory consumed by each stream object

    progress_indicator_base* m_indicator; // pointer to progress indicator

    TPIE_OS_OFFSET progCount;        // counter for showing progress

    bool use2xSpace;                 // flag to indicate if we are doing a 2x sort

    // The maximum number of stream items of type T that we can
    // sort in internal memory
    TPIE_OS_SIZE_T nItemsPerRun;

    TPIE_OS_OFFSET nRuns;            // The number of sorted runs left to merge
    arity_t mrgArity;                // Max runs we can merge at one time

    // The output stream to which we are currently writing runs
    file_stream<T>* curOutputRunStream;

    // The minimum number of runs in each output stream;
    // some streams can have one additional run
    TPIE_OS_OFFSET minRunsPerStream;
    // The number of extra runs, i.e. the number of streams that
    // get one additional run.
    arity_t nXtraRuns;

    // The last run can have fewer than nItemsPerRun items
    TPIE_OS_SIZE_T nItemsInLastRun;
    // How many items we will sort in a given run
    TPIE_OS_SIZE_T nItemsInThisRun;
    // For each output stream, how many runs it should get
    TPIE_OS_OFFSET runsInStream;

    // A buffer for building the output file names
    std::string newName;

    // Prefix of temp files created during sort
    std::string working_disk;

private:
    sort_manager(const sort_manager<T,I,M>& other);
    sort_manager<T,I,M>& operator=(const sort_manager<T,I,M>& other);
};

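// Illustrative usage sketch (not part of this header): sort_manager is
// typically driven through TPIE's higher-level sort() front ends, but it can
// be used directly given an internal-sort object I and a merge-heap object M
// that provide the operations exercised in this file (MaxItemCount, allocate,
// sort, deallocate on I; allocate, space_per_item, space_overhead, deallocate
// on M). The type names InternalSorter and MergeHeap below are hypothetical
// stand-ins; see internal_sort.h and mergeheap.h for the implementations that
// ship with TPIE.
//
//     tpie::file_stream<int> in, out;
//     in.open("unsorted.tpie");
//     out.open("sorted.tpie");
//     InternalSorter isort;   // hypothetical model of the I parameter
//     MergeHeap      mheap;   // hypothetical model of the M parameter
//     tpie::sort_manager<int, InternalSorter, MergeHeap> mgr(&isort, &mheap);
//     mgr.sort(&in, &out);    // 3x-space sort; mgr.sort(&in) sorts in place
//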
template <class T, class I, class M>
sort_manager<T, I, M>::sort_manager(I* isort, M* mheap):
    m_internalSorter(isort),
    m_mergeHeap(mheap),
    inStream(0),
    outStream(0),
    nInputItems(0),
    mmBytesAvail(0),
    mmBytesPerStream(0),
    m_indicator(NULL),
    progCount(0),
    use2xSpace(false),
    nItemsPerRun(0),
    nRuns(0),
    mrgArity(0),
    curOutputRunStream(NULL),
    minRunsPerStream(0),
    nXtraRuns(0),
    nItemsInLastRun(0),
    nItemsInThisRun(0),
    runsInStream(0) {

    // Prefix of temp files created during sort
    working_disk = std::string(tempname::tpie_name("sort"));
};

template<class T, class I, class M>
void sort_manager<T,I,M>::sort(file_stream<T>* in, file_stream<T>* out,
                               progress_indicator_base* indicator){
    m_indicator = indicator;

    // If the input and output stream are the same, we only use 2x space;
    // otherwise, we need 3x space (input, current temp runs, output runs).
    use2xSpace = (in == out);

    inStream = in;
    outStream = out;

    // Basic checks that the input is ok
    if (in==NULL || out==NULL) {
        if (m_indicator) {m_indicator->init(1); m_indicator->step(); m_indicator->done();}
        throw exception("NULL_POINTER");
    }

    if (inStream->size() < 2) {
        if (m_indicator) {m_indicator->init(1); m_indicator->step(); m_indicator->done();}
        in->seek(0);
        if (in != out) {
            out->seek(0);
            if (in->size() == 1)
                out->write(in->read());
        }
        return;
    }

    // Else, there is something to sort; do it
    start_sort();
}

template<class T, class I, class M>
void sort_manager<T,I,M>::sort(file_stream<T>* in, progress_indicator_base* indicator){
    sort(in, in, indicator);
}

template<class T, class I, class M>
void sort_manager<T,I,M>::start_sort(){

    // ********************************************************************
    // * PHASE 1: See if we can sort the entire stream in internal memory *
    // * without the need to use general merge sort                       *
    // ********************************************************************

    // Figure out how much memory we've got to work with.
    mmBytesAvail = consecutive_memory_available();

    // Space for internal buffers for the input and output stream may not
    // have been allocated yet. Query the space usage and subtract.
    mmBytesPerStream = file_stream<T>::memory_usage(1);

    // This is how much we can use for internal sort if
    // we are not doing general merge sort
    mmBytesAvail -= 2 * mmBytesPerStream;

    // Check if all input items can be sorted internally using less than
    // mmBytesAvail
    nInputItems = inStream->size();

    inStream->seek (0);

    if (nInputItems < TPIE_OS_OFFSET(m_internalSorter->MaxItemCount(mmBytesAvail))) {

        fractional_progress fp(m_indicator);
        fp.id() << __FILE__ << __FUNCTION__ << "internal_sort" << typeid(T) << typeid(I) << typeid(M);
        fractional_subindicator allocate_progress(fp, "allocate", TPIE_FSI, nInputItems, "Allocating");
        fractional_subindicator sort_progress(fp, "sort", TPIE_FSI, nInputItems);
        fp.init();
        allocate_progress.init(nInputItems);
        m_internalSorter->allocate(static_cast<TPIE_OS_SIZE_T>(nInputItems));
        allocate_progress.done();

        // Load the items into main memory, sort, and write to output.
        // m_internalSorter also checks if inStream/outStream are the same and
        // truncates/rewrites inStream if they are. This probably should not
        // be the job of m_internalSorter. TODO: build a cleaner interface
        m_internalSorter->sort(inStream,
                               outStream,
                               static_cast<TPIE_OS_SIZE_T>(nInputItems),
                               &sort_progress);
        // Deallocate the internal array of items
        m_internalSorter->deallocate();
        fp.done();
        return;
    }

    // ******************************************************************
    // * Input stream too large for main memory, use general merge sort *
    // ******************************************************************

    // PHASE 2: compute nItemsPerRun, mrgArity, nRuns
    compute_sort_params();

    // ********************************************************************
    // * By this point we have checked that we have valid input, checked  *
    // * that we indeed need an external memory sort, verified that we    *
    // * have enough memory to partition and at least do a binary merge.  *
    // * Also checked that we have enough file descriptors to merge,      *
    // * and calculated the mrgArity and nItemsPerRun given memory        *
    // * constraints. We have also calculated nRuns for the initial       *
    // * number of runs we will partition into. Let's sort!               *
    // ********************************************************************

    // ********************************************************************
    // * WARNING: Since we accounted for all known memory usage in PHASE 2*
    // * be very wary of memory allocation via "new" or constructors from *
    // * this point on and make sure it was accounted for in PHASE 2      *
    // ********************************************************************
    fractional_progress fp(m_indicator);
    fp.id() << __FILE__ << __FUNCTION__ << "external_sort" << typeid(T) << typeid(I) << typeid(M);
    fractional_subindicator run_progress(fp, "run", TPIE_FSI, nInputItems, "", tpie::IMPORTANCE_LOG);
    fractional_subindicator merge_progress(fp, "merge", TPIE_FSI, nInputItems, "", tpie::IMPORTANCE_LOG);
    fp.init();

    tpie::array<temp_file> temporaries(mrgArity*2);

    // PHASE 3: partition and form sorted runs
    TP_LOG_DEBUG_ID ("Beginning general merge sort.");
    partition_and_sort_runs(&run_progress, temporaries);
    // PHASE 4: merge sorted runs to a single output stream
    merge_to_output(&merge_progress, temporaries);

    fp.done();
}

template<class T, class I, class M>
void sort_manager<T,I,M>::compute_sort_params(void){
    // ********************************************************************
    // * PHASE 2: Compute/check limits                                    *
    // * Compute the maximum number of items we can sort in main memory   *
    // * and the maximum number of sorted runs we can merge at one time.  *
    // * Before doing any sorting, check that we can fit at least one item*
    // * in internal memory for sorting and that we can merge at least two*
    // * runs at a time.                                                  *
    // *                                                                  *
    // * Memory needed for the run formation phase:                       *
    // *   2*mmBytesPerStream +               {for input/output streams}  *
    // *   nItemsPerRun*space_per_sort_item() + {for each item sorted}    *
    // *   space_overhead_sort()              {constant overhead in sort  *
    // *                                       management object during   *
    // *                                       sorting}                   *
    // *                                                                  *
    // * Memory needed for a D-way merge:                                 *
    // *  Cost per merge stream:                                          *
    // *   mmBytesPerStream +                {an open stream to read from}*
    // *   space_per_merge_item() +          {used in internal merge heap}*
    // *   sizeof(T*)+sizeof(off_t)           {arrays in single_merge()}  *
    // *   sizeof(stream<T>*)                 {array element that points  *
    // *                                       to merge stream}           *
    // *  Fixed costs:                                                    *
    // *   2*mmBytesPerStream +               {original input stream +    *
    // *                                       output of current merge}   *
    // *   space_overhead_merge() +           {fixed dynamic memory costs *
    // *                                       of merge heap}             *
    // *   3*space_overhead()                 {overhead per "new" memory  *
    // *                                       request for allocating the *
    // *                                       array of streams in        *
    // *                                       merge_to_output and two    *
    // *                                       arrays in single_merge}    *
    // *                                                                  *
    // * Total cost for a D-way merge:                                    *
    // *   D*(Cost per merge stream) + (Fixed costs)                      *
    // *                                                                  *
    // * Any additional memory requests that call "new" directly or       *
    // * indirectly should be documented and accounted for in this phase. *
    // ********************************************************************
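    // Worked example of the D-way merge budget (illustrative numbers only,
    // not measurements): suppose mmBytesAvail, already net of the two streams
    // subtracted in PHASE 1, is 64 MB, mmBytesPerStream is 2 MB, the merge
    // heap needs 64 bytes per item and roughly 1 KB of fixed overhead. Then
    //
    //   fixed   = space_overhead() + mmBytesPerStream             ~  2 MB
    //   perItem = mmBytesPerStream + space_per_item()
    //             + sizeof(T*) + sizeof(TPIE_OS_OFFSET)
    //             + sizeof(ami::stream<T>*)                       ~  2 MB
    //   D       = (mmBytesAvail - fixed) / perItem                ~ 31
    //
    // which is exactly the mrgArity computation carried out below.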

    TP_LOG_DEBUG_ID ("Computing merge sort parameters.");

    TPIE_OS_SIZE_T mmBytesAvailSort; // Bytes available for sorting

    TP_LOG_DEBUG ("Each object of size " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(sizeof(T)) << " uses "
                  << static_cast<TPIE_OS_OUTPUT_SIZE_T>(m_internalSorter->space_per_item ()) << " bytes "
                  << "for sorting in memory\n");

    // Subtract off the size of the temp output stream.
    // The size of the input stream was already subtracted from
    // mmBytesAvail.
    mmBytesAvailSort = mmBytesAvail - mmBytesPerStream;

    nItemsPerRun = m_internalSorter->MaxItemCount(mmBytesAvailSort);

    if(nItemsPerRun<1){
        throw stream_exception("Insufficient Memory for forming sorted runs");
    }

    // Now we know the maximum number of items we can sort in a single
    // internal memory run. Next, compute the number of runs we can
    // merge together at one time.

    TPIE_OS_SIZE_T mmBytesPerMergeItem = mmBytesPerStream +
        m_mergeHeap->space_per_item() + sizeof(T*) +
        sizeof(TPIE_OS_OFFSET)+sizeof(ami::stream<T>*);

    // Fixed cost of the merge heap impl. + MM_manager overhead of allocating
    // an array of stream<T> ptrs (pending).
    // The cost of the input stream is already accounted for in mmBytesAvail.
    TPIE_OS_SIZE_T mmBytesFixedForMerge = m_mergeHeap->space_overhead() +
        mmBytesPerStream;

    if (mmBytesFixedForMerge > mmBytesAvail) {
        throw stream_exception("Insufficient memory for merge heap and output stream");
    }

    // Cast down from TPIE_OS_OFFSET (type of mmBytesAvail).
    // mmBytesPerMergeItem is at least 1KB, so we are OK unless we
    // have more than 2 terabytes of memory, assuming 64 bit
    // (or smaller) TPIE_OS_OFFSETs. I look forward to the day
    // this comment seems silly and wrong.
    mrgArity = static_cast<arity_t>(mmBytesAvail-mmBytesFixedForMerge) /
        mmBytesPerMergeItem;
    TP_LOG_DEBUG("mem avail=" << static_cast<TPIE_OS_OUTPUT_SIZE_T>(mmBytesAvail-mmBytesFixedForMerge)
                 << " bytes per merge item=" << static_cast<TPIE_OS_OUTPUT_SIZE_T>(mmBytesPerMergeItem)
                 << " initial mrgArity=" << static_cast<TPIE_OS_OUTPUT_SIZE_T>(mrgArity) << "\n");

    // Need to support at least binary merge
    if(mrgArity < 2) {
        throw stream_exception("Merge arity < 2 -- Insufficient memory for a merge.");
    }

    // Make sure that the AMI is willing to provide us with the
    // number of substreams we want. It may not be able to due to
    // operating system restrictions, such as on the number of regions
    // that can be mmap()ed in, the max number of file descriptors, etc.
    int availableStreams = static_cast<int>(available_files());

    // Merging requires an available stream/file descriptor for
    // each of the mrgArity input streams. We need one additional file descriptor
    // for the output of the current merge, so binary merge requires
    // three available streams.
    if (availableStreams < 3) {
        throw stream_exception("Not enough stream descriptors available to perform merge.");
    }

    // We can at least do binary merge. See if availableStreams limits
    // the maximum mrgArity.
    // Due to the previous test, we know that availableStreams >= 3.
    if (mrgArity > static_cast<arity_t>(availableStreams - 1)) {

        mrgArity = static_cast<arity_t>(availableStreams - 1);

        TP_LOG_DEBUG_ID ("Reduced merge arity due to AMI restrictions.");
    }

    // The number of memory-sized runs that the original input stream
    // will be partitioned into.
    nRuns = ((nInputItems + nItemsPerRun - 1) / nItemsPerRun);
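    // For example (illustrative numbers only): with nInputItems = 10 000 000
    // and nItemsPerRun = 3 000 000, the expression above computes
    // ceil(10 000 000 / 3 000 000) = 4 initial runs, the last of which holds
    // only 1 000 000 items; nItemsInLastRun picks that up in PHASE 3.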

#ifdef TPIE_SORT_SMALL_MRGARITY
    // KEEP OUT!!!
    // This should not be done by the typical user and is only for
    // testing/debugging purposes. ONLY define this flag and set a value
    // if you know what you are doing.
    TP_LOG_WARNING_ID("TPIE_SORT_SMALL_MRGARITY flag is set."
                      " Did you mean to do this?");
    if(mrgArity > TPIE_SORT_SMALL_MRGARITY) {
        TP_LOG_WARNING_ID("Reducing merge arity due to compiler specified flag");
        mrgArity=TPIE_SORT_SMALL_MRGARITY;
    }
#endif // TPIE_SORT_SMALL_MRGARITY

#ifdef TPIE_SORT_SMALL_RUNSIZE
    // KEEP OUT!!!
    // This should not be done by the typical user and is only for
    // testing/debugging purposes. ONLY define this flag and set a value
    // if you know what you are doing.
    TP_LOG_WARNING_ID("TPIE_SORT_SMALL_RUNSIZE flag is set."
                      " Did you mean to do this?");
    if(nItemsPerRun > TPIE_SORT_SMALL_RUNSIZE) {
        TP_LOG_WARNING_ID("Reducing run size due to compiler specified flag");
        nItemsPerRun=TPIE_SORT_SMALL_RUNSIZE;
    }

    // need to adjust nRuns
    nRuns = ((nInputItems + nItemsPerRun - 1) / nItemsPerRun);
#endif // TPIE_SORT_SMALL_RUNSIZE

    //#define MINIMIZE_INITIAL_RUN_LENGTH
#ifdef MINIMIZE_INITIAL_RUN_LENGTH
    // If compiled with the above flag, try to reduce the length of
    // the initial sorted runs without increasing the merge tree height.
    // This could be a speed-up if it is faster to quicksort many small
    // runs and merge them than it is to quicksort fewer long
    // runs and merge them.
    TP_LOG_DEBUG_ID ("Minimizing initial run lengths without increasing" <<
                     " the height of the merge tree.");

    // The tree height is the ceiling of the log base mrgArity of the
    // number of original runs.
    double tree_height = log((double)nRuns) / log((double)mrgArity);
    tp_assert (tree_height > 0, "Negative or zero tree height!");
    tree_height = ceil (tree_height);

    // See how many runs we could possibly fit in the tree without
    // increasing the height.
    double maxOrigRuns = pow ((double) mrgArity, tree_height);
    tp_assert (maxOrigRuns >= nRuns, "Number of permitted runs was reduced!");

    // How big will such runs be?
    double new_nItemsPerRun = ceil (nInputItems / maxOrigRuns);
    tp_assert (new_nItemsPerRun <= nItemsPerRun,
               "Size of original runs increased!");

    // Update the number of items per run and the number of original runs
    nItemsPerRun = (TPIE_OS_SIZE_T) new_nItemsPerRun;

    TP_LOG_DEBUG_ID ("With long internal memory runs, nRuns = "
                     << nRuns);

    nRuns = (nInputItems + nItemsPerRun - 1) / nItemsPerRun;

    TP_LOG_DEBUG_ID ("With shorter internal memory runs "
                     << "and the same merge tree height, nRuns = "
                     << nRuns );

    tp_assert (maxOrigRuns >= nRuns,
               "We increased the merge height when we weren't supposed to do so!");
#endif // MINIMIZE_INITIAL_RUN_LENGTH
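
    // Worked example of the minimization above (illustrative numbers only):
    // with nRuns = 10 and mrgArity = 4, tree_height = ceil(log 10 / log 4) = 2,
    // so up to maxOrigRuns = 4^2 = 16 runs fit in a two-level merge tree.
    // For nInputItems = 30 000 000 this shrinks nItemsPerRun from 3 000 000 to
    // ceil(30 000 000 / 16) = 1 875 000 and raises nRuns to 16 without adding
    // a merge pass.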


    // If we have just a few runs, we don't need the
    // full mrgArity. This is the last change to mrgArity.
    // N.B. We need to "up"-cast mrgArity here!
    if(static_cast<TPIE_OS_OFFSET>(mrgArity)>nRuns){
        // N.B. We know that nRuns is small, so
        // it is safe to downcast.
        mrgArity=static_cast<TPIE_OS_SIZE_T>(nRuns);
    }

    // We should always end up with at least two runs,
    // otherwise why are we doing it externally?
    tp_assert (nRuns > 1, "Less than two runs to merge!");
    // Check that the numbers are consistent with the input size
    tp_assert (nRuns * nItemsPerRun - nInputItems < nItemsPerRun,
               "Total expected output size is too large.");
    tp_assert (nInputItems - (nRuns - 1) * nItemsPerRun <= nItemsPerRun,
               "Total expected output size is too small.");

    TP_LOG_DEBUG_ID ("Input stream has " << nInputItems << " items");
    TP_LOG_DEBUG ("Max number of items per run " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(nItemsPerRun) );
    TP_LOG_DEBUG ("\nInitial number of runs " << nRuns );
    TP_LOG_DEBUG ("\nMerge arity is " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(mrgArity) << "\n" );
}

template<class T, class I, class M>
void sort_manager<T,I,M>::partition_and_sort_runs(progress_indicator_base* indicator, tpie::array<temp_file> & temporaries){
    // ********************************************************************
    // * PHASE 3: Partition                                               *
    // * Partition the input stream into nRuns of at most nItemsPerRun    *
    // * items each, sort them, and write them to temporary output files. *
    // * The last run may have fewer than nItemsPerRun. To keep the number*
    // * of files down and to support sequential I/O, we distribute the   *
    // * nRuns evenly across mrgArity files, thus each file on disk holds *
    // * multiple sorted runs.                                            *
    // ********************************************************************

    // The minimum number of runs in each output stream;
    // some streams can have one additional run
    minRunsPerStream = nRuns/mrgArity;
    // The number of extra runs, i.e. the number of streams that
    // get one additional run. This is less than mrgArity and
    // it is OK to downcast to an arity_t.
    nXtraRuns = static_cast<arity_t>(nRuns - minRunsPerStream*mrgArity);
    tp_assert(nXtraRuns<mrgArity, "Too many extra runs");

    // The last run can have fewer than nItemsPerRun items;
    // general case
    nItemsInLastRun = static_cast<TPIE_OS_SIZE_T>(nInputItems % nItemsPerRun);
    if(nItemsInLastRun==0){
        // Input size is an exact multiple of nItemsPerRun
        nItemsInLastRun=nItemsPerRun;
    }
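
    // Distribution example (illustrative numbers only): with nRuns = 10 and
    // mrgArity = 4, minRunsPerStream = 2 and nXtraRuns = 2, so the first two
    // output files receive 2 runs each and the last two receive 3 runs each;
    // the single short run of nItemsInLastRun items always ends up last in
    // the last file.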

    // Initialize memory for the internal memory runs
    // accounted for in phase 2: (nItemsPerRun*size_of_sort_item) +
    // space_overhead_sort
    m_internalSorter->allocate(nItemsPerRun);

    TP_LOG_DEBUG_ID ("Partitioning and forming sorted runs.");

    // nItemsPerRun except for last run.
    nItemsInThisRun=nItemsPerRun;

    // Rewind the input stream, we are about to begin
    inStream->seek(0);

    // ********************************************************************
    // * Partition and make initial sorted runs                           *
    // ********************************************************************
    TPIE_OS_OFFSET check_size = 0; //for debugging

    if (indicator)
        indicator->init(nRuns*1000);

    for(arity_t ii=0; ii<mrgArity; ii++){ // For each output stream
        // Dynamically allocate the stream
        // We account for these mmBytesPerStream in phase 2 (output stream)
        curOutputRunStream = tpie_new<file_stream<T> >();
        curOutputRunStream->open(temporaries[ii], access_write);

        // How many runs should this stream get?
        // extra runs go in the LAST nXtraRuns streams so that
        // the one short run is always in the LAST output stream
        runsInStream = minRunsPerStream + ((ii >= mrgArity-nXtraRuns)?1:0);

        for(TPIE_OS_OFFSET jj=0; jj < runsInStream; jj++ ) { // For each run in this stream
            // See if this is the last run
            if( (ii==mrgArity-1) && (jj==runsInStream-1)) {
                nItemsInThisRun=nItemsInLastRun;
            }

            progress_indicator_subindicator sort_indicator(indicator, 1000);
            m_internalSorter->sort(inStream, curOutputRunStream,
                                   nItemsInThisRun, &sort_indicator);
        } // For each run in this stream

        // All runs created for this stream, clean up
        TP_LOG_DEBUG_ID ("Wrote " << runsInStream << " runs and "
                         << curOutputRunStream->size() << " items to file "
                         << static_cast<TPIE_OS_OUTPUT_SIZE_T>(ii));
        check_size+=curOutputRunStream->size();
        tpie_delete(curOutputRunStream);

    } // For each output stream
    tp_assert(check_size == nInputItems, "item count mismatch");

    // Done with partitioning and initial run formation
    // free space associated with internal memory sorting
    m_internalSorter->deallocate();
    if(use2xSpace){
        // recall outStream/inStream point to same file in this case
        inStream->truncate(0); //free up disk space
        inStream->seek(0);
    }
    if (indicator) indicator->done();
}

template<class T, class I, class M>
void sort_manager<T,I,M>::merge_to_output(progress_indicator_base* indicator, tpie::array<temp_file> & temporaries){
    // ********************************************************************
    // * PHASE 4: Merge                                                   *
    // * Loop over all levels of the merge tree, reading mrgArity runs    *
    // * at a time from the streams at the current level and distributing *
    // * merged runs over mrgArity output streams one level up, until     *
    // * a single output stream exists                                    *
    // ********************************************************************

    // The input streams from which we will read sorted runs.
    // This memory allocation is accounted for in phase 2:
    // mrgArity*sizeof(stream<T>*) + space_overhead() [fixed cost]
    tpie::array<tpie::auto_ptr<file_stream<T> > > mergeInputStreams(mrgArity);

    TP_LOG_DEBUG_ID("Allocated " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(sizeof(ami::stream<T>*)*mrgArity)
                    << " bytes for " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(mrgArity) << " merge input stream pointers.\n"
                    << "Mem. avail. is " << consecutive_memory_available ());

    // The number of iterations the main loop has gone through,
    // at most the height of the merge tree log_{M/B}(N/B),
    // typically 1 or 2
    int mrgHeight = 0;
    int treeHeight = 0; // for progress
    TPIE_OS_SIZE_T ii;  // index var
    TPIE_OS_OFFSET jj;  // index var

    // This memory allocation is accounted for in phase 2:
    // mrgArity*space_per_merge_item
    m_mergeHeap->allocate( mrgArity ); // Allocate mem for the mergeheap

    // *****************************************************************
    // *                                                               *
    // * The main loop. At the outermost level we are looping over     *
    // * levels of the merge tree. Typically this will be very small,  *
    // * e.g. 1-3. The final merge pass is handled outside the loop.   *
    // * Future extensions may want to do something special in the     *
    // * last merge.                                                   *
    // *                                                               *
    // *****************************************************************

    if (indicator) {
        // Compute the merge depth, i.e. the number of passes over the data
        treeHeight = static_cast<int>(ceil(log(static_cast<float>(nRuns)) /
                                           log(static_cast<float>(mrgArity))));

        indicator->set_range( nInputItems * treeHeight);
        indicator->init();
    }
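
    // For example (illustrative numbers only): nRuns = 16 initial runs merged
    // with mrgArity = 4 gives treeHeight = ceil(log 16 / log 4) = 2, i.e. one
    // intermediate pass through the while-loop below plus the final merge to
    // outStream, so the indicator range covers two full passes over the data.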

    // nRuns is initially the number of runs we formed in the partition_and_sort
    // phase. nXtraRuns is initially the number of output streams that
    // contain one extra run. nRuns and nXtraRuns are updated as we
    // complete a merge level.
    while (nRuns > TPIE_OS_OFFSET(mrgArity)) {
        // if (m_indicator) {
        //     std::string description;
        //     std::stringstream buf;
        //     buf << "Merge pass " << mrgHeight+1 << " of " << treeHeight << " ";
        //     buf >> description;
        //     m_indicator->set_percentage_range(0, nInputItems);
        //     m_indicator->init(description);
        // }

        // We are not yet at the top of the merge tree.
        // Write merged runs to temporary output streams
        TP_LOG_DEBUG ("Intermediate merge. level="<<mrgHeight << "\n");

        // The number of output runs we will form after a mrgArity merge
        nRuns = (nRuns + mrgArity - 1)/mrgArity;

        // Distribute the new nRuns evenly across mrgArity (or fewer)
        // output streams
        minRunsPerStream = nRuns/mrgArity;

        // We may have fewer than mrgArity input runs for the last
        // merged output run if the current set of merge streams has
        // extra runs
        arity_t mergeRunsInLastOutputRun=(nXtraRuns>0) ? nXtraRuns : mrgArity;

        // The number of extra runs, i.e. the number of output streams that
        // get one additional run. This is less than mrgArity and
        // it is OK to downcast to an arity_t.
        nXtraRuns = static_cast<arity_t>(nRuns - minRunsPerStream*mrgArity);
        tp_assert(nXtraRuns<mrgArity, "Too many extra runs");

        // How many streams we will create at the next level
        arity_t nOutputStreams = (minRunsPerStream > 0) ? mrgArity : nXtraRuns;

        arity_t nRunsToMerge = mrgArity; // may change for the last output run

        // Open the mrgArity input streams from which to read runs
        for(ii = 0; ii < mrgArity; ii++){
            // Dynamically allocate the stream.
            // We account for these mmBytesPerStream in phase 2
            // (input stream to read from)
            file_stream<T> * stream = tpie_new<file_stream<T> >();
            mergeInputStreams[ii].reset(stream);
            stream->open(temporaries[mrgArity*(mrgHeight%2)+ii], access_read);
            stream->seek(0);
        }
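
        // The indexing temporaries[mrgArity*(mrgHeight%2)+ii] ping-pongs
        // between the two halves of the temporaries array: even merge levels
        // read files [0, mrgArity) and write files [mrgArity, 2*mrgArity),
        // odd levels do the reverse. For example (illustrative), with
        // mrgArity = 4 the first intermediate pass reads temporaries[0..3]
        // and writes temporaries[4..7].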

        TPIE_OS_OFFSET check_size=0;
        // For each new output stream, fill with merged runs.
        // The strange indexing is for the case that fewer than mrgArity
        // output streams are needed, and we use the LAST nOutputStreams. This
        // always keeps the one possible short run in the LAST of the
        // mrgArity output streams.
        TP_LOG_DEBUG("Writing " << nRuns << " runs to " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(nOutputStreams)
                     << " output files.\nEach output file has at least "
                     << minRunsPerStream << " runs.\n");

        for(ii = mrgArity-nOutputStreams; ii < mrgArity; ii++){
            // Dynamically allocate the stream.
            // We account for these mmBytesPerStream in phase 2
            // (temp merge output stream)
            file_stream<T> curOutputRunStream;
            curOutputRunStream.open(temporaries[mrgArity*((mrgHeight+1)%2)+ii], access_write);

            // How many runs should this stream get?
            // Extra runs go in the LAST nXtraRuns streams so that
            // the one short run is always in the LAST output stream
            runsInStream = minRunsPerStream + ((ii >= mrgArity-nXtraRuns)?1:0);
            TP_LOG_DEBUG("Writing " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(runsInStream) << " runs to output "
                         << " file " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(ii) << "\n");
            for( jj=0; jj < runsInStream; jj++ ) { // For each run in this stream
                // See if this is the last run.
                if( (ii==mrgArity-1) && (jj==runsInStream-1)) {
                    nRunsToMerge=mergeRunsInLastOutputRun;
                }
                // Merge runs to curOutputRunStream
                single_merge(mergeInputStreams.find(mrgArity-nRunsToMerge),
                             mergeInputStreams.find(mrgArity),
                             &curOutputRunStream,
                             nItemsPerRun, indicator);
            } // For each output run in this stream

            // Commit the new output stream to disk
            TP_LOG_DEBUG("Wrote " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(runsInStream) << " runs and "
                         << curOutputRunStream.size() << " items "
                         << "to file " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(ii) << "\n");
            check_size+=curOutputRunStream.size();
        } // For each new output stream

        tp_assert(check_size==nInputItems, "item count mismatch in merge");
        // All output streams created/filled.
        // Clean up, go up to the next level.

        // Delete the temp input merge streams
        for(ii = 0; ii < mrgArity; ii++) {
            mergeInputStreams[ii].reset();
            temporaries[mrgArity*(mrgHeight%2)+ii].free();
        }
        // Update run lengths
        nItemsPerRun=mrgArity*nItemsPerRun; // except for maybe the last run
        mrgHeight++; // moving up a level
    } // while (nRuns > mrgArity)

    tp_assert( nRuns > 1, "Not enough runs to merge to final output");
    tp_assert( nRuns <= TPIE_OS_OFFSET(mrgArity), "Too many runs to merge to final output");

    // We are at the last merge phase, write to the specified output stream.
    // Open up the nRuns final merge streams to merge.
    // These runs are packed in the LAST nRuns elements of the array.
    // nRuns is small, so it is safe to downcast.
    TP_LOG_DEBUG_ID ("Final merge. level="<<mrgHeight);
    TP_LOG_DEBUG("Merge runs left="<<nRuns<<"\n");
    for(ii = mrgArity-static_cast<TPIE_OS_SIZE_T>(nRuns); ii < mrgArity; ii++){
        /* Dynamically allocate the stream.
           We account for these mmBytesPerStream in phase 2
           (input stream to read from).
           Put the LAST nRuns files in the FIRST nRuns spots here;
           either the mergeInputStreams loading or the call to
           single_merge is a little messy. I put the mess here. (abd) */
        TP_LOG_DEBUG ("Putting merge stream "<< static_cast<TPIE_OS_OUTPUT_SIZE_T>(ii) << " in slot "
                      << static_cast<TPIE_OS_OUTPUT_SIZE_T>(ii-(mrgArity-static_cast<TPIE_OS_SIZE_T>(nRuns))) << "\n");
        file_stream<T> * stream = tpie_new<file_stream<T> >();
        mergeInputStreams[ii-(mrgArity-static_cast<TPIE_OS_SIZE_T>(nRuns))].reset(stream);
        stream->open(temporaries[mrgArity*(mrgHeight%2)+ii], access_read);
        stream->seek(0);
    }

    // Merge the last remaining runs to the output stream.
    // mergeInputStreams.begin() is an iterator to the first input stream.
    // N.B. nRuns is small, so it is safe to downcast.
    single_merge(mergeInputStreams.begin(),
                 mergeInputStreams.find((size_t)nRuns),
                 outStream, -1, indicator);

    if (indicator) indicator->done();
    tp_assert((TPIE_OS_OFFSET)outStream->size() == nInputItems, "item count mismatch");

    TP_LOG_DEBUG("merge cleanup\n");

    // Delete the stream ptr arrays
    mergeInputStreams.resize(0);

    // Deallocate the merge heap, free up memory
    m_mergeHeap->deallocate();
    TP_LOG_DEBUG_ID ("Number of passes incl run formation is " <<
                     mrgHeight+2 );
    TP_LOG_DEBUG("AMI_partition_and_merge END\n");
}

template<class T, class I, class M>
void sort_manager<T,I,M>::single_merge(
    typename tpie::array<tpie::auto_ptr<file_stream<T> > >::iterator start,
    typename tpie::array<tpie::auto_ptr<file_stream<T> > >::iterator end,
    file_stream<T>* outStream, TPIE_OS_OFFSET cutoff, progress_indicator_base* indicator)
{

    merge_sorted_runs(start, end, outStream, m_mergeHeap,
                      cutoff, indicator);
}


} // tpie namespace

#endif // _TPIE_AMI_SORT_MANAGER_H