Incorporated review comments

Amol Gote 2023-10-31 20:17:39 -04:00
parent 5b936c47a0
commit feca50daca
8 changed files with 41 additions and 25 deletions

Config.java

@@ -2,4 +2,7 @@ package com.baeldung.kafka.message.ordering;
 public class Config {
     public static final String CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS = "value.deserializer.serializedClass";
+    public static final String KAFKA_LOCAL = "localhost:9092";
+    public static final String MULTI_PARTITION_TOPIC = "multi_partition_topic";
+    public static final String SINGLE_PARTITION_TOPIC = "single_partition_topic";
 }
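
The producers and consumers in this change assume that the topics named by MULTI_PARTITION_TOPIC and SINGLE_PARTITION_TOPIC already exist on the broker at KAFKA_LOCAL. Below is a minimal setup sketch using Kafka's AdminClient API; the helper class name and the partition count for the multi-partition topic are assumptions for illustration, not part of this commit.

// Hypothetical helper, not part of this commit: creates the topics the classes
// in this change expect. The partition count for the multi-partition topic is assumed.
package com.baeldung.kafka.message.ordering;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class TopicSetup {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, Config.KAFKA_LOCAL);
        try (Admin admin = Admin.create(adminProps)) {
            admin.createTopics(List.of(
                new NewTopic(Config.MULTI_PARTITION_TOPIC, 5, (short) 1),   // 5 partitions: assumed
                new NewTopic(Config.SINGLE_PARTITION_TOPIC, 1, (short) 1)   // single partition by design
            )).all().get();
        }
    }
}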

ExtSeqWithTimeWindowConsumer.java

@@ -17,14 +17,14 @@ public class ExtSeqWithTimeWindowConsumer {
     public static void main(String[] args) {
         Properties props = new Properties();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.KAFKA_LOCAL);
         props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName());
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, UserEvent.class);
         Consumer<Long, UserEvent> consumer = new KafkaConsumer<>(props);
-        consumer.subscribe(Collections.singletonList("multi_partition_topic"));
+        consumer.subscribe(Collections.singletonList(Config.MULTI_PARTITION_TOPIC));
         List<UserEvent> buffer = new ArrayList<>();
         long lastProcessedTime = System.nanoTime();
         while (true) {
@@ -42,7 +42,7 @@ public class ExtSeqWithTimeWindowConsumer {
     private static void processBuffer(List<UserEvent> buffer) {
         Collections.sort(buffer);
         buffer.forEach(userEvent -> {
-            System.out.println("Processing message with Global Sequence number: " + userEvent.getGlobalSequenceNumber() + ", event nano time : " + userEvent.getEventNanoTime());
+            System.out.println("Processing message with Global Sequence number: " + userEvent.getGlobalSequenceNumber() + ", User Event Id: " + userEvent.getUserEventId());
        });
         buffer.clear();
     }
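
processBuffer() relies on Collections.sort(buffer), so UserEvent must implement Comparable. The payload class itself is not part of this diff; the sketch below is a hedged reconstruction built only from the getters and setters the code above calls. The package name and the choice of globalSequenceNumber as the ordering key are assumptions.

// Hedged reconstruction of the UserEvent payload class; not taken from this commit.
package com.baeldung.kafka.message.ordering.payload;   // package name assumed

public class UserEvent implements Comparable<UserEvent> {

    private String userEventId;
    private long eventNanoTime;
    private long globalSequenceNumber;

    // No-arg constructor and setters so a Jackson-based deserializer can bind the fields.
    public UserEvent() {
    }

    public UserEvent(String userEventId) {
        this.userEventId = userEventId;
    }

    public String getUserEventId() { return userEventId; }
    public void setUserEventId(String userEventId) { this.userEventId = userEventId; }

    public long getEventNanoTime() { return eventNanoTime; }
    public void setEventNanoTime(long eventNanoTime) { this.eventNanoTime = eventNanoTime; }

    public long getGlobalSequenceNumber() { return globalSequenceNumber; }
    public void setGlobalSequenceNumber(long globalSequenceNumber) { this.globalSequenceNumber = globalSequenceNumber; }

    @Override
    public int compareTo(UserEvent other) {
        // Assumed ordering key: the externally assigned global sequence number.
        return Long.compare(this.globalSequenceNumber, other.globalSequenceNumber);
    }
}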

ExtSeqWithTimeWindowProducer.java

@@ -5,15 +5,18 @@ import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.LongSerializer;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 public class ExtSeqWithTimeWindowProducer {
-    public static void main(String[] args) {
+    public static void main(String[] args) throws ExecutionException, InterruptedException {
         Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.KAFKA_LOCAL);
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName());
         KafkaProducer<Long, UserEvent> producer = new KafkaProducer<>(props);
@@ -21,8 +24,9 @@ public class ExtSeqWithTimeWindowProducer {
             UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
             userEvent.setEventNanoTime(System.nanoTime());
             userEvent.setGlobalSequenceNumber(sequenceNumber);
-            producer.send(new ProducerRecord<>("multi_partition_topic", sequenceNumber, userEvent));
-            System.out.println("User Event Nano time : " + userEvent.getEventNanoTime() + ", User Event Id: " + userEvent.getUserEventId());
+            Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Config.MULTI_PARTITION_TOPIC, sequenceNumber, userEvent));
+            RecordMetadata metadata = future.get();
+            System.out.println("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition());
         }
         producer.close();
         System.out.println("ExternalSequencingProducer Completed.");

MultiPartitionConsumer.java

@@ -14,22 +14,23 @@ import java.util.Properties;
 public class MultiPartitionConsumer {
     private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofMillis(100);
     public static void main(String[] args) {
         Properties props = new Properties();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.KAFKA_LOCAL);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName());
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, UserEvent.class);
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
         Consumer<String, UserEvent> consumer = new KafkaConsumer<>(props);
-        consumer.subscribe(Collections.singletonList("multi_partition_topic"));
+        consumer.subscribe(Collections.singletonList(Config.MULTI_PARTITION_TOPIC));
         while (true) {
             ConsumerRecords<String, UserEvent> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
             records.forEach(record -> {
                 UserEvent userEvent = record.value();
                 if (userEvent != null) {
-                    System.out.println("Process message with event nano time : " + userEvent.getEventNanoTime() + ", Event ID: " + userEvent.getUserEventId());
+                    System.out.println("User Event ID: " + userEvent.getUserEventId());
                 }
             });
         }
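
The consumers register JacksonDeserializer and hand it the target type through the Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS property set above. The actual deserializer is not part of this diff; below is a hedged sketch of how such a deserializer could pick up that property in configure(), shown only to explain the configuration, not as the project's code.

// Hedged sketch of a Jackson-based value deserializer; not taken from this commit.
package com.baeldung.kafka.message.ordering.serialization;

import com.baeldung.kafka.message.ordering.Config;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.IOException;
import java.util.Map;

public class JacksonDeserializer<T> implements Deserializer<T> {

    private final ObjectMapper objectMapper = new ObjectMapper();
    private Class<T> targetType;

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> configs, boolean isKey) {
        // The consumers above put the target class directly into the Properties,
        // so it arrives here as a Class object under the serializedClass key.
        targetType = (Class<T>) configs.get(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS);
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            return objectMapper.readValue(data, targetType);
        } catch (IOException e) {
            throw new SerializationException("Error deserializing value from topic " + topic, e);
        }
    }
}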

MultiPartitionProducer.java

@@ -5,25 +5,29 @@ import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.LongSerializer;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 public class MultiPartitionProducer {
-    public static void main(String[] args) {
+    public static void main(String[] args) throws ExecutionException, InterruptedException {
         Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.KAFKA_LOCAL);
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName());
         KafkaProducer<Long, UserEvent> producer = new KafkaProducer<>(props);
         for (long count = 1; count <= 10 ; count++) {
             UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
             userEvent.setEventNanoTime(System.nanoTime());
-            producer.send(new ProducerRecord<>("multi_partition_topic", count, userEvent));
-            System.out.println("Process message with Event ID: " + userEvent.getUserEventId());
+            Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Config.MULTI_PARTITION_TOPIC, count, userEvent));
+            RecordMetadata metadata = future.get();
+            System.out.println("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition());
         }
         producer.close();
-        System.out.println("SinglePartitionProducer Completed.");
+        System.out.println("MultiPartitionProducer Completed.");
     }
 }

ProducerConfigurations.java

@@ -10,7 +10,7 @@ import java.util.Properties;
 public class ProducerConfigurations {
     public static void main(String[] args) {
         Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.KAFKA_LOCAL);
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");

SinglePartitionConsumer.java

@@ -17,19 +17,19 @@ public class SinglePartitionConsumer {
     public static void main(String[] args) {
         Properties props = new Properties();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.KAFKA_LOCAL);
         props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName());
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, UserEvent.class);
         Consumer<Long, UserEvent> consumer = new KafkaConsumer<>(props);
-        consumer.subscribe(Collections.singletonList("single_partition_topic"));
+        consumer.subscribe(Collections.singletonList(Config.SINGLE_PARTITION_TOPIC));
         while (true) {
             ConsumerRecords<Long, UserEvent> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
             records.forEach(record -> {
                 UserEvent userEvent = record.value();
-                System.out.println("Process message with event nano time : " + userEvent.getEventNanoTime() + ", Event ID: " + userEvent.getUserEventId());
+                System.out.println("User Event ID: " + userEvent.getUserEventId());
             });
         }
     }

SinglePartitionProducer.java

@@ -5,24 +5,28 @@ import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.LongSerializer;
 import java.time.Instant;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 public class SinglePartitionProducer {
-    public static void main(String[] args) {
+    public static void main(String[] args) throws ExecutionException, InterruptedException {
         Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.KAFKA_LOCAL);
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName());
         KafkaProducer<Long, UserEvent> producer = new KafkaProducer<>(props);
-        for (long count = 1; count <= 10 ; count++) {
+        for (long count = 1; count <= 10; count++) {
             UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
             userEvent.setEventNanoTime(System.nanoTime());
-            producer.send(new ProducerRecord<>("single_partition_topic", count, userEvent));
-            System.out.println("Process message with Event ID: " + userEvent.getUserEventId());
+            Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Config.SINGLE_PARTITION_TOPIC, count, userEvent));
+            RecordMetadata metadata = future.get();
+            System.out.println("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition());
         }
         producer.close();
         System.out.println("SinglePartitionProducer Completed.");