Flex  0.17.9
odps_fragment_loader.h
Go to the documentation of this file.
1 
17 #ifndef STORAGES_RT_MUTABLE_GRAPH_LOADER_ODPS_FRAGMENT_LOADER_H_
18 #define STORAGES_RT_MUTABLE_GRAPH_LOADER_ODPS_FRAGMENT_LOADER_H_
19 
20 #include <arrow/api.h>
21 #include <arrow/csv/api.h>
22 #include <arrow/io/api.h>
23 #include <arrow/util/uri.h>
24 
25 #include <boost/convert.hpp>
26 #include <boost/convert/strtol.hpp>
27 #include <charconv>
28 
29 #include "arrow/util/value_parsing.h"
30 #include "common/configuration.h"
37 #include "flex/third_party/httplib.h"
38 #include "grape/util.h"
39 #include "storage_api.hpp"
40 #include "storage_api_arrow.hpp"
41 
42 using apsara::odps::sdk::AliyunAccount;
43 using apsara::odps::sdk::Configuration;
44 using apsara::odps::sdk::storage_api::ReadRowsReq;
45 using apsara::odps::sdk::storage_api::SessionReq;
46 using apsara::odps::sdk::storage_api::SessionStatus;
47 using apsara::odps::sdk::storage_api::SplitOptions;
48 using apsara::odps::sdk::storage_api::TableBatchScanReq;
49 using apsara::odps::sdk::storage_api::TableBatchScanResp;
50 using apsara::odps::sdk::storage_api::TableBatchWriteReq;
51 using apsara::odps::sdk::storage_api::TableBatchWriteResp;
52 using apsara::odps::sdk::storage_api::TableIdentifier;
53 using apsara::odps::sdk::storage_api::WriteRowsReq;
54 using apsara::odps::sdk::storage_api::arrow_adapter::ArrowClient;
55 using apsara::odps::sdk::storage_api::arrow_adapter::Reader;
56 
57 namespace gs {
58 
60  public:
61  static constexpr const int CONNECTION_TIMEOUT = 5;
62  static constexpr const int READ_WRITE_TIMEOUT = 10;
63  static constexpr const size_t MAX_RETRY = 10;
65 
67 
68  void init();
69 
70  void CreateReadSession(std::string* session_id, int* split_count,
71  const TableIdentifier& table_identifier,
72  const std::vector<std::string>& selected_cols,
73  const std::vector<std::string>& partition_cols,
74  const std::vector<std::string>& selected_partitions);
75 
76  std::shared_ptr<arrow::Table> ReadTable(const std::string& session_id,
77  int split_count,
78  const TableIdentifier& table_id,
79  int thread_num) const;
80 
81  std::shared_ptr<ArrowClient> GetArrowClient() const;
82 
83  private:
84  TableBatchScanResp createReadSession(
85  const TableIdentifier& table_identifier,
86  const std::vector<std::string>& selected_cols,
87  const std::vector<std::string>& partition_cols,
88  const std::vector<std::string>& selected_partitions);
89 
90  TableBatchScanResp getReadSession(std::string session_id,
91  const TableIdentifier& table_identifier);
92 
93  void getReadSessionStatus(const std::string& session_id, int* split_count,
94  const TableIdentifier& table_identifier);
95 
96  void producerRoutine(
97  const std::string& session_id, const TableIdentifier& table_identifier,
98  std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>>&
99  all_batches_,
100  std::vector<int>&& indices) const;
101 
102  bool readRows(std::string session_id, const TableIdentifier& table_identifier,
103  std::vector<std::shared_ptr<arrow::RecordBatch>>& res_batches,
104  int split_index) const;
105 
106  private:
107  // odps table related
108  std::string access_id_;
109  std::string access_key_;
110  std::string odps_endpoint_;
111  std::string tunnel_endpoint_;
112  std::string output_directory_;
113  std::shared_ptr<ArrowClient> arrow_client_ptr_;
114  size_t MAX_PRODUCER_NUM = 8;
115 };
116 
118  public:
119  ODPSStreamRecordBatchSupplier(label_t label_id, const std::string& file_path,
120  const ODPSReadClient& odps_table_reader,
121  const std::string& session_id, int split_count,
122  TableIdentifier table_identifier, int worker_id,
123  int worker_num);
124 
125  std::shared_ptr<arrow::RecordBatch> GetNextBatch() override;
126 
127  private:
128  std::string file_path_;
130  std::string session_id_;
132 
133  TableIdentifier table_identifier_;
134 
137  ReadRowsReq read_rows_req_;
138  std::shared_ptr<Reader> cur_batch_reader_;
139 };
140 
142  public:
143  ODPSTableRecordBatchSupplier(label_t label_id, const std::string& file_path,
144  const ODPSReadClient& odps_table_reader,
145  const std::string& session_id, int split_count,
146  TableIdentifier table_identifier,
147  int thread_num);
148 
149  std::shared_ptr<arrow::RecordBatch> GetNextBatch() override;
150 
151  private:
152  std::string file_path_;
154  std::string session_id_;
155  TableIdentifier table_identifier_;
156 
157  std::shared_ptr<arrow::Table> table_;
158  std::shared_ptr<arrow::TableBatchReader> reader_;
159 };
160 
161 /*
162  * ODPSFragmentLoader is used to load graph data from ODPS tables.
163  * It fetches the data via the ODPS tunnel/halo API.
164  * You need to set the following environment variables:
165  * 1. ODPS_ACCESS_ID
166  * 2. ODPS_ACCESS_KEY
167  * 3. ODPS_ENDPOINT
168  * 4. ODPS_TUNNEL_ENDPOINT (optional)
169  */
171  public:
172  ODPSFragmentLoader(const std::string& work_dir, const Schema& schema,
173  const LoadingConfig& loading_config)
174  : AbstractArrowFragmentLoader(work_dir, schema, loading_config) {}
175 
176  static std::shared_ptr<IFragmentLoader> Make(
177  const std::string& work_dir, const Schema& schema,
178  const LoadingConfig& loading_config);
179 
181 
182  Result<bool> LoadFragment() override;
183 
184  private:
185  void init();
186 
187  void parseLocation(const std::string& odps_table_path,
188  TableIdentifier& table_identifier,
189  std::vector<std::string>& partition_names,
190  std::vector<std::string>& selected_partitions);
191 
192  void loadVertices();
193 
194  void loadEdges();
195 
196  void addVertices(label_t v_label_id, const std::vector<std::string>& v_files);
197 
198  void addEdges(label_t src_label_id, label_t dst_label_id, label_t e_label_id,
199  const std::vector<std::string>& e_files);
200 
201  std::vector<std::string> columnMappingsToSelectedCols(
202  const std::vector<std::tuple<size_t, std::string, std::string>>&
203  column_mappings);
204 
206 
207  static const bool registered_;
208 };
209 
210 } // namespace gs
211 
212 #endif // STORAGES_RT_MUTABLE_GRAPH_LOADER_ODPS_FRAGMENT_LOADER_H_
gs::ODPSReadClient::MAX_RETRY
static constexpr const size_t MAX_RETRY
Definition: odps_fragment_loader.h:63
gs::Result
Definition: result.h:62
gs::ODPSFragmentLoader::~ODPSFragmentLoader
~ODPSFragmentLoader()
Definition: odps_fragment_loader.h:180
gs::ODPSFragmentLoader
Definition: odps_fragment_loader.h:170
gs::ODPSReadClient::~ODPSReadClient
~ODPSReadClient()
Definition: odps_fragment_loader.cc:31
gs::AbstractArrowFragmentLoader
Definition: abstract_arrow_fragment_loader.h:338
gs::ODPSFragmentLoader::columnMappingsToSelectedCols
std::vector< std::string > columnMappingsToSelectedCols(const std::vector< std::tuple< size_t, std::string, std::string >> &column_mappings)
Definition: odps_fragment_loader.cc:611
gs::ODPSStreamRecordBatchSupplier::odps_read_client_
const ODPSReadClient & odps_read_client_
Definition: odps_fragment_loader.h:129
gs::ODPSTableRecordBatchSupplier
Definition: odps_fragment_loader.h:141
i_fragment_loader.h
gs::ODPSTableRecordBatchSupplier::table_identifier_
TableIdentifier table_identifier_
Definition: odps_fragment_loader.h:155
gs::ODPSTableRecordBatchSupplier::reader_
std::shared_ptr< arrow::TableBatchReader > reader_
Definition: odps_fragment_loader.h:158
gs::IRecordBatchSupplier
Definition: abstract_arrow_fragment_loader.h:39
gs::ODPSFragmentLoader::addEdges
void addEdges(label_t src_label_id, label_t dst_label_id, label_t e_label_id, const std::vector< std::string > &e_files)
Definition: odps_fragment_loader.cc:501
gs::ODPSReadClient::ODPSReadClient
ODPSReadClient()
Definition: odps_fragment_loader.cc:30
gs::ODPSFragmentLoader::loadEdges
void loadEdges()
Definition: odps_fragment_loader.cc:560
gs::ODPSTableRecordBatchSupplier::odps_read_client_
const ODPSReadClient & odps_read_client_
Definition: odps_fragment_loader.h:153
gs::ODPSReadClient::CreateReadSession
void CreateReadSession(std::string *session_id, int *split_count, const TableIdentifier &table_identifier, const std::vector< std::string > &selected_cols, const std::vector< std::string > &partition_cols, const std::vector< std::string > &selected_partitions)
Definition: odps_fragment_loader.cc:112
gs::ODPSReadClient::READ_WRITE_TIMEOUT
static constexpr const int READ_WRITE_TIMEOUT
Definition: odps_fragment_loader.h:62
gs::ODPSFragmentLoader::addVertices
void addVertices(label_t v_label_id, const std::vector< std::string > &v_files)
Definition: odps_fragment_loader.cc:412
gs
Definition: adj_list.h:23
gs::ODPSFragmentLoader::Make
static std::shared_ptr< IFragmentLoader > Make(const std::string &work_dir, const Schema &schema, const LoadingConfig &loading_config)
Definition: odps_fragment_loader.cc:355
gs::ODPSStreamRecordBatchSupplier
Definition: odps_fragment_loader.h:117
gs::ODPSReadClient::access_id_
std::string access_id_
Definition: odps_fragment_loader.h:108
gs::ODPSReadClient::GetArrowClient
std::shared_ptr< ArrowClient > GetArrowClient() const
Definition: odps_fragment_loader.cc:68
mutable_property_fragment.h
gs::ODPSTableRecordBatchSupplier::GetNextBatch
std::shared_ptr< arrow::RecordBatch > GetNextBatch() override
Definition: odps_fragment_loader.cc:343
gs::ODPSReadClient::producerRoutine
void producerRoutine(const std::string &session_id, const TableIdentifier &table_identifier, std::vector< std::vector< std::shared_ptr< arrow::RecordBatch >>> &all_batches_, std::vector< int > &&indices) const
Definition: odps_fragment_loader.cc:160
gs::ODPSStreamRecordBatchSupplier::read_rows_req_
ReadRowsReq read_rows_req_
Definition: odps_fragment_loader.h:137
gs::ODPSTableRecordBatchSupplier::table_
std::shared_ptr< arrow::Table > table_
Definition: odps_fragment_loader.h:157
gs::ODPSTableRecordBatchSupplier::file_path_
std::string file_path_
Definition: odps_fragment_loader.h:152
gs::ODPSReadClient::arrow_client_ptr_
std::shared_ptr< ArrowClient > arrow_client_ptr_
Definition: odps_fragment_loader.h:113
gs::ODPSReadClient
Definition: odps_fragment_loader.h:59
gs::ODPSStreamRecordBatchSupplier::worker_num_
int worker_num_
Definition: odps_fragment_loader.h:136
gs::ODPSStreamRecordBatchSupplier::GetNextBatch
std::shared_ptr< arrow::RecordBatch > GetNextBatch() override
Definition: odps_fragment_loader.cc:283
abstract_arrow_fragment_loader.h
gs::Schema
Definition: schema.h:29
gs::ODPSStreamRecordBatchSupplier::ODPSStreamRecordBatchSupplier
ODPSStreamRecordBatchSupplier(label_t label_id, const std::string &file_path, const ODPSReadClient &odps_table_reader, const std::string &session_id, int split_count, TableIdentifier table_identifier, int worker_id, int worker_num)
Definition: odps_fragment_loader.cc:262
loading_config.h
gs::ODPSFragmentLoader::registered_
static const bool registered_
Definition: odps_fragment_loader.h:207
basic_fragment_loader.h
gs::ODPSFragmentLoader::odps_read_client_
ODPSReadClient odps_read_client_
Definition: odps_fragment_loader.h:205
loader_factory.h
gs::ODPSStreamRecordBatchSupplier::file_path_
std::string file_path_
Definition: odps_fragment_loader.h:128
gs::ODPSReadClient::access_key_
std::string access_key_
Definition: odps_fragment_loader.h:109
gs::ODPSStreamRecordBatchSupplier::session_id_
std::string session_id_
Definition: odps_fragment_loader.h:130
gs::ODPSFragmentLoader::parseLocation
void parseLocation(const std::string &odps_table_path, TableIdentifier &table_identifier, std::vector< std::string > &partition_names, std::vector< std::string > &selected_partitions)
Definition: odps_fragment_loader.cc:384
gs::ODPSReadClient::CONNECTION_TIMEOUT
static constexpr const int CONNECTION_TIMEOUT
Definition: odps_fragment_loader.h:61
gs::ODPSReadClient::getReadSessionStatus
void getReadSessionStatus(const std::string &session_id, int *split_count, const TableIdentifier &table_identifier)
Definition: odps_fragment_loader.cc:138
gs::ODPSStreamRecordBatchSupplier::split_count_
int split_count_
Definition: odps_fragment_loader.h:131
gs::ODPSReadClient::tunnel_endpoint_
std::string tunnel_endpoint_
Definition: odps_fragment_loader.h:111
gs::ODPSReadClient::readRows
bool readRows(std::string session_id, const TableIdentifier &table_identifier, std::vector< std::shared_ptr< arrow::RecordBatch >> &res_batches, int split_index) const
Definition: odps_fragment_loader.cc:177
gs::ODPSTableRecordBatchSupplier::session_id_
std::string session_id_
Definition: odps_fragment_loader.h:154
gs::ODPSStreamRecordBatchSupplier::cur_batch_reader_
std::shared_ptr< Reader > cur_batch_reader_
Definition: odps_fragment_loader.h:138
gs::ODPSReadClient::getReadSession
TableBatchScanResp getReadSession(std::string session_id, const TableIdentifier &table_identifier)
Definition: odps_fragment_loader.cc:101
gs::ODPSFragmentLoader::LoadFragment
Result< bool > LoadFragment() override
Definition: odps_fragment_loader.cc:363
gs::ODPSFragmentLoader::init
void init()
Definition: odps_fragment_loader.cc:361
gs::ODPSReadClient::createReadSession
TableBatchScanResp createReadSession(const TableIdentifier &table_identifier, const std::vector< std::string > &selected_cols, const std::vector< std::string > &partition_cols, const std::vector< std::string > &selected_partitions)
Definition: odps_fragment_loader.cc:72
gs::LoadingConfig
Definition: loading_config.h:89
gs::ODPSStreamRecordBatchSupplier::cur_split_index_
int32_t cur_split_index_
Definition: odps_fragment_loader.h:135
gs::ODPSReadClient::odps_endpoint_
std::string odps_endpoint_
Definition: odps_fragment_loader.h:110
gs::ODPSReadClient::MAX_PRODUCER_NUM
size_t MAX_PRODUCER_NUM
Definition: odps_fragment_loader.h:114
gs::label_t
uint8_t label_t
Definition: types.h:32
gs::ODPSReadClient::ReadTable
std::shared_ptr< arrow::Table > ReadTable(const std::string &session_id, int split_count, const TableIdentifier &table_id, int thread_num) const
Definition: odps_fragment_loader.cc:200
gs::ODPSReadClient::output_directory_
std::string output_directory_
Definition: odps_fragment_loader.h:112
gs::ODPSFragmentLoader::ODPSFragmentLoader
ODPSFragmentLoader(const std::string &work_dir, const Schema &schema, const LoadingConfig &loading_config)
Definition: odps_fragment_loader.h:172
gs::ODPSReadClient::init
void init()
Definition: odps_fragment_loader.cc:33
gs::ODPSStreamRecordBatchSupplier::table_identifier_
TableIdentifier table_identifier_
Definition: odps_fragment_loader.h:133
gs::ODPSFragmentLoader::loadVertices
void loadVertices()
Definition: odps_fragment_loader.cc:452
gs::ODPSTableRecordBatchSupplier::ODPSTableRecordBatchSupplier
ODPSTableRecordBatchSupplier(label_t label_id, const std::string &file_path, const ODPSReadClient &odps_table_reader, const std::string &session_id, int split_count, TableIdentifier table_identifier, int thread_num)
Definition: odps_fragment_loader.cc:328