图分析引擎#

GraphScope 的图分析引擎继承了 GRAPE , 该系统于 SIGMOD2017 上首次提出并获得最佳论文奖。

与以往的系统的不同,GRAPE 支持将串行图算法自动并行化。在 GRAPE 中, 只需进行很小的更改即可轻松地将串行算法即插即用,使其并行化的运行在分布式环境,并高效地处理大规模图数据。 除了易于编程外,GRAPE 还被设计为高效且可拓展的系统,可灵活应对现实中图应用多变的规模、多样性和复杂性。

内置算法#

GraphScope 图分析引擎内置了许多常用的图分析算法,包括连通性分析算法、路径分析算法、社区检测和中心度计算等类型。

内置算法可以在图上轻松调用。例如,

import graphscope
from graphscope.dataset import load_p2p_network

# 创建默认 session,并加载属性图
g = load_p2p_network()

# 大多数内置算法只支持在简单图上进行计算,因此我们需要先通过顶点和边的类型来生成一个简单图
simple_g = g.project(vertices={"host": ["id"]}, edges={"connect": ["dist"]})

result_lpa = graphscope.lpa(simple_g, max_round=20)
result_sssp = graphscope.sssp(simple_g, src=20)

内置算法的完整列表如下所示。具体某个算法是否支持属性图也在其文档进行了描述。

算法的支持列表会随着不断增加持续更新中。

对计算结果的处理#

当完成一次图计算,计算结果会被包装成 Context 类,保存在分布式集群的内存中。

用户可能希望将结果传到客户端进行处理,或是写入云中某位置或分布式文件系统。GraphScope 支持用户通过以下方法来获取结果数据。

# 转化为相应数据类型
result_lpa.to_numpy("r")
result_lpa.to_dataframe({"node": "v.id", "result": "r"})

# 或写入 hdfs、oss, 或本地目录中(pod中的本地目录)
result_lpa.output("hdfs://output", {"node": "v.id", "result": "r"})
result_lpa.output("oss://id:key@endpoint/bucket/object", {"node": "v.id", "result": "r"})
result_lpa.output("file:///tmp/path", {"node": "v.id", "result": "r"})

# 或写入本地的 client 中
result_lpa.output_to_client("/tmp/lpa_result.txt", {"node": "v.id", "result": "r"})

# 或写入 vineyard 数据结构
result_lpa.to_vineyard_dataframe({"node": "v.id", "result": "r"})
result_lpa.to_vineyard_tensor("r")

此外,如 快速上手 中所示,用户可以将计算结果加回到该图数据中作为顶点(边)的新属性(列)。

# 将结果作为新列添加回属性图,列名为 "lpa_result",并生成一张新图
new_graph = g.add_column(result_lpa, {"lpa_result": "r"})

用户可以通过选择器( Selector )来定义将计算结果中的哪些部分写回图数据。 选择器指定了计算结果中的哪一部分会被处理。类似的,图数据也可以作为被处理数据的一部分,例如顶点ID。 我们为选择器保留了三个关键字:r 代表结果,ve 分别代表顶点和边。 以下是结果处理中选择器的一些示例。

# 获取顶点上的结果
result_lpa.to_numpy('r')

# 转换为 dataframe,
# 使用顶点的 `id` 作为名为 df_v 的列
# 使用顶点的 `data` 作为名为 df_vd 的列
# 使用结果列作为名为 df_result 的列
result_lpa.to_dataframe({'df_v': 'v.id', 'df_vd': 'v.data', 'df_result': 'r'})

# using the property0 written on vertices with label0 as column `result`
# 对于属性图的结果
# 使用 `:` 作为v和e的标签选择器
# 将 label0 顶点的 id (`v:label0.id`)作为 `id` 列
# 使用写在带有label0的顶点上的property0作为`result`列
result.output(fd='hdfs:///gs_data/output', \
        selector={'id': 'v:label0.id', 'result': 'r:label0.property0'})

可以查看 ContextSelector 获取更多细节。

使用 PIE 编程模型自定义算法#

如果内置算法无法满足需求,用户可以编写自己的算法。用户可以通过 graphscope 在纯 Python 模式 下使用 PIE 编程模型编写算法。

Workflow of PIE

为了实现自己的算法,用户需要实现此类。

from graphscope.analytical.udf.decorators import pie
from graphscope.framework.app import AppAssets

@pie(vd_type="double", md_type="double")
class YourAlgorithm(AppAssets):
    @staticmethod
    def Init(frag, context):
        pass

    @staticmethod
    def PEval(frag, context):
        pass

    @staticmethod
    def IncEval(frag, context):
        pass

