From d464483bf7f0b3e3be3ba32cd6c3eee546747ab5 Mon Sep 17 00:00:00 2001
From: Junping Du
Date: Thu, 18 Aug 2016 07:23:29 -0700
Subject: [PATCH] YARN-4676. Automatic and Asynchronous Decommissioning Nodes Status Tracking. Contributed by Daniel Zhi.

---
 .../apache/hadoop/util/HostsFileReader.java | 111 ++++-
 .../hadoop/util/TestHostsFileReader.java | 64 ++-
 hadoop-project/src/site/site.xml | 1 +
 .../hadoop/yarn/sls/nodemanager/NodeInfo.java | 5 +
 .../yarn/sls/scheduler/RMNodeWrapper.java | 5 +
 .../hadoop/yarn/conf/YarnConfiguration.java | 14 +
 .../protocolrecords/RefreshNodesRequest.java | 26 +-
 ...erver_resourcemanager_service_protos.proto | 1 +
 .../hadoop/yarn/client/cli/RMAdminCLI.java | 166 ++++---
 .../yarn/client/cli/TestRMAdminCLI.java | 24 +-
 .../impl/pb/RefreshNodesRequestPBImpl.java | 17 +-
 .../src/main/resources/yarn-default.xml | 18 +
 .../server/resourcemanager/AdminService.java | 3 +-
 .../DecommissioningNodesWatcher.java | 439 ++++++++++++++++++
 .../resourcemanager/NodesListManager.java | 168 +++++--
 .../server/resourcemanager/RMServerUtils.java | 2 +-
 .../ResourceTrackerService.java | 19 +
 .../server/resourcemanager/rmnode/RMNode.java | 7 +
 .../rmnode/RMNodeDecommissioningEvent.java | 41 ++
 .../resourcemanager/rmnode/RMNodeImpl.java | 54 ++-
 .../webapp/dao/ClusterMetricsInfo.java | 2 +-
 .../server/resourcemanager/MockNodes.java | 5 +
 .../yarn/server/resourcemanager/MockRM.java | 11 +-
 .../TestDecommissioningNodesWatcher.java | 131 ++++++
 .../TestRMNodeTransitions.java | 11 -
 .../TestResourceTrackerService.java | 199 ++--
 .../resourcetracker/TestNMReconnect.java | 2 -
 .../src/site/markdown/YarnCommands.md | 2 +-
 28 files changed, 1328 insertions(+), 220 deletions(-)
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecommissioningEvent.java
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
index 1cba4266afe..2ef1ead3ab3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
@@ -21,16 +21,27 @@ package org.apache.hadoop.util;
 import java.io.*;
 import java.nio.charset.StandardCharsets;
 import java.util.Set;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.Map;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
 
-import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
 
 // Keeps track of which datanodes/tasktrackers are allowed to connect to the
 // namenode/jobtracker.
@@ -38,7 +49,9 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceStability.Unstable
 public class HostsFileReader {
   private Set<String> includes;
-  private Set<String> excludes;
+  // exclude host list with optional timeout.
+  // If the value is null, it indicates default timeout.
+  private Map<String, Integer> excludes;
   private String includesFile;
   private String excludesFile;
   private WriteLock writeLock;
@@ -49,7 +62,7 @@ public class HostsFileReader {
   public HostsFileReader(String inFile, String exFile) throws IOException {
     includes = new HashSet<String>();
-    excludes = new HashSet<String>();
+    excludes = new HashMap<String, Integer>();
     includesFile = inFile;
     excludesFile = exFile;
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
@@ -62,7 +75,7 @@ public class HostsFileReader {
   public HostsFileReader(String includesFile, InputStream inFileInputStream,
       String excludesFile, InputStream exFileInputStream) throws IOException {
     includes = new HashSet<String>();
-    excludes = new HashSet<String>();
+    excludes = new HashMap<String, Integer>();
     this.includesFile = includesFile;
     this.excludesFile = excludesFile;
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
@@ -121,6 +134,73 @@ public class HostsFileReader {
     }
   }
 
+  public static void readFileToMap(String type,
+      String filename, Map<String, Integer> map) throws IOException {
+    File file = new File(filename);
+    FileInputStream fis = new FileInputStream(file);
+    readFileToMapWithFileInputStream(type, filename, fis, map);
+  }
+
+  public static void readFileToMapWithFileInputStream(String type,
+      String filename, InputStream inputStream, Map<String, Integer> map)
+          throws IOException {
+    // The input file could be either simple text or XML.
+    boolean xmlInput = filename.toLowerCase().endsWith(".xml");
+    if (xmlInput) {
+      readXmlFileToMapWithFileInputStream(type, filename, inputStream, map);
+    } else {
+      HashSet<String> nodes = new HashSet<String>();
+      readFileToSetWithFileInputStream(type, filename, inputStream, nodes);
+      for (String node : nodes) {
+        map.put(node, null);
+      }
+    }
+  }
+
+  public static void readXmlFileToMapWithFileInputStream(String type,
+      String filename, InputStream fileInputStream, Map<String, Integer> map)
+          throws IOException {
+    Document dom;
+    DocumentBuilderFactory builder = DocumentBuilderFactory.newInstance();
+    try {
+      DocumentBuilder db = builder.newDocumentBuilder();
+      dom = db.parse(fileInputStream);
+      // Examples:
+      // <host><name>host1</name></host>
+      // <host><name>host2</name><timeout>123</timeout></host>
+      // <host><name>host3</name><timeout>-1</timeout></host>
+      // <host><name>host4, host5,host6</name><timeout>1800</timeout></host>
+      Element doc = dom.getDocumentElement();
+      NodeList nodes = doc.getElementsByTagName("host");
+      for (int i = 0; i < nodes.getLength(); i++) {
+        Node node = nodes.item(i);
+        if (node.getNodeType() == Node.ELEMENT_NODE) {
+          Element e = (Element) node;
+          // Support both single host and comma-separated list of hosts.
+          String v = readFirstTagValue(e, "name");
+          String[] hosts = StringUtils.getTrimmedStrings(v);
+          String str = readFirstTagValue(e, "timeout");
+          Integer timeout = (str == null)?
null : Integer.parseInt(str); + for (String host : hosts) { + map.put(host, timeout); + LOG.info("Adding a node \"" + host + "\" to the list of " + + type + " hosts from " + filename); + } + } + } + } catch (IOException|SAXException|ParserConfigurationException e) { + LOG.fatal("error parsing " + filename, e); + throw new RuntimeException(e); + } finally { + fileInputStream.close(); + } + } + + static String readFirstTagValue(Element e, String tag) { + NodeList nodes = e.getElementsByTagName(tag); + return (nodes.getLength() == 0)? null : nodes.item(0).getTextContent(); + } + public void refresh(String includeFiles, String excludeFiles) throws IOException { LOG.info("Refreshing hosts (include/exclude) list"); @@ -129,7 +209,7 @@ public class HostsFileReader { // update instance variables updateFileNames(includeFiles, excludeFiles); Set newIncludes = new HashSet(); - Set newExcludes = new HashSet(); + Map newExcludes = new HashMap(); boolean switchIncludes = false; boolean switchExcludes = false; if (includeFiles != null && !includeFiles.isEmpty()) { @@ -137,7 +217,7 @@ public class HostsFileReader { switchIncludes = true; } if (excludeFiles != null && !excludeFiles.isEmpty()) { - readFileToSet("excluded", excludeFiles, newExcludes); + readFileToMap("excluded", excludeFiles, newExcludes); switchExcludes = true; } @@ -161,7 +241,7 @@ public class HostsFileReader { this.writeLock.lock(); try { Set newIncludes = new HashSet(); - Set newExcludes = new HashSet(); + Map newExcludes = new HashMap(); boolean switchIncludes = false; boolean switchExcludes = false; if (inFileInputStream != null) { @@ -170,7 +250,7 @@ public class HostsFileReader { switchIncludes = true; } if (exFileInputStream != null) { - readFileToSetWithFileInputStream("excluded", excludesFile, + readFileToMapWithFileInputStream("excluded", excludesFile, exFileInputStream, newExcludes); switchExcludes = true; } @@ -199,7 +279,7 @@ public class HostsFileReader { public Set getExcludedHosts() { this.readLock.lock(); try { - return excludes; + return excludes.keySet(); } finally { this.readLock.unlock(); } @@ -209,7 +289,18 @@ public class HostsFileReader { this.readLock.lock(); try { includes.addAll(this.includes); - excludes.addAll(this.excludes); + excludes.addAll(this.excludes.keySet()); + } finally { + this.readLock.unlock(); + } + } + + public void getHostDetails(Set includeHosts, + Map excludeHosts) { + this.readLock.lock(); + try { + includeHosts.addAll(this.includes); + excludeHosts.putAll(this.excludes); } finally { this.readLock.unlock(); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java index 8015f7a1ef1..576659125c8 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java @@ -20,16 +20,19 @@ package org.apache.hadoop.util; import java.io.File; import java.io.FileNotFoundException; import java.io.FileWriter; +import java.util.HashMap; import java.util.HashSet; import java.util.Set; +import java.util.Map; import org.apache.hadoop.test.GenericTestUtils; import org.junit.*; + import static org.junit.Assert.*; /* * Test for HostsFileReader.java - * + * */ public class TestHostsFileReader { @@ -39,6 +42,7 @@ public class TestHostsFileReader { File INCLUDES_FILE = new File(HOSTS_TEST_DIR, "dfs.include"); String 
excludesFile = HOSTS_TEST_DIR + "/dfs.exclude"; String includesFile = HOSTS_TEST_DIR + "/dfs.include"; + private String excludesXmlFile = HOSTS_TEST_DIR + "/dfs.exclude.xml"; @Before public void setUp() throws Exception { @@ -288,4 +292,62 @@ public class TestHostsFileReader { assertFalse(hfp.getExcludedHosts().contains("somehost5")); } + + /* + * Test if timeout values are provided in HostFile + */ + @Test + public void testHostFileReaderWithTimeout() throws Exception { + FileWriter efw = new FileWriter(excludesXmlFile); + FileWriter ifw = new FileWriter(includesFile); + + efw.write("\n"); + efw.write("\n"); + efw.write("\n"); + efw.write("host1\n"); + efw.write("host2123\n"); + efw.write("host3-1\n"); + efw.write("10000\n"); + efw.write("10001123\n"); + efw.write("10002-1\n"); + efw.write("host4,host5, host6" + + "1800\n"); + efw.write("\n"); + efw.close(); + + ifw.write("#Hosts-in-DFS\n"); + ifw.write(" \n"); + ifw.write(" somehost \t somehost2 \n somehost4"); + ifw.write(" somehost3 \t # somehost5"); + ifw.close(); + + HostsFileReader hfp = new HostsFileReader(includesFile, excludesXmlFile); + + int includesLen = hfp.getHosts().size(); + int excludesLen = hfp.getExcludedHosts().size(); + assertEquals(4, includesLen); + assertEquals(9, excludesLen); + + Set includes = new HashSet(); + Map excludes = new HashMap(); + hfp.getHostDetails(includes, excludes); + assertTrue(excludes.containsKey("host1")); + assertTrue(excludes.containsKey("host2")); + assertTrue(excludes.containsKey("host3")); + assertTrue(excludes.containsKey("10000")); + assertTrue(excludes.containsKey("10001")); + assertTrue(excludes.containsKey("10002")); + assertTrue(excludes.containsKey("host4")); + assertTrue(excludes.containsKey("host5")); + assertTrue(excludes.containsKey("host6")); + assertTrue(excludes.get("host1") == null); + assertTrue(excludes.get("host2") == 123); + assertTrue(excludes.get("host3") == -1); + assertTrue(excludes.get("10000") == null); + assertTrue(excludes.get("10001") == 123); + assertTrue(excludes.get("10002") == -1); + assertTrue(excludes.get("host4") == 1800); + assertTrue(excludes.get("host5") == 1800); + assertTrue(excludes.get("host6") == 1800); + } } diff --git a/hadoop-project/src/site/site.xml b/hadoop-project/src/site/site.xml index 0705cae36fb..aed9fa3c6a0 100644 --- a/hadoop-project/src/site/site.xml +++ b/hadoop-project/src/site/site.xml @@ -132,6 +132,7 @@ + diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index bd737bd7ac4..c598aa079ee 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -213,6 +213,11 @@ public class NodeInfo { @Override public void setUntrackedTimeStamp(long timeStamp) { } + + @Override + public Integer getDecommissioningTimeout() { + return null; + } } public static RMNode newNodeInfo(String rackName, String hostName, diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index 5048978ef7f..6d0ffbd26c1 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -202,4 +202,9 
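Based on readXmlFileToMapWithFileInputStream (which looks only for host elements carrying a name and an optional timeout child) and on the assertions in testHostFileReaderWithTimeout above, the exclude file the test writes should look roughly like the sketch below; the XML declaration, the hosts root element name, and the numeric 10000/10001/10002 entries following the same pattern are assumptions, not confirmed by this copy of the patch.

```java
import java.io.FileWriter;
import java.io.IOException;

// Illustrative only: writes an exclude file equivalent to the one built by
// testHostFileReaderWithTimeout. A missing <timeout> means the default
// timeout and -1 means no timeout.
public class SampleExcludeXmlWriter {
  public static void main(String[] args) throws IOException {
    FileWriter efw = new FileWriter("dfs.exclude.xml");
    efw.write("<?xml version=\"1.0\"?>\n");           // assumed header line
    efw.write("<hosts>\n");                            // assumed root element
    efw.write("<host><name>host1</name></host>\n");
    efw.write("<host><name>host2</name><timeout>123</timeout></host>\n");
    efw.write("<host><name>host3</name><timeout>-1</timeout></host>\n");
    efw.write("<host><name>host4,host5, host6</name>"
        + "<timeout>1800</timeout></host>\n");
    efw.write("</hosts>\n");
    efw.close();
  }
}
```

Read back through HostsFileReader.getHostDetails(), such a file yields a null timeout for host1, 123 for host2, -1 for host3, and 1800 for host4, host5 and host6, matching the assertions in the test.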
@@ public class RMNodeWrapper implements RMNode { @Override public void setUntrackedTimeStamp(long timeStamp) { } + + @Override + public Integer getDecommissioningTimeout() { + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 84fb0d4954b..59f9b7a149e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -769,6 +769,20 @@ public class YarnConfiguration extends Configuration { */ public static final String RM_PROXY_USER_PREFIX = RM_PREFIX + "proxyuser."; + /** + * Timeout in seconds for YARN node graceful decommission. + * This is the maximal time to wait for running containers and applications + * to complete before transition a DECOMMISSIONING node into DECOMMISSIONED. + */ + public static final String RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT = + RM_PREFIX + "nodemanager-graceful-decommission-timeout-secs"; + public static final int DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT = 3600; + + public static final String RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL = + RM_PREFIX + "decommissioning-nodes-watcher.poll-interval-secs"; + public static final int + DEFAULT_RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL = 20; + //////////////////////////////// // Node Manager Configs //////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java index 0333c3b44a4..732d98ebe44 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java @@ -43,6 +43,16 @@ public abstract class RefreshNodesRequest { return request; } + @Private + @Unstable + public static RefreshNodesRequest newInstance( + DecommissionType decommissionType, Integer timeout) { + RefreshNodesRequest request = Records.newRecord(RefreshNodesRequest.class); + request.setDecommissionType(decommissionType); + request.setDecommissionTimeout(timeout); + return request; + } + /** * Set the DecommissionType * @@ -56,4 +66,18 @@ public abstract class RefreshNodesRequest { * @return decommissionType */ public abstract DecommissionType getDecommissionType(); -} + + /** + * Set the DecommissionTimeout. + * + * @param timeout graceful decommission timeout in seconds + */ + public abstract void setDecommissionTimeout(Integer timeout); + + /** + * Get the DecommissionTimeout. 
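A minimal sketch of building the extended request defined above; the import locations are assumed and the 1800-second value is illustrative.

```java
import org.apache.hadoop.yarn.api.records.DecommissionType;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;

public class GracefulRefreshRequestExample {
  public static RefreshNodesRequest build() {
    // GRACEFUL refresh with an explicit 1800-second timeout; passing null
    // instead lets the ResourceManager fall back to
    // yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs.
    return RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL, 1800);
  }
}
```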
+ * + * @return decommissionTimeout + */ + public abstract Integer getDecommissionTimeout(); +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto index eaf658f5e97..b9f30db46ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto @@ -37,6 +37,7 @@ message RefreshQueuesResponseProto { message RefreshNodesRequestProto { optional DecommissionTypeProto decommissionType = 1 [default = NORMAL]; + optional int32 decommissionTimeout = 2; } message RefreshNodesResponseProto { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java index 4aa3a14d980..fcb9b749e24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java @@ -99,10 +99,10 @@ public class RMAdminCLI extends HAAdmin { "properties. \n\t\tResourceManager will reload the " + "mapred-queues configuration file.")) .put("-refreshNodes", - new UsageInfo("[-g [timeout in seconds] -client|server]", + new UsageInfo("[-g|graceful [timeout in seconds] -client|server]", "Refresh the hosts information at the ResourceManager. Here " - + "[-g [timeout in seconds] -client|server] is optional, if we " - + "specify the timeout then ResourceManager will wait for " + + "[-g|graceful [timeout in seconds] -client|server] is optional," + + " if we specify the timeout then ResourceManager will wait for " + "timeout before marking the NodeManager as decommissioned." + " The -client|server indicates if the timeout tracking should" + " be handled by the client or the ResourceManager. 
The client" @@ -234,21 +234,23 @@ public class RMAdminCLI extends HAAdmin { summary.append("rmadmin is the command to execute YARN administrative " + "commands.\n"); summary.append("The full syntax is: \n\n" + - "yarn rmadmin" + - " [-refreshQueues]" + - " [-refreshNodes [-g [timeout in seconds] -client|server]]" + - " [-refreshNodesResources]" + - " [-refreshSuperUserGroupsConfiguration]" + - " [-refreshUserToGroupsMappings]" + - " [-refreshAdminAcls]" + - " [-refreshServiceAcl]" + - " [-getGroup [username]]" + - " [-addToClusterNodeLabels <\"label1(exclusive=true)," - + "label2(exclusive=false),label3\">]" + - " [-removeFromClusterNodeLabels ]" + - " [-replaceLabelsOnNode <\"node1[:port]=label1,label2 node2[:port]=label1\">]" + - " [-directlyAccessNodeLabelStore]" + - " [-updateNodeResource [NodeID] [MemSize] [vCores] ([OvercommitTimeout])"); + "yarn rmadmin" + + " [-refreshQueues]" + + " [-refreshNodes [-g|graceful [timeout in seconds] -client|server]]" + + " [-refreshNodesResources]" + + " [-refreshSuperUserGroupsConfiguration]" + + " [-refreshUserToGroupsMappings]" + + " [-refreshAdminAcls]" + + " [-refreshServiceAcl]" + + " [-getGroup [username]]" + + " [-addToClusterNodeLabels <\"label1(exclusive=true)," + + "label2(exclusive=false),label3\">]" + + " [-removeFromClusterNodeLabels ]" + + " [-replaceLabelsOnNode <\"node1[:port]=label1,label2" + + " node2[:port]=label1\">]" + + " [-directlyAccessNodeLabelStore]" + + " [-updateNodeResource [NodeID] [MemSize] [vCores]" + + " ([OvercommitTimeout])"); if (isHAEnabled) { appendHAUsage(summary); } @@ -309,33 +311,40 @@ public class RMAdminCLI extends HAAdmin { return 0; } - private int refreshNodes() throws IOException, YarnException { + private int refreshNodes(boolean graceful) throws IOException, YarnException { // Refresh the nodes ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); - RefreshNodesRequest request = RefreshNodesRequest - .newInstance(DecommissionType.NORMAL); + RefreshNodesRequest request = RefreshNodesRequest.newInstance( + graceful? DecommissionType.GRACEFUL : DecommissionType.NORMAL); adminProtocol.refreshNodes(request); return 0; } - private int refreshNodes(long timeout, String trackingMode) + private int refreshNodes(int timeout, String trackingMode) throws IOException, YarnException { - if (!"client".equals(trackingMode)) { - throw new UnsupportedOperationException( - "Only client tracking mode is currently supported."); - } + boolean serverTracking = !"client".equals(trackingMode); // Graceful decommissioning with timeout ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); RefreshNodesRequest gracefulRequest = RefreshNodesRequest - .newInstance(DecommissionType.GRACEFUL); + .newInstance(DecommissionType.GRACEFUL, timeout); adminProtocol.refreshNodes(gracefulRequest); + if (serverTracking) { + return 0; + } CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest = recordFactory .newRecordInstance(CheckForDecommissioningNodesRequest.class); long waitingTime; boolean nodesDecommissioning = true; + // As RM enforces timeout automatically, client usually don't need + // to forcefully decommission nodes upon timeout. + // Here we let the client waits a small additional seconds so to avoid + // unnecessary double decommission. 
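To make the two tracking modes concrete, the CLI can be driven the same way the tests below drive it; a rough sketch, assuming the Configuration-taking RMAdminCLI constructor used by TestRMAdminCLI, with an illustrative 1800-second timeout:

```java
import org.apache.hadoop.yarn.client.cli.RMAdminCLI;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class RefreshNodesCliExample {
  public static void main(String[] args) throws Exception {
    RMAdminCLI cli = new RMAdminCLI(new YarnConfiguration());

    // Client-side tracking: send a GRACEFUL refresh carrying the timeout,
    // then poll checkForDecommissioningNodes until the nodes drain (plus the
    // small grace period above), forcefully decommissioning only if needed.
    cli.run(new String[] {"-refreshNodes", "-g", "1800", "-client"});

    // Server-side tracking: send the GRACEFUL refresh carrying the timeout
    // and return; the RM's DecommissioningNodesWatcher enforces it.
    cli.run(new String[] {"-refreshNodes", "-graceful", "1800", "-server"});
  }
}
```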
+ final int gracePeriod = 5; // timeout=-1 means wait for all the nodes to be gracefully // decommissioned - for (waitingTime = 0; waitingTime < timeout || timeout == -1; waitingTime++) { + for (waitingTime = 0; + timeout == -1 || (timeout >= 0 && waitingTime < timeout + gracePeriod); + waitingTime++) { // wait for one second to check nodes decommissioning status try { Thread.sleep(1000); @@ -380,6 +389,10 @@ public class RMAdminCLI extends HAAdmin { return 0; } + private int refreshNodes() throws IOException, YarnException { + return refreshNodes(false); + } + private int refreshUserToGroupsMappings() throws IOException, YarnException { // Refresh the user-to-groups mappings @@ -725,33 +738,12 @@ public class RMAdminCLI extends HAAdmin { return exitCode; } } - + try { if ("-refreshQueues".equals(cmd)) { exitCode = refreshQueues(); } else if ("-refreshNodes".equals(cmd)) { - if (args.length == 1) { - exitCode = refreshNodes(); - } else if (args.length == 3 || args.length == 4) { - // if the graceful timeout specified - if ("-g".equals(args[1])) { - long timeout = -1; - String trackingMode; - if (args.length == 4) { - timeout = validateTimeout(args[2]); - trackingMode = validateTrackingMode(args[3]); - } else { - trackingMode = validateTrackingMode(args[2]); - } - exitCode = refreshNodes(timeout, trackingMode); - } else { - printUsage(cmd, isHAEnabled); - return -1; - } - } else { - printUsage(cmd, isHAEnabled); - return -1; - } + exitCode = handleRefreshNodes(args, cmd, isHAEnabled); } else if ("-refreshNodesResources".equals(cmd)) { exitCode = refreshNodesResources(); } else if ("-refreshUserToGroupsMappings".equals(cmd)) { @@ -768,22 +760,7 @@ public class RMAdminCLI extends HAAdmin { String[] usernames = Arrays.copyOfRange(args, i, args.length); exitCode = getGroups(usernames); } else if ("-updateNodeResource".equals(cmd)) { - if (args.length < 4 || args.length > 5) { - System.err.println("Number of parameters specified for " + - "updateNodeResource is wrong."); - printUsage(cmd, isHAEnabled); - exitCode = -1; - } else { - String nodeID = args[i++]; - String memSize = args[i++]; - String cores = args[i++]; - int overCommitTimeout = ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT; - if (i == args.length - 1) { - overCommitTimeout = Integer.parseInt(args[i]); - } - exitCode = updateNodeResource(nodeID, Integer.parseInt(memSize), - Integer.parseInt(cores), overCommitTimeout); - } + exitCode = handleUpdateNodeResource(args, cmd, isHAEnabled); } else if ("-addToClusterNodeLabels".equals(cmd)) { if (i >= args.length) { System.err.println(NO_LABEL_ERR_MSG); @@ -843,10 +820,59 @@ public class RMAdminCLI extends HAAdmin { return exitCode; } - private long validateTimeout(String strTimeout) { - long timeout; + // A helper method to reduce the number of lines of run() + private int handleRefreshNodes(String[] args, String cmd, boolean isHAEnabled) + throws IOException, YarnException { + if (args.length == 1) { + return refreshNodes(); + } else if (args.length == 3 || args.length == 4) { + // if the graceful timeout specified + if ("-g".equals(args[1]) || "-graceful".equals(args[1])) { + int timeout = -1; + String trackingMode; + if (args.length == 4) { + timeout = validateTimeout(args[2]); + trackingMode = validateTrackingMode(args[3]); + } else { + trackingMode = validateTrackingMode(args[2]); + } + return refreshNodes(timeout, trackingMode); + } else { + printUsage(cmd, isHAEnabled); + return -1; + } + } else { + printUsage(cmd, isHAEnabled); + return -1; + } + } + + private int 
handleUpdateNodeResource( + String[] args, String cmd, boolean isHAEnabled) + throws NumberFormatException, IOException, YarnException { + int i = 1; + if (args.length < 4 || args.length > 5) { + System.err.println("Number of parameters specified for " + + "updateNodeResource is wrong."); + printUsage(cmd, isHAEnabled); + return -1; + } else { + String nodeID = args[i++]; + String memSize = args[i++]; + String cores = args[i++]; + int overCommitTimeout = ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT; + if (i == args.length - 1) { + overCommitTimeout = Integer.parseInt(args[i]); + } + return updateNodeResource(nodeID, Integer.parseInt(memSize), + Integer.parseInt(cores), overCommitTimeout); + } + } + + private int validateTimeout(String strTimeout) { + int timeout; try { - timeout = Long.parseLong(strTimeout); + timeout = Integer.parseInt(strTimeout); } catch (NumberFormatException ex) { throw new IllegalArgumentException(INVALID_TIMEOUT_ERR_MSG + strTimeout); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java index d3161ba9579..60c7eac00d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java @@ -267,7 +267,7 @@ public class TestRMAdminCLI { CheckForDecommissioningNodesRequest.class))).thenReturn(response); assertEquals(0, rmAdminCLI.run(args)); verify(admin).refreshNodes( - RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL)); + RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL, 1)); verify(admin, never()).refreshNodes( RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL)); } @@ -327,7 +327,7 @@ public class TestRMAdminCLI { }); assertEquals(0, rmAdminCLI.run(args)); verify(admin, atLeastOnce()).refreshNodes( - RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL)); + RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL, -1)); verify(admin, never()).refreshNodes( RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL)); } @@ -346,10 +346,6 @@ public class TestRMAdminCLI { String[] negativeTimeoutArgs = {"-refreshNodes", "-g", "-1000", "-client"}; assertEquals(-1, rmAdminCLI.run(negativeTimeoutArgs)); - // server tracking mode - String[] serveTrackingrArgs = {"-refreshNodes", "-g", "1", "-server"}; - assertEquals(-1, rmAdminCLI.run(serveTrackingrArgs)); - // invalid tracking mode String[] invalidTrackingArgs = {"-refreshNodes", "-g", "1", "-foo"}; assertEquals(-1, rmAdminCLI.run(invalidTrackingArgs)); @@ -465,8 +461,9 @@ public class TestRMAdminCLI { assertTrue(dataOut .toString() .contains( - "yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in " + - "seconds] -client|server]] [-refreshNodesResources] [-refresh" + + "yarn rmadmin [-refreshQueues] [-refreshNodes "+ + "[-g|graceful [timeout in seconds] -client|server]] " + + "[-refreshNodesResources] [-refresh" + "SuperUserGroupsConfiguration] [-refreshUserToGroupsMappings] " + "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup " + "[username]] [-addToClusterNodeLabels " + @@ -485,7 +482,8 @@ public class TestRMAdminCLI { assertTrue(dataOut .toString() .contains( - "-refreshNodes [-g [timeout in seconds] -client|server]: " + + "-refreshNodes [-g|graceful [timeout in seconds]" + + " 
-client|server]: " + "Refresh the hosts information at the ResourceManager.")); assertTrue(dataOut .toString() @@ -518,8 +516,8 @@ public class TestRMAdminCLI { testError(new String[] { "-help", "-refreshQueues" }, "Usage: yarn rmadmin [-refreshQueues]", dataErr, 0); testError(new String[] { "-help", "-refreshNodes" }, - "Usage: yarn rmadmin [-refreshNodes [-g [timeout in seconds] " + - "-client|server]]", dataErr, 0); + "Usage: yarn rmadmin [-refreshNodes [-g|graceful " + + "[timeout in seconds] -client|server]]", dataErr, 0); testError(new String[] { "-help", "-refreshNodesResources" }, "Usage: yarn rmadmin [-refreshNodesResources]", dataErr, 0); testError(new String[] { "-help", "-refreshUserToGroupsMappings" }, @@ -558,8 +556,8 @@ public class TestRMAdminCLI { assertEquals(0, rmAdminCLIWithHAEnabled.run(args)); oldOutPrintStream.println(dataOut); String expectedHelpMsg = - "yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in " - + "seconds] -client|server]] " + "yarn rmadmin [-refreshQueues] [-refreshNodes [-g|graceful " + + "[timeout in seconds] -client|server]] " + "[-refreshNodesResources] [-refreshSuperUserGroupsConfiguration] " + "[-refreshUserToGroupsMappings] " + "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup" diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java index 05f323011be..c03a569611a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java @@ -31,7 +31,6 @@ import com.google.protobuf.TextFormat; @Private @Unstable public class RefreshNodesRequestPBImpl extends RefreshNodesRequest { - RefreshNodesRequestProto proto = RefreshNodesRequestProto.getDefaultInstance(); RefreshNodesRequestProto.Builder builder = null; boolean viaProto = false; @@ -108,6 +107,22 @@ public class RefreshNodesRequestPBImpl extends RefreshNodesRequest { return convertFromProtoFormat(p.getDecommissionType()); } + @Override + public synchronized void setDecommissionTimeout(Integer timeout) { + maybeInitBuilder(); + if (timeout != null) { + builder.setDecommissionTimeout(timeout); + } else { + builder.clearDecommissionTimeout(); + } + } + + @Override + public synchronized Integer getDecommissionTimeout() { + RefreshNodesRequestProtoOrBuilder p = viaProto ? proto : builder; + return p.hasDecommissionTimeout()? p.getDecommissionTimeout() : null; + } + private DecommissionType convertFromProtoFormat(DecommissionTypeProto p) { return DecommissionType.valueOf(p.name()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 723dd1aa853..3d2d593d015 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2502,6 +2502,24 @@ 1800000 + + + Timeout in seconds for YARN node graceful decommission. 
+ This is the maximal time to wait for running containers and applications to complete + before transition a DECOMMISSIONING node into DECOMMISSIONED. + + yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs + 3600 + + + + + Timeout in seconds of DecommissioningNodesWatcher internal polling. + + yarn.resourcemanager.decommissioning-nodes-watcher.poll-interval-secs + 20 + + The Node Label script to run. Script output Line starting with "NODE_PARTITION:" will be considered as Node Label Partition. In case of diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index 2ec03aafb14..db55264c483 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -447,7 +447,8 @@ public class AdminService extends CompositeService implements rmContext.getNodesListManager().refreshNodes(conf); break; case GRACEFUL: - rmContext.getNodesListManager().refreshNodesGracefully(conf); + rmContext.getNodesListManager().refreshNodesGracefully( + conf, request.getDecommissionTimeout()); break; case FORCEFUL: rmContext.getNodesListManager().refreshNodesForcefully(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java new file mode 100644 index 00000000000..376b5039d90 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java @@ -0,0 +1,439 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
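For quick reference, the two properties documented above can also be set programmatically, for example in a test or a tool that embeds the ResourceManager; a minimal sketch with illustrative values:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class DecommissionConfigExample {
  public static Configuration create() {
    Configuration conf = new YarnConfiguration();
    // Wait at most 30 minutes for containers/apps on a DECOMMISSIONING node.
    conf.setInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, 1800);
    // Poll DECOMMISSIONING nodes every 10 seconds instead of the default 20.
    conf.setInt(
        YarnConfiguration.RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL, 10);
    return conf;
  }
}
```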
+ */ +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.util.MonotonicClock; + +/** + * DecommissioningNodesWatcher is used by ResourceTrackerService to track + * DECOMMISSIONING nodes to decide when, after all running containers on + * the node have completed, will be transitioned into DECOMMISSIONED state + * (NodeManager will be told to shutdown). + * Under MR application, a node, after completes all its containers, + * may still serve it map output data during the duration of the application + * for reducers. A fully graceful mechanism would keep such DECOMMISSIONING + * nodes until all involved applications complete. It could be however + * undesirable under long-running applications scenario where a bunch + * of "idle" nodes would stay around for long period of time. + * + * DecommissioningNodesWatcher balance such concern with a timeout policy --- + * a DECOMMISSIONING node will be DECOMMISSIONED no later than + * DECOMMISSIONING_TIMEOUT regardless of running containers or applications. + * + * To be efficient, DecommissioningNodesWatcher skip tracking application + * containers on a particular node before the node is in DECOMMISSIONING state. + * It only tracks containers once the node is in DECOMMISSIONING state. + * DecommissioningNodesWatcher basically is no cost when no node is + * DECOMMISSIONING. This sacrifices the possibility that the node once + * host containers of an application that is still running + * (the affected map tasks will be rescheduled). + */ +public class DecommissioningNodesWatcher { + private static final Log LOG = + LogFactory.getLog(DecommissioningNodesWatcher.class); + + private final RMContext rmContext; + + // Default timeout value in mills. + // Negative value indicates no timeout. 0 means immediate. + private long defaultTimeoutMs = + 1000L * YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT; + + // Once a RMNode is observed in DECOMMISSIONING state, + // All its ContainerStatus update are tracked inside DecomNodeContext. + class DecommissioningNodeContext { + private final NodeId nodeId; + + // Last known NodeState. + private NodeState nodeState; + + // The moment node is observed in DECOMMISSIONING state. + private final long decommissioningStartTime; + + private long lastContainerFinishTime; + + // number of running containers at the moment. 
+ private int numActiveContainers; + + // All applications run on the node at or after decommissioningStartTime. + private Set appIds; + + // First moment the node is observed in DECOMMISSIONED state. + private long decommissionedTime; + + // Timeout in millis for this decommissioning node. + // This value could be dynamically updated with new value from RMNode. + private long timeoutMs; + + private long lastUpdateTime; + + public DecommissioningNodeContext(NodeId nodeId) { + this.nodeId = nodeId; + this.appIds = new HashSet(); + this.decommissioningStartTime = mclock.getTime(); + this.timeoutMs = defaultTimeoutMs; + } + + void updateTimeout(Integer timeoutSec) { + this.timeoutMs = (timeoutSec == null)? + defaultTimeoutMs : (1000L * timeoutSec); + } + } + + // All DECOMMISSIONING nodes to track. + private HashMap decomNodes = + new HashMap(); + + private Timer pollTimer; + private MonotonicClock mclock; + + public DecommissioningNodesWatcher(RMContext rmContext) { + this.rmContext = rmContext; + pollTimer = new Timer(true); + mclock = new MonotonicClock(); + } + + public void init(Configuration conf) { + readDecommissioningTimeout(conf); + int v = conf.getInt( + YarnConfiguration.RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL, + YarnConfiguration + .DEFAULT_RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL); + pollTimer.schedule(new PollTimerTask(rmContext), 0, (1000L * v)); + } + + /** + * Update rmNode decommissioning status based on NodeStatus. + * @param rmNode The node + * @param remoteNodeStatus latest NodeStatus + */ + public synchronized void update(RMNode rmNode, NodeStatus remoteNodeStatus) { + DecommissioningNodeContext context = decomNodes.get(rmNode.getNodeID()); + long now = mclock.getTime(); + if (rmNode.getState() == NodeState.DECOMMISSIONED) { + if (context == null) { + return; + } + context.nodeState = rmNode.getState(); + // keep DECOMMISSIONED node for a while for status log, so that such + // host will appear as DECOMMISSIONED instead of quietly disappears. + if (context.decommissionedTime == 0) { + context.decommissionedTime = now; + } else if (now - context.decommissionedTime > 60000L) { + decomNodes.remove(rmNode.getNodeID()); + } + } else if (rmNode.getState() == NodeState.DECOMMISSIONING) { + if (context == null) { + context = new DecommissioningNodeContext(rmNode.getNodeID()); + decomNodes.put(rmNode.getNodeID(), context); + context.nodeState = rmNode.getState(); + context.decommissionedTime = 0; + } + context.updateTimeout(rmNode.getDecommissioningTimeout()); + context.lastUpdateTime = now; + + if (remoteNodeStatus.getKeepAliveApplications() != null) { + context.appIds.addAll(remoteNodeStatus.getKeepAliveApplications()); + } + + // Count number of active containers. + int numActiveContainers = 0; + for (ContainerStatus cs : remoteNodeStatus.getContainersStatuses()) { + ContainerState newState = cs.getState(); + if (newState == ContainerState.RUNNING || + newState == ContainerState.NEW) { + numActiveContainers++; + } + context.numActiveContainers = numActiveContainers; + ApplicationId aid = cs.getContainerId() + .getApplicationAttemptId().getApplicationId(); + if (!context.appIds.contains(aid)) { + context.appIds.add(aid); + } + } + + context.numActiveContainers = numActiveContainers; + + // maintain lastContainerFinishTime. 
+ if (context.numActiveContainers == 0 && + context.lastContainerFinishTime == 0) { + context.lastContainerFinishTime = now; + } + } else { + // remove node in other states + if (context != null) { + decomNodes.remove(rmNode.getNodeID()); + } + } + } + + public synchronized void remove(NodeId nodeId) { + DecommissioningNodeContext context = decomNodes.get(nodeId); + if (context != null) { + LOG.info("remove " + nodeId + " in " + context.nodeState); + decomNodes.remove(nodeId); + } + } + + /** + * Status about a specific decommissioning node. + * + */ + public enum DecommissioningNodeStatus { + // Node is not in DECOMMISSIONING state. + NONE, + + // wait for running containers to complete + WAIT_CONTAINER, + + // wait for running application to complete (after all containers complete); + WAIT_APP, + + // Timeout waiting for either containers or applications to complete. + TIMEOUT, + + // nothing to wait, ready to be decommissioned + READY, + + // The node has already been decommissioned + DECOMMISSIONED, + } + + public boolean checkReadyToBeDecommissioned(NodeId nodeId) { + DecommissioningNodeStatus s = checkDecommissioningStatus(nodeId); + return (s == DecommissioningNodeStatus.READY || + s == DecommissioningNodeStatus.TIMEOUT); + } + + public DecommissioningNodeStatus checkDecommissioningStatus(NodeId nodeId) { + DecommissioningNodeContext context = decomNodes.get(nodeId); + if (context == null) { + return DecommissioningNodeStatus.NONE; + } + + if (context.nodeState == NodeState.DECOMMISSIONED) { + return DecommissioningNodeStatus.DECOMMISSIONED; + } + + long waitTime = mclock.getTime() - context.decommissioningStartTime; + if (context.numActiveContainers > 0) { + return (context.timeoutMs < 0 || waitTime < context.timeoutMs)? + DecommissioningNodeStatus.WAIT_CONTAINER : + DecommissioningNodeStatus.TIMEOUT; + } + + removeCompletedApps(context); + if (context.appIds.size() == 0) { + return DecommissioningNodeStatus.READY; + } else { + return (context.timeoutMs < 0 || waitTime < context.timeoutMs)? + DecommissioningNodeStatus.WAIT_APP : + DecommissioningNodeStatus.TIMEOUT; + } + } + + /** + * PollTimerTask periodically: + * 1. log status of all DECOMMISSIONING nodes; + * 2. identify and taken care of stale DECOMMISSIONING nodes + * (for example, node already terminated). + */ + class PollTimerTask extends TimerTask { + private final RMContext rmContext; + + public PollTimerTask(RMContext rmContext) { + this.rmContext = rmContext; + } + + public void run() { + logDecommissioningNodesStatus(); + long now = mclock.getTime(); + Set staleNodes = new HashSet(); + + for (Iterator> it = + decomNodes.entrySet().iterator(); it.hasNext();) { + Map.Entry e = it.next(); + DecommissioningNodeContext d = e.getValue(); + // Skip node recently updated (NM usually updates every second). + if (now - d.lastUpdateTime < 5000L) { + continue; + } + // Remove stale non-DECOMMISSIONING node + if (d.nodeState != NodeState.DECOMMISSIONING) { + LOG.debug("remove " + d.nodeState + " " + d.nodeId); + it.remove(); + continue; + } else if (now - d.lastUpdateTime > 60000L) { + // Node DECOMMISSIONED could become stale, remove as necessary. 
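To make the status enum above concrete: a DECOMMISSIONING node with running containers reports WAIT_CONTAINER, then WAIT_APP once its containers finish but a tracked application is still alive, and finally READY, or TIMEOUT once the per-node timeout expires. The consuming side looks roughly like the sketch below (it mirrors the ResourceTrackerService change later in this patch; the method name and parameters are illustrative):

```java
// watcher, rmNode and nodeStatus are supplied by the heartbeat handler.
void onHeartbeat(DecommissioningNodesWatcher watcher, RMNode rmNode,
    NodeStatus nodeStatus) {
  watcher.update(rmNode, nodeStatus);
  if (rmNode.getState() == NodeState.DECOMMISSIONING
      && watcher.checkReadyToBeDecommissioned(rmNode.getNodeID())) {
    // Containers and tracked applications are done, or the timeout expired:
    // the RM moves the node to DECOMMISSIONED and tells the NM to shut down.
  }
}
```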
+ RMNode rmNode = getRmNode(d.nodeId); + if (rmNode != null && + rmNode.getState() == NodeState.DECOMMISSIONED) { + LOG.debug("remove " + rmNode.getState() + " " + d.nodeId); + it.remove(); + continue; + } + } + if (d.timeoutMs >= 0 && + d.decommissioningStartTime + d.timeoutMs < now) { + staleNodes.add(d.nodeId); + LOG.debug("Identified stale and timeout node " + d.nodeId); + } + } + + for (NodeId nodeId : staleNodes) { + RMNode rmNode = this.rmContext.getRMNodes().get(nodeId); + if (rmNode == null || rmNode.getState() != NodeState.DECOMMISSIONING) { + remove(nodeId); + continue; + } + if (rmNode.getState() == NodeState.DECOMMISSIONING && + checkReadyToBeDecommissioned(rmNode.getNodeID())) { + LOG.info("DECOMMISSIONING " + nodeId + " timeout"); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); + } + } + } + } + + private RMNode getRmNode(NodeId nodeId) { + RMNode rmNode = this.rmContext.getRMNodes().get(nodeId); + if (rmNode == null) { + rmNode = this.rmContext.getInactiveRMNodes().get(nodeId); + } + return rmNode; + } + + private void removeCompletedApps(DecommissioningNodeContext context) { + Iterator it = context.appIds.iterator(); + while (it.hasNext()) { + ApplicationId appId = it.next(); + RMApp rmApp = rmContext.getRMApps().get(appId); + if (rmApp == null) { + LOG.debug("Consider non-existing app " + appId + " as completed"); + it.remove(); + continue; + } + if (rmApp.getState() == RMAppState.FINISHED || + rmApp.getState() == RMAppState.FAILED || + rmApp.getState() == RMAppState.KILLED) { + LOG.debug("Remove " + rmApp.getState() + " app " + appId); + it.remove(); + } + } + } + + // Time in second to be decommissioned. + private int getTimeoutInSec(DecommissioningNodeContext context) { + if (context.nodeState == NodeState.DECOMMISSIONED) { + return 0; + } else if (context.nodeState != NodeState.DECOMMISSIONING) { + return -1; + } + if (context.appIds.size() == 0 && context.numActiveContainers == 0) { + return 0; + } + // negative timeout value means no timeout (infinite timeout). + if (context.timeoutMs < 0) { + return -1; + } + + long now = mclock.getTime(); + long timeout = context.decommissioningStartTime + context.timeoutMs - now; + return Math.max(0, (int)(timeout / 1000)); + } + + private void logDecommissioningNodesStatus() { + if (!LOG.isDebugEnabled() || decomNodes.size() == 0) { + return; + } + StringBuilder sb = new StringBuilder(); + long now = mclock.getTime(); + for (DecommissioningNodeContext d : decomNodes.values()) { + DecommissioningNodeStatus s = checkDecommissioningStatus(d.nodeId); + sb.append(String.format( + "%n %-34s %4ds fresh:%3ds containers:%2d %14s", + d.nodeId.getHost(), + (now - d.decommissioningStartTime) / 1000, + (now - d.lastUpdateTime) / 1000, + d.numActiveContainers, + s)); + if (s == DecommissioningNodeStatus.WAIT_APP || + s == DecommissioningNodeStatus.WAIT_CONTAINER) { + sb.append(String.format(" timeout:%4ds", getTimeoutInSec(d))); + } + for (ApplicationId aid : d.appIds) { + sb.append("\n " + aid); + RMApp rmApp = rmContext.getRMApps().get(aid); + if (rmApp != null) { + sb.append(String.format( + " %s %9s %5.2f%% %5ds", + rmApp.getState(), + (rmApp.getApplicationType() == null)? + "" : rmApp.getApplicationType(), + 100.0 * rmApp.getProgress(), + (mclock.getTime() - rmApp.getStartTime()) / 1000)); + } + } + } + LOG.info("Decommissioning Nodes: " + sb.toString()); + } + + // Read possible new DECOMMISSIONING_TIMEOUT_KEY from yarn-site.xml. 
+ // This enables DecommissioningNodesWatcher to pick up new value + // without ResourceManager restart. + private void readDecommissioningTimeout(Configuration conf) { + try { + if (conf == null) { + conf = new YarnConfiguration(); + } + int v = conf.getInt( + YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, + YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT); + if (defaultTimeoutMs != 1000L * v) { + defaultTimeoutMs = 1000L * v; + LOG.info("Use new decommissioningTimeoutMs: " + defaultTimeoutMs); + } + } catch (Exception e) { + LOG.info("Error readDecommissioningTimeout ", e); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index 79373832e8c..99413bc8c68 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -19,14 +19,18 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; -import java.util.Map; -import java.util.Iterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,6 +41,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -47,14 +52,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecommissioningEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; - -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; +import com.google.common.annotations.VisibleForTesting; + @SuppressWarnings("unchecked") public class NodesListManager extends CompositeService implements EventHandler { @@ -178,10 +184,11 @@ public class NodesListManager extends CompositeService implements if (!LOG.isDebugEnabled()) { return; } - - LOG.debug("hostsReader: in=" + conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + + LOG.debug("hostsReader: in=" + + 
conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH) + " out=" + - conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, + conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH)); Set hostsList = new HashSet(); @@ -196,23 +203,19 @@ public class NodesListManager extends CompositeService implements } } - public void refreshNodes(Configuration yarnConf) throws IOException, - YarnException { - refreshHostsReader(yarnConf); - - for (NodeId nodeId: rmContext.getRMNodes().keySet()) { - if (!isValidNode(nodeId.getHost())) { - RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ? - RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION; - this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeEvent(nodeId, nodeEventType)); - } - } - updateInactiveNodes(); + public void refreshNodes(Configuration yarnConf) + throws IOException, YarnException { + refreshNodes(yarnConf, false); } - private void refreshHostsReader(Configuration yarnConf) throws IOException, - YarnException { + public void refreshNodes(Configuration yarnConf, boolean graceful) + throws IOException, YarnException { + refreshHostsReader(yarnConf, graceful, null); + } + + private void refreshHostsReader( + Configuration yarnConf, boolean graceful, Integer timeout) + throws IOException, YarnException { if (null == yarnConf) { yarnConf = new YarnConfiguration(); } @@ -222,8 +225,16 @@ public class NodesListManager extends CompositeService implements excludesFile = yarnConf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH); + LOG.info("refreshNodes excludesFile " + excludesFile); hostsReader.refresh(includesFile, excludesFile); printConfiguredHosts(); + + LOG.info("hostsReader include:{" + + StringUtils.join(",", hostsReader.getHosts()) + + "} exclude:{" + + StringUtils.join(",", hostsReader.getExcludedHosts()) + "}"); + + handleExcludeNodeList(graceful, timeout); } private void setDecomissionedNMs() { @@ -237,6 +248,86 @@ public class NodesListManager extends CompositeService implements } } + // Handle excluded nodes based on following rules: + // Recommission DECOMMISSIONED or DECOMMISSIONING nodes no longer excluded; + // Gracefully decommission excluded nodes that are not already + // DECOMMISSIONED nor DECOMMISSIONING; Take no action for excluded nodes + // that are already DECOMMISSIONED or DECOMMISSIONING. + private void handleExcludeNodeList(boolean graceful, Integer timeout) { + // DECOMMISSIONED/DECOMMISSIONING nodes need to be re-commissioned. + List nodesToRecom = new ArrayList(); + + // Nodes need to be decommissioned (graceful or forceful); + List nodesToDecom = new ArrayList(); + + Set includes = new HashSet(); + Map excludes = new HashMap(); + hostsReader.getHostDetails(includes, excludes); + + for (RMNode n : this.rmContext.getRMNodes().values()) { + NodeState s = n.getState(); + // An invalid node (either due to explicit exclude or not include) + // should be excluded. + boolean isExcluded = !isValidNode( + n.getHostName(), includes, excludes.keySet()); + String nodeStr = "node " + n.getNodeID() + " with state " + s; + if (!isExcluded) { + // Note that no action is needed for DECOMMISSIONED node. + if (s == NodeState.DECOMMISSIONING) { + LOG.info("Recommission " + nodeStr); + nodesToRecom.add(n); + } + // Otherwise no-action needed. + } else { + // exclude is true. 
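Putting the pieces together, the timeout applied to an excluded node resolves in this order: the per-host timeout from the exclude file, then the timeout carried by the refresh request, then (when that is null) the yarn-site default read by DecommissioningNodesWatcher. A compact sketch of that precedence; the helper name is hypothetical:

```java
// Mirrors the precedence applied in this class and in
// DecommissioningNodesWatcher.updateTimeout().
static Integer effectiveTimeoutSecs(Integer perHostTimeout,
    Integer requestTimeout, int yarnSiteDefaultSecs) {
  if (perHostTimeout != null) {
    return perHostTimeout;      // <timeout> from the exclude XML
  }
  if (requestTimeout != null) {
    return requestTimeout;      // -refreshNodes -g <timeout> / RefreshNodesRequest
  }
  return yarnSiteDefaultSecs;   // graceful-decommission-timeout-secs default
}
```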
+ if (graceful) { + // Use per node timeout if exist otherwise the request timeout. + Integer timeoutToUse = (excludes.get(n.getHostName()) != null)? + excludes.get(n.getHostName()) : timeout; + if (s != NodeState.DECOMMISSIONED && + s != NodeState.DECOMMISSIONING) { + LOG.info("Gracefully decommission " + nodeStr); + nodesToDecom.add(n); + } else if (s == NodeState.DECOMMISSIONING && + !Objects.equals(n.getDecommissioningTimeout(), + timeoutToUse)) { + LOG.info("Update " + nodeStr + " timeout to be " + timeoutToUse); + nodesToDecom.add(n); + } else { + LOG.info("No action for " + nodeStr); + } + } else { + if (s != NodeState.DECOMMISSIONED) { + LOG.info("Forcefully decommission " + nodeStr); + nodesToDecom.add(n); + } + } + } + } + + for (RMNode n : nodesToRecom) { + RMNodeEvent e = new RMNodeEvent( + n.getNodeID(), RMNodeEventType.RECOMMISSION); + this.rmContext.getDispatcher().getEventHandler().handle(e); + } + + for (RMNode n : nodesToDecom) { + RMNodeEvent e; + if (graceful) { + Integer timeoutToUse = (excludes.get(n.getHostName()) != null)? + excludes.get(n.getHostName()) : timeout; + e = new RMNodeDecommissioningEvent(n.getNodeID(), timeoutToUse); + } else { + RMNodeEventType eventType = isUntrackedNode(n.getHostName())? + RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION; + e = new RMNodeEvent(n.getNodeID(), eventType); + } + this.rmContext.getDispatcher().getEventHandler().handle(e); + } + + updateInactiveNodes(); + } + @VisibleForTesting public int getNodeRemovalCheckInterval() { return nodeRemovalCheckInterval; @@ -360,11 +451,15 @@ public class NodesListManager extends CompositeService implements } public boolean isValidNode(String hostName) { - String ip = resolver.resolve(hostName); Set hostsList = new HashSet(); Set excludeList = new HashSet(); hostsReader.getHostDetails(hostsList, excludeList); + return isValidNode(hostName, hostsList, excludeList); + } + private boolean isValidNode( + String hostName, Set hostsList, Set excludeList) { + String ip = resolver.resolve(hostName); return (hostsList.isEmpty() || hostsList.contains(hostName) || hostsList .contains(ip)) && !(excludeList.contains(hostName) || excludeList.contains(ip)); @@ -478,29 +573,14 @@ public class NodesListManager extends CompositeService implements /** * Refresh the nodes gracefully * - * @param conf + * @param yarnConf + * @param timeout decommission timeout, null means default timeout. * @throws IOException * @throws YarnException */ - public void refreshNodesGracefully(Configuration conf) throws IOException, - YarnException { - refreshHostsReader(conf); - for (Entry entry : rmContext.getRMNodes().entrySet()) { - NodeId nodeId = entry.getKey(); - if (!isValidNode(nodeId.getHost())) { - RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ? 
- RMNodeEventType.SHUTDOWN : RMNodeEventType.GRACEFUL_DECOMMISSION; - this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeEvent(nodeId, nodeEventType)); - } else { - // Recommissioning the nodes - if (entry.getValue().getState() == NodeState.DECOMMISSIONING) { - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeEvent(nodeId, RMNodeEventType.RECOMMISSION)); - } - } - } - updateInactiveNodes(); + public void refreshNodesGracefully(Configuration yarnConf, Integer timeout) + throws IOException, YarnException { + refreshHostsReader(yarnConf, true, timeout); } /** @@ -596,4 +676,4 @@ public class NodesListManager extends CompositeService implements this.host = hst; } } -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index 9b9b02e0c3b..5e9827a46d7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -70,7 +70,7 @@ public class RMServerUtils { public static List queryRMNodes(RMContext context, EnumSet acceptedStates) { - // nodes contains nodes that are NEW, RUNNING OR UNHEALTHY + // nodes contains nodes that are NEW, RUNNING, UNHEALTHY or DECOMMISSIONING. ArrayList results = new ArrayList(); if (acceptedStates.contains(NodeState.NEW) || acceptedStates.contains(NodeState.RUNNING) || diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index f7916f82483..ede83263d3e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -111,6 +111,8 @@ public class ResourceTrackerService extends AbstractService implements private int minAllocMb; private int minAllocVcores; + private DecommissioningNodesWatcher decommissioningWatcher; + private boolean isDistributedNodeLabelsConf; private boolean isDelegatedCentralizedNodeLabelsConf; private DynamicResourceConfiguration drConf; @@ -129,6 +131,7 @@ public class ResourceTrackerService extends AbstractService implements ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); + this.decommissioningWatcher = new DecommissioningNodesWatcher(rmContext); } @Override @@ -168,6 +171,7 @@ public class ResourceTrackerService extends AbstractService implements } loadDynamicResourceConfiguration(conf); + decommissioningWatcher.init(conf); super.serviceInit(conf); } @@ -492,6 +496,7 @@ public class ResourceTrackerService extends 
AbstractService implements // Send ping this.nmLivelinessMonitor.receivedPing(nodeId); + this.decommissioningWatcher.update(rmNode, remoteNodeStatus); // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse(); @@ -514,6 +519,20 @@ public class ResourceTrackerService extends AbstractService implements message); } + // Evaluate whether a DECOMMISSIONING node is ready to be DECOMMISSIONED. + if (rmNode.getState() == NodeState.DECOMMISSIONING && + decommissioningWatcher.checkReadyToBeDecommissioned( + rmNode.getNodeID())) { + String message = "DECOMMISSIONING " + nodeId + + " is ready to be decommissioned"; + LOG.info(message); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); + this.nmLivelinessMonitor.unregister(nodeId); + return YarnServerBuilderUtils.newNodeHeartbeatResponse( + NodeAction.SHUTDOWN, message); + } + // Heartbeat response NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils .newNodeHeartbeatResponse(lastNodeHeartbeatResponse. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index b71c3a943cc..0c46c2aca39 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -175,5 +175,12 @@ public interface RMNode { long getUntrackedTimeStamp(); void setUntrackedTimeStamp(long timeStamp); + + /** + * Optional decommissioning timeout in second + * (null indicates default timeout). + * @return the decommissioning timeout in second. + */ + Integer getDecommissioningTimeout(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecommissioningEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecommissioningEvent.java new file mode 100644 index 00000000000..9955e9ea4fd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecommissioningEvent.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmnode; + +import org.apache.hadoop.yarn.api.records.NodeId; + +/** + * RMNode Decommissioning Event. + * + */ +public class RMNodeDecommissioningEvent extends RMNodeEvent { + // Optional decommissioning timeout in second. + private final Integer decommissioningTimeout; + + // Create instance with optional timeout + // (timeout could be null which means use default). + public RMNodeDecommissioningEvent(NodeId nodeId, Integer timeout) { + super(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION); + this.decommissioningTimeout = timeout; + } + + public Integer getDecommissioningTimeout() { + return this.decommissioningTimeout; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 1af0b97a6d3..9eb8d166188 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -27,6 +27,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentLinkedQueue; @@ -124,6 +125,7 @@ public class RMNodeImpl implements RMNode, EventHandler { private String healthReport; private long lastHealthReportTime; private String nodeManagerVersion; + private Integer decommissioningTimeout; private long timeStamp; /* Aggregated resource utilization for the containers. 
*/ @@ -179,7 +181,6 @@ public class RMNodeImpl implements RMNode, EventHandler { NodeState, RMNodeEventType, RMNodeEvent>(NodeState.NEW) - //Transitions from NEW state .addTransition(NodeState.NEW, NodeState.RUNNING, RMNodeEventType.STARTED, new AddNodeTransition()) @@ -265,6 +266,9 @@ public class RMNodeImpl implements RMNode, EventHandler { .addTransition(NodeState.DECOMMISSIONING, NodeState.REBOOTED, RMNodeEventType.REBOOTING, new DeactivateNodeTransition(NodeState.REBOOTED)) + .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING, + RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, + new AddContainersToBeRemovedFromNMTransition()) .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING, RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) @@ -633,7 +637,7 @@ public class RMNodeImpl implements RMNode, EventHandler { } catch (InvalidStateTransitionException e) { LOG.error("Can't handle this event at current state", e); LOG.error("Invalid event " + event.getType() + - " on Node " + this.nodeId); + " on Node " + this.nodeId + " oldState " + oldState); } if (oldState != getState()) { LOG.info(nodeId + " Node Transitioned from " + oldState + " to " @@ -666,6 +670,9 @@ public class RMNodeImpl implements RMNode, EventHandler { case SHUTDOWN: metrics.decrNumShutdownNMs(); break; + case DECOMMISSIONING: + metrics.decrDecommissioningNMs(); + break; default: LOG.debug("Unexpected previous node state"); } @@ -712,6 +719,9 @@ public class RMNodeImpl implements RMNode, EventHandler { case DECOMMISSIONING: metrics.decrDecommissioningNMs(); break; + case DECOMMISSIONED: + metrics.decrDecommisionedNMs(); + break; case UNHEALTHY: metrics.decrNumUnhealthyNMs(); break; @@ -1087,9 +1097,26 @@ public class RMNodeImpl implements RMNode, EventHandler { @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + Integer timeout = null; + if (RMNodeDecommissioningEvent.class.isInstance(event)) { + RMNodeDecommissioningEvent e = ((RMNodeDecommissioningEvent) event); + timeout = e.getDecommissioningTimeout(); + } + // Pick up possible updates on decommissioningTimeout. + if (rmNode.getState() == NodeState.DECOMMISSIONING) { + if (!Objects.equals(rmNode.getDecommissioningTimeout(), timeout)) { + LOG.info("Update " + rmNode.getNodeID() + + " DecommissioningTimeout to be " + timeout); + rmNode.decommissioningTimeout = timeout; + } else { + LOG.info(rmNode.getNodeID() + " is already DECOMMISSIONING"); + } + return; + } LOG.info("Put Node " + rmNode.nodeId + " in DECOMMISSIONING."); // Update NM metrics during graceful decommissioning. rmNode.updateMetricsForGracefulDecommission(initState, finalState); + rmNode.decommissioningTimeout = timeout; if (rmNode.originalTotalCapability == null){ rmNode.originalTotalCapability = Resources.clone(rmNode.totalCapability); @@ -1156,24 +1183,6 @@ public class RMNodeImpl implements RMNode, EventHandler { return NodeState.UNHEALTHY; } } - if (isNodeDecommissioning) { - List runningApps = rmNode.getRunningApps(); - - List keepAliveApps = statusEvent.getKeepAliveAppIds(); - - // no running (and keeping alive) app on this node, get it - // decommissioned. - // TODO may need to check no container is being scheduled on this node - // as well. 
- if ((runningApps == null || runningApps.size() == 0) - && (keepAliveApps == null || keepAliveApps.size() == 0)) { - RMNodeImpl.deactivateNode(rmNode, NodeState.DECOMMISSIONED); - return NodeState.DECOMMISSIONED; - } - - // TODO (in YARN-3223) if node in decommissioning, get node resource - // updated if container get finished (keep available resource to be 0) - } rmNode.handleContainerStatus(statusEvent.getContainers()); rmNode.handleReportedIncreasedContainers( @@ -1473,4 +1482,9 @@ public class RMNodeImpl implements RMNode, EventHandler { this.writeLock.unlock(); } } + + @Override + public Integer getDecommissioningTimeout() { + return decommissioningTimeout; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java index 3012d0d0b75..1789e0963df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java @@ -97,7 +97,7 @@ public class ClusterMetricsInfo { this.rebootedNodes = clusterMetrics.getNumRebootedNMs(); this.shutdownNodes = clusterMetrics.getNumShutdownNMs(); this.totalNodes = activeNodes + lostNodes + decommissionedNodes - + rebootedNodes + unhealthyNodes + shutdownNodes; + + rebootedNodes + unhealthyNodes + decommissioningNodes + shutdownNodes; } public int getAppsSubmitted() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index e64d6bbc9e6..eb010ab2195 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -274,6 +274,11 @@ public class MockNodes { public QueuedContainersStatus getQueuedContainersStatus() { return null; } + + @Override + public Integer getDecommissioningTimeout() { + return null; + } }; private static RMNode buildRMNode(int rack, final Resource perNode, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 5856e595f6f..f84326139e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -709,6 +709,9 
@@ public class MockRM extends ResourceManager { public void waitForState(NodeId nodeId, NodeState finalState) throws InterruptedException { RMNode node = getRMContext().getRMNodes().get(nodeId); + if (node == null) { + node = getRMContext().getInactiveRMNodes().get(nodeId); + } Assert.assertNotNull("node shouldn't be null", node); int timeWaiting = 0; while (!finalState.equals(node.getState())) { @@ -722,11 +725,17 @@ public class MockRM extends ResourceManager { timeWaiting += WAIT_MS_PER_LOOP; } - System.out.println("Node State is : " + node.getState()); + System.out.println("Node " + nodeId + " State is : " + node.getState()); Assert.assertEquals("Node state is not correct (timedout)", finalState, node.getState()); } + public void sendNodeEvent(MockNM nm, RMNodeEventType event) throws Exception { + RMNodeImpl node = (RMNodeImpl) + getRMContext().getRMNodes().get(nm.getNodeId()); + node.handle(new RMNodeEvent(nm.getNodeId(), event)); + } + public KillApplicationResponse killApp(ApplicationId appId) throws Exception { ApplicationClientProtocol client = getClientRMService(); KillApplicationRequest req = KillApplicationRequest.newInstance(appId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java new file mode 100644 index 00000000000..690de308e23 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher.DecommissioningNodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +/** + * This class tests DecommissioningNodesWatcher. + */ +public class TestDecommissioningNodesWatcher { + private MockRM rm; + + @Test + public void testDecommissioningNodesWatcher() throws Exception { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, "40"); + + rm = new MockRM(conf); + rm.start(); + + DecommissioningNodesWatcher watcher = + new DecommissioningNodesWatcher(rm.getRMContext()); + + MockNM nm1 = rm.registerNode("host1:1234", 10240); + RMNode node1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + NodeId id1 = nm1.getNodeId(); + + rm.waitForState(id1, NodeState.RUNNING); + Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1)); + + RMApp app = rm.submitApp(2000); + MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1); + + // Setup nm1 as DECOMMISSIONING for DecommissioningNodesWatcher. + rm.sendNodeEvent(nm1, RMNodeEventType.GRACEFUL_DECOMMISSION); + rm.waitForState(id1, NodeState.DECOMMISSIONING); + + // Update status with decreasing number of running containers until 0. + watcher.update(node1, createNodeStatus(id1, app, 12)); + watcher.update(node1, createNodeStatus(id1, app, 11)); + Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1)); + + watcher.update(node1, createNodeStatus(id1, app, 1)); + Assert.assertEquals(DecommissioningNodeStatus.WAIT_CONTAINER, + watcher.checkDecommissioningStatus(id1)); + + watcher.update(node1, createNodeStatus(id1, app, 0)); + Assert.assertEquals(DecommissioningNodeStatus.WAIT_APP, + watcher.checkDecommissioningStatus(id1)); + + // Set app to be FINISHED and verified DecommissioningNodeStatus is READY. 
+ MockRM.finishAMAndVerifyAppState(app, rm, nm1, am); + rm.waitForState(app.getApplicationId(), RMAppState.FINISHED); + Assert.assertEquals(DecommissioningNodeStatus.READY, + watcher.checkDecommissioningStatus(id1)); + } + + @After + public void tearDown() { + if (rm != null) { + rm.stop(); + } + } + + private NodeStatus createNodeStatus( + NodeId nodeId, RMApp app, int numRunningContainers) { + return NodeStatus.newInstance( + nodeId, 0, getContainerStatuses(app, numRunningContainers), + new ArrayList(), + NodeHealthStatus.newInstance( + true, "", System.currentTimeMillis() - 1000), + null, null, null); + } + + // Get mocked ContainerStatus for bunch of containers, + // where numRunningContainers are RUNNING. + private List getContainerStatuses( + RMApp app, int numRunningContainers) { + // Total 12 containers + final int total = 12; + numRunningContainers = Math.min(total, numRunningContainers); + List output = new ArrayList(); + for (int i = 0; i < total; i++) { + ContainerState cstate = (i >= numRunningContainers)? + ContainerState.COMPLETE : ContainerState.RUNNING; + output.add(ContainerStatus.newInstance( + ContainerId.newContainerId( + ApplicationAttemptId.newInstance(app.getApplicationId(), i), 1), + cstate, "Dummy", 0)); + } + return output; + } +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 83a7c73cfbf..e82b93cc3a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -254,17 +254,6 @@ public class TestRMNodeTransitions { cm.getNumDecommissioningNMs()); Assert.assertEquals("Decommissioned Nodes", initialDecommissioned, cm.getNumDecommisionedNMs()); - - // Verify node in DECOMMISSIONING will be changed by status update - // without running apps - statusEvent = getMockRMNodeStatusEventWithoutRunningApps(); - node.handle(statusEvent); - Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState()); - Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs()); - Assert.assertEquals("Decommissioning Nodes", initialDecommissioning - 1, - cm.getNumDecommissioningNMs()); - Assert.assertEquals("Decommissioned Nodes", initialDecommissioned + 1, - cm.getNumDecommisionedNMs()); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index eb0d5f13399..22553b186cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ 
-66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; @@ -82,6 +83,9 @@ public class TestResourceTrackerService extends NodeLabelTestBase { private final static File TEMP_DIR = new File(System.getProperty( "test.build.data", "/tmp"), "decommision"); private final File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt"); + private final File excludeHostFile = new File(TEMP_DIR + File.separator + + "excludeHostFile.txt"); + private MockRM rm; /** @@ -216,6 +220,109 @@ public class TestResourceTrackerService extends NodeLabelTestBase { checkDecommissionedNMCount(rm, metricCount + 1); } + /** + * Graceful decommission node with no running application. + */ + @Test + public void testGracefulDecommissionNoApp() throws Exception { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile + .getAbsolutePath()); + + writeToHostsFile(""); + rm = new MockRM(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 5120); + MockNM nm2 = rm.registerNode("host2:5678", 10240); + MockNM nm3 = rm.registerNode("host3:4433", 5120); + + int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs(); + NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true); + NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true); + NodeHeartbeatResponse nodeHeartbeat3 = nm3.nodeHeartbeat(true); + + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction())); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat2.getNodeAction())); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat3.getNodeAction())); + + rm.waitForState(nm2.getNodeId(), NodeState.RUNNING); + rm.waitForState(nm3.getNodeId(), NodeState.RUNNING); + + // Graceful decommission both host2 and host3. + writeToHostsFile("host2", "host3"); + rm.getNodesListManager().refreshNodes(conf, true); + + rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONING); + rm.waitForState(nm3.getNodeId(), NodeState.DECOMMISSIONING); + + nodeHeartbeat1 = nm1.nodeHeartbeat(true); + nodeHeartbeat2 = nm2.nodeHeartbeat(true); + nodeHeartbeat3 = nm3.nodeHeartbeat(true); + + checkDecommissionedNMCount(rm, metricCount + 2); + rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONED); + rm.waitForState(nm3.getNodeId(), NodeState.DECOMMISSIONED); + + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction())); + Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat2.getNodeAction()); + Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat3.getNodeAction()); + } + + /** + * Graceful decommission node with running application. 
+ */ + @Test + public void testGracefulDecommissionWithApp() throws Exception { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile + .getAbsolutePath()); + + writeToHostsFile(""); + rm = new MockRM(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 10240); + MockNM nm2 = rm.registerNode("host2:5678", 20480); + MockNM nm3 = rm.registerNode("host3:4433", 10240); + NodeId id1 = nm1.getNodeId(); + NodeId id3 = nm3.getNodeId(); + rm.waitForState(id1, NodeState.RUNNING); + rm.waitForState(id3, NodeState.RUNNING); + + // Create an app and launch two containers on host1. + RMApp app = rm.submitApp(2000); + MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1); + ApplicationAttemptId aaid = app.getCurrentAppAttempt().getAppAttemptId(); + + // Graceful decommission host1 and host3 + writeToHostsFile("host1", "host3"); + rm.getNodesListManager().refreshNodes(conf, true); + rm.waitForState(id1, NodeState.DECOMMISSIONING); + rm.waitForState(id3, NodeState.DECOMMISSIONING); + + // host1 should be DECOMMISSIONING due to running containers. + // host3 should become DECOMMISSIONED. + nm1.nodeHeartbeat(aaid, 2, ContainerState.RUNNING); + nm3.nodeHeartbeat(true); + rm.waitForState(id1, NodeState.DECOMMISSIONING); + rm.waitForState(id3, NodeState.DECOMMISSIONED); + nm1.nodeHeartbeat(aaid, 2, ContainerState.RUNNING); + + // Complete containers on host1. + // Since the app is still RUNNING, expect NodeAction.NORMAL. + NodeHeartbeatResponse nodeHeartbeat1 = + nm1.nodeHeartbeat(aaid, 2, ContainerState.COMPLETE); + Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat1.getNodeAction()); + + // Finish the app and verified DECOMMISSIONED. + MockRM.finishAMAndVerifyAppState(app, rm, nm1, am); + rm.waitForState(app.getApplicationId(), RMAppState.FINISHED); + nodeHeartbeat1 = nm1.nodeHeartbeat(aaid, 2, ContainerState.COMPLETE); + Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat1.getNodeAction()); + rm.waitForState(id1, NodeState.DECOMMISSIONED); + } + /** * Decommissioning using a post-configured include hosts file */ @@ -1139,19 +1246,17 @@ public class TestResourceTrackerService extends NodeLabelTestBase { MockNM nm1 = new MockNM("host1:1234", 5120, resourceTrackerService); RegisterNodeManagerResponse response = nm1.registerNode(); Assert.assertEquals(NodeAction.NORMAL, response.getNodeAction()); + int shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs(); writeToHostsFile("host2"); conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile.getAbsolutePath()); rm.getNodesListManager().refreshNodes(conf); NodeHeartbeatResponse heartbeatResponse = nm1.nodeHeartbeat(true); Assert.assertEquals(NodeAction.SHUTDOWN, heartbeatResponse.getNodeAction()); - int shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs(); - checkShutdownNMCount(rm, shutdownNMsCount); checkDecommissionedNMCount(rm, decommisionedNMsCount); request.setNodeId(nm1.getNodeId()); resourceTrackerService.unRegisterNodeManager(request); - shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs(); - checkShutdownNMCount(rm, shutdownNMsCount); + checkShutdownNMCount(rm, ++shutdownNMsCount); checkDecommissionedNMCount(rm, decommisionedNMsCount); // 1. 
Register the Node Manager @@ -1187,8 +1292,6 @@ public class TestResourceTrackerService extends NodeLabelTestBase { nm1.nodeHeartbeat(true); nm2.nodeHeartbeat(true); - File excludeHostFile = - new File(TEMP_DIR + File.separator + "excludeHostFile.txt"); writeToHostsFile(excludeHostFile, "host1"); conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostFile.getAbsolutePath()); @@ -1214,8 +1317,6 @@ public class TestResourceTrackerService extends NodeLabelTestBase { Assert.assertEquals("The inactiveRMNodes should contain an entry for the" + "decommissioned node", 1, rm1.getRMContext().getInactiveRMNodes().size()); - excludeHostFile = - new File(TEMP_DIR + File.separator + "excludeHostFile.txt"); writeToHostsFile(excludeHostFile, ""); conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostFile.getAbsolutePath()); @@ -1245,8 +1346,6 @@ public class TestResourceTrackerService extends NodeLabelTestBase { nm1.nodeHeartbeat(true); nm2.nodeHeartbeat(true); //host3 will not register or heartbeat - File excludeHostFile = - new File(TEMP_DIR + File.separator + "excludeHostFile.txt"); writeToHostsFile(excludeHostFile, "host3", "host2"); conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostFile.getAbsolutePath()); @@ -1278,14 +1377,12 @@ public class TestResourceTrackerService extends NodeLabelTestBase { MockNM nm2 = rm.registerNode("host2:5678", 10240); nm1.nodeHeartbeat(true); nm2.nodeHeartbeat(true); - File excludeHostFile = - new File(TEMP_DIR + File.separator + "excludeHostFile.txt"); writeToHostsFile(excludeHostFile, "host3", "host2"); conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostFile.getAbsolutePath()); writeToHostsFile(hostFile, "host1", "host2"); writeToHostsFile(excludeHostFile, "host1"); - rm.getNodesListManager().refreshNodesGracefully(conf); + rm.getNodesListManager().refreshNodesGracefully(conf, null); rm.drainEvents(); nm1.nodeHeartbeat(true); rm.drainEvents(); @@ -1294,7 +1391,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase { .getInactiveRMNodes().get(nm1.getNodeId()).getState() == NodeState .DECOMMISSIONED); writeToHostsFile(excludeHostFile, ""); - rm.getNodesListManager().refreshNodesGracefully(conf); + rm.getNodesListManager().refreshNodesGracefully(conf, null); rm.drainEvents(); Assert.assertTrue("Node " + nm1.getNodeId().getHost() + " should be Decommissioned", rm.getRMContext() @@ -1304,7 +1401,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase { } /** - * Remove a node from all lists and check if its forgotten + * Remove a node from all lists and check if its forgotten. 
*/ @Test public void testNodeRemovalNormally() throws Exception { @@ -1325,7 +1422,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase { public void refreshNodesOption(boolean doGraceful, Configuration conf) throws Exception { if (doGraceful) { - rm.getNodesListManager().refreshNodesGracefully(conf); + rm.getNodesListManager().refreshNodesGracefully(conf, null); } else { rm.getNodesListManager().refreshNodes(conf); } @@ -1334,8 +1431,6 @@ public class TestResourceTrackerService extends NodeLabelTestBase { public void testNodeRemovalUtil(boolean doGraceful) throws Exception { Configuration conf = new Configuration(); int timeoutValue = 500; - File excludeHostFile = new File(TEMP_DIR + File.separator + - "excludeHostFile.txt"); conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, ""); conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, ""); conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, @@ -1369,18 +1464,23 @@ public class TestResourceTrackerService extends NodeLabelTestBase { conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile .getAbsolutePath()); refreshNodesOption(doGraceful, conf); + if (doGraceful) { + rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONING); + } nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); rm.drainEvents(); Assert.assertTrue("Node should not be in active node list", !rmContext.getRMNodes().containsKey(nm2.getNodeId())); RMNode rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId()); Assert.assertEquals("Node should be in inactive node list", - rmNode.getState(), NodeState.SHUTDOWN); + rmNode.getState(), + doGraceful? NodeState.DECOMMISSIONED : NodeState.SHUTDOWN); Assert.assertEquals("Active nodes should be 2", metrics.getNumActiveNMs(), 2); - Assert.assertEquals("Shutdown nodes should be 1", - metrics.getNumShutdownNMs(), 1); + Assert.assertEquals("Shutdown nodes should be expected", + metrics.getNumShutdownNMs(), doGraceful? 0 : 1); int nodeRemovalTimeout = conf.getInt( @@ -1405,14 +1505,18 @@ public class TestResourceTrackerService extends NodeLabelTestBase { rm.drainEvents(); writeToHostsFile("host1", ip); refreshNodesOption(doGraceful, conf); + rm.waitForState(nm2.getNodeId(), + doGraceful? NodeState.DECOMMISSIONING : NodeState.SHUTDOWN); + nm2.nodeHeartbeat(true); rm.drainEvents(); rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId()); Assert.assertEquals("Node should be shutdown", - rmNode.getState(), NodeState.SHUTDOWN); + rmNode.getState(), + doGraceful? NodeState.DECOMMISSIONED : NodeState.SHUTDOWN); Assert.assertEquals("Active nodes should be 2", metrics.getNumActiveNMs(), 2); - Assert.assertEquals("Shutdown nodes should be 1", - metrics.getNumShutdownNMs(), 1); + Assert.assertEquals("Shutdown nodes should be expected", + metrics.getNumShutdownNMs(), doGraceful? 0 : 1); //add back the node before timer expires latch.await(maxThreadSleeptime - 2000, TimeUnit.MILLISECONDS); @@ -1456,6 +1560,20 @@ public class TestResourceTrackerService extends NodeLabelTestBase { } //Test decommed/ing node that transitions to untracked,timer should remove + testNodeRemovalUtilDecomToUntracked(rmContext, conf, nm1, nm2, nm3, + maxThreadSleeptime, doGraceful); + rm.stop(); + } + + // A helper method used by testNodeRemovalUtil to avoid exceeding + // max allowed length. 
+ private void testNodeRemovalUtilDecomToUntracked( + RMContext rmContext, Configuration conf, + MockNM nm1, MockNM nm2, MockNM nm3, + long maxThreadSleeptime, boolean doGraceful) throws Exception { + ClusterMetrics metrics = ClusterMetrics.getMetrics(); + String ip = NetUtils.normalizeHostName("localhost"); + CountDownLatch latch = new CountDownLatch(1); writeToHostsFile("host1", ip, "host2"); writeToHostsFile(excludeHostFile, "host2"); refreshNodesOption(doGraceful, conf); @@ -1463,7 +1581,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase { //nm2.nodeHeartbeat(true); nm3.nodeHeartbeat(true); latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS); - rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) : + RMNode rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) : rmContext.getInactiveRMNodes().get(nm2.getNodeId()); Assert.assertNotEquals("Timer for this node was not canceled!", rmNode, null); @@ -1474,6 +1592,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase { writeToHostsFile("host1", ip); writeToHostsFile(excludeHostFile, ""); refreshNodesOption(doGraceful, conf); + nm2.nodeHeartbeat(true); latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS); rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) : rmContext.getInactiveRMNodes().get(nm2.getNodeId()); @@ -1485,16 +1604,12 @@ public class TestResourceTrackerService extends NodeLabelTestBase { metrics.getNumShutdownNMs(), 0); Assert.assertEquals("Active nodes should be 2", metrics.getNumActiveNMs(), 2); - - rm.stop(); } private void testNodeRemovalUtilLost(boolean doGraceful) throws Exception { Configuration conf = new Configuration(); conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 2000); int timeoutValue = 500; - File excludeHostFile = new File(TEMP_DIR + File.separator + - "excludeHostFile.txt"); conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile.getAbsolutePath()); conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, @@ -1527,7 +1642,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase { Assert.assertEquals("All 3 nodes should be active", metrics.getNumActiveNMs(), 3); int waitCount = 0; - while(waitCount ++<20){ + while(waitCount++ < 20){ synchronized (this) { wait(200); } @@ -1579,8 +1694,6 @@ public class TestResourceTrackerService extends NodeLabelTestBase { throws Exception { Configuration conf = new Configuration(); int timeoutValue = 500; - File excludeHostFile = new File(TEMP_DIR + File.separator + - "excludeHostFile.txt"); conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile.getAbsolutePath()); conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, @@ -1651,8 +1764,6 @@ public class TestResourceTrackerService extends NodeLabelTestBase { throws Exception { Configuration conf = new Configuration(); int timeoutValue = 500; - File excludeHostFile = new File(TEMP_DIR + File.separator + - "excludeHostFile.txt"); conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile.getAbsolutePath()); conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, @@ -1696,15 +1807,19 @@ public class TestResourceTrackerService extends NodeLabelTestBase { nm2.nodeHeartbeat(false); nm3.nodeHeartbeat(true); rm.drainEvents(); - Assert.assertNotEquals("host2 should be a shutdown NM!", - rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null); - Assert.assertEquals("host2 should be a shutdown NM!", - rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(), - NodeState.SHUTDOWN); + if (!doGraceful) { + 
Assert.assertNotEquals("host2 should be a shutdown NM!", + rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null); + Assert.assertEquals("host2 should be a shutdown NM!", + rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(), + NodeState.SHUTDOWN); + } Assert.assertEquals("There should be 2 Active NM!", clusterMetrics.getNumActiveNMs(), 2); - Assert.assertEquals("There should be 1 Shutdown NM!", - clusterMetrics.getNumShutdownNMs(), 1); + if (!doGraceful) { + Assert.assertEquals("There should be 1 Shutdown NM!", + clusterMetrics.getNumShutdownNMs(), 1); + } Assert.assertEquals("There should be 0 Unhealthy NM!", clusterMetrics.getUnhealthyNMs(), 0); int nodeRemovalTimeout = @@ -1732,7 +1847,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase { } private void writeToHostsFile(String... hosts) throws IOException { - writeToHostsFile(hostFile, hosts); + writeToHostsFile(hostFile, hosts); } private void writeToHostsFile(File file, String... hosts) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java index 2c926d915cb..e7c7e51bf2c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java @@ -210,8 +210,6 @@ public class TestNMReconnect extends ParameterizedSchedulerTestBase { nm1.registerNode(); rm.waitForState(nm1.getNodeId(), NodeState.RUNNING); - rm.getRMContext().getNodesListManager().getHostsReader(). - getExcludedHosts().add("127.0.0.1"); rm.getRMContext().getDispatcher().getEventHandler().handle( new RMNodeEvent(nm1.getNodeId(), RMNodeEventType.GRACEFUL_DECOMMISSION)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md index d4c9ddd0beb..ed9f7b8ae1f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md @@ -239,7 +239,7 @@ Usage: | COMMAND\_OPTIONS | Description | |:---- |:---- | | -refreshQueues | Reload the queues' acls, states and scheduler specific properties. ResourceManager will reload the mapred-queues configuration file. | -| -refreshNodes | Refresh the hosts information at the ResourceManager. | +| -refreshNodes [-g|graceful [timeout in seconds] -client|server] | Refresh the hosts information at the ResourceManager. -g option indicates graceful decommission of excluded hosts, in which case, the optional timeout indicates maximal time in seconds ResourceManager should wait before forcefully mark the node as decommissioned. | | -refreshNodesResources | Refresh resources of NodeManagers at the ResourceManager. | | -refreshSuperUserGroupsConfiguration | Refresh superuser proxy groups mappings. | | -refreshUserToGroupsMappings | Refresh user-to-groups mappings. |