YARN-6024. Capacity Scheduler 'continuous reservation looking' doesn't work when sum of queue's used and reserved resources is equal to max. Contributed by Wangda Tan.
This commit is contained in:
parent
3e0bd9951f
commit
7820eeb267
|
@ -473,7 +473,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
|
|
||||||
// when total-used-without-reserved-resource < currentLimit, we still
|
// when total-used-without-reserved-resource < currentLimit, we still
|
||||||
// have chance to allocate on this node by unreserving some containers
|
// have chance to allocate on this node by unreserving some containers
|
||||||
if (Resources.lessThan(resourceCalculator, clusterResource,
|
if (Resources.lessThanOrEqual(resourceCalculator, clusterResource,
|
||||||
newTotalWithoutReservedResource, currentLimitResource)) {
|
newTotalWithoutReservedResource, currentLimitResource)) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("try to use reserved: " + getQueueName()
|
LOG.debug("try to use reserved: " + getQueueName()
|
||||||
|
|
|
@ -52,9 +52,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -774,4 +777,54 @@ public class TestContainerAllocation {
|
||||||
|
|
||||||
rm1.close();
|
rm1.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testContinuousReservationLookingWhenUsedEqualsMax() throws Exception {
|
||||||
|
CapacitySchedulerConfiguration newConf =
|
||||||
|
(CapacitySchedulerConfiguration) TestUtils
|
||||||
|
.getConfigurationWithMultipleQueues(conf);
|
||||||
|
// Set maximum capacity of A to 10
|
||||||
|
newConf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + ".a", 10);
|
||||||
|
MockRM rm1 = new MockRM(newConf);
|
||||||
|
|
||||||
|
rm1.getRMContext().setNodeLabelManager(mgr);
|
||||||
|
rm1.start();
|
||||||
|
MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB);
|
||||||
|
MockNM nm2 = rm1.registerNode("h2:1234", 90 * GB);
|
||||||
|
|
||||||
|
// launch an app to queue A, AM container should be launched in nm1
|
||||||
|
RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "a");
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
|
||||||
|
// launch 2nd app to queue B, AM container should be launched in nm1
|
||||||
|
// Now usage of nm1 is 3G (2G + 1G)
|
||||||
|
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "b");
|
||||||
|
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
|
||||||
|
|
||||||
|
am1.allocate("*", 4 * GB, 2, null);
|
||||||
|
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||||
|
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
|
||||||
|
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
|
||||||
|
|
||||||
|
// Do node heartbeats twice, we expect one container allocated on nm1 and
|
||||||
|
// one container reserved on nm1.
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||||
|
|
||||||
|
FiCaSchedulerApp schedulerApp1 =
|
||||||
|
cs.getApplicationAttempt(am1.getApplicationAttemptId());
|
||||||
|
|
||||||
|
// App1 will get 2 container allocated (plus AM container)
|
||||||
|
Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
|
||||||
|
Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
|
||||||
|
|
||||||
|
// Do node heartbeats on nm2, we expect one container allocated on nm2 and
|
||||||
|
// one unreserved on nm1
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
|
||||||
|
Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
|
||||||
|
Assert.assertEquals(0, schedulerApp1.getReservedContainers().size());
|
||||||
|
|
||||||
|
rm1.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -217,4 +217,40 @@ public class TestUtils {
|
||||||
when(container.getPriority()).thenReturn(priority);
|
when(container.getPriority()).thenReturn(priority);
|
||||||
return container;
|
return container;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a queue structure:
|
||||||
|
* <pre>
|
||||||
|
* Root
|
||||||
|
* / | \
|
||||||
|
* a b c
|
||||||
|
* 10 20 70
|
||||||
|
* </pre>
|
||||||
|
*/
|
||||||
|
public static Configuration
|
||||||
|
getConfigurationWithMultipleQueues(Configuration config) {
|
||||||
|
CapacitySchedulerConfiguration conf =
|
||||||
|
new CapacitySchedulerConfiguration(config);
|
||||||
|
|
||||||
|
// Define top-level queues
|
||||||
|
conf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||||
|
new String[] { "a", "b", "c" });
|
||||||
|
|
||||||
|
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
||||||
|
conf.setCapacity(A, 10);
|
||||||
|
conf.setMaximumCapacity(A, 100);
|
||||||
|
conf.setUserLimitFactor(A, 100);
|
||||||
|
|
||||||
|
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
|
||||||
|
conf.setCapacity(B, 20);
|
||||||
|
conf.setMaximumCapacity(B, 100);
|
||||||
|
conf.setUserLimitFactor(B, 100);
|
||||||
|
|
||||||
|
final String C = CapacitySchedulerConfiguration.ROOT + ".c";
|
||||||
|
conf.setCapacity(C, 70);
|
||||||
|
conf.setMaximumCapacity(C, 100);
|
||||||
|
conf.setUserLimitFactor(C, 100);
|
||||||
|
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue