diff --git a/libraries-data-3/pom.xml b/libraries-data-3/pom.xml index bfc39e537e..37d5c7ca0d 100644 --- a/libraries-data-3/pom.xml +++ b/libraries-data-3/pom.xml @@ -17,8 +17,6 @@ org.apache.kafka kafka-clients ${kafka.version} - test - test org.apache.kafka @@ -47,6 +45,12 @@ ${testcontainers-kafka.version} test + + org.testcontainers + junit-jupiter + ${testcontainers-jupiter.version} + test + @@ -54,6 +58,7 @@ 1.7.25 2.8.0 1.15.3 + 1.15.3 \ No newline at end of file diff --git a/libraries-data-3/src/main/java/com/baeldung/kafka/admin/KafkaTopicApplication.java b/libraries-data-3/src/main/java/com/baeldung/kafka/admin/KafkaTopicApplication.java new file mode 100644 index 0000000000..0d74e27d4e --- /dev/null +++ b/libraries-data-3/src/main/java/com/baeldung/kafka/admin/KafkaTopicApplication.java @@ -0,0 +1,87 @@ +package com.baeldung.kafka.admin; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.CreateTopicsOptions; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.config.TopicConfig; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +public class KafkaTopicApplication { + + private final Properties properties; + + public KafkaTopicApplication(Properties properties) { + this.properties = properties; + } + + public void createTopic(String topicName) throws Exception { + try (Admin admin = Admin.create(properties)) { + int partitions = 1; + short replicationFactor = 1; + NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor); + + CreateTopicsResult result = admin.createTopics( + Collections.singleton(newTopic)); + + // get the async result for the new topic creation + KafkaFuture future = result.values().get(topicName); + + // call get() to block until topic creation has completed or failed + future.get(); + } + } + + public void createTopicWithOptions(String topicName) throws Exception { + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + + try (Admin admin = Admin.create(props)) { + int partitions = 1; + short replicationFactor = 1; + NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor); + + CreateTopicsOptions topicOptions = new CreateTopicsOptions() + .validateOnly(true) + .retryOnQuotaViolation(true); + + CreateTopicsResult result = admin.createTopics( + Collections.singleton(newTopic), topicOptions + ); + + KafkaFuture future = result.values().get(topicName); + future.get(); + } + } + + public void createCompactedTopicWithCompression(String topicName) throws Exception { + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + + try (Admin admin = Admin.create(props)) { + int partitions = 1; + short replicationFactor = 1; + + // Create a compacted topic with 'lz4' compression codec + Map newTopicConfig = new HashMap<>(); + newTopicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT); + newTopicConfig.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4"); + NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor) + .configs(newTopicConfig); + + CreateTopicsResult result = admin.createTopics( + Collections.singleton(newTopic) + ); + + KafkaFuture future = result.values().get(topicName); + future.get(); + } + } + +} diff --git a/libraries-data-3/src/test/java/com/baeldung/kafka/admin/KafkaTopicApplicationIntegrationTest.java b/libraries-data-3/src/test/java/com/baeldung/kafka/admin/KafkaTopicApplicationIntegrationTest.java new file mode 100644 index 0000000000..d79f6af7c1 --- /dev/null +++ b/libraries-data-3/src/test/java/com/baeldung/kafka/admin/KafkaTopicApplicationIntegrationTest.java @@ -0,0 +1,40 @@ +package com.baeldung.kafka.admin; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class KafkaTopicApplicationIntegrationTest { + + @Container + private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3")); + + private KafkaTopicApplication kafkaTopicApplication; + + @BeforeEach + void setup() { + Properties properties = new Properties(); + properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + kafkaTopicApplication = new KafkaTopicApplication(properties); + } + + @Test + void givenTopicName_whenCreateNewTopic_thenTopicIsCreated() throws Exception { + kafkaTopicApplication.createTopic("test-topic"); + + String topicCommand = "/usr/bin/kafka-topics --bootstrap-server=localhost:9092 --list"; + String stdout = KAFKA_CONTAINER.execInContainer("/bin/sh", "-c", topicCommand) + .getStdout(); + + assertThat(stdout).contains("test-topic"); + } +}