
[Epic] Long-Running Task Execution - Support for Hours/Days Duration Tasks #243

@adiasaf


What You Want

Enable CUGA to execute long-running tasks that may take hours or days to complete, with robust state management, checkpointing, resumability, and monitoring capabilities. This allows CUGA to handle complex, time-intensive workflows without being cut off by timeouts or tying up resources for the whole run.

Key Objectives

  1. Asynchronous Execution

    • Execute tasks asynchronously without blocking
    • Support background task processing
    • Enable task queuing and scheduling
  2. State Persistence & Checkpointing

    • Save agent state at regular intervals
    • Resume from last checkpoint on failure
    • Maintain execution history and audit trail
  3. Resilience & Recovery

    • Automatic retry on transient failures
    • Graceful handling of interruptions
    • Recovery from system crashes or restarts
  4. Monitoring & Control

    • Real-time progress tracking
    • Task cancellation and pause/resume
    • Resource usage monitoring
    • Alerting on issues or completion

Why You Need It

Business Value

  • Complex Workflows: Handle multi-step processes that take hours or days
  • Reliability: Complete tasks despite interruptions or failures
  • Resource Efficiency: Free up resources while tasks run in background
  • Scalability: Process multiple long-running tasks concurrently
  • User Experience: Non-blocking operations; users don't have to wait for completion

Technical Value

  • Fault Tolerance: Survive system failures and restarts
  • State Management: Maintain consistent state across long durations
  • Resource Optimization: Efficient use of compute and memory
  • Observability: Track progress and diagnose issues
  • Flexibility: Pause, resume, or cancel tasks as needed

Use Cases

  • Data Processing: Large-scale ETL, data migration, batch processing
  • Research & Analysis: Long-running simulations, data analysis
  • Content Generation: Bulk document generation, video processing
  • System Monitoring: Continuous monitoring and alerting
  • Scheduled Tasks: Daily/weekly reports, periodic maintenance

How It Could Work

Phase 1: Asynchronous Task Framework

  1. Task Queue System

    from cuga.tasks import LongRunningTask, TaskQueue
    
    # Create long-running task
    task = LongRunningTask(
        name="process_large_dataset",
        agent_config="agent_config.yaml",
        input_data={"dataset_path": "/data/large.csv"},
        estimated_duration="6 hours"
    )
    
    # Submit to queue
    queue = TaskQueue()
    task_id = queue.submit(task)
    
    # Task runs asynchronously
    print(f"Task submitted: {task_id}")
  2. Background Worker Pool

    • Dedicated worker processes for long-running tasks
    • Configurable worker count and resource limits
    • Load balancing across workers
    • Worker health monitoring
  3. Task Scheduling

    • Schedule tasks for future execution
    • Cron-like scheduling for recurring tasks
    • Priority-based task execution
    • Dependency management between tasks
  4. Task Lifecycle Management

    • States: PENDING, RUNNING, PAUSED, COMPLETED, FAILED, CANCELLED
    • State transitions and validation
    • Lifecycle hooks for custom logic
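The lifecycle states and transition validation above can be sketched as a small state machine. This is a minimal sketch, not an existing CUGA API: the transition table, the `TaskLifecycle` class and the `hook(old_state, new_state)` signature are all hypothetical. Allowing FAILED to go back to RUNNING is an assumption based on the "resume failed or interrupted task" behaviour described in Phase 2.

```python
from enum import Enum


class TaskState(Enum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    PAUSED = "PAUSED"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"


# Hypothetical transition table: the states each state may move to.
ALLOWED_TRANSITIONS = {
    TaskState.PENDING: {TaskState.RUNNING, TaskState.CANCELLED},
    TaskState.RUNNING: {TaskState.PAUSED, TaskState.COMPLETED,
                        TaskState.FAILED, TaskState.CANCELLED},
    TaskState.PAUSED: {TaskState.RUNNING, TaskState.CANCELLED},
    TaskState.FAILED: {TaskState.RUNNING},  # assumption: resume from checkpoint
    TaskState.COMPLETED: set(),             # terminal
    TaskState.CANCELLED: set(),             # terminal
}


class InvalidTransition(Exception):
    pass


class TaskLifecycle:
    """Tracks one task's state and calls hooks after every valid transition."""

    def __init__(self):
        self.state = TaskState.PENDING
        self.history = [TaskState.PENDING]
        self._hooks = []  # callables: hook(old_state, new_state)

    def on_transition(self, hook):
        self._hooks.append(hook)

    def transition(self, new_state):
        if new_state not in ALLOWED_TRANSITIONS[self.state]:
            raise InvalidTransition(f"{self.state.value} -> {new_state.value}")
        old_state, self.state = self.state, new_state
        self.history.append(new_state)
        for hook in self._hooks:
            hook(old_state, new_state)
```

Hooks are the natural place to attach the custom logic mentioned above. Examples include writing an audit-trail entry or sending a notification on COMPLETED. Rejected transitions raise instead of being silently ignored.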

Phase 2: State Persistence & Checkpointing

  1. Automatic Checkpointing

    # Agent automatically saves checkpoints
    agent = CugaAgent(
        checkpoint_interval="5 minutes",
        checkpoint_storage="s3://bucket/checkpoints/"
    )
    
    # Checkpoint includes:
    # - Agent state
    # - Execution progress
    # - Tool outputs
    # - Memory/context
    # - Timestamp
  2. Resume from Checkpoint

    # Resume failed or interrupted task
    task = LongRunningTask.resume(task_id)
    # → Loads last checkpoint
    # → Continues from where it left off
  3. Checkpoint Storage

    • Support multiple storage backends (S3, GCS, local filesystem, database)
    • Compression and encryption
    • Retention policies and cleanup
    • Versioning and rollback
  4. Incremental Progress Tracking

    • Track completion percentage
    • Estimate remaining time
    • Record milestones and sub-tasks
    • Progress visualization
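Checkpoint-and-resume against the local-filesystem backend can be sketched as follows. This minimal sketch assumes the agent's progress fits in a JSON-serializable dict. `save_checkpoint`, `load_checkpoint` and `run_resumable` are hypothetical helpers. An S3, GCS or database backend would need its own equivalent of the write-then-rename step used here.

```python
import json
import os
import tempfile
import time


def save_checkpoint(path, state):
    """Write `state` as JSON to a temp file, then rename it over `path`.

    os.replace is atomic on POSIX when both paths are on the same filesystem,
    so a crash mid-write leaves the previous checkpoint intact.
    """
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(state, f)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, path)
    except BaseException:
        os.unlink(tmp_path)
        raise


def load_checkpoint(path):
    """Return the last saved state, or None if no checkpoint exists yet."""
    try:
        with open(path) as f:
            return json.load(f)
    except FileNotFoundError:
        return None


def run_resumable(items, path, process, checkpoint_every_s=300.0,
                  clock=time.monotonic):
    """Process `items` in order, resuming after the last checkpointed index."""
    state = load_checkpoint(path) or {"next_index": 0, "results": []}
    last_save = clock()
    for i in range(state["next_index"], len(items)):
        state["results"].append(process(items[i]))
        state["next_index"] = i + 1
        state["saved_at"] = time.time()
        if clock() - last_save >= checkpoint_every_s:
            save_checkpoint(path, state)
            last_save = clock()
    save_checkpoint(path, state)  # final checkpoint records completion
    return state["results"]
```

After a crash, any work done since the last checkpoint is repeated, so `process` should be idempotent. A shorter interval means more write overhead but less repeated work.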

Phase 3: Resilience & Recovery

  1. Automatic Retry Logic

    • Configurable retry policies
    • Exponential backoff
    • Maximum retry attempts
    • Retry on specific error types
  2. Failure Handling

    task = LongRunningTask(
        name="resilient_task",
        retry_policy={
            "max_attempts": 3,
            "backoff": "exponential",
            "retry_on": ["NetworkError", "TimeoutError"]
        },
        on_failure="notify_admin"
    )
  3. Graceful Shutdown

    • Save checkpoint before shutdown
    • Complete current operation
    • Clean up resources
    • Mark task as PAUSED for later resume
  4. Dead Letter Queue

    • Failed tasks moved to DLQ
    • Manual review and retry
    • Error analysis and debugging
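The `retry_policy` behaviour described above can be sketched as a retry loop. Failures that are not retryable, or that exhaust their attempts, are sent to a dead letter queue. The sketch assumes `retry_on` lists exception class names. `base_delay_s` and `max_delay_s` are hypothetical extra keys, `NetworkError` is a placeholder class, and a plain list stands in for a real DLQ.

```python
import random
import time


class NetworkError(Exception):
    """Placeholder transient error (hypothetical; not a real CUGA class)."""


def _is_retryable(exc, retry_on):
    # Match the exception's class or any base class by name, so listing
    # "OSError" would also cover its subclasses.
    return any(cls.__name__ in retry_on for cls in type(exc).__mro__)


def run_with_retry(fn, policy, dead_letter_queue, sleep=time.sleep):
    max_attempts = policy.get("max_attempts", 3)
    base = policy.get("base_delay_s", 1.0)  # hypothetical key
    cap = policy.get("max_delay_s", 60.0)   # hypothetical key
    exponential = policy.get("backoff", "exponential") == "exponential"
    retry_on = set(policy.get("retry_on", []))

    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            if not _is_retryable(exc, retry_on) or attempt == max_attempts:
                # Give up: park the failure for manual review, then re-raise.
                dead_letter_queue.append(
                    {"error": repr(exc), "attempts": attempt, "time": time.time()}
                )
                raise
            delay = min(cap, base * 2 ** (attempt - 1)) if exponential else base
            # "Full jitter": sleep a random fraction of the delay so many
            # workers retrying at once do not hit a service in lockstep.
            sleep(random.uniform(0, delay))
```

Non-retryable errors skip the backoff and go straight to the DLQ. Retrying would not help, and only transient errors are worth the wait.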

Phase 4: Monitoring & Control

  1. Real-Time Progress Tracking

    # Get task status
    status = task.get_status()
    print(f"Progress: {status.progress}%")
    print(f"Elapsed: {status.elapsed_time}")
    print(f"ETA: {status.estimated_completion}")
    print(f"Current step: {status.current_step}")
  2. Task Control API

    # Pause task
    task.pause()
    
    # Resume task
    task.resume()
    
    # Cancel task
    task.cancel(reason="User requested")
    
    # Get logs
    logs = task.get_logs(last_n=100)
  3. Monitoring Dashboard

    • Web UI for task monitoring
    • Real-time progress visualization
    • Task history and analytics
    • Resource usage graphs
    • Alert configuration
  4. Notifications & Alerts

    • Email/Slack notifications on completion or failure
    • Webhook callbacks for task events
    • Custom alerting rules
    • Integration with monitoring systems (PagerDuty, etc.)
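A `get_status()` call like the one above might compute progress, elapsed time and ETA from step counts. `ProgressTracker` and the `remaining_time` field are hypothetical. The ETA is a linear extrapolation: it assumes the remaining steps take, on average, as long as the completed ones, which fits poorly when step costs vary widely.

```python
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class TaskStatus:
    progress: float                           # percent complete, 0-100
    elapsed_time: timedelta
    remaining_time: Optional[timedelta]       # None until a step has finished
    estimated_completion: Optional[datetime]  # wall-clock ETA
    current_step: str


class ProgressTracker:
    def __init__(self, total_steps, clock=time.monotonic):
        self.total_steps = total_steps
        self.done_steps = 0
        self.current_step = "not started"
        self._clock = clock  # injectable so tests can use a fake clock
        self._start = clock()

    def advance(self, step_name, n=1):
        """Record n finished steps and name the step now in progress."""
        self.done_steps = min(self.total_steps, self.done_steps + n)
        self.current_step = step_name

    def get_status(self):
        elapsed = self._clock() - self._start
        remaining = None
        if self.done_steps > 0:
            # Linear extrapolation from the average time per completed step.
            remaining = elapsed * (self.total_steps - self.done_steps) / self.done_steps
        progress = (100.0 * self.done_steps / self.total_steps
                    if self.total_steps else 100.0)
        return TaskStatus(
            progress=round(progress, 1),
            elapsed_time=timedelta(seconds=elapsed),
            remaining_time=None if remaining is None else timedelta(seconds=remaining),
            estimated_completion=None if remaining is None
            else datetime.now() + timedelta(seconds=remaining),
            current_step=self.current_step,
        )
```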

Phase 5: Advanced Features

  1. Distributed Execution

    • Split task across multiple workers
    • Parallel sub-task execution
    • Result aggregation
    • Distributed state management
  2. Resource Management

    • CPU and memory limits per task
    • GPU allocation for ML tasks
    • Disk space monitoring
    • Network bandwidth throttling
  3. Task Dependencies

    • Define task dependencies (DAG)
    • Conditional execution
    • Fan-out/fan-in patterns
    • Workflow orchestration
  4. Time-Based Features

    • Task timeout configuration
    • Scheduled execution windows
    • Rate limiting
    • Time-based checkpointing
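Task dependencies form a DAG, so fan-out/fan-in ordering can be sketched with Python's standard-library `graphlib.TopologicalSorter` (Python 3.9+). The `run_dag` helper and the task names are hypothetical. Here each ready batch runs sequentially; a worker pool would dispatch a batch in parallel.

```python
from graphlib import CycleError, TopologicalSorter


def run_dag(tasks, deps, run):
    """Run tasks in dependency order and return the ready batches.

    deps maps each task to the set of tasks it depends on. Every batch from
    get_ready() has all dependencies finished, so its members could fan out
    to workers in parallel; a task with several dependencies fans in.
    prepare() raises CycleError if the dependencies contain a cycle.
    """
    sorter = TopologicalSorter()
    for task in tasks:
        sorter.add(task, *deps.get(task, ()))
    sorter.prepare()
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        for task in ready:
            run(task)
            sorter.done(task)
    return batches
```

For conditional execution, a failed task's dependents could be skipped instead of run. Cycle detection comes for free from `prepare()`.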

Sub-Tasks

This epic should be broken down into individual issues for:

Core Framework

  • Design long-running task architecture
  • Implement task queue system
  • Build background worker pool
  • Add task scheduling capabilities
  • Create task lifecycle management

State Management

  • Design checkpoint format and storage
  • Implement automatic checkpointing
  • Add resume from checkpoint functionality
  • Build progress tracking system
  • Create state persistence layer

Resilience

  • Implement retry logic and policies
  • Add failure handling mechanisms
  • Build graceful shutdown system
  • Create dead letter queue
  • Add error recovery strategies

Monitoring & Control

  • Build task status API
  • Implement task control operations (pause/resume/cancel)
  • Create monitoring dashboard
  • Add logging and audit trail
  • Build notification system

Advanced Features

  • Add distributed execution support
  • Implement resource management
  • Build task dependency system
  • Add time-based features
  • Create workflow orchestration

Testing & Documentation

  • Create long-running task examples
  • Write integration tests
  • Add performance benchmarks
  • Write user documentation
  • Create operational runbook

Success Metrics

Must Achieve:

  • ✅ Execute tasks running for 24+ hours without failure
  • ✅ Resume from checkpoint with <1% data loss
  • ✅ Survive system restarts and crashes
  • ✅ Support 100+ concurrent long-running tasks
  • ✅ <5 minute recovery time from failures
  • ✅ Real-time progress tracking with <10 second latency

Nice to Have:

  • Support for tasks running weeks/months
  • Sub-second checkpoint overhead
  • Distributed execution across multiple machines
  • Visual workflow designer
  • Predictive failure detection

Technical Considerations

Architecture Decisions

  1. Task Queue Backend

    • Options: Celery, RQ, custom implementation
    • Message brokers to consider: Redis, RabbitMQ, AWS SQS, Kafka
    • Trade-offs: complexity vs. features
  2. Checkpoint Storage

    • Local filesystem for development
    • S3/GCS for production
    • Database for metadata
    • Hybrid approach
  3. State Serialization

    • Pickle for Python objects (trusted storage only: unpickling untrusted data can execute arbitrary code)
    • JSON for interoperability
    • Protobuf for efficiency
    • Custom serialization for complex types
  4. Worker Architecture

    • Process-based workers (multiprocessing)
    • Container-based workers (Docker/Kubernetes)
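    • Serverless workers (AWS Lambda, Azure Functions), though per-invocation time limits (15 minutes on AWS Lambda) mean each invocation can only run a checkpointed slice of a long task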
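If JSON is chosen for interoperability, it needs custom handling for types it cannot represent natively, such as datetimes and sets. The sketch below uses the standard `json` module's `default` and `object_hook` parameters. The `__type__` tagging convention and the helper names are assumptions, not an existing CUGA format.

```python
import json
from datetime import datetime, timezone


def _encode(obj):
    """json.dumps `default` hook: tag types JSON can't represent natively."""
    if isinstance(obj, datetime):
        return {"__type__": "datetime", "value": obj.isoformat()}
    if isinstance(obj, (set, frozenset)):
        # Sorting keeps output stable; assumes set members are comparable.
        return {"__type__": "set", "value": sorted(obj)}
    raise TypeError(f"Cannot serialize {type(obj).__name__}")


def _decode(d):
    """json.loads `object_hook`: rebuild tagged values, pass others through."""
    kind = d.get("__type__")
    if kind == "datetime":
        return datetime.fromisoformat(d["value"])
    if kind == "set":
        return set(d["value"])
    return d


def dumps_state(state):
    return json.dumps(state, default=_encode, sort_keys=True)


def loads_state(text):
    return json.loads(text, object_hook=_decode)
```

The tagged output stays readable by non-Python tools, unlike pickle. Every additional custom type, however, needs its own encode/decode pair.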

Challenges & Solutions

Challenge: State size grows over time

  • Solution: Incremental checkpoints, state compression, garbage collection

Challenge: Checkpoint overhead impacts performance

  • Solution: Async checkpointing, configurable intervals, smart checkpointing

Challenge: Coordinating distributed tasks

  • Solution: Distributed locks, consensus algorithms, event-driven coordination

Challenge: Resource exhaustion

  • Solution: Resource limits, monitoring, auto-scaling, task prioritization
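One way to read "incremental checkpoints": take a full base snapshot occasionally, and in between store only the top-level keys that changed. The helpers below are hypothetical. The diff is deliberately shallow, so a changed list is stored whole. A state dominated by one large, growing value would need a finer-grained scheme.

```python
_MISSING = object()  # sentinel so keys whose value is None still diff correctly


def diff_state(old, new):
    """Top-level delta between two state dicts."""
    changed = {k: v for k, v in new.items() if old.get(k, _MISSING) != v}
    removed = sorted(k for k in old if k not in new)
    return {"set": changed, "unset": removed}


def apply_delta(state, delta):
    """Return a new dict with one delta applied; `state` is left unchanged."""
    out = dict(state)
    out.update(delta["set"])
    for key in delta["unset"]:
        out.pop(key, None)
    return out


def rebuild(base, deltas):
    """Recover the latest state from a full snapshot plus ordered deltas."""
    state = dict(base)
    for delta in deltas:
        state = apply_delta(state, delta)
    return state
```

Recovery time grows with the number of deltas since the last base. Periodically folding them into a new full snapshot keeps both storage and restart time bounded.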

Links and Context

Task Queue Systems

  • Celery - Distributed task queue
  • RQ - Simple Python task queue
  • Temporal - Durable execution platform
  • Airflow - Workflow orchestration

Related Components

  • CUGA agent: src/cuga/backend/cuga_graph/
  • State management: src/cuga/backend/cuga_graph/state/
  • Memory: src/cuga/backend/cuga_graph/nodes/save_reuse/

Similar Projects

Example Use Cases

Use Case 1: Large-Scale Data Processing

Task: Process 1TB dataset
Duration: 12 hours
Checkpoints: Every 30 minutes
Recovery: Resume from last checkpoint on failure

Use Case 2: Continuous Monitoring

Task: Monitor system metrics 24/7
Duration: Indefinite
Checkpoints: Hourly
Actions: Alert on anomalies, generate daily reports

Use Case 3: Scheduled Batch Job

Task: Generate monthly reports
Schedule: 1st of each month at 2 AM
Duration: 4 hours
Notifications: Email on completion
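Use Case 3's schedule corresponds to the cron expression `0 2 1 * *` (minute 0, hour 2, day-of-month 1, any month, any weekday). Below is a standard-library sketch for computing the next run time. `next_monthly_run` is hypothetical and uses naive datetimes, so it ignores time zones and DST. In zones where DST switches at 2 AM local time, scheduling in UTC avoids run times that are skipped or repeated.

```python
from datetime import datetime


def next_monthly_run(now, day=1, hour=2, minute=0):
    """Return the first datetime strictly after `now` at day/hour:minute.

    Restricted to days 1-28 so the date exists in every month.
    """
    if not 1 <= day <= 28:
        raise ValueError("day must be between 1 and 28")
    candidate = now.replace(day=day, hour=hour, minute=minute,
                            second=0, microsecond=0)
    if candidate <= now:
        # This month's slot has passed; move to the same slot next month.
        if now.month == 12:
            candidate = candidate.replace(year=now.year + 1, month=1)
        else:
            candidate = candidate.replace(month=now.month + 1)
    return candidate
```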

Implementation Timeline

  • Phase 1 (Weeks 1-4): Core async framework and task queue
  • Phase 2 (Weeks 5-8): State persistence and checkpointing
  • Phase 3 (Weeks 9-12): Resilience and recovery mechanisms
  • Phase 4 (Weeks 13-16): Monitoring, control, and dashboard
  • Phase 5 (Weeks 17-20): Advanced features and optimization

Dependencies

  • Task queue backend (Celery/RQ or custom)
  • Persistent storage (S3, database)
  • Monitoring infrastructure
  • Container orchestration (optional, for distributed execution)
  • Message broker (Redis, RabbitMQ, or Kafka)

Metadata

  • Assignees: No one assigned
  • Labels:
      • component: agent (Core agent loop, DynamicAgentGraph, LLM node, tool execution, CugaLite)
      • enhancement (New feature or request)
      • needs-triage (Newly filed, not yet reviewed)
      • type: epic (Large body of work grouping multiple issues)
  • Type: No type
  • Project status: Backlog
  • Milestone: No milestone
  • Relationships: None yet
  • Development: No branches or pull requests
