YARN-4940. yarn node -list -all failed if RM start with decommissioned node. Contributed by sandflee
This commit is contained in:
parent
fdbafbc9e5
commit
69f3d428d5
|
@ -163,7 +163,7 @@ public class NodesListManager extends CompositeService implements
|
||||||
private void setDecomissionedNMs() {
|
private void setDecomissionedNMs() {
|
||||||
Set<String> excludeList = hostsReader.getExcludedHosts();
|
Set<String> excludeList = hostsReader.getExcludedHosts();
|
||||||
for (final String host : excludeList) {
|
for (final String host : excludeList) {
|
||||||
UnknownNodeId nodeId = new UnknownNodeId(host);
|
NodeId nodeId = createUnknownNodeId(host);
|
||||||
RMNodeImpl rmNode = new RMNodeImpl(nodeId,
|
RMNodeImpl rmNode = new RMNodeImpl(nodeId,
|
||||||
rmContext, host, -1, -1, new UnknownNode(host), null, null);
|
rmContext, host, -1, -1, new UnknownNode(host), null, null);
|
||||||
rmContext.getInactiveRMNodes().put(nodeId, rmNode);
|
rmContext.getInactiveRMNodes().put(nodeId, rmNode);
|
||||||
|
@ -430,38 +430,8 @@ public class NodesListManager extends CompositeService implements
|
||||||
* A NodeId instance needed upon startup for populating inactive nodes Map.
|
* A NodeId instance needed upon startup for populating inactive nodes Map.
|
||||||
* It only knows the hostname/ip and marks the port to -1 or invalid.
|
* It only knows the hostname/ip and marks the port to -1 or invalid.
|
||||||
*/
|
*/
|
||||||
public static class UnknownNodeId extends NodeId {
|
public static NodeId createUnknownNodeId(String host) {
|
||||||
|
return NodeId.newInstance(host, -1);
|
||||||
private String host;
|
|
||||||
|
|
||||||
public UnknownNodeId(String host) {
|
|
||||||
this.host = host;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getHost() {
|
|
||||||
return this.host;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void setHost(String hst) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getPort() {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void setPort(int port) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void build() {
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -786,8 +786,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
if (previousRMNode != null) {
|
if (previousRMNode != null) {
|
||||||
rmNode.updateMetricsForRejoinedNode(previousRMNode.getState());
|
rmNode.updateMetricsForRejoinedNode(previousRMNode.getState());
|
||||||
} else {
|
} else {
|
||||||
NodesListManager.UnknownNodeId unknownNodeId =
|
NodeId unknownNodeId =
|
||||||
new NodesListManager.UnknownNodeId(nodeId.getHost());
|
NodesListManager.createUnknownNodeId(nodeId.getHost());
|
||||||
previousRMNode =
|
previousRMNode =
|
||||||
rmNode.context.getInactiveRMNodes().remove(unknownNodeId);
|
rmNode.context.getInactiveRMNodes().remove(unknownNodeId);
|
||||||
if (previousRMNode != null) {
|
if (previousRMNode != null) {
|
||||||
|
|
|
@ -28,6 +28,8 @@ import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -249,7 +251,7 @@ public class TestClientRMService {
|
||||||
Assert.assertTrue(report.getNodeLabels() != null
|
Assert.assertTrue(report.getNodeLabels() != null
|
||||||
&& report.getNodeLabels().isEmpty());
|
&& report.getNodeLabels().isEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
rpc.stopProxy(client, conf);
|
rpc.stopProxy(client, conf);
|
||||||
rm.close();
|
rm.close();
|
||||||
}
|
}
|
||||||
|
@ -1566,4 +1568,49 @@ public class TestClientRMService {
|
||||||
Assert.assertEquals("Incorrect priority has been returned", expected,
|
Assert.assertEquals("Incorrect priority has been returned", expected,
|
||||||
updateApplicationPriority.getApplicationPriority().getPriority());
|
updateApplicationPriority.getApplicationPriority().getPriority());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void createExcludeFile(String filename) throws IOException {
|
||||||
|
File file = new File(filename);
|
||||||
|
if (file.exists()) {
|
||||||
|
file.delete();
|
||||||
|
}
|
||||||
|
|
||||||
|
FileOutputStream out = new FileOutputStream(file);
|
||||||
|
out.write("decommisssionedHost".getBytes());
|
||||||
|
out.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRMStartWithDecommissionedNode() throws Exception {
|
||||||
|
String excludeFile = "excludeFile";
|
||||||
|
createExcludeFile(excludeFile);
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
|
||||||
|
excludeFile);
|
||||||
|
MockRM rm = new MockRM(conf) {
|
||||||
|
protected ClientRMService createClientRMService() {
|
||||||
|
return new ClientRMService(this.rmContext, scheduler,
|
||||||
|
this.rmAppManager, this.applicationACLsManager, this.queueACLsManager,
|
||||||
|
this.getRMContext().getRMDelegationTokenSecretManager());
|
||||||
|
};
|
||||||
|
};
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
YarnRPC rpc = YarnRPC.create(conf);
|
||||||
|
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
|
||||||
|
LOG.info("Connecting to ResourceManager at " + rmAddress);
|
||||||
|
ApplicationClientProtocol client =
|
||||||
|
(ApplicationClientProtocol) rpc
|
||||||
|
.getProxy(ApplicationClientProtocol.class, rmAddress, conf);
|
||||||
|
|
||||||
|
// Make call
|
||||||
|
GetClusterNodesRequest request =
|
||||||
|
GetClusterNodesRequest.newInstance(EnumSet.allOf(NodeState.class));
|
||||||
|
List<NodeReport> nodeReports = client.getClusterNodes(request).getNodeReports();
|
||||||
|
Assert.assertEquals(1, nodeReports.size());
|
||||||
|
|
||||||
|
rm.stop();
|
||||||
|
rpc.stopProxy(client, conf);
|
||||||
|
new File(excludeFile).delete();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -690,8 +690,8 @@ public class TestRMNodeTransitions {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testUnknownNodeId() {
|
public void testUnknownNodeId() {
|
||||||
NodesListManager.UnknownNodeId nodeId =
|
NodeId nodeId =
|
||||||
new NodesListManager.UnknownNodeId("host1");
|
NodesListManager.createUnknownNodeId("host1");
|
||||||
RMNodeImpl node =
|
RMNodeImpl node =
|
||||||
new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
|
new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
|
||||||
rmContext.getInactiveRMNodes().putIfAbsent(nodeId,node);
|
rmContext.getInactiveRMNodes().putIfAbsent(nodeId,node);
|
||||||
|
|
Loading…
Reference in New Issue