MAPREDUCE-4162. Correctly set token service (Daryn Sharp via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1335567 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a9808de0d9
commit
aa60da6c2e
|
@ -321,6 +321,8 @@ Release 0.23.3 - UNRELEASED
|
||||||
|
|
||||||
MAPREDUCE-4210. Expose listener address for WebApp (Daryn Sharp via bobby)
|
MAPREDUCE-4210. Expose listener address for WebApp (Daryn Sharp via bobby)
|
||||||
|
|
||||||
|
MAPREDUCE-4162. Correctly set token service (Daryn Sharp via bobby)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -50,7 +50,9 @@ import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
||||||
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
||||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
||||||
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
@ -77,7 +79,8 @@ class YarnChild {
|
||||||
|
|
||||||
String host = args[0];
|
String host = args[0];
|
||||||
int port = Integer.parseInt(args[1]);
|
int port = Integer.parseInt(args[1]);
|
||||||
final InetSocketAddress address = new InetSocketAddress(host, port);
|
final InetSocketAddress address =
|
||||||
|
NetUtils.createSocketAddrForHost(host, port);
|
||||||
final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
|
final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
|
||||||
int jvmIdInt = Integer.parseInt(args[3]);
|
int jvmIdInt = Integer.parseInt(args[3]);
|
||||||
JVMId jvmId = new JVMId(firstTaskid.getJobID(),
|
JVMId jvmId = new JVMId(firstTaskid.getJobID(),
|
||||||
|
@ -214,8 +217,7 @@ class YarnChild {
|
||||||
LOG.debug("loading token. # keys =" +credentials.numberOfSecretKeys() +
|
LOG.debug("loading token. # keys =" +credentials.numberOfSecretKeys() +
|
||||||
"; from file=" + jobTokenFile);
|
"; from file=" + jobTokenFile);
|
||||||
Token<JobTokenIdentifier> jt = TokenCache.getJobToken(credentials);
|
Token<JobTokenIdentifier> jt = TokenCache.getJobToken(credentials);
|
||||||
jt.setService(new Text(address.getAddress().getHostAddress() + ":"
|
SecurityUtil.setTokenService(jt, address);
|
||||||
+ address.getPort()));
|
|
||||||
UserGroupInformation current = UserGroupInformation.getCurrentUser();
|
UserGroupInformation current = UserGroupInformation.getCurrentUser();
|
||||||
current.addToken(jt);
|
current.addToken(jt);
|
||||||
for (Token<? extends TokenIdentifier> tok : credentials.getAllTokens()) {
|
for (Token<? extends TokenIdentifier> tok : credentials.getAllTokens()) {
|
||||||
|
|
|
@ -180,6 +180,11 @@ public class MRClientService extends AbstractService
|
||||||
private RecordFactory recordFactory =
|
private RecordFactory recordFactory =
|
||||||
RecordFactoryProvider.getRecordFactory(null);
|
RecordFactoryProvider.getRecordFactory(null);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public InetSocketAddress getConnectAddress() {
|
||||||
|
return getBindAddress();
|
||||||
|
}
|
||||||
|
|
||||||
private Job verifyAndGetJob(JobId jobID,
|
private Job verifyAndGetJob(JobId jobID,
|
||||||
boolean modifyAccess) throws YarnRemoteException {
|
boolean modifyAccess) throws YarnRemoteException {
|
||||||
Job job = appContext.getJob(jobID);
|
Job job = appContext.getJob(jobID);
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.mapreduce.v2.app.launcher;
|
package org.apache.hadoop.mapreduce.v2.app.launcher;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.security.PrivilegedAction;
|
import java.security.PrivilegedAction;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -34,7 +35,6 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.io.Text;
|
|
||||||
import org.apache.hadoop.mapred.ShuffleHandler;
|
import org.apache.hadoop.mapred.ShuffleHandler;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||||
|
@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.service.AbstractService;
|
import org.apache.hadoop.yarn.service.AbstractService;
|
||||||
|
import org.apache.hadoop.yarn.util.ProtoUtils;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
@ -321,13 +322,13 @@ public class ContainerLauncherImpl extends AbstractService implements
|
||||||
final String containerManagerBindAddr, ContainerToken containerToken)
|
final String containerManagerBindAddr, ContainerToken containerToken)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
|
final InetSocketAddress cmAddr =
|
||||||
|
NetUtils.createSocketAddr(containerManagerBindAddr);
|
||||||
UserGroupInformation user = UserGroupInformation.getCurrentUser();
|
UserGroupInformation user = UserGroupInformation.getCurrentUser();
|
||||||
|
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
Token<ContainerTokenIdentifier> token = new Token<ContainerTokenIdentifier>(
|
Token<ContainerTokenIdentifier> token =
|
||||||
containerToken.getIdentifier().array(), containerToken
|
ProtoUtils.convertFromProtoFormat(containerToken, cmAddr);
|
||||||
.getPassword().array(), new Text(containerToken.getKind()),
|
|
||||||
new Text(containerToken.getService()));
|
|
||||||
// the user in createRemoteUser in this context has to be ContainerID
|
// the user in createRemoteUser in this context has to be ContainerID
|
||||||
user = UserGroupInformation.createRemoteUser(containerID.toString());
|
user = UserGroupInformation.createRemoteUser(containerID.toString());
|
||||||
user.addToken(token);
|
user.addToken(token);
|
||||||
|
@ -338,8 +339,7 @@ public class ContainerLauncherImpl extends AbstractService implements
|
||||||
@Override
|
@Override
|
||||||
public ContainerManager run() {
|
public ContainerManager run() {
|
||||||
return (ContainerManager) rpc.getProxy(ContainerManager.class,
|
return (ContainerManager) rpc.getProxy(ContainerManager.class,
|
||||||
NetUtils.createSocketAddr(containerManagerBindAddr),
|
cmAddr, getConfig());
|
||||||
getConfig());
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
return proxy;
|
return proxy;
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
||||||
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
@ -133,15 +134,14 @@ public abstract class RMCommunicator extends AbstractService {
|
||||||
|
|
||||||
protected void register() {
|
protected void register() {
|
||||||
//Register
|
//Register
|
||||||
String host = clientService.getBindAddress().getAddress()
|
InetSocketAddress serviceAddr = clientService.getBindAddress();
|
||||||
.getCanonicalHostName();
|
|
||||||
try {
|
try {
|
||||||
RegisterApplicationMasterRequest request =
|
RegisterApplicationMasterRequest request =
|
||||||
recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
|
recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
|
||||||
request.setApplicationAttemptId(applicationAttemptId);
|
request.setApplicationAttemptId(applicationAttemptId);
|
||||||
request.setHost(host);
|
request.setHost(serviceAddr.getHostName());
|
||||||
request.setRpcPort(clientService.getBindAddress().getPort());
|
request.setRpcPort(serviceAddr.getPort());
|
||||||
request.setTrackingUrl(host + ":" + clientService.getHttpPort());
|
request.setTrackingUrl(serviceAddr.getHostName() + ":" + clientService.getHttpPort());
|
||||||
RegisterApplicationMasterResponse response =
|
RegisterApplicationMasterResponse response =
|
||||||
scheduler.registerApplicationMaster(request);
|
scheduler.registerApplicationMaster(request);
|
||||||
minContainerCapability = response.getMinimumResourceCapability();
|
minContainerCapability = response.getMinimumResourceCapability();
|
||||||
|
@ -262,9 +262,6 @@ public abstract class RMCommunicator extends AbstractService {
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
String tokenURLEncodedStr = System.getenv().get(
|
String tokenURLEncodedStr = System.getenv().get(
|
||||||
ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
|
ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("AppMasterToken is " + tokenURLEncodedStr);
|
|
||||||
}
|
|
||||||
Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
|
Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -273,6 +270,10 @@ public abstract class RMCommunicator extends AbstractService {
|
||||||
throw new YarnException(e);
|
throw new YarnException(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SecurityUtil.setTokenService(token, serviceAddr);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("AppMasterToken is " + token);
|
||||||
|
}
|
||||||
currentUser.addToken(token);
|
currentUser.addToken(token);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.mapreduce.v2.api;
|
package org.apache.hadoop.mapreduce.v2.api;
|
||||||
|
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
|
||||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
|
||||||
|
@ -45,6 +47,11 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
|
|
||||||
public interface MRClientProtocol {
|
public interface MRClientProtocol {
|
||||||
|
/**
|
||||||
|
* Address to which the client is connected
|
||||||
|
* @return InetSocketAddress
|
||||||
|
*/
|
||||||
|
public InetSocketAddress getConnectAddress();
|
||||||
public GetJobReportResponse getJobReport(GetJobReportRequest request) throws YarnRemoteException;
|
public GetJobReportResponse getJobReport(GetJobReportRequest request) throws YarnRemoteException;
|
||||||
public GetTaskReportResponse getTaskReport(GetTaskReportRequest request) throws YarnRemoteException;
|
public GetTaskReportResponse getTaskReport(GetTaskReportRequest request) throws YarnRemoteException;
|
||||||
public GetTaskAttemptReportResponse getTaskAttemptReport(GetTaskAttemptReportRequest request) throws YarnRemoteException;
|
public GetTaskAttemptReportResponse getTaskAttemptReport(GetTaskAttemptReportRequest request) throws YarnRemoteException;
|
||||||
|
|
|
@ -104,6 +104,11 @@ public class MRClientProtocolPBClientImpl implements MRClientProtocol {
|
||||||
MRClientProtocolPB.class, clientVersion, addr, conf);
|
MRClientProtocolPB.class, clientVersion, addr, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public InetSocketAddress getConnectAddress() {
|
||||||
|
return RPC.getServerAddress(proxy);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public GetJobReportResponse getJobReport(GetJobReportRequest request)
|
public GetJobReportResponse getJobReport(GetJobReportRequest request)
|
||||||
throws YarnRemoteException {
|
throws YarnRemoteException {
|
||||||
|
|
|
@ -122,6 +122,11 @@ public class TestRPCFactories {
|
||||||
|
|
||||||
public class MRClientProtocolTestImpl implements MRClientProtocol {
|
public class MRClientProtocolTestImpl implements MRClientProtocol {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public InetSocketAddress getConnectAddress() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public GetJobReportResponse getJobReport(GetJobReportRequest request)
|
public GetJobReportResponse getJobReport(GetJobReportRequest request)
|
||||||
throws YarnRemoteException {
|
throws YarnRemoteException {
|
||||||
|
|
|
@ -35,13 +35,11 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.mapred.JobConf;
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
import org.apache.hadoop.mapred.Master;
|
|
||||||
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
|
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
|
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
|
||||||
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
|
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.mapreduce.util.ConfigUtil;
|
import org.apache.hadoop.mapreduce.util.ConfigUtil;
|
||||||
import org.apache.hadoop.mapreduce.v2.LogParams;
|
import org.apache.hadoop.mapreduce.v2.LogParams;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||||
|
@ -388,21 +386,8 @@ public class Cluster {
|
||||||
*/
|
*/
|
||||||
public Token<DelegationTokenIdentifier>
|
public Token<DelegationTokenIdentifier>
|
||||||
getDelegationToken(Text renewer) throws IOException, InterruptedException{
|
getDelegationToken(Text renewer) throws IOException, InterruptedException{
|
||||||
Token<DelegationTokenIdentifier> result =
|
// client has already set the service
|
||||||
client.getDelegationToken(renewer);
|
return client.getDelegationToken(renewer);
|
||||||
|
|
||||||
if (result == null) {
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
InetSocketAddress addr = Master.getMasterAddress(conf);
|
|
||||||
StringBuilder service = new StringBuilder();
|
|
||||||
service.append(NetUtils.normalizeHostName(addr.getAddress().
|
|
||||||
getHostAddress()));
|
|
||||||
service.append(':');
|
|
||||||
service.append(addr.getPort());
|
|
||||||
result.setService(new Text(service.toString()));
|
|
||||||
return result;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -178,6 +178,10 @@ public class HistoryClientService extends AbstractService {
|
||||||
|
|
||||||
private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
||||||
|
|
||||||
|
public InetSocketAddress getConnectAddress() {
|
||||||
|
return getBindAddress();
|
||||||
|
}
|
||||||
|
|
||||||
private Job verifyAndGetJob(final JobId jobID) throws YarnRemoteException {
|
private Job verifyAndGetJob(final JobId jobID) throws YarnRemoteException {
|
||||||
UserGroupInformation loginUgi = null;
|
UserGroupInformation loginUgi = null;
|
||||||
Job job = null;
|
Job job = null;
|
||||||
|
@ -335,8 +339,7 @@ public class HistoryClientService extends AbstractService {
|
||||||
jhsDTSecretManager);
|
jhsDTSecretManager);
|
||||||
DelegationToken mrDToken = BuilderUtils.newDelegationToken(
|
DelegationToken mrDToken = BuilderUtils.newDelegationToken(
|
||||||
realJHSToken.getIdentifier(), realJHSToken.getKind().toString(),
|
realJHSToken.getIdentifier(), realJHSToken.getKind().toString(),
|
||||||
realJHSToken.getPassword(), bindAddress.getAddress().getHostAddress()
|
realJHSToken.getPassword(), realJHSToken.getService().toString());
|
||||||
+ ":" + bindAddress.getPort());
|
|
||||||
response.setDelegationToken(mrDToken);
|
response.setDelegationToken(mrDToken);
|
||||||
return response;
|
return response;
|
||||||
} catch (IOException i) {
|
} catch (IOException i) {
|
||||||
|
|
|
@ -32,7 +32,6 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.io.Text;
|
|
||||||
import org.apache.hadoop.mapreduce.JobID;
|
import org.apache.hadoop.mapreduce.JobID;
|
||||||
import org.apache.hadoop.mapreduce.JobStatus;
|
import org.apache.hadoop.mapreduce.JobStatus;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
|
@ -63,6 +62,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
|
||||||
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.yarn.YarnException;
|
import org.apache.hadoop.yarn.YarnException;
|
||||||
|
@ -144,7 +144,7 @@ public class ClientServiceDelegate {
|
||||||
if (application != null) {
|
if (application != null) {
|
||||||
trackingUrl = application.getTrackingUrl();
|
trackingUrl = application.getTrackingUrl();
|
||||||
}
|
}
|
||||||
String serviceAddr = null;
|
InetSocketAddress serviceAddr = null;
|
||||||
while (application == null
|
while (application == null
|
||||||
|| YarnApplicationState.RUNNING == application
|
|| YarnApplicationState.RUNNING == application
|
||||||
.getYarnApplicationState()) {
|
.getYarnApplicationState()) {
|
||||||
|
@ -172,25 +172,23 @@ public class ClientServiceDelegate {
|
||||||
if(!conf.getBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED, false)) {
|
if(!conf.getBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED, false)) {
|
||||||
UserGroupInformation newUgi = UserGroupInformation.createRemoteUser(
|
UserGroupInformation newUgi = UserGroupInformation.createRemoteUser(
|
||||||
UserGroupInformation.getCurrentUser().getUserName());
|
UserGroupInformation.getCurrentUser().getUserName());
|
||||||
serviceAddr = application.getHost() + ":" + application.getRpcPort();
|
serviceAddr = NetUtils.createSocketAddrForHost(
|
||||||
|
application.getHost(), application.getRpcPort());
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
String clientTokenEncoded = application.getClientToken();
|
String clientTokenEncoded = application.getClientToken();
|
||||||
Token<ApplicationTokenIdentifier> clientToken =
|
Token<ApplicationTokenIdentifier> clientToken =
|
||||||
new Token<ApplicationTokenIdentifier>();
|
new Token<ApplicationTokenIdentifier>();
|
||||||
clientToken.decodeFromUrlString(clientTokenEncoded);
|
clientToken.decodeFromUrlString(clientTokenEncoded);
|
||||||
// RPC layer client expects ip:port as service for tokens
|
// RPC layer client expects ip:port as service for tokens
|
||||||
InetSocketAddress addr = NetUtils.createSocketAddr(application
|
SecurityUtil.setTokenService(clientToken, serviceAddr);
|
||||||
.getHost(), application.getRpcPort());
|
|
||||||
clientToken.setService(new Text(addr.getAddress().getHostAddress()
|
|
||||||
+ ":" + addr.getPort()));
|
|
||||||
newUgi.addToken(clientToken);
|
newUgi.addToken(clientToken);
|
||||||
}
|
}
|
||||||
LOG.debug("Connecting to " + serviceAddr);
|
LOG.debug("Connecting to " + serviceAddr);
|
||||||
final String tempStr = serviceAddr;
|
final InetSocketAddress finalServiceAddr = serviceAddr;
|
||||||
realProxy = newUgi.doAs(new PrivilegedExceptionAction<MRClientProtocol>() {
|
realProxy = newUgi.doAs(new PrivilegedExceptionAction<MRClientProtocol>() {
|
||||||
@Override
|
@Override
|
||||||
public MRClientProtocol run() throws IOException {
|
public MRClientProtocol run() throws IOException {
|
||||||
return instantiateAMProxy(tempStr);
|
return instantiateAMProxy(finalServiceAddr);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
|
@ -270,13 +268,13 @@ public class ClientServiceDelegate {
|
||||||
return historyServerProxy;
|
return historyServerProxy;
|
||||||
}
|
}
|
||||||
|
|
||||||
MRClientProtocol instantiateAMProxy(final String serviceAddr)
|
MRClientProtocol instantiateAMProxy(final InetSocketAddress serviceAddr)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
LOG.trace("Connecting to ApplicationMaster at: " + serviceAddr);
|
LOG.trace("Connecting to ApplicationMaster at: " + serviceAddr);
|
||||||
YarnRPC rpc = YarnRPC.create(conf);
|
YarnRPC rpc = YarnRPC.create(conf);
|
||||||
MRClientProtocol proxy =
|
MRClientProtocol proxy =
|
||||||
(MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
|
(MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
|
||||||
NetUtils.createSocketAddr(serviceAddr), conf);
|
serviceAddr, conf);
|
||||||
LOG.trace("Connected to ApplicationMaster at: " + serviceAddr);
|
LOG.trace("Connected to ApplicationMaster at: " + serviceAddr);
|
||||||
return proxy;
|
return proxy;
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.mapred;
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
|
||||||
|
@ -209,4 +210,10 @@ public class NotRunningJob implements MRClientProtocol {
|
||||||
/* Should not be invoked by anyone. */
|
/* Should not be invoked by anyone. */
|
||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public InetSocketAddress getConnectAddress() {
|
||||||
|
/* Should not be invoked by anyone. Normally used to set token service */
|
||||||
|
throw new NotImplementedException();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,8 +37,6 @@ import org.apache.hadoop.mapreduce.QueueInfo;
|
||||||
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
|
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
|
||||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||||
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
|
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
|
|
||||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
|
|
||||||
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
@ -67,14 +65,14 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.util.ProtoUtils;
|
||||||
|
|
||||||
|
|
||||||
// TODO: This should be part of something like yarn-client.
|
// TODO: This should be part of something like yarn-client.
|
||||||
public class ResourceMgrDelegate {
|
public class ResourceMgrDelegate {
|
||||||
private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class);
|
private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class);
|
||||||
|
|
||||||
private final String rmAddress;
|
private final InetSocketAddress rmAddress;
|
||||||
private YarnConfiguration conf;
|
private YarnConfiguration conf;
|
||||||
ClientRMProtocol applicationsManager;
|
ClientRMProtocol applicationsManager;
|
||||||
private ApplicationId applicationId;
|
private ApplicationId applicationId;
|
||||||
|
@ -87,11 +85,7 @@ public class ResourceMgrDelegate {
|
||||||
public ResourceMgrDelegate(YarnConfiguration conf) {
|
public ResourceMgrDelegate(YarnConfiguration conf) {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
YarnRPC rpc = YarnRPC.create(this.conf);
|
YarnRPC rpc = YarnRPC.create(this.conf);
|
||||||
InetSocketAddress rmAddress = conf.getSocketAddr(
|
this.rmAddress = getRmAddress(conf);
|
||||||
YarnConfiguration.RM_ADDRESS,
|
|
||||||
YarnConfiguration.DEFAULT_RM_ADDRESS,
|
|
||||||
YarnConfiguration.DEFAULT_RM_PORT);
|
|
||||||
this.rmAddress = rmAddress.toString();
|
|
||||||
LOG.debug("Connecting to ResourceManager at " + rmAddress);
|
LOG.debug("Connecting to ResourceManager at " + rmAddress);
|
||||||
applicationsManager =
|
applicationsManager =
|
||||||
(ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
|
(ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
|
||||||
|
@ -109,7 +103,13 @@ public class ResourceMgrDelegate {
|
||||||
ClientRMProtocol applicationsManager) {
|
ClientRMProtocol applicationsManager) {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.applicationsManager = applicationsManager;
|
this.applicationsManager = applicationsManager;
|
||||||
this.rmAddress = applicationsManager.toString();
|
this.rmAddress = getRmAddress(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static InetSocketAddress getRmAddress(YarnConfiguration conf) {
|
||||||
|
return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_PORT);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
|
public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
|
||||||
|
@ -168,9 +168,7 @@ public class ResourceMgrDelegate {
|
||||||
org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse
|
org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse
|
||||||
response = applicationsManager.getDelegationToken(rmDTRequest);
|
response = applicationsManager.getDelegationToken(rmDTRequest);
|
||||||
DelegationToken yarnToken = response.getRMDelegationToken();
|
DelegationToken yarnToken = response.getRMDelegationToken();
|
||||||
return new Token<RMDelegationTokenIdentifier>(yarnToken.getIdentifier().array(),
|
return ProtoUtils.convertFromProtoFormat(yarnToken, rmAddress);
|
||||||
yarnToken.getPassword().array(),
|
|
||||||
new Text(yarnToken.getKind()), new Text(yarnToken.getService()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -56,7 +56,6 @@ import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
|
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.mapreduce.v2.LogParams;
|
import org.apache.hadoop.mapreduce.v2.LogParams;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
|
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
|
|
||||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
|
||||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
||||||
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
||||||
|
@ -84,6 +83,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
import org.apache.hadoop.yarn.util.ProtoUtils;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -184,7 +184,7 @@ public class YARNRunner implements ClientProtocol {
|
||||||
return resMgrDelegate.getClusterMetrics();
|
return resMgrDelegate.getClusterMetrics();
|
||||||
}
|
}
|
||||||
|
|
||||||
private Token<MRDelegationTokenIdentifier> getDelegationTokenFromHS(
|
private Token<?> getDelegationTokenFromHS(
|
||||||
MRClientProtocol hsProxy, Text renewer) throws IOException,
|
MRClientProtocol hsProxy, Text renewer) throws IOException,
|
||||||
InterruptedException {
|
InterruptedException {
|
||||||
GetDelegationTokenRequest request = recordFactory
|
GetDelegationTokenRequest request = recordFactory
|
||||||
|
@ -192,10 +192,8 @@ public class YARNRunner implements ClientProtocol {
|
||||||
request.setRenewer(renewer.toString());
|
request.setRenewer(renewer.toString());
|
||||||
DelegationToken mrDelegationToken = hsProxy.getDelegationToken(request)
|
DelegationToken mrDelegationToken = hsProxy.getDelegationToken(request)
|
||||||
.getDelegationToken();
|
.getDelegationToken();
|
||||||
return new Token<MRDelegationTokenIdentifier>(mrDelegationToken
|
return ProtoUtils.convertFromProtoFormat(mrDelegationToken,
|
||||||
.getIdentifier().array(), mrDelegationToken.getPassword().array(),
|
hsProxy.getConnectAddress());
|
||||||
new Text(mrDelegationToken.getKind()), new Text(
|
|
||||||
mrDelegationToken.getService()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -368,6 +368,11 @@ public class TestClientRedirect {
|
||||||
this(AMHOSTADDRESS);
|
this(AMHOSTADDRESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public InetSocketAddress getConnectAddress() {
|
||||||
|
return bindAddress;
|
||||||
|
}
|
||||||
|
|
||||||
public AMService(String hostAddress) {
|
public AMService(String hostAddress) {
|
||||||
super("AMService");
|
super("AMService");
|
||||||
this.protocol = MRClientProtocol.class;
|
this.protocol = MRClientProtocol.class;
|
||||||
|
|
|
@ -27,6 +27,7 @@ import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
|
||||||
|
@ -242,7 +243,7 @@ public class TestClientServiceDelegate {
|
||||||
// should use the same proxy to AM2 and so instantiateProxy shouldn't be
|
// should use the same proxy to AM2 and so instantiateProxy shouldn't be
|
||||||
// called.
|
// called.
|
||||||
doReturn(firstGenAMProxy).doReturn(secondGenAMProxy).when(
|
doReturn(firstGenAMProxy).doReturn(secondGenAMProxy).when(
|
||||||
clientServiceDelegate).instantiateAMProxy(any(String.class));
|
clientServiceDelegate).instantiateAMProxy(any(InetSocketAddress.class));
|
||||||
|
|
||||||
JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
|
JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
|
||||||
Assert.assertNotNull(jobStatus);
|
Assert.assertNotNull(jobStatus);
|
||||||
|
@ -257,7 +258,7 @@ public class TestClientServiceDelegate {
|
||||||
Assert.assertEquals("jobName-secondGen", jobStatus.getJobName());
|
Assert.assertEquals("jobName-secondGen", jobStatus.getJobName());
|
||||||
|
|
||||||
verify(clientServiceDelegate, times(2)).instantiateAMProxy(
|
verify(clientServiceDelegate, times(2)).instantiateAMProxy(
|
||||||
any(String.class));
|
any(InetSocketAddress.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -286,19 +287,19 @@ public class TestClientServiceDelegate {
|
||||||
Assert.assertEquals("N/A", jobStatus.getJobName());
|
Assert.assertEquals("N/A", jobStatus.getJobName());
|
||||||
|
|
||||||
verify(clientServiceDelegate, times(0)).instantiateAMProxy(
|
verify(clientServiceDelegate, times(0)).instantiateAMProxy(
|
||||||
any(String.class));
|
any(InetSocketAddress.class));
|
||||||
|
|
||||||
// Should not reach AM even for second and third times too.
|
// Should not reach AM even for second and third times too.
|
||||||
jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
|
jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
|
||||||
Assert.assertNotNull(jobStatus);
|
Assert.assertNotNull(jobStatus);
|
||||||
Assert.assertEquals("N/A", jobStatus.getJobName());
|
Assert.assertEquals("N/A", jobStatus.getJobName());
|
||||||
verify(clientServiceDelegate, times(0)).instantiateAMProxy(
|
verify(clientServiceDelegate, times(0)).instantiateAMProxy(
|
||||||
any(String.class));
|
any(InetSocketAddress.class));
|
||||||
jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
|
jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
|
||||||
Assert.assertNotNull(jobStatus);
|
Assert.assertNotNull(jobStatus);
|
||||||
Assert.assertEquals("N/A", jobStatus.getJobName());
|
Assert.assertEquals("N/A", jobStatus.getJobName());
|
||||||
verify(clientServiceDelegate, times(0)).instantiateAMProxy(
|
verify(clientServiceDelegate, times(0)).instantiateAMProxy(
|
||||||
any(String.class));
|
any(InetSocketAddress.class));
|
||||||
|
|
||||||
// The third time around, app is completed, so should go to JHS
|
// The third time around, app is completed, so should go to JHS
|
||||||
JobStatus jobStatus1 = clientServiceDelegate.getJobStatus(oldJobId);
|
JobStatus jobStatus1 = clientServiceDelegate.getJobStatus(oldJobId);
|
||||||
|
@ -309,7 +310,7 @@ public class TestClientServiceDelegate {
|
||||||
Assert.assertEquals(1.0f, jobStatus1.getReduceProgress());
|
Assert.assertEquals(1.0f, jobStatus1.getReduceProgress());
|
||||||
|
|
||||||
verify(clientServiceDelegate, times(0)).instantiateAMProxy(
|
verify(clientServiceDelegate, times(0)).instantiateAMProxy(
|
||||||
any(String.class));
|
any(InetSocketAddress.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -26,11 +26,9 @@ import junit.framework.Assert;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.io.Text;
|
|
||||||
import org.apache.hadoop.mapred.JobConf;
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
|
import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
|
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
|
|
||||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
|
||||||
import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer;
|
import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer;
|
||||||
|
@ -38,11 +36,11 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||||
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||||
import org.apache.hadoop.security.token.Token;
|
|
||||||
import org.apache.hadoop.yarn.api.records.DelegationToken;
|
import org.apache.hadoop.yarn.api.records.DelegationToken;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
|
import org.apache.hadoop.yarn.util.ProtoUtils;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
|
@ -95,9 +93,8 @@ public class TestJHSSecurity {
|
||||||
// Now try talking to JHS using the delegation token
|
// Now try talking to JHS using the delegation token
|
||||||
UserGroupInformation ugi =
|
UserGroupInformation ugi =
|
||||||
UserGroupInformation.createRemoteUser("TheDarkLord");
|
UserGroupInformation.createRemoteUser("TheDarkLord");
|
||||||
ugi.addToken(new Token<MRDelegationTokenIdentifier>(token.getIdentifier()
|
ugi.addToken(ProtoUtils.convertFromProtoFormat(
|
||||||
.array(), token.getPassword().array(), new Text(token.getKind()),
|
token, jobHistoryServer.getClientService().getBindAddress()));
|
||||||
new Text(token.getService())));
|
|
||||||
final YarnRPC rpc = YarnRPC.create(conf);
|
final YarnRPC rpc = YarnRPC.create(conf);
|
||||||
MRClientProtocol userUsingDT =
|
MRClientProtocol userUsingDT =
|
||||||
ugi.doAs(new PrivilegedAction<MRClientProtocol>() {
|
ugi.doAs(new PrivilegedAction<MRClientProtocol>() {
|
||||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.SaslInputStream;
|
import org.apache.hadoop.security.SaslInputStream;
|
||||||
import org.apache.hadoop.security.SaslRpcClient;
|
import org.apache.hadoop.security.SaslRpcClient;
|
||||||
import org.apache.hadoop.security.SaslRpcServer;
|
import org.apache.hadoop.security.SaslRpcServer;
|
||||||
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
|
@ -98,10 +99,8 @@ public class TestUmbilicalProtocolWithJobToken {
|
||||||
JobTokenIdentifier tokenId = new JobTokenIdentifier(new Text(jobId));
|
JobTokenIdentifier tokenId = new JobTokenIdentifier(new Text(jobId));
|
||||||
Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(tokenId, sm);
|
Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(tokenId, sm);
|
||||||
sm.addTokenForJob(jobId, token);
|
sm.addTokenForJob(jobId, token);
|
||||||
Text host = new Text(addr.getAddress().getHostAddress() + ":"
|
SecurityUtil.setTokenService(token, addr);
|
||||||
+ addr.getPort());
|
LOG.info("Service address for token is " + token.getService());
|
||||||
token.setService(host);
|
|
||||||
LOG.info("Service IP address for token is " + host);
|
|
||||||
current.addToken(token);
|
current.addToken(token);
|
||||||
current.doAs(new PrivilegedExceptionAction<Object>() {
|
current.doAs(new PrivilegedExceptionAction<Object>() {
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -43,7 +43,7 @@ import org.apache.hadoop.yarn.api.ContainerManager;
|
||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Stable
|
@Stable
|
||||||
public interface ContainerToken {
|
public interface ContainerToken extends DelegationToken {
|
||||||
/**
|
/**
|
||||||
* Get the token identifier.
|
* Get the token identifier.
|
||||||
* @return token identifier
|
* @return token identifier
|
||||||
|
|
|
@ -18,11 +18,17 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.util;
|
package org.apache.hadoop.yarn.util;
|
||||||
|
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
|
import org.apache.hadoop.yarn.api.records.DelegationToken;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||||
|
@ -192,4 +198,23 @@ public class ProtoUtils {
|
||||||
return ApplicationAccessType.valueOf(e.name().replace(
|
return ApplicationAccessType.valueOf(e.name().replace(
|
||||||
APP_ACCESS_TYPE_PREFIX, ""));
|
APP_ACCESS_TYPE_PREFIX, ""));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert a protobuf token into a rpc token and set its service
|
||||||
|
*
|
||||||
|
* @param protoToken the yarn token
|
||||||
|
* @param serviceAddr the connect address for the service
|
||||||
|
* @return rpc token
|
||||||
|
*/
|
||||||
|
public static <T extends TokenIdentifier> Token<T>
|
||||||
|
convertFromProtoFormat(DelegationToken protoToken, InetSocketAddress serviceAddr) {
|
||||||
|
Token<T> token = new Token<T>(protoToken.getIdentifier().array(),
|
||||||
|
protoToken.getPassword().array(),
|
||||||
|
new Text(protoToken.getKind()),
|
||||||
|
new Text(protoToken.getService()));
|
||||||
|
if (serviceAddr != null) {
|
||||||
|
SecurityUtil.setTokenService(token, serviceAddr);
|
||||||
|
}
|
||||||
|
return token;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
@ -275,10 +276,10 @@ public class BuilderUtils {
|
||||||
containerToken.setKind(ContainerTokenIdentifier.KIND.toString());
|
containerToken.setKind(ContainerTokenIdentifier.KIND.toString());
|
||||||
containerToken.setPassword(password);
|
containerToken.setPassword(password);
|
||||||
// RPC layer client expects ip:port as service for tokens
|
// RPC layer client expects ip:port as service for tokens
|
||||||
InetSocketAddress addr = NetUtils.createSocketAddr(nodeId.getHost(),
|
InetSocketAddress addr = NetUtils.createSocketAddrForHost(nodeId.getHost(),
|
||||||
nodeId.getPort());
|
nodeId.getPort());
|
||||||
containerToken.setService(addr.getAddress().getHostAddress() + ":"
|
// NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
|
||||||
+ addr.getPort());
|
containerToken.setService(SecurityUtil.buildTokenService(addr).toString());
|
||||||
return containerToken;
|
return containerToken;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -464,8 +464,7 @@ public class ClientRMService extends AbstractService implements
|
||||||
realRMDTtoken.getIdentifier(),
|
realRMDTtoken.getIdentifier(),
|
||||||
realRMDTtoken.getKind().toString(),
|
realRMDTtoken.getKind().toString(),
|
||||||
realRMDTtoken.getPassword(),
|
realRMDTtoken.getPassword(),
|
||||||
clientBindAddress.getAddress().getHostAddress() + ":"
|
realRMDTtoken.getService().toString()
|
||||||
+ clientBindAddress.getPort()
|
|
||||||
));
|
));
|
||||||
return response;
|
return response;
|
||||||
} catch(IOException io) {
|
} catch(IOException io) {
|
||||||
|
|
|
@ -32,9 +32,9 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.DataInputByteBuffer;
|
import org.apache.hadoop.io.DataInputByteBuffer;
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
import org.apache.hadoop.io.Text;
|
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
@ -46,7 +46,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
|
@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
|
||||||
|
import org.apache.hadoop.yarn.util.ProtoUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The launch of the AM itself.
|
* The launch of the AM itself.
|
||||||
|
@ -131,27 +132,25 @@ public class AMLauncher implements Runnable {
|
||||||
|
|
||||||
Container container = application.getMasterContainer();
|
Container container = application.getMasterContainer();
|
||||||
|
|
||||||
final String containerManagerBindAddress = container.getNodeId().toString();
|
final NodeId node = container.getNodeId();
|
||||||
|
final InetSocketAddress containerManagerBindAddress =
|
||||||
|
NetUtils.createSocketAddrForHost(node.getHost(), node.getPort());
|
||||||
|
|
||||||
final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again.
|
final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again.
|
||||||
|
|
||||||
UserGroupInformation currentUser = UserGroupInformation
|
UserGroupInformation currentUser = UserGroupInformation
|
||||||
.createRemoteUser(containerId.toString());
|
.createRemoteUser(containerId.toString());
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
ContainerToken containerToken = container.getContainerToken();
|
|
||||||
Token<ContainerTokenIdentifier> token =
|
Token<ContainerTokenIdentifier> token =
|
||||||
new Token<ContainerTokenIdentifier>(
|
ProtoUtils.convertFromProtoFormat(container.getContainerToken(),
|
||||||
containerToken.getIdentifier().array(),
|
containerManagerBindAddress);
|
||||||
containerToken.getPassword().array(), new Text(
|
|
||||||
containerToken.getKind()), new Text(
|
|
||||||
containerToken.getService()));
|
|
||||||
currentUser.addToken(token);
|
currentUser.addToken(token);
|
||||||
}
|
}
|
||||||
return currentUser.doAs(new PrivilegedAction<ContainerManager>() {
|
return currentUser.doAs(new PrivilegedAction<ContainerManager>() {
|
||||||
@Override
|
@Override
|
||||||
public ContainerManager run() {
|
public ContainerManager run() {
|
||||||
return (ContainerManager) rpc.getProxy(ContainerManager.class,
|
return (ContainerManager) rpc.getProxy(ContainerManager.class,
|
||||||
NetUtils.createSocketAddr(containerManagerBindAddress), conf);
|
containerManagerBindAddress, conf);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -218,22 +217,21 @@ public class AMLauncher implements Runnable {
|
||||||
Token<ApplicationTokenIdentifier> token =
|
Token<ApplicationTokenIdentifier> token =
|
||||||
new Token<ApplicationTokenIdentifier>(id,
|
new Token<ApplicationTokenIdentifier>(id,
|
||||||
this.rmContext.getApplicationTokenSecretManager());
|
this.rmContext.getApplicationTokenSecretManager());
|
||||||
InetSocketAddress unresolvedAddr = conf.getSocketAddr(
|
InetSocketAddress serviceAddr = conf.getSocketAddr(
|
||||||
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
|
||||||
String resolvedAddr =
|
// normally the client should set the service after acquiring the token,
|
||||||
unresolvedAddr.getAddress().getHostAddress() + ":"
|
// but this token is directly provided to the tasks
|
||||||
+ unresolvedAddr.getPort();
|
SecurityUtil.setTokenService(token, serviceAddr);
|
||||||
token.setService(new Text(resolvedAddr));
|
|
||||||
String appMasterTokenEncoded = token.encodeToUrlString();
|
String appMasterTokenEncoded = token.encodeToUrlString();
|
||||||
LOG.debug("Putting appMaster token in env : " + appMasterTokenEncoded);
|
LOG.debug("Putting appMaster token in env : " + token);
|
||||||
environment.put(
|
environment.put(
|
||||||
ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME,
|
ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME,
|
||||||
appMasterTokenEncoded);
|
appMasterTokenEncoded);
|
||||||
|
|
||||||
// Add the RM token
|
// Add the RM token
|
||||||
credentials.addToken(new Text(resolvedAddr), token);
|
credentials.addToken(token.getService(), token);
|
||||||
DataOutputBuffer dob = new DataOutputBuffer();
|
DataOutputBuffer dob = new DataOutputBuffer();
|
||||||
credentials.writeTokenStorageToStream(dob);
|
credentials.writeTokenStorageToStream(dob);
|
||||||
container.setContainerTokens(
|
container.setContainerTokens(
|
||||||
|
@ -245,7 +243,6 @@ public class AMLauncher implements Runnable {
|
||||||
this.clientToAMSecretManager.getMasterKey(identifier);
|
this.clientToAMSecretManager.getMasterKey(identifier);
|
||||||
String encoded =
|
String encoded =
|
||||||
Base64.encodeBase64URLSafeString(clientSecretKey.getEncoded());
|
Base64.encodeBase64URLSafeString(clientSecretKey.getEncoded());
|
||||||
LOG.debug("The encoded client secret-key to be put in env : " + encoded);
|
|
||||||
environment.put(
|
environment.put(
|
||||||
ApplicationConstants.APPLICATION_CLIENT_SECRET_ENV_NAME,
|
ApplicationConstants.APPLICATION_CLIENT_SECRET_ENV_NAME,
|
||||||
encoded);
|
encoded);
|
||||||
|
|
|
@ -401,7 +401,6 @@ public class TestContainerManagerSecurity {
|
||||||
appTokenSecretManager);
|
appTokenSecretManager);
|
||||||
SecurityUtil.setTokenService(appToken, schedulerAddr);
|
SecurityUtil.setTokenService(appToken, schedulerAddr);
|
||||||
currentUser.addToken(appToken);
|
currentUser.addToken(appToken);
|
||||||
SecurityUtil.setTokenService(appToken, schedulerAddr);
|
|
||||||
|
|
||||||
AMRMProtocol scheduler = currentUser
|
AMRMProtocol scheduler = currentUser
|
||||||
.doAs(new PrivilegedAction<AMRMProtocol>() {
|
.doAs(new PrivilegedAction<AMRMProtocol>() {
|
||||||
|
|
Loading…
Reference in New Issue