FlashGraph-ng
A new frontier in large-scale graph analysis and data mining
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Pages
comp_io_scheduler.h
1 #ifndef __COMP_IO_SCHEDULER_H__
2 #define __COMP_IO_SCHEDULER_H__
3 
4 /*
5  * Copyright 2014 Open Connectome Project (http://openconnecto.me)
6  * Written by Da Zheng (zhengda1936@gmail.com)
7  *
8  * This file is part of SAFSlib.
9  *
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  * http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  */
22 
23 #include <memory>
24 
25 #include "container.h"
26 #include "io_request.h"
27 
28 namespace safs
29 {
30 
38 {
39  // If user computation has generated too many requests, we may not
40  // complete a user computation (we can't fetch all requests generated
41  // by it). We have to keep the incomplete computation here, and we
42  // will try to fetch more requests from it later.
43  fifo_queue<user_compute *> incomplete_computes;
44  // The I/O instance where the I/O scheduler works on.
45  io_interface *io;
46 
47 public:
48  typedef std::shared_ptr<comp_io_scheduler> ptr;
53  fifo_queue<user_compute *>::const_iterator it;
54  public:
55  compute_iterator(const fifo_queue<user_compute *> &computes,
56  bool end): it(&computes) {
57  if (end)
58  it = computes.get_end();
59  }
60 
66  return *it;
67  }
68 
74  ++it;
75  return *this;
76  }
77 
83  bool operator==(const compute_iterator &it) const {
84  return this->it == it.it;
85  }
86 
92  bool operator!=(const compute_iterator &it) const {
93  return this->it != it.it;
94  }
95  };
96 
108  return compute_iterator(incomplete_computes, false);
109  }
110 
117  return compute_iterator(incomplete_computes, true);
118  }
119 
124  comp_io_scheduler(int node_id);
125 
126  virtual ~comp_io_scheduler() {
127  assert(incomplete_computes.is_empty());
128  }
129 
136  virtual size_t get_requests(fifo_queue<io_request> &reqs, size_t max) = 0;
137 
143  void set_io(io_interface *io) {
144  this->io = io;
145  }
146 
152  io_interface *get_io() const {
153  return io;
154  }
155 
160  void post_comp_process(user_compute *compute);
161 
166  static void delete_compute(user_compute *compute) {
167  assert(compute->test_flag(user_compute::IN_QUEUE));
168  compute->set_flag(user_compute::IN_QUEUE, false);
169  compute->dec_ref();
170  // the user compute isn't free'd here, it's the user's responsiblity
171  // of freeing it.
172  if (compute->get_ref() == 0) {
173  compute_allocator *alloc = compute->get_allocator();
174  alloc->free(compute);
175  }
176  }
177 
183  return incomplete_computes.get_num_entries();
184  }
185 
192  bool is_empty() {
193  return incomplete_computes.is_empty();
194  }
195 
200  virtual void gc_computes();
201 };
202 
203 /*
204  * This is a default I/O scheduler for requests generated by user tasks.
205  * This scheduler doesn't make much assumption on the requests.
206  * So it reads as many requests as possible from a user task, and then
207  * read from the next user task until it reaches the limit. The requests
208  * will be returned in a sorted order.
209  *
210  * This scheduler works to favor latency, so we finish one user task
211  * before processing the next task.
212  */
213 class default_comp_io_scheduler: public comp_io_scheduler
214 {
215  // Indicate whether there are completed user computes.
216  bool has_completed;
217  compute_iterator curr_it;
218 public:
219  default_comp_io_scheduler(int node_id): comp_io_scheduler(node_id), curr_it(
220  get_end()) {
221  has_completed = false;
222  }
223 
224  virtual size_t get_requests(fifo_queue<io_request> &reqs, size_t max);
225  virtual void gc_computes();
226 };
227 
228 }
229 
230 #endif
virtual void gc_computes()
compute_iterator get_begin() const
Definition: comp_io_scheduler.h:107
io_interface * get_io() const
Definition: comp_io_scheduler.h:152
bool operator!=(const compute_iterator &it) const
Definition: comp_io_scheduler.h:92
comp_io_scheduler(int node_id)
bool operator==(const compute_iterator &it) const
Definition: comp_io_scheduler.h:83
size_t get_num_incomplete_computes()
Definition: comp_io_scheduler.h:182
void set_io(io_interface *io)
Definition: comp_io_scheduler.h:143
virtual size_t get_requests(fifo_queue< io_request > &reqs, size_t max)=0
bool is_empty()
Definition: comp_io_scheduler.h:192
Definition: io_request.h:316
static void delete_compute(user_compute *compute)
Definition: comp_io_scheduler.h:166
void post_comp_process(user_compute *compute)
virtual void free(user_compute *compute)=0
void dec_ref()
Definition: io_request.h:438
Definition: io_interface.h:154
Definition: comp_io_scheduler.h:37
compute_iterator get_end() const
Definition: comp_io_scheduler.h:116
Definition: io_request.h:454
user_compute * operator*() const
Definition: comp_io_scheduler.h:65
Definition: comp_io_scheduler.h:52
compute_allocator * get_allocator() const
Definition: io_request.h:338
int get_ref() const
Definition: io_request.h:446
compute_iterator & operator++()
Definition: comp_io_scheduler.h:73