From 8317fd5de6d3e2ee6f78625b2b11ba33fafe09da Mon Sep 17 00:00:00 2001
From: Vinod Kumar Vavilapalli
Date: Wed, 25 Jun 2014 21:57:28 +0000
Subject: [PATCH] YARN-2171. Improved CapacityScheduling to not lock on
 nodemanager-count when AMs heartbeat in. Contributed by Jason Lowe.

svn merge --ignore-ancestry -c 1605616 ../../trunk/

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1605617 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-yarn-project/CHANGES.txt               |   3 +
 .../scheduler/capacity/CapacityScheduler.java |  16 +-
 .../capacity/TestCapacityScheduler.java       | 142 ++++++++++++++++++
 3 files changed, 154 insertions(+), 7 deletions(-)

diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 76982152a66..84a75eaf612 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -177,6 +177,9 @@ Release 2.5.0 - UNRELEASED
     YARN-2152. Added missing information into ContainerTokenIdentifier so that
     NodeManagers can report the same to RM when RM restarts. (Jian He via vinodkv)
 
+    YARN-2171. Improved CapacityScheduling to not lock on nodemanager-count when
+    AMs heartbeat in. (Jason Lowe via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 5b982ea23f7..92727e37f2e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import com.google.common.base.Preconditions;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -30,6 +31,7 @@
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -180,7 +182,7 @@ public Configuration getConf() {
 
   private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
 
-  private int numNodeManagers = 0;
+  private AtomicInteger numNodeManagers = new AtomicInteger(0);
 
   private ResourceCalculator calculator;
   private boolean usePortForNodeName;
@@ -236,8 +238,8 @@ public Comparator<CSQueue> getQueueComparator() {
   }
 
   @Override
-  public synchronized int getNumClusterNodes() {
-    return numNodeManagers;
+  public int getNumClusterNodes() {
+    return numNodeManagers.get();
   }
 
   @Override
@@ -953,11 +955,11 @@ private synchronized void addNode(RMNode nodeManager) {
         usePortForNodeName));
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
     root.updateClusterResource(clusterResource);
-    ++numNodeManagers;
+    int numNodes = numNodeManagers.incrementAndGet();
     LOG.info("Added node " + nodeManager.getNodeAddress() +
         " clusterResource: " + clusterResource);
 
-    if (scheduleAsynchronously && numNodeManagers == 1) {
+    if (scheduleAsynchronously && numNodes == 1) {
       asyncSchedulerThread.beginSchedule();
     }
   }
@@ -969,9 +971,9 @@ private synchronized void removeNode(RMNode nodeInfo) {
     }
     Resources.subtractFrom(clusterResource,
         node.getRMNode().getTotalCapability());
     root.updateClusterResource(clusterResource);
-    --numNodeManagers;
+    int numNodes = numNodeManagers.decrementAndGet();
 
-    if (scheduleAsynchronously && numNodeManagers == 0) {
+    if (scheduleAsynchronously && numNodes == 0) {
       asyncSchedulerThread.suspendSchedule();
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 6322df3ffe9..c3b1d575a5e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -25,15 +25,29 @@
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.LocalConfigurationProvider;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -46,13 +60,20 @@
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.resourcemanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.Task;
+import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
+import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -686,4 +707,125 @@ public void testAsyncScheduling() throws Exception {
     }
   }
 
+  @Test(timeout = 30000)
+  public void testAllocateDoesNotBlockOnSchedulerLock() throws Exception {
+    final YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MyContainerManager containerManager = new MyContainerManager();
+    final MockRMWithAMS rm =
+        new MockRMWithAMS(conf, containerManager);
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("localhost:1234", 5120);
+
+    Map<ApplicationAccessType, String> acls =
+        new HashMap<ApplicationAccessType, String>(2);
+    acls.put(ApplicationAccessType.VIEW_APP, "*");
+    RMApp app = rm.submitApp(1024, "appname", "appuser", acls);
+
+    nm1.nodeHeartbeat(true);
+
+    RMAppAttempt attempt = app.getCurrentAppAttempt();
+    ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
+    int msecToWait = 10000;
+    int msecToSleep = 100;
+    while (attempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED
+        && msecToWait > 0) {
+      LOG.info("Waiting for AppAttempt to reach LAUNCHED state. "
+          + "Current state is " + attempt.getAppAttemptState());
+      Thread.sleep(msecToSleep);
+      msecToWait -= msecToSleep;
+    }
+    Assert.assertEquals(attempt.getAppAttemptState(),
+        RMAppAttemptState.LAUNCHED);
+
+    // Create a client to the RM.
+    final YarnRPC rpc = YarnRPC.create(conf);
+
+    UserGroupInformation currentUser =
+        UserGroupInformation.createRemoteUser(applicationAttemptId.toString());
+    Credentials credentials = containerManager.getContainerCredentials();
+    final InetSocketAddress rmBindAddress =
+        rm.getApplicationMasterService().getBindAddress();
+    Token<? extends TokenIdentifier> amRMToken =
+        MockRMWithAMS.setupAndReturnAMRMToken(rmBindAddress,
+            credentials.getAllTokens());
+    currentUser.addToken(amRMToken);
+    ApplicationMasterProtocol client =
+        currentUser.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
+          @Override
+          public ApplicationMasterProtocol run() {
+            return (ApplicationMasterProtocol) rpc.getProxy(
+                ApplicationMasterProtocol.class, rmBindAddress, conf);
+          }
+        });
+
+    RegisterApplicationMasterRequest request =
+        RegisterApplicationMasterRequest.newInstance("localhost", 12345, "");
+    client.registerApplicationMaster(request);
+
+    // grab the scheduler lock from another thread
+    // and verify an allocate call in this thread doesn't block on it
+    final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    final CyclicBarrier barrier = new CyclicBarrier(2);
+    Thread otherThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        synchronized(cs) {
+          try {
+            barrier.await();
+            barrier.await();
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          } catch (BrokenBarrierException e) {
+            e.printStackTrace();
+          }
+        }
+      }
+    });
+    otherThread.start();
+    barrier.await();
+    AllocateRequest allocateRequest =
+        AllocateRequest.newInstance(0, 0.0f, null, null, null);
+    client.allocate(allocateRequest);
+    barrier.await();
+    otherThread.join();
+
+    rm.stop();
+  }
+
+  @Test
+  public void testNumClusterNodes() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    CapacityScheduler cs = new CapacityScheduler();
+    cs.setConf(conf);
+    RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
+        null, new RMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInRM(conf),
+        new ClientToAMTokenSecretManagerInRM(), null);
+    cs.setRMContext(rmContext);
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    cs.init(csConf);
+    cs.start();
+    assertEquals(0, cs.getNumClusterNodes());
+
+    RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
+    RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2);
+    cs.handle(new NodeAddedSchedulerEvent(n1));
+    cs.handle(new NodeAddedSchedulerEvent(n2));
+    assertEquals(2, cs.getNumClusterNodes());
+
+    cs.handle(new NodeRemovedSchedulerEvent(n1));
+    assertEquals(1, cs.getNumClusterNodes());
+    cs.handle(new NodeAddedSchedulerEvent(n1));
+    assertEquals(2, cs.getNumClusterNodes());
+    cs.handle(new NodeRemovedSchedulerEvent(n2));
+    cs.handle(new NodeRemovedSchedulerEvent(n1));
+    assertEquals(0, cs.getNumClusterNodes());
+
+    cs.stop();
+  }
 }
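
Reviewer note: the crux of the patch is a standard lock-contention fix. A counter that
was previously only readable while holding the CapacityScheduler monitor becomes an
AtomicInteger, so the hot read path (getNumClusterNodes(), reached on every AM
heartbeat through allocate) no longer queues behind long-running synchronized
scheduler methods such as addNode()/removeNode(). Below is a minimal standalone
sketch of that before/after shape; the class and method names (NodeTracker,
getNumNodes, and so on) are illustrative stand-ins, not Hadoop APIs.

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the scheduler; all names here are illustrative.
class NodeTracker {
  // Before the fix this was a plain int guarded by the object monitor, so a
  // synchronized getter contended with every other synchronized method.
  // After the fix it is an AtomicInteger, readable without the monitor.
  private final AtomicInteger numNodes = new AtomicInteger(0);

  // Mutators still run under the lock, mirroring addNode()/removeNode() in
  // the diff, which stay synchronized for the rest of their work.
  synchronized void addNode() {
    // Capture the post-increment value in a local; re-reading the field
    // later could observe another thread's update now that readers are
    // lock-free. This is why the patch introduces
    // "int numNodes = numNodeManagers.incrementAndGet();".
    int n = numNodes.incrementAndGet();
    if (n == 1) {
      // first node registered, e.g. begin async scheduling
    }
  }

  synchronized void removeNode() {
    int n = numNodes.decrementAndGet();
    if (n == 0) {
      // last node removed, e.g. suspend async scheduling
    }
  }

  // Lock-free read: the call AM heartbeats hit. It no longer blocks while
  // addNode()/removeNode() hold the monitor.
  int getNumNodes() {
    return numNodes.get();
  }
}

The new testAllocateDoesNotBlockOnSchedulerLock test exercises exactly this property
end-to-end: it parks a second thread inside synchronized(cs) behind a CyclicBarrier
and verifies an AM allocate call still completes, which would deadlock and trip the
30-second timeout if getNumClusterNodes() were still synchronized.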