Merge pull request #14945 from aamolgote/kafka/message-ordering-BAEL-6796

Message Ordering - First Draft.
This commit is contained in:
Liam Williams 2023-11-15 21:05:52 +00:00 committed by GitHub
commit 1a555cd9a7
10 changed files with 487 additions and 6 deletions

View File

@ -1 +0,0 @@
log4j.rootLogger=INFO, stdout

View File

@ -23,11 +23,6 @@
<artifactId>slf4j-api</artifactId>
<version>${org.slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${org.slf4j.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@ -57,6 +52,11 @@
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.databind.version} </version>
</dependency>
</dependencies>
<properties>
@ -64,6 +64,7 @@
<kafka.version>2.8.0</kafka.version>
<testcontainers-kafka.version>1.15.3</testcontainers-kafka.version>
<testcontainers-jupiter.version>1.15.3</testcontainers-jupiter.version>
<jackson.databind.version>2.15.2</jackson.databind.version>
</properties>
</project>

View File

@ -0,0 +1,11 @@
package com.baeldung.kafka.message.ordering;
public class Config {
public static final String CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS = "value.deserializer.serializedClass";
public static final String MULTI_PARTITION_TOPIC = "multi_partition_topic";
public static final String SINGLE_PARTITION_TOPIC = "single_partition_topic";
public static final int MULTIPLE_PARTITIONS = 5;
public static final int SINGLE_PARTITION = 1;
public static final short REPLICATION_FACTOR = 1;
}

View File

@ -0,0 +1,61 @@
package com.baeldung.kafka.message.ordering.payload;
import java.util.Objects;
public class UserEvent implements Comparable<UserEvent> {
private String userEventId;
private long eventNanoTime;
private long globalSequenceNumber;
@SuppressWarnings("unused")
public UserEvent() {
// Required for Jackson Serialization and Deserialization
}
public UserEvent(String userEventId) {
this.userEventId = userEventId;
}
public String getUserEventId() {
return userEventId;
}
public long getEventNanoTime() {
return eventNanoTime;
}
public void setEventNanoTime(long eventNanoTime) {
this.eventNanoTime = eventNanoTime;
}
public long getGlobalSequenceNumber() {
return globalSequenceNumber;
}
public void setGlobalSequenceNumber(long globalSequenceNumber) {
this.globalSequenceNumber = globalSequenceNumber;
}
@Override
public int compareTo(UserEvent other) {
return Long.compare(this.globalSequenceNumber, other.globalSequenceNumber);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof UserEvent)) {
return false;
}
UserEvent userEvent = (UserEvent) obj;
return this.globalSequenceNumber == userEvent.globalSequenceNumber;
}
@Override
public int hashCode() {
return Objects.hash(globalSequenceNumber);
}
}

View File

@ -0,0 +1,34 @@
package com.baeldung.kafka.message.ordering.serialization;
import com.baeldung.kafka.message.ordering.Config;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;
/**
* Configured via {@link org.apache.kafka.clients.consumer.ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
*/
public class JacksonDeserializer<T> implements Deserializer<T> {
private final ObjectMapper objectMapper = new ObjectMapper();
private Class<T> type;
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
this.type = (Class<T>) configs.get(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS);
}
@Override
public T deserialize(String topic, byte[] bytes) {
if (bytes == null) {
return null;
}
try {
return objectMapper.readValue(bytes, type);
} catch (Exception e) {
throw new RuntimeException("Error deserializing value", e);
}
}
}

View File

@ -0,0 +1,24 @@
package com.baeldung.kafka.message.ordering.serialization;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;
/**
* Configured via {@link org.apache.kafka.clients.producer.ProducerConfig#VALUE_SERIALIZER_CLASS_CONFIG}
*/
public class JacksonSerializer<T> implements Serializer<T> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public byte[] serialize(String topic, T data) {
if (data == null) {
return null;
}
try {
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new RuntimeException("Error serializing value", e);
}
}
}

View File

