FlashGraph-ng
A new frontier in large-scale graph analysis and data mining
simple_KV_store.h
#ifndef __SIMPLE_KV_STORE_H__
#define __SIMPLE_KV_STORE_H__

/*
 * Copyright 2014 Open Connectome Project (http://openconnecto.me)
 * Written by Da Zheng (zhengda1936@gmail.com)
 *
 * This file is part of SAFSlib.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <assert.h>
#include <sys/types.h>

#include <algorithm>
#include <deque>
#include <memory>

#include <boost/assert.hpp>

#include "io_interface.h"
#include "container.h"
#include "cache.h"
#include "slab_allocator.h"

namespace safs
{

template<class ValueType, class TaskType>
class simple_KV_store;

/*
 * The user compute that runs a batch of user tasks on the values read
 * by a single I/O request.
 */
template<class ValueType, class ValueTaskType>
class KV_compute: public user_compute
{
	embedded_array<ValueTaskType> tasks;
	// The number of tasks in the array (merged tasks count once).
	int num_tasks;
	// The number of add_task() calls, including the ones that were merged.
	int num_added_tasks;
	bool has_run;
	simple_KV_store<ValueType, ValueTaskType> *store;
public:
	KV_compute(simple_KV_store<ValueType, ValueTaskType> *store,
			compute_allocator *alloc): user_compute(alloc) {
		this->store = store;
		num_tasks = 0;
		num_added_tasks = 0;
		has_run = false;
	}

	bool has_tasks() const {
		return num_tasks > 0;
	}

	void add_task(const ValueTaskType &task) {
		num_added_tasks++;
		// Grow the task buffer if necessary (guarding against a zero
		// initial capacity).
		if (tasks.get_capacity() <= num_tasks)
			tasks.resize(std::max(num_tasks * 2, 1));
		// Merge the task into the previous one if possible.
		if (num_tasks == 0 || !tasks[num_tasks - 1].merge(task))
			tasks[num_tasks++] = task;
		has_run = false;
	}

	virtual int serialize(char *buf, int size) const {
		return 0;
	}

	virtual int get_serialized_size() const {
		return 0;
	}

	virtual void run(page_byte_array &arr) {
		// The range of value indices covered by the byte array.
		off_t start_off = arr.get_offset() / sizeof(ValueType);
		off_t end_off = (arr.get_offset() + arr.get_size()) / sizeof(ValueType);
		for (int i = 0; i < num_tasks; i++) {
			off_t idx = tasks[i].get_idx();
			BOOST_VERIFY(idx >= start_off && idx < end_off);
			int num_entries = tasks[i].get_num_entries();
			off_t it_start = (idx - start_off) * sizeof(ValueType);
			off_t it_end = it_start + sizeof(ValueType) * num_entries;
			page_byte_array::seq_const_iterator<ValueType> it
				= arr.get_seq_iterator<ValueType>(it_start, it_end);
			tasks[i].run(it);
		}
		has_run = true;
		store->complete_tasks(num_added_tasks);
	}

	virtual bool has_completed() {
		return has_run;
	}

	virtual int has_requests() {
		return false;
	}

	virtual request_range get_next_request() {
		ABORT_MSG("get_next_request isn't supported");
	}
};
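
/*
 * Neither class above declares the interface a ValueTaskType must
 * implement; the calls imply it. The sketch below is a hypothetical task
 * (an illustration, not part of SAFSlib) that sums a contiguous run of
 * values, showing the four methods that KV_compute and simple_KV_store
 * rely on: get_idx(), get_num_entries(), merge() and run().
 */
template<class ValueType>
class sum_value_task
{
	off_t idx;		// The index of the first value this task reads.
	int num_entries;	// The number of contiguous values to read.
	ValueType sum;
public:
	sum_value_task(off_t idx = 0, int num_entries = 0) {
		this->idx = idx;
		this->num_entries = num_entries;
		sum = 0;
	}

	off_t get_idx() const {
		return idx;
	}

	int get_num_entries() const {
		return num_entries;
	}

	// Absorb a task that starts right after this one; return false if the
	// two tasks aren't contiguous and can't be merged.
	bool merge(const sum_value_task &task) {
		if (task.idx != idx + num_entries)
			return false;
		num_entries += task.num_entries;
		return true;
	}

	// Consume the values read from the file.
	void run(page_byte_array::seq_const_iterator<ValueType> &it) {
		while (it.has_next())
			sum += it.next();
	}

	ValueType get_sum() const {
		return sum;
	}
};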

/*
 * The allocator that creates KV_compute objects for I/O requests and
 * recycles them when the requests complete.
 */
template<class ValueType, class ValueTaskType>
class KV_compute_allocator: public compute_allocator
{
	class compute_initializer: public obj_initiator<KV_compute<ValueType, ValueTaskType> >
	{
		KV_compute_allocator<ValueType, ValueTaskType> *alloc;
		simple_KV_store<ValueType, ValueTaskType> *store;
	public:
		compute_initializer(
				simple_KV_store<ValueType, ValueTaskType> *store,
				KV_compute_allocator<ValueType, ValueTaskType> *alloc) {
			this->store = store;
			this->alloc = alloc;
		}

		virtual void init(KV_compute<ValueType, ValueTaskType> *obj) {
			new (obj) KV_compute<ValueType, ValueTaskType>(store, alloc);
		}
	};

	class compute_destructor: public obj_destructor<KV_compute<ValueType, ValueTaskType> >
	{
	public:
		void destroy(KV_compute<ValueType, ValueTaskType> *obj) {
			obj->~KV_compute<ValueType, ValueTaskType>();
		}
	};

	obj_allocator<KV_compute<ValueType, ValueTaskType> > allocator;
public:
	KV_compute_allocator(simple_KV_store<ValueType, ValueTaskType> *store,
			int node_id): allocator("KV_compute_allocator",
				node_id, false, 1024 * 1024, params.get_max_obj_alloc_size(),
				typename obj_initiator<KV_compute<ValueType, ValueTaskType> >::ptr(
					new compute_initializer(store, this)),
				typename obj_destructor<KV_compute<ValueType, ValueTaskType> >::ptr(
					new compute_destructor())) {
	}

	virtual user_compute *alloc() {
		return allocator.alloc_obj();
	}

	virtual void free(user_compute *obj) {
		allocator.free((KV_compute<ValueType, ValueTaskType> *) obj);
	}
};

/*
 * This is a simple key-value store on top of a single SAFS file.
 * It supports a single type of user-defined task on the values and is
 * meant to be used from one thread.
 * User tasks are executed asynchronously in the I/O completion callbacks
 * (see the usage sketch after this class).
 */
template<class ValueType, class TaskType>
class simple_KV_store
{
	io_interface::ptr io;
	KV_compute_allocator<ValueType, TaskType> alloc;

	// Orders tasks by descending index.
	struct task_comp_larger {
		bool operator()(const TaskType &task1, const TaskType &task2) {
			return task1.get_idx() >= task2.get_idx();
		}
	};
	// Orders tasks by ascending index.
	struct task_comp_smaller {
		bool operator()(const TaskType &task1, const TaskType &task2) {
			return task1.get_idx() <= task2.get_idx();
		}
	};
	struct task_less {
		bool operator()(const TaskType &task1, const TaskType &task2) {
			return task1.get_idx() < task2.get_idx();
		}
	};

	// User tasks buffered until the next flush_requests().
	std::deque<TaskType> task_buf;
	ssize_t num_pending_tasks;

	// I/O requests buffered until the next flush_io_requests().
	embedded_array<io_request> req_buf;
	int num_reqs;

	void add_io_request(io_request &req) {
		// Grow the request buffer if necessary (guarding against a zero
		// initial capacity).
		if (req_buf.get_capacity() <= num_reqs)
			req_buf.resize(std::max(req_buf.get_capacity() * 2, 1));
		req_buf[num_reqs] = req;
		num_reqs++;
	}

	void flush_io_requests() {
		io->access(req_buf.data(), num_reqs);
		num_reqs = 0;
	}

	void sort_tasks() {
		if (task_buf.size() < 2)
			return;
		// Avoid re-sorting if the tasks already arrived in order. Either
		// direction is fine: flush_requests() only needs tasks that touch
		// nearby pages to be adjacent in the deque.
		if (task_buf[0].get_idx() > task_buf[1].get_idx()) {
			if (std::is_sorted(task_buf.begin(), task_buf.end(), task_comp_larger()))
				return;
		}
		else if (std::is_sorted(task_buf.begin(), task_buf.end(), task_comp_smaller()))
			return;
		std::sort(task_buf.begin(), task_buf.end(), task_less());
	}

	simple_KV_store(io_interface::ptr io): alloc(this, io->get_node_id()) {
		this->io = io;
		num_reqs = 0;
		// A value must not cross a page boundary.
		assert(PAGE_SIZE % sizeof(ValueType) == 0);
		num_pending_tasks = 0;
	}
public:
	typedef std::shared_ptr<simple_KV_store<ValueType, TaskType> > ptr;

	static ptr create(io_interface::ptr io) {
		return ptr(new simple_KV_store<ValueType, TaskType>(io));
	}

	void flush_requests() {
		if (task_buf.empty())
			return;
		// Don't issue more I/O if SAFS already has too many pending
		// requests.
		if (io->num_pending_ios() >= io->get_max_num_pending_ios())
			return;

		// Issue as few I/O requests as possible: each request reads at
		// least one page and serves as many user tasks as possible. Tasks
		// whose pages overlap or are adjacent are merged into a single
		// request (see the worked example after this method).
		sort_tasks();
		int avail_io_slots = io->get_remaining_io_slots();
		// The offset of the first page accessed by the I/O request.
		const TaskType task = task_buf.front();
		task_buf.pop_front();
		off_t first_page_off = ROUND_PAGE(task.get_idx() * sizeof(ValueType));
		// The offset of the page past the last one accessed by the I/O
		// request (i.e., the exclusive end of the range).
		off_t last_page_off = ROUNDUP_PAGE((task.get_idx()
					+ task.get_num_entries()) * sizeof(ValueType));
		KV_compute<ValueType, TaskType> *compute
			= (KV_compute<ValueType, TaskType> *) alloc.alloc();
		compute->add_task(task);
		while (!task_buf.empty()
				// We issue one more request after the while loop.
				&& num_reqs < avail_io_slots - 1) {
			const TaskType task = task_buf.front();
			task_buf.pop_front();
			off_t page_off = ROUND_PAGE(task.get_idx() * sizeof(ValueType));
			off_t end_page_off = ROUNDUP_PAGE((task.get_idx()
						+ task.get_num_entries()) * sizeof(ValueType));
			// If the task's pages overlap with or are adjacent to the
			// range of the current request, extend the request.
			if ((page_off >= first_page_off && page_off <= last_page_off)
					|| (end_page_off >= first_page_off && end_page_off <= last_page_off)) {
				compute->add_task(task);
				first_page_off = std::min(page_off, first_page_off);
				last_page_off = std::max(end_page_off, last_page_off);
				continue;
			}

			// The user task accesses pages far away from the range covered
			// by the current I/O request, so issue the current request...
			data_loc_t loc(io->get_file_id(), first_page_off);
			io_request req(compute, loc, last_page_off - first_page_off, READ);
			add_io_request(req);

			// ...and start a new request covering the task's pages.
			compute = (KV_compute<ValueType, TaskType> *) alloc.alloc();
			compute->add_task(task);
			first_page_off = page_off;
			last_page_off = end_page_off;
		}

		assert(compute->has_tasks());
		data_loc_t loc(io->get_file_id(), first_page_off);
		io_request req(compute, loc, last_page_off - first_page_off, READ);
		add_io_request(req);
		flush_io_requests();
	}
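
	/*
	 * A worked example of the page rounding above, assuming 4KB pages and
	 * an 8-byte ValueType (the numbers are illustrative, not from SAFSlib):
	 * a task at idx 1000 with 10 entries covers bytes [8000, 8080), so
	 * first_page_off = ROUND_PAGE(8000) = 4096 and
	 * last_page_off = ROUNDUP_PAGE(8080) = 8192, i.e., a one-page read.
	 * A following task at idx 1024 covers bytes [8192, 8272); its page
	 * offset 8192 equals last_page_off, so it is merged into the request
	 * and last_page_off grows to ROUNDUP_PAGE(8272) = 12288. A task at
	 * idx 4096 (byte offset 32768) falls outside [4096, 12288), so the
	 * current request is issued and a new one is started.
	 */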

	/*
	 * Submit a user task asynchronously. The task is buffered (and merged
	 * with the previously buffered task when possible) until
	 * flush_requests() is called.
	 */
	void async_request(TaskType &task) {
		if (task_buf.empty() || !task_buf.back().merge(task)) {
			task_buf.push_back(task);
			num_pending_tasks++;
		}
	}

	void complete_tasks(int num_tasks) {
		num_pending_tasks -= num_tasks;
		assert(num_pending_tasks >= 0);
	}

	size_t get_num_pending_tasks() const {
		return num_pending_tasks;
	}
};
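
/*
 * A minimal usage sketch, assuming `io' is an io_interface opened on the
 * value file and using the hypothetical sum_value_task above (this is an
 * illustration, not code from SAFSlib):
 *
 *	typedef simple_KV_store<long, sum_value_task<long> > store_t;
 *	store_t::ptr store = store_t::create(io);
 *	sum_value_task<long> task(1000, 10);	// sum values [1000, 1010)
 *	store->async_request(task);
 *	store->flush_requests();	// issue the (merged) I/O requests
 *	while (store->get_num_pending_tasks() > 0)
 *		io->wait4complete(1);	// tasks run in the completion callbacks
 */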

}

#endif