diff --git a/apache-kafka-2/pom.xml b/apache-kafka-2/pom.xml index 067dedef8a..45b31004b7 100644 --- a/apache-kafka-2/pom.xml +++ b/apache-kafka-2/pom.xml @@ -57,6 +57,11 @@ ${lombok.version} provided + + com.fasterxml.jackson.core + jackson-databind + 2.15.2 + diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ConsumerConfigurations.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ConsumerConfigurations.java new file mode 100644 index 0000000000..b18db3ef24 --- /dev/null +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ConsumerConfigurations.java @@ -0,0 +1,32 @@ +package com.baeldung.kafka.message.ordering; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; + +public class ConsumerConfigurations { + public static void main(String[] args) { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("max.poll.records", "500"); + props.put("fetch.min.bytes", "1"); + props.put("fetch.max.wait.ms", "500"); + Consumer consumer = new KafkaConsumer<>(props); + consumer.subscribe(Collections.singletonList("multi_partition_topic")); + + while (true) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + records.forEach(record -> { + System.out.println("Partition: " + record.partition() + ", Message: " + record.value()); + }); + } + } +} diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowConsumer.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowConsumer.java new file mode 100644 index 0000000000..5b01a86e39 --- /dev/null +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowConsumer.java @@ -0,0 +1,49 @@ +package com.baeldung.kafka.message.ordering; + +import com.baeldung.kafka.message.ordering.payload.Message; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +import java.time.Duration; +import java.util.*; + +public class ExtSeqWithTimeWindowConsumer { + private static final long BUFFER_PERIOD_MS = 5000; + private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofMillis(100); + + public static void main(String[] args) { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put("value.deserializer.serializedClass", Message.class); + Consumer consumer = new KafkaConsumer<>(props); + consumer.subscribe(Collections.singletonList("multi_partition_topic")); + List buffer = new ArrayList<>(); + long lastProcessedTime = System.nanoTime(); + while (true) { + ConsumerRecords records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES); + records.forEach(record -> { + if (record != null && record.value() != null) { + buffer.add(record.value()); + } + }); + if (System.nanoTime() - lastProcessedTime > BUFFER_PERIOD_MS) { + processBuffer(buffer); + lastProcessedTime = System.nanoTime(); + } + } + } + + private static void processBuffer(List buffer) { + Collections.sort(buffer); + buffer.forEach(message -> { + System.out.println("Processing message with Insert Position: " + message.getInsertPosition() + ", Message Id: " + message.getMessageId()); + }); + buffer.clear(); + } +} diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowProducer.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowProducer.java new file mode 100644 index 0000000000..91c5af716f --- /dev/null +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ExtSeqWithTimeWindowProducer.java @@ -0,0 +1,29 @@ +package com.baeldung.kafka.message.ordering; + +import com.baeldung.kafka.message.ordering.payload.Message; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; + +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.atomic.AtomicLong; + +public class ExtSeqWithTimeWindowProducer { + public static void main(String[] args) { + Properties props = new Properties(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "com.baeldung.kafka.message.ordering.serialization.JacksonSerializer"); + + KafkaProducer producer = new KafkaProducer<>(props); + for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) { + long messageId = Message.getRandomMessageId(); + String key = "Key-" + insertPosition; + Message message = new Message(insertPosition, messageId); + producer.send(new ProducerRecord<>("multi_partition_topic", key, message)); + System.out.println("Insert Position: " + message.getInsertPosition() + ", Message Id: " + message.getMessageId()); + } + producer.close(); + System.out.println("ExternalSequencingProducer Completed."); + } +} diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/MultiPartitionConsumer.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/MultiPartitionConsumer.java new file mode 100644 index 0000000000..f9b0b3b040 --- /dev/null +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/MultiPartitionConsumer.java @@ -0,0 +1,34 @@ +package com.baeldung.kafka.message.ordering; + +import com.baeldung.kafka.message.ordering.payload.Message; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; + +public class MultiPartitionConsumer { + private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofMillis(100); + public static void main(String[] args) { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put("value.deserializer.serializedClass", Message.class); + Consumer consumer = new KafkaConsumer<>(props); + consumer.subscribe(Collections.singletonList("multi_partition_topic")); + while (true) { + ConsumerRecords records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES); + records.forEach(record -> { + Message message = record.value(); + System.out.println("Process message with Insert Position: " + message.getInsertPosition() + ", Message Id: " + message.getMessageId()); + }); + } + } +} + diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/MultiPartitionProducer.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/MultiPartitionProducer.java new file mode 100644 index 0000000000..8b2a49b2b5 --- /dev/null +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/MultiPartitionProducer.java @@ -0,0 +1,27 @@ +package com.baeldung.kafka.message.ordering; + +import com.baeldung.kafka.message.ordering.payload.Message; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; + +import java.util.Properties; + +public class MultiPartitionProducer { + public static void main(String[] args) { + Properties props = new Properties(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "com.baeldung.kafka.message.ordering.serialization.JacksonSerializer"); + + KafkaProducer producer = new KafkaProducer<>(props); + for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) { + long messageId = Message.getRandomMessageId(); + String key = "Key-" + insertPosition; + Message message = new Message(insertPosition, messageId); + producer.send(new ProducerRecord<>("multi_partition_topic", key, message)); + System.out.println("Insert Position: " + message.getInsertPosition() + ", Message Id: " + message.getMessageId()); + } + producer.close(); + System.out.println("SinglePartitionProducer Completed."); + } +} diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ProducerConfigurations.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ProducerConfigurations.java new file mode 100644 index 0000000000..bcdf6ceb32 --- /dev/null +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ProducerConfigurations.java @@ -0,0 +1,27 @@ +package com.baeldung.kafka.message.ordering; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; + +import java.util.Properties; + +public class ProducerConfigurations { + public static void main(String[] args) { + Properties props = new Properties(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("max.in.flight.requests.per.connection", "1"); + props.put("batch.size", "16384"); + props.put("linger.ms", "5"); + KafkaProducer producer = new KafkaProducer<>(props); + + for (int i = 0; i < 10; i++) { + String key = "Key-" + (i % 3); // Assuming 3 partitions + producer.send(new ProducerRecord<>("multi_partition_topic", key, "Message-" + i)); + } + + producer.close(); + System.out.println("MultiPartitionProducer Completed."); + } +} diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/SinglePartitionConsumer.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/SinglePartitionConsumer.java new file mode 100644 index 0000000000..932a29c394 --- /dev/null +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/SinglePartitionConsumer.java @@ -0,0 +1,35 @@ +package com.baeldung.kafka.message.ordering; + +import com.baeldung.kafka.message.ordering.payload.Message; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; + +public class SinglePartitionConsumer { + private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofMillis(100); + + public static void main(String[] args) { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put("value.deserializer.serializedClass", Message.class); + Consumer consumer = new KafkaConsumer<>(props); + consumer.subscribe(Collections.singletonList("single_partition_topic")); + while (true) { + ConsumerRecords records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES); + records.forEach(record -> { + Message message = record.value(); + System.out.println("Process message with Insert Position: " + message.getInsertPosition() + ", Message Id: " + message.getMessageId()); + }); + } + } +} + diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/SinglePartitionProducer.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/SinglePartitionProducer.java new file mode 100644 index 0000000000..b5366819c5 --- /dev/null +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/SinglePartitionProducer.java @@ -0,0 +1,29 @@ +package com.baeldung.kafka.message.ordering; + +import com.baeldung.kafka.message.ordering.payload.Message; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; + +import java.util.Properties; +import java.util.Random; + +public class SinglePartitionProducer { + public static void main(String[] args) { + Properties props = new Properties(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "com.baeldung.kafka.message.ordering.serialization.JacksonSerializer"); + + KafkaProducer producer = new KafkaProducer<>(props); + for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) { + long messageId = Message.getRandomMessageId(); + String key = "Key-" + insertPosition; + Message message = new Message(insertPosition, messageId); + producer.send(new ProducerRecord<>("single_partition_topic", key, message)); + System.out.println("Insert Position: " + message.getInsertPosition() + ", Message Id: " + message.getMessageId()); + } + producer.close(); + System.out.println("SinglePartitionProducer Completed."); + } + +} diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/payload/Message.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/payload/Message.java new file mode 100644 index 0000000000..b185d663d4 --- /dev/null +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/payload/Message.java @@ -0,0 +1,36 @@ +package com.baeldung.kafka.message.ordering.payload; + +import java.util.Random; + +public class Message implements Comparable { + private long insertPosition; + private long messageId; + + public Message(){ + + } + + public Message(long insertPosition, long messageId) { + this.insertPosition = insertPosition; + this.messageId = messageId; + } + + public long getInsertPosition() { + return insertPosition; + } + + public long getMessageId() { + return messageId; + } + + @Override + public int compareTo(Message other) { + return Long.compare(this.messageId, other.messageId); + } + + public static long getRandomMessageId() { + Random rand = new Random(); + return rand.nextInt(1000); + } +} + diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/serialization/JacksonDeserializer.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/serialization/JacksonDeserializer.java new file mode 100644 index 0000000000..34aa181fcb --- /dev/null +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/serialization/JacksonDeserializer.java @@ -0,0 +1,36 @@ +package com.baeldung.kafka.message.ordering.serialization; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.common.serialization.Deserializer; + +import java.util.Map; + +public class JacksonDeserializer implements Deserializer { + private final ObjectMapper objectMapper = new ObjectMapper(); + private Class tClass; + + public JacksonDeserializer(Class tClass) { + this.tClass = tClass; + } + + public JacksonDeserializer() { + + } + + @Override + public void configure(Map configs, boolean isKey) { + this.tClass = (Class) configs.get("value.deserializer.serializedClass"); + } + + @Override + public T deserialize(String topic, byte[] bytes) { + if (bytes == null) { + return null; + } + try { + return objectMapper.readValue(bytes, tClass); + } catch (Exception e) { + throw new RuntimeException("Error deserializing value", e); + } + } +} + diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/serialization/JacksonSerializer.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/serialization/JacksonSerializer.java new file mode 100644 index 0000000000..fa9d25dd85 --- /dev/null +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/serialization/JacksonSerializer.java @@ -0,0 +1,20 @@ +package com.baeldung.kafka.message.ordering.serialization; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.common.serialization.Serializer; + +public class JacksonSerializer implements Serializer { + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public byte[] serialize(String topic, T data) { + if (data == null) { + return null; + } + try { + return objectMapper.writeValueAsBytes(data); + } catch (Exception e) { + throw new RuntimeException("Error serializing value", e); + } + } +}