diff --git a/apache-kafka-3/README.md b/apache-kafka-3/README.md new file mode 100644 index 0000000000..0b96e55385 --- /dev/null +++ b/apache-kafka-3/README.md @@ -0,0 +1,9 @@ +## Apache Kafka + +This module contains articles about Apache Kafka. + +##### Building the project +You can build the project from the command line using: *mvn clean install*, or in an IDE. + +### Relevant Articles: + diff --git a/apache-kafka-3/pom.xml b/apache-kafka-3/pom.xml new file mode 100644 index 0000000000..ad51e1de44 --- /dev/null +++ b/apache-kafka-3/pom.xml @@ -0,0 +1,48 @@ + + + 4.0.0 + apache-kafka-3 + apache-kafka-3 + + + com.baeldung + parent-modules + 1.0.0-SNAPSHOT + + + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + org.slf4j + slf4j-api + ${org.slf4j.version} + + + org.projectlombok + lombok + ${lombok.version} + provided + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.databind.version} + + + + + 3.6.1 + 2.15.2 + + + + integration-jdk9-and-above + + + \ No newline at end of file diff --git a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AsyncCommit.java b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AsyncCommit.java new file mode 100644 index 0000000000..6b6e615d65 --- /dev/null +++ b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AsyncCommit.java @@ -0,0 +1,23 @@ +package com.baeldung.kafka.commitoffset; + +import com.baeldung.kafka.commitoffset.config.KafkaConfigProperties; + +import java.time.Duration; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +public class AsyncCommit { + + public static void main(String[] args) { + + KafkaConsumer consumer = new KafkaConsumer<>(KafkaConfigProperties.getProperties()); + consumer.subscribe(KafkaConfigProperties.getTopic()); + ConsumerRecords messages = consumer.poll(Duration.ofSeconds(10)); + for (ConsumerRecord message : messages) { + // processed message + consumer.commitAsync(); + } + } +} diff --git a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AutomaticCommit.java b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AutomaticCommit.java new file mode 100644 index 0000000000..6fca7db43a --- /dev/null +++ b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/AutomaticCommit.java @@ -0,0 +1,26 @@ +package com.baeldung.kafka.commitoffset; + +import com.baeldung.kafka.commitoffset.config.KafkaConfigProperties; + +import java.time.Duration; +import java.util.Properties; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +public class AutomaticCommit { + + public static void main(String[] args) { + + Properties properties = KafkaConfigProperties.getProperties(); + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + KafkaConsumer consumer = new KafkaConsumer<>(properties); + consumer.subscribe(KafkaConfigProperties.getTopic()); + ConsumerRecords messages = consumer.poll(Duration.ofSeconds(10)); + for (ConsumerRecord message : messages) { + // processed message + } + } +} diff --git a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SpecificOffsetCommit.java b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SpecificOffsetCommit.java new file mode 100644 index 0000000000..07f099a844 --- /dev/null +++ b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SpecificOffsetCommit.java @@ -0,0 +1,34 @@ +package com.baeldung.kafka.commitoffset; + +import com.baeldung.kafka.commitoffset.config.KafkaConfigProperties; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; + +public class SpecificOffsetCommit { + public static void main(String[] args) { + + KafkaConsumer consumer = new KafkaConsumer<>(KafkaConfigProperties.getProperties()); + consumer.subscribe(KafkaConfigProperties.getTopic()); + Map currentOffsets = new HashMap<>(); + int messageProcessed = 0; + while (true) { + ConsumerRecords messages = consumer.poll(Duration.ofSeconds(10)); + for (ConsumerRecord message : messages) { + // processed message + messageProcessed++; + currentOffsets.put(new TopicPartition(message.topic(), message.partition()), new OffsetAndMetadata(message.offset() + 1)); + if (messageProcessed % 50 == 0) { + consumer.commitSync(currentOffsets); + } + } + } + } +} diff --git a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SyncCommit.java b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SyncCommit.java new file mode 100644 index 0000000000..54f6b5f826 --- /dev/null +++ b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/SyncCommit.java @@ -0,0 +1,23 @@ +package com.baeldung.kafka.commitoffset; + +import com.baeldung.kafka.commitoffset.config.KafkaConfigProperties; + +import java.time.Duration; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +public class SyncCommit { + + public static void main(String[] args) { + + KafkaConsumer consumer = new KafkaConsumer<>(KafkaConfigProperties.getProperties()); + consumer.subscribe(KafkaConfigProperties.getTopic()); + ConsumerRecords messages = consumer.poll(Duration.ofSeconds(10)); + for (ConsumerRecord message : messages) { + // processed message + consumer.commitSync(); + } + } +} diff --git a/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/config/KafkaConfigProperties.java b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/config/KafkaConfigProperties.java new file mode 100644 index 0000000000..9b2096a610 --- /dev/null +++ b/apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/config/KafkaConfigProperties.java @@ -0,0 +1,31 @@ +package com.baeldung.kafka.commitoffset.config; + +import java.util.ArrayList; +import java.util.Properties; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; + +/** + * @author amitkumar + */ +public class KafkaConfigProperties { + public static final String MY_TOPIC = "my-topic"; + + public static Properties getProperties() { + + Properties props = new Properties(); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "MyFirstConsumer"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + return props; + } + + public static ArrayList getTopic() { + ArrayList topics = new ArrayList<>(); + topics.add(MY_TOPIC); + return topics; + } +} diff --git a/apache-kafka-3/src/test/resources/logback.xml b/apache-kafka-3/src/test/resources/logback.xml new file mode 100644 index 0000000000..6156c2188e --- /dev/null +++ b/apache-kafka-3/src/test/resources/logback.xml @@ -0,0 +1,11 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml index 1165a5371a..a9b80de9c4 100644 --- a/pom.xml +++ b/pom.xml @@ -660,6 +660,7 @@ apache-httpclient4 apache-httpclient apache-kafka-2 + apache-kafka-3 apache-kafka apache-libraries-2 apache-libraries @@ -907,6 +908,7 @@ apache-httpclient4 apache-httpclient apache-kafka-2 + apache-kafka-3 apache-kafka apache-libraries-2 apache-libraries