FlashGraph-ng
A new frontier in large-scale graph analysis and data mining
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Pages
EM_object.h
1 #ifndef __EM_OBJECT_H__
2 #define __EM_OBJECT_H__
3 
4 /*
5  * Copyright 2014 Open Connectome Project (http://openconnecto.me)
6  * Written by Da Zheng (zhengda1936@gmail.com)
7  *
8  * This file is part of FlashMatrix.
9  *
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  * http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  */
22 #include <unordered_map>
23 
24 #include "safs_file.h"
25 #include "io_interface.h"
26 #include "local_vec_store.h"
27 #include "mem_worker_thread.h"
28 
29 namespace fm
30 {
31 
32 namespace detail
33 {
34 
35 class EM_object
36 {
37 public:
38  class file_holder {
39  bool persistent;
40  std::string file_name;
41  file_holder(const std::string &name, bool persistent) {
42  this->file_name = name;
43  this->persistent = persistent;
44  }
45  public:
46  typedef std::shared_ptr<file_holder> ptr;
47  static ptr create_temp(const std::string &name, size_t num_bytes,
48  safs::safs_file_group::ptr group);
49  static ptr create(const std::string &name);
50 
51  ~file_holder();
52  std::string get_name() const {
53  return file_name;
54  }
55  bool set_persistent(const std::string &new_name);
56  void unset_persistent();
57  bool is_persistent() const {
58  return persistent;
59  }
60  };
61 
62  class io_set {
63  safs::file_io_factory::shared_ptr factory;
64  // This keeps an I/O instance for each thread.
65  std::unordered_map<thread *, safs::io_interface::ptr> thread_ios;
66  pthread_key_t io_key;
67  pthread_spinlock_t io_lock;
68  public:
69  typedef std::shared_ptr<io_set> ptr;
70  io_set(safs::file_io_factory::shared_ptr factory);
71  ~io_set();
72 
73  safs::io_interface::ptr create_io();
74  // This returns the I/O instance for the curr thread.
75  safs::io_interface &get_curr_io() const;
76  // Test if the current thread has an I/O instance for the vector.
77  bool has_io() const;
78  };
79 
80  typedef std::shared_ptr<EM_object> ptr;
81  /*
82  * This creates an I/O instance for the current thread.
83  */
84  virtual std::vector<safs::io_interface::ptr> create_ios() const = 0;
85 };
86 
/*
 * Aligns `val`, a count of elements of `ele_size` each, with ROUND after
 * converting `alignment` into a number of elements. `alignment` must be a
 * non-negative multiple of `ele_size` (presumably both are in bytes).
 * NOTE(review): ROUND is defined elsewhere; presumably it rounds down.
 */
template<class T>
T round_ele(T val, size_t alignment, size_t ele_size)
{
	// The modulo and division below are undefined for a zero element size.
	assert(ele_size > 0);
	assert(alignment % ele_size == 0);
	alignment = alignment / ele_size;
	return ROUND(val, alignment);
}
94 
/*
 * Aligns `val`, a count of elements of `ele_size` each, with ROUNDUP after
 * converting `alignment` into a number of elements. `alignment` must be a
 * non-negative multiple of `ele_size` (presumably both are in bytes).
 * NOTE(review): ROUNDUP is defined elsewhere; presumably it rounds up.
 */
template<class T>
T roundup_ele(T val, size_t alignment, size_t ele_size)
{
	// The modulo and division below are undefined for a zero element size.
	assert(ele_size > 0);
	assert(alignment % ele_size == 0);
	alignment = alignment / ele_size;
	return ROUNDUP(val, alignment);
}
102 
103 /*
104  * This runs on the portion of the data in a data container when the portion
105  * of data is available in memory.
106  */
class portion_compute
{
public:
	using ptr = std::shared_ptr<portion_compute>;

	// Polymorphic interface: destruction through `ptr` reaches subclasses.
	virtual ~portion_compute() = default;

	/*
	 * Invoked with a buffer `buf` holding `size` units of the portion's data.
	 * NOTE(review): the unit of `size` is decided by the caller -- confirm.
	 */
	virtual void run(char *buf, size_t size) = 0;
};
117 
118 class portion_callback: public safs::callback
119 {
120  std::unordered_map<long, std::vector<portion_compute::ptr> > computes;
121 public:
122  typedef std::shared_ptr<portion_callback> ptr;
123 
124  static long get_portion_key(const safs::io_request &req) {
125  return (long) req.get_buf();
126  }
127 
128  virtual ~portion_callback() {
129  assert(computes.empty());
130  }
131 
132  bool has_callback() const {
133  return !computes.empty();
134  }
135 
136  bool has_callback(const safs::io_request &req) const {
137  auto it = computes.find(get_portion_key(req));
138  return it != computes.end();
139  }
140  void add(const safs::io_request &req, portion_compute::ptr compute) {
141  add(get_portion_key(req), compute);
142  }
143  void add(long key, portion_compute::ptr compute);
144  virtual int invoke(safs::io_request *reqs[], int num);
145 };
146 
147 class sync_read_compute: public portion_compute
148 {
149  bool &ready;
150 public:
151  sync_read_compute(bool &_ready): ready(_ready) {
152  }
153  virtual void run(char *buf, size_t size) {
154  ready = true;
155  }
156 };
157 
158 /*
159  * This task dispatcher enables 1D partitioning on the object.
160  */
161 class EM_portion_dispatcher: public task_dispatcher
162 {
163  size_t tot_len;
164  off_t portion_idx;
165  pthread_spinlock_t lock;
166  size_t portion_size;
167 public:
168  EM_portion_dispatcher(size_t tot_len, size_t portion_size) {
169  pthread_spin_init(&lock, PTHREAD_PROCESS_PRIVATE);
170  portion_idx = 0;
171  this->tot_len = tot_len;
172  this->portion_size = portion_size;
173  }
174 
175  size_t get_portion_size() const {
176  return portion_size;
177  }
178 
179  virtual bool issue_task() {
180  pthread_spin_lock(&lock);
181  off_t global_start = portion_idx * portion_size;
182  if ((size_t) global_start >= tot_len) {
183  pthread_spin_unlock(&lock);
184  return false;
185  }
186  size_t length = std::min(portion_size, tot_len - global_start);
187  portion_idx++;
188  pthread_spin_unlock(&lock);
189  create_task(global_start, length);
190  return true;
191  }
192 
193  virtual void create_task(off_t global_start, size_t length) = 0;
194 };
195 
196 }
197 
198 }
199 
200 #endif
Definition: io_request.h:491
Definition: io_interface.h:45
Definition: io_interface.h:154
io_interface::ptr create_io(std::shared_ptr< file_io_factory > factory, thread *t)