FlashGraph-ng
A new frontier in large-scale graph analysis and data mining
mem_worker_thread.h
#ifndef __MEM_WORKER_THREAD_H__
#define __MEM_WORKER_THREAD_H__

/*
 * Copyright 2014 Open Connectome Project (http://openconnecto.me)
 * Written by Da Zheng (zhengda1936@gmail.com)
 *
 * This file is part of FlashMatrix.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <cassert>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

#include <pthread.h>

#include "thread.h"
#include "io_interface.h"

#include "local_vec_store.h"

namespace safs
{
class file_io_factory;
class io_interface;
class io_request;
}

namespace fm
{

namespace detail
{

/*
 * A task thread owned by a thread pool. In addition to the behavior of
 * task_thread, it records its index (pool_thread_id) within the pool.
 */
class pool_task_thread: public task_thread
{
    int pool_thread_id;
public:
    pool_task_thread(int pool_thread_id, const std::string &name,
            const std::vector<int> &cpus, int node_id): task_thread(name,
                cpus, node_id) {
        this->pool_thread_id = pool_thread_id;
    }

    pool_task_thread(int pool_thread_id, const std::string &name,
            int node_id): task_thread(name, node_id) {
        this->pool_thread_id = pool_thread_id;
    }

    int get_pool_thread_id() const {
        return pool_thread_id;
    }
};

/*
 * This thread pool is designed to replace OpenMP for parallelization
 * while respecting NUMA data locality.
 */
class mem_thread_pool
{
    size_t tot_num_tasks;
    std::vector<size_t> ntasks_per_node;
    std::vector<std::vector<std::shared_ptr<pool_task_thread> > > threads;

    mem_thread_pool(int num_nodes, int nthreads_per_node);
public:
    typedef std::shared_ptr<mem_thread_pool> ptr;

    static ptr get_global_mem_threads();
    static size_t get_global_num_threads();
    static int get_curr_thread_id();
    static void init_global_mem_threads(int num_nodes, int nthreads_per_node);

    static ptr create(int num_nodes, int nthreads_per_node) {
        return ptr(new mem_thread_pool(num_nodes, nthreads_per_node));
    }

    size_t get_num_pending() const;

    size_t get_num_nodes() const {
        return ntasks_per_node.size();
    }
    size_t get_num_threads() const {
        assert(!threads.empty());
        return threads.size() * threads.front().size();
    }

    void process_task(int node_id, thread_task *task);

    void wait4complete();
};
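
/*
 * Usage sketch (not part of the original header). It assumes that the
 * thread_task base class declared in thread.h exposes a virtual run()
 * method, which io_worker_task below also overrides, and that the pool
 * frees a task after running it. The names hello_task and run_hello are
 * hypothetical.
 */
class hello_task: public thread_task
{
public:
    void run() {
        // Per-thread work goes here; it executes on a pool_task_thread
        // bound to the NUMA node the task was dispatched to.
    }
};

void run_hello(int num_nodes, int nthreads_per_node)
{
    mem_thread_pool::init_global_mem_threads(num_nodes, nthreads_per_node);
    mem_thread_pool::ptr pool = mem_thread_pool::get_global_mem_threads();
    for (size_t i = 0; i < pool->get_num_threads(); i++)
        // Spread tasks across the NUMA nodes round-robin for brevity.
        pool->process_task((int) (i % pool->get_num_nodes()), new hello_task());
    pool->wait4complete();
}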

/*
 * This defines a set of I/O tasks that process an entire data container.
 */
class task_dispatcher
{
public:
    typedef std::shared_ptr<task_dispatcher> ptr;

    virtual ~task_dispatcher() {
    }
    /*
     * Issue a task.
     * This method must be thread-safe.
     */
    virtual bool issue_task() = 0;
};
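
/*
 * A hypothetical dispatcher sketch (not part of the original header). It
 * walks a fixed number of chunks and returns false once all of them have
 * been issued. Because issue_task() must be thread-safe, the cursor is
 * guarded by a spin lock, mirroring the locking style of io_worker_task
 * below.
 */
class range_dispatcher: public task_dispatcher
{
    pthread_spinlock_t lock;
    size_t next_chunk;
    size_t num_chunks;
public:
    range_dispatcher(size_t num_chunks) {
        pthread_spin_init(&lock, PTHREAD_PROCESS_PRIVATE);
        this->next_chunk = 0;
        this->num_chunks = num_chunks;
    }

    ~range_dispatcher() {
        pthread_spin_destroy(&lock);
    }

    virtual bool issue_task() {
        pthread_spin_lock(&lock);
        bool has_work = next_chunk < num_chunks;
        if (has_work)
            // A real dispatcher would construct the I/O for this chunk
            // here before advancing the cursor.
            next_chunk++;
        pthread_spin_unlock(&lock);
        return has_work;
    }
};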

class EM_object;

/*
 * A thread task that pulls I/O work from a task_dispatcher. External-memory
 * objects accessed by the task are registered with it through
 * register_EM_obj().
 */
class io_worker_task: public thread_task
{
    pthread_spinlock_t lock;
    std::set<EM_object *> EM_objs;

    task_dispatcher::ptr dispatch;
    int max_pending_ios;
public:
    io_worker_task(task_dispatcher::ptr dispatch, int max_pending_ios = 4) {
        pthread_spin_init(&lock, PTHREAD_PROCESS_PRIVATE);
        this->dispatch = dispatch;
        this->max_pending_ios = max_pending_ios;
    }

    ~io_worker_task() {
        pthread_spin_destroy(&lock);
    }

    void register_EM_obj(EM_object *obj) {
        pthread_spin_lock(&lock);
        auto it = EM_objs.find(obj);
        assert(it == EM_objs.end());
        EM_objs.insert(obj);
        pthread_spin_unlock(&lock);
    }

    virtual void run();
};
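
/*
 * Wiring sketch (not part of the original header): one io_worker_task per
 * pool thread shares a single dispatcher, so the threads collectively drain
 * the container that the dispatcher covers. The exact I/O behavior of
 * io_worker_task::run() is defined in the corresponding .cpp file and is
 * not shown here; process_container is a hypothetical name.
 */
void process_container(task_dispatcher::ptr dispatch)
{
    mem_thread_pool::ptr pool = mem_thread_pool::get_global_mem_threads();
    for (size_t i = 0; i < pool->get_num_threads(); i++) {
        io_worker_task *task = new io_worker_task(dispatch);
        // External-memory objects accessed by the task would be registered
        // with register_EM_obj() here.
        pool->process_task((int) (i % pool->get_num_nodes()), task);
    }
    pool->wait4complete();
}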

}

}

#endif