Removed insertPosition, renamed messageId to applicationIdentifier

This commit is contained in:
Amol Gote 2023-10-15 19:19:01 -04:00
parent 6d4e6886b3
commit d1d456e59a
9 changed files with 69 additions and 76 deletions

View File

@ -6,20 +6,21 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration; import java.time.Duration;
import java.util.*; import java.util.*;
public class ExtSeqWithTimeWindowConsumer { public class ExtSeqWithTimeWindowConsumer {
private static final long BUFFER_PERIOD_MS = 5000; private static final long BUFFER_PERIOD_NS = 5000L * 1000000; // 5000 milliseconds converted to nanoseconds
private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofMillis(100); private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofMillis(100);
public static void main(String[] args) { public static void main(String[] args) {
Properties props = new Properties(); Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, Message.class); props.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, Message.class);
@ -32,7 +33,7 @@ public class ExtSeqWithTimeWindowConsumer {
records.forEach(record -> { records.forEach(record -> {
buffer.add(record.value()); buffer.add(record.value());
}); });
if (System.nanoTime() - lastProcessedTime > BUFFER_PERIOD_MS) { if (System.nanoTime() - lastProcessedTime > BUFFER_PERIOD_NS) {
processBuffer(buffer); processBuffer(buffer);
lastProcessedTime = System.nanoTime(); lastProcessedTime = System.nanoTime();
} }
@ -42,7 +43,7 @@ public class ExtSeqWithTimeWindowConsumer {
private static void processBuffer(List<Message> buffer) { private static void processBuffer(List<Message> buffer) {
Collections.sort(buffer); Collections.sort(buffer);
buffer.forEach(message -> { buffer.forEach(message -> {
System.out.println("Processing message with Insert Position: " + message.getPartitionKey() + ", Message Id: " + message.getMessageId()); System.out.println("Processing message with Partition key: " + message.getPartitionKey() + ", Application Identifier: " + message.getApplicationIdentifier());
}); });
buffer.clear(); buffer.clear();
} }

View File

@ -5,7 +5,7 @@ import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.LongSerializer;
import java.util.Properties; import java.util.Properties;
@ -13,16 +13,14 @@ public class ExtSeqWithTimeWindowProducer {
public static void main(String[] args) { public static void main(String[] args) {
Properties props = new Properties(); Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName());
KafkaProducer<Long, Message> producer = new KafkaProducer<>(props);
KafkaProducer<String, Message> producer = new KafkaProducer<>(props); for (long partitionKey = 1; partitionKey <= 10 ; partitionKey++) {
for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) { long applicationIdentifier = Message.getRandomApplicationIdentifier();
long messageId = Message.getRandomMessageId(); Message message = new Message(partitionKey, applicationIdentifier);
String key = "Key-" + insertPosition; producer.send(new ProducerRecord<>("multi_partition_topic", partitionKey, message));
Message message = new Message(key, messageId); System.out.println("Partition key: " + message.getPartitionKey() + ", Application Identifier: " + message.getApplicationIdentifier());
producer.send(new ProducerRecord<>("multi_partition_topic", key, message));
System.out.println("Insert Position: " + message.getPartitionKey() + ", Message Id: " + message.getMessageId());
} }
producer.close(); producer.close();
System.out.println("ExternalSequencingProducer Completed."); System.out.println("ExternalSequencingProducer Completed.");

View File

@ -6,6 +6,7 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration; import java.time.Duration;
@ -18,7 +19,7 @@ public class MultiPartitionConsumer {
Properties props = new Properties(); Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, Message.class); props.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, Message.class);
@ -29,7 +30,7 @@ public class MultiPartitionConsumer {
records.forEach(record -> { records.forEach(record -> {
Message message = record.value(); Message message = record.value();
if (message != null) { if (message != null) {
System.out.println("Process message with Insert Position: " + message.getPartitionKey() + ", Message Id: " + message.getMessageId()); System.out.println("Process message with Partition key: " + message.getPartitionKey() + ", Application Identifier: " + message.getApplicationIdentifier());
} }
}); });
} }

View File

@ -5,7 +5,7 @@ import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.LongSerializer;
import java.util.Properties; import java.util.Properties;
@ -13,16 +13,14 @@ public class MultiPartitionProducer {
public static void main(String[] args) { public static void main(String[] args) {
Properties props = new Properties(); Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName());
KafkaProducer<Long, Message> producer = new KafkaProducer<>(props);
KafkaProducer<String, Message> producer = new KafkaProducer<>(props); for (long partitionKey = 1; partitionKey <= 10 ; partitionKey++) {
for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) { long applicationIdentifier = Message.getRandomApplicationIdentifier();
long messageId = Message.getRandomMessageId(); Message message = new Message(partitionKey, applicationIdentifier);
String key = "Key-" + insertPosition; producer.send(new ProducerRecord<>("multi_partition_topic", partitionKey, message));
Message message = new Message(key, messageId); System.out.println("Partition Key: " + message.getPartitionKey() + ", Application Identifier: " + message.getApplicationIdentifier());
producer.send(new ProducerRecord<>("multi_partition_topic", key, message));
System.out.println("Insert Position: " + message.getPartitionKey() + ", Message Id: " + message.getMessageId());
} }
producer.close(); producer.close();
System.out.println("SinglePartitionProducer Completed."); System.out.println("SinglePartitionProducer Completed.");

View File

@ -6,6 +6,7 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration; import java.time.Duration;
@ -19,17 +20,17 @@ public class SinglePartitionConsumer {
Properties props = new Properties(); Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, Message.class); props.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, Message.class);
Consumer<String, Message> consumer = new KafkaConsumer<>(props); Consumer<Long, Message> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("single_partition_topic")); consumer.subscribe(Collections.singletonList("single_partition_topic"));
while (true) { while (true) {
ConsumerRecords<String, Message> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES); ConsumerRecords<Long, Message> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> { records.forEach(record -> {
Message message = record.value(); Message message = record.value();
System.out.println("Process message with Insert Position: " + message.getPartitionKey() + ", Message Id: " + message.getMessageId()); System.out.println("Process message with Partition Key: " + message.getPartitionKey() + ", Application Identifier: " + message.getApplicationIdentifier());
}); });
} }
} }

