Create c++ Stored Procedures on GraphScope Interactive

Apart from adapting Cypher query as stored procedure, Interactive supports implementing stored procedure via c++ code, by calling the interfaces provided by Graph Database Engine.

Getting Started.

We take a example to demonstrate how to create a stored procedure in c++. In this example, we implement a procedure which returns the total number of vertices.

Before proceed, make sure you have installed and launched Interactive as describe in Getting Started, and environment variables are correctly exported.

Define a C++ Stored Procedure

A C++ stored procedure should be defined in a file, for example count_vertices.cc.

#include "flex/engines/graph_db/app/app_base.h"
#include "flex/engines/graph_db/database/graph_db_session.h"
#include "flex/utils/app_utils.h"

namespace gs {

// A sample app get the count of the specified vertex label, since no write
// operations are needed we inherit from ReadAppBase. Otherwise you could
// inherit from WriteAppBase.
class CountVertices : public ReadAppBase {
public:
  CountVertices() {}
  /**
   * @brief Query function for query class.
   * @param sess: GraphDBSession The interface where you can visit the graph.
   * @param input: Decoder From where you could deserialize the input
   * parameters.
   * @param output: Encoder To where you should encode the output parameters.
   */
  bool Query(const gs::GraphDBSession &sess, Decoder &input,
             Encoder &output) override {
    // First get the read transaction.
    auto txn = sess.GetReadTransaction();
    // We expect one param of type string from decoder.
    if (input.empty()) {
      return false;
    }
    std::string label_name{input.get_string()};
    const auto &schema = txn.schema();
    if (!schema.has_vertex_label(label_name)) {
      return false; // The requested label doesn't exits.
    }
    auto label_id = schema.get_vertex_label_id(label_name);
    // The vertices are labeled internally from 0 ~ vertex_label_num, accumulate
    // the count.
    output.put_int(txn.GetVertexNum(label_id));
    txn.Commit();
    return true;
  }
};
} // namespace gs

extern "C" {

// Defines how a instance of your procedure is created.
void *CreateApp(gs::GraphDBSession &db) {
  gs::CountVertices *app = new gs::CountVertices();
  return static_cast<void *>(app);
}

// Defines how a instance of your procedure should be deleted.
void DeleteApp(void *app) {
  gs::CountVertices *casted = static_cast<gs::CountVertices *>(app);
  delete casted;
}
}

Register and Call the stored procedure

With the above CountVertices procedure defined, we could create a stored procedure from Interactive Python/Java SDK, or gsctl.

Python SDK

With Interactive Python SDK Installed, you could easily create the stored procedure via the following code.

import os
from gs_interactive.client.driver import Driver
from gs_interactive.models import *

driver = Driver() # connecting to Interactive service, assuming environment variables like INTERACTIVE_ADMIN_ENDPOINT have been correctly exported.
sess = driver.session() # create the session

# Read the content of the procedure <count_vertices.cc>
app_path='/path/to/count_vertices.cc' # Replace the path with the real path to count_vertices.cc
if not os.path.exists(app_path):
    raise Exception("count_vertices.cc not found")
with open(app_path, "r") as f:
    app_content = f.read()

# we try to create the procedure on default graph, you may try to create on your customized graph by changing the graph_id 
graph_id = '1'

create_proc_request = CreateProcedureRequest(
    name="count_vertices",
    description="Count vertices for the specified vertex label name",
    query=app_content,
    type="cpp",
)
resp = sess.create_procedure(graph_id, create_proc_request)
print(resp)
assert resp.is_ok()

For more tails about Python SDK Interface, please refer to Java SDK Procedure API.

Java SDK

With Interactive Java SDK Installed, you could easily create the stored procedure via the following code.

import com.alibaba.graphscope.interactive.client.Driver;
import com.alibaba.graphscope.interactive.client.Session;
import com.alibaba.graphscope.interactive.models.*;
import com.alibaba.graphscope.interactive.client.common.Result;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.io.IOException;
import java.net.URISyntaxException;

import org.junit.jupiter.api.Test;

