diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 76d45009c22..7139818f779 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3207,6 +3207,12 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String DEFAULT_NODELABEL_CONFIGURATION_TYPE =
CENTRALIZED_NODELABEL_CONFIGURATION_TYPE;
+ public static final String EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX
+ = "exclusive-enforced-partitions";
+
+ public static final String EXCLUSIVE_ENFORCED_PARTITIONS = NODE_LABELS_PREFIX
+ + EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX;
+
public static final String MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY =
YARN_PREFIX + "cluster.max-application-priority";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 6574016caa5..0c26b4b52ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3557,4 +3557,13 @@
60000
+
+
+ Comma-separated list of partitions. If a label P is in this list,
+ then the RM will enforce that an app has resource requests with label
+ P iff that app's node label expression is P.
+
+ yarn.node-labels.exclusive-enforced-partitions
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
index 0baf17aa952..65bbaca19cc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
@@ -71,6 +71,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.SchedulerNodeReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -206,6 +207,11 @@ public void allocate(ApplicationAttemptId appAttemptId,
&& ResourceRequest.ANY.equals(req.getResourceName())) {
req.setNodeLabelExpression(asc.getNodeLabelExpression());
}
+ if (ResourceRequest.ANY.equals(req.getResourceName())) {
+ SchedulerUtils.enforcePartitionExclusivity(req,
+ getRmContext().getExclusiveEnforcedPartitions(),
+ asc.getNodeLabelExpression());
+ }
}
Resource maximumCapacity =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 84a5383f299..5b50623c9bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -21,6 +21,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -90,6 +91,7 @@ public class RMAppManager implements EventHandler,
private final ApplicationACLsManager applicationACLsManager;
private Configuration conf;
private YarnAuthorizationProvider authorizer;
+ private Set exclusiveEnforcedPartitions;
public RMAppManager(RMContext context,
YarnScheduler scheduler, ApplicationMasterService masterService,
@@ -110,6 +112,7 @@ public RMAppManager(RMContext context,
this.maxCompletedAppsInStateStore = this.maxCompletedAppsInMemory;
}
this.authorizer = YarnAuthorizationProvider.getInstance(conf);
+ this.exclusiveEnforcedPartitions = context.getExclusiveEnforcedPartitions();
}
/**
@@ -490,6 +493,9 @@ private List validateAndCreateResourceRequest(
throw new InvalidResourceRequestException("Invalid resource request, "
+ "no resource request specified with " + ResourceRequest.ANY);
}
+ SchedulerUtils.enforcePartitionExclusivity(anyReq,
+ exclusiveEnforcedPartitions,
+ submissionContext.getNodeLabelExpression());
// Make sure that all of the requests agree with the ANY request
// and have correct values
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index b255a304eae..b5cc5a7ad77 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import java.nio.ByteBuffer;
+import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
@@ -161,4 +162,7 @@ void setRMDelegatedNodeLabelsUpdater(
ResourceManager getResourceManager();
String getAppProxyUrl(Configuration conf, ApplicationId applicationId);
+
+ Set getExclusiveEnforcedPartitions();
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index d7c624d4863..ee9aa0d28a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -21,6 +21,8 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
@@ -33,6 +35,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
@@ -582,4 +585,16 @@ public String getAppProxyUrl(Configuration conf, ApplicationId applicationId)
return UNAVAILABLE;
}
}
+
+ public Set getExclusiveEnforcedPartitions() {
+ String[] configuredPartitions = getYarnConfiguration().getStrings(
+ YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS);
+ Set exclusiveEnforcedPartitions = new HashSet<>();
+ if (configuredPartitions != null) {
+ for (String partition : configuredPartitions) {
+ exclusiveEnforcedPartitions.add(partition);
+ }
+ }
+ return exclusiveEnforcedPartitions;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 409f18487cb..0685f5dc1c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -199,6 +199,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
private AtomicLong unconfirmedAllocatedMem = new AtomicLong();
private AtomicInteger unconfirmedAllocatedVcores = new AtomicInteger();
+ private String nodeLabelExpression;
+
public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, AbstractUsersManager abstractUsersManager,
RMContext rmContext) {
@@ -223,6 +225,8 @@ public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
unmanagedAM = appSubmissionContext.getUnmanagedAM();
this.logAggregationContext =
appSubmissionContext.getLogAggregationContext();
+ this.nodeLabelExpression =
+ appSubmissionContext.getNodeLabelExpression();
}
}
@@ -1388,4 +1392,9 @@ public String getDiagnosticMessage() {
return diagnosticMessage;
}
}
+
+ @Override
+ public String getPartition() {
+ return nodeLabelExpression == null ? "" : nodeLabelExpression;
+ }
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index 041a762e9fb..f8fe979e043 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -257,6 +257,29 @@ public static void normalizeAndvalidateRequest(ResourceRequest resReq,
false, rmContext, queueInfo);
}
+ /**
+ * If RM should enforce partition exclusivity for enforced partition "x":
+ * 1) If request is "x" and app label is not "x",
+ * override request to app's label.
+ * 2) If app label is "x", ensure request is "x".
+ * @param resReq resource request
+ * @param enforcedPartitions list of exclusive enforced partitions
+ * @param appLabel app's node label expression
+ */
+ public static void enforcePartitionExclusivity(ResourceRequest resReq,
+ Set enforcedPartitions, String appLabel) {
+ if (enforcedPartitions == null || enforcedPartitions.isEmpty()) {
+ return;
+ }
+ if (!enforcedPartitions.contains(appLabel)
+ && enforcedPartitions.contains(resReq.getNodeLabelExpression())) {
+ resReq.setNodeLabelExpression(appLabel);
+ }
+ if (enforcedPartitions.contains(appLabel)) {
+ resReq.setNodeLabelExpression(appLabel);
+ }
+ }
+
/**
* Utility method to validate a resource request, by insuring that the
* requested memory/vcore is non-negative and not greater than max
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 0b9d9bf23e1..29fb2536913 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -45,6 +45,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyWithExclusivePartitions;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -152,6 +153,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public static final String FAIR_APP_ORDERING_POLICY = "fair";
+ public static final String FIFO_WITH_PARTITIONS_APP_ORDERING_POLICY
+ = "fifo-with-partitions";
+
public static final String DEFAULT_APP_ORDERING_POLICY =
FIFO_APP_ORDERING_POLICY;
@@ -469,6 +473,9 @@ public OrderingPolicy getAppOrderingPolicy(
if (policyType.trim().equals(FAIR_APP_ORDERING_POLICY)) {
policyType = FairOrderingPolicy.class.getName();
}
+ if (policyType.trim().equals(FIFO_WITH_PARTITIONS_APP_ORDERING_POLICY)) {
+ policyType = FifoOrderingPolicyWithExclusivePartitions.class.getName();
+ }
try {
orderingPolicy = (OrderingPolicy)
Class.forName(policyType).newInstance();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 993f0894923..546c334a771 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -71,6 +71,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.IteratorSelector;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
@@ -787,7 +788,8 @@ private void activateApplications() {
}
for (Iterator fsApp =
- getPendingAppsOrderingPolicy().getAssignmentIterator();
+ getPendingAppsOrderingPolicy()
+ .getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR);
fsApp.hasNext(); ) {
FiCaSchedulerApp application = fsApp.next();
ApplicationId applicationId = application.getApplicationId();
@@ -1075,8 +1077,10 @@ public CSAssignment assignContainers(Resource clusterResource,
Map userLimits = new HashMap<>();
boolean needAssignToQueueCheck = true;
+ IteratorSelector sel = new IteratorSelector();
+ sel.setPartition(ps.getPartition());
for (Iterator assignmentIterator =
- orderingPolicy.getAssignmentIterator();
+ orderingPolicy.getAssignmentIterator(sel);
assignmentIterator.hasNext(); ) {
FiCaSchedulerApp application = assignmentIterator.next();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
index 09dd3bf3cc6..9f40d66cd1c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
@@ -46,13 +46,13 @@ public AbstractComparatorOrderingPolicy() { }
public Collection getSchedulableEntities() {
return schedulableEntities;
}
-
+
@Override
- public Iterator getAssignmentIterator() {
+ public Iterator getAssignmentIterator(IteratorSelector sel) {
reorderScheduleEntities();
return schedulableEntities.iterator();
}
-
+
@Override
public Iterator getPreemptionIterator() {
reorderScheduleEntities();
@@ -137,5 +137,5 @@ public abstract void containerReleased(S schedulableEntity,
@Override
public abstract String getInfo();
-
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyWithExclusivePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyWithExclusivePartitions.java
new file mode 100644
index 00000000000..1b1ef6606ab
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyWithExclusivePartitions.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
+
+/**
+ * Similar to {@link FifoOrderingPolicy}, but with separate ordering policies
+ * for each partition in
+ * {@code yarn.scheduler.capacity..ordering-policy.exclusive-enforced-partitions}.
+ */
+public class FifoOrderingPolicyWithExclusivePartitions
+ implements OrderingPolicy {
+
+ private static final String DEFAULT_PARTITION = "DEFAULT_PARTITION";
+
+ private Map> orderingPolicies;
+
+ public FifoOrderingPolicyWithExclusivePartitions() {
+ this.orderingPolicies = new HashMap<>();
+ this.orderingPolicies.put(DEFAULT_PARTITION, new FifoOrderingPolicy());
+ }
+
+ public Collection getSchedulableEntities() {
+ return unionOrderingPolicies().getSchedulableEntities();
+ }
+
+ public Iterator getAssignmentIterator(IteratorSelector sel) {
+ // Return schedulable entities only from filtered partition
+ return getPartitionOrderingPolicy(sel.getPartition())
+ .getAssignmentIterator(sel);
+ }
+
+ public Iterator getPreemptionIterator() {
+ // Entities from all partitions should be preemptible
+ return unionOrderingPolicies().getPreemptionIterator();
+ }
+
+ /**
+ * Union all schedulable entities from all ordering policies.
+ * @return ordering policy containing all schedulable entities
+ */
+ private OrderingPolicy unionOrderingPolicies() {
+ OrderingPolicy ret = new FifoOrderingPolicy<>();
+ for (Map.Entry> entry
+ : orderingPolicies.entrySet()) {
+ ret.addAllSchedulableEntities(entry.getValue().getSchedulableEntities());
+ }
+ return ret;
+ }
+
+ public void addSchedulableEntity(S s) {
+ getPartitionOrderingPolicy(s.getPartition()).addSchedulableEntity(s);
+ }
+
+ public boolean removeSchedulableEntity(S s) {
+ return getPartitionOrderingPolicy(s.getPartition())
+ .removeSchedulableEntity(s);
+ }
+
+ public void addAllSchedulableEntities(Collection sc) {
+ for (S entity : sc) {
+ getPartitionOrderingPolicy(entity.getPartition())
+ .addSchedulableEntity(entity);
+ }
+ }
+
+ public int getNumSchedulableEntities() {
+ // Return total number of schedulable entities, to maintain parity with
+ // existing FifoOrderingPolicy e.g. when determining if queue has reached
+ // its max app limit
+ int ret = 0;
+ for (Map.Entry> entry
+ : orderingPolicies.entrySet()) {
+ ret += entry.getValue().getNumSchedulableEntities();
+ }
+ return ret;
+ }
+
+ public void containerAllocated(S schedulableEntity, RMContainer r) {
+ getPartitionOrderingPolicy(schedulableEntity.getPartition())
+ .containerAllocated(schedulableEntity, r);
+ }
+
+ public void containerReleased(S schedulableEntity, RMContainer r) {
+ getPartitionOrderingPolicy(schedulableEntity.getPartition())
+ .containerReleased(schedulableEntity, r);
+ }
+
+ public void demandUpdated(S schedulableEntity) {
+ getPartitionOrderingPolicy(schedulableEntity.getPartition())
+ .demandUpdated(schedulableEntity);
+ }
+
+ @Override
+ public void configure(Map conf) {
+ if (conf == null) {
+ return;
+ }
+ String partitions =
+ conf.get(YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX);
+ if (partitions != null) {
+ for (String partition : partitions.split(",")) {
+ partition = partition.trim();
+ if (!partition.isEmpty()) {
+ this.orderingPolicies.put(partition, new FifoOrderingPolicy());
+ }
+ }
+ }
+ }
+
+ @Override
+ public String getInfo() {
+ return "FifoOrderingPolicyWithExclusivePartitions";
+ }
+
+ private OrderingPolicy getPartitionOrderingPolicy(String partition) {
+ String keyPartition = orderingPolicies.containsKey(partition) ?
+ partition : DEFAULT_PARTITION;
+ return orderingPolicies.get(keyPartition);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/IteratorSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/IteratorSelector.java
new file mode 100644
index 00000000000..0e9b55faf92
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/IteratorSelector.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
+
+/**
+ * IteratorSelector contains information needed to tell an
+ * {@link OrderingPolicy} what to return in an iterator.
+ */
+public class IteratorSelector {
+
+ public static final IteratorSelector EMPTY_ITERATOR_SELECTOR =
+ new IteratorSelector();
+
+ private String partition;
+
+ /**
+ * The partition for this iterator selector.
+ * @return partition
+ */
+ public String getPartition() {
+ return this.partition;
+ }
+
+ /**
+ * Set partition for this iterator selector.
+ * @param p partition
+ */
+ public void setPartition(String p) {
+ this.partition = p;
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java
index 9aacc7e79a4..66b6a59ec4c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java
@@ -45,10 +45,11 @@ public interface OrderingPolicy {
/**
* Return an iterator over the collection of {@link SchedulableEntity}
* objects which orders them for container assignment.
+ * @param sel the {@link IteratorSelector} to filter with
* @return an iterator over the collection of {@link SchedulableEntity}
* objects
*/
- public Iterator getAssignmentIterator();
+ Iterator getAssignmentIterator(IteratorSelector sel);
/**
* Return an iterator over the collection of {@link SchedulableEntity}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java
index 41b83ce7162..be835560ade 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java
@@ -55,4 +55,9 @@ public interface SchedulableEntity {
*/
public boolean isRecovering();
+ /**
+ * Get partition corresponding to this entity.
+ * @return partition
+ */
+ String getPartition();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
index cdc67ed60ef..57b195f58c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
@@ -32,6 +32,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -798,6 +799,147 @@ public static void waitSchedulerApplicationAttemptStopped(
System.err.println("Failed to wait scheduler application attempt stopped.");
}
+ @Test
+ public void testEnforcePartitionExclusivity() {
+ String enforcedExclusiveLabel = "x";
+ Set enforcedExclusiveLabelSet =
+ Collections.singleton(enforcedExclusiveLabel);
+ String dummyLabel = "y";
+ String appLabel = "appLabel";
+ ResourceRequest rr = BuilderUtils.newResourceRequest(
+ mock(Priority.class), ResourceRequest.ANY, mock(Resource.class), 1);
+
+ // RR label unset and app label does not match. Nothing should happen.
+ SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+ null);
+ Assert.assertNull(rr.getNodeLabelExpression());
+ SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+ appLabel);
+ Assert.assertNull(rr.getNodeLabelExpression());
+
+ // RR label and app label do not match. Nothing should happen.
+ rr.setNodeLabelExpression(dummyLabel);
+ SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+ null);
+ Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
+ SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+ appLabel);
+ Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
+
+ // RR label matches but app label does not. RR label should be set
+ // to app label
+ rr.setNodeLabelExpression(enforcedExclusiveLabel);
+ SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+ null);
+ Assert.assertNull(rr.getNodeLabelExpression());
+ rr.setNodeLabelExpression(enforcedExclusiveLabel);
+ SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+ appLabel);
+ Assert.assertEquals(appLabel, rr.getNodeLabelExpression());
+
+ // RR label unset and app label matches. RR label should be set
+ // to app label
+ rr.setNodeLabelExpression(null);
+ SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+ enforcedExclusiveLabel);
+ Assert.assertEquals(enforcedExclusiveLabel, rr.getNodeLabelExpression());
+
+ // RR label does not match and app label matches. RR label should be set
+ // to app label
+ rr.setNodeLabelExpression(dummyLabel);
+ SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+ enforcedExclusiveLabel);
+ Assert.assertEquals(enforcedExclusiveLabel, rr.getNodeLabelExpression());
+
+ // RR label and app label matches. Nothing should happen.
+ rr.setNodeLabelExpression(enforcedExclusiveLabel);
+ SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+ enforcedExclusiveLabel);
+ Assert.assertEquals(enforcedExclusiveLabel, rr.getNodeLabelExpression());
+
+ // Unconfigured label: nothing should happen.
+ rr.setNodeLabelExpression(null);
+ SchedulerUtils.enforcePartitionExclusivity(rr, null,
+ appLabel);
+ Assert.assertNull(rr.getNodeLabelExpression());
+ rr.setNodeLabelExpression(dummyLabel);
+ SchedulerUtils.enforcePartitionExclusivity(rr, null,
+ appLabel);
+ Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
+ rr.setNodeLabelExpression(enforcedExclusiveLabel);
+ SchedulerUtils.enforcePartitionExclusivity(rr, null,
+ appLabel);
+ Assert.assertEquals(enforcedExclusiveLabel, rr.getNodeLabelExpression());
+ }
+
+ @Test
+ public void testEnforcePartitionExclusivityMultipleLabels() {
+ String enforcedLabel1 = "x";
+ String enforcedLabel2 = "y";
+ Set enforcedExclusiveLabelSet = new HashSet<>();
+ enforcedExclusiveLabelSet.add(enforcedLabel1);
+ enforcedExclusiveLabelSet.add(enforcedLabel2);
+ String dummyLabel = "dummyLabel";
+ String appLabel = "appLabel";
+ ResourceRequest rr = BuilderUtils.newResourceRequest(
+ mock(Priority.class), ResourceRequest.ANY, mock(Resource.class), 1);
+
+ // RR label unset and app label does not match. Nothing should happen.
+ SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+ null);
+ Assert.assertNull(rr.getNodeLabelExpression());
+ SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+ appLabel);
+ Assert.assertNull(rr.getNodeLabelExpression());
+
+ // RR label and app label do not match. Nothing should happen.
+ rr.setNodeLabelExpression(dummyLabel);
+ SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+ null);
+ Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
+ SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+ appLabel);
+ Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
+
+ // RR label matches but app label does not. RR label should be set
+ // to app label
+ rr.setNodeLabelExpression(enforcedLabel1);
+ SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+ null);
+ Assert.assertNull(rr.getNodeLabelExpression());
+ rr.setNodeLabelExpression(enforcedLabel2);
+ SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+ appLabel);
+ Assert.assertEquals(appLabel, rr.getNodeLabelExpression());
+
+ // RR label unset and app label matches. RR label should be set
+ // to app label
+ rr.setNodeLabelExpression(null);
+ SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+ enforcedLabel1);
+ Assert.assertEquals(enforcedLabel1, rr.getNodeLabelExpression());
+
+ // RR label does not match and app label matches. RR label should be set
+ // to app label
+ rr.setNodeLabelExpression(dummyLabel);
+ SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+ enforcedLabel2);
+ Assert.assertEquals(enforcedLabel2, rr.getNodeLabelExpression());
+
+ // RR label and app label matches. Nothing should happen.
+ rr.setNodeLabelExpression(enforcedLabel1);
+ SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+ enforcedLabel1);
+ Assert.assertEquals(enforcedLabel1, rr.getNodeLabelExpression());
+
+ // RR label and app label don't match, but they're both enforced labels.
+ // RR label should be set to app label.
+ rr.setNodeLabelExpression(enforcedLabel2);
+ SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+ enforcedLabel1);
+ Assert.assertEquals(enforcedLabel1, rr.getNodeLabelExpression());
+ }
+
public static SchedulerApplication
verifyAppAddedAndRemovedFromScheduler(
Map> applications,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 0dd79c893a9..59228037012 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -124,6 +124,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.IteratorSelector;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -1229,8 +1230,9 @@ public void testAllocateReorder() throws Exception {
//This happens because app2 has no demand/a magnitude of NaN, which
//results in app1 and app2 being equal in the fairness comparison and
//failling back to fifo (start) ordering
- assertEquals(q.getOrderingPolicy().getAssignmentIterator().next().getId(),
- appId1.toString());
+ assertEquals(q.getOrderingPolicy().getAssignmentIterator(
+ IteratorSelector.EMPTY_ITERATOR_SELECTOR).next().getId(),
+ appId1.toString());
//Now, allocate for app2 (this would be the first/AM allocation)
ResourceRequest r2 = TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, priority, recordFactory);
@@ -1243,8 +1245,9 @@ public void testAllocateReorder() throws Exception {
//verify re-ordering based on the allocation alone
//Now, the first app for assignment is app2
- assertEquals(q.getOrderingPolicy().getAssignmentIterator().next().getId(),
- appId2.toString());
+ assertEquals(q.getOrderingPolicy().getAssignmentIterator(
+ IteratorSelector.EMPTY_ITERATOR_SELECTOR).next().getId(),
+ appId2.toString());
rm.stop();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 3b76fb27967..b2283fb0b46 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -77,6 +77,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyWithExclusivePartitions;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
@@ -122,6 +123,8 @@ public class TestLeafQueue {
final static int GB = 1024;
final static String DEFAULT_RACK = "/default";
+ private final static String LABEL = "test";
+
private final ResourceCalculator resourceCalculator =
new DefaultResourceCalculator();
@@ -130,14 +133,19 @@ public class TestLeafQueue {
@Before
public void setUp() throws Exception {
- setUpInternal(resourceCalculator);
+ setUpInternal(resourceCalculator, false);
}
private void setUpWithDominantResourceCalculator() throws Exception {
- setUpInternal(dominantResourceCalculator);
+ setUpInternal(dominantResourceCalculator, false);
}
- private void setUpInternal(ResourceCalculator rC) throws Exception {
+ private void setUpWithNodeLabels() throws Exception {
+ setUpInternal(resourceCalculator, true);
+ }
+
+ private void setUpInternal(ResourceCalculator rC, boolean withNodeLabels)
+ throws Exception {
CapacityScheduler spyCs = new CapacityScheduler();
queues = new HashMap();
cs = spy(spyCs);
@@ -162,7 +170,7 @@ private void setUpInternal(ResourceCalculator rC) throws Exception {
csConf.setBoolean(CapacitySchedulerConfiguration.RESERVE_CONT_LOOK_ALL_NODES,
false);
final String newRoot = "root" + System.currentTimeMillis();
- setupQueueConfiguration(csConf, newRoot);
+ setupQueueConfiguration(csConf, newRoot, withNodeLabels);
YarnConfiguration conf = new YarnConfiguration();
cs.setConf(conf);
@@ -216,24 +224,39 @@ private void setUpInternal(ResourceCalculator rC) throws Exception {
private static final String E = "e";
private void setupQueueConfiguration(
CapacitySchedulerConfiguration conf,
- final String newRoot) {
+ final String newRoot, boolean withNodeLabels) {
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {newRoot});
conf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT, 100);
conf.setAcl(CapacitySchedulerConfiguration.ROOT,
QueueACL.SUBMIT_APPLICATIONS, " ");
+ if (withNodeLabels) {
+ conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, LABEL, 100);
+ conf.setMaximumCapacityByLabel(CapacitySchedulerConfiguration.ROOT,
+ LABEL, 100);
+ }
final String Q_newRoot = CapacitySchedulerConfiguration.ROOT + "." + newRoot;
conf.setQueues(Q_newRoot, new String[] {A, B, C, D, E});
conf.setCapacity(Q_newRoot, 100);
conf.setMaximumCapacity(Q_newRoot, 100);
conf.setAcl(Q_newRoot, QueueACL.SUBMIT_APPLICATIONS, " ");
+ if (withNodeLabels) {
+ conf.setAccessibleNodeLabels(Q_newRoot, Collections.singleton(LABEL));
+ conf.setCapacityByLabel(Q_newRoot, LABEL, 100);
+ conf.setMaximumCapacityByLabel(Q_newRoot, LABEL, 100);
+ }
final String Q_A = Q_newRoot + "." + A;
conf.setCapacity(Q_A, 8.5f);
conf.setMaximumCapacity(Q_A, 20);
conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*");
+ if (withNodeLabels) {
+ conf.setAccessibleNodeLabels(Q_A, Collections.singleton(LABEL));
+ conf.setCapacityByLabel(Q_A, LABEL, 100);
+ conf.setMaximumCapacityByLabel(Q_A, LABEL, 100);
+ }
final String Q_B = Q_newRoot + "." + B;
conf.setCapacity(Q_B, 80);
@@ -3228,6 +3251,116 @@ public void testFifoAssignment() throws Exception {
Assert.assertEquals(3 * GB, app_0.getCurrentConsumption().getMemorySize());
}
+ @Test
+ public void testFifoWithPartitionsAssignment() throws Exception {
+ setUpWithNodeLabels();
+
+ LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
+ OrderingPolicy policy =
+ new FifoOrderingPolicyWithExclusivePartitions<>();
+ policy.configure(Collections.singletonMap(
+ YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX, LABEL));
+ a.setOrderingPolicy(policy);
+ String host00 = "127.0.0.1";
+ String rack0 = "rack_0";
+ FiCaSchedulerNode node00 = TestUtils.getMockNode(host00, rack0, 0,
+ 16 * GB);
+ when(node00.getPartition()).thenReturn(LABEL);
+ String host01 = "127.0.0.2";
+ FiCaSchedulerNode node01 = TestUtils.getMockNode(host01, rack0, 0,
+ 16 * GB);
+ when(node01.getPartition()).thenReturn("");
+
+ final int numNodes = 4;
+ Resource clusterResource = Resources.createResource(numNodes * (16 * GB),
+ numNodes * 16);
+ when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+ String user0 = "user_0";
+
+ final ApplicationAttemptId appAttemptId0 =
+ TestUtils.getMockApplicationAttemptId(0, 0);
+ FiCaSchedulerApp app0 = spy(new FiCaSchedulerApp(appAttemptId0, user0, a,
+ mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(5),
+ false));
+ a.submitApplicationAttempt(app0, user0);
+
+ final ApplicationAttemptId appAttemptId1 =
+ TestUtils.getMockApplicationAttemptId(1, 0);
+ FiCaSchedulerApp app1 = spy(new FiCaSchedulerApp(appAttemptId1, user0, a,
+ mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(3),
+ false));
+ when(app1.getPartition()).thenReturn(LABEL);
+ a.submitApplicationAttempt(app1, user0);
+
+ Map apps = ImmutableMap.of(
+ app0.getApplicationAttemptId(), app0, app1.getApplicationAttemptId(),
+ app1);
+ Map nodes = ImmutableMap.of(node00.getNodeID(),
+ node00, node01.getNodeID(), node01);
+
+ Priority priority = TestUtils.createMockPriority(1);
+ List app0Requests = new ArrayList<>();
+ List app1Requests = new ArrayList<>();
+
+ app0Requests.clear();
+ app0Requests.add(TestUtils
+ .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true, priority,
+ recordFactory));
+ app0.updateResourceRequests(app0Requests);
+
+ app1Requests.clear();
+ app1Requests.add(TestUtils
+ .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
+ recordFactory, LABEL));
+ app1.updateResourceRequests(app1Requests);
+
+ // app_1 will get containers since it is exclusive-enforced
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node00,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+ Assert.assertEquals(1 * GB, app1.getSchedulingResourceUsage()
+ .getUsed(LABEL).getMemorySize());
+ // app_0 should not get resources from node_0_0 since the labels
+ // don't match
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node00,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+ Assert.assertEquals(0 * GB, app0.getCurrentConsumption().getMemorySize());
+
+ app1Requests.clear();
+ app1Requests.add(TestUtils
+ .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
+ recordFactory, LABEL));
+ app1.updateResourceRequests(app1Requests);
+
+ // When node_0_1 heartbeats, app_0 should get containers
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node01,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+ Assert.assertEquals(2 * GB, app0.getCurrentConsumption().getMemorySize());
+ Assert.assertEquals(1 * GB, app1.getSchedulingResourceUsage()
+ .getUsed(LABEL).getMemorySize());
+
+ app0Requests.clear();
+ app0Requests.add(TestUtils
+ .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
+ recordFactory));
+ app0.updateResourceRequests(app0Requests);
+
+ // When node_0_0 heartbeats, app_1 should get containers again
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node00,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+ Assert.assertEquals(2 * GB, app0.getCurrentConsumption().getMemorySize());
+ Assert.assertEquals(2 * GB, app1.getSchedulingResourceUsage()
+ .getUsed(LABEL).getMemorySize());
+ }
+
@Test
public void testConcurrentAccess() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java
index 4f251bf4e38..62f7a4956b9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java
@@ -18,21 +18,19 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
-import java.util.*;
-
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
public class MockSchedulableEntity implements SchedulableEntity {
-
+
private String id;
private long serial = 0;
private Priority priority;
private boolean isRecovering;
+ private String partition = "";
public MockSchedulableEntity() { }
@@ -101,4 +99,13 @@ public boolean isRecovering() {
protected void setRecovering(boolean entityRecovering) {
this.isRecovering = entityRecovering;
}
+
+ @Override
+ public String getPartition() {
+ return partition;
+ }
+
+ public void setPartition(String partition) {
+ this.partition = partition;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java
index 683173af709..e023e011e5b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java
@@ -126,19 +126,25 @@ public void testIterators() {
//Assignment, least to greatest consumption
- checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "2", "1"});
+ checkIds(schedOrder.getAssignmentIterator(
+ IteratorSelector.EMPTY_ITERATOR_SELECTOR),
+ new String[]{"3", "2", "1"});
//Preemption, greatest to least
checkIds(schedOrder.getPreemptionIterator(), new String[]{"1", "2", "3"});
//Change value without inform, should see no change
msp2.setUsed(Resources.createResource(6));
- checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "2", "1"});
+ checkIds(schedOrder.getAssignmentIterator(
+ IteratorSelector.EMPTY_ITERATOR_SELECTOR),
+ new String[]{"3", "2", "1"});
checkIds(schedOrder.getPreemptionIterator(), new String[]{"1", "2", "3"});
//Do inform, will reorder
schedOrder.containerAllocated(msp2, null);
- checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "1", "2"});
+ checkIds(schedOrder.getAssignmentIterator(
+ IteratorSelector.EMPTY_ITERATOR_SELECTOR),
+ new String[]{"3", "1", "2"});
checkIds(schedOrder.getPreemptionIterator(), new String[]{"2", "1", "3"});
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicy.java
index 447618f4f24..d94d0775589 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicy.java
@@ -61,7 +61,7 @@ public void testIterators() {
schedOrder.addSchedulableEntity(msp3);
//Assignment, oldest to youngest
- checkSerials(schedOrder.getAssignmentIterator(), new long[]{1, 2, 3});
+ checkSerials(schedOrder.getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR), new long[]{1, 2, 3});
//Preemption, youngest to oldest
checkSerials(schedOrder.getPreemptionIterator(), new long[]{3, 2, 1});
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyForPendingApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyForPendingApps.java
index befa8e6c321..3996b0dbbb2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyForPendingApps.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyForPendingApps.java
@@ -72,8 +72,9 @@ public void testIterators() {
schedOrder.addSchedulableEntity(msp7);
// Assignment with serial id's are 3,2,4,1,6,5,7
- checkSerials(schedOrder.getAssignmentIterator(), new long[] { 3, 2, 4, 1,
- 6, 5, 7 });
+ checkSerials(schedOrder.getAssignmentIterator(
+ IteratorSelector.EMPTY_ITERATOR_SELECTOR), new long[] {3, 2, 4, 1,
+ 6, 5, 7});
//Preemption, youngest to oldest
checkSerials(schedOrder.getPreemptionIterator(), new long[] { 7, 5, 6, 1,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyWithExclusivePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyWithExclusivePartitions.java
new file mode 100644
index 00000000000..499a70a053c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyWithExclusivePartitions.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Tests {@link FifoOrderingPolicyWithExclusivePartitions} ordering policy.
+ */
+public class TestFifoOrderingPolicyWithExclusivePartitions {
+
+ private static final String PARTITION = "test";
+ private static final String PARTITION2 = "test2";
+
+ @Test
+ public void testNoConfiguredExclusiveEnforcedPartitions() {
+ FifoOrderingPolicyWithExclusivePartitions policy =
+ new FifoOrderingPolicyWithExclusivePartitions<>();
+ policy.configure(Collections.EMPTY_MAP);
+
+ MockSchedulableEntity p1 = new MockSchedulableEntity(4, 0, false);
+ p1.setPartition(PARTITION);
+ p1.setId("p1");
+ MockSchedulableEntity p2 = new MockSchedulableEntity(3, 1, false);
+ p2.setPartition(PARTITION);
+ p2.setId("p2");
+
+ MockSchedulableEntity r1 = new MockSchedulableEntity(2, 0, false);
+ r1.setId("r1");
+ MockSchedulableEntity r2 = new MockSchedulableEntity(1, 0, false);
+ r2.setId("r2");
+
+ policy.addSchedulableEntity(p1);
+ policy.addAllSchedulableEntities(Arrays.asList(p2, r1, r2));
+ Assert.assertEquals(4, policy.getNumSchedulableEntities());
+ Assert.assertEquals(4, policy.getSchedulableEntities().size());
+ IteratorSelector sel = new IteratorSelector();
+ // Should behave like FifoOrderingPolicy, regardless of partition
+ verifyAssignmentIteratorOrder(policy,
+ IteratorSelector.EMPTY_ITERATOR_SELECTOR, "p2", "r2", "r1", "p1");
+ verifyPreemptionIteratorOrder(policy, "p1", "r1", "r2", "p2");
+ sel.setPartition(PARTITION);
+ verifyAssignmentIteratorOrder(policy, sel, "p2", "r2", "r1", "p1");
+ verifyPreemptionIteratorOrder(policy, "p1", "r1", "r2", "p2");
+
+ policy.removeSchedulableEntity(p2);
+ policy.removeSchedulableEntity(r2);
+ Assert.assertEquals(2, policy.getNumSchedulableEntities());
+ Assert.assertEquals(2, policy.getSchedulableEntities().size());
+ verifyAssignmentIteratorOrder(policy,
+ IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r1", "p1");
+ verifyPreemptionIteratorOrder(policy, "p1", "r1");
+ sel.setPartition(PARTITION);
+ verifyAssignmentIteratorOrder(policy, sel, "r1", "p1");
+ verifyPreemptionIteratorOrder(policy, "p1", "r1");
+ }
+
+ @Test
+ public void testSingleExclusiveEnforcedPartition() {
+ FifoOrderingPolicyWithExclusivePartitions policy =
+ new FifoOrderingPolicyWithExclusivePartitions<>();
+ policy.configure(Collections.singletonMap(
+ YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX, PARTITION));
+
+ // PARTITION iterator should return p2, p1, p3
+ MockSchedulableEntity p1 = new MockSchedulableEntity(1, 0, false);
+ p1.setPartition(PARTITION);
+ p1.setId("p1");
+ MockSchedulableEntity p2 = new MockSchedulableEntity(5, 1, false);
+ p2.setPartition(PARTITION);
+ p2.setId("p2");
+ MockSchedulableEntity p3 = new MockSchedulableEntity(3, 0, false);
+ p3.setPartition(PARTITION);
+ p3.setId("p3");
+
+ // non-PARTITION iterator should return r3, r2, r1
+ MockSchedulableEntity r1 = new MockSchedulableEntity(6, 0, false);
+ r1.setId("r1");
+ MockSchedulableEntity r2 = new MockSchedulableEntity(4, 0, false);
+ r2.setId("r2");
+ MockSchedulableEntity r3 = new MockSchedulableEntity(2, 1, false);
+ r3.setId("r3");
+
+ policy.addSchedulableEntity(r1);
+ Assert.assertEquals(1, policy.getNumSchedulableEntities());
+ Assert.assertEquals("r1", policy.getSchedulableEntities()
+ .iterator().next().getId());
+ verifyAssignmentIteratorOrder(policy,
+ IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r1");
+ verifyPreemptionIteratorOrder(policy, "r1");
+
+ List entities = Arrays.asList(r2, r3, p1, p2);
+ policy.addAllSchedulableEntities(entities);
+ policy.addSchedulableEntity(p3);
+ Assert.assertEquals(6, policy.getNumSchedulableEntities());
+ Assert.assertEquals(6, policy.getSchedulableEntities().size());
+ // Assignment iterator should return non-PARTITION entities,
+ // in order based on FifoOrderingPolicy
+ verifyAssignmentIteratorOrder(policy,
+ IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r3", "r2", "r1");
+ // Preemption iterator should return all entities, in global order
+ verifyPreemptionIteratorOrder(policy, "r1", "r2", "p3", "p1", "p2", "r3");
+ // Same thing as above, but with a non-empty partition
+ IteratorSelector sel = new IteratorSelector();
+ sel.setPartition("dummy");
+ verifyAssignmentIteratorOrder(policy, sel, "r3", "r2", "r1");
+ verifyPreemptionIteratorOrder(policy, "r1", "r2", "p3", "p1", "p2", "r3");
+ // Should return PARTITION entities, in order based on FifoOrderingPolicy
+ sel.setPartition(PARTITION);
+ verifyAssignmentIteratorOrder(policy, sel, "p2", "p1", "p3");
+ verifyPreemptionIteratorOrder(policy, "r1", "r2", "p3", "p1", "p2", "r3");
+
+ policy.removeSchedulableEntity(p2);
+ policy.removeSchedulableEntity(r2);
+ Assert.assertEquals(4, policy.getNumSchedulableEntities());
+ Assert.assertEquals(4, policy.getSchedulableEntities().size());
+ verifyAssignmentIteratorOrder(policy,
+ IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r3", "r1");
+ verifyPreemptionIteratorOrder(policy, "r1", "p3", "p1", "r3");
+ sel.setPartition(PARTITION);
+ verifyAssignmentIteratorOrder(policy, sel, "p1", "p3");
+ verifyPreemptionIteratorOrder(policy, "r1", "p3", "p1", "r3");
+
+ policy.removeSchedulableEntity(p1);
+ policy.removeSchedulableEntity(p3);
+ Assert.assertEquals(2, policy.getNumSchedulableEntities());
+ Assert.assertEquals(2, policy.getSchedulableEntities().size());
+ verifyAssignmentIteratorOrder(policy,
+ IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r3", "r1");
+ verifyPreemptionIteratorOrder(policy, "r1", "r3");
+ sel.setPartition(PARTITION);
+ verifyAssignmentIteratorOrder(policy, sel);
+ verifyPreemptionIteratorOrder(policy, "r1", "r3");
+ }
+
+ @Test
+ public void testMultipleExclusiveEnforcedPartitions() {
+ FifoOrderingPolicyWithExclusivePartitions policy =
+ new FifoOrderingPolicyWithExclusivePartitions<>();
+ policy.configure(Collections.singletonMap(
+ YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX,
+ PARTITION + "," + PARTITION2));
+
+ // PARTITION iterator should return p2, p1
+ MockSchedulableEntity p1 = new MockSchedulableEntity(1, 0, false);
+ p1.setPartition(PARTITION);
+ p1.setId("p1");
+ MockSchedulableEntity p2 = new MockSchedulableEntity(5, 1, false);
+ p2.setPartition(PARTITION);
+ p2.setId("p2");
+
+ // PARTITION2 iterator should return r1, r2
+ MockSchedulableEntity r1 = new MockSchedulableEntity(3, 0, false);
+ r1.setPartition(PARTITION2);
+ r1.setId("r1");
+ MockSchedulableEntity r2 = new MockSchedulableEntity(4, 0, false);
+ r2.setPartition(PARTITION2);
+ r2.setId("r2");
+
+ // default iterator should return s2, s1
+ MockSchedulableEntity s1 = new MockSchedulableEntity(6, 0, false);
+ s1.setId("s1");
+ MockSchedulableEntity s2 = new MockSchedulableEntity(2, 0, false);
+ s2.setId("s2");
+
+ policy.addAllSchedulableEntities(Arrays.asList(s1, s2, r1));
+ Assert.assertEquals(3, policy.getNumSchedulableEntities());
+ Assert.assertEquals(3, policy.getSchedulableEntities().size());
+ IteratorSelector sel = new IteratorSelector();
+ // assignment iterator returns only default (non-partitioned) entities
+ verifyAssignmentIteratorOrder(policy,
+ IteratorSelector.EMPTY_ITERATOR_SELECTOR, "s2", "s1");
+ verifyPreemptionIteratorOrder(policy, "s1", "r1", "s2");
+ sel.setPartition(PARTITION2);
+ verifyAssignmentIteratorOrder(policy, sel, "r1");
+
+ policy.addAllSchedulableEntities(Arrays.asList(r2, p1, p2));
+ Assert.assertEquals(6, policy.getNumSchedulableEntities());
+ Assert.assertEquals(6, policy.getSchedulableEntities().size());
+ verifyAssignmentIteratorOrder(policy,
+ IteratorSelector.EMPTY_ITERATOR_SELECTOR, "s2", "s1");
+ sel.setPartition(PARTITION);
+ verifyAssignmentIteratorOrder(policy, sel, "p2", "p1");
+ sel.setPartition(PARTITION2);
+ verifyAssignmentIteratorOrder(policy, sel, "r1", "r2");
+ verifyPreemptionIteratorOrder(policy, "s1", "r2", "r1", "s2", "p1", "p2");
+
+ policy.removeSchedulableEntity(p2);
+ policy.removeSchedulableEntity(r1);
+ policy.removeSchedulableEntity(r2);
+ Assert.assertEquals(3, policy.getNumSchedulableEntities());
+ Assert.assertEquals(3, policy.getSchedulableEntities().size());
+ verifyAssignmentIteratorOrder(policy,
+ IteratorSelector.EMPTY_ITERATOR_SELECTOR, "s2", "s1");
+ sel.setPartition(PARTITION);
+ verifyAssignmentIteratorOrder(policy, sel, "p1");
+ sel.setPartition(PARTITION2);
+ verifyAssignmentIteratorOrder(policy, sel);
+ verifyPreemptionIteratorOrder(policy, "s1", "s2", "p1");
+ }
+
+ private void verifyAssignmentIteratorOrder(
+ FifoOrderingPolicyWithExclusivePartitions policy,
+ IteratorSelector sel, String... ids) {
+ verifyIteratorOrder(policy.getAssignmentIterator(sel), ids);
+ }
+
+ private void verifyPreemptionIteratorOrder(
+ FifoOrderingPolicyWithExclusivePartitions policy,
+ String... ids) {
+ verifyIteratorOrder(policy.getPreemptionIterator(), ids);
+ }
+
+ private void verifyIteratorOrder(Iterator itr,
+ String... ids) {
+ for (String id : ids) {
+ Assert.assertEquals(id, itr.next().getId());
+ }
+ Assert.assertFalse(itr.hasNext());
+ }
+}