@ -0,0 +1,126 @@
package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.payload.UserEvent;
import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer;
import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import com.google.common.collect.ImmutableList;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
@Testcontainers
public class ExternalSequenceWithTimeWindowLiveTest {
private static Admin admin;
private static KafkaProducer<Long, UserEvent> producer;
private static KafkaConsumer<Long, UserEvent> consumer;
private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofSeconds(5);
private static final long BUFFER_PERIOD_NS = Duration.ofSeconds(5)
.toNanos();
private static Logger logger = LoggerFactory.getLogger(ExternalSequenceWithTimeWindowLiveTest.class);
@Container
private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
@BeforeAll
static void setup() throws ExecutionException, InterruptedException {
KAFKA_CONTAINER.addExposedPort(9092);
Properties adminProperties = new Properties();
adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName());
Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, UserEvent.class);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
admin = Admin.create(adminProperties);
producer = new KafkaProducer<>(producerProperties);
consumer = new KafkaConsumer<>(consumerProperties);
admin.createTopics(ImmutableList.of(new NewTopic(Config.MULTI_PARTITION_TOPIC, Config.MULTIPLE_PARTITIONS, Config.REPLICATION_FACTOR)))
.all()
.get();
}
@AfterAll
static void destroy() {
KAFKA_CONTAINER.stop();
}
@Test
void givenMultiplePartitions_whenPublishedToKafkaAndConsumedWithExtSeqNumberAndTimeWindow_thenCheckForMessageOrder() throws ExecutionException, InterruptedException {
List<UserEvent> sentUserEventList = new ArrayList<>();
List<UserEvent> receivedUserEventList = new ArrayList<>();
for (long sequenceNumber = 1; sequenceNumber <= 10; sequenceNumber++) {
UserEvent userEvent = new UserEvent(UUID.randomUUID()
.toString());
userEvent.setEventNanoTime(System.nanoTime());
userEvent.setGlobalSequenceNumber(sequenceNumber);
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Config.MULTI_PARTITION_TOPIC, sequenceNumber, userEvent));
sentUserEventList.add(userEvent);
RecordMetadata metadata = future.get();
logger.info("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition());
}
consumer.subscribe(Collections.singletonList(Config.MULTI_PARTITION_TOPIC));
List<UserEvent> buffer = new ArrayList<>();
long lastProcessedTime = System.nanoTime();
ConsumerRecords<Long, UserEvent> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> {
buffer.add(record.value());
});
while (!buffer.isEmpty()) {
if (System.nanoTime() - lastProcessedTime > BUFFER_PERIOD_NS) {
processBuffer(buffer, receivedUserEventList);
lastProcessedTime = System.nanoTime();
}
records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> {
buffer.add(record.value());
});
}
assertThat(receivedUserEventList).isEqualTo(sentUserEventList)
.containsExactlyElementsOf(sentUserEventList);
}
private static void processBuffer(List<UserEvent> buffer, List<UserEvent> receivedUserEventList) {
Collections.sort(buffer);
buffer.forEach(userEvent -> {
receivedUserEventList.add(userEvent);
logger.info("Processing message with Global Sequence number: " + userEvent.getGlobalSequenceNumber() + ", User Event Id: " + userEvent.getUserEventId());
});
buffer.clear();
}
}

View File