public class CreateProcedureTest{
    public static void createProcedure(Session sess, String graphId) {
        CreateProcedureRequest procedure = new CreateProcedureRequest();
        procedure.setName("count_vertices");
        procedure.setDescription("Count vertices for the specified vertex label name");
        String appFilePath = "/path/to/count_vertices.cc"; // Please replace with the real path to count_vertices.cc
        // check file exist
        if (Files.notExists(Paths.get(appFilePath))) {
            throw new RuntimeException("sample app file not exist");
        }
        String appFileContent = "";
        try {
            appFileContent =
                    new String(
                            Files.readAllBytes(Paths.get(appFilePath)));
        } catch (IOException e) {
            e.printStackTrace();
        }
        if (appFileContent.isEmpty()) {
            throw new RuntimeException("sample app content is empty");
        }
        procedure.setQuery(appFileContent);
        procedure.setType(CreateProcedureRequest.TypeEnum.CPP);
        Result<CreateProcedureResponse> resp = sess.createProcedure(graphId, procedure);
        if (!resp.isOk()) {
            throw new RuntimeException("Fail to create procedure" + resp.getStatusMessage());
        }
    }

    public static void main(String[] args) {
        Driver driver = Driver.connect();
        Session sess = driver.session();
        createProcedure(sess, "1");
        return ;
    }
}

For more tails about Java SDK Interface, please refer to Java SDK Procedure API.

gsctl

TODO

Graph Database Engine

Interactive follow the design of Transaction, user should either make use of ReadTransaction, InsertTraction or UpdateTransaction.

We recommend you to read though the code of GraphDB for more detail, if you want to write a stored procedure with best performance. If you encounter any problems, feel free to contact us by submit issue or create discussions.

Query Input and Output

Interactive natively support two kind of protocols for param encoding and result decoding, Encoder/Decoder and CypherApp.

Encoder/Decoder

Encoder/Decoder based method provides the best performance. The serialization/deserialization could be customized by the user. In this serialization protocol, User need to take care of the param encoding and decoding himself. For example, the example procedure above count_vertices.cc use Encoder/Decoder to encode the input and decode the output.

Here is an example of how to query the count_vertices procedure using Interactive Java SDK and Python SDK. Please note that you need to switch to the graph with ID “1” in order to make the procedure callable.

import com.alibaba.graphscope.interactive.client.Driver;
import com.alibaba.graphscope.interactive.client.Session;
import com.alibaba.graphscope.interactive.models.*;
import com.alibaba.graphscope.interactive.client.common.Result;
import com.alibaba.graphscope.interactive.client.utils.Encoder;
import com.alibaba.graphscope.interactive.client.utils.Decoder;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.io.IOException;
import java.net.URISyntaxException;

import org.junit.jupiter.api.Test;

public class CreateProcedureTest{
    public static void callProcedure(Session sess, String graphId, String procedureName, String labelName) {
        byte[] bytes = new byte[1 + 4 + labelName.length()]; // 1 byte for procedure index, 4 bytes for label length, and labelName.length() bytes for label name
        Encoder encoder = new Encoder(bytes);
        encoder.put_string(labelName);
        encoder.put_byte((byte) 1); // Assume the procedure index is 1
        Result<byte[]> resp = sess.callProcedureRaw(graphId, bytes);
        if (!resp.isOk()) {
            throw new RuntimeException("Fail to call procedure" + resp.getStatusMessage());
        }
        Decoder decoder = new Decoder(resp.getValue());
        int count = decoder.get_int();
        System.out.println("Count of vertices with label " + labelName + " is " + count); // should be 4
    }

    public static void startServiceOnGraph(Session sess, String graphId) {
        Result<StartServiceResponse> resp = sess.startService(new StartServiceRequest().graphId(graphId));
        if (!resp.isOk()) {
            throw new RuntimeException("Fail to start service" + resp.getStatusMessage());
        }
    }

    public static void main(String[] args) {
        Driver driver = Driver.connect();
        Session sess = driver.session();
        startServiceOnGraph(sess, "1"); // Procedure is only runnable after service has been switched to graph 1
        callProcedure(sess, "1", "count_vertices", "person"); // count how many vertices are labeled with person.
        return ;
    }
}
import os
from gs_interactive.client.driver import Driver
from gs_interactive.models import *
from gs_interactive.client.utils import *

driver = Driver() # connecting to Interactive service, assuming environment variables like INTERACTIVE_ADMIN_ENDPOINT have been correctly exported.
sess = driver.session() # create the session

# Use a encoder to encode input request
encoder = Encoder()
encoder.put_string("person") # input label name
encoder.put_byte(1) # procedure id 1
resp = sess.call_procedure_raw(graph_id="1", params=encoder.get_bytes())
assert resp.is_ok()