/**
 * Access intent passed to {@link CometS3CredentialProvider#getCredentialsForPath}.
 *
 * <p>Native code hands the mode to {@code CometS3CredentialDispatcher} as this enum's ordinal, and
 * the dispatcher maps it back through {@code values()}. Reordering the constants therefore changes
 * which mode a given integer resolves to.
 */
public enum CometS3AccessMode {

  /** GET / HEAD / LIST. All Comet native scan paths request this today. */
  READ,

  /** PUT / POST / DELETE / multipart. Reserved for future native write paths. */
  WRITE
}
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.cloud.s3; + +import java.util.ArrayList; +import java.util.List; +import java.util.ServiceLoader; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * JNI entry point invoked from native code to resolve {@link CometS3CredentialProvider}. + * + *

The provider is resolved once via {@link ServiceLoader} and cached in a {@code static final} + * field. A query falling back from Comet to Spark mid-execution therefore sees identical + * credentials, since both paths resolve from the same executor classpath. + * + *

Multiple registered impls fail fast at class-load; chaining is a vendor-side concern. + */ +public final class CometS3CredentialDispatcher { + + private static final Logger LOG = LoggerFactory.getLogger(CometS3CredentialDispatcher.class); + + private static final CometS3CredentialProvider PROVIDER = resolve(); + private static final CometS3AccessMode[] MODES = CometS3AccessMode.values(); + + private CometS3CredentialDispatcher() {} + + public static boolean isProviderRegistered() { + return PROVIDER != null; + } + + /** Invoked by native code. {@code mode} is the {@link CometS3AccessMode} ordinal. */ + public static CometS3Credentials getCredentialsForPath(String bucket, String path, int mode) + throws Exception { + if (PROVIDER == null) { + throw new IllegalStateException( + "No CometS3CredentialProvider registered; check META-INF/services on the classpath"); + } + if (mode < 0 || mode >= MODES.length) { + throw new IllegalArgumentException("Invalid CometS3AccessMode ordinal: " + mode); + } + CometS3AccessMode accessMode = MODES[mode]; + if (LOG.isDebugEnabled()) { + LOG.debug("Fetching credentials for bucket={} path={} mode={}", bucket, path, accessMode); + } + return PROVIDER.getCredentialsForPath(bucket, path, accessMode); + } + + private static CometS3CredentialProvider resolve() { + List impls = new ArrayList<>(); + for (CometS3CredentialProvider impl : ServiceLoader.load(CometS3CredentialProvider.class)) { + impls.add(impl); + } + if (impls.isEmpty()) { + LOG.info( + "No CometS3CredentialProvider registered; native S3 readers will use the default " + + "AWS credential chain"); + return null; + } + if (impls.size() > 1) { + List names = + impls.stream().map(p -> p.getClass().getName()).collect(Collectors.toList()); + throw new IllegalStateException( + "Multiple CometS3CredentialProvider impls on classpath: " + names); + } + CometS3CredentialProvider provider = impls.get(0); + LOG.info("Registered CometS3CredentialProvider: {}", provider.getClass().getName()); 
+ return provider; + } +} diff --git a/common/src/main/java/org/apache/comet/cloud/s3/CometS3CredentialProvider.java b/common/src/main/java/org/apache/comet/cloud/s3/CometS3CredentialProvider.java new file mode 100644 index 0000000000..677645a3e7 --- /dev/null +++ b/common/src/main/java/org/apache/comet/cloud/s3/CometS3CredentialProvider.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.cloud.s3; + +/** + * SPI for supplying AWS credentials to Comet's native S3 readers, which bypass Spark's Hadoop S3A + * code path and cannot reach signer-based or path-aware credential mechanisms through the standard + * parameterless {@code AWSCredentialsProvider.getCredentials()} contract. + * + *

Peer to {@code org.apache.hadoop.fs.s3a.AwsSignerInitializer} (Hadoop S3A) and {@code + * org.apache.iceberg.aws.AwsClientFactory} (Iceberg-Java): the same shape vendors already implement + * for those two, with a smaller surface (one method). + * + *

Why a new SPI?

+ * + * No existing contract carries per-path AWS credentials from vendor code to Comet's native readers: + * + * + * + *

Vendors register an implementation via {@code + * META-INF/services/org.apache.comet.cloud.s3.CometS3CredentialProvider}. {@link + * #getCredentialsForPath} may be invoked concurrently from many native tokio tasks, so + * implementations must be thread-safe. + * + *

Returns credentials or throws; there is no fall-through return value. A provider that is only + * authoritative for some paths should resolve the default AWS chain itself for the rest. See the + * user guide on cloud credential providers. + */ +public interface CometS3CredentialProvider { + + /** + * @param bucket S3 bucket name (no scheme, no path) + * @param path object key or prefix, leading slash included (matches the URL path component) + * @param mode access intent for this request + * @return non-null credentials; {@code null} is a contract violation + */ + CometS3Credentials getCredentialsForPath(String bucket, String path, CometS3AccessMode mode) + throws Exception; +} diff --git a/common/src/main/java/org/apache/comet/cloud/s3/CometS3Credentials.java b/common/src/main/java/org/apache/comet/cloud/s3/CometS3Credentials.java new file mode 100644 index 0000000000..6c443bde52 --- /dev/null +++ b/common/src/main/java/org/apache/comet/cloud/s3/CometS3Credentials.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.cloud.s3; + +import java.util.Objects; + +/** + * Credentials returned by a {@link CometS3CredentialProvider}. 
/**
 * Immutable credential set handed back by a {@link CometS3CredentialProvider}. Native code reads
 * the fields over JNI by name, which makes the field names part of the cross-language contract.
 *
 * <p>{@code sessionToken} is null for non-STS credentials. {@code expirationEpochMillis} of {@code
 * 0} means "unknown"; the Iceberg path then caps opendal's cache at a short fallback to avoid
 * serving stale credentials for the executor lifetime.
 */
public final class CometS3Credentials {

  // Field names are looked up from native code; do not rename.
  private final String accessKeyId;
  private final String secretAccessKey;
  private final String sessionToken;
  private final long expirationEpochMillis;

  /**
   * @param accessKeyId access key id; must not be null
   * @param secretAccessKey secret access key; must not be null
   * @param sessionToken session token, or null for non-STS credentials
   * @param expirationEpochMillis absolute expiry in epoch milliseconds, or {@code 0} if unknown
   * @throws NullPointerException if {@code accessKeyId} or {@code secretAccessKey} is null
   */
  public CometS3Credentials(
      String accessKeyId, String secretAccessKey, String sessionToken, long expirationEpochMillis) {
    // Validate the two mandatory values first, in declaration order, then store everything.
    String checkedKeyId = Objects.requireNonNull(accessKeyId, "accessKeyId");
    String checkedSecret = Objects.requireNonNull(secretAccessKey, "secretAccessKey");
    this.accessKeyId = checkedKeyId;
    this.secretAccessKey = checkedSecret;
    this.sessionToken = sessionToken;
    this.expirationEpochMillis = expirationEpochMillis;
  }

