YARN-3877. YarnClientImpl.submitApplication swallows exceptions. Contributed by Varun Saxena
(cherry picked from commit e4e72db5f9
)
This commit is contained in:
parent
74962e3451
commit
eec0afa098
|
@ -306,9 +306,10 @@ public class YarnClientImpl extends YarnClient {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(submitPollIntervalMillis);
|
Thread.sleep(submitPollIntervalMillis);
|
||||||
} catch (InterruptedException ie) {
|
} catch (InterruptedException ie) {
|
||||||
LOG.error("Interrupted while waiting for application "
|
String msg = "Interrupted while waiting for application "
|
||||||
+ applicationId
|
+ applicationId + " to be successfully submitted.";
|
||||||
+ " to be successfully submitted.");
|
LOG.error(msg);
|
||||||
|
throw new YarnException(msg, ie);
|
||||||
}
|
}
|
||||||
} catch (ApplicationNotFoundException ex) {
|
} catch (ApplicationNotFoundException ex) {
|
||||||
// FailOver or RM restart happens before RMStateStore saves
|
// FailOver or RM restart happens before RMStateStore saves
|
||||||
|
@ -446,8 +447,10 @@ public class YarnClientImpl extends YarnClient {
|
||||||
Thread.sleep(asyncApiPollIntervalMillis);
|
Thread.sleep(asyncApiPollIntervalMillis);
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOG.error("Interrupted while waiting for application " + applicationId
|
String msg = "Interrupted while waiting for application "
|
||||||
+ " to be killed.");
|
+ applicationId + " to be killed.";
|
||||||
|
LOG.error(msg);
|
||||||
|
throw new YarnException(msg, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,7 @@ import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.lang.Thread.State;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -201,6 +202,57 @@ public class TestYarnClient {
|
||||||
client.stop();
|
client.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
@Test (timeout = 20000)
|
||||||
|
public void testSubmitApplicationInterrupted() throws IOException {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
int pollIntervalMs = 1000;
|
||||||
|
conf.setLong(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
|
||||||
|
pollIntervalMs);
|
||||||
|
try (final YarnClient client = new MockYarnClient()) {
|
||||||
|
client.init(conf);
|
||||||
|
client.start();
|
||||||
|
// Submit the application and then interrupt it while its waiting
|
||||||
|
// for submission to be successful.
|
||||||
|
final class SubmitThread extends Thread {
|
||||||
|
private boolean isInterrupted = false;
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
ApplicationSubmissionContext context =
|
||||||
|
mock(ApplicationSubmissionContext.class);
|
||||||
|
ApplicationId applicationId = ApplicationId.newInstance(
|
||||||
|
System.currentTimeMillis(), 1);
|
||||||
|
when(context.getApplicationId()).thenReturn(applicationId);
|
||||||
|
((MockYarnClient) client).setYarnApplicationState(
|
||||||
|
YarnApplicationState.NEW);
|
||||||
|
try {
|
||||||
|
client.submitApplication(context);
|
||||||
|
} catch (YarnException | IOException e) {
|
||||||
|
if (e instanceof YarnException && e.getCause() != null &&
|
||||||
|
e.getCause() instanceof InterruptedException) {
|
||||||
|
isInterrupted = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
SubmitThread appSubmitThread = new SubmitThread();
|
||||||
|
appSubmitThread.start();
|
||||||
|
try {
|
||||||
|
// Wait for thread to start and begin to sleep
|
||||||
|
// (enter TIMED_WAITING state).
|
||||||
|
while (appSubmitThread.getState() != State.TIMED_WAITING) {
|
||||||
|
Thread.sleep(pollIntervalMs / 2);
|
||||||
|
}
|
||||||
|
// Interrupt the thread.
|
||||||
|
appSubmitThread.interrupt();
|
||||||
|
appSubmitThread.join();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
}
|
||||||
|
Assert.assertTrue("Expected an InterruptedException wrapped inside a " +
|
||||||
|
"YarnException", appSubmitThread.isInterrupted);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test (timeout = 30000)
|
@Test (timeout = 30000)
|
||||||
public void testSubmitIncorrectQueueToCapacityScheduler() throws IOException {
|
public void testSubmitIncorrectQueueToCapacityScheduler() throws IOException {
|
||||||
MiniYARNCluster cluster = new MiniYARNCluster("testMRAMTokens", 1, 1, 1);
|
MiniYARNCluster cluster = new MiniYARNCluster("testMRAMTokens", 1, 1, 1);
|
||||||
|
|
Loading…
Reference in New Issue