TPIE

v1.1rc1-6-g0c97303
tpie::ami Namespace Reference

The namespace within TPIE for the Access Method Interface elements. More...

Classes

class  heap_element
 This is a heap element. More...
 
class  heap_ptr
 This is a record pointer element. More...
 
class  Internal_Sorter_Base
 The base class for internal sorters. More...
 
class  Internal_Sorter_Obj
 Comparison-object-based Internal_Sorter_Base subclass implementation; uses quick_sort_obj(). More...
 
class  merge_base
 Superclass for merge management objects. More...
 
class  merge_heap_obj
 
class  merge_heap_op
 A merge heap object base class - also serves as the full implementation for objects with a < comparison operator. More...
 
class  merge_heap_ptr_obj
 A record pointer heap that uses a comparison object. More...
 
class  merge_heap_ptr_op
 A record pointer heap base class - also serves as the full implementation for objects with a < comparison operator. More...
 
class  qsort_item
 A simple class that facilitates doing key sorting followed by in-memory permuting to sort items in-memory. More...
 
class  stack
 An implementation of an external-memory stack compatible with the old AMI interface. More...
 
class  stream
 A Stream<T> object stores an ordered collection of objects of type T on external memory. More...
 

Typedefs

typedef TPIE_OS_SIZE_T arity_t
 Intended to signal the number of input streams in a merge. More...
 

Enumerations

enum  err {
  NO_ERROR = 0, IO_ERROR, END_OF_STREAM, READ_ONLY,
  OS_ERROR, BASE_METHOD, BTE_ERROR, MM_ERROR,
  OBJECT_INITIALIZATION, OBJECT_INVALID, PERMISSION_DENIED, INSUFFICIENT_MAIN_MEMORY,
  INSUFFICIENT_AVAILABLE_STREAMS, ENV_UNDEFINED, NO_MAIN_MEMORY_OPERATION, BIT_MATRIX_BOUNDS,
  NOT_POWER_OF_2, NULL_POINTER, GENERIC_ERROR = 0xfff, SCAN_DONE = 0x1000,
  SCAN_CONTINUE, MERGE_DONE = 0x2000, MERGE_CONTINUE, MERGE_OUTPUT,
  MERGE_READ_MULTIPLE, MATRIX_BOUNDS = 0x3000, SORT_ALREADY_SORTED = 0x4000
}
 Legacy TPIE error codes. More...
 
enum  merge_output_type { MERGE_OUTPUT_OVERWRITE = 1, MERGE_OUTPUT_APPEND }
 
enum  stream_type { READ_STREAM = 1, WRITE_STREAM, APPEND_STREAM, READ_WRITE_STREAM }
 AMI stream types passed to constructors. More...
 
enum  stream_status { STREAM_STATUS_VALID = 0, STREAM_STATUS_INVALID }
 AMI stream status. More...
 

Functions

template<class T , class M >
err merge (stream< T > **instreams, arity_t arity, stream< T > *outstream, M *m_obj)
 Merges arity streams using a merge management object and writes result into outstream. More...
 
template<class T , class M >
err partition_and_merge (stream< T > *instream, stream< T > *outstream, M *m_obj)
 Partitions a stream into substreams small enough to fit in main memory, processes each substream in memory, and merges the results. More...
 
template<class T , class M >
err single_merge (stream< T > **instreams, arity_t arity, stream< T > *outstream, M *m_obj)
 Merges arity streams in memory using a merge management object and writes the result into outstream. More...
 
template<class T , class M >
err main_mem_merge (stream< T > *instream, stream< T > *outstream, M *m_obj)
 Reads instream into memory and merges it using m_obj->main_mem_operate(); if instream does not fit in main memory, returns INSUFFICIENT_MAIN_MEMORY. More...
 
template<class T , class M >
void merge_sorted_runs (typename tpie::array< tpie::auto_ptr< file_stream< T > > >::iterator start, typename tpie::array< tpie::auto_ptr< file_stream< T > > >::iterator end, file_stream< T > *outStream, M *MergeHeap, TPIE_OS_OFFSET cutoff=-1, progress_indicator_base *indicator=NULL)
 This is a common merge routine for all of the AMI_merge_sorted, AMI_ptr_merge_sorted and AMI_key_merge_sorted entry points. More...
 
template<class T , class CMPR >
void merge_sorted (typename tpie::array< std::auto_ptr< file_stream< T > > >::iterator start, typename tpie::array< std::auto_ptr< file_stream< T > > >::iterator end, file_stream< T > *outStream, CMPR *cmp)
 Merging with a heap that contains the records to be merged. More...
 
template<class T , class CMPR >
void ptr_merge_sorted (typename tpie::array< tpie::auto_ptr< file_stream< T > > >::iterator start, typename tpie::array< tpie::auto_ptr< file_stream< T > > >::iterator end, file_stream< T > *outStream, CMPR *cmp)
 Merging with a heap that keeps a pointer to the records rather than the records themselves: CMPR is the class of the comparison object, and must contain the method compare() which is called from within the merge. More...
 
 TPIE_DEPRECATED_CLASS_A (template< typename T > class TPIE_DEPRECATED_CLASS_B queue:public tpie::queue< T >{public:queue(){}queue(const std::string &basename):tpie::queue< T >(basename){}})
 
err exception_kind (const exception &e)
 
template<class T >
err sort (stream< T > *instream_ami, stream< T > *outstream_ami, tpie::progress_indicator_base *indicator=NULL)
 
template<class T , class CMPR >
err sort (stream< T > *instream_ami, stream< T > *outstream_ami, CMPR *cmp, progress_indicator_base *indicator=NULL)
 
template<class T >
err ptr_sort (stream< T > *instream_ami, stream< T > *outstream_ami, progress_indicator_base *indicator=NULL)
 
template<class T , class CMPR >
err ptr_sort (stream< T > *instream_ami, stream< T > *outstream_ami, CMPR *cmp, progress_indicator_base *indicator=NULL)
 
template<class T , class KEY , class CMPR >
err key_sort (stream< T > *instream_ami, stream< T > *outstream_ami, KEY, CMPR *cmp, progress_indicator_base *indicator=NULL)
 
template<class T >
err sort (stream< T > *instream_ami, progress_indicator_base *indicator=0)
 
template<class T , class CMPR >
err sort (stream< T > *instream_ami, CMPR *cmp, progress_indicator_base *indicator=NULL)
 
template<class T >
err ptr_sort (stream< T > *instream_ami, progress_indicator_base *indicator=NULL)
 
template<class T , class CMPR >
err ptr_sort (stream< T > *instream_ami, CMPR *cmp, progress_indicator_base *indicator=NULL)
 
template<class T , class KEY , class CMPR >
err key_sort (stream< T > *instream_ami, KEY, CMPR *cmp, progress_indicator_base *indicator=NULL)
 

Detailed Description

The namespace within TPIE for the Access Method Interface elements.

In-place sorting variant of key_sort(stream<T> *instream_ami, stream<T> *outstream_ami, KEY dummykey, CMPR *cmp, progress_indicator_base *indicator=NULL); see also In-place Variants for Sorting in TPIE.

In-place sorting variant of ptr_sort(stream<T> *instream_ami, stream<T> *outstream_ami, CMPR *cmp, progress_indicator_base *indicator=NULL); see also In-place Variants for Sorting in TPIE.

In-place sorting variant of ptr_sort(stream<T> *instream_ami, stream<T> *outstream_ami, progress_indicator_base *indicator=NULL); see also In-place Variants for Sorting in TPIE.

In-place sorting variant of sort(stream<T> *instream_ami, stream<T> *outstream_ami, CMPR *cmp, progress_indicator_base *indicator=NULL); see also In-place Variants for Sorting in TPIE.

In-place sorting variant of sort(stream<T> *instream_ami, stream<T> *outstream_ami, tpie::progress_indicator_base *indicator=NULL); see also In-place Variants for Sorting in TPIE.

A version of sort that takes an input stream of elements of type T, an output stream, and a user-specified comparison object.

A version of sort that takes an input stream of elements of type T and an output stream, and uses the < operator to sort; see also Sorting in TPIE.

Comparing within Sorting:
TPIE's sort() has two polymorphs, namely the comparison operator and comparison class polymorphs. The comparison operator version tends to be the fastest and most straightforward to use. The comparison class version is comparable in speed (maybe slightly slower), but somewhat more flexible, as it can support multiple, different sorts on the same keys.
Comparison operator version:
This version works on streams of objects for which the operator "<" is defined.
Comparison class version:
This version of sort() uses a method of a user-defined comparison object to determine the order of two input objects. The object must have a public member function named compare(), having the following prototype:

inline int compare (const KEY & k1, const KEY & k2);

The user-written compare() function computes the order of the two user-defined keys k1 and k2, and returns -1, 0, or +1 to indicate that k1<k2, k1==k2, or k1>k2 respectively.

The comparison object "cmp", of (user-defined) class represented by CMPR, must have a member function called "compare" which is used for sorting the input stream; see also Comparing within Sorting.

Typedef Documentation

typedef TPIE_OS_SIZE_T tpie::ami::arity_t

Intended to signal the number of input streams in a merge.

Definition at line 44 of file merge_sorted_runs.h.

Enumeration Type Documentation

Legacy TPIE error codes.

Functions in the AMI interface of TPIE typically return error codes of the enumerated type err.

Enumerator
NO_ERROR 

No error occurred.

The call to the entry point returned normally.

IO_ERROR 

A low level I/O error occurred.

END_OF_STREAM 

An attempt was made to read past the end of a stream or write past the end of a substream.

READ_ONLY 

An attempt was made to write to a read-only stream.

OS_ERROR 

An unexpected operating system error occurred.

Details should appear in the log file if logging is enabled.

See also
sec_logging
BASE_METHOD 

An attempt was made to call a member function of the virtual base class of tpie::ami::stream.

This indicates a bug in the implementation of TPIE streams.

BTE_ERROR 

An error occurred at the BTE level.

MM_ERROR 

An error occurred within the memory manager.

OBJECT_INITIALIZATION 

A TPIE entry point was not able to properly initialize the operation management object that was passed to it.

This generally indicates a bug in the operation management object's initialization code.

OBJECT_INVALID 

A passed object is invalid.

PERMISSION_DENIED 

A passed object is inaccessible due to insufficient permissions.

INSUFFICIENT_MAIN_MEMORY 

The memory manager could not make adequate main memory available to complete the requested operation.

Many operations adapt themselves to use whatever main memory is available, but in some cases, when memory is extremely tight, they may not be able to function.

INSUFFICIENT_AVAILABLE_STREAMS 

TPIE could not allocate enough intermediate streams to perform the requested operation.

Certain operating system restrictions limit the number of streams that can be created on certain platforms. Only in unusual circumstances, such as when the application itself has a very large number of open streams, will this error occur.

ENV_UNDEFINED 

An environment variable necessary to initialize the TPIE accessing environment was not defined.

BIT_MATRIX_BOUNDS 

A bit matrix larger than the number of bits in an offset into a stream was passed.

NOT_POWER_OF_2 

The length of a stream on which a bit permutation was to be performed is not a power of two.

NULL_POINTER 

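A null pointer was encountered where a valid pointer was required. (The previous description, about matrices whose bounds did not match, belongs to a matrix error code and does not apply to NULL_POINTER.)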

SCAN_DONE 

Value returned by a scan_object: Indicates that the scan is complete and no more input needs to be processed.

SCAN_CONTINUE 

Value returned by a scan_object: Indicates that the function should be called again with any "taken" inputs replaced by the next objects from their respective streams.

MERGE_DONE 

Value returned by a merge_management_object, signaling that the merge() completed.

MERGE_CONTINUE 

Value returned by a merge_management_object, telling merge() to continue to call the operate() member function of the management object with more data.

MERGE_OUTPUT 

Value returned by a merge_management_object, signaling that the last merge() call generated output for the output stream.

MERGE_READ_MULTIPLE 

Value returned by a merge_management_object, telling merge() that more than one input object was consumed and thus the input flags should be consulted.

MATRIX_BOUNDS 

Matrix related error.

SORT_ALREADY_SORTED 

Value returned by sort routines if the input does not require sorting.

Definition at line 45 of file err.h.

45  {
47  NO_ERROR = 0,
49  IO_ERROR,
54  READ_ONLY,
57  OS_ERROR,
63  BTE_ERROR,
65  MM_ERROR,
90  NO_MAIN_MEMORY_OPERATION,
100 
101  GENERIC_ERROR = 0xfff,
102 
106  SCAN_DONE = 0x1000,
110 
112  MERGE_DONE = 0x2000,
119  MERGE_OUTPUT,
124 
126  MATRIX_BOUNDS = 0x3000,
127 
129  SORT_ALREADY_SORTED = 0x4000
130 
131  };
A TPIE entry point was not able to properly initialize the operation management object that was pass...
Definition: err.h:69
The length of a stream on which a bit permutation was to be performed is not a power of two...
Definition: err.h:96
Value returned by a merge_management_object, telling merge() to continue to call the operate() member...
Definition: err.h:116
Value returned by a scan_object: Indicates that the function should be called again with any "taken" ...
Definition: err.h:109
No error occurred.
Definition: err.h:47
Value returned by a scan_object: Indicates that the scan is complete and no more input needs to be pr...
Definition: err.h:106
A passed object is invalid.
Definition: err.h:71
Matrix related error.
Definition: err.h:126
A low level I/O error occurred.
Definition: err.h:49
An attempt was made to call a member function of the virtual base class of tpie::ami::stream.
Definition: err.h:61
A passed object is inaccessible due to insufficient permissions.
Definition: err.h:73
Value returned by a merge_management_object, signaling that the merge() completed.
Definition: err.h:112
An environment variable necessary to initialize the TPIE accessing environment was not defined...
Definition: err.h:89
Values returned by sort routines if input does not require sorting.
Definition: err.h:129
An error occurred within the memory manager.
Definition: err.h:65
An unexpected operating system error occurred.
Definition: err.h:57
TPIE could not allocate enough intermediate streams to perform the requested operation.
Definition: err.h:86
A bit matrix larger than the number of bits in an offset into a stream was passed.
Definition: err.h:93
An error occurred at the BTE level.
Definition: err.h:63
Value returned by a merge_management_object, telling merge() that more than one input object was con...
Definition: err.h:123
An attempt was made to write to a read-only stream.
Definition: err.h:54
The memory manager could not make adequate main memory available to complete the requested operation...
Definition: err.h:79
An attempt was made to perform a matrix operation on matrices whose bounds did not match appropriatel...
Definition: err.h:99
An attempt was made to read past the end of a stream or write past the end of a substream.
Definition: err.h:52
Value returned by a merge_management_object, signaling that the last merge() call generated output fo...
Definition: err.h:119

Definition at line 45 of file merge.h.

45  {
46  MERGE_OUTPUT_OVERWRITE = 1,
47  MERGE_OUTPUT_APPEND
48  };

AMI stream status.

Enumerator
STREAM_STATUS_VALID 

Stream is valid.

STREAM_STATUS_INVALID 

Stream is invalid.

Definition at line 60 of file stream.h.

60  {
65  };
Stream is valid.
Definition: stream.h:62
Stream is invalid.
Definition: stream.h:64

AMI stream types passed to constructors.

Definition at line 52 of file stream.h.

52  {
53  READ_STREAM = 1, // Open existing stream for reading
54  WRITE_STREAM, // Open for writing. Create if non-existent
55  APPEND_STREAM, // Open for writing at end. Create if needed.
56  READ_WRITE_STREAM // Open to read and write.
57  };

Function Documentation

template<class T , class M >
err tpie::ami::main_mem_merge ( stream< T > *  instream,
stream< T > *  outstream,
M *  m_obj 
)

Reads instream into memory and merges it using m_obj->main_mem_operate(); if instream does not fit in main memory, returns INSUFFICIENT_MAIN_MEMORY.

Definition at line 442 of file merge.h.

References tpie::consecutive_memory_available(), INSUFFICIENT_MAIN_MEMORY, NO_ERROR, tpie::ami::stream< T >::read_array(), tpie::ami::stream< T >::seek(), tpie::ami::stream< T >::stream_len(), tp_assert, tpie::tpie_delete_array(), and tpie::ami::stream< T >::write_array().

Referenced by partition_and_merge().

443  {
444 
445  err ae;
446  TPIE_OS_OFFSET len;
447  TPIE_OS_SIZE_T sz_avail;
448 
449  // How much memory is available?
450  sz_avail = consecutive_memory_available ();
451 
452  len = instream->stream_len();
453  if ((len * static_cast<TPIE_OS_OFFSET>(sizeof(T))) <= static_cast<TPIE_OS_OFFSET>(sz_avail)) {
454 
455  // If the whole input can fit in main memory just call
456  // m_obj->main_mem_operate
457 
458  ae = instream->seek(0);
459  assert(ae == NO_ERROR);
460 
461  // This code is sloppy and has to be rewritten correctly for
462  // parallel buffer allocation. It will not work with anything
463  // other than a registration based memory manager.
464  T *mm_stream;
465  TPIE_OS_OFFSET len1;
466  //allocate and read input stream in memory we know it fits, so we may cast.
467  mm_stream = tpie_new_array<T>(static_cast<TPIE_OS_SIZE_T>(len));
468 
469  len1 = len;
470  if ((ae = instream->read_array(mm_stream, &len1)) !=
471  NO_ERROR) {
472  return ae;
473  }
474  tp_assert(len1 == len, "Did not read the right amount; "
475  "Allocated space for " << len << ", read " << len1 << '.');
476 
477  //just call m_obj->main_mem_operate. We know that len items fit into
478  //main memory, so we may cast to TPIE_OS_SIZE_T
479  if ((ae = m_obj->main_mem_operate(mm_stream,
480  static_cast<TPIE_OS_SIZE_T>(len))) !=
481  NO_ERROR) {
482  TP_LOG_WARNING_ID("main_mem_operate failed");
483  return ae;
484  }
485 
486  //write array back to stream
487  if ((ae = outstream->write_array(mm_stream,
488  static_cast<TPIE_OS_SIZE_T>(len))) !=
489  NO_ERROR) {
490  TP_LOG_WARNING_ID("write array failed");
491  return ae;
492  }
493 
494  tpie_delete_array(mm_stream, static_cast<TPIE_OS_SIZE_T>(len));
495  return NO_ERROR;
496 
497  } else {
498 
499  // Something went wrong. We should not have called this
500  // function, since we don't have enough main memory.
501  TP_LOG_WARNING_ID("out of memory");
503  }
504  };
size_t consecutive_memory_available(size_t granularity=5 *1024 *1024)
Find the largest amount of memory that can be allocated as a single chunk.
No error occurred.
Definition: err.h:47
err
Legacy TPIE error codes.
Definition: err.h:45
#define tp_assert(condition, message)
Definition: tpie_assert.h:47
The memory manager could not make adequate main memory available to complete the requested operation...
Definition: err.h:79
void tpie_delete_array(T *a, size_t size)
Delete an array allocated with tpie_new_array.
Definition: memory.h:398
template<class T , class M >
err tpie::ami::merge ( stream< T > **  instreams,
arity_t  arity,
stream< T > *  outstream,
M *  m_obj 
)

Merges arity streams using a merge management object and writes result into outstream.

It is assumed that the available memory can fit the arity input streams, the output stream, and the space required by the merge management object; merge() checks this and then calls single_merge().

Definition at line 253 of file merge.h.

References tpie::consecutive_memory_available(), INSUFFICIENT_MAIN_MEMORY, tpie::ami::stream< T >::main_memory_usage(), single_merge(), tpie::STREAM_USAGE_CURRENT, and tpie::STREAM_USAGE_MAXIMUM.

254  {
255 
256  TPIE_OS_SIZE_T sz_avail;
257  TPIE_OS_OFFSET sz_stream, sz_needed = 0;
258 
259  // How much main memory is available?
260  sz_avail = consecutive_memory_available ();
261 
262  // Iterate through the streams, finding out how much additional
263  // memory each stream will need in the worst case (the streams are
264  // in memory, but their memory usage could be smaller then the
265  // maximum one; one scenario is when the streams have been loaded
266  // from disk with no subsequent read_item/write_item operation, in
267  // which case their current memory usage is just the header block);
268  // count also the output stream
269  for (unsigned int ii = 0; ii < arity + 1; ii++) {
270  instreams[ii]->main_memory_usage(&sz_stream, STREAM_USAGE_MAXIMUM);
271  sz_needed += sz_stream;
272  instreams[ii]->main_memory_usage(&sz_stream, STREAM_USAGE_CURRENT);
273  sz_needed -= sz_stream;
274  }
275 
276  //count the space used by the merge_management object (include
277  //overhead added to a stream)
278  sz_needed += m_obj->space_usage_overhead() +
279  arity * m_obj->space_usage_per_stream();
280 
281  //streams and m_obj must fit in memory!
282  if (sz_needed >= static_cast<TPIE_OS_OFFSET>(sz_avail)) {
283  TP_LOG_WARNING("Insufficient main memory to perform a merge.\n");
285  }
286  assert(sz_needed < sz_avail);
287 
288  //merge streams in memory
289  return single_merge(instreams, arity, outstream, m_obj);
290  };
size_t consecutive_memory_available(size_t granularity=5 *1024 *1024)
Find the largest amount of memory that can be allocated as a single chunk.
Amount currently in use.
Definition: stream_usage.h:34
Max amount that will ever be used.
Definition: stream_usage.h:36
err single_merge(stream< T > **instreams, arity_t arity, stream< T > *outstream, M *m_obj)
Merges arity streams in memory using a merge management object and write result into outstream...
Definition: merge.h:298
The memory manager could not make adequate main memory available to complete the requested operation...
Definition: err.h:79
template<class T , class CMPR >
void tpie::ami::merge_sorted ( typename tpie::array< std::auto_ptr< file_stream< T > > >::iterator  start,
typename tpie::array< std::auto_ptr< file_stream< T > > >::iterator  end,
file_stream< T > *  outStream,
CMPR *  cmp 
)

Merging with a heap that contains the records to be merged.

CMPR is the class of the comparison object, and must contain the method compare() which is called from within the merge.

This is one of the merge entry points for merging without the merge_management_object used by TPIE's merge. These routines perform the special case of merging when the required output is the original records interleaved according to a comparison operator or function.

Definition at line 149 of file merge_sorted_runs.h.

References tpie::ami::merge_heap_op< REC, Compare >::allocate(), and merge_sorted_runs().

151  {
152 
153  // make a merge heap which uses the user's comparison object
154  // and initialize it
155  merge_heap_obj<T,CMPR> mrgheap (cmp);
156  mrgheap.allocate(end-start);
157 
158  //Rewind all the input streams
159  for (typename tpie::array<std::auto_ptr<file_stream<T> > >::iterator i=start;
160  i != end; ++i)
161  i->seek(0);
162 
163  merge_sorted_runs(start, end, outStream, mrgheap);
164  }
A generic array with a fixed size.
Definition: array.h:143
void merge_sorted_runs(typename tpie::array< tpie::auto_ptr< file_stream< T > > >::iterator start, typename tpie::array< tpie::auto_ptr< file_stream< T > > >::iterator end, file_stream< T > *outStream, M *MergeHeap, TPIE_OS_OFFSET cutoff=-1, progress_indicator_base *indicator=NULL)
This is a common merge routine for all of the AMI_merge_sorted, AMI_ptr_merge_sorted and AMI_key_merg...
template<class T , class M >
void tpie::ami::merge_sorted_runs ( typename tpie::array< tpie::auto_ptr< file_stream< T > > >::iterator  start,
typename tpie::array< tpie::auto_ptr< file_stream< T > > >::iterator  end,
file_stream< T > *  outStream,
M *  MergeHeap,
TPIE_OS_OFFSET  cutoff = -1,
progress_indicator_base *  indicator = NULL 
)

This is a common merge routine for all of the AMI_merge_sorted, AMI_ptr_merge_sorted and AMI_key_merge_sorted entry points.

It is also used by the sort entry points AMI_sort, AMI_ptr_sort and AMI_key_sort and by the routine AMI_partition_and_merge. Differences are encapsulated within the merge heap object MergeHeap. It is assumed that MergeHeap->allocate() was called before entering merge_sorted_runs. merge_sorted_runs also takes the maximum number of elements to read from any stream (cutoff; -1 means no limit) and an optional progress indicator (indicator; may be NULL, in which case no progress is reported).

Sorted Stream Merging:
TPIE provides several merge entry points for merging sorted streams to produce a single, interleaved output stream. merge_sorted_runs has three polymorphs, namely the comparison operator, comparison class and the key-based polymorphs. The comparison operator version tends to be the fastest and most straightforward to use. The comparison class version is comparable in speed (maybe slightly slower), but somewhat more flexible, as it can support multiple, different merges on the same keys.

Definition at line 71 of file merge_sorted_runs.h.

References tpie::stream_crtp< child_t >::can_read(), tpie::file_stream< T >::read(), and tpie::file_stream< T >::write().

Referenced by merge_sorted(), and ptr_merge_sorted().

75  {
76 
77  size_t i;
78  size_t arity = end-start;
79 
80  //Pointers to current leading elements of streams
81  tpie::array<const T *> in_objects(arity);
82  tpie::array<TPIE_OS_OFFSET> nread(arity);
83 
84  // **************************************************************
85  // * Read first element from stream. Do not rewind! We may read *
86  // * more elements from the same stream later. *
87  // **************************************************************
88 
89  for (i = 0; i < arity; i++) {
90  file_stream<T> * stream = (start+i)->get();
91  if (stream->can_read()) {
92  in_objects[i] = &stream->read();
93  MergeHeap->insert( in_objects[i], i );
94  } else
95  in_objects[i] = NULL;
96  nread[i] = 1;
97  if (indicator) indicator->step();
98  }
99 
100  // *********************************************************
101  // * Build a heap from the smallest items of each stream *
102  // *********************************************************
103 
104  MergeHeap->initialize ( );
105 
106  // *********************************************************
107  // * Perform the merge until the inputs are exhausted. *
108  // *********************************************************
109  while (MergeHeap->sizeofheap() > 0) {
110  i = MergeHeap->get_min_run_id ();
111  outStream->write(*in_objects[i]);
112 
113  bool eof = false;
114 
115  //Check if we read as many elements as we are allowed to
116  if ( (cutoff != -1) && (nread[i]>=cutoff))
117  eof = true;
118  else {
119  file_stream<T> * stream = (start+i)->get();
120  if (stream->can_read()) in_objects[i] = &stream->read();
121  else eof = true;
122 
123  if (indicator) indicator->step();
124  }
125 
126  if (eof)
127  MergeHeap->delete_min_and_insert(NULL);
128  else {
129  nread[i]++;
130  MergeHeap->delete_min_and_insert (in_objects[i]);
131  }
132  } // while
133  }
A generic array with a fixed size.
Definition: array.h:143
template<class T , class M >
err tpie::ami::partition_and_merge ( stream< T > *  instream,
stream< T > *  outstream,
M *  m_obj 
)

Partitions a stream into substreams small enough to fit in main memory, operates on each in main memory, and then merges them together, possibly in several passes if low memory conditions dictate. This function takes three arguments: instream points to the input stream, outstream points to the output stream, and m_obj points to a merge management object that controls the merge. This function takes care of all the details of determining how much main memory is available, how big the initial substreams can be, how many streams can be merged at a time, and how many levels of merging must take place.

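In order to complete the merge successfully, the function needs sufficient memory for a binary merge. If not enough memory is available, the function fails and returns INSUFFICIENT_MAIN_MEMORY. If the stream layer cannot provide enough streams, it returns INSUFFICIENT_AVAILABLE_STREAMS. An error reported by an underlying stream operation is returned unchanged. Otherwise, it returns NO_ERROR.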

Merge Management Objects for partition_and_merge:
The partition_and_merge() entry point requires a merge management object similar to the one described here. The following three additional member functions must also be provided.
main_mem_operate():
err main_mem_operate(T* mm_stream, size_t len); where mm_stream is a pointer to an array of objects that have been read into main memory, and len is the number of objects in the array. This function is called by partition_and_merge() when a substream of the data is small enough to fit into main memory, so that the (application-specific) processing of this subset of the data can be completed in internal memory.
space_usage_per_stream():
size_t space_usage_per_stream(void); This function should return the amount of main memory that the merge management object will need per input stream. Merge management objects are allowed to maintain data structures whose size is linear in the number of input streams being processed.
space_usage_overhead():
size_t space_usage_overhead(void); This function should return an upper bound on the number of bytes of main memory the merge management object will allocate in addition to the portion that is linear in the number of streams.

Definition at line 508 of file merge.h.

References tpie::ami::stream< T >::available_streams(), tpie::ami::stream< T >::chunk_size(), tpie::consecutive_memory_available(), INSUFFICIENT_AVAILABLE_STREAMS, INSUFFICIENT_MAIN_MEMORY, main_mem_merge(), tpie::ami::stream< T >::main_memory_usage(), NO_ERROR, tpie::ami::stream< T >::persist(), tpie::ami::stream< T >::read_array(), tpie::ami::stream< T >::seek(), single_merge(), tpie::ami::stream< T >::stream_len(), tpie::STREAM_USAGE_MAXIMUM, tp_assert, tpie::tpie_delete(), tpie::tpie_delete_array(), and tpie::ami::stream< T >::write_array().

509  {
510 
511  err ae;
512  TPIE_OS_OFFSET len;
513  TPIE_OS_SIZE_T sz_avail, sz_stream;
514  unsigned int ii;
515  int jj;
516 
517  //How much memory is available?
518  sz_avail = consecutive_memory_available ();
519 
520  // If the whole input can fit in main memory then just call
521  // main_mem_merge() to deal with it by loading it once and
522  // processing it.
523  len = instream->stream_len();
524  if ((len * static_cast<TPIE_OS_OFFSET>(sizeof(T))) <= static_cast<TPIE_OS_OFFSET>(sz_avail)) {
525  return main_mem_merge(instream, outstream, m_obj);
526  }
527  //else {
528 
529 
530 
531  // The number of substreams that can be merged together at once; i
532  // this many substreams (at most) we are dividing the input stream
533  arity_t merge_arity;
534 
535  //nb of substreams the original input stream will be split into
536  arity_t nb_orig_substr;
537 
538  // length (nb obj of type T) of the original substreams of the input
539  // stream. The last one may be shorter than this.
540  TPIE_OS_OFFSET sz_orig_substr;
541 
542  // The initial temporary stream, to which substreams of the
543  // original input stream are written.
544  stream<T> *initial_tmp_stream;
545 
546  // A pointer to the buffer in main memory to read a memory load into.
547  T *mm_stream;
548 
549 
550  // Loop variables:
551 
552  // The stream being read at the current level.
553  stream<T> *current_input;
554 
555  // The output stream for the current level if it is not outstream.
556  stream<T> *intermediate_tmp_stream;
557 
558  // The size of substreams of *current_input that are being
559  // merged. The last one may be smaller. This value should be
560  // sz_orig_substr * (merge_arity ** k) where k is the
561  // number of iterations the loop has gone through.
562  TPIE_OS_OFFSET current_substream_len;
563 
564  // The exponenent used to verify that current_substream_len is
565  // correct.
566  unsigned int k;
567 
568  TPIE_OS_OFFSET sub_start, sub_end;
569 
570 
571 
572  // How many substreams will there be? The main memory
573  // available to us is the total amount available, minus what
574  // is needed for the input stream and the temporary stream.
575  if ((ae = instream->main_memory_usage(&sz_stream, STREAM_USAGE_MAXIMUM))
576  != NO_ERROR) {
577  return ae;
578  }
579  if (sz_avail <= 2 * sz_stream + sizeof(T)) {
581  }
582  sz_avail -= 2 * sz_stream;
583 
584 
585  // number of elements that will fit in memory (M) -R
586  sz_orig_substr = sz_avail / sizeof(T);
587 
588  // Round the original substream length off to an integral number of
589  // chunks. This is for systems like HP-UX that cannot map in
590  // overlapping regions. It is also required for BTE's that are
591  // capable of freeing chunks as they are read.
592  {
593  TPIE_OS_OFFSET sz_chunk_size = instream->chunk_size();
594 
595  sz_orig_substr = sz_chunk_size *
596  ((sz_orig_substr + sz_chunk_size - 1) /sz_chunk_size);
597  // WARNING sz_orig_substr now may not fit in memory!!! -R
598  }
599 
600  // number of memoryloads in input ceil(N/M) -R
601  nb_orig_substr = static_cast<arity_t>((len + sz_orig_substr - 1) /
602  sz_orig_substr);
603 
604  // Account for the space that a merge object will use.
605  {
606  TPIE_OS_SIZE_T sz_avail_during_merge = sz_avail - m_obj->space_usage_overhead();
607  TPIE_OS_SIZE_T sz_stream_during_merge = sz_stream +m_obj->space_usage_per_stream();
608 
609  merge_arity = static_cast<arity_t>((sz_avail_during_merge +
610  sz_stream_during_merge - 1) /
611  sz_stream_during_merge);
612  }
613 
614  // Make sure that the AMI is willing to provide us with the number
615  // of substreams we want. It may not be able to due to operating
616  // system restrictions, such as on the number of regions that can be
617  // mmap()ed in.
618  {
619  int ami_available_streams = instream->available_streams();
620 
621  if (ami_available_streams != -1) {
622  if (ami_available_streams <= 4) {
624  }
625  // safe to cast, since ami_available_streams > 4
626  if (merge_arity > static_cast<arity_t>(ami_available_streams) - 2) {
627  merge_arity = ami_available_streams - 2;
628  TP_LOG_DEBUG_ID("Reduced merge arity due to AMI restrictions.");
629  }
630  }
631  }
632  TP_LOG_DEBUG_ID("partition_and_merge(): merge arity = "
633  << static_cast<TPIE_OS_OUTPUT_SIZE_T>(merge_arity));
634  if (merge_arity < 2) {
636  }
637 
638 
639  //#define MINIMIZE_INITIAL_SUBSTREAM_LENGTH
640 #ifdef MINIMIZE_INITIAL_SUBSTREAM_LENGTH
641 
642  // Make the substreams as small as possible without increasing the
643  // height of the merge tree.
644  {
645  // The tree height is the ceiling of the log base merge_arity
646  // of the number of original substreams.
647 
648  double tree_height = log((double)nb_orig_substr)/ log((double)merge_arity);
649  tp_assert(tree_height > 0, "Negative or zero tree height!");
650 
651  tree_height = ceil(tree_height);
652 
653  // See how many substreams we could possibly fit in the tree
654  // without increasing the height.
655  double max_original_substreams = pow((double)merge_arity, tree_height);
656  tp_assert(max_original_substreams >= nb_orig_substr,
657  "Number of permitted substreams was reduced.");
658 
659  // How big will such substreams be?
660  double new_sz_original_substream = ceil((double)len /
661  max_original_substreams);
662  tp_assert(new_sz_original_substream <= sz_orig_substr,
663  "Size of original streams increased.");
664 
665  sz_orig_substr = (size_t)new_sz_original_substream;
666  TP_LOG_DEBUG_ID("Memory constraints set original substreams = " << nb_orig_substr);
667 
668  nb_orig_substr = (len + sz_orig_substr - 1) / sz_orig_substr;
669  TP_LOG_DEBUG_ID("Tree height constraints set original substreams = " << nb_orig_substr);
670  }
671 #endif // MINIMIZE_INITIAL_SUBSTREAM_LENGTH
672 
673 
674  // Create a temporary stream, then iterate through the substreams,
675  // processing each one and writing it to the corresponding substream
676  // of the temporary stream.
677  initial_tmp_stream = tpie_new<stream<T> >();
678  mm_stream = tpie_new_array<T>(static_cast<TPIE_OS_SIZE_T>(sz_orig_substr));
679 
680  instream->seek(0);
681  assert(ae == NO_ERROR);
682 
683  tp_assert(static_cast<TPIE_OS_OFFSET>(nb_orig_substr * sz_orig_substr - len) < sz_orig_substr,
684  "Total substream length too long or too many.");
685  tp_assert(len - static_cast<TPIE_OS_OFFSET>(nb_orig_substr - 1) * sz_orig_substr <= sz_orig_substr,
686  "Total substream length too short or too few.");
687 
688  for (ii = 0; ii++ < nb_orig_substr; ) {
689 
690  TPIE_OS_OFFSET mm_len;
691  if (ii == nb_orig_substr) {
692  mm_len = len % sz_orig_substr;
693  // If it is an exact multiple, then the mod will come out 0,
694  // which is wrong.
695  if (!mm_len) {
696  mm_len = sz_orig_substr;
697  }
698  } else {
699  mm_len = sz_orig_substr;
700  }
701 #ifndef TPIE_NDEBUG
702  TPIE_OS_OFFSET mm_len_bak = mm_len;
703 #endif
704 
705  // Read a memory load out of the input stream.
706  ae = instream->read_array(mm_stream, &mm_len);
707  if (ae != NO_ERROR) {
708  return ae;
709  }
710  tp_assert(mm_len == mm_len_bak,
711  "Did not read the requested number of objects." <<
712  "\n\tmm_len = " << mm_len <<
713  "\n\tmm_len_bak = " << mm_len_bak << '.');
714 
715  // Solve in main memory. We know it fits, so cast to TPIE_OS_SIZE_T
716  m_obj->main_mem_operate(mm_stream, static_cast<TPIE_OS_SIZE_T>(mm_len));
717 
718  // Write the result out to the temporary stream.
719  ae = initial_tmp_stream->write_array(mm_stream, static_cast<TPIE_OS_SIZE_T>(mm_len));
720  if (ae != NO_ERROR) {
721  return ae;
722  }
723  } //for
724  tpie_delete_array(mm_stream, static_cast<TPIE_OS_SIZE_T>(sz_orig_substr));
725 
726  // Make sure the total length of the temporary stream is the same as
727  // the total length of the original input stream.
728  tp_assert(instream->stream_len() == initial_tmp_stream->stream_len(),
729  "Stream lengths do not match:" <<
730  "\n\tinstream->stream_len() = " << instream->stream_len() <<
731  "\n\tinitial_tmp_stream->stream_len() = " <<
732  initial_tmp_stream->stream_len() << ".\n");
733 
734  // Set up the loop invariants for the first iteration of hte main
735  // loop.
736  current_input = initial_tmp_stream;
737  current_substream_len = sz_orig_substr;
738 
739  // Pointers to the substreams that will be merged.
740  stream<T>* *the_substreams = tpie_new_array<stream<T>* >(merge_arity);
741 
742  //Monitoring prints.
743  TP_LOG_DEBUG_ID("Number of runs from run formation is "
744  << static_cast<TPIE_OS_OUTPUT_SIZE_T>(nb_orig_substr));
745  TP_LOG_DEBUG_ID("Merge arity is "
746  << static_cast<TPIE_OS_OUTPUT_SIZE_T>(merge_arity));
747 
748 
749  k = 0;
750  // The main loop. At the outermost level we are looping over levels
751  // of the merge tree. Typically this will be very small, e.g. 1-3.
752  // CAVEAT: is the cast o.k.??
753  for( ; current_substream_len < len;
754  current_substream_len *= merge_arity) {
755 
756  // The number of substreams to be processed at this level.
757  arity_t substream_count;
758 
759  // Set up to process a given level.
760  tp_assert(len == current_input->stream_len(),
761  "Current level stream not same length as input." <<
762  "\n\tlen = " << len <<
763  "\n\tcurrent_input->stream_len() = " <<
764  current_input->stream_len() << ".\n");
765 
766  // Do we have enough main memory to merge all the substreams on
767  // the current level into the output stream? If so, then we will
768  // do so, if not then we need an additional level of iteration to
769  // process the substreams in groups.
770  substream_count = static_cast<arity_t>((len + current_substream_len - 1) /
771  current_substream_len);
772 
773  if (substream_count <= merge_arity) {
774 
775  TP_LOG_DEBUG_ID("Merging substreams directly to the output stream.");
776 
777  // Create all the substreams
778  for (sub_start = 0, ii = 0 ;
779  ii < substream_count;
780  sub_start += current_substream_len, ii++) {
781 
782  sub_end = sub_start + current_substream_len - 1;
783  if (sub_end >= len) {
784  sub_end = len - 1;
785  }
786  current_input->new_substream(READ_STREAM, sub_start, sub_end, the_substreams + ii);
787  // The substreams are read-once.
788  the_substreams[ii]->persist(PERSIST_READ_ONCE);
789  }
790 
791  tp_assert((sub_start >= len) &&
792  (sub_start < len + current_substream_len),
793  "Loop ended in wrong location.");
794 
795  // Fool the OS into unmapping the current block of the input
796  // stream so that blocks of the substreams can be mapped in
797  // without overlapping it. This is needed for correct execution
798  // on HP-UX.
799  //this needs to be cleaned up..Laura
800  current_input->seek(0);
801  assert(ae == NO_ERROR);
802 
803  // Merge them into the output stream.
804  ae = single_merge(the_substreams, substream_count, outstream, m_obj);
805  if (ae != NO_ERROR) {
806  return ae;
807  }
808  // Delete the substreams.
809  for (ii = 0; ii < substream_count; ii++) {
810  tpie_delete(the_substreams[ii]);
811  }
812  // And the current input, which is an intermediate stream of
813  // some kind.
814  tpie_delete(current_input);
815  } else {
816 
817  //substream_count is > merge_arity
818  TP_LOG_DEBUG_ID("Merging substreams to an intermediate stream.");
819 
820  // Create the next intermediate stream.
821  intermediate_tmp_stream = new stream<T>;
822 
823  // Fool the OS into unmapping the current block of the input
824  // stream so that blocks of the substreams can be mapped in
825  // without overlapping it. This is needed for correct execution
826  // on HU-UX.
827  //this needs to be cleaned up..Laura
828  current_input->seek(0);
829  assert(ae == NO_ERROR);
830 
831  // Loop through the substreams of the current stream, merging as
832  // many as we can at a time until all are done with.
833  for (sub_start = 0, ii = 0, jj = 0;
834  ii < substream_count;
835  sub_start += current_substream_len, ii++, jj++) {
836 
837  sub_end = sub_start + current_substream_len - 1;
838  if (sub_end >= len) {
839  sub_end = len - 1;
840  }
841  current_input->new_substream(READ_STREAM, sub_start, sub_end, the_substreams + jj);
842  // The substreams are read-once.
843  the_substreams[jj]->persist(PERSIST_READ_ONCE);
844 
845  // If we've got all we can handle or we've seen them all, then
846  // merge them.
847  if ((jj >= static_cast<int>(merge_arity) - 1) ||
848  (ii == substream_count - 1)) {
849 
850  tp_assert(jj <= static_cast<int>(merge_arity) - 1,
851  "Index got too large.");
852 #if DEBUG_ASSERTIONS
853  // Check the lengths before the merge.
854  TPIE_OS_OFFSET sz_output, sz_output_after_merge;
855  TPIE_OS_OFFSET sz_substream_total;
856 
857  {
858  unsigned int kk;
859 
860  sz_output = intermediate_tmp_stream->stream_len();
861  sz_substream_total = 0;
862 
863  for (kk = jj+1; kk--; ) {
864  sz_substream_total += the_substreams[kk]->stream_len();
865  }
866 
867  }
868 #endif
869 
870  // This should append to the stream, since
871  // single_merge() does not rewind the output before
872  // merging.
873  ae = single_merge(the_substreams, jj+1,
874  intermediate_tmp_stream, m_obj);
875  if (ae != NO_ERROR) {
876  return ae;
877  }
878 
879 #if DEBUG_ASSERTIONS
880  // Verify the total lengths after the merge.
881  sz_output_after_merge = intermediate_tmp_stream->stream_len();
882  tp_assert(sz_output_after_merge - sz_output ==
883  sz_substream_total,
884  "Stream lengths do not add up: " <<
885  sz_output_after_merge - sz_output <<
886  " written when " <<
887  sz_substream_total <<
888  " were to have been read.");
889 
890 #endif
891 
892  // Delete the substreams. jj is currently the index of the
893  // largest, so we want to bump it up before the idiomatic
894  // loop.
895  for (jj++; jj--; )
896  tpie_delete(the_substreams[jj]);
897 
898  // Now jj should be -1 so that it gets bumped back up to 0
899  // before the next iteration of the outer loop.
900  tp_assert((jj == -1), "Index not reduced to -1.");
901 
902  } // if
903  } //for
904 
905  // Get rid of the current input stream and use the next one.
906  tpie_delete(current_input);
907  current_input = intermediate_tmp_stream;
908  }
909 
910  k++;
911 
912  }
913 
914  //Monitoring prints.
915  TP_LOG_DEBUG_ID("Number of passes incl run formation is " << k+1);
916 
917  tpie_delete_array(the_substreams, merge_arity);
918  return NO_ERROR;
919 
920  }
size_t consecutive_memory_available(size_t granularity=5 *1024 *1024)
Find the largest amount of memory that can be allocated as a single chunk.
Max amount that will ever be used.
Definition: stream_usage.h:36
No error occurred.
Definition: err.h:47
err single_merge(stream< T > **instreams, arity_t arity, stream< T > *outstream, M *m_obj)
Merges arity streams in memory using a merge management object and writes the result into outstream...
Definition: merge.h:298
TPIE_OS_SIZE_T arity_t
Intended to signal the number of input streams in a merge.
Definition: merge.h:59
err main_mem_merge(stream< T > *instream, stream< T > *outstream, M *m_obj)
Reads instream in memory and merges it using m_obj->main_mem_operate(); if instream does not fit in m...
Definition: merge.h:442
TPIE could not allocate enough intermediate streams to perform the requested operation.
Definition: err.h:86
err
Legacy TPIE error codes.
Definition: err.h:45
void tpie_delete(T *p)
Delete an object allocated with tpie_new.
Definition: memory.h:380
#define tp_assert(condition, message)
Definition: tpie_assert.h:47
The memory manager could not make adequate main memory available to complete the requested operation...
Definition: err.h:79
void tpie_delete_array(T *a, size_t size)
Delete an array allocated with tpie_new_array.
Definition: memory.h:398
template<class T , class CMPR >
void tpie::ami::ptr_merge_sorted ( typename tpie::array< tpie::auto_ptr< file_stream< T > > >::iterator  start,
typename tpie::array< tpie::auto_ptr< file_stream< T > > >::iterator  end,
file_stream< T > *  outStream,
CMPR *  cmp 
)

Merging with a heap that keeps a pointer to the records rather than the records themselves: CMPR is the class of the comparison object, and must contain the method compare() which is called from within the merge.

This is one of the merge entry points for merging without the merge_management_object used by TPIE's merge. These routines perform the special case of merging when the required output is the original records interleaved according to a comparison operator or function.

Definition at line 181 of file merge_sorted_runs.h.

References tpie::ami::merge_heap_ptr_op< REC, CMPR >::allocate(), and merge_sorted_runs().

184  {
185 
186  // make a merge heap of pointers which uses the user's comparison
187  // object and initialize it
188  merge_heap_ptr_obj<T,CMPR> mrgheap (cmp);
189  mrgheap.allocate(end-start);
190 
191  // Rewind all the input streams
192  for (typename tpie::array<std::auto_ptr<file_stream<T> > >::iteratorarity_t i=start;
193  i != end; ++i)
194  i->seek(0);
195 
196  merge_sorted_runs(start, end, outStream, mrgheap);
197  }
A generic array with a fixed size.
Definition: array.h:143
void merge_sorted_runs(typename tpie::array< tpie::auto_ptr< file_stream< T > > >::iterator start, typename tpie::array< tpie::auto_ptr< file_stream< T > > >::iterator end, file_stream< T > *outStream, M *MergeHeap, TPIE_OS_OFFSET cutoff=-1, progress_indicator_base *indicator=NULL)
This is a common merge routine for all of the AMI_merge_sorted, AMI_ptr_merge_sorted and AMI_key_merg...
template<class T , class M >
err tpie::ami::single_merge ( stream< T > **  instreams,
arity_t  arity,
stream< T > *  outstream,
M *  m_obj 
)

Merges arity streams in memory using a merge management object and writes the result into outstream.

Definition at line 298 of file merge.h.

References END_OF_STREAM, MERGE_CONTINUE, MERGE_DONE, MERGE_OUTPUT, MERGE_READ_MULTIPLE, NO_ERROR, OBJECT_INITIALIZATION, tpie::ami::stream< T >::read_item(), tp_assert, tpie::tpie_delete_array(), and tpie::ami::stream< T >::write_item().

Referenced by merge(), and partition_and_merge().

299  {
300 
301  TPIE_OS_SIZE_T ii;
302  err ami_err;
303 
304  // Create an array of pointers for the input.
305  T* *in_objects = tpie_new_array<T*>(arity);
306 
307  // Create an array of flags the merge object can use to ask for more
308  // input from specific streams.
309  merge_flag* taken_flags = tpie_new_array<merge_flag>(arity);
310 
311  // An index to speed things up when the merge object takes only from
312  // one index.
313  int taken_index;
314 
315  //Output of the merge object.
316  T merge_out;
317 
318 #if DEBUG_PERFECT_MERGE
319  unsigned int input_count = 0, output_count = 0;
320 #endif
321 
322  // Rewind and read the first item from every stream; count the
323  // number of non-null items read
324  for (ii = arity; ii--; ) {
325  if ((ami_err = instreams[ii]->seek(0)) != NO_ERROR) {
326  tpie_delete_array(in_objects, arity);
327  tpie_delete_array(taken_flags, arity);
328  return ami_err;
329  }
330  if ((ami_err = instreams[ii]->read_item(&(in_objects[ii]))) !=
331  NO_ERROR) {
332  //error on read
333  if (ami_err == END_OF_STREAM) {
334  in_objects[ii] = NULL;
335  } else {
336  tpie_delete_array(in_objects, arity);
337  tpie_delete_array(taken_flags, arity);
338  return ami_err;
339  }
340  // Set the taken flags to 0 before we call intialize()
341  taken_flags[ii] = 0;
342  } else {
343  //item read succesfully
344 #if DEBUG_PERFECT_MERGE
345  input_count++;
346 #endif
347  }
348  }
349 
350  // Initialize the merge object.
351  if (((ami_err = m_obj->initialize(arity, in_objects, taken_flags,
352  taken_index)) != NO_ERROR) &&
353  (ami_err != MERGE_READ_MULTIPLE)) {
354  return OBJECT_INITIALIZATION;
355  }
356 
357 
358  // Now simply call the merge object repeatedly until it claims to
359  // be done or generates an error.
360  while (1) {
361  if (ami_err == MERGE_READ_MULTIPLE) {
362  for (ii = arity; ii--; ) {
363  if (taken_flags[ii]) {
364  ami_err = instreams[ii]->read_item(&(in_objects[ii]));
365  if (ami_err != NO_ERROR) {
366  if (ami_err == END_OF_STREAM) {
367  in_objects[ii] = NULL;
368  } else {
369  tpie_delete_array(in_objects, arity);
370  tpie_delete_array(taken_flags, arity);
371  return ami_err;
372  }
373  } else {
374 #if DEBUG_PERFECT_MERGE
375  input_count++;
376 #endif
377  }
378  }
379  // Clear all flags before operate is called.
380  taken_flags[ii] = 0;
381  }
382  } else {
383  // The last call took at most one item.
384  if (taken_index >= 0) {
385  ami_err = instreams[taken_index]->
386  read_item(&(in_objects[taken_index]));
387  if (ami_err != NO_ERROR) {
388  if (ami_err == END_OF_STREAM) {
389  in_objects[taken_index] = NULL;
390  } else {
391  tpie_delete_array(in_objects, arity);
392  tpie_delete_array(taken_flags, arity);
393  return ami_err;
394  }
395  } else {
396 #if DEBUG_PERFECT_MERGE
397  input_count++;
398 #endif
399  }
400  taken_flags[taken_index] = 0;
401  }
402  }
403  ami_err = m_obj->operate(in_objects, taken_flags, taken_index,
404  &merge_out);
405  if (ami_err == MERGE_DONE) {
406  break;
407  } else if (ami_err == MERGE_OUTPUT) {
408 #if DEBUG_PERFECT_MERGE
409  output_count++;
410 #endif
411  if ((ami_err = outstream->write_item(merge_out)) !=
412  NO_ERROR) {
413  tpie_delete_array(in_objects, arity);
414  tpie_delete_array(taken_flags, arity);
415  return ami_err;
416  }
417  } else if ((ami_err != MERGE_CONTINUE) &&
418  (ami_err != MERGE_READ_MULTIPLE)) {
419  tpie_delete_array(in_objects, arity);
420  tpie_delete_array(taken_flags, arity);
421  return ami_err;
422  }
423  }
424 
425 #if DEBUG_PERFECT_MERGE
426  tp_assert(input_count == output_count,
427  "Merge done, input_count = " << input_count <<
428  ", output_count = " << output_count << '.');
429 #endif
430 
431  tpie_delete_array(in_objects, arity);
432  tpie_delete_array(taken_flags, arity);
433 
434  return NO_ERROR;
435  };
An TPIE entry point was not able to properly initialize the operation management object that was pass...
Definition: err.h:69
Value returned by a merge_management_object, telling merge() to continue to call the operate() member...
Definition: err.h:116
No error occurred.
Definition: err.h:47
int merge_flag
Intended to signal in a merge which of the input streams are non-empty.
Definition: merge.h:57
Value returned by a merge_management_object, signaling that the merge() completed.
Definition: err.h:112
err
Legacy TPIE error codes.
Definition: err.h:45
Value returned by a merge_management_object, telling merge() that more than one input object was con...
Definition: err.h:123
#define tp_assert(condition, message)
Definition: tpie_assert.h:47
An attempt was made to read past the end of a stream or write past the end of a substream.
Definition: err.h:52
Value returned by a merge_management_object, signaling that the last merge() call generated output fo...
Definition: err.h:119
void tpie_delete_array(T *a, size_t size)
Delete an array allocated with tpie_new_array.
Definition: memory.h:398