Merge trunk into HA branch.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1235856 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2012-01-25 18:07:43 +00:00
commit 1853b9d188
26 changed files with 1405 additions and 53 deletions

View File

@ -127,6 +127,17 @@
<unpack>false</unpack>
</binaries>
</moduleSet>
<moduleSet>
<includes>
<include>org.apache.hadoop:hadoop-mapreduce-client-jobclient</include>
</includes>
<binaries>
<attachmentClassifier>tests</attachmentClassifier>
<outputDirectory>share/hadoop/${hadoop.component}</outputDirectory>
<includeDependencies>false</includeDependencies>
<unpack>false</unpack>
</binaries>
</moduleSet>
</moduleSets>
<dependencySets>
<dependencySet>

View File

@ -189,6 +189,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3692. yarn-resourcemanager out and log files can get big. (eli)
MAPREDUCE-3710. Improved FileInputFormat to return better locality for the
last split. (Siddarth Seth via vinodkv)
OPTIMIZATIONS
MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar
@ -540,6 +543,21 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3681. Fixed computation of queue's usedCapacity. (acmurthy)
MAPREDUCE-3505. yarn APPLICATION_CLASSPATH needs to be overridable.
(ahmed via tucu)
MAPREDUCE-3714. Fixed EventFetcher and Fetcher threads to shut-down properly
so that reducers don't hang in corner cases. (vinodkv)
MAPREDUCE-3712. The mapreduce tar does not contain the hadoop-mapreduce-client-
jobclient-tests.jar. (mahadev)
MAPREDUCE-3717. JobClient test jar has missing files to run all the test programs.
(mahadev)
MAPREDUCE-3630. Fixes a NullPointer exception while running TeraGen - if a
map is asked to generate 0 records. (Mahadev Konar via sseth)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -522,13 +522,13 @@ public abstract class TaskAttemptImpl implements
* a parent CLC and use it for all the containers, so this should go away
* once the mr-generated-classpath stuff is gone.
*/
private static String getInitialClasspath() throws IOException {
private static String getInitialClasspath(Configuration conf) throws IOException {
synchronized (classpathLock) {
if (initialClasspathFlag.get()) {
return initialClasspath;
}
Map<String, String> env = new HashMap<String, String>();
MRApps.setClasspath(env);
MRApps.setClasspath(env, conf);
initialClasspath = env.get(Environment.CLASSPATH.name());
initialClasspathFlag.set(true);
return initialClasspath;
@ -631,7 +631,7 @@ public abstract class TaskAttemptImpl implements
Apps.addToEnvironment(
environment,
Environment.CLASSPATH.name(),
getInitialClasspath());
getInitialClasspath(conf));
} catch (IOException e) {
throw new YarnException(e);
}

View File

@ -38,6 +38,10 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.BuilderUtils;
@ -171,7 +172,7 @@ public class MRApps extends Apps {
}
private static void setMRFrameworkClasspath(
Map<String, String> environment) throws IOException {
Map<String, String> environment, Configuration conf) throws IOException {
InputStream classpathFileStream = null;
BufferedReader reader = null;
try {
@ -208,8 +209,10 @@ public class MRApps extends Apps {
}
// Add standard Hadoop classes
for (String c : ApplicationConstants.APPLICATION_CLASSPATH) {
Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c);
for (String c : conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH)
.split(",")) {
Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c
.trim());
}
} finally {
if (classpathFileStream != null) {
@ -222,8 +225,8 @@ public class MRApps extends Apps {
// TODO: Remove duplicates.
}
public static void setClasspath(Map<String, String> environment)
throws IOException {
public static void setClasspath(Map<String, String> environment,
Configuration conf) throws IOException {
Apps.addToEnvironment(
environment,
Environment.CLASSPATH.name(),
@ -232,7 +235,7 @@ public class MRApps extends Apps {
environment,
Environment.CLASSPATH.name(),
Environment.PWD.$() + Path.SEPARATOR + "*");
MRApps.setMRFrameworkClasspath(environment);
MRApps.setMRFrameworkClasspath(environment, conf);
}
private static final String STAGING_CONSTANT = ".staging";

View File

@ -18,7 +18,12 @@
package org.apache.hadoop.mapreduce.v2.util;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@ -121,4 +126,17 @@ public class TestMRApps {
"/my/path/to/staging/dummy-user/.staging/job_dummy-job_12345/job.xml", jobFile);
}
@Test public void testSetClasspath() throws IOException {
Job job = Job.getInstance();
Map<String, String> environment = new HashMap<String, String>();
MRApps.setClasspath(environment, job.getConfiguration());
assertEquals("job.jar:$PWD/*:$HADOOP_CONF_DIR:" +
"$HADOOP_COMMON_HOME/share/hadoop/common/*:" +
"$HADOOP_COMMON_HOME/share/hadoop/common/lib/*:" +
"$HADOOP_HDFS_HOME/share/hadoop/hdfs/*:" +
"$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*:" +
"$YARN_HOME/share/hadoop/mapreduce/*:" +
"$YARN_HOME/share/hadoop/mapreduce/lib/*",
environment.get("CLASSPATH"));
}
}

View File

@ -289,8 +289,10 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
}
if (bytesRemaining != 0) {
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkLocations.length-1].getHosts()));
String[] splitHosts = getSplitHosts(blkLocations, length
- bytesRemaining, bytesRemaining, clusterMap);
splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
splitHosts));
}
} else if (length != 0) {
String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);

View File

