FlashGraph-ng
A new frontier in large-scale graph analysis and data mining
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Pages
local_mem_buffer.h
1 #ifndef __LOCAL_MEM_BUFFER_H__
2 #define __LOCAL_MEM_BUFFER_H__
3 
4 /*
5  * Copyright 2014 Open Connectome Project (http://openconnecto.me)
6  * Written by Da Zheng (zhengda1936@gmail.com)
7  *
8  * This file is part of FlashMatrix.
9  *
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  * http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  */
22 
23 #include <assert.h>
24 
25 #include <atomic>
26 #include <memory>
27 #include <deque>
28 #include <vector>
29 #include <unordered_map>
30 
31 #include "concurrency.h"
32 
33 namespace fm
34 {
35 
36 namespace detail
37 {
38 
39 class local_matrix_store;
40 
41 /*
42  * This class keeps memory buffers in the local thread.
43  *
44  * If a piece of memory of the same size is being used again in the near
45  * future, we can buffer the piece of memory and reuse it, so that we can
46  * avoid allocating a large piece of memory from malloc.
47  * It turns out to be fairly expensive to allocate a large piece of memory
48  * with malloc in multiple threads. It can cause lock contention.
49  *
50  * If a portion of data in a matrix store will be used again (which is very
51  * likely to happen in a chain of matrix computation), we should buffer it.
52  *
53  * The buffered data is guaranteed to be used in the same thread, so locking
54  * isn't needed.
55  */
56 class local_mem_buffer
57 {
58 public:
59  typedef std::pair<size_t, std::shared_ptr<char> > irreg_buf_t;
60  enum buff_type {
61  REG_BUF,
62  IRREG_BUF,
63  MAT_PORTION,
64  ALL,
65  };
66 private:
67  // The lock is to protect `mem_set'.
68  static spin_lock mem_lock;
69  // This contains all local mem buffers in the system.
70  static std::vector<local_mem_buffer *> mem_set;
71  static std::atomic<bool> initialized;
72  static pthread_key_t mem_key;
73 
74  size_t num_allocs;
75  size_t num_frees;
76  /*
77  * This buffers the memory allocated for part of a vector or a matrix.
78  * The key is the size of a buffer. It works like a slab allocator.
79  */
80  std::unordered_map<size_t, std::deque<char *> > bufs;
81 
82  /*
83  * This buffers a portion of a dense matrix.
84  * The portion of the data is valid. It may be read from the SSDs
85  * or materialized in a virtual matrix.
86  * The key is the data Id of a dense matrix.
87  */
88  std::unordered_map<long, std::shared_ptr<const local_matrix_store> > portions;
89 
90  std::deque<irreg_buf_t> irreg_bufs;
91 
92  local_mem_buffer() {
93  num_allocs = 0;
94  num_frees = 0;
95  }
96  std::shared_ptr<char> _alloc(size_t num_bytes);
97 
98  void _cache_portion(long key,
99  std::shared_ptr<const local_matrix_store> portion);
100  std::shared_ptr<const local_matrix_store> _get_mat_portion(long key);
101 
102  void clear_local_bufs(buff_type type);
103 public:
104  /*
105  * We initialize the memory buffers when the system starts to run and
106  * destroy them after system stops.
107  */
108  static bool init();
109  static void destroy();
110  /*
111  * This function clears per-thread memory buffers.
112  * It is called after each computation on data containers.
113  * We should delete the local buffer as much as possible to reduce
114  * memory consumption.
115  */
116  static void clear_bufs(buff_type type = buff_type::ALL);
117 
118  /*
119  * We cache matrix portions for EM matrix store and mapply virtual matrix
120  * store, so that we can reduce amount of data read from disks and
121  * save some computation.
122  */
123  static void cache_portion(long key,
124  std::shared_ptr<const local_matrix_store> portion);
125  static std::shared_ptr<const local_matrix_store> get_mat_portion(long key);
126 
127  /*
128  * This function allocates memory from the memory buffer in the local thread.
129  */
130  static std::shared_ptr<char> alloc(size_t num_bytes);
131 
132  /*
133  * Cache a memory buffer of irregular size in the local thread.
134  */
135  static void cache_irreg(irreg_buf_t buf);
136  /*
137  * Get a piece of memory of irregular size buffered in the local thread.
138  */
139  static irreg_buf_t get_irreg();
140 
141  ~local_mem_buffer();
142 };
143 
144 }
145 
146 }
147 
148 #endif