图分析引擎

GraphScope 的图分析引擎继承了 GRAPE , 该系统于 SIGMOD2017 上首次提出并获得最佳论文奖。

与以往的系统的不同,GRAPE 支持将串行图算法自动并行化。在 GRAPE 中, 只需进行很小的更改即可轻松地将串行算法即插即用,使其并行化的运行在分布式环境,并高效地处理大规模图数据。 除了易于编程外,GRAPE 还被设计为高效且可拓展的系统,可灵活应对现实中图应用多变的规模、多样性和复杂性。

内置算法

GraphScope 图分析引擎内置了许多常用的图分析算法,包括连通性分析算法、路径分析算法、社区检测和中心度计算等类型。

内置算法可以在图上轻松调用。例如,

from graphscope import pagerank
from graphscope import lpa

# algorithms defined on property graph can be invoked directly.
# 定义在属性图上的算法可以直接调用。
result = lpa(g)

# 其他一些算法可能只支持在简单图上进行计算,因此我们需要先通过顶点和边的类型来生成一个简单图。
simple_g = g.project_to_simple(v_label="users", e_label="follows")
result_pr = pagerank(simple_g)

内置算法的完整列表如下所示。具体某个算法是否支持属性图也在其文档进行了描述。

算法的支持列表会随着不断增加持续更新中。

对计算结果的处理

当完成一次图计算,计算结果会被包装成 Context 类,保存在分布式集群的内存中。

用户可能希望将结果传到客户端进行处理,或是写入云中某位置或分布式文件系统。GraphScope 支持用户通过以下方法来获取结果数据。

# 转化为相应数据类型
result_pr.to_numpy()
result_pr.to_dataframe()

# 或写入 hdfs、oss, 或本地目录中(pod中的本地目录)
result_pr.output("hdfs://output")
result_pr.output("oss://id:key@endpoint/bucket/object")
result_pr.output("file:///tmp/path")

# 或写入本地的client中
result_pr.output_to_client("local_filename")

# 或 seal to vineyard
result_pr.to_vineyard_dataframe()
result_pr.to_vineyard_numpy()

此外,如 :ref: 快速上手 中所示,用户可以将计算结果加回到该图数据中作为顶点(边)的新属性(列)。

simple_g = sub_graph.project_to_simple(vlabel="paper", elabel="cites")
ret = graphscope.kcore(simple_g, k=5)

# 将结果作为新列添加到citation图中
subgraph = sub_graph.add_column(ret, {'kcore': 'r'})

用户可以通过选择器( Selector )来定义将计算结果中的哪些部分写回图数据。 选择器指定了计算结果中的哪一部分会被处理。类似的,图数据也可以作为被处理数据的一部分,例如顶点ID。 我们为选择器保留了三个关键字:r 代表结果,ve 分别代表顶点和边。 以下是结果处理中选择器的一些示例。

# 获取顶点上的结果
result_pr.to_numpy('r')

# 转换为 dataframe,
# 使用顶点的 `id` 作为名为 df_v 的列
# 使用顶点的 `data` 作为名为 df_vd 的列
# 使用结果列作为名为 df_result 的列
result_pr.to_dataframe({'df_v': 'v.id', 'df_vd': 'v.data', 'df_result': 'r'})

# using the property0 written on vertices with label0 as column `result`
# 对于属性图的结果
# 使用 `:` 作为v和e的标签选择器
# 将 label0 顶点的 id (`v:label0.id`)作为 `id` 列
# 使用写在带有label0的顶点上的property0作为`result`列
result.output(fd='hdfs:///gs_data/output', \
        selector={'id': 'v:label0.id', 'result': 'r:label0.property0'})

可以查看 ContextSelector 获取更多细节。

使用 PIE 编程模型自定义算法

如果内置算法无法满足需求,用户可以编写自己的算法。用户可以通过 graphscope 在纯 Python 模式 下使用 PIE 编程模型编写算法。

Workflow of PIE

为了实现自己的算法,用户需要实现此类。

@graphscope.analytical.udf.pie
class YourAlgorithm(AppAssets):
    @staticmethod
    def Initialize(context, frag):
        pass

    @staticmethod
    def PEval(context, frag):
        pass

    @staticmethod
    def IncEval(context, frag):
        pass

如代码所示,用户需要实现一个以 @graphscope.analytical.udf.pie 装饰的类,并提供三个串行 图算法函数。其中,Initialize 函数用于设置算法初始状态,PEval 函数定义算法的局部计算, IncEval 函数定义对分区数据的增量计算。与 fragment 相关的完整 API 可以参考 Cython SDK API

以单源最短路径算法 SSSP 为例,用户在 PIE 模型中定义的 SSSP 算法可如下所示。

