TPIE

v1.1rc1-6-g0c97303
serialization.h
Go to the documentation of this file.
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2013, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
23 
24 #ifndef TPIE_PIPELINING_SERIALIZATION_H
25 #define TPIE_PIPELINING_SERIALIZATION_H
26 
27 #include <tpie/pipelining/node.h>
28 #include <tpie/pipelining/factory_helpers.h>
29 #include <tpie/pipelining/pair_factory.h>
31 
32 namespace tpie {
33 
34 namespace pipelining {
35 
36 namespace serialization_bits {
37 
// Pipelining initiator node: reads serialized items through `rd` and pushes
// each one to `dest`.
//
// NOTE(review): this listing looks lossy -- the embedded source numbering
// skips 41 and 51, and no declaration of the `rd` member used below is
// visible here. Confirm against the original header before editing.
38 template <typename dest_t>
39 class input_t : public node {
40  dest_t dest;
42 
43 public:
44  typedef typename dest_t::item_type item_type;
45 
    // Stores the destination and the reader pointer, names the node and
    // declares rd->memory_usage() as this node's minimum memory.
46  input_t(const dest_t & dest, serialization_reader * rd)
47  : dest(dest)
48  , rd(rd)
49  {
50  set_name("Serialization reader");
52  set_minimum_memory(rd->memory_usage());
53  }
54 
    // Declares the total number of progress steps as rd->size().
55  virtual void propagate() override {
56  set_steps(rd->size());
57  }
58 
    // Unserializes items for as long as rd->can_read() holds and pushes each
    // one to dest. Progress is stepped by the change in rd->offset() across
    // each item, so steps use the same unit as rd->size() in propagate()
    // (presumably bytes, excluding the header -- confirm).
59  virtual void go() override {
60  item_type x;
61  stream_size_type bytesRead = 0;
62  while (rd->can_read()) {
63  rd->unserialize(x);
64  dest.push(x);
65 
66  stream_size_type bytesRead2 = rd->offset();
67  step(bytesRead2 - bytesRead);
68  bytesRead = bytesRead2;
69  }
70  }
71 };
72 
// Factory type for input_t nodes; carries one extra constructor argument,
// the serialization_reader pointer.
73 typedef factory_1<input_t, serialization_reader *> input_factory;
74 
75 
// Pipelining node that serializes every pushed item of type T through `wr`.
//
// NOTE(review): the embedded source numbering skips 78 and 83 -- neither the
// declaration of the `wr` member nor the constructor's signature line is
// visible here; only the initializer list and body of the constructor remain.
76 template <typename T>
77 class output_t : public node {
79 
80 public:
81  typedef T item_type;
82 
    // Constructor (signature line missing from this listing): stores the
    // writer pointer, names the node and declares wr->memory_usage() as the
    // node's minimum memory.
84  : wr(wr)
85  {
86  set_name("Serialization writer");
87  set_minimum_memory(wr->memory_usage());
88  }
89 
    // Serializes one item through the writer.
90  void push(const T & x) {
91  wr->serialize(x);
92  }
93 };
94 
// NOTE(review): only the template header and the closing brace survive here
// (embedded numbering skips 96-97). serialization_output() refers to
// serialization_bits::output_factory<T>::type, so this is presumably the
// output_factory definition -- restore it from the original header.
95 template <typename T>
98 };
99 
100 } // namespace serialization_bits
101 
// Entry point taking a serialization_reader by reference.
// NOTE(review): incomplete in this listing -- the embedded numbering skips
// 102 and 104, so the return type and the function body are not visible.
// Restore both from the original header; as shown this does not compile.
103 inline serialization_input(serialization_reader & rd) {
105 }
106 
// Builds a pipe_end around serialization_bits::output_factory<T>::type,
// constructed from the address of `wr`.
// NOTE(review): only a pointer to `wr` is passed on, so the writer presumably
// has to outlive the resulting pipeline -- confirm.
107 template <typename T>
108 pipe_end<typename serialization_bits::output_factory<T>::type>
109 serialization_output(serialization_writer & wr) {
110  return typename serialization_bits::output_factory<T>::type(&wr);
111 }
112 
113 namespace serialization_bits {
114 
// Forward declaration so that rev_output_t can name rev_input_t as a friend.
115 template <typename> class rev_input_t;
116 
// Second half of the serialization reverser: an initiator node that reads
// items back from the temporary file behind `m_stack` and pushes them to
// `dest`. `m_stack` is assigned by the befriended rev_input_t in its begin().
//
// NOTE(review): the embedded source numbering skips 123-124 and no
// declaration of the `rd` member used below is visible here. The node name
// suggests `rd` yields items in reverse order, but that cannot be confirmed
// from this listing.
117 template <typename dest_t>
118 class rev_output_t : public node {
119  friend class rev_input_t<rev_output_t<dest_t> >;
120 
121  dest_t dest;
    // Owning raw pointer: allocated with `new` in rev_input_t::begin(),
    // released in end() below.
122  tpie::temp_file * m_stack;
123 
125 
126 public:
127  typedef typename dest_t::item_type item_type;
128 
    // Starts with no stack, names the node and declares dest as a push
    // destination.
129  rev_output_t(const dest_t & dest)
130  : dest(dest)
131  , m_stack(0)
132  {
133  this->set_name("Serialization reverse reader");
134  this->add_push_destination(dest);
135  }
136 
    // Throws tpie::exception if no stack file has been assigned; otherwise
    // opens the reader on the stack file's path and declares rd.size() as
    // the number of progress steps.
137  virtual void propagate() override {
138  if (m_stack == 0)
139  throw tpie::exception("No one created my stack");
140 
141  rd.open(m_stack->path());
142  this->set_steps(rd.size());
143  }
144 
    // Unserializes items for as long as rd.can_read() holds and pushes each
    // one to dest, stepping progress by the change in rd.offset() per item.
145  virtual void go() override {
146  item_type x;
147  stream_size_type bytesRead = 0;
148  while (rd.can_read()) {
149  rd.unserialize(x);
150  dest.push(x);
151 
152  stream_size_type bytesRead2 = rd.offset();
153  step(bytesRead2 - bytesRead);
154  bytesRead = bytesRead2;
155  }
156  }
157 
    // Releases the temp_file object.
    // NOTE(review): m_stack is not reset after the delete, so the pointer
    // dangles afterwards, and nothing visible here frees it if end() is
    // never reached.
158  virtual void end() override {
159  delete m_stack;
160  }
161 };
162 
// Factory type for rev_output_t nodes; no extra constructor arguments.
163 typedef factory_0<rev_output_t> rev_output_factory;
164 
// Primary template is declared only; the specialization for
// rev_output_t<...> destinations that follows provides the definition.
165 template <typename dest_t>
166 class rev_input_t;
167 
// First half of the serialization reverser: serializes every pushed item
// into a temporary file that the paired rev_output_t later reads back.
//
// NOTE(review): the embedded source numbering skips 170 and 173 -- the
// `dest_t` typedef and the declaration of the `wr` member used below are not
// visible here. `dest_t` is presumably rev_output_t<output_dest_t>; confirm.
168 template <typename output_dest_t>
169 class rev_input_t<rev_output_t<output_dest_t> > : public node {
171  dest_t dest;
172 
    // Count of items pushed so far; forwarded as "items" in end().
174  stream_size_type items;
175 
176 public:
177  typedef typename dest_t::item_type item_type;
178 
    // Names the node and registers this node as a dependency of dest.
179  rev_input_t(const dest_t & dest)
180  : dest(dest)
181  , wr()
182  , items(0)
183  {
184  this->set_name("Serialization reverse writer");
185  this->dest.add_dependency(*this);
186  }
187 
    // Allocates the temp_file and stores it in dest.m_stack (dest deletes
    // it in its end()), then opens the writer on that file's path.
188  virtual void begin() override {
189  dest.m_stack = new tpie::temp_file();
190  wr.open(dest.m_stack->path());
191  }
192 
    // Serializes one item and counts it.
193  void push(const item_type & x) {
194  wr.serialize(x);
195  ++items;
196  }
197 
    // Closes the writer and forwards the item count under the key "items".
198  virtual void end() override {
199  wr.close();
200  this->forward<stream_size_type>("items", items);
201  }
202 };
203 
// Factory type for rev_input_t nodes; no extra constructor arguments.
204 typedef factory_0<rev_input_t> rev_input_factory;
205 
207 
208 } // namespace serialization_bits
209 
// Builds the reverser pipe segment by combining a rev_input_factory and a
// rev_output_factory into a serialization_bits::reverse_factory.
// NOTE(review): the reverse_factory typedef is not visible in this listing
// (embedded numbering skips 206) -- confirm its definition in the original.
210 pipe_middle<serialization_bits::reverse_factory>
211 inline serialization_reverser() {
212  serialization_bits::rev_input_factory i;
213  serialization_bits::rev_output_factory o;
214  return serialization_bits::reverse_factory(i, o);
215 }
216 
217 } // namespace pipelining
218 
219 } // namespace tpie
220 
221 #endif // TPIE_PIPELINING_SERIALIZATION_H
stream_size_type offset()
Number of bytes read, not including the header.
virtual void propagate() override
Propagate stream metadata.
Node factory for 1-argument terminator.
Stream of serializable items.
Base class of all nodes.
Definition: node.h:58
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
Definition: node.h:352
stream_size_type size()
Size of file in bytes, not including the header.
void serialize(const T &v)
Serialize a serializable item and write it to the stream.
stream_size_type offset()
Number of bytes read, not including the header.
const std::string & path()
Get the path of the associated file.
void unserialize(T &v)
Unserialize a serialized item from the stream.
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
Definition: node.h:241
Class representing the existence of a temporary file.
Definition: tempname.h:152
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
Definition: node.h:421
virtual void propagate() override
Propagate stream metadata.
Definition: serialization.h:55
Node factory for 1-argument generator.
void add_dependency(const node_token &dest)
Called by implementers to declare a node dependency, that is, a requirement that another node has end...
Definition: node.h:404
virtual void end() override
End pipeline processing phase.
void step(stream_size_type steps=1)
Step the progress indicator.
Definition: node.h:586
virtual void begin() override
Begin pipeline processing phase.
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
Definition: node.h:566
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: serialization.h:59
Node factory for 0-argument generator.