diff --git a/gfalibs b/gfalibs index 47fefb5..e54a382 160000 --- a/gfalibs +++ b/gfalibs @@ -1 +1 @@ -Subproject commit 47fefb5e7fd0628605c546a11c9fef240855ae4d +Subproject commit e54a3820d918345643861ff7c645ae0befa21cd8 diff --git a/include/bam-utils.h b/include/bam-utils.h new file mode 100644 index 0000000..ec83ffc --- /dev/null +++ b/include/bam-utils.h @@ -0,0 +1,171 @@ +#ifndef BAMUTILS_H +#define BAMUTILS_H + +#include + +#include +#include +#include // std::swap +#include // fprintf +#include // std::abort +#include // uint16_t, etc + +// Create a fresh unmapped BAM record (allocates a new bam1_t). +// NOTE: l_qname passed to bam_set1 must include the trailing '\0' => name.size() + 1. +static inline bam1_t* make_unmapped_bam( + const std::string& name, + const std::string& seq, + const std::string& qual_str, + uint16_t extra_flags = 0 +) { + bam1_t* b = bam_init1(); + if (!b) { + std::fprintf(stderr, "Failed to initialize bam1_t\n"); + std::abort(); + } + + const int32_t l_qname = static_cast(name.size() + 1); + const int32_t l_seq = static_cast(seq.size()); + + const uint16_t flag = static_cast(BAM_FUNMAP | extra_flags); + + // We pass nullptr for qual here and then fill the numeric quals below. + // (Some htslib versions allow passing qual_str.c_str() directly; see fill_unmapped_bam) + if (bam_set1( + b, + l_qname, + name.c_str(), + flag, + -1, -1, // tid, pos + 0, // mapq + 0, // n_cigar + nullptr, // cigar + -1, -1, // mtid, mpos + 0, // isize + l_seq, + seq.c_str(), + nullptr, // qual (filled below) + 0 // l_aux + ) < 0) + { + std::fprintf(stderr, "Failed to set BAM data\n"); + std::abort(); + } + + // Fill numeric (0..93) qualities. bam_set1 stores quals as raw Phred, not ASCII. + uint8_t* qual = bam_get_qual(b); + if (qual) { + if (!qual_str.empty()) { + if (qual_str.size() != seq.size()) { + std::fprintf(stderr, "Sequence and quality length mismatch in make_unmapped_bam\n"); + std::abort(); + } + for (size_t i = 0; i < qual_str.size(); ++i) { + const int q = static_cast(qual_str[i]) - 33; + qual[i] = static_cast(q < 0 ? 0 : q); + } + } else { + // If no qualities provided, set to 0 (Phred 0). + for (int32_t i = 0; i < l_seq; ++i) + qual[i] = 0; + } + } + + return b; +} + +// Keep b->data allocated, just mark record empty (pool-friendly). +static inline void bam_recycle_keep_capacity(bam1_t* b) { + b->l_data = 0; + b->core = bam1_core_t(); // zero-initialize core + // Do NOT free b->data; m_data stays as-is. +} + +// Fill an existing bam1_t as an unmapped record. +// Uses bam_set1 when available; otherwise falls back to making a temporary record and swapping. +static inline void fill_unmapped_bam( + bam1_t* b, + const std::string& name, + const std::string& seq, + const std::string& qual_str, + uint16_t extra_flags = 0 +) { +#ifdef bam_set1 + const int32_t l_qname = static_cast(name.size() + 1); + const int32_t l_seq = static_cast(seq.size()); + + const int ret = bam_set1( + b, + l_qname, name.c_str(), + static_cast(BAM_FUNMAP | extra_flags), + -1, -1, 0, // tid, pos, mapq + 0, nullptr, // n_cigar, cigar + -1, -1, 0, // mtid, mpos, isize + l_seq, + seq.c_str(), + qual_str.empty() ? nullptr : qual_str.c_str(), + 0 + ); + if (ret < 0) { + std::fprintf(stderr, "bam_set1 failed\n"); + std::abort(); + } + + // If qual_str was nullptr/empty, explicitly set qualities to 0. + if (qual_str.empty()) { + uint8_t* q = bam_get_qual(b); + if (q) { + for (int32_t i = 0; i < l_seq; ++i) + q[i] = 0; + } + } + +#else + // Fallback: allocate a temporary record using existing helper, + // then swap contents into b (still avoids repeated malloc/free of b itself). + bam1_t* tmp = make_unmapped_bam(name, seq, qual_str, extra_flags); + + std::swap(b->core, tmp->core); + std::swap(b->l_data, tmp->l_data); + std::swap(b->m_data, tmp->m_data); + std::swap(b->data, tmp->data); + + bam_destroy1(tmp); +#endif +} + +// ---- Templated helpers (no dependency on BamBatch typedef) ---- +// Requirements on Batch: +// - batch.reads is std::vector +// - batch.used is an integer count of valid entries + +template +static inline void bambatch_ensure_capacity(Batch& batch, size_t additional) { + const size_t need = static_cast(batch.used) + additional; + if (need <= batch.reads.size()) return; + + const size_t old = batch.reads.size(); + size_t neu = old ? old : 1024; + while (neu < need) neu *= 2; + + batch.reads.resize(neu); + for (size_t i = old; i < neu; ++i) + batch.reads[i] = bam_init1(); +} + +template +static inline void bambatch_append_unmapped( + Batch& batch, + const std::string& name, + const std::string& seq, + const std::string& qual, + uint16_t extra_flags +) { + bambatch_ensure_capacity(batch, 1); + + bam1_t* b = batch.reads[batch.used++]; + bam_recycle_keep_capacity(b); + fill_unmapped_bam(b, name, seq, qual, extra_flags); +} + +#endif /* BAMUTILS_H */ diff --git a/include/cifi.h b/include/cifi.h index 677a321..a777b05 100644 --- a/include/cifi.h +++ b/include/cifi.h @@ -2,6 +2,7 @@ #define SCIFI_H #include +#include "bam-utils.h" #include "reads.h" struct EnzymeInfo { @@ -19,54 +20,8 @@ std::vector find_cut_positions(const InRead& read, const EnzymeInfo& enz); void digest_read(const InRead& read, const EnzymeInfo& enz, std::vector& frag_seqs, std::vector& frag_quals, std::vector* frag_lengths = nullptr); // Digest sequences and corresponding quality strings -BamBatch chop_read_to_bambatch(const InRead& read, const EnzymeInfo& enz, uint32_t batchN = 0, uint32_t fileN = 0); +void chop_read_into_bambatch(const InRead& read, const EnzymeInfo& enz, BamBatch& batch); void build_pe_bambatches(const InRead& read, const EnzymeInfo& enz, BamBatch& R1_batch, BamBatch& R2_batch); -static bam1_t* make_unmapped_bam(const std::string& name, const std::string& seq, const std::string& qual_str, uint16_t extra_flags = 0) { - bam1_t* b = bam_init1(); - if (!b) { - fprintf(stderr, "Failed to initialize bam1_t\n"); - std::abort(); - } - - const int32_t l_qname = static_cast(name.size()); - const int32_t l_seq = static_cast(seq.size()); - - uint16_t flag = BAM_FUNMAP | extra_flags; - - if (bam_set1(b, - l_qname, - name.c_str(), - flag, - -1, -1, - 0, 0, - nullptr, - -1, -1, - 0, - l_seq, - seq.c_str(), - nullptr, - 0) < 0) { - fprintf(stderr, "Failed to set BAM data\n"); - std::abort(); - } - - uint8_t* qual = bam_get_qual(b); - if (qual && !qual_str.empty()) { - if (qual_str.size() != seq.size()) { - fprintf(stderr, "Sequence and quality length mismatch in make_unmapped_bam\n"); - std::abort(); - } - for (size_t i = 0; i < qual_str.size(); ++i) { - qual[i] = static_cast(qual_str[i] - 33); - } - } else if (qual) { - for (int32_t i = 0; i < l_seq; ++i) - qual[i] = 0; - } - - return b; -} - #endif /* SCIFI_H */ diff --git a/include/reads.h b/include/reads.h index 7f96bf8..5a7f6ec 100644 --- a/include/reads.h +++ b/include/reads.h @@ -48,6 +48,8 @@ struct UserInputRdeval : UserInput { // CiFi bool inputCifi = false; std::string restrictionEnzyme = ""; + + int bgzip_level = -1; }; @@ -64,8 +66,6 @@ struct InRead { uint64_t A=0, C=0, G=0, T=0, N=0, lowerCount=0; uint32_t uId=0, iId=0, seqPos=0; - std::vector tags; - std::vector> variants; float avgQuality = 0.0f; InRead() = default; @@ -83,8 +83,7 @@ struct InRead { uint32_t readPos, uint64_t A, uint64_t C, uint64_t G, uint64_t T, uint64_t N, uint64_t lowerCount, - float avgQuality, - const std::vector* tags = nullptr) + float avgQuality) : seqHeader(std::move(header)), seqComment(std::move(comment)), inSequence(std::move(sequence)), @@ -93,7 +92,6 @@ struct InRead { uId(uId), iId(iId), seqPos(readPos), avgQuality(avgQuality) { (void)threadLog; - if (tags) this->tags = *tags; #ifdef DEBUG if (threadLog) { threadLog->add("Processing read: " + seqHeader + @@ -109,7 +107,6 @@ struct InRead { } }; - // ------------------------------------------------------------- // ReadBatch and container structures // ------------------------------------------------------------- @@ -117,8 +114,11 @@ struct InRead { template struct ReadBatch { std::vector reads; + uint32_t used = 0; // how many valid entries this time uint32_t batchN = 0; // within-file batch index uint32_t fileN = 0; // file ID + + void reset_keep_capacity() { used = 0; } // DO NOT clear() }; template @@ -131,7 +131,15 @@ struct FileBatches { std::vector> files; }; -using BamBatch = ReadBatch; +struct BamBatch : ReadBatch { + ~BamBatch() { + for (bam1_t* b : this->reads) { + if (b) bam_destroy1(b); + } + this->reads.clear(); + this->used = 0; + } +}; using InReadBatch = ReadBatch; // ------------------------------------------------------------- @@ -196,7 +204,7 @@ class InReads { size_t outBuffersN = 0; // Input queues - BlockingQueue> free_pool_in; + ElasticBlockingPool free_pool_in; BlockingQueue> filled_q_in; // Output queues (BAM batches) @@ -222,7 +230,7 @@ class InReads { inline bool applyFilter(uint64_t size, float avgQuality); // Processing - float computeAvgQuality(std::string& sequenceQuality); + float computeAvgQuality(const std::string& sequenceQuality); void filterRecords(); void extractInReads(); bool traverseInReads(Sequences2& readBatch); diff --git a/src/cifi.cpp b/src/cifi.cpp index d56bf72..f06c1b2 100644 --- a/src/cifi.cpp +++ b/src/cifi.cpp @@ -17,6 +17,7 @@ #include "stream-obj.h" #include "cifi.h" +#include "bam-utils.h" // Find enzyme info from name EnzymeInfo get_enzyme(const std::string& name) { @@ -106,79 +107,57 @@ void digest_read(const InRead& read, const EnzymeInfo& enz, std::vector frag_seqs; std::vector frag_quals; - std::vector frag_lengths; // optional, you can drop if not needed - - digest_read(read, enz, frag_seqs, frag_quals, &frag_lengths); - - BamBatch batch; - batch.batchN = batchN; - batch.fileN = fileN; + digest_read(read, enz, frag_seqs, frag_quals, nullptr); - batch.reads.reserve(frag_seqs.size()); + // Ensure enough capacity in one go + bambatch_ensure_capacity(batch, frag_seqs.size()); for (std::size_t i = 0; i < frag_seqs.size(); ++i) { std::string frag_id = read.seqHeader + "_frag" + std::to_string(i); - - bam1_t* b = make_unmapped_bam( - frag_id, - frag_seqs[i], - frag_quals[i], - 0 // no extra flags - ); - - batch.reads.push_back(b); - - // If you want fragment lengths here, record frag_lengths[i] somewhere else. + bambatch_append_unmapped(batch, frag_id, frag_seqs[i], frag_quals[i], 0); } - return batch; } -void build_pe_bambatches(const InRead& read, const EnzymeInfo& enz, BamBatch& R1_batch, BamBatch& R2_batch) { +void build_pe_bambatches(const InRead& read, const EnzymeInfo& enz,BamBatch& R1_batch, BamBatch& R2_batch) { if (!read.inSequenceQuality) { throw std::runtime_error("InRead::inSequenceQuality is null in build_pe_bambatches"); } std::vector frag_seqs; std::vector frag_quals; - digest_read(read, enz, frag_seqs, frag_quals, nullptr); const std::size_t n = frag_seqs.size(); if (n == 0) return; - const int trim = 5; // as in original fic2pe - const std::size_t max_pairs = n * (n - 1) / 2; + const int trim = 5; - R1_batch.reads.reserve(R1_batch.reads.size() + max_pairs); - R2_batch.reads.reserve(R2_batch.reads.size() + max_pairs); + // Optional: pre-reserve capacity in one go + const std::size_t max_pairs = n * (n - 1) / 2; + bambatch_ensure_capacity(R1_batch, max_pairs); + bambatch_ensure_capacity(R2_batch, max_pairs); for (std::size_t i1 = 0; i1 < n; ++i1) { for (std::size_t i2 = i1 + 1; i2 < n; ++i2) { - const std::string& seq1 = frag_seqs[i1]; - const std::string& seq2 = frag_seqs[i2]; + const std::string& seq1 = frag_seqs[i1]; const std::string& q1 = frag_quals[i1]; - const std::string& q2 = frag_quals[i2]; - std::string R1_seq = seq1; - std::string R1_qual = q1; + const std::string& seq2 = frag_seqs[i2]; + const std::string& q2 = frag_quals[i2]; - std::string R2_seq; - std::string R2_qual; - - if (seq2.size() > static_cast(trim)) { - R2_seq = seq2.substr(trim); - R2_qual = q2.substr(trim); - } else { + if (seq2.size() <= static_cast(trim)) continue; - } + + std::string R2_seq = seq2.substr(trim); + std::string R2_qual = q2.substr(trim); std::string base_id = read.seqHeader + "_" + std::to_string(i1) + "_" + @@ -187,17 +166,14 @@ void build_pe_bambatches(const InRead& read, const EnzymeInfo& enz, BamBatch& R1 std::string R1_id = base_id + " 1"; std::string R2_id = base_id + " 2"; - bam1_t* b1 = make_unmapped_bam( - R1_id, R1_seq, R1_qual, + bambatch_append_unmapped( + R1_batch, R1_id, seq1, q1, static_cast(BAM_FPAIRED | BAM_FREAD1) ); - bam1_t* b2 = make_unmapped_bam( - R2_id, R2_seq, R2_qual, + bambatch_append_unmapped( + R2_batch, R2_id, R2_seq, R2_qual, static_cast(BAM_FPAIRED | BAM_FREAD2) ); - - R1_batch.reads.push_back(b1); - R2_batch.reads.push_back(b2); } } } diff --git a/src/main.cpp b/src/main.cpp index f8764a2..1d08f81 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -15,7 +15,7 @@ #include "stream-obj.h" #include "input.h" -std::string version = "0.0.8"; +std::string version = "0.0.9"; //global std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now(); // immediately start the clock when the program is run @@ -48,6 +48,7 @@ int main(int argc, char **argv) { {"parallel-files", required_argument, 0, 0}, {"decompression-threads", required_argument, 0, 0}, {"compression-threads", required_argument, 0, 0}, + {"bgzip-level", optional_argument, 0, 0}, {"sequence-report",no_argument, 0, 0}, {"exclude-list", required_argument, 0, 'e'}, @@ -119,6 +120,19 @@ int main(int argc, char **argv) { userInput.restrictionEnzyme = optarg; userInput.inputCifi = true; } + if (strcmp(long_options[option_index].name, "bgzip-level") == 0) { + if (optarg) { + userInput.bgzip_level = atoi(optarg); + + if (userInput.bgzip_level < 0 || userInput.bgzip_level > 9) { + fprintf(stderr, "--bgzip-level must be 0–9\n"); + return EXIT_FAILURE; + } + } else { + // Option provided without argument → use minimum compression + userInput.bgzip_level = 0; + } + } break; default: // handle positional arguments if (isInt(optarg)) { // if the positional argument is a number, it is likely the expected genome size @@ -198,6 +212,7 @@ int main(int argc, char **argv) { printf("\t--parallel-files numbers of files that can be opened and processed in parallel (producer threads, default:4).\n"); printf("\t--decompression-threads numbers of decompression threads used by htslib for bam/cram (default:4).\n"); printf("\t--compression-threads numbers of compression threads used by htslib for bam/cram (default:6).\n"); + printf("\t--bgzip-level[=0-9] force BGZF (bgzip) compression for FASTA/FASTQ outputs (default: 0, minimum).\n"); printf("\t--tabular tabular output.\n"); printf("\t--verbose verbose output.\n"); printf("\t-m --max-memory max number of bases in a single buffer of the ring buffer (default:1000000 bp or ~1MB). The total number of buffers is approximately consumer threads*2.\n"); diff --git a/src/reads.cpp b/src/reads.cpp index 9670e45..17940d9 100644 --- a/src/reads.cpp +++ b/src/reads.cpp @@ -5,7 +5,9 @@ #include #include +#include #include +#include #include #include "log.h" @@ -19,10 +21,6 @@ #include "stream-obj.h" #include -#include "zlib.h" -#include "zstream/zstream_common.hpp" -#include "zstream/ozstream.hpp" -#include "zstream/ozstream_impl.hpp" #include "output.h" #include "len-vector.h" @@ -41,7 +39,7 @@ InReads::InReads(UserInputRdeval& ui) ), inBuffersN((consumersN + producersN) * 2), outBuffersN(consumersN * 4 + 1), - free_pool_in(inBuffersN), + free_pool_in(inBuffersN,1), filled_q_in(inBuffersN), free_pool_out(outBuffersN + outBuffersN * userInput.inputCifi), // two buffers per consumer, double that if cifi (PE output) filled_q_out(outBuffersN + outBuffersN * userInput.inputCifi) @@ -101,15 +99,13 @@ void InReads::load() { readSummaryBatches.files.resize(userInput.inFiles.size()); // resize to accommodate batches from multiple files fileBatches.files.resize(userInput.inFiles.size()); // resize to accommodate batches from multiple files - // Preallocate exactly N buffers - for (size_t i = 0; i < inBuffersN; ++i) { - std::unique_ptr b(new Sequences2); - free_pool_in.push(std::move(b)); - } if (streamOutput) { for (size_t i = 0; i < (outBuffersN + outBuffersN * userInput.inputCifi); ++i) { auto batch = std::make_unique(); - batch->reads.reserve(1024); + batch->reads.resize(1024); + for (size_t j = 0; j < batch->reads.size(); ++j) + batch->reads[j] = bam_init1(); + batch->used = 0; free_pool_out.push(std::move(batch)); } } @@ -140,299 +136,376 @@ void InReads::load() { for (auto& th : consumers) th.join(); filled_q_out.push(std::unique_ptr()); // sentinel closeStream(); + lg.verbose("free_pool_in created=" + std::to_string(free_pool_in.created()) + + " max=" + std::to_string(free_pool_in.max())); } void InReads::extractInReads() { - - std::string newLine, seqHeader, seqComment, line, bedHeader; - std::size_t numFiles = userInput.inFiles.size(); - uint32_t batchCounter = 0; // batch number, read position in the batch - uint64_t processedLength = 0; - bool sample = userInput.ratio < 1 ? true : false; // read subsampling - - const static phmap::flat_hash_map string_to_case{ - {"fasta",1}, - {"fa",1}, - {"fasta.gz",1}, - {"fa.gz",1}, - {"fastq",1}, - {"fq",1}, - {"fastq.gz",1}, - {"fq.gz",1}, - {"bam",2}, - {"cram",2}, - {"rd",3} + + const std::size_t numFiles = userInput.inFiles.size(); + const bool sample = userInput.ratio < 1; + + // Reusable buffers (per worker thread invocation of extractInReads) + std::string name, comment, seqBuf, qualBuf, qnameBuf; + + auto split_header = [](const char* s, std::string& header, std::string& comm) { + const char* sp = std::strchr(s, ' '); + if (!sp) { header.assign(s); comm.clear(); } + else { header.assign(s, sp - s); comm.assign(sp + 1); } + }; + + auto push_batch = [&](std::unique_ptr& readBatch, uint32_t& batchN, uint32_t fileN) { + readBatch->batchN = batchN++; + readBatch->fileN = fileN; + lg.verbose("Processing batch N: " + std::to_string(readBatch->batchN)); + filled_q_in.push(std::move(readBatch)); + readBatch = free_pool_in.pop(); + }; + + auto enable_threads = [&](htsFile* fp) { + if (fp && userInput.decompression_threads > 1) { + // works for BGZF; harmless if unsupported for this handle + (void)hts_set_threads(fp, userInput.decompression_threads); + } + }; + + auto read_fa_fq = [&](const std::string& file, + std::unique_ptr& readBatch, + uint32_t& batchN, + uint32_t fileN) { + + htsFile* fp = hts_open(file.c_str(), "r"); + if (!fp) { + fprintf(stderr, "Failed to open input (%s)\n", file.c_str()); + exit(EXIT_FAILURE); + } + enable_threads(fp); + + kstring_t ks = {0, 0, nullptr}; + + // Decide FASTA vs FASTQ from first non-empty line + int ret = hts_getline(fp, '\n', &ks); + while (ret >= 0 && ks.l == 0) ret = hts_getline(fp, '\n', &ks); + + if (ret < 0) { // empty file: keep your “residual even if empty” behavior outside + if (ks.s) free(ks.s); + hts_close(fp); + return; + } + + uint64_t processedLength = 0; + + auto flush_if_needed = [&](size_t add) { + processedLength += add; + if (processedLength > batchSize) { + push_batch(readBatch, batchN, fileN); + processedLength = 0; + } + }; + + if (ks.s[0] == '>') { + // FASTA: header line starts with '>', sequence may be wrapped. + std::string pendingHeader(ks.s + 1); + + while (true) { + split_header(pendingHeader.c_str(), name, comment); + + // Decide once per record whether we keep it + const bool keep = (!sample) || (newRand() <= userInput.ratio); + + seqBuf.clear(); + while (true) { + ret = hts_getline(fp, '\n', &ks); + if (ret < 0) { pendingHeader.clear(); break; } // EOF ends record + if (ks.l > 0 && ks.s[0] == '>') { // next record + pendingHeader.assign(ks.s + 1); + break; + } + + // Only build sequence if we keep this record + if (keep) seqBuf.append(ks.s, ks.l); + } + + if (keep) { + Sequence2& rec = readBatch->next_slot(); + rec.set(std::move(name), + std::move(comment), + std::move(seqBuf), + std::string{}, // no qualities in FASTA + seqPos++); + flush_if_needed(rec.sequence.size()); + } + + if (pendingHeader.empty()) break; + } + + } else if (ks.s[0] == '@') { + // FASTQ: 4-line records (assumes seq/qual are single-line) + while (true) { + if (ks.l == 0 || ks.s[0] != '@') { + fprintf(stderr, "FASTQ malformed (expected '@') in %s\n", file.c_str()); + exit(EXIT_FAILURE); + } + + split_header(ks.s + 1, name, comment); + + // Decide once per record whether we keep it + const bool keep = (!sample) || (newRand() <= userInput.ratio); + + // seq line + if (hts_getline(fp, '\n', &ks) < 0) { + fprintf(stderr, "Record appears truncated (%s). Exiting.\n", name.c_str()); + exit(EXIT_FAILURE); + } + if (keep) seqBuf.assign(ks.s, ks.l); + + // plus line + if (hts_getline(fp, '\n', &ks) < 0) { + fprintf(stderr, "Record appears truncated (%s). Exiting.\n", name.c_str()); + exit(EXIT_FAILURE); + } + + // qual line + if (hts_getline(fp, '\n', &ks) < 0) { + fprintf(stderr, "Record appears truncated (%s). Exiting.\n", name.c_str()); + exit(EXIT_FAILURE); + } + if (keep) qualBuf.assign(ks.s, ks.l); + + if (keep) { + Sequence2& rec = readBatch->next_slot(); + rec.set(std::move(name), + std::move(comment), + std::move(seqBuf), + std::move(qualBuf), + seqPos++); + flush_if_needed(rec.sequence.size()); + } + + // next header + ret = hts_getline(fp, '\n', &ks); + while (ret >= 0 && ks.l == 0) ret = hts_getline(fp, '\n', &ks); + if (ret < 0) break; + } + } else { + fprintf(stderr, "Cannot recognize text input (%s). Expected '>' (FASTA) or '@' (FASTQ).\n", file.c_str()); + exit(EXIT_FAILURE); + } + + if (ks.s) free(ks.s); + hts_close(fp); }; - + + auto read_bam_cram = [&](const std::string& file, + std::unique_ptr& readBatch, + uint32_t& batchN, + uint32_t fileN) { + + samFile* fp = hts_open(file.c_str(), "r"); + if (!fp) { + fprintf(stderr, "Failed to open BAM/CRAM (%s)\n", file.c_str()); + exit(EXIT_FAILURE); + } + enable_threads(fp); + + bam_hdr_t* hdr = sam_hdr_read(fp); + if (!hdr) { + fprintf(stderr, "Failed to read BAM/CRAM header (%s)\n", file.c_str()); + exit(EXIT_FAILURE); + } + + bam1_t* b = bam_init1(); + if (!b) { + fprintf(stderr, "Failed to allocate bam1_t\n"); + exit(EXIT_FAILURE); + } + + uint64_t processedLength = 0; + + while (sam_read1(fp, hdr, b) > 0) { + + if (sample && newRand() > userInput.ratio) + continue; + + const uint32_t len = b->core.l_qseq; + + qnameBuf.assign(bam_get_qname(b)); + + seqBuf.resize(len); + char* out = len ? &seqBuf[0] : nullptr; + uint8_t* bseq = bam_get_seq(b); + for (uint32_t k = 0; k < len; ++k) + out[k] = seq_nt16_str[bam_seqi(bseq, k)]; + + qualBuf.resize(len); + char* qout = len ? &qualBuf[0] : nullptr; + uint8_t* bq = bam_get_qual(b); + if (bq && len > 0 && bq[0] != 0xFF) { + for (uint32_t k = 0; k < len; ++k) + qout[k] = static_cast(bq[k] + 33); + } else if (len) { + std::memset(qout, '!', len); + } + + processedLength += len; + + Sequence2& rec = readBatch->next_slot(); + rec.set(std::move(qnameBuf), + std::string{}, // comment not stored in BAM path + std::move(seqBuf), + std::move(qualBuf), + seqPos++); + + if (processedLength > batchSize) { + push_batch(readBatch, batchN, fileN); + processedLength = 0; + } + } + + bam_destroy1(b); + bam_hdr_destroy(hdr); + hts_close(fp); + }; + while (true) { - + uint32_t i = fileCounterStarted.fetch_add(1, std::memory_order_relaxed); - if (i >= numFiles) - break; - + if (i >= numFiles) break; + std::unique_ptr readBatch = free_pool_in.pop(); uint32_t batchN = 0; - std::string file = userInput.file('r', i); - std::string ext = getFileExt(file); - if (ext != "rd") { + + const std::string file = userInput.file('r', i); + const std::string ext = getFileExt(file); + + if (ext != "rd") { threadPool.queueJob([this, i, file]() { std::string md5; computeMd5(file, md5); - md5s[i].first = getFileName(file); + md5s[i].first = getFileName(file); md5s[i].second = md5; return true; }); - } - switch (string_to_case.count(ext) ? string_to_case.at(ext) : 0) { - - case 1: { // fa*[.gz] - StreamObj streamObj; - std::shared_ptr stream = streamObj.openStream(userInput, 'r', i); - - if (stream) { - - switch (stream->peek()) { - - case '>': { - - stream->get(); - - while (getline(*stream, newLine)) { - - size_t spacePos = newLine.find(" "); - seqHeader = newLine.substr(0, spacePos); - if (spacePos != std::string::npos) - seqComment = newLine.substr(spacePos + 1); - else - seqComment.clear(); - std::unique_ptr inSequence = std::make_unique(); - if (!getline(*stream, *inSequence, '>')) { - fprintf(stderr, "Record appears truncated (%s). Exiting.\n", seqHeader.c_str()); - exit(EXIT_FAILURE); - } - if (sample && newRand() > userInput.ratio) - continue; - processedLength += inSequence->size(); - readBatch->add(seqPos++, batchCounter++, std::move(seqHeader), std::move(seqComment), std::move(inSequence)); - - if (processedLength > batchSize) { - readBatch->batchN = batchN++; - readBatch->fileN = i; - lg.verbose("Processing batch N: " + std::to_string(readBatch->batchN)); - filled_q_in.push(std::move(readBatch)); - readBatch = free_pool_in.pop(); - processedLength = 0; - batchCounter = 0; - } - //lg.verbose("Individual fasta sequence read: " + seqHeader); - } - break; - } - case '@': { - - while (getline(*stream, newLine)) { // file input - - newLine.erase(0, 1); - size_t spacePos = newLine.find(" "); - seqHeader = newLine.substr(0, spacePos); - if (spacePos != std::string::npos) - seqComment = newLine.substr(spacePos + 1); - else - seqComment.clear(); - - std::unique_ptr inSequence = std::make_unique(); - if (!getline(*stream, *inSequence)) { - fprintf(stderr, "Record appears truncated (%s). Exiting.\n", seqHeader.c_str()); - exit(EXIT_FAILURE); - } - if (!getline(*stream, newLine)) { - fprintf(stderr, "Record appears truncated (%s). Exiting.\n", seqHeader.c_str()); - exit(EXIT_FAILURE); - } - std::unique_ptr inSequenceQuality = std::make_unique(); - if (!getline(*stream, *inSequenceQuality)) { - fprintf(stderr, "Record appears truncated (%s). Exiting.\n", seqHeader.c_str()); - exit(EXIT_FAILURE); - } - if (sample && newRand() > userInput.ratio) - continue; - processedLength += inSequence->size(); - readBatch->add(seqPos++, batchCounter++, std::move(seqHeader), std::move(seqComment), std::move(inSequence), std::move(inSequenceQuality)); - - if (processedLength > batchSize) { - readBatch->batchN = batchN++; - readBatch->fileN = i; - lg.verbose("Processing batch N: " + std::to_string(readBatch->batchN)); - filled_q_in.push(std::move(readBatch)); - readBatch = free_pool_in.pop(); - processedLength = 0; - batchCounter = 0; - } - //lg.verbose("Individual fastq sequence read: " + seqHeader); - } - break; - } - } - readBatch->batchN = batchN++; // process residual reads - readBatch->fileN = i; - lg.verbose("Processing batch N: " + std::to_string(readBatch->batchN)); - filled_q_in.push(std::move(readBatch)); - processedLength = 0; - batchCounter = 0; - } - break; - } - case 2: { // bam, cram - - samFile *fp_in = hts_open(file.c_str(),"r"); //open bam file - bam_hdr_t *bamHdr = sam_hdr_read(fp_in); //read header - bam1_t *bamdata = bam_init1(); //initialize an alignment - - htsThreadPool tpool_read = {NULL, 0}; - tpool_read.pool = hts_tpool_init(userInput.decompression_threads); - if (tpool_read.pool) - hts_set_opt(fp_in, HTS_OPT_THREAD_POOL, &tpool_read); - else - lg.verbose("Failed to generate decompression threadpool with " + std::to_string(userInput.decompression_threads) + " threads. Continuing single-threaded"); - - while(sam_read1(fp_in,bamHdr,bamdata) > 0) { - - uint32_t len = bamdata->core.l_qseq; // length of the read. - uint8_t *seq = bam_get_seq(bamdata); // seq string - std::unique_ptr inSequenceQuality; - - std::unique_ptr inSequence = std::make_unique(); - inSequence->resize(len); - for(uint32_t i=0; iat(i) = seq_nt16_str[bam_seqi(seq,i)]; //gets nucleotide id and converts them into IUPAC id. - - uint8_t* qual = bam_get_qual(bamdata); - if (qual && (len > 0) && qual[0] != 0xFF) { - inSequenceQuality = std::make_unique(len, '\0'); - for (uint32_t i = 0; i < len; ++i) - (*inSequenceQuality)[i] = static_cast(qual[i] + 33); - } else { - inSequenceQuality = std::make_unique(len, '!'); // No per-base qualities; synthesize minimal qualities - } + } + + if (ext == "rd") { + readTableCompressed(userInput.inFiles[i]); + } else if (ext == "bam" || ext == "cram") { + read_bam_cram(file, readBatch, batchN, i); + push_batch(readBatch, batchN, i); // residual (even if empty) + } else if (ext == "fasta" || ext == "fa" || ext == "fasta.gz" || ext == "fa.gz" || + ext == "fastq" || ext == "fq" || ext == "fastq.gz" || ext == "fq.gz") { + read_fa_fq(file, readBatch, batchN, i); + push_batch(readBatch, batchN, i); // residual (even if empty) + } else { + fprintf(stderr, "cannot recognize input (%s). Must be: fasta/fastq (optionally .gz), bam, cram, rd.\n", + file.c_str()); + exit(EXIT_FAILURE); + } - if (sample && newRand() > userInput.ratio) - continue; - processedLength += inSequence->size(); - - std::string qname(bam_get_qname(bamdata)); - readBatch->add(seqPos++, batchCounter++, std::move(qname), std::string(), std::move(inSequence), std::move(inSequenceQuality)); - - if (processedLength > batchSize) { - readBatch->batchN = batchN++; - readBatch->fileN = i; - lg.verbose("Processing batch N: " + std::to_string(readBatch->batchN)); - filled_q_in.push(std::move(readBatch)); - readBatch = free_pool_in.pop(); - processedLength = 0; - batchCounter = 0; - } - lg.verbose("Individual fastq sequence read: " + seqHeader); - } - readBatch->batchN = batchN++; // process residual reads - readBatch->fileN = i; - lg.verbose("Processing batch N: " + std::to_string(readBatch->batchN)); - filled_q_in.push(std::move(readBatch)); - processedLength = 0; - batchCounter = 0; - bam_destroy1(bamdata); - sam_close(fp_in); - if (tpool_read.pool) - hts_tpool_destroy(tpool_read.pool); - break; - } - case 3: { // rd - readTableCompressed(userInput.inFiles[i]); - break; - } - default: { // fasta[.gz] - fprintf(stderr, "cannot recognize input (%s). Must be: fasta, fastq, bam, cram.\n", file.c_str()); - exit(EXIT_FAILURE); - } - } if (fileCounterCompleted.fetch_add(1, std::memory_order_relaxed) + 1 == numFiles) { - for (size_t i = 0; i < consumersN; ++i) filled_q_in.push(std::unique_ptr(nullptr)); // sentinels + for (size_t k = 0; k < consumersN; ++k) + filled_q_in.push(std::unique_ptr(nullptr)); } - } + } +} + +static inline const double* phredProbLUTd() { + static double lut[94]; + static bool inited = false; + if (!inited) { + for (int q = 0; q < 94; ++q) { + lut[q] = std::pow(10.0, -double(q) / 10.0); + } + inited = true; + } + return lut; +} + +static inline double avg_q_from_phred33_prob_d(const std::string& qstr) { + if (qstr.empty()) return 0.0; + const double* lut = phredProbLUTd(); + double sum = 0.0; + for (unsigned char c : qstr) { + int q = int(c) - 33; + if (q < 0) q = 0; + if (q > 93) q = 93; + sum += lut[q]; + } + const double mean = sum / double(qstr.size()); + return -10.0 * std::log10(mean); } -float InReads::computeAvgQuality(std::string &sequenceQuality) { - double sumQuality = 0; - for (char &quality : sequenceQuality) - sumQuality += pow(10, -(double(quality) - 33)/10); - return (float) -10 * std::log10(sumQuality/sequenceQuality.size()); +float InReads::computeAvgQuality(const std::string& sequenceQuality) { + return (float)avg_q_from_phred33_prob_d(sequenceQuality); } void InReads::initDictionaries() { - - std::ifstream includeFile(userInput.inBedInclude); - std::string key; + + std::ifstream includeFile(userInput.inBedInclude); + std::string key; - while (includeFile >> key) - includeList.insert(key); + while (includeFile >> key) + includeList.insert(key); - includeFile.close(); - - std::ifstream excludeFile(userInput.inBedExclude); + includeFile.close(); + + std::ifstream excludeFile(userInput.inBedExclude); - while (excludeFile >> key) - excludeList.insert(key); + while (excludeFile >> key) + excludeList.insert(key); - excludeFile.close(); + excludeFile.close(); } void InReads::initFilters() { - - bool cannotParse = false; - - std::istringstream stream(userInput.filter); // get l,q and logical operator in between - std::string token1; - std::string token2; - std::string token3; - stream >> token1 >> token2 >> token3; - - if ((userInput.filter.find('l') == std::string::npos && userInput.filter.find('q') == std::string::npos) || // no filter found - (token1.size() < 3) || // first filter too short - (token2.size() != 0 && token3.size() == 0) || // second filter missing - (token2.size() == 0 && token3.size() != 0) || // missing operator - (token2.size() > 1) // malformatted operator - ) - cannotParse = true; - - if (cannotParse){ - fprintf(stderr, "Could not parse filter. Terminating.\n"); - exit(EXIT_FAILURE); - } - - std::string lFilter, qFilter; - if (token2.size()) - logicalOperator = token2[0]; - - if (token1[0] == 'l') { - lFilter = token1; - lSign = lFilter[1]; - l = stoi(lFilter.substr(2)); - } else if (token1[0] == 'q') { - qFilter = token1; - qSign = qFilter[1]; - q = stoi(qFilter.substr(2)); - } - - if (token3.size() != 0) { - if (token3[0] == 'l') { - lFilter = token3; - lSign = lFilter[1]; - l = stoi(lFilter.substr(2)); - }else if (token3[0] == 'q') { - qFilter = token3; - qSign = qFilter[1]; - q = stoi(qFilter.substr(2)); - } - } + + bool cannotParse = false; + + std::istringstream stream(userInput.filter); // get l,q and logical operator in between + std::string token1; + std::string token2; + std::string token3; + stream >> token1 >> token2 >> token3; + + if ((userInput.filter.find('l') == std::string::npos && userInput.filter.find('q') == std::string::npos) || // no filter found + (token1.size() < 3) || // first filter too short + (token2.size() != 0 && token3.size() == 0) || // second filter missing + (token2.size() == 0 && token3.size() != 0) || // missing operator + (token2.size() > 1) // malformatted operator + ) + cannotParse = true; + + if (cannotParse){ + fprintf(stderr, "Could not parse filter. Terminating.\n"); + exit(EXIT_FAILURE); + } + + std::string lFilter, qFilter; + if (token2.size()) + logicalOperator = token2[0]; + + if (token1[0] == 'l') { + lFilter = token1; + lSign = lFilter[1]; + l = stoi(lFilter.substr(2)); + } else if (token1[0] == 'q') { + qFilter = token1; + qSign = qFilter[1]; + q = stoi(qFilter.substr(2)); + } + + if (token3.size() != 0) { + if (token3[0] == 'l') { + lFilter = token3; + lSign = lFilter[1]; + l = stoi(lFilter.substr(2)); + }else if (token3[0] == 'q') { + qFilter = token3; + qSign = qFilter[1]; + q = stoi(qFilter.substr(2)); + } + } } void InReads::filterRecords() { @@ -455,57 +528,58 @@ void InReads::filterRecords() { } inline bool InReads::filterRead(Sequence2* sequence) { - - uint64_t size = sequence->sequence->size(); - float avgQuality = 0; - if (sequence->sequenceQuality != NULL) - avgQuality = computeAvgQuality(*sequence->sequenceQuality); - - return applyFilter(size, avgQuality); + + uint64_t size = sequence->sequence.size(); + float avgQuality = 0.0f; + if (!sequence->sequenceQuality.empty()) + avgQuality = computeAvgQuality(sequence->sequenceQuality); + return applyFilter(size, avgQuality); } inline bool InReads::applyFilter(uint64_t size, float avgQuality) { - - bool lFilter = ((lSign == '0') || - ((lSign == '>') && (size > l)) || - ((lSign == '<') && (size < l)) || - ((lSign == '=') && (size == l)) - ); - - bool qFilter = ((qSign == '0') || - ((qSign == '>') && (avgQuality > q)) || - ((qSign == '<') && (avgQuality < q)) || - ((qSign == '=') && (avgQuality == q)) - ); - - if ((logicalOperator == '0' && (lSign != '0' && lFilter)) || - (logicalOperator == '0' && (qSign != '0' && qFilter)) || - (logicalOperator == '|' && (lFilter || qFilter)) || - (logicalOperator == '&' && (lFilter && qFilter)) - ) - return false; - return true; + + bool lFilter = ((lSign == '0') || + ((lSign == '>') && (size > l)) || + ((lSign == '<') && (size < l)) || + ((lSign == '=') && (size == l)) + ); + + bool qFilter = ((qSign == '0') || + ((qSign == '>') && (avgQuality > q)) || + ((qSign == '<') && (avgQuality < q)) || + ((qSign == '=') && (avgQuality == q)) + ); + + if ((logicalOperator == '0' && (lSign != '0' && lFilter)) || + (logicalOperator == '0' && (qSign != '0' && qFilter)) || + (logicalOperator == '|' && (lFilter || qFilter)) || + (logicalOperator == '&' && (lFilter && qFilter)) + ) + return false; + return true; } -bool InReads::traverseInReads(Sequences2& readBatchIn) -{ +bool InReads::traverseInReads(Sequences2& readBatchIn) { Log threadLog; threadLog.setId(readBatchIn.batchN); - + std::unique_ptr readBatchOut; std::unique_ptr R1_batch; std::unique_ptr R2_batch; - - if (!streamOutput) { // we allocate once as we are not outputting anything + + // Only need output batches if we're streaming or if CiFi builds internal batches. + // Keep your original behavior for !streamOutput (alloc once) since you said it was fine. + if (!streamOutput) { if (userInput.inputCifi && userInput.cifiCombinations_flag) { R1_batch = std::make_unique(); R2_batch = std::make_unique(); - }else{ + } else { readBatchOut = std::make_unique(); } } - if (userInput.inputCifi && userInput.cifiCombinations_flag) { // Two output batches per input batch + // Acquire from pool if streaming + if (userInput.inputCifi && userInput.cifiCombinations_flag) { const uint32_t fileIdx = readBatchIn.fileN; const uint32_t baseOut = fileIdx * 2; @@ -514,26 +588,27 @@ bool InReads::traverseInReads(Sequences2& readBatchIn) R2_batch = free_pool_out.pop(); } - R1_batch->reads.clear(); - R2_batch->reads.clear(); + R1_batch->used = 0; + R2_batch->used = 0; R1_batch->batchN = readBatchIn.batchN; R1_batch->fileN = baseOut; R2_batch->batchN = readBatchIn.batchN; R2_batch->fileN = baseOut + 1; - } else { // existing single-output case + } else { if (streamOutput) readBatchOut = free_pool_out.pop(); - - readBatchOut->reads.clear(); - readBatchOut->fileN = readBatchIn.fileN; + + readBatchOut->used = 0; + readBatchOut->fileN = readBatchIn.fileN; readBatchOut->batchN = readBatchIn.batchN; } + EnzymeInfo enz; - if(userInput.inputCifi) + if (userInput.inputCifi) enz = get_enzyme(userInput.restrictionEnzyme); - + std::vector inReadsSummaryBatch; uint32_t readN = 0; LenVector readLensBatch; @@ -543,109 +618,102 @@ bool InReads::traverseInReads(Sequences2& readBatchIn) const bool hasInclude = !includeList.empty(); const bool hasExclude = !excludeList.empty(); const bool hasFilter = (userInput.filter != "none"); + + thread_local std::string tmpQual; - for (const auto& sequencePtr : readBatchIn.sequences) { - Sequence2* sequence = sequencePtr.get(); + for (uint32_t idx = 0; idx < readBatchIn.used; ++idx) { + Sequence2& sequence = *readBatchIn.slots[idx]; - if (hasInclude && - includeList.find(sequence->header) == includeList.end()) + if (hasInclude && includeList.find(sequence.header) == includeList.end()) continue; - - if (hasExclude && - excludeList.find(sequence->header) != excludeList.end()) + if (hasExclude && excludeList.find(sequence.header) != excludeList.end()) continue; - - if (hasFilter && filterRead(sequence)) + if (hasFilter && filterRead(&sequence)) continue; - InRead read = traverseInRead(&threadLog, - sequence, - readBatchIn.batchN + readN++); + // Compute stats (and optionally payload) via traverseInRead() + InRead read = traverseInRead(&threadLog, &sequence, readBatchIn.batchN + readN++); const uint64_t len = read.A + read.C + read.G + read.T + read.N; readLensBatch.push_back(std::make_pair(len, read.avgQuality)); - batchA += read.A; - batchT += read.T; - batchC += read.C; - batchG += read.G; + batchA += read.A; + batchT += read.T; + batchC += read.C; + batchG += read.G; batchN_ += read.N; - // STREAMING OUTPUT: convert to BAM record and append to batch + // ---- STREAMING OUTPUT (non-CiFi): fill BAM directly from Sequence2 ---- if (streamOutput && !userInput.inputCifi) { - // Ensure quality exists - if (!read.inSequenceQuality) { - read.inSequenceQuality = - std::make_unique(read.inSequence->size(), '!'); + // Ensure there is a slot in the preallocated bam array + if (readBatchOut->used >= readBatchOut->reads.size()) { + // Grow once in a while; still pool-friendly. + const size_t old = readBatchOut->reads.size(); + const size_t neu = old ? old * 2 : 1024; + readBatchOut->reads.resize(neu); + for (size_t j = old; j < neu; ++j) + readBatchOut->reads[j] = bam_init1(); } - bam1_t* q = make_unmapped_bam( - read.seqHeader, // name - *read.inSequence, // seq - *read.inSequenceQuality, // qual (ASCII+33) - 0 // extra_flags (none, just BAM_FUNMAP) - ); + bam1_t* q = readBatchOut->reads[readBatchOut->used++]; + bam_recycle_keep_capacity(q); - readBatchOut->reads.push_back(q); - - }else if (userInput.inputCifi) { - - if (userInput.cifiCombinations_flag) { - // SCIFI + COMBINATIONS: two *separate* BamBatch outputs + if (!sequence.sequenceQuality.empty()) { + fill_unmapped_bam(q, sequence.header, sequence.sequence, sequence.sequenceQuality, 0); + } else { + tmpQual.assign(sequence.sequence.size(), '!'); + fill_unmapped_bam(q, sequence.header, sequence.sequence, tmpQual, 0); + } + } + // ---- CiFi paths (need payload in read) ---- + else if (userInput.inputCifi) { + + // Make sure payload exists for CiFi processing + if (!read.inSequence) { + // If traverseInRead was called with needPayload=false by mistake, + // this makes the behavior safe. + read.inSequence = std::make_unique(sequence.sequence); + } + if (!read.inSequenceQuality) { + if (!sequence.sequenceQuality.empty()) + read.inSequenceQuality = std::make_unique(sequence.sequenceQuality); + else + read.inSequenceQuality = std::make_unique(read.inSequence->size(), '!'); + } - if (!read.inSequenceQuality) { - read.inSequenceQuality = - std::make_unique(read.inSequence->size(), '!'); - } + if (userInput.cifiCombinations_flag) { build_pe_bambatches(read, enz, *R1_batch, *R2_batch); - }else{ - // SCIFI, NO COMBINATIONS: just chop into fragments in this batch - if (!read.inSequenceQuality) { - read.inSequenceQuality = - std::make_unique(read.inSequence->size(), '!'); - } - - BamBatch tmp = chop_read_to_bambatch( - read, - enz, - readBatchOut->batchN, - readBatchOut->fileN - ); - - // append chopped fragments into our current batch - readBatchOut->reads.insert(readBatchOut->reads.end(), - tmp.reads.begin(), tmp.reads.end()); + } else { + const uint32_t before = readBatchOut->used; + chop_read_into_bambatch(read, enz, *readBatchOut); + const uint32_t added = readBatchOut->used - before; + cifiReadN.fetch_add(added, std::memory_order_relaxed); } } + if (userInput.content_flag) inReadsSummaryBatch.push_back(std::move(read)); + } - sequence->sequence.reset(); - sequence->sequenceQuality.reset(); - } - + // Push output batches if (userInput.inputCifi && userInput.cifiCombinations_flag) { - cifiReadN.fetch_add(R1_batch->reads.size() + R2_batch->reads.size()); - + cifiReadN.fetch_add(R1_batch->used + R2_batch->used); + if (streamOutput) { filled_q_out.push(std::move(R1_batch)); filled_q_out.push(std::move(R2_batch)); - }else{ - R1_batch->reads.clear(); - R2_batch->reads.clear(); + } else { + R1_batch->used = 0; + R2_batch->used = 0; } - // If they’re empty, they just get destroyed and their BamBatch - // instances are removed from the pool; the writer will add more - // via free_pool_out.push after writing other batches. } else { - cifiReadN.fetch_add(readBatchOut->reads.size()); if (streamOutput) filled_q_out.push(std::move(readBatchOut)); else - readBatchOut->reads.clear(); + readBatchOut->used = 0; } - { // Update global stats and summaries + { // Global stats / summaries std::unique_lock lck(writerMutex); if (userInput.content_flag && !inReadsSummaryBatch.empty()) { @@ -661,11 +729,11 @@ bool InReads::traverseInReads(Sequences2& readBatchIn) } readLens.insert(readLensBatch); - totA += batchA; - totT += batchT; - totC += batchC; - totG += batchG; - totN += batchN_; + totA += batchA; + totT += batchT; + totC += batchC; + totG += batchG; + totN += batchN_; totReads += readN; threadLog.print(); @@ -676,13 +744,15 @@ bool InReads::traverseInReads(Sequences2& readBatchIn) InRead InReads::traverseInRead(Log* threadLog, Sequence2* sequence, uint32_t seqPos) { std::vector> bedCoords; + if (userInput.hc_cutoff != -1) { - homopolymerCompress(sequence->sequence.get(), bedCoords, userInput.hc_cutoff); - sequence->sequenceQuality.reset(); + homopolymerCompress(&sequence->sequence, bedCoords, userInput.hc_cutoff); + // keep capacity, just clear content + sequence->sequenceQuality.clear(); } uint64_t A=0, C=0, G=0, T=0, N=0, lowerCount=0; - for (char base : *sequence->sequence) { + for (char base : sequence->sequence) { if (std::islower(static_cast(base))) ++lowerCount; switch (base) { case 'A': case 'a': ++A; break; @@ -693,19 +763,40 @@ InRead InReads::traverseInRead(Log* threadLog, Sequence2* sequence, uint32_t seq } } - float avgQuality = sequence->sequenceQuality - ? computeAvgQuality(*sequence->sequenceQuality) - : 0.0f; + const float avgQuality = + (!sequence->sequenceQuality.empty()) + ? computeAvgQuality(sequence->sequenceQuality) + : 0.0f; + + // Decide if this read needs payload in InRead. + // - Needed for CiFi processing + // - For content summaries, you currently drop payload when !streamOutput, so payload not needed there. + // If you later decide content summaries should keep sequence, flip this. + const bool needPayload = userInput.inputCifi; + + std::unique_ptr seqPtr; + std::unique_ptr qualPtr; + + if (needPayload) { + // yes, this allocates — but ONLY for CiFi (or whatever you later enable) + seqPtr = std::make_unique(sequence->sequence); + + if (!sequence->sequenceQuality.empty()) + qualPtr = std::make_unique(sequence->sequenceQuality); + // else leave nullptr; caller can synthesize if needed + } InRead inRead( threadLog, 0, 0, sequence->header, sequence->comment, - std::move(sequence->sequence), - std::move(sequence->sequenceQuality), - seqPos, A, C, G, T, N, lowerCount, + std::move(seqPtr), + std::move(qualPtr), + seqPos, + A, C, G, T, N, lowerCount, avgQuality ); + // Preserve your old behavior: if content summaries + no streaming, drop payload if (userInput.content_flag && !streamOutput) inRead.dropPayload(); @@ -720,164 +811,164 @@ uint64_t InReads::getTotReadLen() { for (uint64_t i = 0; i < readCount; ++i) sum += readLens[i].first; } - return sum; + return sum; } double InReads::computeGCcontent() { - uint64_t totReadLen = totA + totC + totG + totT; - double GCcontent = (double) (totG+totC)/totReadLen * 100; - - return GCcontent; + uint64_t totReadLen = totA + totC + totG + totT; + double GCcontent = (double) (totG+totC)/totReadLen * 100; + + return GCcontent; } double InReads::computeAvgReadLen() { - return (double) getTotReadLen()/totReads; + return (double) getTotReadLen()/totReads; } uint64_t InReads::getReadN50() { - return readNstars[4]; + return readNstars[4]; } void InReads::evalNstars() { // clean up once len-vector iterator is available, still expensive - uint64_t sum = 0, totLen = getTotReadLen(); - uint8_t N = 1; - for(unsigned int i = 0; i < readLens.size(); ++i) { // for each length - sum += readLens[i].first; // increase sum - while (sum >= ((double) totLen / 10 * N) && N<= 10) { // conditionally add length.at or pos to each N/L* bin - - readNstars[N-1] = readLens[i].first; - readLstars[N-1] = i + 1; - N += 1; - } - } + uint64_t sum = 0, totLen = getTotReadLen(); + uint8_t N = 1; + for(unsigned int i = 0; i < readLens.size(); ++i) { // for each length + sum += readLens[i].first; // increase sum + while (sum >= ((double) totLen / 10 * N) && N<= 10) { // conditionally add length.at or pos to each N/L* bin + + readNstars[N-1] = readLens[i].first; + readLstars[N-1] = i + 1; + N += 1; + } + } } uint64_t InReads::getSmallestRead() { - return readLens.back(); + return readLens.back(); } uint64_t InReads::getLargestRead() { - return readLens.front(); + return readLens.front(); } double InReads::getAvgQuality(){ - double sumQualities = 0, avgQualitiesSize=readLens.size(); + double sumQualities = 0, avgQualitiesSize=readLens.size(); - for (uint64_t i = 0; i < avgQualitiesSize; ++i) - sumQualities += readLens[i].first * pow(10, -readLens[i].second/10); // sum the qualities normalized by their read length + for (uint64_t i = 0; i < avgQualitiesSize; ++i) + sumQualities += readLens[i].first * pow(10, -readLens[i].second/10); // sum the qualities normalized by their read length - return -10 * std::log10(sumQualities/getTotReadLen()); + return -10 * std::log10(sumQualities/getTotReadLen()); } void InReads::report() { - if (totReads > 0) { + if (totReads > 0) { if (userInput.filter != "none" && getFileExt(userInput.file('r', 0)) == "rd") filterRecords(); - - readLens.sort(); - - std::cout << std::fixed; // disables scientific notation - std::cout << std::setprecision(2); // 2 decimal points - - if (!tabular_flag) - std::cout< hist; - - for (uint64_t i = 0; i < readCount; ++i) { - if (noFilter || !applyFilter(readLens[i].first, readLens[i].second)) - ++hist[readLens[i].first]; - } - std::vector> table(hist.begin(), hist.end()); // converts the hashmap to a table - std::sort(table.begin(), table.end()); - - for (auto pair : table) - std::cout< hist; - - for (uint64_t i = 0; i < readCount; ++i) { - if (noFilter || !applyFilter(readLens[i].first, readLens[i].second)) - ++hist[readLens[i].first]; - } - std::vector> table(hist.begin(), hist.end()); - std::sort(table.begin(), table.end()); - uint64_t totReadLen = getTotReadLen(), sum = 0; - - for (auto pair : table) { - std::cout<<+pair.first<<"\t"<<+pair.second<<"\t"<<+pair.first*pair.second<<"\t"<<+(totReadLen - sum)<<"\n"; - sum += pair.first*pair.second; - } - } + + std::cout << std::fixed; // disables scientific notation + std::cout << std::setprecision(2); // 2 decimal points + + uint64_t readCount = readLens.size(); + bool noFilter = userInput.filter == "none" ? true : false; + if(userInput.sizeOutType == 's') + readLens.sort(); + + if (userInput.sizeOutType == 'u' || userInput.sizeOutType == 's') { + + for (uint64_t i = 0; i < readCount; ++i) { + if (noFilter || !applyFilter(readLens[i].first, readLens[i].second)) + std::cout << readLens[i].first << "\n"; + } + + }else if(userInput.sizeOutType == 'h') { + + phmap::parallel_flat_hash_map hist; + + for (uint64_t i = 0; i < readCount; ++i) { + if (noFilter || !applyFilter(readLens[i].first, readLens[i].second)) + ++hist[readLens[i].first]; + } + std::vector> table(hist.begin(), hist.end()); // converts the hashmap to a table + std::sort(table.begin(), table.end()); + + for (auto pair : table) + std::cout< hist; + + for (uint64_t i = 0; i < readCount; ++i) { + if (noFilter || !applyFilter(readLens[i].first, readLens[i].second)) + ++hist[readLens[i].first]; + } + std::vector> table(hist.begin(), hist.end()); + std::sort(table.begin(), table.end()); + uint64_t totReadLen = getTotReadLen(), sum = 0; + + for (auto pair : table) { + std::cout<<+pair.first<<"\t"<<+pair.second<<"\t"<<+pair.first*pair.second<<"\t"<<+(totReadLen - sum)<<"\n"; + sum += pair.first*pair.second; + } + } } void InReads::printQualities() { - - std::cout << std::fixed; // disables scientific notation - std::cout << std::setprecision(2); // 2 decimal points - - uint64_t readCount = readLens.size(); - - bool noFilter = userInput.filter == "none" ? true : false; - char qualityOut = userInput.qualityOut; - - if (qualityOut == 'q'){ - for (uint64_t i = 0; i < readCount; ++i) { - if (noFilter || !applyFilter(readLens[i].first, readLens[i].second)) - std::cout << readLens[i].second << "\n"; - } - } - else if (qualityOut == 'a') { // a prints read lengths and qualities - for (uint64_t i = 0; i < readCount; ++i) { - if (noFilter || !applyFilter(readLens[i].first, readLens[i].second)) - std::cout << readLens[i].first << "," << readLens[i].second << "\n"; - } - } + + std::cout << std::fixed; // disables scientific notation + std::cout << std::setprecision(2); // 2 decimal points + + uint64_t readCount = readLens.size(); + + bool noFilter = userInput.filter == "none" ? true : false; + char qualityOut = userInput.qualityOut; + + if (qualityOut == 'q'){ + for (uint64_t i = 0; i < readCount; ++i) { + if (noFilter || !applyFilter(readLens[i].first, readLens[i].second)) + std::cout << readLens[i].second << "\n"; + } + } + else if (qualityOut == 'a') { // a prints read lengths and qualities + for (uint64_t i = 0; i < readCount; ++i) { + if (noFilter || !applyFilter(readLens[i].first, readLens[i].second)) + std::cout << readLens[i].first << "," << readLens[i].second << "\n"; + } + } } void InReads::printContent() { @@ -926,61 +1017,61 @@ void InReads::printContent() { } void dump_read(bam1_t* b) { - printf("->core.tid:(%d)\n", b->core.tid); - printf("->core.pos:(%ld)\n", static_cast(b->core.pos)); - printf("->core.bin:(%d)\n", b->core.bin); - printf("->core.qual:(%d)\n", b->core.qual); - printf("->core.l_qname:(%d)\n", b->core.l_qname); - printf("->core.flag:(%d)\n", b->core.flag); - printf("->core.n_cigar:(%d)\n", b->core.n_cigar); - printf("->core.l_qseq:(%d)\n", b->core.l_qseq); - printf("->core.mtid:(%d)\n", b->core.mtid); - printf("->core.mpos:(%ld)\n", static_cast(b->core.mpos)); - printf("->core.isize:(%ld)\n", static_cast(b->core.isize)); - if (b->data) { - printf("->data:"); - int i; - for (i = 0; i < b->l_data; ++i) { - printf("%x ", b->data[i]); - } - printf("\n"); - } - if (b->core.l_qname) { - printf("qname: %s\n",bam_get_qname(b)); - } - if (b->core.l_qseq) { - printf("qseq:"); - int i; - for (i = 0; i < b->core.l_qseq; ++i) { - printf("%c", seq_nt16_str[bam_seqi(bam_get_seq(b),i)]); - } - printf("\n"); - printf("qual:"); - uint8_t *s = bam_get_qual(b); - for (i = 0; i < b->core.l_qseq; ++i) { - printf("%c",s[i] + 33); - } - printf("\n"); - - } - - if (bam_get_l_aux(b)) { - uint32_t i = 0; - uint8_t* aux = bam_get_aux(b); - - while (i < bam_get_l_aux(b)) { - printf("%.2s:%c:",aux+i,*(aux+i+2)); - i += 2; - switch (*(aux+i)) { - case 'Z': - while (*(aux+1+i) != '\0') { putc(*(aux+1+i), stdout); ++i; } - break; - } - putc('\n',stdout); - ++i;++i; - } - } - printf("\n"); + printf("->core.tid:(%d)\n", b->core.tid); + printf("->core.pos:(%ld)\n", static_cast(b->core.pos)); + printf("->core.bin:(%d)\n", b->core.bin); + printf("->core.qual:(%d)\n", b->core.qual); + printf("->core.l_qname:(%d)\n", b->core.l_qname); + printf("->core.flag:(%d)\n", b->core.flag); + printf("->core.n_cigar:(%d)\n", b->core.n_cigar); + printf("->core.l_qseq:(%d)\n", b->core.l_qseq); + printf("->core.mtid:(%d)\n", b->core.mtid); + printf("->core.mpos:(%ld)\n", static_cast(b->core.mpos)); + printf("->core.isize:(%ld)\n", static_cast(b->core.isize)); + if (b->data) { + printf("->data:"); + int i; + for (i = 0; i < b->l_data; ++i) { + printf("%x ", b->data[i]); + } + printf("\n"); + } + if (b->core.l_qname) { + printf("qname: %s\n",bam_get_qname(b)); + } + if (b->core.l_qseq) { + printf("qseq:"); + int i; + for (i = 0; i < b->core.l_qseq; ++i) { + printf("%c", seq_nt16_str[bam_seqi(bam_get_seq(b),i)]); + } + printf("\n"); + printf("qual:"); + uint8_t *s = bam_get_qual(b); + for (i = 0; i < b->core.l_qseq; ++i) { + printf("%c",s[i] + 33); + } + printf("\n"); + + } + + if (bam_get_l_aux(b)) { + uint32_t i = 0; + uint8_t* aux = bam_get_aux(b); + + while (i < bam_get_l_aux(b)) { + printf("%.2s:%c:",aux+i,*(aux+i+2)); + i += 2; + switch (*(aux+i)) { + case 'Z': + while (*(aux+1+i) != '\0') { putc(*(aux+1+i), stdout); ++i; } + break; + } + putc('\n',stdout); + ++i;++i; + } + } + printf("\n"); } void InReads::initStream() { @@ -1044,10 +1135,9 @@ void InReads::openOutputForFile(size_t outId) { lg.verbose("openOutputForFile: invalid outId " + std::to_string(outId)); return; } - if (fps[outId] != nullptr) // already open return; - + const static phmap::flat_hash_map string_to_case{ {"fasta", 1}, {"fa", 1}, @@ -1060,33 +1150,32 @@ void InReads::openOutputForFile(size_t outId) { {"bam", 3}, {"cram", 4} }; - // Decide output file name + + // Decide output file name (UNCHANGED) std::string outName; - if (splitOutputByFile) { // One/two outputs per input file; typically prefix + per-file suffix + if (splitOutputByFile) { if (userInput.cifiCombinations_flag) { - if (outId >= outCount) { lg.verbose("openOutputForFile: outId " + std::to_string(outId) + " has no corresponding outFiles entry in cifiCombinations mode"); std::abort(); } - // Determine input file index size_t fileIdx = outId / 2; - if (fileIdx >= numFiles) { // Validate + if (fileIdx >= numFiles) { lg.verbose("openOutputForFile: computed fileIdx=" + std::to_string(fileIdx) + " invalid (outId=" + std::to_string(outId) + ", total files=" + std::to_string(userInput.inFiles.size()) + ")"); std::abort(); } - std::string baseName = getFileName(userInput.inFiles[fileIdx]); // Base name of the input file + std::string baseName = getFileName(userInput.inFiles[fileIdx]); std::string ext = getFileExt(userInput.inFiles[fileIdx]); std::string baseNameNoExt = stripKnownExt(baseName, ext); - std::string suffix = ((outId % 2) == 0) ? "_1" : "_2"; // Assign _1 or _2 depending on parity + std::string suffix = ((outId % 2) == 0) ? "_1" : "_2"; outName = userInput.outPrefix + baseNameNoExt + suffix + "." + ext; lg.verbose("SciFi combinations mode: outId=" + std::to_string(outId) + " fileIdx=" + std::to_string(fileIdx) + " → " + outName); - }else{ + } else { if (outId >= userInput.inFiles.size()) { lg.verbose("openOutputForFile: outId " + std::to_string(outId) + " has no corresponding outFiles entry"); @@ -1094,37 +1183,98 @@ void InReads::openOutputForFile(size_t outId) { } outName = userInput.outPrefix + getFileName(userInput.inFiles[outId]); } - } else { // Single-output mode => everything maps to slot 0 + } else { if (userInput.outFiles.empty()) { lg.verbose("openOutputForFile: no output file specified"); std::abort(); } outName = userInput.outFiles[0]; } - // Choose mode based on extension - std::string ext = getFileExt(outName); + // Choose mode based on extension (UNCHANGED) + std::string ext = getFileExt(outName); int fmt_case = string_to_case.count(ext) ? string_to_case.at(ext) : 0; + htsFile* ofp = nullptr; + auto strip_bgzf_from_mode = [](char* mode) { + // Remove 'z' and an optional following digit [0-9] + // Example: "wfz6" -> "wf", "wFz" -> "wF" + const size_t n = std::strlen(mode); + size_t w = 0; + for (size_t r = 0; r < n; ++r) { + if (mode[r] == 'z') { + // skip 'z' + // if next is a digit, skip it too + if (r + 1 < n && mode[r + 1] >= '0' && mode[r + 1] <= '9') { + ++r; + } + continue; + } + mode[w++] = mode[r]; + } + mode[w] = '\0'; + }; + switch (fmt_case) { + case 1: // fasta[.gz] case 2: { // fastq[.gz] - char mode[4] = "w"; + + char mode[16] = "w"; + + // Let htslib decide format (FASTA vs FASTQ), and it may also add compression flags based on suffix + // e.g. for ".gz" it often injects 'z' (BGZF) into mode. if (sam_open_mode(mode + 1, outName.c_str(), NULL) < 0) { printf("Invalid file name\n"); exit(EXIT_FAILURE); } + + if (userInput.bgzip_level >= 0) { + // BGZF requested: keep/ensure 'z' and optionally set level digit + int lvl = userInput.bgzip_level; + + // “optional level”: if flag present but no explicit level, you said "use minimum" + // implement that as level 1 (change to 0 if you truly want no compression) + if (lvl == 0) lvl = 1; + + if (lvl < 0 || lvl > 9) { + fprintf(stderr, "Invalid bgzip level %d (must be 0–9)\n", lvl); + exit(EXIT_FAILURE); + } + + // Ensure mode has 'z' and a level digit. + // First strip any existing z/level then append fresh. + strip_bgzf_from_mode(mode); + + const size_t mlen = std::strlen(mode); + mode[mlen + 0] = 'z'; + mode[mlen + 1] = char('0' + lvl); + mode[mlen + 2] = '\0'; + + } else { + // BGZF NOT requested: force plain gzip if ".gz" + // Remove any bgzf injected by sam_open_mode, then add 'g' + strip_bgzf_from_mode(mode); + + const size_t mlen = std::strlen(mode); + mode[mlen + 0] = 'g'; + mode[mlen + 1] = '\0'; + } + if (!(ofp = sam_open(outName.c_str(), mode))) { printf("Could not open %s\n", outName.c_str()); exit(EXIT_FAILURE); } + break; } + case 3: { // bam - ofp = sam_open(outName.c_str(),"wb"); + ofp = sam_open(outName.c_str(), "wb"); break; } + case 4: { // cram htsFormat fmt4 = {sequence_data, cram, {3, 1}, gzip, 6, NULL}; hts_parse_format(&fmt4, "cram,no_ref=1"); @@ -1143,14 +1293,14 @@ void InReads::openOutputForFile(size_t outId) { std::abort(); } - // Duplicate and write BAM/CRAM header + // Duplicate and write header (UNCHANGED) bam_hdr_t* ohdr = bam_hdr_dup(hdrTemplate); if (!ohdr) { lg.verbose("Failed to duplicate BAM header"); std::abort(); } - // Attach thread pool if available + // Attach thread pool if available (UNCHANGED; works for BGZF too) if (tpool_write.pool) { hts_set_opt(ofp, HTS_OPT_THREAD_POOL, &tpool_write); } @@ -1163,8 +1313,7 @@ void InReads::openOutputForFile(size_t outId) { fps[outId] = ofp; hdrs[outId] = ohdr; - lg.verbose("Opened output file '" + outName + - "' for outId " + std::to_string(outId)); + lg.verbose("Opened output file '" + outName + "' for outId " + std::to_string(outId)); } void InReads::writeToStream() { @@ -1204,10 +1353,7 @@ void InReads::writeToStream() { const uint64_t id = batch->batchN; if (fileId >= numStreams) { - // Defensive: bad file index, recycle (shouldn't happen) - for (bam1_t* rec : batch->reads) - bam_destroy1(rec); - batch->reads.clear(); + batch->used = 0; free_pool_out.push(std::move(batch)); continue; } @@ -1229,20 +1375,19 @@ void InReads::writeToStream() { htsFile* ofp = fps[outId]; bam_hdr_t* ohdr = hdrs[outId]; - lg.verbose("Writing " + std::to_string(bptr->reads.size()) + + lg.verbose("Writing " + std::to_string(bptr->used) + " reads (batch " + std::to_string(bptr->batchN) + ") to outId " + std::to_string(outId) + " (fileId " + std::to_string(fileId) + ")"); - - for (bam1_t* rec : bptr->reads) { + + for (uint32_t k = 0; k < bptr->used; ++k) { + bam1_t* rec = bptr->reads[k]; if (sam_write1(ofp, ohdr, rec) < 0) { fprintf(stderr, "Error writing BAM record (outId %zu)\n", outId); std::abort(); } - bam_destroy1(rec); } - - bptr->reads.clear(); + bptr->used = 0; free_pool_out.push(std::move(bptr)); ++next; @@ -1262,15 +1407,14 @@ void InReads::writeToStream() { htsFile* ofp = fps[outId]; bam_hdr_t* ohdr = hdrs[outId]; - for (bam1_t* rec : bptr->reads) { + for (uint32_t k = 0; k < bptr->used; ++k) { + bam1_t* rec = bptr->reads[k]; if (sam_write1(ofp, ohdr, rec) < 0) { fprintf(stderr, "Error writing BAM record during final flush (outId %zu)\n", outId); std::abort(); } - bam_destroy1(rec); } - - bptr->reads.clear(); + bptr->used = 0; free_pool_out.push(std::move(bptr)); } filePending.clear(); @@ -1579,6 +1723,6 @@ void InReads::readTableCompressed(std::string inFile) { void InReads::printMd5() { - for (auto md5 : md5s) - std::cout<