JAVA-6390: Move kafka articles from libraries-6 to new module

apache-kafka
This commit is contained in:
sampadawagde 2021-08-15 17:28:59 +05:30
parent d13f26e437
commit b4b48d4268
3 changed files with 16 additions and 15 deletions

View File

@@ -1,4 +1,4 @@
package com.baeldung.kafka; package com.baeldung.kafka.exactlyonce;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
@@ -24,16 +24,16 @@ public class TransactionalMessageProducer {
producer.initTransactions(); producer.initTransactions();
try{ try {
producer.beginTransaction(); producer.beginTransaction();
Stream.of(DATA_MESSAGE_1, DATA_MESSAGE_2).forEach(s -> producer.send( Stream.of(DATA_MESSAGE_1, DATA_MESSAGE_2)
new ProducerRecord<String, String>("input", null, s))); .forEach(s -> producer.send(new ProducerRecord<String, String>("input", null, s)));
producer.commitTransaction(); producer.commitTransaction();
}catch (KafkaException e){ } catch (KafkaException e) {
producer.abortTransaction(); producer.abortTransaction();

View File

@@ -1,4 +1,4 @@
package com.baeldung.kafka; package com.baeldung.kafka.exactlyonce;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -43,10 +43,11 @@ public class TransactionalWordCount {
ConsumerRecords<String, String> records = consumer.poll(ofSeconds(60)); ConsumerRecords<String, String> records = consumer.poll(ofSeconds(60));
Map<String, Integer> wordCountMap = records.records(new TopicPartition(INPUT_TOPIC, 0)) Map<String, Integer> wordCountMap = records.records(new TopicPartition(INPUT_TOPIC, 0))
.stream() .stream()
.flatMap(record -> Stream.of(record.value().split(" "))) .flatMap(record -> Stream.of(record.value()
.map(word -> Tuple.of(word, 1)) .split(" ")))
.collect(Collectors.toMap(tuple -> tuple.getKey(), t1 -> t1.getValue(), (v1, v2) -> v1 + v2)); .map(word -> Tuple.of(word, 1))
.collect(Collectors.toMap(tuple -> tuple.getKey(), t1 -> t1.getValue(), (v1, v2) -> v1 + v2));
producer.beginTransaction(); producer.beginTransaction();
@@ -56,7 +57,8 @@ public class TransactionalWordCount {
for (TopicPartition partition : records.partitions()) { for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionedRecords = records.records(partition); List<ConsumerRecord<String, String>> partitionedRecords = records.records(partition);
long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset(); long offset = partitionedRecords.get(partitionedRecords.size() - 1)
.offset();
offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1)); offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1));
} }
@@ -72,7 +74,6 @@ public class TransactionalWordCount {
} }
} }
private static KafkaConsumer<String, String> createKafkaConsumer() { private static KafkaConsumer<String, String> createKafkaConsumer() {

View File

@@ -1,4 +1,4 @@
package com.baeldung.kafka; package com.baeldung.kafka.exactlyonce;
public class Tuple { public class Tuple {
@@ -10,8 +10,8 @@ public class Tuple {
this.value = value; this.value = value;
} }
public static Tuple of(String key, Integer value){ public static Tuple of(String key, Integer value) {
return new Tuple(key,value); return new Tuple(key, value);
} }
public String getKey() { public String getKey() {