Merge remote-tracking branch 'upstream/master' into feature/BAEL-7062-GetIndex

This commit is contained in:
Niket Agrawal 2023-10-07 23:25:36 +05:30
commit 64e46d7e95
28 changed files with 967 additions and 13 deletions

View File

@ -0,0 +1,103 @@
package com.baeldung.kafka.multipletopics;
import static org.assertj.core.api.Assertions.assertThat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
// This live test needs a Docker Daemon running so that a kafka container can be created
@Testcontainers
public class MultipleTopicsLiveTest {
private final Logger log = LoggerFactory.getLogger(MultipleTopicsLiveTest.class);
private static final String CARD_PAYMENTS_TOPIC = "card-payments";
private static final String BANK_TRANSFERS_TOPIC = "bank-transfers";
private static KafkaProducer<String, String> producer;
private static KafkaConsumer<String, String> consumer;
@Container
private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
@BeforeAll
static void setup() {
KAFKA_CONTAINER.addExposedPort(9092);
producer = new KafkaProducer<>(getProducerProperties());
consumer = new KafkaConsumer<>(getConsumerProperties());
}
@AfterAll
static void destroy() {
KAFKA_CONTAINER.stop();
}
private static Properties getProducerProperties() {
Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return producerProperties;
}
private static Properties getConsumerProperties() {
Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "payments");
return consumerProperties;
}
@Test
void whenSendingMessagesOnTwoTopics_thenConsumerReceivesMessages() throws Exception {
publishMessages();
consumer.subscribe(Arrays.asList(CARD_PAYMENTS_TOPIC, BANK_TRANSFERS_TOPIC));
int eventsProcessed = 0;
for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(10))) {
log.info("Event on topic={}, payload={}", record.topic(), record.value());
eventsProcessed++;
}
assertThat(eventsProcessed).isEqualTo(2);
}
private void publishMessages() throws ExecutionException, InterruptedException {
ProducerRecord<String, String> cardPayment = new ProducerRecord<>(CARD_PAYMENTS_TOPIC, createCardPayment());
producer.send(cardPayment).get();
ProducerRecord<String, String> bankTransfer = new ProducerRecord<>(BANK_TRANSFERS_TOPIC, createBankTransfer());
producer.send(bankTransfer).get();
}
private String createCardPayment() {
return "{\"paymentReference\":\"A184028KM0013790\", \"type\":\"card\", \"amount\":\"275\", \"currency\":\"GBP\"}";
}
private String createBankTransfer() {
return "{\"paymentReference\":\"19ae2-18mk73-009\", \"type\":\"bank\", \"amount\":\"150\", \"currency\":\"EUR\"}";
}
}

View File

@ -0,0 +1 @@
## Relevant Articles

View File

@ -0,0 +1,78 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>core-java-collections-maps-7</artifactId>
<name>core-java-collections-maps-7</name>
<packaging>jar</packaging>
<parent>
<artifactId>core-java-modules</artifactId>
<groupId>com.baeldung.core-java-modules</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<properties>
<spring.version>5.2.5.RELEASE</spring.version>
</properties>
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.4</version>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>1.36</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.9</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20230227</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,60 @@
package com.baeldung.map;
import java.util.HashMap;
import java.util.Map;
public class ConvertHashMapStringToHashMapObjectUsingtoString {
public String name;
public int age;
public ConvertHashMapStringToHashMapObjectUsingtoString(String name, int age) {
this.name = name;
this.age = age;
}
public static ConvertHashMapStringToHashMapObjectUsingtoString deserializeCustomObject(String valueString) {
if (valueString.startsWith("{") && valueString.endsWith("}")) {
valueString = valueString.substring(1, valueString.length() - 1);
String[] parts = valueString.split(",");
String name = null;
int age = -1;
for (String part : parts) {
String[] keyValue = part.split("=");
if (keyValue.length == 2) {
String key = keyValue[0].trim();
String val = keyValue[1].trim();
if (key.equals("name")) {
name = val;
} else if (key.equals("age")) {
age = Integer.parseInt(val);
}
}
}
if (name != null && age >= 0) {
return new ConvertHashMapStringToHashMapObjectUsingtoString(name, age);
}
}
return new ConvertHashMapStringToHashMapObjectUsingtoString("", -1);
}
public static void main(String[] args) {
String hashMapString = "{key1={name=John, age=30}, key2={name=Alice, age=25}}";
String keyValuePairs = hashMapString.replaceAll("[{}\\s]", "");
String[] pairs = keyValuePairs.split(",");
Map<String, ConvertHashMapStringToHashMapObjectUsingtoString> actualHashMap = new HashMap<>();
for (String pair : pairs) {
String[] keyValue = pair.split("=");
if (keyValue.length == 2) {
String key = keyValue[0];
ConvertHashMapStringToHashMapObjectUsingtoString value = deserializeCustomObject(keyValue[1]);
actualHashMap.put(key, value);
}
}
System.out.println(actualHashMap);
}
@Override
public String toString() {
return "{name=" + name + ", age=" + age + "}";
}
}

