Added tests for External Sequence number with Time Window

This commit is contained in:
Amol Gote 2023-10-16 19:50:02 -04:00
parent 7c40b82bf9
commit baaef6bcf3
4 changed files with 153 additions and 4 deletions

View File

@ -24,12 +24,12 @@ public class ExtSeqWithTimeWindowConsumer {
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, Message.class);
Consumer<String, Message> consumer = new KafkaConsumer<>(props);
Consumer<Long, Message> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("multi_partition_topic"));
List<Message> buffer = new ArrayList<>();
long lastProcessedTime = System.nanoTime();
while (true) {
ConsumerRecords<String, Message> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
ConsumerRecords<Long, Message> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> {
buffer.add(record.value());
});
@ -43,7 +43,7 @@ public class ExtSeqWithTimeWindowConsumer {
private static void processBuffer(List<Message> buffer) {
Collections.sort(buffer);
buffer.forEach(message -> {
System.out.println("Processing message with Partition key: " + message.getPartitionKey() + ", Application Identifier: " + message.getApplicationIdentifier());
System.out.println("Processing message with Global Sequence number: " + message.getPartitionKey() + ", Application Identifier: " + message.getApplicationIdentifier());
});
buffer.clear();
}

View File

@ -19,6 +19,7 @@ public class ExtSeqWithTimeWindowProducer {
for (long partitionKey = 1; partitionKey <= 10 ; partitionKey++) {
long applicationIdentifier = Message.getRandomApplicationIdentifier();
Message message = new Message(partitionKey, applicationIdentifier);
message.setGlobalSequenceNumber(partitionKey);
producer.send(new ProducerRecord<>("multi_partition_topic", partitionKey, message));
System.out.println("Partition key: " + message.getPartitionKey() + ", Application Identifier: " + message.getApplicationIdentifier());
}

View File

@ -8,6 +8,8 @@ public class Message implements Comparable<Message> {
private long partitionKey;
private long applicationIdentifier;
private long globalSequenceNumber;
public Message(){
}
@ -26,9 +28,17 @@ public class Message implements Comparable<Message> {
return applicationIdentifier;
}
public long getGlobalSequenceNumber() {
return globalSequenceNumber;
}
public void setGlobalSequenceNumber(long globalSequenceNumber) {
this.globalSequenceNumber = globalSequenceNumber;
}
@Override
public int compareTo(Message other) {
return Long.compare(this.partitionKey, other.partitionKey);
return Long.compare(this.globalSequenceNumber, other.globalSequenceNumber);
}
@Override

View File

@ -0,0 +1,138 @@
package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.payload.Message;
import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer;
import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static org.junit.jupiter.api.Assertions.*;
@Testcontainers
public class ExtSeqWithTimeWindowIntegrationTest {
private static String TOPIC = "multi_partition_topic";
private static int PARTITIONS = 5;
private static short REPLICATION_FACTOR = 1;
private static Admin admin;
private static KafkaProducer<Long, Message> producer;
private static KafkaConsumer<Long, Message> consumer;
private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofMillis(5000);
private static final long BUFFER_PERIOD_NS = 5000L * 1000000; // 5000 milliseconds converted to nanoseconds
@Container
private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
@BeforeAll
static void setup() throws ExecutionException, InterruptedException {
KAFKA_CONTAINER.addExposedPort(9092);
Properties adminProperties = new Properties();
adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName());
Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, Message.class);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
admin = Admin.create(adminProperties);
producer = new KafkaProducer<>(producerProperties);
consumer = new KafkaConsumer<>(consumerProperties);
List<NewTopic> topicList = new ArrayList<>();
NewTopic newTopic = new NewTopic(TOPIC, PARTITIONS, REPLICATION_FACTOR);
topicList.add(newTopic);
CreateTopicsResult result = admin.createTopics(topicList);
KafkaFuture<Void> future = result.values().get(TOPIC);
future.whenComplete((voidResult, exception) -> {
if (exception != null) {
System.err.println("Error creating the topic: " + exception.getMessage());
} else {
System.out.println("Topic created successfully!");
}
}).get();
}
@AfterAll
static void destroy() {
KAFKA_CONTAINER.stop();
}
@Test
void givenMultiplePartitions_whenPublishedToKafkaAndConsumedWithExtSeqNumberAndTimeWindow_thenCheckForMessageOrder() throws ExecutionException, InterruptedException {
List<Message> sentMessageList = new ArrayList<>();
List<Message> receivedMessageList = new ArrayList<>();
for (long partitionKey = 1; partitionKey <= 10 ; partitionKey++) {
long applicationIdentifier = Message.getRandomApplicationIdentifier();
Message message = new Message(partitionKey, applicationIdentifier);
message.setGlobalSequenceNumber(partitionKey);
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC, partitionKey, message));
sentMessageList.add(message);
RecordMetadata metadata = future.get();
System.out.println("Partition : " + metadata.partition());
}
boolean isOrderMaintained = true;
consumer.subscribe(Collections.singletonList(TOPIC));
List<Message> buffer = new ArrayList<>();
long lastProcessedTime = System.nanoTime();
ConsumerRecords<Long, Message> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> {
buffer.add(record.value());
});
while (buffer.size() > 0) {
if (System.nanoTime() - lastProcessedTime > BUFFER_PERIOD_NS) {
processBuffer(buffer, receivedMessageList);
lastProcessedTime = System.nanoTime();
}
records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> {
buffer.add(record.value());
});
}
for (int insertPosition = 0; insertPosition <= receivedMessageList.size() - 1; insertPosition++) {
if (isOrderMaintained){
Message sentMessage = sentMessageList.get(insertPosition);
Message receivedMessage = receivedMessageList.get(insertPosition);
if (!sentMessage.equals(receivedMessage)) {
isOrderMaintained = false;
}
}
}
assertTrue(isOrderMaintained);
}
private static void processBuffer(List<Message> buffer, List<Message> receivedMessageList) {
Collections.sort(buffer);
buffer.forEach(message -> {
receivedMessageList.add(message);
System.out.println("Processing message with Global Sequence number: " + message.getGlobalSequenceNumber() + ", Application Identifier: " + message.getApplicationIdentifier());
});
buffer.clear();
}
}