Bael 1421 (#3222)
* BAEL-1421 live test of kafka streams * BAEL-1421 Removed not-needed dependency * BAEL-1421 rearannge * BAEL-1421 rearannge * fix pom
This commit is contained in:
parent
85f12cd254
commit
2ef351434e
@ -612,16 +612,16 @@
|
||||
<artifactId>caffeine</artifactId>
|
||||
<version>${caffeine.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.bouncycastle</groupId>
|
||||
<artifactId>bcprov-jdk15on</artifactId>
|
||||
<version>1.58</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.bouncycastle</groupId>
|
||||
<artifactId>bcpkix-jdk15on</artifactId>
|
||||
<version>1.58</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.bouncycastle</groupId>
|
||||
<artifactId>bcprov-jdk15on</artifactId>
|
||||
<version>1.58</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.bouncycastle</groupId>
|
||||
<artifactId>bcpkix-jdk15on</artifactId>
|
||||
<version>1.58</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.http-client</groupId>
|
||||
<artifactId>google-http-client</artifactId>
|
||||
@ -654,6 +654,29 @@
|
||||
<artifactId>google-api-services-sheets</artifactId>
|
||||
<version>${google-sheets.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-streams</artifactId>
|
||||
<version>${kafka.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>${kafka.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>${kafka.version}</version>
|
||||
<classifier>test</classifier>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<repositories>
|
||||
<repository>
|
||||
@ -669,6 +692,10 @@
|
||||
<name>bintray</name>
|
||||
<url>http://dl.bintray.com/cuba-platform/main</url>
|
||||
</repository>
|
||||
<repository>
|
||||
<id>Apache Staging</id>
|
||||
<url>https://repository.apache.org/content/groups/staging</url>
|
||||
</repository>
|
||||
</repositories>
|
||||
<properties>
|
||||
<googleclient.version>1.23.0</googleclient.version>
|
||||
@ -729,5 +756,6 @@
|
||||
<caffeine.version>2.5.5</caffeine.version>
|
||||
<google-api.version>1.23.0</google-api.version>
|
||||
<google-sheets.version>v4-rev493-1.21.0</google-sheets.version>
|
||||
<kafka.version>1.0.0</kafka.version>
|
||||
</properties>
|
||||
</project>
|
@ -0,0 +1,62 @@
|
||||
package com.baeldung.kafkastreams;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.Serde;
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.streams.KafkaStreams;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.kstream.KStream;
|
||||
import org.apache.kafka.streams.kstream.KStreamBuilder;
|
||||
import org.apache.kafka.streams.kstream.KTable;
|
||||
import org.apache.kafka.test.TestUtils;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Properties;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
public class KafkaStreamsLiveTest {
|
||||
private String bootstrapServers = "localhost:9092";
|
||||
|
||||
@Test
|
||||
@Ignore("it needs to have kafka broker running on local")
|
||||
public void shouldTestKafkaStreams() throws InterruptedException {
|
||||
//given
|
||||
String inputTopic = "inputTopic";
|
||||
|
||||
Properties streamsConfiguration = new Properties();
|
||||
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");
|
||||
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
||||
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
|
||||
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
|
||||
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
|
||||
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
// Use a temporary directory for storing state, which will be automatically removed after the test.
|
||||
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
|
||||
|
||||
//when
|
||||
KStreamBuilder builder = new KStreamBuilder();
|
||||
KStream<String, String> textLines = builder.stream(inputTopic);
|
||||
Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
|
||||
|
||||
KTable<String, Long> wordCounts = textLines
|
||||
.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
|
||||
.groupBy((key, word) -> word)
|
||||
.count();
|
||||
|
||||
wordCounts.foreach((word, count) -> System.out.println("word: " + word + " -> " + count));
|
||||
|
||||
String outputTopic = "outputTopic";
|
||||
final Serde<String> stringSerde = Serdes.String();
|
||||
final Serde<Long> longSerde = Serdes.Long();
|
||||
wordCounts.to(stringSerde, longSerde, outputTopic);
|
||||
|
||||
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
|
||||
streams.start();
|
||||
|
||||
//then
|
||||
Thread.sleep(30000);
|
||||
streams.close();
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user