FlashGraph-ng
A new frontier in large-scale graph analysis and data mining
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Pages
thread.h
1 #ifndef __MY_THREAD_H__
2 #define __MY_THREAD_H__
3 
4 /*
5  * Copyright 2014 Open Connectome Project (http://openconnecto.me)
6  * Written by Da Zheng (zhengda1936@gmail.com)
7  *
8  * This file is part of SAFSlib.
9  *
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  * http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  */
22 
23 #include <pthread.h>
24 #ifdef USE_HWLOC
25 #include <hwloc.h>
26 #endif
27 
28 #include <atomic>
29 #include <string>
30 #include <set>
31 
32 #include "concurrency.h"
33 #include "common.h"
34 #include "container.h"
35 
36 class thread
37 {
38  static pthread_key_t thread_key;
39  static atomic_integer num_threads;
40 
41  volatile pid_t tid;
42  int thread_idx;
43  int node_id;
44  // Indicate which CPU cores the thread is bound to.
45  std::vector<int> cpu_affinity;
46  pthread_t id;
47  bool blocking;
48  std::string name;
49 
50  volatile void *user_data;
51 
52  volatile bool _is_sleeping;
53  volatile bool _is_running;
54  volatile bool _has_exit;
55  volatile bool _is_activated;
56 
57  pthread_mutex_t mutex;
58  pthread_cond_t cond;
59 
60  void construct_init();
61 
62  friend void init_thread_class();
63  friend void *thread_run(void *arg);
64 public:
65  thread(std::string name, int node_id, bool blocking = true);
66  thread(std::string name, const std::vector<int> &cpu_affinity,
67  bool blocking = true);
68 
69  void set_user_data(void *user_data) {
70  assert(this->user_data == NULL);
71  this->user_data = user_data;
72  }
73 
74  void *get_user_data() const {
75  return (void *) user_data;
76  }
77 
78  virtual ~thread() {
79  stop();
80  if (thread_idx >= 0)
81  join();
82  }
83 
84  void activate() {
85  bool sleeping;
86  pthread_mutex_lock(&mutex);
87  _is_activated = true;
88  sleeping = _is_sleeping;
89  pthread_mutex_unlock(&mutex);
90  if (sleeping)
91  pthread_cond_signal(&cond);
92  }
93 
94  void wait() {
95  pthread_mutex_lock(&mutex);
96  if (!_is_activated) {
97  while (!_is_activated && _is_running) {
98  _is_sleeping = true;
99  int ret = pthread_cond_wait(&cond, &mutex);
100  _is_sleeping = false;
101  if (ret)
102  perror("pthread_cond_wait");
103  }
104  }
105  _is_activated = false;
106  pthread_mutex_unlock(&mutex);
107  }
108 
109  bool is_running() const {
110  return _is_running;
111  }
112 
113  bool is_blocking() const {
114  return blocking;
115  }
116 
117  int get_node_id() const {
118  return node_id;
119  }
120 
121  const std::vector<int> get_cpu_affinity() const {
122  return cpu_affinity;
123  }
124 
125  void stop() {
126  pthread_mutex_lock(&mutex);
127  // This is the only place where `is_running' is changed.
128  _is_running = false;
129  pthread_cond_signal(&cond);
130  pthread_mutex_unlock(&mutex);
131  }
132 
133  void exit() {
134  _has_exit = true;
135  }
136 
137  bool has_exit() const {
138  return _has_exit;
139  }
140 
141  void join() {
142  pthread_join(id, NULL);
143 #ifdef DEBUG
144  printf("stop thread %s\n", name.c_str());
145 #endif
146  thread_idx = -1;
147  }
148 
149  int get_id() const {
150  return thread_idx;
151  }
152  int get_tid() const {
153  if (get_curr_thread() == NULL || get_curr_thread() != this) {
154  // TODO I need a better way to wait for tid to be initialized.
155  while (tid < 0) { }
156  return tid;
157  }
158  else
159  return gettid();
160  }
161 
162  void start();
163  virtual void run() = 0;
164  virtual void init() {
165  }
166  virtual void cleanup() {
167  }
168 
169  /*
170  * This is to initialize the thread class instead of a single thread.
171  */
172  static void thread_class_init();
173 
174  static thread *get_curr_thread();
175 
176  /*
177  * This creates a thread instance to represent the current thread context.
178  * It is used when the current thread isn't created by the thread class.
179  */
180  static thread *represent_thread(int node_id);
181 
182  const std::string &get_thread_name() const {
183  return name;
184  }
185 };
186 
/*
 * Interface of a unit of work.
 * Implementations put the work in run(); the virtual destructor lets a task
 * be deleted through a `thread_task' pointer.
 */
