adding null checks for assertions. reducing total number of consumed messages (#15711)
This commit is contained in:
parent
dc30dc426b
commit
ee63f1c19d
@ -8,11 +8,13 @@ import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
|
|||||||
import org.springframework.kafka.core.KafkaTemplate;
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
import org.springframework.kafka.listener.MessageListenerContainer;
|
import org.springframework.kafka.listener.MessageListenerContainer;
|
||||||
import org.springframework.kafka.test.context.EmbeddedKafka;
|
import org.springframework.kafka.test.context.EmbeddedKafka;
|
||||||
|
import org.springframework.util.CollectionUtils;
|
||||||
|
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import org.springframework.test.annotation.DirtiesContext;
|
import org.springframework.test.annotation.DirtiesContext;
|
||||||
import org.springframework.test.annotation.DirtiesContext.ClassMode;
|
import org.springframework.test.annotation.DirtiesContext.ClassMode;
|
||||||
import org.springframework.test.context.ActiveProfiles;
|
import org.springframework.test.context.ActiveProfiles;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
@ -24,8 +26,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
|||||||
public class ManagingConsumerGroupsIntegrationTest {
|
public class ManagingConsumerGroupsIntegrationTest {
|
||||||
|
|
||||||
private static final String CONSUMER_1_IDENTIFIER = "org.springframework.kafka.KafkaListenerEndpointContainer#1";
|
private static final String CONSUMER_1_IDENTIFIER = "org.springframework.kafka.KafkaListenerEndpointContainer#1";
|
||||||
private static final int TOTAL_PRODUCED_MESSAGES = 50000;
|
private static final int TOTAL_PRODUCED_MESSAGES = 5000;
|
||||||
private static final int MESSAGE_WHERE_CONSUMER_1_LEAVES_GROUP = 10000;
|
private static final int MESSAGE_WHERE_CONSUMER_1_LEAVES_GROUP = 10;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
KafkaTemplate<String, Double> kafkaTemplate;
|
KafkaTemplate<String, Double> kafkaTemplate;
|
||||||
@ -57,11 +59,16 @@ public class ManagingConsumerGroupsIntegrationTest {
|
|||||||
}
|
}
|
||||||
} while (currentMessage != TOTAL_PRODUCED_MESSAGES);
|
} while (currentMessage != TOTAL_PRODUCED_MESSAGES);
|
||||||
Thread.sleep(2000);
|
Thread.sleep(2000);
|
||||||
if (consumerService.consumedPartitions != null
|
|
||||||
&& consumerService.consumedPartitions.get("consumer-1") != null
|
Set<Integer> partitionsConsumedBy1 = consumerService.consumedPartitions.get("consumer-1");
|
||||||
&& consumerService.consumedPartitions.get("consumer-0") != null) {
|
Set<Integer> partitionsConsumedBy0 = consumerService.consumedPartitions.get("consumer-0");
|
||||||
assertTrue(consumerService.consumedPartitions.get("consumer-1").size() >= 1);
|
|
||||||
assertTrue( consumerService.consumedPartitions.get("consumer-0").size() >= 1);
|
if (!CollectionUtils.isEmpty(partitionsConsumedBy0)) {
|
||||||
|
assertEquals(2, partitionsConsumedBy0.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!CollectionUtils.isEmpty(partitionsConsumedBy1)) {
|
||||||
|
assertEquals(1, partitionsConsumedBy1.size());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user