Flex 0.17.9
abstract_arrow_fragment_loader.h
Go to the documentation of this file.
1 
17 #ifndef STORAGES_RT_MUTABLE_GRAPH_LOADER_ABSTRACT_ARROW_FRAGMENT_LOADER_H_
18 #define STORAGES_RT_MUTABLE_GRAPH_LOADER_ABSTRACT_ARROW_FRAGMENT_LOADER_H_
19 
20 #include "flex/storages/rt_mutable_graph/loader/basic_fragment_loader.h"
21 #include "flex/storages/rt_mutable_graph/loader/i_fragment_loader.h"
22 #include "flex/storages/rt_mutable_graph/loader/loading_config.h"
23 #include "flex/storages/rt_mutable_graph/mutable_property_fragment.h"
24 #include "flex/utils/mmap_vector.h"
25 #include "grape/utils/concurrent_queue.h"
26 
27 #include <arrow/api.h>
28 #include <arrow/io/api.h>
29 #include <shared_mutex>
30 #include "arrow/util/value_parsing.h"
31 
32 #include "grape/util.h"
33 
34 namespace gs {
35 
36 void printDiskRemaining(const std::string& path);
37 // The interface providing visitor pattern for RecordBatch.
38 
39 class IRecordBatchSupplier {
40  public:
41  virtual ~IRecordBatchSupplier() = default;
42  virtual std::shared_ptr<arrow::RecordBatch> GetNextBatch() = 0;
43 };
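For orientation, here is a minimal sketch of an IRecordBatchSupplier implementation that simply replays the batches of an in-memory arrow::Table. The class name TableBatchSupplier is illustrative and not part of this header; it only relies on arrow::TableBatchReader from <arrow/api.h>, which is already included above.

// Illustrative only: yields the table's batches one by one, then nullptr.
class TableBatchSupplier : public IRecordBatchSupplier {
 public:
  explicit TableBatchSupplier(std::shared_ptr<arrow::Table> table)
      : table_(std::move(table)), reader_(*table_) {}

  std::shared_ptr<arrow::RecordBatch> GetNextBatch() override {
    std::shared_ptr<arrow::RecordBatch> batch;
    if (!reader_.ReadNext(&batch).ok()) {
      return nullptr;  // treat read errors as end-of-input in this sketch
    }
    return batch;      // nullptr once the table is exhausted
  }

 private:
  std::shared_ptr<arrow::Table> table_;  // keeps the table alive for reader_
  arrow::TableBatchReader reader_;
};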
44 
45 bool check_primary_key_type(std::shared_ptr<arrow::DataType> data_type);
46 
47 // For Primitive types.
48 template <typename COL_T>
49 void set_column(gs::ColumnBase* col, std::shared_ptr<arrow::ChunkedArray> array,
50  const std::vector<size_t>& offset) {
51  using arrow_array_type = typename gs::TypeConverter<COL_T>::ArrowArrayType;
52  auto array_type = array->type();
53  auto arrow_type = gs::TypeConverter<COL_T>::ArrowTypeValue();
54  CHECK(array_type->Equals(arrow_type))
55  << "Inconsistent data type, expect " << arrow_type->ToString()
56  << ", but got " << array_type->ToString();
57  size_t cur_ind = 0;
58  for (auto j = 0; j < array->num_chunks(); ++j) {
59  auto casted = std::static_pointer_cast<arrow_array_type>(array->chunk(j));
60  size_t size = col->size();
61  for (auto k = 0; k < casted->length(); ++k) {
62  if (offset[cur_ind] >= size) {
63  cur_ind++;
64  } else {
65  col->set_any(offset[cur_ind++],
66  std::move(AnyConverter<COL_T>::to_any(casted->Value(k))));
67  }
68  }
69  }
70 }
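The offset vector realizes a scatter-with-skip: offset[i] is the destination row for the i-th value, and any entry greater than or equal to col->size() (notably std::numeric_limits<size_t>::max(), which _add_vertex below records for duplicate keys) causes the value to be dropped. A self-contained sketch of the same logic, using a plain std::vector<int64_t> as a stand-in for gs::ColumnBase:

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <limits>
#include <vector>

int main() {
  std::vector<int64_t> column(3, 0);    // stand-in for the column storage
  std::vector<int64_t> values = {10, 20, 30, 40};
  // One offset per input value; max() marks a value that must be skipped.
  std::vector<size_t> offset = {2, std::numeric_limits<size_t>::max(), 0, 1};
  size_t cur_ind = 0;
  for (int64_t v : values) {
    if (offset[cur_ind] >= column.size()) {
      ++cur_ind;                        // skip this value
    } else {
      column[offset[cur_ind++]] = v;    // scatter into its row
    }
  }
  for (int64_t v : column) {
    std::cout << v << " ";              // prints: 30 40 10
  }
  std::cout << "\n";
}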
71 
72 // For String types.
73 void set_column_from_string_array(gs::ColumnBase* col,
74  std::shared_ptr<arrow::ChunkedArray> array,
75  const std::vector<size_t>& offset);
76 
77 void set_column_from_timestamp_array(gs::ColumnBase* col,
78  std::shared_ptr<arrow::ChunkedArray> array,
79  const std::vector<size_t>& offset);
80 
81 void set_column_from_timestamp_array_to_day(
82  gs::ColumnBase* col, std::shared_ptr<arrow::ChunkedArray> array,
83  const std::vector<size_t>& offset);
84 
85 void set_properties_column(gs::ColumnBase* col,
86  std::shared_ptr<arrow::ChunkedArray> array,
87  const std::vector<size_t>& offset);
88 
89 void check_edge_invariant(
90  const Schema& schema,
91  const std::vector<std::tuple<size_t, std::string, std::string>>&
92  column_mappings,
93  size_t src_col_ind, size_t dst_col_ind, label_t src_label_i,
94  label_t dst_label_i, label_t edge_label_i);
95 
96 template <typename KEY_T>
97 struct _add_vertex {
98 #ifndef USE_PTHASH
99  void operator()(const std::shared_ptr<arrow::Array>& col,
100  IdIndexer<KEY_T, vid_t>& indexer,
101  std::vector<size_t>& offset) {
102  size_t row_num = col->length();
103  vid_t vid;
104  if constexpr (!std::is_same<std::string_view, KEY_T>::value) {
105  // for non-string value
106  auto expected_type = gs::TypeConverter<KEY_T>::ArrowTypeValue();
107  using arrow_array_t = typename gs::TypeConverter<KEY_T>::ArrowArrayType;
108  if (!col->type()->Equals(expected_type)) {
109  LOG(FATAL) << "Inconsistent data type, expect "
110  << expected_type->ToString() << ", but got "
111  << col->type()->ToString();
112  }
113  auto casted_array = std::static_pointer_cast<arrow_array_t>(col);
114  for (size_t i = 0; i < row_num; ++i) {
115  if (!indexer.add(casted_array->Value(i), vid)) {
116  VLOG(2) << "Duplicate vertex id: " << casted_array->Value(i) << "..";
117  offset.emplace_back(std::numeric_limits<size_t>::max());
118  } else {
119  offset.emplace_back(vid);
120  }
121  }
122  } else {
123  if (col->type()->Equals(arrow::utf8())) {
124  auto casted_array = std::static_pointer_cast<arrow::StringArray>(col);
125  for (size_t i = 0; i < row_num; ++i) {
126  auto str = casted_array->GetView(i);
127  std::string_view str_view(str.data(), str.size());
128  if (!indexer.add(str_view, vid)) {
129  VLOG(2) << "Duplicate vertex id: " << str_view << "..";
130  offset.emplace_back(std::numeric_limits<size_t>::max());
131  } else {
132  offset.emplace_back(vid);
133  }
134  }
135  } else if (col->type()->Equals(arrow::large_utf8())) {
136  auto casted_array =
137  std::static_pointer_cast<arrow::LargeStringArray>(col);
138  for (size_t i = 0; i < row_num; ++i) {
139  auto str = casted_array->GetView(i);
140  std::string_view str_view(str.data(), str.size());
141  if (!indexer.add(str_view, vid)) {
142  VLOG(2) << "Duplicate vertex id: " << str_view << "..";
143  offset.emplace_back(std::numeric_limits<size_t>::max());
144  } else {
145  offset.emplace_back(vid);
146  }
147  }
148  } else {
149  LOG(FATAL) << "Not support type: " << col->type()->ToString();
150  }
151  }
152  }
153 
154 #else
155  void operator()(const std::shared_ptr<arrow::Array>& col,
156  PTIndexerBuilder<KEY_T, vid_t>& indexer) {
157  size_t row_num = col->length();
158  if constexpr (!std::is_same<std::string_view, KEY_T>::value) {
159  // for non-string value
160  auto expected_type = gs::TypeConverter<KEY_T>::ArrowTypeValue();
161  using arrow_array_t = typename gs::TypeConverter<KEY_T>::ArrowArrayType;
162  if (!col->type()->Equals(expected_type)) {
163  LOG(FATAL) << "Inconsistent data type, expect "
164  << expected_type->ToString() << ", but got "
165  << col->type()->ToString();
166  }
167  auto casted_array = std::static_pointer_cast<arrow_array_t>(col);
168  for (size_t i = 0; i < row_num; ++i) {
169  indexer.add_vertex(casted_array->Value(i));
170  }
171  } else {
172  if (col->type()->Equals(arrow::utf8())) {
173  auto casted_array = std::static_pointer_cast<arrow::StringArray>(col);
174  for (size_t i = 0; i < row_num; ++i) {
175  auto str = casted_array->GetView(i);
176  std::string_view str_view(str.data(), str.size());
177  indexer.add_vertex(str_view);
178  }
179  } else if (col->type()->Equals(arrow::large_utf8())) {
180  auto casted_array =
181  std::static_pointer_cast<arrow::LargeStringArray>(col);
182  for (size_t i = 0; i < row_num; ++i) {
183  auto str = casted_array->GetView(i);
184  std::string_view str_view(str.data(), str.size());
185  indexer.add_vertex(str_view);
186  }
187  } else {
188  LOG(FATAL) << "Not support type: " << col->type()->ToString();
189  }
190  }
191  }
192 #endif
193 };
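In the non-PTHASH build, this functor assigns a fresh vid to every previously unseen key and records std::numeric_limits<size_t>::max() for duplicates, which set_column later skips. A minimal sketch of that bookkeeping, with std::unordered_map standing in for IdIndexer (illustrative only):

#include <cstddef>
#include <cstdint>
#include <limits>
#include <string>
#include <unordered_map>
#include <vector>

int main() {
  std::unordered_map<std::string, uint32_t> indexer;  // stand-in for IdIndexer
  std::vector<size_t> offset;  // row -> assigned vid, or max() for duplicates
  for (const std::string& key : {"a", "b", "a", "c"}) {
    auto [it, inserted] = indexer.try_emplace(key, indexer.size());
    if (!inserted) {
      offset.push_back(std::numeric_limits<size_t>::max());  // duplicate id
    } else {
      offset.push_back(it->second);
    }
  }
  // offset == {0, 1, max, 2}; the later property scatter skips the max entries.
}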
194 
195 template <typename PK_T, typename EDATA_T, typename VECTOR_T>
196 void _append(bool is_dst, size_t cur_ind, std::shared_ptr<arrow::Array> col,
197  const IndexerType& indexer, VECTOR_T& parsed_edges,
198  std::vector<std::atomic<int32_t>>& degree) {
199  static constexpr auto invalid_vid = std::numeric_limits<vid_t>::max();
200  if constexpr (std::is_same_v<PK_T, std::string_view>) {
201  if (col->type()->Equals(arrow::utf8())) {
202  auto casted = std::static_pointer_cast<arrow::StringArray>(col);
203  for (auto j = 0; j < casted->length(); ++j) {
204  auto str = casted->GetView(j);
205  std::string_view str_view(str.data(), str.size());
206  auto vid = indexer.get_index(Any::From(str_view));
207  if (is_dst) {
208  std::get<1>(parsed_edges[cur_ind++]) = vid;
209  } else {
210  std::get<0>(parsed_edges[cur_ind++]) = vid;
211  }
212  if (vid != invalid_vid) {
213  degree[vid]++;
214  }
215  }
216  } else {
217  // must be large utf8
218  auto casted = std::static_pointer_cast<arrow::LargeStringArray>(col);
219  for (auto j = 0; j < casted->length(); ++j) {
220  auto str = casted->GetView(j);
221  std::string_view str_view(str.data(), str.size());
222  auto vid = indexer.get_index(Any::From(str_view));
223  if (is_dst) {
224  std::get<1>(parsed_edges[cur_ind++]) = vid;
225  } else {
226  std::get<0>(parsed_edges[cur_ind++]) = vid;
227  }
228  if (vid != invalid_vid) {
229  degree[vid]++;
230  }
231  }
232  }
233  } else {
234  using arrow_array_type = typename gs::TypeConverter<PK_T>::ArrowArrayType;
235  auto casted = std::static_pointer_cast<arrow_array_type>(col);
236  for (auto j = 0; j < casted->length(); ++j) {
237  auto vid = indexer.get_index(Any::From(casted->Value(j)));
238  if (is_dst) {
239  std::get<1>(parsed_edges[cur_ind++]) = vid;
240  } else {
241  std::get<0>(parsed_edges[cur_ind++]) = vid;
242  }
243  if (vid != invalid_vid) {
244  degree[vid]++;
245  }
246  }
247  }
248 }
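The degree argument is a std::vector<std::atomic<int32_t>> because append_edges (below) scans the source and destination columns on different threads while both increment counters over the same vertex set; the vector is sized up front and never resized while those threads run. A minimal sketch of that pattern (data and thread split are illustrative):

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <thread>
#include <vector>

int main() {
  std::vector<std::atomic<int32_t>> degree(4);   // one counter per vertex
  for (auto& d : degree) {
    d.store(0);
  }
  std::vector<uint32_t> src = {0, 1, 1, 3, 2, 1};
  auto count = [&](size_t begin, size_t end) {
    for (size_t i = begin; i < end; ++i) {
      degree[src[i]]++;                          // atomic increment
    }
  };
  std::thread t1(count, 0, src.size() / 2);
  std::thread t2(count, src.size() / 2, src.size());
  t1.join();
  t2.join();
  for (auto& d : degree) {
    std::cout << d.load() << " ";                // prints: 1 3 1 1
  }
  std::cout << "\n";
}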
249 
250 template <typename SRC_PK_T, typename DST_PK_T, typename EDATA_T,
251  typename VECTOR_T>
252 static void append_edges(std::shared_ptr<arrow::Array> src_col,
253  std::shared_ptr<arrow::Array> dst_col,
254  const IndexerType& src_indexer,
255  const IndexerType& dst_indexer,
256  std::shared_ptr<arrow::Array>& edata_cols,
257  const PropertyType& edge_prop, VECTOR_T& parsed_edges,
258  std::vector<std::atomic<int32_t>>& ie_degree,
259  std::vector<std::atomic<int32_t>>& oe_degree,
260  size_t offset = 0) {
261  CHECK(src_col->length() == dst_col->length());
262  auto indexer_check_lambda = [](const IndexerType& cur_indexer,
263  const std::shared_ptr<arrow::Array>& cur_col) {
264  if (cur_indexer.get_type() == PropertyType::kInt64) {
265  CHECK(cur_col->type()->Equals(arrow::int64()));
266  } else if (cur_indexer.get_type() == PropertyType::kStringView) {
267  CHECK(cur_col->type()->Equals(arrow::utf8()) ||
268  cur_col->type()->Equals(arrow::large_utf8()));
269  } else if (cur_indexer.get_type() == PropertyType::kInt32) {
270  CHECK(cur_col->type()->Equals(arrow::int32()));
271  } else if (cur_indexer.get_type() == PropertyType::kUInt32) {
272  CHECK(cur_col->type()->Equals(arrow::uint32()));
273  } else if (cur_indexer.get_type() == PropertyType::kUInt64) {
274  CHECK(cur_col->type()->Equals(arrow::uint64()));
275  }
276  };
277 
278  indexer_check_lambda(src_indexer, src_col);
279  indexer_check_lambda(dst_indexer, dst_col);
280  auto old_size = parsed_edges.size();
281  parsed_edges.resize(old_size + src_col->length());
282  VLOG(10) << "resize parsed_edges from" << old_size << " to "
283  << parsed_edges.size();
284 
285  // if EDATA_T is grape::EmptyType, no need to read columns
286  auto edata_col_thread = std::thread([&]() {
287  if constexpr (std::is_same<EDATA_T, RecordView>::value) {
288  size_t cur_ind = old_size;
289  for (auto j = 0; j < src_col->length(); ++j) {
290  std::get<2>(parsed_edges[cur_ind++]) = offset++;
291  }
292  } else if constexpr (!std::is_same<EDATA_T, grape::EmptyType>::value) {
293  auto edata_col = edata_cols;
294  CHECK(src_col->length() == edata_col->length());
295  size_t cur_ind = old_size;
296  auto type = edata_col->type();
297  if (!type->Equals(TypeConverter<EDATA_T>::ArrowTypeValue())) {
298  LOG(FATAL) << "Inconsistent data type, expect "
299 << TypeConverter<EDATA_T>::ArrowTypeValue()->ToString()
300  << ", but got " << type->ToString();
301  }
302 
303  using arrow_array_type =
304 typename gs::TypeConverter<EDATA_T>::ArrowArrayType;
305  // cast chunk to EDATA_T array
306  auto data = std::static_pointer_cast<arrow_array_type>(edata_col);
307  for (auto j = 0; j < edata_col->length(); ++j) {
308  if constexpr (std::is_same<arrow_array_type,
309  arrow::StringArray>::value ||
310  std::is_same<arrow_array_type,
311  arrow::LargeStringArray>::value) {
312  auto str = data->GetView(j);
313  std::string_view str_view(str.data(), str.size());
314  std::get<2>(parsed_edges[cur_ind++]) = str_view;
315  } else {
316  std::get<2>(parsed_edges[cur_ind++]) = data->Value(j);
317  }
318  }
319  VLOG(10) << "Finish inserting: " << src_col->length() << " edges";
320  }
321  });
322  size_t cur_ind = old_size;
323  auto src_col_thread = std::thread([&]() {
324  _append<SRC_PK_T, EDATA_T>(false, cur_ind, src_col, src_indexer,
325  parsed_edges, oe_degree);
326  });
327  auto dst_col_thread = std::thread([&]() {
328  _append<DST_PK_T, EDATA_T>(true, cur_ind, dst_col, dst_indexer,
329  parsed_edges, ie_degree);
330  });
331  src_col_thread.join();
332  dst_col_thread.join();
333  edata_col_thread.join();
334 }
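append_edges fills three different tuple fields of the same pre-resized rows from three threads: one for the source vids, one for the destination vids, and one for the edge data (or table offsets). Writing distinct members of the same tuple from different threads is race-free, which is what makes this split safe. A minimal, self-contained sketch of the idea (types and data are illustrative):

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <thread>
#include <tuple>
#include <vector>

int main() {
  // Resize once, then let each thread own one field of every row.
  std::vector<std::tuple<uint32_t, uint32_t, double>> edges(4);
  std::vector<uint32_t> src = {0, 1, 2, 3};
  std::vector<uint32_t> dst = {3, 2, 1, 0};
  std::vector<double> weight = {0.5, 1.5, 2.5, 3.5};

  std::thread src_thread([&] {
    for (size_t i = 0; i < edges.size(); ++i) std::get<0>(edges[i]) = src[i];
  });
  std::thread dst_thread([&] {
    for (size_t i = 0; i < edges.size(); ++i) std::get<1>(edges[i]) = dst[i];
  });
  std::thread data_thread([&] {
    for (size_t i = 0; i < edges.size(); ++i) std::get<2>(edges[i]) = weight[i];
  });
  src_thread.join();
  dst_thread.join();
  data_thread.join();

  for (const auto& [s, d, w] : edges) {
    std::cout << s << " -> " << d << " (" << w << ")\n";
  }
}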
335 
336 // An AbstractArrowFragmentLoader which can load a fragment from an arrow::Table.
337 // It cannot be used directly; it should be inherited.
338 class AbstractArrowFragmentLoader : public IFragmentLoader {
339  public:
340  AbstractArrowFragmentLoader(const std::string& work_dir, const Schema& schema,
341  const LoadingConfig& loading_config)
342  : loading_config_(loading_config),
343  schema_(schema),
344  thread_num_(loading_config_.GetParallelism()),
345  build_csr_in_mem_(loading_config_.GetBuildCsrInMem()),
346  use_mmap_vector_(loading_config_.GetUseMmapVector()),
347  basic_fragment_loader_(schema_, work_dir) {
348 vertex_label_num_ = schema_.vertex_label_num();
349 edge_label_num_ = schema_.edge_label_num();
350  mtxs_ = new std::mutex[vertex_label_num_];
351  }
352 
353 ~AbstractArrowFragmentLoader() {
354  if (mtxs_) {
355  delete[] mtxs_;
356  }
357  }
358 
359 void AddVerticesRecordBatch(
360  label_t v_label_id, const std::vector<std::string>& input_paths,
361  std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
362  label_t, const std::string&, const LoadingConfig&, int)>
363  supplier_creator);
364 
365  // Add edges in record batch to output_parsed_edges, output_ie_degrees and
366  // output_oe_degrees.
367 
368  void AddEdgesRecordBatch(
369  label_t src_label_id, label_t dst_label_id, label_t edge_label_id,
370  const std::vector<std::string>& input_paths,
371  std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
372  label_t, label_t, label_t, const std::string&, const LoadingConfig&,
373  int)>
374  supplier_creator);
375 
376  protected:
377  template <typename KEY_T>
378 void addVertexBatchFromArray(
379  label_t v_label_id, IdIndexer<KEY_T, vid_t>& indexer,
380  std::shared_ptr<arrow::Array>& primary_key_col,
381  const std::vector<std::shared_ptr<arrow::Array>>& property_cols,
382  std::shared_mutex& rw_mutex) {
383  size_t row_num = primary_key_col->length();
384  auto col_num = property_cols.size();
385  for (size_t i = 0; i < col_num; ++i) {
386  CHECK_EQ(property_cols[i]->length(), row_num);
387  }
388 
389  std::vector<size_t> vids;
390  vids.reserve(row_num);
391  {
392  std::unique_lock<std::mutex> lock(mtxs_[v_label_id]);
393  _add_vertex<KEY_T>()(primary_key_col, indexer, vids);
394  }
395  {
396  std::shared_lock<std::shared_mutex> lock(rw_mutex);
397  for (size_t j = 0; j < property_cols.size(); ++j) {
398  auto array = property_cols[j];
399  auto chunked_array = std::make_shared<arrow::ChunkedArray>(array);
400 set_properties_column(
401  basic_fragment_loader_.GetVertexTable(v_label_id).column_ptrs()[j],
402  chunked_array, vids);
403  }
404  }
405 
406  VLOG(10) << "Insert rows: " << row_num;
407  }
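addVertexBatchFromArray relies on a common locking split: the vertex table is grown under a std::unique_lock on the shared mutex (done by the caller before this point), while the property writes here only take a std::shared_lock, because every batch scatters into its own disjoint set of rows. A minimal sketch of grow-exclusively / write-concurrently, with a plain std::vector standing in for the vertex table:

#include <cstddef>
#include <cstdint>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <vector>

std::vector<int64_t> table;   // stand-in for the vertex property table
std::shared_mutex rw_mutex;

void write_rows(size_t begin, size_t end) {
  {
    std::unique_lock<std::shared_mutex> lock(rw_mutex);
    if (table.size() < end) {
      table.resize(end);      // exclusive: resizing may reallocate
    }
  }
  std::shared_lock<std::shared_mutex> lock(rw_mutex);
  for (size_t i = begin; i < end; ++i) {
    table[i] = static_cast<int64_t>(i);  // disjoint rows, shared lock suffices
  }
}

int main() {
  std::thread t1(write_rows, 0, 1000);
  std::thread t2(write_rows, 1000, 2000);
  t1.join();
  t2.join();
}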
408 
409 #ifndef USE_PTHASH
410  template <typename KEY_T>
411 void addVertexRecordBatchImpl(
412  label_t v_label_id, const std::vector<std::string>& v_files,
413  std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
414  label_t, const std::string&, const LoadingConfig&, int)>
415  supplier_creator) {
416  std::string v_label_name = schema_.get_vertex_label_name(v_label_id);
417  VLOG(10) << "Parsing vertex file:" << v_files.size() << " for label "
418  << v_label_name;
419  auto primary_key = schema_.get_vertex_primary_key(v_label_id)[0];
420  auto primary_key_name = std::get<1>(primary_key);
421  size_t primary_key_ind = std::get<2>(primary_key);
422  IdIndexer<KEY_T, vid_t> indexer;
423 
424  grape::BlockingQueue<std::shared_ptr<arrow::RecordBatch>> queue;
425  queue.SetLimit(1024);
426  std::vector<std::thread> work_threads;
427 
428  for (auto& v_file : v_files) {
429  VLOG(10) << "Parsing vertex file:" << v_file << " for label "
430  << v_label_name;
431  auto record_batch_supplier_vec =
432  supplier_creator(v_label_id, v_file, loading_config_,
433  std::thread::hardware_concurrency());
434  queue.SetProducerNum(record_batch_supplier_vec.size());
435 
436  for (size_t idx = 0; idx < record_batch_supplier_vec.size(); ++idx) {
437  work_threads.emplace_back(
438  [&](int i) {
439  auto& record_batch_supplier = record_batch_supplier_vec[i];
440  bool first_batch = true;
441  while (true) {
442  auto batch = record_batch_supplier->GetNextBatch();
443  if (!batch) {
444  queue.DecProducerNum();
445  break;
446  }
447  if (first_batch) {
448  auto header = batch->schema()->field_names();
449  auto schema_column_names =
450 schema_.get_vertex_property_names(v_label_id);
451  CHECK(schema_column_names.size() + 1 == header.size())
452  << "File header of size: " << header.size()
453  << " does not match schema column size: "
454  << schema_column_names.size() + 1;
455  first_batch = false;
456  }
457  queue.Put(batch);
458  }
459  },
460  idx);
461  }
462 
463  std::atomic<size_t> offset(0);
464  std::shared_mutex rw_mutex;
465  for (unsigned idx = 0;
466  idx <
467  std::min(static_cast<unsigned>(8 * record_batch_supplier_vec.size()),
468  std::thread::hardware_concurrency());
469  ++idx) {
470  work_threads.emplace_back(
471  [&](int i) {
472  // It is possible that the inserted data will exceed the size of
473  // the table, so we need to resize the table.
474  // basic_fragment_loader_.GetVertexTable(v_label_id).resize(vids.size());
475  auto& vtable = basic_fragment_loader_.GetVertexTable(v_label_id);
476  while (true) {
477  std::shared_ptr<arrow::RecordBatch> batch{nullptr};
478  auto ret = queue.Get(batch);
479  if (!ret) {
480  break;
481  }
482  if (!batch) {
483  LOG(FATAL) << "get nullptr batch";
484  }
485  auto columns = batch->columns();
486  CHECK(primary_key_ind < columns.size());
487  auto primary_key_column = columns[primary_key_ind];
488  auto other_columns_array = columns;
489  other_columns_array.erase(other_columns_array.begin() +
490  primary_key_ind);
491 
492  auto local_offset =
493  offset.fetch_add(primary_key_column->length());
494  size_t cur_row_num = std::max(vtable.row_num(), 1ul);
495  while (cur_row_num <
496  local_offset + primary_key_column->length()) {
497  cur_row_num *= 2;
498  }
499  if (cur_row_num > vtable.row_num()) {
500  std::unique_lock<std::shared_mutex> lock(rw_mutex);
501  if (cur_row_num > vtable.row_num()) {
502  LOG(INFO) << "Resize vertex table from " << vtable.row_num()
503  << " to " << cur_row_num
504  << ", local_offset: " << local_offset;
505  vtable.resize(cur_row_num);
506  }
507  }
508 
509  addVertexBatchFromArray(v_label_id, indexer, primary_key_column,
510  other_columns_array, rw_mutex);
511  }
512  },
513  idx);
514  }
515  for (auto& t : work_threads) {
516  t.join();
517  }
518  work_threads.clear();
519  VLOG(10) << "Finish parsing vertex file:" << v_file << " for label "
520  << v_label_name;
521  }
522 
523  VLOG(10) << "Finish parsing vertex file:" << v_files.size() << " for label "
524  << v_label_name;
525  if (indexer.bucket_count() == 0) {
526  indexer._rehash(schema_.get_max_vnum(v_label_name));
527  }
528  basic_fragment_loader_.FinishAddingVertex<KEY_T>(v_label_id, indexer);
529  }
530 #else
531  template <typename KEY_T>
532 void addVertexRecordBatchImpl(
533  label_t v_label_id, const std::vector<std::string>& v_files,
534  std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
535  label_t, const std::string&, const LoadingConfig&, int)>
536  supplier_creator) {
537  std::string v_label_name = schema_.get_vertex_label_name(v_label_id);
538  VLOG(10) << "Parsing vertex file:" << v_files.size() << " for label "
539  << v_label_name;
540  auto primary_key = schema_.get_vertex_primary_key(v_label_id)[0];
541  auto primary_key_name = std::get<1>(primary_key);
542  size_t primary_key_ind = std::get<2>(primary_key);
543  grape::BlockingQueue<std::shared_ptr<arrow::RecordBatch>> queue;
544  queue.SetLimit(1024);
545  PTIndexerBuilder<KEY_T, vid_t> indexer_builder;
546  std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> batchs(
547  std::thread::hardware_concurrency());
548  std::vector<std::thread> work_threads;
549  for (auto& v_file : v_files) {
550  VLOG(10) << "Parsing vertex file:" << v_file << " for label "
551  << v_label_name;
552  auto record_batch_supplier_vec =
553  supplier_creator(v_label_id, v_file, loading_config_,
554  std::thread::hardware_concurrency());
555  queue.SetProducerNum(record_batch_supplier_vec.size());
556  for (size_t idx = 0; idx < record_batch_supplier_vec.size(); ++idx) {
557  work_threads.emplace_back(
558  [&](int i) {
559  auto& record_batch_supplier = record_batch_supplier_vec[i];
560  bool first_batch = true;
561  while (true) {
562  auto batch = record_batch_supplier->GetNextBatch();
563  if (!batch) {
564  queue.DecProducerNum();
565  break;
566  }
567  if (first_batch) {
568  auto header = batch->schema()->field_names();
569  auto schema_column_names =
570 schema_.get_vertex_property_names(v_label_id);
571  CHECK(schema_column_names.size() + 1 == header.size())
572  << "File header of size: " << header.size()
573  << " does not match schema column size: "
574  << schema_column_names.size() + 1;
575  first_batch = false;
576  }
577  queue.Put(batch);
578  }
579  },
580  idx);
581  }
582 
583  for (unsigned idx = 0; idx < std::thread::hardware_concurrency(); ++idx) {
584  work_threads.emplace_back(
585  [&](int i) {
586  while (true) {
587  std::shared_ptr<arrow::RecordBatch> batch{nullptr};
588  auto ret = queue.Get(batch);
589  if (!ret) {
590  break;
591  }
592  if (!batch) {
593  LOG(FATAL) << "get nullptr batch";
594  }
595  batchs[i].emplace_back(batch);
596  auto columns = batch->columns();
597  CHECK(primary_key_ind < columns.size());
598  auto primary_key_column = columns[primary_key_ind];
599  {
600  std::unique_lock<std::mutex> lock(mtxs_[v_label_id]);
601  _add_vertex<KEY_T>()(primary_key_column, indexer_builder);
602  }
603  }
604  },
605  idx);
606  }
607  for (auto& t : work_threads) {
608  t.join();
609  }
610  work_threads.clear();
611 
612  VLOG(10) << "Finish parsing vertex file:" << v_file << " for label "
613  << v_label_name;
614  }
615  basic_fragment_loader_.FinishAddingVertex(v_label_id, indexer_builder);
616  const auto& indexer = basic_fragment_loader_.GetLFIndexer(v_label_id);
617 
618  auto& vtable = basic_fragment_loader_.GetVertexTable(v_label_id);
619  size_t total_row_num = 0;
620  for (auto& batch : batchs) {
621  for (auto& b : batch) {
622  total_row_num += b->num_rows();
623  }
624  }
625  if (total_row_num > vtable.row_num()) {
626  std::unique_lock<std::mutex> lock(mtxs_[v_label_id]);
627  if (total_row_num > vtable.row_num()) {
628  LOG(INFO) << "Resize vertex table from " << vtable.row_num() << " to "
629  << total_row_num;
630  vtable.resize(total_row_num);
631  }
632  }
633 
634  std::atomic<size_t> cur_batch_id(0);
635  for (unsigned i = 0; i < std::thread::hardware_concurrency(); ++i) {
636  work_threads.emplace_back(
637  [&](int idx) {
638  for (size_t id = 0; id < batchs[idx].size(); ++id) {
639  auto batch = batchs[idx][id];
640  auto columns = batch->columns();
641  auto other_columns_array = columns;
642  auto primary_key_column = columns[primary_key_ind];
643  size_t row_num = primary_key_column->length();
644  std::vector<size_t> vids;
645  if constexpr (!std::is_same<std::string_view, KEY_T>::value) {
646  using arrow_array_t =
647 typename gs::TypeConverter<KEY_T>::ArrowArrayType;
648  auto casted_array =
649  std::static_pointer_cast<arrow_array_t>(primary_key_column);
650  for (size_t i = 0; i < row_num; ++i) {
651  vids.emplace_back(indexer.get_index(casted_array->Value(i)));
652  }
653  } else {
654  if (primary_key_column->type()->Equals(arrow::utf8())) {
655  auto casted_array =
656  std::static_pointer_cast<arrow::StringArray>(
657  primary_key_column);
658  for (size_t i = 0; i < row_num; ++i) {
659  auto str = casted_array->GetView(i);
660  std::string_view str_view(str.data(), str.size());
661  vids.emplace_back(indexer.get_index(str_view));
662  }
663  } else if (primary_key_column->type()->Equals(
664  arrow::large_utf8())) {
665  auto casted_array =
666  std::static_pointer_cast<arrow::LargeStringArray>(
667  primary_key_column);
668  for (size_t i = 0; i < row_num; ++i) {
669  auto str = casted_array->GetView(i);
670  std::string_view str_view(str.data(), str.size());
671  vids.emplace_back(indexer.get_index(str_view));
672  }
673  }
674  }
675  other_columns_array.erase(other_columns_array.begin() +
676  primary_key_ind);
677 
678  for (size_t j = 0; j < other_columns_array.size(); ++j) {
679  auto array = other_columns_array[j];
680  auto chunked_array =
681  std::make_shared<arrow::ChunkedArray>(array);
682  set_properties_column(vtable.column_ptrs()[j], chunked_array,
683  vids);
684  }
685  }
686  },
687  i);
688  }
689  for (auto& t : work_threads) {
690  t.join();
691  }
692 
693  auto& v_data = basic_fragment_loader_.GetVertexTable(v_label_id);
694  auto label_name = schema_.get_vertex_label_name(v_label_id);
695 
696  v_data.resize(indexer.size());
697  v_data.dump(vertex_table_prefix(label_name),
698 snapshot_dir(basic_fragment_loader_.work_dir(), 0));
699 
700  VLOG(10) << "Finish parsing vertex file:" << v_files.size() << " for label "
701  << v_label_name;
702  }
703 #endif
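Both addVertexRecordBatchImpl variants use the same producer/consumer shape: reader threads Put record batches into a bounded grape::BlockingQueue and call DecProducerNum when their supplier is exhausted, while worker threads loop on Get until it reports that the queue is drained and no producers remain. Below is a standard-library-only sketch of that contract; it is not grape's implementation, only the observable behaviour the loader relies on, and the bounded-capacity aspect of SetLimit is omitted.

#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

template <typename T>
class SimpleBlockingQueue {
 public:
  explicit SimpleBlockingQueue(int producers) : producers_(producers) {}

  void Put(T item) {
    {
      std::lock_guard<std::mutex> lk(mu_);
      queue_.push(std::move(item));
    }
    cv_.notify_one();
  }

  void DecProducerNum() {
    {
      std::lock_guard<std::mutex> lk(mu_);
      --producers_;
    }
    cv_.notify_all();
  }

  bool Get(T& out) {
    std::unique_lock<std::mutex> lk(mu_);
    cv_.wait(lk, [&] { return !queue_.empty() || producers_ == 0; });
    if (queue_.empty()) {
      return false;  // all producers finished and the queue is drained
    }
    out = std::move(queue_.front());
    queue_.pop();
    return true;
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<T> queue_;
  int producers_;
};

int main() {
  SimpleBlockingQueue<int> queue(2);
  auto producer = [&](int base) {
    for (int i = 0; i < 5; ++i) queue.Put(base + i);
    queue.DecProducerNum();
  };
  auto consumer = [&]() {
    int batch;
    while (queue.Get(batch)) {
      // parse the "record batch"
    }
  };
  std::thread p1(producer, 0), p2(producer, 100), c1(consumer), c2(consumer);
  p1.join(); p2.join(); c1.join(); c2.join();
}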
704 
705  template <typename SRC_PK_T, typename EDATA_T, typename VECTOR_T>
706  void _append_edges(std::shared_ptr<arrow::Array> src_col,
707  std::shared_ptr<arrow::Array> dst_col,
708  const IndexerType& src_indexer,
709  const IndexerType& dst_indexer,
710  std::shared_ptr<arrow::Array>& property_cols,
711  const PropertyType& edge_property, VECTOR_T& parsed_edges,
712  std::vector<std::atomic<int32_t>>& ie_degree,
713  std::vector<std::atomic<int32_t>>& oe_degree,
714  size_t offset) {
715  auto dst_col_type = dst_col->type();
716  if (dst_col_type->Equals(arrow::int64())) {
717  append_edges<SRC_PK_T, int64_t, EDATA_T>(
718  src_col, dst_col, src_indexer, dst_indexer, property_cols,
719  edge_property, parsed_edges, ie_degree, oe_degree, offset);
720  } else if (dst_col_type->Equals(arrow::uint64())) {
721  append_edges<SRC_PK_T, uint64_t, EDATA_T>(
722  src_col, dst_col, src_indexer, dst_indexer, property_cols,
723  edge_property, parsed_edges, ie_degree, oe_degree, offset);
724  } else if (dst_col_type->Equals(arrow::int32())) {
725  append_edges<SRC_PK_T, int32_t, EDATA_T>(
726  src_col, dst_col, src_indexer, dst_indexer, property_cols,
727  edge_property, parsed_edges, ie_degree, oe_degree, offset);
728  } else if (dst_col_type->Equals(arrow::uint32())) {
729  append_edges<SRC_PK_T, uint32_t, EDATA_T>(
730  src_col, dst_col, src_indexer, dst_indexer, property_cols,
731  edge_property, parsed_edges, ie_degree, oe_degree, offset);
732  } else {
733  // must be string
734  append_edges<SRC_PK_T, std::string_view, EDATA_T>(
735  src_col, dst_col, src_indexer, dst_indexer, property_cols,
736  edge_property, parsed_edges, ie_degree, oe_degree, offset);
737  }
738  }
739  template <typename EDATA_T>
740 void addEdgesRecordBatchImpl(
741  label_t src_label_id, label_t dst_label_id, label_t e_label_id,
742  const std::vector<std::string>& e_files,
743  std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
744  label_t, label_t, label_t, const std::string&, const LoadingConfig&,
745  int)>
746  supplier_creator) {
747  if constexpr (std::is_same_v<EDATA_T, RecordView>) {
748  if (use_mmap_vector_) {
749  addEdgesRecordBatchImplHelper<
750  EDATA_T, mmap_vector<std::tuple<vid_t, vid_t, size_t>>>(
751  src_label_id, dst_label_id, e_label_id, e_files, supplier_creator);
752  } else {
753  addEdgesRecordBatchImplHelper<
754  EDATA_T, std::vector<std::tuple<vid_t, vid_t, size_t>>>(
755  src_label_id, dst_label_id, e_label_id, e_files, supplier_creator);
756  }
757  } else {
758  if (use_mmap_vector_) {
759  addEdgesRecordBatchImplHelper<
760  EDATA_T, mmap_vector<std::tuple<vid_t, vid_t, EDATA_T>>>(
761  src_label_id, dst_label_id, e_label_id, e_files, supplier_creator);
762  } else {
763  addEdgesRecordBatchImplHelper<
764  EDATA_T, std::vector<std::tuple<vid_t, vid_t, EDATA_T>>>(
765  src_label_id, dst_label_id, e_label_id, e_files, supplier_creator);
766  }
767  }
768  }
769 
770  template <typename EDATA_T, typename VECTOR_T>
771 void addEdgesRecordBatchImplHelper(
772  label_t src_label_id, label_t dst_label_id, label_t e_label_id,
773  const std::vector<std::string>& e_files,
774  std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
775  label_t, label_t, label_t, const std::string&, const LoadingConfig&,
776  int)>
777  supplier_creator) {
778  auto src_label_name = schema_.get_vertex_label_name(src_label_id);
779  auto dst_label_name = schema_.get_vertex_label_name(dst_label_id);
780  auto edge_label_name = schema_.get_edge_label_name(e_label_id);
781  auto edge_column_mappings = loading_config_.GetEdgeColumnMappings(
782  src_label_id, dst_label_id, e_label_id);
783  auto src_dst_col_pair = loading_config_.GetEdgeSrcDstCol(
784  src_label_id, dst_label_id, e_label_id);
785  if (src_dst_col_pair.first.size() != 1 ||
786  src_dst_col_pair.second.size() != 1) {
787  LOG(FATAL) << "We currently only support one src primary key and one "
788  "dst primary key";
789  }
790  size_t src_col_ind = src_dst_col_pair.first[0].second;
791  size_t dst_col_ind = src_dst_col_pair.second[0].second;
792  CHECK(src_col_ind != dst_col_ind);
793 
794  check_edge_invariant(schema_, edge_column_mappings, src_col_ind,
795  dst_col_ind, src_label_id, dst_label_id, e_label_id);
796 
797  const auto& src_indexer = basic_fragment_loader_.GetLFIndexer(src_label_id);
798  const auto& dst_indexer = basic_fragment_loader_.GetLFIndexer(dst_label_id);
799  std::vector<VECTOR_T> parsed_edges_vec(std::thread::hardware_concurrency());
800  if constexpr (std::is_same_v<
801  VECTOR_T,
802  mmap_vector<std::tuple<vid_t, vid_t, EDATA_T>>> ||
803  std::is_same_v<
804  VECTOR_T,
805  mmap_vector<std::tuple<vid_t, vid_t, size_t>>>) {
806  const auto& work_dir = basic_fragment_loader_.work_dir();
807  for (unsigned i = 0; i < std::thread::hardware_concurrency(); ++i) {
808  parsed_edges_vec[i].open(runtime_dir(work_dir) + "/" + src_label_name +
809  "_" + dst_label_name + "_" + edge_label_name +
810  "_" + std::to_string(i) + ".tmp");
811  parsed_edges_vec[i].reserve(4096);
812  }
813  }
814  std::vector<std::atomic<int32_t>> ie_degree(dst_indexer.size()),
815  oe_degree(src_indexer.size());
816  for (size_t idx = 0; idx < ie_degree.size(); ++idx) {
817  ie_degree[idx].store(0);
818  }
819  for (size_t idx = 0; idx < oe_degree.size(); ++idx) {
820  oe_degree[idx].store(0);
821  }
822  VLOG(10) << "src indexer size: " << src_indexer.size()
823  << " dst indexer size: " << dst_indexer.size();
824 
825  grape::BlockingQueue<std::shared_ptr<arrow::RecordBatch>> queue;
826  queue.SetLimit(1024);
827  std::vector<std::thread> work_threads;
828 
829  std::vector<std::vector<std::shared_ptr<arrow::Array>>> string_columns(
830  std::thread::hardware_concurrency());
831 
832  if constexpr (std::is_same<EDATA_T, RecordView>::value) {
833  basic_fragment_loader_.init_edge_table(src_label_id, dst_label_id,
834  e_label_id);
835  }
836 
837  // use a dummy vector to store the string columns, to avoid the
838  // strings being released as record batch is released.
839  std::vector<std::shared_ptr<arrow::Array>> string_cols;
840  std::atomic<size_t> offset(0);
841  std::shared_mutex rw_mutex;
842  for (auto filename : e_files) {
843  auto record_batch_supplier_vec =
844  supplier_creator(src_label_id, dst_label_id, e_label_id, filename,
845  loading_config_, parsed_edges_vec.size());
846 
847  queue.SetProducerNum(record_batch_supplier_vec.size());
848 
849  for (size_t i = 0; i < record_batch_supplier_vec.size(); ++i) {
850  work_threads.emplace_back(
851  [&](int idx) {
852  auto& string_column = string_columns[idx];
853  bool first_batch = true;
854  auto& record_batch_supplier = record_batch_supplier_vec[idx];
855  while (true) {
856  auto record_batch = record_batch_supplier->GetNextBatch();
857  if (!record_batch) {
858  queue.DecProducerNum();
859  break;
860  }
861  if (first_batch) {
862  auto header = record_batch->schema()->field_names();
863  auto schema_column_names = schema_.get_edge_property_names(
864  src_label_id, dst_label_id, e_label_id);
865  auto schema_column_types = schema_.get_edge_properties(
866  src_label_id, dst_label_id, e_label_id);
867  CHECK(schema_column_names.size() + 2 == header.size())
868  << "schema size: " << schema_column_names.size()
869  << " neq header size: " << header.size();
870  first_batch = false;
871  }
872  for (auto i = 0; i < record_batch->num_columns(); ++i) {
873  if (record_batch->column(i)->type()->Equals(arrow::utf8()) ||
874  record_batch->column(i)->type()->Equals(
875  arrow::large_utf8())) {
876  string_column.emplace_back(record_batch->column(i));
877  }
878  }
879 
880  queue.Put(record_batch);
881  }
882  },
883  i);
884  }
885  for (size_t i = 0;
886  i <
887  std::min(static_cast<unsigned>(8 * record_batch_supplier_vec.size()),
888  std::thread::hardware_concurrency());
889  ++i) {
890  work_threads.emplace_back(
891  [&](int idx) {
892  // copy the table to csr.
893  auto& parsed_edges = parsed_edges_vec[idx];
894  while (true) {
895  std::shared_ptr<arrow::RecordBatch> record_batch{nullptr};
896  auto ret = queue.Get(record_batch);
897  if (!ret) {
898  break;
899  }
900  if (!record_batch) {
901  LOG(FATAL) << "get nullptr batch";
902  }
903  auto columns = record_batch->columns();
904  // We assume the src_col and dst_col will always be put
905  // at front.
906  CHECK(columns.size() >= 2);
907  auto src_col = columns[0];
908  auto dst_col = columns[1];
909  auto src_col_type = src_col->type();
910  auto dst_col_type = dst_col->type();
911  CHECK(check_primary_key_type(src_col_type))
912  << "unsupported src_col type: " << src_col_type->ToString();
913  CHECK(check_primary_key_type(dst_col_type))
914  << "unsupported dst_col type: " << dst_col_type->ToString();
915 
916  std::vector<std::shared_ptr<arrow::Array>> property_cols;
917  for (size_t i = 2; i < columns.size(); ++i) {
918  property_cols.emplace_back(columns[i]);
919  }
920  size_t offset_i = 0;
921  if constexpr (std::is_same<EDATA_T, RecordView>::value) {
922  auto casted_csr = dynamic_cast<DualCsr<RecordView>*>(
923  basic_fragment_loader_.get_csr(src_label_id, dst_label_id,
924  e_label_id));
925  CHECK(casted_csr != NULL);
926  auto table = casted_csr->GetTable();
927  CHECK(table.col_num() == property_cols.size());
928  offset_i = offset.fetch_add(src_col->length());
929  std::vector<size_t> offsets;
930  for (size_t _i = 0;
931  _i < static_cast<size_t>(src_col->length()); ++_i) {
932  offsets.emplace_back(offset_i + _i);
933  }
934  size_t row_num = std::max(table.row_num(), 1ul);
935 
936  while (row_num < offset_i + src_col->length()) {
937  row_num *= 2;
938  }
939  if (row_num > table.row_num()) {
940  std::unique_lock<std::shared_mutex> lock(rw_mutex);
941  if (row_num > table.row_num()) {
942  table.resize(row_num);
943  }
944  }
945 
946  {
947  std::shared_lock<std::shared_mutex> lock(rw_mutex);
948  for (size_t i = 0; i < table.col_num(); ++i) {
949  auto col = table.get_column_by_id(i);
950  auto chunked_array =
951  std::make_shared<arrow::ChunkedArray>(
952  property_cols[i]);
953  set_properties_column(col.get(), chunked_array, offsets);
954  }
955  }
956  }
957  auto edge_property = schema_.get_edge_property(
958  src_label_id, dst_label_id, e_label_id);
959  // add edges to vector
960  CHECK(src_col->length() == dst_col->length());
961  if (src_col_type->Equals(arrow::int64())) {
962  _append_edges<int64_t, EDATA_T, VECTOR_T>(
963  src_col, dst_col, src_indexer, dst_indexer,
964  property_cols[0], edge_property, parsed_edges, ie_degree,
965  oe_degree, offset_i);
966  } else if (src_col_type->Equals(arrow::uint64())) {
967  _append_edges<uint64_t, EDATA_T, VECTOR_T>(
968  src_col, dst_col, src_indexer, dst_indexer,
969  property_cols[0], edge_property, parsed_edges, ie_degree,
970  oe_degree, offset_i);
971  } else if (src_col_type->Equals(arrow::int32())) {
972  _append_edges<int32_t, EDATA_T, VECTOR_T>(
973  src_col, dst_col, src_indexer, dst_indexer,
974  property_cols[0], edge_property, parsed_edges, ie_degree,
975  oe_degree, offset_i);
976  } else if (src_col_type->Equals(arrow::uint32())) {
977  _append_edges<uint32_t, EDATA_T, VECTOR_T>(
978  src_col, dst_col, src_indexer, dst_indexer,
979  property_cols[0], edge_property, parsed_edges, ie_degree,
980  oe_degree, offset_i);
981  } else {
982  // must be string
983  _append_edges<std::string_view, EDATA_T, VECTOR_T>(
984  src_col, dst_col, src_indexer, dst_indexer,
985  property_cols[0], edge_property, parsed_edges, ie_degree,
986  oe_degree, offset_i);
987  }
988  }
989  },
990  i);
991  }
992 
993  for (auto& t : work_threads) {
994  t.join();
995  }
996  VLOG(10) << "Finish parsing edge file:" << filename << " for label "
997  << src_label_name << " -> " << dst_label_name << " -> "
998  << edge_label_name;
999  }
1000  VLOG(10) << "Finish parsing edge file:" << e_files.size() << " for label "
1001  << src_label_name << " -> " << dst_label_name << " -> "
1002  << edge_label_name;
1003  std::vector<int32_t> ie_deg(ie_degree.size());
1004  std::vector<int32_t> oe_deg(oe_degree.size());
1005  for (size_t idx = 0; idx < ie_deg.size(); ++idx) {
1006  ie_deg[idx] = ie_degree[idx];
1007  }
1008  for (size_t idx = 0; idx < oe_deg.size(); ++idx) {
1009  oe_deg[idx] = oe_degree[idx];
1010  }
1011 
1012  basic_fragment_loader_.PutEdges<EDATA_T, VECTOR_T>(
1013  src_label_id, dst_label_id, e_label_id, parsed_edges_vec, ie_deg,
1014  oe_deg, build_csr_in_mem_);
1015 
1016  string_columns.clear();
1017  size_t sum = 0;
1018  for (auto& edges : parsed_edges_vec) {
1019  sum += edges.size();
1020  if constexpr (
1021  std::is_same<VECTOR_T,
1022  mmap_vector<std::tuple<vid_t, vid_t, EDATA_T>>>::value ||
1023  std::is_same<VECTOR_T,
1024  mmap_vector<std::tuple<vid_t, vid_t, size_t>>>::value) {
1025  edges.unlink();
1026  }
1027  }
1028 
1029  VLOG(10) << "Finish putting: " << sum << " edges";
1030  }
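The ie_deg / oe_deg vectors handed to PutEdges carry per-vertex edge counts; building a CSR from such counts typically starts with an exclusive prefix sum that turns degrees into offsets into a flat adjacency array. A small illustration of that step (this is generic CSR arithmetic, not the internals of BasicFragmentLoader::PutEdges):

#include <cstdint>
#include <iostream>
#include <vector>

int main() {
  std::vector<int32_t> degree = {2, 0, 3, 1};          // per-vertex edge counts
  std::vector<size_t> offsets(degree.size() + 1, 0);
  for (size_t v = 0; v < degree.size(); ++v) {
    offsets[v + 1] = offsets[v] + static_cast<size_t>(degree[v]);
  }
  // offsets == {0, 2, 2, 5, 6}; the neighbors of vertex v occupy
  // nbr[offsets[v], offsets[v + 1]) in the flat array.
  std::vector<uint32_t> nbr(offsets.back());
  std::cout << "total slots: " << nbr.size() << "\n";  // 6
}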
1031 
1032  const LoadingConfig& loading_config_;
1033  const Schema& schema_;
1034  size_t vertex_label_num_, edge_label_num_;
1035  int32_t thread_num_;
1036  std::mutex* mtxs_;
1037  bool build_csr_in_mem_;
1038  bool use_mmap_vector_;
1039  BasicFragmentLoader basic_fragment_loader_;
1040 };
1041 
1042 } // namespace gs
1043 
1044 #endif // STORAGES_RT_MUTABLE_GRAPH_LOADER_ABSTRACT_ARROW_FRAGMENT_LOADER_H_
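As a closing illustration, a concrete loader derived from AbstractArrowFragmentLoader would typically drive vertex loading by handing AddVerticesRecordBatch a supplier_creator that wraps whatever file format it understands. Everything named My* below is hypothetical; only the AddVerticesRecordBatch signature comes from this header.

// Illustrative only: MyLoader derives from gs::AbstractArrowFragmentLoader and
// MyBatchSupplier is some IRecordBatchSupplier (e.g. the TableBatchSupplier
// sketched earlier, or a CSV-backed reader).
void MyLoader::loadVertices(gs::label_t v_label,
                            const std::vector<std::string>& files) {
  AddVerticesRecordBatch(
      v_label, files,
      [](gs::label_t label, const std::string& path,
         const gs::LoadingConfig& config, int parallelism) {
        std::vector<std::shared_ptr<gs::IRecordBatchSupplier>> suppliers;
        for (int i = 0; i < parallelism; ++i) {
          // Each supplier feeds one producer thread in the loader.
          suppliers.emplace_back(std::make_shared<MyBatchSupplier>(path, i));
        }
        return suppliers;
      });
}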