diff --git a/libraries/pom.xml b/libraries/pom.xml
index 0f33c42dc4..a200fe8350 100644
--- a/libraries/pom.xml
+++ b/libraries/pom.xml
@@ -20,6 +20,31 @@
1.8
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+
+
+ org.apache.felix
+ maven-bundle-plugin
+ 3.3.0
+ maven-plugin
+
+
+
+ true
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ ${maven-surefire-plugin.version}
+
+
+ **/*IntegrationTest.java
+ **/*LiveTest.java
+
+
+
@@ -92,6 +117,21 @@
commons-io
${commons.io.version}
+
+ org.apache.flink
+ flink-core
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-java
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-test-utils_2.10
+ ${flink.version}
+
@@ -107,6 +147,8 @@
9.4.2.v20170220
4.5.3
2.5
+ 1.2.0
+ 2.19.1
\ No newline at end of file
diff --git a/libraries/src/main/java/com/baeldung/flink/LineSplitter.java b/libraries/src/main/java/com/baeldung/flink/LineSplitter.java
new file mode 100644
index 0000000000..8deeeb01c4
--- /dev/null
+++ b/libraries/src/main/java/com/baeldung/flink/LineSplitter.java
@@ -0,0 +1,20 @@
+package com.baeldung.flink;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+import java.util.stream.Stream;
+
+@SuppressWarnings("serial")
+public class LineSplitter implements FlatMapFunction> {
+
+ @Override
+ public void flatMap(String value, Collector> out) {
+
+ String[] tokens = value.toLowerCase().split("\\W+");
+ Stream.of(tokens)
+ .filter(t -> t.length() > 0)
+ .forEach(token -> out.collect(new Tuple2<>(token, 1)));
+ }
+}
\ No newline at end of file
diff --git a/libraries/src/main/java/com/baeldung/flink/WordCount.java b/libraries/src/main/java/com/baeldung/flink/WordCount.java
new file mode 100644
index 0000000000..ab109bdbce
--- /dev/null
+++ b/libraries/src/main/java/com/baeldung/flink/WordCount.java
@@ -0,0 +1,20 @@
+package com.baeldung.flink;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.List;
+
+public class WordCount {
+
+ public static DataSet> startWordCount(ExecutionEnvironment env, List lines) throws Exception {
+ DataSet text = env.fromCollection(lines);
+
+ return text.flatMap(new LineSplitter())
+ .groupBy(0)
+ .aggregate(Aggregations.SUM, 1);
+
+ }
+}
\ No newline at end of file
diff --git a/libraries/src/test/java/com/baeldung/flink/WordCountIntegrationTest.java b/libraries/src/test/java/com/baeldung/flink/WordCountIntegrationTest.java
new file mode 100644
index 0000000000..91a75c78ba
--- /dev/null
+++ b/libraries/src/test/java/com/baeldung/flink/WordCountIntegrationTest.java
@@ -0,0 +1,196 @@
+package com.baeldung.flink;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.junit.Test;
+
+import java.time.ZonedDateTime;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+public class WordCountIntegrationTest {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ @Test
+ public void givenDataSet_whenExecuteWordCount_thenReturnWordCount() throws Exception {
+ //given
+ List lines = Arrays.asList("This is a first sentence", "This is a second sentence with a one word");
+
+ //when
+ DataSet> result = WordCount.startWordCount(env, lines);
+
+ //then
+ List> collect = result.collect();
+ assertThat(collect.size()).isEqualTo(9);
+ assertThat(collect.contains(new Tuple2<>("a", 3))).isTrue();
+ assertThat(collect.contains(new Tuple2<>("sentence", 2))).isTrue();
+ assertThat(collect.contains(new Tuple2<>("word", 1))).isTrue();
+ assertThat(collect.contains(new Tuple2<>("is", 2))).isTrue();
+ assertThat(collect.contains(new Tuple2<>("this", 2))).isTrue();
+ assertThat(collect.contains(new Tuple2<>("second", 1))).isTrue();
+ assertThat(collect.contains(new Tuple2<>("first", 1))).isTrue();
+ }
+
+ @Test
+ public void givenListOfAmounts_whenUseMapReduce_thenSumAmountsThatAreOnlyAboveThreshold() throws Exception {
+ //given
+ DataSet amounts = env.fromElements(1, 29, 40, 50);
+ int threshold = 30;
+
+ //when
+ List collect = amounts
+ .filter(a -> a > threshold)
+ .reduce((ReduceFunction) (integer, t1) -> integer + t1)
+ .collect();
+
+ //then
+ assertThat(collect.get(0)).isEqualTo(90);
+ }
+
+ @Test
+ public void givenDataSetOfComplexObjects_whenMapToGetOneField_thenReturnedListHaveProperElements() throws Exception {
+ //given
+ DataSet personDataSource = env.fromCollection(Arrays.asList(new Person(23, "Tom"), new Person(75, "Michael")));
+
+ //when
+ List ages = personDataSource.map(p -> p.age).collect();
+
+ //then
+ assertThat(ages.size()).isEqualTo(2);
+ assertThat(ages.containsAll(Arrays.asList(23, 75))).isTrue();
+
+ }
+
+ @Test
+ public void givenDataSet_whenSortItByOneField_thenShouldReturnSortedDataSet() throws Exception {
+ //given
+ Tuple2 secondPerson = new Tuple2<>(4, "Tom");
+ Tuple2 thirdPerson = new Tuple2<>(5, "Scott");
+ Tuple2 fourthPerson = new Tuple2<>(200, "Michael");
+ Tuple2 firstPerson = new Tuple2<>(1, "Jack");
+ DataSet> transactions = env.fromElements(fourthPerson, secondPerson,
+ thirdPerson, firstPerson);
+
+
+ //when
+ List> sorted = transactions
+ .sortPartition(new IdKeySelectorTransaction(), Order.ASCENDING)
+ .collect();
+
+ //then
+ assertThat(sorted.size()).isEqualTo(4);
+ assertThat(sorted.get(0)).isEqualTo(firstPerson);
+ assertThat(sorted.get(1)).isEqualTo(secondPerson);
+ assertThat(sorted.get(2)).isEqualTo(thirdPerson);
+ assertThat(sorted.get(3)).isEqualTo(fourthPerson);
+
+ }
+
+
+ @Test
+ public void giveTwoDataSets_whenJoinUsingId_thenProduceJoinedData() throws Exception {
+ //given
+ Tuple3 address = new Tuple3<>(1, "5th Avenue", "London");
+ DataSet> addresses = env.fromElements(address);
+
+ Tuple2 firstTransaction = new Tuple2<>(1, "Transaction_1");
+ DataSet> transactions =
+ env.fromElements(firstTransaction, new Tuple2<>(12, "Transaction_2"));
+
+
+ //when
+ List, Tuple3>> joined =
+ transactions.join(addresses)
+ .where(new IdKeySelectorTransaction())
+ .equalTo(new IdKeySelectorAddress())
+ .collect();
+
+ //then
+ assertThat(joined.size()).isEqualTo(1);
+ assertThat(joined.contains(new Tuple2<>(firstTransaction, address)));
+
+ }
+
+ @Test
+ public void givenStreamOfEvents_whenProcessEvents_thenShouldPrintResultsOnSinkOperation() throws Exception {
+ //given
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream text
+ = env.fromElements("This is a first sentence", "This is a second sentence with a one word");
+
+
+ SingleOutputStreamOperator upperCase = text.map(String::toUpperCase);
+
+ upperCase.print();
+
+ //when
+ env.execute();
+ }
+
+
+ @Test
+ public void givenStreamOfEvents_whenProcessEvents_thenShouldApplyWindowingOnTransformation() throws Exception {
+ //given
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ SingleOutputStreamOperator> windowed = env.fromElements(
+ new Tuple2<>(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()),
+ new Tuple2<>(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond())
+ ).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor>(Time.seconds(20)) {
+ @Override
+ public long extractTimestamp(Tuple2 element) {
+ return element.f1 * 1000;
+ }
+ });
+
+ SingleOutputStreamOperator> reduced = windowed
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
+ .maxBy(0, true);
+
+ reduced.print();
+
+ //when
+ env.execute();
+ }
+
+
+ private static class IdKeySelectorTransaction implements KeySelector, Integer> {
+ @Override
+ public Integer getKey(Tuple2 value) {
+ return value.f0;
+ }
+ }
+
+ private static class IdKeySelectorAddress implements KeySelector, Integer> {
+ @Override
+ public Integer getKey(Tuple3 value) {
+ return value.f0;
+ }
+ }
+
+ private static class Person {
+ private final int age;
+ private final String name;
+
+ private Person(int age, String name) {
+ this.age = age;
+ this.name = name;
+ }
+ }
+
+}
\ No newline at end of file