YARN-4676. Automatic and Asynchronous Decommissioning Nodes Status Tracking. Contributed by Diniel Zhi.

This commit is contained in:
Junping Du 2016-08-18 07:23:29 -07:00
parent e39d81be80
commit d464483bf7
28 changed files with 1328 additions and 220 deletions

View File

@ -21,16 +21,27 @@ package org.apache.hadoop.util;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import java.util.Map;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
// Keeps track of which datanodes/tasktrackers are allowed to connect to the
// namenode/jobtracker.
@ -38,7 +49,9 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceStability.Unstable
public class HostsFileReader {
private Set<String> includes;
private Set<String> excludes;
// exclude host list with optional timeout.
// If the value is null, it indicates default timeout.
private Map<String, Integer> excludes;
private String includesFile;
private String excludesFile;
private WriteLock writeLock;
@ -49,7 +62,7 @@ public class HostsFileReader {
public HostsFileReader(String inFile,
String exFile) throws IOException {
includes = new HashSet<String>();
excludes = new HashSet<String>();
excludes = new HashMap<String, Integer>();
includesFile = inFile;
excludesFile = exFile;
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
@ -62,7 +75,7 @@ public class HostsFileReader {
public HostsFileReader(String includesFile, InputStream inFileInputStream,
String excludesFile, InputStream exFileInputStream) throws IOException {
includes = new HashSet<String>();
excludes = new HashSet<String>();
excludes = new HashMap<String, Integer>();
this.includesFile = includesFile;
this.excludesFile = excludesFile;
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
@ -121,6 +134,73 @@ public class HostsFileReader {
}
}
public static void readFileToMap(String type,
String filename, Map<String, Integer> map) throws IOException {
File file = new File(filename);
FileInputStream fis = new FileInputStream(file);
readFileToMapWithFileInputStream(type, filename, fis, map);
}
public static void readFileToMapWithFileInputStream(String type,
String filename, InputStream inputStream, Map<String, Integer> map)
throws IOException {
// The input file could be either simple text or XML.
boolean xmlInput = filename.toLowerCase().endsWith(".xml");
if (xmlInput) {
readXmlFileToMapWithFileInputStream(type, filename, inputStream, map);
} else {
HashSet<String> nodes = new HashSet<String>();
readFileToSetWithFileInputStream(type, filename, inputStream, nodes);
for (String node : nodes) {
map.put(node, null);
}
}
}
public static void readXmlFileToMapWithFileInputStream(String type,
String filename, InputStream fileInputStream, Map<String, Integer> map)
throws IOException {
Document dom;
DocumentBuilderFactory builder = DocumentBuilderFactory.newInstance();
try {
DocumentBuilder db = builder.newDocumentBuilder();
dom = db.parse(fileInputStream);
// Examples:
// <host><name>host1</name></host>
// <host><name>host2</name><timeout>123</timeout></host>
// <host><name>host3</name><timeout>-1</timeout></host>
// <host><name>host4, host5,host6</name><timeout>1800</timeout></host>
Element doc = dom.getDocumentElement();
NodeList nodes = doc.getElementsByTagName("host");
for (int i = 0; i < nodes.getLength(); i++) {
Node node = nodes.item(i);
if (node.getNodeType() == Node.ELEMENT_NODE) {
Element e= (Element) node;
// Support both single host and comma-separated list of hosts.
String v = readFirstTagValue(e, "name");
String[] hosts = StringUtils.getTrimmedStrings(v);
String str = readFirstTagValue(e, "timeout");
Integer timeout = (str == null)? null : Integer.parseInt(str);
for (String host : hosts) {
map.put(host, timeout);
LOG.info("Adding a node \"" + host + "\" to the list of "
+ type + " hosts from " + filename);
}
}
}
} catch (IOException|SAXException|ParserConfigurationException e) {
LOG.fatal("error parsing " + filename, e);
throw new RuntimeException(e);
} finally {
fileInputStream.close();
}
}
static String readFirstTagValue(Element e, String tag) {
NodeList nodes = e.getElementsByTagName(tag);
return (nodes.getLength() == 0)? null : nodes.item(0).getTextContent();
}
public void refresh(String includeFiles, String excludeFiles)
throws IOException {
LOG.info("Refreshing hosts (include/exclude) list");
@ -129,7 +209,7 @@ public class HostsFileReader {
// update instance variables
updateFileNames(includeFiles, excludeFiles);
Set<String> newIncludes = new HashSet<String>();
Set<String> newExcludes = new HashSet<String>();
Map<String, Integer> newExcludes = new HashMap<String, Integer>();
boolean switchIncludes = false;
boolean switchExcludes = false;
if (includeFiles != null && !includeFiles.isEmpty()) {
@ -137,7 +217,7 @@ public class HostsFileReader {
switchIncludes = true;
}
if (excludeFiles != null && !excludeFiles.isEmpty()) {
readFileToSet("excluded", excludeFiles, newExcludes);
readFileToMap("excluded", excludeFiles, newExcludes);
switchExcludes = true;
}
@ -161,7 +241,7 @@ public class HostsFileReader {
this.writeLock.lock();
try {
Set<String> newIncludes = new HashSet<String>();
Set<String> newExcludes = new HashSet<String>();
Map<String, Integer> newExcludes = new HashMap<String, Integer>();
boolean switchIncludes = false;
boolean switchExcludes = false;
if (inFileInputStream != null) {
@ -170,7 +250,7 @@ public class HostsFileReader {
switchIncludes = true;
}
if (exFileInputStream != null) {
readFileToSetWithFileInputStream("excluded", excludesFile,
readFileToMapWithFileInputStream("excluded", excludesFile,
exFileInputStream, newExcludes);
switchExcludes = true;
}
@ -199,7 +279,7 @@ public class HostsFileReader {
public Set<String> getExcludedHosts() {
this.readLock.lock();
try {
return excludes;
return excludes.keySet();
} finally {
this.readLock.unlock();
}
@ -209,7 +289,18 @@ public class HostsFileReader {
this.readLock.lock();
try {
includes.addAll(this.includes);
excludes.addAll(this.excludes);
excludes.addAll(this.excludes.keySet());
} finally {
this.readLock.unlock();
}
}
public void getHostDetails(Set<String> includeHosts,
Map<String, Integer> excludeHosts) {
this.readLock.lock();
try {
includeHosts.addAll(this.includes);
excludeHosts.putAll(this.excludes);
} finally {
this.readLock.unlock();
}

View File

@ -20,16 +20,19 @@ package org.apache.hadoop.util;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.Map;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.*;
import static org.junit.Assert.*;
/*
* Test for HostsFileReader.java
*
*
*/
public class TestHostsFileReader {
@ -39,6 +42,7 @@ public class TestHostsFileReader {
File INCLUDES_FILE = new File(HOSTS_TEST_DIR, "dfs.include");
String excludesFile = HOSTS_TEST_DIR + "/dfs.exclude";
String includesFile = HOSTS_TEST_DIR + "/dfs.include";
private String excludesXmlFile = HOSTS_TEST_DIR + "/dfs.exclude.xml";
@Before
public void setUp() throws Exception {
@ -288,4 +292,62 @@ public class TestHostsFileReader {
assertFalse(hfp.getExcludedHosts().contains("somehost5"));
}
/*
* Test if timeout values are provided in HostFile
*/
@Test
public void testHostFileReaderWithTimeout() throws Exception {
FileWriter efw = new FileWriter(excludesXmlFile);
FileWriter ifw = new FileWriter(includesFile);
efw.write("<?xml version=\"1.0\"?>\n");
efw.write("<!-- yarn.nodes.exclude -->\n");
efw.write("<hosts>\n");
efw.write("<host><name>host1</name></host>\n");
efw.write("<host><name>host2</name><timeout>123</timeout></host>\n");
efw.write("<host><name>host3</name><timeout>-1</timeout></host>\n");
efw.write("<host><name>10000</name></host>\n");
efw.write("<host><name>10001</name><timeout>123</timeout></host>\n");
efw.write("<host><name>10002</name><timeout>-1</timeout></host>\n");
efw.write("<host><name>host4,host5, host6</name>" +
"<timeout>1800</timeout></host>\n");
efw.write("</hosts>\n");
efw.close();
ifw.write("#Hosts-in-DFS\n");
ifw.write(" \n");
ifw.write(" somehost \t somehost2 \n somehost4");
ifw.write(" somehost3 \t # somehost5");
ifw.close();
HostsFileReader hfp = new HostsFileReader(includesFile, excludesXmlFile);
int includesLen = hfp.getHosts().size();
int excludesLen = hfp.getExcludedHosts().size();
assertEquals(4, includesLen);
assertEquals(9, excludesLen);
Set<String> includes = new HashSet<String>();
Map<String, Integer> excludes = new HashMap<String, Integer>();
hfp.getHostDetails(includes, excludes);
assertTrue(excludes.containsKey("host1"));
assertTrue(excludes.containsKey("host2"));
assertTrue(excludes.containsKey("host3"));
assertTrue(excludes.containsKey("10000"));
assertTrue(excludes.containsKey("10001"));
assertTrue(excludes.containsKey("10002"));
assertTrue(excludes.containsKey("host4"));
assertTrue(excludes.containsKey("host5"));
assertTrue(excludes.containsKey("host6"));
assertTrue(excludes.get("host1") == null);
assertTrue(excludes.get("host2") == 123);
assertTrue(excludes.get("host3") == -1);
assertTrue(excludes.get("10000") == null);
assertTrue(excludes.get("10001") == 123);
assertTrue(excludes.get("10002") == -1);
assertTrue(excludes.get("host4") == 1800);
assertTrue(excludes.get("host5") == 1800);
assertTrue(excludes.get("host6") == 1800);
}
}

View File

@ -132,6 +132,7 @@
<item name="Secure Containers" href="hadoop-yarn/hadoop-yarn-site/SecureContainer.html"/>
<item name="Registry" href="hadoop-yarn/hadoop-yarn-site/registry/index.html"/>
<item name="Reservation System" href="hadoop-yarn/hadoop-yarn-site/ReservationSystem.html"/>
<item name="Graceful Decommission" href="hadoop-yarn/hadoop-yarn-site/GracefulDecommission.html"/>
</menu>
<menu name="YARN REST APIs" inherit="top">

View File

@ -213,6 +213,11 @@ public class NodeInfo {
@Override
public void setUntrackedTimeStamp(long timeStamp) {
}
@Override
public Integer getDecommissioningTimeout() {
return null;
}
}
public static RMNode newNodeInfo(String rackName, String hostName,

View File

@ -202,4 +202,9 @@ public class RMNodeWrapper implements RMNode {
@Override
public void setUntrackedTimeStamp(long timeStamp) {
}
@Override
public Integer getDecommissioningTimeout() {
return null;
}
}

View File

@ -769,6 +769,20 @@ public class YarnConfiguration extends Configuration {
*/
public static final String RM_PROXY_USER_PREFIX = RM_PREFIX + "proxyuser.";
/**
* Timeout in seconds for YARN node graceful decommission.
* This is the maximal time to wait for running containers and applications
* to complete before transition a DECOMMISSIONING node into DECOMMISSIONED.
*/
public static final String RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT =
RM_PREFIX + "nodemanager-graceful-decommission-timeout-secs";
public static final int DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT = 3600;
public static final String RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL =
RM_PREFIX + "decommissioning-nodes-watcher.poll-interval-secs";
public static final int
DEFAULT_RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL = 20;
////////////////////////////////
// Node Manager Configs
////////////////////////////////

View File

@ -43,6 +43,16 @@ public abstract class RefreshNodesRequest {
return request;
}
@Private
@Unstable
public static RefreshNodesRequest newInstance(
DecommissionType decommissionType, Integer timeout) {
RefreshNodesRequest request = Records.newRecord(RefreshNodesRequest.class);
request.setDecommissionType(decommissionType);
request.setDecommissionTimeout(timeout);
return request;
}
/**
* Set the DecommissionType
*
@ -56,4 +66,18 @@ public abstract class RefreshNodesRequest {
* @return decommissionType
*/
public abstract DecommissionType getDecommissionType();
}
/**
* Set the DecommissionTimeout.
*
* @param timeout graceful decommission timeout in seconds
*/
public abstract void setDecommissionTimeout(Integer timeout);
/**
* Get the DecommissionTimeout.
*
* @return decommissionTimeout
*/
public abstract Integer getDecommissionTimeout();
}

View File

@ -37,6 +37,7 @@ message RefreshQueuesResponseProto {
message RefreshNodesRequestProto {
optional DecommissionTypeProto decommissionType = 1 [default = NORMAL];
optional int32 decommissionTimeout = 2;
}
message RefreshNodesResponseProto {
}

View File

@ -99,10 +99,10 @@ public class RMAdminCLI extends HAAdmin {
"properties. \n\t\tResourceManager will reload the " +
"mapred-queues configuration file."))
.put("-refreshNodes",
new UsageInfo("[-g [timeout in seconds] -client|server]",
new UsageInfo("[-g|graceful [timeout in seconds] -client|server]",
"Refresh the hosts information at the ResourceManager. Here "
+ "[-g [timeout in seconds] -client|server] is optional, if we "
+ "specify the timeout then ResourceManager will wait for "
+ "[-g|graceful [timeout in seconds] -client|server] is optional,"
+ " if we specify the timeout then ResourceManager will wait for "
+ "timeout before marking the NodeManager as decommissioned."
+ " The -client|server indicates if the timeout tracking should"
+ " be handled by the client or the ResourceManager. The client"
@ -234,21 +234,23 @@ public class RMAdminCLI extends HAAdmin {
summary.append("rmadmin is the command to execute YARN administrative " +
"commands.\n");
summary.append("The full syntax is: \n\n" +
"yarn rmadmin" +
" [-refreshQueues]" +
" [-refreshNodes [-g [timeout in seconds] -client|server]]" +
" [-refreshNodesResources]" +
" [-refreshSuperUserGroupsConfiguration]" +
" [-refreshUserToGroupsMappings]" +
" [-refreshAdminAcls]" +
" [-refreshServiceAcl]" +
" [-getGroup [username]]" +
" [-addToClusterNodeLabels <\"label1(exclusive=true),"
+ "label2(exclusive=false),label3\">]" +
" [-removeFromClusterNodeLabels <label1,label2,label3>]" +
" [-replaceLabelsOnNode <\"node1[:port]=label1,label2 node2[:port]=label1\">]" +
" [-directlyAccessNodeLabelStore]" +
" [-updateNodeResource [NodeID] [MemSize] [vCores] ([OvercommitTimeout])");
"yarn rmadmin" +
" [-refreshQueues]" +
" [-refreshNodes [-g|graceful [timeout in seconds] -client|server]]" +
" [-refreshNodesResources]" +
" [-refreshSuperUserGroupsConfiguration]" +
" [-refreshUserToGroupsMappings]" +
" [-refreshAdminAcls]" +
" [-refreshServiceAcl]" +
" [-getGroup [username]]" +
" [-addToClusterNodeLabels <\"label1(exclusive=true),"
+ "label2(exclusive=false),label3\">]" +
" [-removeFromClusterNodeLabels <label1,label2,label3>]" +
" [-replaceLabelsOnNode <\"node1[:port]=label1,label2" +
" node2[:port]=label1\">]" +
" [-directlyAccessNodeLabelStore]" +
" [-updateNodeResource [NodeID] [MemSize] [vCores]" +
" ([OvercommitTimeout])");
if (isHAEnabled) {
appendHAUsage(summary);
}
@ -309,33 +311,40 @@ public class RMAdminCLI extends HAAdmin {
return 0;
}
private int refreshNodes() throws IOException, YarnException {
private int refreshNodes(boolean graceful) throws IOException, YarnException {
// Refresh the nodes
ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
RefreshNodesRequest request = RefreshNodesRequest
.newInstance(DecommissionType.NORMAL);
RefreshNodesRequest request = RefreshNodesRequest.newInstance(
graceful? DecommissionType.GRACEFUL : DecommissionType.NORMAL);
adminProtocol.refreshNodes(request);
return 0;
}
private int refreshNodes(long timeout, String trackingMode)
private int refreshNodes(int timeout, String trackingMode)
throws IOException, YarnException {
if (!"client".equals(trackingMode)) {
throw new UnsupportedOperationException(
"Only client tracking mode is currently supported.");
}
boolean serverTracking = !"client".equals(trackingMode);
// Graceful decommissioning with timeout
ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
RefreshNodesRequest gracefulRequest = RefreshNodesRequest
.newInstance(DecommissionType.GRACEFUL);
.newInstance(DecommissionType.GRACEFUL, timeout);
adminProtocol.refreshNodes(gracefulRequest);
if (serverTracking) {
return 0;
}
CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest = recordFactory
.newRecordInstance(CheckForDecommissioningNodesRequest.class);
long waitingTime;
boolean nodesDecommissioning = true;
// As RM enforces timeout automatically, client usually don't need
// to forcefully decommission nodes upon timeout.
// Here we let the client waits a small additional seconds so to avoid
// unnecessary double decommission.
final int gracePeriod = 5;
// timeout=-1 means wait for all the nodes to be gracefully
// decommissioned
for (waitingTime = 0; waitingTime < timeout || timeout == -1; waitingTime++) {
for (waitingTime = 0;
timeout == -1 || (timeout >= 0 && waitingTime < timeout + gracePeriod);
waitingTime++) {
// wait for one second to check nodes decommissioning status
try {
Thread.sleep(1000);
@ -380,6 +389,10 @@ public class RMAdminCLI extends HAAdmin {
return 0;
}
private int refreshNodes() throws IOException, YarnException {
return refreshNodes(false);
}
private int refreshUserToGroupsMappings() throws IOException,
YarnException {
// Refresh the user-to-groups mappings
@ -725,33 +738,12 @@ public class RMAdminCLI extends HAAdmin {
return exitCode;
}
}
try {
if ("-refreshQueues".equals(cmd)) {
exitCode = refreshQueues();
} else if ("-refreshNodes".equals(cmd)) {
if (args.length == 1) {
exitCode = refreshNodes();
} else if (args.length == 3 || args.length == 4) {
// if the graceful timeout specified
if ("-g".equals(args[1])) {
long timeout = -1;
String trackingMode;
if (args.length == 4) {
timeout = validateTimeout(args[2]);
trackingMode = validateTrackingMode(args[3]);
} else {
trackingMode = validateTrackingMode(args[2]);
}
exitCode = refreshNodes(timeout, trackingMode);
} else {
printUsage(cmd, isHAEnabled);
return -1;
}
} else {
printUsage(cmd, isHAEnabled);
return -1;
}
exitCode = handleRefreshNodes(args, cmd, isHAEnabled);
} else if ("-refreshNodesResources".equals(cmd)) {
exitCode = refreshNodesResources();
} else if ("-refreshUserToGroupsMappings".equals(cmd)) {
@ -768,22 +760,7 @@ public class RMAdminCLI extends HAAdmin {
String[] usernames = Arrays.copyOfRange(args, i, args.length);
exitCode = getGroups(usernames);
} else if ("-updateNodeResource".equals(cmd)) {
if (args.length < 4 || args.length > 5) {
System.err.println("Number of parameters specified for " +
"updateNodeResource is wrong.");
printUsage(cmd, isHAEnabled);
exitCode = -1;
} else {
String nodeID = args[i++];
String memSize = args[i++];
String cores = args[i++];
int overCommitTimeout = ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT;
if (i == args.length - 1) {
overCommitTimeout = Integer.parseInt(args[i]);
}
exitCode = updateNodeResource(nodeID, Integer.parseInt(memSize),
Integer.parseInt(cores), overCommitTimeout);
}
exitCode = handleUpdateNodeResource(args, cmd, isHAEnabled);
} else if ("-addToClusterNodeLabels".equals(cmd)) {
if (i >= args.length) {
System.err.println(NO_LABEL_ERR_MSG);
@ -843,10 +820,59 @@ public class RMAdminCLI extends HAAdmin {
return exitCode;
}
private long validateTimeout(String strTimeout) {
long timeout;
// A helper method to reduce the number of lines of run()
private int handleRefreshNodes(String[] args, String cmd, boolean isHAEnabled)
throws IOException, YarnException {
if (args.length == 1) {
return refreshNodes();
} else if (args.length == 3 || args.length == 4) {
// if the graceful timeout specified
if ("-g".equals(args[1]) || "-graceful".equals(args[1])) {
int timeout = -1;
String trackingMode;
if (args.length == 4) {
timeout = validateTimeout(args[2]);
trackingMode = validateTrackingMode(args[3]);
} else {
trackingMode = validateTrackingMode(args[2]);
}
return refreshNodes(timeout, trackingMode);
} else {
printUsage(cmd, isHAEnabled);
return -1;
}
} else {
printUsage(cmd, isHAEnabled);
return -1;
}
}
private int handleUpdateNodeResource(
String[] args, String cmd, boolean isHAEnabled)
throws NumberFormatException, IOException, YarnException {
int i = 1;
if (args.length < 4 || args.length > 5) {
System.err.println("Number of parameters specified for " +
"updateNodeResource is wrong.");
printUsage(cmd, isHAEnabled);
return -1;
} else {
String nodeID = args[i++];
String memSize = args[i++];
String cores = args[i++];
int overCommitTimeout = ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT;
if (i == args.length - 1) {
overCommitTimeout = Integer.parseInt(args[i]);
}
return updateNodeResource(nodeID, Integer.parseInt(memSize),
Integer.parseInt(cores), overCommitTimeout);
}
}
private int validateTimeout(String strTimeout) {
int timeout;
try {
timeout = Long.parseLong(strTimeout);
timeout = Integer.parseInt(strTimeout);
} catch (NumberFormatException ex) {
throw new IllegalArgumentException(INVALID_TIMEOUT_ERR_MSG + strTimeout);
}

View File

@ -267,7 +267,7 @@ public class TestRMAdminCLI {
CheckForDecommissioningNodesRequest.class))).thenReturn(response);
assertEquals(0, rmAdminCLI.run(args));
verify(admin).refreshNodes(
RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL));
RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL, 1));
verify(admin, never()).refreshNodes(
RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL));
}
@ -327,7 +327,7 @@ public class TestRMAdminCLI {
});
assertEquals(0, rmAdminCLI.run(args));
verify(admin, atLeastOnce()).refreshNodes(
RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL));
RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL, -1));
verify(admin, never()).refreshNodes(
RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL));
}
@ -346,10 +346,6 @@ public class TestRMAdminCLI {
String[] negativeTimeoutArgs = {"-refreshNodes", "-g", "-1000", "-client"};
assertEquals(-1, rmAdminCLI.run(negativeTimeoutArgs));
// server tracking mode
String[] serveTrackingrArgs = {"-refreshNodes", "-g", "1", "-server"};
assertEquals(-1, rmAdminCLI.run(serveTrackingrArgs));
// invalid tracking mode
String[] invalidTrackingArgs = {"-refreshNodes", "-g", "1", "-foo"};
assertEquals(-1, rmAdminCLI.run(invalidTrackingArgs));
@ -465,8 +461,9 @@ public class TestRMAdminCLI {
assertTrue(dataOut
.toString()
.contains(
"yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in " +
"seconds] -client|server]] [-refreshNodesResources] [-refresh" +
"yarn rmadmin [-refreshQueues] [-refreshNodes "+
"[-g|graceful [timeout in seconds] -client|server]] " +
"[-refreshNodesResources] [-refresh" +
"SuperUserGroupsConfiguration] [-refreshUserToGroupsMappings] " +
"[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup " +
"[username]] [-addToClusterNodeLabels " +
@ -485,7 +482,8 @@ public class TestRMAdminCLI {
assertTrue(dataOut
.toString()
.contains(
"-refreshNodes [-g [timeout in seconds] -client|server]: " +
"-refreshNodes [-g|graceful [timeout in seconds]" +
" -client|server]: " +
"Refresh the hosts information at the ResourceManager."));
assertTrue(dataOut
.toString()
@ -518,8 +516,8 @@ public class TestRMAdminCLI {
testError(new String[] { "-help", "-refreshQueues" },
"Usage: yarn rmadmin [-refreshQueues]", dataErr, 0);
testError(new String[] { "-help", "-refreshNodes" },
"Usage: yarn rmadmin [-refreshNodes [-g [timeout in seconds] " +
"-client|server]]", dataErr, 0);
"Usage: yarn rmadmin [-refreshNodes [-g|graceful " +
"[timeout in seconds] -client|server]]", dataErr, 0);
testError(new String[] { "-help", "-refreshNodesResources" },
"Usage: yarn rmadmin [-refreshNodesResources]", dataErr, 0);
testError(new String[] { "-help", "-refreshUserToGroupsMappings" },
@ -558,8 +556,8 @@ public class TestRMAdminCLI {
assertEquals(0, rmAdminCLIWithHAEnabled.run(args));
oldOutPrintStream.println(dataOut);
String expectedHelpMsg =
"yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in "
+ "seconds] -client|server]] "
"yarn rmadmin [-refreshQueues] [-refreshNodes [-g|graceful "
+ "[timeout in seconds] -client|server]] "
+ "[-refreshNodesResources] [-refreshSuperUserGroupsConfiguration] "
+ "[-refreshUserToGroupsMappings] "
+ "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup"

View File

@ -31,7 +31,6 @@ import com.google.protobuf.TextFormat;
@Private
@Unstable
public class RefreshNodesRequestPBImpl extends RefreshNodesRequest {
RefreshNodesRequestProto proto = RefreshNodesRequestProto.getDefaultInstance();
RefreshNodesRequestProto.Builder builder = null;
boolean viaProto = false;
@ -108,6 +107,22 @@ public class RefreshNodesRequestPBImpl extends RefreshNodesRequest {
return convertFromProtoFormat(p.getDecommissionType());
}
@Override
public synchronized void setDecommissionTimeout(Integer timeout) {
maybeInitBuilder();
if (timeout != null) {
builder.setDecommissionTimeout(timeout);
} else {
builder.clearDecommissionTimeout();
}
}
@Override
public synchronized Integer getDecommissionTimeout() {
RefreshNodesRequestProtoOrBuilder p = viaProto ? proto : builder;
return p.hasDecommissionTimeout()? p.getDecommissionTimeout() : null;
}
private DecommissionType convertFromProtoFormat(DecommissionTypeProto p) {
return DecommissionType.valueOf(p.name());
}

View File

@ -2502,6 +2502,24 @@
<value>1800000</value>
</property>
<property>
<description>
Timeout in seconds for YARN node graceful decommission.
This is the maximal time to wait for running containers and applications to complete
before transition a DECOMMISSIONING node into DECOMMISSIONED.
</description>
<name>yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs</name>
<value>3600</value>
</property>
<property>
<description>
Timeout in seconds of DecommissioningNodesWatcher internal polling.
</description>
<name>yarn.resourcemanager.decommissioning-nodes-watcher.poll-interval-secs</name>
<value>20</value>
</property>
<property>
<description>The Node Label script to run. Script output Line starting with
"NODE_PARTITION:" will be considered as Node Label Partition. In case of

View File

@ -447,7 +447,8 @@ public class AdminService extends CompositeService implements
rmContext.getNodesListManager().refreshNodes(conf);
break;
case GRACEFUL:
rmContext.getNodesListManager().refreshNodesGracefully(conf);
rmContext.getNodesListManager().refreshNodesGracefully(
conf, request.getDecommissionTimeout());
break;
case FORCEFUL:
rmContext.getNodesListManager().refreshNodesForcefully();

View File

@ -0,0 +1,439 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.util.MonotonicClock;
/**
* DecommissioningNodesWatcher is used by ResourceTrackerService to track
* DECOMMISSIONING nodes to decide when, after all running containers on
* the node have completed, will be transitioned into DECOMMISSIONED state
* (NodeManager will be told to shutdown).
* Under MR application, a node, after completes all its containers,
* may still serve it map output data during the duration of the application
* for reducers. A fully graceful mechanism would keep such DECOMMISSIONING
* nodes until all involved applications complete. It could be however
* undesirable under long-running applications scenario where a bunch
* of "idle" nodes would stay around for long period of time.
*
* DecommissioningNodesWatcher balance such concern with a timeout policy ---
* a DECOMMISSIONING node will be DECOMMISSIONED no later than
* DECOMMISSIONING_TIMEOUT regardless of running containers or applications.
*
* To be efficient, DecommissioningNodesWatcher skip tracking application
* containers on a particular node before the node is in DECOMMISSIONING state.
* It only tracks containers once the node is in DECOMMISSIONING state.
* DecommissioningNodesWatcher basically is no cost when no node is
* DECOMMISSIONING. This sacrifices the possibility that the node once
* host containers of an application that is still running
* (the affected map tasks will be rescheduled).
*/
public class DecommissioningNodesWatcher {
private static final Log LOG =
LogFactory.getLog(DecommissioningNodesWatcher.class);
private final RMContext rmContext;
// Default timeout value in mills.
// Negative value indicates no timeout. 0 means immediate.
private long defaultTimeoutMs =
1000L * YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT;
// Once a RMNode is observed in DECOMMISSIONING state,
// All its ContainerStatus update are tracked inside DecomNodeContext.
class DecommissioningNodeContext {
private final NodeId nodeId;
// Last known NodeState.
private NodeState nodeState;
// The moment node is observed in DECOMMISSIONING state.
private final long decommissioningStartTime;
private long lastContainerFinishTime;
// number of running containers at the moment.
private int numActiveContainers;
// All applications run on the node at or after decommissioningStartTime.
private Set<ApplicationId> appIds;
// First moment the node is observed in DECOMMISSIONED state.
private long decommissionedTime;
// Timeout in millis for this decommissioning node.
// This value could be dynamically updated with new value from RMNode.
private long timeoutMs;
private long lastUpdateTime;
public DecommissioningNodeContext(NodeId nodeId) {
this.nodeId = nodeId;
this.appIds = new HashSet<ApplicationId>();
this.decommissioningStartTime = mclock.getTime();
this.timeoutMs = defaultTimeoutMs;
}
void updateTimeout(Integer timeoutSec) {
this.timeoutMs = (timeoutSec == null)?
defaultTimeoutMs : (1000L * timeoutSec);
}
}
// All DECOMMISSIONING nodes to track.
private HashMap<NodeId, DecommissioningNodeContext> decomNodes =
new HashMap<NodeId, DecommissioningNodeContext>();
private Timer pollTimer;
private MonotonicClock mclock;
public DecommissioningNodesWatcher(RMContext rmContext) {
this.rmContext = rmContext;
pollTimer = new Timer(true);
mclock = new MonotonicClock();
}
public void init(Configuration conf) {
readDecommissioningTimeout(conf);
int v = conf.getInt(
YarnConfiguration.RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL,
YarnConfiguration
.DEFAULT_RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL);
pollTimer.schedule(new PollTimerTask(rmContext), 0, (1000L * v));
}
/**
* Update rmNode decommissioning status based on NodeStatus.
* @param rmNode The node
* @param remoteNodeStatus latest NodeStatus
*/
public synchronized void update(RMNode rmNode, NodeStatus remoteNodeStatus) {
DecommissioningNodeContext context = decomNodes.get(rmNode.getNodeID());
long now = mclock.getTime();
if (rmNode.getState() == NodeState.DECOMMISSIONED) {
if (context == null) {
return;
}
context.nodeState = rmNode.getState();
// keep DECOMMISSIONED node for a while for status log, so that such
// host will appear as DECOMMISSIONED instead of quietly disappears.
if (context.decommissionedTime == 0) {
context.decommissionedTime = now;
} else if (now - context.decommissionedTime > 60000L) {
decomNodes.remove(rmNode.getNodeID());
}
} else if (rmNode.getState() == NodeState.DECOMMISSIONING) {
if (context == null) {
context = new DecommissioningNodeContext(rmNode.getNodeID());
decomNodes.put(rmNode.getNodeID(), context);
context.nodeState = rmNode.getState();
context.decommissionedTime = 0;
}
context.updateTimeout(rmNode.getDecommissioningTimeout());
context.lastUpdateTime = now;
if (remoteNodeStatus.getKeepAliveApplications() != null) {
context.appIds.addAll(remoteNodeStatus.getKeepAliveApplications());
}
// Count number of active containers.
int numActiveContainers = 0;
for (ContainerStatus cs : remoteNodeStatus.getContainersStatuses()) {
ContainerState newState = cs.getState();
if (newState == ContainerState.RUNNING ||
newState == ContainerState.NEW) {
numActiveContainers++;
}
context.numActiveContainers = numActiveContainers;
ApplicationId aid = cs.getContainerId()
.getApplicationAttemptId().getApplicationId();
if (!context.appIds.contains(aid)) {
context.appIds.add(aid);
}
}
context.numActiveContainers = numActiveContainers;
// maintain lastContainerFinishTime.
if (context.numActiveContainers == 0 &&
context.lastContainerFinishTime == 0) {
context.lastContainerFinishTime = now;
}
} else {
// remove node in other states
if (context != null) {
decomNodes.remove(rmNode.getNodeID());
}
}
}
public synchronized void remove(NodeId nodeId) {
DecommissioningNodeContext context = decomNodes.get(nodeId);
if (context != null) {
LOG.info("remove " + nodeId + " in " + context.nodeState);
decomNodes.remove(nodeId);
}
}
/**
* Status about a specific decommissioning node.
*
*/
public enum DecommissioningNodeStatus {
// Node is not in DECOMMISSIONING state.
NONE,
// wait for running containers to complete
WAIT_CONTAINER,
// wait for running application to complete (after all containers complete);
WAIT_APP,
// Timeout waiting for either containers or applications to complete.
TIMEOUT,
// nothing to wait, ready to be decommissioned
READY,
// The node has already been decommissioned
DECOMMISSIONED,
}
public boolean checkReadyToBeDecommissioned(NodeId nodeId) {
DecommissioningNodeStatus s = checkDecommissioningStatus(nodeId);
return (s == DecommissioningNodeStatus.READY ||
s == DecommissioningNodeStatus.TIMEOUT);
}
public DecommissioningNodeStatus checkDecommissioningStatus(NodeId nodeId) {
DecommissioningNodeContext context = decomNodes.get(nodeId);
if (context == null) {
return DecommissioningNodeStatus.NONE;
}
if (context.nodeState == NodeState.DECOMMISSIONED) {
return DecommissioningNodeStatus.DECOMMISSIONED;
}
long waitTime = mclock.getTime() - context.decommissioningStartTime;
if (context.numActiveContainers > 0) {
return (context.timeoutMs < 0 || waitTime < context.timeoutMs)?
DecommissioningNodeStatus.WAIT_CONTAINER :
DecommissioningNodeStatus.TIMEOUT;
}
removeCompletedApps(context);
if (context.appIds.size() == 0) {
return DecommissioningNodeStatus.READY;
} else {
return (context.timeoutMs < 0 || waitTime < context.timeoutMs)?
DecommissioningNodeStatus.WAIT_APP :
DecommissioningNodeStatus.TIMEOUT;
}
}
/**
* PollTimerTask periodically:
* 1. log status of all DECOMMISSIONING nodes;
* 2. identify and taken care of stale DECOMMISSIONING nodes
* (for example, node already terminated).
*/
class PollTimerTask extends TimerTask {
private final RMContext rmContext;
public PollTimerTask(RMContext rmContext) {
this.rmContext = rmContext;
}
public void run() {
logDecommissioningNodesStatus();
long now = mclock.getTime();
Set<NodeId> staleNodes = new HashSet<NodeId>();
for (Iterator<Map.Entry<NodeId, DecommissioningNodeContext>> it =
decomNodes.entrySet().iterator(); it.hasNext();) {
Map.Entry<NodeId, DecommissioningNodeContext> e = it.next();
DecommissioningNodeContext d = e.getValue();
// Skip node recently updated (NM usually updates every second).
if (now - d.lastUpdateTime < 5000L) {
continue;
}
// Remove stale non-DECOMMISSIONING node
if (d.nodeState != NodeState.DECOMMISSIONING) {
LOG.debug("remove " + d.nodeState + " " + d.nodeId);
it.remove();
continue;
} else if (now - d.lastUpdateTime > 60000L) {
// Node DECOMMISSIONED could become stale, remove as necessary.
RMNode rmNode = getRmNode(d.nodeId);
if (rmNode != null &&
rmNode.getState() == NodeState.DECOMMISSIONED) {
LOG.debug("remove " + rmNode.getState() + " " + d.nodeId);
it.remove();
continue;
}
}
if (d.timeoutMs >= 0 &&
d.decommissioningStartTime + d.timeoutMs < now) {
staleNodes.add(d.nodeId);
LOG.debug("Identified stale and timeout node " + d.nodeId);
}
}
for (NodeId nodeId : staleNodes) {
RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
if (rmNode == null || rmNode.getState() != NodeState.DECOMMISSIONING) {
remove(nodeId);
continue;
}
if (rmNode.getState() == NodeState.DECOMMISSIONING &&
checkReadyToBeDecommissioned(rmNode.getNodeID())) {
LOG.info("DECOMMISSIONING " + nodeId + " timeout");
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
}
}
}
}
private RMNode getRmNode(NodeId nodeId) {
RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
if (rmNode == null) {
rmNode = this.rmContext.getInactiveRMNodes().get(nodeId);
}
return rmNode;
}
private void removeCompletedApps(DecommissioningNodeContext context) {
Iterator<ApplicationId> it = context.appIds.iterator();
while (it.hasNext()) {
ApplicationId appId = it.next();
RMApp rmApp = rmContext.getRMApps().get(appId);
if (rmApp == null) {
LOG.debug("Consider non-existing app " + appId + " as completed");
it.remove();
continue;
}
if (rmApp.getState() == RMAppState.FINISHED ||
rmApp.getState() == RMAppState.FAILED ||
rmApp.getState() == RMAppState.KILLED) {
LOG.debug("Remove " + rmApp.getState() + " app " + appId);
it.remove();
}
}
}
// Time in second to be decommissioned.
private int getTimeoutInSec(DecommissioningNodeContext context) {
if (context.nodeState == NodeState.DECOMMISSIONED) {
return 0;
} else if (context.nodeState != NodeState.DECOMMISSIONING) {
return -1;
}
if (context.appIds.size() == 0 && context.numActiveContainers == 0) {
return 0;
}
// negative timeout value means no timeout (infinite timeout).
if (context.timeoutMs < 0) {
return -1;
}
long now = mclock.getTime();
long timeout = context.decommissioningStartTime + context.timeoutMs - now;
return Math.max(0, (int)(timeout / 1000));
}
private void logDecommissioningNodesStatus() {
if (!LOG.isDebugEnabled() || decomNodes.size() == 0) {
return;
}
StringBuilder sb = new StringBuilder();
long now = mclock.getTime();
for (DecommissioningNodeContext d : decomNodes.values()) {
DecommissioningNodeStatus s = checkDecommissioningStatus(d.nodeId);
sb.append(String.format(
"%n %-34s %4ds fresh:%3ds containers:%2d %14s",
d.nodeId.getHost(),
(now - d.decommissioningStartTime) / 1000,
(now - d.lastUpdateTime) / 1000,
d.numActiveContainers,
s));
if (s == DecommissioningNodeStatus.WAIT_APP ||
s == DecommissioningNodeStatus.WAIT_CONTAINER) {
sb.append(String.format(" timeout:%4ds", getTimeoutInSec(d)));
}
for (ApplicationId aid : d.appIds) {
sb.append("\n " + aid);
RMApp rmApp = rmContext.getRMApps().get(aid);
if (rmApp != null) {
sb.append(String.format(
" %s %9s %5.2f%% %5ds",
rmApp.getState(),
(rmApp.getApplicationType() == null)?
"" : rmApp.getApplicationType(),
100.0 * rmApp.getProgress(),
(mclock.getTime() - rmApp.getStartTime()) / 1000));
}
}
}
LOG.info("Decommissioning Nodes: " + sb.toString());
}
// Read possible new DECOMMISSIONING_TIMEOUT_KEY from yarn-site.xml.
// This enables DecommissioningNodesWatcher to pick up new value
// without ResourceManager restart.
private void readDecommissioningTimeout(Configuration conf) {
try {
if (conf == null) {
conf = new YarnConfiguration();
}
int v = conf.getInt(
YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
if (defaultTimeoutMs != 1000L * v) {
defaultTimeoutMs = 1000L * v;
LOG.info("Use new decommissioningTimeoutMs: " + defaultTimeoutMs);
}
} catch (Exception e) {
LOG.info("Error readDecommissioningTimeout ", e);
}
}
}

View File

@ -19,14 +19,18 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -37,6 +41,7 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -47,14 +52,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecommissioningEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import com.google.common.annotations.VisibleForTesting;
@SuppressWarnings("unchecked")
public class NodesListManager extends CompositeService implements
EventHandler<NodesListManagerEvent> {
@ -178,10 +184,11 @@ public class NodesListManager extends CompositeService implements
if (!LOG.isDebugEnabled()) {
return;
}
LOG.debug("hostsReader: in=" + conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
LOG.debug("hostsReader: in=" +
conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH) + " out=" +
conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH));
Set<String> hostsList = new HashSet<String>();
@ -196,23 +203,19 @@ public class NodesListManager extends CompositeService implements
}
}
public void refreshNodes(Configuration yarnConf) throws IOException,
YarnException {
refreshHostsReader(yarnConf);
for (NodeId nodeId: rmContext.getRMNodes().keySet()) {
if (!isValidNode(nodeId.getHost())) {
RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, nodeEventType));
}
}
updateInactiveNodes();
public void refreshNodes(Configuration yarnConf)
throws IOException, YarnException {
refreshNodes(yarnConf, false);
}
private void refreshHostsReader(Configuration yarnConf) throws IOException,
YarnException {
public void refreshNodes(Configuration yarnConf, boolean graceful)
throws IOException, YarnException {
refreshHostsReader(yarnConf, graceful, null);
}
private void refreshHostsReader(
Configuration yarnConf, boolean graceful, Integer timeout)
throws IOException, YarnException {
if (null == yarnConf) {
yarnConf = new YarnConfiguration();
}
@ -222,8 +225,16 @@ public class NodesListManager extends CompositeService implements
excludesFile =
yarnConf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
LOG.info("refreshNodes excludesFile " + excludesFile);
hostsReader.refresh(includesFile, excludesFile);
printConfiguredHosts();
LOG.info("hostsReader include:{" +
StringUtils.join(",", hostsReader.getHosts()) +
"} exclude:{" +
StringUtils.join(",", hostsReader.getExcludedHosts()) + "}");
handleExcludeNodeList(graceful, timeout);
}
private void setDecomissionedNMs() {
@ -237,6 +248,86 @@ public class NodesListManager extends CompositeService implements
}
}
// Handle excluded nodes based on following rules:
// Recommission DECOMMISSIONED or DECOMMISSIONING nodes no longer excluded;
// Gracefully decommission excluded nodes that are not already
// DECOMMISSIONED nor DECOMMISSIONING; Take no action for excluded nodes
// that are already DECOMMISSIONED or DECOMMISSIONING.
private void handleExcludeNodeList(boolean graceful, Integer timeout) {
// DECOMMISSIONED/DECOMMISSIONING nodes need to be re-commissioned.
List<RMNode> nodesToRecom = new ArrayList<RMNode>();
// Nodes need to be decommissioned (graceful or forceful);
List<RMNode> nodesToDecom = new ArrayList<RMNode>();
Set<String> includes = new HashSet<String>();
Map<String, Integer> excludes = new HashMap<String, Integer>();
hostsReader.getHostDetails(includes, excludes);
for (RMNode n : this.rmContext.getRMNodes().values()) {
NodeState s = n.getState();
// An invalid node (either due to explicit exclude or not include)
// should be excluded.
boolean isExcluded = !isValidNode(
n.getHostName(), includes, excludes.keySet());
String nodeStr = "node " + n.getNodeID() + " with state " + s;
if (!isExcluded) {
// Note that no action is needed for DECOMMISSIONED node.
if (s == NodeState.DECOMMISSIONING) {
LOG.info("Recommission " + nodeStr);
nodesToRecom.add(n);
}
// Otherwise no-action needed.
} else {
// exclude is true.
if (graceful) {
// Use per node timeout if exist otherwise the request timeout.
Integer timeoutToUse = (excludes.get(n.getHostName()) != null)?
excludes.get(n.getHostName()) : timeout;
if (s != NodeState.DECOMMISSIONED &&
s != NodeState.DECOMMISSIONING) {
LOG.info("Gracefully decommission " + nodeStr);
nodesToDecom.add(n);
} else if (s == NodeState.DECOMMISSIONING &&
!Objects.equals(n.getDecommissioningTimeout(),
timeoutToUse)) {
LOG.info("Update " + nodeStr + " timeout to be " + timeoutToUse);
nodesToDecom.add(n);
} else {
LOG.info("No action for " + nodeStr);
}
} else {
if (s != NodeState.DECOMMISSIONED) {
LOG.info("Forcefully decommission " + nodeStr);
nodesToDecom.add(n);
}
}
}
}
for (RMNode n : nodesToRecom) {
RMNodeEvent e = new RMNodeEvent(
n.getNodeID(), RMNodeEventType.RECOMMISSION);
this.rmContext.getDispatcher().getEventHandler().handle(e);
}
for (RMNode n : nodesToDecom) {
RMNodeEvent e;
if (graceful) {
Integer timeoutToUse = (excludes.get(n.getHostName()) != null)?
excludes.get(n.getHostName()) : timeout;
e = new RMNodeDecommissioningEvent(n.getNodeID(), timeoutToUse);
} else {
RMNodeEventType eventType = isUntrackedNode(n.getHostName())?
RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
e = new RMNodeEvent(n.getNodeID(), eventType);
}
this.rmContext.getDispatcher().getEventHandler().handle(e);
}
updateInactiveNodes();
}
@VisibleForTesting
public int getNodeRemovalCheckInterval() {
return nodeRemovalCheckInterval;
@ -360,11 +451,15 @@ public class NodesListManager extends CompositeService implements
}
public boolean isValidNode(String hostName) {
String ip = resolver.resolve(hostName);
Set<String> hostsList = new HashSet<String>();
Set<String> excludeList = new HashSet<String>();
hostsReader.getHostDetails(hostsList, excludeList);
return isValidNode(hostName, hostsList, excludeList);
}
private boolean isValidNode(
String hostName, Set<String> hostsList, Set<String> excludeList) {
String ip = resolver.resolve(hostName);
return (hostsList.isEmpty() || hostsList.contains(hostName) || hostsList
.contains(ip))
&& !(excludeList.contains(hostName) || excludeList.contains(ip));
@ -478,29 +573,14 @@ public class NodesListManager extends CompositeService implements
/**
* Refresh the nodes gracefully
*
* @param conf
* @param yarnConf
* @param timeout decommission timeout, null means default timeout.
* @throws IOException
* @throws YarnException
*/
public void refreshNodesGracefully(Configuration conf) throws IOException,
YarnException {
refreshHostsReader(conf);
for (Entry<NodeId, RMNode> entry : rmContext.getRMNodes().entrySet()) {
NodeId nodeId = entry.getKey();
if (!isValidNode(nodeId.getHost())) {
RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
RMNodeEventType.SHUTDOWN : RMNodeEventType.GRACEFUL_DECOMMISSION;
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, nodeEventType));
} else {
// Recommissioning the nodes
if (entry.getValue().getState() == NodeState.DECOMMISSIONING) {
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeEvent(nodeId, RMNodeEventType.RECOMMISSION));
}
}
}
updateInactiveNodes();
public void refreshNodesGracefully(Configuration yarnConf, Integer timeout)
throws IOException, YarnException {
refreshHostsReader(yarnConf, true, timeout);
}
/**
@ -596,4 +676,4 @@ public class NodesListManager extends CompositeService implements
this.host = hst;
}
}
}
}

View File

@ -70,7 +70,7 @@ public class RMServerUtils {
public static List<RMNode> queryRMNodes(RMContext context,
EnumSet<NodeState> acceptedStates) {
// nodes contains nodes that are NEW, RUNNING OR UNHEALTHY
// nodes contains nodes that are NEW, RUNNING, UNHEALTHY or DECOMMISSIONING.
ArrayList<RMNode> results = new ArrayList<RMNode>();
if (acceptedStates.contains(NodeState.NEW) ||
acceptedStates.contains(NodeState.RUNNING) ||

View File

@ -111,6 +111,8 @@ public class ResourceTrackerService extends AbstractService implements
private int minAllocMb;
private int minAllocVcores;
private DecommissioningNodesWatcher decommissioningWatcher;
private boolean isDistributedNodeLabelsConf;
private boolean isDelegatedCentralizedNodeLabelsConf;
private DynamicResourceConfiguration drConf;
@ -129,6 +131,7 @@ public class ResourceTrackerService extends AbstractService implements
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
this.decommissioningWatcher = new DecommissioningNodesWatcher(rmContext);
}
@Override
@ -168,6 +171,7 @@ public class ResourceTrackerService extends AbstractService implements
}
loadDynamicResourceConfiguration(conf);
decommissioningWatcher.init(conf);
super.serviceInit(conf);
}
@ -492,6 +496,7 @@ public class ResourceTrackerService extends AbstractService implements
// Send ping
this.nmLivelinessMonitor.receivedPing(nodeId);
this.decommissioningWatcher.update(rmNode, remoteNodeStatus);
// 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse();
@ -514,6 +519,20 @@ public class ResourceTrackerService extends AbstractService implements
message);
}
// Evaluate whether a DECOMMISSIONING node is ready to be DECOMMISSIONED.
if (rmNode.getState() == NodeState.DECOMMISSIONING &&
decommissioningWatcher.checkReadyToBeDecommissioned(
rmNode.getNodeID())) {
String message = "DECOMMISSIONING " + nodeId +
" is ready to be decommissioned";
LOG.info(message);
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
this.nmLivelinessMonitor.unregister(nodeId);
return YarnServerBuilderUtils.newNodeHeartbeatResponse(
NodeAction.SHUTDOWN, message);
}
// Heartbeat response
NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils
.newNodeHeartbeatResponse(lastNodeHeartbeatResponse.

View File

@ -175,5 +175,12 @@ public interface RMNode {
long getUntrackedTimeStamp();
void setUntrackedTimeStamp(long timeStamp);
/**
* Optional decommissioning timeout in second
* (null indicates default timeout).
* @return the decommissioning timeout in second.
*/
Integer getDecommissioningTimeout();
}

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
import org.apache.hadoop.yarn.api.records.NodeId;
/**
* RMNode Decommissioning Event.
*
*/
public class RMNodeDecommissioningEvent extends RMNodeEvent {
// Optional decommissioning timeout in second.
private final Integer decommissioningTimeout;
// Create instance with optional timeout
// (timeout could be null which means use default).
public RMNodeDecommissioningEvent(NodeId nodeId, Integer timeout) {
super(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION);
this.decommissioningTimeout = timeout;
}
public Integer getDecommissioningTimeout() {
return this.decommissioningTimeout;
}
}

View File

@ -27,6 +27,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -124,6 +125,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
private String healthReport;
private long lastHealthReportTime;
private String nodeManagerVersion;
private Integer decommissioningTimeout;
private long timeStamp;
/* Aggregated resource utilization for the containers. */
@ -179,7 +181,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
NodeState,
RMNodeEventType,
RMNodeEvent>(NodeState.NEW)
//Transitions from NEW state
.addTransition(NodeState.NEW, NodeState.RUNNING,
RMNodeEventType.STARTED, new AddNodeTransition())
@ -265,6 +266,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
.addTransition(NodeState.DECOMMISSIONING, NodeState.REBOOTED,
RMNodeEventType.REBOOTING,
new DeactivateNodeTransition(NodeState.REBOOTED))
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
new AddContainersToBeRemovedFromNMTransition())
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
@ -633,7 +637,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
} catch (InvalidStateTransitionException e) {
LOG.error("Can't handle this event at current state", e);
LOG.error("Invalid event " + event.getType() +
" on Node " + this.nodeId);
" on Node " + this.nodeId + " oldState " + oldState);
}
if (oldState != getState()) {
LOG.info(nodeId + " Node Transitioned from " + oldState + " to "
@ -666,6 +670,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
case SHUTDOWN:
metrics.decrNumShutdownNMs();
break;
case DECOMMISSIONING:
metrics.decrDecommissioningNMs();
break;
default:
LOG.debug("Unexpected previous node state");
}
@ -712,6 +719,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
case DECOMMISSIONING:
metrics.decrDecommissioningNMs();
break;
case DECOMMISSIONED:
metrics.decrDecommisionedNMs();
break;
case UNHEALTHY:
metrics.decrNumUnhealthyNMs();
break;
@ -1087,9 +1097,26 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
Integer timeout = null;
if (RMNodeDecommissioningEvent.class.isInstance(event)) {
RMNodeDecommissioningEvent e = ((RMNodeDecommissioningEvent) event);
timeout = e.getDecommissioningTimeout();
}
// Pick up possible updates on decommissioningTimeout.
if (rmNode.getState() == NodeState.DECOMMISSIONING) {
if (!Objects.equals(rmNode.getDecommissioningTimeout(), timeout)) {
LOG.info("Update " + rmNode.getNodeID() +
" DecommissioningTimeout to be " + timeout);
rmNode.decommissioningTimeout = timeout;
} else {
LOG.info(rmNode.getNodeID() + " is already DECOMMISSIONING");
}
return;
}
LOG.info("Put Node " + rmNode.nodeId + " in DECOMMISSIONING.");
// Update NM metrics during graceful decommissioning.
rmNode.updateMetricsForGracefulDecommission(initState, finalState);
rmNode.decommissioningTimeout = timeout;
if (rmNode.originalTotalCapability == null){
rmNode.originalTotalCapability =
Resources.clone(rmNode.totalCapability);
@ -1156,24 +1183,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
return NodeState.UNHEALTHY;
}
}
if (isNodeDecommissioning) {
List<ApplicationId> runningApps = rmNode.getRunningApps();
List<ApplicationId> keepAliveApps = statusEvent.getKeepAliveAppIds();
// no running (and keeping alive) app on this node, get it
// decommissioned.
// TODO may need to check no container is being scheduled on this node
// as well.
if ((runningApps == null || runningApps.size() == 0)
&& (keepAliveApps == null || keepAliveApps.size() == 0)) {
RMNodeImpl.deactivateNode(rmNode, NodeState.DECOMMISSIONED);
return NodeState.DECOMMISSIONED;
}
// TODO (in YARN-3223) if node in decommissioning, get node resource
// updated if container get finished (keep available resource to be 0)
}
rmNode.handleContainerStatus(statusEvent.getContainers());
rmNode.handleReportedIncreasedContainers(
@ -1473,4 +1482,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
this.writeLock.unlock();
}
}
@Override
public Integer getDecommissioningTimeout() {
return decommissioningTimeout;
}
}

View File

@ -97,7 +97,7 @@ public class ClusterMetricsInfo {
this.rebootedNodes = clusterMetrics.getNumRebootedNMs();
this.shutdownNodes = clusterMetrics.getNumShutdownNMs();
this.totalNodes = activeNodes + lostNodes + decommissionedNodes
+ rebootedNodes + unhealthyNodes + shutdownNodes;
+ rebootedNodes + unhealthyNodes + decommissioningNodes + shutdownNodes;
}
public int getAppsSubmitted() {

View File

@ -274,6 +274,11 @@ public class MockNodes {
public QueuedContainersStatus getQueuedContainersStatus() {
return null;
}
@Override
public Integer getDecommissioningTimeout() {
return null;
}
};
private static RMNode buildRMNode(int rack, final Resource perNode,

View File

@ -709,6 +709,9 @@ public class MockRM extends ResourceManager {
public void waitForState(NodeId nodeId, NodeState finalState)
throws InterruptedException {
RMNode node = getRMContext().getRMNodes().get(nodeId);
if (node == null) {
node = getRMContext().getInactiveRMNodes().get(nodeId);
}
Assert.assertNotNull("node shouldn't be null", node);
int timeWaiting = 0;
while (!finalState.equals(node.getState())) {
@ -722,11 +725,17 @@ public class MockRM extends ResourceManager {
timeWaiting += WAIT_MS_PER_LOOP;
}
System.out.println("Node State is : " + node.getState());
System.out.println("Node " + nodeId + " State is : " + node.getState());
Assert.assertEquals("Node state is not correct (timedout)", finalState,
node.getState());
}
public void sendNodeEvent(MockNM nm, RMNodeEventType event) throws Exception {
RMNodeImpl node = (RMNodeImpl)
getRMContext().getRMNodes().get(nm.getNodeId());
node.handle(new RMNodeEvent(nm.getNodeId(), event));
}
public KillApplicationResponse killApp(ApplicationId appId) throws Exception {
ApplicationClientProtocol client = getClientRMService();
KillApplicationRequest req = KillApplicationRequest.newInstance(appId);

View File

@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher.DecommissioningNodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
/**
* This class tests DecommissioningNodesWatcher.
*/
public class TestDecommissioningNodesWatcher {
private MockRM rm;
@Test
public void testDecommissioningNodesWatcher() throws Exception {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, "40");
rm = new MockRM(conf);
rm.start();
DecommissioningNodesWatcher watcher =
new DecommissioningNodesWatcher(rm.getRMContext());
MockNM nm1 = rm.registerNode("host1:1234", 10240);
RMNode node1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
NodeId id1 = nm1.getNodeId();
rm.waitForState(id1, NodeState.RUNNING);
Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1));
RMApp app = rm.submitApp(2000);
MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
// Setup nm1 as DECOMMISSIONING for DecommissioningNodesWatcher.
rm.sendNodeEvent(nm1, RMNodeEventType.GRACEFUL_DECOMMISSION);
rm.waitForState(id1, NodeState.DECOMMISSIONING);
// Update status with decreasing number of running containers until 0.
watcher.update(node1, createNodeStatus(id1, app, 12));
watcher.update(node1, createNodeStatus(id1, app, 11));
Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1));
watcher.update(node1, createNodeStatus(id1, app, 1));
Assert.assertEquals(DecommissioningNodeStatus.WAIT_CONTAINER,
watcher.checkDecommissioningStatus(id1));
watcher.update(node1, createNodeStatus(id1, app, 0));
Assert.assertEquals(DecommissioningNodeStatus.WAIT_APP,
watcher.checkDecommissioningStatus(id1));
// Set app to be FINISHED and verified DecommissioningNodeStatus is READY.
MockRM.finishAMAndVerifyAppState(app, rm, nm1, am);
rm.waitForState(app.getApplicationId(), RMAppState.FINISHED);
Assert.assertEquals(DecommissioningNodeStatus.READY,
watcher.checkDecommissioningStatus(id1));
}
@After
public void tearDown() {
if (rm != null) {
rm.stop();
}
}
private NodeStatus createNodeStatus(
NodeId nodeId, RMApp app, int numRunningContainers) {
return NodeStatus.newInstance(
nodeId, 0, getContainerStatuses(app, numRunningContainers),
new ArrayList<ApplicationId>(),
NodeHealthStatus.newInstance(
true, "", System.currentTimeMillis() - 1000),
null, null, null);
}
// Get mocked ContainerStatus for bunch of containers,
// where numRunningContainers are RUNNING.
private List<ContainerStatus> getContainerStatuses(
RMApp app, int numRunningContainers) {
// Total 12 containers
final int total = 12;
numRunningContainers = Math.min(total, numRunningContainers);
List<ContainerStatus> output = new ArrayList<ContainerStatus>();
for (int i = 0; i < total; i++) {
ContainerState cstate = (i >= numRunningContainers)?
ContainerState.COMPLETE : ContainerState.RUNNING;
output.add(ContainerStatus.newInstance(
ContainerId.newContainerId(
ApplicationAttemptId.newInstance(app.getApplicationId(), i), 1),
cstate, "Dummy", 0));
}
return output;
}
}

View File

@ -254,17 +254,6 @@ public class TestRMNodeTransitions {
cm.getNumDecommissioningNMs());
Assert.assertEquals("Decommissioned Nodes", initialDecommissioned,
cm.getNumDecommisionedNMs());
// Verify node in DECOMMISSIONING will be changed by status update
// without running apps
statusEvent = getMockRMNodeStatusEventWithoutRunningApps();
node.handle(statusEvent);
Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
Assert.assertEquals("Decommissioning Nodes", initialDecommissioning - 1,
cm.getNumDecommissioningNMs());
Assert.assertEquals("Decommissioned Nodes", initialDecommissioned + 1,
cm.getNumDecommisionedNMs());
}
@Test

View File

@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@ -82,6 +83,9 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
private final static File TEMP_DIR = new File(System.getProperty(
"test.build.data", "/tmp"), "decommision");
private final File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt");
private final File excludeHostFile = new File(TEMP_DIR + File.separator +
"excludeHostFile.txt");
private MockRM rm;
/**
@ -216,6 +220,109 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
checkDecommissionedNMCount(rm, metricCount + 1);
}
/**
* Graceful decommission node with no running application.
*/
@Test
public void testGracefulDecommissionNoApp() throws Exception {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile
.getAbsolutePath());
writeToHostsFile("");
rm = new MockRM(conf);
rm.start();
MockNM nm1 = rm.registerNode("host1:1234", 5120);
MockNM nm2 = rm.registerNode("host2:5678", 10240);
MockNM nm3 = rm.registerNode("host3:4433", 5120);
int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs();
NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true);
NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true);
NodeHeartbeatResponse nodeHeartbeat3 = nm3.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction()));
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat2.getNodeAction()));
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat3.getNodeAction()));
rm.waitForState(nm2.getNodeId(), NodeState.RUNNING);
rm.waitForState(nm3.getNodeId(), NodeState.RUNNING);
// Graceful decommission both host2 and host3.
writeToHostsFile("host2", "host3");
rm.getNodesListManager().refreshNodes(conf, true);
rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONING);
rm.waitForState(nm3.getNodeId(), NodeState.DECOMMISSIONING);
nodeHeartbeat1 = nm1.nodeHeartbeat(true);
nodeHeartbeat2 = nm2.nodeHeartbeat(true);
nodeHeartbeat3 = nm3.nodeHeartbeat(true);
checkDecommissionedNMCount(rm, metricCount + 2);
rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONED);
rm.waitForState(nm3.getNodeId(), NodeState.DECOMMISSIONED);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction()));
Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat2.getNodeAction());
Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat3.getNodeAction());
}
/**
* Graceful decommission node with running application.
*/
@Test
public void testGracefulDecommissionWithApp() throws Exception {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile
.getAbsolutePath());
writeToHostsFile("");
rm = new MockRM(conf);
rm.start();
MockNM nm1 = rm.registerNode("host1:1234", 10240);
MockNM nm2 = rm.registerNode("host2:5678", 20480);
MockNM nm3 = rm.registerNode("host3:4433", 10240);
NodeId id1 = nm1.getNodeId();
NodeId id3 = nm3.getNodeId();
rm.waitForState(id1, NodeState.RUNNING);
rm.waitForState(id3, NodeState.RUNNING);
// Create an app and launch two containers on host1.
RMApp app = rm.submitApp(2000);
MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
ApplicationAttemptId aaid = app.getCurrentAppAttempt().getAppAttemptId();
// Graceful decommission host1 and host3
writeToHostsFile("host1", "host3");
rm.getNodesListManager().refreshNodes(conf, true);
rm.waitForState(id1, NodeState.DECOMMISSIONING);
rm.waitForState(id3, NodeState.DECOMMISSIONING);
// host1 should be DECOMMISSIONING due to running containers.
// host3 should become DECOMMISSIONED.
nm1.nodeHeartbeat(aaid, 2, ContainerState.RUNNING);
nm3.nodeHeartbeat(true);
rm.waitForState(id1, NodeState.DECOMMISSIONING);
rm.waitForState(id3, NodeState.DECOMMISSIONED);
nm1.nodeHeartbeat(aaid, 2, ContainerState.RUNNING);
// Complete containers on host1.
// Since the app is still RUNNING, expect NodeAction.NORMAL.
NodeHeartbeatResponse nodeHeartbeat1 =
nm1.nodeHeartbeat(aaid, 2, ContainerState.COMPLETE);
Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat1.getNodeAction());
// Finish the app and verified DECOMMISSIONED.
MockRM.finishAMAndVerifyAppState(app, rm, nm1, am);
rm.waitForState(app.getApplicationId(), RMAppState.FINISHED);
nodeHeartbeat1 = nm1.nodeHeartbeat(aaid, 2, ContainerState.COMPLETE);
Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat1.getNodeAction());
rm.waitForState(id1, NodeState.DECOMMISSIONED);
}
/**
* Decommissioning using a post-configured include hosts file
*/
@ -1139,19 +1246,17 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
MockNM nm1 = new MockNM("host1:1234", 5120, resourceTrackerService);
RegisterNodeManagerResponse response = nm1.registerNode();
Assert.assertEquals(NodeAction.NORMAL, response.getNodeAction());
int shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
writeToHostsFile("host2");
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
hostFile.getAbsolutePath());
rm.getNodesListManager().refreshNodes(conf);
NodeHeartbeatResponse heartbeatResponse = nm1.nodeHeartbeat(true);
Assert.assertEquals(NodeAction.SHUTDOWN, heartbeatResponse.getNodeAction());
int shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
checkShutdownNMCount(rm, shutdownNMsCount);
checkDecommissionedNMCount(rm, decommisionedNMsCount);
request.setNodeId(nm1.getNodeId());
resourceTrackerService.unRegisterNodeManager(request);
shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
checkShutdownNMCount(rm, shutdownNMsCount);
checkShutdownNMCount(rm, ++shutdownNMsCount);
checkDecommissionedNMCount(rm, decommisionedNMsCount);
// 1. Register the Node Manager
@ -1187,8 +1292,6 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
File excludeHostFile =
new File(TEMP_DIR + File.separator + "excludeHostFile.txt");
writeToHostsFile(excludeHostFile, "host1");
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
excludeHostFile.getAbsolutePath());
@ -1214,8 +1317,6 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
Assert.assertEquals("The inactiveRMNodes should contain an entry for the" +
"decommissioned node",
1, rm1.getRMContext().getInactiveRMNodes().size());
excludeHostFile =
new File(TEMP_DIR + File.separator + "excludeHostFile.txt");
writeToHostsFile(excludeHostFile, "");
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
excludeHostFile.getAbsolutePath());
@ -1245,8 +1346,6 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
//host3 will not register or heartbeat
File excludeHostFile =
new File(TEMP_DIR + File.separator + "excludeHostFile.txt");
writeToHostsFile(excludeHostFile, "host3", "host2");
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
excludeHostFile.getAbsolutePath());
@ -1278,14 +1377,12 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
MockNM nm2 = rm.registerNode("host2:5678", 10240);
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
File excludeHostFile =
new File(TEMP_DIR + File.separator + "excludeHostFile.txt");
writeToHostsFile(excludeHostFile, "host3", "host2");
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
excludeHostFile.getAbsolutePath());
writeToHostsFile(hostFile, "host1", "host2");
writeToHostsFile(excludeHostFile, "host1");
rm.getNodesListManager().refreshNodesGracefully(conf);
rm.getNodesListManager().refreshNodesGracefully(conf, null);
rm.drainEvents();
nm1.nodeHeartbeat(true);
rm.drainEvents();
@ -1294,7 +1391,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
.getInactiveRMNodes().get(nm1.getNodeId()).getState() == NodeState
.DECOMMISSIONED);
writeToHostsFile(excludeHostFile, "");
rm.getNodesListManager().refreshNodesGracefully(conf);
rm.getNodesListManager().refreshNodesGracefully(conf, null);
rm.drainEvents();
Assert.assertTrue("Node " + nm1.getNodeId().getHost() +
" should be Decommissioned", rm.getRMContext()
@ -1304,7 +1401,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
}
/**
* Remove a node from all lists and check if its forgotten
* Remove a node from all lists and check if its forgotten.
*/
@Test
public void testNodeRemovalNormally() throws Exception {
@ -1325,7 +1422,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
public void refreshNodesOption(boolean doGraceful, Configuration conf)
throws Exception {
if (doGraceful) {
rm.getNodesListManager().refreshNodesGracefully(conf);
rm.getNodesListManager().refreshNodesGracefully(conf, null);
} else {
rm.getNodesListManager().refreshNodes(conf);
}
@ -1334,8 +1431,6 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
public void testNodeRemovalUtil(boolean doGraceful) throws Exception {
Configuration conf = new Configuration();
int timeoutValue = 500;
File excludeHostFile = new File(TEMP_DIR + File.separator +
"excludeHostFile.txt");
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, "");
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, "");
conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
@ -1369,18 +1464,23 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
.getAbsolutePath());
refreshNodesOption(doGraceful, conf);
if (doGraceful) {
rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONING);
}
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
rm.drainEvents();
Assert.assertTrue("Node should not be in active node list",
!rmContext.getRMNodes().containsKey(nm2.getNodeId()));
RMNode rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertEquals("Node should be in inactive node list",
rmNode.getState(), NodeState.SHUTDOWN);
rmNode.getState(),
doGraceful? NodeState.DECOMMISSIONED : NodeState.SHUTDOWN);
Assert.assertEquals("Active nodes should be 2",
metrics.getNumActiveNMs(), 2);
Assert.assertEquals("Shutdown nodes should be 1",
metrics.getNumShutdownNMs(), 1);
Assert.assertEquals("Shutdown nodes should be expected",
metrics.getNumShutdownNMs(), doGraceful? 0 : 1);
int nodeRemovalTimeout =
conf.getInt(
@ -1405,14 +1505,18 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
rm.drainEvents();
writeToHostsFile("host1", ip);
refreshNodesOption(doGraceful, conf);
rm.waitForState(nm2.getNodeId(),
doGraceful? NodeState.DECOMMISSIONING : NodeState.SHUTDOWN);
nm2.nodeHeartbeat(true);
rm.drainEvents();
rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertEquals("Node should be shutdown",
rmNode.getState(), NodeState.SHUTDOWN);
rmNode.getState(),
doGraceful? NodeState.DECOMMISSIONED : NodeState.SHUTDOWN);
Assert.assertEquals("Active nodes should be 2",
metrics.getNumActiveNMs(), 2);
Assert.assertEquals("Shutdown nodes should be 1",
metrics.getNumShutdownNMs(), 1);
Assert.assertEquals("Shutdown nodes should be expected",
metrics.getNumShutdownNMs(), doGraceful? 0 : 1);
//add back the node before timer expires
latch.await(maxThreadSleeptime - 2000, TimeUnit.MILLISECONDS);
@ -1456,6 +1560,20 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
}
//Test decommed/ing node that transitions to untracked,timer should remove
testNodeRemovalUtilDecomToUntracked(rmContext, conf, nm1, nm2, nm3,
maxThreadSleeptime, doGraceful);
rm.stop();
}
// A helper method used by testNodeRemovalUtil to avoid exceeding
// max allowed length.
private void testNodeRemovalUtilDecomToUntracked(
RMContext rmContext, Configuration conf,
MockNM nm1, MockNM nm2, MockNM nm3,
long maxThreadSleeptime, boolean doGraceful) throws Exception {
ClusterMetrics metrics = ClusterMetrics.getMetrics();
String ip = NetUtils.normalizeHostName("localhost");
CountDownLatch latch = new CountDownLatch(1);
writeToHostsFile("host1", ip, "host2");
writeToHostsFile(excludeHostFile, "host2");
refreshNodesOption(doGraceful, conf);
@ -1463,7 +1581,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
//nm2.nodeHeartbeat(true);
nm3.nodeHeartbeat(true);
latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
RMNode rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertNotEquals("Timer for this node was not canceled!",
rmNode, null);
@ -1474,6 +1592,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
writeToHostsFile("host1", ip);
writeToHostsFile(excludeHostFile, "");
refreshNodesOption(doGraceful, conf);
nm2.nodeHeartbeat(true);
latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
rmContext.getInactiveRMNodes().get(nm2.getNodeId());
@ -1485,16 +1604,12 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
metrics.getNumShutdownNMs(), 0);
Assert.assertEquals("Active nodes should be 2",
metrics.getNumActiveNMs(), 2);
rm.stop();
}
private void testNodeRemovalUtilLost(boolean doGraceful) throws Exception {
Configuration conf = new Configuration();
conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 2000);
int timeoutValue = 500;
File excludeHostFile = new File(TEMP_DIR + File.separator +
"excludeHostFile.txt");
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
hostFile.getAbsolutePath());
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
@ -1527,7 +1642,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
Assert.assertEquals("All 3 nodes should be active",
metrics.getNumActiveNMs(), 3);
int waitCount = 0;
while(waitCount ++<20){
while(waitCount++ < 20){
synchronized (this) {
wait(200);
}
@ -1579,8 +1694,6 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
throws Exception {
Configuration conf = new Configuration();
int timeoutValue = 500;
File excludeHostFile = new File(TEMP_DIR + File.separator +
"excludeHostFile.txt");
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
hostFile.getAbsolutePath());
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
@ -1651,8 +1764,6 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
throws Exception {
Configuration conf = new Configuration();
int timeoutValue = 500;
File excludeHostFile = new File(TEMP_DIR + File.separator +
"excludeHostFile.txt");
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
hostFile.getAbsolutePath());
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
@ -1696,15 +1807,19 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
nm2.nodeHeartbeat(false);
nm3.nodeHeartbeat(true);
rm.drainEvents();
Assert.assertNotEquals("host2 should be a shutdown NM!",
rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null);
Assert.assertEquals("host2 should be a shutdown NM!",
rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(),
NodeState.SHUTDOWN);
if (!doGraceful) {
Assert.assertNotEquals("host2 should be a shutdown NM!",
rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null);
Assert.assertEquals("host2 should be a shutdown NM!",
rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(),
NodeState.SHUTDOWN);
}
Assert.assertEquals("There should be 2 Active NM!",
clusterMetrics.getNumActiveNMs(), 2);
Assert.assertEquals("There should be 1 Shutdown NM!",
clusterMetrics.getNumShutdownNMs(), 1);
if (!doGraceful) {
Assert.assertEquals("There should be 1 Shutdown NM!",
clusterMetrics.getNumShutdownNMs(), 1);
}
Assert.assertEquals("There should be 0 Unhealthy NM!",
clusterMetrics.getUnhealthyNMs(), 0);
int nodeRemovalTimeout =
@ -1732,7 +1847,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
}
private void writeToHostsFile(String... hosts) throws IOException {
writeToHostsFile(hostFile, hosts);
writeToHostsFile(hostFile, hosts);
}
private void writeToHostsFile(File file, String... hosts)

View File

@ -210,8 +210,6 @@ public class TestNMReconnect extends ParameterizedSchedulerTestBase {
nm1.registerNode();
rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
rm.getRMContext().getNodesListManager().getHostsReader().
getExcludedHosts().add("127.0.0.1");
rm.getRMContext().getDispatcher().getEventHandler().handle(
new RMNodeEvent(nm1.getNodeId(),
RMNodeEventType.GRACEFUL_DECOMMISSION));

View File

@ -239,7 +239,7 @@ Usage:
| COMMAND\_OPTIONS | Description |
|:---- |:---- |
| -refreshQueues | Reload the queues' acls, states and scheduler specific properties. ResourceManager will reload the mapred-queues configuration file. |
| -refreshNodes | Refresh the hosts information at the ResourceManager. |
| -refreshNodes [-g|graceful [timeout in seconds] -client|server] | Refresh the hosts information at the ResourceManager. -g option indicates graceful decommission of excluded hosts, in which case, the optional timeout indicates maximal time in seconds ResourceManager should wait before forcefully mark the node as decommissioned. |
| -refreshNodesResources | Refresh resources of NodeManagers at the ResourceManager. |
| -refreshSuperUserGroupsConfiguration | Refresh superuser proxy groups mappings. |
| -refreshUserToGroupsMappings | Refresh user-to-groups mappings. |