A minimal workflow/graph engine for executing agent workflows with nodes, edges, state management, branching, and looping capabilities. Built with FastAPI and Python.
This project implements a simplified version of a workflow engine (similar to LangGraph) that allows you to:
- Define workflows as graphs with nodes and edges
- Execute nodes sequentially with shared state
- Support conditional branching based on state values
- Implement loops until conditions are met
- Track execution with detailed logs
- Nodes: Python functions that read and modify shared state
- State Management: Pydantic-based state that flows through the workflow
- Edges: Simple and conditional routing between nodes
- Branching: Conditional routing based on state values (>, <, >=, <=, ==, !=)
- Looping: Support for iterative execution until conditions are met
- Execution Logging: Step-by-step execution tracking
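To make the node concept concrete, here is a minimal sketch of what a node function looks like: a plain Python function that reads the shared state, computes something, and writes the result back. The name and exact signature are illustrative, not the project's actual code.

```python
# Hypothetical node function; the real signatures live in the tool registry.
def count_lines(state: dict) -> dict:
    """Read the shared state, compute a value, and write it back."""
    code = state.get("code", "")
    state["line_count"] = len(code.splitlines())
    return state

state = {"code": "def f():\n    return 1\n"}
state = count_lines(state)
print(state["line_count"])  # 2
```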
- `POST /graph/create` - Create a new workflow graph
- `POST /graph/run` - Execute a graph with initial state
- `GET /graph/state/{run_id}` - Retrieve execution state and logs
- `GET /health` - Health check endpoint
The project includes a complete implementation of a code review workflow that:
- Extracts functions from Python code using AST parsing
- Checks complexity using simplified cyclomatic complexity
- Detects issues like long functions, high complexity, too many parameters
- Suggests improvements based on detected issues
- Calculates quality score (0-10 scale)
- Loops until quality score >= 7.0 or max iterations reached
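As a rough sketch of the extraction step, the standard-library `ast` module can pull out each function together with basic facts like parameter count and length. The names and returned fields here are illustrative, not the project's actual implementation.

```python
import ast

def extract_functions(source: str) -> list[dict]:
    """Parse Python source and collect basic facts about each function."""
    tree = ast.parse(source)
    functions = []
    for node in ast.walk(tree):
        if isinstance(node, ast.FunctionDef):
            functions.append({
                "name": node.name,
                "num_params": len(node.args.args),
                # end_lineno is available on Python 3.8+
                "num_lines": node.end_lineno - node.lineno + 1,
            })
    return functions

print(extract_functions("def add(a, b):\n    return a + b"))
# [{'name': 'add', 'num_params': 2, 'num_lines': 2}]
```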
agent-workflow-engine/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI application
│ ├── models/
│ │ ├── __init__.py
│ │ ├── graph.py # Graph, Node, Edge models
│ │ ├── state.py # State management
│ │ └── api.py # API request/response models
│ ├── core/
│ │ ├── __init__.py
│ │ ├── engine.py # Graph execution engine
│ │ └── tools.py # Tool registry
│ ├── workflows/
│ │ ├── __init__.py
│ │ └── code_review.py # Code review workflow
│ ├── storage/
│ │ ├── __init__.py
│ │ └── memory.py # In-memory storage
│ └── api/
│ ├── __init__.py
│ └── routes.py # API endpoints
├── requirements.txt
├── start.bat # Windows startup script
├── .gitignore
└── README.md
- Python 3.8 or higher
- pip (Python package manager)
1. Start the server. In your terminal, run:

   `.\start.bat`

   This will create a virtual environment, install dependencies, and start the FastAPI server on http://localhost:8000.

2. Run the test demo. In a new terminal, run:

   `python test_demo.py`

   Watch the test results for the sample code workflows defined in `test_demo.py`.
If you prefer manual setup:
```bash
# Create virtual environment
python -m venv .venv

# Activate virtual environment
# On Windows:
.venv\Scripts\activate
# On Linux/Mac:
source .venv/bin/activate

# Install dependencies
pip install -r requirements.txt

# Run the server
uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
```

Once the server is running, visit:
- Interactive API Docs: http://localhost:8000/docs
- Alternative Docs: http://localhost:8000/redoc
Create a graph:

```bash
curl -X POST "http://localhost:8000/graph/create" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Code Review Mini-Agent",
    "description": "Analyzes Python code quality",
    "nodes": [
      {"name": "extract", "function_name": "extract_functions"},
      {"name": "analyze", "function_name": "check_complexity"},
      {"name": "detect", "function_name": "detect_issues"},
      {"name": "suggest", "function_name": "suggest_improvements"},
      {"name": "score", "function_name": "calculate_quality_score"}
    ],
    "edges": [
      {"from_node": "extract", "to_node": "analyze"},
      {"from_node": "analyze", "to_node": "detect"},
      {"from_node": "detect", "to_node": "suggest"},
      {"from_node": "suggest", "to_node": "score"},
      {
        "from_node": "score",
        "to_node": {
          "condition_key": "quality_score",
          "condition_operator": "<",
          "condition_value": 7.0,
          "true_node": "analyze",
          "false_node": "END"
        }
      }
    ],
    "start_node": "extract"
  }'
```

Run the graph with an initial state:

```bash
curl -X POST "http://localhost:8000/graph/run" \
  -H "Content-Type: application/json" \
  -d '{
    "graph_id": "<graph_id_from_step_1>",
    "initial_state": {
      "code": "def long_function(a, b, c, d, e, f):\n    if a > 0:\n        if b > 0:\n            if c > 0:\n                for i in range(100):\n                    print(i)\n    return a + b"
    }
  }'
```

Retrieve the execution state and logs:

```bash
curl "http://localhost:8000/graph/state/<run_id>"
```

✅ Node Execution: Execute Python functions as workflow nodes
✅ State Flow: Pydantic-based state management across nodes
✅ Simple Edges: Direct node-to-node transitions
✅ Conditional Branching: Route based on state values with operators (<, >, <=, >=, ==, !=)
✅ Looping: Repeat nodes until conditions are met
✅ Tool Registry: Register and manage callable functions
✅ Execution Logging: Track each step with state snapshots
✅ In-Memory Storage: Store graphs and runs (thread-safe)
✅ Error Handling: Graceful error handling with detailed messages
✅ Cycle Detection: Prevent infinite loops with max iteration limit
- Uses Pydantic models for type safety and validation
- State flows through nodes as a dictionary-like object
- Each node can read and modify the state
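A sketch of what such a dictionary-like, Pydantic-backed state might look like. This is illustrative only; the actual model lives in `app/models/state.py` and may be shaped differently.

```python
from typing import Any
from pydantic import BaseModel, Field

class WorkflowState(BaseModel):
    """Dictionary-like state that flows through the workflow."""
    data: dict[str, Any] = Field(default_factory=dict)

    def get(self, key: str, default: Any = None) -> Any:
        return self.data.get(key, default)

    def set(self, key: str, value: Any) -> None:
        self.data[key] = value

s = WorkflowState(data={"code": "print('hi')"})
s.set("quality_score", 8.5)
print(s.get("quality_score"))  # 8.5
```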
- Engine traverses the graph starting from `start_node`
- Each node executes its associated tool function
- Edges determine the next node (simple or conditional)
- Execution continues until reaching "END" or max iterations
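The traversal described above can be sketched roughly as follows. This is a simplified stand-in for `app/core/engine.py` (which also records logs and state snapshots); node functions and edges are passed in as plain dicts here for illustration.

```python
import operator

# Map the supported condition operators to Python functions.
OPS = {"<": operator.lt, ">": operator.gt, "<=": operator.le,
       ">=": operator.ge, "==": operator.eq, "!=": operator.ne}

def run_graph(nodes, edges, start_node, state, max_iterations=50):
    """Walk the graph from start_node until END or the iteration cap."""
    current = start_node
    for _ in range(max_iterations):  # cycle guard against infinite loops
        state = nodes[current](state)        # execute the node's function
        target = edges.get(current, "END")
        if isinstance(target, dict):         # conditional edge
            op = OPS[target["condition_operator"]]
            taken = op(state[target["condition_key"]], target["condition_value"])
            target = target["true_node"] if taken else target["false_node"]
        if target == "END":
            break
        current = target
    return state

# Demo: loop a single "inc" node until n reaches 3.
nodes = {"inc": lambda s: {**s, "n": s["n"] + 1}}
edges = {"inc": {"condition_key": "n", "condition_operator": "<",
                 "condition_value": 3, "true_node": "inc", "false_node": "END"}}
print(run_graph(nodes, edges, "inc", {"n": 0}))  # {'n': 3}
```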
- Global registry for all callable tools
- Tools are registered using the `@register_tool` decorator
- Nodes reference tools by name
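A plausible shape for such a registry, as a sketch (not necessarily the code in `app/core/tools.py`, and the scoring rule below is invented for the example):

```python
TOOL_REGISTRY = {}

def register_tool(func):
    """Decorator that records a function in the global registry by name."""
    TOOL_REGISTRY[func.__name__] = func
    return func

@register_tool
def calculate_quality_score(state):
    # Toy scoring rule for illustration: one point off per detected issue.
    state["quality_score"] = max(0.0, 10.0 - len(state.get("issues", [])))
    return state

# Nodes look up their function by the name given in the graph definition.
tool = TOOL_REGISTRY["calculate_quality_score"]
print(tool({"issues": ["too long"]})["quality_score"])  # 9.0
```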
Given more time, I would add:
- Async Execution: Support for async/await in node functions
- WebSocket Streaming: Real-time execution log streaming
- Persistent Storage: SQLite/PostgreSQL backend option
- Parallel Execution: Run independent nodes in parallel
- Graph Validation: Validate graph structure before execution
- Dynamic Graph Modification: Modify graphs during execution
- Better Error Recovery: Retry logic and fallback nodes
- Metrics & Monitoring: Execution time, success rates, etc.
- Graph Visualization: Visual representation of workflows
- More Workflow Examples: Additional sample workflows
- Unit Tests: Comprehensive test coverage
- Authentication: API key or JWT-based auth
- Rate Limiting: Prevent abuse of API endpoints
- Caching: Cache execution results for identical inputs
- Simplicity and speed for MVP
- Easy to upgrade to persistent storage later
- Thread-safe implementation with locks
- Type safety and validation
- Excellent FastAPI integration
- Clear data models
- Covers most use cases
- Easy to understand and debug
- Can be extended to support complex expressions
You can test the API using:
- Swagger UI: http://localhost:8000/docs (interactive testing)
- cURL: Command-line HTTP requests
- Postman: Import the OpenAPI schema from `/openapi.json`
- Python requests: Write custom test scripts