diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java index 03e180dde70..3def27f964b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java @@ -17,6 +17,12 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.monitor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -32,9 +38,10 @@ public class SchedulingMonitor extends AbstractService { private final SchedulingEditPolicy scheduleEditPolicy; private static final Log LOG = LogFactory.getLog(SchedulingMonitor.class); - //thread which runs periodically to see the last time since a heartbeat is - //received. - private Thread checkerThread; + // ScheduledExecutorService which schedules the PreemptionChecker to run + // periodically. + private ScheduledExecutorService ses; + private ScheduledFuture handler; private volatile boolean stopped; private long monitorInterval; private RMContext rmContext; @@ -61,17 +68,25 @@ public class SchedulingMonitor extends AbstractService { @Override public void serviceStart() throws Exception { assert !stopped : "starting when already stopped"; - checkerThread = new Thread(new PreemptionChecker()); - checkerThread.setName(getName()); - checkerThread.start(); + ses = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName(getName()); + return t; + } + }); + handler = ses.scheduleAtFixedRate(new PreemptionChecker(), + 0, monitorInterval, TimeUnit.MILLISECONDS); super.serviceStart(); } @Override public void serviceStop() throws Exception { stopped = true; - if (checkerThread != null) { - checkerThread.interrupt(); + if (handler != null) { + LOG.info("Stop " + getName()); + handler.cancel(true); + ses.shutdown(); } super.serviceStop(); } @@ -84,24 +99,12 @@ public class SchedulingMonitor extends AbstractService { private class PreemptionChecker implements Runnable { @Override public void run() { - while (!stopped && !Thread.currentThread().isInterrupted()) { - try { - //invoke the preemption policy at a regular pace - //the policy will generate preemption or kill events - //managed by the dispatcher - invokePolicy(); - } catch (YarnRuntimeException e) { - LOG.error("YarnRuntimeException raised while executing preemption" - + " checker, skip this run..., exception=", e); - } - - // Wait before next run - try { - Thread.sleep(monitorInterval); - } catch (InterruptedException e) { - LOG.info(getName() + " thread interrupted"); - break; - } + try { + //invoke the preemption policy + invokePolicy(); + } catch (YarnRuntimeException e) { + LOG.error("YarnRuntimeException raised while executing preemption" + + " checker, skip this run..., exception=", e); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java index 9ec17da62b0..d6ac346b8f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Proportion import org.junit.Test; import static org.junit.Assert.fail; +import static org.mockito.Mockito.*; public class TestSchedulingMonitor { @@ -43,5 +44,23 @@ public class TestSchedulingMonitor { fail("ResourceManager does not start when " + YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS + " is set to true"); } + + SchedulingEditPolicy mPolicy = mock(SchedulingEditPolicy.class); + when(mPolicy.getMonitoringInterval()).thenReturn(1000L); + SchedulingMonitor monitor = new SchedulingMonitor(rm.getRMContext(), + mPolicy); + try { + monitor.serviceInit(conf); + monitor.serviceStart(); + } catch (Exception e) { + fail("SchedulingMonitor failes to start."); + } + verify(mPolicy, times(1)).editSchedule(); + try { + monitor.close(); + rm.close(); + } catch (Exception e) { + fail("Failed to close."); + } } }