YARN-693. Modified RM to send NMTokens on allocate call so that AMs can then use them for authentication with NMs. Contributed by Omkar Vinit Joshi.

svn merge --ignore-ancestry -c 1493448 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1493449 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-06-16 03:12:04 +00:00
parent a00e4a926d
commit cf179b9b0a
19 changed files with 689 additions and 82 deletions

View File

@ -338,6 +338,9 @@ Release 2.1.0-beta - UNRELEASED
YARN-639. Modified Distributed Shell application to start using the new
NMClient library. (Zhijie Shen via vinodkv)
YARN-693. Modified RM to send NMTokens on allocate call so that AMs can then
use them for authentication with NMs. (Omkar Vinit Joshi via vinodkv)
OPTIMIZATIONS
YARN-512. Log aggregation root directory check is more expensive than it

View File

@ -29,10 +29,10 @@ import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.util.Records;
/**
@ -67,7 +67,7 @@ public abstract class AllocateResponse {
List<ContainerStatus> completedContainers,
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
Resource availResources, AMCommand command, int numClusterNodes,
PreemptionMessage preempt) {
PreemptionMessage preempt, List<NMToken> nmTokens) {
AllocateResponse response = Records.newRecord(AllocateResponse.class);
response.setNumClusterNodes(numClusterNodes);
response.setResponseId(responseId);
@ -77,6 +77,7 @@ public abstract class AllocateResponse {
response.setAvailableResources(availResources);
response.setAMCommand(command);
response.setPreemptionMessage(preempt);
response.setNMTokens(nmTokens);
return response;
}
@ -202,7 +203,7 @@ public abstract class AllocateResponse {
@Public
@Stable
public abstract void setNMTokens(List<Token> nmTokens);
public abstract void setNMTokens(List<NMToken> nmTokens);
/**
* Get the list of NMTokens required for communicating with NM. New NMTokens
@ -217,6 +218,6 @@ public abstract class AllocateResponse {
*/
@Public
@Stable
public abstract List<Token> getNMTokens();
public abstract List<NMToken> getNMTokens();
}

View File

