Flex  0.17.9
loading_config.h
Go to the documentation of this file.
1 
16 #ifndef STORAGE_RT_MUTABLE_GRAPH_LOADING_CONFIG_H_
17 #define STORAGE_RT_MUTABLE_GRAPH_LOADING_CONFIG_H_
18 
19 #include <boost/functional/hash.hpp>
20 
21 #include <filesystem>
22 #include <iostream>
23 #include <string>
24 #include <tuple>
25 #include <unordered_map>
26 #include <unordered_set>
27 #include "arrow/api.h"
28 #include "arrow/csv/options.h"
30 #include "flex/utils/arrow_utils.h"
31 #include "flex/utils/yaml_utils.h"
32 
33 #include "boost/algorithm/string.hpp"
34 
35 namespace gs {
36 
37 namespace reader_options {
// Default read block size for the CSV reader, in bytes (1 << 20 == 1 MiB).
// NOTE(review): presumably forwarded to arrow::csv::ReadOptions::block_size.
static constexpr int32_t DEFAULT_BLOCK_SIZE = (1 << 20);  // 1MB
// By default we read the whole table at once instead of batch by batch.
static constexpr bool DEFAULT_BATCH_READER = false;

// KEY_WORDS for configurations.
// Declared `static constexpr const char*` (a const pointer to const chars),
// matching loader_options, so that no translation unit including this header
// gets its own *writable* copy of a keyword pointer (and unused copies do not
// trigger -Wunused-variable).
static constexpr const char* DELIMITER = "delimiter";
static constexpr const char* HEADER_ROW = "header_row";
static constexpr const char* INCLUDE_COLUMNS = "include_columns";
static constexpr const char* COLUMN_TYPES = "column_types";
static constexpr const char* ESCAPING = "escaping";
static constexpr const char* ESCAPE_CHAR = "escape_char";
static constexpr const char* QUOTING = "quoting";
static constexpr const char* QUOTE_CHAR = "quote_char";
static constexpr const char* DOUBLE_QUOTE = "double_quote";
static constexpr const char* BATCH_SIZE_KEY = "batch_size";
// Whether or not to use the record batch reader. If true, the reader reads the
// data in batches; otherwise it reads the whole table at once (see
// DEFAULT_BATCH_READER).
static constexpr const char* BATCH_READER = "batch_reader";
static constexpr const char* NULL_VALUES = "null_values";

58 static const std::unordered_set<std::string> CSV_META_KEY_WORDS = {
62 
63 } // namespace reader_options
64 
namespace loader_options {

// Number of threads used while loading.
static constexpr const char* PARALLELISM = "parallelism";
static constexpr int32_t DEFAULT_PARALLELISM = 1;

// Whether to build the CSRs in memory.
static constexpr const char* BUILD_CSR_IN_MEM = "build_csr_in_mem";
static constexpr bool DEFAULT_BUILD_CSR_IN_MEM = false;

// Whether to use mmap-backed vectors.
static constexpr const char* USE_MMAP_VECTOR = "use_mmap_vector";
static constexpr bool DEFAULT_USE_MMAP_VECTOR = false;

}  // namespace loader_options
73 
74 class LoadingConfig;
75 
76 namespace config_parsing {
77 Status parse_bulk_load_config_file(const std::string& config_file,
78  const Schema& schema,
79  LoadingConfig& load_config);
80 
81 Status parse_bulk_load_config_yaml(const YAML::Node& yaml_node,
82  const Schema& schema,
83  LoadingConfig& load_config);
84 } // namespace config_parsing
85 
// Bulk-loading mode. Enumerator values are spelled out explicitly.
enum class BulkLoadMethod {
  kInit = 0,
  kOverwrite = 1,
};
87 
88 // Provide meta info about bulk loading: the data-source scheme, load method
// and file format, reader metadata (delimiter, quoting, escaping, batching,
// null values, ...), the source files registered per vertex label / edge
// triplet, and how file columns map to schema properties.
// NOTE: only a reference to the Schema is stored (schema_), so the Schema
// passed to a constructor must outlive the LoadingConfig.
90  public:
92  using edge_triplet_type =
94  schema_label_type>; // src_label_t, dst_label_t, edge_label_t
95 
96  // Check whether loading config file (or YAML node) is consistent with schema;
  // the parsed config is returned wrapped in a gs::Result<LoadingConfig>.
98  const Schema& schema, const std::string& yaml_file);
100  const Schema& schema, const YAML::Node& yaml_node);
101 
  // NOTE(review): this single-argument constructor is not `explicit`, so a
  // Schema converts implicitly to a LoadingConfig.
