From f73daf6af1c87c65dd97e5ec4608ba2742dc83ea Mon Sep 17 00:00:00 2001 From: Alejandro Abdelnur Date: Tue, 24 Jan 2012 18:21:27 +0000 Subject: [PATCH 1/6] MAPREDUCE-3505. yarn APPLICATION_CLASSPATH needs to be overridable. (ahmed via tucu) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1235391 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 +++ .../v2/app/job/impl/TaskAttemptImpl.java | 6 +++--- .../hadoop-mapreduce-client-common/pom.xml | 4 ++++ .../hadoop/mapreduce/v2/util/MRApps.java | 15 +++++++++------ .../hadoop/mapreduce/v2/util/TestMRApps.java | 18 ++++++++++++++++++ .../org/apache/hadoop/mapred/YARNRunner.java | 2 +- .../hadoop/yarn/api/ApplicationConstants.java | 16 +--------------- .../hadoop/yarn/conf/YarnConfiguration.java | 4 ++++ .../src/main/resources/yarn-default.xml | 14 ++++++++++++++ 9 files changed, 57 insertions(+), 25 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 799ce495ec8..f74ebe519b5 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -540,6 +540,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3681. Fixed computation of queue's usedCapacity. (acmurthy) + MAPREDUCE-3505. yarn APPLICATION_CLASSPATH needs to be overridable. + (ahmed via tucu) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index b296d02d55f..7cca98031d5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -522,13 +522,13 @@ public abstract class TaskAttemptImpl implements * a parent CLC and use it for all the containers, so this should go away * once the mr-generated-classpath stuff is gone. 
   */
-  private static String getInitialClasspath() throws IOException {
+  private static String getInitialClasspath(Configuration conf) throws IOException {
     synchronized (classpathLock) {
       if (initialClasspathFlag.get()) {
         return initialClasspath;
       }
       Map<String, String> env = new HashMap<String, String>();
-      MRApps.setClasspath(env);
+      MRApps.setClasspath(env, conf);
       initialClasspath = env.get(Environment.CLASSPATH.name());
       initialClasspathFlag.set(true);
       return initialClasspath;
@@ -631,7 +631,7 @@ public abstract class TaskAttemptImpl implements
       Apps.addToEnvironment(
           environment,
           Environment.CLASSPATH.name(),
-          getInitialClasspath());
+          getInitialClasspath(conf));
     } catch (IOException e) {
       throw new YarnException(e);
     }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml
index cb199ac70a9..e33e589c9e2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml
@@ -38,6 +38,10 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-common</artifactId>
+    </dependency>
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
index cb802f1c5ad..129996e8193 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.util.BuilderUtils;
@@ -171,7 +172,7 @@ public class MRApps extends Apps {
   }
 
   private static void setMRFrameworkClasspath(
-      Map<String, String> environment) throws IOException {
+      Map<String, String> environment, Configuration conf) throws IOException {
     InputStream classpathFileStream = null;
     BufferedReader reader = null;
     try {
@@ -208,8 +209,10 @@
       }
 
       // Add standard Hadoop classes
-      for (String c : ApplicationConstants.APPLICATION_CLASSPATH) {
-        Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c);
+      for (String c : conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH)
+          .split(",")) {
+        Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c
+            .trim());
       }
     } finally {
       if (classpathFileStream != null) {
@@ -222,8 +225,8 @@
     // TODO: Remove duplicates.
   }
 
-  public static void setClasspath(Map<String, String> environment)
-      throws IOException {
+  public static void setClasspath(Map<String, String> environment,
+      Configuration conf) throws IOException {
     Apps.addToEnvironment(
         environment,
         Environment.CLASSPATH.name(),
@@ -232,7 +235,7 @@
         environment,
         Environment.CLASSPATH.name(),
         Environment.PWD.$() + Path.SEPARATOR + "*");
-    MRApps.setMRFrameworkClasspath(environment);
+    MRApps.setMRFrameworkClasspath(environment, conf);
   }
 
   private static final String STAGING_CONSTANT = ".staging";
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
index 11589980625..fd9d1d23281 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
@@ -18,7 +18,12 @@
 
 package org.apache.hadoop.mapreduce.v2.util;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@@ -121,4 +126,17 @@ public class TestMRApps {
      "/my/path/to/staging/dummy-user/.staging/job_dummy-job_12345/job.xml",
      jobFile);
   }
+  @Test public void testSetClasspath() throws IOException {
+    Job job = Job.getInstance();
+    Map<String, String> environment = new HashMap<String, String>();
+    MRApps.setClasspath(environment, job.getConfiguration());
+    assertEquals("job.jar:$PWD/*:$HADOOP_CONF_DIR:" +
+        "$HADOOP_COMMON_HOME/share/hadoop/common/*:" +
+        "$HADOOP_COMMON_HOME/share/hadoop/common/lib/*:" +
+        "$HADOOP_HDFS_HOME/share/hadoop/hdfs/*:" +
+        "$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*:" +
+        "$YARN_HOME/share/hadoop/mapreduce/*:" +
+        "$YARN_HOME/share/hadoop/mapreduce/lib/*",
+        environment.get("CLASSPATH"));
+  }
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index 3513d52859a..da48e9c0249 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -406,7 +406,7 @@ public class YARNRunner implements ClientProtocol {
     // Setup the CLASSPATH in environment
     // i.e. add { job jar, CWD, Hadoop jars} to classpath.
     Map<String, String> environment = new HashMap<String, String>();
-    MRApps.setClasspath(environment);
+    MRApps.setClasspath(environment, conf);
 
     // Parse distributed cache
     MRApps.setupDistributedCache(jobConf, localResources);
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java
index 9439e21cfa0..53020472013 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java
@@ -84,21 +84,7 @@
   public static final String STDERR = "stderr";
 
   public static final String STDOUT = "stdout";
-
-  /**
-   * Classpath for typical applications.
-   */
-  public static final String[] APPLICATION_CLASSPATH =
-      new String[] {
-        "$HADOOP_CONF_DIR",
-        "$HADOOP_COMMON_HOME/share/hadoop/common/*",
-        "$HADOOP_COMMON_HOME/share/hadoop/common/lib/*",
-        "$HADOOP_HDFS_HOME/share/hadoop/hdfs/*",
-        "$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*",
-        "$YARN_HOME/share/hadoop/mapreduce/*",
-        "$YARN_HOME/share/hadoop/mapreduce/lib/*"
-      };
-
+
   /**
    * Environment for Applications.
   *
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index f8a4324302c..a0300797790 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -508,6 +508,10 @@ public class YarnConfiguration extends Configuration {
   public static final long DEFAULT_NM_PROCESS_KILL_WAIT_MS = 2000;
 
+  /** Standard Hadoop classes */
+  public static final String YARN_APPLICATION_CLASSPATH = YARN_PREFIX +
+      "application.classpath";
+
   public YarnConfiguration() {
     super();
   }
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml
index 089a23608b7..b9e5ea47e40 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml
@@ -482,4 +482,18 @@
     <name>yarn.web-proxy.address</name>
   </property>
 
+
+  <property>
+    <description>Classpath for typical applications.</description>
+    <name>yarn.application.classpath</name>
+    <value>
+      $HADOOP_CONF_DIR,
+      $HADOOP_COMMON_HOME/share/hadoop/common/*,
+      $HADOOP_COMMON_HOME/share/hadoop/common/lib/*,
+      $HADOOP_HDFS_HOME/share/hadoop/hdfs/*,
+      $HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*,
+      $YARN_HOME/share/hadoop/mapreduce/*,
+      $YARN_HOME/share/hadoop/mapreduce/lib/*
+    </value>
+  </property>

From dc615c312b81d2bff17821b59fba1b76aa24f585 Mon Sep 17 00:00:00 2001
From: Vinod Kumar Vavilapalli
Date: Tue, 24 Jan 2012 21:29:39 +0000
Subject: [PATCH 2/6] MAPREDUCE-3710. Improved FileInputFormat to return
 better locality for the last split. Contributed by Siddarth Seth.
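
The gist of the fix: the final split used to be assigned the hosts of the file's
last block location, even when the split's start offset falls in an earlier
block; both FileInputFormat implementations now look up the block that actually
contains the split's start. A minimal sketch of that lookup, assuming only
org.apache.hadoop.fs.BlockLocation's public getOffset()/getLength() accessors
(the class and method names below are illustrative stand-ins for the protected
getBlockIndex() helper the patch relies on, not part of the patch itself):

    import org.apache.hadoop.fs.BlockLocation;

    class SplitLocalitySketch {
      /** Find the index of the block location covering a given file offset. */
      static int blockIndexFor(BlockLocation[] blocks, long offset) {
        for (int i = 0; i < blocks.length; i++) {
          long start = blocks[i].getOffset();
          // Each block covers [start, start + length); the final split's
          // start offset (fileLength - bytesRemaining) lies in exactly one.
          if (offset >= start && offset < start + blocks[i].getLength()) {
            return i;
          }
        }
        throw new IllegalArgumentException("Offset " + offset
            + " is outside the file's block locations");
      }
    }

With that index in hand, the split is built from blockLocations[index] rather
than blockLocations[blockLocations.length - 1], which is what the
mapreduce.lib.input hunk below does directly; the mapred hunk goes through
getSplitHosts() for rack-aware host selection.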
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1235510 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../apache/hadoop/mapred/FileInputFormat.java | 6 +- .../mapreduce/lib/input/FileInputFormat.java | 3 +- .../hadoop/mapred/TestFileInputFormat.java | 101 ++++++++++++++++ .../lib/input/TestFileInputFormat.java | 111 ++++++++++++++++++ 5 files changed, 221 insertions(+), 3 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index f74ebe519b5..522ea157da3 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -189,6 +189,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3692. yarn-resourcemanager out and log files can get big. (eli) + MAPREDUCE-3710. Improved FileInputFormat to return better locality for the + last split. (Siddarth Seth via vinodkv) + OPTIMIZATIONS MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java index a7e59562bd5..aaf3c26b789 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java @@ -289,8 +289,10 @@ public abstract class FileInputFormat implements InputFormat { } if (bytesRemaining != 0) { - splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, - blkLocations[blkLocations.length-1].getHosts())); + String[] splitHosts = getSplitHosts(blkLocations, length + - bytesRemaining, bytesRemaining, clusterMap); + splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, + splitHosts)); } } else if (length != 0) { String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java index 781715dbeef..d86ad156bda 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java @@ -286,8 +286,9 @@ public abstract class FileInputFormat extends InputFormat { } if (bytesRemaining != 0) { + int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, - blkLocations[blkLocations.length-1].getHosts())); + blkLocations[blkIndex].getHosts())); } } else { // not splitable splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts())); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java index 
3476e635c5c..fca9b358647 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java @@ -17,6 +17,10 @@ */ package org.apache.hadoop.mapred; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.io.DataOutputStream; import java.io.IOException; @@ -32,6 +36,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.io.Text; +@SuppressWarnings("deprecation") public class TestFileInputFormat extends TestCase { Configuration conf = new Configuration(); @@ -186,6 +191,102 @@ public class TestFileInputFormat extends TestCase { assertEquals(splits.length, 2); } + @SuppressWarnings("rawtypes") + public void testLastInputSplitAtSplitBoundary() throws Exception { + FileInputFormat fif = new FileInputFormatForTest(1024l * 1024 * 1024, + 128l * 1024 * 1024); + JobConf job = new JobConf(); + InputSplit[] splits = fif.getSplits(job, 8); + assertEquals(8, splits.length); + for (int i = 0; i < splits.length; i++) { + InputSplit split = splits[i]; + assertEquals(("host" + i), split.getLocations()[0]); + } + } + + @SuppressWarnings("rawtypes") + public void testLastInputSplitExceedingSplitBoundary() throws Exception { + FileInputFormat fif = new FileInputFormatForTest(1027l * 1024 * 1024, + 128l * 1024 * 1024); + JobConf job = new JobConf(); + InputSplit[] splits = fif.getSplits(job, 8); + assertEquals(8, splits.length); + for (int i = 0; i < splits.length; i++) { + InputSplit split = splits[i]; + assertEquals(("host" + i), split.getLocations()[0]); + } + } + + @SuppressWarnings("rawtypes") + public void testLastInputSplitSingleSplit() throws Exception { + FileInputFormat fif = new FileInputFormatForTest(100l * 1024 * 1024, + 128l * 1024 * 1024); + JobConf job = new JobConf(); + InputSplit[] splits = fif.getSplits(job, 1); + assertEquals(1, splits.length); + for (int i = 0; i < splits.length; i++) { + InputSplit split = splits[i]; + assertEquals(("host" + i), split.getLocations()[0]); + } + } + + private class FileInputFormatForTest extends FileInputFormat { + + long splitSize; + long length; + + FileInputFormatForTest(long length, long splitSize) { + this.length = length; + this.splitSize = splitSize; + } + + @Override + public RecordReader getRecordReader(InputSplit split, JobConf job, + Reporter reporter) throws IOException { + return null; + } + + @Override + protected FileStatus[] listStatus(JobConf job) throws IOException { + FileStatus mockFileStatus = mock(FileStatus.class); + when(mockFileStatus.getBlockSize()).thenReturn(splitSize); + when(mockFileStatus.isDirectory()).thenReturn(false); + Path mockPath = mock(Path.class); + FileSystem mockFs = mock(FileSystem.class); + + BlockLocation[] blockLocations = mockBlockLocations(length, splitSize); + when(mockFs.getFileBlockLocations(mockFileStatus, 0, length)).thenReturn( + blockLocations); + when(mockPath.getFileSystem(any(Configuration.class))).thenReturn(mockFs); + + when(mockFileStatus.getPath()).thenReturn(mockPath); + when(mockFileStatus.getLen()).thenReturn(length); + + FileStatus[] fs = new FileStatus[1]; + fs[0] = mockFileStatus; + return fs; + } + + @Override + protected long computeSplitSize(long blockSize, long minSize, long maxSize) { + return 
splitSize; + } + + private BlockLocation[] mockBlockLocations(long size, long splitSize) { + int numLocations = (int) (size / splitSize); + if (size % splitSize != 0) + numLocations++; + BlockLocation[] blockLocations = new BlockLocation[numLocations]; + for (int i = 0; i < numLocations; i++) { + String[] names = new String[] { "b" + i }; + String[] hosts = new String[] { "host" + i }; + blockLocations[i] = new BlockLocation(names, hosts, i * splitSize, + Math.min(splitSize, size - (splitSize * i))); + } + return blockLocations; + } + } + static void writeFile(Configuration conf, Path name, short replication, int numBlocks) throws IOException { FileSystem fileSys = FileSystem.get(conf); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java index 692a6b6da83..824e6842cff 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java @@ -19,7 +19,9 @@ package org.apache.hadoop.mapreduce.lib.input; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import org.junit.Test; import static org.junit.Assert.*; @@ -28,10 +30,15 @@ import static org.mockito.Mockito.*; import static org.apache.hadoop.test.MockitoMaker.*; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; public class TestFileInputFormat { @@ -80,4 +87,108 @@ public class TestFileInputFormat { ispy.getSplits(job); verify(conf).setLong(FileInputFormat.NUM_INPUT_FILES, 1); } + + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testLastInputSplitAtSplitBoundary() throws Exception { + FileInputFormat fif = new FileInputFormatForTest(1024l * 1024 * 1024, + 128l * 1024 * 1024); + Configuration conf = new Configuration(); + JobContext jobContext = mock(JobContext.class); + when(jobContext.getConfiguration()).thenReturn(conf); + List splits = fif.getSplits(jobContext); + assertEquals(8, splits.size()); + for (int i = 0 ; i < splits.size() ; i++) { + InputSplit split = splits.get(i); + assertEquals(("host" + i), split.getLocations()[0]); + } + } + + @Test + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void testLastInputSplitExceedingSplitBoundary() throws Exception { + FileInputFormat fif = new FileInputFormatForTest(1027l * 1024 * 1024, + 128l * 1024 * 1024); + Configuration conf = new Configuration(); + JobContext jobContext = mock(JobContext.class); + when(jobContext.getConfiguration()).thenReturn(conf); + List splits = fif.getSplits(jobContext); + assertEquals(8, splits.size()); + for (int i = 0; i < splits.size(); i++) { + InputSplit split = splits.get(i); + assertEquals(("host" + i), split.getLocations()[0]); + } + } + + 
@Test + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void testLastInputSplitSingleSplit() throws Exception { + FileInputFormat fif = new FileInputFormatForTest(100l * 1024 * 1024, + 128l * 1024 * 1024); + Configuration conf = new Configuration(); + JobContext jobContext = mock(JobContext.class); + when(jobContext.getConfiguration()).thenReturn(conf); + List splits = fif.getSplits(jobContext); + assertEquals(1, splits.size()); + for (int i = 0; i < splits.size(); i++) { + InputSplit split = splits.get(i); + assertEquals(("host" + i), split.getLocations()[0]); + } + } + + private class FileInputFormatForTest extends FileInputFormat { + + long splitSize; + long length; + + FileInputFormatForTest(long length, long splitSize) { + this.length = length; + this.splitSize = splitSize; + } + + @Override + public RecordReader createRecordReader(InputSplit split, + TaskAttemptContext context) throws IOException, InterruptedException { + return null; + } + + @Override + protected List listStatus(JobContext job) throws IOException { + FileStatus mockFileStatus = mock(FileStatus.class); + when(mockFileStatus.getBlockSize()).thenReturn(splitSize); + Path mockPath = mock(Path.class); + FileSystem mockFs = mock(FileSystem.class); + + BlockLocation[] blockLocations = mockBlockLocations(length, splitSize); + when(mockFs.getFileBlockLocations(mockFileStatus, 0, length)).thenReturn( + blockLocations); + when(mockPath.getFileSystem(any(Configuration.class))).thenReturn(mockFs); + + when(mockFileStatus.getPath()).thenReturn(mockPath); + when(mockFileStatus.getLen()).thenReturn(length); + + List list = new ArrayList(); + list.add(mockFileStatus); + return list; + } + + @Override + protected long computeSplitSize(long blockSize, long minSize, long maxSize) { + return splitSize; + } + + private BlockLocation[] mockBlockLocations(long size, long splitSize) { + int numLocations = (int) (size / splitSize); + if (size % splitSize != 0) + numLocations++; + BlockLocation[] blockLocations = new BlockLocation[numLocations]; + for (int i = 0; i < numLocations; i++) { + String[] names = new String[] { "b" + i }; + String[] hosts = new String[] { "host" + i }; + blockLocations[i] = new BlockLocation(names, hosts, i * splitSize, + Math.min(splitSize, size - (splitSize * i))); + } + return blockLocations; + } + } } From 078ae89a4793eb6a153a88b106d330fd059a4933 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 24 Jan 2012 23:18:05 +0000 Subject: [PATCH 3/6] MAPREDUCE-3714. Fixed EventFetcher and Fetcher threads to shut-down properly so that reducers don't hang in corner cases. (vinodkv) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1235545 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 +++ .../mapreduce/task/reduce/EventFetcher.java | 18 +++++++++++++++++- .../hadoop/mapreduce/task/reduce/Fetcher.java | 17 +++++++++++++++-- .../hadoop/mapreduce/task/reduce/Shuffle.java | 19 ++++--------------- 4 files changed, 39 insertions(+), 18 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 522ea157da3..9e0668de1d4 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -546,6 +546,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3505. yarn APPLICATION_CLASSPATH needs to be overridable. (ahmed via tucu) + MAPREDUCE-3714. Fixed EventFetcher and Fetcher threads to shut-down properly + so that reducers don't hang in corner cases. 
(vinodkv) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java index 6facb47aa21..fd80ec2b1e9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java @@ -27,6 +27,7 @@ import org.apache.hadoop.mapred.TaskCompletionEvent; import org.apache.hadoop.mapred.TaskUmbilicalProtocol; import org.apache.hadoop.mapreduce.TaskAttemptID; +@SuppressWarnings("deprecation") class EventFetcher extends Thread { private static final long SLEEP_TIME = 1000; private static final int MAX_EVENTS_TO_FETCH = 10000; @@ -41,6 +42,8 @@ class EventFetcher extends Thread { private ExceptionReporter exceptionReporter = null; private int maxMapRuntime = 0; + + private volatile boolean stopped = false; public EventFetcher(TaskAttemptID reduce, TaskUmbilicalProtocol umbilical, @@ -60,7 +63,7 @@ class EventFetcher extends Thread { LOG.info(reduce + " Thread started: " + getName()); try { - while (true && !Thread.currentThread().isInterrupted()) { + while (!stopped && !Thread.currentThread().isInterrupted()) { try { int numNewMaps = getMapCompletionEvents(); failures = 0; @@ -71,6 +74,9 @@ class EventFetcher extends Thread { if (!Thread.currentThread().isInterrupted()) { Thread.sleep(SLEEP_TIME); } + } catch (InterruptedException e) { + LOG.info("EventFetcher is interrupted.. 
Returning"); + return; } catch (IOException ie) { LOG.info("Exception in getting events", ie); // check to see whether to abort @@ -90,6 +96,16 @@ class EventFetcher extends Thread { return; } } + + public void shutDown() { + this.stopped = true; + interrupt(); + try { + join(5000); + } catch(InterruptedException ie) { + LOG.warn("Got interrupted while joining " + getName(), ie); + } + } /** * Queries the {@link TaskTracker} for a set of map-completion events diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java index 5a213f05c1a..93200873d6b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java @@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; +@SuppressWarnings({"deprecation"}) class Fetcher extends Thread { private static final Log LOG = LogFactory.getLog(Fetcher.class); @@ -88,6 +89,8 @@ class Fetcher extends Thread { private final Decompressor decompressor; private final SecretKey jobTokenSecret; + private volatile boolean stopped = false; + public Fetcher(JobConf job, TaskAttemptID reduceId, ShuffleScheduler scheduler, MergeManager merger, Reporter reporter, ShuffleClientMetrics metrics, @@ -135,7 +138,7 @@ class Fetcher extends Thread { public void run() { try { - while (true && !Thread.currentThread().isInterrupted()) { + while (!stopped && !Thread.currentThread().isInterrupted()) { MapHost host = null; try { // If merge is on, block @@ -160,7 +163,17 @@ class Fetcher extends Thread { exceptionReporter.reportException(t); } } - + + public void shutDown() throws InterruptedException { + this.stopped = true; + interrupt(); + try { + join(5000); + } catch (InterruptedException ie) { + LOG.warn("Got interrupt while joining " + getName(), ie); + } + } + /** * The crux of the matter... 
* diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java index 4b8b854952c..e7d7d71d079 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java @@ -19,8 +19,6 @@ package org.apache.hadoop.mapreduce.task.reduce; import java.io.IOException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem; @@ -33,17 +31,17 @@ import org.apache.hadoop.mapred.RawKeyValueIterator; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.Task; +import org.apache.hadoop.mapred.Task.CombineOutputCollector; import org.apache.hadoop.mapred.TaskStatus; import org.apache.hadoop.mapred.TaskUmbilicalProtocol; -import org.apache.hadoop.mapred.Task.CombineOutputCollector; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.util.Progress; @InterfaceAudience.Private @InterfaceStability.Unstable +@SuppressWarnings({"deprecation", "unchecked", "rawtypes"}) public class Shuffle implements ExceptionReporter { - private static final Log LOG = LogFactory.getLog(Shuffle.class); private static final int PROGRESS_FREQUENCY = 2000; private final TaskAttemptID reduceId; @@ -100,7 +98,6 @@ public class Shuffle implements ExceptionReporter { this, mergePhase, mapOutputFile); } - @SuppressWarnings("unchecked") public RawKeyValueIterator run() throws IOException, InterruptedException { // Start the map-completion events fetcher thread final EventFetcher eventFetcher = @@ -130,19 +127,11 @@ public class Shuffle implements ExceptionReporter { } // Stop the event-fetcher thread - eventFetcher.interrupt(); - try { - eventFetcher.join(); - } catch(Throwable t) { - LOG.info("Failed to stop " + eventFetcher.getName(), t); - } + eventFetcher.shutDown(); // Stop the map-output fetcher threads for (Fetcher fetcher : fetchers) { - fetcher.interrupt(); - } - for (Fetcher fetcher : fetchers) { - fetcher.join(); + fetcher.shutDown(); } fetchers = null; From c7642a135c4e182af762e41855a8bcf77d0fa05d Mon Sep 17 00:00:00 2001 From: Mahadev Konar Date: Wed, 25 Jan 2012 00:14:26 +0000 Subject: [PATCH 4/6] MAPREDUCE-3712. The mapreduce tar does not contain the hadoop-mapreduce-client-jobclient-tests.jar. 
(mahadev) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1235566 13f79535-47bb-0310-9956-ffa450edef68 --- .../resources/assemblies/hadoop-mapreduce-dist.xml | 11 +++++++++++ hadoop-mapreduce-project/CHANGES.txt | 3 +++ 2 files changed, 14 insertions(+) diff --git a/hadoop-assemblies/src/main/resources/assemblies/hadoop-mapreduce-dist.xml b/hadoop-assemblies/src/main/resources/assemblies/hadoop-mapreduce-dist.xml index 57f3c66dad3..281ce0ddcdd 100644 --- a/hadoop-assemblies/src/main/resources/assemblies/hadoop-mapreduce-dist.xml +++ b/hadoop-assemblies/src/main/resources/assemblies/hadoop-mapreduce-dist.xml @@ -127,6 +127,17 @@ false + + + org.apache.hadoop:hadoop-mapreduce-client-jobclient + + + tests + share/hadoop/${hadoop.component} + false + false + + diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 9e0668de1d4..c6b6290fd6f 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -549,6 +549,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3714. Fixed EventFetcher and Fetcher threads to shut-down properly so that reducers don't hang in corner cases. (vinodkv) + MAPREDUCE-3712. The mapreduce tar does not contain the hadoop-mapreduce-client- + jobclient-tests.jar. (mahadev) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES From c43a5a992e4ae716560eb31a82fc9b936fa1727b Mon Sep 17 00:00:00 2001 From: Mahadev Konar Date: Wed, 25 Jan 2012 06:24:46 +0000 Subject: [PATCH 5/6] MAPREDUCE-3717. JobClient test jar has missing files to run all the test programs. (mahadev) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1235639 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop-mapreduce-client-jobclient/pom.xml | 7 + .../hadoop/mapred/GenericMRLoadGenerator.java | 2 +- .../hadoop/mapred/ThreadedMapBenchmark.java | 1 - .../mapreduce/GenericMRLoadGenerator.java | 1 - .../hadoop/mapreduce/RandomTextWriter.java | 757 ++++++++++++++++++ .../apache/hadoop/mapreduce/RandomWriter.java | 298 +++++++ .../apache/hadoop/test/MapredTestDriver.java | 0 .../hadoop-mapreduce-examples/pom.xml | 4 +- 9 files changed, 1068 insertions(+), 5 deletions(-) rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/mapred/GenericMRLoadGenerator.java (99%) rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/mapred/ThreadedMapBenchmark.java (99%) rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java (99%) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/RandomTextWriter.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/RandomWriter.java rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/test/MapredTestDriver.java (100%) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index c6b6290fd6f..aedea7f0b3d 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -552,6 +552,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3712. 
The mapreduce tar does not contain the hadoop-mapreduce-client- jobclient-tests.jar. (mahadev) + MAPREDUCE-3717. JobClient test jar has missing files to run all the test programs. + (mahadev) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml index e59dd0ce83b..b12c09aae18 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml @@ -102,6 +102,13 @@ test-compile + + + + org.apache.hadoop.test.MapredTestDriver + + + org.apache.maven.plugins diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/GenericMRLoadGenerator.java similarity index 99% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/GenericMRLoadGenerator.java index bb6d9774e35..f67ca1c17ee 100644 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/GenericMRLoadGenerator.java @@ -29,7 +29,6 @@ import java.util.Stack; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.examples.RandomTextWriter; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -40,6 +39,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapred.lib.NullOutputFormat; +import org.apache.hadoop.mapreduce.RandomTextWriter; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Tool; diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/ThreadedMapBenchmark.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/ThreadedMapBenchmark.java similarity index 99% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/ThreadedMapBenchmark.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/ThreadedMapBenchmark.java index f2a2e236dd4..f5512327f56 100644 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/ThreadedMapBenchmark.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/ThreadedMapBenchmark.java @@ -25,7 +25,6 @@ import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.examples.RandomWriter; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; diff --git 
a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java similarity index 99% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java index 268b126cc73..7ee28dfa32a 100644 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java @@ -29,7 +29,6 @@ import java.util.Stack; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.examples.RandomTextWriter; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/RandomTextWriter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/RandomTextWriter.java new file mode 100644 index 00000000000..c3f6476d7b1 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/RandomTextWriter.java @@ -0,0 +1,757 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.ClusterStatus; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * This program uses map/reduce to just run a distributed job where there is + * no interaction between the tasks and each task writes a large unsorted + * random sequence of words. 
+ * In order for this program to generate data for terasort with a 5-10 words + * per key and 20-100 words per value, have the following config: + * + * <?xml version="1.0"?> + * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> + * <configuration> + * <property> + * <name>mapreduce.randomtextwriter.minwordskey</name> + * <value>5</value> + * </property> + * <property> + * <name>mapreduce.randomtextwriter.maxwordskey</name> + * <value>10</value> + * </property> + * <property> + * <name>mapreduce.randomtextwriter.minwordsvalue</name> + * <value>20</value> + * </property> + * <property> + * <name>mapreduce.randomtextwriter.maxwordsvalue</name> + * <value>100</value> + * </property> + * <property> + * <name>mapreduce.randomtextwriter.totalbytes</name> + * <value>1099511627776</value> + * </property> + * </configuration> + * + * Equivalently, {@link RandomTextWriter} also supports all the above options + * and ones supported by {@link Tool} via the command-line. + * + * To run: bin/hadoop jar hadoop-${version}-examples.jar randomtextwriter + * [-outFormat output format class] output + */ +public class RandomTextWriter extends Configured implements Tool { + public static final String TOTAL_BYTES = + "mapreduce.randomtextwriter.totalbytes"; + public static final String BYTES_PER_MAP = + "mapreduce.randomtextwriter.bytespermap"; + public static final String MAPS_PER_HOST = + "mapreduce.randomtextwriter.mapsperhost"; + public static final String MAX_VALUE = "mapreduce.randomtextwriter.maxwordsvalue"; + public static final String MIN_VALUE = "mapreduce.randomtextwriter.minwordsvalue"; + public static final String MIN_KEY = "mapreduce.randomtextwriter.minwordskey"; + public static final String MAX_KEY = "mapreduce.randomtextwriter.maxwordskey"; + + static int printUsage() { + System.out.println("randomtextwriter " + + "[-outFormat ] " + + ""); + ToolRunner.printGenericCommandUsage(System.out); + return 2; + } + + /** + * User counters + */ + static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN } + + static class RandomTextMapper extends Mapper { + + private long numBytesToWrite; + private int minWordsInKey; + private int wordsInKeyRange; + private int minWordsInValue; + private int wordsInValueRange; + private Random random = new Random(); + + /** + * Save the configuration value that we need to write the data. + */ + public void setup(Context context) { + Configuration conf = context.getConfiguration(); + numBytesToWrite = conf.getLong(BYTES_PER_MAP, + 1*1024*1024*1024); + minWordsInKey = conf.getInt(MIN_KEY, 5); + wordsInKeyRange = (conf.getInt(MAX_KEY, 10) - minWordsInKey); + minWordsInValue = conf.getInt(MIN_VALUE, 10); + wordsInValueRange = (conf.getInt(MAX_VALUE, 100) - minWordsInValue); + } + + /** + * Given an output filename, write a bunch of random records to it. + */ + public void map(Text key, Text value, + Context context) throws IOException,InterruptedException { + int itemCount = 0; + while (numBytesToWrite > 0) { + // Generate the key/value + int noWordsKey = minWordsInKey + + (wordsInKeyRange != 0 ? random.nextInt(wordsInKeyRange) : 0); + int noWordsValue = minWordsInValue + + (wordsInValueRange != 0 ? random.nextInt(wordsInValueRange) : 0); + Text keyWords = generateSentence(noWordsKey); + Text valueWords = generateSentence(noWordsValue); + + // Write the sentence + context.write(keyWords, valueWords); + + numBytesToWrite -= (keyWords.getLength() + valueWords.getLength()); + + // Update counters, progress etc. 
+ context.getCounter(Counters.BYTES_WRITTEN).increment( + keyWords.getLength() + valueWords.getLength()); + context.getCounter(Counters.RECORDS_WRITTEN).increment(1); + if (++itemCount % 200 == 0) { + context.setStatus("wrote record " + itemCount + ". " + + numBytesToWrite + " bytes left."); + } + } + context.setStatus("done with " + itemCount + " records."); + } + + private Text generateSentence(int noWords) { + StringBuffer sentence = new StringBuffer(); + String space = " "; + for (int i=0; i < noWords; ++i) { + sentence.append(words[random.nextInt(words.length)]); + sentence.append(space); + } + return new Text(sentence.toString()); + } + } + + /** + * This is the main routine for launching a distributed random write job. + * It runs 10 maps/node and each node writes 1 gig of data to a DFS file. + * The reduce doesn't do anything. + * + * @throws IOException + */ + public int run(String[] args) throws Exception { + if (args.length == 0) { + return printUsage(); + } + + Configuration conf = getConf(); + JobClient client = new JobClient(conf); + ClusterStatus cluster = client.getClusterStatus(); + int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10); + long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, + 1*1024*1024*1024); + if (numBytesToWritePerMap == 0) { + System.err.println("Cannot have " + BYTES_PER_MAP +" set to 0"); + return -2; + } + long totalBytesToWrite = conf.getLong(TOTAL_BYTES, + numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers()); + int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap); + if (numMaps == 0 && totalBytesToWrite > 0) { + numMaps = 1; + conf.setLong(BYTES_PER_MAP, totalBytesToWrite); + } + conf.setInt(MRJobConfig.NUM_MAPS, numMaps); + + Job job = new Job(conf); + + job.setJarByClass(RandomTextWriter.class); + job.setJobName("random-text-writer"); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + + job.setInputFormatClass(RandomWriter.RandomInputFormat.class); + job.setMapperClass(RandomTextMapper.class); + + Class outputFormatClass = + SequenceFileOutputFormat.class; + List otherArgs = new ArrayList(); + for(int i=0; i < args.length; ++i) { + try { + if ("-outFormat".equals(args[i])) { + outputFormatClass = + Class.forName(args[++i]).asSubclass(OutputFormat.class); + } else { + otherArgs.add(args[i]); + } + } catch (ArrayIndexOutOfBoundsException except) { + System.out.println("ERROR: Required parameter missing from " + + args[i-1]); + return printUsage(); // exits + } + } + + job.setOutputFormatClass(outputFormatClass); + FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(0))); + + System.out.println("Running " + numMaps + " maps."); + + // reducer NONE + job.setNumReduceTasks(0); + + Date startTime = new Date(); + System.out.println("Job started: " + startTime); + int ret = job.waitForCompletion(true) ? 
0 : 1; + Date endTime = new Date(); + System.out.println("Job ended: " + endTime); + System.out.println("The job took " + + (endTime.getTime() - startTime.getTime()) /1000 + + " seconds."); + + return ret; + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new Configuration(), new RandomTextWriter(), args); + System.exit(res); + } + + /** + * A random list of 100 words from /usr/share/dict/words + */ + private static String[] words = { + "diurnalness", "Homoiousian", + "spiranthic", "tetragynian", + "silverhead", "ungreat", + "lithograph", "exploiter", + "physiologian", "by", + "hellbender", "Filipendula", + "undeterring", "antiscolic", + "pentagamist", "hypoid", + "cacuminal", "sertularian", + "schoolmasterism", "nonuple", + "gallybeggar", "phytonic", + "swearingly", "nebular", + "Confervales", "thermochemically", + "characinoid", "cocksuredom", + "fallacious", "feasibleness", + "debromination", "playfellowship", + "tramplike", "testa", + "participatingly", "unaccessible", + "bromate", "experientialist", + "roughcast", "docimastical", + "choralcelo", "blightbird", + "peptonate", "sombreroed", + "unschematized", "antiabolitionist", + "besagne", "mastication", + "bromic", "sviatonosite", + "cattimandoo", "metaphrastical", + "endotheliomyoma", "hysterolysis", + "unfulminated", "Hester", + "oblongly", "blurredness", + "authorling", "chasmy", + "Scorpaenidae", "toxihaemia", + "Dictograph", "Quakerishly", + "deaf", "timbermonger", + "strammel", "Thraupidae", + "seditious", "plerome", + "Arneb", "eristically", + "serpentinic", "glaumrie", + "socioromantic", "apocalypst", + "tartrous", "Bassaris", + "angiolymphoma", "horsefly", + "kenno", "astronomize", + "euphemious", "arsenide", + "untongued", "parabolicness", + "uvanite", "helpless", + "gemmeous", "stormy", + "templar", "erythrodextrin", + "comism", "interfraternal", + "preparative", "parastas", + "frontoorbital", "Ophiosaurus", + "diopside", "serosanguineous", + "ununiformly", "karyological", + "collegian", "allotropic", + "depravity", "amylogenesis", + "reformatory", "epidymides", + "pleurotropous", "trillium", + "dastardliness", "coadvice", + "embryotic", "benthonic", + "pomiferous", "figureheadship", + "Megaluridae", "Harpa", + "frenal", "commotion", + "abthainry", "cobeliever", + "manilla", "spiciferous", + "nativeness", "obispo", + "monilioid", "biopsic", + "valvula", "enterostomy", + "planosubulate", "pterostigma", + "lifter", "triradiated", + "venialness", "tum", + "archistome", "tautness", + "unswanlike", "antivenin", + "Lentibulariaceae", "Triphora", + "angiopathy", "anta", + "Dawsonia", "becomma", + "Yannigan", "winterproof", + "antalgol", "harr", + "underogating", "ineunt", + "cornberry", "flippantness", + "scyphostoma", "approbation", + "Ghent", "Macraucheniidae", + "scabbiness", "unanatomized", + "photoelasticity", "eurythermal", + "enation", "prepavement", + "flushgate", "subsequentially", + "Edo", "antihero", + "Isokontae", "unforkedness", + "porriginous", "daytime", + "nonexecutive", "trisilicic", + "morphiomania", "paranephros", + "botchedly", "impugnation", + "Dodecatheon", "obolus", + "unburnt", "provedore", + "Aktistetae", "superindifference", + "Alethea", "Joachimite", + "cyanophilous", "chorograph", + "brooky", "figured", + "periclitation", "quintette", + "hondo", "ornithodelphous", + "unefficient", "pondside", + "bogydom", "laurinoxylon", + "Shiah", "unharmed", + "cartful", "noncrystallized", + "abusiveness", "cromlech", + "japanned", "rizzomed", + "underskin", "adscendent", + 
"allectory", "gelatinousness", + "volcano", "uncompromisingly", + "cubit", "idiotize", + "unfurbelowed", "undinted", + "magnetooptics", "Savitar", + "diwata", "ramosopalmate", + "Pishquow", "tomorn", + "apopenptic", "Haversian", + "Hysterocarpus", "ten", + "outhue", "Bertat", + "mechanist", "asparaginic", + "velaric", "tonsure", + "bubble", "Pyrales", + "regardful", "glyphography", + "calabazilla", "shellworker", + "stradametrical", "havoc", + "theologicopolitical", "sawdust", + "diatomaceous", "jajman", + "temporomastoid", "Serrifera", + "Ochnaceae", "aspersor", + "trailmaking", "Bishareen", + "digitule", "octogynous", + "epididymitis", "smokefarthings", + "bacillite", "overcrown", + "mangonism", "sirrah", + "undecorated", "psychofugal", + "bismuthiferous", "rechar", + "Lemuridae", "frameable", + "thiodiazole", "Scanic", + "sportswomanship", "interruptedness", + "admissory", "osteopaedion", + "tingly", "tomorrowness", + "ethnocracy", "trabecular", + "vitally", "fossilism", + "adz", "metopon", + "prefatorial", "expiscate", + "diathermacy", "chronist", + "nigh", "generalizable", + "hysterogen", "aurothiosulphuric", + "whitlowwort", "downthrust", + "Protestantize", "monander", + "Itea", "chronographic", + "silicize", "Dunlop", + "eer", "componental", + "spot", "pamphlet", + "antineuritic", "paradisean", + "interruptor", "debellator", + "overcultured", "Florissant", + "hyocholic", "pneumatotherapy", + "tailoress", "rave", + "unpeople", "Sebastian", + "thermanesthesia", "Coniferae", + "swacking", "posterishness", + "ethmopalatal", "whittle", + "analgize", "scabbardless", + "naught", "symbiogenetically", + "trip", "parodist", + "columniform", "trunnel", + "yawler", "goodwill", + "pseudohalogen", "swangy", + "cervisial", "mediateness", + "genii", "imprescribable", + "pony", "consumptional", + "carposporangial", "poleax", + "bestill", "subfebrile", + "sapphiric", "arrowworm", + "qualminess", "ultraobscure", + "thorite", "Fouquieria", + "Bermudian", "prescriber", + "elemicin", "warlike", + "semiangle", "rotular", + "misthread", "returnability", + "seraphism", "precostal", + "quarried", "Babylonism", + "sangaree", "seelful", + "placatory", "pachydermous", + "bozal", "galbulus", + "spermaphyte", "cumbrousness", + "pope", "signifier", + "Endomycetaceae", "shallowish", + "sequacity", "periarthritis", + "bathysphere", "pentosuria", + "Dadaism", "spookdom", + "Consolamentum", "afterpressure", + "mutter", "louse", + "ovoviviparous", "corbel", + "metastoma", "biventer", + "Hydrangea", "hogmace", + "seizing", "nonsuppressed", + "oratorize", "uncarefully", + "benzothiofuran", "penult", + "balanocele", "macropterous", + "dishpan", "marten", + "absvolt", "jirble", + "parmelioid", "airfreighter", + "acocotl", "archesporial", + "hypoplastral", "preoral", + "quailberry", "cinque", + "terrestrially", "stroking", + "limpet", "moodishness", + "canicule", "archididascalian", + "pompiloid", "overstaid", + "introducer", "Italical", + "Christianopaganism", "prescriptible", + "subofficer", "danseuse", + "cloy", "saguran", + "frictionlessly", "deindividualization", + "Bulanda", "ventricous", + "subfoliar", "basto", + "scapuloradial", "suspend", + "stiffish", "Sphenodontidae", + "eternal", "verbid", + "mammonish", "upcushion", + "barkometer", "concretion", + "preagitate", "incomprehensible", + "tristich", "visceral", + "hemimelus", "patroller", + "stentorophonic", "pinulus", + "kerykeion", "brutism", + "monstership", "merciful", + "overinstruct", "defensibly", + "bettermost", "splenauxe", + "Mormyrus", "unreprimanded", + 
"taver", "ell", + "proacquittal", "infestation", + "overwoven", "Lincolnlike", + "chacona", "Tamil", + "classificational", "lebensraum", + "reeveland", "intuition", + "Whilkut", "focaloid", + "Eleusinian", "micromembrane", + "byroad", "nonrepetition", + "bacterioblast", "brag", + "ribaldrous", "phytoma", + "counteralliance", "pelvimetry", + "pelf", "relaster", + "thermoresistant", "aneurism", + "molossic", "euphonym", + "upswell", "ladhood", + "phallaceous", "inertly", + "gunshop", "stereotypography", + "laryngic", "refasten", + "twinling", "oflete", + "hepatorrhaphy", "electrotechnics", + "cockal", "guitarist", + "topsail", "Cimmerianism", + "larklike", "Llandovery", + "pyrocatechol", "immatchable", + "chooser", "metrocratic", + "craglike", "quadrennial", + "nonpoisonous", "undercolored", + "knob", "ultratense", + "balladmonger", "slait", + "sialadenitis", "bucketer", + "magnificently", "unstipulated", + "unscourged", "unsupercilious", + "packsack", "pansophism", + "soorkee", "percent", + "subirrigate", "champer", + "metapolitics", "spherulitic", + "involatile", "metaphonical", + "stachyuraceous", "speckedness", + "bespin", "proboscidiform", + "gul", "squit", + "yeelaman", "peristeropode", + "opacousness", "shibuichi", + "retinize", "yote", + "misexposition", "devilwise", + "pumpkinification", "vinny", + "bonze", "glossing", + "decardinalize", "transcortical", + "serphoid", "deepmost", + "guanajuatite", "wemless", + "arval", "lammy", + "Effie", "Saponaria", + "tetrahedral", "prolificy", + "excerpt", "dunkadoo", + "Spencerism", "insatiately", + "Gilaki", "oratorship", + "arduousness", "unbashfulness", + "Pithecolobium", "unisexuality", + "veterinarian", "detractive", + "liquidity", "acidophile", + "proauction", "sural", + "totaquina", "Vichyite", + "uninhabitedness", "allegedly", + "Gothish", "manny", + "Inger", "flutist", + "ticktick", "Ludgatian", + "homotransplant", "orthopedical", + "diminutively", "monogoneutic", + "Kenipsim", "sarcologist", + "drome", "stronghearted", + "Fameuse", "Swaziland", + "alen", "chilblain", + "beatable", "agglomeratic", + "constitutor", "tendomucoid", + "porencephalous", "arteriasis", + "boser", "tantivy", + "rede", "lineamental", + "uncontradictableness", "homeotypical", + "masa", "folious", + "dosseret", "neurodegenerative", + "subtransverse", "Chiasmodontidae", + "palaeotheriodont", "unstressedly", + "chalcites", "piquantness", + "lampyrine", "Aplacentalia", + "projecting", "elastivity", + "isopelletierin", "bladderwort", + "strander", "almud", + "iniquitously", "theologal", + "bugre", "chargeably", + "imperceptivity", "meriquinoidal", + "mesophyte", "divinator", + "perfunctory", "counterappellant", + "synovial", "charioteer", + "crystallographical", "comprovincial", + "infrastapedial", "pleasurehood", + "inventurous", "ultrasystematic", + "subangulated", "supraoesophageal", + "Vaishnavism", "transude", + "chrysochrous", "ungrave", + "reconciliable", "uninterpleaded", + "erlking", "wherefrom", + "aprosopia", "antiadiaphorist", + "metoxazine", "incalculable", + "umbellic", "predebit", + "foursquare", "unimmortal", + "nonmanufacture", "slangy", + "predisputant", "familist", + "preaffiliate", "friarhood", + "corelysis", "zoonitic", + "halloo", "paunchy", + "neuromimesis", "aconitine", + "hackneyed", "unfeeble", + "cubby", "autoschediastical", + "naprapath", "lyrebird", + "inexistency", "leucophoenicite", + "ferrogoslarite", "reperuse", + "uncombable", "tambo", + "propodiale", "diplomatize", + "Russifier", "clanned", + "corona", "michigan", + 
"nonutilitarian", "transcorporeal", + "bought", "Cercosporella", + "stapedius", "glandularly", + "pictorially", "weism", + "disilane", "rainproof", + "Caphtor", "scrubbed", + "oinomancy", "pseudoxanthine", + "nonlustrous", "redesertion", + "Oryzorictinae", "gala", + "Mycogone", "reappreciate", + "cyanoguanidine", "seeingness", + "breadwinner", "noreast", + "furacious", "epauliere", + "omniscribent", "Passiflorales", + "uninductive", "inductivity", + "Orbitolina", "Semecarpus", + "migrainoid", "steprelationship", + "phlogisticate", "mesymnion", + "sloped", "edificator", + "beneficent", "culm", + "paleornithology", "unurban", + "throbless", "amplexifoliate", + "sesquiquintile", "sapience", + "astucious", "dithery", + "boor", "ambitus", + "scotching", "uloid", + "uncompromisingness", "hoove", + "waird", "marshiness", + "Jerusalem", "mericarp", + "unevoked", "benzoperoxide", + "outguess", "pyxie", + "hymnic", "euphemize", + "mendacity", "erythremia", + "rosaniline", "unchatteled", + "lienteria", "Bushongo", + "dialoguer", "unrepealably", + "rivethead", "antideflation", + "vinegarish", "manganosiderite", + "doubtingness", "ovopyriform", + "Cephalodiscus", "Muscicapa", + "Animalivora", "angina", + "planispheric", "ipomoein", + "cuproiodargyrite", "sandbox", + "scrat", "Munnopsidae", + "shola", "pentafid", + "overstudiousness", "times", + "nonprofession", "appetible", + "valvulotomy", "goladar", + "uniarticular", "oxyterpene", + "unlapsing", "omega", + "trophonema", "seminonflammable", + "circumzenithal", "starer", + "depthwise", "liberatress", + "unleavened", "unrevolting", + "groundneedle", "topline", + "wandoo", "umangite", + "ordinant", "unachievable", + "oversand", "snare", + "avengeful", "unexplicit", + "mustafina", "sonable", + "rehabilitative", "eulogization", + "papery", "technopsychology", + "impressor", "cresylite", + "entame", "transudatory", + "scotale", "pachydermatoid", + "imaginary", "yeat", + "slipped", "stewardship", + "adatom", "cockstone", + "skyshine", "heavenful", + "comparability", "exprobratory", + "dermorhynchous", "parquet", + "cretaceous", "vesperal", + "raphis", "undangered", + "Glecoma", "engrain", + "counteractively", "Zuludom", + "orchiocatabasis", "Auriculariales", + "warriorwise", "extraorganismal", + "overbuilt", "alveolite", + "tetchy", "terrificness", + "widdle", "unpremonished", + "rebilling", "sequestrum", + "equiconvex", "heliocentricism", + "catabaptist", "okonite", + "propheticism", "helminthagogic", + "calycular", "giantly", + "wingable", "golem", + "unprovided", "commandingness", + "greave", "haply", + "doina", "depressingly", + "subdentate", "impairment", + "decidable", "neurotrophic", + "unpredict", "bicorporeal", + "pendulant", "flatman", + "intrabred", "toplike", + "Prosobranchiata", "farrantly", + "toxoplasmosis", "gorilloid", + "dipsomaniacal", "aquiline", + "atlantite", "ascitic", + "perculsive", "prospectiveness", + "saponaceous", "centrifugalization", + "dinical", "infravaginal", + "beadroll", "affaite", + "Helvidian", "tickleproof", + "abstractionism", "enhedge", + "outwealth", "overcontribute", + "coldfinch", "gymnastic", + "Pincian", "Munychian", + "codisjunct", "quad", + "coracomandibular", "phoenicochroite", + "amender", "selectivity", + "putative", "semantician", + "lophotrichic", "Spatangoidea", + "saccharogenic", "inferent", + "Triconodonta", "arrendation", + "sheepskin", "taurocolla", + "bunghole", "Machiavel", + "triakistetrahedral", "dehairer", + "prezygapophysial", "cylindric", + "pneumonalgia", "sleigher", + "emir", "Socraticism", + 
"licitness", "massedly", + "instructiveness", "sturdied", + "redecrease", "starosta", + "evictor", "orgiastic", + "squdge", "meloplasty", + "Tsonecan", "repealableness", + "swoony", "myesthesia", + "molecule", "autobiographist", + "reciprocation", "refective", + "unobservantness", "tricae", + "ungouged", "floatability", + "Mesua", "fetlocked", + "chordacentrum", "sedentariness", + "various", "laubanite", + "nectopod", "zenick", + "sequentially", "analgic", + "biodynamics", "posttraumatic", + "nummi", "pyroacetic", + "bot", "redescend", + "dispermy", "undiffusive", + "circular", "trillion", + "Uraniidae", "ploration", + "discipular", "potentness", + "sud", "Hu", + "Eryon", "plugger", + "subdrainage", "jharal", + "abscission", "supermarket", + "countergabion", "glacierist", + "lithotresis", "minniebush", + "zanyism", "eucalypteol", + "sterilely", "unrealize", + "unpatched", "hypochondriacism", + "critically", "cheesecutter", + }; +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/RandomWriter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/RandomWriter.java new file mode 100644 index 00000000000..def9e546ebb --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/RandomWriter.java @@ -0,0 +1,298 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.ClusterStatus; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * This program uses map/reduce to just run a distributed job where there is + * no interaction between the tasks and each task write a large unsorted + * random binary sequence file of BytesWritable. 
+ * In order for this program to generate data for terasort with 10-byte keys
+ * and 90-byte values, have the following config:
+ *
+ * <?xml version="1.0"?>
+ * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+ * <configuration>
+ *   <property>
+ *     <name>mapreduce.randomwriter.minkey</name>
+ *     <value>10</value>
+ *   </property>
+ *   <property>
+ *     <name>mapreduce.randomwriter.maxkey</name>
+ *     <value>10</value>
+ *   </property>
+ *   <property>
+ *     <name>mapreduce.randomwriter.minvalue</name>
+ *     <value>90</value>
+ *   </property>
+ *   <property>
+ *     <name>mapreduce.randomwriter.maxvalue</name>
+ *     <value>90</value>
+ *   </property>
+ *   <property>
+ *     <name>mapreduce.randomwriter.totalbytes</name>
+ *     <value>1099511627776</value>
+ *   </property>
+ * </configuration>
+ *
+ * Equivalently, {@link RandomWriter} also supports all the above options
+ * and ones supported by {@link GenericOptionsParser} via the command-line.
+ */
+public class RandomWriter extends Configured implements Tool {
+  public static final String TOTAL_BYTES = "mapreduce.randomwriter.totalbytes";
+  public static final String BYTES_PER_MAP =
+    "mapreduce.randomwriter.bytespermap";
+  public static final String MAPS_PER_HOST =
+    "mapreduce.randomwriter.mapsperhost";
+  public static final String MAX_VALUE = "mapreduce.randomwriter.maxvalue";
+  public static final String MIN_VALUE = "mapreduce.randomwriter.minvalue";
+  public static final String MIN_KEY = "mapreduce.randomwriter.minkey";
+  public static final String MAX_KEY = "mapreduce.randomwriter.maxkey";
+
+  /**
+   * User counters
+   */
+  static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
+
+  /**
+   * A custom input format that creates virtual inputs of a single string
+   * for each map.
+   */
+  static class RandomInputFormat extends InputFormat<Text, Text> {
+
+    /**
+     * Generate the requested number of file splits, with the filename
+     * set to the filename of the output file.
+     */
+    public List<InputSplit> getSplits(JobContext job) throws IOException {
+      List<InputSplit> result = new ArrayList<InputSplit>();
+      Path outDir = FileOutputFormat.getOutputPath(job);
+      int numSplits =
+            job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
+      for(int i = 0; i < numSplits; ++i) {
+        result.add(new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1,
+                                 (String[])null));
+      }
+      return result;
+    }
+
+    /**
+     * Return a single record (filename, "") where the filename is taken from
+     * the file split.
+     */
+    static class RandomRecordReader extends RecordReader<Text, Text> {
+      Path name;
+      Text key = null;
+      Text value = new Text();
+
+      public RandomRecordReader(Path p) {
+        name = p;
+      }
+
+      public void initialize(InputSplit split,
+                             TaskAttemptContext context)
+          throws IOException, InterruptedException {
+      }
+
+      public boolean nextKeyValue() {
+        if (name != null) {
+          key = new Text();
+          key.set(name.getName());
+          name = null;
+          return true;
+        }
+        return false;
+      }
+
+      public Text getCurrentKey() {
+        return key;
+      }
+
+      public Text getCurrentValue() {
+        return value;
+      }
+
+      public void close() {}
+
+      public float getProgress() {
+        return 0.0f;
+      }
+    }
+
+    public RecordReader<Text, Text> createRecordReader(InputSplit split,
+        TaskAttemptContext context) throws IOException, InterruptedException {
+      return new RandomRecordReader(((FileSplit) split).getPath());
+    }
+  }
+
+  static class RandomMapper extends Mapper<WritableComparable, Writable,
+                                           BytesWritable, BytesWritable> {
+
+    private long numBytesToWrite;
+    private int minKeySize;
+    private int keySizeRange;
+    private int minValueSize;
+    private int valueSizeRange;
+    private Random random = new Random();
+    private BytesWritable randomKey = new BytesWritable();
+    private BytesWritable randomValue = new BytesWritable();
+
+    private void randomizeBytes(byte[] data, int offset, int length) {
+      for(int i = offset + length - 1; i >= offset; --i) {
+        data[i] = (byte) random.nextInt(256);
+      }
+    }
+
+    /**
+     * Given an output filename, write a bunch of random records to it.
+     */
+    public void map(WritableComparable key,
+                    Writable value,
+                    Context context) throws IOException, InterruptedException {
+      int itemCount = 0;
+      while (numBytesToWrite > 0) {
+        int keyLength = minKeySize +
+          (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
+        randomKey.setSize(keyLength);
+        randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
+        int valueLength = minValueSize +
+          (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
+        randomValue.setSize(valueLength);
+        randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
+        context.write(randomKey, randomValue);
+        numBytesToWrite -= keyLength + valueLength;
+        context.getCounter(Counters.BYTES_WRITTEN).increment(
+            keyLength + valueLength);
+        context.getCounter(Counters.RECORDS_WRITTEN).increment(1);
+        if (++itemCount % 200 == 0) {
+          context.setStatus("wrote record " + itemCount + ". " +
+                            numBytesToWrite + " bytes left.");
+        }
+      }
+      context.setStatus("done with " + itemCount + " records.");
+    }
+
+    /**
+     * Save the values out of the configuration that we need to write
+     * the data.
+     */
+    @Override
+    public void setup(Context context) {
+      Configuration conf = context.getConfiguration();
+      numBytesToWrite = conf.getLong(BYTES_PER_MAP,
+                                     1*1024*1024*1024);
+      minKeySize = conf.getInt(MIN_KEY, 10);
+      keySizeRange =
+        conf.getInt(MAX_KEY, 1000) - minKeySize;
+      minValueSize = conf.getInt(MIN_VALUE, 0);
+      valueSizeRange =
+        conf.getInt(MAX_VALUE, 20000) - minValueSize;
+    }
+  }
+
+  /**
+   * This is the main routine for launching a distributed random write job.
+   * It runs 10 maps/node and each node writes 1 gig of data to a DFS file.
+   * The reduce doesn't do anything.
+   *
+   * @throws IOException
+   */
+  public int run(String[] args) throws Exception {
+    if (args.length == 0) {
+      System.out.println("Usage: writer <out-dir>");
+      ToolRunner.printGenericCommandUsage(System.out);
+      return 2;
+    }
+
+    Path outDir = new Path(args[0]);
+    Configuration conf = getConf();
+    JobClient client = new JobClient(conf);
+    ClusterStatus cluster = client.getClusterStatus();
+    int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10);
+    long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP,
+                                              1*1024*1024*1024);
+    if (numBytesToWritePerMap == 0) {
+      System.err.println("Cannot have " + BYTES_PER_MAP + " set to 0");
+      return -2;
+    }
+    long totalBytesToWrite = conf.getLong(TOTAL_BYTES,
+        numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
+    int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
+    if (numMaps == 0 && totalBytesToWrite > 0) {
+      numMaps = 1;
+      conf.setLong(BYTES_PER_MAP, totalBytesToWrite);
+    }
+    conf.setInt(MRJobConfig.NUM_MAPS, numMaps);
+
+    Job job = new Job(conf);
+
+    job.setJarByClass(RandomWriter.class);
+    job.setJobName("random-writer");
+    FileOutputFormat.setOutputPath(job, outDir);
+    job.setOutputKeyClass(BytesWritable.class);
+    job.setOutputValueClass(BytesWritable.class);
+    job.setInputFormatClass(RandomInputFormat.class);
+    job.setMapperClass(RandomMapper.class);
+    job.setReducerClass(Reducer.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+    System.out.println("Running " + numMaps + " maps.");
+
+    // reducer NONE
+    job.setNumReduceTasks(0);
+
+    Date startTime = new Date();
+    System.out.println("Job started: " + startTime);
+    int ret = job.waitForCompletion(true) ? 0 : 1;
+    Date endTime = new Date();
+    System.out.println("Job ended: " + endTime);
+    System.out.println("The job took " +
+                       (endTime.getTime() - startTime.getTime()) / 1000 +
+                       " seconds.");
+
+    return ret;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new RandomWriter(), args);
+    System.exit(res);
+  }
+
+}
diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java
similarity index 100%
rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java
rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-examples/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-examples/pom.xml
index 15f81e26f54..ac365b20106 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-examples/pom.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-examples/pom.xml
@@ -97,9 +97,9 @@
-        org.apache.maven.plugins
+        org.apache.maven.plugins
         maven-jar-plugin
-
+
           org.apache.hadoop.examples.ExampleDriver
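As a quick illustration of the mapreduce.randomwriter.* options documented in the RandomWriter javadoc above, a driver along the following lines would generate terasort-style input (10-byte keys, 90-byte values, 1 TB total). This is a sketch only, not part of the patch series: the class name TerasortDataGen is invented here, and it assumes the RandomWriter class added above is on the classpath.

    // Sketch: generate terasort-style input with the RandomWriter added above.
    // TerasortDataGen is an invented name for this illustration.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.RandomWriter;
    import org.apache.hadoop.util.ToolRunner;

    public class TerasortDataGen {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt(RandomWriter.MIN_KEY, 10);     // 10-byte keys
        conf.setInt(RandomWriter.MAX_KEY, 10);
        conf.setInt(RandomWriter.MIN_VALUE, 90);   // 90-byte values
        conf.setInt(RandomWriter.MAX_VALUE, 90);
        conf.setLong(RandomWriter.TOTAL_BYTES, 1099511627776L);  // 1 TB
        // RandomWriter takes the output directory as its single argument.
        System.exit(ToolRunner.run(conf, new RandomWriter(), args));
      }
    }

Because RandomWriter is a Tool run through ToolRunner, the same settings can equally be passed as -D options on the command line, as the javadoc notes.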
From a02f07c4bc7174f1470864b696616d34b95d37f8 Mon Sep 17 00:00:00 2001
From: Siddharth Seth
Date: Wed, 25 Jan 2012 06:30:09 +0000
Subject: [PATCH 6/6] MAPREDUCE-3630. Fixes a NullPointer exception while
 running TeraGen - if a map is asked to generate 0 records. (Contributed by
 Mahadev Konar)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1235641 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt                       | 3 +++
 .../java/org/apache/hadoop/examples/terasort/TeraGen.java | 5 +++--
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index aedea7f0b3d..0214a638a76 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -555,6 +555,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3717. JobClient test jar has missing files to run all the
     test programs. (mahadev)
 
+    MAPREDUCE-3630. Fixes a NullPointer exception while running TeraGen - if a
+    map is asked to generate 0 records. (Mahadev Konar via sseth)
+
 Release 0.23.0 - 2011-11-01
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraGen.java b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraGen.java
index 9880d54003e..7e679343b4b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraGen.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraGen.java
@@ -238,7 +238,9 @@ public class TeraGen extends Configured implements Tool {
 
     @Override
     public void cleanup(Context context) {
-      checksumCounter.increment(total.getLow8());
+      if (checksumCounter != null) {
+        checksumCounter.increment(total.getLow8());
+      }
     }
   }
 
@@ -307,5 +309,4 @@ public class TeraGen extends Configured implements Tool {
     int res = ToolRunner.run(new Configuration(), new TeraGen(), args);
     System.exit(res);
   }
-
 }
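The guard added to cleanup() above implies that checksumCounter is initialized lazily, only after map() has processed at least one record, so a map asked to generate 0 records reaches cleanup() with the field still null. Below is a minimal self-contained sketch of that pattern and the guard; all names here (LazyCounterMapper, CHECKSUM, total) are invented for illustration, and only the shape of the null check mirrors the patch.

    // Sketch of the lazy-counter pattern behind MAPREDUCE-3630.
    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.Mapper;

    public class LazyCounterMapper
        extends Mapper<LongWritable, NullWritable, NullWritable, NullWritable> {

      static enum Counters { CHECKSUM }

      private Counter checksumCounter = null;  // only set once map() runs
      private long total = 0;

      @Override
      public void map(LongWritable row, NullWritable ignored, Context context)
          throws IOException, InterruptedException {
        if (checksumCounter == null) {
          // Lazy initialization: never happens for a 0-record map task.
          checksumCounter = context.getCounter(Counters.CHECKSUM);
        }
        total += row.get();
      }

      @Override
      public void cleanup(Context context) {
        // Guard against the 0-record case, as in the TeraGen fix above.
        if (checksumCounter != null) {
          checksumCounter.increment(total);
        }
      }
    }

An alternative would be to obtain the counter eagerly in setup(), which runs even when a task receives no records; the patch instead takes the smaller, lower-risk change of guarding the increment.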