YARN-668. Changed NMTokenIdentifier/AMRMTokenIdentifier/ContainerTokenIdentifier to use protobuf object as the payload. Contributed by Junping Du.

This commit is contained in:
Jian He 2014-09-26 17:47:16 -07:00
parent 6b7673e3cd
commit 5391919b09
19 changed files with 1205 additions and 166 deletions

View File

@ -262,6 +262,9 @@ Release 2.6.0 - UNRELEASED
YARN-2372. There are Chinese Characters in the FairScheduler's document YARN-2372. There are Chinese Characters in the FairScheduler's document
(Fengdong Yu via aw) (Fengdong Yu via aw)
YARN-668. Changed NMTokenIdentifier/AMRMTokenIdentifier/ContainerTokenIdentifier
to use protobuf object as the payload. (Junping Du via jianhe)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -131,4 +131,39 @@
</dependency> </dependency>
</dependencies> </dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-maven-plugins</artifactId>
<executions>
<execution>
<id>compile-protoc</id>
<phase>generate-sources</phase>
<goals>
<goal>protoc</goal>
</goals>
<configuration>
<protocVersion>${protobuf.version}</protocVersion>
<protocCommand>${protoc.path}</protocCommand>
<imports>
<param>${basedir}/src/test/proto</param>
<param>${basedir}/../../../hadoop-common-project/hadoop-common/src/main/proto</param>
<param>${basedir}/../hadoop-yarn-api/src/main/proto</param>
</imports>
<source>
<directory>${basedir}/src/test/proto</directory>
<includes>
<include>test_amrm_token.proto</include>
</includes>
</source>
<output>${project.build.directory}/generated-sources/java</output>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project> </project>

View File

