FlashGraph-ng
A new frontier in large-scale graph analysis and data mining
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Pages
graph_index.h
1 #ifndef __GRAPH_INDEX_H__
2 #define __GRAPH_INDEX_H__
3 
4 /*
5  * Copyright 2014 Open Connectome Project (http://openconnecto.me)
6  * Written by Da Zheng (zhengda1936@gmail.com)
7  *
8  * This file is part of FlashGraph.
9  *
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  * http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  */
22 
23 #include <algorithm>
24 #include <boost/foreach.hpp>
25 #include <boost/format.hpp>
26 
27 #include "thread.h"
28 
29 #include "log.h"
30 #include "vertex.h"
31 #include "partitioner.h"
32 #include "vertex_program.h"
33 #include "graph_file_header.h"
34 #include "vertex_pointer.h"
35 
36 namespace fg
37 {
38 
39 class compute_vertex;
40 class part_compute_vertex;
41 
42 /*
43  * A pointer to the vertically partitioned vertices in the graph index.
44  */
45 class vpart_vertex_pointer
46 {
47  vertex_id_t id;
48  vsize_t off;
49 public:
50  vpart_vertex_pointer() {
51  id = INVALID_VERTEX_ID;
52  off = -1;
53  }
54 
55  vpart_vertex_pointer(vertex_id_t id, vsize_t off) {
56  this->id = id;
57  this->off = off;
58  }
59 
60  vertex_id_t get_vertex_id() const {
61  return id;
62  }
63 
64  vsize_t get_off() const {
65  return off;
66  }
67 };
68 
77 {
78  graph_index(const graph_index &);
79  graph_index &operator=(const graph_index &);
80 public:
81  typedef std::shared_ptr<graph_index> ptr;
84  }
85 
86  virtual ~graph_index() {
87  }
88 
89  virtual void init(int num_threads, int num_nodes) {
90  }
91  virtual void init_vparts(int hpart_id, int num_vparts,
92  std::vector<vertex_id_t> &ids) = 0;
93 
94  virtual size_t get_vertices(const vertex_id_t ids[], int num,
95  compute_vertex *v_buf[]) const = 0;
96  virtual size_t get_vertices(int part_id, const local_vid_t ids[], int num,
97  compute_vertex *v_buf[]) const = 0;
98  virtual compute_vertex &get_vertex(vertex_id_t id) = 0;
99  virtual compute_vertex &get_vertex(int part_id, local_vid_t id) = 0;
100 
101  /*
102  * The interface of getting vertically partitioned vertices.
103  */
104  virtual size_t get_num_vpart_vertices(int hpart_id) const = 0;
105  virtual size_t get_vpart_vertex_pointers(int hpart_id,
106  vpart_vertex_pointer ps[], int num) const = 0;
107  virtual size_t get_vpart_vertices(int hpart_id, int vpart_id,
108  vpart_vertex_pointer ps[], int num,
109  compute_vertex_pointer vertices[]) const = 0;
110  virtual size_t get_vpart_vertices(vertex_id_t id,
111  compute_vertex_pointer vertices[], int num) const = 0;
112 
113  virtual vertex_id_t get_max_vertex_id() const = 0;
114 
115  virtual vertex_id_t get_min_vertex_id() const = 0;
116 
117  virtual size_t get_num_vertices() const = 0;
118 
119  virtual const graph_partitioner &get_partitioner() const = 0;
120 
121  virtual vertex_program::ptr create_def_vertex_program() const = 0;
122  virtual vertex_program::ptr create_def_part_vertex_program() const = 0;
123 
124  virtual local_vid_t get_local_id(int part_id, const compute_vertex &v) const = 0;
125  virtual vertex_id_t get_vertex_id(int part_id, const compute_vertex &v) const = 0;
126  virtual vertex_id_t get_vertex_id(int part_id, compute_vertex_pointer v) const = 0;
127  virtual vertex_id_t get_vertex_id(const compute_vertex &v) const = 0;
128  virtual bool belong2part(const compute_vertex &v, int part_id) const = 0;
129 };
130 
131 template<class vertex_type, class part_vertex_type>
132 class NUMA_graph_index;
133 
134 template<class vertex_type, class part_vertex_type>
135 class graph_local_partition
136 {
137  typedef std::pair<size_t, part_vertex_type *> part_vertex_array;
138 
139  struct vpart_vertex_comp
140  {
141  bool operator()(const part_vertex_type &v1,
142  const vertex_id_t v2) const {
143  return v1.get_id() < v2;
144  }
145  };
146 
147  int node_id;
148  int part_id;
149  size_t tot_num_vertices;
150  size_t num_vertices;
151  vertex_type *vertex_arr;
152  std::vector<part_vertex_array> part_vertex_arrs;
153  const graph_partitioner &partitioner;
154 
155  graph_local_partition(const graph_partitioner &_partitioner, int part_id,
156  int node_id, size_t tot_num_vertices): partitioner(_partitioner) {
157  this->part_id = part_id;
158  this->tot_num_vertices = tot_num_vertices;
159  this->num_vertices = partitioner.get_part_size(part_id,
160  tot_num_vertices);
161  this->node_id = node_id;
162  vertex_arr = NULL;
163  }
164 public:
165  ~graph_local_partition() {
166  if (vertex_arr)
167  free_large(vertex_arr, sizeof(vertex_arr[0]) * num_vertices);
168 
169  BOOST_FOREACH(part_vertex_array arr, part_vertex_arrs)
170  free_large(arr.second, sizeof(arr.second[0]) * arr.first);
171  }
172 
173  void init() {
174  if (num_vertices == 0)
175  return;
176 
177  vertex_arr = (vertex_type *) malloc_large(
178  sizeof(vertex_arr[0]) * num_vertices);
179  assert(vertex_arr);
180  std::vector<vertex_id_t> local_ids;
181  local_ids.reserve(num_vertices);
182  partitioner.get_all_vertices_in_part(this->part_id, tot_num_vertices,
183  local_ids);
184  assert(local_ids.size() == num_vertices);
185  BOOST_FOREACH(vertex_id_t vid, local_ids) {
186  int part_id;
187  off_t part_off;
188  partitioner.map2loc(vid, part_id, part_off);
189  assert(this->part_id == part_id);
190  new (vertex_arr + part_off) vertex_type(vid);
191  }
192  if (local_ids.size() < num_vertices) {
193  assert(local_ids.size() == num_vertices - 1);
194  new (vertex_arr + local_ids.size()) vertex_type(INVALID_VERTEX_ID);
195  }
196  }
197 
198  /*
199  * This method is to initialize the vertical partitions inside
200  * the horizontal partition.
201  */
202  void init_vparts(int num_parts, std::vector<vertex_id_t> &ids) {
203  if (ids.empty())
204  return;
205 
206  assert(std::is_sorted(ids.begin(), ids.end()));
207  assert(num_parts > 1);
208  part_vertex_arrs.resize(num_parts);
209  for (int i = 0; i < num_parts; i++) {
210  part_vertex_arrs[i].first = ids.size();
211  part_vertex_arrs[i].second = (part_vertex_type *) malloc_large(
212  sizeof(part_vertex_arrs[i].second[0]) * ids.size());
213  }
214  for (size_t i = 0; i < ids.size(); i++) {
215  vertex_id_t id = ids[i];
216  // horizontal part id.
217  BOOST_VERIFY(this->part_id == partitioner.map(id));
218  // vertical parts.
219  for (int vpart_id = 0; vpart_id < num_parts; vpart_id++)
220  new (part_vertex_arrs[vpart_id].second + i) part_vertex_type(id, vpart_id);
221  }
222  }
223 
224  local_vid_t get_local_id(const compute_vertex &v) const {
225  vertex_type *addr1 = (vertex_type *) &v;
226  // If the compute vertex is a main vertex in this partition.
227  if (vertex_arr <= addr1 && addr1 < vertex_arr + num_vertices)
228  return local_vid_t(addr1 - vertex_arr);
229  else
230  return local_vid_t(INVALID_VERTEX_ID);
231  }
232 
233  vertex_id_t get_vertex_id(const compute_vertex &v) const {
234  vertex_type *addr1 = (vertex_type *) &v;
235  // If the compute vertex is a main vertex in this partition.
236  if (vertex_arr <= addr1 && addr1 < vertex_arr + num_vertices) {
237  vertex_id_t local_id = addr1 - vertex_arr;
238  vertex_id_t id;
239  partitioner.loc2map(part_id, local_id, id);
240  return id;
241  }
242  else {
243  // If not, is it one of the vertically partitioned vertices?
244  part_vertex_type *addr2 = (part_vertex_type *) &v;
245  for (size_t i = 0; i < part_vertex_arrs.size(); i++) {
246  part_vertex_type *arr_start = part_vertex_arrs[i].second;
247  part_vertex_type *arr_end
248  = part_vertex_arrs[i].second + part_vertex_arrs[i].first;
249  if (arr_start <= addr2 && addr2 < arr_end)
250  return addr2->get_id();
251  }
252  // If not, the vertex doesn't belong to this partition.
253  return INVALID_VERTEX_ID;
254  }
255  }
256 
257  compute_vertex &get_vertex(vertex_id_t id) {
258  assert(vertex_arr);
259  return vertex_arr[id];
260  }
261 
262  size_t get_num_vpart_vertices() const {
263  if (part_vertex_arrs.empty())
264  return 0;
265  else
266  return part_vertex_arrs[0].first;
267  }
268 
269  size_t get_vpart_vertex_pointers(vpart_vertex_pointer ps[],
270  int num) const {
271  int act_num = min(num, get_num_vpart_vertices());
272  for (int i = 0; i < act_num; i++)
273  ps[i] = vpart_vertex_pointer(part_vertex_arrs[0].second[i].get_id(), i);
274  return act_num;
275  }
276 
277  size_t get_vpart_vertices(int vpart_id, vpart_vertex_pointer ps[], int num,
278  compute_vertex_pointer vertices[]) const {
279  assert((size_t) vpart_id < part_vertex_arrs.size());
280  int act_num = min(num, get_num_vpart_vertices());
281  for (int i = 0; i < act_num; i++)
282  vertices[i] = compute_vertex_pointer(
283  &part_vertex_arrs[vpart_id].second[ps[i].get_off()], true);
284  return act_num;
285  }
286 
287  size_t get_vpart_vertices(vertex_id_t id,
288  compute_vertex_pointer vertices[], int num) const {
289  assert(!part_vertex_arrs.empty());
290  // All vpart vertices are sorted on vertex Id.
291  part_vertex_type *entry = std::lower_bound(part_vertex_arrs[0].second,
292  part_vertex_arrs[0].second + part_vertex_arrs[0].first, id,
293  vpart_vertex_comp());
294  if (entry == part_vertex_arrs[0].second + part_vertex_arrs[0].first)
295  return 0;
296  off_t off = entry - part_vertex_arrs[0].second;
297  int act_num = std::min((size_t) num, part_vertex_arrs.size());
298  for (int i = 0; i < act_num; i++)
299  vertices[i] = compute_vertex_pointer(
300  part_vertex_arrs[i].second + off, true);
301  return act_num;
302  }
303 
304  size_t get_num_vertices() const {
305  return num_vertices;
306  }
307 
308  int get_node_id() const {
309  return node_id;
310  }
311 
312  friend class NUMA_graph_index<vertex_type, part_vertex_type>;
313 };
314 
315 template<class vertex_type, class part_vertex_type>
316 class NUMA_graph_index: public graph_index
317 {
318  graph_header header;
319  vertex_id_t max_vertex_id;
320  vertex_id_t min_vertex_id;
321  std::unique_ptr<range_graph_partitioner> partitioner;
322  // A graph index per thread
323  std::vector<std::unique_ptr<graph_local_partition<vertex_type, part_vertex_type> > > index_arr;
324 
325  class init_thread: public thread
326  {
327  graph_local_partition<vertex_type, part_vertex_type> *index;
328  public:
329  init_thread(graph_local_partition<vertex_type, part_vertex_type> *index): thread(
330  "index-init-thread", index->get_node_id()) {
331  this->index = index;
332  }
333 
334  virtual void run() {
335  index->init();
336  this->stop();
337  }
338  };
339 public:
340  static graph_index::ptr create(const graph_header &header) {
341  NUMA_graph_index<vertex_type, part_vertex_type> *index
342  = new NUMA_graph_index<vertex_type, part_vertex_type>();
343  index->header = header;
344  return graph_index::ptr(index);
345  }
346 
347  void init(int num_threads, int num_nodes) {
348  partitioner = std::unique_ptr<range_graph_partitioner>(
349  new range_graph_partitioner(num_threads));
350 
351  // Construct the indices.
352  for (int i = 0; i < num_threads; i++) {
353  index_arr.emplace_back(new graph_local_partition<vertex_type, part_vertex_type>(
354  // The partitions are assigned to worker threads.
355  // The memory used to store the partitions should
356  // be on the same NUMA as the worker threads.
357  *partitioner, i, i % num_nodes,
358  header.get_num_vertices()));
359  }
360 
361  std::vector<init_thread *> threads(num_threads);
362  for (int i = 0; i < num_threads; i++) {
363  threads[i] = new init_thread(index_arr[i].get());
364  threads[i]->start();
365  }
366  for (int i = 0; i < num_threads; i++) {
367  threads[i]->join();
368  delete threads[i];
369  }
370 
371  min_vertex_id = 0;
372  max_vertex_id = header.get_num_vertices() - 1;
373 
374  BOOST_LOG_TRIVIAL(info) << boost::format("There are %1% vertices")
375  % header.get_num_vertices();
376  }
377 
378  virtual void init_vparts(int hpart_id, int num_vparts,
379  std::vector<vertex_id_t> &ids) {
380  index_arr[hpart_id]->init_vparts(num_vparts, ids);
381  }
382 
383  virtual size_t get_vertices(const vertex_id_t ids[], int num,
384  compute_vertex *v_buf[]) const {
385  for (int i = 0; i < num; i++) {
386  vertex_id_t id = ids[i];
387  int part_id;
388  off_t part_off;
389  partitioner->map2loc(id, part_id, part_off);
390  v_buf[i] = &index_arr[part_id]->get_vertex(part_off);
391  }
392  return num;
393  }
394 
395  virtual compute_vertex &get_vertex(vertex_id_t id) {
396  int part_id;
397  off_t part_off;
398  partitioner->map2loc(id, part_id, part_off);
399  return index_arr[part_id]->get_vertex(part_off);
400  }
401 
402  virtual size_t get_vertices(int part_id, const local_vid_t ids[], int num,
403  compute_vertex *v_buf[]) const {
404  for (int i = 0; i < num; i++)
405  v_buf[i] = &index_arr[part_id]->get_vertex(ids[i].id);
406  return num;
407  }
408 
409  virtual compute_vertex &get_vertex(int part_id, local_vid_t id) {
410  return index_arr[part_id]->get_vertex(id.id);
411  }
412 
413  virtual size_t get_num_vpart_vertices(int hpart_id) const {
414  return index_arr[hpart_id]->get_num_vpart_vertices();
415  }
416 
417  virtual size_t get_vpart_vertex_pointers(int hpart_id,
418  vpart_vertex_pointer ps[], int num) const {
419  return index_arr[hpart_id]->get_vpart_vertex_pointers(ps, num);
420  }
421 
422  virtual size_t get_vpart_vertices(int hpart_id, int vpart_id,
423  vpart_vertex_pointer ps[], int num,
424  compute_vertex_pointer vertices[]) const {
425  return index_arr[hpart_id]->get_vpart_vertices(vpart_id, ps, num,
426  vertices);
427  }
428 
429  virtual size_t get_vpart_vertices(vertex_id_t id,
430  compute_vertex_pointer vertices[], int num) const {
431  int part_id = partitioner->map(id);
432  return index_arr[part_id]->get_vpart_vertices(id, vertices, num);
433  }
434 
435  virtual vertex_id_t get_max_vertex_id() const {
436  return max_vertex_id;
437  }
438 
439  virtual vertex_id_t get_min_vertex_id() const {
440  return min_vertex_id;
441  }
442 
443  virtual size_t get_num_vertices() const {
444  return header.get_num_vertices();
445  }
446 
447  virtual const graph_partitioner &get_partitioner() const {
448  return *partitioner;
449  }
450 
451  virtual vertex_program::ptr create_def_vertex_program(
452  ) const {
453  return vertex_program::ptr(new vertex_program_impl<vertex_type>());
454  }
455 
456  virtual vertex_program::ptr create_def_part_vertex_program(
457  ) const {
458  return vertex_program::ptr(new vertex_program_impl<part_vertex_type>());
459  }
460 
461  virtual vertex_id_t get_vertex_id(const compute_vertex &v) const {
462  for (size_t i = 0; i < index_arr.size(); i++) {
463  vertex_id_t id = index_arr[i]->get_vertex_id(v);
464  if (id != INVALID_VERTEX_ID)
465  return id;
466  }
467  ABORT_MSG("can't find vertex ID");
468  }
469 
470  virtual vertex_id_t get_vertex_id(int part_id, compute_vertex_pointer v) const {
471  if (v.is_part())
472  return ((part_vertex_type &) *v).get_id();
473  else {
474  local_vid_t local_id = index_arr[part_id]->get_local_id(*v);
475  vertex_id_t id;
476  partitioner->loc2map(part_id, local_id.id, id);
477  return id;
478  }
479  }
480 
481  virtual vertex_id_t get_vertex_id(int part_id, const compute_vertex &v) const {
482  return index_arr[part_id]->get_vertex_id(v);
483  }
484 
485  virtual local_vid_t get_local_id(int part_id, const compute_vertex &v) const {
486  return index_arr[part_id]->get_local_id(v);
487  }
488 
489  virtual bool belong2part(const compute_vertex &v, int part_id) const {
490  // TODO there might be a more light-weight implementation.
491  return get_vertex_id(part_id, v) != INVALID_VERTEX_ID;
492  }
493 };
494 
495 #if 0
496 template<class vertex_type>
497 class graph_index_impl: public graph_index
498 {
499  // This contains the vertices with edges.
500  std::vector<vertex_type> vertices;
501  size_t min_vertex_size;
502 
503  graph_index_impl(const std::string &index_file) {
504  vertex_index *index = load_vertex_index(index_file);
505  this->min_vertex_size = get_min_ext_mem_vertex_size(
506  index->get_graph_header().get_graph_type());
507  size_t num_vertices = index->get_num_vertices();
508  size_t num_non_empty = 0;
509  vertices.resize(num_vertices);
510  for (size_t i = 0; i < num_vertices; i++) {
511  vertices[i] = vertex_type(i, index);
512  if (vertices[i].get_ext_mem_size() > min_vertex_size) {
513  num_non_empty++;
514  }
515  }
516  printf("There are %ld vertices and %ld non-empty, vertex array capacity: %ld\n",
517  num_vertices, num_non_empty, vertices.capacity());
518  vertex_index::destroy(index);
519  }
520 public:
521  static graph_index *create(const std::string &index_file) {
522  return new graph_index_impl<vertex_type>(index_file);
523  }
524 
525  virtual compute_vertex &get_vertex(vertex_id_t id) {
526  return vertices[id];
527  }
528 
529  virtual size_t get_num_vertices() const {
530  return vertices.size();
531  }
532 
533  virtual vertex_id_t get_max_vertex_id() const {
534  return vertices.back().get_id();
535  }
536 
537  virtual vertex_id_t get_min_vertex_id() const {
538  return vertices.front().get_id();
539  }
540 };
541 #endif
542 
543 }
544 
545 #endif
graph_index()
Definition: graph_index.h:83
unsigned int vsize_t
Basic data types used in FlashGraph.
Definition: FG_basic_types.h:33
This file contains a set of graph index implementation. The graph index is the in-memory container to...
Definition: graph_index.h:76