diff --git a/spring-kafka/pom.xml b/spring-kafka/pom.xml
index ed3767029e..2801afffd7 100644
--- a/spring-kafka/pom.xml
+++ b/spring-kafka/pom.xml
@@ -19,11 +19,20 @@
org.springframework.boot
spring-boot-starter
+
+ org.springframework.boot
+ spring-boot-starter-web
+
org.springframework.kafka
spring-kafka
${spring-kafka.version}
+
+ org.apache.kafka
+ kafka-streams
+ ${kafka.streams.version}
+
com.fasterxml.jackson.core
jackson-databind
@@ -58,7 +67,8 @@
- 2.7.2
+ 2.7.8
+ 2.7.1
1.15.3
diff --git a/spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaConfig.java b/spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaConfig.java
new file mode 100644
index 0000000000..756f46e93e
--- /dev/null
+++ b/spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaConfig.java
@@ -0,0 +1,55 @@
+package com.baeldung.kafka.streams;
+
+import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.STATE_DIR_CONFIG;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.serialization.Serdes;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.annotation.EnableKafkaStreams;
+import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
+import org.springframework.kafka.config.KafkaStreamsConfiguration;
+import org.springframework.kafka.config.TopicBuilder;
+
+@Configuration
+@EnableKafka
+@EnableKafkaStreams
+public class KafkaConfig {
+
+ @Value(value = "${spring.kafka.bootstrap-servers}")
+ private String bootstrapAddress;
+
+ @Value(value = "${spring.kafka.streams.state.dir}")
+ private String stateStoreLocation;
+
+ @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
+ KafkaStreamsConfiguration kStreamsConfig() {
+ Map props = new HashMap<>();
+ props.put(APPLICATION_ID_CONFIG, "streams-app");
+ props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+ props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ // configure the state location to allow tests to use clean state for every run
+ props.put(STATE_DIR_CONFIG, stateStoreLocation);
+
+ return new KafkaStreamsConfiguration(props);
+ }
+
+ @Bean
+ NewTopic inputTopic() {
+ return TopicBuilder.name("input-topic")
+ .partitions(1)
+ .replicas(1)
+ .build();
+ }
+
+}
diff --git a/spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaProducer.java b/spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaProducer.java
new file mode 100644
index 0000000000..2b8e9bbfbd
--- /dev/null
+++ b/spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaProducer.java
@@ -0,0 +1,23 @@
+package com.baeldung.kafka.streams;
+
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Component;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@AllArgsConstructor
+@Component
+public class KafkaProducer {
+
+ private final KafkaTemplate kafkaTemplate;
+
+ public void sendMessage(String message) {
+ kafkaTemplate.send("input-topic", message)
+ .addCallback(
+ result -> log.info("Message sent to topic: {}", message),
+ ex -> log.error("Failed to send message", ex)
+ );
+ }
+}
diff --git a/spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaStreamsApplication.java b/spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaStreamsApplication.java
new file mode 100644
index 0000000000..7ecc59ea69
--- /dev/null
+++ b/spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaStreamsApplication.java
@@ -0,0 +1,13 @@
+package com.baeldung.kafka.streams;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class KafkaStreamsApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(KafkaStreamsApplication.class, args);
+ }
+
+}
diff --git a/spring-kafka/src/main/java/com/baeldung/kafka/streams/WordCountProcessor.java b/spring-kafka/src/main/java/com/baeldung/kafka/streams/WordCountProcessor.java
new file mode 100644
index 0000000000..2a49b79b1e
--- /dev/null
+++ b/spring-kafka/src/main/java/com/baeldung/kafka/streams/WordCountProcessor.java
@@ -0,0 +1,36 @@
+package com.baeldung.kafka.streams;
+
+import java.util.Arrays;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class WordCountProcessor {
+
+ private static final Serde STRING_SERDE = Serdes.String();
+
+ @Autowired
+ void buildPipeline(StreamsBuilder streamsBuilder) {
+ KStream messageStream = streamsBuilder
+ .stream("input-topic", Consumed.with(STRING_SERDE, STRING_SERDE));
+
+ KTable wordCounts = messageStream
+ .mapValues((ValueMapper) String::toLowerCase)
+ .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
+ .groupBy((key, word) -> word, Grouped.with(STRING_SERDE, STRING_SERDE))
+ .count(Materialized.as("counts"));
+
+ wordCounts.toStream().to("output-topic");
+ }
+
+}
diff --git a/spring-kafka/src/main/java/com/baeldung/kafka/streams/WordCountRestService.java b/spring-kafka/src/main/java/com/baeldung/kafka/streams/WordCountRestService.java
new file mode 100644
index 0000000000..df63b51749
--- /dev/null
+++ b/spring-kafka/src/main/java/com/baeldung/kafka/streams/WordCountRestService.java
@@ -0,0 +1,36 @@
+package com.baeldung.kafka.streams;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.springframework.kafka.config.StreamsBuilderFactoryBean;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RestController;
+
+import lombok.AllArgsConstructor;
+
+@RestController
+@AllArgsConstructor
+public class WordCountRestService {
+
+ private final StreamsBuilderFactoryBean factoryBean;
+
+ private final KafkaProducer kafkaProducer;
+
+ @GetMapping("/count/{word}")
+ public Long getWordCount(@PathVariable String word) {
+ KafkaStreams kafkaStreams = factoryBean.getKafkaStreams();
+ ReadOnlyKeyValueStore counts = kafkaStreams
+ .store(StoreQueryParameters.fromNameAndType("counts", QueryableStoreTypes.keyValueStore()));
+ return counts.get(word);
+ }
+
+ @PostMapping("/message")
+ public void addMessage(@RequestBody String message) {
+ kafkaProducer.sendMessage(message);
+ }
+}
diff --git a/spring-kafka/src/test/java/com/baeldung/kafka/streams/KafkaStreamsApplicationLiveTest.java b/spring-kafka/src/test/java/com/baeldung/kafka/streams/KafkaStreamsApplicationLiveTest.java
new file mode 100644
index 0000000000..85df8485d2
--- /dev/null
+++ b/spring-kafka/src/test/java/com/baeldung/kafka/streams/KafkaStreamsApplicationLiveTest.java
@@ -0,0 +1,143 @@
+package com.baeldung.kafka.streams;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.springframework.boot.test.context.SpringBootTest.*;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.web.client.TestRestTemplate;
+import org.springframework.boot.web.server.LocalServerPort;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.ResponseEntity;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.ContainerProperties;
+import org.springframework.kafka.listener.KafkaMessageListenerContainer;
+import org.springframework.kafka.listener.MessageListener;
+import org.springframework.test.context.DynamicPropertyRegistry;
+import org.springframework.test.context.DynamicPropertySource;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Testcontainers
+@SpringBootTest(classes = KafkaStreamsApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
+class KafkaStreamsApplicationLiveTest {
+
+ private final BlockingQueue output = new LinkedBlockingQueue<>();
+
+ @Container
+ private static final KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));
+
+ @TempDir
+ private static File tempDir;
+
+ private KafkaMessageListenerContainer consumer;
+
+ @LocalServerPort
+ private int port;
+
+ @Autowired
+ private TestRestTemplate restTemplate;
+
+ @BeforeEach
+ public void setUp() {
+ output.clear();
+ createConsumer();
+ }
+
+ @Test
+ void givenInputMessages_whenPostToEndpoint_thenWordCountsReceivedOnOutput() throws Exception {
+ postMessage("test message");
+
+ startOutputTopicConsumer();
+
+ // assert correct counts from output topic
+ assertThat(output.poll(2, MINUTES)).isEqualTo("test:1");
+ assertThat(output.poll(2, MINUTES)).isEqualTo("message:1");
+
+ // assert correct count from REST service
+ assertThat(getCountFromRestServiceFor("test")).isEqualTo(1);
+ assertThat(getCountFromRestServiceFor("message")).isEqualTo(1);
+
+ postMessage("another test message");
+
+ // assert correct counts from output topic
+ assertThat(output.poll(2, MINUTES)).isEqualTo("another:1");
+ assertThat(output.poll(2, MINUTES)).isEqualTo("test:2");
+ assertThat(output.poll(2, MINUTES)).isEqualTo("message:2");
+
+ // assert correct count from REST service
+ assertThat(getCountFromRestServiceFor("another")).isEqualTo(1);
+ assertThat(getCountFromRestServiceFor("test")).isEqualTo(2);
+ assertThat(getCountFromRestServiceFor("message")).isEqualTo(2);
+ }
+
+ private void postMessage(String message) {
+ HttpEntity request = new HttpEntity<>(message, new HttpHeaders());
+ restTemplate.postForEntity(createURLWithPort("/message"), request, null);
+ }
+
+ private int getCountFromRestServiceFor(String word) {
+ HttpEntity entity = new HttpEntity<>(null, new HttpHeaders());
+ ResponseEntity response = restTemplate.exchange(
+ createURLWithPort("/count/" + word),
+ HttpMethod.GET, entity, String.class
+ );
+ return Integer.parseInt(Objects.requireNonNull(response.getBody()));
+ }
+
+ private String createURLWithPort(String uri) {
+ return "http://localhost:" + port + uri;
+ }
+
+ private void createConsumer() {
+ Map props = new HashMap<>();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung");
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+
+ // set up the consumer for the word count output
+ DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props);
+ ContainerProperties containerProperties = new ContainerProperties("output-topic");
+ consumer = new KafkaMessageListenerContainer<>(cf, containerProperties);
+ consumer.setBeanName("templateTests");
+
+ consumer.setupMessageListener((MessageListener) record -> {
+ log.info("Record received: {}", record);
+ output.add(record.key() + ":" + record.value());
+ });
+ }
+
+ private void startOutputTopicConsumer() {
+ consumer.start();
+ }
+
+ @DynamicPropertySource
+ static void registerKafkaProperties(DynamicPropertyRegistry registry) {
+ registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
+ registry.add("spring.kafka.streams.state.dir", tempDir::getAbsolutePath);
+ }
+
+}
diff --git a/spring-kafka/src/test/java/com/baeldung/kafka/streams/WordCountProcessorUnitTest.java b/spring-kafka/src/test/java/com/baeldung/kafka/streams/WordCountProcessorUnitTest.java
new file mode 100644
index 0000000000..216226a566
--- /dev/null
+++ b/spring-kafka/src/test/java/com/baeldung/kafka/streams/WordCountProcessorUnitTest.java
@@ -0,0 +1,54 @@
+package com.baeldung.kafka.streams;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Properties;
+
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class WordCountProcessorUnitTest {
+
+ private WordCountProcessor wordCountProcessor;
+
+ @BeforeEach
+ void setUp() {
+ wordCountProcessor = new WordCountProcessor();
+ }
+
+ @Test
+ void givenInputMessages_whenProcessed_thenWordCountIsProduced() {
+ StreamsBuilder streamsBuilder = new StreamsBuilder();
+ wordCountProcessor.buildPipeline(streamsBuilder);
+ Topology topology = streamsBuilder.build();
+
+ try (TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, new Properties())) {
+
+ TestInputTopic inputTopic = topologyTestDriver
+ .createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
+
+ TestOutputTopic outputTopic = topologyTestDriver
+ .createOutputTopic("output-topic", new StringDeserializer(), new LongDeserializer());
+
+ inputTopic.pipeInput("key", "hello world");
+ inputTopic.pipeInput("key2", "hello");
+
+ assertThat(outputTopic.readKeyValuesToList())
+ .containsExactly(
+ KeyValue.pair("hello", 1L),
+ KeyValue.pair("world", 1L),
+ KeyValue.pair("hello", 2L)
+ );
+ }
+ }
+
+}