YARN-9204. RM fails to start if absolute resource is specified for partition capacity in CS queues. Contributed by Jiandan Yang.
This commit is contained in:
parent
27aa6e8899
commit
abde1e1f58
|
@ -474,6 +474,17 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
||||||
", capacity=" + capacity);
|
", capacity=" + capacity);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void setCapacity(String queue, String absoluteResourceCapacity) {
|
||||||
|
if (queue.equals("root")) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Cannot set capacity, root queue has a fixed capacity");
|
||||||
|
}
|
||||||
|
set(getQueuePrefix(queue) + CAPACITY, absoluteResourceCapacity);
|
||||||
|
LOG.debug("CSConf - setCapacity: queuePrefix=" + getQueuePrefix(queue)
|
||||||
|
+ ", capacity=" + absoluteResourceCapacity);
|
||||||
|
}
|
||||||
|
|
||||||
public float getNonLabeledQueueMaximumCapacity(String queue) {
|
public float getNonLabeledQueueMaximumCapacity(String queue) {
|
||||||
String configuredCapacity = get(getQueuePrefix(queue) + MAXIMUM_CAPACITY);
|
String configuredCapacity = get(getQueuePrefix(queue) + MAXIMUM_CAPACITY);
|
||||||
boolean matcher = (configuredCapacity != null)
|
boolean matcher = (configuredCapacity != null)
|
||||||
|
@ -509,6 +520,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
||||||
setFloat(getNodeLabelPrefix(queue, label) + CAPACITY, capacity);
|
setFloat(getNodeLabelPrefix(queue, label) + CAPACITY, capacity);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void setCapacityByLabel(String queue, String label,
|
||||||
|
String absoluteResourceCapacity) {
|
||||||
|
set(getNodeLabelPrefix(queue, label) + CAPACITY, absoluteResourceCapacity);
|
||||||
|
}
|
||||||
|
|
||||||
public void setMaximumCapacityByLabel(String queue, String label,
|
public void setMaximumCapacityByLabel(String queue, String label,
|
||||||
float capacity) {
|
float capacity) {
|
||||||
setFloat(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY, capacity);
|
setFloat(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY, capacity);
|
||||||
|
@ -645,8 +662,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
||||||
private float internalGetLabeledQueueCapacity(String queue, String label, String suffix,
|
private float internalGetLabeledQueueCapacity(String queue, String label, String suffix,
|
||||||
float defaultValue) {
|
float defaultValue) {
|
||||||
String capacityPropertyName = getNodeLabelPrefix(queue, label) + suffix;
|
String capacityPropertyName = getNodeLabelPrefix(queue, label) + suffix;
|
||||||
boolean matcher = (capacityPropertyName != null)
|
String configuredCapacity = get(capacityPropertyName);
|
||||||
&& RESOURCE_PATTERN.matcher(capacityPropertyName).find();
|
boolean matcher = (configuredCapacity != null)
|
||||||
|
&& RESOURCE_PATTERN.matcher(configuredCapacity).find();
|
||||||
if (matcher) {
|
if (matcher) {
|
||||||
// Return capacity in percentage as 0 for non-root queues and 100 for
|
// Return capacity in percentage as 0 for non-root queues and 100 for
|
||||||
// root.From AbstractCSQueue, absolute resource will be parsed and
|
// root.From AbstractCSQueue, absolute resource will be parsed and
|
||||||
|
|
|
@ -42,6 +42,7 @@ import java.util.Map;
|
||||||
import java.util.concurrent.BrokenBarrierException;
|
import java.util.concurrent.BrokenBarrierException;
|
||||||
import java.util.concurrent.CyclicBarrier;
|
import java.util.concurrent.CyclicBarrier;
|
||||||
|
|
||||||
|
import com.google.common.collect.Sets;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -123,6 +124,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
|
||||||
|
@ -991,6 +993,45 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
|
||||||
new ClientToAMTokenSecretManagerInRM(), null));
|
new ClientToAMTokenSecretManagerInRM(), null));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testParseQueueWithAbsoluteResource() {
|
||||||
|
String childQueue = "testQueue";
|
||||||
|
String labelName = "testLabel";
|
||||||
|
|
||||||
|
CapacityScheduler cs = new CapacityScheduler();
|
||||||
|
cs.setConf(new YarnConfiguration());
|
||||||
|
cs.setRMContext(resourceManager.getRMContext());
|
||||||
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||||
|
|
||||||
|
conf.setQueues("root", new String[] {childQueue});
|
||||||
|
conf.setCapacity("root." + childQueue, "[memory=20480,vcores=200]");
|
||||||
|
conf.setAccessibleNodeLabels("root." + childQueue,
|
||||||
|
Sets.newHashSet(labelName));
|
||||||
|
conf.setCapacityByLabel("root", labelName, "[memory=10240,vcores=100]");
|
||||||
|
conf.setCapacityByLabel("root." + childQueue, labelName,
|
||||||
|
"[memory=4096,vcores=10]");
|
||||||
|
|
||||||
|
cs.init(conf);
|
||||||
|
cs.start();
|
||||||
|
|
||||||
|
Resource rootQueueLableCapacity =
|
||||||
|
cs.getQueue("root").getQueueResourceQuotas()
|
||||||
|
.getConfiguredMinResource(labelName);
|
||||||
|
assertEquals(10240, rootQueueLableCapacity.getMemorySize());
|
||||||
|
assertEquals(100, rootQueueLableCapacity.getVirtualCores());
|
||||||
|
|
||||||
|
QueueResourceQuotas childQueueQuotas =
|
||||||
|
cs.getQueue(childQueue).getQueueResourceQuotas();
|
||||||
|
Resource childQueueCapacity = childQueueQuotas.getConfiguredMinResource();
|
||||||
|
assertEquals(20480, childQueueCapacity.getMemorySize());
|
||||||
|
assertEquals(200, childQueueCapacity.getVirtualCores());
|
||||||
|
|
||||||
|
Resource childQueueLabelCapacity =
|
||||||
|
childQueueQuotas.getConfiguredMinResource(labelName);
|
||||||
|
assertEquals(4096, childQueueLabelCapacity.getMemorySize());
|
||||||
|
assertEquals(10, childQueueLabelCapacity.getVirtualCores());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReconnectedNode() throws Exception {
|
public void testReconnectedNode() throws Exception {
|
||||||
CapacitySchedulerConfiguration csConf =
|
CapacitySchedulerConfiguration csConf =
|
||||||
|
|
Loading…
Reference in New Issue