YARN-2615. Changed ClientToAMTokenIdentifier/RM(Timeline)DelegationTokenIdentifier to use protobuf as payload. Contributed by Junping Du
(cherry picked from commit ea26cc0b4a
This commit is contained in:
@ -256,6 +256,9 @@ Release 2.6.0 - UNRELEASED
YARN-2562. Changed ContainerId#toString() to be more readable. (Tsuyoshi
OZAWA via jianhe)
YARN-2615. Changed ClientToAMTokenIdentifier/RM(Timeline)DelegationTokenIdentifier
to use protobuf as payload. (Junping Du via jianhe)
@ -23,7 +23,6 @@ import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
@ -33,7 +32,6 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.AMRMTokenIdentifierProto;
@ -80,9 +78,7 @@ public class AMRMTokenIdentifier extends TokenIdentifier {
public void readFields(DataInput in) throws IOException {
DataInputStream dis = (DataInputStream)in;
byte[] buffer = IOUtils.toByteArray(dis);
proto = AMRMTokenIdentifierProto.parseFrom(buffer);
proto = AMRMTokenIdentifierProto.parseFrom((DataInputStream)in);
@ -23,7 +23,6 @@ import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -33,17 +32,14 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto;
import com.google.protobuf.TextFormat;
@ -63,7 +59,6 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
public static final Text KIND = new Text("ContainerToken");
private ContainerTokenIdentifierProto proto;
private LogAggregationContext logAggregationContext;
public ContainerTokenIdentifier(ContainerId containerID,
String hostName, String appSubmitter, Resource r, long expiryTimeStamp,
@ -174,9 +169,7 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
public void readFields(DataInput in) throws IOException {
DataInputStream dis = (DataInputStream)in;
byte[] buffer = IOUtils.toByteArray(dis);
proto = ContainerTokenIdentifierProto.parseFrom(buffer);
proto = ContainerTokenIdentifierProto.parseFrom((DataInputStream)in);
@ -18,13 +18,11 @@
package org.apache.hadoop.yarn.security;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
@ -33,11 +31,9 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.NMTokenIdentifierProto;
import com.google.protobuf.TextFormat;
@ -103,9 +99,7 @@ public class NMTokenIdentifier extends TokenIdentifier {
public void readFields(DataInput in) throws IOException {
DataInputStream dis = (DataInputStream)in;
byte[] buffer = IOUtils.toByteArray(dis);
proto = NMTokenIdentifierProto.parseFrom(buffer);
proto = NMTokenIdentifierProto.parseFrom((DataInputStream)in);
@ -19,18 +19,21 @@
package org.apache.hadoop.yarn.security.client;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ClientToAMTokenIdentifierProto;
import com.google.protobuf.TextFormat;
@ -38,8 +41,7 @@ public class ClientToAMTokenIdentifier extends TokenIdentifier {
public static final Text KIND_NAME = new Text("YARN_CLIENT_TOKEN");
private ApplicationAttemptId applicationAttemptId;
private Text clientName = new Text();
private ClientToAMTokenIdentifierProto proto;
// TODO: Add more information in the tokenID such that it is not
// transferrable, more secure etc.
@ -48,34 +50,40 @@ public class ClientToAMTokenIdentifier extends TokenIdentifier {
public ClientToAMTokenIdentifier(ApplicationAttemptId id, String client) {
this.applicationAttemptId = id;
this.clientName = new Text(client);
ClientToAMTokenIdentifierProto.Builder builder =
if (id != null) {
if (client != null) {
proto = builder.build();
public ApplicationAttemptId getApplicationAttemptID() {
return this.applicationAttemptId;
if (!proto.hasAppAttemptId()) {
return null;
return new ApplicationAttemptIdPBImpl(proto.getAppAttemptId());
public String getClientName() {
return this.clientName.toString();
return proto.getClientName();
public ClientToAMTokenIdentifierProto getProto() {
return proto;
public void write(DataOutput out) throws IOException {
public void readFields(DataInput in) throws IOException {
this.applicationAttemptId =
ApplicationId.newInstance(in.readLong(), in.readInt()), in.readInt());
proto = ClientToAMTokenIdentifierProto.parseFrom((DataInputStream)in);
@ -85,17 +93,30 @@ public class ClientToAMTokenIdentifier extends TokenIdentifier {
public UserGroupInformation getUser() {
if (this.clientName == null) {
String clientName = getClientName();
if (clientName == null) {
return null;
return UserGroupInformation.createRemoteUser(this.clientName.toString());
return UserGroupInformation.createRemoteUser(clientName);
public static class Renewer extends Token.TrivialRenewer {
protected Text getKind() {
return KIND_NAME;
public int hashCode() {
return getProto().hashCode();
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
return false;
public String toString() {
return TextFormat.shortDebugString(getProto());
@ -29,16 +29,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
@ -48,12 +45,11 @@ import org.apache.hadoop.yarn.util.Records;
public class RMDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
public class RMDelegationTokenIdentifier extends YARNDelegationTokenIdentifier {
public static final Text KIND_NAME = new Text("RM_DELEGATION_TOKEN");
public RMDelegationTokenIdentifier() {
public RMDelegationTokenIdentifier(){}
* Create a new delegation token identifier
@ -23,11 +23,10 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
public class TimelineDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
public class TimelineDelegationTokenIdentifier extends YARNDelegationTokenIdentifier {
public static final Text KIND_NAME = new Text("TIMELINE_DELEGATION_TOKEN");
@ -0,0 +1,201 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
package org.apache.hadoop.yarn.security.client;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.HadoopKerberosName;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.YARNDelegationTokenIdentifierProto;
public abstract class YARNDelegationTokenIdentifier extends
AbstractDelegationTokenIdentifier {
YARNDelegationTokenIdentifierProto.Builder builder =
public YARNDelegationTokenIdentifier() {}
public YARNDelegationTokenIdentifier(Text owner, Text renewer, Text realUser) {
if (owner != null) {
if (renewer != null) {
HadoopKerberosName renewerKrbName = new HadoopKerberosName(renewer.toString());
try {
} catch (IOException e) {
throw new RuntimeException(e);
if (realUser != null) {
* Get the username encoded in the token identifier
* @return the username or owner
public UserGroupInformation getUser() {
String owner = getOwner() == null ? null : getOwner().toString();
String realUser = getRealUser() == null ? null: getRealUser().toString();
if ( (owner == null) || (owner.toString().isEmpty())) {
return null;
final UserGroupInformation realUgi;
final UserGroupInformation ugi;
if ((realUser == null) || (realUser.toString().isEmpty())
|| realUser.equals(owner)) {
ugi = realUgi = UserGroupInformation.createRemoteUser(owner.toString());
} else {
realUgi = UserGroupInformation.createRemoteUser(realUser.toString());
ugi = UserGroupInformation.createProxyUser(owner.toString(), realUgi);
return ugi;
public Text getOwner() {
String owner = builder.getOwner();
if (owner == null) {
return null;
} else {
return new Text(owner);
public Text getRenewer() {
String renewer = builder.getRenewer();
if (renewer == null) {
return null;
} else {
return new Text(renewer);
public Text getRealUser() {
String realUser = builder.getRealUser();
if (realUser == null) {
return null;
} else {
return new Text(realUser);
public void setIssueDate(long issueDate) {
public long getIssueDate() {
return builder.getIssueDate();
public void setRenewDate(long renewDate) {
public long getRenewDate() {
return builder.getRenewDate();
public void setMaxDate(long maxDate) {
public long getMaxDate() {
return builder.getMaxDate();
public void setSequenceNumber(int seqNum) {
public int getSequenceNumber() {
return builder.getSequenceNumber();
public void setMasterKeyId(int newId) {
public int getMasterKeyId() {
return builder.getMasterKeyId();
protected static boolean isEqual(Object a, Object b) {
return a == null ? b == null : a.equals(b);
public boolean equals(Object obj) {
if (obj == this) {
return true;
if (obj instanceof YARNDelegationTokenIdentifier) {
YARNDelegationTokenIdentifier that = (YARNDelegationTokenIdentifier) obj;
return this.getSequenceNumber() == that.getSequenceNumber()
&& this.getIssueDate() == that.getIssueDate()
&& this.getMaxDate() == that.getMaxDate()
&& this.getMasterKeyId() == that.getMasterKeyId()
&& isEqual(this.getOwner(), that.getOwner())
&& isEqual(this.getRenewer(), that.getRenewer())
&& isEqual(this.getRealUser(), that.getRealUser());
return false;
public int hashCode() {
return this.getSequenceNumber();
public void readFields(DataInput in) throws IOException {
builder.mergeFrom((DataInputStream) in);
public void write(DataOutput out) throws IOException {
public String toString() {
StringBuilder buffer = new StringBuilder();
.append("owner=" + getOwner() + ", renewer=" + getRenewer() + ", realUser="
+ getRealUser() + ", issueDate=" + getIssueDate()
+ ", maxDate=" + getMaxDate() + ", sequenceNumber="
+ getSequenceNumber() + ", masterKeyId="
+ getMasterKeyId());
return buffer.toString();
@ -51,3 +51,19 @@ message ContainerTokenIdentifierProto {
optional LogAggregationContextProto logAggregationContext = 10;
message ClientToAMTokenIdentifierProto {
optional ApplicationAttemptIdProto appAttemptId = 1;
optional string clientName = 2;
message YARNDelegationTokenIdentifierProto {
optional string owner = 1;
optional string renewer = 2;
optional string realUser = 3;
optional int64 issueDate = 4;
optional int64 maxDate = 5;
optional int32 sequenceNumber = 6;
optional int32 masterKeyId = 7 [default = -1];
optional int64 renewDate = 8;
@ -17,19 +17,26 @@
package org.apache.hadoop.yarn.security;
import java.io.IOException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.junit.Assert;
import org.junit.Test;
public class TestYARNTokenIdentifier {
public void testNMTokenIdentifier() {
public void testNMTokenIdentifier() throws IOException {
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(1, 1), 1);
NodeId nodeId = NodeId.newInstance("host0", 0);
@ -39,8 +46,12 @@ public class TestYARNTokenIdentifier {
NMTokenIdentifier token = new NMTokenIdentifier(
appAttemptId, nodeId, applicationSubmitter, masterKeyId);
NMTokenIdentifier anotherToken = new NMTokenIdentifier(
appAttemptId, nodeId, applicationSubmitter, masterKeyId);
NMTokenIdentifier anotherToken = new NMTokenIdentifier();
byte[] tokenContent = token.getBytes();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(tokenContent, tokenContent.length);
// verify the whole record equals with original record
Assert.assertEquals("Token is not the same after serialization " +
@ -65,15 +76,18 @@ public class TestYARNTokenIdentifier {
public void testAMRMTokenIdentifier() {
public void testAMRMTokenIdentifier() throws IOException {
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(1, 1), 1);
int masterKeyId = 1;
AMRMTokenIdentifier token = new AMRMTokenIdentifier(appAttemptId, masterKeyId);
AMRMTokenIdentifier anotherToken = new AMRMTokenIdentifier(
appAttemptId, masterKeyId);
AMRMTokenIdentifier anotherToken = new AMRMTokenIdentifier();
byte[] tokenContent = token.getBytes();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(tokenContent, tokenContent.length);
// verify the whole record equals with original record
Assert.assertEquals("Token is not the same after serialization " +
@ -87,7 +101,35 @@ public class TestYARNTokenIdentifier {
public void testContainerTokenIdentifier() {
public void testClientToAMTokenIdentifier() throws IOException {
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(1, 1), 1);
String clientName = "user";
ClientToAMTokenIdentifier token = new ClientToAMTokenIdentifier(
appAttemptId, clientName);
ClientToAMTokenIdentifier anotherToken = new ClientToAMTokenIdentifier();
byte[] tokenContent = token.getBytes();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(tokenContent, tokenContent.length);
// verify the whole record equals with original record
Assert.assertEquals("Token is not the same after serialization " +
"and deserialization.", token, anotherToken);
Assert.assertEquals("ApplicationAttemptId from proto is not the same with original token",
anotherToken.getApplicationAttemptID(), appAttemptId);
Assert.assertEquals("clientName from proto is not the same with original token",
anotherToken.getClientName(), clientName);
public void testContainerTokenIdentifier() throws IOException {
ContainerId containerID = ContainerId.newInstance(
1, 1), 1), 1);
@ -104,9 +146,12 @@ public class TestYARNTokenIdentifier {
containerID, hostName, appSubmitter, r, expiryTimeStamp,
masterKeyId, rmIdentifier, priority, creationTime);
ContainerTokenIdentifier anotherToken = new ContainerTokenIdentifier(
containerID, hostName, appSubmitter, r, expiryTimeStamp,
masterKeyId, rmIdentifier, priority, creationTime);
ContainerTokenIdentifier anotherToken = new ContainerTokenIdentifier();
byte[] tokenContent = token.getBytes();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(tokenContent, tokenContent.length);
// verify the whole record equals with original record
Assert.assertEquals("Token is not the same after serialization " +
@ -151,4 +196,112 @@ public class TestYARNTokenIdentifier {
public void testRMDelegationTokenIdentifier() throws IOException {
Text owner = new Text("user1");
Text renewer = new Text("user2");
Text realUser = new Text("user3");
long issueDate = 1;
long maxDate = 2;
int sequenceNumber = 3;
int masterKeyId = 4;
RMDelegationTokenIdentifier token =
new RMDelegationTokenIdentifier(owner, renewer, realUser);
RMDelegationTokenIdentifier anotherToken = new RMDelegationTokenIdentifier();
byte[] tokenContent = token.getBytes();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(tokenContent, tokenContent.length);
// verify the whole record equals with original record
Assert.assertEquals("Token is not the same after serialization " +
"and deserialization.", token, anotherToken);
Assert.assertEquals("owner from proto is not the same with original token",
anotherToken.getOwner(), owner);
Assert.assertEquals("renewer from proto is not the same with original token",
anotherToken.getRenewer(), renewer);
Assert.assertEquals("realUser from proto is not the same with original token",
anotherToken.getRealUser(), realUser);
Assert.assertEquals("issueDate from proto is not the same with original token",
anotherToken.getIssueDate(), issueDate);
Assert.assertEquals("maxDate from proto is not the same with original token",
anotherToken.getMaxDate(), maxDate);
Assert.assertEquals("sequenceNumber from proto is not the same with original token",
anotherToken.getSequenceNumber(), sequenceNumber);
Assert.assertEquals("masterKeyId from proto is not the same with original token",
anotherToken.getMasterKeyId(), masterKeyId);
public void testTimelineDelegationTokenIdentifier() throws IOException {
Text owner = new Text("user1");
Text renewer = new Text("user2");
Text realUser = new Text("user3");
long issueDate = 1;
long maxDate = 2;
int sequenceNumber = 3;
int masterKeyId = 4;
long renewDate = 5;
TimelineDelegationTokenIdentifier token =
new TimelineDelegationTokenIdentifier(owner, renewer, realUser);
TimelineDelegationTokenIdentifier anotherToken =
new TimelineDelegationTokenIdentifier();
byte[] tokenContent = token.getBytes();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(tokenContent, tokenContent.length);
// verify the whole record equals with original record
Assert.assertEquals("Token is not the same after serialization " +
"and deserialization.", token, anotherToken);
Assert.assertEquals("owner from proto is not the same with original token",
anotherToken.getOwner(), owner);
Assert.assertEquals("renewer from proto is not the same with original token",
anotherToken.getRenewer(), renewer);
Assert.assertEquals("realUser from proto is not the same with original token",
anotherToken.getRealUser(), realUser);
Assert.assertEquals("issueDate from proto is not the same with original token",
anotherToken.getIssueDate(), issueDate);
Assert.assertEquals("maxDate from proto is not the same with original token",
anotherToken.getMaxDate(), maxDate);
Assert.assertEquals("sequenceNumber from proto is not the same with original token",
anotherToken.getSequenceNumber(), sequenceNumber);
Assert.assertEquals("masterKeyId from proto is not the same with original token",
anotherToken.getMasterKeyId(), masterKeyId);
Assert.assertEquals("renewDate from proto is not the same with original token",
anotherToken.getRenewDate(), renewDate);
@ -291,6 +291,29 @@
@ -372,7 +372,7 @@ public class FileSystemRMStateStore extends RMStateStore {
} else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier();
long renewDate = fsIn.readLong();
long renewDate = identifier.getRenewDate();
} else {
@ -505,8 +505,8 @@ public class FileSystemRMStateStore extends RMStateStore {
DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream fsOut = new DataOutputStream(os);
if (isUpdate) {
LOG.info("Updating RMDelegationToken_" + identifier.getSequenceNumber());
updateFile(nodeCreatePath, os.toByteArray());
@ -530,7 +530,7 @@ public class ZKRMStateStore extends RMStateStore {
RMDelegationTokenIdentifier identifier =
new RMDelegationTokenIdentifier();
long renewDate = fsIn.readLong();
long renewDate = identifier.getRenewDate();
@ -776,8 +776,8 @@ public class ZKRMStateStore extends RMStateStore {
DataOutputStream seqOut = new DataOutputStream(seqOs);
try {
if (LOG.isDebugEnabled()) {
LOG.debug((isUpdate ? "Storing " : "Updating ") + "RMDelegationToken_" +
@ -0,0 +1,198 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.proto.YarnSecurityTestClientAMTokenProtos.RMDelegationTokenIdentifierForTestProto;
public class RMDelegationTokenIdentifierForTest extends
RMDelegationTokenIdentifier {
private RMDelegationTokenIdentifierForTestProto proto;
private RMDelegationTokenIdentifierForTestProto.Builder builder;
public RMDelegationTokenIdentifierForTest() {
public RMDelegationTokenIdentifierForTest(
RMDelegationTokenIdentifier token, String message) {
builder = RMDelegationTokenIdentifierForTestProto.newBuilder();
if (token.getOwner() != null) {
if (token.getRenewer() != null) {
if (token.getRealUser() != null) {
proto = builder.build();
builder = null;
public void write(DataOutput out) throws IOException {
public void readFields(DataInput in) throws IOException {
DataInputStream dis = (DataInputStream)in;
byte[] buffer = IOUtils.toByteArray(dis);
proto = RMDelegationTokenIdentifierForTestProto.parseFrom(buffer);
* Get the username encoded in the token identifier
* @return the username or owner
public UserGroupInformation getUser() {
String owner = getOwner().toString();
String realUser = getRealUser().toString();
if ( (owner == null) || (owner.toString().isEmpty())) {
return null;
final UserGroupInformation realUgi;
final UserGroupInformation ugi;
if ((realUser == null) || (realUser.toString().isEmpty())
|| realUser.equals(owner)) {
ugi = realUgi = UserGroupInformation.createRemoteUser(owner.toString());
} else {
realUgi = UserGroupInformation.createRemoteUser(realUser.toString());
ugi = UserGroupInformation.createProxyUser(owner.toString(), realUgi);
return ugi;
public Text getOwner() {
String owner = proto.getOwner();
if (owner == null) {
return null;
} else {
return new Text(owner);
public Text getRenewer() {
String renewer = proto.getRenewer();
if (renewer == null) {
return null;
} else {
return new Text(renewer);
public Text getRealUser() {
String realUser = proto.getRealUser();
if (realUser == null) {
return null;
} else {
return new Text(realUser);
public void setIssueDate(long issueDate) {
RMDelegationTokenIdentifierForTestProto.Builder builder =
proto = builder.build();
public long getIssueDate() {
return proto.getIssueDate();
public void setMaxDate(long maxDate) {
RMDelegationTokenIdentifierForTestProto.Builder builder =
proto = builder.build();
public long getMaxDate() {
return proto.getMaxDate();
public void setSequenceNumber(int seqNum) {
RMDelegationTokenIdentifierForTestProto.Builder builder =
proto = builder.build();
public int getSequenceNumber() {
return proto.getSequenceNumber();
public void setMasterKeyId(int newId) {
RMDelegationTokenIdentifierForTestProto.Builder builder =
proto = builder.build();
public int getMasterKeyId() {
return proto.getMasterKeyId();
public String getMessage() {
return proto.getMessage();
public boolean equals(Object obj) {
if (obj == this) {
return true;
if (obj instanceof RMDelegationTokenIdentifierForTest) {
RMDelegationTokenIdentifierForTest that = (RMDelegationTokenIdentifierForTest) obj;
return this.getSequenceNumber() == that.getSequenceNumber()
&& this.getIssueDate() == that.getIssueDate()
&& this.getMaxDate() == that.getMaxDate()
&& this.getMasterKeyId() == that.getMasterKeyId()
&& isEqual(this.getOwner(), that.getOwner())
&& isEqual(this.getRenewer(), that.getRenewer())
&& isEqual(this.getRealUser(), that.getRealUser())
&& isEqual(this.getMessage(), that.getMessage());
return false;
public int hashCode() {
return this.getSequenceNumber();
@ -43,6 +43,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
@ -222,7 +223,50 @@ public class TestClientRMTokens {
} catch (YarnException e) {
// Test new version token
// Stop the existing proxy, start another.
if (clientRMWithDT != null) {
clientRMWithDT = null;
token = getDelegationToken(loggedInUser, clientRMService,
byte[] tokenIdentifierContent = token.getIdentifier().array();
RMDelegationTokenIdentifier tokenIdentifier = new RMDelegationTokenIdentifier();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(tokenIdentifierContent, tokenIdentifierContent.length);
// Construct new version RMDelegationTokenIdentifier with additional field
RMDelegationTokenIdentifierForTest newVersionTokenIdentifier =
new RMDelegationTokenIdentifierForTest(tokenIdentifier, "message");
Token<RMDelegationTokenIdentifier> newRMDTtoken =
new Token<RMDelegationTokenIdentifier>(newVersionTokenIdentifier,
org.apache.hadoop.yarn.api.records.Token newToken =
// Now try talking to RMService using the new version delegation token
clientRMWithDT = getClientRMProtocolWithDT(newToken,
clientRMService.getBindAddress(), "loginuser3", conf);
request = Records.newRecord(GetNewApplicationRequest.class);
try {
} catch (IOException e) {
fail("Unexpected exception" + e);
} catch (YarnException e) {
fail("Unexpected exception" + e);
} finally {
@ -0,0 +1,110 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.yarn.server.resourcemanager.security;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
import org.apache.hadoop.yarn.proto.YarnSecurityTestClientAMTokenProtos.ClientToAMTokenIdentifierForTestProto;
import com.google.protobuf.TextFormat;
public class ClientToAMTokenIdentifierForTest extends ClientToAMTokenIdentifier {
private ClientToAMTokenIdentifierForTestProto proto;
public ClientToAMTokenIdentifierForTest() {
public ClientToAMTokenIdentifierForTest(
ClientToAMTokenIdentifier tokenIdentifier, String message) {
ClientToAMTokenIdentifierForTestProto.Builder builder =
proto = builder.build();
public ApplicationAttemptId getApplicationAttemptID() {
if (!proto.hasAppAttemptId()) {
return null;
return new ApplicationAttemptIdPBImpl(proto.getAppAttemptId());
public String getClientName() {
return proto.getClientName();
public void write(DataOutput out) throws IOException {
public void readFields(DataInput in) throws IOException {
DataInputStream dis = (DataInputStream)in;
byte[] buffer = IOUtils.toByteArray(dis);
proto = ClientToAMTokenIdentifierForTestProto.parseFrom(buffer);
public UserGroupInformation getUser() {
String clientName = getClientName();
if (clientName == null) {
return null;
return UserGroupInformation.createRemoteUser(clientName);
public int hashCode() {
return getNewProto().hashCode();
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getNewProto().equals(this.getClass().cast(other).getNewProto());
return false;
public ClientToAMTokenIdentifierForTestProto getNewProto() {
return proto;
public String toString() {
return TextFormat.shortDebugString(getNewProto());
@ -28,6 +28,7 @@ import java.lang.annotation.Annotation;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import javax.security.sasl.SaslException;
@ -35,6 +36,7 @@ import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
@ -115,6 +117,7 @@ public class TestClientToAMTokens {
private final byte[] secretKey;
private InetSocketAddress address;
private boolean pinged = false;
private ClientToAMTokenSecretManager secretMgr;
public CustomAM(ApplicationAttemptId appId, byte[] secretKey) {
@ -127,18 +130,23 @@ public class TestClientToAMTokens {
this.pinged = true;
public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
return secretMgr;
protected void serviceStart() throws Exception {
Configuration conf = getConfig();
Server server;
try {
secretMgr = new ClientToAMTokenSecretManager(
this.appAttemptId, secretKey);
server =
new RPC.Builder(conf)
new ClientToAMTokenSecretManager(this.appAttemptId, secretKey))
} catch (Exception e) {
throw new YarnRuntimeException(e);
@ -267,6 +275,12 @@ public class TestClientToAMTokens {
// Now for an authenticated user
verifyValidToken(conf, am, token);
// Verify for a new version token
verifyNewVersionToken(conf, am, token, rm);
private void verifyTokenWithTamperedID(final Configuration conf,
@ -338,6 +352,33 @@ public class TestClientToAMTokens {
private void verifyNewVersionToken(final Configuration conf, final CustomAM am,
Token<ClientToAMTokenIdentifier> token, MockRM rm) throws IOException,
InterruptedException {
UserGroupInformation ugi;
ugi = UserGroupInformation.createRemoteUser("me");
Token<ClientToAMTokenIdentifier> newToken =
new Token<ClientToAMTokenIdentifier>(
new ClientToAMTokenIdentifierForTest(token.decodeIdentifier(), "message"),
ugi.doAs(new PrivilegedExceptionAction<Void>() {
public Void run() throws Exception {
CustomProtocol client =
(CustomProtocol) RPC.getProxy(CustomProtocol.class, 1L, am.address,
return null;
private void verifyValidToken(final Configuration conf, final CustomAM am,
Token<ClientToAMTokenIdentifier> token) throws IOException,
InterruptedException {
@ -0,0 +1,43 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
option java_package = "org.apache.hadoop.yarn.proto";
option java_outer_classname = "YarnSecurityTestClientAMTokenProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;
import "yarn_protos.proto";
message ClientToAMTokenIdentifierForTestProto {
optional ApplicationAttemptIdProto appAttemptId = 1;
optional string clientName = 2;
optional string message = 3;
message RMDelegationTokenIdentifierForTestProto {
optional string owner = 1;
optional string renewer = 2;
optional string realUser = 3;
optional int64 issueDate = 4;
optional int64 maxDate = 5;
optional int32 sequenceNumber = 6;
optional int32 masterKeyId = 7 [default = -1];
optional int64 renewDate = 8;
optional string message = 9;
Reference in New Issue