@graphscope.analytical.udf.pie
class SSSP:
    @staticmethod
    def Initialize(context, frag):
        v_label_num = frag.vertex_label_num()
        # 初始化每个顶点的距离
        for v_label_id in range(v_label_num):
            nodes = frag.nodes(v_label_id)
            context.init_value(nodes, v_label_id, 1000000000.0,
                           PIEAggregateType.kMinAggregate)
        context.register_sync_buffer(MessageStrategy.kSyncOnOuterVertex)

    @staticmethod
    def PEval(context, frag):
        # 从context中获取源顶点
        src = int(context.get_config(b'src'))
        graphscope.declare(graphscope.Vertex, source)
        native_source = False
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            if frag.get_inner_node(v_label_id, src, source):
                native_source = True
                break
        if native_source:
            context.set_node_value(source, 0)
        else:
            return

        # 在源顶点所在分区中,运行dijkstra算法作为部分计算
        e_label_num = frag.edge_label_num()
        for e_label_id in range(e_label_num):
            edges = frag.get_outgoing_edges(source, e_label_id)
            for e in edges:
                dst = e.neighbor()
                distv = e.get_int(2)
                if context.get_node_value(dst) > distv:
                    context.set_node_value(dst, distv)

    @staticmethod
    def IncEval(context, frag):
        v_label_num = frag.vertex_label_num()
        e_label_num = frag.edge_label_num()
        # 增量计算,更新最短距离
        for v_label_id in range(v_label_num):
            iv = frag.inner_nodes(v_label_id)
            for v in iv:
                v_dist = context.get_node_value(v)
                for e_label_id in range(e_label_num):
                    es = frag.get_outgoing_edges(v, e_label_id)
                    for e in es:
                        u = e.neighbor()
                        u_dist = v_dist + e.get_int(2)
                        if context.get_node_value(u) > u_dist:
                            context.set_node_value(u, u_dist)

如代码所示,用户仅需要设计和实现单分区的串行算法,而不需要考虑分布式环境中的分区通信和消息传递。 在这种情况下,经典的 Dijkstra 算法及其增量版本就可以用于在集群上的大规模图数据计算。

使用 Pregel 编程模型自定义算法

除了基于子图的 PIE 模型之外,graphscope 也支持以顶点为中心的 Pregel 编程模型。 您可以通过实现以下算法类来在 Pregel 模型中开发算法。

@pregel(vd_type='double', md_type='double')
class YourPregelAlgorithm(AppAssets):

    @staticmethod
    def Init(v, context):
        pass

    @staticmethod
    def Compute(messages, v, context):
        pass

    @staticmethod
    def Combine(messages):
        pass

与 PIE 模型不同,Pregel 算法类的装饰器为 @graphscope.analytical.udf.pregel ,该类方法是 定义在顶点上的,而不同于 PIE 模型中定义在图分区上。 还是以 SSSP 为例,Pregel 模型下的算法如下所示。

# 装饰器, 定义顶点数据和消息数据的类型
@pregel(vd_type='double', md_type='double')
class SSSP_Pregel(AppAssets):

    @staticmethod
    def Init(v, context):
        v.set_value(1000000000.0)

    @staticmethod
    def Compute(messages, v, context):
        src_id = context.get_config(b"src")
        cur_dist = v.value()
        new_dist = 1000000000.0
        if v.id() == src_id:
            new_dist = 0
        for message in messages:
            new_dist = min(message, new_dist)
        if new_dist < cur_dist:
            v.set_value(new_dist)
            for e_label_id in range(context.edge_label_num()):
                edges = v.outgoing_edges(e_label_id)
                for e in edges:
                    v.send(e.vertex(), new_dist + e.get_int(2))
        v.vote_to_halt()

    @staticmethod
    def Combine(messages):
        ret = 1000000000.0
        for m in messages:
            ret = min(ret, m)
        return ret

运行自定义算法

运行自定义算法,用户需要在定义算法后调用算法。

import graphscope

sess = graphscope.session()
g = sess.load_from("...")

# 加载自己的算法
my_app = SSSP_Pregel()

# 在图上运行自己的算法,得到计算结果
ret = my_app(g, source="0")

在开发和测试之后,您可以通过 to_gar 方法将算法保存成 gar 包以备将来使用。

SSSP_Pregel.to_gar("file:///var/graphscope/udf/my_sssp_pregel.gar")

在此之后,您可以从 gar 包加载自定义的算法。

import graphscope

sess = graphscope.session()
g = sess.load_from("...")

# 从gar包中加载自己的算法
my_app = load_app('SSSP_Pregel', 'file:///var/graphscope/udf/my_sssp_pregel.gar')

# 在图上运行自己的算法,得到计算结果
ret = my_app(g, src="0")

相关论文