FlashGraph-ng
A new frontier in large-scale graph analysis and data mining
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Pages
remote_access.h
1 #ifndef __REMOTE_ACCESS_H__
2 #define __REMOTE_ACCESS_H__
3 
4 /*
5  * Copyright 2014 Open Connectome Project (http://openconnecto.me)
6  * Written by Da Zheng (zhengda1936@gmail.com)
7  *
8  * This file is part of SAFSlib.
9  *
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  * http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  */
22 
23 #include "slab_allocator.h"
24 #include "io_interface.h"
25 #include "container.h"
26 
27 namespace safs
28 {
29 
30 class request_sender;
31 class disk_io_thread;
32 class file_mapper;
33 
34 /*
35  * This class is to help the local thread send IO requests to remote threads
36  * dedicated to accessing SSDs. Each SSD has such a thread.
37  * However, the helper class isn't thread safe, so each local thread has to
38  * reference its own helper object.
39  */
40 class remote_io: public io_interface
41 {
42  static atomic_integer num_ios;
43  const int max_disk_cached_reqs;
44  // They work as buffers for requests and are only used to
45  // send high-priority requests.
46  std::vector<request_sender *> senders;
47  // They are used to send low-priority requests.
48  std::vector<request_sender *> low_prio_senders;
49  std::vector<std::shared_ptr<disk_io_thread> > io_threads;
50  callback::ptr cb;
51  file_mapper *block_mapper;
52  thread_safe_FIFO_queue<io_request> complete_queue;
53  slab_allocator &msg_allocator;
54 
55  atomic_integer num_completed_reqs;
56  atomic_integer num_issued_reqs;
57 public:
58  typedef std::shared_ptr<remote_io> ptr;
59 
60  remote_io(const std::vector<std::shared_ptr<disk_io_thread> > &remotes,
61  slab_allocator &msg_allocator, file_mapper *mapper, thread *t,
62  const safs_header &header, int max_reqs = MAX_DISK_CACHED_REQS);
63 
64  ~remote_io();
65 
66  virtual int process_completed_requests(io_request reqs[], int num);
67  int process_completed_requests(int num);
68  int process_all_completed_requests();
69 
70  virtual int thread_init() {
71  return 0;
72  }
73 
74  virtual bool support_aio() {
75  return true;
76  }
77 
78  virtual void cleanup();
79 
80  virtual bool set_callback(callback::ptr cb) {
81  this->cb = cb;
82  return true;
83  }
84 
85  virtual bool have_callback() const {
86  return cb != NULL;
87  }
88 
89  virtual callback &get_callback() {
90  return *cb;
91  }
92 
93  virtual int get_file_id() const;
94 
95  virtual void access(io_request *requests, int num,
96  io_status *status = NULL);
97  virtual void notify_completion(io_request *reqs[], int num);
98  virtual int wait4complete(int num_to_complete);
99  virtual int num_pending_ios() const {
100  return num_issued_reqs.get() - num_completed_reqs.get();
101  }
102  virtual io_interface *clone(thread *t) const;
103  void flush_requests(int max_cached);
104  virtual void flush_requests();
105  virtual void print_state();
106 
107  size_t get_num_reqs() const {
108  return num_issued_reqs.get();
109  }
110 
111  virtual io_select::ptr create_io_select() const;
112 };
113 
114 }
115 
116 #endif
virtual io_status access(char *buf, off_t off, ssize_t size, int access_method)
Definition: io_interface.h:364