Bael 766 flink (#1533)

* BAEL-756 code for flink article

* reorder

* simpler wordCount example

* BAEL-766 changes according to PR

* BAEL-766 change datasource to dataset

* BAEL-766 add sorting example

* BAEL-766 add simple streaming example

* one missing change to dataSet

* windowing example

* add window example

* add dependency explicitly

* add plugin

* add surefire plugin, change name of the test to *IntegrationTest
This commit is contained in:
Tomasz Lelek 2017-04-09 12:23:57 +02:00 committed by Grzegorz Piwowarek
parent 60332bb563
commit a5de78c2b2
4 changed files with 278 additions and 0 deletions

View File

@ -20,6 +20,31 @@
<target>1.8</target> <target>1.8</target>
</configuration> </configuration>
</plugin> </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<version>3.3.0</version>
<type>maven-plugin</type>
</dependency>
</dependencies>
<extensions>true</extensions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven-surefire-plugin.version}</version>
<configuration>
<excludes>
<exclude>**/*IntegrationTest.java</exclude>
<exclude>**/*LiveTest.java</exclude>
</excludes>
</configuration>
</plugin>
</plugins> </plugins>
</build> </build>
@ -92,6 +117,21 @@
<artifactId>commons-io</artifactId> <artifactId>commons-io</artifactId>
<version>${commons.io.version}</version> <version>${commons.io.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies> </dependencies>
<properties> <properties>
@ -107,6 +147,8 @@
<jetty.version>9.4.2.v20170220</jetty.version> <jetty.version>9.4.2.v20170220</jetty.version>
<httpclient.version>4.5.3</httpclient.version> <httpclient.version>4.5.3</httpclient.version>
<commons.io.version>2.5</commons.io.version> <commons.io.version>2.5</commons.io.version>
<flink.version>1.2.0</flink.version>
<maven-surefire-plugin.version>2.19.1</maven-surefire-plugin.version>
</properties> </properties>
</project> </project>

View File

@ -0,0 +1,20 @@
package com.baeldung.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import java.util.stream.Stream;
@SuppressWarnings("serial")
public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

    /**
     * Splits a line into lowercase word tokens and emits a {@code (word, 1)}
     * pair for every non-empty token, for downstream grouping and summing.
     */
    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) {
        // Runs of non-word characters act as delimiters; lowercasing
        // normalizes tokens so counts are case-insensitive.
        for (String word : line.toLowerCase().split("\\W+")) {
            if (word.length() > 0) {
                collector.collect(new Tuple2<>(word, 1));
            }
        }
    }
}

View File

@ -0,0 +1,20 @@
package com.baeldung.flink;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.List;
public class WordCount {

    /**
     * Builds a Flink batch job that counts word occurrences in the given lines.
     *
     * @param env   the batch execution environment the job is attached to
     * @param lines the input lines to tokenize and count
     * @return a data set of {@code (word, count)} tuples; the job is lazy and
     *         runs when the result is materialized (e.g. via {@code collect()})
     * @throws Exception if the Flink job construction fails
     */
    public static DataSet<Tuple2<String, Integer>> startWordCount(ExecutionEnvironment env, List<String> lines) throws Exception {
        DataSet<String> input = env.fromCollection(lines);
        DataSet<Tuple2<String, Integer>> tokenized = input.flatMap(new LineSplitter());
        // Group by the word (field 0) and sum the per-token counts (field 1).
        return tokenized.groupBy(0).aggregate(Aggregations.SUM, 1);
    }
}

View File

@ -0,0 +1,196 @@
package com.baeldung.flink;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.junit.Test;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
public class WordCountIntegrationTest {

    // Shared batch environment; each test builds and executes its own job graph.
    private final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    @Test
    public void givenDataSet_whenExecuteWordCount_thenReturnWordCount() throws Exception {
        // given
        List<String> lines = Arrays.asList("This is a first sentence", "This is a second sentence with a one word");

        // when
        DataSet<Tuple2<String, Integer>> result = WordCount.startWordCount(env, lines);

        // then
        List<Tuple2<String, Integer>> collect = result.collect();
        // AssertJ collection assertions verify membership directly and produce
        // descriptive failure messages (unlike wrapping contains() in a boolean).
        assertThat(collect).hasSize(9);
        assertThat(collect).contains(
                new Tuple2<>("a", 3),
                new Tuple2<>("sentence", 2),
                new Tuple2<>("word", 1),
                new Tuple2<>("is", 2),
                new Tuple2<>("this", 2),
                new Tuple2<>("second", 1),
                new Tuple2<>("first", 1));
    }

    @Test
    public void givenListOfAmounts_whenUseMapReduce_thenSumAmountsThatAreOnlyAboveThreshold() throws Exception {
        // given
        DataSet<Integer> amounts = env.fromElements(1, 29, 40, 50);
        int threshold = 30;

        // when
        List<Integer> collect = amounts
                .filter(a -> a > threshold)
                .reduce((ReduceFunction<Integer>) (integer, t1) -> integer + t1)
                .collect();

        // then: only 40 and 50 pass the filter, so the sum is 90
        assertThat(collect.get(0)).isEqualTo(90);
    }

