16 #ifndef STORAGES_RT_MUTABLE_GRAPH_LOADER_BASIC_FRAGMENT_LOADER_H_
17 #define STORAGES_RT_MUTABLE_GRAPH_LOADER_BASIC_FRAGMENT_LOADER_H_
// Function template parameterised on the edge data type EDATA_T.
// NOTE(review): the signature and the preceding branches are not visible in
// this excerpt. The only statement shown is the fallback below, which uses a
// glog-style LOG(FATAL) to report an unsupported edge strategy / edge data
// type combination.
25 template <
typename EDATA_T>
35 LOG(FATAL) <<
"not support edge strategy or edge data type";
// Take the column pointers of `table`, then hand `prop` to the column selected
// by `col_ind` via set_any(vid, prop).
// NOTE(review): the enclosing signature is outside this excerpt; presumably
// `vid` identifies the row being written — confirm.
62 auto dst_columns = table.column_ptrs();
// Reject an out-of-range column index before it is used for indexing.
63 CHECK(col_ind < dst_columns.size());
64 dst_columns[col_ind]->set_any(vid, prop);
// Function template parameterised on the key type KEY_T.
// Visible steps: declare `filename`, read the first tuple element of the first
// entry of `primary_keys` into `type`, then call
// build_lf_indexer<KEY_T, vid_t>.
// NOTE(review): the signature, the initialiser of `filename` and the arguments
// of build_lf_indexer are outside this excerpt; presumably this finalises the
// indexer of one label — confirm against the full file.
67 template <
typename KEY_T>
71 std::string filename =
74 auto type = std::get<0>(primary_keys[0]);
// Instantiated with KEY_T as the first and vid_t as the second template argument.
76 build_lf_indexer<KEY_T, vid_t>(
// Function template parameterised on the key type KEY_T that takes a
// PTIndexerBuilder<KEY_T, vid_t> by non-const reference and calls finish() on
// it. The first argument passed to finish() is
// PTIndexer<vid_t>::prefix() + "_" + filename.
// NOTE(review): the function name, the leading parameters, the initialiser of
// `filename` and the remaining finish() arguments are outside this excerpt.
89 template <
typename KEY_T>
91 PTIndexerBuilder<KEY_T, vid_t>& indexer_builder) {
93 std::string filename =
95 indexer_builder.finish(PTIndexer<vid_t>::prefix() +
"_" + filename,
// Function template parameterised on the edge data type EDATA_T.
// Visible behaviour: verifies that the ie_/oe_ slots at `index` are still
// empty, special-cases std::string_view edge data (reading a max_length from a
// varchar property), and passes oe/ie/edata prefixes built from the
// (src, dst, edge) label names to calls whose heads are not shown.
// NOTE(review): the signature, the computation of `index`, and the statements
// that receive the argument lists below are outside this excerpt.
107 template <
typename EDATA_T>
// The slot for this label triple must not have been populated yet.
112 CHECK(
ie_[index] == NULL);
113 CHECK(
oe_[index] == NULL);
118 src_label_name, dst_label_name, edge_label_name);
120 src_label_name, dst_label_name, edge_label_name);
// String-valued edge data takes its own compile-time branch.
121 if constexpr (std::is_same_v<EDATA_T, std::string_view>) {
// A varchar property supplies max_length through additional_type_info.
126 if (prop[0].IsVarchar()) {
127 max_length = prop[0].additional_type_info.max_length;
133 src_label_name, dst_label_name, edge_label_name);
135 src_label_name, dst_label_name, edge_label_name);
137 oe_mutable, ie_mutable);
142 oe_prefix(src_label_name, dst_label_name, edge_label_name),
143 ie_prefix(src_label_name, dst_label_name, edge_label_name),
144 edata_prefix(src_label_name, dst_label_name, edge_label_name),
// Function template parameterised on EDATA_T that returns `casted_dual_csr`
// after checking that it is not null.
// NOTE(review): the signature and the cast expressions that produce
// `casted_dual_csr` are outside this excerpt; presumably this is the
// get_casted_dual_csr helper called elsewhere in the file — confirm.
148 template <
typename EDATA_T>
// RecordView edge data is handled in its own compile-time branch.
150 if constexpr (std::is_same_v<EDATA_T, RecordView>) {
// A null result is treated as a fatal error instead of being returned.
152 CHECK(casted_dual_csr != NULL);
153 return casted_dual_csr;
// Same check-and-return on the second path (presumably the non-RecordView
// case; the branch keyword itself is not visible here).
156 CHECK(casted_dual_csr != NULL);
157 return casted_dual_csr;
// Inserts the edge batches in `edges_vec` into a dual CSR, starting one
// std::thread per entry of `edges_vec`.
//
// Template parameters:
//   EDATA_T  - edge data type; std::string_view has its own branch below.
//   VECTOR_T - container of edges; each edge is read with std::get<0>,
//              std::get<1> and std::get<2> (presumably source vertex,
//              destination vertex and edge data — confirm).
// Visible parameters (the head of the parameter list is outside this excerpt):
//   edge_label_id    - label id of the edge type.
//   edges_vec        - one edge container per worker thread.
//   ie_degree        - must have dst_indexer.size() entries.
//   oe_degree        - must have src_indexer.size() entries.
//   build_csr_in_mem - when true the CSR is initialised through
//                      BatchInitInMemory.
// NOTE(review): several statements of this function (lambda headers, the body
// of the loop over work_threads, the heads of the calls that take the
// oe/ie/edata prefixes, the condition guarding SortByEdgeData) are not visible
// in this excerpt; the comments below describe only what is shown.
160 template <
typename EDATA_T,
typename VECTOR_T>
162 label_t edge_label_id,
const std::vector<VECTOR_T>& edges_vec,
163 const std::vector<int32_t>& ie_degree,
164 const std::vector<int32_t>& oe_degree,
bool build_csr_in_mem) {
// Both the generic CSR handle and its EDATA_T-typed view must be non-null.
168 CHECK(dual_csr != NULL);
169 auto casted_dual_csr = get_casted_dual_csr<EDATA_T>(dual_csr);
170 CHECK(casted_dual_csr != NULL);
// The largest vid_t value is the sentinel compared against edge endpoints
// below. edge_count is atomic because every worker thread adds to it.
177 auto INVALID_VID = std::numeric_limits<vid_t>::max();
178 std::atomic<size_t> edge_count(0);
// Compile-time branch for string-valued edge data.
179 if constexpr (std::is_same_v<EDATA_T, std::string_view>) {
// The degree tables must line up with the vertex indexers.
180 CHECK(ie_degree.size() == dst_indexer.size());
181 CHECK(oe_degree.size() == src_indexer.size());
182 if (build_csr_in_mem) {
183 dual_csr->BatchInitInMemory(
184 edata_prefix(src_label_name, dst_label_name, edge_label_name),
188 oe_prefix(src_label_name, dst_label_name, edge_label_name),
189 ie_prefix(src_label_name, dst_label_name, edge_label_name),
190 edata_prefix(src_label_name, dst_label_name, edge_label_name),
// One worker thread is created per entry of edges_vec.
193 std::vector<std::thread> work_threads;
194 for (
size_t i = 0; i < edges_vec.size(); ++i) {
195 work_threads.emplace_back(
// The full batch size is added before the loop, so edges that hit the
// invalid-endpoint check below are still included in edge_count.
197 edge_count.fetch_add(edges_vec[idx].size());
198 for (
auto& edge : edges_vec[idx]) {
// Edges with an INVALID_VID endpoint are reported at verbosity 10. The message
// says they are skipped; the skipping statement itself is not visible here.
199 if (std::get<1>(edge) == INVALID_VID ||
200 std::get<0>(edge) == INVALID_VID) {
201 VLOG(10) <<
"Skip invalid edge:" << std::get<0>(edge) <<
"->"
202 << std::get<1>(edge);
205 casted_dual_csr->BatchPutEdge(
206 std::get<0>(edge), std::get<1>(edge), std::get<2>(edge));
// Visit every worker thread (loop body not shown; presumably joins — confirm).
211 for (
auto& t : work_threads) {
// NOTE(review): the guarding condition and the meaning of the argument 1 are
// not visible in this excerpt.
219 dual_csr->SortByEdgeData(1);
222 oe_prefix(src_label_name, dst_label_name, edge_label_name),
223 ie_prefix(src_label_name, dst_label_name, edge_label_name),
224 edata_prefix(src_label_name, dst_label_name, edge_label_name),
// From here the visible statements repeat the sequence above (presumably the
// branch for every other EDATA_T; the branch keyword is not visible).
227 CHECK(ie_degree.size() == dst_indexer.size());
228 CHECK(oe_degree.size() == src_indexer.size());
230 if (build_csr_in_mem) {
231 dual_csr->BatchInitInMemory(
232 edata_prefix(src_label_name, dst_label_name, edge_label_name),
236 oe_prefix(src_label_name, dst_label_name, edge_label_name),
237 ie_prefix(src_label_name, dst_label_name, edge_label_name),
238 edata_prefix(src_label_name, dst_label_name, edge_label_name),
// One worker thread is created per entry of edges_vec.
242 std::vector<std::thread> work_threads;
243 for (
size_t i = 0; i < edges_vec.size(); ++i) {
244 work_threads.emplace_back(
// As above: the full batch size is counted, including edges that hit the
// invalid-endpoint check.
246 edge_count.fetch_add(edges_vec[idx].size());
247 for (
auto& edge : edges_vec[idx]) {
248 if (std::get<1>(edge) == INVALID_VID ||
249 std::get<0>(edge) == INVALID_VID) {
250 VLOG(10) <<
"Skip invalid edge:" << std::get<0>(edge) <<
"->"
251 << std::get<1>(edge);
254 casted_dual_csr->BatchPutEdge(
255 std::get<0>(edge), std::get<1>(edge), std::get<2>(edge));
260 for (
auto& t : work_threads) {
267 dual_csr->SortByEdgeData(1);
271 oe_prefix(src_label_name, dst_label_name, edge_label_name),
272 ie_prefix(src_label_name, dst_label_name, edge_label_name),
273 edata_prefix(src_label_name, dst_label_name, edge_label_name),
// Log the accumulated batch size at verbosity 10.
278 VLOG(10) <<
"Finish adding edge batch of size: " << edge_count.load();
307 const std::string& dst_label_name,
308 const std::string& edge_label_name,
325 #endif // STORAGES_RT_MUTABLE_GRAPH_LOADER_BASIC_FRAGMENT_LOADER_H_