From c50e8d74259a21e9c7f966552e0f7a20dce75f75 Mon Sep 17 00:00:00 2001 From: ukhan1980 <82222767+ukhan1980@users.noreply.github.com> Date: Sat, 11 Dec 2021 04:58:11 +0000 Subject: [PATCH] [BAEL-5223] Add code for Spring Boot Streams article (#11568) Co-authored-by: uzma khan --- spring-kafka/pom.xml | 12 +- .../baeldung/kafka/streams/KafkaConfig.java | 55 +++++++ .../baeldung/kafka/streams/KafkaProducer.java | 23 +++ .../streams/KafkaStreamsApplication.java | 13 ++ .../kafka/streams/WordCountProcessor.java | 36 +++++ .../kafka/streams/WordCountRestService.java | 36 +++++ .../KafkaStreamsApplicationLiveTest.java | 143 ++++++++++++++++++ .../streams/WordCountProcessorUnitTest.java | 54 +++++++ 8 files changed, 371 insertions(+), 1 deletion(-) create mode 100644 spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaConfig.java create mode 100644 spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaProducer.java create mode 100644 spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaStreamsApplication.java create mode 100644 spring-kafka/src/main/java/com/baeldung/kafka/streams/WordCountProcessor.java create mode 100644 spring-kafka/src/main/java/com/baeldung/kafka/streams/WordCountRestService.java create mode 100644 spring-kafka/src/test/java/com/baeldung/kafka/streams/KafkaStreamsApplicationLiveTest.java create mode 100644 spring-kafka/src/test/java/com/baeldung/kafka/streams/WordCountProcessorUnitTest.java diff --git a/spring-kafka/pom.xml b/spring-kafka/pom.xml index ed3767029e..2801afffd7 100644 --- a/spring-kafka/pom.xml +++ b/spring-kafka/pom.xml @@ -19,11 +19,20 @@ org.springframework.boot spring-boot-starter + + org.springframework.boot + spring-boot-starter-web + org.springframework.kafka spring-kafka ${spring-kafka.version} + + org.apache.kafka + kafka-streams + ${kafka.streams.version} + com.fasterxml.jackson.core jackson-databind @@ -58,7 +67,8 @@ - 2.7.2 + 2.7.8 + 2.7.1 1.15.3 diff --git a/spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaConfig.java b/spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaConfig.java new file mode 100644 index 0000000000..756f46e93e --- /dev/null +++ b/spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaConfig.java @@ -0,0 +1,55 @@ +package com.baeldung.kafka.streams; + +import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.STATE_DIR_CONFIG; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.serialization.Serdes; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.EnableKafkaStreams; +import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; +import org.springframework.kafka.config.KafkaStreamsConfiguration; +import org.springframework.kafka.config.TopicBuilder; + +@Configuration +@EnableKafka +@EnableKafkaStreams +public class KafkaConfig { + + @Value(value = "${spring.kafka.bootstrap-servers}") + private String bootstrapAddress; + + @Value(value = "${spring.kafka.streams.state.dir}") + private String stateStoreLocation; + + @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) + KafkaStreamsConfiguration kStreamsConfig() { + Map props = new HashMap<>(); + props.put(APPLICATION_ID_CONFIG, "streams-app"); + props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + // configure the state location to allow tests to use clean state for every run + props.put(STATE_DIR_CONFIG, stateStoreLocation); + + return new KafkaStreamsConfiguration(props); + } + + @Bean + NewTopic inputTopic() { + return TopicBuilder.name("input-topic") + .partitions(1) + .replicas(1) + .build(); + } + +} diff --git a/spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaProducer.java b/spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaProducer.java new file mode 100644 index 0000000000..2b8e9bbfbd --- /dev/null +++ b/spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaProducer.java @@ -0,0 +1,23 @@ +package com.baeldung.kafka.streams; + +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@AllArgsConstructor +@Component +public class KafkaProducer { + + private final KafkaTemplate kafkaTemplate; + + public void sendMessage(String message) { + kafkaTemplate.send("input-topic", message) + .addCallback( + result -> log.info("Message sent to topic: {}", message), + ex -> log.error("Failed to send message", ex) + ); + } +} diff --git a/spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaStreamsApplication.java b/spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaStreamsApplication.java new file mode 100644 index 0000000000..7ecc59ea69 --- /dev/null +++ b/spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaStreamsApplication.java @@ -0,0 +1,13 @@ +package com.baeldung.kafka.streams; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class KafkaStreamsApplication { + + public static void main(String[] args) { + SpringApplication.run(KafkaStreamsApplication.class, args); + } + +} diff --git a/spring-kafka/src/main/java/com/baeldung/kafka/streams/WordCountProcessor.java b/spring-kafka/src/main/java/com/baeldung/kafka/streams/WordCountProcessor.java new file mode 100644 index 0000000000..2a49b79b1e --- /dev/null +++ b/spring-kafka/src/main/java/com/baeldung/kafka/streams/WordCountProcessor.java @@ -0,0 +1,36 @@ +package com.baeldung.kafka.streams; + +import java.util.Arrays; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.ValueMapper; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class WordCountProcessor { + + private static final Serde STRING_SERDE = Serdes.String(); + + @Autowired + void buildPipeline(StreamsBuilder streamsBuilder) { + KStream messageStream = streamsBuilder + .stream("input-topic", Consumed.with(STRING_SERDE, STRING_SERDE)); + + KTable wordCounts = messageStream + .mapValues((ValueMapper) String::toLowerCase) + .flatMapValues(value -> Arrays.asList(value.split("\\W+"))) + .groupBy((key, word) -> word, Grouped.with(STRING_SERDE, STRING_SERDE)) + .count(Materialized.as("counts")); + + wordCounts.toStream().to("output-topic"); + } + +} diff --git a/spring-kafka/src/main/java/com/baeldung/kafka/streams/WordCountRestService.java b/spring-kafka/src/main/java/com/baeldung/kafka/streams/WordCountRestService.java new file mode 100644 index 0000000000..df63b51749 --- /dev/null +++ b/spring-kafka/src/main/java/com/baeldung/kafka/streams/WordCountRestService.java @@ -0,0 +1,36 @@ +package com.baeldung.kafka.streams; + +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StoreQueryParameters; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.springframework.kafka.config.StreamsBuilderFactoryBean; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +import lombok.AllArgsConstructor; + +@RestController +@AllArgsConstructor +public class WordCountRestService { + + private final StreamsBuilderFactoryBean factoryBean; + + private final KafkaProducer kafkaProducer; + + @GetMapping("/count/{word}") + public Long getWordCount(@PathVariable String word) { + KafkaStreams kafkaStreams = factoryBean.getKafkaStreams(); + ReadOnlyKeyValueStore counts = kafkaStreams + .store(StoreQueryParameters.fromNameAndType("counts", QueryableStoreTypes.keyValueStore())); + return counts.get(word); + } + + @PostMapping("/message") + public void addMessage(@RequestBody String message) { + kafkaProducer.sendMessage(message); + } +} diff --git a/spring-kafka/src/test/java/com/baeldung/kafka/streams/KafkaStreamsApplicationLiveTest.java b/spring-kafka/src/test/java/com/baeldung/kafka/streams/KafkaStreamsApplicationLiveTest.java new file mode 100644 index 0000000000..85df8485d2 --- /dev/null +++ b/spring-kafka/src/test/java/com/baeldung/kafka/streams/KafkaStreamsApplicationLiveTest.java @@ -0,0 +1,143 @@ +package com.baeldung.kafka.streams; + +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.assertj.core.api.Assertions.assertThat; +import static org.springframework.boot.test.context.SpringBootTest.*; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.ResponseEntity; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.KafkaMessageListenerContainer; +import org.springframework.kafka.listener.MessageListener; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Testcontainers +@SpringBootTest(classes = KafkaStreamsApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT) +class KafkaStreamsApplicationLiveTest { + + private final BlockingQueue output = new LinkedBlockingQueue<>(); + + @Container + private static final KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3")); + + @TempDir + private static File tempDir; + + private KafkaMessageListenerContainer consumer; + + @LocalServerPort + private int port; + + @Autowired + private TestRestTemplate restTemplate; + + @BeforeEach + public void setUp() { + output.clear(); + createConsumer(); + } + + @Test + void givenInputMessages_whenPostToEndpoint_thenWordCountsReceivedOnOutput() throws Exception { + postMessage("test message"); + + startOutputTopicConsumer(); + + // assert correct counts from output topic + assertThat(output.poll(2, MINUTES)).isEqualTo("test:1"); + assertThat(output.poll(2, MINUTES)).isEqualTo("message:1"); + + // assert correct count from REST service + assertThat(getCountFromRestServiceFor("test")).isEqualTo(1); + assertThat(getCountFromRestServiceFor("message")).isEqualTo(1); + + postMessage("another test message"); + + // assert correct counts from output topic + assertThat(output.poll(2, MINUTES)).isEqualTo("another:1"); + assertThat(output.poll(2, MINUTES)).isEqualTo("test:2"); + assertThat(output.poll(2, MINUTES)).isEqualTo("message:2"); + + // assert correct count from REST service + assertThat(getCountFromRestServiceFor("another")).isEqualTo(1); + assertThat(getCountFromRestServiceFor("test")).isEqualTo(2); + assertThat(getCountFromRestServiceFor("message")).isEqualTo(2); + } + + private void postMessage(String message) { + HttpEntity request = new HttpEntity<>(message, new HttpHeaders()); + restTemplate.postForEntity(createURLWithPort("/message"), request, null); + } + + private int getCountFromRestServiceFor(String word) { + HttpEntity entity = new HttpEntity<>(null, new HttpHeaders()); + ResponseEntity response = restTemplate.exchange( + createURLWithPort("/count/" + word), + HttpMethod.GET, entity, String.class + ); + return Integer.parseInt(Objects.requireNonNull(response.getBody())); + } + + private String createURLWithPort(String uri) { + return "http://localhost:" + port + uri; + } + + private void createConsumer() { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); + + // set up the consumer for the word count output + DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); + ContainerProperties containerProperties = new ContainerProperties("output-topic"); + consumer = new KafkaMessageListenerContainer<>(cf, containerProperties); + consumer.setBeanName("templateTests"); + + consumer.setupMessageListener((MessageListener) record -> { + log.info("Record received: {}", record); + output.add(record.key() + ":" + record.value()); + }); + } + + private void startOutputTopicConsumer() { + consumer.start(); + } + + @DynamicPropertySource + static void registerKafkaProperties(DynamicPropertyRegistry registry) { + registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers); + registry.add("spring.kafka.streams.state.dir", tempDir::getAbsolutePath); + } + +} diff --git a/spring-kafka/src/test/java/com/baeldung/kafka/streams/WordCountProcessorUnitTest.java b/spring-kafka/src/test/java/com/baeldung/kafka/streams/WordCountProcessorUnitTest.java new file mode 100644 index 0000000000..216226a566 --- /dev/null +++ b/spring-kafka/src/test/java/com/baeldung/kafka/streams/WordCountProcessorUnitTest.java @@ -0,0 +1,54 @@ +package com.baeldung.kafka.streams; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Properties; + +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class WordCountProcessorUnitTest { + + private WordCountProcessor wordCountProcessor; + + @BeforeEach + void setUp() { + wordCountProcessor = new WordCountProcessor(); + } + + @Test + void givenInputMessages_whenProcessed_thenWordCountIsProduced() { + StreamsBuilder streamsBuilder = new StreamsBuilder(); + wordCountProcessor.buildPipeline(streamsBuilder); + Topology topology = streamsBuilder.build(); + + try (TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, new Properties())) { + + TestInputTopic inputTopic = topologyTestDriver + .createInputTopic("input-topic", new StringSerializer(), new StringSerializer()); + + TestOutputTopic outputTopic = topologyTestDriver + .createOutputTopic("output-topic", new StringDeserializer(), new LongDeserializer()); + + inputTopic.pipeInput("key", "hello world"); + inputTopic.pipeInput("key2", "hello"); + + assertThat(outputTopic.readKeyValuesToList()) + .containsExactly( + KeyValue.pair("hello", 1L), + KeyValue.pair("world", 1L), + KeyValue.pair("hello", 2L) + ); + } + } + +}