FlashGraph-ng
A new frontier in large-scale graph analysis and data mining
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Pages
graph_engine.h
1 #ifndef __GRAPH_ENGINE_H__
2 #define __GRAPH_ENGINE_H__
3 
4 /*
5  * Copyright 2014 Open Connectome Project (http://openconnecto.me)
6  * Written by Da Zheng (zhengda1936@gmail.com)
7  *
8  * This file is part of FlashGraph.
9  *
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  * http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  */
22 
23 #include <atomic>
24 
25 #include "vertex.h"
26 #include "vertex_index.h"
27 #include "trace_logger.h"
28 #include "messaging.h"
29 #include "partitioner.h"
30 #include "graph_index.h"
31 #include "graph_config.h"
32 #include "vertex_request.h"
33 #include "vertex_program.h"
34 
35 namespace safs
36 {
37  class file_io_factory;
38 }
39 
40 namespace fg
41 {
42 
43 class graph_engine;
44 class vertex_request;
45 
52 {
53 public:
/*
 * Constructor; per the generated documentation it is the one graph_engine
 * calls to create vertex state. The body is empty, so `id` is not stored
 * by this constructor.
 */
61  compute_vertex(vertex_id_t id) {
62  }
63 
71  void request_vertices(vertex_id_t ids[], size_t num);
72 
80  void request_vertex_headers(vertex_id_t ids[], size_t num);
81 
87  }
88 
96  const vertex_header &header) {
97  ABORT_MSG("run_on_vertex_header isn't implemented");
98  }
99 
/*
 * Invoked when the vertex receives a message (per the generated docs).
 * This default body only calls ABORT_MSG with a "not implemented" message.
 * NOTE(review): presumably user vertex classes are expected to provide
 * their own version when they use messaging - confirm.
 */
105  void run_on_message(vertex_program &vprog, const vertex_message &msg) {
106  ABORT_MSG("run_on_message isn't implemented");
107  }
108 };
109 
/*
 * A compute_vertex that additionally records the vertex ID and a partition
 * ID, both supplied at construction.
 * NOTE(review): part_id presumably identifies one part of a partitioned
 * vertex - confirm against the partitioning code.
 */
110 class part_compute_vertex: public compute_vertex
111 {
 // Copy of the vertex ID kept in this class (the base constructor is also
 // given the ID).
112  vertex_id_t id;
 // The partition ID passed to the constructor.
113  int part_id;
114 public:
 /*
  * Forwards `id` to the compute_vertex constructor and stores both
  * arguments in this object.
  */
115  part_compute_vertex(vertex_id_t id, int part_id): compute_vertex(id) {
116  this->id = id;
117  this->part_id = part_id;
118  }
119 
 // Returns the partition ID given at construction.
120  int get_part_id() const {
121  return part_id;
122  }
123 
 // Returns the vertex ID given at construction.
129  vertex_id_t get_id() const {
130  return id;
131  }
132 
 // Declared here, defined elsewhere: request `num` vertices listed in `ids`.
133  void request_vertices(vertex_id_t ids[], size_t num);
134 
 // Declared here, defined elsewhere.
 // NOTE(review): presumably sends `msg` to the other parts of the same
 // vertex - confirm.
135  void broadcast_vpart(const vertex_message &msg);
136 
 // Default message handler: only calls ABORT_MSG with a
 // "not implemented" message.
137  void run_on_message(vertex_program &vprog, const vertex_message &msg) {
138  ABORT_MSG("run_on_message isn't implemented");
139  }
140 };
141 
142 /*
143  * This empty compute_vertex is to disable partitioning vertices by default.
144  */
/*
 * Placeholder part_compute_vertex: every handler below only calls ABORT_MSG
 * with a "not implemented" message. It is the default `part_vertex_type`
 * argument of the NUMA_graph_index template declared after this class.
 */
