MAPREDUCE-5199. Removing ApplicationTokens file as it is no longer needed. Contributed by Daryn Sharp.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1492848 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-06-13 20:20:33 +00:00
parent 7e1744ccf9
commit b64572b06b
10 changed files with 75 additions and 79 deletions

View File

@ -282,6 +282,9 @@ Release 2.1.0-beta - UNRELEASED
MAPREDUCE-5283. Over 10 different tests have near identical
implementations of AppContext (Sandy Ryza via jlowe)
MAPREDUCE-5199. Removing ApplicationTokens file as it is no longer needed.
(Daryn Sharp via vinodkv)
OPTIMIZATIONS
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.mapreduce.v2.app;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
@ -37,7 +36,6 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputCommitter;
@ -107,6 +105,7 @@
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringInterner;
@ -125,6 +124,7 @@
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
@ -192,7 +192,7 @@ public class MRAppMaster extends CompositeService {
private SpeculatorEventDispatcher speculatorEventDispatcher;
private Job job;
private Credentials fsTokens = new Credentials(); // Filled during init
private Credentials jobCredentials = new Credentials(); // Filled during init
protected UserGroupInformation currentUser; // Will be setup during init
private volatile boolean isLastAMRetry = false;
@ -231,7 +231,7 @@ public MRAppMaster(ApplicationAttemptId applicationAttemptId,
protected void serviceInit(final Configuration conf) throws Exception {
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
downloadTokensAndSetupUGI(conf);
initJobCredentialsAndUGI(conf);
isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts;
LOG.info("The specific max attempts: " + maxAppAttempts +
@ -470,7 +470,7 @@ protected FileSystem getFileSystem(Configuration conf) throws IOException {
}
protected Credentials getCredentials() {
return fsTokens;
return jobCredentials;
}
/**
@ -590,7 +590,7 @@ protected Job createJob(Configuration conf, JobStateInternal forcedState,
// create single job
Job newJob =
new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
taskAttemptListener, jobTokenSecretManager, jobCredentials, clock,
completedTasksFromPreviousRun, metrics,
committer, newApiCommitter,
currentUser.getUserName(), appSubmitTime, amInfos, context,
@ -607,22 +607,11 @@ protected Job createJob(Configuration conf, JobStateInternal forcedState,
* Obtain the tokens needed by the job and put them in the UGI
* @param conf
*/
protected void downloadTokensAndSetupUGI(Configuration conf) {
protected void initJobCredentialsAndUGI(Configuration conf) {
try {
this.currentUser = UserGroupInformation.getCurrentUser();
// Read the file-system tokens from the localized tokens-file.
Path jobSubmitDir =
FileContext.getLocalFSFileContext().makeQualified(
new Path(new File(MRJobConfig.JOB_SUBMIT_DIR)
.getAbsolutePath()));
Path jobTokenFile =
new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
+ jobTokenFile);
currentUser.addCredentials(fsTokens); // For use by AppMaster itself.
this.jobCredentials = ((JobConf)conf).getCredentials();
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
@ -1034,7 +1023,7 @@ private void processRecovery() {
// are reducers as the shuffle secret would be app attempt specific.
int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
boolean shuffleKeyValidForRecovery = (numReduceTasks > 0 &&
TokenCache.getShuffleSecretKey(fsTokens) != null);
TokenCache.getShuffleSecretKey(jobCredentials) != null);
if (recoveryEnabled && recoverySupportedByCommitter
&& shuffleKeyValidForRecovery) {
@ -1372,9 +1361,23 @@ protected static void initAndStartAppMaster(final MRAppMaster appMaster,
// them
Credentials credentials =
UserGroupInformation.getCurrentUser().getCredentials();
LOG.info("Executing with tokens:");
for (Token<?> token : credentials.getAllTokens()) {
LOG.info(token);
}
UserGroupInformation appMasterUgi = UserGroupInformation
.createRemoteUser(jobUserName);
appMasterUgi.addCredentials(credentials);
// Now remove the AM->RM token so tasks don't have it
Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
while (iter.hasNext()) {
Token<?> token = iter.next();
if (token.getKind().equals(ApplicationTokenIdentifier.KIND_NAME)) {
iter.remove();
}
}
conf.getCredentials().addAll(credentials);
appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override

View File

@ -594,7 +594,7 @@ JobEventType.JOB_KILL, new KillTasksTransition())
private float cleanupProgress;
private boolean isUber = false;
private Credentials fsTokens;
private Credentials jobCredentials;
private Token<JobTokenIdentifier> jobToken;
private JobTokenSecretManager jobTokenSecretManager;
@ -604,7 +604,7 @@ public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
Configuration conf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener,
JobTokenSecretManager jobTokenSecretManager,
Credentials fsTokenCredentials, Clock clock,
Credentials jobCredentials, Clock clock,
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, MRAppMetrics metrics,
OutputCommitter committer, boolean newApiCommitter, String userName,
long appSubmitTime, List<AMInfo> amInfos, AppContext appContext,
@ -631,7 +631,7 @@ public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
this.readLock = readWriteLock.readLock();
this.writeLock = readWriteLock.writeLock();
this.fsTokens = fsTokenCredentials;
this.jobCredentials = jobCredentials;
this.jobTokenSecretManager = jobTokenSecretManager;
this.aclsManager = new JobACLsManager(conf);
@ -1414,11 +1414,11 @@ protected void setup(JobImpl job) throws IOException {
// If the job client did not setup the shuffle secret then reuse
// the job token secret for the shuffle.
if (TokenCache.getShuffleSecretKey(job.fsTokens) == null) {
if (TokenCache.getShuffleSecretKey(job.jobCredentials) == null) {
LOG.warn("Shuffle secret key missing from job credentials."
+ " Using job token secret as shuffle secret.");
TokenCache.setShuffleSecretKey(job.jobToken.getPassword(),
job.fsTokens);
job.jobCredentials);
}
}
@ -1431,7 +1431,7 @@ private void createMapTasks(JobImpl job, long inputLength,
job.remoteJobConfFile,
job.conf, splits[i],
job.taskAttemptListener,
job.jobToken, job.fsTokens,
job.jobToken, job.jobCredentials,
job.clock,
job.applicationAttemptId.getAttemptId(),
job.metrics, job.appContext);
@ -1449,7 +1449,7 @@ private void createReduceTasks(JobImpl job) {
job.remoteJobConfFile,
job.conf, job.numMapTasks,
job.taskAttemptListener, job.jobToken,
job.fsTokens, job.clock,
job.jobCredentials, job.clock,
job.applicationAttemptId.getAttemptId(),
job.metrics, job.appContext);
job.addTask(task);

View File

@ -141,7 +141,7 @@ public MRApp(int maps, int reduces, boolean autoComplete, String testName,
}
@Override
protected void downloadTokensAndSetupUGI(Configuration conf) {
protected void initJobCredentialsAndUGI(Configuration conf) {
// Fake a shuffle secret that normally is provided by the job client.
String shuffleSecret = "fake-shuffle-secret";
TokenCache.setShuffleSecretKey(shuffleSecret.getBytes(), getCredentials());

View File

@ -58,11 +58,11 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.YarnRuntimeException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
@ -344,6 +344,13 @@ public void testMRAppMasterCredentials() throws Exception {
new Token<TokenIdentifier>(identifier, password, kind, service);
Text tokenAlias = new Text("myToken");
credentials.addToken(tokenAlias, myToken);
Text appTokenService = new Text("localhost:0");
Token<ApplicationTokenIdentifier> appToken =
new Token<ApplicationTokenIdentifier>(identifier, password,
ApplicationTokenIdentifier.KIND_NAME, appTokenService);
credentials.addToken(appTokenService, appToken);
Text keyAlias = new Text("mySecretKeyAlias");
credentials.addSecretKey(keyAlias, "mySecretKey".getBytes());
Token<? extends TokenIdentifier> storedToken =
@ -379,13 +386,13 @@ public void testMRAppMasterCredentials() throws Exception {
System.currentTimeMillis(), 1, false, true);
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
// Now validate the credentials
Credentials appMasterCreds = appMaster.credentials;
// Now validate the task credentials
Credentials appMasterCreds = appMaster.getCredentials();
Assert.assertNotNull(appMasterCreds);
Assert.assertEquals(1, appMasterCreds.numberOfSecretKeys());
Assert.assertEquals(1, appMasterCreds.numberOfTokens());
// Validate the tokens
// Validate the tokens - app token should not be present
Token<? extends TokenIdentifier> usedToken =
appMasterCreds.getToken(tokenAlias);
Assert.assertNotNull(usedToken);
@ -397,13 +404,24 @@ public void testMRAppMasterCredentials() throws Exception {
Assert.assertEquals("mySecretKey", new String(usedKey));
// The credentials should also be added to conf so that OuputCommitter can
// access it
// access it - app token should not be present
Credentials confCredentials = conf.getCredentials();
Assert.assertEquals(1, confCredentials.numberOfSecretKeys());
Assert.assertEquals(1, confCredentials.numberOfTokens());
Assert.assertEquals(storedToken, confCredentials.getToken(tokenAlias));
Assert.assertEquals("mySecretKey",
new String(confCredentials.getSecretKey(keyAlias)));
// Verify the AM's ugi - app token should be present
Credentials ugiCredentials = appMaster.getUgi().getCredentials();
Assert.assertEquals(1, ugiCredentials.numberOfSecretKeys());
Assert.assertEquals(2, ugiCredentials.numberOfTokens());
Assert.assertEquals(storedToken, ugiCredentials.getToken(tokenAlias));
Assert.assertEquals(appToken, ugiCredentials.getToken(appTokenService));
Assert.assertEquals("mySecretKey",
new String(ugiCredentials.getSecretKey(keyAlias)));
}
}
@ -416,7 +434,6 @@ class MRAppMasterTest extends MRAppMaster {
ContainerAllocator mockContainerAllocator;
CommitterEventHandler mockCommitterEventHandler;
RMHeartbeatHandler mockRMHeartbeatHandler;
Credentials credentials;
public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String host, int port, int httpPort,
@ -445,15 +462,6 @@ protected void serviceInit(Configuration conf) throws Exception {
this.conf = conf;
}
@Override
protected void downloadTokensAndSetupUGI(Configuration conf) {
try {
this.currentUser = UserGroupInformation.getCurrentUser();
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
}
@Override
protected ContainerAllocator createContainerAllocator(
final ClientService clientService, final AppContext context) {
@ -477,7 +485,6 @@ protected void serviceStart() throws Exception {
try {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
String user = ugi.getShortUserName();
this.credentials = ugi.getCredentials();
stagingDirPath = MRApps.getStagingAreaDir(conf, user);
} catch (Exception e) {
fail(e.getMessage());
@ -487,4 +494,12 @@ protected void serviceStart() throws Exception {
}
}
@Override
public Credentials getCredentials() {
return super.getCredentials();
}
public UserGroupInformation getUgi() {
return currentUser;
}
}

View File

@ -1595,7 +1595,7 @@ public MRAppNoShuffleSecret(int maps, int reduces, boolean autoComplete,
}
@Override
protected void downloadTokensAndSetupUGI(Configuration conf) {
protected void initJobCredentialsAndUGI(Configuration conf) {
// do NOT put a shuffle secret in the job credentials
}
}

View File

@ -270,7 +270,7 @@ public Configuration getConfig() {
}
@Override
protected void downloadTokensAndSetupUGI(Configuration conf) {
protected void initJobCredentialsAndUGI(Configuration conf) {
}
public boolean getTestIsLastAMRetry(){

View File

@ -593,12 +593,6 @@ public interface MRJobConfig {
public static final String APPLICATION_MASTER_CLASS =
"org.apache.hadoop.mapreduce.v2.app.MRAppMaster";
// The token file for the application. Should contain tokens for access to
// remote file system and may optionally contain application specific tokens.
// For now, generated by the AppManagers and used by NodeManagers and the
// Containers.
public static final String APPLICATION_TOKENS_FILE = "appTokens";
public static final String MAPREDUCE_V2_CHILD_CLASS =
"org.apache.hadoop.mapred.YarnChild";

View File

@ -64,7 +64,6 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.YarnRuntimeException;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@ -179,7 +178,7 @@ public ClusterMetrics getClusterMetrics() throws IOException,
}
@VisibleForTesting
void addHistoyToken(Credentials ts) throws IOException, InterruptedException {
void addHistoryToken(Credentials ts) throws IOException, InterruptedException {
/* check if we have a hsproxy, if not, no need */
MRClientProtocol hsProxy = clientCache.getInitializedHSProxy();
if (UserGroupInformation.isSecurityEnabled() && (hsProxy != null)) {
@ -279,17 +278,8 @@ public long getTaskTrackerExpiryInterval() throws IOException,
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
throws IOException, InterruptedException {
addHistoyToken(ts);
addHistoryToken(ts);
// Upload only in security mode: TODO
Path applicationTokensFile =
new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
try {
ts.writeTokenStorageFile(applicationTokensFile, conf);
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
// Construct necessary information to start the MR AM
ApplicationSubmissionContext appContext =
createApplicationSubmissionContext(conf, jobSubmitDir, ts);
@ -383,8 +373,7 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
// TODO gross hack
for (String s : new String[] {
MRJobConfig.JOB_SPLIT,
MRJobConfig.JOB_SPLIT_METAINFO,
MRJobConfig.APPLICATION_TOKENS_FILE }) {
MRJobConfig.JOB_SPLIT_METAINFO }) {
localResources.put(
MRJobConfig.JOB_SUBMIT_DIR + "/" + s,
createApplicationResource(defaultFileContext,

View File

@ -306,13 +306,13 @@ public void testGetHSDelegationToken() throws Exception {
YARNRunner yarnRunner = new YARNRunner(conf, rmDelegate, clientCache);
// No HS token if no RM token
yarnRunner.addHistoyToken(creds);
yarnRunner.addHistoryToken(creds);
verify(mockHsProxy, times(0)).getDelegationToken(
any(GetDelegationTokenRequest.class));
// No HS token if RM token, but secirity disabled.
creds.addToken(new Text("rmdt"), token);
yarnRunner.addHistoyToken(creds);
yarnRunner.addHistoryToken(creds);
verify(mockHsProxy, times(0)).getDelegationToken(
any(GetDelegationTokenRequest.class));
@ -322,18 +322,18 @@ public void testGetHSDelegationToken() throws Exception {
creds = new Credentials();
// No HS token if no RM token, security enabled
yarnRunner.addHistoyToken(creds);
yarnRunner.addHistoryToken(creds);
verify(mockHsProxy, times(0)).getDelegationToken(
any(GetDelegationTokenRequest.class));
// HS token if RM token present, security enabled
creds.addToken(new Text("rmdt"), token);
yarnRunner.addHistoyToken(creds);
yarnRunner.addHistoryToken(creds);
verify(mockHsProxy, times(1)).getDelegationToken(
any(GetDelegationTokenRequest.class));
// No additional call to get HS token if RM and HS token present
yarnRunner.addHistoyToken(creds);
yarnRunner.addHistoryToken(creds);
verify(mockHsProxy, times(1)).getDelegationToken(
any(GetDelegationTokenRequest.class));
} finally {
@ -407,10 +407,6 @@ public void testAMAdminCommandOpts() throws Exception {
out = new FileOutputStream(jobsplitmetainfo);
out.close();
File appTokens = new File(testWorkDir, MRJobConfig.APPLICATION_TOKENS_FILE);
out = new FileOutputStream(appTokens);
out.close();
ApplicationSubmissionContext submissionContext =
yarnRunner.createApplicationSubmissionContext(jobConf, testWorkDir.toString(), new Credentials());
@ -477,10 +473,6 @@ public void testWarnCommandOpts() throws Exception {
out = new FileOutputStream(jobsplitmetainfo);
out.close();
File appTokens = new File(testWorkDir, MRJobConfig.APPLICATION_TOKENS_FILE);
out = new FileOutputStream(appTokens);
out.close();
@SuppressWarnings("unused")
ApplicationSubmissionContext submissionContext =
yarnRunner.createApplicationSubmissionContext(jobConf, testWorkDir.toString(), new Credentials());