21 changes: 16 additions & 5 deletions R/backends.R
@@ -95,13 +95,25 @@ MSstatsPreprocessBigArrow <- function(input_file,
anomalyModelFeatures = c()) {
input <- arrow::open_dataset(input_file, format = "csv")

# dynamically cast any columns Arrow inferred as 'null'
null_columns <- names(input$schema)[vapply(input$schema$fields, function(f) f$type$ToString() == "null", logical(1))]
if (length(null_columns) > 0) {
for (col in null_columns) {
input <- dplyr::mutate(input, !!rlang::sym(col) := as.numeric(!!rlang::sym(col)))
}
}

if ("PrecursorMz" %in% input$schema$names) {
input <- dplyr::mutate(input, PrecursorMz = as.numeric(PrecursorMz))
}

input <- dplyr::mutate(input,
Feature = paste(PeptideSequence, PrecursorCharge,
FragmentIon, ProductCharge, sep = "_"))
feature_counts <- dplyr::group_by(input, ProteinName, Feature)
feature_counts <- dplyr::summarize(feature_counts,
MeanAbundance = mean(Intensity,
na.rm <- TRUE))
na.rm = TRUE))
feature_counts <- dplyr::collect(feature_counts)

feature_counts <- dplyr::mutate(
@@ -112,11 +124,11 @@ MSstatsPreprocessBigArrow <- function(input_file,
feature_rank <= max_feature_count)

feature_counts <- dplyr::select(feature_counts, -MeanAbundance, -feature_rank)
input <- dplyr::inner_join(input, feature_counts,
input <- dplyr::inner_join(input, feature_counts,
by = c("ProteinName", "Feature"))
input <- dplyr::select(input, -Feature)

arrow::write_csv_arrow(input, file = paste0("topN_", output_file_name))
arrow::write_dataset(input, .prefixedPath("topN_", output_file_name), format = "csv")

if (filter_unique_peptides) {
pp_df <- dplyr::select(input, ProteinName, PeptideSequence)
@@ -126,7 +138,6 @@ MSstatsPreprocessBigArrow <- function(input_file,
pp_df <- dplyr::select(pp_df, -NumProteins)
input <- dplyr::anti_join(input, pp_df, by = "PeptideSequence")
}

if (aggregate_psms) {
group_cols <- c("ProteinName", "PeptideSequence", "PrecursorCharge",
"FragmentIon", "ProductCharge", "IsotopeLabelType", "Run",
@@ -158,7 +169,7 @@ MSstatsPreprocessBigArrow <- function(input_file,
input <- dplyr::select(input, -Feature)
}

arrow::write_csv_arrow(input, file = output_file_name)
arrow::write_dataset(input, output_file_name, format = "csv")
input
}

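Not part of the patch: a minimal sketch of the null-column cast added above, assuming Arrow's lazy CSV reader types an all-missing column as 'null' (its default null values include "" and "NA").

library(arrow)
library(dplyr)

# Tiny CSV whose PrecursorMz column is entirely missing, so Arrow infers
# the 'null' type when the dataset is opened lazily.
tmp <- tempfile(fileext = ".csv")
write.csv(data.frame(ProteinName = c("P1", "P2"), PrecursorMz = c(NA, NA)),
          tmp, row.names = FALSE)

ds <- open_dataset(tmp, format = "csv")
null_cols <- names(ds$schema)[
  vapply(ds$schema$fields, function(f) f$type$ToString() == "null", logical(1))
]
# Cast every null-typed column to double, mirroring the loop in the diff.
for (col in null_cols) {
  ds <- mutate(ds, !!rlang::sym(col) := as.numeric(!!rlang::sym(col)))
}
collect(ds)  # PrecursorMz comes back as a double column of NAs
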
55 changes: 42 additions & 13 deletions R/clean_DIANN.R
@@ -17,26 +17,55 @@ reduceBigDIANN <- function(input_file, output_path, MBR = TRUE,
global_qvalue_cutoff = 0.01,
qvalue_cutoff = 0.01,
pg_qvalue_cutoff = 0.01,
calculateAnomalyScores=FALSE,
calculateAnomalyScores=FALSE,
anomalyModelFeatures=c(),
annotation = NULL) {
if (grepl("csv", input_file)) {
delim = ","
} else if (grepl("tsv|xls", input_file)) {
delim = "\t"
} else {
delim <- ";"
}

diann_chunk <- function(x, pos) cleanDIANNChunk(x, output_path, MBR,
# Per-chunk callback shared by both the parquet and delimited-text paths.
# `pos` drives .writeChunkToFile: pos == 1 overwrites, pos > 1 appends.
diann_chunk <- function(x, pos) cleanDIANNChunk(x, output_path, MBR,
quantificationColumn, pos,
global_qvalue_cutoff,
qvalue_cutoff,
pg_qvalue_cutoff,
global_qvalue_cutoff,
qvalue_cutoff,
pg_qvalue_cutoff,
calculateAnomalyScores,
anomalyModelFeatures,
annotation)

# Parquet branch (DIANN 2.0+): stream record batches via arrow so the file
# is never fully materialised. read_delim_chunked can't read parquet bytes.
if (tolower(tools::file_ext(input_file)) == "parquet") {
# Lazy handle to the parquet file — no data loaded yet.
ds <- arrow::open_dataset(input_file, format = "parquet")
# Scanner + RecordBatchReader yields one batch at a time on demand.
# batch_size matches the delimited-text path's 1M-row chunks; row-group
# boundaries in the parquet may cap individual batches below this.
scanner <- arrow::Scanner$create(ds, batch_size = 1e6)
reader <- scanner$ToRecordBatchReader()
pos <- 1
repeat {
batch <- reader$read_next_batch()
if (is.null(batch)) break # exhausted
# Materialise just this batch, run it through the shared cleaner.
diann_chunk(as.data.frame(batch), pos)
pos <- pos + 1
}
return(invisible(NULL))
}

# Delimited-text branch (DIANN 1.x TSV/CSV): sniff the delimiter from the
# first line, defaulting to tab when nothing matches.
first_line <- readLines(input_file, n = 1)
if (grepl("\t", first_line)) {
delim <- "\t"
} else if (grepl(",", first_line)) {
delim <- ","
} else if (grepl(";", first_line)) {
delim <- ";"
} else {
delim <- "\t"
}

# Stream the file in 1M-row chunks, invoking diann_chunk for each.
readr::read_delim_chunked(input_file,
readr::DataFrameCallback$new(diann_chunk),
delim = delim,
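Not part of the patch: the comment above notes that `pos` drives .writeChunkToFile (the first chunk overwrites, later chunks append). A hedged sketch of that pattern with a hypothetical write_chunk stand-in, since the body of .writeChunkToFile is outside this diff.

library(readr)

# Hypothetical stand-in for .writeChunkToFile: chunk 1 creates the file with
# a header, later chunks append rows without repeating it.
write_chunk <- function(chunk, output_path, pos) {
  write_csv(chunk, output_path, append = pos > 1, col_names = pos == 1)
}

callback <- DataFrameCallback$new(function(x, pos) {
  write_chunk(x, "reduced_report.csv", pos)
})

# Stream a DIANN report in 1M-row chunks, as in the delimited-text branch.
read_delim_chunked("report.tsv", callback, delim = "\t", chunk_size = 1e6)
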
107 changes: 86 additions & 21 deletions R/converters.R
@@ -62,7 +62,7 @@ MSstatsPreprocessBig <- function(input_file,
calculateAnomalyScores,
anomalyModelFeatures)
} else if (backend == "sparklyr") {
MSstatsPreprocessBigSparklyr(connection, input, output_file_name,
MSstatsPreprocessBigSparklyr(connection, input_file, output_file_name,
max_feature_count, filter_unique_peptides,
aggregate_psms, filter_few_obs,
remove_annotation)
@@ -96,10 +96,16 @@ bigFragPipetoMSstatsFormat <- function(input_file, output_file_name,
filter_few_obs = FALSE,
remove_annotation = FALSE,
connection = NULL) {
MSstatsPreprocessBig(input_file, output_file_name,
backend, max_feature_count, filter_unique_peptides,
aggregate_psms, filter_few_obs, remove_annotation,
connection = connection)
MSstatsPreprocessBig(
input_file = input_file,
output_file_name = output_file_name,
backend = backend,
max_feature_count = max_feature_count,
filter_unique_peptides = filter_unique_peptides,
aggregate_psms = aggregate_psms,
filter_few_obs = filter_few_obs,
remove_annotation = remove_annotation,
connection = connection)
}


@@ -140,15 +146,23 @@ bigSpectronauttoMSstatsFormat <- function(input_file, output_file_name,
calculateAnomalyScores=FALSE,
anomalyModelFeatures=c(),
connection = NULL) {
reduceBigSpectronaut(input_file, paste0("reduce_output_", output_file_name),
reduced_file <- .prefixedPath("reduce_output_", output_file_name)
reduceBigSpectronaut(input_file, reduced_file,
intensity, filter_by_excluded, filter_by_identified,
filter_by_qvalue, qvalue_cutoff,
calculateAnomalyScores, anomalyModelFeatures)
msstats_data <- MSstatsPreprocessBig(
paste0("reduce_output_", output_file_name),
output_file_name, backend, max_feature_count,
aggregate_psms, filter_few_obs, remove_annotation, calculateAnomalyScores,
anomalyModelFeatures, connection)
input_file = reduced_file,
output_file_name = output_file_name,
backend = backend,
max_feature_count = max_feature_count,
filter_unique_peptides = filter_unique_peptides,
aggregate_psms = aggregate_psms,
filter_few_obs = filter_few_obs,
remove_annotation = remove_annotation,
calculateAnomalyScores = calculateAnomalyScores,
anomalyModelFeatures = anomalyModelFeatures,
connection = connection)

return(msstats_data)

@@ -184,22 +198,59 @@ bigDIANNtoMSstatsFormat <- function(input_file,
connection = NULL) {

# Reduce and clean the DIANN report file in chunks
reduceBigDIANN(input_file,
paste0("reduce_output_", output_file_name),
reduced_file <- .prefixedPath("reduce_output_", output_file_name)
reduceBigDIANN(input_file,
reduced_file,
MBR,
quantificationColumn,
global_qvalue_cutoff, qvalue_cutoff, pg_qvalue_cutoff,
global_qvalue_cutoff, qvalue_cutoff, pg_qvalue_cutoff,
calculateAnomalyScores, anomalyModelFeatures,
annotation)


reduced <- arrow::open_dataset(reduced_file, format = "csv")

# Identify columns where Arrow inferred 'null' type (all values NA)
null_cols <- names(reduced$schema)[
vapply(reduced$schema$fields, function(f) f$type$ToString() == "null", logical(1))
]

if (length(null_cols) > 0) {
# Drop null-typed columns using a lazy select (no data loaded into memory)
reduced <- dplyr::select(reduced, -dplyr::all_of(null_cols))

# Write back using Arrow's streaming writer — stays out-of-memory.
# write_dataset creates a directory, but open_dataset can read
# directories just as easily as single files.
cleaned_file <- .prefixedPath("cleaned_", output_file_name)
arrow::write_dataset(reduced, cleaned_file, format = "csv")
reduced_file <- cleaned_file
}

# Preprocess the cleaned data (feature selection, etc.)
msstats_data <- MSstatsPreprocessBig(
paste0("reduce_output_", output_file_name),
output_file_name, backend, max_feature_count,
filter_unique_peptides, aggregate_psms, filter_few_obs,
remove_annotation, calculateAnomalyScores,
anomalyModelFeatures, connection)

input_file = reduced_file,
output_file_name = output_file_name,
backend = backend,
max_feature_count = max_feature_count,
filter_unique_peptides = filter_unique_peptides,
aggregate_psms = aggregate_psms,
filter_few_obs = filter_few_obs,
remove_annotation = remove_annotation,
calculateAnomalyScores = calculateAnomalyScores,
anomalyModelFeatures = anomalyModelFeatures,
connection = connection)

# Merge annotation with the preprocessed data and persist the merge so
# callers reopening output_file_name see Condition/BioReplicate. The arrow
# rewrite stays lazy — the underlying source is reduced_file, not
# output_file_name, so we can safely overwrite the directory we just wrote.
if (!is.null(annotation)) {
msstats_data <- MSstatsAddAnnotationBig(msstats_data, annotation)
if (backend == "arrow") {
unlink(output_file_name, recursive = TRUE, force = TRUE)
arrow::write_dataset(msstats_data, output_file_name, format = "csv")
}
}
return(msstats_data)
}
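Not part of the patch: the comments above rely on arrow::write_dataset() writing a directory of part files that arrow::open_dataset() can read back. A quick check of that round trip (part-file names are arrow defaults and may differ by version):

library(arrow)
library(dplyr)

dir_out <- file.path(tempdir(), "cleaned_example.csv")  # a directory, despite the .csv name
write_dataset(mtcars, dir_out, format = "csv")
list.files(dir_out)                          # e.g. "part-0.csv"
ds <- open_dataset(dir_out, format = "csv")
nrow(collect(ds))                            # 32
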

@@ -232,5 +283,19 @@ bigDIANNtoMSstatsFormat <- function(input_file,
#' @return table of `input` and `annotation` merged by Run column.
#'
MSstatsAddAnnotationBig <- function(input, annotation) {
dplyr::inner_join(input, annotation, by = "Run")
join_keys <- "Run"

# Use tbl_vars(), which works reliably on Arrow datasets,
# arrow_dplyr_query objects, and plain data frames
input_cols <- dplyr::tbl_vars(input)

overlap_cols <- setdiff(
intersect(input_cols, colnames(annotation)),
join_keys
)
if (length(overlap_cols) > 0) {
input <- dplyr::select(input, -dplyr::all_of(overlap_cols))
}

dplyr::inner_join(input, annotation, by = join_keys)
}
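Not part of the patch: a small illustration, with made-up column names, of why MSstatsAddAnnotationBig drops overlapping non-key columns before joining. Without the select(), inner_join() would keep both copies as Condition.x / Condition.y instead of taking the annotation's values.

library(dplyr)

input <- data.frame(Run = c("r1", "r2"), Intensity = c(10, 20),
                    Condition = c("stale1", "stale2"))
annotation <- data.frame(Run = c("r1", "r2"), Condition = c("A", "B"),
                         BioReplicate = c(1, 2))

overlap <- setdiff(intersect(names(input), names(annotation)), "Run")
inner_join(select(input, -all_of(overlap)), annotation, by = "Run")
#   Run Intensity Condition BioReplicate: a single Condition column, taken from annotation
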
18 changes: 17 additions & 1 deletion R/utils.R
@@ -1,5 +1,21 @@
#' Build an intermediate output path by prefixing only the basename.
#'
#' Naive `paste0(prefix, output_file_name)` corrupts paths that contain a
#' directory (`subdir/out.csv` → `topN_subdir/out.csv`,
#' `/tmp/out.csv` → `topN_/tmp/out.csv`). Splitting via dirname/basename keeps
#' the directory component intact so intermediate files land beside the final
#' output.
#'
#' @param prefix Character scalar prepended to the basename.
#' @param path Output file path supplied by the caller.
#' @return Character scalar.
#' @keywords internal
.prefixedPath <- function(prefix, path) {
file.path(dirname(path), paste0(prefix, basename(path)))
}

#' Write chunk to file
#'
#'
#' @param input Data frame
#' @param output_path Path to output file
#' @param pos Chunk position
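Not part of the patch: what the new helper returns for a few representative paths, matching the basename-only prefixing described in its docs.

.prefixedPath("topN_", "out.csv")          # "./topN_out.csv"
.prefixedPath("topN_", "subdir/out.csv")   # "subdir/topN_out.csv"
.prefixedPath("topN_", "/tmp/out.csv")     # "/tmp/topN_out.csv"
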
39 changes: 0 additions & 39 deletions tests/testthat/test-clean_DIANN.R

This file was deleted.
