TPIE

v1.1rc1-6-g0c97303
tpie::merge_sorter< T, UseProgress, pred_t > Class Template Reference

Merge sorting consists of three phases. More...

#include <tpie/pipelining/merge_sorter.h>

Public Types

typedef boost::shared_ptr<merge_sorter> ptr
 
typedef progress_types<UseProgress> Progress
 

Public Member Functions

 merge_sorter (pred_t pred=pred_t())
 
void set_parameters (stream_size_type runLength, memory_size_type fanout)
 Enable setting run length and fanout manually (for testing purposes). More...
 
void set_available_memory (memory_size_type m)
 Calculate parameters from given memory amount. More...
 
void set_available_memory (memory_size_type m1, memory_size_type m2, memory_size_type m3)
 Calculate parameters from given memory amount. More...
 
void set_phase_1_memory (memory_size_type m1)
 
void set_phase_2_memory (memory_size_type m2)
 
void set_phase_3_memory (memory_size_type m3)
 
void begin ()
 Initiate phase 1: Formation of input runs. More...
 
void push (const T &item)
 Push item to merge sorter during phase 1. More...
 
void end ()
 End phase 1. More...
 
void calc (typename Progress::base &pi)
 Perform phase 2: Performing all merges in the merge tree except the last one. More...
 
void evacuate ()
 
void evacuate_before_merging ()
 
void evacuate_before_reporting ()
 
void reinitialize_final_merger ()
 
bool can_pull ()
 In phase 3, return true if there are more items in the final merge phase. More...
 
T pull ()
 In phase 3, fetch next item in the final merge phase. More...
 
stream_size_type item_count ()
 
memory_size_type evacuated_memory_usage () const
 
void set_items (stream_size_type n)
 Set upper bound on number of items pushed. More...
 

Static Public Member Functions

static memory_size_type memory_usage_phase_1 (const sort_parameters &params)
 
static memory_size_type minimum_memory_phase_1 ()
 
static memory_size_type memory_usage_phase_2 (const sort_parameters &params)
 
static memory_size_type minimum_memory_phase_2 ()
 
static memory_size_type memory_usage_phase_3 (const sort_parameters &params)
 
static memory_size_type minimum_memory_phase_3 ()
 
static memory_size_type maximum_memory_phase_3 ()
 

Static Public Attributes

static const memory_size_type maximumFanout = 250
 

Detailed Description

template<typename T, bool UseProgress, typename pred_t = std::less<T>>
class tpie::merge_sorter< T, UseProgress, pred_t >

Merge sorting consists of three phases.

  1. Sorting and forming runs
  2. Merging runs
  3. Final merge and report

If all elements received during phase 1 fit in the first run and their count does not exceed the internal reporting threshold, we are in "report internal" mode, meaning we do not write anything to disk. This causes phase 2 to be a no-op and phase 3 to be a simple array traversal.

Definition at line 44 of file merge_sorter.h.

Member Function Documentation

template<typename T , bool UseProgress, typename pred_t = std::less<T>>
void tpie::merge_sorter< T, UseProgress, pred_t >::begin ( )
inline

Initiate phase 1: Formation of input runs.

Definition at line 124 of file merge_sorter.h.

References tpie::sort_parameters::fanout, tpie::log_debug(), tpie::array< T, Allocator >::resize(), tpie::sort_parameters::runLength, and tp_assert.

124  {
125  tp_assert(m_state == stParameters, "Merge sorting already begun");
126  if (!m_parametersSet) throw merge_sort_not_ready();
127  log_debug() << "Start forming input runs" << std::endl;
128  m_currentRunItems.resize((size_t)p.runLength);
129  m_runFiles.resize(p.fanout*2);
130  m_currentRunItemCount = 0;
131  m_finishedRuns = 0;
132  m_state = stRunFormation;
133  m_itemCount = 0;
134  }
logstream & log_debug()
Return logstream for writing debug log messages.
Definition: tpie_log.h:124
void resize(size_t size, const T &elm)
Change the size of the array.
Definition: array.h:431
stream_size_type runLength
Run length, subject to memory restrictions during phase 1 (run formation).
#define tp_assert(condition, message)
Definition: tpie_assert.h:47
memory_size_type fanout
Fanout of merge tree during phase 2 (all merges except the final one).
template<typename T , bool UseProgress, typename pred_t = std::less<T>>
void tpie::merge_sorter< T, UseProgress, pred_t >::calc ( typename Progress::base & pi)
inline

Perform phase 2: Performing all merges in the merge tree except the last one.

Definition at line 197 of file merge_sorter.h.

References tpie::progress_indicator_base::done(), tpie::progress_indicator_base::init(), tpie::progress_indicator_base::step(), and tp_assert.

