Flex  0.17.9
abstract_arrow_fragment_loader.h
Go to the documentation of this file.
1 
17 #ifndef STORAGES_RT_MUTABLE_GRAPH_LOADER_ABSTRACT_ARROW_FRAGMENT_LOADER_H_
18 #define STORAGES_RT_MUTABLE_GRAPH_LOADER_ABSTRACT_ARROW_FRAGMENT_LOADER_H_
19 
24 #include "flex/utils/mmap_vector.h"
25 #include "grape/utils/concurrent_queue.h"
26 
27 #include <arrow/api.h>
28 #include <arrow/io/api.h>
29 #include <shared_mutex>
30 #include "arrow/util/value_parsing.h"
31 
32 #include "grape/util.h"
33 
34 namespace gs {
35 
36 void printDiskRemaining(const std::string& path);
// Interface for iterating over arrow::RecordBatch objects: each call to
// GetNextBatch() yields the next batch, or nullptr once the input is exhausted.
38 
40  public:
41  virtual ~IRecordBatchSupplier() = default;
42  virtual std::shared_ptr<arrow::RecordBatch> GetNextBatch() = 0;
43 };
44 
45 bool check_primary_key_type(std::shared_ptr<arrow::DataType> data_type);
46 
47 // For Primitive types.
48 template <typename COL_T>
49 void set_column(gs::ColumnBase* col, std::shared_ptr<arrow::ChunkedArray> array,
50  const std::vector<size_t>& offset) {
51  using arrow_array_type = typename gs::TypeConverter<COL_T>::ArrowArrayType;
52  auto array_type = array->type();
53  auto arrow_type = gs::TypeConverter<COL_T>::ArrowTypeValue();
54  CHECK(array_type->Equals(arrow_type))
55  << "Inconsistent data type, expect " << arrow_type->ToString()
56  << ", but got " << array_type->ToString();
57  size_t cur_ind = 0;
58  for (auto j = 0; j < array->num_chunks(); ++j) {
59  auto casted = std::static_pointer_cast<arrow_array_type>(array->chunk(j));
60  size_t size = col->size();
61  for (auto k = 0; k < casted->length(); ++k) {
62  if (offset[cur_ind] >= size) {
63  cur_ind++;
64  } else {
65  col->set_any(offset[cur_ind++],
66  std::move(AnyConverter<COL_T>::to_any(casted->Value(k))));
67  }
68  }
69  }
70 }
71 
72 // For String types.
74  std::shared_ptr<arrow::ChunkedArray> array,
75  const std::vector<size_t>& offset);
76 
78  std::shared_ptr<arrow::ChunkedArray> array,
79  const std::vector<size_t>& offset);
80 
82  gs::ColumnBase* col, std::shared_ptr<arrow::ChunkedArray> array,
83  const std::vector<size_t>& offset);
84 
86  std::shared_ptr<arrow::ChunkedArray> array,
87  const std::vector<size_t>& offset);
88 
90  const Schema& schema,
91  const std::vector<std::tuple<size_t, std::string, std::string>>&
92  column_mappings,
93  size_t src_col_ind, size_t dst_col_ind, label_t src_label_i,
94  label_t dst_label_i, label_t edge_label_i);
95 
96 template <typename KEY_T>
97 struct _add_vertex {
98 #ifndef USE_PTHASH
99  void operator()(const std::shared_ptr<arrow::Array>& col,
100  IdIndexer<KEY_T, vid_t>& indexer,
101  std::vector<size_t>& offset) {
102  size_t row_num = col->length();
103  vid_t vid;
104  if constexpr (!std::is_same<std::string_view, KEY_T>::value) {
105  // for non-string value
106  auto expected_type = gs::TypeConverter<KEY_T>::ArrowTypeValue();
107  using arrow_array_t = typename gs::TypeConverter<KEY_T>::ArrowArrayType;
108  if (!col->type()->Equals(expected_type)) {
109  LOG(FATAL) << "Inconsistent data type, expect "
110  << expected_type->ToString() << ", but got "
111  << col->type()->ToString();
112  }
113  auto casted_array = std::static_pointer_cast<arrow_array_t>(col);
114  for (size_t i = 0; i < row_num; ++i) {
115  if (!indexer.add(casted_array->Value(i), vid)) {
116  VLOG(2) << "Duplicate vertex id: " << casted_array->Value(i) << "..";
117  offset.emplace_back(std::numeric_limits<size_t>::max());
118  } else {
119  offset.emplace_back(vid);
120  }
121  }
122  } else {
123  if (col->type()->Equals(arrow::utf8())) {
124  auto casted_array = std::static_pointer_cast<arrow::StringArray>(col);
125  for (size_t i = 0; i < row_num; ++i) {
126  auto str = casted_array->GetView(i);
127  std::string_view str_view(str.data(), str.size());
128  if (!indexer.add(str_view, vid)) {
129  VLOG(2) << "Duplicate vertex id: " << str_view << "..";
130  offset.emplace_back(std::numeric_limits<size_t>::max());
131  } else {
132  offset.emplace_back(vid);
133  }
134  }
135  } else if (col->type()->Equals(arrow::large_utf8())) {
136  auto casted_array =
137  std::static_pointer_cast<arrow::LargeStringArray>(col);
138  for (size_t i = 0; i < row_num; ++i) {
139  auto str = casted_array->GetView(i);
140  std::string_view str_view(str.data(), str.size());
141  if (!indexer.add(str_view, vid)) {
142  VLOG(2) << "Duplicate vertex id: " << str_view << "..";
143  offset.emplace_back(std::numeric_limits<size_t>::max());
144  } else {
145  offset.emplace_back(vid);
146  }
147  }
148  } else {
149  LOG(FATAL) << "Not support type: " << col->type()->ToString();
150  }
151  }
152  }
153 
154 #else
155  void operator()(const std::shared_ptr<arrow::Array>& col,
156  PTIndexerBuilder<KEY_T, vid_t>& indexer) {
157  size_t row_num = col->length();
158  if constexpr (!std::is_same<std::string_view, KEY_T>::value) {
159  // for non-string value
160  auto expected_type = gs::TypeConverter<KEY_T>::ArrowTypeValue();
161  using arrow_array_t = typename gs::TypeConverter<KEY_T>::ArrowArrayType;
162  if (!col->type()->Equals(expected_type)) {
163  LOG(FATAL) << "Inconsistent data type, expect "
164  << expected_type->ToString() << ", but got "
165  << col->type()->ToString();
166  }
167  auto casted_array = std::static_pointer_cast<arrow_array_t>(col);
168  for (size_t i = 0; i < row_num; ++i) {
169  indexer.add_vertex(casted_array->Value(i));
170  }
171  } else {
172  if (col->type()->Equals(arrow::utf8())) {
173  auto casted_array = std::static_pointer_cast<arrow::StringArray>(col);
174  for (size_t i = 0; i < row_num; ++i) {
175  auto str = casted_array->GetView(i);
176  std::string_view str_view(str.data(), str.size());
177  indexer.add_vertex(str_view);
178  }
179  } else if (col->type()->Equals(arrow::large_utf8())) {
180  auto casted_array =
181  std::static_pointer_cast<arrow::LargeStringArray>(col);
182  for (size_t i = 0; i < row_num; ++i) {
183  auto str = casted_array->GetView(i);
184  std::string_view str_view(str.data(), str.size());
185  indexer.add_vertex(str_view);
186  }
187  } else {
188  LOG(FATAL) << "Not support type: " << col->type()->ToString();
189  }
190  }
191  }
192 #endif
193 };
194 
195 template <typename PK_T, typename EDATA_T, typename VECTOR_T>
196 void _append(bool is_dst, size_t cur_ind, std::shared_ptr<arrow::Array> col,
197  const IndexerType& indexer, VECTOR_T& parsed_edges,
198  std::vector<std::atomic<int32_t>>& degree) {
199  static constexpr auto invalid_vid = std::numeric_limits<vid_t>::max();
200  if constexpr (std::is_same_v<PK_T, std::string_view>) {
201  if (col->type()->Equals(arrow::utf8())) {
202  auto casted = std::static_pointer_cast<arrow::StringArray>(col);
203  for (auto j = 0; j < casted->length(); ++j) {
204  auto str = casted->GetView(j);
205  std::string_view str_view(str.data(), str.size());
206  auto vid = indexer.get_index(Any::From(str_view));
207  if (is_dst) {
208  std::get<1>(parsed_edges[cur_ind++]) = vid;
209  } else {
210  std::get<0>(parsed_edges[cur_ind++]) = vid;
211  }
212  if (vid != invalid_vid) {
213  degree[vid]++;
214  }
215  }
216  } else {
217  // must be large utf8
218  auto casted = std::static_pointer_cast<arrow::LargeStringArray>(col);
219  for (auto j = 0; j < casted->length(); ++j) {
220  auto str = casted->GetView(j);
221  std::string_view str_view(str.data(), str.size());
222  auto vid = indexer.get_index(Any::From(str_view));
223  if (is_dst) {
224  std::get<1>(parsed_edges[cur_ind++]) = vid;
225  } else {
226  std::get<0>(parsed_edges[cur_ind++]) = vid;
227  }
228  if (vid != invalid_vid) {
229  degree[vid]++;
230  }
231  }
232  }
233  } else {
234  using arrow_array_type = typename gs::TypeConverter<PK_T>::ArrowArrayType;
235  auto casted = std::static_pointer_cast<arrow_array_type>(col);
236  for (auto j = 0; j < casted->length(); ++j) {
237  auto vid = indexer.get_index(Any::From(casted->Value(j)));
238  if (is_dst) {
239  std::get<1>(parsed_edges[cur_ind++]) = vid;
240  } else {
241  std::get<0>(parsed_edges[cur_ind++]) = vid;
242  }
243  if (vid != invalid_vid) {
244  degree[vid]++;
245  }
246  }
247  }
248 }
249 
250 template <typename SRC_PK_T, typename DST_PK_T, typename EDATA_T,
251  typename VECTOR_T>
252 static void append_edges(std::shared_ptr<arrow::Array> src_col,
253  std::shared_ptr<arrow::Array> dst_col,
254  const IndexerType& src_indexer,
255  const IndexerType& dst_indexer,
256  std::shared_ptr<arrow::Array>& edata_cols,
257  const PropertyType& edge_prop, VECTOR_T& parsed_edges,
258  std::vector<std::atomic<int32_t>>& ie_degree,
259  std::vector<std::atomic<int32_t>>& oe_degree,
260  size_t offset = 0) {
261  CHECK(src_col->length() == dst_col->length());
262  auto indexer_check_lambda = [](const IndexerType& cur_indexer,
263  const std::shared_ptr<arrow::Array>& cur_col) {
264  if (cur_indexer.get_type() == PropertyType::kInt64) {
265  CHECK(cur_col->type()->Equals(arrow::int64()));
266  } else if (cur_indexer.get_type() == PropertyType::kStringView) {
267  CHECK(cur_col->type()->Equals(arrow::utf8()) ||
268  cur_col->type()->Equals(arrow::large_utf8()));
269  } else if (cur_indexer.get_type() == PropertyType::kInt32) {
270  CHECK(cur_col->type()->Equals(arrow::int32()));
271  } else if (cur_indexer.get_type() == PropertyType::kUInt32) {
272  CHECK(cur_col->type()->Equals(arrow::uint32()));
273  } else if (cur_indexer.get_type() == PropertyType::kUInt64) {
274  CHECK(cur_col->type()->Equals(arrow::uint64()));
275  }
276  };
277 
278  indexer_check_lambda(src_indexer, src_col);
279  indexer_check_lambda(dst_indexer, dst_col);
280  auto old_size = parsed_edges.size();
281  parsed_edges.resize(old_size + src_col->length());
282  VLOG(10) << "resize parsed_edges from" << old_size << " to "
283  << parsed_edges.size();
284 
285  // if EDATA_T is grape::EmptyType, no need to read columns
286  auto edata_col_thread = std::thread([&]() {
287  if constexpr (std::is_same<EDATA_T, RecordView>::value) {
288  size_t cur_ind = old_size;
289  for (auto j = 0; j < src_col->length(); ++j) {
290  std::get<2>(parsed_edges[cur_ind++]) = offset++;
291  }
292  } else if constexpr (!std::is_same<EDATA_T, grape::EmptyType>::value) {
293  auto edata_col = edata_cols;
294  CHECK(src_col->length() == edata_col->length());
295  size_t cur_ind = old_size;
296  auto type = edata_col->type();
297  if (!type->Equals(TypeConverter<EDATA_T>::ArrowTypeValue())) {
298  LOG(FATAL) << "Inconsistent data type, expect "
300  << ", but got " << type->ToString();
301  }
302 
303  using arrow_array_type =
305  // cast chunk to EDATA_T array
306  auto data = std::static_pointer_cast<arrow_array_type>(edata_col);
307  for (auto j = 0; j < edata_col->length(); ++j) {
308  if constexpr (std::is_same<arrow_array_type,
309  arrow::StringArray>::value ||
310  std::is_same<arrow_array_type,
311  arrow::LargeStringArray>::value) {
312  auto str = data->GetView(j);
313  std::string_view str_view(str.data(), str.size());
314  std::get<2>(parsed_edges[cur_ind++]) = str_view;
315  } else {
316  std::get<2>(parsed_edges[cur_ind++]) = data->Value(j);
317  }
318  }
319  VLOG(10) << "Finish inserting: " << src_col->length() << " edges";
320  }
321  });
322  size_t cur_ind = old_size;
323  auto src_col_thread = std::thread([&]() {
324  _append<SRC_PK_T, EDATA_T>(false, cur_ind, src_col, src_indexer,
325  parsed_edges, oe_degree);
326  });
327  auto dst_col_thread = std::thread([&]() {
328  _append<DST_PK_T, EDATA_T>(true, cur_ind, dst_col, dst_indexer,
329  parsed_edges, ie_degree);
330  });
331  src_col_thread.join();
332  dst_col_thread.join();
333  edata_col_thread.join();
334 }
335 
// An AbstractArrowFragmentLoader loads a fragment from Arrow record batches
// produced by IRecordBatchSupplier instances. It cannot be used directly and
// must be subclassed.
339  public:
340  AbstractArrowFragmentLoader(const std::string& work_dir, const Schema& schema,
341  const LoadingConfig& loading_config)
342  : loading_config_(loading_config),
343  schema_(schema),
344  thread_num_(loading_config_.GetParallelism()),
345  build_csr_in_mem_(loading_config_.GetBuildCsrInMem()),
346  use_mmap_vector_(loading_config_.GetUseMmapVector()),
347  basic_fragment_loader_(schema_, work_dir) {
350  mtxs_ = new std::mutex[vertex_label_num_];
351  }
352 
354  if (mtxs_) {
355  delete[] mtxs_;
356  }
357  }
358 
360  label_t v_label_id, const std::vector<std::string>& input_paths,
361  std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
362  label_t, const std::string&, const LoadingConfig&, int)>
363  supplier_creator);
364 
365  // Add edges in record batch to output_parsed_edges, output_ie_degrees and
366  // output_oe_degrees.
367 
368  void AddEdgesRecordBatch(
369  label_t src_label_id, label_t dst_label_id, label_t edge_label_id,
370  const std::vector<std::string>& input_paths,
371  std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
372  label_t, label_t, label_t, const std::string&, const LoadingConfig&,
373  int)>
374  supplier_creator);
375 
376  protected:
377  template <typename KEY_T>
379  label_t v_label_id, IdIndexer<KEY_T, vid_t>& indexer,
380  std::shared_ptr<arrow::Array>& primary_key_col,
381  const std::vector<std::shared_ptr<arrow::Array>>& property_cols) {
382  size_t row_num = primary_key_col->length();
383  auto col_num = property_cols.size();
384  for (size_t i = 0; i < col_num; ++i) {
385  CHECK_EQ(property_cols[i]->length(), row_num);
386  }
387 
388  std::vector<size_t> vids;
389  vids.reserve(row_num);
390  {
391  std::unique_lock<std::mutex> lock(mtxs_[v_label_id]);
392  _add_vertex<KEY_T>()(primary_key_col, indexer, vids);
393  }
394  for (size_t j = 0; j < property_cols.size(); ++j) {
395  auto array = property_cols[j];
396  auto chunked_array = std::make_shared<arrow::ChunkedArray>(array);
399  chunked_array, vids);
400  }
401 
402  VLOG(10) << "Insert rows: " << row_num;
403  }
404 
405 #ifndef USE_PTHASH
406  template <typename KEY_T>
408  label_t v_label_id, const std::vector<std::string>& v_files,
409  std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
410  label_t, const std::string&, const LoadingConfig&, int)>
411  supplier_creator) {
412  std::string v_label_name = schema_.get_vertex_label_name(v_label_id);
413  VLOG(10) << "Parsing vertex file:" << v_files.size() << " for label "
414  << v_label_name;
415  auto primary_key = schema_.get_vertex_primary_key(v_label_id)[0];
416  auto primary_key_name = std::get<1>(primary_key);
417  size_t primary_key_ind = std::get<2>(primary_key);
418  IdIndexer<KEY_T, vid_t> indexer;
419 
420  grape::BlockingQueue<std::shared_ptr<arrow::RecordBatch>> queue;
421  queue.SetLimit(1024);
422  std::vector<std::thread> work_threads;
423 
424  for (auto& v_file : v_files) {
425  VLOG(10) << "Parsing vertex file:" << v_file << " for label "
426  << v_label_name;
427  auto record_batch_supplier_vec =
428  supplier_creator(v_label_id, v_file, loading_config_,
429  std::thread::hardware_concurrency());
430  queue.SetProducerNum(record_batch_supplier_vec.size());
431 
432  for (size_t idx = 0; idx < record_batch_supplier_vec.size(); ++idx) {
433  work_threads.emplace_back(
434  [&](int i) {
435  auto& record_batch_supplier = record_batch_supplier_vec[i];
436  bool first_batch = true;
437  while (true) {
438  auto batch = record_batch_supplier->GetNextBatch();
439  if (!batch) {
440  queue.DecProducerNum();
441  break;
442  }
443  if (first_batch) {
444  auto header = batch->schema()->field_names();
445  auto schema_column_names =
447  CHECK(schema_column_names.size() + 1 == header.size())
448  << "File header of size: " << header.size()
449  << " does not match schema column size: "
450  << schema_column_names.size() + 1;
451  first_batch = false;
452  }
453  queue.Put(batch);
454  }
455  },
456  idx);
457  }
458  for (unsigned idx = 0;
459  idx <
460  std::min(static_cast<unsigned>(8 * record_batch_supplier_vec.size()),
461  std::thread::hardware_concurrency());
462  ++idx) {
463  work_threads.emplace_back(
464  [&](int i) {
465  while (true) {
466  std::shared_ptr<arrow::RecordBatch> batch{nullptr};
467  auto ret = queue.Get(batch);
468  if (!ret) {
469  break;
470  }
471  if (!batch) {
472  LOG(FATAL) << "get nullptr batch";
473  }
474  auto columns = batch->columns();
475  CHECK(primary_key_ind < columns.size());
476  auto primary_key_column = columns[primary_key_ind];
477  auto other_columns_array = columns;
478  other_columns_array.erase(other_columns_array.begin() +
479  primary_key_ind);
480 
481  addVertexBatchFromArray(v_label_id, indexer, primary_key_column,
482  other_columns_array);
483  }
484  },
485  idx);
486  }
487  for (auto& t : work_threads) {
488  t.join();
489  }
490  work_threads.clear();
491  VLOG(10) << "Finish parsing vertex file:" << v_file << " for label "
492  << v_label_name;
493  }
494 
495  VLOG(10) << "Finish parsing vertex file:" << v_files.size() << " for label "
496  << v_label_name;
497  if (indexer.bucket_count() == 0) {
498  indexer._rehash(schema_.get_max_vnum(v_label_name));
499  }
500  basic_fragment_loader_.FinishAddingVertex<KEY_T>(v_label_id, indexer);
501  }
502 #else
503  template <typename KEY_T>
505  label_t v_label_id, const std::vector<std::string>& v_files,
506  std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
507  label_t, const std::string&, const LoadingConfig&, int)>
508  supplier_creator) {
509  std::string v_label_name = schema_.get_vertex_label_name(v_label_id);
510  VLOG(10) << "Parsing vertex file:" << v_files.size() << " for label "
511  << v_label_name;
512  auto primary_key = schema_.get_vertex_primary_key(v_label_id)[0];
513  auto primary_key_name = std::get<1>(primary_key);
514  size_t primary_key_ind = std::get<2>(primary_key);
515  grape::BlockingQueue<std::shared_ptr<arrow::RecordBatch>> queue;
516  queue.SetLimit(1024);
517  PTIndexerBuilder<KEY_T, vid_t> indexer_builder;
518  std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> batchs(
519  std::thread::hardware_concurrency());
520  std::vector<std::thread> work_threads;
521  for (auto& v_file : v_files) {
522  VLOG(10) << "Parsing vertex file:" << v_file << " for label "
523  << v_label_name;
524  auto record_batch_supplier_vec =
525  supplier_creator(v_label_id, v_file, loading_config_,
526  std::thread::hardware_concurrency());
527  queue.SetProducerNum(record_batch_supplier_vec.size());
528  for (size_t idx = 0; idx < record_batch_supplier_vec.size(); ++idx) {
529  work_threads.emplace_back(
530  [&](int i) {
531  auto& record_batch_supplier = record_batch_supplier_vec[i];
532  bool first_batch = true;
533  while (true) {
534  auto batch = record_batch_supplier->GetNextBatch();
535  if (!batch) {
536  queue.DecProducerNum();
537  break;
538  }
539  if (first_batch) {
540  auto header = batch->schema()->field_names();
541  auto schema_column_names =
543  CHECK(schema_column_names.size() + 1 == header.size())
544  << "File header of size: " << header.size()
545  << " does not match schema column size: "
546  << schema_column_names.size() + 1;
547  first_batch = false;
548  }
549  queue.Put(batch);
550  }
551  },
552  idx);
553  }
554 
555  for (unsigned idx = 0; idx < std::thread::hardware_concurrency(); ++idx) {
556  work_threads.emplace_back(
557  [&](int i) {
558  while (true) {
559  std::shared_ptr<arrow::RecordBatch> batch{nullptr};
560  auto ret = queue.Get(batch);
561  if (!ret) {
562  break;
563  }
564  if (!batch) {
565  LOG(FATAL) << "get nullptr batch";
566  }
567  batchs[i].emplace_back(batch);
568  auto columns = batch->columns();
569  CHECK(primary_key_ind < columns.size());
570  auto primary_key_column = columns[primary_key_ind];
571  {
572  std::unique_lock<std::mutex> lock(mtxs_[v_label_id]);
573  _add_vertex<KEY_T>()(primary_key_column, indexer_builder);
574  }
575  }
576  },
577  idx);
578  }
579  for (auto& t : work_threads) {
580  t.join();
581  }
582  work_threads.clear();
583 
584  VLOG(10) << "Finish parsing vertex file:" << v_file << " for label "
585  << v_label_name;
586  }
587  basic_fragment_loader_.FinishAddingVertex(v_label_id, indexer_builder);
588  const auto& indexer = basic_fragment_loader_.GetLFIndexer(v_label_id);
589 
590  std::atomic<size_t> cur_batch_id(0);
591  for (unsigned i = 0; i < std::thread::hardware_concurrency(); ++i) {
592  work_threads.emplace_back(
593  [&](int idx) {
594  for (size_t id = 0; id < batchs[idx].size(); ++id) {
595  auto batch = batchs[idx][id];
596  auto columns = batch->columns();
597  auto other_columns_array = columns;
598  auto primary_key_column = columns[primary_key_ind];
599  size_t row_num = primary_key_column->length();
600  std::vector<size_t> vids;
601  if constexpr (!std::is_same<std::string_view, KEY_T>::value) {
602  using arrow_array_t =
604  auto casted_array =
605  std::static_pointer_cast<arrow_array_t>(primary_key_column);
606  for (size_t i = 0; i < row_num; ++i) {
607  vids.emplace_back(indexer.get_index(casted_array->Value(i)));
608  }
609  } else {
610  if (primary_key_column->type()->Equals(arrow::utf8())) {
611  auto casted_array =
612  std::static_pointer_cast<arrow::StringArray>(
613  primary_key_column);
614  for (size_t i = 0; i < row_num; ++i) {
615  auto str = casted_array->GetView(i);
616  std::string_view str_view(str.data(), str.size());
617  vids.emplace_back(indexer.get_index(str_view));
618  }
619  } else if (primary_key_column->type()->Equals(
620  arrow::large_utf8())) {
621  auto casted_array =
622  std::static_pointer_cast<arrow::LargeStringArray>(
623  primary_key_column);
624  for (size_t i = 0; i < row_num; ++i) {
625  auto str = casted_array->GetView(i);
626  std::string_view str_view(str.data(), str.size());
627  vids.emplace_back(indexer.get_index(str_view));
628  }
629  }
630  }
631  other_columns_array.erase(other_columns_array.begin() +
632  primary_key_ind);
633 
634  for (size_t j = 0; j < other_columns_array.size(); ++j) {
635  auto array = other_columns_array[j];
636  auto chunked_array =
637  std::make_shared<arrow::ChunkedArray>(array);
640  .column_ptrs()[j],
641  chunked_array, vids);
642  }
643  }
644  },
645  i);
646  }
647  for (auto& t : work_threads) {
648  t.join();
649  }
650 
651  auto& v_data = basic_fragment_loader_.GetVertexTable(v_label_id);
652  auto label_name = schema_.get_vertex_label_name(v_label_id);
653 
654  v_data.resize(indexer.size());
655  v_data.dump(vertex_table_prefix(label_name),
657 
658  VLOG(10) << "Finish parsing vertex file:" << v_files.size() << " for label "
659  << v_label_name;
660  }
661 #endif
662 
663  template <typename SRC_PK_T, typename EDATA_T, typename VECTOR_T>
664  void _append_edges(std::shared_ptr<arrow::Array> src_col,
665  std::shared_ptr<arrow::Array> dst_col,
666  const IndexerType& src_indexer,
667  const IndexerType& dst_indexer,
668  std::shared_ptr<arrow::Array>& property_cols,
669  const PropertyType& edge_property, VECTOR_T& parsed_edges,
670  std::vector<std::atomic<int32_t>>& ie_degree,
671  std::vector<std::atomic<int32_t>>& oe_degree,
672  size_t offset) {
673  auto dst_col_type = dst_col->type();
674  if (dst_col_type->Equals(arrow::int64())) {
675  append_edges<SRC_PK_T, int64_t, EDATA_T>(
676  src_col, dst_col, src_indexer, dst_indexer, property_cols,
677  edge_property, parsed_edges, ie_degree, oe_degree, offset);
678  } else if (dst_col_type->Equals(arrow::uint64())) {
679  append_edges<SRC_PK_T, uint64_t, EDATA_T>(
680  src_col, dst_col, src_indexer, dst_indexer, property_cols,
681  edge_property, parsed_edges, ie_degree, oe_degree, offset);
682  } else if (dst_col_type->Equals(arrow::int32())) {
683  append_edges<SRC_PK_T, int32_t, EDATA_T>(
684  src_col, dst_col, src_indexer, dst_indexer, property_cols,
685  edge_property, parsed_edges, ie_degree, oe_degree, offset);
686  } else if (dst_col_type->Equals(arrow::uint32())) {
687  append_edges<SRC_PK_T, uint32_t, EDATA_T>(
688  src_col, dst_col, src_indexer, dst_indexer, property_cols,
689  edge_property, parsed_edges, ie_degree, oe_degree, offset);
690  } else {
691  // must be string
692  append_edges<SRC_PK_T, std::string_view, EDATA_T>(
693  src_col, dst_col, src_indexer, dst_indexer, property_cols,
694  edge_property, parsed_edges, ie_degree, oe_degree, offset);
695  }
696  }
697  template <typename EDATA_T>
699  label_t src_label_id, label_t dst_label_id, label_t e_label_id,
700  const std::vector<std::string>& e_files,
701  std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
702  label_t, label_t, label_t, const std::string&, const LoadingConfig&,
703  int)>
704  supplier_creator) {
705  if constexpr (std::is_same_v<EDATA_T, RecordView>) {
706  if (use_mmap_vector_) {
708  EDATA_T, std::vector<std::tuple<vid_t, vid_t, size_t>>>(
709  src_label_id, dst_label_id, e_label_id, e_files, supplier_creator);
710  } else {
712  EDATA_T, std::vector<std::tuple<vid_t, vid_t, size_t>>>(
713  src_label_id, dst_label_id, e_label_id, e_files, supplier_creator);
714  }
715  } else {
716  if (use_mmap_vector_) {
719  src_label_id, dst_label_id, e_label_id, e_files, supplier_creator);
720  } else {
722  EDATA_T, std::vector<std::tuple<vid_t, vid_t, EDATA_T>>>(
723  src_label_id, dst_label_id, e_label_id, e_files, supplier_creator);
724  }
725  }
726  }
727 
728  template <typename EDATA_T, typename VECTOR_T>
730  label_t src_label_id, label_t dst_label_id, label_t e_label_id,
731  const std::vector<std::string>& e_files,
732  std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
733  label_t, label_t, label_t, const std::string&, const LoadingConfig&,
734  int)>
735  supplier_creator) {
736  auto src_label_name = schema_.get_vertex_label_name(src_label_id);
737  auto dst_label_name = schema_.get_vertex_label_name(dst_label_id);
738  auto edge_label_name = schema_.get_edge_label_name(e_label_id);
739  auto edge_column_mappings = loading_config_.GetEdgeColumnMappings(
740  src_label_id, dst_label_id, e_label_id);
741  auto src_dst_col_pair = loading_config_.GetEdgeSrcDstCol(
742  src_label_id, dst_label_id, e_label_id);
743  if (src_dst_col_pair.first.size() != 1 ||
744  src_dst_col_pair.second.size() != 1) {
745  LOG(FATAL) << "We currently only support one src primary key and one "
746  "dst primary key";
747  }
748  size_t src_col_ind = src_dst_col_pair.first[0].second;
749  size_t dst_col_ind = src_dst_col_pair.second[0].second;
750  CHECK(src_col_ind != dst_col_ind);
751 
752  check_edge_invariant(schema_, edge_column_mappings, src_col_ind,
753  dst_col_ind, src_label_id, dst_label_id, e_label_id);
754 
755  const auto& src_indexer = basic_fragment_loader_.GetLFIndexer(src_label_id);
756  const auto& dst_indexer = basic_fragment_loader_.GetLFIndexer(dst_label_id);
757  std::vector<VECTOR_T> parsed_edges_vec(std::thread::hardware_concurrency());
758  if constexpr (std::is_same_v<
759  VECTOR_T,
760  mmap_vector<std::tuple<vid_t, vid_t, EDATA_T>>> ||
761  std::is_same_v<
762  VECTOR_T,
763  mmap_vector<std::tuple<vid_t, vid_t, size_t>>>) {
764  const auto& work_dir = basic_fragment_loader_.work_dir();
765  for (unsigned i = 0; i < std::thread::hardware_concurrency(); ++i) {
766  parsed_edges_vec[i].open(runtime_dir(work_dir) + "/" + src_label_name +
767  "_" + dst_label_name + "_" + edge_label_name +
768  "_" + std::to_string(i) + ".tmp");
769  parsed_edges_vec[i].reserve(4096);
770  }
771  }
772  std::vector<std::atomic<int32_t>> ie_degree(dst_indexer.size()),
773  oe_degree(src_indexer.size());
774  for (size_t idx = 0; idx < ie_degree.size(); ++idx) {
775  ie_degree[idx].store(0);
776  }
777  for (size_t idx = 0; idx < oe_degree.size(); ++idx) {
778  oe_degree[idx].store(0);
779  }
780  VLOG(10) << "src indexer size: " << src_indexer.size()
781  << " dst indexer size: " << dst_indexer.size();
782 
783  grape::BlockingQueue<std::shared_ptr<arrow::RecordBatch>> queue;
784  queue.SetLimit(1024);
785  std::vector<std::thread> work_threads;
786 
787  std::vector<std::vector<std::shared_ptr<arrow::Array>>> string_columns(
788  std::thread::hardware_concurrency());
789 
790  if constexpr (std::is_same<EDATA_T, RecordView>::value) {
791  basic_fragment_loader_.init_edge_table(src_label_id, dst_label_id,
792  e_label_id);
793  }
794 
795  // use a dummy vector to store the string columns, to avoid the
796  // strings being released as record batch is released.
797  std::vector<std::shared_ptr<arrow::Array>> string_cols;
798  std::atomic<size_t> offset(0);
799  std::shared_mutex rw_mutex;
800  for (auto filename : e_files) {
801  auto record_batch_supplier_vec =
802  supplier_creator(src_label_id, dst_label_id, e_label_id, filename,
803  loading_config_, parsed_edges_vec.size());
804 
805  queue.SetProducerNum(record_batch_supplier_vec.size());
806 
807  for (size_t i = 0; i < record_batch_supplier_vec.size(); ++i) {
808  work_threads.emplace_back(
809  [&](int idx) {
810  auto& string_column = string_columns[idx];
811  bool first_batch = true;
812  auto& record_batch_supplier = record_batch_supplier_vec[idx];
813  while (true) {
814  auto record_batch = record_batch_supplier->GetNextBatch();
815  if (!record_batch) {
816  queue.DecProducerNum();
817  break;
818  }
819  if (first_batch) {
820  auto header = record_batch->schema()->field_names();
821  auto schema_column_names = schema_.get_edge_property_names(
822  src_label_id, dst_label_id, e_label_id);
823  auto schema_column_types = schema_.get_edge_properties(
824  src_label_id, dst_label_id, e_label_id);
825  CHECK(schema_column_names.size() + 2 == header.size())
826  << "schema size: " << schema_column_names.size()
827  << " neq header size: " << header.size();
828  first_batch = false;
829  }
830  for (auto i = 0; i < record_batch->num_columns(); ++i) {
831  if (record_batch->column(i)->type()->Equals(arrow::utf8()) ||
832  record_batch->column(i)->type()->Equals(
833  arrow::large_utf8())) {
834  string_column.emplace_back(record_batch->column(i));
835  }
836  }
837 
838  queue.Put(record_batch);
839  }
840  },
841  i);
842  }
843  for (size_t i = 0;
844  i <
845  std::min(static_cast<unsigned>(8 * record_batch_supplier_vec.size()),
846  std::thread::hardware_concurrency());
847  ++i) {
848  work_threads.emplace_back(
849  [&](int idx) {
850  // copy the table to csr.
851  auto& parsed_edges = parsed_edges_vec[idx];
852  while (true) {
853  std::shared_ptr<arrow::RecordBatch> record_batch{nullptr};
854  auto ret = queue.Get(record_batch);
855  if (!ret) {
856  break;
857  }
858  if (!record_batch) {
859  LOG(FATAL) << "get nullptr batch";
860  }
861  auto columns = record_batch->columns();
862  // We assume the src_col and dst_col will always be put
863  // at front.
864  CHECK(columns.size() >= 2);
865  auto src_col = columns[0];
866  auto dst_col = columns[1];
867  auto src_col_type = src_col->type();
868  auto dst_col_type = dst_col->type();
869  CHECK(check_primary_key_type(src_col_type))
870  << "unsupported src_col type: " << src_col_type->ToString();
871  CHECK(check_primary_key_type(dst_col_type))
872  << "unsupported dst_col type: " << dst_col_type->ToString();
873 
874  std::vector<std::shared_ptr<arrow::Array>> property_cols;
875  for (size_t i = 2; i < columns.size(); ++i) {
876  property_cols.emplace_back(columns[i]);
877  }
878  size_t offset_i = 0;
879  if constexpr (std::is_same<EDATA_T, RecordView>::value) {
880  auto casted_csr = dynamic_cast<DualCsr<RecordView>*>(
881  basic_fragment_loader_.get_csr(src_label_id, dst_label_id,
882  e_label_id));
883  CHECK(casted_csr != NULL);
884  auto table = casted_csr->GetTable();
885  CHECK(table.col_num() == property_cols.size());
886  offset_i = offset.fetch_add(src_col->length());
887  std::vector<size_t> offsets;
888  for (size_t _i = 0;
889  _i < static_cast<size_t>(src_col->length()); ++_i) {
890  offsets.emplace_back(offset_i + _i);
891  }
892  size_t row_num = std::max(table.row_num(), 1ul);
893 
894  while (row_num < offset_i + src_col->length()) {
895  row_num *= 2;
896  }
897  if (row_num > table.row_num()) {
898  std::unique_lock<std::shared_mutex> lock(rw_mutex);
899  if (row_num > table.row_num()) {
900  table.resize(row_num);
901  }
902  }
903 
904  {
905  std::shared_lock<std::shared_mutex> lock(rw_mutex);
906  for (size_t i = 0; i < table.col_num(); ++i) {
907  auto col = table.get_column_by_id(i);
908  auto chunked_array =
909  std::make_shared<arrow::ChunkedArray>(
910  property_cols[i]);
911  set_properties_column(col.get(), chunked_array, offsets);
912  }
913  }
914  }
915  auto edge_property = schema_.get_edge_property(
916  src_label_id, dst_label_id, e_label_id);
917  // add edges to vector
918  CHECK(src_col->length() == dst_col->length());
919  if (src_col_type->Equals(arrow::int64())) {
920  _append_edges<int64_t, EDATA_T, VECTOR_T>(
921  src_col, dst_col, src_indexer, dst_indexer,
922  property_cols[0], edge_property, parsed_edges, ie_degree,
923  oe_degree, offset_i);
924  } else if (src_col_type->Equals(arrow::uint64())) {
925  _append_edges<uint64_t, EDATA_T, VECTOR_T>(
926  src_col, dst_col, src_indexer, dst_indexer,
927  property_cols[0], edge_property, parsed_edges, ie_degree,
928  oe_degree, offset_i);
929  } else if (src_col_type->Equals(arrow::int32())) {
930  _append_edges<int32_t, EDATA_T, VECTOR_T>(
931  src_col, dst_col, src_indexer, dst_indexer,
932  property_cols[0], edge_property, parsed_edges, ie_degree,
933  oe_degree, offset_i);
934  } else if (src_col_type->Equals(arrow::uint32())) {
935  _append_edges<uint32_t, EDATA_T, VECTOR_T>(
936  src_col, dst_col, src_indexer, dst_indexer,
937  property_cols[0], edge_property, parsed_edges, ie_degree,
938  oe_degree, offset_i);
939  } else {
940  // must be string
941  _append_edges<std::string_view, EDATA_T, VECTOR_T>(
942  src_col, dst_col, src_indexer, dst_indexer,
943  property_cols[0], edge_property, parsed_edges, ie_degree,
944  oe_degree, offset_i);
945  }
946  }
947  },
948  i);
949  }
950 
951  for (auto& t : work_threads) {
952  t.join();
953  }
954  VLOG(10) << "Finish parsing edge file:" << filename << " for label "
955  << src_label_name << " -> " << dst_label_name << " -> "
956  << edge_label_name;
957  }
958  VLOG(10) << "Finish parsing edge file:" << e_files.size() << " for label "
959  << src_label_name << " -> " << dst_label_name << " -> "
960  << edge_label_name;
961  std::vector<int32_t> ie_deg(ie_degree.size());
962  std::vector<int32_t> oe_deg(oe_degree.size());
963  for (size_t idx = 0; idx < ie_deg.size(); ++idx) {
964  ie_deg[idx] = ie_degree[idx];
965  }
966  for (size_t idx = 0; idx < oe_deg.size(); ++idx) {
967  oe_deg[idx] = oe_degree[idx];
968  }
969 
970  basic_fragment_loader_.PutEdges<EDATA_T, VECTOR_T>(
971  src_label_id, dst_label_id, e_label_id, parsed_edges_vec, ie_deg,
972  oe_deg, build_csr_in_mem_);
973 
974  string_columns.clear();
975  size_t sum = 0;
976  for (auto& edges : parsed_edges_vec) {
977  sum += edges.size();
978  if constexpr (
979  std::is_same<VECTOR_T,
980  mmap_vector<std::tuple<vid_t, vid_t, EDATA_T>>>::value ||
981  std::is_same<VECTOR_T,
982  mmap_vector<std::tuple<vid_t, vid_t, size_t>>>::value) {
983  edges.unlink();
984  }
985  }
986 
987  VLOG(10) << "Finish putting: " << sum << " edges";
988  }
989 
991  const Schema& schema_;
993  int32_t thread_num_;
994  std::mutex* mtxs_;
998 };
999 
1000 } // namespace gs
1001 
1002 #endif // STORAGES_RT_MUTABLE_GRAPH_LOADER_ABSTRACT_ARROW_FRAGMENT_LOADER_H_
gs::LoadingConfig::GetEdgeColumnMappings
const std::vector< std::tuple< size_t, std::string, std::string > > & GetEdgeColumnMappings(label_t src_label_id, label_t dst_label_id, label_t edge_label_id) const
Definition: loading_config.cc:906
gs::_add_vertex
Definition: abstract_arrow_fragment_loader.h:97
gs::mmap_vector
Definition: mmap_vector.h:22
gs::IdIndexer::bucket_count
size_t bucket_count() const
Definition: id_indexer.h:718
gs::set_properties_column
void set_properties_column(gs::ColumnBase *col, std::shared_ptr< arrow::ChunkedArray > array, const std::vector< size_t > &offset)
Definition: abstract_arrow_fragment_loader.cc:86
gs::AbstractArrowFragmentLoader
Definition: abstract_arrow_fragment_loader.h:338
gs::AbstractArrowFragmentLoader::edge_label_num_
size_t edge_label_num_
Definition: abstract_arrow_fragment_loader.h:992
gs::check_edge_invariant
void check_edge_invariant(const Schema &schema, const std::vector< std::tuple< size_t, std::string, std::string >> &column_mappings, size_t src_col_ind, size_t dst_col_ind, label_t src_label_i, label_t dst_label_i, label_t edge_label_i)
Definition: abstract_arrow_fragment_loader.cc:174
gs::AbstractArrowFragmentLoader::mtxs_
std::mutex * mtxs_
Definition: abstract_arrow_fragment_loader.h:994
gs::append_edges
static void append_edges(std::shared_ptr< arrow::Array > src_col, std::shared_ptr< arrow::Array > dst_col, const IndexerType &src_indexer, const IndexerType &dst_indexer, std::shared_ptr< arrow::Array > &edata_cols, const PropertyType &edge_prop, VECTOR_T &parsed_edges, std::vector< std::atomic< int32_t >> &ie_degree, std::vector< std::atomic< int32_t >> &oe_degree, size_t offset=0)
Definition: abstract_arrow_fragment_loader.h:252
gs::IFragmentLoader
Definition: i_fragment_loader.h:24
i_fragment_loader.h
gs::AbstractArrowFragmentLoader::~AbstractArrowFragmentLoader
~AbstractArrowFragmentLoader()
Definition: abstract_arrow_fragment_loader.h:353
gs::IRecordBatchSupplier
Definition: abstract_arrow_fragment_loader.h:39
gs::Any::From
static Any From(const T &value)
Definition: types.h:681
gs::_add_vertex::operator()
void operator()(const std::shared_ptr< arrow::Array > &col, IdIndexer< KEY_T, vid_t > &indexer, std::vector< size_t > &offset)
Definition: abstract_arrow_fragment_loader.h:99
gs::vertex_table_prefix
std::string vertex_table_prefix(const std::string &label)
Definition: file_names.h:256
gs::Schema::get_edge_property
PropertyType get_edge_property(label_t src, label_t dst, label_t edge) const
Definition: schema.cc:252
gs::LFIndexer::get_index
INDEX_T get_index(const Any &oid) const
Definition: id_indexer.h:324
gs::vid_t
uint32_t vid_t
Definition: types.h:31
gs::AbstractArrowFragmentLoader::AddVerticesRecordBatch
void AddVerticesRecordBatch(label_t v_label_id, const std::vector< std::string > &input_paths, std::function< std::vector< std::shared_ptr< IRecordBatchSupplier >>(label_t, const std::string &, const LoadingConfig &, int)> supplier_creator)
Definition: abstract_arrow_fragment_loader.cc:200
gs::check_primary_key_type
bool check_primary_key_type(std::shared_ptr< arrow::DataType > data_type)
Definition: abstract_arrow_fragment_loader.cc:30
gs::IdIndexer::_rehash
void _rehash(size_t num)
Definition: id_indexer.h:819
std::to_string
std::string to_string(const gs::flex::interactive::Code &status)
Definition: result.h:166
gs::_append
void _append(bool is_dst, size_t cur_ind, std::shared_ptr< arrow::Array > col, const IndexerType &indexer, VECTOR_T &parsed_edges, std::vector< std::atomic< int32_t >> &degree)
Definition: abstract_arrow_fragment_loader.h:196
gs::printDiskRemaining
void printDiskRemaining(const std::string &path)
Definition: abstract_arrow_fragment_loader.cc:22
gs::IRecordBatchSupplier::~IRecordBatchSupplier
virtual ~IRecordBatchSupplier()=default
gs
Definition: adj_list.h:23
gs::AbstractArrowFragmentLoader::addVertexRecordBatchImpl
void addVertexRecordBatchImpl(label_t v_label_id, const std::vector< std::string > &v_files, std::function< std::vector< std::shared_ptr< IRecordBatchSupplier >>(label_t, const std::string &, const LoadingConfig &, int)> supplier_creator)
Definition: abstract_arrow_fragment_loader.h:407
gs::IRecordBatchSupplier::GetNextBatch
virtual std::shared_ptr< arrow::RecordBatch > GetNextBatch()=0
gs::PropertyType::kUInt64
static const PropertyType kUInt64
Definition: types.h:141
gs::AbstractArrowFragmentLoader::addEdgesRecordBatchImplHelper
void addEdgesRecordBatchImplHelper(label_t src_label_id, label_t dst_label_id, label_t e_label_id, const std::vector< std::string > &e_files, std::function< std::vector< std::shared_ptr< IRecordBatchSupplier >>(label_t, label_t, label_t, const std::string &, const LoadingConfig &, int)> supplier_creator)
Definition: abstract_arrow_fragment_loader.h:729
gs::set_column_from_timestamp_array
void set_column_from_timestamp_array(gs::ColumnBase *col, std::shared_ptr< arrow::ChunkedArray > array, const std::vector< size_t > &offset)
Definition: abstract_arrow_fragment_loader.cc:122
mmap_vector.h
gs::set_column
void set_column(gs::ColumnBase *col, std::shared_ptr< arrow::ChunkedArray > array, const std::vector< size_t > &offset)
Definition: abstract_arrow_fragment_loader.h:49
gs::BasicFragmentLoader::GetLFIndexer
const IndexerType & GetLFIndexer(label_t v_label) const
Definition: basic_fragment_loader.cc:167
mutable_property_fragment.h
gs::BasicFragmentLoader
Definition: basic_fragment_loader.h:53
gs::set_column_from_string_array
void set_column_from_string_array(gs::ColumnBase *col, std::shared_ptr< arrow::ChunkedArray > array, const std::vector< size_t > &offset)
Definition: abstract_arrow_fragment_loader.cc:41
gs::Schema::get_edge_property_names
const std::vector< std::string > & get_edge_property_names(const std::string &src_label, const std::string &dst_label, const std::string &label) const
Definition: schema.cc:258
gs::AbstractArrowFragmentLoader::schema_
const Schema & schema_
Definition: abstract_arrow_fragment_loader.h:991
gs::Schema::get_edge_properties
const std::vector< PropertyType > & get_edge_properties(const std::string &src_label, const std::string &dst_label, const std::string &label) const
Definition: schema.cc:201
gs::PropertyType::kStringView
static const PropertyType kStringView
Definition: types.h:145
gs::runtime_dir
std::string runtime_dir(const std::string &work_dir)
Definition: file_names.h:200
gs::BasicFragmentLoader::GetVertexTable
Table & GetVertexTable(size_t ind)
Definition: basic_fragment_loader.h:285
gs::AbstractArrowFragmentLoader::loading_config_
const LoadingConfig & loading_config_
Definition: abstract_arrow_fragment_loader.h:990
gs::BasicFragmentLoader::PutEdges
void PutEdges(label_t src_label_id, label_t dst_label_id, label_t edge_label_id, const std::vector< VECTOR_T > &edges_vec, const std::vector< int32_t > &ie_degree, const std::vector< int32_t > &oe_degree, bool build_csr_in_mem)
Definition: basic_fragment_loader.h:165
gs::TypeConverter
Definition: arrow_utils.h:242
gs::LoadingConfig::GetEdgeSrcDstCol
const std::pair< std::vector< std::pair< std::string, size_t > >, std::vector< std::pair< std::string, size_t > > > & GetEdgeSrcDstCol(label_t src_label_id, label_t dst_label_id, label_t edge_label_id) const
Definition: loading_config.cc:917
gs::Schema
Definition: schema.h:29
loading_config.h
gs::AbstractArrowFragmentLoader::_append_edges
void _append_edges(std::shared_ptr< arrow::Array > src_col, std::shared_ptr< arrow::Array > dst_col, const IndexerType &src_indexer, const IndexerType &dst_indexer, std::shared_ptr< arrow::Array > &property_cols, const PropertyType &edge_property, VECTOR_T &parsed_edges, std::vector< std::atomic< int32_t >> &ie_degree, std::vector< std::atomic< int32_t >> &oe_degree, size_t offset)
Definition: abstract_arrow_fragment_loader.h:664
gs::PropertyType::kUInt32
static const PropertyType kUInt32
Definition: types.h:138
basic_fragment_loader.h
gs::Schema::vertex_label_num
label_t vertex_label_num() const
Definition: schema.cc:100
gs::AbstractArrowFragmentLoader::basic_fragment_loader_
BasicFragmentLoader basic_fragment_loader_
Definition: abstract_arrow_fragment_loader.h:997
gs::ColumnBase::set_any
virtual void set_any(size_t index, const Any &value)=0
gs::AbstractArrowFragmentLoader::use_mmap_vector_
bool use_mmap_vector_
Definition: abstract_arrow_fragment_loader.h:996
gs::BasicFragmentLoader::FinishAddingVertex
void FinishAddingVertex(label_t v_label, const IdIndexer< KEY_T, vid_t > &indexer)
Definition: basic_fragment_loader.h:72
gs::Table::column_ptrs
std::vector< ColumnBase * > & column_ptrs()
Definition: table.cc:227
gs::Schema::edge_label_num
label_t edge_label_num() const
Definition: schema.cc:104
gs::IdIndexer
Definition: id_indexer.h:181
gs::AbstractArrowFragmentLoader::build_csr_in_mem_
bool build_csr_in_mem_
Definition: abstract_arrow_fragment_loader.h:995
gs::Schema::get_vertex_label_name
std::string get_vertex_label_name(label_t index) const
Definition: schema.cc:359
gs::ColumnBase::size
virtual size_t size() const =0
gs::set_column_from_timestamp_array_to_day
void set_column_from_timestamp_array_to_day(gs::ColumnBase *col, std::shared_ptr< arrow::ChunkedArray > array, const std::vector< size_t > &offset)
Definition: abstract_arrow_fragment_loader.cc:148
gs::PropertyType::kInt64
static const PropertyType kInt64
Definition: types.h:140
gs::AbstractArrowFragmentLoader::vertex_label_num_
size_t vertex_label_num_
Definition: abstract_arrow_fragment_loader.h:992
gs::LFIndexer::get_type
PropertyType get_type() const
Definition: id_indexer.h:305
gs::IdIndexer::add
bool add(const Any &oid, INDEX_T &lid) override
Definition: id_indexer.h:559
gs::DualCsr< RecordView >
Definition: dual_csr.h:418
gs::AbstractArrowFragmentLoader::addVertexBatchFromArray
void addVertexBatchFromArray(label_t v_label_id, IdIndexer< KEY_T, vid_t > &indexer, std::shared_ptr< arrow::Array > &primary_key_col, const std::vector< std::shared_ptr< arrow::Array >> &property_cols)
Definition: abstract_arrow_fragment_loader.h:378
gs::snapshot_dir
std::string snapshot_dir(const std::string &work_dir, uint32_t version)
Definition: file_names.h:192
gs::Schema::get_vertex_property_names
const std::vector< std::string > & get_vertex_property_names(const std::string &label) const
Definition: schema.cc:139
gs::BasicFragmentLoader::get_csr
DualCsrBase * get_csr(label_t src_label_id, label_t dst_label_id, label_t edge_label_id)
Definition: basic_fragment_loader.cc:187
gs::AnyConverter
Definition: types.h:381
gs::Schema::get_edge_label_name
std::string get_edge_label_name(label_t index) const
Definition: schema.cc:367
gs::Schema::get_max_vnum
size_t get_max_vnum(const std::string &label) const
Definition: schema.cc:181
gs::PropertyType
Definition: types.h:95
gs::LoadingConfig
Definition: loading_config.h:89
gs::BasicFragmentLoader::work_dir
const std::string & work_dir() const
Definition: basic_fragment_loader.h:294
gs::BasicFragmentLoader::init_edge_table
void init_edge_table(label_t src_label_id, label_t dst_label_id, label_t edge_label_id)
Definition: basic_fragment_loader.cc:195
gs::AbstractArrowFragmentLoader::thread_num_
int32_t thread_num_
Definition: abstract_arrow_fragment_loader.h:993
gs::label_t
uint8_t label_t
Definition: types.h:32
gs::LFIndexer
Definition: id_indexer.h:184
gs::PropertyType::kInt32
static const PropertyType kInt32
Definition: types.h:137
gs::ColumnBase
Definition: column.h:29
gs::AbstractArrowFragmentLoader::addEdgesRecordBatchImpl
void addEdgesRecordBatchImpl(label_t src_label_id, label_t dst_label_id, label_t e_label_id, const std::vector< std::string > &e_files, std::function< std::vector< std::shared_ptr< IRecordBatchSupplier >>(label_t, label_t, label_t, const std::string &, const LoadingConfig &, int)> supplier_creator)
Definition: abstract_arrow_fragment_loader.h:698
gs::Schema::get_vertex_primary_key
const std::vector< std::tuple< PropertyType, std::string, size_t > > & get_vertex_primary_key(label_t index) const
Definition: schema.cc:376
gs::AbstractArrowFragmentLoader::AddEdgesRecordBatch
void AddEdgesRecordBatch(label_t src_label_id, label_t dst_label_id, label_t edge_label_id, const std::vector< std::string > &input_paths, std::function< std::vector< std::shared_ptr< IRecordBatchSupplier >>(label_t, label_t, label_t, const std::string &, const LoadingConfig &, int)> supplier_creator)
Definition: abstract_arrow_fragment_loader.cc:241
gs::AbstractArrowFragmentLoader::AbstractArrowFragmentLoader
AbstractArrowFragmentLoader(const std::string &work_dir, const Schema &schema, const LoadingConfig &loading_config)
Definition: abstract_arrow_fragment_loader.h:340