YARN-5543. ResourceManager SchedulingMonitor could potentially terminate the preemption checker thread. Contributed by Min Shen.

This commit is contained in:
Min Shen 2017-05-11 11:00:28 -07:00 committed by Konstantin V Shvachko
parent 09f28da2d2
commit 2ada100da7
2 changed files with 48 additions and 26 deletions

View File

@ -17,6 +17,12 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -32,9 +38,10 @@ public class SchedulingMonitor extends AbstractService {
private final SchedulingEditPolicy scheduleEditPolicy;
private static final Log LOG = LogFactory.getLog(SchedulingMonitor.class);
//thread which runs periodically to see the last time since a heartbeat is
//received.
private Thread checkerThread;
// ScheduledExecutorService which schedules the PreemptionChecker to run
// periodically.
private ScheduledExecutorService ses;
private ScheduledFuture<?> handler;
private volatile boolean stopped;
private long monitorInterval;
private RMContext rmContext;
@ -61,17 +68,25 @@ public class SchedulingMonitor extends AbstractService {
// Starts the monitor and then delegates to super.serviceStart().
// As shown here, the method both starts a dedicated checker thread and
// schedules a PreemptionChecker on a single-threaded scheduled executor.
// NOTE(review): this span looks like a rendered diff whose +/- markers were
// lost, so the old thread-based start and the new executor-based start appear
// side by side -- confirm against the committed file before relying on it.
@Override
public void serviceStart() throws Exception {
// Only checked when assertions are enabled (-ea).
assert !stopped : "starting when already stopped";
// NOTE(review): presumably the removed pre-change lines (hand-managed thread
// named after this service) -- TODO confirm.
checkerThread = new Thread(new PreemptionChecker());
checkerThread.setName(getName());
checkerThread.start();
// Single-threaded scheduler; the custom factory only gives the worker thread
// this service's name.
ses = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName(getName());
return t;
}
});
// First run is scheduled with no initial delay, then at a fixed rate of
// monitorInterval milliseconds. The returned future is kept in 'handler'.
handler = ses.scheduleAtFixedRate(new PreemptionChecker(),
0, monitorInterval, TimeUnit.MILLISECONDS);
super.serviceStart();
}
// Stops the monitor: raises the 'stopped' flag, cancels the scheduled checker,
// shuts the executor down, and then delegates to super.serviceStop().
// NOTE(review): this span looks like a rendered diff whose +/- markers were
// lost; the 'checkerThread' lines and the 'handler' lines appear to be the old
// and new variants of the same guard, and the braces do not balance as shown
// -- confirm against the committed file.
@Override
public void serviceStop() throws Exception {
stopped = true;
// NOTE(review): presumably the removed pre-change lines (interrupting the
// hand-managed checker thread) -- TODO confirm.
if (checkerThread != null) {
checkerThread.interrupt();
// 'handler' is assigned in serviceStart(); the null check skips the teardown
// when nothing was scheduled.
if (handler != null) {
LOG.info("Stop " + getName());
// cancel(true): also request interruption of a run that is in progress.
handler.cancel(true);
// shutdown() stops the executor from accepting new work; it does not wait
// for the worker thread to finish.
ses.shutdown();
}
super.serviceStop();
}
@ -84,24 +99,12 @@ public class SchedulingMonitor extends AbstractService {
private class PreemptionChecker implements Runnable {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
try {
//invoke the preemption policy at a regular pace
//the policy will generate preemption or kill events
//managed by the dispatcher
invokePolicy();
} catch (YarnRuntimeException e) {
LOG.error("YarnRuntimeException raised while executing preemption"
+ " checker, skip this run..., exception=", e);
}
// Wait before next run
try {
Thread.sleep(monitorInterval);
} catch (InterruptedException e) {
LOG.info(getName() + " thread interrupted");
break;
}
try {
//invoke the preemption policy
invokePolicy();
} catch (YarnRuntimeException e) {
LOG.error("YarnRuntimeException raised while executing preemption"
+ " checker, skip this run..., exception=", e);
}
}
}

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Proportion
import org.junit.Test;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.*;
public class TestSchedulingMonitor {
@ -43,5 +44,23 @@ public class TestSchedulingMonitor {
fail("ResourceManager does not start when " +
YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS + " is set to true");
}
// Start a SchedulingMonitor around a mocked policy and check that the policy
// gets invoked once the monitor is running.
SchedulingEditPolicy mPolicy = mock(SchedulingEditPolicy.class);
when(mPolicy.getMonitoringInterval()).thenReturn(1000L);
SchedulingMonitor monitor = new SchedulingMonitor(rm.getRMContext(),
    mPolicy);
try {
  monitor.serviceInit(conf);
  monitor.serviceStart();
} catch (Exception e) {
  // Keep the cause in the failure message so a broken start is diagnosable.
  fail("SchedulingMonitor fails to start: " + e);
}
// The policy is invoked asynchronously on the monitor's scheduler thread, so
// an immediate verify() races with the first run. Wait (bounded) for it
// instead, and accept additional periodic runs that may happen meanwhile.
verify(mPolicy, timeout(10000).atLeastOnce()).editSchedule();
try {
  monitor.close();
  rm.close();
} catch (Exception e) {
  fail("Failed to close: " + e);
}
}
}