Incorporated Review comments

This commit is contained in:
Amol Gote 2023-11-02 19:40:36 -04:00
parent 88f85963bb
commit b7d743c629
12 changed files with 25 additions and 319 deletions

View File

@ -2,7 +2,10 @@ package com.baeldung.kafka.message.ordering;
public class Config {
public static final String CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS = "value.deserializer.serializedClass";
public static final String KAFKA_LOCAL = "localhost:9092";
public static final String MULTI_PARTITION_TOPIC = "multi_partition_topic";
public static final String SINGLE_PARTITION_TOPIC = "single_partition_topic";
public static final int MULTIPLE_PARTITIONS = 5;
public static final int SINGLE_PARTITION = 1;
public static short REPLICATION_FACTOR = 1;
}

View File

@ -1,36 +0,0 @@
package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class ConsumerConfigurations {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.KAFKA_LOCAL);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(Config.MULTI_PARTITION_TOPIC));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.println("Partition: " + record.partition() + ", Message: " + record.value());
});
}
}
}

View File

@ -1,49 +0,0 @@
package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.payload.UserEvent;
import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import java.time.Duration;
import java.util.*;
public class ExtSeqWithTimeWindowConsumer {
private static final long BUFFER_PERIOD_NS = 5000L * 1000000; // 5000 milliseconds converted to nanoseconds
private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofMillis(100);
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.KAFKA_LOCAL);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, UserEvent.class);
Consumer<Long, UserEvent> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(Config.MULTI_PARTITION_TOPIC));
List<UserEvent> buffer = new ArrayList<>();
long lastProcessedTime = System.nanoTime();
while (true) {
ConsumerRecords<Long, UserEvent> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> {
buffer.add(record.value());
});
if (System.nanoTime() - lastProcessedTime > BUFFER_PERIOD_NS) {
processBuffer(buffer);
lastProcessedTime = System.nanoTime();
}
}
}
private static void processBuffer(List<UserEvent> buffer) {
Collections.sort(buffer);
buffer.forEach(userEvent -> {
System.out.println("Processing message with Global Sequence number: " + userEvent.getGlobalSequenceNumber() + ", User Event Id: " + userEvent.getUserEventId());
});
buffer.clear();
}
}

View File

@ -1,34 +0,0 @@
package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.payload.UserEvent;
import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongSerializer;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class ExtSeqWithTimeWindowProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.KAFKA_LOCAL);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName());
KafkaProducer<Long, UserEvent> producer = new KafkaProducer<>(props);
for (long sequenceNumber = 1; sequenceNumber <= 10 ; sequenceNumber++) {
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
userEvent.setEventNanoTime(System.nanoTime());
userEvent.setGlobalSequenceNumber(sequenceNumber);
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Config.MULTI_PARTITION_TOPIC, sequenceNumber, userEvent));
RecordMetadata metadata = future.get();
System.out.println("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition());
}
producer.close();
System.out.println("ExternalSequencingProducer Completed.");
}
}

View File

@ -1,39 +0,0 @@
package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.payload.UserEvent;
import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class MultiPartitionConsumer {
private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofMillis(100);
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.KAFKA_LOCAL);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, UserEvent.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
Consumer<String, UserEvent> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(Config.MULTI_PARTITION_TOPIC));
while (true) {
ConsumerRecords<String, UserEvent> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> {
UserEvent userEvent = record.value();
if (userEvent != null) {
System.out.println("User Event ID: " + userEvent.getUserEventId());
}
});
}
}
}

View File

@ -1,33 +0,0 @@
package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.payload.UserEvent;
import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongSerializer;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class MultiPartitionProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.KAFKA_LOCAL);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName());
KafkaProducer<Long, UserEvent> producer = new KafkaProducer<>(props);
for (long count = 1; count <= 10 ; count++) {
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
userEvent.setEventNanoTime(System.nanoTime());
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Config.MULTI_PARTITION_TOPIC, count, userEvent));
RecordMetadata metadata = future.get();
System.out.println("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition());
}
producer.close();
System.out.println("MultiPartitionProducer Completed.");
}
}

View File

@ -1,30 +0,0 @@
package com.baeldung.kafka.message.ordering;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class ProducerConfigurations {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.KAFKA_LOCAL);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
props.put(ProducerConfig.LINGER_MS_CONFIG, "5");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
String key = "Key-" + (i % 5); // Assuming 5 partitions
producer.send(new ProducerRecord<>(Config.MULTI_PARTITION_TOPIC, key, "Message-" + i));
}
producer.close();
System.out.println("Producer Configurations Completed.");
}
}

