From aba65bb675e0c34216c8d1d0efbe93f17842c1f4 Mon Sep 17 00:00:00 2001
From: Aaron Bossert
Date: Tue, 30 Jul 2019 19:25:32 -0400
Subject: [PATCH] removed hard-coded Kafka key and value deserializer (#8112)

* removed hard-coded Kafka key and value deserializer, leaving the default
  deserializer as org.apache.kafka.common.serialization.ByteArrayDeserializer.
  Also added checks to ensure that any provided deserializer class implements
  org.apache.kafka.common.serialization.Deserializer and returns a byte array
  (byte[]).

* Addressed all comments from the original pull request and also added a unit
  test.

* Added an additional test that uses "poll" to ensure that the custom
  deserializer works properly.
---
 .../indexing/kafka/KafkaConsumerConfigs.java  |  3 -
 .../druid/indexing/kafka/KafkaIndexTask.java  |  3 -
 .../indexing/kafka/KafkaRecordSupplier.java   | 32 +++++-
 .../kafka/KafkaRecordSupplierTest.java        | 98 ++++++++++++++++++-
 4 files changed, 125 insertions(+), 11 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java
index b5f78694c51..cdce155e82d 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java
@@ -21,7 +21,6 @@ package org.apache.druid.indexing.kafka;
 
 import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -36,8 +35,6 @@ public class KafkaConsumerConfigs
   {
     final Map<String, Object> props = new HashMap<>();
     props.put("metadata.max.age.ms", "10000");
-    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
-    props.put("value.deserializer", ByteArrayDeserializer.class.getName());
     props.put("group.id", StringUtils.format("kafka-supervisor-%s", RandomIdUtils.getRandomId()));
     props.put("auto.offset.reset", "none");
     props.put("enable.auto.commit", "false");
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index fa900d2a662..b55d05b5846 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -34,7 +34,6 @@ import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -154,8 +153,6 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
     final Map<String, Object> props = new HashMap<>(((KafkaIndexTaskIOConfig) super.ioConfig).getConsumerProperties());
 
     props.put("auto.offset.reset", "none");
-    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
-    props.put("value.deserializer", ByteArrayDeserializer.class.getName());
 
     return new KafkaRecordSupplier(props, configMapper);
   }
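Note on the two files above: once the hard-coded key.deserializer and value.deserializer entries are gone, those two keys travel through the task's consumer properties like any other Kafka setting, and KafkaRecordSupplier (next file) resolves them, falling back to ByteArrayDeserializer when they are absent. A minimal, self-contained sketch of that lookup-with-default behaviour follows; the com.example.MyByteArrayDeserializer class name is invented for illustration, while the two property keys and the ByteArrayDeserializer fallback come from the patch.

import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.util.Properties;

// Illustration only: a deserializer class name supplied through consumer properties,
// and the ByteArrayDeserializer default applied when the key is not set.
public class DeserializerPropertySketch
{
  public static void main(String[] args)
  {
    Properties props = new Properties();
    // Hypothetical custom class; any class whose deserialize() returns byte[] would do.
    props.setProperty("key.deserializer", "com.example.MyByteArrayDeserializer");
    // value.deserializer is intentionally left unset.

    String keyDeserializer = props.getProperty("key.deserializer", ByteArrayDeserializer.class.getTypeName());
    String valueDeserializer = props.getProperty("value.deserializer", ByteArrayDeserializer.class.getTypeName());

    // Prints the custom class for the key and the ByteArrayDeserializer default for the value.
    System.out.println("key.deserializer   -> " + keyDeserializer);
    System.out.println("value.deserializer -> " + valueDeserializer);
  }
}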
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index e40f77a1152..0859665d600 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -33,8 +33,12 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
 
 import javax.annotation.Nonnull;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -196,7 +200,28 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
       }
     }
   }
-
+
+  private Deserializer getKafkaDeserializer(Properties properties, String kafkaConfigKey)
+  {
+    Deserializer deserializerObject;
+    try {
+      Class deserializerClass = Class.forName(properties.getProperty(kafkaConfigKey, ByteArrayDeserializer.class.getTypeName()));
+      Method deserializerMethod = deserializerClass.getMethod("deserialize", String.class, byte[].class);
+
+      Type deserializerReturnType = deserializerMethod.getGenericReturnType();
+
+      if (deserializerReturnType == byte[].class) {
+        deserializerObject = (Deserializer) deserializerClass.getConstructor().newInstance();
+      } else {
+        throw new IllegalArgumentException("Kafka deserializers must return a byte array (byte[]), " + deserializerClass.getName() + " returns " + deserializerReturnType.getTypeName());
+      }
+    }
+    catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
+      throw new StreamException(e);
+    }
+    return deserializerObject;
+  }
+
   private KafkaConsumer<byte[], byte[]> getKafkaConsumer()
   {
     final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
@@ -207,7 +232,10 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
     ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
     try {
       Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
-      return new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+      Deserializer keyDeserializerObject = getKafkaDeserializer(props, "key.deserializer");
+      Deserializer valueDeserializerObject = getKafkaDeserializer(props, "value.deserializer");
+
+      return new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject);
     }
     finally {
       Thread.currentThread().setContextClassLoader(currCtxCl);
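Note on getKafkaDeserializer above: because generics are erased at runtime, the method cannot simply check for Deserializer<byte[]>. Instead it reflectively looks up deserialize(String, byte[]) and accepts the class only if that method's return type is byte[], throwing IllegalArgumentException otherwise; reflection failures are wrapped in StreamException. The standalone sketch below is not part of the patch and its class names are invented; it shows a deserializer that passes the check and replays the same reflection probe outside Druid.

import org.apache.kafka.common.serialization.Deserializer;

import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.Map;

// Standalone illustration of the check in KafkaRecordSupplier.getKafkaDeserializer():
// find deserialize(String, byte[]) and require that its return type is byte[].
public class DeserializerCheckSketch
{
  // A pass-through deserializer; configure() and close() are implemented explicitly so the
  // sketch does not depend on the Kafka version providing default methods for them.
  public static class PassThroughDeserializer implements Deserializer<byte[]>
  {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey)
    {
      // no configuration needed
    }

    @Override
    public byte[] deserialize(String topic, byte[] data)
    {
      return data;
    }

    @Override
    public void close()
    {
      // nothing to release
    }
  }

  public static void main(String[] args) throws Exception
  {
    Class<?> clazz = Class.forName(PassThroughDeserializer.class.getName());
    Method deserialize = clazz.getMethod("deserialize", String.class, byte[].class);
    Type returnType = deserialize.getGenericReturnType();

    if (returnType == byte[].class) {
      // Same cast-and-instantiate step the patch performs; requires a public no-arg constructor.
      Deserializer accepted = (Deserializer) clazz.getConstructor().newInstance();
      System.out.println("accepted: " + accepted.getClass().getName());
    } else {
      System.out.println("rejected: deserialize() returns " + returnType.getTypeName());
    }
  }
}

The probe also makes clear why the check keys off the declared return type rather than the interface: a Deserializer<String> implementation exposes deserialize(String, byte[]) as well, but its return type is String, so it is rejected before a consumer is ever constructed.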
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
index b5058840616..476a1939c4b 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
@@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.segment.TestHelper;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -54,7 +55,7 @@ public class KafkaRecordSupplierTest
   private static int pollRetry = 5;
   private static int topicPosFix = 0;
   private static final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
-
+
   private static TestingCluster zkServer;
   private static TestBroker kafkaServer;
 
@@ -125,7 +126,28 @@ public class KafkaRecordSupplierTest
       );
     }).collect(Collectors.toList());
   }
-
+
+  public static class TestKafkaDeserializer implements Deserializer<byte[]>
+  {
+    @Override
+    public void configure(Map<String, ?> map, boolean b)
+    {
+
+    }
+
+    @Override
+    public void close()
+    {
+
+    }
+
+    @Override
+    public byte[] deserialize(String topic, byte[] data)
+    {
+      return data;
+    }
+  }
+
   @BeforeClass
   public static void setupClass() throws Exception
   {
@@ -183,7 +205,77 @@ public class KafkaRecordSupplierTest
 
     recordSupplier.close();
   }
-
+
+  @Test
+  public void testSupplierSetupCustomDeserializer() throws ExecutionException, InterruptedException
+  {
+
+    // Insert data
+    insertData();
+
+    Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
+        StreamPartition.of(topic, 0),
+        StreamPartition.of(topic, 1)
+    );
+
+    Map<String, Object> properties = kafkaServer.consumerProperties();
+    properties.put("key.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializer.class.getName());
+    properties.put("value.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializer.class.getName());
+
+    KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
+        properties,
+        objectMapper
+    );
+
+    Assert.assertTrue(recordSupplier.getAssignment().isEmpty());
+
+    recordSupplier.assign(partitions);
+
+    Assert.assertEquals(partitions, recordSupplier.getAssignment());
+    Assert.assertEquals(ImmutableSet.of(0, 1), recordSupplier.getPartitionIds(topic));
+
+    recordSupplier.close();
+  }
+
+  @Test
+  public void testPollCustomDeserializer() throws InterruptedException, ExecutionException
+  {
+
+    // Insert data
+    insertData();
+
+    Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
+        StreamPartition.of(topic, 0),
+        StreamPartition.of(topic, 1)
+    );
+
+    Map<String, Object> properties = kafkaServer.consumerProperties();
+    properties.put("key.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializer.class.getName());
+    properties.put("value.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializer.class.getName());
+
+    KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
+        properties,
+        objectMapper
+    );
+
+    recordSupplier.assign(partitions);
+    recordSupplier.seekToEarliest(partitions);
+
+    List<OrderedPartitionableRecord<Integer, Long>> initialRecords = new ArrayList<>(createOrderedPartitionableRecords());
+
+    List<OrderedPartitionableRecord<Integer, Long>> polledRecords = recordSupplier.poll(poll_timeout_millis);
+    for (int i = 0; polledRecords.size() != initialRecords.size() && i < pollRetry; i++) {
+      polledRecords.addAll(recordSupplier.poll(poll_timeout_millis));
+      Thread.sleep(200);
+    }
+
+    Assert.assertEquals(partitions, recordSupplier.getAssignment());
+    Assert.assertEquals(initialRecords.size(), polledRecords.size());
+    Assert.assertTrue(initialRecords.containsAll(polledRecords));
+
+    recordSupplier.close();
+  }
+
   @Test
   public void testPoll() throws InterruptedException, ExecutionException
   {
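A possible follow-up, not included in this patch, would be a test for the rejection path as well. The sketch below is hypothetical: the TestStringDeserializer class and the test name are invented, it is written as if added inside KafkaRecordSupplierTest next to the tests above (reusing the existing kafkaServer and objectMapper fixtures and the imports already present in that class), and it assumes the IllegalArgumentException raised for a non-byte[] return type propagates out of the KafkaRecordSupplier constructor.

  // Hypothetical addition to KafkaRecordSupplierTest (not part of this patch): a deserializer
  // whose deserialize() returns String instead of byte[], which the new check should reject.
  public static class TestStringDeserializer implements Deserializer<String>
  {
    @Override
    public void configure(Map<String, ?> map, boolean b)
    {
    }

    @Override
    public String deserialize(String topic, byte[] data)
    {
      return new String(data, java.nio.charset.StandardCharsets.UTF_8);
    }

    @Override
    public void close()
    {
    }
  }

  @Test(expected = IllegalArgumentException.class)
  public void testSupplierSetupWithNonByteArrayDeserializer()
  {
    Map<String, Object> properties = kafkaServer.consumerProperties();
    properties.put("key.deserializer", TestStringDeserializer.class.getName());
    properties.put("value.deserializer", TestStringDeserializer.class.getName());

    // Expected to fail fast in the constructor, before any partitions are assigned.
    new KafkaRecordSupplier(properties, objectMapper);
  }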