MAPREDUCE-3333. Fixed bugs in ContainerLauncher of MR AppMaster due to which per-container connections to NodeManager were lingering long enough to hit the ulimits on number of processes. (vinodkv)

svn merge -c r1199751 --ignore-ancestry ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1199757 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-11-09 13:45:02 +00:00
parent e430e1a921
commit cfbea52bd9
10 changed files with 94 additions and 61 deletions

View File

@ -50,6 +50,10 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3368. Fixed test compilation. (Hitesh Shah via vinodkv) MAPREDUCE-3368. Fixed test compilation. (Hitesh Shah via vinodkv)
MAPREDUCE-3333. Fixed bugs in ContainerLauncher of MR AppMaster due to
which per-container connections to NodeManager were lingering long enough
to hit the ulimits on number of processes. (vinodkv)
Release 0.23.0 - 2011-11-01 Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -36,5 +36,5 @@ public interface ContainerLauncher
/** /**
* Maximum of 1 minute timeout for a Node to react to the command * Maximum of 1 minute timeout for a Node to react to the command
*/ */
static final int DEFAULT_NM__COMMAND_TIMEOUT = 60000; static final int DEFAULT_NM_COMMAND_TIMEOUT = 60000;
} }

View File

@ -21,9 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app.launcher;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
@ -36,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ShuffleHandler; import org.apache.hadoop.mapred.ShuffleHandler;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
@ -59,11 +58,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerToken; import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.Records;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -84,29 +82,28 @@ public class ContainerLauncherImpl extends AbstractService implements
private Thread eventHandlingThread; private Thread eventHandlingThread;
private BlockingQueue<ContainerLauncherEvent> eventQueue = private BlockingQueue<ContainerLauncherEvent> eventQueue =
new LinkedBlockingQueue<ContainerLauncherEvent>(); new LinkedBlockingQueue<ContainerLauncherEvent>();
private RecordFactory recordFactory; YarnRPC rpc;
// To track numNodes. // To track numNodes.
Set<String> allNodes = new HashSet<String>(); Set<String> allNodes = new HashSet<String>();
// have a cache/map of proxies so as to avoid creating multiple RPC
// client connection objects for the same container.
private Map<ContainerId, ContainerManager> clientCache
= new HashMap<ContainerId, ContainerManager>();
public ContainerLauncherImpl(AppContext context) { public ContainerLauncherImpl(AppContext context) {
super(ContainerLauncherImpl.class.getName()); super(ContainerLauncherImpl.class.getName());
this.context = context; this.context = context;
} }
@Override @Override
public synchronized void init(Configuration conf) { public synchronized void init(Configuration config) {
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); Configuration conf = new Configuration(config);
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
0);
this.limitOnPoolSize = conf.getInt( this.limitOnPoolSize = conf.getInt(
MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT, MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT,
MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT); MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT);
this.nmTimeOut = conf.getInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT, this.nmTimeOut = conf.getInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT,
ContainerLauncher.DEFAULT_NM__COMMAND_TIMEOUT); ContainerLauncher.DEFAULT_NM_COMMAND_TIMEOUT);
this.rpc = YarnRPC.create(conf);
super.init(conf); super.init(conf);
} }
@ -180,37 +177,28 @@ public class ContainerLauncherImpl extends AbstractService implements
UserGroupInformation user = UserGroupInformation.getCurrentUser(); UserGroupInformation user = UserGroupInformation.getCurrentUser();
synchronized (this.clientCache) { this.allNodes.add(containerManagerBindAddr);
if (this.clientCache.containsKey(containerID)) { if (UserGroupInformation.isSecurityEnabled()) {
return this.clientCache.get(containerID); Token<ContainerTokenIdentifier> token = new Token<ContainerTokenIdentifier>(
} containerToken.getIdentifier().array(), containerToken
.getPassword().array(), new Text(containerToken.getKind()),
this.allNodes.add(containerManagerBindAddr); new Text(containerToken.getService()));
// the user in createRemoteUser in this context has to be ContainerID
if (UserGroupInformation.isSecurityEnabled()) { user = UserGroupInformation.createRemoteUser(containerID.toString());
Token<ContainerTokenIdentifier> token = new Token<ContainerTokenIdentifier>( user.addToken(token);
containerToken.getIdentifier().array(), containerToken
.getPassword().array(), new Text(containerToken.getKind()),
new Text(containerToken.getService()));
// the user in createRemoteUser in this context has to be ContainerID
user = UserGroupInformation.createRemoteUser(containerID.toString());
user.addToken(token);
}
ContainerManager proxy = user
.doAs(new PrivilegedAction<ContainerManager>() {
@Override
public ContainerManager run() {
YarnRPC rpc = YarnRPC.create(getConfig());
return (ContainerManager) rpc.getProxy(ContainerManager.class,
NetUtils.createSocketAddr(containerManagerBindAddr),
getConfig());
}
});
this.clientCache.put(containerID, proxy);
return proxy;
} }
ContainerManager proxy = user
.doAs(new PrivilegedAction<ContainerManager>() {
@Override
public ContainerManager run() {
return (ContainerManager) rpc.getProxy(ContainerManager.class,
NetUtils.createSocketAddr(containerManagerBindAddr),
getConfig());
}
});
return proxy;
} }
private static class CommandTimer extends TimerTask { private static class CommandTimer extends TimerTask {
@ -257,6 +245,8 @@ public class ContainerLauncherImpl extends AbstractService implements
Timer timer = new Timer(true); Timer timer = new Timer(true);
ContainerManager proxy = null;
switch(event.getType()) { switch(event.getType()) {
case CONTAINER_REMOTE_LAUNCH: case CONTAINER_REMOTE_LAUNCH:
@ -267,8 +257,8 @@ public class ContainerLauncherImpl extends AbstractService implements
timer.schedule(new CommandTimer(Thread.currentThread(), event), timer.schedule(new CommandTimer(Thread.currentThread(), event),
nmTimeOut); nmTimeOut);
ContainerManager proxy = getCMProxy(containerID, proxy = getCMProxy(containerID, containerManagerBindAddr,
containerManagerBindAddr, containerToken); containerToken);
// Interruped during getProxy, but that didn't throw exception // Interruped during getProxy, but that didn't throw exception
if (Thread.currentThread().isInterrupted()) { if (Thread.currentThread().isInterrupted()) {
@ -284,8 +274,8 @@ public class ContainerLauncherImpl extends AbstractService implements
launchEvent.getContainer(); launchEvent.getContainer();
// Now launch the actual container // Now launch the actual container
StartContainerRequest startRequest = recordFactory StartContainerRequest startRequest = Records
.newRecordInstance(StartContainerRequest.class); .newRecord(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext); startRequest.setContainerLaunchContext(containerLaunchContext);
StartContainerResponse response = proxy.startContainer(startRequest); StartContainerResponse response = proxy.startContainer(startRequest);
@ -324,6 +314,7 @@ public class ContainerLauncherImpl extends AbstractService implements
sendContainerLaunchFailedMsg(taskAttemptID, message); sendContainerLaunchFailedMsg(taskAttemptID, message);
} finally { } finally {
timer.cancel(); timer.cancel();
ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
} }
break; break;
@ -343,8 +334,8 @@ public class ContainerLauncherImpl extends AbstractService implements
timer.schedule(new CommandTimer(Thread.currentThread(), event), timer.schedule(new CommandTimer(Thread.currentThread(), event),
nmTimeOut); nmTimeOut);
ContainerManager proxy = getCMProxy(containerID, proxy = getCMProxy(containerID, containerManagerBindAddr,
containerManagerBindAddr, containerToken); containerToken);
if (Thread.currentThread().isInterrupted()) { if (Thread.currentThread().isInterrupted()) {
// The timer cancelled the command in the mean while. No need to // The timer cancelled the command in the mean while. No need to
@ -356,8 +347,8 @@ public class ContainerLauncherImpl extends AbstractService implements
// TODO:check whether container is launched // TODO:check whether container is launched
// kill the remote container if already launched // kill the remote container if already launched
StopContainerRequest stopRequest = recordFactory StopContainerRequest stopRequest = Records
.newRecordInstance(StopContainerRequest.class); .newRecord(StopContainerRequest.class);
stopRequest.setContainerId(event.getContainerID()); stopRequest.setContainerId(event.getContainerID());
proxy.stopContainer(stopRequest); proxy.stopContainer(stopRequest);
} }
@ -373,6 +364,7 @@ public class ContainerLauncherImpl extends AbstractService implements
LOG.warn(message); LOG.warn(message);
} finally { } finally {
timer.cancel(); timer.cancel();
ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
} }
// after killing, send killed event to taskattempt // after killing, send killed event to taskattempt

View File

@ -56,6 +56,12 @@ public class ContainerManagerPBClientImpl implements ContainerManager {
ContainerManagerService.BlockingInterface.class, clientVersion, addr, conf); ContainerManagerService.BlockingInterface.class, clientVersion, addr, conf);
} }
public void close() {
if (this.proxy != null) {
RPC.stopProxy(this.proxy);
}
}
@Override @Override
public GetContainerStatusResponse getContainerStatus( public GetContainerStatusResponse getContainerStatus(
GetContainerStatusRequest request) throws YarnRemoteException { GetContainerStatusRequest request) throws YarnRemoteException {

View File

@ -25,6 +25,9 @@ import org.apache.hadoop.yarn.YarnException;
public interface RpcClientFactory { public interface RpcClientFactory {
public Object getClient(Class<?> protocol, long clientVersion, InetSocketAddress addr, Configuration conf) throws YarnException; public Object getClient(Class<?> protocol, long clientVersion,
InetSocketAddress addr, Configuration conf) throws YarnException;
public void stopClient(Object proxy);
} }

View File

@ -20,16 +20,22 @@ package org.apache.hadoop.yarn.factories.impl.pb;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.factories.RpcClientFactory; import org.apache.hadoop.yarn.factories.RpcClientFactory;
public class RpcClientFactoryPBImpl implements RpcClientFactory { public class RpcClientFactoryPBImpl implements RpcClientFactory {
private static final Log LOG = LogFactory
.getLog(RpcClientFactoryPBImpl.class);
private static final String PB_IMPL_PACKAGE_SUFFIX = "impl.pb.client"; private static final String PB_IMPL_PACKAGE_SUFFIX = "impl.pb.client";
private static final String PB_IMPL_CLASS_SUFFIX = "PBClientImpl"; private static final String PB_IMPL_CLASS_SUFFIX = "PBClientImpl";
@ -75,7 +81,19 @@ public class RpcClientFactoryPBImpl implements RpcClientFactory {
} }
} }
@Override
public void stopClient(Object proxy) {
try {
Method closeMethod = proxy.getClass().getMethod("close");
closeMethod.invoke(proxy);
} catch (InvocationTargetException e) {
throw new YarnException(e);
} catch (Exception e) {
LOG.error("Cannot call close method due to Exception. "
+ "Ignoring.", e);
throw new YarnException(e);
}
}
private String getPBImplClassName(Class<?> clazz) { private String getPBImplClassName(Class<?> clazz) {
String srcPackagePart = getPackageName(clazz); String srcPackagePart = getPackageName(clazz);

View File

@ -20,15 +20,12 @@ package org.apache.hadoop.yarn.ipc;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.apache.hadoop.ipc.Server;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factory.providers.RpcFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RpcFactoryProvider;
/** /**
@ -50,6 +47,11 @@ public class HadoopYarnProtoRPC extends YarnRPC {
addr, conf); addr, conf);
} }
@Override
public void stopProxy(Object proxy, Configuration conf) {
RpcFactoryProvider.getClientFactory(conf).stopClient(proxy);
}
@Override @Override
public Server getServer(Class protocol, Object instance, public Server getServer(Class protocol, Object instance,
InetSocketAddress addr, Configuration conf, InetSocketAddress addr, Configuration conf,

View File

@ -21,12 +21,12 @@ package org.apache.hadoop.yarn.ipc;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.apache.hadoop.ipc.Server;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.AvroSpecificRpcEngine; import org.apache.hadoop.ipc.AvroSpecificRpcEngine;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
@ -54,6 +54,11 @@ public class HadoopYarnRPC extends YarnRPC {
} }
} }
@Override
public void stopProxy(Object proxy, Configuration conf) {
RPC.stopProxy(proxy);
}
@Override @Override
public Server getServer(Class protocol, Object instance, public Server getServer(Class protocol, Object instance,
InetSocketAddress addr, Configuration conf, InetSocketAddress addr, Configuration conf,

View File

@ -164,6 +164,7 @@ public class ProtoOverHadoopRpcEngine implements RpcEngine {
return actualReturnMessage; return actualReturnMessage;
} }
@Override
public void close() throws IOException { public void close() throws IOException {
if (!isClosed) { if (!isClosed) {
isClosed = true; isClosed = true;

View File

@ -20,10 +20,10 @@ package org.apache.hadoop.yarn.ipc;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.apache.hadoop.ipc.Server;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
@ -38,6 +38,8 @@ public abstract class YarnRPC {
public abstract Object getProxy(Class protocol, InetSocketAddress addr, public abstract Object getProxy(Class protocol, InetSocketAddress addr,
Configuration conf); Configuration conf);
public abstract void stopProxy(Object proxy, Configuration conf);
public abstract Server getServer(Class protocol, Object instance, public abstract Server getServer(Class protocol, Object instance,
InetSocketAddress addr, Configuration conf, InetSocketAddress addr, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager, SecretManager<? extends TokenIdentifier> secretManager,