From ae05623a75803d4e12a902ac4a24187540f56699 Mon Sep 17 00:00:00 2001 From: Sanford Ryza Date: Tue, 1 Oct 2013 19:54:50 +0000 Subject: [PATCH] YARN-1010. FairScheduler: decouple container scheduling from nodemanager heartbeats. (Wei Yan via Sandy Ryza) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1528192 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/fair/AppSchedulable.java | 17 +++- .../scheduler/fair/FSSchedulerApp.java | 56 ++++++++++++- .../scheduler/fair/FairScheduler.java | 78 ++++++++++++++++++- .../fair/FairSchedulerConfiguration.java | 32 ++++++++ .../scheduler/fair/TestFSSchedulerApp.java | 71 +++++++++++++++++ .../scheduler/fair/TestFairScheduler.java | 57 ++++++++++++++ 7 files changed, 305 insertions(+), 9 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 191cf58be67..f2a42d056b0 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -26,6 +26,9 @@ Release 2.3.0 - UNRELEASED YARN-1021. Yarn Scheduler Load Simulator. (ywskycn via tucu) + YARN-1010. FairScheduler: decouple container scheduling from nodemanager + heartbeats. (Wei Yan via Sandy Ryza) + IMPROVEMENTS YARN-905. Add state filters to nodes CLI (Wei Yan via Sandy Ryza) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java index 415f125e1b6..baf5db21c81 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java @@ -310,10 +310,19 @@ public class AppSchedulable extends Schedulable { + localRequest); } - NodeType allowedLocality = app.getAllowedLocalityLevel(priority, - scheduler.getNumClusterNodes(), scheduler.getNodeLocalityThreshold(), - scheduler.getRackLocalityThreshold()); - + NodeType allowedLocality; + if (scheduler.isContinuousSchedulingEnabled()) { + allowedLocality = app.getAllowedLocalityLevelByTime(priority, + scheduler.getNodeLocalityDelayMs(), + scheduler.getRackLocalityDelayMs(), + scheduler.getClock().getTime()); + } else { + allowedLocality = app.getAllowedLocalityLevel(priority, + scheduler.getNumClusterNodes(), + scheduler.getNodeLocalityThreshold(), + scheduler.getRackLocalityThreshold()); + } + if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0 && localRequest != null && localRequest.getNumContainers() != 0) { return assignContainer(node, priority, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java index 670e9616a81..1fe400ee07d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java @@ -464,7 +464,12 @@ public class FSSchedulerApp extends SchedulerApplication { * @param priority The priority of the container scheduled. */ synchronized public void resetSchedulingOpportunities(Priority priority) { - lastScheduledContainer.put(priority, System.currentTimeMillis()); + resetSchedulingOpportunities(priority, System.currentTimeMillis()); + } + // used for continuous scheduling + synchronized public void resetSchedulingOpportunities(Priority priority, + long currentTimeMs) { + lastScheduledContainer.put(priority, currentTimeMs); schedulingOpportunities.setCount(priority, 0); } @@ -513,6 +518,55 @@ public class FSSchedulerApp extends SchedulerApplication { return allowedLocalityLevel.get(priority); } + /** + * Return the level at which we are allowed to schedule containers. + * Given the thresholds indicating how much time passed before relaxing + * scheduling constraints. + */ + public synchronized NodeType getAllowedLocalityLevelByTime(Priority priority, + long nodeLocalityDelayMs, long rackLocalityDelayMs, + long currentTimeMs) { + + // if not being used, can schedule anywhere + if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) { + return NodeType.OFF_SWITCH; + } + + // default level is NODE_LOCAL + if (! allowedLocalityLevel.containsKey(priority)) { + allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL); + return NodeType.NODE_LOCAL; + } + + NodeType allowed = allowedLocalityLevel.get(priority); + + // if level is already most liberal, we're done + if (allowed.equals(NodeType.OFF_SWITCH)) { + return NodeType.OFF_SWITCH; + } + + // check waiting time + long waitTime = currentTimeMs; + if (lastScheduledContainer.containsKey(priority)) { + waitTime -= lastScheduledContainer.get(priority); + } else { + waitTime -= appSchedulable.getStartTime(); + } + + long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ? + nodeLocalityDelayMs : rackLocalityDelayMs; + + if (waitTime > thresholdTime) { + if (allowed.equals(NodeType.NODE_LOCAL)) { + allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL); + resetSchedulingOpportunities(priority, currentTimeMs); + } else if (allowed.equals(NodeType.RACK_LOCAL)) { + allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH); + resetSchedulingOpportunities(priority, currentTimeMs); + } + } + return allowedLocalityLevel.get(priority); + } synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node, Priority priority, ResourceRequest request, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 16e7fd695af..fa4c21805be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -179,8 +179,12 @@ public class FairScheduler implements ResourceScheduler { protected boolean preemptionEnabled; protected boolean sizeBasedWeight; // Give larger weights to larger jobs protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster + protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not + protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling protected double nodeLocalityThreshold; // Cluster threshold for node locality protected double rackLocalityThreshold; // Cluster threshold for rack locality + protected long nodeLocalityDelayMs; // Delay for node locality + protected long rackLocalityDelayMs; // Delay for rack locality private FairSchedulerEventLog eventLog; // Machine-readable event log protected boolean assignMultiple; // Allocate multiple containers per // heartbeat @@ -582,6 +586,22 @@ public class FairScheduler implements ResourceScheduler { return rackLocalityThreshold; } + public long getNodeLocalityDelayMs() { + return nodeLocalityDelayMs; + } + + public long getRackLocalityDelayMs() { + return rackLocalityDelayMs; + } + + public boolean isContinuousSchedulingEnabled() { + return continuousSchedulingEnabled; + } + + public synchronized int getContinuousSchedulingSleepMs() { + return continuousSchedulingSleepMs; + } + public Resource getClusterCapacity() { return clusterCapacity; } @@ -907,6 +927,37 @@ public class FairScheduler implements ResourceScheduler { completedContainer, RMContainerEventType.FINISHED); } + if (continuousSchedulingEnabled) { + if (!completedContainers.isEmpty()) { + attemptScheduling(node); + } + } else { + attemptScheduling(node); + } + } + + private void continuousScheduling() { + while (true) { + for (FSSchedulerNode node : nodes.values()) { + try { + if (Resources.fitsIn(minimumAllocation, node.getAvailableResource())) { + attemptScheduling(node); + } + } catch (Throwable ex) { + LOG.warn("Error while attempting scheduling for node " + node + ": " + + ex.toString(), ex); + } + } + try { + Thread.sleep(getContinuousSchedulingSleepMs()); + } catch (InterruptedException e) { + LOG.warn("Error while doing sleep in continuous scheduling: " + + e.toString(), e); + } + } + } + + private synchronized void attemptScheduling(FSSchedulerNode node) { // Assign new containers... // 1. Check for reserved applications // 2. Schedule if there are no reservations @@ -914,19 +965,18 @@ public class FairScheduler implements ResourceScheduler { AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable(); if (reservedAppSchedulable != null) { Priority reservedPriority = node.getReservedContainer().getReservedPriority(); - if (reservedAppSchedulable != null && - !reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) { + if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) { // Don't hold the reservation if app can no longer use it LOG.info("Releasing reservation that cannot be satisfied for application " + reservedAppSchedulable.getApp().getApplicationAttemptId() - + " on node " + nm); + + " on node " + node); reservedAppSchedulable.unreserve(reservedPriority, node); reservedAppSchedulable = null; } else { // Reservation exists; try to fulfill the reservation LOG.info("Trying to fulfill reservation for application " + reservedAppSchedulable.getApp().getApplicationAttemptId() - + " on node: " + nm); + + " on node: " + node); node.getReservedAppSchedulable().assignReservedContainer(node); } @@ -1060,8 +1110,13 @@ public class FairScheduler implements ResourceScheduler { maximumAllocation = this.conf.getMaximumAllocation(); incrAllocation = this.conf.getIncrementAllocation(); userAsDefaultQueue = this.conf.getUserAsDefaultQueue(); + continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled(); + continuousSchedulingSleepMs = + this.conf.getContinuousSchedulingSleepMs(); nodeLocalityThreshold = this.conf.getLocalityThresholdNode(); rackLocalityThreshold = this.conf.getLocalityThresholdRack(); + nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs(); + rackLocalityDelayMs = this.conf.getLocalityDelayRackMs(); preemptionEnabled = this.conf.getPreemptionEnabled(); assignMultiple = this.conf.getAssignMultiple(); maxAssign = this.conf.getMaxAssign(); @@ -1088,6 +1143,21 @@ public class FairScheduler implements ResourceScheduler { updateThread.setName("FairSchedulerUpdateThread"); updateThread.setDaemon(true); updateThread.start(); + + if (continuousSchedulingEnabled) { + // start continuous scheduling thread + Thread schedulingThread = new Thread( + new Runnable() { + @Override + public void run() { + continuousScheduling(); + } + } + ); + schedulingThread.setName("ContinuousScheduling"); + schedulingThread.setDaemon(true); + schedulingThread.start(); + } } else { try { queueMgr.reloadAllocs(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java index acdd40e26ae..a3de8942ea4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java @@ -66,6 +66,22 @@ public class FairSchedulerConfiguration extends Configuration { protected static final float DEFAULT_LOCALITY_THRESHOLD_RACK = DEFAULT_LOCALITY_THRESHOLD; + /** Delay for node locality. */ + protected static final String LOCALITY_DELAY_NODE_MS = CONF_PREFIX + "locality-delay-node-ms"; + protected static final long DEFAULT_LOCALITY_DELAY_NODE_MS = -1L; + + /** Delay for rack locality. */ + protected static final String LOCALITY_DELAY_RACK_MS = CONF_PREFIX + "locality-delay-rack-ms"; + protected static final long DEFAULT_LOCALITY_DELAY_RACK_MS = -1L; + + /** Enable continuous scheduling or not. */ + protected static final String CONTINUOUS_SCHEDULING_ENABLED = CONF_PREFIX + "continuous-scheduling-enabled"; + protected static final boolean DEFAULT_CONTINUOUS_SCHEDULING_ENABLED = false; + + /** Sleep time of each pass in continuous scheduling (5ms in default) */ + protected static final String CONTINUOUS_SCHEDULING_SLEEP_MS = CONF_PREFIX + "continuous-scheduling-sleep-ms"; + protected static final int DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS = 5; + /** Whether preemption is enabled. */ protected static final String PREEMPTION = CONF_PREFIX + "preemption"; protected static final boolean DEFAULT_PREEMPTION = false; @@ -134,6 +150,22 @@ public class FairSchedulerConfiguration extends Configuration { return getFloat(LOCALITY_THRESHOLD_RACK, DEFAULT_LOCALITY_THRESHOLD_RACK); } + public boolean isContinuousSchedulingEnabled() { + return getBoolean(CONTINUOUS_SCHEDULING_ENABLED, DEFAULT_CONTINUOUS_SCHEDULING_ENABLED); + } + + public int getContinuousSchedulingSleepMs() { + return getInt(CONTINUOUS_SCHEDULING_SLEEP_MS, DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS); + } + + public long getLocalityDelayNodeMs() { + return getLong(LOCALITY_DELAY_NODE_MS, DEFAULT_LOCALITY_DELAY_NODE_MS); + } + + public long getLocalityDelayRackMs() { + return getLong(LOCALITY_DELAY_RACK_MS, DEFAULT_LOCALITY_DELAY_RACK_MS); + } + public boolean getPreemptionEnabled() { return getBoolean(PREEMPTION, DEFAULT_PREEMPTION); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java index 8a53bd036be..491235e7605 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java @@ -25,11 +25,25 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.util.Clock; import org.junit.Test; import org.mockito.Mockito; public class TestFSSchedulerApp { + private class MockClock implements Clock { + private long time = 0; + @Override + public long getTime() { + return time; + } + + public void tick(int seconds) { + time = time + seconds * 1000; + } + + } + private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) { ApplicationId appIdImpl = ApplicationId.newInstance(0, appId); ApplicationAttemptId attId = @@ -93,6 +107,63 @@ public class TestFSSchedulerApp { prio, 10, nodeLocalityThreshold, rackLocalityThreshold)); } + @Test + public void testDelaySchedulingForContinuousScheduling() + throws InterruptedException { + Queue queue = Mockito.mock(Queue.class); + Priority prio = Mockito.mock(Priority.class); + Mockito.when(prio.getPriority()).thenReturn(1); + + MockClock clock = new MockClock(); + + long nodeLocalityDelayMs = 5 * 1000L; // 5 seconds + long rackLocalityDelayMs = 6 * 1000L; // 6 seconds + + ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1); + FSSchedulerApp schedulerApp = + new FSSchedulerApp(applicationAttemptId, "user1", queue, + null, null); + AppSchedulable appSchedulable = Mockito.mock(AppSchedulable.class); + long startTime = clock.getTime(); + Mockito.when(appSchedulable.getStartTime()).thenReturn(startTime); + schedulerApp.setAppSchedulable(appSchedulable); + + // Default level should be node-local + assertEquals(NodeType.NODE_LOCAL, + schedulerApp.getAllowedLocalityLevelByTime(prio, + nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime())); + + // after 4 seconds should remain node local + clock.tick(4); + assertEquals(NodeType.NODE_LOCAL, + schedulerApp.getAllowedLocalityLevelByTime(prio, + nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime())); + + // after 6 seconds should switch to rack local + clock.tick(2); + assertEquals(NodeType.RACK_LOCAL, + schedulerApp.getAllowedLocalityLevelByTime(prio, + nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime())); + + // manually set back to node local + schedulerApp.resetAllowedLocalityLevel(prio, NodeType.NODE_LOCAL); + schedulerApp.resetSchedulingOpportunities(prio, clock.getTime()); + assertEquals(NodeType.NODE_LOCAL, + schedulerApp.getAllowedLocalityLevelByTime(prio, + nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime())); + + // Now escalate again to rack-local, then to off-switch + clock.tick(6); + assertEquals(NodeType.RACK_LOCAL, + schedulerApp.getAllowedLocalityLevelByTime(prio, + nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime())); + + clock.tick(7); + assertEquals(NodeType.OFF_SWITCH, + schedulerApp.getAllowedLocalityLevelByTime(prio, + nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime())); + } + @Test /** * Ensure that when negative paramaters are given (signaling delay scheduling diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 30f874ba152..84c9a37d5d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -298,6 +299,14 @@ public class TestFairScheduler { conf.setBoolean(FairSchedulerConfiguration.SIZE_BASED_WEIGHT, true); conf.setDouble(FairSchedulerConfiguration.LOCALITY_THRESHOLD_NODE, .5); conf.setDouble(FairSchedulerConfiguration.LOCALITY_THRESHOLD_RACK, .7); + conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, + true); + conf.setInt(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_SLEEP_MS, + 10); + conf.setInt(FairSchedulerConfiguration.LOCALITY_DELAY_RACK_MS, + 5000); + conf.setInt(FairSchedulerConfiguration.LOCALITY_DELAY_NODE_MS, + 5000); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, @@ -308,6 +317,11 @@ public class TestFairScheduler { Assert.assertEquals(true, scheduler.sizeBasedWeight); Assert.assertEquals(.5, scheduler.nodeLocalityThreshold, .01); Assert.assertEquals(.7, scheduler.rackLocalityThreshold, .01); + Assert.assertTrue("The continuous scheduling should be enabled", + scheduler.continuousSchedulingEnabled); + Assert.assertEquals(10, scheduler.continuousSchedulingSleepMs); + Assert.assertEquals(5000, scheduler.nodeLocalityDelayMs); + Assert.assertEquals(5000, scheduler.rackLocalityDelayMs); Assert.assertEquals(1024, scheduler.getMaximumResourceCapability().getMemory()); Assert.assertEquals(512, scheduler.getMinimumResourceCapability().getMemory()); Assert.assertEquals(128, @@ -2255,4 +2269,47 @@ public class TestFairScheduler { fs.applications, FSSchedulerApp.class); } + @Test + public void testContinuousScheduling() throws Exception { + // set continuous scheduling enabled + FairScheduler fs = new FairScheduler(); + Configuration conf = createConfiguration(); + conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, + true); + fs.reinitialize(conf, resourceManager.getRMContext()); + Assert.assertTrue("Continuous scheduling should be enabled.", + fs.isContinuousSchedulingEnabled()); + + // Add one node + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + fs.handle(nodeEvent1); + + // available resource + Assert.assertEquals(fs.getClusterCapacity().getMemory(), 8 * 1024); + Assert.assertEquals(fs.getClusterCapacity().getVirtualCores(), 8); + + // send application request + ApplicationAttemptId appAttemptId = + createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); + fs.addApplication(appAttemptId, "queue11", "user11"); + List ask = new ArrayList(); + ResourceRequest request = + createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true); + ask.add(request); + fs.allocate(appAttemptId, ask, new ArrayList(), null, null); + + // waiting for continuous_scheduler_sleep_time + // at least one pass + Thread.sleep(fs.getConf().getContinuousSchedulingSleepMs() + 500); + + // check consumption + Resource consumption = + fs.applications.get(appAttemptId).getCurrentConsumption(); + Assert.assertEquals(1024, consumption.getMemory()); + Assert.assertEquals(1, consumption.getVirtualCores()); + } + }