From 9047fa3d9c8d9bbfc8b20279cb38bb9683f9b68b Mon Sep 17 00:00:00 2001 From: Egor Riashin Date: Wed, 9 Jun 2021 13:32:35 +0300 Subject: [PATCH] S3 ingestion can assume role (#10995) * feature s3 assume role * feature s3 assume role * feature s3 assume role * feature s3 assume role * feature s3 assume role * feature s3 assume role * tests fix * spelling fix * sts fix Co-authored-by: egor-ryashin --- docs/ingestion/native-batch.md | 43 ++++++++++ extensions-core/s3-extensions/pom.xml | 4 +- .../druid/data/input/s3/S3InputSource.java | 84 +++++++++++++++++-- .../data/input/s3/S3InputSourceConfig.java | 40 +++++++-- .../data/input/s3/S3InputSourceTest.java | 26 +++++- website/.spelling | 3 + 6 files changed, 179 insertions(+), 21 deletions(-) diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index 7039bfb2333..d4205da9409 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -894,6 +894,47 @@ Sample specs: ... ``` +```json +... + "ioConfig": { + "type": "index_parallel", + "inputSource": { + "type": "s3", + "uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"], + "properties": { + "accessKeyId": "KLJ78979SDFdS2", + "secretAccessKey": "KLS89s98sKJHKJKJH8721lljkd" + } + }, + "inputFormat": { + "type": "json" + }, + ... + }, +... +``` + +```json +... + "ioConfig": { + "type": "index_parallel", + "inputSource": { + "type": "s3", + "uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"], + "properties": { + "accessKeyId": "KLJ78979SDFdS2", + "secretAccessKey": "KLS89s98sKJHKJKJH8721lljkd", + "assumeRoleArn": "arn:aws:iam::2981002874992:role/role-s3" + } + }, + "inputFormat": { + "type": "json" + }, + ... + }, +... +``` + |property|description|default|required?| |--------|-----------|-------|---------| |type|This should be `s3`.|None|yes| @@ -917,6 +958,8 @@ Properties Object: |--------|-----------|-------|---------| |accessKeyId|The [Password Provider](../operations/password-provider.md) or plain text string of this S3 InputSource's access key|None|yes if secretAccessKey is given| |secretAccessKey|The [Password Provider](../operations/password-provider.md) or plain text string of this S3 InputSource's secret key|None|yes if accessKeyId is given| +|assumeRoleArn|AWS ARN of the role to assume [see](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_request.html). **assumeRoleArn** can be used either with the ingestion spec AWS credentials or with the default S3 credentials|None|no| +|assumeRoleExternalId|A unique identifier that might be required when you assume a role in another account [see](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_request.html)|None|no| **Note :** *If accessKeyId and secretAccessKey are not given, the default [S3 credentials provider chain](../development/extensions-core/s3.md#s3-authentication-methods) is used.* diff --git a/extensions-core/s3-extensions/pom.xml b/extensions-core/s3-extensions/pom.xml index 30c63f672bb..1a2d2fc3983 100644 --- a/extensions-core/s3-extensions/pom.xml +++ b/extensions-core/s3-extensions/pom.xml @@ -115,8 +115,8 @@ com.amazonaws aws-java-sdk-sts - provided - + ${aws.sdk.version} + org.apache.druid diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java index 22c10680f80..24fd99cf520 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java @@ -19,12 +19,17 @@ package org.apache.druid.data.input.s3; +import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.securitytoken.AWSSecurityTokenService; +import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; @@ -35,6 +40,7 @@ import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.data.input.impl.CloudObjectInputSource; import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.data.input.impl.SplittableInputSource; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.storage.s3.S3InputDataConfig; import org.apache.druid.storage.s3.S3StorageDruidModule; import org.apache.druid.storage.s3.S3Utils; @@ -47,6 +53,7 @@ import java.net.URI; import java.util.Iterator; import java.util.List; import java.util.Objects; +import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -59,6 +66,7 @@ public class S3InputSource extends CloudObjectInputSource @JsonProperty("properties") private final S3InputSourceConfig s3InputSourceConfig; private final S3InputDataConfig inputDataConfig; + private final AWSCredentialsProvider awsCredentialsProvider; /** * Constructor for S3InputSource @@ -84,7 +92,8 @@ public class S3InputSource extends CloudObjectInputSource @JsonProperty("uris") @Nullable List uris, @JsonProperty("prefixes") @Nullable List prefixes, @JsonProperty("objects") @Nullable List objects, - @JsonProperty("properties") @Nullable S3InputSourceConfig s3InputSourceConfig + @JsonProperty("properties") @Nullable S3InputSourceConfig s3InputSourceConfig, + @JacksonInject AWSCredentialsProvider awsCredentialsProvider ) { super(S3StorageDruidModule.SCHEME, uris, prefixes, objects); @@ -95,13 +104,19 @@ public class S3InputSource extends CloudObjectInputSource () -> { if (s3ClientBuilder != null && s3InputSourceConfig != null) { if (s3InputSourceConfig.isCredentialsConfigured()) { - AWSStaticCredentialsProvider credentials = new AWSStaticCredentialsProvider( - new BasicAWSCredentials( - s3InputSourceConfig.getAccessKeyId().getPassword(), - s3InputSourceConfig.getSecretAccessKey().getPassword() - ) - ); - s3ClientBuilder.getAmazonS3ClientBuilder().withCredentials(credentials); + if (s3InputSourceConfig.getAssumeRoleArn() == null) { + s3ClientBuilder + .getAmazonS3ClientBuilder() + .withCredentials(createStaticCredentialsProvider(s3InputSourceConfig)); + } else { + applyAssumeRole( + s3ClientBuilder, + s3InputSourceConfig, + createStaticCredentialsProvider(s3InputSourceConfig) + ); + } + } else { + applyAssumeRole(s3ClientBuilder, s3InputSourceConfig, awsCredentialsProvider); } return s3ClientBuilder.build(); } else { @@ -109,6 +124,56 @@ public class S3InputSource extends CloudObjectInputSource } } ); + this.awsCredentialsProvider = awsCredentialsProvider; + } + + @VisibleForTesting + public S3InputSource( + ServerSideEncryptingAmazonS3 s3Client, + ServerSideEncryptingAmazonS3.Builder s3ClientBuilder, + S3InputDataConfig inputDataConfig, + List uris, + List prefixes, + List objects, + S3InputSourceConfig s3InputSourceConfig + ) + { + this(s3Client, s3ClientBuilder, inputDataConfig, uris, prefixes, objects, s3InputSourceConfig, null); + } + + private void applyAssumeRole( + ServerSideEncryptingAmazonS3.Builder s3ClientBuilder, + S3InputSourceConfig s3InputSourceConfig, + AWSCredentialsProvider awsCredentialsProvider + ) + { + String assumeRoleArn = s3InputSourceConfig.getAssumeRoleArn(); + if (assumeRoleArn != null) { + String roleSessionName = StringUtils.format("druid-s3-input-source-%s", UUID.randomUUID().toString()); + AWSSecurityTokenService securityTokenService = AWSSecurityTokenServiceClientBuilder.standard() + .withCredentials(awsCredentialsProvider) + .build(); + STSAssumeRoleSessionCredentialsProvider.Builder roleCredentialsProviderBuilder; + roleCredentialsProviderBuilder = new STSAssumeRoleSessionCredentialsProvider + .Builder(assumeRoleArn, roleSessionName).withStsClient(securityTokenService); + + if (s3InputSourceConfig.getAssumeRoleExternalId() != null) { + roleCredentialsProviderBuilder.withExternalId(s3InputSourceConfig.getAssumeRoleExternalId()); + } + + s3ClientBuilder.getAmazonS3ClientBuilder().withCredentials(roleCredentialsProviderBuilder.build()); + } + } + + @Nonnull + private AWSStaticCredentialsProvider createStaticCredentialsProvider(S3InputSourceConfig s3InputSourceConfig) + { + return new AWSStaticCredentialsProvider( + new BasicAWSCredentials( + s3InputSourceConfig.getAccessKeyId().getPassword(), + s3InputSourceConfig.getSecretAccessKey().getPassword() + ) + ); } @Nullable @@ -149,7 +214,8 @@ public class S3InputSource extends CloudObjectInputSource null, null, split.get(), - getS3InputSourceConfig() + getS3InputSourceConfig(), + awsCredentialsProvider ); } diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceConfig.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceConfig.java index 4543449c4d4..6b837e703a0 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceConfig.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceConfig.java @@ -34,30 +34,50 @@ import java.util.Objects; */ public class S3InputSourceConfig { + @JsonProperty + private String assumeRoleArn; + @JsonProperty + private String assumeRoleExternalId; + @JsonProperty + private PasswordProvider accessKeyId; + @JsonProperty + private PasswordProvider secretAccessKey; + @JsonCreator public S3InputSourceConfig( @JsonProperty("accessKeyId") @Nullable PasswordProvider accessKeyId, - @JsonProperty("secretAccessKey") @Nullable PasswordProvider secretAccessKey + @JsonProperty("secretAccessKey") @Nullable PasswordProvider secretAccessKey, + @JsonProperty("assumeRoleArn") @Nullable String assumeRoleArn, + @JsonProperty("assumeRoleExternalId") @Nullable String assumeRoleExternalId ) { + this.assumeRoleArn = assumeRoleArn; + this.assumeRoleExternalId = assumeRoleExternalId; if (accessKeyId != null || secretAccessKey != null) { this.accessKeyId = Preconditions.checkNotNull(accessKeyId, "accessKeyId cannot be null if secretAccessKey is given"); this.secretAccessKey = Preconditions.checkNotNull(secretAccessKey, "secretAccessKey cannot be null if accessKeyId is given"); } } - @JsonProperty - private PasswordProvider accessKeyId; - - @JsonProperty - private PasswordProvider secretAccessKey; + @Nullable + public String getAssumeRoleArn() + { + return assumeRoleArn; + } + @Nullable + public String getAssumeRoleExternalId() + { + return assumeRoleExternalId; + } + @Nullable public PasswordProvider getAccessKeyId() { return accessKeyId; } + @Nullable public PasswordProvider getSecretAccessKey() { return secretAccessKey; @@ -76,6 +96,8 @@ public class S3InputSourceConfig return "S3InputSourceConfig{" + "accessKeyId=" + accessKeyId + ", secretAccessKey=" + secretAccessKey + + ", assumeRoleArn=" + assumeRoleArn + + ", assumeRoleExternalId=" + assumeRoleExternalId + '}'; } @@ -90,12 +112,14 @@ public class S3InputSourceConfig } S3InputSourceConfig that = (S3InputSourceConfig) o; return Objects.equals(accessKeyId, that.accessKeyId) && - Objects.equals(secretAccessKey, that.secretAccessKey); + Objects.equals(secretAccessKey, that.secretAccessKey) && + Objects.equals(assumeRoleArn, that.assumeRoleArn) && + Objects.equals(assumeRoleExternalId, that.assumeRoleExternalId); } @Override public int hashCode() { - return Objects.hash(accessKeyId, secretAccessKey); + return Objects.hash(accessKeyId, secretAccessKey, assumeRoleArn, assumeRoleExternalId); } } diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java index 41a5f8a36c8..957dd4db632 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java @@ -19,6 +19,7 @@ package org.apache.druid.data.input.s3; +import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.AmazonS3ClientBuilder; @@ -40,6 +41,7 @@ import com.google.inject.Binder; import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.Provides; +import org.apache.druid.common.aws.AWSCredentialsUtils; import org.apache.druid.data.input.ColumnsFilter; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowSchema; @@ -120,7 +122,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest ); private static final S3InputSourceConfig CLOUD_CONFIG_PROPERTIES = new S3InputSourceConfig( - new DefaultPasswordProvider("myKey"), new DefaultPasswordProvider("mySecret")); + new DefaultPasswordProvider("myKey"), new DefaultPasswordProvider("mySecret"), null, null); private static final List EXPECTED_LOCATION = ImmutableList.of(new CloudObjectLocation("foo", "bar/file.csv")); @@ -221,6 +223,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest { S3InputSourceConfig mockConfigPropertiesWithoutKeyAndSecret = EasyMock.createMock(S3InputSourceConfig.class); EasyMock.reset(mockConfigPropertiesWithoutKeyAndSecret); + EasyMock.expect(mockConfigPropertiesWithoutKeyAndSecret.getAssumeRoleArn()).andStubReturn(null); EasyMock.expect(mockConfigPropertiesWithoutKeyAndSecret.isCredentialsConfigured()) .andStubReturn(false); EasyMock.replay(mockConfigPropertiesWithoutKeyAndSecret); @@ -279,6 +282,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest null, EXPECTED_LOCATION, null + ); final S3InputSource serdeWithPrefixes = MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class); @@ -664,7 +668,25 @@ public class S3InputSourceTest extends InitializedNullHandlingTest DruidModule baseModule = new TestS3Module(); final Injector injector = Guice.createInjector( new ObjectMapperModule(), - baseModule + baseModule, + new DruidModule() + { + @Provides + public AWSCredentialsProvider getAWSCredentialsProvider() + { + return AWSCredentialsUtils.defaultAWSCredentialsProviderChain(null); + } + + @Override public List getJacksonModules() + { + return Collections.emptyList(); + } + + @Override public void configure(Binder binder) + { + + } + } ); final ObjectMapper baseMapper = injector.getInstance(ObjectMapper.class); diff --git a/website/.spelling b/website/.spelling index eeba3cbf4bf..4434cd0b1cc 100644 --- a/website/.spelling +++ b/website/.spelling @@ -25,6 +25,7 @@ ACL ACLs APIs AvroStorage +ARN AWS AWS_CONTAINER_CREDENTIALS_RELATIVE_URI AWS_CONTAINER_CREDENTIALS_FULL_URI @@ -195,6 +196,8 @@ aggregator aggregators ambari analytics +assumeRoleArn +assumeRoleExternalId async authorizer authorizers