diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 48c147be01a..2df0cd7752c 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -370,6 +370,9 @@ Release 0.23.3 - UNRELEASED MAPREDUCE-4074. Client continuously retries to RM When RM goes down before launching Application Master (xieguiming via tgraves) + MAPREDUCE-4159. Job is running in Uber mode after setting + "mapreduce.job.ubertask.maxreduces" to zero (Devaraj K via bobby) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 6bc78fbfb9b..363541121dc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -822,9 +822,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, //FIXME: handling multiple reduces within a single AM does not seem to //work. - // int sysMaxReduces = - // job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1); - int sysMaxReduces = 1; + int sysMaxReduces = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1); + boolean isValidUberMaxReduces = (sysMaxReduces == 0) + || (sysMaxReduces == 1); long sysMaxBytes = conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES, fs.getDefaultBlockSize()); // FIXME: this is wrong; get FS from @@ -856,7 +856,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, // while "uber-AM" (MR AM + LocalContainerLauncher) loops over tasks // and thus requires sequential execution. isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks - && smallInput && smallMemory && notChainJob; + && smallInput && smallMemory && notChainJob && isValidUberMaxReduces; if (isUber) { LOG.info("Uberizing job " + jobId + ": " + numMapTasks + "m+" @@ -889,7 +889,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, if (!smallMemory) msg.append(" too much RAM;"); if (!notChainJob) - msg.append(" chainjob"); + msg.append(" chainjob;"); + if (!isValidUberMaxReduces) + msg.append(" not supported uber max reduces"); LOG.info(msg.toString()); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java index fb4dafcdb0f..4fea3fb233a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java @@ -37,14 +37,20 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; +import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; +import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition; +import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ -233,4 +239,69 @@ public class TestJobImpl { Assert.assertTrue(job5.checkAccess(ugi1, null)); Assert.assertTrue(job5.checkAccess(ugi2, null)); } + @Test + public void testUberDecision() throws Exception { + + // with default values, no of maps is 2 + Configuration conf = new Configuration(); + boolean isUber = testUberDecision(conf); + Assert.assertFalse(isUber); + + // enable uber mode, no of maps is 2 + conf = new Configuration(); + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true); + isUber = testUberDecision(conf); + Assert.assertTrue(isUber); + + // enable uber mode, no of maps is 2, no of reduces is 1 and uber task max + // reduces is 0 + conf = new Configuration(); + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true); + conf.setInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 0); + conf.setInt(MRJobConfig.NUM_REDUCES, 1); + isUber = testUberDecision(conf); + Assert.assertFalse(isUber); + + // enable uber mode, no of maps is 2, no of reduces is 1 and uber task max + // reduces is 1 + conf = new Configuration(); + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true); + conf.setInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1); + conf.setInt(MRJobConfig.NUM_REDUCES, 1); + isUber = testUberDecision(conf); + Assert.assertTrue(isUber); + + // enable uber mode, no of maps is 2 and uber task max maps is 0 + conf = new Configuration(); + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true); + conf.setInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 1); + isUber = testUberDecision(conf); + Assert.assertFalse(isUber); + } + + private boolean testUberDecision(Configuration conf) { + JobID jobID = JobID.forName("job_1234567890000_0001"); + JobId jobId = TypeConverter.toYarn(jobID); + MRAppMetrics mrAppMetrics = MRAppMetrics.create(); + JobImpl job = new JobImpl(jobId, Records + .newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class), + null, mock(JobTokenSecretManager.class), null, null, null, + mrAppMetrics, mock(OutputCommitter.class), true, null, 0, null, null); + InitTransition initTransition = getInitTransition(); + JobEvent mockJobEvent = mock(JobEvent.class); + initTransition.transition(job, mockJobEvent); + boolean isUber = job.isUber(); + return isUber; + } + + private InitTransition getInitTransition() { + InitTransition initTransition = new InitTransition() { + @Override + protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) { + return new TaskSplitMetaInfo[] { new TaskSplitMetaInfo(), + new TaskSplitMetaInfo() }; + } + }; + return initTransition; + } }