svn merge -c 1328031 from trunk. FIXES: MAPREDUCE-4159. Job is running in Uber mode after setting "mapreduce.job.ubertask.maxreduces" to zero (Devaraj K via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1328033 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-04-19 16:23:35 +00:00
parent ebb3792a6a
commit a18ad09c94
3 changed files with 81 additions and 5 deletions

View File

@ -263,6 +263,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4074. Client continuously retries to RM When RM goes down MAPREDUCE-4074. Client continuously retries to RM When RM goes down
before launching Application Master (xieguiming via tgraves) before launching Application Master (xieguiming via tgraves)
MAPREDUCE-4159. Job is running in Uber mode after setting
"mapreduce.job.ubertask.maxreduces" to zero (Devaraj K via bobby)
Release 0.23.2 - UNRELEASED Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -822,9 +822,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
//FIXME: handling multiple reduces within a single AM does not seem to //FIXME: handling multiple reduces within a single AM does not seem to
//work. //work.
// int sysMaxReduces = int sysMaxReduces = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
// job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1); boolean isValidUberMaxReduces = (sysMaxReduces == 0)
int sysMaxReduces = 1; || (sysMaxReduces == 1);
long sysMaxBytes = conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES, long sysMaxBytes = conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
fs.getDefaultBlockSize()); // FIXME: this is wrong; get FS from fs.getDefaultBlockSize()); // FIXME: this is wrong; get FS from
@ -856,7 +856,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
// while "uber-AM" (MR AM + LocalContainerLauncher) loops over tasks // while "uber-AM" (MR AM + LocalContainerLauncher) loops over tasks
// and thus requires sequential execution. // and thus requires sequential execution.
isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks
&& smallInput && smallMemory && notChainJob; && smallInput && smallMemory && notChainJob && isValidUberMaxReduces;
if (isUber) { if (isUber) {
LOG.info("Uberizing job " + jobId + ": " + numMapTasks + "m+" LOG.info("Uberizing job " + jobId + ": " + numMapTasks + "m+"
@ -889,7 +889,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
if (!smallMemory) if (!smallMemory)
msg.append(" too much RAM;"); msg.append(" too much RAM;");
if (!notChainJob) if (!notChainJob)
msg.append(" chainjob"); msg.append(" chainjob;");
if (!isValidUberMaxReduces)
msg.append(" not supported uber max reduces");
LOG.info(msg.toString()); LOG.info(msg.toString());
} }
} }

View File

@ -37,14 +37,20 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -233,4 +239,69 @@ public class TestJobImpl {
Assert.assertTrue(job5.checkAccess(ugi1, null)); Assert.assertTrue(job5.checkAccess(ugi1, null));
Assert.assertTrue(job5.checkAccess(ugi2, null)); Assert.assertTrue(job5.checkAccess(ugi2, null));
} }
@Test
public void testUberDecision() throws Exception {
// with default values, no of maps is 2
Configuration conf = new Configuration();
boolean isUber = testUberDecision(conf);
Assert.assertFalse(isUber);
// enable uber mode, no of maps is 2
conf = new Configuration();
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
isUber = testUberDecision(conf);
Assert.assertTrue(isUber);
// enable uber mode, no of maps is 2, no of reduces is 1 and uber task max
// reduces is 0
conf = new Configuration();
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
conf.setInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 0);
conf.setInt(MRJobConfig.NUM_REDUCES, 1);
isUber = testUberDecision(conf);
Assert.assertFalse(isUber);
// enable uber mode, no of maps is 2, no of reduces is 1 and uber task max
// reduces is 1
conf = new Configuration();
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
conf.setInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
conf.setInt(MRJobConfig.NUM_REDUCES, 1);
isUber = testUberDecision(conf);
Assert.assertTrue(isUber);
// enable uber mode, no of maps is 2 and uber task max maps is 0
conf = new Configuration();
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
conf.setInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 1);
isUber = testUberDecision(conf);
Assert.assertFalse(isUber);
}
private boolean testUberDecision(Configuration conf) {
JobID jobID = JobID.forName("job_1234567890000_0001");
JobId jobId = TypeConverter.toYarn(jobID);
MRAppMetrics mrAppMetrics = MRAppMetrics.create();
JobImpl job = new JobImpl(jobId, Records
.newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null, null, null,
mrAppMetrics, mock(OutputCommitter.class), true, null, 0, null, null);
InitTransition initTransition = getInitTransition();
JobEvent mockJobEvent = mock(JobEvent.class);
initTransition.transition(job, mockJobEvent);
boolean isUber = job.isUber();
return isUber;
}
private InitTransition getInitTransition() {
InitTransition initTransition = new InitTransition() {
@Override
protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
return new TaskSplitMetaInfo[] { new TaskSplitMetaInfo(),
new TaskSplitMetaInfo() };
}
};
return initTransition;
}
} }