diff --git a/src/tools/database.py b/src/tools/database.py index 2da99fc..0cf7466 100644 --- a/src/tools/database.py +++ b/src/tools/database.py @@ -5,7 +5,6 @@ from tools.elements import Compound, IsotopeDB from tools.utils import get_adducts, get_file_delimiter, modify_formula_dict -import re class Database: diff --git a/src/tools/elements.py b/src/tools/elements.py index d065d0a..fd627a1 100644 --- a/src/tools/elements.py +++ b/src/tools/elements.py @@ -3,12 +3,11 @@ from functools import cached_property from importlib.resources import files from pathlib import Path -from typing import Any import numpy as np import pandas as pd -from tools.utils import get_decoy_info, modify_formula_dict, str_to_dict, get_formula, get_charge +from tools.utils import get_charge, get_decoy_info, get_formula, modify_formula_dict, str_to_dict ELECTRON_MASS = 5.486e-4 diff --git a/src/tools/peak.py b/src/tools/peak.py index 14d9479..8feae13 100644 --- a/src/tools/peak.py +++ b/src/tools/peak.py @@ -1,6 +1,6 @@ import logging from collections.abc import Iterator -from dataclasses import InitVar, dataclass, field +from dataclasses import dataclass from pathlib import Path import pandas as pd diff --git a/src/tools/spectra.py b/src/tools/spectra.py index 1f150b7..7c5795d 100644 --- a/src/tools/spectra.py +++ b/src/tools/spectra.py @@ -1,12 +1,11 @@ -from collections.abc import Iterator +from collections.abc import Callable, Iterator from dataclasses import dataclass from pathlib import Path -from typing import Literal, Callable +from typing import Literal import numpy as np import numpy.typing as npt import pymzml -import pandas as pd class Spectra: diff --git a/src/tools/utils.py b/src/tools/utils.py index a21567c..d2fef59 100644 --- a/src/tools/utils.py +++ b/src/tools/utils.py @@ -70,52 +70,6 @@ def _get_open_method(_filepath: Path) -> tuple[Callable[..., Any], str]: } -def get_ppm_range( - lower_bound: np.ndarray, upper_bound: np.ndarray, ppm_error: float -) -> tuple[np.ndarray, np.ndarray]: - """ - Expand an m/z range by a given ppm tolerance. - - Parameters - ---------- - lower_bound : np.ndarray - Lower m/z boundary values. - upper_bound : np.ndarray - Upper m/z boundary values. - ppm_error : float - Parts-per-million tolerance to apply. - - Returns - ------- - tuple[np.ndarray, np.ndarray] - Updated (lower_bound, upper_bound) after applying the ppm expansion. - """ - lower_bound += -ppm_error / 1e6 * lower_bound - upper_bound += ppm_error / 1e6 * upper_bound - return lower_bound, upper_bound - - -def calculate_ppm_error( - observed_mz: float | np.ndarray, theoretical_mz: float | np.ndarray -) -> float | np.ndarray: - """ - Calculate the absolute ppm error between observed and theoretical m/z values. - - Parameters - ---------- - observed_mz : float or np.ndarray - Observed m/z value(s). - theoretical_mz : float or np.ndarray - Theoretical m/z value(s). - - Returns - ------- - float or np.ndarray - Absolute ppm error(s). - """ - return np.abs((observed_mz - theoretical_mz) / theoretical_mz) * 1e6 - - def aggregate_dict_values(dict1: dict[str, int], dict2: dict[str, int]) -> dict[str, int]: """ Merge two dictionaries by summing values for matching keys. @@ -323,41 +277,6 @@ def get_adducts(header: Sequence[str]) -> list[str]: return [item for item in header if re.match("^[M[+-].*](\d+)?[+-]", item)] -def normalize_scores(dist: float, dist_range: list[float] | None) -> float: - """ - Normalize a distance or score value to the [0, 1] range. - - Parameters - ---------- - dist : float - Raw distance or score value. - dist_range : list[float] or None - Expected range [min, max] of the score. Use [0, np.inf] for strictly non-negative - distances, [-np.inf, 0] for non-positive, or None to default to [0, 1]. - - Returns - ------- - float - Normalized score clipped to [0, 1]. - """ - if dist_range is None: - dist_range = [0, 1] - - if dist_range == [0, np.inf]: - result = dist / (1 + dist) - elif dist_range == [-np.inf, 0]: - result = 1 / (1 - dist) - else: - result = (dist - dist_range[0]) / (dist_range[1] - dist_range[0]) - - if result < 0: - result = 0 - elif result > 1: - result = 1 - - return result - - def remove_noise(spectra: np.ndarray | list, noise: float | None) -> np.ndarray: """ Zero out intensities below a relative noise threshold. @@ -379,26 +298,6 @@ def remove_noise(spectra: np.ndarray | list, noise: float | None) -> np.ndarray: return np.stack([spectra[:, 0], intensities], axis=1) -def normalize_intensity(spectrum: np.ndarray) -> np.ndarray: - """ - Normalize spectrum intensities using total-sum normalization. - - Parameters - ---------- - spectrum : np.ndarray - 2D array of shape (n, 2) with columns [m/z, intensity]. - - Returns - ------- - np.ndarray - Spectrum with intensities rescaled so that they sum to 1. - """ - - if len(spectrum) > 0 and (_sum := np.sum(spectrum[:, 1])) > 0: - spectrum[:, 1] = spectrum[:, 1] / _sum - return spectrum - - def str_to_dict(formula: str) -> dict[str, int]: """ Parse a chemical formula string into a dictionary of element counts. diff --git a/tests/conftest.py b/tests/conftest.py index 79a78a5..d03b703 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,8 +1,9 @@ +import warnings from pathlib import Path import numpy as np import pytest -import warnings + from tools import Database, IsotopeDB, Peaks, Spectra np.set_printoptions(legacy="1.25") diff --git a/tests/test_elements.py b/tests/test_elements.py index 4b9bb4e..bfe03cd 100644 --- a/tests/test_elements.py +++ b/tests/test_elements.py @@ -1,4 +1,3 @@ -from pathlib import Path import numpy as np import pandas as pd diff --git a/tests/test_peak.py b/tests/test_peak.py index 21e46cb..bc317ee 100644 --- a/tests/test_peak.py +++ b/tests/test_peak.py @@ -1,4 +1,4 @@ -from tools import Peak, Compound +from tools import Compound, Peak def test_peaks_object(isotope_db, peaks): diff --git a/tests/test_utils.py b/tests/test_utils.py index bd16997..4e98b29 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,15 +1,18 @@ +import gzip + import numpy as np from tools.utils import ( get_adducts, + get_charge, get_decoy_info, get_element_count, get_file_delimiter, + get_file_info, + get_formula, modify_charge, modify_formula_dict, remove_noise, - get_charge, - get_formula, ) @@ -29,6 +32,39 @@ def test_get_file_delimiter(data_dir): assert delimiter == expected_delimiter +def test_get_file_info(data_dir, tmp_path): + """ + Checks whether `get_file_info` returns accurate metadata for plain and + gzip-compressed delimited files, including delimiter, open function, mode, + row count, column count, and whether 'mz' appears as a column header. + """ + result = get_file_info(data_dir / "iso_list.csv") + assert result["delim"] == "," + assert result["open_fn"] is open + assert result["mode"] == "r" + assert result["n_rows"] == 296 + assert result["n_columns"] == 4 + assert result["has_header"] is False + + result = get_file_info(data_dir / "formula-database-truncated.tsv.gz") + assert result["delim"] == "\t" + assert result["open_fn"] is gzip.open + assert result["mode"] == "rt" + assert result["n_rows"] == 2000 + assert result["n_columns"] == 15 + assert result["has_header"] is False + + mz_csv = tmp_path / "mz_data.csv" + mz_csv.write_text("mz,intensity\n100.0,0.5\n200.0,1.0\n") + result = get_file_info(mz_csv) + assert result["delim"] == "," + assert result["open_fn"] is open + assert result["mode"] == "r" + assert result["n_rows"] == 3 + assert result["n_columns"] == 2 + assert result["has_header"] is True + + def test_modify_formula_dict(): """ Checks whether the `modify_formula_dict` function accurately updates a dictionary