MAPREDUCE-5240. Fix a bug in MRAppMaster because of which OutputCommitter could not access credentials set by the user. Contributed by Vinod Kumar Vavilapalli.

svn merge --ignore-ancestry -c 1482175 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1482176 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-05-14 02:50:27 +00:00
parent 7db8233722
commit 6706e58f61
3 changed files with 35 additions and 15 deletions

View File

@ -248,6 +248,9 @@ Release 2.0.5-beta - UNRELEASED
MAPREDUCE-5220. Setter methods in TaskCompletionEvent are public in MR1 and MAPREDUCE-5220. Setter methods in TaskCompletionEvent are public in MR1 and
protected in MR2. (sandyr via tucu) protected in MR2. (sandyr via tucu)
MAPREDUCE-5240. Fix a bug in MRAppMaster because of which OutputCommitter
could not access credentials set by the user. (vinodkv)
Release 2.0.4-alpha - 2013-04-25 Release 2.0.4-alpha - 2013-04-25
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -1304,7 +1304,7 @@ public class MRAppMaster extends CompositeService {
Integer.parseInt(maxAppAttempts)); Integer.parseInt(maxAppAttempts));
ShutdownHookManager.get().addShutdownHook( ShutdownHookManager.get().addShutdownHook(
new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY); new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
YarnConfiguration conf = new YarnConfiguration(new JobConf()); JobConf conf = new JobConf(new YarnConfiguration());
conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE)); conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
String jobUserName = System String jobUserName = System
.getenv(ApplicationConstants.Environment.USER.name()); .getenv(ApplicationConstants.Environment.USER.name());
@ -1357,7 +1357,7 @@ public class MRAppMaster extends CompositeService {
} }
protected static void initAndStartAppMaster(final MRAppMaster appMaster, protected static void initAndStartAppMaster(final MRAppMaster appMaster,
final YarnConfiguration conf, String jobUserName) throws IOException, final JobConf conf, String jobUserName) throws IOException,
InterruptedException { InterruptedException {
UserGroupInformation.setConfiguration(conf); UserGroupInformation.setConfiguration(conf);
// Security framework already loaded the tokens into current UGI, just use // Security framework already loaded the tokens into current UGI, just use
@ -1367,6 +1367,7 @@ public class MRAppMaster extends CompositeService {
UserGroupInformation appMasterUgi = UserGroupInformation UserGroupInformation appMasterUgi = UserGroupInformation
.createRemoteUser(jobUserName); .createRemoteUser(jobUserName);
appMasterUgi.addCredentials(credentials); appMasterUgi.addCredentials(credentials);
conf.getCredentials().addAll(credentials);
appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() { appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override @Override
public Object run() throws Exception { public Object run() throws Exception {

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
@ -61,7 +62,6 @@ import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
@ -113,7 +113,7 @@ public class TestMRAppMaster {
MRAppMasterTest appMaster = MRAppMasterTest appMaster =
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1, new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS); System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
YarnConfiguration conf = new YarnConfiguration(); JobConf conf = new JobConf();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName); MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
assertEquals(stagingDir + Path.SEPARATOR + userName + Path.SEPARATOR assertEquals(stagingDir + Path.SEPARATOR + userName + Path.SEPARATOR
@ -126,7 +126,7 @@ public class TestMRAppMaster {
String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002"; String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
String containerIdStr = "container_1317529182569_0004_000002_1"; String containerIdStr = "container_1317529182569_0004_000002_1";
String userName = "TestAppMasterUser"; String userName = "TestAppMasterUser";
YarnConfiguration conf = new YarnConfiguration(); JobConf conf = new JobConf();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
ApplicationAttemptId applicationAttemptId = ConverterUtils ApplicationAttemptId applicationAttemptId = ConverterUtils
.toApplicationAttemptId(applicationAttemptIdStr); .toApplicationAttemptId(applicationAttemptIdStr);
@ -161,7 +161,7 @@ public class TestMRAppMaster {
String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002"; String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
String containerIdStr = "container_1317529182569_0004_000002_1"; String containerIdStr = "container_1317529182569_0004_000002_1";
String userName = "TestAppMasterUser"; String userName = "TestAppMasterUser";
YarnConfiguration conf = new YarnConfiguration(); JobConf conf = new JobConf();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
ApplicationAttemptId applicationAttemptId = ConverterUtils ApplicationAttemptId applicationAttemptId = ConverterUtils
.toApplicationAttemptId(applicationAttemptIdStr); .toApplicationAttemptId(applicationAttemptIdStr);
@ -197,7 +197,7 @@ public class TestMRAppMaster {
String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002"; String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
String containerIdStr = "container_1317529182569_0004_000002_1"; String containerIdStr = "container_1317529182569_0004_000002_1";
String userName = "TestAppMasterUser"; String userName = "TestAppMasterUser";
YarnConfiguration conf = new YarnConfiguration(); JobConf conf = new JobConf();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
ApplicationAttemptId applicationAttemptId = ConverterUtils ApplicationAttemptId applicationAttemptId = ConverterUtils
.toApplicationAttemptId(applicationAttemptIdStr); .toApplicationAttemptId(applicationAttemptIdStr);
@ -233,7 +233,7 @@ public class TestMRAppMaster {
String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002"; String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
String containerIdStr = "container_1317529182569_0004_000002_1"; String containerIdStr = "container_1317529182569_0004_000002_1";
String userName = "TestAppMasterUser"; String userName = "TestAppMasterUser";
YarnConfiguration conf = new YarnConfiguration(); JobConf conf = new JobConf();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
ApplicationAttemptId applicationAttemptId = ConverterUtils ApplicationAttemptId applicationAttemptId = ConverterUtils
.toApplicationAttemptId(applicationAttemptIdStr); .toApplicationAttemptId(applicationAttemptIdStr);
@ -278,7 +278,7 @@ public class TestMRAppMaster {
ApplicationAttemptId applicationAttemptId = ConverterUtils ApplicationAttemptId applicationAttemptId = ConverterUtils
.toApplicationAttemptId(applicationAttemptIdStr); .toApplicationAttemptId(applicationAttemptIdStr);
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
YarnConfiguration conf = new YarnConfiguration(); JobConf conf = new JobConf();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
File stagingDir = File stagingDir =
@ -343,10 +343,12 @@ public class TestMRAppMaster {
new Token<TokenIdentifier>(identifier, password, kind, service); new Token<TokenIdentifier>(identifier, password, kind, service);
Text tokenAlias = new Text("myToken"); Text tokenAlias = new Text("myToken");
credentials.addToken(tokenAlias, myToken); credentials.addToken(tokenAlias, myToken);
Text keyAlias = new Text("mySecretKeyAlias");
credentials.addSecretKey(keyAlias, "mySecretKey".getBytes());
Token<? extends TokenIdentifier> storedToken = Token<? extends TokenIdentifier> storedToken =
credentials.getToken(tokenAlias); credentials.getToken(tokenAlias);
YarnConfiguration conf = new YarnConfiguration(); JobConf conf = new JobConf();
Path tokenFilePath = new Path(testDir.getAbsolutePath(), "tokens-file"); Path tokenFilePath = new Path(testDir.getAbsolutePath(), "tokens-file");
Map<String, String> newEnv = new HashMap<String, String>(); Map<String, String> newEnv = new HashMap<String, String>();
@ -379,14 +381,28 @@ public class TestMRAppMaster {
// Now validate the credentials // Now validate the credentials
Credentials appMasterCreds = appMaster.credentials; Credentials appMasterCreds = appMaster.credentials;
Assert.assertNotNull(appMasterCreds); Assert.assertNotNull(appMasterCreds);
Assert.assertEquals(1, appMasterCreds.numberOfSecretKeys());
Assert.assertEquals(1, appMasterCreds.numberOfTokens());
// Validate the tokens
Token<? extends TokenIdentifier> usedToken = Token<? extends TokenIdentifier> usedToken =
appMasterCreds.getToken(tokenAlias); appMasterCreds.getToken(tokenAlias);
Assert.assertNotNull(usedToken); Assert.assertNotNull(usedToken);
Assert Assert.assertEquals(storedToken, usedToken);
.assertEquals("MyIdentifier", new String(storedToken.getIdentifier()));
Assert.assertEquals("MyPassword", new String(storedToken.getPassword())); // Validate the keys
Assert.assertEquals("MyTokenKind", storedToken.getKind().toString()); byte[] usedKey = appMasterCreds.getSecretKey(keyAlias);
Assert.assertEquals("host:port", storedToken.getService().toString()); Assert.assertNotNull(usedKey);
Assert.assertEquals("mySecretKey", new String(usedKey));
// The credentials should also be added to conf so that OuputCommitter can
// access it
Credentials confCredentials = conf.getCredentials();
Assert.assertEquals(1, confCredentials.numberOfSecretKeys());
Assert.assertEquals(1, confCredentials.numberOfTokens());
Assert.assertEquals(storedToken, confCredentials.getToken(tokenAlias));
Assert.assertEquals("mySecretKey",
new String(confCredentials.getSecretKey(keyAlias)));
} }
} }