YARN-9730. Support forcing configured partitions to be exclusive based on app node label

This commit is contained in:
Jonathan Hung 2019-09-24 12:13:29 -07:00
parent 66400c1cbb
commit c2731d4b63
23 changed files with 850 additions and 27 deletions

View File

@ -3789,6 +3789,12 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_NODELABEL_CONFIGURATION_TYPE =
CENTRALIZED_NODELABEL_CONFIGURATION_TYPE;
public static final String EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX
= "exclusive-enforced-partitions";
public static final String EXCLUSIVE_ENFORCED_PARTITIONS = NODE_LABELS_PREFIX
+ EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX;
public static final String MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY =
YARN_PREFIX + "cluster.max-application-priority";

View File

@ -4301,4 +4301,13 @@
<value>60000</value>
</property>
<property>
<description>
Comma-separated list of partitions. If a label P is in this list,
then the RM will enforce that an app has resource requests with label
P iff that app's node label expression is P.
</description>
<name>yarn.node-labels.exclusive-enforced-partitions</name>
<value></value>
</property>
</configuration>

View File

@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@ -236,6 +237,11 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
&& ResourceRequest.ANY.equals(req.getResourceName())) {
req.setNodeLabelExpression(asc.getNodeLabelExpression());
}
if (ResourceRequest.ANY.equals(req.getResourceName())) {
SchedulerUtils.enforcePartitionExclusivity(req,
getRmContext().getExclusiveEnforcedPartitions(),
asc.getNodeLabelExpression());
}
}
Resource maximumCapacity =

View File

