16 #ifndef STORAGES_RT_MUTABLE_GRAPH_LOADER_BASIC_FRAGMENT_LOADER_H_
17 #define STORAGES_RT_MUTABLE_GRAPH_LOADER_BASIC_FRAGMENT_LOADER_H_
25 template <
typename EDATA_T>
35 LOG(FATAL) <<
"not support edge strategy or edge data type";
62 auto dst_columns = table.column_ptrs();
63 CHECK(col_ind < dst_columns.size());
64 dst_columns[col_ind]->set_any(vid, prop);
67 template <
typename KEY_T>
71 std::string filename =
74 auto type = std::get<0>(primary_keys[0]);
76 build_lf_indexer<KEY_T, vid_t>(
89 template <
typename KEY_T>
91 PTIndexerBuilder<KEY_T, vid_t>& indexer_builder) {
93 std::string filename =
95 indexer_builder.finish(PTIndexer<vid_t>::prefix() +
"_" + filename,
107 template <
typename EDATA_T>
112 CHECK(
ie_[index] == NULL);
113 CHECK(
oe_[index] == NULL);
118 src_label_name, dst_label_name, edge_label_name);
120 src_label_name, dst_label_name, edge_label_name);
121 if constexpr (std::is_same_v<EDATA_T, std::string_view>) {
126 if (prop[0].IsVarchar()) {
127 max_length = prop[0].additional_type_info.max_length;
130 src_label_name, dst_label_name, edge_label_name);
132 src_label_name, dst_label_name, edge_label_name);
134 oe_strategy, ie_strategy, max_length, oe_mutable, ie_mutable);
137 src_label_name, dst_label_name, edge_label_name);
139 src_label_name, dst_label_name, edge_label_name);
141 oe_mutable, ie_mutable);
146 oe_prefix(src_label_name, dst_label_name, edge_label_name),
147 ie_prefix(src_label_name, dst_label_name, edge_label_name),
148 edata_prefix(src_label_name, dst_label_name, edge_label_name),
152 template <
typename EDATA_T>
154 if constexpr (std::is_same_v<EDATA_T, RecordView>) {
156 CHECK(casted_dual_csr != NULL);
157 return casted_dual_csr;
160 CHECK(casted_dual_csr != NULL);
161 return casted_dual_csr;
164 template <
typename EDATA_T,
typename VECTOR_T>
166 label_t edge_label_id,
const std::vector<VECTOR_T>& edges_vec,
167 const std::vector<int32_t>& ie_degree,
168 const std::vector<int32_t>& oe_degree,
bool build_csr_in_mem) {
172 CHECK(dual_csr != NULL);
173 auto casted_dual_csr = get_casted_dual_csr<EDATA_T>(dual_csr);
174 CHECK(casted_dual_csr != NULL);
181 auto INVALID_VID = std::numeric_limits<vid_t>::max();
182 std::atomic<size_t> edge_count(0);
183 if constexpr (std::is_same_v<EDATA_T, std::string_view>) {
184 CHECK(ie_degree.size() == dst_indexer.size());
185 CHECK(oe_degree.size() == src_indexer.size());
186 if (build_csr_in_mem) {
187 dual_csr->BatchInitInMemory(
188 edata_prefix(src_label_name, dst_label_name, edge_label_name),
192 oe_prefix(src_label_name, dst_label_name, edge_label_name),
193 ie_prefix(src_label_name, dst_label_name, edge_label_name),
194 edata_prefix(src_label_name, dst_label_name, edge_label_name),
197 std::vector<std::thread> work_threads;
198 for (
size_t i = 0; i < edges_vec.size(); ++i) {
199 work_threads.emplace_back(
201 edge_count.fetch_add(edges_vec[idx].size());
202 for (
auto& edge : edges_vec[idx]) {
203 if (std::get<1>(edge) == INVALID_VID ||
204 std::get<0>(edge) == INVALID_VID) {
205 VLOG(10) <<
"Skip invalid edge:" << std::get<0>(edge) <<
"->"
206 << std::get<1>(edge);
209 casted_dual_csr->BatchPutEdge(
210 std::get<0>(edge), std::get<1>(edge), std::get<2>(edge));
215 for (
auto& t : work_threads) {
223 dual_csr->SortByEdgeData(1);
226 oe_prefix(src_label_name, dst_label_name, edge_label_name),
227 ie_prefix(src_label_name, dst_label_name, edge_label_name),
228 edata_prefix(src_label_name, dst_label_name, edge_label_name),
231 CHECK(ie_degree.size() == dst_indexer.size());
232 CHECK(oe_degree.size() == src_indexer.size());
234 if (build_csr_in_mem) {
235 dual_csr->BatchInitInMemory(
236 edata_prefix(src_label_name, dst_label_name, edge_label_name),
240 oe_prefix(src_label_name, dst_label_name, edge_label_name),
241 ie_prefix(src_label_name, dst_label_name, edge_label_name),
242 edata_prefix(src_label_name, dst_label_name, edge_label_name),
246 std::vector<std::thread> work_threads;
247 for (
size_t i = 0; i < edges_vec.size(); ++i) {
248 work_threads.emplace_back(
250 edge_count.fetch_add(edges_vec[idx].size());
251 for (
auto& edge : edges_vec[idx]) {
252 if (std::get<1>(edge) == INVALID_VID ||
253 std::get<0>(edge) == INVALID_VID) {
254 VLOG(10) <<
"Skip invalid edge:" << std::get<0>(edge) <<
"->"
255 << std::get<1>(edge);
258 casted_dual_csr->BatchPutEdge(
259 std::get<0>(edge), std::get<1>(edge), std::get<2>(edge));
264 for (
auto& t : work_threads) {
271 dual_csr->SortByEdgeData(1);
275 oe_prefix(src_label_name, dst_label_name, edge_label_name),
276 ie_prefix(src_label_name, dst_label_name, edge_label_name),
277 edata_prefix(src_label_name, dst_label_name, edge_label_name),
282 VLOG(10) <<
"Finish adding edge batch of size: " << edge_count.load();
311 const std::string& dst_label_name,
312 const std::string& edge_label_name,
Definition: basic_fragment_loader.h:53
std::vector< Table > vertex_data_
Definition: basic_fragment_loader.h:322
void init_vertex_data()
Definition: basic_fragment_loader.cc:121
size_t vertex_label_num_
Definition: basic_fragment_loader.h:318
void append_edge_loading_progress(const std::string &src_label_name, const std::string &dst_label_name, const std::string &edge_label_name, LoadingStatus status)
Definition: basic_fragment_loader.cc:84
size_t edge_label_num_
Definition: basic_fragment_loader.h:318
std::vector< CsrBase * > oe_
Definition: basic_fragment_loader.h:320
void set_csr(label_t src_label_id, label_t dst_label_id, label_t edge_label_id, DualCsrBase *dual_csr)
Definition: basic_fragment_loader.cc:157
void FinishAddingVertex(label_t v_label, const IdIndexer< KEY_T, vid_t > &indexer)
Definition: basic_fragment_loader.h:68
std::vector< DualCsrBase * > dual_csr_list_
Definition: basic_fragment_loader.h:321
void init_edge_table(label_t src_label_id, label_t dst_label_id, label_t edge_label_id)
Definition: basic_fragment_loader.cc:175
const Schema & schema_
Definition: basic_fragment_loader.h:316
void SetVertexProperty(label_t v_label, size_t col_ind, vid_t vid, Any &&prop)
Definition: basic_fragment_loader.h:59
DualCsrBase * get_csr(label_t src_label_id, label_t dst_label_id, label_t edge_label_id)
Definition: basic_fragment_loader.cc:167
void LoadFragment()
Definition: basic_fragment_loader.cc:135
void init_loading_status_file()
Definition: basic_fragment_loader.cc:99
std::vector< IndexerType > lf_indexers_
Definition: basic_fragment_loader.h:319
std::vector< CsrBase * > ie_
Definition: basic_fragment_loader.h:320
void AddNoPropEdgeBatch(label_t src_label_id, label_t dst_label_id, label_t edge_label_id)
Definition: basic_fragment_loader.h:108
const IndexerType & GetLFIndexer(label_t v_label) const
Definition: basic_fragment_loader.cc:147
void append_vertex_loading_progress(const std::string &label_name, LoadingStatus status)
Definition: basic_fragment_loader.cc:72
static decltype(auto) get_casted_dual_csr(DualCsrBase *dual_csr)
Definition: basic_fragment_loader.h:153
Table & GetVertexTable(size_t ind)
Definition: basic_fragment_loader.h:285
BasicFragmentLoader(const Schema &schema, const std::string &prefix)
Definition: basic_fragment_loader.cc:50
std::mutex loading_progress_mutex_
Definition: basic_fragment_loader.h:325
std::string work_dir_
Definition: basic_fragment_loader.h:317
void PutEdges(label_t src_label_id, label_t dst_label_id, label_t edge_label_id, const std::vector< VECTOR_T > &edges_vec, const std::vector< int32_t > &ie_degree, const std::vector< int32_t > &oe_degree, bool build_csr_in_mem)
Definition: basic_fragment_loader.h:165
const std::string & work_dir() const
Definition: basic_fragment_loader.h:294
Definition: dual_csr.h:29
Definition: dual_csr.h:443
Definition: dual_csr.h:255
Definition: dual_csr.h:101
Definition: mutable_csr.h:1170
Definition: id_indexer.h:541
Definition: id_indexer.h:193
Definition: mutable_csr.h:183
bool get_sort_on_compaction(const std::string &src_label, const std::string &dst_label, const std::string &label) const
Definition: schema.cc:347
const std::vector< PropertyType > & get_edge_properties(const std::string &src_label, const std::string &dst_label, const std::string &label) const
Definition: schema.cc:216
bool incoming_edge_mutable(const std::string &src_label, const std::string &dst_label, const std::string &label) const
Definition: schema.cc:337
EdgeStrategy get_outgoing_edge_strategy(const std::string &src_label, const std::string &dst_label, const std::string &label) const
Definition: schema.cc:307
std::string get_vertex_label_name(label_t index) const
Definition: schema.cc:374
EdgeStrategy get_incoming_edge_strategy(const std::string &src_label, const std::string &dst_label, const std::string &label) const
Definition: schema.cc:317
bool outgoing_edge_mutable(const std::string &src_label, const std::string &dst_label, const std::string &label) const
Definition: schema.cc:327
const std::vector< std::tuple< PropertyType, std::string, size_t > > & get_vertex_primary_key(label_t index) const
Definition: schema.cc:391
std::string get_edge_label_name(label_t index) const
Definition: schema.cc:382
Definition: mutable_csr.h:747
Definition: csr_base.h:135
Definition: adj_list.h:23
EdgeStrategy
Definition: types.h:24
std::string vertex_map_prefix(const std::string &label)
Definition: file_names.h:235
std::string oe_prefix(const std::string &src_label, const std::string &dst_label, const std::string edge_label)
Definition: file_names.h:245
std::ostream & operator<<(std::ostream &os, const LoadingStatus &status)
Definition: basic_fragment_loader.cc:22
std::string vertex_table_prefix(const std::string &label)
Definition: file_names.h:256
std::string ie_prefix(const std::string &src_label, const std::string &dst_label, const std::string edge_label)
Definition: file_names.h:239
std::string snapshot_dir(const std::string &work_dir, uint32_t version)
Definition: file_names.h:192
std::string edata_prefix(const std::string &src_label, const std::string &dst_label, const std::string &edge_label)
Definition: file_names.h:251
uint32_t vid_t
Definition: types.h:31
std::string tmp_dir(const std::string &work_dir)
Definition: file_names.h:213
std::istream & operator>>(std::istream &is, LoadingStatus &status)
Definition: basic_fragment_loader.cc:35
uint8_t label_t
Definition: types.h:32
TypedMutableCsrBase< EDATA_T > * create_typed_csr(EdgeStrategy es, PropertyType edge_property)
Definition: basic_fragment_loader.h:26
LoadingStatus
Definition: basic_fragment_loader.h:39
static uint16_t GetStringDefaultMaxLength()
Definition: types.cc:103