svn merge -c 1432230 FIXES: MAPREDUCE-4921. JobClient should acquire HS token with RM principal (daryn via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1432232 13f79535-47bb-0310-9956-ffa450edef68
Robert Joseph Evans 2013-01-11 19:05:02 +00:00
parent 0dadaa47c3
commit 63106e20e0
4 changed files with 58 additions and 14 deletions

CHANGES.txt (View File)

@@ -531,6 +531,9 @@ Release 0.23.6 - UNRELEASED
     MAPREDUCE-4848. TaskAttemptContext cast error during AM recovery (Jerry
     Chen via jlowe)
+
+    MAPREDUCE-4921. JobClient should acquire HS token with RM principal
+    (daryn via bobby)
 
 Release 0.23.5 - UNRELEASED
 
   INCOMPATIBLE CHANGES

JobClient.java (View File)

@@ -144,13 +144,9 @@ public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
    * we have to add this hack.
    */
   private boolean getDelegationTokenCalled = false;
-  /* notes the renewer that will renew the delegation token */
-  private String dtRenewer = null;
   /* do we need a HS delegation token for this client */
   static final String HS_DELEGATION_TOKEN_REQUIRED
       = "mapreduce.history.server.delegationtoken.required";
-  static final String HS_DELEGATION_TOKEN_RENEWER
-      = "mapreduce.history.server.delegationtoken.renewer";
 
   static{
     ConfigUtil.loadResources();
@@ -576,8 +572,6 @@ public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
     if (getDelegationTokenCalled) {
       conf.setBoolean(HS_DELEGATION_TOKEN_REQUIRED, getDelegationTokenCalled);
       getDelegationTokenCalled = false;
-      conf.set(HS_DELEGATION_TOKEN_RENEWER, dtRenewer);
-      dtRenewer = null;
     }
     Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job> () {
       @Override
@@ -1178,7 +1172,6 @@ public org.apache.hadoop.mapreduce.QueueAclsInfo[] run()
   public Token<DelegationTokenIdentifier>
       getDelegationToken(final Text renewer) throws IOException, InterruptedException {
     getDelegationTokenCalled = true;
-    dtRenewer = renewer.toString();
     return clientUgi.doAs(new
         PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
       public Token<DelegationTokenIdentifier> run() throws IOException,
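With the JobClient changes above, getDelegationToken() no longer records a renewer for the HistoryServer; it only sets getDelegationTokenCalled, which submitJob() translates into the HS_DELEGATION_TOKEN_REQUIRED flag. A minimal sketch of the resulting client-side flow (illustrative only, not part of this commit; the class name and renewer string below are made up, and actually running it requires a configured cluster):

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.token.Token;

public class HsTokenFlowSketch {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();
    JobClient client = new JobClient(conf);

    // The renewer argument still applies to the token returned here; after
    // this patch it is no longer copied into the job conf for the
    // HistoryServer request.
    Token<?> token = client.getDelegationToken(new Text("some-renewer"));

    // At submitJob() time, YARNRunner requests the HistoryServer token
    // itself, naming the cluster's RM principal as the renewer.
    // client.submitJob(conf);
  }
}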

YARNRunner.java (View File)

@@ -85,6 +85,7 @@
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.ProtoUtils;
 
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * This class enables the current JobClient (0.22 hadoop) to run on YARN.
@@ -184,12 +185,12 @@ public ClusterMetrics getClusterMetrics() throws IOException,
     return resMgrDelegate.getClusterMetrics();
   }
 
-  private Token<?> getDelegationTokenFromHS(
-      MRClientProtocol hsProxy, Text renewer) throws IOException,
-      InterruptedException {
+  @VisibleForTesting
+  Token<?> getDelegationTokenFromHS(MRClientProtocol hsProxy)
+      throws IOException, InterruptedException {
     GetDelegationTokenRequest request = recordFactory
       .newRecordInstance(GetDelegationTokenRequest.class);
-    request.setRenewer(renewer.toString());
+    request.setRenewer(Master.getMasterPrincipal(conf));
     DelegationToken mrDelegationToken = hsProxy.getDelegationToken(request)
       .getDelegationToken();
     return ProtoUtils.convertFromProtoFormat(mrDelegationToken,
@@ -269,8 +270,7 @@ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
       // the delegation tokens for the HistoryServer also.
       if (conf.getBoolean(JobClient.HS_DELEGATION_TOKEN_REQUIRED,
           DEFAULT_HS_DELEGATION_TOKEN_REQUIRED)) {
-        Token hsDT = getDelegationTokenFromHS(hsProxy, new Text(
-                conf.get(JobClient.HS_DELEGATION_TOKEN_RENEWER)));
+        Token hsDT = getDelegationTokenFromHS(hsProxy);
         ts.addToken(hsDT.getService(), hsDT);
       }
     }
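The core of the fix is in getDelegationTokenFromHS(): the renewer sent to the JobHistoryServer now comes from Master.getMasterPrincipal(conf) rather than from a caller-supplied string, so the ResourceManager can actually renew the token. A small sketch of how that principal is resolved; the configuration key named in the comment is the usual YARN setting and is an assumption on my part, not something this diff shows:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.Master;

public class RenewerPrincipalSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Master.getMasterPrincipal() looks up the master's Kerberos principal
    // from the configuration (typically yarn.resourcemanager.principal on a
    // YARN cluster) and resolves any _HOST placeholder against the
    // configured master address.
    String renewer = Master.getMasterPrincipal(conf);
    // This is the value YARNRunner now passes to request.setRenewer(), so
    // the RM is able to renew the HistoryServer token for the job.
    System.out.println("HS token renewer: " + renewer);
  }
}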

TestYARNRunner.java (View File)

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapreduce.v2;
+package org.apache.hadoop.mapred;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
@@ -29,6 +29,8 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
 import java.util.List;
 
 import junit.framework.TestCase;
@@ -41,6 +43,7 @@
 import org.apache.hadoop.mapred.ClientCache;
 import org.apache.hadoop.mapred.ClientServiceDelegate;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Master;
 import org.apache.hadoop.mapred.ResourceMgrDelegate;
 import org.apache.hadoop.mapred.YARNRunner;
 import org.apache.hadoop.mapreduce.JobID;
@@ -48,7 +51,11 @@
 import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
@@ -69,6 +76,7 @@
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.DelegationToken;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -245,6 +253,46 @@ public synchronized void start() {
     verify(clientRMProtocol).getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class));
   }
 
+  @Test
+  public void testHistoryServerToken() throws Exception {
+    final String masterPrincipal = Master.getMasterPrincipal(conf);
+
+    final MRClientProtocol hsProxy = mock(MRClientProtocol.class);
+    when(hsProxy.getDelegationToken(any(GetDelegationTokenRequest.class))).thenAnswer(
+        new Answer<GetDelegationTokenResponse>() {
+          public GetDelegationTokenResponse answer(InvocationOnMock invocation) {
+            GetDelegationTokenRequest request =
+                (GetDelegationTokenRequest)invocation.getArguments()[0];
+            // check that the renewer matches the cluster's RM principal
+            assertEquals(request.getRenewer(), masterPrincipal);
+            DelegationToken token =
+                recordFactory.newRecordInstance(DelegationToken.class);
+            // none of these fields matter for the sake of the test
+            token.setKind("");
+            token.setService("");
+            token.setIdentifier(ByteBuffer.allocate(0));
+            token.setPassword(ByteBuffer.allocate(0));
+            GetDelegationTokenResponse tokenResponse =
+                recordFactory.newRecordInstance(GetDelegationTokenResponse.class);
+            tokenResponse.setDelegationToken(token);
+            return tokenResponse;
+          }
+        });
+
+    UserGroupInformation.createRemoteUser("someone").doAs(
+        new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            yarnRunner = new YARNRunner(conf, null, null);
+            yarnRunner.getDelegationTokenFromHS(hsProxy);
+            verify(hsProxy).
+              getDelegationToken(any(GetDelegationTokenRequest.class));
+            return null;
+          }
+        });
+  }
+
   @Test
   public void testAMAdminCommandOpts() throws Exception {
     JobConf jobConf = new JobConf();