#!/usr/bin/env python3# -*- coding: utf-8 -*-## Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.## Licensed under the Apache License, Version 2.0 (the "License");# you may not use this file except in compliance with the License.# You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.#importjsonimportpicklefromtypingimportDictfromtypingimportListfromtypingimportTuplefromtypingimportUnionfromgraphscope.frameworkimportutilsfromgraphscope.framework.errorsimportcheck_argumentfromgraphscope.framework.operationimportOperationfromgraphscope.protoimportattr_value_pb2fromgraphscope.protoimportgraph_def_pb2fromgraphscope.protoimporttypes_pb2
def create_app(app_assets):
    """Build the `CREATE_APP` operation for an application.

    The operation is created with ``None`` as its session id and only records
    the information that `BOUND_APP` requires later on.

    Args:
        app_assets: Application assets. Its ``algo`` is always stored in the
            config; its ``gar`` is stored only when it is not ``None``.

    Returns:
        An :class:`Operation` of type `CREATE_APP` with output type `APP`.
    """
    app_config = {types_pb2.APP_ALGO: utils.s_to_attr(app_assets.algo)}
    if app_assets.gar is not None:
        app_config[types_pb2.GAR] = utils.bytes_to_attr(app_assets.gar)
    return Operation(
        None,
        types_pb2.CREATE_APP,
        config=app_config,
        output_types=types_pb2.APP,
    )
def bind_app(graph, app_assets):
    """Build the `BIND_APP` operation that binds an app to a graph.

    Args:
        graph (:class:`GraphDAGNode`): Graph node whose ``op`` becomes the
            first input. The optional private attributes ``_vertex_map``,
            ``_compact_edges`` and ``_use_perfect_hash`` are forwarded only
            when the graph object actually has them.
        app_assets (:class:`AppAssets`): App assets whose ``op`` becomes the
            second input. ``cmake_extra_options`` is forwarded when not
            ``None``.

    Returns:
        An :class:`Operation` of type `BIND_APP` with output type
        `BOUND_APP`, carrying the configuration that instructs the analytical
        engine how to build the app.
    """
    op_inputs = [graph.op, app_assets.op]
    bind_config = {types_pb2.APP_ALGO: utils.s_to_attr(app_assets.algo)}

    # Graph layout hints are optional: not every graph object defines them.
    if hasattr(graph, "_vertex_map"):
        bind_config[types_pb2.VERTEX_MAP_TYPE] = utils.i_to_attr(graph._vertex_map)
    if hasattr(graph, "_compact_edges"):
        bind_config[types_pb2.COMPACT_EDGES] = utils.b_to_attr(graph._compact_edges)
    if hasattr(graph, "_use_perfect_hash"):
        bind_config[types_pb2.USE_PERFECT_HASH] = utils.b_to_attr(
            graph._use_perfect_hash
        )

    if app_assets.cmake_extra_options is not None:
        bind_config[types_pb2.CMAKE_EXTRA_OPTIONS] = utils.s_to_attr(
            app_assets.cmake_extra_options
        )

    return Operation(
        graph.session_id,
        types_pb2.BIND_APP,
        inputs=op_inputs,
        config=bind_config,
        output_types=types_pb2.BOUND_APP,
    )
def run_app(app, *args, **kwargs):
    """Build the `RUN_APP` operation that runs a bound app.

    Args:
        app (:class:`AppDAGNode`): Node representing a bound app; its ``op``
            is the single input of the new operation.
        *args: Positional query parameters, packed by
            ``utils.pack_query_params``.
        **kwargs: Keyword query parameters. The ``output_prefix`` entry is
            removed from ``kwargs`` (defaulting to ``"."``) and stored in the
            config; everything else is packed as query parameters.

    Returns:
        An :class:`Operation` of type `RUN_APP` with output type `RESULTS`.
    """
    op_inputs = [app.op]

    # `output_prefix` is a config entry, not a query parameter, so it must be
    # popped before the remaining kwargs are packed.
    prefix = kwargs.pop("output_prefix", ".")
    run_config = {}
    run_config[types_pb2.OUTPUT_PREFIX] = utils.s_to_attr(prefix)

    packed_params = utils.pack_query_params(*args, **kwargs)
    query_args = types_pb2.QueryArgs()
    query_args.args.extend(packed_params)

    return Operation(
        app.session_id,
        types_pb2.RUN_APP,
        inputs=op_inputs,
        config=run_config,
        output_types=types_pb2.RESULTS,
        query_args=query_args,
    )
def create_graph(session_id, graph_type, inputs=None, **kwargs):
    """Build the `CREATE_GRAPH` operation.

    Args:
        session_id (str): Session the graph will be created on.
        graph_type (:class:`GraphType`): GraphType defined in
            proto.types.proto. Only `ARROW_PROPERTY` and `DYNAMIC_PROPERTY`
            are accepted.
        inputs: Forwarded unchanged as the operation inputs.
        **kwargs: Extra properties that depend on ``graph_type``:

            * `ARROW_PROPERTY`: optional ``attrs`` mapping; only values that
              are ``AttrValue`` instances are copied into the config.
            * `DYNAMIC_PROPERTY`: required ``efile``, ``vfile``, ``directed``
              and ``distributed`` entries.

    Raises:
        RuntimeError: If ``graph_type`` is neither of the supported types.
        KeyError: If a required `DYNAMIC_PROPERTY` entry is missing.

    Returns:
        An :class:`Operation` of type `CREATE_GRAPH` with output type `GRAPH`.
    """
    graph_config = {types_pb2.GRAPH_TYPE: utils.graph_type_to_attr(graph_type)}

    if graph_type == graph_def_pb2.ARROW_PROPERTY:
        extra_attrs = kwargs.pop("attrs", None)
        if extra_attrs:
            for key, value in extra_attrs.items():
                # Entries that are not already AttrValue protos are ignored.
                if isinstance(value, attr_value_pb2.AttrValue):
                    graph_config[key] = value
    elif graph_type == graph_def_pb2.DYNAMIC_PROPERTY:
        graph_config[types_pb2.E_FILE] = utils.s_to_attr(kwargs["efile"])
        graph_config[types_pb2.V_FILE] = utils.s_to_attr(kwargs["vfile"])
        graph_config[types_pb2.DIRECTED] = utils.b_to_attr(kwargs["directed"])
        graph_config[types_pb2.DISTRIBUTED] = utils.b_to_attr(kwargs["distributed"])
    else:
        raise RuntimeError("Not supported graph type {}".format(graph_type))

    return Operation(
        session_id,
        types_pb2.CREATE_GRAPH,
        inputs=inputs,
        config=graph_config,
        output_types=types_pb2.GRAPH,
    )