View File

@ -0,0 +1,31 @@
package com.baeldung.map;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
class ConvertHashMapStringToHashMapObjectUsingtoStringUnitTest {
@Test
void givenValidCustomObject_whenSerializing_thenSerializedStringIsCorrect() {
ConvertHashMapStringToHashMapObjectUsingtoString customObject = new ConvertHashMapStringToHashMapObjectUsingtoString("John", 30);
String expectedSerializedString = "{name=John, age=30}";
assertEquals(expectedSerializedString, customObject.toString());
}
@Test
void givenValidSerializedString_whenDeserializing_thenCustomObjectIsCorrect() {
String serializedString = "{name=Alice, age=25}";
ConvertHashMapStringToHashMapObjectUsingtoString customObject = ConvertHashMapStringToHashMapObjectUsingtoString.deserializeCustomObject(serializedString);
assertEquals("Alice", customObject.name);
assertEquals(25, customObject.age);
}
@Test
void givenInvalidSerializedString_whenDeserializing_thenDefaultCustomObjectIsCreated() {
String invalidSerializedString = "{invalidString}";
ConvertHashMapStringToHashMapObjectUsingtoString customObject = ConvertHashMapStringToHashMapObjectUsingtoString.deserializeCustomObject(invalidSerializedString);
assertEquals("", customObject.name);
assertEquals(-1, customObject.age);
}
}

View File

@ -44,19 +44,9 @@
<version>${vavr.version}</version>
</dependency>
<dependency>
<groupId>com.codepoetics</groupId>
<artifactId>protonpack</artifactId>
<version>1.16</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
</dependency>
<dependency>
<groupId>one.util</groupId>
<artifactId>streamex</artifactId>
<version>0.8.1</version>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
</dependencies>
@ -87,6 +77,7 @@
<maven.compiler.source>12</maven.compiler.source>
<maven.compiler.target>12</maven.compiler.target>
<vavr.version>0.10.2</vavr.version>
<guava.version>32.1.2-jre</guava.version>
</properties>
</project>

View File

@ -0,0 +1,90 @@
package com.baeldung.streams.partitioning;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import com.google.common.collect.Iterables;
public class PartitionStream {
public static <T> Stream<List<T>> partitionList(List<T> source, int batchSize) {
if (batchSize <= 0) {
throw new IllegalArgumentException(String.format("Expected the batchSize to be greater than ZERO, actual value was: %s", batchSize));
}
if (source.isEmpty()) {
return Stream.empty();
}
int nrOfFullBatches = (source.size() - 1) / batchSize;
return IntStream.rangeClosed(0, nrOfFullBatches)
.mapToObj(batch -> {
int startIndex = batch * batchSize;
int endIndex = (batch == nrOfFullBatches) ? source.size() : (batch + 1) * batchSize;
return source.subList(startIndex, endIndex);
});
}
public static <T> Iterable<List<T>> partitionUsingGuava(Stream<T> source, int batchSize) {
return Iterables.partition(source::iterator, batchSize);
}
public static <T> List<List<T>> partitionStream(Stream<T> source, int batchSize) {
return source.collect(partitionBySize(batchSize, Collectors.toList()));
}
public static <T, A, R> Collector<T, ?, R> partitionBySize(int batchSize, Collector<List<T>, A, R> downstream) {
Supplier<Accumulator<T, A>> supplier = () -> new Accumulator<>(
batchSize,
downstream.supplier().get(),
downstream.accumulator()::accept
);
BiConsumer<Accumulator<T, A>, T> accumulator = (acc, value) -> acc.add(value);
BinaryOperator<Accumulator<T, A>> combiner = (acc1, acc2) -> acc1.combine(acc2, downstream.combiner());
Function<Accumulator<T, A>, R> finisher = acc -> {
if (!acc.values.isEmpty()) {
downstream.accumulator().accept(acc.downstreamAccumulator, acc.values);
}
return downstream.finisher().apply(acc.downstreamAccumulator);
};
return Collector.of(supplier, accumulator, combiner, finisher, Collector.Characteristics.UNORDERED);
}
static class Accumulator<T, A> {
private final List<T> values = new ArrayList<>();
private final int batchSize;
private A downstreamAccumulator;
private final BiConsumer<A, List<T>> batchFullListener;
Accumulator(int batchSize, A accumulator, BiConsumer<A, List<T>> onBatchFull) {
this.batchSize = batchSize;
this.downstreamAccumulator = accumulator;
this.batchFullListener = onBatchFull;
}
void add(T value) {
values.add(value);
if (values.size() == batchSize) {
batchFullListener.accept(downstreamAccumulator, new ArrayList<>(values));
values.clear();
}
}
Accumulator<T, A> combine(Accumulator<T, A> other, BinaryOperator<A> accumulatorCombiner) {
this.downstreamAccumulator = accumulatorCombiner.apply(downstreamAccumulator, other.downstreamAccumulator);
other.values.forEach(this::add);
return this;
}
}
}

