TPIE

v1.1rc1-6-g0c97303
serialization_sort.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2013, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef TPIE_SERIALIZATION_SORT_H
21 #define TPIE_SERIALIZATION_SORT_H
22 
23 #include <queue>
24 #include <boost/filesystem.hpp>
25 
26 #include <tpie/array.h>
27 #include <tpie/array_view.h>
28 #include <tpie/tempname.h>
29 #include <tpie/tpie_log.h>
30 #include <tpie/stats.h>
31 #include <tpie/parallel_sort.h>
32 
33 #include <tpie/serialization2.h>
35 
36 namespace tpie {
37 
38 namespace serialization_bits {
39 
/// Memory, in bytes, available while forming sorted runs (phase 1).
42  memory_size_type memoryPhase1;
/// Memory, in bytes, available while merging runs (phase 2).
44  memory_size_type memoryPhase2;
/// Memory, in bytes, available during the output phase (phase 3).
46  memory_size_type memoryPhase3;
/// Lower bound on the serialized size of an item. Used to estimate the merge
/// fanout before the size of the largest actual item is known.
48  memory_size_type minimumItemSize;
/// Directory in which the temporary run files are stored.
50  std::string tempDir;
51 
52  void dump(std::ostream & out) const {
53  out << "Serialization merge sort parameters\n"
54  << "Phase 1 memory: " << memoryPhase1 << '\n'
55  << "Phase 2 memory: " << memoryPhase2 << '\n'
56  << "Phase 3 memory: " << memoryPhase3 << '\n'
57  << "Minimum item size: " << minimumItemSize << '\n'
58  << "Temporary directory: " << tempDir << '\n';
59  }
60 };
61 
62 template <typename T, typename pred_t>
// Item buffer. begin() sizes it to memAvail / sizeof(T) / 2 elements;
// shrink_buffer() trims it to exactly the stored items.
64  array<T> m_buffer;
// Number of items currently stored in m_buffer.
65  memory_size_type m_items;
// Sum of serialized_size() over the items currently stored.
66  memory_size_type m_serializedSize;
// Memory budget passed to begin().
67  memory_size_type m_memAvail;
68 
// Largest serialized size accepted so far (never below sizeof(T)).
// begin() resets it; reset() and free() keep it.
69  memory_size_type m_largestItem;
70 
// Ordering predicate used by sort().
71  pred_t m_pred;
72 
// Set once push() has rejected an item; cleared by begin() and reset().
73  bool m_full;
74 
75 public:
76  internal_sort(pred_t pred = pred_t())
77  : m_items(0)
78  , m_serializedSize(0)
79  , m_largestItem(sizeof(T))
80  , m_pred(pred)
81  , m_full(false)
82  {
83  }
84 
85  void begin(memory_size_type memAvail) {
86  m_buffer.resize(memAvail / sizeof(T) / 2);
87  m_items = m_serializedSize = 0;
88  m_largestItem = sizeof(T);
89  m_full = false;
90  m_memAvail = memAvail;
91  }
92 
99  bool push(const T & item) {
100  if (m_full) return false;
101 
102  if (m_items == m_buffer.size()) {
103  m_full = true;
104  return false;
105  }
106 
107  memory_size_type serSize = serialized_size(item);
108 
109  if (serSize > sizeof(T)) {
110  // amount of memory this item needs for its extra stuff (stuff not in the buffer).
111  memory_size_type serializedExtra = serSize - sizeof(T);
112 
113  // amount of memory not used for the buffer and not used for extra stuff already.
114  memory_size_type memRemainingExtra = m_memAvail - memory_usage();
115 
116  if (serializedExtra > memRemainingExtra) {
117  m_full = true;
118  return false;
119  }
120 
121  if (serSize > m_largestItem)
122  m_largestItem = serSize;
123  }
124 
125  m_serializedSize += serSize;
126 
127  m_buffer[m_items++] = item;
128 
129  return true;
130  }
131 
132  memory_size_type get_largest_item_size() {
133  return m_largestItem;
134  }
135 
142  memory_size_type current_serialized_size() {
143  return m_serializedSize;
144  }
145 
156  memory_size_type memory_usage() {
157  return m_buffer.size() * sizeof(T)
158  + (m_serializedSize - m_items * sizeof(T));
159  }
160 
161  bool can_shrink_buffer() {
163  }
164 
165  void shrink_buffer() {
166  array<T> newBuffer(array_view<const T>(begin(), end()));
167  m_buffer.swap(newBuffer);
168  }
169 
170  void sort() {
171  parallel_sort(m_buffer.get(), m_buffer.get() + m_items, m_pred);
172  }
173 
174  const T * begin() const {
175  return m_buffer.get();
176  }
177 
178  const T * end() const {
179  return m_buffer.get() + m_items;
180  }
181 
185  void free() {
186  m_buffer.resize(0);
187  reset();
188  }
189 
194  void reset() {
195  m_items = m_serializedSize = 0;
196  m_full = false;
197  }
198 };
199 
239 template <typename T>
241  // Physical index of the run file with logical index 0.
242  size_t m_fileOffset;
243  // Physical index of the first run file of the next merge level.
244  size_t m_nextLevelFileOffset;
245  // Physical index of the next run file to write
246  size_t m_nextFileOffset;
247 
// True between open_new_writer() and close_writer().
248  bool m_writerOpen;
// Number of readers opened by open_readers(); 0 when none are open.
249  size_t m_readersOpen;
250 
251  serialization_writer m_writer;
// Set from m_writer.file_size() in open_new_writer().
252  stream_size_type m_currentWriterByteSize;
253 
// Reader objects; open_readers() grows this to at least `fanout` entries.
254  array<serialization_reader> m_readers;
255 
// Directory holding the run files, named "<index>.tpie" by run_file().
256  std::string m_tempDir;
257 
258  std::string run_file(size_t physicalIndex) {
259  if (m_tempDir.size() == 0) throw exception("run_file: temp dir is the empty string");
260  std::stringstream ss;
261  ss << m_tempDir << '/' << physicalIndex << ".tpie";
262  return ss.str();
263  }
264 
265 public:
266  file_handler()
267  : m_fileOffset(0)
268  , m_nextLevelFileOffset(0)
269  , m_nextFileOffset(0)
270 
271  , m_writerOpen(false)
272  , m_readersOpen(0)
273 
274  , m_writer()
275  , m_currentWriterByteSize(0)
276  {
277  }
278 
279  ~file_handler() {
280  reset();
281  }
282 
283  void set_temp_dir(const std::string & tempDir) {
284  if (m_nextFileOffset != 0)
285  throw exception("set_temp_dir: trying to change path after files already open");
286  m_tempDir = tempDir;
287  }
288 
289  void open_new_writer() {
290  if (m_writerOpen) throw exception("open_new_writer: Writer already open");
291  m_writer.open(run_file(m_nextFileOffset++));
292  m_currentWriterByteSize = m_writer.file_size();
293  m_writerOpen = true;
294  }
295 
296  void write(const T & v) {
297  if (!m_writerOpen) throw exception("write: No writer open");
298  m_writer.serialize(v);
299  }
300 
301  void close_writer() {
302  if (!m_writerOpen) throw exception("close_writer: No writer open");
303  m_writer.close();
304  stream_size_type sz = m_writer.file_size();
305  increase_usage(m_nextFileOffset-1, static_cast<stream_offset_type>(sz));
306  m_writerOpen = false;
307  }
308 
309  size_t remaining_runs() {
310  return m_nextLevelFileOffset - m_fileOffset;
311  }
312 
313  size_t next_level_runs() {
314  return m_nextFileOffset - m_nextLevelFileOffset;
315  }
316 
317  bool readers_open() {
318  return m_readersOpen > 0;
319  }
320 
321  void open_readers(size_t fanout) {
322  if (m_readersOpen != 0) throw exception("open_readers: readers already open");
323  if (fanout == 0) throw exception("open_readers: fanout == 0");
324  if (remaining_runs() == 0) {
325  if (m_writerOpen) throw exception("Writer open while moving to next merge level");
326  m_nextLevelFileOffset = m_nextFileOffset;
327  }
328  if (fanout > remaining_runs()) throw exception("open_readers: fanout out of bounds");
329 
330  if (m_readers.size() < fanout) m_readers.resize(fanout);
331  for (size_t i = 0; i < fanout; ++i) {
332  m_readers[i].open(run_file(m_fileOffset + i));
333  }
334  m_readersOpen = fanout;
335  }
336 
337  bool can_read(size_t idx) {
338  if (m_readersOpen == 0) throw exception("can_read: no readers open");
339  if (m_readersOpen < idx) throw exception("can_read: index out of bounds");
340  return m_readers[idx].can_read();
341  }
342 
343  T read(size_t idx) {
344  if (m_readersOpen == 0) throw exception("read: no readers open");
345  if (m_readersOpen < idx) throw exception("read: index out of bounds");
346  T res;
347  m_readers[idx].unserialize(res);
348  return res;
349  }
350 
351  void close_readers_and_delete() {
352  if (m_readersOpen == 0) throw exception("close_readers_and_delete: no readers open");
353 
354  for (size_t i = 0; i < m_readersOpen; ++i) {
355  decrease_usage(m_fileOffset + i, m_readers[i].file_size());
356  m_readers[i].close();
357  boost::filesystem::remove(run_file(m_fileOffset + i));
358  }
359  m_fileOffset += m_readersOpen;
360  m_readersOpen = 0;
361  }
362 
363  void move_last_reader_to_next_level() {
364  if (remaining_runs() != 1)
365  throw exception("move_last_reader_to_next_level: remaining_runs != 1");
366  m_nextLevelFileOffset = m_fileOffset;
367  }
368 
// Return the handler to its initial state. Steps:
// - close open readers (which also deletes their run files);
// - drop the reader objects and close an open writer;
// - delete every run file still on disk;
// - reset all file offsets to 0.
// Also called from ~file_handler().
// NOTE(review): close_writer(), close_readers_and_delete() and
// boost::filesystem::remove() may throw. When reached from the destructor
// that would end the program; confirm this is acceptable.
369  void reset() {
370  if (m_readersOpen > 0) {
371  log_debug() << "reset: Close readers" << std::endl;
372  close_readers_and_delete();
373  }
374  m_readers.resize(0);
375  if (m_writerOpen) {
376  log_debug() << "reset: Close writer" << std::endl;
377  close_writer();
378  }
379  log_debug() << "Remove " << m_fileOffset << " through " << m_nextFileOffset << std::endl;
// Each remaining run is opened only to learn its size for the temp-file
// accounting, then deleted.
380  for (size_t i = m_fileOffset; i < m_nextFileOffset; ++i) {
381  std::string runFile = run_file(i);
// NOTE(review): `rd` is not declared in this listing (source line 382 is
// missing). Presumably a local serialization_reader; confirm.
383  rd.open(runFile);
384  decrease_usage(i, rd.file_size());
385  rd.close();
386  boost::filesystem::remove(runFile);
387  }
388  m_fileOffset = m_nextLevelFileOffset = m_nextFileOffset = 0;
389  }
390 
391 private:
392  void increase_usage(size_t idx, stream_size_type sz) {
393  log_debug() << "+ " << idx << ' ' << sz << std::endl;
394  increment_temp_file_usage(static_cast<stream_offset_type>(sz));
395  }
396 
397  void decrease_usage(size_t idx, stream_size_type sz) {
398  log_debug() << "- " << idx << ' ' << sz << std::endl;
399  increment_temp_file_usage(-static_cast<stream_offset_type>(sz));
400  }
401 };
402 
403 template <typename T, typename pred_t>
404 class merger {
405  class mergepred_t {
406  pred_t m_pred;
407 
408  public:
409  typedef std::pair<T, size_t> item_type;
410 
411  mergepred_t(const pred_t & pred) : m_pred(pred) {}
412 
413  // Used with std::priority_queue, so invert the original relation.
414  bool operator()(const item_type & a, const item_type & b) const {
415  return m_pred(b.first, a.first);
416  }
417  };
418 
419  typedef typename mergepred_t::item_type item_type;
420 
421  file_handler<T> & files;
422  pred_t pred;
423  std::vector<serialization_reader> rd;
424  typedef std::priority_queue<item_type, std::vector<item_type>, mergepred_t> priority_queue_type;
425  priority_queue_type pq;
426 
427 public:
428  merger(file_handler<T> & files, const pred_t & pred)
429  : files(files)
430  , pred(pred)
431  , pq(mergepred_t(pred))
432  {
433  }
434 
435  // Assume files.open_readers(fanout) has just been called
436  void init(size_t fanout) {
437  rd.resize(fanout);
438  for (size_t i = 0; i < fanout; ++i)
439  push_from(i);
440  }
441 
442  bool empty() const {
443  return pq.empty();
444  }
445 
446  const T & top() const {
447  return pq.top().first;
448  }
449 
450  void pop() {
451  size_t idx = pq.top().second;
452  pq.pop();
453  push_from(idx);
454  }
455 
456  // files.close_readers_and_delete() should be called after this
457  void free() {
458  {
459  priority_queue_type tmp(pred);
460  std::swap(pq, tmp);
461  }
462  rd.resize(0);
463  }
464 
465 private:
466  void push_from(size_t idx) {
467  if (files.can_read(idx)) {
468  pq.push(std::make_pair(files.read(idx), idx));
469  }
470  }
471 };
472 
473 } // namespace serialization_bits
474 
475 template <typename T, typename pred_t = std::less<T> >
477 public:
/// Shared-pointer type for this sorter.
478  typedef boost::shared_ptr<serialization_sort> ptr;
479 
480 private:
// Life cycle: begin() moves state_initial -> state_1, end() moves
// state_1 -> state_2, and merge_runs() moves state_2 -> state_3.
481  enum sorter_state { state_initial, state_1, state_2, state_3 };
482 
// Current life-cycle state; checked by most public operations.
483  sorter_state m_state;
// NOTE(review): m_params, m_sorter, m_files and m_merger are used by the
// constructor, but their declarations (source lines 484-488) are missing
// from this listing. Confirm their order; m_files must precede m_merger.
// Set by calculate_parameters(); begin() refuses to run until it is true.
486  bool m_parametersSet;
489 
// Number of items passed to push().
490  stream_size_type m_items;
// True when pull() serves items from the in-memory sorter instead of run files.
491  bool m_reportInternal;
// Next item pull() returns in internal mode; 0 when none is left.
492  const T * m_nextInternalItem;
493 
494 public:
495  serialization_sort(memory_size_type minimumItemSize = sizeof(T), pred_t pred = pred_t())
496  : m_state(state_initial)
497  , m_sorter(pred)
498  , m_parametersSet(false)
499  , m_files()
500  , m_merger(m_files, pred)
501  , m_items(0)
502  , m_reportInternal(false)
503  , m_nextInternalItem(0)
504  {
505  m_params.memoryPhase1 = 0;
506  m_params.memoryPhase2 = 0;
507  m_params.memoryPhase3 = 0;
508  m_params.minimumItemSize = minimumItemSize;
509  }
510 
511 private:
512  // set_phase_?_memory helper
513  inline void maybe_calculate_parameters() {
514  if (m_state != state_initial)
515  throw tpie::exception("Bad state in maybe_calculate_parameters");
516  if (m_params.memoryPhase1 > 0 &&
517  m_params.memoryPhase2 > 0 &&
518  m_params.memoryPhase3 > 0)
519 
520  calculate_parameters();
521  }
522 
523 public:
524  void set_phase_1_memory(memory_size_type m1) {
525  m_params.memoryPhase1 = m1;
526  maybe_calculate_parameters();
527  }
528 
529  void set_phase_2_memory(memory_size_type m2) {
530  m_params.memoryPhase2 = m2;
531  maybe_calculate_parameters();
532  }
533 
534  void set_phase_3_memory(memory_size_type m3) {
535  m_params.memoryPhase3 = m3;
536  maybe_calculate_parameters();
537  }
538 
539  void set_available_memory(memory_size_type m) {
540  set_phase_1_memory(m);
541  set_phase_2_memory(m);
542  set_phase_3_memory(m);
543  }
544 
545  void set_available_memory(memory_size_type m1, memory_size_type m2, memory_size_type m3) {
546  set_phase_1_memory(m1);
547  set_phase_2_memory(m2);
548  set_phase_3_memory(m3);
549  }
550 
551  static memory_size_type minimum_memory_phase_1() {
552  return serialization_writer::memory_usage()*2;
553  }
554 
555  static memory_size_type minimum_memory_phase_2() {
556  return serialization_writer::memory_usage()
557  + 2*serialization_reader::memory_usage();
558  }
559 
560  static memory_size_type minimum_memory_phase_3() {
561  return 2*serialization_reader::memory_usage();
562  }
563 
564 private:
565  void calculate_parameters() {
566  if (m_state != state_initial)
567  throw tpie::exception("Bad state in calculate_parameters");
568 
569  memory_size_type memAvail1 = m_params.memoryPhase1;
570  if (memAvail1 <= serialization_writer::memory_usage()) {
571  log_error() << "Not enough memory for run formation; have " << memAvail1
572  << " bytes but " << serialization_writer::memory_usage()
573  << " is required for writing a run." << std::endl;
574  throw exception("Not enough memory for run formation");
575  }
576 
577  memory_size_type memAvail2 = m_params.memoryPhase2;
578 
579  // We have to keep a writer open no matter what.
580  if (memAvail2 <= serialization_writer::memory_usage()) {
581  log_error() << "Not enough memory for merging. "
582  << "mem avail = " << memAvail2
583  << ", writer usage = " << serialization_writer::memory_usage()
584  << std::endl;
585  throw exception("Not enough memory for merging.");
586  }
587 
588  memory_size_type memAvail3 = m_params.memoryPhase3;
589 
590  // We have to keep a writer open no matter what.
591  if (memAvail2 <= serialization_writer::memory_usage()) {
592  log_error() << "Not enough memory for outputting. "
593  << "mem avail = " << memAvail3
594  << ", writer usage = " << serialization_writer::memory_usage()
595  << std::endl;
596  throw exception("Not enough memory for outputting.");
597  }
598 
599  memory_size_type memForMerge = std::min(memAvail2, memAvail3);
600 
601  // We do not yet know the serialized size of the largest item,
602  // so this calculation has to be redone.
603  // Instead, we assume that all items have minimum size.
604 
605  // We have to keep a writer open no matter what.
606  memory_size_type fanoutMemory = memForMerge - serialization_writer::memory_usage();
607 
608  // This is a lower bound on the memory used per fanout.
609  memory_size_type perFanout = m_params.minimumItemSize + serialization_reader::memory_usage();
610 
611  // Floored division to compute the largest possible fanout.
612  memory_size_type fanout = fanoutMemory / perFanout;
613  if (fanout < 2) {
614  log_error() << "Not enough memory for merging, even when minimum item size is assumed. "
615  << "mem avail = " << memForMerge
616  << ", fanout memory = " << fanoutMemory
617  << ", per fanout >= " << perFanout
618  << std::endl;
619  throw exception("Not enough memory for merging.");
620  }
621 
622  m_params.tempDir = tempname::tpie_dir_name();
623  m_files.set_temp_dir(m_params.tempDir);
624 
625  log_info() << "Calculated serialization_sort parameters.\n";
626  m_params.dump(log_info());
627  log_info() << std::flush;
628 
629  m_parametersSet = true;
630  }
631 
632 public:
633  void begin() {
634  if (!m_parametersSet)
635  throw tpie::exception("Parameters not set in serialization_sorter");
636  if (m_state != state_initial)
637  throw tpie::exception("Bad state in begin");
638  m_state = state_1;
639 
640  log_info() << "Before begin; mem usage = "
641  << get_memory_manager().used() << std::endl;
642  m_sorter.begin(m_params.memoryPhase1 - serialization_writer::memory_usage());
643  log_info() << "After internal sorter begin; mem usage = "
644  << get_memory_manager().used() << std::endl;
645  boost::filesystem::create_directory(m_params.tempDir);
646  }
647 
648  void push(const T & item) {
649  if (m_state != state_1)
650  throw tpie::exception("Bad state in push");
651 
652  ++m_items;
653 
654  if (m_sorter.push(item)) return;
655  end_run();
656  if (!m_sorter.push(item)) {
657  throw exception("Couldn't fit a single item in buffer");
658  }
659  }
660 
661  void end() {
662  if (m_state != state_1)
663  throw tpie::exception("Bad state in end");
664 
665  memory_size_type internalThreshold =
666  std::min(m_params.memoryPhase2, m_params.memoryPhase3);
667 
668  log_debug() << "m_sorter.memory_usage == " << m_sorter.memory_usage() << '\n'
669  << "internalThreshold == " << internalThreshold << std::endl;
670 
671  if (m_items == 0) {
672  m_reportInternal = true;
673  m_nextInternalItem = 0;
674  m_sorter.free();
675  log_debug() << "Got no items. Internal reporting mode." << std::endl;
676  } else if (m_files.next_level_runs() == 0
677  && m_sorter.memory_usage()
678  <= internalThreshold) {
679 
680  m_sorter.sort();
681  m_reportInternal = true;
682  m_nextInternalItem = m_sorter.begin();
683  log_debug() << "Got " << m_sorter.current_serialized_size()
684  << " bytes of items. Internal reporting mode." << std::endl;
685  } else if (m_files.next_level_runs() == 0
686  && m_sorter.current_serialized_size() <= internalThreshold
687  && m_sorter.can_shrink_buffer()) {
688 
689  m_sorter.sort();
690  m_sorter.shrink_buffer();
691  m_reportInternal = true;
692  m_nextInternalItem = m_sorter.begin();
693  log_debug() << "Got " << m_sorter.current_serialized_size()
694  << " bytes of items. Internal reporting mode after shrinking buffer." << std::endl;
695 
696  } else {
697 
698  end_run();
699  log_debug() << "Got " << m_files.next_level_runs() << " runs. "
700  << "External reporting mode." << std::endl;
701  m_sorter.free();
702  m_reportInternal = false;
703  }
704 
705  log_info() << "After internal sorter end; mem usage = "
706  << get_memory_manager().used() << std::endl;
707 
708  m_state = state_2;
709  }
710 
711  stream_size_type item_count() {
712  return m_items;
713  }
714 
715  void evacuate() {
716  switch (m_state) {
717  case state_initial:
718  throw tpie::exception("Cannot evacuate in state initial");
719  case state_1:
720  throw tpie::exception("Cannot evacuate in state 1");
721  case state_2:
722  case state_3:
723  if (m_reportInternal) {
724  end_run();
725  m_sorter.free();
726  m_reportInternal = false;
727  log_debug() << "Evacuate out of internal reporting mode." << std::endl;
728  } else {
729  log_debug() << "Evacuate in external reporting mode - noop." << std::endl;
730  }
731  break;
732  }
733  }
734 
735  memory_size_type evacuated_memory_usage() const {
736  return 0;
737  }
738 
739  void merge_runs() {
740  if (m_state != state_2)
741  throw tpie::exception("Bad state in end");
742 
743  if (m_reportInternal) {
744  log_debug() << "merge_runs: internal reporting; doing nothing." << std::endl;
745  m_state = state_3;
746  return;
747  }
748 
749  memory_size_type largestItem = m_sorter.get_largest_item_size();
750  if (largestItem == 0) {
751  log_warning() << "Largest item is 0 bytes; doing nothing." << std::endl;
752  m_state = state_3;
753  return;
754  }
755 
756  if (m_params.memoryPhase2 <= serialization_writer::memory_usage())
757  throw exception("Not enough memory for merging.");
758 
759  // Perform almost the same computation as in calculate_parameters.
760  // Only change the item size to largestItem rather than minimumItemSize.
761  memory_size_type fanoutMemory = m_params.memoryPhase2 - serialization_writer::memory_usage();
762  memory_size_type perFanout = largestItem + serialization_reader::memory_usage();
763  memory_size_type fanout = fanoutMemory / perFanout;
764 
765  if (fanout < 2) {
766  log_error() << "Not enough memory for merging. "
767  << "mem avail = " << m_params.memoryPhase2
768  << ", fanout memory = " << fanoutMemory
769  << ", per fanout = " << perFanout
770  << std::endl;
771  throw exception("Not enough memory for merging.");
772  }
773 
774  memory_size_type finalFanoutMemory = m_params.memoryPhase3;
775  memory_size_type finalFanout =
776  std::min(fanout,
777  finalFanoutMemory / perFanout);
778 
779  if (finalFanout < 2) {
780  log_error() << "Not enough memory for merging (final fanout < 2). "
781  << "mem avail = " << m_params.memoryPhase3
782  << ", final fanout memory = " << finalFanoutMemory
783  << ", per fanout = " << perFanout
784  << std::endl;
785  throw exception("Not enough memory for merging.");
786  }
787 
788  log_debug() << "Calculated merge phase parameters for serialization sort.\n"
789  << "Fanout: " << fanout << '\n'
790  << "Final fanout: " << finalFanout << '\n'
791  ;
792 
793  while (m_files.next_level_runs() > finalFanout) {
794  if (m_files.remaining_runs() != 0)
795  throw exception("m_files.remaining_runs() != 0");
796  log_debug() << "Runs in current level: " << m_files.next_level_runs() << '\n';
797  for (size_t remainingRuns = m_files.next_level_runs(); remainingRuns > 0;) {
798  size_t f = std::min(fanout, remainingRuns);
799  merge_runs(f);
800  remainingRuns -= f;
801  if (remainingRuns != m_files.remaining_runs())
802  throw exception("remainingRuns != m_files.remaining_runs()");
803  }
804  }
805 
806  m_state = state_3;
807  }
808 
809 private:
810  void end_run() {
811  m_sorter.sort();
812  if (m_sorter.begin() == m_sorter.end()) return;
813  m_files.open_new_writer();
814  for (const T * item = m_sorter.begin(); item != m_sorter.end(); ++item) {
815  m_files.write(*item);
816  }
817  m_files.close_writer();
818  m_sorter.reset();
819  }
820 
821  void initialize_merger(size_t fanout) {
822  if (fanout == 0) throw exception("initialize_merger: fanout == 0");
823  m_files.open_readers(fanout);
824  m_merger.init(fanout);
825  }
826 
827  void free_merger_and_files() {
828  m_merger.free();
829  m_files.close_readers_and_delete();
830  }
831 
832  void merge_runs(size_t fanout) {
833  if (fanout == 0) throw exception("merge_runs: fanout == 0");
834 
835  if (fanout == 1 && m_files.remaining_runs() == 1) {
836  m_files.move_last_reader_to_next_level();
837  return;
838  }
839 
840  initialize_merger(fanout);
841  m_files.open_new_writer();
842  while (!m_merger.empty()) {
843  m_files.write(m_merger.top());
844  m_merger.pop();
845  }
846  free_merger_and_files();
847  m_files.close_writer();
848  }
849 
850 public:
851  T pull() {
852  if (!can_pull())
853  throw exception("pull: !can_pull");
854 
855  if (m_reportInternal) {
856  T item = *m_nextInternalItem++;
857  if (m_nextInternalItem == m_sorter.end()) {
858  m_sorter.free();
859  m_nextInternalItem = 0;
860  }
861  return item;
862  }
863 
864  if (!m_files.readers_open()) {
865  if (m_files.next_level_runs() == 0)
866  throw exception("pull: next_level_runs == 0");
867  initialize_merger(m_files.next_level_runs());
868  }
869 
870  T item = m_merger.top();
871  m_merger.pop();
872 
873  if (m_merger.empty()) {
874  free_merger_and_files();
875  m_files.reset();
876  }
877 
878  return item;
879  }
880 
881  bool can_pull() {
882  if (m_reportInternal) return m_nextInternalItem != 0;
883  if (!m_files.readers_open()) return m_files.next_level_runs() > 0;
884  return !m_merger.empty();
885  }
886 };
887 
888 }
889 
890 #endif // TPIE_SERIALIZATION_SORT_H
memory_size_type memoryPhase2
Memory available while merging runs.
size_t serialized_size(const T &v)
Given a serializable, serialize it and measure its serialized size.
Encapsulation of two pointers from any random access container.
T * get()
Return a raw pointer to the array content.
Definition: array.h:477
Binary serialization and unserialization.
void parallel_sort(iterator_type a, iterator_type b, typename tpie::progress_types< Progress >::base &pi, comp_type comp=std::less< typename boost::iterator_value< iterator_type >::type >())
Sort items in the range [a,b) using a parallel quick sort.
memory_size_type memoryPhase1
Memory available while forming sorted runs.
void reset()
Reset sorter, but keep the remembered largest item size and buffer size.
logstream & log_info()
Return logstream for writing info log messages.
Definition: tpie_log.h:109
size_t available() const
Return the amount of memory still available to allocation.
Stream of serializable items.
memory_size_type memoryPhase3
Memory available during output phase.
static std::string tpie_dir_name(const std::string &post_base="", const std::string &dir="")
Generate path for a new temporary directory.
Logging functionality and log_level codes for different priorities of log messages.
I/O statistics.
Generic internal array with known memory requirements.
void serialize(const T &v)
Serialize a serializable item and write it to the stream.
void free()
Deallocate buffer and call reset().
Simple parallel quick sort implementation with progress tracking.
stream_size_type file_size()
Size of file in bytes, including the header.
memory_manager & get_memory_manager()
Return a reference to the memory manager.
logstream & log_debug()
Return logstream for writing debug log messages.
Definition: tpie_log.h:124
File handling for merge sort.
void resize(size_t size, const T &elm)
Change the size of the array.
Definition: array.h:431
size_t used() const
Return the current amount of memory used.
std::string tempDir
Directory in which temporary files are stored.
void swap(array &other)
Swap two arrays.
Definition: array.h:445
void increment_temp_file_usage(stream_offset_type delta)
Increment (possibly by a negative amount) the number of bytes being used by temporary files...
size_type size() const
Return the size of the array.
Definition: array.h:472
Temporary file names.
memory_size_type minimumItemSize
Minimum size of serialized items.
bool push(const T &item)
True if all items up to and including this one fit in the buffer.
memory_size_type current_serialized_size()
Get the serialized size of the items written.
logstream & log_error()
Return logstream for writing error log messages.
Definition: tpie_log.h:104
logstream & log_warning()
Return logstream for writing warning log messages.
Definition: tpie_log.h:114
memory_size_type memory_usage()
Compute current memory usage.