def create_loader(vertex_or_edge_label_list):
    """Build a `DATA_SOURCE` (loader) operation.

    Args:
        vertex_or_edge_label_list: A list of
            :class:`graphscope.framework.graph_utils.VertexLabel` or
            :class:`graphscope.framework.graph_utils.EdgeLabel`. A single
            non-list value is wrapped into a one-element list.

    Returns:
        An :class:`Operation` of type `DATA_SOURCE` with output type
        `NULL_OUTPUT`. The session id is taken from the first label.
    """
    if isinstance(vertex_or_edge_label_list, list):
        labels = vertex_or_edge_label_list
    else:
        labels = [vertex_or_edge_label_list]

    # Collect the attribute chunks of every label into one large attribute.
    payload = attr_value_pb2.LargeAttrValue()
    for label in labels:
        payload.chunk_list.items.extend(label.attr())

    return Operation(
        labels[0]._session_id,
        types_pb2.DATA_SOURCE,
        config={},
        large_attr=payload,
        output_types=types_pb2.NULL_OUTPUT,
    )
def add_labels_to_graph(graph, loader_op):
    """Build the `ADD_LABELS` operation that adds new labels to a graph.

    Args:
        graph (:class:`Graph`): A graph instance. May not be fully loaded,
            i.e. it may still be in a building procedure.
        loader_op (:class:`graphscope.framework.operation.Operation`):
            Operation of the loader.

    Raises:
        NotImplementedError: When the graph type is not `ARROW_PROPERTY`.

    Returns:
        An :class:`Operation` of type `ADD_LABELS` with output type `GRAPH`.

    Notes:
        Since we don't want to trigger the loading, only private attributes
        of the graph are read here; no api that can trigger the loading
        process implicitly must be used.
    """
    # Imported locally, as in the original code.
    from graphscope.framework.graph import GraphDAGNode

    assert isinstance(graph, GraphDAGNode)
    op_inputs = [graph.op, loader_op]

    # vid_type is fixed
    label_config = {
        types_pb2.GRAPH_TYPE: utils.graph_type_to_attr(graph._graph_type),
        types_pb2.DIRECTED: utils.b_to_attr(graph._directed),
        types_pb2.EXTEND_LABEL_DATA: utils.i_to_attr(graph._extend_label_data),
        types_pb2.OID_TYPE: utils.s_to_attr(graph._oid_type),
        types_pb2.VID_TYPE: utils.s_to_attr(graph._vid_type),
        types_pb2.GENERATE_EID: utils.b_to_attr(graph._generate_eid),
        types_pb2.RETAIN_OID: utils.b_to_attr(graph._retain_oid),
        types_pb2.VERTEX_MAP_TYPE: utils.i_to_attr(graph._vertex_map),
        types_pb2.COMPACT_EDGES: utils.b_to_attr(graph._compact_edges),
        types_pb2.USE_PERFECT_HASH: utils.b_to_attr(graph._use_perfect_hash),
        types_pb2.IS_FROM_VINEYARD_ID: utils.b_to_attr(False),
        types_pb2.IS_FROM_GAR: utils.b_to_attr(False),
    }
    # The graph name is inferred from the context of the dag, so it is sent
    # as an empty string here.
    label_config[types_pb2.GRAPH_NAME] = utils.s_to_attr("")

    if graph._graph_type != graph_def_pb2.ARROW_PROPERTY:
        raise NotImplementedError(
            f"Add vertices or edges is not supported yet on graph type {graph._graph_type}"
        )

    return Operation(
        graph._session.session_id,
        types_pb2.ADD_LABELS,
        inputs=op_inputs,
        config=label_config,
        output_types=types_pb2.GRAPH,
    )
def consolidate_columns(
    graph,
    label: str,
    columns: Union[List[str], Tuple[str]],
    result_column: str,
):
    """Build the `CONSOLIDATE_COLUMNS` operation for a property graph.

    Args:
        graph (:class:`Graph`): Source graph; must be of type
            `ARROW_PROPERTY` (verified with ``check_argument``).
        label (str): The label of the vertex/edge to be consolidated.
        columns: The columns to be consolidated; sent as one comma-joined
            string.
        result_column (str): The column name of the result.

    Returns:
        An :class:`Operation` of type `CONSOLIDATE_COLUMNS` with output type
        `GRAPH`.
    """
    check_argument(graph.graph_type == graph_def_pb2.ARROW_PROPERTY)

    joined_columns = ",".join(columns)
    consolidate_config = {
        types_pb2.GRAPH_TYPE: utils.graph_type_to_attr(graph._graph_type),
        types_pb2.DIRECTED: utils.b_to_attr(graph._directed),
        types_pb2.OID_TYPE: utils.s_to_attr(graph._oid_type),
        types_pb2.VID_TYPE: utils.s_to_attr(graph._vid_type),
        types_pb2.GENERATE_EID: utils.b_to_attr(graph._generate_eid),
        types_pb2.RETAIN_OID: utils.b_to_attr(graph._retain_oid),
        types_pb2.VERTEX_MAP_TYPE: utils.i_to_attr(graph._vertex_map),
        types_pb2.COMPACT_EDGES: utils.b_to_attr(graph._compact_edges),
        types_pb2.USE_PERFECT_HASH: utils.b_to_attr(graph._use_perfect_hash),
        types_pb2.IS_FROM_VINEYARD_ID: utils.b_to_attr(False),
        types_pb2.IS_FROM_GAR: utils.b_to_attr(False),
        types_pb2.CONSOLIDATE_COLUMNS_LABEL: utils.s_to_attr(label),
        types_pb2.CONSOLIDATE_COLUMNS_COLUMNS: utils.s_to_attr(joined_columns),
        types_pb2.CONSOLIDATE_COLUMNS_RESULT_COLUMN: utils.s_to_attr(result_column),
    }

    # The key maybe filled later in coordinator
    if hasattr(graph, "key"):
        consolidate_config[types_pb2.GRAPH_NAME] = utils.s_to_attr(graph.key)

    return Operation(
        graph.session_id,
        types_pb2.CONSOLIDATE_COLUMNS,
        config=consolidate_config,
        inputs=[graph.op],
        output_types=types_pb2.GRAPH,
    )
def dynamic_to_arrow(graph):
    """Build the op that transforms a :class:`nx.Graph` into a :class:`Graph`.

    All nodes of ``graph`` are iterated to determine a single Python node
    type, which is then mapped to the C++ oid type: ``int`` (or an empty
    graph) maps to `LONG`, ``str`` maps to `STRING`.

    Args:
        graph (:class:`Graph`): Source graph, which type should be
            `DYNAMIC_PROPERTY` (verified with ``check_argument``).

    Raises:
        RuntimeError: If the nodes do not all share the same exact type, or
            if that type is neither ``int`` nor ``str``.

    Returns:
        An :class:`Operation` of type `TRANSFORM_GRAPH` with output type
        `GRAPH`.
    """
    check_argument(graph.graph_type == graph_def_pb2.DYNAMIC_PROPERTY)

    # Exact type comparison is intentional: every node must have the very
    # same type as the first one seen.
    node_type = None
    for node in graph:
        if node_type is None:
            node_type = type(node)
        elif node_type != type(node):
            raise RuntimeError(
                "The vertex type is not consistent {} vs {}, can not convert it to arrow graph".format(
                    str(node_type), str(type(node))
                )
            )

    if node_type == int or node_type is None:
        oid_type = utils.data_type_to_cpp(graph_def_pb2.LONG)
    elif node_type == str:
        oid_type = utils.data_type_to_cpp(graph_def_pb2.STRING)
    else:
        raise RuntimeError("Unsupported oid type: " + str(node_type))
    vid_type = utils.data_type_to_cpp(graph_def_pb2.ULONG)

    # NOTE(review): GRAPH_TYPE is ARROW_PROPERTY although the source graph is
    # DYNAMIC_PROPERTY -- kept as in the original; confirm this is intended.
    transform_config = {
        types_pb2.GRAPH_NAME: utils.s_to_attr(graph.key),
        types_pb2.GRAPH_TYPE: utils.graph_type_to_attr(graph_def_pb2.ARROW_PROPERTY),
        types_pb2.DST_GRAPH_TYPE: utils.graph_type_to_attr(
            graph_def_pb2.ARROW_PROPERTY
        ),
        types_pb2.OID_TYPE: utils.s_to_attr(oid_type),
        types_pb2.VID_TYPE: utils.s_to_attr(vid_type),
    }

    return Operation(
        graph.session_id,
        types_pb2.TRANSFORM_GRAPH,
        config=transform_config,
        output_types=types_pb2.GRAPH,
    )
def arrow_to_dynamic(graph):
    """Build the op that transforms a :class:`Graph` into a :class:`nx.Graph`.

    Args:
        graph (:class:`Graph`): Source graph, which type should be
            `ARROW_PROPERTY` (verified with ``check_argument``). Its schema
            provides the oid/vid types and ``_default_label_id`` is forwarded
            as well.

    Returns:
        An :class:`Operation` of type `TRANSFORM_GRAPH` with output type
        `GRAPH`.
    """
    check_argument(graph.graph_type == graph_def_pb2.ARROW_PROPERTY)

    source_type = utils.graph_type_to_attr(graph_def_pb2.ARROW_PROPERTY)
    target_type = utils.graph_type_to_attr(graph_def_pb2.DYNAMIC_PROPERTY)
    transform_config = {
        types_pb2.GRAPH_NAME: utils.s_to_attr(graph.key),
        types_pb2.GRAPH_TYPE: source_type,
        types_pb2.DST_GRAPH_TYPE: target_type,
        types_pb2.OID_TYPE: utils.s_to_attr(
            utils.data_type_to_cpp(graph.schema.oid_type)
        ),
        types_pb2.VID_TYPE: utils.s_to_attr(
            utils.data_type_to_cpp(graph.schema.vid_type)
        ),
        types_pb2.DEFAULT_LABEL_ID: utils.i_to_attr(graph._default_label_id),
    }

    return Operation(
        graph.session_id,
        types_pb2.TRANSFORM_GRAPH,
        config=transform_config,
        output_types=types_pb2.GRAPH,
    )
def modify_edges(graph, modify_type, edges, attr={}, weight=None):
    """Build the `MODIFY_EDGES` operation for an nx graph.

    Args:
        graph (:class:`nx.Graph`): A nx graph; must be of type
            `DYNAMIC_PROPERTY` (verified with ``check_argument``).
        modify_type (`type_pb2.(NX_ADD_EDGES | NX_DEL_EDGES | NX_UPDATE_EDGES)`):
            The modify type.
        edges (list): List of edges to be inserted into or deleted from the
            graph based on ``modify_type``; sent as the large attribute.
        attr: JSON-serialisable properties, sent as a JSON string. The
            default dict is only read, never modified.
        weight: When truthy, stored under `EDGE_KEY`.

    Returns:
        An :class:`Operation` of type `MODIFY_EDGES` with output type `GRAPH`.
    """
    check_argument(graph.graph_type == graph_def_pb2.DYNAMIC_PROPERTY)

    edge_config = {
        types_pb2.GRAPH_NAME: utils.s_to_attr(graph.key),
        types_pb2.MODIFY_TYPE: utils.modify_type_to_attr(modify_type),
        types_pb2.PROPERTIES: utils.s_to_attr(json.dumps(attr)),
    }
    if weight:
        edge_config[types_pb2.EDGE_KEY] = utils.s_to_attr(weight)

    return Operation(
        graph.session_id,
        types_pb2.MODIFY_EDGES,
        config=edge_config,
        large_attr=utils.bytes_to_large_attr(edges),
        output_types=types_pb2.GRAPH,
    )
def modify_vertices(graph, modify_type, vertices, attr={}):
    """Build the `MODIFY_VERTICES` operation for an nx graph.

    Args:
        graph (:class:`nx.Graph`): A nx graph; must be of type
            `DYNAMIC_PROPERTY` (verified with ``check_argument``).
        modify_type (`type_pb2.(NX_ADD_NODES | NX_DEL_NODES | NX_UPDATE_NODES)`):
            The modify type.
        vertices (list): Node list; sent as the large attribute.
        attr: JSON-serialisable properties, sent as a JSON string. The
            default dict is only read, never modified.

    Returns:
        An :class:`Operation` of type `MODIFY_VERTICES` with output type
        `GRAPH`.
    """
    check_argument(graph.graph_type == graph_def_pb2.DYNAMIC_PROPERTY)

    vertex_config = {
        types_pb2.GRAPH_NAME: utils.s_to_attr(graph.key),
        types_pb2.MODIFY_TYPE: utils.modify_type_to_attr(modify_type),
        types_pb2.PROPERTIES: utils.s_to_attr(json.dumps(attr)),
    }

    return Operation(
        graph.session_id,
        types_pb2.MODIFY_VERTICES,
        config=vertex_config,
        large_attr=utils.bytes_to_large_attr(vertices),
        output_types=types_pb2.GRAPH,
    )
def report_graph(
    graph,
    report_type,
    node=None,
    edge=None,
    fid=None,
    lid=None,
    key=None,
    label_id=None,
    gid=None,
):
    """Build the `REPORT_GRAPH` operation for an nx graph.

    This operation is used to simulate networkx graph reporting methods with
    a variety of report types and their corresponding config parameters.

    Args:
        graph (`nx.Graph`): A nx graph. When its type is `ARROW_PROPERTY`,
            ``_default_label_id`` is forwarded as well.
        report_type: report type, can be type_pb2.(NODE_NUM, EDGE_NUM,
            HAS_NODE, HAS_EDGE, NODE_DATA, EDGE_DATA, NEIGHBORS_BY_NODE,
            SUCCS_BY_NODE, PREDS_BY_NODE, NEIGHBORS_BY_LOC, SUCCS_BY_LOC,
            PREDS_BY_LOC, DEG_BY_NODE, IN_DEG_BY_NODE, OUT_DEG_BY_NODE,
            DEG_BY_LOC, IN_DEG_BY_LOC, OUT_DEG_BY_LOC, NODES_BY_LOC)
        node: node id, used with 'NODE' report types. (optional)
        edge: an edge, used with 'EDGE' report types. (optional)
        fid (int): fragment id, with 'LOC' report types. (optional)
        lid (int): local id of node in grape_engine, with 'LOC' report
            types. (optional)
        key: edge key for MultiGraph or MultiDiGraph, with 'EDGE' report
            types. Always sent: ``str(key)`` or ``""`` when ``None``.
        label_id (int): stored under `V_LABEL_ID` when given. (optional)
        gid: stored under `GID` when given. (optional)

    Returns:
        An :class:`Operation` of type `REPORT_GRAPH` with output type
        `RESULTS`.
    """
    report_config = {
        types_pb2.GRAPH_NAME: utils.s_to_attr(graph.key),
        types_pb2.REPORT_TYPE: utils.report_type_to_attr(report_type),
    }
    if graph.graph_type == graph_def_pb2.ARROW_PROPERTY:
        report_config[types_pb2.DEFAULT_LABEL_ID] = utils.i_to_attr(
            graph._default_label_id
        )

    # Optional selectors are only sent when explicitly provided.
    if node is not None:
        report_config[types_pb2.NODE] = utils.bytes_to_attr(node)
    if edge is not None:
        report_config[types_pb2.EDGE] = utils.bytes_to_attr(edge)
    if fid is not None:
        report_config[types_pb2.FID] = utils.i_to_attr(fid)
    if lid is not None:
        report_config[types_pb2.LID] = utils.i_to_attr(lid)
    if label_id is not None:
        report_config[types_pb2.V_LABEL_ID] = utils.i_to_attr(label_id)
    if gid is not None:
        report_config[types_pb2.GID] = utils.u_to_attr(gid)

    # Unlike the other selectors, the edge key is always present.
    edge_key = str(key) if key is not None else ""
    report_config[types_pb2.EDGE_KEY] = utils.s_to_attr(edge_key)

    return Operation(
        graph.session_id,
        types_pb2.REPORT_GRAPH,
        config=report_config,
        output_types=types_pb2.RESULTS,
    )
def project_to_simple(
    graph,
    v_prop,
    e_prop,
):
    """Build the `PROJECT_TO_SIMPLE` operation.

    Args:
        graph: Source graph, which type should be a property graph
            (`ARROW_PROPERTY` or `DYNAMIC_PROPERTY`, verified with
            ``check_argument``). The optional private attributes
            ``_vertex_map``, ``_compact_edges`` and ``_use_perfect_hash`` are
            forwarded only when present.
        v_prop (str): The node attribute key to project.
        e_prop (str): The edge attribute key to project.

    Returns:
        An :class:`Operation` of type `PROJECT_TO_SIMPLE` with output type
        `GRAPH`.
    """
    property_graph_types = (
        graph_def_pb2.ARROW_PROPERTY,
        graph_def_pb2.DYNAMIC_PROPERTY,
    )
    check_argument(graph.graph_type in property_graph_types)

    project_config = {
        types_pb2.V_PROP_KEY: utils.s_to_attr(v_prop),
        types_pb2.E_PROP_KEY: utils.s_to_attr(e_prop),
    }
    if hasattr(graph, "_vertex_map"):
        project_config[types_pb2.VERTEX_MAP_TYPE] = utils.i_to_attr(graph._vertex_map)
    if hasattr(graph, "_compact_edges"):
        project_config[types_pb2.COMPACT_EDGES] = utils.b_to_attr(
            graph._compact_edges
        )
    if hasattr(graph, "_use_perfect_hash"):
        project_config[types_pb2.USE_PERFECT_HASH] = utils.b_to_attr(
            graph._use_perfect_hash
        )

    return Operation(
        graph.session_id,
        types_pb2.PROJECT_TO_SIMPLE,
        config=project_config,
        inputs=[graph.op],
        output_types=types_pb2.GRAPH,
    )
