Go to the documentation of this file.
16 #ifndef STORAGES_RT_MUTABLE_GRAPH_CSR_MUTABLE_CSR_H_
17 #define STORAGES_RT_MUTABLE_GRAPH_CSR_MUTABLE_CSR_H_
21 #include "grape/utils/concurrent_queue.h"
29 void read_file(
const std::string& filename,
void* buffer,
size_t size,
32 void write_file(
const std::string& filename,
const void* buffer,
size_t size,
35 template <
typename EDATA_T>
41 :
cur_(slice.begin()),
end_(slice.end()) {}
66 template <
typename EDATA_T>
72 :
cur_(slice.begin()),
end_(slice.end()) {}
110 :
cur_(slice.begin()),
end_(slice.end()) {}
148 :
cur_(slice.begin()),
end_(slice.end()) {}
182 template <
typename EDATA_T>
197 size_t batch_init(
const std::string& name,
const std::string& work_dir,
198 const std::vector<int>& degree,
199 double reserve_ratio)
override {
200 reserve_ratio = std::max(reserve_ratio, 1.0);
201 size_t vnum = degree.size();
202 adj_lists_.open(work_dir +
"/" + name +
".adj",
true);
205 locks_ =
new grape::SpinLock[vnum];
208 for (
auto d : degree) {
209 edge_num += (std::ceil(d * reserve_ratio));
211 nbr_list_.open(work_dir +
"/" + name +
".nbr",
true);
215 for (
vid_t i = 0; i < vnum; ++i) {
217 int cap = std::ceil(deg * reserve_ratio);
227 double reserve_ratio)
override {
228 reserve_ratio = std::max(reserve_ratio, 1.0);
229 size_t vnum = degree.size();
233 locks_ =
new grape::SpinLock[vnum];
236 for (
auto d : degree) {
237 edge_num += (std::ceil(d * reserve_ratio));
243 for (
vid_t i = 0; i < vnum; ++i) {
245 int cap = std::ceil(deg * reserve_ratio);
256 adj_lists_[src].batch_put_edge(dst, data, ts);
261 for (
size_t i = 0; i != vnum; ++i) {
274 const std::string& work_dir)
override {
279 if (std::filesystem::exists(
snapshot_dir +
"/" + name +
".cap")) {
286 nbr_list_.touch(work_dir +
"/" + name +
".nbr");
287 adj_lists_.open(work_dir +
"/" + name +
".adj",
true);
290 locks_ =
new grape::SpinLock[degree_list.
size()];
293 for (
size_t i = 0; i < degree_list.
size(); ++i) {
294 int degree = degree_list[i];
295 int cap = (*cap_list)[i];
299 if (cap_list != &degree_list) {
306 degree_list.
open(prefix +
".deg",
false);
309 if (std::filesystem::exists(prefix +
".cap")) {
311 cap_list->
open(prefix +
".cap",
false);
317 v_cap = std::max(v_cap, degree_list.
size());
319 locks_ =
new grape::SpinLock[v_cap];
322 for (
size_t i = 0; i < degree_list.
size(); ++i) {
323 int degree = degree_list[i];
324 int cap = (*cap_list)[i];
328 for (
size_t i = degree_list.
size(); i < v_cap; ++i) {
332 if (cap_list != &degree_list) {
339 degree_list.
open(prefix +
".deg",
false);
342 if (std::filesystem::exists(prefix +
".cap")) {
344 cap_list->
open(prefix +
".cap",
false);
347 nbr_list_.open_with_hugepages(prefix +
".nbr");
350 v_cap = std::max(v_cap, degree_list.
size());
353 locks_ =
new grape::SpinLock[v_cap];
356 for (
size_t i = 0; i < degree_list.
size(); ++i) {
357 int degree = degree_list[i];
358 int cap = (*cap_list)[i];
362 for (
size_t i = degree_list.
size(); i < v_cap; ++i) {
366 if (cap_list != &degree_list) {
371 void dump(
const std::string& name,
372 const std::string& new_snapshot_dir)
override {
374 bool reuse_nbr_list =
true;
375 dump_meta(new_snapshot_dir +
"/" + name);
377 std::vector<int> cap_list;
378 degree_list.
open(new_snapshot_dir +
"/" + name +
".deg",
true);
380 cap_list.resize(vnum);
381 bool need_cap_list =
false;
383 for (
size_t i = 0; i < vnum; ++i) {
387 reuse_nbr_list =
false;
394 if (degree_list[i] != cap_list[i]) {
395 need_cap_list =
true;
400 write_file(new_snapshot_dir +
"/" + name +
".cap", cap_list.data(),
401 sizeof(
int), cap_list.size());
404 if (reuse_nbr_list && !
nbr_list_.filename().empty() &&
405 std::filesystem::exists(
nbr_list_.filename())) {
406 std::error_code errorCode;
407 std::filesystem::create_hard_link(
nbr_list_.filename(),
408 new_snapshot_dir +
"/" + name +
".nbr",
411 std::stringstream ss;
412 ss <<
"Failed to create hard link from " <<
nbr_list_.filename()
413 <<
" to " << new_snapshot_dir +
"/" + name +
".snbr"
414 <<
", error code: " << errorCode <<
" " << errorCode.message();
415 LOG(ERROR) << ss.str();
416 throw std::runtime_error(ss.str());
420 fopen((new_snapshot_dir +
"/" + name +
".nbr").c_str(),
"wb");
421 std::string filename = new_snapshot_dir +
"/" + name +
".nbr";
422 if (fout ==
nullptr) {
423 std::stringstream ss;
424 ss <<
"Failed to open nbr list " << filename <<
", " << strerror(errno);
425 LOG(ERROR) << ss.str();
426 throw std::runtime_error(ss.str());
429 for (
size_t i = 0; i < vnum; ++i) {
433 static_cast<size_t>(
adj_lists_[i].capacity())) {
434 std::stringstream ss;
435 ss <<
"Failed to write nbr list " << filename <<
", expected "
436 <<
adj_lists_[i].capacity() <<
", got " << ret <<
", "
438 LOG(ERROR) << ss.str();
439 throw std::runtime_error(ss.str());
443 if ((ret = fflush(fout)) != 0) {
444 std::stringstream ss;
445 ss <<
"Failed to flush nbr list " << filename <<
", error code: " << ret
446 <<
" " << strerror(errno);
447 LOG(ERROR) << ss.str();
448 throw std::runtime_error(ss.str());
450 if ((ret = fclose(fout)) != 0) {
451 std::stringstream ss;
452 ss <<
"Failed to close nbr list " << filename <<
", error code: " << ret
453 <<
" " << strerror(errno);
454 LOG(ERROR) << ss.str();
455 throw std::runtime_error(ss.str());
460 void warmup(
int thread_num)
const override {
462 std::vector<std::thread> threads;
463 std::atomic<size_t> v_i(0);
464 const size_t chunk = 4096;
465 std::atomic<size_t> output(0);
466 for (
int i = 0; i < thread_num; ++i) {
467 threads.emplace_back([&]() {
470 size_t begin = std::min(v_i.fetch_add(chunk), vnum);
471 size_t end = std::min(begin + chunk, vnum);
477 while (begin < end) {
479 for (
auto& nbr : adj_list) {
485 output.fetch_add(ret);
488 for (
auto& thrd : threads) {
491 (void) output.load();
498 for (
size_t k = old_size; k != vnum; ++k) {
502 locks_ =
new grape::SpinLock[vnum];
511 for (
size_t i = 0; i <
adj_lists_.size(); ++i) {
518 return std::make_shared<MutableCsrConstEdgeIter<EDATA_T>>(
get_edges(v));
524 return std::make_shared<MutableCsrEdgeIter<EDATA_T>>(
get_edges_mut(v));
531 adj_lists_[src].put_edge(dst, data, ts, alloc);
554 std::string meta_file_path = prefix +
".meta";
555 if (std::filesystem::exists(meta_file_path)) {
564 std::string meta_file_path = prefix +
".meta";
586 size_t batch_init(
const std::string& name,
const std::string& work_dir,
587 const std::vector<int>& degree,
588 double reserve_ratio)
override {
589 return csr_.batch_init(name, work_dir, degree, reserve_ratio);
593 double reserve)
override {
594 return csr_.batch_init_in_memory(degree, reserve);
599 csr_.batch_put_edge(src, dst, data, ts);
603 const std::string& work_dir)
override {
608 csr_.open_in_memory(prefix, v_cap);
611 void dump(
const std::string& name,
612 const std::string& new_snapshot_dir)
override {
613 csr_.dump(name, new_snapshot_dir);
616 void warmup(
int thread_num)
const override { csr_.warmup(thread_num); }
620 size_t size()
const override {
return csr_.size(); }
622 size_t edge_num()
const override {
return csr_.edge_num(); }
625 return std::make_shared<MutableCsrConstEdgeIter<std::string_view>>(
633 return std::make_shared<MutableCsrEdgeIter<std::string_view>>(
639 csr_.put_edge(src, dst, data, ts, alloc);
644 csr_.put_edge(src, dst, index, ts, alloc);
648 return slice_t(csr_.get_edges(i), column_);
652 return mut_slice_t(csr_.get_edges_mut(i), column_);
655 void close()
override { csr_.close(); }
672 size_t batch_init(
const std::string& name,
const std::string& work_dir,
673 const std::vector<int>& degree,
674 double reserve_ratio = 1.2)
override {
675 return csr_.batch_init(name, work_dir, degree, reserve_ratio);
679 double reserve_ratio = 1.2)
override {
680 return csr_.batch_init_in_memory(degree, reserve_ratio);
685 csr_.batch_put_edge(src, dst, data, ts);
689 const std::string& work_dir)
override {
694 csr_.open_in_memory(prefix, v_cap);
697 void dump(
const std::string& name,
698 const std::string& new_snapshot_dir)
override {
699 csr_.dump(name, new_snapshot_dir);
702 void warmup(
int thread_num)
const override { csr_.warmup(thread_num); }
706 size_t size()
const override {
return csr_.size(); }
708 size_t edge_num()
const override {
return csr_.edge_num(); }
711 return std::make_shared<MutableCsrConstEdgeIter<RecordView>>(
get_edges(v));
718 return std::make_shared<MutableCsrEdgeIter<RecordView>>(
get_edges_mut(v));
723 csr_.put_edge(src, dst, data, ts, alloc);
728 csr_.put_edge(src, dst, index, ts, alloc);
732 return slice_t(csr_.get_edges(i), table_);
739 void close()
override { csr_.close(); }
746 template <
typename EDATA_T>
756 size_t batch_init(
const std::string& name,
const std::string& work_dir,
757 const std::vector<int>& degree,
758 double reserve_ratio)
override {
759 size_t vnum = degree.size();
760 nbr_list_.open(work_dir +
"/" + name +
".snbr",
true);
762 for (
size_t k = 0; k != vnum; ++k) {
763 nbr_list_[k].timestamp.store(std::numeric_limits<timestamp_t>::max());
769 double reserve_ratio)
override {
770 size_t vnum = degree.size();
773 for (
size_t k = 0; k != vnum; ++k) {
774 nbr_list_[k].timestamp.store(std::numeric_limits<timestamp_t>::max());
783 CHECK_EQ(
nbr_list_[src].timestamp.load(),
784 std::numeric_limits<timestamp_t>::max());
791 return std::numeric_limits<timestamp_t>::max();
795 const std::string& work_dir)
override {
796 if (!std::filesystem::exists(work_dir +
"/" + name +
".snbr")) {
798 work_dir +
"/" + name +
".snbr");
800 nbr_list_.open(work_dir +
"/" + name +
".snbr",
true);
810 for (
size_t k = old_size; k != v_cap; ++k) {
811 nbr_list_[k].timestamp.store(std::numeric_limits<timestamp_t>::max());
817 nbr_list_.open_with_hugepages(prefix +
".snbr", v_cap);
819 if (old_size < v_cap) {
821 for (
size_t k = old_size; k != v_cap; ++k) {
822 nbr_list_[k].timestamp.store(std::numeric_limits<timestamp_t>::max());
827 void dump(
const std::string& name,
828 const std::string& new_snapshot_dir)
override {
830 std::filesystem::exists(
nbr_list_.filename()))) {
831 std::error_code errorCode;
832 std::filesystem::create_hard_link(
nbr_list_.filename(),
833 new_snapshot_dir +
"/" + name +
".snbr",
836 std::stringstream ss;
837 ss <<
"Failed to create hard link from " <<
nbr_list_.filename()
838 <<
" to " << new_snapshot_dir +
"/" + name +
".snbr"
839 <<
", error code: " << errorCode <<
" " << errorCode.message();
840 LOG(ERROR) << ss.str();
841 throw std::runtime_error(ss.str());
849 void warmup(
int thread_num)
const override {
851 std::vector<std::thread> threads;
852 std::atomic<size_t> v_i(0);
853 std::atomic<size_t> output(0);
854 const size_t chunk = 4096;
855 for (
int i = 0; i < thread_num; ++i) {
856 threads.emplace_back([&]() {
859 size_t begin = std::min(v_i.fetch_add(chunk), vnum);
860 size_t end = std::min(begin + chunk, vnum);
864 while (begin < end) {
870 output.fetch_add(ret);
873 for (
auto& thrd : threads) {
876 (void) output.load();
883 for (
size_t k = old_size; k != vnum; ++k) {
884 nbr_list_[k].timestamp.store(std::numeric_limits<timestamp_t>::max());
895 for (
size_t k = 0; k !=
nbr_list_.size(); ++k) {
897 std::numeric_limits<timestamp_t>::max()) {
905 return std::make_shared<MutableCsrConstEdgeIter<EDATA_T>>(
get_edges(v));
911 return std::make_shared<MutableCsrEdgeIter<EDATA_T>>(
get_edges_mut(v));
919 CHECK_EQ(
nbr_list_[src].timestamp, std::numeric_limits<timestamp_t>::max());
926 std::numeric_limits<timestamp_t>::max()
929 if (ret.
size() != 0) {
938 std::numeric_limits<timestamp_t>::max()
941 if (ret.
size() != 0) {
966 size_t batch_init(
const std::string& name,
const std::string& work_dir,
967 const std::vector<int>& degree,
968 double reserve_ratio)
override {
969 return csr_.batch_init(name, work_dir, degree, reserve_ratio);
973 double reserve_ratio)
override {
974 return csr_.batch_init_in_memory(degree, reserve_ratio);
979 csr_.batch_put_edge(src, dst, data, ts);
985 return std::numeric_limits<timestamp_t>::max();
989 const std::string& work_dir)
override {
994 csr_.open_in_memory(prefix, v_cap);
997 void dump(
const std::string& name,
998 const std::string& new_snapshot_dir)
override {
999 csr_.dump(name, new_snapshot_dir);
1002 void warmup(
int thread_num)
const override { csr_.warmup(thread_num); }
1006 size_t size()
const override {
return csr_.size(); }
1008 size_t edge_num()
const override {
return csr_.edge_num(); }
1011 return std::make_shared<MutableCsrConstEdgeIter<std::string_view>>(
1020 return std::make_shared<MutableCsrEdgeIter<std::string_view>>(
1026 csr_.put_edge(src, dst, data, ts, alloc);
1031 put_edge(src, dst, index, ts, alloc);
1035 auto ret = csr_.get_edges(i);
1040 auto ret = csr_.get_edges_mut(i);
1046 auto nbr_tmp = csr_.get_edge(i);
1048 nbr.
timestamp.store(nbr_tmp.timestamp.load());
1049 nbr.
data = column_.get_view(nbr_tmp.data);
1070 size_t batch_init(
const std::string& name,
const std::string& work_dir,
1071 const std::vector<int>& degree,
1072 double reserve_ratio)
override {
1073 return csr_.batch_init(name, work_dir, degree, reserve_ratio);
1077 double reserve_ratio)
override {
1078 return csr_.batch_init_in_memory(degree, reserve_ratio);
1083 csr_.batch_put_edge(src, dst, data, ts);
1089 return std::numeric_limits<timestamp_t>::max();
1093 const std::string& work_dir)
override {
1098 csr_.open_in_memory(prefix, v_cap);
1102 const std::string& new_snapshot_dir)
override {
1103 csr_.dump(name, new_snapshot_dir);
1106 void warmup(
int thread_num)
const override { csr_.warmup(thread_num); }
1110 size_t size()
const override {
return csr_.size(); }
1112 size_t edge_num()
const override {
return csr_.edge_num(); }
1115 return std::make_shared<MutableCsrConstEdgeIter<RecordView>>(
get_edges(v));
1123 return std::make_shared<MutableCsrEdgeIter<RecordView>>(
get_edges_mut(v));
1128 csr_.put_edge(src, dst, data, ts, alloc);
1133 put_edge(src, dst, index, ts, alloc);
1137 auto ret = csr_.get_edges(i);
1142 auto ret = csr_.get_edges_mut(i);
1158 auto nbr = csr_.get_edge(i);
1159 return RecordNbr(&nbr, table_);
1169 template <
typename EDATA_T>
1177 size_t batch_init(
const std::string& name,
const std::string& work_dir,
1178 const std::vector<int>& degree,
1179 double reserve_ratio)
override {
1184 double reserve_ratio)
override {
1189 const std::string& work_dir)
override {}
1196 const std::string& new_snapshot_dir)
override {}
1198 void warmup(
int thread_num)
const override {}
1202 size_t size()
const override {
return 0; }
1212 return std::make_shared<MutableCsrConstEdgeIter<EDATA_T>>(
1220 return std::make_shared<MutableCsrEdgeIter<EDATA_T>>(
1227 return std::numeric_limits<timestamp_t>::max();
1244 size_t batch_init(
const std::string& name,
const std::string& work_dir,
1245 const std::vector<int>& degree,
1246 double reserve_ratio)
override {
1250 double reserve_ratio)
override {
1255 const std::string& work_dir)
override {}
1260 const std::string& new_snapshot_dir)
override {}
1262 void warmup(
int thread_num)
const override {}
1266 size_t size()
const override {
return 0; }
1275 return std::make_shared<MutableCsrConstEdgeIter<std::string_view>>(
1283 return std::make_shared<MutableCsrEdgeIter<std::string_view>>(
1304 size_t batch_init(
const std::string& name,
const std::string& work_dir,
1305 const std::vector<int>& degree,
1306 double reserve_ratio)
override {
1311 double reserve_ratio)
override {
1316 const std::string& work_dir)
override {}
1321 const std::string& new_snapshot_dir)
override {}
1323 void warmup(
int thread_num)
const override {}
1327 size_t size()
const override {
return 0; }
1336 return std::make_shared<MutableCsrConstEdgeIter<RecordView>>(
1344 return std::make_shared<MutableCsrEdgeIter<RecordView>>(
1360 #endif // STORAGES_RT_MUTABLE_GRAPH_CSR_MUTABLE_CSR_H_
timestamp_t get_timestamp() const override
Definition: mutable_csr.h:117
MutableCsr< size_t > csr_
Definition: mutable_csr.h:659
void batch_sort_by_edge_data(timestamp_t ts) override
Definition: mutable_csr.h:1086
void close() override
Definition: mutable_csr.h:1053
bool is_valid() const override
Definition: mutable_csr.h:96
void open_in_memory(const std::string &prefix, size_t v_cap) override
Definition: mutable_csr.h:993
size_t batch_init(const std::string &name, const std::string &work_dir, const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:1070
void dump(const std::string &name, const std::string &new_snapshot_dir) override
Definition: mutable_csr.h:697
MutableCsr()
Definition: mutable_csr.h:190
void set_timestamp(timestamp_t ts)
Definition: mutable_csr.h:123
CsrEdgeIterBase & operator+=(size_t offset) override
Definition: mutable_csr.h:125
~SingleMutableCsr()
Definition: mutable_csr.h:964
void open_with_hugepages(const std::string &prefix, size_t v_cap) override
Definition: mutable_csr.h:1193
void set_size(int size)
Definition: nbr.h:503
~SingleMutableCsr()
Definition: mutable_csr.h:754
void dump(const std::string &name, const std::string &new_snapshot_dir) override
Definition: mutable_csr.h:611
void put_edge_with_index(vid_t src, vid_t dst, size_t index, timestamp_t ts, Allocator &alloc) override
Definition: mutable_csr.h:726
size_t batch_init_in_memory(const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:1249
std::shared_ptr< CsrEdgeIterBase > edge_iter_mut(vid_t v) override
Definition: mutable_csr.h:717
void read_file(const std::string &filename, void *buffer, size_t size, size_t num)
Definition: mutable_csr.cc:23
CsrConstEdgeIterBase * edge_iter_raw(vid_t v) const override
Definition: mutable_csr.h:907
mut_slice_t get_edges_mut(vid_t i)
Definition: mutable_csr.h:1039
timestamp_t get_timestamp() const
Definition: nbr.h:275
MutableCsr< size_t > csr_
Definition: mutable_csr.h:743
Any get_data() const override
Definition: mutable_csr.h:114
std::shared_ptr< CsrConstEdgeIterBase > edge_iter(vid_t v) const override
Definition: mutable_csr.h:710
EmptyCsr(StringColumn &column)
Definition: mutable_csr.h:1241
void resize(vid_t vnum) override
Definition: mutable_csr.h:1264
size_t edge_num() const override
Definition: mutable_csr.h:1268
nbr_t * cur_
Definition: mutable_csr.h:100
CsrConstEdgeIterBase * edge_iter_raw(vid_t v) const override
Definition: mutable_csr.h:1118
void batch_sort_by_edge_data(timestamp_t ts) override
Definition: mutable_csr.h:259
bool is_valid() const override
Definition: mutable_csr.h:134
size_t size() const override
Definition: mutable_csr.h:1202
vid_t get_neighbor() const override
Definition: mutable_csr.h:44
std::shared_ptr< CsrEdgeIterBase > edge_iter_mut(vid_t v) override
Definition: mutable_csr.h:1019
size_t size() const override
Definition: mutable_csr.h:620
void set_data(const EDATA_T &val, timestamp_t ts)
Definition: nbr.h:277
void put_edge(vid_t src, vid_t dst, size_t data, timestamp_t ts, Allocator &alloc)
Definition: mutable_csr.h:721
std::shared_ptr< CsrConstEdgeIterBase > edge_iter(vid_t v) const override
Definition: mutable_csr.h:1010
typename MutableNbrSlice< EDATA_T >::const_nbr_ptr_t const_nbr_ptr_t
Definition: mutable_csr.h:37
void warmup(int thread_num) const override
Definition: mutable_csr.h:702
size_t batch_init_in_memory(const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:1310
void put_edge_with_index(vid_t src, vid_t dst, size_t index, timestamp_t ts, Allocator &alloc) override
Definition: mutable_csr.h:1331
void open_in_memory(const std::string &prefix, size_t v_cap) override
Definition: mutable_csr.h:607
uint32_t timestamp_t
Definition: types.h:30
size_t batch_init(const std::string &name, const std::string &work_dir, const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:1177
void set_size(int size)
Definition: nbr.h:327
nbr_ptr_t end_
Definition: mutable_csr.h:139
size_t edge_num() const override
Definition: mutable_csr.h:893
void put_edge(vid_t src, vid_t dst, const EDATA_T &data, timestamp_t ts, Allocator &alloc) override
Definition: mutable_csr.h:914
void close() override
Definition: mutable_csr.h:1162
MutableNbrSlice< EDATA_T > slice_t
Definition: mutable_csr.h:187
size_t batch_init(const std::string &name, const std::string &work_dir, const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:586
size_t batch_init(const std::string &name, const std::string &work_dir, const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:966
timestamp_t unsorted_since() const override
Definition: mutable_csr.h:790
size_t size() const override
Definition: mutable_csr.h:706
void open(const std::string &name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: mutable_csr.h:1188
void set_begin(const_nbr_ptr_t ptr)
Definition: nbr.h:330
void next() override
Definition: mutable_csr.h:133
nbr_t * end_
Definition: mutable_csr.h:101
void batch_put_edge_with_index(vid_t src, vid_t dst, size_t data, timestamp_t ts) override
Definition: mutable_csr.h:597
std::shared_ptr< CsrEdgeIterBase > edge_iter_mut(vid_t v) override
Definition: mutable_csr.h:632
size_t size() const override
Definition: mutable_csr.h:891
CsrEdgeIterBase & operator+=(size_t offset) override
Definition: mutable_csr.h:86
mut_slice_t get_edges_mut(vid_t i)
Definition: mutable_csr.h:735
void dump_meta(const std::string &prefix) const
Definition: mutable_csr.h:563
void write_file(const std::string &filename, const void *buffer, size_t size, size_t num)
Definition: mutable_csr.cc:50
vid_t get_neighbor() const override
Definition: mutable_csr.h:151
const nbr_t & get_edge(vid_t i) const
Definition: mutable_csr.h:947
MutableCsrEdgeIter(MutableNbrSliceMut< EDATA_T > slice)
Definition: mutable_csr.h:71
Any get_data() const override
Definition: mutable_csr.h:45
typename MutableNbrSliceMut< std::string_view >::nbr_ptr_t nbr_ptr_t
Definition: mutable_csr.h:106
uint32_t vid_t
Definition: types.h:31
Definition: mutable_csr.h:36
void batch_sort_by_edge_data(timestamp_t ts) override
Definition: mutable_csr.h:1224
MutableNbrSliceMut< EDATA_T > mut_slice_t
Definition: mutable_csr.h:188
void dump(const std::string &name, const std::string &new_snapshot_dir) override
Definition: mutable_csr.h:1101
SingleMutableCsr< size_t > csr_
Definition: mutable_csr.h:1166
void close() override
Definition: mutable_csr.h:655
void warmup(int thread_num) const override
Definition: mutable_csr.h:460
void close() override
Definition: mutable_csr.h:1352
Table table_
Definition: mutable_csr.h:1154
void resize(size_t size)
Definition: mmap_array.h:319
void open(const std::string &name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: mutable_csr.h:794
void batch_put_edge_with_index(vid_t src, vid_t dst, size_t data, timestamp_t ts=0) override
Definition: mutable_csr.h:683
void close() override
Definition: mutable_csr.h:1291
slice_t get_edges(vid_t i) const override
Definition: mutable_csr.h:1136
SingleMutableCsr()
Definition: mutable_csr.h:753
void batch_put_edge(vid_t src, vid_t dst, const EDATA_T &data, timestamp_t ts=0) override
Definition: mutable_csr.h:1206
void put_edge_with_index(vid_t src, vid_t dst, size_t index, timestamp_t ts, Allocator &alloc) override
Definition: mutable_csr.h:1029
void open_in_memory(const std::string &prefix, size_t v_cap) override
Definition: mutable_csr.h:1191
int size() const
Definition: nbr.h:504
const nbr_t * ptr_
Definition: mutable_csr.h:1153
Definition: csr_base.h:43
static void to(const Any &value, T &out)
Definition: types.h:815
bool is_valid() const override
Definition: mutable_csr.h:175
slice_t get_edges(vid_t i) const override
Definition: mutable_csr.h:731
void put_edge(vid_t src, vid_t dst, const EDATA_T &data, timestamp_t ts, Allocator &alloc) override
Definition: mutable_csr.h:527
void resize(vid_t vnum) override
Definition: mutable_csr.h:1108
Definition: adj_list.h:33
Definition: adj_list.h:23
SingleMutableCsr(StringColumn &column)
Definition: mutable_csr.h:963
void batch_put_edge_with_index(vid_t src, vid_t dst, size_t data, timestamp_t ts) override
Definition: mutable_csr.h:977
void put_edge(vid_t src, vid_t dst, size_t data, timestamp_t ts, Allocator &alloc)
Definition: mutable_csr.h:1126
typename MutableNbrSliceMut< RecordView >::nbr_ptr_t nbr_ptr_t
Definition: mutable_csr.h:144
void dump(const std::string &name, const std::string &new_snapshot_dir) override
Definition: mutable_csr.h:1259
mut_slice_t get_edges_mut(vid_t i)
Definition: mutable_csr.h:539
std::atomic< timestamp_t > timestamp
Definition: nbr.h:285
nbr_ptr_t end_
Definition: mutable_csr.h:180
RecordNbr(const nbr_t *ptr, Table &table)
Definition: mutable_csr.h:1148
const_nbr_ptr_t cur_
Definition: mutable_csr.h:62
void warmup(int thread_num) const override
Definition: mutable_csr.h:616
slice_t get_edges(vid_t v) const override
Definition: mutable_csr.h:923
void batch_sort_by_edge_data(timestamp_t ts) override
Definition: mutable_csr.h:788
size_t size() const override
Definition: mutable_csr.h:1006
Table & table_
Definition: mutable_csr.h:742
void open(const std::string &filename, bool sync_to_file=false)
Definition: mmap_array.h:129
MutableCsrEdgeIter(MutableNbrSliceMut< std::string_view > slice)
Definition: mutable_csr.h:109
size_t size() const override
Definition: mutable_csr.h:1327
Definition: allocators.h:29
size_t edge_num() const override
Definition: mutable_csr.h:1329
RecordNbr get_edge(vid_t i) const
Definition: mutable_csr.h:1157
~SingleMutableCsr()
Definition: mutable_csr.h:1068
vid_t get_neighbor() const
Definition: nbr.h:274
void batch_put_edge(vid_t src, vid_t dst, const EDATA_T &data, timestamp_t ts) override
Definition: mutable_csr.h:254
mmap_array< nbr_t > nbr_list_
Definition: mutable_csr.h:570
void set_data(const Any &value, timestamp_t ts) override
Definition: mutable_csr.h:119
std::shared_ptr< CsrEdgeIterBase > edge_iter_mut(vid_t v) override
Definition: mutable_csr.h:910
size_t edge_num() const override
Definition: mutable_csr.h:1204
CsrConstEdgeIterBase * edge_iter_raw(vid_t v) const override
Definition: mutable_csr.h:1215
std::shared_ptr< CsrEdgeIterBase > edge_iter_mut(vid_t v) override
Definition: mutable_csr.h:1122
void resize(vid_t vnum) override
Definition: mutable_csr.h:879
size_t batch_init(const std::string &name, const std::string &work_dir, const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:1304
void batch_put_edge_with_index(vid_t src, vid_t dst, size_t data, timestamp_t ts=0) override
Definition: mutable_csr.h:1333
~MutableCsrConstEdgeIter()=default
CsrConstEdgeIterBase * edge_iter_raw(vid_t v) const override
Definition: mutable_csr.h:629
void put_edge(vid_t src, vid_t dst, size_t data, timestamp_t ts, Allocator &alloc)
Definition: mutable_csr.h:637
void put_edge(vid_t src, vid_t dst, size_t data, timestamp_t ts, Allocator &alloc)
Definition: mutable_csr.h:1024
void open_in_memory(const std::string &prefix, size_t v_cap) override
Definition: mutable_csr.h:693
MutableCsr(StringColumn &column)
Definition: mutable_csr.h:583
void put_edge_with_index(vid_t src, vid_t dst, size_t index, timestamp_t ts, Allocator &alloc) override
Definition: mutable_csr.h:642
void set_timestamp(timestamp_t ts)
Definition: nbr.h:282
CsrConstEdgeIterBase * edge_iter_raw(vid_t v) const override
Definition: mutable_csr.h:1015
timestamp_t unsorted_since() const override
Definition: mutable_csr.h:984
SingleMutableCsr(Table &table)
Definition: mutable_csr.h:1067
grape::SpinLock * locks_
Definition: mutable_csr.h:568
void warmup(int thread_num) const override
Definition: mutable_csr.h:1106
size_t batch_init_in_memory(const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:972
EDATA_T data
Definition: nbr.h:286
CsrConstEdgeIterBase * edge_iter_raw(vid_t v) const override
Definition: mutable_csr.h:520
slice_t get_edges(vid_t v) const override
Definition: mutable_csr.h:1230
Definition: csr_base.h:27
size_t batch_init_in_memory(const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:1183
void put_edge_with_index(vid_t src, vid_t dst, size_t index, timestamp_t ts, Allocator &alloc) override
Definition: mutable_csr.h:1131
std::shared_ptr< CsrConstEdgeIterBase > edge_iter(vid_t v) const override
Definition: mutable_csr.h:904
void load_meta(const std::string &prefix)
Definition: mutable_csr.h:553
std::shared_ptr< CsrConstEdgeIterBase > edge_iter(vid_t v) const override
Definition: mutable_csr.h:517
void dump(const std::string &name, const std::string &new_snapshot_dir) override
Definition: mutable_csr.h:827
void open(const std::string &name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: mutable_csr.h:1315
timestamp_t unsorted_since() const override
Definition: mutable_csr.h:1226
slice_t get_edges(vid_t i) const override
Definition: mutable_csr.h:647
void open_in_memory(const std::string &prefix, size_t v_cap) override
Definition: mutable_csr.h:1097
size_t batch_init(const std::string &name, const std::string &work_dir, const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:756
MutableCsrEdgeIter(MutableNbrSliceMut< RecordView > slice)
Definition: mutable_csr.h:147
timestamp_t unsorted_since() const override
Definition: mutable_csr.h:1088
size_t size() const override
Definition: mutable_csr.h:1266
std::shared_ptr< CsrEdgeIterBase > edge_iter_mut(vid_t v) override
Definition: mutable_csr.h:1219
void set_data(const Any &value, timestamp_t ts) override
Definition: mutable_csr.h:161
size_t size() const
Definition: mmap_array.h:415
size_t edge_num() const override
Definition: mutable_csr.h:622
~MutableCsr()
Definition: mutable_csr.h:670
StringColumn & column_
Definition: mutable_csr.h:1293
std::shared_ptr< CsrEdgeIterBase > edge_iter_mut(vid_t v) override
Definition: mutable_csr.h:1343
void open_in_memory(const std::string &prefix, size_t v_cap) override
Definition: mutable_csr.h:304
void resize(vid_t vnum) override
Definition: mutable_csr.h:1200
void warmup(int thread_num) const override
Definition: mutable_csr.h:1198
mmap_array< nbr_t > nbr_list_
Definition: mutable_csr.h:952
slice_t get_edges(vid_t i) const override
Definition: mutable_csr.h:1034
size_t size() const override
Definition: mutable_csr.h:1110
void next() override
Definition: mutable_csr.h:95
CsrConstEdgeIterBase & operator+=(size_t offset) override
Definition: mutable_csr.h:51
void open(const std::string &name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: mutable_csr.h:1092
void resize(vid_t vnum) override
Definition: mutable_csr.h:618
void batch_sort_by_edge_data(timestamp_t ts) override
Definition: mutable_csr.h:982
const Record & AsRecord() const
Definition: types.h:691
MutableCsrConstEdgeIter(const MutableNbrSlice< EDATA_T > &slice)
Definition: mutable_csr.h:40
void open_with_hugepages(const std::string &prefix, size_t v_cap) override
Definition: mutable_csr.h:337
Definition: mutable_csr.h:183
size_t edge_num() const override
Definition: mutable_csr.h:1008
mut_slice_t get_edges_mut(vid_t i)
Definition: mutable_csr.h:1141
CsrConstEdgeIterBase * edge_iter_raw(vid_t v) const override
Definition: mutable_csr.h:714
void close() override
Definition: mutable_csr.h:949
void warmup(int thread_num) const override
Definition: mutable_csr.h:1002
void resize(vid_t vnum) override
Definition: mutable_csr.h:1325
void dump(const std::string &name, const std::string &new_snapshot_dir) override
Definition: mutable_csr.h:1320
void resize(vid_t vnum) override
Definition: mutable_csr.h:1004
void close() override
Definition: mutable_csr.h:1232
nbr_ptr_t cur_
Definition: mutable_csr.h:179
size_t get_index() const
Definition: mutable_csr.h:122
void warmup(int thread_num) const override
Definition: mutable_csr.h:1323
MutableNbrSliceMut< EDATA_T > mut_slice_t
Definition: mutable_csr.h:751
size_t size() const override
Definition: mutable_csr.h:507
void open_in_memory(const std::string &prefix, size_t v_cap) override
Definition: mutable_csr.h:803
timestamp_t get_timestamp() const override
Definition: mutable_csr.h:48
size_t batch_init(const std::string &name, const std::string &work_dir, const std::vector< int > &degree, double reserve_ratio=1.2) override
Definition: mutable_csr.h:672
mmap_array< adjlist_t > adj_lists_
Definition: mutable_csr.h:569
void open(const std::string &name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: mutable_csr.h:602
void batch_put_edge_with_index(vid_t src, vid_t dst, size_t data, timestamp_t ts=0) override
Definition: mutable_csr.h:1272
mut_slice_t get_edges_mut(vid_t i)
Definition: mutable_csr.h:935
void set_begin(nbr_t *ptr)
Definition: nbr.h:506
timestamp_t unsorted_since() const override
Definition: mutable_csr.h:271
nbr_ptr_t cur_
Definition: mutable_csr.h:138
Definition: csr_base.h:135
size_t get_index() const
Definition: mutable_csr.h:1151
vid_t get_neighbor() const override
Definition: mutable_csr.h:75
Table & table_
Definition: mutable_csr.h:1165
timestamp_t get_timestamp() const
Definition: mutable_csr.h:1150
void next() override
Definition: mutable_csr.h:50
void batch_put_edge(vid_t src, vid_t dst, const EDATA_T &data, timestamp_t ts=0) override
Definition: mutable_csr.h:779
timestamp_t unsorted_since_
Definition: mutable_csr.h:571
vid_t neighbor
Definition: nbr.h:284
timestamp_t get_timestamp() const override
Definition: mutable_csr.h:157
bool is_valid() const override
Definition: mutable_csr.h:58
size_t size() const override
Definition: mutable_csr.h:97
void copy_file(const std::string &src, const std::string &dst)
Definition: file_names.h:80
~MutableCsrEdgeIter()=default
StringColumn & column_
Definition: mutable_csr.h:1056
void open(const std::string &name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: mutable_csr.h:273
void open_in_memory(const std::string &prefix, size_t v_cap) override
Definition: mutable_csr.h:1257
~MutableCsr()
Definition: mutable_csr.h:191
Definition: mutable_csr.h:1170
size_t batch_init_in_memory(const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:768
size_t size() const override
Definition: mutable_csr.h:176
vid_t get_neighbor() const override
Definition: mutable_csr.h:113
void dump(const std::string &name, const std::string &new_snapshot_dir) override
Definition: mutable_csr.h:371
void batch_put_edge_with_index(vid_t src, vid_t dst, size_t data, timestamp_t ts) override
Definition: mutable_csr.h:1081
std::string snapshot_dir(const std::string &work_dir, uint32_t version)
Definition: file_names.h:192
void dump(const std::string &name, const std::string &new_snapshot_dir) override
Definition: mutable_csr.h:997
Definition: loading_config.h:232
std::shared_ptr< CsrConstEdgeIterBase > edge_iter(vid_t v) const override
Definition: mutable_csr.h:1114
size_t batch_init_in_memory(const std::vector< int > &degree, double reserve_ratio=1.2) override
Definition: mutable_csr.h:678
Definition: mutable_csr.h:747
void put_edge(vid_t src, vid_t dst, const EDATA_T &data, timestamp_t ts, Allocator &) override
Definition: mutable_csr.h:1208
EmptyCsr(Table &table)
Definition: mutable_csr.h:1301
void put_edge_with_index(vid_t src, vid_t dst, size_t index, timestamp_t ts, Allocator &alloc) override
Definition: mutable_csr.h:1270
size_t get_index() const
Definition: mutable_csr.h:155
std::shared_ptr< CsrEdgeIterBase > edge_iter_mut(vid_t v) override
Definition: mutable_csr.h:1282
vid_t get_neighbor() const
Definition: mutable_csr.h:1149
timestamp_t get_timestamp() const override
Definition: mutable_csr.h:79
void resize(vid_t vnum) override
Definition: mutable_csr.h:704
void open_in_memory(const std::string &prefix, size_t v_cap) override
Definition: mutable_csr.h:1318
Table & table_
Definition: mutable_csr.h:1355
void set_data(const Any &value, timestamp_t ts) override
Definition: mutable_csr.h:81
void warmup(int thread_num) const override
Definition: mutable_csr.h:849
size_t size() const override
Definition: mutable_csr.h:59
size_t batch_init_in_memory(const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:1076
std::shared_ptr< CsrConstEdgeIterBase > edge_iter(vid_t v) const override
Definition: mutable_csr.h:1274
MutableNbr< std::string_view > get_edge(vid_t i) const
Definition: mutable_csr.h:1044
mut_slice_t get_edges_mut(vid_t i)
Definition: mutable_csr.h:651
Any get_data() const override
Definition: mutable_csr.h:152
const EDATA_T & get_data() const
Definition: nbr.h:273
CsrConstEdgeIterBase * edge_iter_raw(vid_t v) const override
Definition: mutable_csr.h:1339
Any get_data() const override
Definition: mutable_csr.h:76
size_t edge_num() const override
Definition: mutable_csr.h:708
~MutableCsr()
Definition: mutable_csr.h:584
slice_t get_edges(vid_t v) const override
Definition: mutable_csr.h:1348
size_t batch_init(const std::string &name, const std::string &work_dir, const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:1244
CsrEdgeIterBase & operator+=(size_t offset) override
Definition: mutable_csr.h:166
MutableCsr(Table &table)
Definition: mutable_csr.h:669
CsrConstEdgeIterBase * edge_iter_raw(vid_t v) const override
Definition: mutable_csr.h:1278
void close() override
Definition: mutable_csr.h:543
RecordView get_data() const
Definition: mutable_csr.h:1152
slice_t get_edges(vid_t v) const override
Definition: mutable_csr.h:1287
slice_t get_edges(vid_t v) const override
Definition: mutable_csr.h:535
const_nbr_ptr_t end_
Definition: mutable_csr.h:63
size_t batch_init_in_memory(const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:226
void resize(vid_t vnum) override
Definition: mutable_csr.h:494
int size() const
Definition: nbr.h:328
void open(const std::string &name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: mutable_csr.h:1254
void next() override
Definition: mutable_csr.h:174
void set_timestamp(timestamp_t ts)
Definition: mutable_csr.h:159
StringColumn & column_
Definition: mutable_csr.h:658
static MutableNbrSlice empty()
Definition: nbr.h:335
SingleMutableCsr< size_t > csr_
Definition: mutable_csr.h:1057
std::shared_ptr< CsrConstEdgeIterBase > edge_iter(vid_t v) const override
Definition: mutable_csr.h:624
static MutableNbrSliceMut empty()
Definition: nbr.h:511
size_t batch_init_in_memory(const std::vector< int > &degree, double reserve) override
Definition: mutable_csr.h:592
void warmup(int thread_num) const override
Definition: mutable_csr.h:1262
void open(const std::string &name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: mutable_csr.h:988
std::shared_ptr< CsrConstEdgeIterBase > edge_iter(vid_t v) const override
Definition: mutable_csr.h:1335
std::shared_ptr< CsrConstEdgeIterBase > edge_iter(vid_t v) const override
Definition: mutable_csr.h:1211
Definition: mutable_csr.h:67
void close() override
Definition: mutable_csr.h:739
std::shared_ptr< CsrEdgeIterBase > edge_iter_mut(vid_t v) override
Definition: mutable_csr.h:523
size_t batch_init(const std::string &name, const std::string &work_dir, const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:197
void dump(const std::string &name, const std::string &new_snapshot_dir) override
Definition: mutable_csr.h:1195
std::string_view AsStringView() const
Definition: types.h:657
void open_with_hugepages(const std::string &prefix, size_t v_cap) override
Definition: mutable_csr.h:816
void open(const std::string &name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: mutable_csr.h:688
size_t size() const override
Definition: mutable_csr.h:135
size_t edge_num() const override
Definition: mutable_csr.h:509
MutableNbrSlice< EDATA_T > slice_t
Definition: mutable_csr.h:750
size_t edge_num() const override
Definition: mutable_csr.h:1112