@ -23,21 +23,20 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionMessagePBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
@ -46,6 +45,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder;
import org.apache.hadoop.yarn.util.ProtoUtils;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto;
public class AllocateResponsePBImpl extends AllocateResponse {
@ -56,7 +56,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
Resource limit;
private List<Container> allocatedContainers = null;
private List<Token> nmTokens = null;
private List<NMToken> nmTokens = null;
private List<ContainerStatus> completedContainersStatuses = null;
private List<NodeReport> updatedNodes = null;
@ -108,7 +108,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
}
if (nmTokens != null) {
builder.clearNmTokens();
Iterable<TokenProto> iterable = getTokenProtoIterable(nmTokens);
Iterable<NMTokenProto> iterable = getTokenProtoIterable(nmTokens);
builder.addAllNmTokens(iterable);
}
if (this.completedContainersStatuses != null) {
@ -248,9 +248,11 @@ public class AllocateResponsePBImpl extends AllocateResponse {
}
@Override
public synchronized void setNMTokens(List<Token> nmTokens) {
public synchronized void setNMTokens(List<NMToken> nmTokens) {
if (nmTokens == null || nmTokens.isEmpty()) {
if (this.nmTokens != null) {
this.nmTokens.clear();
}
builder.clearNmTokens();
return;
}
@ -260,7 +262,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
}
@Override
public synchronized List<Token> getNMTokens() {
public synchronized List<NMToken> getNMTokens() {
initLocalNewNMTokenList();
return nmTokens;
}
@ -334,9 +336,9 @@ public class AllocateResponsePBImpl extends AllocateResponse {
return;
}
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
List<TokenProto> list = p.getNmTokensList();
nmTokens = new ArrayList<Token>();
for (TokenProto t : list) {
List<NMTokenProto> list = p.getNmTokensList();
nmTokens = new ArrayList<NMToken>();
for (NMTokenProto t : list) {
nmTokens.add(convertFromProtoFormat(t));
}
}
@ -372,15 +374,15 @@ public class AllocateResponsePBImpl extends AllocateResponse {
};
}
private synchronized Iterable<TokenProto> getTokenProtoIterable(
final List<Token> nmTokenList) {
private synchronized Iterable<NMTokenProto> getTokenProtoIterable(
final List<NMToken> nmTokenList) {
maybeInitBuilder();
return new Iterable<TokenProto>() {
return new Iterable<NMTokenProto>() {
@Override
public synchronized Iterator<TokenProto> iterator() {
return new Iterator<TokenProto>() {
public synchronized Iterator<NMTokenProto> iterator() {
return new Iterator<NMTokenProto>() {
Iterator<Token> iter = nmTokenList.iterator();
Iterator<NMToken> iter = nmTokenList.iterator();
@Override
public boolean hasNext() {
@ -388,7 +390,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
}
@Override
public TokenProto next() {
public NMTokenProto next() {
return convertToProtoFormat(iter.next());
}
@ -524,11 +526,11 @@ public class AllocateResponsePBImpl extends AllocateResponse {
return ((PreemptionMessagePBImpl)r).getProto();
}
private synchronized TokenProto convertToProtoFormat(Token token) {
return ((TokenPBImpl)token).getProto();
private synchronized NMTokenProto convertToProtoFormat(NMToken token) {
return ((NMTokenPBImpl)token).getProto();
}
private synchronized Token convertFromProtoFormat(TokenProto proto) {
return new TokenPBImpl(proto);
private synchronized NMToken convertFromProtoFormat(NMTokenProto proto) {
return new NMTokenPBImpl(proto);
}
}

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.util.Records;
/**
 * Record pairing a {@link NodeId} with a {@link Token}.
 * NMToken is returned by RM on AllocateResponse.
 */
public abstract class NMToken {

  /**
   * Builds a new record through {@link Records#newRecord(Class)} and fills in
   * both fields.
   *
   * @param nodeId value stored via {@link #setNodeId(NodeId)}
   * @param token value stored via {@link #setToken(Token)}
   * @return the populated record
   */
  @Private
  public static NMToken newInstance(NodeId nodeId, Token token) {
    NMToken record = Records.newRecord(NMToken.class);
    record.setNodeId(nodeId);
    record.setToken(token);
    return record;
  }

  /**
   * @return the node id carried by this record
   */
  @Public
  @Stable
  public abstract NodeId getNodeId();

  /**
   * @param nodeId the node id to carry in this record
   */
  @Public
  @Stable
  public abstract void setNodeId(NodeId nodeId);

  /**
   * @return the token carried by this record
   */
  @Public
  @Stable
  public abstract Token getToken();

  /**
   * @param token the token to carry in this record
   */
  @Public
  @Stable
  public abstract void setToken(Token token);
}

View File

@ -0,0 +1,139 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.impl.pb;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProtoOrBuilder;
/**
 * Protocol-buffer backed implementation of {@link NMToken}.
 *
 * The state is held either in the built {@code proto} (when {@code viaProto}
 * is true) or in {@code builder}. {@code nodeId} and {@code token} are local
 * copies that are written into the builder when {@link #getProto()} is called.
 */
public class NMTokenPBImpl extends NMToken {

  NMTokenProto proto = NMTokenProto.getDefaultInstance();
  NMTokenProto.Builder builder = null;
  boolean viaProto = false;

  private Token token = null;
  private NodeId nodeId = null;

  /** Creates an empty record backed by a fresh builder. */
  public NMTokenPBImpl() {
    builder = NMTokenProto.newBuilder();
  }

  /**
   * Creates a record that reads from an already built message.
   *
   * @param proto the message to wrap
   */
  public NMTokenPBImpl(NMTokenProto proto) {
    this.proto = proto;
    viaProto = true;
  }

  /**
   * @return the locally cached node id, or the one converted from the
   *         underlying message; {@code null} when neither is present
   */
  @Override
  public synchronized NodeId getNodeId() {
    if (this.nodeId == null) {
      NMTokenProtoOrBuilder source = viaProto ? proto : builder;
      if (source.hasNodeId()) {
        // Cache the converted value so later calls return the same object.
        this.nodeId = convertFromProtoFormat(source.getNodeId());
      }
    }
    return this.nodeId;
  }

  /**
   * Stores the node id locally; a {@code null} value also clears the field
   * in the builder.
   */
  @Override
  public synchronized void setNodeId(NodeId nodeId) {
    maybeInitBuilder();
    if (nodeId == null) {
      builder.clearNodeId();
    }
    this.nodeId = nodeId;
  }

  /**
   * @return the locally cached token, or the one converted from the
   *         underlying message; {@code null} when neither is present
   */
  @Override
  public synchronized Token getToken() {
    if (this.token == null) {
      NMTokenProtoOrBuilder source = viaProto ? proto : builder;
      if (source.hasToken()) {
        // Cache the converted value so later calls return the same object.
        this.token = convertFromProtoFormat(source.getToken());
      }
    }
    return this.token;
  }

  /**
   * Stores the token locally; a {@code null} value also clears the field in
   * the builder.
   */
  @Override
  public synchronized void setToken(Token token) {
    maybeInitBuilder();
    if (token == null) {
      builder.clearToken();
    }
    this.token = token;
  }

  /**
   * Flushes the local fields into the message and returns it.
   *
   * @return the built message reflecting the current node id and token
   */
  public synchronized NMTokenProto getProto() {
    mergeLocalToProto();
    if (!viaProto) {
      proto = builder.build();
    }
    viaProto = true;
    return proto;
  }

  /** Writes the local fields into the builder and rebuilds {@code proto}. */
  private synchronized void mergeLocalToProto() {
    if (viaProto) {
      // Reopen the built message so the local fields can be merged into it.
      maybeInitBuilder();
    }
    mergeLocalToBuilder();
    proto = builder.build();
    viaProto = true;
  }

  /** Copies every non-null local field into the builder. */
  private synchronized void mergeLocalToBuilder() {
    if (this.nodeId != null) {
      builder.setNodeId(convertToProtoFormat(this.nodeId));
    }
    if (this.token != null) {
      builder.setToken(convertToProtoFormat(this.token));
    }
  }

  /**
   * Ensures a builder exists, seeding it from {@code proto} when needed, and
   * marks the builder as the current source of truth.
   */
  private synchronized void maybeInitBuilder() {
    boolean needsBuilder = viaProto || builder == null;
    if (needsBuilder) {
      builder = NMTokenProto.newBuilder(proto);
    }
    viaProto = false;
  }

  private synchronized NodeId convertFromProtoFormat(NodeIdProto p) {
    return new NodeIdPBImpl(p);
  }

  private synchronized NodeIdProto convertToProtoFormat(NodeId nodeId) {
    NodeIdPBImpl impl = (NodeIdPBImpl) nodeId;
    return impl.getProto();
  }

  private synchronized TokenProto convertToProtoFormat(Token token) {
    TokenPBImpl impl = (TokenPBImpl) token;
    return impl.getProto();
  }

  private synchronized Token convertFromProtoFormat(TokenProto proto) {
    return new TokenPBImpl(proto);
  }
}

View File

@ -59,6 +59,11 @@ message AllocateRequestProto {
optional float progress = 6;
}
// Pairs a node id with a token.
message NMTokenProto {
// Node the token belongs to.
optional NodeIdProto nodeId = 1;
// The token itself.
optional hadoop.common.TokenProto token = 2;
}
message AllocateResponseProto {
optional AMCommandProto a_m_command = 1;
optional int32 response_id = 2;
@ -68,7 +73,7 @@ message AllocateResponseProto {
repeated NodeReportProto updated_nodes = 6;
optional int32 num_cluster_nodes = 7;
optional PreemptionMessageProto preempt = 8;
repeated hadoop.common.TokenProto nm_tokens = 9;
repeated NMTokenProto nm_tokens = 9;
}
//////////////////////////////////////////////////////

View File

@ -21,15 +21,17 @@ package org.apache.hadoop.yarn.client;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.Service;
@ -208,4 +210,13 @@ public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Servi
String resourceName,
Resource capability);
/**
 * Returns the NMTokens received on allocate calls. This method does not
 * communicate with the RM to get NMTokens. Whenever an allocate call
 * delivers a new token along with a container, AMRMClient caches that
 * NMToken per node manager; if a new NMToken is received for the same node
 * manager, the cached one is replaced.
 * The returned map should be shared with any component that communicates
 * with a NodeManager using NMTokens (e.g. NMClient).
 *
 * @return the map of cached NMTokens, one entry per node manager
 */
public ConcurrentMap<String, Token> getNMTokens();
}

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -261,6 +263,19 @@ public class AMRMClientAsync<T extends ContainerRequest> extends AbstractService
return client.getClusterNodeCount();
}
/**
 * Returns the NMTokens received on allocate calls, as held by the wrapped
 * client. This method does not communicate with the RM to get NMTokens.
 * Whenever an allocate call delivers a new token along with a new container,
 * the NMToken is cached per node manager; if a new NMToken is received for
 * the same node manager, the cached one is replaced.
 * The returned map should be shared with any component that communicates
 * with a NodeManager using NMTokens (e.g. NMClient / NMClientAsync).
 *
 * @return the map of cached NMTokens, one entry per node manager
 */
public ConcurrentMap<String, Token> getNMTokens() {
  ConcurrentMap<String, Token> cachedTokens = client.getNMTokens();
  return cachedTokens;
}
private class HeartbeatThread extends Thread {
public HeartbeatThread() {
super("AMRM Heartbeater thread");

View File

@ -33,9 +33,11 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
@ -49,9 +51,11 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -61,6 +65,8 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.service.AbstractService;
import com.google.common.annotations.VisibleForTesting;
// TODO check inputs for null etc. YARN-654
@Unstable
@ -73,6 +79,7 @@ public class AMRMClientImpl<T extends ContainerRequest>
RecordFactoryProvider.getRecordFactory(null);
private int lastResponseId = 0;
private ConcurrentHashMap<String, Token> nmTokens;
protected AMRMProtocol rmClient;
protected final ApplicationAttemptId appAttemptId;
@ -148,6 +155,7 @@ public class AMRMClientImpl<T extends ContainerRequest>
public AMRMClientImpl(ApplicationAttemptId appAttemptId) {
// The service is named after this class.
super(AMRMClientImpl.class.getName());
this.appAttemptId = appAttemptId;
// Start with an empty NMToken cache; populateNMTokens fills it from
// allocate responses.
this.nmTokens = new ConcurrentHashMap<String, Token>();
}
@Override
@ -238,6 +246,9 @@ public class AMRMClientImpl<T extends ContainerRequest>
clusterNodeCount = allocateResponse.getNumClusterNodes();
lastResponseId = allocateResponse.getResponseId();
clusterAvailableResources = allocateResponse.getAvailableResources();
if (!allocateResponse.getNMTokens().isEmpty()) {
populateNMTokens(allocateResponse);
}
}
} finally {
// TODO how to differentiate remote yarn exception vs error in rpc
@ -265,6 +276,20 @@ public class AMRMClientImpl<T extends ContainerRequest>
return allocateResponse;
}
/**
 * Caches every NMToken carried by the given allocate response, keyed by the
 * string form of its node id. A token already cached for the same node is
 * replaced.
 *
 * @param allocateResponse response whose NMTokens are copied into the cache
 */
@Private
@VisibleForTesting
protected void populateNMTokens(AllocateResponse allocateResponse) {
  for (NMToken token : allocateResponse.getNMTokens()) {
    String nodeId = token.getNodeId().toString();
    // put() reports the previous mapping atomically, so the log line cannot
    // disagree with what actually happened (unlike containsKey-then-put).
    Token previous = nmTokens.put(nodeId, token.getToken());
    if (LOG.isDebugEnabled()) {
      if (previous != null) {
        LOG.debug("Replacing token for : " + nodeId);
      } else {
        LOG.debug("Received new token for : " + nodeId);
      }
    }
  }
}
@Override
public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
String appMessage, String appTrackingUrl) throws YarnException,
@ -512,4 +537,8 @@ public class AMRMClientImpl<T extends ContainerRequest>
}
}
/**
 * Returns the live NMToken cache of this client (not a copy); it is filled
 * by {@code populateNMTokens}.
 */
@Override
public ConcurrentHashMap<String, Token> getNMTokens() {
  return this.nmTokens;
}
}

View File

@ -25,9 +25,14 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.AMRMProtocol;
@ -49,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.AMRMClient.StoredContainerRequest;
@ -437,6 +443,11 @@ public class TestAMRMClient {
int allocatedContainerCount = 0;
int iterationsLeft = 2;
Set<ContainerId> releases = new TreeSet<ContainerId>();
ConcurrentHashMap<String, Token> nmTokens = amClient.getNMTokens();
Assert.assertEquals(0, nmTokens.size());
HashMap<String, Token> receivedNMTokens = new HashMap<String, Token>();
while (allocatedContainerCount < containersRequestedAny
&& iterationsLeft-- > 0) {
AllocateResponse allocResponse = amClient.allocate(0.1f);
@ -450,12 +461,32 @@ public class TestAMRMClient {
releases.add(rejectContainerId);
amClient.releaseAssignedContainer(rejectContainerId);
}
Assert.assertEquals(nmTokens.size(), amClient.getNMTokens().size());
Iterator<String> nodeI = nmTokens.keySet().iterator();
while (nodeI.hasNext()) {
String nodeId = nodeI.next();
if (!receivedNMTokens.containsKey(nodeId)) {
receivedNMTokens.put(nodeId, nmTokens.get(nodeId));
} else {
Assert.fail("Received token again for : " + nodeId);
}
}
nodeI = receivedNMTokens.keySet().iterator();
while (nodeI.hasNext()) {
nmTokens.remove(nodeI.next());
}
if(allocatedContainerCount < containersRequestedAny) {
// sleep to let NM's heartbeat to RM and trigger allocations
sleep(1000);
}
}
Assert.assertEquals(0, amClient.getNMTokens().size());
// Should receive at least 1 token
Assert.assertTrue(receivedNMTokens.size() > 0
&& receivedNMTokens.size() <= nodeCount);
assertTrue(allocatedContainerCount == containersRequestedAny);
assertTrue(amClient.release.size() == 2);
assertTrue(amClient.ask.size() == 0);
@ -523,7 +554,6 @@ public class TestAMRMClient {
sleep(1000);
}
}
assertTrue(amClient.ask.size() == 0);
assertTrue(amClient.release.size() == 0);
}

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
@ -66,11 +67,11 @@ public class TestAMRMClientAsync {
List<Container> allocated1 = Arrays.asList(
Container.newInstance(null, null, null, null, null, null));
final AllocateResponse response1 = createAllocateResponse(
new ArrayList<ContainerStatus>(), allocated1);
new ArrayList<ContainerStatus>(), allocated1, null);
final AllocateResponse response2 = createAllocateResponse(completed1,
new ArrayList<Container>());
new ArrayList<Container>(), null);
final AllocateResponse emptyResponse = createAllocateResponse(
new ArrayList<ContainerStatus>(), new ArrayList<Container>());
new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
TestCallbackHandler callbackHandler = new TestCallbackHandler();
final AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
@ -189,7 +190,7 @@ public class TestAMRMClientAsync {
AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
final AllocateResponse rebootResponse = createAllocateResponse(
new ArrayList<ContainerStatus>(), new ArrayList<Container>());
new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
rebootResponse.setAMCommand(AMCommand.AM_RESYNC);
when(client.allocate(anyFloat())).thenReturn(rebootResponse);
@ -215,9 +216,11 @@ public class TestAMRMClientAsync {
}
private AllocateResponse createAllocateResponse(
List<ContainerStatus> completed, List<Container> allocated) {
AllocateResponse response = AllocateResponse.newInstance(0, completed, allocated,
new ArrayList<NodeReport>(), null, null, 1, null);
List<ContainerStatus> completed, List<Container> allocated,
List<NMToken> nmTokens) {
AllocateResponse response =
AllocateResponse.newInstance(0, completed, allocated,
new ArrayList<NodeReport>(), null, null, 1, null, nmTokens);
return response;
}

View File

@ -21,15 +21,19 @@ package org.apache.hadoop.yarn.security;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
public class NMTokenIdentifier extends TokenIdentifier {
@ -106,5 +110,4 @@ public class NMTokenIdentifier extends TokenIdentifier {
public UserGroupInformation getUser() {
  // The remote user is named after the application attempt id.
  String attemptName = appAttemptId.toString();
  return UserGroupInformation.createRemoteUser(attemptName);
}
}

View File

@ -18,14 +18,11 @@
package org.apache.hadoop.yarn.server.security;
import java.nio.ByteBuffer;
import java.security.SecureRandom;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.crypto.SecretKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -34,7 +31,6 @@ import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.util.Records;
/**
* SecretManager for ContainerTokens. Extended by both RM and NM and hence is

View File

@ -114,39 +114,4 @@ public class BaseNMTokenSecretManager extends
public NMTokenIdentifier createIdentifier() {
// Returns a fresh identifier built with the no-argument constructor.
return new NMTokenIdentifier();
}
/**
 * Helper function for creating NMTokens.
 *
 * The identifier and its password are produced while holding the read lock;
 * the token itself is assembled after the lock is released.
 *
 * @param applicationAttemptId attempt the token is issued for
 * @param nodeId node the token is issued for
 * @param applicationSubmitter user recorded in the identifier
 * @return the token built by {@link #newNMToken(byte[], NMTokenIdentifier)}
 */
public Token createNMToken(ApplicationAttemptId applicationAttemptId,
    NodeId nodeId, String applicationSubmitter) {
  NMTokenIdentifier tokenIdentifier;
  byte[] secret;
  readLock.lock();
  try {
    tokenIdentifier =
        new NMTokenIdentifier(applicationAttemptId, nodeId,
            applicationSubmitter,
            currentMasterKey.getMasterKey().getKeyId());
    secret = createPassword(tokenIdentifier);
  } finally {
    readLock.unlock();
  }
  return newNMToken(secret, tokenIdentifier);
}
/**
 * Assembles a token from an identifier and its password. The token's service
 * is derived from the host and port of the identifier's node id.
 *
 * @param password password stored in the token
 * @param identifier identifier whose bytes and node id are used
 * @return the newly created token
 */
public static Token newNMToken(byte[] password,
    NMTokenIdentifier identifier) {
  NodeId targetNode = identifier.getNodeId();
  // RPC layer client expects ip:port as service for tokens
  InetSocketAddress nodeAddress =
      NetUtils.createSocketAddrForHost(targetNode.getHost(),
          targetNode.getPort());
  return Token.newInstance(identifier.getBytes(),
      NMTokenIdentifier.KIND.toString(), password,
      SecurityUtil.buildTokenService(nodeAddress).toString());
}
}

View File

@ -373,6 +373,12 @@ public class ApplicationMasterService extends AbstractService implements
// add preemption to the allocateResponse message (if any)
allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation));
// Adding NMTokens for allocated containers.
if (!allocation.getContainers().isEmpty()) {
allocateResponse.setNMTokens(rmContext.getNMTokenSecretManager()
.getNMTokens(app.getUser(), appAttemptId,
allocation.getContainers()));
}
return allocateResponse;
}
}
@ -433,12 +439,15 @@ public class ApplicationMasterService extends AbstractService implements
AllocateResponse response =
recordFactory.newRecordInstance(AllocateResponse.class);
response.setResponseId(0);
LOG.info("Registering " + attemptId);
LOG.info("Registering app attempt : " + attemptId);
responseMap.put(attemptId, response);
rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId);
}
/**
 * Drops the stored response for the given attempt and unregisters the
 * attempt from the NMToken secret manager.
 *
 * @param attemptId the application attempt being unregistered
 */
