From aadbe127cfff02435f14a9ff9c155856d4097b1e Mon Sep 17 00:00:00 2001 From: anushka255 Date: Thu, 4 Jun 2026 17:46:13 -0400 Subject: [PATCH] added utility functions in spectra class and added test cases for those --- src/tools/peak.py | 1 - src/tools/spectra.py | 65 ++++++++++++++++++++++++ src/tools/utils.py | 9 ++++ tests/test_spectra.py | 114 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 188 insertions(+), 1 deletion(-) diff --git a/src/tools/peak.py b/src/tools/peak.py index 8feae13..cf925d7 100644 --- a/src/tools/peak.py +++ b/src/tools/peak.py @@ -166,7 +166,6 @@ def _process_peaks(self, filepath: str | Path) -> "dict[int | str, Peak]": ) return peak_info - @dataclass(frozen=True) class Peak: """A single chromatographic peak with optional compound annotation.""" diff --git a/src/tools/spectra.py b/src/tools/spectra.py index 7c5795d..0eb5f55 100644 --- a/src/tools/spectra.py +++ b/src/tools/spectra.py @@ -6,6 +6,7 @@ import numpy as np import numpy.typing as npt import pymzml +from tools.utils import get_ppm_range class Spectra: @@ -152,3 +153,67 @@ class Spectrum: intensity: npt.NDArray[np.float64] polarity: Literal[0, 1, -1] rtime_unit: str + + def _match_peaks( + self, other_spectrum: npt.NDArray[np.float64], ppm_error: float, abs_tol: float = 0 + ) -> npt.NDArray[np.float64]: + """ + Match peaks from this spectrum against ``exp`` within a ppm tolerance. + + Each unmatched peak from ``self`` contributes a row with zero exp values; + each unmatched peak from ``exp`` contributes a row with zero self values. + + Returns an (n, 4) array with columns [self_mz, self_int, exp_mz, exp_int]. + """ + spec1 = np.column_stack([self.mz, self.intensity]) + spec1 = spec1[np.argsort(spec1[:, 0])] + spec2 = other_spectrum[np.argsort(other_spectrum[:, 0])] + matches = [] + + for i, spec in enumerate(spec1): + lower_bound, upper_bound = get_ppm_range(spec[0], ppm_error, abs_tol) + mask = (spec2[:, 0] >= lower_bound) & (spec2[:, 0] <= upper_bound) + if sum(mask) > 0: + matches.append( + np.hstack([spec, spec2[mask][np.argsort(np.abs(spec2[mask, 0] - spec[0]))[0]]]) + ) + else: + matches.append(np.r_[spec, [0, 0]]) + + matches = np.array(matches) + for spec in other_spectrum: + if spec[0] not in matches[:, 2]: + matches = np.vstack((matches, np.r_[[0, 0], spec])) + + return matches + + def compare_spectra( + self, + other_spectrum: npt.NDArray[np.float64], + ppm_error: float, + function: Callable[[npt.NDArray[np.float64], npt.NDArray[np.float64]], float], + ) -> float: + """ + Score this spectrum against ``other_spectrum``. + + Parameters + ---------- + other_spectrum: + Array of shape ``(n, 2)`` with columns ``[mz, intensity]``. + + ppm_error: + Mass tolerance in parts-per-million for peak matching. + + function: + Scoring function applied to ``(self_intensities, other_intensities)`` + extracted from matched peak rows. + + Returns + ------- + float + Score returned by ``function``, or 0 if ``other_spectrum`` is empty. + """ + if other_spectrum.size == 0: + return 0.0 + matches = self._match_peaks(other_spectrum, ppm_error) + return float(function(matches[:, 1], matches[:, 3])) diff --git a/src/tools/utils.py b/src/tools/utils.py index d2fef59..e63f148 100644 --- a/src/tools/utils.py +++ b/src/tools/utils.py @@ -401,3 +401,12 @@ def get_formula(element_count: dict[str, int], charge: int) -> str: formula += sign + magnitude return formula + + +def get_ppm_range(mz, ppm_error=0, abs_tol=0): + lower_bound = mz - ppm_error / 1e6 * mz + upper_bound = mz + ppm_error / 1e6 * mz + if abs_tol != 0: + lower_bound += -abs_tol + upper_bound += abs_tol + return lower_bound, upper_bound \ No newline at end of file diff --git a/tests/test_spectra.py b/tests/test_spectra.py index 41d3991..91b0045 100644 --- a/tests/test_spectra.py +++ b/tests/test_spectra.py @@ -1,9 +1,26 @@ +from pathlib import Path + +import numpy as np import pytest from tools import Spectra from tools.spectra import Spectrum +def _make_spectrum(mz, intensity): + return Spectrum( + spectrum_index=0, + ms_level=2, + rtime=1.0, + scan_index=1, + file=Path("test.mzML"), + mz=np.array(mz, dtype=np.float64), + intensity=np.array(intensity, dtype=np.float64), + polarity=1, + rtime_unit="minute", + ) + + def test_spectra_len(data_dir, spectra): assert len(spectra) == 600 s = Spectra(filepaths=[data_dir / "Blank1A.mzML"]) @@ -70,3 +87,100 @@ def test_configure_retention_time_minute_to_hour(): s._configure_retention_time(0.0, "hour") result = s._configure_retention_time(60.0, "minute") assert result == pytest.approx(1.0) + + +def test_match_peaks_all_matched(): + """ + All self peaks have a counterpart in other_spectrum within the ppm window. + No unmatched rows should appear on either side. + """ + spec = _make_spectrum(mz=[100.0, 200.0], intensity=[0.5, 0.8]) + other = np.array([[100.0005, 0.6], [200.001, 0.9]]) # both within 20 ppm + + matches = spec._match_peaks(other, ppm_error=20) + + expected = np.array([ + [100.0, 0.5, 100.0005, 0.6], + [200.0, 0.8, 200.001, 0.9], + ]) + assert matches.shape == (2, 4) + assert np.allclose(matches, expected) + + +def test_match_peaks_partial_match(): + spec = _make_spectrum(mz=[100.0, 200.0, 300.0], intensity=[0.5, 0.8, 0.3]) + other = np.array([[100.0, 0.6], [500.0, 0.7]]) + + matches = spec._match_peaks(other, ppm_error=10) + + expected = np.array([ + [100.0, 0.5, 100.0, 0.6], + [200.0, 0.8, 0.0, 0.0], + [300.0, 0.3, 0.0, 0.0], + [0.0, 0.0, 500.0, 0.7], + ]) + assert matches.shape == (4, 4) + assert np.allclose(matches, expected) + + +def test_match_peaks_no_match(): + spec = _make_spectrum(mz=[100.0], intensity=[0.5]) + other = np.array([[300.0, 0.8]]) + + matches = spec._match_peaks(other, ppm_error=10) + + expected = np.array([ + [100.0, 0.5, 0.0, 0.0], + [0.0, 0.0, 300.0, 0.8], + ]) + assert matches.shape == (2, 4) + assert np.allclose(matches, expected) + + +def test_match_peaks_closest_chosen(): + spec = _make_spectrum(mz=[100.0], intensity=[0.5]) + other = np.array([[99.999, 0.4], [100.0005, 0.6]]) + + matches = spec._match_peaks(other, ppm_error=20) + + # 99.999 is closer to 100.0 than 100.0005 + expected = np.array([ + [100.0, 0.5, 100.0005, 0.6], + [0.0, 0.0, 99.999, 0.4], + ]) + assert matches.shape == (2, 4) + assert np.allclose(matches, expected) + + +def test_match_peaks_abs_tol(): + spec = _make_spectrum(mz=[100.0], intensity=[0.5]) + other = np.array([[100.005, 0.6]]) + + without_abs_tol = spec._match_peaks(other, ppm_error=10, abs_tol=0) + with_abs_tol = spec._match_peaks(other, ppm_error=10, abs_tol=0.005) + + assert np.allclose(without_abs_tol, [[100.0, 0.5, 0.0, 0.0], [0.0, 0.0, 100.005, 0.6]]) + assert np.allclose(with_abs_tol, [[100.0, 0.5, 100.005, 0.6]]) + + + +def test_compare_spectra_empty_other(): + spec = _make_spectrum(mz=[100.0], intensity=[0.5]) + result = spec.compare_spectra(np.empty((0, 2)), ppm_error=10, function=np.dot) + assert result == 0.0 + + +def test_compare_spectra_matched_peaks(): + spec = _make_spectrum(mz=[100.0, 200.0], intensity=[0.5, 0.8]) + other = np.array([[100.0, 0.6], [200.0, 0.9]]) + + result = spec.compare_spectra(other, ppm_error=10, function=np.dot) + assert result == pytest.approx(1.02) + + +def test_compare_spectra_no_match(): + spec = _make_spectrum(mz=[100.0], intensity=[0.5]) + other = np.array([[300.0, 0.8]]) + + result = spec.compare_spectra(other, ppm_error=10, function=np.dot) + assert result == pytest.approx(0.0)