FlashGraph-ng
A new frontier in large-scale graph analysis and data mining
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Pages
in_mem_io.h
1 #ifndef __IN_MEM_IO_H__
2 #define __IN_MEM_IO_H__
3 
4 /*
5  * Copyright 2014 Open Connectome Project (http://openconnecto.me)
6  * Written by Da Zheng (zhengda1936@gmail.com)
7  *
8  * This file is part of SAFSlib.
9  *
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  * http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  */
22 
23 #include "NUMA_mapper.h"
24 
25 #include "io_interface.h"
26 #include "comp_io_scheduler.h"
27 #include "cache.h"
28 
29 namespace safs
30 {
31 
32 class NUMA_buffer
33 {
34  std::vector<std::shared_ptr<char> > bufs;
35  // This has the length of individual buffers.
36  // The sum of the physical buffer lengths >= the total length;
37  std::vector<size_t> buf_lens;
38  // This is the total length of the buffer.
39  size_t length;
40  NUMA_mapper mapper;
41 
42  struct data_loc_info {
43  int node_id;
44  off_t local_off;
45  size_t local_size;
46  data_loc_info(int node_id, off_t local_off, size_t local_size) {
47  this->node_id = node_id;
48  this->local_off = local_off;
49  this->local_size = local_size;
50  }
51  };
52  data_loc_info get_data_loc(off_t off, size_t size) const;
53 
54  NUMA_buffer(std::shared_ptr<char>, size_t length, const NUMA_mapper &mapper);
55  NUMA_buffer(size_t length, const NUMA_mapper &mapper);
56 public:
57  typedef std::pair<const char *, size_t> cdata_info;
58  typedef std::pair<char *, size_t> data_info;
59  typedef std::shared_ptr<NUMA_buffer> ptr;
60 
61  /*
62  * Load data in a file to the buffer.
63  */
64  static ptr load(const std::string &file, const NUMA_mapper &mapper);
65  static ptr load_safs(const std::string &file, const NUMA_mapper &mapper);
66 
67  static ptr create(std::shared_ptr<char>, size_t length,
68  const NUMA_mapper &mapper);
69  static ptr create(size_t length, const NUMA_mapper &mapper) {
70  return ptr(new NUMA_buffer(length, mapper));
71  }
72 
73  size_t get_length() const {
74  return length;
75  }
76 
77  /*
78  * Get the data in the specified location.
79  * Since the data in the buffer isn't stored contiguously, the size of
80  * the returned data may be smaller than the specified size.
81  */
82  cdata_info get_data(off_t off, size_t size) const;
83  data_info get_data(off_t off, size_t size);
84 
85  /*
86  * Write the data in the given buffer to the specified location.
87  */
88  void copy_from(const char *buf, size_t size, off_t off);
89  /*
90  * Copy the data in the specified location to the given buffer.
91  */
92  void copy_to(char *buf, size_t size, off_t off) const;
93 
94  void dump(const std::string &file);
95 };
96 
97 /*
98  * This class provides a single I/O interface for accessing data in memory.
99  * The main reason of having this class is to allow us to reuse a lot of code
100  * when data is stored in memory.
101  */
102 class in_mem_io: public io_interface
103 {
104  NUMA_buffer::ptr data;
105  int file_id;
106  fifo_queue<io_request> req_buf;
107  comp_io_scheduler::ptr comp_io_sched;
108  std::unique_ptr<byte_array_allocator> array_allocator;
109 
110  callback::ptr cb;
111 
112  void process_req(const io_request &req);
113  void process_computes();
114 public:
115  in_mem_io(NUMA_buffer::ptr data, int file_id, thread *t);
116 
117  virtual int get_file_id() const {
118  return file_id;
119  }
120 
121  virtual bool support_aio() {
122  return true;
123  }
124 
125  virtual bool set_callback(callback::ptr cb) {
126  this->cb = cb;
127  return true;
128  }
129 
130  virtual bool have_callback() const {
131  return cb != NULL;
132  }
133 
134  virtual callback &get_callback() {
135  return *cb;
136  }
137 
138  virtual void flush_requests() { }
139 
140  virtual int num_pending_ios() const {
141  return 0;
142  }
143 
144  virtual io_status access(char *buf, off_t off, ssize_t size,
145  int access_method);
146  virtual void access(io_request *requests, int num, io_status *status);
147  virtual int wait4complete(int) {
148  return 0;
149  }
150  virtual std::shared_ptr<io_select> create_io_select() const;
151 };
152 
153 class in_mem_io_factory: public file_io_factory
154 {
155  // The I/O interface doesn't own the byte array.
156  NUMA_buffer::ptr data;
157  int file_id;
158 public:
159  in_mem_io_factory(NUMA_buffer::ptr data, int file_id,
160  const std::string file_name): file_io_factory(file_name) {
161  this->data = data;
162  this->file_id = file_id;
163  }
164 
165  virtual int get_file_id() const {
166  return file_id;
167  }
168 
169  virtual io_interface::ptr create_io(thread *t) {
170  return io_interface::ptr(new in_mem_io(data, file_id, t));
171  }
172 
173  virtual void destroy_io(io_interface &) {
174  }
175 };
176 
177 }
178 
179 #endif
io_interface::ptr create_io(std::shared_ptr< file_io_factory > factory, thread *t)