Utilize memory allocator in ReadProperties.GetStream #547
e3a38f7 to c7bee5e
require.NoError(t, err)

icr := col0.(*file.Int64ColumnChunkReader)
// require.NoError(t, icr.SeekToRow(3)) // TODO: this causes a panic currently
If I uncomment this line, the test panics.
That shouldn't be panicking, as SeekToRow worked correctly in my last tests... I'll see if I can debug this.
This still panics:
--- FAIL: TestDecryptColumns (0.00s)
--- FAIL: TestDecryptColumns/DataPageV2_BufferedRead (0.00s)
panic: cipher: message authentication failed [recovered, repanicked]
goroutine 8 [running]:
testing.tRunner.func1.2({0x2899940, 0xc000054100})
/usr/local/Cellar/go/1.25.6/libexec/src/testing/testing.go:1872 +0x237
testing.tRunner.func1()
/usr/local/Cellar/go/1.25.6/libexec/src/testing/testing.go:1875 +0x35b
panic({0x2899940?, 0xc000054100?})
/usr/local/Cellar/go/1.25.6/libexec/src/runtime/panic.go:783 +0x132
github.com/apache/arrow-go/v18/parquet/internal/encryption.(*aesDecryptor).Decrypt(0xc0002c58f0, {0xc000026800, 0x4000, 0x4000}, {0xc0002c5b10?, 0x600c00003ff20?, 0x44d3560?}, {0xc0002c5e90, 0xd, 0x10})
/daniel-adam-tfs/arrow-go/parquet/internal/encryption/aes.go:262 +0x26d
github.com/apache/arrow-go/v18/parquet/internal/encryption.(*decryptor).Decrypt(0xc0003326c0?, {0xc000026800?, 0xc00003fd40?, 0x1397b05?})
/daniel-adam-tfs/arrow-go/parquet/internal/encryption/decryptor.go:268 +0x45
github.com/apache/arrow-go/v18/parquet/file.(*serializedPageReader).readPageHeader(0xc0003326c0, {0x2aceb20, 0xc00003ff20}, 0xc0002ff740)
/daniel-adam-tfs/arrow-go/parquet/file/page_reader.go:704 +0x139
github.com/apache/arrow-go/v18/parquet/file.(*serializedPageReader).Next(0xc0003326c0)
/daniel-adam-tfs/arrow-go/parquet/file/page_reader.go:798 +0xdd
github.com/apache/arrow-go/v18/parquet/file.(*serializedPageReader).SeekToPageWithRow(0xc0003326c0, 0x3)
/daniel-adam-tfs/arrow-go/parquet/file/page_reader.go:749 +0x186
github.com/apache/arrow-go/v18/parquet/file.(*columnChunkReader).SeekToRow(0xc0002fa780, 0x3)
/daniel-adam-tfs/arrow-go/parquet/file/column_reader.go:584 +0x2a
github.com/apache/arrow-go/v18/parquet/file_test.checkDecryptedValues(0xc000326380, 0xc000210870, 0xc0003283f0)
/daniel-adam-tfs/arrow-go/parquet/file/column_reader_test.go:867 +0x591
github.com/apache/arrow-go/v18/parquet/file_test.TestDecryptColumns.func1(0xc000326380)
/daniel-adam-tfs/arrow-go/parquet/file/column_reader_test.go:964 +0x18a
testing.tRunner(0xc000326380, 0xc0002fe680)
/usr/local/Cellar/go/1.25.6/libexec/src/testing/testing.go:1934 +0xea
created by testing.(*T).Run in goroutine 7
/usr/local/Cellar/go/1.25.6/libexec/src/testing/testing.go:1997 +0x465
FAIL github.com/apache/arrow-go/v18/parquet/file 1.922s
This happens for each of the defined cases: it doesn't matter whether it is a V1 or V2 page, whether it is compressed or not, or whether BufferedStream is used or not. SeekToRow works in the unencrypted case, so this is again encryption related.
It seems to be related to these lines:
arrow-go/parquet/file/page_reader.go
Lines 699 to 702 in 38dc64b
if oidx == nil {
	if _, err = section.Seek(p.dataOffset-p.baseOffset, io.SeekStart); err != nil {
		return err
	}
I think this is intended to skip past the dictionary page to the data page. But for some reason, after this Seek, the following call fails:
arrow-go/parquet/file/page_reader.go
Line 663 in 38dc64b
view = p.cryptoCtx.MetaDecryptor.Decrypt(view)
The offset to the data page is correct, so I suspect that some internal state of the decryptor or the page reader is affected by skipping the parsing of the dictionary page.
But I'll remove the SeekToRow call and park this for now in #566, because we are benchmarking various formats and readers of unencrypted data and I want to make some optimizations (including this PR, which according to my benchmarks speeds things up a little).
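For readers unfamiliar with the error text in the trace above: in Go's standard library, `cipher: message authentication failed` is what AES-GCM returns when the ciphertext, nonce, key, or additional authenticated data (AAD) at decryption time does not match what was used at encryption time. The sketch below only reproduces that generic behaviour with a mismatched AAD. It is not a diagnosis of this bug; the all-zero key, the zero nonce, and the `page-0`/`page-1` labels are made-up demo values, and `openWithAAD` is a hypothetical helper.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"fmt"
)

// openWithAAD seals a fixed message with sealAAD and then tries to open it
// with openAAD. It returns whatever error gcm.Open reports.
func openWithAAD(sealAAD, openAAD []byte) error {
	block, err := aes.NewCipher(make([]byte, 16)) // all-zero demo key, never use in practice
	if err != nil {
		return err
	}
	gcm, err := cipher.NewGCM(block)
	if err != nil {
		return err
	}
	nonce := make([]byte, gcm.NonceSize()) // fixed nonce, acceptable only for a demo
	ciphertext := gcm.Seal(nil, nonce, []byte("page header"), sealAAD)
	_, err = gcm.Open(nil, nonce, ciphertext, openAAD)
	return err
}

func main() {
	fmt.Println(openWithAAD([]byte("page-0"), []byte("page-0"))) // <nil>
	fmt.Println(openWithAAD([]byte("page-0"), []byte("page-1"))) // cipher: message authentication failed
}
```

Any state that feeds into the decryptor's inputs and is advanced while pages are parsed in order would be a candidate to check when a page is reached by seeking instead.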
Alright, decryption should be correct now. Let me rebase this (...and try to remember what was happening here 😄)
b9587e2 to 28b112c
By stealing the buffers and conditionally reusing them depending on encryption/compression, I got a decent improvement, mostly in allocated memory:
ReadInt32 benchmark:
ReadInt32Buffered benchmark:
28b112c to 636a05d
internal/utils/buf_reader.go
Outdated
func (r *byteReader) Buffered() int { return len(r.buf) - r.pos }

func (r *byteReader) Free() {}
Should Free set r.r = nil and r.buf = nil?
}

if r.fileDecryptor == nil {
	stream.Free()
Should we just put defer stream.Free() at line 121 instead?
I couldn't think of a simple way of doing it. The most robust way is something like this:
shouldFreeStream := true
takeStream := func(s BufferedReaderV2) BufferedReaderV2 {
	shouldFreeStream = false
	return s
}
Then use takeStream wherever stream is assigned to a result or returned, and check shouldFreeStream in the deferred function.
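The pattern described in this comment can be sketched end to end as follows. The `stream` type and the `open` function are hypothetical stand-ins, not code from this PR; only the `shouldFree`/`take` idea comes from the comment above.

```go
package main

import "fmt"

// stream is a hypothetical stand-in for a buffered reader that owns a buffer.
type stream struct{ freed bool }

func (s *stream) Free() { s.freed = true }

// open hands the stream to the caller on success and frees it on every other
// return path. keep=false simulates an early error return.
func open(s *stream, keep bool) (*stream, error) {
	shouldFree := true
	take := func(s *stream) *stream {
		shouldFree = false // ownership moves to whoever receives the result
		return s
	}
	defer func() {
		if shouldFree {
			s.Free()
		}
	}()

	if !keep {
		return nil, fmt.Errorf("simulated failure")
	}
	return take(s), nil
}

func main() {
	a, b := &stream{}, &stream{}
	_, errA := open(a, false)
	_, errB := open(b, true)
	fmt.Println(a.freed, errA != nil) // true true
	fmt.Println(b.freed, errB == nil) // false true
}
```

The return expression `take(s)` is evaluated before the deferred function runs, which is why the flag is already cleared by the time the defer checks it.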
parquet/reader_properties.go
Outdated
// Buffered returns the number of bytes already read and stored in the buffer
Buffered() int
This will potentially break other users. Instead, we need to create a new interface that embeds this one, so that existing consumers keep working. Something like adding this to internal/utils/buf_reader.go:
type BufferedReaderV2 interface {
	BufferedReader
	Buffered() int
	Free()
}
d274400 to 3458513
@@ -451,12 +477,17 @@ func NewPageReader(r parquet.BufferedReader, nrows int64, compressType compress.
func (p *serializedPageReader) Reset(r parquet.BufferedReader, nrows int64, compressType compress.Compression, ctx *CryptoContext) {
If I could change the signature of Reset in the PageReader interface to use parquet.BufferedReaderV2, I could skip the whole bufferedReaderV2Adapter shenanigans, but NewPageReader and RecordReader.SetPageReader are public API. If they are ever removed from the public API, we can get rid of bufferedReaderV2Adapter as well.
…nsive benchmarks
- Refactor page reader to unify V1/V2 page reading logic with better buffer management
- Implement readOrStealData to optimize buffered vs direct reads
- Reduce buffer allocations by strategically reusing decompress/data buffers
Benchmark infrastructure improvements:
- Add TestMain with setup/teardown for shared benchmark files
- Create 8 benchmark file variants (V1/V2, plain/snappy, encrypted/unencrypted)
- Add BenchmarkReadInt32 and BenchmarkReadInt32Buffered with sub-benchmarks
- Move file creation to shared setup to eliminate per-benchmark overhead

- Introduce BufferedReaderV2 interface extending BufferedReader with Buffered() and Free() methods.
- Enables explicit resource management and buffer introspection for readers using custom allocators.
- Update GetStreamV2 to return BufferedReaderV2, allowing callers to free allocated buffers when done.
- Keep BufferedReader and BufferedReader for backwards compatibility
3458513 to 68ebe43
I've rebased it, so it has the fix and test from yesterday. But let me add some additional tests, since going by the coverage report not all code branches in the new functions are covered.
Rationale for this change
Optimization of memory usage, enables the use of custom allocators when reading column data with both buffered and unbuffered readers.
What changes are included in this PR?
Page Reader Optimizations:
Custom Allocator Support:
Are these changes tested?
Are there any user-facing changes?
New Features:
Breaking Changes: