FlashGraph-ng
A new frontier in large-scale graph analysis and data mining
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Pages
sorter.h
1 #ifndef __MY_SORTER_H__
2 #define __MY_SORTER_H__
3 
4 /*
5  * Copyright 2014 Open Connectome Project (http://openconnecto.me)
6  * Written by Da Zheng (zhengda1936@gmail.com)
7  *
8  * This file is part of FlashMatrix.
9  *
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  * http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  */
22 
#include <assert.h>
#include <sys/types.h>
#include <memory>
#include <utility>
#include <vector>
#if defined(_OPENMP)
#include <omp.h>
#include <parallel/algorithm>
#else
#include <algorithm>
#endif
30 
31 namespace fm
32 {
33 
/*
 * Type-erased sorting/merging interface. All buffers are passed as raw bytes
 * (`char *`); an implementation such as type_sorter<T> reinterprets them as
 * arrays of its own element type, and the `num`/`out_num` arguments are
 * element counts for that type.
 */
class sorter
{
public:
	// Sorters are used through base-class pointers; a virtual destructor
	// makes destroying a derived sorter through `sorter *` well-defined.
	virtual ~sorter() {
	}

	// Test whether `num` elements of `data` are ordered (decreasing order if
	// `decreasing` is true, increasing order otherwise).
	virtual bool is_sorted(const char *data, size_t num, bool decreasing) const = 0;
	// Sort `num` elements of `data` in place and report in `offs` the
	// original position of each element in the sorted result.
	virtual void sort_with_index(char *data, off_t *offs, size_t num,
			bool decreasing) const = 0;
	// Sort `num` elements of `data` in place.
	virtual void sort(char *data, size_t num, bool decreasing) const = 0;
	// Like sort(), but intended to run on the calling thread only.
	virtual void serial_sort(char *data, size_t num, bool decreasing) const = 0;
	// Merge sorted arrays, each given as a [begin, end) byte-address pair,
	// into `output`.
	virtual void merge(
			const std::vector<std::pair<const char *, const char *> > &arrs,
			char *output, size_t out_num) const = 0;
	// Assemble `output` from `arrs` following a precomputed merge order of
	// (array index, offset in array) pairs.
	virtual void merge(
			const std::vector<std::pair<const char *, const char *> > &arrs,
			const std::vector<std::pair<int, off_t> > &merge_index,
			char *output, size_t out_num) const = 0;
	// Merge sorted arrays into `output` and record in `merge_index` the
	// (array index, offset in array) origin of every output element.
	virtual void merge_with_index(
			const std::vector<std::pair<const char *, const char *> > &arrs,
			char *output, size_t out_num,
			std::vector<std::pair<int, off_t> > &merge_index) const = 0;
};
54 
55 template<class T>
56 class type_sorter: public sorter
57 {
58  struct {
59  bool operator()(const T &e1, const T &e2) const {
60  return e1 < e2;
61  }
62  } entry_less;
63  struct {
64  bool operator()(const T &e1, const T &e2) const {
65  return e1 > e2;
66  }
67  } entry_greater;
68 
69 public:
70  virtual bool is_sorted(const char *data, size_t num, bool decreasing) const;
71  virtual void sort_with_index(char *data, off_t *offs, size_t num,
72  bool decreasing) const;
73  virtual void sort(char *data, size_t num, bool decreasing) const;
74  virtual void serial_sort(char *data, size_t num, bool decreasing) const;
75  virtual void merge(
76  const std::vector<std::pair<const char *, const char *> > &arrs,
77  char *output, size_t out_num) const;
78  virtual void merge(
79  const std::vector<std::pair<const char *, const char *> > &arrs,
80  const std::vector<std::pair<int, off_t> > &merge_index,
81  char *output, size_t out_num) const;
82  virtual void merge_with_index(
83  const std::vector<std::pair<const char *, const char *> > &arrs,
84  char *output, size_t out_num,
85  std::vector<std::pair<int, off_t> > &merge_index) const;
86 };
87 
88 template<class T>
89 bool type_sorter<T>::is_sorted(const char *data1, size_t num, bool decreasing) const
90 {
91  T *data = (T *) data1;
92  if (decreasing)
93  return std::is_sorted(data, data + num, entry_greater);
94  else
95  return std::is_sorted(data, data + num, entry_less);
96 }
97 
98 template<class T>
99 void type_sorter<T>::sort_with_index(char *data1, off_t *offs, size_t num,
100  bool decreasing) const
101 {
102  T *data = (T *) data1;
103  struct indexed_entry {
104  T val;
105  off_t idx;
106  };
107 
108  std::unique_ptr<indexed_entry[]> entries
109  = std::unique_ptr<indexed_entry[]>(new indexed_entry[num]);
110 #pragma omp parallel for
111  for (size_t i = 0; i < num; i++) {
112  entries[i].val = data[i];
113  entries[i].idx = i;
114  }
115 
116  struct {
117  bool operator()(const indexed_entry &e1, const indexed_entry &e2) const {
118  return e1.val < e2.val;
119  }
120  } entry_less;
121  struct {
122  bool operator()(const indexed_entry &e1, const indexed_entry &e2) const {
123  return e1.val > e2.val;
124  }
125  } entry_greater;
126 
127  indexed_entry *start = entries.get();
128  indexed_entry *end = start + num;
129 #if defined(_OPENMP)
130  if (decreasing)
131  __gnu_parallel::sort(start, end, entry_greater);
132  else
133  __gnu_parallel::sort(start, end, entry_less);
134 #else
135  if (decreasing)
136  std::sort(start, end, entry_greater);
137  else
138  std::sort(start, end, entry_less);
139 #endif
140 #pragma omp parallel for
141  for (size_t i = 0; i < num; i++) {
142  data[i] = start[i].val;
143  offs[i] = start[i].idx;
144  }
145 }
146 
147 template<class T>
148 void type_sorter<T>::sort(char *data1, size_t num, bool decreasing) const
149 {
150  T *data = (T *) data1;
151  T *start = (T *) data;
152  T *end = start + num;
153 #if defined(_OPENMP)
154  if (decreasing)
155  __gnu_parallel::sort(start, end, entry_greater);
156  else
157  __gnu_parallel::sort(start, end, entry_less);
158 #else
159  if (decreasing)
160  std::sort(start, end, entry_greater);
161  else
162  std::sort(start, end, entry_less);
163 #endif
164 }
165 
166 template<class T>
167 void type_sorter<T>::serial_sort(char *data1, size_t num, bool decreasing) const
168 {
169  T *data = (T *) data1;
170  T *start = (T *) data;
171  T *end = start + num;
172  if (decreasing)
173  std::sort(start, end, entry_greater);
174  else
175  std::sort(start, end, entry_less);
176 }
177 
178 template<class T>
179 void type_sorter<T>::merge(
180  const std::vector<std::pair<const char *, const char *> > &raw_arrs,
181  char *output, size_t out_num) const
182 {
183  std::vector<std::pair<T *, T *> > arrs(raw_arrs.size());
184  for (size_t i = 0; i < arrs.size(); i++)
185  arrs[i] = std::pair<T *, T *>((T *) raw_arrs[i].first,
186  (T *) raw_arrs[i].second);
187  __gnu_parallel::multiway_merge(arrs.begin(), arrs.end(), (T *) output,
188  out_num, entry_less);
189 }
190 
191 /*
192  * Merge multiple arrays according to the specified locations.
193  * Here I assume there are a few number of arrays to merge.
194  */
195 template<class T>
196 void type_sorter<T>::merge(
197  const std::vector<std::pair<const char *, const char *> > &arrs,
198  const std::vector<std::pair<int, off_t> > &merge_index,
199  char *output, size_t out_num) const
200 {
201  T *t_output = (T *) output;
202 #pragma omp parallel for
203  for (size_t i = 0; i < out_num; i++) {
204  int arr_idx = merge_index[i].first;
205  off_t off_in_arr = merge_index[i].second;
206  const T *t_arr = (const T *) arrs[arr_idx].first;
207  assert(&t_arr[off_in_arr] <= (const T *) arrs[arr_idx].second);
208  t_output[i] = t_arr[off_in_arr];
209  }
210 }
211 
/*
 * Return the number of T elements in the byte range described by `arr`,
 * where `arr.first` is the beginning of the array and `arr.second` points one
 * past its end. Expects `arr.first <= arr.second`; a trailing partial element
 * is not counted.
 */
template<class T>
size_t get_length(const std::pair<const char *, const char *> &arr)
{
	const size_t num_bytes = arr.second - arr.first;
	return num_bytes / sizeof(T);
}
221 
222 /*
223  * Merge multiple arrays and return the merged result as well as how
224  * the arrays are merged.
225  * Here I assume there are a few number of arrays to merge.
226  */
227 template<class T>
228 void type_sorter<T>::merge_with_index(
229  const std::vector<std::pair<const char *, const char *> > &arrs,
230  char *output, size_t out_num,
231  std::vector<std::pair<int, off_t> > &merge_index) const
232 {
233  struct indexed_entry {
234  T val;
235  int arr_idx;
236  off_t off_in_arr;
237  };
238  std::unique_ptr<indexed_entry[]> buf(new indexed_entry[out_num]);
239  // Move data from `arrs' to `buf' in parallel.
240 #pragma omp parallel
241  {
242  size_t avg_part_len = ceil(((double) out_num) / omp_get_num_threads());
243  size_t thread_id = omp_get_thread_num();
244  size_t start = thread_id * avg_part_len;
245  if (out_num > start) {
246  size_t part_len = std::min(out_num - start, avg_part_len);
247 
248  // Find the first array for the current thread.
249  size_t curr_arr_idx = 0;
250  size_t i = 0;
251  while (true) {
252  // If the array is empty, it works fine.
253  size_t num_eles = get_length<T>(arrs[curr_arr_idx]);
254  if (i + num_eles > start)
255  break;
256  i += num_eles;
257  curr_arr_idx++;
258  }
259  assert(start >= i);
260  off_t curr_off_in_arr = start - i;
261  assert(get_length<T>(arrs[curr_arr_idx]) > 0);
262  assert(arrs[curr_arr_idx].first <= arrs[curr_arr_idx].second);
263  for (size_t i = 0; i < part_len; i++) {
264  const T *curr_ptr
265  = ((const T *) arrs[curr_arr_idx].first) + curr_off_in_arr;
266  assert(curr_ptr < (const T *) arrs[curr_arr_idx].second);
267  buf[start + i].val = *curr_ptr;
268  buf[start + i].arr_idx = curr_arr_idx;
269  buf[start + i].off_in_arr = curr_off_in_arr;
270  // If the current pointer points to the last element in the array,
271  // switch to the next array.
272  if (curr_ptr == ((const T *) arrs[curr_arr_idx].second) - 1) {
273  curr_arr_idx++;
274  // We need to skip the empty arrays.
275  while (curr_arr_idx < arrs.size()
276  && get_length<T>(arrs[curr_arr_idx]) == 0)
277  curr_arr_idx++;
278  if (curr_arr_idx < arrs.size())
279  assert(arrs[curr_arr_idx].first <= arrs[curr_arr_idx].second);
280  curr_off_in_arr = 0;
281  if (i + 1 < part_len)
282  assert(curr_arr_idx < arrs.size());
283  }
284  else
285  curr_off_in_arr++;
286  }
287  }
288  }
289 
290  std::vector<std::pair<indexed_entry *, indexed_entry *> > indexed_arrs(
291  arrs.size());
292  size_t off = 0;
293  for (size_t i = 0; i < arrs.size(); i++) {
294  size_t len = get_length<T>(arrs[i]);
295  indexed_arrs[i] = std::pair<indexed_entry *, indexed_entry *>(
296  &buf[off], &buf[off + len]);
297  off += len;
298  }
299  assert(off == out_num);
300 
301  struct {
302  bool operator()(const indexed_entry &e1, const indexed_entry &e2) const {
303  return e1.val < e2.val;
304  }
305  } entry_less;
306  std::unique_ptr<indexed_entry[]> merge_res(new indexed_entry[out_num]);
307  __gnu_parallel::multiway_merge(indexed_arrs.begin(), indexed_arrs.end(),
308  merge_res.get(), out_num, entry_less);
309  T *t_output = (T *) output;
310  for (size_t i = 0; i < out_num; i++) {
311  t_output[i] = merge_res[i].val;
312  merge_index[i].first = merge_res[i].arr_idx;
313  merge_index[i].second = merge_res[i].off_in_arr;
314  }
315 }
316 
317 }
318 
319 #endif