20 #ifndef __TPIE_PIPELINING_NODE_H__
21 #define __TPIE_PIPELINING_NODE_H__
23 #include <tpie/pipelining/exception.h>
27 #include <boost/any.hpp>
28 #include <tpie/pipelining/priority_type.h>
33 namespace pipelining {
65 STATE_AFTER_PROPAGATE,
82 return m_minimumMemory;
90 return m_maximumMemory;
97 return m_availableMemory;
105 m_memoryFraction = f;
112 return m_memoryFraction;
120 return token.get_map();
179 log_warning() <<
"node subclass " <<
typeid(*this).name() <<
" uses old go() interface" << std::endl;
187 log_warning() <<
"node subclass " <<
typeid(*this).name() <<
" is not an initiator node" << std::endl;
226 return m_namePriority;
/// \brief Set this node's name.
/// \param name      The name to give this node.
/// \param priority  Priority associated with the name; defaults to
///                  PRIORITY_USER and is stored in m_namePriority.
241 inline void set_name(
const std::string & name, priority_type priority = PRIORITY_USER) {
// Remember how authoritative this name is.
243 m_namePriority = priority;
250 m_name = m_name.empty() ? breadcrumb : (breadcrumb +
" | " + m_name);
259 m_successors.push_back(succ);
284 STATE get_state()
const {
288 void set_state(STATE s) {
299 , m_maximumMemory(std::numeric_limits<memory_size_type>::max())
300 , m_availableMemory(0)
301 , m_memoryFraction(0.0)
302 , m_namePriority(PRIORITY_NO_NAME)
306 , m_state(STATE_FRESH)
315 : token(other.token, this)
316 , m_minimumMemory(other.m_minimumMemory)
317 , m_maximumMemory(other.m_maximumMemory)
318 , m_availableMemory(other.m_availableMemory)
319 , m_memoryFraction(other.m_memoryFraction)
320 , m_name(other.m_name)
321 , m_namePriority(other.m_namePriority)
322 , m_stepsTotal(other.m_stepsTotal)
323 , m_stepsLeft(other.m_stepsLeft)
325 , m_state(other.m_state)
327 if (m_state != STATE_FRESH)
329 "Tried to copy pipeline node after prepare had been called");
336 : token(token, this, true)
338 , m_maximumMemory(std::numeric_limits<memory_size_type>::max())
339 , m_availableMemory(0)
340 , m_memoryFraction(0.0)
341 , m_namePriority(PRIORITY_NO_NAME)
345 , m_state(STATE_FRESH)
353 bits::node_map::ptr m = token.map_union(dest);
354 m->add_relation(token.id(), dest.id(), bits::pushes);
361 if (get_state() != STATE_FRESH) {
371 if (get_state() != STATE_FRESH) {
374 bits::node_map::ptr m = token.map_union(dest);
375 m->add_relation(token.id(), dest.id(), bits::pulls);
405 bits::node_map::ptr m = token.map_union(dest);
406 m->add_relation(token.id(), dest.id(), bits::depends);
422 if (get_state() != STATE_FRESH && get_state() != STATE_IN_PREPARE) {
425 m_minimumMemory = minimumMemory;
435 if (get_state() != STATE_FRESH && get_state() != STATE_IN_PREPARE) {
438 m_maximumMemory = maximumMemory;
446 m_availableMemory = availableMemory;
/// \brief Called by implementers to forward auxiliary data to successors.
///
/// Typed convenience wrapper: the value is wrapped in a boost::any and
/// handed to forward_any() with the key and explicitForward flag unchanged.
/// \param key              Name under which the value is forwarded.
/// \param value            Value to forward; copied into a boost::any.
/// \param explicitForward  Passed straight through to forward_any();
///                         defaults to true.
460 template <
typename T>
461 void forward(std::string key, T value,
bool explicitForward =
true) {
462 forward_any(key, boost::any(value), explicitForward);
/// \brief Forward a boost::any value to every successor of this node.
///
/// Emits a debug log line describing the state the node is in when the
/// forward happens, then calls add_forwarded_data() on each entry of
/// m_successors with the same key, value and explicitForward flag.
/// \param key              Name under which the value is forwarded.
/// \param value            The value, already wrapped in a boost::any.
/// \param explicitForward  Passed unchanged to add_forwarded_data();
///                         defaults to true.
468 void forward_any(std::string key, boost::any value,
bool explicitForward =
true) {
// Diagnostics only: the log messages below name the current state.
469 switch (get_state()) {
470 case STATE_IN_PREPARE:
471 log_debug() <<
"forward in prepare" << std::endl;
473 case STATE_IN_PROPAGATE:
474 log_debug() <<
"forward in propagate" << std::endl;
477 log_debug() <<
"forward in begin" << std::endl;
479 case STATE_AFTER_BEGIN:
480 log_debug() <<
"forward after begin" << std::endl;
482 case STATE_AFTER_END:
485 log_debug() <<
"forward in unknown state " << get_state() << std::endl;
// Hand the value to each registered successor in turn.
489 for (
size_t i = 0; i < m_successors.size(); ++i) {
490 m_successors[i]->add_forwarded_data(key, value, explicitForward);
502 if (m_values.count(key) &&
503 !explicitForward && m_values[key].second)
return;
504 m_values[key].first = value;
505 m_values[key].second = explicitForward;
514 return m_values.count(key) != 0;
522 if (m_values.count(key) != 0) {
523 return m_values[key].first;
525 std::stringstream ss;
526 ss <<
"Tried to fetch nonexistent key '" << key <<
'\'';
/// \brief Fetch piece of auxiliary data, expecting a given value type.
///
/// Looks the key up in m_values and converts the stored boost::any to T
/// with boost::any_cast<T>.
/// - If the key is absent, an error message naming the key and
///   typeid(T).name() is assembled in a stringstream.
/// - If the stored value is not a T, boost::bad_any_cast is caught and a
///   message naming the requested type, the stored type and the original
///   what() text is assembled.
/// \tparam T  The type the caller expects the forwarded value to have.
534 template <
typename T>
// Missing key: build a diagnostic that says which key and type were requested.
536 if (m_values.count(key) == 0) {
537 std::stringstream ss;
538 ss <<
"Tried to fetch nonexistent key '" << key
539 <<
"' of type " <<
typeid(T).name();
543 return boost::any_cast<T>(m_values[key].first);
544 }
// Catch by const reference rather than by value: no copy of the exception
// object is made, and what() is a const member so it stays callable.
catch (const boost::bad_any_cast & m) {
545 std::stringstream ss;
546 ss <<
"Trying to fetch key '" << key <<
"' of type "
547 <<
typeid(T).name() <<
" but forwarded data was of type "
548 << m_values[key].first.type().name() <<
". Message was: " << m.what();
567 switch (get_state()) {
569 case STATE_IN_PREPARE:
570 case STATE_IN_PROPAGATE:
573 log_error() <<
"set_steps in begin(); use set_steps in propagate() instead." << std::endl;
576 log_error() <<
"set_steps in unknown state " << get_state() << std::endl;
579 m_stepsTotal = m_stepsLeft = steps;
/// \brief Step the progress indicator.
/// \param steps  Number of steps to account for; defaults to 1. Compared
///               against, and subtracted from, m_stepsLeft.
586 void step(stream_size_type steps = 1) {
// NOTE(review): STATE_IN_END is tested twice below, so the assertion only
// admits STATE_IN_END and STATE_AFTER_BEGIN. One of the duplicates was
// presumably meant to name a different state -- confirm the intended state.
587 assert(get_state() == STATE_IN_END || get_state() == STATE_AFTER_BEGIN || get_state() == STATE_IN_END);
// Asking for more steps than remain is reported as a warning that names the
// dynamic type of the offending node.
588 if (m_stepsLeft < steps) {
589 log_warning() <<
typeid(*this).name() <<
" ==== Too many steps!" << std::endl;
592 m_stepsLeft -= steps;
605 if (m_piProxy.get() != 0)
return m_piProxy.get();
/// \brief For pull nodes, pull the next item from this node.
/// \return The pulled item, of type item_type.
621 inline item_type
pull();
/// \brief For push nodes, push the next item to this node.
/// \param item  The item to push, taken by const reference.
626 inline void push(
const item_type & item);
// Minimum memory declared by this node (assigned from set_minimum_memory's argument).
634 memory_size_type m_minimumMemory;
// Maximum memory declared by this node; the constructors start it at
// std::numeric_limits<memory_size_type>::max().
635 memory_size_type m_maximumMemory;
// Memory assigned to this node; the constructors start it at 0.
636 memory_size_type m_availableMemory;
// Memory priority of this node; the constructors start it at 0.0.
637 double m_memoryFraction;
// Priority of the node's current name; starts at PRIORITY_NO_NAME and is
// overwritten by set_name().
640 priority_type m_namePriority;
// Successor nodes; forward_any() calls add_forwarded_data() on each of them.
642 std::vector<node *> m_successors;
// Forwarded data: key -> (value, flag recording whether it was stored with
// explicitForward set).
643 typedef std::map<std::string, std::pair<boost::any, bool> > valuemap;
// Step accounting: both counters are set to the same value when the step
// count is declared; m_stepsLeft is decremented by step().
646 stream_size_type m_stepsTotal;
647 stream_size_type m_stepsLeft;
// Proxy progress indicator, returned as-is when already set.
// NOTE(review): std::auto_ptr is deprecated since C++11 and removed in C++17;
// std::unique_ptr is the replacement, but switching requires updating every
// use of this member -- confirm the project's target standard first.
650 std::auto_ptr<progress_indicator_base> m_piProxy;
658 double proxyMax =
static_cast<double>(
get_range());
659 double proxyCur =
static_cast<double>(
get_current());
660 double parentMax =
static_cast<double>(m_node.m_stepsTotal);
661 double parentCur =
static_cast<double>(m_node.m_stepsTotal-m_node.m_stepsLeft);
662 double missing = parentMax*proxyCur/proxyMax - parentCur;
663 if (missing < 1.0)
return;
664 stream_size_type times =
static_cast<stream_size_type
>(1.0+missing);
665 times = std::min(m_node.m_stepsLeft, times);
675 #endif // __TPIE_PIPELINING_NODE_H__
memory_size_type get_available_memory() const
Get the amount of memory assigned to this node.
bool can_pull() const
For pull nodes, return true if there are more items to be pulled.
void add_dependency(const node &dest)
Called by implementers to declare a node dependency, that is, a requirement that another node has end...
virtual void begin()
Begin pipeline processing phase.
The base class for indicating the progress of some task.
Null-object progress indicator.
virtual void evacuate()
Overridden by nodes that have data to evacuate.
stream_size_type get_range()
Get the maximum value of the current range.
virtual ~node()
Virtual dtor.
void add_pull_source(const node_token &dest)
Called by implementers to declare a pull source.
void refresh()
Display the indicator.
bits::node_map::ptr get_node_map() const
Get the local node map, mapping node IDs to node pointers for all the nodes reachable from this one...
void add_push_destination(const node &dest)
Called by implementers to declare a push destination.
virtual void go()
For initiator nodes, execute this phase by pushing all items to be pushed.
virtual void prepare()
Called before memory assignment but after depending phases have executed and ended.
void add_pull_destination(const node_token &dest)
Legacy alias of add_pull_source.
stream_size_type get_steps()
Used internally for progress indication.
void set_progress_indicator(progress_indicator_base *pi)
Used internally. Set the progress indicator to use.
virtual void set_available_memory(memory_size_type availableMemory)
Called by the memory manager to set the amount of memory assigned to this node.
void push(const item_type &item)
For push nodes, push the next item to this node.
node(const node_token &token)
Constructor using a given fresh node_token.
void add_forwarded_data(std::string key, boost::any value, bool explicitForward=true)
Called by users to add forwarded data to this node and recursively to its successors.
void set_breadcrumb(const std::string &breadcrumb)
Used internally when a pair_factory has a name set.
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
priority_type get_name_priority()
Get the priority of this node's name.
memory_size_type get_maximum_memory() const
Get the maximum amount of memory declared by this node.
progress_indicator_base * proxy_progress_indicator()
Get a non-initialized progress indicator for use with external implementations.
const node_token & get_token()
Get the node_token that maps this node's ID to a pointer to this.
A dummy progress indicator that produces no output.
void set_maximum_memory(memory_size_type maximumMemory)
Called by implementers to declare maximum memory requirements.
progress_indicator_base * get_progress_indicator()
Used internally. Get the progress indicator used.
Management of a single pipelining phase.
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
void step(stream_size_type step=1)
Record an increment to the indicator and advance the indicator.
Predeclarations of some pipelining classes.
stream_size_type get_current()
Get the current value of the step counter.
void add_successor(node *succ)
Used internally to facilitate forwarding parameters to successors in the item flow graph...
logstream & log_debug()
Return logstream for writing debug log messages.
virtual void propagate()
Propagate stream metadata.
void add_dependency(const node_token &dest)
Called by implementers to declare a node dependency, that is, a requirement that another node has end...
void forward_any(std::string key, boost::any value, bool explicitForward=true)
See node::forward.
virtual void end()
End pipeline processing phase.
void step(stream_size_type steps=1)
Step the progress indicator.
void add_pull_source(const node &dest)
Called by implementers to declare a pull source.
void set_memory_fraction(double f)
Set the memory priority of this node.
boost::any fetch_any(std::string key)
Fetch piece of auxiliary data as boost::any (the internal representation).
void add_pull_destination(const node &dest)
Legacy alias of add_pull_source.
virtual void go(progress_indicator_base &)
Deprecated go()-implementation signature.
virtual bool can_evacuate()
Overridden by nodes that have data to evacuate.
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
double get_memory_fraction() const
Get the memory priority of this node.
item_type pull()
For pull nodes, pull the next item from this node.
node()
Default constructor, using a new node_token.
void forward(std::string key, T value, bool explicitForward=true)
Called by implementers to forward auxiliary data to successors.
bool can_fetch(std::string key)
Find out if there is a piece of auxiliary data forwarded with a given name.
node_token::id_t get_id() const
Get the internal node ID of this node (mainly for debugging purposes).
node(const node &other)
Copy constructor.
logstream & log_error()
Return logstream for writing error log messages.
logstream & log_warning()
Return logstream for writing warning log messages.
const std::string & get_name()
Get this node's name.
T fetch(std::string key)
Fetch piece of auxiliary data, expecting a given value type.
memory_size_type get_minimum_memory() const
Get the minimum amount of memory declared by this node.