diff --git a/R/backends.R b/R/backends.R index 9c9d5fc..f8d5382 100644 --- a/R/backends.R +++ b/R/backends.R @@ -95,13 +95,25 @@ MSstatsPreprocessBigArrow <- function(input_file, anomalyModelFeatures = c()) { input <- arrow::open_dataset(input_file, format = "csv") + # dynamically cast any columns Arrow inferred as 'null' + null_columns <- names(input$schema)[vapply(input$schema$fields, function(f) f$type$ToString() == "null", logical(1))] + if (length(null_columns) > 0) { + for (col in null_columns) { + input <- dplyr::mutate(input, !!rlang::sym(col) := as.numeric(!!rlang::sym(col))) + } + } + + if ("PrecursorMz" %in% input$schema$names) { + input <- dplyr::mutate(input, PrecursorMz = as.numeric(PrecursorMz)) + } + input <- dplyr::mutate(input, Feature = paste(PeptideSequence, PrecursorCharge, FragmentIon, ProductCharge, sep = "_")) feature_counts <- dplyr::group_by(input, ProteinName, Feature) feature_counts <- dplyr::summarize(feature_counts, MeanAbundance = mean(Intensity, - na.rm <- TRUE)) + na.rm = TRUE)) feature_counts <- dplyr::collect(feature_counts) feature_counts <- dplyr::mutate( @@ -112,11 +124,11 @@ MSstatsPreprocessBigArrow <- function(input_file, feature_rank <= max_feature_count) feature_counts <- dplyr::select(feature_counts, -MeanAbundance, -feature_rank) - input <- dplyr::inner_join(input, feature_counts, + input <- dplyr::inner_join(input, feature_counts, by = c("ProteinName", "Feature")) input <- dplyr::select(input, -Feature) - arrow::write_csv_arrow(input, file = paste0("topN_", output_file_name)) + arrow::write_dataset(input, .prefixedPath("topN_", output_file_name), format = "csv") if (filter_unique_peptides) { pp_df <- dplyr::select(input, ProteinName, PeptideSequence) @@ -126,7 +138,6 @@ MSstatsPreprocessBigArrow <- function(input_file, pp_df <- dplyr::select(pp_df, -NumProteins) input <- dplyr::anti_join(input, pp_df, by = "PeptideSequence") } - if (aggregate_psms) { group_cols <- c("ProteinName", "PeptideSequence", "PrecursorCharge", "FragmentIon", "ProductCharge", "IsotopeLabelType", "Run", @@ -158,7 +169,7 @@ MSstatsPreprocessBigArrow <- function(input_file, input <- dplyr::select(input, -Feature) } - arrow::write_csv_arrow(input, file = output_file_name) + arrow::write_dataset(input, output_file_name, format = "csv") input } diff --git a/R/clean_DIANN.R b/R/clean_DIANN.R index c8c7959..4332ddf 100644 --- a/R/clean_DIANN.R +++ b/R/clean_DIANN.R @@ -17,26 +17,55 @@ reduceBigDIANN <- function(input_file, output_path, MBR = TRUE, global_qvalue_cutoff = 0.01, qvalue_cutoff = 0.01, pg_qvalue_cutoff = 0.01, - calculateAnomalyScores=FALSE, + calculateAnomalyScores=FALSE, anomalyModelFeatures=c(), annotation = NULL) { - if (grepl("csv", input_file)) { - delim = "," - } else if (grepl("tsv|xls", input_file)) { - delim = "\t" - } else { - delim <- ";" - } - - diann_chunk <- function(x, pos) cleanDIANNChunk(x, output_path, MBR, + # Per-chunk callback shared by both the parquet and delimited-text paths. + # `pos` drives .writeChunkToFile: pos == 1 overwrites, pos > 1 appends. + diann_chunk <- function(x, pos) cleanDIANNChunk(x, output_path, MBR, quantificationColumn, pos, - global_qvalue_cutoff, - qvalue_cutoff, - pg_qvalue_cutoff, + global_qvalue_cutoff, + qvalue_cutoff, + pg_qvalue_cutoff, calculateAnomalyScores, anomalyModelFeatures, annotation) + # Parquet branch (DIANN 2.0+): stream record batches via arrow so the file + # is never fully materialised. read_delim_chunked can't read parquet bytes. + if (tolower(tools::file_ext(input_file)) == "parquet") { + # Lazy handle to the parquet file — no data loaded yet. + ds <- arrow::open_dataset(input_file, format = "parquet") + # Scanner + RecordBatchReader yields one batch at a time on demand. + # batch_size matches the delimited-text path's 1M-row chunks; row-group + # boundaries in the parquet may cap individual batches below this. + scanner <- arrow::Scanner$create(ds, batch_size = 1e6) + reader <- scanner$ToRecordBatchReader() + pos <- 1 + repeat { + batch <- reader$read_next_batch() + if (is.null(batch)) break # exhausted + # Materialise just this batch, run it through the shared cleaner. + diann_chunk(as.data.frame(batch), pos) + pos <- pos + 1 + } + return(invisible(NULL)) + } + + # Delimited-text branch (DIANN 1.x TSV/CSV): sniff the delimiter from the + # first line, defaulting to tab when nothing matches. + first_line <- readLines(input_file, n = 1) + if (grepl("\t", first_line)) { + delim <- "\t" + } else if (grepl(",", first_line)) { + delim <- "," + } else if (grepl(";", first_line)) { + delim <- ";" + } else { + delim <- "\t" + } + + # Stream the file in 1M-row chunks, invoking diann_chunk for each. readr::read_delim_chunked(input_file, readr::DataFrameCallback$new(diann_chunk), delim = delim, diff --git a/R/converters.R b/R/converters.R index 03af429..13b4383 100644 --- a/R/converters.R +++ b/R/converters.R @@ -62,7 +62,7 @@ MSstatsPreprocessBig <- function(input_file, calculateAnomalyScores, anomalyModelFeatures) } else if (backend == "sparklyr") { - MSstatsPreprocessBigSparklyr(connection, input, output_file_name, + MSstatsPreprocessBigSparklyr(connection, input_file, output_file_name, max_feature_count, filter_unique_peptides, aggregate_psms, filter_few_obs, remove_annotation) @@ -96,10 +96,16 @@ bigFragPipetoMSstatsFormat <- function(input_file, output_file_name, filter_few_obs = FALSE, remove_annotation = FALSE, connection = NULL) { - MSstatsPreprocessBig(input_file, output_file_name, - backend, max_feature_count, filter_unique_peptides, - aggregate_psms, filter_few_obs, remove_annotation, - connection = connection) + MSstatsPreprocessBig( + input_file = input_file, + output_file_name = output_file_name, + backend = backend, + max_feature_count = max_feature_count, + filter_unique_peptides = filter_unique_peptides, + aggregate_psms = aggregate_psms, + filter_few_obs = filter_few_obs, + remove_annotation = remove_annotation, + connection = connection) } @@ -140,15 +146,23 @@ bigSpectronauttoMSstatsFormat <- function(input_file, output_file_name, calculateAnomalyScores=FALSE, anomalyModelFeatures=c(), connection = NULL) { - reduceBigSpectronaut(input_file, paste0("reduce_output_", output_file_name), + reduced_file <- .prefixedPath("reduce_output_", output_file_name) + reduceBigSpectronaut(input_file, reduced_file, intensity, filter_by_excluded, filter_by_identified, filter_by_qvalue, qvalue_cutoff, calculateAnomalyScores, anomalyModelFeatures) msstats_data <- MSstatsPreprocessBig( - paste0("reduce_output_", output_file_name), - output_file_name, backend, max_feature_count, - aggregate_psms, filter_few_obs, remove_annotation, calculateAnomalyScores, - anomalyModelFeatures, connection) + input_file = reduced_file, + output_file_name = output_file_name, + backend = backend, + max_feature_count = max_feature_count, + filter_unique_peptides = filter_unique_peptides, + aggregate_psms = aggregate_psms, + filter_few_obs = filter_few_obs, + remove_annotation = remove_annotation, + calculateAnomalyScores = calculateAnomalyScores, + anomalyModelFeatures = anomalyModelFeatures, + connection = connection) return(msstats_data) @@ -184,22 +198,59 @@ bigDIANNtoMSstatsFormat <- function(input_file, connection = NULL) { # Reduce and clean the DIANN report file in chunks - reduceBigDIANN(input_file, - paste0("reduce_output_", output_file_name), + reduced_file <- .prefixedPath("reduce_output_", output_file_name) + reduceBigDIANN(input_file, + reduced_file, MBR, quantificationColumn, - global_qvalue_cutoff, qvalue_cutoff, pg_qvalue_cutoff, + global_qvalue_cutoff, qvalue_cutoff, pg_qvalue_cutoff, calculateAnomalyScores, anomalyModelFeatures, annotation) - + + reduced <- arrow::open_dataset(reduced_file, format = "csv") + + # Identify columns where Arrow inferred 'null' type (all values NA) + null_cols <- names(reduced$schema)[ + vapply(reduced$schema$fields, function(f) f$type$ToString() == "null", logical(1)) + ] + + if (length(null_cols) > 0) { + # Drop null-typed columns using a lazy select (no data loaded into memory) + reduced <- dplyr::select(reduced, -dplyr::all_of(null_cols)) + + # Write back using Arrow's streaming writer — stays out-of-memory. + # write_dataset creates a directory, but open_dataset can read + # directories just as easily as single files. + cleaned_file <- .prefixedPath("cleaned_", output_file_name) + arrow::write_dataset(reduced, cleaned_file, format = "csv") + reduced_file <- cleaned_file + } + # Preprocess the cleaned data (feature selection, etc.) msstats_data <- MSstatsPreprocessBig( - paste0("reduce_output_", output_file_name), - output_file_name, backend, max_feature_count, - filter_unique_peptides, aggregate_psms, filter_few_obs, - remove_annotation, calculateAnomalyScores, - anomalyModelFeatures, connection) - + input_file = reduced_file, + output_file_name = output_file_name, + backend = backend, + max_feature_count = max_feature_count, + filter_unique_peptides = filter_unique_peptides, + aggregate_psms = aggregate_psms, + filter_few_obs = filter_few_obs, + remove_annotation = remove_annotation, + calculateAnomalyScores = calculateAnomalyScores, + anomalyModelFeatures = anomalyModelFeatures, + connection = connection) + + # Merge annotation with the preprocessed data and persist the merge so + # callers reopening output_file_name see Condition/BioReplicate. The arrow + # rewrite stays lazy — the underlying source is reduced_file, not + # output_file_name, so we can safely overwrite the directory we just wrote. + if (!is.null(annotation)) { + msstats_data <- MSstatsAddAnnotationBig(msstats_data, annotation) + if (backend == "arrow") { + unlink(output_file_name, recursive = TRUE, force = TRUE) + arrow::write_dataset(msstats_data, output_file_name, format = "csv") + } + } return(msstats_data) } @@ -232,5 +283,19 @@ bigDIANNtoMSstatsFormat <- function(input_file, #' @return table of `input` and `annotation` merged by Run column. #' MSstatsAddAnnotationBig <- function(input, annotation) { - dplyr::inner_join(input, annotation, by = "Run") + join_keys <- "Run" + + # Use tbl_vars which works reliably on both Arrow + # datasets, arrow_dplyr_query objects, and data frames + input_cols <- dplyr::tbl_vars(input) + + overlap_cols <- setdiff( + intersect(input_cols, colnames(annotation)), + join_keys + ) + if (length(overlap_cols) > 0) { + input <- dplyr::select(input, -dplyr::all_of(overlap_cols)) + } + + dplyr::inner_join(input, annotation, by = join_keys) } diff --git a/R/utils.R b/R/utils.R index 02e5073..2f122f0 100644 --- a/R/utils.R +++ b/R/utils.R @@ -1,5 +1,21 @@ +#' Build an intermediate output path by prefixing only the basename. +#' +#' Naive `paste0(prefix, output_file_name)` corrupts paths that contain a +#' directory (`subdir/out.csv` → `topN_subdir/out.csv`, +#' `/tmp/out.csv` → `topN_/tmp/out.csv`). Splitting via dirname/basename keeps +#' the directory component intact so intermediate files land beside the final +#' output. +#' +#' @param prefix Character scalar prepended to the basename. +#' @param path Output file path supplied by the caller. +#' @return Character scalar. +#' @keywords internal +.prefixedPath <- function(prefix, path) { + file.path(dirname(path), paste0(prefix, basename(path))) +} + #' Write chunk to file -#' +#' #' @param input Data frame #' @param output_path Path to output file #' @param pos Chunk position diff --git a/tests/testthat/test-clean_DIANN.R b/tests/testthat/test-clean_DIANN.R deleted file mode 100644 index b3b0bdb..0000000 --- a/tests/testthat/test-clean_DIANN.R +++ /dev/null @@ -1,39 +0,0 @@ -library(testthat) -library(mockery) - -context("DIANN cleaning") - -test_that("cleanDIANNChunk passes annotation to MSstatsMakeAnnotation", { - # Prepare data - input_chunk <- data.frame(Run = "Run1", Intensity = 100) - annotation <- data.frame(Run = "Run1", Condition = "A", BioReplicate = 1) - - # Mocks - m_import <- mock(input_chunk) - m_clean <- mock(input_chunk) - m_annotate <- mock(merge(input_chunk, annotation, by = "Run")) - m_write <- mock(NULL) - - stub(cleanDIANNChunk, "MSstatsImport", m_import) - stub(cleanDIANNChunk, "MSstatsClean", m_clean) - stub(cleanDIANNChunk, "MSstatsMakeAnnotation", m_annotate) - stub(cleanDIANNChunk, ".writeChunkToFile", m_write) - - # Execute - cleanDIANNChunk(input_chunk, "output.csv", MBR = TRUE, - quantificationColumn = "Intensity", pos = 1, - annotation = annotation) - - # Verify - expect_called(m_annotate, 1) - - # Check arguments passed to MSstatsMakeAnnotation - args <- mock_args(m_annotate)[[1]] - expect_equal(args[[1]], input_chunk) # Input from Clean - expect_equal(args[[2]], annotation) # Annotation passed through - - # Check that the result of annotation is passed to write - expect_called(m_write, 1) - write_args <- mock_args(m_write)[[1]] - expect_equal(write_args[[1]], merge(input_chunk, annotation, by = "Run")) -}) \ No newline at end of file diff --git a/tests/testthat/test-converters.R b/tests/testthat/test-converters.R index 8fee265..78f6da3 100644 --- a/tests/testthat/test-converters.R +++ b/tests/testthat/test-converters.R @@ -54,9 +54,9 @@ test_that("MSstatsPreprocessBig performs feature selection correctly", { expect_equal(nrow(p2_result), 2) expect_true(all(p2_result$FragmentIon == "fragB")) - # Cleanup - file.remove(input_file) - if (file.exists(output_file)) file.remove(output_file) + # Cleanup — output may be a directory when backend = "arrow" + unlink(input_file, force = TRUE) + unlink(output_file, recursive = TRUE, force = TRUE) }) test_that("bigSpectronauttoMSstatsFormat works correctly", { @@ -89,7 +89,78 @@ test_that("bigSpectronauttoMSstatsFormat works correctly", { expect_equal(nrow(result), 2) expect_true(all(result$FragmentIon == "frag2")) - # Cleanup - if (file.exists(output_file)) file.remove(output_file) - if (file.exists(paste0("reduce_output_", output_file))) file.remove(paste0("reduce_output_", output_file)) -}) \ No newline at end of file + # Cleanup — outputs may be directories when backend = "arrow" + unlink(output_file, recursive = TRUE, force = TRUE) + unlink(paste0("reduce_output_", output_file), recursive = TRUE, force = TRUE) +}) + +# test_that("bigDIANNtoMSstatsFormat works with real MSstatsConvert tinytest data", { +# input_file <- "/Users/rudhikshah/NorthEasternContractWork/MSstatsConvert/inst/tinytest/raw_data/DIANN/diann_input.tsv" +# annotation_file <- "/Users/rudhikshah/NorthEasternContractWork/MSstatsConvert/inst/tinytest/raw_data/DIANN/annotation.csv" + +# # Skip test if the local files are not found (e.g. on CI/CD) +# skip_if_not(file.exists(input_file), "Local DIANN input file not found") +# skip_if_not(file.exists(annotation_file), "Local annotation file not found") + +# annot <- read.csv(annotation_file) +# output_file <- "real_diann_output.csv" + +# processed <- bigDIANNtoMSstatsFormat( +# input_file = input_file, +# annotation = annot, +# output_file_name = output_file, +# backend = "arrow", +# MBR = FALSE, +# quantificationColumn = "FragmentQuantCorrected", +# max_feature_count = 100, +# filter_unique_peptides = FALSE, +# aggregate_psms = FALSE, +# filter_few_obs = FALSE +# ) + +# result <- dplyr::collect(processed) + +# expect_true(!is.null(result)) +# expect_true(nrow(result) > 0) + +# # Cleanup — outputs may be directories when backend = "arrow" +# unlink(output_file, recursive = TRUE, force = TRUE) +# unlink(paste0("reduce_output_", output_file), recursive = TRUE, force = TRUE) +# unlink(paste0("topN_", output_file), recursive = TRUE, force = TRUE) +# }) + +# test_that("bigDIANNtoMSstatsFormat works with DIANN 2.0 parquet input", { +# input_file <- "/Users/rudhikshah/NorthEasternContractWork/MSstatsConvert/inst/tinytest/raw_data/DIANN/diann_2.0.parquet" +# annotation_file <- "/Users/rudhikshah/NorthEasternContractWork/MSstatsConvert/inst/tinytest/raw_data/DIANN/annotation_diann_2.0.csv" + +# skip_if_not(file.exists(input_file), "Local DIANN 2.0 parquet file not found") +# skip_if_not(file.exists(annotation_file), "Local DIANN 2.0 annotation file not found") +# skip_if_not_installed("arrow") + +# annot <- read.csv(annotation_file) +# output_file <- "diann_2_0_output.csv" + +# processed <- bigDIANNtoMSstatsFormat( +# input_file = input_file, +# annotation = annot, +# output_file_name = output_file, +# backend = "arrow", +# MBR = FALSE, +# quantificationColumn = "auto", +# max_feature_count = 100, +# filter_unique_peptides = FALSE, +# aggregate_psms = FALSE, +# filter_few_obs = FALSE +# ) + +# result <- dplyr::collect(processed) + +# expect_true(!is.null(result)) +# expect_true(nrow(result) > 0) + +# # Cleanup — outputs may be directories when backend = "arrow" +# unlink(output_file, recursive = TRUE, force = TRUE) +# unlink(paste0("reduce_output_", output_file), recursive = TRUE, force = TRUE) +# unlink(paste0("topN_", output_file), recursive = TRUE, force = TRUE) +# unlink(paste0("cleaned_", output_file), recursive = TRUE, force = TRUE) +# }) \ No newline at end of file diff --git a/tests/testthat/test-diann_converter.R b/tests/testthat/test-diann_converter.R index a2c15d4..4c1bbb6 100644 --- a/tests/testthat/test-diann_converter.R +++ b/tests/testthat/test-diann_converter.R @@ -153,7 +153,6 @@ test_that("bigDIANNtoMSstatsFormat works with arrow backend", { data.frame(Run = c("r1", "r2"), Protein.Names = "P1", Stripped.Sequence = "PEPTIDE", Modified.Sequence = "PEPTIDE", Precursor.Charge = 2, Fragment.Quant.Corrected = c(2000, 2100), Q.Value = 0.001, Precursor.Mz = 500, Fragment.Info = "y4", Lib.Q.Value = 0.001, Lib.PG.Q.Value = 0.001) ) write.csv(diann_data, input_file, row.names = FALSE) - converted <- bigDIANNtoMSstatsFormat( input_file = input_file, output_file_name = output_file, @@ -168,8 +167,49 @@ test_that("bigDIANNtoMSstatsFormat works with arrow backend", { expect_true(all(c("y1", "y4") %in% unique(result$FragmentIon))) expect_false(any(c("y2", "y3") %in% unique(result$FragmentIon))) - # Cleanup - file.remove(input_file) - if (file.exists(output_file)) file.remove(output_file) - if (file.exists(paste0("reduce_output_", output_file))) file.remove(paste0("reduce_output_", output_file)) + # Cleanup — outputs may be directories when backend = "arrow" + unlink(input_file, force = TRUE) + unlink(output_file, recursive = TRUE, force = TRUE) + unlink(paste0("reduce_output_", output_file), recursive = TRUE, force = TRUE) + unlink(paste0("topN_", output_file), recursive = TRUE, force = TRUE) +}) + +test_that("bigDIANNtoMSstatsFormat works with annotation", { + input_file <- tempfile(fileext = ".csv") + output_file <- basename(tempfile(fileext = ".csv")) + + # Minimal DIANN data + diann_data <- data.frame( + Run = c("r1", "r2"), Protein.Names = "P1", Stripped.Sequence = "PEPTIDE", + Modified.Sequence = "PEPTIDE", Precursor.Charge = 2, + Fragment.Quant.Corrected = c(1000, 1100), Q.Value = 0.001, Precursor.Mz = 500, + Fragment.Info = "y1", Lib.Q.Value = 0.001, Lib.PG.Q.Value = 0.001 + ) + write.csv(diann_data, input_file, row.names = FALSE) + + # Annotation data + annot <- data.frame( + Run = c("r1", "r2"), + Condition = c("Disease", "Healthy"), + BioReplicate = c(1, 2) + ) + + converted <- bigDIANNtoMSstatsFormat( + input_file = input_file, + annotation = annot, + output_file_name = output_file, + backend = "arrow" + ) + result <- dplyr::collect(converted) + + expect_true(all(c("Condition", "BioReplicate") %in% colnames(result))) + expect_equal(result$Condition[result$Run == "r1"], "Disease") + expect_equal(result$Condition[result$Run == "r2"], "Healthy") + + # Cleanup — outputs may be directories when backend = "arrow" + unlink(input_file, force = TRUE) + unlink(output_file, recursive = TRUE, force = TRUE) + unlink(paste0("reduce_output_", output_file), recursive = TRUE, force = TRUE) + unlink(paste0("topN_", output_file), recursive = TRUE, force = TRUE) + unlink(paste0("cleaned_", output_file), recursive = TRUE, force = TRUE) }) \ No newline at end of file diff --git a/tests/testthat/topN_preprocess_output.csv b/tests/testthat/topN_preprocess_output.csv deleted file mode 100644 index d9afa98..0000000 --- a/tests/testthat/topN_preprocess_output.csv +++ /dev/null @@ -1,5 +0,0 @@ -"ProteinName","PeptideSequence","PrecursorCharge","FragmentIon","ProductCharge","IsotopeLabelType","Condition","BioReplicate","Run","Intensity" -"P1","PEPTIDE",2,"frag3",1,"L","A",1,"run1",2000 -"P1","PEPTIDE",2,"frag3",1,"L","A",2,"run2",2100 -"P2","PEPTIDE2",3,"fragB",1,"L","B",1,"run1",800 -"P2","PEPTIDE2",3,"fragB",1,"L","B",2,"run2",850 diff --git a/tests/testthat/topN_spectro_output.csv b/tests/testthat/topN_spectro_output.csv deleted file mode 100644 index b103ba2..0000000 --- a/tests/testthat/topN_spectro_output.csv +++ /dev/null @@ -1,3 +0,0 @@ -"ProteinName","PeptideSequence","PrecursorCharge","FragmentIon","ProductCharge","IsotopeLabelType","Condition","BioReplicate","Run","Intensity" -"P1","PEPTIDE",2,"frag2",1,"L","A",1,"run1",2000 -"P1","PEPTIDE",2,"frag2",1,"L","A",2,"run2",2100 diff --git a/tests/testthat/topN_test_diann_output.csv b/tests/testthat/topN_test_diann_output.csv deleted file mode 100644 index 95d41b3..0000000 --- a/tests/testthat/topN_test_diann_output.csv +++ /dev/null @@ -1,5 +0,0 @@ -"ProteinName","PeptideSequence","PeptideModifiedSequence","PrecursorCharge","FragmentIon","ProductCharge","Run","Intensity","IsotopeLabelType" -"P1","PEPTIDE","PEPTIDE",2,"y1",1,"r1",1000,"L" -"P1","PEPTIDE","PEPTIDE",2,"y1",1,"r2",1100,"L" -"P1","PEPTIDE","PEPTIDE",2,"y4",1,"r1",2000,"L" -"P1","PEPTIDE","PEPTIDE",2,"y4",1,"r2",2100,"L"