#ifndef STORAGES_RT_MUTABLE_GRAPH_LOADER_ABSTRACT_ARROW_FRAGMENT_LOADER_H_
#define STORAGES_RT_MUTABLE_GRAPH_LOADER_ABSTRACT_ARROW_FRAGMENT_LOADER_H_

#include "grape/utils/concurrent_queue.h"

#include <arrow/api.h>
#include <arrow/io/api.h>
#include <shared_mutex>
#include "arrow/util/value_parsing.h"

#include "grape/util.h"
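  // Supplies the next Arrow record batch from the underlying reader; a null
  // pointer signals that the input has been exhausted.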
  virtual std::shared_ptr<arrow::RecordBatch> GetNextBatch() = 0;
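// A minimal sketch of a supplier over pre-loaded batches, assuming that
// GetNextBatch() is the only pure-virtual member of IRecordBatchSupplier
// (only the interface method above is taken from this header; the class and
// member names below are illustrative):
//
//   class InMemoryBatchSupplier : public IRecordBatchSupplier {
//    public:
//     explicit InMemoryBatchSupplier(
//         std::vector<std::shared_ptr<arrow::RecordBatch>> batches)
//         : batches_(std::move(batches)) {}
//     std::shared_ptr<arrow::RecordBatch> GetNextBatch() override {
//       // Hand out batches in order; nullptr marks end-of-stream, which the
//       // loader's producer threads treat as the signal to stop reading.
//       return cur_ < batches_.size() ? batches_[cur_++] : nullptr;
//     }
//
//    private:
//     std::vector<std::shared_ptr<arrow::RecordBatch>> batches_;
//     size_t cur_ = 0;
//   };

// Copies values of type COL_T from a chunked Arrow array into a property
// column, writing each value at the row position given by `offset` and
// guarding against target rows that lie beyond the column's current size.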
template <typename COL_T>
                       const std::vector<size_t>& offset) {
  auto array_type = array->type();
  CHECK(array_type->Equals(arrow_type))
      << "Inconsistent data type, expect " << arrow_type->ToString()
      << ", but got " << array_type->ToString();
  for (auto j = 0; j < array->num_chunks(); ++j) {
    auto casted = std::static_pointer_cast<arrow_array_type>(array->chunk(j));
    size_t size = col->size();
    for (auto k = 0; k < casted->length(); ++k) {
      if (offset[cur_ind] >= size) {
                           std::shared_ptr<arrow::ChunkedArray> array,
                           const std::vector<size_t>& offset,
                           bool enable_resize = false);

                           std::shared_ptr<arrow::ChunkedArray> array,
                           const std::vector<size_t>& offset);

                           const std::vector<size_t>& offset);

                           std::shared_ptr<arrow::ChunkedArray> array,
                           const std::vector<size_t>& offset);
    const std::vector<std::tuple<size_t, std::string, std::string>>&
    size_t src_col_ind, size_t dst_col_ind, label_t src_label_i,
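// _add_vertex inserts the primary keys of one Arrow column into a vertex
// indexer. The first operator() records, per input row, either the assigned
// vid or std::numeric_limits<size_t>::max() for duplicate keys; the second
// overload only registers the keys with a PTIndexerBuilder, for the two-pass
// loading path where vids are resolved after the indexer is finalized.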
template <typename KEY_T>
                  std::vector<size_t>& offset) {
    size_t row_num = col->length();
    if constexpr (!std::is_same<std::string_view, KEY_T>::value) {
      if (!col->type()->Equals(expected_type)) {
        LOG(FATAL) << "Inconsistent data type, expect "
                   << expected_type->ToString() << ", but got "
                   << col->type()->ToString();
      }
      auto casted_array = std::static_pointer_cast<arrow_array_t>(col);
      for (size_t i = 0; i < row_num; ++i) {
        if (!indexer.add(casted_array->Value(i), vid)) {
          VLOG(2) << "Duplicate vertex id: " << casted_array->Value(i) << "..";
          offset.emplace_back(std::numeric_limits<size_t>::max());
        } else {
          offset.emplace_back(vid);
        }
      }
    } else {
      if (col->type()->Equals(arrow::utf8())) {
        auto casted_array = std::static_pointer_cast<arrow::StringArray>(col);
        for (size_t i = 0; i < row_num; ++i) {
          auto str = casted_array->GetView(i);
          std::string_view str_view(str.data(), str.size());
          if (!indexer.add(str_view, vid)) {
            VLOG(2) << "Duplicate vertex id: " << str_view << "..";
            offset.emplace_back(std::numeric_limits<size_t>::max());
          } else {
            offset.emplace_back(vid);
          }
        }
      } else if (col->type()->Equals(arrow::large_utf8())) {
        auto casted_array =
            std::static_pointer_cast<arrow::LargeStringArray>(col);
        for (size_t i = 0; i < row_num; ++i) {
          auto str = casted_array->GetView(i);
          std::string_view str_view(str.data(), str.size());
          if (!indexer.add(str_view, vid)) {
            VLOG(2) << "Duplicate vertex id: " << str_view << "..";
            offset.emplace_back(std::numeric_limits<size_t>::max());
          } else {
            offset.emplace_back(vid);
          }
        }
      } else {
        LOG(FATAL) << "Unsupported type: " << col->type()->ToString();
      }
    }
  void operator()(const std::shared_ptr<arrow::Array>& col,
                  PTIndexerBuilder<KEY_T, vid_t>& indexer) {
    size_t row_num = col->length();
    if constexpr (!std::is_same<std::string_view, KEY_T>::value) {
      if (!col->type()->Equals(expected_type)) {
        LOG(FATAL) << "Inconsistent data type, expect "
                   << expected_type->ToString() << ", but got "
                   << col->type()->ToString();
      }
      auto casted_array = std::static_pointer_cast<arrow_array_t>(col);
      for (size_t i = 0; i < row_num; ++i) {
        indexer.add_vertex(casted_array->Value(i));
      }
    } else {
      if (col->type()->Equals(arrow::utf8())) {
        auto casted_array = std::static_pointer_cast<arrow::StringArray>(col);
        for (size_t i = 0; i < row_num; ++i) {
          auto str = casted_array->GetView(i);
          std::string_view str_view(str.data(), str.size());
          indexer.add_vertex(str_view);
        }
      } else if (col->type()->Equals(arrow::large_utf8())) {
        auto casted_array =
            std::static_pointer_cast<arrow::LargeStringArray>(col);
        for (size_t i = 0; i < row_num; ++i) {
          auto str = casted_array->GetView(i);
          std::string_view str_view(str.data(), str.size());
          indexer.add_vertex(str_view);
        }
      } else {
        LOG(FATAL) << "Unsupported type: " << col->type()->ToString();
      }
    }
  }
template <typename PK_T, typename EDATA_T, typename VECTOR_T>
void _append(bool is_dst, size_t cur_ind, std::shared_ptr<arrow::Array> col,
             const IndexerType& indexer, VECTOR_T& parsed_edges,
             std::vector<std::atomic<int32_t>>& degree) {
  static constexpr auto invalid_vid = std::numeric_limits<vid_t>::max();
  if constexpr (std::is_same_v<PK_T, std::string_view>) {
    if (col->type()->Equals(arrow::utf8())) {
      auto casted = std::static_pointer_cast<arrow::StringArray>(col);
      for (auto j = 0; j < casted->length(); ++j) {
        auto str = casted->GetView(j);
        std::string_view str_view(str.data(), str.size());
        if (is_dst) {
          std::get<1>(parsed_edges[cur_ind++]) = vid;
        } else {
          std::get<0>(parsed_edges[cur_ind++]) = vid;
        }
        if (vid != invalid_vid) {
          degree[vid]++;
        }
      }
    } else {
      auto casted = std::static_pointer_cast<arrow::LargeStringArray>(col);
      for (auto j = 0; j < casted->length(); ++j) {
        auto str = casted->GetView(j);
        std::string_view str_view(str.data(), str.size());
        if (is_dst) {
          std::get<1>(parsed_edges[cur_ind++]) = vid;
        } else {
          std::get<0>(parsed_edges[cur_ind++]) = vid;
        }
        if (vid != invalid_vid) {
          degree[vid]++;
        }
      }
    }
  } else {
    auto casted = std::static_pointer_cast<arrow_array_type>(col);
    for (auto j = 0; j < casted->length(); ++j) {
      if (is_dst) {
        std::get<1>(parsed_edges[cur_ind++]) = vid;
      } else {
        std::get<0>(parsed_edges[cur_ind++]) = vid;
      }
      if (vid != invalid_vid) {
        degree[vid]++;
      }
    }
  }
}
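// append_edges turns one batch of (src, dst, edge-data) columns into edge
// tuples: it checks that the ID columns match the indexer key types, grows
// parsed_edges, and then fills edge data, source vids, and destination vids
// concurrently in three threads. For RecordView edge data the third tuple
// element is a running row offset instead of the value itself.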
template <typename SRC_PK_T, typename DST_PK_T, typename EDATA_T,
                         std::shared_ptr<arrow::Array> dst_col,
                         std::shared_ptr<arrow::Array>& edata_cols,
                         std::vector<std::atomic<int32_t>>& ie_degree,
                         std::vector<std::atomic<int32_t>>& oe_degree,
  CHECK(src_col->length() == dst_col->length());
  auto indexer_check_lambda = [](const IndexerType& cur_indexer,
                                 const std::shared_ptr<arrow::Array>& cur_col) {
      CHECK(cur_col->type()->Equals(arrow::int64()));
      CHECK(cur_col->type()->Equals(arrow::utf8()) ||
            cur_col->type()->Equals(arrow::large_utf8()));
      CHECK(cur_col->type()->Equals(arrow::int32()));
      CHECK(cur_col->type()->Equals(arrow::uint32()));
      CHECK(cur_col->type()->Equals(arrow::uint64()));
  };
  indexer_check_lambda(src_indexer, src_col);
  indexer_check_lambda(dst_indexer, dst_col);
  auto old_size = parsed_edges.size();
  parsed_edges.resize(old_size + src_col->length());
  VLOG(10) << "resize parsed_edges from " << old_size << " to "
           << parsed_edges.size();

  auto edata_col_thread = std::thread([&]() {
    if constexpr (std::is_same<EDATA_T, RecordView>::value) {
      size_t cur_ind = old_size;
      for (auto j = 0; j < src_col->length(); ++j) {
        std::get<2>(parsed_edges[cur_ind++]) = offset++;
      }
    } else if constexpr (!std::is_same<EDATA_T, grape::EmptyType>::value) {
      auto edata_col = edata_cols;
      CHECK(src_col->length() == edata_col->length());
      size_t cur_ind = old_size;
      auto type = edata_col->type();
        LOG(FATAL) << "Inconsistent data type, expect "
                   << ", but got " << type->ToString();
      using arrow_array_type =
      auto data = std::static_pointer_cast<arrow_array_type>(edata_col);
      for (auto j = 0; j < edata_col->length(); ++j) {
        if constexpr (std::is_same<arrow_array_type,
                                   arrow::StringArray>::value ||
                      std::is_same<arrow_array_type,
                                   arrow::LargeStringArray>::value) {
          auto str = data->GetView(j);
          std::string_view str_view(str.data(), str.size());
          std::get<2>(parsed_edges[cur_ind++]) = str_view;
        } else {
          std::get<2>(parsed_edges[cur_ind++]) = data->Value(j);
        }
      }
    }
    VLOG(10) << "Finished inserting: " << src_col->length() << " edges";
  });
  size_t cur_ind = old_size;
  auto src_col_thread = std::thread([&]() {
    _append<SRC_PK_T, EDATA_T>(false, cur_ind, src_col, src_indexer,
                               parsed_edges, oe_degree);
  });
  auto dst_col_thread = std::thread([&]() {
    _append<DST_PK_T, EDATA_T>(true, cur_ind, dst_col, dst_indexer,
                               parsed_edges, ie_degree);
  });
  src_col_thread.join();
  dst_col_thread.join();
  edata_col_thread.join();
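  // Batch-oriented entry points: each takes a label, the input paths, and a
  // factory that produces a set of IRecordBatchSupplier instances per input
  // file, which the implementations below fan out over a bounded blocking
  // queue.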
      label_t v_label_id, const std::vector<std::string>& input_paths,
      std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(

      const std::vector<std::string>& input_paths,
      std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
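  // Maps one batch of primary keys to vids and copies the remaining property
  // columns into the vertex table at those vids. The per-label mutex guards
  // indexer insertion; the shared lock guards against a concurrent resize of
  // the table.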
  template <typename KEY_T>
                               std::shared_ptr<arrow::Array>& primary_key_col,
      const std::vector<std::shared_ptr<arrow::Array>>& property_cols,
      std::shared_mutex& rw_mutex) {
    size_t row_num = primary_key_col->length();
    auto col_num = property_cols.size();
    for (size_t i = 0; i < col_num; ++i) {
      CHECK_EQ(property_cols[i]->length(), row_num);
    }
    std::vector<size_t> vids;
    vids.reserve(row_num);
    std::unique_lock<std::mutex> lock(mtxs_[v_label_id]);
    std::shared_lock<std::shared_mutex> lock(rw_mutex);
    for (size_t j = 0; j < property_cols.size(); ++j) {
      auto array = property_cols[j];
      auto chunked_array = std::make_shared<arrow::ChunkedArray>(array);
                           chunked_array, vids);
    }
    VLOG(10) << "Insert rows: " << row_num;
  template <typename KEY_T>
                                 label_t v_label_id,
                                 const std::vector<std::string>& v_files,
      std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
    VLOG(10) << "Parsing " << v_files.size() << " vertex files for label "
    auto primary_key_name = std::get<1>(primary_key);
    size_t primary_key_ind = std::get<2>(primary_key);
    grape::BlockingQueue<std::shared_ptr<arrow::RecordBatch>> queue;
    queue.SetLimit(1024);
    std::vector<std::thread> work_threads;
    for (auto& v_file : v_files) {
      VLOG(10) << "Parsing vertex file: " << v_file << " for label "
      auto record_batch_supplier_vec =
          std::thread::hardware_concurrency());
      queue.SetProducerNum(record_batch_supplier_vec.size());
      for (size_t idx = 0; idx < record_batch_supplier_vec.size(); ++idx) {
        work_threads.emplace_back(
            auto& record_batch_supplier = record_batch_supplier_vec[i];
            bool first_batch = true;
              auto batch = record_batch_supplier->GetNextBatch();
                queue.DecProducerNum();
              auto header = batch->schema()->field_names();
              auto schema_column_names =
              CHECK(schema_column_names.size() + 1 == header.size())
                  << "File header of size: " << header.size()
                  << " does not match schema column size: "
                  << schema_column_names.size() + 1;
      std::atomic<size_t> offset(0);
      std::shared_mutex rw_mutex;
      for (unsigned idx = 0;
           idx < std::min(
                     static_cast<unsigned>(8 * record_batch_supplier_vec.size()),
                     std::thread::hardware_concurrency());
           ++idx) {
        work_threads.emplace_back(
            std::shared_ptr<arrow::RecordBatch> batch{nullptr};
            auto ret = queue.Get(batch);
              LOG(FATAL) << "Got nullptr batch";
            auto columns = batch->columns();
            CHECK(primary_key_ind < columns.size());
            auto primary_key_column = columns[primary_key_ind];
            auto other_columns_array = columns;
            other_columns_array.erase(other_columns_array.begin() +
                                      primary_key_ind);
            auto local_offset =
                offset.fetch_add(primary_key_column->length());
            size_t cur_row_num = std::max(vtable.row_num(), 1ul);
            while (cur_row_num <
                   local_offset + primary_key_column->length()) {
              cur_row_num *= 2;
            }
            if (cur_row_num > vtable.row_num()) {
              std::unique_lock<std::shared_mutex> lock(rw_mutex);
              if (cur_row_num > vtable.row_num()) {
                LOG(INFO) << "Resize vertex table from " << vtable.row_num()
                          << " to " << cur_row_num
                          << ", local_offset: " << local_offset;
                vtable.resize(cur_row_num);
              }
            }
                                    other_columns_array, rw_mutex);
      for (auto& t : work_threads) {
        t.join();
      }
      work_threads.clear();
      VLOG(10) << "Finished parsing vertex file: " << v_file << " for label "
    }
    VLOG(10) << "Finished parsing " << v_files.size()
             << " vertex files for label "
  template <typename KEY_T>
                                        label_t v_label_id,
                                        const std::vector<std::string>& v_files,
      std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
    VLOG(10) << "Parsing " << v_files.size() << " vertex files for label "
    auto primary_key_name = std::get<1>(primary_key);
    size_t primary_key_ind = std::get<2>(primary_key);
    grape::BlockingQueue<std::shared_ptr<arrow::RecordBatch>> queue;
    queue.SetLimit(1024);
    PTIndexerBuilder<KEY_T, vid_t> indexer_builder;
    std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> batchs(
        std::thread::hardware_concurrency());
    std::vector<std::thread> work_threads;
    for (auto& v_file : v_files) {
      VLOG(10) << "Parsing vertex file: " << v_file << " for label "
      auto record_batch_supplier_vec =
          std::thread::hardware_concurrency());
      queue.SetProducerNum(record_batch_supplier_vec.size());
      for (size_t idx = 0; idx < record_batch_supplier_vec.size(); ++idx) {
        work_threads.emplace_back(
            auto& record_batch_supplier = record_batch_supplier_vec[i];
            bool first_batch = true;
              auto batch = record_batch_supplier->GetNextBatch();
                queue.DecProducerNum();
              auto header = batch->schema()->field_names();
              auto schema_column_names =
              CHECK(schema_column_names.size() + 1 == header.size())
                  << "File header of size: " << header.size()
                  << " does not match schema column size: "
                  << schema_column_names.size() + 1;
      for (unsigned idx = 0; idx < std::thread::hardware_concurrency(); ++idx) {
        work_threads.emplace_back(
            std::shared_ptr<arrow::RecordBatch> batch{nullptr};
            auto ret = queue.Get(batch);
              LOG(FATAL) << "Got nullptr batch";
            batchs[i].emplace_back(batch);
            auto columns = batch->columns();
            CHECK(primary_key_ind < columns.size());
            auto primary_key_column = columns[primary_key_ind];
            std::unique_lock<std::mutex> lock(mtxs_[v_label_id]);
            _add_vertex<KEY_T>()(primary_key_column, indexer_builder);
      for (auto& t : work_threads) {
        t.join();
      }
      work_threads.clear();
      VLOG(10) << "Finished parsing vertex file: " << v_file << " for label "
    }
    size_t total_row_num = 0;
    for (auto& batch : batchs) {
      for (auto& b : batch) {
        total_row_num += b->num_rows();
      }
    }
    if (total_row_num > vtable.row_num()) {
      std::unique_lock<std::mutex> lock(mtxs_[v_label_id]);
      if (total_row_num > vtable.row_num()) {
        LOG(INFO) << "Resize vertex table from " << vtable.row_num() << " to "
        vtable.resize(total_row_num);
      }
    }
    std::atomic<size_t> cur_batch_id(0);
    for (unsigned i = 0; i < std::thread::hardware_concurrency(); ++i) {
      work_threads.emplace_back(
          for (size_t id = 0; id < batchs[idx].size(); ++id) {
            auto batch = batchs[idx][id];
            auto columns = batch->columns();
            auto other_columns_array = columns;
            auto primary_key_column = columns[primary_key_ind];
            size_t row_num = primary_key_column->length();
            std::vector<size_t> vids;
            if constexpr (!std::is_same<std::string_view, KEY_T>::value) {
              using arrow_array_t =
              auto casted_array =
                  std::static_pointer_cast<arrow_array_t>(primary_key_column);
              for (size_t i = 0; i < row_num; ++i) {
                vids.emplace_back(indexer.get_index(casted_array->Value(i)));
              }
            } else {
              if (primary_key_column->type()->Equals(arrow::utf8())) {
                auto casted_array =
                    std::static_pointer_cast<arrow::StringArray>(
                        primary_key_column);
                for (size_t i = 0; i < row_num; ++i) {
                  auto str = casted_array->GetView(i);
                  std::string_view str_view(str.data(), str.size());
                  vids.emplace_back(indexer.get_index(str_view));
                }
              } else if (primary_key_column->type()->Equals(
                             arrow::large_utf8())) {
                auto casted_array =
                    std::static_pointer_cast<arrow::LargeStringArray>(
                        primary_key_column);
                for (size_t i = 0; i < row_num; ++i) {
                  auto str = casted_array->GetView(i);
                  std::string_view str_view(str.data(), str.size());
                  vids.emplace_back(indexer.get_index(str_view));
                }
              }
            }
            other_columns_array.erase(other_columns_array.begin() +
                                      primary_key_ind);
            for (size_t j = 0; j < other_columns_array.size(); ++j) {
              auto array = other_columns_array[j];
                  std::make_shared<arrow::ChunkedArray>(array);
    for (auto& t : work_threads) {
      t.join();
    }
    v_data.resize(indexer.size());
    VLOG(10) << "Finished parsing " << v_files.size()
             << " vertex files for label "
  template <typename SRC_PK_T, typename EDATA_T, typename VECTOR_T>
                     std::shared_ptr<arrow::Array> dst_col,
                     std::shared_ptr<arrow::Array>& property_cols,
                     const PropertyType& edge_property, VECTOR_T& parsed_edges,
                     std::vector<std::atomic<int32_t>>& ie_degree,
                     std::vector<std::atomic<int32_t>>& oe_degree,
    auto dst_col_type = dst_col->type();
    if (dst_col_type->Equals(arrow::int64())) {
      append_edges<SRC_PK_T, int64_t, EDATA_T>(
          src_col, dst_col, src_indexer, dst_indexer, property_cols,
          edge_property, parsed_edges, ie_degree, oe_degree, offset);
    } else if (dst_col_type->Equals(arrow::uint64())) {
      append_edges<SRC_PK_T, uint64_t, EDATA_T>(
          src_col, dst_col, src_indexer, dst_indexer, property_cols,
          edge_property, parsed_edges, ie_degree, oe_degree, offset);
    } else if (dst_col_type->Equals(arrow::int32())) {
      append_edges<SRC_PK_T, int32_t, EDATA_T>(
          src_col, dst_col, src_indexer, dst_indexer, property_cols,
          edge_property, parsed_edges, ie_degree, oe_degree, offset);
    } else if (dst_col_type->Equals(arrow::uint32())) {
      append_edges<SRC_PK_T, uint32_t, EDATA_T>(
          src_col, dst_col, src_indexer, dst_indexer, property_cols,
          edge_property, parsed_edges, ie_degree, oe_degree, offset);
    } else {
      // Fall back to string keys (utf8 / large_utf8).
      append_edges<SRC_PK_T, std::string_view, EDATA_T>(
          src_col, dst_col, src_indexer, dst_indexer, property_cols,
          edge_property, parsed_edges, ie_degree, oe_degree, offset);
    }
  }
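  // Chooses the in-memory tuple layout for parsed edges before delegating to
  // the helper: RecordView edge data is referenced by row offset
  // (vid_t, vid_t, size_t), while other EDATA_T values are stored inline as
  // (vid_t, vid_t, EDATA_T).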
  template <typename EDATA_T>
                              const std::vector<std::string>& e_files,
      std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
    if constexpr (std::is_same_v<EDATA_T, RecordView>) {
          EDATA_T, std::vector<std::tuple<vid_t, vid_t, size_t>>>(
          src_label_id, dst_label_id, e_label_id, e_files, supplier_creator);
          EDATA_T, std::vector<std::tuple<vid_t, vid_t, size_t>>>(
          src_label_id, dst_label_id, e_label_id, e_files, supplier_creator);
          src_label_id, dst_label_id, e_label_id, e_files, supplier_creator);
          EDATA_T, std::vector<std::tuple<vid_t, vid_t, EDATA_T>>>(
          src_label_id, dst_label_id, e_label_id, e_files, supplier_creator);
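  // End-to-end edge loading: validate the primary-key column mapping, size the
  // per-thread edge buffers and the in/out degree counters, stream record
  // batches through a bounded queue, resolve endpoints against the vertex
  // indexers, and finally hand the parsed edges and degree counts off for CSR
  // construction.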
  template <typename EDATA_T, typename VECTOR_T>
                                      const std::vector<std::string>& e_files,
      std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
        src_label_id, dst_label_id, e_label_id);
        src_label_id, dst_label_id, e_label_id);
    if (src_dst_col_pair.first.size() != 1 ||
        src_dst_col_pair.second.size() != 1) {
      LOG(FATAL) << "We currently only support one src primary key and one "
    }
    size_t src_col_ind = src_dst_col_pair.first[0].second;
    size_t dst_col_ind = src_dst_col_pair.second[0].second;
    CHECK(src_col_ind != dst_col_ind);
                         dst_col_ind, src_label_id, dst_label_id, e_label_id);
    std::vector<VECTOR_T> parsed_edges_vec(std::thread::hardware_concurrency());
    if constexpr (std::is_same_v<
    for (unsigned i = 0; i < std::thread::hardware_concurrency(); ++i) {
      parsed_edges_vec[i].open(runtime_dir(work_dir) + "/" + src_label_name +
                               "_" + dst_label_name + "_" + edge_label_name +
      parsed_edges_vec[i].reserve(4096);
    }
    std::vector<std::atomic<int32_t>> ie_degree(dst_indexer.size()),
        oe_degree(src_indexer.size());
    for (size_t idx = 0; idx < ie_degree.size(); ++idx) {
      ie_degree[idx].store(0);
    }
    for (size_t idx = 0; idx < oe_degree.size(); ++idx) {
      oe_degree[idx].store(0);
    }
    VLOG(10) << "src indexer size: " << src_indexer.size()
             << " dst indexer size: " << dst_indexer.size();
    grape::BlockingQueue<std::shared_ptr<arrow::RecordBatch>> queue;
    queue.SetLimit(1024);
    std::vector<std::thread> work_threads;
    std::vector<std::vector<std::shared_ptr<arrow::Array>>> string_columns(
        std::thread::hardware_concurrency());
    if constexpr (std::is_same<EDATA_T, RecordView>::value) {
    std::vector<std::shared_ptr<arrow::Array>> string_cols;
    std::atomic<size_t> offset(0);
    std::shared_mutex rw_mutex;
    for (auto filename : e_files) {
      auto record_batch_supplier_vec =
          supplier_creator(src_label_id, dst_label_id, e_label_id, filename,
      queue.SetProducerNum(record_batch_supplier_vec.size());
      for (size_t i = 0; i < record_batch_supplier_vec.size(); ++i) {
        work_threads.emplace_back(
            auto& string_column = string_columns[idx];
            bool first_batch = true;
            auto& record_batch_supplier = record_batch_supplier_vec[idx];
              auto record_batch = record_batch_supplier->GetNextBatch();
                queue.DecProducerNum();
              auto header = record_batch->schema()->field_names();
                  src_label_id, dst_label_id, e_label_id);
                  src_label_id, dst_label_id, e_label_id);
              CHECK(schema_column_names.size() + 2 == header.size())
                  << "schema size: " << schema_column_names.size()
                  << " does not match header size: " << header.size();
              for (auto i = 0; i < record_batch->num_columns(); ++i) {
                if (record_batch->column(i)->type()->Equals(arrow::utf8()) ||
                    record_batch->column(i)->type()->Equals(
                        arrow::large_utf8())) {
                  string_column.emplace_back(record_batch->column(i));
                }
              }
              queue.Put(record_batch);
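      // Consumer stage: worker threads pop batches off the queue, resolve the
      // source and destination ID columns against the indexers, and append the
      // parsed edges into their thread-local buffer; for RecordView edge data
      // the property columns are first copied into the CSR's table.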
               std::min(
                   static_cast<unsigned>(8 * record_batch_supplier_vec.size()),
                   std::thread::hardware_concurrency());
        work_threads.emplace_back(
            auto& parsed_edges = parsed_edges_vec[idx];
              std::shared_ptr<arrow::RecordBatch> record_batch{nullptr};
              auto ret = queue.Get(record_batch);
                LOG(FATAL) << "Got nullptr batch";
              auto columns = record_batch->columns();
              CHECK(columns.size() >= 2);
              auto src_col = columns[0];
              auto dst_col = columns[1];
              auto src_col_type = src_col->type();
              auto dst_col_type = dst_col->type();
                  << "unsupported src_col type: " << src_col_type->ToString();
                  << "unsupported dst_col type: " << dst_col_type->ToString();
              std::vector<std::shared_ptr<arrow::Array>> property_cols;
              for (size_t i = 2; i < columns.size(); ++i) {
                property_cols.emplace_back(columns[i]);
              }
              if constexpr (std::is_same<EDATA_T, RecordView>::value) {
                CHECK(casted_csr != nullptr);
                auto table = casted_csr->GetTable();
                CHECK(table.col_num() == property_cols.size());
                offset_i = offset.fetch_add(src_col->length());
                std::vector<size_t> offsets;
                for (size_t _i = 0;
                     _i < static_cast<size_t>(src_col->length()); ++_i) {
                  offsets.emplace_back(offset_i + _i);
                }
                size_t row_num = std::max(table.row_num(), 1ul);
                while (row_num < offset_i + src_col->length()) {
                  row_num *= 2;
                }
                if (row_num > table.row_num()) {
                  std::unique_lock<std::shared_mutex> lock(rw_mutex);
                  if (row_num > table.row_num()) {
                    table.resize(row_num);
                  }
                }
                std::shared_lock<std::shared_mutex> lock(rw_mutex);
                for (size_t i = 0; i < table.col_num(); ++i) {
                  auto col = table.get_column_by_id(i);
                      std::make_shared<arrow::ChunkedArray>(
                  src_label_id, dst_label_id, e_label_id);
              CHECK(src_col->length() == dst_col->length());
              if (src_col_type->Equals(arrow::int64())) {
                _append_edges<int64_t, EDATA_T, VECTOR_T>(
                    src_col, dst_col, src_indexer, dst_indexer,
                    property_cols[0], edge_property, parsed_edges, ie_degree,
                    oe_degree, offset_i);
              } else if (src_col_type->Equals(arrow::uint64())) {
                _append_edges<uint64_t, EDATA_T, VECTOR_T>(
                    src_col, dst_col, src_indexer, dst_indexer,
                    property_cols[0], edge_property, parsed_edges, ie_degree,
                    oe_degree, offset_i);
              } else if (src_col_type->Equals(arrow::int32())) {
                _append_edges<int32_t, EDATA_T, VECTOR_T>(
                    src_col, dst_col, src_indexer, dst_indexer,
                    property_cols[0], edge_property, parsed_edges, ie_degree,
                    oe_degree, offset_i);
              } else if (src_col_type->Equals(arrow::uint32())) {
                _append_edges<uint32_t, EDATA_T, VECTOR_T>(
                    src_col, dst_col, src_indexer, dst_indexer,
                    property_cols[0], edge_property, parsed_edges, ie_degree,
                    oe_degree, offset_i);
              } else {
                _append_edges<std::string_view, EDATA_T, VECTOR_T>(
                    src_col, dst_col, src_indexer, dst_indexer,
                    property_cols[0], edge_property, parsed_edges, ie_degree,
                    oe_degree, offset_i);
              }
      for (auto& t : work_threads) {
        t.join();
      }
      VLOG(10) << "Finished parsing edge file: " << filename << " for label "
               << src_label_name << " -> " << dst_label_name << " -> "
    }
    VLOG(10) << "Finished parsing " << e_files.size() << " edge files for label "
             << src_label_name << " -> " << dst_label_name << " -> "
    std::vector<int32_t> ie_deg(ie_degree.size());
    std::vector<int32_t> oe_deg(oe_degree.size());
    for (size_t idx = 0; idx < ie_deg.size(); ++idx) {
      ie_deg[idx] = ie_degree[idx];
    }
    for (size_t idx = 0; idx < oe_deg.size(); ++idx) {
      oe_deg[idx] = oe_degree[idx];
    }
        src_label_id, dst_label_id, e_label_id, parsed_edges_vec, ie_deg,
    string_columns.clear();
    size_t sum = 0;
    for (auto& edges : parsed_edges_vec) {
      sum += edges.size();
    }
    if constexpr (
        std::is_same<VECTOR_T,
                     mmap_vector<std::tuple<vid_t, vid_t, EDATA_T>>>::value ||
        std::is_same<VECTOR_T,
                     mmap_vector<std::tuple<vid_t, vid_t, size_t>>>::value) {
    VLOG(10) << "Finished putting: " << sum << " edges";
#endif  // STORAGES_RT_MUTABLE_GRAPH_LOADER_ABSTRACT_ARROW_FRAGMENT_LOADER_H_