From bcc85e3bab78bcacd430eac23141774465b96ef9 Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Fri, 4 Sep 2015 15:13:53 -0700 Subject: [PATCH] YARN-4024. YARN RM should avoid unnecessary resolving IP when NMs doing heartbeat. (Hong Zhiguo via wangda) --- hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop/yarn/conf/YarnConfiguration.java | 5 + .../src/main/resources/yarn-default.xml | 6 + .../resourcemanager/NodesListManager.java | 142 +++++++++++++++++- .../rmapp/TestNodesListManager.java | 102 +++++++++++++ 5 files changed, 255 insertions(+), 3 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 662106bf085..98cc98f3ffe 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -813,6 +813,9 @@ Release 2.8.0 - UNRELEASED YARN-4073. Removed unused ApplicationACLsManager in ContainerManagerImpl constructor. (Naganarasimha G R via rohithsharmaks) + YARN-4024. YARN RM should avoid unnecessary resolving IP when NMs doing heartbeat. + (Hong Zhiguo via wangda) + Release 2.7.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index a18ef7c2a23..5e1bab281f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -746,6 +746,11 @@ public class YarnConfiguration extends Configuration { + "proxy-user-privileges.enabled"; public static final boolean DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED = false; + /** The expiry interval for node IP caching. -1 disables the caching */ + public static final String RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS = RM_PREFIX + + "node-ip-cache.expiry-interval-secs"; + public static final int DEFAULT_RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS = -1; + /** * How many diagnostics/failure messages can be saved in RM for * log aggregation. It also defines the number of diagnostics/failure diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 62ba599f82d..436bfb04e63 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -272,6 +272,12 @@ + + The expiry interval for node IP caching. -1 disables the caching + yarn.resourcemanager.node-ip-cache.expiry-interval-secs + -1 + + Number of threads to handle resource tracker calls. yarn.resourcemanager.resource-tracker.client.thread-count diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index b9c76fbe78b..abea85e908d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -24,13 +24,18 @@ import java.util.Collections; import java.util.HashSet; import java.util.Map.Entry; import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; +import java.util.Map; +import java.util.Iterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; @@ -46,9 +51,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; @SuppressWarnings("unchecked") -public class NodesListManager extends AbstractService implements +public class NodesListManager extends CompositeService implements EventHandler { private static final Log LOG = LogFactory.getLog(NodesListManager.class); @@ -63,6 +70,8 @@ public class NodesListManager extends AbstractService implements private String includesFile; private String excludesFile; + private Resolver resolver; + public NodesListManager(RMContext rmContext) { super(NodesListManager.class.getName()); this.rmContext = rmContext; @@ -73,6 +82,16 @@ public class NodesListManager extends AbstractService implements this.conf = conf; + int nodeIpCacheTimeout = conf.getInt( + YarnConfiguration.RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS, + YarnConfiguration.DEFAULT_RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS); + if (nodeIpCacheTimeout <= 0) { + resolver = new DirectResolver(); + } else { + resolver = new CachedResolver(new SystemClock(), nodeIpCacheTimeout); + addIfService(resolver); + } + // Read the hosts/exclude files to restrict access to the RM try { this.includesFile = conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, @@ -148,17 +167,129 @@ public class NodesListManager extends AbstractService implements ClusterMetrics.getMetrics().setDecommisionedNMs(excludeList.size()); } + @VisibleForTesting + public Resolver getResolver() { + return resolver; + } + + @VisibleForTesting + public interface Resolver { + // try to resolve hostName to IP address, fallback to hostName if failed + String resolve(String hostName); + } + + @VisibleForTesting + public static class DirectResolver implements Resolver { + @Override + public String resolve(String hostName) { + return NetUtils.normalizeHostName(hostName); + } + } + + @VisibleForTesting + public static class CachedResolver extends AbstractService + implements Resolver { + private static class CacheEntry { + public String ip; + public long resolveTime; + public CacheEntry(String ip, long resolveTime) { + this.ip = ip; + this.resolveTime = resolveTime; + } + } + private Map cache = + new ConcurrentHashMap(); + private int expiryIntervalMs; + private int checkIntervalMs; + private final Clock clock; + private Timer checkingTimer; + private TimerTask expireChecker = new ExpireChecker(); + + public CachedResolver(Clock clock, int expiryIntervalSecs) { + super("NodesListManager.CachedResolver"); + this.clock = clock; + this.expiryIntervalMs = expiryIntervalSecs * 1000; + checkIntervalMs = expiryIntervalMs/3; + checkingTimer = new Timer( + "Timer-NodesListManager.CachedResolver.ExpireChecker", true); + } + + @Override + protected void serviceStart() throws Exception { + checkingTimer.scheduleAtFixedRate( + expireChecker, checkIntervalMs, checkIntervalMs); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + checkingTimer.cancel(); + super.serviceStop(); + } + + @VisibleForTesting + public void addToCache(String hostName, String ip) { + cache.put(hostName, new CacheEntry(ip, clock.getTime())); + } + + public void removeFromCache(String hostName) { + cache.remove(hostName); + } + + private String reload(String hostName) { + String ip = NetUtils.normalizeHostName(hostName); + addToCache(hostName, ip); + return ip; + } + + @Override + public String resolve(String hostName) { + CacheEntry e = cache.get(hostName); + if (e != null) { + return e.ip; + } + return reload(hostName); + } + + @VisibleForTesting + public TimerTask getExpireChecker() { + return expireChecker; + } + + private class ExpireChecker extends TimerTask { + @Override + public void run() { + long currentTime = clock.getTime(); + Iterator> iterator = + cache.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (currentTime > + entry.getValue().resolveTime + + CachedResolver.this.expiryIntervalMs) { + iterator.remove(); + if (LOG.isDebugEnabled()) { + LOG.debug("[" + entry.getKey() + ":" + entry.getValue().ip + + "] Expired after " + + CachedResolver.this.expiryIntervalMs / 1000 + " secs"); + } + } + } + } + } + } + public boolean isValidNode(String hostName) { + String ip = resolver.resolve(hostName); synchronized (hostsReader) { Set hostsList = hostsReader.getHosts(); Set excludeList = hostsReader.getExcludedHosts(); - String ip = NetUtils.normalizeHostName(hostName); return (hostsList.isEmpty() || hostsList.contains(hostName) || hostsList .contains(ip)) && !(excludeList.contains(hostName) || excludeList.contains(ip)); } } - + /** * Provides the currently unusable nodes. Copies it into provided collection. * @param unUsableNodes @@ -207,6 +338,11 @@ public class NodesListManager extends AbstractService implements default: LOG.error("Ignoring invalid eventtype " + event.getType()); } + // remove the cache of normalized hostname if enabled + if (resolver instanceof CachedResolver) { + ((CachedResolver)resolver).removeFromCache( + eventNode.getNodeID().getHost()); + } } private void disableHostsFileReader(Exception ex) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java index 5330976480f..2f57dbf9329 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java @@ -42,6 +42,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.util.ControlledClock; +import org.apache.hadoop.yarn.util.SystemClock; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -130,6 +132,106 @@ public class TestNodesListManager { } + @Test + public void testCachedResolver() throws Exception { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + ControlledClock clock = new ControlledClock(new SystemClock()); + clock.setTime(0); + final int CACHE_EXPIRY_INTERVAL_SECS = 30; + NodesListManager.CachedResolver resolver = + new NodesListManager.CachedResolver(clock, CACHE_EXPIRY_INTERVAL_SECS); + resolver.init(new YarnConfiguration()); + resolver.start(); + resolver.addToCache("testCachedResolverHost1", "1.1.1.1"); + Assert.assertEquals("1.1.1.1", + resolver.resolve("testCachedResolverHost1")); + + resolver.addToCache("testCachedResolverHost2", "1.1.1.2"); + Assert.assertEquals("1.1.1.1", + resolver.resolve("testCachedResolverHost1")); + Assert.assertEquals("1.1.1.2", + resolver.resolve("testCachedResolverHost2")); + + // test removeFromCache + resolver.removeFromCache("testCachedResolverHost1"); + Assert.assertNotEquals("1.1.1.1", + resolver.resolve("testCachedResolverHost1")); + Assert.assertEquals("1.1.1.2", + resolver.resolve("testCachedResolverHost2")); + + // test expiry + clock.tickMsec(CACHE_EXPIRY_INTERVAL_SECS * 1000 + 1); + resolver.getExpireChecker().run(); + Assert.assertNotEquals("1.1.1.1", + resolver.resolve("testCachedResolverHost1")); + Assert.assertNotEquals("1.1.1.2", + resolver.resolve("testCachedResolverHost2")); + } + + @Test + public void testDefaultResolver() throws Exception { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + + YarnConfiguration conf = new YarnConfiguration(); + + MockRM rm = new MockRM(conf); + rm.init(conf); + NodesListManager nodesListManager = rm.getNodesListManager(); + + NodesListManager.Resolver resolver = nodesListManager.getResolver(); + Assert.assertTrue("default resolver should be DirectResolver", + resolver instanceof NodesListManager.DirectResolver); + } + + @Test + public void testCachedResolverWithEvent() throws Exception { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS, 30); + + MockRM rm = new MockRM(conf); + rm.init(conf); + NodesListManager nodesListManager = rm.getNodesListManager(); + nodesListManager.init(conf); + nodesListManager.start(); + + NodesListManager.CachedResolver resolver = + (NodesListManager.CachedResolver)nodesListManager.getResolver(); + + resolver.addToCache("testCachedResolverHost1", "1.1.1.1"); + resolver.addToCache("testCachedResolverHost2", "1.1.1.2"); + Assert.assertEquals("1.1.1.1", + resolver.resolve("testCachedResolverHost1")); + Assert.assertEquals("1.1.1.2", + resolver.resolve("testCachedResolverHost2")); + + RMNode rmnode1 = MockNodes.newNodeInfo(1, Resource.newInstance(28000, 8), + 1, "testCachedResolverHost1", 1234); + RMNode rmnode2 = MockNodes.newNodeInfo(1, Resource.newInstance(28000, 8), + 1, "testCachedResolverHost2", 1234); + + nodesListManager.handle( + new NodesListManagerEvent(NodesListManagerEventType.NODE_USABLE, + rmnode1)); + Assert.assertNotEquals("1.1.1.1", + resolver.resolve("testCachedResolverHost1")); + Assert.assertEquals("1.1.1.2", + resolver.resolve("testCachedResolverHost2")); + + nodesListManager.handle( + new NodesListManagerEvent(NodesListManagerEventType.NODE_USABLE, + rmnode2)); + Assert.assertNotEquals("1.1.1.1", + resolver.resolve("testCachedResolverHost1")); + Assert.assertNotEquals("1.1.1.2", + resolver.resolve("testCachedResolverHost2")); + + } + /* * Create dispatcher object */