Go to the documentation of this file.
16 #ifndef GRAPHSCOPE_FRAGMENT_DUAL_CSR_H_
17 #define GRAPHSCOPE_FRAGMENT_DUAL_CSR_H_
21 #include <grape/serialization/in_archive.h>
33 virtual void BatchInit(
const std::string& oe_name,
const std::string& ie_name,
34 const std::string& edata_name,
35 const std::string& work_dir,
36 const std::vector<int>& oe_degree,
37 const std::vector<int>& ie_degree) = 0;
40 const std::string& work_dir,
41 const std::vector<int>& oe_degree,
42 const std::vector<int>& ie_degree) = 0;
44 virtual void Open(
const std::string& oe_name,
const std::string& ie_name,
45 const std::string& edata_name,
47 const std::string& work_dir) = 0;
49 const std::string& ie_name,
50 const std::string& edata_name,
52 size_t src_vertex_cap,
size_t dst_vertex_cap) = 0;
54 const std::string& ie_name,
55 const std::string& edata_name,
57 size_t src_vertex_cap,
58 size_t dst_vertex_cap) = 0;
59 virtual void Dump(
const std::string& oe_name,
const std::string& ie_name,
60 const std::string& edata_name,
61 const std::string& new_snapshot_dir) = 0;
97 virtual void Close() = 0;
100 template <
typename EDATA_T>
145 void BatchInit(
const std::string& oe_name,
const std::string& ie_name,
146 const std::string& edata_name,
const std::string& work_dir,
147 const std::vector<int>& oe_degree,
148 const std::vector<int>& ie_degree)
override {
149 in_csr_->batch_init(ie_name, work_dir, ie_degree);
150 out_csr_->batch_init(oe_name, work_dir, oe_degree);
154 const std::string& work_dir,
155 const std::vector<int>& oe_degree,
156 const std::vector<int>& ie_degree)
override {
157 in_csr_->batch_init_in_memory(ie_degree);
158 out_csr_->batch_init_in_memory(oe_degree);
161 void Open(
const std::string& oe_name,
const std::string& ie_name,
162 const std::string& edata_name,
const std::string&
snapshot_dir,
163 const std::string& work_dir)
override {
168 void OpenInMemory(
const std::string& oe_name,
const std::string& ie_name,
169 const std::string& edata_name,
171 size_t dst_vertex_cap)
override {
177 const std::string& edata_name,
179 size_t dst_vertex_cap)
override {
184 void Dump(
const std::string& oe_name,
const std::string& ie_name,
185 const std::string& edata_name,
186 const std::string& new_snapshot_dir)
override {
187 in_csr_->dump(ie_name, new_snapshot_dir);
188 out_csr_->dump(oe_name, new_snapshot_dir);
201 in_csr_->put_edge(dst, src, data, ts, alloc);
202 out_csr_->put_edge(src, dst, data, ts, alloc);
206 in_csr_->batch_sort_by_edge_data(ts);
207 out_csr_->batch_sort_by_edge_data(ts);
212 auto oe =
out_csr_->edge_iter_mut(src);
215 bool src_flag =
false, dst_flag =
false;
216 while (oe !=
nullptr && oe->is_valid()) {
217 if (oe->get_neighbor() == dst) {
218 oe->set_data(prop, ts);
224 auto ie =
in_csr_->edge_iter_mut(dst);
225 while (ie !=
nullptr && ie->is_valid()) {
226 if (ie->get_neighbor() == src) {
228 ie->set_data(prop, ts);
233 if (!(src_flag || dst_flag)) {
234 in_csr_->put_edge(dst, src, prop, ts, alloc);
235 out_csr_->put_edge(src, dst, prop, ts, alloc);
240 in_csr_->batch_put_edge(dst, src, data);
241 out_csr_->batch_put_edge(src, dst, data);
258 bool oe_mutable,
bool ie_mutable)
301 void BatchInit(
const std::string& oe_name,
const std::string& ie_name,
302 const std::string& edata_name,
const std::string& work_dir,
303 const std::vector<int>& oe_degree,
304 const std::vector<int>& ie_degree)
override {
305 size_t ie_num =
in_csr_->batch_init(ie_name, work_dir, ie_degree);
306 size_t oe_num =
out_csr_->batch_init(oe_name, work_dir, oe_degree);
307 column_.open(edata_name,
"", work_dir);
308 column_.resize(std::max(ie_num, oe_num));
309 column_idx_.store(0);
313 const std::string& work_dir,
314 const std::vector<int>& oe_degree,
315 const std::vector<int>& ie_degree)
override {
316 size_t ie_num =
in_csr_->batch_init_in_memory(ie_degree);
317 size_t oe_num =
out_csr_->batch_init_in_memory(oe_degree);
318 column_.open(edata_name,
"", work_dir);
319 column_.resize(std::max(ie_num, oe_num));
320 column_idx_.store(0);
323 void Open(
const std::string& oe_name,
const std::string& ie_name,
324 const std::string& edata_name,
const std::string&
snapshot_dir,
325 const std::string& work_dir)
override {
329 column_idx_.store(column_.size());
330 column_.resize(std::max(column_.size() + (column_.size() + 4) / 5, 4096ul));
333 void OpenInMemory(
const std::string& oe_name,
const std::string& ie_name,
334 const std::string& edata_name,
336 size_t dst_vertex_cap)
override {
339 column_.open_in_memory(
snapshot_dir +
"/" + edata_name);
340 column_idx_.store(column_.size());
341 column_.resize(std::max(column_.size() + (column_.size() + 4) / 5, 4096ul));
345 const std::string& edata_name,
347 size_t dst_vertex_cap)
override {
348 LOG(FATAL) <<
"not supported...";
351 void Dump(
const std::string& oe_name,
const std::string& ie_name,
352 const std::string& edata_name,
353 const std::string& new_snapshot_dir)
override {
354 in_csr_->dump(ie_name, new_snapshot_dir);
355 out_csr_->dump(oe_name, new_snapshot_dir);
356 column_.resize(column_idx_.load());
357 column_.dump(new_snapshot_dir +
"/" + edata_name);
368 std::string_view prop;
370 size_t row_id = column_idx_.fetch_add(1);
371 column_.set_value(row_id, prop);
372 in_csr_->put_edge_with_index(dst, src, row_id, ts, alloc);
373 out_csr_->put_edge_with_index(src, dst, row_id, ts, alloc);
377 LOG(FATAL) <<
"Not implemented";
382 auto oe_ptr =
out_csr_->edge_iter_mut(src);
385 size_t index = std::numeric_limits<size_t>::max();
386 while (oe !=
nullptr && oe->is_valid()) {
387 if (oe->get_neighbor() == dst) {
388 oe->set_timestamp(ts);
389 index = oe->get_index();
394 auto ie_ptr =
in_csr_->edge_iter_mut(dst);
396 while (ie !=
nullptr && ie->is_valid()) {
397 if (ie->get_neighbor() == src) {
399 index = ie->get_index();
404 if (index != std::numeric_limits<size_t>::max()) {
405 column_.set_value(index, prop);
407 size_t row_id = column_idx_.fetch_add(1);
408 column_.set_value(row_id, prop);
409 in_csr_->put_edge_with_index(dst, src, row_id, ts, alloc);
410 out_csr_->put_edge_with_index(src, dst, row_id, ts, alloc);
415 size_t row_id = column_idx_.fetch_add(1);
416 column_.set_value(row_id, data);
417 in_csr_->batch_put_edge_with_index(dst, src, row_id);
418 out_csr_->batch_put_edge_with_index(src, dst, row_id);
422 size_t row_id = column_idx_.fetch_add(1);
423 column_.set_value(row_id, data);
425 in_csr_->batch_put_edge_with_index(dst, src, row_id);
426 out_csr_->batch_put_edge_with_index(src, dst, row_id);
446 const std::vector<std::string>& col_name,
447 const std::vector<PropertyType>& property_types,
448 const std::vector<StorageStrategy>& storage_strategies,
449 bool oe_mutable,
bool ie_mutable)
450 : col_name_(col_name),
451 property_types_(property_types),
452 storage_strategies_(storage_strategies),
496 void InitTable(
const std::string& edata_name,
const std::string& work_dir) {
497 table_.init(edata_name, work_dir, col_name_, property_types_,
498 storage_strategies_);
501 void BatchInit(
const std::string& oe_name,
const std::string& ie_name,
502 const std::string& edata_name,
const std::string& work_dir,
503 const std::vector<int>& oe_degree,
504 const std::vector<int>& ie_degree)
override {
505 size_t ie_num =
in_csr_->batch_init(ie_name, work_dir, ie_degree);
506 size_t oe_num =
out_csr_->batch_init(oe_name, work_dir, oe_degree);
507 table_.resize(std::max(ie_num, oe_num));
509 table_idx_.store(std::max(ie_num, oe_num));
513 const std::string& work_dir,
514 const std::vector<int>& oe_degree,
515 const std::vector<int>& ie_degree)
override {
516 size_t ie_num =
in_csr_->batch_init_in_memory(ie_degree);
517 size_t oe_num =
out_csr_->batch_init_in_memory(oe_degree);
518 table_.resize(std::max(ie_num, oe_num));
519 table_idx_.store(std::max(ie_num, oe_num));
522 void Open(
const std::string& oe_name,
const std::string& ie_name,
523 const std::string& edata_name,
const std::string&
snapshot_dir,
524 const std::string& work_dir)
override {
529 table_.open(edata_name,
snapshot_dir, work_dir, col_name_, property_types_,
531 table_idx_.store(table_.row_num());
533 std::max(table_.row_num() + (table_.row_num() + 4) / 5, 4096ul));
536 void OpenInMemory(
const std::string& oe_name,
const std::string& ie_name,
537 const std::string& edata_name,
539 size_t dst_vertex_cap)
override {
543 table_.open_in_memory(edata_name,
snapshot_dir, col_name_, property_types_,
545 table_idx_.store(table_.row_num());
547 std::max(table_.row_num() + (table_.row_num() + 4) / 5, 4096ul));
551 const std::string& edata_name,
553 size_t dst_vertex_cap)
override {
554 LOG(FATAL) <<
"not supported...";
557 void Dump(
const std::string& oe_name,
const std::string& ie_name,
558 const std::string& edata_name,
559 const std::string& new_snapshot_dir)
override {
560 in_csr_->dump(ie_name, new_snapshot_dir);
561 out_csr_->dump(oe_name, new_snapshot_dir);
562 table_.resize(table_idx_.load());
563 table_.dump(edata_name, new_snapshot_dir);
572 in_csr_->batch_put_edge_with_index(dst, src, row_id);
573 out_csr_->batch_put_edge_with_index(src, dst, row_id);
581 size_t row_id = table_idx_.fetch_add(1);
584 table_.ingest(row_id, oarc);
585 in_csr_->put_edge_with_index(dst, src, row_id, ts, alloc);
586 out_csr_->put_edge_with_index(src, dst, row_id, ts, alloc);
590 LOG(FATAL) <<
"Not implemented";
595 auto oe_ptr =
out_csr_->edge_iter_mut(src);
596 grape::InArchive arc;
598 for (
size_t i = 0; i < r.
len; ++i) {
601 grape::OutArchive oarc;
602 oarc.SetSlice(arc.GetBuffer(), arc.GetSize());
604 size_t index = std::numeric_limits<size_t>::max();
605 while (oe !=
nullptr && oe->is_valid()) {
606 if (oe->get_neighbor() == dst) {
607 oe->set_timestamp(ts);
608 index = oe->get_index();
613 auto ie_ptr =
in_csr_->edge_iter_mut(dst);
615 while (ie !=
nullptr && ie->is_valid()) {
616 if (ie->get_neighbor() == src) {
618 index = ie->get_index();
623 if (index != std::numeric_limits<size_t>::max()) {
624 table_.ingest(index, oarc);
626 size_t row_id = table_idx_.fetch_add(1);
627 table_.ingest(row_id, oarc);
628 in_csr_->put_edge_with_index(dst, src, row_id, ts, alloc);
629 out_csr_->put_edge_with_index(src, dst, row_id, ts, alloc);
651 #endif // GRAPHSCOPE_FRAGMENT_DUAL_CSR_H_
virtual void Open(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, const std::string &work_dir)=0
virtual void SortByEdgeData(timestamp_t ts)=0
void set_timestamp(timestamp_t ts)
Definition: mutable_csr.h:123
Definition: mutable_csr.h:1061
const CsrBase * GetOutCsr() const override
Definition: dual_csr.h:195
void BatchInitInMemory(const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree) override
Definition: dual_csr.h:153
CsrBase * GetInCsr() override
Definition: dual_csr.h:566
void SortByEdgeData(timestamp_t ts) override
Definition: dual_csr.h:205
void IngestEdge(vid_t src, vid_t dst, grape::OutArchive &oarc, timestamp_t ts, Allocator &alloc) override
Definition: dual_csr.h:579
Definition: immutable_csr.h:53
TypedCsrBase< std::string_view > * out_csr_
Definition: dual_csr.h:437
Table & GetTable()
Definition: dual_csr.h:576
uint32_t timestamp_t
Definition: types.h:30
Definition: csr_base.h:61
void OpenInMemory(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap) override
Definition: dual_csr.h:536
StringColumn column_
Definition: dual_csr.h:439
void SortByEdgeData(timestamp_t ts) override
Definition: dual_csr.h:589
void OpenWithHugepages(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap) override
Definition: dual_csr.h:550
uint32_t vid_t
Definition: types.h:31
virtual void UpdateEdge(vid_t src, vid_t dst, const Any &oarc, timestamp_t timestamp, Allocator &alloc)=0
DualCsr(EdgeStrategy oe_strategy, EdgeStrategy ie_strategy, const std::vector< std::string > &col_name, const std::vector< PropertyType > &property_types, const std::vector< StorageStrategy > &storage_strategies, bool oe_mutable, bool ie_mutable)
Definition: dual_csr.h:445
~DualCsr()
Definition: dual_csr.h:137
const CsrBase * GetOutCsr() const override
Definition: dual_csr.h:569
Definition: immutable_csr.h:327
const std::vector< StorageStrategy > & storage_strategies_
Definition: dual_csr.h:642
virtual void Dump(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &new_snapshot_dir)=0
virtual CsrBase * GetOutCsr()=0
void BatchPutEdge(vid_t src, vid_t dst, size_t row_id)
Definition: dual_csr.h:571
const CsrBase * GetOutCsr() const override
Definition: dual_csr.h:364
static void to(const Any &value, T &out)
Definition: types.h:815
Definition: adj_list.h:23
const Table & GetTable() const
Definition: dual_csr.h:578
virtual void IngestEdge(vid_t src, vid_t dst, grape::OutArchive &oarc, timestamp_t timestamp, Allocator &alloc)=0
virtual void OpenWithHugepages(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap)=0
TypedCsrBase< std::string_view > * in_csr_
Definition: dual_csr.h:436
void IngestEdge(vid_t src, vid_t dst, grape::OutArchive &oarc, timestamp_t ts, Allocator &alloc) override
Definition: dual_csr.h:197
TypedCsrBase< RecordView > * in_csr_
Definition: dual_csr.h:643
StorageStrategy
Definition: types.h:58
Definition: mutable_csr.h:1297
void BatchPutEdge(vid_t src, vid_t dst, const std::string &data)
Definition: dual_csr.h:421
void OpenInMemory(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap) override
Definition: dual_csr.h:333
void InitTable(const std::string &edata_name, const std::string &work_dir)
Definition: dual_csr.h:496
TypedCsrBase< EDATA_T > * in_csr_
Definition: dual_csr.h:250
Definition: allocators.h:29
Definition: mutable_csr.h:663
size_t len
Definition: types.h:330
void BatchInit(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree) override
Definition: dual_csr.h:301
void Close() override
Definition: dual_csr.h:244
Definition: mutable_csr.h:1236
Any * props
Definition: types.h:331
virtual void OpenInMemory(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap)=0
CsrBase * GetOutCsr() override
Definition: dual_csr.h:567
void Open(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: dual_csr.h:522
void Open(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: dual_csr.h:161
void Warmup(int thread_num)
Definition: dual_csr.h:76
virtual void resize(vid_t vnum)=0
void Dump(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &new_snapshot_dir) override
Definition: dual_csr.h:184
virtual size_t edge_num() const =0
void BatchInitInMemory(const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree) override
Definition: dual_csr.h:512
const std::vector< PropertyType > & property_types_
Definition: dual_csr.h:641
Definition: immutable_csr.h:411
void IngestEdge(vid_t src, vid_t dst, grape::OutArchive &oarc, timestamp_t ts, Allocator &alloc) override
Definition: dual_csr.h:366
std::atomic< size_t > table_idx_
Definition: dual_csr.h:645
void Dump(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &new_snapshot_dir) override
Definition: dual_csr.h:351
TypedCsrBase< EDATA_T > * out_csr_
Definition: dual_csr.h:251
virtual CsrBase * GetInCsr()=0
const std::vector< std::string > & col_name_
Definition: dual_csr.h:640
Definition: mutable_csr.h:575
void next() override
Definition: mutable_csr.h:95
Definition: immutable_csr.h:494
DualCsr(EdgeStrategy oe_strategy, EdgeStrategy ie_strategy, uint16_t width, bool oe_mutable, bool ie_mutable)
Definition: dual_csr.h:257
CsrBase * GetOutCsr() override
Definition: dual_csr.h:193
const Record & AsRecord() const
Definition: types.h:691
CsrBase * GetOutCsr() override
Definition: dual_csr.h:362
Definition: mutable_csr.h:183
CsrBase * GetInCsr() override
Definition: dual_csr.h:192
Definition: csr_base.h:109
const CsrBase * GetInCsr() const override
Definition: dual_csr.h:568
void Dump(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &new_snapshot_dir) override
Definition: dual_csr.h:557
Definition: immutable_csr.h:680
void Close() override
Definition: dual_csr.h:429
~DualCsr()
Definition: dual_csr.h:293
virtual ~DualCsrBase()=default
void UpdateEdge(vid_t src, vid_t dst, const Any &data, timestamp_t ts, Allocator &alloc) override
Definition: dual_csr.h:380
Definition: immutable_csr.h:774
void BatchInit(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree) override
Definition: dual_csr.h:145
virtual void BatchInit(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree)=0
void BatchPutEdge(vid_t src, vid_t dst, const EDATA_T &data)
Definition: dual_csr.h:239
Definition: csr_base.h:143
size_t EdgeNum() const
Definition: dual_csr.h:81
Definition: mutable_csr.h:1170
Definition: mutable_csr.h:143
Definition: csr_base.h:118
std::string snapshot_dir(const std::string &work_dir, uint32_t version)
Definition: file_names.h:192
Definition: loading_config.h:232
Definition: mutable_csr.h:747
EdgeStrategy
Definition: types.h:24
void OpenInMemory(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap) override
Definition: dual_csr.h:168
~DualCsr()
Definition: dual_csr.h:487
void BatchInit(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree) override
Definition: dual_csr.h:501
void Open(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: dual_csr.h:323
void OpenWithHugepages(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap) override
Definition: dual_csr.h:176
void SortByEdgeData(timestamp_t ts) override
Definition: dual_csr.h:376
void Close() override
Definition: dual_csr.h:633
Definition: dual_csr.h:101
Definition: mutable_csr.h:956
void set_timestamp(timestamp_t ts)
Definition: mutable_csr.h:159
Definition: dual_csr.h:29
DualCsr(EdgeStrategy oe_strategy, EdgeStrategy ie_strategy, bool oe_mutable, bool ie_mutable)
Definition: dual_csr.h:103
void BatchInitInMemory(const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree) override
Definition: dual_csr.h:312
void UpdateEdge(vid_t src, vid_t dst, const Any &data, timestamp_t ts, Allocator &alloc) override
Definition: dual_csr.h:210
Table table_
Definition: dual_csr.h:646
virtual void BatchInitInMemory(const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree)=0
void Resize(vid_t src_vertex_num, vid_t dst_vertex_num)
Definition: dual_csr.h:71
Definition: mutable_csr.h:105
std::string_view AsStringView() const
Definition: types.h:657
const CsrBase * GetInCsr() const override
Definition: dual_csr.h:363
void UpdateEdge(vid_t src, vid_t dst, const Any &data, timestamp_t ts, Allocator &alloc) override
Definition: dual_csr.h:593
std::atomic< size_t > column_idx_
Definition: dual_csr.h:438
void OpenWithHugepages(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap) override
Definition: dual_csr.h:344
TypedCsrBase< RecordView > * out_csr_
Definition: dual_csr.h:644
const CsrBase * GetInCsr() const override
Definition: dual_csr.h:194
virtual void warmup(int thread_num) const =0
CsrBase * GetInCsr() override
Definition: dual_csr.h:361
void BatchPutEdge(vid_t src, vid_t dst, const std::string_view &data)
Definition: dual_csr.h:414