16 #ifndef STORAGES_RT_MUTABLE_GRAPH_LOADER_BASIC_FRAGMENT_LOADER_H_
17 #define STORAGES_RT_MUTABLE_GRAPH_LOADER_BASIC_FRAGMENT_LOADER_H_
25 template <
typename EDATA_T>
35 LOG(FATAL) <<
"not support edge strategy or edge data type";
61 const std::vector<std::vector<Any>>& props);
66 auto dst_columns = table.column_ptrs();
67 CHECK(col_ind < dst_columns.size());
68 dst_columns[col_ind]->set_any(vid, prop);
// Function template parameterized on the key type KEY_T. Only part of the
// body is visible here; the comments below describe the visible statements.
71 template <
typename KEY_T>
75 std::string filename =
// Takes element 0 of the first entry in primary_keys.
// NOTE(review): presumably this is the primary key's property type - confirm.
78 auto type = std::get<0>(primary_keys[0]);
// Invokes build_lf_indexer instantiated with KEY_T keys and vid_t values; the
// argument list is outside the visible lines.
80 build_lf_indexer<KEY_T, vid_t>(
// Function template parameterized on the key type KEY_T whose last visible
// parameter is a mutable reference to a PTIndexerBuilder<KEY_T, vid_t>.
93 template <
typename KEY_T>
95 PTIndexerBuilder<KEY_T, vid_t>& indexer_builder) {
97 std::string filename =
// Finishes the builder; the first argument is the string
// PTIndexer<vid_t>::prefix() + "_" + filename. The remaining arguments are
// outside the visible lines.
99 indexer_builder.finish(PTIndexer<vid_t>::prefix() +
"_" + filename,
// Function template parameterized on the edge data type EDATA_T. Only part of
// the body is visible; the comments below describe the visible statements.
111 template <
typename EDATA_T>
// Both the incoming and outgoing slots at `index` must still be empty.
116 CHECK(
ie_[index] == NULL);
117 CHECK(
oe_[index] == NULL);
122 src_label_name, dst_label_name, edge_label_name);
124 src_label_name, dst_label_name, edge_label_name);
// String-valued edge data is handled in a compile-time branch of its own.
125 if constexpr (std::is_same_v<EDATA_T, std::string_view>) {
// When the first edge property is a varchar, max_length is taken from that
// property's additional type info.
130 if (prop[0].IsVarchar()) {
131 max_length = prop[0].additional_type_info.max_length;
137 src_label_name, dst_label_name, edge_label_name);
139 src_label_name, dst_label_name, edge_label_name);
141 oe_mutable, ie_mutable);
// Arguments derived from the (src, dst, edge) label-name triple via
// oe_prefix / ie_prefix / edata_prefix; the receiving call is not visible.
146 oe_prefix(src_label_name, dst_label_name, edge_label_name),
147 ie_prefix(src_label_name, dst_label_name, edge_label_name),
148 edata_prefix(src_label_name, dst_label_name, edge_label_name),
// Function template parameterized on the edge data type EDATA_T.
// It selects a compile-time branch on whether EDATA_T is RecordView; in each
// visible branch the value casted_dual_csr is checked to be non-null and then
// returned. The statements that produce casted_dual_csr are not visible here.
152 template <
typename EDATA_T>
154 if constexpr (std::is_same_v<EDATA_T, RecordView>) {
156 CHECK(casted_dual_csr != NULL);
157 return casted_dual_csr;
160 CHECK(casted_dual_csr != NULL);
161 return casted_dual_csr;
// Function template parameterized on the edge data type EDATA_T and on
// VECTOR_T, the container type of each element of edges_vec.
//
// Visible parameters:
//   edge_label_id     - label id of the edge type.
//   edges_vec         - one VECTOR_T per entry; each entry is handed to its
//                       own entry in work_threads (see the loops below).
//   ie_degree         - must have as many entries as dst_indexer.size().
//   oe_degree         - must have as many entries as src_indexer.size().
//   build_csr_in_mem  - when true, dual_csr->BatchInitInMemory is called.
//
// Several lines of the body are outside the visible view; the comments
// describe only what the visible statements establish.
164 template <
typename EDATA_T,
typename VECTOR_T>
166 label_t edge_label_id,
const std::vector<VECTOR_T>& edges_vec,
167 const std::vector<int32_t>& ie_degree,
168 const std::vector<int32_t>& oe_degree,
bool build_csr_in_mem) {
// The CSR must exist, and so must its EDATA_T-specific cast.
172 CHECK(dual_csr != NULL);
173 auto casted_dual_csr = get_casted_dual_csr<EDATA_T>(dual_csr);
174 CHECK(casted_dual_csr != NULL);
// Sentinel vertex id: the largest representable vid_t.
181 auto INVALID_VID = std::numeric_limits<vid_t>::max();
// Atomic counter updated via fetch_add in the loops below; reported in the
// final VLOG.
182 std::atomic<size_t> edge_count(0);
// Compile-time branch for string-valued edge data.
183 if constexpr (std::is_same_v<EDATA_T, std::string_view>) {
184 CHECK(ie_degree.size() == dst_indexer.size());
185 CHECK(oe_degree.size() == src_indexer.size());
186 if (build_csr_in_mem) {
187 dual_csr->BatchInitInMemory(
188 edata_prefix(src_label_name, dst_label_name, edge_label_name),
// NOTE(review): these prefixes presumably feed the non-in-memory
// initialization call, which is not visible here - confirm.
192 oe_prefix(src_label_name, dst_label_name, edge_label_name),
193 ie_prefix(src_label_name, dst_label_name, edge_label_name),
194 edata_prefix(src_label_name, dst_label_name, edge_label_name),
// One work_threads entry is created per element of edges_vec.
197 std::vector<std::thread> work_threads;
198 for (
size_t i = 0; i < edges_vec.size(); ++i) {
199 work_threads.emplace_back(
// NOTE(review): the whole chunk size is added before the per-edge validity
// check below, so the total appears to include skipped edges - confirm.
201 edge_count.fetch_add(edges_vec[idx].size());
202 for (
auto& edge : edges_vec[idx]) {
// Tuple element 0 or 1 equal to INVALID_VID marks the edge as invalid; it is
// logged at verbosity 10.
203 if (std::get<1>(edge) == INVALID_VID ||
204 std::get<0>(edge) == INVALID_VID) {
205 VLOG(10) <<
"Skip invalid edge:" << std::get<0>(edge) <<
"->"
206 << std::get<1>(edge);
// Tuple elements 0, 1 and 2 of the edge are passed to BatchPutEdge in order.
209 casted_dual_csr->BatchPutEdge(
210 std::get<0>(edge), std::get<1>(edge), std::get<2>(edge));
// NOTE(review): presumably joins every worker; the loop body is not visible.
215 for (
auto& t : work_threads) {
// NOTE(review): the condition guarding this sort is not visible - confirm.
223 dual_csr->SortByEdgeData(1);
226 oe_prefix(src_label_name, dst_label_name, edge_label_name),
227 ie_prefix(src_label_name, dst_label_name, edge_label_name),
228 edata_prefix(src_label_name, dst_label_name, edge_label_name),
// From here on, the visible statements repeat those of the std::string_view
// branch above.
// NOTE(review): presumably this is the branch for all other EDATA_T; the
// duplication looks like a candidate for a shared helper - confirm against
// the full file.
231 CHECK(ie_degree.size() == dst_indexer.size());
232 CHECK(oe_degree.size() == src_indexer.size());
234 if (build_csr_in_mem) {
235 dual_csr->BatchInitInMemory(
236 edata_prefix(src_label_name, dst_label_name, edge_label_name),
240 oe_prefix(src_label_name, dst_label_name, edge_label_name),
241 ie_prefix(src_label_name, dst_label_name, edge_label_name),
242 edata_prefix(src_label_name, dst_label_name, edge_label_name),
246 std::vector<std::thread> work_threads;
247 for (
size_t i = 0; i < edges_vec.size(); ++i) {
248 work_threads.emplace_back(
250 edge_count.fetch_add(edges_vec[idx].size());
251 for (
auto& edge : edges_vec[idx]) {
252 if (std::get<1>(edge) == INVALID_VID ||
253 std::get<0>(edge) == INVALID_VID) {
254 VLOG(10) <<
"Skip invalid edge:" << std::get<0>(edge) <<
"->"
255 << std::get<1>(edge);
258 casted_dual_csr->BatchPutEdge(
259 std::get<0>(edge), std::get<1>(edge), std::get<2>(edge));
264 for (
auto& t : work_threads) {
271 dual_csr->SortByEdgeData(1);
275 oe_prefix(src_label_name, dst_label_name, edge_label_name),
276 ie_prefix(src_label_name, dst_label_name, edge_label_name),
277 edata_prefix(src_label_name, dst_label_name, edge_label_name),
// Reports the accumulated edge_count at verbosity 10.
282 VLOG(10) <<
"Finish adding edge batch of size: " << edge_count.load();
311 const std::string& dst_label_name,
312 const std::string& edge_label_name,
329 #endif // STORAGES_RT_MUTABLE_GRAPH_LOADER_BASIC_FRAGMENT_LOADER_H_