Incorporated Review Comments

This commit is contained in:
Amol Gote 2023-10-15 16:36:33 -04:00
parent 29096686cc
commit b94d5a1f8a
9 changed files with 50 additions and 27 deletions

View File

@ -1,9 +1,13 @@
package com.baeldung.kafka.message.ordering; package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration; import java.time.Duration;
import java.util.Collections; import java.util.Collections;
@ -14,8 +18,8 @@ public class ConsumerConfigurations {
Properties props = new Properties(); Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500"); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1"); props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500"); props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");

View File

@ -1,10 +1,12 @@
package com.baeldung.kafka.message.ordering; package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.payload.Message; import com.baeldung.kafka.message.ordering.payload.Message;
import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer;
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration; import java.time.Duration;
import java.util.*; import java.util.*;
@ -17,10 +19,10 @@ public class ExtSeqWithTimeWindowConsumer {
Properties props = new Properties(); Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("value.deserializer.serializedClass", Message.class); props.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, Message.class);
Consumer<String, Message> consumer = new KafkaConsumer<>(props); Consumer<String, Message> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("multi_partition_topic")); consumer.subscribe(Collections.singletonList("multi_partition_topic"));
List<Message> buffer = new ArrayList<>(); List<Message> buffer = new ArrayList<>();
@ -28,9 +30,7 @@ public class ExtSeqWithTimeWindowConsumer {
while (true) { while (true) {
ConsumerRecords<String, Message> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES); ConsumerRecords<String, Message> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> { records.forEach(record -> {
if (record != null && record.value() != null) { buffer.add(record.value());
buffer.add(record.value());
}
}); });
if (System.nanoTime() - lastProcessedTime > BUFFER_PERIOD_MS) { if (System.nanoTime() - lastProcessedTime > BUFFER_PERIOD_MS) {
processBuffer(buffer); processBuffer(buffer);

View File

@ -1,8 +1,11 @@
package com.baeldung.kafka.message.ordering; package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.payload.Message; import com.baeldung.kafka.message.ordering.payload.Message;
import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties; import java.util.Properties;
import java.util.Random; import java.util.Random;
@ -11,9 +14,9 @@ import java.util.concurrent.atomic.AtomicLong;
public class ExtSeqWithTimeWindowProducer { public class ExtSeqWithTimeWindowProducer {
public static void main(String[] args) { public static void main(String[] args) {
Properties props = new Properties(); Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.message.ordering.serialization.JacksonSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName());
KafkaProducer<String, Message> producer = new KafkaProducer<>(props); KafkaProducer<String, Message> producer = new KafkaProducer<>(props);
for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) { for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) {

View File

@ -1,10 +1,12 @@
package com.baeldung.kafka.message.ordering; package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.payload.Message; import com.baeldung.kafka.message.ordering.payload.Message;
import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer;
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration; import java.time.Duration;
import java.util.Collections; import java.util.Collections;
@ -16,17 +18,19 @@ public class MultiPartitionConsumer {
Properties props = new Properties(); Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("value.deserializer.serializedClass", Message.class); props.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, Message.class);
Consumer<String, Message> consumer = new KafkaConsumer<>(props); Consumer<String, Message> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("multi_partition_topic")); consumer.subscribe(Collections.singletonList("multi_partition_topic"));
while (true) { while (true) {
ConsumerRecords<String, Message> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES); ConsumerRecords<String, Message> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> { records.forEach(record -> {
Message message = record.value(); Message message = record.value();
System.out.println("Process message with Insert Position: " + message.getInsertPosition() + ", Message Id: " + message.getMessageId()); if (message != null) {
System.out.println("Process message with Insert Position: " + message.getInsertPosition() + ", Message Id: " + message.getMessageId());
}
}); });
} }
} }

View File

@ -1,8 +1,11 @@
package com.baeldung.kafka.message.ordering; package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.payload.Message; import com.baeldung.kafka.message.ordering.payload.Message;
import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties; import java.util.Properties;
@ -10,8 +13,8 @@ public class MultiPartitionProducer {
public static void main(String[] args) { public static void main(String[] args) {
Properties props = new Properties(); Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.message.ordering.serialization.JacksonSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName());
KafkaProducer<String, Message> producer = new KafkaProducer<>(props); KafkaProducer<String, Message> producer = new KafkaProducer<>(props);
for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) { for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) {

View File

@ -3,6 +3,7 @@ package com.baeldung.kafka.message.ordering;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties; import java.util.Properties;
@ -10,8 +11,8 @@ public class ProducerConfigurations {
public static void main(String[] args) { public static void main(String[] args) {
Properties props = new Properties(); Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384"); props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
props.put(ProducerConfig.LINGER_MS_CONFIG, "5"); props.put(ProducerConfig.LINGER_MS_CONFIG, "5");

View File

@ -1,10 +1,13 @@
package com.baeldung.kafka.message.ordering; package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.payload.Message; import com.baeldung.kafka.message.ordering.payload.Message;
import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer;
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration; import java.time.Duration;
import java.util.Collections; import java.util.Collections;
@ -17,10 +20,10 @@ public class SinglePartitionConsumer {
Properties props = new Properties(); Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("value.deserializer.serializedClass", Message.class); props.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, Message.class);
Consumer<String, Message> consumer = new KafkaConsumer<>(props); Consumer<String, Message> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("single_partition_topic")); consumer.subscribe(Collections.singletonList("single_partition_topic"));
while (true) { while (true) {

View File

@ -1,8 +1,11 @@
package com.baeldung.kafka.message.ordering; package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.payload.Message; import com.baeldung.kafka.message.ordering.payload.Message;
import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties; import java.util.Properties;
import java.util.Random; import java.util.Random;
@ -10,9 +13,9 @@ import java.util.Random;
public class SinglePartitionProducer { public class SinglePartitionProducer {
public static void main(String[] args) { public static void main(String[] args) {
Properties props = new Properties(); Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.message.ordering.serialization.JacksonSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName());
KafkaProducer<String, Message> producer = new KafkaProducer<>(props); KafkaProducer<String, Message> producer = new KafkaProducer<>(props);
for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) { for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) {

View File

@ -1,4 +1,5 @@
package com.baeldung.kafka.message.ordering.serialization; package com.baeldung.kafka.message.ordering.serialization;
import com.baeldung.kafka.message.ordering.Config;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Deserializer;
@ -22,7 +23,7 @@ public class JacksonDeserializer<T> implements Deserializer<T> {
@Override @Override
public void configure(Map<String, ?> configs, boolean isKey) { public void configure(Map<String, ?> configs, boolean isKey) {
this.type = (Class<T>) configs.get("value.deserializer.serializedClass"); this.type = (Class<T>) configs.get(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS);
} }
@Override @Override
@ -33,8 +34,9 @@ public class JacksonDeserializer<T> implements Deserializer<T> {
try { try {
return objectMapper.readValue(bytes, type); return objectMapper.readValue(bytes, type);
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException("Error deserializing value", e); //throw new RuntimeException("Error deserializing value", e);
} }
return null;
} }
} }