Flex  0.17.9
basic_fragment_loader.h
Go to the documentation of this file.
1 
16 #ifndef STORAGES_RT_MUTABLE_GRAPH_LOADER_BASIC_FRAGMENT_LOADER_H_
17 #define STORAGES_RT_MUTABLE_GRAPH_LOADER_BASIC_FRAGMENT_LOADER_H_
18 
22 
23 namespace gs {
24 
25 template <typename EDATA_T>
27  PropertyType edge_property) {
28  if (es == EdgeStrategy::kSingle) {
29  return new SingleMutableCsr<EDATA_T>(edge_property);
30  } else if (es == EdgeStrategy::kMultiple) {
31  return new MutableCsr<EDATA_T>(edge_property);
32  } else if (es == EdgeStrategy::kNone) {
33  return new EmptyCsr<EDATA_T>();
34  }
35  LOG(FATAL) << "not support edge strategy or edge data type";
36  return nullptr;
37 }
38 
// Progress marker recorded while loading a vertex label or an edge triplet.
// The numeric values are spelled out explicitly; the underlying type is the
// same `int` an unannotated `enum class` would get.
enum class LoadingStatus : int {
  kLoading = 0,
  kLoaded = 1,
  kCommited = 2,  // spelling kept as-is: it is part of the public API
  kUnknown = 3,
};

