* BAEL-6410 | Article code

* Adding consumer code for article
This commit is contained in:
Avin Buricha 2023-05-06 08:53:31 +05:30 committed by GitHub
parent dea82bf189
commit 4a9e72664c
2 changed files with 192 additions and 0 deletions

View File

@ -0,0 +1,88 @@
package com.baeldung.kafka.headers;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaMessageHeaders {
private static Logger logger = LoggerFactory.getLogger(KafkaMessageHeaders.class);
private static String TOPIC = "baeldung";
private static String MESSAGE_KEY = "message";
private static String MESSAGE_VALUE = "Hello World";
private static String HEADER_KEY = "website";
private static String HEADER_VALUE = "baeldung.com";
private static KafkaProducer<String, String> producer;
private static KafkaConsumer<String, String> consumer;
public static void main(String[] args) {
setup();
publishMessageWithCustomHeaders();
consumeMessageWithCustomHeaders();
}
private static void consumeMessageWithCustomHeaders() {
consumer.subscribe(Arrays.asList(TOPIC));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMinutes(1));
for (ConsumerRecord<String, String> record : records) {
logger.info(record.key());
logger.info(record.value());
Headers headers = record.headers();
for (Header header : headers) {
logger.info(header.key());
logger.info(new String(header.value()));
}
}
}
private static void publishMessageWithCustomHeaders() {
List<Header> headers = new ArrayList<>();
headers.add(new RecordHeader(HEADER_KEY, HEADER_VALUE.getBytes()));
ProducerRecord<String, String> record1 = new ProducerRecord<>(TOPIC, null, MESSAGE_KEY, MESSAGE_VALUE, headers);
producer.send(record1);
ProducerRecord<String, String> record2 = new ProducerRecord<>(TOPIC, null, System.currentTimeMillis(), MESSAGE_KEY, MESSAGE_VALUE, headers);
producer.send(record2);
}
private static void setup() {
Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerGroup1");
producer = new KafkaProducer<>(producerProperties);
consumer = new KafkaConsumer<>(consumerProperties);
}
}

View File

@ -0,0 +1,104 @@
package com.baeldung.kafka.headers;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
// This live test needs a Docker Daemon running so that a kafka container can be created
@Testcontainers
public class KafkaMessageHeadersLiveTest {
private static String TOPIC = "baeldung";
private static String MESSAGE_KEY = "message";
private static String MESSAGE_VALUE = "Hello World";
private static String HEADER_KEY = "website";
private static String HEADER_VALUE = "baeldung.com";
private static KafkaProducer<String, String> producer;
private static KafkaConsumer<String, String> consumer;
@Container
private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
@BeforeAll
static void setup() {
KAFKA_CONTAINER.addExposedPort(9092);
Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerGroup1");
producer = new KafkaProducer<>(producerProperties);
consumer = new KafkaConsumer<>(consumerProperties);
}
@AfterAll
static void destroy() {
KAFKA_CONTAINER.stop();
}
@Test
void givenAMessageWithCustomHeaders_whenPublishedToKafkaAndConsumed_thenCheckForCustomHeaders() throws ExecutionException, InterruptedException {
List<Header> headers = new ArrayList<>();
headers.add(new RecordHeader(HEADER_KEY, HEADER_VALUE.getBytes()));
ProducerRecord<String, String> record1 = new ProducerRecord<>(TOPIC, null, MESSAGE_KEY, MESSAGE_VALUE, headers);
Future<RecordMetadata> future = producer.send(record1);
RecordMetadata metadata = future.get();
assertNotNull(metadata);
consumer.subscribe(Arrays.asList(TOPIC));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMinutes(1));
for (ConsumerRecord<String, String> record : records) {
assertEquals(MESSAGE_KEY, record.key());
assertEquals(MESSAGE_VALUE, record.value());
Headers consumedHeaders = record.headers();
assertNotNull(consumedHeaders);
for (Header header : consumedHeaders) {
assertEquals(HEADER_KEY, header.key());
assertEquals(HEADER_VALUE, new String(header.value()));
}
}
}
}