  /** @return the access key id, never null */
  public String getAccessKeyId() {
    return this.accessKeyId;
  }

  /** @return the secret access key, never null */
  public String getSecretAccessKey() {
    return this.secretAccessKey;
  }

  /** @return the session token, or null for non-STS credentials */
  public String getSessionToken() {
    return this.sessionToken;
  }

  /** @return absolute expiry in epoch milliseconds; {@code 0} means unknown */
  public long getExpirationEpochMillis() {
    return this.expirationEpochMillis;
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.cloud.s3; + +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +public class CometS3CredentialDispatcherTest { + + private static final int READ = CometS3AccessMode.READ.ordinal(); + private static final int WRITE = CometS3AccessMode.WRITE.ordinal(); + + @Before + public void resetTestProvider() { + TestCometS3CredentialProvider.reset(); + } + + @Test + public void providerIsRegisteredFromTestClasspath() { + assertTrue(CometS3CredentialDispatcher.isProviderRegistered()); + } + + @Test + public void getCredentialsRoundTripsThroughProvider() throws Exception { + CometS3Credentials creds = + CometS3CredentialDispatcher.getCredentialsForPath("my-bucket", "path/to/object", READ); + + assertNotNull(creds); + assertEquals("AKIATEST", creds.getAccessKeyId()); + assertEquals("secret", creds.getSecretAccessKey()); + assertEquals("session-tok", creds.getSessionToken()); + assertEquals(0L, creds.getExpirationEpochMillis()); + + assertEquals(1, TestCometS3CredentialProvider.callCount.get()); + assertEquals("my-bucket", TestCometS3CredentialProvider.lastBucket); + assertEquals("path/to/object", TestCometS3CredentialProvider.lastPath); + assertEquals(CometS3AccessMode.READ, TestCometS3CredentialProvider.lastMode); + } + + @Test + public void 
writeModeIsForwarded() throws Exception { + CometS3CredentialDispatcher.getCredentialsForPath("b", "k", WRITE); + assertEquals(CometS3AccessMode.WRITE, TestCometS3CredentialProvider.lastMode); + } + + @Test + public void unknownModeRejected() { + assertThrows( + IllegalArgumentException.class, + () -> CometS3CredentialDispatcher.getCredentialsForPath("b", "k", 99)); + } + + @Test + public void providerExceptionsPropagate() { + IllegalStateException boom = new IllegalStateException("simulated STS failure"); + TestCometS3CredentialProvider.throwOnNext = boom; + + Exception thrown = + assertThrows( + Exception.class, + () -> CometS3CredentialDispatcher.getCredentialsForPath("b", "k", READ)); + assertSame(boom, thrown); + } + + @Test + public void nullSessionTokenAllowed() throws Exception { + TestCometS3CredentialProvider.nextResult = new CometS3Credentials("AKIA", "sec", null, 0L); + + CometS3Credentials creds = CometS3CredentialDispatcher.getCredentialsForPath("b", "k", READ); + + assertNull(creds.getSessionToken()); + } + + @Test + public void providerReceivesEachCallSeparately() throws Exception { + CometS3CredentialDispatcher.getCredentialsForPath("b1", "k1", READ); + CometS3CredentialDispatcher.getCredentialsForPath("b2", "k2", READ); + CometS3CredentialDispatcher.getCredentialsForPath("b3", "k3", READ); + + assertEquals(3, TestCometS3CredentialProvider.callCount.get()); + assertEquals("b3", TestCometS3CredentialProvider.lastBucket); + assertEquals("k3", TestCometS3CredentialProvider.lastPath); + } +} diff --git a/common/src/test/java/org/apache/comet/cloud/s3/TestCometS3CredentialProvider.java b/common/src/test/java/org/apache/comet/cloud/s3/TestCometS3CredentialProvider.java new file mode 100644 index 0000000000..f0429ebf1c --- /dev/null +++ b/common/src/test/java/org/apache/comet/cloud/s3/TestCometS3CredentialProvider.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.cloud.s3; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Test-only provider registered via {@code META-INF/services}. State is static because the + * dispatcher caches a single provider instance for the JVM lifetime. + */ +public class TestCometS3CredentialProvider implements CometS3CredentialProvider { + + static final AtomicInteger callCount = new AtomicInteger(0); + static volatile String lastBucket; + static volatile String lastPath; + static volatile CometS3AccessMode lastMode; + static volatile RuntimeException throwOnNext; + static volatile CometS3Credentials nextResult = + new CometS3Credentials("AKIATEST", "secret", "session-tok", 0L); + + static void reset() { + callCount.set(0); + lastBucket = null; + lastPath = null; + lastMode = null; + throwOnNext = null; + nextResult = new CometS3Credentials("AKIATEST", "secret", "session-tok", 0L); + } + + @Override + public CometS3Credentials getCredentialsForPath( + String bucket, String path, CometS3AccessMode mode) { + callCount.incrementAndGet(); + lastBucket = bucket; + lastPath = path; + lastMode = mode; + RuntimeException toThrow = throwOnNext; + if (toThrow != null) { + throwOnNext = null; + throw toThrow; + } + return nextResult; + } +} diff --git 
a/common/src/test/resources/META-INF/services/org.apache.comet.cloud.s3.CometS3CredentialProvider b/common/src/test/resources/META-INF/services/org.apache.comet.cloud.s3.CometS3CredentialProvider new file mode 100644 index 0000000000..cb6a1717e7 --- /dev/null +++ b/common/src/test/resources/META-INF/services/org.apache.comet.cloud.s3.CometS3CredentialProvider @@ -0,0 +1 @@ +org.apache.comet.cloud.s3.TestCometS3CredentialProvider diff --git a/dev/ci/check-suites.py b/dev/ci/check-suites.py index 279c6a89c9..b7369d1707 100644 --- a/dev/ci/check-suites.py +++ b/dev/ci/check-suites.py @@ -35,6 +35,7 @@ def file_to_class_name(path: Path) -> str | None: "org.apache.comet.parquet.ParquetReadSuite", # abstract "org.apache.comet.parquet.ParquetReadFromS3Suite", # manual test suite "org.apache.comet.IcebergReadFromS3Suite", # manual test suite + "org.apache.comet.cloud.s3.CometS3CredentialBridgeSuite", # manual test suite "org.apache.spark.sql.comet.CometPlanStabilitySuite", # abstract "org.apache.spark.sql.comet.ParquetDatetimeRebaseSuite", # abstract "org.apache.comet.exec.CometColumnarShuffleSuite" # abstract diff --git a/docs/source/user-guide/latest/cloud-credential-providers.md b/docs/source/user-guide/latest/cloud-credential-providers.md new file mode 100644 index 0000000000..7a39c84cfb --- /dev/null +++ b/docs/source/user-guide/latest/cloud-credential-providers.md @@ -0,0 +1,204 @@ + + +# Cloud Credential Providers + +Comet's native S3 readers normally fetch credentials from the standard AWS credential chain +(static keys, instance profiles, environment variables, etc.). Some clusters use a vendor-managed +mechanism instead, where credentials are issued per request based on a JWT or per S3 path. For +those clusters, Comet supports loading a vendor-supplied bridge JAR that routes every native +credential request through the vendor's Java code. + +## Do I need this? 
+ +You don't, if any of the following describe your cluster: + +- You use static AWS credentials (`fs.s3a.access.key` / `fs.s3a.secret.key`). +- You use EC2 instance profiles, EKS pod identities, ECS task roles, or environment variables. +- Your S3 access works in Spark today via the default AWS credential chain. + +You probably do, if any of these are true: + +- You have a Hadoop S3A custom signer configured (`fs.s3a.custom.signers=...`). +- You have a Spark `CloudCredentialsProvider` that issues a JWT for a vendor STS service. +- You have a custom Iceberg `client.factory` that injects a configured S3 client. +- Spark queries against your S3 paths work, but the same queries with Comet enabled fail with 403. + +## Enabling a bridge JAR + +Add the vendor-supplied bridge JAR to your Spark executor classpath: + +```sh +spark-submit --jars vendor-comet-bridge.jar ... +``` + +Or via `spark.jars`: + +``` +spark.jars=/path/to/vendor-comet-bridge.jar +``` + +Comet discovers the bridge through `META-INF/services` at executor startup. There are no Comet +config keys to set. + +OSS Comet ships no vendor-specific bridges. Get one from the same vendor that supplies your +Hadoop S3A signer or Iceberg client factory. If they do not yet provide one, send them to the +"Writing a bridge" section below. + +## Verification + +With the bridge on the classpath, executor logs show: + +- At startup: `Registered CometS3CredentialProvider: <provider class name>` +- On first S3 access (debug level): `Fetching credentials for bucket=... path=... mode=...` + +Without a bridge registered you get exactly one line at startup: + +``` +No CometS3CredentialProvider registered; native S3 readers will use the default AWS credential chain +``` + +## Troubleshooting + +**`Multiple CometS3CredentialProvider impls on classpath: [...]`** at startup. Remove all but one +bridge JAR. Comet does not chain providers; it fails fast to prevent silent ambiguity. 
+ +**`No CometS3CredentialProvider registered`** combined with `403 AccessDenied`. The bridge JAR is +not on the executor classpath. Re-check `--jars` / `spark.jars`. On YARN or Kubernetes, confirm +the JAR actually reached the executor and not only the driver. + +**Credentials silently going stale during long-running jobs.** The bridge caps opendal's +credential cache at 5 minutes when the vendor does not populate `expirationEpochMillis`. Ask the +vendor to return a real expiry; the 5-minute cap is a safety net, not a knob. + +## Iceberg: explicit S3 region required + +With the bridge registered, Comet wires a custom credential loader into `iceberg-storage-opendal`. +opendal's built-in S3 region auto-detection only runs when no custom loader is configured, so on +the bridge path the region (and endpoint for non-AWS) must be set explicitly on the Spark catalog: + +``` +spark.sql.catalog.<catalog>.s3.region = us-east-1 +spark.sql.catalog.<catalog>.s3.endpoint = https://... (non-AWS only) +spark.sql.catalog.<catalog>.s3.path-style-access = true (path-style endpoints only) +``` + +If you hit `region is missing. Please find it by S3::detect_region() or set them in env`, this +is the missing config. + +## Writing a bridge + +Comet's native scan paths (`object_store` for raw Parquet, `opendal` via `iceberg-rust` for +Iceberg) bypass Hadoop S3A entirely. The standard `AWSCredentialsProvider.getCredentials()` has no +path argument, so vendors that issue per-path STS credentials cannot expose them through it. The +`CometS3CredentialProvider` SPI fills that gap. + +Implement `org.apache.comet.cloud.s3.CometS3CredentialProvider`: + +```java +package org.apache.comet.cloud.s3; + +public interface CometS3CredentialProvider { + CometS3Credentials getCredentialsForPath( + String bucket, String path, CometS3AccessMode mode) throws Exception; +} +``` + +Register via `META-INF/services/org.apache.comet.cloud.s3.CometS3CredentialProvider` with the +fully-qualified class name. 
`getCredentialsForPath` may be invoked concurrently from many native +tokio worker threads; the implementation must be thread-safe. + +### Returned fields + +| Field | Notes | +| ----------------------- | ------------------------------------------------- | +| `accessKeyId` | Required. | +| `secretAccessKey` | Required. | +| `sessionToken` | `null` for non-STS credentials. | +| `expirationEpochMillis` | Absolute expiry. `0` means "unknown" (see below). | + +Provide a real `expirationEpochMillis` whenever you have one. The Iceberg path uses it to decide +when `opendal` must re-call the provider for a fresh credential. `0` is treated as unknown and the +Iceberg path defaults to a 5-minute refresh to bound staleness. + +### Returns or throws + +The SPI follows the same shape as the other JVM AWS-credential SPIs (AWS SDK v1/v2, +Hadoop S3A, Iceberg `VendedCredentialsProvider`): return credentials or throw. There is no +"fall-through" return value. Chaining is a vendor-side concern. + +If your provider is authoritative only for some paths, resolve the default AWS chain yourself for +the rest: + +```java +private final DefaultCredentialsProvider defaultChain = DefaultCredentialsProvider.create(); + +@Override +public CometS3Credentials getCredentialsForPath( + String bucket, String path, CometS3AccessMode mode) throws Exception { + if (handlesPath(bucket, path)) { + return mintFromMyVendorService(bucket, path, mode); + } + AwsCredentials c = defaultChain.resolveCredentials(); + String token = (c instanceof AwsSessionCredentials) + ? ((AwsSessionCredentials) c).sessionToken() + : null; + return new CometS3Credentials(c.accessKeyId(), c.secretAccessKey(), token, 0L); +} +``` + +### Access mode + +| Value | Used for | +| ------- | -------------------------------------------------------------------------- | +| `READ` | All native scan paths (raw Parquet, Iceberg). Comet today only sends READ. | +| `WRITE` | Reserved for future native write paths. 
| + +A `WRITE` credential is not implicitly read-capable. Vendors that need read-during-write +workflows include the required read permissions in the IAM policy attached to their `WRITE` +credentials. + +### Discovery rules + +- Zero impls registered: native readers use the default AWS credential chain. +- One impl registered: cached and used for every request. +- Multiple impls registered: `CometS3CredentialDispatcher` throws `IllegalStateException` at + class-load. Pick one bridge JAR. + +### Build setup + +Vendor implementations need the Comet SPI classes at compile time only. Use `provided`-scope: + +```xml +<dependency> + <groupId>org.apache.datafusion</groupId> + <artifactId>comet-common-spark${spark.version.short}_${scala.binary.version}</artifactId> + <version>${comet.version}</version> + <scope>provided</scope> +</dependency> +``` + +### Iceberg path: error message fidelity + +When the bridge is wired into `iceberg-rust`, the outer `reqsign-core::ProvideCredentialChain` +currently swallows thrown exceptions into "no credential" before the request reaches opendal. The +credential is still not issued and the request still fails; only the message is degraded to an +opaque anonymous-request failure. No Comet change fixes this; it is resolved upstream if +`iceberg-rust` stops wrapping custom loaders in its outer chain or moves its S3 backend to +`object_store`. diff --git a/docs/source/user-guide/latest/index.rst b/docs/source/user-guide/latest/index.rst index 314a0a51bd..bbacdbe0b8 100644 --- a/docs/source/user-guide/latest/index.rst +++ b/docs/source/user-guide/latest/index.rst @@ -49,4 +49,5 @@ to read more. 
Tuning Guide Metrics Guide Iceberg Guide + Cloud Credential Providers Kubernetes Guide diff --git a/native/Cargo.lock b/native/Cargo.lock index df3c3b03c0..7f2317de37 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -674,6 +674,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ec6fb3fe69024a75fa7e1bfb48aa6cf59706a101658ea01bfd33b2b248a038f" dependencies = [ "aws-lc-sys", + "untrusted 0.7.1", "zeroize", ] @@ -1994,7 +1995,7 @@ dependencies = [ "object_store", "object_store_opendal", "once_cell", - "opendal 0.56.0", + "opendal", "parking_lot", "parquet", "paste", @@ -2002,6 +2003,7 @@ dependencies = [ "procfs", "prost", "rand 0.10.1", + "reqsign-core", "reqwest 0.12.28", "serde_json", "tempfile", @@ -3514,7 +3516,6 @@ dependencies = [ "tokio", "tokio-rustls", "tower-service", - "webpki-roots", ] [[package]] @@ -3567,7 +3568,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.9.0" -source = "git+https://github.com/apache/iceberg-rust?rev=1ad4bfd#1ad4bfd39319508e79960d16dad1b1cdf965c5f4" +source = "git+https://github.com/apache/iceberg-rust?rev=83b4595#83b4595e2b5f522974d24d51c8ecbd09a093fa92" dependencies = [ "aes-gcm", "anyhow", @@ -3622,7 +3623,7 @@ dependencies = [ [[package]] name = "iceberg-storage-opendal" version = "0.9.0" -source = "git+https://github.com/apache/iceberg-rust?rev=1ad4bfd#1ad4bfd39319508e79960d16dad1b1cdf965c5f4" +source = "git+https://github.com/apache/iceberg-rust?rev=83b4595#83b4595e2b5f522974d24d51c8ecbd09a093fa92" dependencies = [ "anyhow", "async-trait", @@ -3630,9 +3631,9 @@ dependencies = [ "cfg-if", "futures", "iceberg", - "opendal 0.55.0", - "reqsign", - "reqwest 0.12.28", + "opendal", + "reqsign-aws-v4", + "reqsign-core", "serde", "typetag", "url", @@ -4028,17 +4029,20 @@ dependencies = [ [[package]] name = "jsonwebtoken" -version = "9.3.1" +version = "10.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"5a87cc7a48537badeae96744432de36f4be2b4a34a05a5ef32e9dd8a1c169dde" +checksum = "eba32bfb4ffdeaca3e34431072faf01745c9b26d25504aa7a6cf5684334fc4fc" dependencies = [ + "aws-lc-rs", "base64", + "getrandom 0.2.17", "js-sys", "pem", - "ring", "serde", "serde_json", + "signature", "simple_asn1", + "zeroize", ] [[package]] @@ -4569,7 +4573,7 @@ dependencies = [ "futures", "mea", "object_store", - "opendal 0.56.0", + "opendal", "pin-project", "tokio", ] @@ -4598,35 +4602,6 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" -[[package]] -name = "opendal" -version = "0.55.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d075ab8a203a6ab4bc1bce0a4b9fe486a72bf8b939037f4b78d95386384bc80a" -dependencies = [ - "anyhow", - "backon", - "base64", - "bytes", - "crc32c", - "futures", - "getrandom 0.2.17", - "http 1.4.0", - "http-body 1.0.1", - "jiff", - "log", - "md-5", - "percent-encoding", - "quick-xml 0.38.4", - "reqsign", - "reqwest 0.12.28", - "serde", - "serde_json", - "tokio", - "url", - "uuid", -] - [[package]] name = "opendal" version = "0.56.0" @@ -4639,7 +4614,12 @@ dependencies = [ "opendal-layer-logging", "opendal-layer-retry", "opendal-layer-timeout", + "opendal-service-azdls", + "opendal-service-fs", + "opendal-service-gcs", "opendal-service-hdfs", + "opendal-service-oss", + "opendal-service-s3", ] [[package]] @@ -4713,6 +4693,70 @@ dependencies = [ "tokio", ] +[[package]] +name = "opendal-service-azdls" +version = "0.56.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f9884c2d8cf8ba2bb077d79c877dac5863ba3bab9e2c9c1e41a2e0491404772" +dependencies = [ + "bytes", + "http 1.4.0", + "log", + "opendal-core", + "opendal-service-azure-common", + "quick-xml 0.38.4", + "reqsign-azure-storage", + "reqsign-core", + "reqsign-file-read-tokio", + "serde", + "serde_json", +] + +[[package]] +name = 
"opendal-service-azure-common" +version = "0.56.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffb0e45d6c8dcf66ce2da20e241bcb80e6e540e109a4ff20f318f6c9b4c54e0c" +dependencies = [ + "http 1.4.0", + "opendal-core", +] + +[[package]] +name = "opendal-service-fs" +version = "0.56.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf0be0417abeeb0053376d816b90fceb9ca98f20dfb54ebf1f2a282729f83663" +dependencies = [ + "bytes", + "log", + "opendal-core", + "serde", + "tokio", + "xattr", +] + +[[package]] +name = "opendal-service-gcs" +version = "0.56.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70a49477a10163431896d106136117f5670717f9c9e49cf6f710528800c6633a" +dependencies = [ + "async-trait", + "bytes", + "http 1.4.0", + "log", + "opendal-core", + "percent-encoding", + "quick-xml 0.38.4", + "reqsign-core", + "reqsign-file-read-tokio", + "reqsign-google", + "serde", + "serde_json", + "tokio", +] + [[package]] name = "opendal-service-hdfs" version = "0.56.0" @@ -4728,6 +4772,44 @@ dependencies = [ "tokio", ] +[[package]] +name = "opendal-service-oss" +version = "0.56.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29c8a917829ad06d21b639558532cb0101fe49b040d946d673a73018683fac05" +dependencies = [ + "bytes", + "http 1.4.0", + "log", + "opendal-core", + "quick-xml 0.38.4", + "reqsign-aliyun-oss", + "reqsign-core", + "reqsign-file-read-tokio", + "serde", +] + +[[package]] +name = "opendal-service-s3" +version = "0.56.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dadddeb9bb50b0d30927dd914c298c4ddca47e4c1cfa7674d311f0cf9b051c8" +dependencies = [ + "base64", + "bytes", + "crc32c", + "http 1.4.0", + "log", + "md-5", + "opendal-core", + "quick-xml 0.38.4", + "reqsign-aws-v4", + "reqsign-core", + "reqsign-file-read-tokio", + "serde", + "url", +] + [[package]] name = "openssl-probe" version = "0.2.1" @@ -5270,16 
+5352,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "quick-xml" -version = "0.37.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "331e97a1af0bf59823e6eadffe373d7b27f485be8748f71471c662c1f269b7fb" -dependencies = [ - "memchr", - "serde", -] - [[package]] name = "quick-xml" version = "0.38.4" @@ -5383,7 +5455,6 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ca0ecfa931c29007047d1bc58e623ab12e5590e8c7cc53200d5202b69266d8a" dependencies = [ - "libc", "rand_chacha 0.3.1", "rand_core 0.6.4", ] @@ -5538,35 +5609,63 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" [[package]] -name = "reqsign" -version = "0.16.5" +name = "reqsign-aliyun-oss" +version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43451dbf3590a7590684c25fb8d12ecdcc90ed3ac123433e500447c7d77ed701" +checksum = "57ac2757f3140aa2e213b554148ae0b52733e624fc6723f0cc6bb3d440176c95" +dependencies = [ + "anyhow", + "form_urlencoded", + "http 1.4.0", + "log", + "percent-encoding", + "reqsign-core", + "rust-ini", + "serde", + "serde_json", +] + +[[package]] +name = "reqsign-aws-v4" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44eaca382e94505a49f1a4849658d153aebf79d9c1a58e5dd3b10361511e9f43" +dependencies = [ + "anyhow", + "bytes", + "form_urlencoded", + "http 1.4.0", + "log", + "percent-encoding", + "quick-xml 0.39.2", + "reqsign-core", + "rust-ini", + "serde", + "serde_json", + "serde_urlencoded", + "sha1", +] + +[[package]] +name = "reqsign-azure-storage" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a321980405d596bd34aaf95c4722a3de4128a67fd19e74a81a83aa3fdf082e6" dependencies = [ "anyhow", - "async-trait", "base64", - "chrono", + "bytes", "form_urlencoded", - "getrandom 0.2.17", - 
"hex", - "hmac 0.12.1", - "home", "http 1.4.0", "jsonwebtoken", "log", - "once_cell", + "pem", "percent-encoding", - "quick-xml 0.37.5", - "rand 0.8.6", - "reqwest 0.12.28", + "reqsign-core", "rsa", - "rust-ini", "serde", "serde_json", "sha1", - "sha2 0.10.9", - "tokio", ] [[package]] @@ -5591,6 +5690,37 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "reqsign-file-read-tokio" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2d89295b3d17abea31851cc8de55d843d89c52132c864963c38d41920613dc5" +dependencies = [ + "anyhow", + "reqsign-core", + "tokio", +] + +[[package]] +name = "reqsign-google" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35cc609b49c69e76ecaceb775a03f792d1ed3e7755ab3548d4534fd801e3242e" +dependencies = [ + "form_urlencoded", + "http 1.4.0", + "jsonwebtoken", + "log", + "percent-encoding", + "reqsign-aws-v4", + "reqsign-core", + "rsa", + "serde", + "serde_json", + "sha2 0.10.9", + "tokio", +] + [[package]] name = "reqwest" version = "0.12.28" @@ -5631,7 +5761,6 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams 0.4.2", "web-sys", - "webpki-roots", ] [[package]] @@ -5691,7 +5820,7 @@ dependencies = [ "cfg-if", "getrandom 0.2.17", "libc", - "untrusted", + "untrusted 0.9.0", "windows-sys 0.52.0", ] @@ -5862,7 +5991,7 @@ dependencies = [ "aws-lc-rs", "ring", "rustls-pki-types", - "untrusted", + "untrusted 0.9.0", ] [[package]] @@ -6857,6 +6986,12 @@ version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "untrusted" version = "0.9.0" @@ -7110,15 +7245,6 @@ dependencies = [ "rustls-pki-types", ] -[[package]] -name = 
"webpki-roots" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52f5ee44c96cf55f1b349600768e3ece3a8f26010c05265ab73f945bb1a2eb9d" -dependencies = [ - "rustls-pki-types", -] - [[package]] name = "which" version = "4.4.2" @@ -7546,6 +7672,16 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ffae5123b2d3fc086436f8834ae3ab053a283cfac8fe0a0b8eaae044768a4c4" +[[package]] +name = "xattr" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32e45ad4206f6d2479085147f02bc2ef834ac85886624a23575ae137c8aa8156" +dependencies = [ + "libc", + "rustix 1.1.4", +] + [[package]] name = "xmlparser" version = "0.13.6" @@ -7621,6 +7757,20 @@ name = "zeroize" version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0" +dependencies = [ + "zeroize_derive", +] + +[[package]] +name = "zeroize_derive" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85a5b4158499876c763cb03bc4e49185d3cccbabb15b33c627f7884f43db852e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] [[package]] name = "zerotrie" diff --git a/native/Cargo.toml b/native/Cargo.toml index d1b5c74af9..c6e07f25e2 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -58,8 +58,9 @@ object_store = { version = "0.13.1", features = ["gcp", "azure", "aws", "http"] url = "2.2" aws-config = "1.8.16" aws-credential-types = "1.2.13" -iceberg = { git = "https://github.com/apache/iceberg-rust", rev = "1ad4bfd" } -iceberg-storage-opendal = { git = "https://github.com/apache/iceberg-rust", rev = "1ad4bfd", features = ["opendal-all"] } +iceberg = { git = "https://github.com/apache/iceberg-rust", rev = "83b4595" } +iceberg-storage-opendal = { git = "https://github.com/apache/iceberg-rust", rev = "83b4595", features = ["opendal-all"] } 
+reqsign-core = "3" [profile.release] debug = true diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 4fb3ed4c5d..1c8c49d09e 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -75,6 +75,7 @@ hdfs-sys = {version = "0.3", optional = true, features = ["hdfs_3_3"]} opendal = { version = "0.56.0", optional = true, features = ["services-hdfs"] } iceberg = { workspace = true } iceberg-storage-opendal = { workspace = true } +reqsign-core = { workspace = true } serde_json = "1.0" uuid = "1.23.0" diff --git a/native/core/src/execution/operators/iceberg_scan.rs b/native/core/src/execution/operators/iceberg_scan.rs index 55bcbef349..217a28a699 100644 --- a/native/core/src/execution/operators/iceberg_scan.rs +++ b/native/core/src/execution/operators/iceberg_scan.rs @@ -40,9 +40,13 @@ use datafusion::physical_plan::{ use futures::{Stream, StreamExt, TryStreamExt}; use iceberg::arrow::ScanMetrics; use iceberg::io::{FileIO, FileIOBuilder, StorageFactory}; +use iceberg_storage_opendal::CustomAwsCredentialLoader; use iceberg_storage_opendal::OpenDalStorageFactory; use crate::execution::operators::ExecutionError; +use crate::parquet::objectstore::comet_s3_credential_bridge::{ + AccessMode, CometS3CredentialBridge, +}; use crate::parquet::parquet_support::SparkParquetOptions; use crate::parquet::schema_adapter::SparkPhysicalExprAdapterFactory; use datafusion_comet_spark_expr::EvalMode; @@ -207,9 +211,12 @@ impl IcebergScanExec { }; match scheme { "file" => Ok(Arc::new(OpenDalStorageFactory::Fs)), - "s3" | "s3a" => Ok(Arc::new(OpenDalStorageFactory::S3 { - customized_credential_load: None, - })), + "s3" | "s3a" => { + let customized_credential_load = build_s3_credential_loader(path); + Ok(Arc::new(OpenDalStorageFactory::S3 { + customized_credential_load, + })) + } "gs" => Ok(Arc::new(OpenDalStorageFactory::Gcs)), "oss" => Ok(Arc::new(OpenDalStorageFactory::Oss)), _ => Err(DataFusionError::Execution(format!( @@ -233,6 +240,14 @@ impl IcebergScanExec { } 
} +/// Wires the registered Comet credential provider into opendal's S3 service for this scan, or +/// returns `None` so opendal falls back to its default credential chain. +fn build_s3_credential_loader(metadata_location: &str) -> Option { + let url = url::Url::parse(metadata_location).ok()?; + let bridge = CometS3CredentialBridge::for_url(&url, AccessMode::Read)?; + Some(CustomAwsCredentialLoader::new(bridge)) +} + /// Metrics for IcebergScanExec struct IcebergScanMetrics { /// Baseline metrics (output rows, elapsed compute time) @@ -397,7 +412,7 @@ fn adapt_batch_with_expressions( return Ok(batch); } - // Zero-column projection (e.g. SELECT count(*)) — preserve row count + // Zero-column projection (e.g. SELECT count(*)), preserve row count if projection_exprs.is_empty() { return Ok(RecordBatch::try_new_with_options( Arc::clone(target_schema), diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs index 5de14aa610..06b7190fb0 100644 --- a/native/core/src/parquet/mod.rs +++ b/native/core/src/parquet/mod.rs @@ -29,7 +29,7 @@ pub mod read; pub mod schema_adapter; mod cast_column; -mod objectstore; +pub(crate) mod objectstore; use std::collections::HashMap; use std::task::Poll; diff --git a/native/core/src/parquet/objectstore/comet_s3_credential_bridge.rs b/native/core/src/parquet/objectstore/comet_s3_credential_bridge.rs new file mode 100644 index 0000000000..f9f7dcdf1b --- /dev/null +++ b/native/core/src/parquet/objectstore/comet_s3_credential_bridge.rs @@ -0,0 +1,290 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! JNI bridge to the `CometS3CredentialDispatcher` SPI, exposed as +//! `object_store::CredentialProvider` and `reqsign_core::ProvideCredential` for the raw Parquet +//! and Iceberg scan paths respectively. +//! +//! ```text +//! JVM Native (Rust) +//! --- ------------- +//! +//! ServiceLoader s3.rs (object_store) +//! (one-time, at class-load) iceberg_scan.rs (opendal) +//! | | +//! v v +//! CometS3CredentialDispatcher CometS3CredentialBridge +//! (static singleton) impl object_store::CredentialProvider +//! | ^ impl reqsign_core::ProvideCredential +//! | | | +//! | +<---- JNI call -----------------+ +//! | getCredentialsForPath(bucket, path, mode ordinal) +//! v +//! vendor CometS3CredentialProvider +//! | +//! v +//! CometS3Credentials POJO +//! | +//! +------- JNI field reads ---------------->+ +//! | +//! v +//! AwsCredential / IcebergAwsCredential +//! (used to sign S3 requests) +//! 
``` + +use crate::execution::operators::ExecutionError; +use crate::jvm_bridge::{jni_static_call, JVMClasses}; +use crate::JAVA_VM; +use async_trait::async_trait; +use iceberg_storage_opendal::AwsCredential as IcebergAwsCredential; +use jni::objects::{JFieldID, JObject, JString}; +use jni::signature::{Primitive, ReturnType}; +use jni::sys::{jboolean, jint}; +use log::{debug, warn}; +use object_store::aws::AwsCredential; +use object_store::CredentialProvider; +use once_cell::sync::OnceCell; +use reqsign_core::time::Timestamp; +use reqsign_core::{ + Context, Error as ReqsignError, ErrorKind as ReqsignErrorKind, + ProvideCredential as IcebergProvideCredential, +}; +use std::sync::Arc; +use std::time::Duration; +use url::Url; + +/// Cap on opendal's credential cache when the provider does not report an expiry. Prevents the +/// executor from holding a stale credential for the entire job lifetime. +const DEFAULT_EXPIRY_WHEN_UNKNOWN: Duration = Duration::from_secs(300); + +static PROVIDER_REGISTERED: OnceCell = OnceCell::new(); +/// Once-per-process latch for the "missing expiry" warning; bridges are per-scan so a per-bridge +/// latch would log once per scan on the same misbehaving provider. +static WARNED_MISSING_EXPIRY: OnceCell<()> = OnceCell::new(); + +/// Access intent forwarded to the Java SPI. Ordinal must match the JVM `CometS3AccessMode` enum. +#[derive(Debug, Clone, Copy)] +pub enum AccessMode { + Read = 0, + /// No native write path yet; kept so the SPI contract is complete. + #[allow(dead_code)] + Write = 1, +} + +/// True iff a `CometS3CredentialProvider` was discovered on the JVM classpath. +pub fn is_provider_registered() -> bool { + *PROVIDER_REGISTERED.get_or_init(|| { + // Unit tests construct stores without a JVM. Production init always precedes any store + // construction, so the None branch only fires in tests. 
+ if JAVA_VM.get().is_none() { + return false; + } + JVMClasses::with_env(|env| -> Result { + let registered: jboolean = unsafe { + jni_static_call!(env, + comet_s3_credential_dispatcher.is_provider_registered() -> jboolean + )? + }; + Ok(registered) + }) + .unwrap_or_else(|e| { + debug!( + "CometS3CredentialDispatcher.isProviderRegistered failed; native S3 readers \ + will use the default AWS credential chain: {e}" + ); + false + }) + }) +} + +/// Per-request credential provider that delegates to the Java SPI via JNI. Constructed once per +/// S3 store or FileIO and forwards the same `(bucket, path, mode)` tuple on every fetch. +#[derive(Debug)] +pub struct CometS3CredentialBridge { + bucket: String, + path: String, + mode: AccessMode, +} + +impl CometS3CredentialBridge { + pub fn new(bucket: impl Into, path: impl Into, mode: AccessMode) -> Self { + Self { + bucket: bucket.into(), + path: path.into(), + mode, + } + } + + /// Shared constructor for the s3.rs and iceberg_scan.rs call sites. Returns `None` when no + /// provider is registered so callers can fall through to their default credential path. + pub fn for_url(url: &Url, mode: AccessMode) -> Option { + if !is_provider_registered() { + return None; + } + let bucket = url.host_str()?.to_string(); + debug!("Routing S3 credentials for bucket {bucket} through CometS3CredentialProvider"); + Some(Self::new(bucket, url.path(), mode)) + } + + fn fetch_raw(&self) -> Result { + JVMClasses::with_env(|env| -> Result { + let bucket = env + .new_string(&self.bucket) + .map_err(|e| ExecutionError::GeneralError(format!("new_string(bucket): {e}")))?; + let path = env + .new_string(&self.path) + .map_err(|e| ExecutionError::GeneralError(format!("new_string(path): {e}")))?; + let mode = self.mode as jint; + + let creds_obj: JObject = unsafe { + jni_static_call!(env, + comet_s3_credential_dispatcher.get_credentials_for_path( + &bucket, &path, mode + ) -> JObject + )? 
+ }; + if creds_obj.is_null() { + return Err(ExecutionError::GeneralError( + "getCredentialsForPath returned null (contract violation)".to_string(), + )); + } + + let d = &JVMClasses::get().comet_s3_credential_dispatcher; + Ok(RawCredentials { + access_key_id: read_required_string( + env, + &creds_obj, + d.field_access_key_id, + "accessKeyId", + )?, + secret_access_key: read_required_string( + env, + &creds_obj, + d.field_secret_access_key, + "secretAccessKey", + )?, + session_token: read_optional_string(env, &creds_obj, d.field_session_token)?, + expiration_epoch_millis: unsafe { + env.get_field_unchecked( + &creds_obj, + d.field_expiration_epoch_millis, + ReturnType::Primitive(Primitive::Long), + ) + } + .and_then(|v| v.j()) + .map_err(|e| { + ExecutionError::GeneralError(format!("read expirationEpochMillis: {e}")) + })?, + }) + }) + } +} + +struct RawCredentials { + access_key_id: String, + secret_access_key: String, + session_token: Option, + /// Absolute expiry. `0` means the provider did not report one. 
+ expiration_epoch_millis: i64, +} + +#[async_trait] +impl CredentialProvider for CometS3CredentialBridge { + type Credential = AwsCredential; + + async fn get_credential(&self) -> object_store::Result> { + let raw = self.fetch_raw().map_err(|e| object_store::Error::Generic { + store: "S3", + source: e.to_string().into(), + })?; + Ok(Arc::new(AwsCredential { + key_id: raw.access_key_id, + secret_key: raw.secret_access_key, + token: raw.session_token, + })) + } +} + +impl IcebergProvideCredential for CometS3CredentialBridge { + type Credential = IcebergAwsCredential; + + async fn provide_credential( + &self, + _ctx: &Context, + ) -> reqsign_core::Result> { + let raw = self + .fetch_raw() + .map_err(|e| ReqsignError::new(ReqsignErrorKind::CredentialInvalid, e.to_string()))?; + + let expires_in = if raw.expiration_epoch_millis > 0 { + Some( + Timestamp::from_millisecond(raw.expiration_epoch_millis).map_err(|e| { + ReqsignError::new( + ReqsignErrorKind::CredentialInvalid, + format!( + "Invalid expirationEpochMillis {}: {e}", + raw.expiration_epoch_millis + ), + ) + })?, + ) + } else { + if WARNED_MISSING_EXPIRY.set(()).is_ok() { + warn!( + "CometS3CredentialProvider returned credentials without expiration; \ + defaulting to {}s expiry to bound opendal caching", + DEFAULT_EXPIRY_WHEN_UNKNOWN.as_secs() + ); + } + Some(Timestamp::now() + DEFAULT_EXPIRY_WHEN_UNKNOWN) + }; + + Ok(Some(IcebergAwsCredential { + access_key_id: raw.access_key_id, + secret_access_key: raw.secret_access_key, + session_token: raw.session_token, + expires_in, + })) + } +} + +fn read_required_string( + env: &mut jni::Env, + instance: &JObject, + field: JFieldID, + name: &str, +) -> Result { + read_optional_string(env, instance, field)? 
+ .ok_or_else(|| ExecutionError::GeneralError(format!("{name} was null"))) +} + +fn read_optional_string( + env: &mut jni::Env, + instance: &JObject, + field: JFieldID, +) -> Result, ExecutionError> { + let value = unsafe { env.get_field_unchecked(instance, field, ReturnType::Object) } + .and_then(|v| v.l()) + .map_err(|e| ExecutionError::GeneralError(format!("get_field_unchecked: {e}")))?; + if value.is_null() { + return Ok(None); + } + let jstr = unsafe { JString::from_raw(env, value.into_raw()) }; + jstr.try_to_string(env) + .map(Some) + .map_err(|e| ExecutionError::GeneralError(format!("try_to_string: {e}"))) +} diff --git a/native/core/src/parquet/objectstore/mod.rs b/native/core/src/parquet/objectstore/mod.rs index bedae08f69..a12835d5cf 100644 --- a/native/core/src/parquet/objectstore/mod.rs +++ b/native/core/src/parquet/objectstore/mod.rs @@ -15,4 +15,5 @@ // specific language governing permissions and limitations // under the License. +pub mod comet_s3_credential_bridge; pub mod s3; diff --git a/native/core/src/parquet/objectstore/s3.rs b/native/core/src/parquet/objectstore/s3.rs index a427ad8ad5..3e90ad6391 100644 --- a/native/core/src/parquet/objectstore/s3.rs +++ b/native/core/src/parquet/objectstore/s3.rs @@ -21,6 +21,9 @@ use std::sync::OnceLock; use url::Url; use crate::execution::jni_api::get_runtime; +use crate::parquet::objectstore::comet_s3_credential_bridge::{ + AccessMode, CometS3CredentialBridge, +}; use async_trait::async_trait; use aws_config::{ ecs::EcsCredentialsProvider, environment::EnvironmentVariableCredentialsProvider, @@ -78,11 +81,13 @@ pub fn create_store( source: "Missing bucket name in S3 URL".into(), })?; - let credential_provider = - get_runtime().block_on(build_credential_provider(configs, bucket, min_ttl))?; - builder = match credential_provider { - Some(provider) => builder.with_credentials(Arc::new(provider)), - None => builder.with_skip_signature(true), + builder = if let Some(bridge) = 
CometS3CredentialBridge::for_url(url, AccessMode::Read) { + builder.with_credentials(Arc::new(bridge)) + } else { + match get_runtime().block_on(build_credential_provider(configs, bucket, min_ttl))? { + Some(provider) => builder.with_credentials(Arc::new(provider)), + None => builder.with_skip_signature(true), + } }; let s3_configs = extract_s3_config_options(configs, bucket); diff --git a/native/jni-bridge/src/comet_s3_credential_dispatcher.rs b/native/jni-bridge/src/comet_s3_credential_dispatcher.rs new file mode 100644 index 0000000000..7e3a606692 --- /dev/null +++ b/native/jni-bridge/src/comet_s3_credential_dispatcher.rs @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use jni::{ + errors::Result as JniResult, + objects::{JClass, JFieldID, JStaticMethodID}, + signature::{Primitive, ReturnType}, + strings::JNIString, + Env, +}; + +/// JNI handles for the JVM `CometS3CredentialDispatcher` SPI plus the `CometS3Credentials` POJO +/// whose fields the native bridge reads back. +pub struct CometS3CredentialDispatcher<'a> { + pub class: JClass<'a>, + /// Retained so the cached POJO `JFieldID`s stay alive for the executor lifetime. 
+ #[allow(dead_code)] + pub credentials_class: JClass<'a>, + pub method_is_provider_registered: JStaticMethodID, + pub method_is_provider_registered_ret: ReturnType, + pub method_get_credentials_for_path: JStaticMethodID, + pub method_get_credentials_for_path_ret: ReturnType, + pub field_access_key_id: JFieldID, + pub field_secret_access_key: JFieldID, + pub field_session_token: JFieldID, + pub field_expiration_epoch_millis: JFieldID, +} + +impl<'a> CometS3CredentialDispatcher<'a> { + pub const JVM_CLASS: &'static str = "org/apache/comet/cloud/s3/CometS3CredentialDispatcher"; + pub const CREDENTIALS_CLASS: &'static str = "org/apache/comet/cloud/s3/CometS3Credentials"; + + pub fn new(env: &mut Env<'a>) -> JniResult> { + let class = env.find_class(JNIString::new(Self::JVM_CLASS))?; + let credentials_class = env.find_class(JNIString::new(Self::CREDENTIALS_CLASS))?; + + Ok(CometS3CredentialDispatcher { + method_is_provider_registered: env.get_static_method_id( + JNIString::new(Self::JVM_CLASS), + jni::jni_str!("isProviderRegistered"), + jni::jni_sig!("()Z"), + )?, + method_is_provider_registered_ret: ReturnType::Primitive(Primitive::Boolean), + method_get_credentials_for_path: env.get_static_method_id( + JNIString::new(Self::JVM_CLASS), + jni::jni_str!("getCredentialsForPath"), + jni::jni_sig!( + "(Ljava/lang/String;Ljava/lang/String;I)Lorg/apache/comet/cloud/s3/CometS3Credentials;" + ), + )?, + method_get_credentials_for_path_ret: ReturnType::Object, + field_access_key_id: env.get_field_id( + &credentials_class, + jni::jni_str!("accessKeyId"), + jni::jni_sig!("Ljava/lang/String;"), + )?, + field_secret_access_key: env.get_field_id( + &credentials_class, + jni::jni_str!("secretAccessKey"), + jni::jni_sig!("Ljava/lang/String;"), + )?, + field_session_token: env.get_field_id( + &credentials_class, + jni::jni_str!("sessionToken"), + jni::jni_sig!("Ljava/lang/String;"), + )?, + field_expiration_epoch_millis: env.get_field_id( + &credentials_class, + 
jni::jni_str!("expirationEpochMillis"), + jni::jni_sig!("J"), + )?, + class, + credentials_class, + }) + } +} diff --git a/native/jni-bridge/src/lib.rs b/native/jni-bridge/src/lib.rs index d72323c961..5420cc8157 100644 --- a/native/jni-bridge/src/lib.rs +++ b/native/jni-bridge/src/lib.rs @@ -191,12 +191,14 @@ mod comet_exec; pub use comet_exec::*; mod batch_iterator; mod comet_metric_node; +mod comet_s3_credential_dispatcher; mod comet_task_memory_manager; mod comet_udf_bridge; mod shuffle_block_iterator; use batch_iterator::CometBatchIterator; pub use comet_metric_node::*; +pub use comet_s3_credential_dispatcher::CometS3CredentialDispatcher; pub use comet_task_memory_manager::*; use comet_udf_bridge::CometUdfBridge; use shuffle_block_iterator::CometShuffleBlockIterator; @@ -233,6 +235,10 @@ pub struct JVMClasses<'a> { /// The CometUdfBridge class used to dispatch JVM scalar UDFs. /// `None` if the class is not on the classpath. pub comet_udf_bridge: Option>, + /// JNI handles for the CometS3CredentialDispatcher SPI and the CometS3Credentials POJO. + /// Always present (the classes ship in `comet-common`); whether a vendor provider is actually + /// registered is a separate runtime check. + pub comet_s3_credential_dispatcher: CometS3CredentialDispatcher<'a>, } unsafe impl Send for JVMClasses<'_> {} @@ -310,6 +316,7 @@ impl JVMClasses<'_> { } bridge }, + comet_s3_credential_dispatcher: CometS3CredentialDispatcher::new(env).unwrap(), } }); } diff --git a/pom.xml b/pom.xml index 7419fecc92..dbd439717f 100644 --- a/pom.xml +++ b/pom.xml @@ -83,7 +83,7 @@ under the License. 4.13.6 2.0.17 33.2.1-jre - 1.21.0 + 1.21.4 2.31.51 ${project.basedir}/../native/target/debug darwin @@ -476,6 +476,23 @@ under the License. ${testcontainers.version} test + + + com.github.docker-java + docker-java-api + 3.7.1 + test + + + com.github.docker-java + docker-java-transport-zerodep + 3.7.1 + test +