Code Formatting
This commit is contained in:
parent
1ab4017ad6
commit
45bc3d94d4
|
@ -24,10 +24,12 @@ import org.testcontainers.containers.KafkaContainer;
|
||||||
import org.testcontainers.junit.jupiter.Container;
|
import org.testcontainers.junit.jupiter.Container;
|
||||||
import org.testcontainers.junit.jupiter.Testcontainers;
|
import org.testcontainers.junit.jupiter.Testcontainers;
|
||||||
import org.testcontainers.utility.DockerImageName;
|
import org.testcontainers.utility.DockerImageName;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
@Testcontainers
|
@Testcontainers
|
||||||
|
@ -53,6 +55,7 @@ public class SinglePartitionIntegrationTest {
|
||||||
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
|
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
|
||||||
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
|
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
|
||||||
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName());
|
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName());
|
||||||
|
producer = new KafkaProducer<>(producerProperties);
|
||||||
|
|
||||||
Properties consumerProperties = new Properties();
|
Properties consumerProperties = new Properties();
|
||||||
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
|
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
|
||||||
|
@ -61,9 +64,10 @@ public class SinglePartitionIntegrationTest {
|
||||||
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||||
consumerProperties.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, UserEvent.class);
|
consumerProperties.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, UserEvent.class);
|
||||||
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
|
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
|
||||||
admin = Admin.create(adminProperties);
|
|
||||||
producer = new KafkaProducer<>(producerProperties);
|
|
||||||
consumer = new KafkaConsumer<>(consumerProperties);
|
consumer = new KafkaConsumer<>(consumerProperties);
|
||||||
|
admin = Admin.create(adminProperties);
|
||||||
|
|
||||||
|
|
||||||
List<NewTopic> topicList = new ArrayList<>();
|
List<NewTopic> topicList = new ArrayList<>();
|
||||||
NewTopic newTopic = new NewTopic(Config.SINGLE_PARTITION_TOPIC, Config.SINGLE_PARTITION, Config.REPLICATION_FACTOR);
|
NewTopic newTopic = new NewTopic(Config.SINGLE_PARTITION_TOPIC, Config.SINGLE_PARTITION, Config.REPLICATION_FACTOR);
|
||||||
topicList.add(newTopic);
|
topicList.add(newTopic);
|
||||||
|
@ -87,7 +91,7 @@ public class SinglePartitionIntegrationTest {
|
||||||
void givenASinglePartition_whenPublishedToKafkaAndConsumed_thenCheckForMessageOrder() throws ExecutionException, InterruptedException {
|
void givenASinglePartition_whenPublishedToKafkaAndConsumed_thenCheckForMessageOrder() throws ExecutionException, InterruptedException {
|
||||||
List<UserEvent> sentUserEventList = new ArrayList<>();
|
List<UserEvent> sentUserEventList = new ArrayList<>();
|
||||||
List<UserEvent> receivedUserEventList = new ArrayList<>();
|
List<UserEvent> receivedUserEventList = new ArrayList<>();
|
||||||
for (long count = 1; count <= 10 ; count++) {
|
for (long count = 1; count <= 10; count++) {
|
||||||
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
|
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
|
||||||
userEvent.setEventNanoTime(System.nanoTime());
|
userEvent.setEventNanoTime(System.nanoTime());
|
||||||
ProducerRecord<Long, UserEvent> producerRecord = new ProducerRecord<>(Config.SINGLE_PARTITION_TOPIC, userEvent);
|
ProducerRecord<Long, UserEvent> producerRecord = new ProducerRecord<>(Config.SINGLE_PARTITION_TOPIC, userEvent);
|
||||||
|
@ -105,10 +109,10 @@ public class SinglePartitionIntegrationTest {
|
||||||
System.out.println("User Event ID: " + userEvent.getUserEventId());
|
System.out.println("User Event ID: " + userEvent.getUserEventId());
|
||||||
});
|
});
|
||||||
boolean result = true;
|
boolean result = true;
|
||||||
for (int count = 0; count <= 9 ; count++) {
|
for (int count = 0; count <= 9; count++) {
|
||||||
UserEvent sentUserEvent = sentUserEventList.get(count);
|
UserEvent sentUserEvent = sentUserEventList.get(count);
|
||||||
UserEvent receivedUserEvent = receivedUserEventList.get(count);
|
UserEvent receivedUserEvent = receivedUserEventList.get(count);
|
||||||
if (!sentUserEvent.equals(receivedUserEvent) && result){
|
if (!sentUserEvent.equals(receivedUserEvent) && result) {
|
||||||
result = false;
|
result = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue