diff --git a/libraries-data-3/README.md b/libraries-data-3/README.md
new file mode 100644
index 0000000000..fffdf65252
--- /dev/null
+++ b/libraries-data-3/README.md
@@ -0,0 +1,10 @@
+## Data Libraries
+
+This module contains articles about libraries for data processing in Java.
+
+### Relevant articles
+- [Kafka Streams vs Kafka Consumer]()
+- More articles: [[<-- prev]](/../libraries-data-2)
+
+##### Building the project
+You can build the project from the command line with *mvn clean install*, or in an IDE. If your IDE reports issues with the derive4j imports, add the folder *target/generated-sources/annotations* to the project build path.
diff --git a/libraries-data-3/log4j.properties b/libraries-data-3/log4j.properties
new file mode 100644
index 0000000000..2173c5d96f
--- /dev/null
+++ b/libraries-data-3/log4j.properties
@@ -0,0 +1 @@
+log4j.rootLogger=INFO, stdout
diff --git a/libraries-data-3/pom.xml b/libraries-data-3/pom.xml
new file mode 100644
index 0000000000..bfc39e537e
--- /dev/null
+++ b/libraries-data-3/pom.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>libraries-data-3</artifactId>
+    <name>libraries-data-3</name>
+
+    <parent>
+        <groupId>com.baeldung</groupId>
+        <artifactId>parent-modules</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+            <classifier>test</classifier>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-streams</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>${assertj.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+            <version>${testcontainers-kafka.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <properties>
+        <assertj.version>3.6.2</assertj.version>
+        <slf4j.version>1.7.25</slf4j.version>
+        <kafka.version>2.8.0</kafka.version>
+        <testcontainers-kafka.version>1.15.3</testcontainers-kafka.version>
+    </properties>
+
+</project>
\ No newline at end of file
diff --git a/libraries-data-3/src/test/java/com/baeldung/kafka/streams/KafkaStreamsLiveTest.java b/libraries-data-3/src/test/java/com/baeldung/kafka/streams/KafkaStreamsLiveTest.java
new file mode 100644
index 0000000000..0d4c0606e3
--- /dev/null
+++ b/libraries-data-3/src/test/java/com/baeldung/kafka/streams/KafkaStreamsLiveTest.java
@@ -0,0 +1,279 @@
+package com.baeldung.kafka.streams;
+
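+// NOTE: these live tests need a running Docker daemon; the Testcontainers
+// @ClassRule below starts a throwaway Kafka broker that both the producer
+// and the Kafka Streams topologies connect to.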
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KGroupedTable;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowStore;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+
+public class KafkaStreamsLiveTest {
+    private final String LEFT_TOPIC = "left-stream-topic";
+    private final String RIGHT_TOPIC = "right-stream-topic";
+    private final String LEFT_RIGHT_TOPIC = "left-right-stream-topic";
+
+    private KafkaProducer<String, String> producer = createKafkaProducer();
+    private Properties streamsConfiguration = new Properties();
+
+    static final String TEXT_LINES_TOPIC = "TextLinesTopic";
+
+    private final String TEXT_EXAMPLE_1 = "test test and test";
+    private final String TEXT_EXAMPLE_2 = "test filter filter this sentence";
+
+    @ClassRule
+    public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));
+
+    @Before
+    public void setUp() {
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    }
+
+    @Test
+    public void shouldTestKafkaTableLatestWord() throws InterruptedException {
+        String inputTopic = "topicTable";
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        // A KTable keeps only the latest value per key, so each new record overwrites the previous one
+        KTable<String, String> textLinesTable = builder.table(inputTopic,
+            Consumed.with(Serdes.String(), Serdes.String()));
+
+        textLinesTable.toStream().foreach((word, count) -> System.out.println("Latest word: " + word + " -> " + count));
+
+        final Topology topology = builder.build();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "latest-word-id");
+        KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
+
+        streams.cleanUp();
+        streams.start();
+        producer.send(new ProducerRecord<>(inputTopic, "1", TEXT_EXAMPLE_1));
+        producer.send(new ProducerRecord<>(inputTopic, "2", TEXT_EXAMPLE_2));
+
+        Thread.sleep(2000);
+        streams.close();
+    }
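+
+    // Word count: split each line into words, group by word and count occurrences.
+    // The counts are materialized in a local state store ("counts-store") and the
+    // resulting count stream is also written out to "outputTopic".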
+    @Test
+    public void shouldTestWordCountKafkaStreams() throws InterruptedException {
+        String wordCountTopic = "wordCountTopic";
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        KStream<String, String> textLines = builder.stream(wordCountTopic,
+            Consumed.with(Serdes.String(), Serdes.String()));
+
+        KTable<String, Long> wordCounts = textLines
+            .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.ROOT)
+                .split("\\W+")))
+            .groupBy((key, word) -> word)
+            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
+
+        wordCounts.toStream().foreach((word, count) -> System.out.println("Word: " + word + " -> " + count));
+
+        wordCounts.toStream().to("outputTopic",
+            Produced.with(Serdes.String(), Serdes.Long()));
+
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-stream-table-id");
+        final Topology topology = builder.build();
+        KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
+
+        streams.cleanUp();
+        streams.start();
+
+        producer.send(new ProducerRecord<>(wordCountTopic, "1", TEXT_EXAMPLE_1));
+        producer.send(new ProducerRecord<>(wordCountTopic, "2", TEXT_EXAMPLE_2));
+
+        Thread.sleep(2000);
+        streams.close();
+    }
+
+    // Stateless transformations: filter, map
+    @Test
+    public void shouldTestStatelessTransformations() throws InterruptedException {
+        String wordCountTopic = "wordCountTopic";
+
+        //when
+        final StreamsBuilder builder = new StreamsBuilder();
+        KStream<String, String> textLines = builder.stream(wordCountTopic,
+            Consumed.with(Serdes.String(), Serdes.String()));
+
+        final KStream<String, String> textLinesUpperCase = textLines
+            .map((key, value) -> KeyValue.pair(value, value.toUpperCase()))
+            .filter((key, value) -> value.contains("FILTER"));
+
+        KTable<String, Long> wordCounts = textLinesUpperCase
+            .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
+            .groupBy((key, word) -> word)
+            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
+
+        wordCounts.toStream().foreach((word, count) -> System.out.println("Word: " + word + " -> " + count));
+
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-filter-map-id");
+        final Topology topology = builder.build();
+        KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
+
+        streams.cleanUp();
+        streams.start();
+
+        producer.send(new ProducerRecord<>(wordCountTopic, "1", TEXT_EXAMPLE_1));
+        producer.send(new ProducerRecord<>(wordCountTopic, "2", TEXT_EXAMPLE_2));
+
+        Thread.sleep(2000);
+        streams.close();
+    }
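+
+    // Stateful aggregation: group values by their two-letter prefix and
+    // accumulate the total character length per group.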
+    @Test
+    public void shouldTestAggregationStatefulTransformations() throws InterruptedException {
+        String aggregationTopic = "aggregationTopic";
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<byte[], String> input = builder.stream(aggregationTopic,
+            Consumed.with(Serdes.ByteArray(), Serdes.String()));
+        final KTable<String, Long> aggregated = input
+            // guard against values shorter than two characters before taking the prefix
+            .groupBy((key, value) -> (value != null && value.length() >= 2) ? value.substring(0, 2).toLowerCase() : "",
+                Grouped.with(Serdes.String(), Serdes.String()))
+            .aggregate(() -> 0L, (aggKey, newValue, aggValue) -> aggValue + newValue.length(),
+                Materialized.with(Serdes.String(), Serdes.Long()));
+
+        aggregated.toStream().foreach((word, count) -> System.out.println("Word: " + word + " -> " + count));
+
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregation-id");
+        final Topology topology = builder.build();
+        KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
+
+        streams.cleanUp();
+        streams.start();
+
+        producer.send(new ProducerRecord<>(aggregationTopic, "1", "one"));
+        producer.send(new ProducerRecord<>(aggregationTopic, "2", "two"));
+        producer.send(new ProducerRecord<>(aggregationTopic, "3", "three"));
+        producer.send(new ProducerRecord<>(aggregationTopic, "4", "four"));
+        producer.send(new ProducerRecord<>(aggregationTopic, "5", "five"));
+
+        Thread.sleep(5000);
+        streams.close();
+    }
+
+    // Windowed join: outer-join the two streams, pairing records whose keys
+    // match within a 5-second window, and keep the latest value per key.
+    @Test
+    public void shouldTestWindowingJoinStatefulTransformations() throws InterruptedException {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        KStream<String, String> leftSource = builder.stream(LEFT_TOPIC);
+        KStream<String, String> rightSource = builder.stream(RIGHT_TOPIC);
+
+        KStream<String, String> leftRightSource = leftSource.outerJoin(rightSource,
+                (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
+                JoinWindows.of(Duration.ofSeconds(5)))
+            .groupByKey()
+            .reduce(((key, lastValue) -> lastValue))
+            .toStream();
+
+        leftRightSource.foreach((key, value) -> System.out.println("(key= " + key + ") -> (" + value + ")"));
+
+        final Topology topology = builder.build();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowing-join-id");
+        KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
+
+        streams.cleanUp();
+        streams.start();
+
+        producer.send(new ProducerRecord<>(LEFT_TOPIC, "1", "left"));
+        producer.send(new ProducerRecord<>(RIGHT_TOPIC, "2", "right"));
+
+        Thread.sleep(2000);
+        streams.close();
+    }
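+
+    // Interactive queries: once the word counts are materialized in the
+    // "WordCountsStore" state store, the store can be queried directly.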
+    @Test
+    public void shouldTestWordCountWithInteractiveQueries() throws InterruptedException {
+
+        final Serde<String> stringSerde = Serdes.String();
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, String> textLines = builder.stream(TEXT_LINES_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
+
+        final KGroupedStream<String, String> groupedByWord = textLines
+            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
+            .groupBy((key, word) -> word, Grouped.with(stringSerde, stringSerde));
+
+        groupedByWord.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("WordCountsStore")
+            .withValueSerde(Serdes.Long()));
+
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-interactive-queries");
+
+        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
+        streams.cleanUp();
+        streams.start();
+
+        producer.send(new ProducerRecord<>(TEXT_LINES_TOPIC, "1", TEXT_EXAMPLE_1));
+        producer.send(new ProducerRecord<>(TEXT_LINES_TOPIC, "2", TEXT_EXAMPLE_2));
+
+        Thread.sleep(2000);
+        ReadOnlyKeyValueStore<String, Long> keyValueStore =
+            streams.store(StoreQueryParameters.fromNameAndType(
+                "WordCountsStore", QueryableStoreTypes.keyValueStore()));
+
+        KeyValueIterator<String, Long> range = keyValueStore.all();
+        while (range.hasNext()) {
+            KeyValue<String, Long> next = range.next();
+            System.out.println("Count for " + next.key + ": " + next.value);
+        }
+
+        streams.close();
+    }
+
+    private static KafkaProducer<String, String> createKafkaProducer() {
+
+        Properties props = new Properties();
+        props.put(BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+        props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+        props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+
+        return new KafkaProducer<>(props);
+    }
+}