Flex  0.17.9
basic_fragment_loader.h
Go to the documentation of this file.
1 
16 #ifndef STORAGES_RT_MUTABLE_GRAPH_LOADER_BASIC_FRAGMENT_LOADER_H_
17 #define STORAGES_RT_MUTABLE_GRAPH_LOADER_BASIC_FRAGMENT_LOADER_H_
18 
22 
23 namespace gs {
24 
25 template <typename EDATA_T>
27  PropertyType edge_property) {
28  if (es == EdgeStrategy::kSingle) {
29  return new SingleMutableCsr<EDATA_T>(edge_property);
30  } else if (es == EdgeStrategy::kMultiple) {
31  return new MutableCsr<EDATA_T>(edge_property);
32  } else if (es == EdgeStrategy::kNone) {
33  return new EmptyCsr<EDATA_T>();
34  }
35  LOG(FATAL) << "not support edge strategy or edge data type";
36  return nullptr;
37 }
38 
/// Loading progress of a vertex label or an edge triplet.
/// The enumerators keep the numeric values 0, 1, 2, 3 in declaration order.
enum class LoadingStatus {
  kLoading = 0,  // 0
  kLoaded,       // 1: data inserted (recorded by PutEdges before dumping)
  kCommited,     // 2: recorded by PutEdges after the dump to snapshot 0
  kUnknown,      // 3
};

// Text (de)serialization of LoadingStatus; both are defined out of line
// in basic_fragment_loader.cc.
std::ostream& operator<<(std::ostream& out, const LoadingStatus& value);

