diff --git a/extensions-core/kinesis-indexing-service/pom.xml b/extensions-core/kinesis-indexing-service/pom.xml index 590aa94b991..ea4afaeab8a 100644 --- a/extensions-core/kinesis-indexing-service/pom.xml +++ b/extensions-core/kinesis-indexing-service/pom.xml @@ -188,6 +188,11 @@ assertj-core test + + com.github.stefanbirkner + system-rules + test + diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java index e8f49bd34aa..5d5a307e4fd 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java @@ -22,6 +22,8 @@ package org.apache.druid.indexing.kinesis; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; +import com.google.inject.name.Named; import org.apache.druid.common.aws.AWSCredentialsConfig; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import org.apache.druid.indexing.common.task.TaskResource; @@ -51,7 +53,7 @@ public class KinesisIndexTask extends SeekableStreamIndexTask @JacksonInject ChatHandlerProvider chatHandlerProvider, @JacksonInject AuthorizerMapper authorizerMapper, @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory, - @JacksonInject AWSCredentialsConfig awsCredentialsConfig, + @JacksonInject @Named(KinesisIndexingServiceModule.AWS_SCOPE) AWSCredentialsConfig awsCredentialsConfig, @JacksonInject AppenderatorsManager appenderatorsManager ) { @@ -128,4 +130,10 @@ public class KinesisIndexTask extends SeekableStreamIndexTask { return TYPE; } + + @VisibleForTesting + AWSCredentialsConfig getAwsCredentialsConfig() + { + return awsCredentialsConfig; + } } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexingServiceModule.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexingServiceModule.java index 3cea45a14ac..bd11e4a2df6 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexingServiceModule.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexingServiceModule.java @@ -38,6 +38,9 @@ import java.util.List; public class KinesisIndexingServiceModule implements DruidModule { + public static final String AWS_SCOPE = "kinesis"; + static final String PROPERTY_BASE = "druid.kinesis"; + @Override public List getJacksonModules() { @@ -63,6 +66,6 @@ public class KinesisIndexingServiceModule implements DruidModule } ).to(KinesisIndexTaskClientFactory.class).in(LazySingleton.class); - JsonConfigProvider.bind(binder, "druid.kinesis", AWSCredentialsConfig.class, Names.named("kinesis")); + JsonConfigProvider.bind(binder, PROPERTY_BASE, AWSCredentialsConfig.class, Names.named(AWS_SCOPE)); } } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java index dbe77a21ee9..664b3b58b32 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java @@ -42,7 +42,7 @@ public class KinesisSamplerSpec extends SeekableStreamSamplerSpec @JsonProperty("spec") final KinesisSupervisorSpec ingestionSpec, @JsonProperty("samplerConfig") @Nullable final SamplerConfig samplerConfig, @JacksonInject InputSourceSampler inputSourceSampler, - @JacksonInject @Named("kinesis") AWSCredentialsConfig awsCredentialsConfig + @JacksonInject @Named(KinesisIndexingServiceModule.AWS_SCOPE) AWSCredentialsConfig awsCredentialsConfig ) { super(ingestionSpec, samplerConfig, inputSourceSampler); diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java index e9610c60970..3092d3ac859 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java @@ -28,6 +28,7 @@ import org.apache.druid.common.aws.AWSCredentialsConfig; import org.apache.druid.guice.annotations.Json; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import org.apache.druid.indexing.kinesis.KinesisIndexTaskClientFactory; +import org.apache.druid.indexing.kinesis.KinesisIndexingServiceModule; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskStorage; @@ -62,7 +63,7 @@ public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec @JacksonInject ServiceEmitter emitter, @JacksonInject DruidMonitorSchedulerConfig monitorSchedulerConfig, @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory, - @JacksonInject @Named("kinesis") AWSCredentialsConfig awsCredentialsConfig, + @JacksonInject @Named(KinesisIndexingServiceModule.AWS_SCOPE) AWSCredentialsConfig awsCredentialsConfig, @JacksonInject SupervisorStateManagerConfig supervisorStateManagerConfig ) { diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java new file mode 100644 index 00000000000..646a3344d97 --- /dev/null +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.kinesis; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.name.Names; +import org.apache.druid.common.aws.AWSCredentialsConfig; +import org.apache.druid.guice.GuiceInjectors; +import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory; +import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; +import org.apache.druid.indexing.common.task.TestAppenderatorsManager; +import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; +import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; +import org.apache.druid.initialization.DruidModule; +import org.apache.druid.initialization.Initialization; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; +import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; +import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.contrib.java.lang.system.ProvideSystemProperty; + +import java.util.Arrays; +import java.util.Collections; + +public class KinesisIndexTaskSerdeTest +{ + private static final DataSchema DATA_SCHEMA = new DataSchema("dataSource", null, null, null, null, null, null, null); + private static final KinesisIndexTaskTuningConfig TUNING_CONFIG = new KinesisIndexTaskTuningConfig( + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + private static final KinesisIndexTaskIOConfig IO_CONFIG = new KinesisIndexTaskIOConfig( + 0, + "baseSequenceName", + new SeekableStreamStartSequenceNumbers<>("stream", Collections.emptyMap(), null), + new SeekableStreamEndSequenceNumbers<>("stream", Collections.emptyMap()), + null, + null, + null, + null, + "endpoint", + null, + null, + null, + null, + false + ); + private static final String ACCESS_KEY = "test-access-key"; + private static final String SECRET_KEY = "test-secret-key"; + private static final String FILE_SESSION_CREDENTIALS = "test-file-session-credentials"; + + @Rule + public ProvideSystemProperty properties = new ProvideSystemProperty( + KinesisIndexingServiceModule.PROPERTY_BASE + ".accessKey", + ACCESS_KEY + ).and( + KinesisIndexingServiceModule.PROPERTY_BASE + ".secretKey", + SECRET_KEY + ).and( + KinesisIndexingServiceModule.PROPERTY_BASE + ".fileSessionCredentials", + FILE_SESSION_CREDENTIALS + ); + + @Test + public void injectsProperAwsCredentialsConfig() throws Exception + { + KinesisIndexTask target = new KinesisIndexTask( + "id", + null, + DATA_SCHEMA, + TUNING_CONFIG, + IO_CONFIG, + null, + null, + null, + null, + null, + null + ); + ObjectMapper objectMapper = createObjectMapper(); + String serialized = objectMapper.writeValueAsString(target); + KinesisIndexTask deserialized = objectMapper.readValue(serialized, KinesisIndexTask.class); + + AWSCredentialsConfig awsCredentialsConfig = deserialized.getAwsCredentialsConfig(); + Assert.assertEquals(ACCESS_KEY, awsCredentialsConfig.getAccessKey().getPassword()); + Assert.assertEquals(SECRET_KEY, awsCredentialsConfig.getSecretKey().getPassword()); + Assert.assertEquals(FILE_SESSION_CREDENTIALS, awsCredentialsConfig.getFileSessionCredentials()); + } + + private static ObjectMapper createObjectMapper() + { + DruidModule module = new KinesisIndexingServiceModule(); + Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), + Arrays.asList( + module, + (Module) binder -> { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8000); + binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(9000); + binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider()); + binder.bind(RowIngestionMetersFactory.class).toInstance(new DropwizardRowIngestionMetersFactory()); + binder.bind(AppenderatorsManager.class).toInstance(new TestAppenderatorsManager()); + } + ) + ); + ObjectMapper objectMapper = injector.getInstance(ObjectMapper.class); + module.getJacksonModules().forEach(objectMapper::registerModule); + return objectMapper; + } +} diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index a334a790bdb..c214def6436 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -32,6 +32,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import com.google.inject.name.Named; import org.apache.druid.client.cache.CacheConfig; import org.apache.druid.client.cache.CachePopulatorStats; import org.apache.druid.client.cache.MapCache; @@ -2939,7 +2940,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase @JacksonInject ChatHandlerProvider chatHandlerProvider, @JacksonInject AuthorizerMapper authorizerMapper, @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory, - @JacksonInject AWSCredentialsConfig awsCredentialsConfig, + @JacksonInject @Named(KinesisIndexingServiceModule.AWS_SCOPE) AWSCredentialsConfig awsCredentialsConfig, @JacksonInject AppenderatorsManager appenderatorsManager ) {