diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 015c1f299e1..51353cd5dc8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -996,4 +996,6 @@ public interface MRJobConfig { * A comma-separated list of properties whose value will be redacted. */ String MR_JOB_REDACTED_PROPERTIES = "mapreduce.job.redacted-properties"; + + String MR_JOB_SEND_TOKEN_CONF = "mapreduce.job.send-token-conf"; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 523dd284598..5aa2e719515 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -1955,4 +1955,22 @@ mapreduce.job.redacted-properties + + + + This configuration is a regex expression. The list of configurations that + match the regex expression will be sent to RM. RM will use these + configurations for renewing tokens. + This configuration is added for below scenario: User needs to run distcp + jobs across two clusters, but the RM does not have necessary hdfs + configurations to connect to the remote hdfs cluster. Hence, user relies on + this config to send the configurations to RM and RM uses these + configurations to renew tokens. + For example the following regex expression indicates the minimum required + configs for RM to connect to a remote hdfs cluster: + dfs.nameservices|^dfs.namenode.rpc-address.*$|^dfs.ha.namenodes.*$|^dfs.client.failover.proxy.provider.*$|dfs.namenode.kerberos.principal + + mapreduce.job.send-token-conf + + diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index 4c6f0f3139d..98fe5535cf3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Vector; @@ -499,6 +500,12 @@ public class YARNRunner implements ClientProtocol { ContainerLaunchContext.newInstance(localResources, environment, vargsFinal, null, securityTokens, acls); + String regex = conf.get(MRJobConfig.MR_JOB_SEND_TOKEN_CONF); + if (regex != null && !regex.isEmpty()) { + setTokenRenewerConf(amContainer, conf, regex); + } + + Collection tagsFromConf = jobConf.getTrimmedStringCollection(MRJobConfig.JOB_TAGS); @@ -576,6 +583,35 @@ public class YARNRunner implements ClientProtocol { return appContext; } + private void setTokenRenewerConf(ContainerLaunchContext context, + Configuration conf, String regex) throws IOException { + DataOutputBuffer dob = new DataOutputBuffer(); + Configuration copy = new Configuration(false); + copy.clear(); + int count = 0; + for (Map.Entry map : conf) { + String key = map.getKey(); + String val = map.getValue(); + if (key.matches(regex)) { + copy.set(key, val); + count++; + } + } + copy.write(dob); + ByteBuffer appConf = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + LOG.info("Send configurations that match regex expression: " + regex + + " , total number of configs: " + count + ", total size : " + dob + .getLength() + " bytes."); + if (LOG.isDebugEnabled()) { + for (Iterator> itor = copy.iterator(); itor + .hasNext(); ) { + Map.Entry entry = itor.next(); + LOG.info(entry.getKey() + " ===> " + entry.getValue()); + } + } + context.setTokensConf(appConf); + } + @Override public void setJobPriority(JobID arg0, String arg1) throws IOException, InterruptedException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java index abf2e72e0d1..c6da9a32201 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java @@ -46,6 +46,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; @@ -99,6 +100,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.log4j.Appender; import org.apache.log4j.Layout; @@ -106,6 +108,7 @@ import org.apache.log4j.Logger; import org.apache.log4j.SimpleLayout; import org.apache.log4j.WriterAppender; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -677,4 +680,40 @@ public class TestYARNRunner { return yarnRunner.createApplicationSubmissionContext(jobConf, testWorkDir.toString(), new Credentials()); } + + // Test configs that match regex expression should be set in + // containerLaunchContext + @Test + public void testSendJobConf() throws IOException { + JobConf jobConf = new JobConf(); + jobConf.set("dfs.nameservices", "mycluster1,mycluster2"); + jobConf.set("dfs.namenode.rpc-address.mycluster2.nn1", "123.0.0.1"); + jobConf.set("dfs.namenode.rpc-address.mycluster2.nn2", "123.0.0.2"); + jobConf.set("dfs.ha.namenodes.mycluster2", "nn1,nn2"); + jobConf.set("dfs.client.failover.proxy.provider.mycluster2", "provider"); + jobConf.set("hadoop.tmp.dir", "testconfdir"); + jobConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + jobConf.set("mapreduce.job.send-token-conf", + "dfs.nameservices|^dfs.namenode.rpc-address.*$|^dfs.ha.namenodes.*$" + + "|^dfs.client.failover.proxy.provider.*$" + + "|dfs.namenode.kerberos.principal"); + UserGroupInformation.setConfiguration(jobConf); + + YARNRunner yarnRunner = new YARNRunner(jobConf); + ApplicationSubmissionContext submissionContext = + buildSubmitContext(yarnRunner, jobConf); + Configuration confSent = BuilderUtils.parseTokensConf(submissionContext); + + // configs that match regex should be included + Assert.assertTrue(confSent.get("dfs.namenode.rpc-address.mycluster2.nn1") + .equals("123.0.0.1")); + Assert.assertTrue(confSent.get("dfs.namenode.rpc-address.mycluster2.nn2") + .equals("123.0.0.2")); + + // configs that aren't matching regex should not be included + Assert.assertTrue(confSent.get("hadoop.tmp.dir") == null || !confSent + .get("hadoop.tmp.dir").equals("testconfdir")); + UserGroupInformation.reset(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java index 6d4bccd80c8..616aa4ba4e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java @@ -107,6 +107,22 @@ public abstract class ContainerLaunchContext { @Stable public abstract void setTokens(ByteBuffer tokens); + /** + * Get the configuration used by RM to renew tokens. + * @return The configuration used by RM to renew the tokens. + */ + @Public + @Unstable + public abstract ByteBuffer getTokensConf(); + + /** + * Set the configuration used by RM to renew the tokens. + * @param tokensConf The configuration used by RM to renew the tokens + */ + @Public + @Unstable + public abstract void setTokensConf(ByteBuffer tokensConf); + /** * Get LocalResource required by the container. * @return all LocalResource required by the container diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 602c19061ee..d446c8f1b87 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -490,7 +490,12 @@ public class YarnConfiguration extends Configuration { RM_PREFIX + "delegation.token.max-lifetime"; public static final long RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = 7*24*60*60*1000; // 7 days - + + public static final String RM_DELEGATION_TOKEN_MAX_CONF_SIZE = + RM_PREFIX + "delegation-token.max-conf-size-bytes"; + public static final int DEFAULT_RM_DELEGATION_TOKEN_MAX_CONF_SIZE_BYTES = + 12800; + public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled"; public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 9775b87e7d4..266a321b915 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -552,6 +552,8 @@ message ContainerLaunchContextProto { repeated string command = 5; repeated ApplicationACLMapProto application_ACLs = 6; optional ContainerRetryContextProto container_retry_context = 7; + optional bytes tokens_conf = 8; + } message ContainerStatusProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java index 1efe5417103..1f76c34960b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java @@ -54,6 +54,7 @@ extends ContainerLaunchContext { private Map localResources = null; private ByteBuffer tokens = null; + private ByteBuffer tokensConf = null; private Map serviceData = null; private Map environment = null; private List commands = null; @@ -111,6 +112,9 @@ extends ContainerLaunchContext { if (this.tokens != null) { builder.setTokens(convertToProtoFormat(this.tokens)); } + if (this.tokensConf != null) { + builder.setTokensConf(convertToProtoFormat(this.tokensConf)); + } if (this.serviceData != null) { addServiceDataToProto(); } @@ -267,6 +271,28 @@ extends ContainerLaunchContext { this.tokens = tokens; } + @Override + public ByteBuffer getTokensConf() { + ContainerLaunchContextProtoOrBuilder p = viaProto ? proto : builder; + if (this.tokensConf != null) { + return this.tokensConf; + } + if (!p.hasTokensConf()) { + return null; + } + this.tokensConf = convertFromProtoFormat(p.getTokensConf()); + return this.tokensConf; + } + + @Override + public void setTokensConf(ByteBuffer tokensConf) { + maybeInitBuilder(); + if (tokensConf == null) { + builder.clearTokensConf(); + } + this.tokensConf = tokensConf; + } + @Override public Map getServiceData() { initServiceData(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 07336084c08..2a7e8834a54 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -699,6 +699,16 @@ 30000 + + Maximum size in bytes for configurations that can be provided + by application to RM for delegation token renewal. + By experiment, it's roughly 128 bytes per key-value pair. + The default value 12800 allows roughly 100 configs, may be less. + + yarn.resourcemanager.delegation-token.max-conf-size-bytes + 12800 + + If true, ResourceManager will have proxy-user privileges. Use case: In a secure cluster, YARN requires the user hdfs delegation-tokens to diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java index 74c06ff9781..e7f47af2647 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java @@ -29,8 +29,11 @@ import java.util.Map; import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.AMCommand; @@ -62,6 +65,8 @@ import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; @@ -496,4 +501,31 @@ public class BuilderUtils { return response; } + + public static Credentials parseCredentials( + ApplicationSubmissionContext application) throws IOException { + Credentials credentials = new Credentials(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + ByteBuffer tokens = application.getAMContainerSpec().getTokens(); + if (tokens != null) { + dibb.reset(tokens); + credentials.readTokenStorageStream(dibb); + tokens.rewind(); + } + return credentials; + } + + public static Configuration parseTokensConf( + ApplicationSubmissionContext context) throws IOException { + ByteBuffer tokensConf = context.getAMContainerSpec().getTokensConf(); + if (tokensConf == null) { + return null; + } + DataInputByteBuffer dibb = new DataInputByteBuffer(); + dibb.reset(tokensConf); + Configuration appConf = new Configuration(false); + appConf.readFields(dibb); + tokensConf.rewind(); + return appConf; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index ec71e38ccec..74867ac4d0a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import java.security.AccessControlException; import java.text.MessageFormat; import java.util.ArrayList; @@ -589,6 +590,21 @@ public class ClientRMService extends AbstractService implements return SubmitApplicationResponse.newInstance(); } + ByteBuffer tokenConf = + submissionContext.getAMContainerSpec().getTokensConf(); + if (tokenConf != null) { + int maxSize = getConfig() + .getInt(YarnConfiguration.RM_DELEGATION_TOKEN_MAX_CONF_SIZE, + YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_MAX_CONF_SIZE_BYTES); + LOG.info("Using app provided configurations for delegation token renewal," + + " total size = " + tokenConf.capacity()); + if (tokenConf.capacity() > maxSize) { + throw new YarnException( + "Exceed " + YarnConfiguration.RM_DELEGATION_TOKEN_MAX_CONF_SIZE + + " = " + maxSize + " bytes, current conf size = " + + tokenConf.capacity() + " bytes."); + } + } if (submissionContext.getQueue() == null) { submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME); } @@ -623,8 +639,7 @@ public class ClientRMService extends AbstractService implements RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST, "ClientRMService", applicationId, callerContext); } catch (YarnException e) { - LOG.info("Exception in submitting application with id " + - applicationId.getId(), e); + LOG.info("Exception in submitting " + applicationId, e); RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, e.getMessage(), "ClientRMService", "Exception in submitting application", applicationId, callerContext); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 41ad8b7a916..4281b7985f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -17,18 +17,14 @@ */ package org.apache.hadoop.yarn.server.resourcemanager; -import java.io.IOException; -import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -297,14 +293,14 @@ public class RMAppManager implements EventHandler, RMAppImpl application = createAndPopulateNewRMApp(submissionContext, submitTime, user, false); - Credentials credentials = null; try { - credentials = parseCredentials(submissionContext); if (UserGroupInformation.isSecurityEnabled()) { this.rmContext.getDelegationTokenRenewer() - .addApplicationAsync(applicationId, credentials, + .addApplicationAsync(applicationId, + BuilderUtils.parseCredentials(submissionContext), submissionContext.getCancelTokensWhenComplete(), - application.getUser()); + application.getUser(), + BuilderUtils.parseTokensConf(submissionContext)); } else { // Dispatcher is not yet started at this time, so these START events // enqueued should be guaranteed to be first processed when dispatcher @@ -313,11 +309,10 @@ public class RMAppManager implements EventHandler, .handle(new RMAppEvent(applicationId, RMAppEventType.START)); } } catch (Exception e) { - LOG.warn("Unable to parse credentials.", e); + LOG.warn("Unable to parse credentials for " + applicationId, e); // Sending APP_REJECTED is fine, since we assume that the // RMApp is in NEW state and thus we haven't yet informed the // scheduler about the existence of the application - assert application.getState() == RMAppState.NEW; this.rmContext.getDispatcher().getEventHandler() .handle(new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, e.getMessage())); @@ -507,20 +502,7 @@ public class RMAppManager implements EventHandler, return null; } - - protected Credentials parseCredentials( - ApplicationSubmissionContext application) throws IOException { - Credentials credentials = new Credentials(); - DataInputByteBuffer dibb = new DataInputByteBuffer(); - ByteBuffer tokens = application.getAMContainerSpec().getTokens(); - if (tokens != null) { - dibb.reset(tokens); - credentials.readTokenStorageStream(dibb); - tokens.rewind(); - } - return credentials; - } - + @Override public void recover(RMState state) throws Exception { RMStateStore store = rmContext.getStateStore(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 71b5ab76915..17f3c728019 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -1035,9 +1035,12 @@ public class RMAppImpl implements RMApp, Recoverable { try { app.rmContext.getDelegationTokenRenewer() .addApplicationAsyncDuringRecovery(app.getApplicationId(), - app.parseCredentials(), + BuilderUtils.parseCredentials(app.submissionContext), app.submissionContext.getCancelTokensWhenComplete(), - app.getUser()); + app.getUser(), + BuilderUtils.parseTokensConf(app.submissionContext)); + // set the memory free + app.submissionContext.getAMContainerSpec().setTokensConf(null); } catch (Exception e) { String msg = "Failed to fetch user credentials from application:" + e.getMessage(); @@ -1090,6 +1093,8 @@ public class RMAppImpl implements RMApp, Recoverable { app.submissionContext, false, app.applicationPriority)); // send the ATS create Event app.sendATSCreateEvent(); + // Set the memory free after submission context is persisted + app.submissionContext.getAMContainerSpec().setTokensConf(null); } } @@ -1405,6 +1410,8 @@ public class RMAppImpl implements RMApp, Recoverable { .applicationFinished(app, finalState); app.rmContext.getSystemMetricsPublisher() .appFinished(app, finalState, app.finishTime); + // set the memory free + app.submissionContext.getAMContainerSpec().setTokensConf(null); }; } @@ -1614,18 +1621,6 @@ public class RMAppImpl implements RMApp, Recoverable { return this.amReq; } - protected Credentials parseCredentials() throws IOException { - Credentials credentials = new Credentials(); - DataInputByteBuffer dibb = new DataInputByteBuffer(); - ByteBuffer tokens = submissionContext.getAMContainerSpec().getTokens(); - if (tokens != null) { - dibb.reset(tokens); - credentials.readTokenStorageStream(dibb); - tokens.rewind(); - } - return credentials; - } - @Override public Map getLogAggregationReportsForApp() { try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index dfbf33397d1..abb8d59ff0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -29,6 +29,7 @@ import java.util.Date; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.Timer; @@ -379,43 +380,43 @@ public class DelegationTokenRenewer extends AbstractService { * @param applicationId added application * @param ts tokens * @param shouldCancelAtEnd true if tokens should be canceled when the app is - * done else false. + * done else false. * @param user user + * @param tokenConf tokenConf sent by the app-submitter */ public void addApplicationAsync(ApplicationId applicationId, Credentials ts, - boolean shouldCancelAtEnd, String user) { + boolean shouldCancelAtEnd, String user, Configuration tokenConf) { processDelegationTokenRenewerEvent(new DelegationTokenRenewerAppSubmitEvent( - applicationId, ts, shouldCancelAtEnd, user)); + applicationId, ts, shouldCancelAtEnd, user, tokenConf)); } /** * Asynchronously add application tokens for renewal. - * - * @param applicationId + * @param applicationId * added application * @param ts * tokens * @param shouldCancelAtEnd * true if tokens should be canceled when the app is done else false. - * @param user - * user + * @param user user + * @param tokenConf tokenConf sent by the app-submitter */ public void addApplicationAsyncDuringRecovery(ApplicationId applicationId, - Credentials ts, boolean shouldCancelAtEnd, String user) { + Credentials ts, boolean shouldCancelAtEnd, String user, + Configuration tokenConf) { processDelegationTokenRenewerEvent( new DelegationTokenRenewerAppRecoverEvent(applicationId, ts, - shouldCancelAtEnd, user)); + shouldCancelAtEnd, user, tokenConf)); } - /** - * Synchronously renew delegation tokens. - * @param user user - */ + + // Only for testing + // Synchronously renew delegation tokens. public void addApplicationSync(ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd, String user) throws IOException, InterruptedException { handleAppSubmitEvent(new DelegationTokenRenewerAppSubmitEvent( - applicationId, ts, shouldCancelAtEnd, user)); + applicationId, ts, shouldCancelAtEnd, user, new Configuration())); } private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt) @@ -455,8 +456,27 @@ public class DelegationTokenRenewer extends AbstractService { DelegationTokenToRenew dttr = allTokens.get(token); if (dttr == null) { + Configuration tokenConf; + if (evt.tokenConf != null) { + // Override conf with app provided conf - this is required in cases + // where RM does not have the required conf to communicate with + // remote hdfs cluster. The conf is provided by the application + // itself. + tokenConf = evt.tokenConf; + LOG.info("Using app provided token conf for renewal," + + " number of configs = " + tokenConf.size()); + if (LOG.isDebugEnabled()) { + for (Iterator> itor = + tokenConf.iterator(); itor.hasNext(); ) { + Map.Entry entry = itor.next(); + LOG.info(entry.getKey() + " ===> " + entry.getValue()); + } + } + } else { + tokenConf = getConfig(); + } dttr = new DelegationTokenToRenew(Arrays.asList(applicationId), token, - getConfig(), now, shouldCancelAtEnd, evt.getUser()); + tokenConf, now, shouldCancelAtEnd, evt.getUser()); try { renewToken(dttr); } catch (IOException ioe) { @@ -926,22 +946,22 @@ public class DelegationTokenRenewer extends AbstractService { } static class DelegationTokenRenewerAppSubmitEvent - extends - AbstractDelegationTokenRenewerAppEvent { + extends AbstractDelegationTokenRenewerAppEvent { public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId, - Credentials credentails, boolean shouldCancelAtEnd, String user) { + Credentials credentails, boolean shouldCancelAtEnd, String user, + Configuration tokenConf) { super(appId, credentails, shouldCancelAtEnd, user, - DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION); + DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION, tokenConf); } } static class DelegationTokenRenewerAppRecoverEvent - extends - AbstractDelegationTokenRenewerAppEvent { + extends AbstractDelegationTokenRenewerAppEvent { public DelegationTokenRenewerAppRecoverEvent(ApplicationId appId, - Credentials credentails, boolean shouldCancelAtEnd, String user) { + Credentials credentails, boolean shouldCancelAtEnd, String user, + Configuration tokenConf) { super(appId, credentails, shouldCancelAtEnd, user, - DelegationTokenRenewerEventType.RECOVER_APPLICATION); + DelegationTokenRenewerEventType.RECOVER_APPLICATION, tokenConf); } } @@ -949,16 +969,18 @@ public class DelegationTokenRenewer extends AbstractService { DelegationTokenRenewerEvent { private Credentials credentials; + private Configuration tokenConf; private boolean shouldCancelAtEnd; private String user; public AbstractDelegationTokenRenewerAppEvent(ApplicationId appId, - Credentials credentails, boolean shouldCancelAtEnd, String user, - DelegationTokenRenewerEventType type) { + Credentials credentials, boolean shouldCancelAtEnd, String user, + DelegationTokenRenewerEventType type, Configuration tokenConf) { super(appId, type); - this.credentials = credentails; + this.credentials = credentials; this.shouldCancelAtEnd = shouldCancelAtEnd; this.user = user; + this.tokenConf = tokenConf; } public Credentials getCredentials() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 02d39560bcb..7d19dab398f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.io.IOException; import java.nio.ByteBuffer; import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; @@ -563,7 +564,7 @@ public class MockRM extends ResourceManager { return submitApp(resource, name, user, acls, false, queue, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false, - false, null, 0, null, true, priority, amLabel, null); + false, null, 0, null, true, priority, amLabel, null, null); } public RMApp submitApp(Resource resource, String name, String user, @@ -664,7 +665,17 @@ public class MockRM extends ResourceManager { return submitApp(capability, name, user, acls, unmanaged, queue, maxAppAttempts, ts, appType, waitForAccepted, keepContainers, isAppIdProvided, applicationId, attemptFailuresValidityInterval, - logAggregationContext, cancelTokensWhenComplete, priority, "", null); + logAggregationContext, cancelTokensWhenComplete, priority, "", null, + null); + } + + public RMApp submitApp(Credentials cred, ByteBuffer tokensConf) + throws Exception { + return submitApp(Resource.newInstance(200, 1), "app1", "user", null, false, + null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), cred, null, true, + false, false, null, 0, null, true, Priority.newInstance(0), null, null, + tokensConf); } public RMApp submitApp(Resource capability, String name, String user, @@ -674,7 +685,8 @@ public class MockRM extends ResourceManager { ApplicationId applicationId, long attemptFailuresValidityInterval, LogAggregationContext logAggregationContext, boolean cancelTokensWhenComplete, Priority priority, String amLabel, - Map applicationTimeouts) + Map applicationTimeouts, + ByteBuffer tokensConf) throws Exception { ApplicationId appId = isAppIdProvided ? applicationId : null; ApplicationClientProtocol client = getClientRMService(); @@ -713,6 +725,7 @@ public class MockRM extends ResourceManager { ts.writeTokenStorageToStream(dob); ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); clc.setTokens(securityTokens); + clc.setTokensConf(tokensConf); } sub.setAMContainerSpec(clc); sub.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval); @@ -729,22 +742,20 @@ public class MockRM extends ResourceManager { req.setApplicationSubmissionContext(sub); UserGroupInformation fakeUser = UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"}); - PrivilegedAction action = - new PrivilegedAction() { + PrivilegedExceptionAction action = + new PrivilegedExceptionAction() { ApplicationClientProtocol client; SubmitApplicationRequest req; @Override - public SubmitApplicationResponse run() { + public SubmitApplicationResponse run() throws IOException, YarnException { try { return client.submitApplication(req); - } catch (YarnException e) { - e.printStackTrace(); - } catch (IOException e) { + } catch (YarnException | IOException e) { e.printStackTrace(); + throw e; } - return null; } - PrivilegedAction setClientReq( + PrivilegedExceptionAction setClientReq( ApplicationClientProtocol client, SubmitApplicationRequest req) { this.client = client; this.req = req; @@ -1224,6 +1235,7 @@ public class MockRM extends ResourceManager { null, false, null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, - false, false, null, 0, null, true, priority, null, applicationTimeouts); + false, false, null, 0, null, true, priority, null, applicationTimeouts, + null); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index 39e07e08cf2..1aec76f3004 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -38,9 +38,11 @@ import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; @@ -254,6 +256,7 @@ public class TestAppManager{ public void tearDown() { setAppEventType(RMAppEventType.KILL); ((Service)rmContext.getDispatcher()).stop(); + UserGroupInformation.reset(); } @Test @@ -306,13 +309,15 @@ public class TestAppManager{ ResourceRequest.ANY, Resource.newInstance(1024, 1), 1); sub.setAMContainerResourceRequest(resReg); req.setApplicationSubmissionContext(sub); + sub.setAMContainerSpec(mock(ContainerLaunchContext.class)); try { rmService.submitApplication(req); } catch (Exception e) { + e.printStackTrace(); if (e instanceof YarnException) { Assert.assertTrue(e.getCause() instanceof AccessControlException); } else { - Assert.fail("Yarn exception is expected"); + Assert.fail("Yarn exception is expected : " + e.getMessage()); } } finally { mockRM.close(); @@ -538,6 +543,10 @@ public class TestAppManager{ DataOutputBuffer dob = new DataOutputBuffer(); ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + Configuration conf = new Configuration(); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + UserGroupInformation.setConfiguration(conf); asContext.getAMContainerSpec().setTokens(securityTokens); try { appMonitor.submitApplication(asContext, "test"); @@ -559,36 +568,6 @@ public class TestAppManager{ asContext.getAMContainerSpec().setTokens(null); } - @Test - public void testRMAppSubmitWithValidTokens() throws Exception { - // Setup valid security tokens - DataOutputBuffer dob = new DataOutputBuffer(); - Credentials credentials = new Credentials(); - credentials.writeTokenStorageToStream(dob); - ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, - dob.getLength()); - asContext.getAMContainerSpec().setTokens(securityTokens); - appMonitor.submitApplication(asContext, "test"); - RMApp app = rmContext.getRMApps().get(appId); - Assert.assertNotNull("app is null", app); - Assert.assertEquals("app id doesn't match", appId, - app.getApplicationId()); - Assert.assertEquals("app state doesn't match", RMAppState.NEW, - app.getState()); - verify(metricsPublisher).appACLsUpdated( - any(RMApp.class), any(String.class), anyLong()); - - // wait for event to be processed - int timeoutSecs = 0; - while ((getAppEventType() == RMAppEventType.KILL) && - timeoutSecs++ < 20) { - Thread.sleep(1000); - } - Assert.assertEquals("app event type sent is wrong", RMAppEventType.START, - getAppEventType()); - asContext.getAMContainerSpec().setTokens(null); - } - @Test (timeout = 30000) public void testRMAppSubmitMaxAppAttempts() throws Exception { int[] globalMaxAppAttempts = new int[] { 10, 1 }; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 8f60ed5d7e1..9fc92285715 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -112,6 +112,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -1678,32 +1679,28 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { memStore.init(conf); MockRM rm1 = new TestSecurityMockRM(conf, memStore) { - @Override - protected RMAppManager createRMAppManager() { - return new TestRMAppManager(this.rmContext, this.scheduler, - this.masterService, this.applicationACLsManager, conf); + class TestDelegationTokenRenewer extends DelegationTokenRenewer { + public void addApplicationAsync(ApplicationId applicationId, Credentials ts, + boolean shouldCancelAtEnd, String user, Configuration appConf) { + throw new RuntimeException("failed to submit app"); + } } - - class TestRMAppManager extends RMAppManager { - - public TestRMAppManager(RMContext context, YarnScheduler scheduler, - ApplicationMasterService masterService, - ApplicationACLsManager applicationACLsManager, Configuration conf) { - super(context, scheduler, masterService, applicationACLsManager, conf); - } - - @Override - protected Credentials parseCredentials( - ApplicationSubmissionContext application) throws IOException { - throw new IOException("Parsing credential error."); - } + @Override + protected DelegationTokenRenewer createDelegationTokenRenewer() { + return new TestDelegationTokenRenewer(); } }; rm1.start(); - RMApp app1 = - rm1.submitApp(200, "name", "user", + RMApp app1 = null; + try { + app1 = rm1.submitApp(200, "name", "user", new HashMap(), false, "default", -1, null, "MAPREDUCE", false); + Assert.fail(); + } catch (Exception e) { + + } + app1 = rm1.getRMContext().getRMApps().values().iterator().next(); rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED); // Check app staet is saved in state store. Assert.assertEquals(RMAppState.FAILED, memStore.getState() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java index 087199da192..b13a37a23bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java @@ -30,6 +30,7 @@ import java.util.Map.Entry; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -486,9 +487,10 @@ public class TestRMAppLogAggregationStatus { private RMApp createRMApp(Configuration conf) { ApplicationSubmissionContext submissionContext = - ApplicationSubmissionContext.newInstance(appId, "test", "default", - Priority.newInstance(0), null, false, true, - 2, Resource.newInstance(10, 2), "test"); + ApplicationSubmissionContext + .newInstance(appId, "test", "default", Priority.newInstance(0), + mock(ContainerLaunchContext.class), false, true, 2, + Resource.newInstance(10, 2), "test"); return new RMAppImpl(this.appId, this.rmContext, conf, "test", "test", "default", submissionContext, scheduler, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 1d74d8175dd..9977683898f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -264,7 +264,7 @@ public class TestRMAppTransitions { // but applicationId is still set for safety submissionContext.setApplicationId(applicationId); submissionContext.setPriority(Priority.newInstance(0)); - + submissionContext.setAMContainerSpec(mock(ContainerLaunchContext.class)); RMApp application = new RMAppImpl(applicationId, rmContext, conf, name, user, queue, submissionContext, scheduler, masterService, System.currentTimeMillis(), "YARN", null, mock(ResourceRequest.class)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index 8e6272a6dfd..2ef2e76da31 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -229,10 +230,10 @@ public class FairSchedulerTestBase { String queue, String user, Resource amResource) { RMContext rmContext = resourceManager.getRMContext(); ApplicationId appId = attId.getApplicationId(); - RMApp rmApp = new RMAppImpl(appId, rmContext, conf, - null, user, null, ApplicationSubmissionContext.newInstance(appId, null, - queue, null, null, false, false, 0, amResource, null), scheduler, null, - 0, null, null, null); + RMApp rmApp = new RMAppImpl(appId, rmContext, conf, null, user, null, + ApplicationSubmissionContext.newInstance(appId, null, queue, null, + mock(ContainerLaunchContext.class), false, false, 0, amResource, + null), scheduler, null, 0, null, null, null); rmContext.getRMApps().put(appId, rmApp); RMAppEvent event = new RMAppEvent(appId, RMAppEventType.START); resourceManager.getRMContext().getRMApps().get(appId).handle(event); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index 66760229a72..0b4d3aae329 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -392,7 +393,8 @@ public class TestDelegationTokenRenewer { // register the tokens for renewal ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0); - delegationTokenRenewer.addApplicationAsync(applicationId_0, ts, true, "user"); + delegationTokenRenewer.addApplicationAsync(applicationId_0, ts, true, "user", + new Configuration()); waitForEventsToGetProcessed(delegationTokenRenewer); // first 3 initial renewals + 1 real @@ -432,7 +434,8 @@ public class TestDelegationTokenRenewer { ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1); - delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, true, "user"); + delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, true, "user", + new Configuration()); waitForEventsToGetProcessed(delegationTokenRenewer); delegationTokenRenewer.applicationFinished(applicationId_1); waitForEventsToGetProcessed(delegationTokenRenewer); @@ -468,7 +471,8 @@ public class TestDelegationTokenRenewer { // register the tokens for renewal ApplicationId appId = BuilderUtils.newApplicationId(0, 0); - delegationTokenRenewer.addApplicationAsync(appId, ts, true, "user"); + delegationTokenRenewer.addApplicationAsync(appId, ts, true, "user", + new Configuration()); int waitCnt = 20; while (waitCnt-- >0) { if (!eventQueue.isEmpty()) { @@ -531,7 +535,8 @@ public class TestDelegationTokenRenewer { ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1); - delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, false, "user"); + delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, false, "user", + new Configuration()); waitForEventsToGetProcessed(delegationTokenRenewer); delegationTokenRenewer.applicationFinished(applicationId_1); waitForEventsToGetProcessed(delegationTokenRenewer); @@ -600,7 +605,8 @@ public class TestDelegationTokenRenewer { // register the tokens for renewal ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0); - localDtr.addApplicationAsync(applicationId_0, ts, true, "user"); + localDtr.addApplicationAsync(applicationId_0, ts, true, "user", + new Configuration()); waitForEventsToGetProcessed(localDtr); if (!eventQueue.isEmpty()){ Event evt = eventQueue.take(); @@ -679,7 +685,8 @@ public class TestDelegationTokenRenewer { // register the tokens for renewal ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0); - localDtr.addApplicationAsync(applicationId_0, ts, true, "user"); + localDtr.addApplicationAsync(applicationId_0, ts, true, "user", + new Configuration()); localDtr.applicationFinished(applicationId_0); waitForEventsToGetProcessed(delegationTokenRenewer); //Send another keep alive. @@ -831,14 +838,16 @@ public class TestDelegationTokenRenewer { Thread submitThread = new Thread() { @Override public void run() { - dtr.addApplicationAsync(mock(ApplicationId.class), creds1, false, "user"); + dtr.addApplicationAsync(mock(ApplicationId.class), creds1, false, "user", + new Configuration()); } }; submitThread.start(); // wait till 1st submit blocks, then submit another startBarrier.await(); - dtr.addApplicationAsync(mock(ApplicationId.class), creds2, false, "user"); + dtr.addApplicationAsync(mock(ApplicationId.class), creds2, false, "user", + new Configuration()); // signal 1st to complete endBarrier.await(); submitThread.join(); @@ -1273,4 +1282,106 @@ public class TestDelegationTokenRenewer { } }, 10, 10000); } + + // Test DelegationTokenRenewer uses the tokenConf provided by application + // for token renewal. + @Test + public void testRenewTokenUsingTokenConfProvidedByApp() throws Exception{ + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + UserGroupInformation.setConfiguration(conf); + + final MockRM rm = new TestSecurityMockRM(conf, null); + rm.start(); + final MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); + nm1.registerNode(); + + // create a token + Text userText1 = new Text("user1"); + DelegationTokenIdentifier dtId1 = + new DelegationTokenIdentifier(userText1, new Text("renewer1"), + userText1); + final Token token1 = + new Token(dtId1.getBytes(), + "password1".getBytes(), dtId1.getKind(), new Text("service1")); + Credentials credentials = new Credentials(); + credentials.addToken(userText1, token1); + + // create token conf for renewal + Configuration appConf = new Configuration(false); + appConf.set("dfs.nameservices", "mycluster1,mycluster2"); + appConf.set("dfs.namenode.rpc-address.mycluster2.nn1", "123.0.0.1"); + appConf.set("dfs.namenode.rpc-address.mycluster2.nn2", "123.0.0.2"); + appConf.set("dfs.ha.namenodes.mycluster2", "nn1,nn2"); + appConf.set("dfs.client.failover.proxy.provider.mycluster2", "provider"); + DataOutputBuffer dob = new DataOutputBuffer(); + appConf.write(dob); + ByteBuffer tokenConf = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + final int confSize = appConf.size(); + + // submit app + RMApp app = rm.submitApp(credentials, tokenConf); + + GenericTestUtils.waitFor(new Supplier() { + public Boolean get() { + DelegationTokenToRenew toRenew = + rm.getRMContext().getDelegationTokenRenewer().getAllTokens() + .get(token1); + // check app conf size equals to original size and it does contain + // the specific config we added. + return toRenew != null && toRenew.conf != null + && toRenew.conf.size() == confSize && toRenew.conf + .get("dfs.namenode.rpc-address.mycluster2.nn1").equals("123.0.0.1"); + } + }, 200, 10000); + } + + // Test if app's token conf exceeds RM_DELEGATION_TOKEN_MAX_CONF_SIZE, + // app should fail + @Test + public void testTokensConfExceedLimit() throws Exception { + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + UserGroupInformation.setConfiguration(conf); + // limit 100 bytes + conf.setInt(YarnConfiguration.RM_DELEGATION_TOKEN_MAX_CONF_SIZE, 100); + MockRM rm = new TestSecurityMockRM(conf, null); + rm.start(); + final MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); + nm1.registerNode(); + + // create a token + Text userText1 = new Text("user1"); + DelegationTokenIdentifier dtId1 = + new DelegationTokenIdentifier(userText1, new Text("renewer1"), + userText1); + final Token token1 = + new Token(dtId1.getBytes(), + "password1".getBytes(), dtId1.getKind(), new Text("service1")); + Credentials credentials = new Credentials(); + credentials.addToken(userText1, token1); + + // create token conf for renewal, total size (512 bytes) > limit (100 bytes) + // By experiment, it's roughly 128 bytes per key-value pair. + Configuration appConf = new Configuration(false); + appConf.clear(); + appConf.set("dfs.nameservices", "mycluster1,mycluster2"); // 128 bytes + appConf.set("dfs.namenode.rpc-address.mycluster2.nn1", "123.0.0.1"); //128 bytes + appConf.set("dfs.namenode.rpc-address.mycluster3.nn2", "123.0.0.2"); // 128 bytes + + DataOutputBuffer dob = new DataOutputBuffer(); + appConf.write(dob); + ByteBuffer tokenConf = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + + try { + rm.submitApp(credentials, tokenConf); + Assert.fail(); + } catch (Exception e) { + e.printStackTrace(); + Assert.assertTrue(e.getCause().getMessage() + .contains(YarnConfiguration.RM_DELEGATION_TOKEN_MAX_CONF_SIZE)); + } + } }