YARN-8858. CapacityScheduler should respect maximum node resource when per-queue maximum-allocation is being used. Contributed by Wangda Tan.
(cherry picked from commit edce866489
)
This commit is contained in:
parent
8a9f61be4e
commit
af85ce6ae4
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -236,6 +237,16 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void setForceConfiguredMaxAllocation(boolean flag) {
|
||||||
|
writeLock.lock();
|
||||||
|
try {
|
||||||
|
forceConfiguredMaxAllocation = flag;
|
||||||
|
} finally {
|
||||||
|
writeLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void updateMaxResources(SchedulerNode node, boolean add) {
|
private void updateMaxResources(SchedulerNode node, boolean add) {
|
||||||
Resource totalResource = node.getTotalResource();
|
Resource totalResource = node.getTotalResource();
|
||||||
ResourceInformation[] totalResources;
|
ResourceInformation[] totalResources;
|
||||||
|
|
|
@ -2261,7 +2261,17 @@ public class CapacityScheduler extends
|
||||||
LOG.error("queue " + queueName + " is not an leaf queue");
|
LOG.error("queue " + queueName + " is not an leaf queue");
|
||||||
return getMaximumResourceCapability();
|
return getMaximumResourceCapability();
|
||||||
}
|
}
|
||||||
return ((LeafQueue)queue).getMaximumAllocation();
|
|
||||||
|
// queue.getMaxAllocation returns *configured* maximum allocation.
|
||||||
|
// getMaximumResourceCapability() returns maximum allocation considers
|
||||||
|
// per-node maximum resources. So return (component-wise) min of the two.
|
||||||
|
|
||||||
|
Resource queueMaxAllocation = ((LeafQueue)queue).getMaximumAllocation();
|
||||||
|
Resource clusterMaxAllocationConsiderNodeMax =
|
||||||
|
getMaximumResourceCapability();
|
||||||
|
|
||||||
|
return Resources.componentwiseMin(queueMaxAllocation,
|
||||||
|
clusterMaxAllocationConsiderNodeMax);
|
||||||
}
|
}
|
||||||
|
|
||||||
private String handleMoveToPlanQueue(String targetQueueName) {
|
private String handleMoveToPlanQueue(String targetQueueName) {
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.Token;
|
import org.apache.hadoop.yarn.api.records.Token;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.api.ContainerType;
|
import org.apache.hadoop.yarn.server.api.ContainerType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||||
|
@ -65,6 +66,7 @@ import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB;
|
||||||
|
|
||||||
public class TestContainerAllocation {
|
public class TestContainerAllocation {
|
||||||
|
|
||||||
|
@ -1062,4 +1064,54 @@ public class TestContainerAllocation {
|
||||||
|
|
||||||
rm1.close();
|
rm1.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testContainerRejectionWhenAskBeyondDynamicMax()
|
||||||
|
throws Exception {
|
||||||
|
CapacitySchedulerConfiguration newConf =
|
||||||
|
(CapacitySchedulerConfiguration) TestUtils
|
||||||
|
.getConfigurationWithMultipleQueues(conf);
|
||||||
|
newConf.setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
|
||||||
|
DominantResourceCalculator.class, ResourceCalculator.class);
|
||||||
|
newConf.set(CapacitySchedulerConfiguration.getQueuePrefix("root.a")
|
||||||
|
+ MAXIMUM_ALLOCATION_MB, "4096");
|
||||||
|
|
||||||
|
MockRM rm1 = new MockRM(newConf);
|
||||||
|
rm1.start();
|
||||||
|
|
||||||
|
// before any node registered or before registration timeout,
|
||||||
|
// submit an app beyond queue max leads to failure.
|
||||||
|
boolean submitFailed = false;
|
||||||
|
MockNM nm1 = rm1.registerNode("h1:1234", 2 * GB, 1);
|
||||||
|
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
try {
|
||||||
|
am1.allocate("*", 5 * GB, 1, null);
|
||||||
|
} catch (InvalidResourceRequestException e) {
|
||||||
|
submitFailed = true;
|
||||||
|
}
|
||||||
|
Assert.assertTrue(submitFailed);
|
||||||
|
|
||||||
|
// Ask 4GB succeeded.
|
||||||
|
am1.allocate("*", 4 * GB, 1, null);
|
||||||
|
|
||||||
|
// Add a new node, now the cluster maximum should be refreshed to 3GB.
|
||||||
|
CapacityScheduler cs = (CapacityScheduler)rm1.getResourceScheduler();
|
||||||
|
cs.getNodeTracker().setForceConfiguredMaxAllocation(false);
|
||||||
|
rm1.registerNode("h2:1234", 3 * GB, 1);
|
||||||
|
|
||||||
|
// Now ask 4 GB will fail
|
||||||
|
submitFailed = false;
|
||||||
|
try {
|
||||||
|
am1.allocate("*", 4 * GB, 1, null);
|
||||||
|
} catch (InvalidResourceRequestException e) {
|
||||||
|
submitFailed = true;
|
||||||
|
}
|
||||||
|
Assert.assertTrue(submitFailed);
|
||||||
|
|
||||||
|
// But ask 3 GB succeeded.
|
||||||
|
am1.allocate("*", 3 * GB, 1, null);
|
||||||
|
|
||||||
|
rm1.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue