FlashGraph-ng
A new frontier in large-scale graph analysis and data mining
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Pages
part_global_cached_private.h
1 #ifndef __PART_GLOBAL_CACHED_PRIVATE_H__
2 #define __PART_GLOBAL_CACHED_PRIVATE_H__
3 
4 /*
5  * Copyright 2014 Open Connectome Project (http://openconnecto.me)
6  * Written by Da Zheng (zhengda1936@gmail.com)
7  *
8  * This file is part of SAFSlib.
9  *
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  * http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  */
22 
23 #include <errno.h>
24 
25 #include <tr1/unordered_map>
26 
27 #ifdef PART_IO
28 
29 #include "parameters.h"
30 #include "messaging.h"
31 #include "global_cached_private.h"
32 
33 namespace safs
34 {
35 
// Capacities (entry counts) of the per-IO staging buffers declared below.
const int REPLY_BUF_SIZE = 1000;
const int REQ_BUF_SIZE = 1000;
const int MSG_BUF_SIZE = 128;
39 
40 // The size of a reply >= sizeof(io_reply).
41 const int NUMA_REPLY_BUF_SIZE = NUMA_MSG_SIZE / sizeof(io_reply);
42 
// Forward declarations; the full definitions live in other SAFS headers.
class part_io_process_table;
class disk_io_thread;
class file_mapper;
class group_request_sender;
class NUMA_cache;
struct thread_group;
49 
56 class part_global_cached_io: public io_interface
57 {
58  part_io_process_table *global_table;
59  const struct thread_group *local_group;
60  const cache_config *cache_conf;
61 
62  global_cached_io *underlying;
63 
64  msg_queue<io_reply> *reply_queue;
65  // This reply message buffer is used when copying remote messages
66  // to the local memory.
67  message<io_reply> local_reply_msgs[MSG_BUF_SIZE];
68  // This request buffer is used when distributing requests.
69  io_request local_req_buf[REQ_BUF_SIZE];
70  // This reply buffer is used when processing replies.
71  io_reply local_reply_buf[NUMA_REPLY_BUF_SIZE];
72 
73  /*
74  * there is a sender for each node.
75  */
76  // group id <-> msg sender
77  std::tr1::unordered_map<int, group_request_sender *> req_senders;
78 
79  /*
80  * These reply senders are to send replies to this IO. They are made
81  * to be thread-safe, so all threads can use them. However, the remote
82  * access on a NUMA machine is slow, so each NUMA node has a copy of
83  * the reply sender to improve performance.
84  */
85  thread_safe_msg_sender<io_reply> *reply_sender;
86 
87  // All these variables are updated in one thread, so it's fine without
88  // any concurrency control.
89  long processed_requests;
90  long sent_requests;
91  atomic_number<long> processed_replies;
92  long remote_reads;
93 
94  // It's the callback from the user.
95  callback *final_cb;
96 
97  int process_replies();
98  void notify_upper(io_request *reqs[], int num);
99 
100  part_global_cached_io(io_interface *underlying, part_io_process_table *);
101  ~part_global_cached_io();
102 public:
103  static part_io_process_table *init_subsystem(
104  const std::vector<disk_io_thread *> &io_threads,
105  file_mapper *mapper, NUMA_cache *cache);
106  static int destroy_subsystem(part_io_process_table *table);
107 
108  static part_global_cached_io *create(io_interface *underlying,
109  part_io_process_table *table) {
110  int node_id = underlying->get_node_id();
111  assert(node_id >= 0);
112  return new part_global_cached_io(underlying, table);
113  }
114 
115  static void destroy(part_global_cached_io *io) {
116  delete io;
117  }
118 
119  int init();
120 
121  virtual bool set_callback(callback *cb) {
122  this->final_cb = cb;
123  return true;
124  }
125 
126  virtual callback *get_callback() {
127  return final_cb;
128  }
129 
130  int reply(io_request *requests, io_reply *replies, int num);
131 
132  int process_requests(int max_nreqs);
133 
134  int process_reply(io_reply *reply);
135 
136  virtual void notify_completion(io_request *reqs[], int num);
137  void access(io_request *requests, int num, io_status *status);
138  io_status access(char *, off_t, ssize_t, int) {
139  return IO_UNSUPPORTED;
140  }
141 
142  void flush_requests();
143 
144  void cleanup();
145  int preload(off_t start, long size);
146 
147  bool support_aio() {
148  return true;
149  }
150 
151  virtual int get_file_id() const {
152  return underlying->get_file_id();
153  }
154  virtual int wait4complete(int num);
155  virtual int num_pending_ios() const {
156  // the number of pending requests on the remote nodes.
157  return sent_requests - processed_replies.get()
158  // The number of pending requests in the local IO instance.
159  + underlying->num_pending_ios();
160  }
161 
162  friend class node_cached_io;
163 
164  virtual void print_state();
165 };
166 
167 }
168 #endif
169 
170 #endif