Merge pull request #15759 from s9m33r/articles/BAEL-7351
BAEL-7351 Demo code
This commit is contained in:
commit
d6ab24068d
|
@ -0,0 +1,47 @@
|
||||||
|
package com.baeldung.spring.kafka.groupId;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||||
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.kafka.annotation.EnableKafka;
|
||||||
|
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
||||||
|
import org.springframework.kafka.core.ConsumerFactory;
|
||||||
|
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||||
|
import org.springframework.kafka.listener.CommonErrorHandler;
|
||||||
|
|
||||||
|
@EnableKafka
|
||||||
|
@Configuration
|
||||||
|
class KafkaConfig {
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
ConsumerFactory<String, String> consumerFactory(@Value("${spring.kafka.bootstrap-servers:localhost:9092}") String bootstrapServers) {
|
||||||
|
Map<String, Object> props = new HashMap<>();
|
||||||
|
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
||||||
|
props.put(ConsumerConfig.GROUP_ID_CONFIG, "${kafka.consumer.groupId:test-consumer-group}");
|
||||||
|
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "rex");
|
||||||
|
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||||
|
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||||
|
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||||
|
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new StringDeserializer());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory,
|
||||||
|
CommonErrorHandler commonErrorHandler) {
|
||||||
|
var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
|
||||||
|
factory.setConsumerFactory(consumerFactory);
|
||||||
|
factory.setCommonErrorHandler(commonErrorHandler);
|
||||||
|
return factory;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
CommonErrorHandler kafkaErrorHandler() {
|
||||||
|
return new KafkaErrorHandler();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,30 @@
|
||||||
|
package com.baeldung.spring.kafka.groupId;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.consumer.Consumer;
|
||||||
|
import org.apache.kafka.common.errors.RecordDeserializationException;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.kafka.listener.CommonErrorHandler;
|
||||||
|
import org.springframework.kafka.listener.MessageListenerContainer;
|
||||||
|
import org.springframework.lang.NonNull;
|
||||||
|
|
||||||
|
class KafkaErrorHandler implements CommonErrorHandler {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(KafkaErrorHandler.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleOtherException(@NonNull Exception exception, @NonNull Consumer<?, ?> consumer, @NonNull MessageListenerContainer container,
|
||||||
|
boolean batchListener) {
|
||||||
|
handle(exception, consumer);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handle(Exception exception, Consumer<?, ?> consumer) {
|
||||||
|
log.error("Exception thrown", exception);
|
||||||
|
if (exception instanceof RecordDeserializationException ex) {
|
||||||
|
consumer.seek(ex.topicPartition(), ex.offset() + 1L);
|
||||||
|
consumer.commitSync();
|
||||||
|
} else {
|
||||||
|
log.error("Exception not handled", exception);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,14 @@
|
||||||
|
package com.baeldung.spring.kafka.groupId;
|
||||||
|
|
||||||
|
import org.springframework.boot.SpringApplication;
|
||||||
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
|
import org.springframework.context.annotation.ComponentScan;
|
||||||
|
|
||||||
|
@SpringBootApplication
|
||||||
|
@ComponentScan(basePackages = "com.baeldung.spring.kafka.groupId")
|
||||||
|
public class Main {
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
SpringApplication.run(Main.class, args);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,41 @@
|
||||||
|
package com.baeldung.spring.kafka.groupId;
|
||||||
|
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.consumer.Consumer;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.kafka.annotation.KafkaListener;
|
||||||
|
import org.springframework.messaging.handler.annotation.Payload;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
public class MyKafkaConsumer {
|
||||||
|
|
||||||
|
private static final Logger LOGGER = LoggerFactory.getLogger(MyKafkaConsumer.class);
|
||||||
|
|
||||||
|
private CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
private String payload;
|
||||||
|
|
||||||
|
@KafkaListener(topics = "${kafka.topic.name:test-topic}", clientIdPrefix = "neo", groupId = "${kafka.consumer.groupId:test-consumer-group}", concurrency = "4")
|
||||||
|
public void receive(@Payload String payload, Consumer<String, String> consumer) {
|
||||||
|
LOGGER.info("Consumer='{}' received payload='{}'", consumer.groupMetadata()
|
||||||
|
.memberId(), payload);
|
||||||
|
this.payload = payload;
|
||||||
|
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
public CountDownLatch getLatch() {
|
||||||
|
return latch;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void resetLatch() {
|
||||||
|
latch = new CountDownLatch(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getPayload() {
|
||||||
|
return payload;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,24 @@
|
||||||
|
package com.baeldung.spring.kafka.groupId;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
public class MyKafkaProducer {
|
||||||
|
|
||||||
|
private static final Logger LOGGER = LoggerFactory.getLogger(MyKafkaProducer.class);
|
||||||
|
|
||||||
|
@Value("${kafka.topic.name:test-topic}")
|
||||||
|
private String topic;
|
||||||
|
@Autowired
|
||||||
|
private KafkaTemplate<String, String> kafkaTemplate;
|
||||||
|
|
||||||
|
public void send(String payload) {
|
||||||
|
LOGGER.info("Sending payload='{}' to topic='{}'", payload, topic);
|
||||||
|
kafkaTemplate.send(topic, payload);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,44 @@
|
||||||
|
package com.baeldung.spring.kafka.groupId;
|
||||||
|
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
import static org.hamcrest.Matchers.containsString;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.boot.test.context.SpringBootTest;
|
||||||
|
import org.springframework.context.annotation.ComponentScan;
|
||||||
|
import org.springframework.kafka.test.context.EmbeddedKafka;
|
||||||
|
import org.springframework.test.annotation.DirtiesContext;
|
||||||
|
import org.springframework.test.context.ActiveProfiles;
|
||||||
|
|
||||||
|
@SpringBootTest(classes = Main.class)
|
||||||
|
@ActiveProfiles("groupId")
|
||||||
|
@ComponentScan(basePackages = "com.baeldung.spring.kafka.groupId")
|
||||||
|
@DirtiesContext
|
||||||
|
@EmbeddedKafka(partitions = 4, topics = { "${kafka.topic.name:test-topic}" }, brokerProperties = { "listeners=PLAINTEXT://localhost:8000", "port=8000" })
|
||||||
|
public class MainLiveTest {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private MyKafkaConsumer consumer;
|
||||||
|
@Autowired
|
||||||
|
private MyKafkaProducer producer;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void setup() {
|
||||||
|
consumer.resetLatch();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenEmbeddedKafkaBroker_whenSendingWithSimpleProducer_thenMessageReceived() throws Exception {
|
||||||
|
String data = "Test 123...";
|
||||||
|
producer.send(data);
|
||||||
|
boolean messageConsumed = consumer.getLatch()
|
||||||
|
.await(10, TimeUnit.SECONDS);
|
||||||
|
assertTrue(messageConsumed);
|
||||||
|
assertThat(consumer.getPayload(), containsString(data));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1 @@
|
||||||
|
spring.kafka.bootstrap-servers=localhost:8000
|
Loading…
Reference in New Issue