BAEL-6612: How to subscribe a Kafka consumer to multiple topics (#14800)
* BAEL-6612: Subscribe consumer to multiple topics using Spring Kafka * BAEL-6612: Subscribe consumer to multiple topics using Apache Kafka * BAEL-6612: Revert README files * BAEL-6612: Fix identation; remove static qualifier * BAEL-6612: Fix identation; remove static qualifier
This commit is contained in:
parent
5aea5d5c29
commit
ed2f3234e6
|
@ -0,0 +1,103 @@
|
|||
package com.baeldung.kafka.multipletopics;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.testcontainers.containers.KafkaContainer;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
import org.testcontainers.junit.jupiter.Testcontainers;
|
||||
import org.testcontainers.utility.DockerImageName;
|
||||
|
||||
// This live test needs a Docker Daemon running so that a kafka container can be created
|
||||
|
||||
@Testcontainers
|
||||
public class MultipleTopicsLiveTest {
|
||||
|
||||
private final Logger log = LoggerFactory.getLogger(MultipleTopicsLiveTest.class);
|
||||
|
||||
private static final String CARD_PAYMENTS_TOPIC = "card-payments";
|
||||
private static final String BANK_TRANSFERS_TOPIC = "bank-transfers";
|
||||
private static KafkaProducer<String, String> producer;
|
||||
private static KafkaConsumer<String, String> consumer;
|
||||
|
||||
@Container
|
||||
private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
|
||||
|
||||
@BeforeAll
|
||||
static void setup() {
|
||||
KAFKA_CONTAINER.addExposedPort(9092);
|
||||
producer = new KafkaProducer<>(getProducerProperties());
|
||||
consumer = new KafkaConsumer<>(getConsumerProperties());
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
static void destroy() {
|
||||
KAFKA_CONTAINER.stop();
|
||||
}
|
||||
|
||||
private static Properties getProducerProperties() {
|
||||
Properties producerProperties = new Properties();
|
||||
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
|
||||
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
|
||||
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
|
||||
return producerProperties;
|
||||
}
|
||||
|
||||
private static Properties getConsumerProperties() {
|
||||
Properties consumerProperties = new Properties();
|
||||
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
|
||||
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
|
||||
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
|
||||
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "payments");
|
||||
return consumerProperties;
|
||||
}
|
||||
|
||||
@Test
|
||||
void whenSendingMessagesOnTwoTopics_thenConsumerReceivesMessages() throws Exception {
|
||||
publishMessages();
|
||||
|
||||
consumer.subscribe(Arrays.asList(CARD_PAYMENTS_TOPIC, BANK_TRANSFERS_TOPIC));
|
||||
|
||||
int eventsProcessed = 0;
|
||||
for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(10))) {
|
||||
log.info("Event on topic={}, payload={}", record.topic(), record.value());
|
||||
eventsProcessed++;
|
||||
}
|
||||
|
||||
assertThat(eventsProcessed).isEqualTo(2);
|
||||
}
|
||||
|
||||
private void publishMessages() throws ExecutionException, InterruptedException {
|
||||
ProducerRecord<String, String> cardPayment = new ProducerRecord<>(CARD_PAYMENTS_TOPIC, createCardPayment());
|
||||
producer.send(cardPayment).get();
|
||||
|
||||
ProducerRecord<String, String> bankTransfer = new ProducerRecord<>(BANK_TRANSFERS_TOPIC, createBankTransfer());
|
||||
producer.send(bankTransfer).get();
|
||||
}
|
||||
|
||||
private String createCardPayment() {
|
||||
return "{\"paymentReference\":\"A184028KM0013790\", \"type\":\"card\", \"amount\":\"275\", \"currency\":\"GBP\"}";
|
||||
}
|
||||
|
||||
private String createBankTransfer() {
|
||||
return "{\"paymentReference\":\"19ae2-18mk73-009\", \"type\":\"bank\", \"amount\":\"150\", \"currency\":\"EUR\"}";
|
||||
}
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
package com.baeldung.spring.kafka.multipletopics;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.support.serializer.JsonDeserializer;
|
||||
|
||||
@Configuration
|
||||
public class KafkaConsumerConfig {
|
||||
|
||||
@Value(value = "${spring.kafka.bootstrap-servers}")
|
||||
private String bootstrapServers;
|
||||
|
||||
@Bean
|
||||
public ConsumerFactory<String, PaymentData> consumerFactory() {
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
||||
|
||||
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(PaymentData.class));
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ConcurrentKafkaListenerContainerFactory<String, PaymentData> kafkaListenerContainerFactory() {
|
||||
ConcurrentKafkaListenerContainerFactory<String, PaymentData> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
||||
factory.setConsumerFactory(consumerFactory());
|
||||
return factory;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
package com.baeldung.spring.kafka.multipletopics;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.kafka.annotation.EnableKafka;
|
||||
|
||||
@EnableKafka
|
||||
@SpringBootApplication
|
||||
public class KafkaMultipleTopicsApplication {
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(KafkaMultipleTopicsApplication.class, args);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
package com.baeldung.spring.kafka.multipletopics;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
import org.springframework.kafka.support.serializer.JsonSerializer;
|
||||
|
||||
@Configuration
|
||||
public class KafkaProducerConfig {
|
||||
|
||||
@Value(value = "${spring.kafka.bootstrap-servers}")
|
||||
private String bootstrapServers;
|
||||
|
||||
@Bean
|
||||
public ProducerFactory<String, PaymentData> producerFactory() {
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
||||
|
||||
return new DefaultKafkaProducerFactory<>(config, new StringSerializer(), new JsonSerializer<>());
|
||||
}
|
||||
|
||||
@Bean
|
||||
public KafkaTemplate<String, PaymentData> kafkaProducer() {
|
||||
return new KafkaTemplate<>(producerFactory());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,54 @@
|
|||
package com.baeldung.spring.kafka.multipletopics;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.util.Currency;
|
||||
import java.util.StringJoiner;
|
||||
|
||||
public class PaymentData {
|
||||
private String paymentReference;
|
||||
private String type;
|
||||
private BigDecimal amount;
|
||||
private Currency currency;
|
||||
|
||||
public String getPaymentReference() {
|
||||
return paymentReference;
|
||||
}
|
||||
|
||||
public void setPaymentReference(String paymentReference) {
|
||||
this.paymentReference = paymentReference;
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public void setType(String type) {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
public BigDecimal getAmount() {
|
||||
return amount;
|
||||
}
|
||||
|
||||
public void setAmount(BigDecimal amount) {
|
||||
this.amount = amount;
|
||||
}
|
||||
|
||||
public Currency getCurrency() {
|
||||
return currency;
|
||||
}
|
||||
|
||||
public void setCurrency(Currency currency) {
|
||||
this.currency = currency;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return new StringJoiner(", ", PaymentData.class.getSimpleName() + "[", "]")
|
||||
.add("paymentReference='" + paymentReference + "'")
|
||||
.add("type='" + type + "'")
|
||||
.add("amount=" + amount)
|
||||
.add("currency=" + currency)
|
||||
.toString();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
package com.baeldung.spring.kafka.multipletopics;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.kafka.support.KafkaHeaders;
|
||||
import org.springframework.messaging.handler.annotation.Header;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Service
|
||||
public class PaymentDataListener {
|
||||
private final Logger log = LoggerFactory.getLogger(PaymentDataListener.class);
|
||||
|
||||
@KafkaListener(topics = { "card-payments", "bank-transfers" }, groupId = "payments")
|
||||
public void handlePaymentEvents(PaymentData paymentData, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
|
||||
log.info("Event on topic={}, payload={}", topic, paymentData);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,78 @@
|
|||
package com.baeldung.spring.kafka.multipletopics;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.util.Currency;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.boot.test.mock.mockito.SpyBean;
|
||||
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.listener.MessageListenerContainer;
|
||||
import org.springframework.kafka.test.context.EmbeddedKafka;
|
||||
import org.springframework.kafka.test.utils.ContainerTestUtils;
|
||||
|
||||
@SpringBootTest(classes = KafkaMultipleTopicsApplication.class)
|
||||
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
|
||||
public class KafkaMultipleTopicsIntegrationTest {
|
||||
private static final String CARD_PAYMENTS_TOPIC = "card-payments";
|
||||
private static final String BANK_TRANSFERS_TOPIC = "bank-transfers";
|
||||
|
||||
@Autowired
|
||||
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
|
||||
|
||||
@Autowired
|
||||
private KafkaTemplate<String, PaymentData> kafkaProducer;
|
||||
|
||||
@SpyBean
|
||||
private PaymentDataListener paymentsConsumer;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
// wait for embedded Kafka
|
||||
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
|
||||
ContainerTestUtils.waitForAssignment(messageListenerContainer, 2);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenSendingMessagesOnTwoTopics_thenConsumerReceivesMessages() throws Exception {
|
||||
CountDownLatch countDownLatch = new CountDownLatch(2);
|
||||
doAnswer(invocation -> {
|
||||
countDownLatch.countDown();
|
||||
return null;
|
||||
}).when(paymentsConsumer)
|
||||
.handlePaymentEvents(any(), any());
|
||||
|
||||
kafkaProducer.send(CARD_PAYMENTS_TOPIC, createCardPayment());
|
||||
kafkaProducer.send(BANK_TRANSFERS_TOPIC, createBankTransfer());
|
||||
|
||||
assertThat(countDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
|
||||
}
|
||||
|
||||
private PaymentData createCardPayment() {
|
||||
PaymentData cardPayment = new PaymentData();
|
||||
cardPayment.setAmount(BigDecimal.valueOf(275));
|
||||
cardPayment.setPaymentReference("A184028KM0013790");
|
||||
cardPayment.setCurrency(Currency.getInstance("GBP"));
|
||||
cardPayment.setType("card");
|
||||
return cardPayment;
|
||||
}
|
||||
|
||||
private PaymentData createBankTransfer() {
|
||||
PaymentData bankTransfer = new PaymentData();
|
||||
bankTransfer.setAmount(BigDecimal.valueOf(150));
|
||||
bankTransfer.setPaymentReference("19ae2-18mk73-009");
|
||||
bankTransfer.setCurrency(Currency.getInstance("EUR"));
|
||||
bankTransfer.setType("bank");
|
||||
return bankTransfer;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue