Flex  0.17.9
graph_db_session.h
Go to the documentation of this file.
1 
16 #ifndef GRAPHSCOPE_DATABASE_GRAPH_DB_SESSION_H_
17 #define GRAPHSCOPE_DATABASE_GRAPH_DB_SESSION_H_
18 
29 #include "flex/utils/result.h"
30 
31 namespace gs {
32 
33 class GraphDB;
34 class IWalWriter;
35 
37  public:
// Tag identifying how a query payload is encoded. parse_query_type() reads
// the tag from the last byte of its input and dispatches on it.
38  enum class InputFormat : uint8_t {
39  kCppEncoder = 0, // Second-to-last byte is the query id; the bytes before it are user-defined payload
40  kCypherJson = 1, // JSON format for cypher query; the query name is carried inside the JSON string
41  kCypherProtoAdhoc = 2, // Protobuf format for adhoc query
42  kCypherProtoProcedure = 3, // Protobuf format for procedure query; the query name is carried inside the protobuf message
43  kCypherString = 4, // parse_query_type() treats this exactly like kCypherProtoAdhoc (id = second-to-last byte, only the tag byte is stripped)
44  };
45 
// Class-wide constants.
// MAX_RETRY is not referenced anywhere in this header; presumably it bounds
// retries in the implementation file — confirm there.
46  static constexpr int32_t MAX_RETRY = 3;
// Size of the app_wrappers_, apps_ and app_metrics_ arrays: one slot per
// value a uint8_t query id can take.
47  static constexpr int32_t MAX_PLUGIN_NUM = 256; // 2^(sizeof(uint8_t)*8)
// String forms of the InputFormat tag values 0..3. No string constant for
// kCypherString (4) appears in this listing.
// NOTE(review): "\x00" starts with a NUL byte, so as a C string it is empty
// (strlen == 0) and std::string(kCppEncoderStr) yields "" rather than a
// one-byte tag — confirm that users pass an explicit length of 1.
48  static constexpr const char* kCppEncoderStr = "\x00";
49  static constexpr const char* kCypherJsonStr = "\x01";
50  static constexpr const char* kCypherProtoAdhocStr = "\x02";
51  static constexpr const char* kCypherProtoProcedureStr = "\x03";
// Constructor. The first line of its signature is missing from this listing;
// the member index gives the full parameter list as
// (GraphDB& db, Allocator& alloc, IWalWriter& logger,
//  const std::string& work_dir, int thread_id).
// db_, alloc_ and logger_ are reference members, so the objects passed in must
// outlive the session; work_dir is copied into work_dir_.
53  const std::string& work_dir, int thread_id)
54  : db_(db),
55  alloc_(alloc),
56  logger_(logger),
57  work_dir_(work_dir),
58  thread_id_(thread_id),
59  eval_duration_(0), // counters start at zero
60  query_num_(0) {
// std::array<AppBase*, N> does not zero its elements on default
// initialization, so every slot is cleared explicitly.
61  for (auto& app : apps_) {
62  app = nullptr;
63  }
64  }
66 
68 
70 
72 
74 
76 
78 
// Public operations and accessors. Only the declarations are visible here; the
// member index places every definition in graph_db_session.cc, so the notes
// below are limited to what the signatures and this header show.
79  bool BatchUpdate(UpdateBatch& batch);
80 
81  const MutablePropertyFragment& graph() const;
83  const GraphDB& db() const;
84 
85  const Schema& schema() const;
86 
// Presumably returns the vertex property column called col_name for the given
// vertex label — confirm in graph_db_session.cc.
87  std::shared_ptr<ColumnBase> get_vertex_property_column(
88  uint8_t label, const std::string& col_name) const;
89 
90  // Get vertex id column.
91  std::shared_ptr<RefColumnBase> get_vertex_id_column(uint8_t label) const;
92 
// NOTE(review): input presumably ends with the InputFormat tag byte that
// parse_query_type() inspects; that helper requires at least two bytes —
// confirm that Eval validates the size before calling it.
93  Result<std::vector<char>> Eval(const std::string& input);
94 
95  void GetAppInfo(Encoder& result);
96 
97  int SessionId() const;
98 
99  bool Compact();
100 
// Returns a double while the eval_duration_ member is std::atomic<int64_t>;
// the conversion and unit are defined in the .cc and not visible here.
101  double eval_duration() const;
102 
// NOTE(review): idx presumably indexes app_metrics_ (MAX_PLUGIN_NUM entries);
// bounds handling is not visible here.
103  const AppMetric& GetAppMetric(int idx) const;
104 
105  int64_t query_num() const;
106 
// NOTE(review): idx presumably indexes apps_ (MAX_PLUGIN_NUM entries, all
// nullptr after construction); whether nullptr can be returned is not visible
// here.
107  AppBase* GetApp(int idx);
108 
109  AppBase* GetApp(const std::string& name);
110 
111  private:
// Helpers that parse_query_type() delegates to for kCypherJson and
// kCypherProtoProcedure inputs respectively. The return-type lines are missing
// from this listing; the member index gives
// Result<std::pair<uint8_t, std::string_view>> for both. Each receives the
// whole input, including the trailing tag byte.
113  parse_query_type_from_cypher_json(const std::string_view& input);
115  parse_query_type_from_cypher_internal(const std::string_view& input);
// parse_query_type(const std::string& input): the line carrying the return
// type and function name is missing from this listing; the member index gives
// the return type as Result<std::pair<uint8_t, std::string_view>>.
//
// Reads the InputFormat tag from the last byte of input and returns the pair
// (query id, payload view), or an INVALID_ARGUMENT status for an unknown tag.
// The returned string_view points into input's buffer, so it must not outlive
// input.
//
// NOTE(review): there is no size check here. input.back() is undefined for an
// empty string and input[len - 2] needs len >= 2; presumably the caller
// guarantees input.size() >= 2 — confirm.
143  const std::string& input) {
144  const char* str_data = input.data();
145  VLOG(10) << "parse query type for " << input << " size: " << input.size();
146  char input_tag = input.back(); // format tag = last byte; requires non-empty input
147  VLOG(10) << "input tag: " << static_cast<int>(input_tag);
148  size_t len = input.size();
149  if (input_tag == static_cast<uint8_t>(InputFormat::kCppEncoder)) {
150  // For cpp encoder, the query id is the second last byte; all other
151  // bytes are user-defined payload. Both the id and the tag byte are stripped.
152  return std::make_pair((uint8_t) input[len - 2],
153  std::string_view(str_data, len - 2));
154  } else if (input_tag ==
155  static_cast<uint8_t>(InputFormat::kCypherProtoAdhoc)) {
156  // For cypher internal adhoc, the query id is the
157  // second last byte, which is fixed to 255, and other bytes are a string
158  // representing the path to generated dynamic lib.
// Unlike the cpp-encoder branch, only the tag byte is stripped here: the
// returned view still contains the query-id byte.
159  return std::make_pair((uint8_t) input[len - 2],
160  std::string_view(str_data, len - 1));
161  } else if (input_tag == static_cast<uint8_t>(InputFormat::kCypherJson)) {
162  // For cypherJson there is no query-id provided. The query name is
163  // provided in the json string.
164  // We don't discard the last byte, since we need it to determine the input
165  // format when deserializing the input arguments in deserialize() function
166  std::string_view str_view(input.data(), len);
167  return parse_query_type_from_cypher_json(str_view);
168  } else if (input_tag ==
169  static_cast<uint8_t>(InputFormat::kCypherProtoProcedure)) {
170  // For cypher internal procedure, the query_name is
171  // provided in the protobuf message.
172  // Same as cypherJson, we don't discard the last byte.
173  std::string_view str_view(input.data(), len);
174  return parse_query_type_from_cypher_internal(str_view);
175  } else if (input_tag == static_cast<uint8_t>(InputFormat::kCypherString)) {
// Same shape as the adhoc branch: id = second last byte, tag byte stripped.
176  return std::make_pair((uint8_t) input[len - 2],
177  std::string_view(str_data, len - 1));
178  } else {
// NOTE(review): the line that opens this return expression (listing line 179)
// is missing from this listing. input_tag is a char, which promotes to int in
// std::to_string, so the message shows the numeric tag value.
180  gs::Status(StatusCode::INVALID_ARGUMENT,
181  "Invalid input tag: " + std::to_string(input_tag)));
182  }
183  }
// Data members visible in this listing (the declarations of db_, alloc_,
// logger_ and thread_id_ are on lines missing from it).
187  std::string work_dir_; // copy of the constructor's work_dir argument
189 
// Three parallel tables with MAX_PLUGIN_NUM slots each — presumably indexed by
// the uint8_t query id returned from parse_query_type(); confirm in the .cc.
190  std::array<AppWrapper, MAX_PLUGIN_NUM> app_wrappers_;
191  std::array<AppBase*, MAX_PLUGIN_NUM> apps_; // every slot is set to nullptr by the constructor
192  std::array<AppMetric, MAX_PLUGIN_NUM> app_metrics_;
193 
// Counters, both initialized to 0 by the constructor.
194  std::atomic<int64_t> eval_duration_;
195  std::atomic<int64_t> query_num_;
196 };
197 
198 } // namespace gs
199 
200 #endif // GRAPHSCOPE_DATABASE_GRAPH_DB_SESSION_H_
gs::Result
Definition: result.h:62
gs::GraphDBSession::apps_
std::array< AppBase *, MAX_PLUGIN_NUM > apps_
Definition: graph_db_session.h:191
gs::GraphDBSession::kCppEncoderStr
static constexpr const char * kCppEncoderStr
Definition: graph_db_session.h:48
gs::GraphDBSession::Eval
Result< std::vector< char > > Eval(const std::string &input)
Definition: graph_db_session.cc:110
gs::GraphDBSession::kCypherProtoAdhocStr
static constexpr const char * kCypherProtoAdhocStr
Definition: graph_db_session.h:50
gs::GraphDBSession::GetAppInfo
void GetAppInfo(Encoder &result)
Definition: graph_db_session.cc:189
single_vertex_insert_transaction.h
gs::GraphDBSession::GetSingleEdgeInsertTransaction
SingleEdgeInsertTransaction GetSingleEdgeInsertTransaction()
Definition: graph_db_session.cc:48
gs::GraphDBSession::GetAppMetric
const AppMetric & GetAppMetric(int idx) const
Definition: graph_db_session.cc:311
gs::GraphDBSession::db_
GraphDB & db_
Definition: graph_db_session.h:184
gs::CompactTransaction
Definition: compact_transaction.h:28
gs::GraphDBSession::InputFormat::kCppEncoder
@ kCppEncoder
gs::GraphDBSession::alloc_
Allocator & alloc_
Definition: graph_db_session.h:185
gs::GraphDBSession::GetApp
AppBase * GetApp(int idx)
Definition: graph_db_session.cc:227
column.h
gs::GraphDBSession::logger_
IWalWriter & logger_
Definition: graph_db_session.h:186
gs::GraphDBSession::parse_query_type_from_cypher_internal
Result< std::pair< uint8_t, std::string_view > > parse_query_type_from_cypher_internal(const std::string_view &input)
Definition: graph_db_session.cc:281
gs::GraphDBSession::InputFormat
InputFormat
Definition: graph_db_session.h:38
gs::InsertTransaction
Definition: insert_transaction.h:34
gs::GraphDBSession::db
const GraphDB & db() const
Definition: graph_db_session.cc:68
std::to_string
std::string to_string(const gs::flex::interactive::Code &status)
Definition: result.h:166
gs
Definition: adj_list.h:23
single_edge_insert_transaction.h
gs::GraphDBSession::GetUpdateTransaction
UpdateTransaction GetUpdateTransaction()
Definition: graph_db_session.cc:54
gs::GraphDBSession::InputFormat::kCypherString
@ kCypherString
insert_transaction.h
gs::GraphDBSession::get_vertex_id_column
std::shared_ptr< RefColumnBase > get_vertex_id_column(uint8_t label) const
Definition: graph_db_session.cc:79
gs::GraphDBSession::GetReadTransaction
ReadTransaction GetReadTransaction() const
Definition: graph_db_session.cc:30
gs::Encoder
Definition: app_utils.h:25
gs::GraphDBSession::query_num_
std::atomic< int64_t > query_num_
Definition: graph_db_session.h:195
gs::SingleEdgeInsertTransaction
Definition: single_edge_insert_transaction.h:30
mutable_property_fragment.h
gs::ArenaAllocator
Definition: allocators.h:29
gs::GraphDBSession::Compact
bool Compact()
Definition: graph_db_session.cc:198
gs::MutablePropertyFragment
Definition: mutable_property_fragment.h:37
gs::GraphDBSession::~GraphDBSession
~GraphDBSession()
Definition: graph_db_session.h:65
gs::GraphDBSession::graph
const MutablePropertyFragment & graph() const
Definition: graph_db_session.cc:64
gs::GraphDBSession::parse_query_type_from_cypher_json
Result< std::pair< uint8_t, std::string_view > > parse_query_type_from_cypher_json(const std::string_view &input)
Definition: graph_db_session.cc:254
update_transaction.h
gs::AppBase
Definition: app_base.h:35
gs::GraphDBSession::eval_duration_
std::atomic< int64_t > eval_duration_
Definition: graph_db_session.h:194
gs::GraphDBSession::BatchUpdate
bool BatchUpdate(UpdateBatch &batch)
Definition: graph_db_session.cc:60
gs::Schema
Definition: schema.h:29
gs::GraphDBSession::GraphDBSession
GraphDBSession(GraphDB &db, Allocator &alloc, IWalWriter &logger, const std::string &work_dir, int thread_id)
Definition: graph_db_session.h:52
gs::GraphDBSession
Definition: graph_db_session.h:36
gs::GraphDB
Definition: graph_db.h:77
gs::GraphDBSession::GetSingleVertexInsertTransaction
SingleVertexInsertTransaction GetSingleVertexInsertTransaction()
Definition: graph_db_session.cc:42
gs::GraphDBSession::thread_id_
int thread_id_
Definition: graph_db_session.h:188
gs::GraphDBSession::InputFormat::kCypherJson
@ kCypherJson
gs::GraphDBSession::app_metrics_
std::array< AppMetric, MAX_PLUGIN_NUM > app_metrics_
Definition: graph_db_session.h:192
gs::Status
Definition: result.h:32
gs::ReadTransaction
Definition: read_transaction.h:375
gs::SingleVertexInsertTransaction
Definition: single_vertex_insert_transaction.h:30
result.h
gs::GraphDBSession::query_num
int64_t query_num() const
Definition: graph_db_session.cc:214
gs::GraphDBSession::app_wrappers_
std::array< AppWrapper, MAX_PLUGIN_NUM > app_wrappers_
Definition: graph_db_session.h:190
gs::GraphDBSession::InputFormat::kCypherProtoProcedure
@ kCypherProtoProcedure
gs::GraphDBSession::kCypherProtoProcedureStr
static constexpr const char * kCypherProtoProcedureStr
Definition: graph_db_session.h:51
gs::GraphDBSession::MAX_PLUGIN_NUM
static constexpr int32_t MAX_PLUGIN_NUM
Definition: graph_db_session.h:47
gs::GraphDBSession::GetInsertTransaction
InsertTransaction GetInsertTransaction()
Definition: graph_db_session.cc:35
gs::GraphDBSession::parse_query_type
Result< std::pair< uint8_t, std::string_view > > parse_query_type(const std::string &input)
Parse the input format of the query. There are five formats (see InputFormat): 0. CppEncoder: This format will be used ...
Definition: graph_db_session.h:142
gs::GraphDBSession::MAX_RETRY
static constexpr int32_t MAX_RETRY
Definition: graph_db_session.h:46
compact_transaction.h
gs::GraphDBSession::work_dir_
std::string work_dir_
Definition: graph_db_session.h:187
app_base.h
gs::GraphDBSession::schema
const Schema & schema() const
Definition: graph_db_session.cc:72
gs::UpdateTransaction
Definition: update_transaction.h:39
transaction_utils.h
gs::GraphDBSession::SessionId
int SessionId() const
Definition: graph_db_session.cc:191
gs::GraphDBSession::eval_duration
double eval_duration() const
Definition: graph_db_session.cc:210
read_transaction.h
gs::GraphDBSession::GetCompactTransaction
CompactTransaction GetCompactTransaction()
Definition: graph_db_session.cc:193
gs::GraphDBSession::InputFormat::kCypherProtoAdhoc
@ kCypherProtoAdhoc
gs::GraphDBSession::kCypherJsonStr
static constexpr const char * kCypherJsonStr
Definition: graph_db_session.h:49
gs::IWalWriter
Definition: wal.h:57
gs::AppMetric
Definition: app_base.h:139
gs::UpdateBatch
Definition: transaction_utils.h:113
gs::GraphDBSession::get_vertex_property_column
std::shared_ptr< ColumnBase > get_vertex_property_column(uint8_t label, const std::string &col_name) const
Definition: graph_db_session.cc:74