Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 13 additions & 64 deletions crates/sage-cli/src/output.rs
Original file line number Diff line number Diff line change
@@ -1,54 +1,31 @@
use rayon::prelude::*;
use sage_core::spectrum::MS1Spectra;
use sage_core::spectrum::ProcessedSpectrum;
use sage_core::{scoring::Feature, tmt::TmtQuant};

#[derive(Default)]
pub struct SageResults {
pub ms1: MS1Spectra,
pub ms1: Vec<ProcessedSpectrum>,
pub features: Vec<Feature>,
pub quant: Vec<TmtQuant>,
}

impl SageResults {
fn fold(mut self, other: SageResults) -> Self {
self.ms1.extend(other.ms1);
self.features.extend(other.features);
self.quant.extend(other.quant);
self
}
}

impl FromParallelIterator<SageResults> for SageResults {
fn from_par_iter<I>(par_iter: I) -> Self
where
I: IntoParallelIterator<Item = SageResults>,
{
par_iter
.into_par_iter()
.reduce(SageResults::default, |mut acc, x| {
acc.features.extend(x.features);
acc.quant.extend(x.quant);
match (acc.ms1, x.ms1) {
(MS1Spectra::NoMobility(mut a), MS1Spectra::NoMobility(b)) => {
a.extend(b);
acc.ms1 = MS1Spectra::NoMobility(a);
}
(MS1Spectra::WithMobility(mut a), MS1Spectra::WithMobility(b)) => {
a.extend(b);
acc.ms1 = MS1Spectra::WithMobility(a);
}
(MS1Spectra::Empty, MS1Spectra::Empty) => {
acc.ms1 = MS1Spectra::Empty;
}
(MS1Spectra::Empty, MS1Spectra::WithMobility(a))
| (MS1Spectra::WithMobility(a), MS1Spectra::Empty) => {
acc.ms1 = MS1Spectra::WithMobility(a);
}
(MS1Spectra::Empty, MS1Spectra::NoMobility(a))
| (MS1Spectra::NoMobility(a), MS1Spectra::Empty) => {
acc.ms1 = MS1Spectra::NoMobility(a);
}
_ => {
// In theory this can happen if someone is searching
// together files of different types, mixing the ones
// that support IMS and the ones that dont.
// ... I dont think this should be run-time recoverable.
unreachable!("Found combination of MS1 spectra with and without mobility.")
}
};
acc
})
.reduce(SageResults::default, SageResults::fold)
}
}

Expand All @@ -59,34 +36,6 @@ impl FromIterator<SageResults> for SageResults {
{
par_iter
.into_iter()
.fold(SageResults::default(), |mut acc, x| {
acc.features.extend(x.features);
acc.quant.extend(x.quant);
match (acc.ms1, x.ms1) {
(MS1Spectra::NoMobility(mut a), MS1Spectra::NoMobility(b)) => {
a.extend(b);
acc.ms1 = MS1Spectra::NoMobility(a);
}
(MS1Spectra::WithMobility(mut a), MS1Spectra::WithMobility(b)) => {
a.extend(b);
acc.ms1 = MS1Spectra::WithMobility(a);
}
(MS1Spectra::Empty, MS1Spectra::Empty) => {
acc.ms1 = MS1Spectra::Empty;
}
(MS1Spectra::Empty, MS1Spectra::WithMobility(a))
| (MS1Spectra::WithMobility(a), MS1Spectra::Empty) => {
acc.ms1 = MS1Spectra::WithMobility(a);
}
(MS1Spectra::Empty, MS1Spectra::NoMobility(a))
| (MS1Spectra::NoMobility(a), MS1Spectra::Empty) => {
acc.ms1 = MS1Spectra::NoMobility(a);
}
_ => {
unreachable!("Found combination of MS1 spectra with and without mobility.")
}
};
acc
})
.fold(SageResults::default(), SageResults::fold)
}
}
48 changes: 18 additions & 30 deletions crates/sage-cli/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use sage_core::mass::Tolerance;
use sage_core::peptide::Peptide;
use sage_core::scoring::Fragments;
use sage_core::scoring::{Feature, Scorer};
use sage_core::spectrum::{MS1Spectra, ProcessedSpectrum, RawSpectrum, SpectrumProcessor};
use sage_core::spectrum::{ProcessedSpectrum, RawSpectrum, SpectrumProcessor};
use sage_core::tmt::TmtQuant;
use std::collections::{HashMap, HashSet};
use std::time::Instant;
Expand Down Expand Up @@ -140,7 +140,7 @@ impl Runner {
}

