MAPREDUCE-5205. Fixed MR App to load tokens correctly. Contributed by Vinod Kumar Vavilapalli.

svn merge --ignore-ancestry -c 1479016 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1479018 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-05-04 00:29:34 +00:00
parent e66aae5cc0
commit fb8d3fb0e4
3 changed files with 135 additions and 4 deletions

View File

@ -213,6 +213,8 @@ Release 2.0.5-beta - UNRELEASED
MAPREDUCE-5193. A few MR tests use block sizes which are smaller than the MAPREDUCE-5193. A few MR tests use block sizes which are smaller than the
default minimum block size. (Andrew Wang via atm) default minimum block size. (Andrew Wang via atm)
MAPREDUCE-5205. Fixed MR App to load tokens correctly. (vinodkv)
Release 2.0.4-alpha - UNRELEASED Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -1360,8 +1360,13 @@ public class MRAppMaster extends CompositeService {
final YarnConfiguration conf, String jobUserName) throws IOException, final YarnConfiguration conf, String jobUserName) throws IOException,
InterruptedException { InterruptedException {
UserGroupInformation.setConfiguration(conf); UserGroupInformation.setConfiguration(conf);
// Security framework already loaded the tokens into current UGI, just use
// them
Credentials credentials =
UserGroupInformation.getCurrentUser().getCredentials();
UserGroupInformation appMasterUgi = UserGroupInformation UserGroupInformation appMasterUgi = UserGroupInformation
.createRemoteUser(jobUserName); .createRemoteUser(jobUserName);
appMasterUgi.addCredentials(credentials);
appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() { appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override @Override
public Object run() throws Exception { public Object run() throws Exception {

View File

@ -17,18 +17,29 @@
*/ */
package org.apache.hadoop.mapreduce.v2.app; package org.apache.hadoop.mapreduce.v2.app;
import static org.junit.Assert.*; import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import junit.framework.Assert;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
@ -41,13 +52,22 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
@ -55,13 +75,20 @@ import org.junit.Test;
public class TestMRAppMaster { public class TestMRAppMaster {
private static final Log LOG = LogFactory.getLog(TestMRAppMaster.class); private static final Log LOG = LogFactory.getLog(TestMRAppMaster.class);
static String stagingDir = "staging/"; static String stagingDir = "staging/";
private static FileContext localFS = null;
private static final File testDir = new File("target",
TestMRAppMaster.class.getName() + "-tmpDir").getAbsoluteFile();
@BeforeClass @BeforeClass
public static void setup() { public static void setup() throws AccessControlException,
FileNotFoundException, IllegalArgumentException, IOException {
//Do not error out if metrics are inited multiple times //Do not error out if metrics are inited multiple times
DefaultMetricsSystem.setMiniClusterMode(true); DefaultMetricsSystem.setMiniClusterMode(true);
File dir = new File(stagingDir); File dir = new File(stagingDir);
stagingDir = dir.getAbsolutePath(); stagingDir = dir.getAbsolutePath();
localFS = FileContext.getLocalFSFileContext();
localFS.delete(new Path(testDir.getAbsolutePath()), true);
testDir.mkdir();
} }
@Before @Before
@ -267,6 +294,100 @@ public class TestMRAppMaster {
} }
} }
// A dirty hack to modify the env of the current JVM itself - Dirty, but
// should be okay for testing.
@SuppressWarnings({ "rawtypes", "unchecked" })
private static void setNewEnvironmentHack(Map<String, String> newenv)
throws Exception {
try {
Class<?> cl = Class.forName("java.lang.ProcessEnvironment");
Field field = cl.getDeclaredField("theEnvironment");
field.setAccessible(true);
Map<String, String> env = (Map<String, String>) field.get(null);
env.clear();
env.putAll(newenv);
Field ciField = cl.getDeclaredField("theCaseInsensitiveEnvironment");
ciField.setAccessible(true);
Map<String, String> cienv = (Map<String, String>) ciField.get(null);
cienv.clear();
cienv.putAll(newenv);
} catch (NoSuchFieldException e) {
Class[] classes = Collections.class.getDeclaredClasses();
Map<String, String> env = System.getenv();
for (Class cl : classes) {
if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
Field field = cl.getDeclaredField("m");
field.setAccessible(true);
Object obj = field.get(env);
Map<String, String> map = (Map<String, String>) obj;
map.clear();
map.putAll(newenv);
}
}
}
}
@Test
public void testMRAppMasterCredentials() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
// Simulate credentials passed to AM via client->RM->NM
Credentials credentials = new Credentials();
byte[] identifier = "MyIdentifier".getBytes();
byte[] password = "MyPassword".getBytes();
Text kind = new Text("MyTokenKind");
Text service = new Text("host:port");
Token<? extends TokenIdentifier> myToken =
new Token<TokenIdentifier>(identifier, password, kind, service);
Text tokenAlias = new Text("myToken");
credentials.addToken(tokenAlias, myToken);
Token<? extends TokenIdentifier> storedToken =
credentials.getToken(tokenAlias);
YarnConfiguration conf = new YarnConfiguration();
Path tokenFilePath = new Path(testDir.getAbsolutePath(), "tokens-file");
Map<String, String> newEnv = new HashMap<String, String>();
newEnv.put(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION, tokenFilePath
.toUri().getPath());
setNewEnvironmentHack(newEnv);
credentials.writeTokenStorageFile(tokenFilePath, conf);
ApplicationId appId = BuilderUtils.newApplicationId(12345, 56);
ApplicationAttemptId applicationAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 1);
ContainerId containerId =
BuilderUtils.newContainerId(applicationAttemptId, 546);
String userName = UserGroupInformation.getCurrentUser().getShortUserName();
// Create staging dir, so MRAppMaster doesn't barf.
File stagingDir =
new File(MRApps.getStagingAreaDir(conf, userName).toString());
stagingDir.mkdirs();
// Set login-user to null as that is how real world MRApp starts with.
// This is null is the reason why token-file is read by UGI.
UserGroupInformation.setLoginUser(null);
MRAppMasterTest appMaster =
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
System.currentTimeMillis(), 1, false, true);
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
// Now validate the credentials
Credentials appMasterCreds = appMaster.credentials;
Assert.assertNotNull(appMasterCreds);
Token<? extends TokenIdentifier> usedToken =
appMasterCreds.getToken(tokenAlias);
Assert.assertNotNull(usedToken);
Assert
.assertEquals("MyIdentifier", new String(storedToken.getIdentifier()));
Assert.assertEquals("MyPassword", new String(storedToken.getPassword()));
Assert.assertEquals("MyTokenKind", storedToken.getKind().toString());
Assert.assertEquals("host:port", storedToken.getService().toString());
}
} }
class MRAppMasterTest extends MRAppMaster { class MRAppMasterTest extends MRAppMaster {
@ -278,6 +399,7 @@ class MRAppMasterTest extends MRAppMaster {
ContainerAllocator mockContainerAllocator; ContainerAllocator mockContainerAllocator;
CommitterEventHandler mockCommitterEventHandler; CommitterEventHandler mockCommitterEventHandler;
RMHeartbeatHandler mockRMHeartbeatHandler; RMHeartbeatHandler mockRMHeartbeatHandler;
Credentials credentials;
public MRAppMasterTest(ApplicationAttemptId applicationAttemptId, public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String host, int port, int httpPort, ContainerId containerId, String host, int port, int httpPort,
@ -336,7 +458,9 @@ class MRAppMasterTest extends MRAppMaster {
public void start() { public void start() {
if (overrideStart) { if (overrideStart) {
try { try {
String user = UserGroupInformation.getCurrentUser().getShortUserName(); UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
String user = ugi.getShortUserName();
this.credentials = ugi.getCredentials();
stagingDirPath = MRApps.getStagingAreaDir(conf, user); stagingDirPath = MRApps.getStagingAreaDir(conf, user);
} catch (Exception e) { } catch (Exception e) {
fail(e.getMessage()); fail(e.getMessage());