Single and Multiple Partition test.
This commit is contained in:
parent
307d0f5e40
commit
8772eaa673
|
@ -60,7 +60,7 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.fasterxml.jackson.core</groupId>
|
<groupId>com.fasterxml.jackson.core</groupId>
|
||||||
<artifactId>jackson-databind</artifactId>
|
<artifactId>jackson-databind</artifactId>
|
||||||
<version>2.15.2</version>
|
<version>${jackson.databind.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
|
@ -69,6 +69,7 @@
|
||||||
<kafka.version>2.8.0</kafka.version>
|
<kafka.version>2.8.0</kafka.version>
|
||||||
<testcontainers-kafka.version>1.15.3</testcontainers-kafka.version>
|
<testcontainers-kafka.version>1.15.3</testcontainers-kafka.version>
|
||||||
<testcontainers-jupiter.version>1.15.3</testcontainers-jupiter.version>
|
<testcontainers-jupiter.version>1.15.3</testcontainers-jupiter.version>
|
||||||
|
<jackson.databind.version>2.15.2</jackson.databind.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
</project>
|
</project>
|
|
@ -1,5 +1,6 @@
|
||||||
package com.baeldung.kafka.message.ordering.payload;
|
package com.baeldung.kafka.message.ordering.payload;
|
||||||
|
|
||||||
|
import javax.swing.*;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
public class Message implements Comparable<Message> {
|
public class Message implements Comparable<Message> {
|
||||||
|
@ -10,6 +11,7 @@ public class Message implements Comparable<Message> {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//Required for Kafka Serialization and Deserialization
|
||||||
public Message(long insertPosition, long messageId) {
|
public Message(long insertPosition, long messageId) {
|
||||||
this.insertPosition = insertPosition;
|
this.insertPosition = insertPosition;
|
||||||
this.messageId = messageId;
|
this.messageId = messageId;
|
||||||
|
@ -28,6 +30,18 @@ public class Message implements Comparable<Message> {
|
||||||
return Long.compare(this.messageId, other.messageId);
|
return Long.compare(this.messageId, other.messageId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (obj == this) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (!(obj instanceof Message)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
Message message = (Message) obj;
|
||||||
|
return this.messageId == message.getMessageId() && this.insertPosition == message.getInsertPosition();
|
||||||
|
}
|
||||||
|
|
||||||
public static long getRandomMessageId() {
|
public static long getRandomMessageId() {
|
||||||
Random rand = new Random();
|
Random rand = new Random();
|
||||||
return rand.nextInt(1000);
|
return rand.nextInt(1000);
|
||||||
|
|
|
@ -0,0 +1,114 @@
|
||||||
|
package com.baeldung.kafka.message.ordering;

import com.baeldung.kafka.message.ordering.payload.Message;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import static org.junit.jupiter.api.Assertions.assertFalse;

/**
 * Verifies that publishing keyed messages to a multi-partition topic does NOT
 * preserve global ordering on the consumer side: Kafka guarantees order only
 * within a single partition, so consuming across {@code PARTITIONS} partitions
 * interleaves records relative to send order.
 */
@Testcontainers
public class MultiplePartitionTest {

    private static final String TOPIC = "multi_partition_topic";
    private static final int PARTITIONS = 5;
    private static final short REPLICATION_FACTOR = 1;
    private static final int MESSAGE_COUNT = 10;
    private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofMillis(5000);

    private static Admin admin;
    private static KafkaProducer<String, Message> producer;
    private static KafkaConsumer<String, Message> consumer;

    @Container
    private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));

    @BeforeAll
    static void setup() throws ExecutionException, InterruptedException {
        KAFKA_CONTAINER.addExposedPort(9092);

        Properties adminProperties = new Properties();
        adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());

        Properties producerProperties = new Properties();
        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.message.ordering.serialization.JacksonSerializer");

        Properties consumerProperties = new Properties();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer");
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Target type for the custom JacksonDeserializer.
        // NOTE(review): this stores a Class object in Properties — presumably the
        // deserializer reads it back in configure(); confirm it does not expect a String.
        consumerProperties.put("value.deserializer.serializedClass", Message.class);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");

        admin = Admin.create(adminProperties);
        producer = new KafkaProducer<>(producerProperties);
        consumer = new KafkaConsumer<>(consumerProperties);

        List<NewTopic> topicList = new ArrayList<>();
        topicList.add(new NewTopic(TOPIC, PARTITIONS, REPLICATION_FACTOR));
        CreateTopicsResult result = admin.createTopics(topicList);
        KafkaFuture<Void> future = result.values().get(TOPIC);
        // Block until topic creation completes so the test never races the broker.
        future.whenComplete((voidResult, exception) -> {
            if (exception != null) {
                System.err.println("Error creating the topic: " + exception.getMessage());
            } else {
                System.out.println("Topic created successfully!");
            }
        }).get();
    }

    @AfterAll
    static void destroy() {
        // Close the Kafka clients before stopping the broker container so
        // buffered state is flushed and no client threads leak between tests.
        consumer.close();
        producer.close();
        admin.close();
        KAFKA_CONTAINER.stop();
    }

    @Test
    void givenMultiplePartitions_whenPublishedToKafkaAndConsumed_thenCheckForMessageOrder() throws ExecutionException, InterruptedException {
        List<Message> sentMessageList = new ArrayList<>();
        List<Message> receivedMessageList = new ArrayList<>();

        for (long insertPosition = 1; insertPosition <= MESSAGE_COUNT; insertPosition++) {
            long messageId = Message.getRandomMessageId();
            // Distinct keys hash to different partitions — this is what breaks global order.
            String key = "Key-" + insertPosition;
            Message message = new Message(insertPosition, messageId);
            Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC, key, message));
            sentMessageList.add(message);
            // Wait for the broker ack so the send order is deterministic.
            RecordMetadata metadata = future.get();
            System.out.println("Partition : " + metadata.partition());
        }

        consumer.subscribe(Collections.singletonList(TOPIC));
        ConsumerRecords<String, Message> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
        records.forEach(record -> receivedMessageList.add(record.value()));

        // Compare the received prefix against send order; stop at the first mismatch.
        boolean isOrderMaintained = true;
        for (int i = 0; i < receivedMessageList.size(); i++) {
            if (!sentMessageList.get(i).equals(receivedMessageList.get(i))) {
                isOrderMaintained = false;
                break;
            }
        }
        // With 5 partitions, interleaved consumption is expected to break ordering.
        assertFalse(isOrderMaintained);
    }
}
|
|
@ -0,0 +1,115 @@
|
||||||
|
package com.baeldung.kafka.message.ordering;

import com.baeldung.kafka.message.ordering.payload.Message;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
 * Verifies that publishing to a single-partition topic DOES preserve ordering:
 * Kafka guarantees per-partition order, so with one partition the consumer
 * must receive messages exactly in send order.
 */
@Testcontainers
public class SinglePartitionTest {

    private static final String TOPIC = "single_partition_topic";
    private static final int PARTITIONS = 1;
    private static final short REPLICATION_FACTOR = 1;
    private static final int MESSAGE_COUNT = 10;
    private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofMillis(5000);

    private static Admin admin;
    private static KafkaProducer<String, Message> producer;
    private static KafkaConsumer<String, Message> consumer;

    @Container
    private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));

    @BeforeAll
    static void setup() throws ExecutionException, InterruptedException {
        KAFKA_CONTAINER.addExposedPort(9092);

        Properties adminProperties = new Properties();
        adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());

        Properties producerProperties = new Properties();
        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.message.ordering.serialization.JacksonSerializer");

        Properties consumerProperties = new Properties();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer");
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Target type for the custom JacksonDeserializer.
        // NOTE(review): this stores a Class object in Properties — presumably the
        // deserializer reads it back in configure(); confirm it does not expect a String.
        consumerProperties.put("value.deserializer.serializedClass", Message.class);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");

        admin = Admin.create(adminProperties);
        producer = new KafkaProducer<>(producerProperties);
        consumer = new KafkaConsumer<>(consumerProperties);

        List<NewTopic> topicList = new ArrayList<>();
        topicList.add(new NewTopic(TOPIC, PARTITIONS, REPLICATION_FACTOR));
        CreateTopicsResult result = admin.createTopics(topicList);
        KafkaFuture<Void> future = result.values().get(TOPIC);
        // Block until topic creation completes so the test never races the broker.
        future.whenComplete((voidResult, exception) -> {
            if (exception != null) {
                System.err.println("Error creating the topic: " + exception.getMessage());
            } else {
                System.out.println("Topic created successfully!");
            }
        }).get();
    }

    @AfterAll
    static void destroy() {
        // Close the Kafka clients before stopping the broker container so
        // buffered state is flushed and no client threads leak between tests.
        consumer.close();
        producer.close();
        admin.close();
        KAFKA_CONTAINER.stop();
    }

    @Test
    void givenASinglePartition_whenPublishedToKafkaAndConsumed_thenCheckForMessageOrder() throws ExecutionException, InterruptedException {
        List<Message> sentMessageList = new ArrayList<>();
        List<Message> receivedMessageList = new ArrayList<>();

        for (long insertPosition = 1; insertPosition <= MESSAGE_COUNT; insertPosition++) {
            long messageId = Message.getRandomMessageId();
            String key = "Key-" + insertPosition;
            Message message = new Message(insertPosition, messageId);
            ProducerRecord<String, Message> producerRecord = new ProducerRecord<>(TOPIC, key, message);
            Future<RecordMetadata> future = producer.send(producerRecord);
            sentMessageList.add(message);
            // Wait for the broker ack so the send order is deterministic.
            RecordMetadata metadata = future.get();
            System.out.println("Partition : " + metadata.partition());
        }

        consumer.subscribe(Collections.singletonList(TOPIC));
        ConsumerRecords<String, Message> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
        records.forEach(record -> receivedMessageList.add(record.value()));

        // List.equals is element-wise and size-aware, so this both detects any
        // reordering and fails cleanly (with a diff) if fewer than MESSAGE_COUNT
        // records were polled — the previous hard-coded 0..9 index loop threw
        // IndexOutOfBoundsException in that case instead of failing the assertion.
        assertEquals(sentMessageList, receivedMessageList);
    }
}
|
Loading…
Reference in New Issue