Source code for graphscope.interactive.query

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
from copy import deepcopy
from enum import Enum

from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.process.anonymous_traversal import traversal

from graphscope.framework.dag import DAGNode
from graphscope.framework.dag_utils import close_interactive_query
from graphscope.framework.dag_utils import create_interactive_query
from graphscope.framework.dag_utils import fetch_gremlin_result
from graphscope.framework.dag_utils import gremlin_query
from graphscope.framework.dag_utils import gremlin_to_subgraph

logger = logging.getLogger("graphscope")


class InteractiveQueryStatus(Enum):
    """An enumeration of the lifecycle states of an :class:`InteractiveQuery`."""

    # status assigned to a freshly constructed InteractiveQuery
    Initializing = 0
    # status required by InteractiveQuery.execute/subgraph/traversal_source
    Running = 1
    Failed = 2
    # status assigned by InteractiveQuery.close()
    Closed = 3


class ResultSetDAGNode(DAGNode):
    """A DAG node standing for the result set of a Gremlin query.

    It wraps :class:`gremlin_python.driver.resultset.ResultSet`; results are
    fetched lazily through :meth:`one` or :meth:`all`, each of which returns
    a new result node.
    """

    def __init__(self, dag_node, op):
        session = dag_node.session
        self._session = session
        self._op = op
        # register the query op with the session's DAG
        session.dag.add_op(op)

    def _fetch_result(self, fetch_type):
        """Build a result node that fetches the Gremlin result via `fetch_type`."""
        # imported lazily to avoid a circular import
        from graphscope.framework.context import ResultDAGNode

        return ResultDAGNode(self, fetch_gremlin_result(self, fetch_type))

    def one(self):
        """See details in :meth:`gremlin_python.driver.resultset.ResultSet.one`"""
        return self._fetch_result("one")

    def all(self):
        """See details in :meth:`gremlin_python.driver.resultset.ResultSet.all`

        Note that this method is equivalent to `ResultSet.all().result()`.
        """
        return self._fetch_result("all")


class ResultSet(object):
    """Eager-mode wrapper around an evaluated :class:`ResultSetDAGNode`.

    On construction the node's op is deep-copied and written back, the node
    is flagged as evaluated, and the op is registered with the session's DAG.
    :meth:`one` and :meth:`all` forward to the wrapped node and pass the
    returned node through the session's ``_wrapper``.
    """

    def __init__(self, result_set_node):
        node = result_set_node
        self._result_set_node = node
        self._session = node.session
        # replace the op with a private copy, then mark the node evaluated
        node.op = deepcopy(node.op)
        node.evaluated = True
        self._session.dag.add_op(node.op)

    def one(self):
        """Fetch a single result through the session wrapper."""
        wrapper = self._session._wrapper
        return wrapper(self._result_set_node.one())

    def all(self):
        """Fetch all results through the session wrapper."""
        wrapper = self._session._wrapper
        return wrapper(self._result_set_node.all())


class InteractiveQueryDAGNode(DAGNode):
    """A DAG node that stands for an interactive (Gremlin) query engine.

    The following example demonstrates its usage:

    .. code:: python

        >>> # lazy node
        >>> import graphscope as gs
        >>> sess = gs.session(mode="lazy")
        >>> g = sess.g() # <graphscope.framework.graph.GraphDAGNode object>
        >>> interactive = sess.gremlin(g)
        >>> print(interactive) # <graphscope.interactive.query.InteractiveQueryDAGNode object>
        >>> rs = interactive.execute("g.V()")
        >>> print(rs) # <graphscope.interactive.query.ResultSetDAGNode object>
        >>> r = rs.one()
        >>> print(r) # <graphscope.framework.context.ResultDAGNode>
        >>> print(sess.run(r))
        [2]
        >>> subgraph = interactive.subgraph("xxx")
        >>> print(subgraph) # <graphscope.framework.graph.GraphDAGNode object>
        >>> g2 = sess.run(subgraph)
        >>> print(g2) # <graphscope.framework.graph.Graph object>
    """

    def __init__(self, session, graph, engine_params=None):
        """
        Args:
            session (:class:`Session`): The GraphScope session owning this node.
            graph (:class:`graphscope.framework.graph.GraphDAGNode`):
                The graph that Gremlin queries will run against.
            engine_params (dict, optional):
                Configuration used to start up the interactive engine. See details in:
                `interactive_engine/deploy/docker/dockerfile/executor.vineyard.properties`
        """
        self._session = session
        self._graph = graph
        self._engine_params = engine_params
        # build the "create interactive query" op and register it with the DAG
        create_op = create_interactive_query(graph, engine_params)
        self._op = create_op
        session.dag.add_op(create_op)

    def execute(self, query, request_options=None):
        """Submit a Gremlin script for execution.

        Args:
            query (str): Script written in the Gremlin query language.
            request_options (dict, optional): Gremlin request options, e.g.
                ``{"engine": "gae"}``.

        Returns:
            :class:`graphscope.interactive.query.ResultSetDAGNode`:
                A node holding the result set; call ``one()`` or ``all()`` on
                it to fetch the results.
        """
        query_op = gremlin_query(self, query, request_options)
        return ResultSetDAGNode(self, query_op)

    def subgraph(self, gremlin_script, request_options=None):
        """Create a subgraph from the output of `gremlin_script`.

        Any Gremlin script that outputs a set of edges can be used to
        construct a subgraph.

        Args:
            gremlin_script (str): Gremlin script to be executed.
            request_options (dict, optional): Gremlin request options, e.g.
                ``{"engine": "gae"}``.

        Returns:
            :class:`graphscope.framework.graph.GraphDAGNode`:
                A new graph constructed from the Gremlin output, also stored in vineyard.
        """
        # imported here to avoid a circular import
        from graphscope.framework.graph import GraphDAGNode

        # the subgraph reuses the oid type of the graph being queried
        source_oid_type = self._graph._oid_type
        subgraph_op = gremlin_to_subgraph(
            self,
            gremlin_script=gremlin_script,
            request_options=request_options,
            oid_type=source_oid_type,
        )
        return GraphDAGNode(self._session, subgraph_op)

    def close(self):
        """Close the interactive engine and release its resources.

        Returns:
            :class:`graphscope.interactive.query.ClosedInteractiveQuery`:
                Evaluated in eager mode.
        """
        close_op = close_interactive_query(self)
        return ClosedInteractiveQuery(self._session, close_op)


