adjustments to Kafka integration tests to allow running against Azure Event Hubs streams (#10463)

* adjustments to kafka integration tests to allow running against azure event hubs in kafka mode

* oops

* make better

* more better
Clint Wylie 2020-10-05 08:54:29 -07:00 committed by GitHub
parent 1c77491da6
commit 307c1b0720
6 changed files with 73 additions and 6 deletions
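
The thrust of the change: DockerConfigProvider now accepts arbitrary druid.test.config.properties.* entries and KafkaUtil forwards them to the Kafka clients, so a test run can target the Kafka-compatible endpoint of an Azure Event Hubs namespace purely through configuration. A hypothetical test-config sketch (the namespace and connection string are placeholders, not from the commit; the SASL values are the ones Event Hubs' Kafka surface conventionally uses):

    # forwarded verbatim to the Kafka clients, with the kafka.test.property. prefix stripped
    druid.test.config.properties.kafka.test.property.bootstrap.servers=<namespace>.servicebus.windows.net:9093
    druid.test.config.properties.kafka.test.property.security.protocol=SASL_SSL
    druid.test.config.properties.kafka.test.property.sasl.mechanism=PLAIN
    druid.test.config.properties.kafka.test.property.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="<connection string>";
    # read by the tests themselves, with the kafka.test.config. prefix stripped
    druid.test.config.properties.kafka.test.config.transactionEnabled=false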

DockerConfigProvider.java

@@ -21,8 +21,13 @@ package org.apache.druid.testing;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 
 import javax.validation.constraints.NotNull;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
@@ -54,6 +59,10 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
   @JsonProperty
   private String streamEndpoint;
 
+  @JsonProperty
+  @JsonDeserialize(using = ArbitraryPropertiesJsonDeserializer.class)
+  private Map<String, String> properties = new HashMap<>();
+
   @Override
   public IntegrationTestingConfig get()
   {
@@ -190,7 +199,7 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
       @Override
       public String getProperty(String prop)
       {
-        throw new UnsupportedOperationException("DockerConfigProvider does not support property " + prop);
+        return properties.get(prop);
       }
 
       @Override
@@ -208,7 +217,7 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
       @Override
       public Map<String, String> getProperties()
       {
-        return new HashMap<>();
+        return properties;
       }
 
       @Override
@@ -260,4 +269,34 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
     }
   };
 }
+
+  // there is probably a better way to do this...
+  static class ArbitraryPropertiesJsonDeserializer extends JsonDeserializer<Map<String, String>>
+  {
+    @Override
+    public Map<String, String> deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
+        throws IOException
+    {
+      // given some config input, such as
+      //   druid.test.config.properties.a.b.c=d
+      // calling jsonParser.readValueAs(Map.class) here results in a map that has both nested objects and also
+      // flattened string pairs, so the map looks something like this (in JSON form):
+      //   {
+      //     "a" : { "b" : { "c" : "d" } },
+      //     "a.b.c" : "d"
+      //   }
+      // The string pairs are the values we want to populate this map with, so filtering out the top-level keys
+      // which do not have string values leaves us with
+      //   { "a.b.c" : "d" }
+      // from the given example, which is what we want.
+      Map<String, Object> parsed = jsonParser.readValueAs(Map.class);
+      Map<String, String> flat = new HashMap<>();
+      for (Map.Entry<String, Object> entry : parsed.entrySet()) {
+        if (!(entry.getValue() instanceof Map)) {
+          flat.put(entry.getKey(), (String) entry.getValue());
+        }
+      }
+      return flat;
+    }
+  }
 }
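
A hypothetical standalone sketch (not part of the commit) of how the deserializer behaves when fed a map holding both the nested and the flattened form of one property. It assumes the blueprint DeserializationContext from ObjectMapper.getDeserializationContext() is good enough, which holds here because the deserializer never touches it; run from the org.apache.druid.testing package, since the class is package-private:

    ObjectMapper mapper = new ObjectMapper();
    JsonParser parser = mapper.getFactory().createParser(
        "{\"a\": {\"b\": {\"c\": \"d\"}}, \"a.b.c\": \"d\"}"
    );
    Map<String, String> flat = new DockerConfigProvider.ArbitraryPropertiesJsonDeserializer()
        .deserialize(parser, mapper.getDeserializationContext());
    // flat now holds only {"a.b.c" : "d"}; the nested "a" entry is filtered out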

KafkaAdminClient.java

@@ -43,8 +43,8 @@ public class KafkaAdminClient implements StreamAdminClient
   public KafkaAdminClient(IntegrationTestingConfig config)
   {
     Properties properties = new Properties();
-    KafkaUtil.addPropertiesFromTestConfig(config, properties);
     properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.getKafkaHost());
+    KafkaUtil.addPropertiesFromTestConfig(config, properties);
     adminClient = AdminClient.create(properties);
   }
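
The reordering above, repeated in KafkaEventWriter and in the consumer setup further down, appears to be the heart of the fix: applying the test-config properties after the hard-coded defaults lets a kafka.test.property.* entry (for instance an Event Hubs bootstrap server or SASL setting) override them instead of being clobbered.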

KafkaEventWriter.java

@@ -42,7 +42,6 @@ public class KafkaEventWriter implements StreamEventWriter
   public KafkaEventWriter(IntegrationTestingConfig config, boolean txnEnabled)
   {
     Properties properties = new Properties();
-    KafkaUtil.addPropertiesFromTestConfig(config, properties);
     properties.setProperty("bootstrap.servers", config.getKafkaHost());
     properties.setProperty("acks", "all");
     properties.setProperty("retries", "3");
@@ -53,6 +52,7 @@ public class KafkaEventWriter implements StreamEventWriter
       properties.setProperty("enable.idempotence", "true");
       properties.setProperty("transactional.id", IdUtils.getRandomId());
     }
+    KafkaUtil.addPropertiesFromTestConfig(config, properties);
     this.producer = new KafkaProducer<>(
         properties,
         new StringSerializer(),

KafkaUtil.java

@@ -21,12 +21,16 @@ package org.apache.druid.testing.utils;
 import org.apache.druid.testing.IntegrationTestingConfig;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 public class KafkaUtil
 {
   private static final String TEST_PROPERTY_PREFIX = "kafka.test.property.";
+  private static final String TEST_CONFIG_PROPERTY_PREFIX = "kafka.test.config.";
+  public static final String TEST_CONFIG_TRANSACTION_ENABLED = "transactionEnabled";
 
   public static void addPropertiesFromTestConfig(IntegrationTestingConfig config, Properties properties)
   {
@@ -36,4 +40,15 @@ public class KafkaUtil
       }
     }
   }
+
+  public static Map<String, String> getAdditionalKafkaTestConfigFromProperties(IntegrationTestingConfig config)
+  {
+    Map<String, String> theMap = new HashMap<>();
+    for (Map.Entry<String, String> entry : config.getProperties().entrySet()) {
+      if (entry.getKey().startsWith(TEST_CONFIG_PROPERTY_PREFIX)) {
+        theMap.put(entry.getKey().substring(TEST_CONFIG_PROPERTY_PREFIX.length()), entry.getValue());
+      }
+    }
+    return theMap;
+  }
 }
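
The body of addPropertiesFromTestConfig lies outside the hunk, but by symmetry with the new method it presumably strips the kafka.test.property. prefix and copies the remainder into the Properties object. A sketch under that assumption:

    // assumed shape of the elided method body, mirroring getAdditionalKafkaTestConfigFromProperties
    public static void addPropertiesFromTestConfig(IntegrationTestingConfig config, Properties properties)
    {
      for (Map.Entry<String, String> entry : config.getProperties().entrySet()) {
        if (entry.getKey().startsWith(TEST_PROPERTY_PREFIX)) {
          properties.setProperty(entry.getKey().substring(TEST_PROPERTY_PREFIX.length()), entry.getValue());
        }
      }
    }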

AbstractKafkaIndexingServiceTest.java

@@ -58,9 +58,9 @@ public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamIndexingTest
   {
     final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
     final Properties consumerProperties = new Properties();
-    KafkaUtil.addPropertiesFromTestConfig(config, consumerProperties);
     consumerProperties.putAll(consumerConfigs);
     consumerProperties.setProperty("bootstrap.servers", config.getKafkaInternalHost());
+    KafkaUtil.addPropertiesFromTestConfig(config, consumerProperties);
     return spec -> {
       try {
         spec = StringUtils.replace(

ITKafkaIndexingServiceDataFormatTest.java

@@ -22,7 +22,9 @@ package org.apache.druid.tests.parallelized;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.Inject;
 import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.KafkaUtil;
 import org.apache.druid.tests.TestNGGroup;
 import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest;
 import org.apache.druid.tests.indexer.AbstractStreamIndexingTest;
@@ -78,6 +80,9 @@ public class ITKafkaIndexingServiceDataFormatTest extends AbstractKafkaIndexingServiceTest
   @Inject
   private @Json ObjectMapper jsonMapper;
 
+  @Inject
+  private IntegrationTestingConfig config;
+
   @BeforeClass
   public void beforeClass() throws Exception
   {
@@ -88,7 +93,15 @@ public class ITKafkaIndexingServiceDataFormatTest extends AbstractKafkaIndexingServiceTest
   public void testIndexData(boolean transactionEnabled, String serializerPath, String parserType, String specPath)
       throws Exception
   {
-    doTestIndexDataStableState(transactionEnabled, serializerPath, parserType, specPath);
+    Map<String, String> testConfig = KafkaUtil.getAdditionalKafkaTestConfigFromProperties(config);
+    boolean txnEnable = Boolean.parseBoolean(
+        testConfig.getOrDefault(KafkaUtil.TEST_CONFIG_TRANSACTION_ENABLED, "false")
+    );
+    if (txnEnable != transactionEnabled) {
+      // skip: this variant does not match the configured transaction support
+      return;
+    }
+    doTestIndexDataStableState(txnEnable, serializerPath, parserType, specPath);
   }
 
   @Override
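
With this gate in place, a run executes only the test variants whose transaction mode matches the configured stream. The key defaults to false, which suits Event Hubs (whose Kafka endpoint did not support Kafka transactions at the time), while a run against real Kafka can presumably set

    druid.test.config.properties.kafka.test.config.transactionEnabled=true

to cover the transactional variants instead.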