diff --git a/libraries-data/pom.xml b/libraries-data/pom.xml index 90b1f6bb1d..aacc3317bc 100644 --- a/libraries-data/pom.xml +++ b/libraries-data/pom.xml @@ -1,5 +1,6 @@ - + 4.0.0 com.baeldung @@ -15,18 +16,18 @@ kryo ${kryo.version} - + com.h2database h2 ${h2.version} - + junit junit ${junit.version} test - + com.goldmansachs.reladomo reladomo ${reladomo.version} @@ -41,8 +42,31 @@ ormlite-jdbc ${ormlite.version} + + org.apache.kafka + kafka-streams + ${kafka.version} + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + org.slf4j + slf4j-log4j12 + + + + + org.apache.kafka + kafka-clients + ${kafka.version} + test + test + - + @@ -68,24 +92,24 @@ - + + classname="com.gs.fw.common.mithra.generator.MithraGenerator"/> + xml="${project.basedir}/src/main/resources/reladomo/ReladomoClassList.xml" + generateGscListMethod="true" + generatedDir="${project.build.directory}/generated-sources/reladomo" + nonGeneratedDir="${project.basedir}/src/main/java"/> - + classname="com.gs.fw.common.mithra.generator.dbgenerator.MithraDbDefinitionGenerator" + loaderRef="reladomoGenerator"> + + xml="${project.basedir}/src/main/resources/reladomo/ReladomoClassList.xml" + generatedDir="${project.build.directory}/generated-db/sql" + databaseType="postgres"/> @@ -139,10 +163,16 @@ - + - + + + + Apache Staging + https://repository.apache.org/content/groups/staging + + 4.0.1 1.4.196 @@ -150,5 +180,6 @@ 4.12 3.6.2 5.0 + 1.0.0 \ No newline at end of file diff --git a/libraries-data/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java b/libraries-data/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java new file mode 100644 index 0000000000..32568e9ea5 --- /dev/null +++ b/libraries-data/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java @@ -0,0 +1,62 @@ +package com.baeldung.kafkastreams; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.test.TestUtils; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Properties; +import java.util.regex.Pattern; + +public class KafkaStreamsLiveTest { + private String bootstrapServers = "localhost:9092"; + + @Test + @Ignore("it needs to have kafka broker running on local") + public void shouldTestKafkaStreams() throws InterruptedException { + //given + String inputTopic = "inputTopic"; + + Properties streamsConfiguration = new Properties(); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test"); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + // Use a temporary directory for storing state, which will be automatically removed after the test. + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); + + //when + KStreamBuilder builder = new KStreamBuilder(); + KStream textLines = builder.stream(inputTopic); + Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS); + + KTable wordCounts = textLines + .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase()))) + .groupBy((key, word) -> word) + .count(); + + wordCounts.foreach((word, count) -> System.out.println("word: " + word + " -> " + count)); + + String outputTopic = "outputTopic"; + final Serde stringSerde = Serdes.String(); + final Serde longSerde = Serdes.Long(); + wordCounts.to(stringSerde, longSerde, outputTopic); + + KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); + streams.start(); + + //then + Thread.sleep(30000); + streams.close(); + } +}