pub fn prefilter_peptides(self, parallel: usize, fasta: Fasta) -> Vec<Peptide> {
let spectra: Option<Vec<ProcessedSpectrum<_>>> =
let spectra: Option<Vec<ProcessedSpectrum>> =
match parallel >= self.parameters.mzml_paths.len() {
true => Some(
self.read_processed_spectra(&self.parameters.mzml_paths, 0, 0)
Expand Down Expand Up @@ -241,7 +241,7 @@ impl Runner {
fn peptide_filter_processed_spectra(
&self,
scorer: &Scorer,
spectra: &Vec<ProcessedSpectrum<sage_core::spectrum::Peak>>,
spectra: &[ProcessedSpectrum],
keep: &[std::sync::atomic::AtomicBool],
) {
use std::sync::atomic::{AtomicUsize, Ordering};
Expand All @@ -250,7 +250,7 @@ impl Runner {

spectra
.par_iter()
.filter(|spec| spec.peaks.len() >= self.parameters.min_peaks && spec.level == 2)
.filter(|spec| spec.masses.len() >= self.parameters.min_peaks && spec.level == 2)
.for_each(|spectrum| {
let prev = counter.fetch_add(1, Ordering::Relaxed);
if prev > 0 && prev % 10_000 == 0 {
Expand Down Expand Up @@ -301,15 +301,15 @@ impl Runner {
fn search_processed_spectra(
&self,
scorer: &Scorer,
msn_spectra: &Vec<ProcessedSpectrum<sage_core::spectrum::Peak>>,
msn_spectra: &[ProcessedSpectrum],
) -> Vec<Feature> {
use std::sync::atomic::{AtomicUsize, Ordering};
let counter = AtomicUsize::new(0);
let start = Instant::now();

let features: Vec<_> = msn_spectra
.par_iter()
.filter(|spec| spec.peaks.len() >= self.parameters.min_peaks && spec.level == 2)
.filter(|spec| spec.masses.len() >= self.parameters.min_peaks && spec.level == 2)
.map(|x| {
let prev = counter.fetch_add(1, Ordering::Relaxed);
if prev > 0 && prev % 10_000 == 0 {
Expand All @@ -332,8 +332,8 @@ impl Runner {

fn complete_features(
&self,
msn_spectra: Vec<ProcessedSpectrum<sage_core::spectrum::Peak>>,
ms1_spectra: MS1Spectra,
msn_spectra: Vec<ProcessedSpectrum>,
ms1_spectra: Vec<ProcessedSpectrum>,
features: Vec<Feature>,
) -> SageResults {
let quant = self
Expand Down Expand Up @@ -378,10 +378,7 @@ impl Runner {
chunk: &[Url],
chunk_idx: usize,
batch_size: usize,
) -> (
MS1Spectra,
Vec<ProcessedSpectrum<sage_core::spectrum::Peak>>,
) {
) -> (Vec<ProcessedSpectrum>, Vec<ProcessedSpectrum>) {
// Read all of the spectra at once - this can help prevent memory over-consumption issues
info!(
"processing files {} .. {} ",
Expand Down Expand Up @@ -462,26 +459,17 @@ impl Runner {
.map(|s| sp.process(s))
.collect::<Vec<_>>();

// If all the MS1 spectra contain IMS, then we can process them
// we use the IMS! otherwise we dont.
// Note: Empty iterators return true.
let all_contain_ims = spectra.ms1.iter().all(|x| x.mobility.is_some());
let ms1_empty = spectra.ms1.is_empty();
let ms1_spectra = if ms1_empty {
let has_ims = spectra.ms1.iter().any(|x| x.mobility.is_some());
let ms1_spectra = if spectra.ms1.is_empty() {
log::trace!("no MS1 spectra found");
MS1Spectra::Empty
} else if all_contain_ims {
log::trace!("Processing MS1 spectra with IMS");
let spectra = spectra
.ms1
.into_iter()
.map(|x| sp.process_with_mobility(x))
.collect();
MS1Spectra::WithMobility(spectra)
Vec::new()
} else {
log::trace!("Processing MS1 spectra without IMS");
let spectra = spectra.ms1.into_iter().map(|s| sp.process(s)).collect();
MS1Spectra::NoMobility(spectra)
if has_ims {
log::trace!("Processing MS1 spectra with IMS columns");
} else {
log::trace!("Processing MS1 spectra without IMS");
}
spectra.ms1.into_par_iter().map(|s| sp.process(s)).collect()
};

let io_time = Instant::now() - start;
Expand Down
2 changes: 1 addition & 1 deletion crates/sage-cli/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ fn integration() -> anyhow::Result<()> {

let sp = SpectrumProcessor::new(100, true, 0.0);
let processed = sp.process(spectra[0].clone());
assert!(processed.peaks.len() <= 300);
assert!(processed.masses.len() <= 300);

let scorer = Scorer {
db: &database,
Expand Down
110 changes: 48 additions & 62 deletions crates/sage/src/lfq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::database::{binary_search_slice, IndexedDatabase, PeptideIx};
use crate::mass::{composition, Composition, Tolerance, NEUTRON};
use crate::ml::{matrix::Matrix, retention_alignment::Alignment};
use crate::scoring::Feature;
use crate::spectrum::MS1Spectra;
use crate::spectrum::ProcessedSpectrum;
use dashmap::DashMap;
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -226,80 +226,66 @@ impl FeatureMap {
pub fn quantify(
&self,
db: &IndexedDatabase,
spectra: &MS1Spectra,
spectra: &[ProcessedSpectrum],
alignments: &[Alignment],
) -> HashMap<(PrecursorId, bool), (Peak, Vec<f64>), fnv::FnvBuildHasher> {
let scores: DashMap<(PrecursorId, bool), Grid, fnv::FnvBuildHasher> = DashMap::default();

log::info!("tracing MS1 features");

// TODO: find a good way to abstract this ... I think a macro would be the way to go.
match spectra {
MS1Spectra::NoMobility(spectra) => spectra.par_iter().for_each(|spectrum| {
if spectra.is_empty() {
log::warn!("no MS1 spectra found for quantification");
} else {
spectra.par_iter().for_each(|spectrum| {
let a = alignments[spectrum.file_id];
let rt = (spectrum.scan_start_time / a.max_rt) * a.slope + a.intercept;
let query = self.rt_slice(rt, RT_TOL);

for peak in &spectrum.peaks {
for entry in query.mass_lookup(peak.mass) {
let id = match self.settings.combine_charge_states {
true => PrecursorId::Combined(entry.peptide),
false => PrecursorId::Charged((entry.peptide, entry.charge)),
};

let mut grid = scores.entry((id, entry.decoy)).or_insert_with(|| {
let p = &db[entry.peptide];
let composition = p
.sequence
.iter()
.map(|r| composition(*r))
.sum::<Composition>();
let dist = crate::isotopes::peptide_isotopes(
composition.carbon,
composition.sulfur,
);
Grid::new(entry, RT_TOL, dist, alignments.len(), GRID_SIZE)
});

grid.add_entry(rt, entry.isotope, spectrum.file_id, peak.intensity);
}
}
}),
MS1Spectra::WithMobility(spectra) => spectra.par_iter().for_each(|spectrum| {
let a = alignments[spectrum.file_id];
let rt = (spectrum.scan_start_time / a.max_rt) * a.slope + a.intercept;
let query = self.rt_slice(rt, RT_TOL);
let add_entry = |entry: &PrecursorRange, intensity: f32| {
let id = match self.settings.combine_charge_states {
true => PrecursorId::Combined(entry.peptide),
false => PrecursorId::Charged((entry.peptide, entry.charge)),
};

for peak in &spectrum.peaks {
for entry in query.mass_mobility_lookup(peak.mass, peak.mobility) {
let id = match self.settings.combine_charge_states {
true => PrecursorId::Combined(entry.peptide),
false => PrecursorId::Charged((entry.peptide, entry.charge)),
};

let mut grid = scores.entry((id, entry.decoy)).or_insert_with(|| {
let p = &db[entry.peptide];
let composition = p
.sequence
.iter()
.map(|r| composition(*r))
.sum::<Composition>();
let dist = crate::isotopes::peptide_isotopes(
composition.carbon,
composition.sulfur,
);
Grid::new(entry, RT_TOL, dist, alignments.len(), GRID_SIZE)
});

grid.add_entry(rt, entry.isotope, spectrum.file_id, peak.intensity);
let mut grid = scores.entry((id, entry.decoy)).or_insert_with(|| {
let p = &db[entry.peptide];
let composition = p
.sequence
.iter()
.map(|r| composition(*r))
.sum::<Composition>();
let dist = crate::isotopes::peptide_isotopes(
composition.carbon,
composition.sulfur,
);
Grid::new(entry, RT_TOL, dist, alignments.len(), GRID_SIZE)
});

grid.add_entry(rt, entry.isotope, spectrum.file_id, intensity);
};

if spectrum.mobilities.is_empty() {
for (&mass, &intensity) in
spectrum.masses.iter().zip(spectrum.intensities.iter())
{
for entry in query.mass_lookup(mass) {
add_entry(entry, intensity);
}
}
} else {
for ((&mass, &intensity), &mobility) in spectrum
.masses
.iter()
.zip(spectrum.intensities.iter())
.zip(spectrum.mobilities.iter())
{
for entry in query.mass_mobility_lookup(mass, mobility) {
add_entry(entry, intensity);
}
}
}
}),
MS1Spectra::Empty => {
// Should never be called if no MS1 spectra are present
log::warn!("no MS1 spectra found for quantification");
}
};
});
}

log::info!("integrating MS1 features");

Expand Down
Loading
Loading