Implement Async Pipeline Execution with Celery #33

@fuzziecoder

Description

🎯 Issue Summary

Replace FastAPI BackgroundTasks with Celery for robust distributed task execution and better scalability.

📋 Current Behavior

Pipeline executions use FastAPI's BackgroundTasks which has limitations:

  • No task persistence across server restarts
  • No distributed execution support
  • Limited retry/failure handling
  • No task monitoring/management

Current Implementation:

  • File: backend/api/routes/executions.py (lines 28-40)
  • Uses simple background task execution

✨ Desired Behavior

  • Celery workers handle pipeline execution
  • Task persistence with Redis/RabbitMQ broker
  • Distributed execution across multiple workers
  • Built-in retry mechanisms and task monitoring

🔧 Technical Requirements

1. Celery Setup

  • Add celery[redis]==5.3.4 to requirements
  • Create backend/celery_app.py with Celery configuration
  • Configure Redis as message broker

2. Task Definitions

  • Create backend/tasks/pipeline_tasks.py
  • Define execute_pipeline_task as Celery task
  • Add task result backend configuration

3. API Integration

  • Update create_execution() to use execute_pipeline_task.delay()
  • Return Celery task ID in response
  • Add endpoint to check task status

4. Worker Management

  • Create worker startup script
  • Add worker monitoring with Flower
  • Configure task routing and priorities
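A startup script might boil down to commands like these (the module path `backend.celery_app` follows the file layout proposed above; the queue name and concurrency are illustrative):

```shell
# Start a worker on the default queue
celery -A backend.celery_app worker --loglevel=info --concurrency=4

# Start a worker dedicated to a "pipelines" queue (requires matching task routing)
celery -A backend.celery_app worker --loglevel=info -Q pipelines

# Start Flower for monitoring (web UI on port 5555 by default; pip install flower)
celery -A backend.celery_app flower
```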

5. Error Handling

  • Implement task retry logic with exponential backoff
  • Add dead letter queue for failed tasks
  • Log task failures to database
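The exponential backoff schedule could be computed with a helper like the one below (the 60 s base and 600 s cap are assumptions; Celery can also do this declaratively via `retry_backoff=True` on the task decorator):

```python
def retry_countdown(retries: int, base: int = 60, cap: int = 600) -> int:
    """Seconds to wait before the next retry: 60, 120, 240, ... capped at `cap`."""
    return min(base * 2 ** retries, cap)

# Inside a bound Celery task this would be used as:
#   raise self.retry(exc=exc, countdown=retry_countdown(self.request.retries))
print([retry_countdown(n) for n in range(4)])  # → [60, 120, 240, 480]
```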

📝 Acceptance Criteria

  • ✅ Celery workers execute pipelines successfully
  • ✅ Tasks persist across server restarts
  • ✅ Multiple workers can process tasks concurrently
  • ✅ Task status queryable via API
  • ✅ Failed tasks automatically retry up to 3 times

💡 Implementation Example

```python
# backend/celery_app.py
from celery import Celery

celery_app = Celery(
    'flexiroaster',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)
```

```python
# backend/tasks/pipeline_tasks.py
from backend.celery_app import celery_app

@celery_app.task(bind=True, max_retries=3)
def execute_pipeline_task(self, pipeline_id: str, execution_id: str):
    try:
        # Execute pipeline
        pass
    except Exception as exc:
        # Exponential backoff: 60 s, 120 s, 240 s between retries
        raise self.retry(exc=exc, countdown=60 * 2 ** self.request.retries)
```

📚 Resources
[Celery Documentation](https://docs.celeryq.dev/)
[FastAPI with Celery](https://fastapi.tiangolo.com/deployment/docker/#celery)
