Some algorithms can be expressed in terms of stream sweeps. For instance, the Graham sweep algorithm for computing the convex hull of a set of points will sweep through the input points from left to right, maintaining the upper convex hull and the lower convex hull in a stack of points. Thus, the algorithm consists of four components: reading the input, sorting by x-coordinate, computing the upper and lower hull, and reporting the convex polygon.
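To make the sweep concrete, here is a minimal in-memory sketch of the hull computation, written as Andrew's monotone chain variant of the Graham sweep. It is standalone C++ and does not use TPIE; the names point, cross and convex_hull are chosen for this example only.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

struct point {
    long long x, y;
    bool operator<(const point & o) const {
        return x < o.x || (x == o.x && y < o.y);
    }
    bool operator==(const point & o) const { return x == o.x && y == o.y; }
};

// Cross product of (a - o) and (b - o); positive for a counter-clockwise turn.
inline long long cross(const point & o, const point & a, const point & b) {
    return (a.x - o.x) * (b.y - o.y) - (a.y - o.y) * (b.x - o.x);
}

// Returns the hull in counter-clockwise order, starting at the leftmost point.
// Collinear boundary points are dropped.
inline std::vector<point> convex_hull(std::vector<point> pts) {
    // Component 2: sort by x-coordinate (ties broken by y) and drop duplicates.
    std::sort(pts.begin(), pts.end());
    pts.erase(std::unique(pts.begin(), pts.end()), pts.end());
    if (pts.size() < 3) return pts;

    std::vector<point> hull; // used as a stack
    // Component 3a: the left-to-right sweep builds the lower hull.
    for (const point & p : pts) {
        while (hull.size() >= 2 &&
               cross(hull[hull.size() - 2], hull.back(), p) <= 0)
            hull.pop_back();
        hull.push_back(p);
    }
    // Component 3b: the right-to-left sweep stacks the upper hull on top.
    const std::size_t lower = hull.size() + 1;
    for (std::size_t i = pts.size() - 1; i-- > 0; ) {
        while (hull.size() >= lower &&
               cross(hull[hull.size() - 2], hull.back(), pts[i]) <= 0)
            hull.pop_back();
        hull.push_back(pts[i]);
    }
    hull.pop_back(); // the last point repeats the first
    return hull;
}
```

In a pipelined version, the sort and the two sweeps would be separate nodes; here they are fused into one function only to keep the example short.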
The pipelining framework is used to implement such so-called streaming algorithms that process streams of items in this manner. The programmer implements the specialized components needed for the algorithm (computing the upper and lower hull, for instance), and stitches them together with built-ins such as reading, sorting and writing.
In this way, we may test each component individually and reuse them in multiple contexts. Without the TPIE pipelining framework, streaming algorithms are often implemented as monolithic classes with multiple interdependent methods that do not facilitate individual development and testing. Using virtual polymorphism may enable unit testing of components, but the virtual method speed penalty paid per item per operation is too high a price to pay in our case.
What we want instead is a kind of compile-time polymorphism: Implementations of operations that use C++ generic programming to let the programmer mix and match at code time, but have the methods inlined and fused together at compile time. Without an underlying framework, this kind of architecture will lead to a lot of typedefs that are overly verbose and somewhat unmaintainable.
The pipelining framework provides compile-time polymorphism along with high maintainability, high testability and low verbosity.
The unit of currency in pipelining is the node implementation. Each pipelining node performs either a general-purpose or a specialized operation that is used in streaming algorithms. An implementation must derive from tpie::pipelining::node and implement either push() or the pull() / can_pull() pair. In the first case, the node is called a push node, and in the second case, it is called a pull node. The exception to this rule is the initiator node, which instead overrides the virtual go() method. Furthermore, any implementation may request memory using the set_minimum_memory() and set_memory_fraction() methods in either its constructor or in an overridden virtual prepare() method; it may override begin() and end() to do initialization and finalization related to its computation.
In addition, a node implementation typically contains the following:

- A typedef item_type, the type of item accepted by push or returned by pull. This is used by built-in type generic nodes such as the sorter or the reverser.
- Calls to node::add_push_destination and node::add_pull_source.
- A call to node::set_name.

Thus, a node implementation of a push node may look as follows:
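As a rough, self-contained illustration of that shape (not actual TPIE code): the node base class below is a minimal stand-in written for this sketch, and only the names set_name, add_push_destination, item_type and push mirror what the text above describes. The 3n+1 computation and the collector_type sink are invented for the example.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Minimal stand-in for the framework's base class (NOT tpie::pipelining::node).
class node {
public:
    const std::string & name() const { return m_name; }
    std::size_t destinations() const { return m_destinations; }
protected:
    void set_name(const std::string & n) { m_name = n; }
    void add_push_destination(const node &) { ++m_destinations; }
private:
    std::string m_name;
    std::size_t m_destinations = 0;
};

// Push node: halves even items, maps odd items to 3n+1, and pushes the result on.
// The destination type is a template parameter, so dest.push() can be inlined.
template <typename dest_t>
class hello_world_type : public node {
public:
    typedef std::size_t item_type;

    explicit hello_world_type(dest_t dest) : dest(std::move(dest)) {
        add_push_destination(this->dest);
        set_name("Hello world");
    }

    void push(const item_type & item) {
        if (item % 2 == 0) dest.push(item / 2);
        else dest.push(3 * item + 1);
    }

private:
    dest_t dest;
};

// Hypothetical terminating node that collects everything pushed to it.
class collector_type : public node {
public:
    typedef std::size_t item_type;

    explicit collector_type(std::vector<item_type> & out) : out(out) {
        set_name("Collector");
    }

    void push(const item_type & item) { out.push_back(item); }

private:
    std::vector<item_type> & out;
};
```

Constructing hello_world_type<collector_type> by hand requires spelling out the destination type, which is the verbosity that factories are meant to remove.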
Since the C++ language does not infer template arguments to constructor calls (class template argument deduction was only added in C++17), but does infer template arguments to functions and methods, we use factories to instantiate the node implementations. Usually, the built-in factories contained in factory_helpers.h will suffice, but in some cases it is helpful to implement one's own factory.
We could implement a hello_world_factory as follows:
For a terminating node, which doesn't have a destination, we would implement a so-called termfactory as follows:
The main differences between an ordinary factory and a termfactory:

- Instead of construct() accepting the destination as its first parameter, the construct() method takes no parameters, and
- constructed<dest_t>::type is replaced by the simpler constructed_type typedef.

To use the above defined factories, we might write the following:
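The sketch below is a standalone approximation of both kinds of factory and of their use, not the TPIE listings themselves. The names construct(), constructed<dest_t>::type and constructed_type follow the text above. Everything else is an assumption made for illustration: the condensed hello_world_type, the collector_type sink, collector_termfactory and run_example. The real factories may also take part in framework bookkeeping that is omitted here.

```cpp
#include <cassert>
#include <utility>
#include <vector>

// Condensed stand-in push node (3n+1 step), kept free of any base class.
template <typename dest_t>
class hello_world_type {
public:
    typedef int item_type;
    explicit hello_world_type(dest_t dest) : dest(std::move(dest)) {}
    void push(const item_type & item) {
        if (item % 2 == 0) dest.push(item / 2);
        else dest.push(3 * item + 1);
    }
private:
    dest_t dest;
};

// Stand-in terminating node: appends every item to a vector.
class collector_type {
public:
    typedef int item_type;
    explicit collector_type(std::vector<int> & out) : out(&out) {}
    void push(const item_type & item) { out->push_back(item); }
private:
    std::vector<int> * out;
};

// Ordinary factory: construct() is templated on the destination type,
// so dest_t is deduced at the call site instead of being written out.
class hello_world_factory {
public:
    template <typename dest_t>
    struct constructed { typedef hello_world_type<dest_t> type; };

    template <typename dest_t>
    typename constructed<dest_t>::type construct(dest_t dest) const {
        return hello_world_type<dest_t>(std::move(dest));
    }
};

// Termfactory: there is no destination, so construct() takes no parameters
// and the result type is a plain typedef.
class collector_termfactory {
public:
    typedef collector_type constructed_type;
    explicit collector_termfactory(std::vector<int> & out) : out(&out) {}
    constructed_type construct() const { return collector_type(*out); }
private:
    std::vector<int> * out;
};

// Usage: the chain is built back to front, starting from the terminating node.
inline std::vector<int> run_example() {
    std::vector<int> out;
    collector_termfactory sink_factory(out);
    hello_world_factory hw_factory;
    auto hw = hw_factory.construct(sink_factory.construct());
    hw.push(10); // even: pushes 5
    hw.push(5);  // odd: pushes 16
    return out;
}
```

Even in this two-node case the nesting of construct() calls reads inside out, which hints at why longer chains become tedious to write by hand.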
However, this is tedious, and so the pipelining framework provides several helper classes to ease the construction of pipelines, namely the descendants of pipe_base called pipe_begin, pipe_middle and pipe_end, which we would use as follows:
The three terms that are piped together have types pipe_begin, pipe_middle and pipe_end respectively. As one might expect, piping together a pipe_begin and a pipe_middle yields a new pipe_begin, and piping together a pipe_begin and a pipe_end yields a pipeline object (actually a pipeline_impl object).
Consider the following implementation of a reverser:
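A standalone stand-in for such a naive reverser might look like the sketch below. It is not the TPIE listing: naive_reverser_type and collector_type are names made up for this example, and no framework base class is involved. It buffers every pushed item in memory and only pushes to its destination in end().

```cpp
#include <cassert>
#include <utility>
#include <vector>

// Naive reverser: lives in the same phase as its neighbours and
// holds the whole stream in memory until end() is called.
template <typename dest_t>
class naive_reverser_type {
public:
    typedef int item_type;

    explicit naive_reverser_type(dest_t dest) : dest(std::move(dest)) {}

    // Nothing can be forwarded yet, so the item is only stored.
    void push(const item_type & item) { items.push_back(item); }

    // Once the input is exhausted, pop the stack into the destination.
    void end() {
        while (!items.empty()) {
            dest.push(items.back());
            items.pop_back();
        }
    }

private:
    dest_t dest;
    std::vector<item_type> items; // used as a stack
};

// Hypothetical sink used to observe the output.
struct collector_type {
    std::vector<int> * out;
    void push(int x) { out->push_back(x); }
};
```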
This implementation seems innocuous at first, but it is in fact very wasteful. Note that the reverser needs to know the entire stream before it can push anything to its destination. This means that when all items have been pushed to it, we could as well deallocate all the item buffers that earlier nodes may have used while processing. As well, we could have waited until the stack was ready to push before initializing later nodes in the pipeline.
This is what pipelining phases are for. Phases are collections of nodes that do not have to operate synchronously. Nodes may establish an ordering of pipelining phases by adding dependencies to nodes in other phases.
Common buffering operations that give rise to new phases are sorting and reversing, and these are already implemented in the pipelining framework.
For an idea of how to properly implement a buffering node such as a reverser using node::add_dependency, see tpie/pipelining/reverser.h.
So far, all pipelining code we have seen has been heavily templated, and in practice, debugging and compiler errors will not be easy on the eyes. Also, with the current setup we have seen, it is not easy (if at all possible) to distribute node implementations across compiled objects.
However, the pipelining framework supports virtual chunks which operate on the same level as, but are orthogonal to, pipeline phases as discussed in the previous section.
Whereas phases are computed at runtime and define the runtime order in which the node implementations have begin, go and end called, virtual chunks exist at compile time and are fused together at runtime.
Let us look at an example of how to use virtual chunks. The following is an example of an HTML handler with optional parsing and weeding. If weeding is requested (noText or noDynamic is set to true), the input HTML is parsed. Otherwise, it is fed directly to the output without parsing. The items passed around are html_tokens (representing a context-free HTML token; text, start node, end node, attribute, etc.) and tag_paths (representing a context-sensitive HTML leaf node; a token as well as the path from the root to the token).
Usually, supporting virtual chunks requires no additional work on the node end, as long as the node is templated to accept any node as destination.
In addition to constructing virtual chunks inline from pipe_bases, virtual chunks may be returned from a function in an implementation object out into a using object. This way, the using object does not have to define the node implementations - all it has to know is the type of object passed between the virtual chunks.
If the above options were implemented using compile-time switching on template parameters, the emitted code size would be eight times as large, corresponding to the eight different combinations of choices for noText, noDynamic and url.empty().
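The trade-off behind that factor of eight can be illustrated without TPIE. The sketch below is only an analogy for virtual chunks, not their API: it uses std::function as the type-erased boundary between stages, and the stage names and the build() helper are hypothetical. Three independent runtime options select which stages are chained. Each stage is compiled once, where compile-time switching would instantiate a separate chain for each of the 2 × 2 × 2 = 8 combinations.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Type-erased destination: each call through it is an indirect call,
// which is the per-boundary cost that a virtual chunk also pays.
typedef std::function<void(int)> sink;

// Each stage wraps a destination and returns a new type-erased destination.
inline sink drop_odd(sink dest) {
    return [dest](int x) { if (x % 2 == 0) dest(x); };
}

inline sink double_items(sink dest) {
    return [dest](int x) { dest(2 * x); };
}

inline sink add_one(sink dest) {
    return [dest](int x) { dest(x + 1); };
}

// Assemble the chain at runtime, back to front, so that items flow through
// drop_odd, then double_items, then add_one, then out.
inline sink build(bool dropOdd, bool doubleItems, bool addOne, sink out) {
    sink s = std::move(out);
    if (addOne) s = add_one(std::move(s));
    if (doubleItems) s = double_items(std::move(s));
    if (dropOdd) s = drop_odd(std::move(s));
    return s;
}
```

The indirect call is paid once per item per boundary, so this style suits coarse boundaries between groups of nodes rather than every node-to-node link.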
0. The following virtual methods may be overridden in the implementation:
   - prepare
   - begin
   - end
   - evacuate (in which case can_evacuate should be overridden to return true)
   - set_available_memory
1. The node is constructed, and prepare() is called. The constructor should call any of the following node protected methods as appropriate:
   - set_name
   - add_push_destination
   - add_pull_source
   - add_dependency

   Furthermore, the constructor and/or prepare() may call the following:
   - set_memory_fraction
   - set_minimum_memory
   - fetch
   - forward
2. Virtual method set_available_memory is called. If overridden, the overriding implementation must call node::set_available_memory with the same argument.
3. begin() is called on all nodes in the current phase in topological order of the item flow graph, beginning with the initiator node. At this point, the implementation may fetch<>() auxiliary data from its predecessors and forward() auxiliary data to its successors. The default implementation of begin() is empty, so it is not necessary to perform a super call. If the node does not push as many items as are pushed into it, it should check using can_fetch() whether "items" has been forwarded. "items" is a stream_size_type, and the node should forward the expected number of items being pushed to the destination. If the implementation wants to report progress information, it should call set_steps() in begin.
4. push() or pull() is called any number of times.
5. end() is called on all nodes in the current phase in topological order of the actor graph, such that it is permitted to call push and pull in end(). The default implementation of end() does nothing, so there is no reason to perform a super call.
6. If can_evacuate() is overridden to return true, and evacuation is necessary, evacuate() is called, in which case the node should deallocate all internal buffers shared between this node and a depending node.

Each row in the following matrix has a method called by the framework on the left, and an X in the row for each method an implementation may call.
Framework calls | set_name | add_*_destination add_dependency | set_memory_fraction set_minimum_memory | forward | can_fetch | fetch | push | can_pull | pull |
---|---|---|---|---|---|---|---|---|---|
constructor | X | X | X | ||||||
prepare | X | X | X | X | |||||
set_available_memory | |||||||||
begin | X | X | |||||||
push | X | X | X | ||||||
can_pull | X | ||||||||
pull | X | X | X | ||||||
end | X | X | X |
Note that the push, can_pull and pull contracts are those obeyed by the pipelining node implementations in the library; the core framework itself does not enforce these requirements.
You will rarely need to implement initiators. For an initiator, instead of push(), the virtual go() method must be overridden, and this is called once. go() should forward() a piece of stream_size_type data named "items" indicating the expected number of items being pushed.
Common faults in node implementations include

- Missing calls to add_push_destination/add_pull_source/add_dependency. If pipeline::plot does not yield a graph containing all the nodes you expect, this is probably the problem.
- Non-public inheritance from the node base class. This usually leads to an error that tpie::pipelining::bits::node_map::ptr tpie::pipelining::node::get_node_map() const is inaccessible.

Each node has three properties that control how much memory is assigned to it: its minimum memory, its maximum memory and its memory fraction. The framework guarantees that the amount of memory assigned to the node is between the minimum and the maximum of the node.
If there is not enough available memory in total to satisfy all minimum memory requirements, a warning is printed to the log and memory overusage can be expected.
The principle behind the memory fraction is this. In the absence of minimum and maximum memory requirements, memory assigned to two nodes is distributed proportionally to their memory fractions, so a node with fraction 2.0 is assigned twice the memory of a node with fraction 1.0. Given minimum and maximum memory requirements, the framework first tries to distribute memory proportionally to all memory fractions. If this does not violate min/max constraints, this memory assignment is used. Otherwise, the framework performs a binary search to find a memory assignment that honors min/max constraints first and proportionality in memory fractions second.
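One plausible reading of that description can be sketched as follows; the actual TPIE implementation may differ. The sketch searches for a scale factor c such that each node receives c times its fraction, clamped to its own minimum and maximum, and the total fits in the available memory. The names node_memory, clamp_assign, total_assigned and assign_memory are invented for this example. Memory is modelled as double for simplicity, and every node is assumed to have a positive fraction and a finite maximum.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

struct node_memory {
    double min_bytes;
    double max_bytes;
    double fraction;
};

// Memory a node receives for scale factor c: c * fraction, clamped to [min, max].
inline double clamp_assign(const node_memory & n, double c) {
    return std::min(n.max_bytes, std::max(n.min_bytes, c * n.fraction));
}

inline double total_assigned(const std::vector<node_memory> & nodes, double c) {
    double sum = 0;
    for (const node_memory & n : nodes) sum += clamp_assign(n, c);
    return sum;
}

// Finds (approximately) the largest c whose clamped total fits in `available`.
// If even the minimums do not fit, every node gets its minimum and the
// total overshoots, matching the "memory overusage" case described above.
inline std::vector<double> assign_memory(const std::vector<node_memory> & nodes,
                                         double available) {
    double lo = 0, hi = 0;
    for (const node_memory & n : nodes) {
        assert(n.fraction > 0 && n.min_bytes <= n.max_bytes);
        // At this scale factor, every node has reached its maximum.
        hi = std::max(hi, n.max_bytes / n.fraction);
    }
    if (total_assigned(nodes, hi) <= available) {
        lo = hi; // all maximums fit
    } else {
        // total_assigned is non-decreasing in c, so bisection applies.
        for (int i = 0; i < 100; ++i) {
            double mid = (lo + hi) / 2;
            if (total_assigned(nodes, mid) <= available) lo = mid;
            else hi = mid;
        }
    }
    std::vector<double> result;
    for (const node_memory & n : nodes) result.push_back(clamp_assign(n, lo));
    return result;
}
```

With no binding constraints the clamping never triggers and the result is purely proportional; once a minimum or maximum binds, the remaining nodes share what is left in proportion to their fractions.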
The pipelining framework provides transparent parallel execution of pipelines. For CPU-intensive computations in which the result for one item does not depend on the previous item, such as point projection, one simply wraps the part of the pipeline to be parallelized in a call to parallel() as such:
The three extra parameters, maintainOrder (default arbitrary_order), numJobs (default tpie::default_worker_count) and bufSize (default 64), are optional. If maintainOrder is set to maintain_order, the framework will make sure that the output is produced in the same order as the input, which may incur a performance penalty in some cases when the execution time varies per item. numJobs declares the number of worker threads to utilize. It defaults to the same number of worker threads as used by e.g. parallel internal sorting. bufSize is the number of items that are sent to a thread at a time. There is an overhead associated with each buffer sent (a couple of virtual calls and a thread switch), so you should not set this too low. On the other hand, a larger buffer increases the memory overhead.
The pipelining framework comes with a library of node implementations.
To get simple buffering to disk of an item stream, a delayed_buffer() will accept pushed items, store them in a temporary file, and push them to its destination in another phase. This is necessary, for instance, when the item stream is being sorted (which is a buffering operation in itself), but the same item stream is later needed in its original order.
For a buffer that accepts items pushed to it and can be pulled from in another phase, define a local passive_buffer, and get its input and output parts with passive_buffer::input() and passive_buffer::output(), respectively.
Like the buffer, the reverser exists in an active form (push input, push output) and a passive form (push input, pull output). For the passive reverser, define a passive_reverser and use passive_reverser::sink and passive_reverser::source. For the active reverser, simply use reverser().
Like the buffer and the reverser, there is an active sorter, pipesort(), and a passive sorter with passive_sorter::input() and passive_sorter::output(). Both accept an optional less-than predicate that defaults to std::less.
To read an entire file_stream and push its contents, define a file_stream variable, for instance file_stream<size_t> foo; and use it in your pipeline as input(foo). For a pull pipe, use pull_input(foo). Similarly, for outputting to a file_stream, there are the output(foo) and pull_output(foo) nodes. To write the item stream to a file_stream and push it on to another destination, use tee(foo).
For reading and writing 32-bit ints using scanf (stdin) and printf (stdout), the pipelining framework provides scanf_ints() and printf_ints().