TPIE

v1.1rc1-6-g0c97303
join.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2013, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef TPIE_PIPELINING_JOIN_H
21 #define TPIE_PIPELINING_JOIN_H
22 
23 #include <tpie/pipelining/node.h>
24 #include <tpie/pipelining/factory_helpers.h>
25 
26 namespace tpie {
27 namespace pipelining {
28 
37 template <typename T>
38 class join {
39 public:
40  class source_base : public node {
41  public:
42  source_base(node_token token)
43  : node(token)
44  {
45  }
46 
47  virtual void push(const T & v) = 0;
48 
49  protected:
50  ~source_base() {}
51  };
52 
53  template <typename dest_t>
54  class source_impl : public source_base {
55  public:
56  source_impl(const dest_t & dest, node_token token, source_base * & the_source)
57  : source_base(token)
58  , the_source(the_source)
59  , dest(dest)
60  {
61  this->set_name("Join source", PRIORITY_INSIGNIFICANT);
62  this->add_push_destination(dest);
63  }
64 
65  virtual void prepare() override {
66  if (the_source != NULL && the_source != this) {
67  // If join.source() is used twice, the second construction of node()
68  // should fail since the node_token is already used.
69  // Thus, this exception should never be thrown.
70  throw exception("Attempted to set join source a second time");
71  }
72  the_source = this;
73  };
74 
75  virtual void push(const T & v) override {
76  dest.push(v);
77  }
78 
79  private:
80  source_base * & the_source;
81  dest_t dest;
82  };
83 
84  pipe_begin<factory_2<source_impl, node_token, source_base * &> > source() {
85  return factory_2<source_impl, node_token, source_base * &>(source_token, the_source);
86  }
87 
88  class sink_impl : public node {
89  public:
90  typedef T item_type;
91 
92  sink_impl(node_token source_token, source_base * & the_source)
93  : the_source(the_source)
94  {
95  set_name("Join sink", PRIORITY_INSIGNIFICANT);
96  add_push_destination(source_token);
97  }
98 
99  virtual void begin() override {
100  the_source_cache = the_source;
101  }
102 
103  void push(const T & v) {
104  the_source_cache->push(v);
105  }
106 
107  private:
108  source_base * the_source_cache;
109  source_base * & the_source;
110  };
111 
112  pipe_end<termfactory_2<sink_impl, node_token, source_base * &> > sink() {
113  return termfactory_2<sink_impl, node_token, source_base * &>(source_token, the_source);
114  }
115 
116  join() : the_source(NULL) {}
117 private:
118  source_base * the_source;
119  node_token source_token;
120 };
121 
122 } // namespace pipelining
123 } // namespace tpie
124 
125 #endif // TPIE_PIPELINING_JOIN_H
Base class of all nodes.
Definition: node.h:58
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
Definition: node.h:352
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
Definition: node.h:241
virtual void begin() override
Begin pipeline processing phase.
Definition: join.h:99
Joins multiple push streams into one.
Definition: join.h:38
virtual void prepare() override
Called before memory assignment but after depending phases have executed and ended.
Definition: join.h:65
node()
Default constructor, using a new node_token.
Definition: node.h:296