BAEL-1829 - Overview of Apache Crunch

This commit is contained in:
chandra 2018-08-26 22:07:55 -04:00
parent 858ce0ec47
commit 813bc3249f
14 changed files with 466 additions and 1 deletions

View File

@ -10,3 +10,4 @@
- [A Guide to Apache Ignite](http://www.baeldung.com/apache-ignite)
- [Apache Ignite with Spring Data](http://www.baeldung.com/apache-ignite-spring-data)
- [Guide to JMapper](https://github.com/eugenp/tutorials/tree/master/libraries-data)
- [A Guide to Apache Crunch](https://github.com/eugenp/tutorials/tree/master/libraries-data)

View File

@ -147,6 +147,44 @@
<artifactId>jmapper-core</artifactId>
<version>${jmapper.version}</version>
</dependency>
<!-- crunch project -->
<dependency>
<groupId>org.apache.crunch</groupId>
<artifactId>crunch-core</artifactId>
<version>${org.apache.crunch.crunch-core.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${org.apache.hadoop.hadoop-client}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.0.1</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
@ -252,6 +290,31 @@
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.3</version>
<configuration>
<descriptors>
<descriptor>src/main/assembly/hadoop-job.xml</descriptor>
</descriptors>
<archive>
<manifest>
<mainClass>com.baeldung.crunch.WordCount</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
@ -282,7 +345,9 @@
<datanucleus-maven-plugin.version>5.0.2</datanucleus-maven-plugin.version>
<datanucleus-xml.version>5.0.0-release</datanucleus-xml.version>
<datanucleus-jdo-query.version>5.0.4</datanucleus-jdo-query.version>
<jmapper.version>1.6.0.1</jmapper.version>
<jmapper.version>1.6.0.1</jmapper.version>
<org.apache.crunch.crunch-core.version>0.15.0</org.apache.crunch.crunch-core.version>
<org.apache.hadoop.hadoop-client>2.2.0</org.apache.hadoop.hadoop-client>
</properties>
</project>

View File

@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id>job</id>
<formats>
<format>jar</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<unpack>false</unpack>
<scope>runtime</scope>
<outputDirectory>lib</outputDirectory>
<excludes>
<exclude>${groupId}:${artifactId}</exclude>
</excludes>
</dependencySet>
<dependencySet>
<unpack>true</unpack>
<includes>
<include>${groupId}:${artifactId}</include>
</includes>
</dependencySet>
</dependencySets>
</assembly>

View File

@ -0,0 +1,25 @@
package com.baeldung.crunch;
import java.util.Set;
import org.apache.crunch.FilterFn;
import com.google.common.collect.ImmutableSet;
/**
* A filter that removes known stop words.
*/
public class StopWordFilter extends FilterFn<String> {
// English stop words, borrowed from Lucene.
private static final Set<String> STOP_WORDS = ImmutableSet
.copyOf(new String[] { "a", "and", "are", "as", "at", "be", "but", "by",
"for", "if", "in", "into", "is", "it", "no", "not", "of", "on",
"or", "s", "such", "t", "that", "the", "their", "then", "there",
"these", "they", "this", "to", "was", "will", "with" });
@Override
public boolean accept(String word) {
return !STOP_WORDS.contains(word);
}
}

View File

@ -0,0 +1,11 @@
package com.baeldung.crunch;
import org.apache.crunch.MapFn;
public class ToUpperCaseFn extends MapFn<String, String> {
@Override
public String map(String input) {
return input != null ? input.toUpperCase() : input;
}
}

View File

@ -0,0 +1,20 @@
package com.baeldung.crunch;
import org.apache.crunch.MapFn;
@SuppressWarnings("serial")
public class ToUpperCaseWithCounterFn extends MapFn<String, String> {
@Override
public String map(String input) {
if (input == null) {
return input;
} else {
String output = input.toUpperCase();
if (!input.equals(output)) {
increment("UpperCase", "modified");
}
return output;
}
}
}

View File

@ -0,0 +1,23 @@
package com.baeldung.crunch;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import com.google.common.base.Splitter;
/**
* Splits a line of text, filtering known stop words.
*/
public class Tokenizer extends DoFn<String, String> {
private static final Splitter SPLITTER = Splitter
.onPattern("\\s+")
.omitEmptyStrings();
@Override
public void process(String line,
Emitter<String> emitter) {
for (String word : SPLITTER.split(line)) {
emitter.emit(word);
}
}
}

View File

@ -0,0 +1,62 @@
package com.baeldung.crunch;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* A word count example for Apache Crunch, based on Crunch's example projects.
*/
public class WordCount extends Configured implements Tool {
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new WordCount(), args);
}
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: hadoop jar crunch-1.0.0-SNAPSHOT-job.jar" + " [generic options] input output");
System.err.println();
GenericOptionsParser.printGenericCommandUsage(System.err);
return 1;
}
String inputPath = args[0];
String outputPath = args[1];
// Create an object to coordinate pipeline creation and execution.
Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
// Reference a given text file as a collection of Strings.
PCollection<String> lines = pipeline.readTextFile(inputPath);
// Define a function that splits each line in a PCollection of Strings into
// a PCollection made up of the individual words in the file.
// The second argument sets the serialization format.
PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());
// Take the collection of words and remove known stop words.
PCollection<String> noStopWords = words.filter(new StopWordFilter());
// The count method applies a series of Crunch primitives and returns
// a map of the unique words in the input PCollection to their counts.
PTable<String, Long> counts = noStopWords.count();
// Instruct the pipeline to write the resulting counts to a text file.
pipeline.writeTextFile(counts, outputPath);
// Execute the pipeline as a MapReduce.
PipelineResult result = pipeline.done();
return result.succeeded() ? 0 : 1;
}
}

