diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 55e2e1601ba..d6c026039fe 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -53,6 +53,9 @@ Release 2.6.0 - UNRELEASED YARN-2211. Persist AMRMToken master key in RMStateStore for RM recovery. (Xuan Gong via jianhe) + YARN-2328. FairScheduler: Verify update and continuous scheduling threads are + stopped when the scheduler is stopped. (kasha) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 27a0075c1b2..4e1c244730a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -139,8 +139,11 @@ public class FairScheduler extends private final int UPDATE_DEBUG_FREQUENCY = 5; private int updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY; - private Thread updateThread; - private Thread schedulingThread; + @VisibleForTesting + Thread updateThread; + + @VisibleForTesting + Thread schedulingThread; // timeout to join when we stop this service protected final long THREAD_JOIN_TIMEOUT_MS = 1000; @@ -243,16 +246,21 @@ public class FairScheduler extends } /** - * A runnable which calls {@link FairScheduler#update()} every + * Thread which calls {@link FairScheduler#update()} every * updateInterval milliseconds. */ - private class UpdateThread implements Runnable { + private class UpdateThread extends Thread { + + @Override public void run() { - while (true) { + while (!Thread.currentThread().isInterrupted()) { try { Thread.sleep(updateInterval); update(); preemptTasksIfNecessary(); + } catch (InterruptedException ie) { + LOG.warn("Update thread interrupted. Exiting."); + return; } catch (Exception e) { LOG.error("Exception in fair scheduler UpdateThread", e); } @@ -260,6 +268,26 @@ public class FairScheduler extends } } + /** + * Thread which attempts scheduling resources continuously, + * asynchronous to the node heartbeats. + */ + private class ContinuousSchedulingThread extends Thread { + + @Override + public void run() { + while (!Thread.currentThread().isInterrupted()) { + try { + continuousSchedulingAttempt(); + Thread.sleep(getContinuousSchedulingSleepMs()); + } catch (InterruptedException e) { + LOG.warn("Continuous scheduling thread interrupted. Exiting.", e); + return; + } + } + } + } + /** * Recompute the internal variables used by the scheduler - per-job weights, * fair shares, deficits, minimum slot allocations, and amount of used and @@ -970,7 +998,7 @@ public class FairScheduler extends } } - void continuousSchedulingAttempt() { + void continuousSchedulingAttempt() throws InterruptedException { List nodeIdList = new ArrayList(nodes.keySet()); // Sort the nodes by space available on them, so that we offer // containers on emptier nodes first, facilitating an even spread. This @@ -1229,30 +1257,14 @@ public class FairScheduler extends throw new IOException("Failed to start FairScheduler", e); } - updateThread = new Thread(new UpdateThread()); + updateThread = new UpdateThread(); updateThread.setName("FairSchedulerUpdateThread"); updateThread.setDaemon(true); if (continuousSchedulingEnabled) { // start continuous scheduling thread - schedulingThread = new Thread( - new Runnable() { - @Override - public void run() { - while (!Thread.currentThread().isInterrupted()) { - try { - continuousSchedulingAttempt(); - Thread.sleep(getContinuousSchedulingSleepMs()); - } catch (InterruptedException e) { - LOG.error("Continuous scheduling thread interrupted. Exiting. ", - e); - return; - } - } - } - } - ); - schedulingThread.setName("ContinuousScheduling"); + schedulingThread = new ContinuousSchedulingThread(); + schedulingThread.setName("FairSchedulerContinuousScheduling"); schedulingThread.setDaemon(true); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 365c960c6ac..92e3906c8dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; @@ -3341,4 +3342,28 @@ public class TestFairScheduler extends FairSchedulerTestBase { scheduler.findLowestCommonAncestorQueue(a1Queue, b1Queue); assertEquals(ancestorQueue, queue1); } + + @Test + public void testThreadLifeCycle() throws InterruptedException { + conf.setBoolean( + FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, true); + scheduler.init(conf); + scheduler.start(); + + Thread updateThread = scheduler.updateThread; + Thread schedulingThread = scheduler.schedulingThread; + + assertTrue(updateThread.isAlive()); + assertTrue(schedulingThread.isAlive()); + + scheduler.stop(); + + int numRetries = 100; + while (numRetries-- > 0 && + (updateThread.isAlive() || schedulingThread.isAlive())) { + Thread.sleep(50); + } + + assertNotEquals("One of the threads is still alive", 0, numRetries); + } }