TPIE

v1.1rc1-6-g0c97303
merge_sorted_runs.h
Go to the documentation of this file.
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; c-file-style: "stroustrup"; -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2008, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef _MERGE_SORTED_RUNS_H
21 #define _MERGE_SORTED_RUNS_H
22 
29 
30 // Get definitions for working with Unix and Windows
31 #include <tpie/portability.h>
32 
33 // Includes needed from TPIE
34 #include <tpie/mergeheap.h> //For templated heaps
35 
37 
38 
39 namespace tpie {
40 
41  namespace ami {
42 
44  typedef TPIE_OS_SIZE_T arity_t;
45 
70  template <class T, class M>
71  void merge_sorted_runs(typename tpie::array<tpie::auto_ptr<file_stream<T> > >::iterator start,
72  typename tpie::array<tpie::auto_ptr<file_stream<T> > >::iterator end,
73  file_stream<T> *outStream, M* MergeHeap,
74  TPIE_OS_OFFSET cutoff=-1,
75  progress_indicator_base* indicator = NULL) {
76 
77  size_t i;
78  size_t arity = end-start;
79 
80  //Pointers to current leading elements of streams
81  tpie::array<const T *> in_objects(arity);
82  tpie::array<TPIE_OS_OFFSET> nread(arity);
83 
84  // **************************************************************
85  // * Read first element from stream. Do not rewind! We may read *
86  // * more elements from the same stream later. *
87  // **************************************************************
88 
89  for (i = 0; i < arity; i++) {
90  file_stream<T> * stream = (start+i)->get();
91  if (stream->can_read()) {
92  in_objects[i] = &stream->read();
93  MergeHeap->insert( in_objects[i], i );
94  } else
95  in_objects[i] = NULL;
96  nread[i] = 1;
97  if (indicator) indicator->step();
98  }
99 
100  // *********************************************************
101  // * Build a heap from the smallest items of each stream *
102  // *********************************************************
103 
104  MergeHeap->initialize ( );
105 
106  // *********************************************************
107  // * Perform the merge until the inputs are exhausted. *
108  // *********************************************************
109  while (MergeHeap->sizeofheap() > 0) {
110  i = MergeHeap->get_min_run_id ();
111  outStream->write(*in_objects[i]);
112 
113  bool eof = false;
114 
115  //Check if we read as many elements as we are allowed to
116  if ( (cutoff != -1) && (nread[i]>=cutoff))
117  eof = true;
118  else {
119  file_stream<T> * stream = (start+i)->get();
120  if (stream->can_read()) in_objects[i] = &stream->read();
121  else eof = true;
122 
123  if (indicator) indicator->step();
124  }
125 
126  if (eof)
127  MergeHeap->delete_min_and_insert(NULL);
128  else {
129  nread[i]++;
130  MergeHeap->delete_min_and_insert (in_objects[i]);
131  }
132  } // while
133  }
134 
135 
148  template <class T, class CMPR>
149  void merge_sorted(typename tpie::array<std::auto_ptr<file_stream<T> > >::iterator start,
150  typename tpie::array<std::auto_ptr<file_stream<T> > >::iterator end,
151  file_stream<T> *outStream, CMPR *cmp) {
152 
153  // make a merge heap which uses the user's comparison object
154  // and initialize it
155  merge_heap_obj<T,CMPR> mrgheap (cmp);
156  mrgheap.allocate(end-start);
157 
158  //Rewind all the input streams
159  for (typename tpie::array<std::auto_ptr<file_stream<T> > >::iterator i=start;
160  i != end; ++i)
161  i->seek(0);
162 
163  merge_sorted_runs(start, end, outStream, mrgheap);
164  }
165 
166 
180  template <class T, class CMPR>
181  void ptr_merge_sorted(typename tpie::array<tpie::auto_ptr<file_stream<T> > >::iterator start,
182  typename tpie::array<tpie::auto_ptr<file_stream<T> > >::iterator end,
183  file_stream<T> *outStream,
184  CMPR *cmp) {
185 
186  // make a merge heap of pointers which uses the user's comparison
187  // object and initialize it
188  merge_heap_ptr_obj<T,CMPR> mrgheap (cmp);
189  mrgheap.allocate(end-start);
190 
191  // Rewind all the input streams
192  for (typename tpie::array<std::auto_ptr<file_stream<T> > >::iteratorarity_t i=start;
193  i != end; ++i)
194  i->seek(0);
195 
196  merge_sorted_runs(start, end, outStream, mrgheap);
197  }
198  } // ami namespace
199 
200 } // tpie namespace
201 
202 #endif //_MERGE_SORTED_RUNS_H
The base class for indicating the progress of some task.
A generic array with a fixed size.
Definition: array.h:143
TPIE_OS_SIZE_T arity_t
Intended to signal the number of input streams in a merge.
const item_type & read()
Read an item from the stream.
Definition: file_stream.h:91
This file contains a few deprecated definitions for legacy code.
void write(const item_type &item)
Write an item to the stream.
Definition: file_stream.h:64
void ptr_merge_sorted(typename tpie::array< tpie::auto_ptr< file_stream< T > > >::iterator start, typename tpie::array< tpie::auto_ptr< file_stream< T > > >::iterator end, file_stream< T > *outStream, CMPR *cmp)
Merging with a heap that keeps a pointer to the records rather than the records themselves: CMPR is t...
void merge_sorted_runs(typename tpie::array< tpie::auto_ptr< file_stream< T > > >::iterator start, typename tpie::array< tpie::auto_ptr< file_stream< T > > >::iterator end, file_stream< T > *outStream, M *MergeHeap, TPIE_OS_OFFSET cutoff=-1, progress_indicator_base *indicator=NULL)
This is a common merge routine for all of the AMI_merge_sorted, AMI_ptr_merge_sorted and AMI_key_merg...
Merge heap templates.
A record pointer heap that uses a comparison object.
Definition: mergeheap.h:141
Progress indicator base.
void merge_sorted(typename tpie::array< std::auto_ptr< file_stream< T > > >::iterator start, typename tpie::array< std::auto_ptr< file_stream< T > > >::iterator end, file_stream< T > *outStream, CMPR *cmp)
Merging with a heap that contains the records to be merged.
void allocate(size_t size)
Allocates space for the heap.
Definition: mergeheap.h:87
A Stream object stores an ordered collection of objects of type T on external memory.
Definition: stream.h:99
Simple class acting both as file and a file::stream.
Definition: file_stream.h:44
bool can_read() const
Check if we can read an item with read().
Definition: stream_crtp.h:108
void allocate(size_t size)
Allocates space for the heap.
Definition: mergeheap.h:194
like std::auto_ptr, but delete the object with tpie_delete.
Definition: memory.h:416