[BAEL-5223] Add code for Spring Boot Streams article (#11568)

Co-authored-by: uzma khan <uzma.khan@nominet.uk>
ukhan1980 2021-12-11 04:58:11 +00:00 committed by GitHub
parent 601904b210
commit c50e8d7425
8 changed files with 371 additions and 1 deletions

View File: pom.xml

@@ -19,11 +19,20 @@
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.streams.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
@@ -58,7 +67,8 @@
    </dependencies>

    <properties>
        <spring-kafka.version>2.7.8</spring-kafka.version>
        <kafka.streams.version>2.7.1</kafka.streams.version>
        <testcontainers-kafka.version>1.15.3</testcontainers-kafka.version>
    </properties>

View File: KafkaConfig.java

@@ -0,0 +1,55 @@
package com.baeldung.kafka.streams;

import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.STATE_DIR_CONFIG;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serdes;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value(value = "${spring.kafka.streams.state.dir}")
    private String stateStoreLocation;

    // @EnableKafkaStreams picks up the streams configuration under this well-known bean name
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    KafkaStreamsConfiguration kStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(APPLICATION_ID_CONFIG, "streams-app");
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // configure the state location to allow tests to use a clean state for every run
        props.put(STATE_DIR_CONFIG, stateStoreLocation);
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    NewTopic inputTopic() {
        return TopicBuilder.name("input-topic")
          .partitions(1)
          .replicas(1)
          .build();
    }
}
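
The two @Value placeholders in KafkaConfig have no defaults, so they must be resolvable when the application starts. A minimal application.properties sketch (the broker address and state directory below are assumed example values, not part of this commit; the live test overrides both via @DynamicPropertySource):

    # assumed example values, not part of this commit
    spring.kafka.bootstrap-servers=localhost:9092
    spring.kafka.streams.state.dir=/tmp/kafka-streams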

View File: KafkaProducer.java

@@ -0,0 +1,23 @@
package com.baeldung.kafka.streams;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@AllArgsConstructor
@Component
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        // send() is asynchronous; the callback logs the outcome once the broker responds
        kafkaTemplate.send("input-topic", message)
          .addCallback(
              result -> log.info("Message sent to topic: {}", message),
              ex -> log.error("Failed to send message", ex)
          );
    }
}
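
With Spring Kafka 2.7.x, as used here, send() returns a ListenableFuture, so addCallback works as shown. Should the module ever move to Spring Kafka 3.x, where send() returns a CompletableFuture and addCallback is no longer available, the equivalent would look like this sketch:

    // Sketch for a hypothetical Spring Kafka 3.x upgrade:
    // KafkaTemplate.send() then returns a CompletableFuture
    kafkaTemplate.send("input-topic", message)
      .whenComplete((result, ex) -> {
          if (ex == null) {
              log.info("Message sent to topic: {}", message);
          } else {
              log.error("Failed to send message", ex);
          }
      });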

View File: KafkaStreamsApplication.java

@@ -0,0 +1,13 @@
package com.baeldung.kafka.streams;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaStreamsApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamsApplication.class, args);
    }
}

View File: WordCountProcessor.java

@@ -0,0 +1,36 @@
package com.baeldung.kafka.streams;

import java.util.Arrays;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class WordCountProcessor {

    private static final Serde<String> STRING_SERDE = Serdes.String();

    // Spring injects the auto-configured StreamsBuilder, so the topology is built at startup
    @Autowired
    void buildPipeline(StreamsBuilder streamsBuilder) {
        KStream<String, String> messageStream = streamsBuilder
          .stream("input-topic", Consumed.with(STRING_SERDE, STRING_SERDE));

        KTable<String, Long> wordCounts = messageStream
          .mapValues((ValueMapper<String, String>) String::toLowerCase)
          .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
          .groupBy((key, word) -> word, Grouped.with(STRING_SERDE, STRING_SERDE))
          // "counts" names the state store queried by WordCountRestService
          .count(Materialized.as("counts"));

        wordCounts.toStream().to("output-topic");
    }
}
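
Although wordCounts carries Long values while the default value serde is String, the plain to("output-topic") call works: count() establishes a Long value serde that Kafka Streams propagates downstream to the sink. Spelling it out would be equivalent, as in this sketch (Produced comes from org.apache.kafka.streams.kstream):

    // equivalent explicit form of the sink above
    wordCounts.toStream()
      .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));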

View File: WordCountRestService.java

@@ -0,0 +1,36 @@
package com.baeldung.kafka.streams;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import lombok.AllArgsConstructor;

@RestController
@AllArgsConstructor
public class WordCountRestService {

    private final StreamsBuilderFactoryBean factoryBean;
    private final KafkaProducer kafkaProducer;

    @GetMapping("/count/{word}")
    public Long getWordCount(@PathVariable String word) {
        // interactive query against the "counts" store materialized by WordCountProcessor
        KafkaStreams kafkaStreams = factoryBean.getKafkaStreams();
        ReadOnlyKeyValueStore<String, Long> counts = kafkaStreams
          .store(StoreQueryParameters.fromNameAndType("counts", QueryableStoreTypes.keyValueStore()));
        return counts.get(word);
    }

    @PostMapping("/message")
    public void addMessage(@RequestBody String message) {
        kafkaProducer.sendMessage(message);
    }
}
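
One caveat with this endpoint: KafkaStreams.store() throws InvalidStateStoreException when called before the streams instance reaches the RUNNING state, or while a rebalance is migrating the store. A defensive lookup might retry, as in this sketch (the helper name and sleep interval are illustrative, not part of this commit):

    // Sketch only: poll until the "counts" store becomes queryable
    private ReadOnlyKeyValueStore<String, Long> waitForStore(KafkaStreams kafkaStreams) throws InterruptedException {
        while (true) {
            try {
                return kafkaStreams.store(
                  StoreQueryParameters.fromNameAndType("counts", QueryableStoreTypes.keyValueStore()));
            } catch (org.apache.kafka.streams.errors.InvalidStateStoreException e) {
                Thread.sleep(100); // not queryable yet, e.g. during startup or rebalance
            }
        }
    }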

View File: KafkaStreamsApplicationLiveTest.java

@@ -0,0 +1,143 @@
package com.baeldung.kafka.streams;

import static java.util.concurrent.TimeUnit.MINUTES;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Testcontainers
@SpringBootTest(classes = KafkaStreamsApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
class KafkaStreamsApplicationLiveTest {

    private final BlockingQueue<String> output = new LinkedBlockingQueue<>();

    @Container
    private static final KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));

    @TempDir
    private static File tempDir;

    private KafkaMessageListenerContainer<String, Long> consumer;

    @LocalServerPort
    private int port;

    @Autowired
    private TestRestTemplate restTemplate;

    @BeforeEach
    public void setUp() {
        output.clear();
        createConsumer();
    }

    @Test
    void givenInputMessages_whenPostToEndpoint_thenWordCountsReceivedOnOutput() throws Exception {
        postMessage("test message");

        startOutputTopicConsumer();

        // assert correct counts from the output topic
        assertThat(output.poll(2, MINUTES)).isEqualTo("test:1");
        assertThat(output.poll(2, MINUTES)).isEqualTo("message:1");

        // assert correct counts from the REST service
        assertThat(getCountFromRestServiceFor("test")).isEqualTo(1);
        assertThat(getCountFromRestServiceFor("message")).isEqualTo(1);

        postMessage("another test message");

        // assert correct counts from the output topic
        assertThat(output.poll(2, MINUTES)).isEqualTo("another:1");
        assertThat(output.poll(2, MINUTES)).isEqualTo("test:2");
        assertThat(output.poll(2, MINUTES)).isEqualTo("message:2");

        // assert correct counts from the REST service
        assertThat(getCountFromRestServiceFor("another")).isEqualTo(1);
        assertThat(getCountFromRestServiceFor("test")).isEqualTo(2);
        assertThat(getCountFromRestServiceFor("message")).isEqualTo(2);
    }

    private void postMessage(String message) {
        HttpEntity<String> request = new HttpEntity<>(message, new HttpHeaders());
        restTemplate.postForEntity(createURLWithPort("/message"), request, null);
    }

    private int getCountFromRestServiceFor(String word) {
        HttpEntity<String> entity = new HttpEntity<>(null, new HttpHeaders());
        ResponseEntity<String> response = restTemplate.exchange(
          createURLWithPort("/count/" + word),
          HttpMethod.GET, entity, String.class
        );
        return Integer.parseInt(Objects.requireNonNull(response.getBody()));
    }

    private String createURLWithPort(String uri) {
        return "http://localhost:" + port + uri;
    }

    private void createConsumer() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);

        // set up the consumer for the word count output (String keys, Long counts)
        DefaultKafkaConsumerFactory<String, Long> cf = new DefaultKafkaConsumerFactory<>(props);
        ContainerProperties containerProperties = new ContainerProperties("output-topic");
        consumer = new KafkaMessageListenerContainer<>(cf, containerProperties);
        consumer.setBeanName("templateTests");
        consumer.setupMessageListener((MessageListener<String, Long>) record -> {
            log.info("Record received: {}", record);
            output.add(record.key() + ":" + record.value());
        });
    }

    private void startOutputTopicConsumer() {
        consumer.start();
    }

    @DynamicPropertySource
    static void registerKafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
        registry.add("spring.kafka.streams.state.dir", tempDir::getAbsolutePath);
    }
}

View File: WordCountProcessorUnitTest.java

@@ -0,0 +1,54 @@
package com.baeldung.kafka.streams;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Properties;

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class WordCountProcessorUnitTest {

    private WordCountProcessor wordCountProcessor;

    @BeforeEach
    void setUp() {
        wordCountProcessor = new WordCountProcessor();
    }

    @Test
    void givenInputMessages_whenProcessed_thenWordCountIsProduced() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        wordCountProcessor.buildPipeline(streamsBuilder);
        Topology topology = streamsBuilder.build();

        // drive the topology in-memory, without a running broker
        try (TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, new Properties())) {
            TestInputTopic<String, String> inputTopic = topologyTestDriver
              .createInputTopic("input-topic", new StringSerializer(), new StringSerializer());

            TestOutputTopic<String, Long> outputTopic = topologyTestDriver
              .createOutputTopic("output-topic", new StringDeserializer(), new LongDeserializer());

            inputTopic.pipeInput("key", "hello world");
            inputTopic.pipeInput("key2", "hello");

            assertThat(outputTopic.readKeyValuesToList())
              .containsExactly(
                  KeyValue.pair("hello", 1L),
                  KeyValue.pair("world", 1L),
                  KeyValue.pair("hello", 2L)
              );
        }
    }
}