public void unregisterAttempt(ApplicationAttemptId attemptId) {
  LOG.info("Unregistering app attempt : " + attemptId);
  responseMap.remove(attemptId);
  rmContext.getNMTokenSecretManager()
      .unregisterApplicationAttempt(attemptId);
}
public void refreshServiceAcls(Configuration configuration,

View File

@ -218,7 +218,9 @@ public class ResourceTrackerService extends AbstractService implements
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeReconnectEvent(nodeId, rmNode));
}
// On every node manager register we will be clearing NMToken keys if
// present for any running application.
this.nmTokenSecretManager.removeNodeKey(nodeId);
this.nmLivelinessMonitor.register(nodeId);
String message =

View File

@ -18,18 +18,35 @@
package org.apache.hadoop.yarn.server.resourcemanager.security;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import com.google.common.annotations.VisibleForTesting;
public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
@ -42,6 +59,7 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
private final Timer timer;
private final long rollingInterval;
private final long activationDelay;
private final ConcurrentHashMap<ApplicationAttemptId, HashSet<NodeId>> appAttemptToNodeKeyMap;
public NMTokenSecretManagerInRM(Configuration conf) {
this.conf = conf;
@ -70,6 +88,8 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
+ " should be more than 2 X "
+ YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS);
}
appAttemptToNodeKeyMap =
new ConcurrentHashMap<ApplicationAttemptId, HashSet<NodeId>>();
}
/**
@ -119,11 +139,23 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
+ this.nextMasterKey.getMasterKey().getKeyId());
this.currentMasterKey = this.nextMasterKey;
this.nextMasterKey = null;
clearApplicationNMTokenKeys();
} finally {
super.writeLock.unlock();
}
}
/**
 * Empties the node set of every registered application attempt. The attempts
 * themselves stay registered.
 */
private void clearApplicationNMTokenKeys() {
  // We should clear all node entries from this set.
  // TODO : Once we have per node master key then it will change to only
  // remove specific node from it.
  for (HashSet<NodeId> nodesOfAttempt : this.appAttemptToNodeKeyMap.values()) {
    nodesOfAttempt.clear();
  }
}
public void start() {
rollMasterKey();
this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval,
@ -150,4 +182,129 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
activateNextMasterKey();
}
}
/**
 * Creates NMTokens for the nodes hosting the given containers, skipping any
 * node that is already recorded for the application attempt.
 *
 * Each node for which a token is created is added to the attempt's node set,
 * so later calls do not create another token for it until the entry is
 * removed again. If the attempt is not registered, no tokens are created.
 *
 * @param applicationSubmitter user passed through to the token identifier
 * @param appAttemptId application attempt the tokens are issued for
 * @param containers containers whose node ids are inspected
 * @return the newly created NMTokens; empty (never null) when there is
 *         nothing new to send
 */
public List<NMToken> getNMTokens(String applicationSubmitter,
    ApplicationAttemptId appAttemptId, List<Container> containers) {
  // The per-attempt node set is a plain HashSet and is modified below, so the
  // exclusive lock is needed: the shared read lock would allow two callers to
  // mutate the same set concurrently.
  // NOTE(review): createNMToken() takes the read lock while this thread holds
  // the write lock; that is permitted by ReentrantReadWriteLock - confirm
  // that is the lock type behind readLock/writeLock.
  this.writeLock.lock();
  try {
    List<NMToken> nmTokens = new ArrayList<NMToken>();
    HashSet<NodeId> nodeSet = this.appAttemptToNodeKeyMap.get(appAttemptId);
    if (nodeSet != null) {
      for (Container container : containers) {
        NodeId nodeId = container.getNodeId();
        if (!nodeSet.contains(nodeId)) {
          // Avoid building the message when debug logging is off.
          if (LOG.isDebugEnabled()) {
            LOG.debug("Sending NMToken for nodeId : " + nodeId.toString());
          }
          Token token =
              createNMToken(appAttemptId, nodeId, applicationSubmitter);
          nmTokens.add(NMToken.newInstance(nodeId, token));
          // Record the node only after the token was created successfully.
          nodeSet.add(nodeId);
        }
      }
    }
    return nmTokens;
  } finally {
    this.writeLock.unlock();
  }
}
/**
 * Registers an application attempt with an empty node set. Any set already
 * stored for the same attempt is replaced.
 *
 * @param appAttemptId application attempt to start tracking
 */
public void registerApplicationAttempt(ApplicationAttemptId appAttemptId) {
  // Acquire before entering try so that finally never unlocks a lock that
  // was not obtained.
  this.writeLock.lock();
  try {
    this.appAttemptToNodeKeyMap.put(appAttemptId, new HashSet<NodeId>());
  } finally {
    this.writeLock.unlock();
  }
}
/**
 * Tells whether the given application attempt currently has an entry in the
 * attempt-to-node map.
 *
 * @param appAttemptId application attempt to look up
 * @return true if the attempt is present in the map
 */
@Private
@VisibleForTesting
public boolean isApplicationAttemptRegistered(
    ApplicationAttemptId appAttemptId) {
  // Acquire before entering try so that finally never unlocks a lock that
  // was not obtained.
  this.readLock.lock();
  try {
    return this.appAttemptToNodeKeyMap.containsKey(appAttemptId);
  } finally {
    this.readLock.unlock();
  }
}
/**
 * Tells whether the given node is recorded in the node set of the given
 * application attempt.
 *
 * @param appAttemptId application attempt to look up
 * @param nodeId node to look for in the attempt's node set
 * @return true if the attempt is registered and its set contains the node;
 *         false otherwise
 */
@Private
@VisibleForTesting
public boolean isApplicationAttemptNMTokenPresent(
    ApplicationAttemptId appAttemptId, NodeId nodeId) {
  // Acquire before entering try so that finally never unlocks a lock that
  // was not obtained.
  this.readLock.lock();
  try {
    HashSet<NodeId> nodes = this.appAttemptToNodeKeyMap.get(appAttemptId);
    return nodes != null && nodes.contains(nodeId);
  } finally {
    this.readLock.unlock();
  }
}
/**
 * Removes the given application attempt, together with its node set, from
 * the attempt-to-node map. Does nothing if the attempt is not present.
 *
 * @param appAttemptId application attempt to stop tracking
 */
public void unregisterApplicationAttempt(ApplicationAttemptId appAttemptId) {
  // Acquire before entering try so that finally never unlocks a lock that
  // was not obtained.
  this.writeLock.lock();
  try {
    this.appAttemptToNodeKeyMap.remove(appAttemptId);
  } finally {
    this.writeLock.unlock();
  }
}
/**
 * This is to be called when a NodeManager reconnects or goes down. It removes
 * the node from the node set of every registered application attempt, if
 * present.
 *
 * @param nodeId node to remove from all application attempts' node sets
 */
public void removeNodeKey(NodeId nodeId) {
  // Acquire before entering try so that finally never unlocks a lock that
  // was not obtained.
  this.writeLock.lock();
  try {
    for (HashSet<NodeId> nodesOfAttempt
        : this.appAttemptToNodeKeyMap.values()) {
      nodesOfAttempt.remove(nodeId);
    }
  } finally {
    this.writeLock.unlock();
  }
}
/**
 * Packages an identifier and its password into a {@link Token} whose service
 * field is derived from the host and port of the identifier's node.
 *
 * @param password password to store in the token
 * @param identifier identifier supplying the token bytes and the node id
 * @return the assembled token
 */
public static Token newNMToken(byte[] password,
    NMTokenIdentifier identifier) {
  NodeId targetNode = identifier.getNodeId();
  // RPC layer client expects ip:port as service for tokens
  InetSocketAddress nodeAddress = NetUtils.createSocketAddrForHost(
      targetNode.getHost(), targetNode.getPort());
  return Token.newInstance(
      identifier.getBytes(),
      NMTokenIdentifier.KIND.toString(),
      password,
      SecurityUtil.buildTokenService(nodeAddress).toString());
}
/**
 * Helper function for creating NMTokens. The identifier is built with the id
 * of the current master key and its password is computed while the read lock
 * is held; the token itself is assembled after the lock is released.
 *
 * @param applicationAttemptId application attempt the token is issued for
 * @param nodeId node the token is issued for
 * @param applicationSubmitter user passed through to the identifier
 * @return the token produced by {@code newNMToken}
 */
public Token createNMToken(ApplicationAttemptId applicationAttemptId,
    NodeId nodeId, String applicationSubmitter) {
  final NMTokenIdentifier tokenId;
  final byte[] secret;
  this.readLock.lock();
  try {
    tokenId = new NMTokenIdentifier(
        applicationAttemptId,
        nodeId,
        applicationSubmitter,
        this.currentMasterKey.getMasterKey().getKeyId());
    secret = this.createPassword(tokenId);
  } finally {
    this.readLock.unlock();
  }
  return newNMToken(secret, tokenId);
}
}

View File

@ -65,12 +65,12 @@ public class MockAM {
RMAppAttempt attempt = app.getRMAppAttempt(attemptId);
int timeoutSecs = 0;
while (!finalState.equals(attempt.getAppAttemptState())
&& timeoutSecs++ < 20) {
&& timeoutSecs++ < 40) {
System.out
.println("AppAttempt : " + attemptId + " State is : "
+ attempt.getAppAttemptState()
+ " Waiting for state : " + finalState);
Thread.sleep(500);
Thread.sleep(1000);
}
System.out.println("AppAttempt State is : " + attempt.getAppAttemptState());
Assert.assertEquals("AppAttempt state is not correct (timedout)",

View File

@ -19,22 +19,27 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@ -137,6 +142,184 @@ public class TestRM {
rm.stop();
}
/**
 * End-to-end check of NMToken bookkeeping between a mock AM and the RM's
 * NMTokenSecretManagerInRM. The test asserts, in order, that:
 * the attempt is registered once the AM is launched; only one NMToken per
 * node is collected no matter how many containers are allocated there; a
 * token is collected again for a node after that node re-registers; rolling
 * and activating the master key clears the recorded nodes without dropping
 * the attempt entry; and the attempt entry is gone once the attempt reaches
 * FINISHED.
 */
@Test
public void testNMToken() throws Exception {
MockRM rm = new MockRM();
try {
rm.start();
MockNM nm1 = rm.registerNode("h1:1234", 10000);
NMTokenSecretManagerInRM nmTokenSecretManager =
rm.getRMContext().getNMTokenSecretManager();
// submitting new application
RMApp app = rm.submitApp(1000);
// start scheduling.
nm1.nodeHeartbeat(true);
// Starting application attempt and launching
// It should get registered with NMTokenSecretManager.
RMAppAttempt attempt = app.getCurrentAppAttempt();
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
Assert.assertTrue(nmTokenSecretManager
.isApplicationAttemptRegistered(attempt.getAppAttemptId()));
// This will register application master.
am.registerAppAttempt();
ArrayList<Container> containersReceivedForNM1 =
new ArrayList<Container>();
List<ContainerId> releaseContainerList =
new ArrayList<ContainerId>();
// NMTokens collected so far, keyed by NodeId.toString().
HashMap<String, Token> nmTokens = new HashMap<String, Token>();
// initially requesting 2 containers.
AllocateResponse response =
am.allocate("h1", 1000, 2, releaseContainerList);
nm1.nodeHeartbeat(true);
Assert.assertEquals(0, response.getAllocatedContainers().size());
allocateContainersAndValidateNMTokens(am, containersReceivedForNM1, 2,
nmTokens);
// Two containers on the same node must yield a single NMToken.
Assert.assertEquals(1, nmTokens.size());
// requesting 2 more containers.
response = am.allocate("h1", 1000, 2, releaseContainerList);
nm1.nodeHeartbeat(true);
Assert.assertEquals(0, response.getAllocatedContainers().size());
allocateContainersAndValidateNMTokens(am, containersReceivedForNM1, 4,
nmTokens);
// Still one token: none is collected again for the already-known node.
Assert.assertEquals(1, nmTokens.size());
// A second NM (h2:1234) registers now; it is registered a second time
// further below to simulate an NM restart.
MockNM nm2 = rm.registerNode("h2:1234", 10000);
nm2.nodeHeartbeat(true);
ArrayList<Container> containersReceivedForNM2 =
new ArrayList<Container>();
response = am.allocate("h2", 1000, 2, releaseContainerList);
nm2.nodeHeartbeat(true);
Assert.assertEquals(0, response.getAllocatedContainers().size());
allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 2,
nmTokens);
// One token per node: h1 and h2.
Assert.assertEquals(2, nmTokens.size());
// Simulating NM-2 restart.
nm2 = rm.registerNode("h2:1234", 10000);
nm2.nodeHeartbeat(true);
int interval = 40;
// Wait for nm Token to be cleared: poll up to 40 times, sleeping one
// second between polls.
// NOTE(review): nothing asserts that the token was actually cleared if the
// polling budget runs out - confirm whether that is intended.
while (nmTokenSecretManager
.isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
nm2.getNodeId()) && interval-- > 0) {
LOG.info("waiting for nmToken to be cleared for : " + nm2.getNodeId());
Thread.sleep(1000);
}
// The attempt itself must remain registered.
Assert.assertTrue(nmTokenSecretManager
.isApplicationAttemptRegistered(attempt.getAppAttemptId()));
// removing NMToken for h2:1234
nmTokens.remove(nm2.getNodeId().toString());
Assert.assertEquals(1, nmTokens.size());
// We should again receive the NMToken.
response = am.allocate("h2", 1000, 2, releaseContainerList);
nm2.nodeHeartbeat(true);
Assert.assertEquals(0, response.getAllocatedContainers().size());
allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 4,
nmTokens);
Assert.assertEquals(2, nmTokens.size());
// Now rolling over NMToken masterKey. it should resend the NMToken in
// next allocate call.
Assert.assertTrue(nmTokenSecretManager
.isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
nm1.getNodeId()));
Assert.assertTrue(nmTokenSecretManager
.isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
nm2.getNodeId()));
nmTokenSecretManager.rollMasterKey();
nmTokenSecretManager.activateNextMasterKey();
// After activation no node may remain recorded for the attempt.
Assert.assertFalse(nmTokenSecretManager
.isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
nm1.getNodeId()));
Assert.assertFalse(nmTokenSecretManager
.isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
nm2.getNodeId()));
// It should not remove application attempt entry.
Assert.assertTrue(nmTokenSecretManager
.isApplicationAttemptRegistered(attempt.getAppAttemptId()));
// Forget the locally collected tokens so the duplicate check in the helper
// does not trip when tokens are received again.
nmTokens.clear();
Assert.assertEquals(0, nmTokens.size());
// We should again receive the NMToken.
response = am.allocate("h2", 1000, 1, releaseContainerList);
nm2.nodeHeartbeat(true);
Assert.assertEquals(0, response.getAllocatedContainers().size());
allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 5,
nmTokens);
Assert.assertEquals(1, nmTokens.size());
Assert.assertTrue(nmTokenSecretManager
.isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
nm2.getNodeId()));
// The attempt must still be registered before the AM unregisters; after the
// attempt reaches FINISHED (checked at the end) its entry must be gone.
Assert.assertTrue(nmTokenSecretManager
.isApplicationAttemptRegistered(attempt.getAppAttemptId()));
am.unregisterAppAttempt();
// marking all the containers as finished.
for (Container container : containersReceivedForNM1) {
nm1.nodeHeartbeat(attempt.getAppAttemptId(), container.getId().getId(),
ContainerState.COMPLETE);
}
for (Container container : containersReceivedForNM2) {
nm2.nodeHeartbeat(attempt.getAppAttemptId(), container.getId().getId(),
ContainerState.COMPLETE);
}
am.waitForState(RMAppAttemptState.FINISHED);
Assert.assertFalse(nmTokenSecretManager
.isApplicationAttemptRegistered(attempt.getAppAttemptId()));
} finally {
// Always shut the mock RM down, even when an assertion fails.
rm.stop();
}
}
/**
 * Keeps calling allocate (with no new requests and nothing to release) until
 * {@code containersReceived} holds at least {@code totalContainerRequested}
 * containers, collecting every NMToken returned along the way.
 *
 * Fails the test if an NMToken arrives for a node that already has an entry
 * in {@code nmTokens}. Sleeps 500 ms between allocate calls.
 *
 * @param am mock AM used to issue the allocate calls
 * @param containersReceived accumulator for allocated containers; appended to
 * @param totalContainerRequested size the accumulator must reach
 * @param nmTokens received tokens keyed by NodeId.toString(); appended to
 */
protected void allocateContainersAndValidateNMTokens(MockAM am,
    ArrayList<Container> containersReceived, int totalContainerRequested,
    HashMap<String, Token> nmTokens) throws Exception, InterruptedException {
  ArrayList<ResourceRequest> noNewRequests = new ArrayList<ResourceRequest>();
  ArrayList<ContainerId> nothingToRelease = new ArrayList<ContainerId>();
  while (containersReceived.size() < totalContainerRequested) {
    LOG.info("requesting containers..");
    AllocateResponse allocation =
        am.allocate(noNewRequests, nothingToRelease);
    containersReceived.addAll(allocation.getAllocatedContainers());
    // Iterating an empty token list is a no-op, so no emptiness guard needed.
    for (NMToken received : allocation.getNMTokens()) {
      String nodeKey = received.getNodeId().toString();
      if (nmTokens.containsKey(nodeKey)) {
        Assert.fail("Duplicate NMToken received for : " + nodeKey);
      }
      nmTokens.put(nodeKey, received.getToken());
    }
    LOG.info("Got " + containersReceived.size()
        + " containers. Waiting to get " + totalContainerRequested);
    Thread.sleep(500);
  }
}
@Test (timeout = 300000)
public void testActivatingApplicationAfterAddingNM() throws Exception {
YarnConfiguration conf = new YarnConfiguration();