From 6d4e6886b311ffe28794033bc555e80b274fe40b Mon Sep 17 00:00:00 2001 From: Amol Gote Date: Sun, 15 Oct 2023 18:42:20 -0400 Subject: [PATCH] Renamed Inser position to patition key --- .../ordering/ExtSeqWithTimeWindowConsumer.java | 2 +- .../ordering/ExtSeqWithTimeWindowProducer.java | 6 ++---- .../message/ordering/MultiPartitionConsumer.java | 2 +- .../message/ordering/MultiPartitionProducer.java | 4 ++-- .../message/ordering/SinglePartitionConsumer.java | 3 +-- .../message/ordering/SinglePartitionProducer.java | 5 ++--- .../kafka/message/ordering/payload/Message.java | 14 +++++++------- .../message/ordering/MultiplePartitionTest.java | 2 +- .../message/ordering/SinglePartitionTest.java | 2 +- 9 files changed, 18 insertions(+), 22 deletions(-) diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowConsumer.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowConsumer.java index f5a0dbd640..d342c1a950 100644 --- a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowConsumer.java +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowConsumer.java @@ -42,7 +42,7 @@ public class ExtSeqWithTimeWindowConsumer { private static void processBuffer(List buffer) { Collections.sort(buffer); buffer.forEach(message -> { - System.out.println("Processing message with Insert Position: " + message.getInsertPosition() + ", Message Id: " + message.getMessageId()); + System.out.println("Processing message with Insert Position: " + message.getPartitionKey() + ", Message Id: " + message.getMessageId()); }); buffer.clear(); } diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowProducer.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowProducer.java index 110015de25..d1480522e5 100644 --- a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowProducer.java +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowProducer.java @@ -8,8 +8,6 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; -import java.util.Random; -import java.util.concurrent.atomic.AtomicLong; public class ExtSeqWithTimeWindowProducer { public static void main(String[] args) { @@ -22,9 +20,9 @@ public class ExtSeqWithTimeWindowProducer { for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) { long messageId = Message.getRandomMessageId(); String key = "Key-" + insertPosition; - Message message = new Message(insertPosition, messageId); + Message message = new Message(key, messageId); producer.send(new ProducerRecord<>("multi_partition_topic", key, message)); - System.out.println("Insert Position: " + message.getInsertPosition() + ", Message Id: " + message.getMessageId()); + System.out.println("Insert Position: " + message.getPartitionKey() + ", Message Id: " + message.getMessageId()); } producer.close(); System.out.println("ExternalSequencingProducer Completed."); diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/MultiPartitionConsumer.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/MultiPartitionConsumer.java index 542a664745..4471070f0f 100644 --- a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/MultiPartitionConsumer.java +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/MultiPartitionConsumer.java @@ -29,7 +29,7 @@ public class MultiPartitionConsumer { records.forEach(record -> { Message message = record.value(); if (message != null) { - System.out.println("Process message with Insert Position: " + message.getInsertPosition() + ", Message Id: " + message.getMessageId()); + System.out.println("Process message with Insert Position: " + message.getPartitionKey() + ", Message Id: " + message.getMessageId()); } }); } diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/MultiPartitionProducer.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/MultiPartitionProducer.java index bf9db58392..04e3dcce0a 100644 --- a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/MultiPartitionProducer.java +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/MultiPartitionProducer.java @@ -20,9 +20,9 @@ public class MultiPartitionProducer { for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) { long messageId = Message.getRandomMessageId(); String key = "Key-" + insertPosition; - Message message = new Message(insertPosition, messageId); + Message message = new Message(key, messageId); producer.send(new ProducerRecord<>("multi_partition_topic", key, message)); - System.out.println("Insert Position: " + message.getInsertPosition() + ", Message Id: " + message.getMessageId()); + System.out.println("Insert Position: " + message.getPartitionKey() + ", Message Id: " + message.getMessageId()); } producer.close(); System.out.println("SinglePartitionProducer Completed."); diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/SinglePartitionConsumer.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/SinglePartitionConsumer.java index e1a449055e..b47e4ca3b0 100644 --- a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/SinglePartitionConsumer.java +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/SinglePartitionConsumer.java @@ -7,7 +7,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; import java.time.Duration; import java.util.Collections; @@ -30,7 +29,7 @@ public class SinglePartitionConsumer { ConsumerRecords records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES); records.forEach(record -> { Message message = record.value(); - System.out.println("Process message with Insert Position: " + message.getInsertPosition() + ", Message Id: " + message.getMessageId()); + System.out.println("Process message with Insert Position: " + message.getPartitionKey() + ", Message Id: " + message.getMessageId()); }); } } diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/SinglePartitionProducer.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/SinglePartitionProducer.java index c986089841..d669a0fd69 100644 --- a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/SinglePartitionProducer.java +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/SinglePartitionProducer.java @@ -8,7 +8,6 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; -import java.util.Random; public class SinglePartitionProducer { public static void main(String[] args) { @@ -21,9 +20,9 @@ public class SinglePartitionProducer { for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) { long messageId = Message.getRandomMessageId(); String key = "Key-" + insertPosition; - Message message = new Message(insertPosition, messageId); + Message message = new Message(key, messageId); producer.send(new ProducerRecord<>("single_partition_topic", key, message)); - System.out.println("Insert Position: " + message.getInsertPosition() + ", Message Id: " + message.getMessageId()); + System.out.println("Insert Position: " + message.getPartitionKey() + ", Message Id: " + message.getMessageId()); } producer.close(); System.out.println("SinglePartitionProducer Completed."); diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/payload/Message.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/payload/Message.java index 095aeef89a..de1e5135da 100644 --- a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/payload/Message.java +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/payload/Message.java @@ -1,11 +1,11 @@ package com.baeldung.kafka.message.ordering.payload; -import javax.swing.*; +import java.util.Objects; import java.util.Random; import java.util.concurrent.ThreadLocalRandom; public class Message implements Comparable { - private long insertPosition; + private String partitionKey; private long messageId; public Message(){ @@ -13,13 +13,13 @@ public class Message implements Comparable { } //Required for Kafka Serialization and Deserialization - public Message(long insertPosition, long messageId) { - this.insertPosition = insertPosition; + public Message(String partitionKey, long messageId) { + this.partitionKey = partitionKey; this.messageId = messageId; } - public long getInsertPosition() { - return insertPosition; + public String getPartitionKey() { + return partitionKey; } public long getMessageId() { @@ -40,7 +40,7 @@ public class Message implements Comparable { return false; } Message message = (Message) obj; - return this.messageId == message.getMessageId() && this.insertPosition == message.getInsertPosition(); + return this.messageId == message.getMessageId() && Objects.equals(this.partitionKey, message.getPartitionKey()); } public static long getRandomMessageId() { diff --git a/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/MultiplePartitionTest.java b/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/MultiplePartitionTest.java index 5b68544f95..aed5f30e9d 100644 --- a/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/MultiplePartitionTest.java +++ b/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/MultiplePartitionTest.java @@ -90,7 +90,7 @@ public class MultiplePartitionTest { for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) { long messageId = Message.getRandomMessageId(); String key = "Key-" + insertPosition; - Message message = new Message(insertPosition, messageId); + Message message = new Message(key, messageId); Future future = producer.send(new ProducerRecord<>(TOPIC, key, message)); sentMessageList.add(message); RecordMetadata metadata = future.get(); diff --git a/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/SinglePartitionTest.java b/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/SinglePartitionTest.java index 807e21bfa8..5751c8d0e0 100644 --- a/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/SinglePartitionTest.java +++ b/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/SinglePartitionTest.java @@ -92,7 +92,7 @@ public class SinglePartitionTest { for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) { long messageId = Message.getRandomMessageId(); String key = "Key-" + insertPosition; - Message message = new Message(insertPosition, messageId); + Message message = new Message(key, messageId); ProducerRecord producerRecord = new ProducerRecord<>(TOPIC, key, message); Future future = producer.send(producerRecord); sentMessageList.add(message);