Bael 766 flink (#1632)

* BAEL-756 code for flink article

* reorder

* simpler wordCount example

* BAEL-766 changes according to PR

* BAEL-766 change datasource to dataset

* BAEL-766 add sorting example

* BAEL-766 add simple streaming example

* one missing change to dataSet

* windowing example

* add window example

* add dependency explicitly

* add plugin

* add surefire plugin, change neme of the test to *IntegrationTest

* fluent assertions
This commit is contained in:
Tomasz Lelek 2017-04-11 13:08:11 +02:00 committed by Grzegorz Piwowarek
parent 6cafa3d5a7
commit 3286018dd2
2 changed files with 21 additions and 16 deletions

View File

@ -152,6 +152,21 @@
<artifactId>flink-test-utils_2.10</artifactId> <artifactId>flink-test-utils_2.10</artifactId>
<version>${flink.version}</version> <version>${flink.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies> </dependencies>
<properties> <properties>

View File

@ -1,6 +1,5 @@
package com.baeldung.flink; package com.baeldung.flink;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.ExecutionEnvironment;
@ -35,14 +34,10 @@ public class WordCountIntegrationTest {
//then //then
List<Tuple2<String, Integer>> collect = result.collect(); List<Tuple2<String, Integer>> collect = result.collect();
assertThat(collect.size()).isEqualTo(9); assertThat(collect).containsExactlyInAnyOrder(
assertThat(collect.contains(new Tuple2<>("a", 3))).isTrue(); new Tuple2<>("a", 3), new Tuple2<>("sentence", 2), new Tuple2<>("word", 1),
assertThat(collect.contains(new Tuple2<>("sentence", 2))).isTrue(); new Tuple2<>("is", 2), new Tuple2<>("this", 2), new Tuple2<>("second", 1),
assertThat(collect.contains(new Tuple2<>("word", 1))).isTrue(); new Tuple2<>("first", 1), new Tuple2<>("with", 1), new Tuple2<>("one", 1));
assertThat(collect.contains(new Tuple2<>("is", 2))).isTrue();
assertThat(collect.contains(new Tuple2<>("this", 2))).isTrue();
assertThat(collect.contains(new Tuple2<>("second", 1))).isTrue();
assertThat(collect.contains(new Tuple2<>("first", 1))).isTrue();
} }
@Test @Test
@ -54,7 +49,7 @@ public class WordCountIntegrationTest {
//when //when
List<Integer> collect = amounts List<Integer> collect = amounts
.filter(a -> a > threshold) .filter(a -> a > threshold)
.reduce((ReduceFunction<Integer>) (integer, t1) -> integer + t1) .reduce((integer, t1) -> integer + t1)
.collect(); .collect();
//then //then
@ -92,12 +87,7 @@ public class WordCountIntegrationTest {
.collect(); .collect();
//then //then
assertThat(sorted.size()).isEqualTo(4); assertThat(sorted).containsExactly(firstPerson, secondPerson, thirdPerson, fourthPerson);
assertThat(sorted.get(0)).isEqualTo(firstPerson);
assertThat(sorted.get(1)).isEqualTo(secondPerson);
assertThat(sorted.get(2)).isEqualTo(thirdPerson);
assertThat(sorted.get(3)).isEqualTo(fourthPerson);
} }