MAPREDUCE-5807. Print usage for TeraSort job. Contributed by Rohith.
(cherry picked from commit 9d72f93975)
parent 18740f9383
commit 4e80c4cd5e
@@ -8,6 +8,8 @@ Release 2.8.0 - UNRELEASED

   IMPROVEMENTS

+    MAPREDUCE-5807. Print usage by TeraSort job. (Rohith via harsh)
+
     MAPREDUCE-4653. TestRandomAlgorithm has an unused "import" statement.
     (Amir Sanjar via harsh)

@@ -70,7 +70,6 @@ public class TeraGen extends Configured implements Tool {

   public static enum Counters {CHECKSUM}

-  public static final String NUM_ROWS = "mapreduce.terasort.num-rows";
   /**
    * An input format that assigns ranges of longs to each mapper.
    */
@@ -189,11 +188,12 @@ public class TeraGen extends Configured implements Tool {
   }

   static long getNumberOfRows(JobContext job) {
-    return job.getConfiguration().getLong(NUM_ROWS, 0);
+    return job.getConfiguration().getLong(TeraSortConfigKeys.NUM_ROWS.key(),
+        TeraSortConfigKeys.DEFAULT_NUM_ROWS);
   }

   static void setNumberOfRows(Job job, long numRows) {
-    job.getConfiguration().setLong(NUM_ROWS, numRows);
+    job.getConfiguration().setLong(TeraSortConfigKeys.NUM_ROWS.key(), numRows);
   }

   /**
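For illustration only, not part of this patch: the lookup stays behaviour-preserving, since TeraSortConfigKeys.DEFAULT_NUM_ROWS is 0L, the same default the old getLong(NUM_ROWS, 0) call used. A minimal standalone sketch of the new read/write pattern against a plain Hadoop Configuration (class name hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.terasort.TeraSortConfigKeys;

// Illustrative sketch only: mirrors the patched setNumberOfRows()/getNumberOfRows().
public class NumRowsLookupSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setLong(TeraSortConfigKeys.NUM_ROWS.key(), 1000L);
    long rows = conf.getLong(TeraSortConfigKeys.NUM_ROWS.key(),
        TeraSortConfigKeys.DEFAULT_NUM_ROWS);
    System.out.println("rows = " + rows); // prints "rows = 1000"
  }
}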
@@ -50,10 +50,6 @@ import org.apache.hadoop.util.StringUtils;
 public class TeraInputFormat extends FileInputFormat<Text,Text> {

   static final String PARTITION_FILENAME = "_partition.lst";
-  private static final String NUM_PARTITIONS =
-      "mapreduce.terasort.num.partitions";
-  private static final String SAMPLE_SIZE =
-      "mapreduce.terasort.partitions.sample";
   static final int KEY_LENGTH = 10;
   static final int VALUE_LENGTH = 90;
   static final int RECORD_LENGTH = KEY_LENGTH + VALUE_LENGTH;
@@ -123,11 +119,16 @@ public class TeraInputFormat extends FileInputFormat<Text,Text> {
     final TeraInputFormat inFormat = new TeraInputFormat();
     final TextSampler sampler = new TextSampler();
     int partitions = job.getNumReduceTasks();
-    long sampleSize = conf.getLong(SAMPLE_SIZE, 100000);
+    long sampleSize =
+        conf.getLong(TeraSortConfigKeys.SAMPLE_SIZE.key(),
+            TeraSortConfigKeys.DEFAULT_SAMPLE_SIZE);
     final List<InputSplit> splits = inFormat.getSplits(job);
     long t2 = System.currentTimeMillis();
     System.out.println("Computing input splits took " + (t2 - t1) + "ms");
-    int samples = Math.min(conf.getInt(NUM_PARTITIONS, 10), splits.size());
+    int samples =
+        Math.min(conf.getInt(TeraSortConfigKeys.NUM_PARTITIONS.key(),
+            TeraSortConfigKeys.DEFAULT_NUM_PARTITIONS),
+            splits.size());
     System.out.println("Sampling " + samples + " splits of " + splits.size());
     final long recordsPerSample = sampleSize / samples;
     final int sampleStep = splits.size() / samples;
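The renamed keys keep their previous defaults (sample size 100000, at most 10 sampled partitions), so the partition-sampling arithmetic in writePartitionFile is unchanged. A rough worked example, not part of the patch, assuming a hypothetical job with 40 input splits:

// Illustrative arithmetic only, using the defaults that carry over into
// TeraSortConfigKeys: DEFAULT_SAMPLE_SIZE = 100000, DEFAULT_NUM_PARTITIONS = 10.
public class SamplingMathSketch {
  public static void main(String[] args) {
    long sampleSize = 100000L;                    // mapreduce.terasort.partitions.sample
    int numSplits = 40;                           // hypothetical split count
    int samples = Math.min(10, numSplits);        // 10 splits get sampled
    long recordsPerSample = sampleSize / samples; // 10000 records per sampled split
    int sampleStep = numSplits / samples;         // every 4th split is read
    System.out.println(samples + " splits sampled, " + recordsPerSample
        + " records each, step " + sampleStep);
  }
}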
@@ -294,7 +295,8 @@ public class TeraInputFormat extends FileInputFormat<Text,Text> {
     lastResult = super.getSplits(job);
     t2 = System.currentTimeMillis();
     System.out.println("Spent " + (t2 - t1) + "ms computing base-splits.");
-    if (job.getConfiguration().getBoolean(TeraScheduler.USE, true)) {
+    if (job.getConfiguration().getBoolean(TeraSortConfigKeys.USE_TERA_SCHEDULER.key(),
+        TeraSortConfigKeys.DEFAULT_USE_TERA_SCHEDULER)) {
       TeraScheduler scheduler = new TeraScheduler(
           lastResult.toArray(new FileSplit[0]), job.getConfiguration());
       lastResult = scheduler.getNewFileSplits();
@@ -40,21 +40,23 @@ import org.apache.hadoop.mapreduce.security.TokenCache;
  * An output format that writes the key and value appended together.
  */
 public class TeraOutputFormat extends FileOutputFormat<Text,Text> {
-  static final String FINAL_SYNC_ATTRIBUTE = "mapreduce.terasort.final.sync";
   private OutputCommitter committer = null;

   /**
    * Set the requirement for a final sync before the stream is closed.
    */
   static void setFinalSync(JobContext job, boolean newValue) {
-    job.getConfiguration().setBoolean(FINAL_SYNC_ATTRIBUTE, newValue);
+    job.getConfiguration().setBoolean(
+        TeraSortConfigKeys.FINAL_SYNC_ATTRIBUTE.key(), newValue);
   }

   /**
    * Does the user want a final sync at close?
    */
   public static boolean getFinalSync(JobContext job) {
-    return job.getConfiguration().getBoolean(FINAL_SYNC_ATTRIBUTE, false);
+    return job.getConfiguration().getBoolean(
+        TeraSortConfigKeys.FINAL_SYNC_ATTRIBUTE.key(),
+        TeraSortConfigKeys.DEFAULT_FINAL_SYNC_ATTRIBUTE);
   }

   static class TeraRecordWriter extends RecordWriter<Text,Text> {
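The final hsync remains opt-in (DEFAULT_FINAL_SYNC_ATTRIBUTE is false). A minimal sketch, not part of this commit, of a client requesting it through the new key; TeraOutputFormat.getFinalSync() reads the same entry:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.terasort.TeraOutputFormat;
import org.apache.hadoop.examples.terasort.TeraSortConfigKeys;
import org.apache.hadoop.mapreduce.Job;

// Illustrative only: enables the disk-persisting sync performed when each
// output file is closed, then reads the flag back the way the patch does.
public class FinalSyncSketch {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration());
    job.getConfiguration().setBoolean(
        TeraSortConfigKeys.FINAL_SYNC_ATTRIBUTE.key(), true);
    System.out.println(TeraOutputFormat.getFinalSync(job)); // prints true
  }
}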
@@ -31,7 +31,6 @@ import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import com.google.common.base.Charsets;

 class TeraScheduler {
-  static String USE = "mapreduce.terasort.use.terascheduler";
   private static final Log LOG = LogFactory.getLog(TeraScheduler.class);
   private Split[] splits;
   private List<Host> hosts = new ArrayList<Host>();
@@ -48,8 +48,6 @@ import org.apache.hadoop.util.ToolRunner;
  */
 public class TeraSort extends Configured implements Tool {
   private static final Log LOG = LogFactory.getLog(TeraSort.class);
-  static String SIMPLE_PARTITIONER = "mapreduce.terasort.simplepartitioner";
-  static String OUTPUT_REPLICATION = "mapreduce.terasort.output.replication";

   /**
    * A partitioner that splits text keys into roughly equal partitions
@@ -262,22 +260,40 @@ public class TeraSort extends Configured implements Tool {
   }

   public static boolean getUseSimplePartitioner(JobContext job) {
-    return job.getConfiguration().getBoolean(SIMPLE_PARTITIONER, false);
+    return job.getConfiguration().getBoolean(
+        TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(),
+        TeraSortConfigKeys.DEFAULT_USE_SIMPLE_PARTITIONER);
   }

   public static void setUseSimplePartitioner(Job job, boolean value) {
-    job.getConfiguration().setBoolean(SIMPLE_PARTITIONER, value);
+    job.getConfiguration().setBoolean(
+        TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(), value);
   }

   public static int getOutputReplication(JobContext job) {
-    return job.getConfiguration().getInt(OUTPUT_REPLICATION, 1);
+    return job.getConfiguration().getInt(
+        TeraSortConfigKeys.OUTPUT_REPLICATION.key(),
+        TeraSortConfigKeys.DEFAULT_OUTPUT_REPLICATION);
   }

   public static void setOutputReplication(Job job, int value) {
-    job.getConfiguration().setInt(OUTPUT_REPLICATION, value);
+    job.getConfiguration().setInt(TeraSortConfigKeys.OUTPUT_REPLICATION.key(),
+        value);
   }

+  private static void usage() throws IOException {
+    System.err.println("Usage: terasort [-Dproperty=value] <in> <out>");
+    System.err.println("TeraSort configurations are:");
+    for (TeraSortConfigKeys teraSortConfigKeys : TeraSortConfigKeys.values()) {
+      System.err.println(teraSortConfigKeys.toString());
+    }
+  }
+
   public int run(String[] args) throws Exception {
+    if (args.length != 2) {
+      usage();
+      return 2;
+    }
     LOG.info("starting");
     Job job = Job.getInstance(getConf());
     Path inputDir = new Path(args[0]);
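With this change, anything other than exactly two non-option arguments makes run() print the usage line plus one "<key> description" line per TeraSortConfigKeys entry and return 2, which the new test further below exercises. A driver sketch, not part of the commit, assuming hypothetical HDFS paths and the standard ToolRunner entry point:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.terasort.TeraSort;
import org.apache.hadoop.util.ToolRunner;

// Illustrative only: ToolRunner strips the -D option, so run() sees two
// arguments here; dropping a path would trigger the new usage() output instead.
public class TeraSortDriverSketch {
  public static void main(String[] args) throws Exception {
    int rc = ToolRunner.run(new Configuration(), new TeraSort(), new String[] {
        "-Dmapreduce.terasort.output.replication=3",          // one of the keys listed below
        "/user/example/tera-in", "/user/example/tera-out"});  // hypothetical paths
    System.exit(rc);
  }
}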
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.examples.terasort;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * <p>
+ * TeraSort configurations.
+ * </p>
+ */
+@Private
+@Unstable
+public enum TeraSortConfigKeys {
+
+  NUM_ROWS("mapreduce.terasort.num-rows",
+      "Number of rows to generate during teragen."),
+
+  NUM_PARTITIONS("mapreduce.terasort.num.partitions",
+      "Number of partitions used for sampling."),
+
+  SAMPLE_SIZE("mapreduce.terasort.partitions.sample",
+      "Sample size for each partition."),
+
+  FINAL_SYNC_ATTRIBUTE("mapreduce.terasort.final.sync",
+      "Perform a disk-persisting hsync at end of every file-write."),
+
+  USE_TERA_SCHEDULER("mapreduce.terasort.use.terascheduler",
+      "Use TeraScheduler for computing input split distribution."),
+
+  USE_SIMPLE_PARTITIONER("mapreduce.terasort.simplepartitioner",
+      "Use SimplePartitioner instead of TotalOrderPartitioner."),
+
+  OUTPUT_REPLICATION("mapreduce.terasort.output.replication",
+      "Replication factor to use for output data files.");
+
+  private String confName;
+  private String description;
+
+  TeraSortConfigKeys(String configName, String description) {
+    this.confName = configName;
+    this.description = description;
+  }
+
+  public String key() {
+    return this.confName;
+  }
+
+  public String toString() {
+    return "<" + confName + "> " + description;
+  }
+
+  public static final long DEFAULT_NUM_ROWS = 0L;
+  public static final int DEFAULT_NUM_PARTITIONS = 10;
+  public static final long DEFAULT_SAMPLE_SIZE = 100000L;
+  public static final boolean DEFAULT_FINAL_SYNC_ATTRIBUTE = false;
+  public static final boolean DEFAULT_USE_TERA_SCHEDULER = true;
+  public static final boolean DEFAULT_USE_SIMPLE_PARTITIONER = false;
+  public static final int DEFAULT_OUTPUT_REPLICATION = 1;
+}
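Centralising the settings in one enum is what makes the new usage() listing possible: each entry pairs the configuration name with a human-readable description, while the primitive defaults stay as public constants so call sites keep their original types. A short sketch, not part of the patch, that enumerates the keys the same way usage() does:

import org.apache.hadoop.examples.terasort.TeraSortConfigKeys;

// Illustrative only: prints "<config-key> description" for every TeraSort
// setting, the same strings the new TeraSort.usage() writes to stderr.
public class ListTeraSortKeysSketch {
  public static void main(String[] args) {
    for (TeraSortConfigKeys key : TeraSortConfigKeys.values()) {
      System.out.println(key); // toString() -> "<" + confName + "> " + description
    }
  }
}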
@@ -104,4 +104,9 @@ public class TestTeraSort extends HadoopTestCase {
         TERA_OUTPUT_PATH);
   }

+  public void testTeraSortWithLessThanTwoArgs() throws Exception {
+    String[] args = new String[1];
+    assertEquals(new TeraSort().run(args), 2);
+  }
+
 }