Flex  0.17.9
pt_indexer.h
Go to the documentation of this file.
1 
16 #ifndef GRAPHSCOPE_GRAPH_PT_INDEXER_H_
17 #define GRAPHSCOPE_GRAPH_PT_INDEXER_H_
18 
19 #ifdef USE_PTHASH
20 #include <thread>
21 
22 #include "grape/util.h"
23 
25 
27 #include "murmurhash.h"
28 #include "pthash.hpp"
29 
30 namespace gs {
31 
32 struct murmurhash2_64 {
33  typedef pthash::hash64 hash_type;
34 
35  // specialization for std::string
36  static inline hash_type hash(std::string const& val, uint64_t seed) {
37  return MurmurHash2_64(val.data(), val.size(), seed);
38  }
39 
40  // specialization for uint64_t, int64_t, uint32_t
41  template <typename EDATA_T>
42  static inline hash_type hash(EDATA_T val, uint64_t seed) {
43  return MurmurHash2_64(reinterpret_cast<char const*>(&val), sizeof(val),
44  seed);
45  }
46 
47  // specialization for std::string
48  static inline hash_type hash(const std::string_view& val, uint64_t seed) {
49  return MurmurHash2_64(val.data(), val.size(), seed);
50  }
51 
52  static inline hash_type hash(const Any& val, uint64_t seed) {
53  if (val.type == PropertyType::kStringView) {
54  return hash(val.AsStringView(), seed);
55  } else if (val.type == PropertyType::kInt64) {
56  return hash<int64_t>(val.AsInt64(), seed);
57  } else if (val.type == PropertyType::kUInt64) {
58  return hash<uint64_t>(val.AsUInt64(), seed);
59  } else if (val.type == PropertyType::kInt32) {
60  return hash<int32_t>(val.AsInt32(), seed);
61  } else if (val.type == PropertyType::kUInt32) {
62  return hash<uint32_t>(val.AsUInt32(), seed);
63  } else {
64  LOG(FATAL) << "Unexpected property type: " << val.type;
65  return hash_type();
66  }
67  }
68 };
69 
70 template <typename KEY_T, typename INDEX_T>
71 class PTIndexerBuilder;
72 
73 template <typename INDEX_T>
74 class PTIndexer {
75  public:
76  PTIndexer() : keys_(nullptr), base_size_(0), concat_keys_(nullptr) {}
77  ~PTIndexer() {
78  if (keys_ != nullptr) {
79  delete keys_;
80  }
81  if (concat_keys_ != nullptr) {
82  delete concat_keys_;
83  }
84  }
85 
86  PTIndexer(PTIndexer&& rhs)
87  : keys_(rhs.keys_),
88  base_map_(rhs.base_map_),
89  base_size_(rhs.base_size_),
90  extra_indexer_(std::move(rhs.extra_indexer_)) {
91  rhs.keys_ = nullptr;
92  rhs.concat_keys_ = nullptr;
93  }
94 
95  void warmup(int) const {}
96  static std::string prefix() { return "pthash"; }
97 
98  void reserve(size_t capacity) {
99  if (capacity > base_size_) {
100  extra_indexer_.reserve(capacity - base_size_);
101  }
102  }
103 
104  size_t size() const { return base_size_ + extra_indexer_.size(); }
105  size_t capacity() const { return base_size_ + extra_indexer_.capacity(); }
106  PropertyType get_type() const { return keys_->type(); }
107 
108  INDEX_T get_index(const Any& key) const {
109  assert(key.type == get_type());
110  size_t index = base_map_(key.AsInt64());
111  if (index < base_size_ && keys_->get(index) == key) {
112  return index;
113  } else {
114  return extra_indexer_.get_index(key) + base_size_;
115  }
116  }
117 
118  bool get_index(const Any& oid, INDEX_T& ret) const {
119  assert(oid.type == get_type());
120  size_t index = base_map_(oid);
121  if (index < base_size_ && keys_->get(index) == oid) {
122  ret = index;
123  return true;
124  } else {
125  if (extra_indexer_.get_index(oid, ret)) {
126  ret += base_size_;
127  return true;
128  }
129  return false;
130  }
131  }
132 
133  INDEX_T insert(const Any& oid) {
134  assert(oid.type == get_type());
135  size_t index = base_map_(oid);
136  if (index < base_size_ && keys_->get(index) == oid) {
137  return index;
138  }
139  return extra_indexer_.insert(oid) + base_size_;
140  }
141 
142  Any get_key(const INDEX_T& index) const {
143  return index < base_size_ ? keys_->get(index)
144  : extra_indexer_.get_key(index - base_size_);
145  }
146 
147  void dump_meta(const std::string& filename) {
148  grape::InArchive arc;
149  arc << get_type() << base_size_;
150  std::string meta_file_path = filename;
151  FILE* fout = fopen(meta_file_path.c_str(), "wb");
152  fwrite(arc.GetBuffer(), arc.GetSize(), 1, fout);
153  fflush(fout);
154  fclose(fout);
155  }
156 
157  void dump(const std::string& name, const std::string& snapshot_dir) {
158  dump_meta(snapshot_dir + "/" + name + ".meta");
159  keys_->resize(base_size_);
160  keys_->dump(snapshot_dir + "/" + name + ".base_map.keys");
161  base_map_.Save(snapshot_dir + "/" + name + ".base_map");
162  extra_indexer_.dump(name + ".extra_indexer", snapshot_dir);
163  }
164 
165  void close() {
166  keys_->close();
167  extra_indexer_.close();
168  }
169 
170  void init(const PropertyType& type) {
171  if (keys_ != nullptr) {
172  delete keys_;
173  }
174  keys_ = nullptr;
175  if (type == PropertyType::kInt64) {
176  keys_ = new TypedColumn<int64_t>(StorageStrategy::kMem);
177  } else if (type == PropertyType::kInt32) {
178  keys_ = new TypedColumn<int32_t>(StorageStrategy::kMem);
179  } else if (type == PropertyType::kUInt64) {
180  keys_ = new TypedColumn<uint64_t>(StorageStrategy::kMem);
181  } else if (type == PropertyType::kUInt32) {
182  keys_ = new TypedColumn<uint32_t>(StorageStrategy::kMem);
183  } else if (type.type_enum == impl::PropertyTypeImpl::kVarChar) {
185  type.additional_type_info.max_length);
186  } else if (type == PropertyType::kStringView) {
189  } else {
190  LOG(FATAL) << "Not support type [" << type << "] as pk type ..";
191  }
192  }
193 
194  void load_meta(const std::string& filename) {
195  std::string meta_file_path = filename;
196  size_t meta_file_size = std::filesystem::file_size(meta_file_path);
197  std::vector<char> buf(meta_file_size);
198  FILE* fin = fopen(meta_file_path.c_str(), "r");
199  CHECK_EQ(fread(buf.data(), sizeof(char), meta_file_size, fin),
200  meta_file_size);
201  grape::OutArchive arc;
202  arc.SetSlice(buf.data(), meta_file_size);
203  PropertyType type;
204  arc >> type >> base_size_;
205  init(type);
206  }
207 
208  void open(const std::string& name, const std::string& snapshot_dir,
209  const std::string& work_dir) {
210  load_meta(snapshot_dir + "/" + name + ".meta");
211  base_map_.Open(snapshot_dir + "/" + name + ".base_map");
212  keys_->open(name + ".base_map.keys", snapshot_dir, work_dir);
213  extra_indexer_.open(name + ".extra_indexer", snapshot_dir, work_dir);
214 
215  extra_indexer_.reserve(base_size_ / 2);
216  }
217 
218  void open_in_memory(const std::string& name) {
219  load_meta(name + ".meta");
220  base_map_.Open(name + ".base_map");
221  keys_->open_in_memory(name + ".base_map.keys");
222  extra_indexer_.open_in_memory(name + ".extra_indexer");
223  extra_indexer_.reserve(base_size_ / 2);
224  }
225 
226  void open_with_hugepages(const std::string& name, bool hugepage_table) {
227  load_meta(name + ".meta");
228  keys_->open_with_hugepages(name + ".keys", true);
229  base_map_.Open(name + ".base_map");
230  extra_indexer_.open_with_hugepages(name, hugepage_table);
231  extra_indexer_.reserve(base_size_ / 2);
232  }
233  const ColumnBase& get_keys() const {
234  if (concat_keys_ != nullptr) {
235  delete concat_keys_;
236  }
237  if (keys_->type() == PropertyType::kInt64) {
238  concat_keys_ = new ConcatColumn<int64_t>(
239  dynamic_cast<const TypedColumn<int64_t>&>(*keys_),
240  dynamic_cast<const TypedColumn<int64_t>&>(extra_indexer_.get_keys()));
241  } else if (keys_->type() == PropertyType::kUInt64) {
242  concat_keys_ = new ConcatColumn<uint64_t>(
243  dynamic_cast<const TypedColumn<uint64_t>&>(*keys_),
244  dynamic_cast<const TypedColumn<uint64_t>&>(
245  extra_indexer_.get_keys()));
246  } else if (keys_->type() == PropertyType::kInt32) {
247  concat_keys_ = new ConcatColumn<int32_t>(
248  dynamic_cast<const TypedColumn<int32_t>&>(*keys_),
249  dynamic_cast<const TypedColumn<int32_t>&>(extra_indexer_.get_keys()));
250  } else if (keys_->type() == PropertyType::kUInt32) {
251  concat_keys_ = new ConcatColumn<uint32_t>(
252  dynamic_cast<const TypedColumn<uint32_t>&>(*keys_),
253  dynamic_cast<const TypedColumn<uint32_t>&>(
254  extra_indexer_.get_keys()));
255  } else {
256  concat_keys_ = new ConcatColumn<std::string_view>(
257  dynamic_cast<const TypedColumn<std::string_view>&>(*keys_),
258  dynamic_cast<const TypedColumn<std::string_view>&>(
259  extra_indexer_.get_keys()));
260  }
261  return *concat_keys_;
262  }
263 
264  private:
265  template <typename _KEY_T, typename _INDEX_T>
266  friend class PTIndexerBuilder;
267 
268  ColumnBase* keys_;
269  SinglePHFView<murmurhash2_64> base_map_;
270  size_t base_size_;
271  LFIndexer<INDEX_T> extra_indexer_;
272  mutable ColumnBase* concat_keys_;
273 };
274 
/// Visitor that flattens an object into a contiguous byte buffer.
///
/// POD values are appended as their raw bytes; any other type must expose a
/// `visit(visitor&)` member that feeds its fields back into this saver.
/// Vectors are written as their element count (a size_t) followed by the
/// elements.
class mem_buffer_saver {
 public:
  mem_buffer_saver() = default;
  ~mem_buffer_saver() = default;

