Skip to content

Latest commit

 

History

History
530 lines (396 loc) · 18.9 KB

File metadata and controls

530 lines (396 loc) · 18.9 KB

Apache HugeGraph-Computer (Java)

HugeGraph-Computer is a distributed graph processing framework implementing the Pregel model (BSP - Bulk Synchronous Parallel). It runs on Kubernetes or YARN clusters and integrates with HugeGraph for graph input/output.

Features

  • Distributed MPP Computing: Massively parallel graph processing across cluster nodes
  • BSP Model: Algorithm execution through iterative supersteps with global synchronization
  • Auto Memory Management: Automatic spill to disk when memory is insufficient - never OOM
  • Flexible Data Sources: Load from HugeGraph or HDFS, output to HugeGraph or HDFS
  • Easy Algorithm Development: Focus on single-vertex logic without worrying about distribution
  • Production-Ready: Battle-tested on billion-scale graphs with Kubernetes integration

Architecture

Module Structure

┌─────────────────────────────────────────────────────────────┐
│                    HugeGraph-Computer                       │
├─────────────────────────────────────────────────────────────┤
│  ┌─────────────────────────────────────────────────────┐   │
│  │                   computer-driver                    │   │
│  │              (Job Submission & Coordination)         │   │
│  └─────────────────────────┬───────────────────────────┘   │
│                            │                                │
│  ┌─────────────────────────┼───────────────────────────┐   │
│  │         Deployment Layer (choose one)               │   │
│  │  ┌──────────────┐              ┌──────────────┐     │   │
│  │  │ computer-k8s │              │ computer-yarn│     │   │
│  │  └──────────────┘              └──────────────┘     │   │
│  └─────────────────────────┬───────────────────────────┘   │
│                            │                                │
│  ┌─────────────────────────┼───────────────────────────┐   │
│  │                   computer-core                      │   │
│  │        (WorkerService, MasterService, BSP)          │   │
│  │  ┌──────────────────────────────────────────────┐   │   │
│  │  │  Managers: Message, Aggregation, Snapshot... │   │   │
│  │  └──────────────────────────────────────────────┘   │   │
│  └─────────────────────────┬───────────────────────────┘   │
│                            │                                │
│  ┌─────────────────────────┼───────────────────────────┐   │
│  │              computer-algorithm                      │   │
│  │    (PageRank, LPA, WCC, SSSP, TriangleCount...)     │   │
│  └─────────────────────────┬───────────────────────────┘   │
│                            │                                │
│  ┌─────────────────────────┴───────────────────────────┐   │
│  │                    computer-api                      │   │
│  │    (Computation, Vertex, Edge, Aggregator, Value)   │   │
│  └─────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘

Module Descriptions

Module Description
computer-api Public interfaces for algorithm development (Computation, Vertex, Edge, Aggregator, Combiner)
computer-core Runtime implementation (WorkerService, MasterService, messaging, BSP coordination, memory management)
computer-algorithm Built-in graph algorithms (45+ implementations)
computer-driver Job submission and driver-side coordination
computer-k8s Kubernetes deployment integration
computer-yarn YARN deployment integration
computer-k8s-operator Kubernetes operator for job lifecycle management
computer-dist Distribution packaging and assembly
computer-test Integration tests and unit tests

Prerequisites

  • JDK 11 or later (for building and running)
  • Maven 3.5+ for building
  • Kubernetes cluster or YARN cluster for deployment
  • etcd for BSP coordination (configured via BSP_ETCD_URL)

Note: For K8s-operator module development, run mvn clean install in computer-k8s-operator first to generate CRD classes.

Quick Start

Build from Source

cd computer

# Compile (skip javadoc for faster builds)
mvn clean compile -Dmaven.javadoc.skip=true

# Package (skip tests for faster packaging)
mvn clean package -DskipTests

Run Tests

# Unit tests
mvn test -P unit-test

# Integration tests (requires etcd, K8s, HugeGraph)
mvn test -P integrate-test

