YARN-2314. Disable ContainerManagementProtocolProxy cache by default to prevent creating thousands of threads in a large cluster. Contributed by Jason Lowe

(cherry picked from commit f44cf99599)
This commit is contained in:
Jian He 2014-10-24 23:05:16 -07:00
parent 96a6e02d16
commit 9bd149978d
4 changed files with 115 additions and 43 deletions

View File

@ -716,6 +716,10 @@ Release 2.6.0 - UNRELEASED
to contact with AM before AM actually receives the ClientToAMTokenMasterKey. to contact with AM before AM actually receives the ClientToAMTokenMasterKey.
(Jason Lowe via jianhe) (Jason Lowe via jianhe)
YARN-2314. Disable ContainerManagementProtocolProxy cache by default to
prevent creating thousands of threads in a large cluster. (Jason Lowe via
jianhe)
Release 2.5.1 - 2014-09-05 Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -70,10 +70,18 @@ public class YarnConfiguration extends Configuration {
public static final int APPLICATION_MAX_TAG_LENGTH = 100; public static final int APPLICATION_MAX_TAG_LENGTH = 100;
static { static {
addDeprecatedKeys();
Configuration.addDefaultResource(YARN_DEFAULT_CONFIGURATION_FILE); Configuration.addDefaultResource(YARN_DEFAULT_CONFIGURATION_FILE);
Configuration.addDefaultResource(YARN_SITE_CONFIGURATION_FILE); Configuration.addDefaultResource(YARN_SITE_CONFIGURATION_FILE);
} }
private static void addDeprecatedKeys() {
Configuration.addDeprecations(new DeprecationDelta[] {
new DeprecationDelta("yarn.client.max-nodemanagers-proxies",
NM_CLIENT_MAX_NM_PROXIES)
});
}
//Configurations //Configurations
public static final String YARN_PREFIX = "yarn."; public static final String YARN_PREFIX = "yarn.";
@ -1446,21 +1454,27 @@ public class YarnConfiguration extends Configuration {
public static final int DEFAULT_NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE = 500; public static final int DEFAULT_NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE = 500;
/** /**
* Maximum number of proxy connections for node manager. It should always be * Maximum number of proxy connections to cache for node managers. If set
* more than 1. NMClient and MRAppMaster will use this to cache connection * to a value greater than zero then the cache is enabled and the NMClient
* with node manager. There will be at max one connection per node manager. * and MRAppMaster will cache the specified number of node manager proxies.
* Ex. configuring it to a value of 5 will make sure that client will at * There will be at max one proxy per node manager. Ex. configuring it to a
* max have 5 connections cached with 5 different node managers. These * value of 5 will make sure that client will at max have 5 proxies cached
* connections will be timed out if idle for more than system wide idle * with 5 different node managers. These connections for these proxies will
* timeout period. The token if used for authentication then it will be used * be timed out if idle for more than the system wide idle timeout period.
* only at connection creation time. If new token is received then earlier * Note that this could cause issues on large clusters as many connections
* connection should be closed in order to use newer token. * could linger simultaneously and lead to a large number of connection
* Note: {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE} * threads. The token used for authentication will be used only at
* are related to each other. * connection creation time. If a new token is received then the earlier
* connection should be closed in order to use the new token. This and
* {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE} are related
* and should be in sync (no need for them to be equal).
* If the value of this property is zero then the connection cache is
* disabled and connections will use a zero idle timeout to prevent too
* many connection threads on large clusters.
*/ */
public static final String NM_CLIENT_MAX_NM_PROXIES = public static final String NM_CLIENT_MAX_NM_PROXIES =
YARN_PREFIX + "client.max-nodemanagers-proxies"; YARN_PREFIX + "client.max-cached-nodemanagers-proxies";
public static final int DEFAULT_NM_CLIENT_MAX_NM_PROXIES = 500; public static final int DEFAULT_NM_CLIENT_MAX_NM_PROXIES = 0;
/** Max time to wait to establish a connection to NM */ /** Max time to wait to establish a connection to NM */
public static final String CLIENT_NM_CONNECT_MAX_WAIT_MS = public static final String CLIENT_NM_CONNECT_MAX_WAIT_MS =

View File

@ -20,14 +20,17 @@ package org.apache.hadoop.yarn.client.api.impl;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@ -53,7 +56,7 @@ public class ContainerManagementProtocolProxy {
static final Log LOG = LogFactory.getLog(ContainerManagementProtocolProxy.class); static final Log LOG = LogFactory.getLog(ContainerManagementProtocolProxy.class);
private final int maxConnectedNMs; private final int maxConnectedNMs;
private final LinkedHashMap<String, ContainerManagementProtocolProxyData> cmProxy; private final Map<String, ContainerManagementProtocolProxyData> cmProxy;
private final Configuration conf; private final Configuration conf;
private final YarnRPC rpc; private final YarnRPC rpc;
private NMTokenCache nmTokenCache; private NMTokenCache nmTokenCache;
@ -70,16 +73,25 @@ public class ContainerManagementProtocolProxy {
maxConnectedNMs = maxConnectedNMs =
conf.getInt(YarnConfiguration.NM_CLIENT_MAX_NM_PROXIES, conf.getInt(YarnConfiguration.NM_CLIENT_MAX_NM_PROXIES,
YarnConfiguration.DEFAULT_NM_CLIENT_MAX_NM_PROXIES); YarnConfiguration.DEFAULT_NM_CLIENT_MAX_NM_PROXIES);
if (maxConnectedNMs < 1) { if (maxConnectedNMs < 0) {
throw new YarnRuntimeException( throw new YarnRuntimeException(
YarnConfiguration.NM_CLIENT_MAX_NM_PROXIES YarnConfiguration.NM_CLIENT_MAX_NM_PROXIES
+ " (" + maxConnectedNMs + ") can not be less than 1."); + " (" + maxConnectedNMs + ") can not be less than 0.");
} }
LOG.info(YarnConfiguration.NM_CLIENT_MAX_NM_PROXIES + " : " LOG.info(YarnConfiguration.NM_CLIENT_MAX_NM_PROXIES + " : "
+ maxConnectedNMs); + maxConnectedNMs);
if (maxConnectedNMs > 0) {
cmProxy = cmProxy =
new LinkedHashMap<String, ContainerManagementProtocolProxyData>(); new LinkedHashMap<String, ContainerManagementProtocolProxyData>();
} else {
cmProxy = Collections.emptyMap();
// Connections are not being cached so ensure connections close quickly
// to avoid creating thousands of RPC client threads on large clusters.
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
0);
}
rpc = YarnRPC.create(conf); rpc = YarnRPC.create(conf);
} }
@ -117,13 +129,9 @@ public class ContainerManagementProtocolProxy {
proxy = proxy =
new ContainerManagementProtocolProxyData(rpc, containerManagerBindAddr, new ContainerManagementProtocolProxyData(rpc, containerManagerBindAddr,
containerId, nmTokenCache.getToken(containerManagerBindAddr)); containerId, nmTokenCache.getToken(containerManagerBindAddr));
if (cmProxy.size() > maxConnectedNMs) { if (maxConnectedNMs > 0) {
// Number of existing proxy exceed the limit. addProxyToCache(containerManagerBindAddr, proxy);
String cmAddr = cmProxy.keySet().iterator().next();
removeProxy(cmProxy.get(cmAddr));
} }
cmProxy.put(containerManagerBindAddr, proxy);
} }
// This is to track active users of this proxy. // This is to track active users of this proxy.
proxy.activeCallers++; proxy.activeCallers++;
@ -132,14 +140,51 @@ public class ContainerManagementProtocolProxy {
return proxy; return proxy;
} }
private void addProxyToCache(String containerManagerBindAddr,
ContainerManagementProtocolProxyData proxy) {
while (cmProxy.size() >= maxConnectedNMs) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cleaning up the proxy cache, size=" + cmProxy.size()
+ " max=" + maxConnectedNMs);
}
boolean removedProxy = false;
for (ContainerManagementProtocolProxyData otherProxy : cmProxy.values()) {
removedProxy = removeProxy(otherProxy);
if (removedProxy) {
break;
}
}
if (!removedProxy) {
// all of the proxies are currently in use and already scheduled
// for removal, so we need to wait until at least one of them closes
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
if (maxConnectedNMs > 0) {
cmProxy.put(containerManagerBindAddr, proxy);
}
}
private void updateLRUCache(String containerManagerBindAddr) { private void updateLRUCache(String containerManagerBindAddr) {
if (maxConnectedNMs > 0) {
ContainerManagementProtocolProxyData proxy = ContainerManagementProtocolProxyData proxy =
cmProxy.remove(containerManagerBindAddr); cmProxy.remove(containerManagerBindAddr);
cmProxy.put(containerManagerBindAddr, proxy); cmProxy.put(containerManagerBindAddr, proxy);
} }
}
public synchronized void mayBeCloseProxy( public synchronized void mayBeCloseProxy(
ContainerManagementProtocolProxyData proxy) { ContainerManagementProtocolProxyData proxy) {
tryCloseProxy(proxy);
}
private boolean tryCloseProxy(
ContainerManagementProtocolProxyData proxy) {
proxy.activeCallers--; proxy.activeCallers--;
if (proxy.scheduledForClose && proxy.activeCallers < 0) { if (proxy.scheduledForClose && proxy.activeCallers < 0) {
LOG.info("Closing proxy : " + proxy.containerManagerBindAddr); LOG.info("Closing proxy : " + proxy.containerManagerBindAddr);
@ -149,15 +194,18 @@ public class ContainerManagementProtocolProxy {
} finally { } finally {
this.notifyAll(); this.notifyAll();
} }
return true;
} }
return false;
} }
private synchronized void removeProxy( private synchronized boolean removeProxy(
ContainerManagementProtocolProxyData proxy) { ContainerManagementProtocolProxyData proxy) {
if (!proxy.scheduledForClose) { if (!proxy.scheduledForClose) {
proxy.scheduledForClose = true; proxy.scheduledForClose = true;
mayBeCloseProxy(proxy); return tryCloseProxy(proxy);
} }
return false;
} }
public synchronized void stopAllProxies() { public synchronized void stopAllProxies() {

View File

@ -1095,20 +1095,26 @@
<property> <property>
<description> <description>
Maximum number of proxy connections for node manager. It should always be Maximum number of proxy connections to cache for node managers. If set
more than 1. NMClient and MRAppMaster will use this to cache connection to a value greater than zero then the cache is enabled and the NMClient
with node manager. There will be at max one connection per node manager. and MRAppMaster will cache the specified number of node manager proxies.
Ex. configuring it to a value of 5 will make sure that client will at There will be at max one proxy per node manager. Ex. configuring it to a
max have 5 connections cached with 5 different node managers. These value of 5 will make sure that client will at max have 5 proxies cached
connections will be timed out if idle for more than system wide idle with 5 different node managers. These connections for these proxies will
timeout period. The token if used for authentication then it will be used be timed out if idle for more than the system wide idle timeout period.
only at connection creation time. If new token is received then earlier Note that this could cause issues on large clusters as many connections
connection should be closed in order to use newer token. This and could linger simultaneously and lead to a large number of connection
threads. The token used for authentication will be used only at
connection creation time. If a new token is received then the earlier
connection should be closed in order to use the new token. This and
(yarn.client.nodemanager-client-async.thread-pool-max-size) are related (yarn.client.nodemanager-client-async.thread-pool-max-size) are related
and should be sync (no need for them to be equal). and should be in sync (no need for them to be equal).
If the value of this property is zero then the connection cache is
disabled and connections will use a zero idle timeout to prevent too
many connection threads on large clusters.
</description> </description>
<name>yarn.client.max-nodemanagers-proxies</name> <name>yarn.client.max-cached-nodemanagers-proxies</name>
<value>500</value> <value>0</value>
</property> </property>
<property> <property>