diff --git a/apache-httpclient/pom.xml b/apache-httpclient/pom.xml
index c371d1fc06..5c3ea5b3b3 100644
--- a/apache-httpclient/pom.xml
+++ b/apache-httpclient/pom.xml
@@ -90,6 +90,12 @@
+
+ org.mock-server
+ mockserver-netty
+ ${mockserver.version}
+
+
com.github.tomakehurst
wiremock
@@ -112,6 +118,7 @@
4.1.4
+ 5.6.1
2.5.1
4.5.8
diff --git a/apache-httpclient/src/test/java/com/baeldung/httpclient/GetRequestMockServer.java b/apache-httpclient/src/test/java/com/baeldung/httpclient/GetRequestMockServer.java
new file mode 100644
index 0000000000..f65558906f
--- /dev/null
+++ b/apache-httpclient/src/test/java/com/baeldung/httpclient/GetRequestMockServer.java
@@ -0,0 +1,80 @@
+package com.baeldung.httpclient;
+
+import static org.mockserver.integration.ClientAndServer.startClientAndServer;
+import static org.mockserver.matchers.Times.exactly;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.URISyntaxException;
+
+import org.apache.http.HttpStatus;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.mockserver.client.MockServerClient;
+import org.mockserver.integration.ClientAndServer;
+
+public class GetRequestMockServer {
+
+ public static ClientAndServer mockServer;
+ public static String serviceOneUrl;
+ public static String serviceTwoUrl;
+
+ private static int serverPort;
+
+ public static final String SERVER_ADDRESS = "127.0.0.1";
+ public static final String PATH_ONE = "/test1";
+ public static final String PATH_TWO = "/test2";
+ public static final String METHOD = "GET";
+
+ @BeforeAll
+ static void startServer() throws IOException, URISyntaxException {
+ //serverPort = getFreePort();
+ serverPort = 8080;
+ System.out.println("Free port "+serverPort);
+ serviceOneUrl = "http://" + SERVER_ADDRESS + ":" + serverPort + PATH_ONE;
+ serviceTwoUrl = "http://" + SERVER_ADDRESS + ":" + serverPort + PATH_TWO;
+ mockServer = startClientAndServer(serverPort);
+ mockGetRequest();
+ }
+
+ @AfterAll
+ static void stopServer() {
+ mockServer.stop();
+ }
+
+ private static void mockGetRequest() {
+ new MockServerClient(SERVER_ADDRESS, serverPort)
+ .when(
+ request()
+ .withPath(PATH_ONE)
+ .withMethod(METHOD),
+ exactly(5)
+ )
+ .respond(
+ response()
+ .withStatusCode(HttpStatus.SC_OK)
+ .withBody("{\"status\":\"ok\"}")
+ );
+ new MockServerClient(SERVER_ADDRESS, serverPort)
+ .when(
+ request()
+ .withPath(PATH_TWO)
+ .withMethod(METHOD),
+ exactly(1)
+ )
+ .respond(
+ response()
+ .withStatusCode(HttpStatus.SC_OK)
+ .withBody("{\"status\":\"ok\"}")
+ );
+ }
+
+ private static int getFreePort () throws IOException {
+ try (ServerSocket serverSocket = new ServerSocket(0)) {
+ return serverSocket.getLocalPort();
+ }
+ }
+
+}
diff --git a/apache-httpclient/src/test/java/com/baeldung/httpclient/HttpAsyncClientLiveTest.java b/apache-httpclient/src/test/java/com/baeldung/httpclient/HttpAsyncClientLiveTest.java
index ab0e4e6308..f4b9266d7e 100644
--- a/apache-httpclient/src/test/java/com/baeldung/httpclient/HttpAsyncClientLiveTest.java
+++ b/apache-httpclient/src/test/java/com/baeldung/httpclient/HttpAsyncClientLiveTest.java
@@ -38,7 +38,7 @@ import org.apache.hc.core5.ssl.SSLContexts;
import org.apache.hc.core5.ssl.TrustStrategy;
-class HttpAsyncClientLiveTest {
+class HttpAsyncClientLiveTest extends GetRequestMockServer {
private static final String HOST = "http://www.google.com";
private static final String HOST_WITH_SSL = "https://mms.nw.ru/";
diff --git a/apache-kafka-2/README.md b/apache-kafka-2/README.md
index e86504d605..9a5f6e15ae 100644
--- a/apache-kafka-2/README.md
+++ b/apache-kafka-2/README.md
@@ -9,3 +9,5 @@ You can build the project from the command line using: *mvn clean install*, or i
- [Guide to Check if Apache Kafka Server Is Running](https://www.baeldung.com/apache-kafka-check-server-is-running)
- [Add Custom Headers to a Kafka Message](https://www.baeldung.com/java-kafka-custom-headers)
- [Get Last N Messages in Apache Kafka Topic](https://www.baeldung.com/java-apache-kafka-get-last-n-messages)
+- [Is a Key Required as Part of Sending Messages to Kafka?](https://www.baeldung.com/java-kafka-message-key)
+- [Read Data From the Beginning Using Kafka Consumer API](https://www.baeldung.com/java-kafka-consumer-api-read)
diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/consumer/ConsumeFromBeginning.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/consumer/ConsumeFromBeginning.java
new file mode 100644
index 0000000000..569c5aa9e9
--- /dev/null
+++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/consumer/ConsumeFromBeginning.java
@@ -0,0 +1,81 @@
+package com.baeldung.kafka.consumer;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConsumeFromBeginning {
+
+ private static Logger logger = LoggerFactory.getLogger(ConsumeFromBeginning.class);
+
+ private static String TOPIC = "baeldung";
+ private static int messagesInTopic = 10;
+
+ private static KafkaProducer producer;
+ private static KafkaConsumer consumer;
+
+ public static void main(String[] args) {
+ setup();
+
+ publishMessages();
+
+ consumeFromBeginning();
+ }
+
+ private static void consumeFromBeginning() {
+ consumer.subscribe(Arrays.asList(TOPIC));
+
+ ConsumerRecords records = consumer.poll(Duration.ofSeconds(10));
+
+ for (ConsumerRecord record : records) {
+ logger.info(record.value());
+ }
+
+ consumer.seekToBeginning(consumer.assignment());
+
+ records = consumer.poll(Duration.ofSeconds(10));
+
+ for (ConsumerRecord record : records) {
+ logger.info(record.value());
+ }
+ }
+
+ private static void publishMessages() {
+ for (int i = 1; i <= messagesInTopic; i++) {
+ ProducerRecord record = new ProducerRecord<>(TOPIC, String.valueOf(i));
+ producer.send(record);
+ }
+ }
+
+ private static void setup() {
+ Properties producerProperties = new Properties();
+ producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+ Properties consumerProperties = new Properties();
+ consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID()
+ .toString());
+
+ producer = new KafkaProducer<>(producerProperties);
+ consumer = new KafkaConsumer<>(consumerProperties);
+ }
+
+}
diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/MessageWithKey.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/MessageWithKey.java
new file mode 100644
index 0000000000..b03c1e1adc
--- /dev/null
+++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/MessageWithKey.java
@@ -0,0 +1,106 @@
+package com.baeldung.kafka.message;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessageWithKey {
+
+ private static Logger logger = LoggerFactory.getLogger(MessageWithKey.class);
+
+ private static String TOPIC = "baeldung";
+ private static int PARTITIONS = 5;
+ private static short REPLICATION_FACTOR = 1;
+
+ private static String MESSAGE_KEY = "message-key";
+
+ private static Admin admin;
+ private static KafkaProducer producer;
+ private static KafkaConsumer consumer;
+
+ public static void main(String[] args) throws ExecutionException, InterruptedException {
+ setup();
+
+ publishMessagesWithoutKey();
+
+ consumeMessages();
+
+ publishMessagesWithKey();
+
+ consumeMessages();
+ }
+
+ private static void consumeMessages() {
+ consumer.subscribe(Arrays.asList(TOPIC));
+
+ ConsumerRecords records = consumer.poll(Duration.ofSeconds(5));
+ for (ConsumerRecord record : records) {
+ logger.info("Key : {}, Value : {}", record.key(), record.value());
+ }
+ }
+
+ private static void publishMessagesWithKey() throws ExecutionException, InterruptedException {
+ for (int i = 1; i <= 10; i++) {
+ ProducerRecord record = new ProducerRecord<>(TOPIC, MESSAGE_KEY, String.valueOf(i));
+ Future future = producer.send(record);
+ RecordMetadata metadata = future.get();
+
+ logger.info(String.valueOf(metadata.partition()));
+ }
+ }
+
+ private static void publishMessagesWithoutKey() throws ExecutionException, InterruptedException {
+ for (int i = 1; i <= 10; i++) {
+ ProducerRecord record = new ProducerRecord<>(TOPIC, String.valueOf(i));
+ Future future = producer.send(record);
+ RecordMetadata metadata = future.get();
+
+ logger.info(String.valueOf(metadata.partition()));
+ }
+ }
+
+ private static void setup() {
+ Properties adminProperties = new Properties();
+ adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+
+ Properties producerProperties = new Properties();
+ producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+ Properties consumerProperties = new Properties();
+ consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID()
+ .toString());
+
+ admin = Admin.create(adminProperties);
+ producer = new KafkaProducer<>(producerProperties);
+ consumer = new KafkaConsumer<>(consumerProperties);
+
+ admin.createTopics(Collections.singleton(new NewTopic(TOPIC, PARTITIONS, REPLICATION_FACTOR)));
+ }
+
+}
\ No newline at end of file
diff --git a/apache-kafka-2/src/test/java/com/baeldung/kafka/consumer/ConsumeFromBeginningLiveTest.java b/apache-kafka-2/src/test/java/com/baeldung/kafka/consumer/ConsumeFromBeginningLiveTest.java
new file mode 100644
index 0000000000..6bfba1eca9
--- /dev/null
+++ b/apache-kafka-2/src/test/java/com/baeldung/kafka/consumer/ConsumeFromBeginningLiveTest.java
@@ -0,0 +1,109 @@
+package com.baeldung.kafka.consumer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+// This live test needs a Docker Daemon running so that a kafka container can be created
+
+@Testcontainers
+public class ConsumeFromBeginningLiveTest {
+
+ private static Logger logger = LoggerFactory.getLogger(ConsumeFromBeginningLiveTest.class);
+
+ private static String TOPIC = "baeldung";
+ private static int messagesInTopic = 10;
+
+ private static KafkaProducer producer;
+ private static KafkaConsumer consumer;
+
+ @Container
+ private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
+
+ @BeforeAll
+ static void setup() {
+ KAFKA_CONTAINER.addExposedPort(9092);
+
+ Properties producerProperties = new Properties();
+ producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
+ producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+ Properties consumerProperties = new Properties();
+ consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
+ consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID()
+ .toString());
+
+ producer = new KafkaProducer<>(producerProperties);
+ consumer = new KafkaConsumer<>(consumerProperties);
+ }
+
+ private static void publishMessages() throws ExecutionException, InterruptedException {
+ for (int i = 1; i <= messagesInTopic; i++) {
+ ProducerRecord record = new ProducerRecord<>(TOPIC, String.valueOf(i));
+ producer.send(record)
+ .get();
+ }
+ }
+
+ @AfterAll
+ static void destroy() {
+ KAFKA_CONTAINER.stop();
+ }
+
+ @Test
+ void givenMessages_whenConsumedFromBeginning_thenCheckIfConsumedFromBeginning() throws ExecutionException, InterruptedException {
+
+ publishMessages();
+
+ consumer.subscribe(Arrays.asList(TOPIC));
+
+ ConsumerRecords records = consumer.poll(Duration.ofSeconds(10));
+
+ int messageCount = 0;
+ for (ConsumerRecord record : records) {
+ logger.info(record.value());
+ messageCount++;
+ }
+
+ assertEquals(messagesInTopic, messageCount);
+
+ consumer.seekToBeginning(consumer.assignment());
+
+ records = consumer.poll(Duration.ofSeconds(10));
+
+ messageCount = 0;
+ for (ConsumerRecord record : records) {
+ logger.info(record.value());
+ messageCount++;
+ }
+
+ assertEquals(messagesInTopic, messageCount);
+ }
+}
diff --git a/apache-kafka-2/src/test/java/com/baeldung/kafka/message/MessageWithKeyLiveTest.java b/apache-kafka-2/src/test/java/com/baeldung/kafka/message/MessageWithKeyLiveTest.java
new file mode 100644
index 0000000000..093dc629cb
--- /dev/null
+++ b/apache-kafka-2/src/test/java/com/baeldung/kafka/message/MessageWithKeyLiveTest.java
@@ -0,0 +1,130 @@
+package com.baeldung.kafka.message;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+// This live test needs a Docker Daemon running so that a kafka container can be created
+
+@Testcontainers
+public class MessageWithKeyLiveTest {
+
+ private static String TOPIC = "baeldung";
+ private static int PARTITIONS = 5;
+ private static short REPLICATION_FACTOR = 1;
+
+ private static String MESSAGE_KEY = "message-key";
+ private static String MESSAGE_VALUE = "Hello World";
+
+ private static Admin admin;
+ private static KafkaProducer producer;
+ private static KafkaConsumer consumer;
+
+ @Container
+ private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
+
+ @BeforeAll
+ static void setup() {
+ KAFKA_CONTAINER.addExposedPort(9092);
+
+ Properties adminProperties = new Properties();
+ adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
+
+ Properties producerProperties = new Properties();
+ producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
+ producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+ Properties consumerProperties = new Properties();
+ consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
+ consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID()
+ .toString());
+
+ admin = Admin.create(adminProperties);
+ producer = new KafkaProducer<>(producerProperties);
+ consumer = new KafkaConsumer<>(consumerProperties);
+
+ admin.createTopics(Collections.singleton(new NewTopic(TOPIC, PARTITIONS, REPLICATION_FACTOR)));
+ }
+
+ @AfterAll
+ static void destroy() {
+ KAFKA_CONTAINER.stop();
+ }
+
+ @Test
+ void givenAMessageWithKey_whenPublishedToKafkaAndConsumed_thenCheckForKey() throws ExecutionException, InterruptedException {
+
+ ProducerRecord producerRecord = new ProducerRecord<>(TOPIC, MESSAGE_KEY, MESSAGE_VALUE);
+ Future future = producer.send(producerRecord);
+
+ RecordMetadata metadata = future.get();
+
+ assertNotNull(metadata);
+
+ consumer.subscribe(Arrays.asList(TOPIC));
+
+ ConsumerRecords records = consumer.poll(Duration.ofSeconds(5));
+ for (ConsumerRecord consumerRecord : records) {
+ assertEquals(MESSAGE_KEY, consumerRecord.key());
+ assertEquals(MESSAGE_VALUE, consumerRecord.value());
+ }
+ }
+
+ @Test
+ void givenAListOfMessageWithKeys_whenPublishedToKafka_thenCheckedIfPublishedToSamePartition() throws ExecutionException, InterruptedException {
+
+ boolean isSamePartition = true;
+ int partition = 0;
+
+ for (int i = 1; i <= 10; i++) {
+ ProducerRecord producerRecord = new ProducerRecord<>(TOPIC, MESSAGE_KEY, MESSAGE_VALUE);
+ Future future = producer.send(producerRecord);
+
+ RecordMetadata metadata = future.get();
+
+ assertNotNull(metadata);
+ if (i == 1) {
+ partition = metadata.partition();
+ } else {
+ if (partition != metadata.partition()) {
+ isSamePartition = false;
+ }
+ }
+ }
+
+ assertTrue(isSamePartition);
+ }
+}
diff --git a/apache-poi-2/pom.xml b/apache-poi-2/pom.xml
index af959292fa..9a01a76d73 100644
--- a/apache-poi-2/pom.xml
+++ b/apache-poi-2/pom.xml
@@ -1,7 +1,7 @@
-
+
4.0.0
apache-poi-2
0.0.1-SNAPSHOT
@@ -19,10 +19,15 @@
poi-ooxml
${poi.version}
+
+ org.apache.poi
+ poi-scratchpad
+ ${poi.version}
+
- 5.2.0
+ 5.2.3
\ No newline at end of file
diff --git a/apache-poi-2/src/main/java/com/baeldung/poi/replacevariables/DocTextReplacer.java b/apache-poi-2/src/main/java/com/baeldung/poi/replacevariables/DocTextReplacer.java
new file mode 100644
index 0000000000..f661551ce9
--- /dev/null
+++ b/apache-poi-2/src/main/java/com/baeldung/poi/replacevariables/DocTextReplacer.java
@@ -0,0 +1,38 @@
+package com.baeldung.poi.replacevariables;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.poi.hwpf.HWPFDocument;
+import org.apache.poi.hwpf.usermodel.Range;
+import org.apache.poi.poifs.filesystem.POIFSFileSystem;
+
+public class DocTextReplacer {
+
+ public void replaceText() throws IOException {
+ String filePath = getClass().getClassLoader()
+ .getResource("baeldung.doc")
+ .getPath();
+ try (InputStream inputStream = new FileInputStream(filePath); POIFSFileSystem fileSystem = new POIFSFileSystem(inputStream)) {
+ HWPFDocument doc = new HWPFDocument(fileSystem);
+ doc = replaceText(doc, "Baeldung", "Hello");
+ saveFile(filePath, doc);
+ doc.close();
+ }
+ }
+
+ private HWPFDocument replaceText(HWPFDocument doc, String originalText, String updatedText) {
+ Range range = doc.getRange();
+ range.replaceText(originalText, updatedText);
+ return doc;
+ }
+
+ private void saveFile(String filePath, HWPFDocument doc) throws IOException {
+ try (FileOutputStream out = new FileOutputStream(filePath)) {
+ doc.write(out);
+ }
+ }
+
+}
diff --git a/apache-poi-2/src/main/java/com/baeldung/poi/replacevariables/DocxNaiveTextReplacer.java b/apache-poi-2/src/main/java/com/baeldung/poi/replacevariables/DocxNaiveTextReplacer.java
new file mode 100644
index 0000000000..34c2bc43e5
--- /dev/null
+++ b/apache-poi-2/src/main/java/com/baeldung/poi/replacevariables/DocxNaiveTextReplacer.java
@@ -0,0 +1,63 @@
+package com.baeldung.poi.replacevariables;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import org.apache.poi.xwpf.usermodel.XWPFDocument;
+import org.apache.poi.xwpf.usermodel.XWPFParagraph;
+import org.apache.poi.xwpf.usermodel.XWPFRun;
+import org.apache.poi.xwpf.usermodel.XWPFTable;
+import org.apache.poi.xwpf.usermodel.XWPFTableCell;
+import org.apache.poi.xwpf.usermodel.XWPFTableRow;
+
+public class DocxNaiveTextReplacer {
+
+ public void replaceText() throws IOException {
+ String filePath = getClass().getClassLoader()
+ .getResource("baeldung-copy.docx")
+ .getPath();
+ try (InputStream inputStream = new FileInputStream(filePath)) {
+ XWPFDocument doc = new XWPFDocument(inputStream);
+ doc = replaceText(doc, "Baeldung", "Hello");
+ saveFile(filePath, doc);
+ doc.close();
+ }
+ }
+
+ private XWPFDocument replaceText(XWPFDocument doc, String originalText, String updatedText) {
+ replaceTextInParagraphs(doc.getParagraphs(), originalText, updatedText);
+ for (XWPFTable tbl : doc.getTables()) {
+ for (XWPFTableRow row : tbl.getRows()) {
+ for (XWPFTableCell cell : row.getTableCells()) {
+ replaceTextInParagraphs(cell.getParagraphs(), originalText, updatedText);
+ }
+ }
+ }
+ return doc;
+ }
+
+ private void replaceTextInParagraphs(List paragraphs, String originalText, String updatedText) {
+ paragraphs.forEach(paragraph -> replaceTextInParagraph(paragraph, originalText, updatedText));
+ }
+
+ private void replaceTextInParagraph(XWPFParagraph paragraph, String originalText, String updatedText) {
+ List runs = paragraph.getRuns();
+ for (XWPFRun run : runs) {
+ String text = run.getText(0);
+ if (text != null && text.contains(originalText)) {
+ String updatedRunText = text.replace(originalText, updatedText);
+ run.setText(updatedRunText, 0);
+ }
+ }
+ }
+
+ private void saveFile(String filePath, XWPFDocument doc) throws IOException {
+ try (FileOutputStream out = new FileOutputStream(filePath)) {
+ doc.write(out);
+ }
+ }
+
+}
diff --git a/apache-poi-2/src/main/java/com/baeldung/poi/replacevariables/DocxTextReplacer.java b/apache-poi-2/src/main/java/com/baeldung/poi/replacevariables/DocxTextReplacer.java
new file mode 100644
index 0000000000..2d08d24a4e
--- /dev/null
+++ b/apache-poi-2/src/main/java/com/baeldung/poi/replacevariables/DocxTextReplacer.java
@@ -0,0 +1,65 @@
+package com.baeldung.poi.replacevariables;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.poi.xwpf.usermodel.XWPFDocument;
+import org.apache.poi.xwpf.usermodel.XWPFParagraph;
+import org.apache.poi.xwpf.usermodel.XWPFRun;
+import org.apache.poi.xwpf.usermodel.XWPFTable;
+import org.apache.poi.xwpf.usermodel.XWPFTableCell;
+import org.apache.poi.xwpf.usermodel.XWPFTableRow;
+
+public class DocxTextReplacer {
+
+ public void replaceText() throws IOException {
+ String filePath = getClass().getClassLoader()
+ .getResource("baeldung.docx")
+ .getPath();
+ try (InputStream inputStream = new FileInputStream(filePath)) {
+ XWPFDocument doc = new XWPFDocument(inputStream);
+ doc = replaceText(doc, "Baeldung", "Hello");
+ saveFile(filePath, doc);
+ doc.close();
+ }
+ }
+
+ private XWPFDocument replaceText(XWPFDocument doc, String originalText, String updatedText) {
+ replaceTextInParagraphs(doc.getParagraphs(), originalText, updatedText);
+ for (XWPFTable tbl : doc.getTables()) {
+ for (XWPFTableRow row : tbl.getRows()) {
+ for (XWPFTableCell cell : row.getTableCells()) {
+ replaceTextInParagraphs(cell.getParagraphs(), originalText, updatedText);
+ }
+ }
+ }
+ return doc;
+ }
+
+ private void replaceTextInParagraphs(List paragraphs, String originalText, String updatedText) {
+ paragraphs.forEach(paragraph -> replaceTextInParagraph(paragraph, originalText, updatedText));
+ }
+
+ private void replaceTextInParagraph(XWPFParagraph paragraph, String originalText, String updatedText) {
+ String paragraphText = paragraph.getParagraphText();
+ if (paragraphText.contains(originalText)) {
+ String updatedParagraphText = paragraphText.replace(originalText, updatedText);
+ while (paragraph.getRuns().size() > 0) {
+ paragraph.removeRun(0);
+ }
+ XWPFRun newRun = paragraph.createRun();
+ newRun.setText(updatedParagraphText);
+ }
+ }
+
+ private void saveFile(String filePath, XWPFDocument doc) throws IOException {
+ try (FileOutputStream out = new FileOutputStream(filePath)) {
+ doc.write(out);
+ }
+ }
+
+}
diff --git a/apache-poi-2/src/main/resources/baeldung-copy.docx b/apache-poi-2/src/main/resources/baeldung-copy.docx
new file mode 100644
index 0000000000..2cb76e8ffd
Binary files /dev/null and b/apache-poi-2/src/main/resources/baeldung-copy.docx differ
diff --git a/apache-poi-2/src/main/resources/baeldung.doc b/apache-poi-2/src/main/resources/baeldung.doc
new file mode 100644
index 0000000000..1b8474d65b
Binary files /dev/null and b/apache-poi-2/src/main/resources/baeldung.doc differ
diff --git a/apache-poi-2/src/main/resources/baeldung.docx b/apache-poi-2/src/main/resources/baeldung.docx
new file mode 100644
index 0000000000..f0de4e057b
Binary files /dev/null and b/apache-poi-2/src/main/resources/baeldung.docx differ
diff --git a/apache-poi-2/src/test/java/com/baeldung/poi/replacevariables/DocTextReplacerUnitTest.java b/apache-poi-2/src/test/java/com/baeldung/poi/replacevariables/DocTextReplacerUnitTest.java
new file mode 100644
index 0000000000..0c3d80a354
--- /dev/null
+++ b/apache-poi-2/src/test/java/com/baeldung/poi/replacevariables/DocTextReplacerUnitTest.java
@@ -0,0 +1,31 @@
+package com.baeldung.poi.replacevariables;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.poi.hwpf.HWPFDocument;
+import org.apache.poi.hwpf.extractor.WordExtractor;
+import org.junit.jupiter.api.Test;
+
+class DocTextReplacerUnitTest {
+
+ @Test
+ void whenReplaceText_ThenTextReplaced() throws IOException {
+ new DocTextReplacer().replaceText();
+
+ String filePath = getClass().getClassLoader()
+ .getResource("baeldung.doc")
+ .getPath();
+ try (FileInputStream fis = new FileInputStream(filePath); HWPFDocument document = new HWPFDocument(fis); WordExtractor extractor = new WordExtractor(document)) {
+ long occurrencesOfHello = Arrays.stream(extractor.getText()
+ .split("\\s+"))
+ .filter(s -> s.contains("Hello"))
+ .count();
+ assertEquals(5, occurrencesOfHello);
+ }
+ }
+
+}
diff --git a/apache-poi-2/src/test/java/com/baeldung/poi/replacevariables/DocxNaiveTextReplacerUnitTest.java b/apache-poi-2/src/test/java/com/baeldung/poi/replacevariables/DocxNaiveTextReplacerUnitTest.java
new file mode 100644
index 0000000000..324e63eb51
--- /dev/null
+++ b/apache-poi-2/src/test/java/com/baeldung/poi/replacevariables/DocxNaiveTextReplacerUnitTest.java
@@ -0,0 +1,31 @@
+package com.baeldung.poi.replacevariables;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.poi.xwpf.extractor.XWPFWordExtractor;
+import org.apache.poi.xwpf.usermodel.XWPFDocument;
+import org.junit.jupiter.api.Test;
+
+class DocxNaiveTextReplacerUnitTest {
+
+ @Test
+ void whenReplaceText_ThenTextReplaced() throws IOException {
+ new DocxNaiveTextReplacer().replaceText();
+
+ String filePath = getClass().getClassLoader()
+ .getResource("baeldung-copy.docx")
+ .getPath();
+ try (FileInputStream fis = new FileInputStream(filePath); XWPFDocument document = new XWPFDocument(fis); XWPFWordExtractor extractor = new XWPFWordExtractor(document)) {
+ long occurrencesOfHello = Arrays.stream(extractor.getText()
+ .split("\\s+"))
+ .filter(s -> s.contains("Hello"))
+ .count();
+ assertTrue(occurrencesOfHello < 5);
+ }
+ }
+
+}
diff --git a/apache-poi-2/src/test/java/com/baeldung/poi/replacevariables/DocxTestReplacerUnitTest.java b/apache-poi-2/src/test/java/com/baeldung/poi/replacevariables/DocxTestReplacerUnitTest.java
new file mode 100644
index 0000000000..d09f6b003d
--- /dev/null
+++ b/apache-poi-2/src/test/java/com/baeldung/poi/replacevariables/DocxTestReplacerUnitTest.java
@@ -0,0 +1,31 @@
+package com.baeldung.poi.replacevariables;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.poi.xwpf.extractor.XWPFWordExtractor;
+import org.apache.poi.xwpf.usermodel.XWPFDocument;
+import org.junit.jupiter.api.Test;
+
+class DocxTestReplacerUnitTest {
+
+ @Test
+ void whenReplaceText_ThenTextReplaced() throws IOException {
+ new DocxTextReplacer().replaceText();
+
+ String filePath = getClass().getClassLoader()
+ .getResource("baeldung.docx")
+ .getPath();
+ try (FileInputStream fis = new FileInputStream(filePath); XWPFDocument document = new XWPFDocument(fis); XWPFWordExtractor extractor = new XWPFWordExtractor(document)) {
+ long occurrencesOfHello = Arrays.stream(extractor.getText()
+ .split("\\s+"))
+ .filter(s -> s.contains("Hello"))
+ .count();
+ assertEquals(5, occurrencesOfHello);
+ }
+ }
+
+}
diff --git a/core-java-modules/core-java-collections-list-5/README.md b/core-java-modules/core-java-collections-list-5/README.md
index 31688bc9b1..ff40ae3725 100644
--- a/core-java-modules/core-java-collections-list-5/README.md
+++ b/core-java-modules/core-java-collections-list-5/README.md
@@ -7,3 +7,5 @@ This module contains articles about the Java List collection
- [Finding All Duplicates in a List in Java](https://www.baeldung.com/java-list-find-duplicates)
- [Moving Items Around in an Arraylist](https://www.baeldung.com/java-arraylist-move-items)
- [Check if a List Contains an Element From Another List in Java](https://www.baeldung.com/java-check-elements-between-lists)
+- [Array vs. List Performance in Java](https://www.baeldung.com/java-array-vs-list-performance)
+- [Set Default Value for Elements in List](https://www.baeldung.com/java-list-set-default-values)
diff --git a/core-java-modules/core-java-collections-maps-6/README.md b/core-java-modules/core-java-collections-maps-6/README.md
index fc12a1bb25..5b45752e74 100644
--- a/core-java-modules/core-java-collections-maps-6/README.md
+++ b/core-java-modules/core-java-collections-maps-6/README.md
@@ -1,2 +1,3 @@
## Relevant Articles
- [Copying All Keys and Values From One Hashmap Onto Another Without Replacing Existing Keys and Values](https://www.baeldung.com/java-copy-hashmap-no-changes)
+- [Convert Hashmap to JSON Object in Java](https://www.baeldung.com/java-convert-hashmap-to-json-object)
diff --git a/core-java-modules/core-java-collections-maps-6/src/test/com/baeldung/objecttomap/ObjectToMapUnitTest.java b/core-java-modules/core-java-collections-maps-6/src/test/com/baeldung/objecttomap/ObjectToMapUnitTest.java
new file mode 100644
index 0000000000..52c2fb2bea
--- /dev/null
+++ b/core-java-modules/core-java-collections-maps-6/src/test/com/baeldung/objecttomap/ObjectToMapUnitTest.java
@@ -0,0 +1,76 @@
+package java.com.baeldung.objecttomap;
+import com.google.gson.Gson;
+import org.junit.Assert;
+import org.junit.Test;
+import wiremock.com.fasterxml.jackson.core.type.TypeReference;
+import wiremock.com.fasterxml.jackson.databind.ObjectMapper;
+import wiremock.com.google.common.reflect.TypeToken;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ObjectToMapUnitTest {
+ Employee employee = new Employee("John", 3000.0);
+
+ @Test
+ public void givenJavaObject_whenUsingReflection_thenConvertToMap() throws IllegalAccessException {
+ Map map = convertUsingReflection(employee);
+ Assert.assertEquals(employee.getName(), map.get("name"));
+ Assert.assertEquals(employee.getSalary(), map.get("salary"));
+ }
+
+ private Map convertUsingReflection(Object object) throws IllegalAccessException {
+ Map map = new HashMap<>();
+ Field[] fields = object.getClass().getDeclaredFields();
+
+ for (Field field : fields) {
+ field.setAccessible(true);
+ map.put(field.getName(), field.get(object));
+ }
+
+ return map;
+ }
+
+ @Test
+ public void givenJavaObject_whenUsingJackson_thenConvertToMap() {
+ ObjectMapper objectMapper = new ObjectMapper();
+ Map map = objectMapper.convertValue(employee, new TypeReference