MAPREDUCE-5870. Support for passing Job priority through Application Submission Context in Mapreduce Side. Contributed by Sunil G

(cherry picked from commit f634505d48)

Conflicts:

	hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobConf.java
This commit is contained in:
Jason Lowe 2015-11-24 22:15:37 +00:00
parent ef3e01a1e4
commit 2f18218508
18 changed files with 402 additions and 32 deletions

View File

@ -139,6 +139,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6499. Add elapsed time for retired job in JobHistoryServer WebUI. MAPREDUCE-6499. Add elapsed time for retired job in JobHistoryServer WebUI.
(Lin Yiqun via aajisaka) (Lin Yiqun via aajisaka)
MAPREDUCE-5870. Support for passing Job priority through Application
Submission Context in Mapreduce Side (Sunil G via jlowe)
OPTIMIZATIONS OPTIMIZATIONS
MAPREDUCE-6376. Add avro binary support for jhist files (Ray Chiang via MAPREDUCE-6376. Add avro binary support for jhist files (Ray Chiang via

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.ClientRMProxy;
@ -146,6 +147,11 @@ public class LocalContainerAllocator extends RMCommunicator
if (token != null) { if (token != null) {
updateAMRMToken(token); updateAMRMToken(token);
} }
Priority priorityFromResponse = Priority.newInstance(allocateResponse
.getApplicationPriority().getPriority());
// Update the job priority to Job directly.
getJob().setJobPriority(priorityFromResponse);
} }
} }

View File

@ -914,7 +914,7 @@ public class TestJobImpl {
assertJobState(job, JobStateInternal.RUNNING); assertJobState(job, JobStateInternal.RUNNING);
// Update priority of job to 8, and see whether its updated // Update priority of job to 8, and see whether its updated
Priority updatedPriority = Priority.newInstance(5); Priority updatedPriority = Priority.newInstance(8);
job.setJobPriority(updatedPriority); job.setJobPriority(updatedPriority);
assertJobState(job, JobStateInternal.RUNNING); assertJobState(job, JobStateInternal.RUNNING);
Priority jobPriority = job.getReport().getJobPriority(); Priority jobPriority = job.getReport().getJobPriority();

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
@ -245,7 +246,7 @@ public class TestLocalContainerAllocator {
amToken.getIdentifier(), amToken.getKind().toString(), amToken.getIdentifier(), amToken.getKind().toString(),
amToken.getPassword(), amToken.getService().toString()); amToken.getPassword(), amToken.getService().toString());
} }
return AllocateResponse.newInstance(responseId, AllocateResponse response = AllocateResponse.newInstance(responseId,
Collections.<ContainerStatus>emptyList(), Collections.<ContainerStatus>emptyList(),
Collections.<Container>emptyList(), Collections.<Container>emptyList(),
Collections.<NodeReport>emptyList(), Collections.<NodeReport>emptyList(),
@ -254,6 +255,8 @@ public class TestLocalContainerAllocator {
yarnToken, yarnToken,
Collections.<Container>emptyList(), Collections.<Container>emptyList(),
Collections.<Container>emptyList()); Collections.<Container>emptyList());
response.setApplicationPriority(Priority.newInstance(0));
return response;
} }
} }
} }

View File

@ -84,6 +84,25 @@ public class TypeConverter {
return jobId; return jobId;
} }
/**
 * Maps a MapReduce {@link JobPriority} name onto the integer priority scale
 * used by YARN (VERY_HIGH=5 down to VERY_LOW=1, with DEFAULT=0).
 *
 * @param priority the name of a {@link JobPriority} constant.
 * @return the equivalent YARN application priority.
 * @throws IllegalArgumentException if the string is not a known priority
 *         name, or names a priority (such as UNDEFINED_PRIORITY) that has
 *         no integer equivalent.
 */
