Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 181 additions & 47 deletions docs/implementing_data_loaders.md
Original file line number Diff line number Diff line change
Expand Up @@ -305,71 +305,207 @@ def _get_table_metadata(self, table: pa.Table, duration: float, batch_count: int

## Testing

### Integration Test Structure
### Generalized Test Infrastructure

Create integration tests in `tests/integration/test_{system}_loader.py`:
The project uses a generalized test infrastructure that eliminates code duplication across loader tests. Instead of writing standalone tests for each loader, each loader's test module inherits from shared base test classes and supplies a small loader-specific configuration class.

### Architecture

```
tests/integration/loaders/
├── conftest.py # Base classes and fixtures
├── test_base_loader.py # 7 core tests (all loaders inherit)
├── test_base_streaming.py # 5 streaming tests (for loaders with reorg support)
└── backends/
├── test_postgresql.py # PostgreSQL-specific config + tests
├── test_redis.py # Redis-specific config + tests
└── test_example.py # Your loader tests here
```

### Step 1: Create Configuration Fixture

Add your loader's configuration fixture to `tests/conftest.py`:

```python
# tests/integration/test_example_loader.py
@pytest.fixture(scope='session')
def example_test_config(request):
"""Example loader configuration from testcontainer or environment"""
# Use testcontainers for CI, or fall back to environment variables
if TESTCONTAINERS_AVAILABLE and USE_TESTCONTAINERS:
# Set up testcontainer (if applicable)
example_container = request.getfixturevalue('example_container')
return {
'host': example_container.get_container_host_ip(),
'port': example_container.get_exposed_port(5432),
'database': 'test_db',
'user': 'test_user',
'password': 'test_pass',
}
else:
# Fall back to environment variables
return {
'host': os.getenv('EXAMPLE_HOST', 'localhost'),
'port': int(os.getenv('EXAMPLE_PORT', '5432')),
'database': os.getenv('EXAMPLE_DB', 'test_db'),
'user': os.getenv('EXAMPLE_USER', 'test_user'),
'password': os.getenv('EXAMPLE_PASSWORD', 'test_pass'),
}
```

### Step 2: Create Test Configuration Class

Create `tests/integration/loaders/backends/test_example.py`:

```python
"""
Example loader integration tests using generalized test infrastructure.
"""

from typing import Any, Dict, List, Optional
import pytest
import pyarrow as pa
from src.amp.loaders.base import LoadMode

from src.amp.loaders.implementations.example_loader import ExampleLoader
from tests.integration.loaders.conftest import LoaderTestConfig
from tests.integration.loaders.test_base_loader import BaseLoaderTests
from tests.integration.loaders.test_base_streaming import BaseStreamingTests


class ExampleTestConfig(LoaderTestConfig):
"""Example-specific test configuration"""

loader_class = ExampleLoader
config_fixture_name = 'example_test_config'

# Declare loader capabilities
supports_overwrite = True
supports_streaming = True # Set to False if no streaming support
supports_multi_network = True # For blockchain loaders with reorg
supports_null_values = True

def get_row_count(self, loader: ExampleLoader, table_name: str) -> int:
"""Get row count from table"""
# Implement using your loader's API
return loader._connection.query(f"SELECT COUNT(*) FROM {table_name}")[0]['count']

def query_rows(
self,
loader: ExampleLoader,
table_name: str,
where: Optional[str] = None,
order_by: Optional[str] = None
) -> List[Dict[str, Any]]:
"""Query rows from table"""
query = f"SELECT * FROM {table_name}"
if where:
query += f" WHERE {where}"
if order_by:
query += f" ORDER BY {order_by}"
return loader._connection.query(query)

def cleanup_table(self, loader: ExampleLoader, table_name: str) -> None:
"""Drop table"""
loader._connection.execute(f"DROP TABLE IF EXISTS {table_name}")

def get_column_names(self, loader: ExampleLoader, table_name: str) -> List[str]:
"""Get column names from table"""
result = loader._connection.query(
f"SELECT column_name FROM information_schema.columns WHERE table_name = '{table_name}'"
)
return [row['column_name'] for row in result]


# Core tests - ALL loaders must inherit these
class TestExampleCore(BaseLoaderTests):
"""Inherits 7 core tests: connection, context manager, batching, modes, null handling, errors"""
config = ExampleTestConfig()

@pytest.fixture
def example_config():
return {
'host': 'localhost',
'port': 5432,
'database': 'test_db',
'user': 'test_user',
'password': 'test_pass'
}

@pytest.fixture
def test_data():
return pa.Table.from_pydict({
'id': [1, 2, 3],
'name': ['a', 'b', 'c'],
'value': [1.0, 2.0, 3.0]
})
# Streaming tests - Only for loaders with streaming/reorg support
class TestExampleStreaming(BaseStreamingTests):
"""Inherits 5 streaming tests: metadata columns, reorg deletion, overlapping ranges, multi-network, microbatch dedup"""
config = ExampleTestConfig()


# Loader-specific tests
@pytest.mark.integration
@pytest.mark.example
class TestExampleLoaderIntegration:
def test_connection(self, example_config):
loader = ExampleLoader(example_config)

loader.connect()
assert loader.is_connected

loader.disconnect()
assert not loader.is_connected

def test_basic_loading(self, example_config, test_data):
loader = ExampleLoader(example_config)

class TestExampleSpecific:
"""Example-specific functionality tests"""
config = ExampleTestConfig()

def test_custom_feature(self, loader, test_table_name, cleanup_tables):
"""Test example-specific functionality"""
cleanup_tables.append(test_table_name)

with loader:
result = loader.load_table(test_data, 'test_table')

# Test your loader's unique features
result = loader.some_custom_method(test_table_name)
assert result.success
assert result.rows_loaded == 3
assert result.metadata['operation'] == 'load_table'
assert result.metadata['batches_processed'] > 0
```

### What You Get Automatically

By inheriting from the base test classes, you automatically get:

**From `BaseLoaderTests` (7 core tests):**
- `test_connection` - Connection establishment and disconnection
- `test_context_manager` - Context manager functionality
- `test_batch_loading` - Basic batch loading
- `test_append_mode` - Append mode operations
- `test_overwrite_mode` - Overwrite mode operations
- `test_null_handling` - Null value handling
- `test_error_handling` - Error scenarios

**From `BaseStreamingTests` (5 streaming tests):**
- `test_streaming_metadata_columns` - Metadata column creation
- `test_reorg_deletion` - Blockchain reorganization handling
- `test_reorg_overlapping_ranges` - Overlapping range invalidation
- `test_reorg_multi_network` - Multi-network reorg isolation
- `test_microbatch_deduplication` - Microbatch duplicate detection

### Required LoaderTestConfig Methods

You must implement these four methods in your `LoaderTestConfig` subclass:

```python
def get_row_count(self, loader, table_name: str) -> int:
"""Return number of rows in table"""

def query_rows(self, loader, table_name: str, where=None, order_by=None) -> List[Dict]:
"""Query and return rows as list of dicts"""

def cleanup_table(self, loader, table_name: str) -> None:
"""Drop/delete the table"""

def get_column_names(self, loader, table_name: str) -> List[str]:
"""Return list of column names"""
```

### Capability Flags

Set these flags in your `LoaderTestConfig` subclass to control which tests run:

```python
supports_overwrite = True # Can overwrite existing data
supports_streaming = True # Supports streaming with metadata
supports_multi_network = True # Supports multi-network isolation (blockchain loaders)
supports_null_values = True # Handles NULL values correctly
```

### Running Tests

```bash
# Run all integration tests
make test-integration
# Run all tests for your loader
uv run pytest tests/integration/loaders/backends/test_example.py -v

# Run only core tests
uv run pytest tests/integration/loaders/backends/test_example.py::TestExampleCore -v

# Run specific loader tests
make test-example
# Run only streaming tests
uv run pytest tests/integration/loaders/backends/test_example.py::TestExampleStreaming -v

# Run with environment variables
uv run --env-file .test.env pytest tests/integration/test_example_loader.py -v
# Run specific test
uv run pytest tests/integration/loaders/backends/test_example.py::TestExampleCore::test_connection -v
```

## Best Practices
Expand Down Expand Up @@ -645,5 +781,3 @@ class KeyValueLoader(DataLoader[KeyValueConfig]):
'database': self.config.database
}
```

This documentation provides everything needed to implement new data loaders efficiently and consistently!
Loading
Loading