FlashGraph-ng
A new frontier in large-scale graph analysis and data mining
io_request.h
#ifndef __IO_REQUEST_H__
#define __IO_REQUEST_H__

/*
 * Copyright 2014 Open Connectome Project (http://openconnecto.me)
 * Written by Da Zheng (zhengda1936@gmail.com)
 *
 * This file is part of SAFSlib.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <sys/uio.h>
#include <sys/time.h>
#include <limits.h>

#include <algorithm>
#include <queue>

#include "common.h"
#include "concurrency.h"
#include "container.h"
#include "parameters.h"

namespace safs
{

static const int PAGE_SIZE = 4096;
static const int LOG_PAGE_SIZE = 12;

#define ROUND_PAGE(off) (((long) off) & (~((long) safs::PAGE_SIZE - 1)))
#define ROUNDUP_PAGE(off) (((long) off + safs::PAGE_SIZE - 1) & (~((long) safs::PAGE_SIZE - 1)))
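
/*
 * Illustrative example (added for documentation; not part of the original
 * header): with the 4KB pages defined above, the two macros bracket an
 * arbitrary byte offset with its page-aligned boundaries, e.g.
 *     ROUND_PAGE(5000)   == 4096   // start of the page containing offset 5000
 *     ROUNDUP_PAGE(5000) == 8192   // offset rounded up to the next page boundary
 */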

class thread_safe_page;
class io_interface;

class io_buf
{
    union {
        thread_safe_page *p;
        void *buf;
    } u;
    unsigned int size: 31;
    unsigned int is_page: 1;
public:
    io_buf() {
        u.buf = NULL;
        size = 0;
        is_page = 0;
    }

    void init(void *p, int size, bool is_page) {
        u.buf = p;
        if (is_page)
            assert(size == PAGE_SIZE);
        this->size = size;
        this->is_page = is_page;
    }

    void init(thread_safe_page *p);
    void init(void *buf, int size) {
        u.buf = buf;
        this->size = size;
        is_page = 0;
    }

    void *get_buf() const;

    int get_size() const {
        return size;
    }

    thread_safe_page *get_page() const {
        assert(is_page);
        return u.p;
    }
};

class io_request;

class io_req_extension
{
    void *priv;

    int num_bufs: 16;
    int vec_capacity: 15;

    io_buf *vec_pointer;
    io_buf embedded_vecs[NUM_EMBEDDED_IOVECS];

    struct timeval issue_time;

public:
    io_req_extension() {
        vec_pointer = embedded_vecs;
        vec_capacity = NUM_EMBEDDED_IOVECS;
        init();
    }

    ~io_req_extension() {
        if (vec_pointer != embedded_vecs)
            delete [] vec_pointer;
    }

    bool is_valid() const {
        // A valid extension must have vec_pointer pointing to embedded_vecs
        // or a heap-allocated array.
        return vec_pointer != NULL;
    }

    void init() {
        this->priv = NULL;
        this->num_bufs = 0;
        memset(vec_pointer, 0, vec_capacity * sizeof(io_buf));
        memset(&issue_time, 0, sizeof(issue_time));
    }

    void init(const io_req_extension &ext) {
        this->priv = ext.priv;
        this->num_bufs = ext.num_bufs;
        assert(this->vec_capacity >= ext.vec_capacity);
        memcpy(vec_pointer, ext.vec_pointer, num_bufs * sizeof(*vec_pointer));
        memset(&issue_time, 0, sizeof(issue_time));
    }

    void *get_priv() const {
        return priv;
    }

    void set_priv(void *priv) {
        this->priv = priv;
    }

    void set_timestamp() {
        gettimeofday(&issue_time, NULL);
    }

    struct timeval get_timestamp() {
        return issue_time;
    }

    void add_io_buf(const io_buf &buf);
    void add_buf(char *buf, int size, bool is_page);
    void add_buf_front(char *buf, int size, bool is_page);
    int get_num_bufs() const {
        return num_bufs;
    }

    const io_buf &get_buf(int idx) const {
        return vec_pointer[idx];
    }

    int get_size() const {
        ssize_t size = 0;
        for (int i = 0; i < num_bufs; i++)
            size += vec_pointer[i].get_size();
        return size;
    }
};

const int MAX_INLINE_SIZE = 128;

/*
 * The type used to identify a file in SAFS.
 */
typedef int file_id_t;

const int INVALID_FILE_ID = -1;

/*
 * This identifies the location of data on disk: the SAFS file that contains
 * the data and the offset of the data within the file.
 */
class data_loc_t
{
    file_id_t file_id;
    off_t off;
public:
    data_loc_t() {
        file_id = -1;
        off = -1;
    }

    data_loc_t(file_id_t file_id, off_t off) {
        this->file_id = file_id;
        this->off = off;
    }

    file_id_t get_file_id() const {
        return file_id;
    }

    off_t get_offset() const {
        return off;
    }
};

const data_loc_t INVALID_DATA_LOC;

class user_compute;

/*
 * This describes a contiguous range of data requested by a user task:
 * the data location, the size of the request, the access method and
 * the user_compute that consumes the result.
 */
class request_range
{
    data_loc_t loc;
    unsigned long size: 63;
    unsigned long access_method: 1;
    user_compute *compute;
public:
    request_range() {
        size = 0;
        access_method = 0;
        compute = NULL;
    }

    request_range(const data_loc_t &loc, size_t size, int access_method,
            user_compute *compute) {
        this->loc = loc;
        this->size = size;
        this->access_method = access_method & 0x1;
        this->compute = compute;
    }

    const data_loc_t &get_loc() const {
        return loc;
    }

    size_t get_size() const {
        return size;
    }

    int get_access_method() const {
        return access_method & 0x1;
    }

    user_compute *get_compute() const {
        return compute;
    }

    void set_compute(user_compute *compute) {
        this->compute = compute;
    }
};
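
/*
 * Illustrative sketch (added for documentation; not part of the original
 * header): a user task typically describes the data it wants next as a
 * request_range, e.g. an 8KB read at offset 1MB of an SAFS file:
 *
 *     file_id_t fid = ...;                   // obtained when the file was opened
 *     data_loc_t loc(fid, 1024 * 1024);
 *     request_range range(loc, 8192, READ, this);
 *
 * The access-method value (READ, as tested by get_access_method()) and the
 * owning user_compute pointer passed as `this` are placeholders here.
 */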

typedef fifo_queue<io_request> user_comp_req_queue;
class page_byte_array;
class compute_allocator;

/*
 * This defines the interface of a user task attached to an I/O request.
 * The task runs on the data brought in by the request and may in turn
 * generate more I/O requests.
 */
class user_compute
{
    compute_allocator *alloc;
    atomic_flags<int> flags;
    int num_refs;
public:
    enum {
        IN_QUEUE,
    };

    user_compute(compute_allocator *alloc) {
        this->alloc = alloc;
    }

    compute_allocator *get_allocator() const {
        return alloc;
    }

    virtual ~user_compute() {
    }

    virtual int serialize(char *buf, int size) const = 0;
    virtual int get_serialized_size() const = 0;
    virtual void run(page_byte_array &arr) = 0;

    virtual bool has_completed() = 0;

    virtual int has_requests() = 0;

    virtual request_range get_next_request() = 0;

    virtual void set_flag(int flag, bool value) {
        if (value)
            flags.set_flag(flag);
        else
            flags.clear_flag(flag);
    }

    virtual bool test_flag(int flag) const {
        return flags.test_flag(flag);
    }

    virtual void set_scan_dir(bool forward) {
    }

    bool fetch_request(io_interface *io, io_request &req);

    int fetch_requests(io_interface *io, user_comp_req_queue &reqs,
            int max_fetch);

    void inc_ref() {
        num_refs++;
    }

    void dec_ref() {
        num_refs--;
    }

    int get_ref() const {
        return num_refs;
    }
};
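
/*
 * Illustrative sketch (added for documentation; not part of the original
 * header): the smallest possible user task issues no follow-up requests and
 * finishes after run() has seen its data once.  The class and member names
 * below are hypothetical.
 *
 *     class noop_compute: public user_compute
 *     {
 *         bool done;
 *     public:
 *         noop_compute(compute_allocator *alloc): user_compute(alloc) {
 *             done = false;
 *         }
 *         int serialize(char *buf, int size) const { return 0; }
 *         int get_serialized_size() const { return 0; }
 *         void run(page_byte_array &arr);    // consume the bytes, set done = true
 *         bool has_completed() { return done; }
 *         int has_requests() { return 0; }
 *         request_range get_next_request() { return request_range(); }
 *     };
 */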

/*
 * This defines an allocator of user_compute objects, so that they can be
 * allocated and freed by the same entity.
 */
class compute_allocator
{
public:
    virtual ~compute_allocator() {
    }

    virtual user_compute *alloc() = 0;
    virtual void free(user_compute *compute) = 0;
};

/*
 * This class defines an I/O request.  Its payload is either a plain data
 * buffer (BASIC_REQ), an io_req_extension carrying multiple buffers
 * (EXT_REQ), or a user_compute task (USER_COMPUTE).
 */
class io_request
{
    static const off_t MAX_FILE_SIZE = LONG_MAX;
    static const int MAX_NODE_ID = (1 << 8) - 1;

    size_t buf_size;
    off_t offset;

    // These two flags decide how the payload is interpreted, so they are
    // initialized when the object is created and can't be changed manually.
    // The only case where payload_type is changed is when another request
    // object is copied to this request object.  What needs to be guaranteed
    // is that the pointer in the payload always points to an object of the
    // right type.
    // However, data_inline is never changed.  It is usually false.  It is
    // used when interpreting the I/O requests embedded in a message.
    unsigned int payload_type: 2;
    unsigned int data_inline: 1;

    unsigned int access_method: 1;
    // Is this synchronous IO?
    unsigned int sync: 1;
    unsigned int high_prio: 1;
    unsigned int low_latency: 1;
    unsigned int discarded: 1;
    unsigned int node_id: 8;
    int file_id;

    io_interface *io;
    void *user_data;

    union {
        // The data buffer of a basic request.
        void *buf_addr;
        // The user task of a USER_COMPUTE request.
        user_compute *compute;
        // The extension of an extended (multi-buffer) request.
        io_req_extension *ext;
        // Used when the data is inlined in the request object itself.
        char buf[0];
    } payload;

    void use_default_flags() {
        sync = 0;
        high_prio = 1;
        low_latency = 0;
        discarded = 0;
    }

    void copy_flags(const io_request &req) {
        this->sync = req.sync;
        this->high_prio = req.high_prio;
        this->low_latency = req.low_latency;
    }

    void set_int_buf_size(size_t size) {
        buf_size = size;
    }

    size_t get_int_buf_size() const {
        return buf_size;
    }

public:
    enum {
        BASIC_REQ,
        EXT_REQ,
        USER_COMPUTE,
    };

    static size_t get_max_req_size() {
        return std::numeric_limits<size_t>::max();
    }

    // By default, a request is initialized as a flush request.
    explicit io_request(bool sync = false) {
        memset(this, 0, sizeof(*this));
        payload_type = BASIC_REQ;
        use_default_flags();
        this->sync = sync;
    }

    io_request(char *buf, const data_loc_t &loc, ssize_t size,
            int access_method, io_interface *io = NULL,
            int node_id = MAX_NODE_ID, bool sync = false) {
        payload_type = BASIC_REQ;
        data_inline = 0;
        user_data = NULL;
        init(buf, loc, size, access_method, io, node_id);
        use_default_flags();
        this->sync = sync;
    }

    io_request(io_req_extension *ext, const data_loc_t &loc, int access_method,
            io_interface *io = NULL, int node_id = MAX_NODE_ID) {
        payload_type = EXT_REQ;
        data_inline = 0;
        payload.ext = ext;
        user_data = NULL;
        init(NULL, loc, 0, access_method, io, node_id);
        use_default_flags();
        this->sync = false;
    }

    io_request(user_compute *compute, const data_loc_t &loc, ssize_t size,
            int access_method, io_interface *io = NULL,
            int node_id = MAX_NODE_ID) {
        payload_type = USER_COMPUTE;
        data_inline = 0;
        user_data = NULL;
        init(NULL, loc, size, access_method, io, node_id);
        payload.compute = compute;
        use_default_flags();
        this->sync = false;
    }

    void init(const io_request &req) {
        data_loc_t loc(req.get_file_id(), req.get_offset());
        assert(!data_inline);
        if (req.payload_type == USER_COMPUTE
                || this->payload_type == USER_COMPUTE) {
            assert(req.payload_type == USER_COMPUTE
                    && this->payload_type == USER_COMPUTE);
            // TODO
        }
        else if (!req.is_extended_req()) {
            this->init(req.get_buf(), loc, req.get_size(),
                    req.get_access_method(), req.get_io(), req.get_node_id());
        }
        // Both requests have extensions.
        else if (this->is_extended_req()) {
            this->init(NULL, loc, 0, req.get_access_method(),
                    req.get_io(), req.get_node_id());
            this->get_extension()->init(*req.get_extension());
        }
        else {
            // The last case is that this request doesn't have an extension,
            // but the given request does.  This request can't keep all of
            // the information in the given request.
            assert(!this->is_extended_req() && req.is_extended_req());
        }
        copy_flags(req);
        this->user_data = req.user_data;
    }

    void init() {
        data_inline = 0;
        if (is_extended_req()) {
            io_req_extension *ext = get_extension();
            assert(ext);
            ext->init();
        }
        else {
            payload_type = BASIC_REQ;
            payload.buf_addr = NULL;
        }
        file_id = 0;
        offset = 0;
        high_prio = 0;
        sync = 0;
        node_id = MAX_NODE_ID;
        io = NULL;
        access_method = 0;
        set_int_buf_size(0);
        user_data = NULL;
    }

    void init(char *buf, const data_loc_t &loc, ssize_t size,
            int access_method, io_interface *io, int node_id);
    void init(const data_loc_t &loc, int access_method, io_interface *io,
            int node_id) {
        init(NULL, loc, 0, access_method, io, node_id);
    }

    int get_req_type() const {
        return payload_type;
    }

    io_req_extension *get_extension() const {
        assert(is_extended_req() && payload.ext);
        if (data_inline)
            return (io_req_extension *) payload.buf;
        else
            return payload.ext;
    }

    file_id_t get_file_id() const;

    /*
     * Test whether the request is a flush request.
     * The difference between a sync request and a flush request is that
     * a flush request isn't a valid request for accessing data.
     */
    bool is_flush() const {
        return sync && high_prio && (payload.buf_addr == NULL);
    }

    bool is_sync() const {
        return sync;
    }

    bool is_extended_req() const {
        return payload_type == EXT_REQ;
    }

    off_t get_offset() const {
        return offset;
    }

    void set_data_loc(const data_loc_t &loc) {
        this->file_id = loc.get_file_id();
        this->offset = loc.get_offset();
    }

    int get_access_method() const {
        return access_method & 0x1;
    }

    void set_io(io_interface *io) {
        this->io = io;
    }

    io_interface *get_io() const {
        return io;
    }

    int get_node_id() const {
        return node_id;
    }

    void set_node_id(int node_id) {
        assert(node_id <= MAX_NODE_ID);
        this->node_id = node_id;
    }

    bool is_discarded() const {
        return (discarded & 0x1) == 1;
    }

    void set_discarded(bool discarded) {
        this->discarded = discarded;
    }

    bool is_high_prio() const {
        return (high_prio & 0x1) == 1;
    }

    void set_high_prio(bool high_prio) {
        this->high_prio = high_prio;
    }

    bool is_low_latency() const {
        return (low_latency & 0x1) == 1;
    }

    void set_low_latency(bool low_latency) {
        this->low_latency = low_latency;
    }

    /*
     * The requested data is inside a single page on the disk.
     */
    bool within_1page() const {
        return get_offset() + get_size() <= ROUND_PAGE(get_offset()) + PAGE_SIZE;
    }

    bool contain_offset(off_t off) const {
        return get_offset() <= off && off < get_offset() + get_size();
    }

    /*
     * Test if the request overlaps with the specified range.
     */
    bool has_overlap(off_t off, ssize_t size) const {
        // The beginning of the range is inside the request,
        return (off >= this->get_offset() && off < this->get_offset()
                + this->get_size())
            // or the end of the range is inside the request,
            || (off + size >= this->get_offset()
                    && off + size < this->get_offset() + this->get_size())
            // or the request is completely inside the range.
            || (off <= this->get_offset()
                    && off + size >= this->get_offset() + this->get_size());
    }

    int get_overlap_size(thread_safe_page *pg) const;

    int get_num_covered_pages() const {
        off_t begin_pg = ROUND_PAGE(get_offset());
        off_t end_pg = ROUNDUP_PAGE(get_offset() + get_size());
        return (end_pg - begin_pg) / PAGE_SIZE;
    }

    bool inside_RAID_block(int block_size) const {
        int block_size_bytes = block_size * PAGE_SIZE;
        return ROUND(this->get_offset(), block_size_bytes)
            == ROUND(this->get_offset() + this->get_size() - 1, block_size_bytes);
    }

    void *get_user_data() const {
        return user_data;
    }

    void set_user_data(void *data) {
        this->user_data = data;
    }

    void *get_priv() const {
        return get_extension()->get_priv();
    }

    void set_priv(void *priv) {
        get_extension()->set_priv(priv);
    }

    bool is_empty() const {
        return get_extension()->get_num_bufs() == 0;
    }

    bool is_valid() const {
        return get_offset() != -1;
    }

    ssize_t get_size() const {
        if (!is_extended_req()) {
            return get_int_buf_size();
        }
        else {
            return get_extension()->get_size();
        }
    }

    user_compute *get_compute() const {
        assert(get_req_type() == io_request::USER_COMPUTE);
        return payload.compute;
    }

    /*
     * By default, we get the first buffer.  This makes sense
     * for a single-buffer request.
     */
    char *get_buf(int idx = 0) const {
        if (!is_extended_req()) {
            return (char *) payload.buf_addr;
        }
        else {
            return (char *) get_extension()->get_buf(idx).get_buf();
        }
    }

    thread_safe_page *get_page(int idx) const {
        return get_extension()->get_buf(idx).get_page();
    }

    void add_buf(char *buf, int size) {
        get_extension()->add_buf(buf, size, 0);
    }

    void add_page(thread_safe_page *p) {
        get_extension()->add_buf((char *) p, PAGE_SIZE, 1);
    }

    void add_io_buf(const io_buf &buf) {
        get_extension()->add_io_buf(buf);
    }

    void add_buf_front(char *buf, int size) {
        get_extension()->add_buf_front(buf, size, 0);
    }

    void add_page_front(thread_safe_page *p) {
        get_extension()->add_buf_front((char *) p, PAGE_SIZE, 1);
    }

    int get_num_bufs() const {
        if (is_extended_req())
            return get_extension()->get_num_bufs();
        else
            return 1;
    }

    int get_buf_size(int idx) const {
        if (!is_extended_req()) {
            assert(idx == 0);
            return get_int_buf_size();
        }
        else
            return get_extension()->get_buf(idx).get_size();
    }

    const io_buf &get_io_buf(int idx) const {
        return get_extension()->get_buf(idx);
    }

    const struct iovec get(int idx) const {
        struct iovec ret;
        io_req_extension *ext = get_extension();
        ret.iov_base = ext->get_buf(idx).get_buf();
        ret.iov_len = ext->get_buf(idx).get_size();
        return ret;
    }

    const int get_vec(struct iovec *vec, int num) const {
        num = min(get_num_bufs(), num);
        for (int i = 0; i < num; i++) {
            io_req_extension *ext = get_extension();
            vec[i].iov_base = ext->get_buf(i).get_buf();
            vec[i].iov_len = ext->get_buf(i).get_size();
        }
        return num;
    }

    bool is_data_inline() const {
        return data_inline == 1;
    }

    void set_timestamp() {
        get_extension()->set_timestamp();
    }

    struct timeval get_timestamp() {
        return get_extension()->get_timestamp();
    }

    /*
     * Extract a request from the input request.
     * The extracted request is within the range [off, off + size).
     */
    void extract(off_t off, int size, io_request &extracted) const {
        off_t req_off;
        char *req_buf;
        ssize_t req_size;
        assert(get_num_bufs() == 1);
        // We have to make sure the extracted range overlaps with
        // the input request.
        bool check = has_overlap(off, size);
        if (!check)
            fprintf(stderr, "req %lx, size: %lx, page off: %lx\n",
                    this->get_offset(), this->get_size(), off);
        assert(check);
        // The extracted request starts at the beginning of this request.
        if (off <= this->get_offset()) {
            req_off = this->get_offset();
            req_buf = this->get_buf();
        }
        else {
            req_off = off;
            /*
             * We can't be sure if the request buffer is aligned
             * with the page size.
             */
            req_buf = this->get_buf() + (off - this->get_offset());
        }
        req_size = min(off + size - req_off,
                this->get_offset() + this->get_size() - req_off);
        data_loc_t loc(this->get_file_id(), req_off);
        extracted.init(req_buf, loc, req_size, this->get_access_method(),
                this->get_io(), this->get_node_id());
    }

    /*
     * We need to serialize an io request to a buffer so it can be sent to
     * another thread.
     * @accept_inline: indicates whether the IO request can inline data
     * to the buffer.
     */
    int serialize(char *buf, int size, bool accept_inline) {
        int serialized_size;
        if (is_data_inline()) {
            assert(accept_inline);
            serialized_size = get_serialized_size();
            assert(serialized_size <= size);
            memcpy(buf, this, serialized_size);
        }
        else if (payload_type == EXT_REQ) {
            // We never serialize the io request extension to the message.
            serialized_size = sizeof(io_request);
            assert(serialized_size <= size);
            memcpy(buf, this, sizeof(*this));
        }
        else if (payload_type == BASIC_REQ) {
            // We only serialize the data buffer to the message for write
            // requests.  The size of the data buffer has to be small.
            if (get_size() > MAX_INLINE_SIZE || access_method == READ
                    || !accept_inline) {
                serialized_size = sizeof(io_request);
                assert(serialized_size <= size);
                memcpy(buf, this, sizeof(*this));
            }
            else {
                serialized_size = sizeof(io_request) - sizeof(this->payload)
                    + get_size();
                assert(serialized_size <= size);
                memcpy(buf, this, sizeof(*this));
                io_request *p = (io_request *) buf;
                p->data_inline = 1;
                memcpy(p->payload.buf, (char *) this->payload.buf_addr,
                        this->get_size());
            }
        }
        else {
            // The user compute object is always serialized to the message.
            serialized_size = sizeof(io_request);
            assert(serialized_size <= size && sizeof(io_request)
                    <= (unsigned) size);
            memcpy(buf, this, serialized_size);
        }
        return serialized_size;
    }

    /*
     * This method returns the size of an IO request after it is serialized.
     */
    int get_serialized_size() const {
        if (payload_type == EXT_REQ)
            return sizeof(io_request);
        else if (payload_type == BASIC_REQ && (get_size() > MAX_INLINE_SIZE
                    || access_method == READ))
            return sizeof(io_request);
        else if (payload_type == BASIC_REQ)
            return sizeof(io_request) - sizeof(this->payload) + get_size();
        else {
            return sizeof(io_request);
        }
    }

    /*
     * This method deserializes a request from the buffer.
     * If the request data is inlined in the buffer, instead of allocating
     * memory for the extra objects of the IO request, the extra objects
     * stay in the buffer and the created request points into the buffer.
     */
    static void deserialize(io_request &req, char *buf, int size) {
        assert((unsigned) size >= sizeof(io_request));
        io_request *p = (io_request *) buf;
        memcpy(&req, buf, sizeof(req));
        if (req.is_data_inline()) {
            assert(req.payload_type != EXT_REQ);
            switch(req.payload_type) {
                case BASIC_REQ:
                    req.payload.buf_addr = p->payload.buf;
                    break;
                case USER_COMPUTE:
                    req.payload.compute = (user_compute *) p->payload.buf;
                    break;
                default:
                    assert(0);
            }
            req.data_inline = 0;
        }
    }

    static io_request *deserialize(char *buf, int size) {
        assert((unsigned) size >= sizeof(io_request));
        io_request *ret = (io_request *) buf;
        assert(ret->get_serialized_size() <= size);
        return ret;
    }
};
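
/*
 * Illustrative sketch (added for documentation; not part of the original
 * header): issuing a basic 4KB read.  How the buffer, the file ID and the
 * io_interface are obtained is left as placeholders.
 *
 *     char *buf = ...;                  // 4KB buffer, ideally page-aligned
 *     data_loc_t loc(fid, 0);           // read the first page of file `fid`
 *     io_request req(buf, loc, PAGE_SIZE, READ, io);
 *
 * The request is then handed to an io_interface to be issued; it completes
 * asynchronously unless it was constructed with sync = true.
 */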

template<class req_type, class get_io_func>
class comp_req_io
{
    get_io_func get_io;

public:
    comp_req_io(get_io_func func) {
        this->get_io = func;
    }

    bool operator() (const req_type req1, const req_type req2) {
        return (long) get_io(req1) < (long) get_io(req2);
    }
};

typedef void (*req_process_func_t)(io_interface *io, io_request *reqs[], int num);

/*
 * Apply the same function to all requests that belong to the same IO
 * instance.  This turns out to be a common operation when delivering
 * requests to the upper layer.
 */
template<class req_type, class get_io_func, class process_req_func>
void process_reqs_on_io(req_type reqs[], int num,
        get_io_func func, process_req_func proc_func)
{
    comp_req_io<req_type, get_io_func> req_io_comparator(func);

    std::sort(reqs, reqs + num, req_io_comparator);
    io_interface *prev = func(reqs[0]);
    int begin_idx = 0;
    for (int end_idx = 1; end_idx < num; end_idx++) {
        if (func(reqs[end_idx]) != prev) {
            proc_func(prev, reqs + begin_idx, end_idx - begin_idx);
            begin_idx = end_idx;
            prev = func(reqs[end_idx]);
        }
    }
    assert(begin_idx < num);
    proc_func(prev, reqs + begin_idx, num - begin_idx);
}

static inline void process_reqs_on_io(io_request *reqs[],
        int num, req_process_func_t func)
{
    class get_io_func {
    public:
        io_interface *operator()(io_request *req) {
            return req->get_io();
        }
    } io_func;

    class process_req_func {
        req_process_func_t func;
    public:
        process_req_func(req_process_func_t func) {
            this->func = func;
        }

        void operator()(io_interface *io, io_request *reqs[], int num) {
            func(io, reqs, num);
        }
    } proc_func(func);

    process_reqs_on_io(reqs, num, io_func, proc_func);
}
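
/*
 * Illustrative sketch (added for documentation; not part of the original
 * header): deliver a batch of completed requests back to their issuing IO
 * instances, with one callback invocation per IO instance.  The callback
 * name is hypothetical.
 *
 *     void notify_one_io(io_interface *io, io_request *reqs[], int num) {
 *         // all `num` requests here were issued by the same `io`
 *     }
 *
 *     io_request *completed[16];
 *     int num_completed = ...;
 *     process_reqs_on_io(completed, num_completed, notify_one_io);
 */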

}

#endif