S3 ingestion spec should not uses the default credentials provider chain when environment value password provider is misconfigured. (#9552)

* fix s3 optional cred

* S3 ingestion spec uses the default credentials provider chain when environment value password provider is misconfigured.

* fix failing test
This commit is contained in:
Maytas Monsereenusorn 2020-03-24 15:09:02 -07:00 committed by GitHub
parent e1b201c279
commit 3f521943fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 428 additions and 18 deletions

View File

@ -344,7 +344,7 @@ jobs:
name: "(Compile=openjdk8, Run=openjdk8) other integration test"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage' JVM_RUNTIME='-Djvm.runtime=8'
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion' JVM_RUNTIME='-Djvm.runtime=8'
script: *run_integration_test
after_failure: *integration_test_diags
# END - Integration tests for Compile with Java 8 and Run with Java 8
@ -383,7 +383,7 @@ jobs:
- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk11) other integration test"
jdk: openjdk8
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage' JVM_RUNTIME='-Djvm.runtime=11'
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion' JVM_RUNTIME='-Djvm.runtime=11'
# END - Integration tests for Compile with Java 8 and Run with Java 11
- name: "security vulnerabilities"

View File

@ -67,9 +67,7 @@ public class S3InputSourceConfig
public boolean isCredentialsConfigured()
{
return accessKeyId != null &&
accessKeyId.getPassword() != null &&
secretAccessKey != null &&
secretAccessKey.getPassword() != null;
secretAccessKey != null;
}
@Override

View File

