[JAVA-33842] Fix spring-kafka-2 integration test (#16415)

This commit is contained in:
Harry9656 2024-04-16 08:33:48 +02:00 committed by GitHub
parent a4ecc1ca29
commit b97a573c15
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 21 additions and 20 deletions

View File

@ -26,7 +26,10 @@
<dependency> <dependency>
<groupId>org.springframework.kafka</groupId> <groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId> <artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version> </dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.kafka</groupId> <groupId>org.apache.kafka</groupId>
@ -46,11 +49,6 @@
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId> <artifactId>commons-lang3</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.testcontainers</groupId> <groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId> <artifactId>kafka</artifactId>
@ -70,9 +68,8 @@
</dependencies> </dependencies>
<properties> <properties>
<testcontainers-kafka.version>1.19.3</testcontainers-kafka.version> <testcontainers-kafka.version>1.19.7</testcontainers-kafka.version>
<spring-kafka.version>3.1.2</spring-kafka.version> <kafka-streams.version>3.7.0</kafka-streams.version>
<kafka-streams.version>3.6.1</kafka-streams.version>
<start-class>com.baeldung.spring.kafka.dlt.KafkaDltApplication</start-class> <start-class>com.baeldung.spring.kafka.dlt.KafkaDltApplication</start-class>
</properties> </properties>

View File

@ -1,5 +1,11 @@
package com.baeldung.spring.kafka.managingkafkaconsumergroups; package com.baeldung.spring.kafka.managingkafkaconsumergroups;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.time.Duration;
import java.util.Objects;
import java.util.Set;
import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.RandomUtils;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -8,22 +14,19 @@ import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.util.CollectionUtils;
import java.util.Objects;
import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.annotation.DirtiesContext.ClassMode; import org.springframework.test.annotation.DirtiesContext.ClassMode;
import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.ActiveProfiles;
import java.util.Set; import org.springframework.util.CollectionUtils;
import static org.junit.jupiter.api.Assertions.assertEquals; import lombok.extern.slf4j.Slf4j;
import static org.junit.jupiter.api.Assertions.assertTrue;
@SpringBootTest(classes = ManagingConsumerGroupsApplicationKafkaApp.class) @SpringBootTest(classes = ManagingConsumerGroupsApplicationKafkaApp.class)
@EmbeddedKafka(partitions = 2, brokerProperties = {"listeners=PLAINTEXT://localhost:9098", "port=9098"}, topics = {"topic1"}) @EmbeddedKafka(partitions = 2, brokerProperties = {"listeners=PLAINTEXT://localhost:9098", "port=9098"}, topics = {"topic1"})
@DirtiesContext(classMode = ClassMode.BEFORE_CLASS) @DirtiesContext(classMode = ClassMode.BEFORE_CLASS)
@ActiveProfiles("managed") @ActiveProfiles("managed")
public class ManagingConsumerGroupsIntegrationTest { @Slf4j
class ManagingConsumerGroupsIntegrationTest {
private static final String CONSUMER_1_IDENTIFIER = "org.springframework.kafka.KafkaListenerEndpointContainer#1"; private static final String CONSUMER_1_IDENTIFIER = "org.springframework.kafka.KafkaListenerEndpointContainer#1";
private static final int TOTAL_PRODUCED_MESSAGES = 5000; private static final int TOTAL_PRODUCED_MESSAGES = 5000;
@ -39,11 +42,11 @@ public class ManagingConsumerGroupsIntegrationTest {
MessageConsumerService consumerService; MessageConsumerService consumerService;
@Test @Test
public void givenContinuousMessageFlow_whenAConsumerLeavesTheGroup_thenKafkaTriggersPartitionRebalance() throws InterruptedException { void givenContinuousMessageFlow_whenAConsumerLeavesTheGroup_thenKafkaTriggersPartitionRebalance() throws InterruptedException {
int currentMessage = 0; int currentMessage = 0;
do { do {
kafkaTemplate.send("topic-1", RandomUtils.nextDouble(10.0, 20.0)); kafkaTemplate.send("topic-1", RandomUtils.nextDouble(10.0, 20.0));
Thread.sleep(0,100); // Waiting to let the embedded kafka consume the event.
currentMessage++; currentMessage++;
if (currentMessage == MESSAGE_WHERE_CONSUMER_1_LEAVES_GROUP) { if (currentMessage == MESSAGE_WHERE_CONSUMER_1_LEAVES_GROUP) {
@ -53,12 +56,13 @@ public class ManagingConsumerGroupsIntegrationTest {
.findFirst() .findFirst()
.orElse(""); .orElse("");
MessageListenerContainer container = kafkaListenerEndpointRegistry.getListenerContainer(containerId); MessageListenerContainer container = kafkaListenerEndpointRegistry.getListenerContainer(containerId);
Thread.sleep(2000);
Objects.requireNonNull(container).stop(); Objects.requireNonNull(container).stop();
kafkaListenerEndpointRegistry.unregisterListenerContainer(containerId); kafkaListenerEndpointRegistry.unregisterListenerContainer(containerId);
} }
if(currentMessage % 1000 == 0){
log.info("Processed {} of {}", currentMessage, TOTAL_PRODUCED_MESSAGES);
}
} while (currentMessage != TOTAL_PRODUCED_MESSAGES); } while (currentMessage != TOTAL_PRODUCED_MESSAGES);
Thread.sleep(2000);
Set<Integer> partitionsConsumedBy1 = consumerService.consumedPartitions.get("consumer-1"); Set<Integer> partitionsConsumedBy1 = consumerService.consumedPartitions.get("consumer-1");
Set<Integer> partitionsConsumedBy0 = consumerService.consumedPartitions.get("consumer-0"); Set<Integer> partitionsConsumedBy0 = consumerService.consumedPartitions.get("consumer-0");