std::istream& operator>>(std::istream& in, LoadingStatus& value);
50 
51 // FragmentLoader should use this BasicFragmentLoader to construct
52 // mutable_csr_fragment.
54  public:
55  BasicFragmentLoader(const Schema& schema, const std::string& prefix);
56 
57  void LoadFragment();
58 
59  // props vector is column_num X batch_size
60  void AddVertexBatch(label_t v_label, const std::vector<vid_t>& vids,
61  const std::vector<std::vector<Any>>& props);
62 
63  inline void SetVertexProperty(label_t v_label, size_t col_ind, vid_t vid,
64  Any&& prop) {
65  auto& table = vertex_data_[v_label];
66  auto dst_columns = table.column_ptrs();
67  CHECK(col_ind < dst_columns.size());
68  dst_columns[col_ind]->set_any(vid, prop);
69  }
70 #ifndef USE_PTHASH
71  template <typename KEY_T>
73  const IdIndexer<KEY_T, vid_t>& indexer) {
74  CHECK(v_label < vertex_label_num_);
75  std::string filename =
77  auto primary_keys = schema_.get_vertex_primary_key(v_label);
78  auto type = std::get<0>(primary_keys[0]);
79 
80  build_lf_indexer<KEY_T, vid_t>(
81  indexer, LFIndexer<vid_t>::prefix() + "_" + filename,
83  type);
86  auto& v_data = vertex_data_[v_label];
87  auto label_name = schema_.get_vertex_label_name(v_label);
88  v_data.resize(lf_indexers_[v_label].size());
89  v_data.dump(vertex_table_prefix(label_name), snapshot_dir(work_dir_, 0));
91  }
92 #else
93  template <typename KEY_T>
94  void FinishAddingVertex(label_t v_label,
95  PTIndexerBuilder<KEY_T, vid_t>& indexer_builder) {
96  CHECK(v_label < vertex_label_num_);
97  std::string filename =
99  indexer_builder.finish(PTIndexer<vid_t>::prefix() + "_" + filename,
100  snapshot_dir(work_dir_, 0), lf_indexers_[v_label]);
103  auto& v_data = vertex_data_[v_label];
104  auto label_name = schema_.get_vertex_label_name(v_label);
105  v_data.resize(lf_indexers_[v_label].size());
106  v_data.dump(vertex_table_prefix(label_name), snapshot_dir(work_dir_, 0));
108  }
109 #endif
110 
111  template <typename EDATA_T>
112  void AddNoPropEdgeBatch(label_t src_label_id, label_t dst_label_id,
113  label_t edge_label_id) {
114  size_t index = src_label_id * vertex_label_num_ * edge_label_num_ +
115  dst_label_id * edge_label_num_ + edge_label_id;
116  CHECK(ie_[index] == NULL);
117  CHECK(oe_[index] == NULL);
118  auto src_label_name = schema_.get_vertex_label_name(src_label_id);
119  auto dst_label_name = schema_.get_vertex_label_name(dst_label_id);
120  auto edge_label_name = schema_.get_edge_label_name(edge_label_id);
122  src_label_name, dst_label_name, edge_label_name);
124  src_label_name, dst_label_name, edge_label_name);
125  if constexpr (std::is_same_v<EDATA_T, std::string_view>) {
126  const auto& prop = schema_.get_edge_properties(src_label_id, dst_label_id,
127  edge_label_id);
128 
129  size_t max_length = PropertyType::STRING_DEFAULT_MAX_LENGTH;
130  if (prop[0].IsVarchar()) {
131  max_length = prop[0].additional_type_info.max_length;
132  }
133  dual_csr_list_[index] =
134  new DualCsr<std::string_view>(oe_strategy, ie_strategy, max_length);
135  } else {
136  bool oe_mutable = schema_.outgoing_edge_mutable(
137  src_label_name, dst_label_name, edge_label_name);
138  bool ie_mutable = schema_.incoming_edge_mutable(
139  src_label_name, dst_label_name, edge_label_name);
140  dual_csr_list_[index] = new DualCsr<EDATA_T>(oe_strategy, ie_strategy,
141  oe_mutable, ie_mutable);
142  }
143  ie_[index] = dual_csr_list_[index]->GetInCsr();
144  oe_[index] = dual_csr_list_[index]->GetOutCsr();
145  dual_csr_list_[index]->BatchInit(
146  oe_prefix(src_label_name, dst_label_name, edge_label_name),
147  ie_prefix(src_label_name, dst_label_name, edge_label_name),
148  edata_prefix(src_label_name, dst_label_name, edge_label_name),
149  tmp_dir(work_dir_), {}, {});
150  }
151 
152  template <typename EDATA_T>
153  static decltype(auto) get_casted_dual_csr(DualCsrBase* dual_csr) {
154  if constexpr (std::is_same_v<EDATA_T, RecordView>) {
155  auto casted_dual_csr = dynamic_cast<DualCsr<RecordView>*>(dual_csr);
156  CHECK(casted_dual_csr != NULL);
157  return casted_dual_csr;
158  } else {
159  auto casted_dual_csr = dynamic_cast<DualCsr<EDATA_T>*>(dual_csr);
160  CHECK(casted_dual_csr != NULL);
161  return casted_dual_csr;
162  }
163  }
164  template <typename EDATA_T, typename VECTOR_T>
165  void PutEdges(label_t src_label_id, label_t dst_label_id,
166  label_t edge_label_id, const std::vector<VECTOR_T>& edges_vec,
167  const std::vector<int32_t>& ie_degree,
168  const std::vector<int32_t>& oe_degree, bool build_csr_in_mem) {
169  size_t index = src_label_id * vertex_label_num_ * edge_label_num_ +
170  dst_label_id * edge_label_num_ + edge_label_id;
171  auto dual_csr = dual_csr_list_[index];
172  CHECK(dual_csr != NULL);
173  auto casted_dual_csr = get_casted_dual_csr<EDATA_T>(dual_csr);
174  CHECK(casted_dual_csr != NULL);
175  auto& src_indexer = lf_indexers_[src_label_id];
176  auto& dst_indexer = lf_indexers_[dst_label_id];
177  auto src_label_name = schema_.get_vertex_label_name(src_label_id);
178  auto dst_label_name = schema_.get_vertex_label_name(dst_label_id);
179  auto edge_label_name = schema_.get_edge_label_name(edge_label_id);
180 
181  auto INVALID_VID = std::numeric_limits<vid_t>::max();
182  std::atomic<size_t> edge_count(0);
183  if constexpr (std::is_same_v<EDATA_T, std::string_view>) {
184  CHECK(ie_degree.size() == dst_indexer.size());
185  CHECK(oe_degree.size() == src_indexer.size());
186  if (build_csr_in_mem) {
187  dual_csr->BatchInitInMemory(
188  edata_prefix(src_label_name, dst_label_name, edge_label_name),
189  tmp_dir(work_dir_), oe_degree, ie_degree);
190  } else {
191  dual_csr->BatchInit(
192  oe_prefix(src_label_name, dst_label_name, edge_label_name),
193  ie_prefix(src_label_name, dst_label_name, edge_label_name),
194  edata_prefix(src_label_name, dst_label_name, edge_label_name),
195  tmp_dir(work_dir_), oe_degree, ie_degree);
196  }
197  std::vector<std::thread> work_threads;
198  for (size_t i = 0; i < edges_vec.size(); ++i) {
199  work_threads.emplace_back(
200  [&](int idx) {
201  edge_count.fetch_add(edges_vec[idx].size());
202  for (auto& edge : edges_vec[idx]) {
203  if (std::get<1>(edge) == INVALID_VID ||
204  std::get<0>(edge) == INVALID_VID) {
205  VLOG(10) << "Skip invalid edge:" << std::get<0>(edge) << "->"
206  << std::get<1>(edge);
207  continue;
208  }
209  casted_dual_csr->BatchPutEdge(
210  std::get<0>(edge), std::get<1>(edge), std::get<2>(edge));
211  }
212  },
213  i);
214  }
215  for (auto& t : work_threads) {
216  t.join();
217  }
218 
219  append_edge_loading_progress(src_label_name, dst_label_name,
220  edge_label_name, LoadingStatus::kLoaded);
221  if (schema_.get_sort_on_compaction(src_label_name, dst_label_name,
222  edge_label_name)) {
223  dual_csr->SortByEdgeData(1);
224  }
225  dual_csr->Dump(
226  oe_prefix(src_label_name, dst_label_name, edge_label_name),
227  ie_prefix(src_label_name, dst_label_name, edge_label_name),
228  edata_prefix(src_label_name, dst_label_name, edge_label_name),
229  snapshot_dir(work_dir_, 0));
230  } else {
231  CHECK(ie_degree.size() == dst_indexer.size());
232  CHECK(oe_degree.size() == src_indexer.size());
233 
234  if (build_csr_in_mem) {
235  dual_csr->BatchInitInMemory(
236  edata_prefix(src_label_name, dst_label_name, edge_label_name),
237  tmp_dir(work_dir_), oe_degree, ie_degree);
238  } else {
239  dual_csr->BatchInit(
240  oe_prefix(src_label_name, dst_label_name, edge_label_name),
241  ie_prefix(src_label_name, dst_label_name, edge_label_name),
242  edata_prefix(src_label_name, dst_label_name, edge_label_name),
243  tmp_dir(work_dir_), oe_degree, ie_degree);
244  }
245 
246  std::vector<std::thread> work_threads;
247  for (size_t i = 0; i < edges_vec.size(); ++i) {
248  work_threads.emplace_back(
249  [&](int idx) {
250  edge_count.fetch_add(edges_vec[idx].size());
251  for (auto& edge : edges_vec[idx]) {
252  if (std::get<1>(edge) == INVALID_VID ||
253  std::get<0>(edge) == INVALID_VID) {
254  VLOG(10) << "Skip invalid edge:" << std::get<0>(edge) << "->"
255  << std::get<1>(edge);
256  continue;
257  }
258  casted_dual_csr->BatchPutEdge(
259  std::get<0>(edge), std::get<1>(edge), std::get<2>(edge));
260  }
261  },
262  i);
263  }
264  for (auto& t : work_threads) {
265  t.join();
266  }
267  append_edge_loading_progress(src_label_name, dst_label_name,
268  edge_label_name, LoadingStatus::kLoaded);
269  if (schema_.get_sort_on_compaction(src_label_name, dst_label_name,
270  edge_label_name)) {
271  dual_csr->SortByEdgeData(1);
272  }
273 
274  dual_csr->Dump(
275  oe_prefix(src_label_name, dst_label_name, edge_label_name),
276  ie_prefix(src_label_name, dst_label_name, edge_label_name),
277  edata_prefix(src_label_name, dst_label_name, edge_label_name),
278  snapshot_dir(work_dir_, 0));
279  }
280  append_edge_loading_progress(src_label_name, dst_label_name,
281  edge_label_name, LoadingStatus::kCommited);
282  VLOG(10) << "Finish adding edge batch of size: " << edge_count.load();
283  }
284 
285  Table& GetVertexTable(size_t ind) {
286  CHECK(ind < vertex_data_.size());
287  return vertex_data_[ind];
288  }
289 
290  // get lf_indexer
291  const IndexerType& GetLFIndexer(label_t v_label) const;
292  IndexerType& GetLFIndexer(label_t v_label);
293 
294  const std::string& work_dir() const { return work_dir_; }
295 
296  void set_csr(label_t src_label_id, label_t dst_label_id,
297  label_t edge_label_id, DualCsrBase* dual_csr);
298 
299  DualCsrBase* get_csr(label_t src_label_id, label_t dst_label_id,
300  label_t edge_label_id);
301 
302  void init_edge_table(label_t src_label_id, label_t dst_label_id,
303  label_t edge_label_id);
304 
305  private:
306  // create status files for each vertex label and edge triplet pair.
307  void append_vertex_loading_progress(const std::string& label_name,
308  LoadingStatus status);
309 
310  void append_edge_loading_progress(const std::string& src_label_name,
311  const std::string& dst_label_name,
312  const std::string& edge_label_name,
313  LoadingStatus status);
315  void init_vertex_data();
316  const Schema& schema_;
317  std::string work_dir_;
319  std::vector<IndexerType> lf_indexers_;
320  std::vector<CsrBase*> ie_, oe_;
321  std::vector<DualCsrBase*> dual_csr_list_;
322  std::vector<Table> vertex_data_;
323 
324  // loading progress related
326 };
327 } // namespace gs
328 
329 #endif // STORAGES_RT_MUTABLE_GRAPH_LOADER_BASIC_FRAGMENT_LOADER_H_
gs::BasicFragmentLoader::AddVertexBatch
void AddVertexBatch(label_t v_label, const std::vector< vid_t > &vids, const std::vector< std::vector< Any >> &props)
Definition: basic_fragment_loader.cc:147
gs::Schema::get_sort_on_compaction
bool get_sort_on_compaction(const std::string &src_label, const std::string &dst_label, const std::string &label) const
Definition: schema.cc:332
gs::oe_prefix
std::string oe_prefix(const std::string &src_label, const std::string &dst_label, const std::string edge_label)
Definition: file_names.h:245
gs::Any
Definition: types.h:383
gs::BasicFragmentLoader::schema_
const Schema & schema_
Definition: basic_fragment_loader.h:316
gs::BasicFragmentLoader::loading_progress_mutex_
std::mutex loading_progress_mutex_
Definition: basic_fragment_loader.h:325
gs::Schema::outgoing_edge_mutable
bool outgoing_edge_mutable(const std::string &src_label, const std::string &dst_label, const std::string &label) const
Definition: schema.cc:312
gs::EdgeStrategy::kMultiple
@ kMultiple
gs::BasicFragmentLoader::LoadFragment
void LoadFragment()
Definition: basic_fragment_loader.cc:135
gs::BasicFragmentLoader::init_vertex_data
void init_vertex_data()
Definition: basic_fragment_loader.cc:121
gs::Schema::get_outgoing_edge_strategy
EdgeStrategy get_outgoing_edge_strategy(const std::string &src_label, const std::string &dst_label, const std::string &label) const
Definition: schema.cc:292
gs::BasicFragmentLoader::append_vertex_loading_progress
void append_vertex_loading_progress(const std::string &label_name, LoadingStatus status)
Definition: basic_fragment_loader.cc:72
schema.h
gs::vertex_table_prefix
std::string vertex_table_prefix(const std::string &label)
Definition: file_names.h:256
file_names.h
gs::edata_prefix
std::string edata_prefix(const std::string &src_label, const std::string &dst_label, const std::string &edge_label)
Definition: file_names.h:251
gs::vid_t
uint32_t vid_t
Definition: types.h:31
gs::Schema::get_incoming_edge_strategy
EdgeStrategy get_incoming_edge_strategy(const std::string &src_label, const std::string &dst_label, const std::string &label) const
Definition: schema.cc:302
gs::BasicFragmentLoader::set_csr
void set_csr(label_t src_label_id, label_t dst_label_id, label_t edge_label_id, DualCsrBase *dual_csr)
Definition: basic_fragment_loader.cc:177
gs::BasicFragmentLoader::lf_indexers_
std::vector< IndexerType > lf_indexers_
Definition: basic_fragment_loader.h:319
gs::EdgeStrategy::kNone
@ kNone
gs
Definition: adj_list.h:23
gs::BasicFragmentLoader::ie_
std::vector< CsrBase * > ie_
Definition: basic_fragment_loader.h:320
gs::vertex_map_prefix
std::string vertex_map_prefix(const std::string &label)
Definition: file_names.h:235
gs::LoadingStatus::kCommited
@ kCommited
gs::BasicFragmentLoader::BasicFragmentLoader
BasicFragmentLoader(const Schema &schema, const std::string &prefix)
Definition: basic_fragment_loader.cc:50
gs::Table
Definition: table.h:30
gs::BasicFragmentLoader::GetLFIndexer
const IndexerType & GetLFIndexer(label_t v_label) const
Definition: basic_fragment_loader.cc:167
mutable_property_fragment.h
gs::BasicFragmentLoader::vertex_label_num_
size_t vertex_label_num_
Definition: basic_fragment_loader.h:318
gs::BasicFragmentLoader
Definition: basic_fragment_loader.h:53
gs::BasicFragmentLoader::get_casted_dual_csr
static decltype(auto) get_casted_dual_csr(DualCsrBase *dual_csr)
Definition: basic_fragment_loader.h:153
gs::BasicFragmentLoader::AddNoPropEdgeBatch
void AddNoPropEdgeBatch(label_t src_label_id, label_t dst_label_id, label_t edge_label_id)
Definition: basic_fragment_loader.h:112
gs::PropertyType::STRING_DEFAULT_MAX_LENGTH
static constexpr const uint16_t STRING_DEFAULT_MAX_LENGTH
Definition: types.h:96
gs::Schema::get_edge_properties
const std::vector< PropertyType > & get_edge_properties(const std::string &src_label, const std::string &dst_label, const std::string &label) const
Definition: schema.cc:201
gs::BasicFragmentLoader::GetVertexTable
Table & GetVertexTable(size_t ind)
Definition: basic_fragment_loader.h:285
gs::BasicFragmentLoader::PutEdges
void PutEdges(label_t src_label_id, label_t dst_label_id, label_t edge_label_id, const std::vector< VECTOR_T > &edges_vec, const std::vector< int32_t > &ie_degree, const std::vector< int32_t > &oe_degree, bool build_csr_in_mem)
Definition: basic_fragment_loader.h:165
gs::BasicFragmentLoader::dual_csr_list_
std::vector< DualCsrBase * > dual_csr_list_
Definition: basic_fragment_loader.h:321
gs::BasicFragmentLoader::init_loading_status_file
void init_loading_status_file()
Definition: basic_fragment_loader.cc:99
gs::Schema
Definition: schema.h:29
gs::LoadingStatus::kUnknown
@ kUnknown
gs::create_typed_csr
TypedMutableCsrBase< EDATA_T > * create_typed_csr(EdgeStrategy es, PropertyType edge_property)
Definition: basic_fragment_loader.h:26
gs::BasicFragmentLoader::work_dir_
std::string work_dir_
Definition: basic_fragment_loader.h:317
gs::MutableCsr
Definition: mutable_csr.h:183
gs::operator<<
std::ostream & operator<<(std::ostream &os, const LoadingStatus &status)
Definition: basic_fragment_loader.cc:22
gs::BasicFragmentLoader::FinishAddingVertex
void FinishAddingVertex(label_t v_label, const IdIndexer< KEY_T, vid_t > &indexer)
Definition: basic_fragment_loader.h:72
gs::LoadingStatus::kLoaded
@ kLoaded
gs::IdIndexer
Definition: id_indexer.h:181
gs::LoadingStatus
LoadingStatus
Definition: basic_fragment_loader.h:39
gs::TypedMutableCsrBase
Definition: csr_base.h:135
gs::Schema::get_vertex_label_name
std::string get_vertex_label_name(label_t index) const
Definition: schema.cc:359
gs::BasicFragmentLoader::edge_label_num_
size_t edge_label_num_
Definition: basic_fragment_loader.h:318
gs::BasicFragmentLoader::SetVertexProperty
void SetVertexProperty(label_t v_label, size_t col_ind, vid_t vid, Any &&prop)
Definition: basic_fragment_loader.h:63
gs::BasicFragmentLoader::oe_
std::vector< CsrBase * > oe_
Definition: basic_fragment_loader.h:320
gs::EdgeStrategy::kSingle
@ kSingle
gs::EmptyCsr
Definition: mutable_csr.h:1168
gs::DualCsr< RecordView >
Definition: dual_csr.h:418
gs::snapshot_dir
std::string snapshot_dir(const std::string &work_dir, uint32_t version)
Definition: file_names.h:192
gs::SingleMutableCsr
Definition: mutable_csr.h:745
gs::BasicFragmentLoader::append_edge_loading_progress
void append_edge_loading_progress(const std::string &src_label_name, const std::string &dst_label_name, const std::string &edge_label_name, LoadingStatus status)
Definition: basic_fragment_loader.cc:84
gs::EdgeStrategy
EdgeStrategy
Definition: types.h:24
gs::ie_prefix
std::string ie_prefix(const std::string &src_label, const std::string &dst_label, const std::string edge_label)
Definition: file_names.h:239
gs::BasicFragmentLoader::get_csr
DualCsrBase * get_csr(label_t src_label_id, label_t dst_label_id, label_t edge_label_id)
Definition: basic_fragment_loader.cc:187
gs::Schema::get_edge_label_name
std::string get_edge_label_name(label_t index) const
Definition: schema.cc:367
gs::DualCsr
Definition: dual_csr.h:101
gs::DualCsrBase
Definition: dual_csr.h:29
gs::BasicFragmentLoader::vertex_data_
std::vector< Table > vertex_data_
Definition: basic_fragment_loader.h:322
gs::PropertyType
Definition: types.h:95
gs::BasicFragmentLoader::work_dir
const std::string & work_dir() const
Definition: basic_fragment_loader.h:294
gs::BasicFragmentLoader::init_edge_table
void init_edge_table(label_t src_label_id, label_t dst_label_id, label_t edge_label_id)
Definition: basic_fragment_loader.cc:195
gs::LoadingStatus::kLoading
@ kLoading
gs::label_t
uint8_t label_t
Definition: types.h:32
gs::Schema::incoming_edge_mutable
bool incoming_edge_mutable(const std::string &src_label, const std::string &dst_label, const std::string &label) const
Definition: schema.cc:322
gs::tmp_dir
std::string tmp_dir(const std::string &work_dir)
Definition: file_names.h:213
gs::LFIndexer
Definition: id_indexer.h:184
gs::operator>>
std::istream & operator>>(std::istream &is, LoadingStatus &status)
Definition: basic_fragment_loader.cc:35
gs::Schema::get_vertex_primary_key
const std::vector< std::tuple< PropertyType, std::string, size_t > > & get_vertex_primary_key(label_t index) const
Definition: schema.cc:376
gs::DualCsr< std::string_view >
Definition: dual_csr.h:247