TPIE

v1.1rc1-6-g0c97303
sort.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2011, 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef __TPIE_PIPELINING_SORT_H__
21 #define __TPIE_PIPELINING_SORT_H__
22 
23 #include <tpie/pipelining/node.h>
24 #include <tpie/pipelining/pipe_base.h>
25 #include <tpie/pipelining/factory_base.h>
26 #include <tpie/pipelining/merge_sorter.h>
27 #include <tpie/parallel_sort.h>
28 #include <tpie/file_stream.h>
29 #include <tpie/tempname.h>
30 #include <tpie/memory.h>
31 #include <queue>
32 #include <boost/shared_ptr.hpp>
33 
34 namespace tpie {
35 
36 namespace pipelining {
37 
38 namespace bits {
39 
/// Forward declaration of the pipe sorter middle node (defined further down).
template <typename T, typename pred_t>
class sort_calc_t;
42 
/// Forward declaration of the pipe sorter input node (defined further down).
template <typename T, typename pred_t>
class sort_input_t;
45 
46 template <typename T, typename pred_t>
47 class sort_output_base : public node {
48  // node has virtual dtor
49 public:
51  typedef T item_type;
55  typedef typename sorter_t::ptr sorterptr;
56 
57  sorterptr get_sorter() const {
58  return m_sorter;
59  }
60 
61  void set_calc_node(node & calc) {
62  add_dependency(calc);
63  }
64 
65 protected:
66  sort_output_base(pred_t pred)
67  : m_sorter(new sorter_t(pred))
68  {
69  }
70 
71  sorterptr m_sorter;
72 };
73 
79 template <typename T, typename pred_t>
80 class sort_pull_output_t : public sort_output_base<T, pred_t> {
81 public:
83  typedef T item_type;
87  typedef typename sorter_t::ptr sorterptr;
88 
89  inline sort_pull_output_t(pred_t pred)
90  : sort_output_base<T, pred_t>(pred)
91  {
92  this->set_minimum_memory(sorter_t::minimum_memory_phase_3());
93  this->set_maximum_memory(sorter_t::maximum_memory_phase_3());
94  this->set_name("Write sorted output", PRIORITY_INSIGNIFICANT);
95  this->set_memory_fraction(1.0);
96  }
97 
98  virtual void propagate() override {
99  this->set_steps(this->m_sorter->item_count());
100  this->forward("items", static_cast<stream_size_type>(this->m_sorter->item_count()));
101  }
102 
103  inline bool can_pull() const {
104  return this->m_sorter->can_pull();
105  }
106 
107  inline item_type pull() {
108  this->step();
109  return this->m_sorter->pull();
110  }
111 
112  // Despite this go() implementation, a sort_pull_output_t CANNOT be used as
113  // an initiator node. Normally, it is a type error to have a phase without
114  // an initiator, but with a passive_sorter you can circumvent this
115  // mechanism. Thus we customize the error message printed (but throw the
116  // same type of exception.)
117  virtual void go() override {
118  log_warning() << "Passive sorter used without an initiator in the final merge and output phase.\n"
119  << "Define an initiator and pair it up with the pipe from passive_sorter::output()." << std::endl;
120  throw not_initiator_node();
121  }
122 
123 protected:
124  virtual void set_available_memory(memory_size_type availableMemory) override {
125  node::set_available_memory(availableMemory);
126  this->m_sorter->set_phase_3_memory(availableMemory);
127  }
128 };
129 
135 template <typename pred_t, typename dest_t>
136 class sort_output_t : public sort_output_base<typename dest_t::item_type, pred_t> {
137 public:
139  typedef typename dest_t::item_type item_type;
145  typedef typename sorter_t::ptr sorterptr;
146 
147  inline sort_output_t(const dest_t & dest, pred_t pred)
148  : p_t(pred)
149  , dest(dest)
150  {
151  this->add_push_destination(dest);
152  this->set_minimum_memory(sorter_t::minimum_memory_phase_3());
153  this->set_maximum_memory(sorter_t::maximum_memory_phase_3());
154  this->set_name("Write sorted output", PRIORITY_INSIGNIFICANT);
155  this->set_memory_fraction(1.0);
156  }
157 
158  virtual void propagate() override {
159  this->set_steps(this->m_sorter->item_count());
160  this->forward("items", static_cast<stream_size_type>(this->m_sorter->item_count()));
161  }
162 
163  virtual void go() override {
164  while (this->m_sorter->can_pull()) {
165  dest.push(this->m_sorter->pull());
166  this->step();
167  }
168  }
169 
170 protected:
171  virtual void set_available_memory(memory_size_type availableMemory) override {
172  node::set_available_memory(availableMemory);
173  this->m_sorter->set_phase_3_memory(availableMemory);
174  }
175 
176 private:
177  dest_t dest;
178 };
179 
185 template <typename T, typename pred_t>
186 class sort_calc_t : public node {
187 public:
189  typedef T item_type;
193  typedef typename sorter_t::ptr sorterptr;
194 
196 
197  inline sort_calc_t(const sort_calc_t & other)
198  : node(other)
199  , m_sorter(other.m_sorter)
200  , dest(other.dest)
201  {
202  }
203 
204  template <typename dest_t>
205  inline sort_calc_t(dest_t dest)
206  : dest(new dest_t(dest))
207  {
208  m_sorter = this->dest->get_sorter();
209  this->dest->set_calc_node(*this);
210  init();
211  }
212 
213  inline sort_calc_t(sorterptr sorter)
214  : m_sorter(sorter)
215  {
216  init();
217  }
218 
219  void init() {
220  set_minimum_memory(sorter_t::minimum_memory_phase_2());
221  set_name("Perform merge heap", PRIORITY_SIGNIFICANT);
222  set_memory_fraction(1.0);
223  }
224 
225  virtual void propagate() override {
226  set_steps(1000);
227  }
228 
229  virtual void go() override {
231  m_sorter->calc(*pi);
232  }
233 
234  virtual bool can_evacuate() override {
235  return true;
236  }
237 
238  virtual void evacuate() override {
239  m_sorter->evacuate_before_reporting();
240  }
241 
242  sorterptr get_sorter() const {
243  return m_sorter;
244  }
245 
246  void set_input_node(node & input) {
247  add_dependency(input);
248  }
249 
250 protected:
251  virtual void set_available_memory(memory_size_type availableMemory) override {
252  node::set_available_memory(availableMemory);
253  m_sorter->set_phase_2_memory(availableMemory);
254  }
255 
256 private:
257  sorterptr m_sorter;
258  boost::shared_ptr<Output> dest;
259 };
260 
266 template <typename T, typename pred_t>
267 class sort_input_t : public node {
268 public:
270  typedef T item_type;
274  typedef typename sorter_t::ptr sorterptr;
275 
277  : m_sorter(dest.get_sorter())
278  , dest(dest)
279  {
280  this->dest.set_input_node(*this);
281  set_minimum_memory(sorter_t::minimum_memory_phase_1());
282  set_name("Form input runs", PRIORITY_SIGNIFICANT);
283  set_memory_fraction(1.0);
284  }
285 
286  virtual void propagate() override {
287  if (this->can_fetch("items"))
288  m_sorter->set_items(this->fetch<stream_size_type>("items"));
289  m_sorter->begin();
290  }
291 
292  inline void push(const item_type & item) {
293  m_sorter->push(item);
294  }
295 
296  virtual void end() override {
297  node::end();
298  m_sorter->end();
299  }
300 
301  virtual bool can_evacuate() override {
302  return true;
303  }
304 
305  virtual void evacuate() override {
306  m_sorter->evacuate_before_merging();
307  }
308 
309 protected:
310  virtual void set_available_memory(memory_size_type availableMemory) override {
311  node::set_available_memory(availableMemory);
312  m_sorter->set_phase_1_memory(availableMemory);
313  }
314 
315 private:
316  sorterptr m_sorter;
318 };
319 
320 template <typename child_t>
322  const child_t & self() const { return *static_cast<const child_t *>(this); }
323 public:
324  template <typename dest_t>
325  struct constructed {
326  private:
328  typedef typename dest_t::item_type item_type;
329  public:
330  typedef typename child_t::template predicate<item_type>::type pred_type;
332  };
333 
334  template <typename dest_t>
335  typename constructed<dest_t>::type construct(const dest_t & dest) const {
336  typedef typename dest_t::item_type item_type;
337  typedef typename constructed<dest_t>::pred_type pred_type;
338 
339  sort_output_t<pred_type, dest_t> output(dest, self().template get_pred<item_type>());
340  this->init_sub_node(output);
342  this->init_sub_node(calc);
344  this->init_sub_node(input);
345 
346  return input;
347  }
348 };
349 
353 class default_pred_sort_factory : public sort_factory_base<default_pred_sort_factory> {
354 public:
355  template <typename item_type>
356  class predicate {
357  public:
358  typedef std::less<item_type> type;
359  };
360 
361  template <typename T>
362  std::less<T> get_pred() const {
363  return std::less<T>();
364  }
365 };
366 
370 template <typename pred_t>
371 class sort_factory : public sort_factory_base<sort_factory<pred_t> > {
372 public:
373  template <typename Dummy>
374  class predicate {
375  public:
376  typedef pred_t type;
377  };
378 
379  sort_factory(const pred_t & p)
380  : pred(p)
381  {
382  }
383 
384  template <typename T>
385  pred_t get_pred() const {
386  return pred;
387  }
388 
389 private:
390  pred_t pred;
391 };
392 
393 } // namespace bits
394 
398 inline pipe_middle<bits::default_pred_sort_factory>
400  typedef bits::default_pred_sort_factory fact;
401  return pipe_middle<fact>(fact()).name("Sort");
402 }
403 
407 template <typename pred_t>
408 inline pipe_middle<bits::sort_factory<pred_t> >
409 pipesort(const pred_t & p) {
410  typedef bits::sort_factory<pred_t> fact;
411  return pipe_middle<fact>(fact(p)).name("Sort");
412 }
413 
/// Forward declaration of the passive sorter; the predicate defaults to
/// std::less<T>.
template <typename T, typename pred_t=std::less<T> >
class passive_sorter;
416 
417 namespace bits {
418 
422 template <typename T, typename pred_t>
424 public:
428  typedef input_t constructed_type;
430  typedef typename sorter_t::ptr sorterptr;
431 
433  : output(&output)
434  {
435  }
436 
437  constructed_type construct() const {
438  calc_t calc(output->get_sorter());
439  output->set_calc_node(calc);
440  this->init_node(calc);
441  input_t input(calc);
442  this->init_node(input);
443  return input;
444  }
445 
446 private:
447  output_t * output;
448 };
449 
453 template <typename T, typename pred_t>
455 public:
457  typedef output_t constructed_type;
458 
460  : m_sorter(sorter)
461  {
462  }
463 
464  constructed_type construct() const;
465 
466 private:
467  const passive_sorter<T, pred_t> & m_sorter;
468 };
469 
470 } // namespace bits
471 
480 template <typename T, typename pred_t>
481 class passive_sorter {
482 public:
484  typedef T item_type;
488  typedef typename sorter_t::ptr sorterptr;
491 
492  inline passive_sorter(pred_t pred = pred_t())
493  : m_sorter(new sorter_t())
494  , pred(pred)
495  , m_output(pred)
496  {
497  }
498 
504  }
505 
511  }
512 
513 private:
514  sorterptr m_sorter;
515  pred_t pred;
516  output_t m_output;
518  passive_sorter & operator=(const passive_sorter &);
519 
520  friend class bits::passive_sorter_factory_2<T, pred_t>;
521 };
522 
523 namespace bits {
524 
525 template <typename T, typename pred_t>
526 typename passive_sorter_factory_2<T, pred_t>::constructed_type
527 passive_sorter_factory_2<T, pred_t>::construct() const {
528  constructed_type res = m_sorter.m_output;
529  init_node(res);
530  return res;
531 }
532 
533 } // namespace bits
534 
535 } // namespace pipelining
536 
537 } // namespace tpie
538 
539 #endif
virtual void propagate() override
Propagate stream metadata.
Definition: sort.h:98
virtual void evacuate() override
Overridden by nodes that have data to evacuate.
Definition: sort.h:238
virtual void set_available_memory(memory_size_type availableMemory) override
Called by the memory manager to set the amount of memory assigned to this node.
Definition: sort.h:171
sorter_t::ptr sorterptr
Smart pointer to sorter_t.
Definition: sort.h:87
Memory management subsystem.
T item_type
Type of items sorted.
Definition: sort.h:51
T item_type
Type of items sorted.
Definition: sort.h:189
Sort factory using the given predicate as comparator.
Definition: sort.h:371
The base class for indicating the progress of some task.
virtual bool can_evacuate() override
Overridden by nodes that have data to evacuate.
Definition: sort.h:234
pullpipe_begin< bits::passive_sorter_factory_2< item_type, pred_t > > output()
Get the output pull node.
Definition: sort.h:509
T item_type
Type of items sorted.
Definition: sort.h:83
merge_sorter< item_type, true, pred_t > sorter_t
Type of the merge sort implementation used.
Definition: sort.h:486
Factory for the passive sorter output node.
Definition: sort.h:454
virtual void set_available_memory(memory_size_type availableMemory) override
Called by the memory manager to set the amount of memory assigned to this node.
Definition: sort.h:251
Merge sorting consists of three phases.
Definition: merge_sorter.h:44
sorter_t::ptr sorterptr
Smart pointer to sorter_t.
Definition: sort.h:55
pipe_middle< bits::default_pred_sort_factory > pipesort()
Pipelining sorter using std::less.
Definition: sort.h:399
T item_type
Type of items sorted.
Definition: sort.h:484
Pipelined sorter with push input and pull output.
Definition: sort.h:415
virtual void propagate() override
Propagate stream metadata.
Definition: sort.h:158
Pipe sorter middle node.
Definition: sort.h:41
virtual void set_available_memory(memory_size_type availableMemory)
Called by the memory manager to set the amount of memory assigned to this node.
Definition: node.h:445
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: sort.h:163
sorter_t::ptr sorterptr
Smart pointer to sorter_t.
Definition: sort.h:488
Base class of all nodes.
Definition: node.h:58
merge_sorter< item_type, true, pred_t > sorter_t
Type of the merge sort implementation used.
Definition: sort.h:191
virtual void set_available_memory(memory_size_type availableMemory) override
Called by the memory manager to set the amount of memory assigned to this node.
Definition: sort.h:310
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
Definition: node.h:352
merge_sorter< item_type, true, pred_t > sorter_t
Type of the merge sort implementation used.
Definition: sort.h:143
progress_indicator_base * proxy_progress_indicator()
Get a non-initialized progress indicator for use with external implementations.
Definition: node.h:604
sorter_t::ptr sorterptr
Smart pointer to sorter_t.
Definition: sort.h:145
Simple parallel quick sort implementation with progress tracking.
void set_maximum_memory(memory_size_type maximumMemory)
Called by implementers to declare maximum memory requirements.
Definition: node.h:434
Sort factory using std::less as comparator.
Definition: sort.h:353
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
Definition: node.h:241
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
Definition: node.h:421
virtual void evacuate() override
Overridden by nodes that have data to evacuate.
Definition: sort.h:305
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: sort.h:117
Pipe sorter pull output node.
Definition: sort.h:80
Pipe sorter push output node.
Definition: sort.h:136
void add_dependency(const node_token &dest)
Called by implementers to declare a node dependency, that is, a requirement that another node has ended before this node begins.
Definition: node.h:404
virtual void end()
End pipeline processing phase.
Definition: node.h:205
pipe_end< bits::passive_sorter_factory< item_type, pred_t > > input()
Get the input push node.
Definition: sort.h:502
dest_t::item_type item_type
Type of items sorted.
Definition: sort.h:139
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: sort.h:229
void step(stream_size_type steps=1)
Step the progress indicator.
Definition: node.h:586
void set_memory_fraction(double f)
Set the memory priority of this node.
Definition: node.h:104
merge_sorter< item_type, true, pred_t > sorter_t
Type of the merge sort implementation used.
Definition: sort.h:85
A pipe_middle class pushes input down the pipeline.
Definition: pipe_base.h:119
virtual void set_available_memory(memory_size_type availableMemory) override
Called by the memory manager to set the amount of memory assigned to this node.
Definition: sort.h:124
bits::sort_pull_output_t< item_type, pred_t > output_t
Type of pipe sorter output.
Definition: sort.h:490
sort_output_base< item_type, pred_t > p_t
Base class.
Definition: sort.h:141
Temporary file names.
virtual void propagate() override
Propagate stream metadata.
Definition: sort.h:225
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
Definition: node.h:566
merge_sorter< item_type, true, pred_t > sorter_t
Type of the merge sort implementation used.
Definition: sort.h:272
merge_sorter< item_type, true, pred_t > sorter_t
Type of the merge sort implementation used.
Definition: sort.h:53
virtual void propagate() override
Propagate stream metadata.
Definition: sort.h:286
sorter_t::ptr sorterptr
Smart pointer to sorter_t.
Definition: sort.h:193
void forward(std::string key, T value, bool explicitForward=true)
Called by implementers to forward auxiliary data to successors.
Definition: node.h:461
sorter_t::ptr sorterptr
Smart pointer to sorter_t.
Definition: sort.h:274
bool can_fetch(std::string key)
Find out if there is a piece of auxiliary data forwarded with a given name.
Definition: node.h:513
Factory for the passive sorter input node.
Definition: sort.h:423
virtual void end() override
End pipeline processing phase.
Definition: sort.h:296
logstream & log_warning()
Return logstream for writing warning log messages.
Definition: tpie_log.h:114
Simple class acting both as a tpie::file and a tpie::file::stream.
Pipe sorter input node.
Definition: sort.h:44
virtual bool can_evacuate() override
Overridden by nodes that have data to evacuate.
Definition: sort.h:301
T item_type
Type of items sorted.
Definition: sort.h:270