145 class empty_part_compute_vertex: public part_compute_vertex
146 {
147 public:
 // Passes both arguments straight to the part_compute_vertex constructor.
148  empty_part_compute_vertex(vertex_id_t id,
149  int part_id): part_compute_vertex(id, part_id) {
150  }
151 
 // Not implemented: calls ABORT_MSG.
152  void run(vertex_program &) {
153  ABORT_MSG("run isn't implemented");
154  }
155 
 // Not implemented: calls ABORT_MSG.
156  void run(vertex_program &, const page_vertex &vertex) {
157  ABORT_MSG("run isn't implemented");
158  }
159 
 // Not implemented: calls ABORT_MSG.
160  void run_on_message(vertex_program &, const vertex_message &msg) {
161  ABORT_MSG("run_on_message isn't implemented");
162  }
163 
 // Not implemented: calls ABORT_MSG.
164  void run_on_vertex_header(vertex_program &prog, const vertex_header &header) {
165  ABORT_MSG("run_on_vertex_header isn't implemented");
166  }
167 };
168 
169 template<class vertex_type, class part_vertex_type = empty_part_compute_vertex>
170 class NUMA_graph_index;
171 
178 {
179 public:
184  }
185 
196  void request_partial_vertices(directed_vertex_request reqs[], size_t num);
197 };
198 
/*
 * A compute_directed_vertex that additionally records the vertex ID and a
 * partition ID, both supplied at construction. It mirrors
 * part_compute_vertex for the directed vertex type.
 */
