TPIE

v1.1rc1-6-g0c97303
serialization_sort.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2013, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef TPIE_PIPELINING_SERIALIZATION_SORT_H
21 #define TPIE_PIPELINING_SERIALIZATION_SORT_H
22 
23 #include <tpie/pipelining/node.h>
24 #include <tpie/pipelining/pipe_base.h>
25 #include <tpie/pipelining/factory_base.h>
26 #include <tpie/serialization_sort.h>
27 #include <tpie/serialization.h>
28 
29 namespace tpie {
30 
31 namespace pipelining {
32 
33 namespace serialization_bits {
34 
// Traits bundle for the sort nodes in this file: the other class templates
// read item_type, pred_type, sorter_t and sorterptr from their Traits
// parameter.
// NOTE(review): in this listing the class header line that should follow the
// template line is missing, and so is the typedef of sorter_t that sorterptr
// refers to -- confirm both against the upstream file.
35 template <typename T, typename pred_t>
37 public:
// Type of items sorted.
38  typedef T item_type;
// Type of the comparison predicate.
39  typedef pred_t pred_type;
// Smart pointer to sorter_t.
41  typedef boost::shared_ptr<sorter_t> sorterptr;
42 };
43 
44 template <typename Traits>
46 
47 template <typename Traits>
49 
50 template <typename Traits>
51 class sort_output_base : public node {
52  typedef typename Traits::pred_type pred_type;
53 public:
55  typedef typename Traits::item_type item_type;
57  typedef typename Traits::sorter_t sorter_t;
59  typedef typename Traits::sorterptr sorterptr;
60 
61  sorterptr get_sorter() const {
62  return m_sorter;
63  }
64 
65  void set_calc_node(node & calc) {
66  add_dependency(calc);
67  }
68 
69 protected:
70  sort_output_base(pred_type pred)
71  : m_sorter(new sorter_t(sizeof(item_type), pred))
72  {
73  }
74 
75  sorterptr m_sorter;
76 };
77 
81 template <typename Traits>
82 class sort_pull_output_t : public sort_output_base<Traits> {
83 public:
84  typedef typename Traits::item_type item_type;
85  typedef typename Traits::pred_type pred_type;
86  typedef typename Traits::sorter_t sorter_t;
87  typedef typename Traits::sorterptr sorterptr;
88 
89  sort_pull_output_t(pred_type pred)
91  {
92  this->set_minimum_memory(sorter_t::minimum_memory_phase_3());
93  this->set_name("Write sorted output", PRIORITY_INSIGNIFICANT);
94  this->set_memory_fraction(1.0);
95  }
96 
97  virtual void propagate() override {
98  this->set_steps(this->m_sorter->item_count());
99  this->forward("items", static_cast<stream_size_type>(this->m_sorter->item_count()));
100  }
101 
102  inline bool can_pull() const {
103  return this->m_sorter->can_pull();
104  }
105 
106  inline item_type pull() {
107  this->step();
108  return this->m_sorter->pull();
109  }
110 
111  // Despite this go() implementation, a sort_pull_output_t CANNOT be used as
112  // an initiator node. Normally, it is a type error to have a phase without
113  // an initiator, but with a passive_sorter you can circumvent this
114  // mechanism. Thus we customize the error message printed (but throw the
115  // same type of exception.)
116  virtual void go() override {
117  log_warning() << "Passive sorter used without an initiator in the final merge and output phase.\n"
118  << "Define an initiator and pair it up with the pipe from passive_sorter::output()." << std::endl;
119  throw not_initiator_node();
120  }
121 
122 protected:
123  virtual void set_available_memory(memory_size_type availableMemory) override {
124  node::set_available_memory(availableMemory);
125  this->m_sorter->set_phase_3_memory(availableMemory);
126  }
127 };
128 
132 template <typename Traits, typename dest_t>
133 class sort_output_t : public sort_output_base<Traits> {
134  typedef typename Traits::pred_type pred_type;
135 public:
136  typedef typename Traits::item_type item_type;
138  typedef typename Traits::sorter_t sorter_t;
139  typedef typename Traits::sorterptr sorterptr;
140 
141  sort_output_t(const dest_t & dest, pred_type pred)
142  : p_t(pred)
143  , dest(dest)
144  {
145  this->add_push_destination(dest);
146  this->set_minimum_memory(sorter_t::minimum_memory_phase_3());
147  this->set_name("Write sorted output", PRIORITY_INSIGNIFICANT);
148  this->set_memory_fraction(1.0);
149  }
150 
151  virtual void propagate() override {
152  this->set_steps(this->m_sorter->item_count());
153  this->forward("items", static_cast<stream_size_type>(this->m_sorter->item_count()));
154  }
155 
156  virtual void go() override {
157  while (this->m_sorter->can_pull()) {
158  dest.push(this->m_sorter->pull());
159  this->step();
160  }
161  }
162 
163 protected:
164  virtual void set_available_memory(memory_size_type availableMemory) override {
165  node::set_available_memory(availableMemory);
166  this->m_sorter->set_phase_3_memory(availableMemory);
167  }
168 
169 private:
170  dest_t dest;
171 };
172 
178 template <typename Traits>
179 class sort_calc_t : public node {
180 public:
181  typedef typename Traits::item_type item_type;
182  typedef typename Traits::sorter_t sorter_t;
183  typedef typename Traits::sorterptr sorterptr;
184 
185  typedef sort_output_base<Traits> Output;
186 
187  sort_calc_t(const sort_calc_t & other)
188  : node(other)
189  , m_sorter(other.m_sorter)
190  , dest(other.dest)
191  {
192  }
193 
194  template <typename dest_t>
195  sort_calc_t(dest_t dest)
196  : dest(new dest_t(dest))
197  {
198  m_sorter = this->dest->get_sorter();
199  this->dest->set_calc_node(*this);
200  init();
201  }
202 
203  sort_calc_t(sorterptr sorter)
204  : m_sorter(sorter)
205  {
206  init();
207  }
208 
209  void init() {
210  set_minimum_memory(sorter_t::minimum_memory_phase_2());
211  set_name("Perform merge heap", PRIORITY_SIGNIFICANT);
212  set_memory_fraction(1.0);
213  }
214 
215  virtual void propagate() override {
216  set_steps(1000);
217  }
218 
219  virtual void go() override {
221  log_debug() << "TODO: Progress information during merging." << std::endl;
222  m_sorter->merge_runs();
223  pi->init(1);
224  pi->step();
225  pi->done();
226  }
227 
228  virtual bool can_evacuate() override {
229  return true;
230  }
231 
232  virtual void evacuate() override {
233  m_sorter->evacuate();
234  }
235 
236  sorterptr get_sorter() const {
237  return m_sorter;
238  }
239 
240  void set_input_node(node & input) {
241  add_dependency(input);
242  }
243 
244 protected:
245  virtual void set_available_memory(memory_size_type availableMemory) override {
246  node::set_available_memory(availableMemory);
247  m_sorter->set_phase_2_memory(availableMemory);
248  }
249 
250 private:
251  sorterptr m_sorter;
252  boost::shared_ptr<Output> dest;
253 };
254 
260 template <typename Traits>
261 class sort_input_t : public node {
262  typedef typename Traits::pred_type pred_type;
263 public:
264  typedef typename Traits::item_type item_type;
265  typedef typename Traits::sorter_t sorter_t;
266  typedef typename Traits::sorterptr sorterptr;
267 
268  sort_input_t(sort_calc_t<Traits> dest)
269  : m_sorter(dest.get_sorter())
270  , dest(dest)
271  {
272  this->dest.set_input_node(*this);
273  set_minimum_memory(sorter_t::minimum_memory_phase_1());
274  set_name("Form input runs", PRIORITY_SIGNIFICANT);
275  set_memory_fraction(1.0);
276  }
277 
278  virtual void begin() override {
279  node::begin();
280  m_sorter->begin();
281  }
282 
283  void push(const item_type & item) {
284  m_sorter->push(item);
285  }
286 
287  virtual void end() override {
288  node::end();
289  m_sorter->end();
290  }
291 
292  virtual bool can_evacuate() override {
293  return true;
294  }
295 
296  virtual void evacuate() override {
297  m_sorter->evacuate();
298  }
299 
300 protected:
301  virtual void set_available_memory(memory_size_type availableMemory) override {
302  node::set_available_memory(availableMemory);
303  m_sorter->set_phase_1_memory(availableMemory);
304  }
305 
306 private:
307  sorterptr m_sorter;
308  sort_calc_t<Traits> dest;
309 };
310 
// Base of the sort factories, parameterised on the concrete factory type
// child_t (curiously recurring template pattern: the derived factories in
// this file pass themselves as child_t). The child supplies a nested
// predicate<item_type>::type and a get_pred<item_type>() member.
// NOTE(review): in this listing the class header line that should follow the
// template line is missing -- confirm against the upstream file.
311 template <typename child_t>
// Downcast this object to the concrete factory type.
313  const child_t & self() const { return *static_cast<const child_t *>(this); }
314 public:
// Compile-time description of what construct() returns for a destination
// type dest_t.
315  template <typename dest_t>
316  struct constructed {
317  private:
// Type of items sorted, taken from the destination node.
319  typedef typename dest_t::item_type item_type;
320  public:
// Predicate type chosen by the concrete factory for this item type.
321  typedef typename child_t::template predicate<item_type>::type pred_type;
// NOTE(review): construct() below reads constructed<dest_t>::Traits, but no
// Traits typedef is visible in this listing; a line appears to be missing.
323  typedef sort_input_t<Traits> type;
324  };
325 
// Build the three sorter nodes (output, merge, input) in front of dest and
// return the input node. Each node is passed to init_sub_node() right after
// it is created.
326  template <typename dest_t>
327  typename constructed<dest_t>::type construct(const dest_t & dest) const {
328  typedef typename dest_t::item_type item_type;
329  typedef typename constructed<dest_t>::pred_type pred_type;
330  typedef typename constructed<dest_t>::Traits Traits;
331 
// The predicate instance comes from the concrete factory.
332  sort_output_t<Traits, dest_t> output(dest, self().template get_pred<item_type>());
333  this->init_sub_node(output);
334  sort_calc_t<Traits> calc(output);
335  this->init_sub_node(calc);
336  sort_input_t<Traits> input(calc);
337  this->init_sub_node(input);
338 
339  return input;
340  }
341 };
342 
346 class default_pred_sort_factory : public sort_factory_base<default_pred_sort_factory> {
347 public:
348  template <typename item_type>
349  class predicate {
350  public:
351  typedef std::less<item_type> type;
352  };
353 
354  template <typename T>
355  std::less<T> get_pred() const {
356  return std::less<T>();
357  }
358 };
359 
363 template <typename pred_t>
364 class sort_factory : public sort_factory_base<sort_factory<pred_t> > {
365 public:
366  template <typename Dummy>
367  class predicate {
368  public:
369  typedef pred_t type;
370  };
371 
372  sort_factory(const pred_t & p)
373  : pred(p)
374  {
375  }
376 
377  template <typename T>
378  pred_t get_pred() const {
379  return pred;
380  }
381 
382 private:
383  pred_t pred;
384 };
385 
386 } // namespace serialization_bits
387 
391 inline pipe_middle<serialization_bits::default_pred_sort_factory>
394  return pipe_middle<fact>(fact()).name("Sort");
395 }
396 
400 template <typename pred_t>
401 pipe_middle<serialization_bits::sort_factory<pred_t> >
402 serialization_pipesort(const pred_t & p) {
404  return pipe_middle<fact>(fact(p)).name("Sort");
405 }
406 
// Forward declaration; this is where the default predicate std::less<T> is
// attached to the second template parameter.
template <typename T, typename pred_t=std::less<T> >
class serialization_passive_sorter;
409 
410 namespace serialization_bits {
411 
415 template <typename Traits>
417 public:
419  typedef sort_calc_t<Traits> calc_t;
421  typedef input_t constructed_type;
422  typedef typename Traits::sorter_t sorter_t;
423  typedef typename Traits::sorterptr sorterptr;
424 
426  : output(&output)
427  {
428  }
429 
430  constructed_type construct() const {
431  calc_t calc(output->get_sorter());
432  output->set_calc_node(calc);
433  this->init_node(calc);
434  input_t input(calc);
435  this->init_node(input);
436  return input;
437  }
438 
439 private:
440  output_t * output;
441 };
442 
446 template <typename Traits>
448 public:
450  typedef output_t constructed_type;
451 
453  : m_sorter(sorter)
454  {
455  }
456 
457  constructed_type construct() const;
458 
459 private:
460  const passive_sorter<Traits> & m_sorter;
461 };
462 
463 } // namespace serialization_bits
464 
// Pipelined sorter with push input and pull output.
// NOTE(review): this listing is missing several lines of the class: the
// class header, the Traits and output_t typedefs used below, and everything
// but the closing braces of two member functions (presumably input() and
// output()) -- confirm against the upstream file.
473 template <typename T, typename pred_t>
476 public:
// Type of items sorted.
478  typedef T item_type;
// Type of the merge sort implementation used.
480  typedef typename Traits::sorter_t sorter_t;
// Smart pointer to sorter_t.
482  typedef typename Traits::sorterptr sorterptr;
485 
// Construct the passive sorter: a default-constructed sorter_t is allocated
// for m_sorter, the predicate is stored, and the output node is built from
// the predicate.
486  serialization_passive_sorter(pred_t pred = pred_t())
487  : m_sorter(new sorter_t())
488  , pred(pred)
489  , m_output(pred)
490  {
491  }
492 
// NOTE(review): closing brace of a member function whose signature and body
// are not visible here.
498  }
499 
// NOTE(review): closing brace of a second member function whose signature
// and body are not visible here.
505  }
506 
507 private:
508  sorterptr m_sorter;
509  pred_t pred;
// Output node; passive_sorter_factory_2::construct() copies it.
510  output_t m_output;
513 
515 };
516 
517 namespace serialization_bits {
518 
519 template <typename Traits>
520 typename passive_sorter_factory_2<Traits>::constructed_type
521 passive_sorter_factory_2<Traits>::construct() const {
522  constructed_type res = m_sorter.m_output;
523  init_node(res);
524  return res;
525 }
526 
527 } // namespace serialization_bits
528 
529 } // namespace pipelining
530 
531 } // namespace tpie
532 
533 #endif // TPIE_PIPELINING_SERIALIZATION_SORT_H
virtual void begin()
Begin pipeline processing phase.
Definition: node.h:167
The base class for indicating the progress of some task.
virtual void set_available_memory(memory_size_type availableMemory) override
Called by the memory manager to set the amount of memory assigned to this node.
Traits::sorterptr sorterptr
Smart pointer to sorter_t.
virtual bool can_evacuate() override
Overridden by nodes that have data to evacuate.
virtual void set_available_memory(memory_size_type availableMemory) override
Called by the memory manager to set the amount of memory assigned to this node.
virtual void set_available_memory(memory_size_type availableMemory) override
Called by the memory manager to set the amount of memory assigned to this node.
virtual void done()
Advance the indicator to the end.
virtual void set_available_memory(memory_size_type availableMemory)
Called by the memory manager to set the amount of memory assigned to this node.
Definition: node.h:445
serialization_bits::sort_pull_output_t< Traits > output_t
Type of pipe sorter output.
virtual void propagate() override
Propagate stream metadata.
Base class of all nodes.
Definition: node.h:58
Traits::sorter_t sorter_t
Type of the merge sort implementation used.
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
Definition: node.h:352
progress_indicator_base * proxy_progress_indicator()
Get a non-initialized progress indicator for use with external implementations.
Definition: node.h:604
Pipelined sorter with push input and pull output.
virtual void evacuate() override
Overridden by nodes that have data to evacuate.
Binary serialization and unserialization.
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
virtual void propagate() override
Propagate stream metadata.
virtual void evacuate() override
Overridden by nodes that have data to evacuate.
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
Definition: node.h:241
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
Definition: node.h:421
void step(stream_size_type step=1)
Record an increment to the indicator and advance the indicator.
virtual bool can_evacuate() override
Overridden by nodes that have data to evacuate.
Traits::sorterptr sorterptr
Smart pointer to sorter_t.
logstream & log_debug()
Return logstream for writing debug log messages.
Definition: tpie_log.h:124
Traits::sorter_t sorter_t
Type of the merge sort implementation used.
void add_dependency(const node_token &dest)
Called by implementers to declare a node dependency, that is, a requirement that another node has end...
Definition: node.h:404
virtual void end()
End pipeline processing phase.
Definition: node.h:205
void step(stream_size_type steps=1)
Step the progress indicator.
Definition: node.h:586
pipe_middle< serialization_bits::default_pred_sort_factory > serialization_pipesort()
Pipelining sorter using std::less.
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
virtual void set_available_memory(memory_size_type availableMemory) override
Called by the memory manager to set the amount of memory assigned to this node.
void set_memory_fraction(double f)
Set the memory priority of this node.
Definition: node.h:104
virtual void begin() override
Begin pipeline processing phase.
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
A pipe_middle class pushes input down the pipeline.
Definition: pipe_base.h:119
virtual void propagate() override
Propagate stream metadata.
pipe_end< serialization_bits::passive_sorter_factory< Traits > > input()
Get the input push node.
pullpipe_begin< serialization_bits::passive_sorter_factory_2< Traits > > output()
Get the output pull node.
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
Definition: node.h:566
virtual void end() override
End pipeline processing phase.
void forward(std::string key, T value, bool explicitForward=true)
Called by implementers to forward auxiliary data to successors.
Definition: node.h:461
logstream & log_warning()
Return logstream for writing warning log messages.
Definition: tpie_log.h:114
Sort factory using the given predicate as comparator.
Traits::item_type item_type
Type of items sorted.
virtual void init(stream_size_type range=0)
Initialize progress indicator.