YARN-3102. Decommisioned Nodes not listed in Web UI. Contributed by Kuhu Shukla
(cherry picked from commit ed55950164
)
This commit is contained in:
parent
fc8d9cc758
commit
36aae8050e
|
@ -1392,6 +1392,9 @@ Release 2.7.3 - UNRELEASED
|
||||||
YARN-4428. Redirect RM page to AHS page when AHS turned on and RM page is
|
YARN-4428. Redirect RM page to AHS page when AHS turned on and RM page is
|
||||||
not available (Chang Li via jlowe)
|
not available (Chang Li via jlowe)
|
||||||
|
|
||||||
|
YARN-3102. Decommisioned Nodes not listed in Web UI (Kuhu Shukla via
|
||||||
|
jlowe)
|
||||||
|
|
||||||
Release 2.7.2 - 2016-01-25
|
Release 2.7.2 - 2016-01-25
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
import org.apache.hadoop.net.Node;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.service.CompositeService;
|
import org.apache.hadoop.service.CompositeService;
|
||||||
import org.apache.hadoop.util.HostsFileReader;
|
import org.apache.hadoop.util.HostsFileReader;
|
||||||
|
@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.yarn.util.Clock;
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
|
@ -96,7 +98,7 @@ public class NodesListManager extends CompositeService implements
|
||||||
YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
|
YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
|
||||||
this.hostsReader =
|
this.hostsReader =
|
||||||
createHostsFileReader(this.includesFile, this.excludesFile);
|
createHostsFileReader(this.includesFile, this.excludesFile);
|
||||||
setDecomissionedNMsMetrics();
|
setDecomissionedNMs();
|
||||||
printConfiguredHosts();
|
printConfiguredHosts();
|
||||||
} catch (YarnException ex) {
|
} catch (YarnException ex) {
|
||||||
disableHostsFileReader(ex);
|
disableHostsFileReader(ex);
|
||||||
|
@ -158,9 +160,24 @@ public class NodesListManager extends CompositeService implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setDecomissionedNMsMetrics() {
|
private void setDecomissionedNMs() {
|
||||||
Set<String> excludeList = hostsReader.getExcludedHosts();
|
Set<String> excludeList = hostsReader.getExcludedHosts();
|
||||||
ClusterMetrics.getMetrics().setDecommisionedNMs(excludeList.size());
|
for (final String host : excludeList) {
|
||||||
|
UnknownNodeId nodeId = new UnknownNodeId(host);
|
||||||
|
RMNodeImpl rmNode = new RMNodeImpl(nodeId,
|
||||||
|
rmContext, host, -1, -1, new UnknownNode(host), null, null);
|
||||||
|
|
||||||
|
RMNode prevRMNode =
|
||||||
|
rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
|
||||||
|
if (prevRMNode != null) {
|
||||||
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||||
|
new RMNodeEvent(prevRMNode.getNodeID(),
|
||||||
|
RMNodeEventType.DECOMMISSION));
|
||||||
|
} else {
|
||||||
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||||
|
new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
@ -335,7 +352,7 @@ public class NodesListManager extends CompositeService implements
|
||||||
conf.get(YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
|
conf.get(YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
|
||||||
this.hostsReader =
|
this.hostsReader =
|
||||||
createHostsFileReader(this.includesFile, this.excludesFile);
|
createHostsFileReader(this.includesFile, this.excludesFile);
|
||||||
setDecomissionedNMsMetrics();
|
setDecomissionedNMs();
|
||||||
} catch (IOException ioe2) {
|
} catch (IOException ioe2) {
|
||||||
// Should *never* happen
|
// Should *never* happen
|
||||||
this.hostsReader = null;
|
this.hostsReader = null;
|
||||||
|
@ -418,4 +435,98 @@ public class NodesListManager extends CompositeService implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A NodeId instance needed upon startup for populating inactive nodes Map.
|
||||||
|
* It only knows the hostname/ip and marks the port to -1 or invalid.
|
||||||
|
*/
|
||||||
|
public static class UnknownNodeId extends NodeId {
|
||||||
|
|
||||||
|
private String host;
|
||||||
|
|
||||||
|
public UnknownNodeId(String host) {
|
||||||
|
this.host = host;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getHost() {
|
||||||
|
return this.host;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void setHost(String hst) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getPort() {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void setPort(int port) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void build() {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A Node instance needed upon startup for populating inactive nodes Map.
|
||||||
|
* It only knows its hostname/ip.
|
||||||
|
*/
|
||||||
|
private static class UnknownNode implements Node {
|
||||||
|
|
||||||
|
private String host;
|
||||||
|
|
||||||
|
public UnknownNode(String host) {
|
||||||
|
this.host = host;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getNetworkLocation() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setNetworkLocation(String location) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getName() {
|
||||||
|
return host;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Node getParent() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setParent(Node parent) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getLevel() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setLevel(int i) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getHost() {
|
||||||
|
return host;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setHost(String hst) {
|
||||||
|
this.host = hst;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -203,6 +203,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
clusterTimeStamp = timestamp;
|
clusterTimeStamp = timestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
Dispatcher getRmDispatcher() {
|
||||||
|
return rmDispatcher;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceInit(Configuration conf) throws Exception {
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
|
|
|
@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
@ -172,6 +173,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
.addTransition(NodeState.NEW, NodeState.NEW,
|
.addTransition(NodeState.NEW, NodeState.NEW,
|
||||||
RMNodeEventType.RESOURCE_UPDATE,
|
RMNodeEventType.RESOURCE_UPDATE,
|
||||||
new UpdateNodeResourceWhenUnusableTransition())
|
new UpdateNodeResourceWhenUnusableTransition())
|
||||||
|
.addTransition(NodeState.NEW, NodeState.DECOMMISSIONED,
|
||||||
|
RMNodeEventType.DECOMMISSION,
|
||||||
|
new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
|
||||||
|
|
||||||
//Transitions from RUNNING state
|
//Transitions from RUNNING state
|
||||||
.addTransition(NodeState.RUNNING,
|
.addTransition(NodeState.RUNNING,
|
||||||
|
@ -691,6 +695,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
case UNHEALTHY:
|
case UNHEALTHY:
|
||||||
metrics.decrNumUnhealthyNMs();
|
metrics.decrNumUnhealthyNMs();
|
||||||
break;
|
break;
|
||||||
|
case NEW:
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
LOG.warn("Unexpected initial state");
|
LOG.warn("Unexpected initial state");
|
||||||
}
|
}
|
||||||
|
@ -768,12 +774,18 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
List<NMContainerStatus> containers = null;
|
List<NMContainerStatus> containers = null;
|
||||||
|
|
||||||
NodeId nodeId = rmNode.nodeId;
|
NodeId nodeId = rmNode.nodeId;
|
||||||
if (rmNode.context.getInactiveRMNodes().containsKey(nodeId)) {
|
RMNode previousRMNode =
|
||||||
// Old node rejoining
|
rmNode.context.getInactiveRMNodes().remove(nodeId);
|
||||||
RMNode previouRMNode = rmNode.context.getInactiveRMNodes().get(nodeId);
|
if (previousRMNode != null) {
|
||||||
rmNode.context.getInactiveRMNodes().remove(nodeId);
|
rmNode.updateMetricsForRejoinedNode(previousRMNode.getState());
|
||||||
rmNode.updateMetricsForRejoinedNode(previouRMNode.getState());
|
|
||||||
} else {
|
} else {
|
||||||
|
NodesListManager.UnknownNodeId unknownNodeId =
|
||||||
|
new NodesListManager.UnknownNodeId(nodeId.getHost());
|
||||||
|
previousRMNode =
|
||||||
|
rmNode.context.getInactiveRMNodes().remove(unknownNodeId);
|
||||||
|
if (previousRMNode != null) {
|
||||||
|
ClusterMetrics.getMetrics().decrDecommisionedNMs();
|
||||||
|
}
|
||||||
// Increment activeNodes explicitly because this is a new node.
|
// Increment activeNodes explicitly because this is a new node.
|
||||||
ClusterMetrics.getMetrics().incrNumActiveNodes();
|
ClusterMetrics.getMetrics().incrNumActiveNodes();
|
||||||
containers = startEvent.getNMContainerStatuses();
|
containers = startEvent.getNMContainerStatuses();
|
||||||
|
|
|
@ -63,6 +63,8 @@ import org.apache.hadoop.yarn.api.records.NodeState;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||||
|
@ -142,6 +144,20 @@ public class MockRM extends ResourceManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Dispatcher createDispatcher() {
|
||||||
|
return new DrainDispatcher();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void drainEvents() {
|
||||||
|
Dispatcher rmDispatcher = getRmDispatcher();
|
||||||
|
if (rmDispatcher instanceof DrainDispatcher) {
|
||||||
|
((DrainDispatcher) rmDispatcher).await();
|
||||||
|
} else {
|
||||||
|
throw new UnsupportedOperationException("Not a Drain Dispatcher!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void waitForState(ApplicationId appId, RMAppState finalState)
|
public void waitForState(ApplicationId appId, RMAppState finalState)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
RMApp app = getRMContext().getRMApps().get(appId);
|
RMApp app = getRMContext().getRMApps().get(appId);
|
||||||
|
|
|
@ -1888,15 +1888,9 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
|
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
|
||||||
hostFile.getAbsolutePath());
|
hostFile.getAbsolutePath());
|
||||||
writeToHostsFile("");
|
writeToHostsFile("");
|
||||||
final DrainDispatcher dispatcher = new DrainDispatcher();
|
|
||||||
MockRM rm1 = null, rm2 = null;
|
MockRM rm1 = null, rm2 = null;
|
||||||
try {
|
try {
|
||||||
rm1 = new MockRM(conf) {
|
rm1 = new MockRM(conf);
|
||||||
@Override
|
|
||||||
protected Dispatcher createDispatcher() {
|
|
||||||
return dispatcher;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
rm1.start();
|
rm1.start();
|
||||||
MockNM nm1 = rm1.registerNode("localhost:1234", 8000);
|
MockNM nm1 = rm1.registerNode("localhost:1234", 8000);
|
||||||
MockNM nm2 = rm1.registerNode("host2:1234", 8000);
|
MockNM nm2 = rm1.registerNode("host2:1234", 8000);
|
||||||
|
@ -1917,7 +1911,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
Assert.assertTrue("The decommisioned metrics are not updated",
|
Assert.assertTrue("The decommisioned metrics are not updated",
|
||||||
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
|
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
|
||||||
|
|
||||||
dispatcher.await();
|
rm1.drainEvents();
|
||||||
Assert
|
Assert
|
||||||
.assertEquals(2,
|
.assertEquals(2,
|
||||||
ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
||||||
|
@ -1930,6 +1924,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
// restart RM.
|
// restart RM.
|
||||||
rm2 = new MockRM(conf);
|
rm2 = new MockRM(conf);
|
||||||
rm2.start();
|
rm2.start();
|
||||||
|
rm2.drainEvents();
|
||||||
Assert
|
Assert
|
||||||
.assertEquals(2,
|
.assertEquals(2,
|
||||||
ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
||||||
|
|
|
@ -168,27 +168,21 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
|
||||||
.getAbsolutePath());
|
.getAbsolutePath());
|
||||||
|
|
||||||
writeToHostsFile("");
|
writeToHostsFile("");
|
||||||
final DrainDispatcher dispatcher = new DrainDispatcher();
|
rm = new MockRM(conf);
|
||||||
rm = new MockRM(conf) {
|
|
||||||
@Override
|
|
||||||
protected Dispatcher createDispatcher() {
|
|
||||||
return dispatcher;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
rm.start();
|
rm.start();
|
||||||
|
|
||||||
MockNM nm1 = rm.registerNode("host1:1234", 5120);
|
MockNM nm1 = rm.registerNode("host1:1234", 5120);
|
||||||
MockNM nm2 = rm.registerNode("host2:5678", 10240);
|
MockNM nm2 = rm.registerNode("host2:5678", 10240);
|
||||||
MockNM nm3 = rm.registerNode("localhost:4433", 1024);
|
MockNM nm3 = rm.registerNode("localhost:4433", 1024);
|
||||||
|
|
||||||
dispatcher.await();
|
rm.drainEvents();
|
||||||
|
|
||||||
int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs();
|
int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs();
|
||||||
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
||||||
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
||||||
nodeHeartbeat = nm2.nodeHeartbeat(true);
|
nodeHeartbeat = nm2.nodeHeartbeat(true);
|
||||||
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
||||||
dispatcher.await();
|
rm.drainEvents();
|
||||||
|
|
||||||
// To test that IPs also work
|
// To test that IPs also work
|
||||||
String ip = NetUtils.normalizeHostName("localhost");
|
String ip = NetUtils.normalizeHostName("localhost");
|
||||||
|
@ -207,15 +201,15 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
|
||||||
nodeHeartbeat = nm3.nodeHeartbeat(true);
|
nodeHeartbeat = nm3.nodeHeartbeat(true);
|
||||||
Assert.assertTrue("The decommisioned metrics are not updated",
|
Assert.assertTrue("The decommisioned metrics are not updated",
|
||||||
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
|
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
|
||||||
dispatcher.await();
|
rm.drainEvents();
|
||||||
|
|
||||||
writeToHostsFile("");
|
writeToHostsFile("");
|
||||||
rm.getNodesListManager().refreshNodes(conf);
|
rm.getNodesListManager().refreshNodes(conf);
|
||||||
|
|
||||||
nm3 = rm.registerNode("localhost:4433", 1024);
|
nm3 = rm.registerNode("localhost:4433", 1024);
|
||||||
dispatcher.await();
|
rm.drainEvents();
|
||||||
nodeHeartbeat = nm3.nodeHeartbeat(true);
|
nodeHeartbeat = nm3.nodeHeartbeat(true);
|
||||||
dispatcher.await();
|
rm.drainEvents();
|
||||||
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
||||||
// decommissined node is 1 since 1 node is rejoined after updating exclude
|
// decommissined node is 1 since 1 node is rejoined after updating exclude
|
||||||
// file
|
// file
|
||||||
|
@ -990,7 +984,6 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReconnectNode() throws Exception {
|
public void testReconnectNode() throws Exception {
|
||||||
final DrainDispatcher dispatcher = new DrainDispatcher();
|
|
||||||
rm = new MockRM() {
|
rm = new MockRM() {
|
||||||
@Override
|
@Override
|
||||||
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
|
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
|
||||||
|
@ -1001,11 +994,6 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected Dispatcher createDispatcher() {
|
|
||||||
return dispatcher;
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
rm.start();
|
rm.start();
|
||||||
|
|
||||||
|
@ -1013,7 +1001,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
|
||||||
MockNM nm2 = rm.registerNode("host2:5678", 5120);
|
MockNM nm2 = rm.registerNode("host2:5678", 5120);
|
||||||
nm1.nodeHeartbeat(true);
|
nm1.nodeHeartbeat(true);
|
||||||
nm2.nodeHeartbeat(false);
|
nm2.nodeHeartbeat(false);
|
||||||
dispatcher.await();
|
rm.drainEvents();
|
||||||
checkUnealthyNMCount(rm, nm2, true, 1);
|
checkUnealthyNMCount(rm, nm2, true, 1);
|
||||||
final int expectedNMs = ClusterMetrics.getMetrics().getNumActiveNMs();
|
final int expectedNMs = ClusterMetrics.getMetrics().getNumActiveNMs();
|
||||||
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
|
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
|
||||||
|
@ -1024,7 +1012,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
|
||||||
nm1 = rm.registerNode("host1:1234", 5120);
|
nm1 = rm.registerNode("host1:1234", 5120);
|
||||||
NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
|
NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
|
||||||
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
|
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
|
||||||
dispatcher.await();
|
rm.drainEvents();
|
||||||
Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());
|
Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());
|
||||||
checkUnealthyNMCount(rm, nm2, true, 1);
|
checkUnealthyNMCount(rm, nm2, true, 1);
|
||||||
|
|
||||||
|
@ -1032,23 +1020,23 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
|
||||||
nm2 = rm.registerNode("host2:5678", 5120);
|
nm2 = rm.registerNode("host2:5678", 5120);
|
||||||
response = nm2.nodeHeartbeat(false);
|
response = nm2.nodeHeartbeat(false);
|
||||||
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
|
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
|
||||||
dispatcher.await();
|
rm.drainEvents();
|
||||||
Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());
|
Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());
|
||||||
checkUnealthyNMCount(rm, nm2, true, 1);
|
checkUnealthyNMCount(rm, nm2, true, 1);
|
||||||
|
|
||||||
// unhealthy node changed back to healthy
|
// unhealthy node changed back to healthy
|
||||||
nm2 = rm.registerNode("host2:5678", 5120);
|
nm2 = rm.registerNode("host2:5678", 5120);
|
||||||
dispatcher.await();
|
rm.drainEvents();
|
||||||
response = nm2.nodeHeartbeat(true);
|
response = nm2.nodeHeartbeat(true);
|
||||||
response = nm2.nodeHeartbeat(true);
|
response = nm2.nodeHeartbeat(true);
|
||||||
dispatcher.await();
|
rm.drainEvents();
|
||||||
Assert.assertEquals(5120 + 5120, metrics.getAvailableMB());
|
Assert.assertEquals(5120 + 5120, metrics.getAvailableMB());
|
||||||
|
|
||||||
// reconnect of node with changed capability
|
// reconnect of node with changed capability
|
||||||
nm1 = rm.registerNode("host2:5678", 10240);
|
nm1 = rm.registerNode("host2:5678", 10240);
|
||||||
dispatcher.await();
|
rm.drainEvents();
|
||||||
response = nm1.nodeHeartbeat(true);
|
response = nm1.nodeHeartbeat(true);
|
||||||
dispatcher.await();
|
rm.drainEvents();
|
||||||
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
|
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
|
||||||
Assert.assertEquals(5120 + 10240, metrics.getAvailableMB());
|
Assert.assertEquals(5120 + 10240, metrics.getAvailableMB());
|
||||||
|
|
||||||
|
@ -1056,9 +1044,9 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
|
||||||
List<ApplicationId> runningApps = new ArrayList<ApplicationId>();
|
List<ApplicationId> runningApps = new ArrayList<ApplicationId>();
|
||||||
runningApps.add(ApplicationId.newInstance(1, 0));
|
runningApps.add(ApplicationId.newInstance(1, 0));
|
||||||
nm1 = rm.registerNode("host2:5678", 15360, 2, runningApps);
|
nm1 = rm.registerNode("host2:5678", 15360, 2, runningApps);
|
||||||
dispatcher.await();
|
rm.drainEvents();
|
||||||
response = nm1.nodeHeartbeat(true);
|
response = nm1.nodeHeartbeat(true);
|
||||||
dispatcher.await();
|
rm.drainEvents();
|
||||||
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
|
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
|
||||||
Assert.assertEquals(5120 + 15360, metrics.getAvailableMB());
|
Assert.assertEquals(5120 + 15360, metrics.getAvailableMB());
|
||||||
|
|
||||||
|
@ -1066,10 +1054,10 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
|
||||||
nm1 = new MockNM("host1:1234", 5120, rm.getResourceTrackerService());
|
nm1 = new MockNM("host1:1234", 5120, rm.getResourceTrackerService());
|
||||||
nm1.setHttpPort(3);
|
nm1.setHttpPort(3);
|
||||||
nm1.registerNode();
|
nm1.registerNode();
|
||||||
dispatcher.await();
|
rm.drainEvents();
|
||||||
response = nm1.nodeHeartbeat(true);
|
response = nm1.nodeHeartbeat(true);
|
||||||
response = nm1.nodeHeartbeat(true);
|
response = nm1.nodeHeartbeat(true);
|
||||||
dispatcher.await();
|
rm.drainEvents();
|
||||||
RMNode rmNode = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
|
RMNode rmNode = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
|
||||||
Assert.assertEquals(3, rmNode.getHttpPort());
|
Assert.assertEquals(3, rmNode.getHttpPort());
|
||||||
Assert.assertEquals(5120, rmNode.getTotalCapability().getMemory());
|
Assert.assertEquals(5120, rmNode.getTotalCapability().getMemory());
|
||||||
|
@ -1184,14 +1172,116 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
|
||||||
checkDecommissionedNMCount(rm, ++decommisionedNMsCount);
|
checkDecommissionedNMCount(rm, ++decommisionedNMsCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testInitDecommMetric() throws Exception {
|
||||||
|
testInitDecommMetricHelper(true);
|
||||||
|
testInitDecommMetricHelper(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testInitDecommMetricHelper(boolean hasIncludeList)
|
||||||
|
throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
rm = new MockRM(conf);
|
||||||
|
rm.start();
|
||||||
|
MockNM nm1 = rm.registerNode("host1:1234", 5120);
|
||||||
|
MockNM nm2 = rm.registerNode("host2:5678", 10240);
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
nm2.nodeHeartbeat(true);
|
||||||
|
|
||||||
|
File excludeHostFile =
|
||||||
|
new File(TEMP_DIR + File.separator + "excludeHostFile.txt");
|
||||||
|
writeToHostsFile(excludeHostFile, "host1");
|
||||||
|
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
|
||||||
|
excludeHostFile.getAbsolutePath());
|
||||||
|
|
||||||
|
if (hasIncludeList) {
|
||||||
|
writeToHostsFile(hostFile, "host1", "host2");
|
||||||
|
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
|
||||||
|
hostFile.getAbsolutePath());
|
||||||
|
}
|
||||||
|
rm.getNodesListManager().refreshNodes(conf);
|
||||||
|
rm.drainEvents();
|
||||||
|
rm.stop();
|
||||||
|
|
||||||
|
MockRM rm1 = new MockRM(conf);
|
||||||
|
rm1.start();
|
||||||
|
nm1 = rm1.registerNode("host1:1234", 5120);
|
||||||
|
nm2 = rm1.registerNode("host2:5678", 10240);
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
nm2.nodeHeartbeat(true);
|
||||||
|
rm1.drainEvents();
|
||||||
|
Assert.assertEquals("Number of Decommissioned nodes should be 1",
|
||||||
|
1, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
||||||
|
Assert.assertEquals("The inactiveRMNodes should contain an entry for the" +
|
||||||
|
"decommissioned node",
|
||||||
|
1, rm1.getRMContext().getInactiveRMNodes().size());
|
||||||
|
excludeHostFile =
|
||||||
|
new File(TEMP_DIR + File.separator + "excludeHostFile.txt");
|
||||||
|
writeToHostsFile(excludeHostFile, "");
|
||||||
|
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
|
||||||
|
excludeHostFile.getAbsolutePath());
|
||||||
|
rm1.getNodesListManager().refreshNodes(conf);
|
||||||
|
nm1 = rm1.registerNode("host1:1234", 5120);
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
nm2.nodeHeartbeat(true);
|
||||||
|
rm1.drainEvents();
|
||||||
|
Assert.assertEquals("The decommissioned nodes metric should have " +
|
||||||
|
"decremented to 0",
|
||||||
|
0, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
||||||
|
Assert.assertEquals("The active nodes metric should be 2",
|
||||||
|
2, ClusterMetrics.getMetrics().getNumActiveNMs());
|
||||||
|
Assert.assertEquals("The inactive RMNodes entry should have been removed",
|
||||||
|
0, rm1.getRMContext().getInactiveRMNodes().size());
|
||||||
|
rm1.drainEvents();
|
||||||
|
rm1.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testInitDecommMetricNoRegistration() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
rm = new MockRM(conf);
|
||||||
|
rm.start();
|
||||||
|
MockNM nm1 = rm.registerNode("host1:1234", 5120);
|
||||||
|
MockNM nm2 = rm.registerNode("host2:5678", 10240);
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
nm2.nodeHeartbeat(true);
|
||||||
|
//host3 will not register or heartbeat
|
||||||
|
File excludeHostFile =
|
||||||
|
new File(TEMP_DIR + File.separator + "excludeHostFile.txt");
|
||||||
|
writeToHostsFile(excludeHostFile, "host3", "host2");
|
||||||
|
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
|
||||||
|
excludeHostFile.getAbsolutePath());
|
||||||
|
writeToHostsFile(hostFile, "host1", "host2");
|
||||||
|
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
|
||||||
|
hostFile.getAbsolutePath());
|
||||||
|
rm.getNodesListManager().refreshNodes(conf);
|
||||||
|
rm.drainEvents();
|
||||||
|
Assert.assertEquals("The decommissioned nodes metric should be 1 ",
|
||||||
|
1, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
||||||
|
rm.stop();
|
||||||
|
|
||||||
|
MockRM rm1 = new MockRM(conf);
|
||||||
|
rm1.start();
|
||||||
|
rm1.getNodesListManager().refreshNodes(conf);
|
||||||
|
rm1.drainEvents();
|
||||||
|
Assert.assertEquals("The decommissioned nodes metric should be 2 ",
|
||||||
|
2, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
||||||
|
rm1.stop();
|
||||||
|
}
|
||||||
|
|
||||||
private void writeToHostsFile(String... hosts) throws IOException {
|
private void writeToHostsFile(String... hosts) throws IOException {
|
||||||
if (!hostFile.exists()) {
|
writeToHostsFile(hostFile, hosts);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void writeToHostsFile(File file, String... hosts)
|
||||||
|
throws IOException {
|
||||||
|
if (!file.exists()) {
|
||||||
TEMP_DIR.mkdirs();
|
TEMP_DIR.mkdirs();
|
||||||
hostFile.createNewFile();
|
file.createNewFile();
|
||||||
}
|
}
|
||||||
FileOutputStream fStream = null;
|
FileOutputStream fStream = null;
|
||||||
try {
|
try {
|
||||||
fStream = new FileOutputStream(hostFile);
|
fStream = new FileOutputStream(file);
|
||||||
for (int i = 0; i < hosts.length; i++) {
|
for (int i = 0; i < hosts.length; i++) {
|
||||||
fStream.write(hosts[i].getBytes());
|
fStream.write(hosts[i].getBytes());
|
||||||
fStream.write("\n".getBytes());
|
fStream.write("\n".getBytes());
|
||||||
|
|
Loading…
Reference in New Issue