Merge pull request #15854 from aamolgote/jira/BAEL-7374-start-stop-kafka-consumer

BAEL-7374 - First Cut
This commit is contained in:
Andrea Cerasoni 2024-03-19 18:04:17 +00:00 committed by GitHub
commit c0bce2baac
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 258 additions and 0 deletions

View File

@ -0,0 +1,6 @@
package com.baeldung.spring.kafka.startstopconsumer;
public class Constants {
public static final String MULTI_PARTITION_TOPIC = "multi_partition_topic";
public static final String LISTENER_ID = "listener-id-1";
}

View File

@ -0,0 +1,42 @@
package com.baeldung.spring.kafka.startstopconsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, UserEvent> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, UserEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public DefaultKafkaConsumerFactory<String, UserEvent> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.baeldung.spring.kafka.start.stop.consumer");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
new JsonDeserializer<>(UserEvent.class));
}
}

View File

@ -0,0 +1,27 @@
package com.baeldung.spring.kafka.startstopconsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Service;
@Service
public class KafkaListenerControlService {
@Autowired
private KafkaListenerEndpointRegistry registry;
public void startListener(String listenerId) {
MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId);
if (listenerContainer != null && !listenerContainer.isRunning()) {
listenerContainer.start();
}
}
public void stopListener(String listenerId) {
MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId);
if (listenerContainer != null && listenerContainer.isRunning()) {
listenerContainer.stop();
}
}
}

View File

@ -0,0 +1,11 @@
package com.baeldung.spring.kafka.startstopconsumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class StartStopConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(StartStopConsumerApplication.class, args);
}
}

View File

@ -0,0 +1,20 @@
package com.baeldung.spring.kafka.startstopconsumer;
public class UserEvent {
private String userEventId;
public UserEvent() {
}
public UserEvent(String userEventId) {
this.userEventId = userEventId;
}
public String getUserEventId() {
return userEventId;
}
public void setUserEventId(String userEventId) {
this.userEventId = userEventId;
}
}

View File

@ -0,0 +1,23 @@
package com.baeldung.spring.kafka.startstopconsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class UserEventListener {
private static final Logger logger = LoggerFactory.getLogger(UserEventListener.class);
@Autowired
UserEventStore userEventStore;
@KafkaListener(id = Constants.LISTENER_ID, topics = Constants.MULTI_PARTITION_TOPIC, groupId = "test-group",
containerFactory = "kafkaListenerContainerFactory", autoStartup = "false")
public void processUserEvent(UserEvent userEvent) {
logger.info("Received UserEvent: " + userEvent.getUserEventId());
userEventStore.addUserEvent(userEvent);
}
}

View File

@ -0,0 +1,24 @@
package com.baeldung.spring.kafka.startstopconsumer;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
@Component
public class UserEventStore {
private final List<UserEvent> userEvents = new ArrayList<>();
public void addUserEvent(UserEvent userEvent) {
userEvents.add(userEvent);
}
public List<UserEvent> getUserEvents() {
return userEvents;
}
public void clearUserEvents() {
this.userEvents.clear();
}
}

View File

@ -0,0 +1,105 @@
package com.baeldung.spring.kafka.startstopconsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongSerializer;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import static java.time.Duration.ofMillis;
import static java.time.Duration.ofSeconds;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
// This live test needs a Docker Daemon running so that a kafka container can be created
@Testcontainers
@SpringBootTest(classes = StartStopConsumerApplication.class)
public class StartStopConsumerLiveTest {
private static KafkaProducer<Long, UserEvent> producer;
private static final Logger logger = LoggerFactory.getLogger(StartStopConsumerLiveTest.class);
@Container
private static KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
@Autowired
KafkaListenerControlService kafkaListenerControlService;
@Autowired
UserEventStore userEventStore;
@DynamicPropertySource
static void setProps(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers", KAFKA_CONTAINER::getBootstrapServers);
}
@BeforeAll
static void beforeAll() {
Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
producer = new KafkaProducer<>(producerProperties);
Awaitility.setDefaultTimeout(ofSeconds(5));
Awaitility.setDefaultPollInterval(ofMillis(50));
}
@AfterAll
static void destroy() {
KAFKA_CONTAINER.stop();
}
@BeforeEach
void beforeEach() {
this.userEventStore.clearUserEvents();
}
@Test
void processMessages_whenListenerIsRestarted_thenCorrectNumberOfMessagesAreConsumed() throws ExecutionException, InterruptedException {
kafkaListenerControlService.startListener(Constants.LISTENER_ID);
//Verification that listener has started.
UserEvent startUserEventTest = new UserEvent(UUID.randomUUID().toString());
producer.send(new ProducerRecord<>(Constants.MULTI_PARTITION_TOPIC, startUserEventTest));
await().untilAsserted(() -> assertEquals(1, this.userEventStore.getUserEvents().size()));
this.userEventStore.clearUserEvents();
for (long count = 1; count <= 10; count++) {
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Constants.MULTI_PARTITION_TOPIC, userEvent));
RecordMetadata metadata = future.get();
if (count == 4) {
await().untilAsserted(() -> assertEquals(4, this.userEventStore.getUserEvents().size()));
this.kafkaListenerControlService.stopListener(Constants.LISTENER_ID);
this.userEventStore.clearUserEvents();
}
logger.info("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition());
}
assertEquals(0, this.userEventStore.getUserEvents().size());
kafkaListenerControlService.startListener(Constants.LISTENER_ID);
await().untilAsserted(() -> assertEquals(6, this.userEventStore.getUserEvents().size()));
kafkaListenerControlService.stopListener(Constants.LISTENER_ID);
}
}