Add additional properties for Kafka AdminClient and consumer from test config file (#10137)

* Add kafka test configs from file for AdminClient and consumer

* review comment
This commit is contained in:
Nishant Bangarwa 2020-07-13 18:16:05 +05:30 committed by GitHub
parent f3023c6058
commit 2b48de074a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 49 additions and 17 deletions

View File

@ -20,6 +20,7 @@
package org.apache.druid.testing.utils;
import com.google.common.collect.ImmutableList;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreatePartitionsResult;
@ -39,11 +40,12 @@ public class KafkaAdminClient implements StreamAdminClient
{
private AdminClient adminClient;
public KafkaAdminClient(String kafkaInternalHost)
public KafkaAdminClient(IntegrationTestingConfig config)
{
Properties config = new Properties();
config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaInternalHost);
adminClient = AdminClient.create(config);
Properties properties = new Properties();
KafkaUtil.addPropertiesFromTestConfig(config, properties);
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.getKafkaHost());
adminClient = AdminClient.create(properties);
}
@Override

View File

@ -29,14 +29,12 @@ import org.apache.kafka.common.serialization.StringSerializer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class KafkaEventWriter implements StreamEventWriter
{
private static final String TEST_PROPERTY_PREFIX = "kafka.test.property.";
private final KafkaProducer<String, byte[]> producer;
private final boolean txnEnabled;
private final List<Future<RecordMetadata>> pendingWriteRecords = new ArrayList<>();
@ -44,7 +42,7 @@ public class KafkaEventWriter implements StreamEventWriter
public KafkaEventWriter(IntegrationTestingConfig config, boolean txnEnabled)
{
Properties properties = new Properties();
addFilteredProperties(config, properties);
KafkaUtil.addPropertiesFromTestConfig(config, properties);
properties.setProperty("bootstrap.servers", config.getKafkaHost());
properties.setProperty("acks", "all");
properties.setProperty("retries", "3");
@ -135,13 +133,4 @@ public class KafkaEventWriter implements StreamEventWriter
throw new RuntimeException(e);
}
}
private void addFilteredProperties(IntegrationTestingConfig config, Properties properties)
{
for (Map.Entry<String, String> entry : config.getProperties().entrySet()) {
if (entry.getKey().startsWith(TEST_PROPERTY_PREFIX)) {
properties.setProperty(entry.getKey().substring(TEST_PROPERTY_PREFIX.length()), entry.getValue());
}
}
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testing.utils;
import org.apache.druid.testing.IntegrationTestingConfig;
import java.util.Map;
import java.util.Properties;
public class KafkaUtil
{
private static final String TEST_PROPERTY_PREFIX = "kafka.test.property.";
public static void addPropertiesFromTestConfig(IntegrationTestingConfig config, Properties properties)
{
for (Map.Entry<String, String> entry : config.getProperties().entrySet()) {
if (entry.getKey().startsWith(TEST_PROPERTY_PREFIX)) {
properties.setProperty(entry.getKey().substring(TEST_PROPERTY_PREFIX.length()), entry.getValue());
}
}
}
}

View File

@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.utils.KafkaAdminClient;
import org.apache.druid.testing.utils.KafkaEventWriter;
import org.apache.druid.testing.utils.KafkaUtil;
import org.apache.druid.testing.utils.StreamAdminClient;
import org.apache.druid.testing.utils.StreamEventWriter;
@ -37,7 +38,7 @@ public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamInd
@Override
StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config)
{
return new KafkaAdminClient(config.getKafkaHost());
return new KafkaAdminClient(config);
}
@Override
@ -57,6 +58,7 @@ public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamInd
{
final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
final Properties consumerProperties = new Properties();
KafkaUtil.addPropertiesFromTestConfig(config, consumerProperties);
consumerProperties.putAll(consumerConfigs);
consumerProperties.setProperty("bootstrap.servers", config.getKafkaInternalHost());
return spec -> {