View File

@ -1,37 +0,0 @@
package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.payload.UserEvent;
import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class SinglePartitionConsumer {
private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofMillis(100);
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.KAFKA_LOCAL);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, UserEvent.class);
Consumer<Long, UserEvent> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(Config.SINGLE_PARTITION_TOPIC));
while (true) {
ConsumerRecords<Long, UserEvent> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> {
UserEvent userEvent = record.value();
System.out.println("User Event ID: " + userEvent.getUserEventId());
});
}
}
}

View File

@ -1,35 +0,0 @@
package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.payload.UserEvent;
import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongSerializer;
import java.time.Instant;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class SinglePartitionProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.KAFKA_LOCAL);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName());
KafkaProducer<Long, UserEvent> producer = new KafkaProducer<>(props);
for (long count = 1; count <= 10; count++) {
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
userEvent.setEventNanoTime(System.nanoTime());
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Config.SINGLE_PARTITION_TOPIC, count, userEvent));
RecordMetadata metadata = future.get();
System.out.println("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition());
}
producer.close();
System.out.println("SinglePartitionProducer Completed.");
}
}

View File

@ -30,9 +30,7 @@ import static org.junit.jupiter.api.Assertions.*;
@Testcontainers
public class ExtSeqWithTimeWindowIntegrationTest {
private static String TOPIC = "multi_partition_topic";
private static int PARTITIONS = 5;
private static short REPLICATION_FACTOR = 1;
private static Admin admin;
private static KafkaProducer<Long, UserEvent> producer;
private static KafkaConsumer<Long, UserEvent> consumer;
@ -65,10 +63,10 @@ public class ExtSeqWithTimeWindowIntegrationTest {
producer = new KafkaProducer<>(producerProperties);
consumer = new KafkaConsumer<>(consumerProperties);
List<NewTopic> topicList = new ArrayList<>();
NewTopic newTopic = new NewTopic(TOPIC, PARTITIONS, REPLICATION_FACTOR);
NewTopic newTopic = new NewTopic(Config.MULTI_PARTITION_TOPIC, Config.MULTIPLE_PARTITIONS, Config.REPLICATION_FACTOR);
topicList.add(newTopic);
CreateTopicsResult result = admin.createTopics(topicList);
KafkaFuture<Void> future = result.values().get(TOPIC);
KafkaFuture<Void> future = result.values().get(Config.MULTI_PARTITION_TOPIC);
future.whenComplete((voidResult, exception) -> {
if (exception != null) {
System.err.println("Error creating the topic: " + exception.getMessage());
@ -91,14 +89,14 @@ public class ExtSeqWithTimeWindowIntegrationTest {
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
userEvent.setEventNanoTime(System.nanoTime());
userEvent.setGlobalSequenceNumber(sequenceNumber);
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC, sequenceNumber, userEvent));
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Config.MULTI_PARTITION_TOPIC, sequenceNumber, userEvent));
sentUserEventList.add(userEvent);
RecordMetadata metadata = future.get();
System.out.println("Partition : " + metadata.partition());
System.out.println("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition());
}
boolean isOrderMaintained = true;
consumer.subscribe(Collections.singletonList(TOPIC));
consumer.subscribe(Collections.singletonList(Config.MULTI_PARTITION_TOPIC));
List<UserEvent> buffer = new ArrayList<>();
long lastProcessedTime = System.nanoTime();
ConsumerRecords<Long, UserEvent> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
@ -131,7 +129,7 @@ public class ExtSeqWithTimeWindowIntegrationTest {
Collections.sort(buffer);
buffer.forEach(userEvent -> {
receivedUserEventList.add(userEvent);
System.out.println("Process message with Event ID: " + userEvent.getUserEventId());
System.out.println("Processing message with Global Sequence number: " + userEvent.getGlobalSequenceNumber() + ", User Event Id: " + userEvent.getUserEventId());
});
buffer.clear();
}

View File

@ -30,9 +30,7 @@ import static org.junit.jupiter.api.Assertions.*;
@Testcontainers
public class MultiplePartitionIntegrationTest {
private static String TOPIC = "multi_partition_topic";
private static int PARTITIONS = 5;
private static short REPLICATION_FACTOR = 1;
private static Admin admin;
private static KafkaProducer<Long, UserEvent> producer;
private static KafkaConsumer<Long, UserEvent> consumer;
@ -63,10 +61,10 @@ public class MultiplePartitionIntegrationTest {
producer = new KafkaProducer<>(producerProperties);
consumer = new KafkaConsumer<>(consumerProperties);
List<NewTopic> topicList = new ArrayList<>();
NewTopic newTopic = new NewTopic(TOPIC, PARTITIONS, REPLICATION_FACTOR);
NewTopic newTopic = new NewTopic(Config.MULTI_PARTITION_TOPIC, Config.MULTIPLE_PARTITIONS, Config.REPLICATION_FACTOR);
topicList.add(newTopic);
CreateTopicsResult result = admin.createTopics(topicList);
KafkaFuture<Void> future = result.values().get(TOPIC);
KafkaFuture<Void> future = result.values().get(Config.MULTI_PARTITION_TOPIC);
future.whenComplete((voidResult, exception) -> {
if (exception != null) {
System.err.println("Error creating the topic: " + exception.getMessage());
@ -88,18 +86,19 @@ public class MultiplePartitionIntegrationTest {
for (long count = 1; count <= 10 ; count++) {
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
userEvent.setEventNanoTime(System.nanoTime());
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC, count, userEvent));
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Config.MULTI_PARTITION_TOPIC, count, userEvent));
sentUserEventList.add(userEvent);
RecordMetadata metadata = future.get();
System.out.println("Partition : " + metadata.partition());
System.out.println("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition());
}
boolean isOrderMaintained = true;
consumer.subscribe(Collections.singletonList(TOPIC));
consumer.subscribe(Collections.singletonList(Config.MULTI_PARTITION_TOPIC));
ConsumerRecords<Long, UserEvent> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> {
UserEvent userEvent = record.value();
receivedUserEventList.add(userEvent);
System.out.println("User Event ID: " + userEvent.getUserEventId());
});
for (int insertPosition = 0; insertPosition <= receivedUserEventList.size() - 1; insertPosition++) {
if (isOrderMaintained){

View File

@ -32,9 +32,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Testcontainers
public class SinglePartitionIntegrationTest {
private static String TOPIC = "single_partition_topic";
private static int PARTITIONS = 1;
private static short REPLICATION_FACTOR = 1;
private static Admin admin;
private static KafkaProducer<Long, UserEvent> producer;
private static KafkaConsumer<Long, UserEvent> consumer;
@ -67,10 +65,10 @@ public class SinglePartitionIntegrationTest {
producer = new KafkaProducer<>(producerProperties);
consumer = new KafkaConsumer<>(consumerProperties);
List<NewTopic> topicList = new ArrayList<>();
NewTopic newTopic = new NewTopic(TOPIC, PARTITIONS, REPLICATION_FACTOR);
NewTopic newTopic = new NewTopic(Config.SINGLE_PARTITION_TOPIC, Config.SINGLE_PARTITION, Config.REPLICATION_FACTOR);
topicList.add(newTopic);
CreateTopicsResult result = admin.createTopics(topicList);
KafkaFuture<Void> future = result.values().get(TOPIC);
KafkaFuture<Void> future = result.values().get(Config.SINGLE_PARTITION_TOPIC);
future.whenComplete((voidResult, exception) -> {
if (exception != null) {
System.err.println("Error creating the topic: " + exception.getMessage());
@ -92,18 +90,19 @@ public class SinglePartitionIntegrationTest {
for (long count = 1; count <= 10 ; count++) {
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
userEvent.setEventNanoTime(System.nanoTime());
ProducerRecord<Long, UserEvent> producerRecord = new ProducerRecord<>(TOPIC, userEvent);
ProducerRecord<Long, UserEvent> producerRecord = new ProducerRecord<>(Config.SINGLE_PARTITION_TOPIC, userEvent);
Future<RecordMetadata> future = producer.send(producerRecord);
sentUserEventList.add(userEvent);
RecordMetadata metadata = future.get();
System.out.println("Partition : " + metadata.partition());
System.out.println("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition());
}
consumer.subscribe(Collections.singletonList(TOPIC));
consumer.subscribe(Collections.singletonList(Config.SINGLE_PARTITION_TOPIC));
ConsumerRecords<Long, UserEvent> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> {
UserEvent userEvent = record.value();
receivedUserEventList.add(userEvent);
System.out.println("User Event ID: " + userEvent.getUserEventId());
});
boolean result = true;
for (int count = 0; count <= 9 ; count++) {