197  {
198  tp_assert(m_state == stMerge, "Wrong phase");
199  if (!m_reportInternal) {
200  prepare_pull(pi);
201  } else {
202  pi.init(1);
203  pi.step();
204  pi.done();
205  }
206  m_state = stReport;
207  }
#define tp_assert(condition, message)
Definition: tpie_assert.h:47
template<typename T , bool UseProgress, typename pred_t = std::less<T>>
bool tpie::merge_sorter< T, UseProgress, pred_t >::can_pull ( )
inline

In phase 3, return true if there are more items in the final merge phase.

Definition at line 394 of file merge_sorter.h.

References tp_assert.

Referenced by tpie::merge_sorter< T, UseProgress, pred_t >::pull().

394  {
395  tp_assert(m_state == stReport, "Wrong phase");
396  if (m_reportInternal) return m_itemsPulled < m_currentRunItemCount;
397  else {
398  if (m_evacuated) reinitialize_final_merger();
399  return m_merger.can_pull();
400  }
401  }
#define tp_assert(condition, message)
Definition: tpie_assert.h:47
template<typename T , bool UseProgress, typename pred_t = std::less<T>>
void tpie::merge_sorter< T, UseProgress, pred_t >::end ( )
inline

End phase 1.

Definition at line 153 of file merge_sorter.h.

References tpie::get_memory_manager(), tpie::sort_parameters::internalReportThreshold, tpie::log_debug(), tpie::array< T, Allocator >::resize(), tpie::array< T, Allocator >::size(), tpie::array< T, Allocator >::swap(), and tp_assert.

153  {
154  tp_assert(m_state == stRunFormation, "Wrong phase");
155  sort_current_run();
156 
157  if (m_itemCount == 0) {
158  tp_assert(m_currentRunItemCount == 0, "m_itemCount == 0, but m_currentRunItemCount != 0");
159  m_reportInternal = true;
160  m_itemsPulled = 0;
161  m_currentRunItems.resize(0);
162  log_debug() << "Got no items. Internal reporting mode." << std::endl;
163  } else if (m_finishedRuns == 0 && m_currentRunItems.size() <= p.internalReportThreshold) {
164  // Our current buffer fits within the memory requirements of phase 2.
165  m_reportInternal = true;
166  m_itemsPulled = 0;
167  log_debug() << "Got " << m_currentRunItemCount << " items. Internal reporting mode." << std::endl;
168 
169  } else if (m_finishedRuns == 0
170  && m_currentRunItemCount <= p.internalReportThreshold
171  && array<T>::memory_usage(m_currentRunItemCount) <= get_memory_manager().available()) {
172  // Our current buffer does not fit within the memory requirements
173  // of phase 2, but we have enough temporary memory to copy and
174  // resize the buffer.
175 
176  array<T> currentRun(array_view<T>(m_currentRunItems, 0, m_currentRunItemCount));
177  m_currentRunItems.swap(currentRun);
178 
179  m_reportInternal = true;
180  m_itemsPulled = 0;
181  log_debug() << "Got " << m_currentRunItemCount << " items. Internal reporting mode "
182  << "after resizing item buffer." << std::endl;
183 
184  } else {
185  m_reportInternal = false;
186  empty_current_run();
187  m_currentRunItems.resize(0);
188  log_debug() << "Got " << m_finishedRuns << " runs. External reporting mode." << std::endl;
189  }
190  m_state = stMerge;
191  }
stream_size_type internalReportThreshold
Maximum item count for internal reporting, subject to memory restrictions in all phases.
memory_manager & get_memory_manager()
Return a reference to the memory manager.
logstream & log_debug()
Return logstream for writing debug log messages.
Definition: tpie_log.h:124
void resize(size_t size, const T &elm)
Change the size of the array.
Definition: array.h:431
void swap(array &other)
Swap two arrays.
Definition: array.h:445
size_type size() const
Return the size of the array.
Definition: array.h:472
#define tp_assert(condition, message)
Definition: tpie_assert.h:47
static stream_size_type memory_usage(stream_size_type size)
Return the number of bytes required to create a data structure supporting a given number of elements...
Definition: util.h:81
template<typename T , bool UseProgress, typename pred_t = std::less<T>>
T tpie::merge_sorter< T, UseProgress, pred_t >::pull ( )
inline

In phase 3, fetch next item in the final merge phase.

Definition at line 406 of file merge_sorter.h.

References tpie::merge_sorter< T, UseProgress, pred_t >::can_pull(), tpie::array< T, Allocator >::resize(), and tp_assert.

406  {
407  tp_assert(m_state == stReport, "Wrong phase");
408  if (m_reportInternal && m_itemsPulled < m_currentRunItemCount) {
409  T el = m_currentRunItems[m_itemsPulled++];
410  if (!can_pull()) m_currentRunItems.resize(0);
411  return el;
412  } else {
413  if (m_evacuated) reinitialize_final_merger();
414  return m_merger.pull();
415  }
416  }
void resize(size_t size, const T &elm)
Change the size of the array.
Definition: array.h:431
bool can_pull()
In phase 3, return true if there are more items in the final merge phase.
Definition: merge_sorter.h:394
#define tp_assert(condition, message)
Definition: tpie_assert.h:47
template<typename T , bool UseProgress, typename pred_t = std::less<T>>
void tpie::merge_sorter< T, UseProgress, pred_t >::push ( const T &  item)
inline

