diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java index d63d08833cd..b8311a677b2 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java @@ -20,6 +20,7 @@ package org.apache.druid.testing.utils; import com.google.common.collect.ImmutableList; +import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.CreatePartitionsResult; @@ -39,11 +40,12 @@ public class KafkaAdminClient implements StreamAdminClient { private AdminClient adminClient; - public KafkaAdminClient(String kafkaInternalHost) + public KafkaAdminClient(IntegrationTestingConfig config) { - Properties config = new Properties(); - config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaInternalHost); - adminClient = AdminClient.create(config); + Properties properties = new Properties(); + KafkaUtil.addPropertiesFromTestConfig(config, properties); + properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.getKafkaHost()); + adminClient = AdminClient.create(properties); } @Override diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java index a9d6da9419c..a505285d6de 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java @@ -29,14 +29,12 @@ import org.apache.kafka.common.serialization.StringSerializer; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class KafkaEventWriter implements StreamEventWriter { - private static final String TEST_PROPERTY_PREFIX = "kafka.test.property."; private final KafkaProducer producer; private final boolean txnEnabled; private final List> pendingWriteRecords = new ArrayList<>(); @@ -44,7 +42,7 @@ public class KafkaEventWriter implements StreamEventWriter public KafkaEventWriter(IntegrationTestingConfig config, boolean txnEnabled) { Properties properties = new Properties(); - addFilteredProperties(config, properties); + KafkaUtil.addPropertiesFromTestConfig(config, properties); properties.setProperty("bootstrap.servers", config.getKafkaHost()); properties.setProperty("acks", "all"); properties.setProperty("retries", "3"); @@ -135,13 +133,4 @@ public class KafkaEventWriter implements StreamEventWriter throw new RuntimeException(e); } } - - private void addFilteredProperties(IntegrationTestingConfig config, Properties properties) - { - for (Map.Entry entry : config.getProperties().entrySet()) { - if (entry.getKey().startsWith(TEST_PROPERTY_PREFIX)) { - properties.setProperty(entry.getKey().substring(TEST_PROPERTY_PREFIX.length()), entry.getValue()); - } - } - } } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaUtil.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaUtil.java new file mode 100644 index 00000000000..36534c2a8a8 --- /dev/null +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaUtil.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.utils; + +import org.apache.druid.testing.IntegrationTestingConfig; + +import java.util.Map; +import java.util.Properties; + +public class KafkaUtil +{ + private static final String TEST_PROPERTY_PREFIX = "kafka.test.property."; + + public static void addPropertiesFromTestConfig(IntegrationTestingConfig config, Properties properties) + { + for (Map.Entry entry : config.getProperties().entrySet()) { + if (entry.getKey().startsWith(TEST_PROPERTY_PREFIX)) { + properties.setProperty(entry.getKey().substring(TEST_PROPERTY_PREFIX.length()), entry.getValue()); + } + } + } +} diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java index 9d1a69bec23..cc2a22f827d 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java @@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.utils.KafkaAdminClient; import org.apache.druid.testing.utils.KafkaEventWriter; +import org.apache.druid.testing.utils.KafkaUtil; import org.apache.druid.testing.utils.StreamAdminClient; import org.apache.druid.testing.utils.StreamEventWriter; @@ -37,7 +38,7 @@ public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamInd @Override StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) { - return new KafkaAdminClient(config.getKafkaHost()); + return new KafkaAdminClient(config); } @Override @@ -57,6 +58,7 @@ public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamInd { final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(); final Properties consumerProperties = new Properties(); + KafkaUtil.addPropertiesFromTestConfig(config, consumerProperties); consumerProperties.putAll(consumerConfigs); consumerProperties.setProperty("bootstrap.servers", config.getKafkaInternalHost()); return spec -> {