* BAEL-1421 live test of kafka streams

* BAEL-1421 Removed not-needed dependency

* BAEL-1421 rearannge

* BAEL-1421 rearannge

* fix pom

* move to libraries-data

* move to libraries-data
This commit is contained in:
Tomasz Lelek 2017-12-28 11:21:11 +01:00 committed by Grzegorz Piwowarek
parent e87114be6e
commit 0a76081ac3
2 changed files with 112 additions and 19 deletions

View File

@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<parent> <parent>
<groupId>com.baeldung</groupId> <groupId>com.baeldung</groupId>
@ -41,6 +42,29 @@
<artifactId>ormlite-jdbc</artifactId> <artifactId>ormlite-jdbc</artifactId>
<version>${ormlite.version}</version> <version>${ormlite.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>
@ -143,6 +167,12 @@
</plugins> </plugins>
</build> </build>
<repositories>
<repository>
<id>Apache Staging</id>
<url>https://repository.apache.org/content/groups/staging</url>
</repository>
</repositories>
<properties> <properties>
<kryo.version>4.0.1</kryo.version> <kryo.version>4.0.1</kryo.version>
<h2.version>1.4.196</h2.version> <h2.version>1.4.196</h2.version>
@ -150,5 +180,6 @@
<junit.version>4.12</junit.version> <junit.version>4.12</junit.version>
<maven-compiler-plugin.version>3.6.2</maven-compiler-plugin.version> <maven-compiler-plugin.version>3.6.2</maven-compiler-plugin.version>
<ormlite.version>5.0</ormlite.version> <ormlite.version>5.0</ormlite.version>
<kafka.version>1.0.0</kafka.version>
</properties> </properties>
</project> </project>

View File

@ -0,0 +1,62 @@
package com.baeldung.kafkastreams;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.test.TestUtils;
import org.junit.Ignore;
import org.junit.Test;
import java.util.Arrays;
import java.util.Properties;
import java.util.regex.Pattern;
public class KafkaStreamsLiveTest {
private String bootstrapServers = "localhost:9092";
@Test
@Ignore("it needs to have kafka broker running on local")
public void shouldTestKafkaStreams() throws InterruptedException {
//given
String inputTopic = "inputTopic";
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Use a temporary directory for storing state, which will be automatically removed after the test.
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
//when
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textLines = builder.stream(inputTopic);
Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
KTable<String, Long> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
.groupBy((key, word) -> word)
.count();
wordCounts.foreach((word, count) -> System.out.println("word: " + word + " -> " + count));
String outputTopic = "outputTopic";
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
wordCounts.to(stringSerde, longSerde, outputTopic);
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();
//then
Thread.sleep(30000);
streams.close();
}
}