TPIE

v1.1rc1-6-g0c97303
buffer.h
Go to the documentation of this file.
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
23 
24 #ifndef __TPIE_PIPELINING_BUFFER_H__
25 #define __TPIE_PIPELINING_BUFFER_H__
26 
27 #include <tpie/pipelining/node.h>
28 #include <tpie/pipelining/factory_helpers.h>
29 #include <tpie/file_stream.h>
30 
31 namespace tpie {
32 
33 namespace pipelining {
34 
35 namespace bits {
36 
// Input node for buffer: a push node that writes every item it receives into
// a file_stream<T> that it holds by reference.
// NOTE(review): this listing skips numbers 44 and 49, so the constructor's
// signature line and one statement of its body are not visible here.
40 template <typename T>
41 class buffer_input_t: public node {
42 public:
43  typedef T item_type;
// Constructor (signature line not shown): initializes the node base from
// `token`, binds the `queue` reference and names the node "Storing items".
45  : node(token)
46  , queue(queue)
47  {
48  set_name("Storing items", PRIORITY_SIGNIFICANT);
50  }
51 
// Begin pipeline processing phase: runs the base-class begin() first, then
// calls queue.open() with no arguments.
52  virtual void begin() override {
53  node::begin();
54  queue.open();
55  }
56 
// Push the next item to this node; the item is written to the stream.
60  void push(const item_type & item) {
61  queue.write(item);
62  }
63 
64 private:
// Reference to a stream object that is not owned by this node.
65  file_stream<T> & queue;
66 };
67 
// Pull node that reads items back from a file_stream<T> referenced as `queue`.
// NOTE(review): this listing skips numbers 70 and 80; the declaration of the
// `queue` member and one constructor statement are not visible here.
68 template <typename T>
69 class buffer_pull_output_t: public node {
71 
72 public:
73  typedef T item_type;
74 
// Binds the stream reference, declares a dependency on the node identified by
// `input_token`, and names this node "Fetching items".
75  buffer_pull_output_t(file_stream<T> & queue, const node_token & input_token)
76  : queue(queue)
77  {
78  add_dependency(input_token);
79  set_name("Fetching items", PRIORITY_SIGNIFICANT);
81  }
82 
// Propagate stream metadata: seeks the stream to offset 0 and forwards its
// size (in items) to successors under the key "items".
83  virtual void propagate() override {
84  queue.seek(0);
85  forward("items", queue.size());
86  }
87 
// True while the stream has another item available to read.
88  bool can_pull() const {
89  return queue.can_read();
90  }
91 
// Reads the next item from the stream and returns it by value.
92  T pull() {
93  return queue.read();
94  }
95 
// End pipeline processing phase: closes the stream.
96  virtual void end() override {
97  queue.close();
98  }
99 };
100 
// Input node for delayed buffer: creates its own file_stream during
// propagate() and writes every pushed item into it.
// NOTE(review): this listing skips numbers 105 and 113, so the class header
// line and one constructor statement are not visible here.
104 template <typename T>
106 public:
107  typedef T item_type;
108 
// Initializes the node base from `token` and names the node "Storing items".
// m_queue is not set here; it is assigned in propagate().
109  delayed_buffer_input_t(const node_token & token)
110  : node(token)
111  {
112  set_name("Storing items", PRIORITY_INSIGNIFICANT);
114  }
115 
// Propagate stream metadata: allocates the stream with tpie_new, opens it with
// no arguments, and forwards the pointer to successors under the key "queue".
// This class does not delete the stream in the code shown.
116  virtual void propagate() override {
117  m_queue = tpie::tpie_new<tpie::file_stream<item_type> >();
118  m_queue->open();
119  forward("queue", m_queue);
120  }
121 
// Writes the pushed item to the stream created in propagate().
122  void push(const T & item) {
123  m_queue->write(item);
124  }
125 
126 private:
// Stream allocated in propagate(); the pointer is forwarded as "queue".
127  tpie::file_stream<T> * m_queue;
128 };
129 
// Output node for delayed buffer: reads back the stream whose pointer was
// forwarded under the key "queue" and pushes every item into `dest`.
// NOTE(review): this listing skips numbers 134 and 143, so the class header
// line and one constructor statement are not visible here.
133 template <typename dest_t>
135 public:
136  typedef typename dest_t::item_type item_type;
137 
// Stores a copy of `dest`, declares a dependency on the node identified by
// `input_token`, registers `dest` as a push destination, and names this node
// "Fetching items".
138  delayed_buffer_output_t(const dest_t &dest, const node_token & input_token)
139  : dest(dest)
140  {
141  add_dependency(input_token);
142  add_push_destination(dest);
144  set_name("Fetching items", PRIORITY_INSIGNIFICANT);
145  }
146 
// Propagate stream metadata: fetches the stream pointer stored under "queue",
// forwards its size as "items", and uses the same size as the step count.
147  virtual void propagate() override {
148  m_queue = fetch<tpie::file_stream<item_type> *>("queue");
149  forward("items", m_queue->size());
150  set_steps(m_queue->size());
151  }
152 
// Executes this phase: seeks to offset 0, then pushes each readable item to
// `dest`, stepping the progress indicator once per item.
153  virtual void go() override {
154  m_queue->seek(0);
155  while (m_queue->can_read()) {
156  dest.push(m_queue->read());
157  step();
158  }
159  }
160 
// End pipeline processing phase: releases the stream with tpie_delete. The
// pointer itself is left unchanged afterwards.
161  virtual void end() override {
162  tpie::tpie_delete(m_queue);
163  }
164 
165 private:
// Copy of the destination node that receives the buffered items.
166  dest_t dest;
// Stream pointer obtained in propagate() and deleted in end().
167  file_stream<item_type> * m_queue;
168 };
169 
170 
171 
172 } // namespace bits
173 
// Plain old file_stream buffer: holds a file_stream<T> and a node_token and
// builds input/output objects from them on request.
// NOTE(review): this listing skips numbers 179, 182-183, 185-188 and 213, so
// the class header line and the declarations of input_t, output_t,
// inputpipe_t, outputpipe_t, inputfact_t and outputfact_t are not visible.
178 template <typename T>
180 public:
181  typedef T item_type;
184 private:
189 
190 public:
// Default constructor; does nothing beyond default-initializing the members.
191  passive_buffer() {}
192 
// Returns an input_t constructed from (queue, input_token).
193  inline input_t raw_input() {
194  return input_t(queue, input_token);
195  }
196 
// Returns an output_t constructed from (queue, input_token).
197  inline output_t raw_output() {
198  return output_t(queue, input_token);
199  }
200 
// Returns an inputpipe_t built from inputfact_t(queue, input_token).
201  inline inputpipe_t input() {
202  return inputfact_t(queue, input_token);
203  }
204 
// Returns an outputpipe_t built from outputfact_t(queue, input_token).
205  inline outputpipe_t output() {
206  return outputfact_t(queue, input_token);
207  }
208 
209 private:
// Token passed to both the input and the output side.
210  node_token input_token;
// Stream passed to every input/output object created above.
211  file_stream<T> queue;
212 
// Copy assignment is declared in the private section; no definition appears
// in this listing.
214  passive_buffer & operator=(const passive_buffer &);
215 };
216 
// Node that combines an `input` member and an `output` member sharing one
// node_token; pushed items are forwarded to `input`.
// NOTE(review): this listing skips numbers 221-222 and 233, so the
// declarations of input_t / output_t and the signature line of the second
// constructor are not visible here.
217 template <typename dest_t>
218 class delayed_buffer_t: public node {
219 public:
220  typedef typename dest_t::item_type item_type;
223 
// Default-constructs input_token, builds `input` from that token and `output`
// from (dest, input_token), registers `input` as a push destination, and
// names this node "Delayed buffer".
224  delayed_buffer_t(const dest_t & dest)
225  : input_token()
226  , input(input_token)
227  , output(dest, input_token)
228  {
229  add_push_destination(input);
230  set_name("Delayed buffer", PRIORITY_INSIGNIFICANT);
231  }
232 
// Second constructor (signature line not shown): copies the node base and
// each member from `o`.
234  : node(o)
235  , input_token(o.input_token)
236  , input(o.input)
237  , output(o.output)
238  {
239  }
240 
// Takes the item by value and forwards it to input.push().
241  virtual void push(item_type item) {
242  input.push(item);
243  }
244 
// Token shared by `input` and `output`.
245  node_token input_token;
246 
247  input_t input;
248  output_t output;
249 };
250 
// Returns a pipe_middle wrapping factory_0<delayed_buffer_t>.
// NOTE(review): this listing skips number 252, so the function body (the
// return statement, presumably) is not visible here.
251 inline pipe_middle<factory_0<delayed_buffer_t> > delayed_buffer() {
253 }
254 
255 } // namespace pipelining
256 
257 } // namespace tpie
258 
259 #endif // __TPIE_PIPELINING_BUFFER_H__
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: buffer.h:153
virtual void begin()
Begin pipeline processing phase.
Definition: node.h:167
virtual void propagate() override
Propagate stream metadata.
Definition: buffer.h:116
void open(const std::string &path, access_type accessType=access_read_write, memory_size_type userDataSize=0, cache_hint cacheHint=access_sequential)
Open a file.
Output node for delayed buffer.
Definition: buffer.h:134
virtual void begin() override
Begin pipeline processing phase.
Definition: buffer.h:52
const item_type & read()
Read an item from the stream.
Definition: file_stream.h:91
void push(const item_type &item)
For push nodes, push the next item to this node.
Definition: buffer.h:60
void write(const item_type &item)
Write an item to the stream.
Definition: file_stream.h:64
virtual void end() override
End pipeline processing phase.
Definition: buffer.h:161
Base class of all nodes.
Definition: node.h:58
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
Definition: node.h:352
virtual void propagate() override
Propagate stream metadata.
Definition: buffer.h:83
Node factory for 2-argument terminator.
void seek(stream_offset_type offset, offset_type whence=beginning)
Moves the logical offset in the stream.
Definition: stream_crtp.h:50
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
Definition: node.h:241
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
Definition: node.h:421
void add_dependency(const node_token &dest)
Called by implementers to declare a node dependency, that is, a requirement that another node has end() called before the begin() of this node.
Definition: node.h:404
void step(stream_size_type steps=1)
Step the progress indicator.
Definition: node.h:586
static memory_size_type memory_usage(float blockFactor=1.0, bool includeDefaultFileAccessor=true)
Calculate the amount of memory used by a single file_stream.
Definition: file_stream.h:134
Basic Implementation of I/O Efficient FIFO queue.
Definition: queue.h:42
Simple class acting both as file and a file::stream.
Definition: file_stream.h:44
bool can_read() const
Check if we can read an item with read().
Definition: stream_crtp.h:108
A pipe_middle class pushes input down the pipeline.
Definition: pipe_base.h:119
void tpie_delete(T *p)
Delete an object allocated with tpie_new.
Definition: memory.h:380
Plain old file_stream buffer.
Definition: buffer.h:179
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
Definition: node.h:566
Input node for delayed buffer.
Definition: buffer.h:105
Input node for buffer.
Definition: buffer.h:41
node()
Default constructor, using a new node_token.
Definition: node.h:296
void forward(std::string key, T value, bool explicitForward=true)
Called by implementers to forward auxiliary data to successors.
Definition: node.h:461
stream_size_type size() const
Get the size of the file measured in items.
Definition: stream_crtp.h:132
virtual void propagate() override
Propagate stream metadata.
Definition: buffer.h:147
void close()
Close the file and release resources.
Simple class acting both as a tpie::file and a tpie::file::stream.
virtual void end() override
End pipeline processing phase.
Definition: buffer.h:96
Node factory for 0-argument generator.