YARN-4940. yarn node -list -all fails if RM starts with a decommissioned node. Contributed by sandflee

(cherry picked from commit 69f3d428d5)
This commit is contained in:
Jason Lowe 2016-04-15 20:36:45 +00:00
parent 2213a40f30
commit cd148cb347
4 changed files with 55 additions and 38 deletions

View File

@ -163,7 +163,7 @@ private void refreshHostsReader(Configuration yarnConf) throws IOException,
private void setDecomissionedNMs() { private void setDecomissionedNMs() {
Set<String> excludeList = hostsReader.getExcludedHosts(); Set<String> excludeList = hostsReader.getExcludedHosts();
for (final String host : excludeList) { for (final String host : excludeList) {
UnknownNodeId nodeId = new UnknownNodeId(host); NodeId nodeId = createUnknownNodeId(host);
RMNodeImpl rmNode = new RMNodeImpl(nodeId, RMNodeImpl rmNode = new RMNodeImpl(nodeId,
rmContext, host, -1, -1, new UnknownNode(host), null, null); rmContext, host, -1, -1, new UnknownNode(host), null, null);
rmContext.getInactiveRMNodes().put(nodeId, rmNode); rmContext.getInactiveRMNodes().put(nodeId, rmNode);
@ -430,38 +430,8 @@ public void refreshNodesForcefully() {
* A NodeId instance needed upon startup for populating inactive nodes Map. * A NodeId instance needed upon startup for populating inactive nodes Map.
* It only knows the hostname/ip and marks the port to -1 or invalid. * It only knows the hostname/ip and marks the port to -1 or invalid.
*/ */
public static class UnknownNodeId extends NodeId { public static NodeId createUnknownNodeId(String host) {
return NodeId.newInstance(host, -1);
private String host;
public UnknownNodeId(String host) {
this.host = host;
}
@Override
public String getHost() {
return this.host;
}
@Override
protected void setHost(String hst) {
}
@Override
public int getPort() {
return -1;
}
@Override
protected void setPort(int port) {
}
@Override
protected void build() {
}
} }
/** /**

View File

@ -786,8 +786,8 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
if (previousRMNode != null) { if (previousRMNode != null) {
rmNode.updateMetricsForRejoinedNode(previousRMNode.getState()); rmNode.updateMetricsForRejoinedNode(previousRMNode.getState());
} else { } else {
NodesListManager.UnknownNodeId unknownNodeId = NodeId unknownNodeId =
new NodesListManager.UnknownNodeId(nodeId.getHost()); NodesListManager.createUnknownNodeId(nodeId.getHost());
previousRMNode = previousRMNode =
rmNode.context.getInactiveRMNodes().remove(unknownNodeId); rmNode.context.getInactiveRMNodes().remove(unknownNodeId);
if (previousRMNode != null) { if (previousRMNode != null) {

View File

@ -28,6 +28,8 @@
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
@ -1566,4 +1568,49 @@ private void testApplicationPriorityUpdation(ClientRMService rmService,
Assert.assertEquals("Incorrect priority has been returned", expected, Assert.assertEquals("Incorrect priority has been returned", expected,
updateApplicationPriority.getApplicationPriority().getPriority()); updateApplicationPriority.getApplicationPriority().getPriority());
} }
/**
 * Creates a fresh file at the given path and writes the literal host name
 * {@code decommisssionedHost} into it. An existing file at that path is
 * deleted first.
 *
 * @param filename path of the file to (re)create
 * @throws IOException if the file cannot be opened, written, or closed
 */
private void createExcludeFile(String filename) throws IOException {
  File file = new File(filename);
  if (file.exists()) {
    file.delete();
  }
  FileOutputStream out = new FileOutputStream(file);
  try {
    out.write("decommisssionedHost".getBytes());
  } finally {
    // Release the file handle even when the write fails.
    out.close();
  }
}
/**
 * Starts a MockRM configured with an exclude file that lists one host, then
 * asks the client RM service for cluster nodes in every {@link NodeState}
 * and expects exactly one node report back.
 *
 * Cleanup runs in a finally block so that a failed assertion or RPC error
 * does not leave the RM running or the exclude file on disk.
 */
@Test
public void testRMStartWithDecommissionedNode() throws Exception {
  String excludeFile = "excludeFile";
  createExcludeFile(excludeFile);
  YarnConfiguration conf = new YarnConfiguration();
  conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
      excludeFile);
  MockRM rm = null;
  YarnRPC rpc = null;
  ApplicationClientProtocol client = null;
  try {
    rm = new MockRM(conf) {
      // Use a real ClientRMService so the request goes through actual RPC.
      @Override
      protected ClientRMService createClientRMService() {
        return new ClientRMService(this.rmContext, scheduler,
            this.rmAppManager, this.applicationACLsManager,
            this.queueACLsManager,
            this.getRMContext().getRMDelegationTokenSecretManager());
      }
    };
    rm.start();

    rpc = YarnRPC.create(conf);
    InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
    LOG.info("Connecting to ResourceManager at " + rmAddress);
    client =
        (ApplicationClientProtocol) rpc
            .getProxy(ApplicationClientProtocol.class, rmAddress, conf);

    // Make call
    GetClusterNodesRequest request =
        GetClusterNodesRequest.newInstance(EnumSet.allOf(NodeState.class));
    List<NodeReport> nodeReports =
        client.getClusterNodes(request).getNodeReports();
    Assert.assertEquals(1, nodeReports.size());
  } finally {
    try {
      if (rm != null) {
        rm.stop();
      }
      if (client != null) {
        rpc.stopProxy(client, conf);
      }
    } finally {
      // Always remove the exclude file, even if shutdown itself fails.
      new File(excludeFile).delete();
    }
  }
}
} }

View File

@ -690,8 +690,8 @@ public void testUpdateHeartbeatResponseForAppLifeCycle() {
@Test @Test
public void testUnknownNodeId() { public void testUnknownNodeId() {
NodesListManager.UnknownNodeId nodeId = NodeId nodeId =
new NodesListManager.UnknownNodeId("host1"); NodesListManager.createUnknownNodeId("host1");
RMNodeImpl node = RMNodeImpl node =
new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null); new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
rmContext.getInactiveRMNodes().putIfAbsent(nodeId,node); rmContext.getInactiveRMNodes().putIfAbsent(nodeId,node);