
MukundaKatta/BatchFlow


BatchFlow

CI | Python 3.10+ | License: MIT | Code style: black

Batch job orchestrator — a Python library for defining, scheduling, and running batch processing jobs with dependency management and retry logic.


Architecture

graph TD
    A[Define Jobs] --> B[Add to BatchFlow]
    B --> C[Resolve Dependencies]
    C --> D[Topological Sort]
    D --> E[Async Execution Engine]
    E --> F{Job Status}
    F -->|completed| G[Collect Results]
    F -->|failed| H[Retry Logic]
    H --> E
    G --> I[Export Report]
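The Resolve Dependencies and Topological Sort steps can be illustrated with the standard library's `graphlib.TopologicalSorter` (Python 3.9+). This is a minimal sketch, not BatchFlow's own code: the helper `execution_levels` is hypothetical, and it assumes dependencies arrive as a mapping from each job name to the names it depends on, mirroring `depends_on`. It groups jobs into levels whose members do not depend on each other, which is what an async engine needs in order to run them concurrently.

```python
from graphlib import TopologicalSorter


def execution_levels(deps: dict[str, list[str]]) -> list[list[str]]:
    """Hypothetical helper: group jobs so every job's dependencies sit in an earlier level."""
    sorter = TopologicalSorter(deps)  # maps node -> its predecessors
    sorter.prepare()  # raises graphlib.CycleError if the dependencies form a cycle
    levels = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # jobs whose dependencies have all finished
        levels.append(ready)
        sorter.done(*ready)  # mark them finished so their dependents become ready
    return levels


jobs = {"extract": [], "transform": ["extract"], "load": ["transform"]}
print(execution_levels(jobs))  # [['extract'], ['transform'], ['load']]
```

Jobs in the same level (for example two independent extract jobs) could be handed to the async engine together.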

Features

  • Dependency management — declare job dependencies and BatchFlow resolves execution order via topological sort
  • Async execution — run independent jobs concurrently with asyncio
  • Retry logic — configurable retries with exponential backoff for failed jobs
  • Status tracking — real-time job status: pending, running, completed, failed
  • Reports — export execution reports in JSON or plain text
  • Type-safe config — Pydantic-based configuration and validation
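The four statuses listed above suggest a small state machine. The sketch below is an assumption about how such tracking could work, not BatchFlow's implementation: `JobStatus`, `StatusTracker`, and the transition table (including letting a failed job move back to running when it is retried) are all hypothetical.

```python
from enum import Enum


class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


# Assumed transition table: a failed job may go back to RUNNING when retried.
ALLOWED = {
    JobStatus.PENDING: {JobStatus.RUNNING},
    JobStatus.RUNNING: {JobStatus.COMPLETED, JobStatus.FAILED},
    JobStatus.FAILED: {JobStatus.RUNNING},
    JobStatus.COMPLETED: set(),
}


class StatusTracker:
    """Hypothetical tracker that rejects transitions not in ALLOWED."""

    def __init__(self, job_names):
        self._status = {name: JobStatus.PENDING for name in job_names}

    def transition(self, name, new):
        current = self._status[name]
        if new not in ALLOWED[current]:
            raise ValueError(f"{name}: cannot go from {current.value} to {new.value}")
        self._status[name] = new

    def snapshot(self):
        # Plain strings, convenient for printing or serializing
        return {name: status.value for name, status in self._status.items()}


tracker = StatusTracker(["extract", "transform"])
tracker.transition("extract", JobStatus.RUNNING)
tracker.transition("extract", JobStatus.COMPLETED)
print(tracker.snapshot())  # {'extract': 'completed', 'transform': 'pending'}
```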

Quickstart

Installation

pip install -e .

Basic Usage

import asyncio
from batchflow import BatchFlow

flow = BatchFlow()

# Define jobs (sync or async functions)
async def extract_data():
    return {"rows": 1000}

async def transform_data():
    return {"transformed": True}

async def load_data():
    return {"loaded": True}

# Add jobs with dependencies
flow.add_job("extract", extract_data)
flow.add_job("transform", transform_data, depends_on=["extract"])
flow.add_job("load", load_data, depends_on=["transform"])

# Run all jobs (respects dependency order)
asyncio.run(flow.run_all())

# Check status
print(flow.get_status())

# Get results
print(flow.get_results())

# Export report
report = flow.export_report(format="json")
print(report)
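For the report step, a hypothetical `render_report` helper shows one way to turn per-job statuses and results into JSON or plain text. The report layout here is invented for illustration; this README does not document which fields `export_report` actually emits.

```python
import json


def render_report(statuses: dict[str, str], results: dict[str, object], fmt: str = "json") -> str:
    """Hypothetical report renderer supporting "json" and "text" output."""
    if fmt == "json":
        jobs = [
            {"name": name, "status": status, "result": results.get(name)}
            for name, status in statuses.items()
        ]
        # default=str keeps non-JSON-serializable results from crashing the export
        return json.dumps({"jobs": jobs}, indent=2, default=str)
    if fmt == "text":
        return "\n".join(f"{name}: {status}" for name, status in statuses.items())
    raise ValueError(f"unsupported report format: {fmt!r}")


print(render_report({"extract": "completed", "load": "failed"}, {"extract": {"rows": 1000}}, fmt="text"))
# extract: completed
# load: failed
```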

Retry Failed Jobs

# If any jobs failed, retry them
asyncio.run(flow.retry_failed())
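Retry with exponential backoff, as listed under Features, usually doubles the wait after each failed attempt. This sketch assumes `max_retries` counts retries after the first attempt and `retry_delay` is the base delay in seconds; `run_with_retry` is a hypothetical name, and BatchFlow's `retry_failed()` may behave differently.

```python
import asyncio


async def run_with_retry(job, max_retries=3, retry_delay=1.0):
    """Await an async job; after attempt n fails, sleep retry_delay * 2**n and try again."""
    for attempt in range(max_retries + 1):
        try:
            return await job()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: surface the last error
            await asyncio.sleep(retry_delay * 2**attempt)  # 1x, 2x, 4x, ... the base delay


calls = 0


async def flaky():
    # Fails twice with a transient error, then succeeds
    global calls
    calls += 1
    if calls < 3:
        raise RuntimeError("transient")
    return "ok"


result = asyncio.run(run_with_retry(flaky, max_retries=5, retry_delay=0.01))
print(result)  # ok
```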

Configuration

from batchflow import BatchFlow
from batchflow.config import BatchFlowConfig

config = BatchFlowConfig(
    max_retries=5,
    retry_delay=2.0,
    concurrency_limit=8,
)
flow = BatchFlow(config=config)

Development

make install    # Install dependencies
make test       # Run tests
make lint       # Run linter
make format     # Format code

Project Structure

BatchFlow/
├── src/batchflow/
│   ├── __init__.py      # Public API
│   ├── core.py          # BatchFlow engine
│   ├── config.py        # Pydantic configuration
│   └── utils.py         # Dependency resolution, retry, status tracking
├── tests/
│   └── test_core.py     # Test suite
├── docs/
│   └── ARCHITECTURE.md  # Architecture documentation
├── pyproject.toml
├── Makefile
└── README.md

Inspired by workflow orchestration and batch processing trends.


Built by Officethree Technologies | Made with love and AI
