TPIE

v1.1rc1-6-g0c97303
file_stream.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2011, 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef __TPIE_PIPELINING_FILE_STREAM_H__
21 #define __TPIE_PIPELINING_FILE_STREAM_H__
22 
23 #include <tpie/file_stream.h>
24 
25 #include <tpie/pipelining/node.h>
26 #include <tpie/pipelining/factory_helpers.h>
27 
28 namespace tpie {
29 
30 namespace pipelining {
31 
32 namespace bits {
33 
39 template <typename dest_t>
40 class input_t : public node {
41 public:
42  typedef typename dest_t::item_type item_type;
43 
44  inline input_t(const dest_t & dest, file_stream<item_type> & fs) : dest(dest), fs(fs) {
46  set_name("Read", PRIORITY_INSIGNIFICANT);
48  }
49 
50  virtual void propagate() override {
51  if (fs.is_open()) {
52  forward("items", fs.size());
53  } else {
54  forward("items", 0);
55  }
56  set_steps(fs.size());
57  }
58 
59  virtual void go() override {
60  if (fs.is_open()) {
61  while (fs.can_read()) {
62  dest.push(fs.read());
63  step();
64  }
65  }
66  }
67 
68 private:
69  dest_t dest;
71 };
72 
78 template <typename T>
79 class pull_input_t : public node {
80 public:
81  typedef T item_type;
82 
83  inline pull_input_t(file_stream<T> & fs) : fs(fs) {
84  set_name("Read", PRIORITY_INSIGNIFICANT);
86  }
87 
88  virtual void propagate() override {
89  forward("items", fs.size());
90  set_steps(fs.size());
91  }
92 
93  inline T pull() {
94  step();
95  return fs.read();
96  }
97 
98  inline bool can_pull() {
99  return fs.can_read();
100  }
101 
102  file_stream<T> & fs;
103 };
104 
110 template <typename T>
111 class output_t : public node {
112 public:
113  typedef T item_type;
114 
115  inline output_t(file_stream<T> & fs) : fs(fs) {
116  set_name("Write", PRIORITY_INSIGNIFICANT);
118  }
119 
120  inline void push(const T & item) {
121  fs.write(item);
122  }
123 private:
124  file_stream<T> & fs;
125 };
126 
132 template <typename source_t>
133 class pull_output_t : public node {
134 public:
135  typedef typename source_t::item_type item_type;
136 
137  inline pull_output_t(const source_t & source, file_stream<item_type> & fs) : source(source), fs(fs) {
138  add_pull_source(source);
139  set_name("Write", PRIORITY_INSIGNIFICANT);
141  }
142 
143  virtual void go() override {
144  source.begin();
145  while (source.can_pull()) {
146  fs.write(source.pull());
147  }
148  source.end();
149  }
150 
151  source_t source;
153 };
154 
155 template <typename T>
156 class tee_t {
157 public:
158  template <typename dest_t>
159  class type: public node {
160  public:
161  typedef T item_type;
162  type(const dest_t & dest, file_stream<item_type> & fs): fs(fs), dest(dest) {
163  add_push_destination(dest);
165  }
166 
167  void push(const item_type & i) {
168  fs.write(i);
169  dest.push(i);
170  }
171  private:
173  dest_t dest;
174  };
175 };
176 
177 } // namespace bits
178 
179 template<typename T>
182 }
183 
184 template<typename T>
185 inline pullpipe_begin<termfactory_1<bits::pull_input_t<T>, file_stream<T> &> > pull_input(file_stream<T> & fs) {
186  return termfactory_1<bits::pull_input_t<T>, file_stream<T> &>(fs);
187 }
188 
189 template <typename T>
190 inline pipe_end<termfactory_1<bits::output_t<T>, file_stream<T> &> > output(file_stream<T> & fs) {
191  return termfactory_1<bits::output_t<T>, file_stream<T> &>(fs);
192 }
193 
194 template<typename T>
195 inline pullpipe_end<factory_1<bits::pull_output_t, file_stream<T> &> > pull_output(file_stream<T> & fs) {
196  return factory_1<bits::pull_output_t, file_stream<T> &>(fs);
197 }
198 
199 template <typename T>
200 inline pipe_middle<factory_1<bits::tee_t<typename T::item_type>::template type, T &> >
201 tee(T & fs) {return factory_1<bits::tee_t<typename T::item_type>::template type, T &>(fs);}
202 
203 } // namespace pipelining
204 
205 } // namespace tpie
206 #endif
void add_pull_source(const node_token &dest)
Called by implementers to declare a pull source.
Definition: node.h:370
virtual void propagate() override
Propagate stream metadata.
Definition: file_stream.h:88
file_stream pull input generator.
Definition: file_stream.h:79
const item_type & read()
Read an item from the stream.
Definition: file_stream.h:91
void write(const item_type &item)
Write an item to the stream.
Definition: file_stream.h:64
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: file_stream.h:59
Base class of all nodes.
Definition: node.h:58
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
Definition: node.h:352
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
Definition: node.h:241
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
Definition: node.h:421
Node factory for 1-argument generator.
void step(stream_size_type steps=1)
Step the progress indicator.
Definition: node.h:586
static memory_size_type memory_usage(float blockFactor=1.0, bool includeDefaultFileAccessor=true)
Calculate the amount of memory used by a single file_stream.
Definition: file_stream.h:134
file_stream output terminator.
Definition: file_stream.h:111
bool is_open() const
Check if file is open.
bool can_read() const
Check if we can read an item with read().
Definition: stream_crtp.h:108
file_stream input generator.
Definition: file_stream.h:40
file_stream output pull data source.
Definition: file_stream.h:133
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
Definition: node.h:566
void forward(std::string key, T value, bool explicitForward=true)
Called by implementers to forward auxiliary data to successors.
Definition: node.h:461
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: file_stream.h:143
stream_size_type size() const
Get the size of the file measured in items.
Definition: stream_crtp.h:132
Simple class acting both as a tpie::file and a tpie::file::stream.
virtual void propagate() override
Propagate stream metadata.
Definition: file_stream.h:50