From 112f1fa09eb3e2b1a322dfc118018501da97ab26 Mon Sep 17 00:00:00 2001 From: parthiv39731 <70740707+parthiv39731@users.noreply.github.com> Date: Sun, 18 Feb 2024 23:27:52 +0530 Subject: [PATCH] BAEL-7386, Introduction to Redpanda --- java-redpanda/pom.xml | 62 +++++++ .../baeldung/redpanda/RedpandaLiveTest.java | 151 ++++++++++++++++++ pom.xml | 1 + 3 files changed, 214 insertions(+) create mode 100644 java-redpanda/pom.xml create mode 100644 java-redpanda/src/test/java/com/baeldung/redpanda/RedpandaLiveTest.java diff --git a/java-redpanda/pom.xml b/java-redpanda/pom.xml new file mode 100644 index 0000000000..7aad60550e --- /dev/null +++ b/java-redpanda/pom.xml @@ -0,0 +1,62 @@ + + + 4.0.0 + redpanda + redpanda + + com.baeldung + parent-modules + 1.0.0-SNAPSHOT + + + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + + org.testcontainers + junit-jupiter + ${testcontainers-jupiter.version} + test + + + + org.testcontainers + testcontainers + test + + + + org.testcontainers + redpanda + ${redpanda.version} + test + + + + + + + + org.testcontainers + testcontainers-bom + 1.19.3 + pom + import + + + + + + 1.19.4 + 3.6.1 + 1.15.3 + UTF-8 + + + \ No newline at end of file diff --git a/java-redpanda/src/test/java/com/baeldung/redpanda/RedpandaLiveTest.java b/java-redpanda/src/test/java/com/baeldung/redpanda/RedpandaLiveTest.java new file mode 100644 index 0000000000..0dcbc4d2b2 --- /dev/null +++ b/java-redpanda/src/test/java/com/baeldung/redpanda/RedpandaLiveTest.java @@ -0,0 +1,151 @@ +package com.baeldung.redpanda; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.KafkaAdminClient; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Network; +import org.testcontainers.redpanda.RedpandaContainer; + +public class RedpandaLiveTest { + private static final Logger LOGGER = LoggerFactory.getLogger(RedpandaLiveTest.class); + + private static RedpandaContainer redpandaContainer = null; + + private static final String TOPIC_NAME = "baeldung"; + + private static final Integer BROKER_PORT = 9092; + + @BeforeAll + static void setup() throws ExecutionException, InterruptedException { + installRedpanda(); + + createTopic(TOPIC_NAME); + + publishMessages(TOPIC_NAME); + } + + @AfterAll + static void cleanup() { + redpandaContainer.stop(); + } + + private static void publishMessages(String topic) throws ExecutionException, InterruptedException { + try (final KafkaProducer producer = createProducer()) { + for (int i = 0; i < 10; i++) { + publishMessage("test_msg_key_1_" + i, "How are you redpanda:" + i, topic, producer); + } + } + } + + private static void installRedpanda() { + final String DOCKER_IMAGE = "docker.redpanda.com/redpandadata/redpanda:v23.1.2"; + Network network = Network.newNetwork(); + redpandaContainer = new RedpandaContainer(DOCKER_IMAGE).withNetwork(network) + .withNetworkAliases("redpanda") + .withExposedPorts(BROKER_PORT); + redpandaContainer.start(); + } + + private static AdminClient createAdminClient() { + Properties adminProps = new Properties(); + adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl()); + return KafkaAdminClient.create(adminProps); + } + + private static void createTopic(String topicName) { + + try (AdminClient adminClient = createAdminClient()) { + NewTopic topic = new NewTopic(topicName, 1, (short) 1); + adminClient.createTopics(Collections.singleton(topic)); + } catch (Exception e) { + LOGGER.error("Error occurred during topic creation:", e); + } + } + + private static KafkaProducer createProducer() { + Properties producerProps = new Properties(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl()); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + return new KafkaProducer(producerProps); + } + + private static KafkaConsumer createConsumer() { + Properties consumerProps = new Properties(); + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl()); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group"); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + + return new KafkaConsumer(consumerProps); + } + + private static void publishMessage(String msgKey, String msg, String topic, KafkaProducer producer) + throws ExecutionException, InterruptedException { + ProducerRecord record = new ProducerRecord<>(topic, msgKey, msg); + producer.send(record).get(); + } + + private static String getBrokerUrl() { + return redpandaContainer.getHost() + ":" + redpandaContainer.getMappedPort(BROKER_PORT); + } + + @Test + void whenCreateTopic_thenSuccess() throws ExecutionException, InterruptedException { + String topic = "test-topic"; + createTopic(topic); + try(AdminClient adminClient = createAdminClient()) { + assertTrue(adminClient.listTopics() + .names() + .get() + .contains(topic)); + } + } + + @Test + void givenTopic_whenPublishMsg_thenSuccess() { + try (final KafkaProducer producer = createProducer()) { + assertDoesNotThrow(() -> publishMessage("test_msg_key_2", "Hello Redpanda!", "baeldung-topic", producer)); + } + } + + @Test + void givenTopic_whenConsumeMessage_thenSuccess() { + try (KafkaConsumer kafkaConsumer = createConsumer()) { + kafkaConsumer.subscribe(Collections.singletonList(TOPIC_NAME)); + + while(true) { + ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(1000)); + if(records.count() == 0) { + continue; + } + assertTrue(records.count() >= 1); + break; + } + } + } +} diff --git a/pom.xml b/pom.xml index 6de6042906..e1bdef5fe4 100644 --- a/pom.xml +++ b/pom.xml @@ -960,6 +960,7 @@ jackson-simple java-blockchain java-jdi + java-redpanda javafx javax-sound