From 5f64166ba3df718cdfbf8d18ccc3e466a8a5f488 Mon Sep 17 00:00:00 2001 From: Bikas Saha Date: Wed, 24 Jul 2013 22:16:37 +0000 Subject: [PATCH] Merge r1506750 from trunk to branch-2 for YARN-875. Application can hang if AMRMClientAsync callback thread has exception (Xuan Gong via bikas) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1506752 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../distributedshell/ApplicationMaster.java | 3 +- .../client/api/async/AMRMClientAsync.java | 9 +- .../api/async/impl/AMRMClientAsyncImpl.java | 89 ++++++++------- .../api/async/impl/TestAMRMClientAsync.java | 102 +++++++++++++++--- 5 files changed, 149 insertions(+), 57 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index cb7f3364ee5..dc24c643235 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -726,6 +726,9 @@ Release 2.1.0-beta - 2013-07-02 YARN-873. YARNClient.getApplicationReport(unknownAppId) returns a null report (Xuan Gong via bikas) + YARN-875. Application can hang if AMRMClientAsync callback thread has + exception (Xuan Gong via bikas) + BREAKDOWN OF HADOOP-8562/YARN-191 SUBTASKS AND RELATED JIRAS YARN-158. Yarn creating package-info.java must not depend on sh. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index df08b0da720..8914646cbe4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -649,8 +649,9 @@ public class ApplicationMaster { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { done = true; + resourceManager.stop(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java index 4771f0d0bd6..e726b73651a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java @@ -220,6 +220,13 @@ extends AbstractService { public float getProgress(); - public void onError(Exception e); + /** + * Called when error comes from RM communications as well as from errors in + * the callback itself from the app. Calling + * stop() is the recommended action. + * + * @param e + */ + public void onError(Throwable e); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index eb8b72f1255..dd03d64e48c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -217,7 +217,7 @@ extends AMRMClientAsync { // synchronization ensures we don't send heartbeats after unregistering synchronized (unregisterHeartbeatLock) { if (!keepRunning) { - break; + return; } try { @@ -227,13 +227,13 @@ extends AMRMClientAsync { savedException = ex; // interrupt handler thread in case it waiting on the queue handlerThread.interrupt(); - break; + return; } catch (IOException e) { LOG.error("IO exception on heartbeat", e); savedException = e; // interrupt handler thread in case it waiting on the queue handlerThread.interrupt(); - break; + return; } } if (response != null) { @@ -266,51 +266,60 @@ extends AMRMClientAsync { } public void run() { - while (keepRunning) { - AllocateResponse response; + while (true) { + if (!keepRunning) { + return; + } try { + AllocateResponse response; if(savedException != null) { LOG.error("Stopping callback due to: ", savedException); handler.onError(savedException); - break; - } - response = responseQueue.take(); - } catch (InterruptedException ex) { - LOG.info("Interrupted while waiting for queue", ex); - continue; - } - - if (response.getAMCommand() != null) { - switch(response.getAMCommand()) { - case AM_RESYNC: - case AM_SHUTDOWN: - handler.onShutdownRequest(); - LOG.info("Shutdown requested. Stopping callback."); return; - default: - String msg = - "Unhandled value of AMCommand: " + response.getAMCommand(); - LOG.error(msg); - throw new YarnRuntimeException(msg); } - } - List updatedNodes = response.getUpdatedNodes(); - if (!updatedNodes.isEmpty()) { - handler.onNodesUpdated(updatedNodes); - } - - List completed = - response.getCompletedContainersStatuses(); - if (!completed.isEmpty()) { - handler.onContainersCompleted(completed); - } + try { + response = responseQueue.take(); + } catch (InterruptedException ex) { + LOG.info("Interrupted while waiting for queue", ex); + continue; + } - List allocated = response.getAllocatedContainers(); - if (!allocated.isEmpty()) { - handler.onContainersAllocated(allocated); + if (response.getAMCommand() != null) { + switch(response.getAMCommand()) { + case AM_RESYNC: + case AM_SHUTDOWN: + handler.onShutdownRequest(); + LOG.info("Shutdown requested. Stopping callback."); + return; + default: + String msg = + "Unhandled value of RM AMCommand: " + response.getAMCommand(); + LOG.error(msg); + throw new YarnRuntimeException(msg); + } + } + List updatedNodes = response.getUpdatedNodes(); + if (!updatedNodes.isEmpty()) { + handler.onNodesUpdated(updatedNodes); + } + + List completed = + response.getCompletedContainersStatuses(); + if (!completed.isEmpty()) { + handler.onContainersCompleted(completed); + } + + List allocated = response.getAllocatedContainers(); + if (!allocated.isEmpty()) { + handler.onContainersAllocated(allocated); + } + + progress = handler.getProgress(); + } catch (Throwable ex) { + handler.onError(ex); + // re-throw exception to end the thread + throw new YarnRuntimeException(ex); } - - progress = handler.getProgress(); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java index 8dd4ac6e668..0d96185ec4b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java @@ -21,10 +21,12 @@ package org.apache.hadoop.yarn.client.api.async.impl; import static org.mockito.Matchers.anyFloat; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.spy; import java.io.IOException; import java.util.ArrayList; @@ -54,6 +56,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -264,13 +267,13 @@ public class TestAMRMClientAsync { AMRMClientAsync asyncClient = AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler); - callbackHandler.registerAsyncClient(asyncClient); + callbackHandler.asynClient = asyncClient; asyncClient.init(conf); asyncClient.start(); synchronized (callbackHandler.notifier) { asyncClient.registerApplicationMaster("localhost", 1234, null); - while(callbackHandler.stop == false) { + while(callbackHandler.notify == false) { try { callbackHandler.notifier.wait(); } catch (InterruptedException e) { @@ -280,6 +283,65 @@ public class TestAMRMClientAsync { } } + void runCallBackThrowOutException(TestCallbackHandler2 callbackHandler) throws + InterruptedException, YarnException, IOException { + Configuration conf = new Configuration(); + @SuppressWarnings("unchecked") + AMRMClient client = mock(AMRMClientImpl.class); + + List completed = Arrays.asList( + ContainerStatus.newInstance(newContainerId(0, 0, 0, 0), + ContainerState.COMPLETE, "", 0)); + final AllocateResponse response = createAllocateResponse(completed, + new ArrayList(), null); + + when(client.allocate(anyFloat())).thenReturn(response); + AMRMClientAsync asyncClient = + AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler); + callbackHandler.asynClient = asyncClient; + callbackHandler.throwOutException = true; + asyncClient.init(conf); + asyncClient.start(); + + // call register and wait for error callback and stop + synchronized (callbackHandler.notifier) { + asyncClient.registerApplicationMaster("localhost", 1234, null); + while(callbackHandler.notify == false) { + try { + callbackHandler.notifier.wait(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + // verify error invoked + verify(callbackHandler, times(0)).getProgress(); + verify(callbackHandler, times(1)).onError(any(Exception.class)); + // sleep to wait for a few heartbeat calls that can trigger callbacks + Thread.sleep(50); + // verify no more invocations after the first one. + // ie. callback thread has stopped + verify(callbackHandler, times(0)).getProgress(); + verify(callbackHandler, times(1)).onError(any(Exception.class)); + } + + @Test (timeout = 5000) + public void testCallBackThrowOutException() throws YarnException, + IOException, InterruptedException { + // test exception in callback with app calling stop() on app.onError() + TestCallbackHandler2 callbackHandler = spy(new TestCallbackHandler2()); + runCallBackThrowOutException(callbackHandler); + } + + @Test (timeout = 5000) + public void testCallBackThrowOutExceptionNoStop() throws YarnException, + IOException, InterruptedException { + // test exception in callback with app not calling stop() on app.onError() + TestCallbackHandler2 callbackHandler = spy(new TestCallbackHandler2()); + callbackHandler.stop = false; + runCallBackThrowOutException(callbackHandler); + } + private AllocateResponse createAllocateResponse( List completed, List allocated, List nmTokens) { @@ -378,8 +440,8 @@ public class TestAMRMClientAsync { } @Override - public void onError(Exception e) { - savedException = e; + public void onError(Throwable e) { + savedException = new Exception(e.getMessage()); synchronized (notifier) { notifier.notifyAll(); } @@ -390,10 +452,16 @@ public class TestAMRMClientAsync { Object notifier = new Object(); @SuppressWarnings("rawtypes") AMRMClientAsync asynClient; - boolean stop = false; + boolean stop = true; + boolean notify = false; + boolean throwOutException = false; @Override - public void onContainersCompleted(List statuses) {} + public void onContainersCompleted(List statuses) { + if (throwOutException) { + throw new YarnRuntimeException("Exception from callback handler"); + } + } @Override public void onContainersAllocated(List containers) {} @@ -406,20 +474,24 @@ public class TestAMRMClientAsync { @Override public float getProgress() { - asynClient.stop(); - stop = true; - synchronized (notifier) { - notifier.notifyAll(); - } + callStopAndNotify(); return 0; } @Override - public void onError(Exception e) {} + public void onError(Throwable e) { + Assert.assertEquals(e.getMessage(), "Exception from callback handler"); + callStopAndNotify(); + } - public void registerAsyncClient( - AMRMClientAsync asyncClient) { - this.asynClient = asyncClient; + void callStopAndNotify() { + if(stop) { + asynClient.stop(); + } + notify = true; + synchronized (notifier) { + notifier.notifyAll(); + } } } }