diff --git a/spring-kafka/pom.xml b/spring-kafka/pom.xml index 2db62044b2..ed3767029e 100644 --- a/spring-kafka/pom.xml +++ b/spring-kafka/pom.xml @@ -28,6 +28,10 @@ com.fasterxml.jackson.core jackson-databind + + org.projectlombok + lombok + org.springframework.kafka spring-kafka-test @@ -41,9 +45,15 @@ test - commons-collections - commons-collections - 3.2.1 + org.testcontainers + junit-jupiter + ${testcontainers-kafka.version} + test + + + org.awaitility + awaitility + test diff --git a/spring-kafka/src/main/java/com/baeldung/kafka/ssl/KafkaConsumer.java b/spring-kafka/src/main/java/com/baeldung/kafka/ssl/KafkaConsumer.java new file mode 100644 index 0000000000..77df74b6c9 --- /dev/null +++ b/spring-kafka/src/main/java/com/baeldung/kafka/ssl/KafkaConsumer.java @@ -0,0 +1,25 @@ +package com.baeldung.kafka.ssl; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; + +@Component +@Slf4j +public class KafkaConsumer { + + public static final String TOPIC = "test-topic"; + + public final List messages = new ArrayList<>(); + + @KafkaListener(topics = TOPIC) + public void receive(ConsumerRecord consumerRecord) { + log.info("Received payload: '{}'", consumerRecord.toString()); + messages.add(consumerRecord.value()); + } + +} diff --git a/spring-kafka/src/main/java/com/baeldung/kafka/ssl/KafkaProducer.java b/spring-kafka/src/main/java/com/baeldung/kafka/ssl/KafkaProducer.java new file mode 100644 index 0000000000..895d437c6b --- /dev/null +++ b/spring-kafka/src/main/java/com/baeldung/kafka/ssl/KafkaProducer.java @@ -0,0 +1,23 @@ +package com.baeldung.kafka.ssl; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +@Slf4j +@AllArgsConstructor +@Component +public class KafkaProducer { + + private final KafkaTemplate kafkaTemplate; + + public void sendMessage(String message, String topic) { + log.info("Producing message: {}", message); + kafkaTemplate.send(topic, "key", message) + .addCallback( + result -> log.info("Message sent to topic: {}", message), + ex -> log.error("Failed to send message", ex) + ); + } +} diff --git a/spring-kafka/src/main/java/com/baeldung/kafka/ssl/KafkaSslApplication.java b/spring-kafka/src/main/java/com/baeldung/kafka/ssl/KafkaSslApplication.java new file mode 100644 index 0000000000..ae6df5bee2 --- /dev/null +++ b/spring-kafka/src/main/java/com/baeldung/kafka/ssl/KafkaSslApplication.java @@ -0,0 +1,15 @@ +package com.baeldung.kafka.ssl; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +@EnableAutoConfiguration +public class KafkaSslApplication { + + public static void main(String[] args) { + SpringApplication.run(KafkaSslApplication.class, args); + } + +} diff --git a/spring-kafka/src/main/resources/application-ssl.yml b/spring-kafka/src/main/resources/application-ssl.yml new file mode 100644 index 0000000000..8974e62a4c --- /dev/null +++ b/spring-kafka/src/main/resources/application-ssl.yml @@ -0,0 +1,18 @@ +spring: + kafka: + security: + protocol: "SSL" + bootstrap-servers: localhost:9093 + ssl: + trust-store-location: classpath:/client-certs/kafka.client.truststore.jks + trust-store-password: password + key-store-location: classpath:/client-certs/kafka.client.keystore.jks + key-store-password: password + consumer: + group-id: demo-group-id + auto-offset-reset: earliest + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + producer: + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer \ No newline at end of file diff --git a/spring-kafka/src/test/java/com/baeldung/kafka/ssl/KafkaSslApplicationLiveTest.java b/spring-kafka/src/test/java/com/baeldung/kafka/ssl/KafkaSslApplicationLiveTest.java new file mode 100644 index 0000000000..e05298face --- /dev/null +++ b/spring-kafka/src/test/java/com/baeldung/kafka/ssl/KafkaSslApplicationLiveTest.java @@ -0,0 +1,54 @@ +package com.baeldung.kafka.ssl; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ActiveProfiles; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.io.File; +import java.time.Duration; +import java.util.UUID; + +import static com.baeldung.kafka.ssl.KafkaConsumer.TOPIC; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +@Slf4j +@ActiveProfiles("ssl") +@Testcontainers +@SpringBootTest(classes = KafkaSslApplication.class) +class KafkaSslApplicationLiveTest { + + private static final File KAFKA_COMPOSE_FILE = new File("src/test/resources/docker/docker-compose.yml"); + private static final String KAFKA_SERVICE = "kafka"; + private static final int SSL_PORT = 9093; + + @Container + public DockerComposeContainer container = + new DockerComposeContainer<>(KAFKA_COMPOSE_FILE) + .withExposedService(KAFKA_SERVICE, SSL_PORT, Wait.forListeningPort()); + + @Autowired + private KafkaProducer kafkaProducer; + + @Autowired + private KafkaConsumer kafkaConsumer; + + @Test + void givenSslIsConfigured_whenProducerSendsMessageOverSsl_thenConsumerReceivesOverSsl() { + String message = generateSampleMessage(); + kafkaProducer.sendMessage(message, TOPIC); + + await().atMost(Duration.ofMinutes(2)) + .untilAsserted(() -> assertThat(kafkaConsumer.messages).containsExactly(message)); + } + + private static String generateSampleMessage() { + return UUID.randomUUID().toString(); + } +} diff --git a/spring-kafka/src/test/resources/client-certs/kafka.client.keystore.jks b/spring-kafka/src/test/resources/client-certs/kafka.client.keystore.jks new file mode 100644 index 0000000000..62ddfe199d Binary files /dev/null and b/spring-kafka/src/test/resources/client-certs/kafka.client.keystore.jks differ diff --git a/spring-kafka/src/test/resources/client-certs/kafka.client.truststore.jks b/spring-kafka/src/test/resources/client-certs/kafka.client.truststore.jks new file mode 100644 index 0000000000..2b07327b13 Binary files /dev/null and b/spring-kafka/src/test/resources/client-certs/kafka.client.truststore.jks differ diff --git a/spring-kafka/src/test/resources/docker/certs/kafka.server.keystore.jks b/spring-kafka/src/test/resources/docker/certs/kafka.server.keystore.jks new file mode 100644 index 0000000000..c038ad86b9 Binary files /dev/null and b/spring-kafka/src/test/resources/docker/certs/kafka.server.keystore.jks differ diff --git a/spring-kafka/src/test/resources/docker/certs/kafka.server.truststore.jks b/spring-kafka/src/test/resources/docker/certs/kafka.server.truststore.jks new file mode 100644 index 0000000000..1ec806b50d Binary files /dev/null and b/spring-kafka/src/test/resources/docker/certs/kafka.server.truststore.jks differ diff --git a/spring-kafka/src/test/resources/docker/certs/kafka_keystore_credentials b/spring-kafka/src/test/resources/docker/certs/kafka_keystore_credentials new file mode 100644 index 0000000000..7aa311adf9 --- /dev/null +++ b/spring-kafka/src/test/resources/docker/certs/kafka_keystore_credentials @@ -0,0 +1 @@ +password \ No newline at end of file diff --git a/spring-kafka/src/test/resources/docker/certs/kafka_sslkey_credentials b/spring-kafka/src/test/resources/docker/certs/kafka_sslkey_credentials new file mode 100644 index 0000000000..7aa311adf9 --- /dev/null +++ b/spring-kafka/src/test/resources/docker/certs/kafka_sslkey_credentials @@ -0,0 +1 @@ +password \ No newline at end of file diff --git a/spring-kafka/src/test/resources/docker/certs/kafka_truststore_credentials b/spring-kafka/src/test/resources/docker/certs/kafka_truststore_credentials new file mode 100644 index 0000000000..7aa311adf9 --- /dev/null +++ b/spring-kafka/src/test/resources/docker/certs/kafka_truststore_credentials @@ -0,0 +1 @@ +password \ No newline at end of file diff --git a/spring-kafka/src/test/resources/docker/docker-compose.yml b/spring-kafka/src/test/resources/docker/docker-compose.yml new file mode 100644 index 0000000000..d65dd58b19 --- /dev/null +++ b/spring-kafka/src/test/resources/docker/docker-compose.yml @@ -0,0 +1,30 @@ +--- +version: '2' +services: + zookeeper: + image: confluentinc/cp-zookeeper:6.2.0 + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + + kafka: + image: confluentinc/cp-kafka:6.2.0 + depends_on: + - zookeeper + ports: + - 9092:9092 + - 9093:9093 + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,SSL://localhost:9093 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_SSL_CLIENT_AUTH: "required" + KAFKA_SSL_KEYSTORE_FILENAME: '/certs/kafka.server.keystore.jks' + KAFKA_SSL_KEYSTORE_CREDENTIALS: '/certs/kafka_keystore_credentials' + KAFKA_SSL_KEY_CREDENTIALS: '/certs/kafka_sslkey_credentials' + KAFKA_SSL_TRUSTSTORE_FILENAME: '/certs/kafka.server.truststore.jks' + KAFKA_SSL_TRUSTSTORE_CREDENTIALS: '/certs/kafka_truststore_credentials' + volumes: + - ./certs/:/etc/kafka/secrets/certs diff --git a/spring-kafka/src/test/resources/logback.xml b/spring-kafka/src/test/resources/logback-test.xml similarity index 59% rename from spring-kafka/src/test/resources/logback.xml rename to spring-kafka/src/test/resources/logback-test.xml index 7d900d8ea8..74f126ebc1 100644 --- a/spring-kafka/src/test/resources/logback.xml +++ b/spring-kafka/src/test/resources/logback-test.xml @@ -10,4 +10,10 @@ + + + + + + \ No newline at end of file