class InteractiveQuery(object):
    """`InteractiveQuery` class, is a simple wrapper around
    `Gremlin-Python <https://pypi.org/project/gremlinpython/>`_,
    which implements Gremlin within the Python language.
    It also can expose a gremlin endpoint which can be used by
    any other standard gremlin console, via the :attr:`graph_url` property.

    It also has a method called `subgraph` which can extract some fragments
    from the origin graph and produce a new, smaller but concise graph stored
    in vineyard, whose lifetime is independent from the origin graph.

    User can either use `execute()` to submit a script, or use `traversal_source()`
    to get a `GraphTraversalSource` for further traversal.
    """

    def __init__(
        self, interactive_query_node=None, frontend_endpoint=None, object_id=None
    ):
        """Construct a :class:`InteractiveQuery` object.

        Args:
            interactive_query_node (:class:`InteractiveQueryDAGNode`, optional):
                The DAG node this instance is evaluated from.
            frontend_endpoint (str, optional): Comma-separated endpoints; each
                one becomes a ``ws://<endpoint>/gremlin`` URL in :attr:`graph_url`.
            object_id (optional): Graph object id stored in vineyard.
        """
        self._status = InteractiveQueryStatus.Initializing
        self._graph_url = None
        # initialized so the `error_msg` getter is safe before any error is set
        self._error_msg = None
        # graph object id stored in vineyard
        self._object_id = object_id
        # interactive_query_node is None when an interactive query is created
        # implicitly in eager mode.
        # NOTE(review): in that case `_interactive_query_node` and `_session`
        # are never set, so execute/subgraph/close would raise AttributeError.
        if interactive_query_node is not None:
            self._interactive_query_node = interactive_query_node
            self._session = self._interactive_query_node.session
            # copy the op and mark the node as evaluated
            self._interactive_query_node.op = deepcopy(self._interactive_query_node.op)
            self._interactive_query_node.evaluated = True
            self._session.dag.add_op(self._interactive_query_node.op)
        if frontend_endpoint is not None:
            self._graph_url = [
                f"ws://{endpoint}/gremlin" for endpoint in frontend_endpoint.split(",")
            ]

    @property
    def graph_url(self):
        """The gremlin graph url can be used with any standard gremlin console, e.g., tinkerpop."""
        return self._graph_url

    @property
    def status(self):
        """Current :class:`InteractiveQueryStatus` of this instance."""
        return self._status

    @status.setter
    def status(self, value):
        self._status = value

    @property
    def object_id(self):
        """Graph object id stored in vineyard, as passed to the constructor."""
        return self._object_id

    @property
    def error_msg(self):
        """Error message recorded via the setter; None until one is set."""
        return self._error_msg

    @error_msg.setter
    def error_msg(self, error_msg):
        self._error_msg = error_msg

    def closed(self):
        """Return if the current instance is closed."""
        return self._status == InteractiveQueryStatus.Closed

    def _ensure_running(self):
        """Raise RuntimeError unless the instance is in the Running status."""
        if self._status != InteractiveQueryStatus.Running:
            # Format the message explicitly: RuntimeError does not apply
            # %-formatting to extra positional arguments.
            raise RuntimeError(
                "Interactive query is unavailable with %s status." % str(self._status)
            )

    def subgraph(self, gremlin_script, request_options=None):
        """Create a subgraph from the edges output by `gremlin_script`.

        Raises:
            RuntimeError: If the interactive query is not running.
        """
        self._ensure_running()
        return self._session._wrapper(
            self._interactive_query_node.subgraph(gremlin_script, request_options)
        )

    def execute(self, query, request_options=None):
        """Execute a gremlin script and return the session-wrapped result set.

        Raises:
            RuntimeError: If the interactive query is not running.
        """
        self._ensure_running()
        return self._session._wrapper(
            self._interactive_query_node.execute(query, request_options)
        )

    def traversal_source(self):
        """Create a GraphTraversalSource and return.

        Once `g` has been created using a connection, we can start to write
        Gremlin traversals to query the remote graph.

        Raises:
            RuntimeError: If the interactive script is not running.

        Examples:

            .. code:: python

                sess = graphscope.session()
                graph = load_modern_graph(sess, modern_graph_data_dir)
                interactive = sess.gremlin(graph)
                g = interactive.traversal_source()
                print(g.V().both()[1:3].toList())
                print(g.V().both().name.toList())

        Returns:
            `GraphTraversalSource`
        """
        self._ensure_running()
        # connect to the first advertised frontend endpoint
        return traversal().withRemote(DriverRemoteConnection(self._graph_url[0], "g"))

    def close(self):
        """Close interactive instance and release resources"""
        if not self.closed() and not self._session.closed:
            self._session._wrapper(self._interactive_query_node.close())
            self._session._close_interactive_instance(self)
            self._status = InteractiveQueryStatus.Closed
class ClosedInteractiveQuery(DAGNode):
    """A DAG node representing an interactive query that has been closed."""

    def __init__(self, session, op):
        self._op = op
        self._session = session
        # register the close op with the session's DAG
        session.dag.add_op(op)