FlashGraph-ng
A new frontier in large-scale graph analysis and data mining
vertex_index_reader.h
1 #ifndef __VERTEX_INDEX_READER__
2 #define __VERTEX_INDEX_READER__
3 
4 /*
5  * Copyright 2014 Open Connectome Project (http://openconnecto.me)
6  * Written by Da Zheng (zhengda1936@gmail.com)
7  *
8  * This file is part of FlashGraph.
9  *
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  * http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  */
22 
23 #include "FG_basic_types.h"
24 #include "vertex.h"
25 #include "vertex_index.h"
26 #include "vertex_request.h"
27 
28 namespace safs
29 {
30  class io_interface;
31 }
32 
33 namespace fg
34 {
35 
36 class directed_vertex_request;
37 class index_comp_allocator;
38 
39 // Represents the vertices in the range [first, last).
40 typedef std::pair<vertex_id_t, vertex_id_t> id_range_t;
41 
42 /*
43  * This iterates over the entries of the vertex index.
44  * It can iterate over both undirected and directed vertex indices.
45  */
46 class index_iterator
47 {
48  static const int BUF_SIZE = sizeof(directed_vertex_entry);
49 protected:
50  char curr_buf[BUF_SIZE];
51  char next_buf[BUF_SIZE];
52  bool _has_next;
53 public:
54  bool has_next() const {
55  return _has_next;
56  }
57 
58  virtual void move_next() = 0;
59  virtual bool move_to(off_t idx) = 0;
60  virtual int get_num_vertices() const = 0;
61 
62  off_t get_curr_off() const {
63  const vertex_offset *v_off
64  = reinterpret_cast<const vertex_offset *>(curr_buf);
65  return v_off->get_off();
66  }
67 
68  vsize_t get_curr_size() const {
69  const vertex_offset *v_next_off
70  = reinterpret_cast<const vertex_offset *>(next_buf);
71  const vertex_offset *v_curr_off
72  = reinterpret_cast<const vertex_offset *>(curr_buf);
73  return v_next_off->get_off() - v_curr_off->get_off();
74  }
75 
76  off_t get_curr_out_off() const {
77  const directed_vertex_entry *v_entry
78  = reinterpret_cast<const directed_vertex_entry *>(curr_buf);
79  return v_entry->get_out_off();
80  }
81 
82  vsize_t get_curr_out_size() const {
83  const directed_vertex_entry *v_next_entry
84  = reinterpret_cast<const directed_vertex_entry *>(next_buf);
85  const directed_vertex_entry *v_curr_entry
86  = reinterpret_cast<const directed_vertex_entry *>(curr_buf);
87  return v_next_entry->get_out_off() - v_curr_entry->get_out_off();
88  }
89 };
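
/*
 * Illustrative sketch (not part of the original header): how a consumer
 * typically walks an index_iterator. The function name and the printf format
 * are assumptions for illustration; the iterator methods are the ones
 * declared above.
 *
 *   void dump_vertex_extents(index_iterator &it) {
 *       while (it.has_next()) {
 *           // For an undirected (or in-edge) entry, the current offset and
 *           // size locate the vertex's adjacency list in the graph file.
 *           printf("off: %ld, size: %u\n", (long) it.get_curr_off(),
 *                   it.get_curr_size());
 *           it.move_next();
 *       }
 *   }
 */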
90 
91 template<class EntryType>
92 class page_index_iterator_impl: public index_iterator
93 {
94  safs::page_byte_array::seq_const_iterator<EntryType> it;
95  int num_entries;
96 public:
97  page_index_iterator_impl(safs::page_byte_array::seq_const_iterator<EntryType> &_it): it(_it) {
98  num_entries = it.get_num_tot_entries();
99  assert(num_entries >= 2);
100  assert(it.has_next());
101  EntryType *v_next_entry = reinterpret_cast<EntryType *>(next_buf);
102  EntryType *v_curr_entry = reinterpret_cast<EntryType *>(curr_buf);
103  *v_curr_entry = it.next();
104  assert(it.has_next());
105  *v_next_entry = it.next();
106  _has_next = true;
107  }
108 
109  virtual void move_next() {
110  _has_next = it.has_next();
111  EntryType *v_next_entry = reinterpret_cast<EntryType *>(next_buf);
112  EntryType *v_curr_entry = reinterpret_cast<EntryType *>(curr_buf);
113  if (_has_next) {
114  *v_curr_entry = *v_next_entry;
115  *v_next_entry = it.next();
116  }
117  }
118 
119  virtual bool move_to(off_t idx) {
120  bool ret = it.move_to(idx);
121  if (!ret) {
122  _has_next = false;
123  return false;
124  }
125  EntryType *v_next_entry = reinterpret_cast<EntryType *>(next_buf);
126  EntryType *v_curr_entry = reinterpret_cast<EntryType *>(curr_buf);
127  *v_curr_entry = it.next();
128  if (it.has_next()) {
129  _has_next = true;
130  *v_next_entry = it.next();
131  }
132  else
133  _has_next = false;
134  return _has_next;
135  }
136 
137  virtual int get_num_vertices() const {
138  return num_entries - 1;
139  }
140 };
141 
142 template<class EntryType>
143 class array_index_iterator_impl: public index_iterator
144 {
145  EntryType *start;
146  EntryType *p;
147  EntryType *end;
148 public:
149  array_index_iterator_impl(EntryType *start, EntryType *end) {
150  this->start = start;
151  this->p = start;
152  this->end = end;
153  assert(end - p >= 2);
154  EntryType *v_next_entry = reinterpret_cast<EntryType *>(next_buf);
155  EntryType *v_curr_entry = reinterpret_cast<EntryType *>(curr_buf);
156  *v_curr_entry = *p;
157  p++;
158  *v_next_entry = *p;
159  _has_next = true;
160  }
161 
162  virtual void move_next() {
163  EntryType *v_next_entry = reinterpret_cast<EntryType *>(next_buf);
164  EntryType *v_curr_entry = reinterpret_cast<EntryType *>(curr_buf);
165  *v_curr_entry = *v_next_entry;
166  p++;
167  _has_next = p < end;
168  if (_has_next)
169  *v_next_entry = *p;
170  }
171 
172  virtual bool move_to(off_t idx) {
173  EntryType *v_next_entry = reinterpret_cast<EntryType *>(next_buf);
174  EntryType *v_curr_entry = reinterpret_cast<EntryType *>(curr_buf);
175  p = start + idx;
176  if (p + 1 < end) {
177  *v_curr_entry = *p;
178  *v_next_entry = *(p + 1);
179  _has_next = true;
180  }
181  else
182  _has_next = false;
183  return _has_next;
184  }
185 
186  virtual int get_num_vertices() const {
187  return end - start - 1;
188  }
189 };
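
/*
 * Illustrative sketch (not part of the original header): an index region with
 * N + 1 offset entries describes N vertices, because vertex i's size is
 * entries[i + 1] - entries[i]. The literal offsets below are made up, and the
 * vertex_offset constructor taking an offset is assumed from its use in the
 * compressed iterators below.
 *
 *   vertex_offset offs[4] = { vertex_offset(0), vertex_offset(64),
 *       vertex_offset(64), vertex_offset(192) };
 *   array_index_iterator_impl<vertex_offset> it(offs, offs + 4);
 *   // it.get_num_vertices() == 3; the first vertex has size 64, the second
 *   // has size 0 and the third has size 128.
 */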
190 
191 class compressed_undirected_index_iterator: public index_iterator
192 {
193  size_t begin;
194  size_t idx;
195  size_t end;
196  const in_mem_cundirected_vertex_index &index;
197 public:
198  compressed_undirected_index_iterator(const in_mem_cundirected_vertex_index &_index,
199  const id_range_t &range): index(_index) {
200  vertex_offset first_entry = index.get_vertex(range.first);
201  new (curr_buf) vertex_offset(first_entry);
202  size_t size = index.get_size(range.first);
203  new (next_buf) vertex_offset(first_entry.get_off() + size);
204  begin = idx = range.first;
205  end = range.second;
206  _has_next = true;
207  }
208 
209  virtual void move_next() {
210  const vertex_offset *v_next_off
211  = reinterpret_cast<const vertex_offset *>(next_buf);
212  vertex_offset e = *v_next_off;
213  new (curr_buf) vertex_offset(e);
214  idx++;
215  _has_next = (idx < end);
216  if (_has_next) {
217  size_t size = index.get_size(idx);
218  new (next_buf) vertex_offset(e.get_off() + size);
219  }
220  }
221 
222  virtual bool move_to(off_t rel_idx) {
223  this->idx = begin + rel_idx;
224  if ((size_t) idx < end) {
225  vertex_offset e = index.get_vertex(idx);
226  new (curr_buf) vertex_offset(e);
227  size_t size = index.get_size(idx);
228  new (next_buf) vertex_offset(e.get_off() + size);
229  _has_next = true;
230  }
231  else
232  _has_next = false;
233  return _has_next;
234  }
235 
236  virtual int get_num_vertices() const {
237  return end - idx;
238  }
239 };
240 
241 class compressed_directed_index_iterator: public index_iterator
242 {
243  size_t begin;
244  size_t idx;
245  size_t end;
246  const in_mem_cdirected_vertex_index &index;
247 public:
248  compressed_directed_index_iterator(const in_mem_cdirected_vertex_index &_index,
249  const id_range_t &range): index(_index) {
250  directed_vertex_entry first_entry = index.get_vertex(range.first);
251  new (curr_buf) directed_vertex_entry(first_entry);
252  size_t in_size = index.get_in_size(range.first);
253  size_t out_size = index.get_out_size(range.first);
254  new (next_buf) directed_vertex_entry(first_entry.get_in_off() + in_size,
255  first_entry.get_out_off() + out_size);
256  begin = idx = range.first;
257  end = range.second;
258  _has_next = true;
259  }
260 
261  virtual void move_next() {
262  const directed_vertex_entry *v_next_entry
263  = reinterpret_cast<const directed_vertex_entry *>(next_buf);
264  directed_vertex_entry e = *v_next_entry;
265  new (curr_buf) directed_vertex_entry(e);
266  idx++;
267  _has_next = (idx < end);
268  if (_has_next) {
269  size_t in_size = index.get_in_size(idx);
270  size_t out_size = index.get_out_size(idx);
271  new (next_buf) directed_vertex_entry(e.get_in_off() + in_size,
272  e.get_out_off() + out_size);
273  }
274  }
275 
276  virtual bool move_to(off_t rel_idx) {
277  this->idx = begin + rel_idx;
278  if ((size_t) idx < end) {
279  directed_vertex_entry e = index.get_vertex(idx);
280  new (curr_buf) directed_vertex_entry(e);
281  size_t in_size = index.get_in_size(idx);
282  size_t out_size = index.get_out_size(idx);
283  new (next_buf) directed_vertex_entry(e.get_in_off() + in_size,
284  e.get_out_off() + out_size);
285  _has_next = true;
286  }
287  else
288  _has_next = false;
289  return _has_next;
290  }
291 
292  virtual int get_num_vertices() const {
293  return end - idx;
294  }
295 };
296 
297 /*
298  * This interface defines the method invoked by vertex_index_reader.
299  * It is designed to compute on multiple index entries. If we request
300  * vertices whose IDs are adjacent, we should merge all of these
301  * requests into a single index_compute.
302  */
303 class index_compute
304 {
305  index_comp_allocator &alloc;
306  id_range_t id_range;
307 public:
308  index_compute(index_comp_allocator &_alloc): alloc(_alloc) {
309  this->id_range.first = INVALID_VERTEX_ID;
310  this->id_range.second = INVALID_VERTEX_ID;
311  }
312 
313  virtual ~index_compute() {
314  }
315 
316  void clear() {
317  this->id_range.first = INVALID_VERTEX_ID;
318  this->id_range.second = INVALID_VERTEX_ID;
319  }
320 
321  void init(const id_range_t &range) {
322  this->id_range = range;
323  }
324 
325  void init(vertex_id_t id) {
326  assert(id_range.first == INVALID_VERTEX_ID);
327  id_range.first = id;
328  id_range.second = id + 1;
329  }
330 
331  void add_vertex(vertex_id_t id) {
332  assert(id_range.second - 1 <= id);
333  id_range.second = id + 1;
334  }
335 
336  bool empty() const {
337  return id_range.first == INVALID_VERTEX_ID;
338  }
339 
340  vertex_id_t get_first_vertex() const {
341  return id_range.first;
342  }
343 
344  vertex_id_t get_last_vertex() const {
345  return id_range.second - 1;
346  }
347 
348  const id_range_t &get_range() const {
349  return id_range;
350  }
351 
352  virtual bool run(vertex_id_t start_vid, index_iterator &it) = 0;
353 
354  virtual index_comp_allocator &get_allocator() const {
355  return alloc;
356  }
357 };
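
/*
 * Illustrative sketch (not part of the original header): the range bookkeeping
 * performed by init() and add_vertex() when adjacent requests are merged into
 * one index_compute. `compute' stands for any concrete subclass defined below
 * (e.g., dense_vertex_compute); the vertex IDs are made up.
 *
 *   // compute.empty() is initially true (id_range.first == INVALID_VERTEX_ID).
 *   compute.init(100);          // now covers [100, 101)
 *   compute.add_vertex(101);    // now covers [100, 102)
 *   compute.add_vertex(102);    // now covers [100, 103)
 *   // get_first_vertex() == 100, get_last_vertex() == 102.
 */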
358 
359 class index_comp_allocator
360 {
361 public:
362  virtual ~index_comp_allocator() {
363  }
364  virtual index_compute *alloc() = 0;
365  virtual void free(index_compute *compute) = 0;
366 };
367 
368 /*
369  * This interface reads the vertex index from SSDs.
370  * It accepts requests for reading vertices or partial vertices, as well as
371  * other vertex information in the vertex index, such as the number of edges.
372  * This is a per-thread data structure.
373  */
374 class vertex_index_reader
375 {
376 public:
377  typedef std::shared_ptr<vertex_index_reader> ptr;
378 
379  static ptr create(const in_mem_query_vertex_index::ptr index, bool directed);
380  static ptr create(std::shared_ptr<safs::io_interface> io, bool directed);
381 
382  virtual ~vertex_index_reader() {
383  }
384 
385  virtual void request_index(index_compute *compute) = 0;
386  virtual void wait4complete(int num) = 0;
387  virtual size_t get_num_pending_tasks() const = 0;
388 };
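
/*
 * Illustrative sketch (not part of the original header): the basic life cycle
 * of a vertex_index_reader. `io' and `compute' are placeholders; in practice
 * the compute comes from an index_comp_allocator and is initialized by one of
 * the index_compute subclasses below, and simple_index_reader at the end of
 * this file hides this plumbing from users.
 *
 *   vertex_index_reader::ptr reader
 *       = vertex_index_reader::create(io, true);  // true: directed graph
 *   reader->request_index(compute);    // asynchronous: queues the request
 *   reader->wait4complete(1);          // wait on the outstanding index I/O
 *   size_t left = reader->get_num_pending_tasks();
 */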
389 
390 /*
391  * The classes below implement four types of index_compute to request:
392  * a vertex,
393  * part of a vertex,
394  * # edges,
395  * # directed edges.
396  */
397 
398 /*
399  * The function class for requesting a vertex.
400  */
401 class req_vertex_func
402 {
403  edge_type type;
404 public:
405  req_vertex_func(edge_type type = edge_type::IN_EDGE) {
406  this->type = type;
407  }
408 
409  void operator()(vertex_id_t vid, vertex_compute &compute,
410  const index_iterator &it) const {
411  assert(type != edge_type::NONE);
412  if (type == edge_type::IN_EDGE) {
413  ext_mem_vertex_info info(vid, it.get_curr_off(),
414  it.get_curr_size());
415  compute.issue_io_request(info);
416  }
417  else if (type == edge_type::OUT_EDGE) {
418  ext_mem_vertex_info info(vid, it.get_curr_out_off(),
419  it.get_curr_out_size());
420  compute.issue_io_request(info);
421  }
422  else {
423  ext_mem_vertex_info in_info(vid, it.get_curr_off(),
424  it.get_curr_size());
425  ext_mem_vertex_info out_info(vid, it.get_curr_out_off(),
426  it.get_curr_out_size());
427  ((directed_vertex_compute &) compute).issue_io_request(in_info,
428  out_info);
429  }
430  }
431 };
432 
433 /*
434  * The function class for requesting the number of edges of an undirected vertex.
435  */
436 struct req_undirected_edge_func
437 {
438  void operator()(vertex_id_t vid, vertex_compute &compute,
439  const index_iterator &it) const {
440  compute.run_on_vertex_size(vid, it.get_curr_size());
441  }
442 };
443 
444 /*
445  * The function class for requesting the number of directed edges of a vertex.
446  */
447 struct req_directed_edge_func
448 {
449  void operator()(vertex_id_t vid, vertex_compute &compute,
450  const index_iterator &it) const {
451  directed_vertex_compute &dcompute
452  = (directed_vertex_compute &) compute;
453  dcompute.run_on_vertex_size(vid, it.get_curr_size(),
454  it.get_curr_out_size());
455  }
456 };
457 
458 class worker_thread;
459 /*
460  * The two classes below are optimized for vertices that request their own
461  * adjacency lists. In this case, we don't need to create a vertex_compute
462  * for each vertex, and all adjacency lists are accessed in one or two I/O
463  * requests (two I/O requests are required if both in-edge and out-edge
464  * lists are accessed).
465  */
466 
467 /*
468  * This class is optimized for a list of vertices whose IDs are contiguous.
469  * In this case, we can identify the list of vertices with the first and
470  * last ID in the range.
471  */
472 class dense_self_vertex_compute: public index_compute
473 {
474  edge_type type;
475  worker_thread *thread;
476 
477  int get_num_vertices() const {
478  return get_last_vertex() - get_first_vertex() + 1;
479  }
480 public:
481  dense_self_vertex_compute(index_comp_allocator &alloc): index_compute(alloc) {
482  this->thread = NULL;
483  type = IN_EDGE;
484  }
485 
486  void init(const id_range_t &range, worker_thread *t, edge_type type) {
487  index_compute::init(range);
488  this->thread = t;
489  this->type = type;
490  }
491 
492  virtual bool run(vertex_id_t start_vid, index_iterator &it);
493 };
494 
495 /*
496  * This class handles the case where the vertex IDs aren't contiguous, but
497  * they are close enough that a single I/O request can fetch all of their
498  * index entries. Therefore, we need to store the vertex IDs individually.
499  */
500 class sparse_self_vertex_compute: public index_compute
501 {
502  size_t num_ranges;
503  embedded_array<id_range_t> ranges;
504  int entry_size_log;
505  edge_type type;
506  worker_thread *thread;
507  bool in_mem;
508 
509  off_t get_page(vertex_id_t id) const {
510  return ROUND_PAGE(id << entry_size_log);
511  }
512 
513  off_t get_last_page() const {
514  return get_page(get_last_vertex());
515  }
516 
517  void run_in_vertices(vertex_id_t start_vid, index_iterator &it);
518  void run_out_vertices(vertex_id_t start_vid, index_iterator &it);
519  void run_both_vertices(vertex_id_t start_vid, index_iterator &it);
520 public:
521  sparse_self_vertex_compute(index_comp_allocator &alloc,
522  int entry_size_log, bool in_mem): index_compute(alloc) {
523  this->in_mem = in_mem;
524  this->thread = NULL;
525  type = IN_EDGE;
526  num_ranges = 0;
527  this->entry_size_log = entry_size_log;
528  }
529 
530  void init(const id_range_t &range, worker_thread *t, edge_type type) {
531  index_compute::init(range);
532  this->thread = t;
533  this->type = type;
534  ranges[0] = range;
535  num_ranges = 1;
536  }
537 
538  bool add_range(id_range_t &range) {
539  if (in_mem || get_last_page() == get_page(range.first)
540  || get_last_page() + safs::PAGE_SIZE == get_page(range.first)) {
541  index_compute::add_vertex(range.second - 1);
542  if ((size_t) ranges.get_capacity() <= num_ranges)
543  ranges.resize(ranges.get_capacity() * 2);
544  ranges[num_ranges++] = range;
545  return true;
546  }
547  else
548  return false;
549  }
550 
551  size_t get_num_ranges() const {
552  return num_ranges;
553  }
554 
555  const id_range_t &get_range(off_t idx) const {
556  return ranges[idx];
557  }
558 
559  virtual bool run(vertex_id_t start_vid, index_iterator &it);
560 };
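
/*
 * Worked example (not from the original header): how add_range() decides
 * whether a new ID range can share the same index I/O. Assume 4KB pages,
 * entry_size_log = 4 (16-byte index entries) and that ROUND_PAGE rounds an
 * offset down to a page boundary; all three are assumptions for illustration.
 *
 *   // get_page(id) = ROUND_PAGE(id << 4), so vertices 0..255 map to page 0,
 *   // 256..511 to page 4096, 512..767 to page 8192, and so on.
 *   // If the compute currently ends at vertex 300 (page 4096), a range that
 *   // starts at vertex 400 (same page) or vertex 600 (page 8192, the next
 *   // page) is accepted; a range starting at vertex 1000 (page 12288) is
 *   // rejected and the caller must use a new sparse_self_vertex_compute.
 */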
561 
562 /*
563  * These classes implement dense index computes. The vertices in a dense
564  * index compute are adjacent to each other: each subsequent request is for
565  * the next vertex in the vertex ID space, with no repeated vertex IDs or
566  * gaps between vertices.
567  */
568 class dense_vertex_compute: public index_compute
569 {
570  static const int MAX_INDEX_COMPUTE_SIZE = 512;
571 protected:
572  embedded_array<vertex_compute *> computes;
573  int num_gets;
574 public:
575  dense_vertex_compute(index_comp_allocator &_alloc): index_compute(_alloc) {
576  num_gets = 0;
577  }
578 
579  vertex_compute *get_compute(vertex_id_t id) const {
580  assert(id >= get_first_vertex());
581  return computes[id - get_first_vertex()];
582  }
583 
584  vertex_compute *get_first_compute() const {
585  assert(get_num_vertices() >= 1);
586  return computes[0];
587  }
588 
589  int get_num_vertices() const {
590  return get_last_vertex() - get_first_vertex() + 1;
591  }
592 
593  void init(vertex_id_t id, vertex_compute *compute) {
594  index_compute::init(id);
595  computes[0] = compute;
596  }
597 
598  bool add_vertex(vertex_id_t id, vertex_compute *compute) {
599  // We don't want to have too many requests in a compute.
600  // Otherwise, we would need to use malloc to allocate memory.
601  if (computes.get_capacity() <= get_num_vertices()) {
602  if (computes.get_capacity() >= MAX_INDEX_COMPUTE_SIZE)
603  return false;
604  else
605  computes.resize(computes.get_capacity() * 2);
606  }
607 
608  // The next vertex has to be the ID of the previous vertex + 1.
609  // There should be no gap.
610  if (get_last_vertex() + 1 == id) {
611  index_compute::add_vertex(id);
612  computes[get_num_vertices() - 1] = compute;
613  return true;
614  }
615  else
616  return false;
617  }
618 
619  template<class Func>
620  bool run_temp(vertex_id_t vid, index_iterator &it, Func &func) {
621  while (it.has_next()) {
622  num_gets++;
623  func(vid, *get_compute(vid), it);
624  vid++;
625  it.move_next();
626  }
627  return num_gets == get_num_vertices();
628  }
629 };
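
/*
 * Illustrative sketch (not part of the original header): the contiguity rule
 * enforced by dense_vertex_compute::add_vertex(). `dense', `c10', `c11' and
 * `c13' are placeholders for illustration.
 *
 *   dense.init(10, c10);                   // the compute covers vertex 10
 *   bool ok1 = dense.add_vertex(11, c11);  // true: 11 == 10 + 1
 *   bool ok2 = dense.add_vertex(13, c13);  // false: gap at 12, so the caller
 *                                          // must start a new index compute
 */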
630 
631 /*
632  * This requests a vertex. It works for both directed and undirected vertices.
633  * If it's an undirected vertex, the edge type is always IN_EDGE.
634  */
635 class req_vertex_compute: public dense_vertex_compute
636 {
637  req_vertex_func func;
638  edge_type type;
639 public:
640  req_vertex_compute(index_comp_allocator &_alloc): dense_vertex_compute(_alloc) {
641  type = edge_type::IN_EDGE;
642  }
643 
644  using dense_vertex_compute::init;
645  void init(const directed_vertex_request &req, vertex_compute *compute) {
646  type = req.get_type();
647  func = req_vertex_func(req.get_type());
648  dense_vertex_compute::init(req.get_id(), compute);
649  }
650 
651  using dense_vertex_compute::add_vertex;
652  bool add_vertex(const directed_vertex_request &req, vertex_compute *compute) {
653  assert(type == req.get_type());
654  return dense_vertex_compute::add_vertex(req.get_id(), compute);
655  }
656 
657  virtual bool run(vertex_id_t vid, index_iterator &it) {
658  return run_temp<req_vertex_func>(vid, it, func);
659  }
660 
661  req_vertex_func get_func() const {
662  return func;
663  }
664 };
665 
666 /*
667  * This requests # edges of a vertex.
668  */
669 class req_undirected_edge_compute: public dense_vertex_compute
670 {
671 public:
672  req_undirected_edge_compute(index_comp_allocator &_alloc): dense_vertex_compute(_alloc) {
673  }
674 
675  virtual bool run(vertex_id_t vid, index_iterator &it) {
676  req_undirected_edge_func func;
677  return run_temp<req_undirected_edge_func>(vid, it, func);
678  }
679 
680  req_undirected_edge_func get_func() const {
681  return req_undirected_edge_func();
682  }
683 };
684 
685 /*
686  * This requests # directed edges of a vertex.
687  */
688 class req_directed_edge_compute: public dense_vertex_compute
689 {
690 public:
691  req_directed_edge_compute(index_comp_allocator &_alloc): dense_vertex_compute(
692  _alloc) {
693  }
694 
695  virtual bool run(vertex_id_t vid, index_iterator &it) {
696  req_directed_edge_func func;
697  return run_temp<req_directed_edge_func>(vid, it, func);
698  }
699 
700  req_directed_edge_func get_func() const {
701  return req_directed_edge_func();
702  }
703 };
704 
705 /*
706  * These index computes handle the general case. That is, there may be gaps
707  * between vertices as well as repeated vertices.
708  */
709 class general_vertex_compute: public index_compute
710 {
711  static const int MAX_INDEX_COMPUTE_SIZE = 512;
712  int num_vertices;
713  embedded_array<vertex_id_t> ids;
714  embedded_array<vertex_compute *> computes;
715  int entry_size_log;
716  bool in_mem;
717 
718  off_t get_page(vertex_id_t id) const {
719  return ROUND_PAGE(id << entry_size_log);
720  }
721 
722  off_t get_last_page() const {
723  return get_page(get_last_vertex());
724  }
725 protected:
726  int num_gets;
727 
728  int get_start_idx(vertex_id_t start_vid) const {
729  if (start_vid == get_id(0))
730  return 0;
731  else {
732  const vertex_id_t *loc = std::lower_bound(ids.data(),
733  ids.data() + num_vertices, start_vid);
734  assert(loc != ids.data() + num_vertices);
735  return loc - ids.data();
736  }
737  }
738 public:
739  general_vertex_compute(index_comp_allocator &_alloc,
740  int entry_size_log, bool in_mem): index_compute(_alloc) {
741  this->in_mem = in_mem;
742  num_vertices = 0;
743  num_gets = 0;
744  this->entry_size_log = entry_size_log;
745  }
746 
747  vertex_compute *get_compute(int idx) const {
748  return computes[idx];
749  }
750 
751  vertex_compute *get_first_compute() const {
752  assert(get_num_vertices() >= 1);
753  return computes[0];
754  }
755 
756  vertex_id_t get_id(int idx) const {
757  return ids[idx];
758  }
759 
760  int get_num_vertices() const {
761  return num_vertices;
762  }
763 
764  void init(vertex_id_t id, vertex_compute *compute) {
765  index_compute::init(id);
766  ids[0] = id;
767  computes[0] = compute;
768  num_vertices++;
769  }
770 
771  void clear() {
772  index_compute::clear();
773  num_vertices = 0;
774  }
775 
776  bool add_vertex(vertex_id_t id, vertex_compute *compute) {
777  // We don't want to have too many requests in a compute.
778  // Otherwise, we would need to use malloc to allocate memory.
779  if (computes.get_capacity() <= get_num_vertices()) {
780  if (computes.get_capacity() >= MAX_INDEX_COMPUTE_SIZE)
781  return false;
782  else {
783  ids.resize(ids.get_capacity() * 2);
784  computes.resize(computes.get_capacity() * 2);
785  }
786  }
787 
788  if (in_mem || get_last_page() == get_page(id)
789  || get_last_page() + safs::PAGE_SIZE == get_page(id)) {
790  index_compute::add_vertex(id);
791  computes[get_num_vertices()] = compute;
792  ids[get_num_vertices()] = id;
793  num_vertices++;
794  return true;
795  }
796  else
797  return false;
798  }
799 
800  template<class Func>
801  bool run_temp(vertex_id_t start_vid, index_iterator &it, Func &func) {
802  int num_vertices = get_num_vertices();
803  for (int i = get_start_idx(start_vid); i < num_vertices; i++) {
804  vertex_id_t id = get_id(i);
805  vertex_compute *compute = get_compute(i);
806  assert(id >= start_vid);
807  if (!it.move_to(id - start_vid)) {
808  break;
809  }
810  func(id, *compute, it);
811  num_gets++;
812  }
813  return num_gets == get_num_vertices();
814  }
815 };
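
/*
 * Illustrative sketch (not part of the original header): unlike the dense
 * computes, general_vertex_compute tolerates gaps and repeated IDs as long as
 * the index entries still fall on the same page or the following page.
 * `gen', `c0', `c1' and `c2' are placeholders for illustration.
 *
 *   gen.init(10, c0);
 *   gen.add_vertex(10, c1);   // repeated ID: allowed, the entry is served twice
 *   gen.add_vertex(14, c2);   // gap between 10 and 14: allowed
 *   // run_temp() later uses index_iterator::move_to(id - start_vid) to jump
 *   // to each requested entry instead of visiting every vertex in the range.
 */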
816 
817 class genrq_vertex_compute: public general_vertex_compute
818 {
819  edge_type type;
820  req_vertex_func func;
821 public:
822  genrq_vertex_compute(index_comp_allocator &_alloc, int entry_size_log,
823  bool in_mem): general_vertex_compute(_alloc, entry_size_log, in_mem) {
824  type = edge_type::IN_EDGE;
825  }
826 
827  using general_vertex_compute::init;
828  void init(const directed_vertex_request &req, vertex_compute *compute) {
829  type = req.get_type();
830  func = req_vertex_func(req.get_type());
831  general_vertex_compute::init(req.get_id(), compute);
832  }
833 
834  using general_vertex_compute::add_vertex;
835  bool add_vertex(const directed_vertex_request &req, vertex_compute *compute) {
836  assert(type == req.get_type());
837  return general_vertex_compute::add_vertex(req.get_id(), compute);
838  }
839 
840  virtual bool run(vertex_id_t vid, index_iterator &it) {
841  return run_temp<req_vertex_func>(vid, it, func);
842  }
843 
844  req_vertex_func get_func() const {
845  return func;
846  }
847 };
848 
849 class genrq_edge_compute: public general_vertex_compute
850 {
851 public:
852  genrq_edge_compute(index_comp_allocator &_alloc, int entry_size_log,
853  bool in_mem): general_vertex_compute(_alloc, entry_size_log, in_mem) {
854  }
855 
856  virtual bool run(vertex_id_t vid, index_iterator &it) {
857  req_undirected_edge_func func;
858  return run_temp<req_undirected_edge_func>(vid, it, func);
859  }
860 
861  req_undirected_edge_func get_func() const {
862  return req_undirected_edge_func();
863  }
864 };
865 
866 class genrq_directed_edge_compute: public general_vertex_compute
867 {
868 public:
869  genrq_directed_edge_compute(index_comp_allocator &_alloc, int entry_size_log,
870  bool in_mem): general_vertex_compute(_alloc, entry_size_log, in_mem) {
871  }
872 
873  virtual bool run(vertex_id_t vid, index_iterator &it) {
874  req_directed_edge_func func;
875  return run_temp<req_directed_edge_func>(vid, it, func);
876  }
877 
878  req_directed_edge_func get_func() const {
879  return req_directed_edge_func();
880  }
881 };
882 
883 template<class Func>
884 class single_index_compute: public index_compute
885 {
886  vertex_compute *compute;
887  Func func;
888 public:
889  single_index_compute(
890  index_comp_allocator &alloc): index_compute(alloc) {
891  compute = NULL;
892  }
893 
894  template<class MultIndexCompute>
895  void init(const MultIndexCompute &compute) {
896  assert(compute.get_num_vertices() == 1);
897  index_compute::init(compute.get_first_vertex());
898  assert(this->compute == NULL);
899  this->compute = compute.get_first_compute();
900  this->func = compute.get_func();
901  }
902 
903  virtual bool run(vertex_id_t start_vid, index_iterator &it) {
904  assert(start_vid == get_first_vertex());
905  assert(it.has_next());
906  func(start_vid, *compute, it);
907  return true;
908  }
909 };
910 
911 typedef single_index_compute<req_vertex_func> single_vertex_compute;
912 typedef single_index_compute<req_undirected_edge_func> single_edge_compute;
913 typedef single_index_compute<req_directed_edge_func> single_directed_edge_compute;
914 
915 template<class compute_type>
916 class index_comp_allocator_impl: public index_comp_allocator
917 {
918  class compute_initiator: public obj_initiator<compute_type>
919  {
920  index_comp_allocator_impl<compute_type> &alloc;
921  public:
922  compute_initiator(
923  index_comp_allocator_impl<compute_type> &_alloc): alloc(_alloc) {
924  }
925 
926  virtual void init(compute_type *obj) {
927  new (obj) compute_type(alloc);
928  }
929  };
930 
931  class compute_destructor: public obj_destructor<compute_type>
932  {
933  public:
934  void destroy(compute_type *obj) {
935  obj->~compute_type();
936  }
937  };
938 
939  obj_allocator<compute_type> allocator;
940 public:
941  index_comp_allocator_impl(thread *t): allocator(
942  "index-compute-allocator", t->get_node_id(), false, 1024 * 1024,
943  safs::params.get_max_obj_alloc_size(),
944  typename obj_initiator<compute_type>::ptr(new compute_initiator(*this)),
945  typename obj_destructor<compute_type>::ptr(new compute_destructor())) {
946  }
947 
948  virtual index_compute *alloc() {
949  return allocator.alloc_obj();
950  }
951 
952  virtual void free(index_compute *obj) {
953  allocator.free((compute_type *) obj);
954  }
955 };
956 
957 template<class compute_type>
958 class general_index_comp_allocator_impl: public index_comp_allocator
959 {
960  class compute_initiator: public obj_initiator<compute_type>
961  {
962  general_index_comp_allocator_impl<compute_type> &alloc;
963  int entry_size_log;
964  bool in_mem;
965  public:
966  compute_initiator(general_index_comp_allocator_impl<compute_type> &_alloc,
967  int entry_size_log, bool in_mem): alloc(_alloc) {
968  this->entry_size_log = entry_size_log;
969  this->in_mem = in_mem;
970  }
971 
972  virtual void init(compute_type *obj) {
973  new (obj) compute_type(alloc, entry_size_log, in_mem);
974  }
975  };
976 
977  class compute_destructor: public obj_destructor<compute_type>
978  {
979  public:
980  void destroy(compute_type *obj) {
981  obj->~compute_type();
982  }
983  };
984 
985  obj_allocator<compute_type> allocator;
986 public:
987  general_index_comp_allocator_impl(thread *t, int entry_size_log,
988  bool in_mem): allocator("sparse-index-compute-allocator",
989  t->get_node_id(), false, 1024 * 1024, safs::params.get_max_obj_alloc_size(),
990  typename obj_initiator<compute_type>::ptr(new compute_initiator(*this,
991  entry_size_log, in_mem)),
992  typename obj_destructor<compute_type>::ptr(new compute_destructor())) {
993  }
994 
995  virtual index_compute *alloc() {
996  return allocator.alloc_obj();
997  }
998 
999  virtual void free(index_compute *obj) {
1000  allocator.free((compute_type *) obj);
1001  }
1002 };
1003 
1004 /*
1005  * This is a helper class that requests:
1006  * a vertex,
1007  * part of a vertex,
1008  * # edges,
1009  * # directed edges.
1010  */
1011 class simple_index_reader
1012 {
1013  worker_thread *t;
1014 
1015  index_comp_allocator_impl<req_vertex_compute> *req_vertex_comp_alloc;
1016  index_comp_allocator_impl<req_undirected_edge_compute> *req_undirected_edge_comp_alloc;
1017  index_comp_allocator_impl<req_directed_edge_compute> *req_directed_edge_comp_alloc;
1018 
1019  general_index_comp_allocator_impl<genrq_vertex_compute> *genrq_vertex_comp_alloc;
1020  general_index_comp_allocator_impl<genrq_edge_compute> *genrq_edge_comp_alloc;
1021  general_index_comp_allocator_impl<genrq_directed_edge_compute> *genrq_directed_edge_comp_alloc;
1022 
1023  index_comp_allocator_impl<single_vertex_compute> *single_vertex_comp_alloc;
1024  index_comp_allocator_impl<single_edge_compute> *single_edge_comp_alloc;
1025  index_comp_allocator_impl<single_directed_edge_compute> *single_directed_edge_comp_alloc;
1026 
1027  index_comp_allocator_impl<dense_self_vertex_compute> *dense_self_req_alloc;
1028  general_index_comp_allocator_impl<sparse_self_vertex_compute> *sparse_self_req_alloc;
1029 
1030  typedef std::pair<vertex_id_t, vertex_compute *> id_compute_t;
1031  typedef std::pair<directed_vertex_request, directed_vertex_compute *> directed_compute_t;
1032  std::vector<id_compute_t> vertex_comps;
1033  std::vector<directed_compute_t> part_vertex_comps[edge_type::NUM_TYPES];
1034  std::vector<id_compute_t> edge_comps;
1035  std::vector<id_compute_t> directed_edge_comps;
1036 
1037  // These two vectors optimize the case where vertices request their own adjacency lists.
1038  std::vector<id_range_t> self_undirected_reqs;
1039  std::vector<id_range_t> self_part_reqs[edge_type::NUM_TYPES];
1040 
1041  struct id_compute_lesseq
1042  {
1043  bool operator()(const id_compute_t &comp1, const id_compute_t &comp2) {
1044  return comp1.first <= comp2.first;
1045  }
1046  };
1047 
1048  struct id_compute_less
1049  {
1050  bool operator()(const id_compute_t &comp1, const id_compute_t &comp2) {
1051  return comp1.first < comp2.first;
1052  }
1053  };
1054 
1055  struct directed_compute_lesseq
1056  {
1057  bool operator()(const directed_compute_t &comp1, const directed_compute_t &comp2) {
1058  return comp1.first.get_id() <= comp2.first.get_id();
1059  }
1060  };
1061 
1062  struct directed_compute_less
1063  {
1064  bool operator()(const directed_compute_t &comp1, const directed_compute_t &comp2) {
1065  return comp1.first.get_id() < comp2.first.get_id();
1066  }
1067  };
1068 
1069  bool in_mem;
1070  vertex_index_reader::ptr index_reader;
1071 
1072  void flush_computes();
1073 
1074  static int get_index_entry_size_log(bool directed) {
1075  double size_log = log2(get_index_entry_size(
1076  directed ? graph_type::DIRECTED : graph_type::UNDIRECTED));
1077  assert(size_log == floor(size_log));
1078  return size_log;
1079  }
1080 
1081  void init(worker_thread *t, bool directed);
1082 
1083  simple_index_reader(const in_mem_query_vertex_index::ptr index,
1084  bool directed, worker_thread *t) {
1085  in_mem = true;
1086  init(t, directed);
1087  index_reader = vertex_index_reader::create(index, directed);
1088  }
1089 
1090  simple_index_reader(std::shared_ptr<safs::io_interface> io, bool directed,
1091  worker_thread *t) {
1092  in_mem = false;
1093  init(t, directed);
1094  index_reader = vertex_index_reader::create(io, directed);
1095  }
1096 
1097  /*
1098  * Process the self requests (vertices requesting their own adjacency lists).
1099  * In most applications the adjacency list requests are of this type, and we
1100  * can optimize them differently because we can easily merge them without
1101  * paying extra overhead.
1102  */
1103  void process_self_requests(std::vector<id_range_t> &reqs, edge_type type);
1104 
1105  /*
1106  * This gets the number of partitions in the vector. Each partition
1107  * contains vertices that are adjacent to each other; there are no gaps
1108  * or repeated vertices within a partition.
1109  */
1110  template<class EntryType, class GetId>
1111  static int get_num_parts(const std::vector<EntryType> &vec)
1112  {
1113  GetId get_id;
1114  if (get_id(vec.back().first) - get_id(vec.front().first) + 1
1115  == vec.size())
1116  return 1;
1117 
1118  vertex_id_t id = get_id(vec[0].first);
1119  int num_parts = 1;
1120  for (size_t i = 1; i < vec.size(); i++) {
1121  if (id + 1 == get_id(vec[i].first))
1122  id++;
1123  else {
1124  num_parts++;
1125  id = get_id(vec[i].first);
1126  }
1127  }
1128  return num_parts;
1129  }
1130 
1131  static bool is_dense(const std::vector<id_compute_t> &vec)
1132  {
1133  if (vec.size() <= 1)
1134  return true;
1135  struct get_id {
1136  vertex_id_t operator()(vertex_id_t id) {
1137  return id;
1138  }
1139  };
1140  int num_parts = get_num_parts<id_compute_t, get_id>(vec);
1141  if (num_parts == 1)
1142  return true;
1143  return vec.size() / num_parts >= 16;
1144  }
1145 
1146  static bool is_dense(const std::vector<directed_compute_t> &vec)
1147  {
1148  if (vec.size() <= 1)
1149  return true;
1150  struct get_id {
1151  vertex_id_t operator()(const directed_vertex_request &req) {
1152  return req.get_id();
1153  }
1154  };
1155  int num_parts = get_num_parts<directed_compute_t, get_id>(vec);
1156  if (num_parts == 1)
1157  return true;
1158  return vec.size() / num_parts >= 16;
1159  }
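
 /*
 * Worked example (not from the original header): for requested vertex IDs
 * {3, 4, 5, 9, 10}, get_num_parts() returns 2 (partitions {3,4,5} and
 * {9,10}), and is_dense() returns false because 5 / 2 = 2 < 16, so these
 * requests are routed to the general (sparse) index computes.
 */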
1160 
1161  static void request_vertex(std::vector<id_range_t> &reqs, vertex_id_t id) {
1162  if (reqs.empty())
1163  reqs.push_back(id_range_t(id, id + 1));
1164  else if (reqs.back().second == id)
1165  reqs.back().second = id + 1;
1166  else
1167  reqs.push_back(id_range_t(id, id + 1));
1168  }
1169 
1170 public:
1171  typedef std::shared_ptr<simple_index_reader> ptr;
1172 
1173  static ptr create(std::shared_ptr<safs::io_interface> io, bool directed,
1174  worker_thread *t) {
1175  return ptr(new simple_index_reader(io, directed, t));
1176  }
1177 
1178  static ptr create(const in_mem_query_vertex_index::ptr index, bool directed,
1179  worker_thread *t) {
1180  return ptr(new simple_index_reader(index, directed, t));
1181  }
1182 
1183  ~simple_index_reader() {
1184  assert(get_num_pending_tasks() == 0);
1185  delete req_vertex_comp_alloc;
1186  delete req_undirected_edge_comp_alloc;
1187  delete req_directed_edge_comp_alloc;
1188 
1189  delete genrq_vertex_comp_alloc;
1190  delete genrq_edge_comp_alloc;
1191  delete genrq_directed_edge_comp_alloc;
1192 
1193  delete single_vertex_comp_alloc;
1194  delete single_edge_comp_alloc;
1195  delete single_directed_edge_comp_alloc;
1196 
1197  delete dense_self_req_alloc;
1198  delete sparse_self_req_alloc;
1199  }
1200 
1201  /*
1202  * These two methods issue general vertex requests.
1203  */
1204 
1205  /*
1206  * A general way of requesting undirected vertices for a vertex.
1207  */
1208  void request_vertices(vertex_id_t ids[], int num, vertex_compute &compute) {
1209  for (int i = 0; i < num; i++)
1210  vertex_comps.push_back(id_compute_t(ids[i], &compute));
1211  }
1212 
1213  /*
1214  * A general way of requesting directed vertices for a vertex.
1215  */
1216  void request_vertices(const directed_vertex_request reqs[], int num,
1217  directed_vertex_compute &compute) {
1218  for (int i = 0; i < num; i++)
1219  part_vertex_comps[reqs[i].get_type()].push_back(
1220  directed_compute_t(reqs[i], &compute));
1221  }
1222 
1223  /*
1224  * These two methods request the adjacency list for the vertex itself.
1225  */
1226 
1227  /*
1228  * Request an undirected vertex for the vertex of `id'.
1229  */
1230  void request_vertex(vertex_id_t id) {
1231  request_vertex(self_undirected_reqs, id);
1232  }
1233 
1234  /*
1235  * Request a directed vertex for the vertex of `id'.
1236  */
1237  void request_vertex(const directed_vertex_request req) {
1238  request_vertex(self_part_reqs[req.get_type()], req.get_id());
1239  }
1240 
1241  void request_num_edges(vertex_id_t ids[], int num, vertex_compute &compute) {
1242  // TODO it should only work for undirected vertices.
1243 // ABORT_MSG("request_num_edges isn't supported currently");
1244  for (int i = 0; i < num; i++)
1245  edge_comps.push_back(id_compute_t(ids[i], &compute));
1246  }
1247 
1248  void request_num_directed_edges(vertex_id_t ids[], int num,
1249  directed_vertex_compute &compute) {
1250  for (int i = 0; i < num; i++)
1251  directed_edge_comps.push_back(id_compute_t(ids[i], &compute));
1252  }
1253 
1254  void wait4complete(int num) {
1255  flush_computes();
1256  index_reader->wait4complete(num);
1257  }
1258 
1259  size_t get_num_pending_tasks() const {
1260  return index_reader->get_num_pending_tasks();
1261  }
1262 };
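
/*
 * Illustrative sketch (not part of the original header): how a worker thread
 * might drive simple_index_reader. `io', `t', `compute' (a vertex_compute
 * reference) and the vertex IDs are placeholders for illustration.
 *
 *   simple_index_reader::ptr reader
 *       = simple_index_reader::create(io, true, t);  // true: directed graph
 *   vertex_id_t ids[] = {1, 2, 3};
 *   reader->request_vertices(ids, 3, compute);  // buffered until flushed
 *   reader->request_vertex(100);        // vertex 100 requests its own list
 *   reader->wait4complete(1);   // flush_computes(), then wait on index I/O
 */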
1263 
1264 }
1265 
1266 #endif