# Run specific test class
mvn test -P unit-test -Dtest=ClassName

# Run specific test method
mvn test -P unit-test -Dtest=ClassName#methodName

License Check

mvn apache-rat:check

Deploy on Kubernetes

1. Configure Job

Create job-config.properties:

# Algorithm class
algorithm.class=org.apache.hugegraph.computer.algorithm.centrality.pagerank.PageRank

# HugeGraph connection
hugegraph.url=http://hugegraph-server:8080
hugegraph.graph=hugegraph

# K8s configuration
k8s.namespace=default
k8s.image=hugegraph/hugegraph-computer:latest
k8s.master.cpu=2
k8s.master.memory=4Gi
k8s.worker.replicas=3
k8s.worker.cpu=4
k8s.worker.memory=8Gi

# BSP coordination (etcd)
bsp.etcd.url=http://etcd-cluster:2379

# Algorithm parameters (PageRank example)
# Alpha parameter (1 - damping factor), default: 0.15
page_rank.alpha=0.85

# Maximum supersteps (iterations), controlled by BSP framework
bsp.max_superstep=20

# L1 norm difference threshold for convergence, default: 0.00001
pagerank.l1DiffThreshold=0.0001

2. Submit Job

java -jar computer-driver/target/computer-driver-${VERSION}.jar \
  --config job-config.properties

3. Monitor Job

# Check pod status
kubectl get pods -n default

# View master logs
kubectl logs hugegraph-computer-master-xxx -n default

# View worker logs
kubectl logs hugegraph-computer-worker-0 -n default

Deploy on YARN

Note: YARN deployment support is under development. Use Kubernetes for production deployments.

Available Algorithms

Centrality Algorithms

Algorithm Class Description
PageRank algorithm.centrality.pagerank.PageRank Standard PageRank
Personalized PageRank algorithm.centrality.pagerank.PersonalizedPageRank Source-specific PageRank
Betweenness Centrality algorithm.centrality.betweenness.BetweennessCentrality Shortest-path-based centrality
Closeness Centrality algorithm.centrality.closeness.ClosenessCentrality Average distance centrality
Degree Centrality algorithm.centrality.degree.DegreeCentrality In/out degree counting

Community Detection

Algorithm Class Description
LPA algorithm.community.lpa.Lpa Label Propagation Algorithm
WCC algorithm.community.wcc.Wcc Weakly Connected Components
Louvain algorithm.community.louvain.Louvain Modularity-based community detection
K-Core algorithm.community.kcore.KCore K-core decomposition

Path Finding

Algorithm Class Description
SSSP algorithm.path.sssp.Sssp Single Source Shortest Path
BFS algorithm.traversal.bfs.Bfs Breadth-First Search
Rings algorithm.path.rings.Rings Cycle/ring detection

Graph Structure

Algorithm Class Description
Triangle Count algorithm.trianglecount.TriangleCount Count triangles
Clustering Coefficient algorithm.clusteringcoefficient.ClusteringCoefficient Local clustering measure

Full algorithm list: See computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/

Developing Custom Algorithms

Algorithm Contract

Algorithms implement the Computation interface from computer-api:

package org.apache.hugegraph.computer.core.worker;

public interface Computation<M extends Value> {
    /**
     * Initialization at superstep 0
     */
    void compute0(ComputationContext context, Vertex vertex);

    /**
     * Message processing in subsequent supersteps
     */
    void compute(ComputationContext context, Vertex vertex, Iterator<M> messages);
}

Example: Simple PageRank

NOTE: This is a simplified example showing the key concepts. For the complete implementation including all required methods (name(), category(), init(), etc.), see: computer/computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/centrality/pagerank/PageRank.java

package org.apache.hugegraph.computer.algorithm.centrality.pagerank;

import org.apache.hugegraph.computer.core.worker.Computation;
import org.apache.hugegraph.computer.core.worker.ComputationContext;

