Merge -c 1576751 from trunk to branch-2 to fix YARN-1444. Fix CapacityScheduler to deal with cases where applications specify host/rack requests without off-switch request. Contributed by Wangda Tan.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1576752 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2014-03-12 14:37:27 +00:00
parent d5120ccc6b
commit 8c4578e894
5 changed files with 100 additions and 4 deletions

View File

@ -434,6 +434,10 @@ Release 2.4.0 - UNRELEASED
YARN-1800. Fixed NodeManager to gracefully handle RejectedExecutionException YARN-1800. Fixed NodeManager to gracefully handle RejectedExecutionException
in the public-localizer thread-pool. (Varun Vasudev via vinodkv) in the public-localizer thread-pool. (Varun Vasudev via vinodkv)
YARN-1444. Fix CapacityScheduler to deal with cases where applications
specify host/rack requests without off-switch request. (Wangda Tan via
acmurthy)
Release 2.3.1 - UNRELEASED Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -836,10 +836,14 @@ public class LeafQueue implements CSQueue {
// Schedule in priority order // Schedule in priority order
for (Priority priority : application.getPriorities()) { for (Priority priority : application.getPriorities()) {
ResourceRequest anyRequest =
application.getResourceRequest(priority, ResourceRequest.ANY);
if (null == anyRequest) {
continue;
}
// Required resource // Required resource
Resource required = Resource required = anyRequest.getCapability();
application.getResourceRequest(
priority, ResourceRequest.ANY).getCapability();
// Do we need containers at this 'priority'? // Do we need containers at this 'priority'?
if (!needContainers(application, priority, required)) { if (!needContainers(application, priority, required)) {

View File

@ -503,9 +503,13 @@ public class FifoScheduler extends AbstractYarnScheduler implements
private int getMaxAllocatableContainers(FiCaSchedulerApp application, private int getMaxAllocatableContainers(FiCaSchedulerApp application,
Priority priority, FiCaSchedulerNode node, NodeType type) { Priority priority, FiCaSchedulerNode node, NodeType type) {
int maxContainers = 0;
ResourceRequest offSwitchRequest = ResourceRequest offSwitchRequest =
application.getResourceRequest(priority, ResourceRequest.ANY); application.getResourceRequest(priority, ResourceRequest.ANY);
int maxContainers = offSwitchRequest.getNumContainers(); if (offSwitchRequest != null) {
maxContainers = offSwitchRequest.getNumContainers();
}
if (type == NodeType.OFF_SWITCH) { if (type == NodeType.OFF_SWITCH) {
return maxContainers; return maxContainers;

View File

@ -92,6 +92,38 @@ public class TestFifoScheduler {
} }
} }
@Test
public void testAllocateContainerOnNodeWithoutOffSwitchSpecified()
throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
MockRM rm = new MockRM(conf);
rm.start();
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
RMApp app1 = rm.submitApp(2048);
// kick the scheduling, 2 GB given to AM1, remaining 4GB on nm1
nm1.nodeHeartbeat(true);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
am1.registerAppAttempt();
// add request for containers
List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
requests.add(am1.createResourceReq("127.0.0.1", 1 * GB, 1, 1));
requests.add(am1.createResourceReq("/default-rack", 1 * GB, 1, 1));
am1.allocate(requests, null); // send the request
try {
// kick the schedule
nm1.nodeHeartbeat(true);
} catch (NullPointerException e) {
Assert.fail("NPE when allocating container on node but "
+ "forget to set off-switch request should be handled");
}
}
@Test @Test
public void test() throws Exception { public void test() throws Exception {
Logger rootLogger = LogManager.getRootLogger(); Logger rootLogger = LogManager.getRootLogger();

View File

@ -34,11 +34,14 @@ import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import junit.framework.Assert;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -2013,6 +2016,55 @@ public class TestLeafQueue {
assertEquals(400, a.getMaximumActiveApplications()); assertEquals(400, a.getMaximumActiveApplications());
} }
@Test
public void testAllocateContainerOnNodeWithoutOffSwitchSpecified()
throws Exception {
// Manipulate queue 'a'
LeafQueue a = stubLeafQueue((LeafQueue) queues.get(B));
// Users
final String user_0 = "user_0";
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext);
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
mock(ActiveUsersManager.class), rmContext);
a.submitApplicationAttempt(app_1, user_0); // same user
// Setup some nodes
String host_0 = "127.0.0.1";
FiCaSchedulerNode node_0 =
TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8 * GB);
final int numNodes = 1;
Resource clusterResource =
Resources.createResource(numNodes * (8 * GB), numNodes * 16);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Arrays.asList(TestUtils.createResourceRequest(
"127.0.0.1", 1 * GB, 3, true, priority, recordFactory), TestUtils
.createResourceRequest(DEFAULT_RACK, 1 * GB, 3, true, priority,
recordFactory)));
try {
a.assignContainers(clusterResource, node_0);
} catch (NullPointerException e) {
Assert.fail("NPE when allocating container on node but "
+ "forget to set off-switch request should be handled");
}
}
private CapacitySchedulerContext mockCSContext( private CapacitySchedulerContext mockCSContext(
CapacitySchedulerConfiguration csConf, Resource clusterResource) { CapacitySchedulerConfiguration csConf, Resource clusterResource) {
CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class); CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);