Source code for graphscope.analytical.app.java_app
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import hashlib
import json
import logging
import os
import shutil
import subprocess
import zipfile
from pathlib import Path

import yaml

from graphscope.analytical.udf.utils import InMemoryZip
from graphscope.framework.app import AppAssets
from graphscope.framework.app import AppDAGNode
from graphscope.framework.app import check_argument
from graphscope.framework.context import create_context_node
from graphscope.framework.dag import DAGNode
from graphscope.framework.dag_utils import bind_app
from graphscope.framework.errors import InvalidArgumentError
from graphscope.framework.graph import Graph
from graphscope.framework.utils import get_tempdir
from graphscope.proto import graph_def_pb2

__all__ = ["JavaApp"]

logger = logging.getLogger("graphscope")

# runtime workspace
try:
    WORKSPACE = os.environ["GRAPHSCOPE_RUNTIME"]
except KeyError:
    WORKSPACE = os.path.join(get_tempdir(), "gs")

DEFAULT_GS_CONFIG_FILE = ".gs_conf.yaml"

POSSIBLE_APP_TYPES = [
    "default_property",
    "parallel_property",
    "default_simple",
    "parallel_simple",
]

# Java boxed type name -> the C++ type names it is considered compatible with.
_JAVA_TO_CPP_TYPES = {
    "java.lang.Long": frozenset({"uint64_t", "int64_t"}),
    "java.lang.Double": frozenset({"double"}),
    "java.lang.Integer": frozenset({"int32_t", "uint32_t"}),
}


def _find_java_executable():
    """Return the java executable: ``java`` if it is on PATH, else ``$JAVA_HOME/bin/java``.

    Raises:
        RuntimeError: If ``java`` is not on PATH and no executable file exists
            at the fallback location.
    """
    if shutil.which("java") is not None:
        return "java"
    java_executable = "java"
    java_home = os.environ.get("JAVA_HOME")
    if java_home is not None:
        java_executable = os.path.join(java_home, "bin", "java")
    if not os.path.isfile(java_executable) or not os.access(java_executable, os.X_OK):
        raise RuntimeError("Java executable not found, you shall install a java runtime.")
    return java_executable


def _parse_user_app(java_app_class: str, java_jar_full_path: str):
    """Inspect a user java app class by running ``AppBaseParser`` from its jar.

    Runs ``java -cp <jar> com.alibaba.graphscope.utils.AppBaseParser <class>``
    and scans its stdout and stderr line by line for markers.

    Args:
        java_app_class (str): Fully-qualified name of the user's app class.
        java_jar_full_path (str): Path of the jar that contains the class.

    Returns:
        tuple: ``(java_app_type, frag_param_str, java_inner_context_type)``.
        Each element is ``""`` when the parser output contains no matching line.

    Raises:
        RuntimeError: If no java executable can be found.
        Exception: If a parser output line contains ``"Error"``.
    """
    java_app_type = ""
    frag_param_str = ""
    java_inner_context_type = ""

    parse_user_app_cmd = [
        _find_java_executable(),
        "-cp",
        str(java_jar_full_path),
        "com.alibaba.graphscope.utils.AppBaseParser",
        java_app_class,
    ]
    logger.info(" ".join(parse_user_app_cmd))
    # run() waits for the process and collects both streams.
    completed = subprocess.run(
        parse_user_app_cmd,
        env=os.environ.copy(),
        encoding="utf-8",
        errors="replace",
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )
    if completed.returncode != 0:
        # Previously ignored; surface the java-side failure to help diagnosis.
        logger.warning(
            "AppBaseParser exited with code %d, stderr: %s",
            completed.returncode,
            completed.stderr,
        )

    for line in completed.stdout.split("\n") + completed.stderr.split("\n"):
        if not line:
            continue
        # The app-type markers are checked before "Error"/"TypeParams"/"ContextType".
        # "DefaultPropertyApp" must precede "DefaultAppBase" so property apps win.
        if "DefaultPropertyApp" in line:
            java_app_type = "default_property"
        elif "ParallelPropertyApp" in line:
            java_app_type = "parallel_property"
        elif "DefaultAppBase" in line:
            java_app_type = "default_simple"
        elif "ParallelAppBase" in line:
            java_app_type = "parallel_simple"
        elif "Error" in line:
            raise Exception("Error occured in verifying user app")
        elif "TypeParams" in line:
            frag_param_str = line.split(":")[-1].strip()
        elif "ContextType" in line:
            java_inner_context_type = line.split(":")[-1].strip()

    logger.info(
        "Java app type: %s, frag type str: %s, ctx type: %s",
        java_app_type,
        frag_param_str,
        java_inner_context_type,
    )
    return java_app_type, frag_param_str, java_inner_context_type