public class PageRank implements Computation<DoubleValue> {

    public static final String OPTION_ALPHA = "pagerank.alpha";
    public static final String OPTION_MAX_ITERATIONS = "pagerank.max_iterations";

    private double alpha;
    private int maxIterations;

    @Override
    public void init(Config config) {
        this.alpha = config.getDouble(OPTION_ALPHA, 0.85);
        this.maxIterations = config.getInt(OPTION_MAX_ITERATIONS, 20);
    }

    @Override
    public void compute0(ComputationContext context, Vertex vertex) {
        // Initialize: set initial PR value
        vertex.value(new DoubleValue(1.0));

        // Send PR to neighbors
        int edgeCount = vertex.numEdges();
        if (edgeCount > 0) {
            double contribution = 1.0 / edgeCount;
            context.sendMessageToAllEdges(vertex, new DoubleValue(contribution));
        }
    }

    @Override
    public void compute(ComputationContext context, Vertex vertex, Iterator<DoubleValue> messages) {
        // Sum incoming PR contributions
        double sum = 0.0;
        while (messages.hasNext()) {
            sum += messages.next().value();
        }

        // Calculate new PR value
        double newPR = (1.0 - alpha) + alpha * sum;
        vertex.value(new DoubleValue(newPR));

        // Send to neighbors if not converged
        if (context.superstep() < maxIterations) {
            int edgeCount = vertex.numEdges();
            if (edgeCount > 0) {
                double contribution = newPR / edgeCount;
                context.sendMessageToAllEdges(vertex, new DoubleValue(contribution));
            }
        } else {
            vertex.inactivate();
        }
    }
}

Key Concepts

1. Supersteps

  • Superstep 0: Initialization via compute0()
  • Superstep 1+: Message processing via compute()
  • Barrier Synchronization: All workers complete superstep N before starting N+1

2. Message Passing

// Send to specific vertex
context.sendMessage(targetId, new DoubleValue(1.0));

// Send to all outgoing edges
context.sendMessageToAllEdges(vertex, new DoubleValue(1.0));

3. Aggregators

Global state shared across all workers:

// Register aggregator in compute0()
context.registerAggregator("sum", new DoubleValue(0.0), SumAggregator.class);

// Write to aggregator
context.aggregateValue("sum", new DoubleValue(vertex.value()));

// Read aggregator value (available in next superstep)
DoubleValue total = context.aggregatedValue("sum");

4. Combiners

Reduce message volume by combining messages at sender:

public class SumCombiner implements Combiner<DoubleValue> {
    @Override
    public void combine(DoubleValue v1, DoubleValue v2, DoubleValue result) {
        result.value(v1.value() + v2.value());
    }
}

Algorithm Development Workflow

  1. Implement Computation interface in computer-algorithm
  2. Add configuration options with OPTION_* constants
  3. Implement compute0() for initialization
  4. Implement compute() for message processing
  5. Configure in job properties:
    algorithm.class=com.example.MyAlgorithm
    myalgorithm.param1=value1
  6. Build and test:
    mvn clean package -DskipTests

BSP Coordination

HugeGraph-Computer uses etcd for BSP barrier synchronization:

BSP Lifecycle (per superstep)

  1. Worker Prepare: workerStepPrepareDonewaitMasterStepPrepareDone
  2. Compute Phase: Workers process vertices and messages locally
  3. Worker Compute Done: workerStepComputeDonewaitMasterStepComputeDone
  4. Aggregation: Aggregators combine global state
  5. Worker Step Done: workerStepDonewaitMasterStepDone (master returns SuperstepStat)

Manager Pattern

WorkerService composes multiple managers with lifecycle hooks:

  • MessageSendManager: Outgoing message buffering and sending
  • MessageRecvManager: Incoming message receiving and sorting
  • WorkerAggrManager: Aggregator value collection
  • DataServerManager: Inter-worker data transfer
  • SortManagers: Message and edge sorting
  • SnapshotManager: Checkpoint creation

