15 changes: 15 additions & 0 deletions java/pom.xml
@@ -51,6 +51,21 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>

<!-- Parquet dependencies -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop-bundle</artifactId>
Contributor
Isn't parquet-hadoop enough? Do we need the whole bundle?

Member Author

I have no idea... maybe :)
Feel free to fix any inefficiencies. This PR was a hackish PoC written by GPT.
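A sketch of what the reviewer's suggestion might look like, assuming the narrower org.apache.parquet:parquet-hadoop artifact covers what the tests need (unverified here; the hadoop-client dependency below would still be required):

    <!-- Hypothetical alternative to the full bundle -->
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-hadoop</artifactId>
        <version>1.15.1</version>
        <scope>test</scope>
    </dependency>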

<version>1.15.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.4.1</version>
<scope>test</scope>
</dependency>

</dependencies>
<build>
<plugins>
3 changes: 2 additions & 1 deletion java/src/main/java/com/esamtrade/bucketbase/IBucket.java
@@ -7,7 +7,6 @@
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -28,6 +27,8 @@ public interface IBucket {

ObjectStream getObjectStream(PurePosixPath name) throws IOException;

long getSize(PurePosixPath name) throws IOException;

List<PurePosixPath> listObjects(PurePosixPath prefix) throws IOException;

ShallowListing shallowListObjects(PurePosixPath prefix) throws IOException;
6 changes: 6 additions & 0 deletions java/src/main/java/com/esamtrade/bucketbase/MemoryBucket.java
@@ -60,6 +60,12 @@ public ObjectStream getObjectStream(PurePosixPath name) throws FileNotFoundExcep
return new ObjectStream(new ByteArrayInputStream(content), name.toString());
}

@Override
public long getSize(PurePosixPath name) throws IOException {
byte[] content = getObject(name);
return content.length;
}

@Override
public List<PurePosixPath> listObjects(PurePosixPath prefix) {
splitPrefix(prefix); // validate prefix
129 changes: 97 additions & 32 deletions java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java
@@ -1,8 +1,10 @@
package com.esamtrade.bucketbase;

import com.amazonaws.SdkClientException;
import org.apache.commons.codec.digest.DigestUtils;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.core.sync.ResponseTransformer;
@@ -11,11 +13,13 @@
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.*;

import java.io.BufferedInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashSet;
import java.util.List;
@@ -24,20 +28,27 @@
import java.util.stream.Collectors;

public class S3Bucket extends BaseBucket {

private final static int DEFAULT_BUF_SIZE = 8 * 1024;
private final static int DEFAULT_UPLOAD_PART_SIZE = 5 * 1024 * 1024;
protected final int BUF_SIZE; // 8 KB by default
protected S3Client s3Client;
protected S3AsyncClient s3AsyncClient;
protected String bucketName;


public S3Bucket(S3Client s3Client, S3AsyncClient s3AsyncClient, String bucketName) {
public S3Bucket(S3Client s3Client, S3AsyncClient s3AsyncClient, String bucketName, int bufSize) {
this.s3Client = s3Client;
this.s3AsyncClient = s3AsyncClient;
this.bucketName = bucketName;
this.BUF_SIZE = bufSize;
}

public S3Bucket(String endpoint, String accessKey, String secretKey, String bucketName) {
this(createS3Client(endpoint, accessKey, secretKey), createS3AsyncClient(endpoint, accessKey, secretKey), bucketName);
this(createS3Client(endpoint, accessKey, secretKey), createS3AsyncClient(endpoint, accessKey, secretKey), bucketName, DEFAULT_BUF_SIZE);
}

public S3Bucket(String endpoint, String accessKey, String secretKey, String bucketName, int bufSize) {
this(createS3Client(endpoint, accessKey, secretKey), createS3AsyncClient(endpoint, accessKey, secretKey), bucketName, bufSize);
}

private static S3Client createS3Client(String endpoint, String accessKey, String secretKey) {
@@ -70,26 +81,40 @@ public void putObject(PurePosixPath name, byte[] content) {
}

@Override
public void putObjectStream(PurePosixPath name, InputStream stream) {
public void putObjectStream(PurePosixPath name, InputStream stream) throws IOException {
String _name = validateName(name);
try {
uploadLargeStream(_name, stream);
} catch (Exception e) {
throw new RuntimeException("Failed to upload object: " + _name, e);
throw new IOException("Failed to upload object: " + _name, e);
} finally {
s3AsyncClient.close();
}
}


private void uploadLargeStream(String key, InputStream inputStream) {
int partSize = 5 * 1024 * 1024; // 5 MB
private void uploadLargeStream(String key, InputStream inputStream) throws IOException {
// Note: if the input stream contains less than 5 MB, S3 multipart upload fails with HTTP 400 (upload is smaller than the minimum allowed object size)
List<CompletedPart> completedParts = new ArrayList<>();
byte[] buffer = new byte[partSize];
byte[] buffer = new byte[DEFAULT_UPLOAD_PART_SIZE];
int bytesRead;
int partNumber = 1;

// 1. Initiate the multipart upload
bytesRead = readUploadBuffer(inputStream, buffer);
if (bytesRead == 0) {
// Empty stream, create empty object
this.putObject(PurePosixPath.from(key), new byte[0]);
return;
}

if (bytesRead < DEFAULT_UPLOAD_PART_SIZE) {
byte[] content = Arrays.copyOf(buffer, bytesRead);
// Small file, use regular putObject to avoid multipart upload being rejected
this.putObject(PurePosixPath.from(key), content);
return;
}

// 1. Initiate the multipart upload for large files
CreateMultipartUploadRequest createMultipartUploadRequest = CreateMultipartUploadRequest.builder()
.bucket(bucketName)
.key(key)
@@ -98,22 +123,16 @@ private void uploadLargeStream(String key, InputStream inputStream) {
String uploadId = response.uploadId();

try {
// 2. Read the input stream and upload each part
while ((bytesRead = inputStream.read(buffer)) != -1) {
byte[] bytesToUpload = (bytesRead < partSize) ? java.util.Arrays.copyOf(buffer, bytesRead) : buffer;
UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
.bucket(bucketName)
.key(key)
.uploadId(uploadId)
.partNumber(partNumber)
.contentLength((long) bytesRead)
.build();
AsyncRequestBody requestBody = AsyncRequestBody.fromBytes(bytesToUpload);
CompletableFuture<UploadPartResponse> uploadPartResponse = s3AsyncClient.uploadPart(uploadPartRequest, requestBody);
completedParts.add(CompletedPart.builder()
.partNumber(partNumber)
.eTag(uploadPartResponse.join().eTag())
.build());
// Upload the first buffer we already read
CompletedPart firstPart = uploadPart(key, uploadId, partNumber, bytesRead, buffer);
completedParts.add(firstPart);
partNumber++;

// 2. Continue reading and uploading remaining parts
while ((bytesRead = readUploadBuffer(inputStream, buffer)) != 0) {
byte[] bytesToUpload = (bytesRead < DEFAULT_UPLOAD_PART_SIZE) ? java.util.Arrays.copyOf(buffer, bytesRead) : buffer;
CompletedPart completedPart = uploadPart(key, uploadId, partNumber, bytesRead, bytesToUpload);
completedParts.add(completedPart);
partNumber++;
}

@@ -133,10 +152,43 @@ private void uploadLargeStream(String key, InputStream inputStream) {
.uploadId(uploadId)
.build();
s3AsyncClient.abortMultipartUpload(abortMultipartUploadRequest).join();
throw new RuntimeException("Failed to upload object: " + key, e);
throw new IOException("Failed to upload object: " + key, e);
}
}

private int readUploadBuffer(InputStream inputStream, byte[] buffer) throws IOException {
int totalBytesRead = inputStream.read(buffer);

if (totalBytesRead == -1) {
return 0;
}

if (totalBytesRead < DEFAULT_UPLOAD_PART_SIZE) {
int bytesRead;
while (totalBytesRead < DEFAULT_UPLOAD_PART_SIZE && (bytesRead = inputStream.read(buffer, totalBytesRead, DEFAULT_UPLOAD_PART_SIZE - totalBytesRead)) != -1) {
totalBytesRead += bytesRead;
}
}

return totalBytesRead;
}

private CompletedPart uploadPart(String key, String uploadId, int partNumber, long bytesRead, byte[] bytesToUpload) {
UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
.bucket(bucketName)
.key(key)
.uploadId(uploadId)
.partNumber(partNumber)
.contentLength(bytesRead)
.build();
AsyncRequestBody requestBody = AsyncRequestBody.fromBytes(bytesToUpload);
CompletableFuture<UploadPartResponse> uploadPartResponse = s3AsyncClient.uploadPart(uploadPartRequest, requestBody);
return CompletedPart.builder()
.partNumber(partNumber)
.eTag(uploadPartResponse.join().eTag())
Contributor
I think that if you join() here on the future response, you end up performing the uploads sequentially. Is this intentional?

Member Author

Nice catch, I missed this (GPT code).
It's probably better to parallelize the uploads than to perform them sequentially.
Would you be so kind to suggest a better implementation?
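One possible direction, sketched against the PR's own helpers (readUploadBuffer, the bucketName and s3AsyncClient fields): return the future from each part upload instead of joining per part, copy the shared read buffer for each in-flight part, and join everything once before CompleteMultipartUpload. This is an untested illustration, and it keeps every pending part in memory, so bounding the number of in-flight parts may be preferable.

    // Sketch only: async variant of uploadPart that defers the join
    private CompletableFuture<CompletedPart> uploadPartAsync(String key, String uploadId, int partNumber, int bytesRead, byte[] bytesToUpload) {
        UploadPartRequest request = UploadPartRequest.builder()
                .bucket(bucketName)
                .key(key)
                .uploadId(uploadId)
                .partNumber(partNumber)
                .contentLength((long) bytesRead)
                .build();
        return s3AsyncClient.uploadPart(request, AsyncRequestBody.fromBytes(bytesToUpload))
                .thenApply(resp -> CompletedPart.builder().partNumber(partNumber).eTag(resp.eTag()).build());
    }

    // In uploadLargeStream: collect the futures, then join once before completing the upload
    List<CompletableFuture<CompletedPart>> pendingParts = new ArrayList<>();
    int partNumber = 1;
    int bytesRead;
    while ((bytesRead = readUploadBuffer(inputStream, buffer)) != 0) {
        byte[] part = Arrays.copyOf(buffer, bytesRead); // copy: the read buffer is reused for the next part
        pendingParts.add(uploadPartAsync(key, uploadId, partNumber++, bytesRead, part));
    }
    List<CompletedPart> completedParts = pendingParts.stream()
            .map(CompletableFuture::join) // a failed part surfaces here; the existing catch block still aborts the multipart upload
            .collect(Collectors.toList());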

.build();
}


@Override
public byte[] getObject(PurePosixPath name) throws IOException {
@@ -159,14 +211,29 @@ public ObjectStream getObjectStream(PurePosixPath name) throws IOException {
.key(name.toString())
.build();
InputStream inputStream = s3Client.getObject(request, ResponseTransformer.toInputStream());
return new ObjectStream(inputStream, name.toString());
BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream, BUF_SIZE);
return new ObjectStream(bufferedInputStream, name.toString());
} catch (NoSuchKeyException e) {
throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName);
}
}


@Override
public long getSize(PurePosixPath name) throws IOException {
try {
HeadObjectRequest request = HeadObjectRequest.builder()
.bucket(bucketName)
.key(name.toString())
.build();
HeadObjectResponse response = s3Client.headObject(request);
return response.contentLength();
} catch (NoSuchKeyException e) {
throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName);
} catch (AwsServiceException | SdkClientException e) {
throw new IOException("Failed to get object size: " + name, e);
}
Comment on lines 221 to +234

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🌐 Web query:

AWS SDK for Java v2 S3Client.headObject NoSuchKeyException missing object

💡 Result:

Short answer

  • S3Client.headObject throws NoSuchKeyException (an S3 service exception) when the key does not exist. If you lack s3:ListBucket permission S3 may instead return 403 (access denied). Use try/catch for NoSuchKeyException or inspect the S3Exception HTTP status/error code. [1][2]

Example (Java SDK v2)

  • Preferred: call headObject and handle missing-key explicitly:
    try {
        HeadObjectResponse r = s3.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).build());
        // object exists
    } catch (NoSuchKeyException e) {
        // object missing
    } catch (S3Exception e) {
        // other service errors (e.g. 403 if no ListBucket permission) — check e.statusCode() / e.awsErrorDetails()
    } catch (SdkClientException e) {
        // client-side errors (network/credentials)
    }

Notes / alternatives

  • If you get 403 for a missing key, ensure the caller has s3:ListBucket on the bucket (or use a different approach). [2]
  • Alternatives: listObjectsV2 with prefix (slower), GetObjectAttributes (if available in your SDK/API region) or attempt a GetObject and handle 404. [5][8]

Sources

  • AWS Java SDK v2 S3Client.headObject docs (throws NoSuchKeyException; behavior depends on s3:ListBucket permission). [1][2]
  • AWS developer guide / examples (head/list guidance). [8]
  • Community/guide pages on HEAD returning 404 when object missing and 403 if no list permission. [5]

References
[1] S3Client API doc (headObject throws NoSuchKeyException).
[2] AWS S3 headObject behavior / permission details.
[5] Checking if an S3 object exists (example using head-object).
[8] AWS SDK for Java developer guide (S3 examples).

If you want, I can paste a ready-to-run Java method that wraps this and returns a boolean.


🌐 Web query:

MinIO S3 headObject 404 exception handling missing key

💡 Result:

Short answer

  • HeadObject/StatObject will return a 404/NotFound (or an SDK error variant that maps to NotFound/NoSuchKey) when the object key does not exist; some SDKs expose this as an HTTP 404 status, others as an error type/code (e.g. NotFound / NoSuchKey). Handle it by checking the HTTP status or the SDK error code and treating that case as “object missing.” (docs.aws.amazon.com)

Practical guidance

  • Catch the SDK error and inspect:
    • the HTTP status (== 404 / http.StatusNotFound), or
    • the service error code (e.g. "NoSuchKey" / NotFound / HeadObjectError::NotFound) — SDKs differ in representation. If you rely on string matching, prefer checking structured fields (error.Code / error.Type) when available. (docs.aws.amazon.com)
  • MinIO specifics:
    • MinIO commonly returns NoSuchKey / NotFound for missing objects; some client libraries historically required checking the error message text (fragile), and there are open issues about exposing stable error constants. Prefer checking HTTP status or SDK-provided enum/fields. (github.com)
  • Permissions caveat: AWS S3 returns 404 only if you have s3:ListBucket; without it you may get 403 (so treat 403 vs 404 differently if you expect “missing” vs “forbidden”). MinIO deployments behind proxies (Cloudflare, etc.) can also change returned status codes. (docs.aws.amazon.com)
  • Eventually-consistent/latency: if objects are created just before you call HeadObject, consider retries with exponential backoff rather than failing immediately. (stackoverflow.com)

Example patterns (pseudocode)

  • Generic:
    • try headObject()
    • if err == nil → object exists
    • else if err.httpStatus == 404 or err.code == "NoSuchKey" or err.isNotFound() → treat as missing
    • else → treat as transient or fatal error (retry or surface)

If you tell me which language/SDK you use (Go, Java, Python, .NET, JS, Rust), I’ll give a concrete code example.



🌐 Web query:

AWS SDK Java v2 AwsServiceException statusCode method

💡 Result:

Yes — in AWS SDK for Java v2, AwsServiceException exposes the HTTP status code via SdkServiceException APIs:

  • Instance accessor: int statusCode() — returns the HTTP status code on the exception (defined on SdkServiceException and available on AwsServiceException). [1]
  • Builder setter: AwsServiceException.Builder statusCode(int statusCode) — used when building an AwsServiceException. [2]

References:
[1] AwsServiceException / SdkServiceException Javadoc (statusCode accessor).
[2] AwsServiceException.Builder Javadoc (statusCode(int) builder method).


Handle the edge case where 404 comes via AwsServiceException—good defensive programming, but don't overstate the impact.

headObject primarily throws NoSuchKeyException for missing objects (verified across AWS SDK v2 and MinIO). The edge case where you get AwsServiceException with statusCode == 404 instead is real but rare—defensive coding to handle it makes sense, but this isn't a blocker. The proposed fix is correct: split the exception handlers and check statusCode() == 404 on AwsServiceException. Current code handles the common path; this handles the outlier. Worth doing, but not critical for most deployments.

🔧 Proposed fix
-        } catch (NoSuchKeyException e) {
-            throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName);
-        } catch (AwsServiceException | SdkClientException e) {
-            throw new IOException("Failed to get object size: " + name, e);
-        }
+        } catch (NoSuchKeyException e) {
+            throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName);
+        } catch (AwsServiceException e) {
+            if (e.statusCode() == 404) {
+                throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName);
+            }
+            throw new IOException("Failed to get object size: " + name, e);
+        } catch (SdkClientException e) {
+            throw new IOException("Failed to get object size: " + name, e);
+        }
🤖 Prompt for AI Agents
In `@java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java` around lines 221 -
234, Update S3Bucket.getSize to treat AwsServiceException with HTTP 404 the same
as NoSuchKey: catch AwsServiceException separately (in addition to
NoSuchKeyException) and check e.statusCode() == 404 (or
e.statusCode().toString()/int equivalent depending on type) and throw
FileNotFoundException in that case; otherwise keep the existing catch for
AwsServiceException | SdkClientException to wrap other failures in an
IOException. Ensure you reference getSize, S3Bucket, HeadObjectRequest/Response
and preserve the original error as cause when throwing IOException.

}

/**
* Lists all objects in the S3 bucket with the given prefix.
*
@@ -175,17 +242,15 @@ public ObjectStream getObjectStream(PurePosixPath name) throws IOException {
* @param prefix The prefix to filter objects by.
* @return A list of paths to the objects in the bucket.
*/
@Override
public List<PurePosixPath> listObjects(PurePosixPath prefix) {
splitPrefix(prefix); // validate prefix
List<PurePosixPath> result = new ArrayList<>();
ListObjectsV2Request request = ListObjectsV2Request.builder()
.bucket(bucketName)
.prefix(prefix.toString())
.build();

List<PurePosixPath> results = s3Client.listObjectsV2Paginator(request).contents().stream().map(S3Object::key).map(PurePosixPath::from).toList();

return results;
return s3Client.listObjectsV2Paginator(request).contents().stream().map(S3Object::key).map(PurePosixPath::from).toList();
}

@Override
14 changes: 14 additions & 0 deletions java/src/main/java/com/esamtrade/bucketbase/S3BucketSDKv1.java
@@ -1,5 +1,6 @@
package com.esamtrade.bucketbase;

import com.amazonaws.SdkClientException;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
@@ -82,6 +83,19 @@ public ObjectStream getObjectStream(PurePosixPath name) throws IOException {
return new ObjectStream(inputStream, name.toString());
}

@Override
public long getSize(PurePosixPath name) throws IOException {
try {
return s3Client.getObjectMetadata(bucketName, name.toString()).getContentLength();
}
catch (AmazonS3Exception e) {
if (e.getStatusCode() == 404)
throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName);
throw new IOException("Failed to get object metadata: " + e.getMessage(), e);
} catch (SdkClientException e) {
throw new IOException("Failed to get object metadata: " + e.getMessage(), e);
}
}

/**
* Retrieves a list of object paths stored in the bucket that match the given prefix.