YARN-2615. Changed ClientToAMTokenIdentifier/RM(Timeline)DelegationTokenIdentifier to use protobuf as payload. Contributed by Junping Du
(cherry picked from commit ea26cc0b4a
)
This commit is contained in:
parent
4ba102bdc3
commit
43358be60b
|
@ -286,6 +286,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-2562. Changed ContainerId#toString() to be more readable. (Tsuyoshi
|
YARN-2562. Changed ContainerId#toString() to be more readable. (Tsuyoshi
|
||||||
OZAWA via jianhe)
|
OZAWA via jianhe)
|
||||||
|
|
||||||
|
YARN-2615. Changed ClientToAMTokenIdentifier/RM(Timeline)DelegationTokenIdentifier
|
||||||
|
to use protobuf as payload. (Junping Du via jianhe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -23,7 +23,6 @@ import java.io.DataInputStream;
|
||||||
import java.io.DataOutput;
|
import java.io.DataOutput;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
@ -33,7 +32,6 @@ import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
|
||||||
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.AMRMTokenIdentifierProto;
|
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.AMRMTokenIdentifierProto;
|
||||||
|
|
||||||
|
@ -80,9 +78,7 @@ public class AMRMTokenIdentifier extends TokenIdentifier {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readFields(DataInput in) throws IOException {
|
public void readFields(DataInput in) throws IOException {
|
||||||
DataInputStream dis = (DataInputStream)in;
|
proto = AMRMTokenIdentifierProto.parseFrom((DataInputStream)in);
|
||||||
byte[] buffer = IOUtils.toByteArray(dis);
|
|
||||||
proto = AMRMTokenIdentifierProto.parseFrom(buffer);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -23,7 +23,6 @@ import java.io.DataInputStream;
|
||||||
import java.io.DataOutput;
|
import java.io.DataOutput;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
@ -33,17 +32,14 @@ import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
|
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
|
|
||||||
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto;
|
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto;
|
||||||
|
|
||||||
import com.google.protobuf.TextFormat;
|
import com.google.protobuf.TextFormat;
|
||||||
|
@ -63,7 +59,6 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
|
||||||
public static final Text KIND = new Text("ContainerToken");
|
public static final Text KIND = new Text("ContainerToken");
|
||||||
|
|
||||||
private ContainerTokenIdentifierProto proto;
|
private ContainerTokenIdentifierProto proto;
|
||||||
private LogAggregationContext logAggregationContext;
|
|
||||||
|
|
||||||
public ContainerTokenIdentifier(ContainerId containerID,
|
public ContainerTokenIdentifier(ContainerId containerID,
|
||||||
String hostName, String appSubmitter, Resource r, long expiryTimeStamp,
|
String hostName, String appSubmitter, Resource r, long expiryTimeStamp,
|
||||||
|
@ -174,9 +169,7 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readFields(DataInput in) throws IOException {
|
public void readFields(DataInput in) throws IOException {
|
||||||
DataInputStream dis = (DataInputStream)in;
|
proto = ContainerTokenIdentifierProto.parseFrom((DataInputStream)in);
|
||||||
byte[] buffer = IOUtils.toByteArray(dis);
|
|
||||||
proto = ContainerTokenIdentifierProto.parseFrom(buffer);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -18,13 +18,11 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.security;
|
package org.apache.hadoop.yarn.security;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
|
||||||
import java.io.DataInput;
|
import java.io.DataInput;
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.DataOutput;
|
import java.io.DataOutput;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
@ -33,11 +31,9 @@ import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
|
|
||||||
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.NMTokenIdentifierProto;
|
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.NMTokenIdentifierProto;
|
||||||
|
|
||||||
import com.google.protobuf.TextFormat;
|
import com.google.protobuf.TextFormat;
|
||||||
|
@ -103,9 +99,7 @@ public class NMTokenIdentifier extends TokenIdentifier {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readFields(DataInput in) throws IOException {
|
public void readFields(DataInput in) throws IOException {
|
||||||
DataInputStream dis = (DataInputStream)in;
|
proto = NMTokenIdentifierProto.parseFrom((DataInputStream)in);
|
||||||
byte[] buffer = IOUtils.toByteArray(dis);
|
|
||||||
proto = NMTokenIdentifierProto.parseFrom(buffer);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -19,18 +19,21 @@
|
||||||
package org.apache.hadoop.yarn.security.client;
|
package org.apache.hadoop.yarn.security.client;
|
||||||
|
|
||||||
import java.io.DataInput;
|
import java.io.DataInput;
|
||||||
|
import java.io.DataInputStream;
|
||||||
import java.io.DataOutput;
|
import java.io.DataOutput;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ClientToAMTokenIdentifierProto;
|
||||||
|
|
||||||
|
import com.google.protobuf.TextFormat;
|
||||||
|
|
||||||
|
|
||||||
@Public
|
@Public
|
||||||
@Evolving
|
@Evolving
|
||||||
|
@ -38,8 +41,7 @@ public class ClientToAMTokenIdentifier extends TokenIdentifier {
|
||||||
|
|
||||||
public static final Text KIND_NAME = new Text("YARN_CLIENT_TOKEN");
|
public static final Text KIND_NAME = new Text("YARN_CLIENT_TOKEN");
|
||||||
|
|
||||||
private ApplicationAttemptId applicationAttemptId;
|
private ClientToAMTokenIdentifierProto proto;
|
||||||
private Text clientName = new Text();
|
|
||||||
|
|
||||||
// TODO: Add more information in the tokenID such that it is not
|
// TODO: Add more information in the tokenID such that it is not
|
||||||
// transferrable, more secure etc.
|
// transferrable, more secure etc.
|
||||||
|
@ -48,34 +50,40 @@ public class ClientToAMTokenIdentifier extends TokenIdentifier {
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClientToAMTokenIdentifier(ApplicationAttemptId id, String client) {
|
public ClientToAMTokenIdentifier(ApplicationAttemptId id, String client) {
|
||||||
this();
|
ClientToAMTokenIdentifierProto.Builder builder =
|
||||||
this.applicationAttemptId = id;
|
ClientToAMTokenIdentifierProto.newBuilder();
|
||||||
this.clientName = new Text(client);
|
if (id != null) {
|
||||||
|
builder.setAppAttemptId(((ApplicationAttemptIdPBImpl)id).getProto());
|
||||||
|
}
|
||||||
|
if (client != null) {
|
||||||
|
builder.setClientName(client);
|
||||||
|
}
|
||||||
|
proto = builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public ApplicationAttemptId getApplicationAttemptID() {
|
public ApplicationAttemptId getApplicationAttemptID() {
|
||||||
return this.applicationAttemptId;
|
if (!proto.hasAppAttemptId()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return new ApplicationAttemptIdPBImpl(proto.getAppAttemptId());
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getClientName() {
|
public String getClientName() {
|
||||||
return this.clientName.toString();
|
return proto.getClientName();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ClientToAMTokenIdentifierProto getProto() {
|
||||||
|
return proto;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void write(DataOutput out) throws IOException {
|
public void write(DataOutput out) throws IOException {
|
||||||
out.writeLong(this.applicationAttemptId.getApplicationId()
|
out.write(proto.toByteArray());
|
||||||
.getClusterTimestamp());
|
|
||||||
out.writeInt(this.applicationAttemptId.getApplicationId().getId());
|
|
||||||
out.writeInt(this.applicationAttemptId.getAttemptId());
|
|
||||||
this.clientName.write(out);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readFields(DataInput in) throws IOException {
|
public void readFields(DataInput in) throws IOException {
|
||||||
this.applicationAttemptId =
|
proto = ClientToAMTokenIdentifierProto.parseFrom((DataInputStream)in);
|
||||||
ApplicationAttemptId.newInstance(
|
|
||||||
ApplicationId.newInstance(in.readLong(), in.readInt()), in.readInt());
|
|
||||||
this.clientName.readFields(in);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -85,17 +93,30 @@ public class ClientToAMTokenIdentifier extends TokenIdentifier {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public UserGroupInformation getUser() {
|
public UserGroupInformation getUser() {
|
||||||
if (this.clientName == null) {
|
String clientName = getClientName();
|
||||||
|
if (clientName == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
return UserGroupInformation.createRemoteUser(this.clientName.toString());
|
return UserGroupInformation.createRemoteUser(clientName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return getProto().hashCode();
|
||||||
}
|
}
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@Override
|
||||||
public static class Renewer extends Token.TrivialRenewer {
|
public boolean equals(Object other) {
|
||||||
@Override
|
if (other == null)
|
||||||
protected Text getKind() {
|
return false;
|
||||||
return KIND_NAME;
|
if (other.getClass().isAssignableFrom(this.getClass())) {
|
||||||
|
return this.getProto().equals(this.getClass().cast(other).getProto());
|
||||||
}
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return TextFormat.shortDebugString(getProto());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,16 +29,13 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenRenewer;
|
import org.apache.hadoop.security.token.TokenRenewer;
|
||||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
|
|
||||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
|
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
|
||||||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
|
@ -48,13 +45,12 @@ import org.apache.hadoop.yarn.util.Records;
|
||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Evolving
|
@Evolving
|
||||||
public class RMDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
|
public class RMDelegationTokenIdentifier extends YARNDelegationTokenIdentifier {
|
||||||
|
|
||||||
public static final Text KIND_NAME = new Text("RM_DELEGATION_TOKEN");
|
public static final Text KIND_NAME = new Text("RM_DELEGATION_TOKEN");
|
||||||
|
|
||||||
public RMDelegationTokenIdentifier() {
|
public RMDelegationTokenIdentifier(){}
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new delegation token identifier
|
* Create a new delegation token identifier
|
||||||
* @param owner the effective username of the token owner
|
* @param owner the effective username of the token owner
|
||||||
|
|
|
@ -23,11 +23,10 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
|
|
||||||
|
|
||||||
@Public
|
@Public
|
||||||
@Unstable
|
@Unstable
|
||||||
public class TimelineDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
|
public class TimelineDelegationTokenIdentifier extends YARNDelegationTokenIdentifier {
|
||||||
|
|
||||||
public static final Text KIND_NAME = new Text("TIMELINE_DELEGATION_TOKEN");
|
public static final Text KIND_NAME = new Text("TIMELINE_DELEGATION_TOKEN");
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,201 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.security.client;
|
||||||
|
|
||||||
|
import java.io.DataInput;
|
||||||
|
import java.io.DataInputStream;
|
||||||
|
import java.io.DataOutput;
|
||||||
|
import java.io.DataOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.security.HadoopKerberosName;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||||
|
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.YARNDelegationTokenIdentifierProto;
|
||||||
|
|
||||||
|
public abstract class YARNDelegationTokenIdentifier extends
|
||||||
|
AbstractDelegationTokenIdentifier {
|
||||||
|
|
||||||
|
YARNDelegationTokenIdentifierProto.Builder builder =
|
||||||
|
YARNDelegationTokenIdentifierProto.newBuilder();
|
||||||
|
|
||||||
|
public YARNDelegationTokenIdentifier() {}
|
||||||
|
|
||||||
|
public YARNDelegationTokenIdentifier(Text owner, Text renewer, Text realUser) {
|
||||||
|
if (owner != null) {
|
||||||
|
builder.setOwner(owner.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (renewer != null) {
|
||||||
|
HadoopKerberosName renewerKrbName = new HadoopKerberosName(renewer.toString());
|
||||||
|
try {
|
||||||
|
builder.setRenewer(renewerKrbName.getShortName());
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (realUser != null) {
|
||||||
|
builder.setRealUser(realUser.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the username encoded in the token identifier
|
||||||
|
*
|
||||||
|
* @return the username or owner
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public UserGroupInformation getUser() {
|
||||||
|
String owner = getOwner() == null ? null : getOwner().toString();
|
||||||
|
String realUser = getRealUser() == null ? null: getRealUser().toString();
|
||||||
|
if ( (owner == null) || (owner.toString().isEmpty())) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
final UserGroupInformation realUgi;
|
||||||
|
final UserGroupInformation ugi;
|
||||||
|
if ((realUser == null) || (realUser.toString().isEmpty())
|
||||||
|
|| realUser.equals(owner)) {
|
||||||
|
ugi = realUgi = UserGroupInformation.createRemoteUser(owner.toString());
|
||||||
|
} else {
|
||||||
|
realUgi = UserGroupInformation.createRemoteUser(realUser.toString());
|
||||||
|
ugi = UserGroupInformation.createProxyUser(owner.toString(), realUgi);
|
||||||
|
}
|
||||||
|
realUgi.setAuthenticationMethod(AuthenticationMethod.TOKEN);
|
||||||
|
return ugi;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Text getOwner() {
|
||||||
|
String owner = builder.getOwner();
|
||||||
|
if (owner == null) {
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
return new Text(owner);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Text getRenewer() {
|
||||||
|
String renewer = builder.getRenewer();
|
||||||
|
if (renewer == null) {
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
return new Text(renewer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Text getRealUser() {
|
||||||
|
String realUser = builder.getRealUser();
|
||||||
|
if (realUser == null) {
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
return new Text(realUser);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setIssueDate(long issueDate) {
|
||||||
|
builder.setIssueDate(issueDate);
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getIssueDate() {
|
||||||
|
return builder.getIssueDate();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void setRenewDate(long renewDate) {
|
||||||
|
builder.setRenewDate(renewDate);
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getRenewDate() {
|
||||||
|
return builder.getRenewDate();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxDate(long maxDate) {
|
||||||
|
builder.setMaxDate(maxDate);
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getMaxDate() {
|
||||||
|
return builder.getMaxDate();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSequenceNumber(int seqNum) {
|
||||||
|
builder.setSequenceNumber(seqNum);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getSequenceNumber() {
|
||||||
|
return builder.getSequenceNumber();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMasterKeyId(int newId) {
|
||||||
|
builder.setMasterKeyId(newId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMasterKeyId() {
|
||||||
|
return builder.getMasterKeyId();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static boolean isEqual(Object a, Object b) {
|
||||||
|
return a == null ? b == null : a.equals(b);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (obj == this) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (obj instanceof YARNDelegationTokenIdentifier) {
|
||||||
|
YARNDelegationTokenIdentifier that = (YARNDelegationTokenIdentifier) obj;
|
||||||
|
return this.getSequenceNumber() == that.getSequenceNumber()
|
||||||
|
&& this.getIssueDate() == that.getIssueDate()
|
||||||
|
&& this.getMaxDate() == that.getMaxDate()
|
||||||
|
&& this.getMasterKeyId() == that.getMasterKeyId()
|
||||||
|
&& isEqual(this.getOwner(), that.getOwner())
|
||||||
|
&& isEqual(this.getRenewer(), that.getRenewer())
|
||||||
|
&& isEqual(this.getRealUser(), that.getRealUser());
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return this.getSequenceNumber();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void readFields(DataInput in) throws IOException {
|
||||||
|
builder.mergeFrom((DataInputStream) in);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(DataOutput out) throws IOException {
|
||||||
|
builder.build().writeTo((DataOutputStream)out);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
StringBuilder buffer = new StringBuilder();
|
||||||
|
buffer
|
||||||
|
.append("owner=" + getOwner() + ", renewer=" + getRenewer() + ", realUser="
|
||||||
|
+ getRealUser() + ", issueDate=" + getIssueDate()
|
||||||
|
+ ", maxDate=" + getMaxDate() + ", sequenceNumber="
|
||||||
|
+ getSequenceNumber() + ", masterKeyId="
|
||||||
|
+ getMasterKeyId());
|
||||||
|
return buffer.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -51,3 +51,19 @@ message ContainerTokenIdentifierProto {
|
||||||
optional LogAggregationContextProto logAggregationContext = 10;
|
optional LogAggregationContextProto logAggregationContext = 10;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message ClientToAMTokenIdentifierProto {
|
||||||
|
optional ApplicationAttemptIdProto appAttemptId = 1;
|
||||||
|
optional string clientName = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message YARNDelegationTokenIdentifierProto {
|
||||||
|
optional string owner = 1;
|
||||||
|
optional string renewer = 2;
|
||||||
|
optional string realUser = 3;
|
||||||
|
optional int64 issueDate = 4;
|
||||||
|
optional int64 maxDate = 5;
|
||||||
|
optional int32 sequenceNumber = 6;
|
||||||
|
optional int32 masterKeyId = 7 [default = -1];
|
||||||
|
optional int64 renewDate = 8;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,19 +17,26 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.yarn.security;
|
package org.apache.hadoop.yarn.security;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.io.DataInputBuffer;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestYARNTokenIdentifier {
|
public class TestYARNTokenIdentifier {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNMTokenIdentifier() {
|
public void testNMTokenIdentifier() throws IOException {
|
||||||
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
|
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
|
||||||
ApplicationId.newInstance(1, 1), 1);
|
ApplicationId.newInstance(1, 1), 1);
|
||||||
NodeId nodeId = NodeId.newInstance("host0", 0);
|
NodeId nodeId = NodeId.newInstance("host0", 0);
|
||||||
|
@ -39,8 +46,12 @@ public class TestYARNTokenIdentifier {
|
||||||
NMTokenIdentifier token = new NMTokenIdentifier(
|
NMTokenIdentifier token = new NMTokenIdentifier(
|
||||||
appAttemptId, nodeId, applicationSubmitter, masterKeyId);
|
appAttemptId, nodeId, applicationSubmitter, masterKeyId);
|
||||||
|
|
||||||
NMTokenIdentifier anotherToken = new NMTokenIdentifier(
|
NMTokenIdentifier anotherToken = new NMTokenIdentifier();
|
||||||
appAttemptId, nodeId, applicationSubmitter, masterKeyId);
|
|
||||||
|
byte[] tokenContent = token.getBytes();
|
||||||
|
DataInputBuffer dib = new DataInputBuffer();
|
||||||
|
dib.reset(tokenContent, tokenContent.length);
|
||||||
|
anotherToken.readFields(dib);
|
||||||
|
|
||||||
// verify the whole record equals with original record
|
// verify the whole record equals with original record
|
||||||
Assert.assertEquals("Token is not the same after serialization " +
|
Assert.assertEquals("Token is not the same after serialization " +
|
||||||
|
@ -65,15 +76,18 @@ public class TestYARNTokenIdentifier {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAMRMTokenIdentifier() {
|
public void testAMRMTokenIdentifier() throws IOException {
|
||||||
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
|
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
|
||||||
ApplicationId.newInstance(1, 1), 1);
|
ApplicationId.newInstance(1, 1), 1);
|
||||||
int masterKeyId = 1;
|
int masterKeyId = 1;
|
||||||
|
|
||||||
AMRMTokenIdentifier token = new AMRMTokenIdentifier(appAttemptId, masterKeyId);
|
AMRMTokenIdentifier token = new AMRMTokenIdentifier(appAttemptId, masterKeyId);
|
||||||
|
|
||||||
AMRMTokenIdentifier anotherToken = new AMRMTokenIdentifier(
|
AMRMTokenIdentifier anotherToken = new AMRMTokenIdentifier();
|
||||||
appAttemptId, masterKeyId);
|
byte[] tokenContent = token.getBytes();
|
||||||
|
DataInputBuffer dib = new DataInputBuffer();
|
||||||
|
dib.reset(tokenContent, tokenContent.length);
|
||||||
|
anotherToken.readFields(dib);
|
||||||
|
|
||||||
// verify the whole record equals with original record
|
// verify the whole record equals with original record
|
||||||
Assert.assertEquals("Token is not the same after serialization " +
|
Assert.assertEquals("Token is not the same after serialization " +
|
||||||
|
@ -87,7 +101,35 @@ public class TestYARNTokenIdentifier {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testContainerTokenIdentifier() {
|
public void testClientToAMTokenIdentifier() throws IOException {
|
||||||
|
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
|
||||||
|
ApplicationId.newInstance(1, 1), 1);
|
||||||
|
|
||||||
|
String clientName = "user";
|
||||||
|
|
||||||
|
ClientToAMTokenIdentifier token = new ClientToAMTokenIdentifier(
|
||||||
|
appAttemptId, clientName);
|
||||||
|
|
||||||
|
ClientToAMTokenIdentifier anotherToken = new ClientToAMTokenIdentifier();
|
||||||
|
|
||||||
|
byte[] tokenContent = token.getBytes();
|
||||||
|
DataInputBuffer dib = new DataInputBuffer();
|
||||||
|
dib.reset(tokenContent, tokenContent.length);
|
||||||
|
anotherToken.readFields(dib);
|
||||||
|
|
||||||
|
// verify the whole record equals with original record
|
||||||
|
Assert.assertEquals("Token is not the same after serialization " +
|
||||||
|
"and deserialization.", token, anotherToken);
|
||||||
|
|
||||||
|
Assert.assertEquals("ApplicationAttemptId from proto is not the same with original token",
|
||||||
|
anotherToken.getApplicationAttemptID(), appAttemptId);
|
||||||
|
|
||||||
|
Assert.assertEquals("clientName from proto is not the same with original token",
|
||||||
|
anotherToken.getClientName(), clientName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testContainerTokenIdentifier() throws IOException {
|
||||||
ContainerId containerID = ContainerId.newInstance(
|
ContainerId containerID = ContainerId.newInstance(
|
||||||
ApplicationAttemptId.newInstance(ApplicationId.newInstance(
|
ApplicationAttemptId.newInstance(ApplicationId.newInstance(
|
||||||
1, 1), 1), 1);
|
1, 1), 1), 1);
|
||||||
|
@ -104,9 +146,12 @@ public class TestYARNTokenIdentifier {
|
||||||
containerID, hostName, appSubmitter, r, expiryTimeStamp,
|
containerID, hostName, appSubmitter, r, expiryTimeStamp,
|
||||||
masterKeyId, rmIdentifier, priority, creationTime);
|
masterKeyId, rmIdentifier, priority, creationTime);
|
||||||
|
|
||||||
ContainerTokenIdentifier anotherToken = new ContainerTokenIdentifier(
|
ContainerTokenIdentifier anotherToken = new ContainerTokenIdentifier();
|
||||||
containerID, hostName, appSubmitter, r, expiryTimeStamp,
|
|
||||||
masterKeyId, rmIdentifier, priority, creationTime);
|
byte[] tokenContent = token.getBytes();
|
||||||
|
DataInputBuffer dib = new DataInputBuffer();
|
||||||
|
dib.reset(tokenContent, tokenContent.length);
|
||||||
|
anotherToken.readFields(dib);
|
||||||
|
|
||||||
// verify the whole record equals with original record
|
// verify the whole record equals with original record
|
||||||
Assert.assertEquals("Token is not the same after serialization " +
|
Assert.assertEquals("Token is not the same after serialization " +
|
||||||
|
@ -150,5 +195,113 @@ public class TestYARNTokenIdentifier {
|
||||||
|
|
||||||
Assert.assertNull(anotherToken.getLogAggregationContext());
|
Assert.assertNull(anotherToken.getLogAggregationContext());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRMDelegationTokenIdentifier() throws IOException {
|
||||||
|
|
||||||
|
Text owner = new Text("user1");
|
||||||
|
Text renewer = new Text("user2");
|
||||||
|
Text realUser = new Text("user3");
|
||||||
|
long issueDate = 1;
|
||||||
|
long maxDate = 2;
|
||||||
|
int sequenceNumber = 3;
|
||||||
|
int masterKeyId = 4;
|
||||||
|
|
||||||
|
RMDelegationTokenIdentifier token =
|
||||||
|
new RMDelegationTokenIdentifier(owner, renewer, realUser);
|
||||||
|
token.setIssueDate(issueDate);
|
||||||
|
token.setMaxDate(maxDate);
|
||||||
|
token.setSequenceNumber(sequenceNumber);
|
||||||
|
token.setMasterKeyId(masterKeyId);
|
||||||
|
|
||||||
|
RMDelegationTokenIdentifier anotherToken = new RMDelegationTokenIdentifier();
|
||||||
|
|
||||||
|
byte[] tokenContent = token.getBytes();
|
||||||
|
DataInputBuffer dib = new DataInputBuffer();
|
||||||
|
dib.reset(tokenContent, tokenContent.length);
|
||||||
|
anotherToken.readFields(dib);
|
||||||
|
|
||||||
|
// verify the whole record equals with original record
|
||||||
|
Assert.assertEquals("Token is not the same after serialization " +
|
||||||
|
"and deserialization.", token, anotherToken);
|
||||||
|
|
||||||
|
Assert.assertEquals("owner from proto is not the same with original token",
|
||||||
|
anotherToken.getOwner(), owner);
|
||||||
|
|
||||||
|
Assert.assertEquals("renewer from proto is not the same with original token",
|
||||||
|
anotherToken.getRenewer(), renewer);
|
||||||
|
|
||||||
|
Assert.assertEquals("realUser from proto is not the same with original token",
|
||||||
|
anotherToken.getRealUser(), realUser);
|
||||||
|
|
||||||
|
Assert.assertEquals("issueDate from proto is not the same with original token",
|
||||||
|
anotherToken.getIssueDate(), issueDate);
|
||||||
|
|
||||||
|
Assert.assertEquals("maxDate from proto is not the same with original token",
|
||||||
|
anotherToken.getMaxDate(), maxDate);
|
||||||
|
|
||||||
|
Assert.assertEquals("sequenceNumber from proto is not the same with original token",
|
||||||
|
anotherToken.getSequenceNumber(), sequenceNumber);
|
||||||
|
|
||||||
|
Assert.assertEquals("masterKeyId from proto is not the same with original token",
|
||||||
|
anotherToken.getMasterKeyId(), masterKeyId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTimelineDelegationTokenIdentifier() throws IOException {
|
||||||
|
|
||||||
|
Text owner = new Text("user1");
|
||||||
|
Text renewer = new Text("user2");
|
||||||
|
Text realUser = new Text("user3");
|
||||||
|
long issueDate = 1;
|
||||||
|
long maxDate = 2;
|
||||||
|
int sequenceNumber = 3;
|
||||||
|
int masterKeyId = 4;
|
||||||
|
long renewDate = 5;
|
||||||
|
|
||||||
|
TimelineDelegationTokenIdentifier token =
|
||||||
|
new TimelineDelegationTokenIdentifier(owner, renewer, realUser);
|
||||||
|
token.setIssueDate(issueDate);
|
||||||
|
token.setMaxDate(maxDate);
|
||||||
|
token.setSequenceNumber(sequenceNumber);
|
||||||
|
token.setMasterKeyId(masterKeyId);
|
||||||
|
token.setRenewDate(renewDate);
|
||||||
|
|
||||||
|
TimelineDelegationTokenIdentifier anotherToken =
|
||||||
|
new TimelineDelegationTokenIdentifier();
|
||||||
|
|
||||||
|
byte[] tokenContent = token.getBytes();
|
||||||
|
DataInputBuffer dib = new DataInputBuffer();
|
||||||
|
dib.reset(tokenContent, tokenContent.length);
|
||||||
|
anotherToken.readFields(dib);
|
||||||
|
|
||||||
|
// verify the whole record equals with original record
|
||||||
|
Assert.assertEquals("Token is not the same after serialization " +
|
||||||
|
"and deserialization.", token, anotherToken);
|
||||||
|
|
||||||
|
Assert.assertEquals("owner from proto is not the same with original token",
|
||||||
|
anotherToken.getOwner(), owner);
|
||||||
|
|
||||||
|
Assert.assertEquals("renewer from proto is not the same with original token",
|
||||||
|
anotherToken.getRenewer(), renewer);
|
||||||
|
|
||||||
|
Assert.assertEquals("realUser from proto is not the same with original token",
|
||||||
|
anotherToken.getRealUser(), realUser);
|
||||||
|
|
||||||
|
Assert.assertEquals("issueDate from proto is not the same with original token",
|
||||||
|
anotherToken.getIssueDate(), issueDate);
|
||||||
|
|
||||||
|
Assert.assertEquals("maxDate from proto is not the same with original token",
|
||||||
|
anotherToken.getMaxDate(), maxDate);
|
||||||
|
|
||||||
|
Assert.assertEquals("sequenceNumber from proto is not the same with original token",
|
||||||
|
anotherToken.getSequenceNumber(), sequenceNumber);
|
||||||
|
|
||||||
|
Assert.assertEquals("masterKeyId from proto is not the same with original token",
|
||||||
|
anotherToken.getMasterKeyId(), masterKeyId);
|
||||||
|
|
||||||
|
Assert.assertEquals("renewDate from proto is not the same with original token",
|
||||||
|
anotherToken.getRenewDate(), renewDate);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -291,6 +291,29 @@
|
||||||
<output>${project.build.directory}/generated-sources/java</output>
|
<output>${project.build.directory}/generated-sources/java</output>
|
||||||
</configuration>
|
</configuration>
|
||||||
</execution>
|
</execution>
|
||||||
|
<execution>
|
||||||
|
<id>compile-test-protoc</id>
|
||||||
|
<phase>generate-sources</phase>
|
||||||
|
<goals>
|
||||||
|
<goal>protoc</goal>
|
||||||
|
</goals>
|
||||||
|
<configuration>
|
||||||
|
<protocVersion>${protobuf.version}</protocVersion>
|
||||||
|
<protocCommand>${protoc.path}</protocCommand>
|
||||||
|
<imports>
|
||||||
|
<param>${basedir}/../../../../hadoop-common-project/hadoop-common/src/main/proto</param>
|
||||||
|
<param>${basedir}/../../hadoop-yarn-api/src/main/proto</param>
|
||||||
|
<param>${basedir}/src/test/proto</param>
|
||||||
|
</imports>
|
||||||
|
<source>
|
||||||
|
<directory>${basedir}/src/test/proto</directory>
|
||||||
|
<includes>
|
||||||
|
<include>test_client_tokens.proto</include>
|
||||||
|
</includes>
|
||||||
|
</source>
|
||||||
|
<output>${project.build.directory}/generated-sources/java</output>
|
||||||
|
</configuration>
|
||||||
|
</execution>
|
||||||
</executions>
|
</executions>
|
||||||
</plugin>
|
</plugin>
|
||||||
</plugins>
|
</plugins>
|
||||||
|
|
|
@ -372,7 +372,7 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
} else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
|
} else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
|
||||||
RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier();
|
RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier();
|
||||||
identifier.readFields(fsIn);
|
identifier.readFields(fsIn);
|
||||||
long renewDate = fsIn.readLong();
|
long renewDate = identifier.getRenewDate();
|
||||||
rmState.rmSecretManagerState.delegationTokenState.put(identifier,
|
rmState.rmSecretManagerState.delegationTokenState.put(identifier,
|
||||||
renewDate);
|
renewDate);
|
||||||
} else {
|
} else {
|
||||||
|
@ -505,8 +505,8 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
|
DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
|
||||||
ByteArrayOutputStream os = new ByteArrayOutputStream();
|
ByteArrayOutputStream os = new ByteArrayOutputStream();
|
||||||
DataOutputStream fsOut = new DataOutputStream(os);
|
DataOutputStream fsOut = new DataOutputStream(os);
|
||||||
|
identifier.setRenewDate(renewDate);
|
||||||
identifier.write(fsOut);
|
identifier.write(fsOut);
|
||||||
fsOut.writeLong(renewDate);
|
|
||||||
if (isUpdate) {
|
if (isUpdate) {
|
||||||
LOG.info("Updating RMDelegationToken_" + identifier.getSequenceNumber());
|
LOG.info("Updating RMDelegationToken_" + identifier.getSequenceNumber());
|
||||||
updateFile(nodeCreatePath, os.toByteArray());
|
updateFile(nodeCreatePath, os.toByteArray());
|
||||||
|
|
|
@ -530,7 +530,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
RMDelegationTokenIdentifier identifier =
|
RMDelegationTokenIdentifier identifier =
|
||||||
new RMDelegationTokenIdentifier();
|
new RMDelegationTokenIdentifier();
|
||||||
identifier.readFields(fsIn);
|
identifier.readFields(fsIn);
|
||||||
long renewDate = fsIn.readLong();
|
long renewDate = identifier.getRenewDate();
|
||||||
rmState.rmSecretManagerState.delegationTokenState.put(identifier,
|
rmState.rmSecretManagerState.delegationTokenState.put(identifier,
|
||||||
renewDate);
|
renewDate);
|
||||||
}
|
}
|
||||||
|
@ -776,8 +776,8 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
DataOutputStream seqOut = new DataOutputStream(seqOs);
|
DataOutputStream seqOut = new DataOutputStream(seqOs);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
rmDTIdentifier.setRenewDate(renewDate);
|
||||||
rmDTIdentifier.write(tokenOut);
|
rmDTIdentifier.write(tokenOut);
|
||||||
tokenOut.writeLong(renewDate);
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug((isUpdate ? "Storing " : "Updating ") + "RMDelegationToken_" +
|
LOG.debug((isUpdate ? "Storing " : "Updating ") + "RMDelegationToken_" +
|
||||||
rmDTIdentifier.getSequenceNumber());
|
rmDTIdentifier.getSequenceNumber());
|
||||||
|
|
|
@ -0,0 +1,198 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import java.io.DataInput;
|
||||||
|
import java.io.DataInputStream;
|
||||||
|
import java.io.DataOutput;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||||
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnSecurityTestClientAMTokenProtos.RMDelegationTokenIdentifierForTestProto;
|
||||||
|
|
||||||
|
public class RMDelegationTokenIdentifierForTest extends
|
||||||
|
RMDelegationTokenIdentifier {
|
||||||
|
|
||||||
|
private RMDelegationTokenIdentifierForTestProto proto;
|
||||||
|
private RMDelegationTokenIdentifierForTestProto.Builder builder;
|
||||||
|
|
||||||
|
public RMDelegationTokenIdentifierForTest() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public RMDelegationTokenIdentifierForTest(
|
||||||
|
RMDelegationTokenIdentifier token, String message) {
|
||||||
|
builder = RMDelegationTokenIdentifierForTestProto.newBuilder();
|
||||||
|
if (token.getOwner() != null) {
|
||||||
|
builder.setOwner(token.getOwner().toString());
|
||||||
|
}
|
||||||
|
if (token.getRenewer() != null) {
|
||||||
|
builder.setRenewer(token.getRenewer().toString());
|
||||||
|
}
|
||||||
|
if (token.getRealUser() != null) {
|
||||||
|
builder.setRealUser(token.getRealUser().toString());
|
||||||
|
}
|
||||||
|
builder.setIssueDate(token.getIssueDate());
|
||||||
|
builder.setMaxDate(token.getMaxDate());
|
||||||
|
builder.setSequenceNumber(token.getSequenceNumber());
|
||||||
|
builder.setMasterKeyId(token.getMasterKeyId());
|
||||||
|
builder.setMessage(message);
|
||||||
|
proto = builder.build();
|
||||||
|
builder = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(DataOutput out) throws IOException {
|
||||||
|
out.write(proto.toByteArray());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void readFields(DataInput in) throws IOException {
|
||||||
|
DataInputStream dis = (DataInputStream)in;
|
||||||
|
byte[] buffer = IOUtils.toByteArray(dis);
|
||||||
|
proto = RMDelegationTokenIdentifierForTestProto.parseFrom(buffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the username encoded in the token identifier
|
||||||
|
*
|
||||||
|
* @return the username or owner
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public UserGroupInformation getUser() {
|
||||||
|
String owner = getOwner().toString();
|
||||||
|
String realUser = getRealUser().toString();
|
||||||
|
if ( (owner == null) || (owner.toString().isEmpty())) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
final UserGroupInformation realUgi;
|
||||||
|
final UserGroupInformation ugi;
|
||||||
|
if ((realUser == null) || (realUser.toString().isEmpty())
|
||||||
|
|| realUser.equals(owner)) {
|
||||||
|
ugi = realUgi = UserGroupInformation.createRemoteUser(owner.toString());
|
||||||
|
} else {
|
||||||
|
realUgi = UserGroupInformation.createRemoteUser(realUser.toString());
|
||||||
|
ugi = UserGroupInformation.createProxyUser(owner.toString(), realUgi);
|
||||||
|
}
|
||||||
|
realUgi.setAuthenticationMethod(AuthenticationMethod.TOKEN);
|
||||||
|
return ugi;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Text getOwner() {
|
||||||
|
String owner = proto.getOwner();
|
||||||
|
if (owner == null) {
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
return new Text(owner);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Text getRenewer() {
|
||||||
|
String renewer = proto.getRenewer();
|
||||||
|
if (renewer == null) {
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
return new Text(renewer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Text getRealUser() {
|
||||||
|
String realUser = proto.getRealUser();
|
||||||
|
if (realUser == null) {
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
return new Text(realUser);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setIssueDate(long issueDate) {
|
||||||
|
RMDelegationTokenIdentifierForTestProto.Builder builder =
|
||||||
|
RMDelegationTokenIdentifierForTestProto.newBuilder(proto);
|
||||||
|
builder.setIssueDate(issueDate);
|
||||||
|
proto = builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getIssueDate() {
|
||||||
|
return proto.getIssueDate();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxDate(long maxDate) {
|
||||||
|
RMDelegationTokenIdentifierForTestProto.Builder builder =
|
||||||
|
RMDelegationTokenIdentifierForTestProto.newBuilder(proto);
|
||||||
|
builder.setMaxDate(maxDate);
|
||||||
|
proto = builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getMaxDate() {
|
||||||
|
return proto.getMaxDate();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSequenceNumber(int seqNum) {
|
||||||
|
RMDelegationTokenIdentifierForTestProto.Builder builder =
|
||||||
|
RMDelegationTokenIdentifierForTestProto.newBuilder(proto);
|
||||||
|
builder.setSequenceNumber(seqNum);
|
||||||
|
proto = builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getSequenceNumber() {
|
||||||
|
return proto.getSequenceNumber();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMasterKeyId(int newId) {
|
||||||
|
RMDelegationTokenIdentifierForTestProto.Builder builder =
|
||||||
|
RMDelegationTokenIdentifierForTestProto.newBuilder(proto);
|
||||||
|
builder.setMasterKeyId(newId);
|
||||||
|
proto = builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMasterKeyId() {
|
||||||
|
return proto.getMasterKeyId();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getMessage() {
|
||||||
|
return proto.getMessage();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (obj == this) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (obj instanceof RMDelegationTokenIdentifierForTest) {
|
||||||
|
RMDelegationTokenIdentifierForTest that = (RMDelegationTokenIdentifierForTest) obj;
|
||||||
|
return this.getSequenceNumber() == that.getSequenceNumber()
|
||||||
|
&& this.getIssueDate() == that.getIssueDate()
|
||||||
|
&& this.getMaxDate() == that.getMaxDate()
|
||||||
|
&& this.getMasterKeyId() == that.getMasterKeyId()
|
||||||
|
&& isEqual(this.getOwner(), that.getOwner())
|
||||||
|
&& isEqual(this.getRenewer(), that.getRenewer())
|
||||||
|
&& isEqual(this.getRealUser(), that.getRealUser())
|
||||||
|
&& isEqual(this.getMessage(), that.getMessage());
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return this.getSequenceNumber();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -43,6 +43,7 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
|
import org.apache.hadoop.io.DataInputBuffer;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
|
@ -221,8 +222,51 @@ public class TestClientRMTokens {
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
} catch (YarnException e) {
|
} catch (YarnException e) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test new version token
|
||||||
|
// Stop the existing proxy, start another.
|
||||||
|
if (clientRMWithDT != null) {
|
||||||
|
RPC.stopProxy(clientRMWithDT);
|
||||||
|
clientRMWithDT = null;
|
||||||
|
}
|
||||||
|
token = getDelegationToken(loggedInUser, clientRMService,
|
||||||
|
loggedInUser.getShortUserName());
|
||||||
|
|
||||||
|
byte[] tokenIdentifierContent = token.getIdentifier().array();
|
||||||
|
RMDelegationTokenIdentifier tokenIdentifier = new RMDelegationTokenIdentifier();
|
||||||
|
|
||||||
|
DataInputBuffer dib = new DataInputBuffer();
|
||||||
|
dib.reset(tokenIdentifierContent, tokenIdentifierContent.length);
|
||||||
|
tokenIdentifier.readFields(dib);
|
||||||
|
|
||||||
|
// Construct new version RMDelegationTokenIdentifier with additional field
|
||||||
|
RMDelegationTokenIdentifierForTest newVersionTokenIdentifier =
|
||||||
|
new RMDelegationTokenIdentifierForTest(tokenIdentifier, "message");
|
||||||
|
|
||||||
|
Token<RMDelegationTokenIdentifier> newRMDTtoken =
|
||||||
|
new Token<RMDelegationTokenIdentifier>(newVersionTokenIdentifier,
|
||||||
|
rmDtSecretManager);
|
||||||
|
org.apache.hadoop.yarn.api.records.Token newToken =
|
||||||
|
BuilderUtils.newDelegationToken(
|
||||||
|
newRMDTtoken.getIdentifier(),
|
||||||
|
newRMDTtoken.getKind().toString(),
|
||||||
|
newRMDTtoken.getPassword(),
|
||||||
|
newRMDTtoken.getService().toString()
|
||||||
|
);
|
||||||
|
|
||||||
|
// Now try talking to RMService using the new version delegation token
|
||||||
|
clientRMWithDT = getClientRMProtocolWithDT(newToken,
|
||||||
|
clientRMService.getBindAddress(), "loginuser3", conf);
|
||||||
|
|
||||||
|
request = Records.newRecord(GetNewApplicationRequest.class);
|
||||||
|
|
||||||
|
try {
|
||||||
|
clientRMWithDT.getNewApplication(request);
|
||||||
|
} catch (IOException e) {
|
||||||
|
fail("Unexpected exception" + e);
|
||||||
|
} catch (YarnException e) {
|
||||||
|
fail("Unexpected exception" + e);
|
||||||
|
}
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
rmDtSecretManager.stopThreads();
|
rmDtSecretManager.stopThreads();
|
||||||
|
|
|
@ -0,0 +1,110 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.security;
|
||||||
|
|
||||||
|
import java.io.DataInput;
|
||||||
|
import java.io.DataInputStream;
|
||||||
|
import java.io.DataOutput;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnSecurityTestClientAMTokenProtos.ClientToAMTokenIdentifierForTestProto;
|
||||||
|
|
||||||
|
import com.google.protobuf.TextFormat;
|
||||||
|
|
||||||
|
public class ClientToAMTokenIdentifierForTest extends ClientToAMTokenIdentifier {
|
||||||
|
|
||||||
|
private ClientToAMTokenIdentifierForTestProto proto;
|
||||||
|
|
||||||
|
public ClientToAMTokenIdentifierForTest() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public ClientToAMTokenIdentifierForTest(
|
||||||
|
ClientToAMTokenIdentifier tokenIdentifier, String message) {
|
||||||
|
ClientToAMTokenIdentifierForTestProto.Builder builder =
|
||||||
|
ClientToAMTokenIdentifierForTestProto.newBuilder();
|
||||||
|
builder.setAppAttemptId(tokenIdentifier.getProto().getAppAttemptId());
|
||||||
|
builder.setClientName(tokenIdentifier.getProto().getClientName());
|
||||||
|
builder.setMessage(message);
|
||||||
|
proto = builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public ApplicationAttemptId getApplicationAttemptID() {
|
||||||
|
if (!proto.hasAppAttemptId()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return new ApplicationAttemptIdPBImpl(proto.getAppAttemptId());
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getClientName() {
|
||||||
|
return proto.getClientName();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(DataOutput out) throws IOException {
|
||||||
|
out.write(proto.toByteArray());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void readFields(DataInput in) throws IOException {
|
||||||
|
DataInputStream dis = (DataInputStream)in;
|
||||||
|
byte[] buffer = IOUtils.toByteArray(dis);
|
||||||
|
proto = ClientToAMTokenIdentifierForTestProto.parseFrom(buffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public UserGroupInformation getUser() {
|
||||||
|
String clientName = getClientName();
|
||||||
|
if (clientName == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return UserGroupInformation.createRemoteUser(clientName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return getNewProto().hashCode();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object other) {
|
||||||
|
if (other == null)
|
||||||
|
return false;
|
||||||
|
if (other.getClass().isAssignableFrom(this.getClass())) {
|
||||||
|
return this.getNewProto().equals(this.getClass().cast(other).getNewProto());
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ClientToAMTokenIdentifierForTestProto getNewProto() {
|
||||||
|
return proto;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return TextFormat.shortDebugString(getNewProto());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -32,6 +32,7 @@ import java.lang.annotation.Annotation;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.security.PrivilegedAction;
|
import java.security.PrivilegedAction;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
import javax.security.sasl.SaslException;
|
import javax.security.sasl.SaslException;
|
||||||
|
|
||||||
|
@ -39,6 +40,7 @@ import org.junit.Assert;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
|
import org.apache.hadoop.io.DataInputBuffer;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
|
@ -129,6 +131,7 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase {
|
||||||
private final byte[] secretKey;
|
private final byte[] secretKey;
|
||||||
private InetSocketAddress address;
|
private InetSocketAddress address;
|
||||||
private boolean pinged = false;
|
private boolean pinged = false;
|
||||||
|
private ClientToAMTokenSecretManager secretMgr;
|
||||||
|
|
||||||
public CustomAM(ApplicationAttemptId appId, byte[] secretKey) {
|
public CustomAM(ApplicationAttemptId appId, byte[] secretKey) {
|
||||||
super("CustomAM");
|
super("CustomAM");
|
||||||
|
@ -140,6 +143,10 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase {
|
||||||
public void ping() throws YarnException, IOException {
|
public void ping() throws YarnException, IOException {
|
||||||
this.pinged = true;
|
this.pinged = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
|
||||||
|
return secretMgr;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceStart() throws Exception {
|
protected void serviceStart() throws Exception {
|
||||||
|
@ -147,12 +154,13 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase {
|
||||||
|
|
||||||
Server server;
|
Server server;
|
||||||
try {
|
try {
|
||||||
|
secretMgr = new ClientToAMTokenSecretManager(
|
||||||
|
this.appAttemptId, secretKey);
|
||||||
server =
|
server =
|
||||||
new RPC.Builder(conf)
|
new RPC.Builder(conf)
|
||||||
.setProtocol(CustomProtocol.class)
|
.setProtocol(CustomProtocol.class)
|
||||||
.setNumHandlers(1)
|
.setNumHandlers(1)
|
||||||
.setSecretManager(
|
.setSecretManager(secretMgr)
|
||||||
new ClientToAMTokenSecretManager(this.appAttemptId, secretKey))
|
|
||||||
.setInstance(this).build();
|
.setInstance(this).build();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new YarnRuntimeException(e);
|
throw new YarnRuntimeException(e);
|
||||||
|
@ -279,6 +287,10 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase {
|
||||||
|
|
||||||
// Now for an authenticated user
|
// Now for an authenticated user
|
||||||
verifyValidToken(conf, am, token);
|
verifyValidToken(conf, am, token);
|
||||||
|
|
||||||
|
// Verify for a new version token
|
||||||
|
verifyNewVersionToken(conf, am, token, rm);
|
||||||
|
|
||||||
|
|
||||||
rm.stop();
|
rm.stop();
|
||||||
}
|
}
|
||||||
|
@ -352,6 +364,33 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void verifyNewVersionToken(final Configuration conf, final CustomAM am,
|
||||||
|
Token<ClientToAMTokenIdentifier> token, MockRM rm) throws IOException,
|
||||||
|
InterruptedException {
|
||||||
|
UserGroupInformation ugi;
|
||||||
|
ugi = UserGroupInformation.createRemoteUser("me");
|
||||||
|
|
||||||
|
Token<ClientToAMTokenIdentifier> newToken =
|
||||||
|
new Token<ClientToAMTokenIdentifier>(
|
||||||
|
new ClientToAMTokenIdentifierForTest(token.decodeIdentifier(), "message"),
|
||||||
|
am.getClientToAMTokenSecretManager());
|
||||||
|
newToken.setService(token.getService());
|
||||||
|
|
||||||
|
ugi.addToken(newToken);
|
||||||
|
|
||||||
|
ugi.doAs(new PrivilegedExceptionAction<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void run() throws Exception {
|
||||||
|
CustomProtocol client =
|
||||||
|
(CustomProtocol) RPC.getProxy(CustomProtocol.class, 1L, am.address,
|
||||||
|
conf);
|
||||||
|
client.ping();
|
||||||
|
Assert.assertTrue(am.pinged);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
private void verifyValidToken(final Configuration conf, final CustomAM am,
|
private void verifyValidToken(final Configuration conf, final CustomAM am,
|
||||||
Token<ClientToAMTokenIdentifier> token) throws IOException,
|
Token<ClientToAMTokenIdentifier> token) throws IOException,
|
||||||
InterruptedException {
|
InterruptedException {
|
||||||
|
|
|
@ -0,0 +1,43 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
option java_package = "org.apache.hadoop.yarn.proto";
|
||||||
|
option java_outer_classname = "YarnSecurityTestClientAMTokenProtos";
|
||||||
|
option java_generic_services = true;
|
||||||
|
option java_generate_equals_and_hash = true;
|
||||||
|
package hadoop.yarn;
|
||||||
|
|
||||||
|
import "yarn_protos.proto";
|
||||||
|
|
||||||
|
message ClientToAMTokenIdentifierForTestProto {
|
||||||
|
optional ApplicationAttemptIdProto appAttemptId = 1;
|
||||||
|
optional string clientName = 2;
|
||||||
|
optional string message = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message RMDelegationTokenIdentifierForTestProto {
|
||||||
|
optional string owner = 1;
|
||||||
|
optional string renewer = 2;
|
||||||
|
optional string realUser = 3;
|
||||||
|
optional int64 issueDate = 4;
|
||||||
|
optional int64 maxDate = 5;
|
||||||
|
optional int32 sequenceNumber = 6;
|
||||||
|
optional int32 masterKeyId = 7 [default = -1];
|
||||||
|
optional int64 renewDate = 8;
|
||||||
|
optional string message = 9;
|
||||||
|
}
|
Loading…
Reference in New Issue