FlashGraph-ng
A new frontier in large-scale graph analysis and data mining
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Pages
trace_logger.h
1 #ifndef __TRACE_LOGGER_H__
2 #define __TRACE_LOGGER_H__
3 
4 /*
5  * Copyright 2014 Open Connectome Project (http://openconnecto.me)
6  * Written by Da Zheng (zhengda1936@gmail.com)
7  *
8  * This file is part of FlashGraph.
9  *
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  * http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  */
22 
23 #include <stdio.h>
24 #include <pthread.h>
25 
26 #include <string>
27 #include <vector>
28 
29 #include "thread.h"
30 #include "container.h"
31 
32 namespace fg
33 {
34 
35 class log_thread: public thread
36 {
37  FILE *f;
38  std::string trace_file;
39  thread_safe_FIFO_queue<std::vector<safs::request_range> *> queue;
40 public:
41  log_thread(const std::string &trace_file): thread(
42  "trace_log_thread", 0), queue("log_queue", 0, 1024, INT_MAX) {
43  f = fopen(trace_file.c_str(), "w");
44  }
45 
46  void add(std::vector<safs::request_range> *reqs) {
47  int ret = queue.add(&reqs, 1);
48  if (ret < 1)
49  fprintf(stderr, "can't add traced requests to the queue\n");
50  activate();
51  }
52 
53  void run() {
54  while (!queue.is_empty()) {
55  std::vector<safs::request_range> *reqs = queue.pop_front();
56  for (size_t i = 0; i < reqs->size(); i++) {
57  safs::request_range req = reqs->at(i);
58  fprintf(f, ",%ld,%ld,R,%ld\n",
59  req.get_loc().get_offset(), req.get_size(), req.get_size());
60  }
61  delete reqs;
62  }
63  }
64 
65  void close() {
66  fclose(f);
67  stop();
68  join();
69  }
70 };
71 
72 const size_t MAX_LOG_BUF = 1024 * 32;
73 
74 static void destroy_queue(void *p)
75 {
76  std::vector<safs::request_range> *q = (std::vector<safs::request_range> *) p;
77  delete q;
78 }
79 
80 class trace_logger
81 {
82  log_thread *thread;
83  pthread_key_t queue_key;
84 
85  std::vector<safs::request_range> *get_per_thread_queue() {
86  std::vector<safs::request_range> *p
87  = (std::vector<safs::request_range> *) pthread_getspecific(queue_key);
88  if (p == NULL) {
89  p = new std::vector<safs::request_range>();
90  pthread_setspecific(queue_key, p);
91  }
92  return p;
93  }
94 public:
95  typedef std::shared_ptr<trace_logger> ptr;
96 
97  trace_logger(const std::string &trace_file) {
98  thread = new log_thread(trace_file);
99  thread->start();
100  pthread_key_create(&queue_key, destroy_queue);
101  }
102 
103  ~trace_logger() {
104  close();
105  }
106 
107  void log(safs::request_range reqs[], int num) {
108  std::vector<safs::request_range> *q = get_per_thread_queue();
109  q->insert(q->end(), reqs, reqs + num);
110  if (q->size() >= MAX_LOG_BUF) {
111  thread->add(q);
112  pthread_setspecific(queue_key, new std::vector<safs::request_range>());
113  }
114  }
115 
116  void close() {
117  // TODO we don't add all requests to the logging thread.
118  thread->close();
119  }
120 };
121 
122 }
123 
124 #endif
size_t get_size() const
Definition: io_request.h:271
const data_loc_t & get_loc() const
Definition: io_request.h:263
Definition: io_request.h:231
off_t get_offset() const
Definition: io_request.h:218