From 2ef351434ebb83470c753ddc963fef2cf08c9f92 Mon Sep 17 00:00:00 2001 From: Tomasz Lelek Date: Sun, 24 Dec 2017 06:33:37 +0100 Subject: [PATCH] Bael 1421 (#3222) * BAEL-1421 live test of kafka streams * BAEL-1421 Removed not-needed dependency * BAEL-1421 rearannge * BAEL-1421 rearannge * fix pom --- libraries/pom.xml | 48 +++++++++++--- .../kafkastreams/KafkaStreamsLiveTest.java | 62 +++++++++++++++++++ 2 files changed, 100 insertions(+), 10 deletions(-) create mode 100644 libraries/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java diff --git a/libraries/pom.xml b/libraries/pom.xml index 3627e74472..8395d1b402 100644 --- a/libraries/pom.xml +++ b/libraries/pom.xml @@ -612,16 +612,16 @@ caffeine ${caffeine.version} - - org.bouncycastle - bcprov-jdk15on - 1.58 - - - org.bouncycastle - bcpkix-jdk15on - 1.58 - + + org.bouncycastle + bcprov-jdk15on + 1.58 + + + org.bouncycastle + bcpkix-jdk15on + 1.58 + com.google.http-client google-http-client @@ -654,6 +654,29 @@ google-api-services-sheets ${google-sheets.version} + + org.apache.kafka + kafka-streams + ${kafka.version} + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + org.slf4j + slf4j-log4j12 + + + + + org.apache.kafka + kafka-clients + ${kafka.version} + test + test + @@ -669,6 +692,10 @@ bintray http://dl.bintray.com/cuba-platform/main + + Apache Staging + https://repository.apache.org/content/groups/staging + 1.23.0 @@ -729,5 +756,6 @@ 2.5.5 1.23.0 v4-rev493-1.21.0 + 1.0.0 \ No newline at end of file diff --git a/libraries/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java b/libraries/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java new file mode 100644 index 0000000000..32568e9ea5 --- /dev/null +++ b/libraries/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java @@ -0,0 +1,62 @@ +package com.baeldung.kafkastreams; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.test.TestUtils; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Properties; +import java.util.regex.Pattern; + +public class KafkaStreamsLiveTest { + private String bootstrapServers = "localhost:9092"; + + @Test + @Ignore("it needs to have kafka broker running on local") + public void shouldTestKafkaStreams() throws InterruptedException { + //given + String inputTopic = "inputTopic"; + + Properties streamsConfiguration = new Properties(); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test"); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + // Use a temporary directory for storing state, which will be automatically removed after the test. + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); + + //when + KStreamBuilder builder = new KStreamBuilder(); + KStream textLines = builder.stream(inputTopic); + Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS); + + KTable wordCounts = textLines + .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase()))) + .groupBy((key, word) -> word) + .count(); + + wordCounts.foreach((word, count) -> System.out.println("word: " + word + " -> " + count)); + + String outputTopic = "outputTopic"; + final Serde stringSerde = Serdes.String(); + final Serde longSerde = Serdes.Long(); + wordCounts.to(stringSerde, longSerde, outputTopic); + + KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); + streams.start(); + + //then + Thread.sleep(30000); + streams.close(); + } +}