Incorporate Review comments

This commit is contained in:
Amol Gote 2023-11-04 17:18:20 -04:00
parent b24851bfce
commit 2cecae1dfb
4 changed files with 11 additions and 16 deletions

View File

@ -7,5 +7,5 @@ public class Config {
public static final int MULTIPLE_PARTITIONS = 5;
public static final int SINGLE_PARTITION = 1;
public static short REPLICATION_FACTOR = 1;
public static final short REPLICATION_FACTOR = 1;
}

View File

@ -11,7 +11,6 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.junit.jupiter.api.AfterAll;
@ -25,8 +24,8 @@ import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static org.junit.jupiter.api.Assertions.*;
import com.google.common.collect.ImmutableList;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
@Testcontainers
public class ExternalSequenceWithTimeWindowIntegrationTest {
@ -84,7 +83,6 @@ public class ExternalSequenceWithTimeWindowIntegrationTest {
System.out.println("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition());
}
boolean isOrderMaintained = true;
consumer.subscribe(Collections.singletonList(Config.MULTI_PARTITION_TOPIC));
List<UserEvent> buffer = new ArrayList<>();
long lastProcessedTime = System.nanoTime();
@ -102,9 +100,9 @@ public class ExternalSequenceWithTimeWindowIntegrationTest {
buffer.add(record.value());
});
}
assertThat(receivedUserEventList)
.isEqualTo(sentUserEventList)
.containsExactlyElementsOf(sentUserEventList);
assertThat(receivedUserEventList)
.isEqualTo(sentUserEventList)
.containsExactlyElementsOf(sentUserEventList);
}
private static void processBuffer(List<UserEvent> buffer, List<UserEvent> receivedUserEventList) {

View File

@ -11,7 +11,6 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.junit.jupiter.api.AfterAll;
@ -25,8 +24,8 @@ import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static org.junit.jupiter.api.Assertions.*;
import com.google.common.collect.ImmutableList;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
@Testcontainers
public class MultiplePartitionIntegrationTest {
@ -89,7 +88,7 @@ public class MultiplePartitionIntegrationTest {
receivedUserEventList.add(userEvent);
System.out.println("User Event ID: " + userEvent.getUserEventId());
});
assertThat(receivedUserEventList)
assertThat(receivedUserEventList)
.isNotEqualTo(sentUserEventList)
.containsExactlyInAnyOrderElementsOf(sentUserEventList);
}

View File

@ -5,7 +5,6 @@ import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer;
import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@ -14,7 +13,6 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.junit.jupiter.api.AfterAll;
@ -29,8 +27,8 @@ import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.google.common.collect.ImmutableList;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
@Testcontainers
public class SinglePartitionIntegrationTest {