HTTP API Guide

Complete guide for triggering Temporal workflows via HTTP API.

Overview

The HTTP API allows you to trigger Temporal workflows from any language or service using REST endpoints.

Two Components

  1. Worker Service (api/worker_service.py) - Processes workflows
  2. API Server (api/api_trigger_example.py) - HTTP endpoints

Both must be running to use the API.


Quick Start

1. Start Services

./scripts/start_api_services.sh

2. Trigger Workflow

curl -X POST http://localhost:8000/workflows/shopping \
  -H "Content-Type: application/json" \
  -d '{
    "items": [
      {"fruit": "APPLE", "amount": 5},
      {"fruit": "BANANA", "amount": 3}
    ]
  }'

3. Stop Services

./scripts/stop_api_services.sh

API Endpoints

Health Check

GET /

Check if API is running and connected to Temporal.

curl http://localhost:8000/

Response:

{
  "service": "Temporal Workflow API",
  "status": "running",
  "temporal_connected": true
}
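Since both services need a moment to start, a client may want to wait for the health check before triggering anything. A minimal sketch using only the standard library; the helper names (`fetch_health`, `is_ready`, `wait_until_ready`) and the retry counts are illustrative assumptions, not part of this repository:

```python
import json
import time
import urllib.request
from typing import Callable


def fetch_health(base_url: str = "http://localhost:8000") -> dict:
    """GET / and decode the JSON body shown in the response above."""
    with urllib.request.urlopen(f"{base_url}/", timeout=5) as resp:
        return json.load(resp)


def is_ready(health: dict) -> bool:
    """Ready means the API is running and reports a Temporal connection."""
    return health.get("status") == "running" and health.get("temporal_connected") is True


def wait_until_ready(
    fetch: Callable[[], dict] = fetch_health,
    attempts: int = 10,
    delay_seconds: float = 1.0,
) -> bool:
    """Poll the health check until it reports ready or attempts run out."""
    for attempt in range(attempts):
        try:
            if is_ready(fetch()):
                return True
        except OSError:
            # Connection refused or timed out: the API is probably still starting.
            pass
        if attempt < attempts - 1:
            time.sleep(delay_seconds)
    return False
```

Calling `wait_until_ready()` right after the start script returns `True` once the API answers with `temporal_connected` set, and `False` if it never does within the allotted attempts.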

Trigger Workflow (Synchronous)

POST /workflows/shopping

Trigger workflow and wait for result.

Request:

{
  "items": [
    {"fruit": "APPLE", "amount": 5},
    {"fruit": "BANANA", "amount": 3}
  ]
}

Response:

{
  "workflow_id": "shopping-workflow-abc123",
  "status": "completed",
  "result": "Ordered 5 Apples...Ordered 3 Bananas..."
}

Example:

curl -X POST http://localhost:8000/workflows/shopping \
  -H "Content-Type: application/json" \
  -d '{"items":[{"fruit":"APPLE","amount":5}]}'
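It can help to validate the request body on the client before sending it. The sketch below checks only the shape shown above (an `items` list of objects with `fruit` and `amount`). The rules that the list must be non-empty and that `amount` must be a positive integer are assumptions; the server's actual validation may differ, and `Item` and `parse_items` are hypothetical names:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Item:
    fruit: str
    amount: int


def parse_items(payload: dict) -> list[Item]:
    """Validate a request body and return its items, or raise ValueError."""
    items = payload.get("items")
    if not isinstance(items, list) or not items:
        raise ValueError('"items" must be a non-empty list')

    parsed = []
    for index, raw in enumerate(items):
        if not isinstance(raw, dict):
            raise ValueError(f"items[{index}] must be an object")
        fruit = raw.get("fruit")
        amount = raw.get("amount")
        if not isinstance(fruit, str) or not fruit:
            raise ValueError(f"items[{index}].fruit must be a non-empty string")
        # bool is a subclass of int in Python, so exclude it explicitly.
        if isinstance(amount, bool) or not isinstance(amount, int) or amount <= 0:
            raise ValueError(f"items[{index}].amount must be a positive integer")
        parsed.append(Item(fruit=fruit, amount=amount))
    return parsed
```

For example, `parse_items({"items": [{"fruit": "APPLE", "amount": 5}]})` returns a single `Item`, while a body without `items` raises `ValueError` before any HTTP call is made.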

Trigger Workflow (Asynchronous)

POST /workflows/shopping/async

Trigger workflow and return immediately.

Request:

{
  "items": [
    {"fruit": "ORANGE", "amount": 10}
  ]
}

Response:

{
  "workflow_id": "shopping-workflow-xyz789",
  "status": "started",
  "result": null
}

Example:

curl -X POST http://localhost:8000/workflows/shopping/async \
  -H "Content-Type: application/json" \
  -d '{"items":[{"fruit":"ORANGE","amount":10}]}'

Get Workflow Status

GET /workflows/{workflow_id}/status

Check workflow status.

Response:

{
  "workflow_id": "shopping-workflow-xyz789",
  "status": "COMPLETED",
  "start_time": "2025-10-09T02:50:00",
  "close_time": "2025-10-09T02:50:05"
}

Example:

curl http://localhost:8000/workflows/shopping-workflow-xyz789/status
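After an asynchronous trigger, a client typically polls this endpoint until the workflow closes. A rough sketch under stated assumptions: only `COMPLETED` appears in this guide, so the other status names below are taken from Temporal's workflow execution statuses and may not match what the API actually returns. The helper names, interval, and timeout are illustrative:

```python
import json
import time
import urllib.request
from datetime import datetime
from typing import Callable

# Assumption: the API reports Temporal's status names. Only "COMPLETED"
# is confirmed by the example response above.
CLOSED_STATUSES = {
    "COMPLETED", "FAILED", "CANCELED", "TERMINATED", "TIMED_OUT", "CONTINUED_AS_NEW",
}


def fetch_status(workflow_id: str, base_url: str = "http://localhost:8000") -> dict:
    """GET /workflows/{workflow_id}/status and decode the JSON body."""
    url = f"{base_url}/workflows/{workflow_id}/status"
    with urllib.request.urlopen(url, timeout=5) as resp:
        return json.load(resp)


def wait_until_closed(
    workflow_id: str,
    fetch: Callable[[str], dict] = fetch_status,
    interval_seconds: float = 1.0,
    timeout_seconds: float = 60.0,
) -> dict:
    """Poll the status endpoint until the workflow is no longer running."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = fetch(workflow_id)
        if status.get("status") in CLOSED_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{workflow_id} still open after {timeout_seconds}s")
        time.sleep(interval_seconds)


def duration_seconds(status: dict) -> float:
    """Elapsed time between start_time and close_time in a status response."""
    start = datetime.fromisoformat(status["start_time"])
    close = datetime.fromisoformat(status["close_time"])
    return (close - start).total_seconds()
```

Applied to the example response above, `duration_seconds` gives 5.0 seconds. The `fetch` parameter is injectable so the loop can be exercised without a running server.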

Get Workflow Result

GET /workflows/{workflow_id}/result

Get the workflow result. If the workflow is still running, the request waits until it finishes.

Response:

{
  "workflow_id": "shopping-workflow-xyz789",
  "status": "completed",
  "result": "Ordered 10 Oranges..."
}

Example:

curl http://localhost:8000/workflows/shopping-workflow-xyz789/result

Usage Examples

cURL

# Synchronous
curl -X POST http://localhost:8000/workflows/shopping \
  -H "Content-Type: application/json" \
  -d '{"items":[{"fruit":"APPLE","amount":5}]}'

# Asynchronous
curl -X POST http://localhost:8000/workflows/shopping/async \
  -H "Content-Type: application/json" \
  -d '{"items":[{"fruit":"BANANA","amount":3}]}'

# Check status (replace {workflow_id} with the ID returned by a trigger call)
curl http://localhost:8000/workflows/{workflow_id}/status

# Get result (replace {workflow_id} as above)
curl http://localhost:8000/workflows/{workflow_id}/result

Python

import requests

# Trigger workflow
response = requests.post(
    "http://localhost:8000/workflows/shopping",
    json={
        "items": [
            {"fruit": "APPLE", "amount": 5},
            {"fruit": "BANANA", "amount": 3}
        ]
    }
)

result = response.json()
print(f"Workflow ID: {result['workflow_id']}")
print(f"Result: {result['result']}")

Using the provided client:

from api.example_api_client import TemporalAPIClient

client = TemporalAPIClient()

# Synchronous
result = client.trigger_shopping_workflow_sync([
    {"fruit": "APPLE", "amount": 5}
])
print(result['result'])

# Asynchronous
workflow = client.trigger_shopping_workflow_async([
    {"fruit": "BANANA", "amount": 3}
])

# Wait for result
final_result = client.wait_for_workflow(workflow['workflow_id'])
print(final_result['result'])

JavaScript/Node.js

// Trigger workflow
const response = await fetch('http://localhost:8000/workflows/shopping', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    items: [
      { fruit: 'APPLE', amount: 5 },
      { fruit: 'BANANA', amount: 3 }
    ]
  })
});

const result = await response.json();
console.log('Workflow ID:', result.workflow_id);
console.log('Result:', result.result);

Go

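package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
)

type Item struct {
    Fruit  string `json:"fruit"`
    Amount int    `json:"amount"`
}

type Request struct {
    Items []Item `json:"items"`
}

func main() {
    req := Request{
        Items: []Item{
            {Fruit: "APPLE", Amount: 5},
            {Fruit: "BANANA", Amount: 3},
        },
    }

    // Encode the request body as JSON.
    body, err := json.Marshal(req)
    if err != nil {
        log.Fatalf("encode request: %v", err)
    }

    // Trigger the workflow and wait for the response.
    resp, err := http.Post(
        "http://localhost:8000/workflows/shopping",
        "application/json",
        bytes.NewBuffer(body),
    )
    if err != nil {
        log.Fatalf("send request: %v", err)
    }
    defer resp.Body.Close()

    var result map[string]interface{}
    if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
        log.Fatalf("decode response: %v", err)
    }

    // "result" can be null or missing on errors, so print it without a
    // type assertion that would panic.
    fmt.Println("Result:", result["result"])
}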

Architecture

┌─────────────┐         ┌─────────────┐         ┌─────────────┐
│   Client    │  HTTP   │  FastAPI    │  gRPC   │  Temporal   │
│  (any lang) │────────▶│  API Server │────────▶│   Server    │
└─────────────┘         └─────────────┘         └─────────────┘
                                                        │
                                                        │ gRPC
                                                        ▼
                                                 ┌─────────────┐
                                                 │   Worker    │
                                                 │   Service   │
                                                 └─────────────┘

Flow

  1. Client sends an HTTP request to the API Server
  2. API Server converts it into a Temporal workflow request
  3. Temporal Server queues the workflow
  4. Worker Service picks up and executes the workflow
  5. Result flows back through the chain to the client

Adding Your Own Workflows

1. Create Workflow

# workflows/my_workflow.py
from temporalio import workflow, activity
from datetime import timedelta

@activity.defn
async def my_activity(data: str) -> str:
    return f"Processed: {data}"

@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self, input: str) -> str:
        result = await workflow.execute_activity(
            my_activity,
            input,
            start_to_close_timeout=timedelta(seconds=10),
        )
        return result

2. Add to Worker

# api/worker_service.py
from workflows.my_workflow import MyWorkflow, my_activity

worker = Worker(
    client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    activities=[my_activity],
)

3. Add API Endpoint

# api/api_trigger_example.py
import uuid

from workflows.my_workflow import MyWorkflow

@app.post("/workflows/my-workflow")
async def trigger_my_workflow(data: str):
    result = await temporal_client.execute_workflow(
        MyWorkflow.run,
        data,
        id=f"my-workflow-{uuid.uuid4()}",
        task_queue="my-task-queue",
    )
    return {"result": result}
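Note that FastAPI treats a bare `str` parameter such as `data` as a query parameter, not as a JSON body, so the endpoint above would be called as `POST /workflows/my-workflow?data=...`. A small sketch of building that request from Python; `my_workflow_url` and `my_workflow_request` are hypothetical helper names, and the base URL is the default used in this guide:

```python
import urllib.request
from urllib.parse import urlencode


def my_workflow_url(data: str, base_url: str = "http://localhost:8000") -> str:
    """Build the endpoint URL with `data` safely encoded as a query parameter."""
    return f"{base_url}/workflows/my-workflow?{urlencode({'data': data})}"


def my_workflow_request(data: str, base_url: str = "http://localhost:8000") -> urllib.request.Request:
    """Prepare (but do not send) the POST request for the endpoint."""
    return urllib.request.Request(my_workflow_url(data, base_url), method="POST")
```

Passing the prepared request to `urllib.request.urlopen` sends it. If you would rather accept a JSON body, declare a request model on the endpoint, as the shopping endpoints do with `ShoppingListRequest`.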

Production Considerations

Authentication

Add API authentication:

from fastapi import Depends
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

security = HTTPBearer()

@app.post("/workflows/shopping")
async def trigger_workflow(
    request: ShoppingListRequest,
    credentials: HTTPAuthorizationCredentials = Depends(security)
):
    verify_token(credentials.credentials)
    # ... rest of code
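`verify_token` is not defined in the snippet above. One possible shape, purely as an illustration: compare the bearer token against a single shared secret in constant time. The `API_TOKEN` environment variable and the `InvalidTokenError` class are assumptions made for this sketch; a real deployment might validate JWTs or call an identity provider instead:

```python
import hmac
import os
from typing import Optional


class InvalidTokenError(Exception):
    """Raised when a bearer token does not match the expected value."""


def verify_token(token: str, expected: Optional[str] = None) -> None:
    """Raise InvalidTokenError unless `token` matches the shared secret."""
    if expected is None:
        # Hypothetical variable name; not defined anywhere in this repository.
        expected = os.environ.get("API_TOKEN", "")
    # Reject everything when no secret is configured, and use a
    # constant-time comparison to avoid leaking timing information.
    if not expected or not hmac.compare_digest(token.encode(), expected.encode()):
        raise InvalidTokenError("invalid or missing bearer token")
```

Inside the endpoint you would catch `InvalidTokenError` and raise `HTTPException(status_code=401)` so the client receives a proper HTTP error.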

Rate Limiting

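from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter  # slowapi looks up the limiter on app.state
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)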

@app.post("/workflows/shopping")
@limiter.limit("10/minute")
async def trigger_workflow(request: Request, ...):
    # ... code

Error Handling

from fastapi import HTTPException
from temporalio.exceptions import WorkflowAlreadyStartedError

try:
    result = await temporal_client.execute_workflow(...)
except WorkflowAlreadyStartedError:
    raise HTTPException(409, "Workflow already running")
except Exception as e:
    raise HTTPException(500, f"Workflow failed: {str(e)}")
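The 409 branch only comes into play when two requests use the same workflow ID. With random IDs (such as the `uuid.uuid4()` suffix used elsewhere in this guide) that rarely happens. If you want duplicate submissions to be rejected, one option is a deterministic ID derived from the request. This is a sketch of that idea, not what the API currently does; `workflow_id_for` is a hypothetical helper:

```python
import hashlib
import json


def workflow_id_for(items: list[dict], prefix: str = "shopping-workflow") -> str:
    """Derive a stable workflow ID from the request items."""
    # Canonical JSON: sorted keys and fixed separators, so logically equal
    # items always serialize to the same bytes.
    canonical = json.dumps(items, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:12]
    return f"{prefix}-{digest}"
```

Two identical orders then map to the same ID, so the second `execute_workflow` call raises `WorkflowAlreadyStartedError` while the first is still running. In practice you would likely mix in a customer ID or a client-supplied idempotency key so that unrelated callers do not collide.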

Environment Configuration

import os

TEMPORAL_HOST = os.getenv("TEMPORAL_HOST", "localhost:7233")
TEMPORAL_NAMESPACE = os.getenv("TEMPORAL_NAMESPACE", "default")
API_PORT = int(os.getenv("API_PORT", "8000"))
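A slightly more defensive variant groups these values into one object and fails fast on a bad `API_PORT`. This is a sketch; the `Settings` class and `load_settings` function are illustrative names, while the variable names and defaults are the ones shown above:

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    temporal_host: str
    temporal_namespace: str
    api_port: int


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Read configuration from the environment, applying the defaults above."""
    raw_port = env.get("API_PORT", "8000")
    try:
        port = int(raw_port)
    except ValueError:
        raise ValueError(f"API_PORT must be an integer, got {raw_port!r}") from None
    if not 1 <= port <= 65535:
        raise ValueError(f"API_PORT must be between 1 and 65535, got {port}")
    return Settings(
        temporal_host=env.get("TEMPORAL_HOST", "localhost:7233"),
        temporal_namespace=env.get("TEMPORAL_NAMESPACE", "default"),
        api_port=port,
    )
```

Accepting the environment as a parameter keeps the function easy to test: `load_settings({"API_PORT": "9000"})` works without touching the real process environment.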

Logging

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@app.post("/workflows/shopping")
async def trigger_workflow(request: ShoppingListRequest):
    logger.info("Triggering workflow with %d items", len(request.items))
    # ... code
    logger.info("Workflow completed: %s", workflow_id)

Interactive Documentation

FastAPI provides automatic interactive docs at http://localhost:8000/docs.

You can test all endpoints directly in your browser!


Monitoring

Check Service Status

./scripts/check_services.sh

View Logs

# Worker logs
tail -f worker.log

# API logs
tail -f api.log

# Temporal logs
docker compose -f config/docker-compose.yml logs -f temporal

Temporal UI

View all workflow executions at http://localhost:8080


Testing

Test Script

./scripts/test_api.sh

This tests:

  • Health check
  • Synchronous workflow
  • Asynchronous workflow
  • Status checking
  • Result retrieval

Manual Testing

# 1. Start services
./scripts/start_api_services.sh

# 2. Test health
curl http://localhost:8000/

# 3. Trigger workflow
curl -X POST http://localhost:8000/workflows/shopping \
  -H "Content-Type: application/json" \
  -d '{"items":[{"fruit":"APPLE","amount":5}]}'

# 4. Check Temporal UI (open is macOS; use xdg-open on Linux)
open http://localhost:8080

# 5. Stop services
./scripts/stop_api_services.sh

Troubleshooting

API Not Responding

# Check if running
./scripts/check_services.sh

# Check logs
tail -f api.log

# Restart
./scripts/stop_api_services.sh
./scripts/start_api_services.sh

Worker Not Processing

# Check worker logs
tail -f worker.log

# Check Temporal connection
docker compose -f config/docker-compose.yml ps

# Restart worker
./scripts/stop_api_services.sh
./scripts/start_api_services.sh

Port Already in Use

# Check what's using port 8000
lsof -i :8000

# Kill it
kill $(lsof -ti:8000)

# Or use a different port: in api/api_trigger_example.py, change the uvicorn call to
#   uvicorn.run(app, host="0.0.0.0", port=8001)

Quick Reference

# Start services
./scripts/start_api_services.sh

# Test API
./scripts/test_api.sh

# Check status
./scripts/check_services.sh

# Stop services
./scripts/stop_api_services.sh

# View logs
tail -f api.log
tail -f worker.log

# Access points
# API: http://localhost:8000
# Docs: http://localhost:8000/docs
# UI: http://localhost:8080

🎉 Ready to trigger workflows via HTTP!

For workflow development, see DEVELOPMENT.md.