public static int toYarnApplicationPriority(String priority) {
  switch (JobPriority.valueOf(priority)) {
  case VERY_HIGH:
    return 5;
  case HIGH:
    return 4;
  case NORMAL:
    return 3;
  case LOW:
    return 2;
  case VERY_LOW:
    return 1;
  case DEFAULT:
    return 0;
  default:
    // Falls through for UNDEFINED_PRIORITY, which cannot be expressed
    // as a single integer value.
    throw new IllegalArgumentException("Unrecognized priority: " + priority);
  }
}
private static String fromClusterTimeStamp(long clusterTimeStamp) { private static String fromClusterTimeStamp(long clusterTimeStamp) {
return Long.toString(clusterTimeStamp); return Long.toString(clusterTimeStamp);
} }
@ -165,6 +184,8 @@ public class TypeConverter {
return Phase.REDUCE; return Phase.REDUCE;
case CLEANUP: case CLEANUP:
return Phase.CLEANUP; return Phase.CLEANUP;
default:
break;
} }
throw new YarnRuntimeException("Unrecognized Phase: " + phase); throw new YarnRuntimeException("Unrecognized Phase: " + phase);
} }
@ -327,10 +348,33 @@ public class TypeConverter {
return JobPriority.VERY_LOW; return JobPriority.VERY_LOW;
case 0 : case 0 :
return JobPriority.DEFAULT; return JobPriority.DEFAULT;
default :
break;
} }
return JobPriority.UNDEFINED_PRIORITY; return JobPriority.UNDEFINED_PRIORITY;
} }
/**
 * Maps a YARN integer application priority back onto the MapReduce
 * {@link org.apache.hadoop.mapreduce.JobPriority} scale (5=VERY_HIGH down
 * to 0=DEFAULT). Any other integer is reported as UNDEFINED_PRIORITY.
 *
 * @param priority the YARN application priority value.
 * @return the matching {@code JobPriority} constant.
 */