def copy_graph(graph, copy_type="identical"):
    """Build the `COPY_GRAPH` operation for an nx graph.

    Args:
        graph (:class:`nx.Graph`): A nx graph; its type must be
            `ARROW_PROPERTY` or `DYNAMIC_PROPERTY`.
        copy_type (str): One of

            * 'identical': copy graph to destination graph without any change.
            * 'reverse': copy graph to destination graph with reversing the
              graph edges.

    Returns:
        An :class:`Operation` of type `COPY_GRAPH` with output type `GRAPH`.
    """
    supported_graph_types = (
        graph_def_pb2.ARROW_PROPERTY,
        graph_def_pb2.DYNAMIC_PROPERTY,
    )
    check_argument(graph.graph_type in supported_graph_types)
    check_argument(copy_type in ("identical", "reverse"))

    copy_config = {
        types_pb2.GRAPH_NAME: utils.s_to_attr(graph.key),
        types_pb2.COPY_TYPE: utils.s_to_attr(copy_type),
    }
    return Operation(
        graph.session_id,
        types_pb2.COPY_GRAPH,
        config=copy_config,
        output_types=types_pb2.GRAPH,
    )
def to_directed(graph):
    """Build the `TO_DIRECTED` operation for a graph.

    Args:
        graph (:class:`nx.Graph`): Source graph; its type must be
            `DYNAMIC_PROPERTY` or `ARROW_PROPERTY`.

    Returns:
        An :class:`Operation` of type `TO_DIRECTED` with output type `GRAPH`.
    """
    supported_graph_types = (
        graph_def_pb2.DYNAMIC_PROPERTY,
        graph_def_pb2.ARROW_PROPERTY,
    )
    check_argument(graph.graph_type in supported_graph_types)

    directed_config = {}
    # The key maybe filled later in coordinator
    if hasattr(graph, "key"):
        directed_config[types_pb2.GRAPH_NAME] = utils.s_to_attr(graph.key)

    return Operation(
        graph.session_id,
        types_pb2.TO_DIRECTED,
        config=directed_config,
        inputs=[graph.op],
        output_types=types_pb2.GRAPH,
    )
def to_undirected(graph):
    """Build the `TO_UNDIRECTED` operation for a graph.

    Args:
        graph (:class:`nx.Graph`): Source graph; its type must be
            `DYNAMIC_PROPERTY` or `ARROW_PROPERTY`.

    Returns:
        An :class:`Operation` of type `TO_UNDIRECTED` with output type
        `GRAPH`.
    """
    supported_graph_types = (
        graph_def_pb2.DYNAMIC_PROPERTY,
        graph_def_pb2.ARROW_PROPERTY,
    )
    check_argument(graph.graph_type in supported_graph_types)

    undirected_config = {}
    # The key maybe filled later in coordinator
    if hasattr(graph, "key"):
        undirected_config[types_pb2.GRAPH_NAME] = utils.s_to_attr(graph.key)

    return Operation(
        graph.session_id,
        types_pb2.TO_UNDIRECTED,
        config=undirected_config,
        inputs=[graph.op],
        output_types=types_pb2.GRAPH,
    )
def create_graph_view(graph, view_type):
    """Build the `VIEW_GRAPH` operation for an nx graph.

    Args:
        graph (:class:`nx.Graph`): A nx graph; must be of type
            `DYNAMIC_PROPERTY`.
        view_type (str): One of

            * 'reversed': get a reverse view of graph.
            * 'directed': get a directed view of graph.
            * 'undirected': get an undirected view of graph.

    Returns:
        An :class:`Operation` of type `VIEW_GRAPH` with output type `GRAPH`.
    """
    check_argument(graph.graph_type == graph_def_pb2.DYNAMIC_PROPERTY)
    check_argument(view_type in ("reversed", "directed", "undirected"))

    view_config = {
        types_pb2.GRAPH_NAME: utils.s_to_attr(graph.key),
        types_pb2.VIEW_TYPE: utils.s_to_attr(view_type),
    }
    return Operation(
        graph.session_id,
        types_pb2.VIEW_GRAPH,
        config=view_config,
        output_types=types_pb2.GRAPH,
    )
def clear_graph(graph):
    """Build the `CLEAR_GRAPH` operation for an nx graph.

    Args:
        graph (:class:`nx.Graph`): A nx graph; must be of type
            `DYNAMIC_PROPERTY`.

    Returns:
        An :class:`Operation` of type `CLEAR_GRAPH` with output type `GRAPH`.
    """
    check_argument(graph.graph_type == graph_def_pb2.DYNAMIC_PROPERTY)

    clear_config = {types_pb2.GRAPH_NAME: utils.s_to_attr(graph.key)}
    return Operation(
        graph.session_id,
        types_pb2.CLEAR_GRAPH,
        config=clear_config,
        output_types=types_pb2.GRAPH,
    )
def clear_edges(graph):
    """Build the `CLEAR_EDGES` operation for an nx graph.

    Args:
        graph (:class:`nx.Graph`): A nx graph; must be of type
            `DYNAMIC_PROPERTY`.

    Returns:
        An :class:`Operation` of type `CLEAR_EDGES` with output type `GRAPH`.
    """
    check_argument(graph.graph_type == graph_def_pb2.DYNAMIC_PROPERTY)

    clear_config = {types_pb2.GRAPH_NAME: utils.s_to_attr(graph.key)}
    return Operation(
        graph.session_id,
        types_pb2.CLEAR_EDGES,
        config=clear_config,
        output_types=types_pb2.GRAPH,
    )