如代码所示,用户需要实现一个以 @pie 装饰的类,并提供三个串行 图算法函数。其中,Initialize 函数用于设置算法初始状态,PEval 函数定义算法的局部计算, IncEval 函数定义对分区数据的增量计算。与 fragment 相关的完整 API 可以参考 Cython SDK API.

以单源最短路径算法 SSSP 为例,用户在 PIE 模型中定义的 SSSP 算法可如下所示。

from graphscope.analytical.udf.decorators import pie
from graphscope.framework.app import AppAssets

@pie(vd_type="double", md_type="double")
class SSSP_PIE(AppAssets):
    @staticmethod
    def Init(frag, context):
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            nodes = frag.nodes(v_label_id)
            # 初始化每个顶点的距离
            context.init_value(
                nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate
            )
            context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)

    @staticmethod
    def PEval(frag, context):
        # 从context中获取源顶点
        src = int(context.get_config(b"src"))
        graphscope.declare(graphscope.Vertex, source)
        native_source = False
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            if frag.get_inner_node(v_label_id, src, source):
                native_source = True
                break
        if native_source:
            context.set_node_value(source, 0)
        else:
            return
        e_label_num = frag.edge_label_num()
        # 在源顶点所在分区中,运行dijkstra算法作为部分计算
        for e_label_id in range(e_label_num):
            edges = frag.get_outgoing_edges(source, e_label_id)
            for e in edges:
                dst = e.neighbor()
                # 使用边上第三列数据作为两点之间的距离
                distv = e.get_int(2)
                if context.get_node_value(dst) > distv:
                    context.set_node_value(dst, distv)

    @staticmethod
    def IncEval(frag, context):
        v_label_num = frag.vertex_label_num()
        e_label_num = frag.edge_label_num()
        # 增量计算,更新最短距离
        for v_label_id in range(v_label_num):
            iv = frag.inner_nodes(v_label_id)
            for v in iv:
                v_dist = context.get_node_value(v)
                for e_label_id in range(e_label_num):
                    es = frag.get_outgoing_edges(v, e_label_id)
                    for e in es:
                        u = e.neighbor()
                        u_dist = v_dist + e.get_int(2)
                        if context.get_node_value(u) > u_dist:
                            context.set_node_value(u, u_dist)

如代码所示,用户仅需要设计和实现单分区的串行算法,而不需要考虑分布式环境中的分区通信和消息传递。 在这种情况下,经典的 Dijkstra 算法及其增量版本就可以用于在集群上的大规模图数据计算。

使用 Pregel 编程模型自定义算法#

除了基于子图的 PIE 模型之外,graphscope 也支持以顶点为中心的 Pregel 编程模型。 您可以通过实现以下算法类来在 Pregel 模型中开发算法。

from graphscope.analytical.udf.decorators import pregel
from graphscope.framework.app import AppAssets

@pregel(vd_type='double', md_type='double')
class YourPregelAlgorithm(AppAssets):

    @staticmethod
    def Init(v, context):
        pass

    @staticmethod
    def Compute(messages, v, context):
        pass

    @staticmethod
    def Combine(messages):
        pass

与 PIE 模型不同,Pregel 算法类的装饰器为 @pregel ,该类方法是 定义在顶点上的,而不同于 PIE 模型中定义在图分区上。 还是以 SSSP 为例,Pregel 模型下的算法如下所示。

# 装饰器, 定义顶点数据和消息数据的类型
@pregel(vd_type="double", md_type="double")
class SSSP_Pregel(AppAssets):
    @staticmethod
    def Init(v, context):
        v.set_value(1000000000.0)

    @staticmethod
    def Compute(messages, v, context):
        src_id = context.get_config(b"src")
        cur_dist = v.value()
        new_dist = 1000000000.0
        if v.id() == src_id:
            new_dist = 0
        for message in messages:
            new_dist = min(message, new_dist)
        if new_dist < cur_dist:
            v.set_value(new_dist)
            for e_label_id in range(context.edge_label_num()):
                edges = v.outgoing_edges(e_label_id)
                for e in edges:
                    v.send(e.vertex(), new_dist + e.get_int(2))
        v.vote_to_halt()

    @staticmethod
    def Combine(messages):
        ret = 1000000000.0
        for m in messages:
            ret = min(ret, m)
        return ret

自定义算法中使用 math.h 中的函数#

GraphScope 支持用户在自定义算法中通过 context.math 上的接口来使用定义在 math.h 中的 C 函数, 例如,下列代码,