View File

@ -0,0 +1,82 @@
package com.baeldung.partitioning;
import static com.baeldung.streams.partitioning.PartitionStream.partitionList;
import static com.baeldung.streams.partitioning.PartitionStream.partitionStream;
import static com.baeldung.streams.partitioning.PartitionStream.partitionUsingGuava;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.atIndex;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
public class PartitionStreamsUnitTest {
@Test
void whenPartitionList_thenReturnThreeSubLists() {
List<Integer> source = List.of(1, 2, 3, 4, 5, 6, 7, 8);
Stream<List<Integer>> result = partitionList(source, 3);
assertThat(result)
.containsExactlyInAnyOrder(
List.of(1, 2, 3),
List.of(4, 5, 6),
List.of(7, 8)
);
}
@Test
void whenPartitionEmptyList_thenReturnEmptyStream() {
Stream<List<Integer>> result = partitionList(Collections.emptyList(), 3);
assertThat(result).isEmpty();
}
@Test
void whenPartitionListWithNegativeBatchSize_thenThrowException() {
assertThatThrownBy(() -> partitionList(List.of(1,2,3), -1))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Expected the batchSize to be greater than ZERO, actual value was: -1");
}
@Test
void whenPartitionParallelStream_thenReturnThreeSubLists() {
Stream<Integer> source = Stream.of(1, 2, 3, 4, 5, 6, 7, 8).parallel();
List<List<Integer>> result = partitionStream(source, 3);
assertThat(result)
.hasSize(3)
.satisfies(batch -> assertThat(batch).hasSize(3), atIndex(0))
.satisfies(batch -> assertThat(batch).hasSize(3), atIndex(1))
.satisfies(batch -> assertThat(batch).hasSize(2), atIndex(2));
}
@Test
void whenPartitionEmptyParallelStream_thenReturnEmptyList() {
Stream<Integer> source = Stream.<Integer>empty().parallel();
List<List<Integer>> result = partitionStream(source, 3);
assertThat(result).isEmpty();
}
@Test
void whenPartitionParallelStreamWithGuava_thenReturnThreeSubLists() {
Stream<Integer> source = Stream.of(1, 2, 3, 4, 5, 6, 7, 8).parallel();
Iterable<List<Integer>> result = partitionUsingGuava(source, 3);
assertThat(result)
.map(ArrayList::new)
.hasSize(3)
.satisfies(batch -> assertThat(batch).asList().hasSize(3), atIndex(0))
.satisfies(batch -> assertThat(batch).asList().hasSize(3), atIndex(1))
.satisfies(batch -> assertThat(batch).asList().hasSize(2), atIndex(2));
}
}

View File

