BAEL-7374 - First Cut
This commit is contained in:
parent
a57ecf1bef
commit
ef44b839ea
|
@ -17,13 +17,11 @@ import org.testcontainers.containers.KafkaContainer;
|
|||
import org.testcontainers.junit.jupiter.Container;
|
||||
import org.testcontainers.junit.jupiter.Testcontainers;
|
||||
import org.testcontainers.utility.DockerImageName;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
|
|
|
@ -50,6 +50,11 @@
|
|||
<version>${awaitility.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<properties>
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
package com.baeldung.spring.kafka.start.stop.consumer;
|
||||
|
||||
public class Constants {
|
||||
public static final String MULTI_PARTITION_TOPIC = "multi_partition_topic";
|
||||
public static final int MULTIPLE_PARTITIONS = 5;
|
||||
public static final short REPLICATION_FACTOR = 1;
|
||||
public static final String LISTENER_ID = "listener-id-1";
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
package com.baeldung.spring.kafka.start.stop.consumer;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.annotation.EnableKafka;
|
||||
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.support.serializer.JsonDeserializer;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@EnableKafka
|
||||
@Configuration
|
||||
public class KafkaConsumerConfig {
|
||||
|
||||
@Value("${spring.kafka.bootstrap-servers}")
|
||||
private String bootstrapServers;
|
||||
|
||||
@Bean
|
||||
public ConcurrentKafkaListenerContainerFactory<String, UserEvent> kafkaListenerContainerFactory() {
|
||||
ConcurrentKafkaListenerContainerFactory<String, UserEvent> factory =
|
||||
new ConcurrentKafkaListenerContainerFactory<>();
|
||||
factory.setConsumerFactory(consumerFactory());
|
||||
return factory;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public DefaultKafkaConsumerFactory<String, UserEvent> consumerFactory() {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
|
||||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.baeldung.spring.kafka.start.stop.consumer");
|
||||
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
|
||||
new JsonDeserializer<>(UserEvent.class));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
package com.baeldung.spring.kafka.start.stop.consumer;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
|
||||
import org.springframework.kafka.listener.MessageListenerContainer;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Service
|
||||
public class KafkaListenerControlService {
|
||||
|
||||
@Autowired
|
||||
private KafkaListenerEndpointRegistry registry;
|
||||
|
||||
// Method to start a listener
|
||||
public void startListener(String listenerId) {
|
||||
MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId);
|
||||
if (listenerContainer != null && !listenerContainer.isRunning()) {
|
||||
listenerContainer.start();
|
||||
}
|
||||
}
|
||||
|
||||
// Method to stop a listener
|
||||
public void stopListener(String listenerId) {
|
||||
MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId);
|
||||
if (listenerContainer != null && listenerContainer.isRunning()) {
|
||||
listenerContainer.stop();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
package com.baeldung.spring.kafka.start.stop.consumer;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
@SpringBootApplication
|
||||
public class StartStopConsumerApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(com.baeldung.spring.kafka.deserialization.exception.Application.class, args);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
package com.baeldung.spring.kafka.start.stop.consumer;
|
||||
|
||||
public class UserEvent {
|
||||
private String userEventId;
|
||||
private long eventNanoTime;
|
||||
|
||||
public UserEvent(){}
|
||||
|
||||
public UserEvent(String userEventId) {
|
||||
this.userEventId = userEventId;
|
||||
}
|
||||
|
||||
public String getUserEventId() {
|
||||
return userEventId;
|
||||
}
|
||||
|
||||
public void setUserEventId(String userEventId) {
|
||||
this.userEventId = userEventId;
|
||||
}
|
||||
|
||||
public long getEventNanoTime() {
|
||||
return eventNanoTime;
|
||||
}
|
||||
|
||||
public void setEventNanoTime(long eventNanoTime) {
|
||||
this.eventNanoTime = eventNanoTime;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
package com.baeldung.spring.kafka.start.stop.consumer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class UserEventListener {
|
||||
private static final Logger logger = LoggerFactory.getLogger(UserEventListener.class);
|
||||
|
||||
@Autowired
|
||||
UserEventStore userEventStore;
|
||||
|
||||
@KafkaListener(id = Constants.LISTENER_ID, topics = Constants.MULTI_PARTITION_TOPIC, groupId = "test-group",
|
||||
containerFactory = "kafkaListenerContainerFactory", autoStartup = "false")
|
||||
public void userEventListener(UserEvent userEvent) {
|
||||
logger.info("Received UserEvent: " + userEvent.getUserEventId() + ", Time: " + userEvent.getEventNanoTime());
|
||||
userEventStore.addUserEvent(userEvent);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
package com.baeldung.spring.kafka.start.stop.consumer;
|
||||
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@Component
|
||||
public class UserEventStore {
|
||||
private final List<UserEvent> userEvents = new ArrayList<>();
|
||||
|
||||
public void addUserEvent(UserEvent userEvent){
|
||||
userEvents.add(userEvent);
|
||||
}
|
||||
|
||||
public List<UserEvent> getUserEvents(){
|
||||
return userEvents;
|
||||
}
|
||||
|
||||
public void clearUserEvents(){
|
||||
this.userEvents.clear();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,109 @@
|
|||
package com.baeldung.spring.kafka.startstopconsumer;
|
||||
|
||||
import com.baeldung.spring.kafka.start.stop.consumer.*;
|
||||
import org.apache.kafka.clients.admin.Admin;
|
||||
import org.apache.kafka.clients.admin.AdminClientConfig;
|
||||
import org.apache.kafka.clients.admin.NewTopic;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||
import org.apache.kafka.common.serialization.LongSerializer;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.kafka.support.serializer.JsonSerializer;
|
||||
import org.springframework.test.context.DynamicPropertyRegistry;
|
||||
import org.springframework.test.context.DynamicPropertySource;
|
||||
import org.testcontainers.containers.KafkaContainer;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
import org.testcontainers.junit.jupiter.Testcontainers;
|
||||
import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
|
||||
import org.testcontainers.utility.DockerImageName;
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Testcontainers
|
||||
@SpringBootTest(classes = StartStopConsumerApplication.class)
|
||||
public class StartStopConsumerTest {
|
||||
|
||||
private static KafkaProducer<Long, UserEvent> producer;
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(StartStopConsumerTest.class);
|
||||
|
||||
@Container
|
||||
private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
|
||||
|
||||
@Autowired
|
||||
KafkaListenerControlService kafkaListenerControlService;
|
||||
|
||||
@Autowired
|
||||
UserEventStore userEventStore;
|
||||
|
||||
@DynamicPropertySource
|
||||
static void setProps(DynamicPropertyRegistry registry) {
|
||||
registry.add("spring.kafka.bootstrap-servers", KAFKA_CONTAINER::getBootstrapServers);
|
||||
}
|
||||
|
||||
@BeforeAll
|
||||
static void setup() throws ExecutionException, InterruptedException {
|
||||
KAFKA_CONTAINER.addExposedPort(9092);
|
||||
|
||||
Properties adminProperties = new Properties();
|
||||
adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
|
||||
|
||||
Properties producerProperties = new Properties();
|
||||
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
|
||||
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
|
||||
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
|
||||
Admin admin = Admin.create(adminProperties);
|
||||
producer = new KafkaProducer<>(producerProperties);
|
||||
admin.createTopics(ImmutableList.of(new NewTopic(Constants.MULTI_PARTITION_TOPIC, Constants.MULTIPLE_PARTITIONS, Constants.REPLICATION_FACTOR)))
|
||||
.all()
|
||||
.get();
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
static void destroy() {
|
||||
KAFKA_CONTAINER.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
void processMessages_whenListenerIsRestarted_thenCorrectNumberOfMessagesAreConsumed() throws ExecutionException, InterruptedException {
|
||||
kafkaListenerControlService.startListener(Constants.LISTENER_ID);
|
||||
|
||||
//Verification that listener has started.
|
||||
UserEvent startUserEventTest = new UserEvent(UUID.randomUUID().toString());
|
||||
startUserEventTest.setEventNanoTime(System.nanoTime());
|
||||
producer.send(new ProducerRecord<>(Constants.MULTI_PARTITION_TOPIC, startUserEventTest));
|
||||
await().untilAsserted(() -> assertEquals(1, this.userEventStore.getUserEvents().size()));
|
||||
this.userEventStore.clearUserEvents();
|
||||
|
||||
for (long count = 1; count <= 10; count++) {
|
||||
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
|
||||
userEvent.setEventNanoTime(System.nanoTime());
|
||||
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Constants.MULTI_PARTITION_TOPIC, userEvent));
|
||||
RecordMetadata metadata = future.get();
|
||||
if (count == 4) {
|
||||
await().untilAsserted(() -> assertEquals(4, this.userEventStore.getUserEvents().size()));
|
||||
this.kafkaListenerControlService.stopListener(Constants.LISTENER_ID);
|
||||
this.userEventStore.clearUserEvents();
|
||||
}
|
||||
logger.info("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition());
|
||||
}
|
||||
assertEquals(0, this.userEventStore.getUserEvents().size());
|
||||
kafkaListenerControlService.startListener(Constants.LISTENER_ID);
|
||||
await().untilAsserted(() -> assertEquals(6, this.userEventStore.getUserEvents().size()));
|
||||
kafkaListenerControlService.stopListener(Constants.LISTENER_ID);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue