YARN-9730. Support forcing configured partitions to be exclusive based on app node label

(cherry picked from commit 73a044a63822303f792183244e25432528ecfb1e)
(cherry picked from commit dd094d79023f6598e47146166aa8c213e03d41b7)
(cherry picked from commit 10bdcb6f1da3b86146efa479c0bbc8d1da505789)
This commit is contained in:
Jonathan Hung 2019-09-24 12:13:29 -07:00
parent 90fbfbbe71
commit eedbf9d195
23 changed files with 849 additions and 26 deletions

View File

@ -3207,6 +3207,12 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_NODELABEL_CONFIGURATION_TYPE =
CENTRALIZED_NODELABEL_CONFIGURATION_TYPE;
public static final String EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX
= "exclusive-enforced-partitions";
public static final String EXCLUSIVE_ENFORCED_PARTITIONS = NODE_LABELS_PREFIX
+ EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX;
public static final String MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY =
YARN_PREFIX + "cluster.max-application-priority";

View File

@ -3557,4 +3557,13 @@
<value>60000</value>
</property>
<property>
<description>
Comma-separated list of partitions. If a label P is in this list,
then the RM will enforce that an app has resource requests with label
P iff that app's node label expression is P.
</description>
<name>yarn.node-labels.exclusive-enforced-partitions</name>
<value></value>
</property>
</configuration>

View File

@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -206,6 +207,11 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
&& ResourceRequest.ANY.equals(req.getResourceName())) {
req.setNodeLabelExpression(asc.getNodeLabelExpression());
}
if (ResourceRequest.ANY.equals(req.getResourceName())) {
SchedulerUtils.enforcePartitionExclusivity(req,
getRmContext().getExclusiveEnforcedPartitions(),
asc.getNodeLabelExpression());
}
}
Resource maximumCapacity =

View File

