merge YARN-135 from trunk. Client tokens should be per app-attempt, and should be unregistered on App-finish. Contributed by Vinod Kumar Vavilapalli
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1433571 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2a7658d519
commit
44b3be14bc
|
@ -125,8 +125,8 @@ public class MRClientService extends AbstractService
|
|||
.getenv(ApplicationConstants.APPLICATION_CLIENT_SECRET_ENV_NAME);
|
||||
byte[] bytes = Base64.decodeBase64(secretKeyStr);
|
||||
secretManager =
|
||||
new ClientToAMTokenSecretManager(this.appContext.getApplicationID(),
|
||||
bytes);
|
||||
new ClientToAMTokenSecretManager(
|
||||
this.appContext.getApplicationAttemptId(), bytes);
|
||||
}
|
||||
server =
|
||||
rpc.getServer(MRClientProtocol.class, protocolHandler, address,
|
||||
|
|
|
@ -25,7 +25,6 @@ import org.apache.hadoop.yarn.api.records.DelegationToken;
|
|||
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.DelegationTokenPBImpl;
|
||||
|
||||
|
||||
public class GetDelegationTokenResponsePBImpl extends
|
||||
ProtoBase<GetDelegationTokenResponseProto> implements GetDelegationTokenResponse {
|
||||
|
||||
|
@ -97,7 +96,6 @@ public class GetDelegationTokenResponsePBImpl extends
|
|||
}
|
||||
viaProto = false;
|
||||
}
|
||||
|
||||
|
||||
private DelegationTokenPBImpl convertFromProtoFormat(TokenProto p) {
|
||||
return new DelegationTokenPBImpl(p);
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.http.HttpConfig;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapreduce.JobID;
|
||||
import org.apache.hadoop.mapreduce.JobStatus;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
|
@ -69,14 +70,16 @@ import org.apache.hadoop.security.token.Token;
|
|||
import org.apache.hadoop.yarn.YarnException;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ClientToken;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ProtoUtils;
|
||||
|
||||
public class ClientServiceDelegate {
|
||||
private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
|
||||
|
@ -176,13 +179,10 @@ public class ClientServiceDelegate {
|
|||
serviceAddr = NetUtils.createSocketAddrForHost(
|
||||
application.getHost(), application.getRpcPort());
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
String clientTokenEncoded = application.getClientToken();
|
||||
Token<ApplicationTokenIdentifier> clientToken =
|
||||
new Token<ApplicationTokenIdentifier>();
|
||||
clientToken.decodeFromUrlString(clientTokenEncoded);
|
||||
// RPC layer client expects ip:port as service for tokens
|
||||
SecurityUtil.setTokenService(clientToken, serviceAddr);
|
||||
newUgi.addToken(clientToken);
|
||||
ClientToken clientToken = application.getClientToken();
|
||||
Token<ClientTokenIdentifier> token =
|
||||
ProtoUtils.convertFromProtoFormat(clientToken, serviceAddr);
|
||||
newUgi.addToken(token);
|
||||
}
|
||||
LOG.debug("Connecting to " + serviceAddr);
|
||||
final InetSocketAddress finalServiceAddr = serviceAddr;
|
||||
|
|
|
@ -62,8 +62,8 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
|
|||
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
|
@ -86,10 +86,9 @@ public class NotRunningJob implements MRClientProtocol {
|
|||
.newRecordInstance(ApplicationAttemptId.class);
|
||||
|
||||
// Setting AppState to NEW and finalStatus to UNDEFINED as they are never
|
||||
// used
|
||||
// for a non running job
|
||||
// used for a non running job
|
||||
return BuilderUtils.newApplicationReport(unknownAppId, unknownAttemptId,
|
||||
"N/A", "N/A", "N/A", "N/A", 0, "", YarnApplicationState.NEW, "N/A",
|
||||
"N/A", "N/A", "N/A", "N/A", 0, null, YarnApplicationState.NEW, "N/A",
|
||||
"N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A");
|
||||
}
|
||||
|
||||
|
|
|
@ -179,6 +179,9 @@ Release 2.0.3-alpha - Unreleased
|
|||
YARN-336. Fair scheduler FIFO scheduling within a queue only allows 1
|
||||
app at a time. (Sandy Ryza via tomwhite)
|
||||
|
||||
YARN-135. Client tokens should be per app-attempt, and should be
|
||||
unregistered on App-finish. (vinodkv via sseth)
|
||||
|
||||
Release 2.0.2-alpha - 2012-09-07
|
||||
|
||||
YARN-9. Rename YARN_HOME to HADOOP_YARN_HOME. (vinodkv via acmurthy)
|
||||
|
|
|
@ -45,8 +45,8 @@ public interface ApplicationMaster {
|
|||
YarnApplicationState getState();
|
||||
void setState(YarnApplicationState state);
|
||||
|
||||
String getClientToken();
|
||||
void setClientToken(String clientToken);
|
||||
ClientToken getClientToken();
|
||||
void setClientToken(ClientToken clientToken);
|
||||
|
||||
int getAMFailCount();
|
||||
void setAMFailCount(int amFailCount);
|
||||
|
|
|
@ -144,11 +144,11 @@ public interface ApplicationReport {
|
|||
*/
|
||||
@Public
|
||||
@Stable
|
||||
String getClientToken();
|
||||
ClientToken getClientToken();
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void setClientToken(String clientToken);
|
||||
void setClientToken(ClientToken clientToken);
|
||||
|
||||
/**
|
||||
* Get the <code>YarnApplicationState</code> of the application.
|
||||
|
|
|
@ -18,9 +18,6 @@
|
|||
|
||||
package org.apache.hadoop.yarn.api.records;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
import org.apache.hadoop.yarn.api.AMRMProtocol;
|
||||
|
@ -43,53 +40,4 @@ import org.apache.hadoop.yarn.api.ContainerManager;
|
|||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public interface ContainerToken extends DelegationToken {
|
||||
/**
|
||||
* Get the token identifier.
|
||||
* @return token identifier
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
ByteBuffer getIdentifier();
|
||||
|
||||
@Private
|
||||
@Stable
|
||||
void setIdentifier(ByteBuffer identifier);
|
||||
|
||||
/**
|
||||
* Get the token password
|
||||
* @return token password
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
ByteBuffer getPassword();
|
||||
|
||||
@Private
|
||||
@Stable
|
||||
void setPassword(ByteBuffer password);
|
||||
|
||||
/**
|
||||
* Get the token kind.
|
||||
* @return token kind
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
String getKind();
|
||||
|
||||
@Private
|
||||
@Stable
|
||||
void setKind(String kind);
|
||||
|
||||
/**
|
||||
* Get the service to which the token is allocated.
|
||||
* @return service to which the token is allocated
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
String getService();
|
||||
|
||||
@Private
|
||||
@Stable
|
||||
void setService(String service);
|
||||
|
||||
}
|
||||
public interface ContainerToken extends Token {}
|
||||
|
|
|
@ -18,12 +18,8 @@
|
|||
|
||||
package org.apache.hadoop.yarn.api.records;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
|
||||
|
||||
/**
|
||||
|
@ -33,52 +29,4 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdenti
|
|||
*/
|
||||
@Public
|
||||
@Evolving
|
||||
public interface DelegationToken {
|
||||
/**
|
||||
* Get the token identifier.
|
||||
* @return token identifier
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
ByteBuffer getIdentifier();
|
||||
|
||||
@Private
|
||||
@Stable
|
||||
void setIdentifier(ByteBuffer identifier);
|
||||
|
||||
/**
|
||||
* Get the token password
|
||||
* @return token password
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
ByteBuffer getPassword();
|
||||
|
||||
@Private
|
||||
@Stable
|
||||
void setPassword(ByteBuffer password);
|
||||
|
||||
/**
|
||||
* Get the token kind.
|
||||
* @return token kind
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
String getKind();
|
||||
|
||||
@Private
|
||||
@Stable
|
||||
void setKind(String kind);
|
||||
|
||||
/**
|
||||
* Get the service to which the token is allocated.
|
||||
* @return service to which the token is allocated
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
String getService();
|
||||
|
||||
@Private
|
||||
@Stable
|
||||
void setService(String service);
|
||||
}
|
||||
public interface DelegationToken extends Token {}
|
||||
|
|
|
@ -18,10 +18,11 @@
|
|||
|
||||
package org.apache.hadoop.yarn.api.records.impl.pb;
|
||||
|
||||
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationMaster;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ClientToken;
|
||||
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
||||
|
@ -31,15 +32,15 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationStatusProto;
|
|||
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto;
|
||||
import org.apache.hadoop.yarn.util.ProtoUtils;
|
||||
|
||||
|
||||
public class ApplicationMasterPBImpl extends ProtoBase<ApplicationMasterProto> implements ApplicationMaster {
|
||||
public class ApplicationMasterPBImpl extends ProtoBase<ApplicationMasterProto>
|
||||
implements ApplicationMaster {
|
||||
ApplicationMasterProto proto = ApplicationMasterProto.getDefaultInstance();
|
||||
ApplicationMasterProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
||||
private ApplicationId applicationId = null;
|
||||
private ApplicationStatus applicationStatus = null;
|
||||
|
||||
private ClientToken clientToken = null;
|
||||
|
||||
public ApplicationMasterPBImpl() {
|
||||
builder = ApplicationMasterProto.newBuilder();
|
||||
|
@ -59,13 +60,22 @@ public class ApplicationMasterPBImpl extends ProtoBase<ApplicationMasterProto> i
|
|||
}
|
||||
|
||||
private void mergeLocalToBuilder() {
|
||||
if (this.applicationId != null && !((ApplicationIdPBImpl)this.applicationId).getProto().equals(builder.getApplicationId())) {
|
||||
if (this.applicationId != null
|
||||
&& !((ApplicationIdPBImpl) this.applicationId).getProto().equals(
|
||||
builder.getApplicationId())) {
|
||||
builder.setApplicationId(convertToProtoFormat(this.applicationId));
|
||||
}
|
||||
|
||||
if (this.applicationStatus != null && !((ApplicationStatusPBImpl)this.applicationStatus).getProto().equals(builder.getStatus())) {
|
||||
if (this.applicationStatus != null
|
||||
&& !((ApplicationStatusPBImpl) this.applicationStatus).getProto()
|
||||
.equals(builder.getStatus())) {
|
||||
builder.setStatus(convertToProtoFormat(this.applicationStatus));
|
||||
}
|
||||
if (this.clientToken != null
|
||||
&& !((ClientTokenPBImpl) this.clientToken).getProto().equals(
|
||||
builder.getClientToken())) {
|
||||
builder.setClientToken(convertToProtoFormat(this.clientToken));
|
||||
}
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
|
@ -188,23 +198,26 @@ public class ApplicationMasterPBImpl extends ProtoBase<ApplicationMasterProto> i
|
|||
this.applicationStatus = status;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getClientToken() {
|
||||
public ClientToken getClientToken() {
|
||||
ApplicationMasterProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.clientToken != null) {
|
||||
return this.clientToken;
|
||||
}
|
||||
if (!p.hasClientToken()) {
|
||||
return null;
|
||||
}
|
||||
return (p.getClientToken());
|
||||
this.clientToken = convertFromProtoFormat(p.getClientToken());
|
||||
return this.clientToken;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void setClientToken(String clientToken) {
|
||||
public void setClientToken(ClientToken clientToken) {
|
||||
maybeInitBuilder();
|
||||
if (clientToken == null) {
|
||||
if (clientToken == null)
|
||||
builder.clearClientToken();
|
||||
return;
|
||||
}
|
||||
builder.setClientToken((clientToken));
|
||||
this.clientToken = clientToken;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -271,4 +284,11 @@ public class ApplicationMasterPBImpl extends ProtoBase<ApplicationMasterProto> i
|
|||
return ((ApplicationStatusPBImpl)t).getProto();
|
||||
}
|
||||
|
||||
private ClientTokenPBImpl convertFromProtoFormat(TokenProto p) {
|
||||
return new ClientTokenPBImpl(p);
|
||||
}
|
||||
|
||||
private TokenProto convertToProtoFormat(ClientToken t) {
|
||||
return ((ClientTokenPBImpl)t).getProto();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,19 +18,21 @@
|
|||
|
||||
package org.apache.hadoop.yarn.api.records.impl.pb;
|
||||
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.ClientToken;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationReportProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationReportProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto;
|
||||
import org.apache.hadoop.yarn.util.ProtoUtils;
|
||||
|
||||
|
@ -40,8 +42,9 @@ implements ApplicationReport {
|
|||
ApplicationReportProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
||||
ApplicationId applicationId;
|
||||
ApplicationAttemptId currentApplicationAttemptId;
|
||||
private ApplicationId applicationId;
|
||||
private ApplicationAttemptId currentApplicationAttemptId;
|
||||
private ClientToken clientToken = null;
|
||||
|
||||
public ApplicationReportPBImpl() {
|
||||
builder = ApplicationReportProto.newBuilder();
|
||||
|
@ -159,12 +162,16 @@ implements ApplicationReport {
|
|||
}
|
||||
|
||||
@Override
|
||||
public String getClientToken() {
|
||||
public ClientToken getClientToken() {
|
||||
ApplicationReportProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.clientToken != null) {
|
||||
return this.clientToken;
|
||||
}
|
||||
if (!p.hasClientToken()) {
|
||||
return null;
|
||||
}
|
||||
return (p.getClientToken());
|
||||
this.clientToken = convertFromProtoFormat(p.getClientToken());
|
||||
return this.clientToken;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -176,7 +183,6 @@ implements ApplicationReport {
|
|||
return p.getUser();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String getDiagnostics() {
|
||||
ApplicationReportProtoOrBuilder p = viaProto ? proto : builder;
|
||||
|
@ -290,13 +296,11 @@ implements ApplicationReport {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void setClientToken(String clientToken) {
|
||||
public void setClientToken(ClientToken clientToken) {
|
||||
maybeInitBuilder();
|
||||
if (clientToken == null) {
|
||||
if (clientToken == null)
|
||||
builder.clearClientToken();
|
||||
return;
|
||||
}
|
||||
builder.setClientToken((clientToken));
|
||||
this.clientToken = clientToken;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -360,6 +364,11 @@ implements ApplicationReport {
|
|||
builder.getCurrentApplicationAttemptId())) {
|
||||
builder.setCurrentApplicationAttemptId(convertToProtoFormat(this.currentApplicationAttemptId));
|
||||
}
|
||||
if (this.clientToken != null
|
||||
&& !((ClientTokenPBImpl) this.clientToken).getProto().equals(
|
||||
builder.getClientToken())) {
|
||||
builder.setClientToken(convertToProtoFormat(this.clientToken));
|
||||
}
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
|
@ -419,4 +428,11 @@ implements ApplicationReport {
|
|||
return ProtoUtils.convertToProtoFormat(s);
|
||||
}
|
||||
|
||||
private ClientTokenPBImpl convertFromProtoFormat(TokenProto p) {
|
||||
return new ClientTokenPBImpl(p);
|
||||
}
|
||||
|
||||
private TokenProto convertToProtoFormat(ClientToken t) {
|
||||
return ((ClientTokenPBImpl)t).getProto();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
|
||||
package org.apache.hadoop.yarn.api.records.impl.pb;
|
||||
|
||||
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
|
@ -38,8 +37,6 @@ import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
|
|||
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
||||
import org.apache.hadoop.yarn.util.ProtoUtils;
|
||||
|
||||
|
||||
|
||||
public class ContainerPBImpl extends ProtoBase<ContainerProto> implements Container {
|
||||
|
||||
|
|
|
@ -18,149 +18,16 @@
|
|||
|
||||
package org.apache.hadoop.yarn.api.records.impl.pb;
|
||||
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.TokenProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
|
||||
public class ContainerTokenPBImpl extends TokenPBImpl implements ContainerToken {
|
||||
|
||||
|
||||
public class ContainerTokenPBImpl extends ProtoBase<TokenProto> implements ContainerToken {
|
||||
private TokenProto proto = TokenProto.getDefaultInstance();
|
||||
private TokenProto.Builder builder = null;
|
||||
private boolean viaProto = false;
|
||||
|
||||
private ByteBuffer identifier;
|
||||
private ByteBuffer password;
|
||||
|
||||
|
||||
public ContainerTokenPBImpl() {
|
||||
builder = TokenProto.newBuilder();
|
||||
super();
|
||||
}
|
||||
|
||||
public ContainerTokenPBImpl(TokenProto proto) {
|
||||
this.proto = proto;
|
||||
viaProto = true;
|
||||
public ContainerTokenPBImpl(TokenProto p) {
|
||||
super(p);
|
||||
}
|
||||
|
||||
public synchronized TokenProto getProto() {
|
||||
mergeLocalToProto();
|
||||
proto = viaProto ? proto : builder.build();
|
||||
viaProto = true;
|
||||
return proto;
|
||||
}
|
||||
|
||||
private synchronized void mergeLocalToBuilder() {
|
||||
if (this.identifier != null) {
|
||||
builder.setIdentifier(convertToProtoFormat(this.identifier));
|
||||
}
|
||||
if (this.password != null) {
|
||||
builder.setPassword(convertToProtoFormat(this.password));
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void mergeLocalToProto() {
|
||||
if (viaProto)
|
||||
maybeInitBuilder();
|
||||
mergeLocalToBuilder();
|
||||
proto = builder.build();
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
private synchronized void maybeInitBuilder() {
|
||||
if (viaProto || builder == null) {
|
||||
builder = TokenProto.newBuilder(proto);
|
||||
}
|
||||
viaProto = false;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public synchronized ByteBuffer getIdentifier() {
|
||||
TokenProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.identifier != null) {
|
||||
return this.identifier;
|
||||
}
|
||||
if (!p.hasIdentifier()) {
|
||||
return null;
|
||||
}
|
||||
this.identifier = convertFromProtoFormat(p.getIdentifier());
|
||||
return this.identifier;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setIdentifier(ByteBuffer identifier) {
|
||||
maybeInitBuilder();
|
||||
if (identifier == null)
|
||||
builder.clearIdentifier();
|
||||
this.identifier = identifier;
|
||||
}
|
||||
@Override
|
||||
public synchronized ByteBuffer getPassword() {
|
||||
TokenProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.password != null) {
|
||||
return this.password;
|
||||
}
|
||||
if (!p.hasPassword()) {
|
||||
return null;
|
||||
}
|
||||
this.password = convertFromProtoFormat(p.getPassword());
|
||||
return this.password;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setPassword(ByteBuffer password) {
|
||||
maybeInitBuilder();
|
||||
if (password == null)
|
||||
builder.clearPassword();
|
||||
this.password = password;
|
||||
}
|
||||
@Override
|
||||
public synchronized String getKind() {
|
||||
TokenProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (!p.hasKind()) {
|
||||
return null;
|
||||
}
|
||||
return (p.getKind());
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setKind(String kind) {
|
||||
maybeInitBuilder();
|
||||
if (kind == null) {
|
||||
builder.clearKind();
|
||||
return;
|
||||
}
|
||||
builder.setKind((kind));
|
||||
}
|
||||
@Override
|
||||
public synchronized String getService() {
|
||||
TokenProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (!p.hasService()) {
|
||||
return null;
|
||||
}
|
||||
return (p.getService());
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setService(String service) {
|
||||
maybeInitBuilder();
|
||||
if (service == null) {
|
||||
builder.clearService();
|
||||
return;
|
||||
}
|
||||
builder.setService((service));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("ContainerToken { ");
|
||||
sb.append("kind: ").append(getKind()).append(", ");
|
||||
sb.append("service: ").append(getService()).append(" }");
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -18,139 +18,17 @@
|
|||
|
||||
package org.apache.hadoop.yarn.api.records.impl.pb;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.TokenProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.api.records.DelegationToken;
|
||||
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
||||
|
||||
public class DelegationTokenPBImpl extends ProtoBase<TokenProto>
|
||||
implements DelegationToken {
|
||||
private TokenProto proto = TokenProto.getDefaultInstance();
|
||||
private TokenProto.Builder builder = null;
|
||||
private boolean viaProto = false;
|
||||
|
||||
private ByteBuffer identifier;
|
||||
private ByteBuffer password;
|
||||
|
||||
|
||||
public class DelegationTokenPBImpl extends TokenPBImpl implements
|
||||
DelegationToken {
|
||||
|
||||
public DelegationTokenPBImpl() {
|
||||
builder = TokenProto.newBuilder();
|
||||
super();
|
||||
}
|
||||
|
||||
public DelegationTokenPBImpl(TokenProto proto) {
|
||||
this.proto = proto;
|
||||
viaProto = true;
|
||||
public DelegationTokenPBImpl(TokenProto p) {
|
||||
super(p);
|
||||
}
|
||||
|
||||
public synchronized TokenProto getProto() {
|
||||
mergeLocalToProto();
|
||||
proto = viaProto ? proto : builder.build();
|
||||
viaProto = true;
|
||||
return proto;
|
||||
}
|
||||
|
||||
private synchronized void mergeLocalToBuilder() {
|
||||
if (this.identifier != null) {
|
||||
builder.setIdentifier(convertToProtoFormat(this.identifier));
|
||||
}
|
||||
if (this.password != null) {
|
||||
builder.setPassword(convertToProtoFormat(this.password));
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void mergeLocalToProto() {
|
||||
if (viaProto)
|
||||
maybeInitBuilder();
|
||||
mergeLocalToBuilder();
|
||||
proto = builder.build();
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
private synchronized void maybeInitBuilder() {
|
||||
if (viaProto || builder == null) {
|
||||
builder = TokenProto.newBuilder(proto);
|
||||
}
|
||||
viaProto = false;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public synchronized ByteBuffer getIdentifier() {
|
||||
TokenProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.identifier != null) {
|
||||
return this.identifier;
|
||||
}
|
||||
if (!p.hasIdentifier()) {
|
||||
return null;
|
||||
}
|
||||
this.identifier = convertFromProtoFormat(p.getIdentifier());
|
||||
return this.identifier;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setIdentifier(ByteBuffer identifier) {
|
||||
maybeInitBuilder();
|
||||
if (identifier == null)
|
||||
builder.clearIdentifier();
|
||||
this.identifier = identifier;
|
||||
}
|
||||
@Override
|
||||
public synchronized ByteBuffer getPassword() {
|
||||
TokenProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.password != null) {
|
||||
return this.password;
|
||||
}
|
||||
if (!p.hasPassword()) {
|
||||
return null;
|
||||
}
|
||||
this.password = convertFromProtoFormat(p.getPassword());
|
||||
return this.password;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setPassword(ByteBuffer password) {
|
||||
maybeInitBuilder();
|
||||
if (password == null)
|
||||
builder.clearPassword();
|
||||
this.password = password;
|
||||
}
|
||||
@Override
|
||||
public synchronized String getKind() {
|
||||
TokenProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (!p.hasKind()) {
|
||||
return null;
|
||||
}
|
||||
return (p.getKind());
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setKind(String kind) {
|
||||
maybeInitBuilder();
|
||||
if (kind == null) {
|
||||
builder.clearKind();
|
||||
return;
|
||||
}
|
||||
builder.setKind((kind));
|
||||
}
|
||||
@Override
|
||||
public synchronized String getService() {
|
||||
TokenProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (!p.hasService()) {
|
||||
return null;
|
||||
}
|
||||
return (p.getService());
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setService(String service) {
|
||||
maybeInitBuilder();
|
||||
if (service == null) {
|
||||
builder.clearService();
|
||||
return;
|
||||
}
|
||||
builder.setService((service));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -28,7 +28,6 @@ import org.apache.hadoop.security.token.TokenIdentifier;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.DelegationToken;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
|
@ -37,16 +36,16 @@ import org.apache.hadoop.yarn.api.records.NodeState;
|
|||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.QueueState;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationResourceUsageReportPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAccessTypeProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeStateProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceTypeProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceVisibilityProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeStateProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.QueueACLProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.QueueStateProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto;
|
||||
|
@ -206,8 +205,9 @@ public class ProtoUtils {
|
|||
* @param serviceAddr the connect address for the service
|
||||
* @return rpc token
|
||||
*/
|
||||
public static <T extends TokenIdentifier> Token<T>
|
||||
convertFromProtoFormat(DelegationToken protoToken, InetSocketAddress serviceAddr) {
|
||||
public static <T extends TokenIdentifier> Token<T> convertFromProtoFormat(
|
||||
org.apache.hadoop.yarn.api.records.Token protoToken,
|
||||
InetSocketAddress serviceAddr) {
|
||||
Token<T> token = new Token<T>(protoToken.getIdentifier().array(),
|
||||
protoToken.getPassword().array(),
|
||||
new Text(protoToken.getKind()),
|
||||
|
|
|
@ -102,7 +102,7 @@ message ApplicationMasterProto {
|
|||
optional string trackingUrl = 4;
|
||||
optional ApplicationStatusProto status = 5;
|
||||
optional YarnApplicationStateProto state = 6;
|
||||
optional string client_token = 7;
|
||||
optional hadoop.common.TokenProto client_token = 7;
|
||||
optional int32 containerCount = 8;
|
||||
optional int32 amFailCount = 9;
|
||||
optional string diagnostics = 10 [default = ""];
|
||||
|
@ -151,7 +151,7 @@ message ApplicationReportProto {
|
|||
optional string name = 4;
|
||||
optional string host = 5;
|
||||
optional int32 rpc_port = 6;
|
||||
optional string client_token = 7;
|
||||
optional hadoop.common.TokenProto client_token = 7;
|
||||
optional ApplicationStatusProto status = 8;
|
||||
optional YarnApplicationStateProto yarn_application_state = 9;
|
||||
optional ContainerProto masterContainer = 10;
|
||||
|
|
|
@ -21,24 +21,25 @@ package org.apache.hadoop.yarn.security.client;
|
|||
import javax.crypto.SecretKey;
|
||||
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
|
||||
public abstract class BaseClientToAMTokenSecretManager extends
|
||||
SecretManager<ClientTokenIdentifier> {
|
||||
|
||||
public abstract SecretKey getMasterKey(ApplicationId applicationId);
|
||||
public abstract SecretKey getMasterKey(
|
||||
ApplicationAttemptId applicationAttemptId);
|
||||
|
||||
@Override
|
||||
public synchronized byte[] createPassword(
|
||||
ClientTokenIdentifier identifier) {
|
||||
return createPassword(identifier.getBytes(),
|
||||
getMasterKey(identifier.getApplicationID()));
|
||||
getMasterKey(identifier.getApplicationAttemptID()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] retrievePassword(ClientTokenIdentifier identifier)
|
||||
throws SecretManager.InvalidToken {
|
||||
SecretKey masterKey = getMasterKey(identifier.getApplicationID());
|
||||
SecretKey masterKey = getMasterKey(identifier.getApplicationAttemptID());
|
||||
if (masterKey == null) {
|
||||
throw new SecretManager.InvalidToken("Illegal client-token!");
|
||||
}
|
||||
|
|
|
@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.security.client;
|
|||
import javax.crypto.SecretKey;
|
||||
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
|
||||
public class ClientToAMTokenSecretManager extends
|
||||
BaseClientToAMTokenSecretManager {
|
||||
|
@ -29,14 +29,14 @@ public class ClientToAMTokenSecretManager extends
|
|||
// Only one client-token and one master-key for AM
|
||||
private final SecretKey masterKey;
|
||||
|
||||
public ClientToAMTokenSecretManager(ApplicationId applicationID,
|
||||
byte[] secretKeyBytes) {
|
||||
public ClientToAMTokenSecretManager(
|
||||
ApplicationAttemptId applicationAttemptID, byte[] secretKeyBytes) {
|
||||
super();
|
||||
this.masterKey = SecretManager.createSecretKey(secretKeyBytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SecretKey getMasterKey(ApplicationId applicationID) {
|
||||
public SecretKey getMasterKey(ApplicationAttemptId applicationAttemptID) {
|
||||
// Only one client-token and one master-key for AM, just return that.
|
||||
return this.masterKey;
|
||||
}
|
||||
|
|
|
@ -27,14 +27,14 @@ import org.apache.hadoop.io.Text;
|
|||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
|
||||
public class ClientTokenIdentifier extends TokenIdentifier {
|
||||
|
||||
public static final Text KIND_NAME = new Text("YARN_CLIENT_TOKEN");
|
||||
|
||||
private ApplicationId applicationId;
|
||||
private ApplicationAttemptId applicationAttemptId;
|
||||
|
||||
// TODO: Add more information in the tokenID such that it is not
|
||||
// transferrable, more secure etc.
|
||||
|
@ -42,25 +42,29 @@ public class ClientTokenIdentifier extends TokenIdentifier {
|
|||
public ClientTokenIdentifier() {
|
||||
}
|
||||
|
||||
public ClientTokenIdentifier(ApplicationId id) {
|
||||
public ClientTokenIdentifier(ApplicationAttemptId id) {
|
||||
this();
|
||||
this.applicationId = id;
|
||||
this.applicationAttemptId = id;
|
||||
}
|
||||
|
||||
public ApplicationId getApplicationID() {
|
||||
return this.applicationId;
|
||||
public ApplicationAttemptId getApplicationAttemptID() {
|
||||
return this.applicationAttemptId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
out.writeLong(this.applicationId.getClusterTimestamp());
|
||||
out.writeInt(this.applicationId.getId());
|
||||
out.writeLong(this.applicationAttemptId.getApplicationId()
|
||||
.getClusterTimestamp());
|
||||
out.writeInt(this.applicationAttemptId.getApplicationId().getId());
|
||||
out.writeInt(this.applicationAttemptId.getAttemptId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
this.applicationId =
|
||||
BuilderUtils.newApplicationId(in.readLong(), in.readInt());
|
||||
this.applicationAttemptId =
|
||||
BuilderUtils.newApplicationAttemptId(
|
||||
BuilderUtils.newApplicationId(in.readLong(), in.readInt()),
|
||||
in.readInt());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -70,10 +74,10 @@ public class ClientTokenIdentifier extends TokenIdentifier {
|
|||
|
||||
@Override
|
||||
public UserGroupInformation getUser() {
|
||||
if (this.applicationId == null) {
|
||||
if (this.applicationAttemptId == null) {
|
||||
return null;
|
||||
}
|
||||
return UserGroupInformation.createRemoteUser(this.applicationId.toString());
|
||||
return UserGroupInformation.createRemoteUser(this.applicationAttemptId.toString());
|
||||
}
|
||||
|
||||
@InterfaceAudience.Private
|
||||
|
|
|
@ -25,10 +25,6 @@ import java.util.Comparator;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
|
@ -37,12 +33,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.ClientToken;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
import org.apache.hadoop.yarn.api.records.DelegationToken;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
|
@ -52,9 +50,9 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
|||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.DelegationToken;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.api.records.URL;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
|
@ -256,30 +254,36 @@ public class BuilderUtils {
|
|||
return container;
|
||||
}
|
||||
|
||||
public static DelegationToken newDelegationToken(
|
||||
byte[] identifier, String kind, byte[] password,
|
||||
String service) {
|
||||
DelegationToken delegationToken = recordFactory.newRecordInstance(
|
||||
DelegationToken.class);
|
||||
delegationToken.setIdentifier(ByteBuffer.wrap(identifier));
|
||||
delegationToken.setKind(kind);
|
||||
delegationToken.setPassword(ByteBuffer.wrap(password));
|
||||
delegationToken.setService(service);
|
||||
return delegationToken;
|
||||
public static <T extends Token> T newToken(Class<T> tokenClass,
|
||||
byte[] identifier, String kind, byte[] password, String service) {
|
||||
T token = recordFactory.newRecordInstance(tokenClass);
|
||||
token.setIdentifier(ByteBuffer.wrap(identifier));
|
||||
token.setKind(kind);
|
||||
token.setPassword(ByteBuffer.wrap(password));
|
||||
token.setService(service);
|
||||
return token;
|
||||
}
|
||||
|
||||
|
||||
public static DelegationToken newDelegationToken(byte[] identifier,
|
||||
String kind, byte[] password, String service) {
|
||||
return newToken(DelegationToken.class, identifier, kind, password, service);
|
||||
}
|
||||
|
||||
public static ClientToken newClientToken(byte[] identifier, String kind,
|
||||
byte[] password, String service) {
|
||||
return newToken(ClientToken.class, identifier, kind, password, service);
|
||||
}
|
||||
|
||||
public static ContainerToken newContainerToken(NodeId nodeId,
|
||||
ByteBuffer password, ContainerTokenIdentifier tokenIdentifier) {
|
||||
ContainerToken containerToken = recordFactory
|
||||
.newRecordInstance(ContainerToken.class);
|
||||
containerToken.setIdentifier(ByteBuffer.wrap(tokenIdentifier.getBytes()));
|
||||
containerToken.setKind(ContainerTokenIdentifier.KIND.toString());
|
||||
containerToken.setPassword(password);
|
||||
byte[] password, ContainerTokenIdentifier tokenIdentifier) {
|
||||
// RPC layer client expects ip:port as service for tokens
|
||||
InetSocketAddress addr = NetUtils.createSocketAddrForHost(nodeId.getHost(),
|
||||
nodeId.getPort());
|
||||
// NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
|
||||
containerToken.setService(SecurityUtil.buildTokenService(addr).toString());
|
||||
InetSocketAddress addr =
|
||||
NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort());
|
||||
// NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
|
||||
ContainerToken containerToken =
|
||||
newToken(ContainerToken.class, tokenIdentifier.getBytes(),
|
||||
ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil
|
||||
.buildTokenService(addr).toString());
|
||||
return containerToken;
|
||||
}
|
||||
|
||||
|
@ -333,7 +337,7 @@ public class BuilderUtils {
|
|||
public static ApplicationReport newApplicationReport(
|
||||
ApplicationId applicationId, ApplicationAttemptId applicationAttemptId,
|
||||
String user, String queue, String name, String host, int rpcPort,
|
||||
String clientToken, YarnApplicationState state, String diagnostics,
|
||||
ClientToken clientToken, YarnApplicationState state, String diagnostics,
|
||||
String url, long startTime, long finishTime,
|
||||
FinalApplicationStatus finalStatus,
|
||||
ApplicationResourceUsageReport appResources, String origTrackingUrl) {
|
||||
|
|
|
@ -199,7 +199,6 @@ public class BaseContainerTokenSecretManager extends
|
|||
this.readLock.unlock();
|
||||
}
|
||||
|
||||
return BuilderUtils.newContainerToken(nodeId, ByteBuffer.wrap(password),
|
||||
tokenIdentifier);
|
||||
return BuilderUtils.newContainerToken(nodeId, password, tokenIdentifier);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,19 +28,17 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.io.DataInputByteBuffer;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||
|
@ -236,21 +234,6 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
|||
RMApp application = null;
|
||||
try {
|
||||
|
||||
String clientTokenStr = null;
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
|
||||
// TODO: This needs to move to per-AppAttempt
|
||||
this.rmContext.getClientToAMTokenSecretManager().registerApplication(
|
||||
applicationId);
|
||||
|
||||
Token<ClientTokenIdentifier> clientToken = new
|
||||
Token<ClientTokenIdentifier>(
|
||||
new ClientTokenIdentifier(applicationId),
|
||||
this.rmContext.getClientToAMTokenSecretManager());
|
||||
clientTokenStr = clientToken.encodeToUrlString();
|
||||
LOG.debug("Sending client token as " + clientTokenStr);
|
||||
}
|
||||
|
||||
// Sanity checks
|
||||
if (submissionContext.getQueue() == null) {
|
||||
submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
|
||||
|
@ -265,8 +248,8 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
|||
new RMAppImpl(applicationId, rmContext, this.conf,
|
||||
submissionContext.getApplicationName(),
|
||||
submissionContext.getUser(), submissionContext.getQueue(),
|
||||
submissionContext, clientTokenStr, this.scheduler,
|
||||
this.masterService, submitTime);
|
||||
submissionContext, this.scheduler, this.masterService,
|
||||
submitTime);
|
||||
|
||||
// Sanity check - duplicate?
|
||||
if (rmContext.getRMApps().putIfAbsent(applicationId, application) !=
|
||||
|
|
|
@ -237,7 +237,7 @@ public class AMLauncher implements Runnable {
|
|||
|
||||
SecretKey clientSecretKey =
|
||||
this.rmContext.getClientToAMTokenSecretManager().getMasterKey(
|
||||
applicationId);
|
||||
application.getAppAttemptId());
|
||||
String encoded =
|
||||
Base64.encodeBase64URLSafeString(clientSecretKey.getEncoded());
|
||||
environment.put(
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ClientToken;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
|
@ -49,11 +50,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
||||
|
@ -82,7 +83,6 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
private final String queue;
|
||||
private final String name;
|
||||
private final ApplicationSubmissionContext submissionContext;
|
||||
private final String clientTokenStr;
|
||||
private final Dispatcher dispatcher;
|
||||
private final YarnScheduler scheduler;
|
||||
private final ApplicationMasterService masterService;
|
||||
|
@ -213,9 +213,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
|
||||
public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
|
||||
Configuration config, String name, String user, String queue,
|
||||
ApplicationSubmissionContext submissionContext, String clientTokenStr,
|
||||
YarnScheduler scheduler, ApplicationMasterService masterService,
|
||||
long submitTime) {
|
||||
ApplicationSubmissionContext submissionContext,
|
||||
YarnScheduler scheduler,
|
||||
ApplicationMasterService masterService, long submitTime) {
|
||||
|
||||
this.applicationId = applicationId;
|
||||
this.name = name;
|
||||
|
@ -226,7 +226,6 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
this.user = user;
|
||||
this.queue = queue;
|
||||
this.submissionContext = submissionContext;
|
||||
this.clientTokenStr = clientTokenStr;
|
||||
this.scheduler = scheduler;
|
||||
this.masterService = masterService;
|
||||
this.submitTime = submitTime;
|
||||
|
@ -402,7 +401,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
|
||||
try {
|
||||
ApplicationAttemptId currentApplicationAttemptId = null;
|
||||
String clientToken = UNAVAILABLE;
|
||||
ClientToken clientToken = null;
|
||||
String trackingUrl = UNAVAILABLE;
|
||||
String host = UNAVAILABLE;
|
||||
String origTrackingUrl = UNAVAILABLE;
|
||||
|
@ -541,9 +540,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
appAttemptId.setApplicationId(applicationId);
|
||||
appAttemptId.setAttemptId(attempts.size() + 1);
|
||||
|
||||
RMAppAttempt attempt = new RMAppAttemptImpl(appAttemptId,
|
||||
clientTokenStr, rmContext, scheduler, masterService,
|
||||
submissionContext, conf);
|
||||
RMAppAttempt attempt =
|
||||
new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
|
||||
submissionContext, conf);
|
||||
attempts.put(appAttemptId, attempt);
|
||||
currentAttempt = attempt;
|
||||
if(startAttempt) {
|
||||
|
|
|
@ -23,10 +23,11 @@ import java.util.Set;
|
|||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ClientToken;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
|
@ -91,7 +92,7 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
|
|||
* The token required by the clients to talk to the application attempt
|
||||
* @return the token required by the clients to talk to the application attempt
|
||||
*/
|
||||
String getClientToken();
|
||||
ClientToken getClientToken();
|
||||
|
||||
/**
|
||||
* Diagnostics information for the application attempt.
|
||||
|
|
|
@ -39,14 +39,17 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.http.HttpConfig;
|
||||
import org.apache.hadoop.util.ExitUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ClientToken;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
|
@ -55,6 +58,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
|
||||
|
@ -119,7 +123,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
private final WriteLock writeLock;
|
||||
|
||||
private final ApplicationAttemptId applicationAttemptId;
|
||||
private final String clientToken;
|
||||
private ClientToken clientToken;
|
||||
private final ApplicationSubmissionContext submissionContext;
|
||||
|
||||
//nodes on while this attempt's containers ran
|
||||
|
@ -347,11 +351,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
.installTopology();
|
||||
|
||||
public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
|
||||
String clientToken, RMContext rmContext, YarnScheduler scheduler,
|
||||
RMContext rmContext, YarnScheduler scheduler,
|
||||
ApplicationMasterService masterService,
|
||||
ApplicationSubmissionContext submissionContext,
|
||||
Configuration conf) {
|
||||
|
||||
this.conf = conf;
|
||||
this.applicationAttemptId = appAttemptId;
|
||||
this.rmContext = rmContext;
|
||||
|
@ -359,7 +362,19 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
this.submissionContext = submissionContext;
|
||||
this.scheduler = scheduler;
|
||||
this.masterService = masterService;
|
||||
this.clientToken = clientToken;
|
||||
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
|
||||
this.rmContext.getClientToAMTokenSecretManager().registerApplication(
|
||||
appAttemptId);
|
||||
|
||||
Token<ClientTokenIdentifier> token =
|
||||
new Token<ClientTokenIdentifier>(new ClientTokenIdentifier(
|
||||
appAttemptId), this.rmContext.getClientToAMTokenSecretManager());
|
||||
this.clientToken =
|
||||
BuilderUtils.newClientToken(token.getIdentifier(), token.getKind()
|
||||
.toString(), token.getPassword(), token.getService().toString());
|
||||
}
|
||||
|
||||
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
this.readLock = lock.readLock();
|
||||
|
@ -477,7 +492,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
}
|
||||
|
||||
@Override
|
||||
public String getClientToken() {
|
||||
public ClientToken getClientToken() {
|
||||
return this.clientToken;
|
||||
}
|
||||
|
||||
|
@ -963,6 +978,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
appAttempt.rmContext.getAMFinishingMonitor().unregister(
|
||||
appAttempt.getAppAttemptId());
|
||||
|
||||
|
||||
// Unregister from the ClientTokenSecretManager
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
appAttempt.rmContext.getClientToAMTokenSecretManager()
|
||||
.unRegisterApplication(appAttempt.getAppAttemptId());
|
||||
}
|
||||
|
||||
if(!appAttempt.submissionContext.getUnmanagedAM()) {
|
||||
// Tell the launcher to cleanup.
|
||||
appAttempt.eventHandler.handle(new AMLauncherEvent(
|
||||
|
|
|
@ -23,26 +23,29 @@ import java.util.Map;
|
|||
|
||||
import javax.crypto.SecretKey;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.security.client.BaseClientToAMTokenSecretManager;
|
||||
|
||||
public class ClientToAMTokenSecretManagerInRM extends
|
||||
BaseClientToAMTokenSecretManager {
|
||||
|
||||
// Per application master-keys for managing client-tokens
|
||||
private Map<ApplicationId, SecretKey> masterKeys =
|
||||
new HashMap<ApplicationId, SecretKey>();
|
||||
private Map<ApplicationAttemptId, SecretKey> masterKeys =
|
||||
new HashMap<ApplicationAttemptId, SecretKey>();
|
||||
|
||||
public synchronized void registerApplication(ApplicationId applicationID) {
|
||||
this.masterKeys.put(applicationID, generateSecret());
|
||||
public synchronized void registerApplication(
|
||||
ApplicationAttemptId applicationAttemptID) {
|
||||
this.masterKeys.put(applicationAttemptID, generateSecret());
|
||||
}
|
||||
|
||||
public synchronized void unRegisterApplication(ApplicationId applicationID) {
|
||||
this.masterKeys.remove(applicationID);
|
||||
public synchronized void unRegisterApplication(
|
||||
ApplicationAttemptId applicationAttemptID) {
|
||||
this.masterKeys.remove(applicationAttemptID);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized SecretKey getMasterKey(ApplicationId applicationID) {
|
||||
return this.masterKeys.get(applicationID);
|
||||
public synchronized SecretKey getMasterKey(
|
||||
ApplicationAttemptId applicationAttemptID) {
|
||||
return this.masterKeys.get(applicationAttemptID);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -338,7 +338,7 @@ public class TestApplicationACLs {
|
|||
Assert.assertEquals("Enemy should not see app rpc port!",
|
||||
-1, appReport.getRpcPort());
|
||||
Assert.assertEquals("Enemy should not see app client token!",
|
||||
UNAVAILABLE, appReport.getClientToken());
|
||||
null, appReport.getClientToken());
|
||||
Assert.assertEquals("Enemy should not see app diagnostics!",
|
||||
UNAVAILABLE, appReport.getDiagnostics());
|
||||
Assert.assertEquals("Enemy should not see app tracking url!",
|
||||
|
|
|
@ -18,10 +18,10 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.Matchers.anyBoolean;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
@ -276,7 +276,7 @@ public class TestClientRMService {
|
|||
private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler,
|
||||
ApplicationId applicationId3, YarnConfiguration config, String queueName) {
|
||||
return new RMAppImpl(applicationId3, rmContext, config, null, null,
|
||||
queueName, null, null, yarnScheduler, null, System
|
||||
queueName, null, yarnScheduler, null , System
|
||||
.currentTimeMillis());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationMaster;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ClientToken;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
|
@ -77,7 +78,7 @@ public abstract class MockAsm extends MockApps {
|
|||
}
|
||||
|
||||
@Override
|
||||
public String getClientToken() {
|
||||
public ClientToken getClientToken() {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
|
||||
|
@ -127,7 +128,7 @@ public abstract class MockAsm extends MockApps {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void setClientToken(String clientToken) {
|
||||
public void setClientToken(ClientToken clientToken) {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
|
||||
|
|
|
@ -168,7 +168,6 @@ public class TestRMAppTransitions {
|
|||
Configuration conf = new YarnConfiguration();
|
||||
// ensure max retries set to known value
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, maxRetries);
|
||||
String clientTokenStr = "bogusstring";
|
||||
YarnScheduler scheduler = mock(YarnScheduler.class);
|
||||
ApplicationMasterService masterService =
|
||||
new ApplicationMasterService(rmContext, scheduler);
|
||||
|
@ -177,11 +176,10 @@ public class TestRMAppTransitions {
|
|||
submissionContext = new ApplicationSubmissionContextPBImpl();
|
||||
}
|
||||
|
||||
RMApp application = new RMAppImpl(applicationId, rmContext,
|
||||
conf, name, user,
|
||||
queue, submissionContext, clientTokenStr,
|
||||
scheduler,
|
||||
masterService, System.currentTimeMillis());
|
||||
RMApp application =
|
||||
new RMAppImpl(applicationId, rmContext, conf, name, user, queue,
|
||||
submissionContext, scheduler, masterService,
|
||||
System.currentTimeMillis());
|
||||
|
||||
testAppStartState(applicationId, user, name, queue, application);
|
||||
return application;
|
||||
|
|
|
@ -209,9 +209,9 @@ public class TestRMAppAttemptTransitions {
|
|||
unmanagedAM = false;
|
||||
|
||||
application = mock(RMApp.class);
|
||||
applicationAttempt =
|
||||
new RMAppAttemptImpl(applicationAttemptId, null, rmContext, scheduler,
|
||||
masterService, submissionContext, new Configuration());
|
||||
applicationAttempt =
|
||||
new RMAppAttemptImpl(applicationAttemptId, rmContext, scheduler,
|
||||
masterService, submissionContext, new Configuration());
|
||||
when(application.getCurrentAppAttempt()).thenReturn(applicationAttempt);
|
||||
when(application.getApplicationId()).thenReturn(applicationId);
|
||||
|
||||
|
|
|
@ -52,8 +52,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ClientToken;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
|
@ -67,6 +69,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRMWithCustomAMLauncher;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ProtoUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -106,14 +109,14 @@ public class TestClientTokens {
|
|||
private static class CustomAM extends AbstractService implements
|
||||
CustomProtocol {
|
||||
|
||||
private final ApplicationId appId;
|
||||
private final ApplicationAttemptId appAttemptId;
|
||||
private final String secretKey;
|
||||
private InetSocketAddress address;
|
||||
private boolean pinged = false;
|
||||
|
||||
public CustomAM(ApplicationId appId, String secretKeyStr) {
|
||||
public CustomAM(ApplicationAttemptId appId, String secretKeyStr) {
|
||||
super("CustomAM");
|
||||
this.appId = appId;
|
||||
this.appAttemptId = appId;
|
||||
this.secretKey = secretKeyStr;
|
||||
}
|
||||
|
||||
|
@ -128,7 +131,7 @@ public class TestClientTokens {
|
|||
|
||||
ClientToAMTokenSecretManager secretManager = null;
|
||||
byte[] bytes = Base64.decodeBase64(this.secretKey);
|
||||
secretManager = new ClientToAMTokenSecretManager(this.appId, bytes);
|
||||
secretManager = new ClientToAMTokenSecretManager(this.appAttemptId, bytes);
|
||||
Server server;
|
||||
try {
|
||||
server =
|
||||
|
@ -216,7 +219,7 @@ public class TestClientTokens {
|
|||
GetApplicationReportResponse reportResponse =
|
||||
rm.getClientRMService().getApplicationReport(request);
|
||||
ApplicationReport appReport = reportResponse.getApplicationReport();
|
||||
String clientTokenEncoded = appReport.getClientToken();
|
||||
ClientToken clientToken = appReport.getClientToken();
|
||||
|
||||
// Wait till AM is 'launched'
|
||||
int waitTime = 0;
|
||||
|
@ -226,9 +229,11 @@ public class TestClientTokens {
|
|||
Assert.assertNotNull(containerManager.clientTokensSecret);
|
||||
|
||||
// Start the AM with the correct shared-secret.
|
||||
ApplicationAttemptId appAttemptId =
|
||||
app.getAppAttempts().keySet().iterator().next();
|
||||
Assert.assertNotNull(appAttemptId);
|
||||
final CustomAM am =
|
||||
new CustomAM(app.getApplicationId(),
|
||||
containerManager.clientTokensSecret);
|
||||
new CustomAM(appAttemptId, containerManager.clientTokensSecret);
|
||||
am.init(conf);
|
||||
am.start();
|
||||
|
||||
|
@ -249,21 +254,19 @@ public class TestClientTokens {
|
|||
|
||||
// Verify denial for a malicious user
|
||||
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("me");
|
||||
Token<ClientTokenIdentifier> clientToken =
|
||||
new Token<ClientTokenIdentifier>();
|
||||
clientToken.decodeFromUrlString(clientTokenEncoded);
|
||||
// RPC layer client expects ip:port as service for tokens
|
||||
SecurityUtil.setTokenService(clientToken, am.address);
|
||||
Token<ClientTokenIdentifier> token =
|
||||
ProtoUtils.convertFromProtoFormat(clientToken, am.address);
|
||||
|
||||
// Malicious user, messes with appId
|
||||
ClientTokenIdentifier maliciousID =
|
||||
new ClientTokenIdentifier(BuilderUtils.newApplicationId(app
|
||||
.getApplicationId().getClusterTimestamp(), 42));
|
||||
new ClientTokenIdentifier(BuilderUtils.newApplicationAttemptId(
|
||||
BuilderUtils.newApplicationId(app.getApplicationId()
|
||||
.getClusterTimestamp(), 42), 43));
|
||||
|
||||
Token<ClientTokenIdentifier> maliciousToken =
|
||||
new Token<ClientTokenIdentifier>(maliciousID.getBytes(),
|
||||
clientToken.getPassword(), clientToken.getKind(),
|
||||
clientToken.getService());
|
||||
token.getPassword(), token.getKind(),
|
||||
token.getService());
|
||||
ugi.addToken(maliciousToken);
|
||||
|
||||
try {
|
||||
|
@ -297,7 +300,7 @@ public class TestClientTokens {
|
|||
|
||||
// Now for an authenticated user
|
||||
ugi = UserGroupInformation.createRemoteUser("me");
|
||||
ugi.addToken(clientToken);
|
||||
ugi.addToken(token);
|
||||
|
||||
ugi.doAs(new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue