Fix formatting and added in direct assignment partition

This commit is contained in:
SGWebFreelancer 2023-12-25 13:33:22 +08:00
parent d10d29e0dc
commit 4a69837406
3 changed files with 32 additions and 7 deletions

View File

@ -18,6 +18,7 @@ public class KafkaMessageConsumer {
@KafkaListener(topics = { "order-topic", "default-topic" }, groupId = "test-group")
public void listen(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition, @Header(KafkaHeaders.RECEIVED_KEY) @Nullable String key) {
ReceivedMessage receivedMessage = new ReceivedMessage(key, message, partition);
System.out.println("Received message: " + receivedMessage);
receivedMessages.add(receivedMessage);
}

View File

@ -16,6 +16,10 @@ public class KafkaProducer {
kafkaTemplate.send(topic, key, message);
}
public void send(String topic, Integer partition, String key, String message) {
kafkaTemplate.send(topic, partition, key, message);
}
public void send(String topic, String message) {
kafkaTemplate.send(topic, message);
}

View File

@ -6,12 +6,14 @@ import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Assert;
@ -59,14 +61,12 @@ public class KafkaApplicationIntegrationTesting {
.size() >= 3);
List<ReceivedMessage> records = kafkaMessageConsumer.getReceivedMessages();
int expectedPartition = records.get(0)
.getPartition();
for (ReceivedMessage record : records) {
if (record.getKey() == null) {
Assert.assertEquals("Message without key should be in the same partition", expectedPartition, record.getPartition());
}
}
Set<Integer> uniquePartitions = records.stream()
.map(ReceivedMessage::getPartition)
.collect(Collectors.toSet());
Assert.assertEquals(1, uniquePartitions.size());
}
@Test
@ -135,6 +135,26 @@ public class KafkaApplicationIntegrationTesting {
}
}
@Test
public void givenDirectPartitionAssignment_whenSendingMessages_shouldRouteToSpecifiedPartitions() throws Exception {
kafkaProducer.send("order-topic", 0, "123_premium", "Premium order message");
kafkaProducer.send("order-topic", 1, "456_normal", "Normal order message");
await().atMost(2, SECONDS)
.until(() -> kafkaMessageConsumer.getReceivedMessages()
.size() >= 2);
List<ReceivedMessage> records = kafkaMessageConsumer.getReceivedMessages();
for (ReceivedMessage record : records) {
if ("123_premium".equals(record.getKey())) {
assertEquals("Premium order message should be in partition 0", 0, record.getPartition());
} else if ("456_normal".equals(record.getKey())) {
assertEquals("Normal order message should be in partition 1", 1, record.getPartition());
}
}
}
@Test
public void givenCustomPartitioner_whenSendingMessages_shouldConsumeOnlyFromSpecificPartition() throws InterruptedException {
KafkaTemplate<String, String> kafkaTemplate = setProducerToUseCustomPartitioner();