class thread_task
{
public:
	virtual ~thread_task() = default;

	/* Perform the work of this task. */
	virtual void run() = 0;
};
194 
195 class task_thread: public thread
196 {
197  fifo_queue<thread_task *> tasks;
198  std::atomic<size_t> num_pending;
199  bool all_complete;
200  pthread_mutex_t mutex;
201  pthread_cond_t cond;
202 public:
203  task_thread(const std::string &name, int node): thread(name,
204  node), tasks(node, 1024, true) {
205  pthread_mutex_init(&mutex, NULL);
206  pthread_cond_init(&cond, NULL);
207  all_complete = false;
208  num_pending = 0;
209  }
210 
211  task_thread(const std::string &name, const std::vector<int> &cpus,
212  int node): thread(name, cpus), tasks(node, 1024, true) {
213  pthread_mutex_init(&mutex, NULL);
214  pthread_cond_init(&cond, NULL);
215  all_complete = false;
216  num_pending = 0;
217  }
218 
219  void add_task(thread_task *t) {
220  pthread_mutex_lock(&mutex);
221  all_complete = false;
222  if (tasks.is_full())
223  tasks.expand_queue(tasks.get_size() * 2);
224  tasks.push_back(t);
225  pthread_mutex_unlock(&mutex);
226  activate();
227  num_pending++;
228  }
229 
230  void run() {
231  const int TASK_BUF_SIZE = 128;
232  thread_task *local_tasks[TASK_BUF_SIZE];
233  pthread_mutex_lock(&mutex);
234  while (!tasks.is_empty()) {
235  int num_tasks = tasks.fetch(local_tasks, TASK_BUF_SIZE);
236  pthread_mutex_unlock(&mutex);
237  for (int i = 0; i < num_tasks; i++) {
238  local_tasks[i]->run();
239  delete local_tasks[i];
240  num_pending--;
241  }
242  pthread_mutex_lock(&mutex);
243  }
244  all_complete = true;
245  pthread_mutex_unlock(&mutex);
246  pthread_cond_signal(&cond);
247  }
248 
249  void wait4complete() {
250  pthread_mutex_lock(&mutex);
251  while (!all_complete) {
252  pthread_cond_wait(&cond, &mutex);
253  }
254  pthread_mutex_unlock(&mutex);
255  }
256 
257  size_t get_num_pending() const {
258  return num_pending;
259  }
260 };
261 
262 #ifdef USE_HWLOC
263 
264 class CPU_core
265 {
266  std::vector<int> logical_units;
267 public:
268  CPU_core(hwloc_obj_t core);
269 
270  const std::vector<int> get_units() const {
271  return logical_units;
272  }
273 
274  off_t get_logical_unit(size_t idx) const {
275  return logical_units[idx];
276  }
277 
278  size_t get_num_units() const {
279  return logical_units.size();
280  }
281 };
282 
283 class NUMA_node
284 {
285  std::vector<CPU_core> cores;
286  std::set<int> lus;
287 public:
288  /* This constructor works for the machine without NUMA nodes. */
289  NUMA_node(hwloc_topology_t topology);
290  /* This constructor works for the machine with NUMA nodes. */
291  NUMA_node(hwloc_obj_t node);
292 
293  bool contain_lu(int unit) const {
294  return lus.find(unit) != lus.end();
295  }
296 
297  std::vector<int> get_logical_units() const;
298 
299  const CPU_core &get_core(size_t idx) const {
300  return cores[idx];
301  }
302 
303  size_t get_num_cores() const {
304  return cores.size();
305  }
306 
307  size_t get_num_logical_units() const {
308  return cores.size() * cores.front().get_num_units();
309  }
310 };
311 
312 class CPU_hierarchy
313 {
314  std::vector<NUMA_node> nodes;
315 public:
316  CPU_hierarchy();
317 
318  const NUMA_node &get_node(size_t idx) const {
319  return nodes[idx];
320  }
321 
322  size_t get_num_nodes() const {
323  return nodes.size();
324  }
325 
326  size_t get_num_cores() const {
327  return nodes.size() * nodes.front().get_num_cores();
328  }
329 
330  size_t get_num_logical_units() const {
331  return nodes.size() * nodes.front().get_num_logical_units();
332  }
333 
334  std::vector<int> lus2node(const std::vector<int> &lus) const;
335 };
336 
337 extern CPU_hierarchy cpus; // Global CPU_hierarchy object; only declared here, the definition lives in another translation unit.
338 
339 #endif
340 
341 #endif