FlashGraph-ng
A new frontier in large-scale graph analysis and data mining
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Pages
io_interface.h
1 #ifndef __IO_INTERFACE_H__
2 #define __IO_INTERFACE_H__
3 
4 /*
5  * Copyright 2014 Open Connectome Project (http://openconnecto.me)
6  * Written by Da Zheng (zhengda1936@gmail.com)
7  *
8  * This file is part of SAFSlib.
9  *
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  * http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  */
22 
23 #include <stdlib.h>
24 
25 #include <vector>
26 #include <memory>
27 
28 #include "config_map.h"
29 #include "exception.h"
30 #include "common.h"
31 #include "concurrency.h"
32 #include "thread.h"
33 #include "io_request.h"
34 #include "comm_exception.h"
35 #include "safs_header.h"
36 
37 namespace safs
38 {
39 
40 class io_request;
41 
45 class callback
46 {
47 public:
48  typedef std::shared_ptr<callback> ptr;
49 
50  virtual ~callback() {
51  }
52 
58  virtual int invoke(io_request *reqs[], int num) = 0;
59 };
60 
65 class io_status
66 {
67  long status: 8;
68  long priv_data: 56;
69 public:
70  io_status() {
71  status = 0;
72  priv_data = 0;
73  }
74 
75  io_status(int status) {
76  this->status = status;
77  priv_data = 0;
78  }
79 
80  void set_priv_data(long data) {
81  priv_data = data;
82  }
83 
84  long get_priv_data() const {
85  return priv_data;
86  }
87 
88  io_status &operator=(int status) {
89  this->status = status;
90  return *this;
91  }
92 
93  bool operator==(int status) {
94  return this->status == status;
95  }
96 };
97 
98 enum
99 {
100  IO_OK,
101  IO_PENDING = -1,
102  IO_FAIL = -2,
103  IO_UNSUPPORTED = -3,
104 };
105 
109 enum {
110  /*
111  * The three methods below are used internally or for testing only.
112  */
113  READ_ACCESS,
114  DIRECT_ACCESS,
115  AIO_ACCESS,
116 
122 
129 
136 
142 };
143 
144 class file_io_factory;
145 class io_select;
146 
155 {
156  safs_header header;
157  thread *curr; // the thread where the IO instance runs.
158 
159  // This is an index for locating this IO object in a global table.
160  int io_idx;
161  int max_num_pending_ios;
162  static atomic_integer io_counter;
163  // Keep the I/O factory alive.
164  std::shared_ptr<file_io_factory> io_factory;
165 
166 protected:
167  io_interface(thread *t, const safs_header &header) {
168  this->header = header;
169  this->curr = t;
170  this->io_idx = io_counter.inc(1) - 1;
171  max_num_pending_ios = params.get_max_num_pending_ios();
172  }
173 
174 public:
175  typedef std::shared_ptr<io_interface> ptr;
176 
177  virtual ~io_interface();
178 
179  const safs_header &get_header() const {
180  return header;
181  }
182 
183  int get_block_size() const {
184  if (header.is_valid())
185  return header.get_block_size();
186  else
187  return params.get_RAID_block_size();
188  }
189 
190  /*
191  * This stores the I/O factory that creates the I/O instance.
192  * The main purpose is to keep the I/O factory alive.
193  */
194  void set_owner(std::shared_ptr<file_io_factory> io_factory) {
195  this->io_factory = io_factory;
196  }
197 
202  thread *get_thread() const {
203  assert(curr);
204  return curr;
205  }
206 
211  int get_node_id() const {
212  assert(curr);
213  return curr->get_node_id();
214  }
215 
220  int get_io_id() const {
221  return io_idx;
222  }
223 
231  }
232 
238  virtual int get_file_id() const = 0;
239 
243  virtual void cleanup() {
244  }
245 
246  /*
247  * This method prints the current state of the IO instance.
248  * For the sake of performance, the method may not be thread-safe.
249  * It should be used with caution.
250  */
251  virtual void print_state() {
252  }
253 
254  virtual io_interface *clone(thread *t) const {
255  return NULL;
256  }
257 
262  virtual bool support_aio() {
263  return false;
264  }
265 
276  virtual void access(io_request *requests, int num, io_status *status = NULL) {
277  throw unsupported_exception();
278  }
285  virtual void flush_requests() {
286  throw unsupported_exception();
287  }
293  virtual int wait4complete(int num) {
294  throw unsupported_exception();
295  }
300  virtual int num_pending_ios() const {
301  throw unsupported_exception();
302  }
308  virtual int get_max_num_pending_ios() const {
309  return max_num_pending_ios;
310  }
316  virtual void set_max_num_pending_ios(int max) {
317  this->max_num_pending_ios = max;
318  }
319 
328  virtual void notify_completion(io_request *reqs[], int num) {
329  if (have_callback())
330  get_callback().invoke(reqs, num);
331  }
332 
339  virtual bool set_callback(callback::ptr cb) {
340  throw unsupported_exception();
341  }
342 
343  virtual bool have_callback() const {
344  return false;
345  }
346 
351  virtual callback &get_callback() {
352  throw unsupported_exception();
353  }
354 
364  virtual io_status access(char *buf, off_t off, ssize_t size,
365  int access_method) {
366  return IO_UNSUPPORTED;
367  }
368 
369  virtual std::shared_ptr<io_select> create_io_select() const {
370  return std::shared_ptr<io_select>();
371  }
372 };
373 
382 {
383 public:
384  typedef std::shared_ptr<io_select> ptr;
385 
391  virtual bool add_io(io_interface::ptr io) = 0;
396  virtual int num_pending_ios() const = 0;
397  virtual int wait4complete(int num_to_complete) = 0;
398 };
399 
405 io_interface::ptr create_io(std::shared_ptr<file_io_factory> factory, thread *t);
406 
407 class comp_io_scheduler;
408 
413 {
414 public:
415  typedef std::shared_ptr<comp_io_sched_creator> ptr;
421  virtual std::shared_ptr<comp_io_scheduler> create(int node_id) const = 0;
422 };
423 
429 {
430  safs_header header;
431  comp_io_sched_creator::ptr creator;
432  // The name of the file.
433  const std::string name;
434 
435  /*
436  * This method creates an I/O instance for the specified thread.
437  * \param t the thread where the I/O instance can be used.
438  * \return the I/O instance.
439  */
440  virtual io_interface::ptr create_io(thread *t) = 0;
441 
442  /*
443  * This method is to notify the I/O factory that an I/O instance is destroyed.
444  * \param io the I/O instance to be destroyed.
445  */
446  virtual void destroy_io(io_interface &io) = 0;
447 public:
448  typedef std::shared_ptr<file_io_factory> shared_ptr;
449 
450  file_io_factory(const std::string _name);
451 
452  virtual ~file_io_factory() {
453  }
454 
455  const safs_header &get_header() const {
456  return header;
457  }
458 
465  void set_sched_creator(comp_io_sched_creator::ptr creator) {
466  this->creator = creator;
467  }
468 
473  comp_io_sched_creator::ptr get_sched_creator() const {
474  return creator;
475  }
476 
482  const std::string &get_name() const {
483  return name;
484  }
485 
490  virtual int get_file_id() const = 0;
491 
492  virtual void print_state() {
493  }
494 
495  virtual void collect_stat(io_interface &) {
496  }
497 
498  virtual void print_statistics() const {
499  }
500 
505  ssize_t get_file_size() const;
506 
507  friend io_interface::ptr create_io(file_io_factory::shared_ptr factory, thread *t);
508  friend class io_interface;
509 };
510 
511 class cache_config;
512 class RAID_config;
513 
514 io_select::ptr create_io_select(const std::vector<io_interface::ptr> &ios);
515 
523 file_io_factory::shared_ptr create_io_factory(const std::string &file_name,
524  const int access_option);
525 
535 void init_io_system(config_map::ptr map, bool with_cache = true);
536 
540 void destroy_io_system();
541 
545 bool is_safs_init();
546 
550 const RAID_config &get_sys_RAID_conf();
551 
555 const std::vector<int> &get_io_cpus();
556 
561 void print_io_thread_stat();
562 
566 void print_io_summary();
567 
577 void set_file_weight(const std::string &file_name, int weight);
578 
579 }
580 
581 #endif
virtual int get_max_num_pending_ios() const
Definition: io_interface.h:308
Definition: io_interface.h:428
Definition: io_interface.h:65
virtual int wait4complete(int num)
Definition: io_interface.h:293
Definition: io_request.h:491
void init_io_system(config_map::ptr map, bool with_cache=true)
Definition: io_interface.h:141
void destroy_io_system()
virtual void access(io_request *requests, int num, io_status *status=NULL)
Definition: io_interface.h:276
Definition: io_interface.h:412
virtual bool set_callback(callback::ptr cb)
Definition: io_interface.h:339
Definition: io_interface.h:128
Definition: io_interface.h:381
void print_io_summary()
virtual void flush_requests()
Definition: io_interface.h:285
virtual int get_file_id() const =0
Definition: io_interface.h:45
void set_sched_creator(comp_io_sched_creator::ptr creator)
Definition: io_interface.h:465
file_io_factory::shared_ptr create_io_factory(const std::string &file_name, const int access_option)
int get_node_id() const
Definition: io_interface.h:211
const std::string & get_name() const
Definition: io_interface.h:482
const std::vector< int > & get_io_cpus()
Definition: io_interface.h:121
virtual bool support_aio()
Definition: io_interface.h:262
Definition: io_interface.h:135
Definition: io_interface.h:154
Definition: comp_io_scheduler.h:37
virtual void set_max_num_pending_ios(int max)
Definition: io_interface.h:316
int get_remaining_io_slots() const
Definition: io_interface.h:229
virtual int get_file_id() const =0
int get_io_id() const
Definition: io_interface.h:220
const RAID_config & get_sys_RAID_conf()
thread * get_thread() const
Definition: io_interface.h:202
virtual std::shared_ptr< comp_io_scheduler > create(int node_id) const =0
virtual io_status access(char *buf, off_t off, ssize_t size, int access_method)
Definition: io_interface.h:364
virtual void cleanup()
Definition: io_interface.h:243
virtual int num_pending_ios() const =0
bool is_safs_init()
comp_io_sched_creator::ptr get_sched_creator() const
Definition: io_interface.h:473
virtual bool add_io(io_interface::ptr io)=0
virtual callback & get_callback()
Definition: io_interface.h:351
void set_file_weight(const std::string &file_name, int weight)
ssize_t get_file_size() const
virtual int invoke(io_request *reqs[], int num)=0
virtual int num_pending_ios() const
Definition: io_interface.h:300
io_interface::ptr create_io(std::shared_ptr< file_io_factory > factory, thread *t)