FlashGraph-ng
A new frontier in large-scale graph analysis and data mining
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Pages
vertex_program.h
1 #ifndef __VERTEX_PROGRAM_H__
2 #define __VERTEX_PROGRAM_H__
3 
4 /*
5  * Copyright 2014 Open Connectome Project (http://openconnecto.me)
6  * Written by Da Zheng (zhengda1936@gmail.com)
7  *
8  * This file is part of FlashGraph.
9  *
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  * http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  */
22 
23 #include <memory>
24 
25 #include "container.h"
26 #include "vertex.h"
27 #include "messaging.h"
28 #include "vertex_pointer.h"
29 
30 namespace fg
31 {
32 
33 class graph_engine;
34 class compute_vertex;
35 class page_vertex;
36 class vertex_message;
37 class worker_thread;
38 
46 {
47  int part_id;
48  worker_thread *t;
49  graph_engine *graph;
50 
51  std::unique_ptr<std::vector<local_vid_t>[]> vid_bufs;
52  std::unique_ptr<vertex_loc_t[]> vertex_locs;
53  size_t vloc_size;
54 
55  // The message senders to send messages to all other threads.
56  // There are n senders, n is the total number of threads used by
57  // the graph engine.
58  std::vector<simple_msg_sender *> msg_senders;
59  std::vector<simple_msg_sender *> flush_msg_senders;
60  std::vector<multicast_msg_sender *> multicast_senders;
61  std::vector<multicast_msg_sender *> activate_senders;
62 
// Returns the multicast sender stored in activate_senders at index thread_id.
// The index is not range-checked and the stored pointer is dereferenced
// without a null check.
63  multicast_msg_sender &get_activate_sender(int thread_id) const {
64  return *activate_senders[thread_id];
65  }
66 
// Returns the multicast sender stored in multicast_senders at index thread_id.
// The index is not range-checked and the stored pointer is dereferenced
// without a null check.
67  multicast_msg_sender &get_multicast_sender(int thread_id) const {
68  return *multicast_senders[thread_id];
69  }
70 
// Returns the point-to-point sender stored in flush_msg_senders at index
// thread_id. The index is not range-checked and the stored pointer is
// dereferenced without a null check.
71  simple_msg_sender &get_flush_msg_sender(int thread_id) const {
72  return *flush_msg_senders[thread_id];
73  }
74 
// Returns the point-to-point sender stored in msg_senders at index thread_id.
// The index is not range-checked and the stored pointer is dereferenced
// without a null check.
75  simple_msg_sender &get_msg_sender(int thread_id) const {
76  return *msg_senders[thread_id];
77  }
78 public:
79  typedef std::shared_ptr<vertex_program> ptr;
82  part_id = 0;
83  t = NULL;
84  graph = NULL;
85  }
86 
88  virtual ~vertex_program();
89 
96  void init(graph_engine *graph, worker_thread *t);
97 
98  /* Internal */
99  void init_messaging(const std::vector<worker_thread *> &threads,
100  std::shared_ptr<slab_allocator> msg_alloc,
101  std::shared_ptr<slab_allocator> flush_msg_alloc);
102 
// Hook taking no arguments; every concrete vertex program must override it.
// NOTE(review): presumably invoked once when the engine starts -- confirm.
103  virtual void run_on_engine_start() = 0;
104 
// Pre-run on a vertex before users get any information about the adjacency
// list of vertices.
112  virtual void run(compute_vertex &vertex) = 0;
113 
// Run on a compute vertex together with its page_vertex.
// NOTE(review): presumably called once the adjacency list of the vertex has
// been read from disks -- confirm.
120  virtual void run(compute_vertex &comp_v, const page_vertex &vertex) = 0;
121 
// Run user's code when the vertex receives a message from another.
127  virtual void run_on_message(compute_vertex &c_vertex, const vertex_message &vertex_m) = 0;
128 
// Run user's code when vertices receive messages from others; v_msgs holds
// num message pointers.
134  virtual void run_on_messages(const vertex_message *v_msgs[], int num) = 0;
135 
// Run user's code when a multicast message is received.
140  virtual void run_on_multicast_message(multicast_message &mmsg) = 0;
141 
// Run user's code when the vertex header is read from disks.
147  virtual void run_on_num_edges(compute_vertex &c_vertex,
148  const vertex_header &header) = 0;
149 
// Hook taking no arguments; every concrete vertex program must override it.
// NOTE(review): presumably invoked at the end of each iteration -- confirm.
150  virtual void run_on_iteration_end() = 0;
151 
// Perform some user-defined action on a vertex when the current iteration
// comes to an end.
156  virtual void notify_iteration_end(compute_vertex &cv) = 0;
157 
158  /* Internal */
// Returns the worker thread held in member t, dereferenced without a null
// check. The constructor sets t to NULL.
// NOTE(review): presumably init() assigns t before this is called -- confirm.
159  const worker_thread &get_thread() const {
160  return *t;
161  }
162 
168  return *graph;
169  }
170 
// Multicast the same message to several other vertices; ids holds num
// destination vertex ids.
179  void multicast_msg(vertex_id_t ids[], int num, vertex_message &msg);
180 
189 
// Send a point-to-point message from one vertex to another (dest).
195  void send_msg(vertex_id_t dest, vertex_message &msg);
196 
// Activate vertices to be processed in the next level (iteration); ids holds
// num vertex ids.
202  void activate_vertices(vertex_id_t ids[], int num);
203 
209 
// Activate a single vertex to be processed in the next level (iteration).
// Forwards to activate_vertices() with the by-value parameter treated as a
// one-element array.
214  void activate_vertex(vertex_id_t vertex) {
215  activate_vertices(&vertex, 1);
216  }
217 
218  /* Internal */
219  void flush_msgs();
220 
228 
229  vertex_id_t get_vertex_id(compute_vertex_pointer v) const;
230  vertex_id_t get_vertex_id(const compute_vertex &v) const;
231  vsize_t get_num_edges(vertex_id_t id) const;
// Returns the stored part_id member (set to 0 by the constructor).
232  int get_partition_id() const {
233  return part_id;
234  }
235 };
236 
243 {
244 public:
245  typedef std::unique_ptr<vertex_program_creater> ptr;
250  virtual vertex_program::ptr create() const = 0;
251 };
252 
// Free function declared here and defined elsewhere. Callers in this header
// pass num_ids local vertex ids in ids and afterwards read v_buf[0..num_ids)
// as compute_vertex pointers; they ignore the size_t return value.
// NOTE(review): presumably v_buf[i] receives the vertex for ids[i] and the
// return value is a count -- confirm against the definition.
253 size_t graph_get_vertices(graph_engine &graph, const worker_thread &,
254  const local_vid_t ids[], int num_ids, compute_vertex *v_buf[]);
255 
259 template<class vertex_type>
261 {
262  embedded_array<compute_vertex *, 1024> vertex_buf;
263  embedded_array<local_vid_t, 1024> id_buf;
264 public:
265 
// Default implementation: intentionally does nothing.
266  virtual void run_on_engine_start() {
267  }
268 
// Pre-run before users get any information about the adjacency list of
// vertices. Forwards to vertex_type::run(*this).
// The C-style cast performs no runtime check that comp_v really is a
// vertex_type.
276  virtual void run(compute_vertex &comp_v) {
277  ((vertex_type &) comp_v).run(*this);
278  }
279 
// Run user's code, ideally/generally when the adjacency list of the vertex
// is read from disks. Forwards to vertex_type::run(*this, vertex).
// The C-style cast performs no runtime check that comp_v really is a
// vertex_type.
286  virtual void run(compute_vertex &comp_v, const page_vertex &vertex) {
287  ((vertex_type &) comp_v).run(*this, vertex);
288  }
289 
// Run user's code when the vertex receives a message from another.
// Forwards to vertex_type::run_on_message(*this, msg); the C-style cast
// performs no runtime check that comp_v really is a vertex_type.
295  virtual void run_on_message(compute_vertex &comp_v,
296  const vertex_message &msg) {
297  ((vertex_type &) comp_v).run_on_message(*this, msg);
298  }
299 
// Run user's code when vertices receive messages from others.
// v_msgs holds num message pointers; each message is delivered to the vertex
// looked up for its destination id.
305  virtual void run_on_messages(const vertex_message *v_msgs[],
306  int num) {
// Size the reusable member buffers to hold one entry per message.
307  vertex_buf.resize(num);
308  id_buf.resize(num);
// Collect the destination id of every message.
309  for (int i = 0; i < num; i++)
310  id_buf[i] = v_msgs[i]->get_dest();
// Look up the vertices for the collected ids; the size_t return value is
// ignored. NOTE(review): presumably fills vertex_buf[i] for id_buf[i] -- confirm.
311  graph_get_vertices(get_graph(), get_thread(), id_buf.data(), num,
312  vertex_buf.data());
313  for (int i = 0; i < num; i++) {
// Multicast messages are not expected on this path. Note that get_dest() has
// already been called on the message above, before this check runs.
314  assert(!v_msgs[i]->is_multicast());
// C-style cast: no runtime check that the vertex really is a vertex_type.
315  vertex_type *v = (vertex_type *) vertex_buf[i];
316  v->run_on_message(*this, *v_msgs[i]);
317  }
318  }
319 
// Run user's code when a multicast message is received.
// The same message object is handed to vertex_type::run_on_message() once
// per destination in the message's destination list.
324  virtual void run_on_multicast_message(multicast_message &mmsg) {
325  int num_dests = mmsg.get_num_dests();
326  multicast_dest_list dest_list = mmsg.get_dest_list();
327 
// Size the reusable member buffers to hold one entry per destination.
328  vertex_buf.resize(num_dests);
329  id_buf.resize(num_dests);
// Collect every destination id from the list.
330  for (int i = 0; i < num_dests; i++)
331  id_buf[i] = dest_list.get_dest(i);
// Look up the vertices for the collected ids; the size_t return value is
// ignored. NOTE(review): presumably fills vertex_buf[i] for id_buf[i] -- confirm.
332  graph_get_vertices(get_graph(), get_thread(), id_buf.data(), num_dests,
333  vertex_buf.data());
334 
335  for (int i = 0; i < num_dests; i++) {
// C-style cast: no runtime check that the vertex really is a vertex_type.
336  vertex_type *v = (vertex_type *) vertex_buf[i];
// Delivered through the same run_on_message() handler used for
// point-to-point messages.
337  v->run_on_message(*this, mmsg);
338  }
339  }
340 
// Run user's code when the vertex header is read from disks.
// Forwards to vertex_type::run_on_vertex_header(*this, header); note that the
// vertex-side callback has a different name than this hook. The C-style cast
// performs no runtime check that c_vertex really is a vertex_type.
341  virtual void run_on_num_edges(compute_vertex &c_vertex,
342  const vertex_header &header) {
343  ((vertex_type &) c_vertex).run_on_vertex_header(*this, header);
344  }
345 
// Default implementation: intentionally does nothing.
346  virtual void run_on_iteration_end() {
347  }
348 
// Forwards the end-of-iteration notification to
// vertex_type::notify_iteration_end(*this). The C-style cast performs no
// runtime check that comp_v really is a vertex_type.
355  virtual void notify_iteration_end(compute_vertex &comp_v) {
356  ((vertex_type &) comp_v).notify_iteration_end(*this);
357  }
358 };
359 
360 }
361 
362 #endif
unsigned int vsize_t
Basic data types used in FlashGraph.
Definition: FG_basic_types.h:33
virtual void run_on_multicast_message(multicast_message &mmsg)=0
Run user's code when a multicast message is received.
virtual vertex_program::ptr create() const =0
Acts much like a constructor; implement this in lieu of one.
graph_engine & get_graph()
Get a reference to the graph_engine.
Definition: vertex_program.h:167
void request_notify_iter_end(const compute_vertex &v)
A vertex requests the end of an iteration. `notify_iteration_end' of the vertex will be invoked at th...
This is the class that coordinates how & where algorithms are run. It can be seen as the central orga...
Definition: graph_engine.h:331
Definition: vertex_program.h:45
virtual void notify_iteration_end(compute_vertex &comp_v)
A vertex requests the end of an iteration. `notify_iteration_end' of the vertex will be invoked at th...
Definition: vertex_program.h:355
Definition: messaging.h:292
The default implementation of a vertex program in the graph engine.
Definition: vertex_program.h:260
virtual void run_on_message(compute_vertex &c_vertex, const vertex_message &vertex_m)=0
Run user's code when the vertex receives messages from another.
Definition: partitioner.h:36
Class from which users' vertex-centric programs should inherit. Serial code written when implementing...
Definition: graph_engine.h:51
virtual ~vertex_program()
Destructor.
virtual void run_on_messages(const vertex_message *v_msgs[], int num)
Run user's code when the vertex receives messages from others.
Definition: vertex_program.h:305
virtual void notify_iteration_end(compute_vertex &cv)=0
Perform some user defined action on a vertex when the current iteration comes to an end...
virtual void run_on_message(compute_vertex &comp_v, const vertex_message &msg)
Run user's code when the vertex receives messages from another.
Definition: vertex_program.h:295
virtual void run(compute_vertex &vertex)=0
This is a pre-run before users get any information of adjacency list of vertices. This is commonly wh...
virtual void run_on_num_edges(compute_vertex &c_vertex, const vertex_header &header)
Run user's code when the vertex header is read from disks.
Definition: vertex_program.h:341
Vertex representation when in the page cache.
Definition: vertex.h:575
virtual void run_on_multicast_message(multicast_message &mmsg)
Run user's code when a multicast message is received.
Definition: vertex_program.h:324
virtual void run_on_messages(const vertex_message *v_msgs[], int num)=0
Run user's code when the vertex receives messages from others.
void multicast_msg(vertex_id_t ids[], int num, vertex_message &msg)
Multicast the same message to several other vertices. If the number of vertices receiving the message...
virtual void run(compute_vertex &comp_v)
This is a pre-run before users get any information of adjacency list of vertices. This is commonly wh...
Definition: vertex_program.h:276
virtual void run(compute_vertex &comp_v, const page_vertex &vertex)
Run user's code ideally/generally when the adjacency list of the vertex is read from disks...
Definition: vertex_program.h:286
void activate_vertices(vertex_id_t ids[], int num)
Activate vertices to be processed in the next level (iteration).
vertex_program()
Definition: vertex_program.h:81
void send_msg(vertex_id_t dest, vertex_message &msg)
Send a point-to-point message from one vertex to another.
virtual void run_on_num_edges(compute_vertex &c_vertex, const vertex_header &header)=0
Run user's code when the vertex header is read from disks.
void activate_vertex(vertex_id_t vertex)
Activate a single vertex to be processed in the next level (iteration).
Definition: vertex_program.h:214
Extend/Override when defining a custom vertex program. The graph engine uses this to construct vertex...
Definition: vertex_program.h:242