diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 4a89be511ea..aace37718f9 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -212,6 +212,8 @@ Release 0.23.3 - UNRELEASED MAPREDUCE-4210. Expose listener address for WebApp (Daryn Sharp via bobby) + MAPREDUCE-4162. Correctly set token service (Daryn Sharp via bobby) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java index 01b29eaf178..2e8defbb549 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java @@ -50,7 +50,9 @@ import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.source.JvmMetrics; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; @@ -77,7 +79,8 @@ class YarnChild { String host = args[0]; int port = Integer.parseInt(args[1]); - final InetSocketAddress address = new InetSocketAddress(host, port); + final InetSocketAddress address = + NetUtils.createSocketAddrForHost(host, port); final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]); int jvmIdInt = Integer.parseInt(args[3]); JVMId jvmId = new JVMId(firstTaskid.getJobID(), @@ -214,8 +217,7 @@ class YarnChild { LOG.debug("loading token. # keys =" +credentials.numberOfSecretKeys() + "; from file=" + jobTokenFile); Token jt = TokenCache.getJobToken(credentials); - jt.setService(new Text(address.getAddress().getHostAddress() + ":" - + address.getPort())); + SecurityUtil.setTokenService(jt, address); UserGroupInformation current = UserGroupInformation.getCurrentUser(); current.addToken(jt); for (Token tok : credentials.getAllTokens()) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java index 0bd730b7dd0..341e7215293 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java @@ -180,6 +180,11 @@ public class MRClientService extends AbstractService private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + @Override + public InetSocketAddress getConnectAddress() { + return getBindAddress(); + } + private Job verifyAndGetJob(JobId jobID, boolean modifyAccess) throws YarnRemoteException { Job job = appContext.getJob(jobID); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index 46a6111d610..44dd16daa05 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.mapreduce.v2.app.launcher; import java.io.IOException; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.security.PrivilegedAction; import java.util.HashSet; @@ -34,7 +35,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.ShuffleHandler; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ContainerToken; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.service.AbstractService; +import org.apache.hadoop.yarn.util.ProtoUtils; import org.apache.hadoop.yarn.util.Records; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -321,13 +322,13 @@ public class ContainerLauncherImpl extends AbstractService implements final String containerManagerBindAddr, ContainerToken containerToken) throws IOException { + final InetSocketAddress cmAddr = + NetUtils.createSocketAddr(containerManagerBindAddr); UserGroupInformation user = UserGroupInformation.getCurrentUser(); if (UserGroupInformation.isSecurityEnabled()) { - Token token = new Token( - containerToken.getIdentifier().array(), containerToken - .getPassword().array(), new Text(containerToken.getKind()), - new Text(containerToken.getService())); + Token token = + ProtoUtils.convertFromProtoFormat(containerToken, cmAddr); // the user in createRemoteUser in this context has to be ContainerID user = UserGroupInformation.createRemoteUser(containerID.toString()); user.addToken(token); @@ -338,8 +339,7 @@ public class ContainerLauncherImpl extends AbstractService implements @Override public ContainerManager run() { return (ContainerManager) rpc.getProxy(ContainerManager.class, - NetUtils.createSocketAddr(containerManagerBindAddr), - getConfig()); + cmAddr, getConfig()); } }); return proxy; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index 49df2176ef9..b0471e68ca0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -36,6 +36,7 @@ import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; @@ -133,15 +134,14 @@ public abstract class RMCommunicator extends AbstractService { protected void register() { //Register - String host = clientService.getBindAddress().getAddress() - .getCanonicalHostName(); + InetSocketAddress serviceAddr = clientService.getBindAddress(); try { RegisterApplicationMasterRequest request = recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class); request.setApplicationAttemptId(applicationAttemptId); - request.setHost(host); - request.setRpcPort(clientService.getBindAddress().getPort()); - request.setTrackingUrl(host + ":" + clientService.getHttpPort()); + request.setHost(serviceAddr.getHostName()); + request.setRpcPort(serviceAddr.getPort()); + request.setTrackingUrl(serviceAddr.getHostName() + ":" + clientService.getHttpPort()); RegisterApplicationMasterResponse response = scheduler.registerApplicationMaster(request); minContainerCapability = response.getMinimumResourceCapability(); @@ -262,9 +262,6 @@ public abstract class RMCommunicator extends AbstractService { if (UserGroupInformation.isSecurityEnabled()) { String tokenURLEncodedStr = System.getenv().get( ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME); - if (LOG.isDebugEnabled()) { - LOG.debug("AppMasterToken is " + tokenURLEncodedStr); - } Token token = new Token(); try { @@ -273,6 +270,10 @@ public abstract class RMCommunicator extends AbstractService { throw new YarnException(e); } + SecurityUtil.setTokenService(token, serviceAddr); + if (LOG.isDebugEnabled()) { + LOG.debug("AppMasterToken is " + token); + } currentUser.addToken(token); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/MRClientProtocol.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/MRClientProtocol.java index bc590b606a3..08166b96b18 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/MRClientProtocol.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/MRClientProtocol.java @@ -18,6 +18,8 @@ package org.apache.hadoop.mapreduce.v2.api; +import java.net.InetSocketAddress; + import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest; @@ -45,6 +47,11 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; public interface MRClientProtocol { + /** + * Address to which the client is connected + * @return InetSocketAddress + */ + public InetSocketAddress getConnectAddress(); public GetJobReportResponse getJobReport(GetJobReportRequest request) throws YarnRemoteException; public GetTaskReportResponse getTaskReport(GetTaskReportRequest request) throws YarnRemoteException; public GetTaskAttemptReportResponse getTaskAttemptReport(GetTaskAttemptReportRequest request) throws YarnRemoteException; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/impl/pb/client/MRClientProtocolPBClientImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/impl/pb/client/MRClientProtocolPBClientImpl.java index cf14532902c..3ab3f0c3b8b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/impl/pb/client/MRClientProtocolPBClientImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/impl/pb/client/MRClientProtocolPBClientImpl.java @@ -104,6 +104,11 @@ public class MRClientProtocolPBClientImpl implements MRClientProtocol { MRClientProtocolPB.class, clientVersion, addr, conf); } + @Override + public InetSocketAddress getConnectAddress() { + return RPC.getServerAddress(proxy); + } + @Override public GetJobReportResponse getJobReport(GetJobReportRequest request) throws YarnRemoteException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRPCFactories.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRPCFactories.java index 9401f4b585a..c76328d5056 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRPCFactories.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRPCFactories.java @@ -122,6 +122,11 @@ public class TestRPCFactories { public class MRClientProtocolTestImpl implements MRClientProtocol { + @Override + public InetSocketAddress getConnectAddress() { + return null; + } + @Override public GetJobReportResponse getJobReport(GetJobReportRequest request) throws YarnRemoteException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java index eb838fe8a7a..e456a7afa88 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java @@ -35,13 +35,11 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Master; import org.apache.hadoop.mapreduce.protocol.ClientProtocol; import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider; import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.mapreduce.util.ConfigUtil; import org.apache.hadoop.mapreduce.v2.LogParams; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager.InvalidToken; @@ -388,21 +386,8 @@ public class Cluster { */ public Token getDelegationToken(Text renewer) throws IOException, InterruptedException{ - Token result = - client.getDelegationToken(renewer); - - if (result == null) { - return result; - } - - InetSocketAddress addr = Master.getMasterAddress(conf); - StringBuilder service = new StringBuilder(); - service.append(NetUtils.normalizeHostName(addr.getAddress(). - getHostAddress())); - service.append(':'); - service.append(addr.getPort()); - result.setService(new Text(service.toString())); - return result; + // client has already set the service + return client.getDelegationToken(renewer); } /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java index b9ecb98a8a6..388356f01ab 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java @@ -178,6 +178,10 @@ public class HistoryClientService extends AbstractService { private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + public InetSocketAddress getConnectAddress() { + return getBindAddress(); + } + private Job verifyAndGetJob(final JobId jobID) throws YarnRemoteException { UserGroupInformation loginUgi = null; Job job = null; @@ -335,8 +339,7 @@ public class HistoryClientService extends AbstractService { jhsDTSecretManager); DelegationToken mrDToken = BuilderUtils.newDelegationToken( realJHSToken.getIdentifier(), realJHSToken.getKind().toString(), - realJHSToken.getPassword(), bindAddress.getAddress().getHostAddress() - + ":" + bindAddress.getPort()); + realJHSToken.getPassword(), realJHSToken.getService().toString()); response.setDelegationToken(mrDToken); return response; } catch (IOException i) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java index c2a373750cc..0143cb73913 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java @@ -32,7 +32,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.MRJobConfig; @@ -63,6 +62,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.YarnException; @@ -144,7 +144,7 @@ public class ClientServiceDelegate { if (application != null) { trackingUrl = application.getTrackingUrl(); } - String serviceAddr = null; + InetSocketAddress serviceAddr = null; while (application == null || YarnApplicationState.RUNNING == application .getYarnApplicationState()) { @@ -172,25 +172,23 @@ public class ClientServiceDelegate { if(!conf.getBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED, false)) { UserGroupInformation newUgi = UserGroupInformation.createRemoteUser( UserGroupInformation.getCurrentUser().getUserName()); - serviceAddr = application.getHost() + ":" + application.getRpcPort(); + serviceAddr = NetUtils.createSocketAddrForHost( + application.getHost(), application.getRpcPort()); if (UserGroupInformation.isSecurityEnabled()) { String clientTokenEncoded = application.getClientToken(); Token clientToken = new Token(); clientToken.decodeFromUrlString(clientTokenEncoded); // RPC layer client expects ip:port as service for tokens - InetSocketAddress addr = NetUtils.createSocketAddr(application - .getHost(), application.getRpcPort()); - clientToken.setService(new Text(addr.getAddress().getHostAddress() - + ":" + addr.getPort())); + SecurityUtil.setTokenService(clientToken, serviceAddr); newUgi.addToken(clientToken); } LOG.debug("Connecting to " + serviceAddr); - final String tempStr = serviceAddr; + final InetSocketAddress finalServiceAddr = serviceAddr; realProxy = newUgi.doAs(new PrivilegedExceptionAction() { @Override public MRClientProtocol run() throws IOException { - return instantiateAMProxy(tempStr); + return instantiateAMProxy(finalServiceAddr); } }); } else { @@ -270,13 +268,13 @@ public class ClientServiceDelegate { return historyServerProxy; } - MRClientProtocol instantiateAMProxy(final String serviceAddr) + MRClientProtocol instantiateAMProxy(final InetSocketAddress serviceAddr) throws IOException { LOG.trace("Connecting to ApplicationMaster at: " + serviceAddr); YarnRPC rpc = YarnRPC.create(conf); MRClientProtocol proxy = (MRClientProtocol) rpc.getProxy(MRClientProtocol.class, - NetUtils.createSocketAddr(serviceAddr), conf); + serviceAddr, conf); LOG.trace("Connected to ApplicationMaster at: " + serviceAddr); return proxy; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java index 25069cccf1e..3d00e8af8c9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java @@ -18,6 +18,7 @@ package org.apache.hadoop.mapred; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.HashMap; @@ -209,4 +210,10 @@ public class NotRunningJob implements MRClientProtocol { /* Should not be invoked by anyone. */ throw new NotImplementedException(); } + + @Override + public InetSocketAddress getConnectAddress() { + /* Should not be invoked by anyone. Normally used to set token service */ + throw new NotImplementedException(); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java index 79a1d27c2db..62b608aca47 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java @@ -37,8 +37,6 @@ import org.apache.hadoop.mapreduce.QueueInfo; import org.apache.hadoop.mapreduce.TaskTrackerInfo; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; -import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest; -import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -67,14 +65,14 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.util.ProtoUtils; // TODO: This should be part of something like yarn-client. public class ResourceMgrDelegate { private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class); - private final String rmAddress; + private final InetSocketAddress rmAddress; private YarnConfiguration conf; ClientRMProtocol applicationsManager; private ApplicationId applicationId; @@ -87,11 +85,7 @@ public class ResourceMgrDelegate { public ResourceMgrDelegate(YarnConfiguration conf) { this.conf = conf; YarnRPC rpc = YarnRPC.create(this.conf); - InetSocketAddress rmAddress = conf.getSocketAddr( - YarnConfiguration.RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_PORT); - this.rmAddress = rmAddress.toString(); + this.rmAddress = getRmAddress(conf); LOG.debug("Connecting to ResourceManager at " + rmAddress); applicationsManager = (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, @@ -109,7 +103,13 @@ public class ResourceMgrDelegate { ClientRMProtocol applicationsManager) { this.conf = conf; this.applicationsManager = applicationsManager; - this.rmAddress = applicationsManager.toString(); + this.rmAddress = getRmAddress(conf); + } + + private static InetSocketAddress getRmAddress(YarnConfiguration conf) { + return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_PORT); } public void cancelDelegationToken(Token arg0) @@ -168,9 +168,7 @@ public class ResourceMgrDelegate { org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse response = applicationsManager.getDelegationToken(rmDTRequest); DelegationToken yarnToken = response.getRMDelegationToken(); - return new Token(yarnToken.getIdentifier().array(), - yarnToken.getPassword().array(), - new Text(yarnToken.getKind()), new Text(yarnToken.getService())); + return ProtoUtils.convertFromProtoFormat(yarnToken, rmAddress); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index 3b00ddf83c5..e6358de35de 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -56,7 +56,6 @@ import org.apache.hadoop.mapreduce.protocol.ClientProtocol; import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.mapreduce.v2.LogParams; import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; -import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.util.MRApps; @@ -84,6 +83,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.ProtoUtils; /** @@ -184,7 +184,7 @@ public class YARNRunner implements ClientProtocol { return resMgrDelegate.getClusterMetrics(); } - private Token getDelegationTokenFromHS( + private Token getDelegationTokenFromHS( MRClientProtocol hsProxy, Text renewer) throws IOException, InterruptedException { GetDelegationTokenRequest request = recordFactory @@ -192,10 +192,8 @@ public class YARNRunner implements ClientProtocol { request.setRenewer(renewer.toString()); DelegationToken mrDelegationToken = hsProxy.getDelegationToken(request) .getDelegationToken(); - return new Token(mrDelegationToken - .getIdentifier().array(), mrDelegationToken.getPassword().array(), - new Text(mrDelegationToken.getKind()), new Text( - mrDelegationToken.getService())); + return ProtoUtils.convertFromProtoFormat(mrDelegationToken, + hsProxy.getConnectAddress()); } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java index 9d4efe639c7..095d3fd9301 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java @@ -368,6 +368,11 @@ public class TestClientRedirect { this(AMHOSTADDRESS); } + @Override + public InetSocketAddress getConnectAddress() { + return bindAddress; + } + public AMService(String hostAddress) { super("AMService"); this.protocol = MRClientProtocol.class; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java index 55cfeeb9442..a3940054c59 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collection; @@ -242,7 +243,7 @@ public class TestClientServiceDelegate { // should use the same proxy to AM2 and so instantiateProxy shouldn't be // called. doReturn(firstGenAMProxy).doReturn(secondGenAMProxy).when( - clientServiceDelegate).instantiateAMProxy(any(String.class)); + clientServiceDelegate).instantiateAMProxy(any(InetSocketAddress.class)); JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId); Assert.assertNotNull(jobStatus); @@ -257,7 +258,7 @@ public class TestClientServiceDelegate { Assert.assertEquals("jobName-secondGen", jobStatus.getJobName()); verify(clientServiceDelegate, times(2)).instantiateAMProxy( - any(String.class)); + any(InetSocketAddress.class)); } @Test @@ -286,19 +287,19 @@ public class TestClientServiceDelegate { Assert.assertEquals("N/A", jobStatus.getJobName()); verify(clientServiceDelegate, times(0)).instantiateAMProxy( - any(String.class)); + any(InetSocketAddress.class)); // Should not reach AM even for second and third times too. jobStatus = clientServiceDelegate.getJobStatus(oldJobId); Assert.assertNotNull(jobStatus); Assert.assertEquals("N/A", jobStatus.getJobName()); verify(clientServiceDelegate, times(0)).instantiateAMProxy( - any(String.class)); + any(InetSocketAddress.class)); jobStatus = clientServiceDelegate.getJobStatus(oldJobId); Assert.assertNotNull(jobStatus); Assert.assertEquals("N/A", jobStatus.getJobName()); verify(clientServiceDelegate, times(0)).instantiateAMProxy( - any(String.class)); + any(InetSocketAddress.class)); // The third time around, app is completed, so should go to JHS JobStatus jobStatus1 = clientServiceDelegate.getJobStatus(oldJobId); @@ -309,7 +310,7 @@ public class TestClientServiceDelegate { Assert.assertEquals(1.0f, jobStatus1.getReduceProgress()); verify(clientServiceDelegate, times(0)).instantiateAMProxy( - any(String.class)); + any(InetSocketAddress.class)); } @Test diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java index 20c00b1da08..792806b624c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java @@ -26,11 +26,9 @@ import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol; import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; -import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest; import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer; @@ -38,11 +36,11 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.DelegationToken; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.util.ProtoUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.log4j.Level; import org.apache.log4j.LogManager; @@ -95,9 +93,8 @@ public class TestJHSSecurity { // Now try talking to JHS using the delegation token UserGroupInformation ugi = UserGroupInformation.createRemoteUser("TheDarkLord"); - ugi.addToken(new Token(token.getIdentifier() - .array(), token.getPassword().array(), new Text(token.getKind()), - new Text(token.getService()))); + ugi.addToken(ProtoUtils.convertFromProtoFormat( + token, jobHistoryServer.getClientService().getBindAddress())); final YarnRPC rpc = YarnRPC.create(conf); MRClientProtocol userUsingDT = ugi.doAs(new PrivilegedAction() { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java index dd4b3489750..8167102ab86 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java @@ -47,6 +47,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.SaslInputStream; import org.apache.hadoop.security.SaslRpcClient; import org.apache.hadoop.security.SaslRpcServer; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.log4j.Level; @@ -98,10 +99,8 @@ public class TestUmbilicalProtocolWithJobToken { JobTokenIdentifier tokenId = new JobTokenIdentifier(new Text(jobId)); Token token = new Token(tokenId, sm); sm.addTokenForJob(jobId, token); - Text host = new Text(addr.getAddress().getHostAddress() + ":" - + addr.getPort()); - token.setService(host); - LOG.info("Service IP address for token is " + host); + SecurityUtil.setTokenService(token, addr); + LOG.info("Service address for token is " + token.getService()); current.addToken(token); current.doAs(new PrivilegedExceptionAction() { @Override diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerToken.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerToken.java index da34f71f927..0e0e8edb16f 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerToken.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerToken.java @@ -43,7 +43,7 @@ import org.apache.hadoop.yarn.api.ContainerManager; */ @Public @Stable -public interface ContainerToken { +public interface ContainerToken extends DelegationToken { /** * Get the token identifier. * @return token identifier diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/ProtoUtils.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/ProtoUtils.java index 6bdc1dfcd0e..5a73eabce1d 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/ProtoUtils.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/ProtoUtils.java @@ -18,11 +18,17 @@ package org.apache.hadoop.yarn.util; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.DelegationToken; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; @@ -192,4 +198,23 @@ public class ProtoUtils { return ApplicationAccessType.valueOf(e.name().replace( APP_ACCESS_TYPE_PREFIX, "")); } + + /** + * Convert a protobuf token into a rpc token and set its service + * + * @param protoToken the yarn token + * @param serviceAddr the connect address for the service + * @return rpc token + */ + public static Token + convertFromProtoFormat(DelegationToken protoToken, InetSocketAddress serviceAddr) { + Token token = new Token(protoToken.getIdentifier().array(), + protoToken.getPassword().array(), + new Text(protoToken.getKind()), + new Text(protoToken.getService())); + if (serviceAddr != null) { + SecurityUtil.setTokenService(token, serviceAddr); + } + return token; + } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java index 413817ac586..9fadd09fc16 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java @@ -30,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -275,10 +276,10 @@ public class BuilderUtils { containerToken.setKind(ContainerTokenIdentifier.KIND.toString()); containerToken.setPassword(password); // RPC layer client expects ip:port as service for tokens - InetSocketAddress addr = NetUtils.createSocketAddr(nodeId.getHost(), + InetSocketAddress addr = NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort()); - containerToken.setService(addr.getAddress().getHostAddress() + ":" - + addr.getPort()); + // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token + containerToken.setService(SecurityUtil.buildTokenService(addr).toString()); return containerToken; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 8a007d536c8..b4be6fcf677 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -464,8 +464,7 @@ public class ClientRMService extends AbstractService implements realRMDTtoken.getIdentifier(), realRMDTtoken.getKind().toString(), realRMDTtoken.getPassword(), - clientBindAddress.getAddress().getHostAddress() + ":" - + clientBindAddress.getPort() + realRMDTtoken.getService().toString() )); return response; } catch(IOException io) { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index 114dc977b59..aa9d2c245da 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -32,9 +32,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.Text; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.StringUtils; @@ -46,7 +46,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.ContainerToken; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; +import org.apache.hadoop.yarn.util.ProtoUtils; /** * The launch of the AM itself. @@ -131,27 +132,25 @@ public class AMLauncher implements Runnable { Container container = application.getMasterContainer(); - final String containerManagerBindAddress = container.getNodeId().toString(); + final NodeId node = container.getNodeId(); + final InetSocketAddress containerManagerBindAddress = + NetUtils.createSocketAddrForHost(node.getHost(), node.getPort()); final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again. UserGroupInformation currentUser = UserGroupInformation .createRemoteUser(containerId.toString()); if (UserGroupInformation.isSecurityEnabled()) { - ContainerToken containerToken = container.getContainerToken(); Token token = - new Token( - containerToken.getIdentifier().array(), - containerToken.getPassword().array(), new Text( - containerToken.getKind()), new Text( - containerToken.getService())); + ProtoUtils.convertFromProtoFormat(container.getContainerToken(), + containerManagerBindAddress); currentUser.addToken(token); } return currentUser.doAs(new PrivilegedAction() { @Override public ContainerManager run() { return (ContainerManager) rpc.getProxy(ContainerManager.class, - NetUtils.createSocketAddr(containerManagerBindAddress), conf); + containerManagerBindAddress, conf); } }); } @@ -218,22 +217,21 @@ public class AMLauncher implements Runnable { Token token = new Token(id, this.rmContext.getApplicationTokenSecretManager()); - InetSocketAddress unresolvedAddr = conf.getSocketAddr( + InetSocketAddress serviceAddr = conf.getSocketAddr( YarnConfiguration.RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); - String resolvedAddr = - unresolvedAddr.getAddress().getHostAddress() + ":" - + unresolvedAddr.getPort(); - token.setService(new Text(resolvedAddr)); + // normally the client should set the service after acquiring the token, + // but this token is directly provided to the tasks + SecurityUtil.setTokenService(token, serviceAddr); String appMasterTokenEncoded = token.encodeToUrlString(); - LOG.debug("Putting appMaster token in env : " + appMasterTokenEncoded); + LOG.debug("Putting appMaster token in env : " + token); environment.put( ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME, appMasterTokenEncoded); // Add the RM token - credentials.addToken(new Text(resolvedAddr), token); + credentials.addToken(token.getService(), token); DataOutputBuffer dob = new DataOutputBuffer(); credentials.writeTokenStorageToStream(dob); container.setContainerTokens( @@ -245,7 +243,6 @@ public class AMLauncher implements Runnable { this.clientToAMSecretManager.getMasterKey(identifier); String encoded = Base64.encodeBase64URLSafeString(clientSecretKey.getEncoded()); - LOG.debug("The encoded client secret-key to be put in env : " + encoded); environment.put( ApplicationConstants.APPLICATION_CLIENT_SECRET_ENV_NAME, encoded); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java index cf5629fc50f..4594c05ca36 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java @@ -401,7 +401,6 @@ public class TestContainerManagerSecurity { appTokenSecretManager); SecurityUtil.setTokenService(appToken, schedulerAddr); currentUser.addToken(appToken); - SecurityUtil.setTokenService(appToken, schedulerAddr); AMRMProtocol scheduler = currentUser .doAs(new PrivilegedAction() {