Batch job orchestrator — a Python library for defining, scheduling, and running batch processing jobs with dependency management and retry logic.
graph TD
A[Define Jobs] --> B[Add to BatchFlow]
B --> C[Resolve Dependencies]
C --> D[Topological Sort]
D --> E[Async Execution Engine]
E --> F{Job Status}
F -->|completed| G[Collect Results]
F -->|failed| H[Retry Logic]
H --> E
G --> I[Export Report]
- Dependency management — declare job dependencies and BatchFlow resolves execution order via topological sort
- Async execution — run independent jobs concurrently with
asyncio - Retry logic — configurable retries with exponential backoff for failed jobs
- Status tracking — real-time job status:
pending,running,completed,failed - Reports — export execution reports in JSON or plain text
- Type-safe config — Pydantic-based configuration and validation
pip install -e .import asyncio
from batchflow import BatchFlow
flow = BatchFlow()
# Define jobs (sync or async functions)
async def extract_data():
return {"rows": 1000}
async def transform_data():
return {"transformed": True}
async def load_data():
return {"loaded": True}
# Add jobs with dependencies
flow.add_job("extract", extract_data)
flow.add_job("transform", transform_data, depends_on=["extract"])
flow.add_job("load", load_data, depends_on=["transform"])
# Run all jobs (respects dependency order)
asyncio.run(flow.run_all())
# Check status
print(flow.get_status())
# Get results
print(flow.get_results())
# Export report
report = flow.export_report(format="json")
print(report)# If any jobs failed, retry them
asyncio.run(flow.retry_failed())from batchflow.config import BatchFlowConfig
config = BatchFlowConfig(
max_retries=5,
retry_delay=2.0,
concurrency_limit=8,
)
flow = BatchFlow(config=config)make install # Install dependencies
make test # Run tests
make lint # Run linter
make format # Format codeBatchFlow/
├── src/batchflow/
│ ├── __init__.py # Public API
│ ├── core.py # BatchFlow engine
│ ├── config.py # Pydantic configuration
│ └── utils.py # Dependency resolution, retry, status tracking
├── tests/
│ └── test_core.py # Test suite
├── docs/
│ └── ARCHITECTURE.md # Architecture documentation
├── pyproject.toml
├── Makefile
└── README.md
Inspired by workflow orchestration and batch processing trends.
Built by Officethree Technologies | Made with love and AI