From ed2f3234e6574f814adb24323ae3bedc6b1b29d7 Mon Sep 17 00:00:00 2001 From: Constantin <50400363+constantinurs@users.noreply.github.com> Date: Sat, 7 Oct 2023 07:21:30 +0300 Subject: [PATCH] BAEL-6612: How to subscribe a Kafka consumer to multiple topics (#14800) * BAEL-6612: Subscribe consumer to multiple topics using Spring Kafka * BAEL-6612: Subscribe consumer to multiple topics using Apache Kafka * BAEL-6612: Revert README files * BAEL-6612: Fix identation; remove static qualifier * BAEL-6612: Fix identation; remove static qualifier --- .../MultipleTopicsLiveTest.java | 103 ++++++++++++++++++ .../multipletopics/KafkaConsumerConfig.java | 36 ++++++ .../KafkaMultipleTopicsApplication.java | 13 +++ .../multipletopics/KafkaProducerConfig.java | 34 ++++++ .../kafka/multipletopics/PaymentData.java | 54 +++++++++ .../multipletopics/PaymentDataListener.java | 18 +++ .../KafkaMultipleTopicsIntegrationTest.java | 78 +++++++++++++ 7 files changed, 336 insertions(+) create mode 100644 apache-kafka-2/src/test/java/com/baeldung/kafka/multipletopics/MultipleTopicsLiveTest.java create mode 100644 spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multipletopics/KafkaConsumerConfig.java create mode 100644 spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multipletopics/KafkaMultipleTopicsApplication.java create mode 100644 spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multipletopics/KafkaProducerConfig.java create mode 100644 spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multipletopics/PaymentData.java create mode 100644 spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multipletopics/PaymentDataListener.java create mode 100644 spring-kafka-2/src/test/java/com/baeldung/spring/kafka/multipletopics/KafkaMultipleTopicsIntegrationTest.java diff --git a/apache-kafka-2/src/test/java/com/baeldung/kafka/multipletopics/MultipleTopicsLiveTest.java b/apache-kafka-2/src/test/java/com/baeldung/kafka/multipletopics/MultipleTopicsLiveTest.java new file mode 100644 index 0000000000..653456a678 --- /dev/null +++ b/apache-kafka-2/src/test/java/com/baeldung/kafka/multipletopics/MultipleTopicsLiveTest.java @@ -0,0 +1,103 @@ +package com.baeldung.kafka.multipletopics; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +// This live test needs a Docker Daemon running so that a kafka container can be created + +@Testcontainers +public class MultipleTopicsLiveTest { + + private final Logger log = LoggerFactory.getLogger(MultipleTopicsLiveTest.class); + + private static final String CARD_PAYMENTS_TOPIC = "card-payments"; + private static final String BANK_TRANSFERS_TOPIC = "bank-transfers"; + private static KafkaProducer producer; + private static KafkaConsumer consumer; + + @Container + private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); + + @BeforeAll + static void setup() { + KAFKA_CONTAINER.addExposedPort(9092); + producer = new KafkaProducer<>(getProducerProperties()); + consumer = new KafkaConsumer<>(getConsumerProperties()); + } + + @AfterAll + static void destroy() { + KAFKA_CONTAINER.stop(); + } + + private static Properties getProducerProperties() { + Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + return producerProperties; + } + + private static Properties getConsumerProperties() { + Properties consumerProperties = new Properties(); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "payments"); + return consumerProperties; + } + + @Test + void whenSendingMessagesOnTwoTopics_thenConsumerReceivesMessages() throws Exception { + publishMessages(); + + consumer.subscribe(Arrays.asList(CARD_PAYMENTS_TOPIC, BANK_TRANSFERS_TOPIC)); + + int eventsProcessed = 0; + for (ConsumerRecord record : consumer.poll(Duration.ofSeconds(10))) { + log.info("Event on topic={}, payload={}", record.topic(), record.value()); + eventsProcessed++; + } + + assertThat(eventsProcessed).isEqualTo(2); + } + + private void publishMessages() throws ExecutionException, InterruptedException { + ProducerRecord cardPayment = new ProducerRecord<>(CARD_PAYMENTS_TOPIC, createCardPayment()); + producer.send(cardPayment).get(); + + ProducerRecord bankTransfer = new ProducerRecord<>(BANK_TRANSFERS_TOPIC, createBankTransfer()); + producer.send(bankTransfer).get(); + } + + private String createCardPayment() { + return "{\"paymentReference\":\"A184028KM0013790\", \"type\":\"card\", \"amount\":\"275\", \"currency\":\"GBP\"}"; + } + + private String createBankTransfer() { + return "{\"paymentReference\":\"19ae2-18mk73-009\", \"type\":\"bank\", \"amount\":\"150\", \"currency\":\"EUR\"}"; + } +} diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multipletopics/KafkaConsumerConfig.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multipletopics/KafkaConsumerConfig.java new file mode 100644 index 0000000000..741fb6bba4 --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multipletopics/KafkaConsumerConfig.java @@ -0,0 +1,36 @@ +package com.baeldung.spring.kafka.multipletopics; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.support.serializer.JsonDeserializer; + +@Configuration +public class KafkaConsumerConfig { + + @Value(value = "${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + + @Bean + public ConsumerFactory consumerFactory() { + Map config = new HashMap<>(); + config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + + return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(PaymentData.class)); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + return factory; + } +} \ No newline at end of file diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multipletopics/KafkaMultipleTopicsApplication.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multipletopics/KafkaMultipleTopicsApplication.java new file mode 100644 index 0000000000..2135a27f39 --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multipletopics/KafkaMultipleTopicsApplication.java @@ -0,0 +1,13 @@ +package com.baeldung.spring.kafka.multipletopics; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.kafka.annotation.EnableKafka; + +@EnableKafka +@SpringBootApplication +public class KafkaMultipleTopicsApplication { + public static void main(String[] args) { + SpringApplication.run(KafkaMultipleTopicsApplication.class, args); + } +} \ No newline at end of file diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multipletopics/KafkaProducerConfig.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multipletopics/KafkaProducerConfig.java new file mode 100644 index 0000000000..2cb0117bf1 --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multipletopics/KafkaProducerConfig.java @@ -0,0 +1,34 @@ +package com.baeldung.spring.kafka.multipletopics; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.serializer.JsonSerializer; + +@Configuration +public class KafkaProducerConfig { + + @Value(value = "${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + + @Bean + public ProducerFactory producerFactory() { + Map config = new HashMap<>(); + config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + + return new DefaultKafkaProducerFactory<>(config, new StringSerializer(), new JsonSerializer<>()); + } + + @Bean + public KafkaTemplate kafkaProducer() { + return new KafkaTemplate<>(producerFactory()); + } +} \ No newline at end of file diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multipletopics/PaymentData.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multipletopics/PaymentData.java new file mode 100644 index 0000000000..e81138c089 --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multipletopics/PaymentData.java @@ -0,0 +1,54 @@ +package com.baeldung.spring.kafka.multipletopics; + +import java.math.BigDecimal; +import java.util.Currency; +import java.util.StringJoiner; + +public class PaymentData { + private String paymentReference; + private String type; + private BigDecimal amount; + private Currency currency; + + public String getPaymentReference() { + return paymentReference; + } + + public void setPaymentReference(String paymentReference) { + this.paymentReference = paymentReference; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public BigDecimal getAmount() { + return amount; + } + + public void setAmount(BigDecimal amount) { + this.amount = amount; + } + + public Currency getCurrency() { + return currency; + } + + public void setCurrency(Currency currency) { + this.currency = currency; + } + + @Override + public String toString() { + return new StringJoiner(", ", PaymentData.class.getSimpleName() + "[", "]") + .add("paymentReference='" + paymentReference + "'") + .add("type='" + type + "'") + .add("amount=" + amount) + .add("currency=" + currency) + .toString(); + } +} \ No newline at end of file diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multipletopics/PaymentDataListener.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multipletopics/PaymentDataListener.java new file mode 100644 index 0000000000..fb640cca25 --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multipletopics/PaymentDataListener.java @@ -0,0 +1,18 @@ +package com.baeldung.spring.kafka.multipletopics; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Service; + +@Service +public class PaymentDataListener { + private final Logger log = LoggerFactory.getLogger(PaymentDataListener.class); + + @KafkaListener(topics = { "card-payments", "bank-transfers" }, groupId = "payments") + public void handlePaymentEvents(PaymentData paymentData, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { + log.info("Event on topic={}, payload={}", topic, paymentData); + } +} \ No newline at end of file diff --git a/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/multipletopics/KafkaMultipleTopicsIntegrationTest.java b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/multipletopics/KafkaMultipleTopicsIntegrationTest.java new file mode 100644 index 0000000000..345e84b65b --- /dev/null +++ b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/multipletopics/KafkaMultipleTopicsIntegrationTest.java @@ -0,0 +1,78 @@ +package com.baeldung.spring.kafka.multipletopics; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; + +import java.math.BigDecimal; +import java.util.Currency; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.ContainerTestUtils; + +@SpringBootTest(classes = KafkaMultipleTopicsApplication.class) +@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" }) +public class KafkaMultipleTopicsIntegrationTest { + private static final String CARD_PAYMENTS_TOPIC = "card-payments"; + private static final String BANK_TRANSFERS_TOPIC = "bank-transfers"; + + @Autowired + private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; + + @Autowired + private KafkaTemplate kafkaProducer; + + @SpyBean + private PaymentDataListener paymentsConsumer; + + @BeforeEach + void setUp() { + // wait for embedded Kafka + for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) { + ContainerTestUtils.waitForAssignment(messageListenerContainer, 2); + } + } + + @Test + public void whenSendingMessagesOnTwoTopics_thenConsumerReceivesMessages() throws Exception { + CountDownLatch countDownLatch = new CountDownLatch(2); + doAnswer(invocation -> { + countDownLatch.countDown(); + return null; + }).when(paymentsConsumer) + .handlePaymentEvents(any(), any()); + + kafkaProducer.send(CARD_PAYMENTS_TOPIC, createCardPayment()); + kafkaProducer.send(BANK_TRANSFERS_TOPIC, createBankTransfer()); + + assertThat(countDownLatch.await(5, TimeUnit.SECONDS)).isTrue(); + } + + private PaymentData createCardPayment() { + PaymentData cardPayment = new PaymentData(); + cardPayment.setAmount(BigDecimal.valueOf(275)); + cardPayment.setPaymentReference("A184028KM0013790"); + cardPayment.setCurrency(Currency.getInstance("GBP")); + cardPayment.setType("card"); + return cardPayment; + } + + private PaymentData createBankTransfer() { + PaymentData bankTransfer = new PaymentData(); + bankTransfer.setAmount(BigDecimal.valueOf(150)); + bankTransfer.setPaymentReference("19ae2-18mk73-009"); + bankTransfer.setCurrency(Currency.getInstance("EUR")); + bankTransfer.setType("bank"); + return bankTransfer; + } +} \ No newline at end of file