17 #ifndef STORAGES_RT_MUTABLE_GRAPH_LOADER_ABSTRACT_ARROW_FRAGMENT_LOADER_H_
18 #define STORAGES_RT_MUTABLE_GRAPH_LOADER_ABSTRACT_ARROW_FRAGMENT_LOADER_H_
25 #include "grape/utils/concurrent_queue.h"
27 #include <arrow/api.h>
28 #include <arrow/io/api.h>
29 #include <shared_mutex>
30 #include "arrow/util/value_parsing.h"
32 #include "grape/util.h"
// NOTE(review): fragment of a record-batch supplier interface (the enclosing
// class declaration is not visible in this excerpt). Returns the next Arrow
// RecordBatch; consumers below (see the DecProducerNum call after GetNextBatch)
// presumably treat a null batch as end-of-stream -- TODO confirm.
42 virtual std::shared_ptr<arrow::RecordBatch>
GetNextBatch() = 0;
// NOTE(review): fragment of a templated helper that copies values from an
// arrow::ChunkedArray (`array`) into a column (`col`) at row positions taken
// from `offset`. The signature line, the type-alias lines, and the loop body
// past the `offset[cur_ind] >= size` check are missing from this excerpt
// (embedded numbering jumps 62 -> 74), so the exact copy/skip semantics
// cannot be confirmed from here.
48 template <
typename COL_T>
50 const std::vector<size_t>& offset) {
52 auto array_type = array->type();
// Hard-fails on a type mismatch between the Arrow column and the expected
// property type.
54 CHECK(array_type->Equals(arrow_type))
55 <<
"Inconsistent data type, expect " << arrow_type->ToString()
56 <<
", but got " << array_type->ToString();
// Iterates chunk-by-chunk; `cur_ind` (declared on a missing line) appears to
// walk the `offset` vector across chunks -- TODO confirm.
58 for (
auto j = 0; j < array->num_chunks(); ++j) {
59 auto casted = std::static_pointer_cast<arrow_array_type>(array->chunk(j));
60 size_t size = col->
size();
61 for (
auto k = 0; k < casted->length(); ++k) {
// Rows mapped past the end of the column are presumably skipped (matches the
// invalid-vid sentinel used by the vertex indexer below) -- TODO confirm.
62 if (offset[cur_ind] >= size) {
// NOTE(review): parameter tails of several declarations (likely explicit
// specializations or overloads of the column-filling helper above for
// specific value types). The leading declarator lines are missing from this
// excerpt, so the target types cannot be identified from here.
74 std::shared_ptr<arrow::ChunkedArray> array,
75 const std::vector<size_t>& offset);
78 std::shared_ptr<arrow::ChunkedArray> array,
79 const std::vector<size_t>& offset);
83 const std::vector<size_t>& offset);
86 std::shared_ptr<arrow::ChunkedArray> array,
87 const std::vector<size_t>& offset);
// NOTE(review): fragment of a function signature taking (index, name, type?)
// tuples plus source/destination column indices and labels -- presumably a
// validation or mapping helper for edge-file columns; the function name and
// body are not visible in this excerpt.
91 const std::vector<std::tuple<size_t, std::string, std::string>>&
93 size_t src_col_ind,
size_t dst_col_ind,
label_t src_label_i,
// NOTE(review): fragment of a vertex-insertion functor. For each row of the
// primary-key column it inserts the key into `indexer` (declared on a missing
// line together with `vid`) and records the assigned vid in `offset`;
// duplicate keys get the sentinel numeric_limits<size_t>::max(). Several
// lines are missing (numbering jumps 104 -> 108, 117 -> 119, ...), including
// the else-branches pairing the emplace_back calls.
96 template <
typename KEY_T>
99 void operator()(
const std::shared_ptr<arrow::Array>& col,
101 std::vector<size_t>& offset) {
102 size_t row_num = col->length();
// Non-string keys: the column must match the Arrow type expected for KEY_T
// (expected_type comes from a missing line).
104 if constexpr (!std::is_same<std::string_view, KEY_T>::value) {
108 if (!col->type()->Equals(expected_type)) {
109 LOG(FATAL) <<
"Inconsistent data type, expect "
110 << expected_type->ToString() <<
", but got "
111 << col->type()->ToString();
113 auto casted_array = std::static_pointer_cast<arrow_array_t>(col);
114 for (
size_t i = 0; i < row_num; ++i) {
// indexer.add presumably returns false when the key is already present --
// duplicates are logged and marked with the max() sentinel.
115 if (!indexer.
add(casted_array->Value(i), vid)) {
116 VLOG(2) <<
"Duplicate vertex id: " << casted_array->Value(i) <<
"..";
117 offset.emplace_back(std::numeric_limits<size_t>::max());
119 offset.emplace_back(vid);
// String keys: both utf8 and large_utf8 layouts are accepted; GetView avoids
// copying the string data.
123 if (col->type()->Equals(arrow::utf8())) {
124 auto casted_array = std::static_pointer_cast<arrow::StringArray>(col);
125 for (
size_t i = 0; i < row_num; ++i) {
126 auto str = casted_array->GetView(i);
127 std::string_view str_view(str.data(), str.size());
128 if (!indexer.
add(str_view, vid)) {
129 VLOG(2) <<
"Duplicate vertex id: " << str_view <<
"..";
130 offset.emplace_back(std::numeric_limits<size_t>::max());
132 offset.emplace_back(vid);
135 }
else if (col->type()->Equals(arrow::large_utf8())) {
// Same loop as the utf8 branch but for 64-bit-offset strings; the
// casted_array declarator line is missing from this excerpt.
137 std::static_pointer_cast<arrow::LargeStringArray>(col);
138 for (
size_t i = 0; i < row_num; ++i) {
139 auto str = casted_array->GetView(i);
140 std::string_view str_view(str.data(), str.size());
141 if (!indexer.
add(str_view, vid)) {
142 VLOG(2) <<
"Duplicate vertex id: " << str_view <<
"..";
143 offset.emplace_back(std::numeric_limits<size_t>::max());
145 offset.emplace_back(vid);
// Any non-string column reaching the string path is a hard error.
149 LOG(FATAL) <<
"Not support type: " << col->type()->ToString();
// NOTE(review): second operator() overload of the vertex-insertion functor --
// the call site at original line 573 (`_add_vertex<KEY_T>()(primary_key_column,
// indexer_builder)`) matches this signature. Unlike the offset-producing
// overload above, this one only registers keys with a PTIndexerBuilder and
// returns no per-row mapping. Branch-closing lines are missing throughout.
155 void operator()(
const std::shared_ptr<arrow::Array>& col,
156 PTIndexerBuilder<KEY_T, vid_t>& indexer) {
157 size_t row_num = col->length();
158 if constexpr (!std::is_same<std::string_view, KEY_T>::value) {
// expected_type / arrow_array_t come from lines missing in this excerpt
// (numbering jumps 158 -> 162).
162 if (!col->type()->Equals(expected_type)) {
163 LOG(FATAL) <<
"Inconsistent data type, expect "
164 << expected_type->ToString() <<
", but got "
165 << col->type()->ToString();
167 auto casted_array = std::static_pointer_cast<arrow_array_t>(col);
168 for (
size_t i = 0; i < row_num; ++i) {
169 indexer.add_vertex(casted_array->Value(i));
// String keys: accept both utf8 and large_utf8, feeding string_views to the
// builder.
172 if (col->type()->Equals(arrow::utf8())) {
173 auto casted_array = std::static_pointer_cast<arrow::StringArray>(col);
174 for (
size_t i = 0; i < row_num; ++i) {
175 auto str = casted_array->GetView(i);
176 std::string_view str_view(str.data(), str.size());
177 indexer.add_vertex(str_view);
179 }
else if (col->type()->Equals(arrow::large_utf8())) {
// casted_array declarator line missing here as well.
181 std::static_pointer_cast<arrow::LargeStringArray>(col);
182 for (
size_t i = 0; i < row_num; ++i) {
183 auto str = casted_array->GetView(i);
184 std::string_view str_view(str.data(), str.size());
185 indexer.add_vertex(str_view);
188 LOG(FATAL) <<
"Not support type: " << col->type()->ToString();
// NOTE(review): fragment of _append -- resolves each primary key in `col`
// through `indexer` into a vid and stores it into slot <1> (dst) or <0> (src)
// of the pre-sized parsed_edges tuples starting at cur_ind; called from
// append_edges below with is_dst=false/true for the src/dst columns. The
// lookup lines producing `vid`, the if/else pairing of the get<1>/get<0>
// stores, and the degree-increment bodies after `vid != invalid_vid` are all
// missing from this excerpt (numbering jumps 205 -> 208, 212 -> 218, ...).
195 template <
typename PK_T,
typename EDATA_T,
typename VECTOR_T>
196 void _append(
bool is_dst,
size_t cur_ind, std::shared_ptr<arrow::Array> col,
197 const IndexerType& indexer, VECTOR_T& parsed_edges,
198 std::vector<std::atomic<int32_t>>& degree) {
// Sentinel for keys not found in the indexer.
199 static constexpr
auto invalid_vid = std::numeric_limits<vid_t>::max();
200 if constexpr (std::is_same_v<PK_T, std::string_view>) {
201 if (col->type()->Equals(arrow::utf8())) {
202 auto casted = std::static_pointer_cast<arrow::StringArray>(col);
203 for (
auto j = 0; j < casted->length(); ++j) {
204 auto str = casted->GetView(j);
205 std::string_view str_view(str.data(), str.size());
208 std::get<1>(parsed_edges[cur_ind++]) = vid;
210 std::get<0>(parsed_edges[cur_ind++]) = vid;
// Presumably increments degree[vid] for valid vids -- body not visible.
212 if (vid != invalid_vid) {
// large_utf8 variant of the same loop.
218 auto casted = std::static_pointer_cast<arrow::LargeStringArray>(col);
219 for (
auto j = 0; j < casted->length(); ++j) {
220 auto str = casted->GetView(j);
221 std::string_view str_view(str.data(), str.size());
224 std::get<1>(parsed_edges[cur_ind++]) = vid;
226 std::get<0>(parsed_edges[cur_ind++]) = vid;
228 if (vid != invalid_vid) {
// Numeric-key path (arrow_array_type alias comes from a missing line).
235 auto casted = std::static_pointer_cast<arrow_array_type>(col);
236 for (
auto j = 0; j < casted->length(); ++j) {
239 std::get<1>(parsed_edges[cur_ind++]) = vid;
241 std::get<0>(parsed_edges[cur_ind++]) = vid;
243 if (vid != invalid_vid) {
// NOTE(review): fragment of append_edges -- validates src/dst key-column
// types against their indexers, grows parsed_edges, then fills the three
// tuple slots concurrently: one thread for the edge-data slot <2> and one
// thread each for src (slot <0>, oe_degree) and dst (slot <1>, ie_degree)
// via _append. Many lines are missing (indexer-type switch arms, the
// expected edge-data type check, thread-lambda closers).
250 template <
typename SRC_PK_T,
typename DST_PK_T,
typename EDATA_T,
253 std::shared_ptr<arrow::Array> dst_col,
256 std::shared_ptr<arrow::Array>& edata_cols,
258 std::vector<std::atomic<int32_t>>& ie_degree,
259 std::vector<std::atomic<int32_t>>& oe_degree,
261 CHECK(src_col->length() == dst_col->length());
// Checks that a key column's Arrow type matches the indexer's key type; the
// switch/if selecting between these CHECKs is on missing lines.
262 auto indexer_check_lambda = [](
const IndexerType& cur_indexer,
263 const std::shared_ptr<arrow::Array>& cur_col) {
265 CHECK(cur_col->type()->Equals(arrow::int64()));
267 CHECK(cur_col->type()->Equals(arrow::utf8()) ||
268 cur_col->type()->Equals(arrow::large_utf8()));
270 CHECK(cur_col->type()->Equals(arrow::int32()));
272 CHECK(cur_col->type()->Equals(arrow::uint32()));
274 CHECK(cur_col->type()->Equals(arrow::uint64()));
278 indexer_check_lambda(src_indexer, src_col);
279 indexer_check_lambda(dst_indexer, dst_col);
280 auto old_size = parsed_edges.size();
281 parsed_edges.resize(old_size + src_col->length());
// NOTE(review): log message lacks a space after "from" -- cosmetic string
// defect; cannot be altered in a comment-only edit.
282 VLOG(10) <<
"resize parsed_edges from" << old_size <<
" to "
283 << parsed_edges.size();
// Edge-data writer thread: RecordView edges store a running row offset in
// slot <2>; other non-empty EDATA_T copy the property column value.
286 auto edata_col_thread = std::thread([&]() {
287 if constexpr (std::is_same<EDATA_T, RecordView>::value) {
288 size_t cur_ind = old_size;
289 for (
auto j = 0; j < src_col->length(); ++j) {
290 std::get<2>(parsed_edges[cur_ind++]) = offset++;
292 }
else if constexpr (!std::is_same<EDATA_T, grape::EmptyType>::value) {
293 auto edata_col = edata_cols;
294 CHECK(src_col->length() == edata_col->length());
295 size_t cur_ind = old_size;
296 auto type = edata_col->type();
// The expected-type comparison guarding this FATAL is on missing lines.
298 LOG(FATAL) <<
"Inconsistent data type, expect "
300 <<
", but got " << type->ToString();
303 using arrow_array_type =
306 auto data = std::static_pointer_cast<arrow_array_type>(edata_col);
307 for (
auto j = 0; j < edata_col->length(); ++j) {
// String payloads are stored as views into the Arrow buffers (the arrays are
// presumably kept alive elsewhere, cf. string_columns in the edge loader).
308 if constexpr (std::is_same<arrow_array_type,
309 arrow::StringArray>::value ||
310 std::is_same<arrow_array_type,
311 arrow::LargeStringArray>::value) {
312 auto str = data->GetView(j);
313 std::string_view str_view(str.data(), str.size());
314 std::get<2>(parsed_edges[cur_ind++]) = str_view;
316 std::get<2>(parsed_edges[cur_ind++]) = data->Value(j);
319 VLOG(10) <<
"Finish inserting: " << src_col->length() <<
" edges";
// Both vid writers start from the same base index; they touch different
// tuple slots so they do not race with each other.
322 size_t cur_ind = old_size;
323 auto src_col_thread = std::thread([&]() {
324 _append<SRC_PK_T, EDATA_T>(
false, cur_ind, src_col, src_indexer,
325 parsed_edges, oe_degree);
327 auto dst_col_thread = std::thread([&]() {
328 _append<DST_PK_T, EDATA_T>(
true, cur_ind, dst_col, dst_indexer,
329 parsed_edges, ie_degree);
331 src_col_thread.join();
332 dst_col_thread.join();
333 edata_col_thread.join();
// NOTE(review): parameter fragments of two vertex-loading entry points, both
// taking a label id, input file paths, and a factory producing
// IRecordBatchSupplier instances per file. The function names, return types
// and remaining parameters are not visible in this excerpt.
360 label_t v_label_id,
const std::vector<std::string>& input_paths,
361 std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
370 const std::vector<std::string>& input_paths,
371 std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
// NOTE(review): fragment of a helper that inserts one batch of vertices:
// validates that every property column has the same row count as the primary
// key column, resolves vids (resolution lines missing, numbering jumps
// 389 -> 391), then copies each property column into the table under the
// per-label mutex. The function name and several statements are not visible.
377 template <
typename KEY_T>
380 std::shared_ptr<arrow::Array>& primary_key_col,
381 const std::vector<std::shared_ptr<arrow::Array>>& property_cols) {
382 size_t row_num = primary_key_col->length();
383 auto col_num = property_cols.size();
384 for (
size_t i = 0; i < col_num; ++i) {
385 CHECK_EQ(property_cols[i]->length(), row_num);
388 std::vector<size_t> vids;
389 vids.reserve(row_num);
// Serializes indexer/table mutation per vertex label.
391 std::unique_lock<std::mutex> lock(
mtxs_[v_label_id]);
394 for (
size_t j = 0; j < property_cols.size(); ++j) {
395 auto array = property_cols[j];
// Wrapped in a single-chunk ChunkedArray to reuse the chunked copy helper.
396 auto chunked_array = std::make_shared<arrow::ChunkedArray>(array);
399 chunked_array, vids);
402 VLOG(10) <<
"Insert rows: " << row_num;
// NOTE(review): fragment of a producer/consumer vertex loader: per input
// file, producer threads pull RecordBatches from suppliers into a bounded
// BlockingQueue (limit 1024), validating the first batch's header against
// the schema (+1 presumably for the primary-key column -- TODO confirm);
// consumer threads (capped at 8x producers vs hardware_concurrency) pop
// batches, split off the primary-key column, and insert the rest. Loop
// closers, queue.Put, and the consumer-count condition are on missing lines.
406 template <
typename KEY_T>
408 label_t v_label_id,
const std::vector<std::string>& v_files,
409 std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
413 VLOG(10) <<
"Parsing vertex file:" << v_files.size() <<
" for label "
416 auto primary_key_name = std::get<1>(primary_key);
417 size_t primary_key_ind = std::get<2>(primary_key);
420 grape::BlockingQueue<std::shared_ptr<arrow::RecordBatch>> queue;
421 queue.SetLimit(1024);
422 std::vector<std::thread> work_threads;
424 for (
auto& v_file : v_files) {
425 VLOG(10) <<
"Parsing vertex file:" << v_file <<
" for label "
427 auto record_batch_supplier_vec =
429 std::thread::hardware_concurrency());
430 queue.SetProducerNum(record_batch_supplier_vec.size());
// Producer threads: one per supplier.
432 for (
size_t idx = 0; idx < record_batch_supplier_vec.size(); ++idx) {
433 work_threads.emplace_back(
435 auto& record_batch_supplier = record_batch_supplier_vec[i];
436 bool first_batch =
true;
438 auto batch = record_batch_supplier->GetNextBatch();
// Null batch presumably signals this supplier is exhausted.
440 queue.DecProducerNum();
// Header validation happens only for the first batch of each supplier.
444 auto header = batch->schema()->field_names();
445 auto schema_column_names =
447 CHECK(schema_column_names.size() + 1 == header.size())
448 <<
"File header of size: " << header.size()
449 <<
" does not match schema column size: "
450 << schema_column_names.size() + 1;
// Consumer threads.
458 for (
unsigned idx = 0;
460 std::min(
static_cast<unsigned>(8 * record_batch_supplier_vec.size()),
461 std::thread::hardware_concurrency());
463 work_threads.emplace_back(
466 std::shared_ptr<arrow::RecordBatch> batch{
nullptr};
467 auto ret = queue.Get(batch);
472 LOG(FATAL) <<
"get nullptr batch";
474 auto columns = batch->columns();
475 CHECK(primary_key_ind < columns.size());
476 auto primary_key_column = columns[primary_key_ind];
// Remaining columns (primary key removed) become the property payload.
477 auto other_columns_array = columns;
478 other_columns_array.erase(other_columns_array.begin() +
482 other_columns_array);
487 for (
auto& t : work_threads) {
490 work_threads.clear();
491 VLOG(10) <<
"Finish parsing vertex file:" << v_file <<
" for label "
495 VLOG(10) <<
"Finish parsing vertex file:" << v_files.size() <<
" for label "
// NOTE(review): fragment of the two-phase PTIndexer vertex loader. Phase 1:
// producer threads stream batches into the queue; consumers stash each batch
// in per-thread `batchs[i]` and register primary keys with the
// PTIndexerBuilder under the per-label mutex. Phase 2 (after the indexer is
// presumably finalized on missing lines): worker threads replay the stashed
// batches, resolve vids via indexer.get_index, and copy property columns.
// Many closers and the indexer-finalization lines are missing.
503 template <
typename KEY_T>
505 label_t v_label_id,
const std::vector<std::string>& v_files,
506 std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
510 VLOG(10) <<
"Parsing vertex file:" << v_files.size() <<
" for label "
513 auto primary_key_name = std::get<1>(primary_key);
514 size_t primary_key_ind = std::get<2>(primary_key);
515 grape::BlockingQueue<std::shared_ptr<arrow::RecordBatch>> queue;
516 queue.SetLimit(1024);
517 PTIndexerBuilder<KEY_T, vid_t> indexer_builder;
// Batches are retained per consumer thread for the second pass.
518 std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> batchs(
519 std::thread::hardware_concurrency());
520 std::vector<std::thread> work_threads;
521 for (
auto& v_file : v_files) {
522 VLOG(10) <<
"Parsing vertex file:" << v_file <<
" for label "
524 auto record_batch_supplier_vec =
526 std::thread::hardware_concurrency());
527 queue.SetProducerNum(record_batch_supplier_vec.size());
528 for (
size_t idx = 0; idx < record_batch_supplier_vec.size(); ++idx) {
529 work_threads.emplace_back(
531 auto& record_batch_supplier = record_batch_supplier_vec[i];
532 bool first_batch =
true;
534 auto batch = record_batch_supplier->GetNextBatch();
536 queue.DecProducerNum();
540 auto header = batch->schema()->field_names();
541 auto schema_column_names =
543 CHECK(schema_column_names.size() + 1 == header.size())
544 <<
"File header of size: " << header.size()
545 <<
" does not match schema column size: "
546 << schema_column_names.size() + 1;
// Phase-1 consumers: collect batches and feed the indexer builder.
555 for (
unsigned idx = 0; idx < std::thread::hardware_concurrency(); ++idx) {
556 work_threads.emplace_back(
559 std::shared_ptr<arrow::RecordBatch> batch{
nullptr};
560 auto ret = queue.Get(batch);
565 LOG(FATAL) <<
"get nullptr batch";
567 batchs[i].emplace_back(batch);
568 auto columns = batch->columns();
569 CHECK(primary_key_ind < columns.size());
570 auto primary_key_column = columns[primary_key_ind];
// Builder mutation is serialized per label.
572 std::unique_lock<std::mutex> lock(
mtxs_[v_label_id]);
573 _add_vertex<KEY_T>()(primary_key_column, indexer_builder);
579 for (
auto& t : work_threads) {
582 work_threads.clear();
584 VLOG(10) <<
"Finish parsing vertex file:" << v_file <<
" for label "
// Phase 2: replay the stashed batches (cur_batch_id declared here appears
// unused in the visible lines -- TODO confirm against the full file).
590 std::atomic<size_t> cur_batch_id(0);
591 for (
unsigned i = 0; i < std::thread::hardware_concurrency(); ++i) {
592 work_threads.emplace_back(
594 for (
size_t id = 0;
id < batchs[idx].size(); ++id) {
595 auto batch = batchs[idx][id];
596 auto columns = batch->columns();
597 auto other_columns_array = columns;
598 auto primary_key_column = columns[primary_key_ind];
599 size_t row_num = primary_key_column->length();
600 std::vector<size_t> vids;
// Numeric keys resolve directly through the finalized indexer.
601 if constexpr (!std::is_same<std::string_view, KEY_T>::value) {
602 using arrow_array_t =
605 std::static_pointer_cast<arrow_array_t>(primary_key_column);
606 for (
size_t i = 0; i < row_num; ++i) {
607 vids.emplace_back(indexer.get_index(casted_array->Value(i)));
// String keys: utf8 / large_utf8, as in _add_vertex above.
610 if (primary_key_column->type()->Equals(arrow::utf8())) {
612 std::static_pointer_cast<arrow::StringArray>(
614 for (
size_t i = 0; i < row_num; ++i) {
615 auto str = casted_array->GetView(i);
616 std::string_view str_view(str.data(), str.size());
617 vids.emplace_back(indexer.get_index(str_view));
619 }
else if (primary_key_column->type()->Equals(
620 arrow::large_utf8())) {
622 std::static_pointer_cast<arrow::LargeStringArray>(
624 for (
size_t i = 0; i < row_num; ++i) {
625 auto str = casted_array->GetView(i);
626 std::string_view str_view(str.data(), str.size());
627 vids.emplace_back(indexer.get_index(str_view));
// Drop the primary-key column; copy the remaining property columns.
631 other_columns_array.erase(other_columns_array.begin() +
634 for (
size_t j = 0; j < other_columns_array.size(); ++j) {
635 auto array = other_columns_array[j];
637 std::make_shared<arrow::ChunkedArray>(array);
641 chunked_array, vids);
647 for (
auto& t : work_threads) {
// Table is sized to the final vertex count from the indexer.
654 v_data.resize(indexer.size());
658 VLOG(10) <<
"Finish parsing vertex file:" << v_files.size() <<
" for label "
// NOTE(review): fragment of _append_edges -- dispatches on the dst key
// column's Arrow type (int64/uint64/int32/uint32, else string_view) to the
// fully-typed append_edges<SRC_PK_T, DST_PK_T, EDATA_T>. The signature's
// leading lines (function name, src_col, indexer parameters) and the branch
// closers are missing from this excerpt; the final call is presumably the
// trailing else branch.
663 template <
typename SRC_PK_T,
typename EDATA_T,
typename VECTOR_T>
665 std::shared_ptr<arrow::Array> dst_col,
668 std::shared_ptr<arrow::Array>& property_cols,
669 const PropertyType& edge_property, VECTOR_T& parsed_edges,
670 std::vector<std::atomic<int32_t>>& ie_degree,
671 std::vector<std::atomic<int32_t>>& oe_degree,
673 auto dst_col_type = dst_col->type();
674 if (dst_col_type->Equals(arrow::int64())) {
675 append_edges<SRC_PK_T, int64_t, EDATA_T>(
676 src_col, dst_col, src_indexer, dst_indexer, property_cols,
677 edge_property, parsed_edges, ie_degree, oe_degree, offset);
678 }
else if (dst_col_type->Equals(arrow::uint64())) {
679 append_edges<SRC_PK_T, uint64_t, EDATA_T>(
680 src_col, dst_col, src_indexer, dst_indexer, property_cols,
681 edge_property, parsed_edges, ie_degree, oe_degree, offset);
682 }
else if (dst_col_type->Equals(arrow::int32())) {
683 append_edges<SRC_PK_T, int32_t, EDATA_T>(
684 src_col, dst_col, src_indexer, dst_indexer, property_cols,
685 edge_property, parsed_edges, ie_degree, oe_degree, offset);
686 }
else if (dst_col_type->Equals(arrow::uint32())) {
687 append_edges<SRC_PK_T, uint32_t, EDATA_T>(
688 src_col, dst_col, src_indexer, dst_indexer, property_cols,
689 edge_property, parsed_edges, ie_degree, oe_degree, offset);
692 append_edges<SRC_PK_T, std::string_view, EDATA_T>(
693 src_col, dst_col, src_indexer, dst_indexer, property_cols,
694 edge_property, parsed_edges, ie_degree, oe_degree, offset);
// NOTE(review): fragment of an edge-loading entry point that chooses the
// parsed-edge tuple layout by EDATA_T: RecordView edges carry a size_t row
// offset in the third slot, while ordinary property edges carry EDATA_T
// itself. The callee names and the conditions separating the four visible
// call sites (presumably a VECTOR_T choice, e.g. in-memory vs mmap-backed)
// are on missing lines.
697 template <
typename EDATA_T>
700 const std::vector<std::string>& e_files,
701 std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
705 if constexpr (std::is_same_v<EDATA_T, RecordView>) {
708 EDATA_T, std::vector<std::tuple<vid_t, vid_t, size_t>>>(
709 src_label_id, dst_label_id, e_label_id, e_files, supplier_creator);
712 EDATA_T, std::vector<std::tuple<vid_t, vid_t, size_t>>>(
713 src_label_id, dst_label_id, e_label_id, e_files, supplier_creator);
719 src_label_id, dst_label_id, e_label_id, e_files, supplier_creator);
722 EDATA_T, std::vector<std::tuple<vid_t, vid_t, EDATA_T>>>(
723 src_label_id, dst_label_id, e_label_id, e_files, supplier_creator);
// NOTE(review): fragment of the main edge-file loading helper. Visible flow:
// resolve the single src/dst primary-key column indices from the schema,
// prepare per-thread parsed_edges vectors (mmap-backed variants are opened
// under the runtime dir), zero the in/out degree counters, then per file run
// producer threads (validating headers with +2 for the src/dst columns and
// retaining string-typed columns so string_views into them stay valid) and
// consumer threads that resolve vids and edge data via _append_edges; for
// RecordView edges, rows are reserved via an atomic offset and the shared
// table is grown under a writer lock (double-checked under rw_mutex) and
// filled under a reader lock. Finally degrees are snapshotted into plain
// int32 vectors and the parsed edges are handed to the CSR builder. Numerous
// lines (branch closers, type checks, queue.Put, mmap cleanup) are missing.
728 template <
typename EDATA_T,
typename VECTOR_T>
731 const std::vector<std::string>& e_files,
732 std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
740 src_label_id, dst_label_id, e_label_id);
742 src_label_id, dst_label_id, e_label_id);
// Only single-column primary keys are supported on each endpoint.
743 if (src_dst_col_pair.first.size() != 1 ||
744 src_dst_col_pair.second.size() != 1) {
745 LOG(FATAL) <<
"We currently only support one src primary key and one "
748 size_t src_col_ind = src_dst_col_pair.first[0].second;
749 size_t dst_col_ind = src_dst_col_pair.second[0].second;
750 CHECK(src_col_ind != dst_col_ind);
753 dst_col_ind, src_label_id, dst_label_id, e_label_id);
// One edge buffer per consumer thread to avoid contention.
757 std::vector<VECTOR_T> parsed_edges_vec(std::thread::hardware_concurrency());
758 if constexpr (std::is_same_v<
765 for (
unsigned i = 0; i < std::thread::hardware_concurrency(); ++i) {
766 parsed_edges_vec[i].open(
runtime_dir(work_dir) +
"/" + src_label_name +
767 "_" + dst_label_name +
"_" + edge_label_name +
769 parsed_edges_vec[i].reserve(4096);
772 std::vector<std::atomic<int32_t>> ie_degree(dst_indexer.size()),
773 oe_degree(src_indexer.size());
774 for (
size_t idx = 0; idx < ie_degree.size(); ++idx) {
775 ie_degree[idx].store(0);
777 for (
size_t idx = 0; idx < oe_degree.size(); ++idx) {
778 oe_degree[idx].store(0);
780 VLOG(10) <<
"src indexer size: " << src_indexer.size()
781 <<
" dst indexer size: " << dst_indexer.size();
783 grape::BlockingQueue<std::shared_ptr<arrow::RecordBatch>> queue;
784 queue.SetLimit(1024);
785 std::vector<std::thread> work_threads;
// Keeps string arrays alive because parsed edges may hold string_views into
// their buffers (see append_edges above).
787 std::vector<std::vector<std::shared_ptr<arrow::Array>>> string_columns(
788 std::thread::hardware_concurrency());
790 if constexpr (std::is_same<EDATA_T, RecordView>::value) {
797 std::vector<std::shared_ptr<arrow::Array>> string_cols;
// Global row offset for RecordView edge data.
798 std::atomic<size_t> offset(0);
799 std::shared_mutex rw_mutex;
800 for (
auto filename : e_files) {
801 auto record_batch_supplier_vec =
802 supplier_creator(src_label_id, dst_label_id, e_label_id, filename,
805 queue.SetProducerNum(record_batch_supplier_vec.size());
// Producers: validate first batch header, retain string columns, enqueue.
807 for (
size_t i = 0; i < record_batch_supplier_vec.size(); ++i) {
808 work_threads.emplace_back(
810 auto& string_column = string_columns[idx];
811 bool first_batch =
true;
812 auto& record_batch_supplier = record_batch_supplier_vec[idx];
814 auto record_batch = record_batch_supplier->GetNextBatch();
816 queue.DecProducerNum();
820 auto header = record_batch->schema()->field_names();
822 src_label_id, dst_label_id, e_label_id);
824 src_label_id, dst_label_id, e_label_id);
// +2 accounts for the src and dst key columns preceding the properties.
825 CHECK(schema_column_names.size() + 2 == header.size())
826 <<
"schema size: " << schema_column_names.size()
827 <<
" neq header size: " << header.size();
830 for (
auto i = 0; i < record_batch->num_columns(); ++i) {
831 if (record_batch->column(i)->type()->Equals(arrow::utf8()) ||
832 record_batch->column(i)->type()->Equals(
833 arrow::large_utf8())) {
834 string_column.emplace_back(record_batch->column(i));
838 queue.Put(record_batch);
// Consumers, capped like the vertex path.
845 std::min(
static_cast<unsigned>(8 * record_batch_supplier_vec.size()),
846 std::thread::hardware_concurrency());
848 work_threads.emplace_back(
851 auto& parsed_edges = parsed_edges_vec[idx];
853 std::shared_ptr<arrow::RecordBatch> record_batch{
nullptr};
854 auto ret = queue.Get(record_batch);
859 LOG(FATAL) <<
"get nullptr batch";
861 auto columns = record_batch->columns();
// Layout contract: column 0 = src key, column 1 = dst key, rest properties.
864 CHECK(columns.size() >= 2);
865 auto src_col = columns[0];
866 auto dst_col = columns[1];
867 auto src_col_type = src_col->type();
868 auto dst_col_type = dst_col->type();
870 <<
"unsupported src_col type: " << src_col_type->ToString();
872 <<
"unsupported dst_col type: " << dst_col_type->ToString();
874 std::vector<std::shared_ptr<arrow::Array>> property_cols;
875 for (
size_t i = 2; i < columns.size(); ++i) {
876 property_cols.emplace_back(columns[i]);
// RecordView path: reserve a contiguous run of table rows for this batch,
// grow the table under the writer lock, then fill columns under the reader
// lock (growth check is double-checked to avoid redundant resizes).
879 if constexpr (std::is_same<EDATA_T, RecordView>::value) {
883 CHECK(casted_csr != NULL);
884 auto table = casted_csr->GetTable();
885 CHECK(table.col_num() == property_cols.size());
886 offset_i = offset.fetch_add(src_col->length());
887 std::vector<size_t> offsets;
889 _i < static_cast<size_t>(src_col->length()); ++_i) {
890 offsets.emplace_back(offset_i + _i);
// Growth policy (the doubling step is on a missing line) -- TODO confirm.
892 size_t row_num = std::max(table.row_num(), 1ul);
894 while (row_num < offset_i + src_col->length()) {
897 if (row_num > table.row_num()) {
898 std::unique_lock<std::shared_mutex> lock(rw_mutex);
899 if (row_num > table.row_num()) {
900 table.resize(row_num);
905 std::shared_lock<std::shared_mutex> lock(rw_mutex);
906 for (
size_t i = 0; i < table.col_num(); ++i) {
907 auto col = table.get_column_by_id(i);
909 std::make_shared<arrow::ChunkedArray>(
916 src_label_id, dst_label_id, e_label_id);
918 CHECK(src_col->length() == dst_col->length());
// Dispatch on the src key type; _append_edges dispatches on dst in turn.
919 if (src_col_type->Equals(arrow::int64())) {
920 _append_edges<int64_t, EDATA_T, VECTOR_T>(
921 src_col, dst_col, src_indexer, dst_indexer,
922 property_cols[0], edge_property, parsed_edges, ie_degree,
923 oe_degree, offset_i);
924 }
else if (src_col_type->Equals(arrow::uint64())) {
925 _append_edges<uint64_t, EDATA_T, VECTOR_T>(
926 src_col, dst_col, src_indexer, dst_indexer,
927 property_cols[0], edge_property, parsed_edges, ie_degree,
928 oe_degree, offset_i);
929 }
else if (src_col_type->Equals(arrow::int32())) {
930 _append_edges<int32_t, EDATA_T, VECTOR_T>(
931 src_col, dst_col, src_indexer, dst_indexer,
932 property_cols[0], edge_property, parsed_edges, ie_degree,
933 oe_degree, offset_i);
934 }
else if (src_col_type->Equals(arrow::uint32())) {
935 _append_edges<uint32_t, EDATA_T, VECTOR_T>(
936 src_col, dst_col, src_indexer, dst_indexer,
937 property_cols[0], edge_property, parsed_edges, ie_degree,
938 oe_degree, offset_i);
941 _append_edges<std::string_view, EDATA_T, VECTOR_T>(
942 src_col, dst_col, src_indexer, dst_indexer,
943 property_cols[0], edge_property, parsed_edges, ie_degree,
944 oe_degree, offset_i);
951 for (
auto& t : work_threads) {
954 VLOG(10) <<
"Finish parsing edge file:" << filename <<
" for label "
955 << src_label_name <<
" -> " << dst_label_name <<
" -> "
958 VLOG(10) <<
"Finish parsing edge file:" << e_files.size() <<
" for label "
959 << src_label_name <<
" -> " << dst_label_name <<
" -> "
// Snapshot atomic degrees into plain vectors for the CSR builder.
961 std::vector<int32_t> ie_deg(ie_degree.size());
962 std::vector<int32_t> oe_deg(oe_degree.size());
963 for (
size_t idx = 0; idx < ie_deg.size(); ++idx) {
964 ie_deg[idx] = ie_degree[idx];
966 for (
size_t idx = 0; idx < oe_deg.size(); ++idx) {
967 oe_deg[idx] = oe_degree[idx];
971 src_label_id, dst_label_id, e_label_id, parsed_edges_vec, ie_deg,
// String arrays may be released only after the edges have been consumed.
974 string_columns.clear();
976 for (
auto& edges : parsed_edges_vec) {
// mmap-backed buffers presumably need explicit cleanup (body missing).
979 std::is_same<VECTOR_T,
980 mmap_vector<std::tuple<vid_t, vid_t, EDATA_T>>>::value ||
981 std::is_same<VECTOR_T,
982 mmap_vector<std::tuple<vid_t, vid_t, size_t>>>::value) {
987 VLOG(10) <<
"Finish putting: " << sum <<
" edges";
1002 #endif // STORAGES_RT_MUTABLE_GRAPH_LOADER_ABSTRACT_ARROW_FRAGMENT_LOADER_H_