Go to the documentation of this file.
16 #ifndef STORAGES_RT_MUTABLE_GRAPH_CSR_MUTABLE_CSR_H_
17 #define STORAGES_RT_MUTABLE_GRAPH_CSR_MUTABLE_CSR_H_
21 #include "grape/utils/concurrent_queue.h"
29 void read_file(
const std::string& filename,
void* buffer,
size_t size,
32 void write_file(
const std::string& filename,
const void* buffer,
size_t size,
35 template <
typename EDATA_T>
41 :
cur_(slice.begin()),
end_(slice.end()) {}
66 template <
typename EDATA_T>
72 :
cur_(slice.begin()),
end_(slice.end()) {}
110 :
cur_(slice.begin()),
end_(slice.end()) {}
148 :
cur_(slice.begin()),
end_(slice.end()) {}
182 template <
typename EDATA_T>
197 size_t batch_init(
const std::string& name,
const std::string& work_dir,
198 const std::vector<int>& degree,
199 double reserve_ratio)
override {
200 reserve_ratio = std::max(reserve_ratio, 1.0);
201 size_t vnum = degree.size();
202 adj_lists_.open(work_dir +
"/" + name +
".adj",
true);
205 locks_ =
new grape::SpinLock[vnum];
208 for (
auto d : degree) {
209 edge_num += (std::ceil(d * reserve_ratio));
211 nbr_list_.open(work_dir +
"/" + name +
".nbr",
true);
215 for (
vid_t i = 0; i < vnum; ++i) {
217 int cap = std::ceil(deg * reserve_ratio);
227 double reserve_ratio)
override {
228 reserve_ratio = std::max(reserve_ratio, 1.0);
229 size_t vnum = degree.size();
233 locks_ =
new grape::SpinLock[vnum];
236 for (
auto d : degree) {
237 edge_num += (std::ceil(d * reserve_ratio));
243 for (
vid_t i = 0; i < vnum; ++i) {
245 int cap = std::ceil(deg * reserve_ratio);
256 adj_lists_[src].batch_put_edge(dst, data, ts);
261 for (
size_t i = 0; i != vnum; ++i) {
274 const std::string& work_dir)
override {
279 if (std::filesystem::exists(
snapshot_dir +
"/" + name +
".cap")) {
286 nbr_list_.touch(work_dir +
"/" + name +
".nbr");
287 adj_lists_.open(work_dir +
"/" + name +
".adj",
true);
290 locks_ =
new grape::SpinLock[degree_list.
size()];
293 for (
size_t i = 0; i < degree_list.
size(); ++i) {
294 int degree = degree_list[i];
295 int cap = (*cap_list)[i];
299 if (cap_list != &degree_list) {
306 degree_list.
open(prefix +
".deg",
false);
309 if (std::filesystem::exists(prefix +
".cap")) {
311 cap_list->
open(prefix +
".cap",
false);
317 v_cap = std::max(v_cap, degree_list.
size());
319 locks_ =
new grape::SpinLock[v_cap];
322 for (
size_t i = 0; i < degree_list.
size(); ++i) {
323 int degree = degree_list[i];
324 int cap = (*cap_list)[i];
328 for (
size_t i = degree_list.
size(); i < v_cap; ++i) {
332 if (cap_list != &degree_list) {
339 degree_list.
open(prefix +
".deg",
false);
342 if (std::filesystem::exists(prefix +
".cap")) {
344 cap_list->
open(prefix +
".cap",
false);
347 nbr_list_.open_with_hugepages(prefix +
".nbr");
350 v_cap = std::max(v_cap, degree_list.
size());
353 locks_ =
new grape::SpinLock[v_cap];
356 for (
size_t i = 0; i < degree_list.
size(); ++i) {
357 int degree = degree_list[i];
358 int cap = (*cap_list)[i];
362 for (
size_t i = degree_list.
size(); i < v_cap; ++i) {
366 if (cap_list != &degree_list) {
371 void dump(
const std::string& name,
372 const std::string& new_snapshot_dir)
override {
374 bool reuse_nbr_list =
true;
375 dump_meta(new_snapshot_dir +
"/" + name);
377 std::vector<int> cap_list;
378 degree_list.
open(new_snapshot_dir +
"/" + name +
".deg",
true);
380 cap_list.resize(vnum);
381 bool need_cap_list =
false;
383 for (
size_t i = 0; i < vnum; ++i) {
387 reuse_nbr_list =
false;
394 if (degree_list[i] != cap_list[i]) {
395 need_cap_list =
true;
400 write_file(new_snapshot_dir +
"/" + name +
".cap", cap_list.data(),
401 sizeof(
int), cap_list.size());
404 if (reuse_nbr_list && !
nbr_list_.filename().empty() &&
405 std::filesystem::exists(
nbr_list_.filename())) {
406 std::error_code errorCode;
407 std::filesystem::create_hard_link(
nbr_list_.filename(),
408 new_snapshot_dir +
"/" + name +
".nbr",
411 std::stringstream ss;
412 ss <<
"Failed to create hard link from " <<
nbr_list_.filename()
413 <<
" to " << new_snapshot_dir +
"/" + name +
".snbr"
414 <<
", error code: " << errorCode <<
" " << errorCode.message();
415 LOG(ERROR) << ss.str();
416 throw std::runtime_error(ss.str());
420 fopen((new_snapshot_dir +
"/" + name +
".nbr").c_str(),
"wb");
421 std::string filename = new_snapshot_dir +
"/" + name +
".nbr";
422 if (fout ==
nullptr) {
423 std::stringstream ss;
424 ss <<
"Failed to open nbr list " << filename <<
", " << strerror(errno);
425 LOG(ERROR) << ss.str();
426 throw std::runtime_error(ss.str());
429 for (
size_t i = 0; i < vnum; ++i) {
433 static_cast<size_t>(
adj_lists_[i].capacity())) {
434 std::stringstream ss;
435 ss <<
"Failed to write nbr list " << filename <<
", expected "
436 <<
adj_lists_[i].capacity() <<
", got " << ret <<
", "
438 LOG(ERROR) << ss.str();
439 throw std::runtime_error(ss.str());
443 if ((ret = fflush(fout)) != 0) {
444 std::stringstream ss;
445 ss <<
"Failed to flush nbr list " << filename <<
", error code: " << ret
446 <<
" " << strerror(errno);
447 LOG(ERROR) << ss.str();
448 throw std::runtime_error(ss.str());
450 if ((ret = fclose(fout)) != 0) {
451 std::stringstream ss;
452 ss <<
"Failed to close nbr list " << filename <<
", error code: " << ret
453 <<
" " << strerror(errno);
454 LOG(ERROR) << ss.str();
455 throw std::runtime_error(ss.str());
460 void warmup(
int thread_num)
const override {
462 std::vector<std::thread> threads;
463 std::atomic<size_t> v_i(0);
464 const size_t chunk = 4096;
465 std::atomic<size_t> output(0);
466 for (
int i = 0; i < thread_num; ++i) {
467 threads.emplace_back([&]() {
470 size_t begin = std::min(v_i.fetch_add(chunk), vnum);
471 size_t end = std::min(begin + chunk, vnum);
477 while (begin < end) {
479 for (
auto& nbr : adj_list) {
485 output.fetch_add(ret);
488 for (
auto& thrd : threads) {
491 (void) output.load();
498 for (
size_t k = old_size; k != vnum; ++k) {
502 locks_ =
new grape::SpinLock[vnum];
511 for (
size_t i = 0; i <
adj_lists_.size(); ++i) {
518 return std::make_shared<MutableCsrConstEdgeIter<EDATA_T>>(
get_edges(v));
524 return std::make_shared<MutableCsrEdgeIter<EDATA_T>>(
get_edges_mut(v));
531 adj_lists_[src].put_edge(dst, data, ts, alloc);
552 std::string meta_file_path = prefix +
".meta";
553 if (std::filesystem::exists(meta_file_path)) {
562 std::string meta_file_path = prefix +
".meta";
584 size_t batch_init(
const std::string& name,
const std::string& work_dir,
585 const std::vector<int>& degree,
586 double reserve_ratio)
override {
587 return csr_.batch_init(name, work_dir, degree, reserve_ratio);
591 double reserve)
override {
592 return csr_.batch_init_in_memory(degree, reserve);
597 csr_.batch_put_edge(src, dst, data, ts);
601 const std::string& work_dir)
override {
606 csr_.open_in_memory(prefix, v_cap);
609 void dump(
const std::string& name,
610 const std::string& new_snapshot_dir)
override {
611 csr_.dump(name, new_snapshot_dir);
614 void warmup(
int thread_num)
const override { csr_.warmup(thread_num); }
618 size_t size()
const override {
return csr_.size(); }
620 size_t edge_num()
const override {
return csr_.edge_num(); }
623 return std::make_shared<MutableCsrConstEdgeIter<std::string_view>>(
631 return std::make_shared<MutableCsrEdgeIter<std::string_view>>(
637 csr_.put_edge(src, dst, data, ts, alloc);
642 csr_.put_edge(src, dst, index, ts, alloc);
646 return slice_t(csr_.get_edges(i), column_);
650 return mut_slice_t(csr_.get_edges_mut(i), column_);
653 void close()
override { csr_.close(); }
670 size_t batch_init(
const std::string& name,
const std::string& work_dir,
671 const std::vector<int>& degree,
672 double reserve_ratio = 1.2)
override {
673 return csr_.batch_init(name, work_dir, degree, reserve_ratio);
677 double reserve_ratio = 1.2)
override {
678 return csr_.batch_init_in_memory(degree, reserve_ratio);
683 csr_.batch_put_edge(src, dst, data, ts);
687 const std::string& work_dir)
override {
692 csr_.open_in_memory(prefix, v_cap);
695 void dump(
const std::string& name,
696 const std::string& new_snapshot_dir)
override {
697 csr_.dump(name, new_snapshot_dir);
700 void warmup(
int thread_num)
const override { csr_.warmup(thread_num); }
704 size_t size()
const override {
return csr_.size(); }
706 size_t edge_num()
const override {
return csr_.edge_num(); }
709 return std::make_shared<MutableCsrConstEdgeIter<RecordView>>(
get_edges(v));
716 return std::make_shared<MutableCsrEdgeIter<RecordView>>(
get_edges_mut(v));
721 csr_.put_edge(src, dst, data, ts, alloc);
726 csr_.put_edge(src, dst, index, ts, alloc);
730 return slice_t(csr_.get_edges(i), table_);
737 void close()
override { csr_.close(); }
744 template <
typename EDATA_T>
754 size_t batch_init(
const std::string& name,
const std::string& work_dir,
755 const std::vector<int>& degree,
756 double reserve_ratio)
override {
757 size_t vnum = degree.size();
758 nbr_list_.open(work_dir +
"/" + name +
".snbr",
true);
760 for (
size_t k = 0; k != vnum; ++k) {
761 nbr_list_[k].timestamp.store(std::numeric_limits<timestamp_t>::max());
767 double reserve_ratio)
override {
768 size_t vnum = degree.size();
771 for (
size_t k = 0; k != vnum; ++k) {
772 nbr_list_[k].timestamp.store(std::numeric_limits<timestamp_t>::max());
781 CHECK_EQ(
nbr_list_[src].timestamp.load(),
782 std::numeric_limits<timestamp_t>::max());
789 return std::numeric_limits<timestamp_t>::max();
793 const std::string& work_dir)
override {
794 if (!std::filesystem::exists(work_dir +
"/" + name +
".snbr")) {
796 work_dir +
"/" + name +
".snbr");
798 nbr_list_.open(work_dir +
"/" + name +
".snbr",
true);
808 for (
size_t k = old_size; k != v_cap; ++k) {
809 nbr_list_[k].timestamp.store(std::numeric_limits<timestamp_t>::max());
815 nbr_list_.open_with_hugepages(prefix +
".snbr", v_cap);
817 if (old_size < v_cap) {
819 for (
size_t k = old_size; k != v_cap; ++k) {
820 nbr_list_[k].timestamp.store(std::numeric_limits<timestamp_t>::max());
825 void dump(
const std::string& name,
826 const std::string& new_snapshot_dir)
override {
828 std::filesystem::exists(
nbr_list_.filename()))) {
829 std::error_code errorCode;
830 std::filesystem::create_hard_link(
nbr_list_.filename(),
831 new_snapshot_dir +
"/" + name +
".snbr",
834 std::stringstream ss;
835 ss <<
"Failed to create hard link from " <<
nbr_list_.filename()
836 <<
" to " << new_snapshot_dir +
"/" + name +
".snbr"
837 <<
", error code: " << errorCode <<
" " << errorCode.message();
838 LOG(ERROR) << ss.str();
839 throw std::runtime_error(ss.str());
847 void warmup(
int thread_num)
const override {
849 std::vector<std::thread> threads;
850 std::atomic<size_t> v_i(0);
851 std::atomic<size_t> output(0);
852 const size_t chunk = 4096;
853 for (
int i = 0; i < thread_num; ++i) {
854 threads.emplace_back([&]() {
857 size_t begin = std::min(v_i.fetch_add(chunk), vnum);
858 size_t end = std::min(begin + chunk, vnum);
862 while (begin < end) {
868 output.fetch_add(ret);
871 for (
auto& thrd : threads) {
874 (void) output.load();
881 for (
size_t k = old_size; k != vnum; ++k) {
882 nbr_list_[k].timestamp.store(std::numeric_limits<timestamp_t>::max());
893 for (
size_t k = 0; k !=
nbr_list_.size(); ++k) {
895 std::numeric_limits<timestamp_t>::max()) {
903 return std::make_shared<MutableCsrConstEdgeIter<EDATA_T>>(
get_edges(v));
909 return std::make_shared<MutableCsrEdgeIter<EDATA_T>>(
get_edges_mut(v));
917 CHECK_EQ(
nbr_list_[src].timestamp, std::numeric_limits<timestamp_t>::max());
924 std::numeric_limits<timestamp_t>::max()
927 if (ret.
size() != 0) {
936 std::numeric_limits<timestamp_t>::max()
939 if (ret.
size() != 0) {
964 size_t batch_init(
const std::string& name,
const std::string& work_dir,
965 const std::vector<int>& degree,
966 double reserve_ratio)
override {
967 return csr_.batch_init(name, work_dir, degree, reserve_ratio);
971 double reserve_ratio)
override {
972 return csr_.batch_init_in_memory(degree, reserve_ratio);
977 csr_.batch_put_edge(src, dst, data, ts);
983 return std::numeric_limits<timestamp_t>::max();
987 const std::string& work_dir)
override {
992 csr_.open_in_memory(prefix, v_cap);
995 void dump(
const std::string& name,
996 const std::string& new_snapshot_dir)
override {
997 csr_.dump(name, new_snapshot_dir);
1000 void warmup(
int thread_num)
const override { csr_.warmup(thread_num); }
1004 size_t size()
const override {
return csr_.size(); }
1006 size_t edge_num()
const override {
return csr_.edge_num(); }
1009 return std::make_shared<MutableCsrConstEdgeIter<std::string_view>>(
1018 return std::make_shared<MutableCsrEdgeIter<std::string_view>>(
1024 csr_.put_edge(src, dst, data, ts, alloc);
1029 put_edge(src, dst, index, ts, alloc);
1033 auto ret = csr_.get_edges(i);
1038 auto ret = csr_.get_edges_mut(i);
1044 auto nbr_tmp = csr_.get_edge(i);
1046 nbr.
timestamp.store(nbr_tmp.timestamp.load());
1047 nbr.
data = column_.get_view(nbr_tmp.data);
1068 size_t batch_init(
const std::string& name,
const std::string& work_dir,
1069 const std::vector<int>& degree,
1070 double reserve_ratio)
override {
1071 return csr_.batch_init(name, work_dir, degree, reserve_ratio);
1075 double reserve_ratio)
override {
1076 return csr_.batch_init_in_memory(degree, reserve_ratio);
1081 csr_.batch_put_edge(src, dst, data, ts);
1087 return std::numeric_limits<timestamp_t>::max();
1091 const std::string& work_dir)
override {
1096 csr_.open_in_memory(prefix, v_cap);
1100 const std::string& new_snapshot_dir)
override {
1101 csr_.dump(name, new_snapshot_dir);
1104 void warmup(
int thread_num)
const override { csr_.warmup(thread_num); }
1108 size_t size()
const override {
return csr_.size(); }
1110 size_t edge_num()
const override {
return csr_.edge_num(); }
1113 return std::make_shared<MutableCsrConstEdgeIter<RecordView>>(
get_edges(v));
1121 return std::make_shared<MutableCsrEdgeIter<RecordView>>(
get_edges_mut(v));
1126 csr_.put_edge(src, dst, data, ts, alloc);
1131 put_edge(src, dst, index, ts, alloc);
1135 auto ret = csr_.get_edges(i);
1140 auto ret = csr_.get_edges_mut(i);
1156 auto nbr = csr_.get_edge(i);
1157 return RecordNbr(&nbr, table_);
1167 template <
typename EDATA_T>
1175 size_t batch_init(
const std::string& name,
const std::string& work_dir,
1176 const std::vector<int>& degree,
1177 double reserve_ratio)
override {
1182 double reserve_ratio)
override {
1187 const std::string& work_dir)
override {}
1194 const std::string& new_snapshot_dir)
override {}
1196 void warmup(
int thread_num)
const override {}
1200 size_t size()
const override {
return 0; }
1210 return std::make_shared<MutableCsrConstEdgeIter<EDATA_T>>(
1218 return std::make_shared<MutableCsrEdgeIter<EDATA_T>>(
1225 return std::numeric_limits<timestamp_t>::max();
1242 size_t batch_init(
const std::string& name,
const std::string& work_dir,
1243 const std::vector<int>& degree,
1244 double reserve_ratio)
override {
1248 double reserve_ratio)
override {
1253 const std::string& work_dir)
override {}
1258 const std::string& new_snapshot_dir)
override {}
1260 void warmup(
int thread_num)
const override {}
1264 size_t size()
const override {
return 0; }
1273 return std::make_shared<MutableCsrConstEdgeIter<std::string_view>>(
1281 return std::make_shared<MutableCsrEdgeIter<std::string_view>>(
1300 size_t batch_init(
const std::string& name,
const std::string& work_dir,
1301 const std::vector<int>& degree,
1302 double reserve_ratio)
override {
1307 double reserve_ratio)
override {
1312 const std::string& work_dir)
override {}
1317 const std::string& new_snapshot_dir)
override {}
1319 void warmup(
int thread_num)
const override {}
1323 size_t size()
const override {
return 0; }
1332 return std::make_shared<MutableCsrConstEdgeIter<RecordView>>(
1340 return std::make_shared<MutableCsrEdgeIter<RecordView>>(
1354 #endif // STORAGES_RT_MUTABLE_GRAPH_CSR_MUTABLE_CSR_H_
timestamp_t get_timestamp() const override
Definition: mutable_csr.h:117
MutableCsr< size_t > csr_
Definition: mutable_csr.h:657
void batch_sort_by_edge_data(timestamp_t ts) override
Definition: mutable_csr.h:1084
void close() override
Definition: mutable_csr.h:1051
bool is_valid() const override
Definition: mutable_csr.h:96
void open_in_memory(const std::string &prefix, size_t v_cap) override
Definition: mutable_csr.h:991
size_t batch_init(const std::string &name, const std::string &work_dir, const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:1068
void dump(const std::string &name, const std::string &new_snapshot_dir) override
Definition: mutable_csr.h:695
MutableCsr()
Definition: mutable_csr.h:190
void set_timestamp(timestamp_t ts)
Definition: mutable_csr.h:123
CsrEdgeIterBase & operator+=(size_t offset) override
Definition: mutable_csr.h:125
~SingleMutableCsr()
Definition: mutable_csr.h:962
void open_with_hugepages(const std::string &prefix, size_t v_cap) override
Definition: mutable_csr.h:1191
void set_size(int size)
Definition: nbr.h:438
~SingleMutableCsr()
Definition: mutable_csr.h:752
void dump(const std::string &name, const std::string &new_snapshot_dir) override
Definition: mutable_csr.h:609
void put_edge_with_index(vid_t src, vid_t dst, size_t index, timestamp_t ts, Allocator &alloc) override
Definition: mutable_csr.h:724
size_t batch_init_in_memory(const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:1247
std::shared_ptr< CsrEdgeIterBase > edge_iter_mut(vid_t v) override
Definition: mutable_csr.h:715
void read_file(const std::string &filename, void *buffer, size_t size, size_t num)
Definition: mutable_csr.cc:23
CsrConstEdgeIterBase * edge_iter_raw(vid_t v) const override
Definition: mutable_csr.h:905
mut_slice_t get_edges_mut(vid_t i)
Definition: mutable_csr.h:1037
timestamp_t get_timestamp() const
Definition: nbr.h:210
MutableCsr< size_t > csr_
Definition: mutable_csr.h:741
Any get_data() const override
Definition: mutable_csr.h:114
std::shared_ptr< CsrConstEdgeIterBase > edge_iter(vid_t v) const override
Definition: mutable_csr.h:708
EmptyCsr(StringColumn &column)
Definition: mutable_csr.h:1239
void resize(vid_t vnum) override
Definition: mutable_csr.h:1262
size_t edge_num() const override
Definition: mutable_csr.h:1266
nbr_t * cur_
Definition: mutable_csr.h:100
CsrConstEdgeIterBase * edge_iter_raw(vid_t v) const override
Definition: mutable_csr.h:1116
void batch_sort_by_edge_data(timestamp_t ts) override
Definition: mutable_csr.h:259
bool is_valid() const override
Definition: mutable_csr.h:134
size_t size() const override
Definition: mutable_csr.h:1200
vid_t get_neighbor() const override
Definition: mutable_csr.h:44
std::shared_ptr< CsrEdgeIterBase > edge_iter_mut(vid_t v) override
Definition: mutable_csr.h:1017
size_t size() const override
Definition: mutable_csr.h:618
void set_data(const EDATA_T &val, timestamp_t ts)
Definition: nbr.h:212
void put_edge(vid_t src, vid_t dst, size_t data, timestamp_t ts, Allocator &alloc)
Definition: mutable_csr.h:719
std::shared_ptr< CsrConstEdgeIterBase > edge_iter(vid_t v) const override
Definition: mutable_csr.h:1008
typename MutableNbrSlice< EDATA_T >::const_nbr_ptr_t const_nbr_ptr_t
Definition: mutable_csr.h:37
void warmup(int thread_num) const override
Definition: mutable_csr.h:700
size_t batch_init_in_memory(const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:1306
void put_edge_with_index(vid_t src, vid_t dst, size_t index, timestamp_t ts, Allocator &alloc) override
Definition: mutable_csr.h:1327
void open_in_memory(const std::string &prefix, size_t v_cap) override
Definition: mutable_csr.h:605
uint32_t timestamp_t
Definition: types.h:30
size_t batch_init(const std::string &name, const std::string &work_dir, const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:1175
void set_size(int size)
Definition: nbr.h:262
nbr_ptr_t end_
Definition: mutable_csr.h:139
size_t edge_num() const override
Definition: mutable_csr.h:891
void put_edge(vid_t src, vid_t dst, const EDATA_T &data, timestamp_t ts, Allocator &alloc) override
Definition: mutable_csr.h:912
void close() override
Definition: mutable_csr.h:1160
MutableNbrSlice< EDATA_T > slice_t
Definition: mutable_csr.h:187
size_t batch_init(const std::string &name, const std::string &work_dir, const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:584
size_t batch_init(const std::string &name, const std::string &work_dir, const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:964
timestamp_t unsorted_since() const override
Definition: mutable_csr.h:788
size_t size() const override
Definition: mutable_csr.h:704
void open(const std::string &name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: mutable_csr.h:1186
void set_begin(const_nbr_ptr_t ptr)
Definition: nbr.h:265
void next() override
Definition: mutable_csr.h:133
nbr_t * end_
Definition: mutable_csr.h:101
void batch_put_edge_with_index(vid_t src, vid_t dst, size_t data, timestamp_t ts) override
Definition: mutable_csr.h:595
std::shared_ptr< CsrEdgeIterBase > edge_iter_mut(vid_t v) override
Definition: mutable_csr.h:630
size_t size() const override
Definition: mutable_csr.h:889
CsrEdgeIterBase & operator+=(size_t offset) override
Definition: mutable_csr.h:86
mut_slice_t get_edges_mut(vid_t i)
Definition: mutable_csr.h:733
void dump_meta(const std::string &prefix) const
Definition: mutable_csr.h:561
void write_file(const std::string &filename, const void *buffer, size_t size, size_t num)
Definition: mutable_csr.cc:50
vid_t get_neighbor() const override
Definition: mutable_csr.h:151
const nbr_t & get_edge(vid_t i) const
Definition: mutable_csr.h:945
MutableCsrEdgeIter(MutableNbrSliceMut< EDATA_T > slice)
Definition: mutable_csr.h:71
Any get_data() const override
Definition: mutable_csr.h:45
typename MutableNbrSliceMut< std::string_view >::nbr_ptr_t nbr_ptr_t
Definition: mutable_csr.h:106
uint32_t vid_t
Definition: types.h:31
Definition: mutable_csr.h:36
void batch_sort_by_edge_data(timestamp_t ts) override
Definition: mutable_csr.h:1222
MutableNbrSliceMut< EDATA_T > mut_slice_t
Definition: mutable_csr.h:188
void dump(const std::string &name, const std::string &new_snapshot_dir) override
Definition: mutable_csr.h:1099
SingleMutableCsr< size_t > csr_
Definition: mutable_csr.h:1164
void close() override
Definition: mutable_csr.h:653
void warmup(int thread_num) const override
Definition: mutable_csr.h:460
void close() override
Definition: mutable_csr.h:1346
Table table_
Definition: mutable_csr.h:1152
void resize(size_t size)
Definition: mmap_array.h:319
void open(const std::string &name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: mutable_csr.h:792
void batch_put_edge_with_index(vid_t src, vid_t dst, size_t data, timestamp_t ts=0) override
Definition: mutable_csr.h:681
void close() override
Definition: mutable_csr.h:1287
slice_t get_edges(vid_t i) const override
Definition: mutable_csr.h:1134
SingleMutableCsr()
Definition: mutable_csr.h:751
void batch_put_edge(vid_t src, vid_t dst, const EDATA_T &data, timestamp_t ts=0) override
Definition: mutable_csr.h:1204
void put_edge_with_index(vid_t src, vid_t dst, size_t index, timestamp_t ts, Allocator &alloc) override
Definition: mutable_csr.h:1027
void open_in_memory(const std::string &prefix, size_t v_cap) override
Definition: mutable_csr.h:1189
int size() const
Definition: nbr.h:439
const nbr_t * ptr_
Definition: mutable_csr.h:1151
Definition: csr_base.h:43
static void to(const Any &value, T &out)
Definition: types.h:799
bool is_valid() const override
Definition: mutable_csr.h:175
slice_t get_edges(vid_t i) const override
Definition: mutable_csr.h:729
void put_edge(vid_t src, vid_t dst, const EDATA_T &data, timestamp_t ts, Allocator &alloc) override
Definition: mutable_csr.h:527
void resize(vid_t vnum) override
Definition: mutable_csr.h:1106
Definition: adj_list.h:33
Definition: adj_list.h:23
SingleMutableCsr(StringColumn &column)
Definition: mutable_csr.h:961
void batch_put_edge_with_index(vid_t src, vid_t dst, size_t data, timestamp_t ts) override
Definition: mutable_csr.h:975
void put_edge(vid_t src, vid_t dst, size_t data, timestamp_t ts, Allocator &alloc)
Definition: mutable_csr.h:1124
typename MutableNbrSliceMut< RecordView >::nbr_ptr_t nbr_ptr_t
Definition: mutable_csr.h:144
void dump(const std::string &name, const std::string &new_snapshot_dir) override
Definition: mutable_csr.h:1257
mut_slice_t get_edges_mut(vid_t i)
Definition: mutable_csr.h:539
std::atomic< timestamp_t > timestamp
Definition: nbr.h:220
nbr_ptr_t end_
Definition: mutable_csr.h:180
RecordNbr(const nbr_t *ptr, Table &table)
Definition: mutable_csr.h:1146
const_nbr_ptr_t cur_
Definition: mutable_csr.h:62
void warmup(int thread_num) const override
Definition: mutable_csr.h:614
slice_t get_edges(vid_t v) const override
Definition: mutable_csr.h:921
void batch_sort_by_edge_data(timestamp_t ts) override
Definition: mutable_csr.h:786
size_t size() const override
Definition: mutable_csr.h:1004
Table & table_
Definition: mutable_csr.h:740
void open(const std::string &filename, bool sync_to_file=false)
Definition: mmap_array.h:129
MutableCsrEdgeIter(MutableNbrSliceMut< std::string_view > slice)
Definition: mutable_csr.h:109
size_t size() const override
Definition: mutable_csr.h:1323
Definition: allocators.h:29
size_t edge_num() const override
Definition: mutable_csr.h:1325
RecordNbr get_edge(vid_t i) const
Definition: mutable_csr.h:1155
~SingleMutableCsr()
Definition: mutable_csr.h:1066
vid_t get_neighbor() const
Definition: nbr.h:209
void batch_put_edge(vid_t src, vid_t dst, const EDATA_T &data, timestamp_t ts) override
Definition: mutable_csr.h:254
mmap_array< nbr_t > nbr_list_
Definition: mutable_csr.h:568
void set_data(const Any &value, timestamp_t ts) override
Definition: mutable_csr.h:119
std::shared_ptr< CsrEdgeIterBase > edge_iter_mut(vid_t v) override
Definition: mutable_csr.h:908
size_t edge_num() const override
Definition: mutable_csr.h:1202
CsrConstEdgeIterBase * edge_iter_raw(vid_t v) const override
Definition: mutable_csr.h:1213
std::shared_ptr< CsrEdgeIterBase > edge_iter_mut(vid_t v) override
Definition: mutable_csr.h:1120
void resize(vid_t vnum) override
Definition: mutable_csr.h:877
size_t batch_init(const std::string &name, const std::string &work_dir, const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:1300
void batch_put_edge_with_index(vid_t src, vid_t dst, size_t data, timestamp_t ts=0) override
Definition: mutable_csr.h:1329
~MutableCsrConstEdgeIter()=default
CsrConstEdgeIterBase * edge_iter_raw(vid_t v) const override
Definition: mutable_csr.h:627
void put_edge(vid_t src, vid_t dst, size_t data, timestamp_t ts, Allocator &alloc)
Definition: mutable_csr.h:635
void put_edge(vid_t src, vid_t dst, size_t data, timestamp_t ts, Allocator &alloc)
Definition: mutable_csr.h:1022
void open_in_memory(const std::string &prefix, size_t v_cap) override
Definition: mutable_csr.h:691
MutableCsr(StringColumn &column)
Definition: mutable_csr.h:581
void put_edge_with_index(vid_t src, vid_t dst, size_t index, timestamp_t ts, Allocator &alloc) override
Definition: mutable_csr.h:640
void set_timestamp(timestamp_t ts)
Definition: nbr.h:217
CsrConstEdgeIterBase * edge_iter_raw(vid_t v) const override
Definition: mutable_csr.h:1013
timestamp_t unsorted_since() const override
Definition: mutable_csr.h:982
SingleMutableCsr(Table &table)
Definition: mutable_csr.h:1065
grape::SpinLock * locks_
Definition: mutable_csr.h:566
void warmup(int thread_num) const override
Definition: mutable_csr.h:1104
size_t batch_init_in_memory(const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:970
EDATA_T data
Definition: nbr.h:221
CsrConstEdgeIterBase * edge_iter_raw(vid_t v) const override
Definition: mutable_csr.h:520
slice_t get_edges(vid_t v) const override
Definition: mutable_csr.h:1228
Definition: csr_base.h:27
size_t batch_init_in_memory(const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:1181
void put_edge_with_index(vid_t src, vid_t dst, size_t index, timestamp_t ts, Allocator &alloc) override
Definition: mutable_csr.h:1129
std::shared_ptr< CsrConstEdgeIterBase > edge_iter(vid_t v) const override
Definition: mutable_csr.h:902
void load_meta(const std::string &prefix)
Definition: mutable_csr.h:551
std::shared_ptr< CsrConstEdgeIterBase > edge_iter(vid_t v) const override
Definition: mutable_csr.h:517
void dump(const std::string &name, const std::string &new_snapshot_dir) override
Definition: mutable_csr.h:825
void open(const std::string &name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: mutable_csr.h:1311
timestamp_t unsorted_since() const override
Definition: mutable_csr.h:1224
slice_t get_edges(vid_t i) const override
Definition: mutable_csr.h:645
void open_in_memory(const std::string &prefix, size_t v_cap) override
Definition: mutable_csr.h:1095
size_t batch_init(const std::string &name, const std::string &work_dir, const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:754
MutableCsrEdgeIter(MutableNbrSliceMut< RecordView > slice)
Definition: mutable_csr.h:147
timestamp_t unsorted_since() const override
Definition: mutable_csr.h:1086
size_t size() const override
Definition: mutable_csr.h:1264
std::shared_ptr< CsrEdgeIterBase > edge_iter_mut(vid_t v) override
Definition: mutable_csr.h:1217
void set_data(const Any &value, timestamp_t ts) override
Definition: mutable_csr.h:161
size_t size() const
Definition: mmap_array.h:415
size_t edge_num() const override
Definition: mutable_csr.h:620
~MutableCsr()
Definition: mutable_csr.h:668
StringColumn & column_
Definition: mutable_csr.h:1289
std::shared_ptr< CsrEdgeIterBase > edge_iter_mut(vid_t v) override
Definition: mutable_csr.h:1339
void open_in_memory(const std::string &prefix, size_t v_cap) override
Definition: mutable_csr.h:304
void resize(vid_t vnum) override
Definition: mutable_csr.h:1198
void warmup(int thread_num) const override
Definition: mutable_csr.h:1196
mmap_array< nbr_t > nbr_list_
Definition: mutable_csr.h:950
slice_t get_edges(vid_t i) const override
Definition: mutable_csr.h:1032
size_t size() const override
Definition: mutable_csr.h:1108
void next() override
Definition: mutable_csr.h:95
CsrConstEdgeIterBase & operator+=(size_t offset) override
Definition: mutable_csr.h:51
void open(const std::string &name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: mutable_csr.h:1090
void resize(vid_t vnum) override
Definition: mutable_csr.h:616
void batch_sort_by_edge_data(timestamp_t ts) override
Definition: mutable_csr.h:980
const Record & AsRecord() const
Definition: types.h:675
MutableCsrConstEdgeIter(const MutableNbrSlice< EDATA_T > &slice)
Definition: mutable_csr.h:40
void open_with_hugepages(const std::string &prefix, size_t v_cap) override
Definition: mutable_csr.h:337
Definition: mutable_csr.h:183
size_t edge_num() const override
Definition: mutable_csr.h:1006
mut_slice_t get_edges_mut(vid_t i)
Definition: mutable_csr.h:1139
CsrConstEdgeIterBase * edge_iter_raw(vid_t v) const override
Definition: mutable_csr.h:712
void close() override
Definition: mutable_csr.h:947
void warmup(int thread_num) const override
Definition: mutable_csr.h:1000
void resize(vid_t vnum) override
Definition: mutable_csr.h:1321
void dump(const std::string &name, const std::string &new_snapshot_dir) override
Definition: mutable_csr.h:1316
void resize(vid_t vnum) override
Definition: mutable_csr.h:1002
void close() override
Definition: mutable_csr.h:1230
nbr_ptr_t cur_
Definition: mutable_csr.h:179
size_t get_index() const
Definition: mutable_csr.h:122
void warmup(int thread_num) const override
Definition: mutable_csr.h:1319
MutableNbrSliceMut< EDATA_T > mut_slice_t
Definition: mutable_csr.h:749
size_t size() const override
Definition: mutable_csr.h:507
void open_in_memory(const std::string &prefix, size_t v_cap) override
Definition: mutable_csr.h:801
timestamp_t get_timestamp() const override
Definition: mutable_csr.h:48
size_t batch_init(const std::string &name, const std::string &work_dir, const std::vector< int > &degree, double reserve_ratio=1.2) override
Definition: mutable_csr.h:670
mmap_array< adjlist_t > adj_lists_
Definition: mutable_csr.h:567
void open(const std::string &name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: mutable_csr.h:600
void batch_put_edge_with_index(vid_t src, vid_t dst, size_t data, timestamp_t ts=0) override
Definition: mutable_csr.h:1270
mut_slice_t get_edges_mut(vid_t i)
Definition: mutable_csr.h:933
void set_begin(nbr_t *ptr)
Definition: nbr.h:441
timestamp_t unsorted_since() const override
Definition: mutable_csr.h:271
nbr_ptr_t cur_
Definition: mutable_csr.h:138
Definition: csr_base.h:135
size_t get_index() const
Definition: mutable_csr.h:1149
vid_t get_neighbor() const override
Definition: mutable_csr.h:75
Table & table_
Definition: mutable_csr.h:1163
timestamp_t get_timestamp() const
Definition: mutable_csr.h:1148
void next() override
Definition: mutable_csr.h:50
void batch_put_edge(vid_t src, vid_t dst, const EDATA_T &data, timestamp_t ts=0) override
Definition: mutable_csr.h:777
timestamp_t unsorted_since_
Definition: mutable_csr.h:569
vid_t neighbor
Definition: nbr.h:219
timestamp_t get_timestamp() const override
Definition: mutable_csr.h:157
bool is_valid() const override
Definition: mutable_csr.h:58
size_t size() const override
Definition: mutable_csr.h:97
void copy_file(const std::string &src, const std::string &dst)
Definition: file_names.h:80
~MutableCsrEdgeIter()=default
StringColumn & column_
Definition: mutable_csr.h:1054
void open(const std::string &name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: mutable_csr.h:273
void open_in_memory(const std::string &prefix, size_t v_cap) override
Definition: mutable_csr.h:1255
~MutableCsr()
Definition: mutable_csr.h:191
Definition: mutable_csr.h:1168
size_t batch_init_in_memory(const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:766
size_t size() const override
Definition: mutable_csr.h:176
vid_t get_neighbor() const override
Definition: mutable_csr.h:113
void dump(const std::string &name, const std::string &new_snapshot_dir) override
Definition: mutable_csr.h:371
void batch_put_edge_with_index(vid_t src, vid_t dst, size_t data, timestamp_t ts) override
Definition: mutable_csr.h:1079
std::string snapshot_dir(const std::string &work_dir, uint32_t version)
Definition: file_names.h:192
void dump(const std::string &name, const std::string &new_snapshot_dir) override
Definition: mutable_csr.h:995
Definition: loading_config.h:232
std::shared_ptr< CsrConstEdgeIterBase > edge_iter(vid_t v) const override
Definition: mutable_csr.h:1112
size_t batch_init_in_memory(const std::vector< int > &degree, double reserve_ratio=1.2) override
Definition: mutable_csr.h:676
Definition: mutable_csr.h:745
void put_edge(vid_t src, vid_t dst, const EDATA_T &data, timestamp_t ts, Allocator &) override
Definition: mutable_csr.h:1206
EmptyCsr(Table &table)
Definition: mutable_csr.h:1297
void put_edge_with_index(vid_t src, vid_t dst, size_t index, timestamp_t ts, Allocator &alloc) override
Definition: mutable_csr.h:1268
size_t get_index() const
Definition: mutable_csr.h:155
std::shared_ptr< CsrEdgeIterBase > edge_iter_mut(vid_t v) override
Definition: mutable_csr.h:1280
vid_t get_neighbor() const
Definition: mutable_csr.h:1147
timestamp_t get_timestamp() const override
Definition: mutable_csr.h:79
void resize(vid_t vnum) override
Definition: mutable_csr.h:702
void open_in_memory(const std::string &prefix, size_t v_cap) override
Definition: mutable_csr.h:1314
Table & table_
Definition: mutable_csr.h:1349
void set_data(const Any &value, timestamp_t ts) override
Definition: mutable_csr.h:81
void warmup(int thread_num) const override
Definition: mutable_csr.h:847
size_t size() const override
Definition: mutable_csr.h:59
size_t batch_init_in_memory(const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:1074
std::shared_ptr< CsrConstEdgeIterBase > edge_iter(vid_t v) const override
Definition: mutable_csr.h:1272
MutableNbr< std::string_view > get_edge(vid_t i) const
Definition: mutable_csr.h:1042
mut_slice_t get_edges_mut(vid_t i)
Definition: mutable_csr.h:649
Any get_data() const override
Definition: mutable_csr.h:152
const EDATA_T & get_data() const
Definition: nbr.h:208
CsrConstEdgeIterBase * edge_iter_raw(vid_t v) const override
Definition: mutable_csr.h:1335
Any get_data() const override
Definition: mutable_csr.h:76
size_t edge_num() const override
Definition: mutable_csr.h:706
~MutableCsr()
Definition: mutable_csr.h:582
slice_t get_edges(vid_t v) const override
Definition: mutable_csr.h:1344
size_t batch_init(const std::string &name, const std::string &work_dir, const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:1242
CsrEdgeIterBase & operator+=(size_t offset) override
Definition: mutable_csr.h:166
MutableCsr(Table &table)
Definition: mutable_csr.h:667
CsrConstEdgeIterBase * edge_iter_raw(vid_t v) const override
Definition: mutable_csr.h:1276
void close() override
Definition: mutable_csr.h:541
RecordView get_data() const
Definition: mutable_csr.h:1150
slice_t get_edges(vid_t v) const override
Definition: mutable_csr.h:1285
slice_t get_edges(vid_t v) const override
Definition: mutable_csr.h:535
const_nbr_ptr_t end_
Definition: mutable_csr.h:63
size_t batch_init_in_memory(const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:226
void resize(vid_t vnum) override
Definition: mutable_csr.h:494
int size() const
Definition: nbr.h:263
void open(const std::string &name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: mutable_csr.h:1252
void next() override
Definition: mutable_csr.h:174
void set_timestamp(timestamp_t ts)
Definition: mutable_csr.h:159
StringColumn & column_
Definition: mutable_csr.h:656
static MutableNbrSlice empty()
Definition: nbr.h:270
SingleMutableCsr< size_t > csr_
Definition: mutable_csr.h:1055
std::shared_ptr< CsrConstEdgeIterBase > edge_iter(vid_t v) const override
Definition: mutable_csr.h:622
static MutableNbrSliceMut empty()
Definition: nbr.h:446
size_t batch_init_in_memory(const std::vector< int > &degree, double reserve) override
Definition: mutable_csr.h:590
void warmup(int thread_num) const override
Definition: mutable_csr.h:1260
void open(const std::string &name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: mutable_csr.h:986
std::shared_ptr< CsrConstEdgeIterBase > edge_iter(vid_t v) const override
Definition: mutable_csr.h:1331
std::shared_ptr< CsrConstEdgeIterBase > edge_iter(vid_t v) const override
Definition: mutable_csr.h:1209
Definition: mutable_csr.h:67
void close() override
Definition: mutable_csr.h:737
std::shared_ptr< CsrEdgeIterBase > edge_iter_mut(vid_t v) override
Definition: mutable_csr.h:523
size_t batch_init(const std::string &name, const std::string &work_dir, const std::vector< int > &degree, double reserve_ratio) override
Definition: mutable_csr.h:197
void dump(const std::string &name, const std::string &new_snapshot_dir) override
Definition: mutable_csr.h:1193
std::string_view AsStringView() const
Definition: types.h:641
void open_with_hugepages(const std::string &prefix, size_t v_cap) override
Definition: mutable_csr.h:814
void open(const std::string &name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: mutable_csr.h:686
size_t size() const override
Definition: mutable_csr.h:135
size_t edge_num() const override
Definition: mutable_csr.h:509
MutableNbrSlice< EDATA_T > slice_t
Definition: mutable_csr.h:748
size_t edge_num() const override
Definition: mutable_csr.h:1110