24 #ifndef __TPIE_PIPELINING_BUFFER_H__
25 #define __TPIE_PIPELINING_BUFFER_H__
27 #include <tpie/pipelining/node.h>
28 #include <tpie/pipelining/factory_helpers.h>
33 namespace pipelining {
48 set_name(
"Storing items", PRIORITY_SIGNIFICANT);
60 void push(
const item_type & item) {
79 set_name(
"Fetching items", PRIORITY_SIGNIFICANT);
88 bool can_pull()
const {
96 virtual void end()
override {
104 template <
typename T>
112 set_name(
"Storing items", PRIORITY_INSIGNIFICANT);
117 m_queue = tpie::tpie_new<tpie::file_stream<item_type> >();
122 void push(
const T & item) {
123 m_queue->write(item);
133 template <
typename dest_t>
136 typedef typename dest_t::item_type item_type;
144 set_name(
"Fetching items", PRIORITY_INSIGNIFICANT);
148 m_queue = fetch<tpie::file_stream<item_type> *>(
"queue");
153 virtual void go()
override {
156 dest.push(m_queue->
read());
161 virtual void end()
override {
178 template <
typename T>
194 return input_t(queue, input_token);
198 return output_t(queue, input_token);
217 template <
typename dest_t>
220 typedef typename dest_t::item_type item_type;
227 , output(dest, input_token)
230 set_name(
"Delayed buffer", PRIORITY_INSIGNIFICANT);
235 , input_token(o.input_token)
241 virtual void push(item_type item) {
259 #endif // __TPIE_PIPELINING_BUFFER_H__
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
virtual void begin()
Begin pipeline processing phase.
void open(const std::string &path, access_type accessType=access_read_write, memory_size_type userDataSize=0, cache_hint cacheHint=access_sequential)
Open a file.
Output node for delayed buffer.
const item_type & read()
Read an item from the stream.
void write(const item_type &item)
Write an item to the stream.
virtual void end() override
End pipeline processing phase.
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
virtual void propagate() override
Propagate stream metadata.
Node factory for 2-argument terminator.
void seek(stream_offset_type offset, offset_type whence=beginning)
Moves the logical offset in the stream.
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
void add_dependency(const node_token &dest)
Called by implementers to declare a node dependency, that is, a requirement that another node has end...
void step(stream_size_type steps=1)
Step the progress indicator.
static memory_size_type memory_usage(float blockFactor=1.0, bool includeDefaultFileAccessor=true)
Calculate the amount of memory used by a single file_stream.
Basic Implementation of I/O Efficient FIFO queue.
Simple class acting both as file and a file::stream.
bool can_read() const
Check if we can read an item with read().
A pipe_middle class pushes input down the pipeline.
void tpie_delete(T *p)
Delete an object allocated with tpie_new.
Plain old file_stream buffer.
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
node()
Default constructor, using a new node_token.
void forward(std::string key, T value, bool explicitForward=true)
Called by implementers to forward auxiliary data to successors.
stream_size_type size() const
Get the size of the file measured in items.
virtual void propagate() override
Propagate stream metadata.
void close()
Close the file and release resources.
Simple class acting both as a tpie::file and a tpie::file::stream.
virtual void end() override
End pipeline processing phase.
Node factory for 0-argument generator.