From baaef6bcf3599c79d90cf8e6fc9f96df59c47e4d Mon Sep 17 00:00:00 2001 From: Amol Gote Date: Mon, 16 Oct 2023 19:50:02 -0400 Subject: [PATCH] Added tests for External Sequence number with Time Window --- .../ExtSeqWithTimeWindowConsumer.java | 6 +- .../ExtSeqWithTimeWindowProducer.java | 1 + .../message/ordering/payload/Message.java | 12 +- .../ExtSeqWithTimeWindowIntegrationTest.java | 138 ++++++++++++++++++ 4 files changed, 153 insertions(+), 4 deletions(-) create mode 100644 apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowIntegrationTest.java diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowConsumer.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowConsumer.java index 19595d9e95..cd424178ae 100644 --- a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowConsumer.java +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowConsumer.java @@ -24,12 +24,12 @@ public class ExtSeqWithTimeWindowConsumer { props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, Message.class); - Consumer consumer = new KafkaConsumer<>(props); + Consumer consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("multi_partition_topic")); List buffer = new ArrayList<>(); long lastProcessedTime = System.nanoTime(); while (true) { - ConsumerRecords records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES); + ConsumerRecords records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES); records.forEach(record -> { buffer.add(record.value()); }); @@ -43,7 +43,7 @@ public class ExtSeqWithTimeWindowConsumer { private static void processBuffer(List buffer) { Collections.sort(buffer); buffer.forEach(message -> { - System.out.println("Processing message with Partition key: " + message.getPartitionKey() + ", Application Identifier: " + message.getApplicationIdentifier()); + System.out.println("Processing message with Global Sequence number: " + message.getPartitionKey() + ", Application Identifier: " + message.getApplicationIdentifier()); }); buffer.clear(); } diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowProducer.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowProducer.java index a20c569159..99e05990cb 100644 --- a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowProducer.java +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowProducer.java @@ -19,6 +19,7 @@ public class ExtSeqWithTimeWindowProducer { for (long partitionKey = 1; partitionKey <= 10 ; partitionKey++) { long applicationIdentifier = Message.getRandomApplicationIdentifier(); Message message = new Message(partitionKey, applicationIdentifier); + message.setGlobalSequenceNumber(partitionKey); producer.send(new ProducerRecord<>("multi_partition_topic", partitionKey, message)); System.out.println("Partition key: " + message.getPartitionKey() + ", Application Identifier: " + message.getApplicationIdentifier()); } diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/payload/Message.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/payload/Message.java index 734ecba53d..694d84f9d8 100644 --- a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/payload/Message.java +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/payload/Message.java @@ -8,6 +8,8 @@ public class Message implements Comparable { private long partitionKey; private long applicationIdentifier; + private long globalSequenceNumber; + public Message(){ } @@ -26,9 +28,17 @@ public class Message implements Comparable { return applicationIdentifier; } + public long getGlobalSequenceNumber() { + return globalSequenceNumber; + } + + public void setGlobalSequenceNumber(long globalSequenceNumber) { + this.globalSequenceNumber = globalSequenceNumber; + } + @Override public int compareTo(Message other) { - return Long.compare(this.partitionKey, other.partitionKey); + return Long.compare(this.globalSequenceNumber, other.globalSequenceNumber); } @Override diff --git a/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowIntegrationTest.java b/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowIntegrationTest.java new file mode 100644 index 0000000000..a01c230026 --- /dev/null +++ b/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowIntegrationTest.java @@ -0,0 +1,138 @@ +package com.baeldung.kafka.message.ordering; + +import com.baeldung.kafka.message.ordering.payload.Message; +import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer; +import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer; +import org.apache.kafka.clients.admin.*; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import static org.junit.jupiter.api.Assertions.*; + +@Testcontainers +public class ExtSeqWithTimeWindowIntegrationTest { + private static String TOPIC = "multi_partition_topic"; + private static int PARTITIONS = 5; + private static short REPLICATION_FACTOR = 1; + private static Admin admin; + private static KafkaProducer producer; + private static KafkaConsumer consumer; + private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofMillis(5000); + + private static final long BUFFER_PERIOD_NS = 5000L * 1000000; // 5000 milliseconds converted to nanoseconds + @Container + private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); + + @BeforeAll + static void setup() throws ExecutionException, InterruptedException { + KAFKA_CONTAINER.addExposedPort(9092); + + Properties adminProperties = new Properties(); + adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + + Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); + producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName()); + + Properties consumerProperties = new Properties(); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProperties.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, Message.class); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); + admin = Admin.create(adminProperties); + producer = new KafkaProducer<>(producerProperties); + consumer = new KafkaConsumer<>(consumerProperties); + List topicList = new ArrayList<>(); + NewTopic newTopic = new NewTopic(TOPIC, PARTITIONS, REPLICATION_FACTOR); + topicList.add(newTopic); + CreateTopicsResult result = admin.createTopics(topicList); + KafkaFuture future = result.values().get(TOPIC); + future.whenComplete((voidResult, exception) -> { + if (exception != null) { + System.err.println("Error creating the topic: " + exception.getMessage()); + } else { + System.out.println("Topic created successfully!"); + } + }).get(); + } + + @AfterAll + static void destroy() { + KAFKA_CONTAINER.stop(); + } + + @Test + void givenMultiplePartitions_whenPublishedToKafkaAndConsumedWithExtSeqNumberAndTimeWindow_thenCheckForMessageOrder() throws ExecutionException, InterruptedException { + List sentMessageList = new ArrayList<>(); + List receivedMessageList = new ArrayList<>(); + for (long partitionKey = 1; partitionKey <= 10 ; partitionKey++) { + long applicationIdentifier = Message.getRandomApplicationIdentifier(); + Message message = new Message(partitionKey, applicationIdentifier); + message.setGlobalSequenceNumber(partitionKey); + Future future = producer.send(new ProducerRecord<>(TOPIC, partitionKey, message)); + sentMessageList.add(message); + RecordMetadata metadata = future.get(); + System.out.println("Partition : " + metadata.partition()); + } + + boolean isOrderMaintained = true; + consumer.subscribe(Collections.singletonList(TOPIC)); + List buffer = new ArrayList<>(); + long lastProcessedTime = System.nanoTime(); + ConsumerRecords records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES); + records.forEach(record -> { + buffer.add(record.value()); + }); + while (buffer.size() > 0) { + if (System.nanoTime() - lastProcessedTime > BUFFER_PERIOD_NS) { + processBuffer(buffer, receivedMessageList); + lastProcessedTime = System.nanoTime(); + } + records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES); + records.forEach(record -> { + buffer.add(record.value()); + }); + } + for (int insertPosition = 0; insertPosition <= receivedMessageList.size() - 1; insertPosition++) { + if (isOrderMaintained){ + Message sentMessage = sentMessageList.get(insertPosition); + Message receivedMessage = receivedMessageList.get(insertPosition); + if (!sentMessage.equals(receivedMessage)) { + isOrderMaintained = false; + } + } + } + assertTrue(isOrderMaintained); + } + + private static void processBuffer(List buffer, List receivedMessageList) { + Collections.sort(buffer); + buffer.forEach(message -> { + receivedMessageList.add(message); + System.out.println("Processing message with Global Sequence number: " + message.getGlobalSequenceNumber() + ", Application Identifier: " + message.getApplicationIdentifier()); + }); + buffer.clear(); + } +}