YARN-5837. NPE when getting node status of a decommissioned node after an RM restart. Contributed by Robert Kanter
(cherry picked from commit 6bb741ff0e
)
This commit is contained in:
parent
4cee535aa5
commit
27ed2c526a
|
@ -44,6 +44,7 @@ import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
@ -242,7 +243,8 @@ public class NodesListManager extends CompositeService implements
|
||||||
for (final String host : excludeList) {
|
for (final String host : excludeList) {
|
||||||
NodeId nodeId = createUnknownNodeId(host);
|
NodeId nodeId = createUnknownNodeId(host);
|
||||||
RMNodeImpl rmNode = new RMNodeImpl(nodeId,
|
RMNodeImpl rmNode = new RMNodeImpl(nodeId,
|
||||||
rmContext, host, -1, -1, new UnknownNode(host), null, null);
|
rmContext, host, -1, -1, new UnknownNode(host),
|
||||||
|
Resource.newInstance(0, 0), "unknown");
|
||||||
rmContext.getInactiveRMNodes().put(nodeId, rmNode);
|
rmContext.getInactiveRMNodes().put(nodeId, rmNode);
|
||||||
rmNode.handle(new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
|
rmNode.handle(new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
|
||||||
}
|
}
|
||||||
|
|
|
@ -256,4 +256,8 @@ public class MockNM {
|
||||||
public int getvCores() {
|
public int getvCores() {
|
||||||
return vCores;
|
return vCores;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getVersion() {
|
||||||
|
return version;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,6 +40,7 @@ import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -103,6 +104,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
|
@ -1894,6 +1896,9 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
rm1.start();
|
rm1.start();
|
||||||
MockNM nm1 = rm1.registerNode("localhost:1234", 8000);
|
MockNM nm1 = rm1.registerNode("localhost:1234", 8000);
|
||||||
MockNM nm2 = rm1.registerNode("host2:1234", 8000);
|
MockNM nm2 = rm1.registerNode("host2:1234", 8000);
|
||||||
|
Resource expectedCapability =
|
||||||
|
Resource.newInstance(nm1.getMemory(), nm1.getvCores());
|
||||||
|
String expectedVersion = nm1.getVersion();
|
||||||
Assert
|
Assert
|
||||||
.assertEquals(0,
|
.assertEquals(0,
|
||||||
ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
||||||
|
@ -1915,6 +1920,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
Assert
|
Assert
|
||||||
.assertEquals(2,
|
.assertEquals(2,
|
||||||
ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
||||||
|
verifyNodesAfterDecom(rm1, 2, expectedCapability, expectedVersion);
|
||||||
rm1.stop();
|
rm1.stop();
|
||||||
rm1 = null;
|
rm1 = null;
|
||||||
Assert
|
Assert
|
||||||
|
@ -1928,6 +1934,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
Assert
|
Assert
|
||||||
.assertEquals(2,
|
.assertEquals(2,
|
||||||
ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
||||||
|
verifyNodesAfterDecom(rm2, 2, Resource.newInstance(0, 0), "unknown");
|
||||||
} finally {
|
} finally {
|
||||||
if (rm1 != null) {
|
if (rm1 != null) {
|
||||||
rm1.stop();
|
rm1.stop();
|
||||||
|
@ -1938,6 +1945,18 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void verifyNodesAfterDecom(MockRM rm, int numNodes,
|
||||||
|
Resource expectedCapability,
|
||||||
|
String expectedVersion) {
|
||||||
|
ConcurrentMap<NodeId, RMNode> inactiveRMNodes =
|
||||||
|
rm.getRMContext().getInactiveRMNodes();
|
||||||
|
Assert.assertEquals(numNodes, inactiveRMNodes.size());
|
||||||
|
for (RMNode rmNode : inactiveRMNodes.values()) {
|
||||||
|
Assert.assertEquals(expectedCapability, rmNode.getTotalCapability());
|
||||||
|
Assert.assertEquals(expectedVersion, rmNode.getNodeManagerVersion());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Test Delegation token is renewed synchronously so that recover events
|
// Test Delegation token is renewed synchronously so that recover events
|
||||||
// can be processed before any other external incoming events, specifically
|
// can be processed before any other external incoming events, specifically
|
||||||
// the ContainerFinished event on NM re-registraton.
|
// the ContainerFinished event on NM re-registraton.
|
||||||
|
|
Loading…
Reference in New Issue