Source code for graphscope.interactive.query

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from enum import Enum

from gremlin_python.driver.client import Client
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.process.anonymous_traversal import traversal

from graphscope.framework.dag_utils import gremlin_to_subgraph
from graphscope.framework.utils import deprecated

logger = logging.getLogger("graphscope")

class InteractiveQueryStatus(Enum):
    """Enumeration class of current status of InteractiveQuery"""

    Initializing = 0
    Running = 1
    Failed = 2
    Closed = 3

[docs]class InteractiveQuery(object): """`InteractiveQuery` class, is a simple wrapper around `Gremlin-Python <>`_, which implements Gremlin within the Python language. It also can expose gremlin endpoint which can be used by any other standard gremlin console, with the method `gremlin_url()`. It also has a method called `subgraph` which can extract some fragments from origin graph, produce a new, smaller but concise graph stored in vineyard, which lifetime is independently of the origin graph. User can either use `execute()` to submit a script, or use `traversal_source()` to get a `GraphTraversalSource` for further traversal. """
[docs] def __init__(self, graph, frontend_gremlin_endpoint, frontend_cypher_endpoint): """Construct a :class:`InteractiveQuery` object.""" # graph object id stored in vineyard self._graph = graph self._session = graph._session self._gremlin_url = [ f"ws://{endpoint}/gremlin" for endpoint in frontend_gremlin_endpoint.split(",") ] self._cypher_url = [ f"neo4j://{endpoint}" for endpoint in frontend_cypher_endpoint.split(",") ] self._conn = None self._gremlin_client = None self._cypher_driver = None self.closed = False
@property @deprecated("Please use `gremlin_url` instead") def graph_url(self): """This will be deprecated in the future, use `gremlin_url` instead.""" return self._gremlin_url @property def gremlin_url(self): """The gremlin graph url can be used with any standard gremlin console, e.g., tinkerpop.""" return self._gremlin_url @property def cypher_url(self): """The cypher graph url can be used with any standard cypher console, e.g., neo4j.""" return self._cypher_url @property def object_id(self): return self._graph.vineyard_id @property def session(self): return self._session @property def session_id(self): return self._session.session_id
[docs] def execute(self, query, lang="gremlin", request_options=None, **kwargs): """A simple wrapper around `submit`, for compatibility""" return self.submit(query, lang, request_options=request_options, **kwargs)
[docs] def submit(self, query, lang="gremlin", request_options=None, **kwargs): from neo4j import RoutingControl """Execute gremlin or cypher querying scripts. <Gremlin> Args: query (str): Scripts that written in gremlin query language. request_options (dict, optional): Gremlin request options. format: { "engine": "gae" } Returns: :class:`gremlin_python.driver.client.ResultSet`: <Cypher> Args: query (str): Scripts that written in cypher query language. kwargs (dict, optional): Cypher request options. e.g.: routing_ = RoutingControl.READ Returns: :class:``: """ if lang == "gremlin": return self.gremlin_client.submit(query, request_options=request_options) elif lang == "cypher": return self.cypher_driver.execute_query( query, routing_=RoutingControl.READ, **kwargs ) else: raise ValueError( f"Unsupported query language: {lang} other than gremlin and cypher" )
@property def gremlin_client(self): if self._gremlin_client is None: self._gremlin_client = Client(self._gremlin_url[0], "g") return self._gremlin_client @property def cypher_driver(self): from neo4j import GraphDatabase if self._cypher_driver is None: self._cypher_driver = GraphDatabase.driver( self._cypher_url[0], auth=("", "") ) return self._cypher_driver
[docs] def subgraph(self, gremlin_script, lang="gremlin", request_options=None): """We currently only support subgraph using gremlin script. Create a subgraph, which input is the executor result of `gremlin_script`. Any gremlin script that output a set of edges can be used to construct a subgraph. Args: gremlin_script (str): Gremlin script to be executed. request_options (dict, optional): Gremlin request options. format: { "engine": "gae" } Returns: :class:`graphscope.framework.graph.GraphDAGNode`: A new graph constructed by the gremlin output, that also stored in vineyard. """ assert lang == "gremlin", "Only support gremlin script" # avoid circular import from graphscope.framework.graph import GraphDAGNode op = gremlin_to_subgraph( self, gremlin_script=gremlin_script, request_options=request_options, oid_type=self._graph._oid_type, ) return self._session._wrapper(GraphDAGNode(self._session, op))
[docs] def traversal_source(self): """We currently only support traversal_source using gremlin. Create a GraphTraversalSource and return. Once `g` has been created using a connection, we can start to write Gremlin traversals to query the remote graph. Raises: RuntimeError: If the interactive script is not running. Examples: .. code:: python sess = graphscope.session() graph = load_modern_graph(sess, modern_graph_data_dir) interactive = sess.gremlin(graph) g = interactive.traversal_source() print(g.V().both()[1:3].toList()) print(g.V().both().name.toList()) Returns: `GraphTraversalSource` """ if self._conn is None: self._conn = DriverRemoteConnection(self._gremlin_url[0], "g") return traversal().withRemote(self._conn)
[docs] def close(self): if self.closed or self._session.closed: return """Close interactive instance and release resources""" if self._conn is not None: try: self._conn.close() except: # noqa: E722 pass if self._gremlin_client is not None: try: self._gremlin_client.close() except: # noqa: E722 pass if self._cypher_driver is not None: try: self._cypher_driver.close() except: # noqa: E722 pass self._session._close_interactive_instance(self) self.closed = True