diff --git a/libraries-data-3/src/main/java/com/baeldung/kafka/admin/KafkaTopicApplication.java b/apache-kafka/src/main/java/com/baeldung/kafka/admin/KafkaTopicApplication.java similarity index 77% rename from libraries-data-3/src/main/java/com/baeldung/kafka/admin/KafkaTopicApplication.java rename to apache-kafka/src/main/java/com/baeldung/kafka/admin/KafkaTopicApplication.java index 0d74e27d4e..25d621166d 100644 --- a/libraries-data-3/src/main/java/com/baeldung/kafka/admin/KafkaTopicApplication.java +++ b/apache-kafka/src/main/java/com/baeldung/kafka/admin/KafkaTopicApplication.java @@ -27,11 +27,11 @@ public class KafkaTopicApplication { short replicationFactor = 1; NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor); - CreateTopicsResult result = admin.createTopics( - Collections.singleton(newTopic)); + CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic)); // get the async result for the new topic creation - KafkaFuture future = result.values().get(topicName); + KafkaFuture future = result.values() + .get(topicName); // call get() to block until topic creation has completed or failed future.get(); @@ -47,15 +47,13 @@ public class KafkaTopicApplication { short replicationFactor = 1; NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor); - CreateTopicsOptions topicOptions = new CreateTopicsOptions() - .validateOnly(true) - .retryOnQuotaViolation(true); + CreateTopicsOptions topicOptions = new CreateTopicsOptions().validateOnly(true) + .retryOnQuotaViolation(true); - CreateTopicsResult result = admin.createTopics( - Collections.singleton(newTopic), topicOptions - ); + CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic), topicOptions); - KafkaFuture future = result.values().get(topicName); + KafkaFuture future = result.values() + .get(topicName); future.get(); } } @@ -72,14 +70,12 @@ public class KafkaTopicApplication { Map newTopicConfig = new HashMap<>(); newTopicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT); newTopicConfig.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4"); - NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor) - .configs(newTopicConfig); + NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor).configs(newTopicConfig); - CreateTopicsResult result = admin.createTopics( - Collections.singleton(newTopic) - ); + CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic)); - KafkaFuture future = result.values().get(topicName); + KafkaFuture future = result.values() + .get(topicName); future.get(); } } diff --git a/libraries-data-3/src/test/java/com/baeldung/kafka/admin/KafkaTopicApplicationIntegrationTest.java b/apache-kafka/src/test/java/com/baeldung/kafka/admin/KafkaTopicApplicationIntegrationTest.java similarity index 100% rename from libraries-data-3/src/test/java/com/baeldung/kafka/admin/KafkaTopicApplicationIntegrationTest.java rename to apache-kafka/src/test/java/com/baeldung/kafka/admin/KafkaTopicApplicationIntegrationTest.java diff --git a/apache-kafka/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java b/apache-kafka/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java new file mode 100644 index 0000000000..0b66dd8fec --- /dev/null +++ b/apache-kafka/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java @@ -0,0 +1,77 @@ +package com.baeldung.kafkastreams; + +import java.util.Arrays; +import java.util.Properties; +import java.util.regex.Pattern; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Produced; +import org.junit.Ignore; +import org.junit.Test; + +public class KafkaStreamsLiveTest { + private String bootstrapServers = "localhost:9092"; + + @Test + @Ignore("it needs to have kafka broker running on local") + public void shouldTestKafkaStreams() throws InterruptedException { + //given + String inputTopic = "inputTopic"; + + Properties streamsConfiguration = new Properties(); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test"); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + // Use a temporary directory for storing state, which will be automatically removed after the test. + // streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); + + /* + * final StreamsBuilder builder = new StreamsBuilder(); + KStream textLines = builder.stream(wordCountTopic, + Consumed.with(Serdes.String(), Serdes.String())); + + KTable wordCounts = textLines + .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.ROOT) + .split("\\W+"))) + .groupBy((key, word) -> word) + .count(Materialized.> as("counts-store")); + */ + //when + final StreamsBuilder builder = new StreamsBuilder(); + KStream textLines = builder.stream(inputTopic); + Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS); + + KTable wordCounts = textLines + .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase()))) + .groupBy((key, word) -> word) + .count(); + + wordCounts.toStream().foreach((word, count) -> System.out.println("word: " + word + " -> " + count)); + + String outputTopic = "outputTopic"; + //final Serde stringSerde = Serdes.String(); + //final Serde longSerde = Serdes.Long(); + //wordCounts.toStream().to(stringSerde, longSerde, outputTopic); + + wordCounts.toStream().to("outputTopic", + Produced.with(Serdes.String(), Serdes.Long())); + + final Topology topology = builder.build(); + KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration); + streams.start(); + + //then + Thread.sleep(30000); + streams.close(); + } +} diff --git a/libraries-data-3/src/test/java/com/baeldung/kafka/streams/KafkaStreamsLiveTest.java b/libraries-data-3/src/test/java/com/baeldung/kafka/streams/KafkaStreamsLiveTest.java deleted file mode 100644 index 0d4c0606e3..0000000000 --- a/libraries-data-3/src/test/java/com/baeldung/kafka/streams/KafkaStreamsLiveTest.java +++ /dev/null @@ -1,279 +0,0 @@ -package com.baeldung.kafka.streams; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StoreQueryParameters; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.Grouped; -import org.apache.kafka.streams.kstream.JoinWindows; -import org.apache.kafka.streams.kstream.KGroupedStream; -import org.apache.kafka.streams.kstream.KGroupedTable; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.kstream.Produced; -import org.apache.kafka.streams.kstream.TimeWindows; -import org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.QueryableStoreTypes; -import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; -import org.apache.kafka.streams.state.StoreBuilder; -import org.apache.kafka.streams.state.Stores; -import org.apache.kafka.streams.state.WindowStore; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Test; -import org.testcontainers.containers.KafkaContainer; -import org.testcontainers.utility.DockerImageName; - -import java.time.Duration; -import java.util.Arrays; -import java.util.Locale; -import java.util.Properties; - -import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; -import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; - -public class KafkaStreamsLiveTest { - private final String LEFT_TOPIC = "left-stream-topic"; - private final String RIGHT_TOPIC = "right-stream-topic"; - private final String LEFT_RIGHT_TOPIC = "left-right-stream-topic"; - - private KafkaProducer producer = createKafkaProducer(); - private Properties streamsConfiguration = new Properties(); - - static final String TEXT_LINES_TOPIC = "TextLinesTopic"; - - private final String TEXT_EXAMPLE_1 = "test test and test"; - private final String TEXT_EXAMPLE_2 = "test filter filter this sentence"; - - @ClassRule - public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3")); - - @Before - public void setUp() { - streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); - streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); - streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - } - - @Test - public void shouldTestKafkaTableLatestWord() throws InterruptedException { - String inputTopic = "topicTable"; - - final StreamsBuilder builder = new StreamsBuilder(); - - KTable textLinesTable = builder.table(inputTopic, - Consumed.with(Serdes.String(), Serdes.String())); - - textLinesTable.toStream().foreach((word, count) -> System.out.println("Latest word: " + word + " -> " + count)); - - final Topology topology = builder.build(); - streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "latest-word-id"); - KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration); - - streams.cleanUp(); - streams.start(); - producer.send(new ProducerRecord(inputTopic, "1", TEXT_EXAMPLE_1)); - producer.send(new ProducerRecord(inputTopic, "2", TEXT_EXAMPLE_2)); - - Thread.sleep(2000); - streams.close(); - } - - @Test - public void shouldTestWordCountKafkaStreams() throws InterruptedException { - String wordCountTopic = "wordCountTopic"; - - final StreamsBuilder builder = new StreamsBuilder(); - KStream textLines = builder.stream(wordCountTopic, - Consumed.with(Serdes.String(), Serdes.String())); - - KTable wordCounts = textLines - .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.ROOT) - .split("\\W+"))) - .groupBy((key, word) -> word) - .count(Materialized.> as("counts-store")); - - wordCounts.toStream().foreach((word, count) -> System.out.println("Word: " + word + " -> " + count)); - - wordCounts.toStream().to("outputTopic", - Produced.with(Serdes.String(), Serdes.Long())); - - streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-stream-table-id"); - final Topology topology = builder.build(); - KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration); - - streams.cleanUp(); - streams.start(); - - producer.send(new ProducerRecord(wordCountTopic, "1", TEXT_EXAMPLE_1)); - producer.send(new ProducerRecord(wordCountTopic, "2", TEXT_EXAMPLE_2)); - - Thread.sleep(2000); - streams.close(); - } - - // Filter, map - @Test - public void shouldTestStatelessTransformations() throws InterruptedException { - String wordCountTopic = "wordCountTopic"; - - //when - final StreamsBuilder builder = new StreamsBuilder(); - KStream textLines = builder.stream(wordCountTopic, - Consumed.with(Serdes.String(), Serdes.String())); - - final KStream textLinesUpperCase = - textLines - .map((key, value) -> KeyValue.pair(value, value.toUpperCase())) - .filter((key, value) -> value.contains("FILTER")); - - KTable wordCounts = textLinesUpperCase - .flatMapValues(value -> Arrays.asList(value.split("\\W+"))) - .groupBy((key, word) -> word) - .count(Materialized.> as("counts-store")); - - wordCounts.toStream().foreach((word, count) -> System.out.println("Word: " + word + " -> " + count)); - - streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-filter-map-id"); - final Topology topology = builder.build(); - KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration); - - streams.cleanUp(); - streams.start(); - - producer.send(new ProducerRecord(wordCountTopic, "1", TEXT_EXAMPLE_1)); - producer.send(new ProducerRecord(wordCountTopic, "2", TEXT_EXAMPLE_2)); - - Thread.sleep(2000); - streams.close(); - - } - - @Test - public void shouldTestAggregationStatefulTransformations() throws InterruptedException { - String aggregationTopic = "aggregationTopic"; - - final StreamsBuilder builder = new StreamsBuilder(); - final KStream input = builder.stream(aggregationTopic, - Consumed.with(Serdes.ByteArray(), Serdes.String())); - final KTable aggregated = input - .groupBy((key, value) -> (value != null && value.length() > 0) ? value.substring(0, 2).toLowerCase() : "", - Grouped.with(Serdes.String(), Serdes.String())) - .aggregate(() -> 0L, (aggKey, newValue, aggValue) -> aggValue + newValue.length(), - Materialized.with(Serdes.String(), Serdes.Long())); - - aggregated.toStream().foreach((word, count) -> System.out.println("Word: " + word + " -> " + count)); - - streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregation-id"); - final Topology topology = builder.build(); - KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration); - - streams.cleanUp(); - streams.start(); - - producer.send(new ProducerRecord(aggregationTopic, "1", "one")); - producer.send(new ProducerRecord(aggregationTopic, "2", "two")); - producer.send(new ProducerRecord(aggregationTopic, "3", "three")); - producer.send(new ProducerRecord(aggregationTopic, "4", "four")); - producer.send(new ProducerRecord(aggregationTopic, "5", "five")); - - Thread.sleep(5000); - streams.close(); - - } - - @Test - public void shouldTestWindowingJoinStatefulTransformations() throws InterruptedException { - final StreamsBuilder builder = new StreamsBuilder(); - - KStream leftSource = builder.stream(LEFT_TOPIC); - KStream rightSource = builder.stream(RIGHT_TOPIC); - - KStream leftRightSource = leftSource.outerJoin(rightSource, - (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, - JoinWindows.of(Duration.ofSeconds(5))) - .groupByKey() - .reduce(((key, lastValue) -> lastValue)) - .toStream(); - - leftRightSource.foreach((key, value) -> System.out.println("(key= " + key + ") -> (" + value + ")")); - - final Topology topology = builder.build(); - streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowing-join-id"); - KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration); - - streams.cleanUp(); - streams.start(); - - producer.send(new ProducerRecord(LEFT_TOPIC, "1", "left")); - producer.send(new ProducerRecord(RIGHT_TOPIC, "2", "right")); - - Thread.sleep(2000); - streams.close(); - } - - @Test - public void shouldTestWordCountWithInteractiveQueries() throws InterruptedException { - - final Serde stringSerde = Serdes.String(); - final StreamsBuilder builder = new StreamsBuilder(); - final KStream - textLines = builder.stream(TEXT_LINES_TOPIC, Consumed.with(Serdes.String(), Serdes.String())); - - final KGroupedStream groupedByWord = textLines - .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) - .groupBy((key, word) -> word, Grouped.with(stringSerde, stringSerde)); - - groupedByWord.count(Materialized.>as("WordCountsStore") - .withValueSerde(Serdes.Long())); - - streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-interactive-queries"); - - final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration); - streams.cleanUp(); - streams.start(); - - producer.send(new ProducerRecord(TEXT_LINES_TOPIC, "1", TEXT_EXAMPLE_1)); - producer.send(new ProducerRecord(TEXT_LINES_TOPIC, "2", TEXT_EXAMPLE_2)); - - Thread.sleep(2000); - ReadOnlyKeyValueStore keyValueStore = - streams.store(StoreQueryParameters.fromNameAndType( - "WordCountsStore", QueryableStoreTypes.keyValueStore())); - - KeyValueIterator range = keyValueStore.all(); - while (range.hasNext()) { - KeyValue next = range.next(); - System.out.println("Count for " + next.key + ": " + next.value); - } - - streams.close(); - } - - private static KafkaProducer createKafkaProducer() { - - Properties props = new Properties(); - props.put(BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); - props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - - return new KafkaProducer(props); - - } -} - -