@ -28,12 +28,18 @@
<artifactId>commons-vfs2</artifactId>
<version>${commons-vfs2.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
<version>${apache-commons-text.version}</version>
</dependency>
</dependencies>
<properties>
<commons-compress.version>1.23.0</commons-compress.version>
<ant.version>1.10.13</ant.version>
<commons-vfs2.version>2.9.0</commons-vfs2.version>
<apache-commons-text.version>1.10.0</apache-commons-text.version>
</properties>
</project>

View File

@ -0,0 +1,29 @@
package com.baeldung.commons.convertunicode;
import org.apache.commons.text.StringEscapeUtils;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class UnicodeConverterUtil {
public static String decodeWithApacheCommons(String input) {
return StringEscapeUtils.unescapeJava(input);
}
public static String decodeWithPlainJava(String input) {
Pattern pattern = Pattern.compile("\\\\u[0-9a-fA-F]{4}");
Matcher matcher = pattern.matcher(input);
StringBuilder decodedString = new StringBuilder();
while (matcher.find()) {
String unicodeSequence = matcher.group();
char unicodeChar = (char) Integer.parseInt(unicodeSequence.substring(2), 16);
matcher.appendReplacement(decodedString, Character.toString(unicodeChar));
}
matcher.appendTail(decodedString);
return decodedString.toString();
}
}

View File

@ -0,0 +1,39 @@
package com.baeldung.commons.convertunicode;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class UnicodeConverterUnitTest {
@Test
public void whenInputHaveUnicodeSequences_ThenDecode() {
String encodedString = "\\u0048\\u0065\\u006C\\u006C\\u006F World";
String expectedDecodedString = "Hello World";
assertEquals(expectedDecodedString, UnicodeConverterUtil.decodeWithApacheCommons(encodedString));
assertEquals(expectedDecodedString, UnicodeConverterUtil.decodeWithPlainJava(encodedString));
}
@Test
public void whenInputHaveNoUnicodeSequences_ThenDoNothing() {
String inputString = "Hello World";
assertEquals(inputString, UnicodeConverterUtil.decodeWithApacheCommons(inputString));
assertEquals(inputString, UnicodeConverterUtil.decodeWithPlainJava(inputString));
}
@Test
public void whenInputHaveUnicodeSequencesInMiddle_ThenDecode() {
String encodedString = "This is a test \\u0069\\u006E the middle.";
String expectedDecodedString = "This is a test in the middle.";
assertEquals(expectedDecodedString, UnicodeConverterUtil.decodeWithApacheCommons(encodedString));
assertEquals(expectedDecodedString, UnicodeConverterUtil.decodeWithPlainJava(encodedString));
}
@Test
public void whenInputHaveMultipleUnicodeSequences_ThenDecode() {
String encodedString = "Unicode: \\u0048\\u0065\\u006C\\u006C\\u006F \\u0057\\u006F\\u0072\\u006C\\u0064";
String expectedDecodedString = "Unicode: Hello World";
assertEquals(expectedDecodedString, UnicodeConverterUtil.decodeWithApacheCommons(encodedString));
assertEquals(expectedDecodedString, UnicodeConverterUtil.decodeWithPlainJava(encodedString));
}
}

View File

@ -5,6 +5,18 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>libraries-io</artifactId>
<name>libraries-io</name>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>17</source>
<target>17</target>
</configuration>
</plugin>
</plugins>
</build>
<parent>
<groupId>com.baeldung</groupId>
@ -34,6 +46,11 @@
<artifactId>zip4j</artifactId>
<version>${zip4j.version}</version>
</dependency>
<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>${opencsv.version}</version>
</dependency>
</dependencies>
<properties>
@ -42,6 +59,10 @@
<sshj.version>0.27.0</sshj.version>
<vfs.version>2.4</vfs.version>
<zip4j.version>2.9.0</zip4j.version>
<opencsv.version>5.7.1</opencsv.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>

View File

@ -0,0 +1,4 @@
package com.baeldung.java.io.pojotocsv;
public record Application(String id, String name, Integer age, String created_at) {
}

View File

@ -0,0 +1,8 @@
package com.baeldung.java.io.pojotocsv;
import com.opencsv.bean.CsvBindByName;
import com.opencsv.bean.CsvBindByPosition;
public record ApplicationWithAnnotation(@CsvBindByName(column = "id", required = true) @CsvBindByPosition(position = 1) String id, @CsvBindByName(column = "name", required = true) @CsvBindByPosition(position = 0) String name,
@CsvBindByName(column = "age", required = true) @CsvBindByPosition(position = 2) Integer age, @CsvBindByName(column = "position", required = true) @CsvBindByPosition(position = 3) String created_at) {
}

View File

@ -0,0 +1,55 @@
package com.baeldung.java.io.pojotocsv;
import java.io.FileWriter;
import java.io.IOException;
import java.util.List;
import com.opencsv.CSVWriter;
import com.opencsv.bean.StatefulBeanToCsvBuilder;
import com.opencsv.exceptions.CsvDataTypeMismatchException;
import com.opencsv.exceptions.CsvRequiredFieldEmptyException;
public class BeanToCsv {
public void beanToCSVWithDefault(List<Application> applications) throws Exception {
try (FileWriter writer = new FileWriter("src/main/resources/application.csv")) {
var builder = new StatefulBeanToCsvBuilder<Application>(writer).withQuotechar(CSVWriter.NO_QUOTE_CHARACTER)
.withSeparator(',')
.build();
builder.write(applications);
}
}
public void beanToCSVWithCustomHeaderStrategy(List<Application> applications) throws IOException, CsvRequiredFieldEmptyException, CsvDataTypeMismatchException {
try (FileWriter writer = new FileWriter("src/main/resources/application2.csv")) {
var mappingStrategy = new CustomHeaderStrategy<Application>();
mappingStrategy.setType(Application.class);
var builder = new StatefulBeanToCsvBuilder<Application>(writer).withQuotechar(CSVWriter.NO_QUOTE_CHARACTER)
.withMappingStrategy(mappingStrategy)
.build();
builder.write(applications);
}
}
public void beanToCSVWithCustomPositionStrategy(List<ApplicationWithAnnotation> applications) throws Exception {
try (FileWriter writer = new FileWriter("src/main/resources/application3.csv")) {
var builder = new StatefulBeanToCsvBuilder<ApplicationWithAnnotation>(writer).withQuotechar(CSVWriter.NO_QUOTE_CHARACTER)
.build();
builder.write(applications);
}
}
public void beanToCSVWithCustomHeaderAndPositionStrategy(List<ApplicationWithAnnotation> applications) throws IOException, CsvRequiredFieldEmptyException, CsvDataTypeMismatchException {
try (FileWriter writer = new FileWriter("src/main/resources/application4.csv")) {
var mappingStrategy = new CustomColumnPositionStrategy<ApplicationWithAnnotation>();
mappingStrategy.setType(ApplicationWithAnnotation.class);
var builder = new StatefulBeanToCsvBuilder<ApplicationWithAnnotation>(writer).withQuotechar(CSVWriter.NO_QUOTE_CHARACTER)
.withMappingStrategy(mappingStrategy)
.build();
builder.write(applications);
}
}
}

View File

@ -0,0 +1,12 @@
package com.baeldung.java.io.pojotocsv;
import com.opencsv.bean.ColumnPositionMappingStrategy;
import com.opencsv.exceptions.CsvRequiredFieldEmptyException;
public class CustomColumnPositionStrategy<T> extends ColumnPositionMappingStrategy<T> {
@Override
public String[] generateHeader(T bean) throws CsvRequiredFieldEmptyException {
super.generateHeader(bean);
return super.getColumnMapping();
}
}

View File

@ -0,0 +1,16 @@
package com.baeldung.java.io.pojotocsv;
import java.util.Arrays;
import com.opencsv.bean.HeaderColumnNameMappingStrategy;
import com.opencsv.exceptions.CsvRequiredFieldEmptyException;
public class CustomHeaderStrategy<T> extends HeaderColumnNameMappingStrategy<T> {
@Override
public String[] generateHeader(T bean) throws CsvRequiredFieldEmptyException {
String[] header = super.generateHeader(bean);
return Arrays.stream(header)
.map(String::toLowerCase)
.toArray(String[]::new);
}
}

View File

@ -0,0 +1,4 @@
AGE,CREATED_AT,ID,NAME
34,2023-08-11,123,Sam
44,2023-02-11,456,Tam
54,2023-03-11,890,Jam
1 AGE CREATED_AT ID NAME
2 34 2023-08-11 123 Sam
3 44 2023-02-11 456 Tam
4 54 2023-03-11 890 Jam

View File

@ -0,0 +1,4 @@
age,created_at,id,name
34,2023-08-11,123,Sam
44,2023-02-11,456,Tam
54,2023-03-11,890,Jam
1 age created_at id name
2 34 2023-08-11 123 Sam
3 44 2023-02-11 456 Tam
4 54 2023-03-11 890 Jam

View File

@ -0,0 +1,3 @@
Sam,123,34,2023-08-11
Tam,456,44,2023-02-11
Jam,789,54,2023-03-11
1 Sam 123 34 2023-08-11
2 Tam 456 44 2023-02-11
3 Jam 789 54 2023-03-11

View File

@ -0,0 +1,4 @@
name,id,age,created_at
Sam,123,34,2023-08-11
Tam,456,44,2023-02-11
Jam,789,54,2023-03-11
1 name id age created_at
2 Sam 123 34 2023-08-11
3 Tam 456 44 2023-02-11
4 Jam 789 54 2023-03-11

View File

@ -0,0 +1,80 @@
package com.baeldung.java.io.pojotocsv;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import java.io.BufferedReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class BeanToCsvUnitTest {
List<Application> applications = new ArrayList<>();
List<ApplicationWithAnnotation> applicationsWithAnnotation = new ArrayList<>();
@BeforeEach
public void beforeEach() {
applications = List.of(new Application("123", "Sam", 34, "2023-08-11"), new Application("456", "Tam", 44, "2023-02-11"), new Application("890", "Jam", 54, "2023-03-11"));
applicationsWithAnnotation = List.of(new ApplicationWithAnnotation("123", "Sam", 34, "2023-08-11"), new ApplicationWithAnnotation("456", "Tam", 44, "2023-02-11"), new ApplicationWithAnnotation("789", "Jam", 54, "2023-03-11"));
}
@Test
public void givenApplicationPOJO_whenUsingDefaultStrategy_thenReceiveCSVFormatWithAscendingOrderOfField() throws Exception {
BeanToCsv beanToCsv = new BeanToCsv();
beanToCsv.beanToCSVWithDefault(applications);
try (BufferedReader bufferedReader = Files.newBufferedReader(Path.of("src/main/resources/application.csv"))) {
List<String> content = bufferedReader.lines()
.toList();
assertThat(content.get(0)).isEqualTo("AGE,CREATED_AT,ID,NAME");
assertThat(content.get(1)).isEqualTo("34,2023-08-11,123,Sam");
assertThat(content.get(2)).isEqualTo("44,2023-02-11,456,Tam");
assertThat(content.get(3)).isEqualTo("54,2023-03-11,890,Jam");
}
}
@Test
public void givenApplicationPOJO_whenUsingCustomHeaderStrategy_thenReceiveCSVFormatWithCustomHeaders() throws Exception {
BeanToCsv beanToCsv = new BeanToCsv();
beanToCsv.beanToCSVWithCustomHeaderStrategy(applications);
try (BufferedReader bufferedReader = Files.newBufferedReader(Path.of("src/main/resources/application2.csv"))) {
List<String> content = bufferedReader.lines()
.toList();
assertThat(content.get(0)).isEqualTo("age,created_at,id,name");
assertThat(content.get(1)).isEqualTo("34,2023-08-11,123,Sam");
assertThat(content.get(2)).isEqualTo("44,2023-02-11,456,Tam");
assertThat(content.get(3)).isEqualTo("54,2023-03-11,890,Jam");
}
}
@Test
public void givenApplicationPOJOWithAnnotation_whenUsingCustomPositionStrategy_thenReceiveCSVFormatWithCustomPosition() throws Exception {
BeanToCsv beanToCsv = new BeanToCsv();
beanToCsv.beanToCSVWithCustomPositionStrategy(applicationsWithAnnotation);
try (BufferedReader bufferedReader = Files.newBufferedReader(Path.of("src/main/resources/application3.csv"))) {
List<String> content = bufferedReader.lines()
.toList();
assertThat(content.get(0)).isEqualTo("Sam,123,34,2023-08-11");
assertThat(content.get(1)).isEqualTo("Tam,456,44,2023-02-11");
assertThat(content.get(2)).isEqualTo("Jam,789,54,2023-03-11");
}
}
@Test
public void givenApplicationPOJOWithAnnotation_whenUsingCustomHeaderPositionStrategy_thenReceiveCSVFormatWithCustomHeaderPosition() throws Exception {
BeanToCsv beanToCsv = new BeanToCsv();
beanToCsv.beanToCSVWithCustomHeaderAndPositionStrategy(applicationsWithAnnotation);
try (BufferedReader bufferedReader = Files.newBufferedReader(Path.of("src/main/resources/application4.csv"))) {
List<String> content = bufferedReader.lines()
.toList();
assertThat(content.get(0)).isEqualTo("name,id,age,created_at");
assertThat(content.get(1)).isEqualTo("Sam,123,34,2023-08-11");
assertThat(content.get(2)).isEqualTo("Tam,456,44,2023-02-11");
assertThat(content.get(3)).isEqualTo("Jam,789,54,2023-03-11");
}
}
}

View File

@ -0,0 +1,36 @@
package com.baeldung.spring.kafka.multipletopics;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ConsumerFactory<String, PaymentData> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(PaymentData.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, PaymentData> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, PaymentData> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}

View File

@ -0,0 +1,13 @@
package com.baeldung.spring.kafka.multipletopics;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
@EnableKafka
@SpringBootApplication
public class KafkaMultipleTopicsApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaMultipleTopicsApplication.class, args);
}
}

View File

@ -0,0 +1,34 @@
package com.baeldung.spring.kafka.multipletopics;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
@Configuration
public class KafkaProducerConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, PaymentData> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return new DefaultKafkaProducerFactory<>(config, new StringSerializer(), new JsonSerializer<>());
}
@Bean
public KafkaTemplate<String, PaymentData> kafkaProducer() {
return new KafkaTemplate<>(producerFactory());
}
}

View File

@ -0,0 +1,54 @@
package com.baeldung.spring.kafka.multipletopics;
import java.math.BigDecimal;
import java.util.Currency;
import java.util.StringJoiner;
public class PaymentData {
private String paymentReference;
private String type;
private BigDecimal amount;
private Currency currency;
public String getPaymentReference() {
return paymentReference;
}
public void setPaymentReference(String paymentReference) {
this.paymentReference = paymentReference;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public BigDecimal getAmount() {
return amount;
}
public void setAmount(BigDecimal amount) {
this.amount = amount;
}
public Currency getCurrency() {
return currency;
}
public void setCurrency(Currency currency) {
this.currency = currency;
}
@Override
public String toString() {
return new StringJoiner(", ", PaymentData.class.getSimpleName() + "[", "]")
.add("paymentReference='" + paymentReference + "'")
.add("type='" + type + "'")
.add("amount=" + amount)
.add("currency=" + currency)
.toString();
}
}

View File

@ -0,0 +1,18 @@
package com.baeldung.spring.kafka.multipletopics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
@Service
public class PaymentDataListener {
private final Logger log = LoggerFactory.getLogger(PaymentDataListener.class);
@KafkaListener(topics = { "card-payments", "bank-transfers" }, groupId = "payments")
public void handlePaymentEvents(PaymentData paymentData, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("Event on topic={}, payload={}", topic, paymentData);
}
}

View File

@ -0,0 +1,78 @@
package com.baeldung.spring.kafka.multipletopics;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import java.math.BigDecimal;
import java.util.Currency;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.ContainerTestUtils;
@SpringBootTest(classes = KafkaMultipleTopicsApplication.class)
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
public class KafkaMultipleTopicsIntegrationTest {
private static final String CARD_PAYMENTS_TOPIC = "card-payments";
private static final String BANK_TRANSFERS_TOPIC = "bank-transfers";
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
private KafkaTemplate<String, PaymentData> kafkaProducer;
@SpyBean
private PaymentDataListener paymentsConsumer;
@BeforeEach
void setUp() {
// wait for embedded Kafka
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
ContainerTestUtils.waitForAssignment(messageListenerContainer, 2);
}
}
@Test
public void whenSendingMessagesOnTwoTopics_thenConsumerReceivesMessages() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(2);
doAnswer(invocation -> {
countDownLatch.countDown();
return null;
}).when(paymentsConsumer)
.handlePaymentEvents(any(), any());
kafkaProducer.send(CARD_PAYMENTS_TOPIC, createCardPayment());
kafkaProducer.send(BANK_TRANSFERS_TOPIC, createBankTransfer());
assertThat(countDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
}
private PaymentData createCardPayment() {
PaymentData cardPayment = new PaymentData();
cardPayment.setAmount(BigDecimal.valueOf(275));
cardPayment.setPaymentReference("A184028KM0013790");
cardPayment.setCurrency(Currency.getInstance("GBP"));
cardPayment.setType("card");
return cardPayment;
}
private PaymentData createBankTransfer() {
PaymentData bankTransfer = new PaymentData();
bankTransfer.setAmount(BigDecimal.valueOf(150));
bankTransfer.setPaymentReference("19ae2-18mk73-009");
bankTransfer.setCurrency(Currency.getInstance("EUR"));
bankTransfer.setType("bank");
return bankTransfer;
}
}