diff --git a/libraries/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java b/libraries/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java index 4406494d30..e61f4158a7 100644 --- a/libraries/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java +++ b/libraries/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java @@ -4,10 +4,12 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.test.TestUtils; import org.junit.Ignore; import org.junit.Test; @@ -36,20 +38,20 @@ public class KafkaStreamsLiveTest { streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); // when - KStreamBuilder builder = new KStreamBuilder(); + StreamsBuilder builder = new StreamsBuilder(); KStream textLines = builder.stream(inputTopic); Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS); KTable wordCounts = textLines.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase()))).groupBy((key, word) -> word).count(); - wordCounts.foreach((word, count) -> System.out.println("word: " + word + " -> " + count)); + textLines.foreach((word, count) -> System.out.println("word: " + word + " -> " + count)); String outputTopic = "outputTopic"; final Serde stringSerde = Serdes.String(); - final Serde longSerde = Serdes.Long(); - wordCounts.to(stringSerde, longSerde, outputTopic); + final Serde longSerde = Serdes.String(); + textLines.to(outputTopic, Produced.with(stringSerde,longSerde)); - KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); + KafkaStreams streams = new KafkaStreams(new Topology(), streamsConfiguration); streams.start(); // then