From 4a9e72664c9a8d94f7eb944b887ee62c36f57b4a Mon Sep 17 00:00:00 2001 From: Avin Buricha Date: Sat, 6 May 2023 08:53:31 +0530 Subject: [PATCH] BAEL-6410 (#13919) * BAEL-6410 | Article code * Adding consumer code for article --- .../kafka/headers/KafkaMessageHeaders.java | 88 +++++++++++++++ .../headers/KafkaMessageHeadersLiveTest.java | 104 ++++++++++++++++++ 2 files changed, 192 insertions(+) create mode 100644 apache-kafka-2/src/main/java/com/baeldung/kafka/headers/KafkaMessageHeaders.java create mode 100644 apache-kafka-2/src/test/java/com/baeldung/kafka/headers/KafkaMessageHeadersLiveTest.java diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/headers/KafkaMessageHeaders.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/headers/KafkaMessageHeaders.java new file mode 100644 index 0000000000..2c8c14bfc5 --- /dev/null +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/headers/KafkaMessageHeaders.java @@ -0,0 +1,88 @@ +package com.baeldung.kafka.headers; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaMessageHeaders { + + private static Logger logger = LoggerFactory.getLogger(KafkaMessageHeaders.class); + + private static String TOPIC = "baeldung"; + private static String MESSAGE_KEY = "message"; + private static String MESSAGE_VALUE = "Hello World"; + private static String HEADER_KEY = "website"; + private static String HEADER_VALUE = "baeldung.com"; + + private static KafkaProducer producer; + private static KafkaConsumer consumer; + + public static void main(String[] args) { + setup(); + + publishMessageWithCustomHeaders(); + + consumeMessageWithCustomHeaders(); + } + + private static void consumeMessageWithCustomHeaders() { + consumer.subscribe(Arrays.asList(TOPIC)); + + ConsumerRecords records = consumer.poll(Duration.ofMinutes(1)); + for (ConsumerRecord record : records) { + logger.info(record.key()); + logger.info(record.value()); + + Headers headers = record.headers(); + for (Header header : headers) { + logger.info(header.key()); + logger.info(new String(header.value())); + } + } + } + + private static void publishMessageWithCustomHeaders() { + List
headers = new ArrayList<>(); + headers.add(new RecordHeader(HEADER_KEY, HEADER_VALUE.getBytes())); + + ProducerRecord record1 = new ProducerRecord<>(TOPIC, null, MESSAGE_KEY, MESSAGE_VALUE, headers); + producer.send(record1); + + ProducerRecord record2 = new ProducerRecord<>(TOPIC, null, System.currentTimeMillis(), MESSAGE_KEY, MESSAGE_VALUE, headers); + producer.send(record2); + } + + private static void setup() { + Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + Properties consumerProperties = new Properties(); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerGroup1"); + + producer = new KafkaProducer<>(producerProperties); + consumer = new KafkaConsumer<>(consumerProperties); + } + +} diff --git a/apache-kafka-2/src/test/java/com/baeldung/kafka/headers/KafkaMessageHeadersLiveTest.java b/apache-kafka-2/src/test/java/com/baeldung/kafka/headers/KafkaMessageHeadersLiveTest.java new file mode 100644 index 0000000000..42cc572e07 --- /dev/null +++ b/apache-kafka-2/src/test/java/com/baeldung/kafka/headers/KafkaMessageHeadersLiveTest.java @@ -0,0 +1,104 @@ +package com.baeldung.kafka.headers; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +// This live test needs a Docker Daemon running so that a kafka container can be created + +@Testcontainers +public class KafkaMessageHeadersLiveTest { + + private static String TOPIC = "baeldung"; + private static String MESSAGE_KEY = "message"; + private static String MESSAGE_VALUE = "Hello World"; + private static String HEADER_KEY = "website"; + private static String HEADER_VALUE = "baeldung.com"; + + private static KafkaProducer producer; + private static KafkaConsumer consumer; + + @Container + private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); + + @BeforeAll + static void setup() { + KAFKA_CONTAINER.addExposedPort(9092); + + Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + Properties consumerProperties = new Properties(); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerGroup1"); + + producer = new KafkaProducer<>(producerProperties); + consumer = new KafkaConsumer<>(consumerProperties); + } + + @AfterAll + static void destroy() { + KAFKA_CONTAINER.stop(); + } + + @Test + void givenAMessageWithCustomHeaders_whenPublishedToKafkaAndConsumed_thenCheckForCustomHeaders() throws ExecutionException, InterruptedException { + List
headers = new ArrayList<>(); + headers.add(new RecordHeader(HEADER_KEY, HEADER_VALUE.getBytes())); + + ProducerRecord record1 = new ProducerRecord<>(TOPIC, null, MESSAGE_KEY, MESSAGE_VALUE, headers); + Future future = producer.send(record1); + + RecordMetadata metadata = future.get(); + + assertNotNull(metadata); + + consumer.subscribe(Arrays.asList(TOPIC)); + + ConsumerRecords records = consumer.poll(Duration.ofMinutes(1)); + for (ConsumerRecord record : records) { + assertEquals(MESSAGE_KEY, record.key()); + assertEquals(MESSAGE_VALUE, record.value()); + + Headers consumedHeaders = record.headers(); + assertNotNull(consumedHeaders); + + for (Header header : consumedHeaders) { + assertEquals(HEADER_KEY, header.key()); + assertEquals(HEADER_VALUE, new String(header.value())); + } + } + } +}