Fix comment

This commit is contained in:
SGWebFreelancer 2023-12-25 11:25:53 +08:00
parent c3b97034ed
commit d10d29e0dc
5 changed files with 44 additions and 34 deletions

View File

@ -1,6 +1,7 @@
package com.baeldung.partitioningstrategy;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

View File

@ -2,6 +2,7 @@ package com.baeldung.partitioningstrategy;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
@ -18,14 +19,12 @@ import org.springframework.kafka.core.ProducerFactory;
public class KafkaApplication {
@Bean
public KafkaTemplate<String, String> kafkaTemplate()
{
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory()
{
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

View File

@ -1,24 +1,23 @@
package com.baeldung.partitioningstrategy;
import java.util.ArrayList;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.List;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import org.springframework.kafka.support.KafkaHeaders;
import jakarta.annotation.Nullable;
@Service
public class KafkaMessageConsumer {
private final List<ReceivedMessage> receivedMessages = new ArrayList<>();
private final List<ReceivedMessage> receivedMessages = new CopyOnWriteArrayList<>();
@KafkaListener(topics = {"order-topic", "default-topic"}, groupId = "test-group")
public void listen(@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.RECEIVED_KEY) @Nullable String key) {
@KafkaListener(topics = { "order-topic", "default-topic" }, groupId = "test-group")
public void listen(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition, @Header(KafkaHeaders.RECEIVED_KEY) @Nullable String key) {
ReceivedMessage receivedMessage = new ReceivedMessage(key, message, partition);
System.out.println("Received message: " + receivedMessage);
receivedMessages.add(receivedMessage);
}
@ -26,5 +25,7 @@ public class KafkaMessageConsumer {
return receivedMessages;
}
public void clearReceivedMessages() { receivedMessages.clear(); }
public void clearReceivedMessages() {
receivedMessages.clear();
}
}

View File

@ -16,7 +16,7 @@ public class KafkaProducer {
kafkaTemplate.send(topic, key, message);
}
public void send(String topic, String message) {
public void send(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}

View File

@ -1,11 +1,13 @@
package com.baeldung.partitioningstrategy;
import static org.junit.Assert.assertEquals;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@ -23,9 +25,12 @@ import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import static org.awaitility.Awaitility.await;
import static java.util.concurrent.TimeUnit.SECONDS;
@SpringBootTest
@EmbeddedKafka(partitions = 3, brokerProperties = {"listeners=PLAINTEXT://localhost:9092"})
public class KafkaApplicationUnitTesting {
@EmbeddedKafka(partitions = 3, brokerProperties = { "listeners=PLAINTEXT://localhost:9092" })
public class KafkaApplicationIntegrationTesting {
@Autowired
private KafkaProducer kafkaProducer;
@ -46,13 +51,16 @@ public class KafkaApplicationUnitTesting {
@Test
public void givenDefaultPartitioner_whenSendingMessagesWithoutKey_shouldUseStickyDistribution() throws InterruptedException {
kafkaProducer.send("default-topic", "message1");
kafkaProducer.send("default-topic","message2");
kafkaProducer.send("default-topic","message3");
kafkaProducer.send("default-topic", "message2");
kafkaProducer.send("default-topic", "message3");
Thread.sleep(2000);
await().atMost(2, SECONDS)
.until(() -> kafkaMessageConsumer.getReceivedMessages()
.size() >= 3);
List<ReceivedMessage> records = kafkaMessageConsumer.getReceivedMessages();
int expectedPartition = records.get(0).getPartition();
int expectedPartition = records.get(0)
.getPartition();
for (ReceivedMessage record : records) {
if (record.getKey() == null) {
@ -68,17 +76,18 @@ public class KafkaApplicationUnitTesting {
kafkaProducer.send("order-topic", "partitionB", "another critical message");
kafkaProducer.send("order-topic", "partitionA", "another more critical data");
Thread.sleep(1000);
await().atMost(2, SECONDS)
.until(() -> kafkaMessageConsumer.getReceivedMessages()
.size() >= 4);
List<ReceivedMessage> records = kafkaMessageConsumer.getReceivedMessages();
Map<String, List<ReceivedMessage>> messagesByKey = groupMessagesByKey(records);
messagesByKey.forEach((key, messages) -> {
int expectedPartition = messages.get(0).getPartition();
int expectedPartition = messages.get(0)
.getPartition();
for (ReceivedMessage message : messages) {
assertEquals("Messages with key '" + key + "' should be in the same partition",
message.getPartition(),
expectedPartition);
assertEquals("Messages with key '" + key + "' should be in the same partition", message.getPartition(), expectedPartition);
}
});
}
@ -89,7 +98,9 @@ public class KafkaApplicationUnitTesting {
kafkaProducer.send("order-topic", "partitionA", "message3");
kafkaProducer.send("order-topic", "partitionA", "message4");
Thread.sleep(1000);
await().atMost(2, SECONDS)
.until(() -> kafkaMessageConsumer.getReceivedMessages()
.size() >= 3);
List<ReceivedMessage> records = kafkaMessageConsumer.getReceivedMessages();
@ -97,11 +108,7 @@ public class KafkaApplicationUnitTesting {
records.forEach(record -> resultMessage.append(record.getMessage()));
String expectedMessage = "message1message3message4";
assertEquals(
"Messages with the same key should be received in the order they were produced within a partition",
expectedMessage,
resultMessage.toString()
);
assertEquals("Messages with the same key should be received in the order they were produced within a partition", expectedMessage, resultMessage.toString());
}
@Test
@ -112,7 +119,9 @@ public class KafkaApplicationUnitTesting {
kafkaTemplate.send("order-topic", "123_premium", "Order 123, Premium order message");
kafkaTemplate.send("order-topic", "456_normal", "Normal order message");
Thread.sleep(1000);
await().atMost(2, SECONDS)
.until(() -> kafkaMessageConsumer.getReceivedMessages()
.size() >= 2);
List<ReceivedMessage> records = kafkaMessageConsumer.getReceivedMessages();
@ -133,7 +142,9 @@ public class KafkaApplicationUnitTesting {
kafkaTemplate.send("order-topic", "123_premium", "Order 123, Premium order message");
kafkaTemplate.send("order-topic", "456_normal", "Normal order message");
Thread.sleep(1000);
await().atMost(2, SECONDS)
.until(() -> kafkaMessageConsumer.getReceivedMessages()
.size() >= 2);
consumer.assign(Collections.singletonList(new TopicPartition("order-topic", 0)));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
@ -142,8 +153,6 @@ public class KafkaApplicationUnitTesting {
assertEquals("Premium order message should be in partition 0", 0, record.partition());
assertEquals("123_premium", record.key());
}
consumer.close();
}
private KafkaTemplate<String, String> setProducerToUseCustomPartitioner() {