Source code for graphscope.framework.app

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import functools
import hashlib
import json
import logging
import os
import zipfile
from copy import deepcopy
from io import BytesIO

import yaml

from graphscope.framework.context import create_context_node
from graphscope.framework.dag import DAGNode
from graphscope.framework.dag_utils import bind_app
from graphscope.framework.dag_utils import create_app
from graphscope.framework.dag_utils import unload_app
from graphscope.framework.errors import InvalidArgumentError
from graphscope.framework.errors import check_argument
from graphscope.framework.utils import graph_type_to_cpp_class
from graphscope.proto import graph_def_pb2

logger = logging.getLogger("graphscope")

DEFAULT_GS_CONFIG_FILE = ".gs_conf.yaml"


def project_to_simple(func):
    """Decorator that projects a property graph to a simple graph before calling `func`.

    Only graphs whose ``graph_type`` is ``ARROW_PROPERTY`` are projected; any
    other graph is passed through untouched. The keyword arguments of the call
    select which property is kept:

    - ``weight``: used as the edge property (``e_prop``) of the simple graph.
    - ``attribute``: used as the vertex property (``v_prop``), consulted only
      when ``weight`` is not given.

    The projected graph remembers the original graph in ``_base_graph``.

    Examples:
        >>> @project_to_simple
        >>> def sssp(G, src, weight="dist"):
        >>>     pass

    Raises:
        InvalidArgumentError: If the first positional argument has no
            ``graph_type`` attribute.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        target = args[0]
        if not hasattr(target, "graph_type"):
            raise InvalidArgumentError("Missing graph_type attribute in graph object.")

        if target.graph_type == graph_def_pb2.ARROW_PROPERTY:
            origin = target
            if "weight" in kwargs:
                # func takes a 'weight' argument -> keep that edge property
                target = origin._project_to_simple(e_prop=kwargs["weight"])
            elif "attribute" in kwargs:
                # func takes an 'attribute' argument -> keep that vertex property
                target = origin._project_to_simple(v_prop=kwargs["attribute"])
            else:
                target = origin._project_to_simple()
            target._base_graph = origin

        return func(target, *args[1:], **kwargs)

    return wrapper


def not_compatible_for(*graph_types):
    """Decorator to mark builtin algorithms as not compatible with some graphs.

    Args:
        graph_types: list of string
            Entries must be one of 'arrow_property', 'dynamic_property',
            'arrow_projected', 'dynamic_projected', 'arrow_flattened',
            'directed', 'undirected'.

    Returns:
        The decorated function.

    Raises:
        InvalidArgumentError: Raised when the decorated function is called and
            - the first positional argument has no ``graph_type`` attribute, or
            - an entry of ``graph_types`` is not one of the names above, or
            - the graph matches one of ``graph_types``.

    Notes:
        Multiple types or use multiple @not_compatible_for() lines
        are joined logically with "or".

    Examples:
        >>> @not_compatible_for('arrow_property', 'dynamic_property')
        >>> def sssp(G, src, weight="dist"):
        >>>     pass
    """

    def _not_compatible_for(not_compatible_for_func):
        @functools.wraps(not_compatible_for_func)
        def wrapper(*args, **kwargs):
            graph = args[0]
            if not hasattr(graph, "graph_type"):
                raise InvalidArgumentError(
                    "Missing graph_type attribute in graph object."
                )

            terms = {
                "arrow_property": graph.graph_type == graph_def_pb2.ARROW_PROPERTY,
                "dynamic_property": graph.graph_type == graph_def_pb2.DYNAMIC_PROPERTY,
                "arrow_projected": graph.graph_type == graph_def_pb2.ARROW_PROJECTED,
                "dynamic_projected": graph.graph_type
                == graph_def_pb2.DYNAMIC_PROJECTED,
                "arrow_flattened": graph.graph_type == graph_def_pb2.ARROW_FLATTENED,
                "directed": graph.is_directed(),
                "undirected": not graph.is_directed(),
            }
            # Validate every requested name up front. Otherwise a typo placed
            # after a matching name would never be reported.
            if any(t not in terms for t in graph_types):
                raise InvalidArgumentError(
                    "Use one or more of arrow_property,dynamic_property,"
                    "arrow_projected,dynamic_projected,arrow_flattened,directed,undirected",
                )
            # First requested name that describes this graph, if any.
            tag = next((t for t in graph_types if terms[t]), None)
            if tag is not None:
                raise InvalidArgumentError(
                    "Algorithm '%s' isn't compatible for '%s' graphs"
                    % (not_compatible_for_func.__name__, tag)
                )
            return not_compatible_for_func(*args, **kwargs)

        return wrapper

    return _not_compatible_for


class AppAssets(DAGNode):
    """A class represents an app asset node in a DAG that holds the bytes of the gar resource.

    Assets includes an algorithm name, and gar (for user defined algorithm),
    a context type (one of 'tensor', 'vertex_data', 'vertex_property',
    'labeled_vertex_data', 'dynamic_vertex_data', 'labeled_vertex_property'),
    and its type (one of `cpp_pie`, `cpp_pregel`, `cython_pie`, `cython_pregel`,
    `java_pie`).

    The instance of this class can be passed to init :class:`graphscope.framework.app.AppDAGNode`
    """

    # Context types accepted by ``__init__``; any other value is rejected.
    _support_context_type = [
        "tensor",
        "vertex_data",
        "vertex_property",
        "labeled_vertex_data",
        "dynamic_vertex_data",
        "labeled_vertex_property",
    ]

    def __init__(self, algo=None, context=None, gar=None, cmake_extra_options=None):
        """Init assets of the algorithm.

        Args:
            algo (str): Represent specific algo inside resource.
            context (str): Type of context that hold the calculation results.
                It will get from gar if param is None. Defaults to None.
            gar (bytes or BytesIO, optional): The bytes that encodes the application's
                source code. Defaults to None.
            cmake_extra_options (optional): Stored as-is and exposed through
                :attr:`cmake_extra_options`. Defaults to None.

        Raises:
            InvalidArgumentError: If the (possibly gar-provided) context type is
                not supported, or the gar resource lacks the config or the app.
        """
        self._algo = algo
        self._context_type = context
        if isinstance(self._algo, str) and (
            "giraph:" in self._algo or "java_pie:" in self._algo
        ):
            self._type = "java_pie"
        else:
            self._type = "cpp_pie"  # default is builtin app with `built_in` type
        self._meta = {}

        # used for gar resource
        if gar is not None and isinstance(gar, (BytesIO, bytes)):
            self._gar = gar if isinstance(gar, bytes) else gar.getvalue()
            self._extract_meta_info()
        else:
            # built_in apps has no gar resource.
            self._gar = None
        self._cmake_extra_options = cmake_extra_options
        if self._context_type not in self._support_context_type:
            raise InvalidArgumentError(
                "Unsupport context type: {0}".format(self._context_type)
            )
        self._op = create_app(self)

    def __repr__(self) -> str:
        return f"graphscope.framework.app.AppAssets <type: {self._type}, algo: {self._algo}, context: {self._context_type}>"

    def _extract_meta_info(self):
        """Extract app meta info from gar resource.

        Sets ``_algo`` (when it was None and the gar holds exactly one app),
        ``_context_type`` (when the app meta provides one), ``_type`` and ``_meta``.

        Raises:
            InvalidArgumentError:
                - :code:`gs_conf.yaml` not exist in gar resource.
                - App not found in gar resource.
        """
        # Use the archive as a context manager so its handle is always closed.
        with zipfile.ZipFile(BytesIO(self._gar), "r") as archive:
            try:
                raw_config = archive.read(DEFAULT_GS_CONFIG_FILE)
            except KeyError:
                # ZipFile.read signals a missing member with KeyError; report
                # the documented error type instead.
                raise InvalidArgumentError(
                    "{} not found in gar resource.".format(DEFAULT_GS_CONFIG_FILE)
                ) from None
        config = yaml.safe_load(raw_config)

        # default app will used if there is only one app in it
        if self._algo is None and len(config["app"]) == 1:
            self._algo = config["app"][0]["algo"]
            logger.info("Default app %s will be used.", self._algo)

        for meta in config["app"]:
            if self._algo == meta["algo"]:
                if "context_type" in meta:
                    self._context_type = meta["context_type"]
                self._type = meta["type"]
                self._meta = meta
                return
        raise InvalidArgumentError("App not found in gar: {}".format(self._algo))

    @property
    def algo(self):
        """Algorithm name, e.g. sssp, pagerank.

        Returns:
            str: Algorithm name of this asset.
        """
        return self._algo

    @property
    def context_type(self):
        """Context type, e.g. vertex_property, labeled_vertex_data.

        Returns:
            str: Type of the app context.
        """
        return self._context_type

    @property
    def type(self):
        """Algorithm type, one of `cpp_pie`, `cpp_pregel`, `cython_pie`, `java_pie` or `cython_pregel`.

        Returns:
            str: Algorithm type of this asset.
        """
        return self._type

    @property
    def gar(self):
        """Gar resource.

        Returns:
            bytes: gar resource of this asset.
        """
        return self._gar

    def to_gar(self, path):
        """Write the gar resource of this asset to ``path``.

        This was previously a classmethod that read ``cls._gar``, which only
        exists on instances, so every call failed.

        Raises:
            RuntimeError: If ``path`` already exists.
            InvalidArgumentError: If this asset has no gar resource (e.g. builtin apps).
        """
        if os.path.exists(path):
            raise RuntimeError("Path exist: {}.".format(path))
        # Check before opening so no empty file is left behind.
        if self._gar is None:
            raise InvalidArgumentError(
                "App has no gar resource: {}".format(self._algo)
            )
        with open(path, "wb") as f:
            f.write(self._gar)

    def bytes(self):
        """Return the raw gar bytes of this asset (None when there is no gar)."""
        return self._gar

    @property
    def cmake_extra_options(self):
        return self._cmake_extra_options

    @property
    def signature(self):
        """Generate a signature of the app assets by its algo name (and gar resources).

        Used to uniquely identify a app assets.

        Returns:
            str: signature of this assets
        """
        s = hashlib.sha256()
        s.update(self._algo.encode("utf-8", errors="ignore"))
        if self._gar:
            s.update(self._gar)
        return s.hexdigest()

    def is_compatible(self, graph):
        """Determine if this algorithm can run on this type of graph.

        Args:
            graph (:class:`GraphDAGNode`): A graph instance.

        Returns:
            bool: True. Assets without a gar resource (builtin apps) are not checked.

        Raises:
            InvalidArgumentError:
                - App is not compatible with graph, or the gar meta declares
                  no ``compatible_graph`` list.
        """
        # builtin app
        if self._gar is None:
            return True
        # check yaml file
        graph_type = graph_type_to_cpp_class(graph.graph_type)
        if graph_type not in self._meta.get("compatible_graph", ()):
            raise InvalidArgumentError(
                "App is uncompatible with graph {}".format(graph_type)
            )
        return True

    def __call__(self, graph, *args, **kwargs):
        """Instantiate an App and do queries over it."""
        app_ = graph.session._wrapper(AppDAGNode(graph, self))
        return app_(*args, **kwargs)
class AppDAGNode(DAGNode):
    """An app node in a DAG.

    In GraphScope, an app node binds an :class:`AppAssets` to the concrete
    graph node on which its queries are executed.
    """

    def __init__(self, graph, app_assets: AppAssets):
        """Bind ``app_assets`` to ``graph`` and register the resulting ops in the session DAG.

        Args:
            graph (:class:`GraphDAGNode`): A :class:`GraphDAGNode` instance.
            app_assets: A :class:`AppAssets` instance.
        """
        self._graph = graph
        self._app_assets = app_assets
        self._session = graph.session
        self._app_assets.is_compatible(self._graph)

        self._op = bind_app(graph, self._app_assets)
        # The assets op is shared between bindings: register it only once.
        assets_op = self._app_assets.op
        if not self._session.dag.exists(assets_op):
            self._session.dag.add_op(assets_op)
        # Register the bind op itself.
        self._session.dag.add_op(self._op)
        # statically create the unload op to prevent a possible segmentation fault
        # inside the protobuf library.
        self._unload_op = unload_app(self)

    def __repr__(self):
        head = f"graphscope.App <type: {self._app_assets.type}, algorithm: {self._app_assets.algo} "
        tail = f"bounded_graph: {str(self._graph)}>"
        return head + tail

    @property
    def algo(self):
        """Algorithm name, e.g. sssp, pagerank.

        Returns:
            str: Algorithm name of the bound asset.
        """
        return self._app_assets.algo

    @property
    def gar(self):
        """Gar resource.

        Returns:
            bytes: gar resource of the bound asset.
        """
        return self._app_assets.gar

    def __call__(self, *args, **kwargs):
        """Check arguments according to the app type, then build the query context.

        Raises:
            InvalidArgumentError: If app_type is None, or positional arguments
                are given to a `cpp_pregel`/`cython_pie`/`cython_pregel`/`java_pie` app.
            RuntimeError: If the graph is not a DAG node and is not loaded.

        Returns:
            :class:`Context`: Query context, include running results of the app.
        """
        app_type = self._app_assets.type
        check_argument(app_type is not None)
        context_type = self._app_assets.context_type
        if not (isinstance(self._graph, DAGNode) or self._graph.loaded()):
            raise RuntimeError("The graph is not loaded")

        keyword_only_types = ("cpp_pregel", "cython_pie", "cython_pregel", "java_pie")
        if app_type not in keyword_only_types:
            return create_context_node(context_type, self, self._graph, *args, **kwargs)

        # cython app support kwargs only; they are forwarded as one JSON string.
        check_argument(not args, "Only support using keyword arguments in cython app.")
        return create_context_node(context_type, self, self._graph, json.dumps(kwargs))

    def __del__(self):
        # Best-effort unload: a destructor must not raise, so errors are ignored.
        try:
            self.session.run(self._unload())
        except Exception:  # pylint: disable=broad-except
            pass

    def _unload(self):
        """Unload this app from graphscope engine.

        Returns:
            :class:`graphscope.framework.app.UnloadedApp`: Evaluated in eager mode.
        """
        return UnloadedApp(self._session, self._unload_op)
class App(object):
    """An application that can run on graphs and produce results.

    Analytical engine will build the app dynamic library when instantiate a app instance.
    And the dynamic library will be reused if subsequent app's signature matches one of previous ones.
    """

    def __init__(self, app_node, key):
        """Wrap ``app_node`` as an evaluated app.

        Args:
            app_node (:class:`AppDAGNode`): The app node to wrap.
            key: A unique identifier of this app.
        """
        self._app_node = app_node
        self._session = self._app_node.session
        self._key = key
        # copy and set op evaluated
        self._app_node.op = deepcopy(self._app_node.op)
        self._app_node.evaluated = True
        self._app_node._unload_op = unload_app(self._app_node)
        self._session.dag.add_op(self._app_node.op)
        self._saved_signature = self.signature

    def __getattr__(self, name):
        """Delegate attributes not found on the App to the wrapped app node.

        ``_app_node`` is read from the instance ``__dict__`` rather than via
        normal attribute access. Going through normal access would re-enter
        ``__getattr__`` and recurse infinitely whenever ``_app_node`` is unset,
        e.g. for ``App.__new__(App)``, copy/pickle reconstruction, or
        ``__del__`` after a failed ``__init__``.

        Raises:
            AttributeError: If neither the App nor its app node has ``name``.
        """
        try:
            app_node = self.__dict__["_app_node"]
        except KeyError:
            raise AttributeError("{0} not found.".format(name)) from None
        if hasattr(app_node, name):
            return getattr(app_node, name)
        raise AttributeError("{0} not found.".format(name))

    @property
    def key(self):
        """A unique identifier of App."""
        return self._key

    @property
    def signature(self):
        """Signature is computed by all critical components of the App."""
        return hashlib.sha256(
            "{}.{}".format(self._app_assets.signature, self._graph.template_str).encode(
                "utf-8", errors="ignore"
            )
        ).hexdigest()

    def _unload(self):
        return self._session._wrapper(self._app_node._unload())

    def __del__(self):
        """Unload app on both the engine side and the python side.

        Best-effort: a destructor must not raise, so errors are ignored.
        """
        try:
            self.session.run(self._unload())
        except Exception:  # pylint: disable=broad-except
            pass

    def __call__(self, *args, **kwargs):
        return self._session._wrapper(self._app_node(*args, **kwargs))
class UnloadedApp(DAGNode):
    """Unloaded app node in a DAG.

    Constructing it registers the given unload ``op`` in the session's DAG.
    """

    def __init__(self, session, op):
        self._session, self._op = session, op
        # add op to dag
        session.dag.add_op(op)
def load_app(gar=None, algo=None, context=None, **kwargs):
    """Load an app from gar.

    Args:
        algo: str
            Algo name inside resource. None will extract name from gar resource
            if there is only one app in it.
        gar: bytes or BytesIO or str or os.PathLike
            str / path-like represent the path of resource, bytes / BytesIO
            the content of the resource. For java apps, gar can be None to
            indicate we should find the app in previously added libs.
        **kwargs: Forwarded to :class:`AppAssets`.

    Returns:
        Instance of <graphscope.framework.app.AppAssets>

    Raises:
        FileNotFoundError: File not exist.
        PermissionError: Permission denied of path.
        InvalidArgumentError: File is not a zip file; a gar resource of an
            unsupported type was given for a java app; or ``gar`` has an
            unsupported type.

    Examples:

    >>> sssp = load_app(gar='./resource.gar', algo='sssp')
    >>> sssp(src=4)

    which will have following `.gs_conf.yaml` in resource.gar:

      app:
        - algo: sssp
          type: cpp_pie
          class_name: grape:SSSP
          context_type: vertex_data
          src: sssp/sssp.h
          compatible_graph:
            - gs::ArrowProjectedFragment
    """
    if isinstance(gar, (BytesIO, bytes)):
        return AppAssets(algo, context, gar, **kwargs)
    if isinstance(gar, (str, os.PathLike)):
        with open(gar, "rb") as f:
            content = f.read()
        # Validate the bytes already read instead of reopening the path, which
        # would read the file twice and could see a different file.
        if not zipfile.is_zipfile(BytesIO(content)):
            raise InvalidArgumentError("{} is not a zip file.".format(gar))
        return AppAssets(algo, context, content, **kwargs)
    if isinstance(algo, str) and algo.startswith(("giraph:", "java_pie:")):
        if gar is not None:
            raise InvalidArgumentError("Running java app expect no gar resource")
        return AppAssets(algo, "vertex_data", None, **kwargs)
    raise InvalidArgumentError("Wrong type with {}".format(gar))