From cc9836b03ca5a537c7a7cdbeb446bb6197d10e22 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 28 Apr 2026 21:07:22 +0200 Subject: [PATCH] persist: rip lgbytes/lgalloc out of blob and arrow paths Lgalloc is disabled in practice, so the memcpy from SDK `Bytes` (azure) or arrow `Buffer`s into freshly-allocated `MetricsRegion`s falls back to the heap and is pure overhead. Stream SDK `Bytes` straight into `SegmentedBytes` for azure and stop reallocating arrow buffers. Keep the parquet null-buffer workaround in `realloc_data` (now `rebuild_data`) since it's independent of lgalloc. Removes: * `LgBytesMetrics` and `MetricsRegion` from `mz_ore::lgbytes` (delete the whole module). * `S3BlobMetrics::lgbytes`. * `ColumnarMetrics::{lgbytes_arrow, cfg, is_cc_active}` and the associated constructor params. * `BlobConfig::try_from`'s `cfg` param, `AzureBlobConfig`'s `cfg` field, and the `persist_enable_arrow_lgalloc_{cc,noncc}_sizes` dyncfgs. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/ore/src/lgbytes.rs | 284 ------------------ src/ore/src/lib.rs | 6 - .../src/maelstrom/txn_list_append_multi.rs | 1 - .../src/maelstrom/txn_list_append_single.rs | 1 - src/persist-client/src/cache.rs | 1 - src/persist-client/src/cli/args.rs | 9 +- src/persist-client/src/internal/metrics.rs | 7 +- src/persist/src/azure.rs | 69 +---- src/persist/src/cfg.rs | 7 +- src/persist/src/indexed/columnar/arrow.rs | 91 ++---- src/persist/src/metrics.rs | 30 +- 11 files changed, 35 insertions(+), 471 deletions(-) delete mode 100644 src/ore/src/lgbytes.rs diff --git a/src/ore/src/lgbytes.rs b/src/ore/src/lgbytes.rs deleted file mode 100644 index ddf9bbb52fba3..0000000000000 --- a/src/ore/src/lgbytes.rs +++ /dev/null @@ -1,284 +0,0 @@ -// Copyright Materialize, Inc. and contributors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License in the LICENSE file at the -// root of this repository, or online at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! The [bytes] crate but backed by [lgalloc]. - -use std::fmt::Debug; -use std::time::Instant; - -use bytes::Bytes; -use lgalloc::AllocError; -use prometheus::{Counter, CounterVec, Histogram, IntCounter, IntCounterVec}; -use tracing::debug; - -use crate::cast::{CastFrom, CastLossy}; -use crate::metric; -use crate::metrics::MetricsRegistry; -use crate::region::Region; - -impl From> for Bytes { - fn from(bytes: MetricsRegion) -> Bytes { - // This will handle the drop correctly when the refcount goes to 0... - // see the rustdoc on this method for more details. - Bytes::from_owner(bytes) - } -} - -/// A [Region] wrapper that increments metrics when it is dropped. -/// -/// The `T: Copy` bound ensures that the `Region` doesn't leak resources when -/// dropped. -pub struct MetricsRegion { - buf: Region, - free_count: IntCounter, - free_capacity_bytes: IntCounter, -} - -impl Debug for MetricsRegion { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::fmt::Debug::fmt(self.buf.as_vec(), f) - } -} - -impl MetricsRegion { - fn capacity_bytes(&self) -> usize { - self.buf.capacity() * std::mem::size_of::() - } - - /// Copy all of the elements from `slice` into the [`Region`]. - /// - /// # Panics - /// - /// * If the [`Region`] does not have enough capacity. - pub fn extend_from_slice(&mut self, slice: &[T]) { - self.buf.extend_from_slice(slice); - } -} - -impl PartialEq for MetricsRegion { - fn eq(&self, other: &Self) -> bool { - self.buf.as_vec() == other.buf.as_vec() - } -} - -impl Eq for MetricsRegion {} - -impl Drop for MetricsRegion { - fn drop(&mut self) { - self.free_count.inc(); - self.free_capacity_bytes - .inc_by(u64::cast_from(self.capacity_bytes())); - } -} - -impl AsRef<[T]> for MetricsRegion { - fn as_ref(&self) -> &[T] { - &self.buf[..] - } -} - -/// Metrics for lgalloc'd bytes.. -#[derive(Debug, Clone)] -pub struct LgBytesMetrics { - /// Metrics for the "persist_azure" usage of lgalloc bytes. - pub persist_azure: LgBytesOpMetrics, - /// Metrics for the "persist_arrow" usage of lgalloc bytes. - pub persist_arrow: LgBytesOpMetrics, -} - -/// Metrics for an individual usage of lgalloc bytes. -#[derive(Clone)] -pub struct LgBytesOpMetrics { - heap: LgBytesRegionMetrics, - mmap: LgBytesRegionMetrics, - alloc_seconds: Counter, - mmap_disabled_count: IntCounter, - mmap_error_count: IntCounter, - // NB: Unlike the _bytes per-Region metrics, which are capacity, this is - // intentionally the requested len. - len_sizes: Histogram, -} - -impl std::fmt::Debug for LgBytesOpMetrics { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("LgBytesOperationMetrics") - .finish_non_exhaustive() - } -} - -#[derive(Clone)] -struct LgBytesRegionMetrics { - alloc_count: IntCounter, - alloc_capacity_bytes: IntCounter, - free_count: IntCounter, - free_capacity_bytes: IntCounter, -} - -impl LgBytesMetrics { - /// Returns a new [LgBytesMetrics] connected to the given metrics registry. - pub fn new(registry: &MetricsRegistry) -> Self { - let alloc_count: IntCounterVec = registry.register(metric!( - name: "mz_lgbytes_alloc_count", - help: "count of LgBytes allocations", - var_labels: ["op", "region"], - )); - let alloc_capacity_bytes: IntCounterVec = registry.register(metric!( - name: "mz_lgbytes_alloc_capacity_bytes", - help: "total capacity bytes of LgBytes allocations", - var_labels: ["op", "region"], - )); - let free_count: IntCounterVec = registry.register(metric!( - name: "mz_lgbytes_free_count", - help: "count of LgBytes frees", - var_labels: ["op", "region"], - )); - let free_capacity_bytes: IntCounterVec = registry.register(metric!( - name: "mz_lgbytes_free_capacity_bytes", - help: "total capacity bytes of LgBytes frees", - var_labels: ["op", "region"], - )); - let alloc_seconds: CounterVec = registry.register(metric!( - name: "mz_lgbytes_alloc_seconds", - help: "seconds spent getting LgBytes allocations and copying in data", - var_labels: ["op"], - )); - let mmap_disabled_count: IntCounter = registry.register(metric!( - name: "mz_bytes_mmap_disabled_count", - help: "count alloc attempts with lgalloc disabled", - )); - let mmap_error_count: IntCounter = registry.register(metric!( - name: "mz_bytes_mmap_error_count", - help: "count of errors when attempting file-based mapped alloc", - )); - let len_sizes: Histogram = registry.register(metric!( - name: "mz_bytes_alloc_len_sizes", - help: "histogram of LgBytes alloc len sizes", - buckets: crate::stats::HISTOGRAM_BYTE_BUCKETS.to_vec(), - )); - let op = |name: &str| LgBytesOpMetrics { - heap: LgBytesRegionMetrics { - alloc_count: alloc_count.with_label_values(&[name, "heap"]), - alloc_capacity_bytes: alloc_capacity_bytes.with_label_values(&[name, "heap"]), - free_count: free_count.with_label_values(&[name, "heap"]), - free_capacity_bytes: free_capacity_bytes.with_label_values(&[name, "heap"]), - }, - mmap: LgBytesRegionMetrics { - alloc_count: alloc_count.with_label_values(&[name, "mmap"]), - alloc_capacity_bytes: alloc_capacity_bytes.with_label_values(&[name, "mmap"]), - free_count: free_count.with_label_values(&[name, "mmap"]), - free_capacity_bytes: free_capacity_bytes.with_label_values(&[name, "mmap"]), - }, - alloc_seconds: alloc_seconds.with_label_values(&[name]), - mmap_disabled_count: mmap_disabled_count.clone(), - mmap_error_count: mmap_error_count.clone(), - len_sizes: len_sizes.clone(), - }; - LgBytesMetrics { - persist_azure: op("persist_azure"), - persist_arrow: op("persist_arrow"), - } - } -} - -impl LgBytesOpMetrics { - /// Returns a new empty [`MetricsRegion`] to hold at least `T` elements. - pub fn new_region(&self, capacity: usize) -> MetricsRegion { - let start = Instant::now(); - - // Round the capacity up to the minimum lgalloc mmap size. - let capacity = std::cmp::max(capacity, 1 << lgalloc::VALID_SIZE_CLASS.start); - let region = match Region::new_mmap(capacity) { - Ok(region) => region, - Err(err) => { - if let AllocError::Disabled = err { - self.mmap_disabled_count.inc() - } else { - debug!("failed to mmap allocate: {}", err); - self.mmap_error_count.inc(); - } - Region::new_heap(capacity) - } - }; - let region = self.metrics_region(region); - self.alloc_seconds.inc_by(start.elapsed().as_secs_f64()); - - region - } - - /// Attempts to copy the given bytes into an lgalloc-managed file-based mapped - /// region. If that fails, we return the original bytes. - pub fn try_mmap_bytes(&self, buf: Bytes) -> Bytes { - self.try_mmap_region(buf.as_ref()) - .map(Bytes::from) - .unwrap_or(buf) - } - - /// Attempts to copy the given buf into an lgalloc managed file-based mapped region. - pub fn try_mmap_region( - &self, - buf: impl AsRef<[T]>, - ) -> Result, AllocError> { - let start = Instant::now(); - let buf = buf.as_ref(); - // Round the capacity up to the minimum lgalloc mmap size. - let capacity = std::cmp::max(buf.len(), 1 << lgalloc::VALID_SIZE_CLASS.start); - let buf = match Region::new_mmap(capacity) { - Ok(mut region) => { - region.extend_from_slice(buf); - Ok(region) - } - Err(err) => { - match &err { - AllocError::Disabled => self.mmap_disabled_count.inc(), - err => { - debug!("failed to mmap allocate: {}", err); - self.mmap_error_count.inc(); - } - }; - Err(err) - } - }?; - let region = self.metrics_region(buf); - self.alloc_seconds.inc_by(start.elapsed().as_secs_f64()); - Ok(region) - } - - /// Wraps the already owned buf into a [Region::Heap] with metrics. - /// - /// Besides metrics, this is essentially a no-op. - pub fn heap_region(&self, buf: Vec) -> MetricsRegion { - // Intentionally don't bother incrementing alloc_seconds here. - self.metrics_region(Region::Heap(buf)) - } - - fn metrics_region(&self, buf: Region) -> MetricsRegion { - let metrics = match buf { - Region::MMap(_) => &self.mmap, - Region::Heap(_) => &self.heap, - }; - let region = MetricsRegion { - buf, - free_count: metrics.free_count.clone(), - free_capacity_bytes: metrics.free_capacity_bytes.clone(), - }; - metrics.alloc_count.inc(); - metrics - .alloc_capacity_bytes - .inc_by(u64::cast_from(region.capacity_bytes())); - let len_bytes = region.buf.len() * std::mem::size_of::(); - self.len_sizes.observe(f64::cast_lossy(len_bytes)); - region - } -} diff --git a/src/ore/src/lib.rs b/src/ore/src/lib.rs index 63533346ca42f..9cbafa6350c88 100644 --- a/src/ore/src/lib.rs +++ b/src/ore/src/lib.rs @@ -50,12 +50,6 @@ pub mod hint; pub mod id_gen; pub mod iter; pub mod lex; -#[cfg_attr( - nightly_doc_features, - doc(cfg(all(feature = "bytes", feature = "region"))) -)] -#[cfg(all(feature = "bytes", feature = "region", feature = "tracing"))] -pub mod lgbytes; #[cfg_attr(nightly_doc_features, doc(cfg(feature = "metrics")))] #[cfg(feature = "metrics")] pub mod metrics; diff --git a/src/persist-cli/src/maelstrom/txn_list_append_multi.rs b/src/persist-cli/src/maelstrom/txn_list_append_multi.rs index 9d9158672298e..80c83ff46b31e 100644 --- a/src/persist-cli/src/maelstrom/txn_list_append_multi.rs +++ b/src/persist-cli/src/maelstrom/txn_list_append_multi.rs @@ -393,7 +393,6 @@ impl Service for TransactorService { blob_uri, Box::new(config.clone()), metrics.s3_blob.clone(), - Arc::clone(&config.configs), ) .await .expect("blob_uri should be valid"); diff --git a/src/persist-cli/src/maelstrom/txn_list_append_single.rs b/src/persist-cli/src/maelstrom/txn_list_append_single.rs index 3505d9d075fcd..c84fd616962c3 100644 --- a/src/persist-cli/src/maelstrom/txn_list_append_single.rs +++ b/src/persist-cli/src/maelstrom/txn_list_append_single.rs @@ -601,7 +601,6 @@ impl Service for TransactorService { blob_uri, Box::new(config.clone()), metrics.s3_blob.clone(), - Arc::clone(&config.configs), ) .await .expect("blob_uri should be valid"); diff --git a/src/persist-client/src/cache.rs b/src/persist-client/src/cache.rs index cf95bbd4b16c7..dfd4a98e1416d 100644 --- a/src/persist-client/src/cache.rs +++ b/src/persist-client/src/cache.rs @@ -243,7 +243,6 @@ impl PersistClientCache { x.key(), Box::new(self.cfg.clone()), self.metrics.s3_blob.clone(), - Arc::clone(&self.cfg.configs), ) .await?; let blob = retry_external(&self.metrics.retries.external.blob_open, || { diff --git a/src/persist-client/src/cli/args.rs b/src/persist-client/src/cli/args.rs index f64be03db81a6..199634bba262b 100644 --- a/src/persist-client/src/cli/args.rs +++ b/src/persist-client/src/cli/args.rs @@ -149,13 +149,8 @@ pub(super) async fn make_blob( commit: bool, metrics: Arc, ) -> anyhow::Result> { - let blob = BlobConfig::try_from( - blob_uri, - Box::new(cfg.clone()), - metrics.s3_blob.clone(), - Arc::clone(&cfg.configs), - ) - .await?; + let blob = + BlobConfig::try_from(blob_uri, Box::new(cfg.clone()), metrics.s3_blob.clone()).await?; let blob = blob.clone().open().await?; let blob = if commit { blob diff --git a/src/persist-client/src/internal/metrics.rs b/src/persist-client/src/internal/metrics.rs index 6435688b7f32b..8daba8b775854 100644 --- a/src/persist-client/src/internal/metrics.rs +++ b/src/persist-client/src/internal/metrics.rs @@ -140,12 +140,7 @@ impl Metrics { move || start.elapsed().as_secs_f64(), ); let s3_blob = S3BlobMetrics::new(registry); - let columnar = ColumnarMetrics::new( - registry, - &s3_blob.lgbytes, - Arc::clone(&cfg.configs), - cfg.is_cc_active, - ); + let columnar = ColumnarMetrics::new(registry); Metrics { blob: vecs.blob_metrics(), consensus: vecs.consensus_metrics(), diff --git a/src/persist/src/azure.rs b/src/persist/src/azure.rs index 1f331e86d5135..1c48a0a4abf62 100644 --- a/src/persist/src/azure.rs +++ b/src/persist/src/azure.rs @@ -26,10 +26,8 @@ use tracing::{info, warn}; use url::Url; use uuid::Uuid; -use mz_dyncfg::ConfigSet; use mz_ore::bytes::SegmentedBytes; use mz_ore::cast::CastFrom; -use mz_ore::lgbytes::MetricsRegion; use mz_ore::metrics::MetricsRegistry; use crate::cfg::BlobKnobs; @@ -40,15 +38,9 @@ use crate::metrics::S3BlobMetrics; /// Configuration for opening an [AzureBlob]. #[derive(Clone, Debug)] pub struct AzureBlobConfig { - // The metrics struct here is a bit of a misnomer. We only need access - // to the LgBytes metrics, which has an Azure-specific field. For now, - // it saves considerable plumbing to reuse [S3BlobMetrics]. - // - // TODO: spin up an AzureBlobMetrics and do the plumbing. metrics: S3BlobMetrics, client: ContainerClient, prefix: String, - cfg: Arc, } impl AzureBlobConfig { @@ -66,7 +58,6 @@ impl AzureBlobConfig { metrics: S3BlobMetrics, url: Url, knobs: Box, - cfg: Arc, ) -> Result { let client = if account == EMULATOR_ACCOUNT { info!("Connecting to Azure emulator"); @@ -130,7 +121,6 @@ impl AzureBlobConfig { Ok(AzureBlobConfig { metrics, client, - cfg, prefix, }) } @@ -186,7 +176,6 @@ impl AzureBlobConfig { metrics, Url::parse(&format!("http://localhost:40111/{}", container_name)).expect("valid url"), Box::new(TestBlobKnobs), - Arc::new(ConfigSet::default()), )?; Ok(Some(config)) @@ -199,7 +188,6 @@ pub struct AzureBlob { metrics: S3BlobMetrics, client: ContainerClient, prefix: String, - _cfg: Arc, } impl AzureBlob { @@ -221,7 +209,6 @@ impl AzureBlob { metrics: config.metrics, client: config.client, prefix: config.prefix, - _cfg: config.cfg, }; Ok(ret) @@ -242,55 +229,26 @@ impl Blob for AzureBlob { async fn fetch_chunk( response: GetBlobResponse, metrics: S3BlobMetrics, - ) -> Result { + ) -> Result, ExternalError> { let content_length = response.blob.properties.content_length; - // Here we're being quite defensive. If `content_length` comes back - // as 0 it's most likely incorrect. In that case we'll copy bytes - // of the network into a growable buffer, then copy the entire - // buffer into lgalloc. - let mut buffer = match content_length { - 1.. => { - let region = metrics - .lgbytes - .persist_azure - .new_region(usize::cast_from(content_length)); - PreSizedBuffer::Sized(region) - } - 0 => PreSizedBuffer::Unknown(SegmentedBytes::new()), - }; - + let mut parts: Vec = Vec::new(); + let mut total_len: u64 = 0; let mut body = response.data; while let Some(value) = body.next().await { let value = value .map_err(|e| ExternalError::from(e.context("azure blob get body error")))?; - - match &mut buffer { - PreSizedBuffer::Sized(region) => region.extend_from_slice(&value), - PreSizedBuffer::Unknown(segments) => segments.push(value), - } + total_len += u64::cast_from(value.len()); + parts.push(value); } - // Spill our bytes to lgalloc, if they aren't already. - let lgbytes: Bytes = match buffer { - PreSizedBuffer::Sized(region) => region.into(), - // Now that we've collected all of the segments, we know the size of our region. - PreSizedBuffer::Unknown(segments) => { - let mut region = metrics.lgbytes.persist_azure.new_region(segments.len()); - for segment in segments.into_segments() { - region.extend_from_slice(segment.as_ref()); - } - region.into() - } - }; - // Report if the content-length header didn't match the number of // bytes we read from the network. - if content_length != u64::cast_from(lgbytes.len()) { + if content_length != total_len { metrics.get_invalid_resp.inc(); } - Ok(lgbytes) + Ok(parts) } let mut requests = FuturesOrdered::new(); @@ -322,8 +280,9 @@ impl Blob for AzureBlob { // Await on all of our chunks. let mut segments = SegmentedBytes::with_capacity(requests.len()); while let Some(body) = requests.next().await { - let segment = body.context("azure blob get body err")?; - segments.push(segment); + for part in body.context("azure blob get body err")? { + segments.push(part); + } } Ok(Some(segments)) @@ -421,13 +380,6 @@ impl Blob for AzureBlob { } } -/// If possible we'll pre-allocate a chunk of memory in lgalloc and write into -/// that as we read bytes off the network. -enum PreSizedBuffer { - Sized(MetricsRegion), - Unknown(SegmentedBytes), -} - #[cfg(test)] mod tests { use tracing::info; @@ -456,7 +408,6 @@ mod tests { let config = AzureBlobConfig { metrics: config.metrics.clone(), client: config.client.clone(), - cfg: Arc::new(ConfigSet::default()), prefix: config.prefix.clone(), }; AzureBlob::open(config).await diff --git a/src/persist/src/cfg.rs b/src/persist/src/cfg.rs index 44125bbac9245..4ee3173bd5785 100644 --- a/src/persist/src/cfg.rs +++ b/src/persist/src/cfg.rs @@ -33,10 +33,7 @@ use crate::s3::{S3Blob, S3BlobConfig}; /// Adds the full set of all mz_persist `Config`s. pub fn all_dyn_configs(configs: ConfigSet) -> ConfigSet { - configs - .add(&crate::indexed::columnar::arrow::ENABLE_ARROW_LGALLOC_CC_SIZES) - .add(&crate::indexed::columnar::arrow::ENABLE_ARROW_LGALLOC_NONCC_SIZES) - .add(&crate::postgres::USE_POSTGRES_TUNED_QUERIES) + configs.add(&crate::postgres::USE_POSTGRES_TUNED_QUERIES) } /// Config for an implementation of [Blob]. @@ -90,7 +87,6 @@ impl BlobConfig { url: &SensitiveUrl, knobs: Box, metrics: S3BlobMetrics, - cfg: Arc, ) -> Result { let mut query_params = url.query_pairs().collect::>(); @@ -182,7 +178,6 @@ impl BlobConfig { metrics, url.clone().into_redacted(), knobs, - cfg, )?)) } else { Err(anyhow!("unknown persist blob scheme: {}", url.as_str())) diff --git a/src/persist/src/indexed/columnar/arrow.rs b/src/persist/src/indexed/columnar/arrow.rs index fac5bc9f9e8a4..3aca51e5ad22e 100644 --- a/src/persist/src/indexed/columnar/arrow.rs +++ b/src/persist/src/indexed/columnar/arrow.rs @@ -9,15 +9,12 @@ //! Apache Arrow encodings and utils for persist data -use std::ptr::NonNull; use std::sync::Arc; use anyhow::anyhow; use arrow::array::{Array, ArrayData, ArrayRef, BinaryArray, Int64Array, RecordBatch, make_array}; -use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer}; -use arrow::datatypes::ToByteSlice; +use arrow::buffer::{BooleanBuffer, NullBuffer}; use itertools::Itertools; -use mz_dyncfg::Config; use crate::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt}; use crate::indexed::encoding::BlobTraceUpdates; @@ -62,53 +59,30 @@ pub fn encode_arrow_batch(updates: &BlobTraceUpdates) -> RecordBatch { RecordBatch::try_from_iter_with_nullable(fields).expect("valid field definitions") } -pub(crate) const ENABLE_ARROW_LGALLOC_CC_SIZES: Config = Config::new( - "persist_enable_arrow_lgalloc_cc_sizes", - true, - "An incident flag to disable copying decoded arrow data into lgalloc on cc sized clusters.", -); - -pub(crate) const ENABLE_ARROW_LGALLOC_NONCC_SIZES: Config = Config::new( - "persist_enable_arrow_lgalloc_noncc_sizes", - false, - "A feature flag to enable copying decoded arrow data into lgalloc on non-cc sized clusters.", -); - -fn realloc_data(data: ArrayData, nullable: bool, metrics: &ColumnarMetrics) -> ArrayData { - // NB: Arrow generally aligns buffers very coarsely: see arrow::alloc::ALIGNMENT. - // However, lgalloc aligns buffers even more coarsely - to the page boundary - - // so we never expect alignment issues in practice. If that changes, build() - // will return an error below, as it does for all invalid data. - let buffers = data - .buffers() - .iter() - .map(|b| realloc_buffer(b, metrics)) - .collect(); +/// Walks the given arrow [`ArrayData`] recursively, dropping null buffers from +/// non-nullable fields. +/// +/// Workaround for : parquet decoding +/// can generate nulls in non-nullable fields that are only masked by, e.g., a +/// grandparent, but some arrow code expects the direct parent to mask its +/// non-nullable children. Dropping the buffer here prevents those validations +/// from failing. (Top-level arrays are always marked nullable, so they're +/// unaffected.) +fn rebuild_data(data: ArrayData, nullable: bool, metrics: &ColumnarMetrics) -> ArrayData { + let buffers = data.buffers().to_vec(); let child_data = { let field_iter = mz_persist_types::arrow::fields_for_type(data.data_type()).iter(); let child_iter = data.child_data().iter(); field_iter .zip_eq(child_iter) - .map(|(f, d)| realloc_data(d.clone(), f.is_nullable(), metrics)) + .map(|(f, d)| rebuild_data(d.clone(), f.is_nullable(), metrics)) .collect() }; let nulls = if nullable { - data.nulls().map(|n| { - let buffer = realloc_buffer(n.buffer(), metrics); - NullBuffer::new(BooleanBuffer::new(buffer, n.offset(), n.len())) - }) + data.nulls() + .map(|n| NullBuffer::new(BooleanBuffer::new(n.buffer().clone(), n.offset(), n.len()))) } else { if data.nulls().is_some() { - // This is a workaround for: https://github.com/apache/arrow-rs/issues/6510 - // It should always be safe to drop the null buffer for a non-nullable field, since - // any nulls cannot possibly represent real data and thus must be masked off at - // some higher level. We always realloc data we get back from parquet, so this is - // a convenient and efficient place to do the rewrite. - // Why does this help? Parquet decoding can generate nulls in non-nullable fields - // that are only masked by eg. a grandparent, not the direct parent... but some arrow - // code expects the parent to mask any nulls in its non-nullable children. Dropping - // the buffer here prevents those validations from failing. (Top-level arrays are always - // marked nullable, but since they don't have parents that's not a problem either.) metrics.parquet.elided_null_buffers.inc(); } None @@ -125,49 +99,22 @@ fn realloc_data(data: ArrayData, nullable: bool, metrics: &ColumnarMetrics) -> A .expect("reconstructing valid arrow array") } -/// Re-allocate the backing storage for a specific array using lgalloc, if it's configured. -/// (And hopefully-temporarily work around a parquet decoding issue upstream.) +/// Rebuild the given array, dropping null buffers on non-nullable fields. pub fn realloc_array>(array: &A, metrics: &ColumnarMetrics) -> A { let data = array.to_data(); // Top-level arrays are always nullable. - let data = realloc_data(data, true, metrics); + let data = rebuild_data(data, true, metrics); A::from(data) } -/// Re-allocate the backing storage for an array ref using lgalloc, if it's configured. -/// (And hopefully-temporarily work around a parquet decoding issue upstream.) +/// Rebuild the given array ref, dropping null buffers on non-nullable fields. pub fn realloc_any(array: ArrayRef, metrics: &ColumnarMetrics) -> ArrayRef { let data = array.into_data(); // Top-level arrays are always nullable. - let data = realloc_data(data, true, metrics); + let data = rebuild_data(data, true, metrics); make_array(data) } -fn realloc_buffer(buffer: &Buffer, metrics: &ColumnarMetrics) -> Buffer { - let use_lgbytes_mmap = if metrics.is_cc_active { - ENABLE_ARROW_LGALLOC_CC_SIZES.get(&metrics.cfg) - } else { - ENABLE_ARROW_LGALLOC_NONCC_SIZES.get(&metrics.cfg) - }; - let region = if use_lgbytes_mmap { - metrics - .lgbytes_arrow - .try_mmap_region(buffer.as_slice()) - .ok() - } else { - None - }; - let Some(region) = region else { - return buffer.clone(); - }; - let bytes: &[u8] = region.as_ref().to_byte_slice(); - let ptr: NonNull<[u8]> = bytes.into(); - // This is fine: see [[NonNull::as_non_null_ptr]] for an unstable version of this usage. - let ptr: NonNull = ptr.cast(); - // SAFETY: `ptr` is valid for `len` bytes, and kept alive as long as `region` lives. - unsafe { Buffer::from_custom_allocation(ptr, bytes.len(), Arc::new(region)) } -} - /// Converts an [`arrow`] [RecordBatch] into a [BlobTraceUpdates] and reallocate the backing data. pub fn decode_arrow_batch( batch: &RecordBatch, diff --git a/src/persist/src/metrics.rs b/src/persist/src/metrics.rs index 520b1a48f06a6..d09fe7c60062f 100644 --- a/src/persist/src/metrics.rs +++ b/src/persist/src/metrics.rs @@ -9,11 +9,8 @@ //! Implementation-specific metrics for persist blobs and consensus -use std::sync::Arc; use std::time::Instant; -use mz_dyncfg::ConfigSet; -use mz_ore::lgbytes::{LgBytesMetrics, LgBytesOpMetrics}; use mz_ore::metric; use mz_ore::metrics::{Counter, IntCounter, MetricsRegistry}; use prometheus::IntCounterVec; @@ -35,11 +32,6 @@ pub struct S3BlobMetrics { pub(crate) delete_object: IntCounter, pub(crate) list_objects: IntCounter, pub(crate) error_counts: IntCounterVec, - - /// Metrics for all usages of LgBytes. Exposed as public for convenience in - /// persist boot, we'll have to pull this out and do the plumbing - /// differently if mz gains a non-persist user of LgBytes. - pub lgbytes: LgBytesMetrics, } impl S3BlobMetrics { @@ -82,7 +74,6 @@ impl S3BlobMetrics { delete_object: operations.with_label_values(&["delete_object"]), list_objects: operations.with_label_values(&["list_objects"]), error_counts: errors, - lgbytes: LgBytesMetrics::new(registry), } } } @@ -252,29 +243,16 @@ impl ParquetColumnMetrics { /// Metrics for `ColumnarRecords`. #[derive(Debug)] pub struct ColumnarMetrics { - pub(crate) lgbytes_arrow: LgBytesOpMetrics, pub(crate) parquet: ParquetMetrics, pub(crate) arrow: ArrowMetrics, - // TODO: Having these two here isn't quite the right thing to do, but it - // saves a LOT of plumbing. - pub(crate) cfg: Arc, - pub(crate) is_cc_active: bool, } impl ColumnarMetrics { /// Returns a new [ColumnarMetrics]. - pub fn new( - registry: &MetricsRegistry, - lgbytes: &LgBytesMetrics, - cfg: Arc, - is_cc_active: bool, - ) -> Self { + pub fn new(registry: &MetricsRegistry) -> Self { ColumnarMetrics { parquet: ParquetMetrics::new(registry), arrow: ArrowMetrics::new(registry), - lgbytes_arrow: lgbytes.persist_arrow.clone(), - cfg, - is_cc_active, } } @@ -292,10 +270,6 @@ impl ColumnarMetrics { /// /// Exposed for testing. pub fn disconnected() -> Self { - let registry = MetricsRegistry::new(); - let lgbytes = LgBytesMetrics::new(®istry); - let cfg = crate::cfg::all_dyn_configs(ConfigSet::default()); - - Self::new(®istry, &lgbytes, Arc::new(cfg), false) + Self::new(&MetricsRegistry::new()) } }