TPIE

v1.1rc1-6-g0c97303
parallel_sort.h
Go to the documentation of this file.
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; c-file-style: "stroustrup"; -*-
2 // vi:set ts=4 sts=4 sw=4 noet cino+=(0 :
3 // Copyright 2011, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
24 
25 #ifndef __TPIE_PARALLEL_SORT_H__
26 #define __TPIE_PARALLEL_SORT_H__
27 
28 #include <algorithm>
29 #include <boost/bind.hpp>
30 #include <boost/cstdint.hpp>
31 #include <boost/iterator/iterator_traits.hpp>
32 #include <boost/thread/mutex.hpp>
33 #include <boost/thread/thread.hpp>
34 #include <cmath>
35 #include <functional>
37 #include <tpie/dummy_progress.h>
38 #include <tpie/internal_queue.h>
39 #include <tpie/job.h>
40 #include <tpie/config.h>
41 
42 namespace tpie {
43 
52 template <typename iterator_type, typename comp_type, bool Progress,
53  size_t min_size=1024*1024*8/sizeof(typename boost::iterator_value<iterator_type>::type)>
55 private:
57 
61  struct progress_t {
62  typename P::base * pi;
63  boost::uint64_t work_estimate;
64  boost::uint64_t total_work_estimate;
65  boost::condition_variable cond;
66  boost::mutex mutex;
67  };
68 
70  typedef typename boost::iterator_value<iterator_type>::type value_type;
71 
75  static inline boost::uint64_t sortWork(boost::uint64_t n) {
76  return static_cast<uint64_t>(
77  log(static_cast<double>(n)) * static_cast<double>(n) * 1.8
78  / log(static_cast<double>(2)));
79  }
80 
87  template <typename comp_t>
88  static inline iterator_type unguarded_partition(iterator_type first,
89  iterator_type last,
90  comp_t & comp) {
91  // Textbook partitioning.
92  iterator_type pivot = first;
93  while (true) {
94  do --last;
95  while (comp(*pivot, *last));
96 
97  do {
98  if (first == last) break;
99  ++first;
100  } while (comp(*first, *pivot));
101 
102  if (first == last) break;
103 
104  std::iter_swap(first, last);
105  }
106  std::iter_swap(last, pivot);
107  return last;
108  }
109 
117  static inline iterator_type median(iterator_type a, iterator_type b, iterator_type c, comp_type & comp) {
118  if (comp(*a, *b)) {
119  if (comp(*b, *c)) return b;
120  else if (comp(*a, *c)) return c;
121  else return a;
122  } else {
123  if (comp(*a, *c)) return a;
124  else if (comp(*b, *c)) return c;
125  else return b;
126  }
127  }
128 
137  static inline iterator_type pick_pivot(iterator_type a, iterator_type b, comp_type & comp) {
138  if (a == b) return a;
139  assert(a < b);
140 
141  // Since (b-a) is at least min_size, which is at least 100000 in
142  // realistic contexts, ((b-a)/8)*c is a good approximation of
143  // (c*(b-a))/8.
144  size_t step = (b-a)/8;
145 
146  return median(median(a+0, a+step, a+step*2, comp),
147  median(a+step*3, a+step*4, a+step*5, comp),
148  median(a+step*6, a+step*7, b-1, comp), comp);
149  }
150 
157  static inline iterator_type partition(iterator_type a, iterator_type b, comp_type & comp) {
158  iterator_type pivot = pick_pivot(a, b, comp);
159 
160  std::iter_swap(pivot, a);
161  iterator_type l = unguarded_partition(a, b, comp);
162 
163  return l;
164  }
165 
166 #ifdef DOXYGEN
167 public:
168 #endif
169  class qsort_job : public job {
173  public:
177  qsort_job(iterator_type a, iterator_type b, comp_type comp, qsort_job * parent, progress_t & p)
178  : a(a), b(b), comp(comp), parent(parent), progress(p) {
179 
180  // Does nothing.
181  }
182 
183  ~qsort_job() {
184  for (size_t i = 0; i < children.size(); ++i) {
185  delete children[i];
186  }
187  children.resize(0);
188  }
189 
195  virtual void operator()() {
196  assert(a <= b);
197  assert(&*a != 0);
198  while (static_cast<size_t>(b - a) >= min_size) {
199  iterator_type pivot = partition(a, b, comp);
200  add_progress(b - a);
201  //qsort_job * j = tpie_new<qsort_job>(a, pivot, comp, this);
202  qsort_job * j = new qsort_job(a, pivot, comp, this, progress);
203  j->enqueue(this);
204  children.push_back(j);
205  a = pivot+1;
206  }
207  std::sort(a, b, comp);
208  add_progress(sortWork(b - a));
209  }
210 
211  protected:
212  virtual void on_done() override {
213  // Unfortunately, it might not be safe to delete our children at
214  // this point, as other threads might in theory wait for them to
215  // .join(). It is safer to postpone deletion until our own
216  // deletion.
217  if (!parent) {
218  boost::mutex::scoped_lock lock(progress.mutex);
219  progress.work_estimate = progress.total_work_estimate;
220  progress.cond.notify_one();
221  }
222  }
223 
224  private:
225  iterator_type a;
226  iterator_type b;
227  comp_type comp;
228  qsort_job * parent;
229  progress_t & progress;
230 
231  std::vector<qsort_job *> children;
232 
233  void add_progress(boost::uint64_t amount) {
234  boost::mutex::scoped_lock lock(progress.mutex);
235  progress.work_estimate += amount;
236  progress.cond.notify_one();
237  }
238  };
239 public:
240  parallel_sort_impl(typename P::base * p) {
241  progress.pi = p;
242  }
243 
249  void operator()(iterator_type a, iterator_type b, comp_type comp=std::less<value_type>() ) {
250  progress.work_estimate = 0;
251  progress.total_work_estimate = sortWork(b-a);
252  if (progress.pi) progress.pi->init(progress.total_work_estimate);
253 
254  if (static_cast<size_t>(b - a) < min_size) {
255  std::sort(a, b, comp);
256  if (progress.pi) progress.pi->done();
257  return;
258  }
259 
260  qsort_job * master = new qsort_job(a, b, comp, 0, progress);
261  master->enqueue();
262 
263  boost::uint64_t prev_work_estimate = 0;
264  boost::mutex::scoped_lock lock(progress.mutex);
265  while (progress.work_estimate < progress.total_work_estimate) {
266  if (progress.pi && progress.work_estimate > prev_work_estimate) progress.pi->step(progress.work_estimate - prev_work_estimate);
267  prev_work_estimate = progress.work_estimate;
268  progress.cond.wait(lock);
269  }
270  lock.unlock();
271 
272  master->join();
273  delete master;
274  if (progress.pi) progress.pi->done();
275  }
276 private:
277  static const size_t max_job_count=256;
278  progress_t progress;
279  bool kill;
280  size_t working;
281 
282  std::pair<iterator_type, iterator_type> jobs[max_job_count];
283  size_t job_count;
284 };
285 
294 template <bool Progress, typename iterator_type, typename comp_type>
295 void parallel_sort(iterator_type a,
296  iterator_type b,
298  comp_type comp=std::less<typename boost::iterator_value<iterator_type>::type>()) {
299 #ifdef TPIE_PARALLEL_SORT
301  s(a,b,comp);
302 #else
303  pi.init(1);
304  std::sort(a,b,comp);
305  pi.done();
306 #endif
307 }
308 
316 template <typename iterator_type, typename comp_type>
317 void parallel_sort(iterator_type a,
318  iterator_type b,
319  comp_type comp=std::less<typename boost::iterator_value<iterator_type>::type>()) {
320 #ifdef TPIE_PARALLEL_SORT
322  s(a,b,comp);
323 #else
324  std::sort(a, b, comp);
325 #endif
326 }
327 
328 
329 }
330 #endif //__TPIE_PARALLEL_SORT_H__
The base class for indicating the progress of some task.
void parallel_sort(iterator_type a, iterator_type b, typename tpie::progress_types< Progress >::base &pi, comp_type comp=std::less< typename boost::iterator_value< iterator_type >::type >())
Sort items in the range [a,b) using a parallel quick sort.
void operator()(iterator_type a, iterator_type b, comp_type comp=std::less< value_type >())
Perform a parallel sort of the items in the interval [a,b).
virtual void done()
Advance the indicator to the end.
virtual void on_done() override
Called when this job and all subjobs are done.
Generic internal queue with known memory requirements.
A simple parallel sort implementation with progress tracking.
Definition: parallel_sort.h:54
Definition: job.h:33
Represents quick sort work at a given level.
Progress indicator base.
void join()
Wait for this job and its subjobs to complete.
virtual void operator()()
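Running a job with iterators a and b will repeatedly partition [a,b), spawn a job on the left part and continue with the right part until it is smaller than min_size, at which point the remainder is sorted with std::sort.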
Job class for job manager.
qsort_job(iterator_type a, iterator_type b, comp_type comp, qsort_job *parent, progress_t &p)
Construct a qsort_job.
For applications where you wish to disable progress indicators via a template parameter, refer to the progress_types members named sub, fp and base.
void sort(file_stream< T > &instream, file_stream< T > &outstream, Compare comp, progress_indicator_base &indicator)
Sort elements of a stream using the given STL-style comparator object.
Definition: sort.h:59
void enqueue(job *parent=0)
Add this job to the job pool.
Progress indicator concept in an efficient non-inheritance way.
virtual void init(stream_size_type range=0)
Initialize progress indicator.