diff --git a/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/SinglePartitionIntegrationTest.java b/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/SinglePartitionIntegrationTest.java index 35b6602510..0c65618014 100644 --- a/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/SinglePartitionIntegrationTest.java +++ b/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/SinglePartitionIntegrationTest.java @@ -24,10 +24,12 @@ import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; + import java.time.Duration; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; + import static org.junit.jupiter.api.Assertions.assertTrue; @Testcontainers @@ -53,6 +55,7 @@ public class SinglePartitionIntegrationTest { producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName()); + producer = new KafkaProducer<>(producerProperties); Properties consumerProperties = new Properties(); consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); @@ -61,9 +64,10 @@ public class SinglePartitionIntegrationTest { consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerProperties.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, UserEvent.class); consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); - admin = Admin.create(adminProperties); - producer = new KafkaProducer<>(producerProperties); consumer = new KafkaConsumer<>(consumerProperties); + admin = Admin.create(adminProperties); + + List topicList = new ArrayList<>(); NewTopic newTopic = new NewTopic(Config.SINGLE_PARTITION_TOPIC, Config.SINGLE_PARTITION, Config.REPLICATION_FACTOR); topicList.add(newTopic); @@ -87,7 +91,7 @@ public class SinglePartitionIntegrationTest { void givenASinglePartition_whenPublishedToKafkaAndConsumed_thenCheckForMessageOrder() throws ExecutionException, InterruptedException { List sentUserEventList = new ArrayList<>(); List receivedUserEventList = new ArrayList<>(); - for (long count = 1; count <= 10 ; count++) { + for (long count = 1; count <= 10; count++) { UserEvent userEvent = new UserEvent(UUID.randomUUID().toString()); userEvent.setEventNanoTime(System.nanoTime()); ProducerRecord producerRecord = new ProducerRecord<>(Config.SINGLE_PARTITION_TOPIC, userEvent); @@ -105,10 +109,10 @@ public class SinglePartitionIntegrationTest { System.out.println("User Event ID: " + userEvent.getUserEventId()); }); boolean result = true; - for (int count = 0; count <= 9 ; count++) { + for (int count = 0; count <= 9; count++) { UserEvent sentUserEvent = sentUserEventList.get(count); UserEvent receivedUserEvent = receivedUserEventList.get(count); - if (!sentUserEvent.equals(receivedUserEvent) && result){ + if (!sentUserEvent.equals(receivedUserEvent) && result) { result = false; } }