#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import base64
import collections
import json
from copy import deepcopy
try:
    from graphlearn import Graph as GLGraph
except ImportError:
    # graphlearn is an optional dependency: fall back to ``object`` so that
    # classes deriving from ``GLGraph`` in this module can still be defined
    # when the package is not installed.
    GLGraph = object
from graphscope.framework.dag import DAGNode
from graphscope.framework.dag_utils import close_learning_instance
from graphscope.framework.dag_utils import create_learning_instance
from graphscope.framework.errors import InvalidArgumentError
from graphscope.framework.errors import check_argument
class GraphDAGNode(DAGNode):
    """A class represents a learning instance in a DAG.

    The following example demonstrates its usage:

    .. code:: python

        >>> # lazy mode
        >>> import graphscope as gs
        >>> sess = gs.session(mode="lazy")
        >>> g = sess.g() # <graphscope.framework.graph.GraphDAGNode object>
        >>> lg = sess.learning(g)
        >>> print(lg) # <graphscope.learning.graph.GraphDAGNode object>
        >>> lg_graph = sess.run(lg)
        >>> print(lg_graph) # <graphscope.learning.graph.Graph object>
    """

    def __init__(self, session, graph, nodes=None, edges=None, gen_labels=None):
        """Create the learning-instance op and register it in the session DAG.

        See params detail in :meth:`graphscope.Session.graphlearn`

        Args:
            session: Session whose ``dag`` receives the newly created op.
            graph: Graph the learning instance is created from.
            nodes: Forwarded unchanged to ``create_learning_instance``.
            edges: Forwarded unchanged to ``create_learning_instance``.
            gen_labels: Forwarded unchanged to ``create_learning_instance``.
        """
        self._session = session
        self._graph = graph
        self._op = create_learning_instance(self._graph, nodes, edges, gen_labels)
        # add op to dag
        self._session.dag.add_op(self._op)

    def close(self):
        """Close learning instance and release the resources.

        Returns:
            :class:`graphscope.learning.graph.ClosedLearningInstance`
        """
        op = close_learning_instance(self)
        return ClosedLearningInstance(self._session, op)
