YARN-851. Share NMTokens using NMTokenCache (api-based) between AMRMClient and NMClient instead of memory based approach which is used currently. Contributed by Omkar Vinit Joshi.

svn merge --ignore-ancestry -c 1495247 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1495248 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-06-21 00:09:45 +00:00
parent bf433c06c7
commit 34dfd27833
25 changed files with 163 additions and 170 deletions

View File

@ -26,7 +26,6 @@
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager; import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
@ -62,6 +61,4 @@ public interface AppContext {
Set<String> getBlacklistedNodes(); Set<String> getBlacklistedNodes();
ClientToAMTokenSecretManager getClientToAMTokenSecretManager(); ClientToAMTokenSecretManager getClientToAMTokenSecretManager();
Map<String, Token> getNMTokens();
} }

View File

@ -886,8 +886,6 @@ private class RunningAppContext implements AppContext {
private final Configuration conf; private final Configuration conf;
private final ClusterInfo clusterInfo = new ClusterInfo(); private final ClusterInfo clusterInfo = new ClusterInfo();
private final ClientToAMTokenSecretManager clientToAMTokenSecretManager; private final ClientToAMTokenSecretManager clientToAMTokenSecretManager;
private final ConcurrentHashMap<String, org.apache.hadoop.yarn.api.records.Token> nmTokens =
new ConcurrentHashMap<String, org.apache.hadoop.yarn.api.records.Token>();
public RunningAppContext(Configuration config) { public RunningAppContext(Configuration config) {
this.conf = config; this.conf = config;
@ -954,11 +952,6 @@ public Set<String> getBlacklistedNodes() {
public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() { public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
return clientToAMTokenSecretManager; return clientToAMTokenSecretManager;
} }
@Override
public Map<String, org.apache.hadoop.yarn.api.records.Token> getNMTokens() {
return this.nmTokens;
}
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View File

@ -235,8 +235,7 @@ protected void serviceInit(Configuration conf) throws Exception {
MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT); MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT);
LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize); LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize);
super.serviceInit(conf); super.serviceInit(conf);
cmProxy = cmProxy = new ContainerManagementProtocolProxy(conf);
new ContainerManagementProtocolProxy(conf, context.getNMTokens());
} }
protected void serviceStart() throws Exception { protected void serviceStart() throws Exception {

View File

@ -68,6 +68,7 @@
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.RackResolver; import org.apache.hadoop.yarn.util.RackResolver;
@ -588,7 +589,7 @@ private List<Container> getResources() throws Exception {
// Setting NMTokens // Setting NMTokens
if (response.getNMTokens() != null) { if (response.getNMTokens() != null) {
for (NMToken nmToken : response.getNMTokens()) { for (NMToken nmToken : response.getNMTokens()) {
getContext().getNMTokens().put(nmToken.getNodeId().toString(), NMTokenCache.setNMToken(nmToken.getNodeId().toString(),
nmToken.getToken()); nmToken.getToken());
} }
} }

View File

@ -26,10 +26,9 @@
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager; import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
import org.apache.hadoop.yarn.util.Clock;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
@ -131,10 +130,4 @@ public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
// Not implemented // Not implemented
return null; return null;
} }
@Override
public Map<String, Token> getNMTokens() {
// Not Implemented
return null;
}
} }

View File

@ -862,11 +862,5 @@ public Set<String> getBlacklistedNodes() {
public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() { public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
return null; return null;
} }
@Override
public Map<String, Token> getNMTokens() {
// Not Implemented
return null;
}
} }
} }

View File

@ -376,7 +376,7 @@ public ContainerManagementProtocolProxyData getCMProxy(
containerId.getApplicationAttemptId(), containerId.getApplicationAttemptId(),
NodeId.newInstance(addr.getHostName(), addr.getPort()), "user"); NodeId.newInstance(addr.getHostName(), addr.getPort()), "user");
ContainerManagementProtocolProxy cmProxy = ContainerManagementProtocolProxy cmProxy =
new ContainerManagementProtocolProxy(conf, context.getNMTokens()); new ContainerManagementProtocolProxy(conf);
ContainerManagementProtocolProxyData proxy = ContainerManagementProtocolProxyData proxy =
cmProxy.new ContainerManagementProtocolProxyData( cmProxy.new ContainerManagementProtocolProxyData(
YarnRPC.create(conf), containerManagerBindAddr, containerId, YarnRPC.create(conf), containerManagerBindAddr, containerId,

View File

@ -44,7 +44,6 @@
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -316,10 +315,4 @@ public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
// Not implemented. // Not implemented.
return null; return null;
} }
@Override
public Map<String, Token> getNMTokens() {
// Not Implemented.
return null;
}
} }

View File

@ -187,6 +187,10 @@ Release 2.1.0-beta - UNRELEASED
ApplicationSubmissionContext to simplify the api. (Karthik Kambatla via ApplicationSubmissionContext to simplify the api. (Karthik Kambatla via
acmurthy) acmurthy)
YARN-851. Share NMTokens using NMTokenCache (api-based) between AMRMClient
and NMClient instead of memory based approach which is used currently. (Omkar
Vinit Joshi via vinodkv)
NEW FEATURES NEW FEATURES
YARN-482. FS: Extend SchedulingMode to intermediate queues. YARN-482. FS: Extend SchedulingMode to intermediate queues.

View File

@ -36,8 +36,9 @@
* *
* <p>The response contains critical details such as: * <p>The response contains critical details such as:
* <ul> * <ul>
* <li>Minimum capability for allocated resources in the cluster.</li>
* <li>Maximum capability for allocated resources in the cluster.</li> * <li>Maximum capability for allocated resources in the cluster.</li>
* <li><code>ApplicationACL</code>s for the application.</li>
* <li>ClientToAMToken master key.</li>
* </ul> * </ul>
* </p> * </p>
* *
@ -50,11 +51,12 @@ public abstract class RegisterApplicationMasterResponse {
@Unstable @Unstable
public static RegisterApplicationMasterResponse newInstance( public static RegisterApplicationMasterResponse newInstance(
Resource minCapability, Resource maxCapability, Resource minCapability, Resource maxCapability,
Map<ApplicationAccessType, String> acls) { Map<ApplicationAccessType, String> acls, ByteBuffer key) {
RegisterApplicationMasterResponse response = RegisterApplicationMasterResponse response =
Records.newRecord(RegisterApplicationMasterResponse.class); Records.newRecord(RegisterApplicationMasterResponse.class);
response.setMaximumResourceCapability(maxCapability); response.setMaximumResourceCapability(maxCapability);
response.setApplicationACLs(acls); response.setApplicationACLs(acls);
response.setClientToAMTokenMasterKey(key);
return response; return response;
} }

View File

@ -44,12 +44,10 @@
* <li>HTTP uri of the node.</li> * <li>HTTP uri of the node.</li>
* <li>{@link Resource} allocated to the container.</li> * <li>{@link Resource} allocated to the container.</li>
* <li>{@link Priority} at which the container was allocated.</li> * <li>{@link Priority} at which the container was allocated.</li>
* <li>{@link ContainerState} of the container.</li>
* <li> * <li>
* Container {@link Token} of the container, used to securely verify * Container {@link Token} of the container, used to securely verify
* authenticity of the allocation. * authenticity of the allocation.
* </li> * </li>
* <li>{@link ContainerStatus} of the container.</li>
* </ul> * </ul>
* </p> * </p>
* *

View File

@ -448,8 +448,7 @@ public boolean run() throws YarnException, IOException {
resourceManager.start(); resourceManager.start();
containerListener = new NMCallbackHandler(); containerListener = new NMCallbackHandler();
nmClientAsync = nmClientAsync = new NMClientAsyncImpl(containerListener);
new NMClientAsyncImpl(containerListener, resourceManager.getNMTokens());
nmClientAsync.init(conf); nmClientAsync.init(conf);
nmClientAsync.start(); nmClientAsync.start();

View File

@ -100,7 +100,7 @@ public static void tearDown() throws IOException {
} }
} }
@Test(timeout=30000) @Test(timeout=90000)
public void testDSShell() throws Exception { public void testDSShell() throws Exception {
String[] args = { String[] args = {
@ -128,7 +128,7 @@ public void testDSShell() throws Exception {
} }
@Test(timeout=30000) @Test(timeout=90000)
public void testDSShellWithInvalidArgs() throws Exception { public void testDSShellWithInvalidArgs() throws Exception {
Client client = new Client(new Configuration(yarnCluster.getConfig())); Client client = new Client(new Configuration(yarnCluster.getConfig()));

View File

@ -21,7 +21,6 @@
import java.io.IOException; import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -35,7 +34,6 @@
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
@ -249,14 +247,4 @@ public abstract List<? extends Collection<T>> getMatchingRequests(
Priority priority, Priority priority,
String resourceName, String resourceName,
Resource capability); Resource capability);
/**
* It returns the NMToken received on allocate call. It will not communicate
* with RM to get NMTokens. On allocate call whenever we receive new token
* along with container AMRMClient will cache this NMToken per node manager.
* This map returned should be shared with any application which is
* communicating with NodeManager (ex. NMClient) using NMTokens. If a new
* NMToken is received for the same node manager then it will be replaced.
*/
public abstract ConcurrentMap<String, Token> getNMTokens();
} }

View File

@ -22,21 +22,17 @@
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.impl.NMClientImpl; import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
@ -46,30 +42,19 @@ public abstract class NMClient extends AbstractService {
/** /**
* Create a new instance of NMClient. * Create a new instance of NMClient.
* @param nmTokens need to pass map of NMTokens which are received on
* {@link AMRMClient#allocate(float)} call as a part of
* {@link AllocateResponse}.
* key :- NodeAddr (host:port)
* Value :- Token {@link NMToken#getToken()}
*/ */
@Public @Public
public static NMClient createNMClient(ConcurrentMap<String, Token> nmTokens) { public static NMClient createNMClient() {
NMClient client = new NMClientImpl(nmTokens); NMClient client = new NMClientImpl();
return client; return client;
} }
/** /**
* Create a new instance of NMClient. * Create a new instance of NMClient.
* @param nmTokens need to pass map of NMTokens which are received on
* {@link AMRMClient#allocate(float)} call as a part of
* {@link AllocateResponse}.
* key :- NodeAddr (host:port)
* Value :- Token {@link NMToken#getToken()}
*/ */
@Public @Public
public static NMClient createNMClient(String name, public static NMClient createNMClient(String name) {
ConcurrentMap<String, Token> nmTokens) { NMClient client = new NMClientImpl(name);
NMClient client = new NMClientImpl(name, nmTokens);
return client; return client;
} }

View File

@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.api.records.Token;
import com.google.common.annotations.VisibleForTesting;
/**
* It manages NMTokens required for communicating with Node manager. Its a
* static token cache.
*/
@Public
@Evolving
public class NMTokenCache {
private static ConcurrentHashMap<String, Token> nmTokens;
static {
nmTokens = new ConcurrentHashMap<String, Token>();
}
/**
* Returns NMToken, null if absent
* @param nodeAddr
* @return {@link Token} NMToken required for communicating with node
* manager
*/
@Public
@Evolving
public static Token getNMToken(String nodeAddr) {
return nmTokens.get(nodeAddr);
}
/**
* Sets the NMToken for node address
* @param nodeAddr node address (host:port)
* @param token NMToken
*/
@Public
@Evolving
public static void setNMToken(String nodeAddr, Token token) {
nmTokens.put(nodeAddr, token);
}
/**
* Returns true if NMToken is present in cache.
*/
@Private
@VisibleForTesting
public static boolean containsNMToken(String nodeAddr) {
return nmTokens.containsKey(nodeAddr);
}
/**
* Returns the number of NMTokens present in cache.
*/
@Private
@VisibleForTesting
public static int numberOfNMTokensInCache() {
return nmTokens.size();
}
/**
* Removes NMToken for specified node manager
* @param nodeAddr node address (host:port)
*/
@Private
@VisibleForTesting
public static void removeNMToken(String nodeAddr) {
nmTokens.remove(nodeAddr);
}
/**
* It will remove all the nm tokens from its cache
*/
@Private
@VisibleForTesting
public static void clearCache() {
nmTokens.clear();
}
}

View File

@ -21,7 +21,6 @@
import java.io.IOException; import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -37,7 +36,6 @@
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
@ -198,17 +196,6 @@ public abstract void unregisterApplicationMaster(
*/ */
public abstract int getClusterNodeCount(); public abstract int getClusterNodeCount();
/**
* It returns the NMToken received on allocate call. It will not communicate
* with RM to get NMTokens. On allocate call whenever we receive new token
* along with new container AMRMClientAsync will cache this NMToken per node
* manager. This map returned should be shared with any application which is
* communicating with NodeManager (ex. NMClient / NMClientAsync) using
* NMTokens. If a new NMToken is received for the same node manager
* then it will be replaced.
*/
public abstract ConcurrentMap<String, Token> getNMTokens();
public interface CallbackHandler { public interface CallbackHandler {
/** /**

View File

@ -112,18 +112,16 @@ public abstract class NMClientAsync extends AbstractService {
protected CallbackHandler callbackHandler; protected CallbackHandler callbackHandler;
public static NMClientAsync createNMClientAsync( public static NMClientAsync createNMClientAsync(
CallbackHandler callbackHandler, ConcurrentMap<String, Token> nmTokens) { CallbackHandler callbackHandler) {
return new NMClientAsyncImpl(callbackHandler, nmTokens); return new NMClientAsyncImpl(callbackHandler);
} }
protected NMClientAsync(CallbackHandler callbackHandler, protected NMClientAsync(CallbackHandler callbackHandler) {
ConcurrentMap<String, Token> nmTokens) { this (NMClientAsync.class.getName(), callbackHandler);
this (NMClientAsync.class.getName(), callbackHandler, nmTokens);
} }
protected NMClientAsync(String name, CallbackHandler callbackHandler, protected NMClientAsync(String name, CallbackHandler callbackHandler) {
ConcurrentMap<String, Token> nmTokens) { this (name, new NMClientImpl(), callbackHandler);
this (name, new NMClientImpl(nmTokens), callbackHandler);
} }
@Private @Private

View File

@ -22,7 +22,6 @@
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -40,7 +39,6 @@
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
@ -215,19 +213,6 @@ public Resource getAvailableResources() {
public int getClusterNodeCount() { public int getClusterNodeCount() {
return client.getClusterNodeCount(); return client.getClusterNodeCount();
} }
/**
* It returns the NMToken received on allocate call. It will not communicate
* with RM to get NMTokens. On allocate call whenever we receive new token
* along with new container AMRMClientAsync will cache this NMToken per node
* manager. This map returned should be shared with any application which is
* communicating with NodeManager (ex. NMClient / NMClientAsync) using
* NMTokens. If a new NMToken is received for the same node manager
* then it will be replaced.
*/
public ConcurrentMap<String, Token> getNMTokens() {
return client.getNMTokens();
}
private class HeartbeatThread extends Thread { private class HeartbeatThread extends Thread {
public HeartbeatThread() { public HeartbeatThread() {

View File

@ -82,14 +82,12 @@ public class NMClientAsyncImpl extends NMClientAsync {
protected ConcurrentMap<ContainerId, StatefulContainer> containers = protected ConcurrentMap<ContainerId, StatefulContainer> containers =
new ConcurrentHashMap<ContainerId, StatefulContainer>(); new ConcurrentHashMap<ContainerId, StatefulContainer>();
public NMClientAsyncImpl(CallbackHandler callbackHandler, public NMClientAsyncImpl(CallbackHandler callbackHandler) {
ConcurrentMap<String, Token> nmTokens) { this(NMClientAsync.class.getName(), callbackHandler);
this(NMClientAsync.class.getName(), callbackHandler, nmTokens);
} }
public NMClientAsyncImpl(String name, CallbackHandler callbackHandler, public NMClientAsyncImpl(String name, CallbackHandler callbackHandler) {
ConcurrentMap<String, Token> nmTokens) { this(name, new NMClientImpl(), callbackHandler);
this(name, new NMClientImpl(nmTokens), callbackHandler);
} }
@Private @Private

View File

@ -34,7 +34,6 @@
import java.util.SortedMap; import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -56,8 +55,8 @@
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
@ -82,7 +81,6 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
RecordFactoryProvider.getRecordFactory(null); RecordFactoryProvider.getRecordFactory(null);
private int lastResponseId = 0; private int lastResponseId = 0;
private ConcurrentHashMap<String, Token> nmTokens;
protected ApplicationMasterProtocol rmClient; protected ApplicationMasterProtocol rmClient;
protected final ApplicationAttemptId appAttemptId; protected final ApplicationAttemptId appAttemptId;
@ -158,7 +156,6 @@ static boolean canFit(Resource arg0, Resource arg1) {
public AMRMClientImpl(ApplicationAttemptId appAttemptId) { public AMRMClientImpl(ApplicationAttemptId appAttemptId) {
super(AMRMClientImpl.class.getName()); super(AMRMClientImpl.class.getName());
this.appAttemptId = appAttemptId; this.appAttemptId = appAttemptId;
this.nmTokens = new ConcurrentHashMap<String, Token>();
} }
@Override @Override
@ -285,12 +282,12 @@ public AllocateResponse allocate(float progressIndicator)
protected void populateNMTokens(AllocateResponse allocateResponse) { protected void populateNMTokens(AllocateResponse allocateResponse) {
for (NMToken token : allocateResponse.getNMTokens()) { for (NMToken token : allocateResponse.getNMTokens()) {
String nodeId = token.getNodeId().toString(); String nodeId = token.getNodeId().toString();
if (nmTokens.containsKey(nodeId)) { if (NMTokenCache.containsNMToken(nodeId)) {
LOG.debug("Replacing token for : " + nodeId); LOG.debug("Replacing token for : " + nodeId);
} else { } else {
LOG.debug("Received new token for : " + nodeId); LOG.debug("Received new token for : " + nodeId);
} }
nmTokens.put(nodeId, token.getToken()); NMTokenCache.setNMToken(nodeId, token.getToken());
} }
} }
@ -577,9 +574,4 @@ private void decResourceRequest(Priority priority,
+ " #asks=" + ask.size()); + " #asks=" + ask.size());
} }
} }
@Override
public ConcurrentHashMap<String, Token> getNMTokens() {
return nmTokens;
}
} }

View File

@ -23,7 +23,6 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -36,6 +35,7 @@
import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
@ -54,13 +54,10 @@ public class ContainerManagementProtocolProxy {
private final int maxConnectedNMs; private final int maxConnectedNMs;
private final LinkedHashMap<String, ContainerManagementProtocolProxyData> cmProxy; private final LinkedHashMap<String, ContainerManagementProtocolProxyData> cmProxy;
private Map<String, Token> nmTokens;
private final Configuration conf; private final Configuration conf;
private final YarnRPC rpc; private final YarnRPC rpc;
public ContainerManagementProtocolProxy(Configuration conf, public ContainerManagementProtocolProxy(Configuration conf) {
Map<String, Token> nmTokens) {
this.nmTokens = nmTokens;
this.conf = conf; this.conf = conf;
maxConnectedNMs = maxConnectedNMs =
@ -86,10 +83,10 @@ public synchronized ContainerManagementProtocolProxyData getProxy(
// This get call will update the map which is working as LRU cache. // This get call will update the map which is working as LRU cache.
ContainerManagementProtocolProxyData proxy = ContainerManagementProtocolProxyData proxy =
cmProxy.get(containerManagerBindAddr); cmProxy.get(containerManagerBindAddr);
while (proxy != null while (proxy != null
&& !proxy.token.getIdentifier().equals( && !proxy.token.getIdentifier().equals(
nmTokens.get(containerManagerBindAddr).getIdentifier())) { NMTokenCache.getNMToken(containerManagerBindAddr).getIdentifier())) {
LOG.info("Refreshing proxy as NMToken got updated for node : " LOG.info("Refreshing proxy as NMToken got updated for node : "
+ containerManagerBindAddr); + containerManagerBindAddr);
// Token is updated. check if anyone has already tried closing it. // Token is updated. check if anyone has already tried closing it.
@ -112,7 +109,7 @@ public synchronized ContainerManagementProtocolProxyData getProxy(
if (proxy == null) { if (proxy == null) {
proxy = proxy =
new ContainerManagementProtocolProxyData(rpc, containerManagerBindAddr, new ContainerManagementProtocolProxyData(rpc, containerManagerBindAddr,
containerId, nmTokens.get(containerManagerBindAddr)); containerId, NMTokenCache.getNMToken(containerManagerBindAddr));
if (cmProxy.size() > maxConnectedNMs) { if (cmProxy.size() > maxConnectedNMs) {
// Number of existing proxy exceed the limit. // Number of existing proxy exceed the limit.
String cmAddr = cmProxy.keySet().iterator().next(); String cmAddr = cmProxy.keySet().iterator().next();
@ -172,10 +169,6 @@ public synchronized void stopAllProxies() {
cmProxy.clear(); cmProxy.clear();
} }
public synchronized void setNMTokens(Map<String, Token> nmTokens) {
this.nmTokens = nmTokens;
}
public class ContainerManagementProtocolProxyData { public class ContainerManagementProtocolProxyData {
private final String containerManagerBindAddr; private final String containerManagerBindAddr;
private final ContainerManagementProtocol proxy; private final ContainerManagementProtocol proxy;
@ -201,10 +194,12 @@ public ContainerManagementProtocolProxyData(YarnRPC rpc,
protected ContainerManagementProtocol newProxy(final YarnRPC rpc, protected ContainerManagementProtocol newProxy(final YarnRPC rpc,
String containerManagerBindAddr, ContainerId containerId, Token token) String containerManagerBindAddr, ContainerId containerId, Token token)
throws InvalidToken { throws InvalidToken {
if (token == null) { if (token == null) {
throw new InvalidToken("No NMToken sent for " throw new InvalidToken("No NMToken sent for "
+ containerManagerBindAddr); + containerManagerBindAddr);
} }
final InetSocketAddress cmAddr = final InetSocketAddress cmAddr =
NetUtils.createSocketAddr(containerManagerBindAddr); NetUtils.createSocketAddr(containerManagerBindAddr);
LOG.info("Opening proxy : " + containerManagerBindAddr); LOG.info("Opening proxy : " + containerManagerBindAddr);

View File

@ -81,18 +81,15 @@ public class NMClientImpl extends NMClient {
new ConcurrentHashMap<ContainerId, StartedContainer>(); new ConcurrentHashMap<ContainerId, StartedContainer>();
//enabled by default //enabled by default
private final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true); private final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true);
private ContainerManagementProtocolProxy cmProxy; private ContainerManagementProtocolProxy cmProxy;
private ConcurrentMap<String, Token> nmTokens;
public NMClientImpl(ConcurrentMap<String, Token> nmTokens) { public NMClientImpl() {
super(NMClientImpl.class.getName()); super(NMClientImpl.class.getName());
this.nmTokens = nmTokens;
} }
public NMClientImpl(String name, ConcurrentMap<String, Token> nmTokens) { public NMClientImpl(String name) {
super(name); super(name);
this.nmTokens = nmTokens;
} }
@Override @Override
@ -126,8 +123,7 @@ protected synchronized void cleanupRunningContainers() {
@Override @Override
protected void serviceInit(Configuration conf) throws Exception { protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf); super.serviceInit(conf);
cmProxy = cmProxy = new ContainerManagementProtocolProxy(conf);
new ContainerManagementProtocolProxy(conf, nmTokens);
} }
@Override @Override

View File

@ -26,11 +26,9 @@
import java.io.IOException; import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import junit.framework.Assert; import junit.framework.Assert;
@ -50,6 +48,7 @@
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
@ -57,6 +56,7 @@
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient.StoredContainerRequest; import org.apache.hadoop.yarn.client.api.AMRMClient.StoredContainerRequest;
@ -488,8 +488,8 @@ private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
int iterationsLeft = 2; int iterationsLeft = 2;
Set<ContainerId> releases = new TreeSet<ContainerId>(); Set<ContainerId> releases = new TreeSet<ContainerId>();
ConcurrentHashMap<String, Token> nmTokens = amClient.getNMTokens(); NMTokenCache.clearCache();
Assert.assertEquals(0, nmTokens.size()); Assert.assertEquals(0, NMTokenCache.numberOfNMTokensInCache());
HashMap<String, Token> receivedNMTokens = new HashMap<String, Token>(); HashMap<String, Token> receivedNMTokens = new HashMap<String, Token>();
while (allocatedContainerCount < containersRequestedAny while (allocatedContainerCount < containersRequestedAny
@ -505,19 +505,13 @@ private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
releases.add(rejectContainerId); releases.add(rejectContainerId);
amClient.releaseAssignedContainer(rejectContainerId); amClient.releaseAssignedContainer(rejectContainerId);
} }
Assert.assertEquals(nmTokens.size(), amClient.getNMTokens().size());
Iterator<String> nodeI = nmTokens.keySet().iterator(); for (NMToken token : allocResponse.getNMTokens()) {
while (nodeI.hasNext()) { String nodeID = token.getNodeId().toString();
String nodeId = nodeI.next(); if (receivedNMTokens.containsKey(nodeID)) {
if (!receivedNMTokens.containsKey(nodeId)) { Assert.fail("Received token again for : " + nodeID);
receivedNMTokens.put(nodeId, nmTokens.get(nodeId));
} else {
Assert.fail("Received token again for : " + nodeId);
} }
} receivedNMTokens.put(nodeID, token.getToken());
nodeI = receivedNMTokens.keySet().iterator();
while (nodeI.hasNext()) {
nmTokens.remove(nodeI.next());
} }
if(allocatedContainerCount < containersRequestedAny) { if(allocatedContainerCount < containersRequestedAny) {
@ -526,7 +520,6 @@ private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
} }
} }
Assert.assertEquals(0, amClient.getNMTokens().size());
// Should receive atleast 1 token // Should receive atleast 1 token
Assert.assertTrue(receivedNMTokens.size() > 0 Assert.assertTrue(receivedNMTokens.size() > 0
&& receivedNMTokens.size() <= nodeCount); && receivedNMTokens.size() <= nodeCount);

View File

@ -55,6 +55,7 @@
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -74,7 +75,6 @@ public class TestNMClient {
List<NodeReport> nodeReports = null; List<NodeReport> nodeReports = null;
ApplicationAttemptId attemptId = null; ApplicationAttemptId attemptId = null;
int nodeCount = 3; int nodeCount = 3;
ConcurrentHashMap<String, Token> nmTokens;
@Before @Before
public void setup() throws YarnException, IOException { public void setup() throws YarnException, IOException {
@ -136,7 +136,6 @@ public void setup() throws YarnException, IOException {
if (iterationsLeft == 0) { if (iterationsLeft == 0) {
fail("Application hasn't bee started"); fail("Application hasn't bee started");
} }
nmTokens = new ConcurrentHashMap<String, Token>();
// start am rm client // start am rm client
rmClient = rmClient =
@ -148,7 +147,7 @@ public void setup() throws YarnException, IOException {
assertEquals(STATE.STARTED, rmClient.getServiceState()); assertEquals(STATE.STARTED, rmClient.getServiceState());
// start am nm client // start am nm client
nmClient = (NMClientImpl) NMClient.createNMClient(nmTokens); nmClient = (NMClientImpl) NMClient.createNMClient();
nmClient.init(conf); nmClient.init(conf);
nmClient.start(); nmClient.start();
assertNotNull(nmClient); assertNotNull(nmClient);
@ -173,7 +172,7 @@ private void stopNmClient(boolean stopContainers) {
nmClient.stop(); nmClient.stop();
} }
@Test (timeout = 60000) @Test (timeout = 180000)
public void testNMClientNoCleanupOnStop() public void testNMClientNoCleanupOnStop()
throws YarnException, IOException { throws YarnException, IOException {
@ -241,7 +240,8 @@ private Set<Container> allocateContainers(
} }
if (!allocResponse.getNMTokens().isEmpty()) { if (!allocResponse.getNMTokens().isEmpty()) {
for (NMToken token : allocResponse.getNMTokens()) { for (NMToken token : allocResponse.getNMTokens()) {
nmTokens.put(token.getNodeId().toString(), token.getToken()); NMTokenCache.setNMToken(token.getNodeId().toString(),
token.getToken());
} }
} }
if(allocatedContainerCount < containersRequestedAny) { if(allocatedContainerCount < containersRequestedAny) {