# Added test for read/writing Parquet files as streams to IBucket #144
Base: `master`. Changes from all commits: cfceeb8, 004dd29, 4c7751a, 3fcf702, daafc0e, a82123f, def083d.
The diff below covers `S3Bucket.java` (package `com.esamtrade.bucketbase`).

```diff
@@ -1,8 +1,10 @@
 package com.esamtrade.bucketbase;

+import com.amazonaws.SdkClientException;
 import org.apache.commons.codec.digest.DigestUtils;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
 import software.amazon.awssdk.core.async.AsyncRequestBody;
 import software.amazon.awssdk.core.sync.RequestBody;
 import software.amazon.awssdk.core.sync.ResponseTransformer;
@@ -11,11 +13,13 @@
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.*;

+import java.io.BufferedInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Base64;
 import java.util.HashSet;
 import java.util.List;
@@ -24,20 +28,27 @@
 import java.util.stream.Collectors;

 public class S3Bucket extends BaseBucket {

+    private final static int DEFAULT_BUF_SIZE = 8 * 1024;
+    private final static int DEFAULT_UPLOAD_PART_SIZE = 5 * 1024 * 1024;
+    protected final int BUF_SIZE; // 8 KB by default
     protected S3Client s3Client;
     protected S3AsyncClient s3AsyncClient;
     protected String bucketName;

-    public S3Bucket(S3Client s3Client, S3AsyncClient s3AsyncClient, String bucketName) {
+    public S3Bucket(S3Client s3Client, S3AsyncClient s3AsyncClient, String bucketName, int bufSize) {
         this.s3Client = s3Client;
         this.s3AsyncClient = s3AsyncClient;
         this.bucketName = bucketName;
+        this.BUF_SIZE = bufSize;
     }

     public S3Bucket(String endpoint, String accessKey, String secretKey, String bucketName) {
-        this(createS3Client(endpoint, accessKey, secretKey), createS3AsyncClient(endpoint, accessKey, secretKey), bucketName);
+        this(createS3Client(endpoint, accessKey, secretKey), createS3AsyncClient(endpoint, accessKey, secretKey), bucketName, DEFAULT_BUF_SIZE);
     }

+    public S3Bucket(String endpoint, String accessKey, String secretKey, String bucketName, int bufSize) {
+        this(createS3Client(endpoint, accessKey, secretKey), createS3AsyncClient(endpoint, accessKey, secretKey), bucketName, bufSize);
+    }
+
     private static S3Client createS3Client(String endpoint, String accessKey, String secretKey) {
```
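As a usage note, a minimal hedged sketch of the new buffer-size overload; the endpoint, credentials, and bucket name below are invented placeholders:

```java
public class S3BucketUsage {
    public static void main(String[] args) {
        // Placeholders: endpoint, credentials and bucket name are made up.
        S3Bucket bucket = new S3Bucket(
                "https://s3.example.com",
                "ACCESS_KEY",
                "SECRET_KEY",
                "my-bucket",
                64 * 1024); // 64 KB read buffer instead of the 8 KB default
    }
}
```

Per the `getObjectStream` hunk further down, `bufSize` only sizes the `BufferedInputStream` wrapped around downloads; upload parts stay at `DEFAULT_UPLOAD_PART_SIZE`.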
```diff
@@ -70,26 +81,40 @@ public void putObject(PurePosixPath name, byte[] content) {
     }

     @Override
-    public void putObjectStream(PurePosixPath name, InputStream stream) {
+    public void putObjectStream(PurePosixPath name, InputStream stream) throws IOException {
         String _name = validateName(name);
         try {
             uploadLargeStream(_name, stream);
         } catch (Exception e) {
-            throw new RuntimeException("Failed to upload object: " + _name, e);
+            throw new IOException("Failed to upload object: " + _name, e);
         } finally {
             s3AsyncClient.close();
         }
     }

-    private void uploadLargeStream(String key, InputStream inputStream) {
-        int partSize = 5 * 1024 * 1024; // 5 MB
+    private void uploadLargeStream(String key, InputStream inputStream) throws IOException {
+        // Note: if the input stream holds less than 5 MB, S3 multipart upload fails with a 400 error (upload is smaller than the minimum allowed object size)
         List<CompletedPart> completedParts = new ArrayList<>();
-        byte[] buffer = new byte[partSize];
+        byte[] buffer = new byte[DEFAULT_UPLOAD_PART_SIZE];
         int bytesRead;
         int partNumber = 1;

-        // 1. Initiate the multipart upload
+        bytesRead = readUploadBuffer(inputStream, buffer);
+        if (bytesRead == 0) {
+            // Empty stream, create empty object
+            this.putObject(PurePosixPath.from(key), new byte[0]);
+            return;
+        }
+
+        if (bytesRead < DEFAULT_UPLOAD_PART_SIZE) {
+            byte[] content = Arrays.copyOf(buffer, bytesRead);
+            // Small file, use regular putObject to avoid multipart upload being rejected
+            this.putObject(PurePosixPath.from(key), content);
+            return;
+        }
+
+        // 1. Initiate the multipart upload for large files
         CreateMultipartUploadRequest createMultipartUploadRequest = CreateMultipartUploadRequest.builder()
                 .bucket(bucketName)
                 .key(key)
```
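Given the PR's focus on stream round-trips, a hedged sketch of what exercising the sub-5 MB fallback could look like; the key `test/small.bin` and the helper method are invented, and `bucket` is an `S3Bucket` as constructed above:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical check of the small-stream fallback path (names are made up).
void smallStreamRoundTrip(S3Bucket bucket) throws IOException {
    byte[] payload = "tiny payload".getBytes(StandardCharsets.UTF_8); // far below 5 MB
    bucket.putObjectStream(PurePosixPath.from("test/small.bin"),
            new ByteArrayInputStream(payload));
    // The first readUploadBuffer() call returns fewer than DEFAULT_UPLOAD_PART_SIZE
    // bytes, so uploadLargeStream takes the plain putObject branch and no multipart
    // upload is ever initiated -- avoiding S3's rejection of sub-5 MB parts.
}
```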
```diff
@@ -98,22 +123,16 @@ private void uploadLargeStream(String key, InputStream inputStream) {
         String uploadId = response.uploadId();

         try {
-            // 2. Read the input stream and upload each part
-            while ((bytesRead = inputStream.read(buffer)) != -1) {
-                byte[] bytesToUpload = (bytesRead < partSize) ? java.util.Arrays.copyOf(buffer, bytesRead) : buffer;
-                UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
-                        .bucket(bucketName)
-                        .key(key)
-                        .uploadId(uploadId)
-                        .partNumber(partNumber)
-                        .contentLength((long) bytesRead)
-                        .build();
-                AsyncRequestBody requestBody = AsyncRequestBody.fromBytes(bytesToUpload);
-                CompletableFuture<UploadPartResponse> uploadPartResponse = s3AsyncClient.uploadPart(uploadPartRequest, requestBody);
-                completedParts.add(CompletedPart.builder()
-                        .partNumber(partNumber)
-                        .eTag(uploadPartResponse.join().eTag())
-                        .build());
+            // Upload the first buffer we already read
+            CompletedPart firstPart = uploadPart(key, uploadId, partNumber, bytesRead, buffer);
+            completedParts.add(firstPart);
+            partNumber++;
+
+            // 2. Continue reading and uploading remaining parts
+            while ((bytesRead = readUploadBuffer(inputStream, buffer)) != 0) {
+                byte[] bytesToUpload = (bytesRead < DEFAULT_UPLOAD_PART_SIZE) ? java.util.Arrays.copyOf(buffer, bytesRead) : buffer;
+                CompletedPart completedPart = uploadPart(key, uploadId, partNumber, bytesRead, bytesToUpload);
+                completedParts.add(completedPart);
                 partNumber++;
             }
@@ -133,10 +152,43 @@ private void uploadLargeStream(String key, InputStream inputStream) {
                     .uploadId(uploadId)
                     .build();
             s3AsyncClient.abortMultipartUpload(abortMultipartUploadRequest).join();
-            throw new RuntimeException("Failed to upload object: " + key, e);
+            throw new IOException("Failed to upload object: " + key, e);
         }
     }

+    private int readUploadBuffer(InputStream inputStream, byte[] buffer) throws IOException {
+        int totalBytesRead = inputStream.read(buffer);
+
+        if (totalBytesRead == -1) {
+            return 0;
+        }
+
+        if (totalBytesRead < DEFAULT_UPLOAD_PART_SIZE) {
+            int bytesRead;
+            while (totalBytesRead < DEFAULT_UPLOAD_PART_SIZE && (bytesRead = inputStream.read(buffer, totalBytesRead, DEFAULT_UPLOAD_PART_SIZE - totalBytesRead)) != -1) {
+                totalBytesRead += bytesRead;
+            }
+        }
+
+        return totalBytesRead;
+    }
+
+    private CompletedPart uploadPart(String key, String uploadId, int partNumber, long bytesRead, byte[] bytesToUpload) {
+        UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
+                .bucket(bucketName)
+                .key(key)
+                .uploadId(uploadId)
+                .partNumber(partNumber)
+                .contentLength(bytesRead)
+                .build();
+        AsyncRequestBody requestBody = AsyncRequestBody.fromBytes(bytesToUpload);
+        CompletableFuture<UploadPartResponse> uploadPartResponse = s3AsyncClient.uploadPart(uploadPartRequest, requestBody);
+        return CompletedPart.builder()
+                .partNumber(partNumber)
+                .eTag(uploadPartResponse.join().eTag())
+                .build();
+    }

     @Override
     public byte[] getObject(PurePosixPath name) throws IOException {
```

> **Contributor** (on the `uploadPartResponse.join()` line): I think that if you `join()` here on the future response, you choose to perform the uploads sequentially. Is this the intention?
>
> **Member (Author):** Nice catch, I missed this (GPT code).
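For what it's worth, a hedged sketch of one way to make the part uploads actually concurrent: collect the `CompletableFuture`s instead of joining per part, and join once after everything is in flight. This is not part of the PR; `uploadPartsConcurrently` is a hypothetical helper assumed to live in `S3Bucket` next to `readUploadBuffer`:

```java
// Sketch only: concurrent multipart upload using the same fields
// (s3AsyncClient, bucketName, DEFAULT_UPLOAD_PART_SIZE) as the diff above.
private List<CompletedPart> uploadPartsConcurrently(String key, String uploadId,
                                                    InputStream in) throws IOException {
    List<CompletableFuture<CompletedPart>> pending = new ArrayList<>();
    byte[] buffer = new byte[DEFAULT_UPLOAD_PART_SIZE];
    int partNumber = 1;
    int bytesRead;
    while ((bytesRead = readUploadBuffer(in, buffer)) != 0) {
        // Each part needs its own copy: the shared buffer is reused next iteration.
        byte[] copy = Arrays.copyOf(buffer, bytesRead);
        int part = partNumber++;
        UploadPartRequest request = UploadPartRequest.builder()
                .bucket(bucketName).key(key).uploadId(uploadId)
                .partNumber(part).contentLength((long) bytesRead)
                .build();
        pending.add(s3AsyncClient.uploadPart(request, AsyncRequestBody.fromBytes(copy))
                .thenApply(resp -> CompletedPart.builder()
                        .partNumber(part).eTag(resp.eTag()).build()));
    }
    // Join once, after all parts are in flight.
    return pending.stream().map(CompletableFuture::join).toList();
}
```

The trade-off: every in-flight part keeps its 5 MB buffer alive, so in practice a bounded concurrency limit (e.g. a semaphore) would be needed to cap memory.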
```diff
@@ -159,14 +211,29 @@ public ObjectStream getObjectStream(PurePosixPath name) throws IOException {
                     .key(name.toString())
                     .build();
             InputStream inputStream = s3Client.getObject(request, ResponseTransformer.toInputStream());
-            return new ObjectStream(inputStream, name.toString());
+            BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream, BUF_SIZE);
+            return new ObjectStream(bufferedInputStream, name.toString());
         } catch (NoSuchKeyException e) {
             throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName);
         }
     }

+    @Override
+    public long getSize(PurePosixPath name) throws IOException {
+        try {
+            HeadObjectRequest request = HeadObjectRequest.builder()
+                    .bucket(bucketName)
+                    .key(name.toString())
+                    .build();
+            HeadObjectResponse response = s3Client.headObject(request);
+            return response.contentLength();
+        } catch (NoSuchKeyException e) {
+            throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName);
+        } catch (AwsServiceException | SdkClientException e) {
+            throw new IOException("Failed to get object size: " + name, e);
+        }
+    }

     /**
      * Lists all objects in the S3 bucket with the given prefix.
      *
```

> **Review comment** (on lines 221 to +234): A HEAD request can surface a missing object as a generic `AwsServiceException` with HTTP status 404 rather than a typed `NoSuchKeyException`; in AWS SDK for Java v2 the status is available via `statusCode()`. Handle that edge case so a missing object still maps to `FileNotFoundException`.
>
> Proposed fix:

```diff
-        } catch (NoSuchKeyException e) {
-            throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName);
-        } catch (AwsServiceException | SdkClientException e) {
-            throw new IOException("Failed to get object size: " + name, e);
-        }
+        } catch (NoSuchKeyException e) {
+            throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName);
+        } catch (AwsServiceException e) {
+            if (e.statusCode() == 404) {
+                throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName);
+            }
+            throw new IOException("Failed to get object size: " + name, e);
+        } catch (SdkClientException e) {
+            throw new IOException("Failed to get object size: " + name, e);
+        }
```
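To illustrate the resulting contract for callers of `getSize`, a small hypothetical sketch; the key `data/file.parquet` is made up:

```java
import java.io.FileNotFoundException;
import java.io.IOException;

// Hypothetical caller: distinguish "missing" from "transient failure".
void printSize(S3Bucket bucket) {
    try {
        long size = bucket.getSize(PurePosixPath.from("data/file.parquet"));
        System.out.println("size = " + size + " bytes");
    } catch (FileNotFoundException e) {
        System.out.println("object is absent (NoSuchKeyException or an HTTP 404)");
    } catch (IOException e) {
        System.out.println("service/client error, possibly retryable: " + e.getMessage());
    }
}
```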
```diff
@@ -175,17 +242,15 @@ public ObjectStream getObjectStream(PurePosixPath name) throws IOException {
      * @param prefix The prefix to filter objects by.
      * @return A list of paths to the objects in the bucket.
      */
     @Override
     public List<PurePosixPath> listObjects(PurePosixPath prefix) {
         splitPrefix(prefix); // validate prefix
-        List<PurePosixPath> result = new ArrayList<>();
         ListObjectsV2Request request = ListObjectsV2Request.builder()
                 .bucket(bucketName)
                 .prefix(prefix.toString())
                 .build();

-        List<PurePosixPath> results = s3Client.listObjectsV2Paginator(request).contents().stream().map(S3Object::key).map(PurePosixPath::from).toList();
-
-        return results;
+        return s3Client.listObjectsV2Paginator(request).contents().stream().map(S3Object::key).map(PurePosixPath::from).toList();
     }

     @Override
```
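And a small hedged usage sketch of `listObjects` (the prefix is invented). Since the method is built on `listObjectsV2Paginator`, a single call transparently walks every result page and may issue several ListObjectsV2 requests:

```java
import java.util.List;

// Hypothetical listing under a made-up prefix.
void listReports(S3Bucket bucket) {
    List<PurePosixPath> keys = bucket.listObjects(PurePosixPath.from("reports/2024/"));
    keys.forEach(System.out::println); // one entry per object key, across all pages
}
```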
> **Reviewer:** Isn't `parquet-hadoop` enough? Do we need the whole bundle?
>
> **Author:** I have no idea... maybe :) Feel free to fix any inefficiencies. This PR was a hackish PoC written by GPT.