HugeGraph-Computer is a distributed graph processing framework implementing the Pregel model (BSP - Bulk Synchronous Parallel). It runs on Kubernetes or YARN clusters and integrates with HugeGraph for graph input/output.
- Distributed MPP Computing: Massively parallel graph processing across cluster nodes
- BSP Model: Algorithm execution through iterative supersteps with global synchronization
- Auto Memory Management: Automatic spill to disk when memory is insufficient, designed to avoid OOM errors
- Flexible Data Sources: Load from HugeGraph or HDFS, output to HugeGraph or HDFS
- Easy Algorithm Development: Focus on single-vertex logic without worrying about distribution
- Production-Ready: Battle-tested on billion-scale graphs with Kubernetes integration
┌─────────────────────────────────────────────────────────────┐
│ HugeGraph-Computer │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────────────┐ │
│ │ computer-driver │ │
│ │ (Job Submission & Coordination) │ │
│ └─────────────────────────┬───────────────────────────┘ │
│ │ │
│ ┌─────────────────────────┼───────────────────────────┐ │
│ │ Deployment Layer (choose one) │ │
│ │ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ computer-k8s │ │ computer-yarn│ │ │
│ │ └──────────────┘ └──────────────┘ │ │
│ └─────────────────────────┬───────────────────────────┘ │
│ │ │
│ ┌─────────────────────────┼───────────────────────────┐ │
│ │ computer-core │ │
│ │ (WorkerService, MasterService, BSP) │ │
│ │ ┌──────────────────────────────────────────────┐ │ │
│ │ │ Managers: Message, Aggregation, Snapshot... │ │ │
│ │ └──────────────────────────────────────────────┘ │ │
│ └─────────────────────────┬───────────────────────────┘ │
│ │ │
│ ┌─────────────────────────┼───────────────────────────┐ │
│ │ computer-algorithm │ │
│ │ (PageRank, LPA, WCC, SSSP, TriangleCount...) │ │
│ └─────────────────────────┬───────────────────────────┘ │
│ │ │
│ ┌─────────────────────────┴───────────────────────────┐ │
│ │ computer-api │ │
│ │ (Computation, Vertex, Edge, Aggregator, Value) │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
| Module | Description |
|---|---|
| computer-api | Public interfaces for algorithm development (Computation, Vertex, Edge, Aggregator, Combiner) |
| computer-core | Runtime implementation (WorkerService, MasterService, messaging, BSP coordination, memory management) |
| computer-algorithm | Built-in graph algorithms (45+ implementations) |
| computer-driver | Job submission and driver-side coordination |
| computer-k8s | Kubernetes deployment integration |
| computer-yarn | YARN deployment integration |
| computer-k8s-operator | Kubernetes operator for job lifecycle management |
| computer-dist | Distribution packaging and assembly |
| computer-test | Integration tests and unit tests |
- JDK 11 or later (for building and running)
- Maven 3.5+ for building
- Kubernetes cluster or YARN cluster for deployment
- etcd for BSP coordination (configured via `BSP_ETCD_URL`)
Note: For K8s-operator module development, run `mvn clean install` in `computer-k8s-operator` first to generate CRD classes.
```shell
cd computer

# Compile (skip javadoc for faster builds)
mvn clean compile -Dmaven.javadoc.skip=true

# Package (skip tests for faster packaging)
mvn clean package -DskipTests
```

```shell
# Unit tests
mvn test -P unit-test

# Integration tests (requires etcd, K8s, HugeGraph)
mvn test -P integrate-test

# Run specific test class
mvn test -P unit-test -Dtest=ClassName

# Run specific test method
mvn test -P unit-test -Dtest=ClassName#methodName
```

License check:

```shell
mvn apache-rat:check
```

Create `job-config.properties`:
```properties
# Algorithm class
algorithm.class=org.apache.hugegraph.computer.algorithm.centrality.pagerank.PageRank

# HugeGraph connection
hugegraph.url=http://hugegraph-server:8080
hugegraph.graph=hugegraph

# K8s configuration
k8s.namespace=default
k8s.image=hugegraph/hugegraph-computer:latest
k8s.master.cpu=2
k8s.master.memory=4Gi
k8s.worker.replicas=3
k8s.worker.cpu=4
k8s.worker.memory=8Gi

# BSP coordination (etcd)
bsp.etcd.url=http://etcd-cluster:2379

# Algorithm parameters (PageRank example)
# Damping factor (alpha), default: 0.85
pagerank.alpha=0.85

# Maximum supersteps (iterations), controlled by BSP framework
bsp.max_superstep=20

# L1 norm difference threshold for convergence, default: 0.00001
pagerank.l1DiffThreshold=0.0001
```

Submit the job:

```shell
java -jar computer-driver/target/computer-driver-${VERSION}.jar \
     --config job-config.properties
```

```shell
# Check pod status
kubectl get pods -n default

# View master logs
kubectl logs hugegraph-computer-master-xxx -n default

# View worker logs
kubectl logs hugegraph-computer-worker-0 -n default
```

Note: YARN deployment support is under development. Use Kubernetes for production deployments.
| Algorithm | Class | Description |
|---|---|---|
| PageRank | algorithm.centrality.pagerank.PageRank | Standard PageRank |
| Personalized PageRank | algorithm.centrality.pagerank.PersonalizedPageRank | Source-specific PageRank |
| Betweenness Centrality | algorithm.centrality.betweenness.BetweennessCentrality | Shortest-path-based centrality |
| Closeness Centrality | algorithm.centrality.closeness.ClosenessCentrality | Average distance centrality |
| Degree Centrality | algorithm.centrality.degree.DegreeCentrality | In/out degree counting |

| Algorithm | Class | Description |
|---|---|---|
| LPA | algorithm.community.lpa.Lpa | Label Propagation Algorithm |
| WCC | algorithm.community.wcc.Wcc | Weakly Connected Components |
| Louvain | algorithm.community.louvain.Louvain | Modularity-based community detection |
| K-Core | algorithm.community.kcore.KCore | K-core decomposition |

| Algorithm | Class | Description |
|---|---|---|
| SSSP | algorithm.path.sssp.Sssp | Single Source Shortest Path |
| BFS | algorithm.traversal.bfs.Bfs | Breadth-First Search |
| Rings | algorithm.path.rings.Rings | Cycle/ring detection |

| Algorithm | Class | Description |
|---|---|---|
| Triangle Count | algorithm.trianglecount.TriangleCount | Count triangles |
| Clustering Coefficient | algorithm.clusteringcoefficient.ClusteringCoefficient | Local clustering measure |
Full algorithm list: See computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/
Algorithms implement the `Computation` interface from `computer-api`:

```java
package org.apache.hugegraph.computer.core.worker;

public interface Computation<M extends Value> {

    /**
     * Initialization at superstep 0
     */
    void compute0(ComputationContext context, Vertex vertex);

    /**
     * Message processing in subsequent supersteps
     */
    void compute(ComputationContext context, Vertex vertex, Iterator<M> messages);
}
```

NOTE: This is a simplified example showing the key concepts. For the complete implementation including all required methods (`name()`, `category()`, `init()`, etc.), see: `computer/computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/centrality/pagerank/PageRank.java`
```java
package org.apache.hugegraph.computer.algorithm.centrality.pagerank;

import java.util.Iterator;

import org.apache.hugegraph.computer.core.worker.Computation;
import org.apache.hugegraph.computer.core.worker.ComputationContext;

public class PageRank implements Computation<DoubleValue> {

    public static final String OPTION_ALPHA = "pagerank.alpha";
    public static final String OPTION_MAX_ITERATIONS = "pagerank.max_iterations";

    private double alpha;
    private int maxIterations;

    @Override
    public void init(Config config) {
        this.alpha = config.getDouble(OPTION_ALPHA, 0.85);
        this.maxIterations = config.getInt(OPTION_MAX_ITERATIONS, 20);
    }

    @Override
    public void compute0(ComputationContext context, Vertex vertex) {
        // Initialize: set initial PR value
        vertex.value(new DoubleValue(1.0));
        // Send PR to neighbors
        int edgeCount = vertex.numEdges();
        if (edgeCount > 0) {
            double contribution = 1.0 / edgeCount;
            context.sendMessageToAllEdges(vertex, new DoubleValue(contribution));
        }
    }

    @Override
    public void compute(ComputationContext context, Vertex vertex,
                        Iterator<DoubleValue> messages) {
        // Sum incoming PR contributions
        double sum = 0.0;
        while (messages.hasNext()) {
            sum += messages.next().value();
        }
        // Calculate new PR value: (1 - alpha) + alpha * sum
        double newPR = (1.0 - alpha) + alpha * sum;
        vertex.value(new DoubleValue(newPR));
        // Send to neighbors if not converged
        if (context.superstep() < maxIterations) {
            int edgeCount = vertex.numEdges();
            if (edgeCount > 0) {
                double contribution = newPR / edgeCount;
                context.sendMessageToAllEdges(vertex, new DoubleValue(contribution));
            }
        } else {
            vertex.inactivate();
        }
    }
}
```

- Superstep 0: Initialization via `compute0()`
- Superstep 1+: Message processing via `compute()`
- Barrier Synchronization: All workers complete superstep N before starting N+1
```java
// Send to specific vertex
context.sendMessage(targetId, new DoubleValue(1.0));

// Send to all outgoing edges
context.sendMessageToAllEdges(vertex, new DoubleValue(1.0));
```

Global state shared across all workers:

```java
// Register aggregator in compute0()
context.registerAggregator("sum", new DoubleValue(0.0), SumAggregator.class);

// Write to aggregator
context.aggregateValue("sum", new DoubleValue(vertex.value()));

// Read aggregator value (available in next superstep)
DoubleValue total = context.aggregatedValue("sum");
```

Reduce message volume by combining messages at sender:
```java
public class SumCombiner implements Combiner<DoubleValue> {

    @Override
    public void combine(DoubleValue v1, DoubleValue v2, DoubleValue result) {
        result.value(v1.value() + v2.value());
    }
}
```

- Implement `Computation` interface in `computer-algorithm`
- Add configuration options with `OPTION_*` constants
- Implement `compute0()` for initialization
- Implement `compute()` for message processing
- Configure in job properties:

  ```properties
  algorithm.class=com.example.MyAlgorithm
  myalgorithm.param1=value1
  ```

- Build and test: `mvn clean package -DskipTests`
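The effect of sender-side combining can be exercised standalone. The `Outbox` class below is a hypothetical stand-in for the framework's message buffer, not a HugeGraph API; it shows how a sum combiner folds messages to the same target as they are enqueued.

```java
import java.util.HashMap;
import java.util.Map;

// Toy illustration of sender-side combining: instead of queueing one
// message per edge, messages to the same target vertex are folded with
// combine(v1, v2) = v1 + v2, the same semantics as SumCombiner above.
public class CombinerSketch {

    static class Outbox {
        final Map<Long, Double> combined = new HashMap<>();
        int rawCount = 0;   // messages that would have been sent uncombined

        void send(long targetId, double value) {
            this.rawCount++;
            this.combined.merge(targetId, value, Double::sum);
        }
    }

    public static Outbox demo() {
        Outbox out = new Outbox();
        // three messages for vertex 7, two for vertex 9
        out.send(7L, 1.0);
        out.send(7L, 2.0);
        out.send(7L, 3.0);
        out.send(9L, 0.5);
        out.send(9L, 0.5);
        return out;
    }
}
```

Five raw messages collapse to two combined ones, which is exactly the network saving a combiner buys at scale.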
HugeGraph-Computer uses etcd for BSP barrier synchronization:

- Worker Prepare: `workerStepPrepareDone` → `waitMasterStepPrepareDone`
- Compute Phase: Workers process vertices and messages locally
- Worker Compute Done: `workerStepComputeDone` → `waitMasterStepComputeDone`
- Aggregation: Aggregators combine global state
- Worker Step Done: `workerStepDone` → `waitMasterStepDone` (master returns `SuperstepStat`)
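This handshake can be approximated locally with a JDK `CyclicBarrier` (an analogy only; the real coordination goes through etcd, and `BarrierSketch` is a hypothetical name): each worker signals it is done and then blocks until every other worker has done the same, with the barrier action standing in for the master's role.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Local analogy for workerStepDone -> waitMasterStepDone: every worker
// thread must reach the barrier before any worker enters the next superstep.
public class BarrierSketch {

    public static int run(int workers, int supersteps) {
        AtomicInteger completed = new AtomicInteger();
        // Barrier action runs once per superstep, like the master's step-done
        CyclicBarrier barrier = new CyclicBarrier(workers, completed::incrementAndGet);

        Thread[] threads = new Thread[workers];
        for (int w = 0; w < workers; w++) {
            threads[w] = new Thread(() -> {
                try {
                    for (int s = 0; s < supersteps; s++) {
                        // ... local compute phase for superstep s ...
                        barrier.await();   // signal done, wait for all workers
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new RuntimeException(e);
                }
            });
            threads[w].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return completed.get();   // number of fully synchronized supersteps
    }
}
```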
WorkerService composes multiple managers with lifecycle hooks:
- `MessageSendManager`: Outgoing message buffering and sending
- `MessageRecvManager`: Incoming message receiving and sorting
- `WorkerAggrManager`: Aggregator value collection
- `DataServerManager`: Inter-worker data transfer
- `SortManagers`: Message and edge sorting
- `SnapshotManager`: Checkpoint creation
All managers implement:
- `initAll()`: Initialize before first superstep
- `beforeSuperstep()`: Prepare for superstep
- `afterSuperstep()`: Cleanup after superstep
- `closeAll()`: Shutdown cleanup
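The hook ordering can be sketched with a stripped-down, hypothetical `Manager` interface (the real interface lives in computer-core and has a richer signature): `initAll()` once, the per-superstep pair for every superstep, then `closeAll()` once.

```java
import java.util.ArrayList;
import java.util.List;

// Records the lifecycle call order a WorkerService-style driver would
// produce for one manager across several supersteps.
public class ManagerLifecycleSketch {

    interface Manager {
        default void initAll() {}
        default void beforeSuperstep(int superstep) {}
        default void afterSuperstep(int superstep) {}
        default void closeAll() {}
    }

    public static List<String> run(int supersteps) {
        List<String> calls = new ArrayList<>();
        Manager recording = new Manager() {
            public void initAll() { calls.add("initAll"); }
            public void beforeSuperstep(int s) { calls.add("before:" + s); }
            public void afterSuperstep(int s) { calls.add("after:" + s); }
            public void closeAll() { calls.add("closeAll"); }
        };

        recording.initAll();
        for (int s = 0; s < supersteps; s++) {
            recording.beforeSuperstep(s);
            // ... compute phase for superstep s ...
            recording.afterSuperstep(s);
        }
        recording.closeAll();
        return calls;
    }
}
```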
```properties
# === Algorithm ===
algorithm.class=<fully qualified class name>
algorithm.message_class=<message value class>
algorithm.result_class=<result value class>

# === HugeGraph Input ===
hugegraph.url=http://localhost:8080
hugegraph.graph=hugegraph
hugegraph.input.vertex_label=person
hugegraph.input.edge_label=knows
hugegraph.input.filter=<gremlin filter>

# === HugeGraph Output ===
hugegraph.output.vertex_property=pagerank_value
hugegraph.output.edge_property=<property name>

# === HDFS Input ===
input.hdfs.path=/graph/input
input.hdfs.format=json

# === HDFS Output ===
output.hdfs.path=/graph/output
output.hdfs.format=json

# === Worker Resources ===
worker.count=3
worker.memory=8Gi
worker.cpu=4
worker.thread_count=<cpu cores>

# === BSP Coordination ===
bsp.etcd.url=http://etcd:2379
bsp.max_superstep=100
bsp.log_interval=10

# === Memory Management ===
worker.data.dirs=/data1,/data2
worker.write_buffer_size=134217728
worker.max_spill_size=1073741824
```

Computer auto-manages memory to prevent OOM:
- In-Memory Buffering: Vertices, edges, messages buffered in memory
- Spill Threshold: When memory usage exceeds threshold, spill to disk
- Disk Storage: Configurable data directories (`worker.data.dirs`)
- Automatic Cleanup: Spilled data cleaned after superstep completion

Best Practice: Allocate worker memory ≥ 2x graph size for optimal performance.
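As a rough illustration of the spill rule (a simplification; the actual accounting in computer-core is more involved), using the example `worker.write_buffer_size` of 134217728 bytes from the configuration reference:

```java
// Back-of-envelope sketch: once a write buffer fills past its configured
// size, the worker flushes it to one of worker.data.dirs instead of
// growing the heap. SpillSketch is an illustrative name, not a real class.
public class SpillSketch {

    static final long WRITE_BUFFER_SIZE = 134217728L;   // 128 MiB

    // True when buffered bytes reach the buffer size and should spill
    public static boolean shouldSpill(long bufferedBytes) {
        return bufferedBytes >= WRITE_BUFFER_SIZE;
    }

    public static long toMiB(long bytes) {
        return bytes / (1024 * 1024);
    }
}
```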
```shell
# Generate CRD classes first
cd computer-k8s-operator
mvn clean install
```

Generated classes appear in `computer-k8s/target/generated-sources/`.
- Verify `bsp.etcd.url` is reachable from all pods
- Check etcd cluster health: `etcdctl endpoint health`
- Ensure firewall allows port 2379

- Increase `worker.memory` in job config
- Reduce `worker.write_buffer_size` to trigger earlier spilling
- Increase `worker.count` to distribute the graph across more workers

- Check algorithm parameters (e.g., `pagerank.convergence_tolerance`)
- Monitor superstep logs for progress
- Consider using combiners to reduce message volume
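A convergence check based on an L1-norm difference, as suggested by `pagerank.l1DiffThreshold`, can be sketched as follows. Plain arrays stand in for the distributed rank state; in a real job the per-worker differences would be combined through an aggregator.

```java
// Sum the absolute per-vertex rank changes between two consecutive
// supersteps; the job can stop early once the total falls below the
// configured threshold.
public class ConvergenceSketch {

    public static double l1Diff(double[] prev, double[] curr) {
        double diff = 0.0;
        for (int i = 0; i < prev.length; i++) {
            diff += Math.abs(curr[i] - prev[i]);
        }
        return diff;
    }

    public static boolean converged(double[] prev, double[] curr, double threshold) {
        return l1Diff(prev, curr) < threshold;
    }
}
```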
| File | Description |
|---|---|
| `computer-api/.../Computation.java` | Algorithm interface contract (`computer/computer-api/src/main/java/org/apache/hugegraph/computer/core/worker/Computation.java:25`) |
| `computer-core/.../WorkerService.java` | Worker runtime orchestration (`computer/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java:1`) |
| `computer-core/.../Bsp4Worker.java` | BSP coordination logic (`computer/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/Bsp4Worker.java:1`) |
| `computer-algorithm/.../PageRank.java` | Example algorithm implementation (`computer/computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/centrality/pagerank/PageRank.java:1`) |
The CI pipeline (.github/workflows/computer-ci.yml) runs:
- License check (Apache RAT)
- Setup HDFS (Hadoop 3.3.2)
- Setup Minikube/Kubernetes
- Load test data into HugeGraph
- Compile with JDK 11
- Run integration tests (`-P integrate-test`)
- Run unit tests (`-P unit-test`)
- Upload coverage to Codecov
```shell
# Setup test environment (etcd, HDFS, K8s)
cd computer-dist/src/assembly/travis
./start-etcd.sh
./start-hdfs.sh
./start-minikube.sh

# Run tests
cd ../../../../
mvn test -P integrate-test
```

See the main Contributing Guide for how to contribute.
HugeGraph-Computer is licensed under the Apache 2.0 License.