YARN-4940. yarn node -list -all failed if RM start with decommissioned node. Contributed by sandflee
(cherry picked from commit 69f3d428d5c3ab0c79cacffc22b1f59408622ae7) Conflicts: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
This commit is contained in:
parent
80d279b481
commit
0ad4634585
@ -130,6 +130,9 @@ Release 2.7.3 - UNRELEASED
|
||||
YARN-4924. NM recovery race can lead to container not cleaned up.
|
||||
(sandflee via jlowe)
|
||||
|
||||
YARN-4940. yarn node -list -all failed if RM start with decommissioned
|
||||
node (sandflee via jlowe)
|
||||
|
||||
Release 2.7.2 - 2016-01-25
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -140,7 +140,7 @@ public void refreshNodes(Configuration yarnConf) throws IOException,
|
||||
private void setDecomissionedNMs() {
|
||||
Set<String> excludeList = hostsReader.getExcludedHosts();
|
||||
for (final String host : excludeList) {
|
||||
UnknownNodeId nodeId = new UnknownNodeId(host);
|
||||
NodeId nodeId = createUnknownNodeId(host);
|
||||
RMNodeImpl rmNode = new RMNodeImpl(nodeId,
|
||||
rmContext, host, -1, -1, new UnknownNode(host), null, null);
|
||||
rmContext.getInactiveRMNodes().put(nodeId.getHost(), rmNode);
|
||||
@ -253,38 +253,8 @@ private HostsFileReader createHostsFileReader(String includesFile,
|
||||
* A NodeId instance needed upon startup for populating inactive nodes Map.
|
||||
* It only knows the hostname/ip and marks the port to -1 or invalid.
|
||||
*/
|
||||
public static class UnknownNodeId extends NodeId {
|
||||
|
||||
private String host;
|
||||
|
||||
public UnknownNodeId(String host) {
|
||||
this.host = host;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getHost() {
|
||||
return this.host;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setHost(String hst) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPort() {
|
||||
return -1;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setPort(int port) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void build() {
|
||||
|
||||
}
|
||||
public static NodeId createUnknownNodeId(String host) {
|
||||
return NodeId.newInstance(host, -1);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -28,6 +28,8 @@
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
@ -277,7 +279,7 @@ protected ClientRMService createClientRMService() {
|
||||
Assert.assertTrue(report.getNodeLabels() != null
|
||||
&& report.getNodeLabels().isEmpty());
|
||||
}
|
||||
|
||||
|
||||
rpc.stopProxy(client, conf);
|
||||
rm.close();
|
||||
}
|
||||
@ -1516,4 +1518,49 @@ protected ClientRMService createClientRMService() {
|
||||
rpc.stopProxy(client, conf);
|
||||
rm.close();
|
||||
}
|
||||
|
||||
private void createExcludeFile(String filename) throws IOException {
|
||||
File file = new File(filename);
|
||||
if (file.exists()) {
|
||||
file.delete();
|
||||
}
|
||||
|
||||
FileOutputStream out = new FileOutputStream(file);
|
||||
out.write("decommisssionedHost".getBytes());
|
||||
out.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRMStartWithDecommissionedNode() throws Exception {
|
||||
String excludeFile = "excludeFile";
|
||||
createExcludeFile(excludeFile);
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
|
||||
excludeFile);
|
||||
MockRM rm = new MockRM(conf) {
|
||||
protected ClientRMService createClientRMService() {
|
||||
return new ClientRMService(this.rmContext, scheduler,
|
||||
this.rmAppManager, this.applicationACLsManager, this.queueACLsManager,
|
||||
this.getRMContext().getRMDelegationTokenSecretManager());
|
||||
};
|
||||
};
|
||||
rm.start();
|
||||
|
||||
YarnRPC rpc = YarnRPC.create(conf);
|
||||
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
|
||||
LOG.info("Connecting to ResourceManager at " + rmAddress);
|
||||
ApplicationClientProtocol client =
|
||||
(ApplicationClientProtocol) rpc
|
||||
.getProxy(ApplicationClientProtocol.class, rmAddress, conf);
|
||||
|
||||
// Make call
|
||||
GetClusterNodesRequest request =
|
||||
GetClusterNodesRequest.newInstance(EnumSet.allOf(NodeState.class));
|
||||
List<NodeReport> nodeReports = client.getClusterNodes(request).getNodeReports();
|
||||
Assert.assertEquals(1, nodeReports.size());
|
||||
|
||||
rm.stop();
|
||||
rpc.stopProxy(client, conf);
|
||||
new File(excludeFile).delete();
|
||||
}
|
||||
}
|
||||
|
@ -459,8 +459,8 @@ public void testUpdateHeartbeatResponseForCleanup() {
|
||||
|
||||
@Test
|
||||
public void testUnknownNodeId() {
|
||||
NodesListManager.UnknownNodeId nodeId =
|
||||
new NodesListManager.UnknownNodeId("host1");
|
||||
NodeId nodeId =
|
||||
NodesListManager.createUnknownNodeId("host1");
|
||||
RMNodeImpl node =
|
||||
new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
|
||||
rmContext.getInactiveRMNodes().putIfAbsent(nodeId.getHost(),node);
|
||||
|
Loading…
x
Reference in New Issue
Block a user