YARN-9017. PlacementRule order is not maintained in CS. Contributed by Bilwa S T.
This commit is contained in:
parent
130f89e068
commit
35010120fb
|
@ -698,8 +698,11 @@ public class CapacityScheduler extends
|
||||||
Set<String> distinguishRuleSet = CapacitySchedulerConfigValidator
|
Set<String> distinguishRuleSet = CapacitySchedulerConfigValidator
|
||||||
.validatePlacementRules(placementRuleStrs);
|
.validatePlacementRules(placementRuleStrs);
|
||||||
|
|
||||||
// add UserGroupMappingPlacementRule if absent
|
// add UserGroupMappingPlacementRule if empty,default value of
|
||||||
distinguishRuleSet.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE);
|
// yarn.scheduler.queue-placement-rules is user-group
|
||||||
|
if (distinguishRuleSet.isEmpty()) {
|
||||||
|
distinguishRuleSet.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE);
|
||||||
|
}
|
||||||
|
|
||||||
placementRuleStrs = new ArrayList<>(distinguishRuleSet);
|
placementRuleStrs = new ArrayList<>(distinguishRuleSet);
|
||||||
|
|
||||||
|
|
|
@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashSet;
|
import java.util.LinkedHashSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
public final class CapacitySchedulerConfigValidator {
|
public final class CapacitySchedulerConfigValidator {
|
||||||
|
@ -58,7 +58,7 @@ public final class CapacitySchedulerConfigValidator {
|
||||||
|
|
||||||
public static Set<String> validatePlacementRules(
|
public static Set<String> validatePlacementRules(
|
||||||
Collection<String> placementRuleStrs) throws IOException {
|
Collection<String> placementRuleStrs) throws IOException {
|
||||||
Set<String> distinguishRuleSet = new HashSet<>();
|
Set<String> distinguishRuleSet = new LinkedHashSet<>();
|
||||||
// fail the case if we get duplicate placementRule add in
|
// fail the case if we get duplicate placementRule add in
|
||||||
for (String pls : placementRuleStrs) {
|
for (String pls : placementRuleStrs) {
|
||||||
if (!distinguishRuleSet.add(pls)) {
|
if (!distinguishRuleSet.add(pls)) {
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.placement;
|
package org.apache.hadoop.yarn.server.resourcemanager.placement;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
|
@ -28,7 +27,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -49,20 +50,22 @@ public class TestPlacementManager {
|
||||||
public static final String PARENT_QUEUE = "c";
|
public static final String PARENT_QUEUE = "c";
|
||||||
|
|
||||||
private MockRM mockRM = null;
|
private MockRM mockRM = null;
|
||||||
|
private CapacitySchedulerConfiguration conf;
|
||||||
private static final long CLUSTER_TIMESTAMP = System.currentTimeMillis();
|
|
||||||
|
|
||||||
private String getQueueMapping(String parentQueue, String leafQueue) {
|
private String getQueueMapping(String parentQueue, String leafQueue) {
|
||||||
return parentQueue + DOT + leafQueue;
|
return parentQueue + DOT + leafQueue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Before
|
||||||
public void testPlaceApplicationWithPlacementRuleChain() throws Exception {
|
public void setup() {
|
||||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
conf = new CapacitySchedulerConfiguration();
|
||||||
setupQueueConfiguration(conf);
|
setupQueueConfiguration(conf);
|
||||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||||
ResourceScheduler.class);
|
ResourceScheduler.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPlaceApplicationWithPlacementRuleChain() throws Exception {
|
||||||
mockRM = new MockRM(conf);
|
mockRM = new MockRM(conf);
|
||||||
CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
|
CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
|
||||||
mockRM.start();
|
mockRM.start();
|
||||||
|
@ -108,4 +111,38 @@ public class TestPlacementManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPlacementRuleUpdationOrder() throws Exception {
|
||||||
|
List<QueueMapping> queueMappings = new ArrayList<>();
|
||||||
|
QueueMapping userQueueMapping = QueueMappingBuilder.create()
|
||||||
|
.type(MappingType.USER).source(USER1)
|
||||||
|
.queue(getQueueMapping(PARENT_QUEUE, USER1)).build();
|
||||||
|
UserGroupMappingPlacementRule ugRule = new UserGroupMappingPlacementRule(
|
||||||
|
false, Arrays.asList(userQueueMapping), null);
|
||||||
|
|
||||||
|
// Configure placement rule
|
||||||
|
conf.set(YarnConfiguration.QUEUE_PLACEMENT_RULES, ugRule.getName());
|
||||||
|
queueMappings.add(userQueueMapping);
|
||||||
|
conf.setQueueMappings(queueMappings);
|
||||||
|
|
||||||
|
mockRM = new MockRM(conf);
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
|
||||||
|
mockRM.start();
|
||||||
|
PlacementManager pm = cs.getRMContext().getQueuePlacementManager();
|
||||||
|
|
||||||
|
// As we are setting placement rule, It shouldn't update default
|
||||||
|
// placement rule ie user-group. Number of placemnt rules should be 1.
|
||||||
|
Assert.assertEquals(1, pm.getPlacementRules().size());
|
||||||
|
// Verifying if placement rule set is same as the one we configured
|
||||||
|
Assert.assertEquals(ugRule.getName(),
|
||||||
|
pm.getPlacementRules().get(0).getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() {
|
||||||
|
if (null != mockRM) {
|
||||||
|
mockRM.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
Loading…
Reference in New Issue