YARN-5837. NPE when getting node status of a decommissioned node after an RM restart. Contributed by Robert Kanter

This commit is contained in:
Jason Lowe 2016-11-04 22:20:21 +00:00
parent 0c0ab102ab
commit 6bb741ff0e
3 changed files with 26 additions and 1 deletions

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -242,7 +243,8 @@ public class NodesListManager extends CompositeService implements
for (final String host : excludeList) {
NodeId nodeId = createUnknownNodeId(host);
RMNodeImpl rmNode = new RMNodeImpl(nodeId,
rmContext, host, -1, -1, new UnknownNode(host), null, null);
rmContext, host, -1, -1, new UnknownNode(host),
Resource.newInstance(0, 0), "unknown");
rmContext.getInactiveRMNodes().put(nodeId, rmNode);
rmNode.handle(new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
}

View File

@ -256,4 +256,8 @@ public class MockNM {
public int getvCores() {
return vCores;
}
public String getVersion() {
return version;
}
}

View File

@ -40,6 +40,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
@ -103,6 +104,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@ -1957,6 +1959,9 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
rm1.start();
MockNM nm1 = rm1.registerNode("localhost:1234", 8000);
MockNM nm2 = rm1.registerNode("host2:1234", 8000);
Resource expectedCapability =
Resource.newInstance(nm1.getMemory(), nm1.getvCores());
String expectedVersion = nm1.getVersion();
Assert
.assertEquals(0,
ClusterMetrics.getMetrics().getNumDecommisionedNMs());
@ -1978,6 +1983,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
Assert
.assertEquals(2,
ClusterMetrics.getMetrics().getNumDecommisionedNMs());
verifyNodesAfterDecom(rm1, 2, expectedCapability, expectedVersion);
rm1.stop();
rm1 = null;
Assert
@ -1991,6 +1997,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
Assert
.assertEquals(2,
ClusterMetrics.getMetrics().getNumDecommisionedNMs());
verifyNodesAfterDecom(rm2, 2, Resource.newInstance(0, 0), "unknown");
} finally {
if (rm1 != null) {
rm1.stop();
@ -2001,6 +2008,18 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
}
}
private void verifyNodesAfterDecom(MockRM rm, int numNodes,
Resource expectedCapability,
String expectedVersion) {
ConcurrentMap<NodeId, RMNode> inactiveRMNodes =
rm.getRMContext().getInactiveRMNodes();
Assert.assertEquals(numNodes, inactiveRMNodes.size());
for (RMNode rmNode : inactiveRMNodes.values()) {
Assert.assertEquals(expectedCapability, rmNode.getTotalCapability());
Assert.assertEquals(expectedVersion, rmNode.getNodeManagerVersion());
}
}
// Test Delegation token is renewed synchronously so that recover events
// can be processed before any other external incoming events, specifically
// the ContainerFinished event on NM re-registraton.