From 34dfd27833e596ce9e0662280dcc35bf35de4515 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Fri, 21 Jun 2013 00:09:45 +0000 Subject: [PATCH] YARN-851. Share NMTokens using NMTokenCache (api-based) between AMRMClient and NMClient instead of memory based approach which is used currently. Contributed by Omkar Vinit Joshi. svn merge --ignore-ancestry -c 1495247 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1495248 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/mapreduce/v2/app/AppContext.java | 3 - .../hadoop/mapreduce/v2/app/MRAppMaster.java | 7 -- .../app/launcher/ContainerLauncherImpl.java | 3 +- .../v2/app/rm/RMContainerAllocator.java | 3 +- .../mapreduce/v2/app/MockAppContext.java | 9 +- .../v2/app/TestRuntimeEstimators.java | 6 - .../app/launcher/TestContainerLauncher.java | 2 +- .../hadoop/mapreduce/v2/hs/JobHistory.java | 7 -- hadoop-yarn-project/CHANGES.txt | 4 + .../RegisterApplicationMasterResponse.java | 6 +- .../hadoop/yarn/api/records/Container.java | 2 - .../distributedshell/ApplicationMaster.java | 3 +- .../TestDistributedShell.java | 4 +- .../hadoop/yarn/client/api/AMRMClient.java | 12 -- .../hadoop/yarn/client/api/NMClient.java | 23 +--- .../hadoop/yarn/client/api/NMTokenCache.java | 103 ++++++++++++++++++ .../client/api/async/AMRMClientAsync.java | 13 --- .../yarn/client/api/async/NMClientAsync.java | 14 +-- .../api/async/impl/AMRMClientAsyncImpl.java | 15 --- .../api/async/impl/NMClientAsyncImpl.java | 10 +- .../yarn/client/api/impl/AMRMClientImpl.java | 14 +-- .../ContainerManagementProtocolProxy.java | 19 ++-- .../yarn/client/api/impl/NMClientImpl.java | 14 +-- .../yarn/client/api/impl/TestAMRMClient.java | 27 ++--- .../yarn/client/api/impl/TestNMClient.java | 10 +- 25 files changed, 163 insertions(+), 170 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMTokenCache.java diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java index cbb8e72497c..946d9c62c42 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java @@ -26,7 +26,6 @@ import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager; import org.apache.hadoop.yarn.util.Clock; @@ -62,6 +61,4 @@ public interface AppContext { Set getBlacklistedNodes(); ClientToAMTokenSecretManager getClientToAMTokenSecretManager(); - - Map getNMTokens(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 0925dadc381..6ff49809172 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -886,8 +886,6 @@ private class RunningAppContext implements AppContext { private final Configuration conf; private final ClusterInfo clusterInfo = new ClusterInfo(); private final ClientToAMTokenSecretManager clientToAMTokenSecretManager; - private final ConcurrentHashMap nmTokens = - new ConcurrentHashMap(); public RunningAppContext(Configuration config) { this.conf = config; @@ -954,11 +952,6 @@ public Set getBlacklistedNodes() { public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() { return clientToAMTokenSecretManager; } - - @Override - public Map getNMTokens() { - return this.nmTokens; - } } @SuppressWarnings("unchecked") diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index b2732c638f8..28508a92ed2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -235,8 +235,7 @@ protected void serviceInit(Configuration conf) throws Exception { MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT); LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize); super.serviceInit(conf); - cmProxy = - new ContainerManagementProtocolProxy(conf, context.getNMTokens()); + cmProxy = new ContainerManagementProtocolProxy(conf); } protected void serviceStart() throws Exception { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 1263582b6fc..dc134ebbe2e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.util.RackResolver; @@ -588,7 +589,7 @@ private List getResources() throws Exception { // Setting NMTokens if (response.getNMTokens() != null) { for (NMToken nmToken : response.getNMTokens()) { - getContext().getNMTokens().put(nmToken.getNodeId().toString(), + NMTokenCache.setNMToken(nmToken.getNodeId().toString(), nmToken.getToken()); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java index 521e28205d5..4b07236705b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java @@ -26,10 +26,9 @@ import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager; +import org.apache.hadoop.yarn.util.Clock; import com.google.common.collect.Maps; @@ -131,10 +130,4 @@ public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() { // Not implemented return null; } - - @Override - public Map getNMTokens() { - // Not Implemented - return null; - } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java index 1742d90e007..762dd572f3a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java @@ -862,11 +862,5 @@ public Set getBlacklistedNodes() { public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() { return null; } - - @Override - public Map getNMTokens() { - // Not Implemented - return null; - } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java index 0033490f136..563c31b36e3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java @@ -376,7 +376,7 @@ public ContainerManagementProtocolProxyData getCMProxy( containerId.getApplicationAttemptId(), NodeId.newInstance(addr.getHostName(), addr.getPort()), "user"); ContainerManagementProtocolProxy cmProxy = - new ContainerManagementProtocolProxy(conf, context.getNMTokens()); + new ContainerManagementProtocolProxy(conf); ContainerManagementProtocolProxyData proxy = cmProxy.new ContainerManagementProtocolProxyData( YarnRPC.create(conf), containerManagerBindAddr, containerId, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java index 8605bb4785d..2c1f3a26fff 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java @@ -44,7 +44,6 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -316,10 +315,4 @@ public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() { // Not implemented. return null; } - - @Override - public Map getNMTokens() { - // Not Implemented. - return null; - } } diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 8a74abb87dd..ebea13618d2 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -187,6 +187,10 @@ Release 2.1.0-beta - UNRELEASED ApplicationSubmissionContext to simplify the api. (Karthik Kambatla via acmurthy) + YARN-851. Share NMTokens using NMTokenCache (api-based) between AMRMClient + and NMClient instead of memory based approach which is used currently. (Omkar + Vinit Joshi via vinodkv) + NEW FEATURES YARN-482. FS: Extend SchedulingMode to intermediate queues. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java index 0517486bd8e..9c817b318ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java @@ -36,8 +36,9 @@ * *

The response contains critical details such as: *

    - *
  • Minimum capability for allocated resources in the cluster.
  • *
  • Maximum capability for allocated resources in the cluster.
  • + *
  • ApplicationACLs for the application.
  • + *
  • ClientToAMToken master key.
  • *
*

* @@ -50,11 +51,12 @@ public abstract class RegisterApplicationMasterResponse { @Unstable public static RegisterApplicationMasterResponse newInstance( Resource minCapability, Resource maxCapability, - Map acls) { + Map acls, ByteBuffer key) { RegisterApplicationMasterResponse response = Records.newRecord(RegisterApplicationMasterResponse.class); response.setMaximumResourceCapability(maxCapability); response.setApplicationACLs(acls); + response.setClientToAMTokenMasterKey(key); return response; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java index cb8d04bbda3..5cff2ecbed0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java @@ -44,12 +44,10 @@ *
  • HTTP uri of the node.
  • *
  • {@link Resource} allocated to the container.
  • *
  • {@link Priority} at which the container was allocated.
  • - *
  • {@link ContainerState} of the container.
  • *
  • * Container {@link Token} of the container, used to securely verify * authenticity of the allocation. *
  • - *
  • {@link ContainerStatus} of the container.
  • * *

    * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 638ce13f437..012af3ff9fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -448,8 +448,7 @@ public boolean run() throws YarnException, IOException { resourceManager.start(); containerListener = new NMCallbackHandler(); - nmClientAsync = - new NMClientAsyncImpl(containerListener, resourceManager.getNMTokens()); + nmClientAsync = new NMClientAsyncImpl(containerListener); nmClientAsync.init(conf); nmClientAsync.start(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index df333d230a0..8b05aa1890b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -100,7 +100,7 @@ public static void tearDown() throws IOException { } } - @Test(timeout=30000) + @Test(timeout=90000) public void testDSShell() throws Exception { String[] args = { @@ -128,7 +128,7 @@ public void testDSShell() throws Exception { } - @Test(timeout=30000) + @Test(timeout=90000) public void testDSShellWithInvalidArgs() throws Exception { Client client = new Client(new Configuration(yarnCluster.getConfig())); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index f1890a84724..bd0f16b63e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.Collection; import java.util.List; -import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -35,7 +34,6 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -249,14 +247,4 @@ public abstract List> getMatchingRequests( Priority priority, String resourceName, Resource capability); - - /** - * It returns the NMToken received on allocate call. It will not communicate - * with RM to get NMTokens. On allocate call whenever we receive new token - * along with container AMRMClient will cache this NMToken per node manager. - * This map returned should be shared with any application which is - * communicating with NodeManager (ex. NMClient) using NMTokens. If a new - * NMToken is received for the same node manager then it will be replaced. - */ - public abstract ConcurrentMap getNMTokens(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java index 00e513dc2de..57e7db5cd33 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java @@ -22,21 +22,17 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Map; -import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.impl.NMClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -46,30 +42,19 @@ public abstract class NMClient extends AbstractService { /** * Create a new instance of NMClient. - * @param nmTokens need to pass map of NMTokens which are received on - * {@link AMRMClient#allocate(float)} call as a part of - * {@link AllocateResponse}. - * key :- NodeAddr (host:port) - * Value :- Token {@link NMToken#getToken()} */ @Public - public static NMClient createNMClient(ConcurrentMap nmTokens) { - NMClient client = new NMClientImpl(nmTokens); + public static NMClient createNMClient() { + NMClient client = new NMClientImpl(); return client; } /** * Create a new instance of NMClient. - * @param nmTokens need to pass map of NMTokens which are received on - * {@link AMRMClient#allocate(float)} call as a part of - * {@link AllocateResponse}. - * key :- NodeAddr (host:port) - * Value :- Token {@link NMToken#getToken()} */ @Public - public static NMClient createNMClient(String name, - ConcurrentMap nmTokens) { - NMClient client = new NMClientImpl(name, nmTokens); + public static NMClient createNMClient(String name) { + NMClient client = new NMClientImpl(name); return client; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMTokenCache.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMTokenCache.java new file mode 100644 index 00000000000..c14a12c0919 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMTokenCache.java @@ -0,0 +1,103 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.client.api; + +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.yarn.api.records.Token; + +import com.google.common.annotations.VisibleForTesting; + +/** + * It manages NMTokens required for communicating with Node manager. Its a + * static token cache. + */ +@Public +@Evolving +public class NMTokenCache { + private static ConcurrentHashMap nmTokens; + + + static { + nmTokens = new ConcurrentHashMap(); + } + + /** + * Returns NMToken, null if absent + * @param nodeAddr + * @return {@link Token} NMToken required for communicating with node + * manager + */ + @Public + @Evolving + public static Token getNMToken(String nodeAddr) { + return nmTokens.get(nodeAddr); + } + + /** + * Sets the NMToken for node address + * @param nodeAddr node address (host:port) + * @param token NMToken + */ + @Public + @Evolving + public static void setNMToken(String nodeAddr, Token token) { + nmTokens.put(nodeAddr, token); + } + + /** + * Returns true if NMToken is present in cache. + */ + @Private + @VisibleForTesting + public static boolean containsNMToken(String nodeAddr) { + return nmTokens.containsKey(nodeAddr); + } + + /** + * Returns the number of NMTokens present in cache. + */ + @Private + @VisibleForTesting + public static int numberOfNMTokensInCache() { + return nmTokens.size(); + } + + /** + * Removes NMToken for specified node manager + * @param nodeAddr node address (host:port) + */ + @Private + @VisibleForTesting + public static void removeNMToken(String nodeAddr) { + nmTokens.remove(nodeAddr); + } + + /** + * It will remove all the nm tokens from its cache + */ + @Private + @VisibleForTesting + public static void clearCache() { + nmTokens.clear(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java index 8d551dcd0a4..ae781b6003a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.Collection; import java.util.List; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -37,7 +36,6 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; @@ -198,17 +196,6 @@ public abstract void unregisterApplicationMaster( */ public abstract int getClusterNodeCount(); - /** - * It returns the NMToken received on allocate call. It will not communicate - * with RM to get NMTokens. On allocate call whenever we receive new token - * along with new container AMRMClientAsync will cache this NMToken per node - * manager. This map returned should be shared with any application which is - * communicating with NodeManager (ex. NMClient / NMClientAsync) using - * NMTokens. If a new NMToken is received for the same node manager - * then it will be replaced. - */ - public abstract ConcurrentMap getNMTokens(); - public interface CallbackHandler { /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java index 507f8d9906f..5cb504d3dd5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java @@ -112,18 +112,16 @@ public abstract class NMClientAsync extends AbstractService { protected CallbackHandler callbackHandler; public static NMClientAsync createNMClientAsync( - CallbackHandler callbackHandler, ConcurrentMap nmTokens) { - return new NMClientAsyncImpl(callbackHandler, nmTokens); + CallbackHandler callbackHandler) { + return new NMClientAsyncImpl(callbackHandler); } - protected NMClientAsync(CallbackHandler callbackHandler, - ConcurrentMap nmTokens) { - this (NMClientAsync.class.getName(), callbackHandler, nmTokens); + protected NMClientAsync(CallbackHandler callbackHandler) { + this (NMClientAsync.class.getName(), callbackHandler); } - protected NMClientAsync(String name, CallbackHandler callbackHandler, - ConcurrentMap nmTokens) { - this (name, new NMClientImpl(nmTokens), callbackHandler); + protected NMClientAsync(String name, CallbackHandler callbackHandler) { + this (name, new NMClientImpl(), callbackHandler); } @Private diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index e667e37c240..cc3969dbe30 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -22,7 +22,6 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.logging.Log; @@ -40,7 +39,6 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; @@ -215,19 +213,6 @@ public Resource getAvailableResources() { public int getClusterNodeCount() { return client.getClusterNodeCount(); } - - /** - * It returns the NMToken received on allocate call. It will not communicate - * with RM to get NMTokens. On allocate call whenever we receive new token - * along with new container AMRMClientAsync will cache this NMToken per node - * manager. This map returned should be shared with any application which is - * communicating with NodeManager (ex. NMClient / NMClientAsync) using - * NMTokens. If a new NMToken is received for the same node manager - * then it will be replaced. - */ - public ConcurrentMap getNMTokens() { - return client.getNMTokens(); - } private class HeartbeatThread extends Thread { public HeartbeatThread() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java index 7f7df1a8564..700a509b65e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java @@ -82,14 +82,12 @@ public class NMClientAsyncImpl extends NMClientAsync { protected ConcurrentMap containers = new ConcurrentHashMap(); - public NMClientAsyncImpl(CallbackHandler callbackHandler, - ConcurrentMap nmTokens) { - this(NMClientAsync.class.getName(), callbackHandler, nmTokens); + public NMClientAsyncImpl(CallbackHandler callbackHandler) { + this(NMClientAsync.class.getName(), callbackHandler); } - public NMClientAsyncImpl(String name, CallbackHandler callbackHandler, - ConcurrentMap nmTokens) { - this(name, new NMClientImpl(nmTokens), callbackHandler); + public NMClientAsyncImpl(String name, CallbackHandler callbackHandler) { + this(name, new NMClientImpl(), callbackHandler); } @Private diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 74c86b9826e..68cc2870d3d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -34,7 +34,6 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -56,8 +55,8 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -82,7 +81,6 @@ public class AMRMClientImpl extends AMRMClient { RecordFactoryProvider.getRecordFactory(null); private int lastResponseId = 0; - private ConcurrentHashMap nmTokens; protected ApplicationMasterProtocol rmClient; protected final ApplicationAttemptId appAttemptId; @@ -158,7 +156,6 @@ static boolean canFit(Resource arg0, Resource arg1) { public AMRMClientImpl(ApplicationAttemptId appAttemptId) { super(AMRMClientImpl.class.getName()); this.appAttemptId = appAttemptId; - this.nmTokens = new ConcurrentHashMap(); } @Override @@ -285,12 +282,12 @@ public AllocateResponse allocate(float progressIndicator) protected void populateNMTokens(AllocateResponse allocateResponse) { for (NMToken token : allocateResponse.getNMTokens()) { String nodeId = token.getNodeId().toString(); - if (nmTokens.containsKey(nodeId)) { + if (NMTokenCache.containsNMToken(nodeId)) { LOG.debug("Replacing token for : " + nodeId); } else { LOG.debug("Received new token for : " + nodeId); } - nmTokens.put(nodeId, token.getToken()); + NMTokenCache.setNMToken(nodeId, token.getToken()); } } @@ -577,9 +574,4 @@ private void decResourceRequest(Priority priority, + " #asks=" + ask.size()); } } - - @Override - public ConcurrentHashMap getNMTokens() { - return nmTokens; - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java index a22e200fcbf..4ca44e12a42 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -36,6 +35,7 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.ipc.YarnRPC; @@ -54,13 +54,10 @@ public class ContainerManagementProtocolProxy { private final int maxConnectedNMs; private final LinkedHashMap cmProxy; - private Map nmTokens; private final Configuration conf; private final YarnRPC rpc; - public ContainerManagementProtocolProxy(Configuration conf, - Map nmTokens) { - this.nmTokens = nmTokens; + public ContainerManagementProtocolProxy(Configuration conf) { this.conf = conf; maxConnectedNMs = @@ -86,10 +83,10 @@ public synchronized ContainerManagementProtocolProxyData getProxy( // This get call will update the map which is working as LRU cache. ContainerManagementProtocolProxyData proxy = cmProxy.get(containerManagerBindAddr); - + while (proxy != null && !proxy.token.getIdentifier().equals( - nmTokens.get(containerManagerBindAddr).getIdentifier())) { + NMTokenCache.getNMToken(containerManagerBindAddr).getIdentifier())) { LOG.info("Refreshing proxy as NMToken got updated for node : " + containerManagerBindAddr); // Token is updated. check if anyone has already tried closing it. @@ -112,7 +109,7 @@ public synchronized ContainerManagementProtocolProxyData getProxy( if (proxy == null) { proxy = new ContainerManagementProtocolProxyData(rpc, containerManagerBindAddr, - containerId, nmTokens.get(containerManagerBindAddr)); + containerId, NMTokenCache.getNMToken(containerManagerBindAddr)); if (cmProxy.size() > maxConnectedNMs) { // Number of existing proxy exceed the limit. String cmAddr = cmProxy.keySet().iterator().next(); @@ -172,10 +169,6 @@ public synchronized void stopAllProxies() { cmProxy.clear(); } - public synchronized void setNMTokens(Map nmTokens) { - this.nmTokens = nmTokens; - } - public class ContainerManagementProtocolProxyData { private final String containerManagerBindAddr; private final ContainerManagementProtocol proxy; @@ -201,10 +194,12 @@ public ContainerManagementProtocolProxyData(YarnRPC rpc, protected ContainerManagementProtocol newProxy(final YarnRPC rpc, String containerManagerBindAddr, ContainerId containerId, Token token) throws InvalidToken { + if (token == null) { throw new InvalidToken("No NMToken sent for " + containerManagerBindAddr); } + final InetSocketAddress cmAddr = NetUtils.createSocketAddr(containerManagerBindAddr); LOG.info("Opening proxy : " + containerManagerBindAddr); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java index 02cfbfb953c..54a73faf74f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java @@ -81,18 +81,15 @@ public class NMClientImpl extends NMClient { new ConcurrentHashMap(); //enabled by default - private final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true); - private ContainerManagementProtocolProxy cmProxy; - private ConcurrentMap nmTokens; + private final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true); + private ContainerManagementProtocolProxy cmProxy; - public NMClientImpl(ConcurrentMap nmTokens) { + public NMClientImpl() { super(NMClientImpl.class.getName()); - this.nmTokens = nmTokens; } - public NMClientImpl(String name, ConcurrentMap nmTokens) { + public NMClientImpl(String name) { super(name); - this.nmTokens = nmTokens; } @Override @@ -126,8 +123,7 @@ protected synchronized void cleanupRunningContainers() { @Override protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); - cmProxy = - new ContainerManagementProtocolProxy(conf, nmTokens); + cmProxy = new ContainerManagementProtocolProxy(conf); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 5955f26b0bc..4c034ae0218 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -26,11 +26,9 @@ import java.io.IOException; import java.util.Collection; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; import junit.framework.Assert; @@ -50,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -57,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.AMRMClient.StoredContainerRequest; @@ -488,8 +488,8 @@ private void testAllocation(final AMRMClientImpl amClient) int iterationsLeft = 2; Set releases = new TreeSet(); - ConcurrentHashMap nmTokens = amClient.getNMTokens(); - Assert.assertEquals(0, nmTokens.size()); + NMTokenCache.clearCache(); + Assert.assertEquals(0, NMTokenCache.numberOfNMTokensInCache()); HashMap receivedNMTokens = new HashMap(); while (allocatedContainerCount < containersRequestedAny @@ -505,19 +505,13 @@ private void testAllocation(final AMRMClientImpl amClient) releases.add(rejectContainerId); amClient.releaseAssignedContainer(rejectContainerId); } - Assert.assertEquals(nmTokens.size(), amClient.getNMTokens().size()); - Iterator nodeI = nmTokens.keySet().iterator(); - while (nodeI.hasNext()) { - String nodeId = nodeI.next(); - if (!receivedNMTokens.containsKey(nodeId)) { - receivedNMTokens.put(nodeId, nmTokens.get(nodeId)); - } else { - Assert.fail("Received token again for : " + nodeId); + + for (NMToken token : allocResponse.getNMTokens()) { + String nodeID = token.getNodeId().toString(); + if (receivedNMTokens.containsKey(nodeID)) { + Assert.fail("Received token again for : " + nodeID); } - } - nodeI = receivedNMTokens.keySet().iterator(); - while (nodeI.hasNext()) { - nmTokens.remove(nodeI.next()); + receivedNMTokens.put(nodeID, token.getToken()); } if(allocatedContainerCount < containersRequestedAny) { @@ -526,7 +520,6 @@ private void testAllocation(final AMRMClientImpl amClient) } } - Assert.assertEquals(0, amClient.getNMTokens().size()); // Should receive atleast 1 token Assert.assertTrue(receivedNMTokens.size() > 0 && receivedNMTokens.size() <= nodeCount); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java index 5bcb428c3e0..dc6367b10b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -74,7 +75,6 @@ public class TestNMClient { List nodeReports = null; ApplicationAttemptId attemptId = null; int nodeCount = 3; - ConcurrentHashMap nmTokens; @Before public void setup() throws YarnException, IOException { @@ -136,7 +136,6 @@ public void setup() throws YarnException, IOException { if (iterationsLeft == 0) { fail("Application hasn't bee started"); } - nmTokens = new ConcurrentHashMap(); // start am rm client rmClient = @@ -148,7 +147,7 @@ public void setup() throws YarnException, IOException { assertEquals(STATE.STARTED, rmClient.getServiceState()); // start am nm client - nmClient = (NMClientImpl) NMClient.createNMClient(nmTokens); + nmClient = (NMClientImpl) NMClient.createNMClient(); nmClient.init(conf); nmClient.start(); assertNotNull(nmClient); @@ -173,7 +172,7 @@ private void stopNmClient(boolean stopContainers) { nmClient.stop(); } - @Test (timeout = 60000) + @Test (timeout = 180000) public void testNMClientNoCleanupOnStop() throws YarnException, IOException { @@ -241,7 +240,8 @@ private Set allocateContainers( } if (!allocResponse.getNMTokens().isEmpty()) { for (NMToken token : allocResponse.getNMTokens()) { - nmTokens.put(token.getNodeId().toString(), token.getToken()); + NMTokenCache.setNMToken(token.getNodeId().toString(), + token.getToken()); } } if(allocatedContainerCount < containersRequestedAny) {