diff --git a/common/src/main/java/org/apache/comet/cloud/s3/CometS3AccessMode.java b/common/src/main/java/org/apache/comet/cloud/s3/CometS3AccessMode.java
new file mode 100644
index 0000000000..f6ef294141
--- /dev/null
+++ b/common/src/main/java/org/apache/comet/cloud/s3/CometS3AccessMode.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.cloud.s3;
+
+/** Access intent passed to {@link CometS3CredentialProvider#getCredentialsForPath}. */
+public enum CometS3AccessMode {
+ /** GET / HEAD / LIST. All Comet native scan paths request this today. */
+ READ,
+ /** PUT / POST / DELETE / multipart. Reserved for future native write paths. */
+ WRITE
+}
diff --git a/common/src/main/java/org/apache/comet/cloud/s3/CometS3CredentialDispatcher.java b/common/src/main/java/org/apache/comet/cloud/s3/CometS3CredentialDispatcher.java
new file mode 100644
index 0000000000..e54e9fa026
--- /dev/null
+++ b/common/src/main/java/org/apache/comet/cloud/s3/CometS3CredentialDispatcher.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.cloud.s3;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JNI entry point invoked from native code to resolve {@link CometS3CredentialProvider}.
+ *
+ *
The provider is resolved once via {@link ServiceLoader} and cached in a {@code static final}
+ * field. A query falling back from Comet to Spark mid-execution therefore sees identical
+ * credentials, since both paths resolve from the same executor classpath.
+ *
+ *
Multiple registered impls fail fast at class-load; chaining is a vendor-side concern.
+ */
+public final class CometS3CredentialDispatcher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CometS3CredentialDispatcher.class);
+
+ private static final CometS3CredentialProvider PROVIDER = resolve();
+ private static final CometS3AccessMode[] MODES = CometS3AccessMode.values();
+
+ private CometS3CredentialDispatcher() {}
+
+ public static boolean isProviderRegistered() {
+ return PROVIDER != null;
+ }
+
+ /** Invoked by native code. {@code mode} is the {@link CometS3AccessMode} ordinal. */
+ public static CometS3Credentials getCredentialsForPath(String bucket, String path, int mode)
+ throws Exception {
+ if (PROVIDER == null) {
+ throw new IllegalStateException(
+ "No CometS3CredentialProvider registered; check META-INF/services on the classpath");
+ }
+ if (mode < 0 || mode >= MODES.length) {
+ throw new IllegalArgumentException("Invalid CometS3AccessMode ordinal: " + mode);
+ }
+ CometS3AccessMode accessMode = MODES[mode];
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Fetching credentials for bucket={} path={} mode={}", bucket, path, accessMode);
+ }
+ return PROVIDER.getCredentialsForPath(bucket, path, accessMode);
+ }
+
+ private static CometS3CredentialProvider resolve() {
+ List impls = new ArrayList<>();
+ for (CometS3CredentialProvider impl : ServiceLoader.load(CometS3CredentialProvider.class)) {
+ impls.add(impl);
+ }
+ if (impls.isEmpty()) {
+ LOG.info(
+ "No CometS3CredentialProvider registered; native S3 readers will use the default "
+ + "AWS credential chain");
+ return null;
+ }
+ if (impls.size() > 1) {
+ List names =
+ impls.stream().map(p -> p.getClass().getName()).collect(Collectors.toList());
+ throw new IllegalStateException(
+ "Multiple CometS3CredentialProvider impls on classpath: " + names);
+ }
+ CometS3CredentialProvider provider = impls.get(0);
+ LOG.info("Registered CometS3CredentialProvider: {}", provider.getClass().getName());
+ return provider;
+ }
+}
diff --git a/common/src/main/java/org/apache/comet/cloud/s3/CometS3CredentialProvider.java b/common/src/main/java/org/apache/comet/cloud/s3/CometS3CredentialProvider.java
new file mode 100644
index 0000000000..677645a3e7
--- /dev/null
+++ b/common/src/main/java/org/apache/comet/cloud/s3/CometS3CredentialProvider.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.cloud.s3;
+
+/**
+ * SPI for supplying AWS credentials to Comet's native S3 readers, which bypass Spark's Hadoop S3A
+ * code path and cannot reach signer-based or path-aware credential mechanisms through the standard
+ * parameterless {@code AWSCredentialsProvider.getCredentials()} contract.
+ *
+ *
Peer to {@code org.apache.hadoop.fs.s3a.AwsSignerInitializer} (Hadoop S3A) and {@code
+ * org.apache.iceberg.aws.AwsClientFactory} (Iceberg-Java): the same shape vendors already implement
+ * for those two, with a smaller surface (one method).
+ *
+ *
Why a new SPI?
+ *
+ * No existing contract carries per-path AWS credentials from vendor code to Comet's native readers:
+ *
+ *
+ *
{@code org.apache.spark.deploy.security.cloud.CloudCredentialsProvider} yields a single JWT
+ * per service name. No path argument and does not return AWS credentials.
+ *
Hadoop S3A custom signers hide path-aware logic inside {@code Signer.sign(request,
+ * credentials)}. Credentials never leave the signing pipeline, and the underlying secret key
+ * is an HMAC key (not present in the signed output), so running the signer on a synthesized
+ * request cannot recover it.
+ *
Reflecting into vendor singletons encodes per-vendor class and lifecycle details in Comet
+ * and breaks silently on vendor upgrades.
+ *
A Comet-specific HTTP STS endpoint would push a new public API onto every vendor; vendors
+ * ship this logic as Java code, not HTTP.
+ *
+ *
+ *
Vendors register an implementation via {@code
+ * META-INF/services/org.apache.comet.cloud.s3.CometS3CredentialProvider}. {@link
+ * #getCredentialsForPath} may be invoked concurrently from many native tokio tasks, so
+ * implementations must be thread-safe.
+ *
+ *
Returns credentials or throws; there is no fall-through return value. A provider that is only
+ * authoritative for some paths should resolve the default AWS chain itself for the rest. See the
+ * user guide on cloud credential providers.
+ */
+public interface CometS3CredentialProvider {
+
+ /**
+ * @param bucket S3 bucket name (no scheme, no path)
+ * @param path object key or prefix, leading slash included (matches the URL path component)
+ * @param mode access intent for this request
+ * @return non-null credentials; {@code null} is a contract violation
+ */
+ CometS3Credentials getCredentialsForPath(String bucket, String path, CometS3AccessMode mode)
+ throws Exception;
+}
diff --git a/common/src/main/java/org/apache/comet/cloud/s3/CometS3Credentials.java b/common/src/main/java/org/apache/comet/cloud/s3/CometS3Credentials.java
new file mode 100644
index 0000000000..6c443bde52
--- /dev/null
+++ b/common/src/main/java/org/apache/comet/cloud/s3/CometS3Credentials.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.cloud.s3;
+
+import java.util.Objects;
+
+/**
+ * Credentials returned by a {@link CometS3CredentialProvider}. Fields are read back over JNI by
+ * name, so the field names are part of the cross-language contract.
+ *
+ *
{@code sessionToken} is null for non-STS credentials. {@code expirationEpochMillis} of {@code
+ * 0} means "unknown"; the Iceberg path then caps opendal's cache at a short fallback to avoid
+ * serving stale credentials for the executor lifetime.
+ */
+public final class CometS3Credentials {
+
+ private final String accessKeyId;
+ private final String secretAccessKey;
+ private final String sessionToken;
+ private final long expirationEpochMillis;
+
+ public CometS3Credentials(
+ String accessKeyId, String secretAccessKey, String sessionToken, long expirationEpochMillis) {
+ this.accessKeyId = Objects.requireNonNull(accessKeyId, "accessKeyId");
+ this.secretAccessKey = Objects.requireNonNull(secretAccessKey, "secretAccessKey");
+ this.sessionToken = sessionToken;
+ this.expirationEpochMillis = expirationEpochMillis;
+ }
+
+ public String getAccessKeyId() {
+ return accessKeyId;
+ }
+
+ public String getSecretAccessKey() {
+ return secretAccessKey;
+ }
+
+ public String getSessionToken() {
+ return sessionToken;
+ }
+
+ public long getExpirationEpochMillis() {
+ return expirationEpochMillis;
+ }
+}
diff --git a/common/src/test/java/org/apache/comet/cloud/s3/CometS3CredentialDispatcherTest.java b/common/src/test/java/org/apache/comet/cloud/s3/CometS3CredentialDispatcherTest.java
new file mode 100644
index 0000000000..bc360e401d
--- /dev/null
+++ b/common/src/test/java/org/apache/comet/cloud/s3/CometS3CredentialDispatcherTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.cloud.s3;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class CometS3CredentialDispatcherTest {
+
+ private static final int READ = CometS3AccessMode.READ.ordinal();
+ private static final int WRITE = CometS3AccessMode.WRITE.ordinal();
+
+ @Before
+ public void resetTestProvider() {
+ TestCometS3CredentialProvider.reset();
+ }
+
+ @Test
+ public void providerIsRegisteredFromTestClasspath() {
+ assertTrue(CometS3CredentialDispatcher.isProviderRegistered());
+ }
+
+ @Test
+ public void getCredentialsRoundTripsThroughProvider() throws Exception {
+ CometS3Credentials creds =
+ CometS3CredentialDispatcher.getCredentialsForPath("my-bucket", "path/to/object", READ);
+
+ assertNotNull(creds);
+ assertEquals("AKIATEST", creds.getAccessKeyId());
+ assertEquals("secret", creds.getSecretAccessKey());
+ assertEquals("session-tok", creds.getSessionToken());
+ assertEquals(0L, creds.getExpirationEpochMillis());
+
+ assertEquals(1, TestCometS3CredentialProvider.callCount.get());
+ assertEquals("my-bucket", TestCometS3CredentialProvider.lastBucket);
+ assertEquals("path/to/object", TestCometS3CredentialProvider.lastPath);
+ assertEquals(CometS3AccessMode.READ, TestCometS3CredentialProvider.lastMode);
+ }
+
+ @Test
+ public void writeModeIsForwarded() throws Exception {
+ CometS3CredentialDispatcher.getCredentialsForPath("b", "k", WRITE);
+ assertEquals(CometS3AccessMode.WRITE, TestCometS3CredentialProvider.lastMode);
+ }
+
+ @Test
+ public void unknownModeRejected() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> CometS3CredentialDispatcher.getCredentialsForPath("b", "k", 99));
+ }
+
+ @Test
+ public void providerExceptionsPropagate() {
+ IllegalStateException boom = new IllegalStateException("simulated STS failure");
+ TestCometS3CredentialProvider.throwOnNext = boom;
+
+ Exception thrown =
+ assertThrows(
+ Exception.class,
+ () -> CometS3CredentialDispatcher.getCredentialsForPath("b", "k", READ));
+ assertSame(boom, thrown);
+ }
+
+ @Test
+ public void nullSessionTokenAllowed() throws Exception {
+ TestCometS3CredentialProvider.nextResult = new CometS3Credentials("AKIA", "sec", null, 0L);
+
+ CometS3Credentials creds = CometS3CredentialDispatcher.getCredentialsForPath("b", "k", READ);
+
+ assertNull(creds.getSessionToken());
+ }
+
+ @Test
+ public void providerReceivesEachCallSeparately() throws Exception {
+ CometS3CredentialDispatcher.getCredentialsForPath("b1", "k1", READ);
+ CometS3CredentialDispatcher.getCredentialsForPath("b2", "k2", READ);
+ CometS3CredentialDispatcher.getCredentialsForPath("b3", "k3", READ);
+
+ assertEquals(3, TestCometS3CredentialProvider.callCount.get());
+ assertEquals("b3", TestCometS3CredentialProvider.lastBucket);
+ assertEquals("k3", TestCometS3CredentialProvider.lastPath);
+ }
+}
diff --git a/common/src/test/java/org/apache/comet/cloud/s3/TestCometS3CredentialProvider.java b/common/src/test/java/org/apache/comet/cloud/s3/TestCometS3CredentialProvider.java
new file mode 100644
index 0000000000..f0429ebf1c
--- /dev/null
+++ b/common/src/test/java/org/apache/comet/cloud/s3/TestCometS3CredentialProvider.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.cloud.s3;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Test-only provider registered via {@code META-INF/services}. State is static because the
+ * dispatcher caches a single provider instance for the JVM lifetime.
+ */
+public class TestCometS3CredentialProvider implements CometS3CredentialProvider {
+
+ static final AtomicInteger callCount = new AtomicInteger(0);
+ static volatile String lastBucket;
+ static volatile String lastPath;
+ static volatile CometS3AccessMode lastMode;
+ static volatile RuntimeException throwOnNext;
+ static volatile CometS3Credentials nextResult =
+ new CometS3Credentials("AKIATEST", "secret", "session-tok", 0L);
+
+ static void reset() {
+ callCount.set(0);
+ lastBucket = null;
+ lastPath = null;
+ lastMode = null;
+ throwOnNext = null;
+ nextResult = new CometS3Credentials("AKIATEST", "secret", "session-tok", 0L);
+ }
+
+ @Override
+ public CometS3Credentials getCredentialsForPath(
+ String bucket, String path, CometS3AccessMode mode) {
+ callCount.incrementAndGet();
+ lastBucket = bucket;
+ lastPath = path;
+ lastMode = mode;
+ RuntimeException toThrow = throwOnNext;
+ if (toThrow != null) {
+ throwOnNext = null;
+ throw toThrow;
+ }
+ return nextResult;
+ }
+}
diff --git a/common/src/test/resources/META-INF/services/org.apache.comet.cloud.s3.CometS3CredentialProvider b/common/src/test/resources/META-INF/services/org.apache.comet.cloud.s3.CometS3CredentialProvider
new file mode 100644
index 0000000000..cb6a1717e7
--- /dev/null
+++ b/common/src/test/resources/META-INF/services/org.apache.comet.cloud.s3.CometS3CredentialProvider
@@ -0,0 +1 @@
+org.apache.comet.cloud.s3.TestCometS3CredentialProvider
diff --git a/dev/ci/check-suites.py b/dev/ci/check-suites.py
index 279c6a89c9..b7369d1707 100644
--- a/dev/ci/check-suites.py
+++ b/dev/ci/check-suites.py
@@ -35,6 +35,7 @@ def file_to_class_name(path: Path) -> str | None:
"org.apache.comet.parquet.ParquetReadSuite", # abstract
"org.apache.comet.parquet.ParquetReadFromS3Suite", # manual test suite
"org.apache.comet.IcebergReadFromS3Suite", # manual test suite
+ "org.apache.comet.cloud.s3.CometS3CredentialBridgeSuite", # manual test suite
"org.apache.spark.sql.comet.CometPlanStabilitySuite", # abstract
"org.apache.spark.sql.comet.ParquetDatetimeRebaseSuite", # abstract
"org.apache.comet.exec.CometColumnarShuffleSuite" # abstract
diff --git a/docs/source/user-guide/latest/cloud-credential-providers.md b/docs/source/user-guide/latest/cloud-credential-providers.md
new file mode 100644
index 0000000000..7a39c84cfb
--- /dev/null
+++ b/docs/source/user-guide/latest/cloud-credential-providers.md
@@ -0,0 +1,204 @@
+
+
+# Cloud Credential Providers
+
+Comet's native S3 readers normally fetch credentials from the standard AWS credential chain
+(static keys, instance profiles, environment variables, etc.). Some clusters use a vendor-managed
+mechanism instead, where credentials are issued per request based on a JWT or per S3 path. For
+those clusters, Comet supports loading a vendor-supplied bridge JAR that routes every native
+credential request through the vendor's Java code.
+
+## Do I need this?
+
+You don't, if any of the following describe your cluster:
+
+- You use static AWS credentials (`fs.s3a.access.key` / `fs.s3a.secret.key`).
+- You use EC2 instance profiles, EKS pod identities, ECS task roles, or environment variables.
+- Your S3 access works in Spark today via the default AWS credential chain.
+
+You probably do, if any of these are true:
+
+- You have a Hadoop S3A custom signer configured (`fs.s3a.custom.signers=...`).
+- You have a Spark `CloudCredentialsProvider` that issues a JWT for a vendor STS service.
+- You have a custom Iceberg `client.factory` that injects a configured S3 client.
+- Spark queries against your S3 paths work, but the same queries with Comet enabled fail with 403.
+
+## Enabling a bridge JAR
+
+Add the vendor-supplied bridge JAR to your Spark executor classpath:
+
+```sh
+spark-submit --jars vendor-comet-bridge.jar ...
+```
+
+Or via `spark.jars`:
+
+```
+spark.jars=/path/to/vendor-comet-bridge.jar
+```
+
+Comet discovers the bridge through `META-INF/services` at executor startup. There are no Comet
+config keys to set.
+
+OSS Comet ships no vendor-specific bridges. Get one from the same vendor that supplies your
+Hadoop S3A signer or Iceberg client factory. If they do not yet provide one, send them to the
+"Writing a bridge" section below.
+
+## Verification
+
+With the bridge on the classpath, executor logs show:
+
+- At startup: `Registered CometS3CredentialProvider: `
+- On first S3 access (debug level): `Fetching credentials for bucket=... path=... mode=...`
+
+Without a bridge registered you get exactly one line at startup:
+
+```
+No CometS3CredentialProvider registered; native S3 readers will use the default AWS credential chain
+```
+
+## Troubleshooting
+
+**`Multiple CometS3CredentialProvider impls on classpath: [...]`** at startup. Remove all but one
+bridge JAR. Comet does not chain providers; it fails fast to prevent silent ambiguity.
+
+**`No CometS3CredentialProvider registered`** combined with `403 AccessDenied`. The bridge JAR is
+not on the executor classpath. Re-check `--jars` / `spark.jars`. On YARN or Kubernetes, confirm
+the JAR actually reached the executor and not only the driver.
+
+**Credentials silently going stale during long-running jobs.** The bridge caps opendal's
+credential cache at 5 minutes when the vendor does not populate `expirationEpochMillis`. Ask the
+vendor to return a real expiry; the 5-minute floor is a safety net, not a knob.
+
+## Iceberg: explicit S3 region required
+
+With the bridge registered, Comet wires a custom credential loader into `iceberg-storage-opendal`.
+opendal's built-in S3 region auto-detection only runs when no custom loader is configured, so on
+the bridge path the region (and endpoint for non-AWS) must be set explicitly on the Spark catalog:
+
+```
+spark.sql.catalog..s3.region = us-east-1
+spark.sql.catalog..s3.endpoint = https://... (non-AWS only)
+spark.sql.catalog..s3.path-style-access = true (path-style endpoints only)
+```
+
+If you hit `region is missing. Please find it by S3::detect_region() or set them in env`, this
+is the missing config.
+
+## Writing a bridge
+
+Comet's native scan paths (`object_store` for raw Parquet, `opendal` via `iceberg-rust` for
+Iceberg) bypass Hadoop S3A entirely. The standard `AWSCredentialsProvider.getCredentials()` has no
+path argument, so vendors that issue per-path STS credentials cannot expose them through it. The
+`CometS3CredentialProvider` SPI fills that gap.
+
+Implement `org.apache.comet.cloud.s3.CometS3CredentialProvider`:
+
+```java
+package org.apache.comet.cloud.s3;
+
+public interface CometS3CredentialProvider {
+ CometS3Credentials getCredentialsForPath(
+ String bucket, String path, CometS3AccessMode mode) throws Exception;
+}
+```
+
+Register via `META-INF/services/org.apache.comet.cloud.s3.CometS3CredentialProvider` with the
+fully-qualified class name. `getCredentialsForPath` may be invoked concurrently from many native
+tokio worker threads; the implementation must be thread-safe.
+
+### Returned fields
+
+| Field | Notes |
+| ----------------------- | ------------------------------------------------- |
+| `accessKeyId` | Required. |
+| `secretAccessKey` | Required. |
+| `sessionToken` | `null` for non-STS credentials. |
+| `expirationEpochMillis` | Absolute expiry. `0` means "unknown" (see below). |
+
+Provide a real `expirationEpochMillis` whenever you have one. The Iceberg path uses it to decide
+when `opendal` must re-call the provider for a fresh credential. `0` is treated as unknown and the
+Iceberg path defaults to a 5-minute refresh to bound staleness.
+
+### Returns or throws
+
+The SPI follows the same shape as the other JVM AWS-credential SPIs (AWS SDK v1/v2,
+Hadoop S3A, Iceberg `VendedCredentialsProvider`): return credentials or throw. There is no
+"fall-through" return value. Chaining is a vendor-side concern.
+
+If your provider is authoritative only for some paths, resolve the default AWS chain yourself for
+the rest:
+
+```java
+private final DefaultCredentialsProvider defaultChain = DefaultCredentialsProvider.create();
+
+@Override
+public CometS3Credentials getCredentialsForPath(
+ String bucket, String path, CometS3AccessMode mode) throws Exception {
+ if (handlesPath(bucket, path)) {
+ return mintFromMyVendorService(bucket, path, mode);
+ }
+ AwsCredentials c = defaultChain.resolveCredentials();
+ String token = (c instanceof AwsSessionCredentials)
+ ? ((AwsSessionCredentials) c).sessionToken()
+ : null;
+ return new CometS3Credentials(c.accessKeyId(), c.secretAccessKey(), token, 0L);
+}
+```
+
+### Access mode
+
+| Value | Used for |
+| ------- | -------------------------------------------------------------------------- |
+| `READ` | All native scan paths (raw Parquet, Iceberg). Comet today only sends READ. |
+| `WRITE` | Reserved for future native write paths. |
+
+A `WRITE` credential is not implicitly read-capable. Vendors that need read-during-write
+workflows include the required read permissions in the IAM policy attached to their `WRITE`
+credentials.
+
+### Discovery rules
+
+- Zero impls registered: native readers use the default AWS credential chain.
+- One impl registered: cached and used for every request.
+- Multiple impls registered: `CometS3CredentialDispatcher` throws `IllegalStateException` at
+ class-load. Pick one bridge JAR.
+
+### Build setup
+
+Vendor implementations need the Comet SPI classes at compile time only. Use `provided`-scope:
+
+```xml
+
+ org.apache.datafusion
+ comet-common-spark${spark.version.short}_${scala.binary.version}
+ ${comet.version}
+ provided
+
+```
+
+### Iceberg path: error message fidelity
+
+When the bridge is wired into `iceberg-rust`, the outer `reqsign-core::ProvideCredentialChain`
+currently swallows thrown exceptions into "no credential" before the request reaches opendal. The
+credential is still not issued and the request still fails, only the message is degraded to an
+opaque anonymous-request failure. No Comet change fixes this; it is resolved upstream if
+`iceberg-rust` stops wrapping custom loaders in its outer chain or moves its S3 backend to
+`object_store`.
diff --git a/docs/source/user-guide/latest/index.rst b/docs/source/user-guide/latest/index.rst
index 314a0a51bd..bbacdbe0b8 100644
--- a/docs/source/user-guide/latest/index.rst
+++ b/docs/source/user-guide/latest/index.rst
@@ -49,4 +49,5 @@ to read more.
Tuning Guide
Metrics Guide
Iceberg Guide
+ Cloud Credential Providers
Kubernetes Guide
diff --git a/native/Cargo.lock b/native/Cargo.lock
index df3c3b03c0..7f2317de37 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -674,6 +674,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ec6fb3fe69024a75fa7e1bfb48aa6cf59706a101658ea01bfd33b2b248a038f"
dependencies = [
"aws-lc-sys",
+ "untrusted 0.7.1",
"zeroize",
]
@@ -1994,7 +1995,7 @@ dependencies = [
"object_store",
"object_store_opendal",
"once_cell",
- "opendal 0.56.0",
+ "opendal",
"parking_lot",
"parquet",
"paste",
@@ -2002,6 +2003,7 @@ dependencies = [
"procfs",
"prost",
"rand 0.10.1",
+ "reqsign-core",
"reqwest 0.12.28",
"serde_json",
"tempfile",
@@ -3514,7 +3516,6 @@ dependencies = [
"tokio",
"tokio-rustls",
"tower-service",
- "webpki-roots",
]
[[package]]
@@ -3567,7 +3568,7 @@ dependencies = [
[[package]]
name = "iceberg"
version = "0.9.0"
-source = "git+https://github.com/apache/iceberg-rust?rev=1ad4bfd#1ad4bfd39319508e79960d16dad1b1cdf965c5f4"
+source = "git+https://github.com/apache/iceberg-rust?rev=83b4595#83b4595e2b5f522974d24d51c8ecbd09a093fa92"
dependencies = [
"aes-gcm",
"anyhow",
@@ -3622,7 +3623,7 @@ dependencies = [
[[package]]
name = "iceberg-storage-opendal"
version = "0.9.0"
-source = "git+https://github.com/apache/iceberg-rust?rev=1ad4bfd#1ad4bfd39319508e79960d16dad1b1cdf965c5f4"
+source = "git+https://github.com/apache/iceberg-rust?rev=83b4595#83b4595e2b5f522974d24d51c8ecbd09a093fa92"
dependencies = [
"anyhow",
"async-trait",
@@ -3630,9 +3631,9 @@ dependencies = [
"cfg-if",
"futures",
"iceberg",
- "opendal 0.55.0",
- "reqsign",
- "reqwest 0.12.28",
+ "opendal",
+ "reqsign-aws-v4",
+ "reqsign-core",
"serde",
"typetag",
"url",
@@ -4028,17 +4029,20 @@ dependencies = [
[[package]]
name = "jsonwebtoken"
-version = "9.3.1"
+version = "10.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5a87cc7a48537badeae96744432de36f4be2b4a34a05a5ef32e9dd8a1c169dde"
+checksum = "eba32bfb4ffdeaca3e34431072faf01745c9b26d25504aa7a6cf5684334fc4fc"
dependencies = [
+ "aws-lc-rs",
"base64",
+ "getrandom 0.2.17",
"js-sys",
"pem",
- "ring",
"serde",
"serde_json",
+ "signature",
"simple_asn1",
+ "zeroize",
]
[[package]]
@@ -4569,7 +4573,7 @@ dependencies = [
"futures",
"mea",
"object_store",
- "opendal 0.56.0",
+ "opendal",
"pin-project",
"tokio",
]
@@ -4598,35 +4602,6 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381"
-[[package]]
-name = "opendal"
-version = "0.55.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d075ab8a203a6ab4bc1bce0a4b9fe486a72bf8b939037f4b78d95386384bc80a"
-dependencies = [
- "anyhow",
- "backon",
- "base64",
- "bytes",
- "crc32c",
- "futures",
- "getrandom 0.2.17",
- "http 1.4.0",
- "http-body 1.0.1",
- "jiff",
- "log",
- "md-5",
- "percent-encoding",
- "quick-xml 0.38.4",
- "reqsign",
- "reqwest 0.12.28",
- "serde",
- "serde_json",
- "tokio",
- "url",
- "uuid",
-]
-
[[package]]
name = "opendal"
version = "0.56.0"
@@ -4639,7 +4614,12 @@ dependencies = [
"opendal-layer-logging",
"opendal-layer-retry",
"opendal-layer-timeout",
+ "opendal-service-azdls",
+ "opendal-service-fs",
+ "opendal-service-gcs",
"opendal-service-hdfs",
+ "opendal-service-oss",
+ "opendal-service-s3",
]
[[package]]
@@ -4713,6 +4693,70 @@ dependencies = [
"tokio",
]
+[[package]]
+name = "opendal-service-azdls"
+version = "0.56.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8f9884c2d8cf8ba2bb077d79c877dac5863ba3bab9e2c9c1e41a2e0491404772"
+dependencies = [
+ "bytes",
+ "http 1.4.0",
+ "log",
+ "opendal-core",
+ "opendal-service-azure-common",
+ "quick-xml 0.38.4",
+ "reqsign-azure-storage",
+ "reqsign-core",
+ "reqsign-file-read-tokio",
+ "serde",
+ "serde_json",
+]
+
+[[package]]
+name = "opendal-service-azure-common"
+version = "0.56.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ffb0e45d6c8dcf66ce2da20e241bcb80e6e540e109a4ff20f318f6c9b4c54e0c"
+dependencies = [
+ "http 1.4.0",
+ "opendal-core",
+]
+
+[[package]]
+name = "opendal-service-fs"
+version = "0.56.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cf0be0417abeeb0053376d816b90fceb9ca98f20dfb54ebf1f2a282729f83663"
+dependencies = [
+ "bytes",
+ "log",
+ "opendal-core",
+ "serde",
+ "tokio",
+ "xattr",
+]
+
+[[package]]
+name = "opendal-service-gcs"
+version = "0.56.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "70a49477a10163431896d106136117f5670717f9c9e49cf6f710528800c6633a"
+dependencies = [
+ "async-trait",
+ "bytes",
+ "http 1.4.0",
+ "log",
+ "opendal-core",
+ "percent-encoding",
+ "quick-xml 0.38.4",
+ "reqsign-core",
+ "reqsign-file-read-tokio",
+ "reqsign-google",
+ "serde",
+ "serde_json",
+ "tokio",
+]
+
[[package]]
name = "opendal-service-hdfs"
version = "0.56.0"
@@ -4728,6 +4772,44 @@ dependencies = [
"tokio",
]
+[[package]]
+name = "opendal-service-oss"
+version = "0.56.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "29c8a917829ad06d21b639558532cb0101fe49b040d946d673a73018683fac05"
+dependencies = [
+ "bytes",
+ "http 1.4.0",
+ "log",
+ "opendal-core",
+ "quick-xml 0.38.4",
+ "reqsign-aliyun-oss",
+ "reqsign-core",
+ "reqsign-file-read-tokio",
+ "serde",
+]
+
+[[package]]
+name = "opendal-service-s3"
+version = "0.56.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9dadddeb9bb50b0d30927dd914c298c4ddca47e4c1cfa7674d311f0cf9b051c8"
+dependencies = [
+ "base64",
+ "bytes",
+ "crc32c",
+ "http 1.4.0",
+ "log",
+ "md-5",
+ "opendal-core",
+ "quick-xml 0.38.4",
+ "reqsign-aws-v4",
+ "reqsign-core",
+ "reqsign-file-read-tokio",
+ "serde",
+ "url",
+]
+
[[package]]
name = "openssl-probe"
version = "0.2.1"
@@ -5270,16 +5352,6 @@ dependencies = [
"memchr",
]
-[[package]]
-name = "quick-xml"
-version = "0.37.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "331e97a1af0bf59823e6eadffe373d7b27f485be8748f71471c662c1f269b7fb"
-dependencies = [
- "memchr",
- "serde",
-]
-
[[package]]
name = "quick-xml"
version = "0.38.4"
@@ -5383,7 +5455,6 @@ version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ca0ecfa931c29007047d1bc58e623ab12e5590e8c7cc53200d5202b69266d8a"
dependencies = [
- "libc",
"rand_chacha 0.3.1",
"rand_core 0.6.4",
]
@@ -5538,35 +5609,63 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a"
[[package]]
-name = "reqsign"
-version = "0.16.5"
+name = "reqsign-aliyun-oss"
+version = "3.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "43451dbf3590a7590684c25fb8d12ecdcc90ed3ac123433e500447c7d77ed701"
+checksum = "57ac2757f3140aa2e213b554148ae0b52733e624fc6723f0cc6bb3d440176c95"
+dependencies = [
+ "anyhow",
+ "form_urlencoded",
+ "http 1.4.0",
+ "log",
+ "percent-encoding",
+ "reqsign-core",
+ "rust-ini",
+ "serde",
+ "serde_json",
+]
+
+[[package]]
+name = "reqsign-aws-v4"
+version = "3.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "44eaca382e94505a49f1a4849658d153aebf79d9c1a58e5dd3b10361511e9f43"
+dependencies = [
+ "anyhow",
+ "bytes",
+ "form_urlencoded",
+ "http 1.4.0",
+ "log",
+ "percent-encoding",
+ "quick-xml 0.39.2",
+ "reqsign-core",
+ "rust-ini",
+ "serde",
+ "serde_json",
+ "serde_urlencoded",
+ "sha1",
+]
+
+[[package]]
+name = "reqsign-azure-storage"
+version = "3.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7a321980405d596bd34aaf95c4722a3de4128a67fd19e74a81a83aa3fdf082e6"
dependencies = [
"anyhow",
- "async-trait",
"base64",
- "chrono",
+ "bytes",
"form_urlencoded",
- "getrandom 0.2.17",
- "hex",
- "hmac 0.12.1",
- "home",
"http 1.4.0",
"jsonwebtoken",
"log",
- "once_cell",
+ "pem",
"percent-encoding",
- "quick-xml 0.37.5",
- "rand 0.8.6",
- "reqwest 0.12.28",
+ "reqsign-core",
"rsa",
- "rust-ini",
"serde",
"serde_json",
"sha1",
- "sha2 0.10.9",
- "tokio",
]
[[package]]
@@ -5591,6 +5690,37 @@ dependencies = [
"windows-sys 0.61.2",
]
+[[package]]
+name = "reqsign-file-read-tokio"
+version = "3.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e2d89295b3d17abea31851cc8de55d843d89c52132c864963c38d41920613dc5"
+dependencies = [
+ "anyhow",
+ "reqsign-core",
+ "tokio",
+]
+
+[[package]]
+name = "reqsign-google"
+version = "3.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "35cc609b49c69e76ecaceb775a03f792d1ed3e7755ab3548d4534fd801e3242e"
+dependencies = [
+ "form_urlencoded",
+ "http 1.4.0",
+ "jsonwebtoken",
+ "log",
+ "percent-encoding",
+ "reqsign-aws-v4",
+ "reqsign-core",
+ "rsa",
+ "serde",
+ "serde_json",
+ "sha2 0.10.9",
+ "tokio",
+]
+
[[package]]
name = "reqwest"
version = "0.12.28"
@@ -5631,7 +5761,6 @@ dependencies = [
"wasm-bindgen-futures",
"wasm-streams 0.4.2",
"web-sys",
- "webpki-roots",
]
[[package]]
@@ -5691,7 +5820,7 @@ dependencies = [
"cfg-if",
"getrandom 0.2.17",
"libc",
- "untrusted",
+ "untrusted 0.9.0",
"windows-sys 0.52.0",
]
@@ -5862,7 +5991,7 @@ dependencies = [
"aws-lc-rs",
"ring",
"rustls-pki-types",
- "untrusted",
+ "untrusted 0.9.0",
]
[[package]]
@@ -6857,6 +6986,12 @@ version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861"
+[[package]]
+name = "untrusted"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
+
[[package]]
name = "untrusted"
version = "0.9.0"
@@ -7110,15 +7245,6 @@ dependencies = [
"rustls-pki-types",
]
-[[package]]
-name = "webpki-roots"
-version = "1.0.7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "52f5ee44c96cf55f1b349600768e3ece3a8f26010c05265ab73f945bb1a2eb9d"
-dependencies = [
- "rustls-pki-types",
-]
-
[[package]]
name = "which"
version = "4.4.2"
@@ -7546,6 +7672,16 @@ version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ffae5123b2d3fc086436f8834ae3ab053a283cfac8fe0a0b8eaae044768a4c4"
+[[package]]
+name = "xattr"
+version = "1.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "32e45ad4206f6d2479085147f02bc2ef834ac85886624a23575ae137c8aa8156"
+dependencies = [
+ "libc",
+ "rustix 1.1.4",
+]
+
[[package]]
name = "xmlparser"
version = "0.13.6"
@@ -7621,6 +7757,20 @@ name = "zeroize"
version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0"
+dependencies = [
+ "zeroize_derive",
+]
+
+[[package]]
+name = "zeroize_derive"
+version = "1.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "85a5b4158499876c763cb03bc4e49185d3cccbabb15b33c627f7884f43db852e"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.117",
+]
[[package]]
name = "zerotrie"
diff --git a/native/Cargo.toml b/native/Cargo.toml
index d1b5c74af9..c6e07f25e2 100644
--- a/native/Cargo.toml
+++ b/native/Cargo.toml
@@ -58,8 +58,9 @@ object_store = { version = "0.13.1", features = ["gcp", "azure", "aws", "http"]
url = "2.2"
aws-config = "1.8.16"
aws-credential-types = "1.2.13"
-iceberg = { git = "https://github.com/apache/iceberg-rust", rev = "1ad4bfd" }
-iceberg-storage-opendal = { git = "https://github.com/apache/iceberg-rust", rev = "1ad4bfd", features = ["opendal-all"] }
+iceberg = { git = "https://github.com/apache/iceberg-rust", rev = "83b4595" }
+iceberg-storage-opendal = { git = "https://github.com/apache/iceberg-rust", rev = "83b4595", features = ["opendal-all"] }
+reqsign-core = "3"
[profile.release]
debug = true
diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml
index 4fb3ed4c5d..1c8c49d09e 100644
--- a/native/core/Cargo.toml
+++ b/native/core/Cargo.toml
@@ -75,6 +75,7 @@ hdfs-sys = {version = "0.3", optional = true, features = ["hdfs_3_3"]}
opendal = { version = "0.56.0", optional = true, features = ["services-hdfs"] }
iceberg = { workspace = true }
iceberg-storage-opendal = { workspace = true }
+reqsign-core = { workspace = true }
serde_json = "1.0"
uuid = "1.23.0"
diff --git a/native/core/src/execution/operators/iceberg_scan.rs b/native/core/src/execution/operators/iceberg_scan.rs
index 55bcbef349..217a28a699 100644
--- a/native/core/src/execution/operators/iceberg_scan.rs
+++ b/native/core/src/execution/operators/iceberg_scan.rs
@@ -40,9 +40,13 @@ use datafusion::physical_plan::{
use futures::{Stream, StreamExt, TryStreamExt};
use iceberg::arrow::ScanMetrics;
use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
+use iceberg_storage_opendal::CustomAwsCredentialLoader;
use iceberg_storage_opendal::OpenDalStorageFactory;
use crate::execution::operators::ExecutionError;
+use crate::parquet::objectstore::comet_s3_credential_bridge::{
+ AccessMode, CometS3CredentialBridge,
+};
use crate::parquet::parquet_support::SparkParquetOptions;
use crate::parquet::schema_adapter::SparkPhysicalExprAdapterFactory;
use datafusion_comet_spark_expr::EvalMode;
@@ -207,9 +211,12 @@ impl IcebergScanExec {
};
match scheme {
"file" => Ok(Arc::new(OpenDalStorageFactory::Fs)),
- "s3" | "s3a" => Ok(Arc::new(OpenDalStorageFactory::S3 {
- customized_credential_load: None,
- })),
+ "s3" | "s3a" => {
+ let customized_credential_load = build_s3_credential_loader(path);
+ Ok(Arc::new(OpenDalStorageFactory::S3 {
+ customized_credential_load,
+ }))
+ }
"gs" => Ok(Arc::new(OpenDalStorageFactory::Gcs)),
"oss" => Ok(Arc::new(OpenDalStorageFactory::Oss)),
_ => Err(DataFusionError::Execution(format!(
@@ -233,6 +240,14 @@ impl IcebergScanExec {
}
}
+/// Wires the registered Comet credential provider into opendal's S3 service for this scan, or
+/// returns `None` so opendal falls back to its default credential chain.
+fn build_s3_credential_loader(metadata_location: &str) -> Option {
+ let url = url::Url::parse(metadata_location).ok()?;
+ let bridge = CometS3CredentialBridge::for_url(&url, AccessMode::Read)?;
+ Some(CustomAwsCredentialLoader::new(bridge))
+}
+
/// Metrics for IcebergScanExec
struct IcebergScanMetrics {
/// Baseline metrics (output rows, elapsed compute time)
@@ -397,7 +412,7 @@ fn adapt_batch_with_expressions(
return Ok(batch);
}
- // Zero-column projection (e.g. SELECT count(*)) — preserve row count
+ // Zero-column projection (e.g. SELECT count(*)), preserve row count
if projection_exprs.is_empty() {
return Ok(RecordBatch::try_new_with_options(
Arc::clone(target_schema),
diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index 5de14aa610..06b7190fb0 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -29,7 +29,7 @@ pub mod read;
pub mod schema_adapter;
mod cast_column;
-mod objectstore;
+pub(crate) mod objectstore;
use std::collections::HashMap;
use std::task::Poll;
diff --git a/native/core/src/parquet/objectstore/comet_s3_credential_bridge.rs b/native/core/src/parquet/objectstore/comet_s3_credential_bridge.rs
new file mode 100644
index 0000000000..f9f7dcdf1b
--- /dev/null
+++ b/native/core/src/parquet/objectstore/comet_s3_credential_bridge.rs
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! JNI bridge to the `CometS3CredentialDispatcher` SPI, exposed as
+//! `object_store::CredentialProvider` and `reqsign_core::ProvideCredential` for the raw Parquet
+//! and Iceberg scan paths respectively.
+//!
+//! ```text
+//! JVM Native (Rust)
+//! --- -------------
+//!
+//! ServiceLoader s3.rs (object_store)
+//! (one-time, at class-load) iceberg_scan.rs (opendal)
+//! | |
+//! v v
+//! CometS3CredentialDispatcher CometS3CredentialBridge
+//! (static singleton) impl object_store::CredentialProvider
+//! | ^ impl reqsign_core::ProvideCredential
+//! | | |
+//! | +<---- JNI call -----------------+
+//! | getCredentialsForPath(bucket, path, mode ordinal)
+//! v
+//! vendor CometS3CredentialProvider
+//! |
+//! v
+//! CometS3Credentials POJO
+//! |
+//! +------- JNI field reads ---------------->+
+//! |
+//! v
+//! AwsCredential / IcebergAwsCredential
+//! (used to sign S3 requests)
+//! ```
+
+use crate::execution::operators::ExecutionError;
+use crate::jvm_bridge::{jni_static_call, JVMClasses};
+use crate::JAVA_VM;
+use async_trait::async_trait;
+use iceberg_storage_opendal::AwsCredential as IcebergAwsCredential;
+use jni::objects::{JFieldID, JObject, JString};
+use jni::signature::{Primitive, ReturnType};
+use jni::sys::{jboolean, jint};
+use log::{debug, warn};
+use object_store::aws::AwsCredential;
+use object_store::CredentialProvider;
+use once_cell::sync::OnceCell;
+use reqsign_core::time::Timestamp;
+use reqsign_core::{
+ Context, Error as ReqsignError, ErrorKind as ReqsignErrorKind,
+ ProvideCredential as IcebergProvideCredential,
+};
+use std::sync::Arc;
+use std::time::Duration;
+use url::Url;
+
+/// Cap on opendal's credential cache when the provider does not report an expiry. Prevents the
+/// executor from holding a stale credential for the entire job lifetime.
+const DEFAULT_EXPIRY_WHEN_UNKNOWN: Duration = Duration::from_secs(300);
+
+static PROVIDER_REGISTERED: OnceCell = OnceCell::new();
+/// Once-per-process latch for the "missing expiry" warning; bridges are per-scan so a per-bridge
+/// latch would log once per scan on the same misbehaving provider.
+static WARNED_MISSING_EXPIRY: OnceCell<()> = OnceCell::new();
+
+/// Access intent forwarded to the Java SPI. Ordinal must match the JVM `CometS3AccessMode` enum.
+#[derive(Debug, Clone, Copy)]
+pub enum AccessMode {
+ Read = 0,
+ /// No native write path yet; kept so the SPI contract is complete.
+ #[allow(dead_code)]
+ Write = 1,
+}
+
+/// True iff a `CometS3CredentialProvider` was discovered on the JVM classpath.
+pub fn is_provider_registered() -> bool {
+ *PROVIDER_REGISTERED.get_or_init(|| {
+ // Unit tests construct stores without a JVM. Production init always precedes any store
+ // construction, so the None branch only fires in tests.
+ if JAVA_VM.get().is_none() {
+ return false;
+ }
+ JVMClasses::with_env(|env| -> Result {
+ let registered: jboolean = unsafe {
+ jni_static_call!(env,
+ comet_s3_credential_dispatcher.is_provider_registered() -> jboolean
+ )?
+ };
+ Ok(registered)
+ })
+ .unwrap_or_else(|e| {
+ debug!(
+ "CometS3CredentialDispatcher.isProviderRegistered failed; native S3 readers \
+ will use the default AWS credential chain: {e}"
+ );
+ false
+ })
+ })
+}
+
+/// Per-request credential provider that delegates to the Java SPI via JNI. Constructed once per
+/// S3 store or FileIO and forwards the same `(bucket, path, mode)` tuple on every fetch.
+#[derive(Debug)]
+pub struct CometS3CredentialBridge {
+ bucket: String,
+ path: String,
+ mode: AccessMode,
+}
+
+impl CometS3CredentialBridge {
+ pub fn new(bucket: impl Into, path: impl Into, mode: AccessMode) -> Self {
+ Self {
+ bucket: bucket.into(),
+ path: path.into(),
+ mode,
+ }
+ }
+
+ /// Shared constructor for the s3.rs and iceberg_scan.rs call sites. Returns `None` when no
+ /// provider is registered so callers can fall through to their default credential path.
+ pub fn for_url(url: &Url, mode: AccessMode) -> Option {
+ if !is_provider_registered() {
+ return None;
+ }
+ let bucket = url.host_str()?.to_string();
+ debug!("Routing S3 credentials for bucket {bucket} through CometS3CredentialProvider");
+ Some(Self::new(bucket, url.path(), mode))
+ }
+
+ fn fetch_raw(&self) -> Result {
+ JVMClasses::with_env(|env| -> Result {
+ let bucket = env
+ .new_string(&self.bucket)
+ .map_err(|e| ExecutionError::GeneralError(format!("new_string(bucket): {e}")))?;
+ let path = env
+ .new_string(&self.path)
+ .map_err(|e| ExecutionError::GeneralError(format!("new_string(path): {e}")))?;
+ let mode = self.mode as jint;
+
+ let creds_obj: JObject = unsafe {
+ jni_static_call!(env,
+ comet_s3_credential_dispatcher.get_credentials_for_path(
+ &bucket, &path, mode
+ ) -> JObject
+ )?
+ };
+ if creds_obj.is_null() {
+ return Err(ExecutionError::GeneralError(
+ "getCredentialsForPath returned null (contract violation)".to_string(),
+ ));
+ }
+
+ let d = &JVMClasses::get().comet_s3_credential_dispatcher;
+ Ok(RawCredentials {
+ access_key_id: read_required_string(
+ env,
+ &creds_obj,
+ d.field_access_key_id,
+ "accessKeyId",
+ )?,
+ secret_access_key: read_required_string(
+ env,
+ &creds_obj,
+ d.field_secret_access_key,
+ "secretAccessKey",
+ )?,
+ session_token: read_optional_string(env, &creds_obj, d.field_session_token)?,
+ expiration_epoch_millis: unsafe {
+ env.get_field_unchecked(
+ &creds_obj,
+ d.field_expiration_epoch_millis,
+ ReturnType::Primitive(Primitive::Long),
+ )
+ }
+ .and_then(|v| v.j())
+ .map_err(|e| {
+ ExecutionError::GeneralError(format!("read expirationEpochMillis: {e}"))
+ })?,
+ })
+ })
+ }
+}
+
+struct RawCredentials {
+ access_key_id: String,
+ secret_access_key: String,
+ session_token: Option,
+ /// Absolute expiry. `0` means the provider did not report one.
+ expiration_epoch_millis: i64,
+}
+
+#[async_trait]
+impl CredentialProvider for CometS3CredentialBridge {
+ type Credential = AwsCredential;
+
+ async fn get_credential(&self) -> object_store::Result> {
+ let raw = self.fetch_raw().map_err(|e| object_store::Error::Generic {
+ store: "S3",
+ source: e.to_string().into(),
+ })?;
+ Ok(Arc::new(AwsCredential {
+ key_id: raw.access_key_id,
+ secret_key: raw.secret_access_key,
+ token: raw.session_token,
+ }))
+ }
+}
+
+impl IcebergProvideCredential for CometS3CredentialBridge {
+ type Credential = IcebergAwsCredential;
+
+ async fn provide_credential(
+ &self,
+ _ctx: &Context,
+ ) -> reqsign_core::Result