diff --git a/spring-kafka-2/pom.xml b/spring-kafka-2/pom.xml
index 0bca20447d..fdf7da7438 100644
--- a/spring-kafka-2/pom.xml
+++ b/spring-kafka-2/pom.xml
@@ -65,6 +65,8 @@
1.16.2
+ 3.0.12
+ 3.6.0
1.16.2
diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/IotSensorData.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/IotSensorData.java
new file mode 100644
index 0000000000..350f6911f3
--- /dev/null
+++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/IotSensorData.java
@@ -0,0 +1,31 @@
+package com.baeldung.spring.kafka.kafkasplitting;
+
+public class IotSensorData {
+ private String sensorType;
+ private String value;
+ private String sensorId;
+
+ public String getSensorType() {
+ return sensorType;
+ }
+
+ public void setSensorType(String sensorType) {
+ this.sensorType = sensorType;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+
+ public String getSensorId() {
+ return sensorId;
+ }
+
+ public void setSensorId(String sensorId) {
+ this.sensorId = sensorId;
+ }
+}
diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/KafkaConsumerConfig.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/KafkaConsumerConfig.java
new file mode 100644
index 0000000000..70640cb582
--- /dev/null
+++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/KafkaConsumerConfig.java
@@ -0,0 +1,43 @@
+package com.baeldung.spring.kafka.kafkasplitting;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Configuration
+class KafkaConsumerConfig {
+
+ @Value("${spring.kafka.consumer.group-id}")
+ private String groupId;
+ @Value("${spring.kafka.bootstrap-servers}")
+ private String bootstrapServers;
+
+ @Bean
+ public ConsumerFactory consumerFactory() {
+ Map props = new HashMap<>();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
+
+ return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(IotSensorData.class));
+ }
+
+ @Bean
+ KafkaListenerContainerFactory> kafkaListenerContainerFactory() {
+ ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(consumerFactory());
+ return factory;
+ }
+}
\ No newline at end of file
diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/KafkaIotConsumerService.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/KafkaIotConsumerService.java
new file mode 100644
index 0000000000..bbd0c549ab
--- /dev/null
+++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/KafkaIotConsumerService.java
@@ -0,0 +1,28 @@
+package com.baeldung.spring.kafka.kafkasplitting;
+
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Service;
+
+@Service
+public class KafkaIotConsumerService {
+
+ @KafkaListener(topics = "iot_sensor_data")
+ public void consumeIotData(IotSensorData item) {
+ System.out.println("Consumed Message in original \"iot_sensor_data\" topic :" + item.getSensorType());
+ }
+
+ @KafkaListener(topics = "iot_sensor_data_temp")
+ public void consumeIotTemperatureData(IotSensorData item) {
+ System.out.println("Consumed Temparature data :" + item.getValue());
+ }
+
+ @KafkaListener(topics = "iot_sensor_data_hum")
+ public void consumeIotHumidityData(IotSensorData item) {
+ System.out.println("Consumed Humidity data :" + item.getValue());
+ }
+
+ @KafkaListener(topics = "iot_sensor_data_move")
+ public void consumeIotMovementData(IotSensorData item) {
+ System.out.println("Consumed Movement data :" + item.getValue());
+ }
+}
\ No newline at end of file
diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/KafkaStreamsConfig.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/KafkaStreamsConfig.java
new file mode 100644
index 0000000000..c4eb1501fd
--- /dev/null
+++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/KafkaStreamsConfig.java
@@ -0,0 +1,62 @@
+package com.baeldung.spring.kafka.kafkasplitting;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.Branched;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.processor.TopicNameExtractor;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.annotation.EnableKafkaStreams;
+import org.springframework.kafka.support.KafkaStreamBrancher;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+@Configuration
+@EnableKafka
+@EnableKafkaStreams
+class KafkaStreamsConfig {
+ @Value("${kafka.topics.iot}")
+ private String iotTopicName;
+
+ @Bean
+ public Serde iotSerde() {
+ return Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(IotSensorData.class));
+ }
+
+ @Bean
+ public KStream iotStream(StreamsBuilder streamsBuilder) {
+ KStream stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde()));
+ stream.split()
+ .branch((key, value) -> value.getSensorType() != null,
+ Branched.withConsumer(ks -> ks.to((key, value, recordContext) -> String.format("%s_%s", iotTopicName, value.getSensorType()))))
+ .noDefaultBranch();
+ return stream;
+ }
+
+ @Bean
+ public KStream iotBrancher(StreamsBuilder streamsBuilder) {
+ KStream stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde()));
+
+ new KafkaStreamBrancher()
+ .branch((key, value) -> "temp".equals(value.getSensorType()), (ks) -> ks.to(iotTopicName + "_temp"))
+ .branch((key, value) -> "move".equals(value.getSensorType()), (ks) -> ks.to(iotTopicName + "_move"))
+ .branch((key, value) -> "hum".equals(value.getSensorType()), (ks) -> ks.to(iotTopicName + "_hum"))
+ .defaultBranch(ks -> ks.to(String.format("%s_unknown", iotTopicName)))
+ .onTopOf(stream);
+
+ return stream;
+ }
+
+ @Bean
+ public KStream iotTopicExtractor(StreamsBuilder streamsBuilder) {
+ KStream stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde()));
+ TopicNameExtractor sensorTopicExtractor = (key, value, recordContext) -> String.format("%s_%s", iotTopicName, value.getSensorType());
+ stream.to(sensorTopicExtractor);
+ return stream;
+ }
+}
diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/SpringBootKafkaStreamsSplittingApp.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/SpringBootKafkaStreamsSplittingApp.java
new file mode 100644
index 0000000000..6101eeb13d
--- /dev/null
+++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/kafkasplitting/SpringBootKafkaStreamsSplittingApp.java
@@ -0,0 +1,12 @@
+package com.baeldung.spring.kafka.kafkasplitting;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class SpringBootKafkaStreamsSplittingApp {
+ public static void main(String[] args) {
+ SpringApplication.run(SpringBootKafkaStreamsSplittingApp.class, args);
+ }
+
+}
diff --git a/spring-kafka-2/src/main/resources/application.properties b/spring-kafka-2/src/main/resources/application.properties
index ed844cadf8..9111491b58 100644
--- a/spring-kafka-2/src/main/resources/application.properties
+++ b/spring-kafka-2/src/main/resources/application.properties
@@ -19,4 +19,8 @@ kafka.backoff.max_failure=5
# multiple listeners properties
multiple-listeners.books.topic.name=books
+spring.kafka.streams.application-id=baeldung-streams
+spring.kafka.consumer.group-id=baeldung-group
+spring.kafka.streams.properties[default.key.serde]=org.apache.kafka.common.serialization.Serdes$StringSerde
+kafka.topics.iot=iot_sensor_data