Renamed Inser position to patition key

This commit is contained in:
Amol Gote 2023-10-15 18:42:20 -04:00
parent b94d5a1f8a
commit 6d4e6886b3
9 changed files with 18 additions and 22 deletions

@ -42,7 +42,7 @@ public class ExtSeqWithTimeWindowConsumer {
private static void processBuffer(List<Message> buffer) {
Collections.sort(buffer);
buffer.forEach(message -> {
System.out.println("Processing message with Insert Position: " + message.getInsertPosition() + ", Message Id: " + message.getMessageId());
System.out.println("Processing message with Insert Position: " + message.getPartitionKey() + ", Message Id: " + message.getMessageId());
});
buffer.clear();
}

@ -8,8 +8,6 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
public class ExtSeqWithTimeWindowProducer {
public static void main(String[] args) {
@ -22,9 +20,9 @@ public class ExtSeqWithTimeWindowProducer {
for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) {
long messageId = Message.getRandomMessageId();
String key = "Key-" + insertPosition;
Message message = new Message(insertPosition, messageId);
Message message = new Message(key, messageId);
producer.send(new ProducerRecord<>("multi_partition_topic", key, message));
System.out.println("Insert Position: " + message.getInsertPosition() + ", Message Id: " + message.getMessageId());
System.out.println("Insert Position: " + message.getPartitionKey() + ", Message Id: " + message.getMessageId());
}
producer.close();
System.out.println("ExternalSequencingProducer Completed.");

@ -29,7 +29,7 @@ public class MultiPartitionConsumer {
records.forEach(record -> {
Message message = record.value();
if (message != null) {
System.out.println("Process message with Insert Position: " + message.getInsertPosition() + ", Message Id: " + message.getMessageId());
System.out.println("Process message with Insert Position: " + message.getPartitionKey() + ", Message Id: " + message.getMessageId());
}
});
}

@ -20,9 +20,9 @@ public class MultiPartitionProducer {
for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) {
long messageId = Message.getRandomMessageId();
String key = "Key-" + insertPosition;
Message message = new Message(insertPosition, messageId);
Message message = new Message(key, messageId);
producer.send(new ProducerRecord<>("multi_partition_topic", key, message));
System.out.println("Insert Position: " + message.getInsertPosition() + ", Message Id: " + message.getMessageId());
System.out.println("Insert Position: " + message.getPartitionKey() + ", Message Id: " + message.getMessageId());
}
producer.close();
System.out.println("SinglePartitionProducer Completed.");

@ -7,7 +7,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Collections;
@ -30,7 +29,7 @@ public class SinglePartitionConsumer {
ConsumerRecords<String, Message> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> {
Message message = record.value();
System.out.println("Process message with Insert Position: " + message.getInsertPosition() + ", Message Id: " + message.getMessageId());
System.out.println("Process message with Insert Position: " + message.getPartitionKey() + ", Message Id: " + message.getMessageId());
});
}
}

@ -8,7 +8,6 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.Random;
public class SinglePartitionProducer {
public static void main(String[] args) {
@ -21,9 +20,9 @@ public class SinglePartitionProducer {
for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) {
long messageId = Message.getRandomMessageId();
String key = "Key-" + insertPosition;
Message message = new Message(insertPosition, messageId);
Message message = new Message(key, messageId);
producer.send(new ProducerRecord<>("single_partition_topic", key, message));
System.out.println("Insert Position: " + message.getInsertPosition() + ", Message Id: " + message.getMessageId());
System.out.println("Insert Position: " + message.getPartitionKey() + ", Message Id: " + message.getMessageId());
}
producer.close();
System.out.println("SinglePartitionProducer Completed.");

@ -1,11 +1,11 @@
package com.baeldung.kafka.message.ordering.payload;
import javax.swing.*;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
public class Message implements Comparable<Message> {
private long insertPosition;
private String partitionKey;
private long messageId;
public Message(){
@ -13,13 +13,13 @@ public class Message implements Comparable<Message> {
}
//Required for Kafka Serialization and Deserialization
public Message(long insertPosition, long messageId) {
this.insertPosition = insertPosition;
public Message(String partitionKey, long messageId) {
this.partitionKey = partitionKey;
this.messageId = messageId;
}
public long getInsertPosition() {
return insertPosition;
public String getPartitionKey() {
return partitionKey;
}
public long getMessageId() {
@ -40,7 +40,7 @@ public class Message implements Comparable<Message> {
return false;
}
Message message = (Message) obj;
return this.messageId == message.getMessageId() && this.insertPosition == message.getInsertPosition();
return this.messageId == message.getMessageId() && Objects.equals(this.partitionKey, message.getPartitionKey());
}
public static long getRandomMessageId() {

@ -90,7 +90,7 @@ public class MultiplePartitionTest {
for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) {
long messageId = Message.getRandomMessageId();
String key = "Key-" + insertPosition;
Message message = new Message(insertPosition, messageId);
Message message = new Message(key, messageId);
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC, key, message));
sentMessageList.add(message);
RecordMetadata metadata = future.get();

@ -92,7 +92,7 @@ public class SinglePartitionTest {
for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) {
long messageId = Message.getRandomMessageId();
String key = "Key-" + insertPosition;
Message message = new Message(insertPosition, messageId);
Message message = new Message(key, messageId);
ProducerRecord<String, Message> producerRecord = new ProducerRecord<>(TOPIC, key, message);
Future<RecordMetadata> future = producer.send(producerRecord);
sentMessageList.add(message);