🎯 Issue Summary
Replace FastAPI BackgroundTasks with Celery for robust distributed task execution and better scalability.
📋 Current Behavior
Pipeline executions use FastAPI's BackgroundTasks, which has several limitations:
- No task persistence across server restarts
- No distributed execution support
- Limited retry/failure handling
- No task monitoring/management
Current Implementation:
- File: backend/api/routes/executions.py (lines 28-40)
- Uses simple background task execution
✨ Desired Behavior
- Celery workers handle pipeline execution
- Task persistence with Redis/RabbitMQ broker
- Distributed execution across multiple workers
- Built-in retry mechanisms and task monitoring
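As a rough sketch of what persistence-oriented settings could look like, the helper below builds a Celery configuration dict from environment variables. The variable names `CELERY_BROKER_URL` and `CELERY_RESULT_BACKEND` and the function name `build_celery_config` are assumptions made for illustration; the setting keys themselves are standard Celery options. Late acknowledgement only makes sense if pipeline execution is safe to run more than once.

```python
import os


def build_celery_config(env=None):
    """Return Celery settings as a plain dict (lowercase setting names)."""
    env = os.environ if env is None else env
    return {
        # Broker and result backend URLs; the env variable names are assumptions.
        "broker_url": env.get("CELERY_BROKER_URL", "redis://localhost:6379/0"),
        "result_backend": env.get("CELERY_RESULT_BACKEND", "redis://localhost:6379/1"),
        # Acknowledge a message only after the task finishes, so a task that
        # was interrupted mid-run can be redelivered by the broker.
        "task_acks_late": True,
        # Requeue the message if the worker process running it dies abruptly.
        "task_reject_on_worker_lost": True,
        # Report the STARTED state so queued and running tasks can be told apart.
        "task_track_started": True,
        # Reserve one message at a time per worker process so long-running
        # pipelines do not hold back tasks that other workers could take.
        "worker_prefetch_multiplier": 1,
        "task_serializer": "json",
        "result_serializer": "json",
        "accept_content": ["json"],
    }
```

The dict could then be applied with `celery_app.conf.update(build_celery_config())`, and a worker started with `celery -A backend.celery_app worker --loglevel=info`.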
🔧 Technical Requirements
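1. Celery Setup
- Add celery[redis]==5.3.4 to requirements
- Create backend/celery_app.py with Celery configuration
2. Task Definitions
- Create backend/tasks/pipeline_tasks.py
- Define execute_pipeline_task as a Celery task
3. API Integration
- Update create_execution() to use execute_pipeline_task.delay()
4. Worker Management
5. Error Handling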
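The API integration step might look roughly like the helper below, which a route such as create_execution() could call instead of scheduling a background task. This is a minimal sketch: `enqueue_execution`, the returned field names, and the use of a UUID for the execution ID are all assumptions, and the real route in executions.py may create execution records differently. The task is passed in as an argument so the helper can be exercised without a running broker.

```python
import uuid


def enqueue_execution(task, pipeline_id):
    """Queue a pipeline run and return identifiers the client can poll with.

    `task` is any object with a Celery-style `.delay(*args)` method, for
    example `execute_pipeline_task`.
    """
    # Hypothetical ID scheme: one fresh UUID per execution.
    execution_id = str(uuid.uuid4())
    # .delay() publishes the task message and returns an AsyncResult
    # immediately; the pipeline itself runs later on a worker.
    async_result = task.delay(pipeline_id, execution_id)
    return {
        "execution_id": execution_id,
        "task_id": async_result.id,
        "status": "queued",
    }
```

Returning the Celery task ID alongside the execution ID lets a later status endpoint look the task up in the result backend.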
📝 Acceptance Criteria
- ✅ Celery workers execute pipelines successfully
- ✅ Tasks persist across server restarts
- ✅ Multiple workers can process tasks concurrently
- ✅ Task status queryable via API
- ✅ Failed tasks automatically retry up to 3 times
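To make task status queryable via the API, Celery's built-in state names have to be translated into whatever the API reports. The mapping below is one possible sketch; the state names on the left are Celery's own, while the API-side labels and the `api_status` helper are hypothetical.

```python
# Celery state name -> API status label (the labels are assumptions).
CELERY_TO_API_STATUS = {
    "PENDING": "queued",    # also what Celery reports for unknown task IDs
    "RECEIVED": "queued",
    "STARTED": "running",   # only reported when task_track_started is enabled
    "RETRY": "retrying",
    "SUCCESS": "succeeded",
    "FAILURE": "failed",
    "REVOKED": "cancelled",
}


def api_status(celery_state):
    """Translate a Celery state string, falling back to 'unknown'."""
    return CELERY_TO_API_STATUS.get(celery_state, "unknown")
```

A status endpoint could call `api_status(AsyncResult(task_id, app=celery_app).state)`. Because PENDING is also returned for IDs Celery has never seen, the endpoint would likely need its own execution record to distinguish "queued" from "not found".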
💡 Implementation Example
```python
# backend/celery_app.py
from celery import Celery

celery_app = Celery(
    'flexiroaster',
    broker='redis://localhost:6379/0',   # task queue
    backend='redis://localhost:6379/1',  # task state and results
)
```

```python
# backend/tasks/pipeline_tasks.py
from backend.celery_app import celery_app


@celery_app.task(bind=True, max_retries=3)
def execute_pipeline_task(self, pipeline_id: str, execution_id: str):
    try:
        # Execute pipeline
        pass
    except Exception as exc:
        # Re-queue the task to run again in 60 seconds; once max_retries
        # is exhausted, the original exception is raised instead.
        raise self.retry(exc=exc, countdown=60)
```
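The example above waits a fixed 60 seconds between attempts. If a growing delay is preferred, one option is a small helper like the sketch below; `retry_countdown` and its default values are assumptions for illustration, not part of the proposed implementation.

```python
def retry_countdown(retries, base=60, cap=900):
    """Seconds to wait before the next attempt: base * 2**retries, capped."""
    return min(cap, base * (2 ** retries))
```

Inside the task it could be used as `raise self.retry(exc=exc, countdown=retry_countdown(self.request.retries))`, giving waits of 60, 120, and 240 seconds across the three retries. Celery's `autoretry_for` and `retry_backoff` task options offer similar behavior without custom code.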
📚 Resources
[Celery Documentation](https://docs.celeryq.dev/)
[FastAPI with Celery](https://fastapi.tiangolo.com/deployment/docker/#celery)