From feca50daca29d74b161e7ccb6fff6bde710f01e6 Mon Sep 17 00:00:00 2001 From: Amol Gote Date: Tue, 31 Oct 2023 20:17:39 -0400 Subject: [PATCH] Incorporated review comments --- .../baeldung/kafka/message/ordering/Config.java | 3 +++ .../ordering/ExtSeqWithTimeWindowConsumer.java | 6 +++--- .../ordering/ExtSeqWithTimeWindowProducer.java | 12 ++++++++---- .../message/ordering/MultiPartitionConsumer.java | 9 +++++---- .../message/ordering/MultiPartitionProducer.java | 14 +++++++++----- .../message/ordering/ProducerConfigurations.java | 2 +- .../message/ordering/SinglePartitionConsumer.java | 6 +++--- .../message/ordering/SinglePartitionProducer.java | 14 +++++++++----- 8 files changed, 41 insertions(+), 25 deletions(-) diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/Config.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/Config.java index 2635e72431..12acfecf51 100644 --- a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/Config.java +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/Config.java @@ -2,4 +2,7 @@ package com.baeldung.kafka.message.ordering; public class Config { public static final String CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS = "value.deserializer.serializedClass"; + public static final String KAFKA_LOCAL = "localhost:9092"; + public static final String MULTI_PARTITION_TOPIC = "multi_partition_topic"; + public static final String SINGLE_PARTITION_TOPIC = "single_partition_topic"; } diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowConsumer.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowConsumer.java index 639a980462..06cb7104b7 100644 --- a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowConsumer.java +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowConsumer.java @@ -17,14 +17,14 @@ public class ExtSeqWithTimeWindowConsumer { public static void main(String[] args) { Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.KAFKA_LOCAL); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, UserEvent.class); Consumer consumer = new KafkaConsumer<>(props); - consumer.subscribe(Collections.singletonList("multi_partition_topic")); + consumer.subscribe(Collections.singletonList(Config.MULTI_PARTITION_TOPIC)); List buffer = new ArrayList<>(); long lastProcessedTime = System.nanoTime(); while (true) { @@ -42,7 +42,7 @@ public class ExtSeqWithTimeWindowConsumer { private static void processBuffer(List buffer) { Collections.sort(buffer); buffer.forEach(userEvent -> { - System.out.println("Processing message with Global Sequence number: " + userEvent.getGlobalSequenceNumber() + ", event nano time : " + userEvent.getEventNanoTime()); + System.out.println("Processing message with Global Sequence number: " + userEvent.getGlobalSequenceNumber() + ", User Event Id: " + userEvent.getUserEventId()); }); buffer.clear(); } diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowProducer.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowProducer.java index c18e35b351..73a62c0bf2 100644 --- a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowProducer.java +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowProducer.java @@ -5,15 +5,18 @@ import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.LongSerializer; import java.util.Properties; import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; public class ExtSeqWithTimeWindowProducer { - public static void main(String[] args) { + public static void main(String[] args) throws ExecutionException, InterruptedException { Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.KAFKA_LOCAL); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName()); KafkaProducer producer = new KafkaProducer<>(props); @@ -21,8 +24,9 @@ public class ExtSeqWithTimeWindowProducer { UserEvent userEvent = new UserEvent(UUID.randomUUID().toString()); userEvent.setEventNanoTime(System.nanoTime()); userEvent.setGlobalSequenceNumber(sequenceNumber); - producer.send(new ProducerRecord<>("multi_partition_topic", sequenceNumber, userEvent)); - System.out.println("User Event Nano time : " + userEvent.getEventNanoTime() + ", User Event Id: " + userEvent.getUserEventId()); + Future future = producer.send(new ProducerRecord<>(Config.MULTI_PARTITION_TOPIC, sequenceNumber, userEvent)); + RecordMetadata metadata = future.get(); + System.out.println("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition()); } producer.close(); System.out.println("ExternalSequencingProducer Completed."); diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/MultiPartitionConsumer.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/MultiPartitionConsumer.java index e738832425..82f05cc80e 100644 --- a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/MultiPartitionConsumer.java +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/MultiPartitionConsumer.java @@ -14,22 +14,23 @@ import java.util.Properties; public class MultiPartitionConsumer { private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofMillis(100); + public static void main(String[] args) { Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.KAFKA_LOCAL); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, UserEvent.class); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); Consumer consumer = new KafkaConsumer<>(props); - consumer.subscribe(Collections.singletonList("multi_partition_topic")); + consumer.subscribe(Collections.singletonList(Config.MULTI_PARTITION_TOPIC)); while (true) { ConsumerRecords records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES); records.forEach(record -> { UserEvent userEvent = record.value(); if (userEvent != null) { - System.out.println("Process message with event nano time : " + userEvent.getEventNanoTime() + ", Event ID: " + userEvent.getUserEventId()); + System.out.println("User Event ID: " + userEvent.getUserEventId()); } }); } diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/MultiPartitionProducer.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/MultiPartitionProducer.java index db02c87bbe..52da49ab80 100644 --- a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/MultiPartitionProducer.java +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/MultiPartitionProducer.java @@ -5,25 +5,29 @@ import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.LongSerializer; import java.util.Properties; import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; public class MultiPartitionProducer { - public static void main(String[] args) { + public static void main(String[] args) throws ExecutionException, InterruptedException { Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.KAFKA_LOCAL); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName()); KafkaProducer producer = new KafkaProducer<>(props); for (long count = 1; count <= 10 ; count++) { UserEvent userEvent = new UserEvent(UUID.randomUUID().toString()); userEvent.setEventNanoTime(System.nanoTime()); - producer.send(new ProducerRecord<>("multi_partition_topic", count, userEvent)); - System.out.println("Process message with Event ID: " + userEvent.getUserEventId()); + Future future = producer.send(new ProducerRecord<>(Config.MULTI_PARTITION_TOPIC, count, userEvent)); + RecordMetadata metadata = future.get(); + System.out.println("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition()); } producer.close(); - System.out.println("SinglePartitionProducer Completed."); + System.out.println("MultiPartitionProducer Completed."); } } diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ProducerConfigurations.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ProducerConfigurations.java index 0eb563910e..61c9cb48aa 100644 --- a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ProducerConfigurations.java +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ProducerConfigurations.java @@ -10,7 +10,7 @@ import java.util.Properties; public class ProducerConfigurations { public static void main(String[] args) { Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.KAFKA_LOCAL); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/SinglePartitionConsumer.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/SinglePartitionConsumer.java index 5f5ce86924..1c50f3cf7a 100644 --- a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/SinglePartitionConsumer.java +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/SinglePartitionConsumer.java @@ -17,19 +17,19 @@ public class SinglePartitionConsumer { public static void main(String[] args) { Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.KAFKA_LOCAL); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, UserEvent.class); Consumer consumer = new KafkaConsumer<>(props); - consumer.subscribe(Collections.singletonList("single_partition_topic")); + consumer.subscribe(Collections.singletonList(Config.SINGLE_PARTITION_TOPIC)); while (true) { ConsumerRecords records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES); records.forEach(record -> { UserEvent userEvent = record.value(); - System.out.println("Process message with event nano time : " + userEvent.getEventNanoTime() + ", Event ID: " + userEvent.getUserEventId()); + System.out.println("User Event ID: " + userEvent.getUserEventId()); }); } } diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/SinglePartitionProducer.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/SinglePartitionProducer.java index 2a7719e34f..9306abaebf 100644 --- a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/SinglePartitionProducer.java +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/SinglePartitionProducer.java @@ -5,24 +5,28 @@ import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.LongSerializer; import java.time.Instant; import java.util.Properties; import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; public class SinglePartitionProducer { - public static void main(String[] args) { + public static void main(String[] args) throws ExecutionException, InterruptedException { Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.KAFKA_LOCAL); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName()); KafkaProducer producer = new KafkaProducer<>(props); - for (long count = 1; count <= 10 ; count++) { + for (long count = 1; count <= 10; count++) { UserEvent userEvent = new UserEvent(UUID.randomUUID().toString()); userEvent.setEventNanoTime(System.nanoTime()); - producer.send(new ProducerRecord<>("single_partition_topic", count, userEvent)); - System.out.println("Process message with Event ID: " + userEvent.getUserEventId()); + Future future = producer.send(new ProducerRecord<>(Config.SINGLE_PARTITION_TOPIC, count, userEvent)); + RecordMetadata metadata = future.get(); + System.out.println("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition()); } producer.close(); System.out.println("SinglePartitionProducer Completed.");