102  LoadingConfig(const Schema& schema);
103 
  // Construct with an explicit data-source scheme, delimiter, load method and
  // file format.
104  LoadingConfig(const Schema& schema, const std::string& data_source,
105  const std::string& delimiter, const BulkLoadMethod& method,
106  const std::string& format);
107 
108  // Add source files for vertex label. Each label can have multiple files.
109  Status AddVertexSources(const std::string& label,
110  const std::string& file_path);
111 
112  // Add source files for edge triplet. Each triplet can have multiple files.
113  // When adding edge source files, src_id and dst_id column also need to be
114  // specified.
115  Status AddEdgeSources(const std::string& src_label,
116  const std::string& dst_label,
117  const std::string& edge_label, size_t src_pri_key_ind,
118  size_t dst_pri_key_ind, const std::string& file_path);
119 
  // NOTE(review): SetDelimiter takes a single char (by const reference; a char
  // is normally passed by value) while GetDelimiter returns a std::string.
120  void SetScheme(const std::string& data_source);
121  void SetDelimiter(const char& delimiter);
122  void SetMethod(const BulkLoadMethod& method);
123 
124  // getters
125  const std::string& GetScheme() const;
126  const std::string& GetDelimiter() const;
127  const BulkLoadMethod& GetMethod() const;
128  const std::string& GetFormat() const;
129  bool GetHasHeaderRow() const;
130  char GetEscapeChar() const;
131  bool GetIsEscaping() const;
132  char GetQuotingChar() const;
133  bool GetIsQuoting() const;
134  bool GetIsDoubleQuoting() const;
135  const std::vector<std::string>& GetNullValues() const;
136  int32_t GetBatchSize() const;
137  bool GetIsBatchReader() const;
138  std::string GetMetaData(const std::string& key) const;
139  const std::unordered_map<schema_label_type, std::vector<std::string>>&
140  GetVertexLoadingMeta() const;
141  const std::unordered_map<edge_triplet_type, std::vector<std::string>,
142  boost::hash<edge_triplet_type>>&
143  GetEdgeLoadingMeta() const;
144 
145  // Get vertex column mappings. Each element in the vector is a tuple of
146  // <column_index, column_name, property_name>.
147  const std::vector<std::tuple<size_t, std::string, std::string>>&
148  GetVertexColumnMappings(label_t label_id) const;
149 
150  // Get edge column mappings. Each element in the vector is a tuple of
151  // <column_index, column_name, schema_property_name>.
152  const std::vector<std::tuple<size_t, std::string, std::string>>&
153  GetEdgeColumnMappings(label_t src_label_id, label_t dst_label_id,
154  label_t edge_label_id) const;
155 
156  // Get the src_id and dst_id columns, as <column_name, column_index> pairs,
  // for the edge triplet <src_label, dst_label, edge_label>.
