diff --git a/.github/actions/install_eitprocessing/action.yml b/.github/actions/install_eitprocessing/action.yml index 283b8d38c..fc38c326b 100644 --- a/.github/actions/install_eitprocessing/action.yml +++ b/.github/actions/install_eitprocessing/action.yml @@ -8,17 +8,13 @@ inputs: dependencies: description: "The optional dependencies of eitprocessing to install" required: false - extract-data: - description: "Whether to extract testing data" - required: false - default: "false" token: description: "GitHub TOKEN" required: true - data-directory: - description: "Directory where to store eitprocessing data" + zenodo-record-id: + description: "Zenodo record ID for test dataset" required: false - default: ${{ github.workspace }}/../eitprocessing_data/ + default: "17423608" runs: using: "composite" @@ -26,18 +22,6 @@ runs: - uses: actions/setup-python@v5 with: python-version: ${{ inputs.python-version }} - - uses: docker/login-action@v3 - if: ${{ inputs.extract-data == 'true' }} - with: - registry: ghcr.io - username: psomhorst - password: ${{ inputs.token }} - - uses: shrink/actions-docker-extract@v3 - if: ${{ inputs.extract-data == 'true' }} - with: - image: ghcr.io/eit-alive/eittestdata:latest - destination: ${{ inputs.data-directory }} - path: /eitprocessing/. - uses: actions/cache@v4 id: cache-python-env with: diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 99e3bd750..4ea9aab49 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -16,21 +16,45 @@ jobs: os: ["ubuntu-latest"] python-version: ["3.10"] env: - EIT_PROCESSING_TEST_DATA: ${{ github.workspace }}/../eitprocessing_data/ GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }} COVERALLS_FLAG_NAME: python-${{ matrix.python-version }} + ZENODO_RECORD_ID: 17423608 steps: - uses: actions/checkout@v4 - uses: ./.github/actions/install_eitprocessing with: dependencies: testing - extract-data: true python-version: ${{ matrix.python-version }} token: ${{ secrets.GITHUB_TOKEN }} - data-directory: ${{ env.EIT_PROCESSING_TEST_DATA }} + - name: Set data dir + shell: bash + id: set-path + run: echo "data_dir=$GITHUB_WORKSPACE/test_data" >> "$GITHUB_OUTPUT" + - name: Restore dataset cache + id: cache + uses: actions/cache@v4 + with: + path: ${{ steps.set-path.outputs.data_dir }} + key: eitprocessing-testdata-zenodo.${{ env.ZENODO_RECORD_ID }} + - name: Install zenodo-get + if: steps.cache.outputs.cache-hit != 'true' + run: python3 -m pip install zenodo-get + shell: bash + - name: Download test dataset + shell: bash + if: steps.cache.outputs['cache-hit'] != 'true' + run: | + mkdir -p ${{ steps.set-path.outputs.data_dir }} + cd ${{ steps.set-path.outputs.data_dir }} + zenodo_get $ZENODO_RECORD_ID + cd - + - name: Uninstall zenodo-get + if: steps.cache.outputs['cache-hit'] != 'true' + run: python3 -m pip uninstall --yes zenodo-get + shell: bash - name: Run coveralls run: | - pytest --cov --cov-report xml --cov-report term --cov-report html - git config --global --add safe.directory /ci + pytest --cov --cov-report xml --cov-report term --cov-report html + git config --global --add safe.directory /ci coveralls --service=github diff --git a/.github/workflows/release_github.yml b/.github/workflows/release_github.yml index 5e558b237..62c5a5e47 100644 --- a/.github/workflows/release_github.yml +++ b/.github/workflows/release_github.yml @@ -1,5 +1,4 @@ name: Draft GitHub Release - on: workflow_dispatch: inputs: @@ -25,6 +24,7 @@ permissions: env: GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} GH_PAGER: cat + ZENODO_RECORD_ID: 17423608 jobs: checks: @@ -77,8 +77,6 @@ jobs: os: ["ubuntu-latest"] python-version: ["3.10", "3.11", "3.12", "3.13"] name: Build for ${{ matrix.python-version }}, ${{ matrix.os }} - env: - EIT_PROCESSING_TEST_DATA: ${{ github.workspace }}/../eitprocessing_data/ steps: - name: Checkout repository uses: actions/checkout@v4 @@ -87,11 +85,34 @@ jobs: uses: ./.github/actions/install_eitprocessing with: dependencies: testing - extract-data: true python-version: ${{ matrix.python-version }} token: ${{ secrets.GITHUB_TOKEN }} - data-directory: ${{ env.EIT_PROCESSING_TEST_DATA }} - + - name: Set data dir + shell: bash + id: set-path + run: echo "data_dir=$GITHUB_WORKSPACE/test_data" >> "$GITHUB_OUTPUT" + - name: Restore dataset cache + id: cache + uses: actions/cache@v4 + with: + path: ${{ steps.set-path.outputs.data_dir }} + key: eitprocessing-testdata-zenodo.${{ env.ZENODO_RECORD_ID }} + - name: Install zenodo-get + if: steps.cache.outputs.cache-hit != 'true' + run: python3 -m pip install zenodo-get + shell: bash + - name: Download test dataset + shell: bash + if: steps.cache.outputs['cache-hit'] != 'true' + run: | + mkdir -p ${{ steps.set-path.outputs.data_dir }} + cd ${{ steps.set-path.outputs.data_dir }} + zenodo_get $ZENODO_RECORD_ID + cd - + - name: Uninstall zenodo-get + if: steps.cache.outputs['cache-hit'] != 'true' + run: python3 -m pip uninstall --yes zenodo-get + shell: bash - name: Run pytest run: pytest -v --runslow diff --git a/.github/workflows/release_pypi.yml b/.github/workflows/release_pypi.yml index 21369470f..e378911d8 100644 --- a/.github/workflows/release_pypi.yml +++ b/.github/workflows/release_pypi.yml @@ -16,7 +16,6 @@ jobs: - uses: ./.github/actions/install_eitprocessing with: dependencies: publishing - extract-data: false python-version: "3.10" - name: Build wheel and source distribution run: python -m build diff --git a/.github/workflows/test_build_documentation.yml b/.github/workflows/test_build_documentation.yml index 425e9ee54..627c4fd41 100644 --- a/.github/workflows/test_build_documentation.yml +++ b/.github/workflows/test_build_documentation.yml @@ -23,7 +23,6 @@ jobs: - uses: ./.github/actions/install_eitprocessing with: dependencies: docs - extract-data: false python-version: "3.10" - name: Link notebooks run: ln -s ../../notebooks docs/examples diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index 39f979056..62bfa4950 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -1,5 +1,6 @@ name: Build and Test - +env: + ZENODO_RECORD_ID: 17423608 on: push: branches: @@ -14,7 +15,6 @@ on: branches: - main - develop - jobs: test_and_build: if: github.event.pull_request.draft == false @@ -25,16 +25,38 @@ jobs: os: ["ubuntu-latest"] python-version: ["3.10"] name: Test and build for ${{ matrix.python-version }}, ${{ matrix.os }} - env: - EIT_PROCESSING_TEST_DATA: ${{ github.workspace }}/../eitprocessing_data/ steps: - uses: actions/checkout@v4 - uses: ./.github/actions/install_eitprocessing with: dependencies: testing - extract-data: true python-version: ${{ matrix.python-version }} token: ${{ secrets.GITHUB_TOKEN }} - data-directory: ${{ env.EIT_PROCESSING_TEST_DATA }} + - name: Set data dir + shell: bash + id: set-path + run: echo "data_dir=$GITHUB_WORKSPACE/test_data" >> "$GITHUB_OUTPUT" + - name: Restore dataset cache + id: cache + uses: actions/cache@v4 + with: + path: ${{ steps.set-path.outputs.data_dir }} + key: eitprocessing-testdata-zenodo.${{ env.ZENODO_RECORD_ID }} + - name: Install zenodo-get + if: steps.cache.outputs.cache-hit != 'true' + run: python3 -m pip install zenodo-get + shell: bash + - name: Download test dataset + shell: bash + if: steps.cache.outputs['cache-hit'] != 'true' + run: | + mkdir -p ${{ steps.set-path.outputs.data_dir }} + cd ${{ steps.set-path.outputs.data_dir }} + zenodo_get $ZENODO_RECORD_ID + cd - + - name: Uninstall zenodo-get + if: steps.cache.outputs['cache-hit'] != 'true' + run: python3 -m pip uninstall --yes zenodo-get + shell: bash - name: Run pytest run: pytest -v diff --git a/README.dev.md b/README.dev.md index 22ed56b01..51c5c0722 100644 --- a/README.dev.md +++ b/README.dev.md @@ -12,7 +12,7 @@ Please follow these steps: 1. (**important**) announce your plan to the rest of the community _before you start working_. This announcement should be in the form of a (new) issue; 1. (**important**) wait until some kind of consensus is reached about your idea being a good idea; -1. if needed, fork the repository to your own Github profile and create your own feature branch off of the latest master commit. While working on your feature branch, make sure to stay up to date with the master branch by pulling in changes, possibly from the 'upstream' repository (follow the instructions [here](https://help.github.com/articles/configuring-a-remote-for-a-fork/) and [here](https://help.github.com/articles/syncing-a-fork/)); +1. if needed, fork the repository to your own GitHub profile and create your own feature branch off of the latest master commit. While working on your feature branch, make sure to stay up to date with the master branch by pulling in changes, possibly from the 'upstream' repository (follow the instructions [here](https://help.github.com/articles/configuring-a-remote-for-a-fork/) and [here](https://help.github.com/articles/syncing-a-fork/)); 1. make sure the existing tests still work by running `pytest` (see also [here](#testing-locally)); 1. add your own tests (if necessary); 1. update or expand the documentation; @@ -31,7 +31,7 @@ readibility or simplicity is more important than absolute correctness. It is hard to define the precise balance we are looking for, so instead we will refer to the [Zen of python](https://peps.python.org/pep-0020/). -Note that all contrubtions to this project will be published under our [Apache 2.0 licence] +Note that all contributions to this project will be published under our [Apache 2.0 licence] (). #### Docstrings @@ -49,7 +49,7 @@ place to start. This extension is currently in preview, but seems to work more r #### Branch naming convention Please try to adhere to the following branch naming convention: -__. +`__`. E.g., `042_life_universe_everything_douglasadams`. This allows, at a single glance, to see in the issue that you're @@ -98,6 +98,38 @@ the CI. Make sure you have developer options installed as described in the [README](README.md) (otherwise run: `pip install -e .[dev]` on the repository folder in your environment) +##### Downloading test data +Some tests require access to test data. You can download the test data from Zenodo via the button below. Note that for +some reason downloading all files at ones results in a corrupted zip file. Please download the files one by one. + +[![](https://zenodo.org/badge/DOI/10.5281/zenodo.17423608.svg)](https://doi.org/10.5281/zenodo.17423608) + +Test data should reside in the `test_data/` folder in the root of the repository. + +Alternatively, use zenodo-get to download the data directly into the `test_data/` folder: + +Using `uv`: + +```shell +mkdir -p test_data +cd test_data +uv tool run zenodo_get 17423608 +cd - +``` + +Using `pip`: + +```shell +pip install zenodo-get +mkdir -p test_data +cd test_data +zenodo_get 17423608 +cd - +``` + + +##### Running tests + For testing all you need to do is run: ```shell @@ -120,7 +152,9 @@ coverage report We use [ruff](https://docs.astral.sh/ruff/) for linting, sorting imports and formatting of python (notebook) files. The configurations of `ruff` are set in [pyproject.toml](pyproject.toml) file. -If you are using VS code, please install and activate the [Ruff extension](https://marketplace.visualstudio.com/items?itemName=charliermarsh.ruff) to automatically format and check linting. +If you are using VS code, please install and activate the [Ruff +extension](https://marketplace.visualstudio.com/items?itemName=charliermarsh.ruff) to automatically format and check +linting. Make sure to use the ruff version installed in your environment. Otherwise, please ensure check both linting (`ruff fix .`) and formatting (`ruff format .`) before requesting a review. diff --git a/eitprocessing/datahandling/eitdata.py b/eitprocessing/datahandling/eitdata.py index 6b4930ab6..ea50d9bf0 100644 --- a/eitprocessing/datahandling/eitdata.py +++ b/eitprocessing/datahandling/eitdata.py @@ -60,6 +60,15 @@ def __post_init__(self, suppress_simulated_warning: bool) -> None: self.path = self.path[0] self.name = self.name or self.label + old_sample_frequency = self.sample_frequency + self.sample_frequency = float(self.sample_frequency) + if self.sample_frequency != old_sample_frequency: + msg = ( + "Sample frequency could not be correctly converted from " + f"{old_sample_frequency} ({type(old_sample_frequency)}) to " + f"{self.sample_frequency:.1f} (float)." + ) + raise TypeError(msg) if (lv := len(self.pixel_impedance)) != (lt := len(self.time)): msg = f"The number of time points ({lt}) does not match the number of pixel impedance values ({lv})." diff --git a/eitprocessing/datahandling/loading/draeger.py b/eitprocessing/datahandling/loading/draeger.py index 523cd3666..5fb048fd8 100644 --- a/eitprocessing/datahandling/loading/draeger.py +++ b/eitprocessing/datahandling/loading/draeger.py @@ -1,12 +1,15 @@ from __future__ import annotations +import math import mmap import sys import warnings from functools import partial from typing import TYPE_CHECKING, NamedTuple +from warnings import catch_warnings import numpy as np +import scipy as sp from eitprocessing.datahandling.continuousdata import ContinuousData from eitprocessing.datahandling.datacollection import DataCollection @@ -24,9 +27,10 @@ load_draeger_data = partial(load_eit_data, vendor=Vendor.DRAEGER) NAN_VALUE_INDICATOR = -1e30 +SAMPLE_FREQUENCY_ESTIMATION_PRECISION = 4 -def load_from_single_path( +def load_from_single_path( # noqa: PLR0915 path: Path, sample_frequency: float | None = None, first_frame: int = 0, @@ -59,7 +63,10 @@ def load_from_single_path( msg = f"Invalid input: `first_frame` ({f0}) is larger than the total number of frames in the file ({fn})." raise ValueError(msg) - n_frames = min(total_frames - first_frame, max_frames or sys.maxsize) + n_frames = min(total_frames - first_frame, max_frames if max_frames is not None else sys.maxsize) + if n_frames < 1: + msg = f"No frames to load with `{first_frame=}` and `{max_frames=}`." + raise ValueError(msg) if max_frames and max_frames != n_frames: msg = ( @@ -103,6 +110,10 @@ def load_from_single_path( # time wraps around the number of seconds in a day time = np.unwrap(time, period=24 * 60 * 60) + if not np.all(np.diff(time) > 0): + msg = "The time axis is not strictly monotonically increasing." + raise ValueError(msg) + sample_frequency = _estimate_sample_frequency(time, sample_frequency) eit_data = EITData( @@ -184,19 +195,47 @@ def load_from_single_path( def _estimate_sample_frequency(time: np.ndarray, sample_frequency: float | None) -> float: """Estimate the sample frequency from the time axis, and check with provided sample frequency.""" - estimated_sample_frequency = round((len(time) - 1) / (time[-1] - time[0]), 4) + with catch_warnings(): + warnings.filterwarnings("ignore", category=RuntimeWarning) + unrounded_estimated_sample_frequency = 1 / sp.stats.linregress(np.arange(len(time)), time).slope - if sample_frequency is None: - return estimated_sample_frequency + if np.isnan(unrounded_estimated_sample_frequency): + msg = ( + "Could not estimate sample frequency from time axis, " + f"which could be due to too few data points ({len(time)})." + ) + if sample_frequency is not None: + warnings.warn(msg, RuntimeWarning, stacklevel=2) + return float(sample_frequency) - if sample_frequency != estimated_sample_frequency: + raise ValueError(msg) + + # Rounds to the number of digits, rather than the number of decimals + estimated_sample_frequency = round( + unrounded_estimated_sample_frequency, + -math.ceil(np.log10(abs(unrounded_estimated_sample_frequency))) + SAMPLE_FREQUENCY_ESTIMATION_PRECISION, + ) + + if sample_frequency is None: + return float(estimated_sample_frequency) + if not isinstance(sample_frequency, (int, float)): + msg = f"Provided sample frequency has invalid type {type(sample_frequency)}; should be int or float." + raise TypeError(msg) + + if not np.isclose( + sample_frequency, unrounded_estimated_sample_frequency, rtol=10**-SAMPLE_FREQUENCY_ESTIMATION_PRECISION, atol=0 + ): msg = ( - f"Provided sample frequency ({sample_frequency}) does not match " - f"the estimated sample frequency ({estimated_sample_frequency})." + "Provided sample frequency " + f"({sample_frequency:.{SAMPLE_FREQUENCY_ESTIMATION_PRECISION + 2}f} Hz) " + "does not match the estimated sample frequency " + f"({unrounded_estimated_sample_frequency:.{SAMPLE_FREQUENCY_ESTIMATION_PRECISION + 2}f} Hz) " + f"within {SAMPLE_FREQUENCY_ESTIMATION_PRECISION} digits. " + "Note that the estimate might not be as accurate for very short signals." ) warnings.warn(msg, RuntimeWarning, stacklevel=2) - return sample_frequency + return float(sample_frequency) def _convert_medibus_data( @@ -249,7 +288,7 @@ def _read_frame( index is non-negative. When the index is negative, no data is saved. In any case, the event marker is returned. """ - frame_time = round(reader.float64() * 24 * 60 * 60, 3) + frame_time = reader.float64() * 24 * 60 * 60 _ = reader.float32() frame_pixel_impedance = reader.npfloat32(length=1024) frame_pixel_impedance = np.reshape(frame_pixel_impedance, (32, 32), "C") diff --git a/eitprocessing/datahandling/loading/sentec.py b/eitprocessing/datahandling/loading/sentec.py index 8a257916b..5a252eda2 100644 --- a/eitprocessing/datahandling/loading/sentec.py +++ b/eitprocessing/datahandling/loading/sentec.py @@ -5,7 +5,7 @@ import warnings from enum import IntEnum from functools import partial -from typing import TYPE_CHECKING, BinaryIO +from typing import TYPE_CHECKING import numpy as np @@ -27,92 +27,59 @@ load_sentec_data = partial(load_eit_data, vendor=Vendor.SENTEC) -def load_from_single_path( # noqa: C901, PLR0912 +def load_from_single_path( path: Path, - sample_frequency: float | None = 50.2, + sample_frequency: float | None = None, first_frame: int = 0, max_frames: int | None = None, ) -> dict[str, DataCollection]: """Load Sentec EIT data from path.""" + time: list[float] = [] + index = 0 with path.open("br") as fo, mmap.mmap(fo.fileno(), length=0, access=mmap.ACCESS_READ) as fh: file_length = os.fstat(fo.fileno()).st_size reader = BinReader(fh, endian="little") version = reader.uint8() - time: list[float] = [] max_n_images = int(file_length / 32 / 32 / 4) + if max_frames is not None: + max_n_images = min(max_n_images, max_frames) + image = np.full(shape=(max_n_images, 32, 32), fill_value=np.nan) - index = 0 - n_images_added = 0 # while there are still data to be read and the number of read data points is higher # than the maximum specified, keep reading - while fh.tell() < file_length and (max_frames is None or len(time) < max_frames): + while fh.tell() < file_length and (len(time) < max_n_images): _ = reader.uint64() # skip timestamp reading domain_id = reader.uint8() number_data_fields = reader.uint8() for _ in range(number_data_fields): - data_id = reader.uint8() - payload_size = reader.uint16() - - if payload_size == 0: - continue - - if domain_id == Domain.MEASUREMENT: - if data_id == MeasurementDataID.TIMESTAMP: - time_of_caption = reader.uint32() - time.append(time_of_caption) - - elif data_id == MeasurementDataID.ZERO_REF_IMAGE: - index += 1 - - ref = _read_frame( - fh, - version, - index, - payload_size, - reader, - first_frame, - ) - - if ref is not None: - image[n_images_added, :, :] = ref - n_images_added += 1 - else: - fh.seek(payload_size, os.SEEK_CUR) - - # read the sample frequency from the file, if present - # (domain 64 = configuration, data 5 = sample frequency) - elif domain_id == Domain.CONFIGURATION and data_id == ConfigurationDataID.SAMPLE_FREQUENCY: - sample_frequency = np.round(reader.float32(), 4) - msg = ( - "Sample frequency value found in file. " - f"The sample frequency value will be set to {sample_frequency:.2f}" - ) - warnings.warn(msg, RuntimeWarning, stacklevel=2) - - else: - fh.seek(payload_size, os.SEEK_CUR) - image = image[:n_images_added, :, :] - n_frames = len(image) if image is not None else 0 - - if (f0 := first_frame) > (fn := index): - msg = f"Invalid input: `first_frame` ({f0}) is larger than the total number of frames in the file ({fn})." + index, sample_frequency = _read_data_field( + reader, domain_id, index, first_frame, fh, time, version, image, sample_frequency + ) + + if first_frame >= index: + msg = f"`first_frame` ({first_frame}) is larger than or equal to the number of frames in the file ({index})." raise ValueError(msg) + image = image[first_frame:index, :, :] + n_frames = len(image) + if max_frames and n_frames != max_frames: msg = ( f"The number of frames requested ({max_frames}) is larger " f"than the available number ({n_frames}) of frames after " f"the first frame selected ({first_frame}, total frames: " - f"{index}).\n {n_frames} frames will be loaded." + f"{index}).\n {n_frames} frames were loaded." ) warnings.warn(msg, RuntimeWarning, stacklevel=2) - if not sample_frequency: + if sample_frequency is None: sample_frequency = SENTEC_SAMPLE_FREQUENCY + time_array = np.unwrap(np.array(time), period=np.iinfo(np.uint32).max) / 1_000_000 + eitdata_collection = DataCollection(EITData) eitdata_collection.add( EITData( @@ -120,7 +87,7 @@ def load_from_single_path( # noqa: C901, PLR0912 path=path, sample_frequency=sample_frequency, nframes=n_frames, - time=np.unwrap(np.array(time), period=np.iinfo(np.uint32).max) / 1000000, + time=time_array, label="raw", pixel_impedance=image, ), @@ -134,13 +101,62 @@ def load_from_single_path( # noqa: C901, PLR0912 } +def _read_data_field( + reader: BinReader, + domain_id: int, + index: int, + first_frame: int, + fh: mmap.mmap, + time: list[float], + version: int, + image: np.ndarray, + sample_frequency: float | None, +) -> tuple[int, float | None]: + """Reads the specified data field from file, and returns an updated index and sample frequency.""" + data_id = reader.uint8() + payload_size = reader.uint16() + + if payload_size == 0: + return index, sample_frequency + + match domain_id, data_id: + case Domain.MEASUREMENT, MeasurementDataID.TIMESTAMP: + if index < first_frame: + fh.seek(payload_size, os.SEEK_CUR) + else: + time.append(reader.uint32()) + + case Domain.MEASUREMENT, MeasurementDataID.ZERO_REF_IMAGE: + if index < first_frame: + fh.seek(payload_size, os.SEEK_CUR) + else: + ref = _read_frame(version=version, payload_size=payload_size, reader=reader) + image[index, :, :] = ref + + index += 1 + + case Domain.CONFIGURATION, ConfigurationDataID.SAMPLE_FREQUENCY: + loaded_sample_frequency = np.round(reader.float32(), 4) + if sample_frequency and not np.isclose(loaded_sample_frequency, sample_frequency): + msg = ( + f"Sample frequency provided ({sample_frequency:.2f} Hz) " + f"differs from value found in file " + f"({loaded_sample_frequency:.2f} Hz). " + f"The sample frequency value found in the file will be used." + ) + warnings.warn(msg, RuntimeWarning, stacklevel=2) + sample_frequency = loaded_sample_frequency + + case _, _: + fh.seek(payload_size, os.SEEK_CUR) + + return index, sample_frequency + + def _read_frame( - fh: BinaryIO | mmap.mmap, version: int, - index: int, payload_size: int, reader: BinReader, - first_frame: int = 0, ) -> NDArray | None: """Read a single frame in the file. @@ -157,10 +173,6 @@ def _read_frame( Returns: A 32 x 32 matrix, containing the pixels values. """ - if index < first_frame: - fh.seek(payload_size, os.SEEK_CUR) - return None - if version > 1: # read quality index. We don't use it, so we skip the bytes _ = reader.uint8() @@ -171,19 +183,16 @@ def _read_frame( image_width = reader.uint8() image_height = reader.uint8() - # the sign of the zero_ref values has to be inverted - zero_ref = -reader.npfloat32(n_pixels) - if image_width * image_height != n_pixels: msg = ( - f"The length of image array is " - f"{n_pixels} which is not equal to the " - f"product of the width ({image_width}) and " - f"height ({image_height}) of the frame." - f"Image will not be stored." + f"The length of image array is {n_pixels} which is not equal to " + f"the product of the width ({image_width}) and height " + f"({image_height}) of the frame." ) raise OSError(msg) + # the sign of the zero_ref values has to be inverted + zero_ref = -reader.npfloat32(n_pixels) return np.reshape(zero_ref, (image_width, image_height), order="C") diff --git a/eitprocessing/datahandling/sequence.py b/eitprocessing/datahandling/sequence.py index e52709721..f4a4b768f 100644 --- a/eitprocessing/datahandling/sequence.py +++ b/eitprocessing/datahandling/sequence.py @@ -80,27 +80,25 @@ def __len__(self): return len(self.time) def __add__(self, other: Sequence) -> Sequence: - return self.concatenate(self, other) + return self.concatenate(other) - @classmethod # TODO: why is this a class method? In other cases it's instance method def concatenate( - cls, - a: Sequence, - b: Sequence, + self: Sequence, + other: Sequence, newlabel: str | None = None, ) -> Sequence: """Create a merge of two Sequence objects.""" # TODO: rewrite - concat_eit = a.eit_data.concatenate(b.eit_data) - concat_continuous = a.continuous_data.concatenate(b.continuous_data) - concat_sparse = a.sparse_data.concatenate(b.sparse_data) - concat_interval = a.interval_data.concatenate(b.interval_data) + concat_eit = self.eit_data.concatenate(other.eit_data) + concat_continuous = self.continuous_data.concatenate(other.continuous_data) + concat_sparse = self.sparse_data.concatenate(other.sparse_data) + concat_interval = self.interval_data.concatenate(other.interval_data) - newlabel = newlabel or a.label + newlabel = newlabel or self.label # TODO: add concatenation of other attached objects - return a.__class__( + return self.__class__( eit_data=concat_eit, continuous_data=concat_continuous, sparse_data=concat_sparse, diff --git a/tests/conftest.py b/tests/conftest.py index be4181dc4..ef4660e9a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,24 +1,15 @@ -import os from pathlib import Path import pytest -from eitprocessing.datahandling.loading import load_eit_data from eitprocessing.datahandling.sequence import Sequence -# ruff: noqa: ERA001 #TODO: remove this line +environment = Path(__file__).resolve().parent.parent +data_directory = Path(environment) / "test_data" # overwrite for new style tests +pytest_plugins = [ + "tests.fixtures.eitdata", # load fixtures from different modules as 'plugins' as workaround +] -environment = os.environ.get( - "EIT_PROCESSING_TEST_DATA", - Path.resolve(Path(__file__).parent.parent), -) -data_directory = Path(environment) / "tests" / "test_data" -draeger_file1 = data_directory / "Draeger_Test3.bin" -draeger_file2 = data_directory / "Draeger_Test.bin" -draeger_file3 = data_directory / "Draeger_Test_event_on_first_frame.bin" -draeger_wrapped_time_axis_file = data_directory / "Draeger_wrapped_time_axis.bin" -draeger_file_pp = data_directory / "Draeger_PP_data.bin" -timpel_file = data_directory / "Timpel_test.txt" dummy_file = data_directory / "not_a_file.dummy" @@ -39,45 +30,14 @@ def pytest_collection_modifyitems(config: pytest.Config, items: list[pytest.Item item.add_marker(skip_slow) -@pytest.fixture(scope="session") -def draeger1() -> Sequence: - return load_eit_data(draeger_file1, vendor="draeger", sample_frequency=20, label="draeger1") - - -@pytest.fixture(scope="session") -def draeger2() -> Sequence: - return load_eit_data(draeger_file2, vendor="draeger", sample_frequency=20, label="draeger2") - - -@pytest.fixture(scope="session") -def draeger_both() -> Sequence: - return load_eit_data([draeger_file2, draeger_file1], vendor="draeger", sample_frequency=20, label="draeger_both") - - -@pytest.fixture(scope="session") -def draeger_pp() -> Sequence: - return load_eit_data(draeger_file_pp, vendor="draeger", sample_frequency=50, label="draeger2") - - -@pytest.fixture(scope="session") -def timpel1() -> Sequence: - return load_eit_data(timpel_file, vendor="timpel", label="timpel") - - -@pytest.fixture(scope="session") -def draeger_wrapped_time_axis() -> Sequence: - return load_eit_data( - draeger_wrapped_time_axis_file, vendor="draeger", sample_frequency=20, label="draeger_wrapped_time_axis" - ) - - -# @pytest.fixture(scope="session") -# def timpel_double(): -# return load_eit_data([timpel_file, timpel_file], vendor="timpel", label="timpel_double") - - # TODO: Replace request.getfixturevalue() with sequence where possible in other tests @pytest.fixture def sequence(request: pytest.FixtureRequest) -> Sequence: """Return a Sequence fixture.""" return request.getfixturevalue(request.param) + + +@pytest.fixture +def sequence_path(request: pytest.FixtureRequest) -> Path: + """Return a Sequence fixture.""" + return request.getfixturevalue(request.param) diff --git a/tests/eitdata/__init__.py b/tests/eitdata/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/eitdata/test_loading_draeger.py b/tests/eitdata/test_loading_draeger.py new file mode 100644 index 000000000..1bea8d9e7 --- /dev/null +++ b/tests/eitdata/test_loading_draeger.py @@ -0,0 +1,198 @@ +import sys +import tempfile +from pathlib import Path + +import numpy as np +import pytest + +from eitprocessing.datahandling.eitdata import EITData +from eitprocessing.datahandling.loading import load_eit_data +from eitprocessing.datahandling.loading.draeger import _bin_file_formats +from eitprocessing.datahandling.sequence import Sequence + + +# TODO: create/find data with 6 continuous data channels +@pytest.mark.parametrize( + ("sequence", "sequence_path", "length", "n_continuous_channels", "sample_frequency"), + [ + ("draeger_20hz_healthy_volunteer", "draeger_20hz_healthy_volunteer_path", 6920, 10, 20), + ("draeger_20hz_healthy_volunteer_fixed_rr", "draeger_20hz_healthy_volunteer_fixed_rr_path", 7340, 10, 20), + ( + "draeger_20hz_healthy_volunteer_pressure_pod", + "draeger_20hz_healthy_volunteer_pressure_pod_path", + 1320, + 10, + 20, + ), + ( + "draeger_50hz_healthy_volunteer_pressure_pod", + "draeger_50hz_healthy_volunteer_pressure_pod_path", + 3700, + 10, + 50, + ), + ( + "draeger_20hz_healthy_volunteer_time_wrap_v120", + "draeger_20hz_healthy_volunteer_time_wrap_v120_path", + 2460, + 6, + 20, + ), + ( + "draeger_20hz_healthy_volunteer_time_wrap_v130", + "draeger_20hz_healthy_volunteer_time_wrap_v130_path", + 2460, + 10, + 20, + ), + ], + indirect=["sequence", "sequence_path"], +) +def test_load_draeger( + sequence: Sequence, + sequence_path: Path, + length: int, + n_continuous_channels: int, + sample_frequency: float, +): + assert isinstance(sequence, Sequence), "Loaded object should be a Sequence" + assert isinstance(sequence.eit_data["raw"], EITData), "Sequence should contain EITData with 'raw' key" + assert sequence.eit_data["raw"].path == sequence_path + assert sequence.eit_data["raw"].sample_frequency == sample_frequency, ( + f"Sample frequency should be {sample_frequency:.1f} Hz" + ) + assert len(sequence.eit_data["raw"]) == len(sequence.eit_data["raw"].time), ( + "Length of EITData should match length of time axis" + ) + assert len(sequence.eit_data["raw"].time) == length, f"{sequence.label} should contain 14140 frames" + + assert len(sequence.continuous_data) == n_continuous_channels + 1, ( + f"Draeger data should have {n_continuous_channels} continuous medibus fields + the calculated global impedance" + ) + + assert sequence == load_eit_data( + sequence_path, vendor="draeger", sample_frequency=sample_frequency, label=sequence.label + ), "Loading with same parameters should yield same data" + assert sequence == load_eit_data( + sequence_path, vendor="draeger", sample_frequency=sample_frequency, label="something_else" + ), "Loading with different label should yield same data" + assert sequence == load_eit_data(sequence_path, vendor="draeger"), ( + "Loading without sample frequency should yield the same data" + ) + + +def test_draeger_20hz_healthy_volunteer_2_differ( + draeger_20hz_healthy_volunteer: Sequence, draeger_20hz_healthy_volunteer_fixed_rr: Sequence +): + assert draeger_20hz_healthy_volunteer != draeger_20hz_healthy_volunteer_fixed_rr, ( + "Different files should yield different data" + ) + + +def test_draeger_20hz_healthy_volunteer_and_fixed_rr( + draeger_20hz_healthy_volunteer: Sequence, + draeger_20hz_healthy_volunteer_fixed_rr: Sequence, + draeger_20hz_healthy_volunteer_and_fixed_rr: Sequence, +): + # Load multiple + assert len(draeger_20hz_healthy_volunteer_and_fixed_rr) == len(draeger_20hz_healthy_volunteer) + len( + draeger_20hz_healthy_volunteer_fixed_rr + ), "Combined data length should equal sum of individual lengths" + + +def test_draeger_sample_frequency_mismatch_warning(draeger_20hz_healthy_volunteer_path: Path): + with pytest.warns(RuntimeWarning, match="Provided sample frequency"): + _ = load_eit_data(draeger_20hz_healthy_volunteer_path, vendor="draeger", sample_frequency=25) + + +def test_estimate_sample_frequency_few_points(draeger_20hz_healthy_volunteer_path: Path): + with pytest.raises(ValueError, match="Could not estimate sample frequency from time axis"): + _ = load_eit_data(draeger_20hz_healthy_volunteer_path, vendor="draeger", max_frames=1) + + with pytest.warns(RuntimeWarning, match="Could not estimate sample frequency from time axis"): + _ = load_eit_data(draeger_20hz_healthy_volunteer_path, vendor="draeger", max_frames=1, sample_frequency=20) + + without_sf = load_eit_data(draeger_20hz_healthy_volunteer_path, vendor="draeger", max_frames=2) + with_sf = load_eit_data(draeger_20hz_healthy_volunteer_path, vendor="draeger", sample_frequency=20, max_frames=2) + assert without_sf == with_sf, "Loading without provided sample frequency should work with few data points" + + +def test_skipping_frames(draeger_20hz_healthy_volunteer: Sequence): + n_frames = len(draeger_20hz_healthy_volunteer) + + assert draeger_20hz_healthy_volunteer == load_eit_data( + draeger_20hz_healthy_volunteer.eit_data["raw"].path, vendor="draeger", first_frame=0 + ) + assert draeger_20hz_healthy_volunteer == load_eit_data( + draeger_20hz_healthy_volunteer.eit_data["raw"].path, vendor="draeger", max_frames=n_frames + ) + + short_sequence_1 = load_eit_data( + draeger_20hz_healthy_volunteer.eit_data["raw"].path, vendor="draeger", first_frame=n_frames - 2 + ) + assert len(short_sequence_1) == 2, "Loading from near end should yield 2 frames" + + short_sequence_2 = load_eit_data( + draeger_20hz_healthy_volunteer.eit_data["raw"].path, vendor="draeger", max_frames=2 + ) + assert len(short_sequence_2) == 2, "Loading with max_frames=2 should yield 2 frames" + + with pytest.warns( + RuntimeWarning, + match=r"The number of frames requested \(\d+\) is larger than the available number \(\d+\) of frames.", + ): + _ = load_eit_data( + draeger_20hz_healthy_volunteer.eit_data["raw"].path, vendor="draeger", max_frames=n_frames + 1 + ) + + with pytest.raises(ValueError, match="No frames to load with"): + _ = load_eit_data(draeger_20hz_healthy_volunteer.eit_data["raw"].path, vendor="draeger", max_frames=0) + + +def test_events(draeger_20hz_healthy_volunteer: Sequence): + events = draeger_20hz_healthy_volunteer.sparse_data["events_(draeger)"] + assert len(events) == 1, "There should be 1 event in the draeger_20hz_healthy_volunteer data" + assert events.values[0].text == "bed inflating", "Event text should match expected value" + + +def test_event_on_first_frame(draeger_20hz_healthy_volunteer: Sequence): + """Tests loading a sequence where there is an event on the first frame. + + There are two ways this can occur. The first is when the frame occurs later in the file, but we skip all frames + before the first frame with an event. This is tested by loading with first_frame set to the index of the first event + (`seq_first_frame_is_event_index`). The second way this can happen is if the data file has an event on the first + frame. We test this by creating a temporary copy of the data file, where the initial frames before the event file + are removed. + """ + event_index = np.searchsorted( + draeger_20hz_healthy_volunteer.time, + draeger_20hz_healthy_volunteer.sparse_data["events_(draeger)"].time[0], + ) + + seq_first_frame_is_event_index = load_eit_data( + draeger_20hz_healthy_volunteer.eit_data["raw"].path, vendor="draeger", first_frame=event_index + ) + assert ( + np.searchsorted( + seq_first_frame_is_event_index.time, + seq_first_frame_is_event_index.data["events_(draeger)"].time[0], + ) + == 0 + ), "The event should be on the first frame when loading from its timepoint." + + frame_size = _bin_file_formats["pressure_pod"]["frame_size"] + ignore_bytes = event_index * frame_size # number of bytes to ignore at start of file + + kwargs = {"delete_on_close": False} if sys.version_info >= (3, 12) else {} + with tempfile.NamedTemporaryFile(**kwargs) as temporary_file: + # Create a temporary file, that is removed after the context manager is closed + tempfile_path = Path(temporary_file.name) + with draeger_20hz_healthy_volunteer.eit_data["raw"].path.open("rb") as original_file: + original_file.seek(ignore_bytes) # skip frames before the event + temporary_file.write(original_file.read()) # write remaining data to temp file + + seq_trimmed_file = load_eit_data(tempfile_path, vendor="draeger") + + assert seq_first_frame_is_event_index == seq_trimmed_file, ( + "Loading from temp file should match loading from original file skipping frames." + ) diff --git a/tests/eitdata/test_loading_illegal.py b/tests/eitdata/test_loading_illegal.py new file mode 100644 index 000000000..36be446db --- /dev/null +++ b/tests/eitdata/test_loading_illegal.py @@ -0,0 +1,21 @@ +from pathlib import Path + +import pytest + +from eitprocessing.datahandling.loading import load_eit_data + + +def test_loading_illegal_path(): + for vendor in ["draeger", "timpel"]: + with pytest.raises(FileNotFoundError): + _ = load_eit_data("non-existing-path", vendor=vendor, sample_frequency=20) + + +# TODO: add timpel/sentec data +def test_loading_illegal_vendor(draeger_20hz_healthy_volunteer_path: Path): + with pytest.raises(OSError): + # wrong vendor for the file + _ = load_eit_data(draeger_20hz_healthy_volunteer_path, vendor="timpel") + + with pytest.raises(NotImplementedError): + _ = load_eit_data(draeger_20hz_healthy_volunteer_path, vendor="non-existing vendor") diff --git a/tests/eitdata/test_loading_partial.py b/tests/eitdata/test_loading_partial.py new file mode 100644 index 000000000..3dbbe71e9 --- /dev/null +++ b/tests/eitdata/test_loading_partial.py @@ -0,0 +1,50 @@ +from pathlib import Path + +import pytest + +from eitprocessing.datahandling.loading import load_eit_data +from eitprocessing.datahandling.sequence import Sequence + + +# TODO: add other vendors +# TODO: add dataset with events, and test loading from the frame at or just after the event +@pytest.mark.parametrize( + ("sequence_path", "sequence", "split_frame", "vendor", "sample_frequency"), + [ + ("draeger_20hz_healthy_volunteer_path", "draeger_20hz_healthy_volunteer", 100, "draeger", 20), + ("timpel_healthy_volunteer_1_path", "timpel_healthy_volunteer_1", 100, "timpel", None), + ], + indirect=["sequence_path", "sequence"], +) +def test_load_partial( + sequence_path: Path, + sequence: Sequence, + split_frame: int, + vendor: str, + sample_frequency: float, +): + sequence_part_1 = load_eit_data( + sequence_path, vendor=vendor, sample_frequency=sample_frequency, max_frames=split_frame, label="part 1" + ) + + sequence_part_2 = load_eit_data( + sequence_path, vendor=vendor, sample_frequency=sample_frequency, first_frame=split_frame, label="part 2" + ) + + assert len(sequence_part_1) == split_frame, "The first sequence should contain the specified number of frames" + assert len(sequence_part_2) == len(sequence) - split_frame, ( + "The second sequence should contains the remaining frames" + ) + assert len(sequence_part_1) + len(sequence_part_2) == len(sequence), ( + "The combined length should match the total length" + ) + + assert sequence_part_1 == sequence[:split_frame], "The first part should match the beginning of the full data" + + assert sequence_part_1.concatenate(sequence_part_2) == sequence, ( + "Concatenating both parts should reconstruct the full data" + ) + + # TODO: enable after fixing select_by_time issues + pytest.skip("Currently fails due to select_by_time issues") + assert sequence_part_2 == sequence[split_frame:], "The second part should match the end of the full data" diff --git a/tests/eitdata/test_loading_sentec.py b/tests/eitdata/test_loading_sentec.py new file mode 100644 index 000000000..73f8bb244 --- /dev/null +++ b/tests/eitdata/test_loading_sentec.py @@ -0,0 +1,151 @@ +from pathlib import Path + +import numpy as np +import pytest + +from eitprocessing.datahandling.eitdata import EITData +from eitprocessing.datahandling.loading import load_eit_data +from eitprocessing.datahandling.sequence import Sequence + + +@pytest.mark.parametrize( + "sequence", + [ + "sentec_healthy_volunteer_1a", + "sentec_healthy_volunteer_1b", + "sentec_healthy_volunteer_2a", + "sentec_healthy_volunteer_2b", + ], + indirect=["sequence"], +) +def test_load_sentec_single_file( + sequence: Sequence, +): + assert isinstance(sequence, Sequence) + assert "raw" in sequence.eit_data + + eit_data = sequence.eit_data["raw"] + + assert isinstance(eit_data, EITData) + assert np.isclose(eit_data.sample_frequency, 50.2, rtol=2e-2), "Sample frequency should be approximately 50.2 Hz" + assert len(eit_data.time) > 0, "Time axis should not be empty" + assert len(eit_data.time) == len(eit_data.pixel_impedance), "Length of time axis should match number of frames" + assert len(eit_data) == len(eit_data.pixel_impedance), "Length of EITData should match number of frames" + + assert len(sequence.continuous_data) == 0, "Sentec data should not have continuous data channels" + assert len(sequence.sparse_data) == 0, "Sentec data should not have sparse data channels" + assert len(sequence.interval_data) == 0, "Sentec data should not have interval data channels" + + assert sequence == load_eit_data(sequence.eit_data["raw"].path, vendor="sentec", label=sequence.label), ( + "Loading with same parameters should yield same data" + ) + + sequence_loaded_w_sample_freq = load_eit_data( + sequence.eit_data["raw"].path, vendor="sentec", sample_frequency=eit_data.sample_frequency, label=sequence.label + ) + assert ( + sequence.eit_data["raw"].sample_frequency == sequence_loaded_w_sample_freq.eit_data["raw"].sample_frequency + ), "When specifying the sample frequency, the final sample frequencies should match" + assert sequence == sequence_loaded_w_sample_freq, "Loading providing the sample frequency should yield same data" + + assert sequence == load_eit_data( + sequence.eit_data["raw"].path, + vendor="sentec", + sample_frequency=eit_data.sample_frequency, + label="something else", + ), "Loading with a different label should yield same data" + + +@pytest.mark.parametrize( + ("sequence_a_fixture_name", "sequence_b_fixture_name", "sequence_merge_fixture_name"), + [ + ( + "sentec_healthy_volunteer_1a", + "sentec_healthy_volunteer_1b", + "sentec_healthy_volunteer_1", + ), + ( + "sentec_healthy_volunteer_2a", + "sentec_healthy_volunteer_2b", + "sentec_healthy_volunteer_2", + ), + ], +) +def test_load_sentec_multiple_files( + sequence_a_fixture_name: str, + sequence_b_fixture_name: str, + sequence_merge_fixture_name: str, + request: pytest.FixtureRequest, +): + sequence_a = request.getfixturevalue(sequence_a_fixture_name) + sequence_b = request.getfixturevalue(sequence_b_fixture_name) + sequence_merged = request.getfixturevalue(sequence_merge_fixture_name) + + assert len(sequence_merged) == len(sequence_a) + len(sequence_b), ( + "Combined length of individual sequences should match merged sequence" + ) + assert sequence_merged == Sequence.concatenate(sequence_a, sequence_b), ( + "Merging individual sequences should equal pre-loaded merged sequence" + ) + + assert sequence_merged[: len(sequence_a)] == sequence_a, ( + "First part of merged sequence should match first individual sequence" + ) + assert sequence_merged[len(sequence_a) :] == sequence_b, ( + "Second part of merged sequence should match second individual sequence" + ) + + +def test_load_sentec_skip_frames(sentec_healthy_volunteer_1a: Sequence, sentec_healthy_volunteer_1a_path: Path): + n_frames = len(sentec_healthy_volunteer_1a) + + assert sentec_healthy_volunteer_1a == load_eit_data( + sentec_healthy_volunteer_1a_path, vendor="sentec", first_frame=0 + ) + assert sentec_healthy_volunteer_1a == load_eit_data( + sentec_healthy_volunteer_1a_path, vendor="sentec", max_frames=n_frames + ) + assert sentec_healthy_volunteer_1a == load_eit_data( + sentec_healthy_volunteer_1a_path, vendor="sentec", first_frame=0, max_frames=n_frames + ) + + first_frame = 100 + with pytest.warns(RuntimeWarning, match=r"The number of frames requested \(\d+\) is larger than"): + sequence_skip_first = load_eit_data( + sentec_healthy_volunteer_1a_path, + vendor="sentec", + first_frame=first_frame, + max_frames=n_frames, + ) + assert len(sequence_skip_first) == len(sentec_healthy_volunteer_1a) - first_frame, ( + "Loading from a later first_frame should yield fewer frames" + ) + assert sequence_skip_first == sentec_healthy_volunteer_1a[first_frame:], ( + "Loaded sequence skipping first frames should match slicing" + ) + + max_frames = n_frames - 100 + sequence_limited_frames = load_eit_data( + sentec_healthy_volunteer_1a_path, + vendor="sentec", + first_frame=0, + max_frames=max_frames, + ) + assert len(sequence_limited_frames) == max_frames, "Loading with max_frames should yield specified number of frames" + assert sequence_limited_frames == sentec_healthy_volunteer_1a[:max_frames], ( + "Loaded sequence with limited frames should match slicing" + ) + + sequence_single_frame = load_eit_data( + sentec_healthy_volunteer_1a_path, + vendor="sentec", + first_frame=n_frames - 1, + ) + assert len(sequence_single_frame) == 1 + + with pytest.raises(ValueError, match=r"`first_frame` \(\d+\) is larger than or equal to"): + _ = load_eit_data( + sentec_healthy_volunteer_1a_path, + vendor="sentec", + first_frame=n_frames, + ) diff --git a/tests/eitdata/test_loading_time_axis.py b/tests/eitdata/test_loading_time_axis.py new file mode 100644 index 000000000..8b2d49c5e --- /dev/null +++ b/tests/eitdata/test_loading_time_axis.py @@ -0,0 +1,13 @@ +import numpy as np +import pytest + + +# TODO: add other vendors +@pytest.mark.parametrize( + "sequence", + ["draeger_20hz_healthy_volunteer_fixed_rr", "draeger_50hz_healthy_volunteer_pressure_pod"], + indirect=True, +) +def test_time_axis(sequence: str): + time_diff = np.diff(sequence.time) + assert np.allclose(time_diff, time_diff.mean()) diff --git a/tests/eitdata/test_loading_timpel.py b/tests/eitdata/test_loading_timpel.py new file mode 100644 index 000000000..08da4d1bb --- /dev/null +++ b/tests/eitdata/test_loading_timpel.py @@ -0,0 +1,33 @@ +import pytest + +from eitprocessing.datahandling.eitdata import EITData, Vendor +from eitprocessing.datahandling.loading import load_eit_data +from eitprocessing.datahandling.sequence import Sequence + + +@pytest.mark.parametrize("sequence", ["timpel_healthy_volunteer_1", "timpel_healthy_volunteer_2"], indirect=True) +def test_loading_timpel( + sequence: Sequence, +): + assert isinstance(sequence, Sequence) + assert isinstance(sequence.eit_data["raw"], EITData) + assert sequence.eit_data["raw"].vendor == Vendor.TIMPEL + + for key in ["global_impedance_(raw)", "airway_pressure_(timpel)", "flow_(timpel)", "volume_(timpel)"]: + assert key in sequence.continuous_data, f"Missing continuous data key: {key}" + + for key in ["breaths_(timpel)"]: + assert key in sequence.interval_data, f"Missing interval data key: {key}" + + for key in ["minvalues_(timpel)", "maxvalues_(timpel)", "qrscomplexes_(timpel)"]: + assert key in sequence.sparse_data, f"Missing sparse data key: {key}" + + loaded_using_enum_vendor = load_eit_data(sequence.eit_data["raw"].path, vendor=Vendor.TIMPEL, label="timpel") + assert sequence == loaded_using_enum_vendor + + +def test_loading_timpel_multiple_files(): + # TODO: find out whether it is possible to have a single measurements split into multiple files + pytest.skip( + "Loading multiple Timpel files cannot be tested with unrelated data files, because the time axes will overlap." + ) diff --git a/tests/fixtures/__init__.py b/tests/fixtures/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/fixtures/eitdata.py b/tests/fixtures/eitdata.py new file mode 100644 index 000000000..f410f1246 --- /dev/null +++ b/tests/fixtures/eitdata.py @@ -0,0 +1,213 @@ +from pathlib import Path + +import pytest + +from eitprocessing.datahandling.loading import load_eit_data +from eitprocessing.datahandling.sequence import Sequence +from tests.conftest import data_directory + + +@pytest.fixture +def draeger_20hz_healthy_volunteer_path() -> Path: + return data_directory / "draeger_20Hz_healthy_volunteer.bin" + + +@pytest.fixture +def draeger_20hz_healthy_volunteer_fixed_rr_path() -> Path: + return data_directory / "draeger_20Hz_healthy_volunteer_fixed_rr.bin" + + +@pytest.fixture +def draeger_20hz_healthy_volunteer_pressure_pod_path() -> Path: + return data_directory / "draeger_20Hz_healthy_volunteer_pressure_pod.bin" + + +@pytest.fixture +def draeger_50hz_healthy_volunteer_pressure_pod_path() -> Path: + return data_directory / "draeger_50Hz_healthy_volunteer_pressure_pod.bin" + + +@pytest.fixture +def draeger_20hz_healthy_volunteer_time_wrap_v120_path() -> Path: + return data_directory / "draeger_20Hz_healthy_volunteer_time_wrap_v120.bin" + + +@pytest.fixture +def draeger_20hz_healthy_volunteer_time_wrap_v130_path() -> Path: + return data_directory / "draeger_20Hz_healthy_volunteer_time_wrap_v130.bin" + + +@pytest.fixture +def sentec_healthy_volunteer_1a_path() -> Path: + return data_directory / "sentec_healthy_volunteer_1a.zri" + + +@pytest.fixture +def sentec_healthy_volunteer_1b_path() -> Path: + return data_directory / "sentec_healthy_volunteer_1b.zri" + + +@pytest.fixture +def sentec_healthy_volunteer_2a_path() -> Path: + return data_directory / "sentec_healthy_volunteer_2a.zri" + + +@pytest.fixture +def sentec_healthy_volunteer_2b_path() -> Path: + return data_directory / "sentec_healthy_volunteer_2b.zri" + + +@pytest.fixture +def timpel_healthy_volunteer_1_path() -> Path: + return data_directory / "timpel_healthy_volunteer_1.txt" + + +@pytest.fixture +def timpel_healthy_volunteer_2_path() -> Path: + return data_directory / "timpel_healthy_volunteer_2.txt" + + +@pytest.fixture +def draeger_20hz_healthy_volunteer(draeger_20hz_healthy_volunteer_path: Path) -> Sequence: + return load_eit_data( + draeger_20hz_healthy_volunteer_path, + vendor="draeger", + sample_frequency=20, + label="draeger_20hz_healthy_volunteer", + ) + + +@pytest.fixture +def draeger_20hz_healthy_volunteer_fixed_rr(draeger_20hz_healthy_volunteer_fixed_rr_path: Path) -> Sequence: + return load_eit_data( + draeger_20hz_healthy_volunteer_fixed_rr_path, + vendor="draeger", + sample_frequency=20, + label="draeger_20hz_healthy_volunteer_fixed_rr", + ) + + +@pytest.fixture +def draeger_20hz_healthy_volunteer_pressure_pod(draeger_20hz_healthy_volunteer_pressure_pod_path: Path) -> Sequence: + return load_eit_data( + draeger_20hz_healthy_volunteer_pressure_pod_path, + vendor="draeger", + sample_frequency=20, + label="draeger_20hz_healthy_volunteer_pressure_pod", + ) + + +@pytest.fixture +def draeger_50hz_healthy_volunteer_pressure_pod(draeger_50hz_healthy_volunteer_pressure_pod_path: Path) -> Sequence: + return load_eit_data( + draeger_50hz_healthy_volunteer_pressure_pod_path, + vendor="draeger", + sample_frequency=50, + label="draeger_50hz_healthy_volunteer_pressure_pod", + ) + + +@pytest.fixture +def draeger_20hz_healthy_volunteer_and_fixed_rr( + draeger_20hz_healthy_volunteer_path: Path, draeger_20hz_healthy_volunteer_fixed_rr_path: Path +) -> Sequence: + return load_eit_data( + [draeger_20hz_healthy_volunteer_path, draeger_20hz_healthy_volunteer_fixed_rr_path], + vendor="draeger", + sample_frequency=20, + label="draeger_20hz_healthy_volunteer_and_fixed_rr", + ) + + +@pytest.fixture +def draeger_20hz_healthy_volunteer_time_wrap_v120(draeger_20hz_healthy_volunteer_time_wrap_v120_path: Path) -> Sequence: + return load_eit_data( + draeger_20hz_healthy_volunteer_time_wrap_v120_path, + vendor="draeger", + label="draeger_20hz_healthy_volunteer_time_wrap_v120", + ) + + +@pytest.fixture +def draeger_20hz_healthy_volunteer_time_wrap_v130(draeger_20hz_healthy_volunteer_time_wrap_v130_path: Path) -> Sequence: + return load_eit_data( + draeger_20hz_healthy_volunteer_time_wrap_v130_path, + vendor="draeger", + label="draeger_20hz_healthy_volunteer_time_wrap_v130", + ) + + +@pytest.fixture +def sentec_healthy_volunteer_1a(sentec_healthy_volunteer_1a_path: Path) -> Sequence: + return load_eit_data( + sentec_healthy_volunteer_1a_path, + vendor="sentec", + label="sentec_healthy_volunteer_1a", + ) + + +@pytest.fixture +def sentec_healthy_volunteer_1b(sentec_healthy_volunteer_1b_path: Path) -> Sequence: + return load_eit_data( + sentec_healthy_volunteer_1b_path, + vendor="sentec", + label="sentec_healthy_volunteer_1b", + ) + + +@pytest.fixture +def sentec_healthy_volunteer_1( + sentec_healthy_volunteer_1a_path: Path, sentec_healthy_volunteer_1b_path: Path +) -> Sequence: + return load_eit_data( + [sentec_healthy_volunteer_1a_path, sentec_healthy_volunteer_1b_path], + vendor="sentec", + label="sentec_healthy_volunteer_1", + ) + + +@pytest.fixture +def sentec_healthy_volunteer_2a(sentec_healthy_volunteer_2a_path: Path) -> Sequence: + return load_eit_data( + sentec_healthy_volunteer_2a_path, + vendor="sentec", + label="sentec_healthy_volunteer_2a", + ) + + +@pytest.fixture +def sentec_healthy_volunteer_2b(sentec_healthy_volunteer_2b_path: Path) -> Sequence: + return load_eit_data( + sentec_healthy_volunteer_2b_path, + vendor="sentec", + label="sentec_healthy_volunteer_2b", + ) + + +@pytest.fixture +def sentec_healthy_volunteer_2( + sentec_healthy_volunteer_2a_path: Path, sentec_healthy_volunteer_2b_path: Path +) -> Sequence: + return load_eit_data( + [sentec_healthy_volunteer_2a_path, sentec_healthy_volunteer_2b_path], + vendor="sentec", + label="sentec_healthy_volunteer_2", + ) + + +@pytest.fixture +def timpel_healthy_volunteer_1(timpel_healthy_volunteer_1_path: Path) -> Sequence: + return load_eit_data( + timpel_healthy_volunteer_1_path, + vendor="timpel", + label="timpel_healthy_volunteer_1", + ) + + +@pytest.fixture +def timpel_healthy_volunteer_2(timpel_healthy_volunteer_2_path: Path) -> Sequence: + return load_eit_data( + timpel_healthy_volunteer_2_path, + vendor="timpel", + label="timpel_healthy_volunteer_2", + ) diff --git a/tests/mixins/test_eq.py b/tests/mixins/test_eq.py index f4300b635..56c499075 100644 --- a/tests/mixins/test_eq.py +++ b/tests/mixins/test_eq.py @@ -2,54 +2,18 @@ import pytest # TODO: noqa: F401 (needed for fixtures) once the pytest.skip is removed -from eitprocessing.datahandling.loading import load_eit_data from eitprocessing.datahandling.sequence import Sequence -from tests.conftest import draeger_file1 -def test_eq(): - data = load_eit_data(draeger_file1, vendor="draeger", sample_frequency=20) - data2 = load_eit_data(draeger_file1, vendor="draeger", sample_frequency=20) +# TODO: add other vendors' dataset +@pytest.mark.parametrize( + "sequence", + ["draeger_20hz_healthy_volunteer", "draeger_20hz_healthy_volunteer_pressure_pod"], + indirect=True, +) +def test_copy(sequence: Sequence): + sequence_copy = deepcopy(sequence) + assert sequence == sequence_copy - data.isequivalent(data2) - -def test_copy( - draeger1: Sequence, - timpel1: Sequence, -): - data: Sequence - for data in [draeger1, timpel1]: - data_copy = deepcopy(data) - assert data == data_copy - - -def test_equals( - draeger1: Sequence, - timpel1: Sequence, -): - pytest.skip("Will add tests to check that correct attributes do and don't lead to failed equality.") - # Here we should add tests ensuring that changes that shouldn't lead to failed equality indeed don't - # and vice versa. - # The current tests are outdated versions of this - data: Sequence - for data in [draeger1, timpel1]: - data_copy = Sequence() - data_copy.path = deepcopy(data.path) - data_copy.time = deepcopy(data.time) - data_copy.nframes = deepcopy(data.nframes) - data_copy.sample_frequency = deepcopy(data.sample_frequency) - data_copy.framesets = deepcopy(data.framesets) - data_copy.timing_errors = deepcopy(data.timing_errors) - data_copy.vendor = deepcopy(data.vendor) - - assert data_copy == data - - # test wheter a difference in framesets fails equality test - data_copy.framesets["test"] = data_copy.framesets["raw"].deepcopy() - assert data != data_copy - data_copy.framesets = deepcopy(data.framesets) - - data_copy.framesets["raw"].name += "_" - assert data != data_copy - data_copy.framesets = deepcopy(data.framesets) +# TODO: add tests for specific items in sequences diff --git a/tests/mixins/test_slicing.py b/tests/mixins/test_slicing.py index daacf3034..e32e0a3dd 100644 --- a/tests/mixins/test_slicing.py +++ b/tests/mixins/test_slicing.py @@ -1,84 +1,82 @@ -from copy import deepcopy - import pytest -from eitprocessing.datahandling.eitdata import Vendor +from eitprocessing.datahandling.loading import load_eit_data from eitprocessing.datahandling.sequence import Sequence -# ruff: noqa: ERA001 #TODO: remove this line - -def test_slicing(draeger1: Sequence, timpel1: Sequence): +@pytest.mark.parametrize( + "sequence", + ["draeger_20hz_healthy_volunteer", "draeger_20hz_healthy_volunteer_pressure_pod"], + indirect=True, +) +def test_slicing(sequence: Sequence): cutoff = 100 + assert len(sequence) > cutoff, "Test sequence should be longer than cutoff for meaningful test" - for seq in [draeger1, timpel1]: - assert seq[0:cutoff] == seq[:cutoff] - assert seq[cutoff : len(seq)] == seq[cutoff:] - - assert len(seq[:cutoff]) == cutoff - assert len(seq) == len(seq[cutoff:]) + len(seq[-cutoff:]) - assert len(seq) == len(seq[:cutoff]) + len(seq[:-cutoff]) + assert sequence[0:cutoff] == sequence[:cutoff] + assert sequence[cutoff : len(sequence)] == sequence[cutoff:] - # concatenated = Sequence.concatenate(seq[:cutoff], seq[cutoff:]) - # concatenated.eit_data["raw"].path = seq.eit_data["raw"].path # what's this doing?? - # assert concatenated == seq + assert len(sequence[:cutoff]) == cutoff + assert len(sequence) == len(sequence[cutoff:]) + len(sequence[-cutoff:]) + assert len(sequence) == len(sequence[:cutoff]) + len(sequence[:-cutoff]) -def test_select_by_time(draeger2: Sequence): +@pytest.mark.parametrize("sequence", ["timpel_healthy_volunteer_1", "draeger_20hz_healthy_volunteer"], indirect=True) +def test_select_by_time(sequence: Sequence): pytest.skip("selecting by time not finalized yet") # test illegal with pytest.warns(UserWarning, match="No starting or end timepoints was selected"): - _ = draeger2.select_by_time() + _ = sequence.select_by_time() with pytest.warns(UserWarning, match="No starting or end timepoints was selected"): - _ = draeger2.select_by_time(None, None) + _ = sequence.select_by_time(None, None) with pytest.warns(UserWarning, match="No starting or end timepoints was selected"): - _ = draeger2.select_by_time(None) + _ = sequence.select_by_time(None) with pytest.warns(UserWarning, match="No starting or end timepoints was selected"): - _ = draeger2.select_by_time(end_time=None) + _ = sequence.select_by_time(end_time=None) # TODO (#82): this function is kinda ugly. Would be nice to refactor it # but I am struggling to think of a logical way to loop through. ms = 1 / 1000 # test start_time only - full_length = len(draeger2) + full_length = len(sequence) start_slices = [ # (slice time, expected missing slices if inclusive=True, expected missing slices if inclusive=False) - (draeger2.time[22], 22, 22), - (draeger2.time[22] - ms, 21, 22), - (draeger2.time[22] + ms, 22, 23), + (sequence.time[22], 22, 22), + (sequence.time[22] - ms, 21, 22), + (sequence.time[22] + ms, 22, 23), ] for test_settings in start_slices: print(test_settings) # noqa: T201 - sliced_inc = draeger2.select_by_time(start_time=test_settings[0], start_inclusive=True) + sliced_inc = sequence.select_by_time(start_time=test_settings[0], start_inclusive=True) assert len(sliced_inc) == full_length - test_settings[1] - sliced_exc = draeger2.select_by_time(start_time=test_settings[0], start_inclusive=False) - assert len(sliced_exc) == len(draeger2) - test_settings[2] + sliced_exc = sequence.select_by_time(start_time=test_settings[0], start_inclusive=False) + assert len(sliced_exc) == len(sequence) - test_settings[2] # test default: - assert draeger2.select_by_time(start_time=test_settings[0]) == sliced_inc + assert sequence.select_by_time(start_time=test_settings[0]) == sliced_inc # test end_time only end_slices = [ # (slice time, expected length if inclusive=True, expected length if inclusive=False) - (draeger2.time[52], 52, 52), - (draeger2.time[52] - ms, 51, 52), - (draeger2.time[52] + ms, 52, 53), + (sequence.time[52], 52, 52), + (sequence.time[52] - ms, 51, 52), + (sequence.time[52] + ms, 52, 53), ] for test_settings in end_slices: print(test_settings) # noqa: T201 - sliced_inc = draeger2.select_by_time(end_time=test_settings[0], end_inclusive=True) + sliced_inc = sequence.select_by_time(end_time=test_settings[0], end_inclusive=True) assert len(sliced_inc) == test_settings[1] - sliced_exc = draeger2.select_by_time(end_time=test_settings[0], end_inclusive=False) + sliced_exc = sequence.select_by_time(end_time=test_settings[0], end_inclusive=False) assert len(sliced_exc) == test_settings[2] # test default: - assert draeger2.select_by_time(end_time=test_settings[0]) == sliced_exc + assert sequence.select_by_time(end_time=test_settings[0]) == sliced_exc # test start_time and end_time for start_slicing in start_slices: for end_slicing in end_slices: # True/True - sliced = draeger2.select_by_time( + sliced = sequence.select_by_time( start_time=start_slicing[0], end_time=end_slicing[0], start_inclusive=True, @@ -87,7 +85,7 @@ def test_select_by_time(draeger2: Sequence): assert len(sliced) == end_slicing[1] - start_slicing[1] # False/True - sliced = draeger2.select_by_time( + sliced = sequence.select_by_time( start_time=start_slicing[0], end_time=end_slicing[0], start_inclusive=False, @@ -96,7 +94,7 @@ def test_select_by_time(draeger2: Sequence): assert len(sliced) == end_slicing[1] - start_slicing[2] # True/False - sliced = draeger2.select_by_time( + sliced = sequence.select_by_time( start_time=start_slicing[0], end_time=end_slicing[0], start_inclusive=True, @@ -105,7 +103,7 @@ def test_select_by_time(draeger2: Sequence): assert len(sliced) == end_slicing[2] - start_slicing[1] # False/False - sliced = draeger2.select_by_time( + sliced = sequence.select_by_time( start_time=start_slicing[0], end_time=end_slicing[0], start_inclusive=False, @@ -115,52 +113,86 @@ def test_select_by_time(draeger2: Sequence): def test_concatenate( - draeger1: Sequence, - draeger2: Sequence, - draeger_both: Sequence, - timpel1: Sequence, - # timpel_double: Sequence, + draeger_20hz_healthy_volunteer: Sequence, + draeger_20hz_healthy_volunteer_fixed_rr: Sequence, + draeger_20hz_healthy_volunteer_and_fixed_rr: Sequence, +): + """Tests concatenation against a sequence that is loaded from two files.""" + merged_sequence = Sequence.concatenate(draeger_20hz_healthy_volunteer, draeger_20hz_healthy_volunteer_fixed_rr) + + assert len(merged_sequence.eit_data["raw"]) == len(draeger_20hz_healthy_volunteer.eit_data["raw"]) + len( + draeger_20hz_healthy_volunteer_fixed_rr.eit_data["raw"], + ), "Length of concatenated sequence should equal sum of individual lengths." + assert merged_sequence == draeger_20hz_healthy_volunteer_and_fixed_rr, ( + "Concatenated sequence should equal pre-loaded combined sequence." + ) + + added_sequence = draeger_20hz_healthy_volunteer + draeger_20hz_healthy_volunteer_fixed_rr + assert added_sequence == merged_sequence, "Adding two sequences should be equivalent to concatenation." + + +def test_concatenate_three( + draeger_20hz_healthy_volunteer: Sequence, + draeger_20hz_healthy_volunteer_fixed_rr: Sequence, + draeger_20hz_healthy_volunteer_pressure_pod: Sequence, ): - merged_draeger = Sequence.concatenate(draeger2, draeger1) - assert len(merged_draeger.eit_data["raw"]) == len(draeger2.eit_data["raw"]) + len( - draeger1.eit_data["raw"], + merged_sequence_1 = Sequence.concatenate(draeger_20hz_healthy_volunteer, draeger_20hz_healthy_volunteer_fixed_rr) + merged_sequence_2 = Sequence.concatenate(merged_sequence_1, draeger_20hz_healthy_volunteer_pressure_pod) + + paths = [ + sequence.eit_data["raw"].path + for sequence in [ + draeger_20hz_healthy_volunteer, + draeger_20hz_healthy_volunteer_fixed_rr, + draeger_20hz_healthy_volunteer_pressure_pod, + ] + ] + loaded_sequence = load_eit_data(paths, vendor="draeger") + + added_sequence = ( + draeger_20hz_healthy_volunteer + + draeger_20hz_healthy_volunteer_fixed_rr + + draeger_20hz_healthy_volunteer_pressure_pod ) - assert merged_draeger == draeger_both - added_draeger = draeger2 + draeger1 - assert added_draeger == merged_draeger + assert merged_sequence_2 == loaded_sequence, "Chained concatenation should equal files loaded together." + assert merged_sequence_2 == added_sequence, "Chained addition should equal chained concatenation." + + +def test_merging_timing_order( + draeger_20hz_healthy_volunteer_fixed_rr: Sequence, draeger_20hz_healthy_volunteer: Sequence +): + with pytest.raises( + ValueError, match=r"Concatenation failed. Second dataset \(.+\) may not start before first \(.+\) ends\." + ): + _ = Sequence.concatenate(draeger_20hz_healthy_volunteer_fixed_rr, draeger_20hz_healthy_volunteer) + + +@pytest.mark.parametrize("sequence", ["timpel_healthy_volunteer_1"], indirect=True) +def test_concatenate_slicing(sequence: Sequence): # slice and concatenate cutoff_point = 100 - part1 = timpel1[:cutoff_point] - part2 = timpel1[cutoff_point:] - assert timpel1 == Sequence.concatenate(part1, part2) + part1 = sequence[:cutoff_point] + part2 = sequence[cutoff_point:] + assert sequence == Sequence.concatenate(part1, part2) - # TODO: add tests for: + # TODO: add tests for # - concatenating a third Sequence on top (or two double-sequences), also checking that path attribute is flat list # - as above, but for timpel and sentec -def test_illegal_concatenation(timpel1: Sequence, draeger1: Sequence, draeger2: Sequence): - # Concatenate wrong order - _ = Sequence.concatenate(draeger2, draeger1) - with pytest.raises(ValueError): - _ = Sequence.concatenate(draeger1, draeger2) - +def test_concatenate_different_vendors( + timpel_healthy_volunteer_1: Sequence, + draeger_20hz_healthy_volunteer: Sequence, +): # Concatenate different vendors with pytest.raises(TypeError): - _ = Sequence.concatenate(timpel1, draeger1) + _ = Sequence.concatenate(timpel_healthy_volunteer_1, draeger_20hz_healthy_volunteer) - # Concatenate different sample_frequency (for EITData) - draeger1_sample_frequency = deepcopy(draeger1) - _ = Sequence.concatenate(draeger2, draeger1_sample_frequency) - draeger1_sample_frequency.eit_data["raw"].sample_frequency = 50 - with pytest.raises(ValueError): - _ = Sequence.concatenate(draeger2, draeger1_sample_frequency) - # Not sure what this one is testing exactly. - # My guess is that adjusting the vendor of an EIData instance should not be allowed once it has been instantiated. - draeger1_vendor = deepcopy(draeger1) - draeger1_vendor.eit_data["raw"].vendor = Vendor.TIMPEL +def test_concatenate_different_sample_frequency( + draeger_20hz_healthy_volunteer: Sequence, + draeger_50hz_healthy_volunteer_pressure_pod: Sequence, +): with pytest.raises(ValueError): - # TODO (#77): update this to AttributeError, once equivalence check for framesets is implemented. - _ = Sequence.concatenate(draeger1, timpel1) + _ = Sequence.concatenate(draeger_20hz_healthy_volunteer, draeger_50hz_healthy_volunteer_pressure_pod) diff --git a/tests/test_amplitude_lungspace.py b/tests/test_amplitude_lungspace.py index 046682d14..6032ae428 100644 --- a/tests/test_amplitude_lungspace.py +++ b/tests/test_amplitude_lungspace.py @@ -99,6 +99,6 @@ def test_tiv_lungspace_with_timing_data(create_signal: Callable): _ = AmplitudeLungspace(threshold=0.2).apply(signal, timing_data=timing_data) -def test_tiv_lungpsace_with_real_data(draeger1: Sequence): - eit_data = draeger1.eit_data["raw"] +def test_tiv_lungpsace_with_real_data(draeger_20hz_healthy_volunteer_pressure_pod: Sequence): + eit_data = draeger_20hz_healthy_volunteer_pressure_pod.eit_data["raw"] _ = AmplitudeLungspace(threshold=0.2).apply(eit_data) diff --git a/tests/test_breath_detection.py b/tests/test_breath_detection.py index 191ae697a..248722147 100644 --- a/tests/test_breath_detection.py +++ b/tests/test_breath_detection.py @@ -1,6 +1,5 @@ import copy -import os -from pathlib import Path +import warnings from typing import Any import numpy as np @@ -12,17 +11,6 @@ from eitprocessing.datahandling.sequence import Sequence from eitprocessing.features.breath_detection import BreathDetection -environment = Path( - os.environ.get( - "EIT_PROCESSING_TEST_DATA", - Path(__file__).parent.parent.resolve(), - ), -) -data_directory = environment / "tests" / "test_data" -draeger_file1 = data_directory / "Draeger_Test3.bin" -draeger_file2 = data_directory / "Draeger_Test.bin" -timpel_file = data_directory / "Timpel_Test.txt" - def _make_cosine_wave(sample_frequency: float, length: int, frequency: float) -> tuple[np.ndarray, np.ndarray]: """Generate a cosine wave with the given parameters and amplitude 1. @@ -296,61 +284,71 @@ def test_pass_invalid(obj: Any): # noqa: ANN401 bd.find_breaths(obj) -def test_pass_continuousdata(draeger1: Sequence): - draeger1 = copy.deepcopy(draeger1) # prevents writing results to original file - cd = draeger1.continuous_data["global_impedance_(raw)"] +def test_pass_continuousdata(draeger_20hz_healthy_volunteer_pressure_pod: Sequence): + sequence = copy.deepcopy(draeger_20hz_healthy_volunteer_pressure_pod) # prevents writing results to original file + cd = sequence.continuous_data["global_impedance_(raw)"] bd = BreathDetection() breaths_container = bd.find_breaths(cd) assert isinstance(breaths_container, IntervalData) # results are not stored - assert "breaths" not in draeger1.interval_data + assert "breaths" not in sequence.interval_data - bd.find_breaths(cd, sequence=draeger1) + bd.find_breaths(cd, sequence=sequence) # results are now stored - assert "breaths" in draeger1.interval_data - assert draeger1.interval_data["breaths"] == breaths_container - assert draeger1.interval_data["breaths"] is not breaths_container + assert "breaths" in sequence.interval_data + assert sequence.interval_data["breaths"] == breaths_container + assert sequence.interval_data["breaths"] is not breaths_container -def test_with_data(draeger1: Sequence, draeger2: Sequence, timpel1: Sequence, pytestconfig: pytest.Config): +@pytest.mark.parametrize( + ("sequence", "slice_"), + [ + ("draeger_20hz_healthy_volunteer_pressure_pod", slice(None)), + ("draeger_50hz_healthy_volunteer_pressure_pod", slice(None)), + ("timpel_healthy_volunteer_1", slice(421, 495)), + ], + indirect=["sequence"], +) +def test_with_data(sequence: Sequence, slice_: slice, pytestconfig: pytest.Config): if pytestconfig.getoption("--cov"): pytest.skip("Skip with option '--cov' so other tests can cover 100%.") - draeger1 = copy.deepcopy(draeger1) - draeger2 = copy.deepcopy(draeger2) - timpel1 = copy.deepcopy(timpel1) - for sequence in draeger1, draeger2, timpel1: - bd = BreathDetection() + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=UserWarning, message="No starting or end timepoint was selected.") + sequence = sequence.t[slice_] - cd = sequence.continuous_data["global_impedance_(raw)"] - breaths = bd.find_breaths(cd) + sequence = copy.deepcopy(sequence) # prevents writing results to original file + bd = BreathDetection() - for breath in breaths.values: - # Test whether the indices are in the proper order within a breath - assert breath.start_time < breath.middle_time < breath.end_time + cd = sequence.continuous_data["global_impedance_(raw)"] + breaths = bd.find_breaths(cd) + + for breath in breaths.values: + # Test whether the indices are in the proper order within a breath + assert breath.start_time < breath.middle_time < breath.end_time - # Test whether the peak values are larger than valley values - assert cd.t[breath.middle_time].values[0] > cd.t[breath.start_time].values[0] - assert cd.t[breath.middle_time].values[0] > cd.t[breath.end_time].values[0] + # Test whether the peak values are larger than valley values + assert cd.t[breath.middle_time].values[0] > cd.t[breath.start_time].values[0] + assert cd.t[breath.middle_time].values[0] > cd.t[breath.end_time].values[0] - start_indices, middle_indices, end_indices = (list(x) for x in zip(*breaths.values, strict=True)) + start_indices, middle_indices, end_indices = (list(x) for x in zip(*breaths.values, strict=True)) - # Test whether breaths are sorted properly - assert start_indices == sorted(start_indices) - assert middle_indices == sorted(middle_indices) - assert end_indices == sorted(end_indices) + # Test whether breaths are sorted properly + assert start_indices == sorted(start_indices) + assert middle_indices == sorted(middle_indices) + assert end_indices == sorted(end_indices) - # Test whether indices are unique. `set` removes non-unique values, - # `sorted(list(...))` converts the set to a sorted list again. - assert list(start_indices) == sorted(set(start_indices)) - assert list(middle_indices) == sorted(set(middle_indices)) - assert list(end_indices) == sorted(set(end_indices)) + # Test whether indices are unique. `set` removes non-unique values, + # `sorted(list(...))` converts the set to a sorted list again. + assert list(start_indices) == sorted(set(start_indices)) + assert list(middle_indices) == sorted(set(middle_indices)) + assert list(end_indices) == sorted(set(end_indices)) - # Test whether the start of the next breath is on/after the previous breath - assert all( - start_index >= end_index for start_index, end_index in zip(start_indices[1:], end_indices[:-1], strict=True) - ) + # Test whether the start of the next breath is on/after the previous breath + assert all( + start_index >= end_index for start_index, end_index in zip(start_indices[1:], end_indices[:-1], strict=True) + ) def test_create_breaths_from_peak_valley_data(): @@ -485,7 +483,7 @@ def test_find_breaths(): cd = ContinuousData( label, "Generated waveform data", - None, + "", "mock", "", time=time, @@ -518,7 +516,7 @@ def test_find_breaths(): cd = ContinuousData( label, "Generated waveform data", - None, + "", "mock", "", time=time, diff --git a/tests/test_eeli.py b/tests/test_eeli.py index e05aaf79c..c837685fa 100644 --- a/tests/test_eeli.py +++ b/tests/test_eeli.py @@ -105,11 +105,11 @@ def test_bd_init(): EELI(breath_detection_kwargs={"minimum_duration": 0}, breath_detection=BreathDetection(minimum_duration=0)) -def test_with_data(draeger1: Sequence, pytestconfig: pytest.Config): +def test_with_data(draeger_20hz_healthy_volunteer_pressure_pod: Sequence, pytestconfig: pytest.Config): if pytestconfig.getoption("--cov"): pytest.skip("Skip with option '--cov' so other tests can cover 100%.") - cd = draeger1.continuous_data["global_impedance_(raw)"] + cd = draeger_20hz_healthy_volunteer_pressure_pod.continuous_data["global_impedance_(raw)"] eeli_values = EELI().compute_parameter(cd).values breaths = BreathDetection().find_breaths(cd) @@ -117,14 +117,14 @@ def test_with_data(draeger1: Sequence, pytestconfig: pytest.Config): assert len(eeli_values) == len(breaths) -def test_non_impedance_data(draeger1: Sequence) -> None: - cd = draeger1.continuous_data["global_impedance_(raw)"] +def test_non_impedance_data(draeger_20hz_healthy_volunteer_pressure_pod: Sequence) -> None: + cd = draeger_20hz_healthy_volunteer_pressure_pod.continuous_data["global_impedance_(raw)"] original_category = cd.category _ = EELI().compute_parameter(cd) cd.category = "foo" - with pytest.raises(ValueError): + with pytest.raises(ValueError, match="This method will only work on 'impedance' data, not 'foo'."): _ = EELI().compute_parameter(cd) cd.category = original_category diff --git a/tests/test_labels.py b/tests/test_labels.py index 1b3ec2ca8..c3ee3ffd1 100644 --- a/tests/test_labels.py +++ b/tests/test_labels.py @@ -1,53 +1,73 @@ +from pathlib import Path + import pytest # TODO: noqa: F401 (needed for fixtures) once the pytest.skip is removed from eitprocessing.datahandling.loading import load_eit_data from eitprocessing.datahandling.sequence import Sequence -from tests.conftest import draeger_file1, timpel_file -def test_default_label(draeger1: Sequence): - draeger_default = load_eit_data(draeger_file1, vendor="draeger", sample_frequency=20) +def test_default_label( + draeger_20hz_healthy_volunteer_path: Path, + draeger_20hz_healthy_volunteer: Sequence, + timpel_healthy_volunteer_1_path: Path, +): + draeger_default = load_eit_data(draeger_20hz_healthy_volunteer_path, vendor="draeger", sample_frequency=20) assert isinstance(draeger_default.label, str) assert draeger_default.label == f"Sequence_{id(draeger_default)}" - timpel_default = load_eit_data(timpel_file, vendor="timpel") + timpel_default = load_eit_data(timpel_healthy_volunteer_1_path, vendor="timpel") assert isinstance(timpel_default.label, str) assert timpel_default.label == f"Sequence_{id(timpel_default)}" # test that default label changes upon reloading identical data - draeger_reloaded = load_eit_data(draeger_file1, vendor="draeger", sample_frequency=20) + draeger_reloaded = load_eit_data(draeger_20hz_healthy_volunteer_path, vendor="draeger", sample_frequency=20) assert draeger_default == draeger_reloaded assert draeger_default.label != draeger_reloaded.label - assert draeger_default.label != draeger1.label + assert draeger_default.label != draeger_20hz_healthy_volunteer.label -def test_relabeling(timpel1: Sequence, draeger2: Sequence, draeger1: Sequence): +def test_relabeling( + timpel_healthy_volunteer_1: Sequence, + draeger_20hz_healthy_volunteer_fixed_rr: Sequence, + draeger_20hz_healthy_volunteer: Sequence, +): pytest.skip("changing labels is currently bugging") # merging - merged = Sequence.concatenate(draeger2, draeger1) - assert merged.label != draeger1.label - assert merged.label != draeger2.label - assert merged.label == f"Merge of <{draeger2.label}> and <{draeger1.label}>" + merged = Sequence.concatenate(draeger_20hz_healthy_volunteer_fixed_rr, draeger_20hz_healthy_volunteer) + assert merged.label != draeger_20hz_healthy_volunteer.label + assert merged.label != draeger_20hz_healthy_volunteer_fixed_rr.label + assert ( + merged.label + == f"Merge of <{draeger_20hz_healthy_volunteer_fixed_rr.label}> and <{draeger_20hz_healthy_volunteer.label}>" + ) # slicing indices = slice(3, 12) - sliced_timpel = timpel1[indices] - assert sliced_timpel.label != timpel1.label - assert sliced_timpel.label == f"Slice ({indices.start}-{indices.stop}] of <{timpel1.label}>" + sliced_timpel = timpel_healthy_volunteer_1[indices] + assert sliced_timpel.label != timpel_healthy_volunteer_1.label + assert sliced_timpel.label == f"Slice ({indices.start}-{indices.stop}] of <{timpel_healthy_volunteer_1.label}>" # custom new label:) test_label = "test label" - merged = Sequence.concatenate(draeger2, draeger1, newlabel=test_label) + merged = Sequence.concatenate( + draeger_20hz_healthy_volunteer_fixed_rr, draeger_20hz_healthy_volunteer, newlabel=test_label + ) assert merged.label == test_label - sliced_timpel = Sequence.select_by_index(timpel1, start=indices.start, end=indices.stop, newlabel=test_label) + sliced_timpel = Sequence.select_by_index( + timpel_healthy_volunteer_1, start=indices.start, end=indices.stop, newlabel=test_label + ) assert sliced_timpel.label == test_label # selecting by time pytest.skip("selecting by time not finalized yet") t22 = 55825.268 t52 = 55826.768 - time_sliced = draeger2.select_by_time(t22, t52 + 0.001) - assert time_sliced.label != draeger2.label, "time slicing does not assign new label by default" - assert time_sliced.label == f"Slice (22-52) of <{draeger2.label}>", "slicing generates unexpected default label" - time_sliced_2 = draeger2.select_by_time(t22, t52, label=test_label) + time_sliced = draeger_20hz_healthy_volunteer_fixed_rr.select_by_time(t22, t52 + 0.001) + assert time_sliced.label != draeger_20hz_healthy_volunteer_fixed_rr.label, ( + "time slicing does not assign new label by default" + ) + assert time_sliced.label == f"Slice (22-52) of <{draeger_20hz_healthy_volunteer_fixed_rr.label}>", ( + "slicing generates unexpected default label" + ) + time_sliced_2 = draeger_20hz_healthy_volunteer_fixed_rr.select_by_time(t22, t52, label=test_label) assert time_sliced_2.label == test_label, "incorrect label assigned when time slicing data with new label" diff --git a/tests/test_loading.py b/tests/test_loading.py deleted file mode 100644 index 27cf65326..000000000 --- a/tests/test_loading.py +++ /dev/null @@ -1,165 +0,0 @@ -import numpy as np -import pytest - -from eitprocessing.datahandling.eitdata import EITData, Vendor -from eitprocessing.datahandling.loading import load_eit_data -from eitprocessing.datahandling.sequence import Sequence -from tests.conftest import ( - draeger_file1, - draeger_file2, - draeger_file3, - dummy_file, - timpel_file, -) - -# ruff: noqa: ERA001 #TODO: remove this line - - -def test_loading_draeger( - draeger1: Sequence, - draeger2: Sequence, - draeger_both: Sequence, - draeger_pp: Sequence, -): - assert isinstance(draeger1, Sequence) - assert isinstance(draeger1.eit_data["raw"], EITData) - assert draeger1.eit_data["raw"].sample_frequency == 20 - assert len(draeger1.eit_data["raw"]) == len(draeger1.eit_data["raw"].time) - assert len(draeger2.eit_data["raw"].time) == 20740 - - assert draeger1 == load_eit_data(draeger_file1, vendor="draeger", sample_frequency=20, label="draeger1") - assert draeger1 == load_eit_data(draeger_file1, vendor="draeger", sample_frequency=20, label="something_else") - assert draeger1 != draeger2 - - # Load multiple - assert len(draeger_both.eit_data["raw"]) == len(draeger1.eit_data["raw"]) + len( - draeger2.eit_data["raw"], - ) - - # draeger data with pressure pod data has 10 continuous medibus fields, 'normal' only 6 - assert len(draeger_pp.continuous_data) == 10 + 1 - assert len(draeger1.continuous_data) == 6 + 1 - - # test below not possible due to requirement of axis 1 ending before axis b starts - # draeger_inverted = load_eit_data([draeger_file1, draeger_file2], vendor="draeger", label="inverted") - # assert len(draeger_both) == len(draeger_inverted) - # assert draeger_both != draeger_inverted - - -def test_sample_frequency_draeger(): - with_sf = load_eit_data(draeger_file1, vendor="draeger", sample_frequency=20) - without_sf = load_eit_data(draeger_file1, vendor="draeger") - assert with_sf.eit_data["raw"].sample_frequency == without_sf.eit_data["raw"].sample_frequency - - with pytest.warns(RuntimeWarning): - _ = load_eit_data(draeger_file1, vendor="draeger", sample_frequency=50) - - -def test_loading_timpel( - draeger1: Sequence, - timpel1: Sequence, - # timpel_double: Sequence, # does not currently work, because it won't load due to the time axes overlapping -): - using_vendor = load_eit_data(timpel_file, vendor=Vendor.TIMPEL, label="timpel") - assert timpel1 == using_vendor - assert isinstance(timpel1, Sequence) - assert isinstance(timpel1.eit_data["raw"], EITData) - assert timpel1.eit_data["raw"].vendor != draeger1.eit_data["raw"].vendor - - # Load multiple - # assert isinstance(timpel_double, Sequence) - # assert len(timpel_double) == 2 * len(timpel1) - - -def test_loading_illegal(): - # non existing - for vendor in ["draeger", "timpel"]: - with pytest.raises(FileNotFoundError): - _ = load_eit_data(dummy_file, vendor=vendor, sample_frequency=20) - - # incorrect vendor - with pytest.raises(OSError): - _ = load_eit_data(draeger_file1, vendor="timpel") - with pytest.raises(OSError): - _ = load_eit_data(timpel_file, vendor="draeger", sample_frequency=20) - - -def test_load_partial( - draeger2: Sequence, - timpel1: Sequence, -): - cutoff = 58 - # Keep cutoff at 58 for draeger2 as there is an event mark at this - # timepoint. Starting the load specifically at the timepoint of an event - # marker was tricky to implement, so keeping this cutoff will ensure that - # code keeps working for this fringe situation. - - # TODO (#81): test what happens if a file has an actual event marker on the very - # first frame. I suspect this will not be loaded, but I don't have a test - # file for this situation. - - # Timpel - timpel_part1 = load_eit_data(timpel_file, vendor="timpel", max_frames=cutoff, label="timpel_part_1") - timpel_part2 = load_eit_data(timpel_file, vendor="timpel", first_frame=cutoff, label="timpel_part2") - - assert len(timpel_part1) == cutoff - assert len(timpel_part2) == len(timpel1) - cutoff - assert timpel_part1 == timpel1[:cutoff] - assert timpel_part2 == timpel1[cutoff:] - assert Sequence.concatenate(timpel_part1, timpel_part2) == timpel1 - # assert Sequence.concatenate(timpel_part2, timpel_part1) != timpel1 - - # Draeger - draeger2_part1 = load_eit_data( - draeger_file2, - vendor="draeger", - sample_frequency=20, - max_frames=cutoff, - label="draeger_part_1", - ) - draeger2_part2 = load_eit_data( - draeger_file2, - vendor="draeger", - sample_frequency=20, - first_frame=cutoff, - label="draeger_part_2", - ) - - assert len(draeger2_part1) == cutoff - assert len(draeger2_part2) == len(draeger2) - cutoff - assert draeger2_part1 == draeger2[:cutoff] - pytest.skip( - "Tests below rely on proper functioning of select_by_time, " - "which should be refactored before fixing these tests", - ) - assert draeger2_part2 == draeger2[cutoff:] - assert Sequence.concatenate(draeger2_part1, draeger2_part2) == draeger2 - # assert Sequence.concatenate(draeger2_part2, draeger2_part1) != draeger2 - - -def test_illegal_first_frame(): - for ff in [0.5, -1, "fdw", 1e12]: - with pytest.raises((TypeError, ValueError)): - _ = load_eit_data(draeger_file1, vendor="draeger", sample_frequency=20, first_frame=ff) - - for ff2 in [0, 0.0, 1.0, None]: - _ = load_eit_data(draeger_file1, vendor="draeger", sample_frequency=20, first_frame=ff2) - - -def test_max_frames_too_large(): - with pytest.warns(): - _ = load_eit_data(draeger_file1, vendor="draeger", sample_frequency=20, max_frames=1e12) - - -def test_event_on_first_frame(draeger2: Sequence): - draeger3 = load_eit_data(draeger_file3, vendor="draeger", sample_frequency=20) - draeger3_events = draeger3.sparse_data["events_(draeger)"] - assert draeger3_events == draeger2.sparse_data["events_(draeger)"] - assert draeger3_events.time[0] == draeger3.eit_data["raw"].time[0] - - -@pytest.mark.parametrize("fixture_name", ["draeger1", "draeger2", "draeger_wrapped_time_axis"]) -def test_time_axis(fixture_name: str, request: pytest.FixtureRequest): - sequence = request.getfixturevalue(fixture_name) - time_diff = np.diff(sequence.time) - assert np.allclose(time_diff, time_diff.mean()) diff --git a/tests/test_mdn_filter.py b/tests/test_mdn_filter.py index d28f49ca4..b5c8051b6 100644 --- a/tests/test_mdn_filter.py +++ b/tests/test_mdn_filter.py @@ -110,8 +110,8 @@ def test_respiratory_rate_higher_than_heart_rate(): ) -def test_with_continuous_data(draeger1: Sequence): - continuous_data = draeger1.continuous_data["global_impedance_(raw)"] +def test_with_continuous_data(draeger_20hz_healthy_volunteer: Sequence): + continuous_data = draeger_20hz_healthy_volunteer.continuous_data["global_impedance_(raw)"] mdn_filter = MDNFilter( respiratory_rate=10 / MINUTE, heart_rate=80 / MINUTE, @@ -125,8 +125,8 @@ def test_with_continuous_data(draeger1: Sequence): assert np.allclose(filtered_data.values, filtered_signal) -def test_with_eit_data(draeger1: Sequence): - eit_data = draeger1.eit_data["raw"] +def test_with_eit_data(draeger_20hz_healthy_volunteer: Sequence): + eit_data = draeger_20hz_healthy_volunteer.eit_data["raw"] mdn_filter = MDNFilter( respiratory_rate=10 / MINUTE, heart_rate=80 / MINUTE, @@ -244,9 +244,9 @@ def test_wrong_input_type_raises(): mdn_filter.apply(12345) -def test_provide_sample_frequency_axis_with_datacontainers_raises(draeger1: Sequence): - eit_data = draeger1.eit_data["raw"] - continuous_data = draeger1.continuous_data["global_impedance_(raw)"] +def test_provide_sample_frequency_axis_with_datacontainers_raises(draeger_20hz_healthy_volunteer: Sequence): + eit_data = draeger_20hz_healthy_volunteer.eit_data["raw"] + continuous_data = draeger_20hz_healthy_volunteer.continuous_data["global_impedance_(raw)"] mdn_filter = MDNFilter( respiratory_rate=10 / MINUTE, heart_rate=80 / MINUTE, @@ -265,8 +265,8 @@ def test_provide_sample_frequency_axis_with_datacontainers_raises(draeger1: Sequ mdn_filter.apply(eit_data, axis=0) -def test_kwargs(draeger1: Sequence): - eit_data = draeger1.eit_data["raw"] +def test_kwargs(draeger_20hz_healthy_volunteer: Sequence): + eit_data = draeger_20hz_healthy_volunteer.eit_data["raw"] mdn_filter = MDNFilter( respiratory_rate=10 / MINUTE, heart_rate=80 / MINUTE, @@ -278,9 +278,9 @@ def test_kwargs(draeger1: Sequence): assert filtered_data.label == "Filtered EIT Data" -def test_plot_filter_effects(draeger1: Sequence): +def test_plot_filter_effects(draeger_20hz_healthy_volunteer: Sequence): """This test only checks that the plotting function runs without error.""" - impedance = draeger1.continuous_data["global_impedance_(raw)"] + impedance = draeger_20hz_healthy_volunteer.continuous_data["global_impedance_(raw)"] mdn_filter = MDNFilter( respiratory_rate=10 / MINUTE, heart_rate=80 / MINUTE, diff --git a/tests/test_parameter_tiv.py b/tests/test_parameter_tiv.py index 9b2df3def..3410e4eda 100644 --- a/tests/test_parameter_tiv.py +++ b/tests/test_parameter_tiv.py @@ -1,6 +1,4 @@ import copy -import os -from pathlib import Path from unittest.mock import patch import numpy as np @@ -12,18 +10,8 @@ from eitprocessing.datahandling.intervaldata import IntervalData from eitprocessing.datahandling.sequence import Sequence from eitprocessing.datahandling.sparsedata import SparseData +from eitprocessing.features.breath_detection import BreathDetection from eitprocessing.parameters.tidal_impedance_variation import TIV -from tests.test_breath_detection import BreathDetection - -environment = Path( - os.environ.get( - "EIT_PROCESSING_TEST_DATA", - Path(__file__).parent.parent.resolve(), - ), -) -data_directory = environment / "tests" / "test_data" -draeger_file1 = data_directory / "Draeger_Test3.bin" -timpel_file = data_directory / "Timpel_Test.txt" def create_result_array(value: float): @@ -186,39 +174,38 @@ def test_tiv_initialization_with_valid_method(): @pytest.mark.parametrize( - ("store_input", "sequence_fixture", "expected_exception"), + ("store_input", "sequence", "expected_exception"), [ (True, "not_a_sequence", ValueError), # Expect ValueError because a string is not a valid Sequence (True, "none_sequence", RuntimeError), # Expect RuntimeError because sequence is None and store=True ], + indirect=["sequence"], ) def test_store_result_with_errors( mock_eit_data: EITData, mock_continuous_data: ContinuousData, - request: pytest.FixtureRequest, store_input: bool, - sequence_fixture: str, - expected_exception: ValueError | RuntimeError, + sequence: str, + expected_exception: type[Exception], ): """Test storing results when errors are expected.""" tiv = TIV() # Ensure that breaths are detected # Retrieve the sequence from the fixture - test_sequence = request.getfixturevalue(sequence_fixture) # Expect a specific exception (either ValueError or RuntimeError) with pytest.raises(expected_exception): tiv.compute_continuous_parameter( mock_continuous_data, tiv_method="inspiratory", - sequence=test_sequence, + sequence=sequence, store=store_input, ) with pytest.raises(expected_exception): tiv.compute_pixel_parameter( eit_data=mock_eit_data, continuous_data=mock_continuous_data, - sequence=test_sequence, + sequence=sequence, tiv_method="inspiratory", tiv_timing="pixel", store=store_input, @@ -227,7 +214,7 @@ def test_store_result_with_errors( @pytest.mark.parametrize( - ("store_input", "sequence_fixture"), + ("store_input", "sequence"), [ (True, "mock_sequence"), # Result should be stored (False, "mock_sequence"), # No result should be stored @@ -235,25 +222,22 @@ def test_store_result_with_errors( (None, "mock_sequence"), # Result should be stored (None, "none_sequence"), # No result stored, no sequence provided ], + indirect=["sequence"], ) def test_store_result_success( mock_eit_data: EITData, mock_continuous_data: ContinuousData, - request: pytest.FixtureRequest, store_input: bool, - sequence_fixture: str, + sequence: str, ): """Test storing results when no errors are expected.""" tiv = TIV() # Ensure that breaths are detected - # Retrieve the sequence from the fixture - test_sequence = request.getfixturevalue(sequence_fixture) - # Run continuous and pixel tiv computation and check the result continuous_result = tiv.compute_continuous_parameter( mock_continuous_data, tiv_method="inspiratory", - sequence=test_sequence, + sequence=sequence, store=store_input, result_label="continuous_tivs", ) @@ -261,7 +245,7 @@ def test_store_result_success( pixel_result = tiv.compute_pixel_parameter( eit_data=mock_eit_data, continuous_data=mock_continuous_data, - sequence=test_sequence, + sequence=sequence, tiv_method="inspiratory", tiv_timing="pixel", store=store_input, @@ -269,12 +253,12 @@ def test_store_result_success( ) # Check that the results are stored correctly based on store_input - if store_input in [True, None] and test_sequence is not None: - assert len(test_sequence.sparse_data.data) == 2 - assert test_sequence.sparse_data["continuous_tivs"] == continuous_result - assert test_sequence.sparse_data["pixel_tivs"] == pixel_result - elif test_sequence is not None: - assert len(test_sequence.sparse_data.data) == 0 + if store_input in [True, None] and sequence is not None: + assert len(sequence.sparse_data.data) == 2 + assert sequence.sparse_data["continuous_tivs"] == continuous_result + assert sequence.sparse_data["pixel_tivs"] == pixel_result + elif sequence is not None: + assert len(sequence.sparse_data.data) == 0 @pytest.mark.parametrize( @@ -482,36 +466,37 @@ def test_tiv_with_no_breaths_pixel( assert result.shape == (0, 2, 2) -def test_with_data(draeger1: Sequence, timpel1: Sequence, pytestconfig: pytest.Config): +@pytest.mark.parametrize( + "sequence", + ["draeger_20hz_healthy_volunteer_pressure_pod", "draeger_50hz_healthy_volunteer_pressure_pod"], + indirect=True, +) +def test_with_data(sequence: Sequence, pytestconfig: pytest.Config): # Skip test if '--cov' option is enabled if pytestconfig.getoption("--cov"): pytest.skip("Skip with option '--cov' so other tests can cover 100%.") - # Make deep copies of the data to avoid modifying the original sequences - draeger1 = copy.deepcopy(draeger1) - timpel1 = copy.deepcopy(timpel1) + sequence = copy.deepcopy(sequence) - # Iterate over both sequences (draeger1 and timpel1) - for sequence in draeger1, timpel1: - # Initialize the TIV object - tiv = TIV() - eit_data = sequence.eit_data["raw"] - cd = sequence.continuous_data["global_impedance_(raw)"] + # Initialize the TIV object + tiv = TIV() + eit_data = sequence.eit_data["raw"] + cd = sequence.continuous_data["global_impedance_(raw)"] - result_continuous = tiv.compute_continuous_parameter(cd, tiv_method="inspiratory") - result_pixel = tiv.compute_pixel_parameter(eit_data, cd, sequence) + result_continuous = tiv.compute_continuous_parameter(cd, tiv_method="inspiratory") + result_pixel = tiv.compute_pixel_parameter(eit_data, cd, sequence) - arr_result_continuous = np.stack(result_continuous.values) - arr_result_pixel = np.stack(result_pixel.values) + arr_result_continuous = np.stack(result_continuous.values) + arr_result_pixel = np.stack(result_pixel.values) - assert result_continuous is not None - assert isinstance(result_continuous, SparseData) - assert arr_result_continuous.ndim == 1 - assert np.all(arr_result_continuous > 0) # values should be positive for continuous data + assert result_continuous is not None + assert isinstance(result_continuous, SparseData) + assert arr_result_continuous.ndim == 1 + assert np.all(arr_result_continuous > 0) # values should be positive for continuous data - assert result_pixel is not None - assert isinstance(result_pixel, SparseData) - assert arr_result_pixel.ndim == 3 + assert result_pixel is not None + assert isinstance(result_pixel, SparseData) + assert arr_result_pixel.ndim == 3 @pytest.mark.parametrize( diff --git a/tests/test_pixel_breath.py b/tests/test_pixel_breath.py index ebcd71afa..43ee5791b 100644 --- a/tests/test_pixel_breath.py +++ b/tests/test_pixel_breath.py @@ -1,7 +1,6 @@ import copy import itertools -import os -from pathlib import Path +import warnings from unittest.mock import patch import numpy as np @@ -14,18 +13,8 @@ from eitprocessing.datahandling.intervaldata import IntervalData from eitprocessing.datahandling.sequence import Sequence from eitprocessing.datahandling.sparsedata import SparseData +from eitprocessing.features.breath_detection import BreathDetection from eitprocessing.features.pixel_breath import PixelBreath -from tests.test_breath_detection import BreathDetection - -environment = Path( - os.environ.get( - "EIT_PROCESSING_TEST_DATA", - Path(__file__).parent.parent.resolve(), - ), -) -data_directory = environment / "tests" / "test_data" -draeger_file1 = data_directory / "Draeger_Test3.bin" -timpel_file = data_directory / "Timpel_Test.txt" # @pytest.fixture() @@ -249,32 +238,30 @@ def test__find_extreme_indices(mock_only_pixel_impedance: tuple): @pytest.mark.parametrize( - ("store_input", "sequence_fixture", "expected_exception"), + ("store_input", "sequence", "expected_exception"), [ (True, "not_a_sequence", ValueError), # Expect ValueError because an empty list is not a Sequence (True, "none_sequence", RuntimeError), # Expect RuntimeError because store=True but no Sequence is provided ], + indirect=["sequence"], ) def test_store_result_with_errors( mock_eit_data: EITData, mock_continuous_data: ContinuousData, - request: pytest.FixtureRequest, store_input: bool, - sequence_fixture: str, + sequence: Sequence, expected_exception: type[ValueError | RuntimeError], ): """Test storing results when errors are expected.""" pi = PixelBreath(breath_detection=BreathDetection(minimum_duration=0.01)) # Ensure that breaths are detected - sequence = request.getfixturevalue(sequence_fixture) - # Expect a specific exception (either ValueError or RuntimeError) with pytest.raises(expected_exception): pi.find_pixel_breaths(mock_eit_data, mock_continuous_data, sequence, store=store_input) @pytest.mark.parametrize( - ("store_input", "sequence_fixture"), + ("store_input", "sequence"), [ (True, "mock_sequence"), # Result should be stored (False, "mock_sequence"), # No result should be stored @@ -282,19 +269,17 @@ def test_store_result_with_errors( (None, "mock_sequence"), # Result should be stored (None, "none_sequence"), # No result stored, no sequence provided ], + indirect=["sequence"], ) def test_store_result_success( mock_eit_data: EITData, mock_continuous_data: ContinuousData, - request: pytest.FixtureRequest, store_input: bool, - sequence_fixture: str, + sequence: Sequence, ): """Test storing results when no errors are expected.""" pi = PixelBreath(breath_detection=BreathDetection(minimum_duration=0.01)) # Ensure that breaths are detected - sequence = request.getfixturevalue(sequence_fixture) - # Run pixel breath detection and check the result result = pi.find_pixel_breaths(mock_eit_data, mock_continuous_data, sequence, store=store_input) @@ -348,63 +333,73 @@ def test_with_zero_impedance(mock_zero_eit_data: EITData, mock_continuous_data: assert test_result.shape == (3, 2, 2) -def test_with_data(draeger1: Sequence, timpel1: Sequence, pytestconfig: pytest.Config): +@pytest.mark.parametrize( + ("sequence", "slice_"), + [ + ("draeger_20hz_healthy_volunteer_pressure_pod", slice(None)), + ("timpel_healthy_volunteer_1", slice(421, 495)), + ], + indirect=["sequence"], +) +def test_with_data(sequence: Sequence, slice_: slice, pytestconfig: pytest.Config): if pytestconfig.getoption("--cov"): pytest.skip("Skip with option '--cov' so other tests can cover 100%.") - draeger1 = copy.deepcopy(draeger1) - timpel1 = copy.deepcopy(timpel1) - for sequence in draeger1, timpel1: - ssequence = sequence - pi = PixelBreath() - eit_data = ssequence.eit_data["raw"] - eit_data.pixel_impedance[:, 0, 0] = np.nan # set one pixel to NaN to test handling of NaNs - cd = ssequence.continuous_data["global_impedance_(raw)"] - pixel_breaths = pi.find_pixel_breaths(eit_data, cd) - test_result = np.stack(pixel_breaths.values) - assert not np.all(test_result == None) # noqa: E711 - _, n_rows, n_cols = test_result.shape - - for row, col in itertools.product(range(n_rows), range(n_cols)): - filtered_values = [val for val in test_result[:, row, col] if val is not None] - if not filtered_values: - return - start_indices, middle_indices, end_indices = (list(x) for x in zip(*filtered_values, strict=True)) - # Test whether pixel breaths are sorted properly - assert start_indices == sorted(start_indices) - assert middle_indices == sorted(middle_indices) - assert end_indices == sorted(end_indices) - - # Test whether indices are unique. `set` removes non-unique values, - # `sorted(list(...))` converts the set to a sorted list again. - assert list(start_indices) == sorted(set(start_indices)) - assert list(middle_indices) == sorted(set(middle_indices)) - assert list(end_indices) == sorted(set(end_indices)) - - # Test whether the start of the next breath is on/after the previous breath - assert all( - start_index >= end_index - for start_index, end_index in zip(start_indices[1:], end_indices[:-1], strict=True) - ) - for breath in filtered_values: - # Test whether the indices are in the proper order within a breath - assert breath.start_time < breath.middle_time < breath.end_time - - -def test_phase_modes(draeger1: Sequence, pytestconfig: pytest.Config): + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=UserWarning, message="No starting or end timepoint was selected.") + sequence = sequence.t[slice_] + + sequence = copy.deepcopy(sequence) + + pi = PixelBreath() + eit_data = sequence.eit_data["raw"] + eit_data.pixel_impedance[:, 0, 0] = np.nan # set one pixel to NaN to test handling of NaNs + cd = sequence.continuous_data["global_impedance_(raw)"] + pixel_breaths = pi.find_pixel_breaths(eit_data, cd) + test_result = np.stack(pixel_breaths.values) + assert not np.all(test_result == None) # noqa: E711 + _, n_rows, n_cols = test_result.shape + + for row, col in itertools.product(range(n_rows), range(n_cols)): + filtered_values = [val for val in test_result[:, row, col] if val is not None] + if not filtered_values: + return + start_indices, middle_indices, end_indices = (list(x) for x in zip(*filtered_values, strict=True)) + # Test whether pixel breaths are sorted properly + assert start_indices == sorted(start_indices) + assert middle_indices == sorted(middle_indices) + assert end_indices == sorted(end_indices) + + # Test whether indices are unique. `set` removes non-unique values, + # `sorted(list(...))` converts the set to a sorted list again. + assert list(start_indices) == sorted(set(start_indices)) + assert list(middle_indices) == sorted(set(middle_indices)) + assert list(end_indices) == sorted(set(end_indices)) + + # Test whether the start of the next breath is on/after the previous breath + assert all( + start_index >= end_index for start_index, end_index in zip(start_indices[1:], end_indices[:-1], strict=True) + ) + for breath in filtered_values: + # Test whether the indices are in the proper order within a breath + assert breath.start_time < breath.middle_time < breath.end_time + + +def test_phase_modes(draeger_50hz_healthy_volunteer_pressure_pod: Sequence, pytestconfig: pytest.Config): if pytestconfig.getoption("--cov"): pytest.skip("Skip with option '--cov' so other tests can cover 100%.") - ssequence = copy.deepcopy(draeger1) - eit_data = ssequence.eit_data["raw"] + sequence = copy.deepcopy(draeger_50hz_healthy_volunteer_pressure_pod) + eit_data = sequence.eit_data["raw"] - # reduce the pixel set to middly 'well-behaved' pixels with positive TIV - eit_data.pixel_impedance = eit_data.pixel_impedance[:, 10:23, 10:23] + # reduce the pixel set to some 'well-behaved' pixels with positive TIV + eit_data.pixel_impedance = eit_data.pixel_impedance[:, 14:20, 14:20] # flip a single pixel, so the differences between algorithms becomes predictable - eit_data.pixel_impedance[:, 6, 6] = -eit_data.pixel_impedance[:, 6, 6] + flip_row, flip_col = 5, 5 + eit_data.pixel_impedance[:, flip_row, flip_col] = -eit_data.pixel_impedance[:, flip_row, flip_col] - cd = ssequence.continuous_data["global_impedance_(raw)"] + cd = sequence.continuous_data["global_impedance_(raw)"] # replace the 'global' data with the sum of the middly pixels cd.values = np.sum(eit_data.pixel_impedance, axis=(1, 2)) @@ -412,9 +407,9 @@ def test_phase_modes(draeger1: Sequence, pytestconfig: pytest.Config): pb_negative_amplitude = PixelBreath(phase_correction_mode="negative amplitude").find_pixel_breaths(eit_data, cd) pb_phase_shift = PixelBreath(phase_correction_mode="phase shift").find_pixel_breaths(eit_data, cd) + with pytest.warns(RuntimeWarning, match=r"Skipping pixel \(.+\) because more than half \(.+\) of breaths skipped."): + pb_none = PixelBreath(phase_correction_mode="none").find_pixel_breaths(eit_data, cd) # results are not compared, other than for length; just make sure it runs - pb_none = PixelBreath(phase_correction_mode="none").find_pixel_breaths(eit_data, cd) - assert len(pb_negative_amplitude) == len(pb_phase_shift) == len(pb_none) # all breaths, except for the first and last, should have been detected @@ -423,10 +418,10 @@ def test_phase_modes(draeger1: Sequence, pytestconfig: pytest.Config): same_pixel_timing = np.array(pb_negative_amplitude.values) == np.array(pb_phase_shift.values) assert not np.all(same_pixel_timing) - assert not np.any(same_pixel_timing[1:-1, 6, 6]) # the single flipped pixel - assert np.all(same_pixel_timing[1:-1, :6, :]) # all pixels in the rows above match - assert np.all(same_pixel_timing[1:-1, 7:, :]) # all pixels in the rows below match - assert np.all(same_pixel_timing[1:-1, :, :6]) # all pixels in the columns to the left match - assert np.all(same_pixel_timing[1:-1, :, 7:]) # all pixels in the columns to the right match + assert not np.any(same_pixel_timing[1:-1, flip_row, flip_col]) # the single flipped pixel + assert np.all(same_pixel_timing[1:-1, :flip_row, :]) # all pixels in the rows above match + assert np.all(same_pixel_timing[1:-1, flip_row + 1 :, :]) # all pixels in the rows below match + assert np.all(same_pixel_timing[1:-1, :, :flip_col]) # all pixels in the columns to the left match + assert np.all(same_pixel_timing[1:-1, :, flip_col + 1 :]) # all pixels in the columns to the right match assert np.all(same_pixel_timing[0, :, :]) # all first values match, because they are all None assert np.all(same_pixel_timing[-1, :, :]) # all last values match, because they are all None diff --git a/tests/test_pixelmask.py b/tests/test_pixelmask.py index da6f6b511..a69e16698 100644 --- a/tests/test_pixelmask.py +++ b/tests/test_pixelmask.py @@ -151,8 +151,8 @@ def test_pixelmask_apply_numpy_array_higher_dimensions(): assert np.array_equal(result, np.array([[[np.nan, 2], [3, np.nan]], [[np.nan, 6], [7, np.nan]]]), equal_nan=True) -def test_pixelmask_apply_eitdata(draeger1: Sequence): - eit_data = draeger1.eit_data["raw"] +def test_pixelmask_apply_eitdata(draeger_20hz_healthy_volunteer: Sequence): + eit_data = draeger_20hz_healthy_volunteer.eit_data["raw"] with warnings.catch_warnings(): warnings.filterwarnings("ignore", message="All-NaN slice encountered", category=RuntimeWarning) mask = PixelMask(np.full((32, 32), np.nan), suppress_all_nan_warning=True) diff --git a/tests/test_pixelmask_collection.py b/tests/test_pixelmask_collection.py index 4670dc914..3ea1f311e 100644 --- a/tests/test_pixelmask_collection.py +++ b/tests/test_pixelmask_collection.py @@ -216,8 +216,8 @@ def test_apply_to_numpy_data_label_format(anonymous_boolean_mask: Callable, nump _ = collection.apply(data, label_format="masked_{mask_label}") -def test_apply_to_eitdata_labelled(draeger1: Sequence, labelled_boolean_mask: Callable): - eit_data = draeger1.eit_data["raw"][:100] +def test_apply_to_eitdata_labelled(draeger_20hz_healthy_volunteer: Sequence, labelled_boolean_mask: Callable): + eit_data = draeger_20hz_healthy_volunteer.eit_data["raw"][:100] pm1 = labelled_boolean_mask("mask1") pm2 = labelled_boolean_mask("mask2") @@ -291,7 +291,7 @@ def test_apply_with_invalid_label_format_raises(pixel_map: Callable, labelled_bo _ = collection.apply(pixel_map_instance, label_format="{mask_label} {something_else}") -def test_apply_with_invalid_data_type_raises(labelled_boolean_mask: Callable, draeger1: Sequence): +def test_apply_with_invalid_data_type_raises(labelled_boolean_mask: Callable, draeger_20hz_healthy_volunteer: Sequence): collection = PixelMaskCollection([labelled_boolean_mask("mask1"), labelled_boolean_mask("mask2")]) with pytest.raises(TypeError, match="Unsupported data type:"): @@ -301,7 +301,7 @@ def test_apply_with_invalid_data_type_raises(labelled_boolean_mask: Callable, dr _ = collection.apply([[1, 2]]) with pytest.raises(TypeError, match="Unsupported data type:"): - _ = collection.apply(draeger1) + _ = collection.apply(draeger_20hz_healthy_volunteer) def test_apply_with_label_keyword_raises(labelled_boolean_mask: Callable, pixel_map: Callable): @@ -330,8 +330,8 @@ def test_apply_unsupported_type(): assert "Unsupported data type" in str(excinfo.value) -def test_apply_to_eitdata(draeger1: Sequence): - eit_data = draeger1.eit_data["raw"] +def test_apply_to_eitdata(draeger_20hz_healthy_volunteer: Sequence): + eit_data = draeger_20hz_healthy_volunteer.eit_data["raw"] pm1 = get_geometric_mask("ventral") pm2 = get_geometric_mask("dorsal") diff --git a/tests/test_rate_detection.py b/tests/test_rate_detection.py index f214fd3df..4c943ace4 100644 --- a/tests/test_rate_detection.py +++ b/tests/test_rate_detection.py @@ -333,9 +333,11 @@ def test_changing_frequency( @pytest.mark.parametrize( ("sequence", "slice_", "expected_rr", "expected_hr"), [ - ("draeger1", slice(None, None), 0.124, 1.121), - ("draeger2", slice(56650, 56760), 0.416, 1.183), - ("timpel1", slice(None, None), 0.329, 2.196), + ("draeger_20hz_healthy_volunteer_pressure_pod", slice(None, None), 0.140, 1.18), + ("draeger_50hz_healthy_volunteer_pressure_pod", slice(None, None), 0.188, 1.23), + ("draeger_20hz_healthy_volunteer_fixed_rr", slice(None, 47398), 0.167, 1.27), + ("draeger_20hz_healthy_volunteer_fixed_rr", slice(47522, None), 0.25, 1.33), + ("timpel_healthy_volunteer_1", slice(421, 495), 0.172, 1.381), ], indirect=["sequence"], ) diff --git a/tests/test_sequence_data.py b/tests/test_sequence_data.py index deea26952..e867ae6e4 100644 --- a/tests/test_sequence_data.py +++ b/tests/test_sequence_data.py @@ -3,9 +3,9 @@ import numpy as np import pytest +from eitprocessing.datahandling.continuousdata import ContinuousData from eitprocessing.datahandling.intervaldata import IntervalData from eitprocessing.datahandling.sequence import Sequence, _DataAccess -from tests.test_breath_detection import ContinuousData @pytest.fixture diff --git a/tests/test_tiv_lungspace.py b/tests/test_tiv_lungspace.py index 649d104b8..3770351e1 100644 --- a/tests/test_tiv_lungspace.py +++ b/tests/test_tiv_lungspace.py @@ -96,6 +96,15 @@ def test_tiv_lungspace_with_timing_data(create_signal: Callable): _ = TIVLungspace(threshold=0.2).apply(signal, timing_data=timing_data) -def test_tiv_lungpsace_with_real_data(draeger1: Sequence): - eit_data = draeger1.eit_data["raw"] +@pytest.mark.parametrize( + "sequence", + [ + "draeger_20hz_healthy_volunteer_pressure_pod", + "draeger_20hz_healthy_volunteer", + "draeger_50hz_healthy_volunteer_pressure_pod", + ], + indirect=True, +) +def test_tiv_lungpsace_with_real_data(sequence: Sequence): + eit_data = sequence.eit_data["raw"] _ = TIVLungspace(threshold=0.2).apply(eit_data) diff --git a/tests/test_watershed.py b/tests/test_watershed.py index ec7680bce..92390725f 100644 --- a/tests/test_watershed.py +++ b/tests/test_watershed.py @@ -81,8 +81,8 @@ def test_watershed_init_threshold_wrong_type(threshold: object): @pytest.mark.parametrize("threshold", [0.1, 0.15, 0.2, 0.25, 0.3]) -def test_watershed_with_real_data(draeger1: Sequence, threshold: float): - eit_data = draeger1.eit_data["raw"] +def test_watershed_with_real_data(draeger_50hz_healthy_volunteer_pressure_pod: Sequence, threshold: float): + eit_data = draeger_50hz_healthy_volunteer_pressure_pod.eit_data["raw"] watershed_mask = WatershedLungspace(threshold_fraction=threshold).apply(eit_data, captures=(captures := {})) assert captures, "captures should have values after runnning apply" @@ -127,17 +127,17 @@ def test_watershed_with_simulated_data(simulated_eit_data: Callable[..., EITData assert np.all(captures["mean tiv"].values <= captures["mean amplitude"].values, where=included_pixels) -def test_watershed_timing_data(draeger1: Sequence): - eit_data = draeger1.eit_data["raw"] - timing_data = draeger1.continuous_data["global_impedance_(raw)"] +def test_watershed_timing_data(draeger_50hz_healthy_volunteer_pressure_pod: Sequence): + eit_data = draeger_50hz_healthy_volunteer_pressure_pod.eit_data["raw"] + timing_data = draeger_50hz_healthy_volunteer_pressure_pod.continuous_data["global_impedance_(raw)"] watershed_mask_w_timing = WatershedLungspace().apply(eit_data, timing_data=timing_data) watershed_mask = WatershedLungspace().apply(eit_data) assert watershed_mask_w_timing == watershed_mask, "The timing data should be the same in this case" -def test_watershed_captures(draeger1: Sequence): - eit_data = draeger1.eit_data["raw"] +def test_watershed_captures(draeger_50hz_healthy_volunteer_pressure_pod: Sequence): + eit_data = draeger_50hz_healthy_volunteer_pressure_pod.eit_data["raw"] with pytest.raises(TypeError): _ = WatershedLungspace().apply(eit_data, captures=(captures := []))