YARN-3697. FairScheduler: ContinuousSchedulingThread can fail to shutdown. (Zhihai Xu via kasha)

(cherry picked from commit 332b520a48)
This commit is contained in:
Karthik Kambatla 2015-09-13 18:07:43 -07:00
parent 11e2fa151c
commit 1f9f219062
4 changed files with 43 additions and 0 deletions

View File

@ -848,6 +848,9 @@ Release 2.7.2 - UNRELEASED
YARN-4136. LinuxContainerExecutor loses info when forwarding YARN-4136. LinuxContainerExecutor loses info when forwarding
ResourceHandlerException. (Bibin A Chundatt via vvasudev) ResourceHandlerException. (Bibin A Chundatt via vvasudev)
YARN-3697. FairScheduler: ContinuousSchedulingThread can fail to shutdown.
(Zhihai Xu via kasha)
Release 2.7.1 - 2015-07-06 Release 2.7.1 - 2015-07-06

View File

@ -50,7 +50,9 @@ public class TestAsyncDispatcher {
disp.waitForEventThreadToWait(); disp.waitForEventThreadToWait();
try { try {
disp.getEventHandler().handle(event); disp.getEventHandler().handle(event);
Assert.fail("Expected YarnRuntimeException");
} catch (YarnRuntimeException e) { } catch (YarnRuntimeException e) {
Assert.assertTrue(e.getCause() instanceof InterruptedException);
} }
// Queue should be empty and dispatcher should not hang on close // Queue should be empty and dispatcher should not hang on close
Assert.assertTrue("Event Queue should have been empty", Assert.assertTrue("Event Queue should have been empty",

View File

@ -1043,6 +1043,13 @@ public class FairScheduler extends
} catch (Throwable ex) { } catch (Throwable ex) {
LOG.error("Error while attempting scheduling for node " + node + LOG.error("Error while attempting scheduling for node " + node +
": " + ex.toString(), ex); ": " + ex.toString(), ex);
if ((ex instanceof YarnRuntimeException) &&
(ex.getCause() instanceof InterruptedException)) {
// AsyncDispatcher translates InterruptedException to
// YarnRuntimeException with cause InterruptedException.
// Need to throw InterruptedException to stop schedulingThread.
throw (InterruptedException)ex.getCause();
}
} }
} }

View File

@ -27,7 +27,10 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.io.File; import java.io.File;
@ -4319,6 +4322,34 @@ public class TestFairScheduler extends FairSchedulerTestBase {
} }
} }
/**
 * Checks that when {@code attemptScheduling} throws a
 * {@link YarnRuntimeException} whose cause is an
 * {@link InterruptedException}, a call to
 * {@code continuousSchedulingAttempt} surfaces that same
 * {@link InterruptedException} to its caller.
 */
@Test
public void testContinuousSchedulingInterruptedException()
    throws Exception {
  scheduler.init(conf);
  scheduler.start();
  FairScheduler schedulerSpy = spy(scheduler);
  Assert.assertFalse("Continuous scheduling should be disabled.",
      schedulerSpy.isContinuousSchedulingEnabled());

  // Register a single node with the spied scheduler.
  RMNode onlyNode = MockNodes.newNodeInfo(
      1, Resources.createResource(8 * 1024, 8), 1, "127.0.0.1");
  schedulerSpy.handle(new NodeAddedSchedulerEvent(onlyNode));
  Assert.assertEquals("We should have one alive node.",
      1, schedulerSpy.getNumClusterNodes());

  // Stub attemptScheduling to fail with a YarnRuntimeException that wraps
  // an InterruptedException.
  InterruptedException interruption = new InterruptedException();
  YarnRuntimeException wrapped = new YarnRuntimeException(interruption);
  doThrow(wrapped)
      .when(schedulerSpy)
      .attemptScheduling(isA(FSSchedulerNode.class));

  // Drive a single continuous-scheduling pass by hand; the original
  // InterruptedException instance must come back out.
  try {
    schedulerSpy.continuousSchedulingAttempt();
    fail("Expected InterruptedException to stop schedulingThread");
  } catch (InterruptedException caught) {
    Assert.assertEquals(interruption, caught);
  }
}
@Test @Test
public void testSchedulingOnRemovedNode() throws Exception { public void testSchedulingOnRemovedNode() throws Exception {
// Disable continuous scheduling, will invoke continuous scheduling manually // Disable continuous scheduling, will invoke continuous scheduling manually