16 #ifndef GRAPHSCOPE_GRAPH_PT_INDEXER_H_
17 #define GRAPHSCOPE_GRAPH_PT_INDEXER_H_
22 #include "grape/util.h"
27 #include "murmurhash.h"
// Hasher type plugged into the pthash-based containers of this file
// (SinglePHFView<murmurhash2_64>, pthash::single_phf<murmurhash2_64, ...>).
// Every overload forwards the key bytes and the seed to MurmurHash2_64.
32 struct murmurhash2_64 {
33 typedef pthash::hash64 hash_type;
// Hashes the character contents of a std::string.
36 static inline hash_type hash(std::string
const& val, uint64_t seed) {
37 return MurmurHash2_64(val.data(), val.size(), seed);
// Generic overload: hashes sizeof(val) bytes starting at the address of the
// by-value copy of `val`.
41 template <
typename EDATA_T>
42 static inline hash_type hash(EDATA_T val, uint64_t seed) {
43 return MurmurHash2_64(
reinterpret_cast<char const*
>(&val),
sizeof(val),
// Hashes the characters referenced by a std::string_view.
48 static inline hash_type hash(
const std::string_view& val, uint64_t seed) {
49 return MurmurHash2_64(val.data(), val.size(), seed);
// Any overload: forwards to one of the typed overloads above through the
// matching As*() accessor. The conditions that select each branch are not
// visible in this excerpt (presumably a dispatch on val.type; confirm).
52 static inline hash_type hash(
const Any& val, uint64_t seed) {
54 return hash(val.AsStringView(), seed);
56 return hash<int64_t>(val.AsInt64(), seed);
58 return hash<uint64_t>(val.AsUInt64(), seed);
60 return hash<int32_t>(val.AsInt32(), seed);
62 return hash<uint32_t>(val.AsUInt32(), seed);
// Remaining property types are reported through LOG(FATAL).
64 LOG(FATAL) <<
"Unexpected property type: " << val.type;
70 template <
typename KEY_T,
typename INDEX_T>
71 class PTIndexerBuilder;
73 template <
typename INDEX_T>
76 PTIndexer() : keys_(nullptr), base_size_(0), concat_keys_(nullptr) {}
// Null checks on the two column pointers, keys_ and concat_keys_. The
// enclosing signature and the guarded statements are not visible in this
// excerpt; presumably this is the destructor releasing both columns (confirm
// against the full file).
78 if (keys_ !=
nullptr) {
81 if (concat_keys_ !=
nullptr) {
// Move constructor: copies rhs.base_map_ and rhs.base_size_, move-constructs
// extra_indexer_ from rhs.extra_indexer_, and resets rhs.concat_keys_ to
// nullptr in the body.
// NOTE(review): concat_keys_ of the new object does not appear among the
// visible member initializers. Confirm it is initialised on a line not shown
// here; otherwise it is left indeterminate while other visible code tests it
// against nullptr.
86 PTIndexer(PTIndexer&& rhs)
88 base_map_(rhs.base_map_),
89 base_size_(rhs.base_size_),
90 extra_indexer_(
std::move(rhs.extra_indexer_)) {
92 rhs.concat_keys_ =
nullptr;
95 void warmup(
int)
const {}
// Returns the constant string "pthash".
static std::string prefix() {
  std::string tag("pthash");
  return tag;
}
98 void reserve(
size_t capacity) {
99 if (capacity > base_size_) {
100 extra_indexer_.reserve(capacity - base_size_);
104 size_t size()
const {
return base_size_ + extra_indexer_.size(); }
105 size_t capacity()
const {
return base_size_ + extra_indexer_.capacity(); }
106 PropertyType get_type()
const {
return keys_->type(); }
108 INDEX_T get_index(
const Any& key)
const {
109 assert(key.type == get_type());
110 size_t index = base_map_(key.AsInt64());
111 if (index < base_size_ && keys_->get(index) == key) {
114 return extra_indexer_.get_index(key) + base_size_;
// Lookup variant that takes the result slot `ret` by reference and returns
// bool. It probes base_map_ with `oid` and also queries extra_indexer_; the
// statements that fill `ret` and produce the return values are not visible in
// this excerpt.
118 bool get_index(
const Any& oid, INDEX_T& ret)
const {
119 assert(oid.type == get_type());
120 size_t index = base_map_(oid);
// A base-map slot only counts when it is in range and holds an equal key.
121 if (index < base_size_ && keys_->get(index) == oid) {
125 if (extra_indexer_.get_index(oid, ret)) {
// Returns an index for `oid`. The base map is probed first; the body of the
// hit branch is not visible in this excerpt (presumably it returns the base
// index; confirm). Otherwise `oid` is inserted into extra_indexer_ and the
// returned index is shifted by base_size_.
133 INDEX_T insert(
const Any& oid) {
134 assert(oid.type == get_type());
135 size_t index = base_map_(oid);
136 if (index < base_size_ && keys_->get(index) == oid) {
139 return extra_indexer_.insert(oid) + base_size_;
142 Any get_key(
const INDEX_T& index)
const {
143 return index < base_size_ ? keys_->get(index)
144 : extra_indexer_.get_key(index - base_size_);
147 void dump_meta(
const std::string& filename) {
148 grape::InArchive arc;
149 arc << get_type() << base_size_;
150 std::string meta_file_path = filename;
151 FILE* fout = fopen(meta_file_path.c_str(),
"wb");
152 fwrite(arc.GetBuffer(), arc.GetSize(), 1, fout);
// Writes the indexer into `snapshot_dir` under file names derived from `name`:
// "<name>.base_map.keys" for the base key column, "<name>.base_map" for the
// base map, and the extra indexer under the prefix "<name>.extra_indexer".
// Some statements of this function are not visible in this excerpt.
157 void dump(
const std::string& name,
const std::string&
snapshot_dir) {
// The key column is resized to base_size_ before it is dumped.
159 keys_->resize(base_size_);
160 keys_->dump(
snapshot_dir +
"/" + name +
".base_map.keys");
161 base_map_.Save(
snapshot_dir +
"/" + name +
".base_map");
162 extra_indexer_.dump(name +
".extra_indexer",
snapshot_dir);
// The extra indexer is closed as part of dumping.
167 extra_indexer_.close();
// Initialises the indexer for the primary-key type `type`. Most of the body is
// not visible in this excerpt. The visible parts are a check for an existing
// keys_ column, a use of type.additional_type_info.max_length, and a
// LOG(FATAL) for types that cannot be used as primary key.
170 void init(
const PropertyType& type) {
171 if (keys_ !=
nullptr) {
185 type.additional_type_info.max_length);
190 LOG(FATAL) <<
"Not support type [" << type <<
"] as pk type ..";
// Reads the metadata file `filename` into memory and deserialises a type value
// followed by base_size_ from it (the order dump_meta writes them in).
// The end of the function is not visible in this excerpt.
194 void load_meta(
const std::string& filename) {
// The whole file is read in one go; its size comes from the filesystem.
195 std::string meta_file_path = filename;
196 size_t meta_file_size = std::filesystem::file_size(meta_file_path);
197 std::vector<char> buf(meta_file_size);
198 FILE* fin = fopen(meta_file_path.c_str(),
"r");
// NOTE(review): fin is handed to fread without a visible null check, and no
// fclose(fin) appears in the visible lines. Confirm against the full file.
199 CHECK_EQ(fread(buf.data(),
sizeof(
char), meta_file_size, fin),
201 grape::OutArchive arc;
202 arc.SetSlice(buf.data(), meta_file_size);
// `type` is declared on a line not visible here.
204 arc >> type >> base_size_;
// Opens an indexer previously written by dump(): the base map is loaded from
// "<snapshot_dir>/<name>.base_map", the key column and the extra indexer are
// opened with both `snapshot_dir` and `work_dir`. Some statements are not
// visible in this excerpt.
208 void open(
const std::string& name,
const std::string&
snapshot_dir,
209 const std::string& work_dir) {
211 base_map_.Open(
snapshot_dir +
"/" + name +
".base_map");
212 keys_->open(name +
".base_map.keys",
snapshot_dir, work_dir);
213 extra_indexer_.open(name +
".extra_indexer",
snapshot_dir, work_dir);
// Reserves half of the base size in the extra indexer.
215 extra_indexer_.reserve(base_size_ / 2);
218 void open_in_memory(
const std::string& name) {
219 load_meta(name +
".meta");
220 base_map_.Open(name +
".base_map");
221 keys_->open_in_memory(name +
".base_map.keys");
222 extra_indexer_.open_in_memory(name +
".extra_indexer");
223 extra_indexer_.reserve(base_size_ / 2);
226 void open_with_hugepages(
const std::string& name,
bool hugepage_table) {
227 load_meta(name +
".meta");
228 keys_->open_with_hugepages(name +
".keys",
true);
229 base_map_.Open(name +
".base_map");
230 extra_indexer_.open_with_hugepages(name, hugepage_table);
231 extra_indexer_.reserve(base_size_ / 2);
// Returns a column built from the base key column and the key column of the
// extra indexer. The result is stored in the mutable member concat_keys_, so
// this const accessor can create it. The body of the initial null check is
// not visible here (presumably it returns the already-built column; confirm).
233 const ColumnBase& get_keys()
const {
234 if (concat_keys_ !=
nullptr) {
// One branch per supported key type. The selecting conditions are not visible
// in this excerpt. Each branch downcasts both columns to the typed column
// with dynamic_cast on references, which throws std::bad_cast on a mismatch.
238 concat_keys_ =
new ConcatColumn<int64_t>(
239 dynamic_cast<const TypedColumn<int64_t>&
>(*keys_),
240 dynamic_cast<const TypedColumn<int64_t>&
>(extra_indexer_.get_keys()));
242 concat_keys_ =
new ConcatColumn<uint64_t>(
243 dynamic_cast<const TypedColumn<uint64_t>&
>(*keys_),
244 dynamic_cast<const TypedColumn<uint64_t>&
>(
245 extra_indexer_.get_keys()));
247 concat_keys_ =
new ConcatColumn<int32_t>(
248 dynamic_cast<const TypedColumn<int32_t>&
>(*keys_),
249 dynamic_cast<const TypedColumn<int32_t>&
>(extra_indexer_.get_keys()));
251 concat_keys_ =
new ConcatColumn<uint32_t>(
252 dynamic_cast<const TypedColumn<uint32_t>&
>(*keys_),
253 dynamic_cast<const TypedColumn<uint32_t>&
>(
254 extra_indexer_.get_keys()));
256 concat_keys_ =
new ConcatColumn<std::string_view>(
257 dynamic_cast<const TypedColumn<std::string_view>&
>(*keys_),
258 dynamic_cast<const TypedColumn<std::string_view>&
>(
259 extra_indexer_.get_keys()));
// The returned reference points at the object held in concat_keys_.
261 return *concat_keys_;
// PTIndexerBuilder is a friend because its finish() assigns keys_, base_size_
// and base_map_ of the output indexer directly.
265 template <
typename _KEY_T,
typename _INDEX_T>
266 friend class PTIndexerBuilder;
// Map probed first by the lookups; its results are only accepted when they
// are below base_size_ and the stored key matches.
269 SinglePHFView<murmurhash2_64> base_map_;
// Indexer consulted for keys that do not resolve through base_map_; its
// indices are offset by base_size_.
271 LFIndexer<INDEX_T> extra_indexer_;
// Column created by get_keys(); mutable so that const accessor can assign it.
272 mutable ColumnBase* concat_keys_;
// Collects the raw bytes of visited values in an in-memory char buffer.
// PTIndexerBuilder::finish() passes buffer() to SinglePHFView::Init().
275 class mem_buffer_saver {
277 mem_buffer_saver() =
default;
278 ~mem_buffer_saver() =
default;
// Single-value overload (its signature line is not visible in this excerpt):
// for POD types the sizeof(T) bytes of the value are appended to buf_.
280 template <
typename T>
282 if constexpr (std::is_pod<T>::value) {
283 char* ptr =
reinterpret_cast<char*
>(&val);
284 buf_.insert(buf_.end(), ptr, ptr +
sizeof(T));
// Vector overload: for POD element types the element storage
// (sizeof(T) * n bytes from vec.data()) is appended with a single insert.
// The handling of non-POD element types is only partly visible here.
290 template <
typename T,
typename Allocator>
291 void visit(std::vector<T, Allocator>& vec) {
292 if constexpr (std::is_pod<T>::value) {
293 size_t n = vec.size();
295 char* ptr =
reinterpret_cast<char*
>(vec.data());
296 buf_.insert(buf_.end(), ptr, ptr +
sizeof(T) * n);
298 size_t n = vec.size();
// Mutable access to the bytes collected so far.
305 std::vector<char>& buffer() {
return buf_; }
// Bytes appended by the visit() overloads.
308 std::vector<char> buf_;
// Collects keys with add_vertex() and, in finish(), builds a pthash function
// over them and fills a PTIndexer<INDEX_T> with the result.
// Several lines of this class are not visible in this excerpt.
311 template <
typename KEY_T,
typename INDEX_T>
312 class PTIndexerBuilder {
313 typedef pthash::single_phf<murmurhash2_64, pthash::dictionary_dictionary,
318 PTIndexerBuilder() =
default;
319 ~PTIndexerBuilder() =
default;
// Appends one key to the list that finish() will index.
321 void add_vertex(
const KEY_T& key) { keys_.push_back(key); }
// Builds the hash function over all collected keys, stores every key in a
// TypedColumn<KEY_T> at the slot the function assigns to it, then installs
// the column and the serialised function in `output`, dumps `output` to
// `work_dir` under `filename` and reopens it in memory from there.
323 void finish(
const std::string& filename,
const std::string& work_dir,
324 PTIndexer<INDEX_T>& output) {
// Negative start time; the end time is added below to get the elapsed time.
325 double t = -grape::GetCurrentTime();
326 pthash::build_configuration config;
// The build thread count is capped by the number of keys: at most 32 threads
// above 121242388 keys, at most 16 above 100 keys, otherwise 1.
// NOTE(review): std::thread::hardware_concurrency() may return 0 when the
// value is not computable; num_threads would then become 0 in the first two
// branches. Confirm pthash accepts that.
329 int thread_num = std::thread::hardware_concurrency();
330 if (keys_.size() > 121242388) {
331 config.num_threads = std::min(thread_num, 32);
332 }
else if (keys_.size() > 100) {
333 config.num_threads = std::min(thread_num, 16);
335 config.num_threads = 1;
337 config.minimal_output =
true;
338 config.verbose_output =
false;
341 phf.build_in_internal_memory(keys_.begin(), keys_.size(), config);
343 TypedColumn<KEY_T>* keys_column =
345 keys_column->resize(keys_.size());
// Worker threads claim ranges of 4096 keys through the shared atomic offset
// and write each key at the position phf(key) of the column.
// NOTE(review): if hardware_concurrency() returns 0 this loop starts no
// worker at all, so no set_value() call from the visible code would run.
348 std::vector<std::thread> threads;
349 std::atomic<size_t> offset(0);
350 size_t total = keys_.size();
351 const size_t chunk = 4096;
352 for (
unsigned i = 0; i < std::thread::hardware_concurrency(); ++i) {
353 threads.emplace_back([&]() {
355 size_t begin = offset.fetch_add(chunk);
356 if (begin >= total) {
359 size_t end = std::min(begin + chunk, total);
360 while (begin < end) {
361 keys_column->set_value(phf(keys_[begin]), keys_[begin]);
367 for (
auto& thrd : threads) {
// The freshly filled column becomes the key column of `output`. The body of
// the null check is not visible (presumably it releases the previous column;
// confirm).
372 if (output.keys_ != NULL) {
375 output.keys_ = keys_column;
// The bytes gathered in `saver` initialise output.base_map_. The statement
// that fills `saver` is not visible in this excerpt.
377 mem_buffer_saver saver;
380 output.base_size_ = keys_.size();
381 output.base_map_.Init(saver.buffer());
382 output.extra_indexer_.init(output.keys_->type());
383 output.dump(filename, work_dir);
384 output.open_in_memory(work_dir +
"/" + filename);
// Logs the elapsed time together with the configured thread count.
387 t += grape::GetCurrentTime();
388 LOG(INFO) <<
"construct pthash with " << config.num_threads
389 <<
" threads: " << t <<
"s";
// Keys collected by add_vertex(), in insertion order.
393 std::vector<KEY_T> keys_;
399 #endif // GRAPHSCOPE_GRAPH_PT_INDEXER_H_