diff --git a/apache-kafka-2/pom.xml b/apache-kafka-2/pom.xml index 45b31004b7..d0838a386e 100644 --- a/apache-kafka-2/pom.xml +++ b/apache-kafka-2/pom.xml @@ -60,7 +60,7 @@ com.fasterxml.jackson.core jackson-databind - 2.15.2 + ${jackson.databind.version} @@ -69,6 +69,7 @@ 2.8.0 1.15.3 1.15.3 + 2.15.2 \ No newline at end of file diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/payload/Message.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/payload/Message.java index b185d663d4..317aec699e 100644 --- a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/payload/Message.java +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/payload/Message.java @@ -1,5 +1,6 @@ package com.baeldung.kafka.message.ordering.payload; +import javax.swing.*; import java.util.Random; public class Message implements Comparable { @@ -10,6 +11,7 @@ public class Message implements Comparable { } + //Required for Kafka Serialization and Deserialization public Message(long insertPosition, long messageId) { this.insertPosition = insertPosition; this.messageId = messageId; @@ -28,6 +30,18 @@ public class Message implements Comparable { return Long.compare(this.messageId, other.messageId); } + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof Message)) { + return false; + } + Message message = (Message) obj; + return this.messageId == message.getMessageId() && this.insertPosition == message.getInsertPosition(); + } + public static long getRandomMessageId() { Random rand = new Random(); return rand.nextInt(1000); diff --git a/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/MultiplePartitionTest.java b/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/MultiplePartitionTest.java new file mode 100644 index 0000000000..586c328f79 --- /dev/null +++ b/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/MultiplePartitionTest.java @@ -0,0 +1,114 @@ +package com.baeldung.kafka.message.ordering; + +import com.baeldung.kafka.message.ordering.payload.Message; +import lombok.var; +import org.apache.kafka.clients.admin.*; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.PartitionInfo; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import static org.junit.jupiter.api.Assertions.*; + +@Testcontainers +public class MultiplePartitionTest { + private static String TOPIC = "multi_partition_topic"; + private static int PARTITIONS = 5; + private static short REPLICATION_FACTOR = 1; + private static Admin admin; + private static KafkaProducer producer; + private static KafkaConsumer consumer; + private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofMillis(5000); + @Container + private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); + + @BeforeAll + static void setup() throws ExecutionException, InterruptedException { + KAFKA_CONTAINER.addExposedPort(9092); + + Properties adminProperties = new Properties(); + adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + + Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + producerProperties.put("value.serializer", "com.baeldung.kafka.message.ordering.serialization.JacksonSerializer"); + + Properties consumerProperties = new Properties(); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer"); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProperties.put("value.deserializer.serializedClass", Message.class); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); + admin = Admin.create(adminProperties); + producer = new KafkaProducer<>(producerProperties); + consumer = new KafkaConsumer<>(consumerProperties); + List topicList = new ArrayList<>(); + NewTopic newTopic = new NewTopic(TOPIC, PARTITIONS, REPLICATION_FACTOR); + topicList.add(newTopic); + CreateTopicsResult result = admin.createTopics(topicList); + KafkaFuture future = result.values().get(TOPIC); + future.whenComplete((voidResult, exception) -> { + if (exception != null) { + System.err.println("Error creating the topic: " + exception.getMessage()); + } else { + System.out.println("Topic created successfully!"); + } + }).get(); + } + + @AfterAll + static void destroy() { + KAFKA_CONTAINER.stop(); + } + + @Test + void givenMultiplePartitions_whenPublishedToKafkaAndConsumed_thenCheckForMessageOrder() throws ExecutionException, InterruptedException { + List sentMessageList = new ArrayList<>(); + List receivedMessageList = new ArrayList<>(); + for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) { + long messageId = Message.getRandomMessageId(); + String key = "Key-" + insertPosition; + Message message = new Message(insertPosition, messageId); + Future future = producer.send(new ProducerRecord<>(TOPIC, key, message)); + sentMessageList.add(message); + RecordMetadata metadata = future.get(); + System.out.println("Partition : " + metadata.partition()); + } + + boolean isOrderMaintained = true; + consumer.subscribe(Collections.singletonList(TOPIC)); + ConsumerRecords records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES); + records.forEach(record -> { + Message message = record.value(); + receivedMessageList.add(message); + }); + for (int insertPosition = 0; insertPosition <= receivedMessageList.size() - 1; insertPosition++) { + if (isOrderMaintained){ + Message sentMessage = sentMessageList.get(insertPosition); + Message receivedMessage = receivedMessageList.get(insertPosition); + if (!sentMessage.equals(receivedMessage)) { + isOrderMaintained = false; + } + } + } + assertFalse(isOrderMaintained); + } +} diff --git a/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/SinglePartitionTest.java b/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/SinglePartitionTest.java new file mode 100644 index 0000000000..afffbcc28e --- /dev/null +++ b/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/SinglePartitionTest.java @@ -0,0 +1,115 @@ +package com.baeldung.kafka.message.ordering; + +import com.baeldung.kafka.message.ordering.payload.Message; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaFuture; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Testcontainers +public class SinglePartitionTest { + private static String TOPIC = "single_partition_topic"; + private static int PARTITIONS = 1; + private static short REPLICATION_FACTOR = 1; + private static Admin admin; + private static KafkaProducer producer; + private static KafkaConsumer consumer; + + private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofMillis(5000); + + @Container + private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); + + @BeforeAll + static void setup() throws ExecutionException, InterruptedException { + KAFKA_CONTAINER.addExposedPort(9092); + + Properties adminProperties = new Properties(); + adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + + Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + producerProperties.put("value.serializer", "com.baeldung.kafka.message.ordering.serialization.JacksonSerializer"); + + Properties consumerProperties = new Properties(); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer"); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProperties.put("value.deserializer.serializedClass", Message.class); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); + admin = Admin.create(adminProperties); + producer = new KafkaProducer<>(producerProperties); + consumer = new KafkaConsumer<>(consumerProperties); + List topicList = new ArrayList<>(); + NewTopic newTopic = new NewTopic(TOPIC, PARTITIONS, REPLICATION_FACTOR); + topicList.add(newTopic); + CreateTopicsResult result = admin.createTopics(topicList); + KafkaFuture future = result.values().get(TOPIC); + future.whenComplete((voidResult, exception) -> { + if (exception != null) { + System.err.println("Error creating the topic: " + exception.getMessage()); + } else { + System.out.println("Topic created successfully!"); + } + }).get(); + } + + @AfterAll + static void destroy() { + KAFKA_CONTAINER.stop(); + } + + @Test + void givenASinglePartition_whenPublishedToKafkaAndConsumed_thenCheckForMessageOrder() throws ExecutionException, InterruptedException { + List sentMessageList = new ArrayList<>(); + List receivedMessageList = new ArrayList<>(); + for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) { + long messageId = Message.getRandomMessageId(); + String key = "Key-" + insertPosition; + Message message = new Message(insertPosition, messageId); + ProducerRecord producerRecord = new ProducerRecord<>(TOPIC, key, message); + Future future = producer.send(producerRecord); + sentMessageList.add(message); + RecordMetadata metadata = future.get(); + System.out.println("Partition : " + metadata.partition()); + } + + consumer.subscribe(Collections.singletonList(TOPIC)); + ConsumerRecords records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES); + records.forEach(record -> { + Message message = record.value(); + receivedMessageList.add(message); + }); + boolean result = true; + for (int count = 0; count <= 9 ; count++) { + Message sentMessage = sentMessageList.get(count); + Message receivedMessage = receivedMessageList.get(count); + if (!sentMessage.equals(receivedMessage) && result){ + result = false; + } + } + assertTrue(result); + } +}