public static org.apache.hadoop.mapreduce.JobPriority
    fromYarnApplicationPriority(int priority) {
  // Index i of this table holds the named priority whose integer value is i.
  final org.apache.hadoop.mapreduce.JobPriority[] namedPriorities = {
      org.apache.hadoop.mapreduce.JobPriority.DEFAULT,
      org.apache.hadoop.mapreduce.JobPriority.VERY_LOW,
      org.apache.hadoop.mapreduce.JobPriority.LOW,
      org.apache.hadoop.mapreduce.JobPriority.NORMAL,
      org.apache.hadoop.mapreduce.JobPriority.HIGH,
      org.apache.hadoop.mapreduce.JobPriority.VERY_HIGH };
  if (priority >= 0 && priority < namedPriorities.length) {
    return namedPriorities[priority];
  }
  return org.apache.hadoop.mapreduce.JobPriority.UNDEFINED_PRIORITY;
}
public static org.apache.hadoop.mapreduce.QueueState fromYarn( public static org.apache.hadoop.mapreduce.QueueState fromYarn(
QueueState state) { QueueState state) {
org.apache.hadoop.mapreduce.QueueState qState = org.apache.hadoop.mapreduce.QueueState qState =
@ -462,7 +506,9 @@ public class TypeConverter {
TypeConverter.fromYarn(application.getApplicationId()), TypeConverter.fromYarn(application.getApplicationId()),
0.0f, 0.0f, 0.0f, 0.0f, 0.0f, 0.0f, 0.0f, 0.0f,
TypeConverter.fromYarn(application.getYarnApplicationState(), application.getFinalApplicationStatus()), TypeConverter.fromYarn(application.getYarnApplicationState(), application.getFinalApplicationStatus()),
org.apache.hadoop.mapreduce.JobPriority.NORMAL, fromYarnApplicationPriority(
(application.getPriority() == null) ? 0 :
application.getPriority().getPriority()),
application.getUser(), application.getName(), application.getUser(), application.getName(),
application.getQueue(), jobFile, trackingUrl, false application.getQueue(), jobFile, trackingUrl, false
); );

View File

@ -85,6 +85,7 @@ public class TestTypeConverter {
applicationReport.setStartTime(appStartTime); applicationReport.setStartTime(appStartTime);
applicationReport.setFinishTime(appFinishTime); applicationReport.setFinishTime(appFinishTime);
applicationReport.setUser("TestTypeConverter-user"); applicationReport.setUser("TestTypeConverter-user");
applicationReport.setPriority(Priority.newInstance(3));
ApplicationResourceUsageReport appUsageRpt = Records ApplicationResourceUsageReport appUsageRpt = Records
.newRecord(ApplicationResourceUsageReport.class); .newRecord(ApplicationResourceUsageReport.class);
Resource r = Records.newRecord(Resource.class); Resource r = Records.newRecord(Resource.class);
@ -99,6 +100,7 @@ public class TestTypeConverter {
Assert.assertEquals(appStartTime, jobStatus.getStartTime()); Assert.assertEquals(appStartTime, jobStatus.getStartTime());
Assert.assertEquals(appFinishTime, jobStatus.getFinishTime()); Assert.assertEquals(appFinishTime, jobStatus.getFinishTime());
Assert.assertEquals(state.toString(), jobStatus.getState().toString()); Assert.assertEquals(state.toString(), jobStatus.getState().toString());
Assert.assertEquals(JobPriority.NORMAL, jobStatus.getPriority());
} }
@Test @Test
@ -113,6 +115,7 @@ public class TestTypeConverter {
when(mockReport.getYarnApplicationState()).thenReturn(YarnApplicationState.KILLED); when(mockReport.getYarnApplicationState()).thenReturn(YarnApplicationState.KILLED);
when(mockReport.getUser()).thenReturn("dummy-user"); when(mockReport.getUser()).thenReturn("dummy-user");
when(mockReport.getQueue()).thenReturn("dummy-queue"); when(mockReport.getQueue()).thenReturn("dummy-queue");
when(mockReport.getPriority()).thenReturn(Priority.newInstance(4));
String jobFile = "dummy-path/job.xml"; String jobFile = "dummy-path/job.xml";
try { try {
@ -146,6 +149,7 @@ public class TestTypeConverter {
Assert.assertEquals("num used slots info set incorrectly", 3, status.getNumUsedSlots()); Assert.assertEquals("num used slots info set incorrectly", 3, status.getNumUsedSlots());
Assert.assertEquals("rsvd mem info set incorrectly", 2048, status.getReservedMem()); Assert.assertEquals("rsvd mem info set incorrectly", 2048, status.getReservedMem());
Assert.assertEquals("used mem info set incorrectly", 2048, status.getUsedMem()); Assert.assertEquals("used mem info set incorrectly", 2048, status.getUsedMem());
Assert.assertEquals("priority set incorrectly", JobPriority.HIGH, status.getPriority());
} }
@Test @Test

View File

@ -1561,6 +1561,15 @@ public class JobConf extends Configuration {
set(JobContext.PRIORITY, prio.toString()); set(JobContext.PRIORITY, prio.toString());
} }
/**
 * Set the priority for this job as an integer value. This writes the same
 * configuration key ({@link JobContext#PRIORITY}) as
 * {@link #setJobPriority(JobPriority)}, so the two setters overwrite each
 * other; the last one called wins.
 *
 * @param prio the integer priority for this job.
 */
public void setJobPriorityAsInteger(int prio) {
set(JobContext.PRIORITY, Integer.toString(prio));
}
/** /**
* Get the {@link JobPriority} for this job. * Get the {@link JobPriority} for this job.
* *
@ -1569,10 +1578,81 @@ public class JobConf extends Configuration {
public JobPriority getJobPriority() { public JobPriority getJobPriority() {
String prio = get(JobContext.PRIORITY); String prio = get(JobContext.PRIORITY);
if (prio == null) { if (prio == null) {
return JobPriority.NORMAL; return JobPriority.DEFAULT;
} }
return JobPriority.valueOf(prio); JobPriority priority = JobPriority.DEFAULT;
try {
priority = JobPriority.valueOf(prio);
} catch (IllegalArgumentException e) {
return convertToJobPriority(Integer.parseInt(prio));
}
return priority;
}
/**
 * Get the priority for this job as an integer.
 *
 * @return the integer priority for this job, or 0 when no priority has
 *         been configured.
 */
public int getJobPriorityAsInteger() {
  final String prio = get(JobContext.PRIORITY);
  if (prio == null) {
    return 0;
  }
  try {
    // The stored value may be a JobPriority enum name written by
    // setJobPriority(JobPriority)...
    return convertPriorityToInteger(prio);
  } catch (IllegalArgumentException e) {
    // ...or a raw integer written by setJobPriorityAsInteger(int).
    return Integer.parseInt(prio);
  }
}
/**
 * Translates a {@link JobPriority} enum name into its numeric equivalent
 * (VERY_HIGH=5 down to VERY_LOW=1, DEFAULT=0).
 *
 * @param priority the name of a {@link JobPriority} constant.
 * @return the numeric priority.
 * @throws IllegalArgumentException if the string is not a JobPriority name.
 */
private int convertPriorityToInteger(String priority) {
  switch (JobPriority.valueOf(priority)) {
  case VERY_HIGH:
    return 5;
  case HIGH:
    return 4;
  case NORMAL:
    return 3;
  case LOW:
    return 2;
  case VERY_LOW:
    return 1;
  default:
    // DEFAULT, and also UNDEFINED_PRIORITY (which has no integer
    // equivalent of its own), both collapse to 0 -- the default value.
    return 0;
  }
}
/**
 * Translates a YARN integer priority back into its {@link JobPriority}
 * equivalent (5=VERY_HIGH down to 0=DEFAULT). Any other integer has no
 * named equivalent and is reported as UNDEFINED_PRIORITY.
 *
 * @param priority the integer priority read from the configuration.
 * @return the matching {@link JobPriority} constant.
 */
private JobPriority convertToJobPriority(int priority) {
switch (priority) {
case 5 :
return JobPriority.VERY_HIGH;
case 4 :
return JobPriority.HIGH;
case 3 :
return JobPriority.NORMAL;
case 2 :
return JobPriority.LOW;
case 1 :
return JobPriority.VERY_LOW;
case 0 :
return JobPriority.DEFAULT;
default:
break;
}
// Integers outside the 0-5 range cannot be expressed as a named priority.
return JobPriority.UNDEFINED_PRIORITY;
} }
/** /**

View File

@ -22,6 +22,12 @@ import org.apache.hadoop.classification.InterfaceStability;
/** /**
* Used to describe the priority of the running job. * Used to describe the priority of the running job.
* DEFAULT : If the user does not specify a priority while submitting a job,
* YARN can pick the default priority as per its configuration.
* MapReduce indicates such cases with this enum value.
* UNDEFINED_PRIORITY : YARN represents priority as an integer, so values
* other than the five named levels are also possible. Such cases are
* generalized with this enum value.
*/ */
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Stable @InterfaceStability.Stable

View File

@ -148,7 +148,7 @@ public class JobStatus extends org.apache.hadoop.mapreduce.JobStatus {
String user, String jobName, String user, String jobName,
String jobFile, String trackingUrl) { String jobFile, String trackingUrl) {
this(jobid, mapProgress, reduceProgress, cleanupProgress, runState, this(jobid, mapProgress, reduceProgress, cleanupProgress, runState,
JobPriority.NORMAL, user, jobName, jobFile, trackingUrl); JobPriority.DEFAULT, user, jobName, jobFile, trackingUrl);
} }
/** /**

View File

@ -407,7 +407,7 @@ public class Job extends JobContextImpl implements JobContext {
/** /**
* Get scheduling info of the job. * Get scheduling info of the job.
* *
* @return the scheduling info of the job * @return the priority info of the job
*/ */
public JobPriority getPriority() throws IOException, InterruptedException { public JobPriority getPriority() throws IOException, InterruptedException {
ensureState(JobState.RUNNING); ensureState(JobState.RUNNING);
@ -635,27 +635,78 @@ public class Job extends JobContextImpl implements JobContext {
/** /**
* Set the priority of a running job. * Set the priority of a running job.
* @param priority the new priority for the job. * @param jobPriority the new priority for the job.
* @throws IOException * @throws IOException
*/ */
public void setPriority(JobPriority priority) public void setPriority(JobPriority jobPriority) throws IOException,
throws IOException, InterruptedException { InterruptedException {
if (state == JobState.DEFINE) { if (state == JobState.DEFINE) {
conf.setJobPriority( if (jobPriority == JobPriority.UNDEFINED_PRIORITY) {
org.apache.hadoop.mapred.JobPriority.valueOf(priority.name())); conf.setJobPriorityAsInteger(convertPriorityToInteger(jobPriority));
} else {
conf.setJobPriority(org.apache.hadoop.mapred.JobPriority
.valueOf(jobPriority.name()));
}
} else { } else {
ensureState(JobState.RUNNING); ensureState(JobState.RUNNING);
final JobPriority tmpPriority = priority; final int tmpPriority = convertPriorityToInteger(jobPriority);
ugi.doAs(new PrivilegedExceptionAction<Object>() { ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override @Override
public Object run() throws IOException, InterruptedException { public Object run() throws IOException, InterruptedException {
cluster.getClient().setJobPriority(getJobID(), tmpPriority.toString()); cluster.getClient()
.setJobPriority(getJobID(), Integer.toString(tmpPriority));
return null; return null;
} }
}); });
} }
} }
/**
 * Set the priority of this job using an integer value.
 *
 * @param jobPriority
 *          the new integer priority for the job.
 * @throws IOException
 */
public void setPriorityAsInteger(int jobPriority) throws IOException,
    InterruptedException {
  if (state == JobState.DEFINE) {
    // Job has not been submitted yet: just record the priority in the
    // configuration so it is picked up at submission time.
    conf.setJobPriorityAsInteger(jobPriority);
    return;
  }
  ensureState(JobState.RUNNING);
  final String newPriority = Integer.toString(jobPriority);
  // Running job: ask the cluster client to update the priority, executing
  // as the job's user.
  ugi.doAs(new PrivilegedExceptionAction<Object>() {
    @Override
    public Object run() throws IOException, InterruptedException {
      cluster.getClient().setJobPriority(getJobID(), newPriority);
      return null;
    }
  });
}
/**
 * Maps a {@link JobPriority} onto the integer priority scale used by YARN
 * (VERY_HIGH=5 down to VERY_LOW=1, DEFAULT=0). UNDEFINED_PRIORITY also
 * maps to 0 for better handling.
 *
 * @param jobPriority the priority constant to convert.
 * @return the numeric priority.
 */
private int convertPriorityToInteger(JobPriority jobPriority) {
  switch (jobPriority) {
  case VERY_HIGH:
    return 5;
  case HIGH:
    return 4;
  case NORMAL:
    return 3;
  case LOW:
    return 2;
  case VERY_LOW:
    return 1;
  default:
    // DEFAULT and UNDEFINED_PRIORITY both fall back to 0, the default
    // priority value.
    return 0;
  }
}
/** /**
* Get events indicating completion (success/failure) of component tasks. * Get events indicating completion (success/failure) of component tasks.
* *

View File

@ -22,7 +22,12 @@ import org.apache.hadoop.classification.InterfaceStability;
/** /**
* Used to describe the priority of the running job. * Used to describe the priority of the running job.
* * DEFAULT : If the user does not specify a priority while submitting a job,
* YARN can pick the default priority as per its configuration.
* MapReduce indicates such cases with this enum value.
* UNDEFINED_PRIORITY : YARN represents priority as an integer, so values
* other than the five named levels are also possible. Such cases are
* generalized with this enum value.
*/ */
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Evolving @InterfaceStability.Evolving

View File

@ -97,6 +97,7 @@ public class CLI extends Configured implements Tool {
String taskState = null; String taskState = null;
int fromEvent = 0; int fromEvent = 0;
int nEvents = 0; int nEvents = 0;
int jpvalue = 0;
boolean getStatus = false; boolean getStatus = false;
boolean getCounter = false; boolean getCounter = false;
boolean killJob = false; boolean killJob = false;
@ -151,10 +152,14 @@ public class CLI extends Configured implements Tool {
try { try {
jp = JobPriority.valueOf(argv[2]); jp = JobPriority.valueOf(argv[2]);
} catch (IllegalArgumentException iae) { } catch (IllegalArgumentException iae) {
LOG.info(iae); try {
jpvalue = Integer.parseInt(argv[2]);
} catch (NumberFormatException ne) {
LOG.info(ne);
displayUsage(cmd); displayUsage(cmd);
return exitCode; return exitCode;
} }
}
setJobPriority = true; setJobPriority = true;
} else if ("-events".equals(cmd)) { } else if ("-events".equals(cmd)) {
if (argv.length != 4) { if (argv.length != 4) {
@ -322,7 +327,11 @@ public class CLI extends Configured implements Tool {
if (job == null) { if (job == null) {
System.out.println("Could not find job " + jobid); System.out.println("Could not find job " + jobid);
} else { } else {
if (jp != null) {
job.setPriority(jp); job.setPriority(jp);
} else {
job.setPriorityAsInteger(jpvalue);
}
System.out.println("Changed job priority."); System.out.println("Changed job priority.");
exitCode = 0; exitCode = 0;
} }
@ -408,6 +417,10 @@ public class CLI extends Configured implements Tool {
private String getJobPriorityNames() { private String getJobPriorityNames() {
StringBuffer sb = new StringBuffer(); StringBuffer sb = new StringBuffer();
for (JobPriority p : JobPriority.values()) { for (JobPriority p : JobPriority.values()) {
// UNDEFINED_PRIORITY need not to be displayed in usage
if (JobPriority.UNDEFINED_PRIORITY == p) {
continue;
}
sb.append(p.name()).append(" "); sb.append(p.name()).append(" ");
} }
return sb.substring(0, sb.length()-1); return sb.substring(0, sb.length()-1);
@ -444,7 +457,8 @@ public class CLI extends Configured implements Tool {
} else if ("-set-priority".equals(cmd)) { } else if ("-set-priority".equals(cmd)) {
System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " + System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " +
"Valid values for priorities are: " "Valid values for priorities are: "
+ jobPriorityValues); + jobPriorityValues
+ ". In addition to this, integers also can be used.");
} else if ("-list-active-trackers".equals(cmd)) { } else if ("-list-active-trackers".equals(cmd)) {
System.err.println(prefix + "[" + cmd + "]"); System.err.println(prefix + "[" + cmd + "]");
} else if ("-list-blacklisted-trackers".equals(cmd)) { } else if ("-list-blacklisted-trackers".equals(cmd)) {
@ -465,7 +479,8 @@ public class CLI extends Configured implements Tool {
System.err.printf("\t[-counter <job-id> <group-name> <counter-name>]%n"); System.err.printf("\t[-counter <job-id> <group-name> <counter-name>]%n");
System.err.printf("\t[-kill <job-id>]%n"); System.err.printf("\t[-kill <job-id>]%n");
System.err.printf("\t[-set-priority <job-id> <priority>]. " + System.err.printf("\t[-set-priority <job-id> <priority>]. " +
"Valid values for priorities are: " + jobPriorityValues + "%n"); "Valid values for priorities are: " + jobPriorityValues +
". In addition to this, integers also can be used." + "%n");
System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]%n"); System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]%n");
System.err.printf("\t[-history <jobHistoryFile>]%n"); System.err.printf("\t[-history <jobHistoryFile>]%n");
System.err.printf("\t[-list [all]]%n"); System.err.printf("\t[-list [all]]%n");

View File

@ -99,7 +99,7 @@ public class TestJobConf {
assertEquals(70, conf.getMaxReduceTaskFailuresPercent()); assertEquals(70, conf.getMaxReduceTaskFailuresPercent());
// by default // by default
assertEquals(JobPriority.NORMAL.name(), conf.getJobPriority().name()); assertEquals(JobPriority.DEFAULT.name(), conf.getJobPriority().name());
conf.setJobPriority(JobPriority.HIGH); conf.setJobPriority(JobPriority.HIGH);
assertEquals(JobPriority.HIGH.name(), conf.getJobPriority().name()); assertEquals(JobPriority.HIGH.name(), conf.getJobPriority().name());
@ -359,4 +359,49 @@ public class TestJobConf {
jobConf.getMaxTaskFailuresPerTracker() < jobConf.getMaxReduceAttempts() jobConf.getMaxTaskFailuresPerTracker() < jobConf.getMaxReduceAttempts()
); );
} }
/**
 * Verifies the round trip between the enum-based and the integer-based job
 * priority accessors on JobConf: the defaults when nothing is set, setting
 * via the enum API and reading via both getters, setting via the integer
 * API and reading via both getters, and the UNDEFINED_PRIORITY edge cases
 * (out-of-range integers and an explicitly-set UNDEFINED_PRIORITY).
 */
@Test
public void testJobPriorityConf() {
JobConf conf = new JobConf();
// by default
assertEquals(JobPriority.DEFAULT.name(), conf.getJobPriority().name());
assertEquals(0, conf.getJobPriorityAsInteger());
// Set JobPriority.LOW using old API, and verify output from both getter
conf.setJobPriority(JobPriority.LOW);
assertEquals(JobPriority.LOW.name(), conf.getJobPriority().name());
assertEquals(2, conf.getJobPriorityAsInteger());
// Set JobPriority.VERY_HIGH using old API, and verify output
conf.setJobPriority(JobPriority.VERY_HIGH);
assertEquals(JobPriority.VERY_HIGH.name(), conf.getJobPriority().name());
assertEquals(5, conf.getJobPriorityAsInteger());
// Set 3 as priority using new API, and verify output from both getter
conf.setJobPriorityAsInteger(3);
assertEquals(JobPriority.NORMAL.name(), conf.getJobPriority().name());
assertEquals(3, conf.getJobPriorityAsInteger());
// Set 4 as priority using new API, and verify output
conf.setJobPriorityAsInteger(4);
assertEquals(JobPriority.HIGH.name(), conf.getJobPriority().name());
assertEquals(4, conf.getJobPriorityAsInteger());
// Now set some high integer values and verify output from old api
conf.setJobPriorityAsInteger(57);
assertEquals(JobPriority.UNDEFINED_PRIORITY.name(), conf.getJobPriority()
.name());
assertEquals(57, conf.getJobPriorityAsInteger());
// Error case where UNDEFINED_PRIORITY is set explicitly
conf.setJobPriority(JobPriority.UNDEFINED_PRIORITY);
assertEquals(JobPriority.UNDEFINED_PRIORITY.name(), conf.getJobPriority()
.name());
// As UNDEFINED_PRIORITY cannot be mapped to any integer value, resetting
// to default as 0.
assertEquals(0, conf.getJobPriorityAsInteger());
}
} }

View File

@ -41,7 +41,7 @@ public class TestJob {
when(cluster.getClient()).thenReturn(client); when(cluster.getClient()).thenReturn(client);
JobID jobid = new JobID("1014873536921", 6); JobID jobid = new JobID("1014873536921", 6);
JobStatus status = new JobStatus(jobid, 0.0f, 0.0f, 0.0f, 0.0f, JobStatus status = new JobStatus(jobid, 0.0f, 0.0f, 0.0f, 0.0f,
State.FAILED, JobPriority.NORMAL, "root", "TestJobToString", State.FAILED, JobPriority.DEFAULT, "root", "TestJobToString",
"job file", "tracking url"); "job file", "tracking url");
when(client.getJobStatus(jobid)).thenReturn(status); when(client.getJobStatus(jobid)).thenReturn(status);
when(client.getTaskReports(jobid, TaskType.MAP)).thenReturn( when(client.getTaskReports(jobid, TaskType.MAP)).thenReturn(

View File

@ -562,13 +562,30 @@ public class YARNRunner implements ClientProtocol {
appContext.setApplicationTags(new HashSet<String>(tagsFromConf)); appContext.setApplicationTags(new HashSet<String>(tagsFromConf));
} }
String jobPriority = jobConf.get(MRJobConfig.PRIORITY);
if (jobPriority != null) {
int iPriority;
try {
iPriority = TypeConverter.toYarnApplicationPriority(jobPriority);
} catch (IllegalArgumentException e) {
iPriority = Integer.parseInt(jobPriority);
}
appContext.setPriority(Priority.newInstance(iPriority));
}
return appContext; return appContext;
} }
@Override @Override
public void setJobPriority(JobID arg0, String arg1) throws IOException, public void setJobPriority(JobID arg0, String arg1) throws IOException,
InterruptedException { InterruptedException {
resMgrDelegate.setJobPriority(arg0, arg1); ApplicationId appId = TypeConverter.toYarn(arg0).getAppId();
try {
resMgrDelegate.updateApplicationPriority(appId,
Priority.newInstance(Integer.parseInt(arg1)));
} catch (YarnException e) {
throw new IOException(e);
}
} }
@Override @Override

View File

@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@ -604,6 +605,30 @@ public class TestYARNRunner extends TestCase {
assertEquals("Bad SHELL setting", USER_SHELL, shell); assertEquals("Bad SHELL setting", USER_SHELL, shell);
} }
/**
 * Verifies that a job priority configured via MRJobConfig.PRIORITY --
 * either as a JobPriority enum name or as a raw integer -- is propagated
 * into the YARN ApplicationSubmissionContext built for the job.
 */
@Test
public void testJobPriority() throws Exception {
JobConf jobConf = new JobConf();
jobConf.set(MRJobConfig.PRIORITY, "LOW");
YARNRunner yarnRunner = new YARNRunner(jobConf);
ApplicationSubmissionContext appSubCtx = buildSubmitContext(yarnRunner,
jobConf);
// 2 corresponds to LOW
assertEquals(appSubCtx.getPriority(), Priority.newInstance(2));
// Set an integer explicitly
jobConf.set(MRJobConfig.PRIORITY, "12");
yarnRunner = new YARNRunner(jobConf);
appSubCtx = buildSubmitContext(yarnRunner,
jobConf);
// Verify whether 12 is set to submission context
assertEquals(appSubCtx.getPriority(), Priority.newInstance(12));
}
private ApplicationSubmissionContext buildSubmitContext( private ApplicationSubmissionContext buildSubmitContext(
YARNRunner yarnRunner, JobConf jobConf) throws IOException { YARNRunner yarnRunner, JobConf jobConf) throws IOException {
File jobxml = new File(testWorkDir, MRJobConfig.JOB_CONF_FILE); File jobxml = new File(testWorkDir, MRJobConfig.JOB_CONF_FILE);

View File

@ -527,8 +527,9 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
exitCode = runTool(conf, createJobClient(), new String[] { "-set-priority", exitCode = runTool(conf, createJobClient(), new String[] { "-set-priority",
jobId, "VERY_LOW" }, new ByteArrayOutputStream()); jobId, "VERY_LOW" }, new ByteArrayOutputStream());
assertEquals("Exit code", 0, exitCode); assertEquals("Exit code", 0, exitCode);
// because this method does not implemented still. // set-priority is fired after job is completed in YARN, hence need not
verifyJobPriority(jobId, "NORMAL", conf, createJobClient()); // have to update the priority.
verifyJobPriority(jobId, "DEFAULT", conf, createJobClient());
} }
protected CLI createJobClient() throws IOException { protected CLI createJobClient() throws IOException {

View File

@ -66,6 +66,7 @@ import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
@ -159,6 +160,7 @@ public class TestMRJobs {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir"); conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
conf.setInt("yarn.cluster.max-application-priority", 10);
mrCluster.init(conf); mrCluster.init(conf);
mrCluster.start(); mrCluster.start();
} }
@ -242,6 +244,67 @@ public class TestMRJobs {
// JobStatus?)--compare against MRJobConfig.JOB_UBERTASK_ENABLE value // JobStatus?)--compare against MRJobConfig.JOB_UBERTASK_ENABLE value
} }
/**
 * End-to-end check that a running job's priority can be changed both via
 * the enum-based setPriority API and via the integer-based
 * setPriorityAsInteger API, and that the updated value is reflected back
 * through Job#getPriority() (polled via waitForPriorityToUpdate).
 */
@Test(timeout = 3000000)
public void testJobWithChangePriority() throws Exception {
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
return;
}
Configuration sleepConf = new Configuration(mrCluster.getConfig());
// set master address to local to test that local mode applied if framework
// equals local
sleepConf.set(MRConfig.MASTER_ADDRESS, "local");
sleepConf
.setInt("yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms", 5);
SleepJob sleepJob = new SleepJob();
sleepJob.setConf(sleepConf);
Job job = sleepJob.createJob(1, 1, 1000, 20, 50, 1);
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.setJarByClass(SleepJob.class);
job.setMaxMapAttempts(1); // speed up failures
job.submit();
// Set the priority to HIGH
job.setPriority(JobPriority.HIGH);
waitForPriorityToUpdate(job, JobPriority.HIGH);
// Verify the priority from job itself
Assert.assertEquals(job.getPriority(), JobPriority.HIGH);
// Change priority to NORMAL (3) with new api
job.setPriorityAsInteger(3); // 3 maps to NORMAL on the integer scale
waitForPriorityToUpdate(job, JobPriority.NORMAL);
Assert.assertEquals(job.getPriority(), JobPriority.NORMAL);
// Change priority to a high integer value with new api
job.setPriorityAsInteger(89); // outside the named range -> UNDEFINED_PRIORITY
waitForPriorityToUpdate(job, JobPriority.UNDEFINED_PRIORITY);
Assert.assertEquals(job.getPriority(), JobPriority.UNDEFINED_PRIORITY);
boolean succeeded = job.waitForCompletion(true);
Assert.assertTrue(succeeded);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
}
/**
 * Polls Job#getPriority() until it reports {@code expectedStatus} or the
 * wait budget is exhausted. Callers assert the final value afterwards.
 *
 * @param job the job whose priority is being watched.
 * @param expectedStatus the priority we expect the job to reach.
 */
private void waitForPriorityToUpdate(Job job, JobPriority expectedStatus)
throws IOException, InterruptedException {
// Max wait time to get the priority update can be kept as 20sec (200 *
// 100ms)
int waitCnt = 200;
while (waitCnt-- > 0) {
if (job.getPriority().equals(expectedStatus)) {
// Stop waiting as priority is updated.
break;
} else {
Thread.sleep(100);
}
}
}
@Test(timeout = 300000) @Test(timeout = 300000)
public void testConfVerificationWithClassloader() throws Exception { public void testConfVerificationWithClassloader() throws Exception {
testConfVerification(true, false, false, false); testConfVerification(true, false, false, false);