Groot: Persistent Graph Store#
Overview#
In addition to Vineyard, the in-memory columnar graph store supported in GraphScope, we also have a disk-based, row-oriented, multi-versioned, persistent graph store. While Vineyard focuses on great support for in-memory whole graph analytics workloads, the persistent graph store is geared towards better supporting continuous graph data management services that frequently update the graph and answer traversal queries.
The store is a distributed graph store built on top of the popular RocksDB key-value store. It adopts a row-oriented design to support frequent small updates to the graph. Each row is tagged with a snapshot ID as its version. A query reads the most recent version of each row relative to the snapshot ID at the time it starts, and is therefore not blocked by concurrent writes. For writes, we trade some consistency for higher throughput: writes in the same session can be grouped and executed atomically as a unit; the store assigns each group a snapshot ID (a low-resolution timestamp of the current time), executes groups in the order of their snapshot IDs, and applies a deterministic (though arbitrary) order to groups that share the same snapshot ID. This provides high write throughput while still maintaining some degree of order and isolation, although it is weaker than the strict snapshot isolation common in databases. We hope this design choice offers an interesting trade-off for practical usage.
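To make this versioning model concrete, here is a toy Python sketch. It is purely illustrative; the VersionedStore class and its clock are hypothetical stand-ins, not Groot's API or internals. It shows how grouped writes are versioned by a low-resolution snapshot ID and how a reader stays on the snapshot that was current when its query started:
# Purely illustrative: a toy multi-versioned row store, NOT Groot's implementation.
import itertools
from collections import defaultdict

class VersionedStore:
    def __init__(self, clock):
        self.clock = clock                 # returns a low-resolution timestamp
        self.rows = defaultdict(list)      # key -> [(snapshot_id, value), ...]

    def write_group(self, kv_pairs):
        """Apply a group of writes atomically under a single snapshot ID.
        Groups that fall into the same snapshot ID are applied in a
        deterministic (though arbitrary) order."""
        sid = self.clock()
        for key, value in kv_pairs:
            self.rows[key].append((sid, value))
        return sid

    def read(self, key, snapshot_id):
        """Return the newest value whose version is <= snapshot_id."""
        visible = [v for sid, v in self.rows[key] if sid <= snapshot_id]
        return visible[-1] if visible else None

# A counter stands in for the low-resolution wall clock.
store = VersionedStore(clock=itertools.count(100).__next__)
s1 = store.write_group([("person:1", {"name": "Alice"})])
s2 = store.write_group([("person:1", {"name": "Alice Smith"})])
# A query that started at snapshot s1 keeps reading the older row and is
# therefore not blocked by the later write at s2.
print(store.read("person:1", s1))   # {'name': 'Alice'}
print(store.read("person:1", s2))   # {'name': 'Alice Smith'}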
Known Limitation#
Initially, the new persistent store is provided as a separate option from Vineyard, and it can accept Gremlin queries for data access. Going forward we hope to evolve them into an integrated hybrid graph store suitable for all kinds of workloads.
Deploy Groot#
We use Helm to deploy Groot on a Kubernetes cluster.
Prerequisites#
Kubernetes 1.21+
Helm 3.2.0+
If you don’t have a Kubernetes cluster, you can create a local one by using Docker Desktop, Minikube, or Kind.
Refer to the guide on deploying GraphScope on a self-managed k8s cluster for more details.
Installation#
Install from ArtifactHub#
The latest stable version of Groot can be installed from ArtifactHub by using the following command:
helm repo add graphscope https://graphscope.oss-cn-beijing.aliyuncs.com/charts/
helm repo update
helm install demo graphscope/graphscope-store
Installing from a local directory#
If you want to apply the latest updates or modify some files, you can clone the GraphScope repository and install Groot from a local directory by using the following commands:
cd GraphScope/charts/graphscope-store
helm dependency update # fetch the dependency charts
helm install demo .
The above commands will deploy Groot with the default configuration. The configurable items during installation can be found in the Common Configurations section.
It may take some time for the Groot service to be available because the image needs to be pulled for the first time. You can check if the service is available by using the following command:
helm test demo
Helm will print a statement on the console that you can copy and execute to get the connection address.
You can also check the deployment status and get the connection address by using the following command:
helm status demo
Common Configurations#
| Name | Description | Default value |
| --- | --- | --- |
| image.registry | Image registry | |
| image.repository | Image repository | graphscope/graphscope-store |
| image.tag | Image tag, defaults to the version of the chart | "" |
| auth.username | Username. If empty, no authentication is enabled | "" |
| auth.password | Password | "" |
| store.replicaCount | Number of Store Pods | 2 |
| dataset.modern | Load the modern graph dataset at startup | false |
| frontend.replicaCount | Number of Frontend Pods | 1 |
| frontend.service.type | Kubernetes Service type of the Frontend | NodePort |
| frontend.query.per.second.limit | Maximum QPS that the Frontend service can handle | 2147483647 (no limit) |
If Groot is launched with the default configuration, then two Store Pods, one Frontend Pod, and one Coordinator Pod will be started. The number of Coordinator nodes is fixed to 1.
Use the --set key=value[,key=value] option of helm install to set configuration parameters, for example:
helm install demo graphscope/graphscope-store \
--set auth.username=admin,auth.password=123456
The aforementioned command configures the username and password required for connecting to the cluster.
When many parameters need to be set, the --set option becomes difficult to manage. In such cases, you can specify the parameters in a YAML file, as shown below:
helm install demo graphscope/graphscope-store -f settings.yaml
A sample configuration for settings.yaml is like the following:
# cat settings.yaml
---
image:
  tag: latest
auth:
  username: admin
  password: 123456
It will specify the image tag to be pulled as latest while setting the username and password.
Connecting to Groot#
Upon installing Groot, an empty graph is created by default. We can execute connections, define graph models, load data, and perform queries using the Gremlin Query Language.
Connection#
Executing the command printed by Helm in the previous step sets the connection information in environment variables. The following code reads them and connects to Groot:
import os
import graphscope
node_ip = os.environ["NODE_IP"]
grpc_port = os.environ["GRPC_PORT"]
gremlin_port = os.environ["GREMLIN_PORT"]
grpc_endpoint = f"{node_ip}:{grpc_port}"
gremlin_endpoint = f"{node_ip}:{gremlin_port}"
conn = graphscope.conn(grpc_endpoint, gremlin_endpoint)
In case a username and password were configured during the installation process, they will need to be provided when establishing a connection.
conn = graphscope.conn(grpc_endpoint, gremlin_endpoint, username="admin", password="123456")
Building and Modifying Graph Models#
The graph object can be obtained through the conn object.
graph = conn.g()
# Create schema
schema = graph.schema()
Using Built-in Datasets#
If dataset.modern=true is set during installation, Groot will load a simple example dataset for a quick start.
Note
Not supported at this moment
Customizing Models and Datasets#
Users can also customize models and load their own datasets.
Common statements used to define graph models are as follows:
schema.add_vertex_label('v_label_name').add_primary_key('pk_name', 'type').property('prop_name_1', 'type').property('prop_name_2', 'type')
schema.add_edge_label('e_label_name').source('src_label').destination('dst_label').property('prop_name_3', 'type')
schema.drop('label')
schema.drop('label', 'src_label', 'dst_label')
schema.update()
A graph model defines several labels, each with a label name and several properties (.property()). Vertex labels can define primary keys (.add_primary_key()), and edge labels need to define a source label (.source()) and a destination label (.destination()). .drop() is used to delete a label, and .update() submits the changes as a transaction.
Here is an example of a simple model that defines the relationship between people who know each other, with the labels person -> knows <- person. The model includes:
A person label, which has a primary key named id of type long and a property named name of type str.
A knows label, which has a property named date of type str, with the source and destination labels both being person.
schema.add_vertex_label("person").add_primary_key("id", "long").add_property(
"name", "str"
)
schema.add_edge_label("knows").source("person").destination("person").add_property(
"date", "str"
)
schema.update()
Querying Data#
Python#
Using the connection information obtained earlier, we can perform Gremlin queries in Python.
g = conn.gremlin()
print(g.V().count().toList())
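Once data has been written (see the Data Import section below), richer traversals can be issued the same way. For example, with the person/knows schema defined above, queries like the following should work; the vertex values shown here are assumptions about what has been loaded:
# Assumes the person/knows schema defined earlier and some loaded data.
print(g.V().hasLabel("person").valueMap().toList())
print(g.V().has("person", "name", "Alice").out("knows").values("name").toList())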
Alternatively, we can directly retrieve the Gremlin IP address and port from the connection information and use the gremlinpython library for querying.
Install the gremlinpython package:
pip install gremlinpython --user
Copy the following code and set the endpoint to the connection information obtained earlier:
import os
from gremlin_python.driver.client import Client
endpoint = f"{os.environ['NODE_IP']}:{os.environ['GREMLIN_PORT']}"
graph_url = f"ws://{endpoint}/gremlin"
username = "<username>"
password = "<password>"
client = Client(
graph_url,
"g",
username=username, # If auth enabled
password=password, # If auth enabled
)
print(client.submit("g.V().limit(2)").all().result())
client.close()
Java#
Create a directory structure as follows, where pom.xml and Main.java are files:
gremlin
├── pom.xml
└── src
    └── main
        └── java
            └── org
                └── example
                    └── Main.java
Configure pom.xml as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>gremlin</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>GremlinExample</name>
    <url>https://maven.apache.org</url>

    <dependencies>
        <dependency>
            <groupId>org.apache.tinkerpop</groupId>
            <artifactId>gremlin-driver</artifactId>
            <version>3.6.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.10.1</version>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <executable>java</executable>
                    <arguments>
                        <argument>-classpath</argument>
                        <classpath/>
                        <argument>org.example.Main</argument>
                    </arguments>
                    <mainClass>org.example.Main</mainClass>
                    <complianceLevel>1.11</complianceLevel>
                    <killAfter>-1</killAfter>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Configure Main.java as follows:
package org.example;

import org.apache.tinkerpop.gremlin.driver.*;

public class Main {
    public static void main(String[] args) {
        Cluster.Builder builder = Cluster.build();
        builder.addContactPoint("127.0.0.1");
        builder.port(8182);
        builder.credentials("username", "password");

        Cluster cluster = builder.create();
        Client client = cluster.connect();
        ResultSet results = client.submit("g.V().limit(3).valueMap()");
        for (Result result : results) {
            System.out.println(result.getObject());
        }
        client.close();
        cluster.close();
    }
}
Execute the program:
mvn compile exec:exec
Node.js#
Install the gremlin package for JavaScript:
npm install gremlin
Execute the following code:
const gremlin = require('gremlin');
const DriverRemoteConnection = gremlin.driver.DriverRemoteConnection;
const Graph = gremlin.structure.Graph;

// Build the Gremlin endpoint from the same environment variables used earlier.
const gremlin_endpoint = `${process.env.NODE_IP}:${process.env.GREMLIN_PORT}`;
const graph_url = `ws://${gremlin_endpoint}/gremlin`;
const remoteConn = new DriverRemoteConnection(graph_url, {});

const graph = new Graph();
const g = graph.traversal().withRemote(remoteConn);

g.V().limit(2).count().next().
    then(data => {
        console.log(data);
        remoteConn.close();
    }).catch(error => {
        console.log('ERROR', error);
        remoteConn.close();
    });
Since we have only defined the schema and there is no data yet, the query result will be empty. The next step is to load data.
Data Import#
There are two methods for importing data. One method is to batch import data from external storage (such as HDFS) using an offline import tool, and the other is to perform real-time writing using statements provided by the SDK.
Note: Offline import will overwrite the full data of the imported label.
Offline Import#
Prerequisite#
Hadoop Cluster
Data import tool data_load.tar.gz
Extract data_load.tar.gz; data_load/bin/load_tool.sh is the tool that will be used below.
tar xzvf data_load.tar.gz
Data Format#
Source data needs to be stored in HDFS in a certain format. Each file includes data related to a type of vertex or edge label.
The following is an example of the data for the person vertex label and the knows edge label, which form the person -> knows <- person relationship.
person.csv
id|name
1000|Alice
1001|Bob
person_knows_person.csv
person_id|person_id_1|date
1000|1001|20210611151923
The first line of the data file is a header that describes the key of each field. The header is not required; if there is no header in the data file, you need to set skip.header to false in the data building process (for details, see the parameter descriptions in “Building a partitioned graph”).
The remaining lines are the data records, one record per line. Data fields are separated by a custom separator (“|” in the example above). In the vertex data file person.csv, the id field and the name field are the primary key and the property of the vertex type person, respectively. In the edge data file person_knows_person.csv, the person_id field is the primary key of the source vertex, the person_id_1 field is the primary key of the destination vertex, and date is the property of the edge type knows.
All the data fields will be parsed according to the data types defined in the graph schema. If an input data field cannot be parsed correctly, the data building process will fail with corresponding errors.
Loading Process#
The loading process contains three steps:
A partitioned graph is built from the source files and stored in the same HDFS using a MapReduce job
The graph partitions are loaded into the store servers (in parallel)
Commit to the online service so that data is ready for serving queries
1. Building a partitioned graph#
Build the data by running the Hadoop MapReduce job with the following command:
$ ./load_tool.sh build <path/to/config/file>
The config file should follow a format recognized by Java's java.util.Properties class. Here is an example:
split.size=256
separator=\\|
input.path=/tmp/ldbc_sample
output.path=/tmp/data_output
graph.endpoint=1.2.3.4:55555
column.mapping.config={"person_0_0.csv":{"label":"person","propertiesColMap":{"0":"id","1":"name"}},"person_knows_person_0_0.csv":{"label":"knows","srcLabel":"person","dstLabel":"person","srcPkColMap":{"0":"id"},"dstPkColMap":{"1":"id"},"propertiesColMap":{"2":"date"}}}
skip.header=true
load.after.build=true
# This is not required when load.after.build=true
# hadoop.endpoint=127.0.0.1:9000
Details of the parameters are listed below:
| Config key | Required | Default | Description |
| --- | --- | --- | --- |
| split.size | false | 256 | Hadoop MapReduce input data split size in MB |
| separator | false | \| | Separator used to parse each field in a line |
| input.path | true | - | Input HDFS dir |
| output.path | true | - | Output HDFS dir |
| graph.endpoint | true | - | RPC endpoint of the graph storage service. You can get the RPC endpoint following this document: GraphScope Store Service |
| column.mapping.config | true | - | Mapping info for each input file in JSON format. Each key in the first level should be a file name that can be found in the input.path, and its value defines how the columns map to labels and properties (see the example above) |
| skip.header | false | true | Whether to skip the first line of the input file |
| load.after.build | false | false | Whether to immediately ingest and commit the built files |
| hadoop.endpoint | false | - | Endpoint of the Hadoop cluster in the format of host:port. Not required when load.after.build=true |
After the data building completes, you can find the output files in the output.path of HDFS. The output includes a meta file named META, an empty file named _SUCCESS, and one data file per partition, named in the pattern part-r-xxxxx.sst. The layout of the output directory should look like:
/tmp/data_output
|- META
|- _SUCCESS
|- part-r-00000.sst
|- part-r-00001.sst
|- part-r-00002.sst
...
If load.after.build=true, you can skip steps 2 and 3. Otherwise, please proceed to ingest and commit.
2. Loading graph partitions#
Now ingest the offline built data into the graph storage. Run:
$ ./load_tool.sh ingest <path/to/config/file>
The offline built data can be ingested successfully only once; otherwise, errors will occur.
3. Commit to store service#
After the data is ingested into the graph storage, you need to commit the data loading. The data cannot be read until the commit succeeds. Run:
$ ./load_tool.sh commit <path/to/config/file>
Note: Later committed data will overwrite earlier committed data that has the same vertex types or edge relations.
Realtime Write#
Groot provides several methods for realtime write, as follows:
Python#
Refer to test_store_service.py for examples.
# Inserts one vertex
def insert_vertex(self, vertex: VertexRecordKey, properties: dict) -> int: pass
# Inserts a list of vertices
def insert_vertices(self, vertices: list) -> int: pass
# Update one vertex to new properties
def update_vertex_properties(self, vertex: VertexRecordKey, properties: dict) -> int: pass
# Delete one vertex
def delete_vertex(self, vertex_pk: VertexRecordKey) -> int: pass
# Delete a list of vertices
def delete_vertices(self, vertex_pks: list) -> int: pass
# Insert one edge
def insert_edge(self, edge: EdgeRecordKey, properties: dict) -> int: pass
# Insert a list of edges
def insert_edges(self, edges: list) -> int: pass
# Update one edge to new properties
def update_edge_properties(self, edge: EdgeRecordKey, properties: dict) -> int: pass
# Delete one edge
def delete_edge(self, edge: EdgeRecordKey) -> int: pass
# Delete a list of edges
def delete_edges(self, edge_pks: list) -> int: pass
# Make sure the snapshot is available
def remote_flush(self, snapshot_id: int): pass
We use two utility classes, VertexRecordKey and EdgeRecordKey, to denote the keys that uniquely identify a record.
class VertexRecordKey:
    """Unique identifier of a vertex.
    The primary key may be a dict, where the key is the property name
    and the value is the data.
    """

    def __init__(self, label, primary_key):
        self.label: str = label
        self.primary_key: dict = primary_key


class EdgeRecordKey:
    """Unique identifier of an edge.
    The `eid` is a system-generated unsigned integer and is required in
    update and delete operations. Users need to obtain it by other means,
    such as a Gremlin query.
    """

    def __init__(self, label, src_vertex_key, dst_vertex_key, eid=None):
        self.label: str = label
        self.src_vertex_key: VertexRecordKey = src_vertex_key
        self.dst_vertex_key: VertexRecordKey = dst_vertex_key
        self.eid: int = eid  # Only required in update and delete operations
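As a minimal usage sketch, inserting the person/knows data defined earlier could look like the following. The import path and the object exposing these methods follow test_store_service.py and should be treated as assumptions; adjust them to your GraphScope version if they differ.
# Minimal realtime-write sketch; the import path, the receiver object, and the
# [key, properties] list format are assumptions based on test_store_service.py.
from graphscope.framework.record import EdgeRecordKey, VertexRecordKey

graph = conn.g()  # `conn` is the connection created earlier

# Insert two person vertices; each entry is assumed to be [key, properties].
snapshot_id = graph.insert_vertices(
    [
        [VertexRecordKey("person", {"id": 1000}), {"name": "Alice"}],
        [VertexRecordKey("person", {"id": 1001}), {"name": "Bob"}],
    ]
)

# Insert a knows edge between them.
snapshot_id = graph.insert_edge(
    EdgeRecordKey(
        "knows",
        VertexRecordKey("person", {"id": 1000}),
        VertexRecordKey("person", {"id": 1001}),
    ),
    {"date": "20210611151923"},
)

# Block until the written snapshot is visible to subsequent queries.
graph.remote_flush(snapshot_id)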
Java#
We also have a Java SDK for realtime write and schema management. Its APIs include:
Create and inspect graph schema
Insert / delete / update vertices
Insert / delete / update edges
Clear properties of vertices or edges by property name
Refer to RealtimeWrite.java for examples.
Other features#
Groot allows users to replay realtime write records from a specific offset or timestamp. This is useful when you want to restore records written before an offline load finished, since the offline load overwrites all records.
You can specify only one of offset and timestamp; the unused one must be set to -1. If both are set, offset takes precedence.
Example API:
Python:
import time

import graphscope

conn = graphscope.conn()
current_timestamp = int(time.time() * 1000) - 100 * 60 * 1000
r = conn.replay_records(-1, current_timestamp)
Java
GrootClient client = GrootClientBuilder.build();
long timestamp = System.currentTimeMillis();
client.replayRecords(-1, timestamp);
Uninstalling and Restarting#
Uninstall Groot#
To uninstall/delete the demo Groot cluster deployment, use:
helm delete demo
The command removes all the Kubernetes components associated with the chart and deletes the release.
If the cluster supports dynamic provisioning, Groot will create a set of PersistentVolumeClaims (PVCs) to claim PersistentVolumes (PVs) by default for storing metadata and graph data. The PVs will not be deleted by default when Groot is uninstalled. You can query the PVCs and PVs using the following commands.
kubectl get pvc
kubectl get pv
# To query only the PVC belonging to the demo deployment
kubectl get pvc -lapp.kubernetes.io/instance=demo
Restart Groot#
To relaunch Groot on the original PVs, run the same command used for the initial installation. Groot can then access the data from before the uninstallation, and all other operations are the same as before. This facilitates seamless version updates; when using cloud provider services, you can also uninstall Groot on demand to release elastic computing resources while keeping only the block storage to save costs.
# Note that if the node count is configured during installation, it should be exactly the same when reinstalling.
helm install demo graphscope/graphscope-store
Destroy Groot#
Destroying Groot means releasing all resources used by Groot, including StatefulSets, Services, PVCs, and PVs.
helm delete demo
kubectl delete pvc -lapp.kubernetes.io/instance=demo
# If the PV was dynamically provisioned with a PVC, then there is no need to delete the PV explicitly as it will be deleted automatically with the PVC.
# However, if the PV was manually created, then it must be explicitly deleted.
# To delete a PV, you can use the kubectl delete command followed by the PV name:
# kubectl delete pv ${PV_NAME}
Developing Guide#
Build image#
cd GraphScope/k8s
make graphscope-store VERSION=latest
This would produce an image named graphscope/graphscope-store:latest.
Persistence#
Groot stores the graph data in the /var/lib/graphscope-store directory in the Store Pod and the metadata in the /etc/groot/my.meta directory in the Coordinator Pod.
Troubleshooting#
Viewing logs#
You can view the logs of each Pod using the command kubectl logs ${POD_NAME}.
It is common to check the logs of Frontend and Store roles. When debugging, it is often necessary to check the logs of Coordinator as well. The logs of Frontend include the logs of the Compiler that generates the logical query plan, while the logs of Store include the logs of the query engine execution. For example,
kubectl logs demo-graphscope-store-frontend-0
kubectl logs demo-graphscope-store-store-0
Configuring logs#
Groot uses logback as the logging library for the Java part and log4rs as the logging library for the Rust part.
Both logging libraries support automatic periodic reloading of their configuration, which means the logging configuration files can be changed and will take effect after a short time (up to 30 seconds).
The locations of the logging configuration files in the container are:
The configuration file of logback is /usr/local/groot/conf/logback.xml
The configuration file of log4rs is /usr/local/groot/conf/log4rs.yml
Secondary Instance#
Groot supports running secondary instances alongside the primary instances. It leverages the Secondary Instance feature of RocksDB to serve query requests while catching up with schema and data updates.
To use it, set secondary.enabled=true in the helm charts. The data path, ZooKeeper connect string, and Kafka endpoint and topic must be the same as those of the primary instance.
Use a different zk.base.path for each secondary instance to avoid conflicts during node discovery.
storeGcIntervalMs controls how often the secondary performs a try_catch_up_with_primary call; it defaults to 5000, which is 5 seconds.
Traces#
Use --set otel.enabled=true to enable trace export.