diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 5b4b3880c11..c0df4e47c13 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -50,6 +50,10 @@ Release 0.23.1 - Unreleased MAPREDUCE-3368. Fixed test compilation. (Hitesh Shah via vinodkv) + MAPREDUCE-3333. Fixed bugs in ContainerLauncher of MR AppMaster due to + which per-container connections to NodeManager were lingering long enough + to hit the ulimits on number of processes. (vinodkv) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java index 12ac363875b..d9f18b3adbf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java @@ -36,5 +36,5 @@ enum EventType { /** * Maximum of 1 minute timeout for a Node to react to the command */ - static final int DEFAULT_NM__COMMAND_TIMEOUT = 60000; + static final int DEFAULT_NM_COMMAND_TIMEOUT = 60000; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index 083ed58d9d3..62ceae99f97 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -21,9 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.security.PrivilegedAction; -import java.util.HashMap; import java.util.HashSet; -import java.util.Map; import java.util.Set; import java.util.Timer; import java.util.TimerTask; @@ -36,6 +34,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.ShuffleHandler; import org.apache.hadoop.mapreduce.MRJobConfig; @@ -59,11 +58,10 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerToken; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.service.AbstractService; +import org.apache.hadoop.yarn.util.Records; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -84,29 +82,28 @@ public class ContainerLauncherImpl extends AbstractService implements private Thread eventHandlingThread; private BlockingQueue eventQueue = new LinkedBlockingQueue(); - private RecordFactory recordFactory; + YarnRPC rpc; // To track numNodes. Set allNodes = new HashSet(); - // have a cache/map of proxies so as to avoid creating multiple RPC - // client connection objects for the same container. - private Map clientCache - = new HashMap(); - public ContainerLauncherImpl(AppContext context) { super(ContainerLauncherImpl.class.getName()); this.context = context; } @Override - public synchronized void init(Configuration conf) { - this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); + public synchronized void init(Configuration config) { + Configuration conf = new Configuration(config); + conf.setInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, + 0); this.limitOnPoolSize = conf.getInt( MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT, MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT); this.nmTimeOut = conf.getInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT, - ContainerLauncher.DEFAULT_NM__COMMAND_TIMEOUT); + ContainerLauncher.DEFAULT_NM_COMMAND_TIMEOUT); + this.rpc = YarnRPC.create(conf); super.init(conf); } @@ -180,37 +177,28 @@ protected ContainerManager getCMProxy(ContainerId containerID, UserGroupInformation user = UserGroupInformation.getCurrentUser(); - synchronized (this.clientCache) { + this.allNodes.add(containerManagerBindAddr); - if (this.clientCache.containsKey(containerID)) { - return this.clientCache.get(containerID); - } - - this.allNodes.add(containerManagerBindAddr); - - if (UserGroupInformation.isSecurityEnabled()) { - Token token = new Token( - containerToken.getIdentifier().array(), containerToken - .getPassword().array(), new Text(containerToken.getKind()), - new Text(containerToken.getService())); - // the user in createRemoteUser in this context has to be ContainerID - user = UserGroupInformation.createRemoteUser(containerID.toString()); - user.addToken(token); - } - - ContainerManager proxy = user - .doAs(new PrivilegedAction() { - @Override - public ContainerManager run() { - YarnRPC rpc = YarnRPC.create(getConfig()); - return (ContainerManager) rpc.getProxy(ContainerManager.class, - NetUtils.createSocketAddr(containerManagerBindAddr), - getConfig()); - } - }); - this.clientCache.put(containerID, proxy); - return proxy; + if (UserGroupInformation.isSecurityEnabled()) { + Token token = new Token( + containerToken.getIdentifier().array(), containerToken + .getPassword().array(), new Text(containerToken.getKind()), + new Text(containerToken.getService())); + // the user in createRemoteUser in this context has to be ContainerID + user = UserGroupInformation.createRemoteUser(containerID.toString()); + user.addToken(token); } + + ContainerManager proxy = user + .doAs(new PrivilegedAction() { + @Override + public ContainerManager run() { + return (ContainerManager) rpc.getProxy(ContainerManager.class, + NetUtils.createSocketAddr(containerManagerBindAddr), + getConfig()); + } + }); + return proxy; } private static class CommandTimer extends TimerTask { @@ -257,6 +245,8 @@ public void run() { Timer timer = new Timer(true); + ContainerManager proxy = null; + switch(event.getType()) { case CONTAINER_REMOTE_LAUNCH: @@ -267,8 +257,8 @@ public void run() { timer.schedule(new CommandTimer(Thread.currentThread(), event), nmTimeOut); - ContainerManager proxy = getCMProxy(containerID, - containerManagerBindAddr, containerToken); + proxy = getCMProxy(containerID, containerManagerBindAddr, + containerToken); // Interruped during getProxy, but that didn't throw exception if (Thread.currentThread().isInterrupted()) { @@ -284,8 +274,8 @@ public void run() { launchEvent.getContainer(); // Now launch the actual container - StartContainerRequest startRequest = recordFactory - .newRecordInstance(StartContainerRequest.class); + StartContainerRequest startRequest = Records + .newRecord(StartContainerRequest.class); startRequest.setContainerLaunchContext(containerLaunchContext); StartContainerResponse response = proxy.startContainer(startRequest); @@ -324,6 +314,7 @@ public void run() { sendContainerLaunchFailedMsg(taskAttemptID, message); } finally { timer.cancel(); + ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig()); } break; @@ -343,8 +334,8 @@ public void run() { timer.schedule(new CommandTimer(Thread.currentThread(), event), nmTimeOut); - ContainerManager proxy = getCMProxy(containerID, - containerManagerBindAddr, containerToken); + proxy = getCMProxy(containerID, containerManagerBindAddr, + containerToken); if (Thread.currentThread().isInterrupted()) { // The timer cancelled the command in the mean while. No need to @@ -356,8 +347,8 @@ public void run() { // TODO:check whether container is launched // kill the remote container if already launched - StopContainerRequest stopRequest = recordFactory - .newRecordInstance(StopContainerRequest.class); + StopContainerRequest stopRequest = Records + .newRecord(StopContainerRequest.class); stopRequest.setContainerId(event.getContainerID()); proxy.stopContainer(stopRequest); } @@ -373,6 +364,7 @@ public void run() { LOG.warn(message); } finally { timer.cancel(); + ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig()); } // after killing, send killed event to taskattempt diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagerPBClientImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagerPBClientImpl.java index 07f8c23d7a0..34ad56073e4 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagerPBClientImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagerPBClientImpl.java @@ -56,6 +56,12 @@ public ContainerManagerPBClientImpl(long clientVersion, InetSocketAddress addr, ContainerManagerService.BlockingInterface.class, clientVersion, addr, conf); } + public void close() { + if (this.proxy != null) { + RPC.stopProxy(this.proxy); + } + } + @Override public GetContainerStatusResponse getContainerStatus( GetContainerStatusRequest request) throws YarnRemoteException { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/RpcClientFactory.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/RpcClientFactory.java index 49aac3f3c7a..191091f9688 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/RpcClientFactory.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/RpcClientFactory.java @@ -25,6 +25,9 @@ public interface RpcClientFactory { - public Object getClient(Class protocol, long clientVersion, InetSocketAddress addr, Configuration conf) throws YarnException; + public Object getClient(Class protocol, long clientVersion, + InetSocketAddress addr, Configuration conf) throws YarnException; + + public void stopClient(Object proxy); } \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcClientFactoryPBImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcClientFactoryPBImpl.java index e6567ce3188..1cb5fa0acdf 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcClientFactoryPBImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcClientFactoryPBImpl.java @@ -20,16 +20,22 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.factories.RpcClientFactory; public class RpcClientFactoryPBImpl implements RpcClientFactory { + private static final Log LOG = LogFactory + .getLog(RpcClientFactoryPBImpl.class); + private static final String PB_IMPL_PACKAGE_SUFFIX = "impl.pb.client"; private static final String PB_IMPL_CLASS_SUFFIX = "PBClientImpl"; @@ -74,9 +80,21 @@ public Object getClient(Class protocol, long clientVersion, InetSocketAddress throw new YarnException(e); } } - - - + + @Override + public void stopClient(Object proxy) { + try { + Method closeMethod = proxy.getClass().getMethod("close"); + closeMethod.invoke(proxy); + } catch (InvocationTargetException e) { + throw new YarnException(e); + } catch (Exception e) { + LOG.error("Cannot call close method due to Exception. " + + "Ignoring.", e); + throw new YarnException(e); + } + } + private String getPBImplClassName(Class clazz) { String srcPackagePart = getPackageName(clazz); String srcClassName = getClassName(clazz); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/HadoopYarnProtoRPC.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/HadoopYarnProtoRPC.java index b40ecad235b..ba1dc2ff6bf 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/HadoopYarnProtoRPC.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/HadoopYarnProtoRPC.java @@ -20,15 +20,12 @@ import java.net.InetSocketAddress; -import org.apache.hadoop.ipc.Server; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factory.providers.RpcFactoryProvider; /** @@ -50,6 +47,11 @@ public Object getProxy(Class protocol, InetSocketAddress addr, addr, conf); } + @Override + public void stopProxy(Object proxy, Configuration conf) { + RpcFactoryProvider.getClientFactory(conf).stopClient(proxy); + } + @Override public Server getServer(Class protocol, Object instance, InetSocketAddress addr, Configuration conf, diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/HadoopYarnRPC.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/HadoopYarnRPC.java index 3a5146ae1f0..838693a8f46 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/HadoopYarnRPC.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/HadoopYarnRPC.java @@ -21,12 +21,12 @@ import java.io.IOException; import java.net.InetSocketAddress; -import org.apache.hadoop.ipc.Server; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.AvroSpecificRpcEngine; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.YarnException; @@ -54,6 +54,11 @@ public Object getProxy(Class protocol, InetSocketAddress addr, } } + @Override + public void stopProxy(Object proxy, Configuration conf) { + RPC.stopProxy(proxy); + } + @Override public Server getServer(Class protocol, Object instance, InetSocketAddress addr, Configuration conf, diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java index c94a31871ad..936899fa567 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java @@ -164,6 +164,7 @@ public Object invoke(Object proxy, Method method, Object[] args) return actualReturnMessage; } + @Override public void close() throws IOException { if (!isClosed) { isClosed = true; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/YarnRPC.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/YarnRPC.java index c6de9ae1905..e4e61ddfaa7 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/YarnRPC.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/YarnRPC.java @@ -20,10 +20,10 @@ import java.net.InetSocketAddress; -import org.apache.hadoop.ipc.Server; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.YarnException; @@ -38,6 +38,8 @@ public abstract class YarnRPC { public abstract Object getProxy(Class protocol, InetSocketAddress addr, Configuration conf); + public abstract void stopProxy(Object proxy, Configuration conf); + public abstract Server getServer(Class protocol, Object instance, InetSocketAddress addr, Configuration conf, SecretManager secretManager,