removed hard-coded Kafka key and value deserializer (#8112)

* Removed the hard-coded Kafka key and value deserializers, leaving the default deserializer as org.apache.kafka.common.serialization.ByteArrayDeserializer. Also added checks to ensure that any provided deserializer class implements org.apache.kafka.common.serialization.Deserializer and returns a byte array.

* Addressed all comments from the original pull request and added a unit test.

* Added an additional test that uses "poll" to ensure that the custom deserializer works properly.
Aaron Bossert 2019-07-30 19:25:32 -04:00 committed by Gian Merlino
parent 653b558134
commit aba65bb675
4 changed files with 125 additions and 11 deletions
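For illustration, a custom deserializer supplied through the consumer properties only needs to implement org.apache.kafka.common.serialization.Deserializer and return byte[]. Below is a minimal pass-through sketch; the PassThroughDeserializer name and the property snippet are illustrative and not part of this commit:

import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

// Minimal sketch: a pass-through deserializer that satisfies the new check
// (implements Deserializer and its deserialize method returns byte[]).
public class PassThroughDeserializer implements Deserializer<byte[]>
{
  @Override
  public void configure(Map<String, ?> configs, boolean isKey)
  {
    // no configuration needed
  }

  @Override
  public byte[] deserialize(String topic, byte[] data)
  {
    return data;
  }

  @Override
  public void close()
  {
    // nothing to close
  }
}

// It would then be referenced in the consumer properties, e.g.:
//   consumerProperties.put("key.deserializer", PassThroughDeserializer.class.getName());
//   consumerProperties.put("value.deserializer", PassThroughDeserializer.class.getName());
// Omitting both keys falls back to ByteArrayDeserializer, as before.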

View File

@@ -21,7 +21,6 @@ package org.apache.druid.indexing.kafka;
import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import java.util.HashMap;
import java.util.Map;
@@ -36,8 +35,6 @@ public class KafkaConsumerConfigs
{
final Map<String, Object> props = new HashMap<>();
props.put("metadata.max.age.ms", "10000");
props.put("key.deserializer", ByteArrayDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());
props.put("group.id", StringUtils.format("kafka-supervisor-%s", RandomIdUtils.getRandomId()));
props.put("auto.offset.reset", "none");
props.put("enable.auto.commit", "false");

View File

@@ -34,7 +34,6 @@ import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import java.util.ArrayList;
import java.util.HashMap;
@@ -154,8 +153,6 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
final Map<String, Object> props = new HashMap<>(((KafkaIndexTaskIOConfig) super.ioConfig).getConsumerProperties());
props.put("auto.offset.reset", "none");
props.put("key.deserializer", ByteArrayDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());
return new KafkaRecordSupplier(props, configMapper);
}

View File

@@ -33,8 +33,12 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import javax.annotation.Nonnull;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
@@ -197,6 +201,27 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
}
}
private Deserializer getKafkaDeserializer(Properties properties, String kafkaConfigKey)
{
Deserializer deserializerObject;
try {
Class deserializerClass = Class.forName(properties.getProperty(kafkaConfigKey, ByteArrayDeserializer.class.getTypeName()));
Method deserializerMethod = deserializerClass.getMethod("deserialize", String.class, byte[].class);
Type deserializerReturnType = deserializerMethod.getGenericReturnType();
if (deserializerReturnType == byte[].class) {
deserializerObject = (Deserializer) deserializerClass.getConstructor().newInstance();
} else {
throw new IllegalArgumentException("Kafka deserializers must return a byte array (byte[]), " + deserializerClass.getName() + " returns " + deserializerReturnType.getTypeName());
}
}
catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
throw new StreamException(e);
}
return deserializerObject;
}
private KafkaConsumer<byte[], byte[]> getKafkaConsumer()
{
final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
@@ -207,7 +232,10 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
return new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
Deserializer keyDeserializerObject = getKafkaDeserializer(props, "key.deserializer");
Deserializer valueDeserializerObject = getKafkaDeserializer(props, "value.deserializer");
return new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject);
}
finally {
Thread.currentThread().setContextClassLoader(currCtxCl);

View File

@@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.TestHelper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
@@ -126,6 +127,27 @@ public class KafkaRecordSupplierTest
}).collect(Collectors.toList());
}
public static class TestKafkaDeserializer implements Deserializer<byte[]>
{
@Override
public void configure(Map<String, ?> map, boolean b)
{
}
@Override
public void close()
{
}
@Override
public byte[] deserialize(String topic, byte[] data)
{
return data;
}
}
@BeforeClass
public static void setupClass() throws Exception
{
@@ -184,6 +206,76 @@ public class KafkaRecordSupplierTest
recordSupplier.close();
}
@Test
public void testSupplierSetupCustomDeserializer() throws ExecutionException, InterruptedException
{
// Insert data
insertData();
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
StreamPartition.of(topic, 0),
StreamPartition.of(topic, 1)
);
Map<String, Object> properties = kafkaServer.consumerProperties();
properties.put("key.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializer.class.getName());
properties.put("value.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializer.class.getName());
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
properties,
objectMapper
);
Assert.assertTrue(recordSupplier.getAssignment().isEmpty());
recordSupplier.assign(partitions);
Assert.assertEquals(partitions, recordSupplier.getAssignment());
Assert.assertEquals(ImmutableSet.of(0, 1), recordSupplier.getPartitionIds(topic));
recordSupplier.close();
}
@Test
public void testPollCustomDeserializer() throws InterruptedException, ExecutionException
{
// Insert data
insertData();
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
StreamPartition.of(topic, 0),
StreamPartition.of(topic, 1)
);
Map<String, Object> properties = kafkaServer.consumerProperties();
properties.put("key.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializer.class.getName());
properties.put("value.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializer.class.getName());
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
properties,
objectMapper
);
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
List<OrderedPartitionableRecord<Integer, Long>> initialRecords = new ArrayList<>(createOrderedPartitionableRecords());
List<OrderedPartitionableRecord<Integer, Long>> polledRecords = recordSupplier.poll(poll_timeout_millis);
for (int i = 0; polledRecords.size() != initialRecords.size() && i < pollRetry; i++) {
polledRecords.addAll(recordSupplier.poll(poll_timeout_millis));
Thread.sleep(200);
}
Assert.assertEquals(partitions, recordSupplier.getAssignment());
Assert.assertEquals(initialRecords.size(), polledRecords.size());
Assert.assertTrue(initialRecords.containsAll(polledRecords));
recordSupplier.close();
}
@Test
public void testPoll() throws InterruptedException, ExecutionException
{