MAPREDUCE-5205. Fixed MR App to load tokens correctly. Contributed by Vinod Kumar Vavilapalli.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1479016 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
bfc73613fc
commit
6e37a7e5e1
|
@ -375,6 +375,8 @@ Release 2.0.5-beta - UNRELEASED
|
||||||
MAPREDUCE-5193. A few MR tests use block sizes which are smaller than the
|
MAPREDUCE-5193. A few MR tests use block sizes which are smaller than the
|
||||||
default minimum block size. (Andrew Wang via atm)
|
default minimum block size. (Andrew Wang via atm)
|
||||||
|
|
||||||
|
MAPREDUCE-5205. Fixed MR App to load tokens correctly. (vinodkv)
|
||||||
|
|
||||||
Release 2.0.4-alpha - UNRELEASED
|
Release 2.0.4-alpha - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -1360,8 +1360,13 @@ public class MRAppMaster extends CompositeService {
|
||||||
final YarnConfiguration conf, String jobUserName) throws IOException,
|
final YarnConfiguration conf, String jobUserName) throws IOException,
|
||||||
InterruptedException {
|
InterruptedException {
|
||||||
UserGroupInformation.setConfiguration(conf);
|
UserGroupInformation.setConfiguration(conf);
|
||||||
|
// Security framework already loaded the tokens into current UGI, just use
|
||||||
|
// them
|
||||||
|
Credentials credentials =
|
||||||
|
UserGroupInformation.getCurrentUser().getCredentials();
|
||||||
UserGroupInformation appMasterUgi = UserGroupInformation
|
UserGroupInformation appMasterUgi = UserGroupInformation
|
||||||
.createRemoteUser(jobUserName);
|
.createRemoteUser(jobUserName);
|
||||||
|
appMasterUgi.addCredentials(credentials);
|
||||||
appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
|
appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
|
||||||
@Override
|
@Override
|
||||||
public Object run() throws Exception {
|
public Object run() throws Exception {
|
||||||
|
|
|
@ -17,18 +17,29 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.mapreduce.v2.app;
|
package org.apache.hadoop.mapreduce.v2.app;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.mockito.Mockito.*;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.Field;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import junit.framework.Assert;
|
||||||
|
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileContext;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.OutputCommitter;
|
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||||
|
@ -41,13 +52,22 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
|
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
|
||||||
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
||||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.YarnException;
|
import org.apache.hadoop.yarn.YarnException;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
import org.apache.log4j.Level;
|
||||||
|
import org.apache.log4j.LogManager;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -55,13 +75,20 @@ import org.junit.Test;
|
||||||
public class TestMRAppMaster {
|
public class TestMRAppMaster {
|
||||||
private static final Log LOG = LogFactory.getLog(TestMRAppMaster.class);
|
private static final Log LOG = LogFactory.getLog(TestMRAppMaster.class);
|
||||||
static String stagingDir = "staging/";
|
static String stagingDir = "staging/";
|
||||||
|
private static FileContext localFS = null;
|
||||||
|
private static final File testDir = new File("target",
|
||||||
|
TestMRAppMaster.class.getName() + "-tmpDir").getAbsoluteFile();
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setup() {
|
public static void setup() throws AccessControlException,
|
||||||
|
FileNotFoundException, IllegalArgumentException, IOException {
|
||||||
//Do not error out if metrics are inited multiple times
|
//Do not error out if metrics are inited multiple times
|
||||||
DefaultMetricsSystem.setMiniClusterMode(true);
|
DefaultMetricsSystem.setMiniClusterMode(true);
|
||||||
File dir = new File(stagingDir);
|
File dir = new File(stagingDir);
|
||||||
stagingDir = dir.getAbsolutePath();
|
stagingDir = dir.getAbsolutePath();
|
||||||
|
localFS = FileContext.getLocalFSFileContext();
|
||||||
|
localFS.delete(new Path(testDir.getAbsolutePath()), true);
|
||||||
|
testDir.mkdir();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
@ -269,6 +296,100 @@ public class TestMRAppMaster {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// A dirty hack to modify the env of the current JVM itself - Dirty, but
|
||||||
|
// should be okay for testing.
|
||||||
|
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||||
|
private static void setNewEnvironmentHack(Map<String, String> newenv)
|
||||||
|
throws Exception {
|
||||||
|
try {
|
||||||
|
Class<?> cl = Class.forName("java.lang.ProcessEnvironment");
|
||||||
|
Field field = cl.getDeclaredField("theEnvironment");
|
||||||
|
field.setAccessible(true);
|
||||||
|
Map<String, String> env = (Map<String, String>) field.get(null);
|
||||||
|
env.clear();
|
||||||
|
env.putAll(newenv);
|
||||||
|
Field ciField = cl.getDeclaredField("theCaseInsensitiveEnvironment");
|
||||||
|
ciField.setAccessible(true);
|
||||||
|
Map<String, String> cienv = (Map<String, String>) ciField.get(null);
|
||||||
|
cienv.clear();
|
||||||
|
cienv.putAll(newenv);
|
||||||
|
} catch (NoSuchFieldException e) {
|
||||||
|
Class[] classes = Collections.class.getDeclaredClasses();
|
||||||
|
Map<String, String> env = System.getenv();
|
||||||
|
for (Class cl : classes) {
|
||||||
|
if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
|
||||||
|
Field field = cl.getDeclaredField("m");
|
||||||
|
field.setAccessible(true);
|
||||||
|
Object obj = field.get(env);
|
||||||
|
Map<String, String> map = (Map<String, String>) obj;
|
||||||
|
map.clear();
|
||||||
|
map.putAll(newenv);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMRAppMasterCredentials() throws Exception {
|
||||||
|
|
||||||
|
Logger rootLogger = LogManager.getRootLogger();
|
||||||
|
rootLogger.setLevel(Level.DEBUG);
|
||||||
|
|
||||||
|
// Simulate credentials passed to AM via client->RM->NM
|
||||||
|
Credentials credentials = new Credentials();
|
||||||
|
byte[] identifier = "MyIdentifier".getBytes();
|
||||||
|
byte[] password = "MyPassword".getBytes();
|
||||||
|
Text kind = new Text("MyTokenKind");
|
||||||
|
Text service = new Text("host:port");
|
||||||
|
Token<? extends TokenIdentifier> myToken =
|
||||||
|
new Token<TokenIdentifier>(identifier, password, kind, service);
|
||||||
|
Text tokenAlias = new Text("myToken");
|
||||||
|
credentials.addToken(tokenAlias, myToken);
|
||||||
|
Token<? extends TokenIdentifier> storedToken =
|
||||||
|
credentials.getToken(tokenAlias);
|
||||||
|
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
|
||||||
|
Path tokenFilePath = new Path(testDir.getAbsolutePath(), "tokens-file");
|
||||||
|
Map<String, String> newEnv = new HashMap<String, String>();
|
||||||
|
newEnv.put(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION, tokenFilePath
|
||||||
|
.toUri().getPath());
|
||||||
|
setNewEnvironmentHack(newEnv);
|
||||||
|
credentials.writeTokenStorageFile(tokenFilePath, conf);
|
||||||
|
|
||||||
|
ApplicationId appId = BuilderUtils.newApplicationId(12345, 56);
|
||||||
|
ApplicationAttemptId applicationAttemptId =
|
||||||
|
BuilderUtils.newApplicationAttemptId(appId, 1);
|
||||||
|
ContainerId containerId =
|
||||||
|
BuilderUtils.newContainerId(applicationAttemptId, 546);
|
||||||
|
String userName = UserGroupInformation.getCurrentUser().getShortUserName();
|
||||||
|
|
||||||
|
// Create staging dir, so MRAppMaster doesn't barf.
|
||||||
|
File stagingDir =
|
||||||
|
new File(MRApps.getStagingAreaDir(conf, userName).toString());
|
||||||
|
stagingDir.mkdirs();
|
||||||
|
|
||||||
|
// Set login-user to null as that is how real world MRApp starts with.
|
||||||
|
// This is null is the reason why token-file is read by UGI.
|
||||||
|
UserGroupInformation.setLoginUser(null);
|
||||||
|
|
||||||
|
MRAppMasterTest appMaster =
|
||||||
|
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
|
||||||
|
System.currentTimeMillis(), 1, false, true);
|
||||||
|
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
|
||||||
|
|
||||||
|
// Now validate the credentials
|
||||||
|
Credentials appMasterCreds = appMaster.credentials;
|
||||||
|
Assert.assertNotNull(appMasterCreds);
|
||||||
|
Token<? extends TokenIdentifier> usedToken =
|
||||||
|
appMasterCreds.getToken(tokenAlias);
|
||||||
|
Assert.assertNotNull(usedToken);
|
||||||
|
Assert
|
||||||
|
.assertEquals("MyIdentifier", new String(storedToken.getIdentifier()));
|
||||||
|
Assert.assertEquals("MyPassword", new String(storedToken.getPassword()));
|
||||||
|
Assert.assertEquals("MyTokenKind", storedToken.getKind().toString());
|
||||||
|
Assert.assertEquals("host:port", storedToken.getService().toString());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class MRAppMasterTest extends MRAppMaster {
|
class MRAppMasterTest extends MRAppMaster {
|
||||||
|
@ -280,6 +401,7 @@ class MRAppMasterTest extends MRAppMaster {
|
||||||
ContainerAllocator mockContainerAllocator;
|
ContainerAllocator mockContainerAllocator;
|
||||||
CommitterEventHandler mockCommitterEventHandler;
|
CommitterEventHandler mockCommitterEventHandler;
|
||||||
RMHeartbeatHandler mockRMHeartbeatHandler;
|
RMHeartbeatHandler mockRMHeartbeatHandler;
|
||||||
|
Credentials credentials;
|
||||||
|
|
||||||
public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
|
public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
|
||||||
ContainerId containerId, String host, int port, int httpPort,
|
ContainerId containerId, String host, int port, int httpPort,
|
||||||
|
@ -338,7 +460,9 @@ class MRAppMasterTest extends MRAppMaster {
|
||||||
public void start() {
|
public void start() {
|
||||||
if (overrideStart) {
|
if (overrideStart) {
|
||||||
try {
|
try {
|
||||||
String user = UserGroupInformation.getCurrentUser().getShortUserName();
|
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||||
|
String user = ugi.getShortUserName();
|
||||||
|
this.credentials = ugi.getCredentials();
|
||||||
stagingDirPath = MRApps.getStagingAreaDir(conf, user);
|
stagingDirPath = MRApps.getStagingAreaDir(conf, user);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
fail(e.getMessage());
|
fail(e.getMessage());
|
||||||
|
|
Loading…
Reference in New Issue