From 307c1b072006115dc0780913bb3fd7d1831849b4 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 5 Oct 2020 08:54:29 -0700 Subject: [PATCH] adjustments to Kafka integration tests to allow running against Azure Event Hubs streams (#10463) * adjustments to kafka integration tests to allow running against azure event hubs in kafka mode * oops * make better * more better --- .../druid/testing/DockerConfigProvider.java | 43 ++++++++++++++++++- .../druid/testing/utils/KafkaAdminClient.java | 2 +- .../druid/testing/utils/KafkaEventWriter.java | 2 +- .../apache/druid/testing/utils/KafkaUtil.java | 15 +++++++ .../AbstractKafkaIndexingServiceTest.java | 2 +- .../ITKafkaIndexingServiceDataFormatTest.java | 15 ++++++- 6 files changed, 73 insertions(+), 6 deletions(-) diff --git a/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java b/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java index 11c540fb69d..67266b068c5 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java @@ -21,8 +21,13 @@ package org.apache.druid.testing; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import javax.validation.constraints.NotNull; +import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -54,6 +59,10 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider @JsonProperty private String streamEndpoint; + @JsonProperty + @JsonDeserialize(using = ArbitraryPropertiesJsonDeserializer.class) + private Map properties = new HashMap<>(); + @Override public IntegrationTestingConfig get() { @@ -190,7 +199,7 @@ public class 
DockerConfigProvider implements IntegrationTestingConfigProvider @Override public String getProperty(String prop) { - throw new UnsupportedOperationException("DockerConfigProvider does not support property " + prop); + return properties.get(prop); } @Override @@ -208,7 +217,7 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider @Override public Map getProperties() { - return new HashMap<>(); + return properties; } @Override @@ -260,4 +269,34 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider } }; } + + // there is probably a better way to do this... + static class ArbitraryPropertiesJsonDeserializer extends JsonDeserializer> + { + @Override + public Map deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) + throws IOException + { + // given some config input, such as + // druid.test.config.properties.a.b.c=d + // calling jsonParser.readValueAs(Map.class) here results in a map that has both nested objects and also + // flattened string pairs, so the map looks something like this (in JSON form): + // { + // "a" : { "b": { "c" : "d" }}, + // "a.b.c":"d" + // } + // The string pairs are the values we want to populate this map with, so filtering out the top level keys which + // do not have string values leaves us with + // { "a.b.c":"d"} + // from the given example, which is what we want + Map parsed = jsonParser.readValueAs(Map.class); + Map flat = new HashMap<>(); + for (Map.Entry entry : parsed.entrySet()) { + if (!(entry.getValue() instanceof Map)) { + flat.put(entry.getKey(), (String) entry.getValue()); + } + } + return flat; + } + } } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java index b8311a677b2..84993f22d6f 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java +++ 
b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java @@ -43,8 +43,8 @@ public class KafkaAdminClient implements StreamAdminClient public KafkaAdminClient(IntegrationTestingConfig config) { Properties properties = new Properties(); - KafkaUtil.addPropertiesFromTestConfig(config, properties); properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.getKafkaHost()); + KafkaUtil.addPropertiesFromTestConfig(config, properties); adminClient = AdminClient.create(properties); } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java index 1d5973c4782..79ae219eaa9 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java @@ -42,7 +42,6 @@ public class KafkaEventWriter implements StreamEventWriter public KafkaEventWriter(IntegrationTestingConfig config, boolean txnEnabled) { Properties properties = new Properties(); - KafkaUtil.addPropertiesFromTestConfig(config, properties); properties.setProperty("bootstrap.servers", config.getKafkaHost()); properties.setProperty("acks", "all"); properties.setProperty("retries", "3"); @@ -53,6 +52,7 @@ public class KafkaEventWriter implements StreamEventWriter properties.setProperty("enable.idempotence", "true"); properties.setProperty("transactional.id", IdUtils.getRandomId()); } + KafkaUtil.addPropertiesFromTestConfig(config, properties); this.producer = new KafkaProducer<>( properties, new StringSerializer(), diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaUtil.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaUtil.java index 36534c2a8a8..0f7e9fad5df 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaUtil.java +++ 
b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaUtil.java @@ -21,12 +21,16 @@ package org.apache.druid.testing.utils; import org.apache.druid.testing.IntegrationTestingConfig; +import java.util.HashMap; import java.util.Map; import java.util.Properties; public class KafkaUtil { private static final String TEST_PROPERTY_PREFIX = "kafka.test.property."; + private static final String TEST_CONFIG_PROPERTY_PREFIX = "kafka.test.config."; + + public static final String TEST_CONFIG_TRANSACTION_ENABLED = "transactionEnabled"; public static void addPropertiesFromTestConfig(IntegrationTestingConfig config, Properties properties) { @@ -36,4 +40,15 @@ public class KafkaUtil } } } + + public static Map getAdditionalKafkaTestConfigFromProperties(IntegrationTestingConfig config) + { + Map theMap = new HashMap<>(); + for (Map.Entry entry : config.getProperties().entrySet()) { + if (entry.getKey().startsWith(TEST_CONFIG_PROPERTY_PREFIX)) { + theMap.put(entry.getKey().substring(TEST_CONFIG_PROPERTY_PREFIX.length()), entry.getValue()); + } + } + return theMap; + } } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java index cc2a22f827d..204b6ef7259 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java @@ -58,9 +58,9 @@ public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamInd { final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(); final Properties consumerProperties = new Properties(); - KafkaUtil.addPropertiesFromTestConfig(config, consumerProperties); consumerProperties.putAll(consumerConfigs); consumerProperties.setProperty("bootstrap.servers", config.getKafkaInternalHost()); + 
KafkaUtil.addPropertiesFromTestConfig(config, consumerProperties); return spec -> { try { spec = StringUtils.replace( diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceDataFormatTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceDataFormatTest.java index 9143d9b38b7..2286d237855 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceDataFormatTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceDataFormatTest.java @@ -22,7 +22,9 @@ package org.apache.druid.tests.parallelized; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import org.apache.druid.guice.annotations.Json; +import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.guice.DruidTestModuleFactory; +import org.apache.druid.testing.utils.KafkaUtil; import org.apache.druid.tests.TestNGGroup; import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest; import org.apache.druid.tests.indexer.AbstractStreamIndexingTest; @@ -78,6 +80,9 @@ public class ITKafkaIndexingServiceDataFormatTest extends AbstractKafkaIndexingS @Inject private @Json ObjectMapper jsonMapper; + @Inject + private IntegrationTestingConfig config; + @BeforeClass public void beforeClass() throws Exception { @@ -88,7 +93,15 @@ public class ITKafkaIndexingServiceDataFormatTest extends AbstractKafkaIndexingS public void testIndexData(boolean transactionEnabled, String serializerPath, String parserType, String specPath) throws Exception { - doTestIndexDataStableState(transactionEnabled, serializerPath, parserType, specPath); + Map testConfig = KafkaUtil.getAdditionalKafkaTestConfigFromProperties(config); + boolean txnEnable = Boolean.parseBoolean( + testConfig.getOrDefault(KafkaUtil.TEST_CONFIG_TRANSACTION_ENABLED, "false") + ); + if (txnEnable != transactionEnabled) { 
+ // do nothing + return; + } + doTestIndexDataStableState(txnEnable, serializerPath, parserType, specPath); } @Override