@ -0,0 +1,105 @@
package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.payload.UserEvent;
import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer;
import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import com.google.common.collect.ImmutableList;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
@Testcontainers
public class MultiplePartitionLiveTest {
private static Admin admin;
private static KafkaProducer<Long, UserEvent> producer;
private static KafkaConsumer<Long, UserEvent> consumer;
private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofSeconds(5);
private static Logger logger = LoggerFactory.getLogger(MultiplePartitionLiveTest.class);
@Container
private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
@BeforeAll
static void setup() throws ExecutionException, InterruptedException {
KAFKA_CONTAINER.addExposedPort(9092);
Properties adminProperties = new Properties();
adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName());
Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, UserEvent.class);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
admin = Admin.create(adminProperties);
producer = new KafkaProducer<>(producerProperties);
consumer = new KafkaConsumer<>(consumerProperties);
admin.createTopics(ImmutableList.of(new NewTopic(Config.MULTI_PARTITION_TOPIC, Config.MULTIPLE_PARTITIONS, Config.REPLICATION_FACTOR)))
.all()
.get();
}
@AfterAll
static void destroy() {
KAFKA_CONTAINER.stop();
}
@Test
void givenMultiplePartitions_whenPublishedToKafkaAndConsumed_thenCheckForMessageOrder() throws ExecutionException, InterruptedException {
List<UserEvent> sentUserEventList = new ArrayList<>();
List<UserEvent> receivedUserEventList = new ArrayList<>();
for (long sequenceNumber = 1; sequenceNumber <= 10; sequenceNumber++) {
UserEvent userEvent = new UserEvent(UUID.randomUUID()
.toString());
userEvent.setGlobalSequenceNumber(sequenceNumber);
userEvent.setEventNanoTime(System.nanoTime());
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Config.MULTI_PARTITION_TOPIC, sequenceNumber, userEvent));
sentUserEventList.add(userEvent);
RecordMetadata metadata = future.get();
logger.info("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition());
}
consumer.subscribe(Collections.singletonList(Config.MULTI_PARTITION_TOPIC));
ConsumerRecords<Long, UserEvent> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> {
UserEvent userEvent = record.value();
receivedUserEventList.add(userEvent);
logger.info("User Event ID: " + userEvent.getUserEventId());
});
assertThat(receivedUserEventList).isNotEqualTo(sentUserEventList)
.containsExactlyInAnyOrderElementsOf(sentUserEventList);
}
}

View File

@ -0,0 +1,109 @@
package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.payload.UserEvent;
import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer;
import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import com.google.common.collect.ImmutableList;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
@Testcontainers
public class SinglePartitionLiveTest {
private static Admin admin;
private static KafkaProducer<Long, UserEvent> producer;
private static KafkaConsumer<Long, UserEvent> consumer;
private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofSeconds(5);
private static Logger logger = LoggerFactory.getLogger(SinglePartitionLiveTest.class);
@Container
private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
@BeforeAll
static void setup() throws ExecutionException, InterruptedException {
KAFKA_CONTAINER.addExposedPort(9092);
Properties adminProperties = new Properties();
adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName());
Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, UserEvent.class);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
admin = Admin.create(adminProperties);
producer = new KafkaProducer<>(producerProperties);
consumer = new KafkaConsumer<>(consumerProperties);
admin.createTopics(ImmutableList.of(new NewTopic(Config.SINGLE_PARTITION_TOPIC, Config.SINGLE_PARTITION, Config.REPLICATION_FACTOR)))
.all()
.get();
}
@AfterAll
static void destroy() {
KAFKA_CONTAINER.stop();
}
@Test
void givenASinglePartition_whenPublishedToKafkaAndConsumed_thenCheckForMessageOrder() throws ExecutionException, InterruptedException {
List<UserEvent> sentUserEventList = new ArrayList<>();
List<UserEvent> receivedUserEventList = new ArrayList<>();
for (long sequenceNumber = 1; sequenceNumber <= 10; sequenceNumber++) {
UserEvent userEvent = new UserEvent(UUID.randomUUID()
.toString());
userEvent.setGlobalSequenceNumber(sequenceNumber);
userEvent.setEventNanoTime(System.nanoTime());
ProducerRecord<Long, UserEvent> producerRecord = new ProducerRecord<>(Config.SINGLE_PARTITION_TOPIC, userEvent);
Future<RecordMetadata> future = producer.send(producerRecord);
sentUserEventList.add(userEvent);
RecordMetadata metadata = future.get();
logger.info("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition());
}
consumer.subscribe(Collections.singletonList(Config.SINGLE_PARTITION_TOPIC));
ConsumerRecords<Long, UserEvent> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> {
UserEvent userEvent = record.value();
receivedUserEventList.add(userEvent);
logger.info("User Event ID: " + userEvent.getUserEventId());
});
assertThat(receivedUserEventList).isEqualTo(sentUserEventList)
.containsExactlyElementsOf(sentUserEventList);
}
}

View File

@ -0,0 +1,11 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>