View File

@ -5,7 +5,7 @@ import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.LongSerializer;
import java.util.Properties; import java.util.Properties;
@ -13,16 +13,14 @@ public class SinglePartitionProducer {
public static void main(String[] args) { public static void main(String[] args) {
Properties props = new Properties(); Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName());
KafkaProducer<Long, Message> producer = new KafkaProducer<>(props);
KafkaProducer<String, Message> producer = new KafkaProducer<>(props); for (long partitionKey = 1; partitionKey <= 10 ; partitionKey++) {
for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) { long applicationIdentifier = Message.getRandomApplicationIdentifier();
long messageId = Message.getRandomMessageId(); Message message = new Message(partitionKey, applicationIdentifier);
String key = "Key-" + insertPosition; producer.send(new ProducerRecord<>("single_partition_topic", partitionKey, message));
Message message = new Message(key, messageId); System.out.println("Partition key: " + message.getPartitionKey() + ", Application Identifier: " + message.getApplicationIdentifier());
producer.send(new ProducerRecord<>("single_partition_topic", key, message));
System.out.println("Insert Position: " + message.getPartitionKey() + ", Message Id: " + message.getMessageId());
} }
producer.close(); producer.close();
System.out.println("SinglePartitionProducer Completed."); System.out.println("SinglePartitionProducer Completed.");

View File

@ -5,30 +5,30 @@ import java.util.Random;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
public class Message implements Comparable<Message> { public class Message implements Comparable<Message> {
private String partitionKey; private long partitionKey;
private long messageId; private long applicationIdentifier;
public Message(){ public Message(){
} }
//Required for Kafka Serialization and Deserialization //Required for Kafka Serialization and Deserialization
public Message(String partitionKey, long messageId) { public Message(long partitionKey, long applicationIdentifier) {
this.partitionKey = partitionKey; this.partitionKey = partitionKey;
this.messageId = messageId; this.applicationIdentifier = applicationIdentifier;
} }
public String getPartitionKey() { public long getPartitionKey() {
return partitionKey; return partitionKey;
} }
public long getMessageId() { public long getApplicationIdentifier() {
return messageId; return applicationIdentifier;
} }
@Override @Override
public int compareTo(Message other) { public int compareTo(Message other) {
return Long.compare(this.messageId, other.messageId); return Long.compare(this.partitionKey, other.partitionKey);
} }
@Override @Override
@ -40,10 +40,10 @@ public class Message implements Comparable<Message> {
return false; return false;
} }
Message message = (Message) obj; Message message = (Message) obj;
return this.messageId == message.getMessageId() && Objects.equals(this.partitionKey, message.getPartitionKey()); return this.applicationIdentifier == message.getApplicationIdentifier() && Objects.equals(this.partitionKey, message.getPartitionKey());
} }
public static long getRandomMessageId() { public static long getRandomApplicationIdentifier() {
Random rand = new Random(); Random rand = new Random();
return ThreadLocalRandom.current().nextInt(1000); return ThreadLocalRandom.current().nextInt(1000);
} }

View File

