diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index e3d068e02a1..905158f2082 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -531,6 +531,9 @@ Release 0.23.6 - UNRELEASED MAPREDUCE-4848. TaskAttemptContext cast error during AM recovery (Jerry Chen via jlowe) + MAPREDUCE-4921. JobClient should acquire HS token with RM principal + (daryn via bobby) + Release 0.23.5 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java index 803b584b7d5..83346746a03 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java @@ -144,13 +144,9 @@ public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL } * we have to add this hack. */ private boolean getDelegationTokenCalled = false; - /* notes the renewer that will renew the delegation token */ - private String dtRenewer = null; /* do we need a HS delegation token for this client */ static final String HS_DELEGATION_TOKEN_REQUIRED = "mapreduce.history.server.delegationtoken.required"; - static final String HS_DELEGATION_TOKEN_RENEWER - = "mapreduce.history.server.delegationtoken.renewer"; static{ ConfigUtil.loadResources(); @@ -576,8 +572,6 @@ public RunningJob submitJob(final JobConf conf) throws FileNotFoundException, if (getDelegationTokenCalled) { conf.setBoolean(HS_DELEGATION_TOKEN_REQUIRED, getDelegationTokenCalled); getDelegationTokenCalled = false; - conf.set(HS_DELEGATION_TOKEN_RENEWER, dtRenewer); - dtRenewer = null; } Job job = clientUgi.doAs(new PrivilegedExceptionAction () { @Override @@ -1178,7 +1172,6 @@ public org.apache.hadoop.mapreduce.QueueAclsInfo[] run() public Token getDelegationToken(final Text renewer) throws IOException, InterruptedException { getDelegationTokenCalled = true; - dtRenewer = renewer.toString(); return clientUgi.doAs(new PrivilegedExceptionAction>() { public Token run() throws IOException, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index 98c94fab804..427f5a03f88 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -85,6 +85,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ProtoUtils; +import com.google.common.annotations.VisibleForTesting; /** * This class enables the current JobClient (0.22 hadoop) to run on YARN. @@ -184,12 +185,12 @@ public ClusterMetrics getClusterMetrics() throws IOException, return resMgrDelegate.getClusterMetrics(); } - private Token getDelegationTokenFromHS( - MRClientProtocol hsProxy, Text renewer) throws IOException, - InterruptedException { + @VisibleForTesting + Token getDelegationTokenFromHS(MRClientProtocol hsProxy) + throws IOException, InterruptedException { GetDelegationTokenRequest request = recordFactory .newRecordInstance(GetDelegationTokenRequest.class); - request.setRenewer(renewer.toString()); + request.setRenewer(Master.getMasterPrincipal(conf)); DelegationToken mrDelegationToken = hsProxy.getDelegationToken(request) .getDelegationToken(); return ProtoUtils.convertFromProtoFormat(mrDelegationToken, @@ -269,8 +270,7 @@ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) // the delegation tokens for the HistoryServer also. if (conf.getBoolean(JobClient.HS_DELEGATION_TOKEN_REQUIRED, DEFAULT_HS_DELEGATION_TOKEN_REQUIRED)) { - Token hsDT = getDelegationTokenFromHS(hsProxy, new Text( - conf.get(JobClient.HS_DELEGATION_TOKEN_RENEWER))); + Token hsDT = getDelegationTokenFromHS(hsProxy); ts.addToken(hsDT.getService(), hsDT); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java similarity index 85% rename from hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java index aa844c025f1..6a67bbd3a6d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.mapreduce.v2; +package org.apache.hadoop.mapred; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; @@ -29,6 +29,8 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.security.PrivilegedExceptionAction; import java.util.List; import junit.framework.TestCase; @@ -41,6 +43,7 @@ import org.apache.hadoop.mapred.ClientCache; import org.apache.hadoop.mapred.ClientServiceDelegate; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Master; import org.apache.hadoop.mapred.ResourceMgrDelegate; import org.apache.hadoop.mapred.YARNRunner; import org.apache.hadoop.mapreduce.JobID; @@ -48,7 +51,11 @@ import org.apache.hadoop.mapreduce.JobStatus.State; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ClientRMProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; @@ -69,6 +76,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.DelegationToken; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -245,6 +253,46 @@ public synchronized void start() { verify(clientRMProtocol).getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class)); } + @Test + public void testHistoryServerToken() throws Exception { + final String masterPrincipal = Master.getMasterPrincipal(conf); + + final MRClientProtocol hsProxy = mock(MRClientProtocol.class); + when(hsProxy.getDelegationToken(any(GetDelegationTokenRequest.class))).thenAnswer( + new Answer() { + public GetDelegationTokenResponse answer(InvocationOnMock invocation) { + GetDelegationTokenRequest request = + (GetDelegationTokenRequest)invocation.getArguments()[0]; + // check that the renewer matches the cluster's RM principal + assertEquals(request.getRenewer(), masterPrincipal); + + DelegationToken token = + recordFactory.newRecordInstance(DelegationToken.class); + // none of these fields matter for the sake of the test + token.setKind(""); + token.setService(""); + token.setIdentifier(ByteBuffer.allocate(0)); + token.setPassword(ByteBuffer.allocate(0)); + GetDelegationTokenResponse tokenResponse = + recordFactory.newRecordInstance(GetDelegationTokenResponse.class); + tokenResponse.setDelegationToken(token); + return tokenResponse; + } + }); + + UserGroupInformation.createRemoteUser("someone").doAs( + new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + yarnRunner = new YARNRunner(conf, null, null); + yarnRunner.getDelegationTokenFromHS(hsProxy); + verify(hsProxy). + getDelegationToken(any(GetDelegationTokenRequest.class)); + return null; + } + }); + } + @Test public void testAMAdminCommandOpts() throws Exception { JobConf jobConf = new JobConf();