Merge pull request #8757 from pazis/BAEL-3861

[BAEL-3861] Introduction to Apache Beam
This commit is contained in:
Josh Cummings 2020-03-06 08:24:44 -07:00 committed by GitHub
commit 99fa431fcc
5 changed files with 150 additions and 0 deletions

43
apache-beam/pom.xml Normal file
View File

@ -0,0 +1,43 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-modules</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<groupId>com.baeldung.apache</groupId>
<artifactId>apache-beam</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>
<!-- runtime scoped -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
<!-- test scoped -->
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<properties>
<beam.version>2.19.0</beam.version>
<assertj.version>3.6.1</assertj.version>
</properties>
</project>

View File

@ -0,0 +1,71 @@
package com.baeldung.apache.beam.intro;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
public class WordCount {
public static boolean wordCount(String inputFilePath, String outputFilePath) {
// We use default options
PipelineOptions options = PipelineOptionsFactory.create();
// to create the pipeline
Pipeline p = Pipeline.create(options);
// Here is our workflow graph
PCollection<KV<String, Long>> wordCount = p
.apply("(1) Read all lines", TextIO.read().from(inputFilePath))
.apply("(2) Flatmap to a list of words", FlatMapElements.into(TypeDescriptors.strings())
.via(line -> Arrays.asList(line.split("\\s"))))
.apply("(3) Lowercase all", MapElements.into(TypeDescriptors.strings())
.via(word -> word.toLowerCase()))
.apply("(4) Trim punctuations", MapElements.into(TypeDescriptors.strings())
.via(word -> trim(word)))
.apply("(5) Filter stopwords", Filter.by(word -> !isStopWord(word)))
.apply("(6) Count words", Count.perElement());
// We convert the PCollection to String so that we can write it to file
wordCount.apply(MapElements.into(TypeDescriptors.strings())
.via(count -> count.getKey() + " --> " + count.getValue()))
.apply(TextIO.write().to(outputFilePath));
// Finally we must run the pipeline, otherwise it's only a definition
p.run().waitUntilFinish();
return true;
}
public static boolean isStopWord(String word) {
String[] stopwords = {"am", "are", "is", "i", "you", "me",
"he", "she", "they", "them", "was",
"were", "from", "in", "of", "to", "be",
"him", "her", "us", "and", "or"};
for (String stopword : stopwords) {
if (stopword.compareTo(word) == 0) {
return true;
}
}
return false;
}
public static String trim(String word) {
return word.replace("(","")
.replace(")", "")
.replace(",", "")
.replace(".", "")
.replace("\"", "")
.replace("'", "")
.replace(":", "")
.replace(";", "")
.replace("-", "")
.replace("?", "")
.replace("!", "");
}
}

View File

@ -0,0 +1,19 @@
package com.baeldung.apache.beam.intro;
import static org.junit.Assert.assertTrue;
import org.junit.Ignore;
import org.junit.Test;
import com.baeldung.apache.beam.intro.WordCount;
public class WordCountUnitTest {
@Test
// @Ignore
public void givenInputFile_whenWordCountRuns_thenJobFinishWithoutError() {
boolean jobDone = WordCount.wordCount("src/test/resources/wordcount.txt", "target/output");
assertTrue(jobDone);
}
}

View File

@ -0,0 +1,16 @@
We've all heard the scare stories about North Korea: the homemade nuclear arsenal built while their people starve and then aimed imprecisely at the rest of the world, a
leader so deluded he makes L Ron Hubbard look like a man excessively overburdened with self-doubt and their deep-seated belief that foreign capitalists will invade at any
moment and steal all their bauxite.
The popular portrayal of this Marxist nation is something like one of the more harrowing episodes of M*A*S*H, only with the cast of wacky characters replaced by twitchy,
heavily armed Stalinist meth addicts
Cracked would like to take a moment to celebrate the good things about North Korea though, the things that the country's enemies prefer to suppress as part of their politically
motivated jealousy. Like how no different to you and me, there's nothing every North Korean likes more after an 18 hour shift at the phosphorus plant than a nice beer to go with
his dried fish ration. Ever attentive to its people's needs and in the twinkling of a decade, North Korea's leadership bought, disassembled, transported and rebuilt a British
brewery in order to discover and reproduce the secrets of beer and then brew the sweet nectar for its hardworking people, up to 18 bottles at a time. And with minimal fatalities.
When was the last time YOUR leader got a beer for YOU, American? (NB do not answer this question if you are Henry Louis Gates).
Or how about the fried chicken restaurant that downtown Pyongyang boasts? Yes real chicken, fried and then delivered to your sleeping cube, with optional beer if you like! You
don't even have to remove the feathers or pull out the gizzard yourself. Mostly. Americans must eat their fried chicken from a bucket, like swine, sold by a company so secretive
that even the very blend of seasoning used is intentionally kept from them. And they call North Korea paranoid?
And how many nations would entertain the syphilitic, bourgeois ramblings of Bill Clinton let alone permit him anywhere near their proud womenfolk? Only wise Kim Jong Il could see
past Bill's many, many imperfections and treat him with the pity and kindness he deserves, accepting his feeble pleas to pardon the American spies rightly convicted of photographing
the nation's sensitive beetroot fields.

View File

@ -862,6 +862,7 @@
<module>antlr</module>
<module>apache-avro</module>
<module>apache-beam</module>
<module>apache-bval</module>
<module>apache-curator</module>
<module>apache-cxf</module>