diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/payload/UserEvent.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/payload/UserEvent.java index 676b469ce8..99e0cc6c7e 100644 --- a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/payload/UserEvent.java +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/payload/UserEvent.java @@ -1,13 +1,14 @@ package com.baeldung.kafka.message.ordering.payload; import java.util.Objects; + public class UserEvent implements Comparable { private String userEventId; private long eventNanoTime; private long globalSequenceNumber; @SuppressWarnings("unused") - public UserEvent(){ + public UserEvent() { // Required for Jackson Serialization and Deserialization } diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/serialization/JacksonDeserializer.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/serialization/JacksonDeserializer.java index 4868ecaf2e..cf72ab12df 100644 --- a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/serialization/JacksonDeserializer.java +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/serialization/JacksonDeserializer.java @@ -1,6 +1,8 @@ package com.baeldung.kafka.message.ordering.serialization; + import com.baeldung.kafka.message.ordering.Config; import com.fasterxml.jackson.databind.ObjectMapper; + import org.apache.kafka.common.serialization.Deserializer; import java.util.Map; @@ -12,7 +14,6 @@ public class JacksonDeserializer implements Deserializer { private final ObjectMapper objectMapper = new ObjectMapper(); private Class type; - @Override public void configure(Map configs, boolean isKey) { this.type = (Class) configs.get(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS); diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/serialization/JacksonSerializer.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/serialization/JacksonSerializer.java index 4c081de3cc..b2ace3b8ed 100644 --- a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/serialization/JacksonSerializer.java +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/serialization/JacksonSerializer.java @@ -1,6 +1,7 @@ package com.baeldung.kafka.message.ordering.serialization; import com.fasterxml.jackson.databind.ObjectMapper; + import org.apache.kafka.common.serialization.Serializer; /** diff --git a/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/ExternalSequenceWithTimeWindowIntegrationTest.java b/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/ExternalSequenceWithTimeWindowIntegrationTest.java index a5ec7a98a3..caffe12620 100644 --- a/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/ExternalSequenceWithTimeWindowIntegrationTest.java +++ b/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/ExternalSequenceWithTimeWindowIntegrationTest.java @@ -4,6 +4,7 @@ import com.baeldung.kafka.headers.KafkaMessageHeaders; import com.baeldung.kafka.message.ordering.payload.UserEvent; import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer; import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer; + import org.apache.kafka.clients.admin.*; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -23,11 +24,14 @@ import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; + import java.time.Duration; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; + import com.google.common.collect.ImmutableList; + import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; @Testcontainers @@ -37,8 +41,8 @@ public class ExternalSequenceWithTimeWindowIntegrationTest { private static KafkaProducer producer; private static KafkaConsumer consumer; private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofSeconds(5); - private static final long BUFFER_PERIOD_NS = Duration.ofSeconds(5).toNanos(); - + private static final long BUFFER_PERIOD_NS = Duration.ofSeconds(5) + .toNanos(); private static Logger logger = LoggerFactory.getLogger(ExternalSequenceWithTimeWindowIntegrationTest.class); @Container @@ -66,7 +70,9 @@ public class ExternalSequenceWithTimeWindowIntegrationTest { admin = Admin.create(adminProperties); producer = new KafkaProducer<>(producerProperties); consumer = new KafkaConsumer<>(consumerProperties); - admin.createTopics(ImmutableList.of(new NewTopic(Config.MULTI_PARTITION_TOPIC, Config.MULTIPLE_PARTITIONS, Config.REPLICATION_FACTOR))).all().get(); + admin.createTopics(ImmutableList.of(new NewTopic(Config.MULTI_PARTITION_TOPIC, Config.MULTIPLE_PARTITIONS, Config.REPLICATION_FACTOR))) + .all() + .get(); } @AfterAll @@ -78,8 +84,9 @@ public class ExternalSequenceWithTimeWindowIntegrationTest { void givenMultiplePartitions_whenPublishedToKafkaAndConsumedWithExtSeqNumberAndTimeWindow_thenCheckForMessageOrder() throws ExecutionException, InterruptedException { List sentUserEventList = new ArrayList<>(); List receivedUserEventList = new ArrayList<>(); - for (long sequenceNumber = 1; sequenceNumber <= 10 ; sequenceNumber++) { - UserEvent userEvent = new UserEvent(UUID.randomUUID().toString()); + for (long sequenceNumber = 1; sequenceNumber <= 10; sequenceNumber++) { + UserEvent userEvent = new UserEvent(UUID.randomUUID() + .toString()); userEvent.setEventNanoTime(System.nanoTime()); userEvent.setGlobalSequenceNumber(sequenceNumber); Future future = producer.send(new ProducerRecord<>(Config.MULTI_PARTITION_TOPIC, sequenceNumber, userEvent)); @@ -105,9 +112,8 @@ public class ExternalSequenceWithTimeWindowIntegrationTest { buffer.add(record.value()); }); } - assertThat(receivedUserEventList) - .isEqualTo(sentUserEventList) - .containsExactlyElementsOf(sentUserEventList); + assertThat(receivedUserEventList).isEqualTo(sentUserEventList) + .containsExactlyElementsOf(sentUserEventList); } private static void processBuffer(List buffer, List receivedUserEventList) { diff --git a/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/MultiplePartitionIntegrationTest.java b/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/MultiplePartitionIntegrationTest.java index adfa9a0399..bb25486f00 100644 --- a/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/MultiplePartitionIntegrationTest.java +++ b/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/MultiplePartitionIntegrationTest.java @@ -3,6 +3,7 @@ package com.baeldung.kafka.message.ordering; import com.baeldung.kafka.message.ordering.payload.UserEvent; import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer; import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer; + import org.apache.kafka.clients.admin.*; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -22,11 +23,14 @@ import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; + import java.time.Duration; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; + import com.google.common.collect.ImmutableList; + import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; @Testcontainers @@ -63,7 +67,9 @@ public class MultiplePartitionIntegrationTest { admin = Admin.create(adminProperties); producer = new KafkaProducer<>(producerProperties); consumer = new KafkaConsumer<>(consumerProperties); - admin.createTopics(ImmutableList.of(new NewTopic(Config.MULTI_PARTITION_TOPIC, Config.MULTIPLE_PARTITIONS, Config.REPLICATION_FACTOR))).all().get(); + admin.createTopics(ImmutableList.of(new NewTopic(Config.MULTI_PARTITION_TOPIC, Config.MULTIPLE_PARTITIONS, Config.REPLICATION_FACTOR))) + .all() + .get(); } @AfterAll @@ -76,7 +82,8 @@ public class MultiplePartitionIntegrationTest { List sentUserEventList = new ArrayList<>(); List receivedUserEventList = new ArrayList<>(); for (long sequenceNumber = 1; sequenceNumber <= 10; sequenceNumber++) { - UserEvent userEvent = new UserEvent(UUID.randomUUID().toString()); + UserEvent userEvent = new UserEvent(UUID.randomUUID() + .toString()); userEvent.setGlobalSequenceNumber(sequenceNumber); userEvent.setEventNanoTime(System.nanoTime()); Future future = producer.send(new ProducerRecord<>(Config.MULTI_PARTITION_TOPIC, sequenceNumber, userEvent)); @@ -92,8 +99,7 @@ public class MultiplePartitionIntegrationTest { receivedUserEventList.add(userEvent); logger.info("User Event ID: " + userEvent.getUserEventId()); }); - assertThat(receivedUserEventList) - .isNotEqualTo(sentUserEventList) + assertThat(receivedUserEventList).isNotEqualTo(sentUserEventList) .containsExactlyInAnyOrderElementsOf(sentUserEventList); } } diff --git a/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/SinglePartitionIntegrationTest.java b/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/SinglePartitionIntegrationTest.java index 7280a1218a..8656df1bf3 100644 --- a/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/SinglePartitionIntegrationTest.java +++ b/apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/SinglePartitionIntegrationTest.java @@ -3,6 +3,7 @@ package com.baeldung.kafka.message.ordering; import com.baeldung.kafka.message.ordering.payload.UserEvent; import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer; import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer; + import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; @@ -29,7 +30,9 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; + import com.google.common.collect.ImmutableList; + import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; @Testcontainers @@ -56,7 +59,6 @@ public class SinglePartitionIntegrationTest { producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName()); - producer = new KafkaProducer<>(producerProperties); Properties consumerProperties = new Properties(); consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); @@ -65,11 +67,12 @@ public class SinglePartitionIntegrationTest { consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerProperties.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, UserEvent.class); consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); - consumer = new KafkaConsumer<>(consumerProperties); admin = Admin.create(adminProperties); - - - admin.createTopics(ImmutableList.of(new NewTopic(Config.SINGLE_PARTITION_TOPIC, Config.SINGLE_PARTITION, Config.REPLICATION_FACTOR))).all().get(); + producer = new KafkaProducer<>(producerProperties); + consumer = new KafkaConsumer<>(consumerProperties); + admin.createTopics(ImmutableList.of(new NewTopic(Config.SINGLE_PARTITION_TOPIC, Config.SINGLE_PARTITION, Config.REPLICATION_FACTOR))) + .all() + .get(); } @AfterAll @@ -82,7 +85,8 @@ public class SinglePartitionIntegrationTest { List sentUserEventList = new ArrayList<>(); List receivedUserEventList = new ArrayList<>(); for (long sequenceNumber = 1; sequenceNumber <= 10; sequenceNumber++) { - UserEvent userEvent = new UserEvent(UUID.randomUUID().toString()); + UserEvent userEvent = new UserEvent(UUID.randomUUID() + .toString()); userEvent.setGlobalSequenceNumber(sequenceNumber); userEvent.setEventNanoTime(System.nanoTime()); ProducerRecord producerRecord = new ProducerRecord<>(Config.SINGLE_PARTITION_TOPIC, userEvent); @@ -99,8 +103,7 @@ public class SinglePartitionIntegrationTest { receivedUserEventList.add(userEvent); logger.info("User Event ID: " + userEvent.getUserEventId()); }); - assertThat(receivedUserEventList) - .isEqualTo(sentUserEventList) + assertThat(receivedUserEventList).isEqualTo(sentUserEventList) .containsExactlyElementsOf(sentUserEventList); } }