def create_subgraph(graph, nodes=None, edges=None):
    """Build the `INDUCE_SUBGRAPH` operation for an nx graph.

    Args:
        graph (:class:`nx.Graph`): A nx graph; must be of type
            `DYNAMIC_PROPERTY`.
        nodes: The nodes to induce a subgraph; sent only when not ``None``.
        edges: The edges to induce an edge-induced subgraph; sent only when
            not ``None``.

    Returns:
        An :class:`Operation` of type `INDUCE_SUBGRAPH` with output type
        `GRAPH`.
    """
    check_argument(graph.graph_type == graph_def_pb2.DYNAMIC_PROPERTY)

    subgraph_config = {types_pb2.GRAPH_NAME: utils.s_to_attr(graph.key)}
    if nodes is not None:
        subgraph_config[types_pb2.NODES] = utils.bytes_to_attr(nodes)
    if edges is not None:
        subgraph_config[types_pb2.EDGES] = utils.bytes_to_attr(edges)

    return Operation(
        graph.session_id,
        types_pb2.INDUCE_SUBGRAPH,
        config=subgraph_config,
        output_types=types_pb2.GRAPH,
    )
def create_unload_op(session_id, op_type, inputs):
    """Utility to create an unload :class:`Operation`.

    Args:
        session_id: Session id passed to the operation.
        op_type: Operation type passed to the operation.
        inputs: Inputs of the operation.

    Returns:
        An :class:`Operation` of type ``op_type`` with output type
        `NULL_OUTPUT` and no config.
    """
    return Operation(
        session_id,
        op_type,
        inputs=inputs,
        output_types=types_pb2.NULL_OUTPUT,
    )
def unload_app(app):
    """Build the op that unloads a loaded app.

    Args:
        app (:class:`AppDAGNode`): The app to unload.

    Returns:
        An `UNLOAD_APP` op, created through ``create_unload_op``, taking
        ``app.op`` as its only input.
    """
    unload_inputs = [app.op]
    return create_unload_op(app.session_id, types_pb2.UNLOAD_APP, unload_inputs)
def unload_graph(graph):
    """Build the op that unloads a graph.

    Args:
        graph (:class:`GraphDAGNode`): The graph to unload.

    Returns:
        An `UNLOAD_GRAPH` op, created through ``create_unload_op``, taking
        ``graph.op`` as its only input.
    """
    unload_inputs = [graph.op]
    return create_unload_op(graph.session_id, types_pb2.UNLOAD_GRAPH, unload_inputs)
def context_to_numpy(context, selector=None, vertex_range=None, axis=0):
    """Build the op that retrieves results as a numpy ndarray.

    Args:
        context (:class:`Context`): Results returned by the `run_app`
            operation, storing the query results.
        selector (str): Select the type of data to retrieve.
        vertex_range (str): Specify a range to retrieve.
        axis (int): Stored under `AXIS` unless ``None``. Defaults to 0.

    Returns:
        An :class:`Operation` of type `CONTEXT_TO_NUMPY` with output type
        `TENSOR`.
    """
    numpy_config = {}
    # Each option is only sent when the caller supplied a value.
    if selector is not None:
        numpy_config[types_pb2.SELECTOR] = utils.s_to_attr(selector)
    if vertex_range is not None:
        numpy_config[types_pb2.VERTEX_RANGE] = utils.s_to_attr(vertex_range)
    if axis is not None:
        numpy_config[types_pb2.AXIS] = utils.i_to_attr(axis)

    return Operation(
        context.session_id,
        types_pb2.CONTEXT_TO_NUMPY,
        config=numpy_config,
        inputs=[context.op],
        output_types=types_pb2.TENSOR,
    )
def context_to_dataframe(context, selector=None, vertex_range=None):
    """Build the op that retrieves results as a pandas DataFrame.

    Args:
        context (:class:`Context`): Results returned by the `run_app`
            operation, storing the query results.
        selector (str): Select the type of data to retrieve.
        vertex_range (str): Specify a range to retrieve.

    Returns:
        An :class:`Operation` of type `CONTEXT_TO_DATAFRAME` with output type
        `DATAFRAME`.
    """
    dataframe_config = {}
    # Each option is only sent when the caller supplied a value.
    if selector is not None:
        dataframe_config[types_pb2.SELECTOR] = utils.s_to_attr(selector)
    if vertex_range is not None:
        dataframe_config[types_pb2.VERTEX_RANGE] = utils.s_to_attr(vertex_range)

    return Operation(
        context.session_id,
        types_pb2.CONTEXT_TO_DATAFRAME,
        config=dataframe_config,
        inputs=[context.op],
        output_types=types_pb2.DATAFRAME,
    )
def to_vineyard_tensor(context, selector=None, vertex_range=None, axis=None):
    """Build the op that retrieves results as a vineyard tensor.

    Args:
        context (:class:`Context`): Results returned by the `run_app`
            operation, storing the query results.
        selector (str): Select the type of data to retrieve.
        vertex_range (str): Specify a range to retrieve.
        axis (int): Stored under `AXIS` unless ``None`` (the default).

    Returns:
        An :class:`Operation` of type `TO_VINEYARD_TENSOR` with output type
        `VINEYARD_TENSOR`.
    """
    tensor_config = {}
    # Each option is only sent when the caller supplied a value.
    if selector is not None:
        tensor_config[types_pb2.SELECTOR] = utils.s_to_attr(selector)
    if vertex_range is not None:
        tensor_config[types_pb2.VERTEX_RANGE] = utils.s_to_attr(vertex_range)
    if axis is not None:
        tensor_config[types_pb2.AXIS] = utils.i_to_attr(axis)

    return Operation(
        context.session_id,
        types_pb2.TO_VINEYARD_TENSOR,
        config=tensor_config,
        inputs=[context.op],
        output_types=types_pb2.VINEYARD_TENSOR,
    )
def to_vineyard_dataframe(context, selector=None, vertex_range=None):
    """Build the op that retrieves results as a vineyard dataframe.

    Args:
        context (:class:`Context`): Results returned by the `run_app`
            operation, storing the query results.
        selector (str): Select the type of data to retrieve.
        vertex_range (str): Specify a range to retrieve.

    Returns:
        An :class:`Operation` of type `TO_VINEYARD_DATAFRAME` with output
        type `VINEYARD_DATAFRAME`.
    """
    dataframe_config = {}
    # Each option is only sent when the caller supplied a value.
    if selector is not None:
        dataframe_config[types_pb2.SELECTOR] = utils.s_to_attr(selector)
    if vertex_range is not None:
        dataframe_config[types_pb2.VERTEX_RANGE] = utils.s_to_attr(vertex_range)

    return Operation(
        context.session_id,
        types_pb2.TO_VINEYARD_DATAFRAME,
        config=dataframe_config,
        inputs=[context.op],
        output_types=types_pb2.VINEYARD_DATAFRAME,
    )