class Graph(GLGraph):
    """Learning-engine graph built from a handle describing a learning instance.

    The constructor configures the underlying ``GLGraph`` from ``handle`` and
    ``config``; :meth:`preprocess_args` produces a ``config`` of the expected
    shape from user-facing ``nodes`` / ``edges`` / ``gen_labels`` arguments.
    """

    def __init__(self, graph_node, handle, config=None, object_id=None):
        """Initialize a graph for the learning engine using a handle.

        Args:
            graph_node: DAG node this graph belongs to. Its ``session`` is
                read, its ``op`` is replaced by a deep copy that is added to
                the session DAG, and its ``evaluated`` flag is set to True.
            handle (dict | str): Handle as a dict, or as base64-encoded JSON
                text (see :meth:`decode_arg`).
            config (dict | str, optional): Configuration with the keys
                ``nodes``, ``edges``, ``node_attributes``, ``edge_attributes``
                and ``gen_labels``, as a dict or base64-encoded JSON text.
                When omitted, ``handle["config"]`` is used if present,
                otherwise an empty configuration.
            object_id (optional): Defaults to ``handle["vineyard_id"]``.
        """
        self.graph_node = graph_node
        self.graphscope_session = self.graph_node.session
        # copy and set op evaluated
        self.graph_node.op = deepcopy(self.graph_node.op)
        self.graph_node.evaluated = True
        self.graphscope_session.dag.add_op(self.graph_node.op)

        handle = self.decode_arg(handle)
        config = self.decode_arg(config)
        if config is None:
            if "config" in handle:
                config = handle["config"]
        if config is None:
            # Empty configuration with the same shape preprocess_args returns.
            # (A defaultdict whose factory returns the ``dict`` class itself
            # would break the ``.items()`` / iteration calls below and could
            # not be JSON-serialised by get_handle.)
            config = {
                "nodes": None,
                "edges": None,
                "node_attributes": {},
                "edge_attributes": {},
                "gen_labels": [],
            }
        if object_id is None:
            object_id = handle["vineyard_id"]

        self.handle = handle
        self.config = config
        self.object_id = object_id
        self.closed = False

        super(Graph, self).__init__()

        self.vineyard(handle, config["nodes"], config["edges"])
        # Each attribute entry is (attribute_names, [n_ints, n_floats, n_strings]).
        for label, node_attr in config["node_attributes"].items():
            counts = node_attr[1]
            self.node_attributes(
                label, node_attr[0], counts[0], counts[1], counts[2]
            )
        for label, edge_attr in config["edge_attributes"].items():
            counts = edge_attr[1]
            self.edge_attributes(
                label, edge_attr[0], counts[0], counts[1], counts[2]
            )

        for node_view_label, node_label, nsplit, split_range in config["gen_labels"]:
            self.node_view(
                node_view_label, node_label, nsplit=nsplit, split_range=split_range
            )

        # server_own=False: make sure the glog inside graph-learn get initialized
        self.init_vineyard(worker_index=0, worker_count=1, server_own=False)

    def decode_arg(self, arg):
        """Return ``arg`` as a decoded object.

        ``None`` and dicts are returned unchanged; anything else is treated as
        a string holding base64-encoded UTF-8 JSON and is decoded and parsed.
        """
        if arg is None or isinstance(arg, dict):
            return arg
        return json.loads(base64.b64decode(arg.encode("utf-8")).decode("utf-8"))

    def close(self):
        """Close the client, then the server-side learning instance.

        Does nothing when this graph is already closed or when its session
        is already closed. When no session is attached, only the client side
        is closed.
        """
        session = self.graphscope_session
        # Check for a missing session *before* reading ``session.closed``.
        if self.closed or (session is not None and session.closed):
            return
        self.closed = True
        super(Graph, self).close()  # close client first
        # close server instance
        if session is not None:
            session._wrapper(self.graph_node.close())
            session._close_learning_instance(self)

    @staticmethod  # noqa: C901
    def preprocess_args(handle, nodes, edges, gen_labels):  # noqa: C901
        """Normalize user arguments into the ``config`` dict used by ``Graph``.

        Args:
            handle: Base64-encoded JSON handle. Its ``node_schema``,
                ``node_attribute_types`` and ``edge_attribute_types`` entries
                are consulted as needed.
            nodes (list, optional): Each item is a node label (``str``) or a
                tuple ``(label, attribute_names)``. When ``None``, labels are
                taken from ``handle["node_schema"]`` (text before the first
                ``":"`` of each entry).
            edges (list, optional): Each item is one of

                - ``str``: edge label; only valid with a single node label,
                  which is used for both endpoints;
                - a tuple whose first two items are ``str``: recorded as-is;
                - ``(label, [attribute_names])``: like the ``str`` form, with
                  attributes;
                - ``((src, label, dst), [attribute_names])``: the first item
                  is recorded as-is, attributes are stored under ``label``.
            gen_labels (list, optional): Items of length 3 or 4, grouped by
                their second element. Within a group, 3-item entries
                ``(view_label, node_label, size)`` become
                ``(view_label, node_label, total_size, (start, end))`` with
                consecutive ranges; 4-item entries are kept unchanged.

        Returns:
            dict: ``nodes`` and ``edges`` (a list, or ``None`` when empty),
            ``node_attributes`` and ``edge_attributes`` (label ->
            ``(attribute_names, [n_ints, n_floats, n_strings])``) and
            ``gen_labels`` (list).

        Raises:
            InvalidArgumentError: On duplicate node labels, malformed node /
                edge / gen_labels items, or an edge given without endpoints
                while more than one node label exists.
        """
        handle = json.loads(base64.b64decode(handle).decode("utf-8", errors="ignore"))
        node_names = []
        node_attributes = {}
        edge_names = []
        edge_attributes = {}

        def selected_property_schema(attr_types, attributes):
            # Count the selected attributes per type code: "i", "f" and "s".
            prop_counts = collections.defaultdict(int)
            for attr in attributes:
                prop_counts[attr_types[attr]] += 1
            return [prop_counts["i"], prop_counts["f"], prop_counts["s"]]

        if nodes is not None:
            for node in nodes:
                if isinstance(node, str):
                    if node in node_names:
                        raise InvalidArgumentError("Duplicate node type: %s" % node)
                    node_names.append(node)
                elif isinstance(node, tuple):
                    if node[0] in node_names:
                        raise InvalidArgumentError(
                            "Duplicate node type: %s" % node[0]
                        )
                    node_names.append(node[0])
                    attr_types = handle["node_attribute_types"][node[0]]
                    attr_schema = selected_property_schema(attr_types, node[1])
                    node_attributes[node[0]] = (node[1], attr_schema)
                else:
                    raise InvalidArgumentError(
                        "The node parameter is in bad format: %s" % node
                    )
        else:
            for node in handle["node_schema"]:
                node_names.append(node.split(":")[0])

        if edges is not None:
            for edge in edges:
                if isinstance(edge, str):
                    if len(node_names) > 1:
                        raise InvalidArgumentError(
                            "Cannot inference edge type when multiple kinds of nodes exists"
                        )
                    edge_names.append((node_names[0], edge, node_names[0]))
                elif (
                    isinstance(edge, tuple)
                    and isinstance(edge[0], str)
                    and isinstance(edge[1], str)
                ):
                    edge_names.append(edge)
                elif (
                    isinstance(edge, tuple)
                    and isinstance(edge[0], str)
                    and isinstance(edge[1], list)
                ):
                    if len(node_names) > 1:
                        raise InvalidArgumentError(
                            "Cannot inference edge type when multiple kinds of nodes exists"
                        )
                    edge_names.append((node_names[0], edge[0], node_names[0]))
                    attr_types = handle["edge_attribute_types"][edge[0]]
                    attr_schema = selected_property_schema(attr_types, edge[1])
                    edge_attributes[edge[0]] = (edge[1], attr_schema)
                elif (
                    isinstance(edge, tuple)
                    and isinstance(edge[0], (list, tuple))
                    and isinstance(edge[1], list)
                ):
                    edge_names.append(edge[0])
                    attr_types = handle["edge_attribute_types"][edge[0][1]]
                    attr_schema = selected_property_schema(attr_types, edge[1])
                    edge_attributes[edge[0][1]] = (edge[1], attr_schema)
                else:
                    raise InvalidArgumentError(
                        "The edge parameter is in bad format: %s" % edge
                    )

        # Group the gen_labels entries by their second element.
        split_groups = collections.defaultdict(list)
        if gen_labels is not None:
            for label in gen_labels:
                if len(label) == 3 or len(label) == 4:
                    split_groups[label[1]].append(label)
                else:
                    raise InvalidArgumentError(
                        "Bad gen_labels arguments: %s" % gen_labels
                    )

        split_labels = []
        for label, group in split_groups.items():
            # All entries of one group must have the same arity.
            lengths = [len(split) for split in group]
            check_argument(
                lengths[:-1] == lengths[1:], "Invalid gen labels: %s" % group
            )
            if len(group[0]) == 3:
                # Turn per-entry sizes into consecutive (start, end) ranges
                # over the summed size of the group.
                length_sum = sum(split[2] for split in group)
                start, spans = 0, []
                for split in group:
                    spans.append((start, start + split[2]))
                    start += split[2]
                group = [
                    (split[0], split[1], length_sum, span)
                    for split, span in zip(group, spans)
                ]
            split_labels.extend(group)

        return {
            "nodes": node_names if node_names else None,
            "edges": edge_names if edge_names else None,
            "node_attributes": node_attributes,
            "edge_attributes": edge_attributes,
            "gen_labels": split_labels,
        }

    def get_handle(self, worker_count=1):
        """Return a base64-encoded handle for distributed training.

        The result is a copy of ``self.handle`` extended with ``config``
        (``self.config``) and ``client_count`` (``worker_count``), serialised
        as JSON and base64-encoded.
        """
        handle_copy = self.handle.copy()
        handle_copy["config"] = self.config
        handle_copy["client_count"] = worker_count
        return base64.b64encode(json.dumps(handle_copy).encode("utf-8")).decode("utf-8")

    def V(self, t, feed=None):
        """Entry of Gremlin-like query. Start from node.

        Args:
            t (string): The type of node which is the entry of query or the type
                of edge when node is from edge source or dst.
            feed (None| numpy.ndarray | types.GeneratorType | `Nodes`): When `feed`
                is not `None`, the `type` should be a node type, which means query the
                attributes of the specified node ids.

                - None: Default. Sample nodes with the following .shuffle and .batch API.
                - numpy.ndarray: Any shape of ids. Get nodes of the given ids and
                  node_type.
                - types.Generator: A generator of numpy.ndarray. Get nodes of generated
                  ids and given node_type.
                - `Nodes`: A `Nodes` object.

        Return:
            A 'Query' object.

        Example:

        .. code:: python

            >>> import numpy as np
            >>> g.V("user").shuffle().batch(64)
            >>> g.V("user", feed=np.array([1, 2, 3]))
            >>> def gen():
            >>>     while True:
            >>>         yield np.array([1, 2, 3])
            >>> gen = gen()
            >>> g.V("user", feed=gen)
        """
        return super(Graph, self).V(t, feed)

    def E(self, edge_type, feed=None, reverse=False):
        """Entry of Gremlin-like query. Start from edge.

        Args:
            edge_type (string): The type of edge which is the entry of query.
            feed (None| (np.ndarray, np.ndarray) | types.GeneratorType | `Edges`):

                - None: Default. Sample edges with the following .shuffle and .batch API.
                - (np.ndarray, np.ndarray): src_ids, dst_ids. Get edges of the given
                  (src_ids, dst_ids) and given edge_type. src_ids and dst_ids must be
                  the same shape, dtype is int.
                - types.Generator: A generator of (numpy.ndarray, numpy.ndarray). Get
                  edges of generated (src_ids, dst_ids) and given edge_type.
                - `Edges`: An `Edges` object.
            reverse (bool): Forwarded unchanged to the parent class ``E``.

        Return:
            A 'Query' object.

        Example:

        .. code:: python

            >>> import numpy as np
            >>> g.E("buy").shuffle().batch(64)
            >>> g.E("buy", feed=(np.array([1, 2, 3]), np.array([4, 5, 6])))
            >>> def gen():
            >>>     while True:
            >>>         yield (np.array([1, 2, 3]), np.array([4, 5, 6]))
            >>> gen = gen()
            >>> g.E("buy", feed=gen)
        """
        return super(Graph, self).E(edge_type, feed, reverse)
class ClosedLearningInstance(DAGNode):
    """Closed learning instance node in a DAG."""

    def __init__(self, session, op):
        """Store the session and op, and register the op in the session DAG.

        Args:
            session: Session whose ``dag`` receives ``op``.
            op: Operation represented by this node.
        """
        self._session = session
        self._op = op
        # add op to dag
        self._session.dag.add_op(self._op)