diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index 7760521f885..80e453fccfd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -306,9 +306,10 @@ public class YarnClientImpl extends YarnClient { try { Thread.sleep(submitPollIntervalMillis); } catch (InterruptedException ie) { - LOG.error("Interrupted while waiting for application " - + applicationId - + " to be successfully submitted."); + String msg = "Interrupted while waiting for application " + + applicationId + " to be successfully submitted."; + LOG.error(msg); + throw new YarnException(msg, ie); } } catch (ApplicationNotFoundException ex) { // FailOver or RM restart happens before RMStateStore saves @@ -446,8 +447,10 @@ public class YarnClientImpl extends YarnClient { Thread.sleep(asyncApiPollIntervalMillis); } } catch (InterruptedException e) { - LOG.error("Interrupted while waiting for application " + applicationId - + " to be killed."); + String msg = "Interrupted while waiting for application " + + applicationId + " to be killed."; + LOG.error(msg); + throw new YarnException(msg, e); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index a68376f410c..9b61dcbb780 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; +import java.lang.Thread.State; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; @@ -201,6 +202,57 @@ public class TestYarnClient { client.stop(); } + @SuppressWarnings("deprecation") + @Test (timeout = 20000) + public void testSubmitApplicationInterrupted() throws IOException { + Configuration conf = new Configuration(); + int pollIntervalMs = 1000; + conf.setLong(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS, + pollIntervalMs); + try (final YarnClient client = new MockYarnClient()) { + client.init(conf); + client.start(); + // Submit the application and then interrupt it while its waiting + // for submission to be successful. + final class SubmitThread extends Thread { + private boolean isInterrupted = false; + @Override + public void run() { + ApplicationSubmissionContext context = + mock(ApplicationSubmissionContext.class); + ApplicationId applicationId = ApplicationId.newInstance( + System.currentTimeMillis(), 1); + when(context.getApplicationId()).thenReturn(applicationId); + ((MockYarnClient) client).setYarnApplicationState( + YarnApplicationState.NEW); + try { + client.submitApplication(context); + } catch (YarnException | IOException e) { + if (e instanceof YarnException && e.getCause() != null && + e.getCause() instanceof InterruptedException) { + isInterrupted = true; + } + } + } + } + SubmitThread appSubmitThread = new SubmitThread(); + appSubmitThread.start(); + try { + // Wait for thread to start and begin to sleep + // (enter TIMED_WAITING state). + while (appSubmitThread.getState() != State.TIMED_WAITING) { + Thread.sleep(pollIntervalMs / 2); + } + // Interrupt the thread. + appSubmitThread.interrupt(); + appSubmitThread.join(); + } catch (InterruptedException e) { + } + Assert.assertTrue("Expected an InterruptedException wrapped inside a " + + "YarnException", appSubmitThread.isInterrupted); + } + } + @Test (timeout = 30000) public void testSubmitIncorrectQueueToCapacityScheduler() throws IOException { MiniYARNCluster cluster = new MiniYARNCluster("testMRAMTokens", 1, 1, 1);