@ -0,0 +1,130 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnSecurityTestAMRMTokenProtos.AMRMTokenIdentifierForTestProto;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import com.google.protobuf.TextFormat;
public class AMRMTokenIdentifierForTest extends AMRMTokenIdentifier {
private static Log LOG = LogFactory.getLog(AMRMTokenIdentifierForTest.class);
public static final Text KIND = new Text("YARN_AM_RM_TOKEN");
private AMRMTokenIdentifierForTestProto proto;
private AMRMTokenIdentifierForTestProto.Builder builder;
public AMRMTokenIdentifierForTest(){
builder = AMRMTokenIdentifierForTestProto.newBuilder();
}
public AMRMTokenIdentifierForTest(AMRMTokenIdentifierForTestProto proto) {
this.proto = proto;
}
public AMRMTokenIdentifierForTest(AMRMTokenIdentifier tokenIdentifier,
String message) {
builder = AMRMTokenIdentifierForTestProto.newBuilder();
builder.setAppAttemptId(tokenIdentifier.getProto().getAppAttemptId());
builder.setKeyId(tokenIdentifier.getKeyId());
builder.setMessage(message);
proto = builder.build();
builder = null;
}
@Override
public void write(DataOutput out) throws IOException {
out.write(proto.toByteArray());
}
@Override
public void readFields(DataInput in) throws IOException {
DataInputStream dis = (DataInputStream)in;
byte[] buffer = IOUtils.toByteArray(dis);
proto = AMRMTokenIdentifierForTestProto.parseFrom(buffer);
}
@Override
public Text getKind() {
return KIND;
}
public String getMessage() {
return proto.getMessage();
}
public void setMessage(String message) {
builder.setMessage(message);
}
public void build() {
proto = builder.build();
builder = null;
}
public ApplicationAttemptId getApplicationAttemptId() {
return new ApplicationAttemptIdPBImpl(proto.getAppAttemptId());
}
public int getKeyId() {
return proto.getKeyId();
}
public AMRMTokenIdentifierForTestProto getNewProto(){
return this.proto;
}
@Override
public int hashCode() {
return this.proto.hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getNewProto().equals(this.getClass().cast(other).getNewProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(this.proto);
}
}

View File

@ -41,6 +41,7 @@ import java.util.TreeSet;
import org.junit.Assert; import org.junit.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -908,6 +909,36 @@ public class TestAMRMClient {
// can do the allocate call with latest AMRMToken // can do the allocate call with latest AMRMToken
amClient.allocate(0.1f); amClient.allocate(0.1f);
// Verify latest AMRMToken can be used to send allocation request.
UserGroupInformation testUser1 =
UserGroupInformation.createRemoteUser("testUser1");
AMRMTokenIdentifierForTest newVersionTokenIdentifier =
new AMRMTokenIdentifierForTest(amrmToken_2.decodeIdentifier(), "message");
Assert.assertEquals("Message is changed after set to newVersionTokenIdentifier",
"message", newVersionTokenIdentifier.getMessage());
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newVersionToken =
new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> (
newVersionTokenIdentifier.getBytes(),
amrmTokenSecretManager.retrievePassword(newVersionTokenIdentifier),
newVersionTokenIdentifier.getKind(), new Text());
SecurityUtil.setTokenService(newVersionToken, yarnCluster
.getResourceManager().getApplicationMasterService().getBindAddress());
testUser1.addToken(newVersionToken);
testUser1.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
@Override
public ApplicationMasterProtocol run() {
return (ApplicationMasterProtocol) YarnRPC.create(conf).getProxy(
ApplicationMasterProtocol.class,
yarnCluster.getResourceManager().getApplicationMasterService()
.getBindAddress(), conf);
}
}).allocate(Records.newRecord(AllocateRequest.class));
// Make sure previous token has been rolled-over // Make sure previous token has been rolled-over
// and can not use this rolled-over token to make a allocate all. // and can not use this rolled-over token to make a allocate all.
@ -931,12 +962,12 @@ public class TestAMRMClient {
} }
try { try {
UserGroupInformation testUser = UserGroupInformation testUser2 =
UserGroupInformation.createRemoteUser("testUser"); UserGroupInformation.createRemoteUser("testUser2");
SecurityUtil.setTokenService(amrmToken_2, yarnCluster SecurityUtil.setTokenService(amrmToken_2, yarnCluster
.getResourceManager().getApplicationMasterService().getBindAddress()); .getResourceManager().getApplicationMasterService().getBindAddress());
testUser.addToken(amrmToken_2); testUser2.addToken(amrmToken_2);
testUser.doAs(new PrivilegedAction<ApplicationMasterProtocol>() { testUser2.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
@Override @Override
public ApplicationMasterProtocol run() { public ApplicationMasterProtocol run() {
return (ApplicationMasterProtocol) YarnRPC.create(conf).getProxy( return (ApplicationMasterProtocol) YarnRPC.create(conf).getProxy(

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.yarn.proto";
option java_outer_classname = "YarnSecurityTestAMRMTokenProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;
import "yarn_protos.proto";
message AMRMTokenIdentifierForTestProto {
optional ApplicationAttemptIdProto appAttemptId = 1;
optional int32 keyId = 2;
optional string message = 3;
}

View File

@ -232,6 +232,30 @@
</source> </source>
</configuration> </configuration>
</execution> </execution>
<execution>
<id>compile-protoc</id>
<phase>generate-sources</phase>
<goals>
<goal>protoc</goal>
</goals>
<configuration>
<protocVersion>${protobuf.version}</protocVersion>
<protocCommand>${protoc.path}</protocCommand>
<imports>
<param>${basedir}/../../../hadoop-common-project/hadoop-common/src/main/proto</param>
<param>${basedir}/../hadoop-yarn-api/src/main/proto</param>
<param>${basedir}/src/main/proto</param>
</imports>
<source>
<directory>${basedir}/src/main/proto/server</directory>
<includes>
<include>yarn_security_token.proto</include>
</includes>
</source>
<output>${project.build.directory}/generated-sources/java</output>
</configuration>
</execution>
</executions> </executions>
</plugin> </plugin>
<plugin> <plugin>

View File

@ -19,9 +19,11 @@
package org.apache.hadoop.yarn.security; package org.apache.hadoop.yarn.security;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
@ -32,6 +34,10 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.AMRMTokenIdentifierProto;
import com.google.protobuf.TextFormat;
/** /**
* AMRMTokenIdentifier is the TokenIdentifier to be used by * AMRMTokenIdentifier is the TokenIdentifier to be used by
@ -42,49 +48,41 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
public class AMRMTokenIdentifier extends TokenIdentifier { public class AMRMTokenIdentifier extends TokenIdentifier {
public static final Text KIND_NAME = new Text("YARN_AM_RM_TOKEN"); public static final Text KIND_NAME = new Text("YARN_AM_RM_TOKEN");
private AMRMTokenIdentifierProto proto;
private ApplicationAttemptId applicationAttemptId;
private int keyId = Integer.MIN_VALUE;
public AMRMTokenIdentifier() { public AMRMTokenIdentifier() {
} }
public AMRMTokenIdentifier(ApplicationAttemptId appAttemptId) {
this();
this.applicationAttemptId = appAttemptId;
}
public AMRMTokenIdentifier(ApplicationAttemptId appAttemptId, public AMRMTokenIdentifier(ApplicationAttemptId appAttemptId,
int masterKeyId) { int masterKeyId) {
this(); AMRMTokenIdentifierProto.Builder builder =
this.applicationAttemptId = appAttemptId; AMRMTokenIdentifierProto.newBuilder();
this.keyId = masterKeyId; if (appAttemptId != null) {
builder.setAppAttemptId(
((ApplicationAttemptIdPBImpl)appAttemptId).getProto());
}
builder.setKeyId(masterKeyId);
proto = builder.build();
} }
@Private @Private
public ApplicationAttemptId getApplicationAttemptId() { public ApplicationAttemptId getApplicationAttemptId() {
return this.applicationAttemptId; if (!proto.hasAppAttemptId()) {
return null;
}
return new ApplicationAttemptIdPBImpl(proto.getAppAttemptId());
} }
@Override @Override
public void write(DataOutput out) throws IOException { public void write(DataOutput out) throws IOException {
ApplicationId appId = this.applicationAttemptId.getApplicationId(); out.write(proto.toByteArray());
out.writeLong(appId.getClusterTimestamp());
out.writeInt(appId.getId());
out.writeInt(this.applicationAttemptId.getAttemptId());
out.writeInt(this.keyId);
} }
@Override @Override
public void readFields(DataInput in) throws IOException { public void readFields(DataInput in) throws IOException {
long clusterTimeStamp = in.readLong(); DataInputStream dis = (DataInputStream)in;
int appId = in.readInt(); byte[] buffer = IOUtils.toByteArray(dis);
int attemptId = in.readInt(); proto = AMRMTokenIdentifierProto.parseFrom(buffer);
ApplicationId applicationId =
ApplicationId.newInstance(clusterTimeStamp, appId);
this.applicationAttemptId =
ApplicationAttemptId.newInstance(applicationId, attemptId);
this.keyId = in.readInt();
} }
@Override @Override
@ -94,16 +92,20 @@ public class AMRMTokenIdentifier extends TokenIdentifier {
@Override @Override
public UserGroupInformation getUser() { public UserGroupInformation getUser() {
if (this.applicationAttemptId == null String appAttemptId = null;
|| "".equals(this.applicationAttemptId.toString())) { if (proto.hasAppAttemptId()) {
return null; appAttemptId =
new ApplicationAttemptIdPBImpl(proto.getAppAttemptId()).toString();
} }
return UserGroupInformation.createRemoteUser(this.applicationAttemptId return UserGroupInformation.createRemoteUser(appAttemptId);
.toString());
} }
public int getKeyId() { public int getKeyId() {
return this.keyId; return proto.getKeyId();
}
public AMRMTokenIdentifierProto getProto() {
return this.proto;
} }
// TODO: Needed? // TODO: Needed?
@ -114,4 +116,24 @@ public class AMRMTokenIdentifier extends TokenIdentifier {
return KIND_NAME; return KIND_NAME;
} }
} }
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
} }

View File

@ -19,9 +19,11 @@
package org.apache.hadoop.yarn.security; package org.apache.hadoop.yarn.security;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -38,7 +40,14 @@ import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto;
import com.google.protobuf.TextFormat;
/** /**
* TokenIdentifier for a container. Encodes {@link ContainerId}, * TokenIdentifier for a container. Encodes {@link ContainerId},
@ -53,15 +62,7 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
public static final Text KIND = new Text("ContainerToken"); public static final Text KIND = new Text("ContainerToken");
private ContainerId containerId; private ContainerTokenIdentifierProto proto;
private String nmHostAddr;
private String appSubmitter;
private Resource resource;
private long expiryTimeStamp;
private int masterKeyId;
private long rmIdentifier;
private Priority priority;
private long creationTime;
private LogAggregationContext logAggregationContext; private LogAggregationContext logAggregationContext;
public ContainerTokenIdentifier(ContainerId containerID, public ContainerTokenIdentifier(ContainerId containerID,
@ -75,16 +76,29 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId, String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId,
long rmIdentifier, Priority priority, long creationTime, long rmIdentifier, Priority priority, long creationTime,
LogAggregationContext logAggregationContext) { LogAggregationContext logAggregationContext) {
this.containerId = containerID; ContainerTokenIdentifierProto.Builder builder =
this.nmHostAddr = hostName; ContainerTokenIdentifierProto.newBuilder();
this.appSubmitter = appSubmitter; if (containerID != null) {
this.resource = r; builder.setContainerId(((ContainerIdPBImpl)containerID).getProto());
this.expiryTimeStamp = expiryTimeStamp; }
this.masterKeyId = masterKeyId; builder.setNmHostAddr(hostName);
this.rmIdentifier = rmIdentifier; builder.setAppSubmitter(appSubmitter);
this.priority = priority; if (r != null) {
this.creationTime = creationTime; builder.setResource(((ResourcePBImpl)r).getProto());
this.logAggregationContext = logAggregationContext; }
builder.setExpiryTimeStamp(expiryTimeStamp);
builder.setMasterKeyId(masterKeyId);
builder.setRmIdentifier(rmIdentifier);
if (priority != null) {
builder.setPriority(((PriorityPBImpl)priority).getProto());
}
builder.setCreationTime(creationTime);
if (logAggregationContext != null) {
builder.setLogAggregationContext(
((LogAggregationContextPBImpl)logAggregationContext).getProto());
}
proto = builder.build();
} }
/** /**
@ -94,104 +108,75 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
} }
public ContainerId getContainerID() { public ContainerId getContainerID() {
return this.containerId; if (!proto.hasContainerId()) {
return null;
}
return new ContainerIdPBImpl(proto.getContainerId());
} }
public String getApplicationSubmitter() { public String getApplicationSubmitter() {
return this.appSubmitter; return proto.getAppSubmitter();
} }
public String getNmHostAddress() { public String getNmHostAddress() {
return this.nmHostAddr; return proto.getNmHostAddr();
} }
public Resource getResource() { public Resource getResource() {
return this.resource; if (!proto.hasResource()) {
return null;
}
return new ResourcePBImpl(proto.getResource());
} }
public long getExpiryTimeStamp() { public long getExpiryTimeStamp() {
return this.expiryTimeStamp; return proto.getExpiryTimeStamp();
} }
public int getMasterKeyId() { public int getMasterKeyId() {
return this.masterKeyId; return proto.getMasterKeyId();
} }
public Priority getPriority() { public Priority getPriority() {
return this.priority; if (!proto.hasPriority()) {
return null;
}
return new PriorityPBImpl(proto.getPriority());
} }
public long getCreationTime() { public long getCreationTime() {
return this.creationTime; return proto.getCreationTime();
} }
/** /**
* Get the RMIdentifier of RM in which containers are allocated * Get the RMIdentifier of RM in which containers are allocated
* @return RMIdentifier * @return RMIdentifier
*/ */
public long getRMIdentifer() { public long getRMIdentifier() {
return this.rmIdentifier; return proto.getRmIdentifier();
}
public ContainerTokenIdentifierProto getProto() {
return proto;
} }
public LogAggregationContext getLogAggregationContext() { public LogAggregationContext getLogAggregationContext() {
return this.logAggregationContext; if (!proto.hasLogAggregationContext()) {
return null;
}
return new LogAggregationContextPBImpl(proto.getLogAggregationContext());
} }
@Override @Override
public void write(DataOutput out) throws IOException { public void write(DataOutput out) throws IOException {
LOG.debug("Writing ContainerTokenIdentifier to RPC layer: " + this); LOG.debug("Writing ContainerTokenIdentifier to RPC layer: " + this);
ApplicationAttemptId applicationAttemptId = this.containerId out.write(proto.toByteArray());
.getApplicationAttemptId();
ApplicationId applicationId = applicationAttemptId.getApplicationId();
out.writeLong(applicationId.getClusterTimestamp());
out.writeInt(applicationId.getId());
out.writeInt(applicationAttemptId.getAttemptId());
out.writeLong(this.containerId.getContainerId());
out.writeUTF(this.nmHostAddr);
out.writeUTF(this.appSubmitter);
out.writeInt(this.resource.getMemory());
out.writeInt(this.resource.getVirtualCores());
out.writeLong(this.expiryTimeStamp);
out.writeInt(this.masterKeyId);
out.writeLong(this.rmIdentifier);
out.writeInt(this.priority.getPriority());
out.writeLong(this.creationTime);
if (this.logAggregationContext == null) {
out.writeInt(-1);
} else {
byte[] logAggregationContext =
((LogAggregationContextPBImpl) this.logAggregationContext).getProto()
.toByteArray();
out.writeInt(logAggregationContext.length);
out.write(logAggregationContext);
}
} }
@Override @Override
public void readFields(DataInput in) throws IOException { public void readFields(DataInput in) throws IOException {
ApplicationId applicationId = DataInputStream dis = (DataInputStream)in;
ApplicationId.newInstance(in.readLong(), in.readInt()); byte[] buffer = IOUtils.toByteArray(dis);
ApplicationAttemptId applicationAttemptId = proto = ContainerTokenIdentifierProto.parseFrom(buffer);
ApplicationAttemptId.newInstance(applicationId, in.readInt());
this.containerId =
ContainerId.newInstance(applicationAttemptId, in.readLong());
this.nmHostAddr = in.readUTF();
this.appSubmitter = in.readUTF();
int memory = in.readInt();
int vCores = in.readInt();
this.resource = Resource.newInstance(memory, vCores);
this.expiryTimeStamp = in.readLong();
this.masterKeyId = in.readInt();
this.rmIdentifier = in.readLong();
this.priority = Priority.newInstance(in.readInt());
this.creationTime = in.readLong();
int size = in.readInt();
if (size != -1) {
byte[] bytes = new byte[size];
in.readFully(bytes);
this.logAggregationContext =
new LogAggregationContextPBImpl(
LogAggregationContextProto.parseFrom(bytes));
}
} }
@Override @Override
@ -201,7 +186,12 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
@Override @Override
public UserGroupInformation getUser() { public UserGroupInformation getUser() {
return UserGroupInformation.createRemoteUser(this.containerId.toString()); String containerId = null;
if (proto.hasContainerId()) {
containerId = new ContainerIdPBImpl(proto.getContainerId()).toString();
}
return UserGroupInformation.createRemoteUser(
containerId);
} }
// TODO: Needed? // TODO: Needed?
@ -212,4 +202,24 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
return KIND; return KIND;
} }
} }
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
} }

View File

@ -18,10 +18,13 @@
package org.apache.hadoop.yarn.security; package org.apache.hadoop.yarn.security;
import java.io.ByteArrayInputStream;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
@ -32,6 +35,12 @@ import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.NMTokenIdentifierProto;
import com.google.protobuf.TextFormat;
@Public @Public
@Evolving @Evolving
@ -41,17 +50,21 @@ public class NMTokenIdentifier extends TokenIdentifier {
public static final Text KIND = new Text("NMToken"); public static final Text KIND = new Text("NMToken");
private ApplicationAttemptId appAttemptId; private NMTokenIdentifierProto proto;
private NodeId nodeId;
private String appSubmitter;
private int keyId;
public NMTokenIdentifier(ApplicationAttemptId appAttemptId, NodeId nodeId, public NMTokenIdentifier(ApplicationAttemptId appAttemptId,
String applicationSubmitter, int masterKeyId) { NodeId nodeId, String applicationSubmitter, int masterKeyId) {
this.appAttemptId = appAttemptId; NMTokenIdentifierProto.Builder builder = NMTokenIdentifierProto.newBuilder();
this.nodeId = nodeId; if (appAttemptId != null) {
this.appSubmitter = applicationSubmitter; builder.setAppAttemptId(
this.keyId = masterKeyId; ((ApplicationAttemptIdPBImpl)appAttemptId).getProto());
}
if (nodeId != null) {
builder.setNodeId(((NodeIdPBImpl)nodeId).getProto());
}
builder.setAppSubmitter(applicationSubmitter);
builder.setKeyId(masterKeyId);
proto = builder.build();
} }
/** /**
@ -61,43 +74,38 @@ public class NMTokenIdentifier extends TokenIdentifier {
} }
public ApplicationAttemptId getApplicationAttemptId() { public ApplicationAttemptId getApplicationAttemptId() {
return appAttemptId; if (!proto.hasAppAttemptId()) {
return null;
}
return new ApplicationAttemptIdPBImpl(proto.getAppAttemptId());
} }
public NodeId getNodeId() { public NodeId getNodeId() {
return nodeId; if (!proto.hasNodeId()) {
return null;
}
return new NodeIdPBImpl(proto.getNodeId());
} }
public String getApplicationSubmitter() { public String getApplicationSubmitter() {
return appSubmitter; return proto.getAppSubmitter();
} }
public int getKeyId() { public int getKeyId() {
return keyId; return proto.getKeyId();
} }
@Override @Override
public void write(DataOutput out) throws IOException { public void write(DataOutput out) throws IOException {
LOG.debug("Writing NMTokenIdentifier to RPC layer: " + this); LOG.debug("Writing NMTokenIdentifier to RPC layer: " + this);
ApplicationId applicationId = appAttemptId.getApplicationId(); out.write(proto.toByteArray());
out.writeLong(applicationId.getClusterTimestamp());
out.writeInt(applicationId.getId());
out.writeInt(appAttemptId.getAttemptId());
out.writeUTF(this.nodeId.toString());
out.writeUTF(this.appSubmitter);
out.writeInt(this.keyId);
} }
@Override @Override
public void readFields(DataInput in) throws IOException { public void readFields(DataInput in) throws IOException {
appAttemptId = DataInputStream dis = (DataInputStream)in;
ApplicationAttemptId.newInstance( byte[] buffer = IOUtils.toByteArray(dis);
ApplicationId.newInstance(in.readLong(), in.readInt()), proto = NMTokenIdentifierProto.parseFrom(buffer);
in.readInt());
String[] hostAddr = in.readUTF().split(":");
nodeId = NodeId.newInstance(hostAddr[0], Integer.parseInt(hostAddr[1]));
appSubmitter = in.readUTF();
keyId = in.readInt();
} }
@Override @Override
@ -107,6 +115,35 @@ public class NMTokenIdentifier extends TokenIdentifier {
@Override @Override
public UserGroupInformation getUser() { public UserGroupInformation getUser() {
return UserGroupInformation.createRemoteUser(appAttemptId.toString()); String appAttemptId = null;
if (proto.hasAppAttemptId()) {
appAttemptId = new ApplicationAttemptIdPBImpl(
proto.getAppAttemptId()).toString();
}
return UserGroupInformation.createRemoteUser(appAttemptId);
}
public NMTokenIdentifierProto getProto() {
return proto;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
} }
} }

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.yarn.proto";
option java_outer_classname = "YarnSecurityTokenProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;
import "yarn_protos.proto";
// None of the following records are supposed to be exposed to users.
message NMTokenIdentifierProto {
optional ApplicationAttemptIdProto appAttemptId = 1;
optional NodeIdProto nodeId = 2;
optional string appSubmitter = 3;
optional int32 keyId = 4 [default = -1];
}
message AMRMTokenIdentifierProto {
optional ApplicationAttemptIdProto appAttemptId = 1;
optional int32 keyId = 2 [default = -1];
}
message ContainerTokenIdentifierProto {
optional ContainerIdProto containerId = 1;
optional string nmHostAddr = 2;
optional string appSubmitter = 3;
optional ResourceProto resource = 4;
optional int64 expiryTimeStamp =5;
optional int32 masterKeyId = 6 [default = -1];
optional int64 rmIdentifier = 7;
optional PriorityProto priority = 8;
optional int64 creationTime = 9;
optional LogAggregationContextProto logAggregationContext = 10;
}

View File

@ -0,0 +1,154 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.security;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.junit.Assert;
import org.junit.Test;
public class TestYARNTokenIdentifier {
@Test
public void testNMTokenIdentifier() {
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(1, 1), 1);
NodeId nodeId = NodeId.newInstance("host0", 0);
String applicationSubmitter = "usr0";
int masterKeyId = 1;
NMTokenIdentifier token = new NMTokenIdentifier(
appAttemptId, nodeId, applicationSubmitter, masterKeyId);
NMTokenIdentifier anotherToken = new NMTokenIdentifier(
appAttemptId, nodeId, applicationSubmitter, masterKeyId);
// verify the whole record equals with original record
Assert.assertEquals("Token is not the same after serialization " +
"and deserialization.", token, anotherToken);
// verify all properties are the same as original
Assert.assertEquals(
"appAttemptId from proto is not the same with original token",
anotherToken.getApplicationAttemptId(), appAttemptId);
Assert.assertEquals(
"NodeId from proto is not the same with original token",
anotherToken.getNodeId(), nodeId);
Assert.assertEquals(
"applicationSubmitter from proto is not the same with original token",
anotherToken.getApplicationSubmitter(), applicationSubmitter);
Assert.assertEquals(
"masterKeyId from proto is not the same with original token",
anotherToken.getKeyId(), masterKeyId);
}
@Test
public void testAMRMTokenIdentifier() {
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(1, 1), 1);
int masterKeyId = 1;
AMRMTokenIdentifier token = new AMRMTokenIdentifier(appAttemptId, masterKeyId);
AMRMTokenIdentifier anotherToken = new AMRMTokenIdentifier(
appAttemptId, masterKeyId);
// verify the whole record equals with original record
Assert.assertEquals("Token is not the same after serialization " +
"and deserialization.", token, anotherToken);
Assert.assertEquals("ApplicationAttemptId from proto is not the same with original token",
anotherToken.getApplicationAttemptId(), appAttemptId);
Assert.assertEquals("masterKeyId from proto is not the same with original token",
anotherToken.getKeyId(), masterKeyId);
}
@Test
public void testContainerTokenIdentifier() {
ContainerId containerID = ContainerId.newInstance(
ApplicationAttemptId.newInstance(ApplicationId.newInstance(
1, 1), 1), 1);
String hostName = "host0";
String appSubmitter = "usr0";
Resource r = Resource.newInstance(1024, 1);
long expiryTimeStamp = 1000;
int masterKeyId = 1;
long rmIdentifier = 1;
Priority priority = Priority.newInstance(1);
long creationTime = 1000;
ContainerTokenIdentifier token = new ContainerTokenIdentifier(
containerID, hostName, appSubmitter, r, expiryTimeStamp,
masterKeyId, rmIdentifier, priority, creationTime);
ContainerTokenIdentifier anotherToken = new ContainerTokenIdentifier(
containerID, hostName, appSubmitter, r, expiryTimeStamp,
masterKeyId, rmIdentifier, priority, creationTime);
// verify the whole record equals with original record
Assert.assertEquals("Token is not the same after serialization " +
"and deserialization.", token, anotherToken);
Assert.assertEquals(
"ContainerID from proto is not the same with original token",
anotherToken.getContainerID(), containerID);
Assert.assertEquals(
"Hostname from proto is not the same with original token",
anotherToken.getNmHostAddress(), hostName);
Assert.assertEquals(
"ApplicationSubmitter from proto is not the same with original token",
anotherToken.getApplicationSubmitter(), appSubmitter);
Assert.assertEquals(
"Resource from proto is not the same with original token",
anotherToken.getResource(), r);
Assert.assertEquals(
"expiryTimeStamp from proto is not the same with original token",
anotherToken.getExpiryTimeStamp(), expiryTimeStamp);
Assert.assertEquals(
"KeyId from proto is not the same with original token",
anotherToken.getMasterKeyId(), masterKeyId);
Assert.assertEquals(
"RMIdentifier from proto is not the same with original token",
anotherToken.getRMIdentifier(), rmIdentifier);
Assert.assertEquals(
"Priority from proto is not the same with original token",
anotherToken.getPriority(), priority);
Assert.assertEquals(
"CreationTime from proto is not the same with original token",
anotherToken.getCreationTime(), creationTime);
Assert.assertNull(anotherToken.getLogAggregationContext());
}
}

View File

@ -133,6 +133,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider; import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -650,8 +651,8 @@ public class ContainerManagerImpl extends CompositeService implements
boolean unauthorized = false; boolean unauthorized = false;
StringBuilder messageBuilder = StringBuilder messageBuilder =
new StringBuilder("Unauthorized request to start container. "); new StringBuilder("Unauthorized request to start container. ");
if (!nmTokenIdentifier.getApplicationAttemptId().getApplicationId().equals( if (!nmTokenIdentifier.getApplicationAttemptId().getApplicationId().
containerId.getApplicationAttemptId().getApplicationId())) { equals(containerId.getApplicationAttemptId().getApplicationId())) {
unauthorized = true; unauthorized = true;
messageBuilder.append("\nNMToken for application attempt : ") messageBuilder.append("\nNMToken for application attempt : ")
.append(nmTokenIdentifier.getApplicationAttemptId()) .append(nmTokenIdentifier.getApplicationAttemptId())
@ -784,7 +785,7 @@ public class ContainerManagerImpl extends CompositeService implements
*/ */
authorizeStartRequest(nmTokenIdentifier, containerTokenIdentifier); authorizeStartRequest(nmTokenIdentifier, containerTokenIdentifier);
if (containerTokenIdentifier.getRMIdentifer() != nodeStatusUpdater if (containerTokenIdentifier.getRMIdentifier() != nodeStatusUpdater
.getRMIdentifier()) { .getRMIdentifier()) {
// Is the container coming from unknown RM // Is the container coming from unknown RM
StringBuilder sb = new StringBuilder("\nContainer "); StringBuilder sb = new StringBuilder("\nContainer ");
@ -1035,9 +1036,10 @@ public class ContainerManagerImpl extends CompositeService implements
*/ */
ApplicationId nmTokenAppId = ApplicationId nmTokenAppId =
identifier.getApplicationAttemptId().getApplicationId(); identifier.getApplicationAttemptId().getApplicationId();
if ((!nmTokenAppId.equals(containerId.getApplicationAttemptId().getApplicationId())) if ((!nmTokenAppId.equals(containerId.getApplicationAttemptId().getApplicationId()))
|| (container != null && !nmTokenAppId.equals(container || (container != null && !nmTokenAppId.equals(container
.getContainerId().getApplicationAttemptId().getApplicationId()))) { .getContainerId().getApplicationAttemptId().getApplicationId()))) {
if (stopRequest) { if (stopRequest) {
LOG.warn(identifier.getApplicationAttemptId() LOG.warn(identifier.getApplicationAttemptId()
+ " attempted to stop non-application container : " + " attempted to stop non-application container : "

View File

@ -528,7 +528,7 @@ public class TestContainer {
public boolean matches(Object o) { public boolean matches(Object o) {
ContainersLauncherEvent evt = (ContainersLauncherEvent) o; ContainersLauncherEvent evt = (ContainersLauncherEvent) o;
return evt.getType() == ContainersLauncherEventType.LAUNCH_CONTAINER return evt.getType() == ContainersLauncherEventType.LAUNCH_CONTAINER
&& wcf.cId == evt.getContainer().getContainerId(); && wcf.cId.equals(evt.getContainer().getContainerId());
} }
}; };
verify(wc.launcherBus).handle(argThat(matchesLaunchReq)); verify(wc.launcherBus).handle(argThat(matchesLaunchReq));
@ -751,7 +751,7 @@ public class TestContainer {
String host = "127.0.0.1"; String host = "127.0.0.1";
int port = 1234; int port = 1234;
long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
ContainerTokenIdentifier identifier = ContainerTokenIdentifier identifier =
new ContainerTokenIdentifier(cId, "127.0.0.1", user, resource, new ContainerTokenIdentifier(cId, "127.0.0.1", user, resource,
currentTime + 10000L, 123, currentTime, Priority.newInstance(0), 0); currentTime + 10000L, 123, currentTime, Priority.newInstance(0), 0);
Token token = Token token =

View File

@ -99,7 +99,7 @@ public class TestApplicationMasterService {
ContainerTokenIdentifier tokenId = ContainerTokenIdentifier tokenId =
BuilderUtils.newContainerTokenIdentifier(allocatedContainer BuilderUtils.newContainerTokenIdentifier(allocatedContainer
.getContainerToken()); .getContainerToken());
Assert.assertEquals(MockRM.getClusterTimeStamp(), tokenId.getRMIdentifer()); Assert.assertEquals(MockRM.getClusterTimeStamp(), tokenId.getRMIdentifier());
rm.stop(); rm.stop();
} }

View File

@ -113,6 +113,35 @@
<build> <build>
<plugins> <plugins>
<plugin>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-maven-plugins</artifactId>
<executions>
<execution>
<id>compile-protoc</id>
<phase>generate-sources</phase>
<goals>
<goal>protoc</goal>
</goals>
<configuration>
<protocVersion>${protobuf.version}</protocVersion>
<protocCommand>${protoc.path}</protocCommand>
<imports>
<param>${basedir}/src/test/proto</param>
<param>${basedir}/../../../../hadoop-common-project/hadoop-common/src/main/proto</param>
<param>${basedir}/../../hadoop-yarn-api/src/main/proto</param>
</imports>
<source>
<directory>${basedir}/src/test/proto</directory>
<includes>
<include>test_token.proto</include>
</includes>
</source>
<output>${project.build.directory}/generated-sources/java</output>
</configuration>
</execution>
</executions>
</plugin>
<plugin> <plugin>
<artifactId>maven-jar-plugin</artifactId> <artifactId>maven-jar-plugin</artifactId>
<executions> <executions>

View File

@ -0,0 +1,195 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
import org.apache.hadoop.yarn.proto.YarnSecurityTestTokenProtos.ContainerTokenIdentifierForTestProto;
import com.google.protobuf.TextFormat;
public class ContainerTokenIdentifierForTest extends ContainerTokenIdentifier {
private static Log LOG = LogFactory.getLog(ContainerTokenIdentifier.class);
public static final Text KIND = new Text("ContainerToken");
private ContainerTokenIdentifierForTestProto proto;
public ContainerTokenIdentifierForTest(ContainerId containerID,
String hostName, String appSubmitter, Resource r, long expiryTimeStamp,
int masterKeyId, long rmIdentifier, Priority priority, long creationTime,
LogAggregationContext logAggregationContext) {
ContainerTokenIdentifierForTestProto.Builder builder =
ContainerTokenIdentifierForTestProto.newBuilder();
if (containerID != null) {
builder.setContainerId(((ContainerIdPBImpl)containerID).getProto());
}
builder.setNmHostAddr(hostName);
builder.setAppSubmitter(appSubmitter);
if (r != null) {
builder.setResource(((ResourcePBImpl)r).getProto());
}
builder.setExpiryTimeStamp(expiryTimeStamp);
builder.setMasterKeyId(masterKeyId);
builder.setRmIdentifier(rmIdentifier);
if (priority != null) {
builder.setPriority(((PriorityPBImpl)priority).getProto());
}
builder.setCreationTime(creationTime);
if (logAggregationContext != null) {
builder.setLogAggregationContext(
((LogAggregationContextPBImpl)logAggregationContext).getProto());
}
proto = builder.build();
}
public ContainerTokenIdentifierForTest(ContainerTokenIdentifier identifier,
String message) {
ContainerTokenIdentifierForTestProto.Builder builder =
ContainerTokenIdentifierForTestProto.newBuilder();
ContainerIdPBImpl containerID =
(ContainerIdPBImpl)identifier.getContainerID();
if (containerID != null) {
builder.setContainerId(containerID.getProto());
}
builder.setNmHostAddr(identifier.getNmHostAddress());
builder.setAppSubmitter(identifier.getApplicationSubmitter());
ResourcePBImpl resource = (ResourcePBImpl)identifier.getResource();
if (resource != null) {
builder.setResource(resource.getProto());
}
builder.setExpiryTimeStamp(identifier.getExpiryTimeStamp());
builder.setMasterKeyId(identifier.getMasterKeyId());
builder.setRmIdentifier(identifier.getRMIdentifier());
PriorityPBImpl priority = (PriorityPBImpl)identifier.getPriority();
if (priority != null) {
builder.setPriority(priority.getProto());
}
builder.setCreationTime(identifier.getCreationTime());
builder.setMessage(message);
LogAggregationContextPBImpl logAggregationContext =
(LogAggregationContextPBImpl)identifier.getLogAggregationContext();
if (logAggregationContext != null) {
builder.setLogAggregationContext(logAggregationContext.getProto());
}
proto = builder.build();
}
public ContainerId getContainerID() {
return new ContainerIdPBImpl(proto.getContainerId());
}
public String getApplicationSubmitter() {
return proto.getAppSubmitter();
}
public String getNmHostAddress() {
return proto.getNmHostAddr();
}
public Resource getResource() {
return new ResourcePBImpl(proto.getResource());
}
public long getExpiryTimeStamp() {
return proto.getExpiryTimeStamp();
}
public int getMasterKeyId() {
return proto.getMasterKeyId();
}
public Priority getPriority() {
return new PriorityPBImpl(proto.getPriority());
}
public long getCreationTime() {
return proto.getCreationTime();
}
/**
* Get the RMIdentifier of RM in which containers are allocated
* @return RMIdentifier
*/
public long getRMIdentifier() {
return proto.getRmIdentifier();
}
@Override
public void readFields(DataInput in) throws IOException {
DataInputStream dis = (DataInputStream)in;
byte[] buffer = IOUtils.toByteArray(dis);
proto = ContainerTokenIdentifierForTestProto.parseFrom(buffer);
}
@Override
public void write(DataOutput out) throws IOException {
LOG.debug("Writing ContainerTokenIdentifierForTest to RPC layer: " + this);
out.write(proto.toByteArray());
}
ContainerTokenIdentifierForTestProto getNewProto() {
return this.proto;
}
@Override
public int hashCode() {
return this.proto.hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getNewProto().equals(this.getClass().cast(other).getNewProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(this.proto);
}
}

View File

@ -0,0 +1,145 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnSecurityTestTokenProtos.NMTokenIdentifierNewProto;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import com.google.protobuf.TextFormat;
public class NMTokenIdentifierNewForTest extends NMTokenIdentifier {
private static Log LOG = LogFactory.getLog(NMTokenIdentifierNewForTest.class);
public static final Text KIND = new Text("NMToken");
private NMTokenIdentifierNewProto proto;
private NMTokenIdentifierNewProto.Builder builder;
public NMTokenIdentifierNewForTest(){
builder = NMTokenIdentifierNewProto.newBuilder();
}
public NMTokenIdentifierNewForTest(NMTokenIdentifierNewProto proto) {
this.proto = proto;
}
public NMTokenIdentifierNewForTest(NMTokenIdentifier tokenIdentifier,
String message) {
builder = NMTokenIdentifierNewProto.newBuilder();
builder.setAppAttemptId(tokenIdentifier.getProto().getAppAttemptId());
builder.setNodeId(tokenIdentifier.getProto().getNodeId());
builder.setAppSubmitter(tokenIdentifier.getApplicationSubmitter());
builder.setKeyId(tokenIdentifier.getKeyId());
builder.setMessage(message);
proto = builder.build();
builder = null;
}
@Override
public void write(DataOutput out) throws IOException {
LOG.debug("Writing NMTokenIdentifierNewForTest to RPC layer: " + this);
out.write(proto.toByteArray());
}
@Override
public void readFields(DataInput in) throws IOException {
DataInputStream dis = (DataInputStream)in;
byte[] buffer = IOUtils.toByteArray(dis);
proto = NMTokenIdentifierNewProto.parseFrom(buffer);
}
@Override
public Text getKind() {
return KIND;
}
@Override
public UserGroupInformation getUser() {
return null;
}
public String getMessage() {
return proto.getMessage();
}
public void setMessage(String message) {
builder.setMessage(message);
}
public NMTokenIdentifierNewProto getNewProto() {
return proto;
}
public void build() {
proto = builder.build();
builder = null;
}
public ApplicationAttemptId getApplicationAttemptId() {
return new ApplicationAttemptIdPBImpl(proto.getAppAttemptId());
}
public NodeId getNodeId() {
return new NodeIdPBImpl(proto.getNodeId());
}
public String getApplicationSubmitter() {
return proto.getAppSubmitter();
}
public int getKeyId() {
return proto.getKeyId();
}
@Override
public int hashCode() {
return this.proto.hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getNewProto().equals(this.getClass().cast(other).getNewProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(this.proto);
}
}

View File

@ -34,6 +34,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.minikdc.KerberosSecurityTestcase; import org.apache.hadoop.minikdc.KerberosSecurityTestcase;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -62,12 +63,14 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
@ -302,6 +305,56 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
Assert.assertTrue(nmTokenSecretManagerNM Assert.assertTrue(nmTokenSecretManagerNM
.isAppAttemptNMTokenKeyPresent(validAppAttemptId)); .isAppAttemptNMTokenKeyPresent(validAppAttemptId));
// using a new compatible version nmtoken, expect container can be started
// successfully.
ApplicationAttemptId validAppAttemptId2 =
ApplicationAttemptId.newInstance(appId, 2);
ContainerId validContainerId2 =
ContainerId.newInstance(validAppAttemptId2, 0);
org.apache.hadoop.yarn.api.records.Token validContainerToken2 =
containerTokenSecretManager.createContainerToken(validContainerId2,
validNode, user, r, Priority.newInstance(0), 0);
org.apache.hadoop.yarn.api.records.Token validNMToken2 =
nmTokenSecretManagerRM.createNMToken(validAppAttemptId2, validNode, user);
// First, get a new NMTokenIdentifier.
NMTokenIdentifier newIdentifier = new NMTokenIdentifier();
byte[] tokenIdentifierContent = validNMToken2.getIdentifier().array();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(tokenIdentifierContent, tokenIdentifierContent.length);
newIdentifier.readFields(dib);
// Then, generate a new version NMTokenIdentifier (NMTokenIdentifierNewForTest)
// with additional field of message.
NMTokenIdentifierNewForTest newVersionIdentifier =
new NMTokenIdentifierNewForTest(newIdentifier, "message");
// check new version NMTokenIdentifier has correct info.
Assert.assertEquals("The ApplicationAttemptId is changed after set to " +
"newVersionIdentifier", validAppAttemptId2.getAttemptId(),
newVersionIdentifier.getApplicationAttemptId().getAttemptId()
);
Assert.assertEquals("The message is changed after set to newVersionIdentifier",
"message", newVersionIdentifier.getMessage());
Assert.assertEquals("The NodeId is changed after set to newVersionIdentifier",
validNode, newVersionIdentifier.getNodeId());
// create new Token based on new version NMTokenIdentifier.
org.apache.hadoop.yarn.api.records.Token newVersionedNMToken =
BaseNMTokenSecretManager.newInstance(
nmTokenSecretManagerRM.retrievePassword(newVersionIdentifier),
newVersionIdentifier);
// Verify startContainer is successful and no exception is thrown.
Assert.assertTrue(testStartContainer(rpc, validAppAttemptId2, validNode,
validContainerToken2, newVersionedNMToken, false).isEmpty());
Assert.assertTrue(nmTokenSecretManagerNM
.isAppAttemptNMTokenKeyPresent(validAppAttemptId2));
//Now lets wait till container finishes and is removed from node manager. //Now lets wait till container finishes and is removed from node manager.
waitForContainerToFinishOnNM(validContainerId); waitForContainerToFinishOnNM(validContainerId);
sb = new StringBuilder("Attempt to relaunch the same container with id "); sb = new StringBuilder("Attempt to relaunch the same container with id ");
@ -607,11 +660,36 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
Assert.assertEquals(nmTokenSecretManagerInNM.getCurrentKey().getKeyId(), Assert.assertEquals(nmTokenSecretManagerInNM.getCurrentKey().getKeyId(),
nmTokenSecretManagerInRM.getCurrentKey().getKeyId()); nmTokenSecretManagerInRM.getCurrentKey().getKeyId());
// Creating a tampered Container Token
RMContainerTokenSecretManager containerTokenSecretManager = RMContainerTokenSecretManager containerTokenSecretManager =
yarnCluster.getResourceManager().getRMContext(). yarnCluster.getResourceManager().getRMContext().
getContainerTokenSecretManager(); getContainerTokenSecretManager();
Resource r = Resource.newInstance(1230, 2);
Token containerToken =
containerTokenSecretManager.createContainerToken(
cId, nodeId, user, r, Priority.newInstance(0), 0);
ContainerTokenIdentifier containerTokenIdentifier =
getContainerTokenIdentifierFromToken(containerToken);
// Verify new compatible version ContainerTokenIdentifier can work successfully.
ContainerTokenIdentifierForTest newVersionTokenIdentifier =
new ContainerTokenIdentifierForTest(containerTokenIdentifier, "message");
byte[] password =
containerTokenSecretManager.createPassword(newVersionTokenIdentifier);
Token newContainerToken = BuilderUtils.newContainerToken(
nodeId, password, newVersionTokenIdentifier);
Token nmToken =
nmTokenSecretManagerInRM.createNMToken(appAttemptId, nodeId, user);
YarnRPC rpc = YarnRPC.create(conf);
Assert.assertTrue(testStartContainer(rpc, appAttemptId, nodeId,
newContainerToken, nmToken, false).isEmpty());
// Creating a tampered Container Token
RMContainerTokenSecretManager tamperedContainerTokenSecretManager = RMContainerTokenSecretManager tamperedContainerTokenSecretManager =
new RMContainerTokenSecretManager(conf); new RMContainerTokenSecretManager(conf);
tamperedContainerTokenSecretManager.rollMasterKey(); tamperedContainerTokenSecretManager.rollMasterKey();
@ -621,19 +699,28 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
} while (containerTokenSecretManager.getCurrentKey().getKeyId() } while (containerTokenSecretManager.getCurrentKey().getKeyId()
== tamperedContainerTokenSecretManager.getCurrentKey().getKeyId()); == tamperedContainerTokenSecretManager.getCurrentKey().getKeyId());
Resource r = Resource.newInstance(1230, 2); ContainerId cId2 = ContainerId.newInstance(appAttemptId, 1);
// Creating modified containerToken // Creating modified containerToken
Token containerToken = Token containerToken2 =
tamperedContainerTokenSecretManager.createContainerToken(cId, nodeId, tamperedContainerTokenSecretManager.createContainerToken(cId2, nodeId,
user, r, Priority.newInstance(0), 0); user, r, Priority.newInstance(0), 0);
Token nmToken =
nmTokenSecretManagerInRM.createNMToken(appAttemptId, nodeId, user);
YarnRPC rpc = YarnRPC.create(conf);
StringBuilder sb = new StringBuilder("Given Container "); StringBuilder sb = new StringBuilder("Given Container ");
sb.append(cId); sb.append(cId2);
sb.append(" seems to have an illegally generated token."); sb.append(" seems to have an illegally generated token.");
Assert.assertTrue(testStartContainer(rpc, appAttemptId, nodeId, Assert.assertTrue(testStartContainer(rpc, appAttemptId, nodeId,
containerToken, nmToken, true).contains(sb.toString())); containerToken2, nmToken, true).contains(sb.toString()));
}
private ContainerTokenIdentifier getContainerTokenIdentifierFromToken(
Token containerToken) throws IOException {
ContainerTokenIdentifier containerTokenIdentifier;
containerTokenIdentifier = new ContainerTokenIdentifier();
byte[] tokenIdentifierContent = containerToken.getIdentifier().array();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(tokenIdentifierContent, tokenIdentifierContent.length);
containerTokenIdentifier.readFields(dib);
return containerTokenIdentifier;
} }
/** /**
@ -676,12 +763,15 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
Token containerToken = Token containerToken =
containerTokenSecretManager.createContainerToken(cId, nodeId, user, r, containerTokenSecretManager.createContainerToken(cId, nodeId, user, r,
Priority.newInstance(0), 0); Priority.newInstance(0), 0);
ByteArrayDataInput input = ByteStreams.newDataInput(
containerToken.getIdentifier().array());
ContainerTokenIdentifier containerTokenIdentifier = ContainerTokenIdentifier containerTokenIdentifier =
new ContainerTokenIdentifier(); new ContainerTokenIdentifier();
containerTokenIdentifier.readFields(input); byte[] tokenIdentifierContent = containerToken.getIdentifier().array();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(tokenIdentifierContent, tokenIdentifierContent.length);
containerTokenIdentifier.readFields(dib);
Assert.assertEquals(cId, containerTokenIdentifier.getContainerID()); Assert.assertEquals(cId, containerTokenIdentifier.getContainerID());
Assert.assertEquals( Assert.assertEquals(
cId.toString(), containerTokenIdentifier.getContainerID().toString()); cId.toString(), containerTokenIdentifier.getContainerID().toString());

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.yarn.proto";
option java_outer_classname = "YarnSecurityTestTokenProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;
import "yarn_protos.proto";
message NMTokenIdentifierNewProto {
optional ApplicationAttemptIdProto appAttemptId = 1;
optional NodeIdProto nodeId = 2;
optional string appSubmitter = 3;
optional int32 keyId = 4;
optional string message = 5;
}
message ContainerTokenIdentifierForTestProto {
optional ContainerIdProto containerId = 1;
optional string nmHostAddr = 2;
optional string appSubmitter = 3;
optional ResourceProto resource = 4;
optional int64 expiryTimeStamp = 5;
optional int32 masterKeyId = 6;
optional int64 rmIdentifier = 7;
optional PriorityProto priority = 8;
optional int64 creationTime = 9;
optional LogAggregationContextProto logAggregationContext = 10;
optional string message = 11;
}