FlashGraph-ng
A new frontier in large-scale graph analysis and data mining
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Pages
messaging.h
1 #ifndef __MESSAGING_H__
2 #define __MESSAGING_H__
3 
4 /*
5  * Copyright 2014 Open Connectome Project (http://openconnecto.me)
6  * Written by Da Zheng (zhengda1936@gmail.com)
7  *
8  * This file is part of SAFSlib.
9  *
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  * http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  */
22 
23 #include <pthread.h>
24 #include <numa.h>
25 #include <assert.h>
26 #include <sys/uio.h>
27 
28 #include "common.h"
29 #include "container.h"
30 #include "parameters.h"
31 #include "concurrency.h"
32 #include "io_request.h"
33 
34 class slab_allocator;
35 
36 namespace safs
37 {
38 
/*
 * A reply message for a completed I/O request.
 *
 * Internally it wraps nothing but an io_request, so a reply has exactly
 * the same in-memory and serialized layout as a request.  The pointer
 * deserialize() below depends on this: it casts a deserialized
 * io_request pointer directly to io_reply.
 */
class io_reply
{
	io_request req;
public:
	io_reply() {
	}

	/*
	 * Build a reply describing the given request.
	 * NOTE(review): `success` and `status` are accepted but never stored;
	 * get_status() and is_success() below return constants.  Storing them
	 * would add fields and break the layout assumption documented on the
	 * class — confirm the intent before changing this.
	 */
	io_reply(io_request *reqp, int success, int status) {
		if (reqp->is_extended_req()) {
			// An extended request may carry multiple buffers, but this
			// reply format can only describe a single one.
			assert(reqp->get_num_bufs() == 1);
			data_loc_t loc(reqp->get_file_id(), reqp->get_offset());
			req = io_request(reqp->get_buf(), loc, reqp->get_size(),
					reqp->get_access_method(), reqp->get_io(), -1);
		}
		else
			this->req = *reqp;
	}

	// Always reports success (see the note on the constructor above).
	int get_status() const {
		return 0;
	}

	// Always reports success (see the note on the constructor above).
	bool is_success() const {
		return true;
	}

	char *get_buf() const {
		return req.get_buf();
	}

	off_t get_offset() const {
		return req.get_offset();
	}

	ssize_t get_size() const {
		return req.get_size();
	}

	int get_access_method() const {
		return req.get_access_method();
	}

	// Replies never embed their data inline in the message buffer.
	bool is_data_inline() const {
		return false;
	}

	// Serialize the reply into `buf`; returns the number of bytes written
	// (callers such as message<T>::add advance their write cursor by it).
	int serialize(char *buf, int size, bool accept_inline) {
		return req.serialize(buf, size, accept_inline);
	}

	int get_serialized_size() const {
		return req.get_serialized_size();
	}

	io_request &get_request() {
		return req;
	}

	// Deserialize in place into an existing reply object.
	static void deserialize(io_reply &reply, char *buf, int size) {
		io_request::deserialize(reply.req, buf, size);
	}

	/*
	 * Deserialize from a message buffer and return a pointer into it.
	 * NOTE(review): the C-style cast is only valid because io_reply
	 * contains nothing but the wrapped io_request.
	 */
	static io_reply *deserialize(char *buf, int size) {
		return (io_reply *) io_request::deserialize(buf, size);
	}
};
105 
106 /*
107  * It is an object container used for message passing.
108  * Instead of sending objects to another thread directly, we add objects
109  * to a message, and send the message to the thread.
110  *
 * The message supports objects of varying sizes, but each object must be
 * able to serialize and deserialize itself to and from the message.
113  *
114  * The objects that can be added to a message need to support
115  * serialize(),
116  * get_serialized_size(),
117  * deserialize().
118  * If the message allows objects inline in the message buffer, after objects
119  * are serialized in the message, they shouldn't own any memory, and
120  * the message isn't responsible for destroying them when the message
121  * is destroyed.
122  * If the message doesn't allow objects inline in the message buffer,
123  * the objects stored in the message may still own their own memory. Thus,
124  * when the message is destroyed, all objects will be destroyed.
125  *
126  * The ways of fetching objects are different in these two modes:
127  * If the message accepts inline objects, get_next_inline() should be used.
128  * Otherwise, get_next() should be used.
129  *
130  * There are multiple benefits of using the message object:
131  * It reduces lock contention.
132  * When it works with message senders, we can reduce the number of memory
133  * copies. There are at most two memory copies: add an object to
134  * the message; (optionally) the receiver thread copies the message to
135  * the local memory if it runs on a NUMA node different from the message sender.
 * Previously, we needed to copy an object to the sender's local buffer,
 * copy the local buffer to the queue, and copy the objects in the queue
 * to the remote buffer.
139  */
140 template<class T>
141 class message
142 {
143  // The allocator of the message buffer.
144  slab_allocator *alloc;
145 
146  char *buf;
147  short curr_get_off;
148  short curr_add_off;
149  short num_objs;
150  // Indicate whether the data of an object can be inline in the message.
151  short accept_inline: 1;
152 
153  void init() {
154  alloc = NULL;
155  buf = NULL;
156  curr_get_off = 0;
157  curr_add_off = 0;
158  num_objs = 0;
159  accept_inline = 0;
160  }
161 
162  void destroy();
163 
164  /*
165  * This just returns the address of the next object.
166  */
167  T *get_next_addr() {
168  int remaining = size() - curr_get_off;
169  assert(num_objs > 0);
170  num_objs--;
171  T *ret = T::deserialize(&buf[curr_get_off], remaining);
172  curr_get_off += ret->get_serialized_size();
173  return ret;
174  }
175 public:
176  message() {
177  init();
178  }
179 
180  message(slab_allocator *alloc, bool accept_inline);
181 
182  ~message() {
183  destroy();
184  }
185 
186  message(message<T> &msg) {
187  memcpy(this, &msg, sizeof(msg));
188  msg.init();
189  }
190 
191  message<T> &operator=(message<T> &msg) {
192  destroy();
193  memcpy(this, &msg, sizeof(msg));
194  msg.init();
195  return *this;
196  }
197 
198  void clear() {
199  destroy();
200  init();
201  }
202 
203  /*
204  * It's actually the number of remaining objects in the message.
205  */
206  int get_num_objs() const {
207  return num_objs;
208  }
209 
210  bool is_empty() const {
211  return get_num_objs() == 0;
212  }
213 
214  int size() const;
215 
216  bool has_next() const {
217  return num_objs > 0;
218  }
219 
220  int get_next_inline(T objs[], int num_objs) {
221  /* If the message accepts inline objects, there are no ownership
222  * problems. The memory owned by the objects has been embedded
223  * in the message buffer.
224  */
225  assert(accept_inline);
226  int i = 0;
227  while (has_next()) {
228  int remaining = size() - curr_get_off;
229  assert(num_objs > 0);
230  num_objs--;
231  T::deserialize(objs[i], &buf[curr_get_off], remaining);
232  curr_get_off += objs[i].get_serialized_size();
233  i++;
234  }
235  return i;
236  }
237 
238  bool get_next(T &obj) {
239  T *ret = get_next_addr();
240  assert(!accept_inline && !ret->is_data_inline());
241  // The copy constructor of the object will remove the ownership
242  // of any memory pointed by the object, so we don't need to call
243  // the deconstructor of the object.
244  obj = *ret;
245  return true;
246  }
247 
248  int get_next_objs(T objs[], int num) {
249  for (int i = 0; i < num; i++) {
250  if (has_next())
251  get_next(objs[i]);
252  else
253  return i;
254  }
255  return num;
256  }
257 
258  int add(T *objs, int num = 1) {
259  int num_added = 0;
260  for (int i = 0; i < num; i++) {
261  int remaining = size() - curr_add_off;
262  if (remaining < objs[i].get_serialized_size())
263  return num_added;
264  curr_add_off += objs[i].serialize(&buf[curr_add_off], remaining,
265  accept_inline);
266  num_objs++;
267  num_added++;
268  }
269  return num_added;
270  }
271 
272  bool copy_to(message<T> &msg) {
273  assert(msg.alloc);
274  assert(msg.size() >= this->size());
275  memcpy(msg.buf, this->buf, curr_add_off);
276  // It probably makes more sense to reset the get offset.
277  // I expect the user will iterate all objects the message later.
278  msg.curr_get_off = 0;
279  msg.curr_add_off = this->curr_add_off;
280  msg.num_objs = this->num_objs;
281  msg.accept_inline = this->accept_inline;
282  // After we copy all objects to another message, the current
283  // message doesn't contain objects.
284  this->num_objs = 0;
285  this->curr_get_off = this->curr_add_off;
286  return true;
287  }
288 };
289 
290 /*
291  * It contains multiple messages. It basically helps construct messages.
292  */
293 template<class T>
294 class msg_buffer: public fifo_queue<message<T> >
295 {
296  static const int INIT_MSG_BUF_SIZE = 16;
297 
298  slab_allocator *alloc;
299  bool accept_inline;
300 
301  void add_msg(message<T> &msg) {
302  if (fifo_queue<message<T> >::is_full()) {
303  fifo_queue<message<T> >::expand_queue(
304  fifo_queue<message<T> >::get_size() * 2);
305  }
306  BOOST_VERIFY(fifo_queue<message<T> >::add(&msg, 1) == 1);
307  }
308 
309 public:
310  msg_buffer(int node_id, slab_allocator *alloc,
311  bool accept_inline): fifo_queue<message<T> >(
312  node_id, INIT_MSG_BUF_SIZE, true) {
313  this->alloc = alloc;
314  this->accept_inline = accept_inline;
315  }
316 
317  int add_objs(T *objs, int num = 1) {
318  int num_added = 0;
319  if (fifo_queue<message<T> >::is_empty()) {
320  message<T> tmp(alloc, accept_inline);
321  add_msg(tmp);
322  }
323  while (num > 0) {
324  int ret = fifo_queue<message<T> >::back().add(objs, num);
325  // The last message is full. We need to add a new message
326  // to the queue.
327  if (ret == 0) {
328  message<T> tmp(alloc, accept_inline);
329  add_msg(tmp);
330  }
331  else {
332  num_added += ret;
333  objs += ret;
334  num -= ret;
335  }
336  }
337  return num_added;
338  }
339 };
340 
341 template<class T>
342 class msg_queue: public thread_safe_FIFO_queue<message<T> >
343 {
344  // TODO I may need to make sure all messages are compatible with the flag.
345  bool accept_inline;
346 public:
347  msg_queue(int node_id, const std::string _name, int init_size, int max_size,
348  bool accept_inline): thread_safe_FIFO_queue<message<T> >(_name,
349  node_id, init_size, max_size) {
350  this->accept_inline = accept_inline;
351  }
352 
353  static msg_queue<T> *create(int node_id, const std::string name,
354  int init_size, int max_size, bool accept_inline) {
355  return new msg_queue<T>(node_id, name, init_size, max_size,
356  accept_inline);
357  }
358 
359  static void destroy(msg_queue<T> *q) {
360  delete q;
361  }
362 
363  bool is_accept_inline() const {
364  return accept_inline;
365  }
366 
367  /*
368  * This method needs to be used with caution.
369  * It may change the behavior of other threads if they also access
370  * the queue, so it's better to use it when no other threads are
371  * using it.
372  * It is also a heavy operation.
373  */
374  int get_num_objs() {
375  int num = thread_safe_FIFO_queue<message<T> >::get_num_entries();
376  stack_array<message<T> > msgs(num);
377  int ret = thread_safe_FIFO_queue<message<T> >::fetch(msgs.data(), num);
378  int num_objs = 0;
379  for (int i = 0; i < ret; i++) {
380  num_objs += msgs[i].get_num_objs();
381  }
382  BOOST_VERIFY(ret == thread_safe_FIFO_queue<message<T> >::add(
383  msgs.data(), ret));
384  return num_objs;
385  }
386 };
387 
388 template<class T>
389 class thread_safe_msg_sender
390 {
391  pthread_spinlock_t _lock;
392  message<T> buf;
393 
394  slab_allocator *alloc;
395  msg_queue<T> *dest_queue;
396 
397  /*
398  * buf_size: the number of messages that can be buffered in the sender.
399  */
400  thread_safe_msg_sender(slab_allocator *alloc,
401  msg_queue<T> *queue): buf(alloc, queue->is_accept_inline()) {
402  this->alloc = alloc;
403  dest_queue = queue;
404  pthread_spin_init(&_lock, PTHREAD_PROCESS_PRIVATE);
405  }
406 
407 public:
408  static thread_safe_msg_sender<T> *create(int node_id, slab_allocator *alloc,
409  msg_queue<T> *queue) {
410  assert(node_id >= 0);
411  return new thread_safe_msg_sender<T>(alloc, queue);
412  }
413 
414  static void destroy(thread_safe_msg_sender<T> *s) {
415  delete s;
416  }
417 
418  /*
419  * flush the entries in the buffer to the queues.
420  * return the number of entries that have been flushed.
421  */
422  int flush() {
423  pthread_spin_lock(&_lock);
424  if (!buf.is_empty()) {
425  message<T> tmp = buf;
426  message<T> tmp1(alloc, dest_queue->is_accept_inline());
427  buf = tmp1;
428  pthread_spin_unlock(&_lock);
429  int ret = dest_queue->add(&tmp, 1);
430  assert(ret == 1);
431  return ret;
432  }
433  else {
434  pthread_spin_unlock(&_lock);
435  return 0;
436  }
437  }
438 
439  void flush_all() {
440  // flush_all() now is the same as flush().
441  flush();
442  }
443 
444  int send_cached(T *msg, int num = 1);
445  int send(T *msg, int num);
446 
447  int get_num_remaining() const {
448  return buf.get_num_objs();
449  }
450 };
451 
452 template<class T>
453 class simple_msg_sender
454 {
455  slab_allocator *alloc;
456  msg_buffer<T> buf;
457  msg_queue<T> *queue;
458  int num_objs;
459 
460 protected:
461  /*
462  * buf_size: the number of messages that can be buffered in the sender.
463  */
464  simple_msg_sender(int node_id, slab_allocator *alloc,
465  msg_queue<T> *queue): buf(node_id, alloc, queue->is_accept_inline()) {
466  this->alloc = alloc;
467  this->queue = queue;
468  num_objs = 0;
469  }
470 
471 public:
472  static simple_msg_sender<T> *create(int node_id, slab_allocator *alloc,
473  msg_queue<T> *queue) {
474  assert(node_id >= 0);
475  return new simple_msg_sender<T>(node_id, alloc, queue);
476  }
477 
478  static void destroy(simple_msg_sender<T> *s) {
479  delete s;
480  }
481 
482  int flush() {
483  num_objs = 0;
484  if (buf.is_empty()) {
485  return 0;
486  }
487  queue->add(&buf);
488  // We have to make sure all messages have been sent to the queue.
489  assert(buf.is_empty());
490  return 1;
491  }
492 
493  /*
494  * This returns the number of remaining messages instead of the number
495  * of remaining objects.
496  */
497  int get_num_remaining() {
498  return num_objs;
499  }
500 
501  int send_cached(T *msgs, int num = 1) {
502  num_objs += num;
503  return buf.add_objs(msgs, num);
504  }
505 
506  msg_queue<T> *get_queue() const {
507  return queue;
508  }
509 };
510 
511 class request_sender: public simple_msg_sender<io_request>
512 {
513  request_sender(int node_id, slab_allocator *alloc,
514  msg_queue<io_request> *queue): simple_msg_sender(
515  node_id, alloc, queue) {
516  }
517 
518 public:
519  static request_sender *create(int node_id, slab_allocator *alloc,
520  msg_queue<io_request> *queue) {
521  return new request_sender(node_id, alloc, queue);
522  }
523 
524  static void destroy(request_sender *s) {
525  delete s;
526  }
527 };
528 
529 }
530 
531 #endif