mirror of https://github.com/apache/druid.git
S3 ingestion can assume role (#10995)
* feature s3 assume role * feature s3 assume role * feature s3 assume role * feature s3 assume role * feature s3 assume role * feature s3 assume role * tests fix * spelling fix * sts fix Co-authored-by: egor-ryashin <egor.ryashin@rilldata.com>
This commit is contained in:
parent
651810f9e5
commit
9047fa3d9c
|
@ -894,6 +894,47 @@ Sample specs:
|
|||
...
|
||||
```
|
||||
|
||||
```json
|
||||
...
|
||||
"ioConfig": {
|
||||
"type": "index_parallel",
|
||||
"inputSource": {
|
||||
"type": "s3",
|
||||
"uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"],
|
||||
"properties": {
|
||||
"accessKeyId": "KLJ78979SDFdS2",
|
||||
"secretAccessKey": "KLS89s98sKJHKJKJH8721lljkd"
|
||||
}
|
||||
},
|
||||
"inputFormat": {
|
||||
"type": "json"
|
||||
},
|
||||
...
|
||||
},
|
||||
...
|
||||
```
|
||||
|
||||
```json
|
||||
...
|
||||
"ioConfig": {
|
||||
"type": "index_parallel",
|
||||
"inputSource": {
|
||||
"type": "s3",
|
||||
"uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"],
|
||||
"properties": {
|
||||
"accessKeyId": "KLJ78979SDFdS2",
|
||||
"secretAccessKey": "KLS89s98sKJHKJKJH8721lljkd",
|
||||
"assumeRoleArn": "arn:aws:iam::2981002874992:role/role-s3"
|
||||
}
|
||||
},
|
||||
"inputFormat": {
|
||||
"type": "json"
|
||||
},
|
||||
...
|
||||
},
|
||||
...
|
||||
```
|
||||
|
||||
|property|description|default|required?|
|
||||
|--------|-----------|-------|---------|
|
||||
|type|This should be `s3`.|None|yes|
|
||||
|
@ -917,6 +958,8 @@ Properties Object:
|
|||
|--------|-----------|-------|---------|
|
||||
|accessKeyId|The [Password Provider](../operations/password-provider.md) or plain text string of this S3 InputSource's access key|None|yes if secretAccessKey is given|
|
||||
|secretAccessKey|The [Password Provider](../operations/password-provider.md) or plain text string of this S3 InputSource's secret key|None|yes if accessKeyId is given|
|
||||
|assumeRoleArn|AWS ARN of the role to assume [see](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_request.html). **assumeRoleArn** can be used either with the ingestion spec AWS credentials or with the default S3 credentials|None|no|
|
||||
|assumeRoleExternalId|A unique identifier that might be required when you assume a role in another account [see](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_request.html)|None|no|
|
||||
|
||||
**Note :** *If accessKeyId and secretAccessKey are not given, the default [S3 credentials provider chain](../development/extensions-core/s3.md#s3-authentication-methods) is used.*
|
||||
|
||||
|
|
|
@ -115,8 +115,8 @@
|
|||
<dependency>
|
||||
<groupId>com.amazonaws</groupId>
|
||||
<artifactId>aws-java-sdk-sts</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<version>${aws.sdk.version}</version>
|
||||
</dependency>
|
||||
<!-- Tests -->
|
||||
<dependency>
|
||||
<groupId>org.apache.druid</groupId>
|
||||
|
|
|
@ -19,12 +19,17 @@
|
|||
|
||||
package org.apache.druid.data.input.s3;
|
||||
|
||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||
import com.amazonaws.auth.AWSStaticCredentialsProvider;
|
||||
import com.amazonaws.auth.BasicAWSCredentials;
|
||||
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
|
||||
import com.amazonaws.services.s3.model.S3ObjectSummary;
|
||||
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
|
||||
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
|
||||
import com.fasterxml.jackson.annotation.JacksonInject;
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.base.Suppliers;
|
||||
|
@ -35,6 +40,7 @@ import org.apache.druid.data.input.SplitHintSpec;
|
|||
import org.apache.druid.data.input.impl.CloudObjectInputSource;
|
||||
import org.apache.druid.data.input.impl.CloudObjectLocation;
|
||||
import org.apache.druid.data.input.impl.SplittableInputSource;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.storage.s3.S3InputDataConfig;
|
||||
import org.apache.druid.storage.s3.S3StorageDruidModule;
|
||||
import org.apache.druid.storage.s3.S3Utils;
|
||||
|
@ -47,6 +53,7 @@ import java.net.URI;
|
|||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
|
@ -59,6 +66,7 @@ public class S3InputSource extends CloudObjectInputSource
|
|||
@JsonProperty("properties")
|
||||
private final S3InputSourceConfig s3InputSourceConfig;
|
||||
private final S3InputDataConfig inputDataConfig;
|
||||
private final AWSCredentialsProvider awsCredentialsProvider;
|
||||
|
||||
/**
|
||||
* Constructor for S3InputSource
|
||||
|
@ -84,7 +92,8 @@ public class S3InputSource extends CloudObjectInputSource
|
|||
@JsonProperty("uris") @Nullable List<URI> uris,
|
||||
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
|
||||
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
|
||||
@JsonProperty("properties") @Nullable S3InputSourceConfig s3InputSourceConfig
|
||||
@JsonProperty("properties") @Nullable S3InputSourceConfig s3InputSourceConfig,
|
||||
@JacksonInject AWSCredentialsProvider awsCredentialsProvider
|
||||
)
|
||||
{
|
||||
super(S3StorageDruidModule.SCHEME, uris, prefixes, objects);
|
||||
|
@ -95,13 +104,19 @@ public class S3InputSource extends CloudObjectInputSource
|
|||
() -> {
|
||||
if (s3ClientBuilder != null && s3InputSourceConfig != null) {
|
||||
if (s3InputSourceConfig.isCredentialsConfigured()) {
|
||||
AWSStaticCredentialsProvider credentials = new AWSStaticCredentialsProvider(
|
||||
new BasicAWSCredentials(
|
||||
s3InputSourceConfig.getAccessKeyId().getPassword(),
|
||||
s3InputSourceConfig.getSecretAccessKey().getPassword()
|
||||
)
|
||||
);
|
||||
s3ClientBuilder.getAmazonS3ClientBuilder().withCredentials(credentials);
|
||||
if (s3InputSourceConfig.getAssumeRoleArn() == null) {
|
||||
s3ClientBuilder
|
||||
.getAmazonS3ClientBuilder()
|
||||
.withCredentials(createStaticCredentialsProvider(s3InputSourceConfig));
|
||||
} else {
|
||||
applyAssumeRole(
|
||||
s3ClientBuilder,
|
||||
s3InputSourceConfig,
|
||||
createStaticCredentialsProvider(s3InputSourceConfig)
|
||||
);
|
||||
}
|
||||
} else {
|
||||
applyAssumeRole(s3ClientBuilder, s3InputSourceConfig, awsCredentialsProvider);
|
||||
}
|
||||
return s3ClientBuilder.build();
|
||||
} else {
|
||||
|
@ -109,6 +124,56 @@ public class S3InputSource extends CloudObjectInputSource
|
|||
}
|
||||
}
|
||||
);
|
||||
this.awsCredentialsProvider = awsCredentialsProvider;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public S3InputSource(
|
||||
ServerSideEncryptingAmazonS3 s3Client,
|
||||
ServerSideEncryptingAmazonS3.Builder s3ClientBuilder,
|
||||
S3InputDataConfig inputDataConfig,
|
||||
List<URI> uris,
|
||||
List<URI> prefixes,
|
||||
List<CloudObjectLocation> objects,
|
||||
S3InputSourceConfig s3InputSourceConfig
|
||||
)
|
||||
{
|
||||
this(s3Client, s3ClientBuilder, inputDataConfig, uris, prefixes, objects, s3InputSourceConfig, null);
|
||||
}
|
||||
|
||||
private void applyAssumeRole(
|
||||
ServerSideEncryptingAmazonS3.Builder s3ClientBuilder,
|
||||
S3InputSourceConfig s3InputSourceConfig,
|
||||
AWSCredentialsProvider awsCredentialsProvider
|
||||
)
|
||||
{
|
||||
String assumeRoleArn = s3InputSourceConfig.getAssumeRoleArn();
|
||||
if (assumeRoleArn != null) {
|
||||
String roleSessionName = StringUtils.format("druid-s3-input-source-%s", UUID.randomUUID().toString());
|
||||
AWSSecurityTokenService securityTokenService = AWSSecurityTokenServiceClientBuilder.standard()
|
||||
.withCredentials(awsCredentialsProvider)
|
||||
.build();
|
||||
STSAssumeRoleSessionCredentialsProvider.Builder roleCredentialsProviderBuilder;
|
||||
roleCredentialsProviderBuilder = new STSAssumeRoleSessionCredentialsProvider
|
||||
.Builder(assumeRoleArn, roleSessionName).withStsClient(securityTokenService);
|
||||
|
||||
if (s3InputSourceConfig.getAssumeRoleExternalId() != null) {
|
||||
roleCredentialsProviderBuilder.withExternalId(s3InputSourceConfig.getAssumeRoleExternalId());
|
||||
}
|
||||
|
||||
s3ClientBuilder.getAmazonS3ClientBuilder().withCredentials(roleCredentialsProviderBuilder.build());
|
||||
}
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
private AWSStaticCredentialsProvider createStaticCredentialsProvider(S3InputSourceConfig s3InputSourceConfig)
|
||||
{
|
||||
return new AWSStaticCredentialsProvider(
|
||||
new BasicAWSCredentials(
|
||||
s3InputSourceConfig.getAccessKeyId().getPassword(),
|
||||
s3InputSourceConfig.getSecretAccessKey().getPassword()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
|
@ -149,7 +214,8 @@ public class S3InputSource extends CloudObjectInputSource
|
|||
null,
|
||||
null,
|
||||
split.get(),
|
||||
getS3InputSourceConfig()
|
||||
getS3InputSourceConfig(),
|
||||
awsCredentialsProvider
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -34,30 +34,50 @@ import java.util.Objects;
|
|||
*/
|
||||
public class S3InputSourceConfig
|
||||
{
|
||||
@JsonProperty
|
||||
private String assumeRoleArn;
|
||||
@JsonProperty
|
||||
private String assumeRoleExternalId;
|
||||
@JsonProperty
|
||||
private PasswordProvider accessKeyId;
|
||||
@JsonProperty
|
||||
private PasswordProvider secretAccessKey;
|
||||
|
||||
@JsonCreator
|
||||
public S3InputSourceConfig(
|
||||
@JsonProperty("accessKeyId") @Nullable PasswordProvider accessKeyId,
|
||||
@JsonProperty("secretAccessKey") @Nullable PasswordProvider secretAccessKey
|
||||
@JsonProperty("secretAccessKey") @Nullable PasswordProvider secretAccessKey,
|
||||
@JsonProperty("assumeRoleArn") @Nullable String assumeRoleArn,
|
||||
@JsonProperty("assumeRoleExternalId") @Nullable String assumeRoleExternalId
|
||||
)
|
||||
{
|
||||
this.assumeRoleArn = assumeRoleArn;
|
||||
this.assumeRoleExternalId = assumeRoleExternalId;
|
||||
if (accessKeyId != null || secretAccessKey != null) {
|
||||
this.accessKeyId = Preconditions.checkNotNull(accessKeyId, "accessKeyId cannot be null if secretAccessKey is given");
|
||||
this.secretAccessKey = Preconditions.checkNotNull(secretAccessKey, "secretAccessKey cannot be null if accessKeyId is given");
|
||||
}
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
private PasswordProvider accessKeyId;
|
||||
|
||||
@JsonProperty
|
||||
private PasswordProvider secretAccessKey;
|
||||
@Nullable
|
||||
public String getAssumeRoleArn()
|
||||
{
|
||||
return assumeRoleArn;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public String getAssumeRoleExternalId()
|
||||
{
|
||||
return assumeRoleExternalId;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public PasswordProvider getAccessKeyId()
|
||||
{
|
||||
return accessKeyId;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public PasswordProvider getSecretAccessKey()
|
||||
{
|
||||
return secretAccessKey;
|
||||
|
@ -76,6 +96,8 @@ public class S3InputSourceConfig
|
|||
return "S3InputSourceConfig{" +
|
||||
"accessKeyId=" + accessKeyId +
|
||||
", secretAccessKey=" + secretAccessKey +
|
||||
", assumeRoleArn=" + assumeRoleArn +
|
||||
", assumeRoleExternalId=" + assumeRoleExternalId +
|
||||
'}';
|
||||
}
|
||||
|
||||
|
@ -90,12 +112,14 @@ public class S3InputSourceConfig
|
|||
}
|
||||
S3InputSourceConfig that = (S3InputSourceConfig) o;
|
||||
return Objects.equals(accessKeyId, that.accessKeyId) &&
|
||||
Objects.equals(secretAccessKey, that.secretAccessKey);
|
||||
Objects.equals(secretAccessKey, that.secretAccessKey) &&
|
||||
Objects.equals(assumeRoleArn, that.assumeRoleArn) &&
|
||||
Objects.equals(assumeRoleExternalId, that.assumeRoleExternalId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(accessKeyId, secretAccessKey);
|
||||
return Objects.hash(accessKeyId, secretAccessKey, assumeRoleArn, assumeRoleExternalId);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.apache.druid.data.input.s3;
|
||||
|
||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import com.amazonaws.services.s3.AmazonS3Client;
|
||||
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
|
||||
|
@ -40,6 +41,7 @@ import com.google.inject.Binder;
|
|||
import com.google.inject.Guice;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Provides;
|
||||
import org.apache.druid.common.aws.AWSCredentialsUtils;
|
||||
import org.apache.druid.data.input.ColumnsFilter;
|
||||
import org.apache.druid.data.input.InputRow;
|
||||
import org.apache.druid.data.input.InputRowSchema;
|
||||
|
@ -120,7 +122,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
|
|||
);
|
||||
|
||||
private static final S3InputSourceConfig CLOUD_CONFIG_PROPERTIES = new S3InputSourceConfig(
|
||||
new DefaultPasswordProvider("myKey"), new DefaultPasswordProvider("mySecret"));
|
||||
new DefaultPasswordProvider("myKey"), new DefaultPasswordProvider("mySecret"), null, null);
|
||||
|
||||
private static final List<CloudObjectLocation> EXPECTED_LOCATION =
|
||||
ImmutableList.of(new CloudObjectLocation("foo", "bar/file.csv"));
|
||||
|
@ -221,6 +223,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
|
|||
{
|
||||
S3InputSourceConfig mockConfigPropertiesWithoutKeyAndSecret = EasyMock.createMock(S3InputSourceConfig.class);
|
||||
EasyMock.reset(mockConfigPropertiesWithoutKeyAndSecret);
|
||||
EasyMock.expect(mockConfigPropertiesWithoutKeyAndSecret.getAssumeRoleArn()).andStubReturn(null);
|
||||
EasyMock.expect(mockConfigPropertiesWithoutKeyAndSecret.isCredentialsConfigured())
|
||||
.andStubReturn(false);
|
||||
EasyMock.replay(mockConfigPropertiesWithoutKeyAndSecret);
|
||||
|
@ -279,6 +282,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
|
|||
null,
|
||||
EXPECTED_LOCATION,
|
||||
null
|
||||
|
||||
);
|
||||
final S3InputSource serdeWithPrefixes =
|
||||
MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class);
|
||||
|
@ -664,7 +668,25 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
|
|||
DruidModule baseModule = new TestS3Module();
|
||||
final Injector injector = Guice.createInjector(
|
||||
new ObjectMapperModule(),
|
||||
baseModule
|
||||
baseModule,
|
||||
new DruidModule()
|
||||
{
|
||||
@Provides
|
||||
public AWSCredentialsProvider getAWSCredentialsProvider()
|
||||
{
|
||||
return AWSCredentialsUtils.defaultAWSCredentialsProviderChain(null);
|
||||
}
|
||||
|
||||
@Override public List<? extends Module> getJacksonModules()
|
||||
{
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override public void configure(Binder binder)
|
||||
{
|
||||
|
||||
}
|
||||
}
|
||||
);
|
||||
final ObjectMapper baseMapper = injector.getInstance(ObjectMapper.class);
|
||||
|
||||
|
|
|
@ -25,6 +25,7 @@ ACL
|
|||
ACLs
|
||||
APIs
|
||||
AvroStorage
|
||||
ARN
|
||||
AWS
|
||||
AWS_CONTAINER_CREDENTIALS_RELATIVE_URI
|
||||
AWS_CONTAINER_CREDENTIALS_FULL_URI
|
||||
|
@ -195,6 +196,8 @@ aggregator
|
|||
aggregators
|
||||
ambari
|
||||
analytics
|
||||
assumeRoleArn
|
||||
assumeRoleExternalId
|
||||
async
|
||||
authorizer
|
||||
authorizers
|
||||
|
|
Loading…
Reference in New Issue