MAPREDUCE-4144. Fix a NPE in the ResourceManager when handling node updates. (Contributed by Jason Lowe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1325991 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Siddharth Seth 2012-04-13 22:24:16 +00:00
parent 5a20d446cf
commit 9a10b4e773
3 changed files with 104 additions and 5 deletions

View File

@ -335,6 +335,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4128. AM Recovery expects all attempts of a completed task to
also be completed. (Bikas Saha via bobby)
MAPREDUCE-4144. Fix a NPE in the ResourceManager when handling node
updates. (Jason Lowe via sseth)
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -1118,13 +1118,12 @@ public class LeafQueue implements CSQueue {
boolean canAssign(SchedulerApp application, Priority priority,
SchedulerNode node, NodeType type, RMContainer reservedContainer) {
// Reserved...
if (reservedContainer != null) {
return true;
}
// Clearly we need containers for this application...
if (type == NodeType.OFF_SWITCH) {
if (reservedContainer != null) {
return true;
}
// 'Delay' off-switch
ResourceRequest offSwitchRequest =
application.getResourceRequest(priority, RMNode.ANY);

View File

@ -926,6 +926,103 @@ public class TestLeafQueue {
assertEquals(4*GB, a.getMetrics().getAllocatedMB());
}
@Test
public void testStolenReservedContainer() throws Exception {
// Manipulate queue 'a'
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
//unset maxCapacity
a.setMaxCapacity(1.0f);
// Users
final String user_0 = "user_0";
final String user_1 = "user_1";
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
new SchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_1 =
new SchedulerApp(appAttemptId_1, user_1, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_1, user_1, A);
// Setup some nodes
String host_0 = "host_0";
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
String host_1 = "host_1";
SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB);
final int numNodes = 3;
Resource clusterResource = Resources.createResource(numNodes * (4*GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 2*GB, 1, priority,
recordFactory)));
// Setup app_1 to request a 4GB container on host_0 and
// another 4GB container anywhere.
ArrayList<ResourceRequest> appRequests_1 =
new ArrayList<ResourceRequest>(4);
appRequests_1.add(TestUtils.createResourceRequest(host_0, 4*GB, 1,
priority, recordFactory));
appRequests_1.add(TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1,
priority, recordFactory));
appRequests_1.add(TestUtils.createResourceRequest(RMNodeImpl.ANY, 4*GB, 2,
priority, recordFactory));
app_1.updateResourceRequests(appRequests_1);
// Start testing...
a.assignContainers(clusterResource, node_0);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0*GB, a.getMetrics().getReservedMB());
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
assertEquals(0*GB, a.getMetrics().getAvailableMB());
// Now, reservation should kick in for app_1
a.assignContainers(clusterResource, node_0);
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
assertEquals(2*GB, node_0.getUsedResource().getMemory());
assertEquals(4*GB, a.getMetrics().getReservedMB());
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
// node_1 heartbeats in and gets the DEFAULT_RACK request for app_1
a.assignContainers(clusterResource, node_1);
assertEquals(10*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
assertEquals(4*GB, node_1.getUsedResource().getMemory());
assertEquals(4*GB, a.getMetrics().getReservedMB());
assertEquals(6*GB, a.getMetrics().getAllocatedMB());
// Now free 1 container from app_0 and try to assign to node_0
a.completedContainer(clusterResource, app_0, node_0,
app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
a.assignContainers(clusterResource, node_0);
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(8*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentReservation().getMemory());
assertEquals(4*GB, node_0.getUsedResource().getMemory());
assertEquals(0*GB, a.getMetrics().getReservedMB());
assertEquals(8*GB, a.getMetrics().getAllocatedMB());
}
@Test
public void testReservationExchange() throws Exception {