@staticmethod
def Init(v, context):
    v.set_value(context.math.sin(1000000000.0 * context.math.M_PI))

会被翻译成如下的 C 代码以高效地执行,

... Init(...)

    v.set_value(sin(1000000000.0 * M_PI));

运行自定义算法#

运行自定义算法,用户需要在定义算法后调用算法。

import graphscope
from graphscope.dataset import load_p2p_network

g = load_p2p_network(generate_eid=False)

# 加载自己的算法
my_app = SSSP_Pregel()

# 在图上运行自己的算法,得到计算结果
# 这里 `src` 是与 `context.get_config(b"src")` 相对应的
ret = my_app(g, src="6")

在开发和测试之后,您可以通过 to_gar 方法将算法保存成 gar 包以备将来使用。

SSSP_Pregel.to_gar("/tmp/my_sssp_pregel.gar")

在此之后,您可以从 gar 包加载自定义的算法。

from graphscope.framework.app import load_app

# 从gar包中加载自己的算法
my_app = load_app("/tmp/my_sssp_pregel.gar")

# 在图上运行自己的算法,得到计算结果
ret = my_app(g, src="6")

运行Java编写的算法#

GraphScope 支持用户编写Java的PIE app,并且运行在图分析引擎上。我们首先通过一个简单的例子来演示如果在GraphScope 图分析引擎上运行一个Java的图算法(bfs),然后我们将展示如果实现并运行自定义的Java图算法。

运行示例的Java算法#

我们提供了一些经典图分析算法的示例实现,通过下面展示的例子,你可以尝试在GraphScope的图分析引擎上试着运行他们。首先你需要从下载我们提供的示例app的合集 grape-demo.jar <https://github.com/GraphScope/gstest/blob/master/jars/grape-demo-0.17.0-shaded.jar>_,无需任何更改你就可以在 GraphScope图分析引擎上运行这些示例算法。

然后你需要打开GraphScope的python client,尝试载图并且运行一个简单的bfs的算法。

import graphscope
from graphscope import JavaApp
from graphscope.dataset import load_p2p_network

"""Or launch session in k8s cluster"""
sess = graphscope.session(cluster_type='hosts')

graph = load_p2p_network(sess)

"""This app need to run on simple graph"""
graph = graph.project(vertices={"host": ['id']}, edges={"connect": ["dist"]})

sssp=JavaApp(
    full_jar_path="grape-demo.jar", # The path to the grape-demo.jar
    java_app_class="com.alibaba.graphscope.example.bfs.BFS",
)
ctx=sssp(graph,src=6)

"""Fetch the result via context"""
ctx.to_numpy("r:label0.dist_0")

使用Java编写用户自定义的图算法#

为了编写Java实现的图算法,用户需要借助于 grape-jdk。请参考 About Grape JDK 来将 grape-jdk 安装到你的本地环境上。

安装完成后,你需要将 grape-jdk 的依赖添加到你的maven项目依赖中。用户应当注意在 grape-jdk 的依赖配置上应当添加 classifier shaded ,来确保 所有必要的依赖都被包括到。

<dependency>
  <groupId>com.alibaba.graphscope</groupId>
  <artifactId>grape-jdk</artifactId>
  <version>0.1</version>
  <classifier>shaded</classifier>
</dependency>

用户在开发自己算法的过程中,可能会用到其他的第三方jar包。为了解决依赖jar包的版本问题,用户需要使用确保自己生成的jar包包含所有依赖的jar包。 例如,用户可以使用maven插件 maven-shade-plugin.

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
</plugin>

用户在自定义的图算法时,用户需要按照 PIE 来实现自定义算法, 并且需要根据需要的app类型来实现 grape-jdk 中相应的接口并且实现接口。 如果用户期望算法运行在属性图上,那么定义的app应该实现接口 DefaultPropertyAppBaseParallelPropertyAppBase。 如果用户期望算法运行在简单图上,那么定义的app应该实现借口 DefaultAppBaseParallelAppBase 同时用户需要实现app相应的 context,来保存跨SuperStep的数据,其应该是 DefaultPropertyContextBaseParallelPropertyContextBaseDefaultContextBase 或者 ParallelContextBase 的的子类。通过继承 VertexDataContext 或者 VertexPropertyContext 用户可以使用到不同类型的context所拥有的特性。通过这两种context提供的借口所存储的数据在程序执行完之后可以被用户访问,用户可以通过在python client中query返回的context 对象来访问这些数据。

这里我们展示一个简单的app的实现,它所做的事情只是对一个简单图的所有边进行了遍历。