199 class part_compute_directed_vertex: public compute_directed_vertex
200 {
 // Copy of the vertex ID kept in this class (the base constructor is also
 // given the ID).
201  vertex_id_t id;
 // The partition ID passed to the constructor.
202  int part_id;
203 public:
 /*
  * Forwards `id` to the compute_directed_vertex constructor and stores
  * both arguments in this object.
  */
207  part_compute_directed_vertex(vertex_id_t id,
208  int part_id): compute_directed_vertex(id) {
209  this->id = id;
210  this->part_id = part_id;
211  }
212 
 // Returns the partition ID given at construction.
213  int get_part_id() const {
214  return part_id;
215  }
216 
 // Returns the vertex ID given at construction.
222  vertex_id_t get_id() const {
223  return id;
224  }
225 
 // Declared here, defined elsewhere: request whole vertices, or the
 // partial vertices described by `reqs`.
226  void request_vertices(vertex_id_t ids[], size_t num);
227  void request_partial_vertices(directed_vertex_request reqs[], size_t num);
228 
 // Default message handler: only calls ABORT_MSG with a
 // "not implemented" message.
229  void run_on_message(vertex_program &vprog, const vertex_message &msg) {
230  ABORT_MSG("run_on_message isn't implemented");
231  }
232 };
233 
238 {
239 public:
240  typedef std::shared_ptr<vertex_scheduler> ptr;
249  virtual void schedule(vertex_program &prog,
250  std::vector<compute_vertex_pointer> &vertices) = 0;
251 };
252 
258 {
259 public:
260 
268  virtual bool keep(vertex_program &, compute_vertex &v) = 0;
269 };
270 
276 {
277 public:
278  typedef std::shared_ptr<vertex_initializer> ptr;
283  virtual void init(compute_vertex &) = 0;
284 };
285 
286 
287 class graph_engine;
288 
296 {
297 public:
298  typedef std::shared_ptr<vertex_query> ptr;
304  virtual void run(graph_engine &, compute_vertex &v) = 0;
305 
313  virtual void merge(graph_engine &graph, vertex_query::ptr q) = 0;
314 
320  virtual ptr clone() = 0;
321 };
322 
323 class worker_thread;
324 class in_mem_graph;
325 class FG_graph;
326 
332 {
333  static std::atomic<long> init_count;
334 
335  graph_header header;
336  // The location of the out-part of the graph. It's valid only
337  // in a directed graph.
338  off_t out_part_off;
339 
340  graph_index::ptr vertices;
341  in_mem_query_vertex_index::ptr vindex;
342  std::shared_ptr<in_mem_graph> graph_data;
343  vertex_scheduler::ptr scheduler;
344 
345  // The number of activated vertices that haven't been processed
346  // in the current level.
347  atomic_number<size_t> num_remaining_vertices_in_level;
348  atomic_integer level;
349  volatile bool is_complete;
350 
351  // These are used for switching queues.
352  pthread_mutex_t lock;
353  pthread_barrier_t barrier1;
354  pthread_barrier_t barrier2;
355 
356  int num_nodes;
357  std::vector<worker_thread *> worker_threads;
358  std::vector<vertex_program::ptr> vprograms;
359 
360  trace_logger::ptr logger;
361  std::shared_ptr<safs::file_io_factory> graph_factory;
362  int max_processing_vertices;
363 
364  // The time when the current iteration starts.
365  struct timeval start_time, iter_start;
366 
367  void init_threads(vertex_program_creater::ptr creater);
368 protected:
369  graph_engine(FG_graph &graph, graph_index::ptr index);
370  void init(graph_index::ptr index);
371 public:
372  typedef std::shared_ptr<graph_engine> ptr;
374  static void init_flash_graph(config_map::ptr configs);
375  static void destroy_flash_graph();
376 
/*
 * Static factory: constructs a graph_engine with `new` through the
 * protected constructor and returns it wrapped in graph_engine::ptr
 * (a std::shared_ptr<graph_engine>).
 */
382  static graph_engine::ptr create(FG_graph &graph, graph_index::ptr index) {
383  return graph_engine::ptr(new graph_engine(graph, index));
384  }
385 
389  ~graph_engine();
390 
391  /*
392  * The following four variants of get_vertex return the same compute_vertex
393  * but with slightly lower overhead.
394  * These can only be used in a shared machine.
395  */
396 
/*
 * Returns a reference to the compute_vertex with the given global ID by
 * forwarding to the graph index (`vertices`).
 */
403  compute_vertex &get_vertex(vertex_id_t id) {
404  return vertices->get_vertex(id);
405  }
406 
/*
 * Same lookup addressed by partition ID plus a partition-local vertex ID;
 * forwards to the graph index (`vertices`).
 */
408  compute_vertex &get_vertex(int part_id, local_vid_t id) {
409  return vertices->get_vertex(part_id, id);
410  }
411 
/*
 * Bulk lookup: forwards `num` global IDs to the graph index, which fills
 * `v_buf` with compute_vertex pointers. The index's return value is passed
 * through unchanged.
 * NOTE(review): presumably the return value is the number of entries
 * written to v_buf - confirm in graph_index.
 */
417  size_t get_vertices(const vertex_id_t ids[], int num, compute_vertex *v_buf[]) {
418  return vertices->get_vertices(ids, num, v_buf);
419  }
420 
/*
 * Bulk lookup within one partition: forwards `num` partition-local IDs to
 * the graph index, which fills `v_buf`. The index's return value is passed
 * through unchanged.
 */
422  size_t get_vertices(int part_id, const local_vid_t ids[], int num,
423  compute_vertex *v_buf[]) {
424  return vertices->get_vertices(part_id, ids, num, v_buf);
425  }
426 
/* Returns the maximum vertex ID in the graph, as reported by the graph index. */
431  vertex_id_t get_max_vertex_id() const {
432  return vertices->get_max_vertex_id();
433  }
434 
/* Returns the minimum vertex ID in the graph, as reported by the graph index. */
439  vertex_id_t get_min_vertex_id() const {
440  return vertices->get_min_vertex_id();
441  }
442 
443 
/* Returns the number of vertices in the graph, as reported by the graph index. */
448  size_t get_num_vertices() const {
449  return vertices->get_num_vertices();
450  }
451 
/* Reports whether the graph is directed, according to the graph header. */
456  bool is_directed() const {
457  return header.is_directed_graph();
458  }
459 
465  return header;
466  }
467 
472  void set_vertex_scheduler(vertex_scheduler::ptr scheduler);
473 
482  void start(std::shared_ptr<vertex_filter> filter,
483  vertex_program_creater::ptr creater = vertex_program_creater::ptr());
484 
494  void start(const vertex_id_t ids[], int num,
495  vertex_initializer::ptr init = vertex_initializer::ptr(),
496  vertex_program_creater::ptr creater = vertex_program_creater::ptr());
497 
506  void start_all(vertex_initializer::ptr init = vertex_initializer::ptr(),
507  vertex_program_creater::ptr creater = vertex_program_creater::ptr());
508 
513  void wait4complete();
514 
515 #if 0
516 
522  void preload_graph();
523 #endif
524 
531  void init_vertices(vertex_id_t ids[], int num, vertex_initializer::ptr init);
532 
537  void init_all_vertices(vertex_initializer::ptr init);
538 
543  void query_on_all(vertex_query::ptr query);
544 
/*
 * Output-parameter getter: assigns the engine's vector of vertex programs
 * to `programs`, replacing whatever it held before. The generated docs
 * describe these as the per-thread vertex programs.
 */
549  void get_vertex_programs(std::vector<vertex_program::ptr> &programs) {
550  programs = vprograms;
551  }
552 
/*
 * Returns the current value of the `level` counter, which the generated
 * docs describe as the current iteration number of the engine.
 */
557  int get_curr_level() const {
558  return level.get();
559  }
560 
/*
 * Returns the number of edges of vertex `id` by forwarding to the in-memory
 * vertex index. `type` selects which edges are counted and defaults to
 * edge_type::BOTH_EDGES.
 */
