FlashGraph-ng
A new frontier in large-scale graph analysis and data mining
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Pages
messaging.h
1 #ifndef __GRAPH_MESSAGING_H__
2 #define __GRAPH_MESSAGING_H__
3 
4 /*
5  * Copyright 2014 Open Connectome Project (http://openconnecto.me)
6  * Written by Da Zheng (zhengda1936@gmail.com)
7  *
8  * This file is part of FlashGraph.
9  *
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  * http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  */
22 
23 #include "slab_allocator.h"
24 
25 #include "vertex.h"
26 #include "partitioner.h"
27 
28 namespace fg
29 {
30 
31 class message
32 {
33  // The allocator of the message buffer.
34  slab_allocator *alloc;
35 
36  char *buf;
37  int curr_get_off;
38  int curr_add_off;
39  int num_objs;
40 
41  void init() {
42  alloc = NULL;
43  buf = NULL;
44  curr_get_off = 0;
45  curr_add_off = 0;
46  num_objs = 0;
47  }
48 
49  void destroy() {
50  if (buf) {
51  alloc->free(buf);
52  }
53  }
54 
55 public:
56  message() {
57  init();
58  }
59 
60  message(slab_allocator *alloc) {
61  init();
62  this->alloc = alloc;
63  this->buf = alloc->alloc();
64  assert(this->buf);
65  }
66 
67  ~message() {
68  destroy();
69  }
70 
71  message(message &msg) {
72  memcpy(this, &msg, sizeof(msg));
73  msg.init();
74  }
75 
76  message &operator=(message &msg) {
77  destroy();
78  memcpy(this, &msg, sizeof(msg));
79  msg.init();
80  return *this;
81  }
82 
83  void clear() {
84  destroy();
85  init();
86  }
87 
91  int get_num_objs() const {
92  return num_objs;
93  }
94 
95  bool is_empty() const {
96  return get_num_objs() == 0;
97  }
98 
99  int size() const {
100  return alloc->get_obj_size();
101  }
102 
103  bool has_next() const {
104  return num_objs > 0;
105  }
106 
107  int get_remaining_size() const {
108  return size() - curr_add_off;
109  }
110 
111  template<class T>
112  int get_next(T *objs[], int num) {
113  int i;
114  for (i = 0; has_next() && i < num; i++) {
115  int remaining = size() - curr_get_off;
116  objs[i] = T::deserialize(&buf[curr_get_off], remaining);
117  curr_get_off += objs[i]->get_serialized_size();
118  num_objs--;
119  }
120  return i;
121  }
122 
123  template<class T>
124  T *add(T &obj) {
125  int remaining = size() - curr_add_off;
126  if (remaining < obj.get_serialized_size())
127  return NULL;
128  T *obj_p = (T *) &buf[curr_add_off];
129  curr_add_off += obj.serialize(&buf[curr_add_off], remaining);
130  num_objs++;
131  return obj_p;
132  }
133 
134  int inc_msg_size(int msg_size) {
135  int remaining = size() - curr_add_off;
136  if (remaining < msg_size)
137  return 0;
138  curr_add_off += msg_size;
139  return msg_size;
140  }
141 
142  template<class T>
143  int add(T *objs, int num = 1) {
144  int num_added = 0;
145  for (int i = 0; i < num; i++) {
146  int remaining = size() - curr_add_off;
147  if (remaining < objs[i].get_serialized_size())
148  return num_added;
149  curr_add_off += objs[i].serialize(&buf[curr_add_off], remaining);
150  num_objs++;
151  num_added++;
152  }
153  return num_added;
154  }
155 
156  template<class T>
157  void remove_last_obj(T &obj) {
158  assert(num_objs > 0);
159  assert(curr_add_off >= obj.get_serialized_size());
160  T *obj_p = (T *) &buf[curr_add_off - obj.get_serialized_size()];
161  BOOST_VERIFY(obj_p->get_serialized_size() == obj.get_serialized_size());
162  num_objs--;
163  curr_add_off -= obj.get_serialized_size();
164  }
165 
166  bool copy_to(message &msg) {
167  assert(msg.alloc);
168  assert(msg.size() >= this->size());
169  memcpy(msg.buf, this->buf, curr_add_off);
170  // It probably makes more sense to reset the get offset.
171  // I expect the user will iterate all objects the message later.
172  msg.curr_get_off = 0;
173  msg.curr_add_off = this->curr_add_off;
174  msg.num_objs = this->num_objs;
175  // After we copy all objects to another message, the current
176  // message doesn't contain objects.
177  this->num_objs = 0;
178  this->curr_get_off = this->curr_add_off;
179  return true;
180  }
181 };
182 
183 class msg_queue: public thread_safe_FIFO_queue<message>
184 {
185 public:
186  msg_queue(int node_id, const std::string _name, int init_size,
187  int max_size): thread_safe_FIFO_queue<message>(_name,
188  node_id, init_size, max_size) {
189  }
190 
191  static msg_queue *create(int node_id, const std::string name,
192  int init_size, int max_size) {
193  return new msg_queue(node_id, name, init_size, max_size);
194  }
195 
196  static void destroy(msg_queue *q) {
197  delete q;
198  }
199 
207  int get_num_objs() {
208  int num = thread_safe_FIFO_queue<message>::get_num_entries();
209  stack_array<message> msgs(num);
210  int ret = thread_safe_FIFO_queue<message>::fetch(msgs.data(), num);
211  int num_objs = 0;
212  for (int i = 0; i < ret; i++) {
213  num_objs += msgs[i].get_num_objs();
214  }
215  BOOST_VERIFY(ret == thread_safe_FIFO_queue<message>::add(
216  msgs.data(), ret));
217  return num_objs;
218  }
219 };
220 
221 class simple_msg_sender
222 {
223  std::shared_ptr<slab_allocator> alloc;
224  message buf;
225  msg_queue *queue;
226  int num_objs;
227 
228 protected:
232  simple_msg_sender(int node_id, std::shared_ptr<slab_allocator> alloc,
233  msg_queue *queue): buf(alloc.get()) {
234  this->alloc = alloc;
235  this->queue = queue;
236  num_objs = 0;
237  }
238 
239 public:
240  static simple_msg_sender *create(int node_id,
241  std::shared_ptr<slab_allocator> alloc, msg_queue *queue) {
242  assert(node_id >= 0);
243  return new simple_msg_sender(node_id, alloc, queue);
244  }
245 
246  static void destroy(simple_msg_sender *s) {
247  delete s;
248  }
249 
250  int flush() {
251  num_objs = 0;
252  if (buf.is_empty()) {
253  return 0;
254  }
255  queue->add(&buf, 1);
256  // We have to make sure all messages have been sent to the queue.
257  assert(buf.is_empty());
258  message tmp(alloc.get());
259  buf = tmp;
260  return 1;
261  }
262 
267  int get_num_remaining() {
268  return num_objs;
269  }
270 
271  template<class T>
272  int send_cached(T &msg) {
273  num_objs++;
274  T *ret = buf.add(msg);
275  if (ret == NULL) {
276  flush();
277  ret = buf.add(msg);
278  assert(ret != NULL);
279  }
280  return 1;
281  }
282 
283  msg_queue *get_queue() const {
284  return queue;
285  }
286 };
287 
293 {
294 protected:
295  unsigned multicast: 1;
296  // This is a flag to indicate that a user also wants to activate
297  // the destination vertex.
298  unsigned activate: 1;
299  // This is a flag to indicate that it's a system's activation message.
300  unsigned activation_msg: 1;
301  unsigned flush: 1;
302  unsigned size: 28;
303  union {
304  vertex_id_t dest;
305  int num_dests;
306  } u;
307 public:
308  static vertex_message *deserialize(char *buf, int size) {
309  vertex_message *msg = (vertex_message *) buf;
310  assert(msg->size <= size);
311  return msg;
312  }
313 
314  vertex_message(int size, bool activate) {
315  this->activate = activate;
316  this->multicast = 0;
317  this->activation_msg = 0;
318  this->flush = 0;
319  this->u.dest = -1;
320  this->size = size;
321  assert(size % 4 == 0);
322  }
323 
324  void set_dest(local_vid_t id) {
325  this->u.dest = id.id;
326  }
327 
328  bool is_activation_msg() const {
329  return activation_msg;
330  }
331 
332  bool is_activate() const {
333  return activate;
334  }
335 
336  bool is_multicast() const {
337  return multicast;
338  }
339 
340  bool is_flush() const {
341  return flush;
342  }
343 
344  void set_flush(bool flush) {
345  this->flush = flush;
346  }
347 
348  local_vid_t get_dest() const {
349  return local_vid_t(u.dest);
350  }
351 
352  int get_serialized_size() const {
353  return size;
354  }
355 
356  bool is_empty() const {
357  return size == sizeof(vertex_message);
358  }
359 
360  int serialize(char *buf, int size) const {
361  assert(this->size <= size);
362  memcpy(buf, this, this->size);
363  return this->size;
364  }
365 };
366 
367 class multicast_message;
368 
369 class multicast_dest_list
370 {
371  multicast_message *msg;
372  vertex_id_t *dest_list;
373 public:
374  multicast_dest_list() {
375  msg = NULL;
376  dest_list = NULL;
377  }
378 
379  void clear() {
380  msg = NULL;
381  dest_list = NULL;
382  }
383 
384  multicast_dest_list(multicast_message *msg);
385  void add_dest(local_vid_t id);
386  void add_dests(local_vid_t ids[], int num);
387  int get_num_dests() const;
388  local_vid_t get_dest(int idx) const;
389  const local_vid_t *get_dests() const {
390  return (local_vid_t *) dest_list;
391  }
392 };
393 
394 class multicast_message: public vertex_message
395 {
396  const vertex_id_t *get_dest_begin() const {
397  return (vertex_id_t *) (((char *) this) + get_orig_msg_size());
398  }
399 
400  vertex_id_t *get_dest_begin() {
401  return (vertex_id_t *) (((char *) this) + get_orig_msg_size());
402  }
403 
404  int get_orig_msg_size() const {
405  return size - u.num_dests * sizeof(vertex_id_t);
406  }
407 public:
408  static multicast_message *convert2multicast(vertex_message *msg) {
409  multicast_message *mmsg = (multicast_message *) msg;
410  mmsg->u.num_dests = 0;
411  mmsg->multicast = 1;
412  return mmsg;
413  }
414 
415  static multicast_message *cast2multicast(vertex_message *msg) {
416  multicast_message *mmsg = (multicast_message *) msg;
417  assert(mmsg->multicast);
418  return mmsg;
419  }
420 
421  multicast_message(int size, bool activate): vertex_message(size,
422  activate) {
423  this->multicast = 1;
424  this->u.num_dests = 0;
425  }
426 
427  bool is_empty() const {
428  return (size_t) get_orig_msg_size() == sizeof(vertex_message);
429  }
430 
431  bool is_multicast() const {
432  return multicast;
433  }
434 
435  int get_num_dests() const {
436  return u.num_dests;
437  }
438 
439  multicast_dest_list get_dest_list() {
440  return multicast_dest_list(this);
441  }
442 
443  int get_serialized_size() const {
444  return size;
445  }
446 
452  int get_body_size() const {
453  return get_serialized_size() - get_num_dests() * sizeof(vertex_id_t);
454  }
455 
456  friend class multicast_dest_list;
457 };
458 
459 class activation_message: public multicast_message
460 {
461 public:
462  activation_message(): multicast_message(sizeof(vertex_message), true) {
463  this->activation_msg = 1;
464  }
465 };
466 
467 inline multicast_dest_list::multicast_dest_list(multicast_message *msg)
468 {
469  this->msg = msg;
470  dest_list = msg->get_dest_begin();
471 }
472 
473 inline void multicast_dest_list::add_dest(local_vid_t id)
474 {
475  dest_list[msg->u.num_dests++] = id.id;
476  msg->size += sizeof(id.id);
477 }
478 
479 inline void multicast_dest_list::add_dests(local_vid_t ids[], int num)
480 {
481  assert(sizeof(ids[0]) == sizeof(ids[0].id));
482  memcpy(&dest_list[msg->u.num_dests], ids, sizeof(ids[0].id) * num);
483  msg->u.num_dests += num;
484  msg->size += sizeof(ids[0].id) * num;
485 }
486 
487 inline int multicast_dest_list::get_num_dests() const
488 {
489  return msg->u.num_dests;
490 }
491 
492 inline local_vid_t multicast_dest_list::get_dest(int idx) const
493 {
494  return local_vid_t(dest_list[idx]);
495 }
496 
497 class multicast_msg_sender
498 {
499  const static int MMSG_BUF_SIZE = 4096;
500  std::shared_ptr<slab_allocator> alloc;
501  // The local buffer of vertex messages.
502  message buf;
503  // The destination queue of the sender.
504  msg_queue *queue;
505  // The initialized multicast vertex message.
506  multicast_message *mmsg;
507  // The number of destinations in the multicast message.
508  int num_dests;
509  // This stores the body of the multicast message.
510  char mmsg_temp_buf[MMSG_BUF_SIZE];
511  multicast_dest_list dest_list;
512 
513  multicast_msg_sender(std::shared_ptr<slab_allocator> alloc,
514  msg_queue *queue): buf(alloc.get()) {
515  this->alloc = alloc;
516  this->queue = queue;
517  this->mmsg = NULL;
518  this->num_dests = 0;
519  }
520 public:
521  static multicast_msg_sender *create(std::shared_ptr<slab_allocator> alloc,
522  msg_queue *queue) {
523  return new multicast_msg_sender(alloc, queue);
524  }
525 
526  static void destroy(multicast_msg_sender *s) {
527  delete s;
528  }
529 
530  int flush();
531 
532  template<class T>
533  void init(const T &msg) {
534  assert(mmsg == NULL);
535  num_dests = 0;
536  vertex_message *p = (vertex_message *) buf.add(msg);
537  if (p == NULL) {
538  flush();
539  p = (vertex_message *) buf.add(msg);
540  assert(p);
541  }
542  this->mmsg = multicast_message::convert2multicast(p);
543  assert(this->mmsg->get_serialized_size() <= MMSG_BUF_SIZE);
544  this->mmsg->serialize(mmsg_temp_buf, MMSG_BUF_SIZE);
545  dest_list = this->mmsg->get_dest_list();
546  }
547 
548  int add_dests(local_vid_t ids[], int num);
549 
550  bool add_dest(local_vid_t id);
551 
552  void end_multicast() {
553  if (num_dests == 0) {
554  multicast_message *mmsg_template
555  = (multicast_message *) mmsg_temp_buf;
556  buf.remove_last_obj(*mmsg_template);
557  }
558  mmsg = NULL;
559  num_dests = 0;
560  dest_list.clear();
561  }
562 };
563 
564 }
565 
566 #endif
Definition: messaging.h:292
Definition: partitioner.h:36