diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 16c18bcd5c3..44e80a6c234 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Comparator; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -201,6 +202,9 @@ public class CapacityScheduler extends private int threadNum = 0; + private final PendingApplicationComparator applicationComparator = + new PendingApplicationComparator(); + @Override public void setConf(Configuration conf) { yarnConf = conf; @@ -3555,4 +3559,31 @@ public class CapacityScheduler extends return asyncSchedulerThreads; } } + + @Override + public PendingApplicationComparator getPendingApplicationComparator(){ + return applicationComparator; + } + + /** + * Comparator that orders applications by their submit time. + */ + class PendingApplicationComparator + implements Comparator { + + @Override + public int compare(FiCaSchedulerApp app1, FiCaSchedulerApp app2) { + RMApp rmApp1 = rmContext.getRMApps().get(app1.getApplicationId()); + RMApp rmApp2 = rmContext.getRMApps().get(app2.getApplicationId()); + if (rmApp1 != null && rmApp2 != null) { + return Long.compare(rmApp1.getSubmitTime(), rmApp2.getSubmitTime()); + } else if (rmApp1 != null) { + return -1; + } else if (rmApp2 != null) { + return 1; + } else{ + return 0; + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index 1d0600f6680..5795a7e30fa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -18,8 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import java.util.Comparator; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -104,4 +102,5 @@ public interface CapacitySchedulerContext { */ Clock getClock(); + CapacityScheduler.PendingApplicationComparator getPendingApplicationComparator(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueContext.java index df7a6274566..4d111376552 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueContext.java @@ -19,11 +19,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; @@ -129,8 +127,7 @@ public class CapacitySchedulerQueueContext { return csContext.getApplicationAttempt(applicationAttemptId); } - // TODO this is used in GuaranteedOrZeroCapacityOverTimePolicy, refactor the comparator there - public RMApp getRMApp(ApplicationId applicationId) { - return csContext.getRMContext().getRMApps().get(applicationId); + public CapacityScheduler.PendingApplicationComparator getApplicationComparator() { + return csContext.getPendingApplicationComparator(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java index 35275574b74..0d51983bde6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java @@ -22,7 +22,6 @@ import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.yarn.api.records.Resource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils; @@ -42,8 +41,6 @@ import org.apache.hadoop.yarn.util.resource.Resources; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -253,33 +250,6 @@ public class GuaranteedOrZeroCapacityOverTimePolicy } } - /** - * Comparator that orders applications by their submit time - */ - private class PendingApplicationComparator - implements Comparator { - - @Override - public int compare(FiCaSchedulerApp app1, FiCaSchedulerApp app2) { - RMApp rmApp1 = managedParentQueue.getQueueContext().getRMApp( - app1.getApplicationId()); - RMApp rmApp2 = managedParentQueue.getQueueContext().getRMApp( - app2.getApplicationId()); - if (rmApp1 != null && rmApp2 != null) { - return Long.compare(rmApp1.getSubmitTime(), rmApp2.getSubmitTime()); - } else if (rmApp1 != null) { - return -1; - } else if (rmApp2 != null) { - return 1; - } else{ - return 0; - } - } - } - - private PendingApplicationComparator applicationComparator = - new PendingApplicationComparator(); - @Override public void init(final ParentQueue parentQueue) throws IOException { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -809,7 +779,7 @@ public class GuaranteedOrZeroCapacityOverTimePolicy private List getSortedPendingApplications() { List apps = new ArrayList<>( managedParentQueue.getAllApplications()); - Collections.sort(apps, applicationComparator); + apps.sort(managedParentQueue.getQueueContext().getApplicationComparator()); return apps; }