YARN-5543. ResourceManager SchedulingMonitor could potentially terminate the preemption checker thread. Contributed by Min Shen.

(cherry picked from commit 2ada100da7)
This commit is contained in:
Min Shen 2017-05-11 11:00:28 -07:00 committed by Konstantin V Shvachko
parent d4ab50f9b0
commit aae0600a67
2 changed files with 48 additions and 26 deletions

View File

@ -17,6 +17,12 @@
*/ */
package org.apache.hadoop.yarn.server.resourcemanager.monitor; package org.apache.hadoop.yarn.server.resourcemanager.monitor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -32,9 +38,10 @@ public class SchedulingMonitor extends AbstractService {
private final SchedulingEditPolicy scheduleEditPolicy; private final SchedulingEditPolicy scheduleEditPolicy;
private static final Log LOG = LogFactory.getLog(SchedulingMonitor.class); private static final Log LOG = LogFactory.getLog(SchedulingMonitor.class);
//thread which runs periodically to see the last time since a heartbeat is // ScheduledExecutorService which schedules the PreemptionChecker to run
//received. // periodically.
private Thread checkerThread; private ScheduledExecutorService ses;
private ScheduledFuture<?> handler;
private volatile boolean stopped; private volatile boolean stopped;
private long monitorInterval; private long monitorInterval;
private RMContext rmContext; private RMContext rmContext;
@ -61,17 +68,25 @@ public class SchedulingMonitor extends AbstractService {
@Override @Override
public void serviceStart() throws Exception { public void serviceStart() throws Exception {
assert !stopped : "starting when already stopped"; assert !stopped : "starting when already stopped";
checkerThread = new Thread(new PreemptionChecker()); ses = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
checkerThread.setName(getName()); public Thread newThread(Runnable r) {
checkerThread.start(); Thread t = new Thread(r);
t.setName(getName());
return t;
}
});
handler = ses.scheduleAtFixedRate(new PreemptionChecker(),
0, monitorInterval, TimeUnit.MILLISECONDS);
super.serviceStart(); super.serviceStart();
} }
@Override @Override
public void serviceStop() throws Exception { public void serviceStop() throws Exception {
stopped = true; stopped = true;
if (checkerThread != null) { if (handler != null) {
checkerThread.interrupt(); LOG.info("Stop " + getName());
handler.cancel(true);
ses.shutdown();
} }
super.serviceStop(); super.serviceStop();
} }
@ -84,25 +99,13 @@ public class SchedulingMonitor extends AbstractService {
private class PreemptionChecker implements Runnable { private class PreemptionChecker implements Runnable {
@Override @Override
public void run() { public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
try { try {
//invoke the preemption policy at a regular pace //invoke the preemption policy
//the policy will generate preemption or kill events
//managed by the dispatcher
invokePolicy(); invokePolicy();
} catch (YarnRuntimeException e) { } catch (YarnRuntimeException e) {
LOG.error("YarnRuntimeException raised while executing preemption" LOG.error("YarnRuntimeException raised while executing preemption"
+ " checker, skip this run..., exception=", e); + " checker, skip this run..., exception=", e);
} }
// Wait before next run
try {
Thread.sleep(monitorInterval);
} catch (InterruptedException e) {
LOG.info(getName() + " thread interrupted");
break;
}
}
} }
} }
} }

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Proportion
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Mockito.*;
public class TestSchedulingMonitor { public class TestSchedulingMonitor {
@ -43,5 +44,23 @@ public class TestSchedulingMonitor {
fail("ResourceManager does not start when " + fail("ResourceManager does not start when " +
YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS + " is set to true"); YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS + " is set to true");
} }
SchedulingEditPolicy mPolicy = mock(SchedulingEditPolicy.class);
when(mPolicy.getMonitoringInterval()).thenReturn(1000L);
SchedulingMonitor monitor = new SchedulingMonitor(rm.getRMContext(),
mPolicy);
try {
monitor.serviceInit(conf);
monitor.serviceStart();
} catch (Exception e) {
fail("SchedulingMonitor failes to start.");
}
verify(mPolicy, times(1)).editSchedule();
try {
monitor.close();
rm.close();
} catch (Exception e) {
fail("Failed to close.");
}
} }
} }