diff --git a/libraries/pom.xml b/libraries/pom.xml
index 3627e74472..8395d1b402 100644
--- a/libraries/pom.xml
+++ b/libraries/pom.xml
@@ -612,16 +612,16 @@
             <artifactId>caffeine</artifactId>
             <version>${caffeine.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.bouncycastle</groupId>
-            <artifactId>bcprov-jdk15on</artifactId>
-            <version>1.58</version>
-        </dependency>
-        <dependency>
-            <groupId>org.bouncycastle</groupId>
-            <artifactId>bcpkix-jdk15on</artifactId>
-            <version>1.58</version>
-        </dependency>
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcprov-jdk15on</artifactId>
+            <version>1.58</version>
+        </dependency>
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcpkix-jdk15on</artifactId>
+            <version>1.58</version>
+        </dependency>
         <dependency>
             <groupId>com.google.http-client</groupId>
             <artifactId>google-http-client</artifactId>
@@ -654,6 +654,29 @@
             <artifactId>google-api-services-sheets</artifactId>
             <version>${google-sheets.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-streams</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+            <classifier>test</classifier>
+            <scope>test</scope>
+        </dependency>
@@ -669,6 +692,10 @@
             <id>bintray</id>
             <url>http://dl.bintray.com/cuba-platform/main</url>
         </repository>
+        <repository>
+            <id>Apache Staging</id>
+            <url>https://repository.apache.org/content/groups/staging</url>
+        </repository>
     </repositories>
     <properties>
1.23.0
@@ -729,5 +756,6 @@
2.5.5
1.23.0
         <google-sheets.version>v4-rev493-1.21.0</google-sheets.version>
+        <kafka.version>1.0.0</kafka.version>
     </properties>
 </project>
\ No newline at end of file
diff --git a/libraries/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java b/libraries/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java
new file mode 100644
index 0000000000..32568e9ea5
--- /dev/null
+++ b/libraries/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java
@@ -0,0 +1,62 @@
+package com.baeldung.kafkastreams;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+public class KafkaStreamsLiveTest {
+ private String bootstrapServers = "localhost:9092";
+
+ @Test
+ @Ignore("requires a Kafka broker running locally")
+ public void shouldTestKafkaStreams() throws InterruptedException {
+ //given
+ String inputTopic = "inputTopic";
+
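+ // Streams configuration: the application id identifies this word count app and prefixes its internal topics,
+ // keys and values are read as Strings, offsets are committed every second and reading starts from the earliest offset.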
+ Properties streamsConfiguration = new Properties();
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+ streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ // Use a temporary directory for storing state, which will be automatically removed after the test.
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+
+ //when
+ KStreamBuilder builder = new KStreamBuilder();
+ KStream<String, String> textLines = builder.stream(inputTopic);
+ Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
+
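+ // Word count topology: split each line into lower-cased words, group the stream by word
+ // and count the occurrences of each word into a KTable.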
+ KTable<String, Long> wordCounts = textLines
+ .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
+ .groupBy((key, word) -> word)
+ .count();
+
+ wordCounts.foreach((word, count) -> System.out.println("word: " + word + " -> " + count));
+
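+ // Publish the word counts to the output topic, serializing keys as Strings and counts as Longs.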
+ String outputTopic = "outputTopic";
+ final Serde<String> stringSerde = Serdes.String();
+ final Serde<Long> longSerde = Serdes.Long();
+ wordCounts.to(stringSerde, longSerde, outputTopic);
+
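+ // Create the Streams client from the topology and start processing; it runs until close() is called.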
+ KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+ streams.start();
+
+ //then
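+ // Give the application time to process any records published to the input topic before shutting down.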
+ Thread.sleep(30000);
+ streams.close();
+ }
+}