From 76fc25a69cb1a4e0acb0d4ecc99ac547d3c0d8a4 Mon Sep 17 00:00:00 2001 From: Giulio Formenti Date: Tue, 24 Feb 2026 16:12:52 -0500 Subject: [PATCH 01/15] avoid memory reallocations for new reads --- gfalibs | 2 +- include/reads.h | 2 +- src/reads.cpp | 546 ++++++++++++++++++++++++++---------------------- 3 files changed, 304 insertions(+), 246 deletions(-) diff --git a/gfalibs b/gfalibs index 47fefb5..f1d26fa 160000 --- a/gfalibs +++ b/gfalibs @@ -1 +1 @@ -Subproject commit 47fefb5e7fd0628605c546a11c9fef240855ae4d +Subproject commit f1d26fa5b6723692aa6c0630ec9a3afde8a1ee50 diff --git a/include/reads.h b/include/reads.h index 7f96bf8..e4e9acf 100644 --- a/include/reads.h +++ b/include/reads.h @@ -222,7 +222,7 @@ class InReads { inline bool applyFilter(uint64_t size, float avgQuality); // Processing - float computeAvgQuality(std::string& sequenceQuality); + float computeAvgQuality(const std::string& sequenceQuality); void filterRecords(); void extractInReads(); bool traverseInReads(Sequences2& readBatch); diff --git a/src/reads.cpp b/src/reads.cpp index 9670e45..b320b27 100644 --- a/src/reads.cpp +++ b/src/reads.cpp @@ -143,226 +143,276 @@ void InReads::load() { } void InReads::extractInReads() { - + std::string newLine, seqHeader, seqComment, line, bedHeader; + std::string seqBuf, qualBuf, plusBuf, qnameBuf; // reusable buffers std::size_t numFiles = userInput.inFiles.size(); - uint32_t batchCounter = 0; // batch number, read position in the batch uint64_t processedLength = 0; - bool sample = userInput.ratio < 1 ? true : false; // read subsampling - + bool sample = userInput.ratio < 1; + const static phmap::flat_hash_map string_to_case{ - {"fasta",1}, - {"fa",1}, - {"fasta.gz",1}, - {"fa.gz",1}, - {"fastq",1}, - {"fq",1}, - {"fastq.gz",1}, - {"fq.gz",1}, - {"bam",2}, - {"cram",2}, + {"fasta",1}, {"fa",1}, {"fasta.gz",1}, {"fa.gz",1}, + {"fastq",1}, {"fq",1}, {"fastq.gz",1}, {"fq.gz",1}, + {"bam",2}, {"cram",2}, {"rd",3} }; - + while (true) { - + uint32_t i = fileCounterStarted.fetch_add(1, std::memory_order_relaxed); if (i >= numFiles) break; - + std::unique_ptr readBatch = free_pool_in.pop(); uint32_t batchN = 0; - std::string file = userInput.file('r', i); - std::string ext = getFileExt(file); - if (ext != "rd") { + + std::string file = userInput.file('r', i); + std::string ext = getFileExt(file); + + if (ext != "rd") { threadPool.queueJob([this, i, file]() { std::string md5; computeMd5(file, md5); - md5s[i].first = getFileName(file); + md5s[i].first = getFileName(file); md5s[i].second = md5; return true; }); - } - switch (string_to_case.count(ext) ? string_to_case.at(ext) : 0) { - - case 1: { // fa*[.gz] + } + + switch (string_to_case.count(ext) ? string_to_case.at(ext) : 0) { + + case 1: { // fa*[.gz] or fq*[.gz] StreamObj streamObj; std::shared_ptr stream = streamObj.openStream(userInput, 'r', i); - - if (stream) { - - switch (stream->peek()) { - - case '>': { - - stream->get(); - - while (getline(*stream, newLine)) { - - size_t spacePos = newLine.find(" "); - seqHeader = newLine.substr(0, spacePos); - if (spacePos != std::string::npos) - seqComment = newLine.substr(spacePos + 1); - else - seqComment.clear(); - std::unique_ptr inSequence = std::make_unique(); - if (!getline(*stream, *inSequence, '>')) { + + if (stream) { + switch (stream->peek()) { + + case '>': { // FASTA + stream->get(); // consume '>' + + while (getline(*stream, newLine)) { + + const size_t spacePos = newLine.find(' '); + if (spacePos == std::string::npos) { + seqHeader = newLine; + seqComment.clear(); + } else { + seqHeader.assign(newLine, 0, spacePos); + seqComment.assign(newLine, spacePos + 1, std::string::npos); + } + + // Read sequence until next '>' + seqBuf.clear(); + if (!getline(*stream, seqBuf, '>')) { fprintf(stderr, "Record appears truncated (%s). Exiting.\n", seqHeader.c_str()); exit(EXIT_FAILURE); } + + // Subsample BEFORE doing anything else if (sample && newRand() > userInput.ratio) - continue; - processedLength += inSequence->size(); - readBatch->add(seqPos++, batchCounter++, std::move(seqHeader), std::move(seqComment), std::move(inSequence)); - - if (processedLength > batchSize) { - readBatch->batchN = batchN++; - readBatch->fileN = i; - lg.verbose("Processing batch N: " + std::to_string(readBatch->batchN)); + continue; + + processedLength += seqBuf.size(); + + // Store into a reused slot (Sequence2 stores strings by value) + { + Sequence2& rec = readBatch->next_slot(); + rec.set(std::move(seqHeader), + std::move(seqComment), + std::move(seqBuf), + std::string{}, // no qualities for fasta + seqPos++); + } + + if (processedLength > batchSize) { + readBatch->batchN = batchN++; + readBatch->fileN = i; + lg.verbose("Processing batch N: " + std::to_string(readBatch->batchN)); filled_q_in.push(std::move(readBatch)); - readBatch = free_pool_in.pop(); - processedLength = 0; - batchCounter = 0; - } - //lg.verbose("Individual fasta sequence read: " + seqHeader); - } - break; - } - case '@': { - - while (getline(*stream, newLine)) { // file input - - newLine.erase(0, 1); - size_t spacePos = newLine.find(" "); - seqHeader = newLine.substr(0, spacePos); - if (spacePos != std::string::npos) - seqComment = newLine.substr(spacePos + 1); - else - seqComment.clear(); - - std::unique_ptr inSequence = std::make_unique(); - if (!getline(*stream, *inSequence)) { + + readBatch = free_pool_in.pop(); + processedLength = 0; + } + } + break; + } + + case '@': { // FASTQ + while (getline(*stream, newLine)) { + + // header line starts with '@' + if (!newLine.empty() && newLine[0] == '@') + newLine.erase(0, 1); + + const size_t spacePos = newLine.find(' '); + if (spacePos == std::string::npos) { + seqHeader = newLine; + seqComment.clear(); + } else { + seqHeader.assign(newLine, 0, spacePos); + seqComment.assign(newLine, spacePos + 1, std::string::npos); + } + + seqBuf.clear(); + plusBuf.clear(); + qualBuf.clear(); + + if (!getline(*stream, seqBuf)) { fprintf(stderr, "Record appears truncated (%s). Exiting.\n", seqHeader.c_str()); exit(EXIT_FAILURE); } - if (!getline(*stream, newLine)) { + if (!getline(*stream, plusBuf)) { fprintf(stderr, "Record appears truncated (%s). Exiting.\n", seqHeader.c_str()); exit(EXIT_FAILURE); } - std::unique_ptr inSequenceQuality = std::make_unique(); - if (!getline(*stream, *inSequenceQuality)) { + if (!getline(*stream, qualBuf)) { fprintf(stderr, "Record appears truncated (%s). Exiting.\n", seqHeader.c_str()); exit(EXIT_FAILURE); } + + // Subsample early if (sample && newRand() > userInput.ratio) - continue; - processedLength += inSequence->size(); - readBatch->add(seqPos++, batchCounter++, std::move(seqHeader), std::move(seqComment), std::move(inSequence), std::move(inSequenceQuality)); - - if (processedLength > batchSize) { - readBatch->batchN = batchN++; - readBatch->fileN = i; - lg.verbose("Processing batch N: " + std::to_string(readBatch->batchN)); + continue; + + processedLength += seqBuf.size(); + + { + Sequence2& rec = readBatch->next_slot(); + rec.set(std::move(seqHeader), + std::move(seqComment), + std::move(seqBuf), + std::move(qualBuf), + seqPos++); + } + + if (processedLength > batchSize) { + readBatch->batchN = batchN++; + readBatch->fileN = i; + lg.verbose("Processing batch N: " + std::to_string(readBatch->batchN)); filled_q_in.push(std::move(readBatch)); - readBatch = free_pool_in.pop(); - processedLength = 0; - batchCounter = 0; - } - //lg.verbose("Individual fastq sequence read: " + seqHeader); - } - break; - } - } - readBatch->batchN = batchN++; // process residual reads - readBatch->fileN = i; - lg.verbose("Processing batch N: " + std::to_string(readBatch->batchN)); + + readBatch = free_pool_in.pop(); + processedLength = 0; + } + } + break; + } + } + + // process residual reads (even if empty; keep your existing behavior) + readBatch->batchN = batchN++; + readBatch->fileN = i; + lg.verbose("Processing batch N: " + std::to_string(readBatch->batchN)); filled_q_in.push(std::move(readBatch)); + processedLength = 0; - batchCounter = 0; - } - break; - } - case 2: { // bam, cram - - samFile *fp_in = hts_open(file.c_str(),"r"); //open bam file - bam_hdr_t *bamHdr = sam_hdr_read(fp_in); //read header - bam1_t *bamdata = bam_init1(); //initialize an alignment - + } + break; + } + + case 2: { // bam, cram + samFile* fp_in = hts_open(file.c_str(), "r"); + bam_hdr_t* bamHdr = sam_hdr_read(fp_in); + bam1_t* bamdata = bam_init1(); + htsThreadPool tpool_read = {NULL, 0}; - tpool_read.pool = hts_tpool_init(userInput.decompression_threads); - if (tpool_read.pool) - hts_set_opt(fp_in, HTS_OPT_THREAD_POOL, &tpool_read); - else - lg.verbose("Failed to generate decompression threadpool with " + std::to_string(userInput.decompression_threads) + " threads. Continuing single-threaded"); - - while(sam_read1(fp_in,bamHdr,bamdata) > 0) { - - uint32_t len = bamdata->core.l_qseq; // length of the read. - uint8_t *seq = bam_get_seq(bamdata); // seq string - std::unique_ptr inSequenceQuality; - - std::unique_ptr inSequence = std::make_unique(); - inSequence->resize(len); - for(uint32_t i=0; iat(i) = seq_nt16_str[bam_seqi(seq,i)]; //gets nucleotide id and converts them into IUPAC id. - - uint8_t* qual = bam_get_qual(bamdata); - if (qual && (len > 0) && qual[0] != 0xFF) { - inSequenceQuality = std::make_unique(len, '\0'); - for (uint32_t i = 0; i < len; ++i) - (*inSequenceQuality)[i] = static_cast(qual[i] + 33); + tpool_read.pool = hts_tpool_init(userInput.decompression_threads); + if (tpool_read.pool) + hts_set_opt(fp_in, HTS_OPT_THREAD_POOL, &tpool_read); + else + lg.verbose("Failed to generate decompression threadpool with " + + std::to_string(userInput.decompression_threads) + + " threads. Continuing single-threaded"); + + while (sam_read1(fp_in, bamHdr, bamdata) > 0) { + + // Subsample BEFORE decoding + if (sample && newRand() > userInput.ratio) + continue; + + const uint32_t len = bamdata->core.l_qseq; + + // qname (reuse capacity) + qnameBuf.assign(bam_get_qname(bamdata)); + + // decode sequence (reuse capacity) + seqBuf.resize(len); + char* out = len ? &seqBuf[0] : nullptr; + uint8_t* bseq = bam_get_seq(bamdata); + for (uint32_t k = 0; k < len; ++k) + out[k] = seq_nt16_str[bam_seqi(bseq, k)]; + + // decode/synthesize qualities (reuse capacity) + qualBuf.resize(len); + char* qout = len ? &qualBuf[0] : nullptr; + uint8_t* bq = bam_get_qual(bamdata); + if (bq && len > 0 && bq[0] != 0xFF) { + for (uint32_t k = 0; k < len; ++k) + qout[k] = static_cast(bq[k] + 33); } else { - inSequenceQuality = std::make_unique(len, '!'); // No per-base qualities; synthesize minimal qualities + std::memset(qout, '!', len); } - if (sample && newRand() > userInput.ratio) - continue; - processedLength += inSequence->size(); + processedLength += len; - std::string qname(bam_get_qname(bamdata)); - readBatch->add(seqPos++, batchCounter++, std::move(qname), std::string(), std::move(inSequence), std::move(inSequenceQuality)); + { + Sequence2& rec = readBatch->next_slot(); + rec.set(std::move(qnameBuf), + std::string{}, // no comment for bam + std::move(seqBuf), + std::move(qualBuf), + seqPos++); + } - if (processedLength > batchSize) { - readBatch->batchN = batchN++; - readBatch->fileN = i; - lg.verbose("Processing batch N: " + std::to_string(readBatch->batchN)); + if (processedLength > batchSize) { + readBatch->batchN = batchN++; + readBatch->fileN = i; + lg.verbose("Processing batch N: " + std::to_string(readBatch->batchN)); filled_q_in.push(std::move(readBatch)); - readBatch = free_pool_in.pop(); - processedLength = 0; - batchCounter = 0; - } - lg.verbose("Individual fastq sequence read: " + seqHeader); - } - readBatch->batchN = batchN++; // process residual reads - readBatch->fileN = i; - lg.verbose("Processing batch N: " + std::to_string(readBatch->batchN)); + + readBatch = free_pool_in.pop(); + processedLength = 0; + } + } + + // residual + readBatch->batchN = batchN++; + readBatch->fileN = i; + lg.verbose("Processing batch N: " + std::to_string(readBatch->batchN)); filled_q_in.push(std::move(readBatch)); + processedLength = 0; - batchCounter = 0; - bam_destroy1(bamdata); - sam_close(fp_in); - if (tpool_read.pool) - hts_tpool_destroy(tpool_read.pool); - break; - } - case 3: { // rd - readTableCompressed(userInput.inFiles[i]); - break; - } - default: { // fasta[.gz] - fprintf(stderr, "cannot recognize input (%s). Must be: fasta, fastq, bam, cram.\n", file.c_str()); - exit(EXIT_FAILURE); - } - } + + bam_destroy1(bamdata); + sam_close(fp_in); + if (tpool_read.pool) + hts_tpool_destroy(tpool_read.pool); + + break; + } + + case 3: { // rd + readTableCompressed(userInput.inFiles[i]); + break; + } + + default: { + fprintf(stderr, "cannot recognize input (%s). Must be: fasta, fastq, bam, cram.\n", file.c_str()); + exit(EXIT_FAILURE); + } + } + if (fileCounterCompleted.fetch_add(1, std::memory_order_relaxed) + 1 == numFiles) { - for (size_t i = 0; i < consumersN; ++i) filled_q_in.push(std::unique_ptr(nullptr)); // sentinels + for (size_t k = 0; k < consumersN; ++k) + filled_q_in.push(std::unique_ptr(nullptr)); // sentinels } - } + } } -float InReads::computeAvgQuality(std::string &sequenceQuality) { +float InReads::computeAvgQuality(const std::string &sequenceQuality) { double sumQuality = 0; - for (char &quality : sequenceQuality) + for (const char &quality : sequenceQuality) sumQuality += pow(10, -(double(quality) - 33)/10); return (float) -10 * std::log10(sumQuality/sequenceQuality.size()); } @@ -455,13 +505,12 @@ void InReads::filterRecords() { } inline bool InReads::filterRead(Sequence2* sequence) { - - uint64_t size = sequence->sequence->size(); - float avgQuality = 0; - if (sequence->sequenceQuality != NULL) - avgQuality = computeAvgQuality(*sequence->sequenceQuality); - - return applyFilter(size, avgQuality); + + uint64_t size = sequence->sequence.size(); + float avgQuality = 0.0f; + if (!sequence->sequenceQuality.empty()) + avgQuality = computeAvgQuality(sequence->sequenceQuality); + return applyFilter(size, avgQuality); } inline bool InReads::applyFilter(uint64_t size, float avgQuality) { @@ -491,21 +540,21 @@ bool InReads::traverseInReads(Sequences2& readBatchIn) { Log threadLog; threadLog.setId(readBatchIn.batchN); - + std::unique_ptr readBatchOut; std::unique_ptr R1_batch; std::unique_ptr R2_batch; - - if (!streamOutput) { // we allocate once as we are not outputting anything + + if (!streamOutput) { // allocate once if not outputting if (userInput.inputCifi && userInput.cifiCombinations_flag) { R1_batch = std::make_unique(); R2_batch = std::make_unique(); - }else{ + } else { readBatchOut = std::make_unique(); } } - if (userInput.inputCifi && userInput.cifiCombinations_flag) { // Two output batches per input batch + if (userInput.inputCifi && userInput.cifiCombinations_flag) { const uint32_t fileIdx = readBatchIn.fileN; const uint32_t baseOut = fileIdx * 2; @@ -522,18 +571,20 @@ bool InReads::traverseInReads(Sequences2& readBatchIn) R2_batch->batchN = readBatchIn.batchN; R2_batch->fileN = baseOut + 1; - } else { // existing single-output case + + } else { if (streamOutput) readBatchOut = free_pool_out.pop(); - + readBatchOut->reads.clear(); readBatchOut->fileN = readBatchIn.fileN; readBatchOut->batchN = readBatchIn.batchN; } + EnzymeInfo enz; - if(userInput.inputCifi) + if (userInput.inputCifi) enz = get_enzyme(userInput.restrictionEnzyme); - + std::vector inReadsSummaryBatch; uint32_t readN = 0; LenVector readLensBatch; @@ -544,99 +595,92 @@ bool InReads::traverseInReads(Sequences2& readBatchIn) const bool hasExclude = !excludeList.empty(); const bool hasFilter = (userInput.filter != "none"); - for (const auto& sequencePtr : readBatchIn.sequences) { - Sequence2* sequence = sequencePtr.get(); + // Iterate only the used slots (pool-friendly) + for (uint32_t idx = 0; idx < readBatchIn.used; ++idx) { + Sequence2& sequence = *readBatchIn.slots[idx]; - if (hasInclude && - includeList.find(sequence->header) == includeList.end()) + if (hasInclude && includeList.find(sequence.header) == includeList.end()) continue; - if (hasExclude && - excludeList.find(sequence->header) != excludeList.end()) + if (hasExclude && excludeList.find(sequence.header) != excludeList.end()) continue; - if (hasFilter && filterRead(sequence)) + if (hasFilter && filterRead(&sequence)) continue; - InRead read = traverseInRead(&threadLog, - sequence, - readBatchIn.batchN + readN++); + // NOTE: traverseInRead signature remains Sequence2* + InRead read = traverseInRead(&threadLog, &sequence, readBatchIn.batchN + readN++); const uint64_t len = read.A + read.C + read.G + read.T + read.N; readLensBatch.push_back(std::make_pair(len, read.avgQuality)); - batchA += read.A; - batchT += read.T; - batchC += read.C; - batchG += read.G; + batchA += read.A; + batchT += read.T; + batchC += read.C; + batchG += read.G; batchN_ += read.N; // STREAMING OUTPUT: convert to BAM record and append to batch if (streamOutput && !userInput.inputCifi) { - // Ensure quality exists + if (!read.inSequenceQuality) { read.inSequenceQuality = std::make_unique(read.inSequence->size(), '!'); } bam1_t* q = make_unmapped_bam( - read.seqHeader, // name - *read.inSequence, // seq - *read.inSequenceQuality, // qual (ASCII+33) - 0 // extra_flags (none, just BAM_FUNMAP) + read.seqHeader, // name + *read.inSequence, // seq + *read.inSequenceQuality, // qual + 0 ); readBatchOut->reads.push_back(q); - - }else if (userInput.inputCifi) { - - if (userInput.cifiCombinations_flag) { - // SCIFI + COMBINATIONS: two *separate* BamBatch outputs + } else if (userInput.inputCifi) { + + if (userInput.cifiCombinations_flag) { if (!read.inSequenceQuality) { read.inSequenceQuality = std::make_unique(read.inSequence->size(), '!'); } build_pe_bambatches(read, enz, *R1_batch, *R2_batch); - }else{ - // SCIFI, NO COMBINATIONS: just chop into fragments in this batch + + } else { if (!read.inSequenceQuality) { read.inSequenceQuality = - std::make_unique(read.inSequence->size(), '!'); + std::make_unique(read.inSequence->size(), '!'); } - + BamBatch tmp = chop_read_to_bambatch( - read, - enz, - readBatchOut->batchN, - readBatchOut->fileN - ); - - // append chopped fragments into our current batch + read, + enz, + readBatchOut->batchN, + readBatchOut->fileN + ); + readBatchOut->reads.insert(readBatchOut->reads.end(), tmp.reads.begin(), tmp.reads.end()); } } + if (userInput.content_flag) inReadsSummaryBatch.push_back(std::move(read)); - - sequence->sequence.reset(); - sequence->sequenceQuality.reset(); - } + // DO NOT clear Sequence2 strings here; keep capacity in pool. + // recycle_keep_capacity() does that when the batch is returned to free_pool_in. + } if (userInput.inputCifi && userInput.cifiCombinations_flag) { cifiReadN.fetch_add(R1_batch->reads.size() + R2_batch->reads.size()); - + if (streamOutput) { filled_q_out.push(std::move(R1_batch)); filled_q_out.push(std::move(R2_batch)); - }else{ + } else { R1_batch->reads.clear(); R2_batch->reads.clear(); } - // If they’re empty, they just get destroyed and their BamBatch - // instances are removed from the pool; the writer will add more - // via free_pool_out.push after writing other batches. + } else { cifiReadN.fetch_add(readBatchOut->reads.size()); if (streamOutput) @@ -645,7 +689,7 @@ bool InReads::traverseInReads(Sequences2& readBatchIn) readBatchOut->reads.clear(); } - { // Update global stats and summaries + { // Update global stats and summaries std::unique_lock lck(writerMutex); if (userInput.content_flag && !inReadsSummaryBatch.empty()) { @@ -653,7 +697,7 @@ bool InReads::traverseInReads(Sequences2& readBatchIn) readSummaryBatches.files.resize(readBatchIn.fileN + 1); auto& rbVec = readSummaryBatches.files[readBatchIn.fileN].readBatches; - auto summaryBatch = std::make_unique(); + auto summaryBatch = std::make_unique(); summaryBatch->reads = std::move(inReadsSummaryBatch); summaryBatch->batchN = readBatchIn.batchN; summaryBatch->fileN = readBatchIn.fileN; @@ -661,28 +705,34 @@ bool InReads::traverseInReads(Sequences2& readBatchIn) } readLens.insert(readLensBatch); - totA += batchA; - totT += batchT; - totC += batchC; - totG += batchG; - totN += batchN_; + totA += batchA; + totT += batchT; + totC += batchC; + totG += batchG; + totN += batchN_; totReads += readN; threadLog.print(); } + writerMutexCondition.notify_one(); return true; } -InRead InReads::traverseInRead(Log* threadLog, Sequence2* sequence, uint32_t seqPos) { + +InRead InReads::traverseInRead(Log* threadLog, Sequence2* sequence, uint32_t seqPos) +{ std::vector> bedCoords; + if (userInput.hc_cutoff != -1) { - homopolymerCompress(sequence->sequence.get(), bedCoords, userInput.hc_cutoff); - sequence->sequenceQuality.reset(); + // homopolymerCompress expects a std::string* + homopolymerCompress(&sequence->sequence, bedCoords, userInput.hc_cutoff); + // Keep capacity in pool, just clear + sequence->sequenceQuality.clear(); } uint64_t A=0, C=0, G=0, T=0, N=0, lowerCount=0; - for (char base : *sequence->sequence) { + for (char base : sequence->sequence) { if (std::islower(static_cast(base))) ++lowerCount; switch (base) { case 'A': case 'a': ++A; break; @@ -693,15 +743,23 @@ InRead InReads::traverseInRead(Log* threadLog, Sequence2* sequence, uint32_t seq } } - float avgQuality = sequence->sequenceQuality - ? computeAvgQuality(*sequence->sequenceQuality) - : 0.0f; + float avgQuality = (!sequence->sequenceQuality.empty()) + ? computeAvgQuality(sequence->sequenceQuality) // make this take const std::string& + : 0.0f; + + // COPY payload into InRead-owned buffers so output can keep it, + // while Sequence2 retains its capacity in the pool. + auto seqCopy = std::make_unique(sequence->sequence); + + std::unique_ptr qualCopy; + if (!sequence->sequenceQuality.empty()) + qualCopy = std::make_unique(sequence->sequenceQuality); InRead inRead( threadLog, 0, 0, sequence->header, sequence->comment, - std::move(sequence->sequence), - std::move(sequence->sequenceQuality), + std::move(seqCopy), + std::move(qualCopy), seqPos, A, C, G, T, N, lowerCount, avgQuality ); From d37fed9d419c934a31168ce903a7c79419269f7a Mon Sep 17 00:00:00 2001 From: Giulio Formenti Date: Tue, 24 Feb 2026 17:38:04 -0500 Subject: [PATCH 02/15] remove additional memory allocations --- include/reads.h | 4 ++- src/reads.cpp | 92 +++++++++++++++++++++++++++++++++++-------------- 2 files changed, 70 insertions(+), 26 deletions(-) diff --git a/include/reads.h b/include/reads.h index e4e9acf..abf276d 100644 --- a/include/reads.h +++ b/include/reads.h @@ -109,7 +109,6 @@ struct InRead { } }; - // ------------------------------------------------------------- // ReadBatch and container structures // ------------------------------------------------------------- @@ -117,8 +116,11 @@ struct InRead { template struct ReadBatch { std::vector reads; + uint32_t used = 0; // how many valid entries this time uint32_t batchN = 0; // within-file batch index uint32_t fileN = 0; // file ID + + void reset_keep_capacity() { used = 0; } // DO NOT clear() }; template diff --git a/src/reads.cpp b/src/reads.cpp index b320b27..b705325 100644 --- a/src/reads.cpp +++ b/src/reads.cpp @@ -29,6 +29,52 @@ #include "cifi.h" #include "reads.h" +static inline void bam_recycle_keep_capacity(bam1_t* b) { + // Keep b->data allocated, just mark record empty. + b->l_data = 0; + b->core = bam1_core_t(); // zero-initialize core + // Do NOT free b->data; m_data stays as-is. +} + +static inline void fill_unmapped_bam( + bam1_t* b, + const std::string& name, + const std::string& seq, + const std::string& qual_str, + uint16_t extra_flags = 0 +) { +#ifdef bam_set1 + const int32_t l_qname = static_cast(name.size() + 1); + const int32_t l_seq = static_cast(seq.size()); + + const int ret = bam_set1( + b, + l_qname, name.c_str(), + static_cast(BAM_FUNMAP | extra_flags), + -1, -1, 0, + 0, nullptr, + -1, -1, 0, + l_seq, + seq.c_str(), + qual_str.c_str(), + 0 + ); + if (ret < 0) { fprintf(stderr, "bam_set1 failed\n"); std::abort(); } +#else + // Fallback: allocate a temporary record using existing helper, + // then swap contents into b (still avoids repeated malloc/free of b itself). + bam1_t* tmp = make_unmapped_bam(name, seq, qual_str, extra_flags); + + // Swap core and buffers + std::swap(b->core, tmp->core); + std::swap(b->l_data, tmp->l_data); + std::swap(b->m_data, tmp->m_data); + std::swap(b->data, tmp->data); + + bam_destroy1(tmp); +#endif +} + InReads::InReads(UserInputRdeval& ui) : userInput(ui), splitOutputByFile(ui.outPrefix != "" ? true : false), @@ -109,7 +155,10 @@ void InReads::load() { if (streamOutput) { for (size_t i = 0; i < (outBuffersN + outBuffersN * userInput.inputCifi); ++i) { auto batch = std::make_unique(); - batch->reads.reserve(1024); + batch->reads.resize(1024); + for (size_t j = 0; j < batch->reads.size(); ++j) + batch->reads[j] = bam_init1(); + batch->used = 0; free_pool_out.push(std::move(batch)); } } @@ -563,8 +612,8 @@ bool InReads::traverseInReads(Sequences2& readBatchIn) R2_batch = free_pool_out.pop(); } - R1_batch->reads.clear(); - R2_batch->reads.clear(); + R1_batch->used = 0; + R2_batch->used = 0; R1_batch->batchN = readBatchIn.batchN; R1_batch->fileN = baseOut; @@ -576,7 +625,7 @@ bool InReads::traverseInReads(Sequences2& readBatchIn) if (streamOutput) readBatchOut = free_pool_out.pop(); - readBatchOut->reads.clear(); + readBatchOut->used = 0; readBatchOut->fileN = readBatchIn.fileN; readBatchOut->batchN = readBatchIn.batchN; } @@ -628,14 +677,9 @@ bool InReads::traverseInReads(Sequences2& readBatchIn) std::make_unique(read.inSequence->size(), '!'); } - bam1_t* q = make_unmapped_bam( - read.seqHeader, // name - *read.inSequence, // seq - *read.inSequenceQuality, // qual - 0 - ); - - readBatchOut->reads.push_back(q); + bam1_t* q = readBatchOut->reads[readBatchOut->used++]; + bam_recycle_keep_capacity(q); + fill_unmapped_bam(q, read.seqHeader, *read.inSequence, *read.inSequenceQuality, 0); } else if (userInput.inputCifi) { @@ -677,8 +721,8 @@ bool InReads::traverseInReads(Sequences2& readBatchIn) filled_q_out.push(std::move(R1_batch)); filled_q_out.push(std::move(R2_batch)); } else { - R1_batch->reads.clear(); - R2_batch->reads.clear(); + R1_batch->used = 0; + R2_batch->used = 0; } } else { @@ -686,7 +730,7 @@ bool InReads::traverseInReads(Sequences2& readBatchIn) if (streamOutput) filled_q_out.push(std::move(readBatchOut)); else - readBatchOut->reads.clear(); + readBatchOut->used = 0; } { // Update global stats and summaries @@ -1265,7 +1309,7 @@ void InReads::writeToStream() { // Defensive: bad file index, recycle (shouldn't happen) for (bam1_t* rec : batch->reads) bam_destroy1(rec); - batch->reads.clear(); + batch->used = 0; free_pool_out.push(std::move(batch)); continue; } @@ -1291,16 +1335,15 @@ void InReads::writeToStream() { " reads (batch " + std::to_string(bptr->batchN) + ") to outId " + std::to_string(outId) + " (fileId " + std::to_string(fileId) + ")"); - - for (bam1_t* rec : bptr->reads) { + + for (uint32_t k = 0; k < bptr->used; ++k) { + bam1_t* rec = bptr->reads[k]; if (sam_write1(ofp, ohdr, rec) < 0) { fprintf(stderr, "Error writing BAM record (outId %zu)\n", outId); std::abort(); } - bam_destroy1(rec); } - - bptr->reads.clear(); + bptr->used = 0; free_pool_out.push(std::move(bptr)); ++next; @@ -1320,15 +1363,14 @@ void InReads::writeToStream() { htsFile* ofp = fps[outId]; bam_hdr_t* ohdr = hdrs[outId]; - for (bam1_t* rec : bptr->reads) { + for (uint32_t k = 0; k < bptr->used; ++k) { + bam1_t* rec = bptr->reads[k]; if (sam_write1(ofp, ohdr, rec) < 0) { fprintf(stderr, "Error writing BAM record during final flush (outId %zu)\n", outId); std::abort(); } - bam_destroy1(rec); } - - bptr->reads.clear(); + bptr->used = 0; free_pool_out.push(std::move(bptr)); } filePending.clear(); From e408ea1fc1add0ebc6e24d7d6a967704cce2ee2a Mon Sep 17 00:00:00 2001 From: Giulio Formenti Date: Tue, 24 Feb 2026 22:49:44 -0500 Subject: [PATCH 03/15] reuse bam records in output as well --- include/bam-utils.h | 171 ++++++++++++++++++++++++++++++++++++++++ include/cifi.h | 49 +----------- include/reads.h | 6 +- src/cifi.cpp | 72 ++++++----------- src/reads.cpp | 185 ++++++++++++++++++-------------------------- 5 files changed, 275 insertions(+), 208 deletions(-) create mode 100644 include/bam-utils.h diff --git a/include/bam-utils.h b/include/bam-utils.h new file mode 100644 index 0000000..ec83ffc --- /dev/null +++ b/include/bam-utils.h @@ -0,0 +1,171 @@ +#ifndef BAMUTILS_H +#define BAMUTILS_H + +#include + +#include +#include +#include // std::swap +#include // fprintf +#include // std::abort +#include // uint16_t, etc + +// Create a fresh unmapped BAM record (allocates a new bam1_t). +// NOTE: l_qname passed to bam_set1 must include the trailing '\0' => name.size() + 1. +static inline bam1_t* make_unmapped_bam( + const std::string& name, + const std::string& seq, + const std::string& qual_str, + uint16_t extra_flags = 0 +) { + bam1_t* b = bam_init1(); + if (!b) { + std::fprintf(stderr, "Failed to initialize bam1_t\n"); + std::abort(); + } + + const int32_t l_qname = static_cast(name.size() + 1); + const int32_t l_seq = static_cast(seq.size()); + + const uint16_t flag = static_cast(BAM_FUNMAP | extra_flags); + + // We pass nullptr for qual here and then fill the numeric quals below. + // (Some htslib versions allow passing qual_str.c_str() directly; see fill_unmapped_bam) + if (bam_set1( + b, + l_qname, + name.c_str(), + flag, + -1, -1, // tid, pos + 0, // mapq + 0, // n_cigar + nullptr, // cigar + -1, -1, // mtid, mpos + 0, // isize + l_seq, + seq.c_str(), + nullptr, // qual (filled below) + 0 // l_aux + ) < 0) + { + std::fprintf(stderr, "Failed to set BAM data\n"); + std::abort(); + } + + // Fill numeric (0..93) qualities. bam_set1 stores quals as raw Phred, not ASCII. + uint8_t* qual = bam_get_qual(b); + if (qual) { + if (!qual_str.empty()) { + if (qual_str.size() != seq.size()) { + std::fprintf(stderr, "Sequence and quality length mismatch in make_unmapped_bam\n"); + std::abort(); + } + for (size_t i = 0; i < qual_str.size(); ++i) { + const int q = static_cast(qual_str[i]) - 33; + qual[i] = static_cast(q < 0 ? 0 : q); + } + } else { + // If no qualities provided, set to 0 (Phred 0). + for (int32_t i = 0; i < l_seq; ++i) + qual[i] = 0; + } + } + + return b; +} + +// Keep b->data allocated, just mark record empty (pool-friendly). +static inline void bam_recycle_keep_capacity(bam1_t* b) { + b->l_data = 0; + b->core = bam1_core_t(); // zero-initialize core + // Do NOT free b->data; m_data stays as-is. +} + +// Fill an existing bam1_t as an unmapped record. +// Uses bam_set1 when available; otherwise falls back to making a temporary record and swapping. +static inline void fill_unmapped_bam( + bam1_t* b, + const std::string& name, + const std::string& seq, + const std::string& qual_str, + uint16_t extra_flags = 0 +) { +#ifdef bam_set1 + const int32_t l_qname = static_cast(name.size() + 1); + const int32_t l_seq = static_cast(seq.size()); + + const int ret = bam_set1( + b, + l_qname, name.c_str(), + static_cast(BAM_FUNMAP | extra_flags), + -1, -1, 0, // tid, pos, mapq + 0, nullptr, // n_cigar, cigar + -1, -1, 0, // mtid, mpos, isize + l_seq, + seq.c_str(), + qual_str.empty() ? nullptr : qual_str.c_str(), + 0 + ); + if (ret < 0) { + std::fprintf(stderr, "bam_set1 failed\n"); + std::abort(); + } + + // If qual_str was nullptr/empty, explicitly set qualities to 0. + if (qual_str.empty()) { + uint8_t* q = bam_get_qual(b); + if (q) { + for (int32_t i = 0; i < l_seq; ++i) + q[i] = 0; + } + } + +#else + // Fallback: allocate a temporary record using existing helper, + // then swap contents into b (still avoids repeated malloc/free of b itself). + bam1_t* tmp = make_unmapped_bam(name, seq, qual_str, extra_flags); + + std::swap(b->core, tmp->core); + std::swap(b->l_data, tmp->l_data); + std::swap(b->m_data, tmp->m_data); + std::swap(b->data, tmp->data); + + bam_destroy1(tmp); +#endif +} + +// ---- Templated helpers (no dependency on BamBatch typedef) ---- +// Requirements on Batch: +// - batch.reads is std::vector +// - batch.used is an integer count of valid entries + +template +static inline void bambatch_ensure_capacity(Batch& batch, size_t additional) { + const size_t need = static_cast(batch.used) + additional; + if (need <= batch.reads.size()) return; + + const size_t old = batch.reads.size(); + size_t neu = old ? old : 1024; + while (neu < need) neu *= 2; + + batch.reads.resize(neu); + for (size_t i = old; i < neu; ++i) + batch.reads[i] = bam_init1(); +} + +template +static inline void bambatch_append_unmapped( + Batch& batch, + const std::string& name, + const std::string& seq, + const std::string& qual, + uint16_t extra_flags +) { + bambatch_ensure_capacity(batch, 1); + + bam1_t* b = batch.reads[batch.used++]; + bam_recycle_keep_capacity(b); + fill_unmapped_bam(b, name, seq, qual, extra_flags); +} + +#endif /* BAMUTILS_H */ diff --git a/include/cifi.h b/include/cifi.h index 677a321..a777b05 100644 --- a/include/cifi.h +++ b/include/cifi.h @@ -2,6 +2,7 @@ #define SCIFI_H #include +#include "bam-utils.h" #include "reads.h" struct EnzymeInfo { @@ -19,54 +20,8 @@ std::vector find_cut_positions(const InRead& read, const EnzymeInfo& enz); void digest_read(const InRead& read, const EnzymeInfo& enz, std::vector& frag_seqs, std::vector& frag_quals, std::vector* frag_lengths = nullptr); // Digest sequences and corresponding quality strings -BamBatch chop_read_to_bambatch(const InRead& read, const EnzymeInfo& enz, uint32_t batchN = 0, uint32_t fileN = 0); +void chop_read_into_bambatch(const InRead& read, const EnzymeInfo& enz, BamBatch& batch); void build_pe_bambatches(const InRead& read, const EnzymeInfo& enz, BamBatch& R1_batch, BamBatch& R2_batch); -static bam1_t* make_unmapped_bam(const std::string& name, const std::string& seq, const std::string& qual_str, uint16_t extra_flags = 0) { - bam1_t* b = bam_init1(); - if (!b) { - fprintf(stderr, "Failed to initialize bam1_t\n"); - std::abort(); - } - - const int32_t l_qname = static_cast(name.size()); - const int32_t l_seq = static_cast(seq.size()); - - uint16_t flag = BAM_FUNMAP | extra_flags; - - if (bam_set1(b, - l_qname, - name.c_str(), - flag, - -1, -1, - 0, 0, - nullptr, - -1, -1, - 0, - l_seq, - seq.c_str(), - nullptr, - 0) < 0) { - fprintf(stderr, "Failed to set BAM data\n"); - std::abort(); - } - - uint8_t* qual = bam_get_qual(b); - if (qual && !qual_str.empty()) { - if (qual_str.size() != seq.size()) { - fprintf(stderr, "Sequence and quality length mismatch in make_unmapped_bam\n"); - std::abort(); - } - for (size_t i = 0; i < qual_str.size(); ++i) { - qual[i] = static_cast(qual_str[i] - 33); - } - } else if (qual) { - for (int32_t i = 0; i < l_seq; ++i) - qual[i] = 0; - } - - return b; -} - #endif /* SCIFI_H */ diff --git a/include/reads.h b/include/reads.h index abf276d..611e1bb 100644 --- a/include/reads.h +++ b/include/reads.h @@ -64,8 +64,6 @@ struct InRead { uint64_t A=0, C=0, G=0, T=0, N=0, lowerCount=0; uint32_t uId=0, iId=0, seqPos=0; - std::vector tags; - std::vector> variants; float avgQuality = 0.0f; InRead() = default; @@ -83,8 +81,7 @@ struct InRead { uint32_t readPos, uint64_t A, uint64_t C, uint64_t G, uint64_t T, uint64_t N, uint64_t lowerCount, - float avgQuality, - const std::vector* tags = nullptr) + float avgQuality) : seqHeader(std::move(header)), seqComment(std::move(comment)), inSequence(std::move(sequence)), @@ -93,7 +90,6 @@ struct InRead { uId(uId), iId(iId), seqPos(readPos), avgQuality(avgQuality) { (void)threadLog; - if (tags) this->tags = *tags; #ifdef DEBUG if (threadLog) { threadLog->add("Processing read: " + seqHeader + diff --git a/src/cifi.cpp b/src/cifi.cpp index d56bf72..f06c1b2 100644 --- a/src/cifi.cpp +++ b/src/cifi.cpp @@ -17,6 +17,7 @@ #include "stream-obj.h" #include "cifi.h" +#include "bam-utils.h" // Find enzyme info from name EnzymeInfo get_enzyme(const std::string& name) { @@ -106,79 +107,57 @@ void digest_read(const InRead& read, const EnzymeInfo& enz, std::vector frag_seqs; std::vector frag_quals; - std::vector frag_lengths; // optional, you can drop if not needed - - digest_read(read, enz, frag_seqs, frag_quals, &frag_lengths); - - BamBatch batch; - batch.batchN = batchN; - batch.fileN = fileN; + digest_read(read, enz, frag_seqs, frag_quals, nullptr); - batch.reads.reserve(frag_seqs.size()); + // Ensure enough capacity in one go + bambatch_ensure_capacity(batch, frag_seqs.size()); for (std::size_t i = 0; i < frag_seqs.size(); ++i) { std::string frag_id = read.seqHeader + "_frag" + std::to_string(i); - - bam1_t* b = make_unmapped_bam( - frag_id, - frag_seqs[i], - frag_quals[i], - 0 // no extra flags - ); - - batch.reads.push_back(b); - - // If you want fragment lengths here, record frag_lengths[i] somewhere else. + bambatch_append_unmapped(batch, frag_id, frag_seqs[i], frag_quals[i], 0); } - return batch; } -void build_pe_bambatches(const InRead& read, const EnzymeInfo& enz, BamBatch& R1_batch, BamBatch& R2_batch) { +void build_pe_bambatches(const InRead& read, const EnzymeInfo& enz,BamBatch& R1_batch, BamBatch& R2_batch) { if (!read.inSequenceQuality) { throw std::runtime_error("InRead::inSequenceQuality is null in build_pe_bambatches"); } std::vector frag_seqs; std::vector frag_quals; - digest_read(read, enz, frag_seqs, frag_quals, nullptr); const std::size_t n = frag_seqs.size(); if (n == 0) return; - const int trim = 5; // as in original fic2pe - const std::size_t max_pairs = n * (n - 1) / 2; + const int trim = 5; - R1_batch.reads.reserve(R1_batch.reads.size() + max_pairs); - R2_batch.reads.reserve(R2_batch.reads.size() + max_pairs); + // Optional: pre-reserve capacity in one go + const std::size_t max_pairs = n * (n - 1) / 2; + bambatch_ensure_capacity(R1_batch, max_pairs); + bambatch_ensure_capacity(R2_batch, max_pairs); for (std::size_t i1 = 0; i1 < n; ++i1) { for (std::size_t i2 = i1 + 1; i2 < n; ++i2) { - const std::string& seq1 = frag_seqs[i1]; - const std::string& seq2 = frag_seqs[i2]; + const std::string& seq1 = frag_seqs[i1]; const std::string& q1 = frag_quals[i1]; - const std::string& q2 = frag_quals[i2]; - std::string R1_seq = seq1; - std::string R1_qual = q1; + const std::string& seq2 = frag_seqs[i2]; + const std::string& q2 = frag_quals[i2]; - std::string R2_seq; - std::string R2_qual; - - if (seq2.size() > static_cast(trim)) { - R2_seq = seq2.substr(trim); - R2_qual = q2.substr(trim); - } else { + if (seq2.size() <= static_cast(trim)) continue; - } + + std::string R2_seq = seq2.substr(trim); + std::string R2_qual = q2.substr(trim); std::string base_id = read.seqHeader + "_" + std::to_string(i1) + "_" + @@ -187,17 +166,14 @@ void build_pe_bambatches(const InRead& read, const EnzymeInfo& enz, BamBatch& R1 std::string R1_id = base_id + " 1"; std::string R2_id = base_id + " 2"; - bam1_t* b1 = make_unmapped_bam( - R1_id, R1_seq, R1_qual, + bambatch_append_unmapped( + R1_batch, R1_id, seq1, q1, static_cast(BAM_FPAIRED | BAM_FREAD1) ); - bam1_t* b2 = make_unmapped_bam( - R2_id, R2_seq, R2_qual, + bambatch_append_unmapped( + R2_batch, R2_id, R2_seq, R2_qual, static_cast(BAM_FPAIRED | BAM_FREAD2) ); - - R1_batch.reads.push_back(b1); - R2_batch.reads.push_back(b2); } } } diff --git a/src/reads.cpp b/src/reads.cpp index b705325..3a27dbd 100644 --- a/src/reads.cpp +++ b/src/reads.cpp @@ -29,52 +29,6 @@ #include "cifi.h" #include "reads.h" -static inline void bam_recycle_keep_capacity(bam1_t* b) { - // Keep b->data allocated, just mark record empty. - b->l_data = 0; - b->core = bam1_core_t(); // zero-initialize core - // Do NOT free b->data; m_data stays as-is. -} - -static inline void fill_unmapped_bam( - bam1_t* b, - const std::string& name, - const std::string& seq, - const std::string& qual_str, - uint16_t extra_flags = 0 -) { -#ifdef bam_set1 - const int32_t l_qname = static_cast(name.size() + 1); - const int32_t l_seq = static_cast(seq.size()); - - const int ret = bam_set1( - b, - l_qname, name.c_str(), - static_cast(BAM_FUNMAP | extra_flags), - -1, -1, 0, - 0, nullptr, - -1, -1, 0, - l_seq, - seq.c_str(), - qual_str.c_str(), - 0 - ); - if (ret < 0) { fprintf(stderr, "bam_set1 failed\n"); std::abort(); } -#else - // Fallback: allocate a temporary record using existing helper, - // then swap contents into b (still avoids repeated malloc/free of b itself). - bam1_t* tmp = make_unmapped_bam(name, seq, qual_str, extra_flags); - - // Swap core and buffers - std::swap(b->core, tmp->core); - std::swap(b->l_data, tmp->l_data); - std::swap(b->m_data, tmp->m_data); - std::swap(b->data, tmp->data); - - bam_destroy1(tmp); -#endif -} - InReads::InReads(UserInputRdeval& ui) : userInput(ui), splitOutputByFile(ui.outPrefix != "" ? true : false), @@ -585,8 +539,7 @@ inline bool InReads::applyFilter(uint64_t size, float avgQuality) { return true; } -bool InReads::traverseInReads(Sequences2& readBatchIn) -{ +bool InReads::traverseInReads(Sequences2& readBatchIn) { Log threadLog; threadLog.setId(readBatchIn.batchN); @@ -594,7 +547,9 @@ bool InReads::traverseInReads(Sequences2& readBatchIn) std::unique_ptr R1_batch; std::unique_ptr R2_batch; - if (!streamOutput) { // allocate once if not outputting + // Only need output batches if we're streaming or if CiFi builds internal batches. + // Keep your original behavior for !streamOutput (alloc once) since you said it was fine. + if (!streamOutput) { if (userInput.inputCifi && userInput.cifiCombinations_flag) { R1_batch = std::make_unique(); R2_batch = std::make_unique(); @@ -603,6 +558,7 @@ bool InReads::traverseInReads(Sequences2& readBatchIn) } } + // Acquire from pool if streaming if (userInput.inputCifi && userInput.cifiCombinations_flag) { const uint32_t fileIdx = readBatchIn.fileN; const uint32_t baseOut = fileIdx * 2; @@ -620,13 +576,12 @@ bool InReads::traverseInReads(Sequences2& readBatchIn) R2_batch->batchN = readBatchIn.batchN; R2_batch->fileN = baseOut + 1; - } else { if (streamOutput) readBatchOut = free_pool_out.pop(); - readBatchOut->used = 0; - readBatchOut->fileN = readBatchIn.fileN; + readBatchOut->used = 0; + readBatchOut->fileN = readBatchIn.fileN; readBatchOut->batchN = readBatchIn.batchN; } @@ -643,21 +598,20 @@ bool InReads::traverseInReads(Sequences2& readBatchIn) const bool hasInclude = !includeList.empty(); const bool hasExclude = !excludeList.empty(); const bool hasFilter = (userInput.filter != "none"); + + thread_local std::string tmpQual; - // Iterate only the used slots (pool-friendly) for (uint32_t idx = 0; idx < readBatchIn.used; ++idx) { Sequence2& sequence = *readBatchIn.slots[idx]; if (hasInclude && includeList.find(sequence.header) == includeList.end()) continue; - if (hasExclude && excludeList.find(sequence.header) != excludeList.end()) continue; - if (hasFilter && filterRead(&sequence)) continue; - // NOTE: traverseInRead signature remains Sequence2* + // Compute stats (and optionally payload) via traverseInRead() InRead read = traverseInRead(&threadLog, &sequence, readBatchIn.batchN + readN++); const uint64_t len = read.A + read.C + read.G + read.T + read.N; @@ -669,53 +623,61 @@ bool InReads::traverseInReads(Sequences2& readBatchIn) batchG += read.G; batchN_ += read.N; - // STREAMING OUTPUT: convert to BAM record and append to batch + // ---- STREAMING OUTPUT (non-CiFi): fill BAM directly from Sequence2 ---- if (streamOutput && !userInput.inputCifi) { - - if (!read.inSequenceQuality) { - read.inSequenceQuality = - std::make_unique(read.inSequence->size(), '!'); + // Ensure there is a slot in the preallocated bam array + if (readBatchOut->used >= readBatchOut->reads.size()) { + // Grow once in a while; still pool-friendly. + const size_t old = readBatchOut->reads.size(); + const size_t neu = old ? old * 2 : 1024; + readBatchOut->reads.resize(neu); + for (size_t j = old; j < neu; ++j) + readBatchOut->reads[j] = bam_init1(); } bam1_t* q = readBatchOut->reads[readBatchOut->used++]; bam_recycle_keep_capacity(q); - fill_unmapped_bam(q, read.seqHeader, *read.inSequence, *read.inSequenceQuality, 0); - } else if (userInput.inputCifi) { + if (!sequence.sequenceQuality.empty()) { + fill_unmapped_bam(q, sequence.header, sequence.sequence, sequence.sequenceQuality, 0); + } else { + tmpQual.assign(sequence.sequence.size(), '!'); + fill_unmapped_bam(q, sequence.header, sequence.sequence, tmpQual, 0); + } + } + // ---- CiFi paths (need payload in read) ---- + else if (userInput.inputCifi) { + + // Make sure payload exists for CiFi processing + if (!read.inSequence) { + // If traverseInRead was called with needPayload=false by mistake, + // this makes the behavior safe. + read.inSequence = std::make_unique(sequence.sequence); + } + if (!read.inSequenceQuality) { + if (!sequence.sequenceQuality.empty()) + read.inSequenceQuality = std::make_unique(sequence.sequenceQuality); + else + read.inSequenceQuality = std::make_unique(read.inSequence->size(), '!'); + } if (userInput.cifiCombinations_flag) { - if (!read.inSequenceQuality) { - read.inSequenceQuality = - std::make_unique(read.inSequence->size(), '!'); - } build_pe_bambatches(read, enz, *R1_batch, *R2_batch); - } else { - if (!read.inSequenceQuality) { - read.inSequenceQuality = - std::make_unique(read.inSequence->size(), '!'); - } - - BamBatch tmp = chop_read_to_bambatch( - read, - enz, - readBatchOut->batchN, - readBatchOut->fileN - ); - - readBatchOut->reads.insert(readBatchOut->reads.end(), - tmp.reads.begin(), tmp.reads.end()); + const uint32_t before = readBatchOut->used; + chop_read_into_bambatch(read, enz, *readBatchOut); + const uint32_t added = readBatchOut->used - before; + cifiReadN.fetch_add(added, std::memory_order_relaxed); } } if (userInput.content_flag) inReadsSummaryBatch.push_back(std::move(read)); - // DO NOT clear Sequence2 strings here; keep capacity in pool. - // recycle_keep_capacity() does that when the batch is returned to free_pool_in. } + // Push output batches if (userInput.inputCifi && userInput.cifiCombinations_flag) { - cifiReadN.fetch_add(R1_batch->reads.size() + R2_batch->reads.size()); + cifiReadN.fetch_add(R1_batch->used + R2_batch->used); if (streamOutput) { filled_q_out.push(std::move(R1_batch)); @@ -724,16 +686,14 @@ bool InReads::traverseInReads(Sequences2& readBatchIn) R1_batch->used = 0; R2_batch->used = 0; } - } else { - cifiReadN.fetch_add(readBatchOut->reads.size()); if (streamOutput) filled_q_out.push(std::move(readBatchOut)); else readBatchOut->used = 0; } - { // Update global stats and summaries + { // Global stats / summaries std::unique_lock lck(writerMutex); if (userInput.content_flag && !inReadsSummaryBatch.empty()) { @@ -741,7 +701,7 @@ bool InReads::traverseInReads(Sequences2& readBatchIn) readSummaryBatches.files.resize(readBatchIn.fileN + 1); auto& rbVec = readSummaryBatches.files[readBatchIn.fileN].readBatches; - auto summaryBatch = std::make_unique(); + auto summaryBatch = std::make_unique(); summaryBatch->reads = std::move(inReadsSummaryBatch); summaryBatch->batchN = readBatchIn.batchN; summaryBatch->fileN = readBatchIn.fileN; @@ -758,20 +718,16 @@ bool InReads::traverseInReads(Sequences2& readBatchIn) threadLog.print(); } - writerMutexCondition.notify_one(); return true; } - -InRead InReads::traverseInRead(Log* threadLog, Sequence2* sequence, uint32_t seqPos) -{ +InRead InReads::traverseInRead(Log* threadLog, Sequence2* sequence, uint32_t seqPos) { std::vector> bedCoords; if (userInput.hc_cutoff != -1) { - // homopolymerCompress expects a std::string* homopolymerCompress(&sequence->sequence, bedCoords, userInput.hc_cutoff); - // Keep capacity in pool, just clear + // keep capacity, just clear content sequence->sequenceQuality.clear(); } @@ -787,27 +743,40 @@ InRead InReads::traverseInRead(Log* threadLog, Sequence2* sequence, uint32_t seq } } - float avgQuality = (!sequence->sequenceQuality.empty()) - ? computeAvgQuality(sequence->sequenceQuality) // make this take const std::string& - : 0.0f; + const float avgQuality = + (!sequence->sequenceQuality.empty()) + ? computeAvgQuality(sequence->sequenceQuality) + : 0.0f; - // COPY payload into InRead-owned buffers so output can keep it, - // while Sequence2 retains its capacity in the pool. - auto seqCopy = std::make_unique(sequence->sequence); + // Decide if this read needs payload in InRead. + // - Needed for CiFi processing + // - For content summaries, you currently drop payload when !streamOutput, so payload not needed there. + // If you later decide content summaries should keep sequence, flip this. + const bool needPayload = userInput.inputCifi; - std::unique_ptr qualCopy; - if (!sequence->sequenceQuality.empty()) - qualCopy = std::make_unique(sequence->sequenceQuality); + std::unique_ptr seqPtr; + std::unique_ptr qualPtr; + + if (needPayload) { + // yes, this allocates — but ONLY for CiFi (or whatever you later enable) + seqPtr = std::make_unique(sequence->sequence); + + if (!sequence->sequenceQuality.empty()) + qualPtr = std::make_unique(sequence->sequenceQuality); + // else leave nullptr; caller can synthesize if needed + } InRead inRead( threadLog, 0, 0, sequence->header, sequence->comment, - std::move(seqCopy), - std::move(qualCopy), - seqPos, A, C, G, T, N, lowerCount, + std::move(seqPtr), + std::move(qualPtr), + seqPos, + A, C, G, T, N, lowerCount, avgQuality ); + // Preserve your old behavior: if content summaries + no streaming, drop payload if (userInput.content_flag && !streamOutput) inRead.dropPayload(); @@ -1331,7 +1300,7 @@ void InReads::writeToStream() { htsFile* ofp = fps[outId]; bam_hdr_t* ohdr = hdrs[outId]; - lg.verbose("Writing " + std::to_string(bptr->reads.size()) + + lg.verbose("Writing " + std::to_string(bptr->used) + " reads (batch " + std::to_string(bptr->batchN) + ") to outId " + std::to_string(outId) + " (fileId " + std::to_string(fileId) + ")"); From f2e704d67bfcef9cec735f04ced8d117381c9917 Mon Sep 17 00:00:00 2001 From: Giulio Formenti Date: Tue, 24 Feb 2026 23:34:56 -0500 Subject: [PATCH 04/15] removed zlib dep, fasta/fastq input also handled by htslib --- src/reads.cpp | 457 ++++++++++++++++++++++++-------------------------- 1 file changed, 223 insertions(+), 234 deletions(-) diff --git a/src/reads.cpp b/src/reads.cpp index 3a27dbd..0480b5b 100644 --- a/src/reads.cpp +++ b/src/reads.cpp @@ -5,7 +5,9 @@ #include #include +#include #include +#include #include #include "log.h" @@ -19,10 +21,6 @@ #include "stream-obj.h" #include -#include "zlib.h" -#include "zstream/zstream_common.hpp" -#include "zstream/ozstream.hpp" -#include "zstream/ozstream_impl.hpp" #include "output.h" #include "len-vector.h" @@ -147,268 +145,259 @@ void InReads::load() { void InReads::extractInReads() { - std::string newLine, seqHeader, seqComment, line, bedHeader; - std::string seqBuf, qualBuf, plusBuf, qnameBuf; // reusable buffers - std::size_t numFiles = userInput.inFiles.size(); - uint64_t processedLength = 0; - bool sample = userInput.ratio < 1; + const std::size_t numFiles = userInput.inFiles.size(); + const bool sample = userInput.ratio < 1; - const static phmap::flat_hash_map string_to_case{ - {"fasta",1}, {"fa",1}, {"fasta.gz",1}, {"fa.gz",1}, - {"fastq",1}, {"fq",1}, {"fastq.gz",1}, {"fq.gz",1}, - {"bam",2}, {"cram",2}, - {"rd",3} + // Reusable buffers (per worker thread invocation of extractInReads) + std::string name, comment, seqBuf, qualBuf, qnameBuf; + + auto split_header = [](const char* s, std::string& header, std::string& comm) { + const char* sp = std::strchr(s, ' '); + if (!sp) { header.assign(s); comm.clear(); } + else { header.assign(s, sp - s); comm.assign(sp + 1); } }; - while (true) { + auto push_batch = [&](std::unique_ptr& readBatch, uint32_t& batchN, uint32_t fileN) { + readBatch->batchN = batchN++; + readBatch->fileN = fileN; + lg.verbose("Processing batch N: " + std::to_string(readBatch->batchN)); + filled_q_in.push(std::move(readBatch)); + readBatch = free_pool_in.pop(); + }; - uint32_t i = fileCounterStarted.fetch_add(1, std::memory_order_relaxed); - if (i >= numFiles) - break; + auto enable_threads = [&](htsFile* fp) { + if (fp && userInput.decompression_threads > 1) { + // works for BGZF; harmless if unsupported for this handle + (void)hts_set_threads(fp, userInput.decompression_threads); + } + }; - std::unique_ptr readBatch = free_pool_in.pop(); - uint32_t batchN = 0; + auto read_fa_fq = [&](const std::string& file, + std::unique_ptr& readBatch, + uint32_t& batchN, + uint32_t fileN) { - std::string file = userInput.file('r', i); - std::string ext = getFileExt(file); + htsFile* fp = hts_open(file.c_str(), "r"); + if (!fp) { + fprintf(stderr, "Failed to open input (%s)\n", file.c_str()); + exit(EXIT_FAILURE); + } + enable_threads(fp); - if (ext != "rd") { - threadPool.queueJob([this, i, file]() { - std::string md5; - computeMd5(file, md5); - md5s[i].first = getFileName(file); - md5s[i].second = md5; - return true; - }); + kstring_t ks = {0, 0, nullptr}; + + // Decide FASTA vs FASTQ from first non-empty line + int ret = hts_getline(fp, '\n', &ks); + while (ret >= 0 && ks.l == 0) ret = hts_getline(fp, '\n', &ks); + + if (ret < 0) { // empty file: keep your “residual even if empty” behavior outside + if (ks.s) free(ks.s); + hts_close(fp); + return; } - switch (string_to_case.count(ext) ? string_to_case.at(ext) : 0) { - - case 1: { // fa*[.gz] or fq*[.gz] - StreamObj streamObj; - std::shared_ptr stream = streamObj.openStream(userInput, 'r', i); - - if (stream) { - switch (stream->peek()) { - - case '>': { // FASTA - stream->get(); // consume '>' - - while (getline(*stream, newLine)) { - - const size_t spacePos = newLine.find(' '); - if (spacePos == std::string::npos) { - seqHeader = newLine; - seqComment.clear(); - } else { - seqHeader.assign(newLine, 0, spacePos); - seqComment.assign(newLine, spacePos + 1, std::string::npos); - } - - // Read sequence until next '>' - seqBuf.clear(); - if (!getline(*stream, seqBuf, '>')) { - fprintf(stderr, "Record appears truncated (%s). Exiting.\n", seqHeader.c_str()); - exit(EXIT_FAILURE); - } - - // Subsample BEFORE doing anything else - if (sample && newRand() > userInput.ratio) - continue; - - processedLength += seqBuf.size(); - - // Store into a reused slot (Sequence2 stores strings by value) - { - Sequence2& rec = readBatch->next_slot(); - rec.set(std::move(seqHeader), - std::move(seqComment), - std::move(seqBuf), - std::string{}, // no qualities for fasta - seqPos++); - } - - if (processedLength > batchSize) { - readBatch->batchN = batchN++; - readBatch->fileN = i; - lg.verbose("Processing batch N: " + std::to_string(readBatch->batchN)); - filled_q_in.push(std::move(readBatch)); - - readBatch = free_pool_in.pop(); - processedLength = 0; - } - } - break; - } - - case '@': { // FASTQ - while (getline(*stream, newLine)) { - - // header line starts with '@' - if (!newLine.empty() && newLine[0] == '@') - newLine.erase(0, 1); - - const size_t spacePos = newLine.find(' '); - if (spacePos == std::string::npos) { - seqHeader = newLine; - seqComment.clear(); - } else { - seqHeader.assign(newLine, 0, spacePos); - seqComment.assign(newLine, spacePos + 1, std::string::npos); - } - - seqBuf.clear(); - plusBuf.clear(); - qualBuf.clear(); - - if (!getline(*stream, seqBuf)) { - fprintf(stderr, "Record appears truncated (%s). Exiting.\n", seqHeader.c_str()); - exit(EXIT_FAILURE); - } - if (!getline(*stream, plusBuf)) { - fprintf(stderr, "Record appears truncated (%s). Exiting.\n", seqHeader.c_str()); - exit(EXIT_FAILURE); - } - if (!getline(*stream, qualBuf)) { - fprintf(stderr, "Record appears truncated (%s). Exiting.\n", seqHeader.c_str()); - exit(EXIT_FAILURE); - } - - // Subsample early - if (sample && newRand() > userInput.ratio) - continue; - - processedLength += seqBuf.size(); - - { - Sequence2& rec = readBatch->next_slot(); - rec.set(std::move(seqHeader), - std::move(seqComment), - std::move(seqBuf), - std::move(qualBuf), - seqPos++); - } - - if (processedLength > batchSize) { - readBatch->batchN = batchN++; - readBatch->fileN = i; - lg.verbose("Processing batch N: " + std::to_string(readBatch->batchN)); - filled_q_in.push(std::move(readBatch)); - - readBatch = free_pool_in.pop(); - processedLength = 0; - } - } - break; - } + uint64_t processedLength = 0; + + auto flush_if_needed = [&](size_t add) { + processedLength += add; + if (processedLength > batchSize) { + push_batch(readBatch, batchN, fileN); + processedLength = 0; + } + }; + + if (ks.s[0] == '>') { + // FASTA: header line starts with '>', sequence may be wrapped. + std::string pendingHeader(ks.s + 1); + + while (true) { + split_header(pendingHeader.c_str(), name, comment); + + seqBuf.clear(); + while (true) { + ret = hts_getline(fp, '\n', &ks); + if (ret < 0) { pendingHeader.clear(); break; } // EOF ends record + if (ks.l > 0 && ks.s[0] == '>') { // next record + pendingHeader.assign(ks.s + 1); + break; } + seqBuf.append(ks.s, ks.l); // append wrapped lines + } + + if (!(sample && newRand() > userInput.ratio)) { + Sequence2& rec = readBatch->next_slot(); + rec.set(std::move(name), + std::move(comment), + std::move(seqBuf), + std::string{}, // no qualities + seqPos++); + flush_if_needed(rec.sequence.size()); + } + + if (pendingHeader.empty()) break; + } - // process residual reads (even if empty; keep your existing behavior) - readBatch->batchN = batchN++; - readBatch->fileN = i; - lg.verbose("Processing batch N: " + std::to_string(readBatch->batchN)); - filled_q_in.push(std::move(readBatch)); + } else if (ks.s[0] == '@') { + // FASTQ: 4-line records (as in your original; assumes seq/qual are single-line) + while (true) { + if (ks.l == 0 || ks.s[0] != '@') { + fprintf(stderr, "FASTQ malformed (expected '@') in %s\n", file.c_str()); + exit(EXIT_FAILURE); + } + + split_header(ks.s + 1, name, comment); + + if (hts_getline(fp, '\n', &ks) < 0) { // seq + fprintf(stderr, "Record appears truncated (%s). Exiting.\n", name.c_str()); + exit(EXIT_FAILURE); + } + seqBuf.assign(ks.s, ks.l); + + if (hts_getline(fp, '\n', &ks) < 0) { // plus + fprintf(stderr, "Record appears truncated (%s). Exiting.\n", name.c_str()); + exit(EXIT_FAILURE); + } - processedLength = 0; + if (hts_getline(fp, '\n', &ks) < 0) { // qual + fprintf(stderr, "Record appears truncated (%s). Exiting.\n", name.c_str()); + exit(EXIT_FAILURE); + } + qualBuf.assign(ks.s, ks.l); + + if (!(sample && newRand() > userInput.ratio)) { + Sequence2& rec = readBatch->next_slot(); + rec.set(std::move(name), + std::move(comment), + std::move(seqBuf), + std::move(qualBuf), + seqPos++); + flush_if_needed(rec.sequence.size()); } - break; + + ret = hts_getline(fp, '\n', &ks); // next header + while (ret >= 0 && ks.l == 0) ret = hts_getline(fp, '\n', &ks); + if (ret < 0) break; } - case 2: { // bam, cram - samFile* fp_in = hts_open(file.c_str(), "r"); - bam_hdr_t* bamHdr = sam_hdr_read(fp_in); - bam1_t* bamdata = bam_init1(); + } else { + fprintf(stderr, "Cannot recognize text input (%s). Expected '>' (FASTA) or '@' (FASTQ).\n", file.c_str()); + exit(EXIT_FAILURE); + } + + if (ks.s) free(ks.s); + hts_close(fp); + }; - htsThreadPool tpool_read = {NULL, 0}; - tpool_read.pool = hts_tpool_init(userInput.decompression_threads); - if (tpool_read.pool) - hts_set_opt(fp_in, HTS_OPT_THREAD_POOL, &tpool_read); - else - lg.verbose("Failed to generate decompression threadpool with " + - std::to_string(userInput.decompression_threads) + - " threads. Continuing single-threaded"); - - while (sam_read1(fp_in, bamHdr, bamdata) > 0) { - - // Subsample BEFORE decoding - if (sample && newRand() > userInput.ratio) - continue; - - const uint32_t len = bamdata->core.l_qseq; - - // qname (reuse capacity) - qnameBuf.assign(bam_get_qname(bamdata)); - - // decode sequence (reuse capacity) - seqBuf.resize(len); - char* out = len ? &seqBuf[0] : nullptr; - uint8_t* bseq = bam_get_seq(bamdata); - for (uint32_t k = 0; k < len; ++k) - out[k] = seq_nt16_str[bam_seqi(bseq, k)]; - - // decode/synthesize qualities (reuse capacity) - qualBuf.resize(len); - char* qout = len ? &qualBuf[0] : nullptr; - uint8_t* bq = bam_get_qual(bamdata); - if (bq && len > 0 && bq[0] != 0xFF) { - for (uint32_t k = 0; k < len; ++k) - qout[k] = static_cast(bq[k] + 33); - } else { - std::memset(qout, '!', len); - } + auto read_bam_cram = [&](const std::string& file, + std::unique_ptr& readBatch, + uint32_t& batchN, + uint32_t fileN) { - processedLength += len; + samFile* fp = hts_open(file.c_str(), "r"); + if (!fp) { + fprintf(stderr, "Failed to open BAM/CRAM (%s)\n", file.c_str()); + exit(EXIT_FAILURE); + } + enable_threads(fp); - { - Sequence2& rec = readBatch->next_slot(); - rec.set(std::move(qnameBuf), - std::string{}, // no comment for bam - std::move(seqBuf), - std::move(qualBuf), - seqPos++); - } + bam_hdr_t* hdr = sam_hdr_read(fp); + if (!hdr) { + fprintf(stderr, "Failed to read BAM/CRAM header (%s)\n", file.c_str()); + exit(EXIT_FAILURE); + } - if (processedLength > batchSize) { - readBatch->batchN = batchN++; - readBatch->fileN = i; - lg.verbose("Processing batch N: " + std::to_string(readBatch->batchN)); - filled_q_in.push(std::move(readBatch)); + bam1_t* b = bam_init1(); + if (!b) { + fprintf(stderr, "Failed to allocate bam1_t\n"); + exit(EXIT_FAILURE); + } - readBatch = free_pool_in.pop(); - processedLength = 0; - } - } + uint64_t processedLength = 0; - // residual - readBatch->batchN = batchN++; - readBatch->fileN = i; - lg.verbose("Processing batch N: " + std::to_string(readBatch->batchN)); - filled_q_in.push(std::move(readBatch)); + while (sam_read1(fp, hdr, b) > 0) { - processedLength = 0; + if (sample && newRand() > userInput.ratio) + continue; - bam_destroy1(bamdata); - sam_close(fp_in); - if (tpool_read.pool) - hts_tpool_destroy(tpool_read.pool); + const uint32_t len = b->core.l_qseq; - break; - } + qnameBuf.assign(bam_get_qname(b)); + + seqBuf.resize(len); + char* out = len ? &seqBuf[0] : nullptr; + uint8_t* bseq = bam_get_seq(b); + for (uint32_t k = 0; k < len; ++k) + out[k] = seq_nt16_str[bam_seqi(bseq, k)]; - case 3: { // rd - readTableCompressed(userInput.inFiles[i]); - break; + qualBuf.resize(len); + char* qout = len ? &qualBuf[0] : nullptr; + uint8_t* bq = bam_get_qual(b); + if (bq && len > 0 && bq[0] != 0xFF) { + for (uint32_t k = 0; k < len; ++k) + qout[k] = static_cast(bq[k] + 33); + } else if (len) { + std::memset(qout, '!', len); } - default: { - fprintf(stderr, "cannot recognize input (%s). Must be: fasta, fastq, bam, cram.\n", file.c_str()); - exit(EXIT_FAILURE); + processedLength += len; + + Sequence2& rec = readBatch->next_slot(); + rec.set(std::move(qnameBuf), + std::string{}, // comment not stored in BAM path + std::move(seqBuf), + std::move(qualBuf), + seqPos++); + + if (processedLength > batchSize) { + push_batch(readBatch, batchN, fileN); + processedLength = 0; } } + bam_destroy1(b); + bam_hdr_destroy(hdr); + hts_close(fp); + }; + + while (true) { + + uint32_t i = fileCounterStarted.fetch_add(1, std::memory_order_relaxed); + if (i >= numFiles) break; + + std::unique_ptr readBatch = free_pool_in.pop(); + uint32_t batchN = 0; + + const std::string file = userInput.file('r', i); + const std::string ext = getFileExt(file); + + if (ext != "rd") { + threadPool.queueJob([this, i, file]() { + std::string md5; + computeMd5(file, md5); + md5s[i].first = getFileName(file); + md5s[i].second = md5; + return true; + }); + } + + if (ext == "rd") { + readTableCompressed(userInput.inFiles[i]); + } else if (ext == "bam" || ext == "cram") { + read_bam_cram(file, readBatch, batchN, i); + push_batch(readBatch, batchN, i); // residual (even if empty) + } else if (ext == "fasta" || ext == "fa" || ext == "fasta.gz" || ext == "fa.gz" || + ext == "fastq" || ext == "fq" || ext == "fastq.gz" || ext == "fq.gz") { + read_fa_fq(file, readBatch, batchN, i); + push_batch(readBatch, batchN, i); // residual (even if empty) + } else { + fprintf(stderr, "cannot recognize input (%s). Must be: fasta/fastq (optionally .gz), bam, cram, rd.\n", + file.c_str()); + exit(EXIT_FAILURE); + } + if (fileCounterCompleted.fetch_add(1, std::memory_order_relaxed) + 1 == numFiles) { for (size_t k = 0; k < consumersN; ++k) - filled_q_in.push(std::unique_ptr(nullptr)); // sentinels + filled_q_in.push(std::unique_ptr(nullptr)); } } } From 498b168b8996acf48044cf7cc31b9158ced66f8d Mon Sep 17 00:00:00 2001 From: Giulio Formenti Date: Wed, 25 Feb 2026 00:11:02 -0500 Subject: [PATCH 05/15] precompute powers --- src/reads.cpp | 37 +++++++++++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/src/reads.cpp b/src/reads.cpp index 0480b5b..4f3988e 100644 --- a/src/reads.cpp +++ b/src/reads.cpp @@ -402,11 +402,35 @@ void InReads::extractInReads() { } } -float InReads::computeAvgQuality(const std::string &sequenceQuality) { - double sumQuality = 0; - for (const char &quality : sequenceQuality) - sumQuality += pow(10, -(double(quality) - 33)/10); - return (float) -10 * std::log10(sumQuality/sequenceQuality.size()); +static inline const float* phredProbLUT() { + // char is ASCII, but quality is (char-33) in [0..93] typically + static float lut[94]; + static bool inited = false; + if (!inited) { + for (int q = 0; q < 94; ++q) { + // 10^(-q/10) + lut[q] = std::pow(10.0f, -float(q)/10.0f); + } + inited = true; + } + return lut; +} + +static inline float avg_q_from_phred33_prob(const std::string& qstr) { + const float* lut = phredProbLUT(); + double sum = 0.0; + for (unsigned char c : qstr) { + const int q = int(c) - 33; + if (q >= 0 && q < 94) sum += lut[q]; + else sum += lut[0]; // defensive + } + const double mean = sum / double(qstr.size()); + return (float)(-10.0 * std::log10(mean)); +} + +float computeAvgQuality(const std::string& sequenceQuality) { + if (sequenceQuality.empty()) return 0.0f; + return avg_q_from_phred33_prob(sequenceQuality); } void InReads::initDictionaries() { @@ -1264,9 +1288,6 @@ void InReads::writeToStream() { const uint64_t id = batch->batchN; if (fileId >= numStreams) { - // Defensive: bad file index, recycle (shouldn't happen) - for (bam1_t* rec : batch->reads) - bam_destroy1(rec); batch->used = 0; free_pool_out.push(std::move(batch)); continue; From 093f1325800923aaec1d35a1b0530891da8489d2 Mon Sep 17 00:00:00 2001 From: Giulio Formenti Date: Wed, 25 Feb 2026 00:27:57 -0500 Subject: [PATCH 06/15] free up memory from batches at the end --- include/reads.h | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/include/reads.h b/include/reads.h index 611e1bb..b8095b8 100644 --- a/include/reads.h +++ b/include/reads.h @@ -129,7 +129,15 @@ struct FileBatches { std::vector> files; }; -using BamBatch = ReadBatch; +struct BamBatch : ReadBatch { + ~BamBatch() { + for (bam1_t* b : this->reads) { + if (b) bam_destroy1(b); + } + this->reads.clear(); + this->used = 0; + } +}; using InReadBatch = ReadBatch; // ------------------------------------------------------------- From 3abf269f53c661fcf3a39e0fcd07cc3983e77522 Mon Sep 17 00:00:00 2001 From: Giulio Formenti Date: Wed, 25 Feb 2026 00:28:14 -0500 Subject: [PATCH 07/15] faster sampling input loop --- src/reads.cpp | 66 +++++++++++++++++++++++++++++---------------------- 1 file changed, 38 insertions(+), 28 deletions(-) diff --git a/src/reads.cpp b/src/reads.cpp index 4f3988e..d80af32 100644 --- a/src/reads.cpp +++ b/src/reads.cpp @@ -213,23 +213,28 @@ void InReads::extractInReads() { while (true) { split_header(pendingHeader.c_str(), name, comment); + // Decide once per record whether we keep it + const bool keep = (!sample) || (newRand() <= userInput.ratio); + seqBuf.clear(); while (true) { ret = hts_getline(fp, '\n', &ks); - if (ret < 0) { pendingHeader.clear(); break; } // EOF ends record - if (ks.l > 0 && ks.s[0] == '>') { // next record + if (ret < 0) { pendingHeader.clear(); break; } // EOF ends record + if (ks.l > 0 && ks.s[0] == '>') { // next record pendingHeader.assign(ks.s + 1); break; } - seqBuf.append(ks.s, ks.l); // append wrapped lines + + // Only build sequence if we keep this record + if (keep) seqBuf.append(ks.s, ks.l); } - if (!(sample && newRand() > userInput.ratio)) { + if (keep) { Sequence2& rec = readBatch->next_slot(); rec.set(std::move(name), std::move(comment), std::move(seqBuf), - std::string{}, // no qualities + std::string{}, // no qualities in FASTA seqPos++); flush_if_needed(rec.sequence.size()); } @@ -238,7 +243,7 @@ void InReads::extractInReads() { } } else if (ks.s[0] == '@') { - // FASTQ: 4-line records (as in your original; assumes seq/qual are single-line) + // FASTQ: 4-line records (assumes seq/qual are single-line) while (true) { if (ks.l == 0 || ks.s[0] != '@') { fprintf(stderr, "FASTQ malformed (expected '@') in %s\n", file.c_str()); @@ -247,24 +252,30 @@ void InReads::extractInReads() { split_header(ks.s + 1, name, comment); - if (hts_getline(fp, '\n', &ks) < 0) { // seq + // Decide once per record whether we keep it + const bool keep = (!sample) || (newRand() <= userInput.ratio); + + // seq line + if (hts_getline(fp, '\n', &ks) < 0) { fprintf(stderr, "Record appears truncated (%s). Exiting.\n", name.c_str()); exit(EXIT_FAILURE); } - seqBuf.assign(ks.s, ks.l); + if (keep) seqBuf.assign(ks.s, ks.l); - if (hts_getline(fp, '\n', &ks) < 0) { // plus + // plus line + if (hts_getline(fp, '\n', &ks) < 0) { fprintf(stderr, "Record appears truncated (%s). Exiting.\n", name.c_str()); exit(EXIT_FAILURE); } - if (hts_getline(fp, '\n', &ks) < 0) { // qual + // qual line + if (hts_getline(fp, '\n', &ks) < 0) { fprintf(stderr, "Record appears truncated (%s). Exiting.\n", name.c_str()); exit(EXIT_FAILURE); } - qualBuf.assign(ks.s, ks.l); + if (keep) qualBuf.assign(ks.s, ks.l); - if (!(sample && newRand() > userInput.ratio)) { + if (keep) { Sequence2& rec = readBatch->next_slot(); rec.set(std::move(name), std::move(comment), @@ -274,11 +285,11 @@ void InReads::extractInReads() { flush_if_needed(rec.sequence.size()); } - ret = hts_getline(fp, '\n', &ks); // next header + // next header + ret = hts_getline(fp, '\n', &ks); while (ret >= 0 && ks.l == 0) ret = hts_getline(fp, '\n', &ks); if (ret < 0) break; } - } else { fprintf(stderr, "Cannot recognize text input (%s). Expected '>' (FASTA) or '@' (FASTQ).\n", file.c_str()); exit(EXIT_FAILURE); @@ -402,35 +413,34 @@ void InReads::extractInReads() { } } -static inline const float* phredProbLUT() { - // char is ASCII, but quality is (char-33) in [0..93] typically - static float lut[94]; +static inline const double* phredProbLUTd() { + static double lut[94]; static bool inited = false; if (!inited) { for (int q = 0; q < 94; ++q) { - // 10^(-q/10) - lut[q] = std::pow(10.0f, -float(q)/10.0f); + lut[q] = std::pow(10.0, -double(q) / 10.0); } inited = true; } return lut; } -static inline float avg_q_from_phred33_prob(const std::string& qstr) { - const float* lut = phredProbLUT(); +static inline double avg_q_from_phred33_prob_d(const std::string& qstr) { + if (qstr.empty()) return 0.0; + const double* lut = phredProbLUTd(); double sum = 0.0; for (unsigned char c : qstr) { - const int q = int(c) - 33; - if (q >= 0 && q < 94) sum += lut[q]; - else sum += lut[0]; // defensive + int q = int(c) - 33; + if (q < 0) q = 0; + if (q > 93) q = 93; + sum += lut[q]; } const double mean = sum / double(qstr.size()); - return (float)(-10.0 * std::log10(mean)); + return -10.0 * std::log10(mean); } -float computeAvgQuality(const std::string& sequenceQuality) { - if (sequenceQuality.empty()) return 0.0f; - return avg_q_from_phred33_prob(sequenceQuality); +float InReads::computeAvgQuality(const std::string& sequenceQuality) { + return (float)avg_q_from_phred33_prob_d(sequenceQuality); } void InReads::initDictionaries() { From 20d27333d73414d4dab654248904532b23dbc44d Mon Sep 17 00:00:00 2001 From: Giulio Formenti Date: Wed, 25 Feb 2026 13:09:28 -0500 Subject: [PATCH 08/15] added elasticblockingqueue to reduce memory overhead --- gfalibs | 2 +- include/reads.h | 2 +- src/reads.cpp | 503 ++++++++++++++++++++++++------------------------ 3 files changed, 251 insertions(+), 256 deletions(-) diff --git a/gfalibs b/gfalibs index f1d26fa..e54a382 160000 --- a/gfalibs +++ b/gfalibs @@ -1 +1 @@ -Subproject commit f1d26fa5b6723692aa6c0630ec9a3afde8a1ee50 +Subproject commit e54a3820d918345643861ff7c645ae0befa21cd8 diff --git a/include/reads.h b/include/reads.h index b8095b8..01fc584 100644 --- a/include/reads.h +++ b/include/reads.h @@ -202,7 +202,7 @@ class InReads { size_t outBuffersN = 0; // Input queues - BlockingQueue> free_pool_in; + ElasticBlockingPool free_pool_in; BlockingQueue> filled_q_in; // Output queues (BAM batches) diff --git a/src/reads.cpp b/src/reads.cpp index d80af32..a106533 100644 --- a/src/reads.cpp +++ b/src/reads.cpp @@ -39,7 +39,7 @@ InReads::InReads(UserInputRdeval& ui) ), inBuffersN((consumersN + producersN) * 2), outBuffersN(consumersN * 4 + 1), - free_pool_in(inBuffersN), + free_pool_in(inBuffersN,1), filled_q_in(inBuffersN), free_pool_out(outBuffersN + outBuffersN * userInput.inputCifi), // two buffers per consumer, double that if cifi (PE output) filled_q_out(outBuffersN + outBuffersN * userInput.inputCifi) @@ -99,11 +99,6 @@ void InReads::load() { readSummaryBatches.files.resize(userInput.inFiles.size()); // resize to accommodate batches from multiple files fileBatches.files.resize(userInput.inFiles.size()); // resize to accommodate batches from multiple files - // Preallocate exactly N buffers - for (size_t i = 0; i < inBuffersN; ++i) { - std::unique_ptr b(new Sequences2); - free_pool_in.push(std::move(b)); - } if (streamOutput) { for (size_t i = 0; i < (outBuffersN + outBuffersN * userInput.inputCifi); ++i) { auto batch = std::make_unique(); @@ -444,71 +439,71 @@ float InReads::computeAvgQuality(const std::string& sequenceQuality) { } void InReads::initDictionaries() { - - std::ifstream includeFile(userInput.inBedInclude); - std::string key; + + std::ifstream includeFile(userInput.inBedInclude); + std::string key; - while (includeFile >> key) - includeList.insert(key); + while (includeFile >> key) + includeList.insert(key); - includeFile.close(); - - std::ifstream excludeFile(userInput.inBedExclude); + includeFile.close(); + + std::ifstream excludeFile(userInput.inBedExclude); - while (excludeFile >> key) - excludeList.insert(key); + while (excludeFile >> key) + excludeList.insert(key); - excludeFile.close(); + excludeFile.close(); } void InReads::initFilters() { - - bool cannotParse = false; - - std::istringstream stream(userInput.filter); // get l,q and logical operator in between - std::string token1; - std::string token2; - std::string token3; - stream >> token1 >> token2 >> token3; - - if ((userInput.filter.find('l') == std::string::npos && userInput.filter.find('q') == std::string::npos) || // no filter found - (token1.size() < 3) || // first filter too short - (token2.size() != 0 && token3.size() == 0) || // second filter missing - (token2.size() == 0 && token3.size() != 0) || // missing operator - (token2.size() > 1) // malformatted operator - ) - cannotParse = true; - - if (cannotParse){ - fprintf(stderr, "Could not parse filter. Terminating.\n"); - exit(EXIT_FAILURE); - } - - std::string lFilter, qFilter; - if (token2.size()) - logicalOperator = token2[0]; - - if (token1[0] == 'l') { - lFilter = token1; - lSign = lFilter[1]; - l = stoi(lFilter.substr(2)); - } else if (token1[0] == 'q') { - qFilter = token1; - qSign = qFilter[1]; - q = stoi(qFilter.substr(2)); - } - - if (token3.size() != 0) { - if (token3[0] == 'l') { - lFilter = token3; - lSign = lFilter[1]; - l = stoi(lFilter.substr(2)); - }else if (token3[0] == 'q') { - qFilter = token3; - qSign = qFilter[1]; - q = stoi(qFilter.substr(2)); - } - } + + bool cannotParse = false; + + std::istringstream stream(userInput.filter); // get l,q and logical operator in between + std::string token1; + std::string token2; + std::string token3; + stream >> token1 >> token2 >> token3; + + if ((userInput.filter.find('l') == std::string::npos && userInput.filter.find('q') == std::string::npos) || // no filter found + (token1.size() < 3) || // first filter too short + (token2.size() != 0 && token3.size() == 0) || // second filter missing + (token2.size() == 0 && token3.size() != 0) || // missing operator + (token2.size() > 1) // malformatted operator + ) + cannotParse = true; + + if (cannotParse){ + fprintf(stderr, "Could not parse filter. Terminating.\n"); + exit(EXIT_FAILURE); + } + + std::string lFilter, qFilter; + if (token2.size()) + logicalOperator = token2[0]; + + if (token1[0] == 'l') { + lFilter = token1; + lSign = lFilter[1]; + l = stoi(lFilter.substr(2)); + } else if (token1[0] == 'q') { + qFilter = token1; + qSign = qFilter[1]; + q = stoi(qFilter.substr(2)); + } + + if (token3.size() != 0) { + if (token3[0] == 'l') { + lFilter = token3; + lSign = lFilter[1]; + l = stoi(lFilter.substr(2)); + }else if (token3[0] == 'q') { + qFilter = token3; + qSign = qFilter[1]; + q = stoi(qFilter.substr(2)); + } + } } void InReads::filterRecords() { @@ -540,26 +535,26 @@ inline bool InReads::filterRead(Sequence2* sequence) { } inline bool InReads::applyFilter(uint64_t size, float avgQuality) { - - bool lFilter = ((lSign == '0') || - ((lSign == '>') && (size > l)) || - ((lSign == '<') && (size < l)) || - ((lSign == '=') && (size == l)) - ); - - bool qFilter = ((qSign == '0') || - ((qSign == '>') && (avgQuality > q)) || - ((qSign == '<') && (avgQuality < q)) || - ((qSign == '=') && (avgQuality == q)) - ); - - if ((logicalOperator == '0' && (lSign != '0' && lFilter)) || - (logicalOperator == '0' && (qSign != '0' && qFilter)) || - (logicalOperator == '|' && (lFilter || qFilter)) || - (logicalOperator == '&' && (lFilter && qFilter)) - ) - return false; - return true; + + bool lFilter = ((lSign == '0') || + ((lSign == '>') && (size > l)) || + ((lSign == '<') && (size < l)) || + ((lSign == '=') && (size == l)) + ); + + bool qFilter = ((qSign == '0') || + ((qSign == '>') && (avgQuality > q)) || + ((qSign == '<') && (avgQuality < q)) || + ((qSign == '=') && (avgQuality == q)) + ); + + if ((logicalOperator == '0' && (lSign != '0' && lFilter)) || + (logicalOperator == '0' && (qSign != '0' && qFilter)) || + (logicalOperator == '|' && (lFilter || qFilter)) || + (logicalOperator == '&' && (lFilter && qFilter)) + ) + return false; + return true; } bool InReads::traverseInReads(Sequences2& readBatchIn) { @@ -814,164 +809,164 @@ uint64_t InReads::getTotReadLen() { for (uint64_t i = 0; i < readCount; ++i) sum += readLens[i].first; } - return sum; + return sum; } double InReads::computeGCcontent() { - uint64_t totReadLen = totA + totC + totG + totT; - double GCcontent = (double) (totG+totC)/totReadLen * 100; - - return GCcontent; + uint64_t totReadLen = totA + totC + totG + totT; + double GCcontent = (double) (totG+totC)/totReadLen * 100; + + return GCcontent; } double InReads::computeAvgReadLen() { - return (double) getTotReadLen()/totReads; + return (double) getTotReadLen()/totReads; } uint64_t InReads::getReadN50() { - return readNstars[4]; + return readNstars[4]; } void InReads::evalNstars() { // clean up once len-vector iterator is available, still expensive - uint64_t sum = 0, totLen = getTotReadLen(); - uint8_t N = 1; - for(unsigned int i = 0; i < readLens.size(); ++i) { // for each length - sum += readLens[i].first; // increase sum - while (sum >= ((double) totLen / 10 * N) && N<= 10) { // conditionally add length.at or pos to each N/L* bin - - readNstars[N-1] = readLens[i].first; - readLstars[N-1] = i + 1; - N += 1; - } - } + uint64_t sum = 0, totLen = getTotReadLen(); + uint8_t N = 1; + for(unsigned int i = 0; i < readLens.size(); ++i) { // for each length + sum += readLens[i].first; // increase sum + while (sum >= ((double) totLen / 10 * N) && N<= 10) { // conditionally add length.at or pos to each N/L* bin + + readNstars[N-1] = readLens[i].first; + readLstars[N-1] = i + 1; + N += 1; + } + } } uint64_t InReads::getSmallestRead() { - return readLens.back(); + return readLens.back(); } uint64_t InReads::getLargestRead() { - return readLens.front(); + return readLens.front(); } double InReads::getAvgQuality(){ - double sumQualities = 0, avgQualitiesSize=readLens.size(); + double sumQualities = 0, avgQualitiesSize=readLens.size(); - for (uint64_t i = 0; i < avgQualitiesSize; ++i) - sumQualities += readLens[i].first * pow(10, -readLens[i].second/10); // sum the qualities normalized by their read length + for (uint64_t i = 0; i < avgQualitiesSize; ++i) + sumQualities += readLens[i].first * pow(10, -readLens[i].second/10); // sum the qualities normalized by their read length - return -10 * std::log10(sumQualities/getTotReadLen()); + return -10 * std::log10(sumQualities/getTotReadLen()); } void InReads::report() { - if (totReads > 0) { + if (totReads > 0) { if (userInput.filter != "none" && getFileExt(userInput.file('r', 0)) == "rd") filterRecords(); - - readLens.sort(); - - std::cout << std::fixed; // disables scientific notation - std::cout << std::setprecision(2); // 2 decimal points - - if (!tabular_flag) - std::cout< hist; - - for (uint64_t i = 0; i < readCount; ++i) { - if (noFilter || !applyFilter(readLens[i].first, readLens[i].second)) - ++hist[readLens[i].first]; - } - std::vector> table(hist.begin(), hist.end()); // converts the hashmap to a table - std::sort(table.begin(), table.end()); - - for (auto pair : table) - std::cout< hist; - - for (uint64_t i = 0; i < readCount; ++i) { - if (noFilter || !applyFilter(readLens[i].first, readLens[i].second)) - ++hist[readLens[i].first]; - } - std::vector> table(hist.begin(), hist.end()); - std::sort(table.begin(), table.end()); - uint64_t totReadLen = getTotReadLen(), sum = 0; - - for (auto pair : table) { - std::cout<<+pair.first<<"\t"<<+pair.second<<"\t"<<+pair.first*pair.second<<"\t"<<+(totReadLen - sum)<<"\n"; - sum += pair.first*pair.second; - } - } + + std::cout << std::fixed; // disables scientific notation + std::cout << std::setprecision(2); // 2 decimal points + + uint64_t readCount = readLens.size(); + bool noFilter = userInput.filter == "none" ? true : false; + if(userInput.sizeOutType == 's') + readLens.sort(); + + if (userInput.sizeOutType == 'u' || userInput.sizeOutType == 's') { + + for (uint64_t i = 0; i < readCount; ++i) { + if (noFilter || !applyFilter(readLens[i].first, readLens[i].second)) + std::cout << readLens[i].first << "\n"; + } + + }else if(userInput.sizeOutType == 'h') { + + phmap::parallel_flat_hash_map hist; + + for (uint64_t i = 0; i < readCount; ++i) { + if (noFilter || !applyFilter(readLens[i].first, readLens[i].second)) + ++hist[readLens[i].first]; + } + std::vector> table(hist.begin(), hist.end()); // converts the hashmap to a table + std::sort(table.begin(), table.end()); + + for (auto pair : table) + std::cout< hist; + + for (uint64_t i = 0; i < readCount; ++i) { + if (noFilter || !applyFilter(readLens[i].first, readLens[i].second)) + ++hist[readLens[i].first]; + } + std::vector> table(hist.begin(), hist.end()); + std::sort(table.begin(), table.end()); + uint64_t totReadLen = getTotReadLen(), sum = 0; + + for (auto pair : table) { + std::cout<<+pair.first<<"\t"<<+pair.second<<"\t"<<+pair.first*pair.second<<"\t"<<+(totReadLen - sum)<<"\n"; + sum += pair.first*pair.second; + } + } } void InReads::printQualities() { - - std::cout << std::fixed; // disables scientific notation - std::cout << std::setprecision(2); // 2 decimal points - - uint64_t readCount = readLens.size(); - - bool noFilter = userInput.filter == "none" ? true : false; - char qualityOut = userInput.qualityOut; - - if (qualityOut == 'q'){ - for (uint64_t i = 0; i < readCount; ++i) { - if (noFilter || !applyFilter(readLens[i].first, readLens[i].second)) - std::cout << readLens[i].second << "\n"; - } - } - else if (qualityOut == 'a') { // a prints read lengths and qualities - for (uint64_t i = 0; i < readCount; ++i) { - if (noFilter || !applyFilter(readLens[i].first, readLens[i].second)) - std::cout << readLens[i].first << "," << readLens[i].second << "\n"; - } - } + + std::cout << std::fixed; // disables scientific notation + std::cout << std::setprecision(2); // 2 decimal points + + uint64_t readCount = readLens.size(); + + bool noFilter = userInput.filter == "none" ? true : false; + char qualityOut = userInput.qualityOut; + + if (qualityOut == 'q'){ + for (uint64_t i = 0; i < readCount; ++i) { + if (noFilter || !applyFilter(readLens[i].first, readLens[i].second)) + std::cout << readLens[i].second << "\n"; + } + } + else if (qualityOut == 'a') { // a prints read lengths and qualities + for (uint64_t i = 0; i < readCount; ++i) { + if (noFilter || !applyFilter(readLens[i].first, readLens[i].second)) + std::cout << readLens[i].first << "," << readLens[i].second << "\n"; + } + } } void InReads::printContent() { @@ -1020,61 +1015,61 @@ void InReads::printContent() { } void dump_read(bam1_t* b) { - printf("->core.tid:(%d)\n", b->core.tid); - printf("->core.pos:(%ld)\n", static_cast(b->core.pos)); - printf("->core.bin:(%d)\n", b->core.bin); - printf("->core.qual:(%d)\n", b->core.qual); - printf("->core.l_qname:(%d)\n", b->core.l_qname); - printf("->core.flag:(%d)\n", b->core.flag); - printf("->core.n_cigar:(%d)\n", b->core.n_cigar); - printf("->core.l_qseq:(%d)\n", b->core.l_qseq); - printf("->core.mtid:(%d)\n", b->core.mtid); - printf("->core.mpos:(%ld)\n", static_cast(b->core.mpos)); - printf("->core.isize:(%ld)\n", static_cast(b->core.isize)); - if (b->data) { - printf("->data:"); - int i; - for (i = 0; i < b->l_data; ++i) { - printf("%x ", b->data[i]); - } - printf("\n"); - } - if (b->core.l_qname) { - printf("qname: %s\n",bam_get_qname(b)); - } - if (b->core.l_qseq) { - printf("qseq:"); - int i; - for (i = 0; i < b->core.l_qseq; ++i) { - printf("%c", seq_nt16_str[bam_seqi(bam_get_seq(b),i)]); - } - printf("\n"); - printf("qual:"); - uint8_t *s = bam_get_qual(b); - for (i = 0; i < b->core.l_qseq; ++i) { - printf("%c",s[i] + 33); - } - printf("\n"); - - } - - if (bam_get_l_aux(b)) { - uint32_t i = 0; - uint8_t* aux = bam_get_aux(b); - - while (i < bam_get_l_aux(b)) { - printf("%.2s:%c:",aux+i,*(aux+i+2)); - i += 2; - switch (*(aux+i)) { - case 'Z': - while (*(aux+1+i) != '\0') { putc(*(aux+1+i), stdout); ++i; } - break; - } - putc('\n',stdout); - ++i;++i; - } - } - printf("\n"); + printf("->core.tid:(%d)\n", b->core.tid); + printf("->core.pos:(%ld)\n", static_cast(b->core.pos)); + printf("->core.bin:(%d)\n", b->core.bin); + printf("->core.qual:(%d)\n", b->core.qual); + printf("->core.l_qname:(%d)\n", b->core.l_qname); + printf("->core.flag:(%d)\n", b->core.flag); + printf("->core.n_cigar:(%d)\n", b->core.n_cigar); + printf("->core.l_qseq:(%d)\n", b->core.l_qseq); + printf("->core.mtid:(%d)\n", b->core.mtid); + printf("->core.mpos:(%ld)\n", static_cast(b->core.mpos)); + printf("->core.isize:(%ld)\n", static_cast(b->core.isize)); + if (b->data) { + printf("->data:"); + int i; + for (i = 0; i < b->l_data; ++i) { + printf("%x ", b->data[i]); + } + printf("\n"); + } + if (b->core.l_qname) { + printf("qname: %s\n",bam_get_qname(b)); + } + if (b->core.l_qseq) { + printf("qseq:"); + int i; + for (i = 0; i < b->core.l_qseq; ++i) { + printf("%c", seq_nt16_str[bam_seqi(bam_get_seq(b),i)]); + } + printf("\n"); + printf("qual:"); + uint8_t *s = bam_get_qual(b); + for (i = 0; i < b->core.l_qseq; ++i) { + printf("%c",s[i] + 33); + } + printf("\n"); + + } + + if (bam_get_l_aux(b)) { + uint32_t i = 0; + uint8_t* aux = bam_get_aux(b); + + while (i < bam_get_l_aux(b)) { + printf("%.2s:%c:",aux+i,*(aux+i+2)); + i += 2; + switch (*(aux+i)) { + case 'Z': + while (*(aux+1+i) != '\0') { putc(*(aux+1+i), stdout); ++i; } + break; + } + putc('\n',stdout); + ++i;++i; + } + } + printf("\n"); } void InReads::initStream() { @@ -1668,6 +1663,6 @@ void InReads::readTableCompressed(std::string inFile) { void InReads::printMd5() { - for (auto md5 : md5s) - std::cout< Date: Wed, 25 Feb 2026 14:37:39 -0500 Subject: [PATCH 09/15] Update reads.cpp --- src/reads.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/reads.cpp b/src/reads.cpp index a106533..bd0ffc6 100644 --- a/src/reads.cpp +++ b/src/reads.cpp @@ -136,6 +136,8 @@ void InReads::load() { for (auto& th : consumers) th.join(); filled_q_out.push(std::unique_ptr()); // sentinel closeStream(); + lg.verbose("free_pool_in created=" + std::to_string(free_pool_in.created()) + + " max=" + std::to_string(free_pool_in.max())); } void InReads::extractInReads() { From bc1b8e82eb8f22dae5a045cc875004b9df361def Mon Sep 17 00:00:00 2001 From: Giulio Formenti Date: Wed, 25 Feb 2026 15:17:20 -0500 Subject: [PATCH 10/15] added bgzip support --- include/reads.h | 2 ++ src/main.cpp | 15 ++++++++++ src/reads.cpp | 78 ++++++++++++++++++++++++++++++++----------------- 3 files changed, 68 insertions(+), 27 deletions(-) diff --git a/include/reads.h b/include/reads.h index 01fc584..5a7f6ec 100644 --- a/include/reads.h +++ b/include/reads.h @@ -48,6 +48,8 @@ struct UserInputRdeval : UserInput { // CiFi bool inputCifi = false; std::string restrictionEnzyme = ""; + + int bgzip_level = -1; }; diff --git a/src/main.cpp b/src/main.cpp index f8764a2..f0fe443 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -48,6 +48,7 @@ int main(int argc, char **argv) { {"parallel-files", required_argument, 0, 0}, {"decompression-threads", required_argument, 0, 0}, {"compression-threads", required_argument, 0, 0}, + {"bgzip-level", optional_argument, 0, 0}, {"sequence-report",no_argument, 0, 0}, {"exclude-list", required_argument, 0, 'e'}, @@ -119,6 +120,19 @@ int main(int argc, char **argv) { userInput.restrictionEnzyme = optarg; userInput.inputCifi = true; } + if (strcmp(long_options[option_index].name, "bgzip-level") == 0) { + if (optarg) { + userInput.bgzip_level = atoi(optarg); + + if (userInput.bgzip_level < 0 || userInput.bgzip_level > 9) { + fprintf(stderr, "--bgzip-level must be 0–9\n"); + return EXIT_FAILURE; + } + } else { + // Option provided without argument → use minimum compression + userInput.bgzip_level = 0; + } + } break; default: // handle positional arguments if (isInt(optarg)) { // if the positional argument is a number, it is likely the expected genome size @@ -198,6 +212,7 @@ int main(int argc, char **argv) { printf("\t--parallel-files numbers of files that can be opened and processed in parallel (producer threads, default:4).\n"); printf("\t--decompression-threads numbers of decompression threads used by htslib for bam/cram (default:4).\n"); printf("\t--compression-threads numbers of compression threads used by htslib for bam/cram (default:6).\n"); + printf("\t--bgzip-level[=0-9] force BGZF (bgzip) compression for FASTA/FASTQ outputs (default: 0, minimum).\n"); printf("\t--tabular tabular output.\n"); printf("\t--verbose verbose output.\n"); printf("\t-m --max-memory max number of bases in a single buffer of the ring buffer (default:1000000 bp or ~1MB). The total number of buffers is approximately consumer threads*2.\n"); diff --git a/src/reads.cpp b/src/reads.cpp index bd0ffc6..3a12067 100644 --- a/src/reads.cpp +++ b/src/reads.cpp @@ -1135,10 +1135,9 @@ void InReads::openOutputForFile(size_t outId) { lg.verbose("openOutputForFile: invalid outId " + std::to_string(outId)); return; } - if (fps[outId] != nullptr) // already open return; - + const static phmap::flat_hash_map string_to_case{ {"fasta", 1}, {"fa", 1}, @@ -1151,33 +1150,32 @@ void InReads::openOutputForFile(size_t outId) { {"bam", 3}, {"cram", 4} }; - // Decide output file name + + // Decide output file name (UNCHANGED) std::string outName; - if (splitOutputByFile) { // One/two outputs per input file; typically prefix + per-file suffix + if (splitOutputByFile) { if (userInput.cifiCombinations_flag) { - if (outId >= outCount) { lg.verbose("openOutputForFile: outId " + std::to_string(outId) + " has no corresponding outFiles entry in cifiCombinations mode"); std::abort(); } - // Determine input file index size_t fileIdx = outId / 2; - if (fileIdx >= numFiles) { // Validate + if (fileIdx >= numFiles) { lg.verbose("openOutputForFile: computed fileIdx=" + std::to_string(fileIdx) + " invalid (outId=" + std::to_string(outId) + ", total files=" + std::to_string(userInput.inFiles.size()) + ")"); std::abort(); } - std::string baseName = getFileName(userInput.inFiles[fileIdx]); // Base name of the input file + std::string baseName = getFileName(userInput.inFiles[fileIdx]); std::string ext = getFileExt(userInput.inFiles[fileIdx]); std::string baseNameNoExt = stripKnownExt(baseName, ext); - std::string suffix = ((outId % 2) == 0) ? "_1" : "_2"; // Assign _1 or _2 depending on parity + std::string suffix = ((outId % 2) == 0) ? "_1" : "_2"; outName = userInput.outPrefix + baseNameNoExt + suffix + "." + ext; lg.verbose("SciFi combinations mode: outId=" + std::to_string(outId) + " fileIdx=" + std::to_string(fileIdx) + " → " + outName); - }else{ + } else { if (outId >= userInput.inFiles.size()) { lg.verbose("openOutputForFile: outId " + std::to_string(outId) + " has no corresponding outFiles entry"); @@ -1185,37 +1183,64 @@ void InReads::openOutputForFile(size_t outId) { } outName = userInput.outPrefix + getFileName(userInput.inFiles[outId]); } - } else { // Single-output mode => everything maps to slot 0 + } else { if (userInput.outFiles.empty()) { lg.verbose("openOutputForFile: no output file specified"); std::abort(); } outName = userInput.outFiles[0]; } - // Choose mode based on extension - std::string ext = getFileExt(outName); + // Choose mode based on extension (UNCHANGED) + std::string ext = getFileExt(outName); int fmt_case = string_to_case.count(ext) ? string_to_case.at(ext) : 0; + htsFile* ofp = nullptr; - + switch (fmt_case) { case 1: // fasta[.gz] case 2: { // fastq[.gz] - char mode[4] = "w"; - if (sam_open_mode(mode + 1, outName.c_str(), NULL) < 0) { - printf("Invalid file name\n"); - exit(EXIT_FAILURE); - } - if (!(ofp = sam_open(outName.c_str(), mode))) { - printf("Could not open %s\n", outName.c_str()); - exit(EXIT_FAILURE); + + // If user requested BGZF compression level, force BGZF + if (userInput.bgzip_level >= 0) { + + if (userInput.bgzip_level > 9) { + fprintf(stderr, "Invalid bgzip level %d (must be 0–9)\n", + userInput.bgzip_level); + exit(EXIT_FAILURE); + } + + // "wz" = write BGZF + // append compression level digit if provided + std::string mode = "wz"; + mode += char('0' + userInput.bgzip_level); + + ofp = hts_open(outName.c_str(), mode.c_str()); + if (!ofp) { + fprintf(stderr, "Could not open %s with BGZF mode %s\n", + outName.c_str(), mode.c_str()); + exit(EXIT_FAILURE); + } + + } else { + char mode[4] = "w"; + if (sam_open_mode(mode + 1, outName.c_str(), NULL) < 0) { + printf("Invalid file name\n"); + exit(EXIT_FAILURE); + } + if (!(ofp = sam_open(outName.c_str(), mode))) { + printf("Could not open %s\n", outName.c_str()); + exit(EXIT_FAILURE); + } } break; } + case 3: { // bam - ofp = sam_open(outName.c_str(),"wb"); + ofp = sam_open(outName.c_str(), "wb"); break; } + case 4: { // cram htsFormat fmt4 = {sequence_data, cram, {3, 1}, gzip, 6, NULL}; hts_parse_format(&fmt4, "cram,no_ref=1"); @@ -1234,14 +1259,14 @@ void InReads::openOutputForFile(size_t outId) { std::abort(); } - // Duplicate and write BAM/CRAM header + // Duplicate and write header (UNCHANGED) bam_hdr_t* ohdr = bam_hdr_dup(hdrTemplate); if (!ohdr) { lg.verbose("Failed to duplicate BAM header"); std::abort(); } - // Attach thread pool if available + // Attach thread pool if available (UNCHANGED; works for BGZF too) if (tpool_write.pool) { hts_set_opt(ofp, HTS_OPT_THREAD_POOL, &tpool_write); } @@ -1254,8 +1279,7 @@ void InReads::openOutputForFile(size_t outId) { fps[outId] = ofp; hdrs[outId] = ohdr; - lg.verbose("Opened output file '" + outName + - "' for outId " + std::to_string(outId)); + lg.verbose("Opened output file '" + outName + "' for outId " + std::to_string(outId)); } void InReads::writeToStream() { From e25e6aac7ed584e8ba4c2c350794fe388eed61c1 Mon Sep 17 00:00:00 2001 From: Giulio Formenti Date: Wed, 25 Feb 2026 15:39:04 -0500 Subject: [PATCH 11/15] Update reads.cpp --- src/reads.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/reads.cpp b/src/reads.cpp index 3a12067..96a10ee 100644 --- a/src/reads.cpp +++ b/src/reads.cpp @@ -58,7 +58,7 @@ InReads::InReads(UserInputRdeval& ui) } } - if (splitOutputByFile) + if (splitOutputByFile || userInput.bgzip_level >= 0) streamOutput = true; const size_t numFiles = userInput.inFiles.size(); From bd48640bfb3fd6b79e7821b5c0de6395127777b8 Mon Sep 17 00:00:00 2001 From: Giulio Formenti Date: Wed, 25 Feb 2026 15:40:49 -0500 Subject: [PATCH 12/15] Update reads.cpp --- src/reads.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/reads.cpp b/src/reads.cpp index 96a10ee..3a12067 100644 --- a/src/reads.cpp +++ b/src/reads.cpp @@ -58,7 +58,7 @@ InReads::InReads(UserInputRdeval& ui) } } - if (splitOutputByFile || userInput.bgzip_level >= 0) + if (splitOutputByFile) streamOutput = true; const size_t numFiles = userInput.inFiles.size(); From efad745ac91c045db5583b17a2c0d42f90a2e818 Mon Sep 17 00:00:00 2001 From: Giulio Formenti Date: Wed, 25 Feb 2026 15:59:46 -0500 Subject: [PATCH 13/15] fixed bug with bgzip output --- src/reads.cpp | 49 +++++++++++++++++++++++++------------------------ 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/src/reads.cpp b/src/reads.cpp index 3a12067..8563bae 100644 --- a/src/reads.cpp +++ b/src/reads.cpp @@ -1200,38 +1200,39 @@ void InReads::openOutputForFile(size_t outId) { switch (fmt_case) { case 1: // fasta[.gz] case 2: { // fastq[.gz] + char mode[8] = "w"; - // If user requested BGZF compression level, force BGZF + // This sets the FORMAT (FASTA/FASTQ) in mode[1], e.g. "wF" or "wf" + if (sam_open_mode(mode + 1, outName.c_str(), NULL) < 0) { + printf("Invalid file name\n"); + exit(EXIT_FAILURE); + } + + // If --bgzip[=level] was requested, APPEND 'z' + optional digit if (userInput.bgzip_level >= 0) { + int lvl = userInput.bgzip_level; - if (userInput.bgzip_level > 9) { - fprintf(stderr, "Invalid bgzip level %d (must be 0–9)\n", - userInput.bgzip_level); - exit(EXIT_FAILURE); + // "optional level": if flag given without level, you said "use minimum" + // pick 1 as "minimum compression" (fastest that still compresses) + if (lvl == 0) { + // If you want "minimum" to mean 0 instead, delete this block. + lvl = 1; } - // "wz" = write BGZF - // append compression level digit if provided - std::string mode = "wz"; - mode += char('0' + userInput.bgzip_level); - - ofp = hts_open(outName.c_str(), mode.c_str()); - if (!ofp) { - fprintf(stderr, "Could not open %s with BGZF mode %s\n", - outName.c_str(), mode.c_str()); + if (lvl < 0 || lvl > 9) { + fprintf(stderr, "Invalid bgzip level %d (must be 0–9)\n", lvl); exit(EXIT_FAILURE); } - } else { - char mode[4] = "w"; - if (sam_open_mode(mode + 1, outName.c_str(), NULL) < 0) { - printf("Invalid file name\n"); - exit(EXIT_FAILURE); - } - if (!(ofp = sam_open(outName.c_str(), mode))) { - printf("Could not open %s\n", outName.c_str()); - exit(EXIT_FAILURE); - } + size_t mlen = std::strlen(mode); // e.g. 2 ("wf") + mode[mlen++] = 'z'; + mode[mlen++] = char('0' + lvl); + mode[mlen] = '\0'; // now "wfz1" or "wFz1" + } + + if (!(ofp = sam_open(outName.c_str(), mode))) { + printf("Could not open %s\n", outName.c_str()); + exit(EXIT_FAILURE); } break; } From 6ed92793b2cc85d1e67401b8158283d8b8f5676b Mon Sep 17 00:00:00 2001 From: Giulio Formenti Date: Wed, 25 Feb 2026 16:11:16 -0500 Subject: [PATCH 14/15] Update main.cpp --- src/main.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main.cpp b/src/main.cpp index f0fe443..1d08f81 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -15,7 +15,7 @@ #include "stream-obj.h" #include "input.h" -std::string version = "0.0.8"; +std::string version = "0.0.9"; //global std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now(); // immediately start the clock when the program is run From f62c0d2de9ce7ab85f82d3c102ba98b682c42455 Mon Sep 17 00:00:00 2001 From: Giulio Formenti Date: Wed, 25 Feb 2026 16:29:36 -0500 Subject: [PATCH 15/15] avoid automatically writing bgzip --- src/reads.cpp | 59 +++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 46 insertions(+), 13 deletions(-) diff --git a/src/reads.cpp b/src/reads.cpp index 8563bae..17940d9 100644 --- a/src/reads.cpp +++ b/src/reads.cpp @@ -1196,44 +1196,77 @@ void InReads::openOutputForFile(size_t outId) { int fmt_case = string_to_case.count(ext) ? string_to_case.at(ext) : 0; htsFile* ofp = nullptr; + + auto strip_bgzf_from_mode = [](char* mode) { + // Remove 'z' and an optional following digit [0-9] + // Example: "wfz6" -> "wf", "wFz" -> "wF" + const size_t n = std::strlen(mode); + size_t w = 0; + for (size_t r = 0; r < n; ++r) { + if (mode[r] == 'z') { + // skip 'z' + // if next is a digit, skip it too + if (r + 1 < n && mode[r + 1] >= '0' && mode[r + 1] <= '9') { + ++r; + } + continue; + } + mode[w++] = mode[r]; + } + mode[w] = '\0'; + }; switch (fmt_case) { + case 1: // fasta[.gz] case 2: { // fastq[.gz] - char mode[8] = "w"; - // This sets the FORMAT (FASTA/FASTQ) in mode[1], e.g. "wF" or "wf" + char mode[16] = "w"; + + // Let htslib decide format (FASTA vs FASTQ), and it may also add compression flags based on suffix + // e.g. for ".gz" it often injects 'z' (BGZF) into mode. if (sam_open_mode(mode + 1, outName.c_str(), NULL) < 0) { printf("Invalid file name\n"); exit(EXIT_FAILURE); } - // If --bgzip[=level] was requested, APPEND 'z' + optional digit if (userInput.bgzip_level >= 0) { + // BGZF requested: keep/ensure 'z' and optionally set level digit int lvl = userInput.bgzip_level; - // "optional level": if flag given without level, you said "use minimum" - // pick 1 as "minimum compression" (fastest that still compresses) - if (lvl == 0) { - // If you want "minimum" to mean 0 instead, delete this block. - lvl = 1; - } + // “optional level”: if flag present but no explicit level, you said "use minimum" + // implement that as level 1 (change to 0 if you truly want no compression) + if (lvl == 0) lvl = 1; if (lvl < 0 || lvl > 9) { fprintf(stderr, "Invalid bgzip level %d (must be 0–9)\n", lvl); exit(EXIT_FAILURE); } - size_t mlen = std::strlen(mode); // e.g. 2 ("wf") - mode[mlen++] = 'z'; - mode[mlen++] = char('0' + lvl); - mode[mlen] = '\0'; // now "wfz1" or "wFz1" + // Ensure mode has 'z' and a level digit. + // First strip any existing z/level then append fresh. + strip_bgzf_from_mode(mode); + + const size_t mlen = std::strlen(mode); + mode[mlen + 0] = 'z'; + mode[mlen + 1] = char('0' + lvl); + mode[mlen + 2] = '\0'; + + } else { + // BGZF NOT requested: force plain gzip if ".gz" + // Remove any bgzf injected by sam_open_mode, then add 'g' + strip_bgzf_from_mode(mode); + + const size_t mlen = std::strlen(mode); + mode[mlen + 0] = 'g'; + mode[mlen + 1] = '\0'; } if (!(ofp = sam_open(outName.c_str(), mode))) { printf("Could not open %s\n", outName.c_str()); exit(EXIT_FAILURE); } + break; }