FlashGraph-ng
A new frontier in large-scale graph analysis and data mining
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Pages
worker_thread.h
1 #ifndef __WORKER_THREAD_H__
2 #define __WORKER_THREAD_H__
3 
4 /*
5  * Copyright 2014 Open Connectome Project (http://openconnecto.me)
6  * Written by Da Zheng (zhengda1936@gmail.com)
7  *
8  * This file is part of FlashGraph.
9  *
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  * http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  */
22 
23 #include <pthread.h>
24 
25 #include <vector>
26 #include <unordered_map>
27 
28 #include "graph_engine.h"
29 #include "bitmap.h"
30 #include "scan_pointer.h"
31 
32 namespace safs
33 {
34  class file_io_factory;
35  class io_interface;
36 }
37 
38 namespace fg
39 {
40 
// Threshold for the sparse representation of an active-vertex set: while the
// number of activated vertices stays below this, they are kept in a plain
// vector; beyond it, active_vertex_set switches to its bitmap (see below).
static const size_t MAX_ACTIVE_V = 1024;

class worker_thread;
44 
45 /*
46  * This data structure contains two data structures to represent active
47  * vertices in an iteration. The bitmap is used when there are many active
48  * vertices in an iteration; the vector is used when there are only a few
49  * vertices in an iteration.
50  */
51 class active_vertex_set
52 {
53  // The fetech index in the active bitmap. It indicates the index of longs.
54  bitmap active_map;
55  scan_pointer bitmap_fetch_idx;
56 
57  std::vector<local_vid_t> active_v;
58 
59  struct local_vid_less {
60  bool operator()(local_vid_t id1, local_vid_t id2) {
61  return id1.id < id2.id;
62  }
63  };
64 
65  struct local_vid_eq {
66  bool operator()(local_vid_t id1, local_vid_t id2) {
67  return id1.id == id2.id;
68  }
69  };
70 
71  void set_bitmap(const local_vid_t ids[], int num) {
72  for (int i = 0; i < num; i++)
73  active_map.set(ids[i].id);
74  }
75 public:
76  active_vertex_set(size_t num_vertices, int node_id): active_map(
77  num_vertices, node_id), bitmap_fetch_idx(0, true) {
78  }
79 
80  void activate_all() {
81  active_map.set_all();
82  }
83 
84  void activate_vertex(local_vid_t id) {
85  if (active_map.get_num_set_bits() > 0)
86  active_map.set(id.id);
87  else if (active_v.size() < MAX_ACTIVE_V)
88  active_v.push_back(id);
89  else {
90  active_map.set(id.id);
91  set_bitmap(active_v.data(), active_v.size());
92  active_v.clear();
93  }
94  }
95 
96  void activate_vertices(const local_vid_t ids[], int num) {
97  if (active_map.get_num_set_bits() > 0) {
98  set_bitmap(ids, num);
99  }
100  else if (active_v.size() + num < MAX_ACTIVE_V)
101  active_v.insert(active_v.end(), ids, ids + num);
102  else {
103  set_bitmap(ids, num);
104  set_bitmap(active_v.data(), active_v.size());
105  active_v.clear();
106  }
107  }
108 
109  vsize_t get_num_active_vertices() const {
110  if (active_v.empty())
111  return active_map.get_num_set_bits();
112  else
113  return active_v.size();
114  }
115 
116  void finalize() {
117  if (!active_v.empty()) {
118  std::sort(active_v.begin(), active_v.end(), local_vid_less());
119  std::vector<local_vid_t>::iterator new_end
120  = std::unique(active_v.begin(), active_v.end(), local_vid_eq());
121  size_t num_eles = new_end - active_v.begin();
122  assert(num_eles <= active_v.size());
123  active_v.resize(num_eles);
124  }
125  }
126 
127  void force_bitmap() {
128  set_bitmap(active_v.data(), active_v.size());
129  active_v.clear();
130  }
131 
132  void reset_active_vertex(local_vid_t id) {
133  assert(active_v.empty());
134  active_map.reset(id.id);
135  }
136 
137  bool is_active(local_vid_t id) const {
138  assert(active_v.empty());
139  return active_map.get(id.id);
140  }
141 
142  void clear() {
143  active_v.clear();
144  active_map.clear();
145  bitmap_fetch_idx = scan_pointer(0, true);
146  }
147 
148  void set_dir(bool forward) {
149  bitmap_fetch_idx = scan_pointer(active_map.get_num_longs(), forward);
150  }
151 
152  void fetch_reset_active_vertices(size_t max_num,
153  std::vector<local_vid_t> &local_ids);
154  void fetch_reset_active_vertices(std::vector<local_vid_t> &local_ids);
155 };
156 
/*
 * The queue for active vertices.  Worker threads drain this queue to get
 * the vertices to process in the current iteration.
 */
class active_vertex_queue
{
public:
	virtual ~active_vertex_queue() {
	}

	// Initialize the queue with an array of vertex ids; `sorted' indicates
	// whether the caller guarantees the ids are already sorted.
	virtual void init(const vertex_id_t buf[], size_t size, bool sorted) = 0;
	// This is the common case for iterations.
	virtual void init(worker_thread &) = 0;
	// Fetch up to `num' vertices into `vertices'; returns how many were fetched.
	virtual int fetch(compute_vertex_pointer vertices[], int num) = 0;
	virtual bool is_empty() = 0;
	virtual size_t get_num_vertices() = 0;

	// Convenience overload: initialize from a vector of vertex ids.
	void init(const std::vector<vertex_id_t> &vec, bool sorted) {
		init(vec.data(), vec.size(), sorted);
	}
};
177 
178 /*
179  * This vertex queue is sorted based on the vertex ID.
180  */
181 class default_vertex_queue: public active_vertex_queue
182 {
183  static const size_t VERTEX_BUF_SIZE = 64 * 1024;
184  pthread_spinlock_t lock;
185  // It contains the offset of the vertex in the local partition
186  // instead of the real vertex Ids.
187  std::vector<compute_vertex_pointer> vertex_buf;
188  // Pointers to the vertically partitioned vertices that are activated
189  // in this iteration.
190  std::vector<vpart_vertex_pointer> vpart_ps;
191  int curr_vpart;
192  std::unique_ptr<active_vertex_set> active_vertices;
193  // The fetch index in the vertex buffer.
194  scan_pointer buf_fetch_idx;
195  graph_engine &graph;
196  const graph_index &index;
197  std::atomic<size_t> num_active;
198  int part_id;
199 
200  void fetch_from_map();
201  void fetch_vparts();
202 public:
203  default_vertex_queue(graph_engine &_graph, int part_id,
204  int node_id): buf_fetch_idx(0, true), graph(_graph), index(
205  _graph.get_graph_index()) {
206  pthread_spin_init(&lock, PTHREAD_PROCESS_PRIVATE);
207  num_active = 0;
208  this->part_id = part_id;
209  size_t num_local_vertices = _graph.get_partitioner()->get_part_size(
210  part_id, _graph.get_num_vertices());
211  this->active_vertices = std::unique_ptr<active_vertex_set>(
212  new active_vertex_set(num_local_vertices, node_id));
213  curr_vpart = 0;
214  }
215 
216  virtual void init(const vertex_id_t buf[], size_t size, bool sorted);
217  virtual void init(worker_thread &);
218  virtual int fetch(compute_vertex_pointer vertices[], int num);
219 
220  virtual bool is_empty() {
221  return num_active == 0;
222  }
223 
224  virtual size_t get_num_vertices() {
225  return num_active;
226  }
227 };
228 
229 class customized_vertex_queue: public active_vertex_queue
230 {
231  pthread_spinlock_t lock;
232  std::vector<compute_vertex_pointer> sorted_vertices;
233  scan_pointer fetch_idx;
234  vertex_scheduler::ptr scheduler;
235  vertex_program::ptr vprog;
236  graph_engine &graph;
237  const graph_index &index;
238  int part_id;
239 
240  void get_compute_vertex_pointers(const std::vector<vertex_id_t> &vertices,
241  std::vector<vpart_vertex_pointer> &vpart_ps);
242 public:
243  customized_vertex_queue(vertex_program::ptr vprog,
244  vertex_scheduler::ptr scheduler, int part_id): fetch_idx(0,
245  true), graph(vprog->get_graph()), index(
246  graph.get_graph_index()) {
247  pthread_spin_init(&lock, PTHREAD_PROCESS_PRIVATE);
248  this->scheduler = scheduler;
249  this->part_id = part_id;
250  this->vprog = vprog;
251  }
252 
253  void init(const vertex_id_t buf[], size_t size, bool sorted);
254  void init(worker_thread &);
255 
256  int fetch(compute_vertex_pointer vertices[], int num) {
257  pthread_spin_lock(&lock);
258  int num_fetches = min(num, fetch_idx.get_num_remaining());
259  if (num_fetches > 0) {
260  size_t curr_loc = fetch_idx.get_curr_loc();
261  size_t new_loc = fetch_idx.move(num_fetches);
262  memcpy(vertices, sorted_vertices.data() + min(curr_loc, new_loc),
263  num_fetches * sizeof(vertices[0]));
264  }
265  pthread_spin_unlock(&lock);
266  return num_fetches;
267  }
268 
269  bool is_empty() {
270  pthread_spin_lock(&lock);
271  bool ret = fetch_idx.get_num_remaining() == 0;
272  pthread_spin_unlock(&lock);
273  return ret;
274  }
275 
276  size_t get_num_vertices() {
277  pthread_spin_lock(&lock);
278  size_t num = fetch_idx.get_num_remaining();
279  pthread_spin_unlock(&lock);
280  return num;
281  }
282 };
283 
// Forward declarations for the types worker_thread refers to; their
// definitions live elsewhere in the project.
class vertex_compute;
class steal_state_t;
class message_processor;
class load_balancer;
class simple_index_reader;
289 
/*
 * A worker thread runs the vertex programs on the vertices of one graph
 * partition.  It tracks the vertices activated for the current and the
 * next iteration, buffers I/O requests for adjacency lists, and exchanges
 * messages with the other worker threads.
 */
class worker_thread: public thread
{
	// Also used as the partition id of this thread (see get_num_local_vertices).
	int worker_id;
	// Factories for opening I/O on the graph (adjacency-list) file and
	// the index file.
	std::shared_ptr<safs::file_io_factory> graph_factory;
	std::shared_ptr<safs::file_io_factory> index_factory;
	std::shared_ptr<safs::io_interface> io;
	graph_engine *graph;
	const graph_index &index;

	// Allocators for vertex computes.  NOTE(review): presumably alloc,
	// merged_alloc and sparse_alloc back the plain, merged and sparse
	// compute paths respectively -- confirm against the .cpp.
	std::unique_ptr<safs::compute_allocator> alloc;
	std::unique_ptr<safs::compute_allocator> merged_alloc;
	std::unique_ptr<safs::compute_allocator> sparse_alloc;
	vertex_program::ptr vprogram;
	// Vertex program on the vertically partitioned vertices.
	vertex_program::ptr vpart_vprogram;
	std::shared_ptr<simple_index_reader> index_reader;

	// This buffers the I/O requests for adjacency lists.
	std::vector<safs::io_request> adj_reqs;

	// When a thread process a vertex, the worker thread should keep
	// a vertex compute for the vertex. This is useful when a user-defined
	// compute vertex needs to reference its vertex compute.
	std::unordered_map<compute_vertex *, vertex_compute *> active_computes;
	// Determine whether the current vertex issues requests.
	bool req_on_vertex;
	// This points to the vertex that is currently being processed.
	compute_vertex_pointer curr_vertex;

	/*
	 * A vertex is allowed to send messages to other vertices.
	 * The message passing scheme between vertices are implemented as follows:
	 * all non-empty vertices (with edges) have a message holder;
	 * vertices are partitioned and each worker thread is responsible for
	 * a certain number of vertices;
	 * when a vertex issues messages, the thread that processes the vertex will
	 * redirect them to the right threads;
	 * a thread that receives messages process them and place them in
	 * the right message holder.
	 */

	std::unique_ptr<message_processor> msg_processor;
	std::unique_ptr<load_balancer> balancer;

	// This indicates the vertices that request the notification of the end
	// of an iteration.
	std::unique_ptr<bitmap> notify_vertices;
	// This is to collect vertices activated in the next level.
	std::unique_ptr<active_vertex_set> next_activated_vertices;
	// This contains the vertices activated in the current level.
	std::unique_ptr<active_vertex_queue> curr_activated_vertices;
	vertex_scheduler::ptr scheduler;

	// Indicate that we need to start all vertices.
	bool start_all;
	// The subset of vertices to start with, when not starting all
	// (see start_vertices below).
	std::vector<vertex_id_t> started_vertices;
	// Alternatively, a filter selecting which vertices to start.
	std::shared_ptr<vertex_filter> filter;
	vertex_initializer::ptr vinitializer;

	// The buffer for processing activated vertex.
	embedded_array<compute_vertex_pointer> process_vertex_buf;

	// The number of activated vertices processed in the current level.
	atomic_number<long> num_activated_vertices_in_level;
	// The number of vertices completed in the current level.
	atomic_number<long> num_completed_vertices_in_level;

	/*
	 * Get the number of vertices being processed in the current level,
	 * i.e., activated but not yet completed.
	 */
	int get_num_vertices_processing() const {
		return num_activated_vertices_in_level.get()
			- num_completed_vertices_in_level.get();
	}
	int process_activated_vertices(int max);
public:
	worker_thread(graph_engine *graph, std::shared_ptr<safs::file_io_factory> graph_factory,
			std::shared_ptr<safs::file_io_factory> index_factory, vertex_program::ptr prog,
			vertex_program::ptr part_prog, int node_id, int worker_id,
			int num_threads, vertex_scheduler::ptr scheduler,
			std::shared_ptr<slab_allocator> msg_alloc);

	~worker_thread();

	// Wire up message passing with the other worker threads.
	void init_messaging(const std::vector<worker_thread *> &threads,
			std::shared_ptr<slab_allocator> msg_alloc,
			std::shared_ptr<slab_allocator> flush_msg_alloc);

	void run();
	void init();

	/*
	 * When a vertex has been completed for the current iteration, this
	 * method is invoked to notify the worker thread, so that the worker
	 * thread can update its statistics on the number of completed vertices.
	 */
	void complete_vertex(const compute_vertex_pointer v);

	// Advance this thread to the next iteration; returns a size
	// (NOTE(review): presumably the number of vertices activated for the
	// new level -- confirm in the .cpp).
	size_t enter_next_level();

	// Record an explicit list of vertices to start, with their initializer.
	void start_vertices(const std::vector<vertex_id_t> &vertices,
			vertex_initializer::ptr initializer) {
		this->vinitializer = initializer;
		started_vertices = vertices;
	}

	// Request that all local vertices be started.
	void start_all_vertices(vertex_initializer::ptr init) {
		start_all = true;
		this->vinitializer = init;
	}

	// Record a filter that selects which vertices to start.
	void start_vertices(std::shared_ptr<vertex_filter> filter) {
		this->filter = filter;
	}

	compute_vertex_pointer get_curr_vertex() const {
		return curr_vertex;
	}
	// Mark `v' as the vertex currently being run; only one vertex may be
	// current at a time.
	void start_run_vertex(compute_vertex_pointer v) {
		assert(!curr_vertex.is_valid());
		curr_vertex = v;
		req_on_vertex = false;
	}
	// Finish running the current vertex (which must be `v'); returns
	// whether the vertex issued vertex requests while it ran.
	bool finish_run_vertex(compute_vertex_pointer v) {
		assert(curr_vertex.is_valid());
		assert(curr_vertex.get() == v.get());
		curr_vertex = compute_vertex_pointer();
		return req_on_vertex;
	}

	// Note that the current vertex issued a request.  The id argument is
	// not used here; only the flag is recorded.
	void request_on_vertex(vertex_id_t id) {
		req_on_vertex = true;
	}
	vertex_compute *get_vertex_compute(compute_vertex_pointer v);

	/*
	 * Activate the vertex in its own partition for the next iteration.
	 */
	void activate_vertex(local_vid_t id) {
		next_activated_vertices->activate_vertex(id);
	}

	// Activate a batch of vertices for the next iteration.
	void activate_vertices(const local_vid_t ids[], int num) {
		next_activated_vertices->activate_vertices(ids, num);
	}

	// The vertex asks to be notified when the iteration ends.
	void request_notify_iter_end(local_vid_t id) {
		notify_vertices->set(id.id);
	}

	int steal_activated_vertices(compute_vertex_pointer vertices[], int num);
	void return_vertices(vertex_id_t ids[], int num);

	// The number of vertices in this thread's partition.
	size_t get_num_local_vertices() const {
		return graph->get_partitioner()->get_part_size(worker_id,
				graph->get_num_vertices());
	}

	int get_worker_id() const {
		return worker_id;
	}

	// Return the vertex program: the vertically-partitioned one if `part',
	// otherwise the regular one.
	vertex_program &get_vertex_program(bool part) {
		return part ? *vpart_vprogram : *vprogram;
	}

	graph_engine &get_graph() {
		return *graph;
	}

	message_processor &get_msg_processor() {
		return *msg_processor;
	}

	simple_index_reader &get_index_reader() {
		return *index_reader;
	}

	// Buffer an adjacency-list I/O request for later submission.
	void issue_io_request(safs::io_request &req) {
		adj_reqs.push_back(req);
	}

	// The number of vertices still queued for the current iteration.
	size_t get_activates() const {
		return curr_activated_vertices->get_num_vertices();
	}

	safs::compute_allocator &get_merged_compute_allocator() {
		return *merged_alloc;
	}

	safs::compute_allocator &get_sparse_compute_allocator() {
		return *sparse_alloc;
	}

	int get_stolen_vertex_part(const compute_vertex &v) const;

	friend class load_balancer;
	friend class default_vertex_queue;
	friend class customized_vertex_queue;
};
490 
491 }
492 
493 #endif
Definition: io_request.h:491
unsigned int vsize_t
Basic data types used in FlashGraph.
Definition: FG_basic_types.h:33
Definition: io_request.h:454