  /// Appends a single value, recursing through `val.visit(*this)` when T is
  /// not a POD type.
  template <typename T>
  void visit(T& val) {
    if constexpr (!std::is_pod<T>::value) {
      val.visit(*this);
    } else {
      char* first = reinterpret_cast<char*>(&val);
      char* last = first + sizeof(T);
      buf_.insert(buf_.end(), first, last);
    }
  }

  /// Appends a vector: the length first, then either one bulk copy of the
  /// element storage (POD elements) or each element in order.
  template <typename T, typename Allocator>
  void visit(std::vector<T, Allocator>& vec) {
    size_t count = vec.size();
    visit(count);
    if constexpr (std::is_pod<T>::value) {
      char* first = reinterpret_cast<char*>(vec.data());
      buf_.insert(buf_.end(), first, first + sizeof(T) * count);
    } else {
      for (auto& element : vec) {
        visit(element);
      }
    }
  }

  /// Gives access to the bytes collected so far.
  std::vector<char>& buffer() { return buf_; }

 private:
  std::vector<char> buf_;
};
310 
311 template <typename KEY_T, typename INDEX_T>
312 class PTIndexerBuilder {
313  typedef pthash::single_phf<murmurhash2_64, pthash::dictionary_dictionary,
314  true>
315  pthash_type;
316 
317  public:
318  PTIndexerBuilder() = default;
319  ~PTIndexerBuilder() = default;
320 
321  void add_vertex(const KEY_T& key) { keys_.push_back(key); }
322 
323  void finish(const std::string& filename, const std::string& work_dir,
324  PTIndexer<INDEX_T>& output) {
325  double t = -grape::GetCurrentTime();
326  pthash::build_configuration config;
327  config.c = 7.0;
328  config.alpha = 0.94;
329  int thread_num = std::thread::hardware_concurrency();
330  if (keys_.size() > 121242388) {
331  config.num_threads = std::min(thread_num, 32);
332  } else if (keys_.size() > 100) {
333  config.num_threads = std::min(thread_num, 16);
334  } else {
335  config.num_threads = 1;
336  }
337  config.minimal_output = true;
338  config.verbose_output = false;
339 
340  pthash_type phf;
341  phf.build_in_internal_memory(keys_.begin(), keys_.size(), config);
342 
343  TypedColumn<KEY_T>* keys_column =
344  new TypedColumn<KEY_T>(StorageStrategy::kMem);
345  keys_column->resize(keys_.size());
346 
347  {
348  std::vector<std::thread> threads;
349  std::atomic<size_t> offset(0);
350  size_t total = keys_.size();
351  const size_t chunk = 4096;
352  for (unsigned i = 0; i < std::thread::hardware_concurrency(); ++i) {
353  threads.emplace_back([&]() {
354  while (true) {
355  size_t begin = offset.fetch_add(chunk);
356  if (begin >= total) {
357  break;
358  }
359  size_t end = std::min(begin + chunk, total);
360  while (begin < end) {
361  keys_column->set_value(phf(keys_[begin]), keys_[begin]);
362  ++begin;
363  }
364  }
365  });
366  }
367  for (auto& thrd : threads) {
368  thrd.join();
369  }
370  }
371 
372  if (output.keys_ != NULL) {
373  delete output.keys_;
374  }
375  output.keys_ = keys_column;
376 
377  mem_buffer_saver saver;
378  saver.visit(phf);
379 
380  output.base_size_ = keys_.size();
381  output.base_map_.Init(saver.buffer());
382  output.extra_indexer_.init(output.keys_->type());
383  output.dump(filename, work_dir);
384  output.open_in_memory(work_dir + "/" + filename);
385 
386  // output.extra_indexer_.set_keys(keys_column->slice(output.base_size_));
387  t += grape::GetCurrentTime();
388  LOG(INFO) << "construct pthash with " << config.num_threads
389  << " threads: " << t << "s";
390  }
391 
392  private:
393  std::vector<KEY_T> keys_;
394 };
395 
396 } // namespace gs
397 #endif // USE_PTHASH
398 
399 #endif // GRAPHSCOPE_GRAPH_PT_INDEXER_H_
column.h
gs
Definition: adj_list.h:23
gs::PropertyType::kUInt64
static const PropertyType kUInt64
Definition: types.h:141
gs::PropertyType::STRING_DEFAULT_MAX_LENGTH
static constexpr const uint16_t STRING_DEFAULT_MAX_LENGTH
Definition: types.h:96
gs::PropertyType::kStringView
static const PropertyType kStringView
Definition: types.h:145
gs::PropertyType::kUInt32
static const PropertyType kUInt32
Definition: types.h:138
single_phf_view.h
gs::impl::PropertyTypeImpl::kVarChar
@ kVarChar
gs::PropertyType::kInt64
static const PropertyType kInt64
Definition: types.h:140
gs::snapshot_dir
std::string snapshot_dir(const std::string &work_dir, uint32_t version)
Definition: file_names.h:192
std
Definition: loading_config.h:232
gs::StringColumn
TypedColumn< std::string_view > StringColumn
Definition: column.h:576
gs::PropertyType::kInt32
static const PropertyType kInt32
Definition: types.h:137
gs::StorageStrategy::kMem
@ kMem