From cfbea52bd9d7fa0d4708c9fb8aa981f0ffb4c1f5 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Wed, 9 Nov 2011 13:45:02 +0000 Subject: [PATCH] MAPREDUCE-3333. Fixed bugs in ContainerLauncher of MR AppMaster due to which per-container connections to NodeManager were lingering long enough to hit the ulimits on number of processes. (vinodkv) svn merge -c r1199751 --ignore-ancestry ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1199757 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 4 + .../v2/app/launcher/ContainerLauncher.java | 2 +- .../app/launcher/ContainerLauncherImpl.java | 92 +++++++++---------- .../client/ContainerManagerPBClientImpl.java | 6 ++ .../yarn/factories/RpcClientFactory.java | 5 +- .../impl/pb/RpcClientFactoryPBImpl.java | 24 ++++- .../hadoop/yarn/ipc/HadoopYarnProtoRPC.java | 10 +- .../apache/hadoop/yarn/ipc/HadoopYarnRPC.java | 7 +- .../yarn/ipc/ProtoOverHadoopRpcEngine.java | 1 + .../org/apache/hadoop/yarn/ipc/YarnRPC.java | 4 +- 10 files changed, 94 insertions(+), 61 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 5b4b3880c11..c0df4e47c13 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -50,6 +50,10 @@ Release 0.23.1 - Unreleased MAPREDUCE-3368. Fixed test compilation. (Hitesh Shah via vinodkv) + MAPREDUCE-3333. Fixed bugs in ContainerLauncher of MR AppMaster due to + which per-container connections to NodeManager were lingering long enough + to hit the ulimits on number of processes. (vinodkv) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java index 12ac363875b..d9f18b3adbf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java @@ -36,5 +36,5 @@ public interface ContainerLauncher /** * Maximum of 1 minute timeout for a Node to react to the command */ - static final int DEFAULT_NM__COMMAND_TIMEOUT = 60000; + static final int DEFAULT_NM_COMMAND_TIMEOUT = 60000; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index 083ed58d9d3..62ceae99f97 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -21,9 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app.launcher; import java.io.IOException; import java.nio.ByteBuffer; import java.security.PrivilegedAction; -import java.util.HashMap; import java.util.HashSet; -import java.util.Map; import java.util.Set; import java.util.Timer; import java.util.TimerTask; @@ -36,6 +34,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.ShuffleHandler; import org.apache.hadoop.mapreduce.MRJobConfig; @@ -59,11 +58,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerToken; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.service.AbstractService; +import org.apache.hadoop.yarn.util.Records; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -84,29 +82,28 @@ public class ContainerLauncherImpl extends AbstractService implements private Thread eventHandlingThread; private BlockingQueue eventQueue = new LinkedBlockingQueue(); - private RecordFactory recordFactory; + YarnRPC rpc; // To track numNodes. Set allNodes = new HashSet(); - // have a cache/map of proxies so as to avoid creating multiple RPC - // client connection objects for the same container. - private Map clientCache - = new HashMap(); - public ContainerLauncherImpl(AppContext context) { super(ContainerLauncherImpl.class.getName()); this.context = context; } @Override - public synchronized void init(Configuration conf) { - this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); + public synchronized void init(Configuration config) { + Configuration conf = new Configuration(config); + conf.setInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, + 0); this.limitOnPoolSize = conf.getInt( MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT, MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT); this.nmTimeOut = conf.getInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT, - ContainerLauncher.DEFAULT_NM__COMMAND_TIMEOUT); + ContainerLauncher.DEFAULT_NM_COMMAND_TIMEOUT); + this.rpc = YarnRPC.create(conf); super.init(conf); } @@ -180,37 +177,28 @@ public class ContainerLauncherImpl extends AbstractService implements UserGroupInformation user = UserGroupInformation.getCurrentUser(); - synchronized (this.clientCache) { + this.allNodes.add(containerManagerBindAddr); - if (this.clientCache.containsKey(containerID)) { - return this.clientCache.get(containerID); - } - - this.allNodes.add(containerManagerBindAddr); - - if (UserGroupInformation.isSecurityEnabled()) { - Token token = new Token( - containerToken.getIdentifier().array(), containerToken - .getPassword().array(), new Text(containerToken.getKind()), - new Text(containerToken.getService())); - // the user in createRemoteUser in this context has to be ContainerID - user = UserGroupInformation.createRemoteUser(containerID.toString()); - user.addToken(token); - } - - ContainerManager proxy = user - .doAs(new PrivilegedAction() { - @Override - public ContainerManager run() { - YarnRPC rpc = YarnRPC.create(getConfig()); - return (ContainerManager) rpc.getProxy(ContainerManager.class, - NetUtils.createSocketAddr(containerManagerBindAddr), - getConfig()); - } - }); - this.clientCache.put(containerID, proxy); - return proxy; + if (UserGroupInformation.isSecurityEnabled()) { + Token token = new Token( + containerToken.getIdentifier().array(), containerToken + .getPassword().array(), new Text(containerToken.getKind()), + new Text(containerToken.getService())); + // the user in createRemoteUser in this context has to be ContainerID + user = UserGroupInformation.createRemoteUser(containerID.toString()); + user.addToken(token); } + + ContainerManager proxy = user + .doAs(new PrivilegedAction() { + @Override + public ContainerManager run() { + return (ContainerManager) rpc.getProxy(ContainerManager.class, + NetUtils.createSocketAddr(containerManagerBindAddr), + getConfig()); + } + }); + return proxy; } private static class CommandTimer extends TimerTask { @@ -257,6 +245,8 @@ public class ContainerLauncherImpl extends AbstractService implements Timer timer = new Timer(true); + ContainerManager proxy = null; + switch(event.getType()) { case CONTAINER_REMOTE_LAUNCH: @@ -267,8 +257,8 @@ public class ContainerLauncherImpl extends AbstractService implements timer.schedule(new CommandTimer(Thread.currentThread(), event), nmTimeOut); - ContainerManager proxy = getCMProxy(containerID, - containerManagerBindAddr, containerToken); + proxy = getCMProxy(containerID, containerManagerBindAddr, + containerToken); // Interruped during getProxy, but that didn't throw exception if (Thread.currentThread().isInterrupted()) { @@ -284,8 +274,8 @@ public class ContainerLauncherImpl extends AbstractService implements launchEvent.getContainer(); // Now launch the actual container - StartContainerRequest startRequest = recordFactory - .newRecordInstance(StartContainerRequest.class); + StartContainerRequest startRequest = Records + .newRecord(StartContainerRequest.class); startRequest.setContainerLaunchContext(containerLaunchContext); StartContainerResponse response = proxy.startContainer(startRequest); @@ -324,6 +314,7 @@ public class ContainerLauncherImpl extends AbstractService implements sendContainerLaunchFailedMsg(taskAttemptID, message); } finally { timer.cancel(); + ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig()); } break; @@ -343,8 +334,8 @@ public class ContainerLauncherImpl extends AbstractService implements timer.schedule(new CommandTimer(Thread.currentThread(), event), nmTimeOut); - ContainerManager proxy = getCMProxy(containerID, - containerManagerBindAddr, containerToken); + proxy = getCMProxy(containerID, containerManagerBindAddr, + containerToken); if (Thread.currentThread().isInterrupted()) { // The timer cancelled the command in the mean while. No need to @@ -356,8 +347,8 @@ public class ContainerLauncherImpl extends AbstractService implements // TODO:check whether container is launched // kill the remote container if already launched - StopContainerRequest stopRequest = recordFactory - .newRecordInstance(StopContainerRequest.class); + StopContainerRequest stopRequest = Records + .newRecord(StopContainerRequest.class); stopRequest.setContainerId(event.getContainerID()); proxy.stopContainer(stopRequest); } @@ -373,6 +364,7 @@ public class ContainerLauncherImpl extends AbstractService implements LOG.warn(message); } finally { timer.cancel(); + ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig()); } // after killing, send killed event to taskattempt diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagerPBClientImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagerPBClientImpl.java index 07f8c23d7a0..34ad56073e4 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagerPBClientImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagerPBClientImpl.java @@ -56,6 +56,12 @@ public class ContainerManagerPBClientImpl implements ContainerManager { ContainerManagerService.BlockingInterface.class, clientVersion, addr, conf); } + public void close() { + if (this.proxy != null) { + RPC.stopProxy(this.proxy); + } + } + @Override public GetContainerStatusResponse getContainerStatus( GetContainerStatusRequest request) throws YarnRemoteException { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/RpcClientFactory.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/RpcClientFactory.java index 49aac3f3c7a..191091f9688 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/RpcClientFactory.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/RpcClientFactory.java @@ -25,6 +25,9 @@ import org.apache.hadoop.yarn.YarnException; public interface RpcClientFactory { - public Object getClient(Class protocol, long clientVersion, InetSocketAddress addr, Configuration conf) throws YarnException; + public Object getClient(Class protocol, long clientVersion, + InetSocketAddress addr, Configuration conf) throws YarnException; + + public void stopClient(Object proxy); } \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcClientFactoryPBImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcClientFactoryPBImpl.java index e6567ce3188..1cb5fa0acdf 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcClientFactoryPBImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcClientFactoryPBImpl.java @@ -20,16 +20,22 @@ package org.apache.hadoop.yarn.factories.impl.pb; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.factories.RpcClientFactory; public class RpcClientFactoryPBImpl implements RpcClientFactory { + private static final Log LOG = LogFactory + .getLog(RpcClientFactoryPBImpl.class); + private static final String PB_IMPL_PACKAGE_SUFFIX = "impl.pb.client"; private static final String PB_IMPL_CLASS_SUFFIX = "PBClientImpl"; @@ -74,9 +80,21 @@ public class RpcClientFactoryPBImpl implements RpcClientFactory { throw new YarnException(e); } } - - - + + @Override + public void stopClient(Object proxy) { + try { + Method closeMethod = proxy.getClass().getMethod("close"); + closeMethod.invoke(proxy); + } catch (InvocationTargetException e) { + throw new YarnException(e); + } catch (Exception e) { + LOG.error("Cannot call close method due to Exception. " + + "Ignoring.", e); + throw new YarnException(e); + } + } + private String getPBImplClassName(Class clazz) { String srcPackagePart = getPackageName(clazz); String srcClassName = getClassName(clazz); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/HadoopYarnProtoRPC.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/HadoopYarnProtoRPC.java index b40ecad235b..ba1dc2ff6bf 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/HadoopYarnProtoRPC.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/HadoopYarnProtoRPC.java @@ -20,15 +20,12 @@ package org.apache.hadoop.yarn.ipc; import java.net.InetSocketAddress; -import org.apache.hadoop.ipc.Server; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factory.providers.RpcFactoryProvider; /** @@ -50,6 +47,11 @@ public class HadoopYarnProtoRPC extends YarnRPC { addr, conf); } + @Override + public void stopProxy(Object proxy, Configuration conf) { + RpcFactoryProvider.getClientFactory(conf).stopClient(proxy); + } + @Override public Server getServer(Class protocol, Object instance, InetSocketAddress addr, Configuration conf, diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/HadoopYarnRPC.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/HadoopYarnRPC.java index 3a5146ae1f0..838693a8f46 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/HadoopYarnRPC.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/HadoopYarnRPC.java @@ -21,12 +21,12 @@ package org.apache.hadoop.yarn.ipc; import java.io.IOException; import java.net.InetSocketAddress; -import org.apache.hadoop.ipc.Server; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.AvroSpecificRpcEngine; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.YarnException; @@ -54,6 +54,11 @@ public class HadoopYarnRPC extends YarnRPC { } } + @Override + public void stopProxy(Object proxy, Configuration conf) { + RPC.stopProxy(proxy); + } + @Override public Server getServer(Class protocol, Object instance, InetSocketAddress addr, Configuration conf, diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java index c94a31871ad..936899fa567 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java @@ -164,6 +164,7 @@ public class ProtoOverHadoopRpcEngine implements RpcEngine { return actualReturnMessage; } + @Override public void close() throws IOException { if (!isClosed) { isClosed = true; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/YarnRPC.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/YarnRPC.java index c6de9ae1905..e4e61ddfaa7 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/YarnRPC.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/YarnRPC.java @@ -20,10 +20,10 @@ package org.apache.hadoop.yarn.ipc; import java.net.InetSocketAddress; -import org.apache.hadoop.ipc.Server; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.YarnException; @@ -38,6 +38,8 @@ public abstract class YarnRPC { public abstract Object getProxy(Class protocol, InetSocketAddress addr, Configuration conf); + public abstract void stopProxy(Object proxy, Configuration conf); + public abstract Server getServer(Class protocol, Object instance, InetSocketAddress addr, Configuration conf, SecretManager secretManager,