From 46765cf410196a629a54ee83befc741525b6cfdc Mon Sep 17 00:00:00 2001 From: pazis Date: Wed, 19 Feb 2020 21:28:48 +0000 Subject: [PATCH 1/6] init --- apache-beam/.gitignore | 1 + apache-beam/pom.xml | 44 ++++++++++++ .../com/baeldung/apache/beam/WordCount.java | 71 +++++++++++++++++++ .../apache/beam/WordCountUnitTest.java | 17 +++++ apache-beam/src/test/resources/wordcount.txt | 16 +++++ 5 files changed, 149 insertions(+) create mode 100644 apache-beam/.gitignore create mode 100644 apache-beam/pom.xml create mode 100644 apache-beam/src/main/java/com/baeldung/apache/beam/WordCount.java create mode 100644 apache-beam/src/test/java/com/baeldung/apache/beam/WordCountUnitTest.java create mode 100644 apache-beam/src/test/resources/wordcount.txt diff --git a/apache-beam/.gitignore b/apache-beam/.gitignore new file mode 100644 index 0000000000..84c048a73c --- /dev/null +++ b/apache-beam/.gitignore @@ -0,0 +1 @@ +/build/ diff --git a/apache-beam/pom.xml b/apache-beam/pom.xml new file mode 100644 index 0000000000..1076d06e2a --- /dev/null +++ b/apache-beam/pom.xml @@ -0,0 +1,44 @@ + + 4.0.0 + + + com.baeldung + parent-modules + 1.0.0-SNAPSHOT + + + com.baeldung.apache + apache-beam + 0.0.1-SNAPSHOT + apache-beam + + + + org.apache.beam + beam-sdks-java-core + ${beam.version} + + + + org.apache.beam + beam-runners-direct-java + ${beam.version} + runtime + + + + org.assertj + assertj-core + ${assertj.version} + test + + + + + 2.19.0 + 3.6.1 + + + \ No newline at end of file diff --git a/apache-beam/src/main/java/com/baeldung/apache/beam/WordCount.java b/apache-beam/src/main/java/com/baeldung/apache/beam/WordCount.java new file mode 100644 index 0000000000..632cc97e6c --- /dev/null +++ b/apache-beam/src/main/java/com/baeldung/apache/beam/WordCount.java @@ -0,0 +1,71 @@ +package com.baeldung.apache.beam; + +import java.util.Arrays; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Filter; +import org.apache.beam.sdk.transforms.FlatMapElements; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptors; + +public class WordCount { + + public static boolean wordCount(String inputFilePath, String outputFilePath) { + // We use default options + PipelineOptions options = PipelineOptionsFactory.create(); + // to create the pipeline + Pipeline p = Pipeline.create(options); + // Here is our workflow graph + PCollection> wordCount = p + .apply("(1) Read all lines", TextIO.read().from(inputFilePath)) + .apply("(2) Flatmap to a list of words", FlatMapElements.into(TypeDescriptors.strings()) + .via(line -> Arrays.asList(line.split("\\s")))) + .apply("(3) Lowercase all", MapElements.into(TypeDescriptors.strings()) + .via(word -> word.toLowerCase())) + .apply("(4) Trim punctuations", MapElements.into(TypeDescriptors.strings()) + .via(word -> trim(word))) + .apply("(5) Filter stopwords", Filter.by(word -> !isStopWord(word))) + .apply("(6) Count words", Count.perElement()); + // We convert the PCollection to String so that we can write it to file + wordCount.apply(MapElements.into(TypeDescriptors.strings()) + .via(count -> count.getKey() + " --> " + count.getValue())) + .apply(TextIO.write().to(outputFilePath)); + // Finally we must run the pipeline, otherwise it's only a definition + p.run().waitUntilFinish(); + return true; + } + + public static boolean isStopWord(String word) { + String[] stopwords = {"am", "are", "is", "i", "you", "me", + "he", "she", "they", "them", "was", + "were", "from", "in", "of", "to", "be", + "him", "her", "us", "and", "or"}; + for (String stopword : stopwords) { + if (stopword.compareTo(word) == 0) { + return true; + } + } + return false; + } + + public static String trim(String word) { + return word.replace("(","") + .replace(")", "") + .replace(",", "") + .replace(".", "") + .replace("\"", "") + .replace("'", "") + .replace(":", "") + .replace(";", "") + .replace("-", "") + .replace("?", "") + .replace("!", ""); + } + +} diff --git a/apache-beam/src/test/java/com/baeldung/apache/beam/WordCountUnitTest.java b/apache-beam/src/test/java/com/baeldung/apache/beam/WordCountUnitTest.java new file mode 100644 index 0000000000..ff338a292c --- /dev/null +++ b/apache-beam/src/test/java/com/baeldung/apache/beam/WordCountUnitTest.java @@ -0,0 +1,17 @@ +package com.baeldung.apache.beam; + +import static org.junit.Assert.assertTrue; + +import org.junit.Ignore; +import org.junit.Test; + +public class WordCountUnitTest { + + @Test + @Ignore + public void givenInputFile_whenWordCountRuns_thenJobFinishWithoutError() { + boolean jobDone = WordCount.wordCount("src/test/resources/wordcount.txt", "target/output"); + assertTrue(jobDone); + } + +} diff --git a/apache-beam/src/test/resources/wordcount.txt b/apache-beam/src/test/resources/wordcount.txt new file mode 100644 index 0000000000..542385379b --- /dev/null +++ b/apache-beam/src/test/resources/wordcount.txt @@ -0,0 +1,16 @@ +We've all heard the scare stories about North Korea: the homemade nuclear arsenal built while their people starve and then aimed imprecisely at the rest of the world, a +leader so deluded he makes L Ron Hubbard look like a man excessively overburdened with self-doubt and their deep-seated belief that foreign capitalists will invade at any +moment and steal all their bauxite. +The popular portrayal of this Marxist nation is something like one of the more harrowing episodes of M*A*S*H, only with the cast of wacky characters replaced by twitchy, +heavily armed Stalinist meth addicts +Cracked would like to take a moment to celebrate the good things about North Korea though, the things that the country's enemies prefer to suppress as part of their politically +motivated jealousy. Like how no different to you and me, there's nothing every North Korean likes more after an 18 hour shift at the phosphorus plant than a nice beer to go with +his dried fish ration. Ever attentive to its people's needs and in the twinkling of a decade, North Korea's leadership bought, disassembled, transported and rebuilt a British +brewery in order to discover and reproduce the secrets of beer and then brew the sweet nectar for its hardworking people, up to 18 bottles at a time. And with minimal fatalities. +When was the last time YOUR leader got a beer for YOU, American? (NB do not answer this question if you are Henry Louis Gates). +Or how about the fried chicken restaurant that downtown Pyongyang boasts? Yes real chicken, fried and then delivered to your sleeping cube, with optional beer if you like! You +don't even have to remove the feathers or pull out the gizzard yourself. Mostly. Americans must eat their fried chicken from a bucket, like swine, sold by a company so secretive +that even the very blend of seasoning used is intentionally kept from them. And they call North Korea paranoid? +And how many nations would entertain the syphilitic, bourgeois ramblings of Bill Clinton let alone permit him anywhere near their proud womenfolk? Only wise Kim Jong Il could see +past Bill's many, many imperfections and treat him with the pity and kindness he deserves, accepting his feeble pleas to pardon the American spies rightly convicted of photographing +the nation's sensitive beetroot fields. From 43f74f9c47bdbf2e524596c42d0cdd159e2b3cc1 Mon Sep 17 00:00:00 2001 From: pazis Date: Wed, 26 Feb 2020 09:20:31 +0000 Subject: [PATCH 2/6] new package name --- apache-beam/pom.xml | 74 +++++++++---------- .../apache/beam/{ => intro}/WordCount.java | 2 +- .../beam/{ => intro}/WordCountUnitTest.java | 6 +- 3 files changed, 42 insertions(+), 40 deletions(-) rename apache-beam/src/main/java/com/baeldung/apache/beam/{ => intro}/WordCount.java (98%) rename apache-beam/src/test/java/com/baeldung/apache/beam/{ => intro}/WordCountUnitTest.java (77%) diff --git a/apache-beam/pom.xml b/apache-beam/pom.xml index 1076d06e2a..355e5b32e6 100644 --- a/apache-beam/pom.xml +++ b/apache-beam/pom.xml @@ -1,44 +1,44 @@ - 4.0.0 + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + 4.0.0 - - com.baeldung - parent-modules - 1.0.0-SNAPSHOT - + + com.baeldung + parent-modules + 1.0.0-SNAPSHOT + - com.baeldung.apache - apache-beam - 0.0.1-SNAPSHOT - apache-beam + com.baeldung.apache + apache-beam + 0.0.1-SNAPSHOT + apache-beam - - - org.apache.beam - beam-sdks-java-core - ${beam.version} - - - - org.apache.beam - beam-runners-direct-java - ${beam.version} - runtime - - - - org.assertj - assertj-core - ${assertj.version} - test - - + + + org.apache.beam + beam-sdks-java-core + ${beam.version} + + + + org.apache.beam + beam-runners-direct-java + ${beam.version} + runtime + + + + org.assertj + assertj-core + ${assertj.version} + test + + - - 2.19.0 - 3.6.1 - + + 2.19.0 + 3.6.1 + \ No newline at end of file diff --git a/apache-beam/src/main/java/com/baeldung/apache/beam/WordCount.java b/apache-beam/src/main/java/com/baeldung/apache/beam/intro/WordCount.java similarity index 98% rename from apache-beam/src/main/java/com/baeldung/apache/beam/WordCount.java rename to apache-beam/src/main/java/com/baeldung/apache/beam/intro/WordCount.java index 632cc97e6c..f2dfb47810 100644 --- a/apache-beam/src/main/java/com/baeldung/apache/beam/WordCount.java +++ b/apache-beam/src/main/java/com/baeldung/apache/beam/intro/WordCount.java @@ -1,4 +1,4 @@ -package com.baeldung.apache.beam; +package com.baeldung.apache.beam.intro; import java.util.Arrays; diff --git a/apache-beam/src/test/java/com/baeldung/apache/beam/WordCountUnitTest.java b/apache-beam/src/test/java/com/baeldung/apache/beam/intro/WordCountUnitTest.java similarity index 77% rename from apache-beam/src/test/java/com/baeldung/apache/beam/WordCountUnitTest.java rename to apache-beam/src/test/java/com/baeldung/apache/beam/intro/WordCountUnitTest.java index ff338a292c..f2558635dc 100644 --- a/apache-beam/src/test/java/com/baeldung/apache/beam/WordCountUnitTest.java +++ b/apache-beam/src/test/java/com/baeldung/apache/beam/intro/WordCountUnitTest.java @@ -1,14 +1,16 @@ -package com.baeldung.apache.beam; +package com.baeldung.apache.beam.intro; import static org.junit.Assert.assertTrue; import org.junit.Ignore; import org.junit.Test; +import com.baeldung.apache.beam.intro.WordCount; + public class WordCountUnitTest { @Test - @Ignore + // @Ignore public void givenInputFile_whenWordCountRuns_thenJobFinishWithoutError() { boolean jobDone = WordCount.wordCount("src/test/resources/wordcount.txt", "target/output"); assertTrue(jobDone); From e57dbf3c72c264f1622e6c1d3d8e672e47ccb160 Mon Sep 17 00:00:00 2001 From: Pazis Date: Wed, 26 Feb 2020 12:51:26 +0330 Subject: [PATCH 3/6] delete .gitignore --- apache-beam/.gitignore | 1 - 1 file changed, 1 deletion(-) delete mode 100644 apache-beam/.gitignore diff --git a/apache-beam/.gitignore b/apache-beam/.gitignore deleted file mode 100644 index 84c048a73c..0000000000 --- a/apache-beam/.gitignore +++ /dev/null @@ -1 +0,0 @@ -/build/ From 862e7a3f20d3e8e599b805413fa944d5d48f50b7 Mon Sep 17 00:00:00 2001 From: Pazis Date: Wed, 26 Feb 2020 12:53:01 +0330 Subject: [PATCH 4/6] update main pom.xml added apache-beam module --- pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/pom.xml b/pom.xml index ca5331b287..63f7b1dcc4 100644 --- a/pom.xml +++ b/pom.xml @@ -350,6 +350,7 @@ antlr apache-avro + apache-beam apache-bval apache-curator apache-cxf From e4f3ba564899b207b5a3acb15c4a66f3bbb099ae Mon Sep 17 00:00:00 2001 From: Pazis Date: Wed, 26 Feb 2020 20:03:23 +0330 Subject: [PATCH 5/6] update main pom.xml again --- pom.xml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index 63f7b1dcc4..bf87ac0cbf 100644 --- a/pom.xml +++ b/pom.xml @@ -350,7 +350,6 @@ antlr apache-avro - apache-beam apache-bval apache-curator apache-cxf @@ -640,7 +639,6 @@ spring-batch spring-bom spring-boot-modules - spring-boot-parent spring-boot-rest spring-caching @@ -862,6 +860,7 @@ antlr apache-avro + apache-beam apache-bval apache-curator apache-cxf @@ -1142,7 +1141,6 @@ spring-batch spring-bom spring-boot-modules - spring-boot-parent spring-boot-rest spring-caching From 17852cfe6d0523c33bd448c6d73a38b34bfba707 Mon Sep 17 00:00:00 2001 From: Pazis Date: Thu, 5 Mar 2020 06:42:17 +0330 Subject: [PATCH 6/6] Update pom.xml --- apache-beam/pom.xml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/apache-beam/pom.xml b/apache-beam/pom.xml index 355e5b32e6..7a714ac480 100644 --- a/apache-beam/pom.xml +++ b/apache-beam/pom.xml @@ -12,7 +12,6 @@ com.baeldung.apache apache-beam 0.0.1-SNAPSHOT - apache-beam @@ -41,4 +40,4 @@ 3.6.1 - \ No newline at end of file +