YARN-4024. YARN RM should avoid unnecessary resolving IP when NMs doing heartbeat. (Hong Zhiguo via wangda)

This commit is contained in:
Wangda Tan 2015-09-04 15:13:53 -07:00
parent e1feaf6db0
commit bcc85e3bab
5 changed files with 255 additions and 3 deletions

View File

@ -813,6 +813,9 @@ Release 2.8.0 - UNRELEASED
YARN-4073. Removed unused ApplicationACLsManager in ContainerManagerImpl constructor.
(Naganarasimha G R via rohithsharmaks)
YARN-4024. YARN RM should avoid unnecessary resolving IP when NMs doing heartbeat.
(Hong Zhiguo via wangda)
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -746,6 +746,11 @@ public class YarnConfiguration extends Configuration {
+ "proxy-user-privileges.enabled";
public static final boolean DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED = false;
/** The expiry interval for node IP caching. -1 disables the caching */
public static final String RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS = RM_PREFIX
+ "node-ip-cache.expiry-interval-secs";
public static final int DEFAULT_RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS = -1;
/**
* How many diagnostics/failure messages can be saved in RM for
* log aggregation. It also defines the number of diagnostics/failure

View File

@ -272,6 +272,12 @@
<value></value>
</property>
<property>
<description>The expiry interval for node IP caching. -1 disables the caching</description>
<name>yarn.resourcemanager.node-ip-cache.expiry-interval-secs</name>
<value>-1</value>
</property>
<property>
<description>Number of threads to handle resource tracker calls.</description>
<name>yarn.resourcemanager.resource-tracker.client.thread-count</name>

View File

@ -24,13 +24,18 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Map.Entry;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
@ -46,9 +51,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
@SuppressWarnings("unchecked")
public class NodesListManager extends AbstractService implements
public class NodesListManager extends CompositeService implements
EventHandler<NodesListManagerEvent> {
private static final Log LOG = LogFactory.getLog(NodesListManager.class);
@ -63,6 +70,8 @@ public class NodesListManager extends AbstractService implements
private String includesFile;
private String excludesFile;
private Resolver resolver;
public NodesListManager(RMContext rmContext) {
super(NodesListManager.class.getName());
this.rmContext = rmContext;
@ -73,6 +82,16 @@ public class NodesListManager extends AbstractService implements
this.conf = conf;
int nodeIpCacheTimeout = conf.getInt(
YarnConfiguration.RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS,
YarnConfiguration.DEFAULT_RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS);
if (nodeIpCacheTimeout <= 0) {
resolver = new DirectResolver();
} else {
resolver = new CachedResolver(new SystemClock(), nodeIpCacheTimeout);
addIfService(resolver);
}
// Read the hosts/exclude files to restrict access to the RM
try {
this.includesFile = conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
@ -148,17 +167,129 @@ public class NodesListManager extends AbstractService implements
ClusterMetrics.getMetrics().setDecommisionedNMs(excludeList.size());
}
@VisibleForTesting
public Resolver getResolver() {
return resolver;
}
@VisibleForTesting
public interface Resolver {
// try to resolve hostName to IP address, fallback to hostName if failed
String resolve(String hostName);
}
@VisibleForTesting
public static class DirectResolver implements Resolver {
@Override
public String resolve(String hostName) {
return NetUtils.normalizeHostName(hostName);
}
}
@VisibleForTesting
public static class CachedResolver extends AbstractService
implements Resolver {
private static class CacheEntry {
public String ip;
public long resolveTime;
public CacheEntry(String ip, long resolveTime) {
this.ip = ip;
this.resolveTime = resolveTime;
}
}
private Map<String, CacheEntry> cache =
new ConcurrentHashMap<String, CacheEntry>();
private int expiryIntervalMs;
private int checkIntervalMs;
private final Clock clock;
private Timer checkingTimer;
private TimerTask expireChecker = new ExpireChecker();
public CachedResolver(Clock clock, int expiryIntervalSecs) {
super("NodesListManager.CachedResolver");
this.clock = clock;
this.expiryIntervalMs = expiryIntervalSecs * 1000;
checkIntervalMs = expiryIntervalMs/3;
checkingTimer = new Timer(
"Timer-NodesListManager.CachedResolver.ExpireChecker", true);
}
@Override
protected void serviceStart() throws Exception {
checkingTimer.scheduleAtFixedRate(
expireChecker, checkIntervalMs, checkIntervalMs);
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
checkingTimer.cancel();
super.serviceStop();
}
@VisibleForTesting
public void addToCache(String hostName, String ip) {
cache.put(hostName, new CacheEntry(ip, clock.getTime()));
}
public void removeFromCache(String hostName) {
cache.remove(hostName);
}
private String reload(String hostName) {
String ip = NetUtils.normalizeHostName(hostName);
addToCache(hostName, ip);
return ip;
}
@Override
public String resolve(String hostName) {
CacheEntry e = cache.get(hostName);
if (e != null) {
return e.ip;
}
return reload(hostName);
}
@VisibleForTesting
public TimerTask getExpireChecker() {
return expireChecker;
}
private class ExpireChecker extends TimerTask {
@Override
public void run() {
long currentTime = clock.getTime();
Iterator<Map.Entry<String, CacheEntry>> iterator =
cache.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, CacheEntry> entry = iterator.next();
if (currentTime >
entry.getValue().resolveTime +
CachedResolver.this.expiryIntervalMs) {
iterator.remove();
if (LOG.isDebugEnabled()) {
LOG.debug("[" + entry.getKey() + ":" + entry.getValue().ip +
"] Expired after " +
CachedResolver.this.expiryIntervalMs / 1000 + " secs");
}
}
}
}
}
}
public boolean isValidNode(String hostName) {
String ip = resolver.resolve(hostName);
synchronized (hostsReader) {
Set<String> hostsList = hostsReader.getHosts();
Set<String> excludeList = hostsReader.getExcludedHosts();
String ip = NetUtils.normalizeHostName(hostName);
return (hostsList.isEmpty() || hostsList.contains(hostName) || hostsList
.contains(ip))
&& !(excludeList.contains(hostName) || excludeList.contains(ip));
}
}
/**
* Provides the currently unusable nodes. Copies it into provided collection.
* @param unUsableNodes
@ -207,6 +338,11 @@ public class NodesListManager extends AbstractService implements
default:
LOG.error("Ignoring invalid eventtype " + event.getType());
}
// remove the cache of normalized hostname if enabled
if (resolver instanceof CachedResolver) {
((CachedResolver)resolver).removeFromCache(
eventNode.getNodeID().getHost());
}
}
private void disableHostsFileReader(Exception ex) {

View File

@ -42,6 +42,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@ -130,6 +132,106 @@ public class TestNodesListManager {
}
@Test
public void testCachedResolver() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
ControlledClock clock = new ControlledClock(new SystemClock());
clock.setTime(0);
final int CACHE_EXPIRY_INTERVAL_SECS = 30;
NodesListManager.CachedResolver resolver =
new NodesListManager.CachedResolver(clock, CACHE_EXPIRY_INTERVAL_SECS);
resolver.init(new YarnConfiguration());
resolver.start();
resolver.addToCache("testCachedResolverHost1", "1.1.1.1");
Assert.assertEquals("1.1.1.1",
resolver.resolve("testCachedResolverHost1"));
resolver.addToCache("testCachedResolverHost2", "1.1.1.2");
Assert.assertEquals("1.1.1.1",
resolver.resolve("testCachedResolverHost1"));
Assert.assertEquals("1.1.1.2",
resolver.resolve("testCachedResolverHost2"));
// test removeFromCache
resolver.removeFromCache("testCachedResolverHost1");
Assert.assertNotEquals("1.1.1.1",
resolver.resolve("testCachedResolverHost1"));
Assert.assertEquals("1.1.1.2",
resolver.resolve("testCachedResolverHost2"));
// test expiry
clock.tickMsec(CACHE_EXPIRY_INTERVAL_SECS * 1000 + 1);
resolver.getExpireChecker().run();
Assert.assertNotEquals("1.1.1.1",
resolver.resolve("testCachedResolverHost1"));
Assert.assertNotEquals("1.1.1.2",
resolver.resolve("testCachedResolverHost2"));
}
@Test
public void testDefaultResolver() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
YarnConfiguration conf = new YarnConfiguration();
MockRM rm = new MockRM(conf);
rm.init(conf);
NodesListManager nodesListManager = rm.getNodesListManager();
NodesListManager.Resolver resolver = nodesListManager.getResolver();
Assert.assertTrue("default resolver should be DirectResolver",
resolver instanceof NodesListManager.DirectResolver);
}
@Test
public void testCachedResolverWithEvent() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS, 30);
MockRM rm = new MockRM(conf);
rm.init(conf);
NodesListManager nodesListManager = rm.getNodesListManager();
nodesListManager.init(conf);
nodesListManager.start();
NodesListManager.CachedResolver resolver =
(NodesListManager.CachedResolver)nodesListManager.getResolver();
resolver.addToCache("testCachedResolverHost1", "1.1.1.1");
resolver.addToCache("testCachedResolverHost2", "1.1.1.2");
Assert.assertEquals("1.1.1.1",
resolver.resolve("testCachedResolverHost1"));
Assert.assertEquals("1.1.1.2",
resolver.resolve("testCachedResolverHost2"));
RMNode rmnode1 = MockNodes.newNodeInfo(1, Resource.newInstance(28000, 8),
1, "testCachedResolverHost1", 1234);
RMNode rmnode2 = MockNodes.newNodeInfo(1, Resource.newInstance(28000, 8),
1, "testCachedResolverHost2", 1234);
nodesListManager.handle(
new NodesListManagerEvent(NodesListManagerEventType.NODE_USABLE,
rmnode1));
Assert.assertNotEquals("1.1.1.1",
resolver.resolve("testCachedResolverHost1"));
Assert.assertEquals("1.1.1.2",
resolver.resolve("testCachedResolverHost2"));
nodesListManager.handle(
new NodesListManagerEvent(NodesListManagerEventType.NODE_USABLE,
rmnode2));
Assert.assertNotEquals("1.1.1.1",
resolver.resolve("testCachedResolverHost1"));
Assert.assertNotEquals("1.1.1.2",
resolver.resolve("testCachedResolverHost2"));
}
/*
* Create dispatcher object
*/