diff --git a/hadoop-assemblies/src/main/resources/assemblies/hadoop-mapreduce-dist.xml b/hadoop-assemblies/src/main/resources/assemblies/hadoop-mapreduce-dist.xml
index 57f3c66dad3..281ce0ddcdd 100644
--- a/hadoop-assemblies/src/main/resources/assemblies/hadoop-mapreduce-dist.xml
+++ b/hadoop-assemblies/src/main/resources/assemblies/hadoop-mapreduce-dist.xml
@@ -127,6 +127,17 @@
       <unpack>false</unpack>
+    <moduleSet>
+      <includes>
+        <include>org.apache.hadoop:hadoop-mapreduce-client-jobclient</include>
+      </includes>
+      <binaries>
+        <attachmentClassifier>tests</attachmentClassifier>
+        <outputDirectory>share/hadoop/${hadoop.component}</outputDirectory>
+        <includeDependencies>false</includeDependencies>
+        <unpack>false</unpack>
+      </binaries>
+    </moduleSet>
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 799ce495ec8..0214a638a76 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -189,6 +189,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3692. yarn-resourcemanager out and log files can get big. (eli)
+ MAPREDUCE-3710. Improved FileInputFormat to return better locality for the
+ last split. (Siddarth Seth via vinodkv)
+
OPTIMIZATIONS
MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar
@@ -540,6 +543,21 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3681. Fixed computation of queue's usedCapacity. (acmurthy)
+ MAPREDUCE-3505. yarn APPLICATION_CLASSPATH needs to be overridable.
+ (ahmed via tucu)
+
+ MAPREDUCE-3714. Fixed EventFetcher and Fetcher threads to shut-down properly
+ so that reducers don't hang in corner cases. (vinodkv)
+
+ MAPREDUCE-3712. The mapreduce tar does not contain the hadoop-mapreduce-client-
+ jobclient-tests.jar. (mahadev)
+
+ MAPREDUCE-3717. JobClient test jar has missing files to run all the test programs.
+ (mahadev)
+
+ MAPREDUCE-3630. Fixes a NullPointer exception while running TeraGen - if a
+ map is asked to generate 0 records. (Mahadev Konar via sseth)
+
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index b296d02d55f..7cca98031d5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -522,13 +522,13 @@ public abstract class TaskAttemptImpl implements
* a parent CLC and use it for all the containers, so this should go away
* once the mr-generated-classpath stuff is gone.
*/
- private static String getInitialClasspath() throws IOException {
+ private static String getInitialClasspath(Configuration conf) throws IOException {
synchronized (classpathLock) {
if (initialClasspathFlag.get()) {
return initialClasspath;
}
       Map<String, String> env = new HashMap<String, String>();
- MRApps.setClasspath(env);
+ MRApps.setClasspath(env, conf);
initialClasspath = env.get(Environment.CLASSPATH.name());
initialClasspathFlag.set(true);
return initialClasspath;
@@ -631,7 +631,7 @@ public abstract class TaskAttemptImpl implements
Apps.addToEnvironment(
environment,
Environment.CLASSPATH.name(),
- getInitialClasspath());
+ getInitialClasspath(conf));
} catch (IOException e) {
throw new YarnException(e);
}
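
Note on the getInitialClasspath(Configuration) change above: the result is still memoized per JVM behind classpathLock, so only the Configuration passed on the first call is ever consulted; later calls return the cached string regardless of their argument. A minimal, self-contained sketch of that contract (class and names are illustrative, not from the patch):

    import java.util.concurrent.atomic.AtomicBoolean;

    class ClasspathCache {
      private static final Object lock = new Object();
      private static final AtomicBoolean computed = new AtomicBoolean(false);
      private static String cached;

      // Stand-in for getInitialClasspath(conf): the first caller's value wins.
      static String get(String classpathFromConf) {
        synchronized (lock) {
          if (computed.get()) {
            return cached;               // later arguments are ignored
          }
          cached = classpathFromConf;    // stand-in for MRApps.setClasspath(env, conf)
          computed.set(true);
          return cached;
        }
      }
    }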
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml
index cb199ac70a9..e33e589c9e2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml
@@ -38,6 +38,10 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-common</artifactId>
+    </dependency>
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
index cb802f1c5ad..129996e8193 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.BuilderUtils;
@@ -171,7 +172,7 @@ public class MRApps extends Apps {
}
private static void setMRFrameworkClasspath(
-      Map<String, String> environment) throws IOException {
+      Map<String, String> environment, Configuration conf) throws IOException {
InputStream classpathFileStream = null;
BufferedReader reader = null;
try {
@@ -208,8 +209,10 @@ public class MRApps extends Apps {
}
// Add standard Hadoop classes
- for (String c : ApplicationConstants.APPLICATION_CLASSPATH) {
- Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c);
+ for (String c : conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH)
+ .split(",")) {
+ Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c
+ .trim());
}
} finally {
if (classpathFileStream != null) {
@@ -222,8 +225,8 @@ public class MRApps extends Apps {
// TODO: Remove duplicates.
}
-  public static void setClasspath(Map<String, String> environment)
-      throws IOException {
+  public static void setClasspath(Map<String, String> environment,
+      Configuration conf) throws IOException {
Apps.addToEnvironment(
environment,
Environment.CLASSPATH.name(),
@@ -232,7 +235,7 @@ public class MRApps extends Apps {
environment,
Environment.CLASSPATH.name(),
Environment.PWD.$() + Path.SEPARATOR + "*");
- MRApps.setMRFrameworkClasspath(environment);
+ MRApps.setMRFrameworkClasspath(environment, conf);
}
private static final String STAGING_CONSTANT = ".staging";
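
With MAPREDUCE-3505, the standard classpath entries above now come from the yarn.application.classpath configuration key (YarnConfiguration.YARN_APPLICATION_CLASSPATH) instead of the compiled-in ApplicationConstants.APPLICATION_CLASSPATH, so a client can override them before job submission. A hedged sketch of such an override (the /opt/extra/lib entry is invented for illustration):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.v2.util.MRApps;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class ClasspathOverrideExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Entries are comma-separated; setMRFrameworkClasspath splits on ","
        // and trims each entry before appending it to CLASSPATH.
        conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
            "$HADOOP_CONF_DIR, $HADOOP_COMMON_HOME/share/hadoop/common/*, /opt/extra/lib/*");
        Map<String, String> env = new HashMap<String, String>();
        MRApps.setClasspath(env, conf);
        System.out.println(env.get("CLASSPATH"));
      }
    }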
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
index 11589980625..fd9d1d23281 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
@@ -18,7 +18,12 @@
package org.apache.hadoop.mapreduce.v2.util;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@@ -121,4 +126,17 @@ public class TestMRApps {
"/my/path/to/staging/dummy-user/.staging/job_dummy-job_12345/job.xml", jobFile);
}
+ @Test public void testSetClasspath() throws IOException {
+ Job job = Job.getInstance();
+    Map<String, String> environment = new HashMap<String, String>();
+ MRApps.setClasspath(environment, job.getConfiguration());
+ assertEquals("job.jar:$PWD/*:$HADOOP_CONF_DIR:" +
+ "$HADOOP_COMMON_HOME/share/hadoop/common/*:" +
+ "$HADOOP_COMMON_HOME/share/hadoop/common/lib/*:" +
+ "$HADOOP_HDFS_HOME/share/hadoop/hdfs/*:" +
+ "$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*:" +
+ "$YARN_HOME/share/hadoop/mapreduce/*:" +
+ "$YARN_HOME/share/hadoop/mapreduce/lib/*",
+ environment.get("CLASSPATH"));
+ }
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
index a7e59562bd5..aaf3c26b789 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
@@ -289,8 +289,10 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
}
if (bytesRemaining != 0) {
- splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
- blkLocations[blkLocations.length-1].getHosts()));
+ String[] splitHosts = getSplitHosts(blkLocations, length
+ - bytesRemaining, bytesRemaining, clusterMap);
+ splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
+ splitHosts));
}
} else if (length != 0) {
String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
index 781715dbeef..d86ad156bda 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
@@ -286,8 +286,9 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
}
if (bytesRemaining != 0) {
+ int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
- blkLocations[blkLocations.length-1].getHosts()));
+ blkLocations[blkIndex].getHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
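
Why both FileInputFormat fixes above matter (MAPREDUCE-3710): the old code always took the hosts of the final block for the tail split, but because of the 10% SPLIT_SLOP the tail split can start one or more blocks earlier than the last block. A self-contained sketch with assumed sizes (128 MB blocks, 1027 MB file, matching the new tests below):

    public class LastSplitLocality {
      public static void main(String[] args) {
        long splitSize = 128L * 1024 * 1024;   // block size == split size
        long length = 1027L * 1024 * 1024;     // 9 blocks; the last holds 3 MB
        long bytesRemaining = length;
        // Same loop shape as getSplits(): stop splitting once the remainder
        // is within SPLIT_SLOP (1.1x) of a single split.
        while (((double) bytesRemaining) / splitSize > 1.1) {
          bytesRemaining -= splitSize;
        }
        long tailOffset = length - bytesRemaining;         // 896 MB
        int oldBlockIndex = (int) ((length - 1) / splitSize); // 8: last block
        int newBlockIndex = (int) (tailOffset / splitSize);   // 7: correct block
        System.out.println("tail split at " + tailOffset + " covers "
            + bytesRemaining + " bytes; old host choice = block " + oldBlockIndex
            + ", fixed host choice = block " + newBlockIndex);
      }
    }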
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java
index 6facb47aa21..fd80ec2b1e9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.TaskAttemptID;
+@SuppressWarnings("deprecation")
class EventFetcher<K,V> extends Thread {
private static final long SLEEP_TIME = 1000;
private static final int MAX_EVENTS_TO_FETCH = 10000;
@@ -41,6 +42,8 @@ class EventFetcher<K,V> extends Thread {
private ExceptionReporter exceptionReporter = null;
private int maxMapRuntime = 0;
+
+ private volatile boolean stopped = false;
public EventFetcher(TaskAttemptID reduce,
TaskUmbilicalProtocol umbilical,
@@ -60,7 +63,7 @@ class EventFetcher extends Thread {
LOG.info(reduce + " Thread started: " + getName());
try {
- while (true && !Thread.currentThread().isInterrupted()) {
+ while (!stopped && !Thread.currentThread().isInterrupted()) {
try {
int numNewMaps = getMapCompletionEvents();
failures = 0;
@@ -71,6 +74,9 @@ class EventFetcher extends Thread {
if (!Thread.currentThread().isInterrupted()) {
Thread.sleep(SLEEP_TIME);
}
+ } catch (InterruptedException e) {
+ LOG.info("EventFetcher is interrupted.. Returning");
+ return;
} catch (IOException ie) {
LOG.info("Exception in getting events", ie);
// check to see whether to abort
@@ -90,6 +96,16 @@ class EventFetcher<K,V> extends Thread {
return;
}
}
+
+ public void shutDown() {
+ this.stopped = true;
+ interrupt();
+ try {
+ join(5000);
+ } catch(InterruptedException ie) {
+ LOG.warn("Got interrupted while joining " + getName(), ie);
+ }
+ }
/**
* Queries the {@link TaskTracker} for a set of map-completion events
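
The shutdown changes in EventFetcher above (and the matching ones in Fetcher and Shuffle below, MAPREDUCE-3714) follow one pattern: a volatile stop flag ends the run() loop, interrupt() kicks the thread out of sleeps and blocking I/O, and a bounded join(5000) keeps the reducer from hanging on a thread that will not die. A minimal, self-contained sketch of the pattern (the class and the sleep are stand-ins, not patch code):

    class StoppableWorker extends Thread {
      private volatile boolean stopped = false;

      @Override
      public void run() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
          try {
            Thread.sleep(1000);   // stand-in for one fetch/poll iteration
          } catch (InterruptedException ie) {
            return;               // interrupted during shutDown(): exit cleanly
          }
        }
      }

      public void shutDown() {
        stopped = true;           // volatile write is seen by the run() loop
        interrupt();              // wake the thread if it is sleeping/blocked
        try {
          join(5000);             // bound the wait so the caller cannot hang
        } catch (InterruptedException ie) {
          // The caller itself was interrupted; stop waiting on the join.
        }
      }
    }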
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index 5a213f05c1a..93200873d6b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
+@SuppressWarnings({"deprecation"})
class Fetcher<K,V> extends Thread {
private static final Log LOG = LogFactory.getLog(Fetcher.class);
@@ -88,6 +89,8 @@ class Fetcher<K,V> extends Thread {
private final Decompressor decompressor;
private final SecretKey jobTokenSecret;
+ private volatile boolean stopped = false;
+
   public Fetcher(JobConf job, TaskAttemptID reduceId,
                  ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
Reporter reporter, ShuffleClientMetrics metrics,
@@ -135,7 +138,7 @@ class Fetcher extends Thread {
public void run() {
try {
- while (true && !Thread.currentThread().isInterrupted()) {
+ while (!stopped && !Thread.currentThread().isInterrupted()) {
MapHost host = null;
try {
// If merge is on, block
@@ -160,7 +163,17 @@ class Fetcher extends Thread {
exceptionReporter.reportException(t);
}
}
-
+
+ public void shutDown() throws InterruptedException {
+ this.stopped = true;
+ interrupt();
+ try {
+ join(5000);
+ } catch (InterruptedException ie) {
+ LOG.warn("Got interrupt while joining " + getName(), ie);
+ }
+ }
+
/**
* The crux of the matter...
*
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
index 4b8b854952c..e7d7d71d079 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
@@ -19,8 +19,6 @@ package org.apache.hadoop.mapreduce.task.reduce;
import java.io.IOException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
@@ -33,17 +31,17 @@ import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.Task.CombineOutputCollector;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
-import org.apache.hadoop.mapred.Task.CombineOutputCollector;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.Progress;
@InterfaceAudience.Private
@InterfaceStability.Unstable
+@SuppressWarnings({"deprecation", "unchecked", "rawtypes"})
public class Shuffle<K,V> implements ExceptionReporter {
- private static final Log LOG = LogFactory.getLog(Shuffle.class);
private static final int PROGRESS_FREQUENCY = 2000;
private final TaskAttemptID reduceId;
@@ -100,7 +98,6 @@ public class Shuffle<K,V> implements ExceptionReporter {
this, mergePhase, mapOutputFile);
}
- @SuppressWarnings("unchecked")
public RawKeyValueIterator run() throws IOException, InterruptedException {
// Start the map-completion events fetcher thread
    final EventFetcher<K,V> eventFetcher =
@@ -130,19 +127,11 @@ public class Shuffle implements ExceptionReporter {
}
// Stop the event-fetcher thread
- eventFetcher.interrupt();
- try {
- eventFetcher.join();
- } catch(Throwable t) {
- LOG.info("Failed to stop " + eventFetcher.getName(), t);
- }
+ eventFetcher.shutDown();
// Stop the map-output fetcher threads
    for (Fetcher<K,V> fetcher : fetchers) {
- fetcher.interrupt();
- }
-    for (Fetcher<K,V> fetcher : fetchers) {
- fetcher.join();
+ fetcher.shutDown();
}
fetchers = null;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
index e59dd0ce83b..b12c09aae18 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
@@ -102,6 +102,13 @@
             <phase>test-compile</phase>
+          <configuration>
+            <archive>
+              <manifest>
+                <mainClass>org.apache.hadoop.test.MapredTestDriver</mainClass>
+              </manifest>
+            </archive>
+          </configuration>
org.apache.maven.plugins
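
The manifest block above (MAPREDUCE-3717) sets Main-Class of the tests jar to MapredTestDriver, so the jar is directly runnable and the driver can dispatch to the test programs being moved into this module below (e.g. GenericMRLoadGenerator). A hypothetical invocation, with the jar name and program arguments assumed rather than taken from this patch:

    bin/hadoop jar hadoop-mapreduce-client-jobclient-<version>-tests.jar loadgen -m 10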
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index 3513d52859a..da48e9c0249 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -406,7 +406,7 @@ public class YARNRunner implements ClientProtocol {
// Setup the CLASSPATH in environment
// i.e. add { job jar, CWD, Hadoop jars} to classpath.
      Map<String, String> environment = new HashMap<String, String>();
- MRApps.setClasspath(environment);
+ MRApps.setClasspath(environment, conf);
// Parse distributed cache
MRApps.setupDistributedCache(jobConf, localResources);
diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/GenericMRLoadGenerator.java
similarity index 99%
rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java
rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/GenericMRLoadGenerator.java
index bb6d9774e35..f67ca1c17ee 100644
--- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/GenericMRLoadGenerator.java
@@ -29,7 +29,6 @@ import java.util.Stack;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.examples.RandomTextWriter;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -40,6 +39,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.mapreduce.RandomTextWriter;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
index 3476e635c5c..fca9b358647 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
@@ -17,6 +17,10 @@
*/
package org.apache.hadoop.mapred;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.io.DataOutputStream;
import java.io.IOException;
@@ -32,6 +36,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.Text;
+@SuppressWarnings("deprecation")
public class TestFileInputFormat extends TestCase {
Configuration conf = new Configuration();
@@ -186,6 +191,102 @@ public class TestFileInputFormat extends TestCase {
assertEquals(splits.length, 2);
}
+ @SuppressWarnings("rawtypes")
+ public void testLastInputSplitAtSplitBoundary() throws Exception {
+ FileInputFormat fif = new FileInputFormatForTest(1024l * 1024 * 1024,
+ 128l * 1024 * 1024);
+ JobConf job = new JobConf();
+ InputSplit[] splits = fif.getSplits(job, 8);
+ assertEquals(8, splits.length);
+ for (int i = 0; i < splits.length; i++) {
+ InputSplit split = splits[i];
+ assertEquals(("host" + i), split.getLocations()[0]);
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ public void testLastInputSplitExceedingSplitBoundary() throws Exception {
+ FileInputFormat fif = new FileInputFormatForTest(1027l * 1024 * 1024,
+ 128l * 1024 * 1024);
+ JobConf job = new JobConf();
+ InputSplit[] splits = fif.getSplits(job, 8);
+ assertEquals(8, splits.length);
+ for (int i = 0; i < splits.length; i++) {
+ InputSplit split = splits[i];
+ assertEquals(("host" + i), split.getLocations()[0]);
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ public void testLastInputSplitSingleSplit() throws Exception {
+ FileInputFormat fif = new FileInputFormatForTest(100l * 1024 * 1024,
+ 128l * 1024 * 1024);
+ JobConf job = new JobConf();
+ InputSplit[] splits = fif.getSplits(job, 1);
+ assertEquals(1, splits.length);
+ for (int i = 0; i < splits.length; i++) {
+ InputSplit split = splits[i];
+ assertEquals(("host" + i), split.getLocations()[0]);
+ }
+ }
+
+  private class FileInputFormatForTest<K, V> extends FileInputFormat<K, V> {
+
+ long splitSize;
+ long length;
+
+ FileInputFormatForTest(long length, long splitSize) {
+ this.length = length;
+ this.splitSize = splitSize;
+ }
+
+ @Override
+    public RecordReader<K, V> getRecordReader(InputSplit split, JobConf job,
+ Reporter reporter) throws IOException {
+ return null;
+ }
+
+ @Override
+ protected FileStatus[] listStatus(JobConf job) throws IOException {
+ FileStatus mockFileStatus = mock(FileStatus.class);
+ when(mockFileStatus.getBlockSize()).thenReturn(splitSize);
+ when(mockFileStatus.isDirectory()).thenReturn(false);
+ Path mockPath = mock(Path.class);
+ FileSystem mockFs = mock(FileSystem.class);
+
+ BlockLocation[] blockLocations = mockBlockLocations(length, splitSize);
+ when(mockFs.getFileBlockLocations(mockFileStatus, 0, length)).thenReturn(
+ blockLocations);
+ when(mockPath.getFileSystem(any(Configuration.class))).thenReturn(mockFs);
+
+ when(mockFileStatus.getPath()).thenReturn(mockPath);
+ when(mockFileStatus.getLen()).thenReturn(length);
+
+ FileStatus[] fs = new FileStatus[1];
+ fs[0] = mockFileStatus;
+ return fs;
+ }
+
+ @Override
+ protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
+ return splitSize;
+ }
+
+ private BlockLocation[] mockBlockLocations(long size, long splitSize) {
+ int numLocations = (int) (size / splitSize);
+ if (size % splitSize != 0)
+ numLocations++;
+ BlockLocation[] blockLocations = new BlockLocation[numLocations];
+ for (int i = 0; i < numLocations; i++) {
+ String[] names = new String[] { "b" + i };
+ String[] hosts = new String[] { "host" + i };
+ blockLocations[i] = new BlockLocation(names, hosts, i * splitSize,
+ Math.min(splitSize, size - (splitSize * i)));
+ }
+ return blockLocations;
+ }
+ }
+
static void writeFile(Configuration conf, Path name,
short replication, int numBlocks) throws IOException {
FileSystem fileSys = FileSystem.get(conf);
diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/ThreadedMapBenchmark.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/ThreadedMapBenchmark.java
similarity index 99%
rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/ThreadedMapBenchmark.java
rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/ThreadedMapBenchmark.java
index f2a2e236dd4..f5512327f56 100644
--- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/ThreadedMapBenchmark.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/ThreadedMapBenchmark.java
@@ -25,7 +25,6 @@ import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.examples.RandomWriter;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java
similarity index 99%
rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java
rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java
index 268b126cc73..7ee28dfa32a 100644
--- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java
@@ -29,7 +29,6 @@ import java.util.Stack;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.examples.RandomTextWriter;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/RandomTextWriter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/RandomTextWriter.java
new file mode 100644
index 00000000000..c3f6476d7b1
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/RandomTextWriter.java
@@ -0,0 +1,757 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * This program uses map/reduce to just run a distributed job where there is
+ * no interaction between the tasks and each task writes a large unsorted
+ * random sequence of words.
+ * In order for this program to generate data for terasort with a 5-10 words
+ * per key and 20-100 words per value, have the following config:
+ * <xmp>
+ * <?xml version="1.0"?>
+ * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+ * <configuration>
+ *   <property>
+ *     <name>mapreduce.randomtextwriter.minwordskey</name>
+ *     <value>5</value>
+ *   </property>
+ *   <property>
+ *     <name>mapreduce.randomtextwriter.maxwordskey</name>
+ *     <value>10</value>
+ *   </property>
+ *   <property>
+ *     <name>mapreduce.randomtextwriter.minwordsvalue</name>
+ *     <value>20</value>
+ *   </property>
+ *   <property>
+ *     <name>mapreduce.randomtextwriter.maxwordsvalue</name>
+ *     <value>100</value>
+ *   </property>
+ *   <property>
+ *     <name>mapreduce.randomtextwriter.totalbytes</name>
+ *     <value>1099511627776</value>
+ *   </property>
+ * </configuration>
+ * </xmp>
+ * Equivalently, {@link RandomTextWriter} also supports all the above options
+ * and ones supported by {@link Tool} via the command-line.
+ *
+ * To run: bin/hadoop jar hadoop-${version}-examples.jar randomtextwriter
+ *            [-outFormat <i>output format class</i>] <i>output</i>
+ */
+public class RandomTextWriter extends Configured implements Tool {
+ public static final String TOTAL_BYTES =
+ "mapreduce.randomtextwriter.totalbytes";
+ public static final String BYTES_PER_MAP =
+ "mapreduce.randomtextwriter.bytespermap";
+ public static final String MAPS_PER_HOST =
+ "mapreduce.randomtextwriter.mapsperhost";
+ public static final String MAX_VALUE = "mapreduce.randomtextwriter.maxwordsvalue";
+ public static final String MIN_VALUE = "mapreduce.randomtextwriter.minwordsvalue";
+ public static final String MIN_KEY = "mapreduce.randomtextwriter.minwordskey";
+ public static final String MAX_KEY = "mapreduce.randomtextwriter.maxwordskey";
+
+ static int printUsage() {
+ System.out.println("randomtextwriter " +
+ "[-outFormat