Merge pull request #15912 from parthiv39731/PR-7386

BAEL-7386, Introduction to Redpanda
This commit is contained in:
davidmartinezbarua 2024-03-12 19:05:58 -03:00 committed by GitHub
commit e9b37e1796
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 220 additions and 0 deletions

View File

@ -0,0 +1,61 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>java-redpanda</artifactId>
<name>java-redpanda</name>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>messaging-modules</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${testcontainers-jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>redpanda</artifactId>
<version>${redpanda.version}</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-bom</artifactId>
<version>1.19.3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<properties>
<redpanda.version>1.19.4</redpanda.version>
<kafka.version>3.6.1</kafka.version>
<testcontainers-jupiter.version>1.15.3</testcontainers-jupiter.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>

View File

@ -0,0 +1,7 @@
package com.baeldung.redpanda;
public class Application {
public static void main(String[] args) {
}
}

View File

@ -0,0 +1,151 @@
package com.baeldung.redpanda;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Network;
import org.testcontainers.redpanda.RedpandaContainer;
public class RedpandaLiveTest {
private static final Logger LOGGER = LoggerFactory.getLogger(RedpandaLiveTest.class);
private static RedpandaContainer redpandaContainer = null;
private static final String TOPIC_NAME = "baeldung";
private static final Integer BROKER_PORT = 9092;
@BeforeAll
static void setup() throws ExecutionException, InterruptedException {
installRedpanda();
createTopic(TOPIC_NAME);
publishMessages(TOPIC_NAME);
}
@AfterAll
static void cleanup() {
redpandaContainer.stop();
}
private static void publishMessages(String topic) throws ExecutionException, InterruptedException {
try (final KafkaProducer<String, String> producer = createProducer()) {
for (int i = 0; i < 10; i++) {
publishMessage("test_msg_key_1_" + i, "How are you redpanda:" + i, topic, producer);
}
}
}
private static void installRedpanda() {
final String DOCKER_IMAGE = "docker.redpanda.com/redpandadata/redpanda:v23.1.2";
Network network = Network.newNetwork();
redpandaContainer = new RedpandaContainer(DOCKER_IMAGE).withNetwork(network)
.withNetworkAliases("redpanda")
.withExposedPorts(BROKER_PORT);
redpandaContainer.start();
}
private static AdminClient createAdminClient() {
Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
return KafkaAdminClient.create(adminProps);
}
private static void createTopic(String topicName) {
try (AdminClient adminClient = createAdminClient()) {
NewTopic topic = new NewTopic(topicName, 1, (short) 1);
adminClient.createTopics(Collections.singleton(topic));
} catch (Exception e) {
LOGGER.error("Error occurred during topic creation:", e);
}
}
private static KafkaProducer<String, String> createProducer() {
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<String, String>(producerProps);
}
private static KafkaConsumer<String, String> createConsumer() {
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return new KafkaConsumer<String, String>(consumerProps);
}
private static void publishMessage(String msgKey, String msg, String topic, KafkaProducer<String, String> producer)
throws ExecutionException, InterruptedException {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, msgKey, msg);
producer.send(record).get();
}
private static String getBrokerUrl() {
return redpandaContainer.getHost() + ":" + redpandaContainer.getMappedPort(BROKER_PORT);
}
@Test
void whenCreateTopic_thenSuccess() throws ExecutionException, InterruptedException {
String topic = "test-topic";
createTopic(topic);
try(AdminClient adminClient = createAdminClient()) {
assertTrue(adminClient.listTopics()
.names()
.get()
.contains(topic));
}
}
@Test
void givenTopic_whenPublishMsg_thenSuccess() {
try (final KafkaProducer<String, String> producer = createProducer()) {
assertDoesNotThrow(() -> publishMessage("test_msg_key_2", "Hello Redpanda!", "baeldung-topic", producer));
}
}
@Test
void givenTopic_whenConsumeMessage_thenSuccess() {
try (KafkaConsumer<String, String> kafkaConsumer = createConsumer()) {
kafkaConsumer.subscribe(Collections.singletonList(TOPIC_NAME));
while(true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
if(records.count() == 0) {
continue;
}
assertTrue(records.count() >= 1);
break;
}
}
}
}

View File

@ -18,6 +18,7 @@
<module>apache-camel</module> <module>apache-camel</module>
<module>apache-rocketmq</module> <module>apache-rocketmq</module>
<module>jgroups</module> <module>jgroups</module>
<module>java-redpanda</module>
<module>rabbitmq</module> <module>rabbitmq</module>
<module>spring-amqp</module> <module>spring-amqp</module>
<module>spring-apache-camel</module> <module>spring-apache-camel</module>