svn merge -c 1335567 FIXES: MAPREDUCE-4162. Correctly set token service (Daryn Sharp via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1335569 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-05-08 15:10:03 +00:00
parent 09c2172681
commit 4bc2774d79
24 changed files with 147 additions and 108 deletions

View File

@ -212,6 +212,8 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4210. Expose listener address for WebApp (Daryn Sharp via bobby) MAPREDUCE-4210. Expose listener address for WebApp (Daryn Sharp via bobby)
MAPREDUCE-4162. Correctly set token service (Daryn Sharp via bobby)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -50,7 +50,9 @@ import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics; import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
@ -77,7 +79,8 @@ class YarnChild {
String host = args[0]; String host = args[0];
int port = Integer.parseInt(args[1]); int port = Integer.parseInt(args[1]);
final InetSocketAddress address = new InetSocketAddress(host, port); final InetSocketAddress address =
NetUtils.createSocketAddrForHost(host, port);
final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]); final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
int jvmIdInt = Integer.parseInt(args[3]); int jvmIdInt = Integer.parseInt(args[3]);
JVMId jvmId = new JVMId(firstTaskid.getJobID(), JVMId jvmId = new JVMId(firstTaskid.getJobID(),
@ -214,8 +217,7 @@ class YarnChild {
LOG.debug("loading token. # keys =" +credentials.numberOfSecretKeys() + LOG.debug("loading token. # keys =" +credentials.numberOfSecretKeys() +
"; from file=" + jobTokenFile); "; from file=" + jobTokenFile);
Token<JobTokenIdentifier> jt = TokenCache.getJobToken(credentials); Token<JobTokenIdentifier> jt = TokenCache.getJobToken(credentials);
jt.setService(new Text(address.getAddress().getHostAddress() + ":" SecurityUtil.setTokenService(jt, address);
+ address.getPort()));
UserGroupInformation current = UserGroupInformation.getCurrentUser(); UserGroupInformation current = UserGroupInformation.getCurrentUser();
current.addToken(jt); current.addToken(jt);
for (Token<? extends TokenIdentifier> tok : credentials.getAllTokens()) { for (Token<? extends TokenIdentifier> tok : credentials.getAllTokens()) {

View File

@ -180,6 +180,11 @@ public class MRClientService extends AbstractService
private RecordFactory recordFactory = private RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null); RecordFactoryProvider.getRecordFactory(null);
@Override
public InetSocketAddress getConnectAddress() {
return getBindAddress();
}
private Job verifyAndGetJob(JobId jobID, private Job verifyAndGetJob(JobId jobID,
boolean modifyAccess) throws YarnRemoteException { boolean modifyAccess) throws YarnRemoteException {
Job job = appContext.getJob(jobID); Job job = appContext.getJob(jobID);

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.mapreduce.v2.app.launcher; package org.apache.hadoop.mapreduce.v2.app.launcher;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.util.HashSet; import java.util.HashSet;
@ -34,7 +35,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ShuffleHandler; import org.apache.hadoop.mapred.ShuffleHandler;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.ProtoUtils;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -321,13 +322,13 @@ public class ContainerLauncherImpl extends AbstractService implements
final String containerManagerBindAddr, ContainerToken containerToken) final String containerManagerBindAddr, ContainerToken containerToken)
throws IOException { throws IOException {
final InetSocketAddress cmAddr =
NetUtils.createSocketAddr(containerManagerBindAddr);
UserGroupInformation user = UserGroupInformation.getCurrentUser(); UserGroupInformation user = UserGroupInformation.getCurrentUser();
if (UserGroupInformation.isSecurityEnabled()) { if (UserGroupInformation.isSecurityEnabled()) {
Token<ContainerTokenIdentifier> token = new Token<ContainerTokenIdentifier>( Token<ContainerTokenIdentifier> token =
containerToken.getIdentifier().array(), containerToken ProtoUtils.convertFromProtoFormat(containerToken, cmAddr);
.getPassword().array(), new Text(containerToken.getKind()),
new Text(containerToken.getService()));
// the user in createRemoteUser in this context has to be ContainerID // the user in createRemoteUser in this context has to be ContainerID
user = UserGroupInformation.createRemoteUser(containerID.toString()); user = UserGroupInformation.createRemoteUser(containerID.toString());
user.addToken(token); user.addToken(token);
@ -338,8 +339,7 @@ public class ContainerLauncherImpl extends AbstractService implements
@Override @Override
public ContainerManager run() { public ContainerManager run() {
return (ContainerManager) rpc.getProxy(ContainerManager.class, return (ContainerManager) rpc.getProxy(ContainerManager.class,
NetUtils.createSocketAddr(containerManagerBindAddr), cmAddr, getConfig());
getConfig());
} }
}); });
return proxy; return proxy;

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
@ -133,15 +134,14 @@ public abstract class RMCommunicator extends AbstractService {
protected void register() { protected void register() {
//Register //Register
String host = clientService.getBindAddress().getAddress() InetSocketAddress serviceAddr = clientService.getBindAddress();
.getCanonicalHostName();
try { try {
RegisterApplicationMasterRequest request = RegisterApplicationMasterRequest request =
recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class); recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
request.setApplicationAttemptId(applicationAttemptId); request.setApplicationAttemptId(applicationAttemptId);
request.setHost(host); request.setHost(serviceAddr.getHostName());
request.setRpcPort(clientService.getBindAddress().getPort()); request.setRpcPort(serviceAddr.getPort());
request.setTrackingUrl(host + ":" + clientService.getHttpPort()); request.setTrackingUrl(serviceAddr.getHostName() + ":" + clientService.getHttpPort());
RegisterApplicationMasterResponse response = RegisterApplicationMasterResponse response =
scheduler.registerApplicationMaster(request); scheduler.registerApplicationMaster(request);
minContainerCapability = response.getMinimumResourceCapability(); minContainerCapability = response.getMinimumResourceCapability();
@ -262,9 +262,6 @@ public abstract class RMCommunicator extends AbstractService {
if (UserGroupInformation.isSecurityEnabled()) { if (UserGroupInformation.isSecurityEnabled()) {
String tokenURLEncodedStr = System.getenv().get( String tokenURLEncodedStr = System.getenv().get(
ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME); ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
if (LOG.isDebugEnabled()) {
LOG.debug("AppMasterToken is " + tokenURLEncodedStr);
}
Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>(); Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
try { try {
@ -273,6 +270,10 @@ public abstract class RMCommunicator extends AbstractService {
throw new YarnException(e); throw new YarnException(e);
} }
SecurityUtil.setTokenService(token, serviceAddr);
if (LOG.isDebugEnabled()) {
LOG.debug("AppMasterToken is " + token);
}
currentUser.addToken(token); currentUser.addToken(token);
} }

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.v2.api; package org.apache.hadoop.mapreduce.v2.api;
import java.net.InetSocketAddress;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
@ -45,6 +47,11 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
public interface MRClientProtocol { public interface MRClientProtocol {
/**
* Address to which the client is connected
* @return InetSocketAddress
*/
public InetSocketAddress getConnectAddress();
public GetJobReportResponse getJobReport(GetJobReportRequest request) throws YarnRemoteException; public GetJobReportResponse getJobReport(GetJobReportRequest request) throws YarnRemoteException;
public GetTaskReportResponse getTaskReport(GetTaskReportRequest request) throws YarnRemoteException; public GetTaskReportResponse getTaskReport(GetTaskReportRequest request) throws YarnRemoteException;
public GetTaskAttemptReportResponse getTaskAttemptReport(GetTaskAttemptReportRequest request) throws YarnRemoteException; public GetTaskAttemptReportResponse getTaskAttemptReport(GetTaskAttemptReportRequest request) throws YarnRemoteException;

View File

@ -104,6 +104,11 @@ public class MRClientProtocolPBClientImpl implements MRClientProtocol {
MRClientProtocolPB.class, clientVersion, addr, conf); MRClientProtocolPB.class, clientVersion, addr, conf);
} }
@Override
public InetSocketAddress getConnectAddress() {
return RPC.getServerAddress(proxy);
}
@Override @Override
public GetJobReportResponse getJobReport(GetJobReportRequest request) public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws YarnRemoteException { throws YarnRemoteException {

View File

@ -122,6 +122,11 @@ public class TestRPCFactories {
public class MRClientProtocolTestImpl implements MRClientProtocol { public class MRClientProtocolTestImpl implements MRClientProtocol {
@Override
public InetSocketAddress getConnectAddress() {
return null;
}
@Override @Override
public GetJobReportResponse getJobReport(GetJobReportRequest request) public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws YarnRemoteException { throws YarnRemoteException {

View File

@ -35,13 +35,11 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Master;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol; import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider; import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.util.ConfigUtil; import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.mapreduce.v2.LogParams; import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@ -388,21 +386,8 @@ public class Cluster {
*/ */
public Token<DelegationTokenIdentifier> public Token<DelegationTokenIdentifier>
getDelegationToken(Text renewer) throws IOException, InterruptedException{ getDelegationToken(Text renewer) throws IOException, InterruptedException{
Token<DelegationTokenIdentifier> result = // client has already set the service
client.getDelegationToken(renewer); return client.getDelegationToken(renewer);
if (result == null) {
return result;
}
InetSocketAddress addr = Master.getMasterAddress(conf);
StringBuilder service = new StringBuilder();
service.append(NetUtils.normalizeHostName(addr.getAddress().
getHostAddress()));
service.append(':');
service.append(addr.getPort());
result.setService(new Text(service.toString()));
return result;
} }
/** /**

View File

@ -178,6 +178,10 @@ public class HistoryClientService extends AbstractService {
private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
public InetSocketAddress getConnectAddress() {
return getBindAddress();
}
private Job verifyAndGetJob(final JobId jobID) throws YarnRemoteException { private Job verifyAndGetJob(final JobId jobID) throws YarnRemoteException {
UserGroupInformation loginUgi = null; UserGroupInformation loginUgi = null;
Job job = null; Job job = null;
@ -335,8 +339,7 @@ public class HistoryClientService extends AbstractService {
jhsDTSecretManager); jhsDTSecretManager);
DelegationToken mrDToken = BuilderUtils.newDelegationToken( DelegationToken mrDToken = BuilderUtils.newDelegationToken(
realJHSToken.getIdentifier(), realJHSToken.getKind().toString(), realJHSToken.getIdentifier(), realJHSToken.getKind().toString(),
realJHSToken.getPassword(), bindAddress.getAddress().getHostAddress() realJHSToken.getPassword(), realJHSToken.getService().toString());
+ ":" + bindAddress.getPort());
response.setDelegationToken(mrDToken); response.setDelegationToken(mrDToken);
return response; return response;
} catch (IOException i) { } catch (IOException i) {

View File

@ -32,7 +32,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
@ -63,6 +62,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
@ -144,7 +144,7 @@ public class ClientServiceDelegate {
if (application != null) { if (application != null) {
trackingUrl = application.getTrackingUrl(); trackingUrl = application.getTrackingUrl();
} }
String serviceAddr = null; InetSocketAddress serviceAddr = null;
while (application == null while (application == null
|| YarnApplicationState.RUNNING == application || YarnApplicationState.RUNNING == application
.getYarnApplicationState()) { .getYarnApplicationState()) {
@ -172,25 +172,23 @@ public class ClientServiceDelegate {
if(!conf.getBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED, false)) { if(!conf.getBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED, false)) {
UserGroupInformation newUgi = UserGroupInformation.createRemoteUser( UserGroupInformation newUgi = UserGroupInformation.createRemoteUser(
UserGroupInformation.getCurrentUser().getUserName()); UserGroupInformation.getCurrentUser().getUserName());
serviceAddr = application.getHost() + ":" + application.getRpcPort(); serviceAddr = NetUtils.createSocketAddrForHost(
application.getHost(), application.getRpcPort());
if (UserGroupInformation.isSecurityEnabled()) { if (UserGroupInformation.isSecurityEnabled()) {
String clientTokenEncoded = application.getClientToken(); String clientTokenEncoded = application.getClientToken();
Token<ApplicationTokenIdentifier> clientToken = Token<ApplicationTokenIdentifier> clientToken =
new Token<ApplicationTokenIdentifier>(); new Token<ApplicationTokenIdentifier>();
clientToken.decodeFromUrlString(clientTokenEncoded); clientToken.decodeFromUrlString(clientTokenEncoded);
// RPC layer client expects ip:port as service for tokens // RPC layer client expects ip:port as service for tokens
InetSocketAddress addr = NetUtils.createSocketAddr(application SecurityUtil.setTokenService(clientToken, serviceAddr);
.getHost(), application.getRpcPort());
clientToken.setService(new Text(addr.getAddress().getHostAddress()
+ ":" + addr.getPort()));
newUgi.addToken(clientToken); newUgi.addToken(clientToken);
} }
LOG.debug("Connecting to " + serviceAddr); LOG.debug("Connecting to " + serviceAddr);
final String tempStr = serviceAddr; final InetSocketAddress finalServiceAddr = serviceAddr;
realProxy = newUgi.doAs(new PrivilegedExceptionAction<MRClientProtocol>() { realProxy = newUgi.doAs(new PrivilegedExceptionAction<MRClientProtocol>() {
@Override @Override
public MRClientProtocol run() throws IOException { public MRClientProtocol run() throws IOException {
return instantiateAMProxy(tempStr); return instantiateAMProxy(finalServiceAddr);
} }
}); });
} else { } else {
@ -270,13 +268,13 @@ public class ClientServiceDelegate {
return historyServerProxy; return historyServerProxy;
} }
MRClientProtocol instantiateAMProxy(final String serviceAddr) MRClientProtocol instantiateAMProxy(final InetSocketAddress serviceAddr)
throws IOException { throws IOException {
LOG.trace("Connecting to ApplicationMaster at: " + serviceAddr); LOG.trace("Connecting to ApplicationMaster at: " + serviceAddr);
YarnRPC rpc = YarnRPC.create(conf); YarnRPC rpc = YarnRPC.create(conf);
MRClientProtocol proxy = MRClientProtocol proxy =
(MRClientProtocol) rpc.getProxy(MRClientProtocol.class, (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
NetUtils.createSocketAddr(serviceAddr), conf); serviceAddr, conf);
LOG.trace("Connected to ApplicationMaster at: " + serviceAddr); LOG.trace("Connected to ApplicationMaster at: " + serviceAddr);
return proxy; return proxy;
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.mapred; package org.apache.hadoop.mapred;
import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
@ -209,4 +210,10 @@ public class NotRunningJob implements MRClientProtocol {
/* Should not be invoked by anyone. */ /* Should not be invoked by anyone. */
throw new NotImplementedException(); throw new NotImplementedException();
} }
@Override
public InetSocketAddress getConnectAddress() {
/* Should not be invoked by anyone. Normally used to set token service */
throw new NotImplementedException();
}
} }

View File

@ -37,8 +37,6 @@ import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskTrackerInfo; import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
@ -67,14 +65,14 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.util.ProtoUtils;
// TODO: This should be part of something like yarn-client. // TODO: This should be part of something like yarn-client.
public class ResourceMgrDelegate { public class ResourceMgrDelegate {
private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class); private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class);
private final String rmAddress; private final InetSocketAddress rmAddress;
private YarnConfiguration conf; private YarnConfiguration conf;
ClientRMProtocol applicationsManager; ClientRMProtocol applicationsManager;
private ApplicationId applicationId; private ApplicationId applicationId;
@ -87,11 +85,7 @@ public class ResourceMgrDelegate {
public ResourceMgrDelegate(YarnConfiguration conf) { public ResourceMgrDelegate(YarnConfiguration conf) {
this.conf = conf; this.conf = conf;
YarnRPC rpc = YarnRPC.create(this.conf); YarnRPC rpc = YarnRPC.create(this.conf);
InetSocketAddress rmAddress = conf.getSocketAddr( this.rmAddress = getRmAddress(conf);
YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT);
this.rmAddress = rmAddress.toString();
LOG.debug("Connecting to ResourceManager at " + rmAddress); LOG.debug("Connecting to ResourceManager at " + rmAddress);
applicationsManager = applicationsManager =
(ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
@ -109,7 +103,13 @@ public class ResourceMgrDelegate {
ClientRMProtocol applicationsManager) { ClientRMProtocol applicationsManager) {
this.conf = conf; this.conf = conf;
this.applicationsManager = applicationsManager; this.applicationsManager = applicationsManager;
this.rmAddress = applicationsManager.toString(); this.rmAddress = getRmAddress(conf);
}
private static InetSocketAddress getRmAddress(YarnConfiguration conf) {
return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT);
} }
public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0) public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
@ -168,9 +168,7 @@ public class ResourceMgrDelegate {
org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse
response = applicationsManager.getDelegationToken(rmDTRequest); response = applicationsManager.getDelegationToken(rmDTRequest);
DelegationToken yarnToken = response.getRMDelegationToken(); DelegationToken yarnToken = response.getRMDelegationToken();
return new Token<RMDelegationTokenIdentifier>(yarnToken.getIdentifier().array(), return ProtoUtils.convertFromProtoFormat(yarnToken, rmAddress);
yarnToken.getPassword().array(),
new Text(yarnToken.getKind()), new Text(yarnToken.getService()));
} }

View File

@ -56,7 +56,6 @@ import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.LogParams; import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.mapreduce.v2.util.MRApps;
@ -84,6 +83,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.ProtoUtils;
/** /**
@ -184,7 +184,7 @@ public class YARNRunner implements ClientProtocol {
return resMgrDelegate.getClusterMetrics(); return resMgrDelegate.getClusterMetrics();
} }
private Token<MRDelegationTokenIdentifier> getDelegationTokenFromHS( private Token<?> getDelegationTokenFromHS(
MRClientProtocol hsProxy, Text renewer) throws IOException, MRClientProtocol hsProxy, Text renewer) throws IOException,
InterruptedException { InterruptedException {
GetDelegationTokenRequest request = recordFactory GetDelegationTokenRequest request = recordFactory
@ -192,10 +192,8 @@ public class YARNRunner implements ClientProtocol {
request.setRenewer(renewer.toString()); request.setRenewer(renewer.toString());
DelegationToken mrDelegationToken = hsProxy.getDelegationToken(request) DelegationToken mrDelegationToken = hsProxy.getDelegationToken(request)
.getDelegationToken(); .getDelegationToken();
return new Token<MRDelegationTokenIdentifier>(mrDelegationToken return ProtoUtils.convertFromProtoFormat(mrDelegationToken,
.getIdentifier().array(), mrDelegationToken.getPassword().array(), hsProxy.getConnectAddress());
new Text(mrDelegationToken.getKind()), new Text(
mrDelegationToken.getService()));
} }
@Override @Override

View File

@ -368,6 +368,11 @@ public class TestClientRedirect {
this(AMHOSTADDRESS); this(AMHOSTADDRESS);
} }
@Override
public InetSocketAddress getConnectAddress() {
return bindAddress;
}
public AMService(String hostAddress) { public AMService(String hostAddress) {
super("AMService"); super("AMService");
this.protocol = MRClientProtocol.class; this.protocol = MRClientProtocol.class;

View File

@ -27,6 +27,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -242,7 +243,7 @@ public class TestClientServiceDelegate {
// should use the same proxy to AM2 and so instantiateProxy shouldn't be // should use the same proxy to AM2 and so instantiateProxy shouldn't be
// called. // called.
doReturn(firstGenAMProxy).doReturn(secondGenAMProxy).when( doReturn(firstGenAMProxy).doReturn(secondGenAMProxy).when(
clientServiceDelegate).instantiateAMProxy(any(String.class)); clientServiceDelegate).instantiateAMProxy(any(InetSocketAddress.class));
JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId); JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
Assert.assertNotNull(jobStatus); Assert.assertNotNull(jobStatus);
@ -257,7 +258,7 @@ public class TestClientServiceDelegate {
Assert.assertEquals("jobName-secondGen", jobStatus.getJobName()); Assert.assertEquals("jobName-secondGen", jobStatus.getJobName());
verify(clientServiceDelegate, times(2)).instantiateAMProxy( verify(clientServiceDelegate, times(2)).instantiateAMProxy(
any(String.class)); any(InetSocketAddress.class));
} }
@Test @Test
@ -286,19 +287,19 @@ public class TestClientServiceDelegate {
Assert.assertEquals("N/A", jobStatus.getJobName()); Assert.assertEquals("N/A", jobStatus.getJobName());
verify(clientServiceDelegate, times(0)).instantiateAMProxy( verify(clientServiceDelegate, times(0)).instantiateAMProxy(
any(String.class)); any(InetSocketAddress.class));
// Should not reach AM even for second and third times too. // Should not reach AM even for second and third times too.
jobStatus = clientServiceDelegate.getJobStatus(oldJobId); jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
Assert.assertNotNull(jobStatus); Assert.assertNotNull(jobStatus);
Assert.assertEquals("N/A", jobStatus.getJobName()); Assert.assertEquals("N/A", jobStatus.getJobName());
verify(clientServiceDelegate, times(0)).instantiateAMProxy( verify(clientServiceDelegate, times(0)).instantiateAMProxy(
any(String.class)); any(InetSocketAddress.class));
jobStatus = clientServiceDelegate.getJobStatus(oldJobId); jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
Assert.assertNotNull(jobStatus); Assert.assertNotNull(jobStatus);
Assert.assertEquals("N/A", jobStatus.getJobName()); Assert.assertEquals("N/A", jobStatus.getJobName());
verify(clientServiceDelegate, times(0)).instantiateAMProxy( verify(clientServiceDelegate, times(0)).instantiateAMProxy(
any(String.class)); any(InetSocketAddress.class));
// The third time around, app is completed, so should go to JHS // The third time around, app is completed, so should go to JHS
JobStatus jobStatus1 = clientServiceDelegate.getJobStatus(oldJobId); JobStatus jobStatus1 = clientServiceDelegate.getJobStatus(oldJobId);
@ -309,7 +310,7 @@ public class TestClientServiceDelegate {
Assert.assertEquals(1.0f, jobStatus1.getReduceProgress()); Assert.assertEquals(1.0f, jobStatus1.getReduceProgress());
verify(clientServiceDelegate, times(0)).instantiateAMProxy( verify(clientServiceDelegate, times(0)).instantiateAMProxy(
any(String.class)); any(InetSocketAddress.class));
} }
@Test @Test

View File

@ -26,11 +26,9 @@ import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol; import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer; import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer;
@ -38,11 +36,11 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.DelegationToken; import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.ProtoUtils;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
@ -95,9 +93,8 @@ public class TestJHSSecurity {
// Now try talking to JHS using the delegation token // Now try talking to JHS using the delegation token
UserGroupInformation ugi = UserGroupInformation ugi =
UserGroupInformation.createRemoteUser("TheDarkLord"); UserGroupInformation.createRemoteUser("TheDarkLord");
ugi.addToken(new Token<MRDelegationTokenIdentifier>(token.getIdentifier() ugi.addToken(ProtoUtils.convertFromProtoFormat(
.array(), token.getPassword().array(), new Text(token.getKind()), token, jobHistoryServer.getClientService().getBindAddress()));
new Text(token.getService())));
final YarnRPC rpc = YarnRPC.create(conf); final YarnRPC rpc = YarnRPC.create(conf);
MRClientProtocol userUsingDT = MRClientProtocol userUsingDT =
ugi.doAs(new PrivilegedAction<MRClientProtocol>() { ugi.doAs(new PrivilegedAction<MRClientProtocol>() {

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.SaslInputStream; import org.apache.hadoop.security.SaslInputStream;
import org.apache.hadoop.security.SaslRpcClient; import org.apache.hadoop.security.SaslRpcClient;
import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Level; import org.apache.log4j.Level;
@ -98,10 +99,8 @@ public class TestUmbilicalProtocolWithJobToken {
JobTokenIdentifier tokenId = new JobTokenIdentifier(new Text(jobId)); JobTokenIdentifier tokenId = new JobTokenIdentifier(new Text(jobId));
Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(tokenId, sm); Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(tokenId, sm);
sm.addTokenForJob(jobId, token); sm.addTokenForJob(jobId, token);
Text host = new Text(addr.getAddress().getHostAddress() + ":" SecurityUtil.setTokenService(token, addr);
+ addr.getPort()); LOG.info("Service address for token is " + token.getService());
token.setService(host);
LOG.info("Service IP address for token is " + host);
current.addToken(token); current.addToken(token);
current.doAs(new PrivilegedExceptionAction<Object>() { current.doAs(new PrivilegedExceptionAction<Object>() {
@Override @Override

View File

@ -43,7 +43,7 @@ import org.apache.hadoop.yarn.api.ContainerManager;
*/ */
@Public @Public
@Stable @Stable
public interface ContainerToken { public interface ContainerToken extends DelegationToken {
/** /**
* Get the token identifier. * Get the token identifier.
* @return token identifier * @return token identifier

View File

@ -18,11 +18,17 @@
package org.apache.hadoop.yarn.util; package org.apache.hadoop.yarn.util;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@ -192,4 +198,23 @@ public class ProtoUtils {
return ApplicationAccessType.valueOf(e.name().replace( return ApplicationAccessType.valueOf(e.name().replace(
APP_ACCESS_TYPE_PREFIX, "")); APP_ACCESS_TYPE_PREFIX, ""));
} }
/**
* Convert a protobuf token into a rpc token and set its service
*
* @param protoToken the yarn token
* @param serviceAddr the connect address for the service
* @return rpc token
*/
public static <T extends TokenIdentifier> Token<T>
convertFromProtoFormat(DelegationToken protoToken, InetSocketAddress serviceAddr) {
Token<T> token = new Token<T>(protoToken.getIdentifier().array(),
protoToken.getPassword().array(),
new Text(protoToken.getKind()),
new Text(protoToken.getService()));
if (serviceAddr != null) {
SecurityUtil.setTokenService(token, serviceAddr);
}
return token;
}
} }

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -275,10 +276,10 @@ public class BuilderUtils {
containerToken.setKind(ContainerTokenIdentifier.KIND.toString()); containerToken.setKind(ContainerTokenIdentifier.KIND.toString());
containerToken.setPassword(password); containerToken.setPassword(password);
// RPC layer client expects ip:port as service for tokens // RPC layer client expects ip:port as service for tokens
InetSocketAddress addr = NetUtils.createSocketAddr(nodeId.getHost(), InetSocketAddress addr = NetUtils.createSocketAddrForHost(nodeId.getHost(),
nodeId.getPort()); nodeId.getPort());
containerToken.setService(addr.getAddress().getHostAddress() + ":" // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
+ addr.getPort()); containerToken.setService(SecurityUtil.buildTokenService(addr).toString());
return containerToken; return containerToken;
} }

View File

@ -464,8 +464,7 @@ public class ClientRMService extends AbstractService implements
realRMDTtoken.getIdentifier(), realRMDTtoken.getIdentifier(),
realRMDTtoken.getKind().toString(), realRMDTtoken.getKind().toString(),
realRMDTtoken.getPassword(), realRMDTtoken.getPassword(),
clientBindAddress.getAddress().getHostAddress() + ":" realRMDTtoken.getService().toString()
+ clientBindAddress.getPort()
)); ));
return response; return response;
} catch(IOException io) { } catch(IOException io) {

View File

@ -32,9 +32,9 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -46,7 +46,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerToken; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
import org.apache.hadoop.yarn.util.ProtoUtils;
/** /**
* The launch of the AM itself. * The launch of the AM itself.
@ -131,27 +132,25 @@ public class AMLauncher implements Runnable {
Container container = application.getMasterContainer(); Container container = application.getMasterContainer();
final String containerManagerBindAddress = container.getNodeId().toString(); final NodeId node = container.getNodeId();
final InetSocketAddress containerManagerBindAddress =
NetUtils.createSocketAddrForHost(node.getHost(), node.getPort());
final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again. final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again.
UserGroupInformation currentUser = UserGroupInformation UserGroupInformation currentUser = UserGroupInformation
.createRemoteUser(containerId.toString()); .createRemoteUser(containerId.toString());
if (UserGroupInformation.isSecurityEnabled()) { if (UserGroupInformation.isSecurityEnabled()) {
ContainerToken containerToken = container.getContainerToken();
Token<ContainerTokenIdentifier> token = Token<ContainerTokenIdentifier> token =
new Token<ContainerTokenIdentifier>( ProtoUtils.convertFromProtoFormat(container.getContainerToken(),
containerToken.getIdentifier().array(), containerManagerBindAddress);
containerToken.getPassword().array(), new Text(
containerToken.getKind()), new Text(
containerToken.getService()));
currentUser.addToken(token); currentUser.addToken(token);
} }
return currentUser.doAs(new PrivilegedAction<ContainerManager>() { return currentUser.doAs(new PrivilegedAction<ContainerManager>() {
@Override @Override
public ContainerManager run() { public ContainerManager run() {
return (ContainerManager) rpc.getProxy(ContainerManager.class, return (ContainerManager) rpc.getProxy(ContainerManager.class,
NetUtils.createSocketAddr(containerManagerBindAddress), conf); containerManagerBindAddress, conf);
} }
}); });
} }
@ -218,22 +217,21 @@ public class AMLauncher implements Runnable {
Token<ApplicationTokenIdentifier> token = Token<ApplicationTokenIdentifier> token =
new Token<ApplicationTokenIdentifier>(id, new Token<ApplicationTokenIdentifier>(id,
this.rmContext.getApplicationTokenSecretManager()); this.rmContext.getApplicationTokenSecretManager());
InetSocketAddress unresolvedAddr = conf.getSocketAddr( InetSocketAddress serviceAddr = conf.getSocketAddr(
YarnConfiguration.RM_SCHEDULER_ADDRESS, YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
String resolvedAddr = // normally the client should set the service after acquiring the token,
unresolvedAddr.getAddress().getHostAddress() + ":" // but this token is directly provided to the tasks
+ unresolvedAddr.getPort(); SecurityUtil.setTokenService(token, serviceAddr);
token.setService(new Text(resolvedAddr));
String appMasterTokenEncoded = token.encodeToUrlString(); String appMasterTokenEncoded = token.encodeToUrlString();
LOG.debug("Putting appMaster token in env : " + appMasterTokenEncoded); LOG.debug("Putting appMaster token in env : " + token);
environment.put( environment.put(
ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME, ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME,
appMasterTokenEncoded); appMasterTokenEncoded);
// Add the RM token // Add the RM token
credentials.addToken(new Text(resolvedAddr), token); credentials.addToken(token.getService(), token);
DataOutputBuffer dob = new DataOutputBuffer(); DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob); credentials.writeTokenStorageToStream(dob);
container.setContainerTokens( container.setContainerTokens(
@ -245,7 +243,6 @@ public class AMLauncher implements Runnable {
this.clientToAMSecretManager.getMasterKey(identifier); this.clientToAMSecretManager.getMasterKey(identifier);
String encoded = String encoded =
Base64.encodeBase64URLSafeString(clientSecretKey.getEncoded()); Base64.encodeBase64URLSafeString(clientSecretKey.getEncoded());
LOG.debug("The encoded client secret-key to be put in env : " + encoded);
environment.put( environment.put(
ApplicationConstants.APPLICATION_CLIENT_SECRET_ENV_NAME, ApplicationConstants.APPLICATION_CLIENT_SECRET_ENV_NAME,
encoded); encoded);

View File

@ -401,7 +401,6 @@ public class TestContainerManagerSecurity {
appTokenSecretManager); appTokenSecretManager);
SecurityUtil.setTokenService(appToken, schedulerAddr); SecurityUtil.setTokenService(appToken, schedulerAddr);
currentUser.addToken(appToken); currentUser.addToken(appToken);
SecurityUtil.setTokenService(appToken, schedulerAddr);
AMRMProtocol scheduler = currentUser AMRMProtocol scheduler = currentUser
.doAs(new PrivilegedAction<AMRMProtocol>() { .doAs(new PrivilegedAction<AMRMProtocol>() {