AI Agent Orchestration Control Plane Prototype
Current Status: Phase 1 Prototype
This project is NOT production-ready yet.
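agentManager is a prototype implementation of an AI agent control plane. It provides a basic framework for task dependency management, a state machine, an event bus, and a scheduler.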
- In-memory DAG engine with cycle detection
- Task state machine with emergency transitions
- Event bus with wildcard subscriptions
- Scheduler with conflict detection and backoff
- FastAPI REST API (12+ endpoints)
- Task plan CRUD with agent assignment, duplicate/dependency validation, confirm hooks, and event publication
- Agent profile system with high/low layers, skills, MCP servers, and relative workdir policy
- Built-in skill/MCP template library with project-level override support
- Runtime hooks subsystem (opt-in via HOOKS_ENABLED=true)
- Scheduled task runner with asyncio-based dispatch
- Cross-platform install script (scripts/install.py) with --dry-run / --verify / --verify-docker modes
- 902+ unit tests (100% passing)
- Opt-in durable backend interfaces for PostgreSQL state, S3-compatible checkpoints, Redis Streams retries, and pluggable vector memory
- FastAPI service with full CRUD
- Real L1-L4 repair workflow
- Docker sandbox hardening
- GitHub Actions CI/CD
# Clone repository
git clone https://github.com/Zld1994/agentManager.git
cd agentManager
# Install in development mode
pip install -e .
# Install dev dependencies (optional)
pip install -e ".[dev]"
One-click install (cross-platform):
# Dry-run to preview commands
python scripts/install.py --dry-run
# Full install with dev, sandbox, and OTEL extras
python scripts/install.py --with-sandbox --with-otel
# Install and verify
python scripts/install.py --verify --verify-tests
# Verify local Docker/Compose, using WSL Docker fallback on Windows when available
python scripts/install.py --verify-docker
See docs/install.md for platform-specific instructions.
# Run all unit tests
pytest tests/unit/ -v
# Run specific test module
pytest tests/unit/test_dag_engine.py -v
# Run with coverage
pytest tests/unit/ --cov=agentManager --cov-report=term-missing
# Start FastAPI server
python -m uvicorn agentManager.api:app --host 127.0.0.1 --port 8000
# Access API documentation
# Swagger UI: http://localhost:8000/docs
# ReDoc: http://localhost:8000/redoc
agentManager supports three runtime modes, selected automatically by the
RuntimeFactory based on environment variables:
| Mode | State | Events | Checkpoints | Memory | When |
|---|---|---|---|---|---|
| local memory (default) | In-memory | In-memory | In-memory | SQLite | No DATABASE_URL / REDIS_URL set |
| local durable | In-memory | In-memory | In-memory | SQLite + vector backend | VECTOR_BACKEND=qdrant |
| production-like | PostgreSQL | Redis Streams | S3/MinIO | SQLite + Qdrant | All env vars set |
local memory — All state lives in process memory. Suitable for development, testing, and short-lived experiments. Data is lost on restart.
local durable — Structured data persists in SQLite while vector search delegates to Qdrant (or the built-in Jaccard fallback). Suitable for single-machine long-running sessions.
production-like — Full durable stack via Docker Compose: PostgreSQL for
state, Redis Streams for events, S3-compatible storage for checkpoints, and
Qdrant for vector search. Requires docker compose up.
Local usage defaults to in-memory state and SQLite-backed memory. Production-oriented durability is opt-in through environment variables:
DATABASE_URL=postgresql://agentmanager:secret@postgres:5432/agentmanager
REDIS_URL=redis://redis:6379/0
OBJECT_STORE_ENDPOINT=http://minio:9000
OBJECT_STORE_BUCKET=agentmanager-checkpoints
OBJECT_STORE_ACCESS_KEY=...
OBJECT_STORE_SECRET_KEY=...
VECTOR_BACKEND=sqlite # sqlite, memory, or qdrant
QDRANT_URL=http://qdrant:6333
QDRANT_API_KEY=...
The RuntimeFactory reads these variables at startup and wires the
appropriate backend implementations. When a URL is empty or unset, the
corresponding in-memory or SQLite fallback is used automatically. If a
durable backend fails to initialise (e.g. Postgres is unreachable), the
factory logs a warning and falls back to the in-memory default so the API
remains functional.
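The selection and fallback behaviour described above can be sketched roughly as follows. This is an illustration only, not the project's RuntimeFactory: the function names `select_runtime_mode` and `with_fallback` are hypothetical, and the exact set of variables that counts as "all env vars set" is an assumption.

```python
import logging
from typing import Callable, Mapping, TypeVar

T = TypeVar("T")

# Assumption: these four URLs together imply the production-like mode.
PRODUCTION_VARS = ("DATABASE_URL", "REDIS_URL", "OBJECT_STORE_ENDPOINT", "QDRANT_URL")


def select_runtime_mode(env: Mapping[str, str]) -> str:
    """Return the runtime mode name implied by the given environment."""

    def is_set(name: str) -> bool:
        # Empty or whitespace-only values count as unset.
        return bool(env.get(name, "").strip())

    if all(is_set(name) for name in PRODUCTION_VARS):
        return "production-like"
    if env.get("VECTOR_BACKEND", "").strip().lower() == "qdrant":
        return "local durable"
    return "local memory"


def with_fallback(make_durable: Callable[[], T], make_default: Callable[[], T]) -> T:
    """Try the durable backend; log a warning and use the default on failure."""
    try:
        return make_durable()
    except Exception as exc:  # e.g. Postgres is unreachable
        logging.getLogger(__name__).warning(
            "durable backend unavailable, using in-memory default: %s", exc
        )
        return make_default()
```

Passing the environment in as a mapping keeps the selection logic testable without touching `os.environ`.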
agentManager.storage exposes the durable repository/object-store interfaces. Unit tests mock
PostgreSQL, Redis, S3/MinIO, and Qdrant clients, so a live service stack is not required for the
default test workflow.
The API installs a request correlation middleware and exposes Prometheus metrics at /metrics.
Structured JSON logs, audit helpers, and local-safe tracing hooks for workflow, task, and recovery
operations are configured through environment variables:
LOG_LEVEL=INFO
LOG_FORMAT=json
REQUEST_CORRELATION_HEADER=X-Request-ID
WORKFLOW_CORRELATION_METADATA_KEY=correlation_id
AUDIT_LOGGER_NAME=agentManager.audit
OTEL_TRACING_ENABLED=false
OTEL_SERVICE_NAME=agentManager
OTEL_EXPORTER_OTLP_ENDPOINT=
Tracing remains disabled by default for local development. OTEL_TRACING_ENABLED=true records the
opt-in setting for deployments that wire a collector/exporter around these hooks.
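As a rough illustration of how structured JSON logs can carry a request correlation ID, here is a standard-library sketch. It is not the agentManager.observability implementation; the `correlation_id` context variable and the `JsonFormatter` class are hypothetical names chosen for this example.

```python
import contextvars
import json
import logging

# Holds the current request's correlation ID, e.g. the X-Request-ID header value.
# A middleware would set this at the start of each request (assumption).
correlation_id: contextvars.ContextVar = contextvars.ContextVar(
    "correlation_id", default=None
)


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object with the correlation ID attached."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "correlation_id": correlation_id.get(),
        }
        return json.dumps(payload)


if __name__ == "__main__":
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    audit = logging.getLogger("agentManager.audit")
    audit.addHandler(handler)
    audit.setLevel(logging.INFO)

    correlation_id.set("req-123")
    audit.info("workflow created")
```

A context variable is used so that concurrent asyncio requests each see their own ID.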
# Health check
curl http://127.0.0.1:8000/health
# Get system status
curl http://127.0.0.1:8000/status
# Create a task
curl -X POST http://127.0.0.1:8000/tasks \
-H "Content-Type: application/json" \
-d '{"node_id":"task_1","task_type":"data_processing"}'
# Get ready tasks
curl http://127.0.0.1:8000/tasks/ready
# Complete a task
curl -X POST http://127.0.0.1:8000/tasks/task_1/complete
agentManager/
├── api.py # FastAPI application entry point
├── engine/
│ ├── __init__.py
│ ├── dag.py # DAG engine (cycle detection fixed)
│ ├── state_manager.py # State machine (HITL fixed)
│ ├── event_bus.py # Event bus (wildcard fixed)
│ └── scheduler.py # Scheduler (deadlock fixed)
├── memory/
│ ├── memory_system.py
│ └── task_history.py
├── observability/ # Logging, audit, and tracing helpers
├── runtime/ # Reserved for Phase 2
│ └── __init__.py
└── __init__.py
monitoring/
└── prometheus.yml # Prometheus configuration
tests/
├── unit/
│ ├── test_dag_engine.py # 13 tests
│ ├── test_state_manager.py # 12 tests
│ ├── test_event_bus.py # 9 tests
│ ├── test_scheduler.py # 17 tests
│ └── test_api.py # 15 tests
├── integration/ # Reserved for Phase 2
└── e2e/ # Reserved for Phase 3
pyproject.toml # Project configuration
README.md # This file
Health check endpoint.
Response (200 OK):
{
"status": "ok",
"version": "0.1.0",
"timestamp": "2026-05-24T04:00:00Z"
}
Get system status.
Response (200 OK):
{
"total_tasks": 5,
"running_tasks": 2,
"completed_tasks": 1,
"dag_nodes": 5,
"events_published": 12
}
Create a new task.
Request Body:
{
"node_id": "task_1",
"task_type": "data_processing",
"dependencies": ["task_0"],
"metadata": {}
}
Response (201 Created):
{
"node_id": "task_1",
"task_type": "data_processing",
"status": "pending",
"dependencies": ["task_0"],
"metadata": {}
}
Errors:
- 400: Task already exists or dependency not found
- 500: Internal server error
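For callers that prefer Python over curl, the create-task request can be built with the standard library alone. This is a sketch under the assumption that the server runs at the address used in the uvicorn command; `build_create_task_request` is a hypothetical helper, not part of agentManager.

```python
import json
import urllib.request

BASE_URL = "http://127.0.0.1:8000"  # assumed local development address


def build_create_task_request(
    node_id: str,
    task_type: str,
    dependencies=(),
    metadata=None,
    base_url: str = BASE_URL,
) -> urllib.request.Request:
    """Build (but do not send) a POST /tasks request with a JSON body."""
    body = {
        "node_id": node_id,
        "task_type": task_type,
        "dependencies": list(dependencies),
        "metadata": metadata or {},
    }
    return urllib.request.Request(
        url=f"{base_url}/tasks",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    request = build_create_task_request("task_1", "data_processing", ["task_0"])
    # Sending requires a running server; a 400 surfaces as urllib.error.HTTPError.
    with urllib.request.urlopen(request) as response:
        print(response.status, json.loads(response.read()))
```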
Get task information.
Response (200 OK):
{
"node_id": "task_1",
"task_type": "data_processing",
"status": "pending",
"dependencies": ["task_0"],
"metadata": {}
}
Errors:
- 404: Task not found
Get all tasks ready for execution.
Response (200 OK):
{
"ready_tasks": ["task_1", "task_3"],
"total_tasks": 5,
"running_tasks": 2
}
Mark task as completed.
Response (200 OK):
{
"task_id": "task_1",
"status": "completed"
}
Errors:
- 404: Task not found
- 500: State transition error
Mark task as failed.
Query Parameters:
- reason (optional): Failure reason
Response (200 OK):
{
"task_id": "task_1",
"status": "failed"
}
Errors:
- 404: Task not found
- 500: State transition error
Create a task plan with decomposition items, duplicate item ID checks, dependency validation, and agent assignment.
Request Body:
{
"plan_id": "plan-1",
"source_task_id": "task-1",
"items": [
{
"id": "item-1",
"title": "First item",
"verification": "run tests"
}
]
}
Response (201 Created):
{
"plan_id": "plan-1",
"status": "draft",
"items": [{"id": "item-1", "status": "pending_review", ...}]
}
Retrieve a task plan for review.
Edit task plan items, assignees, skills, or relative workdir values. Blocked after confirm.
Freeze the plan, validate dependencies/verification, run before_task_plan_confirm and
after_task_plan_confirm hooks, and publish task-plan events outside the in-memory plan lock.
A failing blocking before-hook leaves the plan in draft state and emits a confirm-failed event.
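The validate-then-confirm flow can be pictured with the following sketch. It is a simplified model, not the project's code: `validate_plan_items` and `confirm_plan` are hypothetical names, the per-item `dependencies` field is an assumption, and event publication and locking are left out.

```python
from typing import Callable, Iterable


def validate_plan_items(items: list) -> list:
    """Return human-readable validation errors (an empty list means valid)."""
    errors = []
    seen = set()
    for item in items:
        if item["id"] in seen:
            errors.append(f"duplicate item id: {item['id']}")
        seen.add(item["id"])
    for item in items:
        # Assumed field: each item may list the IDs of items it depends on.
        for dep in item.get("dependencies", []):
            if dep not in seen:
                errors.append(f"{item['id']} depends on unknown item {dep}")
        if not item.get("verification"):
            errors.append(f"{item['id']} has no verification step")
    return errors


def confirm_plan(plan: dict, before_hooks: Iterable[Callable[[dict], bool]] = ()) -> dict:
    """Return a confirmed copy of the plan, or the unchanged draft on failure."""
    if plan["status"] != "draft":
        raise ValueError("plan is already confirmed")
    if validate_plan_items(plan["items"]):
        return plan
    # A blocking before-hook that returns False leaves the plan in draft.
    if not all(hook(plan) for hook in before_hooks):
        return plan
    confirmed_items = [{**item, "status": "confirmed"} for item in plan["items"]]
    return {**plan, "status": "confirmed", "items": confirmed_items}
```

Returning a new dictionary instead of mutating the draft makes the "failed confirm leaves the plan in draft" rule easy to check.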
Response (200 OK):
{
"plan_id": "plan-1",
"status": "confirmed",
"items": [{"id": "item-1", "status": "confirmed", ...}]
}
- DAG Engine: Node creation, edge addition, cycle detection, topological sort
- State Machine: Initialization, transitions, history, emergency HITL
- Event Bus: Subscribe, publish, filtering, wildcard subscriptions
- Scheduler: Task scheduling, conflict detection, backoff mechanism
- API: CRUD operations, error handling, integration flows
test_dag_engine.py 13/13 ✅
test_state_manager.py 12/12 ✅
test_event_bus.py 9/9 ✅
test_scheduler.py 17/17 ✅
test_api.py 15/15 ✅
────────────────────────────────
Total 66/66 ✅ (100%)
# Run all tests
pytest tests/unit/ -v
# Run with coverage report
pytest tests/unit/ --cov=agentManager --cov-report=html
# Run specific test
pytest tests/unit/test_dag_engine.py::TestDAGEngine::test_cycle_detection -v
Manages task dependencies and execution order.
from agentManager.engine import DAGEngine, DAGNode, TaskStatus
engine = DAGEngine()
engine.add_node(DAGNode(node_id="task_1", task_type="type1"))
engine.add_node(DAGNode(node_id="task_2", task_type="type2"))
engine.add_edge("task_1", "task_2")
engine.update_node_status("task_1", TaskStatus.COMPLETED)
ready = engine.get_ready_nodes() # ["task_2"]
Key Features:
- Cycle detection using NetworkX
- Topological sorting
- Ready node discovery
- Status tracking
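To show what cycle detection and ready-node discovery amount to, here is a small stand-in built on the standard library's `graphlib` rather than NetworkX. It is not the DAGEngine implementation; `has_cycle` and `ready_nodes` are hypothetical helpers, and the graph is modelled as a plain mapping from each node to the set of nodes it depends on.

```python
from graphlib import CycleError, TopologicalSorter


def has_cycle(dependencies: dict) -> bool:
    """Return True if the dependency mapping contains a cycle."""
    try:
        # prepare() validates the whole graph eagerly and raises on a cycle.
        TopologicalSorter(dependencies).prepare()
    except CycleError:
        return True
    return False


def ready_nodes(dependencies: dict, completed: set) -> list:
    """Return unfinished nodes whose dependencies are all completed."""
    return sorted(
        node
        for node, deps in dependencies.items()
        if node not in completed and set(deps) <= completed
    )


if __name__ == "__main__":
    graph = {"task_1": set(), "task_2": {"task_1"}}
    print(has_cycle(graph))
    print(ready_nodes(graph, completed={"task_1"}))
```

The check runs eagerly, which mirrors the reason a direct acyclicity test is preferable to a lazily evaluated topological sort.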
Manages task lifecycle and state transitions.
from agentManager.engine import StateMachine, TaskState
sm = StateMachine()
sm.initialize("task_1", TaskState.PENDING)
sm.transition("task_1", TaskState.READY)
sm.transition("task_1", TaskState.IMPLEMENTING)
sm.transition("task_1", TaskState.VERIFYING)
sm.transition("task_1", TaskState.COMPLETED)
sm.initialize("task_2", TaskState.READY)
sm.transition("task_2", TaskState.BLOCKED_HITL) # Emergency transition from a non-terminal state
Key Features:
- Valid state transitions
- Emergency HITL from any non-terminal state
- Transition history tracking
- Terminal state detection
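The rule "emergency HITL from any non-terminal state" can be expressed as a lookup table plus one special case. The sketch below is illustrative only: the `State` enum, the `FAILED` state, and the contents of `ALLOWED` are assumptions, not the project's actual transition table.

```python
from enum import Enum, auto


class State(Enum):
    PENDING = auto()
    READY = auto()
    IMPLEMENTING = auto()
    VERIFYING = auto()
    COMPLETED = auto()
    FAILED = auto()  # assumed terminal state
    BLOCKED_REPAIR = auto()
    BLOCKED_HITL = auto()


TERMINAL = {State.COMPLETED, State.FAILED}

# Assumed "normal" transitions; the real table may differ.
ALLOWED = {
    State.PENDING: {State.READY},
    State.READY: {State.IMPLEMENTING},
    State.IMPLEMENTING: {State.VERIFYING, State.FAILED},
    State.VERIFYING: {State.COMPLETED, State.BLOCKED_REPAIR, State.FAILED},
    State.BLOCKED_REPAIR: {State.IMPLEMENTING},
    State.BLOCKED_HITL: {State.READY},
}


def can_transition(current: State, target: State) -> bool:
    """Check a transition, allowing BLOCKED_HITL from any non-terminal state."""
    if current in TERMINAL:
        return False
    if target is State.BLOCKED_HITL:
        # Emergency escalation; a self-transition is not meaningful.
        return current is not State.BLOCKED_HITL
    return target in ALLOWED.get(current, set())
```

Keeping the emergency rule outside the table avoids repeating `BLOCKED_HITL` in every row.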
Publishes and subscribes to task events.
from agentManager.engine import EventBus, Event, EventType
bus = EventBus()
bus.subscribe(EventType.TASK_COMPLETED, callback)
bus.subscribe(EventType.TASK_COMPLETED, callback, workflow_id=None) # Global
event = Event(event_id="e1", event_type=EventType.TASK_COMPLETED, workflow_id="wf1")
bus.publish(event)
Key Features:
- Workflow-specific subscriptions
- Global wildcard subscriptions
- Event filtering by type and workflow
- Exception handling in callbacks
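A minimal model of workflow-specific and global (wildcard) subscriptions is sketched below. `MiniEventBus` is a hypothetical class written for this example and does not reproduce the EventBus API; it only shows that one publish call reaches both the exact and the global subscribers, and that a failing callback does not stop delivery.

```python
import logging
from collections import defaultdict


class MiniEventBus:
    def __init__(self) -> None:
        # Key: (event_type, workflow_id); workflow_id None means "all workflows".
        self._subscribers = defaultdict(list)

    def subscribe(self, event_type, callback, workflow_id=None) -> None:
        self._subscribers[(event_type, workflow_id)].append(callback)

    def publish(self, event_type, workflow_id, payload=None) -> int:
        """Deliver to exact and global subscribers; return the success count."""
        keys = [(event_type, workflow_id)]
        if workflow_id is not None:
            keys.append((event_type, None))
        delivered = 0
        for key in keys:
            for callback in list(self._subscribers.get(key, [])):
                try:
                    callback(event_type, workflow_id, payload)
                    delivered += 1
                except Exception:
                    # One failing subscriber must not block the others.
                    logging.getLogger(__name__).exception("subscriber failed")
        return delivered
```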
Schedules and manages task execution.
from agentManager.engine import SchedulerEngine
scheduler = SchedulerEngine(max_concurrent_tasks=10)
scheduler.add_task("task_1", priority=5, dependencies=["task_0"])
scheduler.execute_scheduled_tasks()
scheduler.mark_completed("task_1")
Key Features:
- Priority-based scheduling
- Dependency conflict detection
- Backoff mechanism for retries
- Concurrent task limit
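Priority-based selection under a concurrency limit can be sketched as follows. This is not the SchedulerEngine algorithm: `pick_runnable` is a hypothetical function, and it assumes that a larger priority number means the task runs earlier.

```python
import heapq


def pick_runnable(tasks: dict, completed: set, running: set, max_concurrent_tasks: int) -> list:
    """Pick the highest-priority unblocked tasks that fit in the free slots.

    `tasks` maps task_id -> (priority, dependencies).
    """
    free_slots = max_concurrent_tasks - len(running)
    if free_slots <= 0:
        return []
    candidates = [
        (-priority, task_id)  # negate so the largest priority sorts first
        for task_id, (priority, deps) in tasks.items()
        if task_id not in completed
        and task_id not in running
        and set(deps) <= completed
    ]
    return [task_id for _, task_id in heapq.nsmallest(free_slots, candidates)]
```

Tasks whose dependencies are unfinished are simply skipped on this pass and reconsidered on the next one.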
Problem: topological_sort() returns a generator, so a cycle goes undetected unless the generator is consumed.
Fix: Use nx.is_directed_acyclic_graph() to check for a DAG directly.
Problem: Conflicted tasks were re-queued immediately, causing an infinite loop.
Fix: A deferred queue plus a backoff mechanism (5-second retry delay).
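The deferred-queue idea can be illustrated with a few lines of Python. `DeferredQueue` is a hypothetical class, not the scheduler's own; it assumes a fixed retry delay of 5 seconds and takes the clock as a parameter so the behaviour can be tested without sleeping.

```python
import time


class DeferredQueue:
    """Hold conflicted tasks until their retry delay has elapsed."""

    def __init__(self, retry_delay: float = 5.0, clock=time.monotonic) -> None:
        self._retry_delay = retry_delay
        self._clock = clock
        self._ready_at = {}  # task_id -> earliest time the task may be retried

    def defer(self, task_id: str) -> None:
        # Instead of re-queueing immediately, park the task for retry_delay seconds.
        self._ready_at[task_id] = self._clock() + self._retry_delay

    def pop_due(self) -> list:
        """Remove and return the tasks whose delay has expired."""
        now = self._clock()
        due = sorted(t for t, ready_at in self._ready_at.items() if ready_at <= now)
        for task_id in due:
            del self._ready_at[task_id]
        return due
```

Because a deferred task cannot come back before its delay expires, the scheduling loop cannot spin on the same conflicted task.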
Problem: Only the BLOCKED_REPAIR → BLOCKED_HITL transition was allowed.
Fix: Allow emergency transitions to BLOCKED_HITL from any non-terminal state.
Problem: Global listeners were not triggered.
Fix: Publishing now triggers both exact and wildcard subscriptions.
- DAG engine with cycle detection
- State machine with HITL
- Event bus with wildcard subscriptions
- Scheduler with backoff
- FastAPI REST API
- 66 unit tests
- PostgreSQL state repository interface
- Redis Streams event bus with ACK/retry/DLQ behavior
- TaskExecutor execution loop
- RecoveryEngine real recovery
- Memory three-layer design with pluggable vector backend
- DefectRepair pipeline
- Docker sandbox hardening
- Secret management
- Prometheus metrics
- OpenTelemetry tracing
- GitHub Actions CI/CD
- Production deployment guide
- Performance optimization
- Security audit
- Documentation completion
- Reports Archive - Historical phase, completion, and test reports
- Phase 1 Completion Report - Detailed Phase 1 summary
- Phase 1 Quick Reference - Quick API reference
- API Documentation - Full API specification (coming in Phase 2)
This is a prototype project. Contributions are welcome for:
- Bug fixes
- Test improvements
- Documentation
- Performance optimization
The agentManager.observability module provides production-oriented monitoring infrastructure (the project overall is still in prototype phase):
- Structured Logging: JSON-formatted logs with request/workflow correlation IDs (X-Request-ID header propagated automatically)
- OpenTelemetry Tracing: Opt-in distributed tracing (set OTEL_TRACING_ENABLED=true); zero overhead when disabled
- Audit Events: Security-critical actions (workflow creation, task execution, sandbox denials, recovery escalation, config validation failures) logged under the audit namespace
- Prometheus Metrics: Task counters, duration histograms, error/repair counters at /metrics
| Variable | Default | Description |
|---|---|---|
| LOG_LEVEL | INFO | Python log level |
| LOG_JSON | true | JSON output (false = human-readable) |
| OTEL_TRACING_ENABLED | false | Enable OpenTelemetry tracing |
| OTEL_SERVICE_NAME | agentManager | OTEL service name |
| OTEL_EXPORTER_OTLP_ENDPOINT | http://localhost:4317 | OTLP gRPC endpoint |
| AUDIT_ENABLED | true | Emit audit log events |
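One way to read these variables with their documented defaults is a small frozen dataclass. The sketch below is an assumption about how such settings could be loaded, not the module's actual code; `ObservabilitySettings` and `_as_bool` are hypothetical names.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


def _as_bool(value: str) -> bool:
    """Interpret common truthy spellings; everything else is False."""
    return value.strip().lower() in {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class ObservabilitySettings:
    log_level: str
    log_json: bool
    otel_tracing_enabled: bool
    otel_service_name: str
    otel_exporter_otlp_endpoint: str
    audit_enabled: bool

    @classmethod
    def from_env(cls, env: Optional[Mapping[str, str]] = None) -> "ObservabilitySettings":
        env = os.environ if env is None else env
        return cls(
            log_level=env.get("LOG_LEVEL", "INFO"),
            log_json=_as_bool(env.get("LOG_JSON", "true")),
            otel_tracing_enabled=_as_bool(env.get("OTEL_TRACING_ENABLED", "false")),
            otel_service_name=env.get("OTEL_SERVICE_NAME", "agentManager"),
            otel_exporter_otlp_endpoint=env.get(
                "OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317"
            ),
            audit_enabled=_as_bool(env.get("AUDIT_ENABLED", "true")),
        )
```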
MIT License - See LICENSE file for details
- GitHub: https://github.com/Zld1994/agentManager
- Issues: https://github.com/Zld1994/agentManager/issues
- Discussions: https://github.com/Zld1994/agentManager/discussions
Last Updated: 2026-05-24
Status: Phase 1 Complete, Phase 2 In Progress
Test Coverage: 66/66 ✅