fMerge -r 1170280:1170281 from trunk to branch-0.23 to fix MAPREDUCE-2997.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1170283 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-09-13 18:15:54 +00:00
parent cd7c0749ca
commit ba9254b44f
2 changed files with 19 additions and 19 deletions

View File

@ -1280,6 +1280,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2995. Better handling of expired containers in MapReduce MAPREDUCE-2995. Better handling of expired containers in MapReduce
ApplicationMaster. (vinodkv via acmurthy) ApplicationMaster. (vinodkv via acmurthy)
MAPREDUCE-2995. Fixed race condition in ContainerLauncher. (vinodkv via
acmurthy)
Release 0.22.0 - Unreleased Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -21,9 +21,9 @@
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -79,8 +79,8 @@ public class ContainerLauncherImpl extends AbstractService implements
private RecordFactory recordFactory; private RecordFactory recordFactory;
//have a cache/map of UGIs so as to avoid creating too many RPC //have a cache/map of UGIs so as to avoid creating too many RPC
//client connection objects to the same NodeManager //client connection objects to the same NodeManager
private Map<String, UserGroupInformation> ugiMap = private ConcurrentMap<String, UserGroupInformation> ugiMap =
new HashMap<String, UserGroupInformation>(); new ConcurrentHashMap<String, UserGroupInformation>();
public ContainerLauncherImpl(AppContext context) { public ContainerLauncherImpl(AppContext context) {
super(ContainerLauncherImpl.class.getName()); super(ContainerLauncherImpl.class.getName());
@ -142,22 +142,19 @@ protected ContainerManager getCMProxy(ContainerId containerID,
UserGroupInformation user = UserGroupInformation.getCurrentUser(); UserGroupInformation user = UserGroupInformation.getCurrentUser();
// TODO: Synchronization problems!!
if (UserGroupInformation.isSecurityEnabled()) { if (UserGroupInformation.isSecurityEnabled()) {
if(!ugiMap.containsKey(containerManagerBindAddr)) {
Token<ContainerTokenIdentifier> token = Token<ContainerTokenIdentifier> token = new Token<ContainerTokenIdentifier>(
new Token<ContainerTokenIdentifier>( containerToken.getIdentifier().array(), containerToken
containerToken.getIdentifier().array(), .getPassword().array(), new Text(containerToken.getKind()),
containerToken.getPassword().array(), new Text( new Text(containerToken.getService()));
containerToken.getKind()), new Text( // the user in createRemoteUser in this context is not important
containerToken.getService())); UserGroupInformation ugi = UserGroupInformation
//the user in createRemoteUser in this context is not important .createRemoteUser(containerManagerBindAddr);
user = UserGroupInformation.createRemoteUser(containerManagerBindAddr); ugi.addToken(token);
user.addToken(token); ugiMap.putIfAbsent(containerManagerBindAddr, ugi);
ugiMap.put(containerManagerBindAddr, user);
} else { user = ugiMap.get(containerManagerBindAddr);
user = ugiMap.get(containerManagerBindAddr);
}
} }
ContainerManager proxy = ContainerManager proxy =
user.doAs(new PrivilegedAction<ContainerManager>() { user.doAs(new PrivilegedAction<ContainerManager>() {