TPIE

v1.1rc1-6-g0c97303
base.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef __TPIE_PIPELINING_PARALLEL_BASE_H__
21 #define __TPIE_PIPELINING_PARALLEL_BASE_H__
22 
23 #include <tpie/pipelining/node.h>
24 #include <tpie/pipelining/factory_base.h>
25 #include <tpie/array_view.h>
26 #include <boost/shared_ptr.hpp>
28 #include <tpie/pipelining/parallel/options.h>
29 
30 namespace tpie {
31 
32 namespace pipelining {
33 
34 namespace parallel_bits {
35 
// Forward declarations of the class templates that are defined further down
// in this header but referenced before their definition.
template <typename T>
class before;
template <typename dest_t>
class before_impl;
template <typename T>
class after;
template <typename T1, typename T2>
class state;
45 
54 template <typename Input, typename Output>
55 class threads {
56  typedef before<Input> before_t;
57 
58 protected:
59  static const size_t alignment = 64;
60 
63  aligned_array<pi_t, alignment> m_progressIndicators;
64 
71  threads * t;
72  public:
74  : t(t)
75  , index(0)
76  {
77  }
78 
79  virtual void init_node(node & r) override {
80  r.set_progress_indicator(t->m_progressIndicators.get(index));
81  }
82 
83  size_t index;
84  };
85 
86  friend class progress_indicator_hook;
87 
88  std::vector<before_t *> m_dests;
89 
90 public:
91  before_t & operator[](size_t idx) {
92  return *m_dests[idx];
93  }
94 
95  stream_size_type sum_steps() {
96  stream_size_type res = 0;
97  for (size_t i = 0; i < m_progressIndicators.size(); ++i) {
98  res += m_progressIndicators.get(i)->get_current();
99  }
100  return res;
101  }
102 
103  virtual ~threads() {}
104 };
105 
109 template <typename Input, typename Output, typename fact_t>
110 class threads_impl : public threads<Input, Output> {
111 private:
112  typedef threads<Input, Output> p_t;
113 
115  typedef typename p_t::pi_t pi_t;
116 
117  typedef after<Output> after_t;
118  typedef typename fact_t::template constructed<after_t>::type worker_t;
119  typedef typename worker_t::item_type T1;
120  typedef Output T2;
122  static const size_t alignment = p_t::alignment;
124 
126  size_t numJobs;
127 
129  aligned_before_t m_data;
130 
131 public:
132  threads_impl(fact_t fact,
133  state<T1, T2> & st)
134  : numJobs(st.opts.numJobs)
135  {
136  typename p_t::progress_indicator_hook hook(this);
137  fact.hook_initialization(&hook);
138  // uninitialized allocation
139  m_data.realloc(numJobs);
140  this->m_progressIndicators.realloc(numJobs);
141  this->m_dests.resize(numJobs);
142 
143  // construct elements manually
144  for (size_t i = 0; i < numJobs; ++i) {
145  // for debugging: check that pointer is aligned.
146  if (((size_t) m_data.get(i)) % alignment != 0) {
147  log_warning() << "Thread " << i << " is not aligned: Address "
148  << m_data.get(i) << " is off by " <<
149  (((size_t) m_data.get(i)) % alignment) << " bytes"
150  << std::endl;
151  }
152 
153  hook.index = i;
154  new (this->m_progressIndicators.get(i)) pi_t();
155 
156  this->m_dests[i] =
157  new(m_data.get(i))
158  before_t(st, i, fact.construct(after_t(st, i)));
159  }
160  }
161 
162  virtual ~threads_impl() {
163  for (size_t i = 0; i < numJobs; ++i) {
164  m_data.get(i)->~before_t();
165  this->m_progressIndicators.get(i)->~pi_t();
166  }
167  m_data.realloc(0);
168  this->m_progressIndicators.realloc(0);
169  }
170 };
171 
175 class after_base : public node {
176 public:
180  virtual void worker_initialize() = 0;
181 
187  virtual void flush_buffer() = 0;
188 
192  virtual void set_consumer(node *) = 0;
193 };
194 
203 class state_base {
204 public:
205  typedef boost::mutex mutex_t;
206  typedef boost::condition_variable cond_t;
207  typedef boost::unique_lock<boost::mutex> lock_t;
208 
209  const options opts;
210 
212  mutex_t mutex;
213 
220  cond_t producerCond;
221 
231  cond_t * workerCond;
232 
235 
237  void set_input_ptr(size_t idx, node * v) {
238  m_inputs[idx] = v;
239  }
240 
242  void set_output_ptr(size_t idx, after_base * v) {
243  m_outputs[idx] = v;
244  }
245 
253  node & input(size_t idx) { return *m_inputs[idx]; }
254 
265  after_base & output(size_t idx) { return *m_outputs[idx]; }
266 
268  worker_state get_state(size_t idx) {
269  return m_states[idx];
270  }
271 
273  void transition_state(size_t idx, worker_state from, worker_state to) {
274  if (m_states[idx] != from) {
275  std::stringstream ss;
276  ss << idx << " Invalid state transition " << from << " -> " << to << "; current state is " << m_states[idx];
277  log_error() << ss.str() << std::endl;
278  throw exception(ss.str());
279  }
280  m_states[idx] = to;
281  }
282 
283 protected:
284  std::vector<node *> m_inputs;
285  std::vector<after_base *> m_outputs;
286  std::vector<worker_state> m_states;
287 
288  state_base(const options opts)
289  : opts(opts)
290  , runningWorkers(0)
291  , m_inputs(opts.numJobs, 0)
292  , m_outputs(opts.numJobs, 0)
293  , m_states(opts.numJobs, INITIALIZING)
294  {
295  workerCond = new cond_t[opts.numJobs];
296  }
297 
298  virtual ~state_base() {
299  delete[] workerCond;
300  }
301 };
302 
306 template <typename T>
308  memory_size_type m_inputSize;
309  array<T> m_inputBuffer;
310 
311 public:
312  array_view<T> get_input() {
313  return array_view<T>(&m_inputBuffer[0], m_inputSize);
314  }
315 
316  void set_input(array_view<T> input) {
317  if (input.size() > m_inputBuffer.size())
318  throw tpie::exception(m_inputBuffer.size() ? "Input too large" : "Input buffer not initialized");
319 
320  memory_size_type items =
321  std::copy(input.begin(), input.end(), m_inputBuffer.begin())
322  -m_inputBuffer.begin();
323 
324  m_inputSize = items;
325  }
326 
327  parallel_input_buffer(const options & opts)
328  : m_inputSize(0)
329  , m_inputBuffer(opts.bufSize)
330  {
331  }
332 };
333 
337 template <typename T>
339  memory_size_type m_outputSize;
340  array<T> m_outputBuffer;
341  friend class after<T>;
342 
343 public:
344  array_view<T> get_output() {
345  return array_view<T>(&m_outputBuffer[0], m_outputSize);
346  }
347 
348  parallel_output_buffer(const options & opts)
349  : m_outputSize(0)
350  , m_outputBuffer(opts.bufSize)
351  {
352  }
353 };
354 
362 template <typename T>
363 class consumer : public node {
364 public:
365  typedef T item_type;
366 
367  virtual void consume(array_view<T>) = 0;
368  // node has virtual dtor
369 };
370 
375 template <typename T1, typename T2>
376 class state : public state_base {
377 public:
378  typedef boost::shared_ptr<state> ptr;
379  typedef state_base::mutex_t mutex_t;
380  typedef state_base::cond_t cond_t;
381  typedef state_base::lock_t lock_t;
382 
383  array<parallel_input_buffer<T1> *> m_inputBuffers;
384  array<parallel_output_buffer<T2> *> m_outputBuffers;
385 
386  consumer<T2> * m_cons;
387 
388  std::auto_ptr<threads<T1, T2> > pipes;
389 
390  template <typename fact_t>
391  state(const options opts, const fact_t & fact)
392  : state_base(opts)
393  , m_inputBuffers(opts.numJobs)
394  , m_outputBuffers(opts.numJobs)
395  , m_cons(0)
396  {
397  typedef threads_impl<T1, T2, fact_t> pipes_impl_t;
398  pipes.reset(new pipes_impl_t(fact, *this));
399  }
400 
401  void set_consumer_ptr(consumer<T2> * cons) {
402  m_cons = cons;
403  }
404 
405  consumer<T2> * const * get_consumer_ptr_ptr() const {
406  return &m_cons;
407  }
408 };
409 
413 template <typename T>
414 class after : public after_base {
415 protected:
416  state_base & st;
417  size_t parId;
418  std::auto_ptr<parallel_output_buffer<T> > m_buffer;
419  array<parallel_output_buffer<T> *> & m_outputBuffers;
420  typedef state_base::lock_t lock_t;
421  consumer<T> * const * m_cons;
422 
423 public:
424  typedef T item_type;
425 
426  template <typename Input>
427  after(state<Input, T> & state,
428  size_t parId)
429  : st(state)
430  , parId(parId)
431  , m_outputBuffers(state.m_outputBuffers)
432  , m_cons(state.get_consumer_ptr_ptr())
433  {
434  state.set_output_ptr(parId, this);
435  set_name("Parallel after", PRIORITY_INSIGNIFICANT);
436  if (m_cons == 0) throw tpie::exception("Unexpected nullptr");
437  if (*m_cons != 0) throw tpie::exception("Expected nullptr");
438  }
439 
440  virtual void set_consumer(node * cons) override {
441  this->add_push_destination(*cons);
442  }
443 
444  after(const after & other)
445  : after_base(other)
446  , st(other.st)
447  , parId(other.parId)
448  , m_outputBuffers(other.m_outputBuffers)
449  , m_cons(other.m_cons)
450  {
451  st.set_output_ptr(parId, this);
452  if (m_cons == 0) throw tpie::exception("Unexpected nullptr in copy");
453  if (*m_cons != 0) throw tpie::exception("Expected nullptr in copy");
454  }
455 
459  void push(const T & item) {
460  if (m_buffer->m_outputSize >= m_buffer->m_outputBuffer.size())
461  flush_buffer_impl(false);
462 
463  m_buffer->m_outputBuffer[m_buffer->m_outputSize++] = item;
464  }
465 
466  virtual void end() override {
467  flush_buffer_impl(true);
468  }
469 
473  virtual void worker_initialize() {
474  m_buffer.reset(new parallel_output_buffer<T>(st.opts));
475  m_outputBuffers[parId] = m_buffer.get();
476  }
477 
482  virtual void flush_buffer() override {
483  flush_buffer_impl(true);
484  }
485 
486 private:
487  bool is_done() const {
488  switch (st.get_state(parId)) {
489  case INITIALIZING:
490  throw tpie::exception("INITIALIZING not expected in after::is_done");
491  case IDLE:
492  return true;
493  case PROCESSING:
494  // The main thread may transition us from Outputting to Idle to
495  // Processing without us noticing, or it may transition us from
496  // Partial_Output to Processing. In either case, we are done
497  // flushing the buffer.
498  return true;
499  case PARTIAL_OUTPUT:
500  case OUTPUTTING:
501  return false;
502  case DONE:
503  return true;
504  }
505  throw tpie::exception("Unknown state");
506  }
507 
523  void flush_buffer_impl(bool complete) {
524  // At this point, we could check if the output buffer is empty and
525  // short-circuit when it is without acquiring the lock; however, we
526  // must do a full PROCESSING -> OUTPUTTING -> IDLE transition in this
527  // case to let the main thread know that we are done processing the
528  // input.
529 
530  lock_t lock(st.mutex);
531  if (st.get_state(parId) == DONE) {
532  if (*m_cons == 0) throw tpie::exception("Unexpected nullptr in flush_buffer");
533  array_view<T> out = m_buffer->get_output();
534  (*m_cons)->consume(out);
535  } else {
536  st.transition_state(parId, PROCESSING, complete ? OUTPUTTING : PARTIAL_OUTPUT);
537  // notify producer that output is ready
538  st.producerCond.notify_one();
539  while (!is_done()) {
540  st.workerCond[parId].wait(lock);
541  }
542  }
543  m_buffer->m_outputSize = 0;
544  }
545 };
546 
552 template <typename T>
553 class before : public node {
554 protected:
555  state_base & st;
556  size_t parId;
557  std::auto_ptr<parallel_input_buffer<T> > m_buffer;
558  array<parallel_input_buffer<T> *> & m_inputBuffers;
559  boost::thread m_worker;
560 
564  virtual void push_all(array_view<T> items) = 0;
565 
566  template <typename Output>
567  before(state<T, Output> & st, size_t parId)
568  : st(st)
569  , parId(parId)
570  , m_inputBuffers(st.m_inputBuffers)
571  {
572  set_name("Parallel before", PRIORITY_INSIGNIFICANT);
573  }
574  // virtual dtor in node
575 
576  before(const before & other)
577  : st(other.st)
578  , parId(other.parId)
579  , m_inputBuffers(other.m_inputBuffers)
580  {
581  }
582 
583 public:
584  typedef T item_type;
585 
586  virtual void begin() override {
587  node::begin();
588  boost::thread t(run_worker, this);
589  m_worker.swap(t);
590  }
591 
592 private:
596  bool ready() {
597  switch (st.get_state(parId)) {
598  case INITIALIZING:
599  throw tpie::exception("INITIALIZING not expected in before::ready");
600  case IDLE:
601  return false;
602  case PROCESSING:
603  return true;
604  case PARTIAL_OUTPUT:
605  throw tpie::exception("State 'partial_output' was not expected in before::ready");
606  case OUTPUTTING:
607  throw tpie::exception("State 'outputting' was not expected in before::ready");
608  case DONE:
609  return false;
610  }
611  throw tpie::exception("Unknown state");
612  }
613 
617  class running_signal {
618  typedef state_base::cond_t cond_t;
619  memory_size_type & sig;
620  cond_t & producerCond;
621  public:
622  running_signal(memory_size_type & sig, cond_t & producerCond)
623  : sig(sig)
624  , producerCond(producerCond)
625  {
626  ++sig;
627  producerCond.notify_one();
628  }
629 
630  ~running_signal() {
631  --sig;
632  producerCond.notify_one();
633  }
634  };
635 
636  static void run_worker(before * self) {
637  self->worker();
638  }
639 
643  void worker() {
644  state_base::lock_t lock(st.mutex);
645 
646  m_buffer.reset(new parallel_input_buffer<T>(st.opts));
647  m_inputBuffers[parId] = m_buffer.get();
648 
649  // virtual invocation
650  st.output(parId).worker_initialize();
651 
652  st.transition_state(parId, INITIALIZING, IDLE);
653  running_signal _(st.runningWorkers, st.producerCond);
654  while (true) {
655  // wait for transition IDLE -> PROCESSING
656  while (!ready()) {
657  if (st.get_state(parId) == DONE) {
658  return;
659  }
660  st.workerCond[parId].wait(lock);
661  }
662  lock.unlock();
663 
664  // virtual invocation
665  push_all(m_buffer->get_input());
666 
667  lock.lock();
668  }
669  }
670 };
671 
675 template <typename dest_t>
676 class before_impl : public before<typename dest_t::item_type> {
677  typedef typename dest_t::item_type item_type;
678 
679  dest_t dest;
680 
681 public:
682  template <typename Output>
683  before_impl(state<item_type, Output> & st,
684  size_t parId,
685  dest_t dest)
686  : before<item_type>(st, parId)
687  , dest(dest)
688  {
689  this->add_push_destination(dest);
690  st.set_input_ptr(parId, this);
691  }
692 
699  virtual void push_all(array_view<item_type> items) {
700  for (size_t i = 0; i < items.size(); ++i) {
701  dest.push(items[i]);
702  }
703 
704  // virtual invocation
705  this->st.output(this->parId).flush_buffer();
706  }
707 };
708 
712 template <typename Input, typename Output, typename dest_t>
713 class consumer_impl : public consumer<typename dest_t::item_type> {
715  typedef typename state_t::ptr stateptr;
716  dest_t dest;
717  stateptr st;
718 public:
719  typedef typename dest_t::item_type item_type;
720 
721  consumer_impl(const dest_t & dest, stateptr st)
722  : dest(dest)
723  , st(st)
724  {
725  this->add_push_destination(dest);
726  this->set_name("Parallel output", PRIORITY_INSIGNIFICANT);
727  for (size_t i = 0; i < st->opts.numJobs; ++i) {
728  st->output(i).set_consumer(this);
729  }
730  }
731 
735  virtual void consume(array_view<item_type> a) override {
736  for (size_t i = 0; i < a.size(); ++i) {
737  dest.push(a[i]);
738  }
739  }
740 };
741 
747 template <typename T1, typename T2>
748 class producer : public node {
749 public:
750  typedef T1 item_type;
751 
752 private:
753  typedef state<T1, T2> state_t;
754  typedef typename state_t::ptr stateptr;
755  stateptr st;
756  array<T1> inputBuffer;
757  size_t written;
758  size_t readyIdx;
759  boost::shared_ptr<consumer<T2> > cons;
760  internal_queue<memory_size_type> m_outputOrder;
761  stream_size_type m_steps;
762 
770  bool has_ready_pipe() {
771  for (size_t i = 0; i < st->opts.numJobs; ++i) {
772  switch (st->get_state(i)) {
773  case INITIALIZING:
774  case PROCESSING:
775  break;
776  case PARTIAL_OUTPUT:
777  case OUTPUTTING:
778  // If we have to maintain order of items, the only
779  // outputting worker we consider to be waiting is the
780  // "front worker".
781  if (st->opts.maintainOrder && m_outputOrder.front() != i)
782  break;
783  // fallthrough
784  case IDLE:
785  readyIdx = i;
786  return true;
787  case DONE:
788  throw tpie::exception("State DONE not expected in has_ready_pipe().");
789  }
790  }
791  return false;
792  }
793 
804  bool has_outputting_pipe() {
805  for (size_t i = 0; i < st->opts.numJobs; ++i) {
806  switch (st->get_state(i)) {
807  case INITIALIZING:
808  case IDLE:
809  case PROCESSING:
810  break;
811  case PARTIAL_OUTPUT:
812  case OUTPUTTING:
813  if (st->opts.maintainOrder && m_outputOrder.front() != i)
814  break;
815  readyIdx = i;
816  return true;
817  case DONE:
818  throw tpie::exception("State DONE not expected in has_outputting_pipe().");
819  }
820  }
821  return false;
822  }
823 
834  bool has_processing_pipe() {
835  for (size_t i = 0; i < st->opts.numJobs; ++i) {
836  switch (st->get_state(i)) {
837  case INITIALIZING:
838  case IDLE:
839  case PARTIAL_OUTPUT:
840  case OUTPUTTING:
841  break;
842  case PROCESSING:
843  return true;
844  case DONE:
845  throw tpie::exception("State DONE not expected in has_processing_pipe().");
846  }
847  }
848  return false;
849  }
850 
854  void flush_steps() {
855  // The number of items has been forwarded along unchanged to all
856  // the workers (it is still a valid upper bound).
857  //
858  // This means the workers each expect to handle all the items,
859  // which means the number of steps reported in total is scaled up
860  // by the number of workers.
861  //
862  // Therefore, we similarly scale up the number of times we call step.
863  // In effect, every time step() is called once in a single worker,
864  // we process this as if all workers called step().
865 
866  stream_size_type steps = st->pipes->sum_steps();
867  if (steps != m_steps) {
868  this->get_progress_indicator()->step(st->opts.numJobs*(steps - m_steps));
869  m_steps = steps;
870  }
871  }
872 
873 public:
874  template <typename consumer_t>
875  producer(stateptr st, const consumer_t & cons)
876  : st(st)
877  , written(0)
878  , cons(new consumer_t(cons))
879  , m_steps(0)
880  {
881  for (size_t i = 0; i < st->opts.numJobs; ++i) {
882  this->add_push_destination(st->input(i));
883  }
884  this->set_name("Parallel input", PRIORITY_INSIGNIFICANT);
885  memory_size_type usage =
886  st->opts.numJobs * st->opts.bufSize * (sizeof(T1) + sizeof(T2)) // workers
887  + st->opts.bufSize * sizeof(item_type) // our buffer
888  ;
889  this->set_minimum_memory(usage);
890 
891  if (st->opts.maintainOrder) {
892  m_outputOrder.resize(st->opts.numJobs);
893  }
894  }
895 
896  virtual void begin() override {
897  inputBuffer.resize(st->opts.bufSize);
898 
899  state_base::lock_t lock(st->mutex);
900  while (st->runningWorkers != st->opts.numJobs) {
901  st->producerCond.wait(lock);
902  }
903  }
904 
914  void push(item_type item) {
915  inputBuffer[written++] = item;
916  if (written < st->opts.bufSize) {
917  // Wait for more items before doing anything expensive such as
918  // locking.
919  return;
920  }
921  state_base::lock_t lock(st->mutex);
922 
923  flush_steps();
924 
925  empty_input_buffer(lock);
926  }
927 
928 private:
929  void empty_input_buffer(state_base::lock_t & lock) {
930  while (written > 0) {
931  while (!has_ready_pipe()) {
932  st->producerCond.wait(lock);
933  }
934  switch (st->get_state(readyIdx)) {
935  case INITIALIZING:
936  throw tpie::exception("State 'INITIALIZING' not expected at this point");
937  case IDLE:
938  {
939  // Send buffer to ready worker
940  item_type * first = &inputBuffer[0];
941  item_type * last = first + written;
942  parallel_input_buffer<T1> & dest = *st->m_inputBuffers[readyIdx];
943  dest.set_input(array_view<T1>(first, last));
944  st->transition_state(readyIdx, IDLE, PROCESSING);
945  st->workerCond[readyIdx].notify_one();
946  written = 0;
947  if (st->opts.maintainOrder)
948  m_outputOrder.push(readyIdx);
949  break;
950  }
951  case PROCESSING:
952  throw tpie::exception("State 'processing' not expected at this point");
953  case PARTIAL_OUTPUT:
954  // Receive buffer (virtual invocation)
955  cons->consume(st->m_outputBuffers[readyIdx]->get_output());
956  st->transition_state(readyIdx, PARTIAL_OUTPUT, PROCESSING);
957  st->workerCond[readyIdx].notify_one();
958  break;
959  case OUTPUTTING:
960  // Receive buffer (virtual invocation)
961  cons->consume(st->m_outputBuffers[readyIdx]->get_output());
962 
963  st->transition_state(readyIdx, OUTPUTTING, IDLE);
964  st->workerCond[readyIdx].notify_one();
965  if (st->opts.maintainOrder) {
966  if (m_outputOrder.front() != readyIdx) {
967  log_error() << "Producer: Expected " << readyIdx << " in front; got "
968  << m_outputOrder.front() << std::endl;
969  throw tpie::exception("Producer got wrong entry from has_ready_pipe");
970  }
971  m_outputOrder.pop();
972  }
973  break;
974  case DONE:
975  throw tpie::exception("State 'DONE' not expected at this point");
976  }
977  }
978  }
979 
980 public:
981  virtual void end() override {
982  state_base::lock_t lock(st->mutex);
983 
984  flush_steps();
985 
986  empty_input_buffer(lock);
987 
988  inputBuffer.resize(0);
989 
990  st->set_consumer_ptr(cons.get());
991 
992  bool done = false;
993  while (!done) {
994  while (!has_outputting_pipe()) {
995  if (!has_processing_pipe()) {
996  done = true;
997  break;
998  }
999  // All items pushed; wait for processors to complete
1000  st->producerCond.wait(lock);
1001  }
1002  if (done) break;
1003 
1004  // virtual invocation
1005  cons->consume(st->m_outputBuffers[readyIdx]->get_output());
1006 
1007  if (st->get_state(readyIdx) == PARTIAL_OUTPUT) {
1008  st->transition_state(readyIdx, PARTIAL_OUTPUT, PROCESSING);
1009  st->workerCond[readyIdx].notify_one();
1010  continue;
1011  }
1012  st->transition_state(readyIdx, OUTPUTTING, IDLE);
1013  if (st->opts.maintainOrder) {
1014  if (m_outputOrder.front() != readyIdx) {
1015  log_error() << "Producer: Expected " << readyIdx << " in front; got "
1016  << m_outputOrder.front() << std::endl;
1017  throw tpie::exception("Producer got wrong entry from has_ready_pipe");
1018  }
1019  m_outputOrder.pop();
1020  }
1021  }
1022  // Notify all workers that all processing is done
1023  for (size_t i = 0; i < st->opts.numJobs; ++i) {
1024  st->transition_state(i, IDLE, DONE);
1025  st->workerCond[i].notify_one();
1026  }
1027  while (st->runningWorkers > 0) {
1028  st->producerCond.wait(lock);
1029  }
1030  // All workers terminated
1031 
1032  flush_steps();
1033  }
1034 };
1035 
1036 } // namespace parallel_bits
1037 
1038 } // namespace pipelining
1039 
1040 } // namespace tpie
1041 
1042 #endif
after_base & output(size_t idx)
Get the specified after instance.
Definition: base.h:265
void set_input_ptr(size_t idx, node *v)
Must not be used concurrently.
Definition: base.h:237
Encapsulation of two pointers from any random access container.
virtual void begin()
Begin pipeline processing phase.
Definition: node.h:167
virtual void flush_buffer()=0
Called by before::worker after a batch of items has been pushed.
virtual void worker_initialize()=0
Called by before::worker to initialize buffers.
virtual void set_consumer(node *)=0
For internal use in order to construct the pipeline graph.
virtual void flush_buffer() override
Invoked by before::push_all when all input items have been pushed.
Definition: base.h:482
void set_progress_indicator(progress_indicator_base *pi)
Used internally. Set the progress indicator to use.
Definition: node.h:273
Accepts input items from the main thread and sends them down the pipeline.
Definition: base.h:38
Factory hook that sets the progress indicator of the nodes run in parallel to the null progress indicator.
Definition: base.h:70
cond_t producerCond
Condition variable.
Definition: base.h:220
Class containing an array of node instances.
Definition: base.h:55
void set_output_ptr(size_t idx, after_base *v)
Must not be used concurrently.
Definition: base.h:242
progress_indicator_null pi_t
Progress indicator type.
Definition: base.h:62
Base class of all nodes.
Definition: node.h:58
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
Definition: node.h:352
Accepts output items and sends them to the main thread.
Definition: base.h:42
cond_t * workerCond
Condition variable, one per worker.
Definition: base.h:231
void push(T val)
Add an element to the front of the queue.
a dummy progress indicator that produces no output
Aligned, uninitialized storage.
Definition: aligned_array.h:41
void push(item_type item)
Accumulate input buffer and send off to workers.
Definition: base.h:914
virtual void push_all(array_view< T > items)=0
Overridden in subclass to push a buffer of items.
progress_indicator_base * get_progress_indicator()
Used internally. Get the progress indicator used.
Definition: node.h:280
size_t runningWorkers
Shared state, must have mutex to write.
Definition: base.h:234
Encapsulation of two pointers from any random access container.
Definition: array_view.h:47
void pop()
Remove an element from the back of the queue.
T front()
Return the item that has been in the queue for the longest time.
Common state in parallel pipelining library.
Definition: base.h:203
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
Definition: node.h:241
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
Definition: node.h:421
virtual void push_all(array_view< item_type > items)
Push all items from buffer and flush output buffer afterwards.
Definition: base.h:699
virtual void set_consumer(node *cons) override
For internal use in order to construct the pipeline graph.
Definition: base.h:440
void step(stream_size_type step=1)
Record an increment to the indicator and advance the indicator.
Subclass of threads instantiating and managing the pipelines.
Definition: base.h:110
Node running in the main thread that accepts output buffers from the managing producer and forwards them to the rest of the pipeline.
Definition: base.h:363
Non-templated virtual base class of after.
Definition: base.h:175
size_t size() const
Get number of elements in the array.
void resize(size_t size, const T &elm)
Change the size of the array.
Definition: array.h:431
iterator begin() const
Return an iterator to the beginning of the array.
virtual void begin() override
Begin pipeline processing phase.
Definition: base.h:586
void push(const T &item)
Push to thread-local buffer; flush it when full.
Definition: base.h:459
virtual void end() override
End pipeline processing phase.
Definition: base.h:466
User-supplied options to the parallelism framework.
Definition: options.h:32
State subclass containing the item type specific state, i.e. the per-worker input and output buffers.
Definition: base.h:44
iterator begin()
Return an iterator to the beginning of the array.
Definition: array.h:274
size_type size() const
Return the size of the array.
Definition: array.h:472
worker_state get_state(size_t idx)
Shared state, must have mutex to use.
Definition: base.h:268
Producer, running in main thread, managing the parallel execution.
Definition: base.h:748
virtual void end() override
End pipeline processing phase.
Definition: base.h:981
void transition_state(size_t idx, worker_state from, worker_state to)
Shared state, must have mutex to use.
Definition: base.h:273
virtual void consume(array_view< item_type > a) override
Push all items from output buffer to the rest of the pipeline.
Definition: base.h:735
iterator end() const
Return an iterator to the end of the array.
Whether to maintain order in parallel or not.
logstream & log_error()
Return logstream for writing error log messages.
Definition: tpie_log.h:104
logstream & log_warning()
Return logstream for writing warning log messages.
Definition: tpie_log.h:114
virtual void begin() override
Begin pipeline processing phase.
Definition: base.h:896
void resize(size_t size=0)
Resize the queue; all data is lost.
Concrete consumer implementation.
Definition: base.h:713
virtual void worker_initialize()
Invoked by before::worker (in worker thread context).
Definition: base.h:473
node & input(size_t idx)
Get the specified before instance.
Definition: base.h:253