akilisha/orchestrator-poc

Event-Driven Workflow Orchestrator

A modern, embeddable workflow orchestration library for Java applications, built on an event-driven architecture with NATS, virtual threads, and structured concurrency.

Features

  • Event-Driven Architecture: Workflows emerge from nodes reacting to events
  • Pull-Based Work Distribution: Nodes pull work via atomic CAS operations
  • Elastic Scaling: Automatic node pool management with min/max instances
  • Fault Tolerance: Automatic orphan detection, retry with exponential backoff, checkpointing
  • Observability: REST API, metrics (Micrometer), and comprehensive logging
  • Modern Java: Built with Java 21, virtual threads, and structured concurrency
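The "retry with exponential backoff" behaviour listed above can be illustrated with a small delay calculator. This is only a sketch: the class name BackoffPolicy, the doubling factor, and the cap are assumptions for illustration, not the orchestrator's actual retry implementation or its configured values.

```java
import java.time.Duration;

/** Hypothetical helper for illustration; not part of the orchestrator's API. */
final class BackoffPolicy {
    private final Duration baseDelay;
    private final Duration maxDelay;

    BackoffPolicy(Duration baseDelay, Duration maxDelay) {
        this.baseDelay = baseDelay;
        this.maxDelay = maxDelay;
    }

    /** Delay before retry number {@code attempt} (0-based): base * 2^attempt, capped at maxDelay. */
    Duration delayFor(int attempt) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        long millis = baseDelay.toMillis();
        long cap = maxDelay.toMillis();
        // Double once per attempt, stopping as soon as the cap is reached
        // so the value never grows far enough to overflow.
        for (int i = 0; i < attempt && millis < cap; i++) {
            millis *= 2;
        }
        return Duration.ofMillis(Math.min(millis, cap));
    }
}
```

With a 100 ms base and a 5 s cap, attempts 0, 1, 2, 3 wait 100 ms, 200 ms, 400 ms, and 800 ms, and later attempts level off at 5 s. Production retry policies often add random jitter as well; that is omitted here to keep the sketch short.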

Quick Start

Prerequisites

  • Java 21 JDK
  • Gradle 8.0+
  • NATS Server 2.10+ (optional; in-memory implementations are available for development)

Building

./gradlew build

Running

./gradlew run

Or run the built JAR directly:

java -jar build/libs/orchestrator-poc-1.0-SNAPSHOT.jar

Configuration

The orchestrator can be configured via:

  1. Programmatic Configuration (current default)
  2. YAML Configuration (coming soon)
  3. Environment Variables:
    • ORCHESTRATOR_API_PORT: REST API port (default: 8080)
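As a rough illustration of the environment-variable option, the following sketch resolves ORCHESTRATOR_API_PORT with the documented default of 8080. The class name ApiPortConfig and the validation rules are assumptions; how the orchestrator itself treats missing or invalid values is not stated here.

```java
import java.util.Map;

/** Hypothetical helper for illustration; not part of the orchestrator's API. */
final class ApiPortConfig {
    static final int DEFAULT_PORT = 8080;

    /** Returns the configured REST API port, or 8080 when the variable is unset or blank. */
    static int resolvePort(Map<String, String> env) {
        String raw = env.get("ORCHESTRATOR_API_PORT");
        if (raw == null || raw.isBlank()) {
            return DEFAULT_PORT;
        }
        int port;
        try {
            port = Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("ORCHESTRATOR_API_PORT is not a number: " + raw, e);
        }
        // Assumption: reject values outside the valid TCP port range.
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("ORCHESTRATOR_API_PORT out of range: " + port);
        }
        return port;
    }
}
```

In an application this would be called as ApiPortConfig.resolvePort(System.getenv()); taking the map as a parameter keeps the lookup easy to test.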

REST API

When running, the orchestrator exposes a REST API on port 8080 (configurable):

  • GET /health - Health check
  • GET /api/nodes - List all nodes
  • GET /api/nodes/{nodeId} - Get node details
  • GET /api/tasks/{taskId} - Get task details
  • GET /api/controller/status - Controller status
  • POST /api/controller/shutdown - Graceful shutdown
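The endpoints above can be called from any HTTP client. The sketch below uses the JDK's built-in java.net.http client. The class name OrchestratorApiClient is hypothetical, and because the response bodies are not documented here, the sketch only inspects status codes.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

/** Hypothetical client for illustration; not shipped with the orchestrator. */
final class OrchestratorApiClient {
    private final URI baseUri;
    private final HttpClient http = HttpClient.newHttpClient();

    OrchestratorApiClient(String baseUrl) {
        this.baseUri = URI.create(baseUrl);
    }

    /** Builds GET /health. */
    HttpRequest healthRequest() {
        return HttpRequest.newBuilder(baseUri.resolve("/health"))
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();
    }

    /** Builds POST /api/controller/shutdown with an empty body. */
    HttpRequest shutdownRequest() {
        return HttpRequest.newBuilder(baseUri.resolve("/api/controller/shutdown"))
                .timeout(Duration.ofSeconds(5))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    /** Sends the health check and returns only the HTTP status code. */
    int healthStatusCode() throws IOException, InterruptedException {
        return http.send(healthRequest(), HttpResponse.BodyHandlers.discarding()).statusCode();
    }
}
```

For example, new OrchestratorApiClient("http://localhost:8080").healthStatusCode() sends the health check to a locally running orchestrator on the default port.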

Example: Creating a Node

package io.orchestrator.examples.ci.nodes;

import io.orchestrator.node.*;
import io.orchestrator.event.Event;

import java.util.List;
import java.util.Optional;

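public class MyNode implements Node {

    // Stored in initialize() so run() and handleEvent() can reach the event bus and tasks
    private NodeContext context;

    @Override
    public String getType() {
        return "my-node";
    }

    @Override
    public List<String> getSubscriptions() {
        return List.of("my.event");
    }

    @Override
    public void initialize(NodeContext context) throws NodeInitializationException {
        // Keep the context for later use, then initialize your node
        this.context = context;
    }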

    @Override
    public void run() {
        // Subscribe to events and process tasks
        context.getEventBus().subscribe("my.event", this::handleEvent);
    }

    private void handleEvent(Event event) {
        // Process event and claim associated task
        String taskId = event.getPayloadString("task_id");
        Optional<Task> task = context.claimTask(taskId);

        if (task.isPresent()) {
            // Execute task
            // ...
        }
    }

    @Override
    public void shutdown() {
        // Handle shutdown
    }

    @Override
    public void close() {
        // Cleanup resources
    }

    @Override
    public NodeStatus getStatus() {
        return NodeStatus.IDLE;
    }
}

Plugin System

Nodes are discovered via Java's ServiceLoader. Inside your node JAR, create the provider-configuration file:

META-INF/services/io.orchestrator.node.Node

With the content:

io.orchestrator.examples.ci.nodes.MyNode

Place your node JAR in the plugins/ directory.
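To make the discovery mechanism concrete, here is a minimal sketch of how providers declared in JARs under a directory can be loaded with ServiceLoader. The PluginLoader class is hypothetical, and whether the orchestrator scans plugins/ in exactly this way is an assumption. In the real project the service type would be io.orchestrator.node.Node.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

/** Hypothetical loader for illustration; not the orchestrator's own plugin code. */
final class PluginLoader {

    /** Instantiates every provider of {@code service} declared in JARs under {@code pluginsDir}. */
    static <T> List<T> load(Class<T> service, Path pluginsDir) throws IOException {
        List<URL> jars = new ArrayList<>();
        if (Files.isDirectory(pluginsDir)) {
            try (DirectoryStream<Path> stream = Files.newDirectoryStream(pluginsDir, "*.jar")) {
                for (Path jar : stream) {
                    jars.add(jar.toUri().toURL());
                }
            }
        }
        // The class loader is deliberately left open: plugin classes keep loading from it.
        URLClassLoader loader =
                new URLClassLoader(jars.toArray(new URL[0]), service.getClassLoader());
        List<T> providers = new ArrayList<>();
        // ServiceLoader reads META-INF/services/<service name> from each JAR
        // and instantiates the listed classes lazily during iteration.
        for (T provider : ServiceLoader.load(service, loader)) {
            providers.add(provider);
        }
        return providers;
    }
}
```

Classes listed in a provider-configuration file need a public no-argument constructor, since ServiceLoader instantiates them reflectively.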

Architecture

Core Components

  • WorkflowController: Manages node pools and reconciliation
  • Node: Long-lived workers that execute tasks
  • EventBus: Event publishing and subscription (NATS or in-memory)
  • KeyValueStore: Metadata storage (NATS KV or in-memory)

Workflow Execution

  1. Events are published to NATS JetStream
  2. Nodes subscribe to relevant events
  3. Nodes claim tasks via atomic CAS operations
  4. Tasks execute and publish completion events
  5. Downstream nodes react to completion events
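Step 3 is what guarantees that a task runs on exactly one node. The idea can be sketched in memory with ConcurrentMap.replace, which is an atomic compare-and-set. The class InMemoryTaskClaims and its methods are hypothetical; a NATS KV-backed store would presumably rely on a revision-checked update instead.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical in-memory claim table for illustration only. */
final class InMemoryTaskClaims {
    private static final String UNCLAIMED = "";
    private final ConcurrentMap<String, String> ownerByTask = new ConcurrentHashMap<>();

    /** Makes a task available for claiming; has no effect if it is already known. */
    void register(String taskId) {
        ownerByTask.putIfAbsent(taskId, UNCLAIMED);
    }

    /** Returns true only for the single node whose compare-and-set succeeds. */
    boolean claim(String taskId, String nodeId) {
        if (nodeId == null || nodeId.isEmpty()) {
            throw new IllegalArgumentException("nodeId must be non-empty");
        }
        // Atomic: replaces the value only if the task is still marked unclaimed.
        return ownerByTask.replace(taskId, UNCLAIMED, nodeId);
    }

    /** The node currently holding the task, if any. */
    Optional<String> owner(String taskId) {
        String owner = ownerByTask.get(taskId);
        return (owner == null || owner.equals(UNCLAIMED)) ? Optional.empty() : Optional.of(owner);
    }
}
```

If two nodes receive the same event and both call claim for the same task, only one call returns true; the other node simply moves on to the next event.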

Reconciliation Loop

The controller runs a reconciliation loop every 5 seconds that:

  • Checks node health (heartbeat monitoring)
  • Detects orphaned tasks
  • Scales node pools (spawn/terminate nodes)
  • Maintains desired state
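The loop above can be pictured as a fixed-delay scheduled task plus a heartbeat check. The following is a sketch under assumptions: HeartbeatMonitor, ReconciliationLoop, and the heartbeat timeout are hypothetical names and parameters, and only the 5-second interval comes from this README.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical heartbeat tracker for illustration only. */
final class HeartbeatMonitor {
    private final Map<String, Instant> lastHeartbeat = new ConcurrentHashMap<>();
    private final Duration timeout;

    HeartbeatMonitor(Duration timeout) {
        this.timeout = timeout;
    }

    void recordHeartbeat(String nodeId, Instant at) {
        lastHeartbeat.put(nodeId, at);
    }

    /** Node IDs (sorted) whose last heartbeat is older than the timeout as of {@code now}. */
    List<String> staleNodes(Instant now) {
        return lastHeartbeat.entrySet().stream()
                .filter(e -> Duration.between(e.getValue(), now).compareTo(timeout) > 0)
                .map(Map.Entry::getKey)
                .sorted()
                .toList();
    }
}

/** Hypothetical scheduler wrapper for illustration only. */
final class ReconciliationLoop {
    /** Runs {@code pass} immediately and then 5 seconds after each run completes. */
    static ScheduledExecutorService start(Runnable pass) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                pass.run();
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all future runs, so log and continue.
                System.err.println("Reconciliation pass failed: " + e);
            }
        }, 0, 5, TimeUnit.SECONDS);
        return scheduler;
    }
}
```

A pass would typically call staleNodes(Instant.now()), treat tasks claimed by those nodes as orphaned, and then compare pool sizes against the configured min/max instances. Call shutdown() on the returned scheduler to stop the loop.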

Development

Running Tests

./gradlew test

Project Structure

orchestrator-poc/
├── src/main/java/io/orchestrator/
│   ├── controller/     # WorkflowController
│   ├── node/           # Node interfaces and models
│   ├── event/          # EventBus implementations
│   ├── storage/        # KeyValueStore implementations
│   ├── config/         # Configuration classes
│   ├── api/            # REST API
│   └── metrics/        # Metrics collection
├── plugins/
│   └── example-nodes/  # Example node implementations
└── docs/               # Documentation

Status

Phase 1: Core Engine (90% Complete)

  • ✅ Core interfaces and models
  • ✅ NATS integration (basic)
  • ✅ Controller with reconciliation loop
  • ✅ Node lifecycle management
  • ✅ Retry logic
  • ✅ Visitor pattern framework
  • ✅ Example BuildNode

Phase 2: Production Readiness (60% Complete)

  • ✅ Checkpointing infrastructure
  • ✅ Orphan detection and recovery
  • ✅ Graceful shutdown with timeout
  • ✅ REST API
  • ✅ Metrics infrastructure
  • 🟡 Structured concurrency (in progress)
  • ⏳ Comprehensive test suite

Roadmap

See docs/5-Roadmap.md for detailed implementation roadmap.

License

[To be determined]

Contributing

[To be determined]
