[Integration-tests] Allow specifying additional properties while connecting to kafka (#5185)

* [Integration-tests] Allow specifying additional properties while connecting to kafka

Use Case - For running integration tests with a secure kafka cluster we
need to set additional properties.

This PR adds a way to pass in additional properties to kafka
consumer/producer used in kafka integration tests.
Additionally it also adds a flag to skip creating/deleting kafka topic
from tests since not all secure kafka clusters allow all users to
manage topics.

* fix check style
This commit is contained in:
Nishant Bangarwa 2017-12-22 01:21:37 +05:30 committed by Himanshu
parent 0f5c7d1aec
commit 6582b16a81
7 changed files with 117 additions and 33 deletions

View File

@ -179,6 +179,18 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
{
return password;
}
@Override
public Map<String, String> getProperties()
{
return props;
}
@Override
public boolean manageKafkaTopic()
{
return Boolean.valueOf(props.getOrDefault("manageKafkaTopic", "true"));
}
};
}
}

View File

@ -23,6 +23,8 @@ package io.druid.testing;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.NotNull;
import java.util.HashMap;
import java.util.Map;
public class DockerConfigProvider implements IntegrationTestingConfigProvider
{
@ -109,6 +111,18 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
{
return null;
}
@Override
public Map<String, String> getProperties()
{
return new HashMap<>();
}
@Override
public boolean manageKafkaTopic()
{
return true;
}
};
}
}

View File

@ -19,6 +19,8 @@
package io.druid.testing;
import java.util.Map;
/**
*/
public interface IntegrationTestingConfig
@ -44,4 +46,8 @@ public interface IntegrationTestingConfig
String getUsername();
String getPassword();
Map<String, String> getProperties();
boolean manageKafkaTopic();
}

View File

