From 806c7b7dfb0517b9904df8be132fd2c011b74b9f Mon Sep 17 00:00:00 2001
From: Jonathan Hung
Date: Tue, 24 Sep 2019 12:13:29 -0700
Subject: [PATCH] YARN-9730. Support forcing configured partitions to be
 exclusive based on app node label

(cherry picked from commit 73a044a63822303f792183244e25432528ecfb1e)
---
 .../hadoop/yarn/conf/YarnConfiguration.java     |   6 +
 .../src/main/resources/yarn-default.xml         |   9 +
 .../resourcemanager/DefaultAMSProcessor.java    |   6 +
 .../server/resourcemanager/RMAppManager.java    |   6 +
 .../server/resourcemanager/RMContext.java       |   4 +
 .../server/resourcemanager/RMContextImpl.java   |  15 ++
 .../SchedulerApplicationAttempt.java            |   9 +
 .../scheduler/SchedulerUtils.java               |  23 ++
 .../CapacitySchedulerConfiguration.java         |   7 +
 .../scheduler/capacity/LeafQueue.java           |   8 +-
 .../AbstractComparatorOrderingPolicy.java       |   8 +-
 ...OrderingPolicyWithExclusivePartitions.java   | 144 +++++++++++
 .../scheduler/policy/IteratorSelector.java      |  48 ++++
 .../scheduler/policy/OrderingPolicy.java        |   3 +-
 .../scheduler/policy/SchedulableEntity.java     |   5 +
 .../scheduler/TestSchedulerUtils.java           | 142 ++++++++++
 .../capacity/TestCapacityScheduler.java         |  11 +-
 .../scheduler/capacity/TestLeafQueue.java       | 145 ++++++++++-
 .../policy/MockSchedulableEntity.java           |  15 +-
 .../policy/TestFairOrderingPolicy.java          |  12 +-
 .../policy/TestFifoOrderingPolicy.java          |   2 +-
 .../TestFifoOrderingPolicyForPendingApps.java   |   5 +-
 ...OrderingPolicyWithExclusivePartitions.java   | 244 ++++++++++++++++++
 23 files changed, 850 insertions(+), 27 deletions(-)
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyWithExclusivePartitions.java
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/IteratorSelector.java
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyWithExclusivePartitions.java

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 87d2f0cbca5..43ee826cd8d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3624,6 +3624,12 @@ public class YarnConfiguration extends Configuration {
   public static final String DEFAULT_NODELABEL_CONFIGURATION_TYPE =
       CENTRALIZED_NODELABEL_CONFIGURATION_TYPE;
 
+  public static final String EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX
+      = "exclusive-enforced-partitions";
+
+  public static final String EXCLUSIVE_ENFORCED_PARTITIONS = NODE_LABELS_PREFIX
+      + EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX;
+
   public static final String MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY =
       YARN_PREFIX + "cluster.max-application-priority";
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 0835d02d1e2..f9efeb9b201 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -4103,4 +4103,13 @@
     <value>60000</value>
   </property>
 
+  <property>
+    <description>
+      Comma-separated list of partitions. If a label P is in this list,
+      then the RM will enforce that an app has resource requests with label
+      P iff that app's node label expression is P.
+    </description>
+    <name>yarn.node-labels.exclusive-enforced-partitions</name>
+    <value></value>
+  </property>
 </configuration>
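As a side note, here is a minimal standalone sketch (not part of this patch; the partition names "x" and "y" are made up) of how the comma-separated property above is parsed into a set, mirroring RMContextImpl#getExclusiveEnforcedPartitions later in this patch:

    import java.util.HashSet;
    import java.util.Set;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class ExclusivePartitionsParsingSketch {
      public static void main(String[] args) {
        Configuration conf = new YarnConfiguration();
        // Same comma-separated form as the yarn-default.xml entry above.
        conf.set(YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS, "x,y");

        // Configuration#getStrings splits on commas and returns null when
        // the property is unset.
        String[] configured =
            conf.getStrings(YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS);
        Set<String> partitions = new HashSet<>();
        if (configured != null) {
          for (String partition : configured) {
            partitions.add(partition);
          }
        }
        System.out.println(partitions); // prints [x, y]
      }
    }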
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
index a3bdb2f482a..dc1c952d8bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler
     .SchedulerNodeReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@@ -235,6 +236,11 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
           && ResourceRequest.ANY.equals(req.getResourceName())) {
         req.setNodeLabelExpression(asc.getNodeLabelExpression());
       }
+      if (ResourceRequest.ANY.equals(req.getResourceName())) {
+        SchedulerUtils.enforcePartitionExclusivity(req,
+            getRmContext().getExclusiveEnforcedPartitions(),
+            asc.getNodeLabelExpression());
+      }
     }
 
     Resource maximumCapacity =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index ffe34d03e3c..a991d9229c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -21,6 +21,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -99,6 +100,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
   private YarnAuthorizationProvider authorizer;
   private boolean timelineServiceV2Enabled;
   private boolean nodeLabelsEnabled;
+  private Set<String> exclusiveEnforcedPartitions;
 
   public RMAppManager(RMContext context, YarnScheduler scheduler,
       ApplicationMasterService masterService,
@@ -123,6 +125,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
         timelineServiceV2Enabled(conf);
     this.nodeLabelsEnabled = YarnConfiguration
         .areNodeLabelsEnabled(rmContext.getYarnConfiguration());
+    this.exclusiveEnforcedPartitions = context.getExclusiveEnforcedPartitions();
   }
 
   /**
@@ -566,6 +569,9 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
       throw new InvalidResourceRequestException("Invalid resource request, "
           + "no resource request specified with " + ResourceRequest.ANY);
     }
+    SchedulerUtils.enforcePartitionExclusivity(anyReq,
+        exclusiveEnforcedPartitions,
+        submissionContext.getNodeLabelExpression());
 
     // Make sure that all of the requests agree with the ANY request
     // and have correct values
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index d3daa05e16a..08585c98a89 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.nio.ByteBuffer;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.conf.Configuration;
@@ -188,4 +189,7 @@ public interface RMContext extends ApplicationMasterServiceContext {
 
   void setMultiNodeSortingManager(
       MultiNodeSortingManager multiNodeSortingManager);
+
+  Set<String> getExclusiveEnforcedPartitions();
+
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 5b295f63c17..cc48e8b7874 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
@@ -33,6 +35,7 @@ import org.apache.hadoop.yarn.LocalConfigurationProvider;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.ConfigurationProvider;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
@@ -643,4 +646,16 @@ public class RMContextImpl implements RMContext {
   public NodeAttributesManager getNodeAttributesManager() {
     return activeServiceContext.getNodeAttributesManager();
   }
+
+  public Set<String> getExclusiveEnforcedPartitions() {
+    String[] configuredPartitions = getYarnConfiguration().getStrings(
+        YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS);
+    Set<String> exclusiveEnforcedPartitions = new HashSet<>();
+    if (configuredPartitions != null) {
+      for (String partition : configuredPartitions) {
+        exclusiveEnforcedPartitions.add(partition);
+      }
+    }
+    return exclusiveEnforcedPartitions;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index c777db02bed..68c18b26cb1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -205,6 +205,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   private AtomicLong unconfirmedAllocatedMem = new AtomicLong();
   private AtomicInteger unconfirmedAllocatedVcores = new AtomicInteger();
 
+  private String nodeLabelExpression;
+
   public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
       String user, Queue queue, AbstractUsersManager abstractUsersManager,
       RMContext rmContext) {
@@ -226,6 +228,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
       unmanagedAM = appSubmissionContext.getUnmanagedAM();
       this.logAggregationContext =
           appSubmissionContext.getLogAggregationContext();
+      this.nodeLabelExpression =
+          appSubmissionContext.getNodeLabelExpression();
     }
     applicationSchedulingEnvs = rmApp.getApplicationSchedulingEnvs();
   }
@@ -1468,4 +1472,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   public Map<String, String> getApplicationSchedulingEnvs() {
     return this.applicationSchedulingEnvs;
   }
+
+  @Override
+  public String getPartition() {
+    return nodeLabelExpression == null ? "" : nodeLabelExpression;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index 36746c02809..62be747a429 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -302,6 +302,29 @@ public class SchedulerUtils {
         rmContext, queueInfo, nodeLabelsEnabled);
   }
 
+  /**
+   * If RM should enforce partition exclusivity for enforced partition "x":
+   * 1) If request is "x" and app label is not "x",
+   * override request to app's label.
+   * 2) If app label is "x", ensure request is "x".
+   * @param resReq resource request
+   * @param enforcedPartitions list of exclusive enforced partitions
+   * @param appLabel app's node label expression
+   */
+  public static void enforcePartitionExclusivity(ResourceRequest resReq,
+      Set<String> enforcedPartitions, String appLabel) {
+    if (enforcedPartitions == null || enforcedPartitions.isEmpty()) {
+      return;
+    }
+    if (!enforcedPartitions.contains(appLabel)
+        && enforcedPartitions.contains(resReq.getNodeLabelExpression())) {
+      resReq.setNodeLabelExpression(appLabel);
+    }
+    if (enforcedPartitions.contains(appLabel)) {
+      resReq.setNodeLabelExpression(appLabel);
+    }
+  }
+
   /**
    * Utility method to validate a resource request, by insuring that the
    * requested memory/vcore is non-negative and not greater than max
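To make the two rules above concrete, here is a minimal sketch (the labels "x", "y" and "appLabel" are hypothetical; the outcomes match the TestSchedulerUtils cases added later in this patch):

    import java.util.Collections;
    import java.util.Set;

    import org.apache.hadoop.yarn.api.records.Priority;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.api.records.ResourceRequest;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;

    public class EnforcePartitionExclusivitySketch {
      public static void main(String[] args) {
        Set<String> enforced = Collections.singleton("x");
        ResourceRequest rr = ResourceRequest.newInstance(
            Priority.newInstance(1), ResourceRequest.ANY,
            Resource.newInstance(1024, 1), 1);

        // Rule 1: the request asks for enforced partition "x" but the app is
        // not labeled "x" -> the request is overridden to the app's label.
        rr.setNodeLabelExpression("x");
        SchedulerUtils.enforcePartitionExclusivity(rr, enforced, "appLabel");
        // rr.getNodeLabelExpression() is now "appLabel"

        // Rule 2: the app is labeled "x" -> every ANY request is forced to "x".
        rr.setNodeLabelExpression(null);
        SchedulerUtils.enforcePartitionExclusivity(rr, enforced, "x");
        // rr.getNodeLabelExpression() is now "x"

        // Neither label is an enforced partition -> the request is untouched.
        rr.setNodeLabelExpression("y");
        SchedulerUtils.enforcePartitionExclusivity(rr, enforced, "appLabel");
        // rr.getNodeLabelExpression() is still "y"
      }
    }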
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 4e49474782c..dda607ee81f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNo
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodePolicySpec;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyWithExclusivePartitions;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
 import org.apache.hadoop.yarn.util.UnitsConversionUtil;
@@ -159,6 +160,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
 
   public static final String FAIR_APP_ORDERING_POLICY = "fair";
 
+  public static final String FIFO_WITH_PARTITIONS_APP_ORDERING_POLICY
+      = "fifo-with-partitions";
+
   public static final String DEFAULT_APP_ORDERING_POLICY =
       FIFO_APP_ORDERING_POLICY;
 
@@ -555,6 +559,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
     if (policyType.trim().equals(FAIR_APP_ORDERING_POLICY)) {
       policyType = FairOrderingPolicy.class.getName();
     }
+    if (policyType.trim().equals(FIFO_WITH_PARTITIONS_APP_ORDERING_POLICY)) {
+      policyType = FifoOrderingPolicyWithExclusivePartitions.class.getName();
+    }
     try {
       orderingPolicy = (OrderingPolicy<S>)
           Class.forName(policyType).newInstance();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 3cfb111f034..fbf94360149 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.IteratorSelector;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
@@ -806,7 +807,8 @@ public class LeafQueue extends AbstractCSQueue {
     }
 
     for (Iterator<FiCaSchedulerApp> fsApp =
-        getPendingAppsOrderingPolicy().getAssignmentIterator();
+        getPendingAppsOrderingPolicy()
+            .getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR);
         fsApp.hasNext(); ) {
       FiCaSchedulerApp application = fsApp.next();
       ApplicationId applicationId = application.getApplicationId();
@@ -1103,8 +1105,10 @@
     Map<String, CachedUserLimit> userLimits = new HashMap<>();
     boolean needAssignToQueueCheck = true;
+    IteratorSelector sel = new IteratorSelector();
+    sel.setPartition(candidates.getPartition());
     for (Iterator<FiCaSchedulerApp> assignmentIterator =
-        orderingPolicy.getAssignmentIterator();
+        orderingPolicy.getAssignmentIterator(sel);
         assignmentIterator.hasNext(); ) {
       FiCaSchedulerApp application = assignmentIterator.next();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
index 09dd3bf3cc6..9f40d66cd1c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
@@ -46,13 +46,13 @@ public abstract class AbstractComparatorOrderingPolicy
   public Collection<S> getSchedulableEntities() {
     return schedulableEntities;
   }
-
+
   @Override
-  public Iterator<S> getAssignmentIterator() {
+  public Iterator<S> getAssignmentIterator(IteratorSelector sel) {
     reorderScheduleEntities();
     return schedulableEntities.iterator();
   }
-
+
   @Override
   public Iterator<S> getPreemptionIterator() {
     reorderScheduleEntities();
@@ -137,5 +137,5 @@ public abstract class AbstractComparatorOrderingPolicy
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyWithExclusivePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyWithExclusivePartitions.java
new file mode 100644
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyWithExclusivePartitions.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
+/**
+ * Similar to {@link FifoOrderingPolicy}, but with a separate ordering policy
+ * for each partition in
+ * {@code yarn.scheduler.capacity.<queue-path>.ordering-policy.exclusive-enforced-partitions}.
+ */
+public class FifoOrderingPolicyWithExclusivePartitions<S extends SchedulableEntity>
+    implements OrderingPolicy<S> {
+
+  private static final String DEFAULT_PARTITION = "DEFAULT_PARTITION";
+
+  private Map<String, OrderingPolicy<S>> orderingPolicies;
+
+  public FifoOrderingPolicyWithExclusivePartitions() {
+    this.orderingPolicies = new HashMap<>();
+    this.orderingPolicies.put(DEFAULT_PARTITION, new FifoOrderingPolicy<S>());
+  }
+
+  public Collection<S> getSchedulableEntities() {
+    return unionOrderingPolicies().getSchedulableEntities();
+  }
+
+  public Iterator<S> getAssignmentIterator(IteratorSelector sel) {
+    // Return schedulable entities only from filtered partition
+    return getPartitionOrderingPolicy(sel.getPartition())
+        .getAssignmentIterator(sel);
+  }
+
+  public Iterator<S> getPreemptionIterator() {
+    // Entities from all partitions should be preemptible
+    return unionOrderingPolicies().getPreemptionIterator();
+  }
+
+  /**
+   * Union all schedulable entities from all ordering policies.
+   * @return ordering policy containing all schedulable entities
+   */
+  private OrderingPolicy<S> unionOrderingPolicies() {
+    OrderingPolicy<S> ret = new FifoOrderingPolicy<>();
+    for (Map.Entry<String, OrderingPolicy<S>> entry
+        : orderingPolicies.entrySet()) {
+      ret.addAllSchedulableEntities(entry.getValue().getSchedulableEntities());
+    }
+    return ret;
+  }
+
+  public void addSchedulableEntity(S s) {
+    getPartitionOrderingPolicy(s.getPartition()).addSchedulableEntity(s);
+  }
+
+  public boolean removeSchedulableEntity(S s) {
+    return getPartitionOrderingPolicy(s.getPartition())
+        .removeSchedulableEntity(s);
+  }
+
+  public void addAllSchedulableEntities(Collection<S> sc) {
+    for (S entity : sc) {
+      getPartitionOrderingPolicy(entity.getPartition())
+          .addSchedulableEntity(entity);
+    }
+  }
+
+  public int getNumSchedulableEntities() {
+    // Return total number of schedulable entities, to maintain parity with
+    // existing FifoOrderingPolicy e.g. when determining if queue has reached
+    // its max app limit
+    int ret = 0;
+    for (Map.Entry<String, OrderingPolicy<S>> entry
+        : orderingPolicies.entrySet()) {
+      ret += entry.getValue().getNumSchedulableEntities();
+    }
+    return ret;
+  }
+
+  public void containerAllocated(S schedulableEntity, RMContainer r) {
+    getPartitionOrderingPolicy(schedulableEntity.getPartition())
+        .containerAllocated(schedulableEntity, r);
+  }
+
+  public void containerReleased(S schedulableEntity, RMContainer r) {
+    getPartitionOrderingPolicy(schedulableEntity.getPartition())
+        .containerReleased(schedulableEntity, r);
+  }
+
+  public void demandUpdated(S schedulableEntity) {
+    getPartitionOrderingPolicy(schedulableEntity.getPartition())
+        .demandUpdated(schedulableEntity);
+  }
+
+  @Override
+  public void configure(Map<String, String> conf) {
+    if (conf == null) {
+      return;
+    }
+    String partitions =
+        conf.get(YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX);
+    if (partitions != null) {
+      for (String partition : partitions.split(",")) {
+        partition = partition.trim();
+        if (!partition.isEmpty()) {
+          this.orderingPolicies.put(partition, new FifoOrderingPolicy<S>());
+        }
+      }
+    }
+  }
+
+  @Override
+  public String getInfo() {
+    return "FifoOrderingPolicyWithExclusivePartitions";
+  }
+
+  private OrderingPolicy<S> getPartitionOrderingPolicy(String partition) {
+    String keyPartition = orderingPolicies.containsKey(partition) ?
+        partition : DEFAULT_PARTITION;
+    return orderingPolicies.get(keyPartition);
+  }
+}
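A short usage sketch of the class above (it reuses the MockSchedulableEntity test helper extended later in this patch and assumes both classes are in the same package): entities are bucketed per configured partition, and the selector picks which bucket the assignment iterator drains.

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Iterator;

    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class ExclusivePartitionsPolicySketch {
      public static void main(String[] args) {
        FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy =
            new FifoOrderingPolicyWithExclusivePartitions<>();
        // "x" becomes a dedicated bucket; everything else falls into the
        // DEFAULT_PARTITION bucket.
        policy.configure(Collections.singletonMap(
            YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX, "x"));

        MockSchedulableEntity inX = new MockSchedulableEntity(1, 0, false);
        inX.setPartition("x");
        inX.setId("app-in-x");
        MockSchedulableEntity noLabel = new MockSchedulableEntity(2, 0, false);
        noLabel.setId("app-default");
        policy.addAllSchedulableEntities(Arrays.asList(inX, noLabel));

        // The selector picks the bucket: only "app-in-x" is yielded here,
        // while EMPTY_ITERATOR_SELECTOR would yield only "app-default".
        IteratorSelector sel = new IteratorSelector();
        sel.setPartition("x");
        Iterator<MockSchedulableEntity> it = policy.getAssignmentIterator(sel);
        System.out.println(it.next().getId()); // app-in-x

        // Preemption and counting still see every bucket.
        System.out.println(policy.getNumSchedulableEntities()); // 2
      }
    }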
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/IteratorSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/IteratorSelector.java
new file mode 100644
index 00000000000..0e9b55faf92
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/IteratorSelector.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
+
+/**
+ * IteratorSelector contains information needed to tell an
+ * {@link OrderingPolicy} what to return in an iterator.
+ */
+public class IteratorSelector {
+
+  public static final IteratorSelector EMPTY_ITERATOR_SELECTOR =
+      new IteratorSelector();
+
+  private String partition;
+
+  /**
+   * The partition for this iterator selector.
+   * @return partition
+   */
+  public String getPartition() {
+    return this.partition;
+  }
+
+  /**
+   * Set partition for this iterator selector.
+   * @param p partition
+   */
+  public void setPartition(String p) {
+    this.partition = p;
+  }
+
+}
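For callers, the interface change boils down to passing a selector. A small sketch (the policy instance and partition name "x" are assumptions; this mirrors the two LeafQueue call sites above):

    import java.util.Iterator;

    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.IteratorSelector;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;

    public class IteratorSelectorUsageSketch {
      static void iterate(OrderingPolicy<FiCaSchedulerApp> policy) {
        // Unfiltered iteration (what LeafQueue uses for pending apps):
        Iterator<FiCaSchedulerApp> all = policy.getAssignmentIterator(
            IteratorSelector.EMPTY_ITERATOR_SELECTOR);

        // Partition-filtered iteration (what LeafQueue uses during
        // assignment, keyed by the partition of the candidate nodes):
        IteratorSelector sel = new IteratorSelector();
        sel.setPartition("x");
        Iterator<FiCaSchedulerApp> onlyX = policy.getAssignmentIterator(sel);
      }
    }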
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java
index 9aacc7e79a4..66b6a59ec4c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java
@@ -45,10 +45,11 @@ public interface OrderingPolicy<S extends SchedulableEntity> {
   /**
    * Return an iterator over the collection of {@link SchedulableEntity}
    * objects which orders them for container assignment.
+   * @param sel the {@link IteratorSelector} to filter with
    * @return an iterator over the collection of {@link SchedulableEntity}
    * objects
    */
-  public Iterator<S> getAssignmentIterator();
+  Iterator<S> getAssignmentIterator(IteratorSelector sel);
 
   /**
    * Return an iterator over the collection of {@link SchedulableEntity}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java
index 41b83ce7162..be835560ade 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java
@@ -55,4 +55,9 @@ public interface SchedulableEntity {
    */
   public boolean isRecovering();
 
+  /**
+   * Get partition corresponding to this entity.
+   * @return partition
+   */
+  String getPartition();
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
index 73027058e0b..58af0922797 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
@@ -39,6 +39,7 @@ import java.security.PrivilegedAction;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -1011,6 +1012,147 @@ public class TestSchedulerUtils {
       System.err.println("Failed to wait scheduler application attempt stopped.");
   }
 
+  @Test
+  public void testEnforcePartitionExclusivity() {
+    String enforcedExclusiveLabel = "x";
+    Set<String> enforcedExclusiveLabelSet =
+        Collections.singleton(enforcedExclusiveLabel);
+    String dummyLabel = "y";
+    String appLabel = "appLabel";
+    ResourceRequest rr = BuilderUtils.newResourceRequest(
+        mock(Priority.class), ResourceRequest.ANY, mock(Resource.class), 1);
+
+    // RR label unset and app label does not match. Nothing should happen.
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        null);
+    Assert.assertNull(rr.getNodeLabelExpression());
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        appLabel);
+    Assert.assertNull(rr.getNodeLabelExpression());
+
+    // RR label and app label do not match. Nothing should happen.
+    rr.setNodeLabelExpression(dummyLabel);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        null);
+    Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        appLabel);
+    Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
+
+    // RR label matches but app label does not. RR label should be set
+    // to app label
+    rr.setNodeLabelExpression(enforcedExclusiveLabel);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        null);
+    Assert.assertNull(rr.getNodeLabelExpression());
+    rr.setNodeLabelExpression(enforcedExclusiveLabel);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        appLabel);
+    Assert.assertEquals(appLabel, rr.getNodeLabelExpression());
+
+    // RR label unset and app label matches. RR label should be set
+    // to app label
+    rr.setNodeLabelExpression(null);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        enforcedExclusiveLabel);
+    Assert.assertEquals(enforcedExclusiveLabel, rr.getNodeLabelExpression());
+
+    // RR label does not match and app label matches. RR label should be set
+    // to app label
+    rr.setNodeLabelExpression(dummyLabel);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        enforcedExclusiveLabel);
+    Assert.assertEquals(enforcedExclusiveLabel, rr.getNodeLabelExpression());
+
+    // RR label and app label matches. Nothing should happen.
+    rr.setNodeLabelExpression(enforcedExclusiveLabel);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        enforcedExclusiveLabel);
+    Assert.assertEquals(enforcedExclusiveLabel, rr.getNodeLabelExpression());
+
+    // Unconfigured label: nothing should happen.
+    rr.setNodeLabelExpression(null);
+    SchedulerUtils.enforcePartitionExclusivity(rr, null,
+        appLabel);
+    Assert.assertNull(rr.getNodeLabelExpression());
+    rr.setNodeLabelExpression(dummyLabel);
+    SchedulerUtils.enforcePartitionExclusivity(rr, null,
+        appLabel);
+    Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
+    rr.setNodeLabelExpression(enforcedExclusiveLabel);
+    SchedulerUtils.enforcePartitionExclusivity(rr, null,
+        appLabel);
+    Assert.assertEquals(enforcedExclusiveLabel, rr.getNodeLabelExpression());
+  }
+
+  @Test
+  public void testEnforcePartitionExclusivityMultipleLabels() {
+    String enforcedLabel1 = "x";
+    String enforcedLabel2 = "y";
+    Set<String> enforcedExclusiveLabelSet = new HashSet<>();
+    enforcedExclusiveLabelSet.add(enforcedLabel1);
+    enforcedExclusiveLabelSet.add(enforcedLabel2);
+    String dummyLabel = "dummyLabel";
+    String appLabel = "appLabel";
+    ResourceRequest rr = BuilderUtils.newResourceRequest(
+        mock(Priority.class), ResourceRequest.ANY, mock(Resource.class), 1);
+
+    // RR label unset and app label does not match. Nothing should happen.
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        null);
+    Assert.assertNull(rr.getNodeLabelExpression());
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        appLabel);
+    Assert.assertNull(rr.getNodeLabelExpression());
+
+    // RR label and app label do not match. Nothing should happen.
+    rr.setNodeLabelExpression(dummyLabel);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        null);
+    Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        appLabel);
+    Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
+
+    // RR label matches but app label does not. RR label should be set
+    // to app label
+    rr.setNodeLabelExpression(enforcedLabel1);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        null);
+    Assert.assertNull(rr.getNodeLabelExpression());
+    rr.setNodeLabelExpression(enforcedLabel2);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        appLabel);
+    Assert.assertEquals(appLabel, rr.getNodeLabelExpression());
+
+    // RR label unset and app label matches. RR label should be set
+    // to app label
+    rr.setNodeLabelExpression(null);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        enforcedLabel1);
+    Assert.assertEquals(enforcedLabel1, rr.getNodeLabelExpression());
+
+    // RR label does not match and app label matches. RR label should be set
+    // to app label
+    rr.setNodeLabelExpression(dummyLabel);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        enforcedLabel2);
+    Assert.assertEquals(enforcedLabel2, rr.getNodeLabelExpression());
+
+    // RR label and app label matches. Nothing should happen.
+    rr.setNodeLabelExpression(enforcedLabel1);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        enforcedLabel1);
+    Assert.assertEquals(enforcedLabel1, rr.getNodeLabelExpression());
+
+    // RR label and app label don't match, but they're both enforced labels.
+    // RR label should be set to app label.
+    rr.setNodeLabelExpression(enforcedLabel2);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        enforcedLabel1);
+    Assert.assertEquals(enforcedLabel1, rr.getNodeLabelExpression());
+  }
+
   public static SchedulerApplication<SchedulerApplicationAttempt>
       verifyAppAddedAndRemovedFromScheduler(
           Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>>
              applications,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 51900b576b5..92808456c67 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -152,6 +152,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.IteratorSelector;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -1288,8 +1289,9 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
     //This happens because app2 has no demand/a magnitude of NaN, which
     //results in app1 and app2 being equal in the fairness comparison and
     //failling back to fifo (start) ordering
-    assertEquals(q.getOrderingPolicy().getAssignmentIterator().next().getId(),
-        appId1.toString());
+    assertEquals(q.getOrderingPolicy().getAssignmentIterator(
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR).next().getId(),
+        appId1.toString());
 
     //Now, allocate for app2 (this would be the first/AM allocation)
     ResourceRequest r2 = TestUtils.createResourceRequest(ResourceRequest.ANY,
         1*GB, 1, true, priority, recordFactory);
@@ -1301,8 +1303,9 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
 
     //verify re-ordering based on the allocation alone
     //Now, the first app for assignment is app2
-    assertEquals(q.getOrderingPolicy().getAssignmentIterator().next().getId(),
-        appId2.toString());
+    assertEquals(q.getOrderingPolicy().getAssignmentIterator(
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR).next().getId(),
+        appId2.toString());
     rm.stop();
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 15a5eb2e92d..c0537ffe993 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuot
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyWithExclusivePartitions;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
@@ -132,6 +133,8 @@ public class TestLeafQueue {
   final static int GB = 1024;
   final static String DEFAULT_RACK = "/default";
 
+  private final static String LABEL = "test";
+
   private final ResourceCalculator resourceCalculator =
       new DefaultResourceCalculator();
 
@@ -140,14 +143,19 @@ public class TestLeafQueue {
   @Before
   public void setUp() throws Exception {
-    setUpInternal(resourceCalculator);
+    setUpInternal(resourceCalculator, false);
   }
 
   private void setUpWithDominantResourceCalculator() throws Exception {
-    setUpInternal(dominantResourceCalculator);
+    setUpInternal(dominantResourceCalculator, false);
   }
 
-  private void setUpInternal(ResourceCalculator rC) throws Exception {
+  private void setUpWithNodeLabels() throws Exception {
+    setUpInternal(resourceCalculator, true);
+  }
+
+  private void setUpInternal(ResourceCalculator rC, boolean withNodeLabels)
+      throws Exception {
     CapacityScheduler spyCs = new CapacityScheduler();
     queues = new HashMap<String, CSQueue>();
     cs = spy(spyCs);
@@ -172,7 +180,7 @@ public class TestLeafQueue {
     csConf.setBoolean(CapacitySchedulerConfiguration.RESERVE_CONT_LOOK_ALL_NODES,
         false);
     final String newRoot = "root" + System.currentTimeMillis();
-    setupQueueConfiguration(csConf, newRoot);
+    setupQueueConfiguration(csConf, newRoot, withNodeLabels);
     YarnConfiguration conf = new YarnConfiguration();
     cs.setConf(conf);
@@ -228,24 +236,39 @@ public class TestLeafQueue {
   private static final String E = "e";
   private void setupQueueConfiguration(
       CapacitySchedulerConfiguration conf,
-      final String newRoot) {
+      final String newRoot, boolean withNodeLabels) {
 
     // Define top-level queues
     conf.setQueues(ROOT, new String[] {newRoot});
     conf.setMaximumCapacity(ROOT, 100);
     conf.setAcl(ROOT, QueueACL.SUBMIT_APPLICATIONS, " ");
+    if (withNodeLabels) {
+      conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, LABEL, 100);
+      conf.setMaximumCapacityByLabel(CapacitySchedulerConfiguration.ROOT,
+          LABEL, 100);
+    }
 
     final String Q_newRoot = ROOT + "." + newRoot;
     conf.setQueues(Q_newRoot, new String[] {A, B, C, D, E});
     conf.setCapacity(Q_newRoot, 100);
     conf.setMaximumCapacity(Q_newRoot, 100);
     conf.setAcl(Q_newRoot, QueueACL.SUBMIT_APPLICATIONS, " ");
+    if (withNodeLabels) {
+      conf.setAccessibleNodeLabels(Q_newRoot, Collections.singleton(LABEL));
+      conf.setCapacityByLabel(Q_newRoot, LABEL, 100);
+      conf.setMaximumCapacityByLabel(Q_newRoot, LABEL, 100);
+    }
 
     final String Q_A = Q_newRoot + "." + A;
     conf.setCapacity(Q_A, 8.5f);
     conf.setMaximumCapacity(Q_A, 20);
     conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*");
+    if (withNodeLabels) {
+      conf.setAccessibleNodeLabels(Q_A, Collections.singleton(LABEL));
+      conf.setCapacityByLabel(Q_A, LABEL, 100);
+      conf.setMaximumCapacityByLabel(Q_A, LABEL, 100);
+    }
 
     final String Q_B = Q_newRoot + "." + B;
     conf.setCapacity(Q_B, 80);
@@ -3097,7 +3120,7 @@
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
     CapacitySchedulerConfiguration csConf =
         new CapacitySchedulerConfiguration();
     final String newRootName = "root" + System.currentTimeMillis();
-    setupQueueConfiguration(csConf, newRootName);
+    setupQueueConfiguration(csConf, newRootName, false);
 
     Resource clusterResource = Resources.createResource(100 * 16 * GB,
         100 * 32);
@@ -3289,6 +3312,116 @@
     Assert.assertEquals(3 * GB, app_0.getCurrentConsumption().getMemorySize());
   }
 
+  @Test
+  public void testFifoWithPartitionsAssignment() throws Exception {
+    setUpWithNodeLabels();
+
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
+    OrderingPolicy<FiCaSchedulerApp> policy =
+        new FifoOrderingPolicyWithExclusivePartitions<>();
+    policy.configure(Collections.singletonMap(
+        YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX, LABEL));
+    a.setOrderingPolicy(policy);
+    String host00 = "127.0.0.1";
+    String rack0 = "rack_0";
+    FiCaSchedulerNode node00 = TestUtils.getMockNode(host00, rack0, 0,
+        16 * GB);
+    when(node00.getPartition()).thenReturn(LABEL);
+    String host01 = "127.0.0.2";
+    FiCaSchedulerNode node01 = TestUtils.getMockNode(host01, rack0, 0,
+        16 * GB);
+    when(node01.getPartition()).thenReturn("");
+
+    final int numNodes = 4;
+    Resource clusterResource = Resources.createResource(numNodes * (16 * GB),
+        numNodes * 16);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    String user0 = "user_0";
+
+    final ApplicationAttemptId appAttemptId0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app0 = spy(new FiCaSchedulerApp(appAttemptId0, user0, a,
+        mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(5),
+        false));
+    a.submitApplicationAttempt(app0, user0);
+
+    final ApplicationAttemptId appAttemptId1 =
+        TestUtils.getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app1 = spy(new FiCaSchedulerApp(appAttemptId1, user0, a,
+        mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(3),
+        false));
+    when(app1.getPartition()).thenReturn(LABEL);
+    a.submitApplicationAttempt(app1, user0);
+
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app0.getApplicationAttemptId(), app0, app1.getApplicationAttemptId(),
+        app1);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node00.getNodeID(),
+        node00, node01.getNodeID(), node01);
+
+    Priority priority = TestUtils.createMockPriority(1);
+    List<ResourceRequest> app0Requests = new ArrayList<>();
+    List<ResourceRequest> app1Requests = new ArrayList<>();
+
+    app0Requests.clear();
+    app0Requests.add(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true, priority,
+            recordFactory));
+    app0.updateResourceRequests(app0Requests);
+
+    app1Requests.clear();
+    app1Requests.add(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
+            recordFactory, LABEL));
+    app1.updateResourceRequests(app1Requests);
+
+    // app_1 will get containers since it is exclusive-enforced
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node00,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    Assert.assertEquals(1 * GB, app1.getSchedulingResourceUsage()
+        .getUsed(LABEL).getMemorySize());
+    // app_0 should not get resources from node_0_0 since the labels
+    // don't match
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node00,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    Assert.assertEquals(0 * GB, app0.getCurrentConsumption().getMemorySize());
+
+    app1Requests.clear();
+    app1Requests.add(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
+            recordFactory, LABEL));
+    app1.updateResourceRequests(app1Requests);
+
+    // When node_0_1 heartbeats, app_0 should get containers
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node01,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    Assert.assertEquals(2 * GB, app0.getCurrentConsumption().getMemorySize());
+    Assert.assertEquals(1 * GB, app1.getSchedulingResourceUsage()
+        .getUsed(LABEL).getMemorySize());
+
+    app0Requests.clear();
+    app0Requests.add(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
+            recordFactory));
+    app0.updateResourceRequests(app0Requests);
+
+    // When node_0_0 heartbeats, app_1 should get containers again
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node00,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    Assert.assertEquals(2 * GB, app0.getCurrentConsumption().getMemorySize());
+    Assert.assertEquals(2 * GB, app1.getSchedulingResourceUsage()
+        .getUsed(LABEL).getMemorySize());
+  }
+
   @Test
   public void testConcurrentAccess() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java
index 4f251bf4e38..62f7a4956b9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java
@@ -18,21 +18,19 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
 
-import java.util.*;
-
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 
 public class MockSchedulableEntity implements SchedulableEntity {
-
+
   private String id;
   private long serial = 0;
   private Priority priority;
   private boolean isRecovering;
+  private String partition = "";
 
   public MockSchedulableEntity() { }
 
@@ -101,4 +99,13 @@ public class MockSchedulableEntity implements SchedulableEntity {
   protected void setRecovering(boolean entityRecovering) {
     this.isRecovering = entityRecovering;
   }
+
+  @Override
+  public String getPartition() {
+    return partition;
+  }
+
+  public void setPartition(String partition) {
+    this.partition = partition;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java
index 683173af709..e023e011e5b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java
@@ -126,19 +126,25 @@ public class TestFairOrderingPolicy {
 
     //Assignment, least to greatest consumption
-    checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "2", "1"});
+    checkIds(schedOrder.getAssignmentIterator(
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR),
+        new String[]{"3", "2", "1"});
 
     //Preemption, greatest to least
     checkIds(schedOrder.getPreemptionIterator(), new String[]{"1", "2", "3"});
 
     //Change value without inform, should see no change
     msp2.setUsed(Resources.createResource(6));
-    checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "2", "1"});
+    checkIds(schedOrder.getAssignmentIterator(
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR),
+        new String[]{"3", "2", "1"});
     checkIds(schedOrder.getPreemptionIterator(), new String[]{"1", "2", "3"});
 
     //Do inform, will reorder
     schedOrder.containerAllocated(msp2, null);
-    checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "1", "2"});
+    checkIds(schedOrder.getAssignmentIterator(
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR),
+        new String[]{"3", "1", "2"});
     checkIds(schedOrder.getPreemptionIterator(), new String[]{"2", "1", "3"});
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicy.java
index 447618f4f24..d94d0775589 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicy.java
@@ -61,7 +61,7 @@ public class TestFifoOrderingPolicy {
     schedOrder.addSchedulableEntity(msp3);
 
     //Assignment, oldest to youngest
-    checkSerials(schedOrder.getAssignmentIterator(), new long[]{1, 2, 3});
+    checkSerials(schedOrder.getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR), new long[]{1, 2, 3});
 
     //Preemption, youngest to oldest
     checkSerials(schedOrder.getPreemptionIterator(), new long[]{3, 2, 1});
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyForPendingApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyForPendingApps.java
index befa8e6c321..3996b0dbbb2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyForPendingApps.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyForPendingApps.java
@@ -72,8 +72,9 @@ public class TestFifoOrderingPolicyForPendingApps {
     schedOrder.addSchedulableEntity(msp7);
 
     // Assignment with serial id's are 3,2,4,1,6,5,7
-    checkSerials(schedOrder.getAssignmentIterator(), new long[] { 3, 2, 4, 1,
-        6, 5, 7 });
+    checkSerials(schedOrder.getAssignmentIterator(
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR), new long[] {3, 2, 4, 1,
+        6, 5, 7});
 
     //Preemption, youngest to oldest
     checkSerials(schedOrder.getPreemptionIterator(), new long[] { 7, 5, 6, 1,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyWithExclusivePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyWithExclusivePartitions.java
new file mode 100644
index 00000000000..499a70a053c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyWithExclusivePartitions.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Tests {@link FifoOrderingPolicyWithExclusivePartitions} ordering policy.
+ */
+public class TestFifoOrderingPolicyWithExclusivePartitions {
+
+  private static final String PARTITION = "test";
+  private static final String PARTITION2 = "test2";
+
+  @Test
+  public void testNoConfiguredExclusiveEnforcedPartitions() {
+    FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy =
+        new FifoOrderingPolicyWithExclusivePartitions<>();
+    policy.configure(Collections.EMPTY_MAP);
+
+    MockSchedulableEntity p1 = new MockSchedulableEntity(4, 0, false);
+    p1.setPartition(PARTITION);
+    p1.setId("p1");
+    MockSchedulableEntity p2 = new MockSchedulableEntity(3, 1, false);
+    p2.setPartition(PARTITION);
+    p2.setId("p2");
+
+    MockSchedulableEntity r1 = new MockSchedulableEntity(2, 0, false);
+    r1.setId("r1");
+    MockSchedulableEntity r2 = new MockSchedulableEntity(1, 0, false);
+    r2.setId("r2");
+
+    policy.addSchedulableEntity(p1);
+    policy.addAllSchedulableEntities(Arrays.asList(p2, r1, r2));
+    Assert.assertEquals(4, policy.getNumSchedulableEntities());
+    Assert.assertEquals(4, policy.getSchedulableEntities().size());
+    IteratorSelector sel = new IteratorSelector();
+    // Should behave like FifoOrderingPolicy, regardless of partition
+    verifyAssignmentIteratorOrder(policy,
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR, "p2", "r2", "r1", "p1");
+    verifyPreemptionIteratorOrder(policy, "p1", "r1", "r2", "p2");
+    sel.setPartition(PARTITION);
+    verifyAssignmentIteratorOrder(policy, sel, "p2", "r2", "r1", "p1");
+    verifyPreemptionIteratorOrder(policy, "p1", "r1", "r2", "p2");
+
+    policy.removeSchedulableEntity(p2);
+    policy.removeSchedulableEntity(r2);
+    Assert.assertEquals(2, policy.getNumSchedulableEntities());
+    Assert.assertEquals(2, policy.getSchedulableEntities().size());
+    verifyAssignmentIteratorOrder(policy,
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r1", "p1");
+    verifyPreemptionIteratorOrder(policy, "p1", "r1");
+    sel.setPartition(PARTITION);
+    verifyAssignmentIteratorOrder(policy, sel, "r1", "p1");
+    verifyPreemptionIteratorOrder(policy, "p1", "r1");
+  }
+
+  @Test
+  public void testSingleExclusiveEnforcedPartition() {
+    FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy =
+        new FifoOrderingPolicyWithExclusivePartitions<>();
+    policy.configure(Collections.singletonMap(
+        YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX, PARTITION));
+
+    // PARTITION iterator should return p2, p1, p3
+    MockSchedulableEntity p1 = new MockSchedulableEntity(1, 0, false);
+    p1.setPartition(PARTITION);
+    p1.setId("p1");
+    MockSchedulableEntity p2 = new MockSchedulableEntity(5, 1, false);
+    p2.setPartition(PARTITION);
+    p2.setId("p2");
+    MockSchedulableEntity p3 = new MockSchedulableEntity(3, 0, false);
+    p3.setPartition(PARTITION);
+    p3.setId("p3");
+
+    // non-PARTITION iterator should return r3, r2, r1
+    MockSchedulableEntity r1 = new MockSchedulableEntity(6, 0, false);
+    r1.setId("r1");
+    MockSchedulableEntity r2 = new MockSchedulableEntity(4, 0, false);
+    r2.setId("r2");
+    MockSchedulableEntity r3 = new MockSchedulableEntity(2, 1, false);
+    r3.setId("r3");
+
+    policy.addSchedulableEntity(r1);
+    Assert.assertEquals(1, policy.getNumSchedulableEntities());
+    Assert.assertEquals("r1", policy.getSchedulableEntities()
+        .iterator().next().getId());
+    verifyAssignmentIteratorOrder(policy,
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r1");
+    verifyPreemptionIteratorOrder(policy, "r1");
+
+    List<MockSchedulableEntity> entities = Arrays.asList(r2, r3, p1, p2);
+    policy.addAllSchedulableEntities(entities);
+    policy.addSchedulableEntity(p3);
+    Assert.assertEquals(6, policy.getNumSchedulableEntities());
+    Assert.assertEquals(6, policy.getSchedulableEntities().size());
+    // Assignment iterator should return non-PARTITION entities,
+    // in order based on FifoOrderingPolicy
+    verifyAssignmentIteratorOrder(policy,
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r3", "r2", "r1");
+    // Preemption iterator should return all entities, in global order
+    verifyPreemptionIteratorOrder(policy, "r1", "r2", "p3", "p1", "p2", "r3");
+    // Same result as above: "dummy" is not an exclusive enforced partition
+    IteratorSelector sel = new IteratorSelector();
+    sel.setPartition("dummy");
+    verifyAssignmentIteratorOrder(policy, sel, "r3", "r2", "r1");
+    verifyPreemptionIteratorOrder(policy, "r1", "r2", "p3", "p1", "p2", "r3");
+    // Should return PARTITION entities, in order based on FifoOrderingPolicy
+    sel.setPartition(PARTITION);
+    verifyAssignmentIteratorOrder(policy, sel, "p2", "p1", "p3");
+    verifyPreemptionIteratorOrder(policy, "r1", "r2", "p3", "p1", "p2", "r3");
+
+    policy.removeSchedulableEntity(p2);
+    policy.removeSchedulableEntity(r2);
+    Assert.assertEquals(4, policy.getNumSchedulableEntities());
+    Assert.assertEquals(4, policy.getSchedulableEntities().size());
+    verifyAssignmentIteratorOrder(policy,
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r3", "r1");
+    verifyPreemptionIteratorOrder(policy, "r1", "p3", "p1", "r3");
+    sel.setPartition(PARTITION);
+    verifyAssignmentIteratorOrder(policy, sel, "p1", "p3");
+    verifyPreemptionIteratorOrder(policy, "r1", "p3", "p1", "r3");
+
+    policy.removeSchedulableEntity(p1);
+    policy.removeSchedulableEntity(p3);
+    Assert.assertEquals(2, policy.getNumSchedulableEntities());
+    Assert.assertEquals(2, policy.getSchedulableEntities().size());
+    verifyAssignmentIteratorOrder(policy,
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r3", "r1");
+    verifyPreemptionIteratorOrder(policy, "r1", "r3");
+    sel.setPartition(PARTITION);
+    verifyAssignmentIteratorOrder(policy, sel);
+    verifyPreemptionIteratorOrder(policy, "r1", "r3");
+  }
+
+  @Test
+  public void testMultipleExclusiveEnforcedPartitions() {
+    FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy =
+        new FifoOrderingPolicyWithExclusivePartitions<>();
+    policy.configure(Collections.singletonMap(
+        YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX,
+        PARTITION + "," + PARTITION2));
+
+    // PARTITION iterator should return p2, p1
+    MockSchedulableEntity p1 = new MockSchedulableEntity(1, 0, false);
+    p1.setPartition(PARTITION);
+    p1.setId("p1");
+    MockSchedulableEntity p2 = new MockSchedulableEntity(5, 1, false);
+    p2.setPartition(PARTITION);
+    p2.setId("p2");
+
+    // PARTITION2 iterator should return r1, r2
+    MockSchedulableEntity r1 = new MockSchedulableEntity(3, 0, false);
+    r1.setPartition(PARTITION2);
+    r1.setId("r1");
+    MockSchedulableEntity r2 = new MockSchedulableEntity(4, 0, false);
+    r2.setPartition(PARTITION2);
+    r2.setId("r2");
+
+    // default iterator should return s2, s1
+    MockSchedulableEntity s1 = new MockSchedulableEntity(6, 0, false);
+    s1.setId("s1");
+    MockSchedulableEntity s2 = new MockSchedulableEntity(2, 0, false);
+    s2.setId("s2");
+
+    policy.addAllSchedulableEntities(Arrays.asList(s1, s2, r1));
+    Assert.assertEquals(3, policy.getNumSchedulableEntities());
+    Assert.assertEquals(3, policy.getSchedulableEntities().size());
+    IteratorSelector sel = new IteratorSelector();
+    // assignment iterator returns only default (non-partitioned) entities
+    verifyAssignmentIteratorOrder(policy,
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR, "s2", "s1");
+    verifyPreemptionIteratorOrder(policy, "s1", "r1", "s2");
+    sel.setPartition(PARTITION2);
+    verifyAssignmentIteratorOrder(policy, sel, "r1");
+
+    policy.addAllSchedulableEntities(Arrays.asList(r2, p1, p2));
+    Assert.assertEquals(6, policy.getNumSchedulableEntities());
+    Assert.assertEquals(6, policy.getSchedulableEntities().size());
+    verifyAssignmentIteratorOrder(policy,
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR, "s2", "s1");
+    sel.setPartition(PARTITION);
+    verifyAssignmentIteratorOrder(policy, sel, "p2", "p1");
+    sel.setPartition(PARTITION2);
+    verifyAssignmentIteratorOrder(policy, sel, "r1", "r2");
+    verifyPreemptionIteratorOrder(policy, "s1", "r2", "r1", "s2", "p1", "p2");
+
+    policy.removeSchedulableEntity(p2);
+    policy.removeSchedulableEntity(r1);
+    policy.removeSchedulableEntity(r2);
+    Assert.assertEquals(3, policy.getNumSchedulableEntities());
+    Assert.assertEquals(3, policy.getSchedulableEntities().size());
+    verifyAssignmentIteratorOrder(policy,
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR, "s2", "s1");
+    sel.setPartition(PARTITION);
+    verifyAssignmentIteratorOrder(policy, sel, "p1");
+    sel.setPartition(PARTITION2);
+    verifyAssignmentIteratorOrder(policy, sel);
+    verifyPreemptionIteratorOrder(policy, "s1", "s2", "p1");
+  }
+
+  private void verifyAssignmentIteratorOrder(
+      FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy,
+      IteratorSelector sel, String... ids) {
+    verifyIteratorOrder(policy.getAssignmentIterator(sel), ids);
+  }
+
+  private void verifyPreemptionIteratorOrder(
+      FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy,
+      String... ids) {
+    verifyIteratorOrder(policy.getPreemptionIterator(), ids);
+  }
+
+  private void verifyIteratorOrder(Iterator<MockSchedulableEntity> itr,
+      String... ids) {
+    for (String id : ids) {
+      Assert.assertEquals(id, itr.next().getId());
+    }
+    Assert.assertFalse(itr.hasNext());
+  }
+}
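
Editor's note: the following is a minimal illustrative sketch, not part of the
patch, showing how a caller might drive the per-partition iterators exercised
by the tests above. It assumes only the APIs that appear in this patch
(FifoOrderingPolicyWithExclusivePartitions, IteratorSelector,
MockSchedulableEntity, and the EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX key).

    // Treat partition "test" as exclusive enforced.
    FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy =
        new FifoOrderingPolicyWithExclusivePartitions<>();
    policy.configure(Collections.singletonMap(
        YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX, "test"));

    MockSchedulableEntity inPartition = new MockSchedulableEntity(1, 0, false);
    inPartition.setPartition("test");
    MockSchedulableEntity noPartition = new MockSchedulableEntity(2, 0, false);
    policy.addSchedulableEntity(inPartition);
    policy.addSchedulableEntity(noPartition);

    // The default selector skips entities in exclusive enforced partitions,
    // so this iterator yields only noPartition.
    Iterator<MockSchedulableEntity> defaultItr =
        policy.getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR);
    // A selector targeting "test" yields only that partition's entities.
    IteratorSelector sel = new IteratorSelector();
    sel.setPartition("test");
    Iterator<MockSchedulableEntity> partitionItr =
        policy.getAssignmentIterator(sel);
    // The preemption iterator always spans all entities, regardless of
    // partition.
    Iterator<MockSchedulableEntity> preemptItr = policy.getPreemptionIterator();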