From a5de78c2b21737448ee7aed4f7a4449e2cbc60a5 Mon Sep 17 00:00:00 2001 From: Tomasz Lelek Date: Sun, 9 Apr 2017 12:23:57 +0200 Subject: [PATCH] Bael 766 flink (#1533) * BAEL-756 code for flink article * reorder * simpler wordCount example * BAEL-766 changes according to PR * BAEL-766 change datasource to dataset * BAEL-766 add sorting example * BAEL-766 add simple streaming example * one missing change to dataSet * windowing example * add window example * add dependency explicitly * add plugin * add surefire plugin, change name of the test to *IntegrationTest --- libraries/pom.xml | 42 ++++ .../java/com/baeldung/flink/LineSplitter.java | 20 ++ .../java/com/baeldung/flink/WordCount.java | 20 ++ .../flink/WordCountIntegrationTest.java | 196 ++++++++++++++++++ 4 files changed, 278 insertions(+) create mode 100644 libraries/src/main/java/com/baeldung/flink/LineSplitter.java create mode 100644 libraries/src/main/java/com/baeldung/flink/WordCount.java create mode 100644 libraries/src/test/java/com/baeldung/flink/WordCountIntegrationTest.java diff --git a/libraries/pom.xml b/libraries/pom.xml index 0f33c42dc4..a200fe8350 100644 --- a/libraries/pom.xml +++ b/libraries/pom.xml @@ -20,6 +20,31 @@ 1.8 + + org.apache.maven.plugins + maven-dependency-plugin + + + org.apache.felix + maven-bundle-plugin + 3.3.0 + maven-plugin + + + + true + + + org.apache.maven.plugins + maven-surefire-plugin + ${maven-surefire-plugin.version} + + + **/*IntegrationTest.java + **/*LiveTest.java + + + @@ -92,6 +117,21 @@ commons-io ${commons.io.version} + + org.apache.flink + flink-core + ${flink.version} + + + org.apache.flink + flink-java + ${flink.version} + + + org.apache.flink + flink-test-utils_2.10 + ${flink.version} + @@ -107,6 +147,8 @@ 9.4.2.v20170220 4.5.3 2.5 + 1.2.0 + 2.19.1 \ No newline at end of file diff --git a/libraries/src/main/java/com/baeldung/flink/LineSplitter.java b/libraries/src/main/java/com/baeldung/flink/LineSplitter.java new file mode 100644 index 0000000000..8deeeb01c4 
--- /dev/null
+++ b/libraries/src/main/java/com/baeldung/flink/LineSplitter.java
@@ -0,0 +1,32 @@
+package com.baeldung.flink;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+import java.util.Locale;
+import java.util.stream.Stream;
+
+/**
+ * Splits a line of text into lower-cased words and emits a {@code (word, 1)} pair for each of them.
+ */
+@SuppressWarnings("serial")
+public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
+
+    /**
+     * Tokenizes {@code value} on runs of non-word characters and collects one tuple per non-empty token.
+     *
+     * @param value the input line
+     * @param out   the collector that receives a {@code (token, 1)} tuple for every token
+     */
+    @Override
+    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+
+        // Locale.ROOT keeps the lower-casing independent of the JVM's default locale.
+        String[] tokens = value.toLowerCase(Locale.ROOT).split("\\W+");
+        Stream.of(tokens)
+          // split() yields a leading empty string when the line starts with a delimiter
+          .filter(t -> t.length() > 0)
+          .forEach(token -> out.collect(new Tuple2<>(token, 1)));
+    }
+}
\ No newline at end of file
diff --git a/libraries/src/main/java/com/baeldung/flink/WordCount.java b/libraries/src/main/java/com/baeldung/flink/WordCount.java
new file mode 100644
index 0000000000..ab109bdbce
--- /dev/null
+++ b/libraries/src/main/java/com/baeldung/flink/WordCount.java
@@ -0,0 +1,32 @@
+package com.baeldung.flink;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.List;
+
+/**
+ * Builds a Flink batch job that counts how often each word occurs in a collection of lines.
+ */
+public class WordCount {
+
+    /**
+     * Defines the word-count pipeline over {@code lines}; the returned data set is not executed here.
+     *
+     * @param env   the execution environment used to create the source data set
+     * @param lines the lines of text to count words in
+     * @return a data set of {@code (word, count)} tuples
+     * @throws Exception declared by the existing signature
+     */
+    public static DataSet<Tuple2<String, Integer>> startWordCount(ExecutionEnvironment env, List<String> lines) throws Exception {
+        DataSet<String> text = env.fromCollection(lines);
+
+        // group by the word (tuple field 0) and sum the ones emitted per occurrence (tuple field 1)
+        return text.flatMap(new LineSplitter())
+          .groupBy(0)
+          .aggregate(Aggregations.SUM, 1);
+
+    }
+}
\ No newline at end of file
diff --git a/libraries/src/test/java/com/baeldung/flink/WordCountIntegrationTest.java b/libraries/src/test/java/com/baeldung/flink/WordCountIntegrationTest.java
new file mode 100644
index 0000000000..91a75c78ba
--- /dev/null
+++ b/libraries/src/test/java/com/baeldung/flink/WordCountIntegrationTest.java
@@ -0,0 +1,208 @@
+package com.baeldung.flink;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.junit.Test;
+
+import java.time.ZonedDateTime;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Exercises the Flink batch (DataSet) and streaming (DataStream) examples end to end.
+ */
+public class WordCountIntegrationTest {
+    private final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+    @Test
+    public void givenDataSet_whenExecuteWordCount_thenReturnWordCount() throws Exception {
+        //given
+        List<String> lines = Arrays.asList("This is a first sentence", "This is a second sentence with a one word");
+
+        //when
+        DataSet<Tuple2<String, Integer>> result = WordCount.startWordCount(env, lines);
+
+        //then
+        List<Tuple2<String, Integer>> collect = result.collect();
+        assertThat(collect.size()).isEqualTo(9);
+        assertThat(collect.contains(new Tuple2<>("a", 3))).isTrue();
+        assertThat(collect.contains(new Tuple2<>("sentence", 2))).isTrue();
+        assertThat(collect.contains(new Tuple2<>("word", 1))).isTrue();
+        assertThat(collect.contains(new Tuple2<>("is", 2))).isTrue();
+        assertThat(collect.contains(new Tuple2<>("this", 2))).isTrue();
+        assertThat(collect.contains(new Tuple2<>("second", 1))).isTrue();
+        assertThat(collect.contains(new Tuple2<>("first", 1))).isTrue();
+    }
+
+    @Test
+    public void givenListOfAmounts_whenUseMapReduce_thenSumAmountsThatAreOnlyAboveThreshold() throws Exception {
+        //given
+        DataSet<Integer> amounts = env.fromElements(1, 29, 40, 50);
+        int threshold = 30;
+
+        //when
+        List<Integer> collect = amounts
+          .filter(a -> a > threshold)
+          .reduce((ReduceFunction<Integer>) (integer, t1) -> integer + t1)
+          .collect();
+
+        //then
+        assertThat(collect.get(0)).isEqualTo(90);
+    }
+
+    @Test
+    public void givenDataSetOfComplexObjects_whenMapToGetOneField_thenReturnedListHaveProperElements() throws Exception {
+        //given
+        DataSet<Person> personDataSource = env.fromCollection(Arrays.asList(new Person(23, "Tom"), new Person(75, "Michael")));
+
+        //when
+        List<Integer> ages = personDataSource.map(p -> p.age).collect();
+
+        //then
+        assertThat(ages.size()).isEqualTo(2);
+        assertThat(ages.containsAll(Arrays.asList(23, 75))).isTrue();
+
+    }
+
+    @Test
+    public void givenDataSet_whenSortItByOneField_thenShouldReturnSortedDataSet() throws Exception {
+        //given
+        Tuple2<Integer, String> secondPerson = new Tuple2<>(4, "Tom");
+        Tuple2<Integer, String> thirdPerson = new Tuple2<>(5, "Scott");
+        Tuple2<Integer, String> fourthPerson = new Tuple2<>(200, "Michael");
+        Tuple2<Integer, String> firstPerson = new Tuple2<>(1, "Jack");
+        DataSet<Tuple2<Integer, String>> transactions = env.fromElements(fourthPerson, secondPerson,
+          thirdPerson, firstPerson);
+
+
+        //when
+        // sortPartition only orders elements within each partition, so pin the operator to a single
+        // partition to make the collected list globally ordered regardless of the default parallelism
+        List<Tuple2<Integer, String>> sorted = transactions
+          .sortPartition(new IdKeySelectorTransaction(), Order.ASCENDING)
+          .setParallelism(1)
+          .collect();
+
+        //then
+        assertThat(sorted.size()).isEqualTo(4);
+        assertThat(sorted.get(0)).isEqualTo(firstPerson);
+        assertThat(sorted.get(1)).isEqualTo(secondPerson);
+        assertThat(sorted.get(2)).isEqualTo(thirdPerson);
+        assertThat(sorted.get(3)).isEqualTo(fourthPerson);
+
+    }
+
+
+    @Test
+    public void giveTwoDataSets_whenJoinUsingId_thenProduceJoinedData() throws Exception {
+        //given
+        Tuple3<Integer, String, String> address = new Tuple3<>(1, "5th Avenue", "London");
+        DataSet<Tuple3<Integer, String, String>> addresses = env.fromElements(address);
+
+        Tuple2<Integer, String> firstTransaction = new Tuple2<>(1, "Transaction_1");
+        DataSet<Tuple2<Integer, String>> transactions =
+          env.fromElements(firstTransaction, new Tuple2<>(12, "Transaction_2"));
+
+
+        //when
+        List<Tuple2<Tuple2<Integer, String>, Tuple3<Integer, String, String>>> joined =
+          transactions.join(addresses)
+            .where(new IdKeySelectorTransaction())
+            .equalTo(new IdKeySelectorAddress())
+            .collect();
+
+        //then
+        assertThat(joined.size()).isEqualTo(1);
+        // assertThat(boolean) alone verifies nothing; isTrue() makes the check effective
+        assertThat(joined.contains(new Tuple2<>(firstTransaction, address))).isTrue();
+
+    }
+
+    @Test
+    public void givenStreamOfEvents_whenProcessEvents_thenShouldPrintResultsOnSinkOperation() throws Exception {
+        //given
+        final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        DataStream<String> text
+          = streamEnv.fromElements("This is a first sentence", "This is a second sentence with a one word");
+
+
+        SingleOutputStreamOperator<String> upperCase = text.map(String::toUpperCase);
+
+        upperCase.print();
+
+        //when
+        // there is no assertion: the test passes as long as the job executes without throwing
+        streamEnv.execute();
+    }
+
+
+    @Test
+    public void givenStreamOfEvents_whenProcessEvents_thenShouldApplyWindowingOnTransformation() throws Exception {
+        //given
+        final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        SingleOutputStreamOperator<Tuple2<Integer, Long>> windowed = streamEnv.fromElements(
+          new Tuple2<>(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()),
+          new Tuple2<>(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond())
+        ).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Integer, Long>>(Time.seconds(20)) {
+            @Override
+            public long extractTimestamp(Tuple2<Integer, Long> element) {
+                // f1 holds epoch seconds; convert to the milliseconds Flink uses for event time
+                return element.f1 * 1000;
+            }
+        });
+
+        SingleOutputStreamOperator<Tuple2<Integer, Long>> reduced = windowed
+          .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
+          .maxBy(0, true);
+
+        reduced.print();
+
+        //when
+        // there is no assertion: the test passes as long as the job executes without throwing
+        streamEnv.execute();
+    }
+
+
+    /** Uses the first field of a two-field tuple as the key. */
+    private static class IdKeySelectorTransaction implements KeySelector<Tuple2<Integer, String>, Integer> {
+        @Override
+        public Integer getKey(Tuple2<Integer, String> value) {
+            return value.f0;
+        }
+    }
+
+    /** Uses the first field of a three-field tuple as the key. */
+    private static class IdKeySelectorAddress implements KeySelector<Tuple3<Integer, String, String>, Integer> {
+        @Override
+        public Integer getKey(Tuple3<Integer, String, String> value) {
+            return value.f0;
+        }
+    }
+
+    /** Minimal non-tuple element type used by the map example. */
+    private static class Person {
+        private final int age;
+        private final String name;
+
+        private Person(int age, String name) {
+            this.age = age;
+            this.name = name;
+        }
+    }
+
+}
\ No newline at end of file