From 05b4abf5370462b8d8e11d3fbef4a1badd99be60 Mon Sep 17 00:00:00 2001 From: Arun Suresh Date: Thu, 7 Sep 2017 10:23:12 -0700 Subject: [PATCH] YARN-6978. Add updateContainer API to NMClient. (Kartheek Muthyala via asuresh) (cherry picked from commit c41118a7f826dcbb269b4bd3d5877b35fbbee2b1) --- .../distributedshell/ApplicationMaster.java | 19 +++- .../hadoop/yarn/client/api/NMClient.java | 18 ++++ .../yarn/client/api/async/NMClientAsync.java | 36 ++++++++ .../api/async/impl/NMClientAsyncImpl.java | 92 ++++++++++++++----- .../yarn/client/api/impl/NMClientImpl.java | 29 ++++++ .../api/async/impl/TestNMClientAsync.java | 60 +++++++++++- .../yarn/client/api/impl/TestAMRMClient.java | 1 + .../yarn/client/api/impl/TestNMClient.java | 5 +- 8 files changed, 227 insertions(+), 33 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index a02af709116..5ec9409096e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -1026,10 +1026,6 @@ public class ApplicationMaster { } } - @Override - public void onContainerResourceIncreased( - ContainerId containerId, Resource resource) {} - @Override public void onStartContainerError(ContainerId containerId, Throwable t) { LOG.error("Failed to start Container " + containerId, t); @@ -1050,10 +1046,25 @@ public class ApplicationMaster { containers.remove(containerId); } + @Deprecated @Override public void onIncreaseContainerResourceError( ContainerId containerId, Throwable t) {} + @Deprecated + @Override + public void onContainerResourceIncreased( + ContainerId containerId, Resource resource) {} + + @Override + public void onUpdateContainerResourceError( + ContainerId containerId, Throwable t) { + } + + @Override + public void onContainerResourceUpdated(ContainerId containerId, + Resource resource) { + } } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java index c1447ba4a41..17168f799e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java @@ -104,9 +104,27 @@ public abstract class NMClient extends AbstractService { * @throws YarnException YarnException. * @throws IOException IOException. */ + @Deprecated public abstract void increaseContainerResource(Container container) throws YarnException, IOException; + /** + *

Update the resources of a container.

+ * + *

The ApplicationMaster or other applications that use the + * client must provide the details of the container, including the Id and + * the target resource encapsulated in the updated container token via + * {@link Container}. + *

+ * + * @param container the container with updated token. + * + * @throws YarnException YarnException. + * @throws IOException IOException. + */ + public abstract void updateContainerResource(Container container) + throws YarnException, IOException; + /** *

Stop an started container.

* diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java index c94942ae903..62e2d993e4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java @@ -177,8 +177,22 @@ public abstract class NMClientAsync extends AbstractService { public abstract void startContainerAsync( Container container, ContainerLaunchContext containerLaunchContext); + @Deprecated public abstract void increaseContainerResourceAsync(Container container); + /** + *

Update the resources of a container.

+ * + *

The ApplicationMaster or other applications that use the + * client must provide the details of the container, including the Id and + * the target resource encapsulated in the updated container token via + * {@link Container}. + *

+ * + * @param container the container with updated token. + */ + public abstract void updateContainerResourceAsync(Container container); + /** *

Re-Initialize the Container.

* @@ -301,9 +315,20 @@ public abstract class NMClientAsync extends AbstractService { * @param containerId the Id of the container * @param resource the target resource of the container */ + @Deprecated public abstract void onContainerResourceIncreased( ContainerId containerId, Resource resource); + /** + * The API is called when NodeManager responds to indicate + * the container resource has been successfully updated. + * + * @param containerId the Id of the container + * @param resource the target resource of the container + */ + public abstract void onContainerResourceUpdated( + ContainerId containerId, Resource resource); + /** * The API is called when an exception is raised in the process of * querying the status of a container. @@ -321,9 +346,20 @@ public abstract class NMClientAsync extends AbstractService { * @param containerId the Id of the container * @param t the raised exception */ + @Deprecated public abstract void onIncreaseContainerResourceError( ContainerId containerId, Throwable t); + /** + * The API is called when an exception is raised in the process of + * updating container resource. + * + * @param containerId the Id of the container + * @param t the raised exception + */ + public abstract void onUpdateContainerResourceError( + ContainerId containerId, Throwable t); + /** * The API is called when an exception is raised in the process of * stopping a container. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java index 20be71eaa71..4d14180c756 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java @@ -259,6 +259,7 @@ public class NMClientAsyncImpl extends NMClientAsync { } } + @Deprecated public void increaseContainerResourceAsync(Container container) { if (!(callbackHandler instanceof AbstractCallbackHandler)) { LOG.error("Callback handler does not implement container resource " @@ -274,7 +275,7 @@ public class NMClientAsyncImpl extends NMClientAsync { " is neither started nor scheduled to start")); } try { - events.put(new IncreaseContainerResourceEvent(container)); + events.put(new UpdateContainerResourceEvent(container, true)); } catch (InterruptedException e) { LOG.warn("Exception when scheduling the event of increasing resource of " + "Container " + container.getId()); @@ -282,6 +283,30 @@ public class NMClientAsyncImpl extends NMClientAsync { } } + @Override + public void updateContainerResourceAsync(Container container) { + if (!(callbackHandler instanceof AbstractCallbackHandler)) { + LOG.error("Callback handler does not implement container resource " + + "increase callback methods"); + return; + } + AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler; + if (containers.get(container.getId()) == null) { + handler.onUpdateContainerResourceError( + container.getId(), + RPCUtil.getRemoteException( + "Container " + container.getId() + + " is neither started nor scheduled to start")); + } + try { + events.put(new UpdateContainerResourceEvent(container, false)); + } catch (InterruptedException e) { + LOG.warn("Exception when scheduling the event of increasing resource of " + + "Container " + container.getId()); + handler.onUpdateContainerResourceError(container.getId(), e); + } + } + @Override public void reInitializeContainerAsync(ContainerId containerId, ContainerLaunchContext containerLaunchContex, boolean autoCommit){ @@ -427,7 +452,7 @@ public class NMClientAsyncImpl extends NMClientAsync { START_CONTAINER, STOP_CONTAINER, QUERY_CONTAINER, - INCREASE_CONTAINER_RESOURCE, + UPDATE_CONTAINER_RESOURCE, REINITIALIZE_CONTAINER, RESTART_CONTAINER, ROLLBACK_LAST_REINIT, @@ -503,14 +528,20 @@ public class NMClientAsyncImpl extends NMClientAsync { } } - protected static class IncreaseContainerResourceEvent extends ContainerEvent { + protected static class UpdateContainerResourceEvent extends ContainerEvent { private Container container; + private boolean isIncreaseEvent; - public IncreaseContainerResourceEvent(Container container) { + // UpdateContainerResourceEvent constructor takes in a + // flag to support callback API's calling through the deprecated + // increaseContainerResource + public UpdateContainerResourceEvent(Container container, + boolean isIncreaseEvent) { super(container.getId(), container.getNodeId(), container.getContainerToken(), - ContainerEventType.INCREASE_CONTAINER_RESOURCE); + ContainerEventType.UPDATE_CONTAINER_RESOURCE); this.container = container; + this.isIncreaseEvent = isIncreaseEvent; } public Container getContainer() { @@ -536,8 +567,8 @@ public class NMClientAsyncImpl extends NMClientAsync { // Transitions from RUNNING state .addTransition(ContainerState.RUNNING, ContainerState.RUNNING, - ContainerEventType.INCREASE_CONTAINER_RESOURCE, - new IncreaseContainerResourceTransition()) + ContainerEventType.UPDATE_CONTAINER_RESOURCE, + new UpdateContainerResourceTransition()) // Transitions for Container Upgrade .addTransition(ContainerState.RUNNING, @@ -566,7 +597,7 @@ public class NMClientAsyncImpl extends NMClientAsync { .addTransition(ContainerState.DONE, ContainerState.DONE, EnumSet.of(ContainerEventType.START_CONTAINER, ContainerEventType.STOP_CONTAINER, - ContainerEventType.INCREASE_CONTAINER_RESOURCE)) + ContainerEventType.UPDATE_CONTAINER_RESOURCE)) // Transition from FAILED state .addTransition(ContainerState.FAILED, ContainerState.FAILED, @@ -576,7 +607,7 @@ public class NMClientAsyncImpl extends NMClientAsync { ContainerEventType.RESTART_CONTAINER, ContainerEventType.COMMIT_LAST_REINT, ContainerEventType.ROLLBACK_LAST_REINIT, - ContainerEventType.INCREASE_CONTAINER_RESOURCE)); + ContainerEventType.UPDATE_CONTAINER_RESOURCE)); protected static class StartContainerTransition implements MultipleArcTransition { + + @SuppressWarnings("deprecation") @Override public void transition( StatefulContainer container, ContainerEvent event) { + boolean isIncreaseEvent = false; if (!(container.nmClientAsync.getCallbackHandler() instanceof AbstractCallbackHandler)) { LOG.error("Callback handler does not implement container resource " - + "increase callback methods"); + + "update callback methods"); return; } AbstractCallbackHandler handler = (AbstractCallbackHandler) container.nmClientAsync .getCallbackHandler(); try { - if (!(event instanceof IncreaseContainerResourceEvent)) { + if (!(event instanceof UpdateContainerResourceEvent)) { throw new AssertionError("Unexpected event type. Expecting:" - + "IncreaseContainerResourceEvent. Got:" + event); + + "UpdateContainerResourceEvent. Got:" + event); } - IncreaseContainerResourceEvent increaseEvent = - (IncreaseContainerResourceEvent) event; - container.nmClientAsync.getClient().increaseContainerResource( - increaseEvent.getContainer()); + UpdateContainerResourceEvent updateEvent = + (UpdateContainerResourceEvent) event; + container.nmClientAsync.getClient().updateContainerResource( + updateEvent.getContainer()); + isIncreaseEvent = updateEvent.isIncreaseEvent; try { - handler.onContainerResourceIncreased( - increaseEvent.getContainerId(), increaseEvent.getContainer() - .getResource()); + //If isIncreaseEvent is set, set the appropriate callbacks + //for backward compatibility + if (isIncreaseEvent) { + handler.onContainerResourceIncreased(updateEvent.getContainerId(), + updateEvent.getContainer().getResource()); + } else { + handler.onContainerResourceUpdated(updateEvent.getContainerId(), + updateEvent.getContainer().getResource()); + } } catch (Throwable thr) { // Don't process user created unchecked exception LOG.info("Unchecked exception is thrown from " - + "onContainerResourceIncreased for Container " + + "onContainerResourceUpdated for Container " + event.getContainerId(), thr); } } catch (Exception e) { try { - handler.onIncreaseContainerResourceError(event.getContainerId(), e); + if (isIncreaseEvent) { + handler + .onIncreaseContainerResourceError(event.getContainerId(), e); + } else { + handler.onUpdateContainerResourceError(event.getContainerId(), e); + } } catch (Throwable thr) { // Don't process user created unchecked exception LOG.info("Unchecked exception is thrown from " - + "onIncreaseContainerResourceError for Container " + + "onUpdateContainerResourceError for Container " + event.getContainerId(), thr); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java index 8171de228aa..ca44e2e3441 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java @@ -230,6 +230,7 @@ public class NMClientImpl extends NMClient { } } + @Deprecated @Override public void increaseContainerResource(Container container) throws YarnException, IOException { @@ -258,6 +259,34 @@ public class NMClientImpl extends NMClient { } } + @Override + public void updateContainerResource(Container container) + throws YarnException, IOException { + ContainerManagementProtocolProxyData proxy = null; + try { + proxy = + cmProxy.getProxy(container.getNodeId().toString(), container.getId()); + List updateTokens = new ArrayList<>(); + updateTokens.add(container.getContainerToken()); + + ContainerUpdateRequest request = + ContainerUpdateRequest.newInstance(updateTokens); + ContainerUpdateResponse response = + proxy.getContainerManagementProtocol().updateContainer(request); + + if (response.getFailedRequests() != null && response.getFailedRequests() + .containsKey(container.getId())) { + Throwable t = + response.getFailedRequests().get(container.getId()).deSerialize(); + parseAndThrowException(t); + } + } finally { + if (proxy != null) { + cmProxy.mayBeCloseProxy(proxy); + } + } + } + @Override public void stopContainer(ContainerId containerId, NodeId nodeId) throws YarnException, IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java index dda3eec0d0d..6c7270457a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java @@ -253,7 +253,7 @@ public class TestNMClientAsync { int t = containerId.getId() % 5; switch (t) { case 0: - asyncClient.increaseContainerResourceAsync(container); + asyncClient.updateContainerResourceAsync(container); break; case 1: asyncClient.reInitializeContainerAsync(containerId, @@ -295,7 +295,7 @@ public class TestNMClientAsync { // containerId Container container = Container.newInstance( containerId, nodeId, null, null, null, containerToken); - asyncClient.increaseContainerResourceAsync(container); + asyncClient.updateContainerResourceAsync(container); // Shouldn't crash the test thread throw new RuntimeException("Ignorable Exception"); @@ -320,6 +320,25 @@ public class TestNMClientAsync { throw new RuntimeException("Ignorable Exception"); } + @SuppressWarnings("deprecation") + @Override + public void onContainerResourceUpdated(ContainerId containerId, + Resource resource) { + if (containerId.getId() >= expectedSuccess) { + errorMsgs.add("Container " + containerId + + " should throw the exception onContainerResourceUpdated"); + return; + } + TestData td = testMap.get(OpsToTest.INCR); + td.success.addAndGet(1); + td.successArray.set(containerId.getId(), 1); + // move on to the following success tests + asyncClient.reInitializeContainerAsync(containerId, + Records.newRecord(ContainerLaunchContext.class), true); + // throw a fake user exception, and shouldn't crash the test + throw new RuntimeException("Ignorable Exception"); + } + @SuppressWarnings("deprecation") @Override public void onContainerReInitialize(ContainerId containerId) { @@ -450,6 +469,27 @@ public class TestNMClientAsync { throw new RuntimeException("Ignorable Exception"); } + @SuppressWarnings("deprecation") + @Override + public void onUpdateContainerResourceError(ContainerId containerId, + Throwable t) { + if (containerId.getId() < expectedSuccess + expectedFailure) { + errorMsgs.add("Container " + containerId + + " shouldn't throw the exception onUpdatedContainerResourceError"); + return; + } + TestData td = testMap.get(OpsToTest.INCR); + td.failure.addAndGet(1); + td.failureArray.set( + containerId.getId() - expectedSuccess - expectedFailure, 1); + // increase container resource error should NOT change the + // the container status to FAILED + // move on to the following failure tests + asyncClient.stopContainerAsync(containerId, nodeId); + // Shouldn't crash the test thread + throw new RuntimeException("Ignorable Exception"); + } + @SuppressWarnings("deprecation") @Override public void onContainerReInitializeError(ContainerId containerId, @@ -673,7 +713,7 @@ public class TestNMClientAsync { when(client.getContainerStatus(any(ContainerId.class), any(NodeId.class))).thenReturn( recordFactory.newRecordInstance(ContainerStatus.class)); - doNothing().when(client).increaseContainerResource( + doNothing().when(client).updateContainerResource( any(Container.class)); doNothing().when(client).reInitializeContainer( any(ContainerId.class), any(ContainerLaunchContext.class), @@ -703,7 +743,7 @@ public class TestNMClientAsync { any(NodeId.class))).thenReturn( recordFactory.newRecordInstance(ContainerStatus.class)); doThrow(RPCUtil.getRemoteException("Increase Resource Exception")) - .when(client).increaseContainerResource(any(Container.class)); + .when(client).updateContainerResource(any(Container.class)); doThrow(RPCUtil.getRemoteException("ReInitialize Exception")) .when(client).reInitializeContainer( any(ContainerId.class), any(ContainerLaunchContext.class), @@ -818,10 +858,16 @@ public class TestNMClientAsync { ContainerStatus containerStatus) { } + @Deprecated @Override public void onContainerResourceIncreased( ContainerId containerId, Resource resource) {} + @Override + public void onContainerResourceUpdated(ContainerId containerId, + Resource resource) { + } + @Override public void onContainerStopped(ContainerId containerId) { } @@ -847,10 +893,16 @@ public class TestNMClientAsync { Throwable t) { } + @Deprecated @Override public void onIncreaseContainerResourceError( ContainerId containerId, Throwable t) {} + @Override + public void onUpdateContainerResourceError(ContainerId containerId, + Throwable t) { + } + @Override public void onStopContainerError(ContainerId containerId, Throwable t) { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 09b12f2c485..fa2e7a57130 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -1428,6 +1428,7 @@ public class TestAMRMClient { amClient.ask.clear(); } + @SuppressWarnings("deprecation") private void updateContainerExecType(AllocateResponse allocResponse, ExecutionType expectedExecType, NMClientImpl nmClient) throws IOException, YarnException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java index 9b79e2d6faf..b23a923513c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java @@ -301,10 +301,10 @@ public class TestNMClient { assertTrue("The thrown exception is not expected", e.getMessage().contains("is not handled by this NodeManager")); } - // increaseContainerResource shouldn't be called before startContainer, + // upadateContainerResource shouldn't be called before startContainer, // otherwise, NodeManager cannot find the container try { - nmClient.increaseContainerResource(container); + nmClient.updateContainerResource(container); fail("Exception is expected"); } catch (YarnException e) { assertTrue("The thrown exception is not expected", @@ -469,6 +469,7 @@ public class TestNMClient { } } + @SuppressWarnings("deprecation") private void testIncreaseContainerResource(Container container) throws YarnException, IOException { try {