From 5b596e0c49cea6914ba563d15d116303c40670c2 Mon Sep 17 00:00:00 2001 From: Alexander Molochko Date: Wed, 8 Nov 2023 18:05:37 -0700 Subject: [PATCH] BAEL-7012: Splitting Streams in Kafka (#15029) * BAEL-7012: Splitting Streams in Kafka * Move sources to the spring-kafka module * Move sources to the spring-kafka-2 module --- spring-kafka-2/pom.xml | 2 + .../kafka/kafkasplitting/IotSensorData.java | 31 ++++++++++ .../kafkasplitting/KafkaConsumerConfig.java | 43 +++++++++++++ .../KafkaIotConsumerService.java | 28 +++++++++ .../kafkasplitting/KafkaStreamsConfig.java | 62 +++++++++++++++++++ .../SpringBootKafkaStreamsSplittingApp.java | 12 ++++ .../src/main/resources/application.properties | 4 ++ 7 files changed, 182 insertions(+) create mode 100644 spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/IotSensorData.java create mode 100644 spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/KafkaConsumerConfig.java create mode 100644 spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/KafkaIotConsumerService.java create mode 100644 spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/KafkaStreamsConfig.java create mode 100644 spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/SpringBootKafkaStreamsSplittingApp.java diff --git a/spring-kafka-2/pom.xml b/spring-kafka-2/pom.xml index 0bca20447d..fdf7da7438 100644 --- a/spring-kafka-2/pom.xml +++ b/spring-kafka-2/pom.xml @@ -65,6 +65,8 @@ 1.16.2 + 3.0.12 + 3.6.0 1.16.2 diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/IotSensorData.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/IotSensorData.java new file mode 100644 index 0000000000..350f6911f3 --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/IotSensorData.java @@ -0,0 +1,31 @@ +package com.baeldung.spring.kafka.kafkasplitting; + +public class IotSensorData { + private String sensorType; + private String value; + private String sensorId; + + public String getSensorType() { + return sensorType; + } + + public void setSensorType(String sensorType) { + this.sensorType = sensorType; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + public String getSensorId() { + return sensorId; + } + + public void setSensorId(String sensorId) { + this.sensorId = sensorId; + } +} diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/KafkaConsumerConfig.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/KafkaConsumerConfig.java new file mode 100644 index 0000000000..70640cb582 --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/KafkaConsumerConfig.java @@ -0,0 +1,43 @@ +package com.baeldung.spring.kafka.kafkasplitting; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.kafka.support.serializer.JsonDeserializer; + +import java.util.HashMap; +import java.util.Map; + +@Configuration +class KafkaConsumerConfig { + + @Value("${spring.kafka.consumer.group-id}") + private String groupId; + @Value("${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + + @Bean + public ConsumerFactory consumerFactory() { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + + return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(IotSensorData.class)); + } + + @Bean + KafkaListenerContainerFactory> kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + return factory; + } +} \ No newline at end of file diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/KafkaIotConsumerService.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/KafkaIotConsumerService.java new file mode 100644 index 0000000000..bbd0c549ab --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/KafkaIotConsumerService.java @@ -0,0 +1,28 @@ +package com.baeldung.spring.kafka.kafkasplitting; + +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Service; + +@Service +public class KafkaIotConsumerService { + + @KafkaListener(topics = "iot_sensor_data") + public void consumeIotData(IotSensorData item) { + System.out.println("Consumed Message in original \"iot_sensor_data\" topic :" + item.getSensorType()); + } + + @KafkaListener(topics = "iot_sensor_data_temp") + public void consumeIotTemperatureData(IotSensorData item) { + System.out.println("Consumed Temparature data :" + item.getValue()); + } + + @KafkaListener(topics = "iot_sensor_data_hum") + public void consumeIotHumidityData(IotSensorData item) { + System.out.println("Consumed Humidity data :" + item.getValue()); + } + + @KafkaListener(topics = "iot_sensor_data_move") + public void consumeIotMovementData(IotSensorData item) { + System.out.println("Consumed Movement data :" + item.getValue()); + } +} \ No newline at end of file diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/KafkaStreamsConfig.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/KafkaStreamsConfig.java new file mode 100644 index 0000000000..c4eb1501fd --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/KafkaStreamsConfig.java @@ -0,0 +1,62 @@ +package com.baeldung.spring.kafka.kafkasplitting; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.Branched; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.processor.TopicNameExtractor; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.EnableKafkaStreams; +import org.springframework.kafka.support.KafkaStreamBrancher; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerializer; + +@Configuration +@EnableKafka +@EnableKafkaStreams +class KafkaStreamsConfig { + @Value("${kafka.topics.iot}") + private String iotTopicName; + + @Bean + public Serde iotSerde() { + return Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(IotSensorData.class)); + } + + @Bean + public KStream iotStream(StreamsBuilder streamsBuilder) { + KStream stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde())); + stream.split() + .branch((key, value) -> value.getSensorType() != null, + Branched.withConsumer(ks -> ks.to((key, value, recordContext) -> String.format("%s_%s", iotTopicName, value.getSensorType())))) + .noDefaultBranch(); + return stream; + } + + @Bean + public KStream iotBrancher(StreamsBuilder streamsBuilder) { + KStream stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde())); + + new KafkaStreamBrancher() + .branch((key, value) -> "temp".equals(value.getSensorType()), (ks) -> ks.to(iotTopicName + "_temp")) + .branch((key, value) -> "move".equals(value.getSensorType()), (ks) -> ks.to(iotTopicName + "_move")) + .branch((key, value) -> "hum".equals(value.getSensorType()), (ks) -> ks.to(iotTopicName + "_hum")) + .defaultBranch(ks -> ks.to(String.format("%s_unknown", iotTopicName))) + .onTopOf(stream); + + return stream; + } + + @Bean + public KStream iotTopicExtractor(StreamsBuilder streamsBuilder) { + KStream stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde())); + TopicNameExtractor sensorTopicExtractor = (key, value, recordContext) -> String.format("%s_%s", iotTopicName, value.getSensorType()); + stream.to(sensorTopicExtractor); + return stream; + } +} diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/SpringBootKafkaStreamsSplittingApp.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/SpringBootKafkaStreamsSplittingApp.java new file mode 100644 index 0000000000..6101eeb13d --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/SpringBootKafkaStreamsSplittingApp.java @@ -0,0 +1,12 @@ +package com.baeldung.spring.kafka.kafkasplitting; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class SpringBootKafkaStreamsSplittingApp { + public static void main(String[] args) { + SpringApplication.run(SpringBootKafkaStreamsSplittingApp.class, args); + } + +} diff --git a/spring-kafka-2/src/main/resources/application.properties b/spring-kafka-2/src/main/resources/application.properties index ed844cadf8..9111491b58 100644 --- a/spring-kafka-2/src/main/resources/application.properties +++ b/spring-kafka-2/src/main/resources/application.properties @@ -19,4 +19,8 @@ kafka.backoff.max_failure=5 # multiple listeners properties multiple-listeners.books.topic.name=books +spring.kafka.streams.application-id=baeldung-streams +spring.kafka.consumer.group-id=baeldung-group +spring.kafka.streams.properties[default.key.serde]=org.apache.kafka.common.serialization.Serdes$StringSerde +kafka.topics.iot=iot_sensor_data