From 8867e351790336c0b8fdb985ba88f1d47af1034f Mon Sep 17 00:00:00 2001 From: Amit Kumatr Date: Sat, 27 Jan 2024 01:49:27 +0530 Subject: [PATCH] added code for commit offsets --- apache-kafka-3/README.md | 9 ++++ apache-kafka-3/pom.xml | 43 +++++++++++++++++++ .../kafka/commitoffset/AsyncCommit.java | 22 ++++++++++ .../kafka/commitoffset/AutomaticCommit.java | 24 +++++++++++ .../commitoffset/SpecificOffsetCommit.java | 35 +++++++++++++++ .../kafka/commitoffset/SyncCommit.java | 22 ++++++++++ .../config/KafkaConfigProperties.java | 30 +++++++++++++ apache-kafka-3/src/test/resources/logback.xml | 11 +++++ pom.xml | 2 + 9 files changed, 198 insertions(+) create mode 100644 apache-kafka-3/README.md create mode 100644 apache-kafka-3/pom.xml create mode 100644 apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AsyncCommit.java create mode 100644 apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AutomaticCommit.java create mode 100644 apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SpecificOffsetCommit.java create mode 100644 apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SyncCommit.java create mode 100644 apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/config/KafkaConfigProperties.java create mode 100644 apache-kafka-3/src/test/resources/logback.xml diff --git a/apache-kafka-3/README.md b/apache-kafka-3/README.md new file mode 100644 index 0000000000..0b96e55385 --- /dev/null +++ b/apache-kafka-3/README.md @@ -0,0 +1,9 @@ +## Apache Kafka + +This module contains articles about Apache Kafka. + +##### Building the project +You can build the project from the command line using: *mvn clean install*, or in an IDE. + +### Relevant Articles: + diff --git a/apache-kafka-3/pom.xml b/apache-kafka-3/pom.xml new file mode 100644 index 0000000000..87ab2a7052 --- /dev/null +++ b/apache-kafka-3/pom.xml @@ -0,0 +1,43 @@ + + + 4.0.0 + apache-kafka-3 + apache-kafka-3 + + + com.baeldung + parent-modules + 1.0.0-SNAPSHOT + + + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + org.slf4j + slf4j-api + ${org.slf4j.version} + + + org.projectlombok + lombok + ${lombok.version} + provided + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.databind.version} + + + + + 3.6.1 + 2.15.2 + + \ No newline at end of file diff --git a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AsyncCommit.java b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AsyncCommit.java new file mode 100644 index 0000000000..f5fcf48596 --- /dev/null +++ b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AsyncCommit.java @@ -0,0 +1,22 @@ +package com.baeldung.kafka.commitoffset; + +import com.baeldung.kafka.commitoffset.config.KafkaConfigProperties; +import java.time.Duration; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +public class AsyncCommit { + + public static void main(String[] args) { + + KafkaConsumer consumer = + new KafkaConsumer<>(KafkaConfigProperties.getProperties()); + consumer.subscribe(KafkaConfigProperties.getTopic()); + ConsumerRecords messages = consumer.poll(Duration.ofSeconds(10)); + for (ConsumerRecord message : messages) { + // processed message + consumer.commitAsync(); + } + } +} diff --git a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AutomaticCommit.java b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AutomaticCommit.java new file mode 100644 index 0000000000..ff8ff52e76 --- /dev/null +++ b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AutomaticCommit.java @@ -0,0 +1,24 @@ +package com.baeldung.kafka.commitoffset; + +import com.baeldung.kafka.commitoffset.config.KafkaConfigProperties; +import java.time.Duration; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +public class AutomaticCommit { + + public static void main(String[] args) { + + Properties properties = KafkaConfigProperties.getProperties(); + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + KafkaConsumer consumer = new KafkaConsumer<>(properties); + consumer.subscribe(KafkaConfigProperties.getTopic()); + ConsumerRecords messages = consumer.poll(Duration.ofSeconds(10)); + for (ConsumerRecord message : messages) { + // processed message + } + } +} diff --git a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SpecificOffsetCommit.java b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SpecificOffsetCommit.java new file mode 100644 index 0000000000..1ddb0c90b7 --- /dev/null +++ b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SpecificOffsetCommit.java @@ -0,0 +1,35 @@ +package com.baeldung.kafka.commitoffset; + +import com.baeldung.kafka.commitoffset.config.KafkaConfigProperties; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; + +public class SpecificOffsetCommit { + public static void main(String[] args) { + + KafkaConsumer consumer = + new KafkaConsumer<>(KafkaConfigProperties.getProperties()); + consumer.subscribe(KafkaConfigProperties.getTopic()); + Map currentOffsets = new HashMap<>(); + int messageProcessed = 0; + while (true) { + ConsumerRecords messages = consumer.poll(Duration.ofSeconds(10)); + for (ConsumerRecord message : messages) { + // processed message + messageProcessed++; + currentOffsets.put( + new TopicPartition(message.topic(), message.partition()), + new OffsetAndMetadata(message.offset() + 1)); + if (messageProcessed % 50 == 0) { + consumer.commitSync(currentOffsets); + } + } + } + } +} diff --git a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SyncCommit.java b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SyncCommit.java new file mode 100644 index 0000000000..08c268070d --- /dev/null +++ b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SyncCommit.java @@ -0,0 +1,22 @@ +package com.baeldung.kafka.commitoffset; + +import com.baeldung.kafka.commitoffset.config.KafkaConfigProperties; +import java.time.Duration; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +public class SyncCommit { + + public static void main(String[] args) { + + KafkaConsumer consumer = + new KafkaConsumer<>(KafkaConfigProperties.getProperties()); + consumer.subscribe(KafkaConfigProperties.getTopic()); + ConsumerRecords messages = consumer.poll(Duration.ofSeconds(10)); + for (ConsumerRecord message : messages) { + // processed message + consumer.commitSync(); + } + } +} diff --git a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/config/KafkaConfigProperties.java b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/config/KafkaConfigProperties.java new file mode 100644 index 0000000000..0faa8698ac --- /dev/null +++ b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/config/KafkaConfigProperties.java @@ -0,0 +1,30 @@ +package com.baeldung.kafka.commitoffset.config; + +import java.util.ArrayList; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; + +/** + * @author amitkumar + */ +public class KafkaConfigProperties { + public static final String MY_TOPIC = "my-topic"; + + public static Properties getProperties() { + + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "MyFirstConsumer"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + return props; + } + + public static ArrayList getTopic() { + ArrayList topics = new ArrayList<>(); + topics.add(MY_TOPIC); + return topics; + } +} diff --git a/apache-kafka-3/src/test/resources/logback.xml b/apache-kafka-3/src/test/resources/logback.xml new file mode 100644 index 0000000000..6156c2188e --- /dev/null +++ b/apache-kafka-3/src/test/resources/logback.xml @@ -0,0 +1,11 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml index a73d508d89..cab9136c59 100644 --- a/pom.xml +++ b/pom.xml @@ -690,6 +690,7 @@ apache-httpclient4 apache-httpclient apache-kafka-2 + apache-kafka-3 apache-kafka apache-libraries-2 apache-libraries @@ -930,6 +931,7 @@ apache-httpclient4 apache-httpclient apache-kafka-2 + apache-kafka-3 apache-kafka apache-libraries-2 apache-libraries