From ac395be019296cff996fc9315308164db9dd3051 Mon Sep 17 00:00:00 2001 From: Naganarasimha Date: Sun, 25 Sep 2016 17:36:30 +0530 Subject: [PATCH] YARN-3877. YarnClientImpl.submitApplication swallows exceptions. Contributed by Varun Saxena (cherry picked from commit e4e72db5f9f305b493138ab36f073fe5d1750ad8) (cherry picked from commit 4c2b20ca3e98d5f63e29d734595d693ad3a20529) --- .../yarn/client/api/impl/YarnClientImpl.java | 13 +++-- .../yarn/client/api/impl/TestYarnClient.java | 52 +++++++++++++++++++ 2 files changed, 60 insertions(+), 5 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index 6f3e55633de..359e7bbeb4b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -291,9 +291,10 @@ public class YarnClientImpl extends YarnClient { try { Thread.sleep(submitPollIntervalMillis); } catch (InterruptedException ie) { - LOG.error("Interrupted while waiting for application " - + applicationId - + " to be successfully submitted."); + String msg = "Interrupted while waiting for application " + + applicationId + " to be successfully submitted."; + LOG.error(msg); + throw new YarnException(msg, ie); } } catch (ApplicationNotFoundException ex) { // FailOver or RM restart happens before RMStateStore saves @@ -409,8 +410,10 @@ public class YarnClientImpl extends YarnClient { Thread.sleep(asyncApiPollIntervalMillis); } } catch (InterruptedException e) { - LOG.error("Interrupted while waiting for application " + applicationId - + " to be killed."); + String msg = "Interrupted while waiting for application " + + applicationId + " to be killed."; + LOG.error(msg); + throw new YarnException(msg, e); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index 2c57b361aca..7c8be41d4d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; +import java.lang.Thread.State; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; @@ -199,6 +200,57 @@ public class TestYarnClient { client.stop(); } + @SuppressWarnings("deprecation") + @Test (timeout = 20000) + public void testSubmitApplicationInterrupted() throws IOException { + Configuration conf = new Configuration(); + int pollIntervalMs = 1000; + conf.setLong(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS, + pollIntervalMs); + try (final YarnClient client = new MockYarnClient()) { + client.init(conf); + client.start(); + // Submit the application and then interrupt it while its waiting + // for submission to be successful. + final class SubmitThread extends Thread { + private boolean isInterrupted = false; + @Override + public void run() { + ApplicationSubmissionContext context = + mock(ApplicationSubmissionContext.class); + ApplicationId applicationId = ApplicationId.newInstance( + System.currentTimeMillis(), 1); + when(context.getApplicationId()).thenReturn(applicationId); + ((MockYarnClient) client).setYarnApplicationState( + YarnApplicationState.NEW); + try { + client.submitApplication(context); + } catch (YarnException | IOException e) { + if (e instanceof YarnException && e.getCause() != null && + e.getCause() instanceof InterruptedException) { + isInterrupted = true; + } + } + } + } + SubmitThread appSubmitThread = new SubmitThread(); + appSubmitThread.start(); + try { + // Wait for thread to start and begin to sleep + // (enter TIMED_WAITING state). + while (appSubmitThread.getState() != State.TIMED_WAITING) { + Thread.sleep(pollIntervalMs / 2); + } + // Interrupt the thread. + appSubmitThread.interrupt(); + appSubmitThread.join(); + } catch (InterruptedException e) { + } + Assert.assertTrue("Expected an InterruptedException wrapped inside a " + + "YarnException", appSubmitThread.isInterrupted); + } + } + @Test (timeout = 30000) public void testSubmitIncorrectQueue() throws IOException { MiniYARNCluster cluster = new MiniYARNCluster("testMRAMTokens", 1, 1, 1);