@ -37,6 +37,12 @@
<artifactId>druid-core</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>${aws.sdk.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-s3-extensions</artifactId>

View File

@ -25,6 +25,7 @@ import com.google.common.base.Predicates;
import com.google.inject.Inject;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
@ -108,7 +109,7 @@ public class OverlordResourceTestClient
}
}
public TaskState getTaskStatus(String taskID)
public TaskStatusPlus getTaskStatus(String taskID)
{
try {
StatusResponseHolder response = makeRequest(
@ -127,7 +128,7 @@ public class OverlordResourceTestClient
{
}
);
return taskStatusResponse.getStatus().getStatusCode();
return taskStatusResponse.getStatus();
}
catch (Exception e) {
throw new RuntimeException(e);
@ -186,7 +187,7 @@ public class OverlordResourceTestClient
@Override
public Boolean call()
{
TaskState status = getTaskStatus(taskID);
TaskState status = getTaskStatus(taskID).getStatusCode();
if (status == TaskState.FAILED) {
throw new ISE("Indexer task FAILED");
}
@ -200,6 +201,34 @@ public class OverlordResourceTestClient
);
}
public void waitUntilTaskFails(final String taskID)
{
waitUntilTaskFails(taskID, 10000, 60);
}
public void waitUntilTaskFails(final String taskID, final int millisEach, final int numTimes)
{
ITRetryUtil.retryUntil(
new Callable<Boolean>()
{
@Override
public Boolean call()
{
TaskState status = getTaskStatus(taskID).getStatusCode();
if (status == TaskState.SUCCESS) {
throw new ISE("Indexer task SUCCEED");
}
return status == TaskState.FAILED;
}
},
true,
millisEach,
numTimes,
taskID
);
}
public String submitSupervisor(String spec)
{
try {

View File

@ -54,5 +54,8 @@ public class TestNGGroup
// See integration-tests/docker/environment-configs/override-examples/hdfs for env vars to provide.
// Additionally, hadoop docker must be started by passing -Dstart.hadoop.docker=true to mvn.
public static final String HDFS_DEEP_STORAGE = "hdfs-deep-storage";
// This group is not part of CI. To run this group, s3 configs/credentials for your s3 must be provided in a file.
// The path of the file must then be pass to mvn with -Doverride.config.path=<PATH_TO_FILE>
// See integration-tests/docker/environment-configs/override-examples/s3 for env vars to provide.
public static final String S3_INGESTION = "s3-ingestion";
}

View File

@ -31,7 +31,7 @@ import java.util.List;
import java.util.UUID;
import java.util.function.Function;
public abstract class AbstractAzureInputSourceSimpleIndexTest extends AbstractITBatchIndexTest
public abstract class AbstractAzureInputSourceParallelIndexTest extends AbstractITBatchIndexTest
{
private static final String INDEX_TASK = "/indexer/wikipedia_cloud_index_task.json";
private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";

View File

@ -31,7 +31,7 @@ import java.util.List;
import java.util.UUID;
import java.util.function.Function;
public abstract class AbstractGcsInputSourceSimpleIndexTest extends AbstractITBatchIndexTest
public abstract class AbstractGcsInputSourceParallelIndexTest extends AbstractITBatchIndexTest
{
private static final String INDEX_TASK = "/indexer/wikipedia_cloud_index_task.json";
private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";

View File

@ -31,7 +31,7 @@ import java.util.List;
import java.util.UUID;
import java.util.function.Function;
public abstract class AbstractS3InputSourceSimpleIndexTest extends AbstractITBatchIndexTest
public abstract class AbstractS3InputSourceParallelIndexTest extends AbstractITBatchIndexTest
{
private static final String INDEX_TASK = "/indexer/wikipedia_cloud_index_task.json";
private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";

View File

@ -39,7 +39,7 @@ import java.util.List;
*/
@Test(groups = TestNGGroup.AZURE_DEEP_STORAGE)
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITAzureToAzureParallelIndexTest extends AbstractAzureInputSourceSimpleIndexTest
public class ITAzureToAzureParallelIndexTest extends AbstractAzureInputSourceParallelIndexTest
{
@Test(dataProvider = "resources")
public void testAzureIndexData(Pair<String, List> azureInputSource) throws Exception

View File

@ -41,7 +41,7 @@ import java.util.List;
*/
@Test(groups = TestNGGroup.HDFS_DEEP_STORAGE)
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITAzureToHdfsParallelIndexTest extends AbstractAzureInputSourceSimpleIndexTest
public class ITAzureToHdfsParallelIndexTest extends AbstractAzureInputSourceParallelIndexTest
{
@Test(dataProvider = "resources")
public void testAzureIndexData(Pair<String, List> azureInputSource) throws Exception

View File

@ -40,7 +40,7 @@ import java.util.List;
*/
@Test(groups = TestNGGroup.GCS_DEEP_STORAGE)
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITGcsToGcsParallelIndexTest extends AbstractGcsInputSourceSimpleIndexTest
public class ITGcsToGcsParallelIndexTest extends AbstractGcsInputSourceParallelIndexTest
{
@Test(dataProvider = "resources")
public void testGcsIndexData(Pair<String, List> gcsInputSource) throws Exception

View File

@ -42,7 +42,7 @@ import java.util.List;
*/
@Test(groups = TestNGGroup.HDFS_DEEP_STORAGE)
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITGcsToHdfsParallelIndexTest extends AbstractGcsInputSourceSimpleIndexTest
public class ITGcsToHdfsParallelIndexTest extends AbstractGcsInputSourceParallelIndexTest
{
@Test(dataProvider = "resources")
public void testGcsIndexData(Pair<String, List> gcsInputSource) throws Exception

View File

@ -0,0 +1,289 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.tests.indexer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.tests.TestNGGroup;
import org.testng.Assert;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.Closeable;
import java.util.UUID;
import java.util.function.Function;
/**
* IMPORTANT:
* To run this test, you must:
* 1) Set the bucket and path for your data. This can be done by setting -Ddruid.test.config.cloudBucket and
* -Ddruid.test.config.cloudPath or setting "cloud_bucket" and "cloud_path" in the config file.
* 2) Copy wikipedia_index_data1.json, wikipedia_index_data2.json, and wikipedia_index_data3.json
* located in integration-tests/src/test/resources/data/batch_index to your S3 at the location set in step 1.
* 3) Provide -Doverride.config.path=<PATH_TO_FILE> with s3 credentials/configs set. See
* integration-tests/docker/environment-configs/override-examples/s3 for env vars to provide.
* Note that druid_s3_accessKey and druid_s3_secretKey should be unset or set to credentials that does not have
* access to the bucket and path specified in #1. The credentials that does have access to the bucket and path
* specified in #1 should be set to the env variable OVERRIDE_S3_ACCESS_KEY and OVERRIDE_S3_SECRET_KEY
*/
@Test(groups = TestNGGroup.S3_INGESTION)
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITS3OverrideCredentialsIndexTest extends AbstractITBatchIndexTest
{
private static final String INDEX_TASK_WITH_OVERRIDE = "/indexer/wikipedia_override_credentials_index_task.json";
private static final String INDEX_TASK_WITHOUT_OVERRIDE = "/indexer/wikipedia_cloud_simple_index_task.json";
private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
private static final String INDEX_DATASOURCE = "wikipedia_index_test_" + UUID.randomUUID();
private static final String INPUT_SOURCE_OBJECTS_KEY = "objects";
private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json";
private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json";
private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json";
private static final ImmutableList INPUT_SOURCE_OBJECTS_VALUE = ImmutableList.of
(
ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%" + WIKIPEDIA_DATA_1),
ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%" + WIKIPEDIA_DATA_2),
ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%" + WIKIPEDIA_DATA_3)
);
@Test
public void testS3WithValidOverrideCredentialsIndexDataShouldSucceed() throws Exception
{
try (
final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
) {
final Function<String, String> s3PropsTransform = spec -> {
try {
String inputSourceValue = jsonMapper.writeValueAsString(INPUT_SOURCE_OBJECTS_VALUE);
inputSourceValue = StringUtils.replace(
inputSourceValue,
"%%BUCKET%%",
config.getCloudBucket()
);
inputSourceValue = StringUtils.replace(
inputSourceValue,
"%%PATH%%",
config.getCloudPath()
);
spec = StringUtils.replace(
spec,
"%%ACCESS_KEY_PROPERTY_VALUE%%",
jsonMapper.writeValueAsString(
ImmutableMap.of("type", "environment", "variable", "OVERRIDE_S3_ACCESS_KEY")
)
);
spec = StringUtils.replace(
spec,
"%%SECRET_KEY_PROPERTY_VALUE%%",
jsonMapper.writeValueAsString(
ImmutableMap.of("type", "environment", "variable", "OVERRIDE_S3_SECRET_KEY")
)
);
spec = StringUtils.replace(
spec,
"%%INPUT_SOURCE_TYPE%%",
"s3"
);
spec = StringUtils.replace(
spec,
"%%INPUT_SOURCE_PROPERTY_KEY%%",
INPUT_SOURCE_OBJECTS_KEY
);
return StringUtils.replace(
spec,
"%%INPUT_SOURCE_PROPERTY_VALUE%%",
inputSourceValue
);
}
catch (Exception e) {
throw new RuntimeException(e);
}
};
doIndexTest(
INDEX_DATASOURCE,
INDEX_TASK_WITH_OVERRIDE,
s3PropsTransform,
INDEX_QUERIES_RESOURCE,
false,
true,
true
);
}
}
@Test
public void testS3WithoutOverrideCredentialsIndexDataShouldFailed() throws Exception
{
try {
final Function<String, String> s3PropsTransform = spec -> {
try {
String inputSourceValue = jsonMapper.writeValueAsString(INPUT_SOURCE_OBJECTS_VALUE);
inputSourceValue = StringUtils.replace(
inputSourceValue,
"%%BUCKET%%",
config.getCloudBucket()
);
inputSourceValue = StringUtils.replace(
inputSourceValue,
"%%PATH%%",
config.getCloudPath()
);
spec = StringUtils.replace(
spec,
"%%INPUT_SOURCE_TYPE%%",
"s3"
);
spec = StringUtils.replace(
spec,
"%%INPUT_SOURCE_PROPERTY_KEY%%",
INPUT_SOURCE_OBJECTS_KEY
);
return StringUtils.replace(
spec,
"%%INPUT_SOURCE_PROPERTY_VALUE%%",
inputSourceValue
);
}
catch (Exception e) {
throw new RuntimeException(e);
}
};
final String fullDatasourceName = INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix();
final String taskSpec = s3PropsTransform.apply(
StringUtils.replace(
getResourceAsString(INDEX_TASK_WITHOUT_OVERRIDE),
"%%DATASOURCE%%",
fullDatasourceName
)
);
final String taskID = indexer.submitTask(taskSpec);
indexer.waitUntilTaskFails(taskID);
TaskStatusPlus taskStatusPlus = indexer.getTaskStatus(taskID);
// Index task is expected to fail as the default S3 Credentials in Druid's config (druid.s3.accessKey and
// druid.s3.secretKey should not have access to the bucket and path for our data. (Refer to the setup instruction
// at the top of this test class.
Assert.assertEquals(taskStatusPlus.getStatusCode(), TaskState.FAILED);
Assert.assertNotNull(taskStatusPlus.getErrorMsg());
Assert.assertTrue(
taskStatusPlus.getErrorMsg().contains("com.amazonaws.services.s3.model.AmazonS3Exception"),
"Expect task to fail with AmazonS3Exception");
}
finally {
// If the test pass, then there is no datasource to unload
closeQuietly(unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix()));
}
}
@Test
public void testS3WithInvalidOverrideCredentialsIndexDataShouldFailed() throws Exception
{
try {
final Function<String, String> s3PropsTransform = spec -> {
try {
String inputSourceValue = jsonMapper.writeValueAsString(INPUT_SOURCE_OBJECTS_VALUE);
inputSourceValue = StringUtils.replace(
inputSourceValue,
"%%BUCKET%%",
config.getCloudBucket()
);
inputSourceValue = StringUtils.replace(
inputSourceValue,
"%%PATH%%",
config.getCloudPath()
);
spec = StringUtils.replace(
spec,
"%%ACCESS_KEY_PROPERTY_VALUE%%",
jsonMapper.writeValueAsString(
ImmutableMap.of("type", "environment", "variable", "NON_EXISTENT_INVALID_ENV_VAR")
)
);
spec = StringUtils.replace(
spec,
"%%SECRET_KEY_PROPERTY_VALUE%%",
jsonMapper.writeValueAsString(
ImmutableMap.of("type", "environment", "variable", "NON_EXISTENT_INVALID_ENV_VAR")
)
);
spec = StringUtils.replace(
spec,
"%%INPUT_SOURCE_TYPE%%",
"s3"
);
spec = StringUtils.replace(
spec,
"%%INPUT_SOURCE_PROPERTY_KEY%%",
INPUT_SOURCE_OBJECTS_KEY
);
return StringUtils.replace(
spec,
"%%INPUT_SOURCE_PROPERTY_VALUE%%",
inputSourceValue
);
}
catch (Exception e) {
throw new RuntimeException(e);
}
};
final String fullDatasourceName = INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix();
final String taskSpec = s3PropsTransform.apply(
StringUtils.replace(
getResourceAsString(INDEX_TASK_WITH_OVERRIDE),
"%%DATASOURCE%%",
fullDatasourceName
)
);
final String taskID = indexer.submitTask(taskSpec);
indexer.waitUntilTaskFails(taskID);
TaskStatusPlus taskStatusPlus = indexer.getTaskStatus(taskID);
// Index task is expected to fail as the overrided s3 access key and s3 secret key cannot be null
Assert.assertEquals(taskStatusPlus.getStatusCode(), TaskState.FAILED);
Assert.assertNotNull(taskStatusPlus.getErrorMsg());
Assert.assertTrue(
taskStatusPlus.getErrorMsg().contains("IllegalArgumentException: Access key cannot be null"),
"Expect task to fail with IllegalArgumentException: Access key cannot be null");
}
finally {
// If the test pass, then there is no datasource to unload
closeQuietly(unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix()));
}
}
private void closeQuietly(Closeable closeable)
{
try {
if (closeable != null) {
closeable.close();
}
}
catch (Exception var2) {
}
}
}

View File

@ -41,7 +41,7 @@ import java.util.List;
*/
@Test(groups = TestNGGroup.HDFS_DEEP_STORAGE)
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITS3ToHdfsParallelIndexTest extends AbstractS3InputSourceSimpleIndexTest
public class ITS3ToHdfsParallelIndexTest extends AbstractS3InputSourceParallelIndexTest
{
@Test(dataProvider = "resources")
public void testS3IndexData(Pair<String, List> s3InputSource) throws Exception

View File

@ -39,7 +39,7 @@ import java.util.List;
*/
@Test(groups = TestNGGroup.S3_DEEP_STORAGE)
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITS3ToS3ParallelIndexTest extends AbstractS3InputSourceSimpleIndexTest
public class ITS3ToS3ParallelIndexTest extends AbstractS3InputSourceParallelIndexTest
{
@Test(dataProvider = "resources")
public void testS3IndexData(Pair<String, List> s3InputSource) throws Exception

View File

@ -0,0 +1,85 @@
{
"type": "index",
"spec": {
"dataSchema": {
"dataSource": "%%DATASOURCE%%",
"timestampSpec": {
"column": "timestamp"
},
"dimensionsSpec": {
"dimensions": [
"page",
{"type": "string", "name": "language", "createBitmapIndex": false},
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city"
]
},
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
},
{
"name": "thetaSketch",
"type": "thetaSketch",
"fieldName": "user"
},
{
"name": "quantilesDoublesSketch",
"type": "quantilesDoublesSketch",
"fieldName": "delta"
},
{
"name": "HLLSketchBuild",
"type": "HLLSketchBuild",
"fieldName": "user"
}
],
"granularitySpec": {
"segmentGranularity": "DAY",
"queryGranularity": "second",
"intervals" : [ "2013-08-31/2013-09-02" ]
}
},
"ioConfig": {
"type": "index",
"inputSource": {
"type": "%%INPUT_SOURCE_TYPE%%",
"properties": {
"accessKeyId": %%ACCESS_KEY_PROPERTY_VALUE%%,
"secretAccessKey": %%SECRET_KEY_PROPERTY_VALUE%%
},
"%%INPUT_SOURCE_PROPERTY_KEY%%": %%INPUT_SOURCE_PROPERTY_VALUE%%
},
"inputFormat": {
"type": "json"
}
},
"tuningConfig": {
"type": "index"
}
}
}