Merge branch 'trunk' into HDFS-6581

This commit is contained in:
arp 2014-09-27 20:28:03 -07:00
commit 9cbb2c55d5
25 changed files with 1218 additions and 168 deletions

View File

@ -882,6 +882,9 @@ Release 2.6.0 - UNRELEASED
HADOOP-10552. Fix usage and example at FileSystemShell.apt.vm (Kenji HADOOP-10552. Fix usage and example at FileSystemShell.apt.vm (Kenji
Kikushima via aw) Kikushima via aw)
HADOOP-11143 NetUtils.wrapException loses inner stack trace on BindException
(stevel)
Release 2.5.1 - 2014-09-05 Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -716,7 +716,7 @@ public static IOException wrapException(final String destHost,
final int localPort, final int localPort,
final IOException exception) { final IOException exception) {
if (exception instanceof BindException) { if (exception instanceof BindException) {
return new BindException( return wrapWithMessage(exception,
"Problem binding to [" "Problem binding to ["
+ localHost + localHost
+ ":" + ":"

View File

@ -537,6 +537,8 @@ Deprecated Properties
|user.name | mapreduce.job.user.name |user.name | mapreduce.job.user.name
*---+---+ *---+---+
|webinterface.private.actions | mapreduce.jobtracker.webinterface.trusted |webinterface.private.actions | mapreduce.jobtracker.webinterface.trusted
*---+---+
|yarn.app.mapreduce.yarn.app.mapreduce.client-am.ipc.max-retries-on-timeouts | yarn.app.mapreduce.client-am.ipc.max-retries-on-timeouts
*---+---+ *---+---+
The following table lists additional changes to some configuration properties: The following table lists additional changes to some configuration properties:

View File

@ -390,6 +390,10 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-5945. Update the description of GenericOptionsParser -jt MAPREDUCE-5945. Update the description of GenericOptionsParser -jt
option (Akira AJISAKA via aw) option (Akira AJISAKA via aw)
MAPREDUCE-6087. Fixed wrong config name of
MRJobConfig#MR_CLIENT_TO_AM_IPC_MAX_RETRIES_ON_TIMEOUTS. Contributed by
Akira AJISAKA. (Akira AJISAKA via jianhe)
Release 2.5.1 - 2014-09-05 Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -392,7 +392,7 @@ public interface MRJobConfig {
* reconnecting to the RM to fetch Application Status. * reconnecting to the RM to fetch Application Status.
*/ */
public static final String MR_CLIENT_TO_AM_IPC_MAX_RETRIES_ON_TIMEOUTS = public static final String MR_CLIENT_TO_AM_IPC_MAX_RETRIES_ON_TIMEOUTS =
MR_PREFIX + "yarn.app.mapreduce.client-am.ipc.max-retries-on-timeouts"; MR_PREFIX + "client-am.ipc.max-retries-on-timeouts";
public static final int public static final int
DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES_ON_TIMEOUTS = 3; DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES_ON_TIMEOUTS = 3;

View File

@ -180,6 +180,8 @@ private static void addDeprecatedKeys() {
TTConfig.TT_LOCAL_CACHE_SIZE), TTConfig.TT_LOCAL_CACHE_SIZE),
new DeprecationDelta("tasktracker.contention.tracking", new DeprecationDelta("tasktracker.contention.tracking",
TTConfig.TT_CONTENTION_TRACKING), TTConfig.TT_CONTENTION_TRACKING),
new DeprecationDelta("yarn.app.mapreduce.yarn.app.mapreduce.client-am.ipc.max-retries-on-timeouts",
MRJobConfig.MR_CLIENT_TO_AM_IPC_MAX_RETRIES_ON_TIMEOUTS),
new DeprecationDelta("job.end.notification.url", new DeprecationDelta("job.end.notification.url",
MRJobConfig.MR_JOB_END_NOTIFICATION_URL), MRJobConfig.MR_JOB_END_NOTIFICATION_URL),
new DeprecationDelta("job.end.retry.attempts", new DeprecationDelta("job.end.retry.attempts",

View File

@ -262,6 +262,9 @@ Release 2.6.0 - UNRELEASED
YARN-2372. There are Chinese Characters in the FairScheduler's document YARN-2372. There are Chinese Characters in the FairScheduler's document
(Fengdong Yu via aw) (Fengdong Yu via aw)
YARN-668. Changed NMTokenIdentifier/AMRMTokenIdentifier/ContainerTokenIdentifier
to use protobuf object as the payload. (Junping Du via jianhe)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -131,4 +131,39 @@
</dependency> </dependency>
</dependencies> </dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-maven-plugins</artifactId>
<executions>
<execution>
<id>compile-protoc</id>
<phase>generate-sources</phase>
<goals>
<goal>protoc</goal>
</goals>
<configuration>
<protocVersion>${protobuf.version}</protocVersion>
<protocCommand>${protoc.path}</protocCommand>
<imports>
<param>${basedir}/src/test/proto</param>
<param>${basedir}/../../../hadoop-common-project/hadoop-common/src/main/proto</param>
<param>${basedir}/../hadoop-yarn-api/src/main/proto</param>
</imports>
<source>
<directory>${basedir}/src/test/proto</directory>
<includes>
<include>test_amrm_token.proto</include>
</includes>
</source>
<output>${project.build.directory}/generated-sources/java</output>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project> </project>

View File

@ -0,0 +1,130 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnSecurityTestAMRMTokenProtos.AMRMTokenIdentifierForTestProto;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import com.google.protobuf.TextFormat;
public class AMRMTokenIdentifierForTest extends AMRMTokenIdentifier {
private static Log LOG = LogFactory.getLog(AMRMTokenIdentifierForTest.class);
public static final Text KIND = new Text("YARN_AM_RM_TOKEN");
private AMRMTokenIdentifierForTestProto proto;
private AMRMTokenIdentifierForTestProto.Builder builder;
public AMRMTokenIdentifierForTest(){
builder = AMRMTokenIdentifierForTestProto.newBuilder();
}
public AMRMTokenIdentifierForTest(AMRMTokenIdentifierForTestProto proto) {
this.proto = proto;
}
public AMRMTokenIdentifierForTest(AMRMTokenIdentifier tokenIdentifier,
String message) {
builder = AMRMTokenIdentifierForTestProto.newBuilder();
builder.setAppAttemptId(tokenIdentifier.getProto().getAppAttemptId());
builder.setKeyId(tokenIdentifier.getKeyId());
builder.setMessage(message);
proto = builder.build();
builder = null;
}
@Override
public void write(DataOutput out) throws IOException {
out.write(proto.toByteArray());
}
@Override
public void readFields(DataInput in) throws IOException {
DataInputStream dis = (DataInputStream)in;
byte[] buffer = IOUtils.toByteArray(dis);
proto = AMRMTokenIdentifierForTestProto.parseFrom(buffer);
}
@Override
public Text getKind() {
return KIND;
}
public String getMessage() {
return proto.getMessage();
}
public void setMessage(String message) {
builder.setMessage(message);
}
public void build() {
proto = builder.build();
builder = null;
}
public ApplicationAttemptId getApplicationAttemptId() {
return new ApplicationAttemptIdPBImpl(proto.getAppAttemptId());
}
public int getKeyId() {
return proto.getKeyId();
}
public AMRMTokenIdentifierForTestProto getNewProto(){
return this.proto;
}
@Override
public int hashCode() {
return this.proto.hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getNewProto().equals(this.getClass().cast(other).getNewProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(this.proto);
}
}

View File

@ -41,6 +41,7 @@
import org.junit.Assert; import org.junit.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -909,6 +910,36 @@ public void testAMRMClientOnAMRMTokenRollOver() throws YarnException,
// can do the allocate call with latest AMRMToken // can do the allocate call with latest AMRMToken
amClient.allocate(0.1f); amClient.allocate(0.1f);
// Verify latest AMRMToken can be used to send allocation request.
UserGroupInformation testUser1 =
UserGroupInformation.createRemoteUser("testUser1");
AMRMTokenIdentifierForTest newVersionTokenIdentifier =
new AMRMTokenIdentifierForTest(amrmToken_2.decodeIdentifier(), "message");
Assert.assertEquals("Message is changed after set to newVersionTokenIdentifier",
"message", newVersionTokenIdentifier.getMessage());
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newVersionToken =
new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> (
newVersionTokenIdentifier.getBytes(),
amrmTokenSecretManager.retrievePassword(newVersionTokenIdentifier),
newVersionTokenIdentifier.getKind(), new Text());
SecurityUtil.setTokenService(newVersionToken, yarnCluster
.getResourceManager().getApplicationMasterService().getBindAddress());
testUser1.addToken(newVersionToken);
testUser1.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
@Override
public ApplicationMasterProtocol run() {
return (ApplicationMasterProtocol) YarnRPC.create(conf).getProxy(
ApplicationMasterProtocol.class,
yarnCluster.getResourceManager().getApplicationMasterService()
.getBindAddress(), conf);
}
}).allocate(Records.newRecord(AllocateRequest.class));
// Make sure previous token has been rolled-over // Make sure previous token has been rolled-over
// and can not use this rolled-over token to make a allocate all. // and can not use this rolled-over token to make a allocate all.
while (true) { while (true) {
@ -931,12 +962,12 @@ public void testAMRMClientOnAMRMTokenRollOver() throws YarnException,
} }
try { try {
UserGroupInformation testUser = UserGroupInformation testUser2 =
UserGroupInformation.createRemoteUser("testUser"); UserGroupInformation.createRemoteUser("testUser2");
SecurityUtil.setTokenService(amrmToken_2, yarnCluster SecurityUtil.setTokenService(amrmToken_2, yarnCluster
.getResourceManager().getApplicationMasterService().getBindAddress()); .getResourceManager().getApplicationMasterService().getBindAddress());
testUser.addToken(amrmToken_2); testUser2.addToken(amrmToken_2);
testUser.doAs(new PrivilegedAction<ApplicationMasterProtocol>() { testUser2.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
@Override @Override
public ApplicationMasterProtocol run() { public ApplicationMasterProtocol run() {
return (ApplicationMasterProtocol) YarnRPC.create(conf).getProxy( return (ApplicationMasterProtocol) YarnRPC.create(conf).getProxy(

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.yarn.proto";
option java_outer_classname = "YarnSecurityTestAMRMTokenProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;
import "yarn_protos.proto";
message AMRMTokenIdentifierForTestProto {
optional ApplicationAttemptIdProto appAttemptId = 1;
optional int32 keyId = 2;
optional string message = 3;
}

View File

@ -232,6 +232,30 @@
</source> </source>
</configuration> </configuration>
</execution> </execution>
<execution>
<id>compile-protoc</id>
<phase>generate-sources</phase>
<goals>
<goal>protoc</goal>
</goals>
<configuration>
<protocVersion>${protobuf.version}</protocVersion>
<protocCommand>${protoc.path}</protocCommand>
<imports>
<param>${basedir}/../../../hadoop-common-project/hadoop-common/src/main/proto</param>
<param>${basedir}/../hadoop-yarn-api/src/main/proto</param>
<param>${basedir}/src/main/proto</param>
</imports>
<source>
<directory>${basedir}/src/main/proto/server</directory>
<includes>
<include>yarn_security_token.proto</include>
</includes>
</source>
<output>${project.build.directory}/generated-sources/java</output>
</configuration>
</execution>
</executions> </executions>
</plugin> </plugin>
<plugin> <plugin>

View File

@ -19,9 +19,11 @@
package org.apache.hadoop.yarn.security; package org.apache.hadoop.yarn.security;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
@ -32,6 +34,10 @@
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.AMRMTokenIdentifierProto;
import com.google.protobuf.TextFormat;
/** /**
* AMRMTokenIdentifier is the TokenIdentifier to be used by * AMRMTokenIdentifier is the TokenIdentifier to be used by
@ -42,49 +48,41 @@
public class AMRMTokenIdentifier extends TokenIdentifier { public class AMRMTokenIdentifier extends TokenIdentifier {
public static final Text KIND_NAME = new Text("YARN_AM_RM_TOKEN"); public static final Text KIND_NAME = new Text("YARN_AM_RM_TOKEN");
private AMRMTokenIdentifierProto proto;
private ApplicationAttemptId applicationAttemptId;
private int keyId = Integer.MIN_VALUE;
public AMRMTokenIdentifier() { public AMRMTokenIdentifier() {
} }
public AMRMTokenIdentifier(ApplicationAttemptId appAttemptId) {
this();
this.applicationAttemptId = appAttemptId;
}
public AMRMTokenIdentifier(ApplicationAttemptId appAttemptId, public AMRMTokenIdentifier(ApplicationAttemptId appAttemptId,
int masterKeyId) { int masterKeyId) {
this(); AMRMTokenIdentifierProto.Builder builder =
this.applicationAttemptId = appAttemptId; AMRMTokenIdentifierProto.newBuilder();
this.keyId = masterKeyId; if (appAttemptId != null) {
builder.setAppAttemptId(
((ApplicationAttemptIdPBImpl)appAttemptId).getProto());
}
builder.setKeyId(masterKeyId);
proto = builder.build();
} }
@Private @Private
public ApplicationAttemptId getApplicationAttemptId() { public ApplicationAttemptId getApplicationAttemptId() {
return this.applicationAttemptId; if (!proto.hasAppAttemptId()) {
return null;
}
return new ApplicationAttemptIdPBImpl(proto.getAppAttemptId());
} }
@Override @Override
public void write(DataOutput out) throws IOException { public void write(DataOutput out) throws IOException {
ApplicationId appId = this.applicationAttemptId.getApplicationId(); out.write(proto.toByteArray());
out.writeLong(appId.getClusterTimestamp());
out.writeInt(appId.getId());
out.writeInt(this.applicationAttemptId.getAttemptId());
out.writeInt(this.keyId);
} }
@Override @Override
public void readFields(DataInput in) throws IOException { public void readFields(DataInput in) throws IOException {
long clusterTimeStamp = in.readLong(); DataInputStream dis = (DataInputStream)in;
int appId = in.readInt(); byte[] buffer = IOUtils.toByteArray(dis);
int attemptId = in.readInt(); proto = AMRMTokenIdentifierProto.parseFrom(buffer);
ApplicationId applicationId =
ApplicationId.newInstance(clusterTimeStamp, appId);
this.applicationAttemptId =
ApplicationAttemptId.newInstance(applicationId, attemptId);
this.keyId = in.readInt();
} }
@Override @Override
@ -94,16 +92,20 @@ public Text getKind() {
@Override @Override
public UserGroupInformation getUser() { public UserGroupInformation getUser() {
if (this.applicationAttemptId == null String appAttemptId = null;
|| "".equals(this.applicationAttemptId.toString())) { if (proto.hasAppAttemptId()) {
return null; appAttemptId =
new ApplicationAttemptIdPBImpl(proto.getAppAttemptId()).toString();
} }
return UserGroupInformation.createRemoteUser(this.applicationAttemptId return UserGroupInformation.createRemoteUser(appAttemptId);
.toString());
} }
public int getKeyId() { public int getKeyId() {
return this.keyId; return proto.getKeyId();
}
public AMRMTokenIdentifierProto getProto() {
return this.proto;
} }
// TODO: Needed? // TODO: Needed?
@ -114,4 +116,24 @@ protected Text getKind() {
return KIND_NAME; return KIND_NAME;
} }
} }
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
} }

View File

@ -19,9 +19,11 @@
package org.apache.hadoop.yarn.security; package org.apache.hadoop.yarn.security;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -38,7 +40,14 @@
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto;
import com.google.protobuf.TextFormat;
/** /**
* TokenIdentifier for a container. Encodes {@link ContainerId}, * TokenIdentifier for a container. Encodes {@link ContainerId},
@ -53,15 +62,7 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
public static final Text KIND = new Text("ContainerToken"); public static final Text KIND = new Text("ContainerToken");
private ContainerId containerId; private ContainerTokenIdentifierProto proto;
private String nmHostAddr;
private String appSubmitter;
private Resource resource;
private long expiryTimeStamp;
private int masterKeyId;
private long rmIdentifier;
private Priority priority;
private long creationTime;
private LogAggregationContext logAggregationContext; private LogAggregationContext logAggregationContext;
public ContainerTokenIdentifier(ContainerId containerID, public ContainerTokenIdentifier(ContainerId containerID,
@ -75,16 +76,29 @@ public ContainerTokenIdentifier(ContainerId containerID, String hostName,
String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId, String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId,
long rmIdentifier, Priority priority, long creationTime, long rmIdentifier, Priority priority, long creationTime,
LogAggregationContext logAggregationContext) { LogAggregationContext logAggregationContext) {
this.containerId = containerID; ContainerTokenIdentifierProto.Builder builder =
this.nmHostAddr = hostName; ContainerTokenIdentifierProto.newBuilder();
this.appSubmitter = appSubmitter; if (containerID != null) {
this.resource = r; builder.setContainerId(((ContainerIdPBImpl)containerID).getProto());
this.expiryTimeStamp = expiryTimeStamp; }
this.masterKeyId = masterKeyId; builder.setNmHostAddr(hostName);
this.rmIdentifier = rmIdentifier; builder.setAppSubmitter(appSubmitter);
this.priority = priority; if (r != null) {
this.creationTime = creationTime; builder.setResource(((ResourcePBImpl)r).getProto());
this.logAggregationContext = logAggregationContext; }
builder.setExpiryTimeStamp(expiryTimeStamp);
builder.setMasterKeyId(masterKeyId);
builder.setRmIdentifier(rmIdentifier);
if (priority != null) {
builder.setPriority(((PriorityPBImpl)priority).getProto());
}
builder.setCreationTime(creationTime);
if (logAggregationContext != null) {
builder.setLogAggregationContext(
((LogAggregationContextPBImpl)logAggregationContext).getProto());
}
proto = builder.build();
} }
/** /**
@ -94,104 +108,75 @@ public ContainerTokenIdentifier() {
} }
public ContainerId getContainerID() { public ContainerId getContainerID() {
return this.containerId; if (!proto.hasContainerId()) {
return null;
}
return new ContainerIdPBImpl(proto.getContainerId());
} }
public String getApplicationSubmitter() { public String getApplicationSubmitter() {
return this.appSubmitter; return proto.getAppSubmitter();
} }
public String getNmHostAddress() { public String getNmHostAddress() {
return this.nmHostAddr; return proto.getNmHostAddr();
} }
public Resource getResource() { public Resource getResource() {
return this.resource; if (!proto.hasResource()) {
return null;
}
return new ResourcePBImpl(proto.getResource());
} }
public long getExpiryTimeStamp() { public long getExpiryTimeStamp() {
return this.expiryTimeStamp; return proto.getExpiryTimeStamp();
} }
public int getMasterKeyId() { public int getMasterKeyId() {
return this.masterKeyId; return proto.getMasterKeyId();
} }
public Priority getPriority() { public Priority getPriority() {
return this.priority; if (!proto.hasPriority()) {
return null;
}
return new PriorityPBImpl(proto.getPriority());
} }
public long getCreationTime() { public long getCreationTime() {
return this.creationTime; return proto.getCreationTime();
} }
/** /**
* Get the RMIdentifier of RM in which containers are allocated * Get the RMIdentifier of RM in which containers are allocated
* @return RMIdentifier * @return RMIdentifier
*/ */
public long getRMIdentifer() { public long getRMIdentifier() {
return this.rmIdentifier; return proto.getRmIdentifier();
}
public ContainerTokenIdentifierProto getProto() {
return proto;
} }
public LogAggregationContext getLogAggregationContext() { public LogAggregationContext getLogAggregationContext() {
return this.logAggregationContext; if (!proto.hasLogAggregationContext()) {
return null;
}
return new LogAggregationContextPBImpl(proto.getLogAggregationContext());
} }
@Override @Override
public void write(DataOutput out) throws IOException { public void write(DataOutput out) throws IOException {
LOG.debug("Writing ContainerTokenIdentifier to RPC layer: " + this); LOG.debug("Writing ContainerTokenIdentifier to RPC layer: " + this);
ApplicationAttemptId applicationAttemptId = this.containerId out.write(proto.toByteArray());
.getApplicationAttemptId();
ApplicationId applicationId = applicationAttemptId.getApplicationId();
out.writeLong(applicationId.getClusterTimestamp());
out.writeInt(applicationId.getId());
out.writeInt(applicationAttemptId.getAttemptId());
out.writeLong(this.containerId.getContainerId());
out.writeUTF(this.nmHostAddr);
out.writeUTF(this.appSubmitter);
out.writeInt(this.resource.getMemory());
out.writeInt(this.resource.getVirtualCores());
out.writeLong(this.expiryTimeStamp);
out.writeInt(this.masterKeyId);
out.writeLong(this.rmIdentifier);
out.writeInt(this.priority.getPriority());
out.writeLong(this.creationTime);
if (this.logAggregationContext == null) {
out.writeInt(-1);
} else {
byte[] logAggregationContext =
((LogAggregationContextPBImpl) this.logAggregationContext).getProto()
.toByteArray();
out.writeInt(logAggregationContext.length);
out.write(logAggregationContext);
}
} }
@Override @Override
public void readFields(DataInput in) throws IOException { public void readFields(DataInput in) throws IOException {
ApplicationId applicationId = DataInputStream dis = (DataInputStream)in;
ApplicationId.newInstance(in.readLong(), in.readInt()); byte[] buffer = IOUtils.toByteArray(dis);
ApplicationAttemptId applicationAttemptId = proto = ContainerTokenIdentifierProto.parseFrom(buffer);
ApplicationAttemptId.newInstance(applicationId, in.readInt());
this.containerId =
ContainerId.newInstance(applicationAttemptId, in.readLong());
this.nmHostAddr = in.readUTF();
this.appSubmitter = in.readUTF();
int memory = in.readInt();
int vCores = in.readInt();
this.resource = Resource.newInstance(memory, vCores);
this.expiryTimeStamp = in.readLong();
this.masterKeyId = in.readInt();
this.rmIdentifier = in.readLong();
this.priority = Priority.newInstance(in.readInt());
this.creationTime = in.readLong();
int size = in.readInt();
if (size != -1) {
byte[] bytes = new byte[size];
in.readFully(bytes);
this.logAggregationContext =
new LogAggregationContextPBImpl(
LogAggregationContextProto.parseFrom(bytes));
}
} }
@Override @Override
@ -201,7 +186,12 @@ public Text getKind() {
@Override @Override
public UserGroupInformation getUser() { public UserGroupInformation getUser() {
return UserGroupInformation.createRemoteUser(this.containerId.toString()); String containerId = null;
if (proto.hasContainerId()) {
containerId = new ContainerIdPBImpl(proto.getContainerId()).toString();
}
return UserGroupInformation.createRemoteUser(
containerId);
} }
// TODO: Needed? // TODO: Needed?
@ -212,4 +202,24 @@ protected Text getKind() {
return KIND; return KIND;
} }
} }
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
} }

View File

@ -18,10 +18,13 @@
package org.apache.hadoop.yarn.security; package org.apache.hadoop.yarn.security;
import java.io.ByteArrayInputStream;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
@ -32,6 +35,12 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.NMTokenIdentifierProto;
import com.google.protobuf.TextFormat;
@Public @Public
@Evolving @Evolving
@ -41,17 +50,21 @@ public class NMTokenIdentifier extends TokenIdentifier {
public static final Text KIND = new Text("NMToken"); public static final Text KIND = new Text("NMToken");
private ApplicationAttemptId appAttemptId; private NMTokenIdentifierProto proto;
private NodeId nodeId;
private String appSubmitter;
private int keyId;
public NMTokenIdentifier(ApplicationAttemptId appAttemptId, NodeId nodeId, public NMTokenIdentifier(ApplicationAttemptId appAttemptId,
String applicationSubmitter, int masterKeyId) { NodeId nodeId, String applicationSubmitter, int masterKeyId) {
this.appAttemptId = appAttemptId; NMTokenIdentifierProto.Builder builder = NMTokenIdentifierProto.newBuilder();
this.nodeId = nodeId; if (appAttemptId != null) {
this.appSubmitter = applicationSubmitter; builder.setAppAttemptId(
this.keyId = masterKeyId; ((ApplicationAttemptIdPBImpl)appAttemptId).getProto());
}
if (nodeId != null) {
builder.setNodeId(((NodeIdPBImpl)nodeId).getProto());
}
builder.setAppSubmitter(applicationSubmitter);
builder.setKeyId(masterKeyId);
proto = builder.build();
} }
/** /**
@ -61,43 +74,38 @@ public NMTokenIdentifier() {
} }
public ApplicationAttemptId getApplicationAttemptId() { public ApplicationAttemptId getApplicationAttemptId() {
return appAttemptId; if (!proto.hasAppAttemptId()) {
return null;
}
return new ApplicationAttemptIdPBImpl(proto.getAppAttemptId());
} }
public NodeId getNodeId() { public NodeId getNodeId() {
return nodeId; if (!proto.hasNodeId()) {
return null;
}
return new NodeIdPBImpl(proto.getNodeId());
} }
public String getApplicationSubmitter() { public String getApplicationSubmitter() {
return appSubmitter; return proto.getAppSubmitter();
} }
public int getKeyId() { public int getKeyId() {
return keyId; return proto.getKeyId();
} }
@Override @Override
public void write(DataOutput out) throws IOException { public void write(DataOutput out) throws IOException {
LOG.debug("Writing NMTokenIdentifier to RPC layer: " + this); LOG.debug("Writing NMTokenIdentifier to RPC layer: " + this);
ApplicationId applicationId = appAttemptId.getApplicationId(); out.write(proto.toByteArray());
out.writeLong(applicationId.getClusterTimestamp());
out.writeInt(applicationId.getId());
out.writeInt(appAttemptId.getAttemptId());
out.writeUTF(this.nodeId.toString());
out.writeUTF(this.appSubmitter);
out.writeInt(this.keyId);
} }
@Override @Override
public void readFields(DataInput in) throws IOException { public void readFields(DataInput in) throws IOException {
appAttemptId = DataInputStream dis = (DataInputStream)in;
ApplicationAttemptId.newInstance( byte[] buffer = IOUtils.toByteArray(dis);
ApplicationId.newInstance(in.readLong(), in.readInt()), proto = NMTokenIdentifierProto.parseFrom(buffer);
in.readInt());
String[] hostAddr = in.readUTF().split(":");
nodeId = NodeId.newInstance(hostAddr[0], Integer.parseInt(hostAddr[1]));
appSubmitter = in.readUTF();
keyId = in.readInt();
} }
@Override @Override
@ -107,6 +115,35 @@ public Text getKind() {
@Override @Override
public UserGroupInformation getUser() { public UserGroupInformation getUser() {
return UserGroupInformation.createRemoteUser(appAttemptId.toString()); String appAttemptId = null;
if (proto.hasAppAttemptId()) {
appAttemptId = new ApplicationAttemptIdPBImpl(
proto.getAppAttemptId()).toString();
}
return UserGroupInformation.createRemoteUser(appAttemptId);
}
public NMTokenIdentifierProto getProto() {
return proto;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
} }
} }

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.yarn.proto";
option java_outer_classname = "YarnSecurityTokenProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;
import "yarn_protos.proto";
// None of the following records are supposed to be exposed to users.
message NMTokenIdentifierProto {
optional ApplicationAttemptIdProto appAttemptId = 1;
optional NodeIdProto nodeId = 2;
optional string appSubmitter = 3;
optional int32 keyId = 4 [default = -1];
}
message AMRMTokenIdentifierProto {
optional ApplicationAttemptIdProto appAttemptId = 1;
optional int32 keyId = 2 [default = -1];
}
message ContainerTokenIdentifierProto {
optional ContainerIdProto containerId = 1;
optional string nmHostAddr = 2;
optional string appSubmitter = 3;
optional ResourceProto resource = 4;
optional int64 expiryTimeStamp =5;
optional int32 masterKeyId = 6 [default = -1];
optional int64 rmIdentifier = 7;
optional PriorityProto priority = 8;
optional int64 creationTime = 9;
optional LogAggregationContextProto logAggregationContext = 10;
}

View File

@ -0,0 +1,154 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.security;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.junit.Assert;
import org.junit.Test;
public class TestYARNTokenIdentifier {
@Test
public void testNMTokenIdentifier() {
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(1, 1), 1);
NodeId nodeId = NodeId.newInstance("host0", 0);
String applicationSubmitter = "usr0";
int masterKeyId = 1;
NMTokenIdentifier token = new NMTokenIdentifier(
appAttemptId, nodeId, applicationSubmitter, masterKeyId);
NMTokenIdentifier anotherToken = new NMTokenIdentifier(
appAttemptId, nodeId, applicationSubmitter, masterKeyId);
// verify the whole record equals with original record
Assert.assertEquals("Token is not the same after serialization " +
"and deserialization.", token, anotherToken);
// verify all properties are the same as original
Assert.assertEquals(
"appAttemptId from proto is not the same with original token",
anotherToken.getApplicationAttemptId(), appAttemptId);
Assert.assertEquals(
"NodeId from proto is not the same with original token",
anotherToken.getNodeId(), nodeId);
Assert.assertEquals(
"applicationSubmitter from proto is not the same with original token",
anotherToken.getApplicationSubmitter(), applicationSubmitter);
Assert.assertEquals(
"masterKeyId from proto is not the same with original token",
anotherToken.getKeyId(), masterKeyId);
}
@Test
public void testAMRMTokenIdentifier() {
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(1, 1), 1);
int masterKeyId = 1;
AMRMTokenIdentifier token = new AMRMTokenIdentifier(appAttemptId, masterKeyId);
AMRMTokenIdentifier anotherToken = new AMRMTokenIdentifier(
appAttemptId, masterKeyId);
// verify the whole record equals with original record
Assert.assertEquals("Token is not the same after serialization " +
"and deserialization.", token, anotherToken);
Assert.assertEquals("ApplicationAttemptId from proto is not the same with original token",
anotherToken.getApplicationAttemptId(), appAttemptId);
Assert.assertEquals("masterKeyId from proto is not the same with original token",
anotherToken.getKeyId(), masterKeyId);
}
@Test
public void testContainerTokenIdentifier() {
ContainerId containerID = ContainerId.newInstance(
ApplicationAttemptId.newInstance(ApplicationId.newInstance(
1, 1), 1), 1);
String hostName = "host0";
String appSubmitter = "usr0";
Resource r = Resource.newInstance(1024, 1);
long expiryTimeStamp = 1000;
int masterKeyId = 1;
long rmIdentifier = 1;
Priority priority = Priority.newInstance(1);
long creationTime = 1000;
ContainerTokenIdentifier token = new ContainerTokenIdentifier(
containerID, hostName, appSubmitter, r, expiryTimeStamp,
masterKeyId, rmIdentifier, priority, creationTime);
ContainerTokenIdentifier anotherToken = new ContainerTokenIdentifier(
containerID, hostName, appSubmitter, r, expiryTimeStamp,
masterKeyId, rmIdentifier, priority, creationTime);
// verify the whole record equals with original record
Assert.assertEquals("Token is not the same after serialization " +
"and deserialization.", token, anotherToken);
Assert.assertEquals(
"ContainerID from proto is not the same with original token",
anotherToken.getContainerID(), containerID);
Assert.assertEquals(
"Hostname from proto is not the same with original token",
anotherToken.getNmHostAddress(), hostName);
Assert.assertEquals(
"ApplicationSubmitter from proto is not the same with original token",
anotherToken.getApplicationSubmitter(), appSubmitter);
Assert.assertEquals(
"Resource from proto is not the same with original token",
anotherToken.getResource(), r);
Assert.assertEquals(
"expiryTimeStamp from proto is not the same with original token",
anotherToken.getExpiryTimeStamp(), expiryTimeStamp);
Assert.assertEquals(
"KeyId from proto is not the same with original token",
anotherToken.getMasterKeyId(), masterKeyId);
Assert.assertEquals(
"RMIdentifier from proto is not the same with original token",
anotherToken.getRMIdentifier(), rmIdentifier);
Assert.assertEquals(
"Priority from proto is not the same with original token",
anotherToken.getPriority(), priority);
Assert.assertEquals(
"CreationTime from proto is not the same with original token",
anotherToken.getCreationTime(), creationTime);
Assert.assertNull(anotherToken.getLogAggregationContext());
}
}

View File

@ -133,6 +133,7 @@
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider; import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -650,8 +651,8 @@ protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier,
boolean unauthorized = false; boolean unauthorized = false;
StringBuilder messageBuilder = StringBuilder messageBuilder =
new StringBuilder("Unauthorized request to start container. "); new StringBuilder("Unauthorized request to start container. ");
if (!nmTokenIdentifier.getApplicationAttemptId().getApplicationId().equals( if (!nmTokenIdentifier.getApplicationAttemptId().getApplicationId().
containerId.getApplicationAttemptId().getApplicationId())) { equals(containerId.getApplicationAttemptId().getApplicationId())) {
unauthorized = true; unauthorized = true;
messageBuilder.append("\nNMToken for application attempt : ") messageBuilder.append("\nNMToken for application attempt : ")
.append(nmTokenIdentifier.getApplicationAttemptId()) .append(nmTokenIdentifier.getApplicationAttemptId())
@ -784,7 +785,7 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
*/ */
authorizeStartRequest(nmTokenIdentifier, containerTokenIdentifier); authorizeStartRequest(nmTokenIdentifier, containerTokenIdentifier);
if (containerTokenIdentifier.getRMIdentifer() != nodeStatusUpdater if (containerTokenIdentifier.getRMIdentifier() != nodeStatusUpdater
.getRMIdentifier()) { .getRMIdentifier()) {
// Is the container coming from unknown RM // Is the container coming from unknown RM
StringBuilder sb = new StringBuilder("\nContainer "); StringBuilder sb = new StringBuilder("\nContainer ");
@ -1035,6 +1036,7 @@ protected void authorizeGetAndStopContainerRequest(ContainerId containerId,
*/ */
ApplicationId nmTokenAppId = ApplicationId nmTokenAppId =
identifier.getApplicationAttemptId().getApplicationId(); identifier.getApplicationAttemptId().getApplicationId();
if ((!nmTokenAppId.equals(containerId.getApplicationAttemptId().getApplicationId())) if ((!nmTokenAppId.equals(containerId.getApplicationAttemptId().getApplicationId()))
|| (container != null && !nmTokenAppId.equals(container || (container != null && !nmTokenAppId.equals(container
.getContainerId().getApplicationAttemptId().getApplicationId()))) { .getContainerId().getApplicationAttemptId().getApplicationId()))) {

View File

@ -528,7 +528,7 @@ public boolean matches(Object o) {
public boolean matches(Object o) { public boolean matches(Object o) {
ContainersLauncherEvent evt = (ContainersLauncherEvent) o; ContainersLauncherEvent evt = (ContainersLauncherEvent) o;
return evt.getType() == ContainersLauncherEventType.LAUNCH_CONTAINER return evt.getType() == ContainersLauncherEventType.LAUNCH_CONTAINER
&& wcf.cId == evt.getContainer().getContainerId(); && wcf.cId.equals(evt.getContainer().getContainerId());
} }
}; };
verify(wc.launcherBus).handle(argThat(matchesLaunchReq)); verify(wc.launcherBus).handle(argThat(matchesLaunchReq));

View File

@ -99,7 +99,7 @@ public void testRMIdentifierOnContainerAllocation() throws Exception {
ContainerTokenIdentifier tokenId = ContainerTokenIdentifier tokenId =
BuilderUtils.newContainerTokenIdentifier(allocatedContainer BuilderUtils.newContainerTokenIdentifier(allocatedContainer
.getContainerToken()); .getContainerToken());
Assert.assertEquals(MockRM.getClusterTimeStamp(), tokenId.getRMIdentifer()); Assert.assertEquals(MockRM.getClusterTimeStamp(), tokenId.getRMIdentifier());
rm.stop(); rm.stop();
} }

View File

@ -113,6 +113,35 @@
<build> <build>
<plugins> <plugins>
<plugin>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-maven-plugins</artifactId>
<executions>
<execution>
<id>compile-protoc</id>
<phase>generate-sources</phase>
<goals>
<goal>protoc</goal>
</goals>
<configuration>
<protocVersion>${protobuf.version}</protocVersion>
<protocCommand>${protoc.path}</protocCommand>
<imports>
<param>${basedir}/src/test/proto</param>
<param>${basedir}/../../../../hadoop-common-project/hadoop-common/src/main/proto</param>
<param>${basedir}/../../hadoop-yarn-api/src/main/proto</param>
</imports>
<source>
<directory>${basedir}/src/test/proto</directory>
<includes>
<include>test_token.proto</include>
</includes>
</source>
<output>${project.build.directory}/generated-sources/java</output>
</configuration>
</execution>
</executions>
</plugin>
<plugin> <plugin>
<artifactId>maven-jar-plugin</artifactId> <artifactId>maven-jar-plugin</artifactId>
<executions> <executions>

View File

@ -0,0 +1,195 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
import org.apache.hadoop.yarn.proto.YarnSecurityTestTokenProtos.ContainerTokenIdentifierForTestProto;
import com.google.protobuf.TextFormat;
public class ContainerTokenIdentifierForTest extends ContainerTokenIdentifier {
private static Log LOG = LogFactory.getLog(ContainerTokenIdentifier.class);
public static final Text KIND = new Text("ContainerToken");
private ContainerTokenIdentifierForTestProto proto;
public ContainerTokenIdentifierForTest(ContainerId containerID,
String hostName, String appSubmitter, Resource r, long expiryTimeStamp,
int masterKeyId, long rmIdentifier, Priority priority, long creationTime,
LogAggregationContext logAggregationContext) {
ContainerTokenIdentifierForTestProto.Builder builder =
ContainerTokenIdentifierForTestProto.newBuilder();
if (containerID != null) {
builder.setContainerId(((ContainerIdPBImpl)containerID).getProto());
}
builder.setNmHostAddr(hostName);
builder.setAppSubmitter(appSubmitter);
if (r != null) {
builder.setResource(((ResourcePBImpl)r).getProto());
}
builder.setExpiryTimeStamp(expiryTimeStamp);
builder.setMasterKeyId(masterKeyId);
builder.setRmIdentifier(rmIdentifier);
if (priority != null) {
builder.setPriority(((PriorityPBImpl)priority).getProto());
}
builder.setCreationTime(creationTime);
if (logAggregationContext != null) {
builder.setLogAggregationContext(
((LogAggregationContextPBImpl)logAggregationContext).getProto());
}
proto = builder.build();
}
public ContainerTokenIdentifierForTest(ContainerTokenIdentifier identifier,
String message) {
ContainerTokenIdentifierForTestProto.Builder builder =
ContainerTokenIdentifierForTestProto.newBuilder();
ContainerIdPBImpl containerID =
(ContainerIdPBImpl)identifier.getContainerID();
if (containerID != null) {
builder.setContainerId(containerID.getProto());
}
builder.setNmHostAddr(identifier.getNmHostAddress());
builder.setAppSubmitter(identifier.getApplicationSubmitter());
ResourcePBImpl resource = (ResourcePBImpl)identifier.getResource();
if (resource != null) {
builder.setResource(resource.getProto());
}
builder.setExpiryTimeStamp(identifier.getExpiryTimeStamp());
builder.setMasterKeyId(identifier.getMasterKeyId());
builder.setRmIdentifier(identifier.getRMIdentifier());
PriorityPBImpl priority = (PriorityPBImpl)identifier.getPriority();
if (priority != null) {
builder.setPriority(priority.getProto());
}
builder.setCreationTime(identifier.getCreationTime());
builder.setMessage(message);
LogAggregationContextPBImpl logAggregationContext =
(LogAggregationContextPBImpl)identifier.getLogAggregationContext();
if (logAggregationContext != null) {
builder.setLogAggregationContext(logAggregationContext.getProto());
}
proto = builder.build();
}
public ContainerId getContainerID() {
return new ContainerIdPBImpl(proto.getContainerId());
}
public String getApplicationSubmitter() {
return proto.getAppSubmitter();
}
public String getNmHostAddress() {
return proto.getNmHostAddr();
}
public Resource getResource() {
return new ResourcePBImpl(proto.getResource());
}
public long getExpiryTimeStamp() {
return proto.getExpiryTimeStamp();
}
public int getMasterKeyId() {
return proto.getMasterKeyId();
}
public Priority getPriority() {
return new PriorityPBImpl(proto.getPriority());
}
public long getCreationTime() {
return proto.getCreationTime();
}
/**
* Get the RMIdentifier of RM in which containers are allocated
* @return RMIdentifier
*/
public long getRMIdentifier() {
return proto.getRmIdentifier();
}
@Override
public void readFields(DataInput in) throws IOException {
DataInputStream dis = (DataInputStream)in;
byte[] buffer = IOUtils.toByteArray(dis);
proto = ContainerTokenIdentifierForTestProto.parseFrom(buffer);
}
@Override
public void write(DataOutput out) throws IOException {
LOG.debug("Writing ContainerTokenIdentifierForTest to RPC layer: " + this);
out.write(proto.toByteArray());
}
ContainerTokenIdentifierForTestProto getNewProto() {
return this.proto;
}
@Override
public int hashCode() {
return this.proto.hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getNewProto().equals(this.getClass().cast(other).getNewProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(this.proto);
}
}

View File

@ -0,0 +1,145 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnSecurityTestTokenProtos.NMTokenIdentifierNewProto;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import com.google.protobuf.TextFormat;
public class NMTokenIdentifierNewForTest extends NMTokenIdentifier {
private static Log LOG = LogFactory.getLog(NMTokenIdentifierNewForTest.class);
public static final Text KIND = new Text("NMToken");
private NMTokenIdentifierNewProto proto;
private NMTokenIdentifierNewProto.Builder builder;
public NMTokenIdentifierNewForTest(){
builder = NMTokenIdentifierNewProto.newBuilder();
}
public NMTokenIdentifierNewForTest(NMTokenIdentifierNewProto proto) {
this.proto = proto;
}
public NMTokenIdentifierNewForTest(NMTokenIdentifier tokenIdentifier,
String message) {
builder = NMTokenIdentifierNewProto.newBuilder();
builder.setAppAttemptId(tokenIdentifier.getProto().getAppAttemptId());
builder.setNodeId(tokenIdentifier.getProto().getNodeId());
builder.setAppSubmitter(tokenIdentifier.getApplicationSubmitter());
builder.setKeyId(tokenIdentifier.getKeyId());
builder.setMessage(message);
proto = builder.build();
builder = null;
}
@Override
public void write(DataOutput out) throws IOException {
LOG.debug("Writing NMTokenIdentifierNewForTest to RPC layer: " + this);
out.write(proto.toByteArray());
}
@Override
public void readFields(DataInput in) throws IOException {
DataInputStream dis = (DataInputStream)in;
byte[] buffer = IOUtils.toByteArray(dis);
proto = NMTokenIdentifierNewProto.parseFrom(buffer);
}
@Override
public Text getKind() {
return KIND;
}
@Override
public UserGroupInformation getUser() {
return null;
}
public String getMessage() {
return proto.getMessage();
}
public void setMessage(String message) {
builder.setMessage(message);
}
public NMTokenIdentifierNewProto getNewProto() {
return proto;
}
public void build() {
proto = builder.build();
builder = null;
}
public ApplicationAttemptId getApplicationAttemptId() {
return new ApplicationAttemptIdPBImpl(proto.getAppAttemptId());
}
public NodeId getNodeId() {
return new NodeIdPBImpl(proto.getNodeId());
}
public String getApplicationSubmitter() {
return proto.getAppSubmitter();
}
public int getKeyId() {
return proto.getKeyId();
}
@Override
public int hashCode() {
return this.proto.hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getNewProto().equals(this.getClass().cast(other).getNewProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(this.proto);
}
}

View File

@ -34,6 +34,7 @@
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.minikdc.KerberosSecurityTestcase; import org.apache.hadoop.minikdc.KerberosSecurityTestcase;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -62,12 +63,14 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
@ -302,6 +305,56 @@ private void testNMTokens(Configuration conf) throws Exception {
Assert.assertTrue(nmTokenSecretManagerNM Assert.assertTrue(nmTokenSecretManagerNM
.isAppAttemptNMTokenKeyPresent(validAppAttemptId)); .isAppAttemptNMTokenKeyPresent(validAppAttemptId));
// using a new compatible version nmtoken, expect container can be started
// successfully.
ApplicationAttemptId validAppAttemptId2 =
ApplicationAttemptId.newInstance(appId, 2);
ContainerId validContainerId2 =
ContainerId.newInstance(validAppAttemptId2, 0);
org.apache.hadoop.yarn.api.records.Token validContainerToken2 =
containerTokenSecretManager.createContainerToken(validContainerId2,
validNode, user, r, Priority.newInstance(0), 0);
org.apache.hadoop.yarn.api.records.Token validNMToken2 =
nmTokenSecretManagerRM.createNMToken(validAppAttemptId2, validNode, user);
// First, get a new NMTokenIdentifier.
NMTokenIdentifier newIdentifier = new NMTokenIdentifier();
byte[] tokenIdentifierContent = validNMToken2.getIdentifier().array();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(tokenIdentifierContent, tokenIdentifierContent.length);
newIdentifier.readFields(dib);
// Then, generate a new version NMTokenIdentifier (NMTokenIdentifierNewForTest)
// with additional field of message.
NMTokenIdentifierNewForTest newVersionIdentifier =
new NMTokenIdentifierNewForTest(newIdentifier, "message");
// check new version NMTokenIdentifier has correct info.
Assert.assertEquals("The ApplicationAttemptId is changed after set to " +
"newVersionIdentifier", validAppAttemptId2.getAttemptId(),
newVersionIdentifier.getApplicationAttemptId().getAttemptId()
);
Assert.assertEquals("The message is changed after set to newVersionIdentifier",
"message", newVersionIdentifier.getMessage());
Assert.assertEquals("The NodeId is changed after set to newVersionIdentifier",
validNode, newVersionIdentifier.getNodeId());
// create new Token based on new version NMTokenIdentifier.
org.apache.hadoop.yarn.api.records.Token newVersionedNMToken =
BaseNMTokenSecretManager.newInstance(
nmTokenSecretManagerRM.retrievePassword(newVersionIdentifier),
newVersionIdentifier);
// Verify startContainer is successful and no exception is thrown.
Assert.assertTrue(testStartContainer(rpc, validAppAttemptId2, validNode,
validContainerToken2, newVersionedNMToken, false).isEmpty());
Assert.assertTrue(nmTokenSecretManagerNM
.isAppAttemptNMTokenKeyPresent(validAppAttemptId2));
//Now lets wait till container finishes and is removed from node manager. //Now lets wait till container finishes and is removed from node manager.
waitForContainerToFinishOnNM(validContainerId); waitForContainerToFinishOnNM(validContainerId);
sb = new StringBuilder("Attempt to relaunch the same container with id "); sb = new StringBuilder("Attempt to relaunch the same container with id ");
@ -607,11 +660,36 @@ private void testContainerToken(Configuration conf) throws IOException,
Assert.assertEquals(nmTokenSecretManagerInNM.getCurrentKey().getKeyId(), Assert.assertEquals(nmTokenSecretManagerInNM.getCurrentKey().getKeyId(),
nmTokenSecretManagerInRM.getCurrentKey().getKeyId()); nmTokenSecretManagerInRM.getCurrentKey().getKeyId());
// Creating a tampered Container Token
RMContainerTokenSecretManager containerTokenSecretManager = RMContainerTokenSecretManager containerTokenSecretManager =
yarnCluster.getResourceManager().getRMContext(). yarnCluster.getResourceManager().getRMContext().
getContainerTokenSecretManager(); getContainerTokenSecretManager();
Resource r = Resource.newInstance(1230, 2);
Token containerToken =
containerTokenSecretManager.createContainerToken(
cId, nodeId, user, r, Priority.newInstance(0), 0);
ContainerTokenIdentifier containerTokenIdentifier =
getContainerTokenIdentifierFromToken(containerToken);
// Verify new compatible version ContainerTokenIdentifier can work successfully.
ContainerTokenIdentifierForTest newVersionTokenIdentifier =
new ContainerTokenIdentifierForTest(containerTokenIdentifier, "message");
byte[] password =
containerTokenSecretManager.createPassword(newVersionTokenIdentifier);
Token newContainerToken = BuilderUtils.newContainerToken(
nodeId, password, newVersionTokenIdentifier);
Token nmToken =
nmTokenSecretManagerInRM.createNMToken(appAttemptId, nodeId, user);
YarnRPC rpc = YarnRPC.create(conf);
Assert.assertTrue(testStartContainer(rpc, appAttemptId, nodeId,
newContainerToken, nmToken, false).isEmpty());
// Creating a tampered Container Token
RMContainerTokenSecretManager tamperedContainerTokenSecretManager = RMContainerTokenSecretManager tamperedContainerTokenSecretManager =
new RMContainerTokenSecretManager(conf); new RMContainerTokenSecretManager(conf);
tamperedContainerTokenSecretManager.rollMasterKey(); tamperedContainerTokenSecretManager.rollMasterKey();
@ -621,19 +699,28 @@ private void testContainerToken(Configuration conf) throws IOException,
} while (containerTokenSecretManager.getCurrentKey().getKeyId() } while (containerTokenSecretManager.getCurrentKey().getKeyId()
== tamperedContainerTokenSecretManager.getCurrentKey().getKeyId()); == tamperedContainerTokenSecretManager.getCurrentKey().getKeyId());
Resource r = Resource.newInstance(1230, 2); ContainerId cId2 = ContainerId.newInstance(appAttemptId, 1);
// Creating modified containerToken // Creating modified containerToken
Token containerToken = Token containerToken2 =
tamperedContainerTokenSecretManager.createContainerToken(cId, nodeId, tamperedContainerTokenSecretManager.createContainerToken(cId2, nodeId,
user, r, Priority.newInstance(0), 0); user, r, Priority.newInstance(0), 0);
Token nmToken =
nmTokenSecretManagerInRM.createNMToken(appAttemptId, nodeId, user);
YarnRPC rpc = YarnRPC.create(conf);
StringBuilder sb = new StringBuilder("Given Container "); StringBuilder sb = new StringBuilder("Given Container ");
sb.append(cId); sb.append(cId2);
sb.append(" seems to have an illegally generated token."); sb.append(" seems to have an illegally generated token.");
Assert.assertTrue(testStartContainer(rpc, appAttemptId, nodeId, Assert.assertTrue(testStartContainer(rpc, appAttemptId, nodeId,
containerToken, nmToken, true).contains(sb.toString())); containerToken2, nmToken, true).contains(sb.toString()));
}
private ContainerTokenIdentifier getContainerTokenIdentifierFromToken(
Token containerToken) throws IOException {
ContainerTokenIdentifier containerTokenIdentifier;
containerTokenIdentifier = new ContainerTokenIdentifier();
byte[] tokenIdentifierContent = containerToken.getIdentifier().array();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(tokenIdentifierContent, tokenIdentifierContent.length);
containerTokenIdentifier.readFields(dib);
return containerTokenIdentifier;
} }
/** /**
@ -677,11 +764,14 @@ private void testContainerTokenWithEpoch(Configuration conf)
containerTokenSecretManager.createContainerToken(cId, nodeId, user, r, containerTokenSecretManager.createContainerToken(cId, nodeId, user, r,
Priority.newInstance(0), 0); Priority.newInstance(0), 0);
ByteArrayDataInput input = ByteStreams.newDataInput(
containerToken.getIdentifier().array());
ContainerTokenIdentifier containerTokenIdentifier = ContainerTokenIdentifier containerTokenIdentifier =
new ContainerTokenIdentifier(); new ContainerTokenIdentifier();
containerTokenIdentifier.readFields(input); byte[] tokenIdentifierContent = containerToken.getIdentifier().array();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(tokenIdentifierContent, tokenIdentifierContent.length);
containerTokenIdentifier.readFields(dib);
Assert.assertEquals(cId, containerTokenIdentifier.getContainerID()); Assert.assertEquals(cId, containerTokenIdentifier.getContainerID());
Assert.assertEquals( Assert.assertEquals(
cId.toString(), containerTokenIdentifier.getContainerID().toString()); cId.toString(), containerTokenIdentifier.getContainerID().toString());

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.yarn.proto";
option java_outer_classname = "YarnSecurityTestTokenProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;
import "yarn_protos.proto";
message NMTokenIdentifierNewProto {
optional ApplicationAttemptIdProto appAttemptId = 1;
optional NodeIdProto nodeId = 2;
optional string appSubmitter = 3;
optional int32 keyId = 4;
optional string message = 5;
}
message ContainerTokenIdentifierForTestProto {
optional ContainerIdProto containerId = 1;
optional string nmHostAddr = 2;
optional string appSubmitter = 3;
optional ResourceProto resource = 4;
optional int64 expiryTimeStamp = 5;
optional int32 masterKeyId = 6;
optional int64 rmIdentifier = 7;
optional PriorityProto priority = 8;
optional int64 creationTime = 9;
optional LogAggregationContextProto logAggregationContext = 10;
optional string message = 11;
}