public class Traverse implements ParallelAppBase<Long, Long, Double, Long, TraverseContext>,
    ParallelEngine {

    @Override
    public void PEval(IFragment<Long, Long, Double, Long> fragment,
        ParallelContextBase<Long, Long, Double, Long> context,
        ParallelMessageManager messageManager) {
        TraverseContext ctx = (TraverseContext) context;
        for (Vertex<Long> vertex : fragment.innerVertices()) {
            AdjList<Long, Long> adjList = fragment.getOutgoingAdjList(vertex);
            for (Nbr<Long, Long> nbr : adjList.iterator()) {
                Vertex<Long> dst = nbr.neighbor();
                //Update largest distance for current vertex
                ctx.vertexArray.setValue(vertex, Math.max(nbr.data(), ctx.vertexArray.get(vertex)));
            }
        }
        messageManager.ForceContinue();
    }

    @Override
    public void IncEval(IFragment<Long, Long, Double, Long> fragment,
        ParallelContextBase<Long, Long, Double, Long> context,
        ParallelMessageManager messageManager) {
        TraverseContext ctx = (TraverseContext) context;
        for (Vertex<Long> vertex : fragment.innerVertices()) {
            AdjList<Long, Long> adjList = fragment.getOutgoingAdjList(vertex);
            for (Nbr<Long, Long> nbr : adjList.iterator()) {
                Vertex<Long> dst = nbr.neighbor();
                //Update largest distance for current vertex
                ctx.vertexArray.setValue(vertex, Math.max(nbr.data(), ctx.vertexArray.get(vertex)));
            }
        }
    }
}

该app对应的context的实现:

public class TraverseContext extends
    VertexDataContext<IFragment<Long, Long, Double, Long>, Long> implements ParallelContextBase<Long,Long,Double,Long> {

    public GSVertexArray<Long> vertexArray;
    public int maxIteration;

    @Override
    public void Init(IFragment<Long, Long, Double, Long> frag,
        ParallelMessageManager messageManager, JSONObject jsonObject) {
        createFFIContext(frag, Long.class, false);
        //This vertex Array is created by our framework. Data stored in this array will be available
        //after execution, you can receive them by invoking method provided in Python Context.
        vertexArray = data();
        maxIteration = 10;
        if (jsonObject.containsKey("maxIteration")){
            maxIteration = jsonObject.getInteger("maxIteration");
        }
    }

    @Override
    public void Output(IFragment<Long, Long, Double, Long> frag) {
        //You can also write output logic in this function.
    }
}

在实现了自己的算法之后,用户可能会想在提交到GraphScope的图分析引擎运行前,先在本地验证算法实现的正确性。我们提供了一个简单的脚本来满足用户的 这个需求。为了验证算法实现的正确性,用户只需要运行以下命令:

python3 ${GRAPHSCOPE_REPO}/analytical_engine/java/java-app-runner.py
            --app=${app_class_name} --java_path=${path_to_your_jar}
            --param_str=${params_str}

其中, app_class_name 是用户自定义的Java app的类的全名,(i.e. com.xxx.Traverse), path_to_your_jar 指向包含用户想要运行的算法的jar包。 可以通过 param_str 来制定context初始化需要的参数,例如对于bfs算法可以使用 src=6,threadNum=1 来标记初始节点是6,并行线程数为1。例如,可以 通过如下命令来运行Traverse app。

cd ${GRAPHSCOPE_REPO}/analytical_engine/java/
python3 java-app-runner.py --app com.alibaba.graphscope.example.traverse.Traverse
            --jar_path /home/graphscope/GraphScope/analytical_engine/java/grape-demo/target/grape-demo-0.17.0-shaded.jar
            --arguments "maxIteration=10"

在本地验证自定义算法的正确性之后,你可以通过GraphScope的python client来提交运行jar包。一个jar包中可以包含不同的app实现,用户可以多次提交相同的jar包但是运行不同 的app。

import graphscope
from graphscope import JavaApp
from graphscope.dataset import load_p2p_network

"""Or launch session in k8s cluster"""
sess = graphscope.session(cluster_type='hosts')

graph = load_p2p_network(sess)
graph = graph.project(vertices={"host": ['id']}, edges={"connect": ["dist"]})

app=JavaApp(
    full_jar_path="{full/path/to/your/packed/jar}",
    java_app_class="{fully/qualified/class/name/of/your/app}",
)
ctx=app(graph, "${param string}")

请耐心等待计算完成,当计算完成后,你可以通过 Context 对象来获取计算结果。

相关论文