Fixed unit test case failures

This commit is contained in:
Amol Gote 2023-11-04 18:09:21 -04:00
parent 2cecae1dfb
commit 2a6e561f76
3 changed files with 12 additions and 7 deletions

View File

@ -42,15 +42,19 @@ public class UserEvent implements Comparable<UserEvent> {
@Override
public boolean equals(Object obj) {
if (obj == this) {
if (this == obj) {
return true;
}
if (!(obj instanceof UserEvent)) {
return false;
}
UserEvent userEvent = (UserEvent) obj;
return Objects.equals(this.userEventId, userEvent.getUserEventId())
&& userEvent.getEventNanoTime() == this.eventNanoTime;
return this.globalSequenceNumber == userEvent.globalSequenceNumber;
}
@Override
public int hashCode() {
return Objects.hash(globalSequenceNumber);
}
}

View File

@ -71,16 +71,16 @@ public class MultiplePartitionIntegrationTest {
void givenMultiplePartitions_whenPublishedToKafkaAndConsumed_thenCheckForMessageOrder() throws ExecutionException, InterruptedException {
List<UserEvent> sentUserEventList = new ArrayList<>();
List<UserEvent> receivedUserEventList = new ArrayList<>();
for (long count = 1; count <= 10 ; count++) {
for (long sequenceNumber = 1; sequenceNumber <= 10; sequenceNumber++) {
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
userEvent.setGlobalSequenceNumber(sequenceNumber);
userEvent.setEventNanoTime(System.nanoTime());
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Config.MULTI_PARTITION_TOPIC, count, userEvent));
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Config.MULTI_PARTITION_TOPIC, sequenceNumber, userEvent));
sentUserEventList.add(userEvent);
RecordMetadata metadata = future.get();
System.out.println("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition());
}
boolean isOrderMaintained = true;
consumer.subscribe(Collections.singletonList(Config.MULTI_PARTITION_TOPIC));
ConsumerRecords<Long, UserEvent> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> {

View File

@ -78,8 +78,9 @@ public class SinglePartitionIntegrationTest {
void givenASinglePartition_whenPublishedToKafkaAndConsumed_thenCheckForMessageOrder() throws ExecutionException, InterruptedException {
List<UserEvent> sentUserEventList = new ArrayList<>();
List<UserEvent> receivedUserEventList = new ArrayList<>();
for (long count = 1; count <= 10; count++) {
for (long sequenceNumber = 1; sequenceNumber <= 10; sequenceNumber++) {
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
userEvent.setGlobalSequenceNumber(sequenceNumber);
userEvent.setEventNanoTime(System.nanoTime());
ProducerRecord<Long, UserEvent> producerRecord = new ProducerRecord<>(Config.SINGLE_PARTITION_TOPIC, userEvent);
Future<RecordMetadata> future = producer.send(producerRecord);