@ -48,6 +48,7 @@ import org.testng.annotations.Test;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
@ -64,6 +65,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
private static final String TOPIC_NAME = "kafka_indexing_service_topic";
private static final int NUM_EVENTS_TO_SEND = 60;
private static final long WAIT_TIME_MILLIS = 2 * 60 * 1000L;
public static final String testPropertyPrefix = "kafka.test.property.";
// We'll fill in the current time and numbers for added, deleted and changed
// before sending the event.
@ -117,10 +119,19 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
ZKStringSerializer$.MODULE$
);
zkUtils = new ZkUtils(zkClient, new ZkConnection(zkHosts, sessionTimeoutMs), false);
int numPartitions = 4;
int replicationFactor = 1;
Properties topicConfig = new Properties();
AdminUtils.createTopic(zkUtils, TOPIC_NAME, numPartitions, replicationFactor, topicConfig, RackAwareMode.Disabled$.MODULE$);
if (config.manageKafkaTopic()) {
int numPartitions = 4;
int replicationFactor = 1;
Properties topicConfig = new Properties();
AdminUtils.createTopic(
zkUtils,
TOPIC_NAME,
numPartitions,
replicationFactor,
topicConfig,
RackAwareMode.Disabled$.MODULE$
);
}
}
catch (Exception e) {
throw new ISE(e, "could not create kafka topic");
@ -129,10 +140,14 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
String spec;
try {
LOG.info("supervisorSpec name: [%s]", INDEXER_FILE);
Properties consumerProperties = new Properties();
consumerProperties.put("bootstrap.servers", config.getKafkaHost());
addFilteredProperties(consumerProperties);
spec = getTaskAsString(INDEXER_FILE)
.replaceAll("%%DATASOURCE%%", DATASOURCE)
.replaceAll("%%TOPIC%%", TOPIC_NAME)
.replaceAll("%%KAFKA_BROKER%%", config.getKafkaHost());
.replaceAll("%%CONSUMER_PROPERTIES%%", jsonMapper.writeValueAsString(consumerProperties));
LOG.info("supervisorSpec: [%s]\n", spec);
}
catch (Exception e) {
@ -146,6 +161,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
// set up kafka producer
Properties properties = new Properties();
addFilteredProperties(properties);
properties.put("bootstrap.servers", config.getKafkaHost());
LOG.info("Kafka bootstrap.servers: [%s]", config.getKafkaHost());
properties.put("acks", "all");
@ -285,13 +301,23 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
public void afterClass() throws Exception
{
LOG.info("teardown");
// delete kafka topic
AdminUtils.deleteTopic(zkUtils, TOPIC_NAME);
if (config.manageKafkaTopic()) {
// delete kafka topic
AdminUtils.deleteTopic(zkUtils, TOPIC_NAME);
}
// remove segments
if (segmentsExist) {
unloadAndKillData(DATASOURCE);
}
}
public void addFilteredProperties(Properties properties)
{
for (Map.Entry<String, String> entry : config.getProperties().entrySet()) {
if (entry.getKey().startsWith(testPropertyPrefix)) {
properties.put(entry.getKey().substring(testPropertyPrefix.length()), entry.getValue());
}
}
}
}

View File

@ -48,6 +48,7 @@ import org.testng.annotations.Test;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
@ -64,6 +65,8 @@ public class ITKafkaTest extends AbstractIndexerTest
private static final String DATASOURCE = "kafka_test";
private static final String TOPIC_NAME = "kafkaTopic";
private static final int MINUTES_TO_SEND = 2;
public static final String testPropertyPrefix = "kafka.test.property.";
// We'll fill in the current time and numbers for added, deleted and changed
// before sending the event.
@ -117,10 +120,20 @@ public class ITKafkaTest extends AbstractIndexerTest
ZKStringSerializer$.MODULE$
);
zkUtils = new ZkUtils(zkClient, new ZkConnection(zkHosts, sessionTimeoutMs), false);
int numPartitions = 1;
int replicationFactor = 1;
Properties topicConfig = new Properties();
AdminUtils.createTopic(zkUtils, TOPIC_NAME, numPartitions, replicationFactor, topicConfig, RackAwareMode.Disabled$.MODULE$);
if (config.manageKafkaTopic()) {
int numPartitions = 1;
int replicationFactor = 1;
Properties topicConfig = new Properties();
// addFilteredProperties(topicConfig);
AdminUtils.createTopic(
zkUtils,
TOPIC_NAME,
numPartitions,
replicationFactor,
topicConfig,
RackAwareMode.Disabled$.MODULE$
);
}
}
catch (Exception e) {
throw new ISE(e, "could not create kafka topic");
@ -128,6 +141,7 @@ public class ITKafkaTest extends AbstractIndexerTest
// set up kafka producer
Properties properties = new Properties();
addFilteredProperties(properties);
properties.put("bootstrap.servers", config.getKafkaHost());
LOG.info("Kafka bootstrap.servers: [%s]", config.getKafkaHost());
properties.put("acks", "all");
@ -179,12 +193,25 @@ public class ITKafkaTest extends AbstractIndexerTest
// replace temp strings in indexer file
try {
LOG.info("indexerFile name: [%s]", INDEXER_FILE);
Properties consumerProperties = new Properties();
consumerProperties.put("zookeeper.connect", config.getZookeeperHosts());
consumerProperties.put("zookeeper.connection.timeout.ms", "15000");
consumerProperties.put("zookeeper.sync.time.ms", "5000");
consumerProperties.put("group.id", Long.toString(System.currentTimeMillis()));
consumerProperties.put("zookeeper.sync.time.ms", "5000");
consumerProperties.put("fetch.message.max.bytes", "1048586");
consumerProperties.put("auto.offset.reset", "smallest");
consumerProperties.put("auto.commit.enable", "false");
addFilteredProperties(consumerProperties);
indexerSpec = getTaskAsString(INDEXER_FILE)
.replaceAll("%%DATASOURCE%%", DATASOURCE)
.replaceAll("%%TOPIC%%", TOPIC_NAME)
.replaceAll("%%ZOOKEEPER_SERVER%%", config.getZookeeperHosts())
.replaceAll("%%GROUP_ID%%", Long.toString(System.currentTimeMillis()))
.replaceAll("%%COUNT%%", Integer.toString(num_events));
.replaceAll("%%COUNT%%", Integer.toString(num_events))
.replaceAll("%%CONSUMER_PROPERTIES%%", jsonMapper.writeValueAsString(consumerProperties));
LOG.info("indexerFile: [%s]\n", indexerSpec);
}
catch (Exception e) {
@ -239,11 +266,11 @@ public class ITKafkaTest extends AbstractIndexerTest
String queryStr = query_response_template
.replaceAll("%%DATASOURCE%%", DATASOURCE)
// time boundary
// time boundary
.replace("%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst))
.replace("%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast))
.replace("%%TIMEBOUNDARY_RESPONSE_MINTIME%%", TIMESTAMP_FMT.print(dtFirst))
// time series
// time series
.replace("%%TIMESERIES_QUERY_START%%", INTERVAL_FMT.print(dtFirst))
.replace("%%TIMESERIES_QUERY_END%%", INTERVAL_FMT.print(dtFirst.plusMinutes(MINUTES_TO_SEND + 2)))
.replace("%%TIMESERIES_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst))
@ -263,14 +290,24 @@ public class ITKafkaTest extends AbstractIndexerTest
public void afterClass() throws Exception
{
LOG.info("teardown");
// delete kafka topic
AdminUtils.deleteTopic(zkUtils, TOPIC_NAME);
if (config.manageKafkaTopic()) {
// delete kafka topic
AdminUtils.deleteTopic(zkUtils, TOPIC_NAME);
}
// remove segments
if (segmentsExist) {
unloadAndKillData(DATASOURCE);
}
}
public void addFilteredProperties(Properties properties)
{
for (Map.Entry<String, String> entry : config.getProperties().entrySet()) {
if (entry.getKey().startsWith(testPropertyPrefix)) {
properties.put(entry.getKey().substring(testPropertyPrefix.length()), entry.getValue());
}
}
}
}

View File

@ -52,16 +52,7 @@
"count": "%%COUNT%%",
"delegate": {
"type": "kafka-0.8",
"consumerProps": {
"zookeeper.connect": "%%ZOOKEEPER_SERVER%%",
"zookeeper.connection.timeout.ms" : "15000",
"zookeeper.session.timeout.ms" : "15000",
"zookeeper.sync.time.ms" : "5000",
"group.id": "%%GROUP_ID%%",
"fetch.message.max.bytes" : "1048586",
"auto.offset.reset": "smallest",
"auto.commit.enable": "false"
},
"consumerProps": %%CONSUMER_PROPERTIES%%,
"feed": "%%TOPIC%%"
}
}

View File

@ -52,9 +52,7 @@
},
"ioConfig": {
"topic": "%%TOPIC%%",
"consumerProperties": {
"bootstrap.servers": "%%KAFKA_BROKER%%"
},
"consumerProperties": %%CONSUMER_PROPERTIES%%,
"taskCount": 2,
"replicas": 1,
"taskDuration": "PT2M",