Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/source/user-guide/latest/datasources.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ AWS credential providers can be configured using the `fs.s3a.aws.credentials.pro
| `com.amazonaws.auth.InstanceProfileCredentialsProvider`<br/>`software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider` | Access S3 using EC2 instance metadata service (IMDS) | None |
| `com.amazonaws.auth.ContainerCredentialsProvider`<br/>`software.amazon.awssdk.auth.credentials.ContainerCredentialsProvider`<br/>`com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper` | Access S3 using ECS task credentials | None |
| `com.amazonaws.auth.WebIdentityTokenCredentialsProvider`<br/>`software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider` | Authenticate using web identity token file | None |
| `com.amazonaws.auth.profile.ProfileCredentialsProvider`<br/>`software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider` | Authenticate using a named profile from the local AWS credentials file | None |
| `com.amazonaws.auth.profile.ProfileCredentialsProvider`<br/>`software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider` | Authenticate using a named profile from the local AWS credentials file | `fs.s3a.auth.profile.name` (optional), `fs.s3a.auth.profile.file` (optional) |

Multiple credential providers can be specified in a comma-separated list using the `fs.s3a.aws.credentials.provider` configuration, just as Hadoop AWS supports. If `fs.s3a.aws.credentials.provider` is not configured, Hadoop S3A's default credential provider chain will be used. All configuration options also support bucket-specific overrides using the pattern `fs.s3a.bucket.{bucket-name}.{option}`.

Expand Down
146 changes: 136 additions & 10 deletions native/core/src/parquet/objectstore/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use url::Url;

use crate::execution::jni_api::get_runtime;
use async_trait::async_trait;
#[allow(deprecated)]
use aws_config::profile::profile_file::{ProfileFileKind, ProfileFiles};
use aws_config::{
ecs::EcsCredentialsProvider, environment::EnvironmentVariableCredentialsProvider,
imds::credentials::ImdsCredentialsProvider, meta::credentials::CredentialsProviderChain,
Expand Down Expand Up @@ -442,7 +444,9 @@ fn build_aws_credential_provider_metadata(
}
HADOOP_ASSUMED_ROLE => build_assume_role_credential_provider_metadata(configs, bucket),
AWS_WEB_IDENTITY_V1 | AWS_WEB_IDENTITY => Ok(CredentialProviderMetadata::WebIdentity),
AWS_PROFILE_V1 | AWS_PROFILE => Ok(CredentialProviderMetadata::Profile),
AWS_PROFILE_V1 | AWS_PROFILE => {
Ok(build_profile_credential_provider_metadata(configs, bucket))
}
_ => Err(object_store::Error::Generic {
store: "S3",
source: format!("Unsupported credential provider: {credential_provider_name}").into(),
Expand Down Expand Up @@ -521,6 +525,24 @@ fn build_assume_role_credential_provider_metadata(
})
}

/// Build the [CredentialProviderMetadata::Profile] variant from S3A configs.
///
/// Honors `fs.s3a.auth.profile.name` and `fs.s3a.auth.profile.file` (and their bucket-scoped
/// overrides). Both options match the names used by Hadoop's
/// `org.apache.hadoop.fs.s3a.auth.ProfileAWSCredentialsProvider` (introduced in Hadoop 3.4.2),
/// but we resolve them on the native side so they are not tied to a particular Hadoop version.
fn build_profile_credential_provider_metadata(
configs: &HashMap<String, String>,
bucket: &str,
) -> CredentialProviderMetadata {
let profile_name = get_config_trimmed(configs, bucket, "auth.profile.name").map(str::to_string);
let profile_file = get_config_trimmed(configs, bucket, "auth.profile.file").map(str::to_string);
CredentialProviderMetadata::Profile {
profile_name,
profile_file,
}
}

/// A caching wrapper around AWS credential providers that implements the object_store `CredentialProvider` trait.
///
/// This struct bridges AWS SDK credential providers (`ProvideCredentials`) with the object_store
Expand Down Expand Up @@ -673,7 +695,10 @@ enum CredentialProviderMetadata {
Imds,
Environment,
WebIdentity,
Profile,
Profile {
profile_name: Option<String>,
profile_file: Option<String>,
},
Static {
is_valid: bool,
access_key: String,
Expand All @@ -696,7 +721,7 @@ impl CredentialProviderMetadata {
CredentialProviderMetadata::Imds => "Imds",
CredentialProviderMetadata::Environment => "Environment",
CredentialProviderMetadata::WebIdentity => "WebIdentity",
CredentialProviderMetadata::Profile => "Profile",
CredentialProviderMetadata::Profile { .. } => "Profile",
CredentialProviderMetadata::Static { .. } => "Static",
CredentialProviderMetadata::AssumeRole { .. } => "AssumeRole",
CredentialProviderMetadata::Chain(..) => "Chain",
Expand All @@ -712,7 +737,14 @@ impl CredentialProviderMetadata {
CredentialProviderMetadata::Imds => "Imds".to_string(),
CredentialProviderMetadata::Environment => "Environment".to_string(),
CredentialProviderMetadata::WebIdentity => "WebIdentity".to_string(),
CredentialProviderMetadata::Profile => "Profile".to_string(),
CredentialProviderMetadata::Profile {
profile_name,
profile_file,
} => {
let name = profile_name.as_deref().unwrap_or("<default>");
let file = profile_file.as_deref().unwrap_or("<default>");
format!("Profile(name: {name}, file: {file})")
}
CredentialProviderMetadata::Static { is_valid, .. } => {
format!("Static(valid: {is_valid})")
}
Expand Down Expand Up @@ -775,11 +807,23 @@ impl CredentialProviderMetadata {
.build();
Ok(Arc::new(credential_provider))
}
CredentialProviderMetadata::Profile => {
let credential_provider = ProfileFileCredentialsProvider::builder()
.configure(&ProviderConfig::with_default_region().await)
.build();
Ok(Arc::new(credential_provider))
CredentialProviderMetadata::Profile {
profile_name,
profile_file,
} => {
let mut builder = ProfileFileCredentialsProvider::builder()
.configure(&ProviderConfig::with_default_region().await);
if let Some(name) = profile_name {
builder = builder.profile_name(name.clone());
}
if let Some(file) = profile_file {
#[allow(deprecated)]
let profile_files = ProfileFiles::builder()
.with_file(ProfileFileKind::Credentials, file.clone())
.build();
builder = builder.profile_files(profile_files);
}
Ok(Arc::new(builder.build()))
}
CredentialProviderMetadata::Static {
is_valid,
Expand Down Expand Up @@ -927,6 +971,34 @@ mod tests {
self
}

fn with_profile_name(mut self, name: &str) -> Self {
self.configs
.insert("fs.s3a.auth.profile.name".to_string(), name.to_string());
self
}

fn with_profile_file(mut self, file: &str) -> Self {
self.configs
.insert("fs.s3a.auth.profile.file".to_string(), file.to_string());
self
}

fn with_bucket_profile_name(mut self, bucket: &str, name: &str) -> Self {
self.configs.insert(
format!("fs.s3a.bucket.{bucket}.auth.profile.name"),
name.to_string(),
);
self
}

fn with_bucket_profile_file(mut self, bucket: &str, file: &str) -> Self {
self.configs.insert(
format!("fs.s3a.bucket.{bucket}.auth.profile.file"),
file.to_string(),
);
self
}

fn with_assume_role_credentials_provider(mut self, provider: &str) -> Self {
self.configs.insert(
"fs.s3a.assumed.role.credentials.provider".to_string(),
Expand Down Expand Up @@ -1475,10 +1547,64 @@ mod tests {
assert!(result.is_some(), "Should return a credential provider");

let test_provider = result.unwrap().metadata();
assert_eq!(test_provider, CredentialProviderMetadata::Profile);
assert_eq!(
test_provider,
CredentialProviderMetadata::Profile {
profile_name: None,
profile_file: None,
}
);
}
}

#[tokio::test]
#[cfg_attr(miri, ignore)] // AWS credential providers call foreign functions
async fn test_profile_credential_provider_with_name_and_file() {
let configs = TestConfigBuilder::new()
.with_credential_provider(AWS_PROFILE)
.with_profile_name("my-profile")
.with_profile_file("/tmp/my-credentials")
.build();

let result = build_credential_provider(&configs, "test-bucket", Duration::from_secs(300))
.await
.unwrap();
assert!(result.is_some(), "Should return a credential provider");

let test_provider = result.unwrap().metadata();
assert_eq!(
test_provider,
CredentialProviderMetadata::Profile {
profile_name: Some("my-profile".to_string()),
profile_file: Some("/tmp/my-credentials".to_string()),
}
);
}

#[tokio::test]
#[cfg_attr(miri, ignore)] // AWS credential providers call foreign functions
async fn test_profile_credential_provider_bucket_overrides() {
let configs = TestConfigBuilder::new()
.with_credential_provider(AWS_PROFILE)
.with_profile_name("global-profile")
.with_bucket_profile_name("test-bucket", "bucket-profile")
.with_bucket_profile_file("test-bucket", "/tmp/bucket-credentials")
.build();

let result = build_credential_provider(&configs, "test-bucket", Duration::from_secs(300))
.await
.unwrap();
assert!(result.is_some(), "Should return a credential provider");

assert_eq!(
result.unwrap().metadata(),
CredentialProviderMetadata::Profile {
profile_name: Some("bucket-profile".to_string()),
profile_file: Some("/tmp/bucket-credentials".to_string()),
}
);
}

#[tokio::test]
#[cfg_attr(miri, ignore)] // AWS credential providers call foreign functions
async fn test_hadoop_iam_instance_credential_provider() {
Expand Down
Loading