TPIE

v1.1rc1-6-g0c97303
stream.h
Go to the documentation of this file.
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; c-file-style: "stroustrup"; -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2008, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
23 #ifndef _TPIE_AMI_STREAM_H
24 #define _TPIE_AMI_STREAM_H
25 
26 
27 // Include the configuration header.
28 #include <tpie/config.h>
29 
30 // Get definitions for working with Unix and Windows
31 #include <tpie/portability.h>
32 
33 // Get the error codes.
34 #include <tpie/err.h>
35 #include <tpie/persist.h>
36 
37 #include <tpie/tempname.h>
38 #include <tpie/file_stream.h>
39 #include <tpie/file_count.h>
40 
41 #include <tpie/tpie_log.h>
42 
43 #include <tpie/stream_usage.h>
44 
45 #include <tpie/tpie_assert.h>
46 
47 namespace tpie {
48 
49  namespace ami {
50 
52  enum stream_type {
53  READ_STREAM = 1, // Open existing stream for reading
54  WRITE_STREAM, // Open for writing. Create if non-existent
55  APPEND_STREAM, // Open for writing at end. Create if needed.
56  READ_WRITE_STREAM // Open to read and write.
57  };
58 
65  };
66 
67  } // ami namespace
68 
69 } // tpie namespace
70 
71 
72 namespace tpie {
73 
74  namespace ami {
75 
98 template<class T >
99 class stream {
100 
101 public:
102  typedef T item_type;
103 
104  // We have a variety of constructors for different uses.
105 
111  stream();
112 
118  stream(const std::string& path_name,
119  stream_type st = READ_WRITE_STREAM);
120 
141  stream_offset_type sub_begin,
142  stream_offset_type sub_end,
143  stream<T> **sub_stream);
144 
152  inline stream_status status() const {
153  return m_status;
154  }
155 
160  inline bool is_valid() const {
161  return m_status == STREAM_STATUS_VALID;
162  }
163 
168  inline bool operator!() const {
169  return !is_valid();
170  }
171 
179  inline err read_item(T **elt);
180 
186  inline err write_item(const T &elt);
187 
194  err read_array(T *mm_space, stream_offset_type *len);
195 
201  err read_array(T *mm_space, memory_size_type & len);
202 
208  err write_array(const T *mm_space, memory_size_type len);
209 
213  inline stream_offset_type stream_len(void) const {
214  return m_stream.size();
215  }
216 
220  inline std::string name() const;
221 
225  inline err seek(stream_offset_type offset);
226 
231  inline stream_offset_type tell() const {
232  return m_stream.offset();
233  }
234 
244  inline err truncate(stream_offset_type offset);
245 
263  err main_memory_usage(size_type *usage,
264  stream_usage usage_type) const;
265 
266 
267 
271  static memory_size_type memory_usage(memory_size_type count);
272 
273  // This method used to return a statistics object, but this is no longer
274  // supported.
275  //struct stats_stream & stats() const {
276  // tp_assert(0, "stream::stats() is no longer supported");
277  // return *((stats_stream *)0);
278  //}
279 
280  // This method used to return a statistics object, but this is no longer
281  // supported.
282  //static stats_stream & gstats() {
283  // tp_assert(0, "stream::stats() is no longer supported");
284  // return *((stats_stream *)0);
285  //}
286 
293  size_t available_streams(void) {
294  return available_files();
295  }
296 
301  memory_size_type chunk_size(void) const {
302  return file_base::block_size(1.0) / sizeof(T);
303  }
304 
309  void persist(persistence p) {
310  m_temp.set_persistent(p == PERSIST_PERSISTENT);
311  }
312 
317  persistence persist() const {
318  return m_temp.is_persistent() ? PERSIST_PERSISTENT : PERSIST_DELETE;
319  }
320 
323  // This function gives easy access to the stream's file name and its length.
325  std::string& sprint();
326 
331  return m_stream;
332  }
333 
334 private:
335 
337  stream(const stream<T>& other);
339  stream<T>& operator=(const stream<T>& other);
340 
341  temp_file m_temp;
342  file_stream<T> m_stream;
343 
346  //bool m_destructBTEStream;
347  stream_status m_status;
348 
349  static inline float block_factor() {
350 #ifndef STREAM_UFS_BLOCK_FACTOR
351  return 1.0;
352 #else
353 # ifdef WIN32
354  return static_cast<float>(STREAM_UFS_BLOCK_FACTOR)/32;
355 # else
356  return static_cast<float>(STREAM_UFS_BLOCK_FACTOR)/512;
357 # endif
358 #endif
359  }
360 };
361 
362 // Create a temporary AMI stream on one of the devices in the default
363 // device description. Persistence is PERSIST_DELETE by default. We
364 // are given the index of the string describing the desired device.
365  template<class T>
366  stream<T>::stream(): m_stream(block_factor()), m_status(STREAM_STATUS_INVALID)
367  {
368  TP_LOG_DEBUG_ID("Temporary stream in file: ");
369  TP_LOG_DEBUG_ID( m_temp.path() );
370  try {
371  m_stream.open( m_temp );
372  } catch(const stream_exception &e) {
373  TP_LOG_FATAL_ID("Open failed: " << e.what());
374  return;
375  }
376  // Set status to VALID.
377  m_status = STREAM_STATUS_VALID;
378  };
379 
380 
381 // A stream created with this constructor will persist on disk at the
382 // location specified by the path name.
383  template<class T>
384  stream<T>::stream(const std::string& path_name, stream_type st) :
385  m_temp(path_name, true), m_stream(block_factor()), m_status(STREAM_STATUS_INVALID) {
386  try {
387  m_stream.open(m_temp, st==READ_STREAM ? access_read: access_read_write);
388  if (st == APPEND_STREAM) m_stream.seek(0, file_stream_base::end);
389  } catch(const stream_exception &e) {
390  TP_LOG_FATAL_ID("Open failed: " << e.what());
391  return;
392  }
393  m_status = STREAM_STATUS_VALID;
394  };
395 
396 
397  // *stream::new_substream* //
398  template<class T>
400  stream_offset_type sub_begin,
401  stream_offset_type sub_end,
402  stream<T> **sub_stream)
403  {
404  unused(st);
405  unused(sub_begin);
406  unused(sub_end);
407  unused(sub_stream);
408  return BTE_ERROR;
409  }
410 
411 
412  template<class T>
413  inline std::string stream<T>::name() const {
414  return m_stream.path();
415  }
416 
417 // Move to a specific offset.
418  template<class T>
419  inline err stream<T>::seek(stream_offset_type offset) {
420  try {
421  m_stream.seek(offset);
422  } catch(const stream_exception &e) {
423  TP_LOG_WARNING_ID("BTE error - seek failed: " << e.what());
424  return BTE_ERROR;
425  }
426  return NO_ERROR;
427  }
428 
429 // Truncate
430  template<class T>
431  inline err stream<T>::truncate(stream_offset_type offset) {
432  try {
433  m_stream.truncate(offset);
434  } catch(const stream_exception & e) {
435  TP_LOG_WARNING_ID("BTE error - truncate failed: " << e.what());
436  return BTE_ERROR;
437  }
438  return NO_ERROR;
439  }
440 
441 template<class T>
442 memory_size_type stream<T>::memory_usage(memory_size_type count) {
443  return count*(file_stream<T>::memory_usage(block_factor()) + sizeof(stream<T>));
444 }
445 
446 
447 // Query memory usage
448  template<class T>
449  err stream<T>::main_memory_usage(memory_size_type *usage,
450  stream_usage usage_type) const {
451 
452  switch (usage_type) {
454  *usage = sizeof(*this) + file_stream<T>::memory_usage(0.0);
455  return NO_ERROR;
459  *usage = memory_usage(1);
460  return NO_ERROR;
461  case STREAM_USAGE_BUFFER:
462  *usage = file_stream<T>::memory_usage(block_factor()) - file_stream<T>::memory_usage(0.0);
463  return NO_ERROR;
464  }
465  return BTE_ERROR;
466  }
467 
468  template<class T>
469  inline err stream<T>::read_item(T **elt) {
470  if (!m_stream.can_read())
471  return END_OF_STREAM;
472 
473  *elt = &(const_cast<T &>(m_stream.read()));
474  return NO_ERROR;
475  }
476 
477  template<class T>
478  inline err stream<T>::write_item(const T &elt) {
479  m_stream.write(elt);
480  return NO_ERROR;
481  }
482 
483  template<class T>
484  err stream<T>::read_array(T *mm_space, stream_offset_type *len) {
485  size_type l=(size_t)*len;
486  err e = read_array(mm_space, l);
487  *len = l;
488  return e;
489  }
490 
491  template<class T>
492  err stream<T>::read_array(T *mm_space, memory_size_type & len) {
493  size_type l = static_cast<size_type>(std::min(
494  static_cast<stream_size_type>(len),
495  static_cast<stream_size_type>(m_stream.size() - m_stream.offset())));
496  m_stream.read(mm_space, mm_space+l);
497  return (l == len)?NO_ERROR:END_OF_STREAM;
498  }
499 
500  template<class T>
501  err stream<T>::write_array(const T *mm_space, size_type len) {
502  m_stream.write(mm_space, mm_space+len);
503  return NO_ERROR;
504  }
505 
506  template<class T>
507  std::string& stream<T>::sprint() {
508  static std::string buf;
509  std::stringstream ss;
510  ss << "STREAM " << name() << " " << static_cast<long>(stream_len());
511  ss >> buf;
512  return buf;
513  }
514 
515  } // ami namespace
516 
517 } // tpie namespace
518 
519 #endif // _TPIE_AMI_STREAM_H
Defines the tp_assert macro.
err read_array(T *mm_space, stream_offset_type *len)
Reads *len items from the current position of the stream into the array mm_array. ...
Definition: stream.h:484
err write_item(const T &elt)
Writes elt to the stream in the current position.
Definition: stream.h:478
Stream is valid.
Definition: stream.h:62
static memory_size_type memory_usage(memory_size_type count)
Returns the number of bytes that count streams will maximaly consume.
Definition: stream.h:442
Amount currently in use.
Definition: stream_usage.h:34
Maximum additional amount used by each substream created.
Definition: stream_usage.h:38
bool operator!() const
Returns true if the block's status is not BLOCK_STATUS_VALID.
Definition: stream.h:168
PERSIST_DELETE
Delete the stream from the disk when it is destructed.
Definition: persist.h:39
Max amount that will ever be used.
Definition: stream_usage.h:36
std::string name() const
Returns the path name of this stream in newly allocated space.
Definition: stream.h:413
stream_offset_type tell() const
Returns the current position in the stream measured of items from the beginning of the stream...
Definition: stream.h:231
persistence persist() const
Set the stram's persistence flag to PERSIST_PERSISTENT, thereby ensuring it is not deleted when destr...
Definition: stream.h:317
void unused(const T &x)
Declare that a variable is unused on purpose.
Definition: util.h:42
err new_substream(stream_type st, stream_offset_type sub_begin, stream_offset_type sub_end, stream< T > **sub_stream)
A substream is a TPIE stream that is part of another TPIE stream.
Definition: stream.h:399
No error occurred.
Definition: err.h:47
This file contains a few deprecated definitions for legacy code.
Open a file for reading.
Definition: access_type.h:31
Count the number of open files.
Overhead of the object without the buffer.
Definition: stream_usage.h:30
std::string & sprint()
Return a string describing the stream.
Definition: stream.h:507
Stream is invalid.
Definition: stream.h:64
void set_persistent(bool p)
Set persistence.
Definition: tempname.h:168
Logging functionality and log_level codes for different priorities of log messages.
err read_item(T **elt)
Reads the current item from the stream and advance the "current item" pointer to the next item...
Definition: stream.h:469
stream()
A new stream of type READ_WRITE_STREAM is constructed on the given device as a file with a randomly g...
Definition: stream.h:366
file_stream< T > & underlying_stream()
Get the underlying file_stream
Definition: stream.h:330
err truncate(stream_offset_type offset)
Resize the stream to off items.
Definition: stream.h:431
void persist(persistence p)
Set the stream's persistence flag to p, which can have one of two values: PERSIST_DELETE or PERSIST_P...
Definition: stream.h:309
err seek(stream_offset_type offset)
Move the current position to off (measured in terms of items.
Definition: stream.h:419
stream_offset_type stream_len(void) const
Returns the number of items in the stream.
Definition: stream.h:213
stream_status
AMI stream status.
Definition: stream.h:60
stream_usage enum
const std::string & path()
Get the path of the associated file.
Class representing the existence of a temporary file.
Definition: tempname.h:152
size_t available_streams(void)
Returns the number of globally available streams.
Definition: stream.h:293
Legacy AMI error types.
Persistence tags for deprecated TPIE AMI streams.
err main_memory_usage(size_type *usage, stream_usage usage_type) const
This function is used for obtaining the amount of main memory used by an Stream object (in bytes)...
Definition: stream.h:449
stream_type
AMI stream types passed to constructors.
Definition: stream.h:52
A Stream object stores an ordered collection of objects of type T on external memory.
Definition: stream.h:99
memory_size_type block_size() const
Get the size of a block in bytes.
memory_size_type available_files()
Return the additional number of files that can be opened before running out of file descriptors...
PERSIST_PERSISTENT
Do not delete the stream from the disk when it is destructed.
Definition: persist.h:41
static memory_size_type memory_usage(float blockFactor=1.0, bool includeDefaultFileAccessor=true)
Calculate the amount of memory used by a single file_stream.
Definition: file_stream.h:134
bool is_persistent() const
Definition: tempname.h:162
Simple class acting both as file and a file::stream.
Definition: file_stream.h:44
err
Legacy TPIE error codes.
Definition: err.h:45
An error occurred at the BTE level.
Definition: err.h:63
bool is_valid() const
Returns wether the status of the stream is STREAM_STATUS_VALID.
Definition: stream.h:160
Temporary file names.
stream_status status() const
Returns the status of the stream instance; the result is either STREAM_STATUS_VALID or STREAM_STATUS_...
Definition: stream.h:152
memory_size_type chunk_size(void) const
Returns the maximum number of items (of type T) that can be stored in one block.
Definition: stream.h:301
stream_usage
Definition: stream_usage.h:28
Simple class acting both as a tpie::file and a tpie::file::stream.
An attempt was made to read past the end of a stream or write past the end of a substream.
Definition: err.h:52
Max amount ever used by a buffer.
Definition: stream_usage.h:32
Open a file for reading or writing.
Definition: access_type.h:35
err write_array(const T *mm_space, memory_size_type len)
Writes len items from array |mm_array to the stream, starting in the current position.
Definition: stream.h:501