Gilfeather/worker-dispatcher-template

Worker Dispatcher Template

A comprehensive FastAPI template implementing Domain-Driven Design (DDD) and Clean Architecture principles with a Worker-Dispatcher pattern for asynchronous task processing.

🏗️ Architecture Overview

This template follows a layered architecture with clear separation of concerns:

┌─────────────────────────────────────────┐
│           Presentation Layer            │
│  (FastAPI routes, controllers, logic)   │
└─────────────────┬───────────────────────┘
                  │
┌─────────────────▼───────────────────────┐
│            Domain Layer                 │
│   (Entities, Services, Business Logic)  │
└─────────────────┬───────────────────────┘
                  │
┌─────────────────▼───────────────────────┐
│         Infrastructure Layer            │
│  (Database, External APIs, Adapters)    │
└─────────────────────────────────────────┘

Core Principles

  • Dependency Inversion: Domain layer defines interfaces, infrastructure implements them
  • Single Responsibility: Each module has a clear, focused purpose
  • Open/Closed: Easy to extend without modifying existing code
  • Interface Segregation: Small, focused interfaces rather than large ones
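
As an illustration of the dependency-inversion principle listed above, here is a minimal sketch in plain Python. The names `Task`, `TaskRepository`, `InMemoryTaskRepository`, and `TaskService` are assumptions made for this example and are not taken from the template's code; the point is only that the domain service depends on an interface while the infrastructure side supplies the implementation.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Protocol


@dataclass
class Task:
    """Hypothetical domain entity (the real ones live in app/domain/entities.py)."""
    task_id: str
    payload: str
    status: str = "pending"


class TaskRepository(Protocol):
    """Interface owned by the domain layer (name is an assumption)."""

    def save(self, task: Task) -> None: ...

    def get(self, task_id: str) -> Optional[Task]: ...


class InMemoryTaskRepository:
    """Infrastructure-side implementation; a database-backed class would fill the same role."""

    def __init__(self) -> None:
        self._tasks: Dict[str, Task] = {}

    def save(self, task: Task) -> None:
        self._tasks[task.task_id] = task

    def get(self, task_id: str) -> Optional[Task]:
        return self._tasks.get(task_id)


class TaskService:
    """Domain service that depends only on the interface, never on a concrete adapter."""

    def __init__(self, repository: TaskRepository) -> None:
        self._repository = repository

    def complete(self, task_id: str) -> Task:
        task = self._repository.get(task_id)
        if task is None:
            raise KeyError(f"unknown task: {task_id}")
        task.status = "completed"
        self._repository.save(task)
        return task


repo = InMemoryTaskRepository()
repo.save(Task(task_id="t-1", payload="resize image"))
service = TaskService(repo)
done = service.complete("t-1")
print(done.status)
```

Because `TaskService` only sees the `TaskRepository` protocol, swapping the in-memory class for a real database repository should not require changes to the domain code.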

🚀 Features

  • Worker-Dispatcher Pattern: Scalable asynchronous task processing
  • Clean Architecture: Proper separation of concerns and dependency inversion
  • FastAPI Integration: Modern async web framework with automatic API documentation
  • Pydantic Models: Type-safe data validation and serialization
  • Database Integration: SQLAlchemy with PostgreSQL support
  • External Service Adapters: AWS S3, Slack, Google Cloud Platform, Amazon Marketing Cloud
  • Docker Support: Multi-stage builds and docker-compose setup
  • Comprehensive Testing: Unit tests, integration tests, and test utilities
  • Database Migrations: Alembic for database schema management
  • Monitoring & Health Checks: Built-in health checks and monitoring endpoints
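
To make the Worker-Dispatcher pattern concrete, the following is a minimal in-process sketch using only `asyncio`. It is not the template's implementation (which coordinates workers over HTTP endpoints and a database); the function names `worker` and `dispatch` and the string-based tasks are assumptions chosen for illustration.

```python
import asyncio
from typing import List


async def worker(name: str, queue: "asyncio.Queue[str]", results: List[str]) -> None:
    """Pull tasks from the shared queue until cancelled."""
    while True:
        task = await queue.get()
        try:
            await asyncio.sleep(0)  # stand-in for real work
            results.append(f"{name} processed {task}")
        finally:
            queue.task_done()  # lets queue.join() know this task is finished


async def dispatch(tasks: List[str], worker_count: int = 3) -> List[str]:
    """Enqueue all tasks, run a pool of workers, and return their results."""
    queue: "asyncio.Queue[str]" = asyncio.Queue()
    results: List[str] = []
    workers = [
        asyncio.create_task(worker(f"worker-{i + 1}", queue, results))
        for i in range(worker_count)
    ]
    for task in tasks:
        queue.put_nowait(task)
    await queue.join()  # wait until every task has been marked done
    for w in workers:
        w.cancel()  # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return results


results = asyncio.run(dispatch([f"task-{n}" for n in range(5)]))
print(len(results))
```

Scaling in this sketch means raising `worker_count`; in the template the analogous step is starting more worker containers.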

📁 Project Structure

worker-dispatcher-template/
├── app/
│   ├── domain/                 # Domain layer
│   │   ├── entities.py         # Core business entities
│   │   ├── enums.py            # Domain enumerations
│   │   └── services/           # Domain services and interfaces
│   ├── infrastructure/         # Infrastructure layer
│   │   ├── db/                 # Database models and repositories
│   │   ├── aws/                # AWS service adapters
│   │   ├── slack/              # Slack integration
│   │   ├── gcp/                # Google Cloud Platform adapters
│   │   ├── amc/                # Amazon Marketing Cloud adapter
│   │   └── exceptions/         # Custom exceptions
│   └── presentation/           # Presentation layer
│       ├── interface.py        # FastAPI application setup
│       └── logic/              # API routes and schemas
├── worker/                     # Worker implementation
├── tests/                      # Test suite
│   ├── unit/                   # Unit tests
│   ├── integration/            # Integration tests
│   └── e2e/                    # End-to-end tests
├── alembic/                    # Database migrations
├── docker-compose.yml          # Docker services
├── Dockerfile                  # Application container
├── Dockerfile.worker           # Worker container
├── pyproject.toml              # Poetry configuration
└── README.md                   # This file

🏃 Quick Start

Prerequisites

  • Python 3.9+
  • Poetry (for dependency management)
  • Docker & Docker Compose
  • PostgreSQL (or use Docker)

Installation

  1. Clone the repository:
git clone <repository-url>
cd worker-dispatcher-template
  2. Install dependencies:
poetry install
  3. Set up environment variables:
cp .env.example .env
# Edit .env with your configuration
  4. Start services with Docker Compose:
docker-compose up -d
  5. Run database migrations:
poetry run alembic upgrade head
  6. Access the application at http://localhost:8000 (the port used by the health-check examples in this README). FastAPI serves interactive API documentation at /docs unless it has been disabled.

Manual Setup (without Docker)

  1. Start PostgreSQL:
# Using Docker (POSTGRES_DB creates the database named in DATABASE_URL)
docker run -d --name postgres -p 5432:5432 -e POSTGRES_PASSWORD=password -e POSTGRES_DB=worker_dispatcher postgres:15

# Or install locally
  2. Set environment variables:
export DATABASE_URL="postgresql+asyncpg://postgres:password@localhost:5432/worker_dispatcher"
export REDIS_URL="redis://localhost:6379"
  3. Run the application:
poetry run uvicorn app.presentation.interface:app --reload
  4. Test the dispatch/work workflow:
poetry run python example_usage.py

🔧 Configuration

Environment Variables

Variable                Description                             Default
DATABASE_URL            PostgreSQL connection string            postgresql+asyncpg://postgres:password@localhost:5432/worker_dispatcher
REDIS_URL               Redis connection string                 redis://localhost:6379
AWS_ACCESS_KEY_ID       AWS access key                          -
AWS_SECRET_ACCESS_KEY   AWS secret key                          -
AWS_DEFAULT_REGION      AWS region                              us-east-1
SLACK_BOT_TOKEN         Slack bot token                         -
SLACK_WEBHOOK_URL       Slack webhook URL                       -
GCP_PROJECT_ID          Google Cloud Project ID                 -
WORKER_NAME             Worker instance name                    worker-1
WORKER_CAPABILITIES     Worker capabilities (comma-separated)   general
Database Configuration

The application uses Alembic for database migrations. To create a new migration:

poetry run alembic revision --autogenerate -m "Description of changes"
poetry run alembic upgrade head

📊 API Endpoints

Dispatch Endpoints (Task Distribution)

  • POST /dispatch/task - Dispatch a new task
  • POST /dispatch/task/bulk - Dispatch multiple tasks
  • POST /dispatch/assign - Manually assign task to worker
  • GET /dispatch/queue/status - Get queue status
  • GET /dispatch/available-workers - Get available workers
  • POST /dispatch/schedule - Trigger task processing
  • GET /dispatch/stats - Get dispatch statistics

Work Endpoints (Task Processing)

  • POST /work/register - Register as a worker
  • GET /work/next-task - Get next task to process
  • POST /work/complete-task - Mark task as completed
  • POST /work/fail-task - Mark task as failed
  • POST /work/heartbeat - Send worker heartbeat
  • GET /work/status/{worker_id} - Get worker status
  • POST /work/process-tasks - Start continuous task processing
  • POST /work/stop-processing - Stop task processing
  • GET /work/my-tasks/{worker_id} - Get worker's task history
  • GET /work/capabilities - Get available capabilities
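
The request and response schemas for these endpoints are not shown here, so the following is only an in-memory sketch of the lifecycle they suggest: a worker registers with its capabilities, claims the next matching task, and then reports completion or failure. The `Dispatcher` and `Task` classes, the status names, and the capability-matching rule are all assumptions for illustration and may differ from the template's behavior.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set


@dataclass
class Task:
    """Hypothetical task record; statuses go pending -> assigned -> completed | failed."""
    task_id: str
    required_capability: str = "general"
    status: str = "pending"
    worker_id: Optional[str] = None


@dataclass
class Dispatcher:
    workers: Dict[str, Set[str]] = field(default_factory=dict)
    tasks: List[Task] = field(default_factory=list)

    def register(self, worker_id: str, capabilities: Set[str]) -> None:
        """Roughly what POST /work/register might record."""
        self.workers[worker_id] = set(capabilities)

    def dispatch(self, task: Task) -> None:
        """Roughly what POST /dispatch/task might do: enqueue a pending task."""
        self.tasks.append(task)

    def next_task(self, worker_id: str) -> Optional[Task]:
        """Hand out the oldest pending task the worker is capable of running."""
        capabilities = self.workers[worker_id]
        for task in self.tasks:
            if task.status == "pending" and task.required_capability in capabilities:
                task.status, task.worker_id = "assigned", worker_id
                return task
        return None

    def finish(self, task_id: str, worker_id: str, success: bool) -> Task:
        """Mark an assigned task as completed or failed."""
        for task in self.tasks:
            if (task.task_id, task.worker_id, task.status) == (task_id, worker_id, "assigned"):
                task.status = "completed" if success else "failed"
                return task
        raise LookupError(f"task {task_id} is not assigned to {worker_id}")


dispatcher = Dispatcher()
dispatcher.register("worker-1", {"general"})
dispatcher.dispatch(Task("t-1", required_capability="image"))
dispatcher.dispatch(Task("t-2"))
claimed = dispatcher.next_task("worker-1")
finished = dispatcher.finish(claimed.task_id, "worker-1", success=True)
print(finished.task_id, finished.status)
```

In this sketch `t-1` stays pending because no registered worker advertises the `image` capability, which is the situation the manual-assignment and available-workers endpoints would help diagnose.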

Traditional REST API (Optional)

  • POST /api/v1/tasks/ - Create a new task
  • GET /api/v1/tasks/ - List tasks with filtering
  • GET /api/v1/tasks/{task_id} - Get specific task
  • PUT /api/v1/tasks/{task_id} - Update task
  • DELETE /api/v1/tasks/{task_id} - Delete task
  • POST /api/v1/workers/ - Register a new worker
  • GET /api/v1/workers/ - List workers with filtering

Monitoring

  • GET /api/v1/monitoring/health - System health status
  • GET /api/v1/monitoring/queue/statistics - Queue statistics
  • GET /api/v1/monitoring/workers/statistics - Worker statistics
  • GET /api/v1/monitoring/metrics - Comprehensive metrics
  • GET /api/v1/monitoring/dashboard - Dashboard data

🧪 Testing

Running Tests

# Run all tests
poetry run pytest

# Run with coverage
poetry run pytest --cov=app --cov-report=html

# Run specific test file
poetry run pytest tests/unit/domain/test_entities.py

# Run integration tests
poetry run pytest tests/integration/

Test Structure

  • Unit Tests: Test individual components in isolation
  • Integration Tests: Test component interactions
  • End-to-End Tests: Test complete workflows

Test Configuration

The test suite uses:

  • pytest as the test framework
  • pytest-asyncio for async test support
  • pytest-cov for coverage reporting
  • Mock objects for external dependencies
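
As an example of replacing an external dependency with a mock object, the sketch below tests an async function against `unittest.mock.AsyncMock`. The function `notify_task_failed` and the `#alerts` channel are hypothetical; the test drives the coroutine with `asyncio.run` so it runs without plugins, whereas a pytest-asyncio suite would typically declare the test itself as `async def`.

```python
import asyncio
from unittest.mock import AsyncMock


async def notify_task_failed(notifier, task_id: str) -> str:
    """Hypothetical domain function: report a failure through an injected notifier."""
    message = f"Task {task_id} failed"
    await notifier.send_message("#alerts", message)
    return message


def test_notify_task_failed_sends_one_message() -> None:
    # AsyncMock stands in for an adapter such as SlackAdapter; no network call is made.
    notifier = AsyncMock()
    message = asyncio.run(notify_task_failed(notifier, "t-42"))
    notifier.send_message.assert_awaited_once_with("#alerts", "Task t-42 failed")
    assert message == "Task t-42 failed"


if __name__ == "__main__":
    test_notify_task_failed_sends_one_message()
    print("ok")
```

Injecting the notifier as a parameter is what makes the mock substitution possible without patching module internals.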

🔌 External Service Integration

AWS S3

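from app.infrastructure.s3.adapter import S3Adapter

# Note: the project structure lists AWS adapters under app/infrastructure/aws/,
# so adjust this import path if it does not resolve.
s3_adapter = S3Adapter(region_name="us-east-1")
# upload_file is awaited, so call it from inside an async function
url = await s3_adapter.upload_file("local_file.txt", "bucket-name", "key")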

Slack

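from app.infrastructure.slack.adapter import SlackAdapter

# "xoxb-your-token" is a placeholder; read the real token from SLACK_BOT_TOKEN
slack_adapter = SlackAdapter(bot_token="xoxb-your-token")
# send_message is awaited, so call it from inside an async function
await slack_adapter.send_message("#general", "Hello from Worker Dispatcher!")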

Google Cloud Platform

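from app.infrastructure.gcp.adapter import GCPAdapter

# "your-project-id" is a placeholder; read the real value from GCP_PROJECT_ID
gcp_adapter = GCPAdapter(project_id="your-project-id")
# upload_blob is awaited, so call it from inside an async function
await gcp_adapter.storage.upload_blob("bucket", "local_file.txt", "remote_file.txt")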

📈 Monitoring and Observability

Health Checks

The application exposes two health-check endpoints, a top-level one and one under the monitoring API:

curl http://localhost:8000/health
curl http://localhost:8000/api/v1/monitoring/health

Metrics

Access detailed metrics at:

curl http://localhost:8000/api/v1/monitoring/metrics

Logging

The application uses structured logging with correlation IDs for request tracking.
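
The logging setup itself is not shown in this README, so the following is a minimal standard-library sketch of the general technique: a `contextvars` variable holds the correlation ID for the current request, a logging filter copies it onto each record, and a formatter emits JSON. The names `correlation_id`, `CorrelationIdFilter`, `JsonFormatter`, and `build_logger` are assumptions for this example.

```python
import contextvars
import io
import json
import logging
import uuid

# Holds the ID for the current request or task; "-" means none has been set.
correlation_id = contextvars.ContextVar("correlation_id", default="-")


class CorrelationIdFilter(logging.Filter):
    """Attach the current correlation ID to every record passing through."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.correlation_id = correlation_id.get()
        return True


class JsonFormatter(logging.Formatter):
    """Render each record as a single JSON object (one line per log entry)."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "correlation_id": getattr(record, "correlation_id", "-"),
        })


def build_logger(name: str, stream) -> logging.Logger:
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    handler.addFilter(CorrelationIdFilter())
    logger = logging.getLogger(name)
    logger.handlers = [handler]
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger


buffer = io.StringIO()  # captured here for the demo; use sys.stdout in practice
logger = build_logger("worker_dispatcher.sketch", buffer)

request_id = uuid.uuid4().hex
token = correlation_id.set(request_id)  # typically done once per request, e.g. in middleware
logger.info("task dispatched")
correlation_id.reset(token)  # restore the previous value when the request ends

entry = json.loads(buffer.getvalue())
print(entry["message"], entry["correlation_id"] == request_id)
```

Using a context variable rather than a global means concurrent asyncio tasks each see their own ID, which matters in an async web application.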

🚒 Deployment

Docker Deployment

  1. Build images:
docker-compose build
  2. Deploy services:
docker-compose up -d
  3. Scale workers:
docker-compose up -d --scale worker=5

Production Considerations

  • Keep production settings and secrets in environment variables, separate from development values
  • Configure database connection pooling
  • Aggregate logs from the API and worker containers in one place
  • Implement monitoring and alerting
  • Put a reverse proxy (Nginx) in front of the application
  • Configure auto-scaling for workers

🔒 Security

Best Practices Implemented

  • No hardcoded secrets or credentials
  • Environment variable configuration
  • Input validation with Pydantic
  • SQL injection prevention with SQLAlchemy
  • CORS configuration
  • Rate limiting (via Nginx)
  • Security headers

Security Headers

The Nginx configuration includes:

  • X-Frame-Options
  • X-Content-Type-Options
  • X-XSS-Protection
  • Referrer-Policy

🤝 Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests for new functionality
  5. Run the test suite
  6. Submit a pull request

Development Setup

# Install development dependencies
poetry install --with dev

# Install pre-commit hooks
poetry run pre-commit install

# Run linting
poetry run black .
poetry run isort .
poetry run flake8 .
poetry run mypy .

📝 License

This project is licensed under the MIT License - see the LICENSE file for details.

🆘 Support

For support and questions:

  1. Check the documentation
  2. Review existing issues
  3. Create a new issue with detailed information
  4. Include logs and error messages

🎯 Roadmap

  • Add Redis caching layer
  • Implement circuit breaker pattern
  • Add GraphQL API support
  • Implement task scheduling with cron expressions
  • Add webhook notifications
  • Implement task dependencies
  • Add performance monitoring
  • Create dashboard UI
  • Add multi-tenant support
  • Implement audit logging

📚 Additional Resources
