From 924f0429b7f24779a9266a3b25167b1d52ba5b94 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Wed, 30 Oct 2013 22:38:38 +0000 Subject: [PATCH] YARN-1321. Changed NMTokenCache to support both singleton and an instance usage. Contributed by Alejandro Abdelnur. svn merge --ignore-ancestry -c 1537334 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1537335 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop/yarn/client/api/AMRMClient.java | 32 ++++ .../hadoop/yarn/client/api/NMClient.java | 31 ++++ .../hadoop/yarn/client/api/NMTokenCache.java | 140 ++++++++++++++++-- .../yarn/client/api/impl/AMRMClientImpl.java | 5 +- .../ContainerManagementProtocolProxy.java | 11 +- .../yarn/client/api/impl/NMClientImpl.java | 5 +- .../yarn/client/api/impl/TestAMRMClient.java | 11 +- .../yarn/client/api/impl/TestNMClient.java | 12 +- 9 files changed, 230 insertions(+), 20 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 494870b15e8..ba939b7faa9 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -105,6 +105,9 @@ Release 2.2.1 - UNRELEASED YARN-1109. Demote NodeManager "Sending out status for container" logs to debug (haosdent via Sandy Ryza) + YARN-1321. Changed NMTokenCache to support both singleton and an instance + usage. (Alejandro Abdelnur via vinodkv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index f4913cdbda5..23f4ea13b4b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -59,9 +59,12 @@ public abstract class AMRMClient extends return client; } + private NMTokenCache nmTokenCache; + @Private protected AMRMClient(String name) { super(name); + nmTokenCache = NMTokenCache.getSingleton(); } /** @@ -297,4 +300,33 @@ public abstract class AMRMClient extends */ public abstract void updateBlacklist(List blacklistAdditions, List blacklistRemovals); + + /** + * Set the NM token cache for the AMRMClient. This cache must + * be shared with the {@link NMClient} used to manage containers for the + * AMRMClient + *

+ * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()} + * singleton instance will be used. + * + * @param nmTokenCache the NM token cache to use. + */ + public void setNMTokenCache(NMTokenCache nmTokenCache) { + this.nmTokenCache = nmTokenCache; + } + + /** + * Get the NM token cache of the AMRMClient. This cache must be + * shared with the {@link NMClient} used to manage containers for the + * AMRMClient. + *

+ * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()} + * singleton instance will be used. + * + * @return the NM token cache. + */ + public NMTokenCache getNMTokenCache() { + return nmTokenCache; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java index 57e7db5cd33..721728e06f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java @@ -58,6 +58,8 @@ public abstract class NMClient extends AbstractService { return client; } + private NMTokenCache nmTokenCache = NMTokenCache.getSingleton(); + @Private protected NMClient(String name) { super(name); @@ -118,4 +120,33 @@ public abstract class NMClient extends AbstractService { * @param enabled whether the feature is enabled or not */ public abstract void cleanupRunningContainersOnStop(boolean enabled); + + /** + * Set the NM Token cache of the NMClient. This cache must be + * shared with the {@link AMRMClient} that requested the containers managed + * by this NMClient + *

+ * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()} + * singleton instance will be used. + * + * @param nmTokenCache the NM token cache to use. + */ + public void setNMTokenCache(NMTokenCache nmTokenCache) { + this.nmTokenCache = nmTokenCache; + } + + /** + * Get the NM token cache of the NMClient. This cache must be + * shared with the {@link AMRMClient} that requested the containers managed + * by this NMClient + *

+ * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()} + * singleton instance will be used. + * + * @return the NM token cache + */ + public NMTokenCache getNMTokenCache() { + return nmTokenCache; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMTokenCache.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMTokenCache.java index c14a12c0919..0e7356fe1f9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMTokenCache.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMTokenCache.java @@ -23,21 +23,139 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.async.NMClientAsync; import com.google.common.annotations.VisibleForTesting; /** - * It manages NMTokens required for communicating with Node manager. Its a - * static token cache. + * NMTokenCache manages NMTokens required for an Application Master + * communicating with individual NodeManagers. + *

+ * By default Yarn client libraries {@link AMRMClient} and {@link NMClient} use + * {@link #getSingleton()} instance of the cache. + *

    + *
  • Using the singleton instance of the cache is appropriate when running a + * single ApplicationMaster in the same JVM.
  • + *
  • When using the singleton, users don't need to do anything special, + * {@link AMRMClient} and {@link NMClient} are already set up to use the default + * singleton {@link NMTokenCache}
  • + *
+ *

+ * If running multiple Application Masters in the same JVM, a different cache + * instance should be used for each Application Master. + *

+ *

    + *
  • + * If using the {@link AMRMClient} and the {@link NMClient}, setting up and using + * an instance cache is as follows: + *

    + * + *

    + *   NMTokenCache nmTokenCache = new NMTokenCache();
    + *   AMRMClient rmClient = AMRMClient.createAMRMClient();
    + *   NMClient nmClient = NMClient.createNMClient();
    + *   nmClient.setNMTokenCache(nmTokenCache);
    + *   ...
    + * 
    + *
  • + *
  • + * If using the {@link AMRMClientAsync} and the {@link NMClientAsync}, setting up + * and using an instance cache is as follows: + *

    + * + *

    + *   NMTokenCache nmTokenCache = new NMTokenCache();
    + *   AMRMClient rmClient = AMRMClient.createAMRMClient();
    + *   NMClient nmClient = NMClient.createNMClient();
    + *   nmClient.setNMTokenCache(nmTokenCache);
    + *   AMRMClientAsync rmClientAsync = new AMRMClientAsync(rmClient, 1000, [AMRM_CALLBACK]);
    + *   NMClientAsync nmClientAsync = new NMClientAsync("nmClient", nmClient, [NM_CALLBACK]);
    + *   ...
    + * 
    + *
  • + *
  • + * If using {@link ApplicationMasterProtocol} and + * {@link ContainerManagementProtocol} directly, setting up and using an + * instance cache is as follows: + *

    + * + *

    + *   NMTokenCache nmTokenCache = new NMTokenCache();
    + *   ...
    + *   ApplicationMasterProtocol amPro = ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
    + *   ...
    + *   AllocateRequest allocateRequest = ...
    + *   ...
    + *   AllocateResponse allocateResponse = rmClient.allocate(allocateRequest);
    + *   for (NMToken token : allocateResponse.getNMTokens()) {
    + *     nmTokenCache.setToken(token.getNodeId().toString(), token.getToken());
    + *   }
    + *   ...
    + *   ContainerManagementProtocolProxy nmPro = ContainerManagementProtocolProxy(conf, nmTokenCache);
    + *   ...
    + *   nmPro.startContainer(container, containerContext);
    + *   ...
    + * 
    + *
  • + *
+ * It is also possible to mix the usage of a client (AMRMClient or + * NMClient, or the async versions of them) with a protocol proxy ( + * ContainerManagementProtocolProxy or + * ApplicationMasterProtocol). */ @Public @Evolving public class NMTokenCache { - private static ConcurrentHashMap nmTokens; + private static final NMTokenCache NM_TOKEN_CACHE = new NMTokenCache(); + /** + * Returns the singleton NM token cache. + * + * @return the singleton NM token cache. + */ + public static NMTokenCache getSingleton() { + return NM_TOKEN_CACHE; + } - static { + /** + * Returns NMToken, null if absent. Only the singleton obtained from + * {@link #getSingleton()} is looked at for the tokens. If you are using your + * own NMTokenCache that is different from the singleton, use + * {@link #getToken(String) } + * + * @param nodeAddr + * @return {@link Token} NMToken required for communicating with node manager + */ + @Public + public static Token getNMToken(String nodeAddr) { + return NM_TOKEN_CACHE.getToken(nodeAddr); + } + + /** + * Sets the NMToken for node address only in the singleton obtained from + * {@link #getSingleton()}. If you are using your own NMTokenCache that is + * different from the singleton, use {@link #setToken(String, Token) } + * + * @param nodeAddr + * node address (host:port) + * @param token + * NMToken + */ + @Public + public static void setNMToken(String nodeAddr, Token token) { + NM_TOKEN_CACHE.setToken(nodeAddr, token); + } + + private ConcurrentHashMap nmTokens; + + /** + * Creates a NM token cache instance. + */ + public NMTokenCache() { nmTokens = new ConcurrentHashMap(); } @@ -45,11 +163,11 @@ public class NMTokenCache { * Returns NMToken, null if absent * @param nodeAddr * @return {@link Token} NMToken required for communicating with node - * manager + * manager */ @Public @Evolving - public static Token getNMToken(String nodeAddr) { + public Token getToken(String nodeAddr) { return nmTokens.get(nodeAddr); } @@ -60,7 +178,7 @@ public class NMTokenCache { */ @Public @Evolving - public static void setNMToken(String nodeAddr, Token token) { + public void setToken(String nodeAddr, Token token) { nmTokens.put(nodeAddr, token); } @@ -69,7 +187,7 @@ public class NMTokenCache { */ @Private @VisibleForTesting - public static boolean containsNMToken(String nodeAddr) { + public boolean containsToken(String nodeAddr) { return nmTokens.containsKey(nodeAddr); } @@ -78,7 +196,7 @@ public class NMTokenCache { */ @Private @VisibleForTesting - public static int numberOfNMTokensInCache() { + public int numberOfTokensInCache() { return nmTokens.size(); } @@ -88,7 +206,7 @@ public class NMTokenCache { */ @Private @VisibleForTesting - public static void removeNMToken(String nodeAddr) { + public void removeToken(String nodeAddr) { nmTokens.remove(nodeAddr); } @@ -97,7 +215,7 @@ public class NMTokenCache { */ @Private @VisibleForTesting - public static void clearCache() { + public void clearCache() { nmTokens.clear(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 39225715c19..061c50bd7b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -58,7 +58,6 @@ import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException; -import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -288,12 +287,12 @@ public class AMRMClientImpl extends AMRMClient { protected void populateNMTokens(AllocateResponse allocateResponse) { for (NMToken token : allocateResponse.getNMTokens()) { String nodeId = token.getNodeId().toString(); - if (NMTokenCache.containsNMToken(nodeId)) { + if (getNMTokenCache().containsToken(nodeId)) { LOG.debug("Replacing token for : " + nodeId); } else { LOG.debug("Received new token for : " + nodeId); } - NMTokenCache.setNMToken(nodeId, token.getToken()); + getNMTokenCache().setToken(nodeId, token.getToken()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java index 4ca44e12a42..fbc772fb85a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java @@ -56,9 +56,16 @@ public class ContainerManagementProtocolProxy { private final LinkedHashMap cmProxy; private final Configuration conf; private final YarnRPC rpc; + private NMTokenCache nmTokenCache; public ContainerManagementProtocolProxy(Configuration conf) { + this(conf, NMTokenCache.getSingleton()); + } + + public ContainerManagementProtocolProxy(Configuration conf, + NMTokenCache nmTokenCache) { this.conf = conf; + this.nmTokenCache = nmTokenCache; maxConnectedNMs = conf.getInt(YarnConfiguration.NM_CLIENT_MAX_NM_PROXIES, @@ -86,7 +93,7 @@ public class ContainerManagementProtocolProxy { while (proxy != null && !proxy.token.getIdentifier().equals( - NMTokenCache.getNMToken(containerManagerBindAddr).getIdentifier())) { + nmTokenCache.getToken(containerManagerBindAddr).getIdentifier())) { LOG.info("Refreshing proxy as NMToken got updated for node : " + containerManagerBindAddr); // Token is updated. check if anyone has already tried closing it. @@ -109,7 +116,7 @@ public class ContainerManagementProtocolProxy { if (proxy == null) { proxy = new ContainerManagementProtocolProxyData(rpc, containerManagerBindAddr, - containerId, NMTokenCache.getNMToken(containerManagerBindAddr)); + containerId, nmTokenCache.getToken(containerManagerBindAddr)); if (cmProxy.size() > maxConnectedNMs) { // Number of existing proxy exceed the limit. String cmAddr = cmProxy.keySet().iterator().next(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java index b5f0be11cc8..3518f3540fa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java @@ -130,7 +130,10 @@ public class NMClientImpl extends NMClient { @Override protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); - cmProxy = new ContainerManagementProtocolProxy(conf); + if (getNMTokenCache() == null) { + throw new IllegalStateException("NMTokenCache has not been set"); + } + cmProxy = new ContainerManagementProtocolProxy(conf, getNMTokenCache()); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 58ef215f24d..1f7565be181 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -626,6 +626,13 @@ public class TestAMRMClient { try { // start am rm client amClient = AMRMClient.createAMRMClient(); + + //setting an instance NMTokenCache + amClient.setNMTokenCache(new NMTokenCache()); + //asserting we are not using the singleton instance cache + Assert.assertNotSame(NMTokenCache.getSingleton(), + amClient.getNMTokenCache()); + amClient.init(conf); amClient.start(); @@ -681,8 +688,8 @@ public class TestAMRMClient { int iterationsLeft = 3; Set releases = new TreeSet(); - NMTokenCache.clearCache(); - Assert.assertEquals(0, NMTokenCache.numberOfNMTokensInCache()); + amClient.getNMTokenCache().clearCache(); + Assert.assertEquals(0, amClient.getNMTokenCache().numberOfTokensInCache()); HashMap receivedNMTokens = new HashMap(); while (allocatedContainerCount < containersRequestedAny diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java index 76e87f5b2c3..126dfcb4349 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java @@ -78,6 +78,7 @@ public class TestNMClient { List nodeReports = null; ApplicationAttemptId attemptId = null; int nodeCount = 3; + NMTokenCache nmTokenCache = null; @Before public void setup() throws YarnException, IOException { @@ -155,10 +156,16 @@ public class TestNMClient { .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken()); + //creating an instance NMTokenCase + nmTokenCache = new NMTokenCache(); + // start am rm client rmClient = (AMRMClientImpl) AMRMClient . createAMRMClient(); + + //setting an instance NMTokenCase + rmClient.setNMTokenCache(nmTokenCache); rmClient.init(conf); rmClient.start(); assertNotNull(rmClient); @@ -166,6 +173,9 @@ public class TestNMClient { // start am nm client nmClient = (NMClientImpl) NMClient.createNMClient(); + + //propagating the AMRMClient NMTokenCache instance + nmClient.setNMTokenCache(rmClient.getNMTokenCache()); nmClient.init(conf); nmClient.start(); assertNotNull(nmClient); @@ -258,7 +268,7 @@ public class TestNMClient { } if (!allocResponse.getNMTokens().isEmpty()) { for (NMToken token : allocResponse.getNMTokens()) { - NMTokenCache.setNMToken(token.getNodeId().toString(), + rmClient.getNMTokenCache().setToken(token.getNodeId().toString(), token.getToken()); } }