BAEL-1426 Updating an existing test case due to migration to Kafka2.0

This commit is contained in:
Chirag Dewan 2018-09-09 17:26:53 +05:30
parent a76e5fba25
commit 6f2d31b26a
1 changed files with 8 additions and 6 deletions

View File

@ -4,10 +4,12 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.test.TestUtils; import org.apache.kafka.test.TestUtils;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
@ -36,20 +38,20 @@ public class KafkaStreamsLiveTest {
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
// when // when
KStreamBuilder builder = new KStreamBuilder(); StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream(inputTopic); KStream<String, String> textLines = builder.stream(inputTopic);
Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS); Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
KTable<String, Long> wordCounts = textLines.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase()))).groupBy((key, word) -> word).count(); KTable<String, Long> wordCounts = textLines.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase()))).groupBy((key, word) -> word).count();
wordCounts.foreach((word, count) -> System.out.println("word: " + word + " -> " + count)); textLines.foreach((word, count) -> System.out.println("word: " + word + " -> " + count));
String outputTopic = "outputTopic"; String outputTopic = "outputTopic";
final Serde<String> stringSerde = Serdes.String(); final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long(); final Serde<String> longSerde = Serdes.String();
wordCounts.to(stringSerde, longSerde, outputTopic); textLines.to(outputTopic, Produced.with(stringSerde,longSerde));
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); KafkaStreams streams = new KafkaStreams(new Topology(), streamsConfiguration);
streams.start(); streams.start();
// then // then