From 8867e351790336c0b8fdb985ba88f1d47af1034f Mon Sep 17 00:00:00 2001 From: Amit Kumatr Date: Sat, 27 Jan 2024 01:49:27 +0530 Subject: [PATCH 1/3] added code for commit offsets --- apache-kafka-3/README.md | 9 ++++ apache-kafka-3/pom.xml | 43 +++++++++++++++++++ .../kafka/commitoffset/AsyncCommit.java | 22 ++++++++++ .../kafka/commitoffset/AutomaticCommit.java | 24 +++++++++++ .../commitoffset/SpecificOffsetCommit.java | 35 +++++++++++++++ .../kafka/commitoffset/SyncCommit.java | 22 ++++++++++ .../config/KafkaConfigProperties.java | 30 +++++++++++++ apache-kafka-3/src/test/resources/logback.xml | 11 +++++ pom.xml | 2 + 9 files changed, 198 insertions(+) create mode 100644 apache-kafka-3/README.md create mode 100644 apache-kafka-3/pom.xml create mode 100644 apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AsyncCommit.java create mode 100644 apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AutomaticCommit.java create mode 100644 apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SpecificOffsetCommit.java create mode 100644 apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SyncCommit.java create mode 100644 apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/config/KafkaConfigProperties.java create mode 100644 apache-kafka-3/src/test/resources/logback.xml diff --git a/apache-kafka-3/README.md b/apache-kafka-3/README.md new file mode 100644 index 0000000000..0b96e55385 --- /dev/null +++ b/apache-kafka-3/README.md @@ -0,0 +1,9 @@ +## Apache Kafka + +This module contains articles about Apache Kafka. + +##### Building the project +You can build the project from the command line using: *mvn clean install*, or in an IDE. + +### Relevant Articles: + diff --git a/apache-kafka-3/pom.xml b/apache-kafka-3/pom.xml new file mode 100644 index 0000000000..87ab2a7052 --- /dev/null +++ b/apache-kafka-3/pom.xml @@ -0,0 +1,43 @@ + + + 4.0.0 + apache-kafka-3 + apache-kafka-3 + + + com.baeldung + parent-modules + 1.0.0-SNAPSHOT + + + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + org.slf4j + slf4j-api + ${org.slf4j.version} + + + org.projectlombok + lombok + ${lombok.version} + provided + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.databind.version} + + + + + 3.6.1 + 2.15.2 + + \ No newline at end of file diff --git a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AsyncCommit.java b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AsyncCommit.java new file mode 100644 index 0000000000..f5fcf48596 --- /dev/null +++ b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AsyncCommit.java @@ -0,0 +1,22 @@ +package com.baeldung.kafka.commitoffset; + +import com.baeldung.kafka.commitoffset.config.KafkaConfigProperties; +import java.time.Duration; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +public class AsyncCommit { + + public static void main(String[] args) { + + KafkaConsumer consumer = + new KafkaConsumer<>(KafkaConfigProperties.getProperties()); + consumer.subscribe(KafkaConfigProperties.getTopic()); + ConsumerRecords messages = consumer.poll(Duration.ofSeconds(10)); + for (ConsumerRecord message : messages) { + // processed message + consumer.commitAsync(); + } + } +} diff --git a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AutomaticCommit.java b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AutomaticCommit.java new file mode 100644 index 0000000000..ff8ff52e76 --- /dev/null +++ b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AutomaticCommit.java @@ -0,0 +1,24 @@ +package com.baeldung.kafka.commitoffset; + +import com.baeldung.kafka.commitoffset.config.KafkaConfigProperties; +import java.time.Duration; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +public class AutomaticCommit { + + public static void main(String[] args) { + + Properties properties = KafkaConfigProperties.getProperties(); + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + KafkaConsumer consumer = new KafkaConsumer<>(properties); + consumer.subscribe(KafkaConfigProperties.getTopic()); + ConsumerRecords messages = consumer.poll(Duration.ofSeconds(10)); + for (ConsumerRecord message : messages) { + // processed message + } + } +} diff --git a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SpecificOffsetCommit.java b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SpecificOffsetCommit.java new file mode 100644 index 0000000000..1ddb0c90b7 --- /dev/null +++ b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SpecificOffsetCommit.java @@ -0,0 +1,35 @@ +package com.baeldung.kafka.commitoffset; + +import com.baeldung.kafka.commitoffset.config.KafkaConfigProperties; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; + +public class SpecificOffsetCommit { + public static void main(String[] args) { + + KafkaConsumer consumer = + new KafkaConsumer<>(KafkaConfigProperties.getProperties()); + consumer.subscribe(KafkaConfigProperties.getTopic()); + Map currentOffsets = new HashMap<>(); + int messageProcessed = 0; + while (true) { + ConsumerRecords messages = consumer.poll(Duration.ofSeconds(10)); + for (ConsumerRecord message : messages) { + // processed message + messageProcessed++; + currentOffsets.put( + new TopicPartition(message.topic(), message.partition()), + new OffsetAndMetadata(message.offset() + 1)); + if (messageProcessed % 50 == 0) { + consumer.commitSync(currentOffsets); + } + } + } + } +} diff --git a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SyncCommit.java b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SyncCommit.java new file mode 100644 index 0000000000..08c268070d --- /dev/null +++ b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SyncCommit.java @@ -0,0 +1,22 @@ +package com.baeldung.kafka.commitoffset; + +import com.baeldung.kafka.commitoffset.config.KafkaConfigProperties; +import java.time.Duration; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +public class SyncCommit { + + public static void main(String[] args) { + + KafkaConsumer consumer = + new KafkaConsumer<>(KafkaConfigProperties.getProperties()); + consumer.subscribe(KafkaConfigProperties.getTopic()); + ConsumerRecords messages = consumer.poll(Duration.ofSeconds(10)); + for (ConsumerRecord message : messages) { + // processed message + consumer.commitSync(); + } + } +} diff --git a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/config/KafkaConfigProperties.java b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/config/KafkaConfigProperties.java new file mode 100644 index 0000000000..0faa8698ac --- /dev/null +++ b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/config/KafkaConfigProperties.java @@ -0,0 +1,30 @@ +package com.baeldung.kafka.commitoffset.config; + +import java.util.ArrayList; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; + +/** + * @author amitkumar + */ +public class KafkaConfigProperties { + public static final String MY_TOPIC = "my-topic"; + + public static Properties getProperties() { + + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "MyFirstConsumer"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + return props; + } + + public static ArrayList getTopic() { + ArrayList topics = new ArrayList<>(); + topics.add(MY_TOPIC); + return topics; + } +} diff --git a/apache-kafka-3/src/test/resources/logback.xml b/apache-kafka-3/src/test/resources/logback.xml new file mode 100644 index 0000000000..6156c2188e --- /dev/null +++ b/apache-kafka-3/src/test/resources/logback.xml @@ -0,0 +1,11 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml index a73d508d89..cab9136c59 100644 --- a/pom.xml +++ b/pom.xml @@ -690,6 +690,7 @@ apache-httpclient4 apache-httpclient apache-kafka-2 + apache-kafka-3 apache-kafka apache-libraries-2 apache-libraries @@ -930,6 +931,7 @@ apache-httpclient4 apache-httpclient apache-kafka-2 + apache-kafka-3 apache-kafka apache-libraries-2 apache-libraries From 6cd68f9e48f639a10757fd70da3cb3f9e72c2f22 Mon Sep 17 00:00:00 2001 From: Amit Kumatr Date: Thu, 8 Feb 2024 00:05:25 +0530 Subject: [PATCH 2/3] added jdk9 profile --- apache-kafka-3/pom.xml | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/apache-kafka-3/pom.xml b/apache-kafka-3/pom.xml index 87ab2a7052..ad51e1de44 100644 --- a/apache-kafka-3/pom.xml +++ b/apache-kafka-3/pom.xml @@ -1,7 +1,7 @@ + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 apache-kafka-3 apache-kafka-3 @@ -40,4 +40,9 @@ 3.6.1 2.15.2 + + + integration-jdk9-and-above + + \ No newline at end of file From b57553eeb0356f6a8817746b02c3d867cc6cd683 Mon Sep 17 00:00:00 2001 From: Amit Kumatr Date: Thu, 15 Feb 2024 23:17:30 +0530 Subject: [PATCH 3/3] format the code --- .../kafka/commitoffset/AsyncCommit.java | 19 +++++----- .../kafka/commitoffset/AutomaticCommit.java | 20 ++++++----- .../commitoffset/SpecificOffsetCommit.java | 35 +++++++++---------- .../kafka/commitoffset/SyncCommit.java | 19 +++++----- .../config/KafkaConfigProperties.java | 31 ++++++++-------- 5 files changed, 64 insertions(+), 60 deletions(-) diff --git a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AsyncCommit.java b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AsyncCommit.java index f5fcf48596..6b6e615d65 100644 --- a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AsyncCommit.java +++ b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AsyncCommit.java @@ -1,22 +1,23 @@ package com.baeldung.kafka.commitoffset; import com.baeldung.kafka.commitoffset.config.KafkaConfigProperties; + import java.time.Duration; + import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class AsyncCommit { - public static void main(String[] args) { + public static void main(String[] args) { - KafkaConsumer consumer = - new KafkaConsumer<>(KafkaConfigProperties.getProperties()); - consumer.subscribe(KafkaConfigProperties.getTopic()); - ConsumerRecords messages = consumer.poll(Duration.ofSeconds(10)); - for (ConsumerRecord message : messages) { - // processed message - consumer.commitAsync(); + KafkaConsumer consumer = new KafkaConsumer<>(KafkaConfigProperties.getProperties()); + consumer.subscribe(KafkaConfigProperties.getTopic()); + ConsumerRecords messages = consumer.poll(Duration.ofSeconds(10)); + for (ConsumerRecord message : messages) { + // processed message + consumer.commitAsync(); + } } - } } diff --git a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AutomaticCommit.java b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AutomaticCommit.java index ff8ff52e76..6fca7db43a 100644 --- a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AutomaticCommit.java +++ b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AutomaticCommit.java @@ -1,8 +1,10 @@ package com.baeldung.kafka.commitoffset; import com.baeldung.kafka.commitoffset.config.KafkaConfigProperties; + import java.time.Duration; import java.util.Properties; + import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -10,15 +12,15 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; public class AutomaticCommit { - public static void main(String[] args) { + public static void main(String[] args) { - Properties properties = KafkaConfigProperties.getProperties(); - properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); - KafkaConsumer consumer = new KafkaConsumer<>(properties); - consumer.subscribe(KafkaConfigProperties.getTopic()); - ConsumerRecords messages = consumer.poll(Duration.ofSeconds(10)); - for (ConsumerRecord message : messages) { - // processed message + Properties properties = KafkaConfigProperties.getProperties(); + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + KafkaConsumer consumer = new KafkaConsumer<>(properties); + consumer.subscribe(KafkaConfigProperties.getTopic()); + ConsumerRecords messages = consumer.poll(Duration.ofSeconds(10)); + for (ConsumerRecord message : messages) { + // processed message + } } - } } diff --git a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SpecificOffsetCommit.java b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SpecificOffsetCommit.java index 1ddb0c90b7..07f099a844 100644 --- a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SpecificOffsetCommit.java +++ b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SpecificOffsetCommit.java @@ -1,9 +1,11 @@ package com.baeldung.kafka.commitoffset; import com.baeldung.kafka.commitoffset.config.KafkaConfigProperties; + import java.time.Duration; import java.util.HashMap; import java.util.Map; + import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -11,25 +13,22 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; public class SpecificOffsetCommit { - public static void main(String[] args) { + public static void main(String[] args) { - KafkaConsumer consumer = - new KafkaConsumer<>(KafkaConfigProperties.getProperties()); - consumer.subscribe(KafkaConfigProperties.getTopic()); - Map currentOffsets = new HashMap<>(); - int messageProcessed = 0; - while (true) { - ConsumerRecords messages = consumer.poll(Duration.ofSeconds(10)); - for (ConsumerRecord message : messages) { - // processed message - messageProcessed++; - currentOffsets.put( - new TopicPartition(message.topic(), message.partition()), - new OffsetAndMetadata(message.offset() + 1)); - if (messageProcessed % 50 == 0) { - consumer.commitSync(currentOffsets); + KafkaConsumer consumer = new KafkaConsumer<>(KafkaConfigProperties.getProperties()); + consumer.subscribe(KafkaConfigProperties.getTopic()); + Map currentOffsets = new HashMap<>(); + int messageProcessed = 0; + while (true) { + ConsumerRecords messages = consumer.poll(Duration.ofSeconds(10)); + for (ConsumerRecord message : messages) { + // processed message + messageProcessed++; + currentOffsets.put(new TopicPartition(message.topic(), message.partition()), new OffsetAndMetadata(message.offset() + 1)); + if (messageProcessed % 50 == 0) { + consumer.commitSync(currentOffsets); + } + } } - } } - } } diff --git a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SyncCommit.java b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SyncCommit.java index 08c268070d..54f6b5f826 100644 --- a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SyncCommit.java +++ b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SyncCommit.java @@ -1,22 +1,23 @@ package com.baeldung.kafka.commitoffset; import com.baeldung.kafka.commitoffset.config.KafkaConfigProperties; + import java.time.Duration; + import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class SyncCommit { - public static void main(String[] args) { + public static void main(String[] args) { - KafkaConsumer consumer = - new KafkaConsumer<>(KafkaConfigProperties.getProperties()); - consumer.subscribe(KafkaConfigProperties.getTopic()); - ConsumerRecords messages = consumer.poll(Duration.ofSeconds(10)); - for (ConsumerRecord message : messages) { - // processed message - consumer.commitSync(); + KafkaConsumer consumer = new KafkaConsumer<>(KafkaConfigProperties.getProperties()); + consumer.subscribe(KafkaConfigProperties.getTopic()); + ConsumerRecords messages = consumer.poll(Duration.ofSeconds(10)); + for (ConsumerRecord message : messages) { + // processed message + consumer.commitSync(); + } } - } } diff --git a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/config/KafkaConfigProperties.java b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/config/KafkaConfigProperties.java index 0faa8698ac..9b2096a610 100644 --- a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/config/KafkaConfigProperties.java +++ b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/config/KafkaConfigProperties.java @@ -2,6 +2,7 @@ package com.baeldung.kafka.commitoffset.config; import java.util.ArrayList; import java.util.Properties; + import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; @@ -9,22 +10,22 @@ import org.apache.kafka.common.serialization.StringDeserializer; * @author amitkumar */ public class KafkaConfigProperties { - public static final String MY_TOPIC = "my-topic"; + public static final String MY_TOPIC = "my-topic"; - public static Properties getProperties() { + public static Properties getProperties() { - Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "MyFirstConsumer"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - return props; - } + Properties props = new Properties(); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "MyFirstConsumer"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + return props; + } - public static ArrayList getTopic() { - ArrayList topics = new ArrayList<>(); - topics.add(MY_TOPIC); - return topics; - } + public static ArrayList getTopic() { + ArrayList topics = new ArrayList<>(); + topics.add(MY_TOPIC); + return topics; + } }