MAPREDUCE-3873. Fixed NodeManagers' decommissioning at RM to accept IP addresses also. Contributed by xieguiming and vinodkv.
svn merge --ignore-ancestry -c 1346671 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1346673 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b93758626e
commit
cb4b43b37e
|
@ -45,6 +45,9 @@ Release 2.0.1-alpha - UNRELEASED
|
||||||
MAPREDUCE-4313. TestTokenCache doesn't compile due
|
MAPREDUCE-4313. TestTokenCache doesn't compile due
|
||||||
TokenCache.getDelegationToken compilation error (bobby)
|
TokenCache.getDelegationToken compilation error (bobby)
|
||||||
|
|
||||||
|
MAPREDUCE-3873. Fixed NodeManagers' decommissioning at RM to accept IP
|
||||||
|
addresses also. (xieguiming via vinodkv)
|
||||||
|
|
||||||
Release 2.0.0-alpha - 05-23-2012
|
Release 2.0.0-alpha - 05-23-2012
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.util.HostsFileReader;
|
import org.apache.hadoop.util.HostsFileReader;
|
||||||
import org.apache.hadoop.yarn.YarnException;
|
import org.apache.hadoop.yarn.YarnException;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
@ -37,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.service.AbstractService;
|
import org.apache.hadoop.yarn.service.AbstractService;
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
public class NodesListManager extends AbstractService implements
|
public class NodesListManager extends AbstractService implements
|
||||||
EventHandler<NodesListManagerEvent> {
|
EventHandler<NodesListManagerEvent> {
|
||||||
|
|
||||||
|
@ -112,8 +114,10 @@ public class NodesListManager extends AbstractService implements
|
||||||
synchronized (hostsReader) {
|
synchronized (hostsReader) {
|
||||||
Set<String> hostsList = hostsReader.getHosts();
|
Set<String> hostsList = hostsReader.getHosts();
|
||||||
Set<String> excludeList = hostsReader.getExcludedHosts();
|
Set<String> excludeList = hostsReader.getExcludedHosts();
|
||||||
return ((hostsList.isEmpty() || hostsList.contains(hostName)) &&
|
String ip = NetUtils.normalizeHostName(hostName);
|
||||||
!excludeList.contains(hostName));
|
return (hostsList.isEmpty() || hostsList.contains(hostName) || hostsList
|
||||||
|
.contains(ip))
|
||||||
|
&& !(excludeList.contains(hostName) || excludeList.contains(ip));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,14 +28,14 @@ import junit.framework.Assert;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||||
|
@ -52,8 +52,6 @@ public class TestResourceTrackerService {
|
||||||
"test.build.data", "/tmp"), "decommision");
|
"test.build.data", "/tmp"), "decommision");
|
||||||
private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt");
|
private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt");
|
||||||
private MockRM rm;
|
private MockRM rm;
|
||||||
private static final RecordFactory recordFactory = RecordFactoryProvider
|
|
||||||
.getRecordFactory(null);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* decommissioning using a include hosts file
|
* decommissioning using a include hosts file
|
||||||
|
@ -61,9 +59,9 @@ public class TestResourceTrackerService {
|
||||||
@Test
|
@Test
|
||||||
public void testDecommissionWithIncludeHosts() throws Exception {
|
public void testDecommissionWithIncludeHosts() throws Exception {
|
||||||
|
|
||||||
writeToHostsFile("host1", "host2");
|
writeToHostsFile("localhost", "host1", "host2");
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.set("yarn.resourcemanager.nodes.include-path", hostFile
|
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
|
||||||
.getAbsolutePath());
|
.getAbsolutePath());
|
||||||
|
|
||||||
rm = new MockRM(conf);
|
rm = new MockRM(conf);
|
||||||
|
@ -71,17 +69,22 @@ public class TestResourceTrackerService {
|
||||||
|
|
||||||
MockNM nm1 = rm.registerNode("host1:1234", 5120);
|
MockNM nm1 = rm.registerNode("host1:1234", 5120);
|
||||||
MockNM nm2 = rm.registerNode("host2:5678", 10240);
|
MockNM nm2 = rm.registerNode("host2:5678", 10240);
|
||||||
|
MockNM nm3 = rm.registerNode("localhost:4433", 1024);
|
||||||
|
|
||||||
ClusterMetrics metrics = ClusterMetrics.getMetrics();
|
ClusterMetrics metrics = ClusterMetrics.getMetrics();
|
||||||
assert(metrics != null);
|
assert(metrics != null);
|
||||||
int initialMetricCount = metrics.getNumDecommisionedNMs();
|
int metricCount = metrics.getNumDecommisionedNMs();
|
||||||
|
|
||||||
HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
||||||
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
||||||
nodeHeartbeat = nm2.nodeHeartbeat(true);
|
nodeHeartbeat = nm2.nodeHeartbeat(true);
|
||||||
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
||||||
|
nodeHeartbeat = nm3.nodeHeartbeat(true);
|
||||||
|
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
||||||
|
|
||||||
writeToHostsFile("host1");
|
// To test that IPs also work
|
||||||
|
String ip = NetUtils.normalizeHostName("localhost");
|
||||||
|
writeToHostsFile("host1", ip);
|
||||||
|
|
||||||
rm.getNodesListManager().refreshNodes();
|
rm.getNodesListManager().refreshNodes();
|
||||||
|
|
||||||
|
@ -94,7 +97,12 @@ public class TestResourceTrackerService {
|
||||||
Assert.assertTrue("Node is not decommisioned.", NodeAction.SHUTDOWN
|
Assert.assertTrue("Node is not decommisioned.", NodeAction.SHUTDOWN
|
||||||
.equals(nodeHeartbeat.getNodeAction()));
|
.equals(nodeHeartbeat.getNodeAction()));
|
||||||
|
|
||||||
checkDecommissionedNMCount(rm, ++initialMetricCount);
|
checkDecommissionedNMCount(rm, ++metricCount);
|
||||||
|
|
||||||
|
nodeHeartbeat = nm3.nodeHeartbeat(true);
|
||||||
|
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
||||||
|
Assert.assertEquals(metricCount, ClusterMetrics.getMetrics()
|
||||||
|
.getNumDecommisionedNMs());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -103,7 +111,7 @@ public class TestResourceTrackerService {
|
||||||
@Test
|
@Test
|
||||||
public void testDecommissionWithExcludeHosts() throws Exception {
|
public void testDecommissionWithExcludeHosts() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.set("yarn.resourcemanager.nodes.exclude-path", hostFile
|
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile
|
||||||
.getAbsolutePath());
|
.getAbsolutePath());
|
||||||
|
|
||||||
writeToHostsFile("");
|
writeToHostsFile("");
|
||||||
|
@ -112,16 +120,18 @@ public class TestResourceTrackerService {
|
||||||
|
|
||||||
MockNM nm1 = rm.registerNode("host1:1234", 5120);
|
MockNM nm1 = rm.registerNode("host1:1234", 5120);
|
||||||
MockNM nm2 = rm.registerNode("host2:5678", 10240);
|
MockNM nm2 = rm.registerNode("host2:5678", 10240);
|
||||||
|
MockNM nm3 = rm.registerNode("localhost:4433", 1024);
|
||||||
|
|
||||||
int initialMetricCount = ClusterMetrics.getMetrics()
|
int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs();
|
||||||
.getNumDecommisionedNMs();
|
|
||||||
|
|
||||||
HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
||||||
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
||||||
nodeHeartbeat = nm2.nodeHeartbeat(true);
|
nodeHeartbeat = nm2.nodeHeartbeat(true);
|
||||||
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
||||||
|
|
||||||
writeToHostsFile("host2");
|
// To test that IPs also work
|
||||||
|
String ip = NetUtils.normalizeHostName("localhost");
|
||||||
|
writeToHostsFile("host2", ip);
|
||||||
|
|
||||||
rm.getNodesListManager().refreshNodes();
|
rm.getNodesListManager().refreshNodes();
|
||||||
|
|
||||||
|
@ -130,14 +140,19 @@ public class TestResourceTrackerService {
|
||||||
nodeHeartbeat = nm2.nodeHeartbeat(true);
|
nodeHeartbeat = nm2.nodeHeartbeat(true);
|
||||||
Assert.assertTrue("The decommisioned metrics are not updated",
|
Assert.assertTrue("The decommisioned metrics are not updated",
|
||||||
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
|
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
|
||||||
checkDecommissionedNMCount(rm, ++initialMetricCount);
|
checkDecommissionedNMCount(rm, ++metricCount);
|
||||||
|
|
||||||
|
nodeHeartbeat = nm3.nodeHeartbeat(true);
|
||||||
|
Assert.assertTrue("The decommisioned metrics are not updated",
|
||||||
|
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
|
||||||
|
checkDecommissionedNMCount(rm, ++metricCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNodeRegistrationFailure() throws Exception {
|
public void testNodeRegistrationFailure() throws Exception {
|
||||||
writeToHostsFile("host1");
|
writeToHostsFile("host1");
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.set("yarn.resourcemanager.nodes.include-path", hostFile
|
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
|
||||||
.getAbsolutePath());
|
.getAbsolutePath());
|
||||||
rm = new MockRM(conf);
|
rm = new MockRM(conf);
|
||||||
rm.start();
|
rm.start();
|
||||||
|
@ -191,7 +206,7 @@ public class TestResourceTrackerService {
|
||||||
@Test
|
@Test
|
||||||
public void testUnhealthyNodeStatus() throws Exception {
|
public void testUnhealthyNodeStatus() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.set("yarn.resourcemanager.nodes.exclude-path", hostFile
|
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile
|
||||||
.getAbsolutePath());
|
.getAbsolutePath());
|
||||||
|
|
||||||
rm = new MockRM(conf);
|
rm = new MockRM(conf);
|
||||||
|
|
Loading…
Reference in New Issue