TPIE

v1.1rc1-6-g0c97303
merge_sorter.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef __TPIE_PIPELINING_MERGE_SORTER_H__
21 #define __TPIE_PIPELINING_MERGE_SORTER_H__
22 
23 #include <tpie/pipelining/sort_parameters.h>
24 #include <tpie/pipelining/merger.h>
25 #include <tpie/pipelining/exception.h>
26 #include <tpie/dummy_progress.h>
27 #include <tpie/array_view.h>
28 
29 namespace tpie {
30 
43 template <typename T, bool UseProgress, typename pred_t = std::less<T> >
44 class merge_sorter {
45 public:
46  typedef boost::shared_ptr<merge_sorter> ptr;
48 
49  static const memory_size_type maximumFanout = 250; // arbitrary. TODO: run experiments to find threshold
50 
51  inline merge_sorter(pred_t pred = pred_t())
52  : m_state(stParameters)
53  , p()
54  , m_parametersSet(false)
55  , m_merger(pred)
56  , pred(pred)
57  , m_evacuated(false)
58  , m_finalMergeInitialized(false)
59  {
60  }
61 
66  inline void set_parameters(stream_size_type runLength, memory_size_type fanout) {
67  tp_assert(m_state == stParameters, "Merge sorting already begun");
68  p.runLength = p.internalReportThreshold = runLength;
69  p.fanout = p.finalFanout = fanout;
70  m_parametersSet = true;
71  log_debug() << "Manually set merge sort run length and fanout\n";
72  log_debug() << "Run length = " << p.runLength << " (uses memory " << (p.runLength*sizeof(T) + file_stream<T>::memory_usage()) << ")\n";
73  log_debug() << "Fanout = " << p.fanout << " (uses memory " << fanout_memory_usage(p.fanout) << ")" << std::endl;
74  }
75 
80  inline void set_available_memory(memory_size_type m) {
81  calculate_parameters(m, m, m);
82  }
83 
90  inline void set_available_memory(memory_size_type m1, memory_size_type m2, memory_size_type m3) {
91  calculate_parameters(m1, m2, m3);
92  }
93 
94 private:
95  // set_phase_?_memory helper
96  inline void maybe_calculate_parameters() {
97  if (p.memoryPhase1 > 0 &&
98  p.memoryPhase2 > 0 &&
99  p.memoryPhase3 > 0)
100  calculate_parameters(p.memoryPhase1,
101  p.memoryPhase2,
102  p.memoryPhase3);
103  }
104 
105 public:
106  inline void set_phase_1_memory(memory_size_type m1) {
107  p.memoryPhase1 = m1;
108  maybe_calculate_parameters();
109  }
110 
111  inline void set_phase_2_memory(memory_size_type m2) {
112  p.memoryPhase2 = m2;
113  maybe_calculate_parameters();
114  }
115 
116  inline void set_phase_3_memory(memory_size_type m3) {
117  p.memoryPhase3 = m3;
118  maybe_calculate_parameters();
119  }
120 
124  inline void begin() {
125  tp_assert(m_state == stParameters, "Merge sorting already begun");
126  if (!m_parametersSet) throw merge_sort_not_ready();
127  log_debug() << "Start forming input runs" << std::endl;
128  m_currentRunItems.resize((size_t)p.runLength);
129  m_runFiles.resize(p.fanout*2);
130  m_currentRunItemCount = 0;
131  m_finishedRuns = 0;
132  m_state = stRunFormation;
133  m_itemCount = 0;
134  }
135 
139  inline void push(const T & item) {
140  tp_assert(m_state == stRunFormation, "Wrong phase");
141  if (m_currentRunItemCount >= p.runLength) {
142  sort_current_run();
143  empty_current_run();
144  }
145  m_currentRunItems[m_currentRunItemCount] = item;
146  ++m_currentRunItemCount;
147  ++m_itemCount;
148  }
149 
153  inline void end() {
154  tp_assert(m_state == stRunFormation, "Wrong phase");
155  sort_current_run();
156 
157  if (m_itemCount == 0) {
158  tp_assert(m_currentRunItemCount == 0, "m_itemCount == 0, but m_currentRunItemCount != 0");
159  m_reportInternal = true;
160  m_itemsPulled = 0;
161  m_currentRunItems.resize(0);
162  log_debug() << "Got no items. Internal reporting mode." << std::endl;
163  } else if (m_finishedRuns == 0 && m_currentRunItems.size() <= p.internalReportThreshold) {
164  // Our current buffer fits within the memory requirements of phase 2.
165  m_reportInternal = true;
166  m_itemsPulled = 0;
167  log_debug() << "Got " << m_currentRunItemCount << " items. Internal reporting mode." << std::endl;
168 
169  } else if (m_finishedRuns == 0
170  && m_currentRunItemCount <= p.internalReportThreshold
171  && array<T>::memory_usage(m_currentRunItemCount) <= get_memory_manager().available()) {
172  // Our current buffer does not fit within the memory requirements
173  // of phase 2, but we have enough temporary memory to copy and
174  // resize the buffer.
175 
176  array<T> currentRun(array_view<T>(m_currentRunItems, 0, m_currentRunItemCount));
177  m_currentRunItems.swap(currentRun);
178 
179  m_reportInternal = true;
180  m_itemsPulled = 0;
181  log_debug() << "Got " << m_currentRunItemCount << " items. Internal reporting mode "
182  << "after resizing item buffer." << std::endl;
183 
184  } else {
185  m_reportInternal = false;
186  empty_current_run();
187  m_currentRunItems.resize(0);
188  log_debug() << "Got " << m_finishedRuns << " runs. External reporting mode." << std::endl;
189  }
190  m_state = stMerge;
191  }
192 
197  inline void calc(typename Progress::base & pi) {
198  tp_assert(m_state == stMerge, "Wrong phase");
199  if (!m_reportInternal) {
200  prepare_pull(pi);
201  } else {
202  pi.init(1);
203  pi.step();
204  pi.done();
205  }
206  m_state = stReport;
207  }
208 
209  inline void evacuate() {
210  tp_assert(m_state == stMerge || m_state == stReport, "Wrong phase");
211  if (m_reportInternal) {
212  log_debug() << "Evacuate merge_sorter (" << this << ") in internal reporting mode" << std::endl;
213  m_reportInternal = false;
214  memory_size_type runCount = (m_currentRunItemCount > 0) ? 1 : 0;
215  empty_current_run();
216  m_currentRunItems.resize(0);
217  initialize_final_merger(0, runCount);
218  } else if (m_state == stMerge) {
219  log_debug() << "Evacuate merge_sorter (" << this << ") before merge in external reporting mode (noop)" << std::endl;
220  return;
221  }
222  log_debug() << "Evacuate merge_sorter (" << this << ") before reporting in external reporting mode" << std::endl;
223  m_merger.reset();
224  m_evacuated = true;
225  }
226 
227  inline void evacuate_before_merging() {
228  if (m_state == stMerge) evacuate();
229  }
230 
231  inline void evacuate_before_reporting() {
232  if (m_state == stReport && (!m_reportInternal || m_itemsPulled == 0)) evacuate();
233  }
234 
235 private:
237  // Phase 1 helpers.
239 
240  inline void sort_current_run() {
241  parallel_sort(m_currentRunItems.begin(), m_currentRunItems.begin()+m_currentRunItemCount, pred);
242  }
243 
244  // postcondition: m_currentRunItemCount = 0
245  inline void empty_current_run() {
246  if (m_finishedRuns < 10)
247  log_debug() << "Write " << m_currentRunItemCount << " items to run file " << m_finishedRuns << std::endl;
248  else if (m_finishedRuns == 10)
249  log_debug() << "..." << std::endl;
250  file_stream<T> fs;
251  open_run_file_write(fs, 0, m_finishedRuns);
252  for (memory_size_type i = 0; i < m_currentRunItemCount; ++i) {
253  fs.write(m_currentRunItems[i]);
254  }
255  m_currentRunItemCount = 0;
256  ++m_finishedRuns;
257  }
258 
263  inline void initialize_merger(memory_size_type mergeLevel, memory_size_type runNumber, memory_size_type runCount) {
264  // runCount is a memory_size_type since we must be able to have that
265  // many file_streams open at the same time.
266 
267  // Open files and seek to the first item in the run.
268  array<file_stream<T> > in(runCount);
269  for (memory_size_type i = 0; i < runCount; ++i) {
270  open_run_file_read(in[i], mergeLevel, runNumber+i);
271  }
272  stream_size_type runLength = calculate_run_length(p.runLength, p.fanout, mergeLevel);
273  // Pass file streams with correct stream offsets to the merger
274  m_merger.reset(in, runLength);
275  }
276 
280  inline void initialize_final_merger(memory_size_type finalMergeLevel, memory_size_type runCount) {
281  if (m_finalMergeInitialized) {
282  reinitialize_final_merger();
283  return;
284  }
285 
286  m_finalMergeInitialized = true;
287  m_finalMergeLevel = finalMergeLevel;
288  m_finalRunCount = runCount;
289  if (runCount > p.finalFanout) {
290  log_debug() << "Run count in final level (" << runCount << ") is greater than the final fanout (" << p.finalFanout << ")\n";
291 
292  memory_size_type i = p.finalFanout-1;
293  memory_size_type n = runCount-i;
294  log_debug() << "Merge " << n << " runs starting from #" << i << std::endl;
295  dummy_progress_indicator pi;
296  m_finalMergeSpecialRunNumber = merge_runs(finalMergeLevel, i, n, pi);
297  } else {
298  log_debug() << "Run count in final level (" << runCount << ") is less or equal to the final fanout (" << p.finalFanout << ")" << std::endl;
299  m_finalMergeSpecialRunNumber = std::numeric_limits<memory_size_type>::max();
300  }
301  reinitialize_final_merger();
302  }
303 
304 public:
305  inline void reinitialize_final_merger() {
306  tp_assert(m_finalMergeInitialized, "reinitialize_final_merger while !m_finalMergeInitialized");
307  if (m_finalMergeSpecialRunNumber != std::numeric_limits<memory_size_type>::max()) {
308  array<file_stream<T> > in(p.finalFanout);
309  for (memory_size_type i = 0; i < p.finalFanout-1; ++i) {
310  open_run_file_read(in[i], m_finalMergeLevel, i);
311  log_debug() << "Run " << i << " is at offset " << in[i].offset() << " and has size " << in[i].size() << std::endl;
312  }
313  open_run_file_read(in[p.finalFanout-1], m_finalMergeLevel+1, m_finalMergeSpecialRunNumber);
314  log_debug() << "Special large run is at offset " << in[p.finalFanout-1].offset() << " and has size " << in[p.finalFanout-1].size() << std::endl;
315  stream_size_type runLength = calculate_run_length(p.runLength, p.fanout, m_finalMergeLevel+1);
316  log_debug() << "Run length " << runLength << std::endl;
317  m_merger.reset(in, runLength);
318  } else {
319  initialize_merger(m_finalMergeLevel, 0, m_finalRunCount);
320  }
321  m_evacuated = false;
322  }
323 
324 private:
328  static inline stream_size_type calculate_run_length(stream_size_type initialRunLength, memory_size_type fanout, memory_size_type mergeLevel) {
329  stream_size_type runLength = initialRunLength;
330  for (memory_size_type i = 0; i < mergeLevel; ++i) {
331  runLength *= fanout;
332  }
333  return runLength;
334  }
335 
341  template <typename ProgressIndicator>
342  inline memory_size_type merge_runs(memory_size_type mergeLevel, memory_size_type runNumber, memory_size_type runCount, ProgressIndicator & pi) {
343  initialize_merger(mergeLevel, runNumber, runCount);
344  file_stream<T> out;
345  memory_size_type nextRunNumber = runNumber/p.fanout;
346  open_run_file_write(out, mergeLevel+1, nextRunNumber);
347  while (m_merger.can_pull()) {
348  pi.step();
349  out.write(m_merger.pull());
350  }
351  return nextRunNumber;
352  }
353 
357  inline void prepare_pull(typename Progress::base & pi) {
358  // Compute merge depth (number of passes over data).
359  int treeHeight= static_cast<int>(ceil(log(static_cast<float>(m_finishedRuns)) /
360  log(static_cast<float>(p.fanout))));
361  pi.init(item_count()*treeHeight);
362 
363  memory_size_type mergeLevel = 0;
364  memory_size_type runCount = m_finishedRuns;
365  while (runCount > p.fanout) {
366  log_debug() << "Merge " << runCount << " runs in merge level " << mergeLevel << '\n';
367  memory_size_type newRunCount = 0;
368  for (memory_size_type i = 0; i < runCount; i += p.fanout) {
369  memory_size_type n = std::min(runCount-i, p.fanout);
370 
371  if (newRunCount < 10)
372  log_debug() << "Merge " << n << " runs starting from #" << i << std::endl;
373  else if (newRunCount == 10)
374  log_debug() << "..." << std::endl;
375 
376  merge_runs(mergeLevel, i, n, pi);
377  ++newRunCount;
378  }
379  ++mergeLevel;
380  runCount = newRunCount;
381  }
382  log_debug() << "Final merge level " << mergeLevel << " has " << runCount << " runs" << std::endl;
383  initialize_final_merger(mergeLevel, runCount);
384 
385  m_state = stReport;
386  pi.done();
387  }
388 
389 public:
394  inline bool can_pull() {
395  tp_assert(m_state == stReport, "Wrong phase");
396  if (m_reportInternal) return m_itemsPulled < m_currentRunItemCount;
397  else {
398  if (m_evacuated) reinitialize_final_merger();
399  return m_merger.can_pull();
400  }
401  }
402 
406  inline T pull() {
407  tp_assert(m_state == stReport, "Wrong phase");
408  if (m_reportInternal && m_itemsPulled < m_currentRunItemCount) {
409  T el = m_currentRunItems[m_itemsPulled++];
410  if (!can_pull()) m_currentRunItems.resize(0);
411  return el;
412  } else {
413  if (m_evacuated) reinitialize_final_merger();
414  return m_merger.pull();
415  }
416  }
417 
418  inline stream_size_type item_count() {
419  return m_itemCount;
420  }
421 
422  static memory_size_type memory_usage_phase_1(const sort_parameters & params) {
423  return params.runLength * sizeof(T)
424  + file_stream<T>::memory_usage()
425  + 2*params.fanout*sizeof(temp_file);
426  }
427 
428  static memory_size_type minimum_memory_phase_1() {
429  // Our *absolute minimum* memory requirements are a single item and
430  // twice as many temp_files as the fanout.
431  // However, our fanout calculation does not take the memory available
432  // in this phase (run formation) into account.
433  // Thus, we assume the largest fanout, meaning we might overshoot.
434  // If we do overshoot, we will just spend the extra bytes on a run length
435  // longer than 1, which is probably what the user wants anyway.
436  sort_parameters p((sort_parameters()));
437  p.runLength = 1;
438  p.fanout = calculate_fanout(std::numeric_limits<memory_size_type>::max());
439  return memory_usage_phase_1(p);
440  }
441 
442  static memory_size_type memory_usage_phase_2(const sort_parameters & params) {
443  return fanout_memory_usage(params.fanout);
444  }
445 
446  static memory_size_type minimum_memory_phase_2() {
447  return fanout_memory_usage(calculate_fanout(0));
448  }
449 
450  static memory_size_type memory_usage_phase_3(const sort_parameters & params) {
451  return fanout_memory_usage(params.finalFanout);
452  }
453 
454  static memory_size_type minimum_memory_phase_3() {
455  return fanout_memory_usage(calculate_fanout(0));
456  }
457 
458  static memory_size_type maximum_memory_phase_3() {
459  return fanout_memory_usage(maximumFanout);
460  }
461 
462  inline memory_size_type evacuated_memory_usage() const {
463  return 2*p.fanout*sizeof(temp_file);
464  }
465 
466 private:
473  inline void calculate_parameters(const memory_size_type m1, const memory_size_type m2, const memory_size_type m3) {
474  tp_assert(m_state == stParameters, "Merge sorting already begun");
475 
476  p.memoryPhase1 = m1;
477  p.memoryPhase2 = m2;
478  p.memoryPhase3 = m3;
479 
480  // We must set aside memory for temp_files in m_runFiles.
481  // m_runFiles contains fanout*2 temp_files, so calculate fanout before run length.
482 
483  // Phase 2 (merge):
484  // Run length: unbounded
485  // Fanout: determined by the size of our merge heap and the stream memory usage.
486  log_debug() << "Phase 2: " << p.memoryPhase2 << " b available memory\n";
487  p.fanout = calculate_fanout(p.memoryPhase2);
488  if (fanout_memory_usage(p.fanout) > p.memoryPhase2) {
489  log_debug() << "Not enough memory for fanout " << p.fanout << "! (" << p.memoryPhase2 << " < " << fanout_memory_usage(p.fanout) << ")\n";
490  p.memoryPhase2 = fanout_memory_usage(p.fanout);
491  }
492 
493  // Phase 3 (final merge & report):
494  // Run length: unbounded
495  // Fanout: determined by the stream memory usage.
496  log_debug() << "Phase 3: " << p.memoryPhase3 << " b available memory\n";
497  p.finalFanout = calculate_fanout(p.memoryPhase3);
498 
499  if (p.finalFanout > p.fanout)
500  p.finalFanout = p.fanout;
501 
502  if (fanout_memory_usage(p.finalFanout) > p.memoryPhase3) {
503  log_debug() << "Not enough memory for fanout " << p.finalFanout << "! (" << p.memoryPhase3 << " < " << fanout_memory_usage(p.finalFanout) << ")\n";
504  p.memoryPhase3 = fanout_memory_usage(p.finalFanout);
505  }
506 
507  // Phase 1 (run formation):
508  // Run length: determined by the number of items we can hold in memory.
509  // Fanout: unbounded
510 
511  memory_size_type streamMemory = file_stream<T>::memory_usage();
512  memory_size_type tempFileMemory = 2*p.fanout*sizeof(temp_file);
513 
514  log_debug() << "Phase 1: " << p.memoryPhase1 << " b available memory; " << streamMemory << " b for a single stream; " << tempFileMemory << " b for temp_files\n";
515  memory_size_type min_m1 = sizeof(T) + streamMemory + tempFileMemory;
516  if (p.memoryPhase1 < min_m1) {
517  log_warning() << "Not enough phase 1 memory for an item and an open stream! (" << p.memoryPhase1 << " < " << min_m1 << ")\n";
518  p.memoryPhase1 = min_m1;
519  }
520  p.runLength = (p.memoryPhase1 - streamMemory - tempFileMemory)/sizeof(T);
521 
522  p.internalReportThreshold = (std::min(p.memoryPhase1,
523  std::min(p.memoryPhase2,
524  p.memoryPhase3))
525  - tempFileMemory)/sizeof(T);
528 
529  m_parametersSet = true;
530 
531  log_debug() << "Calculated merge sort parameters\n";
532  p.dump(log_debug());
533  log_debug() << std::endl;
534 
535  log_debug() << "Merge sort phase 1: "
536  << m1 << " b available, " << memory_usage_phase_1(p) << " b expected" << std::endl;
537  if (memory_usage_phase_1(p) > m1) {
538  log_warning() << "Merge sort phase 1 exceeds the alloted memory usage: "
539  << m1 << " b available, but " << memory_usage_phase_1(p) << " b expected" << std::endl;
540  }
541  log_debug() << "Merge sort phase 2: "
542  << m2 << " b available, " << memory_usage_phase_2(p) << " b expected" << std::endl;
543  if (memory_usage_phase_2(p) > m2) {
544  log_warning() << "Merge sort phase 2 exceeds the alloted memory usage: "
545  << m2 << " b available, but " << memory_usage_phase_2(p) << " b expected" << std::endl;
546  }
547  log_debug() << "Merge sort phase 3: "
548  << m3 << " b available, " << memory_usage_phase_3(p) << " b expected" << std::endl;
549  if (memory_usage_phase_3(p) > m3) {
550  log_warning() << "Merge sort phase 3 exceeds the alloted memory usage: "
551  << m3 << " b available, but " << memory_usage_phase_3(p) << " b expected" << std::endl;
552  }
553  }
554 
558  static inline memory_size_type calculate_fanout(memory_size_type availableMemory) {
559  memory_size_type fanout_lo = 2;
560  memory_size_type fanout_hi = maximumFanout + 1;
561  // binary search
562  while (fanout_lo < fanout_hi - 1) {
563  memory_size_type mid = fanout_lo + (fanout_hi-fanout_lo)/2;
564  if (fanout_memory_usage(mid) <= availableMemory) {
565  fanout_lo = mid;
566  } else {
567  fanout_hi = mid;
568  }
569  }
570  return fanout_lo;
571  }
572 
576  static inline memory_size_type fanout_memory_usage(memory_size_type fanout) {
577  return merger<T, pred_t>::memory_usage(fanout) // accounts for the `fanout' open streams
578  + file_stream<T>::memory_usage() // output stream
579  + 2*sizeof(temp_file); // merge_sorter::m_runFiles
580  }
581 
582 public:
591  void set_items(stream_size_type n) {
592  if (!m_parametersSet)
593  throw exception("Wrong state in set_items: parameters not set");
594  if (m_state != stParameters)
595  throw exception("Wrong state in set_items: state is not stParameters");
596 
597  if (n < p.runLength) {
598  log_debug() << "Decreasing run length from " << p.runLength
599  << " to " << n << std::endl;
600 
601  p.runLength = n;
602 
603  // Mirror the restriction from calculate_parameters.
606 
607  log_debug() << "New merge sort parameters\n";
608  p.dump(log_debug());
609  log_debug() << std::endl;
610  }
611  }
612 
613 private:
619  inline memory_size_type run_file_index(memory_size_type mergeLevel, memory_size_type runNumber) {
620  // runNumber is a memory_size_type since it is used as an index into
621  // m_runFiles.
622 
623  return (mergeLevel % 2)*p.fanout + (runNumber % p.fanout);
624  }
625 
629  inline void open_run_file_write(file_stream<T> & fs, memory_size_type mergeLevel, memory_size_type runNumber) {
630  // see run_file_index comment about runNumber
631 
632  memory_size_type idx = run_file_index(mergeLevel, runNumber);
633  if (runNumber < p.fanout) m_runFiles[idx].free();
634  fs.open(m_runFiles[idx], access_read_write);
635  fs.seek(0, file_stream<T>::end);
636  }
637 
641  inline void open_run_file_read(file_stream<T> & fs, memory_size_type mergeLevel, memory_size_type runNumber) {
642  // see run_file_index comment about runNumber
643 
644  memory_size_type idx = run_file_index(mergeLevel, runNumber);
645  fs.open(m_runFiles[idx], access_read);
646  fs.seek(calculate_run_length(p.runLength, p.fanout, mergeLevel) * (runNumber / p.fanout), file_stream<T>::beginning);
647  }
648 
649  enum state_type {
650  stParameters,
651  stRunFormation,
652  stMerge,
653  stReport
654  };
655 
656  state_type m_state;
657 
658  sort_parameters p;
659  bool m_parametersSet;
660 
661  merger<T, pred_t> m_merger;
662 
663  array<temp_file> m_runFiles;
664 
665  // number of runs already written to disk.
666  stream_size_type m_finishedRuns;
667 
668  // current run buffer. size 0 before begin(), size runLength after begin().
669  array<T> m_currentRunItems;
670 
671  // Number of items in current run buffer.
672  // Used to index into m_currentRunItems, so memory_size_type.
673  memory_size_type m_currentRunItemCount;
674 
675  bool m_reportInternal;
676 
677  // When doing internal reporting: the number of items already reported
678  // Used in comparison with m_currentRunItemCount
679  memory_size_type m_itemsPulled;
680 
681  stream_size_type m_itemCount;
682 
683  pred_t pred;
684 
685  bool m_evacuated;
686  bool m_finalMergeInitialized;
687  memory_size_type m_finalMergeLevel;
688  memory_size_type m_finalRunCount;
689  memory_size_type m_finalMergeSpecialRunNumber;
690 };
691 
692 } // namespace tpie
693 
694 #endif // __TPIE_PIPELINING_MERGE_SORTER_H__
Encapsulation of two pointers from any random access container.
stream_size_type internalReportThreshold
Maximum item count for internal reporting, subject to memory restrictions in all phases.
The base class for indicating the progress of some task.
void parallel_sort(iterator_type a, iterator_type b, typename tpie::progress_types< Progress >::base &pi, comp_type comp=std::less< typename boost::iterator_value< iterator_type >::type >())
Sort items in the range [a,b) using a parallel quick sort.
A generic array with a fixed size.
Definition: array.h:143
Merge sorting consists of three phases.
Definition: merge_sorter.h:44
void set_available_memory(memory_size_type m1, memory_size_type m2, memory_size_type m3)
Calculate parameters from given memory amount.
Definition: merge_sorter.h:90
virtual void done()
Advance the indicator to the end.
void set_items(stream_size_type n)
Set upper bound on number of items pushed.
Definition: merge_sorter.h:591
Open a file for reading.
Definition: access_type.h:31
void set_available_memory(memory_size_type m)
Calculate parameters from given memory amount.
Definition: merge_sorter.h:80
memory_size_type memoryPhase3
Memory available during output phase.
void set_parameters(stream_size_type runLength, memory_size_type fanout)
Enable setting run length and fanout manually (for testing purposes).
Definition: merge_sorter.h:66
Encapsulation of two pointers from any random access container.
Definition: array_view.h:47
memory_manager & get_memory_manager()
Return a reference to the memory manager.
memory_size_type finalFanout
Fanout of the final merge, performed in the output phase (phase 3 of the merge sorter).
void step(stream_size_type step=1)
Record an increment to the indicator and advance the indicator.
logstream & log_debug()
Return logstream for writing debug log messages.
Definition: tpie_log.h:124
void calc(typename Progress::base &pi)
Perform phase 2: perform all merges in the merge tree except the last one.
Definition: merge_sorter.h:197
memory_size_type memoryPhase2
Memory available while merging runs.
void resize(size_t size, const T &elm)
Change the size of the array.
Definition: array.h:431
static memory_size_type memory_usage(float blockFactor=1.0, bool includeDefaultFileAccessor=true)
Calculate the amount of memory used by a single file_stream.
Definition: file_stream.h:134
Simple class acting both as file and a file::stream.
Definition: file_stream.h:44
void swap(array &other)
Swap two arrays.
Definition: array.h:445
iterator begin()
Return an iterator to the beginning of the array.
Definition: array.h:274
size_type size() const
Return the size of the array.
Definition: array.h:472
void begin()
Initiate phase 1: Formation of input runs.
Definition: merge_sorter.h:124
bool can_pull()
In phase 3, return true if there are more items in the final merge phase.
Definition: merge_sorter.h:394
stream_size_type runLength
Run length, subject to memory restrictions while forming sorted runs (phase 1).
#define tp_assert(condition, message)
Definition: tpie_assert.h:47
For applications where you wish to disable progress indicators via a template parameter, refer to the progress_types member names sub, fp and base.
T pull()
In phase 3, fetch next item in the final merge phase.
Definition: merge_sorter.h:406
logstream & log_warning()
Return logstream for writing warning log messages.
Definition: tpie_log.h:114
memory_size_type fanout
Fanout of the merge tree while merging runs (phase 2).
Progress indicator concept in an efficient non-inheritance way.
void push(const T &item)
Push item to merge sorter during phase 1.
Definition: merge_sorter.h:139
void end()
End phase 1.
Definition: merge_sorter.h:153
Open a file for reading or writing.
Definition: access_type.h:35
virtual void init(stream_size_type range=0)
Initialize progress indicator.
memory_size_type memoryPhase1
Memory available while forming sorted runs.