Push item to merge sorter during phase 1.

Definition at line 139 of file merge_sorter.h.

References tpie::sort_parameters::runLength, and tp_assert.

139  {
140  tp_assert(m_state == stRunFormation, "Wrong phase");
141  if (m_currentRunItemCount >= p.runLength) {
142  sort_current_run();
143  empty_current_run();
144  }
145  m_currentRunItems[m_currentRunItemCount] = item;
146  ++m_currentRunItemCount;
147  ++m_itemCount;
148  }
stream_size_type runLength
Run length, subject to memory restrictions during phase 1 (run formation).
#define tp_assert(condition, message)
Definition: tpie_assert.h:47
template<typename T , bool UseProgress, typename pred_t = std::less<T>>
void tpie::merge_sorter< T, UseProgress, pred_t >::set_available_memory ( memory_size_type  m)
inline

Calculate parameters from given memory amount.

Parameters
m — Memory available for each of phases 1, 2 and 3

Definition at line 80 of file merge_sorter.h.

80  {
81  calculate_parameters(m, m, m);
82  }
template<typename T , bool UseProgress, typename pred_t = std::less<T>>
void tpie::merge_sorter< T, UseProgress, pred_t >::set_available_memory ( memory_size_type  m1,
memory_size_type  m2,
memory_size_type  m3 
)
inline

Calculate parameters from given memory amount.

Parameters
m1 — Memory available for phase 1
m2 — Memory available for phase 2
m3 — Memory available for phase 3

Definition at line 90 of file merge_sorter.h.

90  {
91  calculate_parameters(m1, m2, m3);
92  }
template<typename T , bool UseProgress, typename pred_t = std::less<T>>
void tpie::merge_sorter< T, UseProgress, pred_t >::set_items ( stream_size_type  n)
inline

Set upper bound on number of items pushed.

If the number of items to push, n, is less than the length of a single run, this method decreases the run length to n. This may make it easier for the sorter to go into internal reporting mode.

Definition at line 591 of file merge_sorter.h.

References tpie::sort_parameters::internalReportThreshold, tpie::log_debug(), and tpie::sort_parameters::runLength.

591  {
592  if (!m_parametersSet)
593  throw exception("Wrong state in set_items: parameters not set");
594  if (m_state != stParameters)
595  throw exception("Wrong state in set_items: state is not stParameters");
596 
597  if (n < p.runLength) {
598  log_debug() << "Decreasing run length from " << p.runLength
599  << " to " << n << std::endl;
600 
601  p.runLength = n;
602 
603  // Mirror the restriction from calculate_parameters.
606 
607  log_debug() << "New merge sort parameters\n";
608  p.dump(log_debug());
609  log_debug() << std::endl;
610  }
611  }
stream_size_type internalReportThreshold
Maximum item count for internal reporting, subject to memory restrictions in all phases.
logstream & log_debug()
Return logstream for writing debug log messages.
Definition: tpie_log.h:124
stream_size_type runLength
Run length, subject to memory restrictions during phase 1 (run formation).
template<typename T , bool UseProgress, typename pred_t = std::less<T>>
void tpie::merge_sorter< T, UseProgress, pred_t >::set_parameters ( stream_size_type  runLength,
memory_size_type  fanout 
)
inline

Enable setting run length and fanout manually (for testing purposes).

Definition at line 66 of file merge_sorter.h.

References tpie::sort_parameters::fanout, tpie::sort_parameters::finalFanout, tpie::sort_parameters::internalReportThreshold, tpie::log_debug(), tpie::sort_parameters::runLength, and tp_assert.

66  {
67  tp_assert(m_state == stParameters, "Merge sorting already begun");
68  p.runLength = p.internalReportThreshold = runLength;
69  p.fanout = p.finalFanout = fanout;
70  m_parametersSet = true;
71  log_debug() << "Manually set merge sort run length and fanout\n";
72  log_debug() << "Run length = " << p.runLength << " (uses memory " << (p.runLength*sizeof(T) + file_stream<T>::memory_usage()) << ")\n";
73  log_debug() << "Fanout = " << p.fanout << " (uses memory " << fanout_memory_usage(p.fanout) << ")" << std::endl;
74  }
stream_size_type internalReportThreshold
Maximum item count for internal reporting, subject to memory restrictions in all phases.
memory_size_type finalFanout
Fanout of merge tree during phase 3 (the final merge).
logstream & log_debug()
Return logstream for writing debug log messages.
Definition: tpie_log.h:124
stream_size_type runLength
Run length, subject to memory restrictions during phase 1 (run formation).
#define tp_assert(condition, message)
Definition: tpie_assert.h:47
memory_size_type fanout
Fanout of merge tree during phase 2 (all merges except the final one).

The documentation for this class was generated from the following file: