diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/TemperatureConsumer.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/TemperatureConsumer.java index 7cfbdd5fb0..2919ae1d7b 100644 --- a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/TemperatureConsumer.java +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/TemperatureConsumer.java @@ -8,29 +8,22 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; @Service public class TemperatureConsumer { - private CountDownLatch latch = new CountDownLatch(1); - Map> consumedRecords = new ConcurrentHashMap<>(); @KafkaListener(topics = "celcius-scale-topic", groupId = "group-1") public void consumer1(ConsumerRecord consumerRecord) { - computeConsumedRecord("consumer-1", consumerRecord.partition()); + trackConsumedPartitions("consumer-1", consumerRecord.partition()); } - private void computeConsumedRecord(String key, int consumerRecord) { - consumedRecords.computeIfAbsent(key, k -> new HashSet<>()); - consumedRecords.computeIfPresent(key, (k, v) -> { - v.add(String.valueOf(consumerRecord)); + private void trackConsumedPartitions(String consumerName, int partitionNumber) { + consumedRecords.computeIfAbsent(consumerName, k -> new HashSet<>()); + consumedRecords.computeIfPresent(consumerName, (k, v) -> { + v.add(String.valueOf(partitionNumber)); return v; }); } - - public CountDownLatch getLatch() { - return latch; - } } diff --git a/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/topicsandpartitions/KafkaTopicsAndPartitionsIntegrationTest.java b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/topicsandpartitions/KafkaTopicsAndPartitionsIntegrationTest.java index 309c87125a..de720ef955 100644 --- a/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/topicsandpartitions/KafkaTopicsAndPartitionsIntegrationTest.java +++ b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/topicsandpartitions/KafkaTopicsAndPartitionsIntegrationTest.java @@ -7,8 +7,6 @@ import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; -import java.util.concurrent.TimeUnit; - @SpringBootTest(classes = ThermostatApplicationKafkaApp.class) @EmbeddedKafka(partitions = 2, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"}) public class KafkaTopicsAndPartitionsIntegrationTest { @@ -24,7 +22,7 @@ public class KafkaTopicsAndPartitionsIntegrationTest { @Test public void givenTopic_andConsumerGroup_whenConsumersListenToEvents_thenConsumeItCorrectly() throws Exception { service.measureCelsiusAndPublish(10000); - consumer.getLatch().await(1, TimeUnit.SECONDS); + Thread.sleep(1000); System.out.println(consumer.consumedRecords); } }