MAPREDUCE-5807. Print usage for TeraSort job. Contributed by Rohith.

This commit is contained in:
Harsh J 2015-03-18 15:34:44 +05:30
parent 34117325b2
commit 9d72f93975
8 changed files with 123 additions and 20 deletions

View File

@ -253,6 +253,8 @@ Release 2.8.0 - UNRELEASED
IMPROVEMENTS IMPROVEMENTS
MAPREDUCE-5807. Print usage by TeraSort job. (Rohith via harsh)
MAPREDUCE-4653. TestRandomAlgorithm has an unused "import" statement. MAPREDUCE-4653. TestRandomAlgorithm has an unused "import" statement.
(Amir Sanjar via harsh) (Amir Sanjar via harsh)

View File

@ -70,7 +70,6 @@ public class TeraGen extends Configured implements Tool {
public static enum Counters {CHECKSUM} public static enum Counters {CHECKSUM}
public static final String NUM_ROWS = "mapreduce.terasort.num-rows";
/** /**
* An input format that assigns ranges of longs to each mapper. * An input format that assigns ranges of longs to each mapper.
*/ */
@ -189,11 +188,12 @@ public class TeraGen extends Configured implements Tool {
} }
static long getNumberOfRows(JobContext job) { static long getNumberOfRows(JobContext job) {
return job.getConfiguration().getLong(NUM_ROWS, 0); return job.getConfiguration().getLong(TeraSortConfigKeys.NUM_ROWS.key(),
TeraSortConfigKeys.DEFAULT_NUM_ROWS);
} }
static void setNumberOfRows(Job job, long numRows) { static void setNumberOfRows(Job job, long numRows) {
job.getConfiguration().setLong(NUM_ROWS, numRows); job.getConfiguration().setLong(TeraSortConfigKeys.NUM_ROWS.key(), numRows);
} }
/** /**

View File

@ -50,10 +50,6 @@ import org.apache.hadoop.util.StringUtils;
public class TeraInputFormat extends FileInputFormat<Text,Text> { public class TeraInputFormat extends FileInputFormat<Text,Text> {
static final String PARTITION_FILENAME = "_partition.lst"; static final String PARTITION_FILENAME = "_partition.lst";
private static final String NUM_PARTITIONS =
"mapreduce.terasort.num.partitions";
private static final String SAMPLE_SIZE =
"mapreduce.terasort.partitions.sample";
static final int KEY_LENGTH = 10; static final int KEY_LENGTH = 10;
static final int VALUE_LENGTH = 90; static final int VALUE_LENGTH = 90;
static final int RECORD_LENGTH = KEY_LENGTH + VALUE_LENGTH; static final int RECORD_LENGTH = KEY_LENGTH + VALUE_LENGTH;
@ -123,11 +119,16 @@ public class TeraInputFormat extends FileInputFormat<Text,Text> {
final TeraInputFormat inFormat = new TeraInputFormat(); final TeraInputFormat inFormat = new TeraInputFormat();
final TextSampler sampler = new TextSampler(); final TextSampler sampler = new TextSampler();
int partitions = job.getNumReduceTasks(); int partitions = job.getNumReduceTasks();
long sampleSize = conf.getLong(SAMPLE_SIZE, 100000); long sampleSize =
conf.getLong(TeraSortConfigKeys.SAMPLE_SIZE.key(),
TeraSortConfigKeys.DEFAULT_SAMPLE_SIZE);
final List<InputSplit> splits = inFormat.getSplits(job); final List<InputSplit> splits = inFormat.getSplits(job);
long t2 = System.currentTimeMillis(); long t2 = System.currentTimeMillis();
System.out.println("Computing input splits took " + (t2 - t1) + "ms"); System.out.println("Computing input splits took " + (t2 - t1) + "ms");
int samples = Math.min(conf.getInt(NUM_PARTITIONS, 10), splits.size()); int samples =
Math.min(conf.getInt(TeraSortConfigKeys.NUM_PARTITIONS.key(),
TeraSortConfigKeys.DEFAULT_NUM_PARTITIONS),
splits.size());
System.out.println("Sampling " + samples + " splits of " + splits.size()); System.out.println("Sampling " + samples + " splits of " + splits.size());
final long recordsPerSample = sampleSize / samples; final long recordsPerSample = sampleSize / samples;
final int sampleStep = splits.size() / samples; final int sampleStep = splits.size() / samples;
@ -294,7 +295,8 @@ public class TeraInputFormat extends FileInputFormat<Text,Text> {
lastResult = super.getSplits(job); lastResult = super.getSplits(job);
t2 = System.currentTimeMillis(); t2 = System.currentTimeMillis();
System.out.println("Spent " + (t2 - t1) + "ms computing base-splits."); System.out.println("Spent " + (t2 - t1) + "ms computing base-splits.");
if (job.getConfiguration().getBoolean(TeraScheduler.USE, true)) { if (job.getConfiguration().getBoolean(TeraSortConfigKeys.USE_TERA_SCHEDULER.key(),
TeraSortConfigKeys.DEFAULT_USE_TERA_SCHEDULER)) {
TeraScheduler scheduler = new TeraScheduler( TeraScheduler scheduler = new TeraScheduler(
lastResult.toArray(new FileSplit[0]), job.getConfiguration()); lastResult.toArray(new FileSplit[0]), job.getConfiguration());
lastResult = scheduler.getNewFileSplits(); lastResult = scheduler.getNewFileSplits();

View File

@ -40,21 +40,23 @@ import org.apache.hadoop.mapreduce.security.TokenCache;
* An output format that writes the key and value appended together. * An output format that writes the key and value appended together.
*/ */
public class TeraOutputFormat extends FileOutputFormat<Text,Text> { public class TeraOutputFormat extends FileOutputFormat<Text,Text> {
static final String FINAL_SYNC_ATTRIBUTE = "mapreduce.terasort.final.sync";
private OutputCommitter committer = null; private OutputCommitter committer = null;
/** /**
* Set the requirement for a final sync before the stream is closed. * Set the requirement for a final sync before the stream is closed.
*/ */
static void setFinalSync(JobContext job, boolean newValue) { static void setFinalSync(JobContext job, boolean newValue) {
job.getConfiguration().setBoolean(FINAL_SYNC_ATTRIBUTE, newValue); job.getConfiguration().setBoolean(
TeraSortConfigKeys.FINAL_SYNC_ATTRIBUTE.key(), newValue);
} }
/** /**
* Does the user want a final sync at close? * Does the user want a final sync at close?
*/ */
public static boolean getFinalSync(JobContext job) { public static boolean getFinalSync(JobContext job) {
return job.getConfiguration().getBoolean(FINAL_SYNC_ATTRIBUTE, false); return job.getConfiguration().getBoolean(
TeraSortConfigKeys.FINAL_SYNC_ATTRIBUTE.key(),
TeraSortConfigKeys.DEFAULT_FINAL_SYNC_ATTRIBUTE);
} }
static class TeraRecordWriter extends RecordWriter<Text,Text> { static class TeraRecordWriter extends RecordWriter<Text,Text> {

View File

@ -31,7 +31,6 @@ import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
class TeraScheduler { class TeraScheduler {
static String USE = "mapreduce.terasort.use.terascheduler";
private static final Log LOG = LogFactory.getLog(TeraScheduler.class); private static final Log LOG = LogFactory.getLog(TeraScheduler.class);
private Split[] splits; private Split[] splits;
private List<Host> hosts = new ArrayList<Host>(); private List<Host> hosts = new ArrayList<Host>();

View File

@ -48,8 +48,6 @@ import org.apache.hadoop.util.ToolRunner;
*/ */
public class TeraSort extends Configured implements Tool { public class TeraSort extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(TeraSort.class); private static final Log LOG = LogFactory.getLog(TeraSort.class);
static String SIMPLE_PARTITIONER = "mapreduce.terasort.simplepartitioner";
static String OUTPUT_REPLICATION = "mapreduce.terasort.output.replication";
/** /**
* A partitioner that splits text keys into roughly equal partitions * A partitioner that splits text keys into roughly equal partitions
@ -262,22 +260,40 @@ public class TeraSort extends Configured implements Tool {
} }
public static boolean getUseSimplePartitioner(JobContext job) { public static boolean getUseSimplePartitioner(JobContext job) {
return job.getConfiguration().getBoolean(SIMPLE_PARTITIONER, false); return job.getConfiguration().getBoolean(
TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(),
TeraSortConfigKeys.DEFAULT_USE_SIMPLE_PARTITIONER);
} }
public static void setUseSimplePartitioner(Job job, boolean value) { public static void setUseSimplePartitioner(Job job, boolean value) {
job.getConfiguration().setBoolean(SIMPLE_PARTITIONER, value); job.getConfiguration().setBoolean(
TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(), value);
} }
public static int getOutputReplication(JobContext job) { public static int getOutputReplication(JobContext job) {
return job.getConfiguration().getInt(OUTPUT_REPLICATION, 1); return job.getConfiguration().getInt(
TeraSortConfigKeys.OUTPUT_REPLICATION.key(),
TeraSortConfigKeys.DEFAULT_OUTPUT_REPLICATION);
} }
public static void setOutputReplication(Job job, int value) { public static void setOutputReplication(Job job, int value) {
job.getConfiguration().setInt(OUTPUT_REPLICATION, value); job.getConfiguration().setInt(TeraSortConfigKeys.OUTPUT_REPLICATION.key(),
value);
}
private static void usage() throws IOException {
System.err.println("Usage: terasort [-Dproperty=value] <in> <out>");
System.err.println("TeraSort configurations are:");
for (TeraSortConfigKeys teraSortConfigKeys : TeraSortConfigKeys.values()) {
System.err.println(teraSortConfigKeys.toString());
}
} }
public int run(String[] args) throws Exception { public int run(String[] args) throws Exception {
if (args.length != 2) {
usage();
return 2;
}
LOG.info("starting"); LOG.info("starting");
Job job = Job.getInstance(getConf()); Job job = Job.getInstance(getConf());
Path inputDir = new Path(args[0]); Path inputDir = new Path(args[0]);

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.examples.terasort;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* <p>
* TeraSort configurations.
* </p>
*/
@Private
@Unstable
public enum TeraSortConfigKeys {
NUM_ROWS("mapreduce.terasort.num-rows",
"Number of rows to generate during teragen."),
NUM_PARTITIONS("mapreduce.terasort.num.partitions",
"Number of partitions used for sampling."),
SAMPLE_SIZE("mapreduce.terasort.partitions.sample",
"Sample size for each partition."),
FINAL_SYNC_ATTRIBUTE("mapreduce.terasort.final.sync",
"Perform a disk-persisting hsync at end of every file-write."),
USE_TERA_SCHEDULER("mapreduce.terasort.use.terascheduler",
"Use TeraScheduler for computing input split distribution."),
USE_SIMPLE_PARTITIONER("mapreduce.terasort.simplepartitioner",
"Use SimplePartitioner instead of TotalOrderPartitioner."),
OUTPUT_REPLICATION("mapreduce.terasort.output.replication",
"Replication factor to use for output data files.");
private String confName;
private String description;
TeraSortConfigKeys(String configName, String description) {
this.confName = configName;
this.description = description;
}
public String key() {
return this.confName;
}
public String toString() {
return "<" + confName + "> " + description;
}
public static final long DEFAULT_NUM_ROWS = 0L;
public static final int DEFAULT_NUM_PARTITIONS = 10;
public static final long DEFAULT_SAMPLE_SIZE = 100000L;
public static final boolean DEFAULT_FINAL_SYNC_ATTRIBUTE = false;
public static final boolean DEFAULT_USE_TERA_SCHEDULER = true;
public static final boolean DEFAULT_USE_SIMPLE_PARTITIONER = false;
public static final int DEFAULT_OUTPUT_REPLICATION = 1;
}

View File

@ -104,4 +104,9 @@ public class TestTeraSort extends HadoopTestCase {
TERA_OUTPUT_PATH); TERA_OUTPUT_PATH);
} }
public void testTeraSortWithLessThanTwoArgs() throws Exception {
String[] args = new String[1];
assertEquals(new TeraSort().run(args), 2);
}
} }