MAPREDUCE-5088. MR Client gets a renewer token exception while Oozie is submitting a job (daryn)

merge -c1463804 from branch-2.0.4-alpha



git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1464153 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Konstantin Boudnik 2013-04-03 18:45:15 +00:00
parent dd2e87a79f
commit d18cc69d4e
5 changed files with 137 additions and 52 deletions

View File

@ -214,6 +214,9 @@ Release 2.0.4-beta - UNRELEASED
fix failures in renewal of HistoryServer's delegation tokens. (Siddharth
Seth via vinodkv)
MAPREDUCE-5088. MR Client gets a renewer token exception while Oozie is
submitting a job (Daryn Sharp via cos)
Release 2.0.3-alpha - 2013-02-06
INCOMPATIBLE CHANGES

View File

@ -138,15 +138,6 @@ import org.apache.hadoop.util.ToolRunner;
public class JobClient extends CLI { public class JobClient extends CLI {
public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL } public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
/* notes that get delegation token was called. Again this is hack for oozie
* to make sure we add history server delegation tokens to the credentials
* for the job. Since the api only allows one delegation token to be returned,
* we have to add this hack.
*/
private boolean getDelegationTokenCalled = false;
/* do we need a HS delegation token for this client */
static final String HS_DELEGATION_TOKEN_REQUIRED
= "mapreduce.history.server.delegationtoken.required";
static{ static{
ConfigUtil.loadResources(); ConfigUtil.loadResources();
@ -569,10 +560,6 @@ public class JobClient extends CLI {
try { try {
conf.setBooleanIfUnset("mapred.mapper.new-api", false); conf.setBooleanIfUnset("mapred.mapper.new-api", false);
conf.setBooleanIfUnset("mapred.reducer.new-api", false); conf.setBooleanIfUnset("mapred.reducer.new-api", false);
if (getDelegationTokenCalled) {
conf.setBoolean(HS_DELEGATION_TOKEN_REQUIRED, getDelegationTokenCalled);
getDelegationTokenCalled = false;
}
Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job> () { Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job> () {
@Override @Override
public Job run() throws IOException, ClassNotFoundException, public Job run() throws IOException, ClassNotFoundException,
@ -1173,7 +1160,6 @@ public class JobClient extends CLI {
*/ */
public Token<DelegationTokenIdentifier> public Token<DelegationTokenIdentifier>
getDelegationToken(final Text renewer) throws IOException, InterruptedException { getDelegationToken(final Text renewer) throws IOException, InterruptedException {
getDelegationTokenCalled = true;
return clientUgi.doAs(new return clientUgi.doAs(new
PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() { PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
public Token<DelegationTokenIdentifier> run() throws IOException, public Token<DelegationTokenIdentifier> run() throws IOException,

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.mapred; package org.apache.hadoop.mapred;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -87,6 +88,10 @@ public class ResourceMgrDelegate extends YarnClientImpl {
return oldMetrics; return oldMetrics;
} }
InetSocketAddress getConnectAddress() {
return rmAddress;
}
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
public Token getDelegationToken(Text renewer) throws IOException, public Token getDelegationToken(Text renewer) throws IOException,
InterruptedException { InterruptedException {

View File

@ -60,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequ
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
@ -81,6 +82,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.client.RMTokenSelector;
import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.ProtoUtils; import org.apache.hadoop.yarn.util.ProtoUtils;
@ -90,7 +92,7 @@ import com.google.common.annotations.VisibleForTesting;
/** /**
* This class enables the current JobClient (0.22 hadoop) to run on YARN. * This class enables the current JobClient (0.22 hadoop) to run on YARN.
*/ */
@SuppressWarnings({ "rawtypes", "unchecked" }) @SuppressWarnings("unchecked")
public class YARNRunner implements ClientProtocol { public class YARNRunner implements ClientProtocol {
private static final Log LOG = LogFactory.getLog(YARNRunner.class); private static final Log LOG = LogFactory.getLog(YARNRunner.class);
@ -101,14 +103,6 @@ public class YARNRunner implements ClientProtocol {
private Configuration conf; private Configuration conf;
private final FileContext defaultFileContext; private final FileContext defaultFileContext;
/* usually is false unless the jobclient get delegation token is
* called. This is a hack wherein we do return a token from RM
* on getDelegationtoken but due to the restricted api on jobclient
* we just add a job history DT token when submitting a job.
*/
private static final boolean DEFAULT_HS_DELEGATION_TOKEN_REQUIRED =
false;
/** /**
* Yarn runner incapsulates the client interface of * Yarn runner incapsulates the client interface of
* yarn * yarn
@ -185,6 +179,28 @@ public class YARNRunner implements ClientProtocol {
return resMgrDelegate.getClusterMetrics(); return resMgrDelegate.getClusterMetrics();
} }
@VisibleForTesting
void addHistoyToken(Credentials ts) throws IOException, InterruptedException {
/* check if we have a hsproxy, if not, no need */
MRClientProtocol hsProxy = clientCache.getInitializedHSProxy();
if (UserGroupInformation.isSecurityEnabled() && (hsProxy != null)) {
/*
* note that get delegation token was called. Again this is hack for oozie
* to make sure we add history server delegation tokens to the credentials
*/
RMTokenSelector tokenSelector = new RMTokenSelector();
Text service = SecurityUtil.buildTokenService(resMgrDelegate
.getConnectAddress());
if (tokenSelector.selectToken(service, ts.getAllTokens()) != null) {
Text hsService = SecurityUtil.buildTokenService(hsProxy
.getConnectAddress());
if (ts.getToken(hsService) == null) {
ts.addToken(hsService, getDelegationTokenFromHS(hsProxy));
}
}
}
}
@VisibleForTesting @VisibleForTesting
Token<?> getDelegationTokenFromHS(MRClientProtocol hsProxy) Token<?> getDelegationTokenFromHS(MRClientProtocol hsProxy)
throws IOException, InterruptedException { throws IOException, InterruptedException {
@ -263,17 +279,7 @@ public class YARNRunner implements ClientProtocol {
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
throws IOException, InterruptedException { throws IOException, InterruptedException {
/* check if we have a hsproxy, if not, no need */ addHistoyToken(ts);
MRClientProtocol hsProxy = clientCache.getInitializedHSProxy();
if (hsProxy != null) {
// JobClient will set this flag if getDelegationToken is called, if so, get
// the delegation tokens for the HistoryServer also.
if (conf.getBoolean(JobClient.HS_DELEGATION_TOKEN_REQUIRED,
DEFAULT_HS_DELEGATION_TOKEN_REQUIRED)) {
Token hsDT = getDelegationTokenFromHS(hsProxy);
ts.addToken(hsDT.getService(), hsDT);
}
}
// Upload only in security mode: TODO // Upload only in security mode: TODO
Path applicationTokensFile = Path applicationTokensFile =

View File

@ -20,8 +20,10 @@ package org.apache.hadoop.mapred;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -30,6 +32,7 @@ import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.List; import java.util.List;
@ -39,28 +42,24 @@ import junit.framework.TestCase;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.ClientCache; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ClientServiceDelegate;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Master;
import org.apache.hadoop.mapred.ResourceMgrDelegate;
import org.apache.hadoop.mapred.YARNRunner;
import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobPriority; import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobStatus.State; import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ClientRMProtocol; import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@ -69,21 +68,27 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.DelegationToken; import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.Appender; import org.apache.log4j.Appender;
import org.apache.log4j.Layout; import org.apache.log4j.Layout;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
@ -146,7 +151,7 @@ public class TestYARNRunner extends TestCase {
} }
@Test @Test(timeout=20000)
public void testJobKill() throws Exception { public void testJobKill() throws Exception {
clientDelegate = mock(ClientServiceDelegate.class); clientDelegate = mock(ClientServiceDelegate.class);
when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new
@ -171,7 +176,7 @@ public class TestYARNRunner extends TestCase {
verify(clientDelegate).killJob(jobId); verify(clientDelegate).killJob(jobId);
} }
@Test @Test(timeout=20000)
public void testJobSubmissionFailure() throws Exception { public void testJobSubmissionFailure() throws Exception {
when(resourceMgrDelegate.submitApplication(any(ApplicationSubmissionContext.class))). when(resourceMgrDelegate.submitApplication(any(ApplicationSubmissionContext.class))).
thenReturn(appId); thenReturn(appId);
@ -193,7 +198,7 @@ public class TestYARNRunner extends TestCase {
} }
} }
@Test @Test(timeout=20000)
public void testResourceMgrDelegate() throws Exception { public void testResourceMgrDelegate() throws Exception {
/* we not want a mock of resource mgr delegate */ /* we not want a mock of resource mgr delegate */
final ClientRMProtocol clientRMProtocol = mock(ClientRMProtocol.class); final ClientRMProtocol clientRMProtocol = mock(ClientRMProtocol.class);
@ -260,7 +265,87 @@ public class TestYARNRunner extends TestCase {
verify(clientRMProtocol).getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class)); verify(clientRMProtocol).getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class));
} }
@Test @Test(timeout=20000)
public void testGetHSDelegationToken() throws Exception {
try {
Configuration conf = new Configuration();
// Setup mock service
InetSocketAddress mockRmAddress = new InetSocketAddress("localhost", 4444);
Text rmTokenSevice = SecurityUtil.buildTokenService(mockRmAddress);
InetSocketAddress mockHsAddress = new InetSocketAddress("localhost", 9200);
Text hsTokenSevice = SecurityUtil.buildTokenService(mockHsAddress);
// Setup mock rm token
RMDelegationTokenIdentifier tokenIdentifier = new RMDelegationTokenIdentifier(
new Text("owner"), new Text("renewer"), new Text("real"));
Token<RMDelegationTokenIdentifier> token = new Token<RMDelegationTokenIdentifier>(
new byte[0], new byte[0], tokenIdentifier.getKind(), rmTokenSevice);
token.setKind(RMDelegationTokenIdentifier.KIND_NAME);
// Setup mock history token
DelegationToken historyToken = BuilderUtils.newDelegationToken(
new byte[0], MRDelegationTokenIdentifier.KIND_NAME.toString(),
new byte[0], hsTokenSevice.toString());
GetDelegationTokenResponse getDtResponse = Records
.newRecord(GetDelegationTokenResponse.class);
getDtResponse.setDelegationToken(historyToken);
// mock services
MRClientProtocol mockHsProxy = mock(MRClientProtocol.class);
doReturn(mockHsAddress).when(mockHsProxy).getConnectAddress();
doReturn(getDtResponse).when(mockHsProxy).getDelegationToken(
any(GetDelegationTokenRequest.class));
ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
doReturn(mockRmAddress).when(rmDelegate).getConnectAddress();
ClientCache clientCache = mock(ClientCache.class);
doReturn(mockHsProxy).when(clientCache).getInitializedHSProxy();
Credentials creds = new Credentials();
YARNRunner yarnRunner = new YARNRunner(conf, rmDelegate, clientCache);
// No HS token if no RM token
yarnRunner.addHistoyToken(creds);
verify(mockHsProxy, times(0)).getDelegationToken(
any(GetDelegationTokenRequest.class));
// No HS token if RM token, but security disabled.
creds.addToken(new Text("rmdt"), token);
yarnRunner.addHistoyToken(creds);
verify(mockHsProxy, times(0)).getDelegationToken(
any(GetDelegationTokenRequest.class));
conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
UserGroupInformation.setConfiguration(conf);
creds = new Credentials();
// No HS token if no RM token, security enabled
yarnRunner.addHistoyToken(creds);
verify(mockHsProxy, times(0)).getDelegationToken(
any(GetDelegationTokenRequest.class));
// HS token if RM token present, security enabled
creds.addToken(new Text("rmdt"), token);
yarnRunner.addHistoyToken(creds);
verify(mockHsProxy, times(1)).getDelegationToken(
any(GetDelegationTokenRequest.class));
// No additional call to get HS token if RM and HS token present
yarnRunner.addHistoyToken(creds);
verify(mockHsProxy, times(1)).getDelegationToken(
any(GetDelegationTokenRequest.class));
} finally {
// Back to defaults.
UserGroupInformation.setConfiguration(new Configuration());
}
}
@Test(timeout=20000)
public void testHistoryServerToken() throws Exception { public void testHistoryServerToken() throws Exception {
//Set the master principal in the config //Set the master principal in the config
conf.set(YarnConfiguration.RM_PRINCIPAL,"foo@LOCAL"); conf.set(YarnConfiguration.RM_PRINCIPAL,"foo@LOCAL");
@ -303,7 +388,7 @@ public class TestYARNRunner extends TestCase {
}); });
} }
@Test @Test(timeout=20000)
public void testAMAdminCommandOpts() throws Exception { public void testAMAdminCommandOpts() throws Exception {
JobConf jobConf = new JobConf(); JobConf jobConf = new JobConf();
@ -366,7 +451,7 @@ public class TestYARNRunner extends TestCase {
assertTrue("AM admin command opts is after user command opts.", adminIndex < userIndex); assertTrue("AM admin command opts is after user command opts.", adminIndex < userIndex);
} }
} }
@Test @Test(timeout=20000)
public void testWarnCommandOpts() throws Exception { public void testWarnCommandOpts() throws Exception {
Logger logger = Logger.getLogger(YARNRunner.class); Logger logger = Logger.getLogger(YARNRunner.class);