From 6582b16a81020307cd13c7b7f5aa4e204abab617 Mon Sep 17 00:00:00 2001 From: Nishant Bangarwa Date: Fri, 22 Dec 2017 01:21:37 +0530 Subject: [PATCH] [Integration-tests] Allow specifying additional properties while connecting to kafka (#5185) * [Integration-tests] Allow specifying additional properties while connecting to kafka Use Case - For running integration tests with a secure kafka cluster we need to set additional properties. This PR adds a way to pass in additional properties to kafka consumer/producer used in kafka integration tests. Additionally it also adds a flag to skip creating/deleting kafka topic from tests since not all secure kafka clusters allow all users to manage topics. * fix check style --- .../testing/ConfigFileConfigProvider.java | 12 ++++ .../druid/testing/DockerConfigProvider.java | 14 +++++ .../testing/IntegrationTestingConfig.java | 6 ++ .../indexer/ITKafkaIndexingServiceTest.java | 42 ++++++++++--- .../io/druid/tests/indexer/ITKafkaTest.java | 61 +++++++++++++++---- .../resources/indexer/kafka_index_task.json | 11 +--- .../indexer/kafka_supervisor_spec.json | 4 +- 7 files changed, 117 insertions(+), 33 deletions(-) diff --git a/integration-tests/src/main/java/io/druid/testing/ConfigFileConfigProvider.java b/integration-tests/src/main/java/io/druid/testing/ConfigFileConfigProvider.java index afa8a4da558..aee84778371 100644 --- a/integration-tests/src/main/java/io/druid/testing/ConfigFileConfigProvider.java +++ b/integration-tests/src/main/java/io/druid/testing/ConfigFileConfigProvider.java @@ -179,6 +179,18 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide { return password; } + + @Override + public Map getProperties() + { + return props; + } + + @Override + public boolean manageKafkaTopic() + { + return Boolean.valueOf(props.getOrDefault("manageKafkaTopic", "true")); + } }; } } diff --git a/integration-tests/src/main/java/io/druid/testing/DockerConfigProvider.java b/integration-tests/src/main/java/io/druid/testing/DockerConfigProvider.java index 3763039fd6f..0ade7e69bc0 100644 --- a/integration-tests/src/main/java/io/druid/testing/DockerConfigProvider.java +++ b/integration-tests/src/main/java/io/druid/testing/DockerConfigProvider.java @@ -23,6 +23,8 @@ package io.druid.testing; import com.fasterxml.jackson.annotation.JsonProperty; import javax.validation.constraints.NotNull; +import java.util.HashMap; +import java.util.Map; public class DockerConfigProvider implements IntegrationTestingConfigProvider { @@ -109,6 +111,18 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider { return null; } + + @Override + public Map getProperties() + { + return new HashMap<>(); + } + + @Override + public boolean manageKafkaTopic() + { + return true; + } }; } } diff --git a/integration-tests/src/main/java/io/druid/testing/IntegrationTestingConfig.java b/integration-tests/src/main/java/io/druid/testing/IntegrationTestingConfig.java index dc33381b91e..17e92019e28 100644 --- a/integration-tests/src/main/java/io/druid/testing/IntegrationTestingConfig.java +++ b/integration-tests/src/main/java/io/druid/testing/IntegrationTestingConfig.java @@ -19,6 +19,8 @@ package io.druid.testing; +import java.util.Map; + /** */ public interface IntegrationTestingConfig @@ -44,4 +46,8 @@ public interface IntegrationTestingConfig String getUsername(); String getPassword(); + + Map getProperties(); + + boolean manageKafkaTopic(); } diff --git a/integration-tests/src/test/java/io/druid/tests/indexer/ITKafkaIndexingServiceTest.java b/integration-tests/src/test/java/io/druid/tests/indexer/ITKafkaIndexingServiceTest.java index 320c3f20e76..2fe7a27b290 100644 --- a/integration-tests/src/test/java/io/druid/tests/indexer/ITKafkaIndexingServiceTest.java +++ b/integration-tests/src/test/java/io/druid/tests/indexer/ITKafkaIndexingServiceTest.java @@ -48,6 +48,7 @@ import org.testng.annotations.Test; import java.io.IOException; import java.io.InputStream; +import java.util.Map; import java.util.Properties; import java.util.concurrent.Callable; @@ -64,6 +65,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest private static final String TOPIC_NAME = "kafka_indexing_service_topic"; private static final int NUM_EVENTS_TO_SEND = 60; private static final long WAIT_TIME_MILLIS = 2 * 60 * 1000L; + public static final String testPropertyPrefix = "kafka.test.property."; // We'll fill in the current time and numbers for added, deleted and changed // before sending the event. @@ -117,10 +119,19 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest ZKStringSerializer$.MODULE$ ); zkUtils = new ZkUtils(zkClient, new ZkConnection(zkHosts, sessionTimeoutMs), false); - int numPartitions = 4; - int replicationFactor = 1; - Properties topicConfig = new Properties(); - AdminUtils.createTopic(zkUtils, TOPIC_NAME, numPartitions, replicationFactor, topicConfig, RackAwareMode.Disabled$.MODULE$); + if (config.manageKafkaTopic()) { + int numPartitions = 4; + int replicationFactor = 1; + Properties topicConfig = new Properties(); + AdminUtils.createTopic( + zkUtils, + TOPIC_NAME, + numPartitions, + replicationFactor, + topicConfig, + RackAwareMode.Disabled$.MODULE$ + ); + } } catch (Exception e) { throw new ISE(e, "could not create kafka topic"); @@ -129,10 +140,14 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest String spec; try { LOG.info("supervisorSpec name: [%s]", INDEXER_FILE); + Properties consumerProperties = new Properties(); + consumerProperties.put("bootstrap.servers", config.getKafkaHost()); + addFilteredProperties(consumerProperties); + spec = getTaskAsString(INDEXER_FILE) .replaceAll("%%DATASOURCE%%", DATASOURCE) .replaceAll("%%TOPIC%%", TOPIC_NAME) - .replaceAll("%%KAFKA_BROKER%%", config.getKafkaHost()); + .replaceAll("%%CONSUMER_PROPERTIES%%", jsonMapper.writeValueAsString(consumerProperties)); LOG.info("supervisorSpec: [%s]\n", spec); } catch (Exception e) { @@ -146,6 +161,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest // set up kafka producer Properties properties = new Properties(); + addFilteredProperties(properties); properties.put("bootstrap.servers", config.getKafkaHost()); LOG.info("Kafka bootstrap.servers: [%s]", config.getKafkaHost()); properties.put("acks", "all"); @@ -285,13 +301,23 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest public void afterClass() throws Exception { LOG.info("teardown"); - - // delete kafka topic - AdminUtils.deleteTopic(zkUtils, TOPIC_NAME); + if (config.manageKafkaTopic()) { + // delete kafka topic + AdminUtils.deleteTopic(zkUtils, TOPIC_NAME); + } // remove segments if (segmentsExist) { unloadAndKillData(DATASOURCE); } } + + public void addFilteredProperties(Properties properties) + { + for (Map.Entry entry : config.getProperties().entrySet()) { + if (entry.getKey().startsWith(testPropertyPrefix)) { + properties.put(entry.getKey().substring(testPropertyPrefix.length()), entry.getValue()); + } + } + } } diff --git a/integration-tests/src/test/java/io/druid/tests/indexer/ITKafkaTest.java b/integration-tests/src/test/java/io/druid/tests/indexer/ITKafkaTest.java index 5f6e162d69e..c4db5f6932c 100644 --- a/integration-tests/src/test/java/io/druid/tests/indexer/ITKafkaTest.java +++ b/integration-tests/src/test/java/io/druid/tests/indexer/ITKafkaTest.java @@ -48,6 +48,7 @@ import org.testng.annotations.Test; import java.io.IOException; import java.io.InputStream; +import java.util.Map; import java.util.Properties; import java.util.concurrent.Callable; @@ -64,6 +65,8 @@ public class ITKafkaTest extends AbstractIndexerTest private static final String DATASOURCE = "kafka_test"; private static final String TOPIC_NAME = "kafkaTopic"; private static final int MINUTES_TO_SEND = 2; + public static final String testPropertyPrefix = "kafka.test.property."; + // We'll fill in the current time and numbers for added, deleted and changed // before sending the event. @@ -117,10 +120,20 @@ public class ITKafkaTest extends AbstractIndexerTest ZKStringSerializer$.MODULE$ ); zkUtils = new ZkUtils(zkClient, new ZkConnection(zkHosts, sessionTimeoutMs), false); - int numPartitions = 1; - int replicationFactor = 1; - Properties topicConfig = new Properties(); - AdminUtils.createTopic(zkUtils, TOPIC_NAME, numPartitions, replicationFactor, topicConfig, RackAwareMode.Disabled$.MODULE$); + if (config.manageKafkaTopic()) { + int numPartitions = 1; + int replicationFactor = 1; + Properties topicConfig = new Properties(); + // addFilteredProperties(topicConfig); + AdminUtils.createTopic( + zkUtils, + TOPIC_NAME, + numPartitions, + replicationFactor, + topicConfig, + RackAwareMode.Disabled$.MODULE$ + ); + } } catch (Exception e) { throw new ISE(e, "could not create kafka topic"); @@ -128,6 +141,7 @@ public class ITKafkaTest extends AbstractIndexerTest // set up kafka producer Properties properties = new Properties(); + addFilteredProperties(properties); properties.put("bootstrap.servers", config.getKafkaHost()); LOG.info("Kafka bootstrap.servers: [%s]", config.getKafkaHost()); properties.put("acks", "all"); @@ -179,12 +193,25 @@ public class ITKafkaTest extends AbstractIndexerTest // replace temp strings in indexer file try { LOG.info("indexerFile name: [%s]", INDEXER_FILE); + + Properties consumerProperties = new Properties(); + consumerProperties.put("zookeeper.connect", config.getZookeeperHosts()); + consumerProperties.put("zookeeper.connection.timeout.ms", "15000"); + consumerProperties.put("zookeeper.sync.time.ms", "5000"); + consumerProperties.put("group.id", Long.toString(System.currentTimeMillis())); + consumerProperties.put("zookeeper.sync.time.ms", "5000"); + consumerProperties.put("fetch.message.max.bytes", "1048586"); + consumerProperties.put("auto.offset.reset", "smallest"); + consumerProperties.put("auto.commit.enable", "false"); + + addFilteredProperties(consumerProperties); + indexerSpec = getTaskAsString(INDEXER_FILE) .replaceAll("%%DATASOURCE%%", DATASOURCE) .replaceAll("%%TOPIC%%", TOPIC_NAME) - .replaceAll("%%ZOOKEEPER_SERVER%%", config.getZookeeperHosts()) - .replaceAll("%%GROUP_ID%%", Long.toString(System.currentTimeMillis())) - .replaceAll("%%COUNT%%", Integer.toString(num_events)); + .replaceAll("%%COUNT%%", Integer.toString(num_events)) + .replaceAll("%%CONSUMER_PROPERTIES%%", jsonMapper.writeValueAsString(consumerProperties)); + LOG.info("indexerFile: [%s]\n", indexerSpec); } catch (Exception e) { @@ -239,11 +266,11 @@ public class ITKafkaTest extends AbstractIndexerTest String queryStr = query_response_template .replaceAll("%%DATASOURCE%%", DATASOURCE) - // time boundary + // time boundary .replace("%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst)) .replace("%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast)) .replace("%%TIMEBOUNDARY_RESPONSE_MINTIME%%", TIMESTAMP_FMT.print(dtFirst)) - // time series + // time series .replace("%%TIMESERIES_QUERY_START%%", INTERVAL_FMT.print(dtFirst)) .replace("%%TIMESERIES_QUERY_END%%", INTERVAL_FMT.print(dtFirst.plusMinutes(MINUTES_TO_SEND + 2))) .replace("%%TIMESERIES_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst)) @@ -263,14 +290,24 @@ public class ITKafkaTest extends AbstractIndexerTest public void afterClass() throws Exception { LOG.info("teardown"); - - // delete kafka topic - AdminUtils.deleteTopic(zkUtils, TOPIC_NAME); + if (config.manageKafkaTopic()) { + // delete kafka topic + AdminUtils.deleteTopic(zkUtils, TOPIC_NAME); + } // remove segments if (segmentsExist) { unloadAndKillData(DATASOURCE); } } + + public void addFilteredProperties(Properties properties) + { + for (Map.Entry entry : config.getProperties().entrySet()) { + if (entry.getKey().startsWith(testPropertyPrefix)) { + properties.put(entry.getKey().substring(testPropertyPrefix.length()), entry.getValue()); + } + } + } } diff --git a/integration-tests/src/test/resources/indexer/kafka_index_task.json b/integration-tests/src/test/resources/indexer/kafka_index_task.json index 9674570c805..f580d46a500 100644 --- a/integration-tests/src/test/resources/indexer/kafka_index_task.json +++ b/integration-tests/src/test/resources/indexer/kafka_index_task.json @@ -52,16 +52,7 @@ "count": "%%COUNT%%", "delegate": { "type": "kafka-0.8", - "consumerProps": { - "zookeeper.connect": "%%ZOOKEEPER_SERVER%%", - "zookeeper.connection.timeout.ms" : "15000", - "zookeeper.session.timeout.ms" : "15000", - "zookeeper.sync.time.ms" : "5000", - "group.id": "%%GROUP_ID%%", - "fetch.message.max.bytes" : "1048586", - "auto.offset.reset": "smallest", - "auto.commit.enable": "false" - }, + "consumerProps": %%CONSUMER_PROPERTIES%%, "feed": "%%TOPIC%%" } } diff --git a/integration-tests/src/test/resources/indexer/kafka_supervisor_spec.json b/integration-tests/src/test/resources/indexer/kafka_supervisor_spec.json index ceec1befb0e..511b65dcffc 100644 --- a/integration-tests/src/test/resources/indexer/kafka_supervisor_spec.json +++ b/integration-tests/src/test/resources/indexer/kafka_supervisor_spec.json @@ -52,9 +52,7 @@ }, "ioConfig": { "topic": "%%TOPIC%%", - "consumerProperties": { - "bootstrap.servers": "%%KAFKA_BROKER%%" - }, + "consumerProperties": %%CONSUMER_PROPERTIES%%, "taskCount": 2, "replicas": 1, "taskDuration": "PT2M",