561  vsize_t get_num_edges(vertex_id_t id,
562  edge_type type = edge_type::BOTH_EDGES) const {
563  return vindex->get_num_edges(id, type);
564  }
565 
575  bool progress_next_level();
576  bool progress_first_level();
577 
/* Returns the engine's trace_logger handle by value. */
579  trace_logger::ptr get_logger() const {
580  return logger;
581  }
582 
587  int get_file_id() const;
588 
/*
 * Returns the address of the partitioner exposed by the graph index.
 * NOTE(review): the pointer presumably stays valid only as long as the
 * graph index does - confirm.
 */
590  const graph_partitioner *get_partitioner() const {
591  return &vertices->get_partitioner();
592  }
593 
/*
 * Returns the number of worker threads, i.e. the size of `worker_threads`
 * converted to int.
 */
595  int get_num_threads() const {
596  return worker_threads.size();
597  }
598 
/*
 * Returns the worker thread stored at position `idx`. The lookup uses
 * std::vector::operator[], so `idx` is not range-checked here.
 */
600  worker_thread *get_thread(int idx) const {
601  return worker_threads[idx];
602  }
603 
604  /*
605  * The following two methods keep track of the number of active vertices
606  * globally in the current iteration.
607  */
608 
/*
 * Records that `num` activated vertices of the current level have been
 * processed by subtracting `num` from num_remaining_vertices_in_level.
 * The assert checks that no more vertices are subtracted than remain;
 * because of the (size_t) cast, a negative `num` becomes a very large
 * value and also trips the assert when assertions are enabled.
 */
613  void process_vertices(int num) {
614  assert(num_remaining_vertices_in_level.get() >= (size_t) num);
615  num_remaining_vertices_in_level.dec(num);
616  }
617 
/*
 * Returns the number of activated vertices that have not yet been
 * processed in the current level.
 */
622  size_t get_num_remaining_vertices() const {
623  return num_remaining_vertices_in_level.get();
624  }
625 
/* Returns the engine's in-memory query vertex index handle (`vindex`) by value. */
626  const in_mem_query_vertex_index::ptr get_in_mem_index() const {
627  return vindex;
628  }
629 
/*
 * Stores `max` in max_processing_vertices without validating it.
 * NOTE(review): presumably this caps how many vertices are processed
 * concurrently - confirm against the worker-thread code.
 */
630  void set_max_processing_vertices(int max) {
631  max_processing_vertices = max;
632  }
633 
/* Returns the value last stored in max_processing_vertices. */
634  int get_max_processing_vertices() const {
635  return max_processing_vertices;
636  }
637 
/*
 * Returns a const reference to the graph index by dereferencing the
 * `vertices` pointer, which is not checked for null here.
 */
638  const graph_index &get_graph_index() const {
639  return *vertices;
640  }
641 
/*
 * Returns out_part_off (the location of the out-part of the graph)
 * converted to size_t. Per the comment on that member, the value is valid
 * only for a directed graph.
 * NOTE(review): this presumably equals the in-part size because the
 * in-part precedes the out-part in the graph image - confirm.
 */
642  size_t get_in_part_size() const {
643  return out_part_off;
644  }
645 
/*
 * Converts a vertex size into an edge count by delegating to
 * ext_mem_undirected_vertex::vsize2num_edges, passing the edge-data size
 * taken from the graph header.
 */
