Flex  0.17.9
dual_csr.h
Go to the documentation of this file.
1 
16 #ifndef GRAPHSCOPE_FRAGMENT_DUAL_CSR_H_
17 #define GRAPHSCOPE_FRAGMENT_DUAL_CSR_H_
18 
19 #include <stdio.h>
20 
21 #include <grape/serialization/in_archive.h>
24 #include "flex/utils/allocators.h"
26 
27 namespace gs {
28 
29 class DualCsrBase {
30  public:
31  DualCsrBase() = default;
32  virtual ~DualCsrBase() = default;
33  virtual void BatchInit(const std::string& oe_name, const std::string& ie_name,
34  const std::string& edata_name,
35  const std::string& work_dir,
36  const std::vector<int>& oe_degree,
37  const std::vector<int>& ie_degree) = 0;
38 
39  virtual void BatchInitInMemory(const std::string& edata_name,
40  const std::string& work_dir,
41  const std::vector<int>& oe_degree,
42  const std::vector<int>& ie_degree) = 0;
43 
44  virtual void Open(const std::string& oe_name, const std::string& ie_name,
45  const std::string& edata_name,
46  const std::string& snapshot_dir,
47  const std::string& work_dir) = 0;
48  virtual void OpenInMemory(const std::string& oe_name,
49  const std::string& ie_name,
50  const std::string& edata_name,
51  const std::string& snapshot_dir,
52  size_t src_vertex_cap, size_t dst_vertex_cap) = 0;
53  virtual void OpenWithHugepages(const std::string& oe_name,
54  const std::string& ie_name,
55  const std::string& edata_name,
56  const std::string& snapshot_dir,
57  size_t src_vertex_cap,
58  size_t dst_vertex_cap) = 0;
59  virtual void Dump(const std::string& oe_name, const std::string& ie_name,
60  const std::string& edata_name,
61  const std::string& new_snapshot_dir) = 0;
62 
63  virtual void IngestEdge(vid_t src, vid_t dst, grape::OutArchive& oarc,
64  timestamp_t timestamp, Allocator& alloc) = 0;
65 
66  virtual void SortByEdgeData(timestamp_t ts) = 0;
67 
68  virtual void UpdateEdge(vid_t src, vid_t dst, const Any& oarc,
69  timestamp_t timestamp, Allocator& alloc) = 0;
70 
71  void Resize(vid_t src_vertex_num, vid_t dst_vertex_num) {
72  GetInCsr()->resize(dst_vertex_num);
73  GetOutCsr()->resize(src_vertex_num);
74  }
75 
76  void Warmup(int thread_num) {
77  GetInCsr()->warmup(thread_num);
78  GetOutCsr()->warmup(thread_num);
79  }
80 
81  size_t EdgeNum() const {
82  const CsrBase* oe_csr = GetOutCsr();
83  const CsrBase* ie_csr = GetInCsr();
84  if (oe_csr) {
85  return oe_csr->edge_num();
86  } else if (ie_csr) {
87  return ie_csr->edge_num();
88  } else {
89  return 0;
90  }
91  }
92 
93  virtual CsrBase* GetInCsr() = 0;
94  virtual CsrBase* GetOutCsr() = 0;
95  virtual const CsrBase* GetInCsr() const = 0;
96  virtual const CsrBase* GetOutCsr() const = 0;
97  virtual void Close() = 0;
98 };
99 
100 template <typename EDATA_T>
101 class DualCsr : public DualCsrBase {
102  public:
103  DualCsr(EdgeStrategy oe_strategy, EdgeStrategy ie_strategy, bool oe_mutable,
104  bool ie_mutable)
105  : in_csr_(nullptr), out_csr_(nullptr) {
106  if (ie_strategy == EdgeStrategy::kNone) {
107  in_csr_ = new EmptyCsr<EDATA_T>();
108  } else if (ie_strategy == EdgeStrategy::kMultiple) {
110  } else if (ie_strategy == EdgeStrategy::kSingle) {
111  if (ie_mutable) {
113  } else {
115  }
116  }
117  if (oe_strategy == EdgeStrategy::kNone) {
118  out_csr_ = new EmptyCsr<EDATA_T>();
119  } else if (oe_strategy == EdgeStrategy::kMultiple) {
121  } else if (oe_strategy == EdgeStrategy::kSingle) {
122  if (oe_mutable) {
124  } else {
126  }
127  }
128  }
130  if (in_csr_ != nullptr) {
131  delete in_csr_;
132  }
133  if (out_csr_ != nullptr) {
134  delete out_csr_;
135  }
136  }
137  void BatchInit(const std::string& oe_name, const std::string& ie_name,
138  const std::string& edata_name, const std::string& work_dir,
139  const std::vector<int>& oe_degree,
140  const std::vector<int>& ie_degree) override {
141  in_csr_->batch_init(ie_name, work_dir, ie_degree);
142  out_csr_->batch_init(oe_name, work_dir, oe_degree);
143  }
144 
145  void BatchInitInMemory(const std::string& edata_name,
146  const std::string& work_dir,
147  const std::vector<int>& oe_degree,
148  const std::vector<int>& ie_degree) override {
149  in_csr_->batch_init_in_memory(ie_degree);
150  out_csr_->batch_init_in_memory(oe_degree);
151  }
152 
153  void Open(const std::string& oe_name, const std::string& ie_name,
154  const std::string& edata_name, const std::string& snapshot_dir,
155  const std::string& work_dir) override {
156  in_csr_->open(ie_name, snapshot_dir, work_dir);
157  out_csr_->open(oe_name, snapshot_dir, work_dir);
158  }
159 
160  void OpenInMemory(const std::string& oe_name, const std::string& ie_name,
161  const std::string& edata_name,
162  const std::string& snapshot_dir, size_t src_vertex_cap,
163  size_t dst_vertex_cap) override {
164  in_csr_->open_in_memory(snapshot_dir + "/" + ie_name, dst_vertex_cap);
165  out_csr_->open_in_memory(snapshot_dir + "/" + oe_name, src_vertex_cap);
166  }
167 
168  void OpenWithHugepages(const std::string& oe_name, const std::string& ie_name,
169  const std::string& edata_name,
170  const std::string& snapshot_dir, size_t src_vertex_cap,
171  size_t dst_vertex_cap) override {
172  in_csr_->open_with_hugepages(snapshot_dir + "/" + ie_name, dst_vertex_cap);
173  out_csr_->open_with_hugepages(snapshot_dir + "/" + oe_name, src_vertex_cap);
174  }
175 
176  void Dump(const std::string& oe_name, const std::string& ie_name,
177  const std::string& edata_name,
178  const std::string& new_snapshot_dir) override {
179  in_csr_->dump(ie_name, new_snapshot_dir);
180  out_csr_->dump(oe_name, new_snapshot_dir);
181  Close();
182  }
183 
184  CsrBase* GetInCsr() override { return in_csr_; }
185  CsrBase* GetOutCsr() override { return out_csr_; }
186  const CsrBase* GetInCsr() const override { return in_csr_; }
187  const CsrBase* GetOutCsr() const override { return out_csr_; }
188 
189  void IngestEdge(vid_t src, vid_t dst, grape::OutArchive& oarc, timestamp_t ts,
190  Allocator& alloc) override {
191  EDATA_T data;
192  oarc >> data;
193  in_csr_->put_edge(dst, src, data, ts, alloc);
194  out_csr_->put_edge(src, dst, data, ts, alloc);
195  }
196 
197  void SortByEdgeData(timestamp_t ts) override {
198  in_csr_->batch_sort_by_edge_data(ts);
199  out_csr_->batch_sort_by_edge_data(ts);
200  }
201 
202  void UpdateEdge(vid_t src, vid_t dst, const Any& data, timestamp_t ts,
203  Allocator& alloc) override {
204  auto oe = out_csr_->edge_iter_mut(src);
205  EDATA_T prop;
206  ConvertAny<EDATA_T>::to(data, prop);
207  bool src_flag = false, dst_flag = false;
208  while (oe != nullptr && oe->is_valid()) {
209  if (oe->get_neighbor() == dst) {
210  oe->set_data(prop, ts);
211  src_flag = true;
212  break;
213  }
214  oe->next();
215  }
216  auto ie = in_csr_->edge_iter_mut(dst);
217  while (ie != nullptr && ie->is_valid()) {
218  if (ie->get_neighbor() == src) {
219  dst_flag = true;
220  ie->set_data(prop, ts);
221  break;
222  }
223  ie->next();
224  }
225  if (!(src_flag || dst_flag)) {
226  in_csr_->put_edge(dst, src, prop, ts, alloc);
227  out_csr_->put_edge(src, dst, prop, ts, alloc);
228  }
229  }
230 
231  void BatchPutEdge(vid_t src, vid_t dst, const EDATA_T& data) {
232  in_csr_->batch_put_edge(dst, src, data);
233  out_csr_->batch_put_edge(src, dst, data);
234  }
235 
236  void Close() override {
237  in_csr_->close();
238  out_csr_->close();
239  }
240 
241  private:
244 };
245 
246 template <>
247 class DualCsr<std::string_view> : public DualCsrBase {
248  public:
249  DualCsr(EdgeStrategy oe_strategy, EdgeStrategy ie_strategy, uint16_t width)
250  : in_csr_(nullptr),
251  out_csr_(nullptr),
252  column_(StorageStrategy::kMem, width) {
253  if (ie_strategy == EdgeStrategy::kNone) {
254  in_csr_ = new EmptyCsr<std::string_view>(column_);
255  } else if (ie_strategy == EdgeStrategy::kMultiple) {
256  in_csr_ = new MutableCsr<std::string_view>(column_);
257  } else if (ie_strategy == EdgeStrategy::kSingle) {
259  }
260  if (oe_strategy == EdgeStrategy::kNone) {
261  out_csr_ = new EmptyCsr<std::string_view>(column_);
262  } else if (oe_strategy == EdgeStrategy::kMultiple) {
263  out_csr_ = new MutableCsr<std::string_view>(column_);
264  } else if (oe_strategy == EdgeStrategy::kSingle) {
266  }
267  }
269  if (in_csr_ != nullptr) {
270  delete in_csr_;
271  }
272  if (out_csr_ != nullptr) {
273  delete out_csr_;
274  }
275  }
276  void BatchInit(const std::string& oe_name, const std::string& ie_name,
277  const std::string& edata_name, const std::string& work_dir,
278  const std::vector<int>& oe_degree,
279  const std::vector<int>& ie_degree) override {
280  size_t ie_num = in_csr_->batch_init(ie_name, work_dir, ie_degree);
281  size_t oe_num = out_csr_->batch_init(oe_name, work_dir, oe_degree);
282  column_.open(edata_name, "", work_dir);
283  column_.resize(std::max(ie_num, oe_num));
284  column_idx_.store(0);
285  }
286 
287  void BatchInitInMemory(const std::string& edata_name,
288  const std::string& work_dir,
289  const std::vector<int>& oe_degree,
290  const std::vector<int>& ie_degree) override {
291  size_t ie_num = in_csr_->batch_init_in_memory(ie_degree);
292  size_t oe_num = out_csr_->batch_init_in_memory(oe_degree);
293  column_.open(edata_name, "", work_dir);
294  column_.resize(std::max(ie_num, oe_num));
295  column_idx_.store(0);
296  }
297 
298  void Open(const std::string& oe_name, const std::string& ie_name,
299  const std::string& edata_name, const std::string& snapshot_dir,
300  const std::string& work_dir) override {
301  in_csr_->open(ie_name, snapshot_dir, work_dir);
302  out_csr_->open(oe_name, snapshot_dir, work_dir);
303  column_.open(edata_name, snapshot_dir, work_dir);
304  column_idx_.store(column_.size());
305  column_.resize(std::max(column_.size() + (column_.size() + 4) / 5, 4096ul));
306  }
307 
308  void OpenInMemory(const std::string& oe_name, const std::string& ie_name,
309  const std::string& edata_name,
310  const std::string& snapshot_dir, size_t src_vertex_cap,
311  size_t dst_vertex_cap) override {
312  in_csr_->open_in_memory(snapshot_dir + "/" + ie_name, dst_vertex_cap);
313  out_csr_->open_in_memory(snapshot_dir + "/" + oe_name, src_vertex_cap);
314  column_.open_in_memory(snapshot_dir + "/" + edata_name);
315  column_idx_.store(column_.size());
316  column_.resize(std::max(column_.size() + (column_.size() + 4) / 5, 4096ul));
317  }
318 
319  void OpenWithHugepages(const std::string& oe_name, const std::string& ie_name,
320  const std::string& edata_name,
321  const std::string& snapshot_dir, size_t src_vertex_cap,
322  size_t dst_vertex_cap) override {
323  LOG(FATAL) << "not supported...";
324  }
325 
326  void Dump(const std::string& oe_name, const std::string& ie_name,
327  const std::string& edata_name,
328  const std::string& new_snapshot_dir) override {
329  in_csr_->dump(ie_name, new_snapshot_dir);
330  out_csr_->dump(oe_name, new_snapshot_dir);
331  column_.resize(column_idx_.load());
332  column_.dump(new_snapshot_dir + "/" + edata_name);
333  Close();
334  }
335 
336  CsrBase* GetInCsr() override { return in_csr_; }
337  CsrBase* GetOutCsr() override { return out_csr_; }
338  const CsrBase* GetInCsr() const override { return in_csr_; }
339  const CsrBase* GetOutCsr() const override { return out_csr_; }
340 
341  void IngestEdge(vid_t src, vid_t dst, grape::OutArchive& oarc, timestamp_t ts,
342  Allocator& alloc) override {
343  std::string_view prop;
344  oarc >> prop;
345  size_t row_id = column_idx_.fetch_add(1);
346  column_.set_value(row_id, prop);
347  in_csr_->put_edge_with_index(dst, src, row_id, ts, alloc);
348  out_csr_->put_edge_with_index(src, dst, row_id, ts, alloc);
349  }
350 
351  void SortByEdgeData(timestamp_t ts) override {
352  LOG(FATAL) << "Not implemented";
353  }
354 
355  void UpdateEdge(vid_t src, vid_t dst, const Any& data, timestamp_t ts,
356  Allocator& alloc) override {
357  auto oe_ptr = out_csr_->edge_iter_mut(src);
358  std::string_view prop = data.AsStringView();
359  auto oe = dynamic_cast<MutableCsrEdgeIter<std::string_view>*>(oe_ptr.get());
360  size_t index = std::numeric_limits<size_t>::max();
361  while (oe != nullptr && oe->is_valid()) {
362  if (oe->get_neighbor() == dst) {
363  oe->set_timestamp(ts);
364  index = oe->get_index();
365  break;
366  }
367  oe->next();
368  }
369  auto ie_ptr = in_csr_->edge_iter_mut(dst);
370  auto ie = dynamic_cast<MutableCsrEdgeIter<std::string_view>*>(ie_ptr.get());
371  while (ie != nullptr && ie->is_valid()) {
372  if (ie->get_neighbor() == src) {
373  ie->set_timestamp(ts);
374  index = ie->get_index();
375  break;
376  }
377  ie->next();
378  }
379  if (index != std::numeric_limits<size_t>::max()) {
380  column_.set_value(index, prop);
381  } else {
382  size_t row_id = column_idx_.fetch_add(1);
383  column_.set_value(row_id, prop);
384  in_csr_->put_edge_with_index(dst, src, row_id, ts, alloc);
385  out_csr_->put_edge_with_index(src, dst, row_id, ts, alloc);
386  }
387  }
388 
389  void BatchPutEdge(vid_t src, vid_t dst, const std::string_view& data) {
390  size_t row_id = column_idx_.fetch_add(1);
391  column_.set_value(row_id, data);
392  in_csr_->batch_put_edge_with_index(dst, src, row_id);
393  out_csr_->batch_put_edge_with_index(src, dst, row_id);
394  }
395 
396  void BatchPutEdge(vid_t src, vid_t dst, const std::string& data) {
397  size_t row_id = column_idx_.fetch_add(1);
398  column_.set_value(row_id, data);
399 
400  in_csr_->batch_put_edge_with_index(dst, src, row_id);
401  out_csr_->batch_put_edge_with_index(src, dst, row_id);
402  }
403 
404  void Close() override {
405  in_csr_->close();
406  out_csr_->close();
407  column_.close();
408  }
409 
410  private:
413  std::atomic<size_t> column_idx_;
415 };
416 
417 template <>
418 class DualCsr<RecordView> : public DualCsrBase {
419  public:
420  DualCsr(EdgeStrategy oe_strategy, EdgeStrategy ie_strategy,
421  const std::vector<std::string>& col_name,
422  const std::vector<PropertyType>& property_types,
423  const std::vector<StorageStrategy>& storage_strategies)
424  : col_name_(col_name),
425  property_types_(property_types),
426  storage_strategies_(storage_strategies),
427  in_csr_(nullptr),
428  out_csr_(nullptr) {
429  if (ie_strategy == EdgeStrategy::kNone) {
430  in_csr_ = new EmptyCsr<RecordView>(table_);
431  } else if (ie_strategy == EdgeStrategy::kMultiple) {
432  in_csr_ = new MutableCsr<RecordView>(table_);
433  } else {
435  }
436  if (oe_strategy == EdgeStrategy::kNone) {
437  out_csr_ = new EmptyCsr<RecordView>(table_);
438  } else if (oe_strategy == EdgeStrategy::kMultiple) {
439  out_csr_ = new MutableCsr<RecordView>(table_);
440  } else {
442  }
443  }
444 
446  if (in_csr_ != nullptr) {
447  delete in_csr_;
448  }
449  if (out_csr_ != nullptr) {
450  delete out_csr_;
451  }
452  }
453 
454  void InitTable(const std::string& edata_name, const std::string& work_dir) {
455  table_.init(edata_name, work_dir, col_name_, property_types_,
456  storage_strategies_);
457  }
458 
459  void BatchInit(const std::string& oe_name, const std::string& ie_name,
460  const std::string& edata_name, const std::string& work_dir,
461  const std::vector<int>& oe_degree,
462  const std::vector<int>& ie_degree) override {
463  size_t ie_num = in_csr_->batch_init(ie_name, work_dir, ie_degree);
464  size_t oe_num = out_csr_->batch_init(oe_name, work_dir, oe_degree);
465  table_.resize(std::max(ie_num, oe_num));
466 
467  table_idx_.store(std::max(ie_num, oe_num));
468  }
469 
470  void BatchInitInMemory(const std::string& edata_name,
471  const std::string& work_dir,
472  const std::vector<int>& oe_degree,
473  const std::vector<int>& ie_degree) override {
474  size_t ie_num = in_csr_->batch_init_in_memory(ie_degree);
475  size_t oe_num = out_csr_->batch_init_in_memory(oe_degree);
476  table_.resize(std::max(ie_num, oe_num));
477  table_idx_.store(std::max(ie_num, oe_num));
478  }
479 
480  void Open(const std::string& oe_name, const std::string& ie_name,
481  const std::string& edata_name, const std::string& snapshot_dir,
482  const std::string& work_dir) override {
483  in_csr_->open(ie_name, snapshot_dir, work_dir);
484  out_csr_->open(oe_name, snapshot_dir, work_dir);
485 
486  // fix me: storage_strategies_ is not used
487  table_.open(edata_name, snapshot_dir, work_dir, col_name_, property_types_,
488  {});
489  table_idx_.store(table_.row_num());
490  table_.resize(
491  std::max(table_.row_num() + (table_.row_num() + 4) / 5, 4096ul));
492  }
493 
494  void OpenInMemory(const std::string& oe_name, const std::string& ie_name,
495  const std::string& edata_name,
496  const std::string& snapshot_dir, size_t src_vertex_cap,
497  size_t dst_vertex_cap) override {
498  in_csr_->open_in_memory(snapshot_dir + "/" + ie_name, dst_vertex_cap);
499  out_csr_->open_in_memory(snapshot_dir + "/" + oe_name, src_vertex_cap);
500  // fix me: storage_strategies_ is not used
501  table_.open_in_memory(edata_name, snapshot_dir, col_name_, property_types_,
502  {});
503  table_idx_.store(table_.row_num());
504  table_.resize(
505  std::max(table_.row_num() + (table_.row_num() + 4) / 5, 4096ul));
506  }
507 
508  void OpenWithHugepages(const std::string& oe_name, const std::string& ie_name,
509  const std::string& edata_name,
510  const std::string& snapshot_dir, size_t src_vertex_cap,
511  size_t dst_vertex_cap) override {
512  LOG(FATAL) << "not supported...";
513  }
514 
515  void Dump(const std::string& oe_name, const std::string& ie_name,
516  const std::string& edata_name,
517  const std::string& new_snapshot_dir) override {
518  in_csr_->dump(ie_name, new_snapshot_dir);
519  out_csr_->dump(oe_name, new_snapshot_dir);
520  table_.resize(table_idx_.load());
521  table_.dump(edata_name, new_snapshot_dir);
522  }
523 
524  CsrBase* GetInCsr() override { return in_csr_; }
525  CsrBase* GetOutCsr() override { return out_csr_; }
526  const CsrBase* GetInCsr() const override { return in_csr_; }
527  const CsrBase* GetOutCsr() const override { return out_csr_; }
528 
529  void BatchPutEdge(vid_t src, vid_t dst, size_t row_id) {
530  in_csr_->batch_put_edge_with_index(dst, src, row_id);
531  out_csr_->batch_put_edge_with_index(src, dst, row_id);
532  }
533 
534  Table& GetTable() { return table_; }
535 
536  const Table& GetTable() const { return table_; }
537  void IngestEdge(vid_t src, vid_t dst, grape::OutArchive& oarc, timestamp_t ts,
538  Allocator& alloc) override {
539  size_t row_id = table_idx_.fetch_add(1);
540  size_t len;
541  oarc >> len;
542  table_.ingest(row_id, oarc);
543  in_csr_->put_edge_with_index(dst, src, row_id, ts, alloc);
544  out_csr_->put_edge_with_index(src, dst, row_id, ts, alloc);
545  }
546 
547  void SortByEdgeData(timestamp_t ts) override {
548  LOG(FATAL) << "Not implemented";
549  }
550 
551  void UpdateEdge(vid_t src, vid_t dst, const Any& data, timestamp_t ts,
552  Allocator& alloc) override {
553  auto oe_ptr = out_csr_->edge_iter_mut(src);
554  grape::InArchive arc;
555  Record r = data.AsRecord();
556  for (size_t i = 0; i < r.len; ++i) {
557  arc << r.props[i];
558  }
559  grape::OutArchive oarc;
560  oarc.SetSlice(arc.GetBuffer(), arc.GetSize());
561  auto oe = dynamic_cast<MutableCsrEdgeIter<RecordView>*>(oe_ptr.get());
562  size_t index = std::numeric_limits<size_t>::max();
563  while (oe != nullptr && oe->is_valid()) {
564  if (oe->get_neighbor() == dst) {
565  oe->set_timestamp(ts);
566  index = oe->get_index();
567  break;
568  }
569  oe->next();
570  }
571  auto ie_ptr = in_csr_->edge_iter_mut(dst);
572  auto ie = dynamic_cast<MutableCsrEdgeIter<RecordView>*>(ie_ptr.get());
573  while (ie != nullptr && ie->is_valid()) {
574  if (ie->get_neighbor() == src) {
575  ie->set_timestamp(ts);
576  index = ie->get_index();
577  break;
578  }
579  ie->next();
580  }
581  if (index != std::numeric_limits<size_t>::max()) {
582  table_.ingest(index, oarc);
583  } else {
584  size_t row_id = table_idx_.fetch_add(1);
585  table_.ingest(row_id, oarc);
586  in_csr_->put_edge_with_index(dst, src, row_id, ts, alloc);
587  out_csr_->put_edge_with_index(src, dst, row_id, ts, alloc);
588  }
589  }
590 
591  void Close() override {
592  in_csr_->close();
593  out_csr_->close();
594  table_.close();
595  }
596 
597  private:
598  const std::vector<std::string>& col_name_;
599  const std::vector<PropertyType>& property_types_;
600  const std::vector<StorageStrategy>& storage_strategies_;
603  std::atomic<size_t> table_idx_;
605 };
606 
607 } // namespace gs
608 
609 #endif // GRAPHSCOPE_FRAGMENT_DUAL_CSR_H_
gs::DualCsrBase::Open
virtual void Open(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, const std::string &work_dir)=0
gs::DualCsrBase::SortByEdgeData
virtual void SortByEdgeData(timestamp_t ts)=0
gs::MutableCsrEdgeIter< std::string_view >::set_timestamp
void set_timestamp(timestamp_t ts)
Definition: mutable_csr.h:123
gs::SingleMutableCsr< RecordView >
Definition: mutable_csr.h:1059
gs::DualCsr::GetOutCsr
const CsrBase * GetOutCsr() const override
Definition: dual_csr.h:187
gs::DualCsr::BatchInitInMemory
void BatchInitInMemory(const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree) override
Definition: dual_csr.h:145
gs::Any
Definition: types.h:383
gs::DualCsr< RecordView >::GetInCsr
CsrBase * GetInCsr() override
Definition: dual_csr.h:524
gs::DualCsr::SortByEdgeData
void SortByEdgeData(timestamp_t ts) override
Definition: dual_csr.h:197
gs::DualCsr< RecordView >::IngestEdge
void IngestEdge(vid_t src, vid_t dst, grape::OutArchive &oarc, timestamp_t ts, Allocator &alloc) override
Definition: dual_csr.h:537
gs::DualCsr< std::string_view >::out_csr_
TypedCsrBase< std::string_view > * out_csr_
Definition: dual_csr.h:412
gs::DualCsr< RecordView >::GetTable
Table & GetTable()
Definition: dual_csr.h:534
gs::timestamp_t
uint32_t timestamp_t
Definition: types.h:30
gs::CsrBase
Definition: csr_base.h:61
gs::DualCsr< RecordView >::OpenInMemory
void OpenInMemory(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap) override
Definition: dual_csr.h:494
gs::EdgeStrategy::kMultiple
@ kMultiple
gs::DualCsr< std::string_view >::column_
StringColumn column_
Definition: dual_csr.h:414
gs::DualCsr< RecordView >::SortByEdgeData
void SortByEdgeData(timestamp_t ts) override
Definition: dual_csr.h:547
gs::DualCsr< RecordView >::OpenWithHugepages
void OpenWithHugepages(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap) override
Definition: dual_csr.h:508
gs::vid_t
uint32_t vid_t
Definition: types.h:31
gs::DualCsr< RecordView >::out_csr_
TypedMutableCsrBase< RecordView > * out_csr_
Definition: dual_csr.h:602
gs::DualCsrBase::UpdateEdge
virtual void UpdateEdge(vid_t src, vid_t dst, const Any &oarc, timestamp_t timestamp, Allocator &alloc)=0
gs::DualCsr::~DualCsr
~DualCsr()
Definition: dual_csr.h:129
gs::DualCsr< RecordView >::GetOutCsr
const CsrBase * GetOutCsr() const override
Definition: dual_csr.h:527
gs::DualCsr< RecordView >::storage_strategies_
const std::vector< StorageStrategy > & storage_strategies_
Definition: dual_csr.h:600
gs::EdgeStrategy::kNone
@ kNone
gs::DualCsrBase::Dump
virtual void Dump(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &new_snapshot_dir)=0
gs::DualCsrBase::GetOutCsr
virtual CsrBase * GetOutCsr()=0
gs::DualCsr< RecordView >::DualCsr
DualCsr(EdgeStrategy oe_strategy, EdgeStrategy ie_strategy, const std::vector< std::string > &col_name, const std::vector< PropertyType > &property_types, const std::vector< StorageStrategy > &storage_strategies)
Definition: dual_csr.h:420
gs::DualCsr< RecordView >::BatchPutEdge
void BatchPutEdge(vid_t src, vid_t dst, size_t row_id)
Definition: dual_csr.h:529
gs::DualCsr< std::string_view >::GetOutCsr
const CsrBase * GetOutCsr() const override
Definition: dual_csr.h:339
gs::ConvertAny::to
static void to(const Any &value, T &out)
Definition: types.h:799
gs
Definition: adj_list.h:23
gs::DualCsr< RecordView >::GetTable
const Table & GetTable() const
Definition: dual_csr.h:536
gs::DualCsrBase::IngestEdge
virtual void IngestEdge(vid_t src, vid_t dst, grape::OutArchive &oarc, timestamp_t timestamp, Allocator &alloc)=0
gs::DualCsrBase::OpenWithHugepages
virtual void OpenWithHugepages(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap)=0
gs::DualCsr< std::string_view >::in_csr_
TypedCsrBase< std::string_view > * in_csr_
Definition: dual_csr.h:411
gs::DualCsr::IngestEdge
void IngestEdge(vid_t src, vid_t dst, grape::OutArchive &oarc, timestamp_t ts, Allocator &alloc) override
Definition: dual_csr.h:189
gs::DualCsrBase::Close
virtual void Close()=0
gs::StorageStrategy
StorageStrategy
Definition: types.h:58
gs::EmptyCsr< RecordView >
Definition: mutable_csr.h:1293
gs::DualCsr< std::string_view >::BatchPutEdge
void BatchPutEdge(vid_t src, vid_t dst, const std::string &data)
Definition: dual_csr.h:396
gs::DualCsr< std::string_view >::OpenInMemory
void OpenInMemory(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap) override
Definition: dual_csr.h:308
gs::Table
Definition: table.h:30
gs::DualCsr< RecordView >::InitTable
void InitTable(const std::string &edata_name, const std::string &work_dir)
Definition: dual_csr.h:454
gs::DualCsr::in_csr_
TypedCsrBase< EDATA_T > * in_csr_
Definition: dual_csr.h:242
gs::ArenaAllocator
Definition: allocators.h:29
gs::MutableCsr< RecordView >
Definition: mutable_csr.h:661
gs::Record::len
size_t len
Definition: types.h:314
gs::DualCsr< std::string_view >::BatchInit
void BatchInit(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree) override
Definition: dual_csr.h:276
gs::DualCsr::Close
void Close() override
Definition: dual_csr.h:236
gs::EmptyCsr< std::string_view >
Definition: mutable_csr.h:1234
gs::Record::props
Any * props
Definition: types.h:315
gs::DualCsrBase::OpenInMemory
virtual void OpenInMemory(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap)=0
gs::DualCsr< RecordView >::GetOutCsr
CsrBase * GetOutCsr() override
Definition: dual_csr.h:525
gs::DualCsr< RecordView >::Open
void Open(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: dual_csr.h:480
gs::DualCsr::Open
void Open(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: dual_csr.h:153
gs::DualCsrBase::Warmup
void Warmup(int thread_num)
Definition: dual_csr.h:76
gs::CsrBase::resize
virtual void resize(vid_t vnum)=0
gs::DualCsr< std::string_view >::DualCsr
DualCsr(EdgeStrategy oe_strategy, EdgeStrategy ie_strategy, uint16_t width)
Definition: dual_csr.h:249
gs::DualCsr::Dump
void Dump(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &new_snapshot_dir) override
Definition: dual_csr.h:176
gs::CsrBase::edge_num
virtual size_t edge_num() const =0
gs::Record
Definition: types.h:300
gs::DualCsr< RecordView >::BatchInitInMemory
void BatchInitInMemory(const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree) override
Definition: dual_csr.h:470
gs::DualCsr< RecordView >::property_types_
const std::vector< PropertyType > & property_types_
Definition: dual_csr.h:599
gs::TypedColumn< std::string_view >
Definition: column.h:338
allocators.h
gs::DualCsr< std::string_view >::IngestEdge
void IngestEdge(vid_t src, vid_t dst, grape::OutArchive &oarc, timestamp_t ts, Allocator &alloc) override
Definition: dual_csr.h:341
gs::DualCsr< RecordView >::table_idx_
std::atomic< size_t > table_idx_
Definition: dual_csr.h:603
gs::DualCsr< std::string_view >::Dump
void Dump(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &new_snapshot_dir) override
Definition: dual_csr.h:326
gs::DualCsr::out_csr_
TypedCsrBase< EDATA_T > * out_csr_
Definition: dual_csr.h:243
gs::DualCsrBase::GetInCsr
virtual CsrBase * GetInCsr()=0
gs::DualCsr< RecordView >::col_name_
const std::vector< std::string > & col_name_
Definition: dual_csr.h:598
gs::MutableCsr< std::string_view >
Definition: mutable_csr.h:573
gs::MutableCsrEdgeIter::next
void next() override
Definition: mutable_csr.h:95
gs::SingleImmutableCsr
Definition: immutable_csr.h:326
gs::DualCsr::GetOutCsr
CsrBase * GetOutCsr() override
Definition: dual_csr.h:185
gs::Any::AsRecord
const Record & AsRecord() const
Definition: types.h:675
gs::DualCsr< std::string_view >::GetOutCsr
CsrBase * GetOutCsr() override
Definition: dual_csr.h:337
gs::MutableCsr
Definition: mutable_csr.h:183
gs::DualCsr::GetInCsr
CsrBase * GetInCsr() override
Definition: dual_csr.h:184
gs::TypedCsrBase
Definition: csr_base.h:109
gs::DualCsr< RecordView >::GetInCsr
const CsrBase * GetInCsr() const override
Definition: dual_csr.h:526
gs::DualCsr< RecordView >::Dump
void Dump(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &new_snapshot_dir) override
Definition: dual_csr.h:515
gs::TypedMutableCsrBase< RecordView >
gs::DualCsr< std::string_view >::Close
void Close() override
Definition: dual_csr.h:404
gs::DualCsr< std::string_view >::~DualCsr
~DualCsr()
Definition: dual_csr.h:268
gs::DualCsrBase::~DualCsrBase
virtual ~DualCsrBase()=default
gs::DualCsr< std::string_view >::UpdateEdge
void UpdateEdge(vid_t src, vid_t dst, const Any &data, timestamp_t ts, Allocator &alloc) override
Definition: dual_csr.h:355
gs::EdgeStrategy::kSingle
@ kSingle
gs::DualCsr::BatchInit
void BatchInit(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree) override
Definition: dual_csr.h:137
gs::DualCsrBase::BatchInit
virtual void BatchInit(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree)=0
gs::DualCsr::BatchPutEdge
void BatchPutEdge(vid_t src, vid_t dst, const EDATA_T &data)
Definition: dual_csr.h:231
gs::DualCsrBase::EdgeNum
size_t EdgeNum() const
Definition: dual_csr.h:81
gs::EmptyCsr
Definition: mutable_csr.h:1168
gs::MutableCsrEdgeIter< RecordView >
Definition: mutable_csr.h:143
gs::TypedCsrBase< std::string_view >
Definition: csr_base.h:118
gs::snapshot_dir
std::string snapshot_dir(const std::string &work_dir, uint32_t version)
Definition: file_names.h:192
std
Definition: loading_config.h:232
gs::SingleMutableCsr
Definition: mutable_csr.h:745
gs::EdgeStrategy
EdgeStrategy
Definition: types.h:24
gs::DualCsr::OpenInMemory
void OpenInMemory(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap) override
Definition: dual_csr.h:160
gs::DualCsr< RecordView >::~DualCsr
~DualCsr()
Definition: dual_csr.h:445
gs::DualCsr< RecordView >::BatchInit
void BatchInit(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree) override
Definition: dual_csr.h:459
gs::DualCsr< std::string_view >::Open
void Open(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, const std::string &work_dir) override
Definition: dual_csr.h:298
mutable_csr.h
gs::DualCsr::OpenWithHugepages
void OpenWithHugepages(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap) override
Definition: dual_csr.h:168
gs::DualCsr< std::string_view >::SortByEdgeData
void SortByEdgeData(timestamp_t ts) override
Definition: dual_csr.h:351
gs::DualCsr< RecordView >::Close
void Close() override
Definition: dual_csr.h:591
gs::DualCsr
Definition: dual_csr.h:101
gs::SingleMutableCsr< std::string_view >
Definition: mutable_csr.h:954
gs::DualCsrBase::DualCsrBase
DualCsrBase()=default
gs::MutableCsrEdgeIter< RecordView >::set_timestamp
void set_timestamp(timestamp_t ts)
Definition: mutable_csr.h:159
gs::DualCsrBase
Definition: dual_csr.h:29
gs::DualCsr::DualCsr
DualCsr(EdgeStrategy oe_strategy, EdgeStrategy ie_strategy, bool oe_mutable, bool ie_mutable)
Definition: dual_csr.h:103
gs::RecordView
Definition: types.h:285
gs::DualCsr< std::string_view >::BatchInitInMemory
void BatchInitInMemory(const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree) override
Definition: dual_csr.h:287
immutable_csr.h
gs::DualCsr< RecordView >::in_csr_
TypedMutableCsrBase< RecordView > * in_csr_
Definition: dual_csr.h:601
gs::DualCsr::UpdateEdge
void UpdateEdge(vid_t src, vid_t dst, const Any &data, timestamp_t ts, Allocator &alloc) override
Definition: dual_csr.h:202
gs::DualCsr< RecordView >::table_
Table table_
Definition: dual_csr.h:604
gs::DualCsrBase::BatchInitInMemory
virtual void BatchInitInMemory(const std::string &edata_name, const std::string &work_dir, const std::vector< int > &oe_degree, const std::vector< int > &ie_degree)=0
gs::DualCsrBase::Resize
void Resize(vid_t src_vertex_num, vid_t dst_vertex_num)
Definition: dual_csr.h:71
table.h
gs::MutableCsrEdgeIter< std::string_view >
Definition: mutable_csr.h:105
gs::StorageStrategy::kMem
@ kMem
gs::Any::AsStringView
std::string_view AsStringView() const
Definition: types.h:641
gs::DualCsr< std::string_view >::GetInCsr
const CsrBase * GetInCsr() const override
Definition: dual_csr.h:338
gs::DualCsr< RecordView >::UpdateEdge
void UpdateEdge(vid_t src, vid_t dst, const Any &data, timestamp_t ts, Allocator &alloc) override
Definition: dual_csr.h:551
gs::DualCsr< std::string_view >::column_idx_
std::atomic< size_t > column_idx_
Definition: dual_csr.h:413
gs::DualCsr< std::string_view >::OpenWithHugepages
void OpenWithHugepages(const std::string &oe_name, const std::string &ie_name, const std::string &edata_name, const std::string &snapshot_dir, size_t src_vertex_cap, size_t dst_vertex_cap) override
Definition: dual_csr.h:319
gs::DualCsr::GetInCsr
const CsrBase * GetInCsr() const override
Definition: dual_csr.h:186
gs::CsrBase::warmup
virtual void warmup(int thread_num) const =0
gs::DualCsr< std::string_view >::GetInCsr
CsrBase * GetInCsr() override
Definition: dual_csr.h:336
gs::DualCsr< std::string_view >::BatchPutEdge
void BatchPutEdge(vid_t src, vid_t dst, const std::string_view &data)
Definition: dual_csr.h:389