diff --git a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AsyncCommit.java b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AsyncCommit.java index f5fcf48596..6b6e615d65 100644 --- a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AsyncCommit.java +++ b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AsyncCommit.java @@ -1,22 +1,23 @@ package com.baeldung.kafka.commitoffset; import com.baeldung.kafka.commitoffset.config.KafkaConfigProperties; + import java.time.Duration; + import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class AsyncCommit { - public static void main(String[] args) { + public static void main(String[] args) { - KafkaConsumer consumer = - new KafkaConsumer<>(KafkaConfigProperties.getProperties()); - consumer.subscribe(KafkaConfigProperties.getTopic()); - ConsumerRecords messages = consumer.poll(Duration.ofSeconds(10)); - for (ConsumerRecord message : messages) { - // processed message - consumer.commitAsync(); + KafkaConsumer consumer = new KafkaConsumer<>(KafkaConfigProperties.getProperties()); + consumer.subscribe(KafkaConfigProperties.getTopic()); + ConsumerRecords messages = consumer.poll(Duration.ofSeconds(10)); + for (ConsumerRecord message : messages) { + // processed message + consumer.commitAsync(); + } } - } } diff --git a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AutomaticCommit.java b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AutomaticCommit.java index ff8ff52e76..6fca7db43a 100644 --- a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AutomaticCommit.java +++ b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AutomaticCommit.java @@ -1,8 +1,10 @@ package com.baeldung.kafka.commitoffset; import com.baeldung.kafka.commitoffset.config.KafkaConfigProperties; + import java.time.Duration; import java.util.Properties; + import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -10,15 +12,15 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; public class AutomaticCommit { - public static void main(String[] args) { + public static void main(String[] args) { - Properties properties = KafkaConfigProperties.getProperties(); - properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); - KafkaConsumer consumer = new KafkaConsumer<>(properties); - consumer.subscribe(KafkaConfigProperties.getTopic()); - ConsumerRecords messages = consumer.poll(Duration.ofSeconds(10)); - for (ConsumerRecord message : messages) { - // processed message + Properties properties = KafkaConfigProperties.getProperties(); + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + KafkaConsumer consumer = new KafkaConsumer<>(properties); + consumer.subscribe(KafkaConfigProperties.getTopic()); + ConsumerRecords messages = consumer.poll(Duration.ofSeconds(10)); + for (ConsumerRecord message : messages) { + // processed message + } } - } } diff --git a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SpecificOffsetCommit.java b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SpecificOffsetCommit.java index 1ddb0c90b7..07f099a844 100644 --- a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SpecificOffsetCommit.java +++ b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SpecificOffsetCommit.java @@ -1,9 +1,11 @@ package com.baeldung.kafka.commitoffset; import com.baeldung.kafka.commitoffset.config.KafkaConfigProperties; + import java.time.Duration; import java.util.HashMap; import java.util.Map; + import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -11,25 +13,22 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; public class SpecificOffsetCommit { - public static void main(String[] args) { + public static void main(String[] args) { - KafkaConsumer consumer = - new KafkaConsumer<>(KafkaConfigProperties.getProperties()); - consumer.subscribe(KafkaConfigProperties.getTopic()); - Map currentOffsets = new HashMap<>(); - int messageProcessed = 0; - while (true) { - ConsumerRecords messages = consumer.poll(Duration.ofSeconds(10)); - for (ConsumerRecord message : messages) { - // processed message - messageProcessed++; - currentOffsets.put( - new TopicPartition(message.topic(), message.partition()), - new OffsetAndMetadata(message.offset() + 1)); - if (messageProcessed % 50 == 0) { - consumer.commitSync(currentOffsets); + KafkaConsumer consumer = new KafkaConsumer<>(KafkaConfigProperties.getProperties()); + consumer.subscribe(KafkaConfigProperties.getTopic()); + Map currentOffsets = new HashMap<>(); + int messageProcessed = 0; + while (true) { + ConsumerRecords messages = consumer.poll(Duration.ofSeconds(10)); + for (ConsumerRecord message : messages) { + // processed message + messageProcessed++; + currentOffsets.put(new TopicPartition(message.topic(), message.partition()), new OffsetAndMetadata(message.offset() + 1)); + if (messageProcessed % 50 == 0) { + consumer.commitSync(currentOffsets); + } + } } - } } - } } diff --git a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SyncCommit.java b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SyncCommit.java index 08c268070d..54f6b5f826 100644 --- a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SyncCommit.java +++ b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SyncCommit.java @@ -1,22 +1,23 @@ package com.baeldung.kafka.commitoffset; import com.baeldung.kafka.commitoffset.config.KafkaConfigProperties; + import java.time.Duration; + import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class SyncCommit { - public static void main(String[] args) { + public static void main(String[] args) { - KafkaConsumer consumer = - new KafkaConsumer<>(KafkaConfigProperties.getProperties()); - consumer.subscribe(KafkaConfigProperties.getTopic()); - ConsumerRecords messages = consumer.poll(Duration.ofSeconds(10)); - for (ConsumerRecord message : messages) { - // processed message - consumer.commitSync(); + KafkaConsumer consumer = new KafkaConsumer<>(KafkaConfigProperties.getProperties()); + consumer.subscribe(KafkaConfigProperties.getTopic()); + ConsumerRecords messages = consumer.poll(Duration.ofSeconds(10)); + for (ConsumerRecord message : messages) { + // processed message + consumer.commitSync(); + } } - } } diff --git a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/config/KafkaConfigProperties.java b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/config/KafkaConfigProperties.java index 0faa8698ac..9b2096a610 100644 --- a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/config/KafkaConfigProperties.java +++ b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/config/KafkaConfigProperties.java @@ -2,6 +2,7 @@ package com.baeldung.kafka.commitoffset.config; import java.util.ArrayList; import java.util.Properties; + import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; @@ -9,22 +10,22 @@ import org.apache.kafka.common.serialization.StringDeserializer; * @author amitkumar */ public class KafkaConfigProperties { - public static final String MY_TOPIC = "my-topic"; + public static final String MY_TOPIC = "my-topic"; - public static Properties getProperties() { + public static Properties getProperties() { - Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "MyFirstConsumer"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - return props; - } + Properties props = new Properties(); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "MyFirstConsumer"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + return props; + } - public static ArrayList getTopic() { - ArrayList topics = new ArrayList<>(); - topics.add(MY_TOPIC); - return topics; - } + public static ArrayList getTopic() { + ArrayList topics = new ArrayList<>(); + topics.add(MY_TOPIC); + return topics; + } }