Merge MAPREDUCE-4144 from trunk. Fix a NPE in the ResourceManager when handling node updates. (Contributed by Jason Lowe)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1325993 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d41c0ede09
commit
68f136337d
|
@ -234,6 +234,9 @@ Release 0.23.3 - UNRELEASED
|
|||
MAPREDUCE-4128. AM Recovery expects all attempts of a completed task to
|
||||
also be completed. (Bikas Saha via bobby)
|
||||
|
||||
MAPREDUCE-4144. Fix a NPE in the ResourceManager when handling node
|
||||
updates. (Jason Lowe via sseth)
|
||||
|
||||
Release 0.23.2 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -1118,13 +1118,12 @@ public class LeafQueue implements CSQueue {
|
|||
boolean canAssign(SchedulerApp application, Priority priority,
|
||||
SchedulerNode node, NodeType type, RMContainer reservedContainer) {
|
||||
|
||||
// Reserved...
|
||||
if (reservedContainer != null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Clearly we need containers for this application...
|
||||
if (type == NodeType.OFF_SWITCH) {
|
||||
if (reservedContainer != null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// 'Delay' off-switch
|
||||
ResourceRequest offSwitchRequest =
|
||||
application.getResourceRequest(priority, RMNode.ANY);
|
||||
|
|
|
@ -926,6 +926,103 @@ public class TestLeafQueue {
|
|||
assertEquals(4*GB, a.getMetrics().getAllocatedMB());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStolenReservedContainer() throws Exception {
|
||||
// Manipulate queue 'a'
|
||||
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
|
||||
//unset maxCapacity
|
||||
a.setMaxCapacity(1.0f);
|
||||
|
||||
// Users
|
||||
final String user_0 = "user_0";
|
||||
final String user_1 = "user_1";
|
||||
|
||||
// Submit applications
|
||||
final ApplicationAttemptId appAttemptId_0 =
|
||||
TestUtils.getMockApplicationAttemptId(0, 0);
|
||||
SchedulerApp app_0 =
|
||||
new SchedulerApp(appAttemptId_0, user_0, a,
|
||||
mock(ActiveUsersManager.class), rmContext, null);
|
||||
a.submitApplication(app_0, user_0, A);
|
||||
|
||||
final ApplicationAttemptId appAttemptId_1 =
|
||||
TestUtils.getMockApplicationAttemptId(1, 0);
|
||||
SchedulerApp app_1 =
|
||||
new SchedulerApp(appAttemptId_1, user_1, a,
|
||||
mock(ActiveUsersManager.class), rmContext, null);
|
||||
a.submitApplication(app_1, user_1, A);
|
||||
|
||||
// Setup some nodes
|
||||
String host_0 = "host_0";
|
||||
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
|
||||
String host_1 = "host_1";
|
||||
SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB);
|
||||
|
||||
final int numNodes = 3;
|
||||
Resource clusterResource = Resources.createResource(numNodes * (4*GB));
|
||||
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
|
||||
|
||||
// Setup resource-requests
|
||||
Priority priority = TestUtils.createMockPriority(1);
|
||||
app_0.updateResourceRequests(Collections.singletonList(
|
||||
TestUtils.createResourceRequest(RMNodeImpl.ANY, 2*GB, 1, priority,
|
||||
recordFactory)));
|
||||
|
||||
// Setup app_1 to request a 4GB container on host_0 and
|
||||
// another 4GB container anywhere.
|
||||
ArrayList<ResourceRequest> appRequests_1 =
|
||||
new ArrayList<ResourceRequest>(4);
|
||||
appRequests_1.add(TestUtils.createResourceRequest(host_0, 4*GB, 1,
|
||||
priority, recordFactory));
|
||||
appRequests_1.add(TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1,
|
||||
priority, recordFactory));
|
||||
appRequests_1.add(TestUtils.createResourceRequest(RMNodeImpl.ANY, 4*GB, 2,
|
||||
priority, recordFactory));
|
||||
app_1.updateResourceRequests(appRequests_1);
|
||||
|
||||
// Start testing...
|
||||
|
||||
a.assignContainers(clusterResource, node_0);
|
||||
assertEquals(2*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
||||
assertEquals(0*GB, a.getMetrics().getReservedMB());
|
||||
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
|
||||
assertEquals(0*GB, a.getMetrics().getAvailableMB());
|
||||
|
||||
// Now, reservation should kick in for app_1
|
||||
a.assignContainers(clusterResource, node_0);
|
||||
assertEquals(6*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
||||
assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
|
||||
assertEquals(2*GB, node_0.getUsedResource().getMemory());
|
||||
assertEquals(4*GB, a.getMetrics().getReservedMB());
|
||||
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
|
||||
|
||||
// node_1 heartbeats in and gets the DEFAULT_RACK request for app_1
|
||||
a.assignContainers(clusterResource, node_1);
|
||||
assertEquals(10*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||
assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
|
||||
assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
|
||||
assertEquals(4*GB, node_1.getUsedResource().getMemory());
|
||||
assertEquals(4*GB, a.getMetrics().getReservedMB());
|
||||
assertEquals(6*GB, a.getMetrics().getAllocatedMB());
|
||||
|
||||
// Now free 1 container from app_0 and try to assign to node_0
|
||||
a.completedContainer(clusterResource, app_0, node_0,
|
||||
app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
|
||||
a.assignContainers(clusterResource, node_0);
|
||||
assertEquals(8*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
|
||||
assertEquals(8*GB, app_1.getCurrentConsumption().getMemory());
|
||||
assertEquals(0*GB, app_1.getCurrentReservation().getMemory());
|
||||
assertEquals(4*GB, node_0.getUsedResource().getMemory());
|
||||
assertEquals(0*GB, a.getMetrics().getReservedMB());
|
||||
assertEquals(8*GB, a.getMetrics().getAllocatedMB());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReservationExchange() throws Exception {
|
||||
|
||||
|
|
Loading…
Reference in New Issue