Flex  0.17.9
dual_csr.h
Go to the documentation of this file.
1 
16 #ifndef GRAPHSCOPE_FRAGMENT_DUAL_CSR_H_
17 #define GRAPHSCOPE_FRAGMENT_DUAL_CSR_H_
18 
19 #include <stdio.h>
20 
21 #include <grape/serialization/in_archive.h>
24 #include "flex/utils/allocators.h"
26 
27 namespace gs {
28 
29 class DualCsrBase {
30  public:
31  DualCsrBase() = default;
32  virtual ~DualCsrBase() = default;
33  virtual void BatchInit(const std::string& oe_name, const std::string& ie_name,
34  const std::string& edata_name,
35  const std::string& work_dir,
36  const std::vector<int>& oe_degree,
37  const std::vector<int>& ie_degree) = 0;
38 
39  virtual void BatchInitInMemory(const std::string& edata_name,
40  const std::string& work_dir,
41  const std::vector<int>& oe_degree,
42  const std::vector<int>& ie_degree) = 0;
43 
44  virtual void Open(const std::string& oe_name, const std::string& ie_name,
45  const std::string& edata_name,
46  const std::string& snapshot_dir,
47  const std::string& work_dir) = 0;
48  virtual void OpenInMemory(const std::string& oe_name,
49  const std::string& ie_name,
50  const std::string& edata_name,
51  const std::string& snapshot_dir,
52  size_t src_vertex_cap, size_t dst_vertex_cap) = 0;
53  virtual void OpenWithHugepages(const std::string& oe_name,
54  const std::string& ie_name,
55  const std::string& edata_name,
56  const std::string& snapshot_dir,
57  size_t src_vertex_cap,
58  size_t dst_vertex_cap) = 0;
59  virtual void Dump(const std::string& oe_name, const std::string& ie_name,
60  const std::string& edata_name,
61  const std::string& new_snapshot_dir) = 0;
62 
63  virtual void IngestEdge(vid_t src, vid_t dst, grape::OutArchive& oarc,
64  timestamp_t timestamp, Allocator& alloc) = 0;
65 
66  virtual void SortByEdgeData(timestamp_t ts) = 0;
67 
68  virtual void UpdateEdge(vid_t src, vid_t dst, const Any& oarc,
69  timestamp_t timestamp, Allocator& alloc) = 0;
70 
71  void Resize(vid_t src_vertex_num, vid_t dst_vertex_num) {
72  GetInCsr()->resize(dst_vertex_num);
73  GetOutCsr()->resize(src_vertex_num);
74  }
75 
76  void Warmup(int thread_num) {
77  GetInCsr()->warmup(thread_num);
78  GetOutCsr()->warmup(thread_num);
79  }
80 
81  size_t EdgeNum() const {
82  const CsrBase* oe_csr = GetOutCsr();
83  const CsrBase* ie_csr = GetInCsr();
84  if (oe_csr) {
85  return oe_csr->edge_num();
86  } else if (ie_csr) {
87  return ie_csr->edge_num();
88  } else {
89  return 0;
90  }
91  }
92 
93  virtual CsrBase* GetInCsr() = 0;
94  virtual CsrBase* GetOutCsr() = 0;
95  virtual const CsrBase* GetInCsr() const = 0;
96  virtual const CsrBase* GetOutCsr() const = 0;
97  virtual void Close() = 0;
98 };
99 
100 template <typename EDATA_T>
101 class DualCsr : public DualCsrBase {
102  public:
103  DualCsr(EdgeStrategy oe_strategy, EdgeStrategy ie_strategy, bool oe_mutable,
104  bool ie_mutable)
105  : in_csr_(nullptr), out_csr_(nullptr) {
106  if (ie_strategy == EdgeStrategy::kNone) {
107  in_csr_ = new EmptyCsr<EDATA_T>();
108  } else if (ie_strategy == EdgeStrategy::kMultiple) {
109  if (ie_mutable) {
111  } else {
113  }
114  } else if (ie_strategy == EdgeStrategy::kSingle) {
115  if (ie_mutable) {
117  } else {
119  }
120  }
121  if (oe_strategy == EdgeStrategy::kNone) {
122  out_csr_ = new EmptyCsr<EDATA_T>();
123  } else if (oe_strategy == EdgeStrategy::kMultiple) {
124  if (oe_mutable) {
126  } else {
128  }
129  } else if (oe_strategy == EdgeStrategy::kSingle) {
130  if (oe_mutable) {
132  } else {
134  }
135  }
136  }
138  if (in_csr_ != nullptr) {
139  delete in_csr_;
140  }
141  if (out_csr_ != nullptr) {
142  delete out_csr_;
143  }
144  }
145  void BatchInit(const std::string& oe_name, const std::string& ie_name,
146  const std::string& edata_name, const std::string& work_dir,
147  const std::vector<int>& oe_degree,
148  const std::vector<int>& ie_degree) override {
149  in_csr_->batch_init(ie_name, work_dir, ie_degree);
150  out_csr_->batch_init(oe_name, work_dir, oe_degree);
151  }
152 
153  void BatchInitInMemory(const std::string& edata_name,
154  const std::string& work_dir,
155  const std::vector<int>& oe_degree,
156  const std::vector<int>& ie_degree) override {
157  in_csr_->batch_init_in_memory(ie_degree);
158  out_csr_->batch_init_in_memory(oe_degree);
159  }
160 
161  void Open(const std::string& oe_name, const std::string& ie_name,
162  const std::string& edata_name, const std::string& snapshot_dir,
163  const std::string& work_dir) override {
164  in_csr_->open(ie_name, snapshot_dir, work_dir);
165  out_csr_->open(oe_name, snapshot_dir, work_dir);
166  }
167 
168  void OpenInMemory(const std::string& oe_name, const std::string& ie_name,
169  const std::string& edata_name,
170  const std::string& snapshot_dir, size_t src_vertex_cap,
171  size_t dst_vertex_cap) override {
172  in_csr_->open_in_memory(snapshot_dir + "/" + ie_name, dst_vertex_cap);
173  out_csr_->open_in_memory(snapshot_dir + "/" + oe_name, src_vertex_cap);
174  }
175 
176  void OpenWithHugepages(const std::string& oe_name, const std::string& ie_name,
177  const std::string& edata_name,
178  const std::string& snapshot_dir, size_t src_vertex_cap,
179  size_t dst_vertex_cap) override {
180  in_csr_->open_with_hugepages(snapshot_dir + "/" + ie_name, dst_vertex_cap);
181  out_csr_->open_with_hugepages(snapshot_dir + "/" + oe_name, src_vertex_cap);
182  }
183 
184  void Dump(const std::string& oe_name, const std::string& ie_name,
185  const std::string& edata_name,
186  const std::string& new_snapshot_dir) override {
187  in_csr_->dump(ie_name, new_snapshot_dir);
188  out_csr_->dump(oe_name, new_snapshot_dir);
189  Close();
190  }
191 
192  CsrBase* GetInCsr() override { return in_csr_; }
193  CsrBase* GetOutCsr() override { return out_csr_; }
194  const CsrBase* GetInCsr() const override { return in_csr_; }
195  const CsrBase* GetOutCsr() const override { return out_csr_; }
196 
197  void IngestEdge(vid_t src, vid_t dst, grape::OutArchive& oarc, timestamp_t ts,
198  Allocator& alloc) override {
199  EDATA_T data;
200  oarc >> data;
201  in_csr_->put_edge(dst, src, data, ts, alloc);
202  out_csr_->put_edge(src, dst, data, ts, alloc);
203  }
204 
205  void SortByEdgeData(timestamp_t ts) override {
206  in_csr_->batch_sort_by_edge_data(ts);
207  out_csr_->batch_sort_by_edge_data(ts);
208  }
209 
210  void UpdateEdge(vid_t src, vid_t dst, const Any& data, timestamp_t ts,
211  Allocator& alloc) override {
212  auto oe = out_csr_->edge_iter_mut(src);
213  EDATA_T prop;
214  ConvertAny<EDATA_T>::to(data, prop);
215  bool src_flag = false, dst_flag = false;
216  while (oe != nullptr && oe->is_valid()) {
217  if (oe->get_neighbor() == dst) {
218  oe->set_data(prop, ts);
219  src_flag = true;
220  break;
221  }
222  oe->next();
223  }
224  auto ie = in_csr_->edge_iter_mut(dst);
225  while (ie != nullptr && ie->is_valid()) {
226  if (ie->get_neighbor() == src) {
227  dst_flag = true;
228  ie->set_data(prop, ts);
229  break;
230  }
231  ie->next();
232  }
233  if (!(src_flag || dst_flag)) {
234  in_csr_->put_edge(dst, src, prop, ts, alloc);
235  out_csr_->put_edge(src, dst, prop, ts, alloc);
236  }
237  }
238 
239  void BatchPutEdge(vid_t src, vid_t dst, const EDATA_T& data) {
240  in_csr_->batch_put_edge(dst, src, data);
241  out_csr_->batch_put_edge(src, dst, data);
242  }
243 
244  void Close() override {
245  in_csr_->close();
246  out_csr_->close();
247  }
248 
249  private:
252 };
253 
254 template <>
255 class DualCsr<std::string_view> : public DualCsrBase {
256  public:
257  DualCsr(EdgeStrategy oe_strategy, EdgeStrategy ie_strategy, uint16_t width,
258  bool oe_mutable, bool ie_mutable)
259  : in_csr_(nullptr),
260  out_csr_(nullptr),
261  column_(StorageStrategy::kMem, width) {
262  if (ie_strategy == EdgeStrategy::kNone) {
263  in_csr_ = new EmptyCsr<std::string_view>(column_);
264  } else if (ie_strategy == EdgeStrategy::kMultiple) {
265  if (ie_mutable) {
266  in_csr_ = new MutableCsr<std::string_view>(column_);
267  } else {
269  }
270  } else if (ie_strategy == EdgeStrategy::kSingle) {
271  if (ie_mutable) {
273  } else {
275  }
276  }
277  if (oe_strategy == EdgeStrategy::kNone) {
278  out_csr_ = new EmptyCsr<std::string_view>(column_);
279  } else if (oe_strategy == EdgeStrategy::kMultiple) {
280  if (oe_mutable) {
281  out_csr_ = new MutableCsr<std::string_view>(column_);
282  } else {
284  }
285  } else if (oe_strategy == EdgeStrategy::kSingle) {
286  if (oe_mutable) {
288  } else {
290  }
291  }
292  }
294  if (in_csr_ != nullptr) {
295  delete in_csr_;
296  }
297  if (out_csr_ != nullptr) {
298  delete out_csr_;
299  }
300  }
301  void BatchInit(const std::string& oe_name, const std::string& ie_name,
302  const std::string& edata_name, const std::string& work_dir,
303  const std::vector<int>& oe_degree,
304  const std::vector<int>& ie_degree) override {
305  size_t ie_num = in_csr_->batch_init(ie_name, work_dir, ie_degree);
306  size_t oe_num = out_csr_->batch_init(oe_name, work_dir, oe_degree);
307  column_.open(edata_name, "", work_dir);
308  column_.resize(std::max(ie_num, oe_num));
309  column_idx_.store(0);
310  }
311 
312  void BatchInitInMemory(const std::string& edata_name,
313  const std::string& work_dir,
314  const std::vector<int>& oe_degree,
315  const std::vector<int>& ie_degree) override {
316  size_t ie_num = in_csr_->batch_init_in_memory(ie_degree);
317  size_t oe_num = out_csr_->batch_init_in_memory(oe_degree);
318  column_.open(edata_name, "", work_dir);
319  column_.resize(std::max(ie_num, oe_num));
320  column_idx_.store(0);
321  }
322 
323  void Open(const std::string& oe_name, const std::string& ie_name,
324  const std::string& edata_name, const std::string& snapshot_dir,
325  const std::string& work_dir) override {
326  in_csr_->open(ie_name, snapshot_dir, work_dir);
327  out_csr_->open(oe_name, snapshot_dir, work_dir);
328  column_.open(edata_name, snapshot_dir, work_dir);
329  column_idx_.store(column_.size());
330  column_.resize(std::max(column_.size() + (column_.size() + 4) / 5, 4096ul));
331  }
332 
333  void OpenInMemory(const std::string& oe_name, const std::string& ie_name,
334  const std::string& edata_name,
335  const std::string& snapshot_dir, size_t src_vertex_cap,
336  size_t dst_vertex_cap) override {
337  in_csr_->open_in_memory(snapshot_dir + "/" + ie_name, dst_vertex_cap);
338  out_csr_->open_in_memory(snapshot_dir + "/" + oe_name, src_vertex_cap);
339  column_.open_in_memory(snapshot_dir + "/" + edata_name);
340  column_idx_.store(column_.size());
341  column_.resize(std::max(column_.size() + (column_.size() + 4) / 5, 4096ul));
342  }
343 
344  void OpenWithHugepages(const std::string& oe_name, const std::string& ie_name,
345  const std::string& edata_name,
346  const std::string& snapshot_dir, size_t src_vertex_cap,
347  size_t dst_vertex_cap) override {
348  LOG(FATAL) << "not supported...";
349  }
350 
351  void Dump(const std::string& oe_name, const std::string& ie_name,
352  const std::string& edata_name,
353  const std::string& new_snapshot_dir) override {
354  in_csr_->dump(ie_name, new_snapshot_dir);
355  out_csr_->dump(oe_name, new_snapshot_dir);
356  column_.resize(column_idx_.load());
357  column_.dump(new_snapshot_dir + "/" + edata_name);
358  Close();
359  }
360 
361  CsrBase* GetInCsr() override { return in_csr_; }
362  CsrBase* GetOutCsr() override { return out_csr_; }
363  const CsrBase* GetInCsr() const override { return in_csr_; }
364  const CsrBase* GetOutCsr() const override { return out_csr_; }
365 
366  void IngestEdge(vid_t src, vid_t dst, grape::OutArchive& oarc, timestamp_t ts,
367  Allocator& alloc) override {
368  std::string_view prop;
369  oarc >> prop;
370  size_t row_id = column_idx_.fetch_add(1);
371  column_.set_value(row_id, prop);
372  in_csr_->put_edge_with_index(dst, src, row_id, ts, alloc);
373  out_csr_->put_edge_with_index(src, dst, row_id, ts, alloc);
374  }
375 
376  void SortByEdgeData(timestamp_t ts) override {
377  LOG(FATAL) << "Not implemented";
378  }
379 
380  void UpdateEdge(vid_t src, vid_t dst, const Any& data, timestamp_t ts,
381  Allocator& alloc) override {
382  auto oe_ptr = out_csr_->edge_iter_mut(src);
383  std::string_view prop = data.AsStringView();
384  auto oe = dynamic_cast<MutableCsrEdgeIter<std::string_view>*>(oe_ptr.get());
385  size_t index = std::numeric_limits<size_t>::max();
386  while (oe != nullptr && oe->is_valid()) {
387  if (oe->get_neighbor() == dst) {
388  oe->set_timestamp(ts);
389  index = oe->get_index();
390  break;
391  }
392  oe->next();
393  }
394  auto ie_ptr = in_csr_->edge_iter_mut(dst);
395  auto ie = dynamic_cast<MutableCsrEdgeIter<std::string_view>*>(ie_ptr.get());
396  while (ie != nullptr && ie->is_valid()) {
397  if (ie->get_neighbor() == src) {
398  ie->set_timestamp(ts);
399  index = ie->get_index();
400  break;
401  }
402  ie->next();
403  }
404  if (index != std::numeric_limits<size_t>::max()) {
405  column_.set_value(index, prop);
406  } else {
407  size_t row_id = column_idx_.fetch_add(1);
408  column_.set_value(row_id, prop);
409  in_csr_->put_edge_with_index(dst, src, row_id, ts, alloc);
410  out_csr_->put_edge_with_index(src, dst, row_id, ts, alloc);
411  }
412  }
413 
414  void BatchPutEdge(vid_t src, vid_t dst, const std::string_view& data) {
415  size_t row_id = column_idx_.fetch_add(1);
416  column_.set_value(row_id, data);
417  in_csr_->batch_put_edge_with_index(dst, src, row_id);
418  out_csr_->batch_put_edge_with_index(src, dst, row_id);
419  }
420 
421  void BatchPutEdge(vid_t src, vid_t dst, const std::string& data) {
422  size_t row_id = column_idx_.fetch_add(1);
423  column_.set_value(row_id, data);
424 
425  in_csr_->batch_put_edge_with_index(dst, src, row_id);
426  out_csr_->batch_put_edge_with_index(src, dst, row_id);
427  }
428 
429  void Close() override {
430  in_csr_->close();
431  out_csr_->close();
432  column_.close();
433  }
434 
435  private:
438  std::atomic<size_t> column_idx_;
440 };
441 
442 template <>
443 class DualCsr<RecordView> : public DualCsrBase {
444  public:
445  DualCsr(EdgeStrategy oe_strategy, EdgeStrategy ie_strategy,
446  const std::vector<std::string>& col_name,
447  const std::vector<PropertyType>& property_types,
448  const std::vector<StorageStrategy>& storage_strategies,
449  bool oe_mutable, bool ie_mutable)
450  : col_name_(col_name),
451  property_types_(property_types),
452  storage_strategies_(storage_strategies),
453  in_csr_(nullptr),
454  out_csr_(nullptr) {
455  if (ie_strategy == EdgeStrategy::kNone) {
456  in_csr_ = new EmptyCsr<RecordView>(table_);
457  } else if (ie_strategy == EdgeStrategy::kMultiple) {
458  if (ie_mutable) {
459  in_csr_ = new MutableCsr<RecordView>(table_);
460  } else {
461  in_csr_ = new ImmutableCsr<RecordView>(table_);
462  }
463  } else {
464  if (ie_mutable) {
466  } else {
468  }
469  }
470  if (oe_strategy == EdgeStrategy::kNone) {
471  out_csr_ = new EmptyCsr<RecordView>(table_);
472  } else if (oe_strategy == EdgeStrategy::kMultiple) {
473  if (oe_mutable) {
474  out_csr_ = new MutableCsr<RecordView>(table_);
475  } else {
476  out_csr_ = new ImmutableCsr<RecordView>(table_);
477  }
478  } else {
479  if (oe_mutable) {
481  } else {
483  }
484  }
485  }
486 
488  if (in_csr_ != nullptr) {
489  delete in_csr_;
490  }
491  if (out_csr_ != nullptr) {
492  delete out_csr_;
493  }
494  }
495 
496  void InitTable(const std::string& edata_name, const std::string& work_dir) {
497  table_.init(edata_name, work_dir, col_name_, property_types_,
498  storage_strategies_);
499  }
500 
501  void BatchInit(const std::string& oe_name, const std::string& ie_name,
502  const std::string& edata_name, const std::string& work_dir,
503  const std::vector<int>& oe_degree,
504  const std::vector<int>& ie_degree) override {
505  size_t ie_num = in_csr_->batch_init(ie_name, work_dir, ie_degree);
506  size_t oe_num = out_csr_->batch_init(oe_name, work_dir, oe_degree);
507  table_.resize(std::max(ie_num, oe_num));
508 
509  table_idx_.store(std::max(ie_num, oe_num));
510  }
511 
512  void BatchInitInMemory(const std::string& edata_name,
513  const std::string& work_dir,
514  const std::vector<int>& oe_degree,
515  const std::vector<int>& ie_degree) override {
516  size_t ie_num = in_csr_->batch_init_in_memory(ie_degree);
517  size_t oe_num = out_csr_->batch_init_in_memory(oe_degree);
518  table_.resize(std::max(ie_num, oe_num));
519  table_idx_.store(std::max(ie_num, oe_num));
520  }
521 
522  void Open(const std::string& oe_name, const std::string& ie_name,
523  const std::string& edata_name, const std::string& snapshot_dir,
524  const std::string& work_dir) override {
525  in_csr_->open(ie_name, snapshot_dir, work_dir);
526  out_csr_->open(oe_name, snapshot_dir, work_dir);
527 
528  // fix me: storage_strategies_ is not used
529  table_.open(edata_name, snapshot_dir, work_dir, col_name_, property_types_,
530  {});
531  table_idx_.store(table_.row_num());
532  table_.resize(
533  std::max(table_.row_num() + (table_.row_num() + 4) / 5, 4096ul));
534  }
535 
536  void OpenInMemory(const std::string& oe_name, const std::string& ie_name,
537  const std::string& edata_name,
538  const std::string& snapshot_dir, size_t src_vertex_cap,
539  size_t dst_vertex_cap) override {
540  in_csr_->open_in_memory(snapshot_dir + "/" + ie_name, dst_vertex_cap);
541  out_csr_->open_in_memory(snapshot_dir + "/" + oe_name, src_vertex_cap);
542  // fix me: storage_strategies_ is not used
543  table_.open_in_memory(edata_name, snapshot_dir, col_name_, property_types_,
544  {});
545  table_idx_.store(table_.row_num());
546  table_.resize(
547  std::max(table_.row_num() + (table_.row_num() + 4) / 5, 4096ul));
548  }
549 
550  void OpenWithHugepages(const std::string& oe_name, const std::string& ie_name,
551  const std::string& edata_name,
552  const std::string& snapshot_dir, size_t src_vertex_cap,
553  size_t dst_vertex_cap) override {
554  LOG(FATAL) << "not supported...";
555  }
556 
557  void Dump(const std::string& oe_name, const std::string& ie_name,
558  const std::string& edata_name,
559  const std::string& new_snapshot_dir) override {
560  in_csr_->dump(ie_name, new_snapshot_dir);
561  out_csr_->dump(oe_name, new_snapshot_dir);
562  table_.resize(table_idx_.load());
563  table_.dump(edata_name, new_snapshot_dir);
564  }
565 
566  CsrBase* GetInCsr() override { return in_csr_; }
567  CsrBase* GetOutCsr() override { return out_csr_; }
568  const CsrBase* GetInCsr() const override { return in_csr_; }
569  const CsrBase* GetOutCsr() const override { return out_csr_; }
570 
571  void BatchPutEdge(vid_t src, vid_t dst, size_t row_id) {
572  in_csr_->batch_put_edge_with_index(dst, src, row_id);
573  out_csr_->batch_put_edge_with_index(src, dst, row_id);
574  }
575 
576  Table& GetTable() { return table_; }
577 
578  const Table& GetTable() const { return table_; }
579  void IngestEdge(vid_t src, vid_t dst, grape::OutArchive& oarc, timestamp_t ts,
580  Allocator& alloc) override {
581  size_t row_id = table_idx_.fetch_add(1);
582  size_t len;
583  oarc >> len;
584  table_.ingest(row_id, oarc);
585  in_csr_->put_edge_with_index(dst, src, row_id, ts, alloc);
586  out_csr_->put_edge_with_index(src, dst, row_id, ts, alloc);
587  }
588 
589  void SortByEdgeData(timestamp_t ts) override {
590  LOG(FATAL) << "Not implemented";
591  }
592 
593  void UpdateEdge(vid_t src, vid_t dst, const Any& data, timestamp_t ts,
594  Allocator& alloc) override {
595  auto oe_ptr = out_csr_->edge_iter_mut(src);
596  grape::InArchive arc;
597  Record r = data.AsRecord();
598  for (size_t i = 0; i < r.len; ++i) {
599  arc << r.props[i];
600  }
601  grape::OutArchive oarc;
602  oarc.SetSlice(arc.GetBuffer(), arc.GetSize());
603  auto oe = dynamic_cast<MutableCsrEdgeIter<RecordView>*>(oe_ptr.get());
604  size_t index = std::numeric_limits<size_t>::max();
605  while (oe != nullptr && oe->is_valid()) {
606  if (oe->get_neighbor() == dst) {
607  oe->set_timestamp(ts);
608  index = oe->get_index();
609  break;
610  }
611  oe->next();
612  }
613  auto ie_ptr = in_csr_->edge_iter_mut(dst);
614  auto ie = dynamic_cast<MutableCsrEdgeIter<RecordView>*>(ie_ptr.get());
615  while (ie != nullptr && ie->is_valid()) {
616  if (ie->get_neighbor() == src) {
617  ie->set_timestamp(ts);
618  index = ie->get_index();
619  break;
620  }
621  ie->next();
622  }
623  if (index != std::numeric_limits<size_t>::max()) {
624  table_.ingest(index, oarc);
625  } else {
626  size_t row_id = table_idx_.fetch_add(1);
627  table_.ingest(row_id, oarc);
628  in_csr_->put_edge_with_index(dst, src, row_id, ts, alloc);
629  out_csr_->put_edge_with_index(src, dst, row_id, ts, alloc);
630  }
631  }
632 
633  void Close() override {
634  in_csr_->close();
635  out_csr_->close();
636  table_.close();
637  }
638 
639  private:
640  const std::vector<std::string>& col_name_;
641  const std::vector<PropertyType>& property_types_;
642  const std::vector<StorageStrategy>& storage_strategies_;
645  std::atomic<size_t> table_idx_;
647 };
648 
649 } // namespace gs
650 
651 #endif // GRAPHSCOPE_FRAGMENT_DUAL_CSR_H_
gs::DualCsrBase::Open
virtual void Open(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, const std::string &work_dir)=0
gs::DualCsrBase::SortByEdgeData
virtual void SortByEdgeData(timestamp_t ts)=0
gs::MutableCsrEdgeIter< std::string_view >::set_timestamp
void set_timestamp(timestamp_t ts)
Definition: mutable_csr.h:123
gs::SingleMutableCsr< RecordView >
Definition: mutable_csr.h:1061
gs::DualCsr::GetOutCsr
const CsrBase * GetOutCsr() const override
Definition: dual_csr.h:195
gs::DualCsr::BatchInitInMemory
void BatchInitInMemory(const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree) override
Definition: dual_csr.h:153
gs::Any
Definition: types.h:399
gs::DualCsr< RecordView >::GetInCsr
CsrBase * GetInCsr() override
Definition: dual_csr.h:566
gs::DualCsr::SortByEdgeData
void SortByEdgeData(timestamp_t ts) override
Definition: dual_csr.h:205
gs::DualCsr< RecordView >::IngestEdge
void IngestEdge(vid_t src, vid_t dst, grape::OutArchive &oarc, timestamp_t ts, Allocator &alloc) override
Definition: dual_csr.h:579
gs::ImmutableCsr
Definition: immutable_csr.h:53
gs::DualCsr< std::string_view >::out_csr_
TypedCsrBase< std::string_view > * out_csr_
Definition: dual_csr.h:437
gs::DualCsr< RecordView >::GetTable
Table & GetTable()
Definition: dual_csr.h:576
gs::timestamp_t
uint32_t timestamp_t
Definition: types.h:30
gs::CsrBase
Definition: csr_base.h:61
gs::DualCsr< RecordView >::OpenInMemory
void OpenInMemory(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap) override
Definition: dual_csr.h:536
gs::EdgeStrategy::kMultiple
@ kMultiple
gs::DualCsr< std::string_view >::column_
StringColumn column_
Definition: dual_csr.h:439
gs::DualCsr< RecordView >::SortByEdgeData
void SortByEdgeData(timestamp_t ts) override
Definition: dual_csr.h:589
gs::DualCsr< RecordView >::OpenWithHugepages
void OpenWithHugepages(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap) override
Definition: dual_csr.h:550
gs::vid_t
uint32_t vid_t
Definition: types.h:31
gs::DualCsrBase::UpdateEdge
virtual void UpdateEdge(vid_t src, vid_t dst, const Any &oarc, timestamp_t timestamp, Allocator &alloc)=0
gs::DualCsr< RecordView >::DualCsr
DualCsr(EdgeStrategy oe_strategy, EdgeStrategy ie_strategy, const std::vector< std::string > &col_name, const std::vector< PropertyType > &property_types, const std::vector< StorageStrategy > &storage_strategies, bool oe_mutable, bool ie_mutable)
Definition: dual_csr.h:445
gs::DualCsr::~DualCsr
~DualCsr()
Definition: dual_csr.h:137
gs::DualCsr< RecordView >::GetOutCsr
const CsrBase * GetOutCsr() const override
Definition: dual_csr.h:569
gs::ImmutableCsr< std::string_view >
Definition: immutable_csr.h:327
gs::DualCsr< RecordView >::storage_strategies_
const std::vector< StorageStrategy > & storage_strategies_
Definition: dual_csr.h:642
gs::EdgeStrategy::kNone
@ kNone
gs::DualCsrBase::Dump
virtual void Dump(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &new_snapshot_dir)=0
gs::DualCsrBase::GetOutCsr
virtual CsrBase * GetOutCsr()=0
gs::DualCsr< RecordView >::BatchPutEdge
void BatchPutEdge(vid_t src, vid_t dst, size_t row_id)
Definition: dual_csr.h:571
gs::DualCsr< std::string_view >::GetOutCsr
const CsrBase * GetOutCsr() const override
Definition: dual_csr.h:364
gs::ConvertAny::to
static void to(const Any &value, T &out)
Definition: types.h:815
gs
Definition: adj_list.h:23
gs::DualCsr< RecordView >::GetTable
const Table & GetTable() const
Definition: dual_csr.h:578
gs::DualCsrBase::IngestEdge
virtual void IngestEdge(vid_t src, vid_t dst, grape::OutArchive &oarc, timestamp_t timestamp, Allocator &alloc)=0
gs::DualCsrBase::OpenWithHugepages
virtual void OpenWithHugepages(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap)=0
gs::DualCsr< std::string_view >::in_csr_
TypedCsrBase< std::string_view > * in_csr_
Definition: dual_csr.h:436
gs::DualCsr::IngestEdge
void IngestEdge(vid_t src, vid_t dst, grape::OutArchive &oarc, timestamp_t ts, Allocator &alloc) override
Definition: dual_csr.h:197
gs::DualCsrBase::Close
virtual void Close()=0
gs::DualCsr< RecordView >::in_csr_
TypedCsrBase< RecordView > * in_csr_
Definition: dual_csr.h:643
gs::StorageStrategy
StorageStrategy
Definition: types.h:58
gs::EmptyCsr< RecordView >
Definition: mutable_csr.h:1297
gs::DualCsr< std::string_view >::BatchPutEdge
void BatchPutEdge(vid_t src, vid_t dst, const std::string &data)
Definition: dual_csr.h:421
gs::DualCsr< std::string_view >::OpenInMemory
void OpenInMemory(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap) override
Definition: dual_csr.h:333
gs::Table
Definition: table.h:30
gs::DualCsr< RecordView >::InitTable
void InitTable(const std::string &edata_name, const std::string &work_dir)
Definition: dual_csr.h:496
gs::DualCsr::in_csr_
TypedCsrBase< EDATA_T > * in_csr_
Definition: dual_csr.h:250
gs::ArenaAllocator
Definition: allocators.h:29
gs::MutableCsr< RecordView >
Definition: mutable_csr.h:663
gs::Record::len
size_t len
Definition: types.h:330
gs::DualCsr< std::string_view >::BatchInit
void BatchInit(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree) override
Definition: dual_csr.h:301
gs::DualCsr::Close
void Close() override
Definition: dual_csr.h:244
gs::EmptyCsr< std::string_view >
Definition: mutable_csr.h:1236
gs::Record::props
Any * props
Definition: types.h:331
gs::DualCsrBase::OpenInMemory
virtual void OpenInMemory(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap)=0
gs::DualCsr< RecordView >::GetOutCsr
CsrBase * GetOutCsr() override
Definition: dual_csr.h:567
gs::DualCsr< RecordView >::Open
void Open(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: dual_csr.h:522
gs::DualCsr::Open
void Open(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: dual_csr.h:161
gs::DualCsrBase::Warmup
void Warmup(int thread_num)
Definition: dual_csr.h:76
gs::CsrBase::resize
virtual void resize(vid_t vnum)=0
gs::DualCsr::Dump
void Dump(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &new_snapshot_dir) override
Definition: dual_csr.h:184
gs::CsrBase::edge_num
virtual size_t edge_num() const =0
gs::Record
Definition: types.h:316
gs::DualCsr< RecordView >::BatchInitInMemory
void BatchInitInMemory(const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree) override
Definition: dual_csr.h:512
gs::DualCsr< RecordView >::property_types_
const std::vector< PropertyType > & property_types_
Definition: dual_csr.h:641
gs::TypedColumn< std::string_view >
Definition: column.h:347
allocators.h
gs::ImmutableCsr< RecordView >
Definition: immutable_csr.h:411
gs::DualCsr< std::string_view >::IngestEdge
void IngestEdge(vid_t src, vid_t dst, grape::OutArchive &oarc, timestamp_t ts, Allocator &alloc) override
Definition: dual_csr.h:366
gs::DualCsr< RecordView >::table_idx_
std::atomic< size_t > table_idx_
Definition: dual_csr.h:645
gs::DualCsr< std::string_view >::Dump
void Dump(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &new_snapshot_dir) override
Definition: dual_csr.h:351
gs::DualCsr::out_csr_
TypedCsrBase< EDATA_T > * out_csr_
Definition: dual_csr.h:251
gs::DualCsrBase::GetInCsr
virtual CsrBase * GetInCsr()=0
gs::DualCsr< RecordView >::col_name_
const std::vector< std::string > & col_name_
Definition: dual_csr.h:640
gs::MutableCsr< std::string_view >
Definition: mutable_csr.h:575
gs::MutableCsrEdgeIter::next
void next() override
Definition: mutable_csr.h:95
gs::SingleImmutableCsr
Definition: immutable_csr.h:494
gs::DualCsr< std::string_view >::DualCsr
DualCsr(EdgeStrategy oe_strategy, EdgeStrategy ie_strategy, uint16_t width, bool oe_mutable, bool ie_mutable)
Definition: dual_csr.h:257
gs::DualCsr::GetOutCsr
CsrBase * GetOutCsr() override
Definition: dual_csr.h:193
gs::Any::AsRecord
const Record & AsRecord() const
Definition: types.h:691
gs::DualCsr< std::string_view >::GetOutCsr
CsrBase * GetOutCsr() override
Definition: dual_csr.h:362
gs::MutableCsr
Definition: mutable_csr.h:183
gs::DualCsr::GetInCsr
CsrBase * GetInCsr() override
Definition: dual_csr.h:192
gs::TypedCsrBase
Definition: csr_base.h:109
gs::DualCsr< RecordView >::GetInCsr
const CsrBase * GetInCsr() const override
Definition: dual_csr.h:568
gs::DualCsr< RecordView >::Dump
void Dump(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &new_snapshot_dir) override
Definition: dual_csr.h:557
gs::SingleImmutableCsr< std::string_view >
Definition: immutable_csr.h:680
gs::DualCsr< std::string_view >::Close
void Close() override
Definition: dual_csr.h:429
gs::DualCsr< std::string_view >::~DualCsr
~DualCsr()
Definition: dual_csr.h:293
gs::DualCsrBase::~DualCsrBase
virtual ~DualCsrBase()=default
gs::DualCsr< std::string_view >::UpdateEdge
void UpdateEdge(vid_t src, vid_t dst, const Any &data, timestamp_t ts, Allocator &alloc) override
Definition: dual_csr.h:380
gs::SingleImmutableCsr< RecordView >
Definition: immutable_csr.h:774
gs::EdgeStrategy::kSingle
@ kSingle
gs::DualCsr::BatchInit
void BatchInit(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree) override
Definition: dual_csr.h:145
gs::DualCsrBase::BatchInit
virtual void BatchInit(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree)=0
gs::DualCsr::BatchPutEdge
void BatchPutEdge(vid_t src, vid_t dst, const EDATA_T &data)
Definition: dual_csr.h:239
gs::TypedCsrBase< RecordView >
Definition: csr_base.h:143
gs::DualCsrBase::EdgeNum
size_t EdgeNum() const
Definition: dual_csr.h:81
gs::EmptyCsr
Definition: mutable_csr.h:1170
gs::MutableCsrEdgeIter< RecordView >
Definition: mutable_csr.h:143
gs::TypedCsrBase< std::string_view >
Definition: csr_base.h:118
gs::snapshot_dir
std::string snapshot_dir(const std::string &work_dir, uint32_t version)
Definition: file_names.h:192
std
Definition: loading_config.h:232
gs::SingleMutableCsr
Definition: mutable_csr.h:747
gs::EdgeStrategy
EdgeStrategy
Definition: types.h:24
gs::DualCsr::OpenInMemory
void OpenInMemory(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap) override
Definition: dual_csr.h:168
gs::DualCsr< RecordView >::~DualCsr
~DualCsr()
Definition: dual_csr.h:487
gs::DualCsr< RecordView >::BatchInit
void BatchInit(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree) override
Definition: dual_csr.h:501
gs::DualCsr< std::string_view >::Open
void Open(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: dual_csr.h:323
mutable_csr.h
gs::DualCsr::OpenWithHugepages
void OpenWithHugepages(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap) override
Definition: dual_csr.h:176
gs::DualCsr< std::string_view >::SortByEdgeData
void SortByEdgeData(timestamp_t ts) override
Definition: dual_csr.h:376
gs::DualCsr< RecordView >::Close
void Close() override
Definition: dual_csr.h:633
gs::DualCsr
Definition: dual_csr.h:101
gs::SingleMutableCsr< std::string_view >
Definition: mutable_csr.h:956
gs::DualCsrBase::DualCsrBase
DualCsrBase()=default
gs::MutableCsrEdgeIter< RecordView >::set_timestamp
void set_timestamp(timestamp_t ts)
Definition: mutable_csr.h:159
gs::DualCsrBase
Definition: dual_csr.h:29
gs::DualCsr::DualCsr
DualCsr(EdgeStrategy oe_strategy, EdgeStrategy ie_strategy, bool oe_mutable, bool ie_mutable)
Definition: dual_csr.h:103
gs::RecordView
Definition: types.h:292
gs::DualCsr< std::string_view >::BatchInitInMemory
void BatchInitInMemory(const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree) override
Definition: dual_csr.h:312
immutable_csr.h
gs::DualCsr::UpdateEdge
void UpdateEdge(vid_t src, vid_t dst, const Any &data, timestamp_t ts, Allocator &alloc) override
Definition: dual_csr.h:210
gs::DualCsr< RecordView >::table_
Table table_
Definition: dual_csr.h:646
gs::DualCsrBase::BatchInitInMemory
virtual void BatchInitInMemory(const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree)=0
gs::DualCsrBase::Resize
void Resize(vid_t src_vertex_num, vid_t dst_vertex_num)
Definition: dual_csr.h:71
table.h
gs::MutableCsrEdgeIter< std::string_view >
Definition: mutable_csr.h:105
gs::StorageStrategy::kMem
@ kMem
gs::Any::AsStringView
std::string_view AsStringView() const
Definition: types.h:657
gs::DualCsr< std::string_view >::GetInCsr
const CsrBase * GetInCsr() const override
Definition: dual_csr.h:363
gs::DualCsr< RecordView >::UpdateEdge
void UpdateEdge(vid_t src, vid_t dst, const Any &data, timestamp_t ts, Allocator &alloc) override
Definition: dual_csr.h:593
gs::DualCsr< std::string_view >::column_idx_
std::atomic< size_t > column_idx_
Definition: dual_csr.h:438
gs::DualCsr< std::string_view >::OpenWithHugepages
void OpenWithHugepages(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap) override
Definition: dual_csr.h:344
gs::DualCsr< RecordView >::out_csr_
TypedCsrBase< RecordView > * out_csr_
Definition: dual_csr.h:644
gs::DualCsr::GetInCsr
const CsrBase * GetInCsr() const override
Definition: dual_csr.h:194
gs::CsrBase::warmup
virtual void warmup(int thread_num) const =0
gs::DualCsr< std::string_view >::GetInCsr
CsrBase * GetInCsr() override
Definition: dual_csr.h:361
gs::DualCsr< std::string_view >::BatchPutEdge
void BatchPutEdge(vid_t src, vid_t dst, const std::string_view &data)
Definition: dual_csr.h:414