Go to the documentation of this file.
17 #ifndef STORAGES_RT_MUTABLE_GRAPH_LOADER_ODPS_FRAGMENT_LOADER_H_
18 #define STORAGES_RT_MUTABLE_GRAPH_LOADER_ODPS_FRAGMENT_LOADER_H_
20 #include <arrow/api.h>
21 #include <arrow/csv/api.h>
22 #include <arrow/io/api.h>
23 #include <arrow/util/uri.h>
25 #include <boost/convert.hpp>
26 #include <boost/convert/strtol.hpp>
29 #include "arrow/util/value_parsing.h"
30 #include "common/configuration.h"
37 #include "flex/third_party/httplib.h"
38 #include "grape/util.h"
39 #include "storage_api.hpp"
40 #include "storage_api_arrow.hpp"
42 using apsara::odps::sdk::AliyunAccount;
43 using apsara::odps::sdk::Configuration;
44 using apsara::odps::sdk::storage_api::ReadRowsReq;
45 using apsara::odps::sdk::storage_api::SessionReq;
46 using apsara::odps::sdk::storage_api::SessionStatus;
47 using apsara::odps::sdk::storage_api::SplitOptions;
48 using apsara::odps::sdk::storage_api::TableBatchScanReq;
49 using apsara::odps::sdk::storage_api::TableBatchScanResp;
50 using apsara::odps::sdk::storage_api::TableBatchWriteReq;
51 using apsara::odps::sdk::storage_api::TableBatchWriteResp;
52 using apsara::odps::sdk::storage_api::TableIdentifier;
53 using apsara::odps::sdk::storage_api::WriteRowsReq;
54 using apsara::odps::sdk::storage_api::arrow_adapter::ArrowClient;
55 using apsara::odps::sdk::storage_api::arrow_adapter::Reader;
71 const TableIdentifier& table_identifier,
72 const std::vector<std::string>& selected_cols,
73 const std::vector<std::string>& partition_cols,
74 const std::vector<std::string>& selected_partitions);
76 std::shared_ptr<arrow::Table>
ReadTable(
const std::string& session_id,
78 const TableIdentifier& table_id,
79 int thread_num)
const;
85 const TableIdentifier& table_identifier,
86 const std::vector<std::string>& selected_cols,
87 const std::vector<std::string>& partition_cols,
88 const std::vector<std::string>& selected_partitions);
91 const TableIdentifier& table_identifier);
94 const TableIdentifier& table_identifier);
97 const std::string& session_id,
const TableIdentifier& table_identifier,
98 std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>>&
100 std::vector<int>&& indices)
const;
102 bool readRows(std::string session_id,
const TableIdentifier& table_identifier,
103 std::vector<std::shared_ptr<arrow::RecordBatch>>& res_batches,
104 int split_index)
const;
121 const std::string& session_id,
int split_count,
122 TableIdentifier table_identifier,
int worker_id,
125 std::shared_ptr<arrow::RecordBatch>
GetNextBatch()
override;
145 const std::string& session_id,
int split_count,
146 TableIdentifier table_identifier,
149 std::shared_ptr<arrow::RecordBatch>
GetNextBatch()
override;
158 std::shared_ptr<arrow::TableBatchReader>
reader_;
176 static std::shared_ptr<IFragmentLoader>
Make(
177 const std::string& work_dir,
const Schema& schema,
188 TableIdentifier& table_identifier,
189 std::vector<std::string>& partition_names,
190 std::vector<std::string>& selected_partitions);
199 const std::vector<std::string>& e_files);
202 const std::vector<std::tuple<size_t, std::string, std::string>>&
212 #endif // STORAGES_RT_MUTABLE_GRAPH_LOADER_ODPS_FRAGMENT_LOADER_H_
static constexpr const size_t MAX_RETRY
Definition: odps_fragment_loader.h:63
~ODPSFragmentLoader()
Definition: odps_fragment_loader.h:180
Definition: odps_fragment_loader.h:170
~ODPSReadClient()
Definition: odps_fragment_loader.cc:31
Definition: abstract_arrow_fragment_loader.h:338
std::vector< std::string > columnMappingsToSelectedCols(const std::vector< std::tuple< size_t, std::string, std::string >> &column_mappings)
Definition: odps_fragment_loader.cc:611
const ODPSReadClient & odps_read_client_
Definition: odps_fragment_loader.h:129
Definition: odps_fragment_loader.h:141
TableIdentifier table_identifier_
Definition: odps_fragment_loader.h:155
std::shared_ptr< arrow::TableBatchReader > reader_
Definition: odps_fragment_loader.h:158
Definition: abstract_arrow_fragment_loader.h:39
void addEdges(label_t src_label_id, label_t dst_label_id, label_t e_label_id, const std::vector< std::string > &e_files)
Definition: odps_fragment_loader.cc:501
ODPSReadClient()
Definition: odps_fragment_loader.cc:30
void loadEdges()
Definition: odps_fragment_loader.cc:560
const ODPSReadClient & odps_read_client_
Definition: odps_fragment_loader.h:153
void CreateReadSession(std::string *session_id, int *split_count, const TableIdentifier &table_identifier, const std::vector< std::string > &selected_cols, const std::vector< std::string > &partition_cols, const std::vector< std::string > &selected_partitions)
Definition: odps_fragment_loader.cc:112
static constexpr const int READ_WRITE_TIMEOUT
Definition: odps_fragment_loader.h:62
void addVertices(label_t v_label_id, const std::vector< std::string > &v_files)
Definition: odps_fragment_loader.cc:412
Definition: adj_list.h:23
static std::shared_ptr< IFragmentLoader > Make(const std::string &work_dir, const Schema &schema, const LoadingConfig &loading_config)
Definition: odps_fragment_loader.cc:355
Definition: odps_fragment_loader.h:117
std::string access_id_
Definition: odps_fragment_loader.h:108
std::shared_ptr< ArrowClient > GetArrowClient() const
Definition: odps_fragment_loader.cc:68
std::shared_ptr< arrow::RecordBatch > GetNextBatch() override
Definition: odps_fragment_loader.cc:343
void producerRoutine(const std::string &session_id, const TableIdentifier &table_identifier, std::vector< std::vector< std::shared_ptr< arrow::RecordBatch >>> &all_batches_, std::vector< int > &&indices) const
Definition: odps_fragment_loader.cc:160
ReadRowsReq read_rows_req_
Definition: odps_fragment_loader.h:137
std::shared_ptr< arrow::Table > table_
Definition: odps_fragment_loader.h:157
std::string file_path_
Definition: odps_fragment_loader.h:152
std::shared_ptr< ArrowClient > arrow_client_ptr_
Definition: odps_fragment_loader.h:113
Definition: odps_fragment_loader.h:59
int worker_num_
Definition: odps_fragment_loader.h:136
std::shared_ptr< arrow::RecordBatch > GetNextBatch() override
Definition: odps_fragment_loader.cc:283
ODPSStreamRecordBatchSupplier(label_t label_id, const std::string &file_path, const ODPSReadClient &odps_table_reader, const std::string &session_id, int split_count, TableIdentifier table_identifier, int worker_id, int worker_num)
Definition: odps_fragment_loader.cc:262
static const bool registered_
Definition: odps_fragment_loader.h:207
ODPSReadClient odps_read_client_
Definition: odps_fragment_loader.h:205
std::string file_path_
Definition: odps_fragment_loader.h:128
std::string access_key_
Definition: odps_fragment_loader.h:109
std::string session_id_
Definition: odps_fragment_loader.h:130
void parseLocation(const std::string &odps_table_path, TableIdentifier &table_identifier, std::vector< std::string > &partition_names, std::vector< std::string > &selected_partitions)
Definition: odps_fragment_loader.cc:384
static constexpr const int CONNECTION_TIMEOUT
Definition: odps_fragment_loader.h:61
void getReadSessionStatus(const std::string &session_id, int *split_count, const TableIdentifier &table_identifier)
Definition: odps_fragment_loader.cc:138
int split_count_
Definition: odps_fragment_loader.h:131
std::string tunnel_endpoint_
Definition: odps_fragment_loader.h:111
bool readRows(std::string session_id, const TableIdentifier &table_identifier, std::vector< std::shared_ptr< arrow::RecordBatch >> &res_batches, int split_index) const
Definition: odps_fragment_loader.cc:177
std::string session_id_
Definition: odps_fragment_loader.h:154
std::shared_ptr< Reader > cur_batch_reader_
Definition: odps_fragment_loader.h:138
TableBatchScanResp getReadSession(std::string session_id, const TableIdentifier &table_identifier)
Definition: odps_fragment_loader.cc:101
Result< bool > LoadFragment() override
Definition: odps_fragment_loader.cc:363
void init()
Definition: odps_fragment_loader.cc:361
TableBatchScanResp createReadSession(const TableIdentifier &table_identifier, const std::vector< std::string > &selected_cols, const std::vector< std::string > &partition_cols, const std::vector< std::string > &selected_partitions)
Definition: odps_fragment_loader.cc:72
Definition: loading_config.h:89
int32_t cur_split_index_
Definition: odps_fragment_loader.h:135
std::string odps_endpoint_
Definition: odps_fragment_loader.h:110
size_t MAX_PRODUCER_NUM
Definition: odps_fragment_loader.h:114
uint8_t label_t
Definition: types.h:32
std::shared_ptr< arrow::Table > ReadTable(const std::string &session_id, int split_count, const TableIdentifier &table_id, int thread_num) const
Definition: odps_fragment_loader.cc:200
std::string output_directory_
Definition: odps_fragment_loader.h:112
ODPSFragmentLoader(const std::string &work_dir, const Schema &schema, const LoadingConfig &loading_config)
Definition: odps_fragment_loader.h:172
void init()
Definition: odps_fragment_loader.cc:33
TableIdentifier table_identifier_
Definition: odps_fragment_loader.h:133
void loadVertices()
Definition: odps_fragment_loader.cc:452
ODPSTableRecordBatchSupplier(label_t label_id, const std::string &file_path, const ODPSReadClient &odps_table_reader, const std::string &session_id, int split_count, TableIdentifier table_identifier, int thread_num)
Definition: odps_fragment_loader.cc:328