Message to User Event Rename

This commit is contained in:
Amol Gote 2023-10-17 19:39:27 -04:00
parent baaef6bcf3
commit 5b936c47a0
11 changed files with 163 additions and 165 deletions

View File

@ -1,13 +1,12 @@
package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.payload.Message;
import com.baeldung.kafka.message.ordering.payload.UserEvent;
import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
@ -23,13 +22,13 @@ public class ExtSeqWithTimeWindowConsumer {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, Message.class);
Consumer<Long, Message> consumer = new KafkaConsumer<>(props);
props.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, UserEvent.class);
Consumer<Long, UserEvent> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("multi_partition_topic"));
List<Message> buffer = new ArrayList<>();
List<UserEvent> buffer = new ArrayList<>();
long lastProcessedTime = System.nanoTime();
while (true) {
ConsumerRecords<Long, Message> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
ConsumerRecords<Long, UserEvent> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> {
buffer.add(record.value());
});
@ -40,10 +39,10 @@ public class ExtSeqWithTimeWindowConsumer {
}
}
private static void processBuffer(List<Message> buffer) {
private static void processBuffer(List<UserEvent> buffer) {
Collections.sort(buffer);
buffer.forEach(message -> {
System.out.println("Processing message with Global Sequence number: " + message.getPartitionKey() + ", Application Identifier: " + message.getApplicationIdentifier());
buffer.forEach(userEvent -> {
System.out.println("Processing message with Global Sequence number: " + userEvent.getGlobalSequenceNumber() + ", event nano time : " + userEvent.getEventNanoTime());
});
buffer.clear();
}

View File

@ -1,6 +1,6 @@
package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.payload.Message;
import com.baeldung.kafka.message.ordering.payload.UserEvent;
import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
@ -8,6 +8,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import java.util.Properties;
import java.util.UUID;
public class ExtSeqWithTimeWindowProducer {
public static void main(String[] args) {
@ -15,13 +16,13 @@ public class ExtSeqWithTimeWindowProducer {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName());
KafkaProducer<Long, Message> producer = new KafkaProducer<>(props);
for (long partitionKey = 1; partitionKey <= 10 ; partitionKey++) {
long applicationIdentifier = Message.getRandomApplicationIdentifier();
Message message = new Message(partitionKey, applicationIdentifier);
message.setGlobalSequenceNumber(partitionKey);
producer.send(new ProducerRecord<>("multi_partition_topic", partitionKey, message));
System.out.println("Partition key: " + message.getPartitionKey() + ", Application Identifier: " + message.getApplicationIdentifier());
KafkaProducer<Long, UserEvent> producer = new KafkaProducer<>(props);
for (long sequenceNumber = 1; sequenceNumber <= 10 ; sequenceNumber++) {
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
userEvent.setEventNanoTime(System.nanoTime());
userEvent.setGlobalSequenceNumber(sequenceNumber);
producer.send(new ProducerRecord<>("multi_partition_topic", sequenceNumber, userEvent));
System.out.println("User Event Nano time : " + userEvent.getEventNanoTime() + ", User Event Id: " + userEvent.getUserEventId());
}
producer.close();
System.out.println("ExternalSequencingProducer Completed.");

View File

@ -1,13 +1,12 @@
package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.payload.Message;
import com.baeldung.kafka.message.ordering.payload.UserEvent;
import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
@ -22,15 +21,15 @@ public class MultiPartitionConsumer {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, Message.class);
Consumer<String, Message> consumer = new KafkaConsumer<>(props);
props.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, UserEvent.class);
Consumer<String, UserEvent> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("multi_partition_topic"));
while (true) {
ConsumerRecords<String, Message> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
ConsumerRecords<String, UserEvent> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> {
Message message = record.value();
if (message != null) {
System.out.println("Process message with Partition key: " + message.getPartitionKey() + ", Application Identifier: " + message.getApplicationIdentifier());
UserEvent userEvent = record.value();
if (userEvent != null) {
System.out.println("Process message with event nano time : " + userEvent.getEventNanoTime() + ", Event ID: " + userEvent.getUserEventId());
}
});
}

View File

@ -1,6 +1,6 @@
package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.payload.Message;
import com.baeldung.kafka.message.ordering.payload.UserEvent;
import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
@ -8,6 +8,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import java.util.Properties;
import java.util.UUID;
public class MultiPartitionProducer {
public static void main(String[] args) {
@ -15,12 +16,12 @@ public class MultiPartitionProducer {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName());
KafkaProducer<Long, Message> producer = new KafkaProducer<>(props);
for (long partitionKey = 1; partitionKey <= 10 ; partitionKey++) {
long applicationIdentifier = Message.getRandomApplicationIdentifier();
Message message = new Message(partitionKey, applicationIdentifier);
producer.send(new ProducerRecord<>("multi_partition_topic", partitionKey, message));
System.out.println("Partition Key: " + message.getPartitionKey() + ", Application Identifier: " + message.getApplicationIdentifier());
KafkaProducer<Long, UserEvent> producer = new KafkaProducer<>(props);
for (long count = 1; count <= 10 ; count++) {
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
userEvent.setEventNanoTime(System.nanoTime());
producer.send(new ProducerRecord<>("multi_partition_topic", count, userEvent));
System.out.println("Process message with Event ID: " + userEvent.getUserEventId());
}
producer.close();
System.out.println("SinglePartitionProducer Completed.");

View File

@ -1,13 +1,12 @@
package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.payload.Message;
import com.baeldung.kafka.message.ordering.payload.UserEvent;
import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
@ -23,14 +22,14 @@ public class SinglePartitionConsumer {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, Message.class);
Consumer<Long, Message> consumer = new KafkaConsumer<>(props);
props.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, UserEvent.class);
Consumer<Long, UserEvent> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("single_partition_topic"));
while (true) {
ConsumerRecords<Long, Message> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
ConsumerRecords<Long, UserEvent> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> {
Message message = record.value();
System.out.println("Process message with Partition Key: " + message.getPartitionKey() + ", Application Identifier: " + message.getApplicationIdentifier());
UserEvent userEvent = record.value();
System.out.println("Process message with event nano time : " + userEvent.getEventNanoTime() + ", Event ID: " + userEvent.getUserEventId());
});
}
}

View File

@ -1,13 +1,15 @@
package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.payload.Message;
import com.baeldung.kafka.message.ordering.payload.UserEvent;
import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import java.time.Instant;
import java.util.Properties;
import java.util.UUID;
public class SinglePartitionProducer {
public static void main(String[] args) {
@ -15,12 +17,12 @@ public class SinglePartitionProducer {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName());
KafkaProducer<Long, Message> producer = new KafkaProducer<>(props);
for (long partitionKey = 1; partitionKey <= 10 ; partitionKey++) {
long applicationIdentifier = Message.getRandomApplicationIdentifier();
Message message = new Message(partitionKey, applicationIdentifier);
producer.send(new ProducerRecord<>("single_partition_topic", partitionKey, message));
System.out.println("Partition key: " + message.getPartitionKey() + ", Application Identifier: " + message.getApplicationIdentifier());
KafkaProducer<Long, UserEvent> producer = new KafkaProducer<>(props);
for (long count = 1; count <= 10 ; count++) {
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
userEvent.setEventNanoTime(System.nanoTime());
producer.send(new ProducerRecord<>("single_partition_topic", count, userEvent));
System.out.println("Process message with Event ID: " + userEvent.getUserEventId());
}
producer.close();
System.out.println("SinglePartitionProducer Completed.");

View File

@ -1,61 +0,0 @@
package com.baeldung.kafka.message.ordering.payload;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
public class Message implements Comparable<Message> {
private long partitionKey;
private long applicationIdentifier;
private long globalSequenceNumber;
public Message(){
}
//Required for Kafka Serialization and Deserialization
public Message(long partitionKey, long applicationIdentifier) {
this.partitionKey = partitionKey;
this.applicationIdentifier = applicationIdentifier;
}
public long getPartitionKey() {
return partitionKey;
}
public long getApplicationIdentifier() {
return applicationIdentifier;
}
public long getGlobalSequenceNumber() {
return globalSequenceNumber;
}
public void setGlobalSequenceNumber(long globalSequenceNumber) {
this.globalSequenceNumber = globalSequenceNumber;
}
@Override
public int compareTo(Message other) {
return Long.compare(this.globalSequenceNumber, other.globalSequenceNumber);
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof Message)) {
return false;
}
Message message = (Message) obj;
return this.applicationIdentifier == message.getApplicationIdentifier() && Objects.equals(this.partitionKey, message.getPartitionKey());
}
public static long getRandomApplicationIdentifier() {
Random rand = new Random();
return ThreadLocalRandom.current().nextInt(1000);
}
}

View File

@ -0,0 +1,58 @@
package com.baeldung.kafka.message.ordering.payload;
import java.util.Objects;
public class UserEvent implements Comparable<UserEvent> {
private String userEventId;
private long eventNanoTime;
private long globalSequenceNumber;
public UserEvent(){
}
//Required for Kafka Serialization and Deserialization
public UserEvent(String userEventId) {
this.userEventId = userEventId;
}
public String getUserEventId() {
return userEventId;
}
public long getEventNanoTime() {
return eventNanoTime;
}
public void setEventNanoTime(long eventNanoTime) {
this.eventNanoTime = eventNanoTime;
}
public long getGlobalSequenceNumber() {
return globalSequenceNumber;
}
public void setGlobalSequenceNumber(long globalSequenceNumber) {
this.globalSequenceNumber = globalSequenceNumber;
}
@Override
public int compareTo(UserEvent other) {
return Long.compare(this.globalSequenceNumber, other.globalSequenceNumber);
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof UserEvent)) {
return false;
}
UserEvent userEvent = (UserEvent) obj;
return Objects.equals(this.userEventId, userEvent.getUserEventId())
&& userEvent.getEventNanoTime() == this.eventNanoTime;
}
}

View File

@ -1,6 +1,6 @@
package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.payload.Message;
import com.baeldung.kafka.message.ordering.payload.UserEvent;
import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer;
import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
import org.apache.kafka.clients.admin.*;
@ -34,8 +34,8 @@ public class ExtSeqWithTimeWindowIntegrationTest {
private static int PARTITIONS = 5;
private static short REPLICATION_FACTOR = 1;
private static Admin admin;
private static KafkaProducer<Long, Message> producer;
private static KafkaConsumer<Long, Message> consumer;
private static KafkaProducer<Long, UserEvent> producer;
private static KafkaConsumer<Long, UserEvent> consumer;
private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofMillis(5000);
private static final long BUFFER_PERIOD_NS = 5000L * 1000000; // 5000 milliseconds converted to nanoseconds
@ -59,7 +59,7 @@ public class ExtSeqWithTimeWindowIntegrationTest {
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, Message.class);
consumerProperties.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, UserEvent.class);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
admin = Admin.create(adminProperties);
producer = new KafkaProducer<>(producerProperties);
@ -85,29 +85,29 @@ public class ExtSeqWithTimeWindowIntegrationTest {
@Test
void givenMultiplePartitions_whenPublishedToKafkaAndConsumedWithExtSeqNumberAndTimeWindow_thenCheckForMessageOrder() throws ExecutionException, InterruptedException {
List<Message> sentMessageList = new ArrayList<>();
List<Message> receivedMessageList = new ArrayList<>();
for (long partitionKey = 1; partitionKey <= 10 ; partitionKey++) {
long applicationIdentifier = Message.getRandomApplicationIdentifier();
Message message = new Message(partitionKey, applicationIdentifier);
message.setGlobalSequenceNumber(partitionKey);
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC, partitionKey, message));
sentMessageList.add(message);
List<UserEvent> sentUserEventList = new ArrayList<>();
List<UserEvent> receivedUserEventList = new ArrayList<>();
for (long sequenceNumber = 1; sequenceNumber <= 10 ; sequenceNumber++) {
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
userEvent.setEventNanoTime(System.nanoTime());
userEvent.setGlobalSequenceNumber(sequenceNumber);
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC, sequenceNumber, userEvent));
sentUserEventList.add(userEvent);
RecordMetadata metadata = future.get();
System.out.println("Partition : " + metadata.partition());
}
boolean isOrderMaintained = true;
consumer.subscribe(Collections.singletonList(TOPIC));
List<Message> buffer = new ArrayList<>();
List<UserEvent> buffer = new ArrayList<>();
long lastProcessedTime = System.nanoTime();
ConsumerRecords<Long, Message> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
ConsumerRecords<Long, UserEvent> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> {
buffer.add(record.value());
});
while (buffer.size() > 0) {
if (System.nanoTime() - lastProcessedTime > BUFFER_PERIOD_NS) {
processBuffer(buffer, receivedMessageList);
processBuffer(buffer, receivedUserEventList);
lastProcessedTime = System.nanoTime();
}
records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
@ -115,11 +115,11 @@ public class ExtSeqWithTimeWindowIntegrationTest {
buffer.add(record.value());
});
}
for (int insertPosition = 0; insertPosition <= receivedMessageList.size() - 1; insertPosition++) {
for (int insertPosition = 0; insertPosition <= receivedUserEventList.size() - 1; insertPosition++) {
if (isOrderMaintained){
Message sentMessage = sentMessageList.get(insertPosition);
Message receivedMessage = receivedMessageList.get(insertPosition);
if (!sentMessage.equals(receivedMessage)) {
UserEvent sentUserEvent = sentUserEventList.get(insertPosition);
UserEvent receivedUserEvent = receivedUserEventList.get(insertPosition);
if (!sentUserEvent.equals(receivedUserEvent)) {
isOrderMaintained = false;
}
}
@ -127,11 +127,11 @@ public class ExtSeqWithTimeWindowIntegrationTest {
assertTrue(isOrderMaintained);
}
private static void processBuffer(List<Message> buffer, List<Message> receivedMessageList) {
private static void processBuffer(List<UserEvent> buffer, List<UserEvent> receivedUserEventList) {
Collections.sort(buffer);
buffer.forEach(message -> {
receivedMessageList.add(message);
System.out.println("Processing message with Global Sequence number: " + message.getGlobalSequenceNumber() + ", Application Identifier: " + message.getApplicationIdentifier());
buffer.forEach(userEvent -> {
receivedUserEventList.add(userEvent);
System.out.println("Process message with Event ID: " + userEvent.getUserEventId());
});
buffer.clear();
}

View File

@ -1,6 +1,6 @@
package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.payload.Message;
import com.baeldung.kafka.message.ordering.payload.UserEvent;
import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer;
import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
import org.apache.kafka.clients.admin.*;
@ -34,8 +34,8 @@ public class MultiplePartitionIntegrationTest {
private static int PARTITIONS = 5;
private static short REPLICATION_FACTOR = 1;
private static Admin admin;
private static KafkaProducer<Long, Message> producer;
private static KafkaConsumer<Long, Message> consumer;
private static KafkaProducer<Long, UserEvent> producer;
private static KafkaConsumer<Long, UserEvent> consumer;
private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofMillis(5000);
@Container
private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
@ -57,7 +57,7 @@ public class MultiplePartitionIntegrationTest {
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, Message.class);
consumerProperties.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, UserEvent.class);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
admin = Admin.create(adminProperties);
producer = new KafkaProducer<>(producerProperties);
@ -83,29 +83,29 @@ public class MultiplePartitionIntegrationTest {
@Test
void givenMultiplePartitions_whenPublishedToKafkaAndConsumed_thenCheckForMessageOrder() throws ExecutionException, InterruptedException {
List<Message> sentMessageList = new ArrayList<>();
List<Message> receivedMessageList = new ArrayList<>();
for (long partitionKey = 1; partitionKey <= 10 ; partitionKey++) {
long applicationIdentifier = Message.getRandomApplicationIdentifier();
Message message = new Message(partitionKey, applicationIdentifier);
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC, partitionKey, message));
sentMessageList.add(message);
List<UserEvent> sentUserEventList = new ArrayList<>();
List<UserEvent> receivedUserEventList = new ArrayList<>();
for (long count = 1; count <= 10 ; count++) {
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
userEvent.setEventNanoTime(System.nanoTime());
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC, count, userEvent));
sentUserEventList.add(userEvent);
RecordMetadata metadata = future.get();
System.out.println("Partition : " + metadata.partition());
}
boolean isOrderMaintained = true;
consumer.subscribe(Collections.singletonList(TOPIC));
ConsumerRecords<Long, Message> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
ConsumerRecords<Long, UserEvent> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> {
Message message = record.value();
receivedMessageList.add(message);
UserEvent userEvent = record.value();
receivedUserEventList.add(userEvent);
});
for (int insertPosition = 0; insertPosition <= receivedMessageList.size() - 1; insertPosition++) {
for (int insertPosition = 0; insertPosition <= receivedUserEventList.size() - 1; insertPosition++) {
if (isOrderMaintained){
Message sentMessage = sentMessageList.get(insertPosition);
Message receivedMessage = receivedMessageList.get(insertPosition);
if (!sentMessage.equals(receivedMessage)) {
UserEvent sentUserEvent = sentUserEventList.get(insertPosition);
UserEvent receivedUserEvent = receivedUserEventList.get(insertPosition);
if (!sentUserEvent.equals(receivedUserEvent)) {
isOrderMaintained = false;
}
}

View File

@ -1,6 +1,6 @@
package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.payload.Message;
import com.baeldung.kafka.message.ordering.payload.UserEvent;
import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer;
import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
import org.apache.kafka.clients.admin.Admin;
@ -36,8 +36,8 @@ public class SinglePartitionIntegrationTest {
private static int PARTITIONS = 1;
private static short REPLICATION_FACTOR = 1;
private static Admin admin;
private static KafkaProducer<Long, Message> producer;
private static KafkaConsumer<Long, Message> consumer;
private static KafkaProducer<Long, UserEvent> producer;
private static KafkaConsumer<Long, UserEvent> consumer;
private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofMillis(5000);
@ -61,7 +61,7 @@ public class SinglePartitionIntegrationTest {
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, Message.class);
consumerProperties.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, UserEvent.class);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
admin = Admin.create(adminProperties);
producer = new KafkaProducer<>(producerProperties);
@ -87,29 +87,29 @@ public class SinglePartitionIntegrationTest {
@Test
void givenASinglePartition_whenPublishedToKafkaAndConsumed_thenCheckForMessageOrder() throws ExecutionException, InterruptedException {
List<Message> sentMessageList = new ArrayList<>();
List<Message> receivedMessageList = new ArrayList<>();
for (long partitionKey = 1; partitionKey <= 10 ; partitionKey++) {
long applicationIdentifier = Message.getRandomApplicationIdentifier();
Message message = new Message(partitionKey, applicationIdentifier);
ProducerRecord<Long, Message> producerRecord = new ProducerRecord<>(TOPIC, partitionKey, message);
List<UserEvent> sentUserEventList = new ArrayList<>();
List<UserEvent> receivedUserEventList = new ArrayList<>();
for (long count = 1; count <= 10 ; count++) {
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
userEvent.setEventNanoTime(System.nanoTime());
ProducerRecord<Long, UserEvent> producerRecord = new ProducerRecord<>(TOPIC, userEvent);
Future<RecordMetadata> future = producer.send(producerRecord);
sentMessageList.add(message);
sentUserEventList.add(userEvent);
RecordMetadata metadata = future.get();
System.out.println("Partition : " + metadata.partition());
}
consumer.subscribe(Collections.singletonList(TOPIC));
ConsumerRecords<Long, Message> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
ConsumerRecords<Long, UserEvent> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> {
Message message = record.value();
receivedMessageList.add(message);
UserEvent userEvent = record.value();
receivedUserEventList.add(userEvent);
});
boolean result = true;
for (int count = 0; count <= 9 ; count++) {
Message sentMessage = sentMessageList.get(count);
Message receivedMessage = receivedMessageList.get(count);
if (!sentMessage.equals(receivedMessage) && result){
UserEvent sentUserEvent = sentUserEventList.get(count);
UserEvent receivedUserEvent = receivedUserEventList.get(count);
if (!sentUserEvent.equals(receivedUserEvent) && result){
result = false;
}
}