View File

@ -0,0 +1,89 @@
package com.baeldung.crunch;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Calendar;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.Source;
import org.apache.crunch.Target;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.io.To;
import org.junit.Ignore;
import org.junit.Test;
public class MemPipelineUnitTest {
private static final String INPUT_FILE_PATH = "src/test/resources/crunch/input.txt";
@Test
public void givenPipeLineAndSource_whenSourceRead_thenExpectedNumberOfRecordsRead() {
Pipeline pipeline = MemPipeline.getInstance();
Source<String> source = From.textFile(INPUT_FILE_PATH);
PCollection<String> lines = pipeline.read(source);
assertEquals(21, lines.asCollection()
.getValue()
.size());
}
@Test
public void givenPipeLine_whenTextFileRead_thenExpectedNumberOfRecordsRead() {
Pipeline pipeline = MemPipeline.getInstance();
PCollection<String> lines = pipeline.readTextFile(INPUT_FILE_PATH);
assertEquals(21, lines.asCollection()
.getValue()
.size());
}
private String createOutputPath() throws IOException {
Path path = Files.createTempDirectory("test");
final String outputFilePath = path.toString() + File.separatorChar
+ "output.text";
return outputFilePath;
}
@Test
@Ignore("Requires Hadoop binaries")
public void givenCollection_whenWriteCalled_fileWrittenSuccessfully()
throws IOException {
PCollection<String> inputStrings = MemPipeline.collectionOf("Hello",
"Apache", "Crunch", Calendar.getInstance()
.toString());
final String outputFilePath = createOutputPath();
Target target = To.textFile(outputFilePath);
inputStrings.write(target);
Pipeline pipeline = MemPipeline.getInstance();
PCollection<String> lines = pipeline.readTextFile(outputFilePath);
assertIterableEquals(inputStrings.materialize(), lines.materialize());
}
@Test
@Ignore("Requires Hadoop binaries")
public void givenPipeLine_whenWriteTextFileCalled_fileWrittenSuccessfully()
throws IOException {
Pipeline pipeline = MemPipeline.getInstance();
PCollection<String> inputStrings = MemPipeline.collectionOf("Hello",
"Apache", "Crunch", Calendar.getInstance()
.toString());
final String outputFilePath = createOutputPath();
pipeline.writeTextFile(inputStrings, outputFilePath);
PCollection<String> lines = pipeline.readTextFile(outputFilePath);
assertIterableEquals(inputStrings.materialize(), lines.materialize());
}
}

View File

@ -0,0 +1,41 @@
package com.baeldung.crunch;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.crunch.FilterFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.impl.mem.MemPipeline;
import org.junit.Test;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
public class StopWordFilterUnitTest {
@Test
public void givenFilter_whenStopWordPassed_thenFalseReturned() {
FilterFn<String> filter = new StopWordFilter();
assertFalse(filter.accept("the"));
}
@Test
public void givenFilter_whenNonStopWordPassed_thenTrueReturned() {
FilterFn<String> filter = new StopWordFilter();
assertTrue(filter.accept("Hello"));
}
@Test
public void givenWordCollection_whenFiltered_thenStopWordsRemoved() {
PCollection<String> words = MemPipeline.collectionOf("This", "is", "a",
"test", "sentence");
PCollection<String> noStopWords = words.filter(new StopWordFilter());
assertEquals(ImmutableList.of("This", "test", "sentence"),
Lists.newArrayList(noStopWords.materialize()));
}
}

View File

@ -0,0 +1,21 @@
package com.baeldung.crunch;
import static org.junit.Assert.assertEquals;
import org.apache.crunch.impl.mem.emit.InMemoryEmitter;
import org.junit.Test;
import com.google.common.collect.ImmutableList;
public class ToUpperCaseFnUnitTest {
@Test
public void givenString_whenToUpperCaseFnCalled_UpperCaseStringReturned() {
InMemoryEmitter<String> emitter = new InMemoryEmitter<String>();
new ToUpperCaseFn().process("input", emitter);
assertEquals(ImmutableList.of("INPUT"), emitter.getOutput());
}
}

