BAEL-7012: Splitting Streams in Kafka (#15029)

* BAEL-7012: Splitting Streams in Kafka

* Move sources to the spring-kafka module

* Move sources to the spring-kafka-2 module
Alexander Molochko 2023-11-08 18:05:37 -07:00 committed by GitHub
parent 7e81d666c9
commit 5b596e0c49
7 changed files with 182 additions and 0 deletions

pom.xml

@@ -65,6 +65,8 @@
<properties>
<testcontainers-kafka.version>1.16.2</testcontainers-kafka.version>
<kafka-version>3.0.12</kafka-version>
<kafka-streams-version>3.6.0</kafka-streams-version>
<testcontainers-junit-jupiter.version>1.16.2</testcontainers-junit-jupiter.version>
</properties>
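
The new kafka-streams-version property is presumably referenced by a kafka-streams dependency declared elsewhere in the POM (outside this hunk); purely as a sketch, such an entry typically looks like:

<!-- assumed dependency entry, not part of this diff -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>${kafka-streams-version}</version>
</dependency>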

IotSensorData.java

@@ -0,0 +1,31 @@
package com.baeldung.spring.kafka.kafkasplitting;

public class IotSensorData {

    private String sensorType;
    private String value;
    private String sensorId;

    public String getSensorType() {
        return sensorType;
    }

    public void setSensorType(String sensorType) {
        this.sensorType = sensorType;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    public String getSensorId() {
        return sensorId;
    }

    public void setSensorId(String sensorId) {
        this.sensorId = sensorId;
    }
}
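
For orientation, a record of this shape serialized as JSON would look roughly like the following (the field values are invented for illustration):

{"sensorType": "temp", "value": "23.4", "sensorId": "sensor-1"}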

KafkaConsumerConfig.java

@@ -0,0 +1,43 @@
package com.baeldung.spring.kafka.kafkasplitting;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
class KafkaConsumerConfig {

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ConsumerFactory<String, IotSensorData> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(IotSensorData.class));
    }

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, IotSensorData>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, IotSensorData> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
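
Because the factory receives explicit StringDeserializer and JsonDeserializer instances, those take precedence over the class-based *_DESERIALIZER_CLASS_CONFIG entries above, and the JSON deserializer is already bound to IotSensorData. The commit contains no producer side; purely as a sketch (the class, bean names, and send call below are hypothetical, not part of this change), a matching JSON producer setup could look like this:

package com.baeldung.spring.kafka.kafkasplitting;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

// Hypothetical companion class, not part of this commit.
@Configuration
class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, IotSensorData> iotProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Values travel as JSON, mirroring the JsonDeserializer on the consumer side.
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, IotSensorData> iotKafkaTemplate() {
        return new KafkaTemplate<>(iotProducerFactory());
    }
}

A caller could then publish test records with iotKafkaTemplate.send("iot_sensor_data", data) to drive the splitting topology.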

KafkaIotConsumerService.java

@@ -0,0 +1,28 @@
package com.baeldung.spring.kafka.kafkasplitting;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaIotConsumerService {

    @KafkaListener(topics = "iot_sensor_data")
    public void consumeIotData(IotSensorData item) {
        System.out.println("Consumed message from the original \"iot_sensor_data\" topic: " + item.getSensorType());
    }

    @KafkaListener(topics = "iot_sensor_data_temp")
    public void consumeIotTemperatureData(IotSensorData item) {
        System.out.println("Consumed temperature data: " + item.getValue());
    }

    @KafkaListener(topics = "iot_sensor_data_hum")
    public void consumeIotHumidityData(IotSensorData item) {
        System.out.println("Consumed humidity data: " + item.getValue());
    }

    @KafkaListener(topics = "iot_sensor_data_move")
    public void consumeIotMovementData(IotSensorData item) {
        System.out.println("Consumed movement data: " + item.getValue());
    }
}
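
The suffixed topics these listeners read (iot_sensor_data_temp, iot_sensor_data_hum, iot_sensor_data_move) match the destinations that the branching logic in KafkaStreamsConfig below writes to, so the listeners simply log each side of the split.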

KafkaStreamsConfig.java

@@ -0,0 +1,62 @@
package com.baeldung.spring.kafka.kafkasplitting;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.support.KafkaStreamBrancher;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
@EnableKafka
@EnableKafkaStreams
class KafkaStreamsConfig {

    @Value("${kafka.topics.iot}")
    private String iotTopicName;

    @Bean
    public Serde<IotSensorData> iotSerde() {
        return Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(IotSensorData.class));
    }

    // Approach 1: KStream#split() with a Branched consumer that derives the target topic
    // name from each record's sensor type.
    @Bean
    public KStream<String, IotSensorData> iotStream(StreamsBuilder streamsBuilder) {
        KStream<String, IotSensorData> stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde()));
        stream.split()
          .branch((key, value) -> value.getSensorType() != null,
            Branched.withConsumer(ks -> ks.to((key, value, recordContext) -> String.format("%s_%s", iotTopicName, value.getSensorType()))))
          .noDefaultBranch();
        return stream;
    }

    // Approach 2: Spring's KafkaStreamBrancher, with one branch per known sensor type and a
    // default branch for everything else.
    @Bean
    public KStream<String, IotSensorData> iotBrancher(StreamsBuilder streamsBuilder) {
        KStream<String, IotSensorData> stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde()));
        new KafkaStreamBrancher<String, IotSensorData>()
          .branch((key, value) -> "temp".equals(value.getSensorType()), (ks) -> ks.to(iotTopicName + "_temp"))
          .branch((key, value) -> "move".equals(value.getSensorType()), (ks) -> ks.to(iotTopicName + "_move"))
          .branch((key, value) -> "hum".equals(value.getSensorType()), (ks) -> ks.to(iotTopicName + "_hum"))
          .defaultBranch(ks -> ks.to(String.format("%s_unknown", iotTopicName)))
          .onTopOf(stream);
        return stream;
    }

    // Approach 3: a TopicNameExtractor that routes every record, without branching, to a topic
    // named after its sensor type.
    @Bean
    public KStream<String, IotSensorData> iotTopicExtractor(StreamsBuilder streamsBuilder) {
        KStream<String, IotSensorData> stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde()));
        TopicNameExtractor<String, IotSensorData> sensorTopicExtractor = (key, value, recordContext) -> String.format("%s_%s", iotTopicName, value.getSensorType());
        stream.to(sensorTopicExtractor);
        return stream;
    }
}
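
The three beans are alternative ways to split the same stream, and each registers iot_sensor_data as its own source on the shared StreamsBuilder, so in practice only one of them would typically be left enabled at a time (Kafka Streams rejects a topology that registers the same input topic from more than one source). Note also that Kafka Streams does not create the dynamically named output topics itself, so unless the broker auto-creates topics they need to exist beforehand. A small sketch of pre-creating them with Spring Kafka (a hypothetical addition, assuming an auto-configured KafkaAdmin and a spring-kafka version that provides KafkaAdmin.NewTopics, 2.7+):

// Hypothetical bean, not part of this commit; requires org.springframework.kafka.core.KafkaAdmin
// and org.springframework.kafka.config.TopicBuilder.
@Bean
public KafkaAdmin.NewTopics iotBranchTopics() {
    return new KafkaAdmin.NewTopics(
      TopicBuilder.name("iot_sensor_data_temp").build(),
      TopicBuilder.name("iot_sensor_data_hum").build(),
      TopicBuilder.name("iot_sensor_data_move").build(),
      TopicBuilder.name("iot_sensor_data_unknown").build());
}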

SpringBootKafkaStreamsSplittingApp.java

@@ -0,0 +1,12 @@
package com.baeldung.spring.kafka.kafkasplitting;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootKafkaStreamsSplittingApp {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootKafkaStreamsSplittingApp.class, args);
    }
}

application.properties

@@ -19,4 +19,8 @@ kafka.backoff.max_failure=5
# multiple listeners properties
multiple-listeners.books.topic.name=books
spring.kafka.streams.application-id=baeldung-streams
spring.kafka.consumer.group-id=baeldung-group
spring.kafka.streams.properties[default.key.serde]=org.apache.kafka.common.serialization.Serdes$StringSerde
kafka.topics.iot=iot_sensor_data
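
The three added entries are the properties the new classes read: spring.kafka.consumer.group-id is injected into KafkaConsumerConfig, default.key.serde supplies the Kafka Streams topology's key serde, and kafka.topics.iot is the input topic name injected into KafkaStreamsConfig.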