// Stream insertion / extraction for LoadingStatus. Only declared here; the
// definitions live out of line (basic_fragment_loader.cc).
std::ostream& operator<<(std::ostream& os, const LoadingStatus& status);
std::istream& operator>>(std::istream& is, LoadingStatus& status);
50 
51 // FragmentLoader should use this BasicFragmentLoader to construct
52 // mutable_csr_fragment.
54  public:
55  BasicFragmentLoader(const Schema& schema, const std::string& prefix);
56 
57  void LoadFragment();
58 
59  inline void SetVertexProperty(label_t v_label, size_t col_ind, vid_t vid,
60  Any&& prop) {
61  auto& table = vertex_data_[v_label];
62  auto dst_columns = table.column_ptrs();
63  CHECK(col_ind < dst_columns.size());
64  dst_columns[col_ind]->set_any(vid, prop);
65  }
// FinishAddingVertex: finalises vertex label `v_label` (presumably once all
// of its vertices have been read — confirm at the call sites). Two variants
// exist, selected at build time by USE_PTHASH. After building the id index,
// both variants resize the label's property table vertex_data_[v_label] to
// lf_indexers_[v_label].size() rows and dump it under snapshot_dir(work_dir_, 0).
// NOTE(review): this text is a rendered documentation listing, not the raw
// header. Source lines 68, 72, 78, 80-81 and 86 (default variant) and 94,
// 97-98 and 103 (PTHASH variant) are missing here. Per the symbol index,
// line 68 is `void FinishAddingVertex(label_t v_label,`. Lines 72/94
// presumably complete `filename` via vertex_map_prefix(...). Confirm against
// the real header before relying on this listing.
66 #ifndef USE_PTHASH
// Default variant: converts an in-memory IdIndexer via build_lf_indexer.
67  template <typename KEY_T>
69  const IdIndexer<KEY_T, vid_t>& indexer) {
70  CHECK(v_label < vertex_label_num_);
71  std::string filename =
// The index key type comes from the label's first primary key.
// NOTE(review): primary_keys[0] is read without checking the vector is
// non-empty, and `auto` copies the vector that get_vertex_primary_key
// returns by const reference (`const auto&` would avoid the copy).
73  auto primary_keys = schema_.get_vertex_primary_key(v_label);
74  auto type = std::get<0>(primary_keys[0]);
75 
// Index files are named LFIndexer<vid_t>::prefix() + "_" + filename; the
// middle arguments (source line 78) are missing from this listing.
76  build_lf_indexer<KEY_T, vid_t>(
77  indexer, LFIndexer<vid_t>::prefix() + "_" + filename,
79  type);
82  auto& v_data = vertex_data_[v_label];
83  auto label_name = schema_.get_vertex_label_name(v_label);
// One table row per indexed vertex, then persist the table into snapshot 0.
84  v_data.resize(lf_indexers_[v_label].size());
85  v_data.dump(vertex_table_prefix(label_name), snapshot_dir(work_dir_, 0));
87  }
88 #else
// PTHASH variant: the builder's finish() receives the target name
// PTIndexer<vid_t>::prefix() + "_" + filename, snapshot 0's directory and
// lf_indexers_[v_label] (presumably the output indexer — confirm).
89  template <typename KEY_T>
90  void FinishAddingVertex(label_t v_label,
91  PTIndexerBuilder<KEY_T, vid_t>& indexer_builder) {
92  CHECK(v_label < vertex_label_num_);
93  std::string filename =
95  indexer_builder.finish(PTIndexer<vid_t>::prefix() + "_" + filename,
96  snapshot_dir(work_dir_, 0), lf_indexers_[v_label]);
99  auto& v_data = vertex_data_[v_label];
100  auto label_name = schema_.get_vertex_label_name(v_label);
101  v_data.resize(lf_indexers_[v_label].size());
102  v_data.dump(vertex_table_prefix(label_name), snapshot_dir(work_dir_, 0));
104  }
105 #endif
106 
// AddNoPropEdgeBatch: creates the DualCsr for the (src_label_id,
// dst_label_id, edge_label_id) triplet with `new` and stores it in
// dual_csr_list_. Its in/out CSR views are stored in ie_/oe_. It is then
// BatchInit-ed under tmp_dir(work_dir_) with empty ({}) degree lists.
// NOTE(review): source lines 117 and 119 are missing from this rendered
// listing. They presumably define `oe_strategy` / `ie_strategy` via
// Schema::get_outgoing_edge_strategy / get_incoming_edge_strategy, which is
// what the continuation lines 118/120 suggest. Confirm against the header.
107  template <typename EDATA_T>
108  void AddNoPropEdgeBatch(label_t src_label_id, label_t dst_label_id,
109  label_t edge_label_id) {
// Flattened (src, dst, edge) triplet index, shared with PutEdges.
110  size_t index = src_label_id * vertex_label_num_ * edge_label_num_ +
111  dst_label_id * edge_label_num_ + edge_label_id;
// The triplet must not have been registered already.
112  CHECK(ie_[index] == NULL);
113  CHECK(oe_[index] == NULL);
114  auto src_label_name = schema_.get_vertex_label_name(src_label_id);
115  auto dst_label_name = schema_.get_vertex_label_name(dst_label_id);
116  auto edge_label_name = schema_.get_edge_label_name(edge_label_id);
118  src_label_name, dst_label_name, edge_label_name);
120  src_label_name, dst_label_name, edge_label_name);
121  if constexpr (std::is_same_v<EDATA_T, std::string_view>) {
// String edge data: the max length is the first property's varchar limit,
// or PropertyType::STRING_DEFAULT_MAX_LENGTH if it is not a varchar.
// NOTE(review): prop[0] is indexed without checking that the triplet
// declares at least one edge property.
122  const auto& prop = schema_.get_edge_properties(src_label_id, dst_label_id,
123  edge_label_id);
124 
125  size_t max_length = PropertyType::STRING_DEFAULT_MAX_LENGTH;
126  if (prop[0].IsVarchar()) {
127  max_length = prop[0].additional_type_info.max_length;
128  }
129  dual_csr_list_[index] =
130  new DualCsr<std::string_view>(oe_strategy, ie_strategy, max_length);
131  } else {
// Other edge data types also pass the schema's per-direction mutability.
132  bool oe_mutable = schema_.outgoing_edge_mutable(
133  src_label_name, dst_label_name, edge_label_name);
134  bool ie_mutable = schema_.incoming_edge_mutable(
135  src_label_name, dst_label_name, edge_label_name);
136  dual_csr_list_[index] = new DualCsr<EDATA_T>(oe_strategy, ie_strategy,
137  oe_mutable, ie_mutable);
138  }
139  ie_[index] = dual_csr_list_[index]->GetInCsr();
140  oe_[index] = dual_csr_list_[index]->GetOutCsr();
// Empty degree lists: this triplet gets no edges from this call.
141  dual_csr_list_[index]->BatchInit(
142  oe_prefix(src_label_name, dst_label_name, edge_label_name),
143  ie_prefix(src_label_name, dst_label_name, edge_label_name),
144  edata_prefix(src_label_name, dst_label_name, edge_label_name),
145  tmp_dir(work_dir_), {}, {});
146  }
147 
148  template <typename EDATA_T>
149  static decltype(auto) get_casted_dual_csr(DualCsrBase* dual_csr) {
150  if constexpr (std::is_same_v<EDATA_T, RecordView>) {
151  auto casted_dual_csr = dynamic_cast<DualCsr<RecordView>*>(dual_csr);
152  CHECK(casted_dual_csr != NULL);
153  return casted_dual_csr;
154  } else {
155  auto casted_dual_csr = dynamic_cast<DualCsr<EDATA_T>*>(dual_csr);
156  CHECK(casted_dual_csr != NULL);
157  return casted_dual_csr;
158  }
159  }
160  template <typename EDATA_T, typename VECTOR_T>
161  void PutEdges(label_t src_label_id, label_t dst_label_id,
162  label_t edge_label_id, const std::vector<VECTOR_T>& edges_vec,
163  const std::vector<int32_t>& ie_degree,
164  const std::vector<int32_t>& oe_degree, bool build_csr_in_mem) {
165  size_t index = src_label_id * vertex_label_num_ * edge_label_num_ +
166  dst_label_id * edge_label_num_ + edge_label_id;
167  auto dual_csr = dual_csr_list_[index];
168  CHECK(dual_csr != NULL);
169  auto casted_dual_csr = get_casted_dual_csr<EDATA_T>(dual_csr);
170  CHECK(casted_dual_csr != NULL);
171  auto& src_indexer = lf_indexers_[src_label_id];
172  auto& dst_indexer = lf_indexers_[dst_label_id];
173  auto src_label_name = schema_.get_vertex_label_name(src_label_id);
174  auto dst_label_name = schema_.get_vertex_label_name(dst_label_id);
175  auto edge_label_name = schema_.get_edge_label_name(edge_label_id);
176 
177  auto INVALID_VID = std::numeric_limits<vid_t>::max();
178  std::atomic<size_t> edge_count(0);
179  if constexpr (std::is_same_v<EDATA_T, std::string_view>) {
180  CHECK(ie_degree.size() == dst_indexer.size());
181  CHECK(oe_degree.size() == src_indexer.size());
182  if (build_csr_in_mem) {
183  dual_csr->BatchInitInMemory(
184  edata_prefix(src_label_name, dst_label_name, edge_label_name),
185  tmp_dir(work_dir_), oe_degree, ie_degree);
186  } else {
187  dual_csr->BatchInit(
188  oe_prefix(src_label_name, dst_label_name, edge_label_name),
189  ie_prefix(src_label_name, dst_label_name, edge_label_name),
190  edata_prefix(src_label_name, dst_label_name, edge_label_name),
191  tmp_dir(work_dir_), oe_degree, ie_degree);
192  }
193  std::vector<std::thread> work_threads;
194  for (size_t i = 0; i < edges_vec.size(); ++i) {
195  work_threads.emplace_back(
196  [&](int idx) {
197  edge_count.fetch_add(edges_vec[idx].size());
198  for (auto& edge : edges_vec[idx]) {
199  if (std::get<1>(edge) == INVALID_VID ||
200  std::get<0>(edge) == INVALID_VID) {
201  VLOG(10) << "Skip invalid edge:" << std::get<0>(edge) << "->"
202  << std::get<1>(edge);
203  continue;
204  }
205  casted_dual_csr->BatchPutEdge(
206  std::get<0>(edge), std::get<1>(edge), std::get<2>(edge));
207  }
208  },
209  i);
210  }
211  for (auto& t : work_threads) {
212  t.join();
213  }
214 
215  append_edge_loading_progress(src_label_name, dst_label_name,
216  edge_label_name, LoadingStatus::kLoaded);
217  if (schema_.get_sort_on_compaction(src_label_name, dst_label_name,
218  edge_label_name)) {
219  dual_csr->SortByEdgeData(1);
220  }
221  dual_csr->Dump(
222  oe_prefix(src_label_name, dst_label_name, edge_label_name),
223  ie_prefix(src_label_name, dst_label_name, edge_label_name),
224  edata_prefix(src_label_name, dst_label_name, edge_label_name),
225  snapshot_dir(work_dir_, 0));
226  } else {
227  CHECK(ie_degree.size() == dst_indexer.size());
228  CHECK(oe_degree.size() == src_indexer.size());
229 
230  if (build_csr_in_mem) {
231  dual_csr->BatchInitInMemory(
232  edata_prefix(src_label_name, dst_label_name, edge_label_name),
233  tmp_dir(work_dir_), oe_degree, ie_degree);
234  } else {
235  dual_csr->BatchInit(
236  oe_prefix(src_label_name, dst_label_name, edge_label_name),
237  ie_prefix(src_label_name, dst_label_name, edge_label_name),
238  edata_prefix(src_label_name, dst_label_name, edge_label_name),
239  tmp_dir(work_dir_), oe_degree, ie_degree);
240  }
241 
242  std::vector<std::thread> work_threads;
243  for (size_t i = 0; i < edges_vec.size(); ++i) {
244  work_threads.emplace_back(
245  [&](int idx) {
246  edge_count.fetch_add(edges_vec[idx].size());
247  for (auto& edge : edges_vec[idx]) {
248  if (std::get<1>(edge) == INVALID_VID ||
249  std::get<0>(edge) == INVALID_VID) {
250  VLOG(10) << "Skip invalid edge:" << std::get<0>(edge) << "->"
251  << std::get<1>(edge);
252  continue;
253  }
254  casted_dual_csr->BatchPutEdge(
255  std::get<0>(edge), std::get<1>(edge), std::get<2>(edge));
256  }
257  },
258  i);
259  }
260  for (auto& t : work_threads) {
261  t.join();
262  }
263  append_edge_loading_progress(src_label_name, dst_label_name,
264  edge_label_name, LoadingStatus::kLoaded);
265  if (schema_.get_sort_on_compaction(src_label_name, dst_label_name,
266  edge_label_name)) {
267  dual_csr->SortByEdgeData(1);
268  }
269 
270  dual_csr->Dump(
271  oe_prefix(src_label_name, dst_label_name, edge_label_name),
272  ie_prefix(src_label_name, dst_label_name, edge_label_name),
273  edata_prefix(src_label_name, dst_label_name, edge_label_name),
274  snapshot_dir(work_dir_, 0));
275  }
276  append_edge_loading_progress(src_label_name, dst_label_name,
277  edge_label_name, LoadingStatus::kCommited);
278  VLOG(10) << "Finish adding edge batch of size: " << edge_count.load();
279  }
280 
281  Table& GetVertexTable(size_t ind) {
282  CHECK(ind < vertex_data_.size());
283  return vertex_data_[ind];
284  }
285 
286  // get lf_indexer
287  const IndexerType& GetLFIndexer(label_t v_label) const;
288  IndexerType& GetLFIndexer(label_t v_label);
289 
290  const std::string& work_dir() const { return work_dir_; }
291 
292  void set_csr(label_t src_label_id, label_t dst_label_id,
293  label_t edge_label_id, DualCsrBase* dual_csr);
294 
295  DualCsrBase* get_csr(label_t src_label_id, label_t dst_label_id,
296  label_t edge_label_id);
297 
298  void init_edge_table(label_t src_label_id, label_t dst_label_id,
299  label_t edge_label_id);
300 
301  private:
// append_*_loading_progress record a LoadingStatus for a vertex label or an
// edge triplet (PutEdges records kLoaded, then kCommited). Both are defined
// in basic_fragment_loader.cc.
302  // create status files for each vertex label and edge triplet pair.
303  void append_vertex_loading_progress(const std::string& label_name,
304  LoadingStatus status);
305 
306  void append_edge_loading_progress(const std::string& src_label_name,
307  const std::string& dst_label_name,
308  const std::string& edge_label_name,
309  LoadingStatus status);
// NOTE(review): source lines 310, 314 and 321 are missing from this
// rendered listing. Per the symbol index:
//   - line 314 declares `size_t vertex_label_num_, edge_label_num_`;
//   - line 321 declares `std::mutex loading_progress_mutex_`;
//   - line 310 is presumably the `init_loading_status_file()` declaration
//     (its definition is at basic_fragment_loader.cc:99) — confirm.
311  void init_vertex_data();
// Held by reference: the Schema must outlive this loader.
312  const Schema& schema_;
313  std::string work_dir_;
// One indexer / property table per vertex label.
315  std::vector<IndexerType> lf_indexers_;
// Raw, non-owning CSR views and the DualCsr objects created with `new` in
// AddNoPropEdgeBatch. All are indexed by the flattened (src, dst, edge)
// triplet index.
316  std::vector<CsrBase*> ie_, oe_;
317  std::vector<DualCsrBase*> dual_csr_list_;
318  std::vector<Table> vertex_data_;
319 
320  // loading progress related
322 };
323 } // namespace gs
324 
325 #endif // STORAGES_RT_MUTABLE_GRAPH_LOADER_BASIC_FRAGMENT_LOADER_H_
gs::Schema::get_sort_on_compaction
bool get_sort_on_compaction(const std::string &src_label, const std::string &dst_label, const std::string &label) const
Definition: schema.cc:347
gs::oe_prefix
std::string oe_prefix(const std::string &src_label, const std::string &dst_label, const std::string edge_label)
Definition: file_names.h:245
gs::Any
Definition: types.h:395
gs::BasicFragmentLoader::schema_
const Schema & schema_
Definition: basic_fragment_loader.h:312
gs::BasicFragmentLoader::loading_progress_mutex_
std::mutex loading_progress_mutex_
Definition: basic_fragment_loader.h:321
gs::Schema::outgoing_edge_mutable
bool outgoing_edge_mutable(const std::string &src_label, const std::string &dst_label, const std::string &label) const
Definition: schema.cc:327
gs::EdgeStrategy::kMultiple
@ kMultiple
gs::BasicFragmentLoader::LoadFragment
void LoadFragment()
Definition: basic_fragment_loader.cc:135
gs::BasicFragmentLoader::init_vertex_data
void init_vertex_data()
Definition: basic_fragment_loader.cc:121
gs::Schema::get_outgoing_edge_strategy
EdgeStrategy get_outgoing_edge_strategy(const std::string &src_label, const std::string &dst_label, const std::string &label) const
Definition: schema.cc:307
gs::BasicFragmentLoader::append_vertex_loading_progress
void append_vertex_loading_progress(const std::string &label_name, LoadingStatus status)
Definition: basic_fragment_loader.cc:72
schema.h
gs::vertex_table_prefix
std::string vertex_table_prefix(const std::string &label)
Definition: file_names.h:256
file_names.h
gs::edata_prefix
std::string edata_prefix(const std::string &src_label, const std::string &dst_label, const std::string &edge_label)
Definition: file_names.h:251
gs::vid_t
uint32_t vid_t
Definition: types.h:31
gs::Schema::get_incoming_edge_strategy
EdgeStrategy get_incoming_edge_strategy(const std::string &src_label, const std::string &dst_label, const std::string &label) const
Definition: schema.cc:317
gs::BasicFragmentLoader::set_csr
void set_csr(label_t src_label_id, label_t dst_label_id, label_t edge_label_id, DualCsrBase *dual_csr)
Definition: basic_fragment_loader.cc:157
gs::BasicFragmentLoader::lf_indexers_
std::vector< IndexerType > lf_indexers_
Definition: basic_fragment_loader.h:315
gs::EdgeStrategy::kNone
@ kNone
gs
Definition: adj_list.h:23
gs::BasicFragmentLoader::ie_
std::vector< CsrBase * > ie_
Definition: basic_fragment_loader.h:316
gs::vertex_map_prefix
std::string vertex_map_prefix(const std::string &label)
Definition: file_names.h:235
gs::LoadingStatus::kCommited
@ kCommited
gs::BasicFragmentLoader::BasicFragmentLoader
BasicFragmentLoader(const Schema &schema, const std::string &prefix)
Definition: basic_fragment_loader.cc:50
gs::Table
Definition: table.h:30
gs::BasicFragmentLoader::GetLFIndexer
const IndexerType & GetLFIndexer(label_t v_label) const
Definition: basic_fragment_loader.cc:147
mutable_property_fragment.h
gs::BasicFragmentLoader::vertex_label_num_
size_t vertex_label_num_
Definition: basic_fragment_loader.h:314
gs::BasicFragmentLoader
Definition: basic_fragment_loader.h:53
gs::BasicFragmentLoader::get_casted_dual_csr
static decltype(auto) get_casted_dual_csr(DualCsrBase *dual_csr)
Definition: basic_fragment_loader.h:149
gs::BasicFragmentLoader::AddNoPropEdgeBatch
void AddNoPropEdgeBatch(label_t src_label_id, label_t dst_label_id, label_t edge_label_id)
Definition: basic_fragment_loader.h:108
gs::PropertyType::STRING_DEFAULT_MAX_LENGTH
static constexpr const uint16_t STRING_DEFAULT_MAX_LENGTH
Definition: types.h:96
gs::Schema::get_edge_properties
const std::vector< PropertyType > & get_edge_properties(const std::string &src_label, const std::string &dst_label, const std::string &label) const
Definition: schema.cc:216
gs::BasicFragmentLoader::GetVertexTable
Table & GetVertexTable(size_t ind)
Definition: basic_fragment_loader.h:281
gs::BasicFragmentLoader::PutEdges
void PutEdges(label_t src_label_id, label_t dst_label_id, label_t edge_label_id, const std::vector< VECTOR_T > &edges_vec, const std::vector< int32_t > &ie_degree, const std::vector< int32_t > &oe_degree, bool build_csr_in_mem)
Definition: basic_fragment_loader.h:161
gs::BasicFragmentLoader::dual_csr_list_
std::vector< DualCsrBase * > dual_csr_list_
Definition: basic_fragment_loader.h:317
gs::BasicFragmentLoader::init_loading_status_file
void init_loading_status_file()
Definition: basic_fragment_loader.cc:99
gs::Schema
Definition: schema.h:29
gs::LoadingStatus::kUnknown
@ kUnknown
gs::create_typed_csr
TypedMutableCsrBase< EDATA_T > * create_typed_csr(EdgeStrategy es, PropertyType edge_property)
Definition: basic_fragment_loader.h:26
gs::BasicFragmentLoader::work_dir_
std::string work_dir_
Definition: basic_fragment_loader.h:313
gs::MutableCsr
Definition: mutable_csr.h:183
gs::operator<<
std::ostream & operator<<(std::ostream &os, const LoadingStatus &status)
Definition: basic_fragment_loader.cc:22
gs::BasicFragmentLoader::FinishAddingVertex
void FinishAddingVertex(label_t v_label, const IdIndexer< KEY_T, vid_t > &indexer)
Definition: basic_fragment_loader.h:68
gs::LoadingStatus::kLoaded
@ kLoaded
gs::IdIndexer
Definition: id_indexer.h:181
gs::LoadingStatus
LoadingStatus
Definition: basic_fragment_loader.h:39
gs::TypedMutableCsrBase
Definition: csr_base.h:135
gs::Schema::get_vertex_label_name
std::string get_vertex_label_name(label_t index) const
Definition: schema.cc:374
gs::BasicFragmentLoader::edge_label_num_
size_t edge_label_num_
Definition: basic_fragment_loader.h:314
gs::BasicFragmentLoader::SetVertexProperty
void SetVertexProperty(label_t v_label, size_t col_ind, vid_t vid, Any &&prop)
Definition: basic_fragment_loader.h:59
gs::BasicFragmentLoader::oe_
std::vector< CsrBase * > oe_
Definition: basic_fragment_loader.h:316
gs::EdgeStrategy::kSingle
@ kSingle
gs::EmptyCsr
Definition: mutable_csr.h:1170
gs::DualCsr< RecordView >
Definition: dual_csr.h:418
gs::snapshot_dir
std::string snapshot_dir(const std::string &work_dir, uint32_t version)
Definition: file_names.h:192
gs::SingleMutableCsr
Definition: mutable_csr.h:747
gs::BasicFragmentLoader::append_edge_loading_progress
void append_edge_loading_progress(const std::string &src_label_name, const std::string &dst_label_name, const std::string &edge_label_name, LoadingStatus status)
Definition: basic_fragment_loader.cc:84
gs::EdgeStrategy
EdgeStrategy
Definition: types.h:24
gs::ie_prefix
std::string ie_prefix(const std::string &src_label, const std::string &dst_label, const std::string edge_label)
Definition: file_names.h:239
gs::BasicFragmentLoader::get_csr
DualCsrBase * get_csr(label_t src_label_id, label_t dst_label_id, label_t edge_label_id)
Definition: basic_fragment_loader.cc:167
gs::Schema::get_edge_label_name
std::string get_edge_label_name(label_t index) const
Definition: schema.cc:382
gs::DualCsr
Definition: dual_csr.h:101
gs::DualCsrBase
Definition: dual_csr.h:29
gs::BasicFragmentLoader::vertex_data_
std::vector< Table > vertex_data_
Definition: basic_fragment_loader.h:318
gs::PropertyType
Definition: types.h:95
gs::BasicFragmentLoader::work_dir
const std::string & work_dir() const
Definition: basic_fragment_loader.h:290
gs::BasicFragmentLoader::init_edge_table
void init_edge_table(label_t src_label_id, label_t dst_label_id, label_t edge_label_id)
Definition: basic_fragment_loader.cc:175
gs::LoadingStatus::kLoading
@ kLoading
gs::label_t
uint8_t label_t
Definition: types.h:32
gs::Schema::incoming_edge_mutable
bool incoming_edge_mutable(const std::string &src_label, const std::string &dst_label, const std::string &label) const
Definition: schema.cc:337
gs::tmp_dir
std::string tmp_dir(const std::string &work_dir)
Definition: file_names.h:213
gs::LFIndexer
Definition: id_indexer.h:184
gs::operator>>
std::istream & operator>>(std::istream &is, LoadingStatus &status)
Definition: basic_fragment_loader.cc:35
gs::Schema::get_vertex_primary_key
const std::vector< std::tuple< PropertyType, std::string, size_t > > & get_vertex_primary_key(label_t index) const
Definition: schema.cc:391
gs::DualCsr< std::string_view >
Definition: dual_csr.h:247