FlashGraph-ng
A new frontier in large-scale graph analysis and data mining
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Pages
vertex_compute.h
1 #ifndef __VERTEX_COMPUTE_H__
2 #define __VERTEX_COMPUTE_H__
3 
4 /*
5  * Copyright 2014 Open Connectome Project (http://openconnecto.me)
6  * Written by Da Zheng (zhengda1936@gmail.com)
7  *
8  * This file is part of FlashGraph.
9  *
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  * http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  */
22 
#include <algorithm>
#include <deque>
#include <queue>
#include <unordered_map>
#include <vector>
26 
27 #include "thread.h"
28 #include "slab_allocator.h"
29 
30 #include "vertex_request.h"
31 #include "scan_pointer.h"
32 #include "graph_index.h"
33 
34 namespace fg
35 {
36 
37 class worker_thread;
38 class graph_engine;
39 class compute_vertex;
40 class compute_directed_vertex;
41 
42 /*
43  * This data structure represents an active vertex that is being processed
44  * in a worker thread. It is used to handle two types of asynchronous
45  * requests: for the adjacency list and the number of edges.
46  */
47 class vertex_compute: public safs::user_compute
48 {
49  struct vertex_info_comp
50  {
51  bool operator()(const ext_mem_vertex_info &info1,
52  const ext_mem_vertex_info &info2) {
53  // We should order the vertex requests by their IDs.
54  // When a directed vertex wants to request both edges list,
55  // it results in two I/O requests and we want them to be served
56  // around the same time, so we don't need to consume a lot
57  // of resources for one of the requests.
58  // Ordering requests by vertex IDs has the same effect as vertex
59  // location on SSDs.
60  return info1.get_id() > info2.get_id();
61  }
62  };
63 
64 protected:
65  // TODO use the embedded array as the container.
66  std::priority_queue<ext_mem_vertex_info, std::vector<ext_mem_vertex_info>,
67  vertex_info_comp> requested_vertices;
68  graph_engine *graph;
69 
70  // The thread that creates the vertex compute.
71  worker_thread *issue_thread;
72  compute_vertex_pointer v;
73 
74  /*
75  * These two variables keep track of the number of completed requests
76  * for adjacency lists.
77  */
78 
79  // The number of requested vertices that will be read in the user compute.
80  size_t num_requested;
81  // The number of issued requests.
82  size_t num_issued;
83  // The number of vertices read by the user compute.
84  size_t num_complete_fetched;
85 
86  /*
87  * These two variables keep track of the number of completed requests
88  * for the number of edges.
89  */
90 
91  size_t num_edge_requests;
92  size_t num_edge_completed;
93 
94  size_t get_num_pending_ios() const {
95  assert(num_issued >= num_complete_fetched);
96  return num_issued - num_complete_fetched;
97  }
98 
99  bool issued_to_io() const {
100  // When the vertex_compute is created, it has one reference.
101  // If the vertex_compute has been issued to SAFS, its reference count
102  // should be larger than 1.
103  return get_ref() > 1 || get_num_pending_ios() > 0;
104  }
105 
106  void start_run();
107  void finish_run();
108 public:
109  vertex_compute(graph_engine *graph,
110  safs::compute_allocator *alloc): safs::user_compute(alloc) {
111  this->graph = graph;
112  issue_thread = (worker_thread *) thread::get_curr_thread();
113  num_requested = 0;
114  num_complete_fetched = 0;
115  num_issued = 0;
116  num_edge_requests = 0;
117  num_edge_completed = 0;
118  }
119 
120  void init(compute_vertex_pointer v) {
121  this->v = v;
122  }
123 
124  /*
125  * The methods below are from user_compute.
126  */
127 
128  virtual int serialize(char *buf, int size) const {
129  return 0;
130  }
131 
132  virtual int get_serialized_size() const {
133  return 0;
134  }
135 
136  virtual void set_scan_dir(bool forward) {
137  }
138 
139  virtual int has_requests() {
140  return requested_vertices.size() > 0;
141  }
142 
143  virtual safs::request_range get_next_request();
144 
145  virtual void run(safs::page_byte_array &);
146 
147  virtual bool has_completed() {
148  // If the user compute has got all requested data and it has
149  // no more requests to issue, we can consider the user compute
150  // has been completed.
151  // NOTE: it's possible that requested data may not be passed to
152  // this user compute, so we only count the requests that are going
153  // to be passed to this user compute.
154  return num_requested == num_complete_fetched && !has_requests();
155  }
156 
157  /*
158  * The methods below deal with requesting the adjacency list of vertices.
159  */
160 
161  /*
162  * The method accepts the requests from graph applications and issues
163  * the request for the adjacency lists of vertices. It has to get
164  * the location and the size of the vertices from the vertex index
165  * before issuing the real requests to SAFS.
166  */
167  virtual void request_vertices(vertex_id_t ids[], size_t num);
168 
169  /*
170  * This is a callback function. When the location and the size of
171  * a vertex is ready, the vertex index notifies the vertex compute
172  * of the information.
173  */
174  void issue_io_request(const ext_mem_vertex_info &info);
175 
176  /*
177  * The methods below deal with requesting # edges of vertices.
178  */
179 
180  /*
181  * This is a callback function. When the vertex index gets the vertex size,
182  * it notifies the vertex_compute of this information.
183  */
184  void run_on_vertex_size(vertex_id_t id, vsize_t size);
185 
186  /*
187  * This methods accepts the requests from graph applications and issues
188  * the requests to the vertex index.
189  */
190  virtual void request_num_edges(vertex_id_t ids[], size_t num);
191 
192  /*
193  * The methods are used for both cases (requesting the adjacency list
194  * and #edges).
195  */
196 
197  /*
198  * This indicates the total number of pending requests for both adjacency
199  * lists and #edges.
200  */
201  virtual size_t get_num_pending() const {
202  return (num_edge_requests - num_edge_completed)
203  + (num_requested - num_complete_fetched);
204  }
205 
206  graph_engine &get_graph() {
207  return *graph;
208  }
209 };
210 
211 class directed_vertex_compute: public vertex_compute
212 {
213  typedef std::unordered_map<vertex_id_t, safs::page_byte_array *> combine_map_t;
214  combine_map_t combine_map;
215 
216  void run_on_page_vertex(page_directed_vertex &);
217 public:
218  directed_vertex_compute(graph_engine *graph,
219  safs::compute_allocator *alloc): vertex_compute(graph, alloc) {
220  }
221 
222  virtual void set_scan_dir(bool forward) {
223  vertex_compute::set_scan_dir(forward);
224  }
225 
226  virtual void run(safs::page_byte_array &);
227 
228  /*
229  * These two methods accept the requests from graph applications and issue
230  * the request for the adjacency lists of vertices. It has to get
231  * the location and the size of the vertices from the vertex index
232  * before issuing the real requests to SAFS.
233  * `request_vertices' requests both edge lists of vertices.
234  * `request_partial_vertices' requests one type of edge lists of vertices.
235  */
236  virtual void request_vertices(vertex_id_t ids[], size_t num);
237  void request_partial_vertices(directed_vertex_request reqs[], size_t num);
238 
239  /*
240  * This is a callback function. When the vertex index gets the vertex size,
241  * it notifies the vertex_compute of this information.
242  */
243  void run_on_vertex_size(vertex_id_t id, size_t in_size, size_t out_size);
244 
245  using vertex_compute::issue_io_request;
246  void issue_io_request(const ext_mem_vertex_info &in_info,
247  const ext_mem_vertex_info &out_info);
248 
249  /*
250  * This methods accepts the requests from graph applications and issues
251  * the requests to the vertex index.
252  */
253  void request_num_edges(vertex_id_t ids[], size_t num);
254 };
255 
256 /*
257  * This class is to compute on the undirected vertices requested
258  * by a single I/O request.
259  */
260 class merged_vertex_compute: public safs::user_compute
261 {
262  vertex_id_t start_id;
263  int num_vertices;
264  graph_engine *graph;
265 protected:
266  worker_thread *issue_thread;
267 
268  void start_run(compute_vertex_pointer v);
269  void finish_run(compute_vertex_pointer v);
270 public:
271  merged_vertex_compute(graph_engine *graph,
272  safs::compute_allocator *alloc): safs::user_compute(alloc) {
273  this->graph = graph;
274  start_id = INVALID_VERTEX_ID;
275  num_vertices = 0;
276  issue_thread = (worker_thread *) thread::get_curr_thread();
277  }
278 
279  graph_engine &get_graph() {
280  return *graph;
281  }
282 
283  vertex_id_t get_start_id() const {
284  return start_id;
285  }
286 
287  int get_num_vertices() const {
288  return num_vertices;
289  }
290 
291  virtual void init(vertex_id_t start_id, int num_vertices, edge_type type) {
292  this->start_id = start_id;
293  this->num_vertices = num_vertices;
294  }
295 
296  virtual int serialize(char *buf, int size) const {
297  return 0;
298  }
299 
300  virtual int get_serialized_size() const {
301  return 0;
302  }
303 
304  virtual void set_scan_dir(bool forward) {
305  }
306 
307  virtual int has_requests() {
308  return false;
309  }
310 
311  virtual safs::request_range get_next_request() {
312  ABORT_MSG("get_next_request isn't supported");
313  }
314 
315  virtual void run(safs::page_byte_array &arr) = 0;
316  virtual bool has_completed() = 0;
317 };
318 
319 class merged_undirected_vertex_compute: public merged_vertex_compute
320 {
321  bool complete;
322 public:
323  merged_undirected_vertex_compute(graph_engine *graph,
324  safs::compute_allocator *alloc): merged_vertex_compute(graph, alloc) {
325  complete = false;
326  }
327 
328  virtual void run(safs::page_byte_array &arr);
329 
330  virtual bool has_completed() {
331  return complete;
332  }
333 };
334 
335 /*
336  * This class is to compute on the directed vertices requested
337  * by a single I/O request.
338  */
339 class merged_directed_vertex_compute: public merged_vertex_compute
340 {
341  edge_type type;
342  int num_fetched_arrs;
343  int num_required_arrs;
344  safs::page_byte_array *buffered_arr;
345 
346  void run_on_array(safs::page_byte_array &arr);
347  void run_on_arrays(safs::page_byte_array &in_arr, safs::page_byte_array &out_arr);
348 public:
349  merged_directed_vertex_compute(graph_engine *graph,
350  safs::compute_allocator *alloc): merged_vertex_compute(graph, alloc) {
351  type = edge_type::NONE;
352  num_fetched_arrs = 0;
353  num_required_arrs = 0;
354  buffered_arr = NULL;
355  }
356 
357  void init(vertex_id_t start_id, int num_vertices, edge_type type) {
358  merged_vertex_compute::init(start_id, num_vertices, type);
359  this->type = type;
360  this->num_fetched_arrs = 0;
361  switch(type) {
362  case IN_EDGE:
363  case OUT_EDGE:
364  this->num_required_arrs = 1;
365  break;
366  case BOTH_EDGES:
367  this->num_required_arrs = 2;
368  break;
369  default:
370  assert(0);
371  }
372  }
373 
374  virtual void run(safs::page_byte_array &arr);
375 
376  virtual bool has_completed() {
377  return num_fetched_arrs == num_required_arrs;
378  }
379 };
380 
381 /*
382  * This class is to compute on the undirected vertices that are stored closely
383  * on the disks and are read by a single I/O request.
384  */
385 class sparse_vertex_compute: public safs::user_compute
386 {
387 protected:
388  struct vertex_range_t {
389  std::pair<vertex_id_t, vertex_id_t> id_range;
390  off_t start_off;
391  };
392  embedded_array<vertex_range_t> ranges;
393  int num_vertices;
394  int num_ranges;
395  bool complete;
396  graph_engine *graph;
397  worker_thread *issue_thread;
398 
399  void start_run(compute_vertex_pointer v);
400  void finish_run(compute_vertex_pointer v);
401 public:
402  sparse_vertex_compute(graph_engine *graph,
403  safs::compute_allocator *alloc): safs::user_compute(alloc) {
404  this->graph = graph;
405  num_ranges = 0;
406  num_vertices = 0;
407  complete = false;
408  issue_thread = (worker_thread *) thread::get_curr_thread();
409  }
410 
411  graph_engine &get_graph() {
412  return *graph;
413  }
414 
415  int get_num_ranges() const {
416  return num_ranges;
417  }
418 
419  int get_num_vertices() const {
420  return num_vertices;
421  }
422 
423  vertex_id_t get_first_vertex() const {
424  return ranges[0].id_range.first;
425  }
426 
427  virtual void init(const std::pair<vertex_id_t, vertex_id_t> &range,
428  const std::pair<off_t, off_t> off_ranges[],
429  edge_type type) {
430  ranges[0].id_range = range;
431  ranges[0].start_off = off_ranges[0].first;
432  num_ranges = 1;
433  num_vertices = range.second - range.first;
434  complete = false;
435  }
436 
437  virtual bool add_range(const std::pair<vertex_id_t, vertex_id_t> &range,
438  const std::pair<off_t, off_t> off_ranges[]) {
439  if (num_ranges >= ranges.get_capacity())
440  return false;
441  else {
442  ranges[num_ranges].id_range = range;
443  ranges[num_ranges].start_off = off_ranges[0].first;
444  num_ranges++;
445  num_vertices += (range.second - range.first);
446  return true;
447  }
448  }
449 
450  virtual int serialize(char *buf, int size) const {
451  return 0;
452  }
453 
454  virtual int get_serialized_size() const {
455  return 0;
456  }
457 
458  virtual void set_scan_dir(bool forward) {
459  }
460 
461  virtual int has_requests() {
462  return false;
463  }
464 
465  virtual safs::request_range get_next_request() {
466  ABORT_MSG("get_next_request isn't supported");
467  }
468 
469  virtual void run(safs::page_byte_array &arr) = 0;
470 
471  virtual bool has_completed() {
472  return complete;
473  }
474 };
475 
476 class sparse_undirected_vertex_compute: public sparse_vertex_compute
477 {
478 public:
479  sparse_undirected_vertex_compute(graph_engine *graph,
480  safs::compute_allocator *alloc): sparse_vertex_compute(graph, alloc) {
481  }
482 
483  virtual void run(safs::page_byte_array &arr);
484 };
485 
486 class sparse_directed_vertex_compute: public sparse_vertex_compute
487 {
488  edge_type type;
489  std::vector<off_t> out_start_offs;
490  safs::page_byte_array *buffered_arr;
491 
492  void run_on_array(safs::page_byte_array &arr);
493  void run_on_arrays(safs::page_byte_array &in_arr, safs::page_byte_array &out_arr);
494 public:
495  sparse_directed_vertex_compute(graph_engine *graph,
496  safs::compute_allocator *alloc): sparse_vertex_compute(graph, alloc) {
497  type = edge_type::NONE;
498  buffered_arr = NULL;
499  }
500 
501  virtual void init(const std::pair<vertex_id_t, vertex_id_t> &range,
502  const std::pair<off_t, off_t> off_ranges[],
503  edge_type type) {
504  this->type = type;
505  sparse_vertex_compute::init(range, off_ranges, type);
506  if (type == BOTH_EDGES) {
507  out_start_offs.clear();
508  out_start_offs.push_back(off_ranges[1].first);
509  }
510  }
511 
512  virtual bool add_range(const std::pair<vertex_id_t, vertex_id_t> &range,
513  const std::pair<off_t, off_t> off_ranges[]) {
514  bool ret = sparse_vertex_compute::add_range(range, off_ranges);
515  if (ret && type == BOTH_EDGES)
516  out_start_offs.push_back(off_ranges[1].first);
517  return ret;
518  }
519 
520  virtual void run(safs::page_byte_array &arr);
521 };
522 
523 template<class compute_type>
524 class vertex_compute_allocator: public safs::compute_allocator
525 {
526  class compute_initiator: public obj_initiator<compute_type>
527  {
528  graph_engine *graph;
529  vertex_compute_allocator<compute_type> *alloc;
530  public:
531  compute_initiator(graph_engine *graph,
532  vertex_compute_allocator<compute_type> *alloc) {
533  this->graph = graph;
534  this->alloc = alloc;
535  }
536 
537  virtual void init(compute_type *obj) {
538  new (obj) compute_type(graph, alloc);
539  }
540  };
541 
542  class compute_destructor: public obj_destructor<compute_type>
543  {
544  public:
545  void destroy(compute_type *obj) {
546  obj->~compute_type();
547  }
548  };
549 
550  obj_allocator<compute_type> allocator;
551 public:
552  vertex_compute_allocator(graph_engine *graph, thread *t): allocator(
553  "vertex-compute-allocator", t->get_node_id(), false, 1024 * 1024,
554  safs::params.get_max_obj_alloc_size(),
555  typename obj_initiator<compute_type>::ptr(new compute_initiator(graph, this)),
556  typename obj_destructor<compute_type>::ptr(new compute_destructor())) {
557  }
558 
559  virtual safs::user_compute *alloc() {
560  return allocator.alloc_obj();
561  }
562 
563  virtual void free(safs::user_compute *obj) {
564  allocator.free((compute_type *) obj);
565  }
566 };
567 
568 }
569 
570 #endif
unsigned int vsize_t
Basic data types used in FlashGraph.
Definition: FG_basic_types.h:33
Definition: cache.h:550
edge_type
Edge type of an edge in the graph.
Definition: vertex.h:43
Definition: io_request.h:231
Definition: io_request.h:316
virtual void free(user_compute *compute)=0
Definition: io_request.h:454
Definition: vertex.h:45
Definition: vertex.h:47
int get_ref() const
Definition: io_request.h:446
Definition: vertex.h:46
user_compute(compute_allocator *alloc)
Definition: io_request.h:330