Go to the documentation of this file.
16 #ifndef GRAPHSCOPE_FRAGMENT_DUAL_CSR_H_
17 #define GRAPHSCOPE_FRAGMENT_DUAL_CSR_H_
21 #include <grape/serialization/in_archive.h>
// Pure-virtual batch-initialisation hook of the dual (in-edge + out-edge) CSR
// interface.
//
// As used by the overrides visible later in this listing:
//   - oe_name / oe_degree are passed to the out-edge CSR's batch_init,
//   - ie_name / ie_degree are passed to the in-edge CSR's batch_init,
//   - work_dir is passed to both batch_init calls,
//   - edata_name is used by the string-column variant to open its column
//     in work_dir.
// NOTE(review): the degree vectors are presumably per-vertex edge counts used
// for pre-sizing -- confirm against the CSR batch_init implementations.
// NOTE(review): the leading numbers ("33".."37") are line numbers left over
// from the Doxygen source listing, not C++ tokens.
33 virtual void BatchInit(
const std::string& oe_name,
const std::string& ie_name,
34 const std::string& edata_name,
35 const std::string& work_dir,
36 const std::vector<int>& oe_degree,
37 const std::vector<int>& ie_degree) = 0;
40 const std::string& work_dir,
41 const std::vector<int>& oe_degree,
42 const std::vector<int>& ie_degree) = 0;
44 virtual void Open(
const std::string& oe_name,
const std::string& ie_name,
45 const std::string& edata_name,
47 const std::string& work_dir) = 0;
49 const std::string& ie_name,
50 const std::string& edata_name,
52 size_t src_vertex_cap,
size_t dst_vertex_cap) = 0;
54 const std::string& ie_name,
55 const std::string& edata_name,
57 size_t src_vertex_cap,
58 size_t dst_vertex_cap) = 0;
// Pure-virtual snapshot hook of the dual CSR interface.
//
// As used by the overrides visible later in this listing:
//   - the in-edge CSR is dumped with (ie_name, new_snapshot_dir),
//   - the out-edge CSR is dumped with (oe_name, new_snapshot_dir),
//   - the string-column variant first resizes its column to the current
//     column index and then dumps it to new_snapshot_dir + "/" + edata_name,
//   - the table variant first resizes its table to the current table index
//     and then dumps it with (edata_name, new_snapshot_dir).
// NOTE(review): the leading numbers ("59".."61") are line numbers left over
// from the Doxygen source listing, not C++ tokens.
59 virtual void Dump(
const std::string& oe_name,
const std::string& ie_name,
60 const std::string& edata_name,
61 const std::string& new_snapshot_dir) = 0;
// Pure-virtual close hook. The listing's cross-reference index shows a
// `Close() override` for each of the three DualCsr variants, but their bodies
// are not visible here. NOTE(review): presumably releases the in/out CSR
// storage -- confirm against the overrides.
97 virtual void Close() = 0;
100 template <
typename EDATA_T>
137 void BatchInit(
const std::string& oe_name,
const std::string& ie_name,
138 const std::string& edata_name,
const std::string& work_dir,
139 const std::vector<int>& oe_degree,
140 const std::vector<int>& ie_degree)
override {
141 in_csr_->batch_init(ie_name, work_dir, ie_degree);
142 out_csr_->batch_init(oe_name, work_dir, oe_degree);
146 const std::string& work_dir,
147 const std::vector<int>& oe_degree,
148 const std::vector<int>& ie_degree)
override {
149 in_csr_->batch_init_in_memory(ie_degree);
150 out_csr_->batch_init_in_memory(oe_degree);
153 void Open(
const std::string& oe_name,
const std::string& ie_name,
154 const std::string& edata_name,
const std::string&
snapshot_dir,
155 const std::string& work_dir)
override {
160 void OpenInMemory(
const std::string& oe_name,
const std::string& ie_name,
161 const std::string& edata_name,
163 size_t dst_vertex_cap)
override {
169 const std::string& edata_name,
171 size_t dst_vertex_cap)
override {
176 void Dump(
const std::string& oe_name,
const std::string& ie_name,
177 const std::string& edata_name,
178 const std::string& new_snapshot_dir)
override {
179 in_csr_->dump(ie_name, new_snapshot_dir);
180 out_csr_->dump(oe_name, new_snapshot_dir);
193 in_csr_->put_edge(dst, src, data, ts, alloc);
194 out_csr_->put_edge(src, dst, data, ts, alloc);
198 in_csr_->batch_sort_by_edge_data(ts);
199 out_csr_->batch_sort_by_edge_data(ts);
204 auto oe =
out_csr_->edge_iter_mut(src);
207 bool src_flag =
false, dst_flag =
false;
208 while (oe !=
nullptr && oe->is_valid()) {
209 if (oe->get_neighbor() == dst) {
210 oe->set_data(prop, ts);
216 auto ie =
in_csr_->edge_iter_mut(dst);
217 while (ie !=
nullptr && ie->is_valid()) {
218 if (ie->get_neighbor() == src) {
220 ie->set_data(prop, ts);
225 if (!(src_flag || dst_flag)) {
226 in_csr_->put_edge(dst, src, prop, ts, alloc);
227 out_csr_->put_edge(src, dst, prop, ts, alloc);
232 in_csr_->batch_put_edge(dst, src, data);
233 out_csr_->batch_put_edge(src, dst, data);
276 void BatchInit(
const std::string& oe_name,
const std::string& ie_name,
277 const std::string& edata_name,
const std::string& work_dir,
278 const std::vector<int>& oe_degree,
279 const std::vector<int>& ie_degree)
override {
280 size_t ie_num =
in_csr_->batch_init(ie_name, work_dir, ie_degree);
281 size_t oe_num =
out_csr_->batch_init(oe_name, work_dir, oe_degree);
282 column_.open(edata_name,
"", work_dir);
283 column_.resize(std::max(ie_num, oe_num));
284 column_idx_.store(0);
288 const std::string& work_dir,
289 const std::vector<int>& oe_degree,
290 const std::vector<int>& ie_degree)
override {
291 size_t ie_num =
in_csr_->batch_init_in_memory(ie_degree);
292 size_t oe_num =
out_csr_->batch_init_in_memory(oe_degree);
293 column_.open(edata_name,
"", work_dir);
294 column_.resize(std::max(ie_num, oe_num));
295 column_idx_.store(0);
298 void Open(
const std::string& oe_name,
const std::string& ie_name,
299 const std::string& edata_name,
const std::string&
snapshot_dir,
300 const std::string& work_dir)
override {
304 column_idx_.store(column_.size());
305 column_.resize(std::max(column_.size() + (column_.size() + 4) / 5, 4096ul));
308 void OpenInMemory(
const std::string& oe_name,
const std::string& ie_name,
309 const std::string& edata_name,
311 size_t dst_vertex_cap)
override {
314 column_.open_in_memory(
snapshot_dir +
"/" + edata_name);
315 column_idx_.store(column_.size());
316 column_.resize(std::max(column_.size() + (column_.size() + 4) / 5, 4096ul));
320 const std::string& edata_name,
322 size_t dst_vertex_cap)
override {
323 LOG(FATAL) <<
"not supported...";
326 void Dump(
const std::string& oe_name,
const std::string& ie_name,
327 const std::string& edata_name,
328 const std::string& new_snapshot_dir)
override {
329 in_csr_->dump(ie_name, new_snapshot_dir);
330 out_csr_->dump(oe_name, new_snapshot_dir);
331 column_.resize(column_idx_.load());
332 column_.dump(new_snapshot_dir +
"/" + edata_name);
343 std::string_view prop;
345 size_t row_id = column_idx_.fetch_add(1);
346 column_.set_value(row_id, prop);
347 in_csr_->put_edge_with_index(dst, src, row_id, ts, alloc);
348 out_csr_->put_edge_with_index(src, dst, row_id, ts, alloc);
352 LOG(FATAL) <<
"Not implemented";
357 auto oe_ptr =
out_csr_->edge_iter_mut(src);
360 size_t index = std::numeric_limits<size_t>::max();
361 while (oe !=
nullptr && oe->is_valid()) {
362 if (oe->get_neighbor() == dst) {
363 oe->set_timestamp(ts);
364 index = oe->get_index();
369 auto ie_ptr =
in_csr_->edge_iter_mut(dst);
371 while (ie !=
nullptr && ie->is_valid()) {
372 if (ie->get_neighbor() == src) {
374 index = ie->get_index();
379 if (index != std::numeric_limits<size_t>::max()) {
380 column_.set_value(index, prop);
382 size_t row_id = column_idx_.fetch_add(1);
383 column_.set_value(row_id, prop);
384 in_csr_->put_edge_with_index(dst, src, row_id, ts, alloc);
385 out_csr_->put_edge_with_index(src, dst, row_id, ts, alloc);
390 size_t row_id = column_idx_.fetch_add(1);
391 column_.set_value(row_id, data);
392 in_csr_->batch_put_edge_with_index(dst, src, row_id);
393 out_csr_->batch_put_edge_with_index(src, dst, row_id);
397 size_t row_id = column_idx_.fetch_add(1);
398 column_.set_value(row_id, data);
400 in_csr_->batch_put_edge_with_index(dst, src, row_id);
401 out_csr_->batch_put_edge_with_index(src, dst, row_id);
421 const std::vector<std::string>& col_name,
422 const std::vector<PropertyType>& property_types,
423 const std::vector<StorageStrategy>& storage_strategies)
424 : col_name_(col_name),
425 property_types_(property_types),
426 storage_strategies_(storage_strategies),
454 void InitTable(
const std::string& edata_name,
const std::string& work_dir) {
455 table_.init(edata_name, work_dir, col_name_, property_types_,
456 storage_strategies_);
459 void BatchInit(
const std::string& oe_name,
const std::string& ie_name,
460 const std::string& edata_name,
const std::string& work_dir,
461 const std::vector<int>& oe_degree,
462 const std::vector<int>& ie_degree)
override {
463 size_t ie_num =
in_csr_->batch_init(ie_name, work_dir, ie_degree);
464 size_t oe_num =
out_csr_->batch_init(oe_name, work_dir, oe_degree);
465 table_.resize(std::max(ie_num, oe_num));
467 table_idx_.store(std::max(ie_num, oe_num));
471 const std::string& work_dir,
472 const std::vector<int>& oe_degree,
473 const std::vector<int>& ie_degree)
override {
474 size_t ie_num =
in_csr_->batch_init_in_memory(ie_degree);
475 size_t oe_num =
out_csr_->batch_init_in_memory(oe_degree);
476 table_.resize(std::max(ie_num, oe_num));
477 table_idx_.store(std::max(ie_num, oe_num));
480 void Open(
const std::string& oe_name,
const std::string& ie_name,
481 const std::string& edata_name,
const std::string&
snapshot_dir,
482 const std::string& work_dir)
override {
487 table_.open(edata_name,
snapshot_dir, work_dir, col_name_, property_types_,
489 table_idx_.store(table_.row_num());
491 std::max(table_.row_num() + (table_.row_num() + 4) / 5, 4096ul));
494 void OpenInMemory(
const std::string& oe_name,
const std::string& ie_name,
495 const std::string& edata_name,
497 size_t dst_vertex_cap)
override {
501 table_.open_in_memory(edata_name,
snapshot_dir, col_name_, property_types_,
503 table_idx_.store(table_.row_num());
505 std::max(table_.row_num() + (table_.row_num() + 4) / 5, 4096ul));
509 const std::string& edata_name,
511 size_t dst_vertex_cap)
override {
512 LOG(FATAL) <<
"not supported...";
515 void Dump(
const std::string& oe_name,
const std::string& ie_name,
516 const std::string& edata_name,
517 const std::string& new_snapshot_dir)
override {
518 in_csr_->dump(ie_name, new_snapshot_dir);
519 out_csr_->dump(oe_name, new_snapshot_dir);
520 table_.resize(table_idx_.load());
521 table_.dump(edata_name, new_snapshot_dir);
530 in_csr_->batch_put_edge_with_index(dst, src, row_id);
531 out_csr_->batch_put_edge_with_index(src, dst, row_id);
539 size_t row_id = table_idx_.fetch_add(1);
542 table_.ingest(row_id, oarc);
543 in_csr_->put_edge_with_index(dst, src, row_id, ts, alloc);
544 out_csr_->put_edge_with_index(src, dst, row_id, ts, alloc);
548 LOG(FATAL) <<
"Not implemented";
553 auto oe_ptr =
out_csr_->edge_iter_mut(src);
554 grape::InArchive arc;
556 for (
size_t i = 0; i < r.
len; ++i) {
559 grape::OutArchive oarc;
560 oarc.SetSlice(arc.GetBuffer(), arc.GetSize());
562 size_t index = std::numeric_limits<size_t>::max();
563 while (oe !=
nullptr && oe->is_valid()) {
564 if (oe->get_neighbor() == dst) {
565 oe->set_timestamp(ts);
566 index = oe->get_index();
571 auto ie_ptr =
in_csr_->edge_iter_mut(dst);
573 while (ie !=
nullptr && ie->is_valid()) {
574 if (ie->get_neighbor() == src) {
576 index = ie->get_index();
581 if (index != std::numeric_limits<size_t>::max()) {
582 table_.ingest(index, oarc);
584 size_t row_id = table_idx_.fetch_add(1);
585 table_.ingest(row_id, oarc);
586 in_csr_->put_edge_with_index(dst, src, row_id, ts, alloc);
587 out_csr_->put_edge_with_index(src, dst, row_id, ts, alloc);
609 #endif // GRAPHSCOPE_FRAGMENT_DUAL_CSR_H_
virtual void Open(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, const std::string &work_dir)=0
virtual void SortByEdgeData(timestamp_t ts)=0
void set_timestamp(timestamp_t ts)
Definition: mutable_csr.h:123
Definition: mutable_csr.h:1059
const CsrBase * GetOutCsr() const override
Definition: dual_csr.h:187
void BatchInitInMemory(const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree) override
Definition: dual_csr.h:145
CsrBase * GetInCsr() override
Definition: dual_csr.h:524
void SortByEdgeData(timestamp_t ts) override
Definition: dual_csr.h:197
void IngestEdge(vid_t src, vid_t dst, grape::OutArchive &oarc, timestamp_t ts, Allocator &alloc) override
Definition: dual_csr.h:537
TypedCsrBase< std::string_view > * out_csr_
Definition: dual_csr.h:412
Table & GetTable()
Definition: dual_csr.h:534
uint32_t timestamp_t
Definition: types.h:30
Definition: csr_base.h:61
void OpenInMemory(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap) override
Definition: dual_csr.h:494
StringColumn column_
Definition: dual_csr.h:414
void SortByEdgeData(timestamp_t ts) override
Definition: dual_csr.h:547
void OpenWithHugepages(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap) override
Definition: dual_csr.h:508
uint32_t vid_t
Definition: types.h:31
TypedMutableCsrBase< RecordView > * out_csr_
Definition: dual_csr.h:602
virtual void UpdateEdge(vid_t src, vid_t dst, const Any &oarc, timestamp_t timestamp, Allocator &alloc)=0
~DualCsr()
Definition: dual_csr.h:129
const CsrBase * GetOutCsr() const override
Definition: dual_csr.h:527
const std::vector< StorageStrategy > & storage_strategies_
Definition: dual_csr.h:600
virtual void Dump(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &new_snapshot_dir)=0
virtual CsrBase * GetOutCsr()=0
DualCsr(EdgeStrategy oe_strategy, EdgeStrategy ie_strategy, const std::vector< std::string > &col_name, const std::vector< PropertyType > &property_types, const std::vector< StorageStrategy > &storage_strategies)
Definition: dual_csr.h:420
void BatchPutEdge(vid_t src, vid_t dst, size_t row_id)
Definition: dual_csr.h:529
const CsrBase * GetOutCsr() const override
Definition: dual_csr.h:339
static void to(const Any &value, T &out)
Definition: types.h:799
Definition: adj_list.h:23
const Table & GetTable() const
Definition: dual_csr.h:536
virtual void IngestEdge(vid_t src, vid_t dst, grape::OutArchive &oarc, timestamp_t timestamp, Allocator &alloc)=0
virtual void OpenWithHugepages(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap)=0
TypedCsrBase< std::string_view > * in_csr_
Definition: dual_csr.h:411
void IngestEdge(vid_t src, vid_t dst, grape::OutArchive &oarc, timestamp_t ts, Allocator &alloc) override
Definition: dual_csr.h:189
StorageStrategy
Definition: types.h:58
Definition: mutable_csr.h:1293
void BatchPutEdge(vid_t src, vid_t dst, const std::string &data)
Definition: dual_csr.h:396
void OpenInMemory(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap) override
Definition: dual_csr.h:308
void InitTable(const std::string &edata_name, const std::string &work_dir)
Definition: dual_csr.h:454
TypedCsrBase< EDATA_T > * in_csr_
Definition: dual_csr.h:242
Definition: allocators.h:29
Definition: mutable_csr.h:661
size_t len
Definition: types.h:314
void BatchInit(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree) override
Definition: dual_csr.h:276
void Close() override
Definition: dual_csr.h:236
Definition: mutable_csr.h:1234
Any * props
Definition: types.h:315
virtual void OpenInMemory(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap)=0
CsrBase * GetOutCsr() override
Definition: dual_csr.h:525
void Open(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: dual_csr.h:480
void Open(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: dual_csr.h:153
void Warmup(int thread_num)
Definition: dual_csr.h:76
virtual void resize(vid_t vnum)=0
DualCsr(EdgeStrategy oe_strategy, EdgeStrategy ie_strategy, uint16_t width)
Definition: dual_csr.h:249
void Dump(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &new_snapshot_dir) override
Definition: dual_csr.h:176
virtual size_t edge_num() const =0
void BatchInitInMemory(const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree) override
Definition: dual_csr.h:470
const std::vector< PropertyType > & property_types_
Definition: dual_csr.h:599
void IngestEdge(vid_t src, vid_t dst, grape::OutArchive &oarc, timestamp_t ts, Allocator &alloc) override
Definition: dual_csr.h:341
std::atomic< size_t > table_idx_
Definition: dual_csr.h:603
void Dump(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &new_snapshot_dir) override
Definition: dual_csr.h:326
TypedCsrBase< EDATA_T > * out_csr_
Definition: dual_csr.h:243
virtual CsrBase * GetInCsr()=0
const std::vector< std::string > & col_name_
Definition: dual_csr.h:598
Definition: mutable_csr.h:573
void next() override
Definition: mutable_csr.h:95
Definition: immutable_csr.h:326
CsrBase * GetOutCsr() override
Definition: dual_csr.h:185
const Record & AsRecord() const
Definition: types.h:675
CsrBase * GetOutCsr() override
Definition: dual_csr.h:337
Definition: mutable_csr.h:183
CsrBase * GetInCsr() override
Definition: dual_csr.h:184
Definition: csr_base.h:109
const CsrBase * GetInCsr() const override
Definition: dual_csr.h:526
void Dump(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &new_snapshot_dir) override
Definition: dual_csr.h:515
void Close() override
Definition: dual_csr.h:404
~DualCsr()
Definition: dual_csr.h:268
virtual ~DualCsrBase()=default
void UpdateEdge(vid_t src, vid_t dst, const Any &data, timestamp_t ts, Allocator &alloc) override
Definition: dual_csr.h:355
void BatchInit(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree) override
Definition: dual_csr.h:137
virtual void BatchInit(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree)=0
void BatchPutEdge(vid_t src, vid_t dst, const EDATA_T &data)
Definition: dual_csr.h:231
size_t EdgeNum() const
Definition: dual_csr.h:81
Definition: mutable_csr.h:1168
Definition: mutable_csr.h:143
Definition: csr_base.h:118
std::string snapshot_dir(const std::string &work_dir, uint32_t version)
Definition: file_names.h:192
Definition: loading_config.h:232
Definition: mutable_csr.h:745
EdgeStrategy
Definition: types.h:24
void OpenInMemory(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap) override
Definition: dual_csr.h:160
~DualCsr()
Definition: dual_csr.h:445
void BatchInit(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree) override
Definition: dual_csr.h:459
void Open(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: dual_csr.h:298
void OpenWithHugepages(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap) override
Definition: dual_csr.h:168
void SortByEdgeData(timestamp_t ts) override
Definition: dual_csr.h:351
void Close() override
Definition: dual_csr.h:591
Definition: dual_csr.h:101
Definition: mutable_csr.h:954
void set_timestamp(timestamp_t ts)
Definition: mutable_csr.h:159
Definition: dual_csr.h:29
DualCsr(EdgeStrategy oe_strategy, EdgeStrategy ie_strategy, bool oe_mutable, bool ie_mutable)
Definition: dual_csr.h:103
void BatchInitInMemory(const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree) override
Definition: dual_csr.h:287
TypedMutableCsrBase< RecordView > * in_csr_
Definition: dual_csr.h:601
void UpdateEdge(vid_t src, vid_t dst, const Any &data, timestamp_t ts, Allocator &alloc) override
Definition: dual_csr.h:202
Table table_
Definition: dual_csr.h:604
virtual void BatchInitInMemory(const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree)=0
void Resize(vid_t src_vertex_num, vid_t dst_vertex_num)
Definition: dual_csr.h:71
Definition: mutable_csr.h:105
std::string_view AsStringView() const
Definition: types.h:641
const CsrBase * GetInCsr() const override
Definition: dual_csr.h:338
void UpdateEdge(vid_t src, vid_t dst, const Any &data, timestamp_t ts, Allocator &alloc) override
Definition: dual_csr.h:551
std::atomic< size_t > column_idx_
Definition: dual_csr.h:413
void OpenWithHugepages(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap) override
Definition: dual_csr.h:319
const CsrBase * GetInCsr() const override
Definition: dual_csr.h:186
virtual void warmup(int thread_num) const =0
CsrBase * GetInCsr() override
Definition: dual_csr.h:336
void BatchPutEdge(vid_t src, vid_t dst, const std::string_view &data)
Definition: dual_csr.h:389