diff --git a/docs/source/user-guide/latest/datasources.md b/docs/source/user-guide/latest/datasources.md index 572beb1e02..6588d53d78 100644 --- a/docs/source/user-guide/latest/datasources.md +++ b/docs/source/user-guide/latest/datasources.md @@ -205,7 +205,7 @@ AWS credential providers can be configured using the `fs.s3a.aws.credentials.pro | `com.amazonaws.auth.InstanceProfileCredentialsProvider`
`software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider` | Access S3 using EC2 instance metadata service (IMDS) | None | | `com.amazonaws.auth.ContainerCredentialsProvider`
`software.amazon.awssdk.auth.credentials.ContainerCredentialsProvider`
`com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper` | Access S3 using ECS task credentials | None | | `com.amazonaws.auth.WebIdentityTokenCredentialsProvider`
`software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider` | Authenticate using web identity token file | None | -| `com.amazonaws.auth.profile.ProfileCredentialsProvider`
`software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider` | Authenticate using a named profile from the local AWS credentials file | None | +| `com.amazonaws.auth.profile.ProfileCredentialsProvider`
`software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider` | Authenticate using a named profile from the local AWS credentials file | `fs.s3a.auth.profile.name` (optional), `fs.s3a.auth.profile.file` (optional) | Multiple credential providers can be specified in a comma-separated list using the `fs.s3a.aws.credentials.provider` configuration, just as Hadoop AWS supports. If `fs.s3a.aws.credentials.provider` is not configured, Hadoop S3A's default credential provider chain will be used. All configuration options also support bucket-specific overrides using the pattern `fs.s3a.bucket.{bucket-name}.{option}`. diff --git a/native/core/src/parquet/objectstore/s3.rs b/native/core/src/parquet/objectstore/s3.rs index a427ad8ad5..38d900ba2d 100644 --- a/native/core/src/parquet/objectstore/s3.rs +++ b/native/core/src/parquet/objectstore/s3.rs @@ -22,6 +22,8 @@ use url::Url; use crate::execution::jni_api::get_runtime; use async_trait::async_trait; +#[allow(deprecated)] +use aws_config::profile::profile_file::{ProfileFileKind, ProfileFiles}; use aws_config::{ ecs::EcsCredentialsProvider, environment::EnvironmentVariableCredentialsProvider, imds::credentials::ImdsCredentialsProvider, meta::credentials::CredentialsProviderChain, @@ -442,7 +444,9 @@ fn build_aws_credential_provider_metadata( } HADOOP_ASSUMED_ROLE => build_assume_role_credential_provider_metadata(configs, bucket), AWS_WEB_IDENTITY_V1 | AWS_WEB_IDENTITY => Ok(CredentialProviderMetadata::WebIdentity), - AWS_PROFILE_V1 | AWS_PROFILE => Ok(CredentialProviderMetadata::Profile), + AWS_PROFILE_V1 | AWS_PROFILE => { + Ok(build_profile_credential_provider_metadata(configs, bucket)) + } _ => Err(object_store::Error::Generic { store: "S3", source: format!("Unsupported credential provider: {credential_provider_name}").into(), @@ -521,6 +525,24 @@ fn build_assume_role_credential_provider_metadata( }) } +/// Build the [CredentialProviderMetadata::Profile] variant from S3A configs. +/// +/// Honors `fs.s3a.auth.profile.name` and `fs.s3a.auth.profile.file` (and their bucket-scoped +/// overrides). Both options match the names used by Hadoop's +/// `org.apache.hadoop.fs.s3a.auth.ProfileAWSCredentialsProvider` (introduced in Hadoop 3.4.2), +/// but we resolve them on the native side so they are not tied to a particular Hadoop version. +fn build_profile_credential_provider_metadata( + configs: &HashMap, + bucket: &str, +) -> CredentialProviderMetadata { + let profile_name = get_config_trimmed(configs, bucket, "auth.profile.name").map(str::to_string); + let profile_file = get_config_trimmed(configs, bucket, "auth.profile.file").map(str::to_string); + CredentialProviderMetadata::Profile { + profile_name, + profile_file, + } +} + /// A caching wrapper around AWS credential providers that implements the object_store `CredentialProvider` trait. /// /// This struct bridges AWS SDK credential providers (`ProvideCredentials`) with the object_store @@ -673,7 +695,10 @@ enum CredentialProviderMetadata { Imds, Environment, WebIdentity, - Profile, + Profile { + profile_name: Option, + profile_file: Option, + }, Static { is_valid: bool, access_key: String, @@ -696,7 +721,7 @@ impl CredentialProviderMetadata { CredentialProviderMetadata::Imds => "Imds", CredentialProviderMetadata::Environment => "Environment", CredentialProviderMetadata::WebIdentity => "WebIdentity", - CredentialProviderMetadata::Profile => "Profile", + CredentialProviderMetadata::Profile { .. } => "Profile", CredentialProviderMetadata::Static { .. } => "Static", CredentialProviderMetadata::AssumeRole { .. } => "AssumeRole", CredentialProviderMetadata::Chain(..) => "Chain", @@ -712,7 +737,14 @@ impl CredentialProviderMetadata { CredentialProviderMetadata::Imds => "Imds".to_string(), CredentialProviderMetadata::Environment => "Environment".to_string(), CredentialProviderMetadata::WebIdentity => "WebIdentity".to_string(), - CredentialProviderMetadata::Profile => "Profile".to_string(), + CredentialProviderMetadata::Profile { + profile_name, + profile_file, + } => { + let name = profile_name.as_deref().unwrap_or(""); + let file = profile_file.as_deref().unwrap_or(""); + format!("Profile(name: {name}, file: {file})") + } CredentialProviderMetadata::Static { is_valid, .. } => { format!("Static(valid: {is_valid})") } @@ -775,11 +807,23 @@ impl CredentialProviderMetadata { .build(); Ok(Arc::new(credential_provider)) } - CredentialProviderMetadata::Profile => { - let credential_provider = ProfileFileCredentialsProvider::builder() - .configure(&ProviderConfig::with_default_region().await) - .build(); - Ok(Arc::new(credential_provider)) + CredentialProviderMetadata::Profile { + profile_name, + profile_file, + } => { + let mut builder = ProfileFileCredentialsProvider::builder() + .configure(&ProviderConfig::with_default_region().await); + if let Some(name) = profile_name { + builder = builder.profile_name(name.clone()); + } + if let Some(file) = profile_file { + #[allow(deprecated)] + let profile_files = ProfileFiles::builder() + .with_file(ProfileFileKind::Credentials, file.clone()) + .build(); + builder = builder.profile_files(profile_files); + } + Ok(Arc::new(builder.build())) } CredentialProviderMetadata::Static { is_valid, @@ -927,6 +971,34 @@ mod tests { self } + fn with_profile_name(mut self, name: &str) -> Self { + self.configs + .insert("fs.s3a.auth.profile.name".to_string(), name.to_string()); + self + } + + fn with_profile_file(mut self, file: &str) -> Self { + self.configs + .insert("fs.s3a.auth.profile.file".to_string(), file.to_string()); + self + } + + fn with_bucket_profile_name(mut self, bucket: &str, name: &str) -> Self { + self.configs.insert( + format!("fs.s3a.bucket.{bucket}.auth.profile.name"), + name.to_string(), + ); + self + } + + fn with_bucket_profile_file(mut self, bucket: &str, file: &str) -> Self { + self.configs.insert( + format!("fs.s3a.bucket.{bucket}.auth.profile.file"), + file.to_string(), + ); + self + } + fn with_assume_role_credentials_provider(mut self, provider: &str) -> Self { self.configs.insert( "fs.s3a.assumed.role.credentials.provider".to_string(), @@ -1475,10 +1547,64 @@ mod tests { assert!(result.is_some(), "Should return a credential provider"); let test_provider = result.unwrap().metadata(); - assert_eq!(test_provider, CredentialProviderMetadata::Profile); + assert_eq!( + test_provider, + CredentialProviderMetadata::Profile { + profile_name: None, + profile_file: None, + } + ); } } + #[tokio::test] + #[cfg_attr(miri, ignore)] // AWS credential providers call foreign functions + async fn test_profile_credential_provider_with_name_and_file() { + let configs = TestConfigBuilder::new() + .with_credential_provider(AWS_PROFILE) + .with_profile_name("my-profile") + .with_profile_file("/tmp/my-credentials") + .build(); + + let result = build_credential_provider(&configs, "test-bucket", Duration::from_secs(300)) + .await + .unwrap(); + assert!(result.is_some(), "Should return a credential provider"); + + let test_provider = result.unwrap().metadata(); + assert_eq!( + test_provider, + CredentialProviderMetadata::Profile { + profile_name: Some("my-profile".to_string()), + profile_file: Some("/tmp/my-credentials".to_string()), + } + ); + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] // AWS credential providers call foreign functions + async fn test_profile_credential_provider_bucket_overrides() { + let configs = TestConfigBuilder::new() + .with_credential_provider(AWS_PROFILE) + .with_profile_name("global-profile") + .with_bucket_profile_name("test-bucket", "bucket-profile") + .with_bucket_profile_file("test-bucket", "/tmp/bucket-credentials") + .build(); + + let result = build_credential_provider(&configs, "test-bucket", Duration::from_secs(300)) + .await + .unwrap(); + assert!(result.is_some(), "Should return a credential provider"); + + assert_eq!( + result.unwrap().metadata(), + CredentialProviderMetadata::Profile { + profile_name: Some("bucket-profile".to_string()), + profile_file: Some("/tmp/bucket-credentials".to_string()), + } + ); + } + #[tokio::test] #[cfg_attr(miri, ignore)] // AWS credential providers call foreign functions async fn test_hadoop_iam_instance_credential_provider() {