kafka consumer: custom serializer can't be configured after it's instantiation (#12960) (#13097)

* allow kakfa custom serializer to be configured

  * add unit tests

Co-authored-by: ellen shen <ellenshen@apple.com>
This commit is contained in:
Ellen Shen 2022-09-17 05:42:21 -07:00 committed by GitHub
parent d4967c38f8
commit da30c8070a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 72 additions and 3 deletions

View File

@ -48,6 +48,7 @@ import java.lang.reflect.Type;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@ -234,7 +235,7 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaR
}
}
private static Deserializer getKafkaDeserializer(Properties properties, String kafkaConfigKey)
private static Deserializer getKafkaDeserializer(Properties properties, String kafkaConfigKey, boolean isKey)
{
Deserializer deserializerObject;
try {
@ -257,6 +258,13 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaR
catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
throw new StreamException(e);
}
Map<String, Object> configs = new HashMap<>();
for (String key : properties.stringPropertyNames()) {
configs.put(key, properties.get(key));
}
deserializerObject.configure(configs, isKey);
return deserializerObject;
}
@ -272,8 +280,8 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaR
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(KafkaRecordSupplier.class.getClassLoader());
Deserializer keyDeserializerObject = getKafkaDeserializer(props, "key.deserializer");
Deserializer valueDeserializerObject = getKafkaDeserializer(props, "value.deserializer");
Deserializer keyDeserializerObject = getKafkaDeserializer(props, "key.deserializer", true);
Deserializer valueDeserializerObject = getKafkaDeserializer(props, "value.deserializer", false);
return new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject);
}

View File

@ -57,6 +57,7 @@ public class KafkaRecordSupplierTest
{
private static String topic = "topic";
private static String additonal_parameter = "additional.parameter";
private static long poll_timeout_millis = 1000;
private static int pollRetry = 5;
private static int topicPosFix = 0;
@ -156,6 +157,30 @@ public class KafkaRecordSupplierTest
}
}
public static class TestKafkaDeserializerRequiresParameter implements Deserializer<byte[]>
{
@Override
public void configure(Map<String, ?> map, boolean b)
{
if (!map.containsKey("additional.parameter")) {
throw new IllegalStateException("require additional.parameter configured");
}
}
@Override
public void close()
{
}
@Override
public byte[] deserialize(String topic, byte[] data)
{
return data;
}
}
@BeforeClass
public static void setupClass() throws Exception
{
@ -245,6 +270,42 @@ public class KafkaRecordSupplierTest
recordSupplier.close();
}
@Test
public void testSupplierSetupCustomDeserializerRequiresParameter()
{
Map<String, Object> properties = kafkaServer.consumerProperties();
properties.put("key.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializerRequiresParameter.class.getName());
properties.put("value.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializerRequiresParameter.class.getName());
properties.put(additonal_parameter, "stringValue");
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
properties,
OBJECT_MAPPER
);
Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); //just test recordSupplier is initiated
recordSupplier.close();
}
@Test(expected = IllegalStateException.class)
public void testSupplierSetupCustomDeserializerRequiresParameterButMissingIt()
{
Map<String, Object> properties = kafkaServer.consumerProperties();
properties.put("key.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializerRequiresParameter.class.getName());
properties.put("value.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializerRequiresParameter.class.getName());
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
properties,
OBJECT_MAPPER
);
Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); //just test recordSupplier is initiated
recordSupplier.close();
}
@Test
public void testPollCustomDeserializer() throws InterruptedException, ExecutionException
{