From cf953b6258b036fc482456b4591cfb98054f48f2 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Thu, 29 Oct 2015 18:05:01 +0000 Subject: [PATCH] MAPREDUCE-6515. Update Application priority in AM side from AM-RM heartbeat. Contributed by Sunil G --- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../hadoop/mapreduce/v2/app/job/Job.java | 2 + .../mapreduce/v2/app/job/impl/JobImpl.java | 13 +++++- .../v2/app/rm/RMContainerAllocator.java | 9 ++++ .../hadoop/mapreduce/v2/app/MockJobs.java | 6 +++ .../v2/app/TestRuntimeEstimators.java | 6 +++ .../v2/app/job/impl/TestJobImpl.java | 35 +++++++++++++++ .../v2/app/rm/TestRMContainerAllocator.java | 2 + .../hadoop/mapreduce/TypeConverter.java | 34 +++++++++++--- .../mapreduce/v2/api/records/JobReport.java | 4 ++ .../api/records/impl/pb/JobReportPBImpl.java | 44 +++++++++++++++++-- .../mapreduce/v2/util/MRBuilderUtils.java | 15 ++++++- .../src/main/proto/mr_protos.proto | 1 + .../hadoop/mapreduce/TestTypeConverter.java | 8 +++- .../org/apache/hadoop/mapred/JobPriority.java | 5 ++- .../apache/hadoop/mapreduce/JobPriority.java | 4 +- .../hadoop/mapreduce/v2/hs/CompletedJob.java | 7 +++ .../hadoop/mapreduce/v2/hs/PartialJob.java | 7 +++ .../v2/hs/webapp/TestHsWebServicesAcls.java | 5 +++ 19 files changed, 191 insertions(+), 19 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 66a45fb6730..42ff113ecd4 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -620,6 +620,9 @@ Release 2.8.0 - UNRELEASED MAPREDUCE-6508. TestNetworkedJob fails consistently due to delegation token changes on RM. (Akira AJISAKA via junping_du) + MAPREDUCE-6515. Update Application priority in AM side from AM-RM heartbeat + (Sunil G via jlowe) + Release 2.7.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java index a40e5d2ddc6..77388108b72 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java @@ -36,6 +36,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.api.records.Priority; /** @@ -100,4 +101,5 @@ public interface Job { boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation); public void setQueueName(String queueName); + public void setJobPriority(Priority priority); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index fc9a3a5e293..5ed07622efe 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -120,6 +120,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; @@ -653,6 +654,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, private JobState lastNonFinalState = JobState.NEW; + private volatile Priority jobPriority = Priority.newInstance(0); + public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId, Configuration conf, EventHandler eventHandler, TaskAttemptListener taskAttemptListener, @@ -878,7 +881,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, reporterUserName, state, appSubmitTime, startTime, finishTime, setupProgress, this.mapProgress, this.reduceProgress, - cleanupProgress, jobFile, amInfos, isUber, diagsb.toString()); + cleanupProgress, jobFile, amInfos, isUber, diagsb.toString(), + jobPriority); return report; } finally { readLock.unlock(); @@ -2166,7 +2170,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, } } } - + private static class InternalTerminationTransition implements SingleArcTransition { JobStateInternal terminationState = null; @@ -2219,4 +2223,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, public int getMaxFetchFailuresNotifications() { return maxFetchFailuresNotifications; } + + @Override + public void setJobPriority(Priority priority) { + this.jobPriority = priority; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index bf9b1f8772b..496886e6fbc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -797,6 +797,7 @@ public class RMContainerAllocator extends RMContainerRequestor computeIgnoreBlacklisting(); handleUpdatedNodes(response); + handleJobPriorityChange(response); for (ContainerStatus cont : finishedContainers) { LOG.info("Received completed container " + cont.getContainerId()); @@ -921,6 +922,14 @@ public class RMContainerAllocator extends RMContainerRequestor } } + private void handleJobPriorityChange(AllocateResponse response) { + Priority priorityFromResponse = Priority.newInstance(response + .getApplicationPriority().getPriority()); + + // Update the job priority to Job directly. + getJob().setJobPriority(priorityFromResponse); + } + @Private public Resource getResourceLimit() { Resource headRoom = getAvailableResources(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java index fd9c094901a..ccacf1c6e71 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java @@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.util.Records; import com.google.common.collect.Iterators; @@ -634,6 +635,11 @@ public class MockJobs extends MockApps { public void setQueueName(String queueName) { // do nothing } + + @Override + public void setJobPriority(Priority priority) { + // do nothing + } }; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java index 475cd1f5e8a..0b7d1b18aa8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java @@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -526,6 +527,11 @@ public class TestRuntimeEstimators { public void setQueueName(String queueName) { // do nothing } + + @Override + public void setJobPriority(Priority priority) { + // do nothing + } } /* diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java index 2af4380821b..9e1892046b6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java @@ -72,6 +72,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupCompletedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent; @@ -92,6 +93,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; @@ -889,6 +891,39 @@ public class TestJobImpl { job.getDiagnostics().toString().contains(EXCEPTIONMSG)); } + @Test + public void testJobPriorityUpdate() throws Exception { + Configuration conf = new Configuration(); + AsyncDispatcher dispatcher = new AsyncDispatcher(); + Priority submittedPriority = Priority.newInstance(5); + + AppContext mockContext = mock(AppContext.class); + when(mockContext.hasSuccessfullyUnregistered()).thenReturn(false); + JobImpl job = createStubbedJob(conf, dispatcher, 2, mockContext); + + JobId jobId = job.getID(); + job.handle(new JobEvent(jobId, JobEventType.JOB_INIT)); + assertJobState(job, JobStateInternal.INITED); + job.handle(new JobStartEvent(jobId)); + assertJobState(job, JobStateInternal.SETUP); + // Update priority of job to 5, and it will be updated + job.setJobPriority(submittedPriority); + Assert.assertEquals(submittedPriority, job.getReport().getJobPriority()); + + job.handle(new JobSetupCompletedEvent(jobId)); + assertJobState(job, JobStateInternal.RUNNING); + + // Update priority of job to 8, and see whether its updated + Priority updatedPriority = Priority.newInstance(5); + job.setJobPriority(updatedPriority); + assertJobState(job, JobStateInternal.RUNNING); + Priority jobPriority = job.getReport().getJobPriority(); + Assert.assertNotNull(jobPriority); + + // Verify whether changed priority is same as what is set in Job. + Assert.assertEquals(updatedPriority, jobPriority); + } + private static CommitterEventHandler createCommitterEventHandler( Dispatcher dispatcher, OutputCommitter committer) { final SystemClock clock = new SystemClock(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index e4421a80d71..0a1043419d3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -2948,6 +2948,8 @@ public class TestRMContainerAllocator { Collections.emptyList(), Resource.newInstance(512000, 1024), null, 10, null, Collections.emptyList()); + // RM will always ensure that a default priority is sent to AM + response.setApplicationPriority(Priority.newInstance(0)); containersToComplete.clear(); containersToAllocate.clear(); return response; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java index 5b8d3a79fec..88f61b0e4f9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java @@ -296,21 +296,41 @@ public class TypeConverter { } return yCntrs; } - + public static JobStatus fromYarn(JobReport jobreport, String trackingUrl) { - JobPriority jobPriority = JobPriority.NORMAL; + JobPriority jobPriority = (jobreport.getJobPriority() == null) + ? JobPriority.DEFAULT + : fromYarnPriority(jobreport.getJobPriority().getPriority()); JobStatus jobStatus = new org.apache.hadoop.mapred.JobStatus( - fromYarn(jobreport.getJobId()), jobreport.getSetupProgress(), jobreport - .getMapProgress(), jobreport.getReduceProgress(), jobreport - .getCleanupProgress(), fromYarn(jobreport.getJobState()), - jobPriority, jobreport.getUser(), jobreport.getJobName(), jobreport - .getJobFile(), trackingUrl, jobreport.isUber()); + fromYarn(jobreport.getJobId()), jobreport.getSetupProgress(), + jobreport.getMapProgress(), jobreport.getReduceProgress(), + jobreport.getCleanupProgress(), fromYarn(jobreport.getJobState()), + jobPriority, jobreport.getUser(), jobreport.getJobName(), + jobreport.getJobFile(), trackingUrl, jobreport.isUber()); jobStatus.setStartTime(jobreport.getStartTime()); jobStatus.setFinishTime(jobreport.getFinishTime()); jobStatus.setFailureInfo(jobreport.getDiagnostics()); return jobStatus; } + private static JobPriority fromYarnPriority(int priority) { + switch (priority) { + case 5 : + return JobPriority.VERY_HIGH; + case 4 : + return JobPriority.HIGH; + case 3 : + return JobPriority.NORMAL; + case 2 : + return JobPriority.LOW; + case 1 : + return JobPriority.VERY_LOW; + case 0 : + return JobPriority.DEFAULT; + } + return JobPriority.UNDEFINED_PRIORITY; + } + public static org.apache.hadoop.mapreduce.QueueState fromYarn( QueueState state) { org.apache.hadoop.mapreduce.QueueState qState = diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java index b2f2cc1fc80..38dfcae21db 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java @@ -20,6 +20,8 @@ package org.apache.hadoop.mapreduce.v2.api.records; import java.util.List; +import org.apache.hadoop.yarn.api.records.Priority; + public interface JobReport { public abstract JobId getJobId(); public abstract JobState getJobState(); @@ -37,6 +39,7 @@ public interface JobReport { public abstract String getJobFile(); public abstract List getAMInfos(); public abstract boolean isUber(); + public abstract Priority getJobPriority(); public abstract void setJobId(JobId jobId); public abstract void setJobState(JobState jobState); @@ -54,4 +57,5 @@ public interface JobReport { public abstract void setJobFile(String jobFile); public abstract void setAMInfos(List amInfos); public abstract void setIsUber(boolean isUber); + public abstract void setJobPriority(Priority priority); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java index 5c90942525e..f4cb0a6ee20 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java @@ -32,7 +32,10 @@ import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobReportProto; import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobReportProtoOrBuilder; import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobStateProto; import org.apache.hadoop.mapreduce.v2.util.MRProtoUtils; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; +import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; @@ -41,11 +44,11 @@ public class JobReportPBImpl extends ProtoBase implements JobReportProto proto = JobReportProto.getDefaultInstance(); JobReportProto.Builder builder = null; boolean viaProto = false; - + private JobId jobId = null; private List amInfos = null; - - + private Priority jobPriority = null; + public JobReportPBImpl() { builder = JobReportProto.newBuilder(); } @@ -69,6 +72,9 @@ public class JobReportPBImpl extends ProtoBase implements if (this.amInfos != null) { addAMInfosToProto(); } + if (this.jobPriority != null) { + builder.setJobPriority(convertToProtoFormat(this.jobPriority)); + } } private synchronized void mergeLocalToProto() { @@ -333,6 +339,14 @@ public class JobReportPBImpl extends ProtoBase implements return MRProtoUtils.convertFromProtoFormat(e); } + private PriorityPBImpl convertFromProtoFormat(PriorityProto p) { + return new PriorityPBImpl(p); + } + + private PriorityProto convertToProtoFormat(Priority t) { + return ((PriorityPBImpl)t).getProto(); + } + @Override public synchronized boolean isUber() { JobReportProtoOrBuilder p = viaProto ? proto : builder; @@ -344,4 +358,26 @@ public class JobReportPBImpl extends ProtoBase implements maybeInitBuilder(); builder.setIsUber(isUber); } -} + + @Override + public synchronized Priority getJobPriority() { + JobReportProtoOrBuilder p = viaProto ? proto : builder; + if (this.jobPriority != null) { + return this.jobPriority; + } + if (!p.hasJobPriority()) { + return null; + } + this.jobPriority = convertFromProtoFormat(p.getJobPriority()); + return this.jobPriority; + } + + @Override + public synchronized void setJobPriority(Priority priority) { + maybeInitBuilder(); + if (priority == null) { + builder.clearJobPriority(); + } + this.jobPriority = priority; + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java index 95e0083ec2a..893f76a2c5b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java @@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.util.Records; public class MRBuilderUtils { @@ -62,11 +63,22 @@ public class MRBuilderUtils { return taskAttemptId; } + public static JobReport newJobReport(JobId jobId, String jobName, + String userName, JobState state, long submitTime, long startTime, + long finishTime, float setupProgress, float mapProgress, + float reduceProgress, float cleanupProgress, String jobFile, + List amInfos, boolean isUber, String diagnostics) { + return newJobReport(jobId, jobName, userName, state, submitTime, startTime, + finishTime, setupProgress, mapProgress, reduceProgress, + cleanupProgress, jobFile, amInfos, isUber, diagnostics, + Priority.newInstance(0)); + } + public static JobReport newJobReport(JobId jobId, String jobName, String userName, JobState state, long submitTime, long startTime, long finishTime, float setupProgress, float mapProgress, float reduceProgress, float cleanupProgress, String jobFile, List amInfos, - boolean isUber, String diagnostics) { + boolean isUber, String diagnostics, Priority priority) { JobReport report = Records.newRecord(JobReport.class); report.setJobId(jobId); report.setJobName(jobName); @@ -83,6 +95,7 @@ public class MRBuilderUtils { report.setAMInfos(amInfos); report.setIsUber(isUber); report.setDiagnostics(diagnostics); + report.setJobPriority(priority); return report; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto index b74eef63578..5a4bac11a7b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto @@ -146,6 +146,7 @@ message JobReportProto { repeated AMInfoProto am_infos = 14; optional int64 submit_time = 15; optional bool is_uber = 16 [default = false]; + optional hadoop.yarn.PriorityProto jobPriority = 17; } message AMInfoProto { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java index e36efec5c82..60ce1701772 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java @@ -18,6 +18,7 @@ package org.apache.hadoop.mapreduce; import org.apache.hadoop.util.StringUtils; + import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -35,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -201,10 +203,12 @@ public class TestTypeConverter { jobReport.setJobState(state); jobReport.setStartTime(jobStartTime); jobReport.setFinishTime(jobFinishTime); - jobReport.setUser("TestTypeConverter-user"); + jobReport.setUser("TestTypeConverter-user"); + jobReport.setJobPriority(Priority.newInstance(0)); JobStatus jobStatus = TypeConverter.fromYarn(jobReport, "dummy-jobfile"); Assert.assertEquals(jobStartTime, jobStatus.getStartTime()); Assert.assertEquals(jobFinishTime, jobStatus.getFinishTime()); Assert.assertEquals(state.toString(), jobStatus.getState().toString()); - } + Assert.assertEquals(JobPriority.DEFAULT, jobStatus.getPriority()); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobPriority.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobPriority.java index 376d8a410f0..b76d46d5179 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobPriority.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobPriority.java @@ -31,6 +31,7 @@ public enum JobPriority { HIGH, NORMAL, LOW, - VERY_LOW; - + VERY_LOW, + DEFAULT, + UNDEFINED_PRIORITY; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobPriority.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobPriority.java index b0d232ac28e..71785686e44 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobPriority.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobPriority.java @@ -31,5 +31,7 @@ public enum JobPriority { HIGH, NORMAL, LOW, - VERY_LOW; + VERY_LOW, + DEFAULT, + UNDEFINED_PRIORITY; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java index 0f1f39150ff..3c12bdf2517 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java @@ -64,6 +64,7 @@ import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.util.Records; @@ -481,4 +482,10 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job public void setQueueName(String queueName) { throw new UnsupportedOperationException("Can't set job's queue name in history"); } + + @Override + public void setJobPriority(Priority priority) { + throw new UnsupportedOperationException( + "Can't set job's priority in history"); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java index 0725f465822..b3b181ca336 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java @@ -39,6 +39,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -196,4 +197,10 @@ public class PartialJob implements org.apache.hadoop.mapreduce.v2.app.job.Job { throw new UnsupportedOperationException("Can't set job's queue name in history"); } + @Override + public void setJobPriority(Priority priority) { + throw new UnsupportedOperationException( + "Can't set job's priority in history"); + } + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAcls.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAcls.java index fd87b9494b9..14961d20340 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAcls.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAcls.java @@ -58,6 +58,7 @@ import org.apache.hadoop.security.GroupMappingServiceProvider; import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.webapp.WebApp; import org.junit.Before; import org.junit.Test; @@ -419,5 +420,9 @@ public class TestHsWebServicesAcls { @Override public void setQueueName(String queueName) { } + + @Override + public void setJobPriority(Priority priority) { + } } }