Merge -r 1203940:1203941 from trunk to branch. FIXES: HADOOP-7590

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1203945 13f79535-47bb-0310-9956-ffa450edef68
Alejandro Abdelnur 2011-11-19 01:38:22 +00:00
parent 104bfdea85
commit c36a44d11b
154 changed files with 1089 additions and 201 deletions

View File

@ -13,6 +13,8 @@ Release 0.23.1 - Unreleased
HADOOP-7802. Hadoop scripts unconditionally source
"$bin"/../libexec/hadoop-config.sh. (Bruno Mahé via tomwhite)
HADOOP-7590. Mavenize streaming and MR examples. (tucu)
OPTIMIZATIONS
BUG FIXES

View File

@ -82,6 +82,12 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>
<build>

View File

@ -176,7 +176,7 @@ public abstract class ClusterMapReduceTestCase extends TestCase {
* @return path to the input directory for the testcase.
*/
protected Path getInputDir() {
return new Path("input");
return new Path("target/input");
}
/**
@ -185,7 +185,7 @@ public abstract class ClusterMapReduceTestCase extends TestCase {
* @return path to the output directory for the testcase.
*/
protected Path getOutputDir() {
return new Path("output");
return new Path("target/output");
}
/**
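
A note on the hunk above: getInputDir() and getOutputDir() now return paths under target/, so the data written by these tests lands inside the Maven build directory and is wiped by a clean. The snippet below is a minimal sketch (not part of the patch; class name and use of a local FileSystem are illustrative assumptions) showing how such a relative Path is resolved against the file system's working directory.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RelativePathSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    // Relative, like the new getInputDir(): resolved against the working
    // directory, which is typically the module base directory when the
    // tests are forked by Maven.
    Path input = new Path("target/input");
    System.out.println("resolves to " + fs.makeQualified(input));
  }
}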

View File

@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project>
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-project</artifactId>
<version>0.24.0-SNAPSHOT</version>
<relativePath>../../hadoop-project</relativePath>
</parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-examples</artifactId>
<version>0.24.0-SNAPSHOT</version>
<description>Apache Hadoop MapReduce Examples</description>
<name>Apache Hadoop MapReduce Examples</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,196 @@
package org.apache.hadoop.examples;
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class WordMean extends Configured implements Tool {
private double mean = 0;
private final static Text COUNT = new Text("count");
private final static Text LENGTH = new Text("length");
private final static LongWritable ONE = new LongWritable(1);
/**
* Maps words from a line of text into 2 key-value pairs; one key-value pair for
* counting the word, another for counting its length.
*/
public static class WordMeanMapper extends
Mapper<Object, Text, Text, LongWritable> {
private LongWritable wordLen = new LongWritable();
/**
* Emits 2 key-value pairs for counting the word and its length. Outputs are
* (Text, LongWritable).
*
* @param value
* This will be a line of text coming in from our input file.
*/
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
String string = itr.nextToken();
this.wordLen.set(string.length());
context.write(LENGTH, this.wordLen);
context.write(COUNT, ONE);
}
}
}
/**
* Performs integer summation of all the values for each key.
*/
public static class WordMeanReducer extends
Reducer<Text, LongWritable, Text, LongWritable> {
private LongWritable sum = new LongWritable();
/**
* Sums all the individual values within the iterator and writes them to the
* same key.
*
* @param key
* This will be one of 2 constants: LENGTH or COUNT.
* @param values
* This will be an iterator of all the values associated with that
* key.
*/
public void reduce(Text key, Iterable<LongWritable> values, Context context)
throws IOException, InterruptedException {
int theSum = 0;
for (LongWritable val : values) {
theSum += val.get();
}
sum.set(theSum);
context.write(key, sum);
}
}
/**
* Reads the output file and parses the summation of lengths, and the word
* count, to perform a quick calculation of the mean.
*
* @param path
* The path to find the output file in. Set in main to the output
* directory.
* @throws IOException
* If it cannot access the output directory, we throw an exception.
*/
private double readAndCalcMean(Path path, Configuration conf)
throws IOException {
FileSystem fs = FileSystem.get(conf);
Path file = new Path(path, "part-r-00000");
if (!fs.exists(file))
throw new IOException("Output not found!");
BufferedReader br = null;
// average = total sum / number of elements;
try {
br = new BufferedReader(new InputStreamReader(fs.open(file)));
long count = 0;
long length = 0;
String line;
while ((line = br.readLine()) != null) {
StringTokenizer st = new StringTokenizer(line);
// grab type
String type = st.nextToken();
// differentiate
if (type.equals(COUNT.toString())) {
String countLit = st.nextToken();
count = Long.parseLong(countLit);
} else if (type.equals(LENGTH.toString())) {
String lengthLit = st.nextToken();
length = Long.parseLong(lengthLit);
}
}
double theMean = (((double) length) / ((double) count));
System.out.println("The mean is: " + theMean);
return theMean;
} finally {
br.close();
}
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new WordMean(), args);
}
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: wordmean <in> <out>");
return 0;
}
Configuration conf = getConf();
@SuppressWarnings("deprecation")
Job job = new Job(conf, "word mean");
job.setJarByClass(WordMean.class);
job.setMapperClass(WordMeanMapper.class);
job.setCombinerClass(WordMeanReducer.class);
job.setReducerClass(WordMeanReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputpath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputpath);
boolean result = job.waitForCompletion(true);
mean = readAndCalcMean(outputpath, conf);
return (result ? 0 : 1);
}
/**
* Only meaningful after run() has been called.
*
* @return Returns the mean value.
*/
public double getMean() {
return mean;
}
}
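
For reference, a minimal driver sketch (hypothetical, not part of the patch; the class name is illustrative) showing how WordMean can be run programmatically and the computed mean read back through getMean() once run() has completed:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.WordMean;
import org.apache.hadoop.util.ToolRunner;

public class WordMeanDriverSketch {
  public static void main(String[] args) throws Exception {
    // args are the same <in> <out> pair expected by WordMean.run()
    WordMean wordMean = new WordMean();
    int exitCode = ToolRunner.run(new Configuration(), wordMean, args);
    System.out.println("exit code: " + exitCode
        + ", mean word length: " + wordMean.getMean());
  }
}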

View File

@ -0,0 +1,208 @@
package org.apache.hadoop.examples;
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class WordMedian extends Configured implements Tool {
private double median = 0;
private final static IntWritable ONE = new IntWritable(1);
/**
* Maps words from a line of text into a key-value pair; the length of the word
* as the key, and 1 as the value.
*/
public static class WordMedianMapper extends
Mapper<Object, Text, IntWritable, IntWritable> {
private IntWritable length = new IntWritable();
/**
* Emits a key-value pair for counting the word. Outputs are (IntWritable,
* IntWritable).
*
* @param value
* This will be a line of text coming in from our input file.
*/
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
String string = itr.nextToken();
length.set(string.length());
context.write(length, ONE);
}
}
}
/**
* Performs integer summation of all the values for each key.
*/
public static class WordMedianReducer extends
Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
private IntWritable val = new IntWritable();
/**
* Sums all the individual values within the iterator and writes them to the
* same key.
*
* @param key
* This will be a length of a word that was read.
* @param values
* This will be an iterator of all the values associated with that
* key.
*/
public void reduce(IntWritable key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
val.set(sum);
context.write(key, val);
}
}
/**
* This is a standard program to read and find a median value based on a file
* of word counts such as: 1 456, 2 132, 3 56..., where the first values are
* the word lengths and the following values are the number of times that
* words of that length appear.
*
* @param path
* The path to read the HDFS file from (part-r-00000...00001...etc).
* @param medianIndex1
* The first length value to look for.
* @param medianIndex2
* The second length value to look for (will be the same as the first
* if there are an even number of words total).
* @throws IOException
* If file cannot be found, we throw an exception.
* */
private double readAndFindMedian(String path, int medianIndex1,
int medianIndex2, Configuration conf) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path file = new Path(path, "part-r-00000");
if (!fs.exists(file))
throw new IOException("Output not found!");
BufferedReader br = null;
try {
br = new BufferedReader(new InputStreamReader(fs.open(file)));
int num = 0;
String line;
while ((line = br.readLine()) != null) {
StringTokenizer st = new StringTokenizer(line);
// grab length
String currLen = st.nextToken();
// grab count
String lengthFreq = st.nextToken();
int prevNum = num;
num += Integer.parseInt(lengthFreq);
if (medianIndex2 >= prevNum && medianIndex1 <= num) {
System.out.println("The median is: " + currLen);
br.close();
return Double.parseDouble(currLen);
} else if (medianIndex2 >= prevNum && medianIndex1 < num) {
String nextCurrLen = st.nextToken();
double theMedian = (Integer.parseInt(currLen) + Integer
.parseInt(nextCurrLen)) / 2.0;
System.out.println("The median is: " + theMedian);
br.close();
return theMedian;
}
}
} finally {
br.close();
}
// error, no median found
return -1;
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new WordMedian(), args);
}
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: wordmedian <in> <out>");
return 0;
}
setConf(new Configuration());
Configuration conf = getConf();
@SuppressWarnings("deprecation")
Job job = new Job(conf, "word median");
job.setJarByClass(WordMedian.class);
job.setMapperClass(WordMedianMapper.class);
job.setCombinerClass(WordMedianReducer.class);
job.setReducerClass(WordMedianReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean result = job.waitForCompletion(true);
// Wait for JOB 1 -- get middle value to check for Median
long totalWords = job.getCounters()
.getGroup(TaskCounter.class.getCanonicalName())
.findCounter("MAP_OUTPUT_RECORDS", "Map output records").getValue();
int medianIndex1 = (int) Math.ceil((totalWords / 2.0));
int medianIndex2 = (int) Math.floor((totalWords / 2.0));
median = readAndFindMedian(args[1], medianIndex1, medianIndex2, conf);
return (result ? 0 : 1);
}
public double getMedian() {
return median;
}
}
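
To make the cumulative walk in readAndFindMedian() concrete, here is a minimal plain-Java sketch (hypothetical frequencies, not part of the patch) of the same idea: with the word-length counts ordered by length, the median is the length at which the running total first covers both indexes ceil(n/2) and floor(n/2).

import java.util.LinkedHashMap;
import java.util.Map;

public class MedianWalkSketch {
  public static void main(String[] args) {
    // (word length -> number of words of that length), as in the reducer output
    Map<Integer, Integer> lengthFreq = new LinkedHashMap<Integer, Integer>();
    lengthFreq.put(1, 456);
    lengthFreq.put(2, 132);
    lengthFreq.put(3, 56);
    long totalWords = 456 + 132 + 56;                       // 644
    int medianIndex1 = (int) Math.ceil(totalWords / 2.0);   // 322
    int medianIndex2 = (int) Math.floor(totalWords / 2.0);  // 322
    int num = 0;
    for (Map.Entry<Integer, Integer> entry : lengthFreq.entrySet()) {
      int prevNum = num;
      num += entry.getValue();
      if (medianIndex2 >= prevNum && medianIndex1 <= num) {
        System.out.println("median word length: " + entry.getKey()); // prints 1
        break;
      }
    }
  }
}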

View File

@ -0,0 +1,210 @@
package org.apache.hadoop.examples;
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class WordStandardDeviation extends Configured implements Tool {
private double stddev = 0;
private final static Text LENGTH = new Text("length");
private final static Text SQUARE = new Text("square");
private final static Text COUNT = new Text("count");
private final static LongWritable ONE = new LongWritable(1);
/**
* Maps words from a line of text into 3 key-value pairs; one key-value pair for
* counting the word, one for counting its length, and one for counting the
* square of its length.
*/
public static class WordStandardDeviationMapper extends
Mapper<Object, Text, Text, LongWritable> {
private LongWritable wordLen = new LongWritable();
private LongWritable wordLenSq = new LongWritable();
/**
* Emits 3 key-value pairs for counting the word, its length, and the
* square of its length. Outputs are (Text, LongWritable).
*
* @param value
* This will be a line of text coming in from our input file.
*/
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
String string = itr.nextToken();
this.wordLen.set(string.length());
// the square of an integer is an integer...
this.wordLenSq.set((long) Math.pow(string.length(), 2.0));
context.write(LENGTH, this.wordLen);
context.write(SQUARE, this.wordLenSq);
context.write(COUNT, ONE);
}
}
}
/**
* Performs integer summation of all the values for each key.
*/
public static class WordStandardDeviationReducer extends
Reducer<Text, LongWritable, Text, LongWritable> {
private LongWritable val = new LongWritable();
/**
* Sums all the individual values within the iterator and writes them to the
* same key.
*
* @param key
* This will be one of 3 constants: LENGTH, COUNT, or
* SQUARE.
* @param values
* This will be an iterator of all the values associated with that
* key.
*/
public void reduce(Text key, Iterable<LongWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (LongWritable value : values) {
sum += value.get();
}
val.set(sum);
context.write(key, val);
}
}
/**
* Reads the output file and parses the summation of lengths, the word count,
* and the lengths squared, to perform a quick calculation of the standard
* deviation.
*
* @param path
* The path to find the output file in. Set in main to the output
* directory.
* @throws IOException
* If it cannot access the output directory, we throw an exception.
*/
private double readAndCalcStdDev(Path path, Configuration conf)
throws IOException {
FileSystem fs = FileSystem.get(conf);
Path file = new Path(path, "part-r-00000");
if (!fs.exists(file))
throw new IOException("Output not found!");
double stddev = 0;
BufferedReader br = null;
try {
br = new BufferedReader(new InputStreamReader(fs.open(file)));
long count = 0;
long length = 0;
long square = 0;
String line;
while ((line = br.readLine()) != null) {
StringTokenizer st = new StringTokenizer(line);
// grab type
String type = st.nextToken();
// differentiate
if (type.equals(COUNT.toString())) {
String countLit = st.nextToken();
count = Long.parseLong(countLit);
} else if (type.equals(LENGTH.toString())) {
String lengthLit = st.nextToken();
length = Long.parseLong(lengthLit);
} else if (type.equals(SQUARE.toString())) {
String squareLit = st.nextToken();
square = Long.parseLong(squareLit);
}
}
// average = total sum / number of elements;
double mean = (((double) length) / ((double) count));
// standard deviation = sqrt((sum(lengths ^ 2)/count) - (mean ^ 2))
mean = Math.pow(mean, 2.0);
double term = (((double) square / ((double) count)));
stddev = Math.sqrt((term - mean));
System.out.println("The standard deviation is: " + stddev);
} finally {
br.close();
}
return stddev;
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new WordStandardDeviation(),
args);
}
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: wordstddev <in> <out>");
return 0;
}
Configuration conf = getConf();
@SuppressWarnings("deprecation")
Job job = new Job(conf, "word stddev");
job.setJarByClass(WordStandardDeviation.class);
job.setMapperClass(WordStandardDeviationMapper.class);
job.setCombinerClass(WordStandardDeviationReducer.class);
job.setReducerClass(WordStandardDeviationReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputpath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputpath);
boolean result = job.waitForCompletion(true);
// read output and calculate standard deviation
stddev = readAndCalcStdDev(outputpath, conf);
return (result ? 0 : 1);
}
public double getStandardDeviation() {
return stddev;
}
}
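
The calculation in readAndCalcStdDev() is the usual population standard deviation, sqrt(sum(len^2)/count - (sum(len)/count)^2), computed from the three reducer outputs. A minimal sketch with hypothetical numbers (not part of the patch):

public class StdDevSketch {
  public static void main(String[] args) {
    // three words with lengths 2, 2 and 4
    long count = 3;
    long length = 2 + 2 + 4;   // sum of lengths = 8
    long square = 4 + 4 + 16;  // sum of squared lengths = 24
    double mean = (double) length / count;                            // ~2.667
    double stddev = Math.sqrt((double) square / count - mean * mean); // ~0.943
    System.out.println("stddev: " + stddev);
  }
}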

View File

@ -38,7 +38,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.InputFormat;
@ -53,6 +52,7 @@ import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@ -386,8 +386,11 @@ public final class DistSum extends Configured implements Tool {
@Override
public synchronized void init(Job job) throws IOException {
final Configuration conf = job.getConfiguration();
if (cluster == null)
cluster = new Cluster(JobTracker.getAddress(conf), conf);
if (cluster == null) {
String jobTrackerStr = conf.get("mapreduce.jobtracker.address", "localhost:8012");
cluster = new Cluster(NetUtils.createSocketAddr(jobTrackerStr), conf);
}
chooseMachine(conf).init(job);
}
@ -604,4 +607,4 @@ public final class DistSum extends Configured implements Tool {
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(null, new DistSum(), args));
}
}
}
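
The hunk above replaces the call to JobTracker.getAddress(conf) (the org.apache.hadoop.mapred.JobTracker import is dropped) with a direct lookup of mapreduce.jobtracker.address resolved through NetUtils. A minimal standalone sketch of that pattern (same key and default as in the patch; the class name is illustrative):

import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;

public class JobTrackerAddressSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // falls back to localhost:8012 when the key is unset, as in DistSum.init()
    String jobTrackerStr = conf.get("mapreduce.jobtracker.address", "localhost:8012");
    InetSocketAddress addr = NetUtils.createSocketAddr(jobTrackerStr);
    System.out.println("job tracker address: " + addr);
  }
}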

View File

@ -35,12 +35,13 @@
<fork.mode>once</fork.mode>
<mr.basedir>${basedir}</mr.basedir>
</properties>
<modules>
<module>hadoop-yarn</module>
<module>hadoop-mapreduce-client</module>
<module>hadoop-mapreduce-client</module>
<module>hadoop-mapreduce-examples</module>
</modules>
<dependencies>
<dependency>
<groupId>com.google.protobuf</groupId>
@ -106,7 +107,7 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
@ -166,9 +167,9 @@
<artifactId>clover</artifactId>
<version>3.0.2</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
@ -321,7 +322,7 @@
</executions>
</plugin>
</plugins>
</build>
</build>
</profile>
<profile>
<id>dist</id>

View File

@ -45,6 +45,9 @@
<hadoop.assemblies.version>${project.version}</hadoop.assemblies.version>
<commons-daemon.version>1.0.3</commons-daemon.version>
<test.build.dir>${project.build.directory}/test-dir</test.build.dir>
<test.build.data>${test.build.dir}</test.build.data>
</properties>
<dependencyManagement>
@ -96,6 +99,51 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-tests</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-hs</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-examples</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@ -174,6 +222,11 @@
<version>1.8</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-servlet-tester</artifactId>
<version>6.1.26</version>
</dependency>
<dependency>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>

View File

@ -0,0 +1,121 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project>
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-project</artifactId>
<version>0.23.0-SNAPSHOT</version>
<relativePath>../../hadoop-project</relativePath>
</parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-streaming</artifactId>
<version>0.23.0-SNAPSHOT</version>
<description>Apache Hadoop MapReduce Streaming</description>
<name>Apache Hadoop MapReduce Streaming</name>
<packaging>jar</packaging>
<properties>
<hadoop.log.dir>${project.build.directory}/log</hadoop.log.dir>
<test.exclude.pattern>%regex[.*(TestStreamingBadRecords|TestStreamingCombiner|TestStreamingStatus|TestUlimit).*]</test.exclude.pattern>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-app</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-hs</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-tests</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>create-log-dir</id>
<phase>process-test-resources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<target>
<delete dir="${test.build.data}"/>
<mkdir dir="${test.build.data}"/>
<mkdir dir="${hadoop.log.dir}"/>
</target>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -80,13 +80,13 @@ public class StreamJob implements Tool {
protected static final Log LOG = LogFactory.getLog(StreamJob.class.getName());
final static String REDUCE_NONE = "NONE";
/** -----------Streaming CLI Implementation **/
private CommandLineParser parser = new BasicParser();
private CommandLineParser parser = new BasicParser();
private Options allOptions;
/**@deprecated use StreamJob() with ToolRunner or set the
* Configuration using {@link #setConf(Configuration)} and
* run with {@link #run(String[])}.
/**@deprecated use StreamJob() with ToolRunner or set the
* Configuration using {@link #setConf(Configuration)} and
* run with {@link #run(String[])}.
*/
@Deprecated
public StreamJob(String[] argv, boolean mayExit) {
@ -94,12 +94,12 @@ public class StreamJob implements Tool {
argv_ = argv;
this.config_ = new Configuration();
}
public StreamJob() {
setupOptions();
this.config_ = new Configuration();
}
@Override
public Configuration getConf() {
return config_;
@ -109,13 +109,13 @@ public class StreamJob implements Tool {
public void setConf(Configuration conf) {
this.config_ = conf;
}
@Override
public int run(String[] args) throws Exception {
try {
this.argv_ = args;
init();
preProcessArgs();
parseArgv();
if (printUsage) {
@ -123,7 +123,7 @@ public class StreamJob implements Tool {
return 0;
}
postProcessArgs();
setJobConf();
} catch (IllegalArgumentException ex) {
//ignore, since log will already be printed
@ -133,13 +133,13 @@ public class StreamJob implements Tool {
}
return submitAndMonitorJob();
}
/**
* This method creates a streaming job from the given argument list.
* The created object can be used and/or submitted to a jobtracker for
* The created object can be used and/or submitted to a jobtracker for
* execution by a job agent such as JobControl
* @param argv the list args for creating a streaming job
* @return the created JobConf object
* @return the created JobConf object
* @throws IOException
*/
static public JobConf createJob(String[] argv) throws IOException {
@ -154,7 +154,7 @@ public class StreamJob implements Tool {
}
/**
* This is the method that actually
* This is the method that actually
* initializes the job conf and submits the job
* to the jobtracker
* @throws IOException
@ -169,7 +169,7 @@ public class StreamJob implements Tool {
throw new IOException(ex.getMessage());
}
}
protected void init() {
try {
env_ = new Environment();
@ -186,7 +186,7 @@ public class StreamJob implements Tool {
}
void postProcessArgs() throws IOException {
if (inputSpecs_.size() == 0) {
fail("Required argument: -input <name>");
}
@ -253,7 +253,7 @@ public class StreamJob implements Tool {
LOG.error(oe.getMessage());
exitUsage(argv_.length > 0 && "-info".equals(argv_[0]));
}
if (cmdLine != null) {
detailedUsage_ = cmdLine.hasOption("info");
if (cmdLine.hasOption("help") || detailedUsage_) {
@ -263,21 +263,21 @@ public class StreamJob implements Tool {
verbose_ = cmdLine.hasOption("verbose");
background_ = cmdLine.hasOption("background");
debug_ = cmdLine.hasOption("debug")? debug_ + 1 : debug_;
String[] values = cmdLine.getOptionValues("input");
if (values != null && values.length > 0) {
for (String input : values) {
inputSpecs_.add(input);
}
}
output_ = cmdLine.getOptionValue("output");
mapCmd_ = cmdLine.getOptionValue("mapper");
comCmd_ = cmdLine.getOptionValue("combiner");
redCmd_ = cmdLine.getOptionValue("reducer");
output_ = cmdLine.getOptionValue("output");
mapCmd_ = cmdLine.getOptionValue("mapper");
comCmd_ = cmdLine.getOptionValue("combiner");
redCmd_ = cmdLine.getOptionValue("reducer");
lazyOutput_ = cmdLine.hasOption("lazyOutput");
values = cmdLine.getOptionValues("file");
if (values != null && values.length > 0) {
LOG.warn("-file option is deprecated, please use generic option" +
@ -306,34 +306,34 @@ public class StreamJob implements Tool {
LOG.warn("-dfs option is deprecated, please use -fs instead.");
config_.set("fs.default.name", fsName);
}
additionalConfSpec_ = cmdLine.getOptionValue("additionalconfspec");
inputFormatSpec_ = cmdLine.getOptionValue("inputformat");
additionalConfSpec_ = cmdLine.getOptionValue("additionalconfspec");
inputFormatSpec_ = cmdLine.getOptionValue("inputformat");
outputFormatSpec_ = cmdLine.getOptionValue("outputformat");
numReduceTasksSpec_ = cmdLine.getOptionValue("numReduceTasks");
numReduceTasksSpec_ = cmdLine.getOptionValue("numReduceTasks");
partitionerSpec_ = cmdLine.getOptionValue("partitioner");
inReaderSpec_ = cmdLine.getOptionValue("inputreader");
mapDebugSpec_ = cmdLine.getOptionValue("mapdebug");
inReaderSpec_ = cmdLine.getOptionValue("inputreader");
mapDebugSpec_ = cmdLine.getOptionValue("mapdebug");
reduceDebugSpec_ = cmdLine.getOptionValue("reducedebug");
ioSpec_ = cmdLine.getOptionValue("io");
String[] car = cmdLine.getOptionValues("cacheArchive");
String[] car = cmdLine.getOptionValues("cacheArchive");
if (null != car && car.length > 0){
LOG.warn("-cacheArchive option is deprecated, please use -archives instead.");
for(String s : car){
cacheArchives = (cacheArchives == null)?s :cacheArchives + "," + s;
cacheArchives = (cacheArchives == null)?s :cacheArchives + "," + s;
}
}
String[] caf = cmdLine.getOptionValues("cacheFile");
String[] caf = cmdLine.getOptionValues("cacheFile");
if (null != caf && caf.length > 0){
LOG.warn("-cacheFile option is deprecated, please use -files instead.");
for(String s : caf){
cacheFiles = (cacheFiles == null)?s :cacheFiles + "," + s;
cacheFiles = (cacheFiles == null)?s :cacheFiles + "," + s;
}
}
String[] jobconf = cmdLine.getOptionValues("jobconf");
String[] jobconf = cmdLine.getOptionValues("jobconf");
if (null != jobconf && jobconf.length > 0){
LOG.warn("-jobconf option is deprecated, please use -D instead.");
for(String s : jobconf){
@ -341,8 +341,8 @@ public class StreamJob implements Tool {
config_.set(parts[0], parts[1]);
}
}
String[] cmd = cmdLine.getOptionValues("cmdenv");
String[] cmd = cmdLine.getOptionValues("cmdenv");
if (null != cmd && cmd.length > 0){
for(String s : cmd) {
if (addTaskEnvironment_.length() > 0) {
@ -361,8 +361,8 @@ public class StreamJob implements Tool {
System.out.println("STREAM: " + msg);
}
}
private Option createOption(String name, String desc,
private Option createOption(String name, String desc,
String argName, int max, boolean required){
return OptionBuilder
.withArgName(argName)
@ -371,87 +371,87 @@ public class StreamJob implements Tool {
.isRequired(required)
.create(name);
}
private Option createBoolOption(String name, String desc){
return OptionBuilder.withDescription(desc).create(name);
}
private void validate(final List<String> values)
private void validate(final List<String> values)
throws IllegalArgumentException {
for (String file : values) {
File f = new File(file);
File f = new File(file);
if (!f.canRead()) {
fail("File: " + f.getAbsolutePath()
+ " does not exist, or is not readable.");
fail("File: " + f.getAbsolutePath()
+ " does not exist, or is not readable.");
}
}
}
private void setupOptions(){
// input and output are not required for -info and -help options,
// though they are required for streaming job to be run.
Option input = createOption("input",
"DFS input file(s) for the Map step",
"path",
Integer.MAX_VALUE,
false);
Option output = createOption("output",
"DFS output directory for the Reduce step",
"path", 1, false);
Option mapper = createOption("mapper",
Option input = createOption("input",
"DFS input file(s) for the Map step",
"path",
Integer.MAX_VALUE,
false);
Option output = createOption("output",
"DFS output directory for the Reduce step",
"path", 1, false);
Option mapper = createOption("mapper",
"The streaming command to run", "cmd", 1, false);
Option combiner = createOption("combiner",
Option combiner = createOption("combiner",
"The streaming command to run", "cmd", 1, false);
// reducer could be NONE
Option reducer = createOption("reducer",
"The streaming command to run", "cmd", 1, false);
Option file = createOption("file",
"File to be shipped in the Job jar file",
"file", Integer.MAX_VALUE, false);
Option dfs = createOption("dfs",
"Optional. Override DFS configuration", "<h:p>|local", 1, false);
Option additionalconfspec = createOption("additionalconfspec",
// reducer could be NONE
Option reducer = createOption("reducer",
"The streaming command to run", "cmd", 1, false);
Option file = createOption("file",
"File to be shipped in the Job jar file",
"file", Integer.MAX_VALUE, false);
Option dfs = createOption("dfs",
"Optional. Override DFS configuration", "<h:p>|local", 1, false);
Option additionalconfspec = createOption("additionalconfspec",
"Optional.", "spec", 1, false);
Option inputformat = createOption("inputformat",
Option inputformat = createOption("inputformat",
"Optional.", "spec", 1, false);
Option outputformat = createOption("outputformat",
Option outputformat = createOption("outputformat",
"Optional.", "spec", 1, false);
Option partitioner = createOption("partitioner",
Option partitioner = createOption("partitioner",
"Optional.", "spec", 1, false);
Option numReduceTasks = createOption("numReduceTasks",
Option numReduceTasks = createOption("numReduceTasks",
"Optional.", "spec",1, false );
Option inputreader = createOption("inputreader",
Option inputreader = createOption("inputreader",
"Optional.", "spec", 1, false);
Option mapDebug = createOption("mapdebug",
"Optional.", "spec", 1, false);
Option reduceDebug = createOption("reducedebug",
"Optional", "spec",1, false);
Option jobconf =
createOption("jobconf",
"(n=v) Optional. Add or override a JobConf property.",
Option jobconf =
createOption("jobconf",
"(n=v) Optional. Add or override a JobConf property.",
"spec", 1, false);
Option cmdenv =
createOption("cmdenv", "(n=v) Pass env.var to streaming commands.",
Option cmdenv =
createOption("cmdenv", "(n=v) Pass env.var to streaming commands.",
"spec", 1, false);
Option cacheFile = createOption("cacheFile",
Option cacheFile = createOption("cacheFile",
"File name URI", "fileNameURI", Integer.MAX_VALUE, false);
Option cacheArchive = createOption("cacheArchive",
Option cacheArchive = createOption("cacheArchive",
"File name URI", "fileNameURI", Integer.MAX_VALUE, false);
Option io = createOption("io",
"Optional.", "spec", 1, false);
// boolean properties
Option background = createBoolOption("background", "Submit the job and don't wait till it completes.");
Option verbose = createBoolOption("verbose", "print verbose output");
Option info = createBoolOption("info", "print verbose output");
Option help = createBoolOption("help", "print this help message");
Option debug = createBoolOption("debug", "print debug output");
Option background = createBoolOption("background", "Submit the job and don't wait till it completes.");
Option verbose = createBoolOption("verbose", "print verbose output");
Option info = createBoolOption("info", "print verbose output");
Option help = createBoolOption("help", "print this help message");
Option debug = createBoolOption("debug", "print debug output");
Option lazyOutput = createBoolOption("lazyOutput", "create outputs lazily");
allOptions = new Options().
addOption(input).
addOption(output).
@ -490,9 +490,9 @@ public class StreamJob implements Tool {
System.out.println("Usage: $HADOOP_PREFIX/bin/hadoop jar hadoop-streaming.jar"
+ " [options]");
System.out.println("Options:");
System.out.println(" -input <path> DFS input file(s) for the Map"
System.out.println(" -input <path> DFS input file(s) for the Map"
+ " step.");
System.out.println(" -output <path> DFS output directory for the"
System.out.println(" -output <path> DFS output directory for the"
+ " Reduce step.");
System.out.println(" -mapper <cmd|JavaClassName> Optional. Command"
+ " to be run as mapper.");
@ -501,7 +501,7 @@ public class StreamJob implements Tool {
System.out.println(" -reducer <cmd|JavaClassName> Optional. Command"
+ " to be run as reducer.");
System.out.println(" -file <file> Optional. File/dir to be "
+ "shipped in the Job jar file.\n" +
+ "shipped in the Job jar file.\n" +
" Deprecated. Use generic option \"-files\" instead.");
System.out.println(" -inputformat <TextInputFormat(default)"
+ "|SequenceFileAsTextInputFormat|JavaClassName>\n"
@ -533,7 +533,7 @@ public class StreamJob implements Tool {
GenericOptionsParser.printGenericCommandUsage(System.out);
if (!detailed) {
System.out.println();
System.out.println();
System.out.println("For more details about these options:");
System.out.println("Use " +
"$HADOOP_PREFIX/bin/hadoop jar hadoop-streaming.jar -info");
@ -592,7 +592,7 @@ public class StreamJob implements Tool {
System.out.println(" -D " + MRConfig.LOCAL_DIR + "=/tmp/local");
System.out.println(" -D " + JTConfig.JT_SYSTEM_DIR + "=/tmp/system");
System.out.println(" -D " + MRConfig.TEMP_DIR + "=/tmp/temp");
System.out.println("To treat tasks with non-zero exit status as SUCCEDED:");
System.out.println("To treat tasks with non-zero exit status as SUCCEDED:");
System.out.println(" -D stream.non.zero.exit.is.failure=false");
System.out.println("Use a custom hadoop streaming build along with standard"
+ " hadoop install:");
@ -621,7 +621,7 @@ public class StreamJob implements Tool {
System.out.println(" daily logs for days in month 2006-04");
}
public void fail(String message) {
public void fail(String message) {
System.err.println(message);
System.err.println("Try -help for more information");
throw new IllegalArgumentException(message);
@ -659,7 +659,7 @@ public class StreamJob implements Tool {
// $HADOOP_PREFIX/bin/hadoop jar /not/first/on/classpath/custom-hadoop-streaming.jar
// where findInClasspath() would find the version of hadoop-streaming.jar in $HADOOP_PREFIX
String runtimeClasses = config_.get("stream.shipped.hadoopstreaming"); // jar or class dir
if (runtimeClasses == null) {
runtimeClasses = StreamUtil.findInClasspath(StreamJob.class.getName());
}
@ -700,7 +700,7 @@ public class StreamJob implements Tool {
builder.merge(packageFiles_, unjarFiles, jobJarName);
return jobJarName;
}
/**
* get the uris of all the files/caches
*/
@ -710,7 +710,7 @@ public class StreamJob implements Tool {
fileURIs = StringUtils.stringToURI(files);
archiveURIs = StringUtils.stringToURI(archives);
}
protected void setJobConf() throws IOException {
if (additionalConfSpec_ != null) {
LOG.warn("-additionalconfspec option is deprecated, please use -conf instead.");
@ -719,15 +719,15 @@ public class StreamJob implements Tool {
// general MapRed job properties
jobConf_ = new JobConf(config_, StreamJob.class);
// All streaming jobs get the task timeout value
// from the configuration settings.
// The correct FS must be set before this is called!
// (to resolve local vs. dfs drive letter differences)
// (to resolve local vs. dfs drive letter differences)
// (mapreduce.job.working.dir will be lazily initialized ONCE and depends on FS)
for (int i = 0; i < inputSpecs_.size(); i++) {
FileInputFormat.addInputPaths(jobConf_,
FileInputFormat.addInputPaths(jobConf_,
(String) inputSpecs_.get(i));
}
@ -773,7 +773,7 @@ public class StreamJob implements Tool {
fail("-inputformat : class not found : " + inputFormatSpec_);
}
}
}
}
if (fmt == null) {
fmt = StreamInputFormat.class;
}
@ -786,20 +786,20 @@ public class StreamJob implements Tool {
jobConf_.set("stream.reduce.input", ioSpec_);
jobConf_.set("stream.reduce.output", ioSpec_);
}
Class<? extends IdentifierResolver> idResolverClass =
Class<? extends IdentifierResolver> idResolverClass =
jobConf_.getClass("stream.io.identifier.resolver.class",
IdentifierResolver.class, IdentifierResolver.class);
IdentifierResolver idResolver = ReflectionUtils.newInstance(idResolverClass, jobConf_);
idResolver.resolve(jobConf_.get("stream.map.input", IdentifierResolver.TEXT_ID));
jobConf_.setClass("stream.map.input.writer.class",
idResolver.getInputWriterClass(), InputWriter.class);
idResolver.resolve(jobConf_.get("stream.reduce.input", IdentifierResolver.TEXT_ID));
jobConf_.setClass("stream.reduce.input.writer.class",
idResolver.getInputWriterClass(), InputWriter.class);
jobConf_.set("stream.addenvironment", addTaskEnvironment_);
boolean isMapperACommand = false;
@ -811,7 +811,7 @@ public class StreamJob implements Tool {
isMapperACommand = true;
jobConf_.setMapperClass(PipeMapper.class);
jobConf_.setMapRunnerClass(PipeMapRunner.class);
jobConf_.set("stream.map.streamprocessor",
jobConf_.set("stream.map.streamprocessor",
URLEncoder.encode(mapCmd_, "UTF-8"));
}
}
@ -900,7 +900,7 @@ public class StreamJob implements Tool {
jobConf_.set(k, v);
}
}
FileOutputFormat.setOutputPath(jobConf_, new Path(output_));
fmt = null;
if (outputFormatSpec_!= null) {
@ -928,7 +928,7 @@ public class StreamJob implements Tool {
fail("-partitioner : class not found : " + partitionerSpec_);
}
}
if(mapDebugSpec_ != null){
jobConf_.setMapDebugScript(mapDebugSpec_);
}
@ -942,7 +942,7 @@ public class StreamJob implements Tool {
if (jar_ != null) {
jobConf_.setJar(jar_);
}
if ((cacheArchives != null) || (cacheFiles != null)){
getURIs(cacheArchives, cacheFiles);
boolean b = DistributedCache.checkURIs(fileURIs, archiveURIs);
@ -955,11 +955,11 @@ public class StreamJob implements Tool {
DistributedCache.setCacheArchives(archiveURIs, jobConf_);
if (cacheFiles != null)
DistributedCache.setCacheFiles(fileURIs, jobConf_);
if (verbose_) {
listJobConfProperties();
}
msg("submitting to jobconf: " + getJobTrackerHostPort());
}
@ -1013,7 +1013,7 @@ public class StreamJob implements Tool {
LOG.error("Error launching job , Invalid job conf : " + je.getMessage());
return 3;
} catch(FileAlreadyExistsException fae) {
LOG.error("Error launching job , Output path already exists : "
LOG.error("Error launching job , Output path already exists : "
+ fae.getMessage());
return 4;
} catch(IOException ioe) {
@ -1047,9 +1047,9 @@ public class StreamJob implements Tool {
protected ArrayList<String> inputSpecs_ = new ArrayList<String>();
protected TreeSet<String> seenPrimary_ = new TreeSet<String>();
protected boolean hasSimpleInputSpecs_;
protected ArrayList<String> packageFiles_ = new ArrayList<String>();
protected ArrayList<String> packageFiles_ = new ArrayList<String>();
protected ArrayList<String> shippedCanonFiles_ = new ArrayList<String>();
//protected TreeMap<String, String> userJobConfProps_ = new TreeMap<String, String>();
//protected TreeMap<String, String> userJobConfProps_ = new TreeMap<String, String>();
protected String output_;
protected String mapCmd_;
protected String comCmd_;
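
Most of the StreamJob hunks above are whitespace cleanup around the existing commons-cli based option handling. For orientation, here is a minimal, self-contained sketch of the createOption()/BasicParser pattern shown above (assuming commons-cli 1.x, which StreamJob uses; the class name and option values are hypothetical):

import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;

public class StreamingCliSketch {
  public static void main(String[] args) throws Exception {
    Options allOptions = new Options();
    // mirrors createOption("input", ..., "path", Integer.MAX_VALUE, false)
    allOptions.addOption(OptionBuilder
        .withArgName("path")
        .hasArgs(Integer.MAX_VALUE)
        .withDescription("DFS input file(s) for the Map step")
        .isRequired(false)
        .create("input"));
    // mirrors createBoolOption("verbose", "print verbose output")
    allOptions.addOption(OptionBuilder
        .withDescription("print verbose output")
        .create("verbose"));
    CommandLineParser parser = new BasicParser();
    CommandLine cmdLine = parser.parse(allOptions,
        new String[] {"-input", "in.txt", "-verbose"});
    System.out.println("input=" + cmdLine.getOptionValue("input")
        + " verbose=" + cmdLine.hasOption("verbose"));
  }
}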

Some files were not shown because too many files have changed in this diff.