Merging r1510070 from trunk to branch-2 for YARN-994. HeartBeat thread in AMRMClientAsync does not handle runtime exception correctly (Xuan Gong via bikas)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1510071 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9b05d132bf
commit
299b910a01
|
@ -45,6 +45,9 @@ Release 2.1.1-beta - UNRELEASED
|
|||
YARN-906. Fixed a bug in NodeManager where cancelling ContainerLaunch at
|
||||
KILLING state causes that the container to hang. (Zhijie Shen via vinodkv)
|
||||
|
||||
YARN-994. HeartBeat thread in AMRMClientAsync does not handle runtime
|
||||
exception correctly (Xuan Gong via bikas)
|
||||
|
||||
Release 2.1.0-beta - 2013-08-06
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -65,7 +65,7 @@ extends AMRMClientAsync<T> {
|
|||
private volatile boolean keepRunning;
|
||||
private volatile float progress;
|
||||
|
||||
private volatile Exception savedException;
|
||||
private volatile Throwable savedException;
|
||||
|
||||
public AMRMClientAsyncImpl(int intervalMs, CallbackHandler callbackHandler) {
|
||||
this(new AMRMClientImpl<T>(), intervalMs, callbackHandler);
|
||||
|
@ -222,18 +222,12 @@ extends AMRMClientAsync<T> {
|
|||
|
||||
try {
|
||||
response = client.allocate(progress);
|
||||
} catch (YarnException ex) {
|
||||
LOG.error("Yarn exception on heartbeat", ex);
|
||||
} catch (Throwable ex) {
|
||||
LOG.error("Exception on heartbeat", ex);
|
||||
savedException = ex;
|
||||
// interrupt handler thread in case it waiting on the queue
|
||||
handlerThread.interrupt();
|
||||
return;
|
||||
} catch (IOException e) {
|
||||
LOG.error("IO exception on heartbeat", e);
|
||||
savedException = e;
|
||||
// interrupt handler thread in case it waiting on the queue
|
||||
handlerThread.interrupt();
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (response != null) {
|
||||
|
|
|
@ -277,6 +277,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
|
||||
String appMessage, String appTrackingUrl) throws YarnException,
|
||||
IOException {
|
||||
Preconditions.checkArgument(appStatus != null,
|
||||
"AppStatus should not be null.");
|
||||
FinishApplicationMasterRequest request =
|
||||
FinishApplicationMasterRequest.newInstance(appStatus, appMessage,
|
||||
appTrackingUrl);
|
||||
|
|
|
@ -159,14 +159,26 @@ public class TestAMRMClientAsync {
|
|||
|
||||
@Test(timeout=10000)
|
||||
public void testAMRMClientAsyncException() throws Exception {
|
||||
String exStr = "TestException";
|
||||
YarnException mockException = mock(YarnException.class);
|
||||
when(mockException.getMessage()).thenReturn(exStr);
|
||||
runHeartBeatThrowOutException(mockException);
|
||||
}
|
||||
|
||||
@Test(timeout=10000)
|
||||
public void testAMRMClientAsyncRunTimeException() throws Exception {
|
||||
String exStr = "TestRunTimeException";
|
||||
RuntimeException mockRunTimeException = mock(RuntimeException.class);
|
||||
when(mockRunTimeException.getMessage()).thenReturn(exStr);
|
||||
runHeartBeatThrowOutException(mockRunTimeException);
|
||||
}
|
||||
|
||||
private void runHeartBeatThrowOutException(Exception ex) throws Exception{
|
||||
Configuration conf = new Configuration();
|
||||
TestCallbackHandler callbackHandler = new TestCallbackHandler();
|
||||
@SuppressWarnings("unchecked")
|
||||
AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
|
||||
String exStr = "TestException";
|
||||
YarnException mockException = mock(YarnException.class);
|
||||
when(mockException.getMessage()).thenReturn(exStr);
|
||||
when(client.allocate(anyFloat())).thenThrow(mockException);
|
||||
when(client.allocate(anyFloat())).thenThrow(ex);
|
||||
|
||||
AMRMClientAsync<ContainerRequest> asyncClient =
|
||||
AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
|
||||
|
@ -183,14 +195,14 @@ public class TestAMRMClientAsync {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
Assert.assertTrue(callbackHandler.savedException.getMessage().contains(exStr));
|
||||
Assert.assertTrue(callbackHandler.savedException.getMessage().contains(
|
||||
ex.getMessage()));
|
||||
|
||||
asyncClient.stop();
|
||||
// stopping should have joined all threads and completed all callbacks
|
||||
Assert.assertTrue(callbackHandler.callbackCount == 0);
|
||||
}
|
||||
|
||||
|
||||
@Test//(timeout=10000)
|
||||
public void testAMRMClientAsyncReboot() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
|
|
Loading…
Reference in New Issue