Remove the Kafka Producer wrapper, fix the class name

This commit is contained in:
SGWebFreelancer 2023-12-27 09:29:52 +08:00
parent 4a69837406
commit f58b136ff9
2 changed files with 18 additions and 44 deletions

View File

@ -1,26 +0,0 @@
package com.baeldung.partitioningstrategy;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void send(String topic, String key, String message) {
kafkaTemplate.send(topic, key, message);
}
public void send(String topic, Integer partition, String key, String message) {
kafkaTemplate.send(topic, partition, key, message);
}
public void send(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}

View File

@ -13,10 +13,8 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Assert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
@ -32,9 +30,10 @@ import static java.util.concurrent.TimeUnit.SECONDS;
@SpringBootTest
@EmbeddedKafka(partitions = 3, brokerProperties = { "listeners=PLAINTEXT://localhost:9092" })
public class KafkaApplicationIntegrationTesting {
public class KafkaApplicationIntegrationTest {
@Autowired
private KafkaProducer kafkaProducer;
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private KafkaMessageConsumer kafkaMessageConsumer;
@ -52,9 +51,9 @@ public class KafkaApplicationIntegrationTesting {
@Test
public void givenDefaultPartitioner_whenSendingMessagesWithoutKey_shouldUseStickyDistribution() throws InterruptedException {
kafkaProducer.send("default-topic", "message1");
kafkaProducer.send("default-topic", "message2");
kafkaProducer.send("default-topic", "message3");
kafkaTemplate.send("default-topic", "message1");
kafkaTemplate.send("default-topic", "message2");
kafkaTemplate.send("default-topic", "message3");
await().atMost(2, SECONDS)
.until(() -> kafkaMessageConsumer.getReceivedMessages()
@ -66,15 +65,15 @@ public class KafkaApplicationIntegrationTesting {
.map(ReceivedMessage::getPartition)
.collect(Collectors.toSet());
Assert.assertEquals(1, uniquePartitions.size());
assertEquals(1, uniquePartitions.size());
}
@Test
void givenProducerWithSameKeyMessages_whenSendingMessages_shouldRouteToSamePartition() throws InterruptedException {
kafkaProducer.send("order-topic", "partitionA", "critical data");
kafkaProducer.send("order-topic", "partitionA", "more critical data");
kafkaProducer.send("order-topic", "partitionB", "another critical message");
kafkaProducer.send("order-topic", "partitionA", "another more critical data");
kafkaTemplate.send("order-topic", "partitionA", "critical data");
kafkaTemplate.send("order-topic", "partitionA", "more critical data");
kafkaTemplate.send("order-topic", "partitionB", "another critical message");
kafkaTemplate.send("order-topic", "partitionA", "another more critical data");
await().atMost(2, SECONDS)
.until(() -> kafkaMessageConsumer.getReceivedMessages()
@ -94,9 +93,9 @@ public class KafkaApplicationIntegrationTesting {
@Test
public void givenProducerWithSameKeyMessages_whenSendingMessages_shouldReceiveInProducedOrder() throws InterruptedException {
kafkaProducer.send("order-topic", "partitionA", "message1");
kafkaProducer.send("order-topic", "partitionA", "message3");
kafkaProducer.send("order-topic", "partitionA", "message4");
kafkaTemplate.send("order-topic", "partitionA", "message1");
kafkaTemplate.send("order-topic", "partitionA", "message3");
kafkaTemplate.send("order-topic", "partitionA", "message4");
await().atMost(2, SECONDS)
.until(() -> kafkaMessageConsumer.getReceivedMessages()
@ -108,7 +107,8 @@ public class KafkaApplicationIntegrationTesting {
records.forEach(record -> resultMessage.append(record.getMessage()));
String expectedMessage = "message1message3message4";
assertEquals("Messages with the same key should be received in the order they were produced within a partition", expectedMessage, resultMessage.toString());
assertEquals("Messages with the same key should be received in the order they were produced within a partition", expectedMessage,
resultMessage.toString());
}
@Test
@ -137,8 +137,8 @@ public class KafkaApplicationIntegrationTesting {
@Test
public void givenDirectPartitionAssignment_whenSendingMessages_shouldRouteToSpecifiedPartitions() throws Exception {
kafkaProducer.send("order-topic", 0, "123_premium", "Premium order message");
kafkaProducer.send("order-topic", 1, "456_normal", "Normal order message");
kafkaTemplate.send("order-topic", 0, "123_premium", "Premium order message");
kafkaTemplate.send("order-topic", 1, "456_normal", "Normal order message");
await().atMost(2, SECONDS)
.until(() -> kafkaMessageConsumer.getReceivedMessages()