MAPREDUCE-6515. Update Application priority in AM side from AM-RM heartbeat. Contributed by Sunil G

(cherry picked from commit cf953b6258)
This commit is contained in:
Jason Lowe 2015-10-29 18:05:01 +00:00
parent c75d8b164f
commit 145058003a
19 changed files with 191 additions and 19 deletions

View File

@ -339,6 +339,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6508. TestNetworkedJob fails consistently due to delegation MAPREDUCE-6508. TestNetworkedJob fails consistently due to delegation
token changes on RM. (Akira AJISAKA via junping_du) token changes on RM. (Akira AJISAKA via junping_du)
MAPREDUCE-6515. Update Application priority in AM side from AM-RM heartbeat
(Sunil G via jlowe)
Release 2.7.2 - UNRELEASED Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.Priority;
/** /**
@ -100,4 +101,5 @@ public interface Job {
boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation); boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation);
public void setQueueName(String queueName); public void setQueueName(String queueName);
public void setJobPriority(Priority priority);
} }

View File

@ -120,6 +120,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
@ -653,6 +654,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private JobState lastNonFinalState = JobState.NEW; private JobState lastNonFinalState = JobState.NEW;
private volatile Priority jobPriority = Priority.newInstance(0);
public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId, public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
Configuration conf, EventHandler eventHandler, Configuration conf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, TaskAttemptListener taskAttemptListener,
@ -878,7 +881,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
reporterUserName, reporterUserName,
state, appSubmitTime, startTime, finishTime, setupProgress, state, appSubmitTime, startTime, finishTime, setupProgress,
this.mapProgress, this.reduceProgress, this.mapProgress, this.reduceProgress,
cleanupProgress, jobFile, amInfos, isUber, diagsb.toString()); cleanupProgress, jobFile, amInfos, isUber, diagsb.toString(),
jobPriority);
return report; return report;
} finally { } finally {
readLock.unlock(); readLock.unlock();
@ -2164,7 +2168,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
} }
} }
} }
private static class InternalTerminationTransition implements private static class InternalTerminationTransition implements
SingleArcTransition<JobImpl, JobEvent> { SingleArcTransition<JobImpl, JobEvent> {
JobStateInternal terminationState = null; JobStateInternal terminationState = null;
@ -2217,4 +2221,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
public int getMaxFetchFailuresNotifications() { public int getMaxFetchFailuresNotifications() {
return maxFetchFailuresNotifications; return maxFetchFailuresNotifications;
} }
@Override
public void setJobPriority(Priority priority) {
this.jobPriority = priority;
}
} }

View File

@ -779,6 +779,7 @@ public class RMContainerAllocator extends RMContainerRequestor
computeIgnoreBlacklisting(); computeIgnoreBlacklisting();
handleUpdatedNodes(response); handleUpdatedNodes(response);
handleJobPriorityChange(response);
for (ContainerStatus cont : finishedContainers) { for (ContainerStatus cont : finishedContainers) {
LOG.info("Received completed container " + cont.getContainerId()); LOG.info("Received completed container " + cont.getContainerId());
@ -901,6 +902,14 @@ public class RMContainerAllocator extends RMContainerRequestor
} }
} }
private void handleJobPriorityChange(AllocateResponse response) {
Priority priorityFromResponse = Priority.newInstance(response
.getApplicationPriority().getPriority());
// Update the job priority to Job directly.
getJob().setJobPriority(priorityFromResponse);
}
@Private @Private
public Resource getResourceLimit() { public Resource getResourceLimit() {
Resource headRoom = getAvailableResources(); Resource headRoom = getAvailableResources();

View File

@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
@ -634,6 +635,11 @@ public class MockJobs extends MockApps {
public void setQueueName(String queueName) { public void setQueueName(String queueName) {
// do nothing // do nothing
} }
@Override
public void setJobPriority(Priority priority) {
// do nothing
}
}; };
} }

View File

@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
@ -526,6 +527,11 @@ public class TestRuntimeEstimators {
public void setQueueName(String queueName) { public void setQueueName(String queueName) {
// do nothing // do nothing
} }
@Override
public void setJobPriority(Priority priority) {
// do nothing
}
} }
/* /*

View File

@ -72,6 +72,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
@ -92,6 +93,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher;
@ -889,6 +891,39 @@ public class TestJobImpl {
job.getDiagnostics().toString().contains(EXCEPTIONMSG)); job.getDiagnostics().toString().contains(EXCEPTIONMSG));
} }
@Test
public void testJobPriorityUpdate() throws Exception {
Configuration conf = new Configuration();
AsyncDispatcher dispatcher = new AsyncDispatcher();
Priority submittedPriority = Priority.newInstance(5);
AppContext mockContext = mock(AppContext.class);
when(mockContext.hasSuccessfullyUnregistered()).thenReturn(false);
JobImpl job = createStubbedJob(conf, dispatcher, 2, mockContext);
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
job.handle(new JobStartEvent(jobId));
assertJobState(job, JobStateInternal.SETUP);
// Update priority of job to 5, and it will be updated
job.setJobPriority(submittedPriority);
Assert.assertEquals(submittedPriority, job.getReport().getJobPriority());
job.handle(new JobSetupCompletedEvent(jobId));
assertJobState(job, JobStateInternal.RUNNING);
// Update priority of job to 8, and see whether its updated
Priority updatedPriority = Priority.newInstance(5);
job.setJobPriority(updatedPriority);
assertJobState(job, JobStateInternal.RUNNING);
Priority jobPriority = job.getReport().getJobPriority();
Assert.assertNotNull(jobPriority);
// Verify whether changed priority is same as what is set in Job.
Assert.assertEquals(updatedPriority, jobPriority);
}
private static CommitterEventHandler createCommitterEventHandler( private static CommitterEventHandler createCommitterEventHandler(
Dispatcher dispatcher, OutputCommitter committer) { Dispatcher dispatcher, OutputCommitter committer) {
final SystemClock clock = new SystemClock(); final SystemClock clock = new SystemClock();

View File

@ -2943,6 +2943,8 @@ public class TestRMContainerAllocator {
Collections.<NodeReport>emptyList(), Collections.<NodeReport>emptyList(),
Resource.newInstance(512000, 1024), null, 10, null, Resource.newInstance(512000, 1024), null, 10, null,
Collections.<NMToken>emptyList()); Collections.<NMToken>emptyList());
// RM will always ensure that a default priority is sent to AM
response.setApplicationPriority(Priority.newInstance(0));
containersToComplete.clear(); containersToComplete.clear();
containersToAllocate.clear(); containersToAllocate.clear();
return response; return response;

View File

@ -296,21 +296,41 @@ public class TypeConverter {
} }
return yCntrs; return yCntrs;
} }
public static JobStatus fromYarn(JobReport jobreport, String trackingUrl) { public static JobStatus fromYarn(JobReport jobreport, String trackingUrl) {
JobPriority jobPriority = JobPriority.NORMAL; JobPriority jobPriority = (jobreport.getJobPriority() == null)
? JobPriority.DEFAULT
: fromYarnPriority(jobreport.getJobPriority().getPriority());
JobStatus jobStatus = new org.apache.hadoop.mapred.JobStatus( JobStatus jobStatus = new org.apache.hadoop.mapred.JobStatus(
fromYarn(jobreport.getJobId()), jobreport.getSetupProgress(), jobreport fromYarn(jobreport.getJobId()), jobreport.getSetupProgress(),
.getMapProgress(), jobreport.getReduceProgress(), jobreport jobreport.getMapProgress(), jobreport.getReduceProgress(),
.getCleanupProgress(), fromYarn(jobreport.getJobState()), jobreport.getCleanupProgress(), fromYarn(jobreport.getJobState()),
jobPriority, jobreport.getUser(), jobreport.getJobName(), jobreport jobPriority, jobreport.getUser(), jobreport.getJobName(),
.getJobFile(), trackingUrl, jobreport.isUber()); jobreport.getJobFile(), trackingUrl, jobreport.isUber());
jobStatus.setStartTime(jobreport.getStartTime()); jobStatus.setStartTime(jobreport.getStartTime());
jobStatus.setFinishTime(jobreport.getFinishTime()); jobStatus.setFinishTime(jobreport.getFinishTime());
jobStatus.setFailureInfo(jobreport.getDiagnostics()); jobStatus.setFailureInfo(jobreport.getDiagnostics());
return jobStatus; return jobStatus;
} }
private static JobPriority fromYarnPriority(int priority) {
switch (priority) {
case 5 :
return JobPriority.VERY_HIGH;
case 4 :
return JobPriority.HIGH;
case 3 :
return JobPriority.NORMAL;
case 2 :
return JobPriority.LOW;
case 1 :
return JobPriority.VERY_LOW;
case 0 :
return JobPriority.DEFAULT;
}
return JobPriority.UNDEFINED_PRIORITY;
}
public static org.apache.hadoop.mapreduce.QueueState fromYarn( public static org.apache.hadoop.mapreduce.QueueState fromYarn(
QueueState state) { QueueState state) {
org.apache.hadoop.mapreduce.QueueState qState = org.apache.hadoop.mapreduce.QueueState qState =

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.mapreduce.v2.api.records;
import java.util.List; import java.util.List;
import org.apache.hadoop.yarn.api.records.Priority;
public interface JobReport { public interface JobReport {
public abstract JobId getJobId(); public abstract JobId getJobId();
public abstract JobState getJobState(); public abstract JobState getJobState();
@ -37,6 +39,7 @@ public interface JobReport {
public abstract String getJobFile(); public abstract String getJobFile();
public abstract List<AMInfo> getAMInfos(); public abstract List<AMInfo> getAMInfos();
public abstract boolean isUber(); public abstract boolean isUber();
public abstract Priority getJobPriority();
public abstract void setJobId(JobId jobId); public abstract void setJobId(JobId jobId);
public abstract void setJobState(JobState jobState); public abstract void setJobState(JobState jobState);
@ -54,4 +57,5 @@ public interface JobReport {
public abstract void setJobFile(String jobFile); public abstract void setJobFile(String jobFile);
public abstract void setAMInfos(List<AMInfo> amInfos); public abstract void setAMInfos(List<AMInfo> amInfos);
public abstract void setIsUber(boolean isUber); public abstract void setIsUber(boolean isUber);
public abstract void setJobPriority(Priority priority);
} }

View File

@ -32,7 +32,10 @@ import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobReportProto;
import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobReportProtoOrBuilder; import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobReportProtoOrBuilder;
import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobStateProto; import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobStateProto;
import org.apache.hadoop.mapreduce.v2.util.MRProtoUtils; import org.apache.hadoop.mapreduce.v2.util.MRProtoUtils;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
@ -41,11 +44,11 @@ public class JobReportPBImpl extends ProtoBase<JobReportProto> implements
JobReportProto proto = JobReportProto.getDefaultInstance(); JobReportProto proto = JobReportProto.getDefaultInstance();
JobReportProto.Builder builder = null; JobReportProto.Builder builder = null;
boolean viaProto = false; boolean viaProto = false;
private JobId jobId = null; private JobId jobId = null;
private List<AMInfo> amInfos = null; private List<AMInfo> amInfos = null;
private Priority jobPriority = null;
public JobReportPBImpl() { public JobReportPBImpl() {
builder = JobReportProto.newBuilder(); builder = JobReportProto.newBuilder();
} }
@ -69,6 +72,9 @@ public class JobReportPBImpl extends ProtoBase<JobReportProto> implements
if (this.amInfos != null) { if (this.amInfos != null) {
addAMInfosToProto(); addAMInfosToProto();
} }
if (this.jobPriority != null) {
builder.setJobPriority(convertToProtoFormat(this.jobPriority));
}
} }
private synchronized void mergeLocalToProto() { private synchronized void mergeLocalToProto() {
@ -333,6 +339,14 @@ public class JobReportPBImpl extends ProtoBase<JobReportProto> implements
return MRProtoUtils.convertFromProtoFormat(e); return MRProtoUtils.convertFromProtoFormat(e);
} }
private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
return new PriorityPBImpl(p);
}
private PriorityProto convertToProtoFormat(Priority t) {
return ((PriorityPBImpl)t).getProto();
}
@Override @Override
public synchronized boolean isUber() { public synchronized boolean isUber() {
JobReportProtoOrBuilder p = viaProto ? proto : builder; JobReportProtoOrBuilder p = viaProto ? proto : builder;
@ -344,4 +358,26 @@ public class JobReportPBImpl extends ProtoBase<JobReportProto> implements
maybeInitBuilder(); maybeInitBuilder();
builder.setIsUber(isUber); builder.setIsUber(isUber);
} }
}
@Override
public synchronized Priority getJobPriority() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
if (this.jobPriority != null) {
return this.jobPriority;
}
if (!p.hasJobPriority()) {
return null;
}
this.jobPriority = convertFromProtoFormat(p.getJobPriority());
return this.jobPriority;
}
@Override
public synchronized void setJobPriority(Priority priority) {
maybeInitBuilder();
if (priority == null) {
builder.clearJobPriority();
}
this.jobPriority = priority;
}
}

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
public class MRBuilderUtils { public class MRBuilderUtils {
@ -62,11 +63,22 @@ public class MRBuilderUtils {
return taskAttemptId; return taskAttemptId;
} }
public static JobReport newJobReport(JobId jobId, String jobName,
String userName, JobState state, long submitTime, long startTime,
long finishTime, float setupProgress, float mapProgress,
float reduceProgress, float cleanupProgress, String jobFile,
List<AMInfo> amInfos, boolean isUber, String diagnostics) {
return newJobReport(jobId, jobName, userName, state, submitTime, startTime,
finishTime, setupProgress, mapProgress, reduceProgress,
cleanupProgress, jobFile, amInfos, isUber, diagnostics,
Priority.newInstance(0));
}
public static JobReport newJobReport(JobId jobId, String jobName, public static JobReport newJobReport(JobId jobId, String jobName,
String userName, JobState state, long submitTime, long startTime, long finishTime, String userName, JobState state, long submitTime, long startTime, long finishTime,
float setupProgress, float mapProgress, float reduceProgress, float setupProgress, float mapProgress, float reduceProgress,
float cleanupProgress, String jobFile, List<AMInfo> amInfos, float cleanupProgress, String jobFile, List<AMInfo> amInfos,
boolean isUber, String diagnostics) { boolean isUber, String diagnostics, Priority priority) {
JobReport report = Records.newRecord(JobReport.class); JobReport report = Records.newRecord(JobReport.class);
report.setJobId(jobId); report.setJobId(jobId);
report.setJobName(jobName); report.setJobName(jobName);
@ -83,6 +95,7 @@ public class MRBuilderUtils {
report.setAMInfos(amInfos); report.setAMInfos(amInfos);
report.setIsUber(isUber); report.setIsUber(isUber);
report.setDiagnostics(diagnostics); report.setDiagnostics(diagnostics);
report.setJobPriority(priority);
return report; return report;
} }

View File

@ -146,6 +146,7 @@ message JobReportProto {
repeated AMInfoProto am_infos = 14; repeated AMInfoProto am_infos = 14;
optional int64 submit_time = 15; optional int64 submit_time = 15;
optional bool is_uber = 16 [default = false]; optional bool is_uber = 16 [default = false];
optional hadoop.yarn.PriorityProto jobPriority = 17;
} }
message AMInfoProto { message AMInfoProto {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce; package org.apache.hadoop.mapreduce;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -35,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@ -201,10 +203,12 @@ public class TestTypeConverter {
jobReport.setJobState(state); jobReport.setJobState(state);
jobReport.setStartTime(jobStartTime); jobReport.setStartTime(jobStartTime);
jobReport.setFinishTime(jobFinishTime); jobReport.setFinishTime(jobFinishTime);
jobReport.setUser("TestTypeConverter-user"); jobReport.setUser("TestTypeConverter-user");
jobReport.setJobPriority(Priority.newInstance(0));
JobStatus jobStatus = TypeConverter.fromYarn(jobReport, "dummy-jobfile"); JobStatus jobStatus = TypeConverter.fromYarn(jobReport, "dummy-jobfile");
Assert.assertEquals(jobStartTime, jobStatus.getStartTime()); Assert.assertEquals(jobStartTime, jobStatus.getStartTime());
Assert.assertEquals(jobFinishTime, jobStatus.getFinishTime()); Assert.assertEquals(jobFinishTime, jobStatus.getFinishTime());
Assert.assertEquals(state.toString(), jobStatus.getState().toString()); Assert.assertEquals(state.toString(), jobStatus.getState().toString());
} Assert.assertEquals(JobPriority.DEFAULT, jobStatus.getPriority());
}
} }

View File

@ -31,6 +31,7 @@ public enum JobPriority {
HIGH, HIGH,
NORMAL, NORMAL,
LOW, LOW,
VERY_LOW; VERY_LOW,
DEFAULT,
UNDEFINED_PRIORITY;
} }

View File

@ -31,5 +31,7 @@ public enum JobPriority {
HIGH, HIGH,
NORMAL, NORMAL,
LOW, LOW,
VERY_LOW; VERY_LOW,
DEFAULT,
UNDEFINED_PRIORITY;
} }

View File

@ -64,6 +64,7 @@ import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil; import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
@ -481,4 +482,10 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
public void setQueueName(String queueName) { public void setQueueName(String queueName) {
throw new UnsupportedOperationException("Can't set job's queue name in history"); throw new UnsupportedOperationException("Can't set job's queue name in history");
} }
@Override
public void setJobPriority(Priority priority) {
throw new UnsupportedOperationException(
"Can't set job's priority in history");
}
} }

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -196,4 +197,10 @@ public class PartialJob implements org.apache.hadoop.mapreduce.v2.app.job.Job {
throw new UnsupportedOperationException("Can't set job's queue name in history"); throw new UnsupportedOperationException("Can't set job's queue name in history");
} }
@Override
public void setJobPriority(Priority priority) {
throw new UnsupportedOperationException(
"Can't set job's priority in history");
}
} }

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.security.GroupMappingServiceProvider;
import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.webapp.WebApp; import org.apache.hadoop.yarn.webapp.WebApp;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -419,5 +420,9 @@ public class TestHsWebServicesAcls {
@Override @Override
public void setQueueName(String queueName) { public void setQueueName(String queueName) {
} }
@Override
public void setJobPriority(Priority priority) {
}
} }
} }