FlashGraph-ng
A new frontier in large-scale graph analysis and data mining
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Pages
message_processor.h
1 #ifndef __MESSAGE_PROCESSOR_H__
2 #define __MESSAGE_PROCESSOR_H__
3 
4 /*
5  * Copyright 2014 Open Connectome Project (http://openconnecto.me)
6  * Written by Da Zheng (zhengda1936@gmail.com)
7  *
8  * This file is part of FlashGraph.
9  *
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  * http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  */
22 
23 #include <memory>
24 
25 #include "container.h"
26 
27 #include "messaging.h"
28 #include "vertex.h"
29 
30 namespace fg
31 {
32 
    // Forward declarations of types this header refers to by name only:
    // graph_engine and worker_thread are held by reference, steal_state_t is
    // held through a std::unique_ptr, and compute_vertex_pointer appears as a
    // parameter type. compute_vertex is not referenced again in this header.
33 class graph_engine;
34 class worker_thread;
35 class steal_state_t;
36 class compute_vertex;
37 class compute_vertex_pointer;
38 
39 /*
40  * This class processes the messages sent to the owner thread.
41  * The complication is that some messages can't be handled at the time
42  * we first try to process them. Therefore, an extra buffer keeps those
43  * messages so that they can be processed later.
44  */
45 class message_processor
46 {
    // Data members are initialized in the order they are declared below,
    // regardless of the order used in the constructor's initializer list.
    // The two reference members make instances non-copy-assignable.
    // `graph` is presumably the engine this processor runs in -- TODO confirm.
47  graph_engine &graph;
    // The worker thread that owns this processor (the "owner thread" above).
48  worker_thread &owner;
49 
    // Shared handle to a slab allocator; presumably it backs the message
    // buffers -- verify against the implementation file.
50  std::shared_ptr<slab_allocator> msg_alloc;
51 
52  // The queue of messages sent from other threads.
53  msg_queue msg_q;
54 
55  // The thread state of handling stealing vertices.
    // NOTE(review): steal_state_t is only forward-declared in this header and
    // no destructor is declared here, so code that destroys a
    // message_processor appears to need the complete type in scope -- confirm.
56  std::unique_ptr<steal_state_t> steal_state;
57 
58  // This is a message buffer to keep all messages whose destination vertices
59  // have been stolen by other threads.
60  fifo_queue<message> stolenv_msgs;
61 
    // Private helpers, defined outside this header. Judging by the names and
    // the class comment, they presumably put a message into stolenv_msgs so
    // that it can be processed later -- TODO confirm.
62  void buf_msg(vertex_message &msg);
63  void buf_mmsg(local_vid_t id, multicast_message &mmsg);
64 
    // Private helpers that handle one message / one multicast message.
    // `check_steal` presumably selects whether to check if the destination
    // vertex has been stolen -- verify against the implementation.
65  void process_msg(message &msg, bool check_steal);
66  void process_multicast_msg(multicast_message &mmsg, bool check_steal);
67 
68 public:
    // Takes the graph engine and the owner thread by reference and a
    // shared_ptr to the slab allocator by value. Note that the last
    // parameter has the same name as the `msg_alloc` member, so inside the
    // constructor the unqualified name refers to the parameter.
69  message_processor(graph_engine &_graph, worker_thread &_owner,
70  std::shared_ptr<slab_allocator> msg_alloc);
71 
    // Public entry point for message processing; defined outside this
    // header. Presumably handles the messages pending in msg_q -- TODO confirm.
72  void process_msgs();
73 
    // Vertex-stealing hooks, defined outside this header. Each takes an array
    // and `num`, presumably the number of elements in it -- TODO confirm.
74  void steal_vertices(compute_vertex_pointer vertices[], int num);
75  void return_vertices(vertex_id_t ids[], int num);
76 
    // Returns the incoming message queue by non-const reference, so callers
    // can modify the queue directly.
77  msg_queue &get_msg_queue() {
78  return msg_q;
79  }
80 
    // Defined outside this header; which state is reset is not visible here.
81  void reset();
82 };
83 
84 }
85 
86 #endif