Source code for graphscope.interactive.query

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import logging
import random
from concurrent.futures import ThreadPoolExecutor
from enum import Enum

from gremlin_python.driver.client import Client
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.process.anonymous_traversal import traversal

from graphscope.framework.loader import Loader

logger = logging.getLogger("graphscope")

class InteractiveQueryStatus(Enum):
    """A enumeration class of current status of InteractiveQuery"""

    Initializing = 0
    Running = 1
    Failed = 2
    Closed = 3

[docs]class InteractiveQuery(object): """`InteractiveQuery` class, is a simple wrapper around `Gremlin-Python <>`_, which implements Gremlin within the Python language. It also can expose gremlin endpoint which can be used by any other standard gremlin console, with the method `graph_url()`. It also has a method called `subgraph` which can extract some fragments from origin graph, produce a new, smaller but concise graph stored in vineyard, which lifetime is independent from the origin graph. User can either use `execute()` to submit a script, or use `traversal_source()` to get a `GraphTraversalSource` for further traversal. """
[docs] def __init__(self, session, object_id, front_ip=None, front_port=None): self._status = InteractiveQueryStatus.Initializing self._session = session self._object_id = object_id self._error_msg = "" if front_ip is not None and front_port is not None: self._graph_url = "ws://%s:%d/gremlin" % (front_ip, front_port) self._client = Client(self._graph_url, "g") else: self._graph_url = None self._client = None
def __repr__(self): return f"graphscope.InteractiveQuery <{self._graph_url}>" @property def object_id(self): """Get the vineyard object id of graph. Returns: str: object id """ return self._object_id @property def graph_url(self): """The gremlin graph url can be used with any standard gremlin console, e.g., thinkerpop.""" return self._graph_url @property def status(self): return self._status @status.setter def status(self, value): self._status = value @property def error_msg(self): return self._error_msg @error_msg.setter def error_msg(self, error_msg): self._error_msg = error_msg
[docs] def set_frontend(self, front_ip, front_port): self._graph_url = "ws://%s:%d/gremlin" % (front_ip, front_port) self._client = Client(self._graph_url, "g")
[docs] def closed(self): """Return if the current instance is closed.""" return self._status == InteractiveQueryStatus.Closed
[docs] def subgraph(self, gremlin_script): """Create a subgraph, which input is the result of the execution of `gremlin_script`. Any gremlin script that will output a set of edges can be used to contruct a subgraph. Args: gremlin_script (str): gremlin script to be executed Raises: RuntimeError: If the interactive instance is not running. Returns: :class:`Graph`: constructed subgraph. which is also stored in vineyard. """ if self._status != InteractiveQueryStatus.Running: raise RuntimeError( "Interactive query is unavailable with %s status.", str(self._status) ) now_time ="%Y%m%d%H%M%S") random_num = random.randint(0, 10000000) graph_name = "%s_%s" % (str(now_time), str(random_num)) # create graph handle by name self._client.submit( "g.createGraph('%s').with('graphType', 'vineyard')" % graph_name ).all().result() # start a thread to launch the graph def load_subgraph(name): import vineyard # invoke load_from g = self._session.load_from( edges=[Loader(vineyard.ObjectName("__%s_edge_stream" % name))], vertices=[Loader(vineyard.ObjectName("__%s_vertex_stream" % name))], generate_eid=False, )"subgraph has been loaded") return g pool = ThreadPoolExecutor() subgraph_task = pool.submit(load_subgraph, (graph_name,)) # add subgraph vertices and edges subgraph_script = "%s.subgraph('%s').outputVineyard('%s')" % ( gremlin_script, graph_name, graph_name, ) self._client.submit(subgraph_script).all().result() return subgraph_task.result()
[docs] def execute(self, query): """Execute gremlin querying scripts. Behind the scene, it uses `gremlinpython` to send the query. Args: query (str): Scripts that written in gremlin quering language. Raises: RuntimeError: If the interactive script is not running. Returns: execution results """ if self._status != InteractiveQueryStatus.Running: raise RuntimeError( "Interactive query is unavailable with %s status.", str(self._status) ) return self._client.submit(query)
[docs] def traversal_source(self): """Create a GraphTraversalSource and return. Once `g` has been created using a connection, we can start to write Gremlin traversals to query the remote graph. Raises: RuntimeError: If the interactive script is not running. Examples: .. code:: python sess = graphscope.session() graph = load_modern_graph(sess, modern_graph_data_dir) interactive = sess.gremlin(graph) g = interactive.traversal_source() print(g.V().both()[1:3].toList()) print(g.V().both().name.toList()) Returns: `GraphTraversalSource` """ if self._status != InteractiveQueryStatus.Running: raise RuntimeError( "Interactive query is unavailable with %s status.", str(self._status) ) return traversal().withRemote(DriverRemoteConnection(self._graph_url, "g"))
[docs] def close(self): """Close interactive instance and release resources""" if not self.closed(): self._session._close_interactive_instance(self) self._status = InteractiveQueryStatus.Closed