@ -3,7 +3,6 @@ package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.payload.Message; import com.baeldung.kafka.message.ordering.payload.Message;
import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer; import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer;
import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer; import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
import lombok.var;
import org.apache.kafka.clients.admin.*; import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
@ -13,9 +12,8 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -36,8 +34,8 @@ public class MultiplePartitionTest {
private static int PARTITIONS = 5; private static int PARTITIONS = 5;
private static short REPLICATION_FACTOR = 1; private static short REPLICATION_FACTOR = 1;
private static Admin admin; private static Admin admin;
private static KafkaProducer<String, Message> producer; private static KafkaProducer<Long, Message> producer;
private static KafkaConsumer<String, Message> consumer; private static KafkaConsumer<Long, Message> consumer;
private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofMillis(5000); private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofMillis(5000);
@Container @Container
private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
@ -51,12 +49,12 @@ public class MultiplePartitionTest {
Properties producerProperties = new Properties(); Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName()); producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName());
Properties consumerProperties = new Properties(); Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, Message.class); consumerProperties.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, Message.class);
@ -87,11 +85,10 @@ public class MultiplePartitionTest {
void givenMultiplePartitions_whenPublishedToKafkaAndConsumed_thenCheckForMessageOrder() throws ExecutionException, InterruptedException { void givenMultiplePartitions_whenPublishedToKafkaAndConsumed_thenCheckForMessageOrder() throws ExecutionException, InterruptedException {
List<Message> sentMessageList = new ArrayList<>(); List<Message> sentMessageList = new ArrayList<>();
List<Message> receivedMessageList = new ArrayList<>(); List<Message> receivedMessageList = new ArrayList<>();
for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) { for (long partitionKey = 1; partitionKey <= 10 ; partitionKey++) {
long messageId = Message.getRandomMessageId(); long applicationIdentifier = Message.getRandomApplicationIdentifier();
String key = "Key-" + insertPosition; Message message = new Message(partitionKey, applicationIdentifier);
Message message = new Message(key, messageId); Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC, partitionKey, message));
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC, key, message));
sentMessageList.add(message); sentMessageList.add(message);
RecordMetadata metadata = future.get(); RecordMetadata metadata = future.get();
System.out.println("Partition : " + metadata.partition()); System.out.println("Partition : " + metadata.partition());
@ -99,7 +96,7 @@ public class MultiplePartitionTest {
boolean isOrderMaintained = true; boolean isOrderMaintained = true;
consumer.subscribe(Collections.singletonList(TOPIC)); consumer.subscribe(Collections.singletonList(TOPIC));
ConsumerRecords<String, Message> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES); ConsumerRecords<Long, Message> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> { records.forEach(record -> {
Message message = record.value(); Message message = record.value();
receivedMessageList.add(message); receivedMessageList.add(message);

View File

@ -15,8 +15,8 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.LongSerializer;
import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -36,8 +36,8 @@ public class SinglePartitionTest {
private static int PARTITIONS = 1; private static int PARTITIONS = 1;
private static short REPLICATION_FACTOR = 1; private static short REPLICATION_FACTOR = 1;
private static Admin admin; private static Admin admin;
private static KafkaProducer<String, Message> producer; private static KafkaProducer<Long, Message> producer;
private static KafkaConsumer<String, Message> consumer; private static KafkaConsumer<Long, Message> consumer;
private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofMillis(5000); private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofMillis(5000);
@ -53,12 +53,12 @@ public class SinglePartitionTest {
Properties producerProperties = new Properties(); Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName()); producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName());
Properties consumerProperties = new Properties(); Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, Message.class); consumerProperties.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, Message.class);
@ -89,11 +89,10 @@ public class SinglePartitionTest {
void givenASinglePartition_whenPublishedToKafkaAndConsumed_thenCheckForMessageOrder() throws ExecutionException, InterruptedException { void givenASinglePartition_whenPublishedToKafkaAndConsumed_thenCheckForMessageOrder() throws ExecutionException, InterruptedException {
List<Message> sentMessageList = new ArrayList<>(); List<Message> sentMessageList = new ArrayList<>();
List<Message> receivedMessageList = new ArrayList<>(); List<Message> receivedMessageList = new ArrayList<>();
for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) { for (long partitionKey = 1; partitionKey <= 10 ; partitionKey++) {
long messageId = Message.getRandomMessageId(); long applicationIdentifier = Message.getRandomApplicationIdentifier();
String key = "Key-" + insertPosition; Message message = new Message(partitionKey, applicationIdentifier);
Message message = new Message(key, messageId); ProducerRecord<Long, Message> producerRecord = new ProducerRecord<>(TOPIC, partitionKey, message);
ProducerRecord<String, Message> producerRecord = new ProducerRecord<>(TOPIC, key, message);
Future<RecordMetadata> future = producer.send(producerRecord); Future<RecordMetadata> future = producer.send(producerRecord);
sentMessageList.add(message); sentMessageList.add(message);
RecordMetadata metadata = future.get(); RecordMetadata metadata = future.get();
@ -101,7 +100,7 @@ public class SinglePartitionTest {
} }
consumer.subscribe(Collections.singletonList(TOPIC)); consumer.subscribe(Collections.singletonList(TOPIC));
ConsumerRecords<String, Message> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES); ConsumerRecords<Long, Message> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> { records.forEach(record -> {
Message message = record.value(); Message message = record.value();
receivedMessageList.add(message); receivedMessageList.add(message);