@ -286,8 +286,9 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkLocations.length-1].getHosts()));
blkLocations[blkIndex].getHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.TaskAttemptID;
@SuppressWarnings("deprecation")
class EventFetcher<K,V> extends Thread {
private static final long SLEEP_TIME = 1000;
private static final int MAX_EVENTS_TO_FETCH = 10000;
@ -41,6 +42,8 @@ class EventFetcher<K,V> extends Thread {
private ExceptionReporter exceptionReporter = null;
private int maxMapRuntime = 0;
private volatile boolean stopped = false;
public EventFetcher(TaskAttemptID reduce,
TaskUmbilicalProtocol umbilical,
@ -60,7 +63,7 @@ class EventFetcher<K,V> extends Thread {
LOG.info(reduce + " Thread started: " + getName());
try {
while (true && !Thread.currentThread().isInterrupted()) {
while (!stopped && !Thread.currentThread().isInterrupted()) {
try {
int numNewMaps = getMapCompletionEvents();
failures = 0;
@ -71,6 +74,9 @@ class EventFetcher<K,V> extends Thread {
if (!Thread.currentThread().isInterrupted()) {
Thread.sleep(SLEEP_TIME);
}
} catch (InterruptedException e) {
LOG.info("EventFetcher is interrupted.. Returning");
return;
} catch (IOException ie) {
LOG.info("Exception in getting events", ie);
// check to see whether to abort
@ -90,6 +96,16 @@ class EventFetcher<K,V> extends Thread {
return;
}
}
public void shutDown() {
this.stopped = true;
interrupt();
try {
join(5000);
} catch(InterruptedException ie) {
LOG.warn("Got interrupted while joining " + getName(), ie);
}
}
/**
* Queries the {@link TaskTracker} for a set of map-completion events

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
@SuppressWarnings({"deprecation"})
class Fetcher<K,V> extends Thread {
private static final Log LOG = LogFactory.getLog(Fetcher.class);
@ -88,6 +89,8 @@ class Fetcher<K,V> extends Thread {
private final Decompressor decompressor;
private final SecretKey jobTokenSecret;
private volatile boolean stopped = false;
public Fetcher(JobConf job, TaskAttemptID reduceId,
ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
Reporter reporter, ShuffleClientMetrics metrics,
@ -135,7 +138,7 @@ class Fetcher<K,V> extends Thread {
public void run() {
try {
while (true && !Thread.currentThread().isInterrupted()) {
while (!stopped && !Thread.currentThread().isInterrupted()) {
MapHost host = null;
try {
// If merge is on, block
@ -160,7 +163,17 @@ class Fetcher<K,V> extends Thread {
exceptionReporter.reportException(t);
}
}
public void shutDown() throws InterruptedException {
this.stopped = true;
interrupt();
try {
join(5000);
} catch (InterruptedException ie) {
LOG.warn("Got interrupt while joining " + getName(), ie);
}
}
/**
* The crux of the matter...
*

View File

@ -19,8 +19,6 @@ package org.apache.hadoop.mapreduce.task.reduce;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
@ -33,17 +31,17 @@ import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.Task.CombineOutputCollector;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapred.Task.CombineOutputCollector;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.Progress;
@InterfaceAudience.Private
@InterfaceStability.Unstable
@SuppressWarnings({"deprecation", "unchecked", "rawtypes"})
public class Shuffle<K, V> implements ExceptionReporter {
private static final Log LOG = LogFactory.getLog(Shuffle.class);
private static final int PROGRESS_FREQUENCY = 2000;
private final TaskAttemptID reduceId;
@ -100,7 +98,6 @@ public class Shuffle<K, V> implements ExceptionReporter {
this, mergePhase, mapOutputFile);
}
@SuppressWarnings("unchecked")
public RawKeyValueIterator run() throws IOException, InterruptedException {
// Start the map-completion events fetcher thread
final EventFetcher<K,V> eventFetcher =
@ -130,19 +127,11 @@ public class Shuffle<K, V> implements ExceptionReporter {
}
// Stop the event-fetcher thread
eventFetcher.interrupt();
try {
eventFetcher.join();
} catch(Throwable t) {
LOG.info("Failed to stop " + eventFetcher.getName(), t);
}
eventFetcher.shutDown();
// Stop the map-output fetcher threads
for (Fetcher<K,V> fetcher : fetchers) {
fetcher.interrupt();
}
for (Fetcher<K,V> fetcher : fetchers) {
fetcher.join();
fetcher.shutDown();
}
fetchers = null;

View File

@ -102,6 +102,13 @@
<phase>test-compile</phase>
</execution>
</executions>
<configuration>
<archive>
<manifest>
<mainClass>org.apache.hadoop.test.MapredTestDriver</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>

View File

@ -406,7 +406,7 @@ public class YARNRunner implements ClientProtocol {
// Setup the CLASSPATH in environment
// i.e. add { job jar, CWD, Hadoop jars} to classpath.
Map<String, String> environment = new HashMap<String, String>();
MRApps.setClasspath(environment);
MRApps.setClasspath(environment, conf);
// Parse distributed cache
MRApps.setupDistributedCache(jobConf, localResources);

View File

@ -29,7 +29,6 @@ import java.util.Stack;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.examples.RandomTextWriter;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -40,6 +39,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.mapreduce.RandomTextWriter;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;

View File

@ -17,6 +17,10 @@
*/
package org.apache.hadoop.mapred;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.DataOutputStream;
import java.io.IOException;
@ -32,6 +36,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.Text;
@SuppressWarnings("deprecation")
public class TestFileInputFormat extends TestCase {
Configuration conf = new Configuration();
@ -186,6 +191,102 @@ public class TestFileInputFormat extends TestCase {
assertEquals(splits.length, 2);
}
@SuppressWarnings("rawtypes")
public void testLastInputSplitAtSplitBoundary() throws Exception {
FileInputFormat fif = new FileInputFormatForTest(1024l * 1024 * 1024,
128l * 1024 * 1024);
JobConf job = new JobConf();
InputSplit[] splits = fif.getSplits(job, 8);
assertEquals(8, splits.length);
for (int i = 0; i < splits.length; i++) {
InputSplit split = splits[i];
assertEquals(("host" + i), split.getLocations()[0]);
}
}
@SuppressWarnings("rawtypes")
public void testLastInputSplitExceedingSplitBoundary() throws Exception {
FileInputFormat fif = new FileInputFormatForTest(1027l * 1024 * 1024,
128l * 1024 * 1024);
JobConf job = new JobConf();
InputSplit[] splits = fif.getSplits(job, 8);
assertEquals(8, splits.length);
for (int i = 0; i < splits.length; i++) {
InputSplit split = splits[i];
assertEquals(("host" + i), split.getLocations()[0]);
}
}
@SuppressWarnings("rawtypes")
public void testLastInputSplitSingleSplit() throws Exception {
FileInputFormat fif = new FileInputFormatForTest(100l * 1024 * 1024,
128l * 1024 * 1024);
JobConf job = new JobConf();
InputSplit[] splits = fif.getSplits(job, 1);
assertEquals(1, splits.length);
for (int i = 0; i < splits.length; i++) {
InputSplit split = splits[i];
assertEquals(("host" + i), split.getLocations()[0]);
}
}
private class FileInputFormatForTest<K, V> extends FileInputFormat<K, V> {
long splitSize;
long length;
FileInputFormatForTest(long length, long splitSize) {
this.length = length;
this.splitSize = splitSize;
}
@Override
public RecordReader<K, V> getRecordReader(InputSplit split, JobConf job,
Reporter reporter) throws IOException {
return null;
}
@Override
protected FileStatus[] listStatus(JobConf job) throws IOException {
FileStatus mockFileStatus = mock(FileStatus.class);
when(mockFileStatus.getBlockSize()).thenReturn(splitSize);
when(mockFileStatus.isDirectory()).thenReturn(false);
Path mockPath = mock(Path.class);
FileSystem mockFs = mock(FileSystem.class);
BlockLocation[] blockLocations = mockBlockLocations(length, splitSize);
when(mockFs.getFileBlockLocations(mockFileStatus, 0, length)).thenReturn(
blockLocations);
when(mockPath.getFileSystem(any(Configuration.class))).thenReturn(mockFs);
when(mockFileStatus.getPath()).thenReturn(mockPath);
when(mockFileStatus.getLen()).thenReturn(length);
FileStatus[] fs = new FileStatus[1];
fs[0] = mockFileStatus;
return fs;
}
@Override
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
return splitSize;
}
private BlockLocation[] mockBlockLocations(long size, long splitSize) {
int numLocations = (int) (size / splitSize);
if (size % splitSize != 0)
numLocations++;
BlockLocation[] blockLocations = new BlockLocation[numLocations];
for (int i = 0; i < numLocations; i++) {
String[] names = new String[] { "b" + i };
String[] hosts = new String[] { "host" + i };
blockLocations[i] = new BlockLocation(names, hosts, i * splitSize,
Math.min(splitSize, size - (splitSize * i)));
}
return blockLocations;
}
}
static void writeFile(Configuration conf, Path name,
short replication, int numBlocks) throws IOException {
FileSystem fileSys = FileSystem.get(conf);

View File

@ -25,7 +25,6 @@ import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.examples.RandomWriter;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;

View File

@ -29,7 +29,6 @@ import java.util.Stack;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.examples.RandomTextWriter;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

View File

@ -0,0 +1,757 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* This program uses map/reduce to just run a distributed job where there is
* no interaction between the tasks and each task writes a large unsorted
* random sequence of words.
* In order for this program to generate data for terasort with a 5-10 words
* per key and 20-100 words per value, have the following config:
* <xmp>
* <?xml version="1.0"?>
* <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
* <configuration>
* <property>
* <name>mapreduce.randomtextwriter.minwordskey</name>
* <value>5</value>
* </property>
* <property>
* <name>mapreduce.randomtextwriter.maxwordskey</name>
* <value>10</value>
* </property>
* <property>
* <name>mapreduce.randomtextwriter.minwordsvalue</name>
* <value>20</value>
* </property>
* <property>
* <name>mapreduce.randomtextwriter.maxwordsvalue</name>
* <value>100</value>
* </property>
* <property>
* <name>mapreduce.randomtextwriter.totalbytes</name>
* <value>1099511627776</value>
* </property>
* </configuration></xmp>
*
* Equivalently, {@link RandomTextWriter} also supports all the above options
* and ones supported by {@link Tool} via the command-line.
*
* To run: bin/hadoop jar hadoop-${version}-examples.jar randomtextwriter
* [-outFormat <i>output format class</i>] <i>output</i>
*/
public class RandomTextWriter extends Configured implements Tool {
public static final String TOTAL_BYTES =
"mapreduce.randomtextwriter.totalbytes";
public static final String BYTES_PER_MAP =
"mapreduce.randomtextwriter.bytespermap";
public static final String MAPS_PER_HOST =
"mapreduce.randomtextwriter.mapsperhost";
public static final String MAX_VALUE = "mapreduce.randomtextwriter.maxwordsvalue";
public static final String MIN_VALUE = "mapreduce.randomtextwriter.minwordsvalue";
public static final String MIN_KEY = "mapreduce.randomtextwriter.minwordskey";
public static final String MAX_KEY = "mapreduce.randomtextwriter.maxwordskey";
static int printUsage() {
System.out.println("randomtextwriter " +
"[-outFormat <output format class>] " +
"<output>");
ToolRunner.printGenericCommandUsage(System.out);
return 2;
}
/**
* User counters
*/
static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
static class RandomTextMapper extends Mapper<Text, Text, Text, Text> {
private long numBytesToWrite;
private int minWordsInKey;
private int wordsInKeyRange;
private int minWordsInValue;
private int wordsInValueRange;
private Random random = new Random();
/**
* Save the configuration value that we need to write the data.
*/
public void setup(Context context) {
Configuration conf = context.getConfiguration();
numBytesToWrite = conf.getLong(BYTES_PER_MAP,
1*1024*1024*1024);
minWordsInKey = conf.getInt(MIN_KEY, 5);
wordsInKeyRange = (conf.getInt(MAX_KEY, 10) - minWordsInKey);
minWordsInValue = conf.getInt(MIN_VALUE, 10);
wordsInValueRange = (conf.getInt(MAX_VALUE, 100) - minWordsInValue);
}
/**
* Given an output filename, write a bunch of random records to it.
*/
public void map(Text key, Text value,
Context context) throws IOException,InterruptedException {
int itemCount = 0;
while (numBytesToWrite > 0) {
// Generate the key/value
int noWordsKey = minWordsInKey +
(wordsInKeyRange != 0 ? random.nextInt(wordsInKeyRange) : 0);
int noWordsValue = minWordsInValue +
(wordsInValueRange != 0 ? random.nextInt(wordsInValueRange) : 0);
Text keyWords = generateSentence(noWordsKey);
Text valueWords = generateSentence(noWordsValue);
// Write the sentence
context.write(keyWords, valueWords);
numBytesToWrite -= (keyWords.getLength() + valueWords.getLength());
// Update counters, progress etc.
context.getCounter(Counters.BYTES_WRITTEN).increment(
keyWords.getLength() + valueWords.getLength());
context.getCounter(Counters.RECORDS_WRITTEN).increment(1);
if (++itemCount % 200 == 0) {
context.setStatus("wrote record " + itemCount + ". " +
numBytesToWrite + " bytes left.");
}
}
context.setStatus("done with " + itemCount + " records.");
}
private Text generateSentence(int noWords) {
StringBuffer sentence = new StringBuffer();
String space = " ";
for (int i=0; i < noWords; ++i) {
sentence.append(words[random.nextInt(words.length)]);
sentence.append(space);
}
return new Text(sentence.toString());
}
}
/**
* This is the main routine for launching a distributed random write job.
* It runs 10 maps/node and each node writes 1 gig of data to a DFS file.
* The reduce doesn't do anything.
*
* @throws IOException
*/
public int run(String[] args) throws Exception {
if (args.length == 0) {
return printUsage();
}
Configuration conf = getConf();
JobClient client = new JobClient(conf);
ClusterStatus cluster = client.getClusterStatus();
int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10);
long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP,
1*1024*1024*1024);
if (numBytesToWritePerMap == 0) {
System.err.println("Cannot have " + BYTES_PER_MAP +" set to 0");
return -2;
}
long totalBytesToWrite = conf.getLong(TOTAL_BYTES,
numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
if (numMaps == 0 && totalBytesToWrite > 0) {
numMaps = 1;
conf.setLong(BYTES_PER_MAP, totalBytesToWrite);
}
conf.setInt(MRJobConfig.NUM_MAPS, numMaps);
Job job = new Job(conf);
job.setJarByClass(RandomTextWriter.class);
job.setJobName("random-text-writer");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(RandomWriter.RandomInputFormat.class);
job.setMapperClass(RandomTextMapper.class);
Class<? extends OutputFormat> outputFormatClass =
SequenceFileOutputFormat.class;
List<String> otherArgs = new ArrayList<String>();
for(int i=0; i < args.length; ++i) {
try {
if ("-outFormat".equals(args[i])) {
outputFormatClass =
Class.forName(args[++i]).asSubclass(OutputFormat.class);
} else {
otherArgs.add(args[i]);
}
} catch (ArrayIndexOutOfBoundsException except) {
System.out.println("ERROR: Required parameter missing from " +
args[i-1]);
return printUsage(); // exits
}
}
job.setOutputFormatClass(outputFormatClass);
FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(0)));
System.out.println("Running " + numMaps + " maps.");
// reducer NONE
job.setNumReduceTasks(0);
Date startTime = new Date();
System.out.println("Job started: " + startTime);
int ret = job.waitForCompletion(true) ? 0 : 1;
Date endTime = new Date();
System.out.println("Job ended: " + endTime);
System.out.println("The job took " +
(endTime.getTime() - startTime.getTime()) /1000 +
" seconds.");
return ret;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new RandomTextWriter(), args);
System.exit(res);
}
/**
* A random list of 100 words from /usr/share/dict/words
*/
private static String[] words = {
"diurnalness", "Homoiousian",
"spiranthic", "tetragynian",
"silverhead", "ungreat",
"lithograph", "exploiter",
"physiologian", "by",
"hellbender", "Filipendula",
"undeterring", "antiscolic",
"pentagamist", "hypoid",
"cacuminal", "sertularian",
"schoolmasterism", "nonuple",
"gallybeggar", "phytonic",
"swearingly", "nebular",
"Confervales", "thermochemically",
"characinoid", "cocksuredom",
"fallacious", "feasibleness",
"debromination", "playfellowship",
"tramplike", "testa",
"participatingly", "unaccessible",
"bromate", "experientialist",
"roughcast", "docimastical",
"choralcelo", "blightbird",
"peptonate", "sombreroed",
"unschematized", "antiabolitionist",
"besagne", "mastication",
"bromic", "sviatonosite",
"cattimandoo", "metaphrastical",
"endotheliomyoma", "hysterolysis",
"unfulminated", "Hester",
"oblongly", "blurredness",
"authorling", "chasmy",
"Scorpaenidae", "toxihaemia",
"Dictograph", "Quakerishly",
"deaf", "timbermonger",
"strammel", "Thraupidae",
"seditious", "plerome",
"Arneb", "eristically",
"serpentinic", "glaumrie",
"socioromantic", "apocalypst",
"tartrous", "Bassaris",
"angiolymphoma", "horsefly",
"kenno", "astronomize",
"euphemious", "arsenide",
"untongued", "parabolicness",
"uvanite", "helpless",
"gemmeous", "stormy",
"templar", "erythrodextrin",
"comism", "interfraternal",
"preparative", "parastas",
"frontoorbital", "Ophiosaurus",
"diopside", "serosanguineous",
"ununiformly", "karyological",
"collegian", "allotropic",
"depravity", "amylogenesis",
"reformatory", "epidymides",
"pleurotropous", "trillium",
"dastardliness", "coadvice",
"embryotic", "benthonic",
"pomiferous", "figureheadship",
"Megaluridae", "Harpa",
"frenal", "commotion",
"abthainry", "cobeliever",
"manilla", "spiciferous",
"nativeness", "obispo",
"monilioid", "biopsic",
"valvula", "enterostomy",
"planosubulate", "pterostigma",
"lifter", "triradiated",
"venialness", "tum",
"archistome", "tautness",
"unswanlike", "antivenin",
"Lentibulariaceae", "Triphora",
"angiopathy", "anta",
"Dawsonia", "becomma",
"Yannigan", "winterproof",
"antalgol", "harr",
"underogating", "ineunt",
"cornberry", "flippantness",
"scyphostoma", "approbation",
"Ghent", "Macraucheniidae",
"scabbiness", "unanatomized",
"photoelasticity", "eurythermal",
"enation", "prepavement",
"flushgate", "subsequentially",
"Edo", "antihero",
"Isokontae", "unforkedness",
"porriginous", "daytime",
"nonexecutive", "trisilicic",
"morphiomania", "paranephros",
"botchedly", "impugnation",
"Dodecatheon", "obolus",
"unburnt", "provedore",
"Aktistetae", "superindifference",
"Alethea", "Joachimite",
"cyanophilous", "chorograph",
"brooky", "figured",
"periclitation", "quintette",
"hondo", "ornithodelphous",
"unefficient", "pondside",
"bogydom", "laurinoxylon",
"Shiah", "unharmed",
"cartful", "noncrystallized",
"abusiveness", "cromlech",
"japanned", "rizzomed",
"underskin", "adscendent",
"allectory", "gelatinousness",
"volcano", "uncompromisingly",
"cubit", "idiotize",
"unfurbelowed", "undinted",
"magnetooptics", "Savitar",
"diwata", "ramosopalmate",
"Pishquow", "tomorn",
"apopenptic", "Haversian",
"Hysterocarpus", "ten",
"outhue", "Bertat",
"mechanist", "asparaginic",
"velaric", "tonsure",
"bubble", "Pyrales",
"regardful", "glyphography",
"calabazilla", "shellworker",
"stradametrical", "havoc",
"theologicopolitical", "sawdust",
"diatomaceous", "jajman",
"temporomastoid", "Serrifera",
"Ochnaceae", "aspersor",
"trailmaking", "Bishareen",
"digitule", "octogynous",
"epididymitis", "smokefarthings",
"bacillite", "overcrown",
"mangonism", "sirrah",
"undecorated", "psychofugal",
"bismuthiferous", "rechar",
"Lemuridae", "frameable",
"thiodiazole", "Scanic",
"sportswomanship", "interruptedness",
"admissory", "osteopaedion",
"tingly", "tomorrowness",
"ethnocracy", "trabecular",
"vitally", "fossilism",
"adz", "metopon",
"prefatorial", "expiscate",
"diathermacy", "chronist",
"nigh", "generalizable",
"hysterogen", "aurothiosulphuric",
"whitlowwort", "downthrust",
"Protestantize", "monander",
"Itea", "chronographic",
"silicize", "Dunlop",
"eer", "componental",
"spot", "pamphlet",
"antineuritic", "paradisean",
"interruptor", "debellator",
"overcultured", "Florissant",
"hyocholic", "pneumatotherapy",
"tailoress", "rave",
"unpeople", "Sebastian",
"thermanesthesia", "Coniferae",
"swacking", "posterishness",
"ethmopalatal", "whittle",
"analgize", "scabbardless",
"naught", "symbiogenetically",
"trip", "parodist",
"columniform", "trunnel",
"yawler", "goodwill",
"pseudohalogen", "swangy",
"cervisial", "mediateness",
"genii", "imprescribable",
"pony", "consumptional",
"carposporangial", "poleax",
"bestill", "subfebrile",
"sapphiric", "arrowworm",
"qualminess", "ultraobscure",
"thorite", "Fouquieria",
"Bermudian", "prescriber",
"elemicin", "warlike",
"semiangle", "rotular",
"misthread", "returnability",
"seraphism", "precostal",
"quarried", "Babylonism",
"sangaree", "seelful",
"placatory", "pachydermous",
"bozal", "galbulus",
"spermaphyte", "cumbrousness",
"pope", "signifier",
"Endomycetaceae", "shallowish",
"sequacity", "periarthritis",
"bathysphere", "pentosuria",
"Dadaism", "spookdom",
"Consolamentum", "afterpressure",
"mutter", "louse",
"ovoviviparous", "corbel",
"metastoma", "biventer",
"Hydrangea", "hogmace",
"seizing", "nonsuppressed",
"oratorize", "uncarefully",
"benzothiofuran", "penult",
"balanocele", "macropterous",
"dishpan", "marten",
"absvolt", "jirble",
"parmelioid", "airfreighter",
"acocotl", "archesporial",
"hypoplastral", "preoral",
"quailberry", "cinque",
"terrestrially", "stroking",
"limpet", "moodishness",
"canicule", "archididascalian",
"pompiloid", "overstaid",
"introducer", "Italical",
"Christianopaganism", "prescriptible",
"subofficer", "danseuse",
"cloy", "saguran",
"frictionlessly", "deindividualization",
"Bulanda", "ventricous",
"subfoliar", "basto",
"scapuloradial", "suspend",
"stiffish", "Sphenodontidae",
"eternal", "verbid",
"mammonish", "upcushion",
"barkometer", "concretion",
"preagitate", "incomprehensible",
"tristich", "visceral",
"hemimelus", "patroller",
"stentorophonic", "pinulus",
"kerykeion", "brutism",
"monstership", "merciful",
"overinstruct", "defensibly",
"bettermost", "splenauxe",
"Mormyrus", "unreprimanded",
"taver", "ell",
"proacquittal", "infestation",
"overwoven", "Lincolnlike",
"chacona", "Tamil",
"classificational", "lebensraum",
"reeveland", "intuition",
"Whilkut", "focaloid",
"Eleusinian", "micromembrane",
"byroad", "nonrepetition",
"bacterioblast", "brag",
"ribaldrous", "phytoma",
"counteralliance", "pelvimetry",
"pelf", "relaster",
"thermoresistant", "aneurism",
"molossic", "euphonym",
"upswell", "ladhood",
"phallaceous", "inertly",
"gunshop", "stereotypography",
"laryngic", "refasten",
"twinling", "oflete",
"hepatorrhaphy", "electrotechnics",
"cockal", "guitarist",
"topsail", "Cimmerianism",
"larklike", "Llandovery",
"pyrocatechol", "immatchable",
"chooser", "metrocratic",
"craglike", "quadrennial",
"nonpoisonous", "undercolored",
"knob", "ultratense",
"balladmonger", "slait",
"sialadenitis", "bucketer",
"magnificently", "unstipulated",
"unscourged", "unsupercilious",
"packsack", "pansophism",
"soorkee", "percent",
"subirrigate", "champer",
"metapolitics", "spherulitic",
"involatile", "metaphonical",
"stachyuraceous", "speckedness",
"bespin", "proboscidiform",
"gul", "squit",
"yeelaman", "peristeropode",
"opacousness", "shibuichi",
"retinize", "yote",
"misexposition", "devilwise",
"pumpkinification", "vinny",
"bonze", "glossing",
"decardinalize", "transcortical",
"serphoid", "deepmost",
"guanajuatite", "wemless",
"arval", "lammy",
"Effie", "Saponaria",
"tetrahedral", "prolificy",
"excerpt", "dunkadoo",
"Spencerism", "insatiately",
"Gilaki", "oratorship",
"arduousness", "unbashfulness",
"Pithecolobium", "unisexuality",
"veterinarian", "detractive",
"liquidity", "acidophile",
"proauction", "sural",
"totaquina", "Vichyite",
"uninhabitedness", "allegedly",
"Gothish", "manny",
"Inger", "flutist",
"ticktick", "Ludgatian",
"homotransplant", "orthopedical",
"diminutively", "monogoneutic",
"Kenipsim", "sarcologist",
"drome", "stronghearted",
"Fameuse", "Swaziland",
"alen", "chilblain",
"beatable", "agglomeratic",
"constitutor", "tendomucoid",
"porencephalous", "arteriasis",
"boser", "tantivy",
"rede", "lineamental",
"uncontradictableness", "homeotypical",
"masa", "folious",
"dosseret", "neurodegenerative",
"subtransverse", "Chiasmodontidae",
"palaeotheriodont", "unstressedly",
"chalcites", "piquantness",
"lampyrine", "Aplacentalia",
"projecting", "elastivity",
"isopelletierin", "bladderwort",
"strander", "almud",
"iniquitously", "theologal",
"bugre", "chargeably",
"imperceptivity", "meriquinoidal",
"mesophyte", "divinator",
"perfunctory", "counterappellant",
"synovial", "charioteer",
"crystallographical", "comprovincial",
"infrastapedial", "pleasurehood",
"inventurous", "ultrasystematic",
"subangulated", "supraoesophageal",
"Vaishnavism", "transude",
"chrysochrous", "ungrave",
"reconciliable", "uninterpleaded",
"erlking", "wherefrom",
"aprosopia", "antiadiaphorist",
"metoxazine", "incalculable",
"umbellic", "predebit",
"foursquare", "unimmortal",
"nonmanufacture", "slangy",
"predisputant", "familist",
"preaffiliate", "friarhood",
"corelysis", "zoonitic",
"halloo", "paunchy",
"neuromimesis", "aconitine",
"hackneyed", "unfeeble",
"cubby", "autoschediastical",
"naprapath", "lyrebird",
"inexistency", "leucophoenicite",
"ferrogoslarite", "reperuse",
"uncombable", "tambo",
"propodiale", "diplomatize",
"Russifier", "clanned",
"corona", "michigan",
"nonutilitarian", "transcorporeal",
"bought", "Cercosporella",
"stapedius", "glandularly",
"pictorially", "weism",
"disilane", "rainproof",
"Caphtor", "scrubbed",
"oinomancy", "pseudoxanthine",
"nonlustrous", "redesertion",
"Oryzorictinae", "gala",
"Mycogone", "reappreciate",
"cyanoguanidine", "seeingness",
"breadwinner", "noreast",
"furacious", "epauliere",
"omniscribent", "Passiflorales",
"uninductive", "inductivity",
"Orbitolina", "Semecarpus",
"migrainoid", "steprelationship",
"phlogisticate", "mesymnion",
"sloped", "edificator",
"beneficent", "culm",
"paleornithology", "unurban",
"throbless", "amplexifoliate",
"sesquiquintile", "sapience",
"astucious", "dithery",
"boor", "ambitus",
"scotching", "uloid",
"uncompromisingness", "hoove",
"waird", "marshiness",
"Jerusalem", "mericarp",
"unevoked", "benzoperoxide",
"outguess", "pyxie",
"hymnic", "euphemize",
"mendacity", "erythremia",
"rosaniline", "unchatteled",
"lienteria", "Bushongo",
"dialoguer", "unrepealably",
"rivethead", "antideflation",
"vinegarish", "manganosiderite",
"doubtingness", "ovopyriform",
"Cephalodiscus", "Muscicapa",
"Animalivora", "angina",
"planispheric", "ipomoein",
"cuproiodargyrite", "sandbox",
"scrat", "Munnopsidae",
"shola", "pentafid",
"overstudiousness", "times",
"nonprofession", "appetible",
"valvulotomy", "goladar",
"uniarticular", "oxyterpene",
"unlapsing", "omega",
"trophonema", "seminonflammable",
"circumzenithal", "starer",
"depthwise", "liberatress",
"unleavened", "unrevolting",
"groundneedle", "topline",
"wandoo", "umangite",
"ordinant", "unachievable",
"oversand", "snare",
"avengeful", "unexplicit",
"mustafina", "sonable",
"rehabilitative", "eulogization",
"papery", "technopsychology",
"impressor", "cresylite",
"entame", "transudatory",
"scotale", "pachydermatoid",
"imaginary", "yeat",
"slipped", "stewardship",
"adatom", "cockstone",
"skyshine", "heavenful",
"comparability", "exprobratory",
"dermorhynchous", "parquet",
"cretaceous", "vesperal",
"raphis", "undangered",
"Glecoma", "engrain",
"counteractively", "Zuludom",
"orchiocatabasis", "Auriculariales",
"warriorwise", "extraorganismal",
"overbuilt", "alveolite",
"tetchy", "terrificness",
"widdle", "unpremonished",
"rebilling", "sequestrum",
"equiconvex", "heliocentricism",
"catabaptist", "okonite",
"propheticism", "helminthagogic",
"calycular", "giantly",
"wingable", "golem",
"unprovided", "commandingness",
"greave", "haply",
"doina", "depressingly",
"subdentate", "impairment",
"decidable", "neurotrophic",
"unpredict", "bicorporeal",
"pendulant", "flatman",
"intrabred", "toplike",
"Prosobranchiata", "farrantly",
"toxoplasmosis", "gorilloid",
"dipsomaniacal", "aquiline",
"atlantite", "ascitic",
"perculsive", "prospectiveness",
"saponaceous", "centrifugalization",
"dinical", "infravaginal",
"beadroll", "affaite",
"Helvidian", "tickleproof",
"abstractionism", "enhedge",
"outwealth", "overcontribute",
"coldfinch", "gymnastic",
"Pincian", "Munychian",
"codisjunct", "quad",
"coracomandibular", "phoenicochroite",
"amender", "selectivity",
"putative", "semantician",
"lophotrichic", "Spatangoidea",
"saccharogenic", "inferent",
"Triconodonta", "arrendation",
"sheepskin", "taurocolla",
"bunghole", "Machiavel",
"triakistetrahedral", "dehairer",
"prezygapophysial", "cylindric",
"pneumonalgia", "sleigher",
"emir", "Socraticism",
"licitness", "massedly",
"instructiveness", "sturdied",
"redecrease", "starosta",
"evictor", "orgiastic",
"squdge", "meloplasty",
"Tsonecan", "repealableness",
"swoony", "myesthesia",
"molecule", "autobiographist",
"reciprocation", "refective",
"unobservantness", "tricae",
"ungouged", "floatability",
"Mesua", "fetlocked",
"chordacentrum", "sedentariness",
"various", "laubanite",
"nectopod", "zenick",
"sequentially", "analgic",
"biodynamics", "posttraumatic",
"nummi", "pyroacetic",
"bot", "redescend",
"dispermy", "undiffusive",
"circular", "trillion",
"Uraniidae", "ploration",
"discipular", "potentness",
"sud", "Hu",
"Eryon", "plugger",
"subdrainage", "jharal",
"abscission", "supermarket",
"countergabion", "glacierist",
"lithotresis", "minniebush",
"zanyism", "eucalypteol",
"sterilely", "unrealize",
"unpatched", "hypochondriacism",
"critically", "cheesecutter",
};
}

View File

@ -0,0 +1,298 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* This program uses map/reduce to just run a distributed job where there is
* no interaction between the tasks and each task write a large unsorted
* random binary sequence file of BytesWritable.
* In order for this program to generate data for terasort with 10-byte keys
* and 90-byte values, have the following config:
* <xmp>
* <?xml version="1.0"?>
* <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
* <configuration>
* <property>
* <name>mapreduce.randomwriter.minkey</name>
* <value>10</value>
* </property>
* <property>
* <name>mapreduce.randomwriter.maxkey</name>
* <value>10</value>
* </property>
* <property>
* <name>mapreduce.randomwriter.minvalue</name>
* <value>90</value>
* </property>
* <property>
* <name>mapreduce.randomwriter.maxvalue</name>
* <value>90</value>
* </property>
* <property>
* <name>mapreduce.randomwriter.totalbytes</name>
* <value>1099511627776</value>
* </property>
* </configuration></xmp>
*
* Equivalently, {@link RandomWriter} also supports all the above options
* and ones supported by {@link GenericOptionsParser} via the command-line.
*/
public class RandomWriter extends Configured implements Tool {
public static final String TOTAL_BYTES = "mapreduce.randomwriter.totalbytes";
public static final String BYTES_PER_MAP =
"mapreduce.randomwriter.bytespermap";
public static final String MAPS_PER_HOST =
"mapreduce.randomwriter.mapsperhost";
public static final String MAX_VALUE = "mapreduce.randomwriter.maxvalue";
public static final String MIN_VALUE = "mapreduce.randomwriter.minvalue";
public static final String MIN_KEY = "mapreduce.randomwriter.minkey";
public static final String MAX_KEY = "mapreduce.randomwriter.maxkey";
/**
* User counters
*/
static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
/**
* A custom input format that creates virtual inputs of a single string
* for each map.
*/
static class RandomInputFormat extends InputFormat<Text, Text> {
/**
* Generate the requested number of file splits, with the filename
* set to the filename of the output file.
*/
public List<InputSplit> getSplits(JobContext job) throws IOException {
List<InputSplit> result = new ArrayList<InputSplit>();
Path outDir = FileOutputFormat.getOutputPath(job);
int numSplits =
job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
for(int i=0; i < numSplits; ++i) {
result.add(new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1,
(String[])null));
}
return result;
}
/**
* Return a single record (filename, "") where the filename is taken from
* the file split.
*/
static class RandomRecordReader extends RecordReader<Text, Text> {
Path name;
Text key = null;
Text value = new Text();
public RandomRecordReader(Path p) {
name = p;
}
public void initialize(InputSplit split,
TaskAttemptContext context)
throws IOException, InterruptedException {
}
public boolean nextKeyValue() {
if (name != null) {
key = new Text();
key.set(name.getName());
name = null;
return true;
}
return false;
}
public Text getCurrentKey() {
return key;
}
public Text getCurrentValue() {
return value;
}
public void close() {}
public float getProgress() {
return 0.0f;
}
}
public RecordReader<Text, Text> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
return new RandomRecordReader(((FileSplit) split).getPath());
}
}
static class RandomMapper extends Mapper<WritableComparable, Writable,
BytesWritable, BytesWritable> {
private long numBytesToWrite;
private int minKeySize;
private int keySizeRange;
private int minValueSize;
private int valueSizeRange;
private Random random = new Random();
private BytesWritable randomKey = new BytesWritable();
private BytesWritable randomValue = new BytesWritable();
private void randomizeBytes(byte[] data, int offset, int length) {
for(int i=offset + length - 1; i >= offset; --i) {
data[i] = (byte) random.nextInt(256);
}
}
/**
* Given an output filename, write a bunch of random records to it.
*/
public void map(WritableComparable key,
Writable value,
Context context) throws IOException,InterruptedException {
int itemCount = 0;
while (numBytesToWrite > 0) {
int keyLength = minKeySize +
(keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
randomKey.setSize(keyLength);
randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
int valueLength = minValueSize +
(valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
randomValue.setSize(valueLength);
randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
context.write(randomKey, randomValue);
numBytesToWrite -= keyLength + valueLength;
context.getCounter(Counters.BYTES_WRITTEN).increment(keyLength + valueLength);
context.getCounter(Counters.RECORDS_WRITTEN).increment(1);
if (++itemCount % 200 == 0) {
context.setStatus("wrote record " + itemCount + ". " +
numBytesToWrite + " bytes left.");
}
}
context.setStatus("done with " + itemCount + " records.");
}
/**
* Save the values out of the configuaration that we need to write
* the data.
*/
@Override
public void setup(Context context) {
Configuration conf = context.getConfiguration();
numBytesToWrite = conf.getLong(BYTES_PER_MAP,
1*1024*1024*1024);
minKeySize = conf.getInt(MIN_KEY, 10);
keySizeRange =
conf.getInt(MAX_KEY, 1000) - minKeySize;
minValueSize = conf.getInt(MIN_VALUE, 0);
valueSizeRange =
conf.getInt(MAX_VALUE, 20000) - minValueSize;
}
}
/**
* This is the main routine for launching a distributed random write job.
* It runs 10 maps/node and each node writes 1 gig of data to a DFS file.
* The reduce doesn't do anything.
*
* @throws IOException
*/
public int run(String[] args) throws Exception {
if (args.length == 0) {
System.out.println("Usage: writer <out-dir>");
ToolRunner.printGenericCommandUsage(System.out);
return 2;
}
Path outDir = new Path(args[0]);
Configuration conf = getConf();
JobClient client = new JobClient(conf);
ClusterStatus cluster = client.getClusterStatus();
int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10);
long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP,
1*1024*1024*1024);
if (numBytesToWritePerMap == 0) {
System.err.println("Cannot have" + BYTES_PER_MAP + " set to 0");
return -2;
}
long totalBytesToWrite = conf.getLong(TOTAL_BYTES,
numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
if (numMaps == 0 && totalBytesToWrite > 0) {
numMaps = 1;
conf.setLong(BYTES_PER_MAP, totalBytesToWrite);
}
conf.setInt(MRJobConfig.NUM_MAPS, numMaps);
Job job = new Job(conf);
job.setJarByClass(RandomWriter.class);
job.setJobName("random-writer");
FileOutputFormat.setOutputPath(job, outDir);
job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(BytesWritable.class);
job.setInputFormatClass(RandomInputFormat.class);
job.setMapperClass(RandomMapper.class);
job.setReducerClass(Reducer.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
System.out.println("Running " + numMaps + " maps.");
// reducer NONE
job.setNumReduceTasks(0);
Date startTime = new Date();
System.out.println("Job started: " + startTime);
int ret = job.waitForCompletion(true) ? 0 : 1;
Date endTime = new Date();
System.out.println("Job ended: " + endTime);
System.out.println("The job took " +
(endTime.getTime() - startTime.getTime()) /1000 +
" seconds.");
return ret;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new RandomWriter(), args);
System.exit(res);
}
}

View File

@ -19,7 +19,9 @@
package org.apache.hadoop.mapreduce.lib.input;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.junit.Test;
import static org.junit.Assert.*;
@ -28,10 +30,15 @@ import static org.mockito.Mockito.*;
import static org.apache.hadoop.test.MockitoMaker.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
public class TestFileInputFormat {
@ -80,4 +87,108 @@ public class TestFileInputFormat {
ispy.getSplits(job);
verify(conf).setLong(FileInputFormat.NUM_INPUT_FILES, 1);
}
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void testLastInputSplitAtSplitBoundary() throws Exception {
FileInputFormat fif = new FileInputFormatForTest(1024l * 1024 * 1024,
128l * 1024 * 1024);
Configuration conf = new Configuration();
JobContext jobContext = mock(JobContext.class);
when(jobContext.getConfiguration()).thenReturn(conf);
List<InputSplit> splits = fif.getSplits(jobContext);
assertEquals(8, splits.size());
for (int i = 0 ; i < splits.size() ; i++) {
InputSplit split = splits.get(i);
assertEquals(("host" + i), split.getLocations()[0]);
}
}
@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
public void testLastInputSplitExceedingSplitBoundary() throws Exception {
FileInputFormat fif = new FileInputFormatForTest(1027l * 1024 * 1024,
128l * 1024 * 1024);
Configuration conf = new Configuration();
JobContext jobContext = mock(JobContext.class);
when(jobContext.getConfiguration()).thenReturn(conf);
List<InputSplit> splits = fif.getSplits(jobContext);
assertEquals(8, splits.size());
for (int i = 0; i < splits.size(); i++) {
InputSplit split = splits.get(i);
assertEquals(("host" + i), split.getLocations()[0]);
}
}
@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
public void testLastInputSplitSingleSplit() throws Exception {
FileInputFormat fif = new FileInputFormatForTest(100l * 1024 * 1024,
128l * 1024 * 1024);
Configuration conf = new Configuration();
JobContext jobContext = mock(JobContext.class);
when(jobContext.getConfiguration()).thenReturn(conf);
List<InputSplit> splits = fif.getSplits(jobContext);
assertEquals(1, splits.size());
for (int i = 0; i < splits.size(); i++) {
InputSplit split = splits.get(i);
assertEquals(("host" + i), split.getLocations()[0]);
}
}
private class FileInputFormatForTest<K, V> extends FileInputFormat<K, V> {
long splitSize;
long length;
FileInputFormatForTest(long length, long splitSize) {
this.length = length;
this.splitSize = splitSize;
}
@Override
public RecordReader<K, V> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
return null;
}
@Override
protected List<FileStatus> listStatus(JobContext job) throws IOException {
FileStatus mockFileStatus = mock(FileStatus.class);
when(mockFileStatus.getBlockSize()).thenReturn(splitSize);
Path mockPath = mock(Path.class);
FileSystem mockFs = mock(FileSystem.class);
BlockLocation[] blockLocations = mockBlockLocations(length, splitSize);
when(mockFs.getFileBlockLocations(mockFileStatus, 0, length)).thenReturn(
blockLocations);
when(mockPath.getFileSystem(any(Configuration.class))).thenReturn(mockFs);
when(mockFileStatus.getPath()).thenReturn(mockPath);
when(mockFileStatus.getLen()).thenReturn(length);
List<FileStatus> list = new ArrayList<FileStatus>();
list.add(mockFileStatus);
return list;
}
@Override
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
return splitSize;
}
private BlockLocation[] mockBlockLocations(long size, long splitSize) {
int numLocations = (int) (size / splitSize);
if (size % splitSize != 0)
numLocations++;
BlockLocation[] blockLocations = new BlockLocation[numLocations];
for (int i = 0; i < numLocations; i++) {
String[] names = new String[] { "b" + i };
String[] hosts = new String[] { "host" + i };
blockLocations[i] = new BlockLocation(names, hosts, i * splitSize,
Math.min(splitSize, size - (splitSize * i)));
}
return blockLocations;
}
}
}

View File

@ -97,9 +97,9 @@
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<configuration>
<archive>
<manifest>
<mainClass>org.apache.hadoop.examples.ExampleDriver</mainClass>

View File

@ -238,7 +238,9 @@ public class TeraGen extends Configured implements Tool {
@Override
public void cleanup(Context context) {
checksumCounter.increment(total.getLow8());
if (checksumCounter != null) {
checksumCounter.increment(total.getLow8());
}
}
}
@ -307,5 +309,4 @@ public class TeraGen extends Configured implements Tool {
int res = ToolRunner.run(new Configuration(), new TeraGen(), args);
System.exit(res);
}
}

View File

@ -84,21 +84,7 @@ public interface ApplicationConstants {
public static final String STDERR = "stderr";
public static final String STDOUT = "stdout";
/**
* Classpath for typical applications.
*/
public static final String[] APPLICATION_CLASSPATH =
new String[] {
"$HADOOP_CONF_DIR",
"$HADOOP_COMMON_HOME/share/hadoop/common/*",
"$HADOOP_COMMON_HOME/share/hadoop/common/lib/*",
"$HADOOP_HDFS_HOME/share/hadoop/hdfs/*",
"$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*",
"$YARN_HOME/share/hadoop/mapreduce/*",
"$YARN_HOME/share/hadoop/mapreduce/lib/*"
};
/**
* Environment for Applications.
*

View File

@ -508,6 +508,10 @@ public class YarnConfiguration extends Configuration {
public static final long DEFAULT_NM_PROCESS_KILL_WAIT_MS =
2000;
/** Standard Hadoop classes */
public static final String YARN_APPLICATION_CLASSPATH = YARN_PREFIX
+ "application.classpath";
public YarnConfiguration() {
super();
}

View File

@ -482,4 +482,18 @@
<name>yarn.web-proxy.address</name>
<value/>
</property>
<property>
<description>Classpath for typical applications.</description>
<name>yarn.application.classpath</name>
<value>
$HADOOP_CONF_DIR,
$HADOOP_COMMON_HOME/share/hadoop/common/*,
$HADOOP_COMMON_HOME/share/hadoop/common/lib/*,
$HADOOP_HDFS_HOME/share/hadoop/hdfs/*,
$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*,
$YARN_HOME/share/hadoop/mapreduce/*,
$YARN_HOME/share/hadoop/mapreduce/lib/*
</value>
</property>
</configuration>