YARN-10854. Support marking inactive node as untracked without configured include path. Contributed by Tao Yang.
parent e31169c864, commit efb3fa2bf5
YarnConfiguration.java
@@ -1168,6 +1168,15 @@ public class YarnConfiguration extends Configuration {
   public static final int
       DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC = 60000;
 
+  /**
+   * Whether to enable RM to mark inactive nodes as untracked and remove them
+   * from the nodes list for the YARN cluster without a configured include path.
+   */
+  public static final String RM_ENABLE_NODE_UNTRACKED_WITHOUT_INCLUDE_PATH =
+      RM_PREFIX + "enable-node-untracked-without-include-path";
+  public static final boolean
+      DEFAULT_RM_ENABLE_NODE_UNTRACKED_WITHOUT_INCLUDE_PATH = false;
+
   /**
    * RM proxy users' prefix
    */
yarn-default.xml
@@ -4779,4 +4779,25 @@
     <name>yarn.resourcemanager.application-tag-based-placement.force-lowercase</name>
     <value>true</value>
   </property>
+
+  <property>
+    <description>
+      Whether to enable RM to mark inactive nodes as untracked after the timeout
+      specified by yarn.resourcemanager.node-removal-untracked.timeout-ms and
+      then remove them from the nodes list for the YARN cluster when no include
+      path is configured. When enabled, RM can periodically clear inactive nodes
+      to avoid growing memory usage for storing them, which is most desirable in
+      elastic cloud environments with frequent auto-scaling operations.
+      It takes effect only when the YARN cluster does not use an include file;
+      the key configurations are as follows:
+        yarn.resourcemanager.nodes.exclude-path=/path-to-exclude-file
+        yarn.resourcemanager.nodes.include-path=
+        yarn.resourcemanager.node-removal-untracked.timeout-ms=60000
+      In this situation, inactive nodes will never be marked as untracked and
+      removed from the nodes list unless this configuration is enabled:
+        yarn.resourcemanager.enable-node-untracked-without-include-path=true
+    </description>
+    <name>yarn.resourcemanager.enable-node-untracked-without-include-path</name>
+    <value>false</value>
+  </property>
 </configuration>
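For reference, the key settings called out in the description above can also be applied programmatically. The snippet below is a minimal sketch and is not part of this patch; "/path/to/excludes" is a placeholder path, and the constants are the ones introduced by or already present in YarnConfiguration.

    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    // Sketch only (not part of this commit): enable untracked-node removal
    // when no include file is configured.
    YarnConfiguration conf = new YarnConfiguration();
    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, "/path/to/excludes");
    // yarn.resourcemanager.nodes.include-path is intentionally left unset
    conf.setInt(
        YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, 60000);
    conf.setBoolean(
        YarnConfiguration.RM_ENABLE_NODE_UNTRACKED_WITHOUT_INCLUDE_PATH, true);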
NodesListManager.java
@@ -85,6 +85,7 @@ public class NodesListManager extends CompositeService implements
   private Timer removalTimer;
   private int nodeRemovalCheckInterval;
   private Set<RMNode> gracefulDecommissionableNodes;
+  private boolean enableNodeUntrackedWithoutIncludePath;
 
   public NodesListManager(RMContext rmContext) {
     super(NodesListManager.class.getName());
@@ -124,6 +125,9 @@ public class NodesListManager extends CompositeService implements
       disableHostsFileReader(ioe);
     }
 
+    enableNodeUntrackedWithoutIncludePath = conf.getBoolean(
+        YarnConfiguration.RM_ENABLE_NODE_UNTRACKED_WITHOUT_INCLUDE_PATH,
+        YarnConfiguration.DEFAULT_RM_ENABLE_NODE_UNTRACKED_WITHOUT_INCLUDE_PATH);
     final int nodeRemovalTimeout =
         conf.getInt(
             YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
@@ -605,7 +609,10 @@ public class NodesListManager extends CompositeService implements
     Set<String> hostsList = hostDetails.getIncludedHosts();
     Set<String> excludeList = hostDetails.getExcludedHosts();
 
-    return !hostsList.isEmpty() && !hostsList.contains(hostName)
+    return (!hostsList.isEmpty() || (enableNodeUntrackedWithoutIncludePath
+        && (hostDetails.getIncludesFile() == null
+        || hostDetails.getIncludesFile().isEmpty())))
+        && !hostsList.contains(hostName)
         && !hostsList.contains(ip) && !excludeList.contains(hostName)
         && !excludeList.contains(ip);
   }
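Outside the diff, the updated isUntrackedNode condition can be read as the predicate below. This is a paraphrase for readability only, reusing the same fields that appear in NodesListManager (hostsList, excludeList, hostDetails, enableNodeUntrackedWithoutIncludePath); it is logically equivalent to the change above, not an addition to it.

    // A node counts as untracked only if it appears in neither list, and
    // either a non-empty include list is in effect, or the new flag permits
    // untracked marking when no include file is configured at all.
    boolean includeListInEffect = !hostsList.isEmpty();
    boolean noIncludeFileConfigured = hostDetails.getIncludesFile() == null
        || hostDetails.getIncludesFile().isEmpty();
    boolean untracked =
        (includeListInEffect
            || (enableNodeUntrackedWithoutIncludePath && noIncludeFileConfigured))
        && !hostsList.contains(hostName) && !hostsList.contains(ip)
        && !excludeList.contains(hostName) && !excludeList.contains(ip);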
TestResourceTrackerService.java
@@ -18,12 +18,14 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.net.ServerSocketUtil;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
+import org.apache.hadoop.util.Sets;
 import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
 import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
@@ -122,6 +124,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
@@ -3063,4 +3066,86 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
 
     resourceTrackerService.close();
   }
+
+  /**
+   * Test decommissioning without a pre-configured include hosts file.
+   */
+  @Test
+  public void testDecommissionWithoutIncludeFile() throws Exception {
+    // clear exclude hosts
+    writeToHostsFile(excludeHostFile, "");
+    // init conf:
+    // (1) set untracked removal timeout to 500ms
+    // (2) set exclude path (no include path)
+    // (3) enable node untracked without pre-configured include path
+    Configuration conf = new Configuration();
+    conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
+        500);
+    conf.setBoolean(
+        YarnConfiguration.RM_ENABLE_NODE_UNTRACKED_WITHOUT_INCLUDE_PATH, true);
+    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
+        excludeHostFile.getAbsolutePath());
+
+    rm = new MockRM(conf);
+    rm.start();
+    MockNM nm1 = rm.registerNode("host1:1234", 10240);
+    MockNM nm2 = rm.registerNode("host2:1234", 10240);
+    MockNM nm3 = rm.registerNode("host3:1234", 10240);
+    MockNM nm4 = rm.registerNode("host4:1234", 10240);
+    assertEquals(4, rm.getRMContext().getRMNodes().size());
+    assertEquals(0, rm.getRMContext().getInactiveRMNodes().size());
+
+    // decommission nm1 by adding it to the exclude hosts file
+    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+    writeToHostsFile(excludeHostFile, "host1");
+    rm.getNodesListManager().refreshNodes(conf);
+    rm.drainEvents();
+    assertEquals(rmNode1.getState(), NodeState.DECOMMISSIONED);
+    assertEquals(3, rm.getRMContext().getRMNodes().size());
+    assertEquals(1, rm.getRMContext().getInactiveRMNodes().size());
+    assertEquals(Sets.newHashSet(nm1.getNodeId()),
+        rm.getRMContext().getInactiveRMNodes().keySet());
+
+    // remove nm1 from exclude hosts, so that it will be marked as untracked
+    // and removed from inactive nodes after the timeout
+    writeToHostsFile(excludeHostFile, "");
+    rm.getNodesListManager().refreshNodes(conf);
+    // confirm that nm1 is removed from inactive nodes within 1 second
+    GenericTestUtils.waitFor(
+        () -> rm.getRMContext().getInactiveRMNodes().size() == 0, 100, 1000);
+
+    // lost nm2
+    RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
+    rm.getRMContext().getDispatcher().getEventHandler()
+        .handle(new RMNodeEvent(nm2.getNodeId(), RMNodeEventType.EXPIRE));
+    rm.drainEvents();
+    assertEquals(rmNode2.getState(), NodeState.LOST);
+    assertEquals(2, rm.getRMContext().getRMNodes().size());
+    assertEquals(1, rm.getRMContext().getInactiveRMNodes().size());
+    assertEquals(Sets.newHashSet(nm2.getNodeId()),
+        rm.getRMContext().getInactiveRMNodes().keySet());
+    // confirm that nm2 is removed from inactive nodes within 1 second
+    GenericTestUtils.waitFor(
+        () -> rm.getRMContext().getInactiveRMNodes().size() == 0, 100, 1000);
+
+    // shutdown nm3
+    RMNode rmNode3 = rm.getRMContext().getRMNodes().get(nm3.getNodeId());
+    rm.getRMContext().getDispatcher().getEventHandler()
+        .handle(new RMNodeEvent(nm3.getNodeId(), RMNodeEventType.SHUTDOWN));
+    rm.drainEvents();
+    assertEquals(rmNode3.getState(), NodeState.SHUTDOWN);
+    assertEquals(1, rm.getRMContext().getRMNodes().size());
+    assertEquals(1, rm.getRMContext().getInactiveRMNodes().size());
+    assertEquals(Sets.newHashSet(nm3.getNodeId()),
+        rm.getRMContext().getInactiveRMNodes().keySet());
+    // confirm that nm3 is removed from inactive nodes within 1 second
+    GenericTestUtils.waitFor(
+        () -> rm.getRMContext().getInactiveRMNodes().size() == 0, 100, 1000);
+
+    // nm4 should still be an active node at the end
+    assertEquals(Sets.newHashSet(nm4.getNodeId()),
+        rm.getRMContext().getRMNodes().keySet());
+
+    rm.close();
+  }
 }