646  vsize_t cal_num_edges(vsize_t vertex_size) const {
647  return ext_mem_undirected_vertex::vsize2num_edges(vertex_size,
648  header.get_edge_data_size());
649  }
650 };
651 
652 }
653 
654 #endif
void request_partial_vertices(directed_vertex_request reqs[], size_t num)
This allows a vertex to request partial vertices in the graph. Defn: "partial vertices" – Par...
void run_on_vertex_header(vertex_program &prog, const vertex_header &header)
This method is invoked by calling the request_vertex_headers method and is where one would access the...
Definition: graph_engine.h:95
virtual void run(graph_engine &, compute_vertex &v)=0
This method is executed on vertices in parallel and contains any user defined code.
void init_vertices(vertex_id_t ids[], int num, vertex_initializer::ptr init)
Allows users to initialize vertices to certain state.
When the graph engine starts, a user can use this filter to decide what vertices are activated for th...
Definition: graph_engine.h:257
virtual bool keep(vertex_program &, compute_vertex &v)=0
The method defines which vertices are active in the first iteration. If this method returns true the ...
unsigned int vsize_t
Basic data types used in FlashGraph.
Definition: FG_basic_types.h:33
virtual void schedule(vertex_program &prog, std::vector< compute_vertex_pointer > &vertices)=0
Implement this method in order to customize the vertex schedule.
void get_vertex_programs(std::vector< vertex_program::ptr > &programs)
Return the per-thread vertex programs used by the graph engine.
Definition: graph_engine.h:549
static void init_flash_graph(config_map::ptr configs)
void notify_iteration_end(vertex_program &prog)
Allows a vertex to perform a task at the end of every iteration.
Definition: graph_engine.h:86
bool progress_next_level()
const graph_header & get_graph_header() const
Get the graph header info.
Definition: graph_engine.h:464
bool is_directed() const
Tell a user if the graph is directed or not.
Definition: graph_engine.h:456
This is the class that coordinates how & where algorithms are run. It can be seen as the central orga...
Definition: graph_engine.h:331
Definition: vertex_program.h:45
compute_vertex(vertex_id_t id)
The constructor called by graph_engine to create vertex state.
Definition: graph_engine.h:61
~graph_engine()
Class destructor.
Definition: messaging.h:292
void set_vertex_scheduler(vertex_scheduler::ptr scheduler)
Set the graph computation to use a custom vertex scheduler.
edge_type
Edge type of an edge in the graph.
Definition: vertex.h:43
vertex_id_t get_min_vertex_id() const
Get the minimum vertex ID in the graph.
Definition: graph_engine.h:439
size_t get_num_vertices() const
Get the number of vertices in the graph.
Definition: graph_engine.h:448
Definition: partitioner.h:36
Class from which users' vertex-centric programs should inherit. Serial code written when implementing...
Definition: graph_engine.h:51
virtual void merge(graph_engine &graph, vertex_query::ptr q)=0
All vertex results may be merged (not specially combined, but by any custom operation). This can, for instance, be used to aggregate (add, subtract, max, etc.) a user-defined data member of the class.
Parallelized query of the vertex state of all vertices in the graph. Each worker thread gets an instanc...
Definition: graph_engine.h:295
void request_vertices(vertex_id_t ids[], size_t num)
This allows a vertex to request the adjacency lists of vertices in the graph.
void request_vertex_headers(vertex_id_t ids[], size_t num)
This requests the vertex headers. It mainly contains the number of edges (the number of in-edges and ...
virtual void init(compute_vertex &)=0
Initialization method to initialize the given vertex.
int get_curr_level() const
This returns the current iteration number in the graph engine.
Definition: graph_engine.h:557
vertex_id_t get_max_vertex_id() const
Get the maximum vertex ID in the graph.
Definition: graph_engine.h:431
Definition: graph_file_header.h:58
A user may decide to initialize individual vertex state in a custom way not expressible via the ve...
Definition: graph_engine.h:275
void run_on_message(vertex_program &vprog, const vertex_message &msg)
This method is invoked when the vertex receives a message.
Definition: graph_engine.h:105
size_t get_vertices(const vertex_id_t ids[], int num, compute_vertex *v_buf[])
Permits any vertex to pull any set of the vertex state of other vertices. NOTE: This can only be use...
Definition: graph_engine.h:417
void wait4complete()
Synchronization barrier that waits for the graph algorithm to complete.
void init_all_vertices(vertex_initializer::ptr init)
Allows users to initialize all vertices to certain state.
compute_directed_vertex(vertex_id_t id)
The constructor called by the graph engine.
Definition: graph_engine.h:183
compute_vertex & get_vertex(vertex_id_t id)
Permits any vertex to pull any other vertex's state. NOTE: This can only be used in a shared machine...
Definition: graph_engine.h:403
Definition: graph_engine.h:237
A user-friendly wrapper for FlashGraph's raw graph type. Very useful when utilizing FlashGraph ...
Definition: FGlib.h:42
void query_on_all(vertex_query::ptr query)
Allows users to query the information on the state of all vertices.
void start_all(vertex_initializer::ptr init=vertex_initializer::ptr(), vertex_program_creater::ptr creater=vertex_program_creater::ptr())
Start the graph engine and begin computation on all vertices.
virtual ptr clone()=0
Implements a copy constructor. The graph engine uses this method to create an instance of this query ...
A directed version of the compute_vertex class that users inherit from when using the FlashGraph engi...
Definition: graph_engine.h:177
void start(std::shared_ptr< vertex_filter > filter, vertex_program_creater::ptr creater=vertex_program_creater::ptr())
Start the graph engine and begin computation on a subset of vertices.
static graph_engine::ptr create(FG_graph &graph, graph_index::ptr index)
Constructor usable by inheriting classes.
Definition: graph_engine.h:382