@ -21,6 +21,7 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@ -102,6 +103,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
private YarnAuthorizationProvider authorizer;
private boolean timelineServiceV2Enabled;
private boolean nodeLabelsEnabled;
private Set<String> exclusiveEnforcedPartitions;
public RMAppManager(RMContext context,
YarnScheduler scheduler, ApplicationMasterService masterService,
@ -126,6 +128,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
timelineServiceV2Enabled(conf);
this.nodeLabelsEnabled = YarnConfiguration
.areNodeLabelsEnabled(rmContext.getYarnConfiguration());
this.exclusiveEnforcedPartitions = context.getExclusiveEnforcedPartitions();
}
/**
@ -593,6 +596,9 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
throw new InvalidResourceRequestException("Invalid resource request, "
+ "no resource request specified with " + ResourceRequest.ANY);
}
SchedulerUtils.enforcePartitionExclusivity(anyReq,
exclusiveEnforcedPartitions,
submissionContext.getNodeLabelExpression());
// Make sure that all of the requests agree with the ANY request
// and have correct values

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
@ -203,4 +204,7 @@ public interface RMContext extends ApplicationMasterServiceContext {
long getTokenSequenceNo();
void incrTokenSequenceNo();
Set<String> getExclusiveEnforcedPartitions();
}

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
@ -32,6 +34,7 @@ import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
@ -678,4 +681,16 @@ public class RMContextImpl implements RMContext {
public void incrTokenSequenceNo() {
this.activeServiceContext.incrTokenSequenceNo();
}
public Set<String> getExclusiveEnforcedPartitions() {
String[] configuredPartitions = getYarnConfiguration().getStrings(
YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS);
Set<String> exclusiveEnforcedPartitions = new HashSet<>();
if (configuredPartitions != null) {
for (String partition : configuredPartitions) {
exclusiveEnforcedPartitions.add(partition);
}
}
return exclusiveEnforcedPartitions;
}
}

View File

@ -205,6 +205,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
private AtomicLong unconfirmedAllocatedMem = new AtomicLong();
private AtomicInteger unconfirmedAllocatedVcores = new AtomicInteger();
private String nodeLabelExpression;
public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, AbstractUsersManager abstractUsersManager,
RMContext rmContext) {
@ -226,6 +228,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
unmanagedAM = appSubmissionContext.getUnmanagedAM();
this.logAggregationContext =
appSubmissionContext.getLogAggregationContext();
this.nodeLabelExpression =
appSubmissionContext.getNodeLabelExpression();
}
applicationSchedulingEnvs = rmApp.getApplicationSchedulingEnvs();
}
@ -1469,4 +1473,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public Map<String, String> getApplicationSchedulingEnvs() {
return this.applicationSchedulingEnvs;
}
@Override
public String getPartition() {
return nodeLabelExpression == null ? "" : nodeLabelExpression;
}
}

View File

@ -301,6 +301,29 @@ public class SchedulerUtils {
rmContext, queueInfo, nodeLabelsEnabled);
}
/**
* If RM should enforce partition exclusivity for enforced partition "x":
* 1) If request is "x" and app label is not "x",
* override request to app's label.
* 2) If app label is "x", ensure request is "x".
* @param resReq resource request
* @param enforcedPartitions list of exclusive enforced partitions
* @param appLabel app's node label expression
*/
public static void enforcePartitionExclusivity(ResourceRequest resReq,
Set<String> enforcedPartitions, String appLabel) {
if (enforcedPartitions == null || enforcedPartitions.isEmpty()) {
return;
}
if (!enforcedPartitions.contains(appLabel)
&& enforcedPartitions.contains(resReq.getNodeLabelExpression())) {
resReq.setNodeLabelExpression(appLabel);
}
if (enforcedPartitions.contains(appLabel)) {
resReq.setNodeLabelExpression(appLabel);
}
}
/**
* Utility method to validate a resource request, by ensuring that the
* requested memory/vcore is non-negative and not greater than max

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNo
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodePolicySpec;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyWithExclusivePartitions;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
@ -162,6 +163,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public static final String FAIR_APP_ORDERING_POLICY = "fair";
public static final String FIFO_WITH_PARTITIONS_APP_ORDERING_POLICY
= "fifo-with-partitions";
public static final String DEFAULT_APP_ORDERING_POLICY =
FIFO_APP_ORDERING_POLICY;
@ -561,6 +565,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
if (policyType.trim().equals(FAIR_APP_ORDERING_POLICY)) {
policyType = FairOrderingPolicy.class.getName();
}
if (policyType.trim().equals(FIFO_WITH_PARTITIONS_APP_ORDERING_POLICY)) {
policyType = FifoOrderingPolicyWithExclusivePartitions.class.getName();
}
try {
orderingPolicy = (OrderingPolicy<S>)
Class.forName(policyType).newInstance();

View File

@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.IteratorSelector;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
@ -804,7 +805,8 @@ public class LeafQueue extends AbstractCSQueue {
}
for (Iterator<FiCaSchedulerApp> fsApp =
getPendingAppsOrderingPolicy().getAssignmentIterator();
getPendingAppsOrderingPolicy()
.getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR);
fsApp.hasNext(); ) {
FiCaSchedulerApp application = fsApp.next();
ApplicationId applicationId = application.getApplicationId();
@ -1095,8 +1097,10 @@ public class LeafQueue extends AbstractCSQueue {
Map<String, CachedUserLimit> userLimits = new HashMap<>();
boolean needAssignToQueueCheck = true;
IteratorSelector sel = new IteratorSelector();
sel.setPartition(candidates.getPartition());
for (Iterator<FiCaSchedulerApp> assignmentIterator =
orderingPolicy.getAssignmentIterator();
orderingPolicy.getAssignmentIterator(sel);
assignmentIterator.hasNext(); ) {
FiCaSchedulerApp application = assignmentIterator.next();

View File

@ -47,13 +47,13 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti
public Collection<S> getSchedulableEntities() {
return schedulableEntities;
}
@Override
public Iterator<S> getAssignmentIterator() {
public Iterator<S> getAssignmentIterator(IteratorSelector sel) {
reorderScheduleEntities();
return schedulableEntities.iterator();
}
@Override
public Iterator<S> getPreemptionIterator() {
reorderScheduleEntities();
@ -138,5 +138,5 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti
@Override
public abstract String getInfo();
}

View File

@ -0,0 +1,144 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
/**
* Similar to {@link FifoOrderingPolicy}, but with separate ordering policies
* for each partition in
* {@code yarn.scheduler.capacity.<queue-path>.ordering-policy.exclusive-enforced-partitions}.
*/
public class FifoOrderingPolicyWithExclusivePartitions<S extends SchedulableEntity>
implements OrderingPolicy<S> {
private static final String DEFAULT_PARTITION = "DEFAULT_PARTITION";
private Map<String, OrderingPolicy<S>> orderingPolicies;
public FifoOrderingPolicyWithExclusivePartitions() {
this.orderingPolicies = new HashMap<>();
this.orderingPolicies.put(DEFAULT_PARTITION, new FifoOrderingPolicy());
}
public Collection<S> getSchedulableEntities() {
return unionOrderingPolicies().getSchedulableEntities();
}
public Iterator<S> getAssignmentIterator(IteratorSelector sel) {
// Return schedulable entities only from filtered partition
return getPartitionOrderingPolicy(sel.getPartition())
.getAssignmentIterator(sel);
}
public Iterator<S> getPreemptionIterator() {
// Entities from all partitions should be preemptible
return unionOrderingPolicies().getPreemptionIterator();
}
/**
* Union all schedulable entities from all ordering policies.
* @return ordering policy containing all schedulable entities
*/
private OrderingPolicy<S> unionOrderingPolicies() {
OrderingPolicy<S> ret = new FifoOrderingPolicy<>();
for (Map.Entry<String, OrderingPolicy<S>> entry
: orderingPolicies.entrySet()) {
ret.addAllSchedulableEntities(entry.getValue().getSchedulableEntities());
}
return ret;
}
public void addSchedulableEntity(S s) {
getPartitionOrderingPolicy(s.getPartition()).addSchedulableEntity(s);
}
public boolean removeSchedulableEntity(S s) {
return getPartitionOrderingPolicy(s.getPartition())
.removeSchedulableEntity(s);
}
public void addAllSchedulableEntities(Collection<S> sc) {
for (S entity : sc) {
getPartitionOrderingPolicy(entity.getPartition())
.addSchedulableEntity(entity);
}
}
public int getNumSchedulableEntities() {
// Return total number of schedulable entities, to maintain parity with
// existing FifoOrderingPolicy e.g. when determining if queue has reached
// its max app limit
int ret = 0;
for (Map.Entry<String, OrderingPolicy<S>> entry
: orderingPolicies.entrySet()) {
ret += entry.getValue().getNumSchedulableEntities();
}
return ret;
}
public void containerAllocated(S schedulableEntity, RMContainer r) {
getPartitionOrderingPolicy(schedulableEntity.getPartition())
.containerAllocated(schedulableEntity, r);
}
public void containerReleased(S schedulableEntity, RMContainer r) {
getPartitionOrderingPolicy(schedulableEntity.getPartition())
.containerReleased(schedulableEntity, r);
}
public void demandUpdated(S schedulableEntity) {
getPartitionOrderingPolicy(schedulableEntity.getPartition())
.demandUpdated(schedulableEntity);
}
@Override
public void configure(Map<String, String> conf) {
if (conf == null) {
return;
}
String partitions =
conf.get(YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX);
if (partitions != null) {
for (String partition : partitions.split(",")) {
partition = partition.trim();
if (!partition.isEmpty()) {
this.orderingPolicies.put(partition, new FifoOrderingPolicy());
}
}
}
}
@Override
public String getInfo() {
return "FifoOrderingPolicyWithExclusivePartitions";
}
private OrderingPolicy<S> getPartitionOrderingPolicy(String partition) {
String keyPartition = orderingPolicies.containsKey(partition) ?
partition : DEFAULT_PARTITION;
return orderingPolicies.get(keyPartition);
}
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
/**
* IteratorSelector contains information needed to tell an
* {@link OrderingPolicy} what to return in an iterator.
*/
public class IteratorSelector {
public static final IteratorSelector EMPTY_ITERATOR_SELECTOR =
new IteratorSelector();
private String partition;
/**
* The partition for this iterator selector.
* @return partition
*/
public String getPartition() {
return this.partition;
}
/**
* Set partition for this iterator selector.
* @param p partition
*/
public void setPartition(String p) {
this.partition = p;
}
}

View File

@ -45,10 +45,11 @@ public interface OrderingPolicy<S extends SchedulableEntity> {
/**
* Return an iterator over the collection of {@link SchedulableEntity}
* objects which orders them for container assignment.
* @param sel the {@link IteratorSelector} to filter with
* @return an iterator over the collection of {@link SchedulableEntity}
* objects
*/
public Iterator<S> getAssignmentIterator();
Iterator<S> getAssignmentIterator(IteratorSelector sel);
/**
* Return an iterator over the collection of {@link SchedulableEntity}

View File

@ -55,4 +55,9 @@ public interface SchedulableEntity {
*/
public boolean isRecovering();
/**
* Get partition corresponding to this entity.
* @return partition
*/
String getPartition();
}

View File

@ -38,6 +38,7 @@ import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@ -978,6 +979,147 @@ public class TestSchedulerUtils {
System.err.println("Failed to wait scheduler application attempt stopped.");
}
@Test
public void testEnforcePartitionExclusivity() {
String enforcedExclusiveLabel = "x";
Set<String> enforcedExclusiveLabelSet =
Collections.singleton(enforcedExclusiveLabel);
String dummyLabel = "y";
String appLabel = "appLabel";
ResourceRequest rr = BuilderUtils.newResourceRequest(
mock(Priority.class), ResourceRequest.ANY, mock(Resource.class), 1);
// RR label unset and app label does not match. Nothing should happen.
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
null);
Assert.assertNull(rr.getNodeLabelExpression());
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
appLabel);
Assert.assertNull(rr.getNodeLabelExpression());
// RR label and app label do not match. Nothing should happen.
rr.setNodeLabelExpression(dummyLabel);
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
null);
Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
appLabel);
Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
// RR label matches but app label does not. RR label should be set
// to app label
rr.setNodeLabelExpression(enforcedExclusiveLabel);
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
null);
Assert.assertNull(rr.getNodeLabelExpression());
rr.setNodeLabelExpression(enforcedExclusiveLabel);
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
appLabel);
Assert.assertEquals(appLabel, rr.getNodeLabelExpression());
// RR label unset and app label matches. RR label should be set
// to app label
rr.setNodeLabelExpression(null);
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
enforcedExclusiveLabel);
Assert.assertEquals(enforcedExclusiveLabel, rr.getNodeLabelExpression());
// RR label does not match and app label matches. RR label should be set
// to app label
rr.setNodeLabelExpression(dummyLabel);
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
enforcedExclusiveLabel);
Assert.assertEquals(enforcedExclusiveLabel, rr.getNodeLabelExpression());
// RR label and app label matches. Nothing should happen.
rr.setNodeLabelExpression(enforcedExclusiveLabel);
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
enforcedExclusiveLabel);
Assert.assertEquals(enforcedExclusiveLabel, rr.getNodeLabelExpression());
// Unconfigured label: nothing should happen.
rr.setNodeLabelExpression(null);
SchedulerUtils.enforcePartitionExclusivity(rr, null,
appLabel);
Assert.assertNull(rr.getNodeLabelExpression());
rr.setNodeLabelExpression(dummyLabel);
SchedulerUtils.enforcePartitionExclusivity(rr, null,
appLabel);
Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
rr.setNodeLabelExpression(enforcedExclusiveLabel);
SchedulerUtils.enforcePartitionExclusivity(rr, null,
appLabel);
Assert.assertEquals(enforcedExclusiveLabel, rr.getNodeLabelExpression());
}
@Test
public void testEnforcePartitionExclusivityMultipleLabels() {
String enforcedLabel1 = "x";
String enforcedLabel2 = "y";
Set<String> enforcedExclusiveLabelSet = new HashSet<>();
enforcedExclusiveLabelSet.add(enforcedLabel1);
enforcedExclusiveLabelSet.add(enforcedLabel2);
String dummyLabel = "dummyLabel";
String appLabel = "appLabel";
ResourceRequest rr = BuilderUtils.newResourceRequest(
mock(Priority.class), ResourceRequest.ANY, mock(Resource.class), 1);
// RR label unset and app label does not match. Nothing should happen.
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
null);
Assert.assertNull(rr.getNodeLabelExpression());
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
appLabel);
Assert.assertNull(rr.getNodeLabelExpression());
// RR label and app label do not match. Nothing should happen.
rr.setNodeLabelExpression(dummyLabel);
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
null);
Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
appLabel);
Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
// RR label matches but app label does not. RR label should be set
// to app label
rr.setNodeLabelExpression(enforcedLabel1);
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
null);
Assert.assertNull(rr.getNodeLabelExpression());
rr.setNodeLabelExpression(enforcedLabel2);
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
appLabel);
Assert.assertEquals(appLabel, rr.getNodeLabelExpression());
// RR label unset and app label matches. RR label should be set
// to app label
rr.setNodeLabelExpression(null);
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
enforcedLabel1);
Assert.assertEquals(enforcedLabel1, rr.getNodeLabelExpression());
// RR label does not match and app label matches. RR label should be set
// to app label
rr.setNodeLabelExpression(dummyLabel);
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
enforcedLabel2);
Assert.assertEquals(enforcedLabel2, rr.getNodeLabelExpression());
// RR label and app label matches. Nothing should happen.
rr.setNodeLabelExpression(enforcedLabel1);
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
enforcedLabel1);
Assert.assertEquals(enforcedLabel1, rr.getNodeLabelExpression());
// RR label and app label don't match, but they're both enforced labels.
// RR label should be set to app label.
rr.setNodeLabelExpression(enforcedLabel2);
SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
enforcedLabel1);
Assert.assertEquals(enforcedLabel1, rr.getNodeLabelExpression());
}
public static SchedulerApplication<SchedulerApplicationAttempt>
verifyAppAddedAndRemovedFromScheduler(
Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>>

View File

@ -162,6 +162,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.IteratorSelector;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@ -1301,8 +1302,9 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
//This happens because app2 has no demand/a magnitude of NaN, which
//results in app1 and app2 being equal in the fairness comparison and
//failling back to fifo (start) ordering
assertEquals(q.getOrderingPolicy().getAssignmentIterator().next().getId(),
appId1.toString());
assertEquals(q.getOrderingPolicy().getAssignmentIterator(
IteratorSelector.EMPTY_ITERATOR_SELECTOR).next().getId(),
appId1.toString());
//Now, allocate for app2 (this would be the first/AM allocation)
ResourceRequest r2 = TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, priority, recordFactory);
@ -1314,8 +1316,9 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
//verify re-ordering based on the allocation alone
//Now, the first app for assignment is app2
assertEquals(q.getOrderingPolicy().getAssignmentIterator().next().getId(),
appId2.toString());
assertEquals(q.getOrderingPolicy().getAssignmentIterator(
IteratorSelector.EMPTY_ITERATOR_SELECTOR).next().getId(),
appId2.toString());
rm.stop();
}