View File

@ -0,0 +1,31 @@
package com.baeldung.crunch;
import static org.junit.Assert.assertEquals;
import org.apache.crunch.PCollection;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.types.writable.Writables;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
public class ToUpperCaseWithCounterFnUnitTest {
@Before
public void setUp() throws Exception {
MemPipeline.clearCounters();
}
@Test
public void whenFunctionCalled_counterIncementendForChangedValues() {
PCollection<String> inputStrings = MemPipeline.collectionOf("This", "is", "a", "TEST", "string");
PCollection<String> upperCaseStrings = inputStrings.parallelDo(new ToUpperCaseWithCounterFn(), Writables.strings());
assertEquals(ImmutableList.of("THIS", "IS", "A", "TEST", "STRING"), Lists.newArrayList(upperCaseStrings.materialize()));
assertEquals(4L, MemPipeline.getCounters()
.findCounter("UpperCase", "modified")
.getValue());
}
}

View File

@ -0,0 +1,27 @@
package com.baeldung.crunch;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import org.apache.crunch.Emitter;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class TokenizerUnitTest {
@Mock
private Emitter<String> emitter;
@Test
public void givenTokenizer_whenLineProcessed_thenOnlyExpectedWordsEmitted() {
Tokenizer splitter = new Tokenizer();
splitter.process(" hello world ", emitter);
verify(emitter).emit("hello");
verify(emitter).emit("world");
verifyNoMoreInteractions(emitter);
}
}

View File

@ -0,0 +1,21 @@
An an valley indeed so no wonder future nature vanity. Debating all she mistaken indulged believed provided declared. He many kept on draw lain song as same. Whether at dearest certain spirits is entered in to. Rich fine bred real use too many good. She compliment unaffected expression favourable any. Unknown chiefly showing to conduct no. Hung as love evil able to post at as.
Is he staying arrival address earnest. To preference considered it themselves inquietude collecting estimating. View park for why gay knew face. Next than near to four so hand. Times so do he downs me would. Witty abode party her found quiet law. They door four bed fail now have.
Tolerably earnestly middleton extremely distrusts she boy now not. Add and offered prepare how cordial two promise. Greatly who affixed suppose but enquire compact prepare all put. Added forth chief trees but rooms think may. Wicket do manner others seemed enable rather in. Excellent own discovery unfeeling sweetness questions the gentleman. Chapter shyness matters mr parlors if mention thought.
Extended kindness trifling remember he confined outlived if. Assistance sentiments yet unpleasing say. Open they an busy they my such high. An active dinner wishes at unable hardly no talked on. Immediate him her resolving his favourite. Wished denote abroad at branch at.
Village did removed enjoyed explain nor ham saw calling talking. Securing as informed declared or margaret. Joy horrible moreover man feelings own shy. Request norland neither mistake for yet. Between the for morning assured country believe. On even feet time have an no at. Relation so in confined smallest children unpacked delicate. Why sir end believe uncivil respect. Always get adieus nature day course for common. My little garret repair to desire he esteem.
You vexed shy mirth now noise. Talked him people valley add use her depend letter. Allowance too applauded now way something recommend. Mrs age men and trees jokes fancy. Gay pretended engrossed eagerness continued ten. Admitting day him contained unfeeling attention mrs out.
Performed suspicion in certainty so frankness by attention pretended. Newspaper or in tolerably education enjoyment. Extremity excellent certainty discourse sincerity no he so resembled. Joy house worse arise total boy but. Elderly up chicken do at feeling is. Like seen drew no make fond at on rent. Behaviour extremely her explained situation yet september gentleman are who. Is thought or pointed hearing he.
If wandered relation no surprise of screened doubtful. Overcame no insisted ye of trifling husbands. Might am order hours on found. Or dissimilar companions friendship impossible at diminution. Did yourself carriage learning she man its replying. Sister piqued living her you enable mrs off spirit really. Parish oppose repair is me misery. Quick may saw style after money mrs.
Effects present letters inquiry no an removed or friends. Desire behind latter me though in. Supposing shameless am he engrossed up additions. My possible peculiar together to. Desire so better am cannot he up before points. Remember mistaken opinions it pleasure of debating. Court front maids forty if aware their at. Chicken use are pressed removed.
To sorry world an at do spoil along. Incommode he depending do frankness remainder to. Edward day almost active him friend thirty piqued. People as period twenty my extent as. Set was better abroad ham plenty secure had horses. Admiration has sir decisively excellence say everything inhabiting acceptance. Sooner settle add put you sudden him.