YARN-1071. Enabled ResourceManager to recover cluster metrics numDecommissionedNMs after restarting. Contributed by Jian He.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1570469 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4f8a487fcd
commit
da20095284
|
@ -324,6 +324,9 @@ Release 2.4.0 - UNRELEASED
|
||||||
YARN-1398. Fixed a deadlock in ResourceManager between users requesting
|
YARN-1398. Fixed a deadlock in ResourceManager between users requesting
|
||||||
queue-acls and completing containers. (vinodkv)
|
queue-acls and completing containers. (vinodkv)
|
||||||
|
|
||||||
|
YARN-1071. Enabled ResourceManager to recover cluster metrics
|
||||||
|
numDecommissionedNMs after restarting. (Jian He via zjshen)
|
||||||
|
|
||||||
Release 2.3.1 - UNRELEASED
|
Release 2.3.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -92,6 +92,10 @@ public class ClusterMetrics {
|
||||||
numDecommissionedNMs.incr();
|
numDecommissionedNMs.incr();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setDecommisionedNMs(int num) {
|
||||||
|
numDecommissionedNMs.set(num);
|
||||||
|
}
|
||||||
|
|
||||||
public void decrDecommisionedNMs() {
|
public void decrDecommisionedNMs() {
|
||||||
numDecommissionedNMs.decr();
|
numDecommissionedNMs.decr();
|
||||||
}
|
}
|
||||||
|
|
|
@ -75,6 +75,7 @@ public class NodesListManager extends AbstractService implements
|
||||||
YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
|
YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
|
||||||
this.hostsReader =
|
this.hostsReader =
|
||||||
createHostsFileReader(this.includesFile, this.excludesFile);
|
createHostsFileReader(this.includesFile, this.excludesFile);
|
||||||
|
setDecomissionedNMsMetrics();
|
||||||
printConfiguredHosts();
|
printConfiguredHosts();
|
||||||
} catch (YarnException ex) {
|
} catch (YarnException ex) {
|
||||||
disableHostsFileReader(ex);
|
disableHostsFileReader(ex);
|
||||||
|
@ -120,10 +121,16 @@ public class NodesListManager extends AbstractService implements
|
||||||
this.conf, includesFile), excludesFile.isEmpty() ? null
|
this.conf, includesFile), excludesFile.isEmpty() ? null
|
||||||
: this.rmContext.getConfigurationProvider()
|
: this.rmContext.getConfigurationProvider()
|
||||||
.getConfigurationInputStream(this.conf, excludesFile));
|
.getConfigurationInputStream(this.conf, excludesFile));
|
||||||
|
setDecomissionedNMsMetrics();
|
||||||
printConfiguredHosts();
|
printConfiguredHosts();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void setDecomissionedNMsMetrics() {
|
||||||
|
Set<String> excludeList = hostsReader.getExcludedHosts();
|
||||||
|
ClusterMetrics.getMetrics().setDecommisionedNMs(excludeList.size());
|
||||||
|
}
|
||||||
|
|
||||||
public boolean isValidNode(String hostName) {
|
public boolean isValidNode(String hostName) {
|
||||||
synchronized (hostsReader) {
|
synchronized (hostsReader) {
|
||||||
Set<String> hostsList = hostsReader.getHosts();
|
Set<String> hostsList = hostsReader.getHosts();
|
||||||
|
@ -190,6 +197,7 @@ public class NodesListManager extends AbstractService implements
|
||||||
conf.get(YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
|
conf.get(YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
|
||||||
this.hostsReader =
|
this.hostsReader =
|
||||||
createHostsFileReader(this.includesFile, this.excludesFile);
|
createHostsFileReader(this.includesFile, this.excludesFile);
|
||||||
|
setDecomissionedNMsMetrics();
|
||||||
} catch (IOException ioe2) {
|
} catch (IOException ioe2) {
|
||||||
// Should *never* happen
|
// Should *never* happen
|
||||||
this.hostsReader = null;
|
this.hostsReader = null;
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.net.Node;
|
import org.apache.hadoop.net.Node;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
@ -424,9 +425,22 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Decomissioned NMs equals to the nodes missing in include list (if
|
||||||
|
// include list not empty) or the nodes listed in excluded list.
|
||||||
|
// DecomissionedNMs as per exclude list is set upfront when the
|
||||||
|
// exclude list is read so that RM restart can also reflect the
|
||||||
|
// decomissionedNMs. Note that RM is still not able to know decomissionedNMs
|
||||||
|
// as per include list after it restarts as they are known when those nodes
|
||||||
|
// come for registration.
|
||||||
|
// DecomissionedNMs as per include list is incremented in this transition.
|
||||||
switch (finalState) {
|
switch (finalState) {
|
||||||
case DECOMMISSIONED:
|
case DECOMMISSIONED:
|
||||||
metrics.incrDecommisionedNMs();
|
Set<String> ecludedHosts =
|
||||||
|
context.getNodesListManager().getHostsReader().getExcludedHosts();
|
||||||
|
if (!ecludedHosts.contains(hostName)
|
||||||
|
&& !ecludedHosts.contains(NetUtils.normalizeHostName(hostName))) {
|
||||||
|
metrics.incrDecommisionedNMs();
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case LOST:
|
case LOST:
|
||||||
metrics.incrNumLostNMs();
|
metrics.incrNumLostNMs();
|
||||||
|
|
|
@ -30,6 +30,7 @@ import java.util.List;
|
||||||
|
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
|
|
||||||
|
import org.apache.hadoop.util.HostsFileReader;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
|
@ -102,6 +103,10 @@ public class TestRMNodeTransitions {
|
||||||
rmContext =
|
rmContext =
|
||||||
new RMContextImpl(rmDispatcher, null, null, null,
|
new RMContextImpl(rmDispatcher, null, null, null,
|
||||||
mock(DelegationTokenRenewer.class), null, null, null, null, null);
|
mock(DelegationTokenRenewer.class), null, null, null, null, null);
|
||||||
|
NodesListManager nodesListManager = mock(NodesListManager.class);
|
||||||
|
HostsFileReader reader = mock(HostsFileReader.class);
|
||||||
|
when(nodesListManager.getHostsReader()).thenReturn(reader);
|
||||||
|
((RMContextImpl) rmContext).setNodesListManager(nodesListManager);
|
||||||
scheduler = mock(YarnScheduler.class);
|
scheduler = mock(YarnScheduler.class);
|
||||||
doAnswer(
|
doAnswer(
|
||||||
new Answer<Void>() {
|
new Answer<Void>() {
|
||||||
|
|
|
@ -23,6 +23,8 @@ import static org.mockito.Mockito.spy;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
|
@ -38,7 +40,9 @@ import java.util.Set;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
|
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
|
@ -90,12 +94,16 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestRMRestart {
|
public class TestRMRestart {
|
||||||
|
|
||||||
|
private final static File TEMP_DIR = new File(System.getProperty(
|
||||||
|
"test.build.data", "/tmp"), "decommision");
|
||||||
|
private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt");
|
||||||
private YarnConfiguration conf;
|
private YarnConfiguration conf;
|
||||||
|
|
||||||
// Fake rmAddr for token-renewal
|
// Fake rmAddr for token-renewal
|
||||||
|
@ -113,6 +121,11 @@ public class TestRMRestart {
|
||||||
Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
|
Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() {
|
||||||
|
TEMP_DIR.delete();
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("rawtypes")
|
@SuppressWarnings("rawtypes")
|
||||||
@Test (timeout=180000)
|
@Test (timeout=180000)
|
||||||
public void testRMRestart() throws Exception {
|
public void testRMRestart() throws Exception {
|
||||||
|
@ -1666,6 +1679,56 @@ public class TestRMRestart {
|
||||||
appsCompleted + appsCompletedCarryOn);
|
appsCompleted + appsCompletedCarryOn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDecomissionedNMsMetricsOnRMRestart() throws Exception {
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
|
||||||
|
hostFile.getAbsolutePath());
|
||||||
|
writeToHostsFile("");
|
||||||
|
MockRM rm1 = new MockRM(conf);
|
||||||
|
rm1.start();
|
||||||
|
rm1.registerNode("localhost:1234", 8000);
|
||||||
|
rm1.registerNode("host2:1234", 8000);
|
||||||
|
Assert
|
||||||
|
.assertEquals(0, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
||||||
|
String ip = NetUtils.normalizeHostName("localhost");
|
||||||
|
// Add 2 hosts to exclude list.
|
||||||
|
writeToHostsFile("host2", ip);
|
||||||
|
|
||||||
|
// refresh nodes
|
||||||
|
rm1.getNodesListManager().refreshNodes(conf);
|
||||||
|
Assert
|
||||||
|
.assertEquals(2, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
||||||
|
|
||||||
|
// restart RM.
|
||||||
|
MockRM rm2 = new MockRM(conf);
|
||||||
|
rm2.start();
|
||||||
|
Assert
|
||||||
|
.assertEquals(2, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
||||||
|
rm1.stop();
|
||||||
|
rm2.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void writeToHostsFile(String... hosts) throws IOException {
|
||||||
|
if (!hostFile.exists()) {
|
||||||
|
TEMP_DIR.mkdirs();
|
||||||
|
hostFile.createNewFile();
|
||||||
|
}
|
||||||
|
FileOutputStream fStream = null;
|
||||||
|
try {
|
||||||
|
fStream = new FileOutputStream(hostFile);
|
||||||
|
for (int i = 0; i < hosts.length; i++) {
|
||||||
|
fStream.write(hosts[i].getBytes());
|
||||||
|
fStream.write(System.getProperty("line.separator").getBytes());
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (fStream != null) {
|
||||||
|
IOUtils.closeStream(fStream);
|
||||||
|
fStream = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public class TestMemoryRMStateStore extends MemoryRMStateStore {
|
public class TestMemoryRMStateStore extends MemoryRMStateStore {
|
||||||
int count = 0;
|
int count = 0;
|
||||||
public int updateApp = 0;
|
public int updateApp = 0;
|
||||||
|
|
|
@ -150,7 +150,6 @@ public class TestResourceTrackerService {
|
||||||
MockNM nm3 = rm.registerNode("localhost:4433", 1024);
|
MockNM nm3 = rm.registerNode("localhost:4433", 1024);
|
||||||
|
|
||||||
int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs();
|
int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs();
|
||||||
|
|
||||||
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
||||||
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
||||||
nodeHeartbeat = nm2.nodeHeartbeat(true);
|
nodeHeartbeat = nm2.nodeHeartbeat(true);
|
||||||
|
@ -161,18 +160,17 @@ public class TestResourceTrackerService {
|
||||||
writeToHostsFile("host2", ip);
|
writeToHostsFile("host2", ip);
|
||||||
|
|
||||||
rm.getNodesListManager().refreshNodes(conf);
|
rm.getNodesListManager().refreshNodes(conf);
|
||||||
|
checkDecommissionedNMCount(rm, metricCount + 2);
|
||||||
|
|
||||||
nodeHeartbeat = nm1.nodeHeartbeat(true);
|
nodeHeartbeat = nm1.nodeHeartbeat(true);
|
||||||
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
||||||
nodeHeartbeat = nm2.nodeHeartbeat(true);
|
nodeHeartbeat = nm2.nodeHeartbeat(true);
|
||||||
Assert.assertTrue("The decommisioned metrics are not updated",
|
Assert.assertTrue("The decommisioned metrics are not updated",
|
||||||
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
|
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
|
||||||
checkDecommissionedNMCount(rm, ++metricCount);
|
|
||||||
|
|
||||||
nodeHeartbeat = nm3.nodeHeartbeat(true);
|
nodeHeartbeat = nm3.nodeHeartbeat(true);
|
||||||
Assert.assertTrue("The decommisioned metrics are not updated",
|
Assert.assertTrue("The decommisioned metrics are not updated",
|
||||||
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
|
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
|
||||||
checkDecommissionedNMCount(rm, ++metricCount);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue