Go to the documentation of this file.
16 #ifndef STORAGES_RT_MUTABLE_GRAPH_CSR_IMMUTABLE_CSR_H_
17 #define STORAGES_RT_MUTABLE_GRAPH_CSR_IMMUTABLE_CSR_H_
21 template <
typename EDATA_T>
27 :
cur_(slice.begin()),
end_(slice.end()) {}
52 template <
typename EDATA_T>
58 size_t batch_init(
const std::string& name,
const std::string& work_dir,
59 const std::vector<int>& degree,
60 double reserve_ratio)
override {
61 size_t vnum = degree.size();
62 adj_lists_.open(work_dir +
"/" + name +
".adj",
true);
66 for (
auto d : degree) {
70 nbr_list_.open(work_dir +
"/" + name +
".nbr",
true);
77 for (
vid_t i = 0; i < vnum; ++i) {
94 double reserve_ratio)
override {
95 size_t vnum = degree.size();
100 for (
auto d : degree) {
111 for (
vid_t i = 0; i < vnum; ++i) {
136 for (
size_t i = 0; i != vnum; ++i) {
148 const std::string& work_dir)
override {
157 adj_lists_.open(work_dir +
"/" + name +
".adj",
true);
179 for (
size_t i = 0; i < old_degree_size; ++i) {
197 nbr_list_.open_with_hugepages(prefix +
".nbr");
205 for (
size_t i = 0; i < old_degree_size; ++i) {
220 void dump(
const std::string& name,
221 const std::string& new_snapshot_dir)
override {
222 dump_meta(new_snapshot_dir +
"/" + name);
226 fopen((new_snapshot_dir +
"/" + name +
".deg").c_str(),
"wb");
233 fopen((new_snapshot_dir +
"/" + name +
".nbr").c_str(),
"wb");
234 for (
size_t k = 0; k < vnum; ++k) {
244 void warmup(
int thread_num)
const override {}
250 for (
size_t k = old_size; k != vnum; ++k) {
264 for (
size_t i = 0; i <
adj_lists_.size(); ++i) {
271 return std::make_shared<ImmutableCsrConstEdgeIter<EDATA_T>>(
get_edges(v));
282 LOG(FATAL) <<
"Put single edge is not supported";
300 std::string meta_file_path = prefix +
".meta";
301 if (std::filesystem::exists(meta_file_path)) {
302 FILE* meta_file_fd = fopen(meta_file_path.c_str(),
"r");
305 fclose(meta_file_fd);
312 std::string meta_file_path = prefix +
".meta";
313 FILE* meta_file_fd = fopen((prefix +
".meta").c_str(),
"wb");
315 fflush(meta_file_fd);
316 fclose(meta_file_fd);
325 template <
typename EDATA_T>
334 size_t batch_init(
const std::string& name,
const std::string& work_dir,
335 const std::vector<int>& degree,
336 double reserve_ratio)
override {
337 size_t vnum = degree.size();
338 nbr_list_.open(work_dir +
"/" + name +
".snbr",
true);
340 for (
size_t k = 0; k != vnum; ++k) {
341 nbr_list_[k].neighbor = std::numeric_limits<vid_t>::max();
347 double reserve_ratio)
override {
348 size_t vnum = degree.size();
351 for (
size_t k = 0; k != vnum; ++k) {
352 nbr_list_[k].neighbor = std::numeric_limits<vid_t>::max();
359 CHECK_EQ(
nbr_list_[src].neighbor, std::numeric_limits<vid_t>::max());
367 return std::numeric_limits<timestamp_t>::max();
371 const std::string& work_dir)
override {
372 if (!std::filesystem::exists(work_dir +
"/" + name +
".snbr")) {
374 work_dir +
"/" + name +
".snbr");
376 nbr_list_.open(work_dir +
"/" + name +
".snbr",
true);
385 FILE* fin = fopen((prefix +
".snbr").c_str(),
"r");
386 CHECK_EQ(fread(
nbr_list_.data(),
sizeof(
nbr_t), old_size, fin), old_size);
388 for (
size_t k = old_size; k != v_cap; ++k) {
389 nbr_list_[k].neighbor = std::numeric_limits<vid_t>::max();
395 nbr_list_.open_with_hugepages(prefix +
".snbr", v_cap);
397 if (old_size < v_cap) {
399 for (
size_t k = old_size; k != v_cap; ++k) {
400 nbr_list_[k].neighbor = std::numeric_limits<vid_t>::max();
405 void dump(
const std::string& name,
406 const std::string& new_snapshot_dir)
override {
408 std::filesystem::exists(
nbr_list_.filename())) {
409 std::filesystem::create_hard_link(
410 nbr_list_.filename(), new_snapshot_dir +
"/" + name +
".snbr");
412 FILE* fp = fopen((new_snapshot_dir +
"/" + name +
".snbr").c_str(),
"wb");
419 void warmup(
int thread_num)
const override {
421 std::vector<std::thread> threads;
422 std::atomic<size_t> v_i(0);
423 std::atomic<size_t> output(0);
424 const size_t chunk = 4096;
425 for (
int i = 0; i < thread_num; ++i) {
426 threads.emplace_back([&]() {
429 size_t begin = std::min(v_i.fetch_add(chunk), vnum);
430 size_t end = std::min(begin + chunk, vnum);
434 while (begin < end) {
440 output.fetch_add(ret);
443 for (
auto& thrd : threads) {
446 (void) output.load();
453 for (
size_t k = old_size; k != vnum; ++k) {
454 nbr_list_[k].neighbor = std::numeric_limits<vid_t>::max();
465 for (
size_t i = 0; i <
nbr_list_.size(); ++i) {
466 if (
nbr_list_[i].neighbor != std::numeric_limits<vid_t>::max()) {
474 return std::make_shared<ImmutableCsrConstEdgeIter<EDATA_T>>(
get_edges(v));
488 CHECK_EQ(
nbr_list_[src].neighbor, std::numeric_limits<vid_t>::max());
496 nbr_list_[i].neighbor == std::numeric_limits<vid_t>::max() ? 0 : 1);
497 if (ret.
size() != 0) {
521 size_t batch_init(
const std::string& name,
const std::string& work_dir,
522 const std::vector<int>& degree,
523 double reserve_ratio)
override {
524 size_t vnum = degree.size();
525 nbr_list_.open(work_dir +
"/" + name +
".snbr",
true);
527 for (
size_t k = 0; k != vnum; ++k) {
528 nbr_list_[k].neighbor = std::numeric_limits<vid_t>::max();
534 double reserve_ratio)
override {
535 size_t vnum = degree.size();
538 for (
size_t k = 0; k != vnum; ++k) {
539 nbr_list_[k].neighbor = std::numeric_limits<vid_t>::max();
546 CHECK_EQ(
nbr_list_[src].neighbor, std::numeric_limits<vid_t>::max());
554 return std::numeric_limits<timestamp_t>::max();
558 const std::string& work_dir)
override {
559 if (!std::filesystem::exists(work_dir +
"/" + name +
".snbr")) {
561 work_dir +
"/" + name +
".snbr");
563 nbr_list_.open(work_dir +
"/" + name +
".snbr",
true);
572 FILE* fin = fopen((prefix +
".snbr").c_str(),
"r");
573 CHECK_EQ(fread(
nbr_list_.data(),
sizeof(
nbr_t), old_size, fin), old_size);
575 for (
size_t k = old_size; k != v_cap; ++k) {
576 nbr_list_[k].neighbor = std::numeric_limits<vid_t>::max();
582 nbr_list_.open_with_hugepages(prefix +
".snbr", v_cap);
584 if (old_size < v_cap) {
586 for (
size_t k = old_size; k != v_cap; ++k) {
587 nbr_list_[k].neighbor = std::numeric_limits<vid_t>::max();
592 void dump(
const std::string& name,
593 const std::string& new_snapshot_dir)
override {
595 std::filesystem::exists(
nbr_list_.filename())) {
596 std::filesystem::create_hard_link(
597 nbr_list_.filename(), new_snapshot_dir +
"/" + name +
".snbr");
599 FILE* fp = fopen((new_snapshot_dir +
"/" + name +
".snbr").c_str(),
"wb");
606 void warmup(
int thread_num)
const override {
608 std::vector<std::thread> threads;
609 std::atomic<size_t> v_i(0);
610 std::atomic<size_t> output(0);
611 const size_t chunk = 4096;
612 for (
int i = 0; i < thread_num; ++i) {
613 threads.emplace_back([&]() {
616 size_t begin = std::min(v_i.fetch_add(chunk), vnum);
617 size_t end = std::min(begin + chunk, vnum);
621 while (begin < end) {
627 output.fetch_add(ret);
630 for (
auto& thrd : threads) {
633 (void) output.load();
640 for (
size_t k = old_size; k != vnum; ++k) {
641 nbr_list_[k].neighbor = std::numeric_limits<vid_t>::max();
652 for (
size_t i = 0; i <
nbr_list_.size(); ++i) {
653 if (
nbr_list_[i].neighbor != std::numeric_limits<vid_t>::max()) {
661 return std::make_shared<ImmutableCsrConstEdgeIter<std::string_view>>(
676 CHECK_EQ(
nbr_list_[src].neighbor, std::numeric_limits<vid_t>::max());
684 nbr_list_[i].neighbor == std::numeric_limits<vid_t>::max() ? 0 : 1);
685 if (ret.
size() != 0) {
706 #endif // STORAGES_RT_MUTABLE_GRAPH_CSR_IMMUTABLE_CSR_H_
mmap_array< nbr_t > nbr_list_
Definition: immutable_csr.h:321
std::shared_ptr< CsrEdgeIterBase > edge_iter_mut(vid_t v) override
Definition: immutable_csr.h:481
const_nbr_ptr_t end_
Definition: immutable_csr.h:49
void open_with_hugepages(const std::string &prefix, size_t v_cap) override
Definition: immutable_csr.h:581
mmap_array< nbr_t * > adj_lists_
Definition: immutable_csr.h:319
Definition: immutable_csr.h:53
size_t batch_init(const std::string &name, const std::string &work_dir, const std::vector< int > &degree, double reserve_ratio) override
Definition: immutable_csr.h:58
void close() override
Definition: immutable_csr.h:292
uint32_t timestamp_t
Definition: types.h:30
void put_edge_with_index(vid_t src, vid_t dst, size_t data, timestamp_t ts, Allocator &) override
Definition: immutable_csr.h:673
void open(const std::string &name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: immutable_csr.h:147
void resize(vid_t vnum) override
Definition: immutable_csr.h:246
StringColumn & column_
Definition: immutable_csr.h:700
timestamp_t unsorted_since() const override
Definition: immutable_csr.h:145
timestamp_t get_timestamp() const override
Definition: immutable_csr.h:34
size_t size() const override
Definition: immutable_csr.h:461
void open(const std::string &name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: immutable_csr.h:370
SingleImmutableCsr(StringColumn &column)
Definition: immutable_csr.h:518
slice_t get_edges(vid_t v) const override
Definition: immutable_csr.h:285
void batch_sort_by_edge_data(timestamp_t ts) override
Definition: immutable_csr.h:364
void set_begin(const_nbr_ptr_t ptr)
Definition: nbr.h:112
void batch_put_edge(vid_t src, vid_t dst, const EDATA_T &data, timestamp_t ts) override
Definition: immutable_csr.h:357
CsrConstEdgeIterBase & operator+=(size_t offset) override
Definition: immutable_csr.h:37
SingleImmutableCsr()
Definition: immutable_csr.h:331
size_t batch_init_in_memory(const std::vector< int > &degree, double reserve_ratio) override
Definition: immutable_csr.h:346
uint32_t vid_t
Definition: types.h:31
const_nbr_ptr_t cur_
Definition: immutable_csr.h:48
vid_t neighbor
Definition: nbr.h:49
void resize(vid_t vnum) override
Definition: immutable_csr.h:636
void resize(size_t size)
Definition: mmap_array.h:319
~SingleImmutableCsr()
Definition: immutable_csr.h:332
void open_in_memory(const std::string &prefix, size_t v_cap) override
Definition: immutable_csr.h:566
typename ImmutableNbrSlice< EDATA_T >::const_nbr_ptr_t const_nbr_ptr_t
Definition: immutable_csr.h:23
void dump(const std::string &name, const std::string &new_snapshot_dir) override
Definition: immutable_csr.h:405
void dump_meta(const std::string &prefix) const
Definition: immutable_csr.h:311
Definition: adj_list.h:23
void load_meta(const std::string &prefix)
Definition: immutable_csr.h:299
mmap_array< nbr_t > nbr_list_
Definition: immutable_csr.h:701
size_t edge_num() const override
Definition: immutable_csr.h:463
Definition: immutable_csr.h:22
size_t batch_init(const std::string &name, const std::string &work_dir, const std::vector< int > &degree, double reserve_ratio) override
Definition: immutable_csr.h:521
timestamp_t unsorted_since_
Definition: immutable_csr.h:322
void open_in_memory(const std::string &prefix, size_t v_cap) override
Definition: immutable_csr.h:168
void put_edge(vid_t src, vid_t dst, const EDATA_T &data, timestamp_t ts, Allocator &) override
Definition: immutable_csr.h:485
int size() const
Definition: nbr.h:173
void open(const std::string &filename, bool sync_to_file=false)
Definition: mmap_array.h:129
Definition: allocators.h:29
void set_size(int size)
Definition: nbr.h:109
void close() override
Definition: immutable_csr.h:697
std::shared_ptr< CsrConstEdgeIterBase > edge_iter(vid_t v) const override
Definition: immutable_csr.h:270
size_t size() const override
Definition: immutable_csr.h:260
Definition: csr_base.h:27
T * data()
Definition: mmap_array.h:405
timestamp_t unsorted_since() const override
Definition: immutable_csr.h:366
CsrConstEdgeIterBase * edge_iter_raw(vid_t v) const override
Definition: immutable_csr.h:665
Definition: csr_base.h:127
size_t size() const
Definition: mmap_array.h:415
void dump(const std::string &name, const std::string &new_snapshot_dir) override
Definition: immutable_csr.h:220
void batch_sort_by_edge_data(timestamp_t ts) override
Definition: immutable_csr.h:134
Definition: immutable_csr.h:326
vid_t get_neighbor() const override
Definition: immutable_csr.h:30
CsrConstEdgeIterBase * edge_iter_raw(vid_t v) const override
Definition: immutable_csr.h:477
void open_in_memory(const std::string &prefix, size_t v_cap) override
Definition: immutable_csr.h:379
void batch_put_edge_with_index(vid_t src, vid_t dst, size_t data, timestamp_t ts) override
Definition: immutable_csr.h:544
EDATA_T data
Definition: nbr.h:50
size_t edge_num() const override
Definition: immutable_csr.h:262
CsrConstEdgeIterBase * edge_iter_raw(vid_t v) const override
Definition: immutable_csr.h:273
void batch_put_edge(vid_t src, vid_t dst, const EDATA_T &data, timestamp_t ts) override
Definition: immutable_csr.h:127
size_t edge_num() const override
Definition: immutable_csr.h:650
std::shared_ptr< CsrEdgeIterBase > edge_iter_mut(vid_t v) override
Definition: immutable_csr.h:276
mmap_array< int > degree_list_
Definition: immutable_csr.h:320
void dump(const std::string &name, const std::string &new_snapshot_dir) override
Definition: immutable_csr.h:592
std::shared_ptr< CsrEdgeIterBase > edge_iter_mut(vid_t v) override
Definition: immutable_csr.h:669
void set_size(int size)
Definition: nbr.h:172
ImmutableNbr< std::string_view > get_edge(vid_t i) const
Definition: immutable_csr.h:691
void open(const std::string &name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: immutable_csr.h:557
size_t size() const override
Definition: immutable_csr.h:648
void copy_file(const std::string &src, const std::string &dst)
Definition: file_names.h:80
void next() override
Definition: immutable_csr.h:36
void resize(vid_t vnum) override
Definition: immutable_csr.h:449
Definition: mmap_array.h:65
~ImmutableCsrConstEdgeIter()=default
std::string snapshot_dir(const std::string &work_dir, uint32_t version)
Definition: file_names.h:192
Definition: loading_config.h:232
timestamp_t unsorted_since() const override
Definition: immutable_csr.h:553
~SingleImmutableCsr()
Definition: immutable_csr.h:519
void warmup(int thread_num) const override
Definition: immutable_csr.h:606
void warmup(int thread_num) const override
Definition: immutable_csr.h:419
size_t batch_init_in_memory(const std::vector< int > &degree, double reserve_ratio) override
Definition: immutable_csr.h:533
mmap_array< nbr_t > nbr_list_
Definition: immutable_csr.h:508
void open_with_hugepages(const std::string &prefix, size_t v_cap) override
Definition: immutable_csr.h:194
Any get_data() const override
Definition: immutable_csr.h:31
void open_with_hugepages(const std::string &prefix, size_t v_cap) override
Definition: immutable_csr.h:394
void open_with_hugepages(const std::string &filename, size_t capacity=0)
Definition: mmap_array.h:214
void close() override
Definition: immutable_csr.h:503
const nbr_t & get_edge(vid_t i) const
Definition: immutable_csr.h:505
std::shared_ptr< CsrConstEdgeIterBase > edge_iter(vid_t v) const override
Definition: immutable_csr.h:660
void batch_sort_by_edge_data(timestamp_t ts) override
Definition: immutable_csr.h:551
bool is_valid() const override
Definition: immutable_csr.h:44
void set_begin(const ImmutableNbr< size_t > *ptr)
Definition: nbr.h:175
ImmutableCsrConstEdgeIter(const ImmutableNbrSlice< EDATA_T > &slice)
Definition: immutable_csr.h:26
int size() const
Definition: nbr.h:110
size_t batch_init(const std::string &name, const std::string &work_dir, const std::vector< int > &degree, double reserve_ratio) override
Definition: immutable_csr.h:334
void reset()
Definition: mmap_array.h:84
slice_t get_edges(vid_t i) const override
Definition: immutable_csr.h:493
size_t size() const override
Definition: immutable_csr.h:45
void put_edge(vid_t src, vid_t dst, const EDATA_T &data, timestamp_t ts, Allocator &alloc) override
Definition: immutable_csr.h:280
void warmup(int thread_num) const override
Definition: immutable_csr.h:244
std::shared_ptr< CsrConstEdgeIterBase > edge_iter(vid_t v) const override
Definition: immutable_csr.h:473
size_t batch_init_in_memory(const std::vector< int > &degree, double reserve_ratio) override
Definition: immutable_csr.h:93
slice_t get_edges(vid_t i) const override
Definition: immutable_csr.h:681