From 99f61e0211cca29413dc6caf01178e6ebcbd88c9 Mon Sep 17 00:00:00 2001 From: Rudhik1904 Date: Wed, 25 Mar 2026 19:47:24 -0500 Subject: [PATCH 1/7] adding MSstatsConvert::mergeAnnotation step --- R/converters.R | 3 +++ 1 file changed, 3 insertions(+) diff --git a/R/converters.R b/R/converters.R index 03af429..2025e37 100644 --- a/R/converters.R +++ b/R/converters.R @@ -199,6 +199,9 @@ bigDIANNtoMSstatsFormat <- function(input_file, filter_unique_peptides, aggregate_psms, filter_few_obs, remove_annotation, calculateAnomalyScores, anomalyModelFeatures, connection) + # Merge annotation with the preprocessed data + if (!is.null(annotation)) { + msstats_data <- MSstatsConvert::mergeAnnotation(msstats_data, annotation) return(msstats_data) } From 1f702613dbedf2f4fbb4d0a50a04a19028d265f9 Mon Sep 17 00:00:00 2001 From: Rudhik1904 Date: Sun, 29 Mar 2026 20:08:01 -0500 Subject: [PATCH 2/7] Clean function was adding extra columns with NA. These columns were not handled in backend properly. Also, there were other errors in the backend like na.rm <- TRUE should be na.rm = TRUE And arrow::write_csv_arrow doesn't accept lazy Arrow queries Fixing those things and also updating MSstatsAddAnnotationBig to use it in bigDIANNtoMSstatsFormat --- R/backends.R | 25 +++++-- R/clean_DIANN.R | 13 ++-- R/converters.R | 88 ++++++++++++++++++----- tests/testthat/test-clean_DIANN.R | 39 ---------- tests/testthat/test-converters.R | 31 +++++++- tests/testthat/test-diann_converter.R | 40 ++++++++++- tests/testthat/topN_preprocess_output.csv | 5 -- tests/testthat/topN_spectro_output.csv | 3 - tests/testthat/topN_test_diann_output.csv | 5 -- 9 files changed, 168 insertions(+), 81 deletions(-) delete mode 100644 tests/testthat/test-clean_DIANN.R delete mode 100644 tests/testthat/topN_preprocess_output.csv delete mode 100644 tests/testthat/topN_spectro_output.csv delete mode 100644 tests/testthat/topN_test_diann_output.csv diff --git a/R/backends.R b/R/backends.R index 9c9d5fc..46eba9b 100644 --- a/R/backends.R +++ b/R/backends.R @@ -95,13 +95,25 @@ MSstatsPreprocessBigArrow <- function(input_file, anomalyModelFeatures = c()) { input <- arrow::open_dataset(input_file, format = "csv") + # dynamically cast any columns Arrow inferred as 'null' + null_columns <- names(input$schema)[vapply(input$schema$fields, function(f) f$type$ToString() == "null", logical(1))] + if (length(null_columns) > 0) { + for (col in null_columns) { + input <- dplyr::mutate(input, !!rlang::sym(col) := as.numeric(!!rlang::sym(col))) + } + } + + if ("PrecursorMz" %in% input$schema$names) { + input <- dplyr::mutate(input, PrecursorMz = as.numeric(PrecursorMz)) + } + input <- dplyr::mutate(input, Feature = paste(PeptideSequence, PrecursorCharge, FragmentIon, ProductCharge, sep = "_")) feature_counts <- dplyr::group_by(input, ProteinName, Feature) feature_counts <- dplyr::summarize(feature_counts, MeanAbundance = mean(Intensity, - na.rm <- TRUE)) + na.rm = TRUE)) feature_counts <- dplyr::collect(feature_counts) feature_counts <- dplyr::mutate( @@ -112,11 +124,13 @@ MSstatsPreprocessBigArrow <- function(input_file, feature_rank <= max_feature_count) feature_counts <- dplyr::select(feature_counts, -MeanAbundance, -feature_rank) - input <- dplyr::inner_join(input, feature_counts, + input <- dplyr::inner_join(input, feature_counts, by = c("ProteinName", "Feature")) input <- dplyr::select(input, -Feature) - arrow::write_csv_arrow(input, file = paste0("topN_", output_file_name)) + # Materialize the lazy Arrow query before writing to CSV for the intermediate file + input_to_write_topN <- input %>% dplyr::compute() + arrow::write_csv_arrow(input_to_write_topN, file = paste0("topN_", output_file_name)) if (filter_unique_peptides) { pp_df <- dplyr::select(input, ProteinName, PeptideSequence) @@ -126,7 +140,6 @@ MSstatsPreprocessBigArrow <- function(input_file, pp_df <- dplyr::select(pp_df, -NumProteins) input <- dplyr::anti_join(input, pp_df, by = "PeptideSequence") } - if (aggregate_psms) { group_cols <- c("ProteinName", "PeptideSequence", "PrecursorCharge", "FragmentIon", "ProductCharge", "IsotopeLabelType", "Run", @@ -158,7 +171,9 @@ MSstatsPreprocessBigArrow <- function(input_file, input <- dplyr::select(input, -Feature) } - arrow::write_csv_arrow(input, file = output_file_name) + # Materialize the lazy Arrow query before writing to CSV for the final output + input_to_write_final <- input %>% dplyr::compute() + arrow::write_csv_arrow(input_to_write_final, file = output_file_name) input } diff --git a/R/clean_DIANN.R b/R/clean_DIANN.R index c8c7959..7723b99 100644 --- a/R/clean_DIANN.R +++ b/R/clean_DIANN.R @@ -20,12 +20,15 @@ reduceBigDIANN <- function(input_file, output_path, MBR = TRUE, calculateAnomalyScores=FALSE, anomalyModelFeatures=c(), annotation = NULL) { - if (grepl("csv", input_file)) { - delim = "," - } else if (grepl("tsv|xls", input_file)) { - delim = "\t" - } else { + first_line <- readLines(input_file, n = 1) + if (grepl("\t", first_line)) { + delim <- "\t" + } else if (grepl(",", first_line)) { + delim <- "," + } else if (grepl(";", first_line)) { delim <- ";" + } else { + delim <- "\t" } diann_chunk <- function(x, pos) cleanDIANNChunk(x, output_path, MBR, diff --git a/R/converters.R b/R/converters.R index 2025e37..d690cc3 100644 --- a/R/converters.R +++ b/R/converters.R @@ -62,7 +62,7 @@ MSstatsPreprocessBig <- function(input_file, calculateAnomalyScores, anomalyModelFeatures) } else if (backend == "sparklyr") { - MSstatsPreprocessBigSparklyr(connection, input, output_file_name, + MSstatsPreprocessBigSparklyr(connection, input_file, output_file_name, max_feature_count, filter_unique_peptides, aggregate_psms, filter_few_obs, remove_annotation) @@ -96,10 +96,16 @@ bigFragPipetoMSstatsFormat <- function(input_file, output_file_name, filter_few_obs = FALSE, remove_annotation = FALSE, connection = NULL) { - MSstatsPreprocessBig(input_file, output_file_name, - backend, max_feature_count, filter_unique_peptides, - aggregate_psms, filter_few_obs, remove_annotation, - connection = connection) + MSstatsPreprocessBig( + input_file = input_file, + output_file_name = output_file_name, + backend = backend, + max_feature_count = max_feature_count, + filter_unique_peptides = filter_unique_peptides, + aggregate_psms = aggregate_psms, + filter_few_obs = filter_few_obs, + remove_annotation = remove_annotation, + connection = connection) } @@ -145,10 +151,17 @@ bigSpectronauttoMSstatsFormat <- function(input_file, output_file_name, filter_by_qvalue, qvalue_cutoff, calculateAnomalyScores, anomalyModelFeatures) msstats_data <- MSstatsPreprocessBig( - paste0("reduce_output_", output_file_name), - output_file_name, backend, max_feature_count, - aggregate_psms, filter_few_obs, remove_annotation, calculateAnomalyScores, - anomalyModelFeatures, connection) + input_file = paste0("reduce_output_", output_file_name), + output_file_name = output_file_name, + backend = backend, + max_feature_count = max_feature_count, + filter_unique_peptides = filter_unique_peptides, + aggregate_psms = aggregate_psms, + filter_few_obs = filter_few_obs, + remove_annotation = remove_annotation, + calculateAnomalyScores = calculateAnomalyScores, + anomalyModelFeatures = anomalyModelFeatures, + connection = connection) return(msstats_data) @@ -192,17 +205,44 @@ bigDIANNtoMSstatsFormat <- function(input_file, calculateAnomalyScores, anomalyModelFeatures, annotation) + reduced_file <- paste0("reduce_output_", output_file_name) + reduced <- arrow::open_dataset(reduced_file, format = "csv") + + # Identify columns where Arrow inferred 'null' type (all values NA) + null_cols <- names(reduced$schema)[ + vapply(reduced$schema$fields, function(f) f$type$ToString() == "null", logical(1)) + ] + + if (length(null_cols) > 0) { + # Drop null-typed columns using a lazy select (no data loaded into memory) + reduced <- dplyr::select(reduced, -dplyr::all_of(null_cols)) + + # Write back using Arrow's streaming writer — stays out-of-memory. + # write_dataset creates a directory, but open_dataset can read + # directories just as easily as single files. + cleaned_file <- paste0("cleaned_", output_file_name) + arrow::write_dataset(reduced, cleaned_file, format = "csv") + reduced_file <- cleaned_file + } + # Preprocess the cleaned data (feature selection, etc.) msstats_data <- MSstatsPreprocessBig( - paste0("reduce_output_", output_file_name), - output_file_name, backend, max_feature_count, - filter_unique_peptides, aggregate_psms, filter_few_obs, - remove_annotation, calculateAnomalyScores, - anomalyModelFeatures, connection) + input_file = reduced_file, # paste0("reduce_output_", output_file_name) + output_file_name = output_file_name, + backend = backend, + max_feature_count = max_feature_count, + filter_unique_peptides = filter_unique_peptides, + aggregate_psms = aggregate_psms, + filter_few_obs = filter_few_obs, + remove_annotation = remove_annotation, + calculateAnomalyScores = calculateAnomalyScores, + anomalyModelFeatures = anomalyModelFeatures, + connection = connection) + # Merge annotation with the preprocessed data if (!is.null(annotation)) { - msstats_data <- MSstatsConvert::mergeAnnotation(msstats_data, annotation) - + msstats_data <- MSstatsAddAnnotationBig(msstats_data, annotation) + } return(msstats_data) } @@ -235,5 +275,19 @@ bigDIANNtoMSstatsFormat <- function(input_file, #' @return table of `input` and `annotation` merged by Run column. #' MSstatsAddAnnotationBig <- function(input, annotation) { - dplyr::inner_join(input, annotation, by = "Run") + join_keys <- "Run" + + # Use tbl_vars which works reliably on both Arrow + # datasets, arrow_dplyr_query objects, and data frames + input_cols <- dplyr::tbl_vars(input) + + overlap_cols <- setdiff( + intersect(input_cols, colnames(annotation)), + join_keys + ) + if (length(overlap_cols) > 0) { + input <- dplyr::select(input, -dplyr::all_of(overlap_cols)) + } + + dplyr::inner_join(input, annotation, by = join_keys) } diff --git a/tests/testthat/test-clean_DIANN.R b/tests/testthat/test-clean_DIANN.R deleted file mode 100644 index b3b0bdb..0000000 --- a/tests/testthat/test-clean_DIANN.R +++ /dev/null @@ -1,39 +0,0 @@ -library(testthat) -library(mockery) - -context("DIANN cleaning") - -test_that("cleanDIANNChunk passes annotation to MSstatsMakeAnnotation", { - # Prepare data - input_chunk <- data.frame(Run = "Run1", Intensity = 100) - annotation <- data.frame(Run = "Run1", Condition = "A", BioReplicate = 1) - - # Mocks - m_import <- mock(input_chunk) - m_clean <- mock(input_chunk) - m_annotate <- mock(merge(input_chunk, annotation, by = "Run")) - m_write <- mock(NULL) - - stub(cleanDIANNChunk, "MSstatsImport", m_import) - stub(cleanDIANNChunk, "MSstatsClean", m_clean) - stub(cleanDIANNChunk, "MSstatsMakeAnnotation", m_annotate) - stub(cleanDIANNChunk, ".writeChunkToFile", m_write) - - # Execute - cleanDIANNChunk(input_chunk, "output.csv", MBR = TRUE, - quantificationColumn = "Intensity", pos = 1, - annotation = annotation) - - # Verify - expect_called(m_annotate, 1) - - # Check arguments passed to MSstatsMakeAnnotation - args <- mock_args(m_annotate)[[1]] - expect_equal(args[[1]], input_chunk) # Input from Clean - expect_equal(args[[2]], annotation) # Annotation passed through - - # Check that the result of annotation is passed to write - expect_called(m_write, 1) - write_args <- mock_args(m_write)[[1]] - expect_equal(write_args[[1]], merge(input_chunk, annotation, by = "Run")) -}) \ No newline at end of file diff --git a/tests/testthat/test-converters.R b/tests/testthat/test-converters.R index 8fee265..7f28fef 100644 --- a/tests/testthat/test-converters.R +++ b/tests/testthat/test-converters.R @@ -92,4 +92,33 @@ test_that("bigSpectronauttoMSstatsFormat works correctly", { # Cleanup if (file.exists(output_file)) file.remove(output_file) if (file.exists(paste0("reduce_output_", output_file))) file.remove(paste0("reduce_output_", output_file)) -}) \ No newline at end of file +}) + +# test_that("bigDIANNtoMSstatsFormat works with real MSstatsConvert tinytest data", { +# input_file <- "/Users/rudhikshah/NorthEasternContractWork/MSstatsConvert/inst/tinytest/raw_data/DIANN/diann_input.tsv" +# annotation_file <- "/Users/rudhikshah/NorthEasternContractWork/MSstatsConvert/inst/tinytest/raw_data/DIANN/annotation.csv" +# +# # Skip test if the local files are not found (e.g. on CI/CD) +# skip_if_not(file.exists(input_file), "Local DIANN input file not found") +# skip_if_not(file.exists(annotation_file), "Local annotation file not found") +# +# annot <- read.csv(annotation_file) +# output_file <- "real_diann_output.csv" +# +# processed <- bigDIANNtoMSstatsFormat( +# input_file = input_file, +# annotation = annot, +# output_file_name = output_file, +# backend = "arrow" +# ) +# +# result <- dplyr::collect(processed) +# +# expect_true(!is.null(result)) +# expect_true(nrow(result) > 0) +# +# # Cleanup +# if (file.exists(output_file)) file.remove(output_file) +# if (file.exists(paste0("reduce_output_", output_file))) file.remove(paste0("reduce_output_", output_file)) +# if (file.exists(paste0("topN_", output_file))) file.remove(paste0("topN_", output_file)) +# }) \ No newline at end of file diff --git a/tests/testthat/test-diann_converter.R b/tests/testthat/test-diann_converter.R index a2c15d4..176ba72 100644 --- a/tests/testthat/test-diann_converter.R +++ b/tests/testthat/test-diann_converter.R @@ -153,7 +153,6 @@ test_that("bigDIANNtoMSstatsFormat works with arrow backend", { data.frame(Run = c("r1", "r2"), Protein.Names = "P1", Stripped.Sequence = "PEPTIDE", Modified.Sequence = "PEPTIDE", Precursor.Charge = 2, Fragment.Quant.Corrected = c(2000, 2100), Q.Value = 0.001, Precursor.Mz = 500, Fragment.Info = "y4", Lib.Q.Value = 0.001, Lib.PG.Q.Value = 0.001) ) write.csv(diann_data, input_file, row.names = FALSE) - converted <- bigDIANNtoMSstatsFormat( input_file = input_file, output_file_name = output_file, @@ -172,4 +171,43 @@ test_that("bigDIANNtoMSstatsFormat works with arrow backend", { file.remove(input_file) if (file.exists(output_file)) file.remove(output_file) if (file.exists(paste0("reduce_output_", output_file))) file.remove(paste0("reduce_output_", output_file)) +}) + +test_that("bigDIANNtoMSstatsFormat works with annotation", { + input_file <- tempfile(fileext = ".csv") + output_file <- basename(tempfile(fileext = ".csv")) + + # Minimal DIANN data + diann_data <- data.frame( + Run = c("r1", "r2"), Protein.Names = "P1", Stripped.Sequence = "PEPTIDE", + Modified.Sequence = "PEPTIDE", Precursor.Charge = 2, + Fragment.Quant.Corrected = c(1000, 1100), Q.Value = 0.001, Precursor.Mz = 500, + Fragment.Info = "y1", Lib.Q.Value = 0.001, Lib.PG.Q.Value = 0.001 + ) + write.csv(diann_data, input_file, row.names = FALSE) + + # Annotation data + annot <- data.frame( + Run = c("r1", "r2"), + Condition = c("Disease", "Healthy"), + BioReplicate = c(1, 2) + ) + + converted <- bigDIANNtoMSstatsFormat( + input_file = input_file, + annotation = annot, + output_file_name = output_file, + backend = "arrow" + ) + result <- dplyr::collect(converted) + + expect_true(all(c("Condition", "BioReplicate") %in% colnames(result))) + expect_equal(result$Condition[result$Run == "r1"], "Disease") + expect_equal(result$Condition[result$Run == "r2"], "Healthy") + + # Cleanup + file.remove(input_file) + if (file.exists(output_file)) file.remove(output_file) + if (file.exists(paste0("reduce_output_", output_file))) file.remove(paste0("reduce_output_", output_file)) + if (dir.exists(paste0("cleaned_", output_file))) unlink(paste0("cleaned_", output_file), recursive = TRUE) }) \ No newline at end of file diff --git a/tests/testthat/topN_preprocess_output.csv b/tests/testthat/topN_preprocess_output.csv deleted file mode 100644 index d9afa98..0000000 --- a/tests/testthat/topN_preprocess_output.csv +++ /dev/null @@ -1,5 +0,0 @@ -"ProteinName","PeptideSequence","PrecursorCharge","FragmentIon","ProductCharge","IsotopeLabelType","Condition","BioReplicate","Run","Intensity" -"P1","PEPTIDE",2,"frag3",1,"L","A",1,"run1",2000 -"P1","PEPTIDE",2,"frag3",1,"L","A",2,"run2",2100 -"P2","PEPTIDE2",3,"fragB",1,"L","B",1,"run1",800 -"P2","PEPTIDE2",3,"fragB",1,"L","B",2,"run2",850 diff --git a/tests/testthat/topN_spectro_output.csv b/tests/testthat/topN_spectro_output.csv deleted file mode 100644 index b103ba2..0000000 --- a/tests/testthat/topN_spectro_output.csv +++ /dev/null @@ -1,3 +0,0 @@ -"ProteinName","PeptideSequence","PrecursorCharge","FragmentIon","ProductCharge","IsotopeLabelType","Condition","BioReplicate","Run","Intensity" -"P1","PEPTIDE",2,"frag2",1,"L","A",1,"run1",2000 -"P1","PEPTIDE",2,"frag2",1,"L","A",2,"run2",2100 diff --git a/tests/testthat/topN_test_diann_output.csv b/tests/testthat/topN_test_diann_output.csv deleted file mode 100644 index 95d41b3..0000000 --- a/tests/testthat/topN_test_diann_output.csv +++ /dev/null @@ -1,5 +0,0 @@ -"ProteinName","PeptideSequence","PeptideModifiedSequence","PrecursorCharge","FragmentIon","ProductCharge","Run","Intensity","IsotopeLabelType" -"P1","PEPTIDE","PEPTIDE",2,"y1",1,"r1",1000,"L" -"P1","PEPTIDE","PEPTIDE",2,"y1",1,"r2",1100,"L" -"P1","PEPTIDE","PEPTIDE",2,"y4",1,"r1",2000,"L" -"P1","PEPTIDE","PEPTIDE",2,"y4",1,"r2",2100,"L" From 31921e854b619509bca9606292c83314890c9353 Mon Sep 17 00:00:00 2001 From: Rudhik1904 Date: Sun, 3 May 2026 16:14:52 -0500 Subject: [PATCH 3/7] Adding support for Parquet diann files. --- R/backends.R | 8 +-- R/clean_DIANN.R | 46 +++++++++++---- tests/testthat/test-converters.R | 98 +++++++++++++++++++++++--------- 3 files changed, 108 insertions(+), 44 deletions(-) diff --git a/R/backends.R b/R/backends.R index 46eba9b..cd2e7c2 100644 --- a/R/backends.R +++ b/R/backends.R @@ -128,9 +128,7 @@ MSstatsPreprocessBigArrow <- function(input_file, by = c("ProteinName", "Feature")) input <- dplyr::select(input, -Feature) - # Materialize the lazy Arrow query before writing to CSV for the intermediate file - input_to_write_topN <- input %>% dplyr::compute() - arrow::write_csv_arrow(input_to_write_topN, file = paste0("topN_", output_file_name)) + arrow::write_dataset(input, paste0("topN_", output_file_name), format = "csv") if (filter_unique_peptides) { pp_df <- dplyr::select(input, ProteinName, PeptideSequence) @@ -171,9 +169,7 @@ MSstatsPreprocessBigArrow <- function(input_file, input <- dplyr::select(input, -Feature) } - # Materialize the lazy Arrow query before writing to CSV for the final output - input_to_write_final <- input %>% dplyr::compute() - arrow::write_csv_arrow(input_to_write_final, file = output_file_name) + arrow::write_dataset(input, output_file_name, format = "csv") input } diff --git a/R/clean_DIANN.R b/R/clean_DIANN.R index 7723b99..4332ddf 100644 --- a/R/clean_DIANN.R +++ b/R/clean_DIANN.R @@ -17,9 +17,43 @@ reduceBigDIANN <- function(input_file, output_path, MBR = TRUE, global_qvalue_cutoff = 0.01, qvalue_cutoff = 0.01, pg_qvalue_cutoff = 0.01, - calculateAnomalyScores=FALSE, + calculateAnomalyScores=FALSE, anomalyModelFeatures=c(), annotation = NULL) { + # Per-chunk callback shared by both the parquet and delimited-text paths. + # `pos` drives .writeChunkToFile: pos == 1 overwrites, pos > 1 appends. + diann_chunk <- function(x, pos) cleanDIANNChunk(x, output_path, MBR, + quantificationColumn, pos, + global_qvalue_cutoff, + qvalue_cutoff, + pg_qvalue_cutoff, + calculateAnomalyScores, + anomalyModelFeatures, + annotation) + + # Parquet branch (DIANN 2.0+): stream record batches via arrow so the file + # is never fully materialised. read_delim_chunked can't read parquet bytes. + if (tolower(tools::file_ext(input_file)) == "parquet") { + # Lazy handle to the parquet file — no data loaded yet. + ds <- arrow::open_dataset(input_file, format = "parquet") + # Scanner + RecordBatchReader yields one batch at a time on demand. + # batch_size matches the delimited-text path's 1M-row chunks; row-group + # boundaries in the parquet may cap individual batches below this. + scanner <- arrow::Scanner$create(ds, batch_size = 1e6) + reader <- scanner$ToRecordBatchReader() + pos <- 1 + repeat { + batch <- reader$read_next_batch() + if (is.null(batch)) break # exhausted + # Materialise just this batch, run it through the shared cleaner. + diann_chunk(as.data.frame(batch), pos) + pos <- pos + 1 + } + return(invisible(NULL)) + } + + # Delimited-text branch (DIANN 1.x TSV/CSV): sniff the delimiter from the + # first line, defaulting to tab when nothing matches. first_line <- readLines(input_file, n = 1) if (grepl("\t", first_line)) { delim <- "\t" @@ -30,16 +64,8 @@ reduceBigDIANN <- function(input_file, output_path, MBR = TRUE, } else { delim <- "\t" } - - diann_chunk <- function(x, pos) cleanDIANNChunk(x, output_path, MBR, - quantificationColumn, pos, - global_qvalue_cutoff, - qvalue_cutoff, - pg_qvalue_cutoff, - calculateAnomalyScores, - anomalyModelFeatures, - annotation) + # Stream the file in 1M-row chunks, invoking diann_chunk for each. readr::read_delim_chunked(input_file, readr::DataFrameCallback$new(diann_chunk), delim = delim, diff --git a/tests/testthat/test-converters.R b/tests/testthat/test-converters.R index 7f28fef..918df1a 100644 --- a/tests/testthat/test-converters.R +++ b/tests/testthat/test-converters.R @@ -94,31 +94,73 @@ test_that("bigSpectronauttoMSstatsFormat works correctly", { if (file.exists(paste0("reduce_output_", output_file))) file.remove(paste0("reduce_output_", output_file)) }) -# test_that("bigDIANNtoMSstatsFormat works with real MSstatsConvert tinytest data", { -# input_file <- "/Users/rudhikshah/NorthEasternContractWork/MSstatsConvert/inst/tinytest/raw_data/DIANN/diann_input.tsv" -# annotation_file <- "/Users/rudhikshah/NorthEasternContractWork/MSstatsConvert/inst/tinytest/raw_data/DIANN/annotation.csv" -# -# # Skip test if the local files are not found (e.g. on CI/CD) -# skip_if_not(file.exists(input_file), "Local DIANN input file not found") -# skip_if_not(file.exists(annotation_file), "Local annotation file not found") -# -# annot <- read.csv(annotation_file) -# output_file <- "real_diann_output.csv" -# -# processed <- bigDIANNtoMSstatsFormat( -# input_file = input_file, -# annotation = annot, -# output_file_name = output_file, -# backend = "arrow" -# ) -# -# result <- dplyr::collect(processed) -# -# expect_true(!is.null(result)) -# expect_true(nrow(result) > 0) -# -# # Cleanup -# if (file.exists(output_file)) file.remove(output_file) -# if (file.exists(paste0("reduce_output_", output_file))) file.remove(paste0("reduce_output_", output_file)) -# if (file.exists(paste0("topN_", output_file))) file.remove(paste0("topN_", output_file)) -# }) \ No newline at end of file +test_that("bigDIANNtoMSstatsFormat works with real MSstatsConvert tinytest data", { + input_file <- "/Users/rudhikshah/NorthEasternContractWork/MSstatsConvert/inst/tinytest/raw_data/DIANN/diann_input.tsv" + annotation_file <- "/Users/rudhikshah/NorthEasternContractWork/MSstatsConvert/inst/tinytest/raw_data/DIANN/annotation.csv" + + # Skip test if the local files are not found (e.g. on CI/CD) + skip_if_not(file.exists(input_file), "Local DIANN input file not found") + skip_if_not(file.exists(annotation_file), "Local annotation file not found") + + annot <- read.csv(annotation_file) + output_file <- "real_diann_output.csv" + + processed <- bigDIANNtoMSstatsFormat( + input_file = input_file, + annotation = annot, + output_file_name = output_file, + backend = "arrow", + MBR = FALSE, + quantificationColumn = "FragmentQuantCorrected", + max_feature_count = 100, + filter_unique_peptides = FALSE, + aggregate_psms = FALSE, + filter_few_obs = FALSE + ) + + result <- dplyr::collect(processed) + + expect_true(!is.null(result)) + expect_true(nrow(result) > 0) + + # Cleanup + if (file.exists(output_file)) file.remove(output_file) + if (file.exists(paste0("reduce_output_", output_file))) file.remove(paste0("reduce_output_", output_file)) + if (file.exists(paste0("topN_", output_file))) file.remove(paste0("topN_", output_file)) +}) + +test_that("bigDIANNtoMSstatsFormat works with DIANN 2.0 parquet input", { + input_file <- "/Users/rudhikshah/NorthEasternContractWork/MSstatsConvert/inst/tinytest/raw_data/DIANN/diann_2.0.parquet" + annotation_file <- "/Users/rudhikshah/NorthEasternContractWork/MSstatsConvert/inst/tinytest/raw_data/DIANN/annotation_diann_2.0.csv" + + skip_if_not(file.exists(input_file), "Local DIANN 2.0 parquet file not found") + skip_if_not(file.exists(annotation_file), "Local DIANN 2.0 annotation file not found") + skip_if_not_installed("arrow") + + annot <- read.csv(annotation_file) + output_file <- "diann_2_0_output.csv" + + processed <- bigDIANNtoMSstatsFormat( + input_file = input_file, + annotation = annot, + output_file_name = output_file, + backend = "arrow", + MBR = FALSE, + quantificationColumn = "auto", + max_feature_count = 100, + filter_unique_peptides = FALSE, + aggregate_psms = FALSE, + filter_few_obs = FALSE + ) + + result <- dplyr::collect(processed) + + expect_true(!is.null(result)) + expect_true(nrow(result) > 0) + + # Cleanup + if (file.exists(output_file)) file.remove(output_file) + if (file.exists(paste0("reduce_output_", output_file))) file.remove(paste0("reduce_output_", output_file)) + if (file.exists(paste0("topN_", output_file))) file.remove(paste0("topN_", output_file)) + if (file.exists(paste0("cleaned_", output_file))) unlink(paste0("cleaned_", output_file), recursive = TRUE) +}) \ No newline at end of file From c8aae0d276a75e25cf127ddf0a563d398129e2e6 Mon Sep 17 00:00:00 2001 From: Rudhik1904 Date: Sun, 3 May 2026 16:20:24 -0500 Subject: [PATCH 4/7] fixing warnings in test --- tests/testthat/test-converters.R | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/tests/testthat/test-converters.R b/tests/testthat/test-converters.R index 918df1a..dd3b838 100644 --- a/tests/testthat/test-converters.R +++ b/tests/testthat/test-converters.R @@ -54,9 +54,9 @@ test_that("MSstatsPreprocessBig performs feature selection correctly", { expect_equal(nrow(p2_result), 2) expect_true(all(p2_result$FragmentIon == "fragB")) - # Cleanup - file.remove(input_file) - if (file.exists(output_file)) file.remove(output_file) + # Cleanup — output may be a directory when backend = "arrow" + unlink(input_file, force = TRUE) + unlink(output_file, recursive = TRUE, force = TRUE) }) test_that("bigSpectronauttoMSstatsFormat works correctly", { @@ -89,9 +89,9 @@ test_that("bigSpectronauttoMSstatsFormat works correctly", { expect_equal(nrow(result), 2) expect_true(all(result$FragmentIon == "frag2")) - # Cleanup - if (file.exists(output_file)) file.remove(output_file) - if (file.exists(paste0("reduce_output_", output_file))) file.remove(paste0("reduce_output_", output_file)) + # Cleanup — outputs may be directories when backend = "arrow" + unlink(output_file, recursive = TRUE, force = TRUE) + unlink(paste0("reduce_output_", output_file), recursive = TRUE, force = TRUE) }) test_that("bigDIANNtoMSstatsFormat works with real MSstatsConvert tinytest data", { @@ -123,10 +123,10 @@ test_that("bigDIANNtoMSstatsFormat works with real MSstatsConvert tinytest data" expect_true(!is.null(result)) expect_true(nrow(result) > 0) - # Cleanup - if (file.exists(output_file)) file.remove(output_file) - if (file.exists(paste0("reduce_output_", output_file))) file.remove(paste0("reduce_output_", output_file)) - if (file.exists(paste0("topN_", output_file))) file.remove(paste0("topN_", output_file)) + # Cleanup — outputs may be directories when backend = "arrow" + unlink(output_file, recursive = TRUE, force = TRUE) + unlink(paste0("reduce_output_", output_file), recursive = TRUE, force = TRUE) + unlink(paste0("topN_", output_file), recursive = TRUE, force = TRUE) }) test_that("bigDIANNtoMSstatsFormat works with DIANN 2.0 parquet input", { @@ -158,9 +158,9 @@ test_that("bigDIANNtoMSstatsFormat works with DIANN 2.0 parquet input", { expect_true(!is.null(result)) expect_true(nrow(result) > 0) - # Cleanup - if (file.exists(output_file)) file.remove(output_file) - if (file.exists(paste0("reduce_output_", output_file))) file.remove(paste0("reduce_output_", output_file)) - if (file.exists(paste0("topN_", output_file))) file.remove(paste0("topN_", output_file)) - if (file.exists(paste0("cleaned_", output_file))) unlink(paste0("cleaned_", output_file), recursive = TRUE) + # Cleanup — outputs may be directories when backend = "arrow" + unlink(output_file, recursive = TRUE, force = TRUE) + unlink(paste0("reduce_output_", output_file), recursive = TRUE, force = TRUE) + unlink(paste0("topN_", output_file), recursive = TRUE, force = TRUE) + unlink(paste0("cleaned_", output_file), recursive = TRUE, force = TRUE) }) \ No newline at end of file From 89a11719e7914eccb1f2a6c1ac472987d97e4349 Mon Sep 17 00:00:00 2001 From: Rudhik1904 Date: Sun, 3 May 2026 16:23:01 -0500 Subject: [PATCH 5/7] commenting out the local run test --- tests/testthat/test-converters.R | 140 +++++++++++++++---------------- 1 file changed, 70 insertions(+), 70 deletions(-) diff --git a/tests/testthat/test-converters.R b/tests/testthat/test-converters.R index dd3b838..78f6da3 100644 --- a/tests/testthat/test-converters.R +++ b/tests/testthat/test-converters.R @@ -94,73 +94,73 @@ test_that("bigSpectronauttoMSstatsFormat works correctly", { unlink(paste0("reduce_output_", output_file), recursive = TRUE, force = TRUE) }) -test_that("bigDIANNtoMSstatsFormat works with real MSstatsConvert tinytest data", { - input_file <- "/Users/rudhikshah/NorthEasternContractWork/MSstatsConvert/inst/tinytest/raw_data/DIANN/diann_input.tsv" - annotation_file <- "/Users/rudhikshah/NorthEasternContractWork/MSstatsConvert/inst/tinytest/raw_data/DIANN/annotation.csv" - - # Skip test if the local files are not found (e.g. on CI/CD) - skip_if_not(file.exists(input_file), "Local DIANN input file not found") - skip_if_not(file.exists(annotation_file), "Local annotation file not found") - - annot <- read.csv(annotation_file) - output_file <- "real_diann_output.csv" - - processed <- bigDIANNtoMSstatsFormat( - input_file = input_file, - annotation = annot, - output_file_name = output_file, - backend = "arrow", - MBR = FALSE, - quantificationColumn = "FragmentQuantCorrected", - max_feature_count = 100, - filter_unique_peptides = FALSE, - aggregate_psms = FALSE, - filter_few_obs = FALSE - ) - - result <- dplyr::collect(processed) - - expect_true(!is.null(result)) - expect_true(nrow(result) > 0) - - # Cleanup — outputs may be directories when backend = "arrow" - unlink(output_file, recursive = TRUE, force = TRUE) - unlink(paste0("reduce_output_", output_file), recursive = TRUE, force = TRUE) - unlink(paste0("topN_", output_file), recursive = TRUE, force = TRUE) -}) - -test_that("bigDIANNtoMSstatsFormat works with DIANN 2.0 parquet input", { - input_file <- "/Users/rudhikshah/NorthEasternContractWork/MSstatsConvert/inst/tinytest/raw_data/DIANN/diann_2.0.parquet" - annotation_file <- "/Users/rudhikshah/NorthEasternContractWork/MSstatsConvert/inst/tinytest/raw_data/DIANN/annotation_diann_2.0.csv" - - skip_if_not(file.exists(input_file), "Local DIANN 2.0 parquet file not found") - skip_if_not(file.exists(annotation_file), "Local DIANN 2.0 annotation file not found") - skip_if_not_installed("arrow") - - annot <- read.csv(annotation_file) - output_file <- "diann_2_0_output.csv" - - processed <- bigDIANNtoMSstatsFormat( - input_file = input_file, - annotation = annot, - output_file_name = output_file, - backend = "arrow", - MBR = FALSE, - quantificationColumn = "auto", - max_feature_count = 100, - filter_unique_peptides = FALSE, - aggregate_psms = FALSE, - filter_few_obs = FALSE - ) - - result <- dplyr::collect(processed) - - expect_true(!is.null(result)) - expect_true(nrow(result) > 0) - - # Cleanup — outputs may be directories when backend = "arrow" - unlink(output_file, recursive = TRUE, force = TRUE) - unlink(paste0("reduce_output_", output_file), recursive = TRUE, force = TRUE) - unlink(paste0("topN_", output_file), recursive = TRUE, force = TRUE) - unlink(paste0("cleaned_", output_file), recursive = TRUE, force = TRUE) -}) \ No newline at end of file +# test_that("bigDIANNtoMSstatsFormat works with real MSstatsConvert tinytest data", { +# input_file <- "/Users/rudhikshah/NorthEasternContractWork/MSstatsConvert/inst/tinytest/raw_data/DIANN/diann_input.tsv" +# annotation_file <- "/Users/rudhikshah/NorthEasternContractWork/MSstatsConvert/inst/tinytest/raw_data/DIANN/annotation.csv" + +# # Skip test if the local files are not found (e.g. on CI/CD) +# skip_if_not(file.exists(input_file), "Local DIANN input file not found") +# skip_if_not(file.exists(annotation_file), "Local annotation file not found") + +# annot <- read.csv(annotation_file) +# output_file <- "real_diann_output.csv" + +# processed <- bigDIANNtoMSstatsFormat( +# input_file = input_file, +# annotation = annot, +# output_file_name = output_file, +# backend = "arrow", +# MBR = FALSE, +# quantificationColumn = "FragmentQuantCorrected", +# max_feature_count = 100, +# filter_unique_peptides = FALSE, +# aggregate_psms = FALSE, +# filter_few_obs = FALSE +# ) + +# result <- dplyr::collect(processed) + +# expect_true(!is.null(result)) +# expect_true(nrow(result) > 0) + +# # Cleanup — outputs may be directories when backend = "arrow" +# unlink(output_file, recursive = TRUE, force = TRUE) +# unlink(paste0("reduce_output_", output_file), recursive = TRUE, force = TRUE) +# unlink(paste0("topN_", output_file), recursive = TRUE, force = TRUE) +# }) + +# test_that("bigDIANNtoMSstatsFormat works with DIANN 2.0 parquet input", { +# input_file <- "/Users/rudhikshah/NorthEasternContractWork/MSstatsConvert/inst/tinytest/raw_data/DIANN/diann_2.0.parquet" +# annotation_file <- "/Users/rudhikshah/NorthEasternContractWork/MSstatsConvert/inst/tinytest/raw_data/DIANN/annotation_diann_2.0.csv" + +# skip_if_not(file.exists(input_file), "Local DIANN 2.0 parquet file not found") +# skip_if_not(file.exists(annotation_file), "Local DIANN 2.0 annotation file not found") +# skip_if_not_installed("arrow") + +# annot <- read.csv(annotation_file) +# output_file <- "diann_2_0_output.csv" + +# processed <- bigDIANNtoMSstatsFormat( +# input_file = input_file, +# annotation = annot, +# output_file_name = output_file, +# backend = "arrow", +# MBR = FALSE, +# quantificationColumn = "auto", +# max_feature_count = 100, +# filter_unique_peptides = FALSE, +# aggregate_psms = FALSE, +# filter_few_obs = FALSE +# ) + +# result <- dplyr::collect(processed) + +# expect_true(!is.null(result)) +# expect_true(nrow(result) > 0) + +# # Cleanup — outputs may be directories when backend = "arrow" +# unlink(output_file, recursive = TRUE, force = TRUE) +# unlink(paste0("reduce_output_", output_file), recursive = TRUE, force = TRUE) +# unlink(paste0("topN_", output_file), recursive = TRUE, force = TRUE) +# unlink(paste0("cleaned_", output_file), recursive = TRUE, force = TRUE) +# }) \ No newline at end of file From 7e8329086c5bc7b85e90173060a33d0db56b74d3 Mon Sep 17 00:00:00 2001 From: Rudhik1904 Date: Mon, 4 May 2026 20:39:29 -0500 Subject: [PATCH 6/7] fixing code rabbit comments --- R/backends.R | 2 +- R/converters.R | 52 +++++++++++++++++++++++++++++--------------------- R/utils.R | 18 ++++++++++++++++- 3 files changed, 48 insertions(+), 24 deletions(-) diff --git a/R/backends.R b/R/backends.R index cd2e7c2..f8d5382 100644 --- a/R/backends.R +++ b/R/backends.R @@ -128,7 +128,7 @@ MSstatsPreprocessBigArrow <- function(input_file, by = c("ProteinName", "Feature")) input <- dplyr::select(input, -Feature) - arrow::write_dataset(input, paste0("topN_", output_file_name), format = "csv") + arrow::write_dataset(input, .prefixedPath("topN_", output_file_name), format = "csv") if (filter_unique_peptides) { pp_df <- dplyr::select(input, ProteinName, PeptideSequence) diff --git a/R/converters.R b/R/converters.R index d690cc3..13b4383 100644 --- a/R/converters.R +++ b/R/converters.R @@ -146,12 +146,13 @@ bigSpectronauttoMSstatsFormat <- function(input_file, output_file_name, calculateAnomalyScores=FALSE, anomalyModelFeatures=c(), connection = NULL) { - reduceBigSpectronaut(input_file, paste0("reduce_output_", output_file_name), + reduced_file <- .prefixedPath("reduce_output_", output_file_name) + reduceBigSpectronaut(input_file, reduced_file, intensity, filter_by_excluded, filter_by_identified, filter_by_qvalue, qvalue_cutoff, calculateAnomalyScores, anomalyModelFeatures) msstats_data <- MSstatsPreprocessBig( - input_file = paste0("reduce_output_", output_file_name), + input_file = reduced_file, output_file_name = output_file_name, backend = backend, max_feature_count = max_feature_count, @@ -197,52 +198,59 @@ bigDIANNtoMSstatsFormat <- function(input_file, connection = NULL) { # Reduce and clean the DIANN report file in chunks - reduceBigDIANN(input_file, - paste0("reduce_output_", output_file_name), + reduced_file <- .prefixedPath("reduce_output_", output_file_name) + reduceBigDIANN(input_file, + reduced_file, MBR, quantificationColumn, - global_qvalue_cutoff, qvalue_cutoff, pg_qvalue_cutoff, + global_qvalue_cutoff, qvalue_cutoff, pg_qvalue_cutoff, calculateAnomalyScores, anomalyModelFeatures, annotation) - - reduced_file <- paste0("reduce_output_", output_file_name) + reduced <- arrow::open_dataset(reduced_file, format = "csv") # Identify columns where Arrow inferred 'null' type (all values NA) null_cols <- names(reduced$schema)[ vapply(reduced$schema$fields, function(f) f$type$ToString() == "null", logical(1)) ] - + if (length(null_cols) > 0) { # Drop null-typed columns using a lazy select (no data loaded into memory) reduced <- dplyr::select(reduced, -dplyr::all_of(null_cols)) - + # Write back using Arrow's streaming writer — stays out-of-memory. # write_dataset creates a directory, but open_dataset can read # directories just as easily as single files. - cleaned_file <- paste0("cleaned_", output_file_name) + cleaned_file <- .prefixedPath("cleaned_", output_file_name) arrow::write_dataset(reduced, cleaned_file, format = "csv") reduced_file <- cleaned_file } - + # Preprocess the cleaned data (feature selection, etc.) msstats_data <- MSstatsPreprocessBig( - input_file = reduced_file, # paste0("reduce_output_", output_file_name) - output_file_name = output_file_name, - backend = backend, + input_file = reduced_file, + output_file_name = output_file_name, + backend = backend, max_feature_count = max_feature_count, - filter_unique_peptides = filter_unique_peptides, - aggregate_psms = aggregate_psms, - filter_few_obs = filter_few_obs, - remove_annotation = remove_annotation, - calculateAnomalyScores = calculateAnomalyScores, - anomalyModelFeatures = anomalyModelFeatures, + filter_unique_peptides = filter_unique_peptides, + aggregate_psms = aggregate_psms, + filter_few_obs = filter_few_obs, + remove_annotation = remove_annotation, + calculateAnomalyScores = calculateAnomalyScores, + anomalyModelFeatures = anomalyModelFeatures, connection = connection) - - # Merge annotation with the preprocessed data + + # Merge annotation with the preprocessed data and persist the merge so + # callers reopening output_file_name see Condition/BioReplicate. The arrow + # rewrite stays lazy — the underlying source is reduced_file, not + # output_file_name, so we can safely overwrite the directory we just wrote. if (!is.null(annotation)) { msstats_data <- MSstatsAddAnnotationBig(msstats_data, annotation) + if (backend == "arrow") { + unlink(output_file_name, recursive = TRUE, force = TRUE) + arrow::write_dataset(msstats_data, output_file_name, format = "csv") } + } return(msstats_data) } diff --git a/R/utils.R b/R/utils.R index 02e5073..2f122f0 100644 --- a/R/utils.R +++ b/R/utils.R @@ -1,5 +1,21 @@ +#' Build an intermediate output path by prefixing only the basename. +#' +#' Naive `paste0(prefix, output_file_name)` corrupts paths that contain a +#' directory (`subdir/out.csv` → `topN_subdir/out.csv`, +#' `/tmp/out.csv` → `topN_/tmp/out.csv`). Splitting via dirname/basename keeps +#' the directory component intact so intermediate files land beside the final +#' output. +#' +#' @param prefix Character scalar prepended to the basename. +#' @param path Output file path supplied by the caller. +#' @return Character scalar. +#' @keywords internal +.prefixedPath <- function(prefix, path) { + file.path(dirname(path), paste0(prefix, basename(path))) +} + #' Write chunk to file -#' +#' #' @param input Data frame #' @param output_path Path to output file #' @param pos Chunk position From 98caf279af834782af20a983b058486df5d89941 Mon Sep 17 00:00:00 2001 From: Rudhik1904 Date: Mon, 4 May 2026 20:41:35 -0500 Subject: [PATCH 7/7] cleanup --- tests/testthat/test-diann_converter.R | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/tests/testthat/test-diann_converter.R b/tests/testthat/test-diann_converter.R index 176ba72..4c1bbb6 100644 --- a/tests/testthat/test-diann_converter.R +++ b/tests/testthat/test-diann_converter.R @@ -167,10 +167,11 @@ test_that("bigDIANNtoMSstatsFormat works with arrow backend", { expect_true(all(c("y1", "y4") %in% unique(result$FragmentIon))) expect_false(any(c("y2", "y3") %in% unique(result$FragmentIon))) - # Cleanup - file.remove(input_file) - if (file.exists(output_file)) file.remove(output_file) - if (file.exists(paste0("reduce_output_", output_file))) file.remove(paste0("reduce_output_", output_file)) + # Cleanup — outputs may be directories when backend = "arrow" + unlink(input_file, force = TRUE) + unlink(output_file, recursive = TRUE, force = TRUE) + unlink(paste0("reduce_output_", output_file), recursive = TRUE, force = TRUE) + unlink(paste0("topN_", output_file), recursive = TRUE, force = TRUE) }) test_that("bigDIANNtoMSstatsFormat works with annotation", { @@ -205,9 +206,10 @@ test_that("bigDIANNtoMSstatsFormat works with annotation", { expect_equal(result$Condition[result$Run == "r1"], "Disease") expect_equal(result$Condition[result$Run == "r2"], "Healthy") - # Cleanup - file.remove(input_file) - if (file.exists(output_file)) file.remove(output_file) - if (file.exists(paste0("reduce_output_", output_file))) file.remove(paste0("reduce_output_", output_file)) - if (dir.exists(paste0("cleaned_", output_file))) unlink(paste0("cleaned_", output_file), recursive = TRUE) + # Cleanup — outputs may be directories when backend = "arrow" + unlink(input_file, force = TRUE) + unlink(output_file, recursive = TRUE, force = TRUE) + unlink(paste0("reduce_output_", output_file), recursive = TRUE, force = TRUE) + unlink(paste0("topN_", output_file), recursive = TRUE, force = TRUE) + unlink(paste0("cleaned_", output_file), recursive = TRUE, force = TRUE) }) \ No newline at end of file