def _normalize_type_name(type_name):
    """Strip surrounding whitespace from a type name; pass non-strings through."""
    return type_name.strip() if isinstance(type_name, str) else type_name


def _type_param_consistent(graph_actucal_type_param, java_app_type_param):
    """Return True if a C++ graph type param matches a java app type param.

    Args:
        graph_actucal_type_param: C++ type name from the graph template
            (e.g. ``"int64_t"``).
        java_app_type_param: Java boxed type name reported by the app
            (e.g. ``"java.lang.Long"``).

    Returns:
        bool: True only for the pairs listed in ``_JAVA_TO_CPP_TYPES``. Surrounding
        whitespace is ignored, so ``" int64_t"`` still matches.
    """
    compatible = _JAVA_TO_CPP_TYPES.get(_normalize_type_name(java_app_type_param))
    if compatible is None:
        return False
    return _normalize_type_name(graph_actucal_type_param) in compatible
class JavaApp(AppAssets):
    """A java app asset node in a DAG that holds the jar file.

    It holds the resources needed to run a java app: the java class path, the
    gar file (which consists of the jar and a configuration yaml), and the
    specified java class. On creating a JavaApp, graphscope loads the specified
    java class and parses the base class of your app and the base class of your
    context class. This requires a java runtime environment installed on the
    client machine where your graphscope session is created.

    To run your app, provide `JavaApp` with a property or projected graph and
    your querying args.
    """

    # Driver header and C++ driver class name for each supported java app type.
    _DRIVERS = {
        "default_property": (
            "apps/java_pie/java_pie_property_default_app.h",
            "gs::JavaPIEPropertyDefaultApp",
        ),
        "parallel_property": (
            "apps/java_pie/java_pie_property_parallel_app.h",
            "gs::JavaPIEPropertyParallelAppOE",
        ),
        "default_simple": (
            "apps/java_pie/java_pie_projected_default_app.h",
            "gs::JavaPIEProjectedDefaultApp",
        ),
        "parallel_simple": (
            "apps/java_pie/java_pie_projected_parallel_app.h",
            "gs::JavaPIEProjectedParallelAppOE",
        ),
    }

    def __init__(self, full_jar_path: str, java_app_class: str):
        """Init JavaApp with the full path of your `jar` file and the fully-qualified name of your app class.

        Args:
            full_jar_path (str): The path where the jar file exists.
            java_app_class (str): the fully-qualified name of your app class.

        Raises:
            FileNotFoundError: If ``full_jar_path`` does not exist.
            KeyError: If ``full_jar_path`` is not a non-empty ``.jar`` zip archive.
            RuntimeError: If no java runtime is found, or if the detected app type
                is not in ``POSSIBLE_APP_TYPES``.
            Exception: If the java-side parser reports an error.
        """
        self._java_app_class = java_app_class
        self._full_jar_path = full_jar_path
        self._jar_name = Path(self._full_jar_path).name
        # Validate and pack the jar before spawning the java parser process.
        gar = self._pack_jar(self._full_jar_path)

        # Extract the java app type with the help of the java class.
        self._java_app_type, self._frag_param_str, java_ctx_type = _parse_user_app(
            java_app_class, full_jar_path
        )
        if self._java_app_type not in POSSIBLE_APP_TYPES:
            raise RuntimeError("Unexpected app type: {}".format(self._java_app_type))
        driver = self._DRIVERS.get(self._java_app_type)
        if driver is None:
            raise Exception("Unrecognizable java app type: {}".format(self._java_app_type))
        driver_header, class_name = driver

        # Property apps run on ArrowFragment; simple apps on the projected fragment.
        if "property" in self._java_app_type:
            compatible_graph = ["vineyard::ArrowFragment"]
        else:
            compatible_graph = ["gs::ArrowProjectedFragment"]

        gs_config = {
            "app": [
                {
                    "algo": "java_app",
                    "type": "java_pie",
                    "java_jar_path": self._full_jar_path,
                    "java_app_class": self._java_app_class,
                    "compatible_graph": compatible_graph,
                    "context_type": java_ctx_type,
                    "driver_header": driver_header,
                    "class_name": class_name,
                }
            ]
        }
        gar.append(DEFAULT_GS_CONFIG_FILE, yaml.dump(gs_config))
        super().__init__("java_app", java_ctx_type, gar.read_bytes())

    @property
    def java_app_class(self):
        """str: The fully-qualified name of the java app class."""
        return self._java_app_class

    @property
    def jar_name(self):
        """str: File name of the jar, as stored in the gar archive."""
        return self._jar_name

    @property
    def java_app_type(self):
        """str: One of ``POSSIBLE_APP_TYPES``, detected from the java class."""
        return self._java_app_type

    @property
    def frag_param_str(self):
        """str: Comma-separated java type params reported by the app parser."""
        return self._frag_param_str

    @staticmethod
    def _graph_type_params(template_str):
        """Split the type params that follow the first '<' of a graph template string.

        Each param is stripped of whitespace and trailing '>' characters.

        Raises:
            Exception: If ``template_str`` has no '<'.
        """
        _, sep, params = template_str.partition("<")
        if not sep:
            raise Exception("Error format graph template str: {}".format(template_str))
        return [param.strip().rstrip(">").strip() for param in params.split(",")]

    # Override is_compatible to make sure the type params of the graph are consistent with the java app.
    def is_compatible(self, graph):
        """Check whether ``graph`` can be queried by this java app.

        The graph template is expected to be either
        ``vineyard::ArrowFragment<OID_T,VID_T,VERTEX_MAP_T,COMPACT>``, which needs a
        property app with one type param (OID_T), or
        ``gs::ArrowProjectedFragment<OID_T,VID_T,VDATA_T,EDATA_T,VERTEX_MAP_T,COMPACT>``,
        which needs a simple app with four type params.

        Returns:
            bool: True if the app kind and the type params match the graph.

        Raises:
            Exception: If the graph template string is malformed or of an
                unrecognized kind.
        """
        java_app_type_params = [p.strip() for p in self.frag_param_str.split(",")]
        template_str = graph.template_str

        if "vineyard::ArrowFragment" in template_str:
            if "property" not in self.java_app_type:
                logger.error("Expected property app")
                return False
            if len(java_app_type_params) != 1:
                logger.error("Expected one type params.")
                return False
            # OID_T is the first template param.
            oid_t = self._graph_type_params(template_str)[0]
            return _type_param_consistent(oid_t, java_app_type_params[0])

        if "gs::ArrowProjectedFragment" in template_str:
            if "simple" not in self.java_app_type:
                logger.error("Expected simple app")
                return False
            if len(java_app_type_params) != 4:
                logger.error("Expected 4 type params")
                return False
            graph_type_params = self._graph_type_params(template_str)
            if len(graph_type_params) < 4:
                raise Exception("Error format graph template str: {}".format(template_str))
            cpp_graph_type = graph_type_params[:4]
            logger.info("found type param {}".format(cpp_graph_type))
            for i, (cpp_param, java_param) in enumerate(zip(cpp_graph_type, java_app_type_params)):
                logger.info("checking type param {},{},{}".format(i, cpp_param, java_param))
                if not _type_param_consistent(cpp_param, java_param):
                    return False
            return True

        raise Exception("Unrecoginizable graph template str: {}".format(template_str))

    def _pack_jar(self, full_jar_path: str):
        """Validate the jar at ``full_jar_path`` and pack it into an in-memory gar.

        Returns:
            InMemoryZip: An archive holding the jar under its file name.

        Raises:
            FileNotFoundError: If the path does not exist.
            KeyError: If the path does not end with ``.jar``, is not a zip
                archive, or is empty.
        """
        if not os.path.exists(full_jar_path):
            raise FileNotFoundError("Jar file not found in {}.".format(full_jar_path))
        if not full_jar_path.endswith(".jar") or not zipfile.is_zipfile(full_jar_path):
            raise KeyError(
                "{} is not a jar file, please feed your packed jar file to JavaApp.".format(
                    full_jar_path
                )
            )
        # Close the handle deterministically (the original leaked it).
        with open(full_jar_path, "rb") as jar_file:
            jar_bytes = jar_file.read()
        if not jar_bytes:
            raise KeyError("Expect a non-empty Jar.")
        garfile = InMemoryZip()
        # Use the same naming as `jar_name` so the entry name always matches it.
        garfile.append(Path(full_jar_path).name, jar_bytes)
        return garfile

    def __call__(self, graph: Graph, *args, **kwargs):
        """Run this java app on ``graph``.

        A simple (projected) app given an ARROW_PROPERTY graph first projects
        the graph with ``graph._project_to_simple()``.

        Raises:
            InvalidArgumentError: If ``graph`` has no ``graph_type`` attribute.
        """
        kwargs_extend = dict(app_class=self.java_app_class, **kwargs)
        if not hasattr(graph, "graph_type"):
            raise InvalidArgumentError("Missing graph_type attribute in graph object.")
        if "simple" in self.java_app_type and graph.graph_type == graph_def_pb2.ARROW_PROPERTY:
            graph = graph._project_to_simple()
        app_ = graph.session._wrapper(JavaAppDagNode(graph, self))
        return app_(*args, **kwargs_extend)
class JavaAppDagNode(AppDAGNode):
    """An app DAG node restricted to :class:`JavaApp` assets.

    Binds a :class:`JavaApp` to a graph and adds both the app asset op and the
    bind-app op to the session DAG.
    """

    def __init__(self, graph: Graph, app_assets: JavaApp):
        # NOTE(review): AppDAGNode.__init__ is not called; this constructor sets
        # _graph, _app_assets, _session and _op itself.
        self._graph = graph
        self._app_assets = app_assets
        self._session = graph.session
        if not self._app_assets.is_compatible(self._graph):
            raise Exception(
                "No compactiable app and graph: {} and {}".format(
                    self._app_assets.java_app_type, self._graph.template_str
                )
            )

        self._op = bind_app(graph, self._app_assets)
        # add op to dag
        self._session.dag.add_op(self._app_assets.op)
        self._session.dag.add_op(self._op)

    def _convert_arrow_frag_for_java(self, cpp_frag_str: str):
        """Convert vineyard::ArrowFragment<OID,VID,...> to gs::ArrowFragmentDefault<OID>.

        Only the first template param (OID) is kept. A template with a single
        param no longer yields a doubled closing '>'.
        """
        head, sep, params = cpp_frag_str.partition("<")
        if not sep:
            # No template params: keep the original fallback transformation.
            return (cpp_frag_str.split(",")[0] + ">").replace("vineyard", "gs")
        oid_t = params.split(",")[0].strip().rstrip(">").strip()
        return "{}Default<{}>".format(head.replace("vineyard", "gs"), oid_t)

    def __call__(self, *args, **kwargs):
        """When called, check the arguments, then build the query and create its context node.

        Only keyword arguments are accepted. They are forwarded, together with
        ``frag_name`` and ``jar_name``, as a JSON string.

        Raises:
            RuntimeError: If the graph is neither a DAGNode nor loaded.
            Errors raised by ``check_argument`` (presumably InvalidArgumentError —
            confirm) if the app type is not ``java_pie`` or positional arguments
            are given.

        Returns:
            :class:`Context`: Query context, including the running results of the app.
        """
        check_argument(self._app_assets.type == "java_pie", "expect java_pie app")
        if not isinstance(self._graph, DAGNode) and not self._graph.loaded():
            raise RuntimeError("The graph is not loaded")
        check_argument(not args, "Only support using keyword arguments in java app.")

        if "property" in self._app_assets.java_app_type:
            frag_name_for_java = self._convert_arrow_frag_for_java(self._graph.template_str)
            logger.info(
                "Set frag name to %s, %s", self._graph.template_str, frag_name_for_java
            )
        else:
            frag_name_for_java = self._graph.template_str

        kwargs_extend = dict(
            frag_name=frag_name_for_java,
            jar_name=self._app_assets.jar_name,
            **kwargs,
        )
        # Serialize once and reuse for both the log and the context node.
        query_args = json.dumps(kwargs_extend)
        logger.info("dumping to json %s", query_args)
        return create_context_node(
            self._app_assets.context_type, self, self._graph, query_args
        )