BAEL-7374 - Code clean Up

This commit is contained in:
Amol Gote 2024-02-10 19:34:06 -05:00
parent 45315f4f48
commit 3b5e91b0dd
1 changed files with 13 additions and 14 deletions

View File

@ -1,16 +1,15 @@
package com.baeldung.spring.kafka.startstopconsumer;
import com.baeldung.spring.kafka.start.stop.consumer.*;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongSerializer;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -22,8 +21,10 @@ import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
import org.testcontainers.utility.DockerImageName;
import static java.time.Duration.ofMillis;
import static java.time.Duration.ofSeconds;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -55,21 +56,14 @@ public class StartStopConsumerUnitTest {
}
@BeforeAll
static void setup() throws ExecutionException, InterruptedException {
KAFKA_CONTAINER.addExposedPort(9092);
Properties adminProperties = new Properties();
adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
static void beforeAll() {
Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
Admin admin = Admin.create(adminProperties);
producer = new KafkaProducer<>(producerProperties);
admin.createTopics(ImmutableList.of(new NewTopic(Constants.MULTI_PARTITION_TOPIC, Constants.MULTIPLE_PARTITIONS, Constants.REPLICATION_FACTOR)))
.all()
.get();
Awaitility.setDefaultTimeout(ofSeconds(5));
Awaitility.setDefaultPollInterval(ofMillis(50));
}
@AfterAll
@ -77,6 +71,11 @@ public class StartStopConsumerUnitTest {
KAFKA_CONTAINER.stop();
}
@BeforeEach
void beforeEach() {
this.userEventStore.clearUserEvents();
}
@Test
void processMessages_whenListenerIsRestarted_thenCorrectNumberOfMessagesAreConsumed() throws ExecutionException, InterruptedException {
kafkaListenerControlService.startListener(Constants.LISTENER_ID);