FlowForge is a small distributed fulfillment workflow prototype built with FastAPI and Temporal. It models a real order flow with inventory reservation, payment processing, warehouse updates, and saga-style compensation when something fails.
The goal is not to hide failures. The goal is to make them recoverable.
An order moves through four steps:
- Check inventory
- Reserve inventory
- Process payment
- Update the warehouse
If a later step fails, FlowForge runs compensating actions in reverse order. For example, if payment succeeds but the warehouse update fails, the workflow refunds the payment and releases the reserved stock.
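The unwinding described above can be sketched in plain Python. The function names below mirror the compensation table later in this README, but the control flow is an illustrative toy, not FlowForge's actual workflow code. Note that each undo is registered *before* its action runs, matching the workflow's real registration order, so even a step that fails mid-mutation still gets compensated (which is why compensations need to tolerate a step that never completed):

```python
# Illustrative saga sketch; not FlowForge's real implementation.
log = []

def reserve_inventory():  log.append("reserve")
def release_inventory():  log.append("release")
def process_payment():    log.append("charge")
def refund_payment():     log.append("refund")
def update_warehouse():   raise RuntimeError("warehouse timed out")
def revert_warehouse():   log.append("revert")

def run_order(steps):
    """Run (action, undo) pairs; register each undo before its action,
    then unwind all registered undos in reverse order on failure."""
    undos = []
    try:
        for action, undo in steps:
            undos.append(undo)  # registered even if the action fails mid-flight
            action()
    except Exception:
        for undo in reversed(undos):
            undo()
        return "compensated"
    return "completed"

status = run_order([
    (reserve_inventory, release_inventory),
    (process_payment, refund_payment),
    (update_warehouse, revert_warehouse),
])
# Payment succeeded but the warehouse step failed, so the undos run
# newest-first: revert the (possibly partial) warehouse write, refund
# the payment, then release the reserved stock.
```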
Temporal owns the durable workflow execution. FastAPI gives clients a simple HTTP surface for starting orders and checking state.
Multi-step business operations are awkward in a normal request/response API. A single HTTP request can return a success or failure, but the real world is often messier:
- a payment provider accepts the charge and then the warehouse service times out
- inventory is reserved, but the next system in the chain is unavailable
- a worker process crashes halfway through an order
- a retry runs the same side effect more than once
FlowForge demonstrates how to handle that class of problem with the saga pattern. Every side effect has an undo step, and the workflow records enough state to know what should happen next.
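The "retry runs the same side effect more than once" problem is usually solved with an idempotency key. A minimal sketch of the idea, with hypothetical names rather than FlowForge's real payment code:

```python
def charge_once(charges, idempotency_key, amount):
    """Record a charge keyed by an idempotency key.

    A retried call with the same key is a no-op that returns the
    original result, so re-running the activity cannot double-bill.
    """
    if idempotency_key not in charges:
        charges[idempotency_key] = amount
    return charges[idempotency_key]

charges = {}
charge_once(charges, "order-ord_abc123", 25)
charge_once(charges, "order-ord_abc123", 25)  # retry: still one charge
```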
| Layer | Technology | Notes |
|---|---|---|
| API | FastAPI | Starts workflows and exposes read endpoints |
| Workflow engine | Temporal Python SDK | Runs durable order workflows |
| Worker | Temporal Worker | Executes workflow activities |
| Models | Pydantic | Request, response, and state schemas |
| State | SQLite-backed stores | Persistent local storage for inventory, payments, warehouse, and order summaries |
| Package manager | uv | Dependency management and test runner |
| Containers | Docker Compose | Local Temporal, API, worker, starter, and test services |
```
flowforge/
├── api/
│   ├── app.py              # FastAPI application and HTTP endpoints
│   └── schemas.py          # API schema exports
├── activities/
│   └── order.py            # Order activities and compensation activities
├── integrations.py         # Swappable inventory, payment, and warehouse interfaces
├── workflows/
│   ├── workflows.py        # FulfillmentWorkflow
│   └── compensation.py     # Saga compensation helper
├── worker/
│   └── worker.py           # Temporal worker entrypoint
├── mocks/
│   ├── inventory_api.py    # Thin inventory mock wrapper
│   └── stripe_mock.py      # Thin payment mock wrapper
├── tests/
│   ├── test_api.py
│   ├── test_compensation.py
│   ├── test_docker_config.py
│   ├── test_store.py
│   └── test_workflow.py
├── config.py               # Environment-based runtime settings
├── models.py               # Shared Pydantic models
└── store.py                # SQLite-backed state stores
```
The workflow registers compensation before it runs each side-effecting step. That matters because an activity can mutate external state and then fail before returning cleanly.
Current compensation behavior:
| Step | Side effect | Compensation |
|---|---|---|
| `reserve_inventory` | reserves stock by order id | `release_inventory` |
| `process_payment` | charges by idempotency key | `refund_payment_by_idempotency_key` |
| `update_warehouse` | writes a warehouse record | `revert_warehouse` |
Compensations are intentionally idempotent where possible. Retrying a reservation or running a missing refund should not corrupt the prototype state.
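Idempotent compensation can be sketched as simple bookkeeping: refunding a charge that is missing or already refunded is a safe no-op. This is an illustrative model, not the project's store code:

```python
class PaymentStore:
    """Sketch of idempotent charge/refund bookkeeping (illustrative only)."""

    def __init__(self):
        self.charges = {}  # idempotency_key -> amount

    def charge(self, key, amount):
        # A retried charge with the same idempotency key is a no-op.
        if key not in self.charges:
            self.charges[key] = amount
        return self.charges[key]

    def refund(self, key):
        # Refunding a missing or already-refunded charge returns None
        # instead of raising, so the compensation can safely be retried.
        return self.charges.pop(key, None)

store = PaymentStore()
store.charge("order-1", 25)
store.refund("order-1")
store.refund("order-1")  # double compensation: harmless no-op
```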
- Python 3.13+
- uv
- Docker and Docker Compose, for the containerized local setup
- Temporal CLI, for the non-Docker setup
The easiest way to run the full local stack is Docker Compose. It starts:
- Temporal dev server on `localhost:7233`
- Temporal UI on `http://localhost:8233`
- FlowForge API on `http://localhost:8000`
- FlowForge Temporal worker on the `fulfillment-queue` task queue
- Persistent FlowForge state in the `flowforge-state` Docker volume
Start the stack:
```shell
docker compose up --build
```

Run the test service in Docker:

```shell
docker compose --profile test run --rm tests
```

Run the one-shot demo starter against the Docker stack:

```shell
docker compose --profile demo run --rm starter
```

The project image has separate Dockerfile targets:
| Target | Purpose |
|---|---|
| `api` | Runs `python main.py api` |
| `worker` | Runs the Temporal worker |
| `starter` | Starts a sample workflow and waits for the result |
| `tests` | Runs `pytest -q` |
The Docker services use `TEMPORAL_HOST=temporal:7233` so containers connect to Temporal over the Compose network. Local processes outside Docker should use the default `localhost:7233`.

The API and worker share `DATABASE_URL=sqlite:////data/flowforge.sqlite3` in Docker so workflow side effects and HTTP read endpoints see the same persisted state.
Use this path if you want to run the app directly on your machine without Docker.
Install dependencies, then start the Temporal dev server:

```shell
uv sync
temporal server start-dev
```

Temporal's local UI will be available at:
http://localhost:8233
Run this in a separate terminal:
```shell
uv run python main.py worker
```

The worker listens on the configured task queue and executes workflow activities.
Run this in another terminal:
```shell
uv run python main.py api --port 8000
```

The API will be available at:
http://localhost:8000
You can also start the API and worker in one process:
```shell
uv run python main.py all
```

Create an order:

```shell
curl -X POST http://localhost:8000/orders \
  -H "Content-Type: application/json" \
  -d '{
    "product_id": "SKU-001",
    "quantity": 2,
    "customer_id": "cust-42",
    "payment_method": "tok_visa"
  }'
```

Example response:
```json
{
  "workflow_id": "order-ord_abc123",
  "order_id": "ord_abc123",
  "status": "started"
}
```

Check workflow status:
```shell
curl http://localhost:8000/orders/order-ord_abc123/status
```

Inspect the full engine snapshot:

```shell
curl http://localhost:8000/engine/snapshot
```

Set `FAIL_AT` on the worker process to force a failure after a named step:

```shell
FAIL_AT=warehouse uv run python main.py worker
```

Supported values:
| Value | Failure point |
|---|---|
| `inventory-check` | after inventory check |
| `inventory` | after inventory reservation |
| `payment` | after payment charge |
| `warehouse` | after warehouse update |
When a failure is injected after a side effect, the workflow should move into compensation and unwind the completed side effects in reverse order.
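Failure injection of this kind can be modeled as a small check that each activity calls after performing its side effect, keyed off the `FAIL_AT` environment variable. The helper below is a hypothetical sketch, not FlowForge's implementation:

```python
import os

class InjectedFailure(RuntimeError):
    """Raised when FAIL_AT matches the step that just completed."""

def maybe_fail(step_name):
    # Raise only when the operator asked for a failure after this step.
    if os.environ.get("FAIL_AT") == step_name:
        raise InjectedFailure(f"injected failure after step {step_name!r}")

def update_warehouse(order_id):
    # ...the real side effect would happen here...
    maybe_fail("warehouse")  # blow up *after* the side effect, like FAIL_AT
    return "warehouse-updated"

os.environ["FAIL_AT"] = "warehouse"
try:
    update_warehouse("ord_abc123")
except InjectedFailure:
    pass  # the workflow would now enter compensation
```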
Basic API health check.
```json
{
  "status": "ok"
}
```

Starts a new fulfillment workflow.
Request:
```json
{
  "product_id": "SKU-001",
  "quantity": 2,
  "customer_id": "cust-42",
  "payment_method": "tok_visa",
  "workflow_id": "optional-client-provided-id"
}
```

Response:

```json
{
  "workflow_id": "optional-client-provided-id",
  "order_id": "ord_abc123",
  "status": "started"
}
```

Lists known orders from the persisted workflow registry.
Queries Temporal for the current workflow state.
Returns the current inventory snapshot.
Returns a payment record by charge id.
Returns warehouse records.
Returns orders, inventory, payments, and warehouse records in one response.
Run the local Python test suite:
```shell
uv run pytest
```

Run the live Temporal integration test against a running Temporal server:

```shell
RUN_TEMPORAL_INTEGRATION=1 uv run pytest flowforge/tests/test_temporal_integration.py
```

Run the containerized test target:

```shell
docker compose --profile test run --rm tests
```

The default suite covers the API surface, persistent stores, workflow state, compensation ordering, and Docker/Compose configuration. The live Temporal test is opt-in so normal test runs do not require an external Temporal server.
| Variable | Default | Description |
|---|---|---|
| `TEMPORAL_HOST` | `localhost:7233` | Temporal server address |
| `TASK_QUEUE` | `fulfillment-queue` | Temporal task queue name |
| `MAX_CONCURRENT_ACTIVITIES` | `100` | Worker activity concurrency |
| `MAX_CONCURRENT_WORKFLOW_TASKS` | `100` | Worker workflow task concurrency |
| `DATABASE_URL` | empty | Optional SQLite URL, for example `sqlite:////data/flowforge.sqlite3` |
| `FLOWFORGE_DB_PATH` | `.flowforge/flowforge.sqlite3` | Local SQLite path when `DATABASE_URL` is empty |
| `FAIL_AT` | empty | Optional failure injection point |
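Settings like these can be read with a small environment-backed config object. The sketch below mirrors the table's names and defaults (a representative subset) but is illustrative, not `flowforge/config.py` itself:

```python
import os
from dataclasses import dataclass, field

@dataclass
class Settings:
    """Environment-backed runtime settings (illustrative sketch)."""
    temporal_host: str = field(
        default_factory=lambda: os.environ.get("TEMPORAL_HOST", "localhost:7233"))
    task_queue: str = field(
        default_factory=lambda: os.environ.get("TASK_QUEUE", "fulfillment-queue"))
    max_concurrent_activities: int = field(
        default_factory=lambda: int(os.environ.get("MAX_CONCURRENT_ACTIVITIES", "100")))
    database_url: str = field(
        default_factory=lambda: os.environ.get("DATABASE_URL", ""))
    fail_at: str = field(
        default_factory=lambda: os.environ.get("FAIL_AT", ""))

# Each process builds its settings once at startup from the environment.
settings = Settings()
```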
FlowForge is still a prototype. The core saga behavior and persistence behavior are real, but several production concerns are intentionally out of scope for now:
- PostgreSQL models are only placeholders
- payment and inventory integrations are local mocks
- the live Temporal tests are opt-in rather than part of every unit run
Planned next steps:

- replace the placeholder PostgreSQL model layer with a production database adapter
- add migrations for schema changes
- add API authentication and request-level authorization
- expose workflow cancellation and retry controls through the API
- add observability for activity retries, compensation events, and workflow latency