View File

@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuot
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyWithExclusivePartitions;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
@ -133,6 +134,8 @@ public class TestLeafQueue {
final static int GB = 1024;
final static String DEFAULT_RACK = "/default";
private final static String LABEL = "test";
private final ResourceCalculator resourceCalculator =
new DefaultResourceCalculator();
@ -141,14 +144,19 @@ public class TestLeafQueue {
@Before
public void setUp() throws Exception {
setUpInternal(resourceCalculator);
setUpInternal(resourceCalculator, false);
}
private void setUpWithDominantResourceCalculator() throws Exception {
setUpInternal(dominantResourceCalculator);
setUpInternal(dominantResourceCalculator, false);
}
private void setUpInternal(ResourceCalculator rC) throws Exception {
private void setUpWithNodeLabels() throws Exception {
setUpInternal(resourceCalculator, true);
}
private void setUpInternal(ResourceCalculator rC, boolean withNodeLabels)
throws Exception {
CapacityScheduler spyCs = new CapacityScheduler();
queues = new HashMap<String, CSQueue>();
cs = spy(spyCs);
@ -174,7 +182,7 @@ public class TestLeafQueue {
csConf.setBoolean(CapacitySchedulerConfiguration.RESERVE_CONT_LOOK_ALL_NODES,
false);
final String newRoot = "root" + System.currentTimeMillis();
setupQueueConfiguration(csConf, newRoot);
setupQueueConfiguration(csConf, newRoot, withNodeLabels);
YarnConfiguration conf = new YarnConfiguration();
cs.setConf(conf);
when(spyRMContext.getYarnConfiguration()).thenReturn(conf);
@ -231,24 +239,39 @@ public class TestLeafQueue {
private static final String E = "e";
private void setupQueueConfiguration(
CapacitySchedulerConfiguration conf,
final String newRoot) {
final String newRoot, boolean withNodeLabels) {
// Define top-level queues
conf.setQueues(ROOT, new String[] {newRoot});
conf.setMaximumCapacity(ROOT, 100);
conf.setAcl(ROOT,
QueueACL.SUBMIT_APPLICATIONS, " ");
if (withNodeLabels) {
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, LABEL, 100);
conf.setMaximumCapacityByLabel(CapacitySchedulerConfiguration.ROOT,
LABEL, 100);
}
final String Q_newRoot = ROOT + "." + newRoot;
conf.setQueues(Q_newRoot, new String[] {A, B, C, D, E});
conf.setCapacity(Q_newRoot, 100);
conf.setMaximumCapacity(Q_newRoot, 100);
conf.setAcl(Q_newRoot, QueueACL.SUBMIT_APPLICATIONS, " ");
if (withNodeLabels) {
conf.setAccessibleNodeLabels(Q_newRoot, Collections.singleton(LABEL));
conf.setCapacityByLabel(Q_newRoot, LABEL, 100);
conf.setMaximumCapacityByLabel(Q_newRoot, LABEL, 100);
}
final String Q_A = Q_newRoot + "." + A;
conf.setCapacity(Q_A, 8.5f);
conf.setMaximumCapacity(Q_A, 20);
conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*");
if (withNodeLabels) {
conf.setAccessibleNodeLabels(Q_A, Collections.singleton(LABEL));
conf.setCapacityByLabel(Q_A, LABEL, 100);
conf.setMaximumCapacityByLabel(Q_A, LABEL, 100);
}
final String Q_B = Q_newRoot + "." + B;
conf.setCapacity(Q_B, 80);
@ -3100,7 +3123,7 @@ public class TestLeafQueue {
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
final String newRootName = "root" + System.currentTimeMillis();
setupQueueConfiguration(csConf, newRootName);
setupQueueConfiguration(csConf, newRootName, false);
Resource clusterResource = Resources.createResource(100 * 16 * GB,
100 * 32);
@ -3292,6 +3315,116 @@ public class TestLeafQueue {
Assert.assertEquals(3 * GB, app_0.getCurrentConsumption().getMemorySize());
}
@Test
public void testFifoWithPartitionsAssignment() throws Exception {
setUpWithNodeLabels();
LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
OrderingPolicy<FiCaSchedulerApp> policy =
new FifoOrderingPolicyWithExclusivePartitions<>();
policy.configure(Collections.singletonMap(
YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX, LABEL));
a.setOrderingPolicy(policy);
String host00 = "127.0.0.1";
String rack0 = "rack_0";
FiCaSchedulerNode node00 = TestUtils.getMockNode(host00, rack0, 0,
16 * GB);
when(node00.getPartition()).thenReturn(LABEL);
String host01 = "127.0.0.2";
FiCaSchedulerNode node01 = TestUtils.getMockNode(host01, rack0, 0,
16 * GB);
when(node01.getPartition()).thenReturn("");
final int numNodes = 4;
Resource clusterResource = Resources.createResource(numNodes * (16 * GB),
numNodes * 16);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
String user0 = "user_0";
final ApplicationAttemptId appAttemptId0 =
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app0 = spy(new FiCaSchedulerApp(appAttemptId0, user0, a,
mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(5),
false));
a.submitApplicationAttempt(app0, user0);
final ApplicationAttemptId appAttemptId1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app1 = spy(new FiCaSchedulerApp(appAttemptId1, user0, a,
mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(3),
false));
when(app1.getPartition()).thenReturn(LABEL);
a.submitApplicationAttempt(app1, user0);
Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
app0.getApplicationAttemptId(), app0, app1.getApplicationAttemptId(),
app1);
Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node00.getNodeID(),
node00, node01.getNodeID(), node01);
Priority priority = TestUtils.createMockPriority(1);
List<ResourceRequest> app0Requests = new ArrayList<>();
List<ResourceRequest> app1Requests = new ArrayList<>();
app0Requests.clear();
app0Requests.add(TestUtils
.createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true, priority,
recordFactory));
app0.updateResourceRequests(app0Requests);
app1Requests.clear();
app1Requests.add(TestUtils
.createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
recordFactory, LABEL));
app1.updateResourceRequests(app1Requests);
// app_1 will get containers since it is exclusive-enforced
applyCSAssignment(clusterResource,
a.assignContainers(clusterResource, node00,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
Assert.assertEquals(1 * GB, app1.getSchedulingResourceUsage()
.getUsed(LABEL).getMemorySize());
// app_0 should not get resources from node_0_0 since the labels
// don't match
applyCSAssignment(clusterResource,
a.assignContainers(clusterResource, node00,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
Assert.assertEquals(0 * GB, app0.getCurrentConsumption().getMemorySize());
app1Requests.clear();
app1Requests.add(TestUtils
.createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
recordFactory, LABEL));
app1.updateResourceRequests(app1Requests);
// When node_0_1 heartbeats, app_0 should get containers
applyCSAssignment(clusterResource,
a.assignContainers(clusterResource, node01,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
Assert.assertEquals(2 * GB, app0.getCurrentConsumption().getMemorySize());
Assert.assertEquals(1 * GB, app1.getSchedulingResourceUsage()
.getUsed(LABEL).getMemorySize());
app0Requests.clear();
app0Requests.add(TestUtils
.createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
recordFactory));
app0.updateResourceRequests(app0Requests);
// When node_0_0 heartbeats, app_1 should get containers again
applyCSAssignment(clusterResource,
a.assignContainers(clusterResource, node00,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
Assert.assertEquals(2 * GB, app0.getCurrentConsumption().getMemorySize());
Assert.assertEquals(2 * GB, app1.getSchedulingResourceUsage()
.getUsed(LABEL).getMemorySize());
}
@Test
public void testConcurrentAccess() throws Exception {
YarnConfiguration conf = new YarnConfiguration();

View File

@ -18,21 +18,19 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
public class MockSchedulableEntity implements SchedulableEntity {
private String id;
private long serial = 0;
private Priority priority;
private boolean isRecovering;
private String partition = "";
public MockSchedulableEntity() { }
@ -101,4 +99,13 @@ public class MockSchedulableEntity implements SchedulableEntity {
protected void setRecovering(boolean entityRecovering) {
this.isRecovering = entityRecovering;
}
@Override
public String getPartition() {
return partition;
}
public void setPartition(String partition) {
this.partition = partition;
}
}

View File

@ -126,19 +126,25 @@ public class TestFairOrderingPolicy {
//Assignment, least to greatest consumption
checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "2", "1"});
checkIds(schedOrder.getAssignmentIterator(
IteratorSelector.EMPTY_ITERATOR_SELECTOR),
new String[]{"3", "2", "1"});
//Preemption, greatest to least
checkIds(schedOrder.getPreemptionIterator(), new String[]{"1", "2", "3"});
//Change value without inform, should see no change
msp2.setUsed(Resources.createResource(6));
checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "2", "1"});
checkIds(schedOrder.getAssignmentIterator(
IteratorSelector.EMPTY_ITERATOR_SELECTOR),
new String[]{"3", "2", "1"});
checkIds(schedOrder.getPreemptionIterator(), new String[]{"1", "2", "3"});
//Do inform, will reorder
schedOrder.containerAllocated(msp2, null);
checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "1", "2"});
checkIds(schedOrder.getAssignmentIterator(
IteratorSelector.EMPTY_ITERATOR_SELECTOR),
new String[]{"3", "1", "2"});
checkIds(schedOrder.getPreemptionIterator(), new String[]{"2", "1", "3"});
}

View File

@ -63,7 +63,7 @@ public class TestFifoOrderingPolicy {
schedOrder.addSchedulableEntity(msp3);
//Assignment, oldest to youngest
checkSerials(schedOrder.getAssignmentIterator(), new long[]{1, 2, 3});
checkSerials(schedOrder.getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR), new long[]{1, 2, 3});
//Preemption, youngest to oldest
checkSerials(schedOrder.getPreemptionIterator(), new long[]{3, 2, 1});

View File

@ -74,8 +74,9 @@ public class TestFifoOrderingPolicyForPendingApps {
schedOrder.addSchedulableEntity(msp7);
// Assignment with serial id's are 3,2,4,1,6,5,7
checkSerials(schedOrder.getAssignmentIterator(), new long[] { 3, 2, 4, 1,
6, 5, 7 });
checkSerials(schedOrder.getAssignmentIterator(
IteratorSelector.EMPTY_ITERATOR_SELECTOR), new long[] {3, 2, 4, 1,
6, 5, 7});
//Preemption, youngest to oldest
checkSerials(schedOrder.getPreemptionIterator(), new long[] { 7, 5, 6, 1,

View File

@ -0,0 +1,244 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Assert;
import org.junit.Test;
/**
* Tests {@link FifoOrderingPolicyWithExclusivePartitions} ordering policy.
*/
public class TestFifoOrderingPolicyWithExclusivePartitions {
private static final String PARTITION = "test";
private static final String PARTITION2 = "test2";
@Test
public void testNoConfiguredExclusiveEnforcedPartitions() {
FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy =
new FifoOrderingPolicyWithExclusivePartitions<>();
policy.configure(Collections.EMPTY_MAP);
MockSchedulableEntity p1 = new MockSchedulableEntity(4, 0, false);
p1.setPartition(PARTITION);
p1.setId("p1");
MockSchedulableEntity p2 = new MockSchedulableEntity(3, 1, false);
p2.setPartition(PARTITION);
p2.setId("p2");
MockSchedulableEntity r1 = new MockSchedulableEntity(2, 0, false);
r1.setId("r1");
MockSchedulableEntity r2 = new MockSchedulableEntity(1, 0, false);
r2.setId("r2");
policy.addSchedulableEntity(p1);
policy.addAllSchedulableEntities(Arrays.asList(p2, r1, r2));
Assert.assertEquals(4, policy.getNumSchedulableEntities());
Assert.assertEquals(4, policy.getSchedulableEntities().size());
IteratorSelector sel = new IteratorSelector();
// Should behave like FifoOrderingPolicy, regardless of partition
verifyAssignmentIteratorOrder(policy,
IteratorSelector.EMPTY_ITERATOR_SELECTOR, "p2", "r2", "r1", "p1");
verifyPreemptionIteratorOrder(policy, "p1", "r1", "r2", "p2");
sel.setPartition(PARTITION);
verifyAssignmentIteratorOrder(policy, sel, "p2", "r2", "r1", "p1");
verifyPreemptionIteratorOrder(policy, "p1", "r1", "r2", "p2");
policy.removeSchedulableEntity(p2);
policy.removeSchedulableEntity(r2);
Assert.assertEquals(2, policy.getNumSchedulableEntities());
Assert.assertEquals(2, policy.getSchedulableEntities().size());
verifyAssignmentIteratorOrder(policy,
IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r1", "p1");
verifyPreemptionIteratorOrder(policy, "p1", "r1");
sel.setPartition(PARTITION);
verifyAssignmentIteratorOrder(policy, sel, "r1", "p1");
verifyPreemptionIteratorOrder(policy, "p1", "r1");
}
@Test
public void testSingleExclusiveEnforcedPartition() {
FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy =
new FifoOrderingPolicyWithExclusivePartitions<>();
policy.configure(Collections.singletonMap(
YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX, PARTITION));
// PARTITION iterator should return p2, p1, p3
MockSchedulableEntity p1 = new MockSchedulableEntity(1, 0, false);
p1.setPartition(PARTITION);
p1.setId("p1");
MockSchedulableEntity p2 = new MockSchedulableEntity(5, 1, false);
p2.setPartition(PARTITION);
p2.setId("p2");
MockSchedulableEntity p3 = new MockSchedulableEntity(3, 0, false);
p3.setPartition(PARTITION);
p3.setId("p3");
// non-PARTITION iterator should return r3, r2, r1
MockSchedulableEntity r1 = new MockSchedulableEntity(6, 0, false);
r1.setId("r1");
MockSchedulableEntity r2 = new MockSchedulableEntity(4, 0, false);
r2.setId("r2");
MockSchedulableEntity r3 = new MockSchedulableEntity(2, 1, false);
r3.setId("r3");
policy.addSchedulableEntity(r1);
Assert.assertEquals(1, policy.getNumSchedulableEntities());
Assert.assertEquals("r1", policy.getSchedulableEntities()
.iterator().next().getId());
verifyAssignmentIteratorOrder(policy,
IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r1");
verifyPreemptionIteratorOrder(policy, "r1");
List<MockSchedulableEntity> entities = Arrays.asList(r2, r3, p1, p2);
policy.addAllSchedulableEntities(entities);
policy.addSchedulableEntity(p3);
Assert.assertEquals(6, policy.getNumSchedulableEntities());
Assert.assertEquals(6, policy.getSchedulableEntities().size());
// Assignment iterator should return non-PARTITION entities,
// in order based on FifoOrderingPolicy
verifyAssignmentIteratorOrder(policy,
IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r3", "r2", "r1");
// Preemption iterator should return all entities, in global order
verifyPreemptionIteratorOrder(policy, "r1", "r2", "p3", "p1", "p2", "r3");
// Same thing as above, but with a non-empty partition
IteratorSelector sel = new IteratorSelector();
sel.setPartition("dummy");
verifyAssignmentIteratorOrder(policy, sel, "r3", "r2", "r1");
verifyPreemptionIteratorOrder(policy, "r1", "r2", "p3", "p1", "p2", "r3");
// Should return PARTITION entities, in order based on FifoOrderingPolicy
sel.setPartition(PARTITION);
verifyAssignmentIteratorOrder(policy, sel, "p2", "p1", "p3");
verifyPreemptionIteratorOrder(policy, "r1", "r2", "p3", "p1", "p2", "r3");
policy.removeSchedulableEntity(p2);
policy.removeSchedulableEntity(r2);
Assert.assertEquals(4, policy.getNumSchedulableEntities());
Assert.assertEquals(4, policy.getSchedulableEntities().size());
verifyAssignmentIteratorOrder(policy,
IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r3", "r1");
verifyPreemptionIteratorOrder(policy, "r1", "p3", "p1", "r3");
sel.setPartition(PARTITION);
verifyAssignmentIteratorOrder(policy, sel, "p1", "p3");
verifyPreemptionIteratorOrder(policy, "r1", "p3", "p1", "r3");
policy.removeSchedulableEntity(p1);
policy.removeSchedulableEntity(p3);
Assert.assertEquals(2, policy.getNumSchedulableEntities());
Assert.assertEquals(2, policy.getSchedulableEntities().size());
verifyAssignmentIteratorOrder(policy,
IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r3", "r1");
verifyPreemptionIteratorOrder(policy, "r1", "r3");
sel.setPartition(PARTITION);
verifyAssignmentIteratorOrder(policy, sel);
verifyPreemptionIteratorOrder(policy, "r1", "r3");
}
@Test
public void testMultipleExclusiveEnforcedPartitions() {
FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy =
new FifoOrderingPolicyWithExclusivePartitions<>();
policy.configure(Collections.singletonMap(
YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX,
PARTITION + "," + PARTITION2));
// PARTITION iterator should return p2, p1
MockSchedulableEntity p1 = new MockSchedulableEntity(1, 0, false);
p1.setPartition(PARTITION);
p1.setId("p1");
MockSchedulableEntity p2 = new MockSchedulableEntity(5, 1, false);
p2.setPartition(PARTITION);
p2.setId("p2");
// PARTITION2 iterator should return r1, r2
MockSchedulableEntity r1 = new MockSchedulableEntity(3, 0, false);
r1.setPartition(PARTITION2);
r1.setId("r1");
MockSchedulableEntity r2 = new MockSchedulableEntity(4, 0, false);
r2.setPartition(PARTITION2);
r2.setId("r2");
// default iterator should return s2, s1
MockSchedulableEntity s1 = new MockSchedulableEntity(6, 0, false);
s1.setId("s1");
MockSchedulableEntity s2 = new MockSchedulableEntity(2, 0, false);
s2.setId("s2");
policy.addAllSchedulableEntities(Arrays.asList(s1, s2, r1));
Assert.assertEquals(3, policy.getNumSchedulableEntities());
Assert.assertEquals(3, policy.getSchedulableEntities().size());
IteratorSelector sel = new IteratorSelector();
// assignment iterator returns only default (non-partitioned) entities
verifyAssignmentIteratorOrder(policy,
IteratorSelector.EMPTY_ITERATOR_SELECTOR, "s2", "s1");
verifyPreemptionIteratorOrder(policy, "s1", "r1", "s2");
sel.setPartition(PARTITION2);
verifyAssignmentIteratorOrder(policy, sel, "r1");
policy.addAllSchedulableEntities(Arrays.asList(r2, p1, p2));
Assert.assertEquals(6, policy.getNumSchedulableEntities());
Assert.assertEquals(6, policy.getSchedulableEntities().size());
verifyAssignmentIteratorOrder(policy,
IteratorSelector.EMPTY_ITERATOR_SELECTOR, "s2", "s1");
sel.setPartition(PARTITION);
verifyAssignmentIteratorOrder(policy, sel, "p2", "p1");
sel.setPartition(PARTITION2);
verifyAssignmentIteratorOrder(policy, sel, "r1", "r2");
verifyPreemptionIteratorOrder(policy, "s1", "r2", "r1", "s2", "p1", "p2");
policy.removeSchedulableEntity(p2);
policy.removeSchedulableEntity(r1);
policy.removeSchedulableEntity(r2);
Assert.assertEquals(3, policy.getNumSchedulableEntities());
Assert.assertEquals(3, policy.getSchedulableEntities().size());
verifyAssignmentIteratorOrder(policy,
IteratorSelector.EMPTY_ITERATOR_SELECTOR, "s2", "s1");
sel.setPartition(PARTITION);
verifyAssignmentIteratorOrder(policy, sel, "p1");
sel.setPartition(PARTITION2);
verifyAssignmentIteratorOrder(policy, sel);
verifyPreemptionIteratorOrder(policy, "s1", "s2", "p1");
}
private void verifyAssignmentIteratorOrder(
FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy,
IteratorSelector sel, String... ids) {
verifyIteratorOrder(policy.getAssignmentIterator(sel), ids);
}
private void verifyPreemptionIteratorOrder(
FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy,
String... ids) {
verifyIteratorOrder(policy.getPreemptionIterator(), ids);
}
private void verifyIteratorOrder(Iterator<MockSchedulableEntity> itr,
String... ids) {
for (String id : ids) {
Assert.assertEquals(id, itr.next().getId());
}
Assert.assertFalse(itr.hasNext());
}
}