A comprehensive FastAPI template implementing Domain-Driven Design (DDD) and Clean Architecture principles with a Worker-Dispatcher pattern for asynchronous task processing.
This template follows a layered architecture with clear separation of concerns:
```
┌─────────────────────────────────────────┐
│           Presentation Layer            │
│  (FastAPI routes, controllers, logic)   │
└────────────────────┬────────────────────┘
                     │
┌────────────────────▼────────────────────┐
│              Domain Layer               │
│  (Entities, Services, Business Logic)   │
└────────────────────┬────────────────────┘
                     │
┌────────────────────▼────────────────────┐
│          Infrastructure Layer           │
│   (Database, External APIs, Adapters)   │
└─────────────────────────────────────────┘
```
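The dependency direction in this diagram can be sketched in a few lines of Python. The names below (`TaskRepository`, `TaskService`, `InMemoryTaskRepository`) are illustrative stand-ins, not the template's actual classes: the domain layer declares an interface, and an infrastructure class implements it.

```python
from dataclasses import dataclass
from typing import Optional, Protocol

# --- Domain layer: an entity and the interface it depends on ---
@dataclass
class Task:
    task_id: str
    payload: dict

class TaskRepository(Protocol):
    """Port owned by the domain; the infrastructure layer implements it."""
    def save(self, task: Task) -> None: ...
    def get(self, task_id: str) -> Optional[Task]: ...

class TaskService:
    """Domain service that depends only on the abstraction, never on a driver."""
    def __init__(self, repo: TaskRepository) -> None:
        self._repo = repo

    def create(self, task_id: str, payload: dict) -> Task:
        task = Task(task_id=task_id, payload=payload)
        self._repo.save(task)
        return task

# --- Infrastructure layer: one concrete adapter (here, in-memory) ---
class InMemoryTaskRepository:
    def __init__(self) -> None:
        self._tasks = {}

    def save(self, task: Task) -> None:
        self._tasks[task.task_id] = task

    def get(self, task_id: str) -> Optional[Task]:
        return self._tasks.get(task_id)

repo = InMemoryTaskRepository()
service = TaskService(repo)
created = service.create("t-1", {"kind": "report"})
```

Because `TaskService` only sees the `TaskRepository` protocol, swapping the in-memory adapter for a SQLAlchemy-backed one requires no change to domain code.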
- Dependency Inversion: Domain layer defines interfaces, infrastructure implements them
- Single Responsibility: Each module has a clear, focused purpose
- Open/Closed: Easy to extend without modifying existing code
- Interface Segregation: Small, focused interfaces rather than large ones
- Worker-Dispatcher Pattern: Scalable asynchronous task processing
- Clean Architecture: Proper separation of concerns and dependency inversion
- FastAPI Integration: Modern async web framework with automatic API documentation
- Pydantic Models: Type-safe data validation and serialization
- Database Integration: SQLAlchemy with PostgreSQL support
- External Service Adapters: AWS S3, Slack, Google Cloud Platform, Amazon Marketing Cloud
- Docker Support: Multi-stage builds and docker-compose setup
- Comprehensive Testing: Unit tests, integration tests, and test utilities
- Database Migrations: Alembic for database schema management
- Monitoring & Health Checks: Built-in health checks and monitoring endpoints
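As a rough, stdlib-only illustration of the Worker-Dispatcher pattern (the template itself persists tasks in PostgreSQL and exposes dispatch/work HTTP endpoints; everything here is a simplified in-memory stand-in):

```python
import queue
from dataclasses import dataclass, field

@dataclass
class Task:
    task_id: str
    capability: str  # which kind of worker may run it
    payload: dict = field(default_factory=dict)

class Dispatcher:
    """Routes tasks into per-capability queues; workers pull from them."""
    def __init__(self) -> None:
        self._queues = {}

    def dispatch(self, task: Task) -> None:
        self._queues.setdefault(task.capability, queue.Queue()).put(task)

    def next_task(self, capability: str):
        q = self._queues.get(capability)
        if q is None:
            return None
        try:
            return q.get_nowait()
        except queue.Empty:
            return None

class Worker:
    """Pulls tasks matching its capability and records what it processed."""
    def __init__(self, name: str, capability: str, dispatcher: Dispatcher) -> None:
        self.name = name
        self.capability = capability
        self._dispatcher = dispatcher
        self.completed = []

    def run_once(self) -> bool:
        task = self._dispatcher.next_task(self.capability)
        if task is None:
            return False
        self.completed.append(task.task_id)  # "process" the task
        return True

dispatcher = Dispatcher()
dispatcher.dispatch(Task("t-1", "general"))
worker = Worker("worker-1", "general", dispatcher)
worker.run_once()
```

The real template decouples these two roles across processes: the API dispatches, and separately scaled worker containers poll for work.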
```
worker-dispatcher-template/
├── app/
│   ├── domain/               # Domain layer
│   │   ├── entities.py       # Core business entities
│   │   ├── enums.py          # Domain enumerations
│   │   └── services/         # Domain services and interfaces
│   ├── infrastructure/       # Infrastructure layer
│   │   ├── db/               # Database models and repositories
│   │   ├── aws/              # AWS service adapters
│   │   ├── slack/            # Slack integration
│   │   ├── gcp/              # Google Cloud Platform adapters
│   │   ├── amc/              # Amazon Marketing Cloud adapter
│   │   └── exceptions/       # Custom exceptions
│   └── presentation/         # Presentation layer
│       ├── interface.py      # FastAPI application setup
│       └── logic/            # API routes and schemas
├── worker/                   # Worker implementation
├── tests/                    # Test suite
│   ├── unit/                 # Unit tests
│   ├── integration/          # Integration tests
│   └── e2e/                  # End-to-end tests
├── alembic/                  # Database migrations
├── docker-compose.yml        # Docker services
├── Dockerfile                # Application container
├── Dockerfile.worker         # Worker container
├── pyproject.toml            # Poetry configuration
└── README.md                 # This file
```
- Python 3.9+
- Poetry (for dependency management)
- Docker & Docker Compose
- PostgreSQL (or use Docker)
1. Clone the repository:
   ```bash
   git clone <repository-url>
   cd worker-dispatcher-template
   ```
2. Install dependencies:
   ```bash
   poetry install
   ```
3. Set up environment variables:
   ```bash
   cp .env.example .env
   # Edit .env with your configuration
   ```
4. Start services with Docker Compose:
   ```bash
   docker-compose up -d
   ```
5. Run database migrations:
   ```bash
   poetry run alembic upgrade head
   ```
6. Access the application:
   - API: http://localhost:8000
   - API Documentation: http://localhost:8000/docs
   - Health Check: http://localhost:8000/health
   - Dispatch Endpoints: http://localhost:8000/dispatch/
   - Work Endpoints: http://localhost:8000/work/
1. Start PostgreSQL:
   ```bash
   # Using Docker
   docker run -d --name postgres -p 5432:5432 -e POSTGRES_PASSWORD=password postgres:15
   # Or install locally
   ```
2. Set environment variables:
   ```bash
   export DATABASE_URL="postgresql+asyncpg://postgres:password@localhost:5432/worker_dispatcher"
   export REDIS_URL="redis://localhost:6379"
   ```
3. Run the application:
   ```bash
   poetry run uvicorn app.presentation.interface:app --reload
   ```
4. Test the dispatch/work workflow:
   ```bash
   poetry run python example_usage.py
   ```

| Variable | Description | Default |
|---|---|---|
| `DATABASE_URL` | PostgreSQL connection string | `postgresql+asyncpg://postgres:password@localhost:5432/worker_dispatcher` |
| `REDIS_URL` | Redis connection string | `redis://localhost:6379` |
| `AWS_ACCESS_KEY_ID` | AWS access key | - |
| `AWS_SECRET_ACCESS_KEY` | AWS secret key | - |
| `AWS_DEFAULT_REGION` | AWS region | `us-east-1` |
| `SLACK_BOT_TOKEN` | Slack bot token | - |
| `SLACK_WEBHOOK_URL` | Slack webhook URL | - |
| `GCP_PROJECT_ID` | Google Cloud project ID | - |
| `WORKER_NAME` | Worker instance name | `worker-1` |
| `WORKER_CAPABILITIES` | Worker capabilities (comma-separated) | `general` |
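These variables might be loaded at startup along the lines of the following stdlib-only sketch. The `Settings` class here is hypothetical (the template may well use Pydantic's settings support instead); defaults match the table above.

```python
import os
from dataclasses import dataclass, field
from typing import List

@dataclass
class Settings:
    """Reads the variables above from the environment, falling back to the table's defaults."""
    database_url: str = field(default_factory=lambda: os.getenv(
        "DATABASE_URL",
        "postgresql+asyncpg://postgres:password@localhost:5432/worker_dispatcher"))
    redis_url: str = field(default_factory=lambda: os.getenv(
        "REDIS_URL", "redis://localhost:6379"))
    aws_default_region: str = field(default_factory=lambda: os.getenv(
        "AWS_DEFAULT_REGION", "us-east-1"))
    worker_name: str = field(default_factory=lambda: os.getenv(
        "WORKER_NAME", "worker-1"))
    worker_capabilities: List[str] = field(default_factory=lambda: os.getenv(
        "WORKER_CAPABILITIES", "general").split(","))

settings = Settings()
```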
The application uses Alembic for database migrations. To create and apply a new migration:

```bash
poetry run alembic revision --autogenerate -m "Description of changes"
poetry run alembic upgrade head
```

**Dispatch Endpoints**

- `POST /dispatch/task` - Dispatch a new task
- `POST /dispatch/task/bulk` - Dispatch multiple tasks
- `POST /dispatch/assign` - Manually assign a task to a worker
- `GET /dispatch/queue/status` - Get queue status
- `GET /dispatch/available-workers` - Get available workers
- `POST /dispatch/schedule` - Trigger task processing
- `GET /dispatch/stats` - Get dispatch statistics
**Work Endpoints**

- `POST /work/register` - Register as a worker
- `GET /work/next-task` - Get the next task to process
- `POST /work/complete-task` - Mark a task as completed
- `POST /work/fail-task` - Mark a task as failed
- `POST /work/heartbeat` - Send a worker heartbeat
- `GET /work/status/{worker_id}` - Get worker status
- `POST /work/process-tasks` - Start continuous task processing
- `POST /work/stop-processing` - Stop task processing
- `GET /work/my-tasks/{worker_id}` - Get a worker's task history
- `GET /work/capabilities` - Get available capabilities
**Task & Worker Management**

- `POST /api/v1/tasks/` - Create a new task
- `GET /api/v1/tasks/` - List tasks with filtering
- `GET /api/v1/tasks/{task_id}` - Get a specific task
- `PUT /api/v1/tasks/{task_id}` - Update a task
- `DELETE /api/v1/tasks/{task_id}` - Delete a task
- `POST /api/v1/workers/` - Register a new worker
- `GET /api/v1/workers/` - List workers with filtering
**Monitoring**

- `GET /api/v1/monitoring/health` - System health status
- `GET /api/v1/monitoring/queue/statistics` - Queue statistics
- `GET /api/v1/monitoring/workers/statistics` - Worker statistics
- `GET /api/v1/monitoring/metrics` - Comprehensive metrics
- `GET /api/v1/monitoring/dashboard` - Dashboard data
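Since the request schemas are not spelled out in this README, the field names in the client sketch below are assumptions; consult the generated documentation at `/docs` for the real shapes. The sketch builds a dispatch payload and posts it with the standard library only.

```python
import json
import urllib.request

def build_task_payload(task_type: str, payload: dict, priority: str = "normal") -> dict:
    """Assumed request shape for POST /dispatch/task; check /docs for the real schema."""
    return {"task_type": task_type, "payload": payload, "priority": priority}

def post_json(url: str, body: dict) -> dict:
    """POST a JSON body and decode the JSON response."""
    req = urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))

# Example (requires the API to be running locally):
# post_json("http://localhost:8000/dispatch/task",
#           build_task_payload("report", {"date": "2024-01-01"}))
```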
```bash
# Run all tests
poetry run pytest

# Run with coverage
poetry run pytest --cov=app --cov-report=html

# Run a specific test file
poetry run pytest tests/unit/domain/test_entities.py

# Run integration tests
poetry run pytest tests/integration/
```

- Unit Tests: Test individual components in isolation
- Integration Tests: Test component interactions
- End-to-End Tests: Test complete workflows

The test suite uses:

- `pytest` as the test framework
- `pytest-asyncio` for async test support
- `pytest-cov` for coverage reporting
- Mock objects for external dependencies
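A minimal async test in the style the suite uses might look like this. The `Task` entity below is a hypothetical stand-in; the template's real entities live in `app/domain/entities.py`.

```python
import asyncio
from dataclasses import dataclass

# Hypothetical stand-in for an entity under test.
@dataclass
class Task:
    task_id: str
    status: str = "pending"

    async def complete(self) -> None:
        # Real code might await a repository call here.
        await asyncio.sleep(0)
        self.status = "completed"

# With pytest-asyncio, an async test is just a coroutine function:
async def test_task_completes() -> None:
    task = Task(task_id="t-1")
    await task.complete()
    assert task.status == "completed"
```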
```python
from app.infrastructure.s3.adapter import S3Adapter

s3_adapter = S3Adapter(region_name="us-east-1")
url = await s3_adapter.upload_file("local_file.txt", "bucket-name", "key")
```

```python
from app.infrastructure.slack.adapter import SlackAdapter

slack_adapter = SlackAdapter(bot_token="xoxb-your-token")
await slack_adapter.send_message("#general", "Hello from Worker Dispatcher!")
```

```python
from app.infrastructure.gcp.adapter import GCPAdapter

gcp_adapter = GCPAdapter(project_id="your-project-id")
await gcp_adapter.storage.upload_blob("bucket", "local_file.txt", "remote_file.txt")
```

The application provides comprehensive health checks:

```bash
curl http://localhost:8000/health
curl http://localhost:8000/api/v1/monitoring/health
```

Access detailed metrics at:

```bash
curl http://localhost:8000/api/v1/monitoring/metrics
```

The application uses structured logging with correlation IDs for request tracking.
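One common way to wire up such correlation IDs, not necessarily the template's exact mechanism, is a `contextvars`-based logging filter:

```python
import logging
import uuid
from contextvars import ContextVar

# Holds the ID for the current request/task context.
correlation_id: ContextVar[str] = ContextVar("correlation_id", default="-")

class CorrelationIdFilter(logging.Filter):
    """Injects the current correlation ID into every log record."""
    def filter(self, record: logging.LogRecord) -> bool:
        record.correlation_id = correlation_id.get()
        return True

handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
    "%(asctime)s %(levelname)s [%(correlation_id)s] %(message)s"))
handler.addFilter(CorrelationIdFilter())

logger = logging.getLogger("worker_dispatcher_example")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Typically set once per request, e.g. in FastAPI middleware:
correlation_id.set(str(uuid.uuid4()))
logger.info("task dispatched")
```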
1. Build images:
   ```bash
   docker-compose build
   ```
2. Deploy services:
   ```bash
   docker-compose up -d
   ```
3. Scale workers:
   ```bash
   docker-compose up -d --scale worker=5
   ```

- Use proper environment variables for production
- Configure database connection pooling
- Set up proper logging aggregation
- Implement monitoring and alerting
- Use a reverse proxy (Nginx) for production
- Configure auto-scaling for workers
- No hardcoded secrets or credentials
- Environment variable configuration
- Input validation with Pydantic
- SQL injection prevention with SQLAlchemy
- CORS configuration
- Rate limiting (via Nginx)
- Security headers
The Nginx configuration includes:
- X-Frame-Options
- X-Content-Type-Options
- X-XSS-Protection
- Referrer-Policy
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests for new functionality
- Run the test suite
- Submit a pull request
```bash
# Install development dependencies
poetry install --with dev

# Install pre-commit hooks
poetry run pre-commit install

# Run formatting and linting
poetry run black .
poetry run isort .
poetry run flake8 .
poetry run mypy .
```

This project is licensed under the MIT License - see the LICENSE file for details.
For support and questions:
- Check the documentation
- Review existing issues
- Create a new issue with detailed information
- Include logs and error messages
- Add Redis caching layer
- Implement circuit breaker pattern
- Add GraphQL API support
- Implement task scheduling with cron expressions
- Add webhook notifications
- Implement task dependencies
- Add performance monitoring
- Create dashboard UI
- Add multi-tenant support
- Implement audit logging