YARN-2628. Capacity scheduler with DominantResourceCalculator carries out reservation even though slots are free. Contributed by Varun Vasudev
(cherry picked from commit 054f285526
)
This commit is contained in:
parent
ecaae67ab3
commit
6f4c77409b
|
@ -497,6 +497,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-2527. Fixed the potential NPE in ApplicationACLsManager and added test
|
YARN-2527. Fixed the potential NPE in ApplicationACLsManager and added test
|
||||||
cases for it. (Benoy Antony via zjshen)
|
cases for it. (Benoy Antony via zjshen)
|
||||||
|
|
||||||
|
YARN-2628. Capacity scheduler with DominantResourceCalculator carries out
|
||||||
|
reservation even though slots are free. (Varun Vasudev via jianhe)
|
||||||
|
|
||||||
Release 2.5.1 - 2014-09-05
|
Release 2.5.1 - 2014-09-05
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -941,8 +941,8 @@ public class CapacityScheduler extends
|
||||||
|
|
||||||
// Try to schedule more if there are no reservations to fulfill
|
// Try to schedule more if there are no reservations to fulfill
|
||||||
if (node.getReservedContainer() == null) {
|
if (node.getReservedContainer() == null) {
|
||||||
if (Resources.greaterThanOrEqual(calculator, getClusterResource(),
|
if (calculator.computeAvailableContainers(node.getAvailableResource(),
|
||||||
node.getAvailableResource(), minimumAllocation)) {
|
minimumAllocation) > 0) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Trying to schedule on node: " + node.getNodeName() +
|
LOG.debug("Trying to schedule on node: " + node.getNodeName() +
|
||||||
", available: " + node.getAvailableResource());
|
", available: " + node.getAvailableResource());
|
||||||
|
|
|
@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
@ -115,6 +116,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedule
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -1995,4 +1997,66 @@ public class TestCapacityScheduler {
|
||||||
rm.stop();
|
rm.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test to ensure that we don't carry out reservation on nodes
|
||||||
|
// that have no CPU available when using the DominantResourceCalculator
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testAppReservationWithDominantResourceCalculator() throws Exception {
|
||||||
|
CapacitySchedulerConfiguration csconf =
|
||||||
|
new CapacitySchedulerConfiguration();
|
||||||
|
csconf.setResourceComparator(DominantResourceCalculator.class);
|
||||||
|
|
||||||
|
YarnConfiguration conf = new YarnConfiguration(csconf);
|
||||||
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||||
|
ResourceScheduler.class);
|
||||||
|
|
||||||
|
MockRM rm = new MockRM(conf);
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10 * GB, 1);
|
||||||
|
|
||||||
|
// register extra nodes to bump up cluster resource
|
||||||
|
MockNM nm2 = rm.registerNode("127.0.0.1:1235", 10 * GB, 4);
|
||||||
|
rm.registerNode("127.0.0.1:1236", 10 * GB, 4);
|
||||||
|
|
||||||
|
RMApp app1 = rm.submitApp(1024);
|
||||||
|
// kick the scheduling
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
||||||
|
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
|
||||||
|
am1.registerAppAttempt();
|
||||||
|
SchedulerNodeReport report_nm1 =
|
||||||
|
rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
|
||||||
|
|
||||||
|
// check node report
|
||||||
|
Assert.assertEquals(1 * GB, report_nm1.getUsedResource().getMemory());
|
||||||
|
Assert.assertEquals(9 * GB, report_nm1.getAvailableResource().getMemory());
|
||||||
|
|
||||||
|
// add request for containers
|
||||||
|
am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 1 * GB, 1, 1);
|
||||||
|
am1.schedule(); // send the request
|
||||||
|
|
||||||
|
// kick the scheduler, container reservation should not happen
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
Thread.sleep(1000);
|
||||||
|
AllocateResponse allocResponse = am1.schedule();
|
||||||
|
ApplicationResourceUsageReport report =
|
||||||
|
rm.getResourceScheduler().getAppResourceUsageReport(
|
||||||
|
attempt1.getAppAttemptId());
|
||||||
|
Assert.assertEquals(0, allocResponse.getAllocatedContainers().size());
|
||||||
|
Assert.assertEquals(0, report.getNumReservedContainers());
|
||||||
|
|
||||||
|
// container should get allocated on this node
|
||||||
|
nm2.nodeHeartbeat(true);
|
||||||
|
|
||||||
|
while (allocResponse.getAllocatedContainers().size() == 0) {
|
||||||
|
Thread.sleep(100);
|
||||||
|
allocResponse = am1.schedule();
|
||||||
|
}
|
||||||
|
report =
|
||||||
|
rm.getResourceScheduler().getAppResourceUsageReport(
|
||||||
|
attempt1.getAppAttemptId());
|
||||||
|
Assert.assertEquals(1, allocResponse.getAllocatedContainers().size());
|
||||||
|
Assert.assertEquals(0, report.getNumReservedContainers());
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue