YARN-693. Modified RM to send NMTokens on allocate call so that AMs can then use them for authentication with NMs. Contributed by Omkar Vinit Joshi.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1493448 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1a389305b2
commit
769a0bd831
|
@ -358,6 +358,9 @@ Release 2.1.0-beta - UNRELEASED
|
|||
YARN-639. Modified Distributed Shell application to start using the new
|
||||
NMClient library. (Zhijie Shen via vinodkv)
|
||||
|
||||
YARN-693. Modified RM to send NMTokens on allocate call so that AMs can then
|
||||
use them for authentication with NMs. (Omkar Vinit Joshi via vinodkv)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
YARN-512. Log aggregation root directory check is more expensive than it
|
||||
|
|
|
@ -29,10 +29,10 @@ import org.apache.hadoop.yarn.api.AMRMProtocol;
|
|||
import org.apache.hadoop.yarn.api.records.AMCommand;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NMToken;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
/**
|
||||
|
@ -67,7 +67,7 @@ public abstract class AllocateResponse {
|
|||
List<ContainerStatus> completedContainers,
|
||||
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
|
||||
Resource availResources, AMCommand command, int numClusterNodes,
|
||||
PreemptionMessage preempt) {
|
||||
PreemptionMessage preempt, List<NMToken> nmTokens) {
|
||||
AllocateResponse response = Records.newRecord(AllocateResponse.class);
|
||||
response.setNumClusterNodes(numClusterNodes);
|
||||
response.setResponseId(responseId);
|
||||
|
@ -77,6 +77,7 @@ public abstract class AllocateResponse {
|
|||
response.setAvailableResources(availResources);
|
||||
response.setAMCommand(command);
|
||||
response.setPreemptionMessage(preempt);
|
||||
response.setNMTokens(nmTokens);
|
||||
return response;
|
||||
}
|
||||
|
||||
|
@ -202,7 +203,7 @@ public abstract class AllocateResponse {
|
|||
|
||||
@Public
|
||||
@Stable
|
||||
public abstract void setNMTokens(List<Token> nmTokens);
|
||||
public abstract void setNMTokens(List<NMToken> nmTokens);
|
||||
|
||||
/**
|
||||
* Get the list of NMTokens required for communicating with NM. New NMTokens
|
||||
|
@ -217,6 +218,6 @@ public abstract class AllocateResponse {
|
|||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public abstract List<Token> getNMTokens();
|
||||
public abstract List<NMToken> getNMTokens();
|
||||
|
||||
}
|
||||
|
|
|
@ -23,21 +23,20 @@ import java.util.ArrayList;
|
|||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.records.AMCommand;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NMToken;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionMessagePBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
|
||||
|
@ -46,6 +45,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
|||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.util.ProtoUtils;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto;
|
||||
|
||||
|
||||
public class AllocateResponsePBImpl extends AllocateResponse {
|
||||
|
@ -56,7 +56,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
|||
Resource limit;
|
||||
|
||||
private List<Container> allocatedContainers = null;
|
||||
private List<Token> nmTokens = null;
|
||||
private List<NMToken> nmTokens = null;
|
||||
private List<ContainerStatus> completedContainersStatuses = null;
|
||||
|
||||
private List<NodeReport> updatedNodes = null;
|
||||
|
@ -108,7 +108,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
|||
}
|
||||
if (nmTokens != null) {
|
||||
builder.clearNmTokens();
|
||||
Iterable<TokenProto> iterable = getTokenProtoIterable(nmTokens);
|
||||
Iterable<NMTokenProto> iterable = getTokenProtoIterable(nmTokens);
|
||||
builder.addAllNmTokens(iterable);
|
||||
}
|
||||
if (this.completedContainersStatuses != null) {
|
||||
|
@ -248,9 +248,11 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setNMTokens(List<Token> nmTokens) {
|
||||
public synchronized void setNMTokens(List<NMToken> nmTokens) {
|
||||
if (nmTokens == null || nmTokens.isEmpty()) {
|
||||
this.nmTokens.clear();
|
||||
if (this.nmTokens != null) {
|
||||
this.nmTokens.clear();
|
||||
}
|
||||
builder.clearNmTokens();
|
||||
return;
|
||||
}
|
||||
|
@ -260,7 +262,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized List<Token> getNMTokens() {
|
||||
public synchronized List<NMToken> getNMTokens() {
|
||||
initLocalNewNMTokenList();
|
||||
return nmTokens;
|
||||
}
|
||||
|
@ -334,9 +336,9 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
|||
return;
|
||||
}
|
||||
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
List<TokenProto> list = p.getNmTokensList();
|
||||
nmTokens = new ArrayList<Token>();
|
||||
for (TokenProto t : list) {
|
||||
List<NMTokenProto> list = p.getNmTokensList();
|
||||
nmTokens = new ArrayList<NMToken>();
|
||||
for (NMTokenProto t : list) {
|
||||
nmTokens.add(convertFromProtoFormat(t));
|
||||
}
|
||||
}
|
||||
|
@ -372,15 +374,15 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
|||
};
|
||||
}
|
||||
|
||||
private synchronized Iterable<TokenProto> getTokenProtoIterable(
|
||||
final List<Token> nmTokenList) {
|
||||
private synchronized Iterable<NMTokenProto> getTokenProtoIterable(
|
||||
final List<NMToken> nmTokenList) {
|
||||
maybeInitBuilder();
|
||||
return new Iterable<TokenProto>() {
|
||||
return new Iterable<NMTokenProto>() {
|
||||
@Override
|
||||
public synchronized Iterator<TokenProto> iterator() {
|
||||
return new Iterator<TokenProto>() {
|
||||
public synchronized Iterator<NMTokenProto> iterator() {
|
||||
return new Iterator<NMTokenProto>() {
|
||||
|
||||
Iterator<Token> iter = nmTokenList.iterator();
|
||||
Iterator<NMToken> iter = nmTokenList.iterator();
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
|
@ -388,7 +390,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
|||
}
|
||||
|
||||
@Override
|
||||
public TokenProto next() {
|
||||
public NMTokenProto next() {
|
||||
return convertToProtoFormat(iter.next());
|
||||
}
|
||||
|
||||
|
@ -524,11 +526,11 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
|||
return ((PreemptionMessagePBImpl)r).getProto();
|
||||
}
|
||||
|
||||
private synchronized TokenProto convertToProtoFormat(Token token) {
|
||||
return ((TokenPBImpl)token).getProto();
|
||||
private synchronized NMTokenProto convertToProtoFormat(NMToken token) {
|
||||
return ((NMTokenPBImpl)token).getProto();
|
||||
}
|
||||
|
||||
private synchronized Token convertFromProtoFormat(TokenProto proto) {
|
||||
return new TokenPBImpl(proto);
|
||||
private synchronized NMToken convertFromProtoFormat(NMTokenProto proto) {
|
||||
return new NMTokenPBImpl(proto);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,54 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.api.records;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
/**
|
||||
* NMToken is returned by RM on AllocateResponse.
|
||||
*/
|
||||
public abstract class NMToken {
|
||||
|
||||
@Public
|
||||
@Stable
|
||||
public abstract NodeId getNodeId();
|
||||
|
||||
@Public
|
||||
@Stable
|
||||
public abstract void setNodeId(NodeId nodeId);
|
||||
|
||||
@Public
|
||||
@Stable
|
||||
public abstract Token getToken();
|
||||
|
||||
@Public
|
||||
@Stable
|
||||
public abstract void setToken(Token token);
|
||||
|
||||
@Private
|
||||
public static NMToken newInstance(NodeId nodeId, Token token) {
|
||||
NMToken nmToken = Records.newRecord(NMToken.class);
|
||||
nmToken.setNodeId(nodeId);
|
||||
nmToken.setToken(token);
|
||||
return nmToken;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,139 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.api.records.impl.pb;
|
||||
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
|
||||
import org.apache.hadoop.yarn.api.records.NMToken;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProtoOrBuilder;
|
||||
|
||||
|
||||
public class NMTokenPBImpl extends NMToken{
|
||||
|
||||
NMTokenProto proto = NMTokenProto.getDefaultInstance();
|
||||
NMTokenProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
||||
private Token token = null;
|
||||
private NodeId nodeId = null;
|
||||
|
||||
public NMTokenPBImpl() {
|
||||
builder = NMTokenProto.newBuilder();
|
||||
}
|
||||
|
||||
public NMTokenPBImpl(NMTokenProto proto) {
|
||||
this.proto = proto;
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized NodeId getNodeId() {
|
||||
NMTokenProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.nodeId != null) {
|
||||
return nodeId;
|
||||
}
|
||||
if (!p.hasNodeId()) {
|
||||
return null;
|
||||
}
|
||||
this.nodeId = convertFromProtoFormat(p.getNodeId());
|
||||
return nodeId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setNodeId(NodeId nodeId) {
|
||||
maybeInitBuilder();
|
||||
if (nodeId == null) {
|
||||
builder.clearNodeId();
|
||||
}
|
||||
this.nodeId = nodeId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Token getToken() {
|
||||
NMTokenProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.token != null) {
|
||||
return this.token;
|
||||
}
|
||||
if (!p.hasToken()) {
|
||||
return null;
|
||||
}
|
||||
this.token = convertFromProtoFormat(p.getToken());
|
||||
return token;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setToken(Token token) {
|
||||
maybeInitBuilder();
|
||||
if (token == null) {
|
||||
builder.clearToken();
|
||||
}
|
||||
this.token = token;
|
||||
}
|
||||
|
||||
public synchronized NMTokenProto getProto() {
|
||||
mergeLocalToProto();
|
||||
proto = viaProto ? proto : builder.build();
|
||||
viaProto = true;
|
||||
return proto;
|
||||
}
|
||||
|
||||
private synchronized void mergeLocalToProto() {
|
||||
if (viaProto) {
|
||||
maybeInitBuilder();
|
||||
}
|
||||
mergeLocalToBuilder();
|
||||
proto = builder.build();
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
private synchronized void mergeLocalToBuilder() {
|
||||
if (this.nodeId != null) {
|
||||
builder.setNodeId(convertToProtoFormat(nodeId));
|
||||
}
|
||||
if (this.token != null) {
|
||||
builder.setToken(convertToProtoFormat(token));
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void maybeInitBuilder() {
|
||||
if(viaProto || builder == null) {
|
||||
builder = NMTokenProto.newBuilder(proto);
|
||||
}
|
||||
viaProto = false;
|
||||
}
|
||||
|
||||
private synchronized NodeId convertFromProtoFormat(NodeIdProto p) {
|
||||
return new NodeIdPBImpl(p);
|
||||
}
|
||||
|
||||
private synchronized NodeIdProto convertToProtoFormat(NodeId nodeId) {
|
||||
return ((NodeIdPBImpl)nodeId).getProto();
|
||||
}
|
||||
|
||||
private synchronized TokenProto convertToProtoFormat(Token token) {
|
||||
return ((TokenPBImpl)token).getProto();
|
||||
}
|
||||
|
||||
private synchronized Token convertFromProtoFormat(TokenProto proto) {
|
||||
return new TokenPBImpl(proto);
|
||||
}
|
||||
}
|
|
@ -59,6 +59,11 @@ message AllocateRequestProto {
|
|||
optional float progress = 6;
|
||||
}
|
||||
|
||||
message NMTokenProto {
|
||||
optional NodeIdProto nodeId = 1;
|
||||
optional hadoop.common.TokenProto token = 2;
|
||||
}
|
||||
|
||||
message AllocateResponseProto {
|
||||
optional AMCommandProto a_m_command = 1;
|
||||
optional int32 response_id = 2;
|
||||
|
@ -68,7 +73,7 @@ message AllocateResponseProto {
|
|||
repeated NodeReportProto updated_nodes = 6;
|
||||
optional int32 num_cluster_nodes = 7;
|
||||
optional PreemptionMessageProto preempt = 8;
|
||||
repeated hadoop.common.TokenProto nm_tokens = 9;
|
||||
repeated NMTokenProto nm_tokens = 9;
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////
|
||||
|
|
|
@ -21,15 +21,17 @@ package org.apache.hadoop.yarn.client;
|
|||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.service.Service;
|
||||
|
||||
|
@ -208,4 +210,13 @@ public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Servi
|
|||
String resourceName,
|
||||
Resource capability);
|
||||
|
||||
/**
|
||||
* It returns the NMToken received on allocate call. It will not communicate
|
||||
* with RM to get NMTokens. On allocate call whenever we receive new token
|
||||
* along with container AMRMClient will cache this NMToken per node manager.
|
||||
* This map returned should be shared with any application which is
|
||||
* communicating with NodeManager (ex. NMClient) using NMTokens. If a new
|
||||
* NMToken is received for the same node manager then it will be replaced.
|
||||
*/
|
||||
public ConcurrentMap<String, Token> getNMTokens();
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
|||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
|
@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
|
@ -260,6 +262,19 @@ public class AMRMClientAsync<T extends ContainerRequest> extends AbstractService
|
|||
public int getClusterNodeCount() {
|
||||
return client.getClusterNodeCount();
|
||||
}
|
||||
|
||||
/**
|
||||
* It returns the NMToken received on allocate call. It will not communicate
|
||||
* with RM to get NMTokens. On allocate call whenever we receive new token
|
||||
* along with new container AMRMClientAsync will cache this NMToken per node
|
||||
* manager. This map returned should be shared with any application which is
|
||||
* communicating with NodeManager (ex. NMClient / NMClientAsync) using
|
||||
* NMTokens. If a new NMToken is received for the same node manager
|
||||
* then it will be replaced.
|
||||
*/
|
||||
public ConcurrentMap<String, Token> getNMTokens() {
|
||||
return client.getNMTokens();
|
||||
}
|
||||
|
||||
private class HeartbeatThread extends Thread {
|
||||
public HeartbeatThread() {
|
||||
|
|
|
@ -33,9 +33,11 @@ import java.util.Set;
|
|||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
|
@ -49,9 +51,11 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NMToken;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
@ -61,6 +65,8 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.service.AbstractService;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
// TODO check inputs for null etc. YARN-654
|
||||
|
||||
@Unstable
|
||||
|
@ -73,6 +79,7 @@ public class AMRMClientImpl<T extends ContainerRequest>
|
|||
RecordFactoryProvider.getRecordFactory(null);
|
||||
|
||||
private int lastResponseId = 0;
|
||||
private ConcurrentHashMap<String, Token> nmTokens;
|
||||
|
||||
protected AMRMProtocol rmClient;
|
||||
protected final ApplicationAttemptId appAttemptId;
|
||||
|
@ -148,6 +155,7 @@ public class AMRMClientImpl<T extends ContainerRequest>
|
|||
public AMRMClientImpl(ApplicationAttemptId appAttemptId) {
|
||||
super(AMRMClientImpl.class.getName());
|
||||
this.appAttemptId = appAttemptId;
|
||||
this.nmTokens = new ConcurrentHashMap<String, Token>();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -238,6 +246,9 @@ public class AMRMClientImpl<T extends ContainerRequest>
|
|||
clusterNodeCount = allocateResponse.getNumClusterNodes();
|
||||
lastResponseId = allocateResponse.getResponseId();
|
||||
clusterAvailableResources = allocateResponse.getAvailableResources();
|
||||
if (!allocateResponse.getNMTokens().isEmpty()) {
|
||||
populateNMTokens(allocateResponse);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
// TODO how to differentiate remote yarn exception vs error in rpc
|
||||
|
@ -265,6 +276,20 @@ public class AMRMClientImpl<T extends ContainerRequest>
|
|||
return allocateResponse;
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
protected void populateNMTokens(AllocateResponse allocateResponse) {
|
||||
for (NMToken token : allocateResponse.getNMTokens()) {
|
||||
String nodeId = token.getNodeId().toString();
|
||||
if (nmTokens.containsKey(nodeId)) {
|
||||
LOG.debug("Replacing token for : " + nodeId);
|
||||
} else {
|
||||
LOG.debug("Received new token for : " + nodeId);
|
||||
}
|
||||
nmTokens.put(nodeId, token.getToken());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
|
||||
String appMessage, String appTrackingUrl) throws YarnException,
|
||||
|
@ -512,4 +537,8 @@ public class AMRMClientImpl<T extends ContainerRequest>
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConcurrentHashMap<String, Token> getNMTokens() {
|
||||
return nmTokens;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,9 +25,14 @@ import static org.mockito.Mockito.when;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.AMRMProtocol;
|
||||
|
@ -49,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
|
|||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
|
||||
import org.apache.hadoop.yarn.client.AMRMClient.StoredContainerRequest;
|
||||
|
@ -437,6 +443,11 @@ public class TestAMRMClient {
|
|||
int allocatedContainerCount = 0;
|
||||
int iterationsLeft = 2;
|
||||
Set<ContainerId> releases = new TreeSet<ContainerId>();
|
||||
|
||||
ConcurrentHashMap<String, Token> nmTokens = amClient.getNMTokens();
|
||||
Assert.assertEquals(0, nmTokens.size());
|
||||
HashMap<String, Token> receivedNMTokens = new HashMap<String, Token>();
|
||||
|
||||
while (allocatedContainerCount < containersRequestedAny
|
||||
&& iterationsLeft-- > 0) {
|
||||
AllocateResponse allocResponse = amClient.allocate(0.1f);
|
||||
|
@ -450,12 +461,32 @@ public class TestAMRMClient {
|
|||
releases.add(rejectContainerId);
|
||||
amClient.releaseAssignedContainer(rejectContainerId);
|
||||
}
|
||||
Assert.assertEquals(nmTokens.size(), amClient.getNMTokens().size());
|
||||
Iterator<String> nodeI = nmTokens.keySet().iterator();
|
||||
while (nodeI.hasNext()) {
|
||||
String nodeId = nodeI.next();
|
||||
if (!receivedNMTokens.containsKey(nodeId)) {
|
||||
receivedNMTokens.put(nodeId, nmTokens.get(nodeId));
|
||||
} else {
|
||||
Assert.fail("Received token again for : " + nodeId);
|
||||
}
|
||||
}
|
||||
nodeI = receivedNMTokens.keySet().iterator();
|
||||
while (nodeI.hasNext()) {
|
||||
nmTokens.remove(nodeI.next());
|
||||
}
|
||||
|
||||
if(allocatedContainerCount < containersRequestedAny) {
|
||||
// sleep to let NM's heartbeat to RM and trigger allocations
|
||||
sleep(1000);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Assert.assertEquals(0, amClient.getNMTokens().size());
|
||||
// Should receive at least 1 token
|
||||
Assert.assertTrue(receivedNMTokens.size() > 0
|
||||
&& receivedNMTokens.size() <= nodeCount);
|
||||
|
||||
assertTrue(allocatedContainerCount == containersRequestedAny);
|
||||
assertTrue(amClient.release.size() == 2);
|
||||
assertTrue(amClient.ask.size() == 0);
|
||||
|
@ -523,7 +554,6 @@ public class TestAMRMClient {
|
|||
sleep(1000);
|
||||
}
|
||||
}
|
||||
|
||||
assertTrue(amClient.ask.size() == 0);
|
||||
assertTrue(amClient.release.size() == 0);
|
||||
}
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.Container;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NMToken;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
|
||||
|
@ -66,11 +67,11 @@ public class TestAMRMClientAsync {
|
|||
List<Container> allocated1 = Arrays.asList(
|
||||
Container.newInstance(null, null, null, null, null, null));
|
||||
final AllocateResponse response1 = createAllocateResponse(
|
||||
new ArrayList<ContainerStatus>(), allocated1);
|
||||
new ArrayList<ContainerStatus>(), allocated1, null);
|
||||
final AllocateResponse response2 = createAllocateResponse(completed1,
|
||||
new ArrayList<Container>());
|
||||
new ArrayList<Container>(), null);
|
||||
final AllocateResponse emptyResponse = createAllocateResponse(
|
||||
new ArrayList<ContainerStatus>(), new ArrayList<Container>());
|
||||
new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
|
||||
|
||||
TestCallbackHandler callbackHandler = new TestCallbackHandler();
|
||||
final AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
|
||||
|
@ -146,7 +147,7 @@ public class TestAMRMClientAsync {
|
|||
Assert.assertEquals(null, callbackHandler.takeAllocatedContainers());
|
||||
Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
|
||||
}
|
||||
|
||||
|
||||
@Test(timeout=10000)
|
||||
public void testAMRMClientAsyncException() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
|
@ -189,7 +190,7 @@ public class TestAMRMClientAsync {
|
|||
AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
|
||||
|
||||
final AllocateResponse rebootResponse = createAllocateResponse(
|
||||
new ArrayList<ContainerStatus>(), new ArrayList<Container>());
|
||||
new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
|
||||
rebootResponse.setAMCommand(AMCommand.AM_RESYNC);
|
||||
when(client.allocate(anyFloat())).thenReturn(rebootResponse);
|
||||
|
||||
|
@ -215,9 +216,11 @@ public class TestAMRMClientAsync {
|
|||
}
|
||||
|
||||
private AllocateResponse createAllocateResponse(
|
||||
List<ContainerStatus> completed, List<Container> allocated) {
|
||||
AllocateResponse response = AllocateResponse.newInstance(0, completed, allocated,
|
||||
new ArrayList<NodeReport>(), null, null, 1, null);
|
||||
List<ContainerStatus> completed, List<Container> allocated,
|
||||
List<NMToken> nmTokens) {
|
||||
AllocateResponse response =
|
||||
AllocateResponse.newInstance(0, completed, allocated,
|
||||
new ArrayList<NodeReport>(), null, null, 1, null, nmTokens);
|
||||
return response;
|
||||
}
|
||||
|
||||
|
|
|
@ -21,15 +21,19 @@ package org.apache.hadoop.yarn.security;
|
|||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
|
||||
|
||||
public class NMTokenIdentifier extends TokenIdentifier {
|
||||
|
@ -106,5 +110,4 @@ public class NMTokenIdentifier extends TokenIdentifier {
|
|||
public UserGroupInformation getUser() {
|
||||
return UserGroupInformation.createRemoteUser(appAttemptId.toString());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,14 +18,11 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.security;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import javax.crypto.SecretKey;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
|
@ -34,7 +31,6 @@ import org.apache.hadoop.security.token.SecretManager;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
/**
|
||||
* SecretManager for ContainerTokens. Extended by both RM and NM and hence is
|
||||
|
|
|
@ -114,39 +114,4 @@ public class BaseNMTokenSecretManager extends
|
|||
public NMTokenIdentifier createIdentifier() {
|
||||
return new NMTokenIdentifier();
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper function for creating NMTokens.
|
||||
*/
|
||||
public Token createNMToken(ApplicationAttemptId applicationAttemptId,
|
||||
NodeId nodeId, String applicationSubmitter) {
|
||||
byte[] password;
|
||||
NMTokenIdentifier identifier;
|
||||
|
||||
this.readLock.lock();
|
||||
try {
|
||||
identifier =
|
||||
new NMTokenIdentifier(applicationAttemptId, nodeId,
|
||||
applicationSubmitter, this.currentMasterKey.getMasterKey()
|
||||
.getKeyId());
|
||||
password = this.createPassword(identifier);
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
return newNMToken(password, identifier);
|
||||
}
|
||||
|
||||
public static Token newNMToken(byte[] password,
|
||||
NMTokenIdentifier identifier) {
|
||||
NodeId nodeId = identifier.getNodeId();
|
||||
// RPC layer client expects ip:port as service for tokens
|
||||
InetSocketAddress addr =
|
||||
NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort());
|
||||
Token nmToken =
|
||||
Token.newInstance(identifier.getBytes(),
|
||||
NMTokenIdentifier.KIND.toString(), password, SecurityUtil
|
||||
.buildTokenService(addr).toString());
|
||||
return nmToken;
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -373,6 +373,12 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
// add preemption to the allocateResponse message (if any)
|
||||
allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation));
|
||||
|
||||
// Adding NMTokens for allocated containers.
|
||||
if (!allocation.getContainers().isEmpty()) {
|
||||
allocateResponse.setNMTokens(rmContext.getNMTokenSecretManager()
|
||||
.getNMTokens(app.getUser(), appAttemptId,
|
||||
allocation.getContainers()));
|
||||
}
|
||||
return allocateResponse;
|
||||
}
|
||||
}
|
||||
|
@ -433,12 +439,15 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
AllocateResponse response =
|
||||
recordFactory.newRecordInstance(AllocateResponse.class);
|
||||
response.setResponseId(0);
|
||||
LOG.info("Registering " + attemptId);
|
||||
LOG.info("Registering app attempt : " + attemptId);
|
||||
responseMap.put(attemptId, response);
|
||||
rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId);
|
||||
}
|
||||
|
||||
public void unregisterAttempt(ApplicationAttemptId attemptId) {
|
||||
LOG.info("Unregistering app attempt : " + attemptId);
|
||||
responseMap.remove(attemptId);
|
||||
rmContext.getNMTokenSecretManager().unregisterApplicationAttempt(attemptId);
|
||||
}
|
||||
|
||||
public void refreshServiceAcls(Configuration configuration,
|
||||
|
|
|
@ -218,7 +218,9 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMNodeReconnectEvent(nodeId, rmNode));
|
||||
}
|
||||
|
||||
// On every node manager register we will be clearing NMToken keys if
|
||||
// present for any running application.
|
||||
this.nmTokenSecretManager.removeNodeKey(nodeId);
|
||||
this.nmLivelinessMonitor.register(nodeId);
|
||||
|
||||
String message =
|
||||
|
|
|
@ -18,18 +18,35 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.security;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.NMToken;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.security.MasterKeyData;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
|
||||
public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
|
||||
|
||||
|
@ -42,6 +59,7 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
|
|||
private final Timer timer;
|
||||
private final long rollingInterval;
|
||||
private final long activationDelay;
|
||||
private final ConcurrentHashMap<ApplicationAttemptId, HashSet<NodeId>> appAttemptToNodeKeyMap;
|
||||
|
||||
public NMTokenSecretManagerInRM(Configuration conf) {
|
||||
this.conf = conf;
|
||||
|
@ -70,6 +88,8 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
|
|||
+ " should be more than 2 X "
|
||||
+ YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS);
|
||||
}
|
||||
appAttemptToNodeKeyMap =
|
||||
new ConcurrentHashMap<ApplicationAttemptId, HashSet<NodeId>>();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -119,11 +139,23 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
|
|||
+ this.nextMasterKey.getMasterKey().getKeyId());
|
||||
this.currentMasterKey = this.nextMasterKey;
|
||||
this.nextMasterKey = null;
|
||||
clearApplicationNMTokenKeys();
|
||||
} finally {
|
||||
super.writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private void clearApplicationNMTokenKeys() {
|
||||
// We should clear all node entries from this set.
|
||||
// TODO : Once we have per node master key then it will change to only
|
||||
// remove specific node from it.
|
||||
Iterator<HashSet<NodeId>> nodeSetI =
|
||||
this.appAttemptToNodeKeyMap.values().iterator();
|
||||
while (nodeSetI.hasNext()) {
|
||||
nodeSetI.next().clear();
|
||||
}
|
||||
}
|
||||
|
||||
public void start() {
|
||||
rollMasterKey();
|
||||
this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval,
|
||||
|
@ -150,4 +182,129 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
|
|||
activateNextMasterKey();
|
||||
}
|
||||
}
|
||||
|
||||
public List<NMToken> getNMTokens(String applicationSubmitter,
|
||||
ApplicationAttemptId appAttemptId, List<Container> containers) {
|
||||
try {
|
||||
this.readLock.lock();
|
||||
List<NMToken> nmTokens = new ArrayList<NMToken>();
|
||||
HashSet<NodeId> nodeSet = this.appAttemptToNodeKeyMap.get(appAttemptId);
|
||||
if (nodeSet != null) {
|
||||
for (Container container : containers) {
|
||||
if (!nodeSet.contains(container.getNodeId())) {
|
||||
LOG.debug("Sending NMToken for nodeId : "
|
||||
+ container.getNodeId().toString());
|
||||
Token token = createNMToken(appAttemptId, container.getNodeId(),
|
||||
applicationSubmitter);
|
||||
NMToken nmToken =
|
||||
NMToken.newInstance(container.getNodeId(), token);
|
||||
nmTokens.add(nmToken);
|
||||
nodeSet.add(container.getNodeId());
|
||||
}
|
||||
}
|
||||
}
|
||||
return nmTokens;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void registerApplicationAttempt(ApplicationAttemptId appAttemptId) {
|
||||
try {
|
||||
this.writeLock.lock();
|
||||
this.appAttemptToNodeKeyMap.put(appAttemptId, new HashSet<NodeId>());
|
||||
} finally {
|
||||
this.writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public boolean isApplicationAttemptRegistered(
|
||||
ApplicationAttemptId appAttemptId) {
|
||||
try {
|
||||
this.readLock.lock();
|
||||
return this.appAttemptToNodeKeyMap.containsKey(appAttemptId);
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public boolean isApplicationAttemptNMTokenPresent(
|
||||
ApplicationAttemptId appAttemptId, NodeId nodeId) {
|
||||
try {
|
||||
this.readLock.lock();
|
||||
HashSet<NodeId> nodes = this.appAttemptToNodeKeyMap.get(appAttemptId);
|
||||
if (nodes != null && nodes.contains(nodeId)) {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void unregisterApplicationAttempt(ApplicationAttemptId appAttemptId) {
|
||||
try {
|
||||
this.writeLock.lock();
|
||||
this.appAttemptToNodeKeyMap.remove(appAttemptId);
|
||||
} finally {
|
||||
this.writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This is to be called when NodeManager reconnects or goes down. This will
|
||||
* remove if NMTokens if present for any running application from cache.
|
||||
* @param nodeId
|
||||
*/
|
||||
public void removeNodeKey(NodeId nodeId) {
|
||||
try {
|
||||
this.writeLock.lock();
|
||||
Iterator<HashSet<NodeId>> appNodeKeySetIterator =
|
||||
this.appAttemptToNodeKeyMap.values().iterator();
|
||||
while (appNodeKeySetIterator.hasNext()) {
|
||||
appNodeKeySetIterator.next().remove(nodeId);
|
||||
}
|
||||
} finally {
|
||||
this.writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public static Token newNMToken(byte[] password,
|
||||
NMTokenIdentifier identifier) {
|
||||
NodeId nodeId = identifier.getNodeId();
|
||||
// RPC layer client expects ip:port as service for tokens
|
||||
InetSocketAddress addr =
|
||||
NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort());
|
||||
Token nmToken =
|
||||
Token.newInstance(identifier.getBytes(),
|
||||
NMTokenIdentifier.KIND.toString(), password, SecurityUtil
|
||||
.buildTokenService(addr).toString());
|
||||
return nmToken;
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper function for creating NMTokens.
|
||||
*/
|
||||
public Token createNMToken(ApplicationAttemptId applicationAttemptId,
|
||||
NodeId nodeId, String applicationSubmitter) {
|
||||
byte[] password;
|
||||
NMTokenIdentifier identifier;
|
||||
|
||||
this.readLock.lock();
|
||||
try {
|
||||
identifier =
|
||||
new NMTokenIdentifier(applicationAttemptId, nodeId,
|
||||
applicationSubmitter, this.currentMasterKey.getMasterKey()
|
||||
.getKeyId());
|
||||
password = this.createPassword(identifier);
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
return newNMToken(password, identifier);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -65,12 +65,12 @@ public class MockAM {
|
|||
RMAppAttempt attempt = app.getRMAppAttempt(attemptId);
|
||||
int timeoutSecs = 0;
|
||||
while (!finalState.equals(attempt.getAppAttemptState())
|
||||
&& timeoutSecs++ < 20) {
|
||||
&& timeoutSecs++ < 40) {
|
||||
System.out
|
||||
.println("AppAttempt : " + attemptId + " State is : "
|
||||
+ attempt.getAppAttemptState()
|
||||
+ " Waiting for state : " + finalState);
|
||||
Thread.sleep(500);
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
System.out.println("AppAttempt State is : " + attempt.getAppAttemptState());
|
||||
Assert.assertEquals("AppAttempt state is not correct (timedout)",
|
||||
|
|
|
@ -19,22 +19,27 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.NMToken;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
@ -136,6 +141,184 @@ public class TestRM {
|
|||
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNMToken() throws Exception {
|
||||
MockRM rm = new MockRM();
|
||||
try {
|
||||
rm.start();
|
||||
MockNM nm1 = rm.registerNode("h1:1234", 10000);
|
||||
|
||||
NMTokenSecretManagerInRM nmTokenSecretManager =
|
||||
rm.getRMContext().getNMTokenSecretManager();
|
||||
|
||||
// submitting new application
|
||||
RMApp app = rm.submitApp(1000);
|
||||
|
||||
// start scheduling.
|
||||
nm1.nodeHeartbeat(true);
|
||||
|
||||
// Starting application attempt and launching
|
||||
// It should get registered with NMTokenSecretManager.
|
||||
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
||||
|
||||
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
|
||||
|
||||
Assert.assertTrue(nmTokenSecretManager
|
||||
.isApplicationAttemptRegistered(attempt.getAppAttemptId()));
|
||||
|
||||
// This will register application master.
|
||||
am.registerAppAttempt();
|
||||
|
||||
ArrayList<Container> containersReceivedForNM1 =
|
||||
new ArrayList<Container>();
|
||||
List<ContainerId> releaseContainerList =
|
||||
new ArrayList<ContainerId>();
|
||||
HashMap<String, Token> nmTokens = new HashMap<String, Token>();
|
||||
|
||||
// initially requesting 2 containers.
|
||||
AllocateResponse response =
|
||||
am.allocate("h1", 1000, 2, releaseContainerList);
|
||||
nm1.nodeHeartbeat(true);
|
||||
Assert.assertEquals(0, response.getAllocatedContainers().size());
|
||||
allocateContainersAndValidateNMTokens(am, containersReceivedForNM1, 2,
|
||||
nmTokens);
|
||||
Assert.assertEquals(1, nmTokens.size());
|
||||
|
||||
|
||||
// requesting 2 more containers.
|
||||
response = am.allocate("h1", 1000, 2, releaseContainerList);
|
||||
nm1.nodeHeartbeat(true);
|
||||
Assert.assertEquals(0, response.getAllocatedContainers().size());
|
||||
allocateContainersAndValidateNMTokens(am, containersReceivedForNM1, 4,
|
||||
nmTokens);
|
||||
Assert.assertEquals(1, nmTokens.size());
|
||||
|
||||
|
||||
// We will be simulating NM restart so restarting newly added h2:1234
|
||||
// NM 2 now registers.
|
||||
MockNM nm2 = rm.registerNode("h2:1234", 10000);
|
||||
nm2.nodeHeartbeat(true);
|
||||
ArrayList<Container> containersReceivedForNM2 =
|
||||
new ArrayList<Container>();
|
||||
|
||||
response = am.allocate("h2", 1000, 2, releaseContainerList);
|
||||
nm2.nodeHeartbeat(true);
|
||||
Assert.assertEquals(0, response.getAllocatedContainers().size());
|
||||
allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 2,
|
||||
nmTokens);
|
||||
Assert.assertEquals(2, nmTokens.size());
|
||||
|
||||
// Simulating NM-2 restart.
|
||||
nm2 = rm.registerNode("h2:1234", 10000);
|
||||
nm2.nodeHeartbeat(true);
|
||||
|
||||
int interval = 40;
|
||||
// Wait for nm Token to be cleared.
|
||||
while (nmTokenSecretManager
|
||||
.isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
|
||||
nm2.getNodeId()) && interval-- > 0) {
|
||||
LOG.info("waiting for nmToken to be cleared for : " + nm2.getNodeId());
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
Assert.assertTrue(nmTokenSecretManager
|
||||
.isApplicationAttemptRegistered(attempt.getAppAttemptId()));
|
||||
|
||||
// removing NMToken for h2:1234
|
||||
nmTokens.remove(nm2.getNodeId().toString());
|
||||
Assert.assertEquals(1, nmTokens.size());
|
||||
|
||||
// We should again receive the NMToken.
|
||||
response = am.allocate("h2", 1000, 2, releaseContainerList);
|
||||
nm2.nodeHeartbeat(true);
|
||||
Assert.assertEquals(0, response.getAllocatedContainers().size());
|
||||
allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 4,
|
||||
nmTokens);
|
||||
Assert.assertEquals(2, nmTokens.size());
|
||||
|
||||
// Now rolling over NMToken masterKey. it should resend the NMToken in
|
||||
// next allocate call.
|
||||
Assert.assertTrue(nmTokenSecretManager
|
||||
.isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
|
||||
nm1.getNodeId()));
|
||||
Assert.assertTrue(nmTokenSecretManager
|
||||
.isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
|
||||
nm2.getNodeId()));
|
||||
|
||||
nmTokenSecretManager.rollMasterKey();
|
||||
nmTokenSecretManager.activateNextMasterKey();
|
||||
|
||||
Assert.assertFalse(nmTokenSecretManager
|
||||
.isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
|
||||
nm1.getNodeId()));
|
||||
Assert.assertFalse(nmTokenSecretManager
|
||||
.isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
|
||||
nm2.getNodeId()));
|
||||
// It should not remove application attempt entry.
|
||||
Assert.assertTrue(nmTokenSecretManager
|
||||
.isApplicationAttemptRegistered(attempt.getAppAttemptId()));
|
||||
|
||||
nmTokens.clear();
|
||||
Assert.assertEquals(0, nmTokens.size());
|
||||
// We should again receive the NMToken.
|
||||
response = am.allocate("h2", 1000, 1, releaseContainerList);
|
||||
nm2.nodeHeartbeat(true);
|
||||
Assert.assertEquals(0, response.getAllocatedContainers().size());
|
||||
allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 5,
|
||||
nmTokens);
|
||||
Assert.assertEquals(1, nmTokens.size());
|
||||
Assert.assertTrue(nmTokenSecretManager
|
||||
.isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
|
||||
nm2.getNodeId()));
|
||||
|
||||
|
||||
// After AM is finished making sure that nmtoken entry for app
|
||||
Assert.assertTrue(nmTokenSecretManager
|
||||
.isApplicationAttemptRegistered(attempt.getAppAttemptId()));
|
||||
am.unregisterAppAttempt();
|
||||
// marking all the containers as finished.
|
||||
for (Container container : containersReceivedForNM1) {
|
||||
nm1.nodeHeartbeat(attempt.getAppAttemptId(), container.getId().getId(),
|
||||
ContainerState.COMPLETE);
|
||||
}
|
||||
for (Container container : containersReceivedForNM2) {
|
||||
nm2.nodeHeartbeat(attempt.getAppAttemptId(), container.getId().getId(),
|
||||
ContainerState.COMPLETE);
|
||||
}
|
||||
am.waitForState(RMAppAttemptState.FINISHED);
|
||||
Assert.assertFalse(nmTokenSecretManager
|
||||
.isApplicationAttemptRegistered(attempt.getAppAttemptId()));
|
||||
} finally {
|
||||
rm.stop();
|
||||
}
|
||||
}
|
||||
|
||||
protected void allocateContainersAndValidateNMTokens(MockAM am,
|
||||
ArrayList<Container> containersReceived, int totalContainerRequested,
|
||||
HashMap<String, Token> nmTokens) throws Exception, InterruptedException {
|
||||
ArrayList<ContainerId> releaseContainerList = new ArrayList<ContainerId>();
|
||||
AllocateResponse response;
|
||||
ArrayList<ResourceRequest> resourceRequest =
|
||||
new ArrayList<ResourceRequest>();
|
||||
while (containersReceived.size() < totalContainerRequested) {
|
||||
LOG.info("requesting containers..");
|
||||
response =
|
||||
am.allocate(resourceRequest, releaseContainerList);
|
||||
containersReceived.addAll(response.getAllocatedContainers());
|
||||
if (!response.getNMTokens().isEmpty()) {
|
||||
for (NMToken nmToken : response.getNMTokens()) {
|
||||
String nodeId = nmToken.getNodeId().toString();
|
||||
if (nmTokens.containsKey(nodeId)) {
|
||||
Assert.fail("Duplicate NMToken received for : " + nodeId);
|
||||
}
|
||||
nmTokens.put(nodeId, nmToken.getToken());
|
||||
}
|
||||
}
|
||||
LOG.info("Got " + containersReceived.size()
|
||||
+ " containers. Waiting to get " + totalContainerRequested);
|
||||
Thread.sleep(500);
|
||||
}
|
||||
}
|
||||
|
||||
@Test (timeout = 300000)
|
||||
public void testActivatingApplicationAfterAddingNM() throws Exception {
|
||||
|
|
Loading…
Reference in New Issue