YARN-692. Creating NMToken master key on RM and sharing it with NM as a part of RM-NM heartbeat. Contributed by Omkar Vinit Joshi.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1492907 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c0cd68d8fb
commit
f0eb4bc342
|
@ -331,6 +331,9 @@ Release 2.1.0-beta - UNRELEASED
|
|||
YARN-773. Moved YarnRuntimeException from package api.yarn to
|
||||
api.yarn.exceptions. (Jian He via vinodkv)
|
||||
|
||||
YARN-692. Creating NMToken master key on RM and sharing it with NM as a part
|
||||
of RM-NM heartbeat. (Omkar Vinit Joshi via vinodkv)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
YARN-512. Log aggregation root directory check is more expensive than it
|
||||
|
|
|
@ -286,6 +286,11 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final long DEFAULT_RM_CONTAINER_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS =
|
||||
24 * 60 * 60;
|
||||
|
||||
public static final String RM_NMTOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS =
|
||||
RM_PREFIX + "nm-tokens.master-key-rolling-interval-secs";
|
||||
|
||||
public static final long DEFAULT_RM_NMTOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS =
|
||||
24 * 60 * 60;
|
||||
////////////////////////////////
|
||||
// Node Manager Configs
|
||||
////////////////////////////////
|
||||
|
|
|
@ -0,0 +1,110 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.security;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
|
||||
|
||||
public class NMTokenIdentifier extends TokenIdentifier {
|
||||
|
||||
private static Log LOG = LogFactory.getLog(NMTokenIdentifier.class);
|
||||
|
||||
public static final Text KIND = new Text("NMToken");
|
||||
|
||||
private ApplicationAttemptId appAttemptId;
|
||||
private NodeId nodeId;
|
||||
private String appSubmitter;
|
||||
private int masterKeyId;
|
||||
|
||||
public NMTokenIdentifier(ApplicationAttemptId appAttemptId, NodeId nodeId,
|
||||
String applicationSubmitter, int masterKeyId) {
|
||||
this.appAttemptId = appAttemptId;
|
||||
this.nodeId = nodeId;
|
||||
this.appSubmitter = applicationSubmitter;
|
||||
this.masterKeyId = masterKeyId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Default constructor needed by RPC/Secret manager
|
||||
*/
|
||||
public NMTokenIdentifier() {
|
||||
}
|
||||
|
||||
public ApplicationAttemptId getApplicationAttemptId() {
|
||||
return appAttemptId;
|
||||
}
|
||||
|
||||
public NodeId getNodeId() {
|
||||
return nodeId;
|
||||
}
|
||||
|
||||
public String getApplicationSubmitter() {
|
||||
return appSubmitter;
|
||||
}
|
||||
|
||||
public int getMastKeyId() {
|
||||
return masterKeyId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
LOG.debug("Writing NMTokenIdentifier to RPC layer: " + this);
|
||||
ApplicationId applicationId = appAttemptId.getApplicationId();
|
||||
out.writeLong(applicationId.getClusterTimestamp());
|
||||
out.writeInt(applicationId.getId());
|
||||
out.writeInt(appAttemptId.getAttemptId());
|
||||
out.writeUTF(this.nodeId.toString());
|
||||
out.writeUTF(this.appSubmitter);
|
||||
out.writeInt(this.masterKeyId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
appAttemptId =
|
||||
ApplicationAttemptId.newInstance(
|
||||
ApplicationId.newInstance(in.readLong(), in.readInt()),
|
||||
in.readInt());
|
||||
String[] hostAddr = in.readUTF().split(":");
|
||||
nodeId = NodeId.newInstance(hostAddr[0], Integer.parseInt(hostAddr[1]));
|
||||
appSubmitter = in.readUTF();
|
||||
masterKeyId = in.readInt();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Text getKind() {
|
||||
return KIND;
|
||||
}
|
||||
|
||||
@Override
|
||||
public UserGroupInformation getUser() {
|
||||
return UserGroupInformation.createRemoteUser(appAttemptId.toString());
|
||||
}
|
||||
|
||||
}
|
|
@ -26,6 +26,9 @@ public interface NodeHeartbeatRequest {
|
|||
NodeStatus getNodeStatus();
|
||||
void setNodeStatus(NodeStatus status);
|
||||
|
||||
MasterKey getLastKnownMasterKey();
|
||||
void setLastKnownMasterKey(MasterKey secretKey);
|
||||
MasterKey getLastKnownContainerTokenMasterKey();
|
||||
void setLastKnownContainerTokenMasterKey(MasterKey secretKey);
|
||||
|
||||
MasterKey getLastKnownNMTokenMasterKey();
|
||||
void setLastKnownNMTokenMasterKey(MasterKey secretKey);
|
||||
}
|
||||
|
|
|
@ -36,8 +36,11 @@ public interface NodeHeartbeatResponse {
|
|||
void setResponseId(int responseId);
|
||||
void setNodeAction(NodeAction action);
|
||||
|
||||
MasterKey getMasterKey();
|
||||
void setMasterKey(MasterKey secretKey);
|
||||
MasterKey getContainerTokenMasterKey();
|
||||
void setContainerTokenMasterKey(MasterKey secretKey);
|
||||
|
||||
MasterKey getNMTokenMasterKey();
|
||||
void setNMTokenMasterKey(MasterKey secretKey);
|
||||
|
||||
void addAllContainersToCleanup(List<ContainerId> containers);
|
||||
|
||||
|
|
|
@ -22,9 +22,13 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
|||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||
|
||||
public interface RegisterNodeManagerResponse {
|
||||
MasterKey getMasterKey();
|
||||
MasterKey getContainerTokenMasterKey();
|
||||
|
||||
void setMasterKey(MasterKey secretKey);
|
||||
void setContainerTokenMasterKey(MasterKey secretKey);
|
||||
|
||||
MasterKey getNMTokenMasterKey();
|
||||
|
||||
void setNMTokenMasterKey(MasterKey secretKey);
|
||||
|
||||
NodeAction getNodeAction();
|
||||
|
||||
|
|
|
@ -36,7 +36,8 @@ public class NodeHeartbeatRequestPBImpl extends
|
|||
boolean viaProto = false;
|
||||
|
||||
private NodeStatus nodeStatus = null;
|
||||
private MasterKey lastKnownMasterKey = null;
|
||||
private MasterKey lastKnownContainerTokenMasterKey = null;
|
||||
private MasterKey lastKnownNMTokenMasterKey = null;
|
||||
|
||||
public NodeHeartbeatRequestPBImpl() {
|
||||
builder = NodeHeartbeatRequestProto.newBuilder();
|
||||
|
@ -58,9 +59,13 @@ public class NodeHeartbeatRequestPBImpl extends
|
|||
if (this.nodeStatus != null) {
|
||||
builder.setNodeStatus(convertToProtoFormat(this.nodeStatus));
|
||||
}
|
||||
if (this.lastKnownMasterKey != null) {
|
||||
builder
|
||||
.setLastKnownMasterKey(convertToProtoFormat(this.lastKnownMasterKey));
|
||||
if (this.lastKnownContainerTokenMasterKey != null) {
|
||||
builder.setLastKnownContainerTokenMasterKey(
|
||||
convertToProtoFormat(this.lastKnownContainerTokenMasterKey));
|
||||
}
|
||||
if (this.lastKnownNMTokenMasterKey != null) {
|
||||
builder.setLastKnownNmTokenMasterKey(
|
||||
convertToProtoFormat(this.lastKnownNMTokenMasterKey));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -102,24 +107,47 @@ public class NodeHeartbeatRequestPBImpl extends
|
|||
}
|
||||
|
||||
@Override
|
||||
public MasterKey getLastKnownMasterKey() {
|
||||
public MasterKey getLastKnownContainerTokenMasterKey() {
|
||||
NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.lastKnownMasterKey != null) {
|
||||
return this.lastKnownMasterKey;
|
||||
if (this.lastKnownContainerTokenMasterKey != null) {
|
||||
return this.lastKnownContainerTokenMasterKey;
|
||||
}
|
||||
if (!p.hasLastKnownMasterKey()) {
|
||||
if (!p.hasLastKnownContainerTokenMasterKey()) {
|
||||
return null;
|
||||
}
|
||||
this.lastKnownMasterKey = convertFromProtoFormat(p.getLastKnownMasterKey());
|
||||
return this.lastKnownMasterKey;
|
||||
this.lastKnownContainerTokenMasterKey =
|
||||
convertFromProtoFormat(p.getLastKnownContainerTokenMasterKey());
|
||||
return this.lastKnownContainerTokenMasterKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setLastKnownMasterKey(MasterKey masterKey) {
|
||||
public void setLastKnownContainerTokenMasterKey(MasterKey masterKey) {
|
||||
maybeInitBuilder();
|
||||
if (masterKey == null)
|
||||
builder.clearLastKnownMasterKey();
|
||||
this.lastKnownMasterKey = masterKey;
|
||||
builder.clearLastKnownContainerTokenMasterKey();
|
||||
this.lastKnownContainerTokenMasterKey = masterKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MasterKey getLastKnownNMTokenMasterKey() {
|
||||
NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.lastKnownNMTokenMasterKey != null) {
|
||||
return this.lastKnownNMTokenMasterKey;
|
||||
}
|
||||
if (!p.hasLastKnownNmTokenMasterKey()) {
|
||||
return null;
|
||||
}
|
||||
this.lastKnownNMTokenMasterKey =
|
||||
convertFromProtoFormat(p.getLastKnownNmTokenMasterKey());
|
||||
return this.lastKnownNMTokenMasterKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setLastKnownNMTokenMasterKey(MasterKey masterKey) {
|
||||
maybeInitBuilder();
|
||||
if (masterKey == null)
|
||||
builder.clearLastKnownNmTokenMasterKey();
|
||||
this.lastKnownNMTokenMasterKey = masterKey;
|
||||
}
|
||||
|
||||
private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto p) {
|
||||
|
|
|
@ -47,7 +47,8 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
|
|||
|
||||
private List<ContainerId> containersToCleanup = null;
|
||||
private List<ApplicationId> applicationsToCleanup = null;
|
||||
private MasterKey masterKey = null;
|
||||
private MasterKey containerTokenMasterKey = null;
|
||||
private MasterKey nmTokenMasterKey = null;
|
||||
|
||||
public NodeHeartbeatResponsePBImpl() {
|
||||
builder = NodeHeartbeatResponseProto.newBuilder();
|
||||
|
@ -72,8 +73,13 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
|
|||
if (this.applicationsToCleanup != null) {
|
||||
addApplicationsToCleanupToProto();
|
||||
}
|
||||
if (this.masterKey != null) {
|
||||
builder.setMasterKey(convertToProtoFormat(this.masterKey));
|
||||
if (this.containerTokenMasterKey != null) {
|
||||
builder.setContainerTokenMasterKey(
|
||||
convertToProtoFormat(this.containerTokenMasterKey));
|
||||
}
|
||||
if (this.nmTokenMasterKey != null) {
|
||||
builder.setNmTokenMasterKey(
|
||||
convertToProtoFormat(this.nmTokenMasterKey));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -106,24 +112,47 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
|
|||
}
|
||||
|
||||
@Override
|
||||
public MasterKey getMasterKey() {
|
||||
public MasterKey getContainerTokenMasterKey() {
|
||||
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.masterKey != null) {
|
||||
return this.masterKey;
|
||||
if (this.containerTokenMasterKey != null) {
|
||||
return this.containerTokenMasterKey;
|
||||
}
|
||||
if (!p.hasMasterKey()) {
|
||||
if (!p.hasContainerTokenMasterKey()) {
|
||||
return null;
|
||||
}
|
||||
this.masterKey = convertFromProtoFormat(p.getMasterKey());
|
||||
return this.masterKey;
|
||||
this.containerTokenMasterKey =
|
||||
convertFromProtoFormat(p.getContainerTokenMasterKey());
|
||||
return this.containerTokenMasterKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMasterKey(MasterKey masterKey) {
|
||||
public void setContainerTokenMasterKey(MasterKey masterKey) {
|
||||
maybeInitBuilder();
|
||||
if (masterKey == null)
|
||||
builder.clearMasterKey();
|
||||
this.masterKey = masterKey;
|
||||
builder.clearContainerTokenMasterKey();
|
||||
this.containerTokenMasterKey = masterKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MasterKey getNMTokenMasterKey() {
|
||||
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.nmTokenMasterKey != null) {
|
||||
return this.nmTokenMasterKey;
|
||||
}
|
||||
if (!p.hasNmTokenMasterKey()) {
|
||||
return null;
|
||||
}
|
||||
this.nmTokenMasterKey =
|
||||
convertFromProtoFormat(p.getNmTokenMasterKey());
|
||||
return this.nmTokenMasterKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setNMTokenMasterKey(MasterKey masterKey) {
|
||||
maybeInitBuilder();
|
||||
if (masterKey == null)
|
||||
builder.clearNmTokenMasterKey();
|
||||
this.nmTokenMasterKey = masterKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -36,7 +36,8 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan
|
|||
RegisterNodeManagerResponseProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
||||
private MasterKey masterKey = null;
|
||||
private MasterKey containerTokenMasterKey = null;
|
||||
private MasterKey nmTokenMasterKey = null;
|
||||
|
||||
private boolean rebuild = false;
|
||||
|
||||
|
@ -58,8 +59,13 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan
|
|||
}
|
||||
|
||||
private void mergeLocalToBuilder() {
|
||||
if (this.masterKey != null) {
|
||||
builder.setMasterKey(convertToProtoFormat(this.masterKey));
|
||||
if (this.containerTokenMasterKey != null) {
|
||||
builder.setContainerTokenMasterKey(
|
||||
convertToProtoFormat(this.containerTokenMasterKey));
|
||||
}
|
||||
if (this.nmTokenMasterKey != null) {
|
||||
builder.setNmTokenMasterKey(
|
||||
convertToProtoFormat(this.nmTokenMasterKey));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -80,24 +86,48 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan
|
|||
}
|
||||
|
||||
@Override
|
||||
public MasterKey getMasterKey() {
|
||||
public MasterKey getContainerTokenMasterKey() {
|
||||
RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.masterKey != null) {
|
||||
return this.masterKey;
|
||||
if (this.containerTokenMasterKey != null) {
|
||||
return this.containerTokenMasterKey;
|
||||
}
|
||||
if (!p.hasMasterKey()) {
|
||||
if (!p.hasContainerTokenMasterKey()) {
|
||||
return null;
|
||||
}
|
||||
this.masterKey = convertFromProtoFormat(p.getMasterKey());
|
||||
return this.masterKey;
|
||||
this.containerTokenMasterKey =
|
||||
convertFromProtoFormat(p.getContainerTokenMasterKey());
|
||||
return this.containerTokenMasterKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMasterKey(MasterKey masterKey) {
|
||||
public void setContainerTokenMasterKey(MasterKey masterKey) {
|
||||
maybeInitBuilder();
|
||||
if (masterKey == null)
|
||||
builder.clearMasterKey();
|
||||
this.masterKey = masterKey;
|
||||
builder.clearContainerTokenMasterKey();
|
||||
this.containerTokenMasterKey = masterKey;
|
||||
rebuild = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MasterKey getNMTokenMasterKey() {
|
||||
RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.nmTokenMasterKey != null) {
|
||||
return this.nmTokenMasterKey;
|
||||
}
|
||||
if (!p.hasNmTokenMasterKey()) {
|
||||
return null;
|
||||
}
|
||||
this.nmTokenMasterKey =
|
||||
convertFromProtoFormat(p.getNmTokenMasterKey());
|
||||
return this.nmTokenMasterKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setNMTokenMasterKey(MasterKey masterKey) {
|
||||
maybeInitBuilder();
|
||||
if (masterKey == null)
|
||||
builder.clearNmTokenMasterKey();
|
||||
this.nmTokenMasterKey = masterKey;
|
||||
rebuild = true;
|
||||
}
|
||||
|
||||
|
|
|
@ -60,37 +60,6 @@ public class BaseContainerTokenSecretManager extends
|
|||
*/
|
||||
protected MasterKeyData currentMasterKey;
|
||||
|
||||
protected final class MasterKeyData {
|
||||
|
||||
private final MasterKey masterKeyRecord;
|
||||
// Underlying secret-key also stored to avoid repetitive encoding and
|
||||
// decoding the masterKeyRecord bytes.
|
||||
private final SecretKey generatedSecretKey;
|
||||
|
||||
private MasterKeyData() {
|
||||
this.masterKeyRecord = Records.newRecord(MasterKey.class);
|
||||
this.masterKeyRecord.setKeyId(serialNo++);
|
||||
this.generatedSecretKey = generateSecret();
|
||||
this.masterKeyRecord.setBytes(ByteBuffer.wrap(generatedSecretKey
|
||||
.getEncoded()));
|
||||
}
|
||||
|
||||
public MasterKeyData(MasterKey masterKeyRecord) {
|
||||
this.masterKeyRecord = masterKeyRecord;
|
||||
this.generatedSecretKey =
|
||||
SecretManager.createSecretKey(this.masterKeyRecord.getBytes().array()
|
||||
.clone());
|
||||
}
|
||||
|
||||
public MasterKey getMasterKey() {
|
||||
return this.masterKeyRecord;
|
||||
}
|
||||
|
||||
private SecretKey getSecretKey() {
|
||||
return this.generatedSecretKey;
|
||||
}
|
||||
}
|
||||
|
||||
protected final long containerTokenExpiryInterval;
|
||||
|
||||
public BaseContainerTokenSecretManager(Configuration conf) {
|
||||
|
@ -103,7 +72,7 @@ public class BaseContainerTokenSecretManager extends
|
|||
protected MasterKeyData createNewMasterKey() {
|
||||
this.writeLock.lock();
|
||||
try {
|
||||
return new MasterKeyData();
|
||||
return new MasterKeyData(serialNo++, generateSecret());
|
||||
} finally {
|
||||
this.writeLock.unlock();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,152 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.security;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
|
||||
public class BaseNMTokenSecretManager extends
|
||||
SecretManager<NMTokenIdentifier> {
|
||||
|
||||
private static Log LOG = LogFactory
|
||||
.getLog(BaseNMTokenSecretManager.class);
|
||||
|
||||
private int serialNo = new SecureRandom().nextInt();
|
||||
|
||||
protected final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
|
||||
protected final Lock readLock = readWriteLock.readLock();
|
||||
protected final Lock writeLock = readWriteLock.writeLock();
|
||||
|
||||
protected MasterKeyData currentMasterKey;
|
||||
|
||||
protected MasterKeyData createNewMasterKey() {
|
||||
this.writeLock.lock();
|
||||
try {
|
||||
return new MasterKeyData(serialNo++, generateSecret());
|
||||
} finally {
|
||||
this.writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Private
|
||||
public MasterKey getCurrentKey() {
|
||||
this.readLock.lock();
|
||||
try {
|
||||
return this.currentMasterKey.getMasterKey();
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected byte[] createPassword(NMTokenIdentifier identifier) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("creating password for "
|
||||
+ identifier.getApplicationAttemptId() + " for user "
|
||||
+ identifier.getApplicationSubmitter() + " to run on NM "
|
||||
+ identifier.getNodeId());
|
||||
}
|
||||
readLock.lock();
|
||||
try {
|
||||
return createPassword(identifier.getBytes(),
|
||||
currentMasterKey.getSecretKey());
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] retrievePassword(NMTokenIdentifier identifier)
|
||||
throws org.apache.hadoop.security.token.SecretManager.InvalidToken {
|
||||
readLock.lock();
|
||||
try {
|
||||
return retrivePasswordInternal(identifier, currentMasterKey);
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
protected byte[] retrivePasswordInternal(NMTokenIdentifier identifier,
|
||||
MasterKeyData masterKey) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("creating password for "
|
||||
+ identifier.getApplicationAttemptId() + " for user "
|
||||
+ identifier.getApplicationSubmitter() + " to run on NM "
|
||||
+ identifier.getNodeId());
|
||||
}
|
||||
return createPassword(identifier.getBytes(), masterKey.getSecretKey());
|
||||
}
|
||||
/**
|
||||
* It is required for RPC
|
||||
*/
|
||||
@Override
|
||||
public NMTokenIdentifier createIdentifier() {
|
||||
return new NMTokenIdentifier();
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper function for creating NMTokens.
|
||||
*/
|
||||
public Token createNMToken(ApplicationAttemptId applicationAttemptId,
|
||||
NodeId nodeId, String applicationSubmitter) {
|
||||
byte[] password;
|
||||
NMTokenIdentifier identifier;
|
||||
|
||||
this.readLock.lock();
|
||||
try {
|
||||
identifier =
|
||||
new NMTokenIdentifier(applicationAttemptId, nodeId,
|
||||
applicationSubmitter, this.currentMasterKey.getMasterKey()
|
||||
.getKeyId());
|
||||
password = this.createPassword(identifier);
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
return newNMToken(password, identifier);
|
||||
}
|
||||
|
||||
public static Token newNMToken(byte[] password,
|
||||
NMTokenIdentifier identifier) {
|
||||
NodeId nodeId = identifier.getNodeId();
|
||||
// RPC layer client expects ip:port as service for tokens
|
||||
InetSocketAddress addr =
|
||||
NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort());
|
||||
Token nmToken =
|
||||
Token.newInstance(identifier.getBytes(),
|
||||
NMTokenIdentifier.KIND.toString(), password, SecurityUtil
|
||||
.buildTokenService(addr).toString());
|
||||
return nmToken;
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,58 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.security;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import javax.crypto.SecretKey;
|
||||
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
|
||||
public class MasterKeyData {
|
||||
|
||||
private final MasterKey masterKeyRecord;
|
||||
// Underlying secret-key also stored to avoid repetitive encoding and
|
||||
// decoding the masterKeyRecord bytes.
|
||||
private final SecretKey generatedSecretKey;
|
||||
|
||||
public MasterKeyData(int serialNo, SecretKey secretKey) {
|
||||
this.masterKeyRecord = Records.newRecord(MasterKey.class);
|
||||
this.masterKeyRecord.setKeyId(serialNo);
|
||||
this.generatedSecretKey = secretKey;
|
||||
this.masterKeyRecord.setBytes(ByteBuffer.wrap(generatedSecretKey
|
||||
.getEncoded()));
|
||||
}
|
||||
|
||||
public MasterKeyData(MasterKey masterKeyRecord, SecretKey secretKey) {
|
||||
this.masterKeyRecord = masterKeyRecord;
|
||||
this.generatedSecretKey = secretKey;
|
||||
|
||||
}
|
||||
|
||||
public MasterKey getMasterKey() {
|
||||
return this.masterKeyRecord;
|
||||
}
|
||||
|
||||
public SecretKey getSecretKey() {
|
||||
return this.generatedSecretKey;
|
||||
}
|
||||
}
|
||||
|
|
@ -48,7 +48,6 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
|
|
|
@ -42,12 +42,14 @@ public class YarnServerBuilderUtils {
|
|||
public static NodeHeartbeatResponse newNodeHeartbeatResponse(int responseId,
|
||||
NodeAction action, List<ContainerId> containersToCleanUp,
|
||||
List<ApplicationId> applicationsToCleanUp,
|
||||
MasterKey masterKey, long nextHeartbeatInterval) {
|
||||
MasterKey containerTokenMasterKey, MasterKey nmTokenMasterKey,
|
||||
long nextHeartbeatInterval) {
|
||||
NodeHeartbeatResponse response = recordFactory
|
||||
.newRecordInstance(NodeHeartbeatResponse.class);
|
||||
response.setResponseId(responseId);
|
||||
response.setNodeAction(action);
|
||||
response.setMasterKey(masterKey);
|
||||
response.setContainerTokenMasterKey(containerTokenMasterKey);
|
||||
response.setNMTokenMasterKey(nmTokenMasterKey);
|
||||
response.setNextHeartBeatInterval(nextHeartbeatInterval);
|
||||
if(containersToCleanUp != null) {
|
||||
response.addAllContainersToCleanup(containersToCleanUp);
|
||||
|
|
|
@ -31,24 +31,27 @@ message RegisterNodeManagerRequestProto {
|
|||
}
|
||||
|
||||
message RegisterNodeManagerResponseProto {
|
||||
optional MasterKeyProto master_key = 1;
|
||||
optional NodeActionProto nodeAction = 2;
|
||||
optional int64 rm_identifier = 3;
|
||||
optional string diagnostics_message = 4;
|
||||
optional MasterKeyProto container_token_master_key = 1;
|
||||
optional MasterKeyProto nm_token_master_key = 2;
|
||||
optional NodeActionProto nodeAction = 3;
|
||||
optional int64 rm_identifier = 4;
|
||||
optional string diagnostics_message = 5;
|
||||
}
|
||||
|
||||
message NodeHeartbeatRequestProto {
|
||||
optional NodeStatusProto node_status = 1;
|
||||
optional MasterKeyProto last_known_master_key = 2;
|
||||
optional MasterKeyProto last_known_container_token_master_key = 2;
|
||||
optional MasterKeyProto last_known_nm_token_master_key = 3;
|
||||
}
|
||||
|
||||
|
||||
message NodeHeartbeatResponseProto {
|
||||
optional int32 response_id = 1;
|
||||
optional MasterKeyProto master_key = 2;
|
||||
optional NodeActionProto nodeAction = 3;
|
||||
repeated ContainerIdProto containers_to_cleanup = 4;
|
||||
repeated ApplicationIdProto applications_to_cleanup = 5;
|
||||
optional int64 nextHeartBeatInterval = 6;
|
||||
optional string diagnostics_message = 7;
|
||||
optional MasterKeyProto container_token_master_key = 2;
|
||||
optional MasterKeyProto nm_token_master_key = 3;
|
||||
optional NodeActionProto nodeAction = 4;
|
||||
repeated ContainerIdProto containers_to_cleanup = 5;
|
||||
repeated ApplicationIdProto applications_to_cleanup = 6;
|
||||
optional int64 nextHeartBeatInterval = 7;
|
||||
optional string diagnostics_message = 8;
|
||||
}
|
||||
|
|
|
@ -63,14 +63,16 @@ public class TestYarnServerApiClasses {
|
|||
public void testRegisterNodeManagerResponsePBImpl() {
|
||||
RegisterNodeManagerResponsePBImpl original =
|
||||
new RegisterNodeManagerResponsePBImpl();
|
||||
original.setMasterKey(getMasterKey());
|
||||
original.setContainerTokenMasterKey(getMasterKey());
|
||||
original.setNMTokenMasterKey(getMasterKey());
|
||||
original.setNodeAction(NodeAction.NORMAL);
|
||||
original.setDiagnosticsMessage("testDiagnosticMessage");
|
||||
|
||||
RegisterNodeManagerResponsePBImpl copy =
|
||||
new RegisterNodeManagerResponsePBImpl(
|
||||
original.getProto());
|
||||
assertEquals(1, copy.getMasterKey().getKeyId());
|
||||
assertEquals(1, copy.getContainerTokenMasterKey().getKeyId());
|
||||
assertEquals(1, copy.getNMTokenMasterKey().getKeyId());
|
||||
assertEquals(NodeAction.NORMAL, copy.getNodeAction());
|
||||
assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());
|
||||
|
||||
|
@ -82,11 +84,13 @@ public class TestYarnServerApiClasses {
|
|||
@Test
|
||||
public void testNodeHeartbeatRequestPBImpl() {
|
||||
NodeHeartbeatRequestPBImpl original = new NodeHeartbeatRequestPBImpl();
|
||||
original.setLastKnownMasterKey(getMasterKey());
|
||||
original.setLastKnownContainerTokenMasterKey(getMasterKey());
|
||||
original.setLastKnownNMTokenMasterKey(getMasterKey());
|
||||
original.setNodeStatus(getNodeStatus());
|
||||
NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl(
|
||||
original.getProto());
|
||||
assertEquals(1, copy.getLastKnownMasterKey().getKeyId());
|
||||
assertEquals(1, copy.getLastKnownContainerTokenMasterKey().getKeyId());
|
||||
assertEquals(1, copy.getLastKnownNMTokenMasterKey().getKeyId());
|
||||
assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost());
|
||||
}
|
||||
|
||||
|
@ -99,7 +103,8 @@ public class TestYarnServerApiClasses {
|
|||
NodeHeartbeatResponsePBImpl original = new NodeHeartbeatResponsePBImpl();
|
||||
|
||||
original.setDiagnosticsMessage("testDiagnosticMessage");
|
||||
original.setMasterKey(getMasterKey());
|
||||
original.setContainerTokenMasterKey(getMasterKey());
|
||||
original.setNMTokenMasterKey(getMasterKey());
|
||||
original.setNextHeartBeatInterval(1000);
|
||||
original.setNodeAction(NodeAction.NORMAL);
|
||||
original.setResponseId(100);
|
||||
|
@ -109,7 +114,8 @@ public class TestYarnServerApiClasses {
|
|||
assertEquals(100, copy.getResponseId());
|
||||
assertEquals(NodeAction.NORMAL, copy.getNodeAction());
|
||||
assertEquals(1000, copy.getNextHeartBeatInterval());
|
||||
assertEquals(1, copy.getMasterKey().getKeyId());
|
||||
assertEquals(1, copy.getContainerTokenMasterKey().getKeyId());
|
||||
assertEquals(1, copy.getNMTokenMasterKey().getKeyId());
|
||||
assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());
|
||||
}
|
||||
|
||||
|
|
|
@ -41,24 +41,50 @@ public class TestRegisterNodeManagerResponse {
|
|||
public void testRoundTrip() throws Exception {
|
||||
RegisterNodeManagerResponse resp = recordFactory
|
||||
.newRecordInstance(RegisterNodeManagerResponse.class);
|
||||
MasterKey mk = recordFactory.newRecordInstance(MasterKey.class);
|
||||
mk.setKeyId(54321);
|
||||
|
||||
byte b [] = {0,1,2,3,4,5};
|
||||
mk.setBytes(ByteBuffer.wrap(b));
|
||||
resp.setMasterKey(mk);
|
||||
resp.setNodeAction(NodeAction.NORMAL);
|
||||
|
||||
MasterKey containerTokenMK =
|
||||
recordFactory.newRecordInstance(MasterKey.class);
|
||||
containerTokenMK.setKeyId(54321);
|
||||
containerTokenMK.setBytes(ByteBuffer.wrap(b));
|
||||
resp.setContainerTokenMasterKey(containerTokenMK);
|
||||
|
||||
MasterKey nmTokenMK =
|
||||
recordFactory.newRecordInstance(MasterKey.class);
|
||||
nmTokenMK.setKeyId(12345);
|
||||
nmTokenMK.setBytes(ByteBuffer.wrap(b));
|
||||
resp.setNMTokenMasterKey(nmTokenMK);
|
||||
|
||||
resp.setNodeAction(NodeAction.NORMAL);
|
||||
|
||||
assertEquals(NodeAction.NORMAL, resp.getNodeAction());
|
||||
assertNotNull(resp.getMasterKey());
|
||||
assertEquals(54321, resp.getMasterKey().getKeyId());
|
||||
assertArrayEquals(b, resp.getMasterKey().getBytes().array());
|
||||
|
||||
// Verifying containerTokenMasterKey
|
||||
assertNotNull(resp.getContainerTokenMasterKey());
|
||||
assertEquals(54321, resp.getContainerTokenMasterKey().getKeyId());
|
||||
assertArrayEquals(b, resp.getContainerTokenMasterKey().getBytes().array());
|
||||
|
||||
RegisterNodeManagerResponse respCopy = serDe(resp);
|
||||
|
||||
assertEquals(NodeAction.NORMAL, respCopy.getNodeAction());
|
||||
assertNotNull(respCopy.getMasterKey());
|
||||
assertEquals(54321, respCopy.getMasterKey().getKeyId());
|
||||
assertArrayEquals(b, respCopy.getMasterKey().getBytes().array());
|
||||
assertNotNull(respCopy.getContainerTokenMasterKey());
|
||||
assertEquals(54321, respCopy.getContainerTokenMasterKey().getKeyId());
|
||||
assertArrayEquals(b, respCopy.getContainerTokenMasterKey().getBytes()
|
||||
.array());
|
||||
|
||||
// Verifying nmTokenMasterKey
|
||||
assertNotNull(resp.getNMTokenMasterKey());
|
||||
assertEquals(12345, resp.getNMTokenMasterKey().getKeyId());
|
||||
assertArrayEquals(b, resp.getNMTokenMasterKey().getBytes().array());
|
||||
|
||||
respCopy = serDe(resp);
|
||||
|
||||
assertEquals(NodeAction.NORMAL, respCopy.getNodeAction());
|
||||
assertNotNull(respCopy.getNMTokenMasterKey());
|
||||
assertEquals(12345, respCopy.getNMTokenMasterKey().getKeyId());
|
||||
assertArrayEquals(b, respCopy.getNMTokenMasterKey().getBytes().array());
|
||||
|
||||
}
|
||||
|
||||
public static RegisterNodeManagerResponse serDe(RegisterNodeManagerResponse orig) throws Exception {
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
|
||||
|
||||
/**
|
||||
* Context interface for sharing information across components in the
|
||||
|
@ -54,6 +55,8 @@ public interface Context {
|
|||
ConcurrentMap<ContainerId, Container> getContainers();
|
||||
|
||||
NMContainerTokenSecretManager getContainerTokenSecretManager();
|
||||
|
||||
NMTokenSecretManagerInNM getNMTokenSecretManager();
|
||||
|
||||
NodeHealthStatus getNodeHealthStatus();
|
||||
|
||||
|
|
|
@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.service.CompositeService;
|
||||
|
@ -118,8 +119,10 @@ public class NodeManager extends CompositeService
|
|||
return new DeletionService(exec);
|
||||
}
|
||||
|
||||
protected NMContext createNMContext(NMContainerTokenSecretManager containerTokenSecretManager) {
|
||||
return new NMContext(containerTokenSecretManager);
|
||||
protected NMContext createNMContext(
|
||||
NMContainerTokenSecretManager containerTokenSecretManager,
|
||||
NMTokenSecretManagerInNM nmTokenSecretManager) {
|
||||
return new NMContext(containerTokenSecretManager, nmTokenSecretManager);
|
||||
}
|
||||
|
||||
protected void doSecureLogin() throws IOException {
|
||||
|
@ -135,7 +138,11 @@ public class NodeManager extends CompositeService
|
|||
NMContainerTokenSecretManager containerTokenSecretManager =
|
||||
new NMContainerTokenSecretManager(conf);
|
||||
|
||||
this.context = createNMContext(containerTokenSecretManager);
|
||||
NMTokenSecretManagerInNM nmTokenSecretManager =
|
||||
new NMTokenSecretManagerInNM();
|
||||
|
||||
this.context =
|
||||
createNMContext(containerTokenSecretManager, nmTokenSecretManager);
|
||||
|
||||
this.aclsManager = new ApplicationACLsManager(conf);
|
||||
|
||||
|
@ -299,13 +306,16 @@ public class NodeManager extends CompositeService
|
|||
new ConcurrentSkipListMap<ContainerId, Container>();
|
||||
|
||||
private final NMContainerTokenSecretManager containerTokenSecretManager;
|
||||
private final NMTokenSecretManagerInNM nmTokenSecretManager;
|
||||
private ContainerManager containerManager;
|
||||
private WebServer webServer;
|
||||
private final NodeHealthStatus nodeHealthStatus = RecordFactoryProvider
|
||||
.getRecordFactory(null).newRecordInstance(NodeHealthStatus.class);
|
||||
|
||||
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager) {
|
||||
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
|
||||
NMTokenSecretManagerInNM nmTokenSecretManager) {
|
||||
this.containerTokenSecretManager = containerTokenSecretManager;
|
||||
this.nmTokenSecretManager = nmTokenSecretManager;
|
||||
this.nodeHealthStatus.setIsNodeHealthy(true);
|
||||
this.nodeHealthStatus.setHealthReport("Healthy");
|
||||
this.nodeHealthStatus.setLastHealthReportTime(System.currentTimeMillis());
|
||||
|
@ -338,6 +348,12 @@ public class NodeManager extends CompositeService
|
|||
public NMContainerTokenSecretManager getContainerTokenSecretManager() {
|
||||
return this.containerTokenSecretManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NMTokenSecretManagerInNM getNMTokenSecretManager() {
|
||||
return this.nmTokenSecretManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NodeHealthStatus getNodeHealthStatus() {
|
||||
return this.nodeHealthStatus;
|
||||
|
|
|
@ -297,7 +297,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
+ message);
|
||||
}
|
||||
|
||||
MasterKey masterKey = regNMResponse.getMasterKey();
|
||||
MasterKey masterKey = regNMResponse.getContainerTokenMasterKey();
|
||||
// do this now so that its set before we start heartbeating to RM
|
||||
// It is expected that status updater is started by this point and
|
||||
// RM gives the shared secret in registration during
|
||||
|
@ -305,6 +305,11 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
if (masterKey != null) {
|
||||
this.context.getContainerTokenSecretManager().setMasterKey(masterKey);
|
||||
}
|
||||
|
||||
masterKey = regNMResponse.getNMTokenMasterKey();
|
||||
if (masterKey != null) {
|
||||
this.context.getNMTokenSecretManager().setMasterKey(masterKey);
|
||||
}
|
||||
|
||||
LOG.info("Registered with ResourceManager as " + this.nodeId
|
||||
+ " with total resource of " + this.totalResource);
|
||||
|
@ -434,8 +439,12 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
NodeHeartbeatRequest request = recordFactory
|
||||
.newRecordInstance(NodeHeartbeatRequest.class);
|
||||
request.setNodeStatus(nodeStatus);
|
||||
request.setLastKnownMasterKey(NodeStatusUpdaterImpl.this.context
|
||||
.getContainerTokenSecretManager().getCurrentKey());
|
||||
request
|
||||
.setLastKnownContainerTokenMasterKey(NodeStatusUpdaterImpl.this.context
|
||||
.getContainerTokenSecretManager().getCurrentKey());
|
||||
request
|
||||
.setLastKnownNMTokenMasterKey(NodeStatusUpdaterImpl.this.context
|
||||
.getNMTokenSecretManager().getCurrentKey());
|
||||
while (!isStopped) {
|
||||
try {
|
||||
rmRetryCount++;
|
||||
|
@ -463,13 +472,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
}
|
||||
//get next heartbeat interval from response
|
||||
nextHeartBeatInterval = response.getNextHeartBeatInterval();
|
||||
// See if the master-key has rolled over
|
||||
MasterKey updatedMasterKey = response.getMasterKey();
|
||||
if (updatedMasterKey != null) {
|
||||
// Will be non-null only on roll-over on RM side
|
||||
context.getContainerTokenSecretManager().setMasterKey(
|
||||
updatedMasterKey);
|
||||
}
|
||||
updateMasterKeys(response);
|
||||
|
||||
if (response.getNodeAction() == NodeAction.SHUTDOWN) {
|
||||
LOG
|
||||
|
@ -533,6 +536,20 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void updateMasterKeys(NodeHeartbeatResponse response) {
|
||||
// See if the master-key has rolled over
|
||||
MasterKey updatedMasterKey = response.getContainerTokenMasterKey();
|
||||
if (updatedMasterKey != null) {
|
||||
// Will be non-null only on roll-over on RM side
|
||||
context.getContainerTokenSecretManager().setMasterKey(updatedMasterKey);
|
||||
}
|
||||
|
||||
updatedMasterKey = response.getNMTokenMasterKey();
|
||||
if (updatedMasterKey != null) {
|
||||
context.getNMTokenSecretManager().setMasterKey(updatedMasterKey);
|
||||
}
|
||||
}
|
||||
};
|
||||
statusUpdater =
|
||||
new Thread(statusUpdaterRunnable, "Node Status Updater");
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.security.MasterKeyData;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
|
@ -69,13 +70,17 @@ public class NMContainerTokenSecretManager extends
|
|||
LOG.info("Rolling master-key for container-tokens, got key with id "
|
||||
+ masterKeyRecord.getKeyId());
|
||||
if (super.currentMasterKey == null) {
|
||||
super.currentMasterKey = new MasterKeyData(masterKeyRecord);
|
||||
super.currentMasterKey =
|
||||
new MasterKeyData(masterKeyRecord, createSecretKey(masterKeyRecord
|
||||
.getBytes().array()));
|
||||
} else {
|
||||
if (super.currentMasterKey.getMasterKey().getKeyId() != masterKeyRecord
|
||||
.getKeyId()) {
|
||||
// Update keys only if the key has changed.
|
||||
this.previousMasterKey = super.currentMasterKey;
|
||||
super.currentMasterKey = new MasterKeyData(masterKeyRecord);
|
||||
super.currentMasterKey =
|
||||
new MasterKeyData(masterKeyRecord, createSecretKey(masterKeyRecord
|
||||
.getBytes().array()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,114 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.security;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.security.MasterKeyData;
|
||||
|
||||
public class NMTokenSecretManagerInNM extends BaseNMTokenSecretManager {
|
||||
|
||||
private static final Log LOG = LogFactory
|
||||
.getLog(NMTokenSecretManagerInNM.class);
|
||||
|
||||
private MasterKeyData previousMasterKey;
|
||||
|
||||
private final Map<ApplicationAttemptId, MasterKeyData> oldMasterKeys;
|
||||
|
||||
public NMTokenSecretManagerInNM() {
|
||||
this.oldMasterKeys =
|
||||
new HashMap<ApplicationAttemptId, MasterKeyData>();
|
||||
}
|
||||
|
||||
/**
|
||||
* Used by NodeManagers to create a token-secret-manager with the key
|
||||
* obtained from the RM. This can happen during registration or when the RM
|
||||
* rolls the master-key and signal the NM.
|
||||
*/
|
||||
@Private
|
||||
public synchronized void setMasterKey(MasterKey masterKey) {
|
||||
LOG.info("Rolling master-key for nm-tokens, got key with id :"
|
||||
+ masterKey.getKeyId());
|
||||
if (super.currentMasterKey == null) {
|
||||
super.currentMasterKey =
|
||||
new MasterKeyData(masterKey, createSecretKey(masterKey.getBytes()
|
||||
.array()));
|
||||
} else {
|
||||
if (super.currentMasterKey.getMasterKey().getKeyId() != masterKey
|
||||
.getKeyId()) {
|
||||
this.previousMasterKey = super.currentMasterKey;
|
||||
super.currentMasterKey =
|
||||
new MasterKeyData(masterKey, createSecretKey(masterKey.getBytes()
|
||||
.array()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method will be used to verify NMTokens generated by different
|
||||
* master keys.
|
||||
*/
|
||||
@Override
|
||||
public synchronized byte[] retrievePassword(
|
||||
NMTokenIdentifier identifier) throws InvalidToken {
|
||||
int keyId = identifier.getMastKeyId();
|
||||
ApplicationAttemptId appAttemptId = identifier.getApplicationAttemptId();
|
||||
|
||||
/*
|
||||
* MasterKey used for retrieving password will be as follows.
|
||||
* 1) By default older saved master key will be used.
|
||||
* 2) If identifier's master key id matches that of previous master key
|
||||
* id then previous key will be used.
|
||||
* 3) If identifier's master key id matches that of current master key
|
||||
* id then current key will be used.
|
||||
*/
|
||||
MasterKeyData oldMasterKey = oldMasterKeys.get(appAttemptId);
|
||||
MasterKeyData masterKeyToUse = oldMasterKey;
|
||||
if (previousMasterKey != null
|
||||
&& keyId == previousMasterKey.getMasterKey().getKeyId()) {
|
||||
masterKeyToUse = previousMasterKey;
|
||||
} else if ( keyId == currentMasterKey.getMasterKey().getKeyId()) {
|
||||
masterKeyToUse = currentMasterKey;
|
||||
}
|
||||
|
||||
if (masterKeyToUse != null) {
|
||||
byte[] password = retrivePasswordInternal(identifier, masterKeyToUse);
|
||||
if (masterKeyToUse.getMasterKey().getKeyId() != oldMasterKey
|
||||
.getMasterKey().getKeyId()) {
|
||||
oldMasterKeys.put(appAttemptId, masterKeyToUse);
|
||||
}
|
||||
return password;
|
||||
}
|
||||
|
||||
throw new InvalidToken("Given NMToken for application : "
|
||||
+ appAttemptId.toString() + " seems to have been generated illegally.");
|
||||
}
|
||||
|
||||
public synchronized void appFinished(ApplicationAttemptId appAttemptId) {
|
||||
this.oldMasterKeys.remove(appAttemptId);
|
||||
}
|
||||
}
|
|
@ -46,7 +46,8 @@ public class LocalRMInterface implements ResourceTracker {
|
|||
masterKey.setKeyId(123);
|
||||
masterKey.setBytes(ByteBuffer.wrap(new byte[] { new Integer(123)
|
||||
.byteValue() }));
|
||||
response.setMasterKey(masterKey);
|
||||
response.setContainerTokenMasterKey(masterKey);
|
||||
response.setNMTokenMasterKey(masterKey);
|
||||
return response;
|
||||
}
|
||||
|
||||
|
|
|
@ -75,7 +75,8 @@ public class MockNodeStatusUpdater extends NodeStatusUpdaterImpl {
|
|||
masterKey.setKeyId(123);
|
||||
masterKey.setBytes(ByteBuffer.wrap(new byte[] { new Integer(123)
|
||||
.byteValue() }));
|
||||
response.setMasterKey(masterKey);
|
||||
response.setContainerTokenMasterKey(masterKey);
|
||||
response.setNMTokenMasterKey(masterKey);
|
||||
return response;
|
||||
}
|
||||
|
||||
|
@ -88,7 +89,7 @@ public class MockNodeStatusUpdater extends NodeStatusUpdaterImpl {
|
|||
|
||||
NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils
|
||||
.newNodeHeartbeatResponse(heartBeatID, null, null,
|
||||
null, null, 1000L);
|
||||
null, null, null, 1000L);
|
||||
return nhResponse;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.junit.Test;
|
||||
|
@ -76,7 +77,8 @@ public class TestEventFlow {
|
|||
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
|
||||
Context context = new NMContext(new NMContainerTokenSecretManager(conf)) {
|
||||
Context context = new NMContext(new NMContainerTokenSecretManager(conf),
|
||||
new NMTokenSecretManagerInNM()) {
|
||||
@Override
|
||||
public int getHttpPort() {
|
||||
return 1234;
|
||||
|
|
|
@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
|
||||
|
@ -147,7 +148,8 @@ public class TestNodeStatusUpdater {
|
|||
|
||||
RegisterNodeManagerResponse response = recordFactory
|
||||
.newRecordInstance(RegisterNodeManagerResponse.class);
|
||||
response.setMasterKey(createMasterKey());
|
||||
response.setContainerTokenMasterKey(createMasterKey());
|
||||
response.setNMTokenMasterKey(createMasterKey());
|
||||
return response;
|
||||
}
|
||||
|
||||
|
@ -251,7 +253,8 @@ public class TestNodeStatusUpdater {
|
|||
}
|
||||
|
||||
NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
|
||||
newNodeHeartbeatResponse(heartBeatID, null, null, null, null, 1000L);
|
||||
newNodeHeartbeatResponse(heartBeatID, null, null, null, null, null,
|
||||
1000L);
|
||||
return nhResponse;
|
||||
}
|
||||
}
|
||||
|
@ -447,7 +450,8 @@ public class TestNodeStatusUpdater {
|
|||
RegisterNodeManagerResponse response = recordFactory
|
||||
.newRecordInstance(RegisterNodeManagerResponse.class);
|
||||
response.setNodeAction(registerNodeAction );
|
||||
response.setMasterKey(createMasterKey());
|
||||
response.setContainerTokenMasterKey(createMasterKey());
|
||||
response.setNMTokenMasterKey(createMasterKey());
|
||||
response.setDiagnosticsMessage(shutDownMessage);
|
||||
return response;
|
||||
}
|
||||
|
@ -459,7 +463,7 @@ public class TestNodeStatusUpdater {
|
|||
|
||||
NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
|
||||
newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null,
|
||||
null, null, 1000L);
|
||||
null, null, null, 1000L);
|
||||
nhResponse.setDiagnosticsMessage(shutDownMessage);
|
||||
return nhResponse;
|
||||
}
|
||||
|
@ -485,7 +489,8 @@ public class TestNodeStatusUpdater {
|
|||
RegisterNodeManagerResponse response =
|
||||
recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
|
||||
response.setNodeAction(registerNodeAction);
|
||||
response.setMasterKey(createMasterKey());
|
||||
response.setContainerTokenMasterKey(createMasterKey());
|
||||
response.setNMTokenMasterKey(createMasterKey());
|
||||
return response;
|
||||
}
|
||||
|
||||
|
@ -497,7 +502,7 @@ public class TestNodeStatusUpdater {
|
|||
nodeStatus.setResponseId(heartBeatID++);
|
||||
NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
|
||||
newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null,
|
||||
null, null, 1000L);
|
||||
null, null, null, 1000L);
|
||||
|
||||
if (nodeStatus.getKeepAliveApplications() != null
|
||||
&& nodeStatus.getKeepAliveApplications().size() > 0) {
|
||||
|
@ -536,7 +541,8 @@ public class TestNodeStatusUpdater {
|
|||
RegisterNodeManagerResponse response = recordFactory
|
||||
.newRecordInstance(RegisterNodeManagerResponse.class);
|
||||
response.setNodeAction(registerNodeAction);
|
||||
response.setMasterKey(createMasterKey());
|
||||
response.setContainerTokenMasterKey(createMasterKey());
|
||||
response.setNMTokenMasterKey(createMasterKey());
|
||||
return response;
|
||||
}
|
||||
|
||||
|
@ -616,7 +622,7 @@ public class TestNodeStatusUpdater {
|
|||
YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
|
||||
heartBeatNodeAction,
|
||||
null, null, null,
|
||||
1000L);
|
||||
null, 1000L);
|
||||
return nhResponse;
|
||||
}
|
||||
}
|
||||
|
@ -631,8 +637,8 @@ public class TestNodeStatusUpdater {
|
|||
RegisterNodeManagerResponse response = recordFactory
|
||||
.newRecordInstance(RegisterNodeManagerResponse.class);
|
||||
response.setNodeAction(registerNodeAction );
|
||||
response.setMasterKey(createMasterKey());
|
||||
|
||||
response.setContainerTokenMasterKey(createMasterKey());
|
||||
response.setNMTokenMasterKey(createMasterKey());
|
||||
return response;
|
||||
}
|
||||
|
||||
|
@ -1004,10 +1010,11 @@ public class TestNodeStatusUpdater {
|
|||
|
||||
@Override
|
||||
protected NMContext createNMContext(
|
||||
NMContainerTokenSecretManager containerTokenSecretManager) {
|
||||
return new MyNMContext(containerTokenSecretManager);
|
||||
NMContainerTokenSecretManager containerTokenSecretManager,
|
||||
NMTokenSecretManagerInNM nmTokenSecretManager) {
|
||||
return new MyNMContext(containerTokenSecretManager,
|
||||
nmTokenSecretManager);
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
YarnConfiguration conf = createNMConfig();
|
||||
|
@ -1052,9 +1059,10 @@ public class TestNodeStatusUpdater {
|
|||
ConcurrentMap<ContainerId, Container> containers =
|
||||
new ConcurrentSkipListMap<ContainerId, Container>();
|
||||
|
||||
public MyNMContext(NMContainerTokenSecretManager
|
||||
containerTokenSecretManager) {
|
||||
super(containerTokenSecretManager);
|
||||
public MyNMContext(
|
||||
NMContainerTokenSecretManager containerTokenSecretManager,
|
||||
NMTokenSecretManagerInNM nmTokenSecretManager) {
|
||||
super(containerTokenSecretManager, nmTokenSecretManager);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.service.Service.STATE;
|
||||
import org.junit.After;
|
||||
|
@ -97,7 +98,7 @@ public abstract class BaseContainerManagerTest {
|
|||
protected static final int HTTP_PORT = 5412;
|
||||
protected Configuration conf = new YarnConfiguration();
|
||||
protected Context context = new NMContext(new NMContainerTokenSecretManager(
|
||||
conf)) {
|
||||
conf), new NMTokenSecretManagerInNM()) {
|
||||
public int getHttpPort() {
|
||||
return HTTP_PORT;
|
||||
};
|
||||
|
|
|
@ -76,7 +76,7 @@ public class TestNMWebServer {
|
|||
}
|
||||
|
||||
private int startNMWebAppServer(String webAddr) {
|
||||
Context nmContext = new NodeManager.NMContext(null);
|
||||
Context nmContext = new NodeManager.NMContext(null, null);
|
||||
ResourceView resourceView = new ResourceView() {
|
||||
@Override
|
||||
public long getVmemAllocatedForContainers() {
|
||||
|
@ -134,7 +134,7 @@ public class TestNMWebServer {
|
|||
|
||||
@Test
|
||||
public void testNMWebApp() throws IOException {
|
||||
Context nmContext = new NodeManager.NMContext(null);
|
||||
Context nmContext = new NodeManager.NMContext(null, null);
|
||||
ResourceView resourceView = new ResourceView() {
|
||||
@Override
|
||||
public long getVmemAllocatedForContainers() {
|
||||
|
|
|
@ -86,7 +86,7 @@ public class TestNMWebServices extends JerseyTest {
|
|||
private Injector injector = Guice.createInjector(new ServletModule() {
|
||||
@Override
|
||||
protected void configureServlets() {
|
||||
nmContext = new NodeManager.NMContext(null);
|
||||
nmContext = new NodeManager.NMContext(null, null);
|
||||
NodeId nodeId = NodeId.newInstance("testhost.foo.com", 8042);
|
||||
((NodeManager.NMContext)nmContext).setNodeId(nodeId);
|
||||
resourceView = new ResourceView() {
|
||||
|
|
|
@ -93,7 +93,7 @@ public class TestNMWebServicesApps extends JerseyTest {
|
|||
private Injector injector = Guice.createInjector(new ServletModule() {
|
||||
@Override
|
||||
protected void configureServlets() {
|
||||
nmContext = new NodeManager.NMContext(null);
|
||||
nmContext = new NodeManager.NMContext(null, null);
|
||||
NodeId nodeId = NodeId.newInstance("testhost.foo.com", 9999);
|
||||
((NodeManager.NMContext)nmContext).setNodeId(nodeId);
|
||||
resourceView = new ResourceView() {
|
||||
|
|
|
@ -93,7 +93,7 @@ public class TestNMWebServicesContainers extends JerseyTest {
|
|||
private Injector injector = Guice.createInjector(new ServletModule() {
|
||||
@Override
|
||||
protected void configureServlets() {
|
||||
nmContext = new NodeManager.NMContext(null) {
|
||||
nmContext = new NodeManager.NMContext(null, null) {
|
||||
public NodeId getNodeId() {
|
||||
return NodeId.newInstance("testhost.foo.com", 8042);
|
||||
};
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSe
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
|
||||
/**
|
||||
* Context of the ResourceManager.
|
||||
|
@ -59,6 +60,8 @@ public interface RMContext {
|
|||
ApplicationTokenSecretManager getApplicationTokenSecretManager();
|
||||
|
||||
RMContainerTokenSecretManager getContainerTokenSecretManager();
|
||||
|
||||
NMTokenSecretManagerInRM getNMTokenSecretManager();
|
||||
|
||||
ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager();
|
||||
}
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSe
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
|
@ -58,6 +59,7 @@ public class RMContextImpl implements RMContext {
|
|||
private final DelegationTokenRenewer tokenRenewer;
|
||||
private final ApplicationTokenSecretManager appTokenSecretManager;
|
||||
private final RMContainerTokenSecretManager containerTokenSecretManager;
|
||||
private final NMTokenSecretManagerInRM nmTokenSecretManager;
|
||||
private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
|
||||
|
||||
public RMContextImpl(Dispatcher rmDispatcher,
|
||||
|
@ -68,6 +70,7 @@ public class RMContextImpl implements RMContext {
|
|||
DelegationTokenRenewer tokenRenewer,
|
||||
ApplicationTokenSecretManager appTokenSecretManager,
|
||||
RMContainerTokenSecretManager containerTokenSecretManager,
|
||||
NMTokenSecretManagerInRM nmTokenSecretManager,
|
||||
ClientToAMTokenSecretManagerInRM clientTokenSecretManager) {
|
||||
this.rmDispatcher = rmDispatcher;
|
||||
this.stateStore = store;
|
||||
|
@ -77,6 +80,7 @@ public class RMContextImpl implements RMContext {
|
|||
this.tokenRenewer = tokenRenewer;
|
||||
this.appTokenSecretManager = appTokenSecretManager;
|
||||
this.containerTokenSecretManager = containerTokenSecretManager;
|
||||
this.nmTokenSecretManager = nmTokenSecretManager;
|
||||
this.clientToAMTokenSecretManager = clientTokenSecretManager;
|
||||
}
|
||||
|
||||
|
@ -89,10 +93,12 @@ public class RMContextImpl implements RMContext {
|
|||
DelegationTokenRenewer tokenRenewer,
|
||||
ApplicationTokenSecretManager appTokenSecretManager,
|
||||
RMContainerTokenSecretManager containerTokenSecretManager,
|
||||
NMTokenSecretManagerInRM nmTokenSecretManager,
|
||||
ClientToAMTokenSecretManagerInRM clientTokenSecretManager) {
|
||||
this(rmDispatcher, null, containerAllocationExpirer, amLivelinessMonitor,
|
||||
amFinishingMonitor, tokenRenewer, appTokenSecretManager,
|
||||
containerTokenSecretManager, clientTokenSecretManager);
|
||||
containerTokenSecretManager, nmTokenSecretManager,
|
||||
clientTokenSecretManager);
|
||||
RMStateStore nullStore = new NullRMStateStore();
|
||||
nullStore.setDispatcher(rmDispatcher);
|
||||
try {
|
||||
|
@ -157,7 +163,12 @@ public class RMContextImpl implements RMContext {
|
|||
public RMContainerTokenSecretManager getContainerTokenSecretManager() {
|
||||
return this.containerTokenSecretManager;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public NMTokenSecretManagerInRM getNMTokenSecretManager() {
|
||||
return this.nmTokenSecretManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
|
||||
return this.clientToAMTokenSecretManager;
|
||||
|
|
|
@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSec
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
|
||||
|
@ -104,6 +105,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
new ClientToAMTokenSecretManagerInRM();
|
||||
|
||||
protected RMContainerTokenSecretManager containerTokenSecretManager;
|
||||
protected NMTokenSecretManagerInRM nmTokenSecretManager;
|
||||
|
||||
protected ApplicationTokenSecretManager appTokenSecretManager;
|
||||
|
||||
|
@ -164,6 +166,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
addService(tokenRenewer);
|
||||
|
||||
this.containerTokenSecretManager = createContainerTokenSecretManager(conf);
|
||||
this.nmTokenSecretManager = createNMTokenSecretManager(conf);
|
||||
|
||||
boolean isRecoveryEnabled = conf.getBoolean(
|
||||
YarnConfiguration.RECOVERY_ENABLED,
|
||||
|
@ -191,7 +194,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
new RMContextImpl(this.rmDispatcher, rmStore,
|
||||
this.containerAllocationExpirer, amLivelinessMonitor,
|
||||
amFinishingMonitor, tokenRenewer, this.appTokenSecretManager,
|
||||
this.containerTokenSecretManager, this.clientToAMSecretManager);
|
||||
this.containerTokenSecretManager, this.nmTokenSecretManager,
|
||||
this.clientToAMSecretManager);
|
||||
|
||||
// Register event handler for NodesListManager
|
||||
this.nodesListManager = new NodesListManager(this.rmContext);
|
||||
|
@ -271,6 +275,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
return new RMContainerTokenSecretManager(conf);
|
||||
}
|
||||
|
||||
protected NMTokenSecretManagerInRM createNMTokenSecretManager(
|
||||
Configuration conf) {
|
||||
return new NMTokenSecretManagerInRM(conf);
|
||||
}
|
||||
|
||||
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
|
||||
return new SchedulerEventDispatcher(this.scheduler);
|
||||
}
|
||||
|
@ -586,6 +595,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
|
||||
this.appTokenSecretManager.start();
|
||||
this.containerTokenSecretManager.start();
|
||||
this.nmTokenSecretManager.start();
|
||||
|
||||
if(recoveryEnabled) {
|
||||
try {
|
||||
|
@ -649,6 +659,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
if (containerTokenSecretManager != null) {
|
||||
this.containerTokenSecretManager.stop();
|
||||
}
|
||||
if(nmTokenSecretManager != null) {
|
||||
nmTokenSecretManager.stop();
|
||||
}
|
||||
|
||||
/*synchronized(shutdown) {
|
||||
shutdown.set(true);
|
||||
|
@ -671,7 +684,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
|
||||
protected ResourceTrackerService createResourceTrackerService() {
|
||||
return new ResourceTrackerService(this.rmContext, this.nodesListManager,
|
||||
this.nmLivelinessMonitor, this.containerTokenSecretManager);
|
||||
this.nmLivelinessMonitor, this.containerTokenSecretManager,
|
||||
this.nmTokenSecretManager);
|
||||
}
|
||||
|
||||
protected RMDelegationTokenSecretManager
|
||||
|
@ -747,6 +761,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
return this.containerTokenSecretManager;
|
||||
}
|
||||
|
||||
@Private
|
||||
public NMTokenSecretManagerInRM getRMNMTokenSecretManager() {
|
||||
return this.nmTokenSecretManager;
|
||||
}
|
||||
|
||||
@Private
|
||||
public ApplicationTokenSecretManager getApplicationTokenSecretManager(){
|
||||
return this.appTokenSecretManager;
|
||||
|
|
|
@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.net.Node;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
|
@ -51,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
|
||||
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
|
||||
import org.apache.hadoop.yarn.service.AbstractService;
|
||||
|
@ -68,6 +68,7 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
private final NodesListManager nodesListManager;
|
||||
private final NMLivelinessMonitor nmLivelinessMonitor;
|
||||
private final RMContainerTokenSecretManager containerTokenSecretManager;
|
||||
private final NMTokenSecretManagerInRM nmTokenSecretManager;
|
||||
|
||||
private long nextHeartBeatInterval;
|
||||
private Server server;
|
||||
|
@ -90,12 +91,14 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
public ResourceTrackerService(RMContext rmContext,
|
||||
NodesListManager nodesListManager,
|
||||
NMLivelinessMonitor nmLivelinessMonitor,
|
||||
RMContainerTokenSecretManager containerTokenSecretManager) {
|
||||
RMContainerTokenSecretManager containerTokenSecretManager,
|
||||
NMTokenSecretManagerInRM nmTokenSecretManager) {
|
||||
super(ResourceTrackerService.class.getName());
|
||||
this.rmContext = rmContext;
|
||||
this.nodesListManager = nodesListManager;
|
||||
this.nmLivelinessMonitor = nmLivelinessMonitor;
|
||||
this.containerTokenSecretManager = containerTokenSecretManager;
|
||||
this.nmTokenSecretManager = nmTokenSecretManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -197,9 +200,10 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
return response;
|
||||
}
|
||||
|
||||
MasterKey nextMasterKeyForNode =
|
||||
this.containerTokenSecretManager.getCurrentKey();
|
||||
response.setMasterKey(nextMasterKeyForNode);
|
||||
response.setContainerTokenMasterKey(containerTokenSecretManager
|
||||
.getCurrentKey());
|
||||
response.setNMTokenMasterKey(nmTokenSecretManager
|
||||
.getCurrentKey());
|
||||
|
||||
RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
|
||||
resolve(host), capability);
|
||||
|
@ -292,27 +296,11 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
// Heartbeat response
|
||||
NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils
|
||||
.newNodeHeartbeatResponse(lastNodeHeartbeatResponse.
|
||||
getResponseId() + 1, NodeAction.NORMAL, null, null, null,
|
||||
getResponseId() + 1, NodeAction.NORMAL, null, null, null, null,
|
||||
nextHeartBeatInterval);
|
||||
rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse);
|
||||
|
||||
// Check if node's masterKey needs to be updated and if the currentKey has
|
||||
// roller over, send it across
|
||||
boolean shouldSendMasterKey = false;
|
||||
|
||||
MasterKey nextMasterKeyForNode =
|
||||
this.containerTokenSecretManager.getNextKey();
|
||||
if (nextMasterKeyForNode != null) {
|
||||
// nextMasterKeyForNode can be null if there is no outstanding key that
|
||||
// is in the activation period.
|
||||
MasterKey nodeKnownMasterKey = request.getLastKnownMasterKey();
|
||||
if (nodeKnownMasterKey.getKeyId() != nextMasterKeyForNode.getKeyId()) {
|
||||
shouldSendMasterKey = true;
|
||||
}
|
||||
}
|
||||
if (shouldSendMasterKey) {
|
||||
nodeHeartBeatResponse.setMasterKey(nextMasterKeyForNode);
|
||||
}
|
||||
populateKeys(request, nodeHeartBeatResponse);
|
||||
|
||||
// 4. Send status to RMNode, saving the latest response.
|
||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||
|
@ -323,6 +311,32 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
return nodeHeartBeatResponse;
|
||||
}
|
||||
|
||||
private void populateKeys(NodeHeartbeatRequest request,
|
||||
NodeHeartbeatResponse nodeHeartBeatResponse) {
|
||||
|
||||
// Check if node's masterKey needs to be updated and if the currentKey has
|
||||
// roller over, send it across
|
||||
|
||||
// ContainerTokenMasterKey
|
||||
|
||||
MasterKey nextMasterKeyForNode =
|
||||
this.containerTokenSecretManager.getNextKey();
|
||||
if (nextMasterKeyForNode != null
|
||||
&& (request.getLastKnownContainerTokenMasterKey().getKeyId()
|
||||
!= nextMasterKeyForNode.getKeyId())) {
|
||||
nodeHeartBeatResponse.setContainerTokenMasterKey(nextMasterKeyForNode);
|
||||
}
|
||||
|
||||
// NMTokenMasterKey
|
||||
|
||||
nextMasterKeyForNode = this.nmTokenSecretManager.getNextKey();
|
||||
if (nextMasterKeyForNode != null
|
||||
&& (request.getLastKnownNMTokenMasterKey().getKeyId()
|
||||
!= nextMasterKeyForNode.getKeyId())) {
|
||||
nodeHeartBeatResponse.setNMTokenMasterKey(nextMasterKeyForNode);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* resolving the network topology.
|
||||
* @param hostName the hostname of this node.
|
||||
|
|
|
@ -0,0 +1,153 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.security;
|
||||
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.security.MasterKeyData;
|
||||
|
||||
|
||||
public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
|
||||
|
||||
private static Log LOG = LogFactory
|
||||
.getLog(NMTokenSecretManagerInRM.class);
|
||||
|
||||
private MasterKeyData nextMasterKey;
|
||||
private Configuration conf;
|
||||
|
||||
private final Timer timer;
|
||||
private final long rollingInterval;
|
||||
private final long activationDelay;
|
||||
|
||||
public NMTokenSecretManagerInRM(Configuration conf) {
|
||||
this.conf = conf;
|
||||
timer = new Timer();
|
||||
rollingInterval = this.conf.getLong(
|
||||
YarnConfiguration.RM_NMTOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
|
||||
YarnConfiguration.DEFAULT_RM_NMTOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS)
|
||||
* 1000;
|
||||
// Add an activation delay. This is to address the following race: RM may
|
||||
// roll over master-key, scheduling may happen at some point of time, an
|
||||
// NMToken created with a password generated off new master key, but NM
|
||||
// might not have come again to RM to update the shared secret: so AM has a
|
||||
// valid password generated off new secret but NM doesn't know about the
|
||||
// secret yet.
|
||||
// Adding delay = 1.5 * expiry interval makes sure that all active NMs get
|
||||
// the updated shared-key.
|
||||
this.activationDelay =
|
||||
(long) (conf.getLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
|
||||
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS) * 1.5);
|
||||
LOG.info("NMTokenKeyRollingInterval: " + this.rollingInterval
|
||||
+ "ms and NMTokenKeyActivationDelay: " + this.activationDelay
|
||||
+ "ms");
|
||||
if (rollingInterval <= activationDelay * 2) {
|
||||
throw new IllegalArgumentException(
|
||||
YarnConfiguration.RM_NMTOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS
|
||||
+ " should be more than 2 X "
|
||||
+ YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new master-key and sets it as the primary.
|
||||
*/
|
||||
@Private
|
||||
public void rollMasterKey() {
|
||||
super.writeLock.lock();
|
||||
try {
|
||||
LOG.info("Rolling master-key for nm-tokens");
|
||||
if (this.currentMasterKey == null) { // Setting up for the first time.
|
||||
this.currentMasterKey = createNewMasterKey();
|
||||
} else {
|
||||
this.nextMasterKey = createNewMasterKey();
|
||||
LOG.info("Going to activate master-key with key-id "
|
||||
+ this.nextMasterKey.getMasterKey().getKeyId() + " in "
|
||||
+ this.activationDelay + "ms");
|
||||
this.timer.schedule(new NextKeyActivator(), this.activationDelay);
|
||||
}
|
||||
} finally {
|
||||
super.writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Private
|
||||
public MasterKey getNextKey() {
|
||||
super.readLock.lock();
|
||||
try {
|
||||
if (this.nextMasterKey == null) {
|
||||
return null;
|
||||
} else {
|
||||
return this.nextMasterKey.getMasterKey();
|
||||
}
|
||||
} finally {
|
||||
super.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Activate the new master-key
|
||||
*/
|
||||
@Private
|
||||
public void activateNextMasterKey() {
|
||||
super.writeLock.lock();
|
||||
try {
|
||||
LOG.info("Activating next master key with id: "
|
||||
+ this.nextMasterKey.getMasterKey().getKeyId());
|
||||
this.currentMasterKey = this.nextMasterKey;
|
||||
this.nextMasterKey = null;
|
||||
} finally {
|
||||
super.writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void start() {
|
||||
rollMasterKey();
|
||||
this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval,
|
||||
rollingInterval);
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
this.timer.cancel();
|
||||
}
|
||||
|
||||
private class MasterKeyRoller extends TimerTask {
|
||||
@Override
|
||||
public void run() {
|
||||
rollMasterKey();
|
||||
}
|
||||
}
|
||||
|
||||
private class NextKeyActivator extends TimerTask {
|
||||
@Override
|
||||
public void run() {
|
||||
// Activation will happen after an absolute time interval. It will be good
|
||||
// if we can force activation after an NM updates and acknowledges a
|
||||
// roll-over. But that is only possible when we move to per-NM keys. TODO:
|
||||
activateNextMasterKey();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.server.security.MasterKeyData;
|
||||
|
||||
/**
|
||||
* SecretManager for ContainerTokens. This is RM-specific and rolls the
|
||||
|
|
|
@ -48,7 +48,8 @@ public class MockNM {
|
|||
private final int vCores = 1;
|
||||
private ResourceTrackerService resourceTracker;
|
||||
private final int httpPort = 2;
|
||||
private MasterKey currentMasterKey;
|
||||
private MasterKey currentContainerTokenMasterKey;
|
||||
private MasterKey currentNMTokenMasterKey;
|
||||
|
||||
public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) {
|
||||
this.memory = memory;
|
||||
|
@ -86,7 +87,9 @@ public class MockNM {
|
|||
req.setResource(resource);
|
||||
RegisterNodeManagerResponse registrationResponse =
|
||||
resourceTracker.registerNodeManager(req);
|
||||
this.currentMasterKey = registrationResponse.getMasterKey();
|
||||
this.currentContainerTokenMasterKey =
|
||||
registrationResponse.getContainerTokenMasterKey();
|
||||
this.currentNMTokenMasterKey = registrationResponse.getNMTokenMasterKey();
|
||||
return registrationResponse;
|
||||
}
|
||||
|
||||
|
@ -129,14 +132,25 @@ public class MockNM {
|
|||
healthStatus.setLastHealthReportTime(1);
|
||||
status.setNodeHealthStatus(healthStatus);
|
||||
req.setNodeStatus(status);
|
||||
req.setLastKnownMasterKey(this.currentMasterKey);
|
||||
req.setLastKnownContainerTokenMasterKey(this.currentContainerTokenMasterKey);
|
||||
req.setLastKnownNMTokenMasterKey(this.currentNMTokenMasterKey);
|
||||
NodeHeartbeatResponse heartbeatResponse =
|
||||
resourceTracker.nodeHeartbeat(req);
|
||||
MasterKey masterKeyFromRM = heartbeatResponse.getMasterKey();
|
||||
this.currentMasterKey =
|
||||
(masterKeyFromRM != null
|
||||
&& masterKeyFromRM.getKeyId() != this.currentMasterKey.getKeyId()
|
||||
? masterKeyFromRM : this.currentMasterKey);
|
||||
|
||||
MasterKey masterKeyFromRM = heartbeatResponse.getContainerTokenMasterKey();
|
||||
if (masterKeyFromRM != null
|
||||
&& masterKeyFromRM.getKeyId() != this.currentContainerTokenMasterKey
|
||||
.getKeyId()) {
|
||||
this.currentContainerTokenMasterKey = masterKeyFromRM;
|
||||
}
|
||||
|
||||
masterKeyFromRM = heartbeatResponse.getNMTokenMasterKey();
|
||||
if (masterKeyFromRM != null
|
||||
&& masterKeyFromRM.getKeyId() != this.currentNMTokenMasterKey
|
||||
.getKeyId()) {
|
||||
this.currentNMTokenMasterKey = masterKeyFromRM;
|
||||
}
|
||||
|
||||
return heartbeatResponse;
|
||||
}
|
||||
|
||||
|
|
|
@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.LogManager;
|
||||
|
@ -92,10 +93,10 @@ public class MockRM extends ResourceManager {
|
|||
RMApp app = getRMContext().getRMApps().get(appId);
|
||||
Assert.assertNotNull("app shouldn't be null", app);
|
||||
int timeoutSecs = 0;
|
||||
while (!finalState.equals(app.getState()) && timeoutSecs++ < 20) {
|
||||
while (!finalState.equals(app.getState()) && timeoutSecs++ < 40) {
|
||||
System.out.println("App : " + appId + " State is : " + app.getState()
|
||||
+ " Waiting for state : " + finalState);
|
||||
Thread.sleep(500);
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
System.out.println("App State is : " + app.getState());
|
||||
Assert.assertEquals("App state is not correct (timedout)", finalState,
|
||||
|
@ -109,11 +110,11 @@ public class MockRM extends ResourceManager {
|
|||
Assert.assertNotNull("app shouldn't be null", app);
|
||||
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
||||
int timeoutSecs = 0;
|
||||
while (!finalState.equals(attempt.getAppAttemptState()) && timeoutSecs++ < 20) {
|
||||
while (!finalState.equals(attempt.getAppAttemptState()) && timeoutSecs++ < 40) {
|
||||
System.out.println("AppAttempt : " + attemptId
|
||||
+ " State is : " + attempt.getAppAttemptState()
|
||||
+ " Waiting for state : " + finalState);
|
||||
Thread.sleep(500);
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
System.out.println("Attempt State is : " + attempt.getAppAttemptState());
|
||||
Assert.assertEquals("Attempt state is not correct (timedout)", finalState,
|
||||
|
@ -306,11 +307,12 @@ public class MockRM extends ResourceManager {
|
|||
|
||||
@Override
|
||||
protected ResourceTrackerService createResourceTrackerService() {
|
||||
RMContainerTokenSecretManager containerTokenSecretManager =
|
||||
new RMContainerTokenSecretManager(new Configuration());
|
||||
Configuration conf = new Configuration();
|
||||
|
||||
containerTokenSecretManager.rollMasterKey();
|
||||
return new ResourceTrackerService(getRMContext(), nodesListManager,
|
||||
this.nmLivelinessMonitor, containerTokenSecretManager) {
|
||||
this.nmLivelinessMonitor, containerTokenSecretManager,
|
||||
nmTokenSecretManager) {
|
||||
|
||||
@Override
|
||||
protected void serviceStart() {
|
||||
|
|
|
@ -187,7 +187,7 @@ public class TestAMAuthorization {
|
|||
nm1.nodeHeartbeat(true);
|
||||
|
||||
int waitCount = 0;
|
||||
while (containerManager.amTokens == null && waitCount++ < 20) {
|
||||
while (containerManager.amTokens == null && waitCount++ < 40) {
|
||||
LOG.info("Waiting for AM Launch to happen..");
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
@ -270,7 +270,7 @@ public class TestAMAuthorization {
|
|||
throws InterruptedException {
|
||||
int waitCount = 0;
|
||||
while (attempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED
|
||||
&& waitCount++ < 20) {
|
||||
&& waitCount++ < 40) {
|
||||
LOG.info("Waiting for AppAttempt to reach LAUNCHED state. "
|
||||
+ "Current state is " + attempt.getAppAttemptState());
|
||||
Thread.sleep(1000);
|
||||
|
|
|
@ -101,7 +101,7 @@ public class TestAppManager{
|
|||
rmDispatcher);
|
||||
return new RMContextImpl(rmDispatcher,
|
||||
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
|
||||
null, null, null, null) {
|
||||
null, null, null, null, null) {
|
||||
@Override
|
||||
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
|
||||
return map;
|
||||
|
|
|
@ -204,7 +204,7 @@ public class TestFifoScheduler {
|
|||
testMinimumAllocation(conf, allocMB / 2);
|
||||
}
|
||||
|
||||
@Test (timeout = 5000)
|
||||
@Test (timeout = 50000)
|
||||
public void testReconnectedNode() throws Exception {
|
||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||
conf.setQueues("default", new String[] {"default"});
|
||||
|
@ -233,7 +233,7 @@ public class TestFifoScheduler {
|
|||
Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB());
|
||||
}
|
||||
|
||||
@Test (timeout = 5000)
|
||||
@Test (timeout = 50000)
|
||||
public void testHeadroom() throws Exception {
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
|
|
|
@ -137,7 +137,7 @@ public class TestRM {
|
|||
rm.stop();
|
||||
}
|
||||
|
||||
@Test (timeout = 30000)
|
||||
@Test (timeout = 300000)
|
||||
public void testActivatingApplicationAfterAddingNM() throws Exception {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
|
||||
|
|
|
@ -84,7 +84,7 @@ public class TestRMNodeTransitions {
|
|||
|
||||
rmContext =
|
||||
new RMContextImpl(rmDispatcher, null, null, null,
|
||||
mock(DelegationTokenRenewer.class), null, null, null);
|
||||
mock(DelegationTokenRenewer.class), null, null, null, null);
|
||||
scheduler = mock(YarnScheduler.class);
|
||||
doAnswer(
|
||||
new Answer<Void>() {
|
||||
|
|
|
@ -88,7 +88,7 @@ public class TestRMRestart {
|
|||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
}
|
||||
|
||||
@Test (timeout=60000)
|
||||
@Test (timeout=180000)
|
||||
public void testRMRestart() throws Exception {
|
||||
Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
|
|
|
@ -53,6 +53,7 @@ public class TestResourceManager {
|
|||
resourceManager = new ResourceManager();
|
||||
resourceManager.init(conf);
|
||||
resourceManager.getRMContainerTokenSecretManager().rollMasterKey();
|
||||
resourceManager.getRMNMTokenSecretManager().rollMasterKey();
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
|
@ -60,7 +60,7 @@ public class TestResourceTrackerService {
|
|||
* Test RM read NM next heartBeat Interval correctly from Configuration file,
|
||||
* and NM get next heartBeat Interval from RM correctly
|
||||
*/
|
||||
@Test (timeout = 5000)
|
||||
@Test (timeout = 50000)
|
||||
public void testGetNextHeartBeatInterval() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, "4000");
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -71,7 +72,7 @@ public class TestNMExpiry {
|
|||
// Dispatcher that processes events inline
|
||||
Dispatcher dispatcher = new InlineDispatcher();
|
||||
RMContext context = new RMContextImpl(dispatcher, null,
|
||||
null, null, null, null, null, null);
|
||||
null, null, null, null, null, null, null);
|
||||
dispatcher.register(SchedulerEventType.class,
|
||||
new InlineDispatcher.EmptyEventHandler());
|
||||
dispatcher.register(RMNodeEventType.class,
|
||||
|
@ -85,8 +86,12 @@ public class TestNMExpiry {
|
|||
RMContainerTokenSecretManager containerTokenSecretManager =
|
||||
new RMContainerTokenSecretManager(conf);
|
||||
containerTokenSecretManager.start();
|
||||
NMTokenSecretManagerInRM nmTokenSecretManager =
|
||||
new NMTokenSecretManagerInRM(conf);
|
||||
nmTokenSecretManager.start();
|
||||
resourceTrackerService = new ResourceTrackerService(context,
|
||||
nodesListManager, nmLivelinessMonitor, containerTokenSecretManager);
|
||||
nodesListManager, nmLivelinessMonitor, containerTokenSecretManager,
|
||||
nmTokenSecretManager);
|
||||
|
||||
resourceTrackerService.init(conf);
|
||||
resourceTrackerService.start();
|
||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -69,16 +70,19 @@ public class TestRMNMRPCResponseId {
|
|||
});
|
||||
RMContext context =
|
||||
new RMContextImpl(dispatcher, null, null, null, null,
|
||||
null, new RMContainerTokenSecretManager(conf), null);
|
||||
null, new RMContainerTokenSecretManager(conf),
|
||||
new NMTokenSecretManagerInRM(conf), null);
|
||||
dispatcher.register(RMNodeEventType.class,
|
||||
new ResourceManager.NodeEventDispatcher(context));
|
||||
NodesListManager nodesListManager = new NodesListManager(context);
|
||||
nodesListManager.init(conf);
|
||||
|
||||
context.getContainerTokenSecretManager().rollMasterKey();
|
||||
context.getNMTokenSecretManager().rollMasterKey();
|
||||
resourceTrackerService = new ResourceTrackerService(context,
|
||||
nodesListManager, new NMLivelinessMonitor(dispatcher),
|
||||
context.getContainerTokenSecretManager());
|
||||
context.getContainerTokenSecretManager(),
|
||||
context.getNMTokenSecretManager());
|
||||
resourceTrackerService.init(conf);
|
||||
}
|
||||
|
||||
|
|
|
@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -146,6 +147,7 @@ public class TestRMAppTransitions {
|
|||
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
|
||||
null, new ApplicationTokenSecretManager(conf),
|
||||
new RMContainerTokenSecretManager(conf),
|
||||
new NMTokenSecretManagerInRM(conf),
|
||||
new ClientToAMTokenSecretManagerInRM());
|
||||
|
||||
rmDispatcher.register(RMAppAttemptEventType.class,
|
||||
|
|
|
@ -79,6 +79,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSe
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -168,6 +169,7 @@ public class TestRMAppAttemptTransitions {
|
|||
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
|
||||
null, new ApplicationTokenSecretManager(conf),
|
||||
new RMContainerTokenSecretManager(conf),
|
||||
new NMTokenSecretManagerInRM(conf),
|
||||
new ClientToAMTokenSecretManagerInRM());
|
||||
|
||||
RMStateStore store = mock(RMStateStore.class);
|
||||
|
|
|
@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -272,6 +273,7 @@ public class TestCapacityScheduler {
|
|||
cs.setConf(new YarnConfiguration());
|
||||
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
|
||||
null, new RMContainerTokenSecretManager(conf),
|
||||
new NMTokenSecretManagerInRM(conf),
|
||||
new ClientToAMTokenSecretManagerInRM()));
|
||||
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
|
||||
|
||||
|
@ -370,6 +372,7 @@ public class TestCapacityScheduler {
|
|||
|
||||
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
|
||||
null, new RMContainerTokenSecretManager(conf),
|
||||
new NMTokenSecretManagerInRM(conf),
|
||||
new ClientToAMTokenSecretManagerInRM()));
|
||||
}
|
||||
|
||||
|
@ -382,6 +385,7 @@ public class TestCapacityScheduler {
|
|||
cs.setConf(new YarnConfiguration());
|
||||
cs.reinitialize(csConf, new RMContextImpl(null, null, null, null,
|
||||
null, null, new RMContainerTokenSecretManager(csConf),
|
||||
new NMTokenSecretManagerInRM(csConf),
|
||||
new ClientToAMTokenSecretManagerInRM()));
|
||||
|
||||
RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
|
||||
|
@ -408,6 +412,7 @@ public class TestCapacityScheduler {
|
|||
cs.setConf(new YarnConfiguration());
|
||||
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
|
||||
null, new RMContainerTokenSecretManager(conf),
|
||||
new NMTokenSecretManagerInRM(conf),
|
||||
new ClientToAMTokenSecretManagerInRM()));
|
||||
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestQueueParsing {
|
||||
|
@ -45,6 +46,7 @@ public class TestQueueParsing {
|
|||
capacityScheduler.setConf(conf);
|
||||
capacityScheduler.reinitialize(conf, new RMContextImpl(null, null,
|
||||
null, null, null, null, new RMContainerTokenSecretManager(conf),
|
||||
new NMTokenSecretManagerInRM(conf),
|
||||
new ClientToAMTokenSecretManagerInRM()));
|
||||
|
||||
CSQueue a = capacityScheduler.getQueue("a");
|
||||
|
|
|
@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSe
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
|
||||
public class TestUtils {
|
||||
private static final Log LOG = LogFactory.getLog(TestUtils.class);
|
||||
|
@ -87,6 +88,7 @@ public class TestUtils {
|
|||
new RMContextImpl(nullDispatcher, cae, null, null, null,
|
||||
new ApplicationTokenSecretManager(conf),
|
||||
new RMContainerTokenSecretManager(conf),
|
||||
new NMTokenSecretManagerInRM(conf),
|
||||
new ClientToAMTokenSecretManagerInRM());
|
||||
|
||||
return rmContext;
|
||||
|
|
|
@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -125,7 +126,7 @@ public class TestFifoScheduler {
|
|||
public void testAppAttemptMetrics() throws Exception {
|
||||
AsyncDispatcher dispatcher = new InlineDispatcher();
|
||||
RMContext rmContext = new RMContextImpl(dispatcher, null,
|
||||
null, null, null, null, null, null);
|
||||
null, null, null, null, null, null, null);
|
||||
|
||||
FifoScheduler schedular = new FifoScheduler();
|
||||
schedular.reinitialize(new Configuration(), rmContext);
|
||||
|
@ -150,11 +151,15 @@ public class TestFifoScheduler {
|
|||
@Test(timeout=2000)
|
||||
public void testNodeLocalAssignment() throws Exception {
|
||||
AsyncDispatcher dispatcher = new InlineDispatcher();
|
||||
Configuration conf = new Configuration();
|
||||
RMContainerTokenSecretManager containerTokenSecretManager =
|
||||
new RMContainerTokenSecretManager(new Configuration());
|
||||
new RMContainerTokenSecretManager(conf);
|
||||
containerTokenSecretManager.rollMasterKey();
|
||||
NMTokenSecretManagerInRM nmTokenSecretManager =
|
||||
new NMTokenSecretManagerInRM(conf);
|
||||
nmTokenSecretManager.rollMasterKey();
|
||||
RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
|
||||
null, containerTokenSecretManager, null);
|
||||
null, containerTokenSecretManager, nmTokenSecretManager, null);
|
||||
|
||||
FifoScheduler scheduler = new FifoScheduler();
|
||||
scheduler.reinitialize(new Configuration(), rmContext);
|
||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.util.StringHelper;
|
||||
import org.apache.hadoop.yarn.webapp.WebApps;
|
||||
|
@ -160,7 +161,7 @@ public class TestRMWebApp {
|
|||
deactivatedNodesMap.put(node.getHostName(), node);
|
||||
}
|
||||
return new RMContextImpl(null, null, null, null,
|
||||
null, null, null, null) {
|
||||
null, null, null, null, null) {
|
||||
@Override
|
||||
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
|
||||
return applicationsMaps;
|
||||
|
@ -201,8 +202,9 @@ public class TestRMWebApp {
|
|||
CapacityScheduler cs = new CapacityScheduler();
|
||||
cs.setConf(new YarnConfiguration());
|
||||
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
|
||||
null, new RMContainerTokenSecretManager(conf),
|
||||
new ClientToAMTokenSecretManagerInRM()));
|
||||
null, new RMContainerTokenSecretManager(conf),
|
||||
new NMTokenSecretManagerInRM(conf),
|
||||
new ClientToAMTokenSecretManagerInRM()));
|
||||
return cs;
|
||||
}
|
||||
|
||||
|
|
|
@ -78,6 +78,8 @@ public class TestRMWebServicesNodes extends JerseyTest {
|
|||
bind(RMWebServices.class);
|
||||
bind(GenericExceptionHandler.class);
|
||||
rm = new MockRM(new Configuration());
|
||||
rm.getRMContainerTokenSecretManager().rollMasterKey();
|
||||
rm.getRMNMTokenSecretManager().rollMasterKey();
|
||||
bind(ResourceManager.class).toInstance(rm);
|
||||
bind(RMContext.class).toInstance(rm.getRMContext());
|
||||
bind(ApplicationACLsManager.class).toInstance(
|
||||
|
|
|
@ -32,7 +32,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp
|
|||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestRMNMSecretKeys {
|
||||
|
@ -69,59 +68,94 @@ public class TestRMNMSecretKeys {
|
|||
rm.init(conf);
|
||||
rm.start();
|
||||
|
||||
// Testing ContainerToken and NMToken
|
||||
String containerToken = "Container Token : ";
|
||||
String nmToken = "NM Token : ";
|
||||
|
||||
MockNM nm = new MockNM("host:1234", 3072, rm.getResourceTrackerService());
|
||||
RegisterNodeManagerResponse registrationResponse = nm.registerNode();
|
||||
MasterKey masterKey = registrationResponse.getMasterKey();
|
||||
Assert.assertNotNull("Registration should cause a key-update!", masterKey);
|
||||
|
||||
MasterKey containerTokenMasterKey =
|
||||
registrationResponse.getContainerTokenMasterKey();
|
||||
Assert.assertNotNull(containerToken
|
||||
+ "Registration should cause a key-update!", containerTokenMasterKey);
|
||||
MasterKey nmTokenMasterKey = registrationResponse.getNMTokenMasterKey();
|
||||
Assert.assertNotNull(nmToken
|
||||
+ "Registration should cause a key-update!", nmTokenMasterKey);
|
||||
|
||||
dispatcher.await();
|
||||
|
||||
NodeHeartbeatResponse response = nm.nodeHeartbeat(true);
|
||||
Assert.assertNull(
|
||||
Assert.assertNull(containerToken +
|
||||
"First heartbeat after registration shouldn't get any key updates!",
|
||||
response.getMasterKey());
|
||||
response.getContainerTokenMasterKey());
|
||||
Assert.assertNull(nmToken +
|
||||
"First heartbeat after registration shouldn't get any key updates!",
|
||||
response.getNMTokenMasterKey());
|
||||
dispatcher.await();
|
||||
|
||||
response = nm.nodeHeartbeat(true);
|
||||
Assert
|
||||
.assertNull(
|
||||
"Even second heartbeat after registration shouldn't get any key updates!",
|
||||
response.getMasterKey());
|
||||
Assert.assertNull(containerToken +
|
||||
"Even second heartbeat after registration shouldn't get any key updates!",
|
||||
response.getContainerTokenMasterKey());
|
||||
Assert.assertNull(nmToken +
|
||||
"Even second heartbeat after registration shouldn't get any key updates!",
|
||||
response.getContainerTokenMasterKey());
|
||||
|
||||
dispatcher.await();
|
||||
|
||||
// Let's force a roll-over
|
||||
RMContainerTokenSecretManager secretManager =
|
||||
rm.getRMContainerTokenSecretManager();
|
||||
secretManager.rollMasterKey();
|
||||
rm.getRMContainerTokenSecretManager().rollMasterKey();
|
||||
rm.getRMNMTokenSecretManager().rollMasterKey();
|
||||
|
||||
// Heartbeats after roll-over and before activation should be fine.
|
||||
response = nm.nodeHeartbeat(true);
|
||||
Assert.assertNotNull(
|
||||
Assert.assertNotNull(containerToken +
|
||||
"Heartbeats after roll-over and before activation should not err out.",
|
||||
response.getMasterKey());
|
||||
Assert.assertEquals(
|
||||
response.getContainerTokenMasterKey());
|
||||
Assert.assertNotNull(nmToken +
|
||||
"Heartbeats after roll-over and before activation should not err out.",
|
||||
response.getNMTokenMasterKey());
|
||||
|
||||
Assert.assertEquals(containerToken +
|
||||
"Roll-over should have incremented the key-id only by one!",
|
||||
masterKey.getKeyId() + 1, response.getMasterKey().getKeyId());
|
||||
containerTokenMasterKey.getKeyId() + 1,
|
||||
response.getContainerTokenMasterKey().getKeyId());
|
||||
Assert.assertEquals(nmToken +
|
||||
"Roll-over should have incremented the key-id only by one!",
|
||||
nmTokenMasterKey.getKeyId() + 1,
|
||||
response.getNMTokenMasterKey().getKeyId());
|
||||
dispatcher.await();
|
||||
|
||||
response = nm.nodeHeartbeat(true);
|
||||
Assert.assertNull(
|
||||
Assert.assertNull(containerToken +
|
||||
"Second heartbeat after roll-over shouldn't get any key updates!",
|
||||
response.getMasterKey());
|
||||
response.getContainerTokenMasterKey());
|
||||
Assert.assertNull(nmToken +
|
||||
"Second heartbeat after roll-over shouldn't get any key updates!",
|
||||
response.getNMTokenMasterKey());
|
||||
dispatcher.await();
|
||||
|
||||
// Let's force activation
|
||||
secretManager.activateNextMasterKey();
|
||||
rm.getRMContainerTokenSecretManager().activateNextMasterKey();
|
||||
rm.getRMNMTokenSecretManager().activateNextMasterKey();
|
||||
|
||||
response = nm.nodeHeartbeat(true);
|
||||
Assert.assertNull("Activation shouldn't cause any key updates!",
|
||||
response.getMasterKey());
|
||||
Assert.assertNull(containerToken
|
||||
+ "Activation shouldn't cause any key updates!",
|
||||
response.getContainerTokenMasterKey());
|
||||
Assert.assertNull(nmToken
|
||||
+ "Activation shouldn't cause any key updates!",
|
||||
response.getNMTokenMasterKey());
|
||||
dispatcher.await();
|
||||
|
||||
response = nm.nodeHeartbeat(true);
|
||||
Assert
|
||||
.assertNull(
|
||||
"Even second heartbeat after activation shouldn't get any key updates!",
|
||||
response.getMasterKey());
|
||||
Assert.assertNull(containerToken +
|
||||
"Even second heartbeat after activation shouldn't get any key updates!",
|
||||
response.getContainerTokenMasterKey());
|
||||
Assert.assertNull(nmToken +
|
||||
"Even second heartbeat after activation shouldn't get any key updates!",
|
||||
response.getNMTokenMasterKey());
|
||||
dispatcher.await();
|
||||
|
||||
rm.stop();
|
||||
|
|
Loading…
Reference in New Issue