MAPREDUCE-3333. Fixed bugs in ContainerLauncher of MR AppMaster due to which per-container connections to NodeManager were lingering long enough to hit the ulimits on number of processes. (vinodkv)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1199751 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-11-09 13:41:19 +00:00
parent 3853d112ea
commit 2df83e5b4d
10 changed files with 94 additions and 61 deletions

View File

@ -98,6 +98,10 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3345. Fixed a race condition in ResourceManager that was causing
TestContainerManagerSecurity to fail sometimes. (Hitesh Shah via vinodkv)
MAPREDUCE-3333. Fixed bugs in ContainerLauncher of MR AppMaster due to
which per-container connections to NodeManager were lingering long enough
to hit the ulimits on number of processes. (vinodkv)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -36,5 +36,5 @@ public interface ContainerLauncher
/**
* Maximum of 1 minute timeout for a Node to react to the command
*/
static final int DEFAULT_NM__COMMAND_TIMEOUT = 60000;
static final int DEFAULT_NM_COMMAND_TIMEOUT = 60000;
}

View File

@ -21,9 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app.launcher;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
@ -36,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ShuffleHandler;
import org.apache.hadoop.mapreduce.MRJobConfig;
@ -59,11 +58,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.Records;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -84,29 +82,28 @@ public class ContainerLauncherImpl extends AbstractService implements
private Thread eventHandlingThread;
private BlockingQueue<ContainerLauncherEvent> eventQueue =
new LinkedBlockingQueue<ContainerLauncherEvent>();
private RecordFactory recordFactory;
YarnRPC rpc;
// To track numNodes.
Set<String> allNodes = new HashSet<String>();
// have a cache/map of proxies so as to avoid creating multiple RPC
// client connection objects for the same container.
private Map<ContainerId, ContainerManager> clientCache
= new HashMap<ContainerId, ContainerManager>();
public ContainerLauncherImpl(AppContext context) {
super(ContainerLauncherImpl.class.getName());
this.context = context;
}
@Override
public synchronized void init(Configuration conf) {
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
public synchronized void init(Configuration config) {
Configuration conf = new Configuration(config);
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
0);
this.limitOnPoolSize = conf.getInt(
MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT,
MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT);
this.nmTimeOut = conf.getInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT,
ContainerLauncher.DEFAULT_NM__COMMAND_TIMEOUT);
ContainerLauncher.DEFAULT_NM_COMMAND_TIMEOUT);
this.rpc = YarnRPC.create(conf);
super.init(conf);
}
@ -180,37 +177,28 @@ public class ContainerLauncherImpl extends AbstractService implements
UserGroupInformation user = UserGroupInformation.getCurrentUser();
synchronized (this.clientCache) {
this.allNodes.add(containerManagerBindAddr);
if (this.clientCache.containsKey(containerID)) {
return this.clientCache.get(containerID);
}
this.allNodes.add(containerManagerBindAddr);
if (UserGroupInformation.isSecurityEnabled()) {
Token<ContainerTokenIdentifier> token = new Token<ContainerTokenIdentifier>(
containerToken.getIdentifier().array(), containerToken
.getPassword().array(), new Text(containerToken.getKind()),
new Text(containerToken.getService()));
// the user in createRemoteUser in this context has to be ContainerID
user = UserGroupInformation.createRemoteUser(containerID.toString());
user.addToken(token);
}
ContainerManager proxy = user
.doAs(new PrivilegedAction<ContainerManager>() {
@Override
public ContainerManager run() {
YarnRPC rpc = YarnRPC.create(getConfig());
return (ContainerManager) rpc.getProxy(ContainerManager.class,
NetUtils.createSocketAddr(containerManagerBindAddr),
getConfig());
}
});
this.clientCache.put(containerID, proxy);
return proxy;
if (UserGroupInformation.isSecurityEnabled()) {
Token<ContainerTokenIdentifier> token = new Token<ContainerTokenIdentifier>(
containerToken.getIdentifier().array(), containerToken
.getPassword().array(), new Text(containerToken.getKind()),
new Text(containerToken.getService()));
// the user in createRemoteUser in this context has to be ContainerID
user = UserGroupInformation.createRemoteUser(containerID.toString());
user.addToken(token);
}
ContainerManager proxy = user
.doAs(new PrivilegedAction<ContainerManager>() {
@Override
public ContainerManager run() {
return (ContainerManager) rpc.getProxy(ContainerManager.class,
NetUtils.createSocketAddr(containerManagerBindAddr),
getConfig());
}
});
return proxy;
}
private static class CommandTimer extends TimerTask {
@ -257,6 +245,8 @@ public class ContainerLauncherImpl extends AbstractService implements
Timer timer = new Timer(true);
ContainerManager proxy = null;
switch(event.getType()) {
case CONTAINER_REMOTE_LAUNCH:
@ -267,8 +257,8 @@ public class ContainerLauncherImpl extends AbstractService implements
timer.schedule(new CommandTimer(Thread.currentThread(), event),
nmTimeOut);
ContainerManager proxy = getCMProxy(containerID,
containerManagerBindAddr, containerToken);
proxy = getCMProxy(containerID, containerManagerBindAddr,
containerToken);
// Interruped during getProxy, but that didn't throw exception
if (Thread.currentThread().isInterrupted()) {
@ -284,8 +274,8 @@ public class ContainerLauncherImpl extends AbstractService implements
launchEvent.getContainer();
// Now launch the actual container
StartContainerRequest startRequest = recordFactory
.newRecordInstance(StartContainerRequest.class);
StartContainerRequest startRequest = Records
.newRecord(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
StartContainerResponse response = proxy.startContainer(startRequest);
@ -324,6 +314,7 @@ public class ContainerLauncherImpl extends AbstractService implements
sendContainerLaunchFailedMsg(taskAttemptID, message);
} finally {
timer.cancel();
ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
}
break;
@ -343,8 +334,8 @@ public class ContainerLauncherImpl extends AbstractService implements
timer.schedule(new CommandTimer(Thread.currentThread(), event),
nmTimeOut);
ContainerManager proxy = getCMProxy(containerID,
containerManagerBindAddr, containerToken);
proxy = getCMProxy(containerID, containerManagerBindAddr,
containerToken);
if (Thread.currentThread().isInterrupted()) {
// The timer cancelled the command in the mean while. No need to
@ -356,8 +347,8 @@ public class ContainerLauncherImpl extends AbstractService implements
// TODO:check whether container is launched
// kill the remote container if already launched
StopContainerRequest stopRequest = recordFactory
.newRecordInstance(StopContainerRequest.class);
StopContainerRequest stopRequest = Records
.newRecord(StopContainerRequest.class);
stopRequest.setContainerId(event.getContainerID());
proxy.stopContainer(stopRequest);
}
@ -373,6 +364,7 @@ public class ContainerLauncherImpl extends AbstractService implements
LOG.warn(message);
} finally {
timer.cancel();
ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
}
// after killing, send killed event to taskattempt

View File

@ -56,6 +56,12 @@ public class ContainerManagerPBClientImpl implements ContainerManager {
ContainerManagerService.BlockingInterface.class, clientVersion, addr, conf);
}
public void close() {
if (this.proxy != null) {
RPC.stopProxy(this.proxy);
}
}
@Override
public GetContainerStatusResponse getContainerStatus(
GetContainerStatusRequest request) throws YarnRemoteException {

View File

@ -25,6 +25,9 @@ import org.apache.hadoop.yarn.YarnException;
public interface RpcClientFactory {
public Object getClient(Class<?> protocol, long clientVersion, InetSocketAddress addr, Configuration conf) throws YarnException;
public Object getClient(Class<?> protocol, long clientVersion,
InetSocketAddress addr, Configuration conf) throws YarnException;
public void stopClient(Object proxy);
}

View File

@ -20,16 +20,22 @@ package org.apache.hadoop.yarn.factories.impl.pb;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.factories.RpcClientFactory;
public class RpcClientFactoryPBImpl implements RpcClientFactory {
private static final Log LOG = LogFactory
.getLog(RpcClientFactoryPBImpl.class);
private static final String PB_IMPL_PACKAGE_SUFFIX = "impl.pb.client";
private static final String PB_IMPL_CLASS_SUFFIX = "PBClientImpl";
@ -74,9 +80,21 @@ public class RpcClientFactoryPBImpl implements RpcClientFactory {
throw new YarnException(e);
}
}
@Override
public void stopClient(Object proxy) {
try {
Method closeMethod = proxy.getClass().getMethod("close");
closeMethod.invoke(proxy);
} catch (InvocationTargetException e) {
throw new YarnException(e);
} catch (Exception e) {
LOG.error("Cannot call close method due to Exception. "
+ "Ignoring.", e);
throw new YarnException(e);
}
}
private String getPBImplClassName(Class<?> clazz) {
String srcPackagePart = getPackageName(clazz);
String srcClassName = getClassName(clazz);

View File

@ -20,15 +20,12 @@ package org.apache.hadoop.yarn.ipc;
import java.net.InetSocketAddress;
import org.apache.hadoop.ipc.Server;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factory.providers.RpcFactoryProvider;
/**
@ -50,6 +47,11 @@ public class HadoopYarnProtoRPC extends YarnRPC {
addr, conf);
}
@Override
public void stopProxy(Object proxy, Configuration conf) {
RpcFactoryProvider.getClientFactory(conf).stopClient(proxy);
}
@Override
public Server getServer(Class protocol, Object instance,
InetSocketAddress addr, Configuration conf,

View File

@ -21,12 +21,12 @@ package org.apache.hadoop.yarn.ipc;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.ipc.Server;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.AvroSpecificRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.YarnException;
@ -54,6 +54,11 @@ public class HadoopYarnRPC extends YarnRPC {
}
}
@Override
public void stopProxy(Object proxy, Configuration conf) {
RPC.stopProxy(proxy);
}
@Override
public Server getServer(Class protocol, Object instance,
InetSocketAddress addr, Configuration conf,

View File

@ -155,6 +155,7 @@ public class ProtoOverHadoopRpcEngine implements RpcEngine {
return actualReturnMessage;
}
@Override
public void close() throws IOException {
if (!isClosed) {
isClosed = true;

View File

@ -20,10 +20,10 @@ package org.apache.hadoop.yarn.ipc;
import java.net.InetSocketAddress;
import org.apache.hadoop.ipc.Server;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.YarnException;
@ -38,6 +38,8 @@ public abstract class YarnRPC {
public abstract Object getProxy(Class protocol, InetSocketAddress addr,
Configuration conf);
public abstract void stopProxy(Object proxy, Configuration conf);
public abstract Server getServer(Class protocol, Object instance,
InetSocketAddress addr, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager,