Fix the whitespace and newline issues - wuth code formatter

This commit is contained in:
Amol Gote 2023-11-05 21:32:49 -05:00
parent 80cd71fff3
commit 5d3f08d0e9
6 changed files with 40 additions and 22 deletions

View File

@ -1,13 +1,14 @@
package com.baeldung.kafka.message.ordering.payload; package com.baeldung.kafka.message.ordering.payload;
import java.util.Objects; import java.util.Objects;
public class UserEvent implements Comparable<UserEvent> { public class UserEvent implements Comparable<UserEvent> {
private String userEventId; private String userEventId;
private long eventNanoTime; private long eventNanoTime;
private long globalSequenceNumber; private long globalSequenceNumber;
@SuppressWarnings("unused") @SuppressWarnings("unused")
public UserEvent(){ public UserEvent() {
// Required for Jackson Serialization and Deserialization // Required for Jackson Serialization and Deserialization
} }

View File

@ -1,6 +1,8 @@
package com.baeldung.kafka.message.ordering.serialization; package com.baeldung.kafka.message.ordering.serialization;
import com.baeldung.kafka.message.ordering.Config; import com.baeldung.kafka.message.ordering.Config;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map; import java.util.Map;
@ -12,7 +14,6 @@ public class JacksonDeserializer<T> implements Deserializer<T> {
private final ObjectMapper objectMapper = new ObjectMapper(); private final ObjectMapper objectMapper = new ObjectMapper();
private Class<T> type; private Class<T> type;
@Override @Override
public void configure(Map<String, ?> configs, boolean isKey) { public void configure(Map<String, ?> configs, boolean isKey) {
this.type = (Class<T>) configs.get(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS); this.type = (Class<T>) configs.get(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS);

View File

@ -1,6 +1,7 @@
package com.baeldung.kafka.message.ordering.serialization; package com.baeldung.kafka.message.ordering.serialization;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.Serializer;
/** /**

View File

@ -4,6 +4,7 @@ import com.baeldung.kafka.headers.KafkaMessageHeaders;
import com.baeldung.kafka.message.ordering.payload.UserEvent; import com.baeldung.kafka.message.ordering.payload.UserEvent;
import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer; import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer;
import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer; import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
import org.apache.kafka.clients.admin.*; import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
@ -23,11 +24,14 @@ import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.DockerImageName;
import java.time.Duration; import java.time.Duration;
import java.util.*; import java.util.*;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
@Testcontainers @Testcontainers
@ -37,8 +41,8 @@ public class ExternalSequenceWithTimeWindowIntegrationTest {
private static KafkaProducer<Long, UserEvent> producer; private static KafkaProducer<Long, UserEvent> producer;
private static KafkaConsumer<Long, UserEvent> consumer; private static KafkaConsumer<Long, UserEvent> consumer;
private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofSeconds(5); private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofSeconds(5);
private static final long BUFFER_PERIOD_NS = Duration.ofSeconds(5).toNanos(); private static final long BUFFER_PERIOD_NS = Duration.ofSeconds(5)
.toNanos();
private static Logger logger = LoggerFactory.getLogger(ExternalSequenceWithTimeWindowIntegrationTest.class); private static Logger logger = LoggerFactory.getLogger(ExternalSequenceWithTimeWindowIntegrationTest.class);
@Container @Container
@ -66,7 +70,9 @@ public class ExternalSequenceWithTimeWindowIntegrationTest {
admin = Admin.create(adminProperties); admin = Admin.create(adminProperties);
producer = new KafkaProducer<>(producerProperties); producer = new KafkaProducer<>(producerProperties);
consumer = new KafkaConsumer<>(consumerProperties); consumer = new KafkaConsumer<>(consumerProperties);
admin.createTopics(ImmutableList.of(new NewTopic(Config.MULTI_PARTITION_TOPIC, Config.MULTIPLE_PARTITIONS, Config.REPLICATION_FACTOR))).all().get(); admin.createTopics(ImmutableList.of(new NewTopic(Config.MULTI_PARTITION_TOPIC, Config.MULTIPLE_PARTITIONS, Config.REPLICATION_FACTOR)))
.all()
.get();
} }
@AfterAll @AfterAll
@ -78,8 +84,9 @@ public class ExternalSequenceWithTimeWindowIntegrationTest {
void givenMultiplePartitions_whenPublishedToKafkaAndConsumedWithExtSeqNumberAndTimeWindow_thenCheckForMessageOrder() throws ExecutionException, InterruptedException { void givenMultiplePartitions_whenPublishedToKafkaAndConsumedWithExtSeqNumberAndTimeWindow_thenCheckForMessageOrder() throws ExecutionException, InterruptedException {
List<UserEvent> sentUserEventList = new ArrayList<>(); List<UserEvent> sentUserEventList = new ArrayList<>();
List<UserEvent> receivedUserEventList = new ArrayList<>(); List<UserEvent> receivedUserEventList = new ArrayList<>();
for (long sequenceNumber = 1; sequenceNumber <= 10 ; sequenceNumber++) { for (long sequenceNumber = 1; sequenceNumber <= 10; sequenceNumber++) {
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString()); UserEvent userEvent = new UserEvent(UUID.randomUUID()
.toString());
userEvent.setEventNanoTime(System.nanoTime()); userEvent.setEventNanoTime(System.nanoTime());
userEvent.setGlobalSequenceNumber(sequenceNumber); userEvent.setGlobalSequenceNumber(sequenceNumber);
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Config.MULTI_PARTITION_TOPIC, sequenceNumber, userEvent)); Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Config.MULTI_PARTITION_TOPIC, sequenceNumber, userEvent));
@ -105,9 +112,8 @@ public class ExternalSequenceWithTimeWindowIntegrationTest {
buffer.add(record.value()); buffer.add(record.value());
}); });
} }
assertThat(receivedUserEventList) assertThat(receivedUserEventList).isEqualTo(sentUserEventList)
.isEqualTo(sentUserEventList) .containsExactlyElementsOf(sentUserEventList);
.containsExactlyElementsOf(sentUserEventList);
} }
private static void processBuffer(List<UserEvent> buffer, List<UserEvent> receivedUserEventList) { private static void processBuffer(List<UserEvent> buffer, List<UserEvent> receivedUserEventList) {

View File

@ -3,6 +3,7 @@ package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.payload.UserEvent; import com.baeldung.kafka.message.ordering.payload.UserEvent;
import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer; import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer;
import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer; import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
import org.apache.kafka.clients.admin.*; import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
@ -22,11 +23,14 @@ import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.DockerImageName;
import java.time.Duration; import java.time.Duration;
import java.util.*; import java.util.*;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
@Testcontainers @Testcontainers
@ -63,7 +67,9 @@ public class MultiplePartitionIntegrationTest {
admin = Admin.create(adminProperties); admin = Admin.create(adminProperties);
producer = new KafkaProducer<>(producerProperties); producer = new KafkaProducer<>(producerProperties);
consumer = new KafkaConsumer<>(consumerProperties); consumer = new KafkaConsumer<>(consumerProperties);
admin.createTopics(ImmutableList.of(new NewTopic(Config.MULTI_PARTITION_TOPIC, Config.MULTIPLE_PARTITIONS, Config.REPLICATION_FACTOR))).all().get(); admin.createTopics(ImmutableList.of(new NewTopic(Config.MULTI_PARTITION_TOPIC, Config.MULTIPLE_PARTITIONS, Config.REPLICATION_FACTOR)))
.all()
.get();
} }
@AfterAll @AfterAll
@ -76,7 +82,8 @@ public class MultiplePartitionIntegrationTest {
List<UserEvent> sentUserEventList = new ArrayList<>(); List<UserEvent> sentUserEventList = new ArrayList<>();
List<UserEvent> receivedUserEventList = new ArrayList<>(); List<UserEvent> receivedUserEventList = new ArrayList<>();
for (long sequenceNumber = 1; sequenceNumber <= 10; sequenceNumber++) { for (long sequenceNumber = 1; sequenceNumber <= 10; sequenceNumber++) {
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString()); UserEvent userEvent = new UserEvent(UUID.randomUUID()
.toString());
userEvent.setGlobalSequenceNumber(sequenceNumber); userEvent.setGlobalSequenceNumber(sequenceNumber);
userEvent.setEventNanoTime(System.nanoTime()); userEvent.setEventNanoTime(System.nanoTime());
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Config.MULTI_PARTITION_TOPIC, sequenceNumber, userEvent)); Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Config.MULTI_PARTITION_TOPIC, sequenceNumber, userEvent));
@ -92,8 +99,7 @@ public class MultiplePartitionIntegrationTest {
receivedUserEventList.add(userEvent); receivedUserEventList.add(userEvent);
logger.info("User Event ID: " + userEvent.getUserEventId()); logger.info("User Event ID: " + userEvent.getUserEventId());
}); });
assertThat(receivedUserEventList) assertThat(receivedUserEventList).isNotEqualTo(sentUserEventList)
.isNotEqualTo(sentUserEventList)
.containsExactlyInAnyOrderElementsOf(sentUserEventList); .containsExactlyInAnyOrderElementsOf(sentUserEventList);
} }
} }

View File

@ -3,6 +3,7 @@ package com.baeldung.kafka.message.ordering;
import com.baeldung.kafka.message.ordering.payload.UserEvent; import com.baeldung.kafka.message.ordering.payload.UserEvent;
import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer; import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer;
import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer; import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.NewTopic;
@ -29,7 +30,9 @@ import java.time.Duration;
import java.util.*; import java.util.*;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
@Testcontainers @Testcontainers
@ -56,7 +59,6 @@ public class SinglePartitionIntegrationTest {
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName()); producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName());
producer = new KafkaProducer<>(producerProperties);
Properties consumerProperties = new Properties(); Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
@ -65,11 +67,12 @@ public class SinglePartitionIntegrationTest {
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, UserEvent.class); consumerProperties.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, UserEvent.class);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
consumer = new KafkaConsumer<>(consumerProperties);
admin = Admin.create(adminProperties); admin = Admin.create(adminProperties);
producer = new KafkaProducer<>(producerProperties);
consumer = new KafkaConsumer<>(consumerProperties);
admin.createTopics(ImmutableList.of(new NewTopic(Config.SINGLE_PARTITION_TOPIC, Config.SINGLE_PARTITION, Config.REPLICATION_FACTOR))).all().get(); admin.createTopics(ImmutableList.of(new NewTopic(Config.SINGLE_PARTITION_TOPIC, Config.SINGLE_PARTITION, Config.REPLICATION_FACTOR)))
.all()
.get();
} }
@AfterAll @AfterAll
@ -82,7 +85,8 @@ public class SinglePartitionIntegrationTest {
List<UserEvent> sentUserEventList = new ArrayList<>(); List<UserEvent> sentUserEventList = new ArrayList<>();
List<UserEvent> receivedUserEventList = new ArrayList<>(); List<UserEvent> receivedUserEventList = new ArrayList<>();
for (long sequenceNumber = 1; sequenceNumber <= 10; sequenceNumber++) { for (long sequenceNumber = 1; sequenceNumber <= 10; sequenceNumber++) {
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString()); UserEvent userEvent = new UserEvent(UUID.randomUUID()
.toString());
userEvent.setGlobalSequenceNumber(sequenceNumber); userEvent.setGlobalSequenceNumber(sequenceNumber);
userEvent.setEventNanoTime(System.nanoTime()); userEvent.setEventNanoTime(System.nanoTime());
ProducerRecord<Long, UserEvent> producerRecord = new ProducerRecord<>(Config.SINGLE_PARTITION_TOPIC, userEvent); ProducerRecord<Long, UserEvent> producerRecord = new ProducerRecord<>(Config.SINGLE_PARTITION_TOPIC, userEvent);
@ -99,8 +103,7 @@ public class SinglePartitionIntegrationTest {
receivedUserEventList.add(userEvent); receivedUserEventList.add(userEvent);
logger.info("User Event ID: " + userEvent.getUserEventId()); logger.info("User Event ID: " + userEvent.getUserEventId());
}); });
assertThat(receivedUserEventList) assertThat(receivedUserEventList).isEqualTo(sentUserEventList)
.isEqualTo(sentUserEventList)
.containsExactlyElementsOf(sentUserEventList); .containsExactlyElementsOf(sentUserEventList);
} }
} }