HADOOP-7590. Mavenize streaming and MR examples. (tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1203941 13f79535-47bb-0310-9956-ffa450edef68
Alejandro Abdelnur 2011-11-19 01:24:32 +00:00
parent 905a127850
commit 26447229ba
154 changed files with 473 additions and 202 deletions

View File

@ -60,6 +60,8 @@ Trunk (unreleased changes)
HADOOP-7688. Add servlet handler check in HttpServer.start().
(Uma Maheswara Rao G via szetszwo)
HADOOP-7590. Mavenize streaming and MR examples. (tucu)
BUGS
HADOOP-7606. Upgrade Jackson to version 1.7.1 to match the version required

View File

@ -82,6 +82,12 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>
<build>

View File

@ -176,7 +176,7 @@ public abstract class ClusterMapReduceTestCase extends TestCase {
* @return path to the input directory for the testcase.
*/
protected Path getInputDir() {
return new Path("input");
return new Path("target/input");
}
/**
@ -185,7 +185,7 @@ public abstract class ClusterMapReduceTestCase extends TestCase {
* @return path to the output directory for the testcase.
*/
protected Path getOutputDir() {
return new Path("output");
return new Path("target/output");
}
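Note: with getInputDir()/getOutputDir() now pointing at target/input and target/output, anything a test writes through these helpers lands under Maven's build directory instead of the module root. A minimal sketch of a subclass using them (the test class and file name are hypothetical; getFileSystem() and createJobConf() are assumed to be the existing ClusterMapReduceTestCase helpers):

import java.io.OutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.ClusterMapReduceTestCase;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

public class ExampleClusterTest extends ClusterMapReduceTestCase {
  public void testPathsResolveUnderTarget() throws Exception {
    Path in = getInputDir();    // now "target/input"
    Path out = getOutputDir();  // now "target/output"
    // stage a small input file on the test cluster's file system
    OutputStream os = getFileSystem().create(new Path(in, "part-0"));
    os.write("hello hadoop\n".getBytes());
    os.close();
    // a job built from createJobConf() would then read 'in' and write 'out'
    JobConf job = createJobConf();
    FileInputFormat.setInputPaths(job, in);
    FileOutputFormat.setOutputPath(job, out);
  }
}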
/**

View File

@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project>
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-project</artifactId>
<version>0.24.0-SNAPSHOT</version>
<relativePath>../../hadoop-project</relativePath>
</parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-examples</artifactId>
<version>0.24.0-SNAPSHOT</version>
<description>Apache Hadoop MapReduce Examples</description>
<name>Apache Hadoop MapReduce Examples</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
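All of the Hadoop dependencies here are provided scope, so the examples jar carries only the example drivers and picks the framework up from the cluster classpath at run time. A hedged sketch of invoking one of those drivers in-process (QuasiMonteCarlo is the existing pi example; the map and sample counts are arbitrary):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.QuasiMonteCarlo;
import org.apache.hadoop.util.ToolRunner;

public class RunPiExample {
  public static void main(String[] args) throws Exception {
    // roughly what "hadoop jar hadoop-mapreduce-examples-*.jar pi 10 1000" does
    int rc = ToolRunner.run(new Configuration(),
                            new QuasiMonteCarlo(),
                            new String[] { "10", "1000" });
    System.exit(rc);
  }
}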

View File

@ -38,7 +38,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.InputFormat;
@ -53,6 +52,7 @@ import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@ -386,8 +386,11 @@ public final class DistSum extends Configured implements Tool {
@Override
public synchronized void init(Job job) throws IOException {
final Configuration conf = job.getConfiguration();
if (cluster == null)
cluster = new Cluster(JobTracker.getAddress(conf), conf);
if (cluster == null) {
String jobTrackerStr = conf.get("mapreduce.jobtracker.address", "localhost:8012");
cluster = new Cluster(NetUtils.createSocketAddr(jobTrackerStr), conf);
}
chooseMachine(conf).init(job);
}
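With the JobTracker import gone, the client-side Cluster handle is now built from the configured jobtracker address. A small standalone sketch of the same lookup (the localhost:8012 fallback simply mirrors the patch and is an assumption about any real cluster):

import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.net.NetUtils;

public class ClusterFromConf {
  static Cluster open(Configuration conf) throws IOException {
    // same lookup the patched init() performs
    String jt = conf.get("mapreduce.jobtracker.address", "localhost:8012");
    InetSocketAddress addr = NetUtils.createSocketAddr(jt); // parses "host:port"
    return new Cluster(addr, conf);
  }
}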
@ -604,4 +607,4 @@ public final class DistSum extends Configured implements Tool {
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(null, new DistSum(), args));
}
}

View File

@ -35,12 +35,13 @@
<fork.mode>once</fork.mode>
<mr.basedir>${basedir}</mr.basedir>
</properties>
<modules>
<module>hadoop-yarn</module>
<module>hadoop-mapreduce-client</module>
<module>hadoop-mapreduce-examples</module>
</modules>
<dependencies>
<dependency>
<groupId>com.google.protobuf</groupId>
@ -106,7 +107,7 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
@ -166,9 +167,9 @@
<artifactId>clover</artifactId>
<version>3.0.2</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
@ -321,7 +322,7 @@
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>dist</id>

View File

@ -45,7 +45,7 @@
<hadoop.assemblies.version>${project.version}</hadoop.assemblies.version>
<commons-daemon.version>1.0.3</commons-daemon.version>
<test.build.dir>${project.build.directory}/test-dir</test.build.dir>
<test.build.data>${test.build.dir}</test.build.data>
</properties>
@ -99,6 +99,51 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-tests</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-hs</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-examples</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@ -177,6 +222,11 @@
<version>1.8</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-servlet-tester</artifactId>
<version>6.1.26</version>
</dependency>
<dependency>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>

View File

@ -0,0 +1,121 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project>
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-project</artifactId>
<version>0.24.0-SNAPSHOT</version>
<relativePath>../../hadoop-project</relativePath>
</parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-streaming</artifactId>
<version>0.24.0-SNAPSHOT</version>
<description>Apache Hadoop MapReduce Streaming</description>
<name>Apache Hadoop MapReduce Streaming</name>
<packaging>jar</packaging>
<properties>
<hadoop.log.dir>${project.build.directory}/log</hadoop.log.dir>
<test.exclude.pattern>%regex[.*(TestStreamingBadRecords|TestStreamingCombiner|TestStreamingStatus|TestUlimit).*]</test.exclude.pattern>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-app</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-hs</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-tests</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>create-log-dir</id>
<phase>process-test-resources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<target>
<delete dir="${test.build.data}"/>
<mkdir dir="${test.build.data}"/>
<mkdir dir="${hadoop.log.dir}"/>
</target>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
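The antrun execution above wipes and recreates ${test.build.data} and creates ${hadoop.log.dir} before the tests run. Streaming tests conventionally find that scratch space through the test.build.data system property; a minimal sketch of the idiom, assuming the property is passed to the forked test JVM by the shared surefire configuration:

import java.io.File;

public class ScratchDirs {
  public static void main(String[] args) {
    // Hadoop test convention: fall back to /tmp when the property is not set
    File data = new File(System.getProperty("test.build.data", "/tmp"));
    File logs = new File(System.getProperty("hadoop.log.dir", "target/log"));
    System.out.println("test data dir: " + data.getAbsolutePath());
    System.out.println("log dir:       " + logs.getAbsolutePath());
  }
}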

View File

@ -80,13 +80,13 @@ public class StreamJob implements Tool {
protected static final Log LOG = LogFactory.getLog(StreamJob.class.getName());
final static String REDUCE_NONE = "NONE";
/** -----------Streaming CLI Implementation **/
private CommandLineParser parser = new BasicParser();
private Options allOptions;
/**@deprecated use StreamJob() with ToolRunner or set the
* Configuration using {@link #setConf(Configuration)} and
* run with {@link #run(String[])}.
*/
@Deprecated
public StreamJob(String[] argv, boolean mayExit) {
@ -94,12 +94,12 @@ public class StreamJob implements Tool {
argv_ = argv;
this.config_ = new Configuration();
}
public StreamJob() {
setupOptions();
this.config_ = new Configuration();
}
@Override
public Configuration getConf() {
return config_;
@ -109,13 +109,13 @@ public class StreamJob implements Tool {
public void setConf(Configuration conf) {
this.config_ = conf;
}
@Override
public int run(String[] args) throws Exception {
try {
this.argv_ = args;
init();
preProcessArgs();
parseArgv();
if (printUsage) {
@ -123,7 +123,7 @@ public class StreamJob implements Tool {
return 0;
}
postProcessArgs();
setJobConf();
} catch (IllegalArgumentException ex) {
//ignore, since log will already be printed
@ -133,13 +133,13 @@ public class StreamJob implements Tool {
}
return submitAndMonitorJob();
}
/**
* This method creates a streaming job from the given argument list.
* The created object can be used and/or submitted to a jobtracker for
* execution by a job agent such as JobControl
* @param argv the list args for creating a streaming job
* @return the created JobConf object
* @throws IOException
*/
static public JobConf createJob(String[] argv) throws IOException {
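As the javadoc above notes, createJob() only builds the JobConf; submitting it is left to the caller, for example a job agent such as JobControl. A hedged sketch of that pattern (paths and commands are illustrative, not part of this patch):

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.streaming.StreamJob;

public class StreamingViaJobControl {
  public static void main(String[] args) throws Exception {
    String[] argv = {
        "-input", "in",
        "-output", "out",
        "-mapper", "/bin/cat",
        "-reducer", "/usr/bin/wc"
    };
    JobConf streamConf = StreamJob.createJob(argv);   // builds the conf, does not submit
    JobControl agent = new JobControl("streaming-chain");
    agent.addJob(new Job(streamConf));                // the agent submits and monitors it
    new Thread(agent).start();
  }
}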
@ -154,7 +154,7 @@ public class StreamJob implements Tool {
}
/**
* This is the method that actually
* initializes the job conf and submits the job
* to the jobtracker
* @throws IOException
@ -169,7 +169,7 @@ public class StreamJob implements Tool {
throw new IOException(ex.getMessage());
}
}
protected void init() {
try {
env_ = new Environment();
@ -186,7 +186,7 @@ public class StreamJob implements Tool {
}
void postProcessArgs() throws IOException {
if (inputSpecs_.size() == 0) {
fail("Required argument: -input <name>");
}
@ -253,7 +253,7 @@ public class StreamJob implements Tool {
LOG.error(oe.getMessage());
exitUsage(argv_.length > 0 && "-info".equals(argv_[0]));
}
if (cmdLine != null) {
detailedUsage_ = cmdLine.hasOption("info");
if (cmdLine.hasOption("help") || detailedUsage_) {
@ -263,21 +263,21 @@ public class StreamJob implements Tool {
verbose_ = cmdLine.hasOption("verbose");
background_ = cmdLine.hasOption("background");
debug_ = cmdLine.hasOption("debug")? debug_ + 1 : debug_;
String[] values = cmdLine.getOptionValues("input");
if (values != null && values.length > 0) {
for (String input : values) {
inputSpecs_.add(input);
}
}
output_ = cmdLine.getOptionValue("output");
mapCmd_ = cmdLine.getOptionValue("mapper");
comCmd_ = cmdLine.getOptionValue("combiner");
redCmd_ = cmdLine.getOptionValue("reducer");
lazyOutput_ = cmdLine.hasOption("lazyOutput");
values = cmdLine.getOptionValues("file");
if (values != null && values.length > 0) {
LOG.warn("-file option is deprecated, please use generic option" +
@ -306,34 +306,34 @@ public class StreamJob implements Tool {
LOG.warn("-dfs option is deprecated, please use -fs instead.");
config_.set("fs.default.name", fsName);
}
additionalConfSpec_ = cmdLine.getOptionValue("additionalconfspec");
inputFormatSpec_ = cmdLine.getOptionValue("inputformat");
outputFormatSpec_ = cmdLine.getOptionValue("outputformat");
numReduceTasksSpec_ = cmdLine.getOptionValue("numReduceTasks");
partitionerSpec_ = cmdLine.getOptionValue("partitioner");
inReaderSpec_ = cmdLine.getOptionValue("inputreader");
mapDebugSpec_ = cmdLine.getOptionValue("mapdebug");
reduceDebugSpec_ = cmdLine.getOptionValue("reducedebug");
ioSpec_ = cmdLine.getOptionValue("io");
String[] car = cmdLine.getOptionValues("cacheArchive");
if (null != car && car.length > 0){
LOG.warn("-cacheArchive option is deprecated, please use -archives instead.");
for(String s : car){
cacheArchives = (cacheArchives == null)?s :cacheArchives + "," + s;
}
}
String[] caf = cmdLine.getOptionValues("cacheFile");
if (null != caf && caf.length > 0){
LOG.warn("-cacheFile option is deprecated, please use -files instead.");
for(String s : caf){
cacheFiles = (cacheFiles == null)?s :cacheFiles + "," + s;
}
}
String[] jobconf = cmdLine.getOptionValues("jobconf");
if (null != jobconf && jobconf.length > 0){
LOG.warn("-jobconf option is deprecated, please use -D instead.");
for(String s : jobconf){
@ -341,8 +341,8 @@ public class StreamJob implements Tool {
config_.set(parts[0], parts[1]);
}
}
String[] cmd = cmdLine.getOptionValues("cmdenv");
if (null != cmd && cmd.length > 0){
for(String s : cmd) {
if (addTaskEnvironment_.length() > 0) {
@ -361,8 +361,8 @@ public class StreamJob implements Tool {
System.out.println("STREAM: " + msg);
}
}
private Option createOption(String name, String desc,
String argName, int max, boolean required){
return OptionBuilder
.withArgName(argName)
@ -371,87 +371,87 @@ public class StreamJob implements Tool {
.isRequired(required)
.create(name);
}
private Option createBoolOption(String name, String desc){
return OptionBuilder.withDescription(desc).create(name);
}
private void validate(final List<String> values)
throws IllegalArgumentException {
for (String file : values) {
File f = new File(file);
if (!f.canRead()) {
fail("File: " + f.getAbsolutePath()
+ " does not exist, or is not readable.");
fail("File: " + f.getAbsolutePath()
+ " does not exist, or is not readable.");
}
}
}
private void setupOptions(){
// input and output are not required for -info and -help options,
// though they are required for streaming job to be run.
Option input = createOption("input",
"DFS input file(s) for the Map step",
"path",
Integer.MAX_VALUE,
false);
Option output = createOption("output",
"DFS output directory for the Reduce step",
"path", 1, false);
Option mapper = createOption("mapper",
"The streaming command to run", "cmd", 1, false);
Option combiner = createOption("combiner",
"The streaming command to run", "cmd", 1, false);
// reducer could be NONE
Option reducer = createOption("reducer",
"The streaming command to run", "cmd", 1, false);
Option file = createOption("file",
"File to be shipped in the Job jar file",
"file", Integer.MAX_VALUE, false);
Option dfs = createOption("dfs",
"Optional. Override DFS configuration", "<h:p>|local", 1, false);
Option additionalconfspec = createOption("additionalconfspec",
"Optional.", "spec", 1, false);
Option inputformat = createOption("inputformat",
"Optional.", "spec", 1, false);
Option outputformat = createOption("outputformat",
"Optional.", "spec", 1, false);
Option partitioner = createOption("partitioner",
"Optional.", "spec", 1, false);
Option numReduceTasks = createOption("numReduceTasks",
"Optional.", "spec",1, false );
Option inputreader = createOption("inputreader",
"Optional.", "spec", 1, false);
Option mapDebug = createOption("mapdebug",
"Optional.", "spec", 1, false);
Option reduceDebug = createOption("reducedebug",
"Optional", "spec",1, false);
Option jobconf =
createOption("jobconf",
"(n=v) Optional. Add or override a JobConf property.",
"spec", 1, false);
Option cmdenv =
createOption("cmdenv", "(n=v) Pass env.var to streaming commands.",
"spec", 1, false);
Option cacheFile = createOption("cacheFile",
"File name URI", "fileNameURI", Integer.MAX_VALUE, false);
Option cacheArchive = createOption("cacheArchive",
"File name URI", "fileNameURI", Integer.MAX_VALUE, false);
Option io = createOption("io",
"Optional.", "spec", 1, false);
// boolean properties
Option background = createBoolOption("background", "Submit the job and don't wait till it completes.");
Option verbose = createBoolOption("verbose", "print verbose output");
Option info = createBoolOption("info", "print verbose output");
Option help = createBoolOption("help", "print this help message");
Option debug = createBoolOption("debug", "print debug output");
Option lazyOutput = createBoolOption("lazyOutput", "create outputs lazily");
allOptions = new Options().
addOption(input).
addOption(output).
@ -490,9 +490,9 @@ public class StreamJob implements Tool {
System.out.println("Usage: $HADOOP_PREFIX/bin/hadoop jar hadoop-streaming.jar"
+ " [options]");
System.out.println("Options:");
System.out.println(" -input <path> DFS input file(s) for the Map"
System.out.println(" -input <path> DFS input file(s) for the Map"
+ " step.");
System.out.println(" -output <path> DFS output directory for the"
System.out.println(" -output <path> DFS output directory for the"
+ " Reduce step.");
System.out.println(" -mapper <cmd|JavaClassName> Optional. Command"
+ " to be run as mapper.");
@ -501,7 +501,7 @@ public class StreamJob implements Tool {
System.out.println(" -reducer <cmd|JavaClassName> Optional. Command"
+ " to be run as reducer.");
System.out.println(" -file <file> Optional. File/dir to be "
+ "shipped in the Job jar file.\n" +
+ "shipped in the Job jar file.\n" +
" Deprecated. Use generic option \"-files\" instead.");
System.out.println(" -inputformat <TextInputFormat(default)"
+ "|SequenceFileAsTextInputFormat|JavaClassName>\n"
@ -533,7 +533,7 @@ public class StreamJob implements Tool {
GenericOptionsParser.printGenericCommandUsage(System.out);
if (!detailed) {
System.out.println();
System.out.println("For more details about these options:");
System.out.println("Use " +
"$HADOOP_PREFIX/bin/hadoop jar hadoop-streaming.jar -info");
@ -592,7 +592,7 @@ public class StreamJob implements Tool {
System.out.println(" -D " + MRConfig.LOCAL_DIR + "=/tmp/local");
System.out.println(" -D " + JTConfig.JT_SYSTEM_DIR + "=/tmp/system");
System.out.println(" -D " + MRConfig.TEMP_DIR + "=/tmp/temp");
System.out.println("To treat tasks with non-zero exit status as SUCCEDED:");
System.out.println("To treat tasks with non-zero exit status as SUCCEDED:");
System.out.println(" -D stream.non.zero.exit.is.failure=false");
System.out.println("Use a custom hadoop streaming build along with standard"
+ " hadoop install:");
@ -621,7 +621,7 @@ public class StreamJob implements Tool {
System.out.println(" daily logs for days in month 2006-04");
}
public void fail(String message) {
System.err.println(message);
System.err.println("Try -help for more information");
throw new IllegalArgumentException(message);
@ -659,7 +659,7 @@ public class StreamJob implements Tool {
// $HADOOP_PREFIX/bin/hadoop jar /not/first/on/classpath/custom-hadoop-streaming.jar
// where findInClasspath() would find the version of hadoop-streaming.jar in $HADOOP_PREFIX
String runtimeClasses = config_.get("stream.shipped.hadoopstreaming"); // jar or class dir
if (runtimeClasses == null) {
runtimeClasses = StreamUtil.findInClasspath(StreamJob.class.getName());
}
@ -700,7 +700,7 @@ public class StreamJob implements Tool {
builder.merge(packageFiles_, unjarFiles, jobJarName);
return jobJarName;
}
/**
* get the uris of all the files/caches
*/
@ -710,7 +710,7 @@ public class StreamJob implements Tool {
fileURIs = StringUtils.stringToURI(files);
archiveURIs = StringUtils.stringToURI(archives);
}
protected void setJobConf() throws IOException {
if (additionalConfSpec_ != null) {
LOG.warn("-additionalconfspec option is deprecated, please use -conf instead.");
@ -719,15 +719,15 @@ public class StreamJob implements Tool {
// general MapRed job properties
jobConf_ = new JobConf(config_, StreamJob.class);
// All streaming jobs get the task timeout value
// from the configuration settings.
// The correct FS must be set before this is called!
// (to resolve local vs. dfs drive letter differences)
// (mapreduce.job.working.dir will be lazily initialized ONCE and depends on FS)
for (int i = 0; i < inputSpecs_.size(); i++) {
FileInputFormat.addInputPaths(jobConf_,
(String) inputSpecs_.get(i));
}
@ -773,7 +773,7 @@ public class StreamJob implements Tool {
fail("-inputformat : class not found : " + inputFormatSpec_);
}
}
}
if (fmt == null) {
fmt = StreamInputFormat.class;
}
@ -786,20 +786,20 @@ public class StreamJob implements Tool {
jobConf_.set("stream.reduce.input", ioSpec_);
jobConf_.set("stream.reduce.output", ioSpec_);
}
Class<? extends IdentifierResolver> idResolverClass =
jobConf_.getClass("stream.io.identifier.resolver.class",
IdentifierResolver.class, IdentifierResolver.class);
IdentifierResolver idResolver = ReflectionUtils.newInstance(idResolverClass, jobConf_);
idResolver.resolve(jobConf_.get("stream.map.input", IdentifierResolver.TEXT_ID));
jobConf_.setClass("stream.map.input.writer.class",
idResolver.getInputWriterClass(), InputWriter.class);
idResolver.resolve(jobConf_.get("stream.reduce.input", IdentifierResolver.TEXT_ID));
jobConf_.setClass("stream.reduce.input.writer.class",
idResolver.getInputWriterClass(), InputWriter.class);
jobConf_.set("stream.addenvironment", addTaskEnvironment_);
boolean isMapperACommand = false;
@ -811,7 +811,7 @@ public class StreamJob implements Tool {
isMapperACommand = true;
jobConf_.setMapperClass(PipeMapper.class);
jobConf_.setMapRunnerClass(PipeMapRunner.class);
jobConf_.set("stream.map.streamprocessor",
jobConf_.set("stream.map.streamprocessor",
URLEncoder.encode(mapCmd_, "UTF-8"));
}
}
@ -900,7 +900,7 @@ public class StreamJob implements Tool {
jobConf_.set(k, v);
}
}
FileOutputFormat.setOutputPath(jobConf_, new Path(output_));
fmt = null;
if (outputFormatSpec_!= null) {
@ -928,7 +928,7 @@ public class StreamJob implements Tool {
fail("-partitioner : class not found : " + partitionerSpec_);
}
}
if(mapDebugSpec_ != null){
jobConf_.setMapDebugScript(mapDebugSpec_);
}
@ -942,7 +942,7 @@ public class StreamJob implements Tool {
if (jar_ != null) {
jobConf_.setJar(jar_);
}
if ((cacheArchives != null) || (cacheFiles != null)){
getURIs(cacheArchives, cacheFiles);
boolean b = DistributedCache.checkURIs(fileURIs, archiveURIs);
@ -955,11 +955,11 @@ public class StreamJob implements Tool {
DistributedCache.setCacheArchives(archiveURIs, jobConf_);
if (cacheFiles != null)
DistributedCache.setCacheFiles(fileURIs, jobConf_);
if (verbose_) {
listJobConfProperties();
}
msg("submitting to jobconf: " + getJobTrackerHostPort());
}
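The -cacheFile/-cacheArchive handling shown here ends up as DistributedCache calls on the job conf. A minimal sketch of the equivalent direct API use (the URIs and fragments are illustrative):

import java.net.URI;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.mapred.JobConf;

public class CacheSetup {
  static void attach(JobConf conf) throws Exception {
    URI[] files = { new URI("hdfs:///apps/dict.txt#dict") };
    URI[] archives = { new URI("hdfs:///apps/lib.zip#lib") };
    if (DistributedCache.checkURIs(files, archives)) {   // same validation StreamJob performs
      DistributedCache.setCacheFiles(files, conf);
      DistributedCache.setCacheArchives(archives, conf);
    }
  }
}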
@ -1013,7 +1013,7 @@ public class StreamJob implements Tool {
LOG.error("Error launching job , Invalid job conf : " + je.getMessage());
return 3;
} catch(FileAlreadyExistsException fae) {
LOG.error("Error launching job , Output path already exists : "
LOG.error("Error launching job , Output path already exists : "
+ fae.getMessage());
return 4;
} catch(IOException ioe) {
@ -1047,9 +1047,9 @@ public class StreamJob implements Tool {
protected ArrayList<String> inputSpecs_ = new ArrayList<String>();
protected TreeSet<String> seenPrimary_ = new TreeSet<String>();
protected boolean hasSimpleInputSpecs_;
protected ArrayList<String> packageFiles_ = new ArrayList<String>();
protected ArrayList<String> shippedCanonFiles_ = new ArrayList<String>();
//protected TreeMap<String, String> userJobConfProps_ = new TreeMap<String, String>();
protected String output_;
protected String mapCmd_;
protected String comCmd_;

Some files were not shown because too many files have changed in this diff.