Merge pull request #11221 from carloscaverobarca/BAEL-4854-Custom-Serializer-Apache-Kafka
[BAEL-4854] - Custom Serializers in Apache Kafka
This commit is contained in:
commit
2086ac5fc3
|
@ -161,6 +161,12 @@
|
||||||
<artifactId>spark-cassandra-connector-java_2.11</artifactId>
|
<artifactId>spark-cassandra-connector-java_2.11</artifactId>
|
||||||
<version>${com.datastax.spark.spark-cassandra-connector-java.version}</version>
|
<version>${com.datastax.spark.spark-cassandra-connector-java.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.projectlombok</groupId>
|
||||||
|
<artifactId>lombok</artifactId>
|
||||||
|
<version>${lombok.version}</version>
|
||||||
|
<scope>provided</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
|
@ -175,6 +181,7 @@
|
||||||
<graphframes.version>0.8.1-spark3.0-s_2.12</graphframes.version>
|
<graphframes.version>0.8.1-spark3.0-s_2.12</graphframes.version>
|
||||||
<com.datastax.spark.spark-cassandra-connector.version>2.5.2</com.datastax.spark.spark-cassandra-connector.version>
|
<com.datastax.spark.spark-cassandra-connector.version>2.5.2</com.datastax.spark.spark-cassandra-connector.version>
|
||||||
<com.datastax.spark.spark-cassandra-connector-java.version>1.6.0-M1</com.datastax.spark.spark-cassandra-connector-java.version>
|
<com.datastax.spark.spark-cassandra-connector-java.version>1.6.0-M1</com.datastax.spark.spark-cassandra-connector-java.version>
|
||||||
|
<lombok.version>1.18.20</lombok.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
</project>
|
</project>
|
|
@ -0,0 +1,15 @@
|
||||||
|
package com.baeldung.kafka.dto;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Builder;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@AllArgsConstructor
|
||||||
|
@NoArgsConstructor
|
||||||
|
@Builder
|
||||||
|
public class MessageDto {
|
||||||
|
private String message;
|
||||||
|
private String version;
|
||||||
|
}
|
|
@ -0,0 +1,35 @@
|
||||||
|
package com.baeldung.kafka.serdes;
|
||||||
|
|
||||||
|
import com.baeldung.kafka.dto.MessageDto;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import org.apache.kafka.common.errors.SerializationException;
|
||||||
|
import org.apache.kafka.common.serialization.Deserializer;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class CustomDeserializer implements Deserializer<MessageDto> {
|
||||||
|
|
||||||
|
private ObjectMapper objectMapper = new ObjectMapper();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void configure(Map<String, ?> configs, boolean isKey) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MessageDto deserialize(String topic, byte[] data) {
|
||||||
|
try {
|
||||||
|
if (data == null){
|
||||||
|
System.out.println("Null received at deserializing");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
System.out.println("Deserializing...");
|
||||||
|
return objectMapper.readValue(new String(data, "UTF-8"), MessageDto.class);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new SerializationException("Error when deserializing byte[] to MessageDto");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,34 @@
|
||||||
|
package com.baeldung.kafka.serdes;
|
||||||
|
|
||||||
|
import com.baeldung.kafka.dto.MessageDto;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import org.apache.kafka.common.errors.SerializationException;
|
||||||
|
import org.apache.kafka.common.serialization.Serializer;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class CustomSerializer implements Serializer<MessageDto> {
|
||||||
|
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void configure(Map<String, ?> configs, boolean isKey) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte[] serialize(String topic, MessageDto data) {
|
||||||
|
try {
|
||||||
|
if (data == null){
|
||||||
|
System.out.println("Null received at serializing");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
System.out.println("Serializing...");
|
||||||
|
return objectMapper.writeValueAsBytes(data);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new SerializationException("Error when serializing MessageDto to byte[]");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,92 @@
|
||||||
|
package com.baeldung.kafka.serdes;
|
||||||
|
|
||||||
|
import com.baeldung.kafka.dto.MessageDto;
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||||
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||||
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||||
|
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||||
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.testcontainers.containers.KafkaContainer;
|
||||||
|
import org.testcontainers.utility.DockerImageName;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Properties;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
public class KafkaSerDesLiveTest {
|
||||||
|
private static final String CONSUMER_APP_ID = "consumer_id";
|
||||||
|
private static final String CONSUMER_GROUP_ID = "group_id";
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));
|
||||||
|
private final String TOPIC = "mytopic";
|
||||||
|
|
||||||
|
private static KafkaConsumer<String, MessageDto> createKafkaConsumer() {
|
||||||
|
|
||||||
|
Properties props = new Properties();
|
||||||
|
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
|
||||||
|
props.put(ConsumerConfig.CLIENT_ID_CONFIG, CONSUMER_APP_ID);
|
||||||
|
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
|
||||||
|
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||||
|
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
|
||||||
|
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.serdes.CustomDeserializer");
|
||||||
|
|
||||||
|
return new KafkaConsumer<>(props);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static KafkaProducer<String, MessageDto> createKafkaProducer() {
|
||||||
|
|
||||||
|
Properties props = new Properties();
|
||||||
|
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
|
||||||
|
props.put(ProducerConfig.CLIENT_ID_CONFIG, CONSUMER_APP_ID);
|
||||||
|
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
|
||||||
|
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.serdes.CustomSerializer");
|
||||||
|
|
||||||
|
return new KafkaProducer(props);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenKafkaClientShouldSerializeAndDeserialize() throws InterruptedException {
|
||||||
|
|
||||||
|
MessageDto msgProd = MessageDto.builder().message("test").version("1.0").build();
|
||||||
|
|
||||||
|
KafkaProducer<String, MessageDto> producer = createKafkaProducer();
|
||||||
|
producer.send(new ProducerRecord<String, MessageDto>(TOPIC, "1", msgProd));
|
||||||
|
System.out.println("Message sent " + msgProd);
|
||||||
|
producer.close();
|
||||||
|
|
||||||
|
Thread.sleep(2000);
|
||||||
|
|
||||||
|
AtomicReference<MessageDto> msgCons = new AtomicReference<>();
|
||||||
|
|
||||||
|
KafkaConsumer<String, MessageDto> consumer = createKafkaConsumer();
|
||||||
|
consumer.subscribe(Arrays.asList(TOPIC));
|
||||||
|
|
||||||
|
ConsumerRecords<String, MessageDto> records = consumer.poll(Duration.ofSeconds(1));
|
||||||
|
records.forEach(record -> {
|
||||||
|
msgCons.set(record.value());
|
||||||
|
System.out.println("Message received " + record.value());
|
||||||
|
});
|
||||||
|
|
||||||
|
consumer.close();
|
||||||
|
|
||||||
|
assertEquals(msgProd, msgCons.get());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue