From 0c6db6af6edcd74f21582b2a61f0c42aaf9a7b32 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 18 Mar 2014 02:53:50 +0000 Subject: [PATCH] YARN-1512. Enhanced CapacityScheduler to be able to decouple scheduling from node-heartbeats. Contributed by Arun C Murthy. svn merge --ignore-ancestry -c 1578722 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1578724 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../dev-support/findbugs-exclude.xml | 6 + .../scheduler/capacity/CapacityScheduler.java | 129 +++++++++++++++++- .../CapacitySchedulerConfiguration.java | 21 +++ .../capacity/TestCapacityScheduler.java | 36 +++-- 5 files changed, 180 insertions(+), 15 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 6e1effa0b94..cefcb5148b9 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -297,6 +297,9 @@ Release 2.4.0 - UNRELEASED YARN-1824. Improved NodeManager and clients to be able to handle cross platform application submissions. (Jian He via vinodkv) + YARN-1512. Enhanced CapacityScheduler to be able to decouple scheduling from + node-heartbeats. 
(Arun C Murthy via vinodkv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 0fac0b98f1f..0fa1a9e8af1 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -172,6 +172,12 @@ + + + + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 30b2fd6fb30..e28c18c0d7a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -21,11 +21,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; +import java.util.Collection; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -194,6 +197,18 @@ public class CapacityScheduler extends AbstractYarnScheduler private ResourceCalculator calculator; private boolean usePortForNodeName; + private boolean scheduleAsynchronously; + private AsyncScheduleThread asyncSchedulerThread; + 
+ /** + * EXPERT: sleep in ms after each asynchronous scheduling pass + */ + private long asyncScheduleInterval; + private static final String ASYNC_SCHEDULER_INTERVAL = + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX + + ".scheduling-interval-ms"; + private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5; + public CapacityScheduler() {} @Override @@ -272,11 +287,23 @@ public class CapacityScheduler extends AbstractYarnScheduler initializeQueues(this.conf); + scheduleAsynchronously = this.conf.getScheduleAynschronously(); + asyncScheduleInterval = + this.conf.getLong(ASYNC_SCHEDULER_INTERVAL, + DEFAULT_ASYNC_SCHEDULER_INTERVAL); + if (scheduleAsynchronously) { + asyncSchedulerThread = new AsyncScheduleThread(this); + asyncSchedulerThread.start(); + } + initialized = true; LOG.info("Initialized CapacityScheduler with " + "calculator=" + getResourceCalculator().getClass() + ", " + "minimumAllocation=<" + getMinimumResourceCapability() + ">, " + - "maximumAllocation=<" + getMaximumResourceCapability() + ">"); + "maximumAllocation=<" + getMaximumResourceCapability() + ">, " + + "asynchronousScheduling=" + scheduleAsynchronously + ", " + + "asyncScheduleInterval=" + asyncScheduleInterval + "ms"); + } else { CapacitySchedulerConfiguration oldConf = this.conf; this.conf = loadCapacitySchedulerConfiguration(configuration); @@ -290,7 +317,69 @@ public class CapacityScheduler extends AbstractYarnScheduler } } } + + long getAsyncScheduleInterval() { + return asyncScheduleInterval; + } + private final static Random random = new Random(System.currentTimeMillis()); + + /** + * Schedule on all nodes by starting at a random point. 
+ * @param cs scheduler whose nodes are each offered containers, after which this call sleeps for its async interval + */ + static void schedule(CapacityScheduler cs) { + // First randomize the start point; nextInt(0) throws, so clamp the bound when no nodes are registered + int current = 0; + Collection nodes = cs.getAllNodes().values(); + int start = random.nextInt(Math.max(1, nodes.size())); + for (FiCaSchedulerNode node : nodes) { + if (current++ >= start) { + cs.allocateContainersToNode(node); + } + } + // Now, just get everyone to be safe + for (FiCaSchedulerNode node : nodes) { + cs.allocateContainersToNode(node); + } + try { + Thread.sleep(cs.getAsyncScheduleInterval()); + } catch (InterruptedException e) {} + } + + static class AsyncScheduleThread extends Thread { + + private final CapacityScheduler cs; + private final AtomicBoolean runSchedules = new AtomicBoolean(false); + + public AsyncScheduleThread(CapacityScheduler cs) { + this.cs = cs; + setDaemon(true); + } + + @Override + public void run() { + while (true) { + if (!runSchedules.get()) { + try { + Thread.sleep(100); + } catch (InterruptedException ie) {} + } else { + schedule(cs); + } + } + } + + public void beginSchedule() { + runSchedules.set(true); + } + + public void suspendSchedule() { + runSchedules.set(false); + } + + } + @Private public static final String ROOT_QUEUE = CapacitySchedulerConfiguration.PREFIX + CapacitySchedulerConfiguration.ROOT; @@ -696,6 +785,9 @@ public class CapacityScheduler extends AbstractYarnScheduler LOG.debug("Node being looked for scheduling " + nm + " availableResource: " + node.getAvailableResource()); } + } + + private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { // Assign new containers... // 1. 
Check for reserved applications @@ -708,7 +800,8 @@ public class CapacityScheduler extends AbstractYarnScheduler // Try to fulfill the reservation LOG.info("Trying to fulfill reservation for application " + - reservedApplication.getApplicationId() + " on node: " + nm); + reservedApplication.getApplicationId() + " on node: " + + node.getNodeID()); LeafQueue queue = ((LeafQueue)reservedApplication.getQueue()); CSAssignment assignment = queue.assignContainers(clusterResource, node); @@ -729,9 +822,16 @@ public class CapacityScheduler extends AbstractYarnScheduler // Try to schedule more if there are no reservations to fulfill if (node.getReservedContainer() == null) { - root.assignContainers(clusterResource, node); + if (Resources.greaterThanOrEqual(calculator, getClusterResources(), + node.getAvailableResource(), minimumAllocation)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to schedule on node: " + node.getNodeName() + + ", available: " + node.getAvailableResource()); + } + root.assignContainers(clusterResource, node); + } } else { - LOG.info("Skipping scheduling since node " + nm + + LOG.info("Skipping scheduling since node " + node.getNodeID() + " is reserved by application " + node.getReservedContainer().getContainerId().getApplicationAttemptId() ); @@ -772,7 +872,11 @@ public class CapacityScheduler extends AbstractYarnScheduler case NODE_UPDATE: { NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; - nodeUpdate(nodeUpdatedEvent.getRMNode()); + RMNode node = nodeUpdatedEvent.getRMNode(); + nodeUpdate(node); + if (!scheduleAsynchronously) { + allocateContainersToNode(getNode(node.getNodeID())); + } } break; case APP_ADDED: @@ -831,6 +935,10 @@ public class CapacityScheduler extends AbstractYarnScheduler ++numNodeManagers; LOG.info("Added node " + nodeManager.getNodeAddress() + " clusterResource: " + clusterResource); + + if (scheduleAsynchronously && numNodeManagers == 1) { + asyncSchedulerThread.beginSchedule(); + } } private 
synchronized void removeNode(RMNode nodeInfo) { @@ -842,6 +950,10 @@ public class CapacityScheduler extends AbstractYarnScheduler root.updateClusterResource(clusterResource); --numNodeManagers; + if (scheduleAsynchronously && numNodeManagers == 0) { + asyncSchedulerThread.suspendSchedule(); + } + // Remove running containers List runningContainers = node.getRunningContainers(); for (RMContainer container : runningContainers) { @@ -931,7 +1043,12 @@ public class CapacityScheduler extends AbstractYarnScheduler FiCaSchedulerNode getNode(NodeId nodeId) { return nodes.get(nodeId); } - + + @Lock(Lock.NoLock.class) + Map getAllNodes() { + return nodes; + } + @Override public RMContainer getRMContainer(ContainerId containerId) { FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 267f819ffaf..6fe695ecda2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -135,6 +135,17 @@ public class CapacitySchedulerConfiguration extends Configuration { @Private public static final int DEFAULT_NODE_LOCALITY_DELAY = -1; + @Private + public static final String SCHEDULE_ASYNCHRONOUSLY_PREFIX = + PREFIX + "schedule-asynchronously"; + + @Private + public static final String 
SCHEDULE_ASYNCHRONOUSLY_ENABLE = + SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".enable"; + + @Private + public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = false; + public CapacitySchedulerConfiguration() { this(new Configuration()); } @@ -357,4 +368,14 @@ public class CapacitySchedulerConfiguration extends Configuration { resourceCalculatorClass, ResourceCalculator.class); } + + public boolean getScheduleAynschronously() { + return getBoolean(SCHEDULE_ASYNCHRONOUSLY_ENABLE, + DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE); + } + + public void setScheduleAynschronously(boolean async) { + setBoolean(SCHEDULE_ASYNCHRONOUSLY_ENABLE, async); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 254ace487ba..71cf03e1490 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -19,20 +19,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.IOException; -import java.lang.reflect.Constructor; import java.util.Collections; import 
java.util.Comparator; import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Assert; @@ -61,11 +56,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.Task; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -650,4 +642,32 @@ public class TestCapacityScheduler { cs.getSchedulerApplications(), cs, "a1"); Assert.assertEquals("a1", app.getQueue().getQueueName()); } - } + + @Test + public void testAsyncScheduling() throws Exception { + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + rm.start(); + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + final int NODES = 100; + + // Register nodes + for (int i=0; i < NODES; ++i) { + String host = "192.168.1." + i; + RMNode node = + MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host); + cs.handle(new NodeAddedSchedulerEvent(node)); + } + + // Now directly exercise the scheduling loop + for (int i=0; i < NODES; ++i) { + CapacityScheduler.schedule(cs); + } + // Stop the RM that was started above + rm.stop(); + } + +}