def to_data_sink(result, fd, storage_options=None, write_options=None, **kwargs):
    """Build the `DATA_SINK` op that dumps a result to `fd`.

    Args:
        result (:class:`graphscope.framework.context.ResultDAGNode`):
            Dataframe or numpy or result holding the object id of a vineyard
            dataframe. Its ``op`` is the single input of the new operation.
        fd: Destination, such as `hdfs:///tmp/result_path`; converted with
            ``str`` before being stored.
        storage_options (dict, optional): Storage options; sent as a JSON
            string. The passed mapping is not modified.
        write_options (dict, optional): Write options; sent as a JSON string.
            The passed mapping is not modified.
        **kwargs: Extra options merged into *both* ``storage_options`` and
            ``write_options`` (overriding entries with the same key).

    Returns:
        An :class:`Operation` of type `DATA_SINK` with output type
        `NULL_OUTPUT`.
    """
    # Merge into fresh dicts: updating the arguments in place would leak the
    # extra kwargs into dictionaries owned (and possibly reused) by the caller.
    merged_storage_options = {} if storage_options is None else dict(storage_options)
    merged_storage_options.update(kwargs)
    merged_write_options = {} if write_options is None else dict(write_options)
    merged_write_options.update(kwargs)

    config = {
        types_pb2.STORAGE_OPTIONS: utils.s_to_attr(json.dumps(merged_storage_options)),
        types_pb2.WRITE_OPTIONS: utils.s_to_attr(json.dumps(merged_write_options)),
        types_pb2.FD: utils.s_to_attr(str(fd)),
    }
    op = Operation(
        result.session_id,
        types_pb2.DATA_SINK,
        config=config,
        inputs=[result.op],
        output_types=types_pb2.NULL_OUTPUT,
    )
    return op
def output(
    context,
    fd,
    selector,
    vertex_range,
    storage_options=None,
    write_options=None,
    **kwargs,
):
    """Build the `OUTPUT` op that writes results to `fd`.

    The output will be handled by the registered vineyard C++ adaptor.

    Args:
        context (:class:`Context`): Results returned by the `run_app`
            operation, storing the query results. Its ``op`` is the single
            input of the new operation.
        fd (str): Destination, such as `file:///tmp/result_path`.
        selector (str): Select the type of data to retrieve.
        vertex_range (str): Specify a range to retrieve; sent only when not
            ``None``.
        storage_options (dict, optional): Storage options; sent as a JSON
            string. The passed mapping is not modified.
        write_options (dict, optional): Write options; sent as a JSON string.
            The passed mapping is not modified.
        **kwargs: Extra options merged into *both* ``storage_options`` and
            ``write_options`` (overriding entries with the same key).

    Returns:
        An :class:`Operation` of type `OUTPUT` with output type
        `NULL_OUTPUT`.
    """
    # Merge into fresh dicts: updating the arguments in place would leak the
    # extra kwargs into dictionaries owned (and possibly reused) by the caller.
    merged_storage_options = {} if storage_options is None else dict(storage_options)
    merged_storage_options.update(kwargs)
    merged_write_options = {} if write_options is None else dict(write_options)
    merged_write_options.update(kwargs)

    config = {}
    config[types_pb2.FD] = utils.s_to_attr(fd)
    config[types_pb2.SELECTOR] = utils.s_to_attr(selector)
    config[types_pb2.STORAGE_OPTIONS] = utils.s_to_attr(
        json.dumps(merged_storage_options)
    )
    config[types_pb2.WRITE_OPTIONS] = utils.s_to_attr(json.dumps(merged_write_options))
    if vertex_range is not None:
        config[types_pb2.VERTEX_RANGE] = utils.s_to_attr(vertex_range)

    op = Operation(
        context.session_id,
        types_pb2.OUTPUT,
        config=config,
        inputs=[context.op],
        output_types=types_pb2.NULL_OUTPUT,
    )
    return op
def add_column(graph, results, selector):
    """Build the `ADD_COLUMN` op that adds a column to `graph`.

    Args:
        graph (:class:`Graph`): Source ArrowProperty graph; its ``op`` is the
            first input.
        results (:class:`Context`): Results generated by a previous app
            query; its ``op`` is the second input.
        selector (str): Used to select a subrange of data of results, to be
            added as one column of the graph.

    Returns:
        An :class:`Operation` of type `ADD_COLUMN` with output type `GRAPH`.
    """
    column_config = {types_pb2.SELECTOR: utils.s_to_attr(selector)}
    return Operation(
        graph.session_id,
        types_pb2.ADD_COLUMN,
        config=column_config,
        inputs=[graph.op, results.op],
        output_types=types_pb2.GRAPH,
    )
def graph_to_numpy(graph, selector=None, vertex_range=None):
    """Build the op that retrieves graph raw data as a numpy ndarray.

    Args:
        graph (:class:`graphscope.framework.graph.GraphDAGNode`): Source
            graph.
        selector (str): Select the type of data to retrieve.
        vertex_range (str): Specify a range to retrieve.

    Returns:
        An :class:`Operation` of type `GRAPH_TO_NUMPY` with output type
        `TENSOR`.
    """
    numpy_config = {}
    # Each option is only sent when the caller supplied a value.
    if selector is not None:
        numpy_config[types_pb2.SELECTOR] = utils.s_to_attr(selector)
    if vertex_range is not None:
        numpy_config[types_pb2.VERTEX_RANGE] = utils.s_to_attr(vertex_range)

    return Operation(
        graph.session_id,
        types_pb2.GRAPH_TO_NUMPY,
        config=numpy_config,
        inputs=[graph.op],
        output_types=types_pb2.TENSOR,
    )
