Flex  0.17.9
abstract_arrow_fragment_loader.h
1 
17 #ifndef STORAGES_RT_MUTABLE_GRAPH_LOADER_ABSTRACT_ARROW_FRAGMENT_LOADER_H_
18 #define STORAGES_RT_MUTABLE_GRAPH_LOADER_ABSTRACT_ARROW_FRAGMENT_LOADER_H_
19 
20 #include "flex/storages/rt_mutable_graph/loader/basic_fragment_loader.h"
21 #include "flex/storages/rt_mutable_graph/loader/i_fragment_loader.h"
22 #include "flex/storages/rt_mutable_graph/loader/loading_config.h"
23 #include "flex/storages/rt_mutable_graph/mutable_property_fragment.h"
24 #include "flex/utils/mmap_vector.h"
25 #include "grape/utils/concurrent_queue.h"
26 
27 #include <arrow/api.h>
28 #include <arrow/io/api.h>
29 #include <shared_mutex>
30 #include "arrow/util/value_parsing.h"
31 
32 #include "grape/util.h"
33 
34 namespace gs {
35 
36 void printDiskRemaining(const std::string& path);
37 // The interface providing a visitor pattern for RecordBatch.
38 
39 class IRecordBatchSupplier {
40  public:
41  virtual ~IRecordBatchSupplier() = default;
42  virtual std::shared_ptr<arrow::RecordBatch> GetNextBatch() = 0;
43 };
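// Implementations of IRecordBatchSupplier typically wrap a file reader (e.g.
// a CSV reader) and return nullptr from GetNextBatch() once the input is
// exhausted. A minimal sketch (hypothetical, not part of this file) backed by
// an in-memory arrow::Table could look like:
//
//   class TableBatchSupplier : public IRecordBatchSupplier {
//    public:
//     explicit TableBatchSupplier(std::shared_ptr<arrow::Table> table)
//         : table_(std::move(table)), reader_(*table_) {}
//     std::shared_ptr<arrow::RecordBatch> GetNextBatch() override {
//       std::shared_ptr<arrow::RecordBatch> batch;
//       if (!reader_.ReadNext(&batch).ok()) {
//         return nullptr;
//       }
//       return batch;  // arrow yields nullptr after the last batch
//     }
//    private:
//     std::shared_ptr<arrow::Table> table_;
//     arrow::TableBatchReader reader_;
//   };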
44 
45 bool check_primary_key_type(std::shared_ptr<arrow::DataType> data_type);
46 
47 // For Primitive types.
48 template <typename COL_T>
49 void set_column(gs::ColumnBase* col, std::shared_ptr<arrow::ChunkedArray> array,
50  const std::vector<size_t>& offset) {
51  using arrow_array_type = typename gs::TypeConverter<COL_T>::ArrowArrayType;
52  auto array_type = array->type();
53  auto arrow_type = gs::TypeConverter<COL_T>::ArrowTypeValue();
54  CHECK(array_type->Equals(arrow_type))
55  << "Inconsistent data type, expect " << arrow_type->ToString()
56  << ", but got " << array_type->ToString();
57  size_t cur_ind = 0;
58  for (auto j = 0; j < array->num_chunks(); ++j) {
59  auto casted = std::static_pointer_cast<arrow_array_type>(array->chunk(j));
60  size_t size = col->size();
61  for (auto k = 0; k < casted->length(); ++k) {
62  if (offset[cur_ind] >= size) {
63  cur_ind++;
64  } else {
65  col->set_any(offset[cur_ind++],
66  std::move(AnyConverter<COL_T>::to_any(casted->Value(k))));
67  }
68  }
69  }
70 }
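// Note on the `offset` vector shared by the setters above and below: offset[k]
// is the destination row (vid) for the k-th value of the chunked array, and
// entries that fall outside the column (e.g. std::numeric_limits<size_t>::max()
// used to mark duplicate primary keys) are skipped rather than written.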
71 
72 // For String types.
73 void set_column_from_string_array(gs::ColumnBase* col,
74  std::shared_ptr<arrow::ChunkedArray> array,
75  const std::vector<size_t>& offset,
76  bool enable_resize = false);
77 
78 void set_properties_column(gs::ColumnBase* col,
79  std::shared_ptr<arrow::ChunkedArray> array,
80  const std::vector<size_t>& offset);
81 
82 void set_column_from_timestamp_array(
83  gs::ColumnBase* col, std::shared_ptr<arrow::ChunkedArray> array,
84  const std::vector<size_t>& offset);
85 
86 void set_column_from_timestamp_array_to_day(gs::ColumnBase* col,
87  std::shared_ptr<arrow::ChunkedArray> array,
88  const std::vector<size_t>& offset);
89 
90 void check_edge_invariant(
91  const Schema& schema,
92  const std::vector<std::tuple<size_t, std::string, std::string>>&
93  column_mappings,
94  size_t src_col_ind, size_t dst_col_ind, label_t src_label_i,
95  label_t dst_label_i, label_t edge_label_i);
96 
97 template <typename KEY_T>
98 struct _add_vertex {
99 #ifndef USE_PTHASH
100  void operator()(const std::shared_ptr<arrow::Array>& col,
101  IdIndexer<KEY_T, vid_t>& indexer,
102  std::vector<size_t>& offset) {
103  size_t row_num = col->length();
104  vid_t vid;
105  if constexpr (!std::is_same<std::string_view, KEY_T>::value) {
106  // for non-string value
107  auto expected_type = gs::TypeConverter<KEY_T>::ArrowTypeValue();
108  using arrow_array_t = typename gs::TypeConverter<KEY_T>::ArrowArrayType;
109  if (!col->type()->Equals(expected_type)) {
110  LOG(FATAL) << "Inconsistent data type, expect "
111  << expected_type->ToString() << ", but got "
112  << col->type()->ToString();
113  }
114  auto casted_array = std::static_pointer_cast<arrow_array_t>(col);
115  for (size_t i = 0; i < row_num; ++i) {
116  if (!indexer.add(casted_array->Value(i), vid)) {
117  VLOG(2) << "Duplicate vertex id: " << casted_array->Value(i) << "..";
118  offset.emplace_back(std::numeric_limits<size_t>::max());
119  } else {
120  offset.emplace_back(vid);
121  }
122  }
123  } else {
124  if (col->type()->Equals(arrow::utf8())) {
125  auto casted_array = std::static_pointer_cast<arrow::StringArray>(col);
126  for (size_t i = 0; i < row_num; ++i) {
127  auto str = casted_array->GetView(i);
128  std::string_view str_view(str.data(), str.size());
129  if (!indexer.add(str_view, vid)) {
130  VLOG(2) << "Duplicate vertex id: " << str_view << "..";
131  offset.emplace_back(std::numeric_limits<size_t>::max());
132  } else {
133  offset.emplace_back(vid);
134  }
135  }
136  } else if (col->type()->Equals(arrow::large_utf8())) {
137  auto casted_array =
138  std::static_pointer_cast<arrow::LargeStringArray>(col);
139  for (size_t i = 0; i < row_num; ++i) {
140  auto str = casted_array->GetView(i);
141  std::string_view str_view(str.data(), str.size());
142  if (!indexer.add(str_view, vid)) {
143  VLOG(2) << "Duplicate vertex id: " << str_view << "..";
144  offset.emplace_back(std::numeric_limits<size_t>::max());
145  } else {
146  offset.emplace_back(vid);
147  }
148  }
149  } else {
150  LOG(FATAL) << "Not support type: " << col->type()->ToString();
151  }
152  }
153  }
154 
155 #else
156  void operator()(const std::shared_ptr<arrow::Array>& col,
157  PTIndexerBuilder<KEY_T, vid_t>& indexer) {
158  size_t row_num = col->length();
159  if constexpr (!std::is_same<std::string_view, KEY_T>::value) {
160  // for non-string value
161  auto expected_type = gs::TypeConverter<KEY_T>::ArrowTypeValue();
162  using arrow_array_t = typename gs::TypeConverter<KEY_T>::ArrowArrayType;
163  if (!col->type()->Equals(expected_type)) {
164  LOG(FATAL) << "Inconsistent data type, expect "
165  << expected_type->ToString() << ", but got "
166  << col->type()->ToString();
167  }
168  auto casted_array = std::static_pointer_cast<arrow_array_t>(col);
169  for (size_t i = 0; i < row_num; ++i) {
170  indexer.add_vertex(casted_array->Value(i));
171  }
172  } else {
173  if (col->type()->Equals(arrow::utf8())) {
174  auto casted_array = std::static_pointer_cast<arrow::StringArray>(col);
175  for (size_t i = 0; i < row_num; ++i) {
176  auto str = casted_array->GetView(i);
177  std::string_view str_view(str.data(), str.size());
178  indexer.add_vertex(str_view);
179  }
180  } else if (col->type()->Equals(arrow::large_utf8())) {
181  auto casted_array =
182  std::static_pointer_cast<arrow::LargeStringArray>(col);
183  for (size_t i = 0; i < row_num; ++i) {
184  auto str = casted_array->GetView(i);
185  std::string_view str_view(str.data(), str.size());
186  indexer.add_vertex(str_view);
187  }
188  } else {
189  LOG(FATAL) << "Not support type: " << col->type()->ToString();
190  }
191  }
192  }
193 #endif
194 };
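// _add_vertex has two builds: without USE_PTHASH it inserts keys into an
// IdIndexer on the fly, recording the assigned vid (or size_t::max for
// duplicates) in `offset`; with USE_PTHASH it only feeds keys to a
// PTIndexerBuilder, and vids are resolved later once the perfect-hash indexer
// has been built.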
195 
196 template <typename PK_T, typename EDATA_T, typename VECTOR_T>
197 void _append(bool is_dst, size_t cur_ind, std::shared_ptr<arrow::Array> col,
198  const IndexerType& indexer, VECTOR_T& parsed_edges,
199  std::vector<std::atomic<int32_t>>& degree) {
200  static constexpr auto invalid_vid = std::numeric_limits<vid_t>::max();
201  if constexpr (std::is_same_v<PK_T, std::string_view>) {
202  if (col->type()->Equals(arrow::utf8())) {
203  auto casted = std::static_pointer_cast<arrow::StringArray>(col);
204  for (auto j = 0; j < casted->length(); ++j) {
205  auto str = casted->GetView(j);
206  std::string_view str_view(str.data(), str.size());
207  auto vid = indexer.get_index(Any::From(str_view));
208  if (is_dst) {
209  std::get<1>(parsed_edges[cur_ind++]) = vid;
210  } else {
211  std::get<0>(parsed_edges[cur_ind++]) = vid;
212  }
213  if (vid != invalid_vid) {
214  degree[vid]++;
215  }
216  }
217  } else {
218  // must be large utf8
219  auto casted = std::static_pointer_cast<arrow::LargeStringArray>(col);
220  for (auto j = 0; j < casted->length(); ++j) {
221  auto str = casted->GetView(j);
222  std::string_view str_view(str.data(), str.size());
223  auto vid = indexer.get_index(Any::From(str_view));
224  if (is_dst) {
225  std::get<1>(parsed_edges[cur_ind++]) = vid;
226  } else {
227  std::get<0>(parsed_edges[cur_ind++]) = vid;
228  }
229  if (vid != invalid_vid) {
230  degree[vid]++;
231  }
232  }
233  }
234  } else {
235  using arrow_array_type = typename gs::TypeConverter<PK_T>::ArrowArrayType;
236  auto casted = std::static_pointer_cast<arrow_array_type>(col);
237  for (auto j = 0; j < casted->length(); ++j) {
238  auto vid = indexer.get_index(Any::From(casted->Value(j)));
239  if (is_dst) {
240  std::get<1>(parsed_edges[cur_ind++]) = vid;
241  } else {
242  std::get<0>(parsed_edges[cur_ind++]) = vid;
243  }
244  if (vid != invalid_vid) {
245  degree[vid]++;
246  }
247  }
248  }
249 }
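// _append resolves one endpoint of every edge in the batch: it writes the
// looked-up vid into tuple element 0 (src) or 1 (dst) of parsed_edges starting
// at cur_ind, and bumps the per-vertex degree counter for every valid vid.
// Keys that are not found keep the invalid-vid sentinel and do not contribute
// to the degree.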
250 
251 template <typename SRC_PK_T, typename DST_PK_T, typename EDATA_T,
252  typename VECTOR_T>
253 static void append_edges(std::shared_ptr<arrow::Array> src_col,
254  std::shared_ptr<arrow::Array> dst_col,
255  const IndexerType& src_indexer,
256  const IndexerType& dst_indexer,
257  std::shared_ptr<arrow::Array>& edata_cols,
258  const PropertyType& edge_prop, VECTOR_T& parsed_edges,
259  std::vector<std::atomic<int32_t>>& ie_degree,
260  std::vector<std::atomic<int32_t>>& oe_degree,
261  size_t offset = 0) {
262  CHECK(src_col->length() == dst_col->length());
263  auto indexer_check_lambda = [](const IndexerType& cur_indexer,
264  const std::shared_ptr<arrow::Array>& cur_col) {
265  if (cur_indexer.get_type() == PropertyType::kInt64) {
266  CHECK(cur_col->type()->Equals(arrow::int64()));
267  } else if (cur_indexer.get_type() == PropertyType::kStringView) {
268  CHECK(cur_col->type()->Equals(arrow::utf8()) ||
269  cur_col->type()->Equals(arrow::large_utf8()));
270  } else if (cur_indexer.get_type() == PropertyType::kInt32) {
271  CHECK(cur_col->type()->Equals(arrow::int32()));
272  } else if (cur_indexer.get_type() == PropertyType::kUInt32) {
273  CHECK(cur_col->type()->Equals(arrow::uint32()));
274  } else if (cur_indexer.get_type() == PropertyType::kUInt64) {
275  CHECK(cur_col->type()->Equals(arrow::uint64()));
276  }
277  };
278 
279  indexer_check_lambda(src_indexer, src_col);
280  indexer_check_lambda(dst_indexer, dst_col);
281  auto old_size = parsed_edges.size();
282  parsed_edges.resize(old_size + src_col->length());
 283  VLOG(10) << "resize parsed_edges from " << old_size << " to "
284  << parsed_edges.size();
285 
286  // if EDATA_T is grape::EmptyType, no need to read columns
287  auto edata_col_thread = std::thread([&]() {
288  if constexpr (std::is_same<EDATA_T, RecordView>::value) {
289  size_t cur_ind = old_size;
290  for (auto j = 0; j < src_col->length(); ++j) {
291  std::get<2>(parsed_edges[cur_ind++]) = offset++;
292  }
293  } else if constexpr (!std::is_same<EDATA_T, grape::EmptyType>::value) {
294  auto edata_col = edata_cols;
295  CHECK(src_col->length() == edata_col->length());
296  size_t cur_ind = old_size;
297  auto type = edata_col->type();
298  if (!type->Equals(TypeConverter<EDATA_T>::ArrowTypeValue())) {
299  LOG(FATAL) << "Inconsistent data type, expect "
 300  << TypeConverter<EDATA_T>::ArrowTypeValue()->ToString()
301  << ", but got " << type->ToString();
302  }
303 
304  using arrow_array_type =
 305  typename gs::TypeConverter<EDATA_T>::ArrowArrayType;
306  // cast chunk to EDATA_T array
307  auto data = std::static_pointer_cast<arrow_array_type>(edata_col);
308  for (auto j = 0; j < edata_col->length(); ++j) {
309  if constexpr (std::is_same<arrow_array_type,
310  arrow::StringArray>::value ||
311  std::is_same<arrow_array_type,
312  arrow::LargeStringArray>::value) {
313  auto str = data->GetView(j);
314  std::string_view str_view(str.data(), str.size());
315  std::get<2>(parsed_edges[cur_ind++]) = str_view;
316  } else {
317  std::get<2>(parsed_edges[cur_ind++]) = data->Value(j);
318  }
319  }
320  VLOG(10) << "Finish inserting: " << src_col->length() << " edges";
321  }
322  });
323  size_t cur_ind = old_size;
324  auto src_col_thread = std::thread([&]() {
325  _append<SRC_PK_T, EDATA_T>(false, cur_ind, src_col, src_indexer,
326  parsed_edges, oe_degree);
327  });
328  auto dst_col_thread = std::thread([&]() {
329  _append<DST_PK_T, EDATA_T>(true, cur_ind, dst_col, dst_indexer,
330  parsed_edges, ie_degree);
331  });
332  src_col_thread.join();
333  dst_col_thread.join();
334  edata_col_thread.join();
335 }
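// append_edges parses one record batch with three threads: one copies the edge
// property column (or, for RecordView, a running row offset) into tuple
// element 2, while the other two resolve the src and dst vids. This is safe
// without locking because parsed_edges is resized up front and each thread
// writes a disjoint tuple element of the same [old_size, old_size + length)
// range.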
336 
 337 // An AbstractArrowFragmentLoader can load a fragment from an arrow::Table.
 338 // It cannot be used directly and should be inherited.
 339 class AbstractArrowFragmentLoader : public IFragmentLoader {
340  public:
341  AbstractArrowFragmentLoader(const std::string& work_dir, const Schema& schema,
342  const LoadingConfig& loading_config)
343  : loading_config_(loading_config),
344  schema_(schema),
345  thread_num_(loading_config_.GetParallelism()),
346  build_csr_in_mem_(loading_config_.GetBuildCsrInMem()),
347  use_mmap_vector_(loading_config_.GetUseMmapVector()),
348  basic_fragment_loader_(schema_, work_dir) {
 349  vertex_label_num_ = schema_.vertex_label_num();
 350  edge_label_num_ = schema_.edge_label_num();
351  mtxs_ = new std::mutex[vertex_label_num_];
352  }
353 
 354  ~AbstractArrowFragmentLoader() {
355  if (mtxs_) {
356  delete[] mtxs_;
357  }
358  }
359 
 360  void AddVerticesRecordBatch(
361  label_t v_label_id, const std::vector<std::string>& input_paths,
362  std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
363  label_t, const std::string&, const LoadingConfig&, int)>
364  supplier_creator);
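// supplier_creator is invoked once per input path and is expected to return
// one IRecordBatchSupplier per reader thread (the last argument is the
// requested degree of parallelism); the suppliers are then drained
// concurrently through a blocking queue.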
365 
366  // Add edges in record batch to output_parsed_edges, output_ie_degrees and
367  // output_oe_degrees.
368 
369  void AddEdgesRecordBatch(
370  label_t src_label_id, label_t dst_label_id, label_t edge_label_id,
371  const std::vector<std::string>& input_paths,
372  std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
373  label_t, label_t, label_t, const std::string&, const LoadingConfig&,
374  int)>
375  supplier_creator);
376 
377  protected:
378  template <typename KEY_T>
 379  void addVertexBatchFromArray(
380  label_t v_label_id, IdIndexer<KEY_T, vid_t>& indexer,
381  std::shared_ptr<arrow::Array>& primary_key_col,
382  const std::vector<std::shared_ptr<arrow::Array>>& property_cols,
383  std::shared_mutex& rw_mutex) {
384  size_t row_num = primary_key_col->length();
385  auto col_num = property_cols.size();
386  for (size_t i = 0; i < col_num; ++i) {
387  CHECK_EQ(property_cols[i]->length(), row_num);
388  }
389 
390  std::vector<size_t> vids;
391  vids.reserve(row_num);
392  {
393  std::unique_lock<std::mutex> lock(mtxs_[v_label_id]);
394  _add_vertex<KEY_T>()(primary_key_col, indexer, vids);
395  }
396  {
397  std::shared_lock<std::shared_mutex> lock(rw_mutex);
398  for (size_t j = 0; j < property_cols.size(); ++j) {
399  auto array = property_cols[j];
400  auto chunked_array = std::make_shared<arrow::ChunkedArray>(array);
 401  set_properties_column(
 402  basic_fragment_loader_.GetVertexTable(v_label_id).column_ptrs()[j],
403  chunked_array, vids);
404  }
405  }
406 
407  VLOG(10) << "Insert rows: " << row_num;
408  }
409 
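// addVertexRecordBatchImpl runs a small producer/consumer pipeline per input
// file: supplier threads push record batches into a bounded BlockingQueue
// (limit 1024) and call DecProducerNum() when a supplier is exhausted, while
// consumer threads pop batches, grow the vertex table geometrically under the
// unique lock of rw_mutex, and write rows under its shared lock.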
410 #ifndef USE_PTHASH
411  template <typename KEY_T>
 412  void addVertexRecordBatchImpl(
413  label_t v_label_id, const std::vector<std::string>& v_files,
414  std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
415  label_t, const std::string&, const LoadingConfig&, int)>
416  supplier_creator) {
417  std::string v_label_name = schema_.get_vertex_label_name(v_label_id);
418  VLOG(10) << "Parsing vertex file:" << v_files.size() << " for label "
419  << v_label_name;
420  auto primary_key = schema_.get_vertex_primary_key(v_label_id)[0];
421  auto primary_key_name = std::get<1>(primary_key);
422  size_t primary_key_ind = std::get<2>(primary_key);
423  IdIndexer<KEY_T, vid_t> indexer;
424 
425  grape::BlockingQueue<std::shared_ptr<arrow::RecordBatch>> queue;
426  queue.SetLimit(1024);
427  std::vector<std::thread> work_threads;
428 
429  for (auto& v_file : v_files) {
430  VLOG(10) << "Parsing vertex file:" << v_file << " for label "
431  << v_label_name;
432  auto record_batch_supplier_vec =
433  supplier_creator(v_label_id, v_file, loading_config_,
434  std::thread::hardware_concurrency());
435  queue.SetProducerNum(record_batch_supplier_vec.size());
436 
437  for (size_t idx = 0; idx < record_batch_supplier_vec.size(); ++idx) {
438  work_threads.emplace_back(
439  [&](int i) {
440  auto& record_batch_supplier = record_batch_supplier_vec[i];
441  bool first_batch = true;
442  while (true) {
443  auto batch = record_batch_supplier->GetNextBatch();
444  if (!batch) {
445  queue.DecProducerNum();
446  break;
447  }
448  if (first_batch) {
449  auto header = batch->schema()->field_names();
450  auto schema_column_names =
 451  schema_.get_vertex_property_names(v_label_id);
452  CHECK(schema_column_names.size() + 1 == header.size())
453  << "File header of size: " << header.size()
454  << " does not match schema column size: "
455  << schema_column_names.size() + 1;
456  first_batch = false;
457  }
458  queue.Put(batch);
459  }
460  },
461  idx);
462  }
463 
464  std::atomic<size_t> offset(0);
465  std::shared_mutex rw_mutex;
466  for (unsigned idx = 0;
467  idx <
468  std::min(static_cast<unsigned>(8 * record_batch_supplier_vec.size()),
469  std::thread::hardware_concurrency());
470  ++idx) {
471  work_threads.emplace_back(
472  [&](int i) {
473  // It is possible that the inserted data will exceed the size of
474  // the table, so we need to resize the table.
475  // basic_fragment_loader_.GetVertexTable(v_label_id).resize(vids.size());
476  auto& vtable = basic_fragment_loader_.GetVertexTable(v_label_id);
477  while (true) {
478  std::shared_ptr<arrow::RecordBatch> batch{nullptr};
479  auto ret = queue.Get(batch);
480  if (!ret) {
481  break;
482  }
483  if (!batch) {
484  LOG(FATAL) << "get nullptr batch";
485  }
486  auto columns = batch->columns();
487  CHECK(primary_key_ind < columns.size());
488  auto primary_key_column = columns[primary_key_ind];
489  auto other_columns_array = columns;
490  other_columns_array.erase(other_columns_array.begin() +
491  primary_key_ind);
492 
493  auto local_offset =
494  offset.fetch_add(primary_key_column->length());
495  size_t cur_row_num = std::max(vtable.row_num(), 1ul);
496  while (cur_row_num <
497  local_offset + primary_key_column->length()) {
498  cur_row_num *= 2;
499  }
500  if (cur_row_num > vtable.row_num()) {
501  std::unique_lock<std::shared_mutex> lock(rw_mutex);
502  if (cur_row_num > vtable.row_num()) {
503  LOG(INFO) << "Resize vertex table from " << vtable.row_num()
504  << " to " << cur_row_num
505  << ", local_offset: " << local_offset;
506  vtable.resize(cur_row_num);
507  }
508  }
509 
510  addVertexBatchFromArray(v_label_id, indexer, primary_key_column,
511  other_columns_array, rw_mutex);
512  }
513  },
514  idx);
515  }
516  for (auto& t : work_threads) {
517  t.join();
518  }
519  work_threads.clear();
520  VLOG(10) << "Finish parsing vertex file:" << v_file << " for label "
521  << v_label_name;
522  }
523 
524  VLOG(10) << "Finish parsing vertex file:" << v_files.size() << " for label "
525  << v_label_name;
526  if (indexer.bucket_count() == 0) {
527  indexer._rehash(schema_.get_max_vnum(v_label_name));
528  }
529  basic_fragment_loader_.FinishAddingVertex<KEY_T>(v_label_id, indexer);
530  }
531 #else
532  template <typename KEY_T>
 533  void addVertexRecordBatchImpl(
534  label_t v_label_id, const std::vector<std::string>& v_files,
535  std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
536  label_t, const std::string&, const LoadingConfig&, int)>
537  supplier_creator) {
538  std::string v_label_name = schema_.get_vertex_label_name(v_label_id);
539  VLOG(10) << "Parsing vertex file:" << v_files.size() << " for label "
540  << v_label_name;
541  auto primary_key = schema_.get_vertex_primary_key(v_label_id)[0];
542  auto primary_key_name = std::get<1>(primary_key);
543  size_t primary_key_ind = std::get<2>(primary_key);
544  grape::BlockingQueue<std::shared_ptr<arrow::RecordBatch>> queue;
545  queue.SetLimit(1024);
546  PTIndexerBuilder<KEY_T, vid_t> indexer_builder;
547  std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> batchs(
548  std::thread::hardware_concurrency());
549  std::vector<std::thread> work_threads;
550  for (auto& v_file : v_files) {
551  VLOG(10) << "Parsing vertex file:" << v_file << " for label "
552  << v_label_name;
553  auto record_batch_supplier_vec =
554  supplier_creator(v_label_id, v_file, loading_config_,
555  std::thread::hardware_concurrency());
556  queue.SetProducerNum(record_batch_supplier_vec.size());
557  for (size_t idx = 0; idx < record_batch_supplier_vec.size(); ++idx) {
558  work_threads.emplace_back(
559  [&](int i) {
560  auto& record_batch_supplier = record_batch_supplier_vec[i];
561  bool first_batch = true;
562  while (true) {
563  auto batch = record_batch_supplier->GetNextBatch();
564  if (!batch) {
565  queue.DecProducerNum();
566  break;
567  }
568  if (first_batch) {
569  auto header = batch->schema()->field_names();
570  auto schema_column_names =
 571  schema_.get_vertex_property_names(v_label_id);
572  CHECK(schema_column_names.size() + 1 == header.size())
573  << "File header of size: " << header.size()
574  << " does not match schema column size: "
575  << schema_column_names.size() + 1;
576  first_batch = false;
577  }
578  queue.Put(batch);
579  }
580  },
581  idx);
582  }
583 
584  for (unsigned idx = 0; idx < std::thread::hardware_concurrency(); ++idx) {
585  work_threads.emplace_back(
586  [&](int i) {
587  while (true) {
588  std::shared_ptr<arrow::RecordBatch> batch{nullptr};
589  auto ret = queue.Get(batch);
590  if (!ret) {
591  break;
592  }
593  if (!batch) {
594  LOG(FATAL) << "get nullptr batch";
595  }
596  batchs[i].emplace_back(batch);
597  auto columns = batch->columns();
598  CHECK(primary_key_ind < columns.size());
599  auto primary_key_column = columns[primary_key_ind];
600  {
601  std::unique_lock<std::mutex> lock(mtxs_[v_label_id]);
602  _add_vertex<KEY_T>()(primary_key_column, indexer_builder);
603  }
604  }
605  },
606  idx);
607  }
608  for (auto& t : work_threads) {
609  t.join();
610  }
611  work_threads.clear();
612 
613  VLOG(10) << "Finish parsing vertex file:" << v_file << " for label "
614  << v_label_name;
615  }
616  basic_fragment_loader_.FinishAddingVertex(v_label_id, indexer_builder);
617  const auto& indexer = basic_fragment_loader_.GetLFIndexer(v_label_id);
618 
619  auto& vtable = basic_fragment_loader_.GetVertexTable(v_label_id);
620  size_t total_row_num = 0;
621  for (auto& batch : batchs) {
622  for (auto& b : batch) {
623  total_row_num += b->num_rows();
624  }
625  }
626  if (total_row_num > vtable.row_num()) {
627  std::unique_lock<std::mutex> lock(mtxs_[v_label_id]);
628  if (total_row_num > vtable.row_num()) {
629  LOG(INFO) << "Resize vertex table from " << vtable.row_num() << " to "
630  << total_row_num;
631  vtable.resize(total_row_num);
632  }
633  }
634 
635  std::atomic<size_t> cur_batch_id(0);
636  for (unsigned i = 0; i < std::thread::hardware_concurrency(); ++i) {
637  work_threads.emplace_back(
638  [&](int idx) {
639  for (size_t id = 0; id < batchs[idx].size(); ++id) {
640  auto batch = batchs[idx][id];
641  auto columns = batch->columns();
642  auto other_columns_array = columns;
643  auto primary_key_column = columns[primary_key_ind];
644  size_t row_num = primary_key_column->length();
645  std::vector<size_t> vids;
646  if constexpr (!std::is_same<std::string_view, KEY_T>::value) {
647  using arrow_array_t =
 648  typename gs::TypeConverter<KEY_T>::ArrowArrayType;
649  auto casted_array =
650  std::static_pointer_cast<arrow_array_t>(primary_key_column);
651  for (size_t i = 0; i < row_num; ++i) {
652  vids.emplace_back(indexer.get_index(casted_array->Value(i)));
653  }
654  } else {
655  if (primary_key_column->type()->Equals(arrow::utf8())) {
656  auto casted_array =
657  std::static_pointer_cast<arrow::StringArray>(
658  primary_key_column);
659  for (size_t i = 0; i < row_num; ++i) {
660  auto str = casted_array->GetView(i);
661  std::string_view str_view(str.data(), str.size());
662  vids.emplace_back(indexer.get_index(str_view));
663  }
664  } else if (primary_key_column->type()->Equals(
665  arrow::large_utf8())) {
666  auto casted_array =
667  std::static_pointer_cast<arrow::LargeStringArray>(
668  primary_key_column);
669  for (size_t i = 0; i < row_num; ++i) {
670  auto str = casted_array->GetView(i);
671  std::string_view str_view(str.data(), str.size());
672  vids.emplace_back(indexer.get_index(str_view));
673  }
674  }
675  }
676  other_columns_array.erase(other_columns_array.begin() +
677  primary_key_ind);
678 
679  for (size_t j = 0; j < other_columns_array.size(); ++j) {
680  auto array = other_columns_array[j];
681  auto chunked_array =
682  std::make_shared<arrow::ChunkedArray>(array);
683  set_properties_column(vtable.column_ptrs()[j], chunked_array,
684  vids);
685  }
686  }
687  },
688  i);
689  }
690  for (auto& t : work_threads) {
691  t.join();
692  }
693 
694  auto& v_data = basic_fragment_loader_.GetVertexTable(v_label_id);
695  auto label_name = schema_.get_vertex_label_name(v_label_id);
696 
697  v_data.resize(indexer.size());
698  v_data.dump(vertex_table_prefix(label_name),
 699  snapshot_dir(basic_fragment_loader_.work_dir(), 0));
700 
701  VLOG(10) << "Finish parsing vertex file:" << v_files.size() << " for label "
702  << v_label_name;
703  }
704 #endif
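// The USE_PTHASH variant above differs from the IdIndexer variant in that it
// buffers all record batches per consumer thread, builds the perfect-hash
// indexer first, and only then resolves vids and writes the property columns
// in a second parallel pass.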
705 
706  template <typename SRC_PK_T, typename EDATA_T, typename VECTOR_T>
707  void _append_edges(std::shared_ptr<arrow::Array> src_col,
708  std::shared_ptr<arrow::Array> dst_col,
709  const IndexerType& src_indexer,
710  const IndexerType& dst_indexer,
711  std::shared_ptr<arrow::Array>& property_cols,
712  const PropertyType& edge_property, VECTOR_T& parsed_edges,
713  std::vector<std::atomic<int32_t>>& ie_degree,
714  std::vector<std::atomic<int32_t>>& oe_degree,
715  size_t offset) {
716  auto dst_col_type = dst_col->type();
717  if (dst_col_type->Equals(arrow::int64())) {
718  append_edges<SRC_PK_T, int64_t, EDATA_T>(
719  src_col, dst_col, src_indexer, dst_indexer, property_cols,
720  edge_property, parsed_edges, ie_degree, oe_degree, offset);
721  } else if (dst_col_type->Equals(arrow::uint64())) {
722  append_edges<SRC_PK_T, uint64_t, EDATA_T>(
723  src_col, dst_col, src_indexer, dst_indexer, property_cols,
724  edge_property, parsed_edges, ie_degree, oe_degree, offset);
725  } else if (dst_col_type->Equals(arrow::int32())) {
726  append_edges<SRC_PK_T, int32_t, EDATA_T>(
727  src_col, dst_col, src_indexer, dst_indexer, property_cols,
728  edge_property, parsed_edges, ie_degree, oe_degree, offset);
729  } else if (dst_col_type->Equals(arrow::uint32())) {
730  append_edges<SRC_PK_T, uint32_t, EDATA_T>(
731  src_col, dst_col, src_indexer, dst_indexer, property_cols,
732  edge_property, parsed_edges, ie_degree, oe_degree, offset);
733  } else {
734  // must be string
735  append_edges<SRC_PK_T, std::string_view, EDATA_T>(
736  src_col, dst_col, src_indexer, dst_indexer, property_cols,
737  edge_property, parsed_edges, ie_degree, oe_degree, offset);
738  }
739  }
740  template <typename EDATA_T>
 741  void addEdgesRecordBatchImpl(
742  label_t src_label_id, label_t dst_label_id, label_t e_label_id,
743  const std::vector<std::string>& e_files,
744  std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
745  label_t, label_t, label_t, const std::string&, const LoadingConfig&,
746  int)>
747  supplier_creator) {
748  if constexpr (std::is_same_v<EDATA_T, RecordView>) {
749  if (use_mmap_vector_) {
 750  addEdgesRecordBatchImplHelper<
 751  EDATA_T, mmap_vector<std::tuple<vid_t, vid_t, size_t>>>(
752  src_label_id, dst_label_id, e_label_id, e_files, supplier_creator);
753  } else {
 754  addEdgesRecordBatchImplHelper<
755  EDATA_T, std::vector<std::tuple<vid_t, vid_t, size_t>>>(
756  src_label_id, dst_label_id, e_label_id, e_files, supplier_creator);
757  }
758  } else {
759  if (use_mmap_vector_) {
 760  addEdgesRecordBatchImplHelper<
 761  EDATA_T, mmap_vector<std::tuple<vid_t, vid_t, EDATA_T>>>(
762  src_label_id, dst_label_id, e_label_id, e_files, supplier_creator);
763  } else {
 764  addEdgesRecordBatchImplHelper<
765  EDATA_T, std::vector<std::tuple<vid_t, vid_t, EDATA_T>>>(
766  src_label_id, dst_label_id, e_label_id, e_files, supplier_creator);
767  }
768  }
769  }
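// addEdgesRecordBatchImpl only chooses the buffer type for parsed edges:
// mmap_vector-backed temporary files when use_mmap_vector_ is set, plain
// std::vector otherwise; for RecordView edge data the third tuple element is a
// size_t row offset into the edge property table rather than the value itself.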
770 
771  template <typename EDATA_T, typename VECTOR_T>
 772  void addEdgesRecordBatchImplHelper(
773  label_t src_label_id, label_t dst_label_id, label_t e_label_id,
774  const std::vector<std::string>& e_files,
775  std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
776  label_t, label_t, label_t, const std::string&, const LoadingConfig&,
777  int)>
778  supplier_creator) {
779  auto src_label_name = schema_.get_vertex_label_name(src_label_id);
780  auto dst_label_name = schema_.get_vertex_label_name(dst_label_id);
781  auto edge_label_name = schema_.get_edge_label_name(e_label_id);
782  auto edge_column_mappings = loading_config_.GetEdgeColumnMappings(
783  src_label_id, dst_label_id, e_label_id);
784  auto src_dst_col_pair = loading_config_.GetEdgeSrcDstCol(
785  src_label_id, dst_label_id, e_label_id);
786  if (src_dst_col_pair.first.size() != 1 ||
787  src_dst_col_pair.second.size() != 1) {
788  LOG(FATAL) << "We currently only support one src primary key and one "
789  "dst primary key";
790  }
791  size_t src_col_ind = src_dst_col_pair.first[0].second;
792  size_t dst_col_ind = src_dst_col_pair.second[0].second;
793  CHECK(src_col_ind != dst_col_ind);
794 
795  check_edge_invariant(schema_, edge_column_mappings, src_col_ind,
796  dst_col_ind, src_label_id, dst_label_id, e_label_id);
797 
798  const auto& src_indexer = basic_fragment_loader_.GetLFIndexer(src_label_id);
799  const auto& dst_indexer = basic_fragment_loader_.GetLFIndexer(dst_label_id);
800  std::vector<VECTOR_T> parsed_edges_vec(std::thread::hardware_concurrency());
801  if constexpr (std::is_same_v<
802  VECTOR_T,
803  mmap_vector<std::tuple<vid_t, vid_t, EDATA_T>>> ||
804  std::is_same_v<
805  VECTOR_T,
806  mmap_vector<std::tuple<vid_t, vid_t, size_t>>>) {
807  const auto& work_dir = basic_fragment_loader_.work_dir();
808  for (unsigned i = 0; i < std::thread::hardware_concurrency(); ++i) {
809  parsed_edges_vec[i].open(runtime_dir(work_dir) + "/" + src_label_name +
810  "_" + dst_label_name + "_" + edge_label_name +
811  "_" + std::to_string(i) + ".tmp");
812  parsed_edges_vec[i].reserve(4096);
813  }
814  }
815  std::vector<std::atomic<int32_t>> ie_degree(dst_indexer.size()),
816  oe_degree(src_indexer.size());
817  for (size_t idx = 0; idx < ie_degree.size(); ++idx) {
818  ie_degree[idx].store(0);
819  }
820  for (size_t idx = 0; idx < oe_degree.size(); ++idx) {
821  oe_degree[idx].store(0);
822  }
823  VLOG(10) << "src indexer size: " << src_indexer.size()
824  << " dst indexer size: " << dst_indexer.size();
825 
826  grape::BlockingQueue<std::shared_ptr<arrow::RecordBatch>> queue;
827  queue.SetLimit(1024);
828  std::vector<std::thread> work_threads;
829 
830  std::vector<std::vector<std::shared_ptr<arrow::Array>>> string_columns(
831  std::thread::hardware_concurrency());
832 
833  if constexpr (std::is_same<EDATA_T, RecordView>::value) {
834  basic_fragment_loader_.init_edge_table(src_label_id, dst_label_id,
835  e_label_id);
836  }
837 
 838  // use a dummy vector to keep the string columns alive, to avoid the
 839  // strings being released when the record batch is released.
840  std::vector<std::shared_ptr<arrow::Array>> string_cols;
841  std::atomic<size_t> offset(0);
842  std::shared_mutex rw_mutex;
843  for (auto filename : e_files) {
844  auto record_batch_supplier_vec =
845  supplier_creator(src_label_id, dst_label_id, e_label_id, filename,
846  loading_config_, parsed_edges_vec.size());
847 
848  queue.SetProducerNum(record_batch_supplier_vec.size());
849 
850  for (size_t i = 0; i < record_batch_supplier_vec.size(); ++i) {
851  work_threads.emplace_back(
852  [&](int idx) {
853  auto& string_column = string_columns[idx];
854  bool first_batch = true;
855  auto& record_batch_supplier = record_batch_supplier_vec[idx];
856  while (true) {
857  auto record_batch = record_batch_supplier->GetNextBatch();
858  if (!record_batch) {
859  queue.DecProducerNum();
860  break;
861  }
862  if (first_batch) {
863  auto header = record_batch->schema()->field_names();
864  auto schema_column_names = schema_.get_edge_property_names(
865  src_label_id, dst_label_id, e_label_id);
866  auto schema_column_types = schema_.get_edge_properties(
867  src_label_id, dst_label_id, e_label_id);
868  CHECK(schema_column_names.size() + 2 == header.size())
869  << "schema size: " << schema_column_names.size()
870  << " neq header size: " << header.size();
871  first_batch = false;
872  }
873  for (auto i = 0; i < record_batch->num_columns(); ++i) {
874  if (record_batch->column(i)->type()->Equals(arrow::utf8()) ||
875  record_batch->column(i)->type()->Equals(
876  arrow::large_utf8())) {
877  string_column.emplace_back(record_batch->column(i));
878  }
879  }
880 
881  queue.Put(record_batch);
882  }
883  },
884  i);
885  }
886  for (size_t i = 0;
887  i <
888  std::min(static_cast<unsigned>(8 * record_batch_supplier_vec.size()),
889  std::thread::hardware_concurrency());
890  ++i) {
891  work_threads.emplace_back(
892  [&](int idx) {
893  // copy the table to csr.
894  auto& parsed_edges = parsed_edges_vec[idx];
895  while (true) {
896  std::shared_ptr<arrow::RecordBatch> record_batch{nullptr};
897  auto ret = queue.Get(record_batch);
898  if (!ret) {
899  break;
900  }
901  if (!record_batch) {
902  LOG(FATAL) << "get nullptr batch";
903  }
904  auto columns = record_batch->columns();
905  // We assume the src_col and dst_col will always be put
906  // at front.
907  CHECK(columns.size() >= 2);
908  auto src_col = columns[0];
909  auto dst_col = columns[1];
910  auto src_col_type = src_col->type();
911  auto dst_col_type = dst_col->type();
912  CHECK(check_primary_key_type(src_col_type))
913  << "unsupported src_col type: " << src_col_type->ToString();
914  CHECK(check_primary_key_type(dst_col_type))
915  << "unsupported dst_col type: " << dst_col_type->ToString();
916 
917  std::vector<std::shared_ptr<arrow::Array>> property_cols;
918  for (size_t i = 2; i < columns.size(); ++i) {
919  property_cols.emplace_back(columns[i]);
920  }
921  size_t offset_i = 0;
922  if constexpr (std::is_same<EDATA_T, RecordView>::value) {
923  auto casted_csr = dynamic_cast<DualCsr<RecordView>*>(
924  basic_fragment_loader_.get_csr(src_label_id, dst_label_id,
925  e_label_id));
926  CHECK(casted_csr != NULL);
927  auto table = casted_csr->GetTable();
928  CHECK(table.col_num() == property_cols.size());
929  offset_i = offset.fetch_add(src_col->length());
930  std::vector<size_t> offsets;
931  for (size_t _i = 0;
932  _i < static_cast<size_t>(src_col->length()); ++_i) {
933  offsets.emplace_back(offset_i + _i);
934  }
935  size_t row_num = std::max(table.row_num(), 1ul);
936 
937  while (row_num < offset_i + src_col->length()) {
938  row_num *= 2;
939  }
940  if (row_num > table.row_num()) {
941  std::unique_lock<std::shared_mutex> lock(rw_mutex);
942  if (row_num > table.row_num()) {
943  table.resize(row_num);
944  }
945  }
946 
947  {
948  std::shared_lock<std::shared_mutex> lock(rw_mutex);
949  for (size_t i = 0; i < table.col_num(); ++i) {
950  auto col = table.get_column_by_id(i);
951  auto chunked_array =
952  std::make_shared<arrow::ChunkedArray>(
953  property_cols[i]);
954  set_properties_column(col.get(), chunked_array, offsets);
955  }
956  }
957  }
958  auto edge_property = schema_.get_edge_property(
959  src_label_id, dst_label_id, e_label_id);
960  // add edges to vector
961  CHECK(src_col->length() == dst_col->length());
962  if (src_col_type->Equals(arrow::int64())) {
963  _append_edges<int64_t, EDATA_T, VECTOR_T>(
964  src_col, dst_col, src_indexer, dst_indexer,
965  property_cols[0], edge_property, parsed_edges, ie_degree,
966  oe_degree, offset_i);
967  } else if (src_col_type->Equals(arrow::uint64())) {
968  _append_edges<uint64_t, EDATA_T, VECTOR_T>(
969  src_col, dst_col, src_indexer, dst_indexer,
970  property_cols[0], edge_property, parsed_edges, ie_degree,
971  oe_degree, offset_i);
972  } else if (src_col_type->Equals(arrow::int32())) {
973  _append_edges<int32_t, EDATA_T, VECTOR_T>(
974  src_col, dst_col, src_indexer, dst_indexer,
975  property_cols[0], edge_property, parsed_edges, ie_degree,
976  oe_degree, offset_i);
977  } else if (src_col_type->Equals(arrow::uint32())) {
978  _append_edges<uint32_t, EDATA_T, VECTOR_T>(
979  src_col, dst_col, src_indexer, dst_indexer,
980  property_cols[0], edge_property, parsed_edges, ie_degree,
981  oe_degree, offset_i);
982  } else {
983  // must be string
984  _append_edges<std::string_view, EDATA_T, VECTOR_T>(
985  src_col, dst_col, src_indexer, dst_indexer,
986  property_cols[0], edge_property, parsed_edges, ie_degree,
987  oe_degree, offset_i);
988  }
989  }
990  },
991  i);
992  }
993 
994  for (auto& t : work_threads) {
995  t.join();
996  }
997  VLOG(10) << "Finish parsing edge file:" << filename << " for label "
998  << src_label_name << " -> " << dst_label_name << " -> "
999  << edge_label_name;
1000  }
1001  VLOG(10) << "Finish parsing edge file:" << e_files.size() << " for label "
1002  << src_label_name << " -> " << dst_label_name << " -> "
1003  << edge_label_name;
1004  std::vector<int32_t> ie_deg(ie_degree.size());
1005  std::vector<int32_t> oe_deg(oe_degree.size());
1006  for (size_t idx = 0; idx < ie_deg.size(); ++idx) {
1007  ie_deg[idx] = ie_degree[idx];
1008  }
1009  for (size_t idx = 0; idx < oe_deg.size(); ++idx) {
1010  oe_deg[idx] = oe_degree[idx];
1011  }
1012 
1013  basic_fragment_loader_.PutEdges<EDATA_T, VECTOR_T>(
1014  src_label_id, dst_label_id, e_label_id, parsed_edges_vec, ie_deg,
1015  oe_deg, build_csr_in_mem_);
1016 
1017  string_columns.clear();
1018  size_t sum = 0;
1019  for (auto& edges : parsed_edges_vec) {
1020  sum += edges.size();
1021  if constexpr (
1022  std::is_same<VECTOR_T,
1023  mmap_vector<std::tuple<vid_t, vid_t, EDATA_T>>>::value ||
1024  std::is_same<VECTOR_T,
1025  mmap_vector<std::tuple<vid_t, vid_t, size_t>>>::value) {
1026  edges.unlink();
1027  }
1028  }
1029 
1030  VLOG(10) << "Finish putting: " << sum << " edges";
1031  }
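// A rough usage sketch (hypothetical, for illustration only): a concrete
// loader derived from this class would wire file paths and a supplier factory
// into the two entry points, e.g.
//
//   loader.AddVerticesRecordBatch(
//       person_label, person_files,
//       [](label_t label, const std::string& path, const LoadingConfig& cfg,
//          int parallelism) {
//         std::vector<std::shared_ptr<IRecordBatchSupplier>> suppliers;
//         // create one supplier per reader thread for `path` ...
//         return suppliers;
//       });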
1032 
 1033  const LoadingConfig& loading_config_;
1034  const Schema& schema_;
 1035  size_t vertex_label_num_, edge_label_num_;
1036  int32_t thread_num_;
1037  std::mutex* mtxs_;
 1038  bool build_csr_in_mem_;
 1039  bool use_mmap_vector_;
 1040  BasicFragmentLoader basic_fragment_loader_;
1041 };
1042 
1043 } // namespace gs
1044 
1045 #endif // STORAGES_RT_MUTABLE_GRAPH_LOADER_ABSTRACT_ARROW_FRAGMENT_LOADER_H_