diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 067776611d7..0a6500b3433 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -262,6 +262,9 @@ Release 2.6.0 - UNRELEASED YARN-2372. There are Chinese Characters in the FairScheduler's document (Fengdong Yu via aw) + YARN-668. Changed NMTokenIdentifier/AMRMTokenIdentifier/ContainerTokenIdentifier + to use protobuf object as the payload. (Junping Du via jianhe) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml index b21ea520911..cb4df3a4049 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml @@ -131,4 +131,39 @@ + + + + + org.apache.hadoop + hadoop-maven-plugins + + + compile-protoc + generate-sources + + protoc + + + ${protobuf.version} + ${protoc.path} + + ${basedir}/src/test/proto + ${basedir}/../../../hadoop-common-project/hadoop-common/src/main/proto + ${basedir}/../hadoop-yarn-api/src/main/proto + + + ${basedir}/src/test/proto + + test_amrm_token.proto + + + ${project.build.directory}/generated-sources/java + + + + + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/AMRMTokenIdentifierForTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/AMRMTokenIdentifierForTest.java new file mode 100644 index 00000000000..f94f0c8edd1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/AMRMTokenIdentifierForTest.java @@ -0,0 +1,130 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.client.api.impl; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnSecurityTestAMRMTokenProtos.AMRMTokenIdentifierForTestProto; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; + +import com.google.protobuf.TextFormat; + +public class AMRMTokenIdentifierForTest extends AMRMTokenIdentifier { + + private static Log LOG = LogFactory.getLog(AMRMTokenIdentifierForTest.class); + + public static final Text KIND = new Text("YARN_AM_RM_TOKEN"); + + private AMRMTokenIdentifierForTestProto proto; + private AMRMTokenIdentifierForTestProto.Builder builder; + + public AMRMTokenIdentifierForTest(){ + builder = AMRMTokenIdentifierForTestProto.newBuilder(); + } + + public AMRMTokenIdentifierForTest(AMRMTokenIdentifierForTestProto proto) { + this.proto = proto; + } + + public AMRMTokenIdentifierForTest(AMRMTokenIdentifier tokenIdentifier, + String message) { + builder = AMRMTokenIdentifierForTestProto.newBuilder(); + builder.setAppAttemptId(tokenIdentifier.getProto().getAppAttemptId()); + builder.setKeyId(tokenIdentifier.getKeyId()); + builder.setMessage(message); + proto = builder.build(); + builder = null; + } + + @Override + public void write(DataOutput out) throws IOException { + out.write(proto.toByteArray()); + } + + @Override + public void readFields(DataInput in) throws IOException { + DataInputStream dis = (DataInputStream)in; + byte[] buffer = IOUtils.toByteArray(dis); + proto = AMRMTokenIdentifierForTestProto.parseFrom(buffer); + } + + @Override + public Text getKind() { + return KIND; + } + + public String getMessage() { + return proto.getMessage(); + } + + public void setMessage(String message) { + builder.setMessage(message); + } + + public void build() { + proto = builder.build(); + builder = null; + } + + public ApplicationAttemptId getApplicationAttemptId() { + return new ApplicationAttemptIdPBImpl(proto.getAppAttemptId()); + } + + public int getKeyId() { + return proto.getKeyId(); + } + + public AMRMTokenIdentifierForTestProto getNewProto(){ + return this.proto; + } + + @Override + public int hashCode() { + return this.proto.hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getNewProto().equals(this.getClass().cast(other).getNewProto()); + } + return false; + } + + + @Override + public String toString() { + return TextFormat.shortDebugString(this.proto); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index a434e35a9f2..4921452375e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -41,6 +41,7 @@ import java.util.TreeSet; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; @@ -908,6 +909,36 @@ public class TestAMRMClient { // can do the allocate call with latest AMRMToken amClient.allocate(0.1f); + + // Verify latest AMRMToken can be used to send allocation request. + UserGroupInformation testUser1 = + UserGroupInformation.createRemoteUser("testUser1"); + + AMRMTokenIdentifierForTest newVersionTokenIdentifier = + new AMRMTokenIdentifierForTest(amrmToken_2.decodeIdentifier(), "message"); + + Assert.assertEquals("Message is changed after set to newVersionTokenIdentifier", + "message", newVersionTokenIdentifier.getMessage()); + org.apache.hadoop.security.token.Token newVersionToken = + new org.apache.hadoop.security.token.Token ( + newVersionTokenIdentifier.getBytes(), + amrmTokenSecretManager.retrievePassword(newVersionTokenIdentifier), + newVersionTokenIdentifier.getKind(), new Text()); + + SecurityUtil.setTokenService(newVersionToken, yarnCluster + .getResourceManager().getApplicationMasterService().getBindAddress()); + testUser1.addToken(newVersionToken); + + + testUser1.doAs(new PrivilegedAction() { + @Override + public ApplicationMasterProtocol run() { + return (ApplicationMasterProtocol) YarnRPC.create(conf).getProxy( + ApplicationMasterProtocol.class, + yarnCluster.getResourceManager().getApplicationMasterService() + .getBindAddress(), conf); + } + }).allocate(Records.newRecord(AllocateRequest.class)); // Make sure previous token has been rolled-over // and can not use this rolled-over token to make a allocate all. @@ -931,12 +962,12 @@ public class TestAMRMClient { } try { - UserGroupInformation testUser = - UserGroupInformation.createRemoteUser("testUser"); + UserGroupInformation testUser2 = + UserGroupInformation.createRemoteUser("testUser2"); SecurityUtil.setTokenService(amrmToken_2, yarnCluster .getResourceManager().getApplicationMasterService().getBindAddress()); - testUser.addToken(amrmToken_2); - testUser.doAs(new PrivilegedAction() { + testUser2.addToken(amrmToken_2); + testUser2.doAs(new PrivilegedAction() { @Override public ApplicationMasterProtocol run() { return (ApplicationMasterProtocol) YarnRPC.create(conf).getProxy( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/proto/test_amrm_token.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/proto/test_amrm_token.proto new file mode 100644 index 00000000000..6773277e783 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/proto/test_amrm_token.proto @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +option java_package = "org.apache.hadoop.yarn.proto"; +option java_outer_classname = "YarnSecurityTestAMRMTokenProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +package hadoop.yarn; + +import "yarn_protos.proto"; + +message AMRMTokenIdentifierForTestProto { + optional ApplicationAttemptIdProto appAttemptId = 1; + optional int32 keyId = 2; + optional string message = 3; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml index 04cf50debff..3adfe8b7881 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml @@ -232,6 +232,30 @@ + + compile-protoc + generate-sources + + protoc + + + ${protobuf.version} + ${protoc.path} + + ${basedir}/../../../hadoop-common-project/hadoop-common/src/main/proto + ${basedir}/../hadoop-yarn-api/src/main/proto + ${basedir}/src/main/proto + + + ${basedir}/src/main/proto/server + + yarn_security_token.proto + + + ${project.build.directory}/generated-sources/java + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java index bc2d7c5624a..84fce5eb51f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java @@ -19,9 +19,11 @@ package org.apache.hadoop.yarn.security; import java.io.DataInput; +import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -32,6 +34,10 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.AMRMTokenIdentifierProto; + +import com.google.protobuf.TextFormat; /** * AMRMTokenIdentifier is the TokenIdentifier to be used by @@ -42,49 +48,41 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; public class AMRMTokenIdentifier extends TokenIdentifier { public static final Text KIND_NAME = new Text("YARN_AM_RM_TOKEN"); - - private ApplicationAttemptId applicationAttemptId; - private int keyId = Integer.MIN_VALUE; + private AMRMTokenIdentifierProto proto; public AMRMTokenIdentifier() { } - - public AMRMTokenIdentifier(ApplicationAttemptId appAttemptId) { - this(); - this.applicationAttemptId = appAttemptId; - } - + public AMRMTokenIdentifier(ApplicationAttemptId appAttemptId, int masterKeyId) { - this(); - this.applicationAttemptId = appAttemptId; - this.keyId = masterKeyId; + AMRMTokenIdentifierProto.Builder builder = + AMRMTokenIdentifierProto.newBuilder(); + if (appAttemptId != null) { + builder.setAppAttemptId( + ((ApplicationAttemptIdPBImpl)appAttemptId).getProto()); + } + builder.setKeyId(masterKeyId); + proto = builder.build(); } @Private public ApplicationAttemptId getApplicationAttemptId() { - return this.applicationAttemptId; + if (!proto.hasAppAttemptId()) { + return null; + } + return new ApplicationAttemptIdPBImpl(proto.getAppAttemptId()); } @Override public void write(DataOutput out) throws IOException { - ApplicationId appId = this.applicationAttemptId.getApplicationId(); - out.writeLong(appId.getClusterTimestamp()); - out.writeInt(appId.getId()); - out.writeInt(this.applicationAttemptId.getAttemptId()); - out.writeInt(this.keyId); + out.write(proto.toByteArray()); } @Override public void readFields(DataInput in) throws IOException { - long clusterTimeStamp = in.readLong(); - int appId = in.readInt(); - int attemptId = in.readInt(); - ApplicationId applicationId = - ApplicationId.newInstance(clusterTimeStamp, appId); - this.applicationAttemptId = - ApplicationAttemptId.newInstance(applicationId, attemptId); - this.keyId = in.readInt(); + DataInputStream dis = (DataInputStream)in; + byte[] buffer = IOUtils.toByteArray(dis); + proto = AMRMTokenIdentifierProto.parseFrom(buffer); } @Override @@ -94,16 +92,20 @@ public class AMRMTokenIdentifier extends TokenIdentifier { @Override public UserGroupInformation getUser() { - if (this.applicationAttemptId == null - || "".equals(this.applicationAttemptId.toString())) { - return null; + String appAttemptId = null; + if (proto.hasAppAttemptId()) { + appAttemptId = + new ApplicationAttemptIdPBImpl(proto.getAppAttemptId()).toString(); } - return UserGroupInformation.createRemoteUser(this.applicationAttemptId - .toString()); + return UserGroupInformation.createRemoteUser(appAttemptId); } public int getKeyId() { - return this.keyId; + return proto.getKeyId(); + } + + public AMRMTokenIdentifierProto getProto() { + return this.proto; } // TODO: Needed? @@ -114,4 +116,24 @@ public class AMRMTokenIdentifier extends TokenIdentifier { return KIND_NAME; } } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java index 0bb016a0533..e61f07c2634 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java @@ -19,9 +19,11 @@ package org.apache.hadoop.yarn.security; import java.io.DataInput; +import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; +import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -38,7 +40,14 @@ import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto; +import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto; + +import com.google.protobuf.TextFormat; + /** * TokenIdentifier for a container. Encodes {@link ContainerId}, @@ -53,15 +62,7 @@ public class ContainerTokenIdentifier extends TokenIdentifier { public static final Text KIND = new Text("ContainerToken"); - private ContainerId containerId; - private String nmHostAddr; - private String appSubmitter; - private Resource resource; - private long expiryTimeStamp; - private int masterKeyId; - private long rmIdentifier; - private Priority priority; - private long creationTime; + private ContainerTokenIdentifierProto proto; private LogAggregationContext logAggregationContext; public ContainerTokenIdentifier(ContainerId containerID, @@ -75,16 +76,29 @@ public class ContainerTokenIdentifier extends TokenIdentifier { String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId, long rmIdentifier, Priority priority, long creationTime, LogAggregationContext logAggregationContext) { - this.containerId = containerID; - this.nmHostAddr = hostName; - this.appSubmitter = appSubmitter; - this.resource = r; - this.expiryTimeStamp = expiryTimeStamp; - this.masterKeyId = masterKeyId; - this.rmIdentifier = rmIdentifier; - this.priority = priority; - this.creationTime = creationTime; - this.logAggregationContext = logAggregationContext; + ContainerTokenIdentifierProto.Builder builder = + ContainerTokenIdentifierProto.newBuilder(); + if (containerID != null) { + builder.setContainerId(((ContainerIdPBImpl)containerID).getProto()); + } + builder.setNmHostAddr(hostName); + builder.setAppSubmitter(appSubmitter); + if (r != null) { + builder.setResource(((ResourcePBImpl)r).getProto()); + } + builder.setExpiryTimeStamp(expiryTimeStamp); + builder.setMasterKeyId(masterKeyId); + builder.setRmIdentifier(rmIdentifier); + if (priority != null) { + builder.setPriority(((PriorityPBImpl)priority).getProto()); + } + builder.setCreationTime(creationTime); + + if (logAggregationContext != null) { + builder.setLogAggregationContext( + ((LogAggregationContextPBImpl)logAggregationContext).getProto()); + } + proto = builder.build(); } /** @@ -94,104 +108,75 @@ public class ContainerTokenIdentifier extends TokenIdentifier { } public ContainerId getContainerID() { - return this.containerId; + if (!proto.hasContainerId()) { + return null; + } + return new ContainerIdPBImpl(proto.getContainerId()); } public String getApplicationSubmitter() { - return this.appSubmitter; + return proto.getAppSubmitter(); } public String getNmHostAddress() { - return this.nmHostAddr; + return proto.getNmHostAddr(); } public Resource getResource() { - return this.resource; + if (!proto.hasResource()) { + return null; + } + return new ResourcePBImpl(proto.getResource()); } public long getExpiryTimeStamp() { - return this.expiryTimeStamp; + return proto.getExpiryTimeStamp(); } public int getMasterKeyId() { - return this.masterKeyId; + return proto.getMasterKeyId(); } public Priority getPriority() { - return this.priority; + if (!proto.hasPriority()) { + return null; + } + return new PriorityPBImpl(proto.getPriority()); } public long getCreationTime() { - return this.creationTime; + return proto.getCreationTime(); } /** * Get the RMIdentifier of RM in which containers are allocated * @return RMIdentifier */ - public long getRMIdentifer() { - return this.rmIdentifier; + public long getRMIdentifier() { + return proto.getRmIdentifier(); + } + + public ContainerTokenIdentifierProto getProto() { + return proto; } public LogAggregationContext getLogAggregationContext() { - return this.logAggregationContext; + if (!proto.hasLogAggregationContext()) { + return null; + } + return new LogAggregationContextPBImpl(proto.getLogAggregationContext()); } @Override public void write(DataOutput out) throws IOException { LOG.debug("Writing ContainerTokenIdentifier to RPC layer: " + this); - ApplicationAttemptId applicationAttemptId = this.containerId - .getApplicationAttemptId(); - ApplicationId applicationId = applicationAttemptId.getApplicationId(); - out.writeLong(applicationId.getClusterTimestamp()); - out.writeInt(applicationId.getId()); - out.writeInt(applicationAttemptId.getAttemptId()); - out.writeLong(this.containerId.getContainerId()); - out.writeUTF(this.nmHostAddr); - out.writeUTF(this.appSubmitter); - out.writeInt(this.resource.getMemory()); - out.writeInt(this.resource.getVirtualCores()); - out.writeLong(this.expiryTimeStamp); - out.writeInt(this.masterKeyId); - out.writeLong(this.rmIdentifier); - out.writeInt(this.priority.getPriority()); - out.writeLong(this.creationTime); - if (this.logAggregationContext == null) { - out.writeInt(-1); - } else { - byte[] logAggregationContext = - ((LogAggregationContextPBImpl) this.logAggregationContext).getProto() - .toByteArray(); - out.writeInt(logAggregationContext.length); - out.write(logAggregationContext); - } + out.write(proto.toByteArray()); } @Override public void readFields(DataInput in) throws IOException { - ApplicationId applicationId = - ApplicationId.newInstance(in.readLong(), in.readInt()); - ApplicationAttemptId applicationAttemptId = - ApplicationAttemptId.newInstance(applicationId, in.readInt()); - this.containerId = - ContainerId.newInstance(applicationAttemptId, in.readLong()); - this.nmHostAddr = in.readUTF(); - this.appSubmitter = in.readUTF(); - int memory = in.readInt(); - int vCores = in.readInt(); - this.resource = Resource.newInstance(memory, vCores); - this.expiryTimeStamp = in.readLong(); - this.masterKeyId = in.readInt(); - this.rmIdentifier = in.readLong(); - this.priority = Priority.newInstance(in.readInt()); - this.creationTime = in.readLong(); - int size = in.readInt(); - if (size != -1) { - byte[] bytes = new byte[size]; - in.readFully(bytes); - this.logAggregationContext = - new LogAggregationContextPBImpl( - LogAggregationContextProto.parseFrom(bytes)); - } + DataInputStream dis = (DataInputStream)in; + byte[] buffer = IOUtils.toByteArray(dis); + proto = ContainerTokenIdentifierProto.parseFrom(buffer); } @Override @@ -201,7 +186,12 @@ public class ContainerTokenIdentifier extends TokenIdentifier { @Override public UserGroupInformation getUser() { - return UserGroupInformation.createRemoteUser(this.containerId.toString()); + String containerId = null; + if (proto.hasContainerId()) { + containerId = new ContainerIdPBImpl(proto.getContainerId()).toString(); + } + return UserGroupInformation.createRemoteUser( + containerId); } // TODO: Needed? @@ -212,4 +202,24 @@ public class ContainerTokenIdentifier extends TokenIdentifier { return KIND; } } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java index 55ddd0ac632..25670fa35c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java @@ -18,10 +18,13 @@ package org.apache.hadoop.yarn.security; +import java.io.ByteArrayInputStream; import java.io.DataInput; +import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; +import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -32,6 +35,12 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl; +import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.NMTokenIdentifierProto; + +import com.google.protobuf.TextFormat; @Public @Evolving @@ -41,17 +50,21 @@ public class NMTokenIdentifier extends TokenIdentifier { public static final Text KIND = new Text("NMToken"); - private ApplicationAttemptId appAttemptId; - private NodeId nodeId; - private String appSubmitter; - private int keyId; + private NMTokenIdentifierProto proto; - public NMTokenIdentifier(ApplicationAttemptId appAttemptId, NodeId nodeId, - String applicationSubmitter, int masterKeyId) { - this.appAttemptId = appAttemptId; - this.nodeId = nodeId; - this.appSubmitter = applicationSubmitter; - this.keyId = masterKeyId; + public NMTokenIdentifier(ApplicationAttemptId appAttemptId, + NodeId nodeId, String applicationSubmitter, int masterKeyId) { + NMTokenIdentifierProto.Builder builder = NMTokenIdentifierProto.newBuilder(); + if (appAttemptId != null) { + builder.setAppAttemptId( + ((ApplicationAttemptIdPBImpl)appAttemptId).getProto()); + } + if (nodeId != null) { + builder.setNodeId(((NodeIdPBImpl)nodeId).getProto()); + } + builder.setAppSubmitter(applicationSubmitter); + builder.setKeyId(masterKeyId); + proto = builder.build(); } /** @@ -61,43 +74,38 @@ public class NMTokenIdentifier extends TokenIdentifier { } public ApplicationAttemptId getApplicationAttemptId() { - return appAttemptId; + if (!proto.hasAppAttemptId()) { + return null; + } + return new ApplicationAttemptIdPBImpl(proto.getAppAttemptId()); } public NodeId getNodeId() { - return nodeId; + if (!proto.hasNodeId()) { + return null; + } + return new NodeIdPBImpl(proto.getNodeId()); } public String getApplicationSubmitter() { - return appSubmitter; + return proto.getAppSubmitter(); } public int getKeyId() { - return keyId; + return proto.getKeyId(); } @Override public void write(DataOutput out) throws IOException { LOG.debug("Writing NMTokenIdentifier to RPC layer: " + this); - ApplicationId applicationId = appAttemptId.getApplicationId(); - out.writeLong(applicationId.getClusterTimestamp()); - out.writeInt(applicationId.getId()); - out.writeInt(appAttemptId.getAttemptId()); - out.writeUTF(this.nodeId.toString()); - out.writeUTF(this.appSubmitter); - out.writeInt(this.keyId); + out.write(proto.toByteArray()); } @Override public void readFields(DataInput in) throws IOException { - appAttemptId = - ApplicationAttemptId.newInstance( - ApplicationId.newInstance(in.readLong(), in.readInt()), - in.readInt()); - String[] hostAddr = in.readUTF().split(":"); - nodeId = NodeId.newInstance(hostAddr[0], Integer.parseInt(hostAddr[1])); - appSubmitter = in.readUTF(); - keyId = in.readInt(); + DataInputStream dis = (DataInputStream)in; + byte[] buffer = IOUtils.toByteArray(dis); + proto = NMTokenIdentifierProto.parseFrom(buffer); } @Override @@ -107,6 +115,35 @@ public class NMTokenIdentifier extends TokenIdentifier { @Override public UserGroupInformation getUser() { - return UserGroupInformation.createRemoteUser(appAttemptId.toString()); + String appAttemptId = null; + if (proto.hasAppAttemptId()) { + appAttemptId = new ApplicationAttemptIdPBImpl( + proto.getAppAttemptId()).toString(); + } + return UserGroupInformation.createRemoteUser(appAttemptId); + } + + public NMTokenIdentifierProto getProto() { + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto new file mode 100644 index 00000000000..845873f57ec --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +option java_package = "org.apache.hadoop.yarn.proto"; +option java_outer_classname = "YarnSecurityTokenProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +package hadoop.yarn; + +import "yarn_protos.proto"; + +// None of the following records are supposed to be exposed to users. + +message NMTokenIdentifierProto { + optional ApplicationAttemptIdProto appAttemptId = 1; + optional NodeIdProto nodeId = 2; + optional string appSubmitter = 3; + optional int32 keyId = 4 [default = -1]; +} + +message AMRMTokenIdentifierProto { + optional ApplicationAttemptIdProto appAttemptId = 1; + optional int32 keyId = 2 [default = -1]; +} + +message ContainerTokenIdentifierProto { + optional ContainerIdProto containerId = 1; + optional string nmHostAddr = 2; + optional string appSubmitter = 3; + optional ResourceProto resource = 4; + optional int64 expiryTimeStamp =5; + optional int32 masterKeyId = 6 [default = -1]; + optional int64 rmIdentifier = 7; + optional PriorityProto priority = 8; + optional int64 creationTime = 9; + optional LogAggregationContextProto logAggregationContext = 10; +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/security/TestYARNTokenIdentifier.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/security/TestYARNTokenIdentifier.java new file mode 100644 index 00000000000..3c0d5d1d03f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/security/TestYARNTokenIdentifier.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.security; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.junit.Assert; +import org.junit.Test; + +public class TestYARNTokenIdentifier { + + @Test + public void testNMTokenIdentifier() { + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(1, 1), 1); + NodeId nodeId = NodeId.newInstance("host0", 0); + String applicationSubmitter = "usr0"; + int masterKeyId = 1; + + NMTokenIdentifier token = new NMTokenIdentifier( + appAttemptId, nodeId, applicationSubmitter, masterKeyId); + + NMTokenIdentifier anotherToken = new NMTokenIdentifier( + appAttemptId, nodeId, applicationSubmitter, masterKeyId); + + // verify the whole record equals with original record + Assert.assertEquals("Token is not the same after serialization " + + "and deserialization.", token, anotherToken); + + // verify all properties are the same as original + Assert.assertEquals( + "appAttemptId from proto is not the same with original token", + anotherToken.getApplicationAttemptId(), appAttemptId); + + Assert.assertEquals( + "NodeId from proto is not the same with original token", + anotherToken.getNodeId(), nodeId); + + Assert.assertEquals( + "applicationSubmitter from proto is not the same with original token", + anotherToken.getApplicationSubmitter(), applicationSubmitter); + + Assert.assertEquals( + "masterKeyId from proto is not the same with original token", + anotherToken.getKeyId(), masterKeyId); + } + + @Test + public void testAMRMTokenIdentifier() { + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(1, 1), 1); + int masterKeyId = 1; + + AMRMTokenIdentifier token = new AMRMTokenIdentifier(appAttemptId, masterKeyId); + + AMRMTokenIdentifier anotherToken = new AMRMTokenIdentifier( + appAttemptId, masterKeyId); + + // verify the whole record equals with original record + Assert.assertEquals("Token is not the same after serialization " + + "and deserialization.", token, anotherToken); + + Assert.assertEquals("ApplicationAttemptId from proto is not the same with original token", + anotherToken.getApplicationAttemptId(), appAttemptId); + + Assert.assertEquals("masterKeyId from proto is not the same with original token", + anotherToken.getKeyId(), masterKeyId); + } + + @Test + public void testContainerTokenIdentifier() { + ContainerId containerID = ContainerId.newInstance( + ApplicationAttemptId.newInstance(ApplicationId.newInstance( + 1, 1), 1), 1); + String hostName = "host0"; + String appSubmitter = "usr0"; + Resource r = Resource.newInstance(1024, 1); + long expiryTimeStamp = 1000; + int masterKeyId = 1; + long rmIdentifier = 1; + Priority priority = Priority.newInstance(1); + long creationTime = 1000; + + ContainerTokenIdentifier token = new ContainerTokenIdentifier( + containerID, hostName, appSubmitter, r, expiryTimeStamp, + masterKeyId, rmIdentifier, priority, creationTime); + + ContainerTokenIdentifier anotherToken = new ContainerTokenIdentifier( + containerID, hostName, appSubmitter, r, expiryTimeStamp, + masterKeyId, rmIdentifier, priority, creationTime); + + // verify the whole record equals with original record + Assert.assertEquals("Token is not the same after serialization " + + "and deserialization.", token, anotherToken); + + Assert.assertEquals( + "ContainerID from proto is not the same with original token", + anotherToken.getContainerID(), containerID); + + Assert.assertEquals( + "Hostname from proto is not the same with original token", + anotherToken.getNmHostAddress(), hostName); + + Assert.assertEquals( + "ApplicationSubmitter from proto is not the same with original token", + anotherToken.getApplicationSubmitter(), appSubmitter); + + Assert.assertEquals( + "Resource from proto is not the same with original token", + anotherToken.getResource(), r); + + Assert.assertEquals( + "expiryTimeStamp from proto is not the same with original token", + anotherToken.getExpiryTimeStamp(), expiryTimeStamp); + + Assert.assertEquals( + "KeyId from proto is not the same with original token", + anotherToken.getMasterKeyId(), masterKeyId); + + Assert.assertEquals( + "RMIdentifier from proto is not the same with original token", + anotherToken.getRMIdentifier(), rmIdentifier); + + Assert.assertEquals( + "Priority from proto is not the same with original token", + anotherToken.getPriority(), priority); + + Assert.assertEquals( + "CreationTime from proto is not the same with original token", + anotherToken.getCreationTime(), creationTime); + + Assert.assertNull(anotherToken.getLogAggregationContext()); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 17c9e3e26b1..e8001ffc628 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -133,6 +133,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import com.google.common.annotations.VisibleForTesting; @@ -650,8 +651,8 @@ public class ContainerManagerImpl extends CompositeService implements boolean unauthorized = false; StringBuilder messageBuilder = new StringBuilder("Unauthorized request to start container. "); - if (!nmTokenIdentifier.getApplicationAttemptId().getApplicationId().equals( - containerId.getApplicationAttemptId().getApplicationId())) { + if (!nmTokenIdentifier.getApplicationAttemptId().getApplicationId(). + equals(containerId.getApplicationAttemptId().getApplicationId())) { unauthorized = true; messageBuilder.append("\nNMToken for application attempt : ") .append(nmTokenIdentifier.getApplicationAttemptId()) @@ -784,7 +785,7 @@ public class ContainerManagerImpl extends CompositeService implements */ authorizeStartRequest(nmTokenIdentifier, containerTokenIdentifier); - if (containerTokenIdentifier.getRMIdentifer() != nodeStatusUpdater + if (containerTokenIdentifier.getRMIdentifier() != nodeStatusUpdater .getRMIdentifier()) { // Is the container coming from unknown RM StringBuilder sb = new StringBuilder("\nContainer "); @@ -1035,9 +1036,10 @@ public class ContainerManagerImpl extends CompositeService implements */ ApplicationId nmTokenAppId = identifier.getApplicationAttemptId().getApplicationId(); + if ((!nmTokenAppId.equals(containerId.getApplicationAttemptId().getApplicationId())) || (container != null && !nmTokenAppId.equals(container - .getContainerId().getApplicationAttemptId().getApplicationId()))) { + .getContainerId().getApplicationAttemptId().getApplicationId()))) { if (stopRequest) { LOG.warn(identifier.getApplicationAttemptId() + " attempted to stop non-application container : " diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index a813e9806c8..8f7fa782996 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -528,7 +528,7 @@ public class TestContainer { public boolean matches(Object o) { ContainersLauncherEvent evt = (ContainersLauncherEvent) o; return evt.getType() == ContainersLauncherEventType.LAUNCH_CONTAINER - && wcf.cId == evt.getContainer().getContainerId(); + && wcf.cId.equals(evt.getContainer().getContainerId()); } }; verify(wc.launcherBus).handle(argThat(matchesLaunchReq)); @@ -751,7 +751,7 @@ public class TestContainer { String host = "127.0.0.1"; int port = 1234; long currentTime = System.currentTimeMillis(); - ContainerTokenIdentifier identifier = + ContainerTokenIdentifier identifier = new ContainerTokenIdentifier(cId, "127.0.0.1", user, resource, currentTime + 10000L, 123, currentTime, Priority.newInstance(0), 0); Token token = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index 3508a3cf762..a0a50efe5cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -99,7 +99,7 @@ public class TestApplicationMasterService { ContainerTokenIdentifier tokenId = BuilderUtils.newContainerTokenIdentifier(allocatedContainer .getContainerToken()); - Assert.assertEquals(MockRM.getClusterTimeStamp(), tokenId.getRMIdentifer()); + Assert.assertEquals(MockRM.getClusterTimeStamp(), tokenId.getRMIdentifier()); rm.stop(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml index 30f334d0756..2ac274d016b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml @@ -113,6 +113,35 @@ + + org.apache.hadoop + hadoop-maven-plugins + + + compile-protoc + generate-sources + + protoc + + + ${protobuf.version} + ${protoc.path} + + ${basedir}/src/test/proto + ${basedir}/../../../../hadoop-common-project/hadoop-common/src/main/proto + ${basedir}/../../hadoop-yarn-api/src/main/proto + + + ${basedir}/src/test/proto + + test_token.proto + + + ${project.build.directory}/generated-sources/java + + + + maven-jar-plugin diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/ContainerTokenIdentifierForTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/ContainerTokenIdentifierForTest.java new file mode 100644 index 00000000000..7e8004d410e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/ContainerTokenIdentifierForTest.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LogAggregationContext; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto; +import org.apache.hadoop.yarn.proto.YarnSecurityTestTokenProtos.ContainerTokenIdentifierForTestProto; + +import com.google.protobuf.TextFormat; + +public class ContainerTokenIdentifierForTest extends ContainerTokenIdentifier { + + private static Log LOG = LogFactory.getLog(ContainerTokenIdentifier.class); + + public static final Text KIND = new Text("ContainerToken"); + + private ContainerTokenIdentifierForTestProto proto; + + public ContainerTokenIdentifierForTest(ContainerId containerID, + String hostName, String appSubmitter, Resource r, long expiryTimeStamp, + int masterKeyId, long rmIdentifier, Priority priority, long creationTime, + LogAggregationContext logAggregationContext) { + ContainerTokenIdentifierForTestProto.Builder builder = + ContainerTokenIdentifierForTestProto.newBuilder(); + if (containerID != null) { + builder.setContainerId(((ContainerIdPBImpl)containerID).getProto()); + } + builder.setNmHostAddr(hostName); + builder.setAppSubmitter(appSubmitter); + if (r != null) { + builder.setResource(((ResourcePBImpl)r).getProto()); + } + builder.setExpiryTimeStamp(expiryTimeStamp); + builder.setMasterKeyId(masterKeyId); + builder.setRmIdentifier(rmIdentifier); + if (priority != null) { + builder.setPriority(((PriorityPBImpl)priority).getProto()); + } + builder.setCreationTime(creationTime); + + if (logAggregationContext != null) { + builder.setLogAggregationContext( + ((LogAggregationContextPBImpl)logAggregationContext).getProto()); + } + proto = builder.build(); + } + + public ContainerTokenIdentifierForTest(ContainerTokenIdentifier identifier, + String message) { + ContainerTokenIdentifierForTestProto.Builder builder = + ContainerTokenIdentifierForTestProto.newBuilder(); + ContainerIdPBImpl containerID = + (ContainerIdPBImpl)identifier.getContainerID(); + if (containerID != null) { + builder.setContainerId(containerID.getProto()); + } + builder.setNmHostAddr(identifier.getNmHostAddress()); + builder.setAppSubmitter(identifier.getApplicationSubmitter()); + + ResourcePBImpl resource = (ResourcePBImpl)identifier.getResource(); + if (resource != null) { + builder.setResource(resource.getProto()); + } + + builder.setExpiryTimeStamp(identifier.getExpiryTimeStamp()); + builder.setMasterKeyId(identifier.getMasterKeyId()); + builder.setRmIdentifier(identifier.getRMIdentifier()); + + PriorityPBImpl priority = (PriorityPBImpl)identifier.getPriority(); + if (priority != null) { + builder.setPriority(priority.getProto()); + } + + builder.setCreationTime(identifier.getCreationTime()); + builder.setMessage(message); + + LogAggregationContextPBImpl logAggregationContext = + (LogAggregationContextPBImpl)identifier.getLogAggregationContext(); + + if (logAggregationContext != null) { + builder.setLogAggregationContext(logAggregationContext.getProto()); + } + + proto = builder.build(); + } + + public ContainerId getContainerID() { + return new ContainerIdPBImpl(proto.getContainerId()); + } + + public String getApplicationSubmitter() { + return proto.getAppSubmitter(); + } + + public String getNmHostAddress() { + return proto.getNmHostAddr(); + } + + public Resource getResource() { + return new ResourcePBImpl(proto.getResource()); + } + + public long getExpiryTimeStamp() { + return proto.getExpiryTimeStamp(); + } + + public int getMasterKeyId() { + return proto.getMasterKeyId(); + } + + public Priority getPriority() { + return new PriorityPBImpl(proto.getPriority()); + } + + public long getCreationTime() { + return proto.getCreationTime(); + } + /** + * Get the RMIdentifier of RM in which containers are allocated + * @return RMIdentifier + */ + public long getRMIdentifier() { + return proto.getRmIdentifier(); + } + + @Override + public void readFields(DataInput in) throws IOException { + DataInputStream dis = (DataInputStream)in; + byte[] buffer = IOUtils.toByteArray(dis); + proto = ContainerTokenIdentifierForTestProto.parseFrom(buffer); + } + + @Override + public void write(DataOutput out) throws IOException { + LOG.debug("Writing ContainerTokenIdentifierForTest to RPC layer: " + this); + out.write(proto.toByteArray()); + } + + ContainerTokenIdentifierForTestProto getNewProto() { + return this.proto; + } + + @Override + public int hashCode() { + return this.proto.hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getNewProto().equals(this.getClass().cast(other).getNewProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(this.proto); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/NMTokenIdentifierNewForTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/NMTokenIdentifierNewForTest.java new file mode 100644 index 00000000000..8153b44b635 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/NMTokenIdentifierNewForTest.java @@ -0,0 +1,145 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.server; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnSecurityTestTokenProtos.NMTokenIdentifierNewProto; +import org.apache.hadoop.yarn.security.NMTokenIdentifier; + +import com.google.protobuf.TextFormat; + +public class NMTokenIdentifierNewForTest extends NMTokenIdentifier { + + private static Log LOG = LogFactory.getLog(NMTokenIdentifierNewForTest.class); + + public static final Text KIND = new Text("NMToken"); + + private NMTokenIdentifierNewProto proto; + private NMTokenIdentifierNewProto.Builder builder; + + public NMTokenIdentifierNewForTest(){ + builder = NMTokenIdentifierNewProto.newBuilder(); + } + + public NMTokenIdentifierNewForTest(NMTokenIdentifierNewProto proto) { + this.proto = proto; + } + + public NMTokenIdentifierNewForTest(NMTokenIdentifier tokenIdentifier, + String message) { + builder = NMTokenIdentifierNewProto.newBuilder(); + builder.setAppAttemptId(tokenIdentifier.getProto().getAppAttemptId()); + builder.setNodeId(tokenIdentifier.getProto().getNodeId()); + builder.setAppSubmitter(tokenIdentifier.getApplicationSubmitter()); + builder.setKeyId(tokenIdentifier.getKeyId()); + builder.setMessage(message); + proto = builder.build(); + builder = null; + } + + @Override + public void write(DataOutput out) throws IOException { + LOG.debug("Writing NMTokenIdentifierNewForTest to RPC layer: " + this); + out.write(proto.toByteArray()); + } + + @Override + public void readFields(DataInput in) throws IOException { + DataInputStream dis = (DataInputStream)in; + byte[] buffer = IOUtils.toByteArray(dis); + proto = NMTokenIdentifierNewProto.parseFrom(buffer); + } + + @Override + public Text getKind() { + return KIND; + } + + @Override + public UserGroupInformation getUser() { + return null; + } + + public String getMessage() { + return proto.getMessage(); + } + + public void setMessage(String message) { + builder.setMessage(message); + } + + public NMTokenIdentifierNewProto getNewProto() { + return proto; + } + + public void build() { + proto = builder.build(); + builder = null; + } + + public ApplicationAttemptId getApplicationAttemptId() { + return new ApplicationAttemptIdPBImpl(proto.getAppAttemptId()); + } + + public NodeId getNodeId() { + return new NodeIdPBImpl(proto.getNodeId()); + } + + public String getApplicationSubmitter() { + return proto.getAppSubmitter(); + } + + public int getKeyId() { + return proto.getKeyId(); + } + + @Override + public int hashCode() { + return this.proto.hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getNewProto().equals(this.getClass().cast(other).getNewProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(this.proto); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java index 3f82d72abc6..0726a3aca1a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java @@ -34,6 +34,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.minikdc.KerberosSecurityTestcase; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; @@ -62,12 +63,14 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; @@ -302,6 +305,56 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase { Assert.assertTrue(nmTokenSecretManagerNM .isAppAttemptNMTokenKeyPresent(validAppAttemptId)); + // using a new compatible version nmtoken, expect container can be started + // successfully. + ApplicationAttemptId validAppAttemptId2 = + ApplicationAttemptId.newInstance(appId, 2); + + ContainerId validContainerId2 = + ContainerId.newInstance(validAppAttemptId2, 0); + + org.apache.hadoop.yarn.api.records.Token validContainerToken2 = + containerTokenSecretManager.createContainerToken(validContainerId2, + validNode, user, r, Priority.newInstance(0), 0); + + org.apache.hadoop.yarn.api.records.Token validNMToken2 = + nmTokenSecretManagerRM.createNMToken(validAppAttemptId2, validNode, user); + // First, get a new NMTokenIdentifier. + NMTokenIdentifier newIdentifier = new NMTokenIdentifier(); + byte[] tokenIdentifierContent = validNMToken2.getIdentifier().array(); + DataInputBuffer dib = new DataInputBuffer(); + dib.reset(tokenIdentifierContent, tokenIdentifierContent.length); + newIdentifier.readFields(dib); + + // Then, generate a new version NMTokenIdentifier (NMTokenIdentifierNewForTest) + // with additional field of message. + NMTokenIdentifierNewForTest newVersionIdentifier = + new NMTokenIdentifierNewForTest(newIdentifier, "message"); + + // check new version NMTokenIdentifier has correct info. + Assert.assertEquals("The ApplicationAttemptId is changed after set to " + + "newVersionIdentifier", validAppAttemptId2.getAttemptId(), + newVersionIdentifier.getApplicationAttemptId().getAttemptId() + ); + + Assert.assertEquals("The message is changed after set to newVersionIdentifier", + "message", newVersionIdentifier.getMessage()); + + Assert.assertEquals("The NodeId is changed after set to newVersionIdentifier", + validNode, newVersionIdentifier.getNodeId()); + + // create new Token based on new version NMTokenIdentifier. + org.apache.hadoop.yarn.api.records.Token newVersionedNMToken = + BaseNMTokenSecretManager.newInstance( + nmTokenSecretManagerRM.retrievePassword(newVersionIdentifier), + newVersionIdentifier); + + // Verify startContainer is successful and no exception is thrown. + Assert.assertTrue(testStartContainer(rpc, validAppAttemptId2, validNode, + validContainerToken2, newVersionedNMToken, false).isEmpty()); + Assert.assertTrue(nmTokenSecretManagerNM + .isAppAttemptNMTokenKeyPresent(validAppAttemptId2)); + //Now lets wait till container finishes and is removed from node manager. waitForContainerToFinishOnNM(validContainerId); sb = new StringBuilder("Attempt to relaunch the same container with id "); @@ -607,11 +660,36 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase { Assert.assertEquals(nmTokenSecretManagerInNM.getCurrentKey().getKeyId(), nmTokenSecretManagerInRM.getCurrentKey().getKeyId()); - // Creating a tampered Container Token + RMContainerTokenSecretManager containerTokenSecretManager = yarnCluster.getResourceManager().getRMContext(). getContainerTokenSecretManager(); + Resource r = Resource.newInstance(1230, 2); + + Token containerToken = + containerTokenSecretManager.createContainerToken( + cId, nodeId, user, r, Priority.newInstance(0), 0); + + ContainerTokenIdentifier containerTokenIdentifier = + getContainerTokenIdentifierFromToken(containerToken); + + // Verify new compatible version ContainerTokenIdentifier can work successfully. + ContainerTokenIdentifierForTest newVersionTokenIdentifier = + new ContainerTokenIdentifierForTest(containerTokenIdentifier, "message"); + byte[] password = + containerTokenSecretManager.createPassword(newVersionTokenIdentifier); + + Token newContainerToken = BuilderUtils.newContainerToken( + nodeId, password, newVersionTokenIdentifier); + + Token nmToken = + nmTokenSecretManagerInRM.createNMToken(appAttemptId, nodeId, user); + YarnRPC rpc = YarnRPC.create(conf); + Assert.assertTrue(testStartContainer(rpc, appAttemptId, nodeId, + newContainerToken, nmToken, false).isEmpty()); + + // Creating a tampered Container Token RMContainerTokenSecretManager tamperedContainerTokenSecretManager = new RMContainerTokenSecretManager(conf); tamperedContainerTokenSecretManager.rollMasterKey(); @@ -621,19 +699,28 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase { } while (containerTokenSecretManager.getCurrentKey().getKeyId() == tamperedContainerTokenSecretManager.getCurrentKey().getKeyId()); - Resource r = Resource.newInstance(1230, 2); + ContainerId cId2 = ContainerId.newInstance(appAttemptId, 1); // Creating modified containerToken - Token containerToken = - tamperedContainerTokenSecretManager.createContainerToken(cId, nodeId, + Token containerToken2 = + tamperedContainerTokenSecretManager.createContainerToken(cId2, nodeId, user, r, Priority.newInstance(0), 0); - Token nmToken = - nmTokenSecretManagerInRM.createNMToken(appAttemptId, nodeId, user); - YarnRPC rpc = YarnRPC.create(conf); + StringBuilder sb = new StringBuilder("Given Container "); - sb.append(cId); + sb.append(cId2); sb.append(" seems to have an illegally generated token."); Assert.assertTrue(testStartContainer(rpc, appAttemptId, nodeId, - containerToken, nmToken, true).contains(sb.toString())); + containerToken2, nmToken, true).contains(sb.toString())); + } + + private ContainerTokenIdentifier getContainerTokenIdentifierFromToken( + Token containerToken) throws IOException { + ContainerTokenIdentifier containerTokenIdentifier; + containerTokenIdentifier = new ContainerTokenIdentifier(); + byte[] tokenIdentifierContent = containerToken.getIdentifier().array(); + DataInputBuffer dib = new DataInputBuffer(); + dib.reset(tokenIdentifierContent, tokenIdentifierContent.length); + containerTokenIdentifier.readFields(dib); + return containerTokenIdentifier; } /** @@ -676,12 +763,15 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase { Token containerToken = containerTokenSecretManager.createContainerToken(cId, nodeId, user, r, Priority.newInstance(0), 0); - - ByteArrayDataInput input = ByteStreams.newDataInput( - containerToken.getIdentifier().array()); + ContainerTokenIdentifier containerTokenIdentifier = new ContainerTokenIdentifier(); - containerTokenIdentifier.readFields(input); + byte[] tokenIdentifierContent = containerToken.getIdentifier().array(); + DataInputBuffer dib = new DataInputBuffer(); + dib.reset(tokenIdentifierContent, tokenIdentifierContent.length); + containerTokenIdentifier.readFields(dib); + + Assert.assertEquals(cId, containerTokenIdentifier.getContainerID()); Assert.assertEquals( cId.toString(), containerTokenIdentifier.getContainerID().toString()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/proto/test_token.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/proto/test_token.proto new file mode 100644 index 00000000000..853f47791d1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/proto/test_token.proto @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +option java_package = "org.apache.hadoop.yarn.proto"; +option java_outer_classname = "YarnSecurityTestTokenProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +package hadoop.yarn; + +import "yarn_protos.proto"; + +message NMTokenIdentifierNewProto { + optional ApplicationAttemptIdProto appAttemptId = 1; + optional NodeIdProto nodeId = 2; + optional string appSubmitter = 3; + optional int32 keyId = 4; + optional string message = 5; +} + +message ContainerTokenIdentifierForTestProto { + optional ContainerIdProto containerId = 1; + optional string nmHostAddr = 2; + optional string appSubmitter = 3; + optional ResourceProto resource = 4; + optional int64 expiryTimeStamp = 5; + optional int32 masterKeyId = 6; + optional int64 rmIdentifier = 7; + optional PriorityProto priority = 8; + optional int64 creationTime = 9; + optional LogAggregationContextProto logAggregationContext = 10; + optional string message = 11; +} +