#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import sys

from gremlin_python import statics
from gremlin_python.driver.client import Client
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.process.graph_traversal import __
from gremlin_python.process.strategies import *
from gremlin_python.structure.graph import Graph
from neo4j import GraphDatabase
from neo4j import Session as Neo4jSession

from gs_interactive.client.session import DefaultSession, Session

class Driver:
    """
    The main entry point for the Interactive SDK.

    Given the Interactive endpoints, a driver can create an Interactive
    session to talk to the Interactive service, a Neo4j session to talk to
    the Cypher endpoint, and a Gremlin client to talk to the Gremlin endpoint.
    """

    def __init__(
        self,
        admin_endpoint: str = None,
        stored_proc_endpoint: str = None,
        cypher_endpoint: str = None,
        gremlin_endpoint: str = None,
    ):
        """
        Construct a new driver using the specified endpoints.

        If ``admin_endpoint`` is not provided, all endpoints are read from
        environment variables instead (see ``read_endpoints_from_env``) and
        the other arguments are ignored. You will receive the endpoints after
        starting the Interactive service.

        Args:
            admin_endpoint (str, optional): the endpoint for the admin
                service, in the form ``http://host:port``.
            stored_proc_endpoint (str, optional): the endpoint for the stored
                procedure service.
            cypher_endpoint (str, optional): the endpoint for the cypher
                service.
            gremlin_endpoint (str, optional): the endpoint for the gremlin
                service.

        Raises:
            AssertionError: if the endpoints are read from the environment
                and INTERACTIVE_ADMIN_ENDPOINT is not set.
            ValueError: if the admin endpoint is not ``http://host:port``.
        """
        # Set the lazily created resources first: the validation below may
        # raise, and __del__ -> close() still runs on the half-built object.
        self._session = None
        self._neo4j_driver = None

        if admin_endpoint is None:
            self.read_endpoints_from_env()
        else:
            self._admin_endpoint = admin_endpoint
            self._stored_proc_endpoint = stored_proc_endpoint
            self._cypher_endpoint = cypher_endpoint
            self._gremlin_endpoint = gremlin_endpoint

        self.init_host_and_port()

    def close(self):
        """
        Close the driver and release resources.

        Closes the underlying neo4j driver if one has been created. Calling
        this more than once is harmless.
        """
        # getattr: the attribute may be missing if construction failed early.
        neo4j_driver = getattr(self, "_neo4j_driver", None)
        if neo4j_driver is None:
            return
        # Drop the reference before closing so a repeated close() (for
        # example from __del__) does not touch the closed driver again.
        self._neo4j_driver = None
        neo4j_driver.close()

    def __del__(self):
        """Release resources when the driver is garbage collected."""
        self.close()

    def init_host_and_port(self):
        """
        Split the admin endpoint into ``self._host`` and ``self._port``.

        Raises:
            ValueError: if the admin endpoint does not have the form
                ``http://host:port`` with an integer port.
        """
        scheme = "http://"
        error_message = "Invalid uri, expected format is http://host:port"

        # The scheme is required; it is validated here, not added.
        if not self._admin_endpoint.startswith(scheme):
            raise ValueError(error_message)

        pieces = self._admin_endpoint[len(scheme):].split(":")
        if len(pieces) != 2:
            raise ValueError(error_message)

        host, port_text = pieces
        try:
            port = int(port_text)
        except ValueError as err:
            # Same exception type as before, but with a message that names
            # the expected format instead of the raw int() failure.
            raise ValueError(error_message) from err

        self._host = host
        self._port = port

    def read_endpoints_from_env(self):
        """
        Load the endpoints declared in environment variables.

        INTERACTIVE_ADMIN_ENDPOINT: http://host:port (required)
        INTERACTIVE_STORED_PROC_ENDPOINT: http://host:port
        INTERACTIVE_CYPHER_ENDPOINT: neo4j://host:port or bolt://host:port
        INTERACTIVE_GREMLIN_ENDPOINT: host:port, or a full
            ws://host:port/gremlin url

        A message is printed for every optional variable that is not set; the
        corresponding endpoint is then left as None.

        Raises:
            AssertionError: if INTERACTIVE_ADMIN_ENDPOINT is not set.
        """
        self._admin_endpoint = os.environ.get("INTERACTIVE_ADMIN_ENDPOINT")
        if self._admin_endpoint is None:
            # Raised explicitly instead of through an `assert` statement so
            # the check is not stripped under `python -O`; the exception type
            # is kept for backward compatibility.
            raise AssertionError(
                "INTERACTIVE_ADMIN_ENDPOINT is not set, did you forget to export the environment variable after deploying Interactive? see https://graphscope.io/docs/latest/flex/interactive/installation"
            )

        self._stored_proc_endpoint = self._read_optional_env(
            "INTERACTIVE_STORED_PROC_ENDPOINT"
        )
        self._cypher_endpoint = self._read_optional_env(
            "INTERACTIVE_CYPHER_ENDPOINT"
        )
        self._gremlin_endpoint = self._read_optional_env(
            "INTERACTIVE_GREMLIN_ENDPOINT"
        )

    @staticmethod
    def _read_optional_env(name):
        """Return the value of env variable ``name``, printing a notice if unset."""
        value = os.environ.get(name)
        if value is None:
            print(
                name
                + " is not set, will try to get it from service status endpoint"
            )
        return value

    def session(self) -> "Session":
        """
        Create a new session with the configured endpoints.

        The stored procedure endpoint is passed along only when it is known.
        """
        if self._stored_proc_endpoint is not None:
            return DefaultSession(self._admin_endpoint, self._stored_proc_endpoint)
        return DefaultSession(self._admin_endpoint)

    def getDefaultSession(self) -> "Session":
        """
        Get the default session, creating it on first use.
        """
        if self._session is None:
            self._session = self.session()
        return self._session

    def getNeo4jSession(self, **config) -> "Neo4jSession":
        """
        Create a neo4j session with the specified endpoints.

        Args:
            config: a dictionary of configuration options. The same as the
                ones in neo4j.Driver.session
        """
        # NOTE(review): getNeo4jSessionImpl is not defined in this part of
        # the file; it is expected to be provided elsewhere on this class.
        return self.getNeo4jSessionImpl(**config)

    def getNeo4jEndpoint(self) -> str:
        """
        Get the bolt endpoint from the service status endpoint.

        The host is taken from the admin endpoint, so this only works if the
        sdk is running in the same pod as the service.

        Raises:
            ValueError: if the service status could not be retrieved.
        """
        bolt_port = self._get_service_status_value().bolt_port
        return "bolt://" + self._host + ":" + str(bolt_port)

    def getGremlinClientImpl(self):
        """
        Create a gremlin client for the gremlin endpoint.

        If no gremlin endpoint is configured it is looked up through the
        service status endpoint and remembered for later calls.
        """
        if self._gremlin_endpoint is None:
            self._gremlin_endpoint = self.getGremlinEndpoint()
        return Client(self._gremlin_url(self._gremlin_endpoint), "g")

    def getGremlinEndpoint(self):
        """
        Get the gremlin endpoint, as ``host:port``, from the service status
        endpoint. The host is taken from the admin endpoint.

        Raises:
            ValueError: if the service status could not be retrieved.
        """
        gremlin_port = self._get_service_status_value().gremlin_port
        return self._host + ":" + str(gremlin_port)

    def _get_service_status_value(self):
        """Return the service status payload, raising ValueError on failure."""
        service_status = self.getDefaultSession().get_service_status()
        if not service_status.is_ok():
            raise ValueError(
                "Failed to get service status "
                + service_status.get_status_message()
            )
        return service_status.get_value()

    @staticmethod
    def _gremlin_url(endpoint):
        """Build the websocket url of the gremlin service from ``endpoint``.

        ``host:port`` becomes ``ws://host:port/gremlin``. An endpoint that
        already carries a ws:// or wss:// scheme and/or the ``/gremlin`` path
        is not given a second copy of them.
        """
        url = endpoint
        if not url.startswith(("ws://", "wss://")):
            url = "ws://" + url
        if not url.endswith("/gremlin"):
            url = url.rstrip("/") + "/gremlin"
        return url