157  const std::pair<std::vector<std::pair<std::string, size_t>>,
158  std::vector<std::pair<std::string, size_t>>>&
159  GetEdgeSrcDstCol(label_t src_label_id, label_t dst_label_id,
160  label_t edge_label_id) const;
161 
162  inline void SetParallelism(int32_t parallelism) {
163  parallelism_ = parallelism;
164  }
165  inline void SetBuildCsrInMem(bool build_csr_in_mem) {
166  build_csr_in_mem_ = build_csr_in_mem;
167  }
168  inline void SetUseMmapVector(bool use_mmap_vector) {
169  use_mmap_vector_ = use_mmap_vector;
170  }
171  inline int32_t GetParallelism() const { return parallelism_; }
172  inline bool GetBuildCsrInMem() const { return build_csr_in_mem_; }
173  inline bool GetUseMmapVector() const { return use_mmap_vector_; }
174 
175  private:
176  const Schema& schema_; // non-owning; the Schema must outlive this object
177  std::string scheme_; // "file", "hdfs", "oss", "s3"
178  BulkLoadMethod method_; // kInit or kOverwrite
179  std::string format_; // csv, tsv, json, parquet
180  int32_t parallelism_; // Number of threads to use for loading
181  bool build_csr_in_mem_; // Whether to build csr in memory
182  bool use_mmap_vector_; // Whether to use mmap vector
183 
184  std::vector<std::string> null_values_;
185 
186  // meta_data, stores all the meta info about loading
187  std::unordered_map<std::string, std::string> metadata_;
188 
189  std::unordered_map<schema_label_type, std::vector<std::string>>
190  vertex_loading_meta_; // <vertex_label_id, std::vector<file_path_>>
191  std::unordered_map<schema_label_type,
192  std::vector<std::tuple<size_t, std::string, std::string>>>
193  vertex_column_mappings_; // match which column in file to which property
194  // in schema. {col_ind, col_name,
195  // schema_prop_name}
196  // col_name can be empty
197 
198  std::unordered_map<edge_triplet_type, std::vector<std::string>,
199  boost::hash<edge_triplet_type>>
200  edge_loading_meta_; // key: <src_label, dst_label, edge_label>
201  // value:
202  // <file_path>
203  // All Edge Files share the same File schema.
204  std::unordered_map<edge_triplet_type,
205  std::vector<std::tuple<size_t, std::string, std::string>>,
206  boost::hash<edge_triplet_type>>
207  edge_column_mappings_; // match which column in file to which property in
208  // schema, {col_ind, col_name, schema_prop_name}
209  // col_name can be empty
210 
211  // key: <src_label, dst_label, edge_label>,
212  // value: <{<src_col_name, src_col_id>,...}, {<dst_col_name,
213  // dst_col_id>,...}>
214  // for csv loader, we just need the column_id, but for odps loader, we also
215  // need the column_name
216  std::unordered_map<edge_triplet_type,
217  std::pair<std::vector<std::pair<std::string, size_t>>,
218  std::vector<std::pair<std::string, size_t>>>,
219  boost::hash<edge_triplet_type>>
221 
223  const std::string& config_file, const Schema& schema,
224  LoadingConfig& load_config);
225 
227  const YAML::Node& root, const Schema& schema, LoadingConfig& load_config);
228 };
229 
230 } // namespace gs
231 
232 namespace std {
233 // BulkLoadMethod << operator
234 inline ostream& operator<<(ostream& os, const gs::BulkLoadMethod& method) {
235  switch (method) {
237  os << "init";
238  break;
240  os << "overwrite";
241  break;
242  default:
243  os << "unknown";
244  break;
245  }
246  return os;
247 }
248 } // namespace std
249 
250 #endif // STORAGE_RT_MUTABLE_GRAPH_LOADING_CONFIG_H_
gs::reader_options::DELIMITER
static const char * DELIMITER
Definition: loading_config.h:43
gs::LoadingConfig::GetEdgeColumnMappings
const std::vector< std::tuple< size_t, std::string, std::string > > & GetEdgeColumnMappings(label_t src_label_id, label_t dst_label_id, label_t edge_label_id) const
Definition: loading_config.cc:906
gs::LoadingConfig::GetIsEscaping
bool GetIsEscaping() const
Definition: loading_config.cc:834
gs::LoadingConfig::GetMethod
const BulkLoadMethod & GetMethod() const
Definition: loading_config.cc:824
gs::reader_options::QUOTE_CHAR
static const char * QUOTE_CHAR
Definition: loading_config.h:50
gs::Result
Definition: result.h:62
gs::LoadingConfig::GetFormat
const std::string & GetFormat() const
Definition: loading_config.cc:822
gs::loader_options::USE_MMAP_VECTOR
static constexpr const char * USE_MMAP_VECTOR
Definition: loading_config.h:68
gs::LoadingConfig::edge_triplet_type
std::tuple< schema_label_type, schema_label_type, schema_label_type > edge_triplet_type
Definition: loading_config.h:94
gs::LoadingConfig::GetVertexLoadingMeta
const std::unordered_map< schema_label_type, std::vector< std::string > > & GetVertexLoadingMeta() const
Definition: loading_config.cc:886
gs::loader_options::DEFAULT_PARALLELISM
static constexpr const int32_t DEFAULT_PARALLELISM
Definition: loading_config.h:69
gs::LoadingConfig::GetIsDoubleQuoting
bool GetIsDoubleQuoting() const
Definition: loading_config.cc:852
arrow_utils.h
gs::LoadingConfig::vertex_loading_meta_
std::unordered_map< schema_label_type, std::vector< std::string > > vertex_loading_meta_
Definition: loading_config.h:190
gs::LoadingConfig::AddEdgeSources
Status AddEdgeSources(const std::string &src_label, const std::string &dst_label, const std::string &edge_label, size_t src_pri_key_ind, size_t dst_pri_key_ind, const std::string &file_path)
Definition: loading_config.cc:785
gs::loader_options::DEFAULT_BUILD_CSR_IN_MEM
static constexpr const bool DEFAULT_BUILD_CSR_IN_MEM
Definition: loading_config.h:70
gs::LoadingConfig::SetDelimiter
void SetDelimiter(const char &delimiter)
Definition: loading_config.cc:803
schema.h
gs::LoadingConfig::metadata_
std::unordered_map< std::string, std::string > metadata_
Definition: loading_config.h:187
gs::LoadingConfig::use_mmap_vector_
bool use_mmap_vector_
Definition: loading_config.h:182
gs::LoadingConfig::format_
std::string format_
Definition: loading_config.h:179
gs::LoadingConfig::edge_src_dst_col_
std::unordered_map< edge_triplet_type, std::pair< std::vector< std::pair< std::string, size_t > >, std::vector< std::pair< std::string, size_t > > >, boost::hash< edge_triplet_type > > edge_src_dst_col_
Definition: loading_config.h:220
gs::LoadingConfig::ParseFromYamlFile
static gs::Result< LoadingConfig > ParseFromYamlFile(const Schema &schema, const std::string &yaml_file)
Definition: loading_config.cc:716
gs::reader_options::BATCH_READER
static const char * BATCH_READER
Definition: loading_config.h:55
gs::reader_options::HEADER_ROW
static const char * HEADER_ROW
Definition: loading_config.h:44
gs::reader_options::BATCH_SIZE_KEY
static const char * BATCH_SIZE_KEY
Definition: loading_config.h:52
gs
Definition: adj_list.h:23
gs::reader_options::INCLUDE_COLUMNS
static const char * INCLUDE_COLUMNS
Definition: loading_config.h:45
gs::LoadingConfig::GetBuildCsrInMem
bool GetBuildCsrInMem() const
Definition: loading_config.h:172
gs::LoadingConfig::SetUseMmapVector
void SetUseMmapVector(bool use_mmap_vector)
Definition: loading_config.h:168
gs::LoadingConfig::GetVertexColumnMappings
const std::vector< std::tuple< size_t, std::string, std::string > > & GetVertexColumnMappings(label_t label_id) const
Definition: loading_config.cc:898
gs::LoadingConfig::GetIsBatchReader
bool GetIsBatchReader() const
Definition: loading_config.cc:869
gs::reader_options::ESCAPING
static const char * ESCAPING
Definition: loading_config.h:47
gs::LoadingConfig::parallelism_
int32_t parallelism_
Definition: loading_config.h:180
gs::reader_options::DEFAULT_BATCH_READER
static const bool DEFAULT_BATCH_READER
Definition: loading_config.h:39
gs::LoadingConfig::schema_label_type
Schema::label_type schema_label_type
Definition: loading_config.h:91
gs::LoadingConfig::SetBuildCsrInMem
void SetBuildCsrInMem(bool build_csr_in_mem)
Definition: loading_config.h:165
gs::LoadingConfig::GetDelimiter
const std::string & GetDelimiter() const
Definition: loading_config.cc:813
gs::LoadingConfig::GetHasHeaderRow
bool GetHasHeaderRow() const
Definition: loading_config.cc:817
gs::reader_options::DEFAULT_BLOCK_SIZE
static const int32_t DEFAULT_BLOCK_SIZE
Definition: loading_config.h:38
gs::LoadingConfig::GetMetaData
std::string GetMetaData(const std::string &key) const
Definition: loading_config.cc:877
gs::LoadingConfig::SetScheme
void SetScheme(const std::string &data_source)
Definition: loading_config.cc:802
gs::LoadingConfig::GetEdgeSrcDstCol
const std::pair< std::vector< std::pair< std::string, size_t > >, std::vector< std::pair< std::string, size_t > > > & GetEdgeSrcDstCol(label_t src_label_id, label_t dst_label_id, label_t edge_label_id) const
Definition: loading_config.cc:917
gs::reader_options::CSV_META_KEY_WORDS
static const std::unordered_set< std::string > CSV_META_KEY_WORDS
Definition: loading_config.h:58
gs::Schema
Definition: schema.h:29
gs::LoadingConfig::GetParallelism
int32_t GetParallelism() const
Definition: loading_config.h:171
yaml_utils.h
gs::BulkLoadMethod::kOverwrite
@ kOverwrite
gs::config_parsing::parse_bulk_load_config_yaml
Status parse_bulk_load_config_yaml(const YAML::Node &root, const Schema &schema, LoadingConfig &load_config)
Definition: loading_config.cc:556
gs::reader_options::NULL_VALUES
static const char * NULL_VALUES
Definition: loading_config.h:56
gs::LoadingConfig::GetNullValues
const std::vector< std::string > & GetNullValues() const
Definition: loading_config.cc:857
gs::LoadingConfig::vertex_column_mappings_
std::unordered_map< schema_label_type, std::vector< std::tuple< size_t, std::string, std::string > > > vertex_column_mappings_
Definition: loading_config.h:193
gs::Status
Definition: result.h:32
gs::LoadingConfig::build_csr_in_mem_
bool build_csr_in_mem_
Definition: loading_config.h:181
gs::LoadingConfig::GetEdgeLoadingMeta
const std::unordered_map< edge_triplet_type, std::vector< std::string >, boost::hash< edge_triplet_type > > & GetEdgeLoadingMeta() const
Definition: loading_config.cc:893
gs::LoadingConfig::GetIsQuoting
bool GetIsQuoting() const
Definition: loading_config.cc:847
std::operator<<
ostream & operator<<(ostream &os, const gs::BulkLoadMethod &method)
Definition: loading_config.h:234
gs::BulkLoadMethod::kInit
@ kInit
gs::reader_options::DOUBLE_QUOTE
static const char * DOUBLE_QUOTE
Definition: loading_config.h:51
gs::LoadingConfig::method_
BulkLoadMethod method_
Definition: loading_config.h:178
gs::LoadingConfig::GetScheme
const std::string & GetScheme() const
Definition: loading_config.cc:811
gs::LoadingConfig::AddVertexSources
Status AddVertexSources(const std::string &label, const std::string &file_path)
Definition: loading_config.cc:778
gs::BulkLoadMethod
BulkLoadMethod
Definition: loading_config.h:86
gs::Schema::label_type
label_t label_type
Definition: schema.h:62
gs::LoadingConfig::LoadingConfig
LoadingConfig(const Schema &schema)
Definition: loading_config.cc:754
gs::reader_options::QUOTING
static const char * QUOTING
Definition: loading_config.h:49
std
Definition: loading_config.h:232
gs::LoadingConfig::ParseFromYamlNode
static gs::Result< LoadingConfig > ParseFromYamlNode(const Schema &schema, const YAML::Node &yaml_node)
Definition: loading_config.cc:733
gs::config_parsing::parse_bulk_load_config_file
Status parse_bulk_load_config_file(const std::string &config_file, const Schema &schema, LoadingConfig &load_config)
Definition: loading_config.cc:531
gs::LoadingConfig::edge_loading_meta_
std::unordered_map< edge_triplet_type, std::vector< std::string >, boost::hash< edge_triplet_type > > edge_loading_meta_
Definition: loading_config.h:200
gs::loader_options::BUILD_CSR_IN_MEM
static constexpr const char * BUILD_CSR_IN_MEM
Definition: loading_config.h:67
gs::LoadingConfig::GetQuotingChar
char GetQuotingChar() const
Definition: loading_config.cc:839
gs::LoadingConfig::SetMethod
void SetMethod(const BulkLoadMethod &method)
Definition: loading_config.cc:806
gs::loader_options::PARALLELISM
static constexpr const char * PARALLELISM
Definition: loading_config.h:66
gs::reader_options::ESCAPE_CHAR
static const char * ESCAPE_CHAR
Definition: loading_config.h:48
gs::reader_options::COLUMN_TYPES
static const char * COLUMN_TYPES
Definition: loading_config.h:46
gs::LoadingConfig::GetUseMmapVector
bool GetUseMmapVector() const
Definition: loading_config.h:173
gs::LoadingConfig::GetEscapeChar
char GetEscapeChar() const
Definition: loading_config.cc:826
gs::LoadingConfig
Definition: loading_config.h:89
gs::LoadingConfig::null_values_
std::vector< std::string > null_values_
Definition: loading_config.h:184
gs::label_t
uint8_t label_t
Definition: types.h:32
gs::LoadingConfig::GetBatchSize
int32_t GetBatchSize() const
Definition: loading_config.cc:861
gs::LoadingConfig::scheme_
std::string scheme_
Definition: loading_config.h:177
gs::loader_options::DEFAULT_USE_MMAP_VECTOR
static constexpr const bool DEFAULT_USE_MMAP_VECTOR
Definition: loading_config.h:71
gs::LoadingConfig::SetParallelism
void SetParallelism(int32_t parallelism)
Definition: loading_config.h:162
gs::LoadingConfig::edge_column_mappings_
std::unordered_map< edge_triplet_type, std::vector< std::tuple< size_t, std::string, std::string > >, boost::hash< edge_triplet_type > > edge_column_mappings_
Definition: loading_config.h:207
gs::LoadingConfig::schema_
const Schema & schema_
Definition: loading_config.h:176