From ae29eca0f8188b12b71ea8430ffa429535cf694b Mon Sep 17 00:00:00 2001 From: Pedro Lopes Date: Sun, 1 Oct 2023 12:22:34 -0300 Subject: [PATCH] BAEL-6927: Update article "Understanding Kafka Topics and Partitions" (#14837) * consumer config. topic config. driver and calculator classes. * basic app working. test structure * final version * wraping up * optimizing imports * comments changes * addressing improvement changes --- .../TemperatureConsumer.java | 17 +++++------------ ...KafkaTopicsAndPartitionsIntegrationTest.java | 4 +--- 2 files changed, 6 insertions(+), 15 deletions(-) diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/TemperatureConsumer.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/TemperatureConsumer.java index 7cfbdd5fb0..2919ae1d7b 100644 --- a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/TemperatureConsumer.java +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/TemperatureConsumer.java @@ -8,29 +8,22 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; @Service public class TemperatureConsumer { - private CountDownLatch latch = new CountDownLatch(1); - Map> consumedRecords = new ConcurrentHashMap<>(); @KafkaListener(topics = "celcius-scale-topic", groupId = "group-1") public void consumer1(ConsumerRecord consumerRecord) { - computeConsumedRecord("consumer-1", consumerRecord.partition()); + trackConsumedPartitions("consumer-1", consumerRecord.partition()); } - private void computeConsumedRecord(String key, int consumerRecord) { - consumedRecords.computeIfAbsent(key, k -> new HashSet<>()); - consumedRecords.computeIfPresent(key, (k, v) -> { - v.add(String.valueOf(consumerRecord)); + private void trackConsumedPartitions(String consumerName, int partitionNumber) { + consumedRecords.computeIfAbsent(consumerName, k -> new HashSet<>()); + consumedRecords.computeIfPresent(consumerName, (k, v) -> { + v.add(String.valueOf(partitionNumber)); return v; }); } - - public CountDownLatch getLatch() { - return latch; - } } diff --git a/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/topicsandpartitions/KafkaTopicsAndPartitionsIntegrationTest.java b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/topicsandpartitions/KafkaTopicsAndPartitionsIntegrationTest.java index 309c87125a..de720ef955 100644 --- a/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/topicsandpartitions/KafkaTopicsAndPartitionsIntegrationTest.java +++ b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/topicsandpartitions/KafkaTopicsAndPartitionsIntegrationTest.java @@ -7,8 +7,6 @@ import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; -import java.util.concurrent.TimeUnit; - @SpringBootTest(classes = ThermostatApplicationKafkaApp.class) @EmbeddedKafka(partitions = 2, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"}) public class KafkaTopicsAndPartitionsIntegrationTest { @@ -24,7 +22,7 @@ public class KafkaTopicsAndPartitionsIntegrationTest { @Test public void givenTopic_andConsumerGroup_whenConsumersListenToEvents_thenConsumeItCorrectly() throws Exception { service.measureCelsiusAndPublish(10000); - consumer.getLatch().await(1, TimeUnit.SECONDS); + Thread.sleep(1000); System.out.println(consumer.consumedRecords); } }