FlashGraph-ng
A new frontier in large-scale graph analysis and data mining
container.h
#ifndef __CONTAINER_H__
#define __CONTAINER_H__

/*
 * Copyright 2014 Open Connectome Project (http://openconnecto.me)
 * Written by Da Zheng (zhengda1936@gmail.com)
 *
 * This file is part of SAFSlib.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <stdio.h>
#include <math.h>
#include <numa.h>
#include <assert.h>
#include <pthread.h>
#include <limits.h>
#include <string.h>	// for memcpy()

#include <string>
#include <boost/assert.hpp>

#include "common.h"
template<class T>
class queue_interface
{
public:
	virtual ~queue_interface() {
	}

	virtual T pop_front() = 0;
	virtual void push_back(T &v) = 0;
	virtual int get_num_entries() = 0;
	virtual bool is_empty() = 0;
};
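
/*
 * A minimal usage sketch (illustrative only): code that only needs FIFO
 * semantics can be written against queue_interface, so any of the queue
 * implementations below can be passed in. The `drain' helper is hypothetical.
 *
 *	void drain(queue_interface<int> &q) {
 *		while (!q.is_empty())
 *			printf("%d\n", q.pop_front());
 *	}
 */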

/*
 * This is a first-in-first-out queue.
 * However, an entry's location in the buffer never changes while it stays
 * in the queue.
 */
template<class T>
class fifo_queue: public queue_interface<T>
{
	int size_mask;
	T *buf;			// a circular buffer to keep the entries.
	bool allocated;	// indicates whether the buffer is allocated by the queue.
	long start;
	long end;
	bool resizable;
	int node_id;

	long loc_in_queue(long idx) const {
		return idx & size_mask;
	}

	T *alloc_buf(int size) {
		return new T[size];
	}

	void free_buf(T *buf) {
		assert(allocated);
		delete [] buf;
	}

public:
	class const_iterator {
		const fifo_queue<T> *q;
		long idx;
	public:
		const_iterator(const fifo_queue<T> *q) {
			this->q = q;
			this->idx = q->start;
		}

		const T &operator*() const {
			long real_idx = q->loc_in_queue(idx);
			return q->buf[real_idx];
		}

		const_iterator &operator++() {
			idx++;
			return *this;
		}

		bool operator==(const const_iterator &it) const {
			return this->idx == it.idx;
		}

		bool operator!=(const const_iterator &it) const {
			return this->idx != it.idx;
		}

		const_iterator &operator+=(int num) {
			assert(num >= 0);
			idx += num;
			assert(idx <= q->end);
			return *this;
		}
	};

	fifo_queue(T *entries, int num) {
		size_mask = INT_MAX;
		buf = entries;
		allocated = false;
		start = 0;
		end = num;
		resizable = false;
		node_id = -1;
	}

	// The queue size has to be a power of 2. If `size' isn't, it is rounded
	// up to the next power of 2.
	fifo_queue(int node_id, int size, bool resizable = false) {
		int log_size = (int) ceil(log2(size));
		size = 1 << log_size;
		this->size_mask = size - 1;
		this->node_id = node_id;
		buf = alloc_buf(size);
		allocated = true;
		start = 0;
		end = 0;
		this->resizable = resizable;
	}

	virtual ~fifo_queue() {
		if (allocated)
			free_buf(buf);
	}

	static fifo_queue<T> *create(int node_id, int size,
			bool resizable = false) {
		return new fifo_queue<T>(node_id, size, resizable);
	}

	static void destroy(fifo_queue<T> *q) {
		delete q;
	}

	const_iterator get_begin() const {
		return const_iterator(this);
	}

	const_iterator get_end() const {
		const_iterator it(this);
		assert(end >= start);
		it += (end - start);
		return it;
	}

	bool expand_queue(int new_size);

	/*
	 * Get the first element in the queue.
	 */
	T &front() {
		assert(!is_empty());
		return buf[loc_in_queue(start)];
	}

	/*
	 * Get the last element in the queue.
	 */
	T &back() {
		assert(!is_empty());
		return buf[loc_in_queue(end - 1)];
	}

	virtual T pop_front() {
		assert(start < end);
		T ret = buf[loc_in_queue(start)];
		start++;
		return ret;
	}

	virtual void push_back(T &v) {
		assert(end - start < get_size());
		buf[loc_in_queue(end)] = v;
		end++;
	}

	virtual int fetch(T *entries, int num) {
		int num_fetches = 0;
		while (!fifo_queue<T>::is_empty() && num_fetches < num) {
			entries[num_fetches++] = buf[loc_in_queue(start)];
			start++;
		}
		return num_fetches;
	}

	virtual int add(fifo_queue<T> *queue) {
		int idx = (int) (loc_in_queue(end));
		int length = min(get_size() - idx, get_num_remaining());
		int num_added = 0;
		int num = queue->fetch(buf + idx, length);
		end += num;
		num_added += num;
		// If we fetched fewer entries than we asked for, either the source
		// queue has been drained or this queue is full, so we are done.
		if (num < length || get_num_remaining() == 0)
			return num_added;
		// Otherwise the copy stopped at the end of the circular buffer;
		// continue filling from the beginning of the buffer.
		assert(loc_in_queue(end) == 0);
		length = get_num_remaining();
		num = queue->fetch(buf, length);
		end += num;
		num_added += num;
		return num_added;
	}

	virtual int add(T *entries, int num) {
		int num_pushes = 0;
		while (!fifo_queue<T>::is_full() && num_pushes < num) {
			buf[loc_in_queue(end)] = entries[num_pushes++];
			end++;
		}
		return num_pushes;
	}

	int get_num_remaining() {
		return get_size() - fifo_queue<T>::get_num_entries();
	}

	virtual int get_num_entries() {
		return (int) (end - start);
	}

	int get_size() const {
		assert(allocated);
		return size_mask + 1;
	}

	virtual bool is_full() {
		return end - start >= get_size();
	}

	virtual bool is_empty() {
		return start >= end;
	}
};
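
/*
 * A minimal usage sketch of fifo_queue (illustrative only; it assumes a
 * node_id of -1 is acceptable when no NUMA binding is needed). The requested
 * size of 10 is rounded up to 16 internally.
 *
 *	fifo_queue<int> *q = fifo_queue<int>::create(-1, 10);
 *	for (int i = 0; i < 5; i++) {
 *		int v = i;
 *		q->push_back(v);	// push_back takes a non-const reference.
 *	}
 *	for (fifo_queue<int>::const_iterator it = q->get_begin();
 *			it != q->get_end(); ++it)
 *		printf("%d\n", *it);
 *	while (!q->is_empty())
 *		q->pop_front();
 *	fifo_queue<int>::destroy(q);
 */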

template<class T>
class thread_safe_FIFO_queue;

/*
 * This is a thread-safe FIFO queue.
 * It supports bulk operations.
 */
template<class T>
class thread_safe_FIFO_queue: public fifo_queue<T>
{
	/*
	 * The lock is still needed because we need to check whether the buffer
	 * has entries or has space.
	 */
	pthread_spinlock_t _lock;
	int max_size;
	std::string name;

public:
	thread_safe_FIFO_queue(const std::string &name, int node_id,
			int size): fifo_queue<T>(node_id, size, false) {
		this->name = name;
		this->max_size = size;
		pthread_spin_init(&_lock, PTHREAD_PROCESS_PRIVATE);
	}

	thread_safe_FIFO_queue(const std::string &name, int node_id, int init_size,
			int max_size): fifo_queue<T>(node_id, init_size,
				max_size > init_size) {
		this->name = name;
		this->max_size = max_size;
		pthread_spin_init(&_lock, PTHREAD_PROCESS_PRIVATE);
	}

	virtual ~thread_safe_FIFO_queue() {
		pthread_spin_destroy(&_lock);
	}

	static thread_safe_FIFO_queue<T> *create(const std::string &name,
			int node_id, int size) {
		return new thread_safe_FIFO_queue<T>(name, node_id, size);
	}

	static void destroy(thread_safe_FIFO_queue<T> *q) {
		delete q;
	}

	virtual int fetch(T *entries, int num) {
		pthread_spin_lock(&_lock);
		int ret = fifo_queue<T>::fetch(entries, num);
		pthread_spin_unlock(&_lock);
		return ret;
	}

	virtual int add(T *entries, int num) {
		pthread_spin_lock(&_lock);
		int ret = fifo_queue<T>::add(entries, num);
		int orig_size = fifo_queue<T>::get_size();
		if (ret < num && orig_size < max_size) {
			int new_size = orig_size;
			int min_required_size = orig_size + num - ret;
			while (new_size < min_required_size && new_size < max_size)
				new_size *= 2;
			fifo_queue<T>::expand_queue(new_size);
			int ret2 = fifo_queue<T>::add(entries + ret, num - ret);
			ret += ret2;
			assert(ret == num);
		}
		pthread_spin_unlock(&_lock);
		return ret;
	}

	virtual int add(fifo_queue<T> *queue) {
		pthread_spin_lock(&_lock);
		int ret = fifo_queue<T>::add(queue);
		int orig_size = fifo_queue<T>::get_size();
		if (!queue->is_empty() && orig_size < max_size) {
			int new_size = orig_size;
			int min_required_size = orig_size + queue->get_num_entries();
			while (new_size < min_required_size && new_size < max_size)
				new_size *= 2;
			fifo_queue<T>::expand_queue(new_size);
			int ret2 = fifo_queue<T>::add(queue);
			ret += ret2;
			assert(queue->is_empty());
		}
		pthread_spin_unlock(&_lock);
		return ret;
	}

	/*
	 * It guarantees that all `num' entries are added to the queue.
	 * If there isn't enough space left, it increases the capacity
	 * of the queue.
	 */
	virtual void addByForce(T *entries, int num) {
		BOOST_VERIFY(add(entries, num) == num);
	}

	// TODO I should return a reference.
	T pop_front() {
		T entry;
		BOOST_VERIFY(fetch(&entry, 1) == 1);
		return entry;
	}

	void push_back(T &entry) {
		while (add(&entry, 1) == 0);
	}

	/*
	 * Get the number of entries in the queue.
	 * It locks the buffer, so don't overuse it.
	 */
	int get_num_entries() {
		pthread_spin_lock(&_lock);
		int num_entries = fifo_queue<T>::get_num_entries();
		pthread_spin_unlock(&_lock);
		return num_entries;
	}

	// TODO these are bugs. They should be protected by locks.
	bool is_full() {
		pthread_spin_lock(&_lock);
		bool ret = fifo_queue<T>::is_full();
		pthread_spin_unlock(&_lock);
		return ret;
	}

	bool is_empty() {
		pthread_spin_lock(&_lock);
		bool ret = fifo_queue<T>::is_empty();
		pthread_spin_unlock(&_lock);
		return ret;
	}

	const std::string &get_name() const {
		return name;
	}
};
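
/*
 * A minimal usage sketch of thread_safe_FIFO_queue (illustrative only; the
 * queue name "req_queue" and the node_id of -1 are made up for the example).
 * When the queue is created with a single size and no separate max_size, it
 * cannot grow, so add() may return fewer entries than requested when the
 * queue is full.
 *
 *	thread_safe_FIFO_queue<int> *q
 *		= thread_safe_FIFO_queue<int>::create("req_queue", -1, 16);
 *	int in[4] = {1, 2, 3, 4};
 *	int num_added = q->add(in, 4);		// number of entries actually added
 *	int out[4];
 *	int num_fetched = q->fetch(out, 4);	// number of entries actually fetched
 *	thread_safe_FIFO_queue<int>::destroy(q);
 */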

/*
 * This FIFO queue blocks a thread that tries to add more entries when
 * the queue is full, or tries to fetch more entries when the queue
 * is empty.
 */
template<class T>
class blocking_FIFO_queue: public fifo_queue<T>
{
	pthread_cond_t cond;
	pthread_mutex_t mutex;
	int num_empty;
	int num_full;
	int max_size;
	bool interrupted;

	std::string name;
public:
	blocking_FIFO_queue(int node_id, const std::string name, int init_size,
			int max_size): fifo_queue<T>(node_id, init_size, max_size > init_size) {
		assert(init_size <= max_size);
		pthread_mutex_init(&mutex, NULL);
		pthread_cond_init(&cond, NULL);
		this->name = name;
		num_empty = 0;
		num_full = 0;
		this->max_size = max_size;
		interrupted = false;
	}

	static blocking_FIFO_queue<T> *create(int node_id, const std::string name,
			int init_size, int max_size) {
		return new blocking_FIFO_queue<T>(node_id, name, init_size, max_size);
	}

	static void destroy(blocking_FIFO_queue<T> *q) {
		delete q;
	}

	virtual int fetch(T *entries, int num) {
		return fetch(entries, num, true, false);
	}
	virtual int add(T *entries, int num) {
		fifo_queue<T> tmp(entries, num);
		return add(&tmp);
	}
	virtual int add(fifo_queue<T> *queue) {
		return add_partial(queue, INT_MAX);
	}

	// Add entries until at least `min_added' entries have been added or
	// all entries in `queue' have been added.
	int add_partial(fifo_queue<T> *queue, int min_added = 1);
	int non_blocking_add(fifo_queue<T> *queue) {
		return add_partial(queue, 0);
	}
	int non_blocking_add(T *entries, int num) {
		fifo_queue<T> tmp(entries, num);
		return non_blocking_add(&tmp);
	}
	int non_blocking_fetch(T *entries, int num) {
		return fetch(entries, num, false, false);
	}

	int fetch(T *entries, int num, bool blocking, bool interruptible);
	int add(T *entries, int num, bool blocking, bool interruptible) {
		// TODO
		return -1;
	}

	/*
	 * This method wakes up a thread that is waiting on the queue in an
	 * interruptible fetch.
	 * It can only wake up one thread.
	 */
	void wakeup() {
		pthread_mutex_lock(&mutex);
		// We only try to wake up a thread when the queue is empty, i.e.,
		// when a fetcher may be waiting for entries.
		if (this->is_empty()) {
			interrupted = true;
			pthread_cond_signal(&cond);
		}
		pthread_mutex_unlock(&mutex);
	}

	int get_num_empty() const {
		return num_empty;
	}

	int get_num_full() const {
		return num_full;
	}

	int get_num_entries() {
		pthread_mutex_lock(&mutex);
		int ret = fifo_queue<T>::get_num_entries();
		pthread_mutex_unlock(&mutex);
		return ret;
	}
};
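
/*
 * A minimal usage sketch of blocking_FIFO_queue (illustrative only; the
 * queue name "io_queue" and the sizes are made up). fetch() blocks while the
 * queue is empty; an interruptible fetch can be woken up with wakeup().
 *
 *	blocking_FIFO_queue<int> *q
 *		= blocking_FIFO_queue<int>::create(-1, "io_queue", 8, 64);
 *
 *	// Producer thread: may grow the queue up to its maximum size of 64.
 *	int reqs[2] = {1, 2};
 *	q->add(reqs, 2);
 *
 *	// Consumer thread: blocks until entries arrive or wakeup() is called.
 *	int buf[2];
 *	int n = q->fetch(buf, 2, true, true);
 *
 *	blocking_FIFO_queue<int>::destroy(q);
 */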

/*
 * This is used to allocate an array that usually lives on the stack.
 * Some code occasionally needs a large array, but most of the time it only
 * needs a small one. This wrapper avoids placing a large array on the stack
 * by falling back to the heap, while staying efficient in the common case.
 */
template<class T, int size = 32>
class stack_array
{
	T buf[size];
	T *real_buf;
	int capacity;
public:
	stack_array(int capacity) {
		if (capacity <= size) {
			this->capacity = size;
			real_buf = buf;
		}
		else {
			this->capacity = capacity;
			real_buf = new T[capacity];
		}
	}

	~stack_array() {
		if (real_buf != buf)
			delete [] real_buf;
	}

	T &operator[](int idx) {
		assert(idx < capacity);
		return real_buf[idx];
	}

	T *data() {
		return real_buf;
	}

	const T *data() const {
		return real_buf;
	}
};
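
/*
 * A minimal usage sketch of stack_array (illustrative only; `fill' is a
 * hypothetical helper). Small requests use the embedded buffer; anything
 * larger than the template size falls back to a heap allocation that is
 * freed in the destructor.
 *
 *	void fill(int num) {
 *		stack_array<int, 32> arr(num);
 *		for (int i = 0; i < num; i++)
 *			arr[i] = i;
 *		// arr.data() can be passed to functions expecting a plain array.
 *	}
 */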

template<class T, int size = 32>
class embedded_array
{
	T buf[size];
	T *real_buf;
	int capacity;

	// Move the contents of `arr' into this array and reset `arr' to its
	// embedded buffer.
	void assign(embedded_array<T, size> &arr) {
		this->clear();

		if (arr.real_buf == arr.buf) {
			for (int i = 0; i < capacity; i++) {
				this->buf[i] = arr.buf[i];
				arr.buf[i] = T();
			}
			this->real_buf = this->buf;
			this->capacity = size;
		}
		else {
			this->real_buf = arr.real_buf;
			this->capacity = arr.capacity;
			arr.real_buf = arr.buf;
			arr.capacity = size;
		}
	}
public:
	embedded_array() {
		real_buf = buf;
		capacity = size;
	}

	embedded_array(embedded_array<T, size> &arr) {
		// Start from a consistent state before assign() calls clear().
		real_buf = buf;
		capacity = size;
		assign(arr);
	}

	embedded_array &operator=(embedded_array<T, size> &arr) {
		assign(arr);
		return *this;
	}

	~embedded_array() {
		if (real_buf != buf)
			delete [] real_buf;
	}

	T &operator[](int idx) {
		assert(idx < capacity);
		return real_buf[idx];
	}

	const T &operator[](int idx) const {
		assert(idx < capacity);
		return real_buf[idx];
	}

	// This can only increase the capacity of the array.
	void resize(int new_size) {
		if (new_size <= capacity)
			return;

		if (real_buf == buf) {
			real_buf = new T[new_size];
			memcpy(real_buf, buf, sizeof(buf));
		}
		else {
			T *tmp = new T[new_size];
			memcpy(tmp, real_buf, capacity * sizeof(T));
			delete [] real_buf;
			real_buf = tmp;
		}
		capacity = new_size;
	}

	void clear() {
		if (this->real_buf != this->buf) {
			delete [] this->real_buf;
			this->real_buf = this->buf;
			this->capacity = size;
		}
	}

	int get_capacity() const {
		return capacity;
	}

	T *data() {
		return real_buf;
	}

	const T *data() const {
		return real_buf;
	}
};
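
/*
 * A minimal usage sketch of embedded_array (illustrative only). The array
 * starts with `size' embedded elements and grows onto the heap on demand;
 * clear() releases the heap buffer and returns to the embedded storage.
 *
 *	embedded_array<char> arr;	// 32 embedded elements by default
 *	arr.resize(100);		// grows onto the heap, keeping old contents
 *	memset(arr.data(), 0, arr.get_capacity());
 *	arr.clear();			// back to the 32 embedded elements
 */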

template<class T>
bool fifo_queue<T>::expand_queue(int new_size)
{
	int log_size = (int) ceil(log2(new_size));
	new_size = 1 << log_size;
	assert(resizable && get_size() < new_size);
	assert(allocated);

	// Allocate new memory for the array and initialize it.
	T *tmp = alloc_buf(new_size);

	// Copy the old array to the new one.
	int num = fifo_queue<T>::get_num_entries();
	for (int i = 0; i < num; i++) {
		tmp[i] = buf[loc_in_queue(start + i)];
	}

	// Destroy the old array.
	free_buf(buf);

	buf = tmp;
	size_mask = new_size - 1;
	start = 0;
	end = num;
	return true;
}
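
/*
 * Note on the index arithmetic: the buffer size is always a power of 2, so
 * loc_in_queue() can map the monotonically growing `start' and `end' indices
 * to physical slots with a bit mask instead of a modulo. For example, with a
 * size of 8 (size_mask == 7), the logical index 13 maps to slot 13 & 7 == 5.
 * After expand_queue(), the entries are compacted to the front of the new
 * buffer and `start' is reset to 0.
 */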

template<class T>
int blocking_FIFO_queue<T>::add_partial(fifo_queue<T> *queue, int min_added)
{
	int num_added = 0;
	while (!queue->is_empty()) {
		int num = queue->get_num_entries();
		pthread_mutex_lock(&mutex);
		bool empty = this->is_empty();
		if (this->get_size() - fifo_queue<T>::get_num_entries() < num
				&& this->get_size() < max_size) {
			int new_size = this->get_size() * 2;
			new_size = max(new_size, fifo_queue<T>::get_num_entries() + num);
			new_size = min(new_size, max_size);
#ifdef DEBUG
			printf("try to expand queue %s to %d\n", name.c_str(), new_size);
#endif
			BOOST_VERIFY(fifo_queue<T>::expand_queue(new_size));
		}
		int ret = fifo_queue<T>::add(queue);
		num_added += ret;
		/* If the queue was empty, signal the threads waiting to fetch entries. */
		if (empty)
			pthread_cond_broadcast(&cond);

		/* We only block the thread if it hasn't added enough entries yet. */
		if (num_added < min_added) {
			while (this->is_full() && !queue->is_empty()) {
				num_full++;
				pthread_cond_wait(&cond, &mutex);
			}
			pthread_mutex_unlock(&mutex);
		}
		else {
			pthread_mutex_unlock(&mutex);
			break;
		}
	}
	return num_added;
}

template<class T>
int blocking_FIFO_queue<T>::fetch(T *entries, int num, bool blocking,
		bool interruptible)
{
	/* We may have to wait for entries to arrive. */
	pthread_mutex_lock(&mutex);
	if (blocking) {
		while (this->is_empty()) {
			num_empty++;
			if (interruptible && interrupted) {
				// We need to reset the interrupt signal.
				interrupted = false;
				break;
			}
			pthread_cond_wait(&cond, &mutex);
		}
	}
	bool full = this->is_full();
	int ret = fifo_queue<T>::fetch(entries, num);
	pthread_mutex_unlock(&mutex);

	/* If the queue was full, wake up the threads waiting to add more entries. */
	if (full)
		pthread_cond_broadcast(&cond);

	return ret;
}

#endif