FlashGraph-ng
A new frontier in large-scale graph analysis and data mining
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Pages
EM_vector.h
1 #ifndef __EM_VECTOR_H__
2 #define __EM_VECTOR_H__
3 
4 /*
5  * Copyright 2014 Open Connectome Project (http://openconnecto.me)
6  * Written by Da Zheng (zhengda1936@gmail.com)
7  *
8  * This file is part of FlashMatrix.
9  *
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  * http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  */
22 
#include <atomic>
#include <memory>
#include <queue>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "io_interface.h"

#include "vec_store.h"
#include "local_vec_store.h"
#include "mem_worker_thread.h"
#include "EM_object.h"
32 #include "EM_object.h"
33 
34 namespace fm
35 {
36 
37 namespace detail
38 {
39 
// Forward declarations used by the declarations that follow.
class matrix_store;
class EM_vec_store;

/*
 * Sort a group of external-memory vectors together.
 * The first vector in `vecs' is the one that gets sorted; every other vector
 * in `vecs' is shuffled accordingly.
 * The results are handed back as a vector of EM_vec_store pointers.
 */
std::vector<std::shared_ptr<EM_vec_store>> sort(
        const std::vector<std::shared_ptr<const EM_vec_store>> &vecs);
49 
50 class EM_vec_store: public vec_store, public EM_object
51 {
52  struct empty_free {
53  void operator()(EM_vec_store *) {
54  }
55  };
56 
57  file_holder::ptr holder;
58  io_set::ptr ios;
59 
60  EM_vec_store(size_t length, const scalar_type &type);
61  EM_vec_store(safs::file_io_factory::shared_ptr factory);
62  EM_vec_store(const EM_vec_store &store);
63 public:
64  typedef std::shared_ptr<EM_vec_store> ptr;
65  typedef std::shared_ptr<const EM_vec_store> const_ptr;
66 
67  static ptr cast(vec_store::ptr vec);
68  static const_ptr cast(vec_store::const_ptr vec);
69 
70  static ptr create(size_t length, const scalar_type &type) {
71  return ptr(new EM_vec_store(length, type));
72  }
73 
74  // This creates a byte array.
75  static ptr create(safs::file_io_factory::shared_ptr factory) {
76  return ptr(new EM_vec_store(factory));
77  }
78 
79  ~EM_vec_store();
80 
81  /*
82  * This sets the vector persistent.
83  * Even if this vector object is destroyed, the vector data will still
84  * exist in SAFS.
85  */
86  bool set_persistent(const std::string &name);
87 
88  virtual bool resize(size_t length);
89 
90  virtual bool append(std::vector<vec_store::const_ptr>::const_iterator vec_it,
91  std::vector<vec_store::const_ptr>::const_iterator vec_end);
92  virtual bool append(const vec_store &vec);
93  virtual vec_store::ptr deep_copy() const;
94  virtual vec_store::ptr shallow_copy();
95  virtual vec_store::const_ptr shallow_copy() const;
96 
97  virtual size_t get_portion_size() const;
98  virtual local_vec_store::const_ptr get_portion(off_t loc, size_t size) const;
99  virtual local_vec_store::ptr get_portion(off_t loc, size_t size);
100  virtual bool set_portion(std::shared_ptr<const local_vec_store> store,
101  off_t loc);
102  /*
103  * This is different from the one used in the memory data container.
104  * This interface accepts a portion compute object, which is invoked
105  * when the data of the portion is ready in memory. The returned
106  * local vector store doesn't have invalid data right after it's
107  * returned.
108  */
109  virtual local_vec_store::ptr get_portion_async(off_t start,
110  size_t size, portion_compute::ptr compute) const;
111  virtual std::vector<local_vec_store::ptr> get_portion_async(
112  const std::vector<std::pair<off_t, size_t> > &locs,
113  portion_compute::ptr compute) const;
114  /*
115  * Write the data in the local buffer to some portion in the vector.
116  * The location is indicated in the local buffer. However, a user
117  * can redirect the location by providing `off' as a parameter.
118  */
119  virtual void write_portion_async(local_vec_store::const_ptr portion,
120  off_t off = -1);
121 
122  virtual void reset_data();
123  virtual void set_data(const set_vec_operate &op);
124 
125  virtual vec_store::ptr sort_with_index();
126  virtual void sort();
127  virtual bool is_sorted() const;
128 
129  virtual std::shared_ptr<matrix_store> conv2mat(size_t nrow,
130  size_t ncol, bool byrow);
131 
132  virtual std::vector<safs::io_interface::ptr> create_ios() const;
133 
134  friend std::vector<EM_vec_store::ptr> sort(
135  const std::vector<EM_vec_store::const_ptr> &vecs);
136 };
137 
138 namespace EM_sort_detail
139 {
140 
141 /*
142  * This priority queue helps to sort data in the ascending order.
143  */
144 class anchor_prio_queue
145 {
146  struct anchor_struct
147  {
148  local_buf_vec_store::const_ptr local_anchors;
149  int id;
150  off_t curr_off;
151  };
152 
153  class anchor_ptr_less {
154  const bulk_operate *gt;
155  public:
156  anchor_ptr_less(const scalar_type &type) {
157  gt = type.get_basic_ops().get_op(basic_ops::op_idx::GT);;
158  }
159 
160  bool operator()(const anchor_struct *anchor1,
161  const anchor_struct *anchor2) const {
162  bool ret;
163  gt->runAA(1, anchor1->local_anchors->get(anchor1->curr_off),
164  anchor2->local_anchors->get(anchor2->curr_off), &ret);
165  return ret;
166  }
167  };
168 
169  const size_t sort_buf_size;
170  const size_t anchor_gap_size;
171  std::vector<anchor_struct> anchor_bufs;
172  typedef std::priority_queue<anchor_struct *, std::vector<anchor_struct *>,
173  anchor_ptr_less> anchor_queue_t;
174  anchor_queue_t queue;
175 
176  off_t get_anchor_off(const anchor_struct &anchor) const;
177 public:
178  typedef std::shared_ptr<anchor_prio_queue> ptr;
179 
180  anchor_prio_queue(const std::vector<local_buf_vec_store::ptr> &anchor_vals,
181  size_t sort_buf_size, size_t anchor_gap_size);
182  scalar_variable::ptr get_min_frontier() const;
183  size_t get_anchor_gap_size() const {
184  return anchor_gap_size;
185  }
186 
187  /*
188  * Here we pop a set of chunks of data whose values are the potentially
189  * the smallest.
190  */
191  std::vector<off_t> pop(size_t size);
192  /*
193  * Fetch the first anchors from all queues.
194  */
195  std::vector<off_t> fetch_all_first();
196 };
197 
198 /*
199  * This class keeps some summaries of sorted vectors in memory, so later on
200  * we can use these summaries to pinpoint the right data for merging.
201  * To generate a summary of a vector, we put some conceptional anchors in
202  * the vector and keep the values in the anchor location.
203  */
204 class sort_portion_summary
205 {
206  const size_t sort_buf_size;
207  const size_t anchor_gap_size;
208  std::vector<local_buf_vec_store::ptr> anchor_vals;
209 public:
210  sort_portion_summary(size_t num_sort_bufs, size_t sort_buf_size,
211  size_t anchor_gap_size);
212  void add_portion(local_buf_vec_store::const_ptr sorted_buf);
213  anchor_prio_queue::ptr get_prio_queue() const;
214 
215  local_vec_store::const_ptr get_anchor_vals(off_t idx) const {
216  return anchor_vals[idx];
217  }
218  size_t get_num_bufs() const {
219  return anchor_vals.size();
220  }
221 };
222 
223 /*
224  * This class sorts a portion of data in the vector and writes the sorted
225  * results to disks.
226  */
227 class EM_vec_sort_compute: public portion_compute
228 {
229  // The portions are from different vectors.
230  // Each portion has to be read from the disks.
231  std::vector<local_buf_vec_store::ptr> portions;
232  // Where the sorted portion written to.
233  std::vector<EM_vec_store::ptr> to_vecs;
234  sort_portion_summary &summary;
235  // The number of portions that have been read from the disks.
236  size_t num_completed;
237 public:
238  EM_vec_sort_compute(const std::vector<EM_vec_store::ptr> &vecs,
239  sort_portion_summary &_summary): summary(_summary) {
240  this->to_vecs = vecs;
241  num_completed = 0;
242  }
243  virtual void run(char *buf, size_t size);
244  void set_bufs(std::vector<local_buf_vec_store::ptr> portions) {
245  this->portions = portions;
246  }
247 };
248 
249 class EM_vec_merge_dispatcher;
250 
251 /*
252  * This class merges data read from disks and writes it back to disks.
253  */
254 class EM_vec_merge_compute: public portion_compute
255 {
256  // This defines the container with all the portions used for merging
257  // a vector. The first buffer in the set may be the leftover from
258  // the previous merge.
259  typedef std::vector<local_buf_vec_store::const_ptr> merge_set_t;
260  std::vector<merge_set_t> stores;
261  EM_vec_merge_dispatcher &dispatcher;
262  size_t num_completed;
263  // It's not the same as the number of local buffers in `stores' because
264  // `stores' may contain the buffer with the leftdata from the previous
265  // merge.
266  size_t num_expected;
267 public:
268  EM_vec_merge_compute(
269  const std::vector<local_buf_vec_store::ptr> &prev_leftovers,
270  EM_vec_merge_dispatcher &_dispatcher);
271  virtual void run(char *buf, size_t size);
272  void set_bufs(const std::vector<merge_set_t> &bufs);
273 };
274 
275 /*
276  * The two functions compute the sort buffer size and anchor gap size.
277  */
278 std::pair<size_t, size_t> cal_sort_buf_size(const scalar_type &type,
279  size_t num_eles);
280 std::pair<size_t, size_t> cal_sort_buf_size(
281  const std::vector<const scalar_type *> &types, size_t num_eles);
282 
283 }
284 
285 }
286 
287 }
288 
289 #endif