MAPREDUCE-5870. Support for passing Job priority through Application Submission Context in Mapreduce Side. Contributed by Sunil G

Jason Lowe 2015-11-24 22:07:26 +00:00
parent cab3c7c889
commit f634505d48
18 changed files with 402 additions and 32 deletions

@ -434,6 +434,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6499. Add elapsed time for retired job in JobHistoryServer WebUI.
(Lin Yiqun via aajisaka)
MAPREDUCE-5870. Support for passing Job priority through Application
Submission Context in Mapreduce Side (Sunil G via jlowe)
OPTIMIZATIONS
MAPREDUCE-6376. Add avro binary support for jhist files (Ray Chiang via

@ -43,6 +43,7 @@
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.ClientRMProxy;
@ -146,6 +147,11 @@ protected synchronized void heartbeat() throws Exception {
if (token != null) {
updateAMRMToken(token);
}
Priority priorityFromResponse = Priority.newInstance(allocateResponse
.getApplicationPriority().getPriority());
// Push the priority reported by the RM onto the job directly.
getJob().setJobPriority(priorityFromResponse);
}
}

@ -914,7 +914,7 @@ public void testJobPriorityUpdate() throws Exception {
assertJobState(job, JobStateInternal.RUNNING);
// Update priority of job to 8, and see whether it is updated
Priority updatedPriority = Priority.newInstance(5);
Priority updatedPriority = Priority.newInstance(8);
job.setJobPriority(updatedPriority);
assertJobState(job, JobStateInternal.RUNNING);
Priority jobPriority = job.getReport().getJobPriority();

@ -49,6 +49,7 @@
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.event.EventHandler;
@ -245,7 +246,7 @@ public AllocateResponse allocate(AllocateRequest request)
amToken.getIdentifier(), amToken.getKind().toString(),
amToken.getPassword(), amToken.getService().toString());
}
return AllocateResponse.newInstance(responseId,
AllocateResponse response = AllocateResponse.newInstance(responseId,
Collections.<ContainerStatus>emptyList(),
Collections.<Container>emptyList(),
Collections.<NodeReport>emptyList(),
@ -254,6 +255,8 @@ public AllocateResponse allocate(AllocateRequest request)
yarnToken,
Collections.<Container>emptyList(),
Collections.<Container>emptyList());
response.setApplicationPriority(Priority.newInstance(0));
return response;
}
}
}

@ -84,6 +84,25 @@ public static JobId toYarn(org.apache.hadoop.mapreduce.JobID id) {
return jobId;
}
public static int toYarnApplicationPriority(String priority) {
JobPriority jobPriority = JobPriority.valueOf(priority);
switch (jobPriority) {
case VERY_HIGH :
return 5;
case HIGH :
return 4;
case NORMAL :
return 3;
case LOW :
return 2;
case VERY_LOW :
return 1;
case DEFAULT :
return 0;
}
throw new IllegalArgumentException("Unrecognized priority: " + priority);
}
private static String fromClusterTimeStamp(long clusterTimeStamp) {
return Long.toString(clusterTimeStamp);
}
@ -165,6 +184,8 @@ public static Phase toYarn(org.apache.hadoop.mapred.TaskStatus.Phase phase) {
return Phase.REDUCE;
case CLEANUP:
return Phase.CLEANUP;
default:
break;
}
throw new YarnRuntimeException("Unrecognized Phase: " + phase);
}
@ -327,10 +348,33 @@ private static JobPriority fromYarnPriority(int priority) {
return JobPriority.VERY_LOW;
case 0 :
return JobPriority.DEFAULT;
default :
break;
}
return JobPriority.UNDEFINED_PRIORITY;
}
public static org.apache.hadoop.mapreduce.JobPriority
fromYarnApplicationPriority(int priority) {
switch (priority) {
case 5 :
return org.apache.hadoop.mapreduce.JobPriority.VERY_HIGH;
case 4 :
return org.apache.hadoop.mapreduce.JobPriority.HIGH;
case 3 :
return org.apache.hadoop.mapreduce.JobPriority.NORMAL;
case 2 :
return org.apache.hadoop.mapreduce.JobPriority.LOW;
case 1 :
return org.apache.hadoop.mapreduce.JobPriority.VERY_LOW;
case 0 :
return org.apache.hadoop.mapreduce.JobPriority.DEFAULT;
default :
break;
}
return org.apache.hadoop.mapreduce.JobPriority.UNDEFINED_PRIORITY;
}
public static org.apache.hadoop.mapreduce.QueueState fromYarn(
QueueState state) {
org.apache.hadoop.mapreduce.QueueState qState =
@ -462,7 +506,9 @@ public static JobStatus fromYarn(ApplicationReport application,
TypeConverter.fromYarn(application.getApplicationId()),
0.0f, 0.0f, 0.0f, 0.0f,
TypeConverter.fromYarn(application.getYarnApplicationState(), application.getFinalApplicationStatus()),
org.apache.hadoop.mapreduce.JobPriority.NORMAL,
fromYarnApplicationPriority(
(application.getPriority() == null) ? 0 :
application.getPriority().getPriority()),
application.getUser(), application.getName(),
application.getQueue(), jobFile, trackingUrl, false
);
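The two conversion helpers added in this file, toYarnApplicationPriority and fromYarnApplicationPriority, map the five named MapReduce priorities onto the integers YARN uses, with 0 reserved for DEFAULT and any integer outside 0-5 collapsing to UNDEFINED_PRIORITY on the way back. A minimal round-trip sketch, using only the methods shown above (illustrative, not part of the patch):

    // Named level to YARN integer and back.
    int yarnPriority = TypeConverter.toYarnApplicationPriority("HIGH");   // 4
    org.apache.hadoop.mapreduce.JobPriority named =
        TypeConverter.fromYarnApplicationPriority(yarnPriority);          // HIGH
    // Integers outside the named range come back as UNDEFINED_PRIORITY.
    org.apache.hadoop.mapreduce.JobPriority other =
        TypeConverter.fromYarnApplicationPriority(12);                    // UNDEFINED_PRIORITY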

@ -85,6 +85,7 @@ public void testFromYarn() throws Exception {
applicationReport.setStartTime(appStartTime);
applicationReport.setFinishTime(appFinishTime);
applicationReport.setUser("TestTypeConverter-user");
applicationReport.setPriority(Priority.newInstance(3));
ApplicationResourceUsageReport appUsageRpt = Records
.newRecord(ApplicationResourceUsageReport.class);
Resource r = Records.newRecord(Resource.class);
@ -99,6 +100,7 @@ public void testFromYarn() throws Exception {
Assert.assertEquals(appStartTime, jobStatus.getStartTime());
Assert.assertEquals(appFinishTime, jobStatus.getFinishTime());
Assert.assertEquals(state.toString(), jobStatus.getState().toString());
Assert.assertEquals(JobPriority.NORMAL, jobStatus.getPriority());
}
@Test
@ -113,6 +115,7 @@ public void testFromYarnApplicationReport() {
when(mockReport.getYarnApplicationState()).thenReturn(YarnApplicationState.KILLED);
when(mockReport.getUser()).thenReturn("dummy-user");
when(mockReport.getQueue()).thenReturn("dummy-queue");
when(mockReport.getPriority()).thenReturn(Priority.newInstance(4));
String jobFile = "dummy-path/job.xml";
try {
@ -146,6 +149,7 @@ public void testFromYarnApplicationReport() {
Assert.assertEquals("num used slots info set incorrectly", 3, status.getNumUsedSlots());
Assert.assertEquals("rsvd mem info set incorrectly", 2048, status.getReservedMem());
Assert.assertEquals("used mem info set incorrectly", 2048, status.getUsedMem());
Assert.assertEquals("priority set incorrectly", JobPriority.HIGH, status.getPriority());
}
@Test

@ -1559,25 +1559,105 @@ public void setMaxReduceTaskFailuresPercent(int percent) {
/**
* Set {@link JobPriority} for this job.
*
*
* @param prio the {@link JobPriority} for this job.
*/
public void setJobPriority(JobPriority prio) {
set(JobContext.PRIORITY, prio.toString());
}
/**
* Set the priority for this job as an integer.
*
* @param prio the priority value (an integer) for this job.
*/
public void setJobPriorityAsInteger(int prio) {
set(JobContext.PRIORITY, Integer.toString(prio));
}
/**
* Get the {@link JobPriority} for this job.
*
*
* @return the {@link JobPriority} for this job.
*/
public JobPriority getJobPriority() {
String prio = get(JobContext.PRIORITY);
if(prio == null) {
return JobPriority.NORMAL;
if (prio == null) {
return JobPriority.DEFAULT;
}
return JobPriority.valueOf(prio);
JobPriority priority = JobPriority.DEFAULT;
try {
priority = JobPriority.valueOf(prio);
} catch (IllegalArgumentException e) {
return convertToJobPriority(Integer.parseInt(prio));
}
return priority;
}
/**
* Get the priority for this job.
*
* @return the priority for this job.
*/
public int getJobPriorityAsInteger() {
String priority = get(JobContext.PRIORITY);
if (priority == null) {
return 0;
}
int jobPriority = 0;
try {
jobPriority = convertPriorityToInteger(priority);
} catch (IllegalArgumentException e) {
return Integer.parseInt(priority);
}
return jobPriority;
}
private int convertPriorityToInteger(String priority) {
JobPriority jobPriority = JobPriority.valueOf(priority);
switch (jobPriority) {
case VERY_HIGH :
return 5;
case HIGH :
return 4;
case NORMAL :
return 3;
case LOW :
return 2;
case VERY_LOW :
return 1;
case DEFAULT :
return 0;
default:
break;
}
// If a user sets the priority to UNDEFINED_PRIORITY, return 0, which is
// also the default value.
return 0;
}
private JobPriority convertToJobPriority(int priority) {
switch (priority) {
case 5 :
return JobPriority.VERY_HIGH;
case 4 :
return JobPriority.HIGH;
case 3 :
return JobPriority.NORMAL;
case 2 :
return JobPriority.LOW;
case 1 :
return JobPriority.VERY_LOW;
case 0 :
return JobPriority.DEFAULT;
default:
break;
}
return JobPriority.UNDEFINED_PRIORITY;
}
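Taken together, the new accessors let JobConf callers work with either the named levels or raw integers; a short usage sketch based on the behaviour exercised by the TestJobConf changes further down (illustrative only):

    JobConf conf = new JobConf();
    conf.getJobPriority();             // DEFAULT when nothing is set
    conf.getJobPriorityAsInteger();    // 0

    conf.setJobPriorityAsInteger(4);   // integer API added by this patch
    conf.getJobPriority();             // HIGH (4 maps to HIGH)

    conf.setJobPriorityAsInteger(57);  // outside the named range
    conf.getJobPriority();             // UNDEFINED_PRIORITY
    conf.getJobPriorityAsInteger();    // 57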
/**

@ -22,6 +22,12 @@
/**
* Used to describe the priority of the running job.
* DEFAULT : If the user does not specify a priority while submitting a job,
* YARN can pick a default priority as per its configuration. MapReduce uses
* this value to indicate such cases.
* UNDEFINED_PRIORITY : YARN accepts priority as an integer, so values other
* than the five named levels are also possible. This value is used to
* generalize such cases.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
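For reference, a sketch of how the enum reads with the two added constants, assuming the five pre-existing levels are unchanged (an illustration, not the actual hunk):

    public enum JobPriority {
      VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW,
      DEFAULT,            // let YARN apply its configured default priority
      UNDEFINED_PRIORITY; // integer priorities outside the five named levels
    }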

@ -148,7 +148,7 @@ public JobStatus(JobID jobid, float mapProgress, float reduceProgress,
String user, String jobName,
String jobFile, String trackingUrl) {
this(jobid, mapProgress, reduceProgress, cleanupProgress, runState,
JobPriority.NORMAL, user, jobName, jobFile, trackingUrl);
JobPriority.DEFAULT, user, jobName, jobFile, trackingUrl);
}
/**

@ -407,7 +407,7 @@ public String getSchedulingInfo() {
/**
* Get scheduling info of the job.
*
* @return the scheduling info of the job
* @return the priority info of the job
*/
public JobPriority getPriority() throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
@ -635,27 +635,78 @@ public void killJob() throws IOException {
/**
* Set the priority of a running job.
* @param priority the new priority for the job.
* @param jobPriority the new priority for the job.
* @throws IOException
*/
public void setPriority(JobPriority priority)
throws IOException, InterruptedException {
public void setPriority(JobPriority jobPriority) throws IOException,
InterruptedException {
if (state == JobState.DEFINE) {
conf.setJobPriority(
org.apache.hadoop.mapred.JobPriority.valueOf(priority.name()));
if (jobPriority == JobPriority.UNDEFINED_PRIORITY) {
conf.setJobPriorityAsInteger(convertPriorityToInteger(jobPriority));
} else {
conf.setJobPriority(org.apache.hadoop.mapred.JobPriority
.valueOf(jobPriority.name()));
}
} else {
ensureState(JobState.RUNNING);
final JobPriority tmpPriority = priority;
final int tmpPriority = convertPriorityToInteger(jobPriority);
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws IOException, InterruptedException {
cluster.getClient().setJobPriority(getJobID(), tmpPriority.toString());
cluster.getClient()
.setJobPriority(getJobID(), Integer.toString(tmpPriority));
return null;
}
});
}
}
/**
* Set the priority of a running job.
*
* @param jobPriority
* the new priority for the job.
* @throws IOException
*/
public void setPriorityAsInteger(int jobPriority) throws IOException,
InterruptedException {
if (state == JobState.DEFINE) {
conf.setJobPriorityAsInteger(jobPriority);
} else {
ensureState(JobState.RUNNING);
final int tmpPriority = jobPriority;
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws IOException, InterruptedException {
cluster.getClient()
.setJobPriority(getJobID(), Integer.toString(tmpPriority));
return null;
}
});
}
}
private int convertPriorityToInteger(JobPriority jobPriority) {
switch (jobPriority) {
case VERY_HIGH :
return 5;
case HIGH :
return 4;
case NORMAL :
return 3;
case LOW :
return 2;
case VERY_LOW :
return 1;
case DEFAULT :
return 0;
default:
break;
}
// For UNDEFINED_PRIORITY, fall back to the default value of 0.
return 0;
}
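A hedged usage sketch of the two client-side setters added above (the running-job path forwards the priority to the cluster client as an integer string):

    Job job = Job.getInstance(new Configuration());
    // Before submission (DEFINE state): stored in the job configuration.
    job.setPriority(JobPriority.HIGH);   // named level
    job.setPriorityAsInteger(7);         // arbitrary integer, new in this patch
    // After submission (RUNNING state) the same calls go through
    // ClientProtocol.setJobPriority(jobId, "7") instead.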
/**
* Get events indicating completion (success/failure) of component tasks.
*

View File

@ -22,7 +22,12 @@
/**
* Used to describe the priority of the running job.
*
* DEFAULT : If the user does not specify a priority while submitting a job,
* YARN can pick a default priority as per its configuration. MapReduce uses
* this value to indicate such cases.
* UNDEFINED_PRIORITY : YARN accepts priority as an integer, so values other
* than the five named levels are also possible. This value is used to
* generalize such cases.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving

@ -97,6 +97,7 @@ public int run(String[] argv) throws Exception {
String taskState = null;
int fromEvent = 0;
int nEvents = 0;
int jpvalue = 0;
boolean getStatus = false;
boolean getCounter = false;
boolean killJob = false;
@ -149,11 +150,15 @@ public int run(String[] argv) throws Exception {
}
jobid = argv[1];
try {
jp = JobPriority.valueOf(argv[2]);
jp = JobPriority.valueOf(argv[2]);
} catch (IllegalArgumentException iae) {
LOG.info(iae);
displayUsage(cmd);
return exitCode;
try {
jpvalue = Integer.parseInt(argv[2]);
} catch (NumberFormatException ne) {
LOG.info(ne);
displayUsage(cmd);
return exitCode;
}
}
setJobPriority = true;
} else if ("-events".equals(cmd)) {
@ -322,7 +327,11 @@ public int run(String[] argv) throws Exception {
if (job == null) {
System.out.println("Could not find job " + jobid);
} else {
job.setPriority(jp);
if (jp != null) {
job.setPriority(jp);
} else {
job.setPriorityAsInteger(jpvalue);
}
System.out.println("Changed job priority.");
exitCode = 0;
}
@ -408,6 +417,10 @@ Cluster createCluster() throws IOException {
private String getJobPriorityNames() {
StringBuffer sb = new StringBuffer();
for (JobPriority p : JobPriority.values()) {
// UNDEFINED_PRIORITY need not be displayed in the usage message
if (JobPriority.UNDEFINED_PRIORITY == p) {
continue;
}
sb.append(p.name()).append(" ");
}
return sb.substring(0, sb.length()-1);
@ -444,7 +457,8 @@ private void displayUsage(String cmd) {
} else if ("-set-priority".equals(cmd)) {
System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " +
"Valid values for priorities are: "
+ jobPriorityValues);
+ jobPriorityValues
+ ". In addition to this, integer values can also be used.");
} else if ("-list-active-trackers".equals(cmd)) {
System.err.println(prefix + "[" + cmd + "]");
} else if ("-list-blacklisted-trackers".equals(cmd)) {
@ -465,7 +479,8 @@ private void displayUsage(String cmd) {
System.err.printf("\t[-counter <job-id> <group-name> <counter-name>]%n");
System.err.printf("\t[-kill <job-id>]%n");
System.err.printf("\t[-set-priority <job-id> <priority>]. " +
"Valid values for priorities are: " + jobPriorityValues + "%n");
"Valid values for priorities are: " + jobPriorityValues +
". In addition to this, integer values can also be used." + "%n");
System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]%n");
System.err.printf("\t[-history <jobHistoryFile>]%n");
System.err.printf("\t[-list [all]]%n");
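On the command line this surfaces through the existing -set-priority option, which after this change accepts an integer as well as a named level; hypothetical invocations (the job id is illustrative):

    $ mapred job -set-priority job_1448000000000_0001 HIGH
    $ mapred job -set-priority job_1448000000000_0001 12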

@ -99,7 +99,7 @@ public void testJobConf() {
assertEquals(70, conf.getMaxReduceTaskFailuresPercent());
// by default
assertEquals(JobPriority.NORMAL.name(), conf.getJobPriority().name());
assertEquals(JobPriority.DEFAULT.name(), conf.getJobPriority().name());
conf.setJobPriority(JobPriority.HIGH);
assertEquals(JobPriority.HIGH.name(), conf.getJobPriority().name());
@ -377,4 +377,49 @@ public void testParseMaximumHeapSizeMB() {
Assert.assertEquals(-1, JobConf.parseMaximumHeapSizeMB("-Xmx4?"));
Assert.assertEquals(-1, JobConf.parseMaximumHeapSizeMB(""));
}
/**
* Test various job priorities
*/
@Test
public void testJobPriorityConf() {
JobConf conf = new JobConf();
// by default
assertEquals(JobPriority.DEFAULT.name(), conf.getJobPriority().name());
assertEquals(0, conf.getJobPriorityAsInteger());
// Set JobPriority.LOW using old API, and verify output from both getters
conf.setJobPriority(JobPriority.LOW);
assertEquals(JobPriority.LOW.name(), conf.getJobPriority().name());
assertEquals(2, conf.getJobPriorityAsInteger());
// Set JobPriority.VERY_HIGH using old API, and verify output
conf.setJobPriority(JobPriority.VERY_HIGH);
assertEquals(JobPriority.VERY_HIGH.name(), conf.getJobPriority().name());
assertEquals(5, conf.getJobPriorityAsInteger());
// Set 3 as priority using new API, and verify output from both getters
conf.setJobPriorityAsInteger(3);
assertEquals(JobPriority.NORMAL.name(), conf.getJobPriority().name());
assertEquals(3, conf.getJobPriorityAsInteger());
// Set 4 as priority using new API, and verify output
conf.setJobPriorityAsInteger(4);
assertEquals(JobPriority.HIGH.name(), conf.getJobPriority().name());
assertEquals(4, conf.getJobPriorityAsInteger());
// Now set some high integer values and verify output from old api
conf.setJobPriorityAsInteger(57);
assertEquals(JobPriority.UNDEFINED_PRIORITY.name(), conf.getJobPriority()
.name());
assertEquals(57, conf.getJobPriorityAsInteger());
// Error case where UNDEFINED_PRIORITY is set explicitly
conf.setJobPriority(JobPriority.UNDEFINED_PRIORITY);
assertEquals(JobPriority.UNDEFINED_PRIORITY.name(), conf.getJobPriority()
.name());
// As UNDEFINED_PRIORITY cannot be mapped to any specific integer value,
// it falls back to the default of 0.
assertEquals(0, conf.getJobPriorityAsInteger());
}
}

@ -41,7 +41,7 @@ public void testJobToString() throws IOException, InterruptedException {
when(cluster.getClient()).thenReturn(client);
JobID jobid = new JobID("1014873536921", 6);
JobStatus status = new JobStatus(jobid, 0.0f, 0.0f, 0.0f, 0.0f,
State.FAILED, JobPriority.NORMAL, "root", "TestJobToString",
State.FAILED, JobPriority.DEFAULT, "root", "TestJobToString",
"job file", "tracking url");
when(client.getJobStatus(jobid)).thenReturn(status);
when(client.getTaskReports(jobid, TaskType.MAP)).thenReturn(

@ -562,13 +562,30 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
appContext.setApplicationTags(new HashSet<String>(tagsFromConf));
}
String jobPriority = jobConf.get(MRJobConfig.PRIORITY);
if (jobPriority != null) {
int iPriority;
try {
iPriority = TypeConverter.toYarnApplicationPriority(jobPriority);
} catch (IllegalArgumentException e) {
iPriority = Integer.parseInt(jobPriority);
}
appContext.setPriority(Priority.newInstance(iPriority));
}
return appContext;
}
@Override
public void setJobPriority(JobID arg0, String arg1) throws IOException,
InterruptedException {
resMgrDelegate.setJobPriority(arg0, arg1);
ApplicationId appId = TypeConverter.toYarn(arg0).getAppId();
try {
resMgrDelegate.updateApplicationPriority(appId,
Priority.newInstance(Integer.parseInt(arg1)));
} catch (YarnException e) {
throw new IOException(e);
}
}
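Putting the pieces together: at submit time the priority found in the job configuration is copied onto the ApplicationSubmissionContext (first hunk above), while setJobPriority() on a running job goes to the RM via updateApplicationPriority() and flows back to the AM in the AllocateResponse heartbeat, so Job.getPriority() eventually reflects it. A rough sketch of the submit-time half, with illustrative values:

    JobConf jobConf = new JobConf();
    jobConf.set(MRJobConfig.PRIORITY, "LOW");  // named level -> Priority.newInstance(2)
    jobConf.set(MRJobConfig.PRIORITY, "12");   // raw integer -> Priority.newInstance(12)
    // createApplicationSubmissionContext(...) then calls
    // appContext.setPriority(Priority.newInstance(iPriority));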
@Override

@ -87,6 +87,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@ -604,6 +605,30 @@ public void testAMStandardEnv() throws Exception {
assertEquals("Bad SHELL setting", USER_SHELL, shell);
}
@Test
public void testJobPriority() throws Exception {
JobConf jobConf = new JobConf();
jobConf.set(MRJobConfig.PRIORITY, "LOW");
YARNRunner yarnRunner = new YARNRunner(jobConf);
ApplicationSubmissionContext appSubCtx = buildSubmitContext(yarnRunner,
jobConf);
// 2 corresponds to LOW
assertEquals(appSubCtx.getPriority(), Priority.newInstance(2));
// Set an integer explicitly
jobConf.set(MRJobConfig.PRIORITY, "12");
yarnRunner = new YARNRunner(jobConf);
appSubCtx = buildSubmitContext(yarnRunner,
jobConf);
// Verify that 12 is set in the submission context
assertEquals(appSubCtx.getPriority(), Priority.newInstance(12));
}
private ApplicationSubmissionContext buildSubmitContext(
YARNRunner yarnRunner, JobConf jobConf) throws IOException {
File jobxml = new File(testWorkDir, MRJobConfig.JOB_CONF_FILE);

@ -527,8 +527,9 @@ public void testChangingJobPriority(String jobId, Configuration conf)
exitCode = runTool(conf, createJobClient(), new String[] { "-set-priority",
jobId, "VERY_LOW" }, new ByteArrayOutputStream());
assertEquals("Exit code", 0, exitCode);
// because this method does not implemented still.
verifyJobPriority(jobId, "NORMAL", conf, createJobClient());
// set-priority is invoked after the job has completed in YARN, so the
// priority is not updated.
verifyJobPriority(jobId, "DEFAULT", conf, createJobClient());
}
/**

@ -66,6 +66,7 @@
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
@ -159,6 +160,7 @@ public static void setup() throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
conf.setInt("yarn.cluster.max-application-priority", 10);
mrCluster.init(conf);
mrCluster.start();
}
@ -242,6 +244,67 @@ private void testSleepJobInternal(boolean useRemoteJar) throws Exception {
// JobStatus?)--compare against MRJobConfig.JOB_UBERTASK_ENABLE value
}
@Test(timeout = 3000000)
public void testJobWithChangePriority() throws Exception {
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
return;
}
Configuration sleepConf = new Configuration(mrCluster.getConfig());
// set master address to local to test that local mode is applied when the
// framework is local
sleepConf.set(MRConfig.MASTER_ADDRESS, "local");
sleepConf
.setInt("yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms", 5);
SleepJob sleepJob = new SleepJob();
sleepJob.setConf(sleepConf);
Job job = sleepJob.createJob(1, 1, 1000, 20, 50, 1);
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.setJarByClass(SleepJob.class);
job.setMaxMapAttempts(1); // speed up failures
job.submit();
// Set the priority to HIGH
job.setPriority(JobPriority.HIGH);
waitForPriorityToUpdate(job, JobPriority.HIGH);
// Verify the priority from job itself
Assert.assertEquals(job.getPriority(), JobPriority.HIGH);
// Change priority to NORMAL (3) with new api
job.setPriorityAsInteger(3);
// Verify the priority from job itself
waitForPriorityToUpdate(job, JobPriority.NORMAL);
Assert.assertEquals(job.getPriority(), JobPriority.NORMAL);
// Change priority to a high integer value with new api
job.setPriorityAsInteger(89);
// Verify the priority from job itself
waitForPriorityToUpdate(job, JobPriority.UNDEFINED_PRIORITY);
Assert.assertEquals(job.getPriority(), JobPriority.UNDEFINED_PRIORITY);
boolean succeeded = job.waitForCompletion(true);
Assert.assertTrue(succeeded);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
}
private void waitForPriorityToUpdate(Job job, JobPriority expectedStatus)
throws IOException, InterruptedException {
// Max wait time for the priority update is about 20 seconds (200 * 100ms).
int waitCnt = 200;
while (waitCnt-- > 0) {
if (job.getPriority().equals(expectedStatus)) {
// Stop waiting as priority is updated.
break;
} else {
Thread.sleep(100);
}
}
}
@Test(timeout = 300000)
public void testConfVerificationWithClassloader() throws Exception {
testConfVerification(true, false, false, false);