TPIE

v1.1rc1-6-g0c97303
Pipelining

Pipelining concepts

Some algorithms can be expressed in terms of stream sweeps. For instance, the Graham sweep algorithm for computing the convex hull of a set of points will sweep through the input points from left to right, maintaining the upper convex hull and the lower convex hull in a stack of points. Thus, the algorithm consists of four components: reading the input, sorting by x-coordinate, computing the upper and lower hull, and reporting the convex polygon.

The pipelining framework is used to implement such so-called streaming algorithms that process streams of items in this manner. The programmer implements the specialized components needed for the algorithm (computing the upper and lower hull, for instance), and stitches them together with built-ins such as reading, sorting and writing.

In this way, we may test each component individually and reuse them in multiple contexts. Without the TPIE pipelining framework, streaming algorithms are often implemented as monolithic classes with multiple interdependent methods that do not facilitate individual development and testing. Using virtual polymorphism may enable unit testing of components, but the virtual method speed penalty paid per item per operation is too high a price to pay in our case.

What we want instead is a kind of compile-time polymorphism: Implementations of operations that use C++ generic programming to let the programmer mix and match at code time, but have the methods inlined and fused together at compile time. Without an underlying framework, this kind of architecture will lead to a lot of typedefs that are overly verbose and somewhat unmaintainable.

The pipelining framework provides compile-time polymorphism along with high maintainability, high testability and low verbosity.
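As a rough illustration of the compile-time polymorphism described above, the following standalone sketch does not use TPIE at all; collector, doubler, make_doubler and run_doubler_demo are hypothetical names chosen for this example. Each stage is templated on the type of its destination, so no virtual calls are involved and the compiler can see (and inline) the whole chain of push() calls:

```cpp
#include <cassert>
#include <vector>

// Terminal stage: stores every item pushed to it.
struct collector {
    std::vector<int> * out;
    void push(int item) { out->push_back(item); }
};

// Middle stage templated on its destination type. There is no virtual
// dispatch, so the compiler is free to inline dest.push() into push().
template <typename dest_t>
struct doubler {
    dest_t dest;
    explicit doubler(dest_t dest) : dest(dest) {}
    void push(int item) { dest.push(2 * item); }
};

// Helper function so that dest_t is deduced instead of spelled out.
template <typename dest_t>
doubler<dest_t> make_doubler(dest_t dest) {
    return doubler<dest_t>(dest);
}

// Pushes 1, 2, 3 through two chained doublers and returns what arrives
// at the collector.
inline std::vector<int> run_doubler_demo() {
    std::vector<int> result;
    auto chain = make_doubler(make_doubler(collector{&result}));
    for (int i = 1; i <= 3; ++i) chain.push(i);
    return result;
}
```

Without the helper function, the type of chain would have to be written out as doubler<doubler<collector> >, which is the kind of verbosity the framework is meant to hide.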

Nodes

The unit of currency in pipelining is the node implementation. Each pipelining node performs either a general-purpose or a specialized operation that is used in streaming algorithms. An implementation must derive from tpie::pipelining::node and implement either push() or the pull() / can_pull() pair. In the first case, the node is called a push node, and in the second case, it is called a pull node. The exception to this rule is the initiator node, which instead overrides the virtual go() method. Furthermore, any implementation may request memory using the set_minimum_memory() and set_memory_fraction() methods in either its constructor or an overridden virtual prepare() method, and it may override begin() and end() to do initialization and finalization related to its computation.

  • A node SHOULD aggregate the node(s) it pushes to or pulls from, and thus a node is usually templated on its destination type.
  • A node SHOULD be copyable such that any other copyable node may aggregate it.
  • A push/pull node MUST declare a public typedef item_type, the type of item accepted by push or returned by pull. This is used by built-in type generic nodes such as the sorter or the reverser.
  • At runtime, it MUST declare the node(s) it pushes to or pulls from by calling the methods node::add_push_destination and node::add_pull_source.
  • At runtime, it SHOULD declare a name by calling the method node::set_name.

Thus, a node implementation of a push node may look as follows:

template <typename dest_t>
class hello_world_type : public tpie::pipelining::node {
    dest_t dest;
public:
    typedef tpie::memory_size_type item_type;
    hello_world_type(dest_t dest)
        : dest(dest)
    {
        // dest refers to the parameter dest and not the member this->dest,
        // but in this case the framework does not care
        add_push_destination(dest);
        set_name("My first pipelining node");
    }
    void push(const item_type & item) {
        if ((item % 2) == 0) {
            dest.push(item/2);
        } else {
            dest.push(3*item+1);
        }
    }
};

Factories

Since the C++ language (prior to C++17) does not infer template arguments to constructor calls, but does infer template arguments to functions and methods, we use factories to instantiate the node implementations. Usually, the built-in factories contained in factory_helpers.h will suffice, but in some cases it is helpful to implement one's own factory.

We could implement a hello_world_factory as follows:

class hello_world_factory : public tpie::pipelining::factory_base {
public:
    template <typename dest_t>
    struct constructed {
        typedef hello_world_type<dest_t> type;
    };
    template <typename dest_t>
    hello_world_type<dest_t> construct(const dest_t & dest) {
        hello_world_type<dest_t> hw(dest);
        this->init_node(hw);
        return hw;
    }
};

For a terminating node, which does not have a destination, we would implement a so-called termfactory as follows:

class goodbye_world_type : public tpie::pipelining::node {
public:
    typedef tpie::memory_size_type item_type;
    void push(item_type) {}
};
class goodbye_world_factory : public tpie::pipelining::factory_base {
public:
    typedef goodbye_world_type constructed_type;
    goodbye_world_type construct() {
        goodbye_world_type gw;
        this->init_node(gw);
        return gw;
    }
};

The main differences between an ordinary factory and a termfactory:

  • Instead of a templated construct() accepting the destination as its first parameter, the construct() method takes no parameters, and
  • constructed<dest_t>::type is replaced by the simpler constructed_type typedef.

Factory concatenation

To use the above defined factories, we might write the following:

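using namespace tpie;
using namespace tpie::pipelining;
// fact0 is assumed to be a factory for the initiator (input) node,
// declared elsewhere; its definition is not shown here.
hello_world_factory fact1;
goodbye_world_factory fact2;
pipeline p = fact0.construct(fact1.construct(fact2.construct()));
p();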

However, this is tedious, and so the pipelining framework provides several helper classes to ease the construction of pipelines, namely the descendants of pipe_base which are called pipe_begin, pipe_middle and pipe_end. With these, we may define the following helper functions,

inline pipe_middle<factory_0<hello_world_type> >
hello_world() {
    return factory_0<hello_world_type>();
}
inline pipe_end<termfactory_0<goodbye_world_type> >
goodbye_world() {
    return termfactory_0<goodbye_world_type>();
}

which we would use as follows:

using namespace tpie;
using namespace tpie::pipelining;
pipeline p = input(inputstream) | hello_world() | goodbye_world();
p();

The three terms that are piped together have types pipe_begin, pipe_middle and pipe_end respectively. As one might expect, piping together a pipe_begin and a pipe_middle yields a new pipe_begin, and piping together a pipe_begin and a pipe_end yields a pipeline object (actually a pipeline_impl object).
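To make the idea of piping factories together more concrete, here is a standalone sketch that does not use TPIE and is not the framework's actual implementation. All names in it (sink_node, sink_factory, add_one_node, add_one_factory, joined_factory, middle_piece, end_piece, run_pipe_demo) are hypothetical. It assumes a simplified model in which piping a middle piece into an end piece yields a new end piece whose factory constructs the nested nodes on demand:

```cpp
#include <cassert>
#include <vector>

// Terminal node: collects the items pushed to it.
struct sink_node {
    std::vector<int> * out;
    void push(int x) { out->push_back(x); }
};

// Factory for the terminal node: construct() takes no destination.
struct sink_factory {
    std::vector<int> * out;
    sink_node construct() const { return sink_node{out}; }
};

// Middle node, templated on its destination type.
template <typename dest_t>
struct add_one_node {
    dest_t dest;
    void push(int x) { dest.push(x + 1); }
};

// Factory for the middle node: dest_t is deduced by construct().
struct add_one_factory {
    template <typename dest_t>
    add_one_node<dest_t> construct(dest_t dest) const {
        return add_one_node<dest_t>{dest};
    }
};

// Factory produced by piping a middle factory into a terminal factory.
template <typename mid_t, typename end_t>
struct joined_factory {
    mid_t mid;
    end_t end;
    auto construct() const -> decltype(mid.construct(end.construct())) {
        return mid.construct(end.construct());
    }
};

// Thin wrappers so that operator| only applies to pipe pieces.
template <typename fact_t> struct middle_piece { fact_t factory; };
template <typename fact_t> struct end_piece { fact_t factory; };

// middle | end yields a new end piece holding the joined factory.
template <typename mid_t, typename end_t>
end_piece<joined_factory<mid_t, end_t> >
operator|(middle_piece<mid_t> m, end_piece<end_t> e) {
    typedef joined_factory<mid_t, end_t> joined_t;
    return end_piece<joined_t>{joined_t{m.factory, e.factory}};
}

// Builds add_one | add_one | sink and pushes a single item through it.
inline std::vector<int> run_pipe_demo() {
    std::vector<int> result;
    middle_piece<add_one_factory> add_one{add_one_factory{}};
    end_piece<sink_factory> sink{sink_factory{&result}};
    // Parenthesized because this sketch only defines middle | end.
    auto piped = add_one | (add_one | sink);
    auto node = piped.factory.construct();
    node.push(40);
    return result;
}
```

The nested node type add_one_node<add_one_node<sink_node> > never has to be written by hand, since it is deduced step by step inside the construct() calls.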

Pipeline phases

Consider the following implementation of a reverser:

template <typename dest_t>
class reverser_type : public tpie::pipelining::node {
    tpie::stack<point3d> points; // buffers the entire stream
    dest_t dest;
public:
    typedef point3d item_type;
    reverser_type(const dest_t & dest)
        : dest(dest)
    {
        add_push_destination(dest);
        set_name("Reverser",
                 tpie::pipelining::PRIORITY_SIGNIFICANT);
    }
    void push(point3d p) {
        points.push(p);
    }
    void end() {
        // Pushing items in end() is bad!
        while (!points.empty()) {
            dest.push(points.pop());
        }
    }
};

This implementation seems innocuous at first, but it is in fact very wasteful. Note that the reverser needs to know the entire stream before it can push anything to its destination. This means that once all items have been pushed to it, we could just as well deallocate all the item buffers that earlier nodes may have used while processing. Likewise, we could have waited until the stack was ready to push before initializing the later nodes in the pipeline.

This is what pipelining phases are for. Phases are collections of nodes that do not have to operate synchronously. Nodes may establish an ordering of pipelining phases by adding dependencies to nodes in other phases.

Common buffering operations that give rise to new phases are sorting and reversing, and these are already implemented in the pipelining framework.

For an idea of how to properly implement a buffering node such as a reverser using node::add_dependency, see tpie/pipelining/reverser.h.
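The effect of splitting a buffering node across two phases can be sketched without TPIE as follows. This is only a simplified model under stated assumptions, not the code in reverser.h: reverser_input, reverser_output, collector and run_two_phase_reverse are hypothetical names, and C++ scopes stand in for the framework's phase handling.

```cpp
#include <cassert>
#include <vector>

// Storage shared between the two halves of the reverser.
typedef std::vector<int> shared_stack;

// Phase 1 half: only accepts items and stores them.
struct reverser_input {
    shared_stack * stack;
    void push(int item) { stack->push_back(item); }
};

// Phase 2 half: acts as the initiator of the next phase.
template <typename dest_t>
struct reverser_output {
    shared_stack * stack;
    dest_t dest;
    void go() {
        while (!stack->empty()) {
            dest.push(stack->back());
            stack->pop_back();
        }
    }
};

// Terminal node that records what it receives.
struct collector {
    std::vector<int> * out;
    void push(int item) { out->push_back(item); }
};

inline std::vector<int> run_two_phase_reverse(const std::vector<int> & input) {
    shared_stack stack;
    std::vector<int> result;
    {
        // Phase 1: anything allocated by upstream nodes lives only in
        // this scope and is released before phase 2 starts.
        reverser_input in{&stack};
        for (int x : input) in.push(x);
    }
    {
        // Phase 2: downstream nodes are constructed only now.
        reverser_output<collector> out{&stack, collector{&result}};
        out.go();
    }
    return result;
}
```

In this model, the only state that survives from the first phase to the second is the shared stack, which is the point of letting the two halves live in different phases.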

Virtual chunks

So far, all pipelining code we have seen has been heavily templated, and in practice, debugging and compiler errors will not be easy on the eyes. Also, with the current setup we have seen, it is not easy (if at all possible) to distribute node implementations across compiled objects.

However, the pipelining framework supports virtual chunks which operate on the same level as, but are orthogonal to, pipeline phases as discussed in the previous section.

Whereas phases are computed at runtime and define the runtime order in which the node implementations have begin, go and end called, virtual chunks exist at compile time and are fused together at runtime.

Let us look at an example of how to use virtual chunks. The following is an example of an HTML handler with optional parsing and weeding. If weeding is requested (noText or noDynamic is set to true), the input HTML is parsed. Otherwise, it is fed directly to the output without parsing. The items passed around are html_tokens (representing a context-free HTML token; text, start node, end node, attribute, etc.) and tag_paths (representing a context-sensitive HTML leaf node; a token as well as the path from the root to the token).

virtual_chunk_begin<html_token> input_pipe;
virtual_chunk<html_token, tag_path> parse_pipe;
virtual_chunk<tag_path, tag_path> remove_text;
virtual_chunk<tag_path, tag_path> remove_dynamic;
virtual_chunk_end<tag_path> reassembling_output;
virtual_chunk_end<html_token> simple_output;
if (!url.empty()) {
    input_pipe = curl_input(url) // pipe_begin
        | curl_body_extract() // pipe_middle
        | html_scanner(); // pipe_middle
    // result is boxed into a virtual chunk
} else {
    input_pipe = default_tag_generator();
    // pipe_begin boxed into virtual chunk
}
pipeline p;
if (noText || noDynamic) {
    parse_pipe = html_parser();
    if (noText)
        remove_text = html_text_weeder();
    if (noDynamic)
        remove_dynamic = html_javascript_weeder()
            | html_css_weeder();
    reassembling_output = html_reassembler();
    p = input_pipe // virtual_chunk_begin
        | parse_pipe // virtual_chunk_middle
        | remove_text // optional virtual_chunk_middle
        | remove_dynamic // optional virtual_chunk_middle
        | reassembling_output; // virtual_chunk_end
} else {
    simple_output = tag_printer();
    p = input_pipe // virtual_chunk_begin
        | simple_output; // virtual_chunk_end
}
p(); // invoking the pipeline as without chunks

Usually, supporting virtual chunks requires no additional work on the node end, as long as the node is templated to accept any node as destination.

In addition to constructing virtual chunks inline from pipe_bases, virtual chunks may be returned from a function in an implementation object out into a using object. This way, the using object does not have to define the node implementations - all it has to know is the type of object passed between the virtual chunks.

If the above options were implemented using compile-time switching on template parameters, the emitted code size would be eight times as large, corresponding to the eight different combinations of choices for noText, noDynamic and url.empty().
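The trade-off can be illustrated with a standalone sketch of type erasure at stage boundaries. It does not use TPIE and does not claim to show how virtual chunks are implemented; push_fn, boxed_stage, drop_negative, triple and run_boxed_demo are hypothetical names. Each stage is compiled once, and the optional stages are linked in at runtime at the cost of one indirect call per item per boundary:

```cpp
#include <cassert>
#include <functional>
#include <vector>

// A type-erased destination, and a boxed stage that wraps a destination
// and returns a new type-erased destination.
typedef std::function<void(int)> push_fn;
typedef std::function<push_fn(push_fn)> boxed_stage;

// Stage that forwards only non-negative items.
inline boxed_stage drop_negative() {
    return [](push_fn dest) -> push_fn {
        return [dest](int x) { if (x >= 0) dest(x); };
    };
}

// Stage that multiplies each item by three.
inline boxed_stage triple() {
    return [](push_fn dest) -> push_fn {
        return [dest](int x) { dest(3 * x); };
    };
}

// Composes the optional stages at runtime and pushes -1, 2, 3 through.
inline std::vector<int> run_boxed_demo(bool filter, bool scale) {
    std::vector<int> result;
    push_fn chain = [&result](int x) { result.push_back(x); };
    // Built from the end of the pipeline towards the beginning, so the
    // filter (if any) runs before the scaling (if any).
    if (scale) chain = triple()(chain);
    if (filter) chain = drop_negative()(chain);
    for (int x : {-1, 2, 3}) chain(x);
    return result;
}
```

All four combinations of filter and scale share the same two compiled stage bodies, whereas switching on template parameters would instantiate a separate chain for each combination.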

Order of operations

The following virtual methods may be overridden in the implementation:

  • prepare
  • begin
  • end
  • evacuate (in which case can_evacuate should be overridden to return true)
  • set_available_memory

The node is constructed, and prepare() is called. The constructor should call any of the following node protected methods as appropriate:

  • set_name
  • add_push_destination
  • add_pull_source
  • add_dependency

Furthermore, the constructor and/or prepare() may call the following:

  • set_memory_fraction
  • set_minimum_memory
  • fetch
  • forward

Virtual method set_available_memory is called. If overridden, the overriding implementation must call node::set_available_memory with the same argument.

  1. begin() is called on all nodes in the current phase in topological order of the item flow graph, beginning with the initiator node. At this point, the implementation may fetch<>() auxiliary data from its predecessors and forward() auxiliary data to its successors. The default implementation of begin() is empty, so it is not necessary to perform a super call. If the node does not push as many items as are pushed into it, it should use can_fetch() to check whether "items" has been forwarded; "items" is a stream_size_type, and the node should in turn forward the number of items it expects to push to its destination. If the implementation wants to report progress information, it should call set_steps() in begin().
  2. Either push() or pull() is called any number of times.
  3. end() is called on all nodes in the current phase in topological order of the actor graph, such that it is permitted to call push and pull in end(). The default implementation of end() does nothing, so there is no reason to perform a super call.
  4. If can_evacuate() is overridden to return true, and evacuation is necessary, evacuate() is called, in which case the node should deallocate all internal buffers shared between this node and a depending node.

Method matrix

Each row in the following matrix has a method called by the framework on the left, and a checkmark in the row for each method an implementation may call.

Framework calls | set_name | add_*_destination, add_dependency | set_memory_fraction, set_minimum_memory | forward | can_fetch | fetch | push | can_pull | pull
constructor X X X
prepare X X X X
set_available_memory
begin X X
push X X X
can_pull X
pull X X X
end X X X

Note that the push, can_pull and pull contracts are those obeyed by the pipelining node implementations in the library; the core framework itself does not enforce these requirements.

Initiator nodes

You will rarely need to implement initiators. For an initiator, instead of push(), the virtual go() method must be overridden, and this is called once. go() should forward() a piece of stream_size_type data named "items" indicating the expected number of items being pushed.

Implementation troubleshooting

Common faults in node implementations include

  • Missing add_push_destination/add_pull_source/add_dependency. If pipeline::plot does not yield a graph containing all the nodes you expect, this is probably the problem.
  • Non-public inheritance of the node base class. This usually leads to an error that tpie::pipelining::bits::node_map::ptr tpie::pipelining::node::get_node_map() const is inaccessible.

Memory assignment

Each node has three properties that control how much memory is assigned to it: its minimum memory, its maximum memory and its memory fraction. The framework guarantees that the amount of memory assigned to the node is between the minimum and the maximum of the node.

If there is not enough available memory in total to satisfy all minimum memory requirements, a warning is printed to the log and memory overusage can be expected.

The principle behind the memory fraction is this. In the absence of minimum and maximum memory requirements, memory assigned to two nodes is distributed proportionally to their memory fractions, so a node with fraction 2.0 is assigned twice the memory of a node with fraction 1.0. Given minimum and maximum memory requirements, the framework first tries to distribute memory proportionally to all memory fractions. If this does not violate min/max constraints, this memory assignment is used. Otherwise, the framework performs a binary search to find a memory assignment that honors min/max constraints first and proportionality in memory fractions second.
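One plausible way to realize such a search is sketched below. This is an assumption made for illustration, not necessarily the search the framework performs: it looks for a common scale factor such that each node receives its fraction times the factor, clamped to its minimum and maximum. The names node_memory, share, total and assign_memory are hypothetical, and the sketch assumes finite maximums and positive fractions.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

struct node_memory {
    double minimum;   // minimum memory, in bytes
    double maximum;   // maximum memory, in bytes (finite in this sketch)
    double fraction;  // memory fraction (positive in this sketch)
};

// Memory a node receives for a given scale factor: proportional to its
// fraction, but never outside [minimum, maximum].
inline double share(const node_memory & n, double factor) {
    return std::max(n.minimum, std::min(n.maximum, factor * n.fraction));
}

// Total memory handed out for a given scale factor.
inline double total(const std::vector<node_memory> & nodes, double factor) {
    double sum = 0.0;
    for (const node_memory & n : nodes) sum += share(n, factor);
    return sum;
}

// Binary search for the largest factor whose total fits in `available`.
// If even the minimums do not fit, every node still gets its minimum,
// which corresponds to the memory overusage case.
inline std::vector<double> assign_memory(
    const std::vector<node_memory> & nodes, double available)
{
    double lo = 0.0;
    double hi = 0.0;
    for (const node_memory & n : nodes) {
        assert(n.fraction > 0.0 && n.minimum <= n.maximum);
        // Beyond this factor, node n is saturated at its maximum.
        hi = std::max(hi, n.maximum / n.fraction);
    }
    for (int i = 0; i < 100; ++i) {
        double mid = (lo + hi) / 2.0;
        if (total(nodes, mid) <= available) lo = mid; else hi = mid;
    }
    std::vector<double> result;
    for (const node_memory & n : nodes) result.push_back(share(n, lo));
    return result;
}
```

With fractions 1.0 and 2.0, no binding constraints and 300 bytes available, the sketch assigns roughly 100 and 200 bytes. If the first node instead has a minimum of 150 bytes, the minimum is honored first and both nodes end up with roughly 150 bytes.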

Parallel execution

The pipelining framework provides transparent parallel execution of pipelines. For CPU-intensive computations in which the result for one item does not depend on the previous item, such as point projection, one simply wraps the part of the pipeline to be parallelized in a call to parallel() as follows:

maintain_order_type maintainOrder = arbitrary_order; // or maintain_order
size_t numJobs = 4;
size_t bufSize = 1024;
pipeline p =
    input_points()
    | parallel(projection(mat), maintainOrder, numJobs, bufSize)
    | output_points();

The three extra parameters, maintainOrder (default arbitrary_order), numJobs (default tpie::default_worker_count) and bufSize (default 64), are optional. If maintainOrder is set to maintain_order, the framework will make sure that the output is produced in the same order as the input, which may incur a performance penalty when the execution time varies per item. numJobs declares the number of worker threads to utilize. It defaults to the same number of worker threads as used by e.g. parallel internal sorting. bufSize is the number of items that are sent to a thread at a time. There is an overhead associated with each buffer sent (a couple of virtual calls and a thread switch), so you should not set this too low. On the other hand, a larger buffer increases the memory overhead.

Pipelining library

The pipelining framework comes with a library of node implementations.

Buffer

To get simple buffering to disk of an item stream, a delayed_buffer() will accept pushed items, store them in a temporary file, and push them to its destination in another phase. This is necessary, for instance, when the item stream is being sorted (which is a buffering operation in itself), but the same item stream is later needed in its original order.

For a buffer that accepts items pushed to it and can be pulled from in another phase, define a local passive_buffer, and get its input and output parts with passive_buffer::input() and passive_buffer::output(), respectively.

Reverser

Like the buffer, the reverser exists in an active form (push input, push output) and a passive form (push input, pull output). For the passive reverser, define a passive_reverser and use passive_reverser::sink and passive_reverser::source. For the active reverser, simply use reverser().

Sorter

Like the buffer and the reverser, there is an active sorter, pipesort(), and a passive sorter with passive_sorter::input() and passive_sorter::output(). Both accept an optional less-than-predicate that defaults to std::less.

Input and output files

To read an entire file_stream and push its contents, define a file_stream variable, for instance file_stream<size_t> foo; and use it in your pipeline as input(foo). For a pull pipe, use pull_input(foo). Similarly, for outputting to a file_stream, there are the output(foo) and pull_output(foo) nodes. To write the item stream to a file_stream and push it on to another destination, use tee(foo).

scanf and printf

For reading and writing 32-bit ints using scanf (stdin) and printf (stdout), the pipelining framework provides scanf_ints() and printf_ints().