TPIE

v1.1rc1-6-g0c97303
merge.h
Go to the documentation of this file.
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; c-file-style: "stroustrup"; -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2008, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef _TPIE_AMI_MERGE_H
21 #define _TPIE_AMI_MERGE_H
22 
27 
28 
29 // Get definitions for working with Unix and Windows
30 #include <tpie/portability.h>
31 
32 // For log() and such as needed to compute tree heights.
33 #include <cmath>
34 
35 #include <tpie/stream.h>
36 
37 #include <tpie/memory.h>
38 #include <tpie/tpie_assert.h>
39 
40 namespace tpie {
41 
42  namespace ami {
43 
46  MERGE_OUTPUT_OVERWRITE = 1,
47  MERGE_OUTPUT_APPEND
48  };
49 
50  } // ami namespace
51 
52 } // tpie namespace
53 
54 namespace tpie {
55 
57  typedef int merge_flag;
59  typedef TPIE_OS_SIZE_T arity_t;
60 
61  namespace ami {
62 
63  using tpie::merge_flag;
64  using tpie::arity_t;
65 
66 #define CONST const
67 
68 
69  template<class T> class merge_base;
70 
71 
79  template<class T, class M>
80  err merge(stream<T> **instreams, arity_t arity,
81  stream<T> *outstream, M *m_obj);
82 
83 
101 
102 
130  template<class T, class M>
132  stream<T> *outstream, M *m_obj);
133 
138  template<class T, class M>
139  err single_merge(stream<T> **instreams, arity_t arity,
140  stream<T> *outstream, M *m_obj);
141 
142 
148  template<class T, class M>
149  err main_mem_merge(stream<T> *instream,
150  stream<T> *outstream, M *m_obj);
151 
152 
153 
154 
155 
156 
157 
/// Superclass for merge management objects.
///
/// When VIRTUAL_BASE is non-zero, this class declares as pure virtual
/// functions the members that merge(), single_merge(), main_mem_merge()
/// and partition_and_merge() call on a merge management object. Otherwise
/// the class is empty, and the merge functions simply require their
/// template parameter M to provide those members.
template<class T>
class merge_base {
public:

#if VIRTUAL_BASE
	// A class with pure virtual members is meant to be used through base
	// pointers. A virtual destructor makes deleting a derived merge object
	// through a merge_base<T>* well-defined.
	virtual ~merge_base() {}

	/// Called once by single_merge() with the first item of every input
	/// stream (NULL for an empty stream).
	virtual err initialize(arity_t arity,
						   CONST T * CONST * in,
						   merge_flag *taken_flags,
						   int &taken_index) = 0;

	/// Called repeatedly by single_merge() until it returns MERGE_DONE;
	/// writes an output item to *out when it returns MERGE_OUTPUT.
	virtual err operate(CONST T * CONST *in,
						merge_flag *taken_flags,
						int &taken_index,
						T *out) = 0;

	/// Processes len items held in main memory.
	virtual err main_mem_operate(T* mm_stream, TPIE_OS_SIZE_T len) = 0;

	/// Fixed main memory overhead of the merge object.
	virtual TPIE_OS_SIZE_T space_usage_overhead(void) = 0;

	/// Additional main memory used by the merge object per input stream.
	virtual TPIE_OS_SIZE_T space_usage_per_stream(void) = 0;
#endif // VIRTUAL_BASE

};
247 
248 
249 
250 
251  template<class T, class M>
252  err
253  merge(stream<T> **instreams, arity_t arity,
254  stream<T> *outstream, M *m_obj) {
255 
256  TPIE_OS_SIZE_T sz_avail;
257  TPIE_OS_OFFSET sz_stream, sz_needed = 0;
258 
259  // How much main memory is available?
260  sz_avail = consecutive_memory_available ();
261 
262  // Iterate through the streams, finding out how much additional
263  // memory each stream will need in the worst case (the streams are
264  // in memory, but their memory usage could be smaller then the
265  // maximum one; one scenario is when the streams have been loaded
266  // from disk with no subsequent read_item/write_item operation, in
267  // which case their current memory usage is just the header block);
268  // count also the output stream
269  for (unsigned int ii = 0; ii < arity + 1; ii++) {
270  instreams[ii]->main_memory_usage(&sz_stream, STREAM_USAGE_MAXIMUM);
271  sz_needed += sz_stream;
272  instreams[ii]->main_memory_usage(&sz_stream, STREAM_USAGE_CURRENT);
273  sz_needed -= sz_stream;
274  }
275 
276  //count the space used by the merge_management object (include
277  //overhead added to a stream)
278  sz_needed += m_obj->space_usage_overhead() +
279  arity * m_obj->space_usage_per_stream();
280 
281  //streams and m_obj must fit in memory!
282  if (sz_needed >= static_cast<TPIE_OS_OFFSET>(sz_avail)) {
283  TP_LOG_WARNING("Insufficient main memory to perform a merge.\n");
285  }
286  assert(sz_needed < sz_avail);
287 
288  //merge streams in memory
289  return single_merge(instreams, arity, outstream, m_obj);
290  };
291 
292 
293 
294 
295 
296  template<class T, class M>
297  err
298  single_merge(stream<T> **instreams, arity_t arity,
299  stream<T> *outstream, M *m_obj) {
300 
301  TPIE_OS_SIZE_T ii;
302  err ami_err;
303 
304  // Create an array of pointers for the input.
305  T* *in_objects = tpie_new_array<T*>(arity);
306 
307  // Create an array of flags the merge object can use to ask for more
308  // input from specific streams.
309  merge_flag* taken_flags = tpie_new_array<merge_flag>(arity);
310 
311  // An index to speed things up when the merge object takes only from
312  // one index.
313  int taken_index;
314 
315  //Output of the merge object.
316  T merge_out;
317 
318 #if DEBUG_PERFECT_MERGE
319  unsigned int input_count = 0, output_count = 0;
320 #endif
321 
322  // Rewind and read the first item from every stream; count the
323  // number of non-null items read
324  for (ii = arity; ii--; ) {
325  if ((ami_err = instreams[ii]->seek(0)) != NO_ERROR) {
326  tpie_delete_array(in_objects, arity);
327  tpie_delete_array(taken_flags, arity);
328  return ami_err;
329  }
330  if ((ami_err = instreams[ii]->read_item(&(in_objects[ii]))) !=
331  NO_ERROR) {
332  //error on read
333  if (ami_err == END_OF_STREAM) {
334  in_objects[ii] = NULL;
335  } else {
336  tpie_delete_array(in_objects, arity);
337  tpie_delete_array(taken_flags, arity);
338  return ami_err;
339  }
340  // Set the taken flags to 0 before we call intialize()
341  taken_flags[ii] = 0;
342  } else {
343  //item read succesfully
344 #if DEBUG_PERFECT_MERGE
345  input_count++;
346 #endif
347  }
348  }
349 
350  // Initialize the merge object.
351  if (((ami_err = m_obj->initialize(arity, in_objects, taken_flags,
352  taken_index)) != NO_ERROR) &&
353  (ami_err != MERGE_READ_MULTIPLE)) {
354  return OBJECT_INITIALIZATION;
355  }
356 
357 
358  // Now simply call the merge object repeatedly until it claims to
359  // be done or generates an error.
360  while (1) {
361  if (ami_err == MERGE_READ_MULTIPLE) {
362  for (ii = arity; ii--; ) {
363  if (taken_flags[ii]) {
364  ami_err = instreams[ii]->read_item(&(in_objects[ii]));
365  if (ami_err != NO_ERROR) {
366  if (ami_err == END_OF_STREAM) {
367  in_objects[ii] = NULL;
368  } else {
369  tpie_delete_array(in_objects, arity);
370  tpie_delete_array(taken_flags, arity);
371  return ami_err;
372  }
373  } else {
374 #if DEBUG_PERFECT_MERGE
375  input_count++;
376 #endif
377  }
378  }
379  // Clear all flags before operate is called.
380  taken_flags[ii] = 0;
381  }
382  } else {
383  // The last call took at most one item.
384  if (taken_index >= 0) {
385  ami_err = instreams[taken_index]->
386  read_item(&(in_objects[taken_index]));
387  if (ami_err != NO_ERROR) {
388  if (ami_err == END_OF_STREAM) {
389  in_objects[taken_index] = NULL;
390  } else {
391  tpie_delete_array(in_objects, arity);
392  tpie_delete_array(taken_flags, arity);
393  return ami_err;
394  }
395  } else {
396 #if DEBUG_PERFECT_MERGE
397  input_count++;
398 #endif
399  }
400  taken_flags[taken_index] = 0;
401  }
402  }
403  ami_err = m_obj->operate(in_objects, taken_flags, taken_index,
404  &merge_out);
405  if (ami_err == MERGE_DONE) {
406  break;
407  } else if (ami_err == MERGE_OUTPUT) {
408 #if DEBUG_PERFECT_MERGE
409  output_count++;
410 #endif
411  if ((ami_err = outstream->write_item(merge_out)) !=
412  NO_ERROR) {
413  tpie_delete_array(in_objects, arity);
414  tpie_delete_array(taken_flags, arity);
415  return ami_err;
416  }
417  } else if ((ami_err != MERGE_CONTINUE) &&
418  (ami_err != MERGE_READ_MULTIPLE)) {
419  tpie_delete_array(in_objects, arity);
420  tpie_delete_array(taken_flags, arity);
421  return ami_err;
422  }
423  }
424 
425 #if DEBUG_PERFECT_MERGE
426  tp_assert(input_count == output_count,
427  "Merge done, input_count = " << input_count <<
428  ", output_count = " << output_count << '.');
429 #endif
430 
431  tpie_delete_array(in_objects, arity);
432  tpie_delete_array(taken_flags, arity);
433 
434  return NO_ERROR;
435  };
436 
437 
438 
439 
440 
441  template<class T, class M>
443  stream<T> *outstream, M *m_obj) {
444 
445  err ae;
446  TPIE_OS_OFFSET len;
447  TPIE_OS_SIZE_T sz_avail;
448 
449  // How much memory is available?
450  sz_avail = consecutive_memory_available ();
451 
452  len = instream->stream_len();
453  if ((len * static_cast<TPIE_OS_OFFSET>(sizeof(T))) <= static_cast<TPIE_OS_OFFSET>(sz_avail)) {
454 
455  // If the whole input can fit in main memory just call
456  // m_obj->main_mem_operate
457 
458  ae = instream->seek(0);
459  assert(ae == NO_ERROR);
460 
461  // This code is sloppy and has to be rewritten correctly for
462  // parallel buffer allocation. It will not work with anything
463  // other than a registration based memory manager.
464  T *mm_stream;
465  TPIE_OS_OFFSET len1;
466  //allocate and read input stream in memory we know it fits, so we may cast.
467  mm_stream = tpie_new_array<T>(static_cast<TPIE_OS_SIZE_T>(len));
468 
469  len1 = len;
470  if ((ae = instream->read_array(mm_stream, &len1)) !=
471  NO_ERROR) {
472  return ae;
473  }
474  tp_assert(len1 == len, "Did not read the right amount; "
475  "Allocated space for " << len << ", read " << len1 << '.');
476 
477  //just call m_obj->main_mem_operate. We know that len items fit into
478  //main memory, so we may cast to TPIE_OS_SIZE_T
479  if ((ae = m_obj->main_mem_operate(mm_stream,
480  static_cast<TPIE_OS_SIZE_T>(len))) !=
481  NO_ERROR) {
482  TP_LOG_WARNING_ID("main_mem_operate failed");
483  return ae;
484  }
485 
486  //write array back to stream
487  if ((ae = outstream->write_array(mm_stream,
488  static_cast<TPIE_OS_SIZE_T>(len))) !=
489  NO_ERROR) {
490  TP_LOG_WARNING_ID("write array failed");
491  return ae;
492  }
493 
494  tpie_delete_array(mm_stream, static_cast<TPIE_OS_SIZE_T>(len));
495  return NO_ERROR;
496 
497  } else {
498 
499  // Something went wrong. We should not have called this
500  // function, since we don't have enough main memory.
501  TP_LOG_WARNING_ID("out of memory");
503  }
504  };
505 
506 
507  template<class T, class M>
509  stream<T> *outstream, M *m_obj) {
510 
511  err ae;
512  TPIE_OS_OFFSET len;
513  TPIE_OS_SIZE_T sz_avail, sz_stream;
514  unsigned int ii;
515  int jj;
516 
517  //How much memory is available?
518  sz_avail = consecutive_memory_available ();
519 
520  // If the whole input can fit in main memory then just call
521  // main_mem_merge() to deal with it by loading it once and
522  // processing it.
523  len = instream->stream_len();
524  if ((len * static_cast<TPIE_OS_OFFSET>(sizeof(T))) <= static_cast<TPIE_OS_OFFSET>(sz_avail)) {
525  return main_mem_merge(instream, outstream, m_obj);
526  }
527  //else {
528 
529 
530 
531  // The number of substreams that can be merged together at once; i
532  // this many substreams (at most) we are dividing the input stream
533  arity_t merge_arity;
534 
535  //nb of substreams the original input stream will be split into
536  arity_t nb_orig_substr;
537 
538  // length (nb obj of type T) of the original substreams of the input
539  // stream. The last one may be shorter than this.
540  TPIE_OS_OFFSET sz_orig_substr;
541 
542  // The initial temporary stream, to which substreams of the
543  // original input stream are written.
544  stream<T> *initial_tmp_stream;
545 
546  // A pointer to the buffer in main memory to read a memory load into.
547  T *mm_stream;
548 
549 
550  // Loop variables:
551 
552  // The stream being read at the current level.
553  stream<T> *current_input;
554 
555  // The output stream for the current level if it is not outstream.
556  stream<T> *intermediate_tmp_stream;
557 
558  // The size of substreams of *current_input that are being
559  // merged. The last one may be smaller. This value should be
560  // sz_orig_substr * (merge_arity ** k) where k is the
561  // number of iterations the loop has gone through.
562  TPIE_OS_OFFSET current_substream_len;
563 
564  // The exponenent used to verify that current_substream_len is
565  // correct.
566  unsigned int k;
567 
568  TPIE_OS_OFFSET sub_start, sub_end;
569 
570 
571 
572  // How many substreams will there be? The main memory
573  // available to us is the total amount available, minus what
574  // is needed for the input stream and the temporary stream.
575  if ((ae = instream->main_memory_usage(&sz_stream, STREAM_USAGE_MAXIMUM))
576  != NO_ERROR) {
577  return ae;
578  }
579  if (sz_avail <= 2 * sz_stream + sizeof(T)) {
581  }
582  sz_avail -= 2 * sz_stream;
583 
584 
585  // number of elements that will fit in memory (M) -R
586  sz_orig_substr = sz_avail / sizeof(T);
587 
588  // Round the original substream length off to an integral number of
589  // chunks. This is for systems like HP-UX that cannot map in
590  // overlapping regions. It is also required for BTE's that are
591  // capable of freeing chunks as they are read.
592  {
593  TPIE_OS_OFFSET sz_chunk_size = instream->chunk_size();
594 
595  sz_orig_substr = sz_chunk_size *
596  ((sz_orig_substr + sz_chunk_size - 1) /sz_chunk_size);
597  // WARNING sz_orig_substr now may not fit in memory!!! -R
598  }
599 
600  // number of memoryloads in input ceil(N/M) -R
601  nb_orig_substr = static_cast<arity_t>((len + sz_orig_substr - 1) /
602  sz_orig_substr);
603 
604  // Account for the space that a merge object will use.
605  {
606  TPIE_OS_SIZE_T sz_avail_during_merge = sz_avail - m_obj->space_usage_overhead();
607  TPIE_OS_SIZE_T sz_stream_during_merge = sz_stream +m_obj->space_usage_per_stream();
608 
609  merge_arity = static_cast<arity_t>((sz_avail_during_merge +
610  sz_stream_during_merge - 1) /
611  sz_stream_during_merge);
612  }
613 
614  // Make sure that the AMI is willing to provide us with the number
615  // of substreams we want. It may not be able to due to operating
616  // system restrictions, such as on the number of regions that can be
617  // mmap()ed in.
618  {
619  int ami_available_streams = instream->available_streams();
620 
621  if (ami_available_streams != -1) {
622  if (ami_available_streams <= 4) {
624  }
625  // safe to cast, since ami_available_streams > 4
626  if (merge_arity > static_cast<arity_t>(ami_available_streams) - 2) {
627  merge_arity = ami_available_streams - 2;
628  TP_LOG_DEBUG_ID("Reduced merge arity due to AMI restrictions.");
629  }
630  }
631  }
632  TP_LOG_DEBUG_ID("partition_and_merge(): merge arity = "
633  << static_cast<TPIE_OS_OUTPUT_SIZE_T>(merge_arity));
634  if (merge_arity < 2) {
636  }
637 
638 
639  //#define MINIMIZE_INITIAL_SUBSTREAM_LENGTH
640 #ifdef MINIMIZE_INITIAL_SUBSTREAM_LENGTH
641 
642  // Make the substreams as small as possible without increasing the
643  // height of the merge tree.
644  {
645  // The tree height is the ceiling of the log base merge_arity
646  // of the number of original substreams.
647 
648  double tree_height = log((double)nb_orig_substr)/ log((double)merge_arity);
649  tp_assert(tree_height > 0, "Negative or zero tree height!");
650 
651  tree_height = ceil(tree_height);
652 
653  // See how many substreams we could possibly fit in the tree
654  // without increasing the height.
655  double max_original_substreams = pow((double)merge_arity, tree_height);
656  tp_assert(max_original_substreams >= nb_orig_substr,
657  "Number of permitted substreams was reduced.");
658 
659  // How big will such substreams be?
660  double new_sz_original_substream = ceil((double)len /
661  max_original_substreams);
662  tp_assert(new_sz_original_substream <= sz_orig_substr,
663  "Size of original streams increased.");
664 
665  sz_orig_substr = (size_t)new_sz_original_substream;
666  TP_LOG_DEBUG_ID("Memory constraints set original substreams = " << nb_orig_substr);
667 
668  nb_orig_substr = (len + sz_orig_substr - 1) / sz_orig_substr;
669  TP_LOG_DEBUG_ID("Tree height constraints set original substreams = " << nb_orig_substr);
670  }
671 #endif // MINIMIZE_INITIAL_SUBSTREAM_LENGTH
672 
673 
674  // Create a temporary stream, then iterate through the substreams,
675  // processing each one and writing it to the corresponding substream
676  // of the temporary stream.
677  initial_tmp_stream = tpie_new<stream<T> >();
678  mm_stream = tpie_new_array<T>(static_cast<TPIE_OS_SIZE_T>(sz_orig_substr));
679 
680  instream->seek(0);
681  assert(ae == NO_ERROR);
682 
683  tp_assert(static_cast<TPIE_OS_OFFSET>(nb_orig_substr * sz_orig_substr - len) < sz_orig_substr,
684  "Total substream length too long or too many.");
685  tp_assert(len - static_cast<TPIE_OS_OFFSET>(nb_orig_substr - 1) * sz_orig_substr <= sz_orig_substr,
686  "Total substream length too short or too few.");
687 
688  for (ii = 0; ii++ < nb_orig_substr; ) {
689 
690  TPIE_OS_OFFSET mm_len;
691  if (ii == nb_orig_substr) {
692  mm_len = len % sz_orig_substr;
693  // If it is an exact multiple, then the mod will come out 0,
694  // which is wrong.
695  if (!mm_len) {
696  mm_len = sz_orig_substr;
697  }
698  } else {
699  mm_len = sz_orig_substr;
700  }
701 #ifndef TPIE_NDEBUG
702  TPIE_OS_OFFSET mm_len_bak = mm_len;
703 #endif
704 
705  // Read a memory load out of the input stream.
706  ae = instream->read_array(mm_stream, &mm_len);
707  if (ae != NO_ERROR) {
708  return ae;
709  }
710  tp_assert(mm_len == mm_len_bak,
711  "Did not read the requested number of objects." <<
712  "\n\tmm_len = " << mm_len <<
713  "\n\tmm_len_bak = " << mm_len_bak << '.');
714 
715  // Solve in main memory. We know it fits, so cast to TPIE_OS_SIZE_T
716  m_obj->main_mem_operate(mm_stream, static_cast<TPIE_OS_SIZE_T>(mm_len));
717 
718  // Write the result out to the temporary stream.
719  ae = initial_tmp_stream->write_array(mm_stream, static_cast<TPIE_OS_SIZE_T>(mm_len));
720  if (ae != NO_ERROR) {
721  return ae;
722  }
723  } //for
724  tpie_delete_array(mm_stream, static_cast<TPIE_OS_SIZE_T>(sz_orig_substr));
725 
726  // Make sure the total length of the temporary stream is the same as
727  // the total length of the original input stream.
728  tp_assert(instream->stream_len() == initial_tmp_stream->stream_len(),
729  "Stream lengths do not match:" <<
730  "\n\tinstream->stream_len() = " << instream->stream_len() <<
731  "\n\tinitial_tmp_stream->stream_len() = " <<
732  initial_tmp_stream->stream_len() << ".\n");
733 
734  // Set up the loop invariants for the first iteration of hte main
735  // loop.
736  current_input = initial_tmp_stream;
737  current_substream_len = sz_orig_substr;
738 
739  // Pointers to the substreams that will be merged.
740  stream<T>* *the_substreams = tpie_new_array<stream<T>* >(merge_arity);
741 
742  //Monitoring prints.
743  TP_LOG_DEBUG_ID("Number of runs from run formation is "
744  << static_cast<TPIE_OS_OUTPUT_SIZE_T>(nb_orig_substr));
745  TP_LOG_DEBUG_ID("Merge arity is "
746  << static_cast<TPIE_OS_OUTPUT_SIZE_T>(merge_arity));
747 
748 
749  k = 0;
750  // The main loop. At the outermost level we are looping over levels
751  // of the merge tree. Typically this will be very small, e.g. 1-3.
752  // CAVEAT: is the cast o.k.??
753  for( ; current_substream_len < len;
754  current_substream_len *= merge_arity) {
755 
756  // The number of substreams to be processed at this level.
757  arity_t substream_count;
758 
759  // Set up to process a given level.
760  tp_assert(len == current_input->stream_len(),
761  "Current level stream not same length as input." <<
762  "\n\tlen = " << len <<
763  "\n\tcurrent_input->stream_len() = " <<
764  current_input->stream_len() << ".\n");
765 
766  // Do we have enough main memory to merge all the substreams on
767  // the current level into the output stream? If so, then we will
768  // do so, if not then we need an additional level of iteration to
769  // process the substreams in groups.
770  substream_count = static_cast<arity_t>((len + current_substream_len - 1) /
771  current_substream_len);
772 
773  if (substream_count <= merge_arity) {
774 
775  TP_LOG_DEBUG_ID("Merging substreams directly to the output stream.");
776 
777  // Create all the substreams
778  for (sub_start = 0, ii = 0 ;
779  ii < substream_count;
780  sub_start += current_substream_len, ii++) {
781 
782  sub_end = sub_start + current_substream_len - 1;
783  if (sub_end >= len) {
784  sub_end = len - 1;
785  }
786  current_input->new_substream(READ_STREAM, sub_start, sub_end, the_substreams + ii);
787  // The substreams are read-once.
788  the_substreams[ii]->persist(PERSIST_READ_ONCE);
789  }
790 
791  tp_assert((sub_start >= len) &&
792  (sub_start < len + current_substream_len),
793  "Loop ended in wrong location.");
794 
795  // Fool the OS into unmapping the current block of the input
796  // stream so that blocks of the substreams can be mapped in
797  // without overlapping it. This is needed for correct execution
798  // on HP-UX.
799  //this needs to be cleaned up..Laura
800  current_input->seek(0);
801  assert(ae == NO_ERROR);
802 
803  // Merge them into the output stream.
804  ae = single_merge(the_substreams, substream_count, outstream, m_obj);
805  if (ae != NO_ERROR) {
806  return ae;
807  }
808  // Delete the substreams.
809  for (ii = 0; ii < substream_count; ii++) {
810  tpie_delete(the_substreams[ii]);
811  }
812  // And the current input, which is an intermediate stream of
813  // some kind.
814  tpie_delete(current_input);
815  } else {
816 
817  //substream_count is > merge_arity
818  TP_LOG_DEBUG_ID("Merging substreams to an intermediate stream.");
819 
820  // Create the next intermediate stream.
821  intermediate_tmp_stream = new stream<T>;
822 
823  // Fool the OS into unmapping the current block of the input
824  // stream so that blocks of the substreams can be mapped in
825  // without overlapping it. This is needed for correct execution
826  // on HU-UX.
827  //this needs to be cleaned up..Laura
828  current_input->seek(0);
829  assert(ae == NO_ERROR);
830 
831  // Loop through the substreams of the current stream, merging as
832  // many as we can at a time until all are done with.
833  for (sub_start = 0, ii = 0, jj = 0;
834  ii < substream_count;
835  sub_start += current_substream_len, ii++, jj++) {
836 
837  sub_end = sub_start + current_substream_len - 1;
838  if (sub_end >= len) {
839  sub_end = len - 1;
840  }
841  current_input->new_substream(READ_STREAM, sub_start, sub_end, the_substreams + jj);
842  // The substreams are read-once.
843  the_substreams[jj]->persist(PERSIST_READ_ONCE);
844 
845  // If we've got all we can handle or we've seen them all, then
846  // merge them.
847  if ((jj >= static_cast<int>(merge_arity) - 1) ||
848  (ii == substream_count - 1)) {
849 
850  tp_assert(jj <= static_cast<int>(merge_arity) - 1,
851  "Index got too large.");
852 #if DEBUG_ASSERTIONS
853  // Check the lengths before the merge.
854  TPIE_OS_OFFSET sz_output, sz_output_after_merge;
855  TPIE_OS_OFFSET sz_substream_total;
856 
857  {
858  unsigned int kk;
859 
860  sz_output = intermediate_tmp_stream->stream_len();
861  sz_substream_total = 0;
862 
863  for (kk = jj+1; kk--; ) {
864  sz_substream_total += the_substreams[kk]->stream_len();
865  }
866 
867  }
868 #endif
869 
870  // This should append to the stream, since
871  // single_merge() does not rewind the output before
872  // merging.
873  ae = single_merge(the_substreams, jj+1,
874  intermediate_tmp_stream, m_obj);
875  if (ae != NO_ERROR) {
876  return ae;
877  }
878 
879 #if DEBUG_ASSERTIONS
880  // Verify the total lengths after the merge.
881  sz_output_after_merge = intermediate_tmp_stream->stream_len();
882  tp_assert(sz_output_after_merge - sz_output ==
883  sz_substream_total,
884  "Stream lengths do not add up: " <<
885  sz_output_after_merge - sz_output <<
886  " written when " <<
887  sz_substream_total <<
888  " were to have been read.");
889 
890 #endif
891 
892  // Delete the substreams. jj is currently the index of the
893  // largest, so we want to bump it up before the idiomatic
894  // loop.
895  for (jj++; jj--; )
896  tpie_delete(the_substreams[jj]);
897 
898  // Now jj should be -1 so that it gets bumped back up to 0
899  // before the next iteration of the outer loop.
900  tp_assert((jj == -1), "Index not reduced to -1.");
901 
902  } // if
903  } //for
904 
905  // Get rid of the current input stream and use the next one.
906  tpie_delete(current_input);
907  current_input = intermediate_tmp_stream;
908  }
909 
910  k++;
911 
912  }
913 
914  //Monitoring prints.
915  TP_LOG_DEBUG_ID("Number of passes incl run formation is " << k+1);
916 
917  tpie_delete_array(the_substreams, merge_arity);
918  return NO_ERROR;
919 
920  }
921 
922  } // ami namespace
923 
924 } // tpie namespace
925 
926 
927 
928 #endif
Defines the tp_assert macro.
err read_array(T *mm_space, stream_offset_type *len)
Reads *len items from the current position of the stream into the array mm_space. ...
Definition: stream.h:484
err write_item(const T &elt)
Writes elt to the stream in the current position.
Definition: stream.h:478
size_t consecutive_memory_available(size_t granularity=5 *1024 *1024)
Find the largest amount of memory that can be allocated as a single chunk.
Amount currently in use.
Definition: stream_usage.h:34
A TPIE entry point was not able to properly initialize the operation management object that was pass...
Definition: err.h:69
Value returned by a merge_management_object, telling merge() to continue to call the operate() member...
Definition: err.h:116
Memory management subsystem.
Max amount that will ever be used.
Definition: stream_usage.h:36
No error occurred.
Definition: err.h:47
TPIE_OS_SIZE_T arity_t
Intended to signal the number of input streams in a merge.
int merge_flag
Intended to signal in a merge which of the input streams are non-empty.
Definition: merge.h:57
err partition_and_merge(stream< T > *instream, stream< T > *outstream, M *m_obj)
Partitions a stream into substreams small enough to fit.
Definition: merge.h:508
This file contains a few deprecated definitions for legacy code.
err single_merge(stream< T > **instreams, arity_t arity, stream< T > *outstream, M *m_obj)
Merges arity streams in memory using a merge management object and writes the result into outstream...
Definition: merge.h:298
err read_item(T **elt)
Reads the current item from the stream and advance the "current item" pointer to the next item...
Definition: stream.h:469
AMI streams.
void persist(persistence p)
Set the stream's persistence flag to p, which can have one of two values: PERSIST_DELETE or PERSIST_P...
Definition: stream.h:309
Value returned by a merge_management_object, signaling that the merge() completed.
Definition: err.h:112
err seek(stream_offset_type offset)
Move the current position to offset (measured in items).
Definition: stream.h:419
stream_offset_type stream_len(void) const
Returns the number of items in the stream.
Definition: stream.h:213
TPIE_OS_SIZE_T arity_t
Intended to signal the number of input streams in a merge.
Definition: merge.h:59
size_t available_streams(void)
Returns the number of globally available streams.
Definition: stream.h:293
err main_mem_merge(stream< T > *instream, stream< T > *outstream, M *m_obj)
Reads instream in memory and merges it using m_obj->main_mem_operate(); if instream does not fit in m...
Definition: merge.h:442
err main_memory_usage(size_type *usage, stream_usage usage_type) const
This function is used for obtaining the amount of main memory used by a Stream object (in bytes)...
Definition: stream.h:449
A Stream object stores an ordered collection of objects of type T on external memory.
Definition: stream.h:99
Superclass for merge management objects.
Definition: merge.h:69
err merge(stream< T > **instreams, arity_t arity, stream< T > *outstream, M *m_obj)
Merges arity streams using a merge management object and writes result into outstream.
Definition: merge.h:253
TPIE could not allocate enough intermediate streams to perform the requested operation.
Definition: err.h:86
err
Legacy TPIE error codes.
Definition: err.h:45
void tpie_delete(T *p)
Delete an object allocated with tpie_new.
Definition: memory.h:380
Value returned by a merge_management_object, telling merge() that more than one input object was con...
Definition: err.h:123
#define tp_assert(condition, message)
Definition: tpie_assert.h:47
The memory manager could not make adequate main memory available to complete the requested operation...
Definition: err.h:79
memory_size_type chunk_size(void) const
Returns the maximum number of items (of type T) that can be stored in one block.
Definition: stream.h:301
An attempt was made to read past the end of a stream or write past the end of a substream.
Definition: err.h:52
Value returned by a merge_management_object, signaling that the last merge() call generated output fo...
Definition: err.h:119
void tpie_delete_array(T *a, size_t size)
Delete an array allocated with tpie_new_array.
Definition: memory.h:398
merge_output_type
Definition: merge.h:45
err write_array(const T *mm_space, memory_size_type len)
Writes len items from the array mm_space to the stream, starting at the current position.
Definition: stream.h:501