@ -21,6 +21,7 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -90,6 +91,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
private final ApplicationACLsManager applicationACLsManager;
private Configuration conf;
private YarnAuthorizationProvider authorizer;
private Set<String> exclusiveEnforcedPartitions;
public RMAppManager(RMContext context,
YarnScheduler scheduler, ApplicationMasterService masterService,
@ -110,6 +112,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
this.maxCompletedAppsInStateStore = this.maxCompletedAppsInMemory;
}
this.authorizer = YarnAuthorizationProvider.getInstance(conf);
this.exclusiveEnforcedPartitions = context.getExclusiveEnforcedPartitions();
}
/**
@ -490,6 +493,9 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
throw new InvalidResourceRequestException("Invalid resource request, "
+ "no resource request specified with " + ResourceRequest.ANY);
}
SchedulerUtils.enforcePartitionExclusivity(anyReq,
exclusiveEnforcedPartitions,
submissionContext.getNodeLabelExpression());
// Make sure that all of the requests agree with the ANY request
// and have correct values

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
@ -161,4 +162,7 @@ public interface RMContext extends ApplicationMasterServiceContext {
ResourceManager getResourceManager();
String getAppProxyUrl(Configuration conf, ApplicationId applicationId);
Set<String> getExclusiveEnforcedPartitions();
}

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
@ -33,6 +35,7 @@ import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
@ -582,4 +585,16 @@ public class RMContextImpl implements RMContext {
return UNAVAILABLE;
}
}
public Set<String> getExclusiveEnforcedPartitions() {
String[] configuredPartitions = getYarnConfiguration().getStrings(
YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS);
Set<String> exclusiveEnforcedPartitions = new HashSet<>();
if (configuredPartitions != null) {
for (String partition : configuredPartitions) {
exclusiveEnforcedPartitions.add(partition);
}
}
return exclusiveEnforcedPartitions;
}
}

View File

@ -199,6 +199,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
private AtomicLong unconfirmedAllocatedMem = new AtomicLong();
private AtomicInteger unconfirmedAllocatedVcores = new AtomicInteger();
private String nodeLabelExpression;
public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, AbstractUsersManager abstractUsersManager,
RMContext rmContext) {
@ -223,6 +225,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
unmanagedAM = appSubmissionContext.getUnmanagedAM();
this.logAggregationContext =
appSubmissionContext.getLogAggregationContext();
this.nodeLabelExpression =
appSubmissionContext.getNodeLabelExpression();
}
}
@ -1388,4 +1392,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
return diagnosticMessage;
}
}
@Override
public String getPartition() {
return nodeLabelExpression == null ? "" : nodeLabelExpression;
}
}

View File

@ -257,6 +257,29 @@ public class SchedulerUtils {
false, rmContext, queueInfo);
}
/**
* If RM should enforce partition exclusivity for enforced partition "x":
* 1) If request is "x" and app label is not "x",
* override request to app's label.
* 2) If app label is "x", ensure request is "x".
* @param resReq resource request
* @param enforcedPartitions list of exclusive enforced partitions
* @param appLabel app's node label expression
*/
public static void enforcePartitionExclusivity(ResourceRequest resReq,
Set<String> enforcedPartitions, String appLabel) {
if (enforcedPartitions == null || enforcedPartitions.isEmpty()) {
return;
}
if (!enforcedPartitions.contains(appLabel)
&& enforcedPartitions.contains(resReq.getNodeLabelExpression())) {
resReq.setNodeLabelExpression(appLabel);
}
if (enforcedPartitions.contains(appLabel)) {
resReq.setNodeLabelExpression(appLabel);
}
}
/**
* Utility method to validate a resource request, by insuring that the
* requested memory/vcore is non-negative and not greater than max

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.P
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyWithExclusivePartitions;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@ -152,6 +153,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public static final String FAIR_APP_ORDERING_POLICY = "fair";
public static final String FIFO_WITH_PARTITIONS_APP_ORDERING_POLICY
= "fifo-with-partitions";
public static final String DEFAULT_APP_ORDERING_POLICY =
FIFO_APP_ORDERING_POLICY;
@ -469,6 +473,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
if (policyType.trim().equals(FAIR_APP_ORDERING_POLICY)) {
policyType = FairOrderingPolicy.class.getName();
}
if (policyType.trim().equals(FIFO_WITH_PARTITIONS_APP_ORDERING_POLICY)) {
policyType = FifoOrderingPolicyWithExclusivePartitions.class.getName();
}
try {
orderingPolicy = (OrderingPolicy<S>)
Class.forName(policyType).newInstance();

View File

@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.IteratorSelector;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
@ -787,7 +788,8 @@ public class LeafQueue extends AbstractCSQueue {
}
for (Iterator<FiCaSchedulerApp> fsApp =
getPendingAppsOrderingPolicy().getAssignmentIterator();
getPendingAppsOrderingPolicy()
.getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR);
fsApp.hasNext(); ) {
FiCaSchedulerApp application = fsApp.next();
ApplicationId applicationId = application.getApplicationId();
@ -1075,8 +1077,10 @@ public class LeafQueue extends AbstractCSQueue {
Map<String, CachedUserLimit> userLimits = new HashMap<>();
boolean needAssignToQueueCheck = true;
IteratorSelector sel = new IteratorSelector();
sel.setPartition(ps.getPartition());
for (Iterator<FiCaSchedulerApp> assignmentIterator =
orderingPolicy.getAssignmentIterator();
orderingPolicy.getAssignmentIterator(sel);
assignmentIterator.hasNext(); ) {
FiCaSchedulerApp application = assignmentIterator.next();

View File

@ -46,13 +46,13 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti
public Collection<S> getSchedulableEntities() {
return schedulableEntities;
}
@Override
public Iterator<S> getAssignmentIterator() {
public Iterator<S> getAssignmentIterator(IteratorSelector sel) {
reorderScheduleEntities();
return schedulableEntities.iterator();
}
@Override
public Iterator<S> getPreemptionIterator() {
reorderScheduleEntities();
@ -137,5 +137,5 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti
@Override
public abstract String getInfo();
}

View File

@ -0,0 +1,144 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
/**
* Similar to {@link FifoOrderingPolicy}, but with separate ordering policies
* for each partition in
* {@code yarn.scheduler.capacity.<queue-path>.ordering-policy.exclusive-enforced-partitions}.
*/
public class FifoOrderingPolicyWithExclusivePartitions<S extends SchedulableEntity>
implements OrderingPolicy<S> {
private static final String DEFAULT_PARTITION = "DEFAULT_PARTITION";
private Map<String, OrderingPolicy<S>> orderingPolicies;
public FifoOrderingPolicyWithExclusivePartitions() {
this.orderingPolicies = new HashMap<>();
this.orderingPolicies.put(DEFAULT_PARTITION, new FifoOrderingPolicy());
}
public Collection<S> getSchedulableEntities() {
return unionOrderingPolicies().getSchedulableEntities();
}
public Iterator<S> getAssignmentIterator(IteratorSelector sel) {
// Return schedulable entities only from filtered partition
return getPartitionOrderingPolicy(sel.getPartition())
.getAssignmentIterator(sel);
}
public Iterator<S> getPreemptionIterator() {
// Entities from all partitions should be preemptible
return unionOrderingPolicies().getPreemptionIterator();
}
/**
* Union all schedulable entities from all ordering policies.
* @return ordering policy containing all schedulable entities
*/
private OrderingPolicy<S> unionOrderingPolicies() {
OrderingPolicy<S> ret = new FifoOrderingPolicy<>();
for (Map.Entry<String, OrderingPolicy<S>> entry
: orderingPolicies.entrySet()) {
ret.addAllSchedulableEntities(entry.getValue().getSchedulableEntities());
}
return ret;
}
public void addSchedulableEntity(S s) {
getPartitionOrderingPolicy(s.getPartition()).addSchedulableEntity(s);
}
public boolean removeSchedulableEntity(S s) {
return getPartitionOrderingPolicy(s.getPartition())
.removeSchedulableEntity(s);
}
public void addAllSchedulableEntities(Collection<S> sc) {
for (S entity : sc) {
getPartitionOrderingPolicy(entity.getPartition())
.addSchedulableEntity(entity);
}
}
public int getNumSchedulableEntities() {
// Return total number of schedulable entities, to maintain parity with
// existing FifoOrderingPolicy e.g. when determining if queue has reached
// its max app limit
int ret = 0;
for (Map.Entry<String, OrderingPolicy<S>> entry
: orderingPolicies.entrySet()) {
ret += entry.getValue().getNumSchedulableEntities();
}
return ret;
}
public void containerAllocated(S schedulableEntity, RMContainer r) {
getPartitionOrderingPolicy(schedulableEntity.getPartition())
.containerAllocated(schedulableEntity, r);
}
public void containerReleased(S schedulableEntity, RMContainer r) {
getPartitionOrderingPolicy(schedulableEntity.getPartition())
.containerReleased(schedulableEntity, r);
}
public void demandUpdated(S schedulableEntity) {
getPartitionOrderingPolicy(schedulableEntity.getPartition())
.demandUpdated(schedulableEntity);
}
@Override
public void configure(Map<String, String> conf) {
if (conf == null) {
return;
}
String partitions =
conf.get(YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX);
if (partitions != null) {
for (String partition : partitions.split(",")) {
partition = partition.trim();
if (!partition.isEmpty()) {
this.orderingPolicies.put(partition, new FifoOrderingPolicy());
}
}
}
}
@Override
public String getInfo() {
return "FifoOrderingPolicyWithExclusivePartitions";
}
private OrderingPolicy<S> getPartitionOrderingPolicy(String partition) {
String keyPartition = orderingPolicies.containsKey(partition) ?
partition : DEFAULT_PARTITION;
return orderingPolicies.get(keyPartition);
}
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
/**
* IteratorSelector contains information needed to tell an
* {@link OrderingPolicy} what to return in an iterator.
*/
public class IteratorSelector {
public static final IteratorSelector EMPTY_ITERATOR_SELECTOR =
new IteratorSelector();
private String partition;
/**
* The partition for this iterator selector.
* @return partition
*/
public String getPartition() {
return this.partition;
}
/**
* Set partition for this iterator selector.
* @param p partition
*/
public void setPartition(String p) {
this.partition = p;
}
}

View File

@ -45,10 +45,11 @@ public interface OrderingPolicy<S extends SchedulableEntity> {
/**
* Return an iterator over the collection of {@link SchedulableEntity}
* objects which orders them for container assignment.
* @param sel the {@link IteratorSelector} to filter with
* @return an iterator over the collection of {@link SchedulableEntity}
* objects
*/
public Iterator<S> getAssignmentIterator();
Iterator<S> getAssignmentIterator(IteratorSelector sel);
/**
* Return an iterator over the collection of {@link SchedulableEntity}

View File

@ -55,4 +55,9 @@ public interface SchedulableEntity {
*/
public boolean isRecovering();
/**
* Get partition corresponding to this entity.
* @return partition
*/
String getPartition();
}

View File

@ -32,6 +32,7 @@ import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@ -798,6 +799,147 @@ public class TestSchedulerUtils {
System.err.println("Failed to wait scheduler application attempt stopped.");
}
@Test
public void testEnforcePartitionExclusivity() {
String enforcedExclusiveLabel = "x";
Set<String> enforcedExclusiveLabelSet =
Collections.singleton(enforcedExclusiveLabel);
String dummyLabel = "y";
String appLabel = "appLabel";
ResourceRequest rr = BuilderUtils.newResourceRequest(
mock(Priority.class), ResourceRequest.ANY, mock(Resource.class), 1);
// RR label unset and app label does not match. Nothing should happen.
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
null);
Assert.assertNull(rr.getNodeLabelExpression());
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
appLabel);
Assert.assertNull(rr.getNodeLabelExpression());
// RR label and app label do not match. Nothing should happen.
rr.setNodeLabelExpression(dummyLabel);
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
null);
Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
appLabel);
Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
// RR label matches but app label does not. RR label should be set
// to app label
rr.setNodeLabelExpression(enforcedExclusiveLabel);
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
null);
Assert.assertNull(rr.getNodeLabelExpression());
rr.setNodeLabelExpression(enforcedExclusiveLabel);
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
appLabel);
Assert.assertEquals(appLabel, rr.getNodeLabelExpression());
// RR label unset and app label matches. RR label should be set
// to app label
rr.setNodeLabelExpression(null);
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
enforcedExclusiveLabel);
Assert.assertEquals(enforcedExclusiveLabel, rr.getNodeLabelExpression());
// RR label does not match and app label matches. RR label should be set
// to app label
rr.setNodeLabelExpression(dummyLabel);
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
enforcedExclusiveLabel);
Assert.assertEquals(enforcedExclusiveLabel, rr.getNodeLabelExpression());
// RR label and app label matches. Nothing should happen.
rr.setNodeLabelExpression(enforcedExclusiveLabel);
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
enforcedExclusiveLabel);
Assert.assertEquals(enforcedExclusiveLabel, rr.getNodeLabelExpression());
// Unconfigured label: nothing should happen.
rr.setNodeLabelExpression(null);
SchedulerUtils.enforcePartitionExclusivity(rr, null,
appLabel);
Assert.assertNull(rr.getNodeLabelExpression());
rr.setNodeLabelExpression(dummyLabel);
SchedulerUtils.enforcePartitionExclusivity(rr, null,
appLabel);
Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
rr.setNodeLabelExpression(enforcedExclusiveLabel);
SchedulerUtils.enforcePartitionExclusivity(rr, null,
appLabel);
Assert.assertEquals(enforcedExclusiveLabel, rr.getNodeLabelExpression());
}
@Test
public void testEnforcePartitionExclusivityMultipleLabels() {
String enforcedLabel1 = "x";
String enforcedLabel2 = "y";
Set<String> enforcedExclusiveLabelSet = new HashSet<>();
enforcedExclusiveLabelSet.add(enforcedLabel1);
enforcedExclusiveLabelSet.add(enforcedLabel2);
String dummyLabel = "dummyLabel";
String appLabel = "appLabel";
ResourceRequest rr = BuilderUtils.newResourceRequest(
mock(Priority.class), ResourceRequest.ANY, mock(Resource.class), 1);
// RR label unset and app label does not match. Nothing should happen.
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
null);
Assert.assertNull(rr.getNodeLabelExpression());
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
appLabel);
Assert.assertNull(rr.getNodeLabelExpression());
// RR label and app label do not match. Nothing should happen.
rr.setNodeLabelExpression(dummyLabel);
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
null);
Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
appLabel);
Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
// RR label matches but app label does not. RR label should be set
// to app label
rr.setNodeLabelExpression(enforcedLabel1);
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
null);
Assert.assertNull(rr.getNodeLabelExpression());
rr.setNodeLabelExpression(enforcedLabel2);
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
appLabel);
Assert.assertEquals(appLabel, rr.getNodeLabelExpression());
// RR label unset and app label matches. RR label should be set
// to app label
rr.setNodeLabelExpression(null);
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
enforcedLabel1);
Assert.assertEquals(enforcedLabel1, rr.getNodeLabelExpression());
// RR label does not match and app label matches. RR label should be set
// to app label
rr.setNodeLabelExpression(dummyLabel);
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
enforcedLabel2);
Assert.assertEquals(enforcedLabel2, rr.getNodeLabelExpression());
// RR label and app label matches. Nothing should happen.
rr.setNodeLabelExpression(enforcedLabel1);
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
enforcedLabel1);
Assert.assertEquals(enforcedLabel1, rr.getNodeLabelExpression());
// RR label and app label don't match, but they're both enforced labels.
// RR label should be set to app label.
rr.setNodeLabelExpression(enforcedLabel2);
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
enforcedLabel1);
Assert.assertEquals(enforcedLabel1, rr.getNodeLabelExpression());
}
public static SchedulerApplication<SchedulerApplicationAttempt>
verifyAppAddedAndRemovedFromScheduler(
Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>> applications,

View File

@ -124,6 +124,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.IteratorSelector;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@ -1229,8 +1230,9 @@ public class TestCapacityScheduler {
//This happens because app2 has no demand/a magnitude of NaN, which
//results in app1 and app2 being equal in the fairness comparison and
//failling back to fifo (start) ordering
assertEquals(q.getOrderingPolicy().getAssignmentIterator().next().getId(),
appId1.toString());
assertEquals(q.getOrderingPolicy().getAssignmentIterator(
IteratorSelector.EMPTY_ITERATOR_SELECTOR).next().getId(),
appId1.toString());
//Now, allocate for app2 (this would be the first/AM allocation)
ResourceRequest r2 = TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, priority, recordFactory);
@ -1243,8 +1245,9 @@ public class TestCapacityScheduler {
//verify re-ordering based on the allocation alone
//Now, the first app for assignment is app2
assertEquals(q.getOrderingPolicy().getAssignmentIterator().next().getId(),
appId2.toString());
assertEquals(q.getOrderingPolicy().getAssignmentIterator(
IteratorSelector.EMPTY_ITERATOR_SELECTOR).next().getId(),
appId2.toString());
rm.stop();
}

View File

@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyWithExclusivePartitions;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
@ -122,6 +123,8 @@ public class TestLeafQueue {
final static int GB = 1024;
final static String DEFAULT_RACK = "/default";
private final static String LABEL = "test";
private final ResourceCalculator resourceCalculator =
new DefaultResourceCalculator();
@ -130,14 +133,19 @@ public class TestLeafQueue {
@Before
public void setUp() throws Exception {
setUpInternal(resourceCalculator);
setUpInternal(resourceCalculator, false);
}
private void setUpWithDominantResourceCalculator() throws Exception {
setUpInternal(dominantResourceCalculator);
setUpInternal(dominantResourceCalculator, false);
}
private void setUpInternal(ResourceCalculator rC) throws Exception {
private void setUpWithNodeLabels() throws Exception {
setUpInternal(resourceCalculator, true);
}
private void setUpInternal(ResourceCalculator rC, boolean withNodeLabels)
throws Exception {
CapacityScheduler spyCs = new CapacityScheduler();
queues = new HashMap<String, CSQueue>();
cs = spy(spyCs);
@ -162,7 +170,7 @@ public class TestLeafQueue {
csConf.setBoolean(CapacitySchedulerConfiguration.RESERVE_CONT_LOOK_ALL_NODES,
false);
final String newRoot = "root" + System.currentTimeMillis();
setupQueueConfiguration(csConf, newRoot);
setupQueueConfiguration(csConf, newRoot, withNodeLabels);
YarnConfiguration conf = new YarnConfiguration();
cs.setConf(conf);
@ -216,24 +224,39 @@ public class TestLeafQueue {
private static final String E = "e";
private void setupQueueConfiguration(
CapacitySchedulerConfiguration conf,
final String newRoot) {
final String newRoot, boolean withNodeLabels) {
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {newRoot});
conf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT, 100);
conf.setAcl(CapacitySchedulerConfiguration.ROOT,
QueueACL.SUBMIT_APPLICATIONS, " ");
if (withNodeLabels) {
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, LABEL, 100);
conf.setMaximumCapacityByLabel(CapacitySchedulerConfiguration.ROOT,
LABEL, 100);
}
final String Q_newRoot = CapacitySchedulerConfiguration.ROOT + "." + newRoot;
conf.setQueues(Q_newRoot, new String[] {A, B, C, D, E});
conf.setCapacity(Q_newRoot, 100);
conf.setMaximumCapacity(Q_newRoot, 100);
conf.setAcl(Q_newRoot, QueueACL.SUBMIT_APPLICATIONS, " ");
if (withNodeLabels) {
conf.setAccessibleNodeLabels(Q_newRoot, Collections.singleton(LABEL));
conf.setCapacityByLabel(Q_newRoot, LABEL, 100);
conf.setMaximumCapacityByLabel(Q_newRoot, LABEL, 100);
}
final String Q_A = Q_newRoot + "." + A;
conf.setCapacity(Q_A, 8.5f);
conf.setMaximumCapacity(Q_A, 20);
conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*");
if (withNodeLabels) {
conf.setAccessibleNodeLabels(Q_A, Collections.singleton(LABEL));
conf.setCapacityByLabel(Q_A, LABEL, 100);
conf.setMaximumCapacityByLabel(Q_A, LABEL, 100);
}
final String Q_B = Q_newRoot + "." + B;
conf.setCapacity(Q_B, 80);
@ -3228,6 +3251,116 @@ public class TestLeafQueue {
Assert.assertEquals(3 * GB, app_0.getCurrentConsumption().getMemorySize());
}
@Test
public void testFifoWithPartitionsAssignment() throws Exception {
setUpWithNodeLabels();
LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
OrderingPolicy<FiCaSchedulerApp> policy =
new FifoOrderingPolicyWithExclusivePartitions<>();
policy.configure(Collections.singletonMap(
YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX, LABEL));
a.setOrderingPolicy(policy);
String host00 = "127.0.0.1";
String rack0 = "rack_0";
FiCaSchedulerNode node00 = TestUtils.getMockNode(host00, rack0, 0,
16 * GB);
when(node00.getPartition()).thenReturn(LABEL);
String host01 = "127.0.0.2";
FiCaSchedulerNode node01 = TestUtils.getMockNode(host01, rack0, 0,
16 * GB);
when(node01.getPartition()).thenReturn("");
final int numNodes = 4;
Resource clusterResource = Resources.createResource(numNodes * (16 * GB),
numNodes * 16);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
String user0 = "user_0";
final ApplicationAttemptId appAttemptId0 =
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app0 = spy(new FiCaSchedulerApp(appAttemptId0, user0, a,
mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(5),
false));
a.submitApplicationAttempt(app0, user0);
final ApplicationAttemptId appAttemptId1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app1 = spy(new FiCaSchedulerApp(appAttemptId1, user0, a,
mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(3),
false));
when(app1.getPartition()).thenReturn(LABEL);
a.submitApplicationAttempt(app1, user0);
Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
app0.getApplicationAttemptId(), app0, app1.getApplicationAttemptId(),
app1);
Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node00.getNodeID(),
node00, node01.getNodeID(), node01);
Priority priority = TestUtils.createMockPriority(1);
List<ResourceRequest> app0Requests = new ArrayList<>();
List<ResourceRequest> app1Requests = new ArrayList<>();
app0Requests.clear();
app0Requests.add(TestUtils
.createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true, priority,
recordFactory));
app0.updateResourceRequests(app0Requests);
app1Requests.clear();
app1Requests.add(TestUtils
.createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
recordFactory, LABEL));
app1.updateResourceRequests(app1Requests);
// app_1 will get containers since it is exclusive-enforced
applyCSAssignment(clusterResource,
a.assignContainers(clusterResource, node00,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
Assert.assertEquals(1 * GB, app1.getSchedulingResourceUsage()
.getUsed(LABEL).getMemorySize());
// app_0 should not get resources from node_0_0 since the labels
// don't match
applyCSAssignment(clusterResource,
a.assignContainers(clusterResource, node00,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
Assert.assertEquals(0 * GB, app0.getCurrentConsumption().getMemorySize());
app1Requests.clear();
app1Requests.add(TestUtils
.createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
recordFactory, LABEL));
app1.updateResourceRequests(app1Requests);
// When node_0_1 heartbeats, app_0 should get containers
applyCSAssignment(clusterResource,
a.assignContainers(clusterResource, node01,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
Assert.assertEquals(2 * GB, app0.getCurrentConsumption().getMemorySize());
Assert.assertEquals(1 * GB, app1.getSchedulingResourceUsage()
.getUsed(LABEL).getMemorySize());
app0Requests.clear();
app0Requests.add(TestUtils
.createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
recordFactory));
app0.updateResourceRequests(app0Requests);
// When node_0_0 heartbeats, app_1 should get containers again
applyCSAssignment(clusterResource,
a.assignContainers(clusterResource, node00,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
Assert.assertEquals(2 * GB, app0.getCurrentConsumption().getMemorySize());
Assert.assertEquals(2 * GB, app1.getSchedulingResourceUsage()
.getUsed(LABEL).getMemorySize());
}
@Test
public void testConcurrentAccess() throws Exception {
YarnConfiguration conf = new YarnConfiguration();

View File

@ -18,21 +18,19 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
public class MockSchedulableEntity implements SchedulableEntity {
private String id;
private long serial = 0;
private Priority priority;
private boolean isRecovering;
private String partition = "";
public MockSchedulableEntity() { }
@ -101,4 +99,13 @@ public class MockSchedulableEntity implements SchedulableEntity {
protected void setRecovering(boolean entityRecovering) {
this.isRecovering = entityRecovering;
}
@Override
public String getPartition() {
return partition;
}
public void setPartition(String partition) {
this.partition = partition;
}
}

View File

@ -126,19 +126,25 @@ public class TestFairOrderingPolicy {
//Assignment, least to greatest consumption
checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "2", "1"});
checkIds(schedOrder.getAssignmentIterator(
IteratorSelector.EMPTY_ITERATOR_SELECTOR),
new String[]{"3", "2", "1"});
//Preemption, greatest to least
checkIds(schedOrder.getPreemptionIterator(), new String[]{"1", "2", "3"});
//Change value without inform, should see no change
msp2.setUsed(Resources.createResource(6));
checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "2", "1"});
checkIds(schedOrder.getAssignmentIterator(
IteratorSelector.EMPTY_ITERATOR_SELECTOR),
new String[]{"3", "2", "1"});
checkIds(schedOrder.getPreemptionIterator(), new String[]{"1", "2", "3"});
//Do inform, will reorder
schedOrder.containerAllocated(msp2, null);
checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "1", "2"});
checkIds(schedOrder.getAssignmentIterator(
IteratorSelector.EMPTY_ITERATOR_SELECTOR),
new String[]{"3", "1", "2"});
checkIds(schedOrder.getPreemptionIterator(), new String[]{"2", "1", "3"});
}

View File

@ -61,7 +61,7 @@ public class TestFifoOrderingPolicy {
schedOrder.addSchedulableEntity(msp3);
//Assignment, oldest to youngest
checkSerials(schedOrder.getAssignmentIterator(), new long[]{1, 2, 3});
checkSerials(schedOrder.getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR), new long[]{1, 2, 3});
//Preemption, youngest to oldest
checkSerials(schedOrder.getPreemptionIterator(), new long[]{3, 2, 1});

View File

@ -72,8 +72,9 @@ public class TestFifoOrderingPolicyForPendingApps {
schedOrder.addSchedulableEntity(msp7);
// Assignment with serial id's are 3,2,4,1,6,5,7
checkSerials(schedOrder.getAssignmentIterator(), new long[] { 3, 2, 4, 1,
6, 5, 7 });
checkSerials(schedOrder.getAssignmentIterator(
IteratorSelector.EMPTY_ITERATOR_SELECTOR), new long[] {3, 2, 4, 1,
6, 5, 7});
//Preemption, youngest to oldest
checkSerials(schedOrder.getPreemptionIterator(), new long[] { 7, 5, 6, 1,

View File

@ -0,0 +1,244 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Assert;
import org.junit.Test;
/**
* Tests {@link FifoOrderingPolicyWithExclusivePartitions} ordering policy.
*/
public class TestFifoOrderingPolicyWithExclusivePartitions {
private static final String PARTITION = "test";
private static final String PARTITION2 = "test2";
@Test
public void testNoConfiguredExclusiveEnforcedPartitions() {
FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy =
new FifoOrderingPolicyWithExclusivePartitions<>();
policy.configure(Collections.EMPTY_MAP);
MockSchedulableEntity p1 = new MockSchedulableEntity(4, 0, false);
p1.setPartition(PARTITION);
p1.setId("p1");
MockSchedulableEntity p2 = new MockSchedulableEntity(3, 1, false);
p2.setPartition(PARTITION);
p2.setId("p2");
MockSchedulableEntity r1 = new MockSchedulableEntity(2, 0, false);
r1.setId("r1");
MockSchedulableEntity r2 = new MockSchedulableEntity(1, 0, false);
r2.setId("r2");
policy.addSchedulableEntity(p1);
policy.addAllSchedulableEntities(Arrays.asList(p2, r1, r2));
Assert.assertEquals(4, policy.getNumSchedulableEntities());
Assert.assertEquals(4, policy.getSchedulableEntities().size());
IteratorSelector sel = new IteratorSelector();
// Should behave like FifoOrderingPolicy, regardless of partition
verifyAssignmentIteratorOrder(policy,
IteratorSelector.EMPTY_ITERATOR_SELECTOR, "p2", "r2", "r1", "p1");
verifyPreemptionIteratorOrder(policy, "p1", "r1", "r2", "p2");
sel.setPartition(PARTITION);
verifyAssignmentIteratorOrder(policy, sel, "p2", "r2", "r1", "p1");
verifyPreemptionIteratorOrder(policy, "p1", "r1", "r2", "p2");
policy.removeSchedulableEntity(p2);
policy.removeSchedulableEntity(r2);
Assert.assertEquals(2, policy.getNumSchedulableEntities());
Assert.assertEquals(2, policy.getSchedulableEntities().size());
verifyAssignmentIteratorOrder(policy,
IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r1", "p1");
verifyPreemptionIteratorOrder(policy, "p1", "r1");
sel.setPartition(PARTITION);
verifyAssignmentIteratorOrder(policy, sel, "r1", "p1");
verifyPreemptionIteratorOrder(policy, "p1", "r1");
}
@Test
public void testSingleExclusiveEnforcedPartition() {
FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy =
new FifoOrderingPolicyWithExclusivePartitions<>();
policy.configure(Collections.singletonMap(
YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX, PARTITION));
// PARTITION iterator should return p2, p1, p3
MockSchedulableEntity p1 = new MockSchedulableEntity(1, 0, false);
p1.setPartition(PARTITION);
p1.setId("p1");
MockSchedulableEntity p2 = new MockSchedulableEntity(5, 1, false);
p2.setPartition(PARTITION);
p2.setId("p2");
MockSchedulableEntity p3 = new MockSchedulableEntity(3, 0, false);
p3.setPartition(PARTITION);
p3.setId("p3");
// non-PARTITION iterator should return r3, r2, r1
MockSchedulableEntity r1 = new MockSchedulableEntity(6, 0, false);
r1.setId("r1");
MockSchedulableEntity r2 = new MockSchedulableEntity(4, 0, false);
r2.setId("r2");
MockSchedulableEntity r3 = new MockSchedulableEntity(2, 1, false);
r3.setId("r3");
policy.addSchedulableEntity(r1);
Assert.assertEquals(1, policy.getNumSchedulableEntities());
Assert.assertEquals("r1", policy.getSchedulableEntities()
.iterator().next().getId());
verifyAssignmentIteratorOrder(policy,
IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r1");
verifyPreemptionIteratorOrder(policy, "r1");
List<MockSchedulableEntity> entities = Arrays.asList(r2, r3, p1, p2);
policy.addAllSchedulableEntities(entities);
policy.addSchedulableEntity(p3);
Assert.assertEquals(6, policy.getNumSchedulableEntities());
Assert.assertEquals(6, policy.getSchedulableEntities().size());
// Assignment iterator should return non-PARTITION entities,
// in order based on FifoOrderingPolicy
verifyAssignmentIteratorOrder(policy,
IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r3", "r2", "r1");
// Preemption iterator should return all entities, in global order
verifyPreemptionIteratorOrder(policy, "r1", "r2", "p3", "p1", "p2", "r3");
// Same thing as above, but with a non-empty partition
IteratorSelector sel = new IteratorSelector();
sel.setPartition("dummy");
verifyAssignmentIteratorOrder(policy, sel, "r3", "r2", "r1");
verifyPreemptionIteratorOrder(policy, "r1", "r2", "p3", "p1", "p2", "r3");
// Should return PARTITION entities, in order based on FifoOrderingPolicy
sel.setPartition(PARTITION);
verifyAssignmentIteratorOrder(policy, sel, "p2", "p1", "p3");
verifyPreemptionIteratorOrder(policy, "r1", "r2", "p3", "p1", "p2", "r3");
policy.removeSchedulableEntity(p2);
policy.removeSchedulableEntity(r2);
Assert.assertEquals(4, policy.getNumSchedulableEntities());
Assert.assertEquals(4, policy.getSchedulableEntities().size());
verifyAssignmentIteratorOrder(policy,
IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r3", "r1");
verifyPreemptionIteratorOrder(policy, "r1", "p3", "p1", "r3");
sel.setPartition(PARTITION);
verifyAssignmentIteratorOrder(policy, sel, "p1", "p3");
verifyPreemptionIteratorOrder(policy, "r1", "p3", "p1", "r3");
policy.removeSchedulableEntity(p1);
policy.removeSchedulableEntity(p3);
Assert.assertEquals(2, policy.getNumSchedulableEntities());
Assert.assertEquals(2, policy.getSchedulableEntities().size());
verifyAssignmentIteratorOrder(policy,
IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r3", "r1");
verifyPreemptionIteratorOrder(policy, "r1", "r3");
sel.setPartition(PARTITION);
verifyAssignmentIteratorOrder(policy, sel);
verifyPreemptionIteratorOrder(policy, "r1", "r3");
}
@Test
public void testMultipleExclusiveEnforcedPartitions() {
FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy =
new FifoOrderingPolicyWithExclusivePartitions<>();
policy.configure(Collections.singletonMap(
YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX,
PARTITION + "," + PARTITION2));
// PARTITION iterator should return p2, p1
MockSchedulableEntity p1 = new MockSchedulableEntity(1, 0, false);
p1.setPartition(PARTITION);
p1.setId("p1");
MockSchedulableEntity p2 = new MockSchedulableEntity(5, 1, false);
p2.setPartition(PARTITION);
p2.setId("p2");
// PARTITION2 iterator should return r1, r2
MockSchedulableEntity r1 = new MockSchedulableEntity(3, 0, false);
r1.setPartition(PARTITION2);
r1.setId("r1");
MockSchedulableEntity r2 = new MockSchedulableEntity(4, 0, false);
r2.setPartition(PARTITION2);
r2.setId("r2");
// default iterator should return s2, s1
MockSchedulableEntity s1 = new MockSchedulableEntity(6, 0, false);
s1.setId("s1");
MockSchedulableEntity s2 = new MockSchedulableEntity(2, 0, false);
s2.setId("s2");
policy.addAllSchedulableEntities(Arrays.asList(s1, s2, r1));
Assert.assertEquals(3, policy.getNumSchedulableEntities());
Assert.assertEquals(3, policy.getSchedulableEntities().size());
IteratorSelector sel = new IteratorSelector();
// assignment iterator returns only default (non-partitioned) entities
verifyAssignmentIteratorOrder(policy,
IteratorSelector.EMPTY_ITERATOR_SELECTOR, "s2", "s1");
verifyPreemptionIteratorOrder(policy, "s1", "r1", "s2");
sel.setPartition(PARTITION2);
verifyAssignmentIteratorOrder(policy, sel, "r1");
policy.addAllSchedulableEntities(Arrays.asList(r2, p1, p2));
Assert.assertEquals(6, policy.getNumSchedulableEntities());
Assert.assertEquals(6, policy.getSchedulableEntities().size());
verifyAssignmentIteratorOrder(policy,
IteratorSelector.EMPTY_ITERATOR_SELECTOR, "s2", "s1");
sel.setPartition(PARTITION);
verifyAssignmentIteratorOrder(policy, sel, "p2", "p1");
sel.setPartition(PARTITION2);
verifyAssignmentIteratorOrder(policy, sel, "r1", "r2");
verifyPreemptionIteratorOrder(policy, "s1", "r2", "r1", "s2", "p1", "p2");
policy.removeSchedulableEntity(p2);
policy.removeSchedulableEntity(r1);
policy.removeSchedulableEntity(r2);
Assert.assertEquals(3, policy.getNumSchedulableEntities());
Assert.assertEquals(3, policy.getSchedulableEntities().size());
verifyAssignmentIteratorOrder(policy,
IteratorSelector.EMPTY_ITERATOR_SELECTOR, "s2", "s1");
sel.setPartition(PARTITION);
verifyAssignmentIteratorOrder(policy, sel, "p1");
sel.setPartition(PARTITION2);
verifyAssignmentIteratorOrder(policy, sel);
verifyPreemptionIteratorOrder(policy, "s1", "s2", "p1");
}
private void verifyAssignmentIteratorOrder(
FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy,
IteratorSelector sel, String... ids) {
verifyIteratorOrder(policy.getAssignmentIterator(sel), ids);
}
private void verifyPreemptionIteratorOrder(
FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy,
String... ids) {
verifyIteratorOrder(policy.getPreemptionIterator(), ids);
}
private void verifyIteratorOrder(Iterator<MockSchedulableEntity> itr,
String... ids) {
for (String id : ids) {
Assert.assertEquals(id, itr.next().getId());
}
Assert.assertFalse(itr.hasNext());
}
}