TPIE

v1.1rc1-6-g0c97303
merger.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef __TPIE_PIPELINING_MERGER_H__
21 #define __TPIE_PIPELINING_MERGER_H__
22 
24 #include <tpie/file_stream.h>
25 #include <tpie/tpie_assert.h>
26 
27 namespace tpie {
28 
29 template <typename T, typename pred_t>
30 class merger {
31 public:
32  inline merger(pred_t pred)
33  : pq(0, predwrap(pred))
34  {
35  }
36 
37  inline bool can_pull() {
38  return !pq.empty();
39  }
40 
41  inline T pull() {
42  tp_assert(can_pull(), "pull() while !can_pull()");
43  T el = pq.top().first;
44  size_t i = pq.top().second;
45  if (in[i].can_read() && itemsRead[i] < runLength) {
46  pq.pop_and_push(std::make_pair(in[i].read(), i));
47  ++itemsRead[i];
48  } else {
49  pq.pop();
50  }
51  if (!can_pull()) {
52  reset();
53  }
54  return el;
55  }
56 
57  inline void reset() {
58  in.resize(0);
59  pq.resize(0);
60  itemsRead.resize(0);
61  }
62 
63  // Initialize merger with given sorted input runs. Each file stream is
64  // assumed to have a stream offset pointing to the first item in the run,
65  // and runLength items are read from each stream (unless end of stream
66  // occurs earlier).
67  // Precondition: !can_pull()
68  inline void reset(array<file_stream<T> > & inputs, size_t runLength) {
69  this->runLength = runLength;
70  tp_assert(pq.empty(), "Reset before we are done");
71  n = inputs.size();
72  in.swap(inputs);
73  pq.resize(n);
74  for (size_t i = 0; i < n; ++i) {
75  pq.unsafe_push(std::make_pair(in[i].read(), i));
76  }
77  pq.make_safe();
78  itemsRead.resize(n, 1);
79  }
80 
81  inline static memory_size_type memory_usage(memory_size_type fanout) {
82  return sizeof(merger)
83  - sizeof(internal_priority_queue<std::pair<T, size_t>, predwrap>) // pq
84  + static_cast<memory_size_type>(internal_priority_queue<std::pair<T, size_t>, predwrap>::memory_usage(fanout)) // pq
85  - sizeof(array<file_stream<T> >) // in
86  + static_cast<memory_size_type>(array<file_stream<T> >::memory_usage(fanout)) // in
87  - fanout*sizeof(file_stream<T>) // in file_streams
88  + fanout*file_stream<T>::memory_usage() // in file_streams
89  - sizeof(array<size_t>) // itemsRead
90  + static_cast<memory_size_type>(array<size_t>::memory_usage(fanout)) // itemsRead
91  ;
92  }
93 
94  class predwrap {
95  public:
96  typedef std::pair<T, size_t> item_type;
97  typedef item_type first_argument_type;
98  typedef item_type second_argument_type;
99  typedef bool result_type;
100 
101  predwrap(pred_t pred)
102  : pred(pred)
103  {
104  }
105 
106  inline bool operator()(const item_type & lhs, const item_type & rhs) {
107  return pred(lhs.first, rhs.first);
108  }
109 
110  private:
111  pred_t pred;
112  };
113 
114 private:
117  array<size_t> itemsRead;
118  size_t runLength;
119  size_t n;
120 };
121 
122 } // namespace tpie
123 
124 #endif // __TPIE_PIPELINING_MERGER_H__
Defines the tp_assert macro.
A generic array with a fixed size.
Definition: array.h:143
Standard binary internal heap.
Simple heap based priority queue implementation.
void resize(size_t size, const T &elm)
Change the size of the array.
Definition: array.h:431
static memory_size_type memory_usage(float blockFactor=1.0, bool includeDefaultFileAccessor=true)
Calculate the amount of memory used by a single file_stream.
Definition: file_stream.h:134
Simple class acting both as file and a file::stream.
Definition: file_stream.h:44
#define tp_assert(condition, message)
Definition: tpie_assert.h:47
Simple class acting both as a tpie::file and a tpie::file::stream.