From 3dd113fa7c07b65cfcdf960dcce1ab8a526557b5 Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Tue, 21 Jul 2015 09:56:59 -0700 Subject: [PATCH] YARN-2003. Support for Application priority : Changes in RM and Capacity Scheduler. (Sunil G via wangda) (cherry picked from commit c39ca541f498712133890961598bbff50d89d68b) --- .../scheduler/ResourceSchedulerWrapper.java | 10 + hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop/yarn/conf/YarnConfiguration.java | 5 + .../server/resourcemanager/RMAppManager.java | 20 +- .../resourcemanager/rmapp/RMAppImpl.java | 15 +- .../scheduler/AbstractYarnScheduler.java | 10 + .../resourcemanager/scheduler/Queue.java | 8 + .../scheduler/SchedulerApplication.java | 22 ++ .../SchedulerApplicationAttempt.java | 15 +- .../scheduler/YarnScheduler.java | 20 + .../scheduler/capacity/AbstractCSQueue.java | 7 + .../scheduler/capacity/CapacityScheduler.java | 73 +++- .../CapacitySchedulerConfiguration.java | 13 + .../scheduler/capacity/LeafQueue.java | 19 +- .../common/fica/FiCaSchedulerApp.java | 8 + .../event/AppAddedSchedulerEvent.java | 28 +- .../scheduler/fair/FSQueue.java | 6 + .../scheduler/fifo/FifoScheduler.java | 6 + .../scheduler/policy/FifoComparator.java | 11 +- .../scheduler/policy/SchedulableEntity.java | 5 + .../yarn/server/resourcemanager/MockRM.java | 31 +- .../resourcemanager/TestAppManager.java | 1 + .../TestWorkPreservingRMRestart.java | 2 +- ...cityPreemptionPolicyForNodePartitions.java | 1 + .../capacity/TestApplicationLimits.java | 5 +- .../capacity/TestApplicationPriority.java | 345 ++++++++++++++++++ .../capacity/TestCapacityScheduler.java | 5 + .../policy/MockSchedulableEntity.java | 13 +- .../security/TestDelegationTokenRenewer.java | 10 +- .../TestRMWebServicesAppsModification.java | 2 +- 30 files changed, 664 insertions(+), 55 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index 5a2f50d9afc..0cfc38d8364 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; @@ -950,5 +951,14 @@ protected void completedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { // do nothing } + + @Override + public Priority checkAndGetApplicationPriority(Priority priority, + String user, String queueName, ApplicationId applicationId) + throws YarnException { + // TODO Dummy implementation. + return Priority.newInstance(0); + } + } diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 56a4f159123..7174c85119f 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -86,6 +86,9 @@ Release 2.8.0 - UNRELEASED YARN-3116. RM notifies NM whether a container is an AM container or normal task container. (Giovanni Matteo Fumarola via zjshen) + YARN-2003. Support for Application priority : Changes in RM and Capacity + Scheduler. (Sunil G via wangda) + IMPROVEMENTS YARN-644. Basic null check is not performed on passed in arguments before diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index b08e2dd8558..cdfb393dc1b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1928,6 +1928,11 @@ private static void addDeprecatedKeys() { public static final String DEFAULT_NODELABEL_CONFIGURATION_TYPE = CENTALIZED_NODELABEL_CONFIGURATION_TYPE; + public static final String MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY = + YARN_PREFIX + "cluster.max-application-priority"; + + public static final int DEFAULT_CLUSTER_LEVEL_APPLICATION_PRIORITY = 0; + @Private public static boolean isDistributedNodeLabelConfiguration(Configuration conf) { return DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE.equals(conf.get( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 2d9431d5297..6fd183875b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; @@ -329,14 +330,19 @@ private RMAppImpl createAndPopulateNewRMApp( ResourceRequest amReq = validateAndCreateResourceRequest(submissionContext, isRecovery); + // Verify and get the update application priority and set back to + // submissionContext + Priority appPriority = rmContext.getScheduler() + .checkAndGetApplicationPriority(submissionContext.getPriority(), user, + submissionContext.getQueue(), applicationId); + submissionContext.setPriority(appPriority); + // Create RMApp - RMAppImpl application = - new RMAppImpl(applicationId, rmContext, this.conf, - submissionContext.getApplicationName(), user, - submissionContext.getQueue(), - submissionContext, this.scheduler, this.masterService, - submitTime, submissionContext.getApplicationType(), - submissionContext.getApplicationTags(), amReq); + RMAppImpl application = new RMAppImpl(applicationId, rmContext, this.conf, + submissionContext.getApplicationName(), user, + submissionContext.getQueue(), submissionContext, this.scheduler, + this.masterService, submitTime, submissionContext.getApplicationType(), + submissionContext.getApplicationTags(), amReq); // Concurrent app submissions with same applicationId will fail here // Concurrent app submissions with different applicationIds will not diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 62d5555358f..d480c24237d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -924,17 +924,15 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { // No existent attempts means the attempt associated with this app was not // started or started but not yet saved. if (app.attempts.isEmpty()) { - app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId, - app.submissionContext.getQueue(), app.user, - app.submissionContext.getReservationID())); + app.scheduler.handle(new AppAddedSchedulerEvent(app.user, + app.submissionContext, false)); return RMAppState.SUBMITTED; } // Add application to scheduler synchronously to guarantee scheduler // knows applications before AM or NM re-registers. - app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId, - app.submissionContext.getQueue(), app.user, true, - app.submissionContext.getReservationID())); + app.scheduler.handle(new AppAddedSchedulerEvent(app.user, + app.submissionContext, true)); // recover attempts app.recoverAppAttempts(); @@ -960,9 +958,8 @@ private static final class AddApplicationToSchedulerTransition extends RMAppTransition { @Override public void transition(RMAppImpl app, RMAppEvent event) { - app.handler.handle(new AppAddedSchedulerEvent(app.applicationId, - app.submissionContext.getQueue(), app.user, - app.submissionContext.getReservationID())); + app.handler.handle(new AppAddedSchedulerEvent(app.user, + app.submissionContext, false)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index aad76fd7ba9..094f77d5067 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -691,4 +692,13 @@ public List getPendingResourceRequestsForAttempt( } return null; } + + @Override + public Priority checkAndGetApplicationPriority(Priority priorityFromContext, + String user, String queueName, ApplicationId applicationId) + throws YarnException { + // Dummy Implementation till Application Priority changes are done in + // specific scheduler. + return Priority.newInstance(0); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java index 02003c145ae..8646381febc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; @@ -110,4 +111,11 @@ public void recoverContainer(Resource clusterResource, * new resource asked */ public void decPendingResource(String nodeLabel, Resource resourceToDec); + + /** + * Get the Default Application Priority for this queue + * + * @return default application priority + */ + public Priority getDefaultApplicationPriority(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java index 2c788aaf1fb..519de9896a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java @@ -19,6 +19,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @Private @@ -28,10 +29,18 @@ public class SchedulerApplication { private Queue queue; private final String user; private T currentAttempt; + private volatile Priority priority; public SchedulerApplication(Queue queue, String user) { this.queue = queue; this.user = user; + this.priority = null; + } + + public SchedulerApplication(Queue queue, String user, Priority priority) { + this.queue = queue; + this.user = user; + this.priority = priority; } public Queue getQueue() { @@ -58,4 +67,17 @@ public void stop(RMAppState rmAppFinalState) { queue.getMetrics().finishApp(user, rmAppFinalState); } + public Priority getPriority() { + return priority; + } + + public void setPriority(Priority priority) { + this.priority = priority; + + // Also set priority in current running attempt + if (null != currentAttempt) { + currentAttempt.setPriority(priority); + } + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 475f2c7fe3b..cf543bde96e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -97,7 +97,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { private boolean unmanagedAM = true; private boolean amRunning = false; private LogAggregationContext logAggregationContext; - + + private Priority appPriority = null; + protected ResourceUsage attemptResourceUsage = new ResourceUsage(); private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0); private AtomicLong firstContainerAllocatedTime = new AtomicLong(0); @@ -726,7 +728,16 @@ public boolean hasPendingResourceRequest(ResourceCalculator rc, public ResourceUsage getAppAttemptResourceUsage() { return this.attemptResourceUsage; } - + + @Override + public Priority getPriority() { + return appPriority; + } + + public void setPriority(Priority appPriority) { + this.appPriority = appPriority; + } + @Override public String getId() { return getApplicationId().toString(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index b99b2170d0a..f6295794c72 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; @@ -286,4 +287,23 @@ void setEntitlement(String queue, QueueEntitlement entitlement) * @return an EnumSet containing the resource types */ public EnumSet getSchedulingResourceTypes(); + + /** + * + * Verify whether a submitted application priority is valid as per configured + * Queue + * + * @param priorityFromContext + * Submitted Application priority. + * @param user + * User who submitted the Application + * @param queueName + * Name of the Queue + * @param applicationId + * Application ID + * @return Updated Priority from scheduler + */ + public Priority checkAndGetApplicationPriority(Priority priorityFromContext, + String user, String queueName, ApplicationId applicationId) + throws YarnException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index cd5bd8d0c52..7f8e164b1dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; @@ -574,4 +575,10 @@ public boolean accessibleToPartition(String nodePartition) { // sorry, you cannot access return false; } + + @Override + public Priority getDefaultApplicationPriority() { + // TODO add dummy implementation + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 559dfc6713b..5a20f8b2ff4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; @@ -159,6 +160,9 @@ public int compare(CSQueue q1, CSQueue q2) { new Comparator() { @Override public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) { + if (!a1.getPriority().equals(a2.getPriority())) { + return a1.getPriority().compareTo(a2.getPriority()); + } return a1.getApplicationId().compareTo(a2.getApplicationId()); } }; @@ -226,6 +230,7 @@ public Configuration getConf() { private RMNodeLabelsManager labelManager; private SchedulerHealth schedulerHealth = new SchedulerHealth(); long lastNodeUpdateTime; + private Priority maxClusterLevelAppPriority; /** * EXPERT */ @@ -326,6 +331,9 @@ private synchronized void initScheduler(Configuration configuration) throws if (scheduleAsynchronously) { asyncSchedulerThread = new AsyncScheduleThread(this); } + maxClusterLevelAppPriority = Priority.newInstance(yarnConf.getInt( + YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, + YarnConfiguration.DEFAULT_CLUSTER_LEVEL_APPLICATION_PRIORITY)); LOG.info("Initialized CapacityScheduler with " + "calculator=" + getResourceCalculator().getClass() + ", " + @@ -692,7 +700,7 @@ else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) { } private synchronized void addApplication(ApplicationId applicationId, - String queueName, String user, boolean isAppRecovering) { + String queueName, String user, boolean isAppRecovering, Priority priority) { if (mappings != null && mappings.size() > 0) { try { @@ -761,7 +769,7 @@ private synchronized void addApplication(ApplicationId applicationId, // update the metrics queue.getMetrics().submitApp(user); SchedulerApplication application = - new SchedulerApplication(queue, user); + new SchedulerApplication(queue, user, priority); applications.put(applicationId, application); LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queueName); @@ -783,9 +791,9 @@ private synchronized void addApplicationAttempt( applications.get(applicationAttemptId.getApplicationId()); CSQueue queue = (CSQueue) application.getQueue(); - FiCaSchedulerApp attempt = - new FiCaSchedulerApp(applicationAttemptId, application.getUser(), - queue, queue.getActiveUsersManager(), rmContext); + FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId, + application.getUser(), queue, queue.getActiveUsersManager(), rmContext, + application.getPriority()); if (transferStateFromPreviousAttempt) { attempt.transferStateFromPreviousAttempt(application .getCurrentAppAttempt()); @@ -1307,7 +1315,8 @@ public void handle(SchedulerEvent event) { addApplication(appAddedEvent.getApplicationId(), queueName, appAddedEvent.getUser(), - appAddedEvent.getIsAppRecovering()); + appAddedEvent.getIsAppRecovering(), + appAddedEvent.getApplicatonPriority()); } } break; @@ -1833,4 +1842,56 @@ public SchedulerHealth getSchedulerHealth() { private synchronized void setLastNodeUpdateTime(long time) { this.lastNodeUpdateTime = time; } + + @Override + public Priority checkAndGetApplicationPriority(Priority priorityFromContext, + String user, String queueName, ApplicationId applicationId) + throws YarnException { + Priority appPriority = null; + + // ToDo: Verify against priority ACLs + + // Verify the scenario where priority is null from submissionContext. + if (null == priorityFromContext) { + // Get the default priority for the Queue. If Queue is non-existent, then + // use default priority + priorityFromContext = getDefaultPriorityForQueue(queueName); + + LOG.info("Application '" + applicationId + + "' is submitted without priority " + + "hence considering default queue/cluster priority:" + + priorityFromContext.getPriority()); + } + + // Verify whether submitted priority is lesser than max priority + // in the cluster. If it is out of found, defining a max cap. + if (priorityFromContext.compareTo(getMaxClusterLevelAppPriority()) < 0) { + priorityFromContext = Priority + .newInstance(getMaxClusterLevelAppPriority().getPriority()); + } + + appPriority = priorityFromContext; + + LOG.info("Priority '" + appPriority.getPriority() + + "' is acceptable in queue :" + queueName + "for application:" + + applicationId + "for the user: " + user); + + return appPriority; + } + + private Priority getDefaultPriorityForQueue(String queueName) { + Queue queue = getQueue(queueName); + if (null == queue) { + // Return with default application priority + return Priority.newInstance(CapacitySchedulerConfiguration + .DEFAULT_CONFIGURATION_APPLICATION_PRIORITY); + } + + return Priority.newInstance(queue.getDefaultApplicationPriority() + .getPriority()); + } + + public Priority getMaxClusterLevelAppPriority() { + return maxClusterLevelAppPriority; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 563643cdd6d..be5e6dd4869 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -206,6 +206,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur @Private public static final String QUEUE_PREEMPTION_DISABLED = "disable_preemption"; + @Private + public static final String DEFAULT_APPLICATION_PRIORITY = "default-application-priority"; + + @Private + public static final Integer DEFAULT_CONFIGURATION_APPLICATION_PRIORITY = 0; + @Private public static class QueueMapping { @@ -947,4 +953,11 @@ public Set getConfiguredNodeLabels(String queuePath) { return configuredNodeLabels; } + + public Integer getDefaultApplicationPriorityConfPerQueue(String queue) { + Integer defaultPriority = getInt(getQueuePrefix(queue) + + DEFAULT_APPLICATION_PRIORITY, + DEFAULT_CONFIGURATION_APPLICATION_PRIORITY); + return defaultPriority; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 598f2790638..0ce4d680f9a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -95,9 +95,11 @@ public class LeafQueue extends AbstractCSQueue { private int nodeLocalityDelay; - Map applicationAttemptMap = + Map applicationAttemptMap = new HashMap(); - + + private Priority defaultAppPriorityPerQueue; + Set pendingApplications; private float minimumAllocationFactor; @@ -220,6 +222,9 @@ protected synchronized void setupQueueConfigs(Resource clusterResource) } } + defaultAppPriorityPerQueue = Priority.newInstance(conf + .getDefaultApplicationPriorityConfPerQueue(getQueuePath())); + LOG.info("Initializing " + queueName + "\n" + "capacity = " + queueCapacities.getCapacity() + " [= (float) configuredCapacity / 100 ]" + "\n" + @@ -265,7 +270,8 @@ protected synchronized void setupQueueConfigs(Resource clusterResource) "nodeLocalityDelay = " + nodeLocalityDelay + "\n" + "reservationsContinueLooking = " + reservationsContinueLooking + "\n" + - "preemptionDisabled = " + getPreemptionDisabled() + "\n"); + "preemptionDisabled = " + getPreemptionDisabled() + "\n" + + "defaultAppPriorityPerQueue = " + defaultAppPriorityPerQueue); } @Override @@ -2060,7 +2066,12 @@ public synchronized void setOrderingPolicy( ); this.orderingPolicy = orderingPolicy; } - + + @Override + public Priority getDefaultApplicationPriority() { + return defaultAppPriorityPerQueue; + } + /* * Holds shared values used by all applications in * the queue to calculate headroom on demand diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 3085d93ab2f..dfeb30f3e30 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -72,6 +72,13 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, RMContext rmContext) { + this(applicationAttemptId, user, queue, activeUsersManager, rmContext, + Priority.newInstance(0)); + } + + public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, + String user, Queue queue, ActiveUsersManager activeUsersManager, + RMContext rmContext, Priority appPriority) { super(applicationAttemptId, user, queue, activeUsersManager, rmContext); RMApp rmApp = rmContext.getRMApps().get(getApplicationId()); @@ -87,6 +94,7 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, } setAMResource(amResource); + setPriority(appPriority); } synchronized public boolean containerCompleted(RMContainer rmContainer, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java index a54e4bfb146..89d2f66b094 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ReservationId; public class AppAddedSchedulerEvent extends SchedulerEvent { @@ -28,25 +30,35 @@ public class AppAddedSchedulerEvent extends SchedulerEvent { private final String user; private final ReservationId reservationID; private final boolean isAppRecovering; + private final Priority appPriority; - public AppAddedSchedulerEvent( - ApplicationId applicationId, String queue, String user) { - this(applicationId, queue, user, false, null); + public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, + String user) { + this(applicationId, queue, user, false, null, Priority.newInstance(0)); } public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, - String user, ReservationId reservationID) { - this(applicationId, queue, user, false, reservationID); + String user, ReservationId reservationID, Priority appPriority) { + this(applicationId, queue, user, false, reservationID, appPriority); + } + + public AppAddedSchedulerEvent(String user, + ApplicationSubmissionContext submissionContext, boolean isAppRecovering) { + this(submissionContext.getApplicationId(), submissionContext.getQueue(), + user, isAppRecovering, submissionContext.getReservationID(), + submissionContext.getPriority()); } public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, - String user, boolean isAppRecovering, ReservationId reservationID) { + String user, boolean isAppRecovering, ReservationId reservationID, + Priority appPriority) { super(SchedulerEventType.APP_ADDED); this.applicationId = applicationId; this.queue = queue; this.user = user; this.reservationID = reservationID; this.isAppRecovering = isAppRecovering; + this.appPriority = appPriority; } public ApplicationId getApplicationId() { @@ -68,4 +80,8 @@ public boolean getIsAppRecovering() { public ReservationId getReservationID() { return reservationID; } + + public Priority getApplicatonPriority() { + return appPriority; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index e488c76ed66..713bdcad7c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -331,6 +331,12 @@ public void incPendingResource(String nodeLabel, Resource resourceToInc) { public void decPendingResource(String nodeLabel, Resource resourceToDec) { } + @Override + public Priority getDefaultApplicationPriority() { + // TODO add implementation for FSParentQueue + return null; + } + public boolean fitsInMaxShare(Resource additionalResource) { Resource usagePlusAddition = Resources.add(getResourceUsage(), additionalResource); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index e66c02cc7b6..6b77ceb8dc9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -210,6 +210,12 @@ public void incPendingResource(String nodeLabel, Resource resourceToInc) { @Override public void decPendingResource(String nodeLabel, Resource resourceToDec) { } + + @Override + public Priority getDefaultApplicationPriority() { + // TODO add implementation for FIFO scheduler + return null; + } }; public FifoScheduler() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java index b92b2644c28..1045386251c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; import java.util.*; + import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; @@ -29,9 +30,13 @@ public class FifoComparator implements Comparator { @Override - public int compare(SchedulableEntity r1, SchedulableEntity r2) { - int res = r1.compareInputOrderTo(r2); - return res; + public int compare(SchedulableEntity r1, SchedulableEntity r2) { + if (r1.getPriority() != null + && !r1.getPriority().equals(r2.getPriority())) { + return r1.getPriority().compareTo(r2.getPriority()); } + int res = r1.compareInputOrderTo(r2); + return res; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java index 9b9d73d920c..2ccb1cd3b19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java @@ -48,4 +48,9 @@ public interface SchedulableEntity { */ public ResourceUsage getSchedulingResourceUsage(); + /** + * Get the priority of the application + */ + public Priority getPriority(); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index d068a94f16b..50803550af5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -289,6 +290,15 @@ public RMApp submitApp(int masterMemory) throws Exception { return submitApp(masterMemory, false); } + public RMApp submitApp(int masterMemory, Priority priority) throws Exception { + Resource resource = Resource.newInstance(masterMemory, 0); + return submitApp(resource, "", UserGroupInformation.getCurrentUser() + .getShortUserName(), null, false, null, + super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, + false, false, null, 0, null, true, priority); + } + public RMApp submitApp(int masterMemory, boolean unmanaged) throws Exception { return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser() @@ -327,7 +337,7 @@ public RMApp submitApp(Resource resource, String name, String user, return submitApp(resource, name, user, acls, false, queue, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, - true, false, false, null, 0, null, true); + true, false, false, null, 0, null, true, null); } public RMApp submitApp(int masterMemory, String name, String user, @@ -370,18 +380,19 @@ public RMApp submitApp(int masterMemory, String name, String user, resource.setMemory(masterMemory); return submitApp(resource, name, user, acls, unmanaged, queue, maxAppAttempts, ts, appType, waitForAccepted, keepContainers, - false, null, 0, null, true); + false, null, 0, null, true, Priority.newInstance(0)); } public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval) throws Exception { Resource resource = Records.newRecord(Resource.class); resource.setMemory(masterMemory); + Priority priority = Priority.newInstance(0); return submitApp(resource, "", UserGroupInformation.getCurrentUser() .getShortUserName(), null, false, null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false, - false, null, attemptFailuresValidityInterval, null, true); + false, null, attemptFailuresValidityInterval, null, true, priority); } public RMApp submitApp(int masterMemory, String name, String user, @@ -391,20 +402,22 @@ public RMApp submitApp(int masterMemory, String name, String user, ApplicationId applicationId) throws Exception { Resource resource = Records.newRecord(Resource.class); resource.setMemory(masterMemory); + Priority priority = Priority.newInstance(0); return submitApp(resource, name, user, acls, unmanaged, queue, maxAppAttempts, ts, appType, waitForAccepted, keepContainers, - isAppIdProvided, applicationId, 0, null, true); + isAppIdProvided, applicationId, 0, null, true, priority); } public RMApp submitApp(int masterMemory, LogAggregationContext logAggregationContext) throws Exception { Resource resource = Records.newRecord(Resource.class); resource.setMemory(masterMemory); + Priority priority = Priority.newInstance(0); return submitApp(resource, "", UserGroupInformation.getCurrentUser() .getShortUserName(), null, false, null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false, - false, null, 0, logAggregationContext, true); + false, null, 0, logAggregationContext, true, priority); } public RMApp submitApp(Resource capability, String name, String user, @@ -412,7 +425,8 @@ public RMApp submitApp(Resource capability, String name, String user, int maxAppAttempts, Credentials ts, String appType, boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, ApplicationId applicationId, long attemptFailuresValidityInterval, - LogAggregationContext logAggregationContext, boolean cancelTokensWhenComplete) + LogAggregationContext logAggregationContext, + boolean cancelTokensWhenComplete, Priority priority) throws Exception { ApplicationId appId = isAppIdProvided ? applicationId : null; ApplicationClientProtocol client = getClientRMService(); @@ -429,12 +443,15 @@ public RMApp submitApp(Resource capability, String name, String user, sub.setApplicationId(appId); sub.setApplicationName(name); sub.setMaxAppAttempts(maxAppAttempts); - if(unmanaged) { + if (unmanaged) { sub.setUnmanagedAM(true); } if (queue != null) { sub.setQueue(queue); } + if (priority != null) { + sub.setPriority(priority); + } sub.setApplicationType(appType); ContainerLaunchContext clc = Records .newRecord(ContainerLaunchContext.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index 3db8b7c3394..f073763c40a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -219,6 +219,7 @@ public void setUp() { rmContext = mockRMContext(1, now - 10); ResourceScheduler scheduler = mockResourceScheduler(); + ((RMContextImpl)rmContext).setScheduler(scheduler); Configuration conf = new Configuration(); ApplicationMasterService masterService = new ApplicationMasterService(rmContext, scheduler); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 32743c97bad..b5563355b3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -1056,7 +1056,7 @@ public void testContainerCompleteMsgNotLostAfterAMFailedAndRMRestart() throws Ex RMApp app0 = rm1.submitApp(resource, "", UserGroupInformation .getCurrentUser().getShortUserName(), null, false, null, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true, - false, null, 0, null, true); + false, null, 0, null, true, null); MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1); am0.allocate("127.0.0.1", 1000, 2, new ArrayList()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java index b3ac79bb072..d6f64bfb880 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java @@ -1025,6 +1025,7 @@ private void mockApplications(String appsConfig) { when(app.getReservedContainers()).thenReturn(reservedContainers); when(app.getApplicationAttemptId()).thenReturn(appAttemptId); when(app.getApplicationId()).thenReturn(appId); + when(app.getPriority()).thenReturn(Priority.newInstance(0)); // add to LeafQueue LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 484090daeae..1afebb6247d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -155,7 +155,8 @@ private FiCaSchedulerApp getMockApplication(int appId, String user, doReturn(applicationAttemptId). when(application).getApplicationAttemptId(); doReturn(user).when(application).getUser(); doReturn(amResource).when(application).getAMResource(); - when(application.compareInputOrderTo(any(FiCaSchedulerApp.class))).thenCallRealMethod(); + doReturn(Priority.newInstance(0)).when(application).getPriority(); + when(application.compareInputOrderTo(any(FiCaSchedulerApp.class))).thenCallRealMethod(); return application; } @@ -175,7 +176,7 @@ public void testAMResourceLimit() throws Exception { ActiveUsersManager activeUsersManager = mock(ActiveUsersManager.class); when(queue.getActiveUsersManager()).thenReturn(activeUsersManager); - + assertEquals(Resource.newInstance(8 * GB, 1), queue.getAMResourceLimit()); assertEquals(Resource.newInstance(4 * GB, 1), queue.getUserAMResourceLimit()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java new file mode 100644 index 00000000000..80eff064cc0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java @@ -0,0 +1,345 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationIdNotProvidedException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestApplicationPriority { + private static final Log LOG = LogFactory + .getLog(TestApplicationPriority.class); + private final int GB = 1024; + + private YarnConfiguration conf; + + @Before + public void setUp() throws Exception { + conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + } + + @Test + public void testApplicationOrderingWithPriority() throws Exception { + + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + rm.start(); + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + LeafQueue q = (LeafQueue) cs.getQueue("default"); + Assert.assertNotNull(q); + + String host = "127.0.0.1"; + RMNode node = MockNodes.newNodeInfo(0, MockNodes.newResource(16 * GB), 1, + host); + cs.handle(new NodeAddedSchedulerEvent(node)); + + // add app 1 start + ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1); + ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId( + appId1, 1); + + RMAppAttemptMetrics attemptMetric1 = new RMAppAttemptMetrics(appAttemptId1, + rm.getRMContext()); + RMAppImpl app1 = mock(RMAppImpl.class); + when(app1.getApplicationId()).thenReturn(appId1); + RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class); + when(attempt1.getAppAttemptId()).thenReturn(appAttemptId1); + when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1); + when(app1.getCurrentAppAttempt()).thenReturn(attempt1); + + rm.getRMContext().getRMApps().put(appId1, app1); + + SchedulerEvent addAppEvent1 = new AppAddedSchedulerEvent(appId1, "default", + "user", null, Priority.newInstance(5)); + cs.handle(addAppEvent1); + SchedulerEvent addAttemptEvent1 = new AppAttemptAddedSchedulerEvent( + appAttemptId1, false); + cs.handle(addAttemptEvent1); + // add app1 end + + // add app2 begin + ApplicationId appId2 = BuilderUtils.newApplicationId(100, 2); + ApplicationAttemptId appAttemptId2 = BuilderUtils.newApplicationAttemptId( + appId2, 1); + + RMAppAttemptMetrics attemptMetric2 = new RMAppAttemptMetrics(appAttemptId2, + rm.getRMContext()); + RMAppImpl app2 = mock(RMAppImpl.class); + when(app2.getApplicationId()).thenReturn(appId2); + RMAppAttemptImpl attempt2 = mock(RMAppAttemptImpl.class); + when(attempt2.getAppAttemptId()).thenReturn(appAttemptId2); + when(attempt2.getRMAppAttemptMetrics()).thenReturn(attemptMetric2); + when(app2.getCurrentAppAttempt()).thenReturn(attempt2); + + rm.getRMContext().getRMApps().put(appId2, app2); + + SchedulerEvent addAppEvent2 = new AppAddedSchedulerEvent(appId2, "default", + "user", null, Priority.newInstance(8)); + cs.handle(addAppEvent2); + SchedulerEvent addAttemptEvent2 = new AppAttemptAddedSchedulerEvent( + appAttemptId2, false); + cs.handle(addAttemptEvent2); + // add app end + + // Now, the first assignment will be for app2 since app2 is of highest + // priority + assertEquals(q.getApplications().size(), 2); + assertEquals(q.getApplications().iterator().next() + .getApplicationAttemptId(), appAttemptId2); + + rm.stop(); + } + + @Test + public void testApplicationPriorityAllocation() throws Exception { + + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + // Set Max Application Priority as 10 + conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); + MockRM rm = new MockRM(conf); + rm.start(); + + Priority appPriority1 = Priority.newInstance(5); + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * GB); + RMApp app1 = rm.submitApp(1 * GB, appPriority1); + + // kick the scheduler, 1 GB given to AM1, remaining 15GB on nm1 + MockAM am1 = MockRM.launchAM(app1, rm, nm1); + am1.registerAppAttempt(); + + // add request for containers + am1.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 2 * GB, 1, 7); + AllocateResponse alloc1Response = am1.schedule(); // send the request + + // kick the scheduler, 7 containers will be allocated for App1 + nm1.nodeHeartbeat(true); + while (alloc1Response.getAllocatedContainers().size() < 1) { + LOG.info("Waiting for containers to be created for app 1..."); + Thread.sleep(100); + alloc1Response = am1.schedule(); + } + + List allocated1 = alloc1Response.getAllocatedContainers(); + Assert.assertEquals(7, allocated1.size()); + Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemory()); + + // check node report, 15 GB used (1 AM and 7 containers) and 1 GB available + SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport( + nm1.getNodeId()); + Assert.assertEquals(15 * GB, report_nm1.getUsedResource().getMemory()); + Assert.assertEquals(1 * GB, report_nm1.getAvailableResource().getMemory()); + + // Submit the second app App2 with priority 8 (Higher than App1) + Priority appPriority2 = Priority.newInstance(8); + RMApp app2 = rm.submitApp(1 * GB, appPriority2); + + // kick the scheduler, 1 GB which was free is given to AM of App2 + nm1.nodeHeartbeat(true); + MockAM am2 = rm.sendAMLaunched(app2.getCurrentAppAttempt() + .getAppAttemptId()); + am2.registerAppAttempt(); + + // check node report, 16 GB used and 0 GB available + report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + Assert.assertEquals(16 * GB, report_nm1.getUsedResource().getMemory()); + Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemory()); + + // get scheduler + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + // get scheduler app + FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications() + .get(app1.getApplicationId()).getCurrentAppAttempt(); + + // kill 2 containers to free up some space + int counter = 0; + for (Container c : allocated1) { + if (++counter > 2) { + break; + } + cs.killContainer(schedulerAppAttempt.getRMContainer(c.getId())); + } + + // check node report, 12 GB used and 4 GB available + report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + Assert.assertEquals(12 * GB, report_nm1.getUsedResource().getMemory()); + Assert.assertEquals(4 * GB, report_nm1.getAvailableResource().getMemory()); + + // add request for containers App1 + am1.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 2 * GB, 1, 10); + am1.schedule(); // send the request for App1 + + // add request for containers App2 + am2.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 2 * GB, 1, 3); + AllocateResponse alloc1Response4 = am2.schedule(); // send the request + + // kick the scheduler, since App2 priority is more than App1, it will get + // remaining cluster space. + nm1.nodeHeartbeat(true); + while (alloc1Response4.getAllocatedContainers().size() < 1) { + LOG.info("Waiting for containers to be created for app 2..."); + Thread.sleep(100); + alloc1Response4 = am2.schedule(); + } + + // check node report, 16 GB used and 0 GB available + report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + Assert.assertEquals(16 * GB, report_nm1.getUsedResource().getMemory()); + Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemory()); + + rm.stop(); + } + + @Test + public void testPriorityWithPendingApplications() throws Exception { + + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + // Set Max Application Priority as 10 + conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); + MockRM rm = new MockRM(conf); + rm.start(); + + Priority appPriority1 = Priority.newInstance(5); + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 8 * GB); + RMApp app1 = rm.submitApp(1 * GB, appPriority1); + + // kick the scheduler, 1 GB given to AM1, remaining 7GB on nm1 + MockAM am1 = MockRM.launchAM(app1, rm, nm1); + am1.registerAppAttempt(); + + // add request for containers + am1.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 1 * GB, 1, 7); + AllocateResponse alloc1Response = am1.schedule(); // send the request + + // kick the scheduler, 7 containers will be allocated for App1 + nm1.nodeHeartbeat(true); + while (alloc1Response.getAllocatedContainers().size() < 1) { + LOG.info("Waiting for containers to be created for app 1..."); + Thread.sleep(100); + alloc1Response = am1.schedule(); + } + + List allocated1 = alloc1Response.getAllocatedContainers(); + Assert.assertEquals(7, allocated1.size()); + Assert.assertEquals(1 * GB, allocated1.get(0).getResource().getMemory()); + + // check node report, 8 GB used (1 AM and 7 containers) and 0 GB available + SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport( + nm1.getNodeId()); + Assert.assertEquals(8 * GB, report_nm1.getUsedResource().getMemory()); + Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemory()); + + // Submit the second app App2 with priority 7 + Priority appPriority2 = Priority.newInstance(7); + RMApp app2 = rm.submitApp(1 * GB, appPriority2); + + // Submit the third app App3 with priority 8 + Priority appPriority3 = Priority.newInstance(8); + RMApp app3 = rm.submitApp(1 * GB, appPriority3); + + // Submit the second app App4 with priority 6 + Priority appPriority4 = Priority.newInstance(6); + RMApp app4 = rm.submitApp(1 * GB, appPriority4); + + // Only one app can run as AM resource limit restricts it. Kill app1, + // If app3 (highest priority among rest) gets active, it indicates that + // priority is working with pendingApplications. + rm.killApp(app1.getApplicationId()); + + // kick the scheduler, app3 (high among pending) gets free space + nm1.nodeHeartbeat(true); + MockAM am3 = rm.sendAMLaunched(app3.getCurrentAppAttempt() + .getAppAttemptId()); + am3.registerAppAttempt(); + + // check node report, 1 GB used and 7 GB available + report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + Assert.assertEquals(1 * GB, report_nm1.getUsedResource().getMemory()); + Assert.assertEquals(7 * GB, report_nm1.getAvailableResource().getMemory()); + + rm.stop(); + } + + @Test + public void testMaxPriorityValidation() throws Exception { + + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + // Set Max Application Priority as 10 + conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); + Priority maxPriority = Priority.newInstance(10); + MockRM rm = new MockRM(conf); + rm.start(); + + Priority appPriority1 = Priority.newInstance(15); + rm.registerNode("127.0.0.1:1234", 8 * GB); + RMApp app1 = rm.submitApp(1 * GB, appPriority1); + + // Application submission should be successful and verify priority + Assert.assertEquals(app1.getApplicationSubmissionContext().getPriority(), + maxPriority); + rm.stop(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index e8afab2ae9b..a8bbac3db8c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -129,6 +129,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; @@ -898,13 +899,17 @@ public void testApplicationComparator() ApplicationId id1 = ApplicationId.newInstance(1, 1); ApplicationId id2 = ApplicationId.newInstance(1, 2); ApplicationId id3 = ApplicationId.newInstance(2, 1); + Priority priority = Priority.newInstance(0); //same clusterId FiCaSchedulerApp app1 = Mockito.mock(FiCaSchedulerApp.class); when(app1.getApplicationId()).thenReturn(id1); + when(app1.getPriority()).thenReturn(priority); FiCaSchedulerApp app2 = Mockito.mock(FiCaSchedulerApp.class); when(app2.getApplicationId()).thenReturn(id2); + when(app2.getPriority()).thenReturn(priority); FiCaSchedulerApp app3 = Mockito.mock(FiCaSchedulerApp.class); when(app3.getApplicationId()).thenReturn(id3); + when(app3.getPriority()).thenReturn(priority); assertTrue(appComparator.compare(app1, app2) < 0); //different clusterId assertTrue(appComparator.compare(app1, app3) < 0); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java index fe8c455cdf8..bf4c98a554a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java @@ -31,7 +31,8 @@ public class MockSchedulableEntity implements SchedulableEntity { private String id; private long serial = 0; - + private Priority priority; + public MockSchedulableEntity() { } public void setId(String id) { @@ -74,5 +75,13 @@ public int compareInputOrderTo(SchedulableEntity other) { } return 1;//let other types go before this, if any } - + + @Override + public Priority getPriority() { + return priority; + } + + public void setApplicationPriority(Priority priority) { + this.priority = priority; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index 49c7bf96ac6..d85e928b791 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -1048,13 +1048,13 @@ public void testAppSubmissionWithPreviousToken() throws Exception{ Resource resource = Records.newRecord(Resource.class); resource.setMemory(200); RMApp app1 = rm.submitApp(resource, "name", "user", null, false, null, 2, - credentials, null, true, false, false, null, 0, null, false); + credentials, null, true, false, false, null, 0, null, false, null); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); rm.waitForState(app1.getApplicationId(), RMAppState.RUNNING); // submit app2 with the same token, set cancelTokenWhenComplete to true; RMApp app2 = rm.submitApp(resource, "name", "user", null, false, null, 2, - credentials, null, true, false, false, null, 0, null, true); + credentials, null, true, false, false, null, 0, null, true, null); MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1); rm.waitForState(app2.getApplicationId(), RMAppState.RUNNING); MockRM.finishAMAndVerifyAppState(app2, rm, nm1, am2); @@ -1114,7 +1114,7 @@ public void testCancelWithMultipleAppSubmissions() throws Exception{ resource.setMemory(200); RMApp app1 = rm.submitApp(resource, "name", "user", null, false, null, 2, credentials, - null, true, false, false, null, 0, null, true); + null, true, false, false, null, 0, null, true, null); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); rm.waitForState(app1.getApplicationId(), RMAppState.RUNNING); @@ -1122,7 +1122,7 @@ public void testCancelWithMultipleAppSubmissions() throws Exception{ Assert.assertNotNull(dttr); Assert.assertTrue(dttr.referringAppIds.contains(app1.getApplicationId())); RMApp app2 = rm.submitApp(resource, "name", "user", null, false, null, 2, - credentials, null, true, false, false, null, 0, null, true); + credentials, null, true, false, false, null, 0, null, true, null); MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1); rm.waitForState(app2.getApplicationId(), RMAppState.RUNNING); Assert.assertTrue(renewer.getAllTokens().containsKey(token1)); @@ -1139,7 +1139,7 @@ public void testCancelWithMultipleAppSubmissions() throws Exception{ Assert.assertFalse(Renewer.cancelled); RMApp app3 = rm.submitApp(resource, "name", "user", null, false, null, 2, - credentials, null, true, false, false, null, 0, null, true); + credentials, null, true, false, false, null, 0, null, true, null); MockAM am3 = MockRM.launchAndRegisterAM(app3, rm, nm1); rm.waitForState(app3.getApplicationId(), RMAppState.RUNNING); Assert.assertTrue(renewer.getAllTokens().containsKey(token1)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesAppsModification.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesAppsModification.java index 8e5e6015b8e..de4d116eb2c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesAppsModification.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesAppsModification.java @@ -759,10 +759,10 @@ public void testAppSubmit(String acceptMedia, String contentMedia) ApplicationSubmissionContextInfo appInfo = new ApplicationSubmissionContextInfo(); appInfo.setApplicationId(appId); appInfo.setApplicationName(appName); - appInfo.setPriority(3); appInfo.setMaxAppAttempts(2); appInfo.setQueue(queueName); appInfo.setApplicationType(appType); + appInfo.setPriority(0); HashMap lr = new HashMap<>(); LocalResourceInfo y = new LocalResourceInfo(); y.setUrl(new URI("http://www.test.com/file.txt"));