Add Pipeline Execution Timeout and Cancellation #35

@fuzziecoder

🎯 Issue Summary

Implement an execution timeout mechanism and manual cancellation support for long-running pipelines.

📋 Current Behavior

Pipelines can run indefinitely with no timeout or cancellation mechanism.

Current Limitations:

  • No execution timeout
  • Cannot cancel running executions
  • Stuck pipelines consume resources indefinitely

✨ Proposed Solution

Add timeout configuration and cancellation API:

  • Per-pipeline timeout settings
  • Manual cancellation via API
  • Graceful shutdown of running stages
  • Timeout tracking in execution logs

🔧 Technical Requirements

1. Timeout Configuration

  • Add timeout_seconds field to Pipeline model
  • Default timeout: 3600 seconds (1 hour)
  • Validate timeout > 0
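
The configuration rules above could be sketched as follows. This is a minimal illustration using a plain dataclass; the `Pipeline` class here is a hypothetical stand-in for the project's real model, and the `name` field and `DEFAULT_TIMEOUT_SECONDS` constant are assumptions made for the example.

```python
from dataclasses import dataclass

DEFAULT_TIMEOUT_SECONDS = 3600  # 1 hour, the default proposed in this issue


@dataclass
class Pipeline:
    """Hypothetical stand-in for the project's Pipeline model."""

    name: str
    timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS

    def __post_init__(self) -> None:
        # bool is a subclass of int, so it is rejected explicitly.
        if isinstance(self.timeout_seconds, bool) or not isinstance(
            self.timeout_seconds, int
        ):
            raise TypeError("timeout_seconds must be an integer")
        # Zero and negative timeouts are rejected at construction time.
        if self.timeout_seconds <= 0:
            raise ValueError("timeout_seconds must be greater than 0")
```

Validating at construction time means an invalid timeout is reported when the pipeline is saved rather than when it first runs.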

2. Timeout Enforcement

  • Wrap execution in asyncio.wait_for()
  • Catch the asyncio.TimeoutError it raises when the limit is exceeded, then re-raise
  • Update execution status to TIMEOUT

3. Cancellation API

  • Add DELETE /api/executions/{id} endpoint
  • Set cancellation flag in execution record
  • Check flag between stages
  • Graceful cleanup on cancellation
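
One way the flag-based flow above might fit together is sketched below, with the web framework left out. `cancel_execution` holds the logic a `DELETE /api/executions/{id}` handler could call. The `Execution` dataclass, the string statuses, `run_stages`, and the 202/404/409 return codes are all assumptions for illustration, not part of the existing codebase.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, List


class CancellationError(Exception):
    """Raised when an execution is stopped at a stage boundary."""


@dataclass
class Execution:
    """Hypothetical execution record; field names are assumptions."""

    id: str
    status: str = "RUNNING"
    cancelled: bool = False


executions_db: Dict[str, Execution] = {}


def cancel_execution(execution_id: str) -> int:
    """Set the cancellation flag and return an HTTP-style status code.

    404 if the execution is unknown, 409 if it is no longer running,
    202 if the flag was set and the runner will stop at the next stage.
    """
    execution = executions_db.get(execution_id)
    if execution is None:
        return 404
    if execution.status != "RUNNING":
        return 409
    execution.cancelled = True
    return 202


async def run_stages(
    execution_id: str, stages: List[Callable[[], Awaitable[None]]]
) -> None:
    execution = executions_db[execution_id]
    try:
        for stage in stages:
            # The flag is checked before each stage, never in the middle of one.
            if execution.cancelled:
                raise CancellationError("Execution cancelled by user")
            await stage()
        execution.status = "SUCCESS"
    except CancellationError:
        execution.status = "CANCELLED"
        raise
```

Because the flag is only read between stages, a stage that is already running finishes before the execution stops. Interrupting a stage midway would need task cancellation instead.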

4. Execution Tracking

  • Add cancelled_at timestamp field
  • Log timeout/cancellation events
  • Track cancellation reason
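
The tracking fields could look roughly like this. The sketch assumes a single helper, `record_stop`, that stamps the record and writes one log line; the `ExecutionRecord` class, the `cancellation_reason` field name, and the logger name are hypothetical. It also assumes `cancelled_at` is set for timeouts as well as manual cancellations, which the issue does not specify.

```python
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

logger = logging.getLogger("pipeline.executions")  # hypothetical logger name


@dataclass
class ExecutionRecord:
    """Hypothetical execution record with the proposed tracking fields."""

    id: str
    status: str = "RUNNING"
    cancelled_at: Optional[datetime] = None
    cancellation_reason: Optional[str] = None


def record_stop(record: ExecutionRecord, status: str, reason: str) -> None:
    """Stamp the record and log one line for a timeout or cancellation."""
    if status not in ("TIMEOUT", "CANCELLED"):
        raise ValueError(f"unexpected terminal status: {status}")
    record.status = status
    # Timezone-aware UTC timestamps avoid ambiguity across servers.
    record.cancelled_at = datetime.now(timezone.utc)
    record.cancellation_reason = reason
    logger.warning(
        "execution %s stopped: status=%s reason=%s at=%s",
        record.id,
        status,
        reason,
        record.cancelled_at.isoformat(),
    )
```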

📝 Acceptance Criteria

  • ✅ Pipelines time out after the configured duration
  • ✅ Manual cancellation works via API
  • ✅ Execution status updated correctly (TIMEOUT/CANCELLED)
  • ✅ Resources cleaned up on timeout/cancellation
  • ✅ Logs show timeout/cancellation details

💡 Implementation Example

# backend/core/executor.py
import asyncio

# Pipeline, ExecutionStatus, executions_db and execute_pipeline are assumed
# to be defined elsewhere in the backend.

class CancellationError(Exception):
    """Raised when an execution is cancelled by the user."""

async def execute_with_timeout(pipeline: Pipeline, execution_id: str):
    timeout = pipeline.timeout_seconds or 3600  # fall back to the 1-hour default

    try:
        await asyncio.wait_for(
            execute_pipeline(pipeline, execution_id),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        # wait_for has already cancelled the pipeline task at this point
        execution = executions_db[execution_id]
        execution.status = ExecutionStatus.TIMEOUT
        execution.error = f"Execution exceeded timeout of {timeout}s"
        raise

# Cancellation check, run at the start of every stage
async def execute_stage(stage, execution_id):
    execution = executions_db[execution_id]
    if execution.cancelled:
        raise CancellationError("Execution cancelled by user")
    # Execute stage logic
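
For the "resources cleaned up on timeout" criterion, a `try`/`finally` inside the stage is one possible approach: when `asyncio.wait_for()` hits its limit it cancels the wrapped task and waits for that cancellation to complete, so the `finally` block runs before the timeout error reaches the caller. The sketch below is a standalone illustration; `slow_stage`, `run_with_timeout`, and `cleanup_log` are hypothetical names.

```python
import asyncio
from typing import List

cleanup_log: List[str] = []


async def slow_stage(name: str) -> None:
    try:
        await asyncio.sleep(10)  # stands in for long-running stage work
    finally:
        # Runs on normal exit and when wait_for cancels the task on timeout.
        cleanup_log.append(f"released resources for {name}")


async def run_with_timeout(timeout: float) -> str:
    try:
        await asyncio.wait_for(slow_stage("extract"), timeout=timeout)
        return "SUCCESS"
    except asyncio.TimeoutError:
        return "TIMEOUT"
```

Calling `asyncio.run(run_with_timeout(0.05))` takes the timeout path, and by the time it returns the cleanup entry has already been recorded.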
