MAPREDUCE-6515. Update Application priority in AM side from AM-RM heartbeat. Contributed by Sunil G

This commit is contained in:
Jason Lowe 2015-10-29 18:05:01 +00:00
parent e2267de207
commit cf953b6258
19 changed files with 191 additions and 19 deletions

View File

@ -620,6 +620,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6508. TestNetworkedJob fails consistently due to delegation
token changes on RM. (Akira AJISAKA via junping_du)
MAPREDUCE-6515. Update Application priority in AM side from AM-RM heartbeat
(Sunil G via jlowe)
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -36,6 +36,7 @@
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.Priority;
/**
@ -100,4 +101,5 @@ public interface Job {
boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation);
public void setQueueName(String queueName);
public void setJobPriority(Priority priority);
}

View File

@ -120,6 +120,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
@ -653,6 +654,8 @@ JobEventType.JOB_KILL, new KillTasksTransition())
private JobState lastNonFinalState = JobState.NEW;
private volatile Priority jobPriority = Priority.newInstance(0);
public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
Configuration conf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener,
@ -878,7 +881,8 @@ public JobReport getReport() {
reporterUserName,
state, appSubmitTime, startTime, finishTime, setupProgress,
this.mapProgress, this.reduceProgress,
cleanupProgress, jobFile, amInfos, isUber, diagsb.toString());
cleanupProgress, jobFile, amInfos, isUber, diagsb.toString(),
jobPriority);
return report;
} finally {
readLock.unlock();
@ -2219,4 +2223,9 @@ public float getMaxAllowedFetchFailuresFraction() {
public int getMaxFetchFailuresNotifications() {
return maxFetchFailuresNotifications;
}
@Override
public void setJobPriority(Priority priority) {
this.jobPriority = priority;
}
}

View File

@ -797,6 +797,7 @@ private List<Container> getResources() throws Exception {
computeIgnoreBlacklisting();
handleUpdatedNodes(response);
handleJobPriorityChange(response);
for (ContainerStatus cont : finishedContainers) {
LOG.info("Received completed container " + cont.getContainerId());
@ -921,6 +922,14 @@ private void handleUpdatedNodes(AllocateResponse response) {
}
}
private void handleJobPriorityChange(AllocateResponse response) {
Priority priorityFromResponse = Priority.newInstance(response
.getApplicationPriority().getPriority());
// Update the job priority to Job directly.
getJob().setJobPriority(priorityFromResponse);
}
@Private
public Resource getResourceLimit() {
Resource headRoom = getAvailableResources();

View File

@ -66,6 +66,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.util.Records;
import com.google.common.collect.Iterators;
@ -634,6 +635,11 @@ public Configuration loadConfFile() throws IOException {
public void setQueueName(String queueName) {
// do nothing
}
@Override
public void setJobPriority(Priority priority) {
// do nothing
}
};
}

View File

@ -69,6 +69,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
@ -526,6 +527,11 @@ public Configuration loadConfFile() {
public void setQueueName(String queueName) {
// do nothing
}
@Override
public void setJobPriority(Priority priority) {
// do nothing
}
}
/*

View File

@ -72,6 +72,7 @@
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
@ -92,6 +93,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
@ -889,6 +891,39 @@ protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
job.getDiagnostics().toString().contains(EXCEPTIONMSG));
}
@Test
public void testJobPriorityUpdate() throws Exception {
Configuration conf = new Configuration();
AsyncDispatcher dispatcher = new AsyncDispatcher();
Priority submittedPriority = Priority.newInstance(5);
AppContext mockContext = mock(AppContext.class);
when(mockContext.hasSuccessfullyUnregistered()).thenReturn(false);
JobImpl job = createStubbedJob(conf, dispatcher, 2, mockContext);
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
job.handle(new JobStartEvent(jobId));
assertJobState(job, JobStateInternal.SETUP);
// Update priority of job to 5, and it will be updated
job.setJobPriority(submittedPriority);
Assert.assertEquals(submittedPriority, job.getReport().getJobPriority());
job.handle(new JobSetupCompletedEvent(jobId));
assertJobState(job, JobStateInternal.RUNNING);
// Update priority of job to 8, and see whether its updated
Priority updatedPriority = Priority.newInstance(5);
job.setJobPriority(updatedPriority);
assertJobState(job, JobStateInternal.RUNNING);
Priority jobPriority = job.getReport().getJobPriority();
Assert.assertNotNull(jobPriority);
// Verify whether changed priority is same as what is set in Job.
Assert.assertEquals(updatedPriority, jobPriority);
}
private static CommitterEventHandler createCommitterEventHandler(
Dispatcher dispatcher, OutputCommitter committer) {
final SystemClock clock = new SystemClock();

View File

@ -2948,6 +2948,8 @@ public AllocateResponse allocate(AllocateRequest request)
Collections.<NodeReport>emptyList(),
Resource.newInstance(512000, 1024), null, 10, null,
Collections.<NMToken>emptyList());
// RM will always ensure that a default priority is sent to AM
response.setApplicationPriority(Priority.newInstance(0));
containersToComplete.clear();
containersToAllocate.clear();
return response;

View File

@ -298,19 +298,39 @@ public static Counters toYarn(org.apache.hadoop.mapreduce.Counters counters) {
}
public static JobStatus fromYarn(JobReport jobreport, String trackingUrl) {
JobPriority jobPriority = JobPriority.NORMAL;
JobPriority jobPriority = (jobreport.getJobPriority() == null)
? JobPriority.DEFAULT
: fromYarnPriority(jobreport.getJobPriority().getPriority());
JobStatus jobStatus = new org.apache.hadoop.mapred.JobStatus(
fromYarn(jobreport.getJobId()), jobreport.getSetupProgress(), jobreport
.getMapProgress(), jobreport.getReduceProgress(), jobreport
.getCleanupProgress(), fromYarn(jobreport.getJobState()),
jobPriority, jobreport.getUser(), jobreport.getJobName(), jobreport
.getJobFile(), trackingUrl, jobreport.isUber());
fromYarn(jobreport.getJobId()), jobreport.getSetupProgress(),
jobreport.getMapProgress(), jobreport.getReduceProgress(),
jobreport.getCleanupProgress(), fromYarn(jobreport.getJobState()),
jobPriority, jobreport.getUser(), jobreport.getJobName(),
jobreport.getJobFile(), trackingUrl, jobreport.isUber());
jobStatus.setStartTime(jobreport.getStartTime());
jobStatus.setFinishTime(jobreport.getFinishTime());
jobStatus.setFailureInfo(jobreport.getDiagnostics());
return jobStatus;
}
private static JobPriority fromYarnPriority(int priority) {
switch (priority) {
case 5 :
return JobPriority.VERY_HIGH;
case 4 :
return JobPriority.HIGH;
case 3 :
return JobPriority.NORMAL;
case 2 :
return JobPriority.LOW;
case 1 :
return JobPriority.VERY_LOW;
case 0 :
return JobPriority.DEFAULT;
}
return JobPriority.UNDEFINED_PRIORITY;
}
public static org.apache.hadoop.mapreduce.QueueState fromYarn(
QueueState state) {
org.apache.hadoop.mapreduce.QueueState qState =

View File

@ -20,6 +20,8 @@
import java.util.List;
import org.apache.hadoop.yarn.api.records.Priority;
public interface JobReport {
public abstract JobId getJobId();
public abstract JobState getJobState();
@ -37,6 +39,7 @@ public interface JobReport {
public abstract String getJobFile();
public abstract List<AMInfo> getAMInfos();
public abstract boolean isUber();
public abstract Priority getJobPriority();
public abstract void setJobId(JobId jobId);
public abstract void setJobState(JobState jobState);
@ -54,4 +57,5 @@ public interface JobReport {
public abstract void setJobFile(String jobFile);
public abstract void setAMInfos(List<AMInfo> amInfos);
public abstract void setIsUber(boolean isUber);
public abstract void setJobPriority(Priority priority);
}

View File

@ -32,7 +32,10 @@
import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobReportProtoOrBuilder;
import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobStateProto;
import org.apache.hadoop.mapreduce.v2.util.MRProtoUtils;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
@ -44,7 +47,7 @@ public class JobReportPBImpl extends ProtoBase<JobReportProto> implements
private JobId jobId = null;
private List<AMInfo> amInfos = null;
private Priority jobPriority = null;
public JobReportPBImpl() {
builder = JobReportProto.newBuilder();
@ -69,6 +72,9 @@ private synchronized void mergeLocalToBuilder() {
if (this.amInfos != null) {
addAMInfosToProto();
}
if (this.jobPriority != null) {
builder.setJobPriority(convertToProtoFormat(this.jobPriority));
}
}
private synchronized void mergeLocalToProto() {
@ -333,6 +339,14 @@ private JobState convertFromProtoFormat(JobStateProto e) {
return MRProtoUtils.convertFromProtoFormat(e);
}
private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
return new PriorityPBImpl(p);
}
private PriorityProto convertToProtoFormat(Priority t) {
return ((PriorityPBImpl)t).getProto();
}
@Override
public synchronized boolean isUber() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
@ -344,4 +358,26 @@ public synchronized void setIsUber(boolean isUber) {
maybeInitBuilder();
builder.setIsUber(isUber);
}
@Override
public synchronized Priority getJobPriority() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
if (this.jobPriority != null) {
return this.jobPriority;
}
if (!p.hasJobPriority()) {
return null;
}
this.jobPriority = convertFromProtoFormat(p.getJobPriority());
return this.jobPriority;
}
@Override
public synchronized void setJobPriority(Priority priority) {
maybeInitBuilder();
if (priority == null) {
builder.clearJobPriority();
}
this.jobPriority = priority;
}
}

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.util.Records;
public class MRBuilderUtils {
@ -62,11 +63,22 @@ public static TaskAttemptId newTaskAttemptId(TaskId taskId, int attemptId) {
return taskAttemptId;
}
public static JobReport newJobReport(JobId jobId, String jobName,
String userName, JobState state, long submitTime, long startTime,
long finishTime, float setupProgress, float mapProgress,
float reduceProgress, float cleanupProgress, String jobFile,
List<AMInfo> amInfos, boolean isUber, String diagnostics) {
return newJobReport(jobId, jobName, userName, state, submitTime, startTime,
finishTime, setupProgress, mapProgress, reduceProgress,
cleanupProgress, jobFile, amInfos, isUber, diagnostics,
Priority.newInstance(0));
}
public static JobReport newJobReport(JobId jobId, String jobName,
String userName, JobState state, long submitTime, long startTime, long finishTime,
float setupProgress, float mapProgress, float reduceProgress,
float cleanupProgress, String jobFile, List<AMInfo> amInfos,
boolean isUber, String diagnostics) {
boolean isUber, String diagnostics, Priority priority) {
JobReport report = Records.newRecord(JobReport.class);
report.setJobId(jobId);
report.setJobName(jobName);
@ -83,6 +95,7 @@ public static JobReport newJobReport(JobId jobId, String jobName,
report.setAMInfos(amInfos);
report.setIsUber(isUber);
report.setDiagnostics(diagnostics);
report.setJobPriority(priority);
return report;
}

View File

@ -146,6 +146,7 @@ message JobReportProto {
repeated AMInfoProto am_infos = 14;
optional int64 submit_time = 15;
optional bool is_uber = 16 [default = false];
optional hadoop.yarn.PriorityProto jobPriority = 17;
}
message AMInfoProto {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce;
import org.apache.hadoop.util.StringUtils;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -35,6 +36,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@ -202,9 +204,11 @@ public void testFromYarnJobReport() throws Exception {
jobReport.setStartTime(jobStartTime);
jobReport.setFinishTime(jobFinishTime);
jobReport.setUser("TestTypeConverter-user");
jobReport.setJobPriority(Priority.newInstance(0));
JobStatus jobStatus = TypeConverter.fromYarn(jobReport, "dummy-jobfile");
Assert.assertEquals(jobStartTime, jobStatus.getStartTime());
Assert.assertEquals(jobFinishTime, jobStatus.getFinishTime());
Assert.assertEquals(state.toString(), jobStatus.getState().toString());
Assert.assertEquals(JobPriority.DEFAULT, jobStatus.getPriority());
}
}

View File

@ -31,6 +31,7 @@ public enum JobPriority {
HIGH,
NORMAL,
LOW,
VERY_LOW;
VERY_LOW,
DEFAULT,
UNDEFINED_PRIORITY;
}

View File

@ -31,5 +31,7 @@ public enum JobPriority {
HIGH,
NORMAL,
LOW,
VERY_LOW;
VERY_LOW,
DEFAULT,
UNDEFINED_PRIORITY;
}

View File

@ -64,6 +64,7 @@
import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.Records;
@ -481,4 +482,10 @@ public List<AMInfo> getAMInfos() {
public void setQueueName(String queueName) {
throw new UnsupportedOperationException("Can't set job's queue name in history");
}
@Override
public void setJobPriority(Priority priority) {
throw new UnsupportedOperationException(
"Can't set job's priority in history");
}
}

View File

@ -39,6 +39,7 @@
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -196,4 +197,10 @@ public void setQueueName(String queueName) {
throw new UnsupportedOperationException("Can't set job's queue name in history");
}
@Override
public void setJobPriority(Priority priority) {
throw new UnsupportedOperationException(
"Can't set job's priority in history");
}
}

View File

@ -58,6 +58,7 @@
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.junit.Before;
import org.junit.Test;
@ -419,5 +420,9 @@ public boolean checkAccess(UserGroupInformation callerUGI,
@Override
public void setQueueName(String queueName) {
}
@Override
public void setJobPriority(Priority priority) {
}
}
}