def graph_to_dataframe(graph, selector=None, vertex_range=None):
    """Build the op that retrieves graph raw data as a pandas DataFrame.

    Args:
        graph (:class:`graphscope.framework.graph.GraphDAGNode`): Source
            graph.
        selector (str): Select the type of data to retrieve.
        vertex_range (str): Specify a range to retrieve.

    Returns:
        An :class:`Operation` of type `GRAPH_TO_DATAFRAME` with output type
        `DATAFRAME`.
    """
    dataframe_config = {}
    # Each option is only sent when the caller supplied a value.
    if selector is not None:
        dataframe_config[types_pb2.SELECTOR] = utils.s_to_attr(selector)
    if vertex_range is not None:
        dataframe_config[types_pb2.VERTEX_RANGE] = utils.s_to_attr(vertex_range)

    return Operation(
        graph.session_id,
        types_pb2.GRAPH_TO_DATAFRAME,
        config=dataframe_config,
        inputs=[graph.op],
        output_types=types_pb2.DATAFRAME,
    )
def gremlin_to_subgraph(
    interactive_query, gremlin_script, request_options=None, oid_type="int64"
):
    """Build the `SUBGRAPH` op that creates a subgraph from gremlin output.

    Args:
        interactive_query (:class:`graphscope.interactive.query.InteractiveQueryDAGNode`):
            The GIE instance holding the graph that the gremlin query runs
            on. Its ``object_id`` is sent as `VINEYARD_ID`.
        gremlin_script (str): gremlin script to be executed.
        request_options (dict, optional): gremlin request options, sent as a
            JSON string only when truthy. format:
            {
                "engine": "gae"
            }
        oid_type (str, optional): Type of vertex original id.
            Defaults to "int64".

    Returns:
        An :class:`Operation` of type `SUBGRAPH` with output type `GRAPH`.
    """
    subgraph_config = {
        types_pb2.GIE_GREMLIN_QUERY_MESSAGE: utils.s_to_attr(gremlin_script),
        types_pb2.OID_TYPE: utils.s_to_attr(oid_type),
        types_pb2.VINEYARD_ID: utils.i_to_attr(interactive_query.object_id),
    }
    if request_options:
        subgraph_config[types_pb2.GIE_GREMLIN_REQUEST_OPTIONS] = utils.s_to_attr(
            json.dumps(request_options)
        )

    return Operation(
        interactive_query.session_id,
        types_pb2.SUBGRAPH,
        config=subgraph_config,
        output_types=types_pb2.GRAPH,
    )
def save_to_graphar(graph, path: str, **kwargs):
    """Build the `ARCHIVE_GRAPH` op that archives a graph to gar format.

    Args:
        graph (:class:`graphscope.framework.graph.GraphDAGNode`): Source
            graph.
        path (str): The path to archive the graph, sent as `GRAPH_INFO_PATH`.
        **kwargs: Sent as a JSON string under `WRITE_OPTIONS`.

    Returns:
        An :class:`Operation` of type `ARCHIVE_GRAPH` with output type
        `NULL_OUTPUT`.
    """
    archive_config = {
        types_pb2.GRAPH_TYPE: utils.graph_type_to_attr(graph._graph_type),
        types_pb2.OID_TYPE: utils.s_to_attr(graph._oid_type),
        types_pb2.VID_TYPE: utils.s_to_attr(graph._vid_type),
        types_pb2.VERTEX_MAP_TYPE: utils.i_to_attr(graph._vertex_map),
        types_pb2.COMPACT_EDGES: utils.b_to_attr(graph._compact_edges),
        types_pb2.USE_PERFECT_HASH: utils.b_to_attr(graph._use_perfect_hash),
        types_pb2.WRITE_OPTIONS: utils.s_to_attr(json.dumps(kwargs)),
        types_pb2.GRAPH_INFO_PATH: utils.s_to_attr(path),
    }
    return Operation(
        graph.session_id,
        types_pb2.ARCHIVE_GRAPH,
        config=archive_config,
        inputs=[graph.op],
        output_types=types_pb2.NULL_OUTPUT,
    )


def serialize_graph(graph, path: str, **kwargs):
    """Build the `SERIALIZE_GRAPH` op that serializes a graph to a location.

    The meta and data of the graph are dumped to the specified location and
    can be restored by `Graph.load_from` in other sessions.

    Each worker will write a `path_{worker_id}.meta` file and
    a `path_{worker_id}` file to storage.

    Args:
        graph (:class:`graphscope.framework.graph.GraphDAGNode`): Source
            graph. Its ``_vineyard_id`` is sent as `VINEYARD_ID`.
        path (str): The path to serialize the graph, on each worker;
            supported storages are local, hdfs, oss, s3.
        **kwargs: Sent as a JSON string under `STORAGE_OPTIONS`.

    Returns:
        An :class:`Operation` of type `SERIALIZE_GRAPH` with output type
        `NULL_OUTPUT`.
    """
    serialize_config = {
        types_pb2.GRAPH_SERIALIZATION_PATH: utils.s_to_attr(path),
        types_pb2.VINEYARD_ID: utils.i_to_attr(graph._vineyard_id),
        types_pb2.STORAGE_OPTIONS: utils.s_to_attr(json.dumps(kwargs)),
    }
    return Operation(
        graph.session_id,
        types_pb2.SERIALIZE_GRAPH,
        config=serialize_config,
        inputs=[graph.op],
        output_types=types_pb2.NULL_OUTPUT,
    )


def deserialize_graph(path: str, sess, **kwargs):
    """Build the `DESERIALIZE_GRAPH` op that restores a graph from a location.

    Args:
        path (str): The path containing the serialization files.
        sess (`graphscope.Session`): The target session that the graph will
            be constructed in; only its ``session_id`` is read here.
        **kwargs: Sent as a JSON string under `STORAGE_OPTIONS`.

    Returns:
        An :class:`Operation` of type `DESERIALIZE_GRAPH` with output type
        `GRAPH`. Schema and data of the resulting graph are supposed to be
        identical to the one the serialize method was called on.
    """
    deserialize_config = {
        types_pb2.GRAPH_SERIALIZATION_PATH: utils.s_to_attr(path),
        types_pb2.STORAGE_OPTIONS: utils.s_to_attr(json.dumps(kwargs)),
    }
    return Operation(
        sess.session_id,
        types_pb2.DESERIALIZE_GRAPH,
        config=deserialize_config,
        output_types=types_pb2.GRAPH,
    )