Flex  0.17.9
odps_fragment_loader.h
Go to the documentation of this file.
1 
17 #ifndef STORAGES_RT_MUTABLE_GRAPH_LOADER_ODPS_FRAGMENT_LOADER_H_
18 #define STORAGES_RT_MUTABLE_GRAPH_LOADER_ODPS_FRAGMENT_LOADER_H_
19 
20 #include <arrow/api.h>
21 #include <arrow/csv/api.h>
22 #include <arrow/io/api.h>
23 #include <arrow/util/uri.h>
24 
25 #include <boost/convert.hpp>
26 #include <boost/convert/strtol.hpp>
27 #include <charconv>
28 
29 #include "arrow/util/value_parsing.h"
30 #include "common/configuration.h"
37 #include "flex/third_party/httplib.h"
38 #include "grape/util.h"
39 #include "storage_api.hpp"
40 #include "storage_api_arrow.hpp"
41 
42 using apsara::odps::sdk::AliyunAccount;
43 using apsara::odps::sdk::Configuration;
44 using apsara::odps::sdk::storage_api::ReadRowsReq;
45 using apsara::odps::sdk::storage_api::SessionReq;
46 using apsara::odps::sdk::storage_api::SessionStatus;
47 using apsara::odps::sdk::storage_api::SplitOptions;
48 using apsara::odps::sdk::storage_api::TableBatchScanReq;
49 using apsara::odps::sdk::storage_api::TableBatchScanResp;
50 using apsara::odps::sdk::storage_api::TableBatchWriteReq;
51 using apsara::odps::sdk::storage_api::TableBatchWriteResp;
52 using apsara::odps::sdk::storage_api::TableIdentifier;
53 using apsara::odps::sdk::storage_api::WriteRowsReq;
54 using apsara::odps::sdk::storage_api::arrow_adapter::ArrowClient;
55 using apsara::odps::sdk::storage_api::arrow_adapter::Reader;
56 
57 namespace gs {
58 
60  public:
61  static constexpr const int CONNECTION_TIMEOUT = 5;
62  static constexpr const int READ_WRITE_TIMEOUT = 10;
63  static constexpr const size_t MAX_RETRY = 10;
65 
67 
68  void init();
69 
70  void CreateReadSession(std::string* session_id, int* split_count,
71  const TableIdentifier& table_identifier,
72  const std::vector<std::string>& selected_cols,
73  const std::vector<std::string>& partition_cols,
74  const std::vector<std::string>& selected_partitions);
75 
76  std::shared_ptr<arrow::Table> ReadTable(const std::string& session_id,
77  int split_count,
78  const TableIdentifier& table_id,
79  int thread_num) const;
80 
81  std::shared_ptr<ArrowClient> GetArrowClient() const;
82 
83  private:
84  TableBatchScanResp createReadSession(
85  const TableIdentifier& table_identifier,
86  const std::vector<std::string>& selected_cols,
87  const std::vector<std::string>& partition_cols,
88  const std::vector<std::string>& selected_partitions);
89 
90  TableBatchScanResp getReadSession(std::string session_id,
91  const TableIdentifier& table_identifier);
92 
93  void getReadSessionStatus(const std::string& session_id, int* split_count,
94  const TableIdentifier& table_identifier);
95 
96  void producerRoutine(
97  const std::string& session_id, const TableIdentifier& table_identifier,
98  std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>>&
99  all_batches_,
100  std::vector<int>&& indices) const;
101 
102  bool readRows(std::string session_id, const TableIdentifier& table_identifier,
103  std::vector<std::shared_ptr<arrow::RecordBatch>>& res_batches,
104  int split_index) const;
105 
106  private:
107  // odps table related
108  std::string access_id_;
109  std::string access_key_;
110  std::string odps_endpoint_;
111  std::string tunnel_endpoint_;
112  std::string output_directory_;
113  std::shared_ptr<ArrowClient> arrow_client_ptr_;
114  size_t MAX_PRODUCER_NUM = 8;
115 };
116 
118  public:
119  ODPSStreamRecordBatchSupplier(label_t label_id, const std::string& file_path,
120  const ODPSReadClient& odps_table_reader,
121  const std::string& session_id, int split_count,
122  TableIdentifier table_identifier, int worker_id,
123  int worker_num);
124 
125  std::shared_ptr<arrow::RecordBatch> GetNextBatch() override;
126 
127  private:
128  std::string file_path_;
130  std::string session_id_;
132 
133  TableIdentifier table_identifier_;
134 
137  ReadRowsReq read_rows_req_;
138  std::shared_ptr<Reader> cur_batch_reader_;
139 };
140 
142  public:
143  ODPSTableRecordBatchSupplier(label_t label_id, const std::string& file_path,
144  const ODPSReadClient& odps_table_reader,
145  const std::string& session_id, int split_count,
146  TableIdentifier table_identifier,
147  int thread_num);
148 
149  std::shared_ptr<arrow::RecordBatch> GetNextBatch() override;
150 
151  private:
152  std::string file_path_;
154  std::string session_id_;
155  TableIdentifier table_identifier_;
156 
157  std::shared_ptr<arrow::Table> table_;
158  std::shared_ptr<arrow::TableBatchReader> reader_;
159 };
160 
161 /*
162  * ODPSFragmentLoader is used to load graph data from ODPS tables.
163  * It fetches the data via the ODPS tunnel/halo API.
164  * You need to set the following environment variables:
165  * 1. ODPS_ACCESS_ID
166  * 2. ODPS_ACCESS_KEY
167  * 3. ODPS_ENDPOINT
168  * 4. ODPS_TUNNEL_ENDPOINT (optional)
169  */
171  public:
172  ODPSFragmentLoader(const std::string& work_dir, const Schema& schema,
173  const LoadingConfig& loading_config)
174  : AbstractArrowFragmentLoader(work_dir, schema, loading_config) {}
175 
176  static std::shared_ptr<IFragmentLoader> Make(
177  const std::string& work_dir, const Schema& schema,
178  const LoadingConfig& loading_config);
179 
181 
182  Result<bool> LoadFragment() override;
183 
184  private:
185  void init();
186 
187  void parseLocation(const std::string& odps_table_path,
188  TableIdentifier& table_identifier,
189  std::vector<std::string>& partition_names,
190  std::vector<std::string>& selected_partitions);
191 
192  void loadVertices();
193 
194  void loadEdges();
195 
196  void addVertices(label_t v_label_id, const std::vector<std::string>& v_files);
197 
198  void addEdges(label_t src_label_id, label_t dst_label_id, label_t e_label_id,
199  const std::vector<std::string>& e_files);
200 
201  std::vector<std::string> columnMappingsToSelectedCols(
202  const std::vector<std::tuple<size_t, std::string, std::string>>&
203  column_mappings);
204 
206 
207  static const bool registered_;
208 };
209 
210 } // namespace gs
211 
212 #endif // STORAGES_RT_MUTABLE_GRAPH_LOADER_ODPS_FRAGMENT_LOADER_H_
Definition: abstract_arrow_fragment_loader.h:339
Definition: abstract_arrow_fragment_loader.h:39
Definition: loading_config.h:89
Definition: odps_fragment_loader.h:170
ODPSFragmentLoader(const std::string &work_dir, const Schema &schema, const LoadingConfig &loading_config)
Definition: odps_fragment_loader.h:172
std::vector< std::string > columnMappingsToSelectedCols(const std::vector< std::tuple< size_t, std::string, std::string >> &column_mappings)
Definition: odps_fragment_loader.cc:611
void parseLocation(const std::string &odps_table_path, TableIdentifier &table_identifier, std::vector< std::string > &partition_names, std::vector< std::string > &selected_partitions)
Definition: odps_fragment_loader.cc:384
ODPSReadClient odps_read_client_
Definition: odps_fragment_loader.h:205
void addEdges(label_t src_label_id, label_t dst_label_id, label_t e_label_id, const std::vector< std::string > &e_files)
Definition: odps_fragment_loader.cc:501
static std::shared_ptr< IFragmentLoader > Make(const std::string &work_dir, const Schema &schema, const LoadingConfig &loading_config)
Definition: odps_fragment_loader.cc:355
static const bool registered_
Definition: odps_fragment_loader.h:207
void init()
Definition: odps_fragment_loader.cc:361
void loadEdges()
Definition: odps_fragment_loader.cc:560
Result< bool > LoadFragment() override
Definition: odps_fragment_loader.cc:363
void addVertices(label_t v_label_id, const std::vector< std::string > &v_files)
Definition: odps_fragment_loader.cc:412
~ODPSFragmentLoader()
Definition: odps_fragment_loader.h:180
void loadVertices()
Definition: odps_fragment_loader.cc:452
Definition: odps_fragment_loader.h:59
std::string access_key_
Definition: odps_fragment_loader.h:109
static constexpr const size_t MAX_RETRY
Definition: odps_fragment_loader.h:63
TableBatchScanResp createReadSession(const TableIdentifier &table_identifier, const std::vector< std::string > &selected_cols, const std::vector< std::string > &partition_cols, const std::vector< std::string > &selected_partitions)
Definition: odps_fragment_loader.cc:72
std::string tunnel_endpoint_
Definition: odps_fragment_loader.h:111
size_t MAX_PRODUCER_NUM
Definition: odps_fragment_loader.h:114
std::shared_ptr< ArrowClient > arrow_client_ptr_
Definition: odps_fragment_loader.h:113
ODPSReadClient()
Definition: odps_fragment_loader.cc:30
TableBatchScanResp getReadSession(std::string session_id, const TableIdentifier &table_identifier)
Definition: odps_fragment_loader.cc:101
static constexpr const int READ_WRITE_TIMEOUT
Definition: odps_fragment_loader.h:62
std::string access_id_
Definition: odps_fragment_loader.h:108
std::string output_directory_
Definition: odps_fragment_loader.h:112
void CreateReadSession(std::string *session_id, int *split_count, const TableIdentifier &table_identifier, const std::vector< std::string > &selected_cols, const std::vector< std::string > &partition_cols, const std::vector< std::string > &selected_partitions)
Definition: odps_fragment_loader.cc:112
std::shared_ptr< arrow::Table > ReadTable(const std::string &session_id, int split_count, const TableIdentifier &table_id, int thread_num) const
Definition: odps_fragment_loader.cc:200
std::shared_ptr< ArrowClient > GetArrowClient() const
Definition: odps_fragment_loader.cc:68
~ODPSReadClient()
Definition: odps_fragment_loader.cc:31
std::string odps_endpoint_
Definition: odps_fragment_loader.h:110
void getReadSessionStatus(const std::string &session_id, int *split_count, const TableIdentifier &table_identifier)
Definition: odps_fragment_loader.cc:138
void producerRoutine(const std::string &session_id, const TableIdentifier &table_identifier, std::vector< std::vector< std::shared_ptr< arrow::RecordBatch >>> &all_batches_, std::vector< int > &&indices) const
Definition: odps_fragment_loader.cc:160
bool readRows(std::string session_id, const TableIdentifier &table_identifier, std::vector< std::shared_ptr< arrow::RecordBatch >> &res_batches, int split_index) const
Definition: odps_fragment_loader.cc:177
void init()
Definition: odps_fragment_loader.cc:33
static constexpr const int CONNECTION_TIMEOUT
Definition: odps_fragment_loader.h:61
Definition: odps_fragment_loader.h:117
const ODPSReadClient & odps_read_client_
Definition: odps_fragment_loader.h:129
std::shared_ptr< arrow::RecordBatch > GetNextBatch() override
Definition: odps_fragment_loader.cc:283
ReadRowsReq read_rows_req_
Definition: odps_fragment_loader.h:137
std::string session_id_
Definition: odps_fragment_loader.h:130
int split_count_
Definition: odps_fragment_loader.h:131
TableIdentifier table_identifier_
Definition: odps_fragment_loader.h:133
std::string file_path_
Definition: odps_fragment_loader.h:128
int worker_num_
Definition: odps_fragment_loader.h:136
int32_t cur_split_index_
Definition: odps_fragment_loader.h:135
std::shared_ptr< Reader > cur_batch_reader_
Definition: odps_fragment_loader.h:138
ODPSStreamRecordBatchSupplier(label_t label_id, const std::string &file_path, const ODPSReadClient &odps_table_reader, const std::string &session_id, int split_count, TableIdentifier table_identifier, int worker_id, int worker_num)
Definition: odps_fragment_loader.cc:262
Definition: odps_fragment_loader.h:141
ODPSTableRecordBatchSupplier(label_t label_id, const std::string &file_path, const ODPSReadClient &odps_table_reader, const std::string &session_id, int split_count, TableIdentifier table_identifier, int thread_num)
Definition: odps_fragment_loader.cc:328
std::shared_ptr< arrow::Table > table_
Definition: odps_fragment_loader.h:157
std::shared_ptr< arrow::TableBatchReader > reader_
Definition: odps_fragment_loader.h:158
std::shared_ptr< arrow::RecordBatch > GetNextBatch() override
Definition: odps_fragment_loader.cc:343
std::string file_path_
Definition: odps_fragment_loader.h:152
std::string session_id_
Definition: odps_fragment_loader.h:154
const ODPSReadClient & odps_read_client_
Definition: odps_fragment_loader.h:153
TableIdentifier table_identifier_
Definition: odps_fragment_loader.h:155
Definition: result.h:63
Definition: schema.h:29
Definition: adj_list.h:23
uint8_t label_t
Definition: types.h:32