All managers implement:

  • initAll(): Initialize before first superstep
  • beforeSuperstep(): Prepare for superstep
  • afterSuperstep(): Cleanup after superstep
  • closeAll(): Shutdown cleanup

Configuration Reference

Job Configuration

# === Algorithm ===
algorithm.class=<fully qualified class name>
algorithm.message_class=<message value class>
algorithm.result_class=<result value class>

# === HugeGraph Input ===
hugegraph.url=http://localhost:8080
hugegraph.graph=hugegraph
hugegraph.input.vertex_label=person
hugegraph.input.edge_label=knows
hugegraph.input.filter=<gremlin filter>

# === HugeGraph Output ===
hugegraph.output.vertex_property=pagerank_value
hugegraph.output.edge_property=<property name>

# === HDFS Input ===
input.hdfs.path=/graph/input
input.hdfs.format=json

# === HDFS Output ===
output.hdfs.path=/graph/output
output.hdfs.format=json

# === Worker Resources ===
worker.count=3
worker.memory=8Gi
worker.cpu=4
worker.thread_count=<cpu cores>

# === BSP Coordination ===
bsp.etcd.url=http://etcd:2379
bsp.max_superstep=100
bsp.log_interval=10

# === Memory Management ===
worker.data.dirs=/data1,/data2
worker.write_buffer_size=134217728
worker.max_spill_size=1073741824

Memory Management

Computer auto-manages memory to prevent OOM:

  1. In-Memory Buffering: Vertices, edges, messages buffered in memory
  2. Spill Threshold: When memory usage exceeds threshold, spill to disk
  3. Disk Storage: Configurable data directories (worker.data.dirs)
  4. Automatic Cleanup: Spilled data cleaned after superstep completion

Best Practice: Allocate worker memory ≥ 2x graph size for optimal performance.

Troubleshooting

K8s CRD Classes Not Found

# Generate CRD classes first
cd computer-k8s-operator
mvn clean install

Generated classes appear in computer-k8s/target/generated-sources/.

etcd Connection Errors

  • Verify bsp.etcd.url is reachable from all pods
  • Check etcd cluster health: etcdctl endpoint health
  • Ensure firewall allows port 2379

Out of Memory Errors

  • Increase worker.memory in job config
  • Reduce worker.write_buffer_size to trigger earlier spilling
  • Increase worker.count to distribute graph across more workers

Slow Convergence

  • Check algorithm parameters (e.g., pagerank.convergence_tolerance)
  • Monitor superstep logs for progress
  • Consider using combiners to reduce message volume

Important Files

File Description
computer-api/.../Computation.java Algorithm interface contract (computer/computer-api/src/main/java/org/apache/hugegraph/computer/core/worker/Computation.java:25)
computer-core/.../WorkerService.java Worker runtime orchestration (computer/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java:1)
computer-core/.../Bsp4Worker.java BSP coordination logic (computer/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/Bsp4Worker.java:1)
computer-algorithm/.../PageRank.java Example algorithm implementation (computer/computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/centrality/pagerank/PageRank.java:1)

Testing

CI/CD Pipeline

The CI pipeline (.github/workflows/computer-ci.yml) runs:

  1. License check (Apache RAT)
  2. Setup HDFS (Hadoop 3.3.2)
  3. Setup Minikube/Kubernetes
  4. Load test data into HugeGraph
  5. Compile with JDK 11
  6. Run integration tests (-P integrate-test)
  7. Run unit tests (-P unit-test)
  8. Upload coverage to Codecov

Local Testing

# Setup test environment (etcd, HDFS, K8s)
cd computer-dist/src/assembly/travis
./start-etcd.sh
./start-hdfs.sh
./start-minikube.sh

# Run tests
cd ../../../../
mvn test -P integrate-test

Links

Contributing

See the main Contributing Guide for how to contribute.

License

HugeGraph-Computer is licensed under Apache 2.0 License.