TPIE

v1.1rc1-6-g0c97303
node.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef __TPIE_PIPELINING_NODE_H__
21 #define __TPIE_PIPELINING_NODE_H__
22 
23 #include <tpie/pipelining/exception.h>
24 #include <tpie/pipelining/tokens.h>
27 #include <boost/any.hpp>
28 #include <tpie/pipelining/priority_type.h>
30 
31 namespace tpie {
32 
33 namespace pipelining {
34 
35 namespace bits {
36 
38  node & m_node;
39 
40 public:
43  , m_node(s)
44  {
45  }
46 
47  inline void refresh();
48 };
49 
50 } // namespace bits
51 
58 class node {
59 public:
/// \brief Life-cycle states of a node, as reported by get_state().
///
/// STATE_FRESH is the value every constructor starts from; the remaining
/// enumerators come in IN_/AFTER_ pairs named after the prepare, propagate,
/// begin and end hooks.
enum STATE {
	STATE_FRESH,

	STATE_IN_PREPARE,
	STATE_AFTER_PREPARE,

	STATE_IN_PROPAGATE,
	STATE_AFTER_PROPAGATE,

	STATE_IN_BEGIN,
	STATE_AFTER_BEGIN,

	STATE_IN_END,
	STATE_AFTER_END
};
71 
75  virtual ~node() {}
76 
81  inline memory_size_type get_minimum_memory() const {
82  return m_minimumMemory;
83  }
84 
89  inline memory_size_type get_maximum_memory() const {
90  return m_maximumMemory;
91  }
92 
96  inline memory_size_type get_available_memory() const {
97  return m_availableMemory;
98  }
99 
104  inline void set_memory_fraction(double f) {
105  m_memoryFraction = f;
106  }
107 
111  inline double get_memory_fraction() const {
112  return m_memoryFraction;
113  }
114 
119  inline bits::node_map::ptr get_node_map() const {
120  return token.get_map();
121  }
122 
127  inline node_token::id_t get_id() const {
128  return token.id();
129  }
130 
137  virtual void prepare() {
138  }
139 
151  virtual void propagate() {
152  }
153 
167  virtual void begin() {
168  }
169 
175  virtual void go() {
177  go(pi);
178  // if go didn't throw, it was overridden - but it shouldn't be
179  log_warning() << "node subclass " << typeid(*this).name() << " uses old go() interface" << std::endl;
180  }
181 
186  virtual void go(progress_indicator_base &) {
187  log_warning() << "node subclass " << typeid(*this).name() << " is not an initiator node" << std::endl;
188  throw not_initiator_node();
189  }
190 
205  virtual void end() {
206  }
207 
211  virtual bool can_evacuate() {
212  return false;
213  }
214 
218  virtual void evacuate() {
219  }
220 
225  inline priority_type get_name_priority() {
226  return m_namePriority;
227  }
228 
233  inline const std::string & get_name() {
234  return m_name;
235  }
236 
241  inline void set_name(const std::string & name, priority_type priority = PRIORITY_USER) {
242  m_name = name;
243  m_namePriority = priority;
244  }
245 
249  inline void set_breadcrumb(const std::string & breadcrumb) {
250  m_name = m_name.empty() ? breadcrumb : (breadcrumb + " | " + m_name);
251  }
252 
258  inline void add_successor(node * succ) {
259  m_successors.push_back(succ);
260  }
261 
266  inline stream_size_type get_steps() {
267  return m_stepsTotal;
268  }
269 
274  m_pi = pi;
275  }
276 
281  return m_pi;
282  }
283 
284  STATE get_state() const {
285  return m_state;
286  }
287 
288  void set_state(STATE s) {
289  m_state = s;
290  }
291 
292 protected:
296  inline node()
297  : token(this)
298  , m_minimumMemory(0)
299  , m_maximumMemory(std::numeric_limits<memory_size_type>::max())
300  , m_availableMemory(0)
301  , m_memoryFraction(0.0)
302  , m_namePriority(PRIORITY_NO_NAME)
303  , m_stepsTotal(0)
304  , m_stepsLeft(0)
305  , m_pi(0)
306  , m_state(STATE_FRESH)
307  {
308  }
309 
314  inline node(const node & other)
315  : token(other.token, this)
316  , m_minimumMemory(other.m_minimumMemory)
317  , m_maximumMemory(other.m_maximumMemory)
318  , m_availableMemory(other.m_availableMemory)
319  , m_memoryFraction(other.m_memoryFraction)
320  , m_name(other.m_name)
321  , m_namePriority(other.m_namePriority)
322  , m_stepsTotal(other.m_stepsTotal)
323  , m_stepsLeft(other.m_stepsLeft)
324  , m_pi(other.m_pi)
325  , m_state(other.m_state)
326  {
327  if (m_state != STATE_FRESH)
328  throw call_order_exception(
329  "Tried to copy pipeline node after prepare had been called");
330  }
331 
335  inline node(const node_token & token)
336  : token(token, this, true)
337  , m_minimumMemory(0)
338  , m_maximumMemory(std::numeric_limits<memory_size_type>::max())
339  , m_availableMemory(0)
340  , m_memoryFraction(0.0)
341  , m_namePriority(PRIORITY_NO_NAME)
342  , m_stepsTotal(0)
343  , m_stepsLeft(0)
344  , m_pi(0)
345  , m_state(STATE_FRESH)
346  {
347  }
348 
352  inline void add_push_destination(const node_token & dest) {
353  bits::node_map::ptr m = token.map_union(dest);
354  m->add_relation(token.id(), dest.id(), bits::pushes);
355  }
356 
360  inline void add_push_destination(const node & dest) {
361  if (get_state() != STATE_FRESH) {
362  throw call_order_exception("add_push_destination called too late");
363  }
364  add_push_destination(dest.token);
365  }
366 
370  void add_pull_source(const node_token & dest) {
371  if (get_state() != STATE_FRESH) {
372  throw call_order_exception("add_pull_source called too late");
373  }
374  bits::node_map::ptr m = token.map_union(dest);
375  m->add_relation(token.id(), dest.id(), bits::pulls);
376  }
377 
381  void add_pull_source(const node & dest) {
382  add_pull_source(dest.token);
383  }
384 
388  void add_pull_destination(const node_token & dest) {
389  add_pull_source(dest);
390  }
391 
395  void add_pull_destination(const node & dest) {
396  add_pull_source(dest);
397  }
398 
404  inline void add_dependency(const node_token & dest) {
405  bits::node_map::ptr m = token.map_union(dest);
406  m->add_relation(token.id(), dest.id(), bits::depends);
407  }
408 
414  inline void add_dependency(const node & dest) {
415  add_dependency(dest.token);
416  }
417 
421  inline void set_minimum_memory(memory_size_type minimumMemory) {
422  if (get_state() != STATE_FRESH && get_state() != STATE_IN_PREPARE) {
423  throw call_order_exception("set_minimum_memory");
424  }
425  m_minimumMemory = minimumMemory;
426  }
427 
434  inline void set_maximum_memory(memory_size_type maximumMemory) {
435  if (get_state() != STATE_FRESH && get_state() != STATE_IN_PREPARE) {
436  throw call_order_exception("set_maximum_memory");
437  }
438  m_maximumMemory = maximumMemory;
439  }
440 
445  virtual void set_available_memory(memory_size_type availableMemory) {
446  m_availableMemory = availableMemory;
447  }
448 
454  // Implementation note: If the type of the `value` parameter is changed
455  // from `T` to `const T &`, this will yield linker errors if an application
456  // attempts to pass a const reference to a static data member inside a
457  // templated class.
458  // See http://stackoverflow.com/a/5392050
460  template <typename T>
461  void forward(std::string key, T value, bool explicitForward = true) {
462  forward_any(key, boost::any(value), explicitForward);
463  }
464 
468  void forward_any(std::string key, boost::any value, bool explicitForward = true) {
469  switch (get_state()) {
470  case STATE_IN_PREPARE:
471  log_debug() << "forward in prepare" << std::endl;
472  break;
473  case STATE_IN_PROPAGATE:
474  log_debug() << "forward in propagate" << std::endl;
475  break;
476  case STATE_IN_BEGIN:
477  log_debug() << "forward in begin" << std::endl;
478  break;
479  case STATE_AFTER_BEGIN:
480  log_debug() << "forward after begin" << std::endl;
481  break;
482  case STATE_AFTER_END:
483  throw call_order_exception("forward");
484  default:
485  log_debug() << "forward in unknown state " << get_state() << std::endl;
486  break;
487  }
488 
489  for (size_t i = 0; i < m_successors.size(); ++i) {
490  m_successors[i]->add_forwarded_data(key, value, explicitForward);
491  }
492  }
493 
494 public:
501  void add_forwarded_data(std::string key, boost::any value, bool explicitForward = true) {
502  if (m_values.count(key) &&
503  !explicitForward && m_values[key].second) return;
504  m_values[key].first = value;
505  m_values[key].second = explicitForward;
506  forward_any(key, value, false);
507  }
508 
513  inline bool can_fetch(std::string key) {
514  return m_values.count(key) != 0;
515  }
516 
521  inline boost::any fetch_any(std::string key) {
522  if (m_values.count(key) != 0) {
523  return m_values[key].first;
524  } else {
525  std::stringstream ss;
526  ss << "Tried to fetch nonexistent key '" << key << '\'';
527  throw invalid_argument_exception(ss.str());
528  }
529  }
530 
534  template <typename T>
535  inline T fetch(std::string key) {
536  if (m_values.count(key) == 0) {
537  std::stringstream ss;
538  ss << "Tried to fetch nonexistent key '" << key
539  << "' of type " << typeid(T).name();
540  throw invalid_argument_exception(ss.str());
541  }
542  try {
543  return boost::any_cast<T>(m_values[key].first);
544  } catch (boost::bad_any_cast m) {
545  std::stringstream ss;
546  ss << "Trying to fetch key '" << key << "' of type "
547  << typeid(T).name() << " but forwarded data was of type "
548  << m_values[key].first.type().name() << ". Message was: " << m.what();
549  throw invalid_argument_exception(ss.str());
550  }
551  }
552 
553 protected:
558  const node_token & get_token() {
559  return token;
560  }
561 
566  void set_steps(stream_size_type steps) {
567  switch (get_state()) {
568  case STATE_FRESH:
569  case STATE_IN_PREPARE:
570  case STATE_IN_PROPAGATE:
571  break;
572  case STATE_IN_BEGIN:
573  log_error() << "set_steps in begin(); use set_steps in propagate() instead." << std::endl;
574  throw call_order_exception("set_steps");
575  default:
576  log_error() << "set_steps in unknown state " << get_state() << std::endl;
577  throw call_order_exception("set_steps");
578  }
579  m_stepsTotal = m_stepsLeft = steps;
580  }
581 
586  void step(stream_size_type steps = 1) {
587  assert(get_state() == STATE_IN_END || get_state() == STATE_AFTER_BEGIN || get_state() == STATE_IN_END);
588  if (m_stepsLeft < steps) {
589  log_warning() << typeid(*this).name() << " ==== Too many steps!" << std::endl;
590  m_stepsLeft = 0;
591  } else {
592  m_stepsLeft -= steps;
593  }
594  m_pi->step(steps);
595  }
596 
605  if (m_piProxy.get() != 0) return m_piProxy.get();
607  m_piProxy.reset(pi);
608  return pi;
609  }
610 
611 #ifdef DOXYGEN
612  inline bool can_pull() const;
617 
621  inline item_type pull();
622 
626  inline void push(const item_type & item);
627 #endif
628 
629  friend class bits::phase;
630 
631 private:
632  node_token token;
633 
634  memory_size_type m_minimumMemory;
635  memory_size_type m_maximumMemory;
636  memory_size_type m_availableMemory;
637  double m_memoryFraction;
638 
639  std::string m_name;
640  priority_type m_namePriority;
641 
642  std::vector<node *> m_successors;
643  typedef std::map<std::string, std::pair<boost::any, bool> > valuemap;
644  valuemap m_values;
645 
646  stream_size_type m_stepsTotal;
647  stream_size_type m_stepsLeft;
649  STATE m_state;
650  std::auto_ptr<progress_indicator_base> m_piProxy;
651 
652  friend class bits::proxy_progress_indicator;
653 };
654 
655 namespace bits {
656 
658  double proxyMax = static_cast<double>(get_range());
659  double proxyCur = static_cast<double>(get_current());
660  double parentMax = static_cast<double>(m_node.m_stepsTotal);
661  double parentCur = static_cast<double>(m_node.m_stepsTotal-m_node.m_stepsLeft);
662  double missing = parentMax*proxyCur/proxyMax - parentCur;
663  if (missing < 1.0) return;
664  stream_size_type times = static_cast<stream_size_type>(1.0+missing);
665  times = std::min(m_node.m_stepsLeft, times);
666  m_node.step(times);
667 }
668 
669 } // namespace bits
670 
671 } // namespace pipelining
672 
673 } // namespace tpie
674 
675 #endif // __TPIE_PIPELINING_NODE_H__
memory_size_type get_available_memory() const
Get the amount of memory assigned to this node.
Definition: node.h:96
bool can_pull() const
For pull nodes, return true if there are more items to be pulled.
void add_dependency(const node &dest)
Called by implementers to declare a node dependency, that is, a requirement that another node has end...
Definition: node.h:414
virtual void begin()
Begin pipeline processing phase.
Definition: node.h:167
The base class for indicating the progress of some task.
Null-object progress indicator.
virtual void evacuate()
Overridden by nodes that have data to evacuate.
Definition: node.h:218
stream_size_type get_range()
Get the maximum value of the current range.
virtual ~node()
Virtual dtor.
Definition: node.h:75
void add_pull_source(const node_token &dest)
Called by implementers to declare a pull source.
Definition: node.h:370
void refresh()
Display the indicator.
Definition: node.h:657
bits::node_map::ptr get_node_map() const
Get the local node map, mapping node IDs to node pointers for all the nodes reachable from this one...
Definition: node.h:119
void add_push_destination(const node &dest)
Called by implementers to declare a push destination.
Definition: node.h:360
virtual void go()
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: node.h:175
virtual void prepare()
Called before memory assignment but after depending phases have executed and ended.
Definition: node.h:137
void add_pull_destination(const node_token &dest)
Legacy alias of add_pull_source.
Definition: node.h:388
stream_size_type get_steps()
Used internally for progress indication.
Definition: node.h:266
void set_progress_indicator(progress_indicator_base *pi)
Used internally. Set the progress indicator to use.
Definition: node.h:273
virtual void set_available_memory(memory_size_type availableMemory)
Called by the memory manager to set the amount of memory assigned to this node.
Definition: node.h:445
void push(const item_type &item)
For push nodes, push the next item to this node.
node(const node_token &token)
Constructor using a given fresh node_token.
Definition: node.h:335
void add_forwarded_data(std::string key, boost::any value, bool explicitForward=true)
Called by users to add forwarded data to this node and recursively to its successors.
Definition: node.h:501
void set_breadcrumb(const std::string &breadcrumb)
Used internally when a pair_factory has a name set.
Definition: node.h:249
Base class of all nodes.
Definition: node.h:58
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
Definition: node.h:352
priority_type get_name_priority()
Get the priority of this node's name.
Definition: node.h:225
memory_size_type get_maximum_memory() const
Get the maximum amount of memory declared by this node.
Definition: node.h:89
progress_indicator_base * proxy_progress_indicator()
Get a non-initialized progress indicator for use with external implementations.
Definition: node.h:604
const node_token & get_token()
Get the node_token that maps this node's ID to a pointer to this.
Definition: node.h:558
a dummy progress indicator that produces no output
void set_maximum_memory(memory_size_type maximumMemory)
Called by implementers to declare maximum memory requirements.
Definition: node.h:434
progress_indicator_base * get_progress_indicator()
Used internally. Get the progress indicator used.
Definition: node.h:280
Pipeline tokens.
Management of a single pipelining phase.
Definition: graph.h:45
Progress indicator base.
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
Definition: node.h:241
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
Definition: node.h:421
void step(stream_size_type step=1)
Record an increment to the indicator and advance the indicator.
Predeclarations of some pipelining classes.
stream_size_type get_current()
Get the current value of the step counter.
void add_successor(node *succ)
Used internally to facilitate forwarding parameters to successors in the item flow graph...
Definition: node.h:258
logstream & log_debug()
Return logstream for writing debug log messages.
Definition: tpie_log.h:124
virtual void propagate()
Propagate stream metadata.
Definition: node.h:151
void add_dependency(const node_token &dest)
Called by implementers to declare a node dependency, that is, a requirement that another node has end...
Definition: node.h:404
void forward_any(std::string key, boost::any value, bool explicitForward=true)
See node::forward.
Definition: node.h:468
virtual void end()
End pipeline processing phase.
Definition: node.h:205
void step(stream_size_type steps=1)
Step the progress indicator.
Definition: node.h:586
void add_pull_source(const node &dest)
Called by implementers to declare a pull source.
Definition: node.h:381
void set_memory_fraction(double f)
Set the memory priority of this node.
Definition: node.h:104
boost::any fetch_any(std::string key)
Fetch piece of auxiliary data as boost::any (the internal representation).
Definition: node.h:521
void add_pull_destination(const node &dest)
Legacy alias of add_pull_source.
Definition: node.h:395
virtual void go(progress_indicator_base &)
Deprecated go()-implementation signature.
Definition: node.h:186
virtual bool can_evacuate()
Overridden by nodes that have data to evacuate.
Definition: node.h:211
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
Definition: node.h:566
double get_memory_fraction() const
Get the memory priority of this node.
Definition: node.h:111
item_type pull()
For pull nodes, pull the next item from this node.
node()
Default constructor, using a new node_token.
Definition: node.h:296
void forward(std::string key, T value, bool explicitForward=true)
Called by implementers to forward auxiliary data to successors.
Definition: node.h:461
bool can_fetch(std::string key)
Find out if there is a piece of auxiliary data forwarded with a given name.
Definition: node.h:513
node_token::id_t get_id() const
Get the internal node ID of this node (mainly for debugging purposes).
Definition: node.h:127
node(const node &other)
Copy constructor.
Definition: node.h:314
logstream & log_error()
Return logstream for writing error log messages.
Definition: tpie_log.h:104
logstream & log_warning()
Return logstream for writing warning log messages.
Definition: tpie_log.h:114
const std::string & get_name()
Get this node's name.
Definition: node.h:233
T fetch(std::string key)
Fetch piece of auxiliary data, expecting a given value type.
Definition: node.h:535
memory_size_type get_minimum_memory() const
Get the minimum amount of memory declared by this node.
Definition: node.h:81