Add integration tests for S3 Assume Role ingestion feature (#11472)

* add IT for S3 assume role

* fix checkstyle

* fix test

* fix pom

* fix test
This commit is contained in:
Maytas Monsereenusorn 2021-07-23 10:09:09 +07:00 committed by GitHub
parent 9767b42e85
commit 161f4dbc0e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 502 additions and 26 deletions

View File

@ -409,6 +409,12 @@
<version>0.9.3</version>
<scope>compile</scope>
</dependency>
<!-- AWS STS SDK module, added with the S3 assume-role integration tests. Those tests expect failures
     surfaced as com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException.
     NOTE(review): runtime scope presumably because nothing here compiles against STS directly; confirm. -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
<version>${aws.sdk.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>

View File

@ -70,6 +70,9 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
private String cloudBucket;
private String cloudPath;
private String cloudRegion;
// Value of "s3_assume_role_with_external_id" in the config file; the assume-role tests send it as "assumeRoleArn".
private String s3AssumeRoleWithExternalId;
// Value of "s3_assume_role_external_id" in the config file; the external id paired with the role above.
private String s3AssumeRoleExternalId;
// Value of "s3_assume_role_without_external_id" in the config file; a role assumed without any external id.
private String s3AssumeRoleWithoutExternalId;
private String hadoopGcsCredentialsPath;
private String azureKey;
private String streamEndpoint;
@ -232,6 +235,10 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
cloudBucket = props.get("cloud_bucket");
cloudPath = props.get("cloud_path");
cloudRegion = props.get("cloud_region");
s3AssumeRoleWithExternalId = props.get("s3_assume_role_with_external_id");
s3AssumeRoleExternalId = props.get("s3_assume_role_external_id");
s3AssumeRoleWithoutExternalId = props.get("s3_assume_role_without_external_id");
hadoopGcsCredentialsPath = props.get("hadoopGcsCredentialsPath");
azureKey = props.get("azureKey");
streamEndpoint = props.get("stream_endpoint");
@ -483,6 +490,24 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
return cloudRegion;
}
/**
 * @return the value configured under "s3_assume_role_with_external_id", or null when it was not provided
 */
@Override
public String getS3AssumeRoleWithExternalId()
{
  return this.s3AssumeRoleWithExternalId;
}
/**
 * @return the value configured under "s3_assume_role_external_id", or null when it was not provided
 */
@Override
public String getS3AssumeRoleExternalId()
{
  return this.s3AssumeRoleExternalId;
}
/**
 * @return the value configured under "s3_assume_role_without_external_id", or null when it was not provided
 */
@Override
public String getS3AssumeRoleWithoutExternalId()
{
  return this.s3AssumeRoleWithoutExternalId;
}
@Override
public String getAzureKey()
{

View File

@ -57,6 +57,15 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
@JsonProperty
private String cloudRegion;
// Role identifier the assume-role tests send as "assumeRoleArn" together with an external id.
@JsonProperty
private String s3AssumeRoleWithExternalId;
// External id the assume-role tests send as "assumeRoleExternalId" for the role above.
@JsonProperty
private String s3AssumeRoleExternalId;
// Role identifier the assume-role tests send as "assumeRoleArn" with no external id.
@JsonProperty
private String s3AssumeRoleWithoutExternalId;
@JsonProperty
private String hadoopGcsCredentialsPath;
@ -390,6 +399,24 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
return cloudRegion;
}
/**
 * @return the configured {@code s3AssumeRoleWithExternalId} property, or null when it is unset
 */
@Override
public String getS3AssumeRoleWithExternalId()
{
  return this.s3AssumeRoleWithExternalId;
}
/**
 * @return the configured {@code s3AssumeRoleExternalId} property, or null when it is unset
 */
@Override
public String getS3AssumeRoleExternalId()
{
  return this.s3AssumeRoleExternalId;
}
/**
 * @return the configured {@code s3AssumeRoleWithoutExternalId} property, or null when it is unset
 */
@Override
public String getS3AssumeRoleWithoutExternalId()
{
  return this.s3AssumeRoleWithoutExternalId;
}
@Override
public String getAzureKey()
{

View File

@ -158,6 +158,12 @@ public interface IntegrationTestingConfig
String getCloudRegion();
/**
 * Role that the S3 assume-role tests pass as "assumeRoleArn" together with {@link #getS3AssumeRoleExternalId()}.
 * The tests are skipped when this returns null.
 */
String getS3AssumeRoleWithExternalId();
/**
 * External id that the S3 assume-role tests pass as "assumeRoleExternalId" for
 * {@link #getS3AssumeRoleWithExternalId()}. The tests are skipped when this returns null.
 */
String getS3AssumeRoleExternalId();
/**
 * Role that the S3 assume-role tests pass as "assumeRoleArn" without any external id.
 * The corresponding test is skipped when this returns null.
 */
String getS3AssumeRoleWithoutExternalId();
String getAzureKey();
String getHadoopGcsCredentialsPath();

View File

@ -0,0 +1,274 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.tests.indexer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.testng.Assert;
import org.testng.SkipException;
import java.io.Closeable;
import java.util.UUID;
import java.util.function.Function;
/**
 * Shared scenarios for integration tests that ingest the wikipedia sample data from S3 while assuming a role.
 *
 * Subclasses decide, through {@link #isSetS3OverrideCredentials()}, whether the generated input source config
 * also carries "accessKeyId"/"secretAccessKey" entries that point at the OVERRIDE_S3_ACCESS_KEY and
 * OVERRIDE_S3_SECRET_KEY environment variables. Every scenario throws {@link SkipException} when the role
 * settings it needs are not configured.
 */
public abstract class AbstractS3AssumeRoleIndexTest extends AbstractITBatchIndexTest
{
  private static final String INDEX_TASK_WITH_OVERRIDE = "/indexer/wikipedia_override_credentials_index_task.json";
  private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
  private static final String INPUT_SOURCE_OBJECTS_KEY = "objects";
  private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json";
  private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json";
  private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json";

  // Template for the "objects" list; %%BUCKET%% and %%PATH%% are substituted from the test config at run time.
  private static final ImmutableList<ImmutableMap<String, String>> INPUT_SOURCE_OBJECTS_VALUE = ImmutableList.of(
      ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%" + WIKIPEDIA_DATA_1),
      ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%" + WIKIPEDIA_DATA_2),
      ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%" + WIKIPEDIA_DATA_3)
  );

  /**
   * @return true when the task spec should carry explicit access/secret key entries read from the
   *         OVERRIDE_S3_ACCESS_KEY / OVERRIDE_S3_SECRET_KEY environment variables
   */
  abstract boolean isSetS3OverrideCredentials();

  /**
   * Submits the index task with the configured role and its matching external id and verifies, via
   * {@code doIndexTest}, that ingestion and the follow-up queries succeed.
   *
   * @throws SkipException if the role or the external id is not configured
   */
  void doTestS3WithValidAssumeRoleAndExternalIdShouldSucceed() throws Exception
  {
    if (config.getS3AssumeRoleExternalId() == null || config.getS3AssumeRoleWithExternalId() == null) {
      throw new SkipException("S3 Assume Role and external Id must be set for this test");
    }
    final String indexDatasource = "wikipedia_index_test_" + UUID.randomUUID();
    try (
        final Closeable ignored1 = unloader(indexDatasource + config.getExtraDatasourceNameSuffix());
    ) {
      doIndexTest(
          indexDatasource,
          INDEX_TASK_WITH_OVERRIDE,
          buildS3PropsTransform(config.getS3AssumeRoleWithExternalId(), config.getS3AssumeRoleExternalId()),
          INDEX_QUERIES_RESOURCE,
          false,
          true,
          true,
          new Pair<>(false, false)
      );
    }
  }

  /**
   * Submits the index task with the configured role but a random, non-matching external id and verifies that
   * the task fails with an error message mentioning {@code AWSSecurityTokenServiceException}.
   *
   * @throws SkipException if the role or the external id is not configured
   */
  void doTestS3WithAssumeRoleAndInvalidExternalIdShouldFail() throws Exception
  {
    if (config.getS3AssumeRoleExternalId() == null || config.getS3AssumeRoleWithExternalId() == null) {
      throw new SkipException("S3 Assume Role and external Id must be set for this test");
    }
    final String indexDatasource = "wikipedia_index_test_" + UUID.randomUUID();
    final String fullDatasourceName = indexDatasource + config.getExtraDatasourceNameSuffix();
    try {
      final String invalidExternalId = "RANDOM_INVALID_VALUE_" + UUID.randomUUID();
      final Function<String, String> s3PropsTransform = buildS3PropsTransform(
          config.getS3AssumeRoleWithExternalId(),
          invalidExternalId
      );
      final String taskSpec = s3PropsTransform.apply(
          StringUtils.replace(
              getResourceAsString(INDEX_TASK_WITH_OVERRIDE),
              "%%DATASOURCE%%",
              fullDatasourceName
          )
      );
      final String taskID = indexer.submitTask(taskSpec);
      indexer.waitUntilTaskFails(taskID);
      final TaskStatusPlus taskStatusPlus = indexer.getTaskStatus(taskID);
      // Index task is expected to fail as the external id is invalid
      Assert.assertEquals(taskStatusPlus.getStatusCode(), TaskState.FAILED);
      Assert.assertNotNull(taskStatusPlus.getErrorMsg());
      Assert.assertTrue(
          taskStatusPlus.getErrorMsg().contains("com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException"),
          "Expect task to fail with AWSSecurityTokenServiceException"
      );
    }
    finally {
      // When the test passes the task never created a datasource, so this unload is best-effort cleanup.
      closeQuietly(unloader(fullDatasourceName));
    }
  }

  /**
   * Submits the index task with the configured role that needs no external id and verifies, via
   * {@code doIndexTest}, that ingestion and the follow-up queries succeed.
   *
   * @throws SkipException if the role is not configured
   */
  void doTestS3WithValidAssumeRoleWithoutExternalIdShouldSucceed() throws Exception
  {
    if (config.getS3AssumeRoleWithoutExternalId() == null) {
      throw new SkipException("S3 Assume Role must be set for this test");
    }
    final String indexDatasource = "wikipedia_index_test_" + UUID.randomUUID();
    try (
        final Closeable ignored1 = unloader(indexDatasource + config.getExtraDatasourceNameSuffix());
    ) {
      doIndexTest(
          indexDatasource,
          INDEX_TASK_WITH_OVERRIDE,
          buildS3PropsTransform(config.getS3AssumeRoleWithoutExternalId(), null),
          INDEX_QUERIES_RESOURCE,
          false,
          true,
          true,
          new Pair<>(false, false)
      );
    }
  }

  /**
   * Builds the function that fills the S3 placeholders of the task spec template.
   *
   * @param assumeRoleArn        value written as "assumeRoleArn" in the input source config
   * @param assumeRoleExternalId value written as "assumeRoleExternalId"; the entry is omitted when null
   * @return a transform replacing %%INPUT_SOURCE_CONFIG%%, %%INPUT_SOURCE_TYPE%%,
   *         %%INPUT_SOURCE_PROPERTY_KEY%% and %%INPUT_SOURCE_PROPERTY_VALUE%% in the given spec
   */
  private Function<String, String> buildS3PropsTransform(
      final String assumeRoleArn,
      final String assumeRoleExternalId
  )
  {
    return spec -> {
      try {
        String inputSourceValue = jsonMapper.writeValueAsString(INPUT_SOURCE_OBJECTS_VALUE);
        inputSourceValue = StringUtils.replace(inputSourceValue, "%%BUCKET%%", config.getCloudBucket());
        inputSourceValue = StringUtils.replace(inputSourceValue, "%%PATH%%", config.getCloudPath());

        final ImmutableMap.Builder<String, Object> s3ConfigMap = ImmutableMap.builder();
        if (isSetS3OverrideCredentials()) {
          s3ConfigMap.put("accessKeyId", ImmutableMap.of("type", "environment", "variable", "OVERRIDE_S3_ACCESS_KEY"));
          s3ConfigMap.put("secretAccessKey", ImmutableMap.of("type", "environment", "variable", "OVERRIDE_S3_SECRET_KEY"));
        }
        s3ConfigMap.put("assumeRoleArn", assumeRoleArn);
        if (assumeRoleExternalId != null) {
          s3ConfigMap.put("assumeRoleExternalId", assumeRoleExternalId);
        }

        String filledSpec = StringUtils.replace(
            spec,
            "%%INPUT_SOURCE_CONFIG%%",
            jsonMapper.writeValueAsString(s3ConfigMap.build())
        );
        filledSpec = StringUtils.replace(filledSpec, "%%INPUT_SOURCE_TYPE%%", "s3");
        filledSpec = StringUtils.replace(filledSpec, "%%INPUT_SOURCE_PROPERTY_KEY%%", INPUT_SOURCE_OBJECTS_KEY);
        return StringUtils.replace(filledSpec, "%%INPUT_SOURCE_PROPERTY_VALUE%%", inputSourceValue);
      }
      catch (Exception e) {
        throw new RuntimeException(e);
      }
    };
  }

  /**
   * Closes the given resource, deliberately discarding any failure.
   *
   * @param closeable resource to close; may be null
   */
  private void closeQuietly(Closeable closeable)
  {
    try {
      if (closeable != null) {
        closeable.close();
      }
    }
    catch (Exception ignored) {
      // Best-effort cleanup: a failure to unload must not mask the outcome of the test itself.
    }
  }
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.tests.indexer;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.tests.TestNGGroup;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
/**
 * IMPORTANT:
 * To run this test, you must:
 * 1) Set the bucket and path for your data. This can be done by setting -Ddruid.test.config.cloudBucket and
 *    -Ddruid.test.config.cloudPath or setting "cloud_bucket" and "cloud_path" in the config file.
 * 2) Copy wikipedia_index_data1.json, wikipedia_index_data2.json, and wikipedia_index_data3.json
 *    located in integration-tests/src/test/resources/data/batch_index/json to your S3 at the location set in step 1.
 * 3) Provide -Doverride.config.path=<PATH_TO_FILE> with s3 credentials/configs set. See
 *    integration-tests/docker/environment-configs/override-examples/s3 for env vars to provide.
 *    Note that druid_s3_accessKey and druid_s3_secretKey should be set to credentials that have
 *    access to the roles in #4.
 * 4) Set the assume role configs. This can be done by setting
 *    -Ddruid.test.config.s3AssumeRoleWithExternalId or setting "s3_assume_role_with_external_id" in the config file.
 *    -Ddruid.test.config.s3AssumeRoleExternalId or setting "s3_assume_role_external_id" in the config file.
 *    -Ddruid.test.config.s3AssumeRoleWithoutExternalId or setting "s3_assume_role_without_external_id" in the config file.
 *    The credentials provided in #3 must be able to assume these roles.
 *    These roles must also have access to the bucket and path for your data in #1.
 *    (s3AssumeRoleExternalId is the external id for s3AssumeRoleWithExternalId, while s3AssumeRoleWithoutExternalId
 *    should not have external id set)
 *
 * NOTE: Tests in this class will be skipped if properties in #4 are not set.
 */
// Grouped under S3_INGESTION to match ITS3AssumeRoleWithOverrideCredentialsIndexTest, which runs the same scenarios.
@Test(groups = TestNGGroup.S3_INGESTION)
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITS3AssumeRoleIndexTest extends AbstractS3AssumeRoleIndexTest
{
  /**
   * @return false, so the task spec carries no explicit access/secret key entries
   */
  @Override
  public boolean isSetS3OverrideCredentials()
  {
    return false;
  }

  @Test
  public void testS3WithValidAssumeRoleAndExternalIdShouldSucceed() throws Exception
  {
    doTestS3WithValidAssumeRoleAndExternalIdShouldSucceed();
  }

  @Test
  public void testS3WithAssumeRoleAndInvalidExternalIdShouldFail() throws Exception
  {
    doTestS3WithAssumeRoleAndInvalidExternalIdShouldFail();
  }

  @Test
  public void testS3WithValidAssumeRoleWithoutExternalIdShouldSucceed() throws Exception
  {
    doTestS3WithValidAssumeRoleWithoutExternalIdShouldSucceed();
  }
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.tests.indexer;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.tests.TestNGGroup;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
/**
 * Runs the S3 assume-role ingestion scenarios with explicit override credentials in the task spec.
 *
 * IMPORTANT - prerequisites for running this test:
 * 1) Set the bucket and path for your data, either with -Ddruid.test.config.cloudBucket and
 *    -Ddruid.test.config.cloudPath or with "cloud_bucket" and "cloud_path" in the config file.
 * 2) Copy wikipedia_index_data1.json, wikipedia_index_data2.json, and wikipedia_index_data3.json
 *    from integration-tests/src/test/resources/data/batch_index/json to the S3 location chosen in step 1.
 * 3) Provide -Doverride.config.path=<PATH_TO_FILE> with s3 credentials/configs set. See
 *    integration-tests/docker/environment-configs/override-examples/s3 for env vars to provide.
 *    druid_s3_accessKey and druid_s3_secretKey should be unset, or set to credentials that do not have
 *    access to the roles. The credentials that do have access to the roles go into the env variables
 *    OVERRIDE_S3_ACCESS_KEY and OVERRIDE_S3_SECRET_KEY.
 * 4) Set the assume role configs:
 *    -Ddruid.test.config.s3AssumeRoleWithExternalId or "s3_assume_role_with_external_id" in the config file.
 *    -Ddruid.test.config.s3AssumeRoleExternalId or "s3_assume_role_external_id" in the config file.
 *    -Ddruid.test.config.s3AssumeRoleWithoutExternalId or "s3_assume_role_without_external_id" in the config file.
 *    The credentials in OVERRIDE_S3_ACCESS_KEY and OVERRIDE_S3_SECRET_KEY must be able to assume these roles,
 *    and the roles must have access to the bucket and path from step 1.
 *    (s3AssumeRoleExternalId is the external id for s3AssumeRoleWithExternalId, while
 *    s3AssumeRoleWithoutExternalId should not have an external id set.)
 *
 * NOTE: Tests in this class will be skipped if the properties in step 4 are not set.
 */
@Test(groups = TestNGGroup.S3_INGESTION)
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITS3AssumeRoleWithOverrideCredentialsIndexTest extends AbstractS3AssumeRoleIndexTest
{
  /**
   * @return true, so the task spec includes the access/secret key entries backed by the override env variables
   */
  @Override
  public boolean isSetS3OverrideCredentials()
  {
    return true;
  }

  /** Valid role plus matching external id. */
  @Test
  public void testS3WithValidAssumeRoleAndExternalIdUsingOverrideCredentialsShouldSucceed() throws Exception
  {
    doTestS3WithValidAssumeRoleAndExternalIdShouldSucceed();
  }

  /** Valid role plus a non-matching external id. */
  @Test
  public void testS3WithAssumeRoleAndInvalidExternalIdUsingOverrideCredentialsShouldFail() throws Exception
  {
    doTestS3WithAssumeRoleAndInvalidExternalIdShouldFail();
  }

  /** Valid role that requires no external id. */
  @Test
  public void testS3WithValidAssumeRoleWithoutExternalIdUsingOverrideCredentialsShouldSucceed() throws Exception
  {
    doTestS3WithValidAssumeRoleWithoutExternalIdShouldSucceed();
  }
}

View File

@ -86,22 +86,16 @@ public class ITS3OverrideCredentialsIndexTest extends AbstractITBatchIndexTest
"%%PATH%%",
config.getCloudPath()
);
spec = StringUtils.replace(
spec,
"%%ACCESS_KEY_PROPERTY_VALUE%%",
"%%INPUT_SOURCE_CONFIG%%",
jsonMapper.writeValueAsString(
ImmutableMap.of("type", "environment", "variable", "OVERRIDE_S3_ACCESS_KEY")
ImmutableMap.of(
"accessKeyId", ImmutableMap.of("type", "environment", "variable", "OVERRIDE_S3_ACCESS_KEY"),
"secretAccessKey", ImmutableMap.of("type", "environment", "variable", "OVERRIDE_S3_SECRET_KEY")
)
)
);
spec = StringUtils.replace(
spec,
"%%SECRET_KEY_PROPERTY_VALUE%%",
jsonMapper.writeValueAsString(
ImmutableMap.of("type", "environment", "variable", "OVERRIDE_S3_SECRET_KEY")
)
);
spec = StringUtils.replace(
spec,
"%%INPUT_SOURCE_TYPE%%",
@ -219,22 +213,16 @@ public class ITS3OverrideCredentialsIndexTest extends AbstractITBatchIndexTest
"%%PATH%%",
config.getCloudPath()
);
spec = StringUtils.replace(
spec,
"%%ACCESS_KEY_PROPERTY_VALUE%%",
"%%INPUT_SOURCE_CONFIG%%",
jsonMapper.writeValueAsString(
ImmutableMap.of("type", "environment", "variable", "NON_EXISTENT_INVALID_ENV_VAR")
ImmutableMap.of(
"accessKeyId", ImmutableMap.of("type", "environment", "variable", "INVALID_ACCESS_KEY"),
"secretAccessKey", ImmutableMap.of("type", "environment", "variable", "INVALID_SECRET_KEY")
)
)
);
spec = StringUtils.replace(
spec,
"%%SECRET_KEY_PROPERTY_VALUE%%",
jsonMapper.writeValueAsString(
ImmutableMap.of("type", "environment", "variable", "NON_EXISTENT_INVALID_ENV_VAR")
)
);
spec = StringUtils.replace(
spec,
"%%INPUT_SOURCE_TYPE%%",

View File

@ -68,10 +68,7 @@
"type": "index",
"inputSource": {
"type": "%%INPUT_SOURCE_TYPE%%",
"properties": {
"accessKeyId": %%ACCESS_KEY_PROPERTY_VALUE%%,
"secretAccessKey": %%SECRET_KEY_PROPERTY_VALUE%%
},
"properties": %%INPUT_SOURCE_CONFIG%%,
"%%INPUT_SOURCE_PROPERTY_KEY%%": %%INPUT_SOURCE_PROPERTY_VALUE%%
},
"inputFormat": {