diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index ad747453a81..ed3022a1628 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -72,6 +72,9 @@ Release 2.7.0 - UNRELEASED MAPREDUCE-5612. Add javadoc for TaskCompletionEvent.Status. (Chris Palmer via aajisaka) + MAPREDUCE-6248. Exposed the internal MapReduce job's information as a public + API in DistCp. (Jing Zhao via vinodkv) + OPTIMIZATIONS MAPREDUCE-6169. MergeQueue should release reference to the current item diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java index d202f0a9bdf..08c88bfc105 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java @@ -20,6 +20,8 @@ package org.apache.hadoop.tools; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; @@ -51,12 +53,14 @@ import com.google.common.annotations.VisibleForTesting; * launch the copy-job. DistCp may alternatively be sub-classed to fine-tune * behaviour. */ +@InterfaceAudience.Public +@InterfaceStability.Evolving public class DistCp extends Configured implements Tool { /** - * Priority of the ResourceManager shutdown hook. + * Priority of the shutdown hook. */ - public static final int SHUTDOWN_HOOK_PRIORITY = 30; + static final int SHUTDOWN_HOOK_PRIORITY = 30; private static final Log LOG = LogFactory.getLog(DistCp.class); @@ -66,7 +70,7 @@ public class DistCp extends Configured implements Tool { private static final String PREFIX = "_distcp"; private static final String WIP_PREFIX = "._WIP_"; private static final String DISTCP_DEFAULT_XML = "distcp-default.xml"; - public static final Random rand = new Random(); + static final Random rand = new Random(); private boolean submitted; private FileSystem jobFS; @@ -90,7 +94,7 @@ public class DistCp extends Configured implements Tool { * To be used with the ToolRunner. Not for public consumption. */ @VisibleForTesting - public DistCp() {} + DistCp() {} /** * Implementation of Tool::run(). Orchestrates the copy of source file(s) @@ -100,6 +104,7 @@ public class DistCp extends Configured implements Tool { * @param argv List of arguments passed to DistCp, from the ToolRunner. * @return On success, it returns 0. Else, -1. */ + @Override public int run(String[] argv) { if (argv.length < 1) { OptionsParser.usage(); @@ -145,9 +150,21 @@ public class DistCp extends Configured implements Tool { * @throws Exception, on failure. */ public Job execute() throws Exception { + Job job = createAndSubmitJob(); + + if (inputOptions.shouldBlock()) { + waitForJobCompletion(job); + } + return job; + } + + /** + * Create and submit the mapreduce job. + * @return The mapreduce job object that has been submitted + */ + public Job createAndSubmitJob() throws Exception { assert inputOptions != null; assert getConf() != null; - Job job = null; try { synchronized(this) { @@ -169,15 +186,23 @@ public class DistCp extends Configured implements Tool { String jobID = job.getJobID().toString(); job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID); - LOG.info("DistCp job-id: " + jobID); - if (inputOptions.shouldBlock() && !job.waitForCompletion(true)) { - throw new IOException("DistCp failure: Job " + jobID + " has failed: " - + job.getStatus().getFailureInfo()); - } + return job; } + /** + * Wait for the given job to complete. + * @param job the given mapreduce job that has already been submitted + */ + public void waitForJobCompletion(Job job) throws Exception { + assert job != null; + if (!job.waitForCompletion(true)) { + throw new IOException("DistCp failure: Job " + job.getJobID() + + " has failed: " + job.getStatus().getFailureInfo()); + } + } + /** * Set targetPathExists in both inputOptions and job config, * for the benefit of CopyCommitter @@ -436,7 +461,7 @@ public class DistCp extends Configured implements Tool { private static class Cleanup implements Runnable { private final DistCp distCp; - public Cleanup(DistCp distCp) { + Cleanup(DistCp distCp) { this.distCp = distCp; }