Merge pull request #9366 from dev-chirag/master

BAEL3889 - Kafka Mock Producer
This commit is contained in:
Eric Martin 2020-06-07 10:46:27 -05:00 committed by GitHub
commit 21eb84e20d
4 changed files with 180 additions and 0 deletions

View File

@ -153,6 +153,13 @@
<artifactId>renjin-script-engine</artifactId>
<version>${renjin.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
</dependencies>
<repositories>

View File

@ -0,0 +1,17 @@
package com.baeldung.kafka.producer;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.Cluster;
public class EvenOddPartitioner extends DefaultPartitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (((String) key).length() % 2 == 0) {
return 0;
}
return 1;
}
}

View File

@ -0,0 +1,40 @@
package com.baeldung.kafka.producer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.concurrent.Future;
public class KafkaProducer {
private final Producer<String, String> producer;
public KafkaProducer(Producer<String, String> producer) {
this.producer = producer;
}
public Future<RecordMetadata> send(String key, String value) {
ProducerRecord record = new ProducerRecord("topic_sports_news",
key, value);
return producer.send(record);
}
public void flush() {
producer.flush();
}
public void beginTransaction() {
producer.beginTransaction();
}
public void initTransaction() {
producer.initTransactions();
}
public void commitTransaction() {
producer.commitTransaction();
}
}

View File

@ -0,0 +1,116 @@
package com.baeldung.kafka.producer;
import com.baeldung.kafka.producer.EvenOddPartitioner;
import com.baeldung.kafka.producer.KafkaProducer;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static java.util.Collections.emptySet;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
class KafkaProducerUnitTest {
private final String TOPIC_NAME = "topic_sports_news";
private KafkaProducer kafkaProducer;
private MockProducer<String, String> mockProducer;
private void buildMockProducer(boolean autoComplete) {
this.mockProducer = new MockProducer<>(autoComplete, new StringSerializer(), new StringSerializer());
}
@Test
void givenKeyValue_whenSend_thenVerifyHistory() throws ExecutionException, InterruptedException {
buildMockProducer(true);
//when
kafkaProducer = new KafkaProducer(mockProducer);
Future<RecordMetadata> recordMetadataFuture = kafkaProducer.send("data", "{\"site\" : \"baeldung\"}");
//then
assertTrue(mockProducer.history().size() == 1);
assertTrue(mockProducer.history().get(0).key().equalsIgnoreCase("data"));
assertTrue(recordMetadataFuture.get().partition() == 0);
}
@Test
void givenKeyValue_whenSend_thenSendOnlyAfterFlush() {
buildMockProducer(false);
//when
kafkaProducer = new KafkaProducer(mockProducer);
Future<RecordMetadata> record = kafkaProducer.send("data", "{\"site\" : \"baeldung\"}");
assertFalse(record.isDone());
//then
kafkaProducer.flush();
assertTrue(record.isDone());
}
@Test
void givenKeyValue_whenSend_thenReturnException() {
buildMockProducer(false);
//when
kafkaProducer = new KafkaProducer(mockProducer);
Future<RecordMetadata> record = kafkaProducer.send("site", "{\"site\" : \"baeldung\"}");
RuntimeException e = new RuntimeException();
mockProducer.errorNext(e);
//then
try {
record.get();
} catch (ExecutionException | InterruptedException ex) {
assertEquals(e, ex.getCause());
}
assertTrue(record.isDone());
}
@Test
void givenKeyValue_whenSendWithTxn_thenSendOnlyOnTxnCommit() {
buildMockProducer(true);
//when
kafkaProducer = new KafkaProducer(mockProducer);
kafkaProducer.initTransaction();
kafkaProducer.beginTransaction();
Future<RecordMetadata> record = kafkaProducer.send("data", "{\"site\" : \"baeldung\"}");
//then
assertTrue(mockProducer.history().isEmpty());
kafkaProducer.commitTransaction();
assertTrue(mockProducer.history().size() == 1);
}
@Test
void givenKeyValue_whenSendWithPartitioning_thenVerifyPartitionNumber() throws ExecutionException, InterruptedException {
PartitionInfo partitionInfo0 = new PartitionInfo(TOPIC_NAME, 0, null, null, null);
PartitionInfo partitionInfo1 = new PartitionInfo(TOPIC_NAME, 1, null, null, null);
List<PartitionInfo> list = new ArrayList<>();
list.add(partitionInfo0);
list.add(partitionInfo1);
Cluster cluster = new Cluster("kafkab", new ArrayList<Node>(), list, emptySet(), emptySet());
this.mockProducer = new MockProducer<>(cluster, true, new EvenOddPartitioner(), new StringSerializer(), new StringSerializer());
//when
kafkaProducer = new KafkaProducer(mockProducer);
Future<RecordMetadata> recordMetadataFuture = kafkaProducer.send("partition", "{\"site\" : \"baeldung\"}");
//then
assertTrue(recordMetadataFuture.get().partition() == 1);
}
}