From d10d29e0dc598d8f8dae47c2d59141785faeabac Mon Sep 17 00:00:00 2001 From: SGWebFreelancer Date: Mon, 25 Dec 2023 11:25:53 +0800 Subject: [PATCH] Fix comment --- .../CustomPartitioner.java | 1 + .../KafkaApplication.java | 7 ++- .../KafkaMessageConsumer.java | 17 ++++--- .../partitioningstrategy/KafkaProducer.java | 2 +- ...> KafkaApplicationIntegrationTesting.java} | 51 +++++++++++-------- 5 files changed, 44 insertions(+), 34 deletions(-) rename spring-kafka/src/test/java/com/baeldung/partitioningstrategy/{KafkaApplicationUnitTesting.java => KafkaApplicationIntegrationTesting.java} (81%) diff --git a/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/CustomPartitioner.java b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/CustomPartitioner.java index 63db2e1b2d..f02a91f6fc 100644 --- a/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/CustomPartitioner.java +++ b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/CustomPartitioner.java @@ -1,6 +1,7 @@ package com.baeldung.partitioningstrategy; import java.util.Map; + import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; diff --git a/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaApplication.java b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaApplication.java index ddde9ede3a..57ecc5e187 100644 --- a/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaApplication.java +++ b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaApplication.java @@ -2,6 +2,7 @@ package com.baeldung.partitioningstrategy; import java.util.HashMap; import java.util.Map; + import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -18,14 +19,12 @@ import org.springframework.kafka.core.ProducerFactory; public class KafkaApplication { @Bean - public KafkaTemplate kafkaTemplate() - { + public KafkaTemplate kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } @Bean - public ProducerFactory producerFactory() - { + public ProducerFactory producerFactory() { Map configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); diff --git a/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaMessageConsumer.java b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaMessageConsumer.java index 308f34be56..131a112731 100644 --- a/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaMessageConsumer.java +++ b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaMessageConsumer.java @@ -1,24 +1,23 @@ package com.baeldung.partitioningstrategy; -import java.util.ArrayList; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.List; + import org.springframework.kafka.annotation.KafkaListener; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Service; import org.springframework.kafka.support.KafkaHeaders; + import jakarta.annotation.Nullable; @Service public class KafkaMessageConsumer { - private final List receivedMessages = new ArrayList<>(); + private final List receivedMessages = new CopyOnWriteArrayList<>(); - @KafkaListener(topics = {"order-topic", "default-topic"}, groupId = "test-group") - public void listen(@Payload String message, - @Header(KafkaHeaders.RECEIVED_PARTITION) int partition, - @Header(KafkaHeaders.RECEIVED_KEY) @Nullable String key) { + @KafkaListener(topics = { "order-topic", "default-topic" }, groupId = "test-group") + public void listen(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition, @Header(KafkaHeaders.RECEIVED_KEY) @Nullable String key) { ReceivedMessage receivedMessage = new ReceivedMessage(key, message, partition); - System.out.println("Received message: " + receivedMessage); receivedMessages.add(receivedMessage); } @@ -26,5 +25,7 @@ public class KafkaMessageConsumer { return receivedMessages; } - public void clearReceivedMessages() { receivedMessages.clear(); } + public void clearReceivedMessages() { + receivedMessages.clear(); + } } diff --git a/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaProducer.java b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaProducer.java index 380292c5b8..c28f855976 100644 --- a/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaProducer.java +++ b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaProducer.java @@ -16,7 +16,7 @@ public class KafkaProducer { kafkaTemplate.send(topic, key, message); } - public void send(String topic, String message) { + public void send(String topic, String message) { kafkaTemplate.send(topic, message); } } \ No newline at end of file diff --git a/spring-kafka/src/test/java/com/baeldung/partitioningstrategy/KafkaApplicationUnitTesting.java b/spring-kafka/src/test/java/com/baeldung/partitioningstrategy/KafkaApplicationIntegrationTesting.java similarity index 81% rename from spring-kafka/src/test/java/com/baeldung/partitioningstrategy/KafkaApplicationUnitTesting.java rename to spring-kafka/src/test/java/com/baeldung/partitioningstrategy/KafkaApplicationIntegrationTesting.java index 1ae9670deb..1e44ae1714 100644 --- a/spring-kafka/src/test/java/com/baeldung/partitioningstrategy/KafkaApplicationUnitTesting.java +++ b/spring-kafka/src/test/java/com/baeldung/partitioningstrategy/KafkaApplicationIntegrationTesting.java @@ -1,11 +1,13 @@ package com.baeldung.partitioningstrategy; import static org.junit.Assert.assertEquals; + import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; + import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -23,9 +25,12 @@ import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.utils.KafkaTestUtils; +import static org.awaitility.Awaitility.await; +import static java.util.concurrent.TimeUnit.SECONDS; + @SpringBootTest -@EmbeddedKafka(partitions = 3, brokerProperties = {"listeners=PLAINTEXT://localhost:9092"}) -public class KafkaApplicationUnitTesting { +@EmbeddedKafka(partitions = 3, brokerProperties = { "listeners=PLAINTEXT://localhost:9092" }) +public class KafkaApplicationIntegrationTesting { @Autowired private KafkaProducer kafkaProducer; @@ -46,13 +51,16 @@ public class KafkaApplicationUnitTesting { @Test public void givenDefaultPartitioner_whenSendingMessagesWithoutKey_shouldUseStickyDistribution() throws InterruptedException { kafkaProducer.send("default-topic", "message1"); - kafkaProducer.send("default-topic","message2"); - kafkaProducer.send("default-topic","message3"); + kafkaProducer.send("default-topic", "message2"); + kafkaProducer.send("default-topic", "message3"); - Thread.sleep(2000); + await().atMost(2, SECONDS) + .until(() -> kafkaMessageConsumer.getReceivedMessages() + .size() >= 3); List records = kafkaMessageConsumer.getReceivedMessages(); - int expectedPartition = records.get(0).getPartition(); + int expectedPartition = records.get(0) + .getPartition(); for (ReceivedMessage record : records) { if (record.getKey() == null) { @@ -68,17 +76,18 @@ public class KafkaApplicationUnitTesting { kafkaProducer.send("order-topic", "partitionB", "another critical message"); kafkaProducer.send("order-topic", "partitionA", "another more critical data"); - Thread.sleep(1000); + await().atMost(2, SECONDS) + .until(() -> kafkaMessageConsumer.getReceivedMessages() + .size() >= 4); List records = kafkaMessageConsumer.getReceivedMessages(); Map> messagesByKey = groupMessagesByKey(records); messagesByKey.forEach((key, messages) -> { - int expectedPartition = messages.get(0).getPartition(); + int expectedPartition = messages.get(0) + .getPartition(); for (ReceivedMessage message : messages) { - assertEquals("Messages with key '" + key + "' should be in the same partition", - message.getPartition(), - expectedPartition); + assertEquals("Messages with key '" + key + "' should be in the same partition", message.getPartition(), expectedPartition); } }); } @@ -89,7 +98,9 @@ public class KafkaApplicationUnitTesting { kafkaProducer.send("order-topic", "partitionA", "message3"); kafkaProducer.send("order-topic", "partitionA", "message4"); - Thread.sleep(1000); + await().atMost(2, SECONDS) + .until(() -> kafkaMessageConsumer.getReceivedMessages() + .size() >= 3); List records = kafkaMessageConsumer.getReceivedMessages(); @@ -97,11 +108,7 @@ public class KafkaApplicationUnitTesting { records.forEach(record -> resultMessage.append(record.getMessage())); String expectedMessage = "message1message3message4"; - assertEquals( - "Messages with the same key should be received in the order they were produced within a partition", - expectedMessage, - resultMessage.toString() - ); + assertEquals("Messages with the same key should be received in the order they were produced within a partition", expectedMessage, resultMessage.toString()); } @Test @@ -112,7 +119,9 @@ public class KafkaApplicationUnitTesting { kafkaTemplate.send("order-topic", "123_premium", "Order 123, Premium order message"); kafkaTemplate.send("order-topic", "456_normal", "Normal order message"); - Thread.sleep(1000); + await().atMost(2, SECONDS) + .until(() -> kafkaMessageConsumer.getReceivedMessages() + .size() >= 2); List records = kafkaMessageConsumer.getReceivedMessages(); @@ -133,7 +142,9 @@ public class KafkaApplicationUnitTesting { kafkaTemplate.send("order-topic", "123_premium", "Order 123, Premium order message"); kafkaTemplate.send("order-topic", "456_normal", "Normal order message"); - Thread.sleep(1000); + await().atMost(2, SECONDS) + .until(() -> kafkaMessageConsumer.getReceivedMessages() + .size() >= 2); consumer.assign(Collections.singletonList(new TopicPartition("order-topic", 0))); ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); @@ -142,8 +153,6 @@ public class KafkaApplicationUnitTesting { assertEquals("Premium order message should be in partition 0", 0, record.partition()); assertEquals("123_premium", record.key()); } - - consumer.close(); } private KafkaTemplate setProducerToUseCustomPartitioner() {