    @Test
    public void givenDataSetOfComplexObjects_whenMapToGetOneField_thenReturnedListHaveProperElements() throws Exception {
        // given
        DataSet<Person> personDataSource = env.fromCollection(Arrays.asList(new Person(23, "Tom"), new Person(75, "Michael")));

        // when
        List<Integer> ages = personDataSource.map(p -> p.age).collect();

        // then
        assertThat(ages).hasSize(2);
        assertThat(ages).containsAll(Arrays.asList(23, 75));
    }

    @Test
    public void givenDataSet_whenSortItByOneField_thenShouldReturnSortedDataSet() throws Exception {
        // given
        Tuple2<Integer, String> secondPerson = new Tuple2<>(4, "Tom");
        Tuple2<Integer, String> thirdPerson = new Tuple2<>(5, "Scott");
        Tuple2<Integer, String> fourthPerson = new Tuple2<>(200, "Michael");
        Tuple2<Integer, String> firstPerson = new Tuple2<>(1, "Jack");
        DataSet<Tuple2<Integer, String>> transactions = env.fromElements(fourthPerson, secondPerson,
                thirdPerson, firstPerson);

        // when: sort within the partition by the id field, ascending
        List<Tuple2<Integer, String>> sorted = transactions
                .sortPartition(new IdKeySelectorTransaction(), Order.ASCENDING)
                .collect();

        // then
        assertThat(sorted).hasSize(4);
        assertThat(sorted.get(0)).isEqualTo(firstPerson);
        assertThat(sorted.get(1)).isEqualTo(secondPerson);
        assertThat(sorted.get(2)).isEqualTo(thirdPerson);
        assertThat(sorted.get(3)).isEqualTo(fourthPerson);
    }

    @Test
    public void giveTwoDataSets_whenJoinUsingId_thenProduceJoinedData() throws Exception {
        // given
        Tuple3<Integer, String, String> address = new Tuple3<>(1, "5th Avenue", "London");
        DataSet<Tuple3<Integer, String, String>> addresses = env.fromElements(address);

        Tuple2<Integer, String> firstTransaction = new Tuple2<>(1, "Transaction_1");
        DataSet<Tuple2<Integer, String>> transactions =
                env.fromElements(firstTransaction, new Tuple2<>(12, "Transaction_2"));

        // when: inner-join on the shared id; only id=1 matches on both sides
        List<Tuple2<Tuple2<Integer, String>, Tuple3<Integer, String, String>>> joined =
                transactions.join(addresses)
                        .where(new IdKeySelectorTransaction())
                        .equalTo(new IdKeySelectorAddress())
                        .collect();

        // then
        assertThat(joined).hasSize(1);
        // Fixed: the original wrapped joined.contains(...) in assertThat(...)
        // without .isTrue(), so the membership check was never asserted.
        assertThat(joined).contains(new Tuple2<>(firstTransaction, address));
    }

    @Test
    public void givenStreamOfEvents_whenProcessEvents_thenShouldPrintResultsOnSinkOperation() throws Exception {
        // given: a streaming job that upper-cases each element and prints it.
        // No assertion is possible on the print sink; the test verifies the
        // pipeline executes without raising.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text
                = env.fromElements("This is a first sentence", "This is a second sentence with a one word");

        SingleOutputStreamOperator<String> upperCase = text.map(String::toUpperCase);
        upperCase.print();

        // when
        env.execute();
    }

    @Test
    public void givenStreamOfEvents_whenProcessEvents_thenShouldApplyWindowingOnTransformation() throws Exception {
        // given: events carrying epoch-second timestamps in field f1; watermarks
        // allow events up to 20 seconds out of order.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<Tuple2<Integer, Long>> windowed = env.fromElements(
                new Tuple2<>(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()),
                new Tuple2<>(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond())
        ).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Integer, Long>>(Time.seconds(20)) {
            @Override
            public long extractTimestamp(Tuple2<Integer, Long> element) {
                // Flink expects milliseconds; the payload stores seconds.
                return element.f1 * 1000;
            }
        });

        // Tumbling 5-second event-time windows; keep the max of field 0 per window.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> reduced = windowed
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
                .maxBy(0, true);

        reduced.print();

        // when: as above, only successful execution is verified
        env.execute();
    }

    // Key extractor for transaction tuples: joins/sorts on the id (field 0).
    private static class IdKeySelectorTransaction implements KeySelector<Tuple2<Integer, String>, Integer> {
        @Override
        public Integer getKey(Tuple2<Integer, String> value) {
            return value.f0;
        }
    }

    // Key extractor for address tuples: joins on the id (field 0).
    private static class IdKeySelectorAddress implements KeySelector<Tuple3<Integer, String, String>, Integer> {
        @Override
        public Integer getKey(Tuple3<Integer, String, String> value) {
            return value.f0;
        }
    }

    // Minimal POJO used by the map-one-field test.
    private static class Person {
        private final int age;
        private final String name;

        private Person(int age, String name) {
            this.age = age;
            this.name = name;
        }
    }
}