YARN-3877. YarnClientImpl.submitApplication swallows exceptions. Contributed by Varun Saxena

(cherry picked from commit e4e72db5f9)
(cherry picked from commit 4c2b20ca3e)
This commit is contained in:
Naganarasimha 2016-09-25 17:36:30 +05:30 committed by Zhe Zhang
parent 165febbc84
commit ac395be019
2 changed files with 60 additions and 5 deletions

View File

@ -291,9 +291,10 @@ public class YarnClientImpl extends YarnClient {
try { try {
Thread.sleep(submitPollIntervalMillis); Thread.sleep(submitPollIntervalMillis);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
LOG.error("Interrupted while waiting for application " String msg = "Interrupted while waiting for application "
+ applicationId + applicationId + " to be successfully submitted.";
+ " to be successfully submitted."); LOG.error(msg);
throw new YarnException(msg, ie);
} }
} catch (ApplicationNotFoundException ex) { } catch (ApplicationNotFoundException ex) {
// FailOver or RM restart happens before RMStateStore saves // FailOver or RM restart happens before RMStateStore saves
@ -409,8 +410,10 @@ public class YarnClientImpl extends YarnClient {
Thread.sleep(asyncApiPollIntervalMillis); Thread.sleep(asyncApiPollIntervalMillis);
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.error("Interrupted while waiting for application " + applicationId String msg = "Interrupted while waiting for application "
+ " to be killed."); + applicationId + " to be killed.";
LOG.error(msg);
throw new YarnException(msg, e);
} }
} }

View File

@ -27,6 +27,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.lang.Thread.State;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.ArrayList; import java.util.ArrayList;
@ -199,6 +200,57 @@ public class TestYarnClient {
client.stop(); client.stop();
} }
@SuppressWarnings("deprecation")
@Test (timeout = 20000)
public void testSubmitApplicationInterrupted() throws IOException {
Configuration conf = new Configuration();
int pollIntervalMs = 1000;
conf.setLong(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
pollIntervalMs);
try (final YarnClient client = new MockYarnClient()) {
client.init(conf);
client.start();
// Submit the application and then interrupt it while its waiting
// for submission to be successful.
final class SubmitThread extends Thread {
private boolean isInterrupted = false;
@Override
public void run() {
ApplicationSubmissionContext context =
mock(ApplicationSubmissionContext.class);
ApplicationId applicationId = ApplicationId.newInstance(
System.currentTimeMillis(), 1);
when(context.getApplicationId()).thenReturn(applicationId);
((MockYarnClient) client).setYarnApplicationState(
YarnApplicationState.NEW);
try {
client.submitApplication(context);
} catch (YarnException | IOException e) {
if (e instanceof YarnException && e.getCause() != null &&
e.getCause() instanceof InterruptedException) {
isInterrupted = true;
}
}
}
}
SubmitThread appSubmitThread = new SubmitThread();
appSubmitThread.start();
try {
// Wait for thread to start and begin to sleep
// (enter TIMED_WAITING state).
while (appSubmitThread.getState() != State.TIMED_WAITING) {
Thread.sleep(pollIntervalMs / 2);
}
// Interrupt the thread.
appSubmitThread.interrupt();
appSubmitThread.join();
} catch (InterruptedException e) {
}
Assert.assertTrue("Expected an InterruptedException wrapped inside a " +
"YarnException", appSubmitThread.isInterrupted);
}
}
@Test (timeout = 30000) @Test (timeout = 30000)
public void testSubmitIncorrectQueue() throws IOException { public void testSubmitIncorrectQueue() throws IOException {
MiniYARNCluster cluster = new MiniYARNCluster("testMRAMTokens", 1, 1, 1); MiniYARNCluster cluster = new MiniYARNCluster("testMRAMTokens", 1, 1, 1);