From a4f9c7c9247801dd37beec6fc195622af1b884ad Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Mon, 18 Sep 2017 10:16:09 -0500 Subject: [PATCH] YARN-7192. Add a pluggable StateMachine Listener that is notified of NM Container State changes. Contributed by Arun Suresh --- .../hadoop/yarn/conf/YarnConfiguration.java | 6 +- .../state/MultiStateTransitionListener.java | 61 +++++++++++++++++ .../yarn/state/StateMachineFactory.java | 40 +++++++++++ .../yarn/state/StateTransitionListener.java | 50 ++++++++++++++ .../src/main/resources/yarn-default.xml | 6 ++ .../ContainerStateTransitionListener.java | 48 +++++++++++++ .../yarn/server/nodemanager/Context.java | 2 + .../yarn/server/nodemanager/NodeManager.java | 48 ++++++++++++- .../container/ContainerImpl.java | 3 +- .../server/nodemanager/TestNodeManager.java | 68 +++++++++++++++++++ .../amrmproxy/BaseAMRMProxyTest.java | 8 +++ .../container/TestContainer.java | 53 +++++++++++++++ 12 files changed, 389 insertions(+), 4 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/MultiStateTransitionListener.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/StateTransitionListener.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerStateTransitionListener.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 48910b372d7..114453f6dc9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -968,9 +968,13 @@ public class YarnConfiguration extends Configuration { NM_PREFIX + "bind-host"; /** who will execute(launch) the containers.*/ - public static final String NM_CONTAINER_EXECUTOR = + public static final String NM_CONTAINER_EXECUTOR = NM_PREFIX + "container-executor.class"; + /** List of container state transition listeners.*/ + public static final String NM_CONTAINER_STATE_TRANSITION_LISTENERS = + NM_PREFIX + "container-state-transition-listener.classes"; + /** * Adjustment to make to the container os scheduling priority. * The valid values for this could vary depending on the platform. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/MultiStateTransitionListener.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/MultiStateTransitionListener.java new file mode 100644 index 00000000000..1a28fc50002 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/MultiStateTransitionListener.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.state; + +import java.util.ArrayList; +import java.util.List; + +/** + * A {@link StateTransitionListener} that dispatches the pre and post + * state transitions to multiple registered listeners. + * NOTE: The registered listeners are called in a for loop. Clients should + * know that a listener configured earlier might prevent a later listener + * from being called, if for instance it throws an un-caught Exception. + */ +public abstract class MultiStateTransitionListener + > implements + StateTransitionListener { + + private final List> listeners = + new ArrayList<>(); + + /** + * Add a listener to the list of listeners. + * @param listener A listener. + */ + public void addListener(StateTransitionListener + listener) { + listeners.add(listener); + } + + @Override + public void preTransition(OPERAND op, STATE beforeState, + EVENT eventToBeProcessed) { + for (StateTransitionListener listener : listeners) { + listener.preTransition(op, beforeState, eventToBeProcessed); + } + } + + @Override + public void postTransition(OPERAND op, STATE beforeState, STATE afterState, + EVENT processedEvent) { + for (StateTransitionListener listener : listeners) { + listener.postTransition(op, beforeState, afterState, processedEvent); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/StateMachineFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/StateMachineFactory.java index 5b76ce8fb52..4bb005c0536 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/StateMachineFactory.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/StateMachineFactory.java @@ -391,6 +391,21 @@ final public class StateMachineFactory } } + /** + * A StateMachine that accepts a transition listener. + * @param operand the object upon which the returned + * {@link StateMachine} will operate. + * @param initialState the state in which the returned + * {@link StateMachine} will start. + * @param listener An implementation of a {@link StateTransitionListener}. + * @return A (@link StateMachine}. + */ + public StateMachine + make(OPERAND operand, STATE initialState, + StateTransitionListener listener) { + return new InternalStateMachine(operand, initialState, listener); + } + /* * @return a {@link StateMachine} that starts in * {@code initialState} and whose {@link Transition} s are @@ -424,14 +439,36 @@ final public class StateMachineFactory return new InternalStateMachine(operand, defaultInitialState); } + private static class NoopStateTransitionListener + implements StateTransitionListener { + @Override + public void preTransition(Object op, Enum beforeState, + Object eventToBeProcessed) { } + + @Override + public void postTransition(Object op, Enum beforeState, Enum afterState, + Object processedEvent) { } + } + + private static final NoopStateTransitionListener NOOP_LISTENER = + new NoopStateTransitionListener(); + private class InternalStateMachine implements StateMachine { private final OPERAND operand; private STATE currentState; + private final StateTransitionListener listener; InternalStateMachine(OPERAND operand, STATE initialState) { + this(operand, initialState, null); + } + + InternalStateMachine(OPERAND operand, STATE initialState, + StateTransitionListener transitionListener) { this.operand = operand; this.currentState = initialState; + this.listener = + (transitionListener == null) ? NOOP_LISTENER : transitionListener; if (!optimized) { maybeMakeStateMachineTable(); } @@ -445,8 +482,11 @@ final public class StateMachineFactory @Override public synchronized STATE doTransition(EVENTTYPE eventType, EVENT event) throws InvalidStateTransitionException { + listener.preTransition(operand, currentState, event); + STATE oldState = currentState; currentState = StateMachineFactory.this.doTransition (operand, currentState, eventType, event); + listener.postTransition(operand, oldState, currentState, event); return currentState; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/StateTransitionListener.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/StateTransitionListener.java new file mode 100644 index 00000000000..657c19398a1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/StateTransitionListener.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.state; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * A State Transition Listener. + * It exposes a pre and post transition hook called before and + * after the transition. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface StateTransitionListener + > { + + /** + * Pre Transition Hook. This will be called before transition. + * @param op Operand. + * @param beforeState State before transition. + * @param eventToBeProcessed Incoming Event. + */ + void preTransition(OPERAND op, STATE beforeState, EVENT eventToBeProcessed); + + /** + * Post Transition Hook. This will be called after the transition. + * @param op Operand. + * @param beforeState State before transition. + * @param afterState State after transition. + * @param processedEvent Processed Event. + */ + void postTransition(OPERAND op, STATE beforeState, STATE afterState, + EVENT processedEvent); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 6444da9a478..0440458e5e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1003,6 +1003,12 @@ org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor + + Comma separated List of container state transition listeners. + yarn.nodemanager.container-state-transition-listener.classes + + + Number of threads container manager uses. yarn.nodemanager.container-manager.thread-count diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerStateTransitionListener.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerStateTransitionListener.java new file mode 100644 index 00000000000..24cdb1f7abb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerStateTransitionListener.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; +import org.apache.hadoop.yarn.state.StateTransitionListener; + +/** + * Interface to be used by external cluster operators to implement a + * State Transition listener that is notified before and after a container + * state transition. + * NOTE: The pre and post transition callbacks will be made in the synchronized + * block as the call to the instrumented transition - Serially, in the + * order: preTransition, transition and postTransition. The implementor + * must ensure that the callbacks return in a timely manner to avoid + * blocking the state-machine. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface ContainerStateTransitionListener extends + StateTransitionListener { + + /** + * Init method which will be invoked by the Node Manager to inject the + * NM {@link Context}. + * @param context NM Context. + */ + void init(Context context); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 00bd0efcb17..a2d00a4cc24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -120,4 +120,6 @@ public interface Context { NMTimelinePublisher getNMTimelinePublisher(); ContainerExecutor getContainerExecutor(); + + ContainerStateTransitionListener getContainerStateTransitionListener(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 3e919c5cdad..a97b3f2c2aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -20,12 +20,18 @@ package org.apache.hadoop.yarn.server.nodemanager; import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; +import org.apache.hadoop.yarn.state.MultiStateTransitionListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -130,6 +136,17 @@ public class NodeManager extends CompositeService private boolean rmWorkPreservingRestartEnabled; private boolean shouldExitOnShutdownEvent = false; + /** + * Default Container State transition listener. + */ + public static class DefaultContainerStateListener extends + MultiStateTransitionListener + + implements ContainerStateTransitionListener { + @Override + public void init(Context context) {} + } + public NodeManager() { super(NodeManager.class.getName()); } @@ -219,8 +236,22 @@ public class NodeManager extends CompositeService NMTokenSecretManagerInNM nmTokenSecretManager, NMStateStoreService stateStore, boolean isDistSchedulerEnabled, Configuration conf) { - return new NMContext(containerTokenSecretManager, nmTokenSecretManager, - dirsHandler, aclsManager, stateStore, isDistSchedulerEnabled, conf); + List listeners = + conf.getInstances( + YarnConfiguration.NM_CONTAINER_STATE_TRANSITION_LISTENERS, + ContainerStateTransitionListener.class); + NMContext nmContext = new NMContext(containerTokenSecretManager, + nmTokenSecretManager, dirsHandler, aclsManager, stateStore, + isDistSchedulerEnabled, conf); + DefaultContainerStateListener defaultListener = + new DefaultContainerStateListener(); + nmContext.setContainerStateTransitionListener(defaultListener); + defaultListener.init(nmContext); + for (ContainerStateTransitionListener listener : listeners) { + listener.init(nmContext); + defaultListener.addListener(listener); + } + return nmContext; } protected void doSecureLogin() throws IOException { @@ -563,6 +594,8 @@ public class NodeManager extends CompositeService private NMTimelinePublisher nmTimelinePublisher; + private ContainerStateTransitionListener containerStateTransitionListener; + public NMContext(NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager, @@ -752,6 +785,17 @@ public class NodeManager extends CompositeService public void setContainerExecutor(ContainerExecutor executor) { this.executor = executor; } + + @Override + public ContainerStateTransitionListener + getContainerStateTransitionListener() { + return this.containerStateTransitionListener; + } + + public void setContainerStateTransitionListener( + ContainerStateTransitionListener transitionListener) { + this.containerStateTransitionListener = transitionListener; + } } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 9b9c47fee1a..df107a7d52f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -240,7 +240,8 @@ public class ContainerImpl implements Container { this.containerRetryContext = configureRetryContext( conf, launchContext, this.containerId); this.remainingRetryAttempts = this.containerRetryContext.getMaxRetries(); - stateMachine = stateMachineFactory.make(this); + stateMachine = stateMachineFactory.make(this, ContainerState.NEW, + context.getContainerStateTransitionListener()); this.context = context; this.resourceSet = new ResourceSet(); this.resourceMappings = new ResourceMappings(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java index 2d390ac998a..92797116075 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java @@ -25,6 +25,9 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; import org.junit.Assert; import org.junit.Test; @@ -57,6 +60,71 @@ public class TestNodeManager { } } + private static int initCalls = 0; + private static int preCalls = 0; + private static int postCalls = 0; + + private static class DummyCSTListener1 + implements ContainerStateTransitionListener { + @Override + public void init(Context context) { + initCalls++; + } + + @Override + public void preTransition(ContainerImpl op, ContainerState beforeState, + ContainerEvent eventToBeProcessed) { + preCalls++; + } + + @Override + public void postTransition(ContainerImpl op, ContainerState beforeState, + ContainerState afterState, ContainerEvent processedEvent) { + postCalls++; + } + } + + private static class DummyCSTListener2 + implements ContainerStateTransitionListener { + @Override + public void init(Context context) { + initCalls++; + } + + @Override + public void preTransition(ContainerImpl op, ContainerState beforeState, + ContainerEvent eventToBeProcessed) { + preCalls++; + } + + @Override + public void postTransition(ContainerImpl op, ContainerState beforeState, + ContainerState afterState, ContainerEvent processedEvent) { + postCalls++; + } + } + + @Test + public void testListenerInitialization() throws Exception{ + NodeManager nodeManager = new NodeManager(); + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.NM_CONTAINER_STATE_TRANSITION_LISTENERS, + DummyCSTListener1.class.getName() + "," + + DummyCSTListener2.class.getName()); + initCalls = 0; + preCalls = 0; + postCalls = 0; + NodeManager.NMContext nmContext = + nodeManager.createNMContext(null, null, null, false, conf); + Assert.assertEquals(2, initCalls); + nmContext.getContainerStateTransitionListener().preTransition( + null, null, null); + nmContext.getContainerStateTransitionListener().postTransition( + null, null, null, null); + Assert.assertEquals(2, preCalls); + Assert.assertEquals(2, postCalls); + } + @Test public void testCreationOfNodeLabelsProviderService() throws InterruptedException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 7c8551e75b0..0838f1e523a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -33,6 +33,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -765,5 +767,11 @@ public abstract class BaseAMRMProxyTest { public ContainerExecutor getContainerExecutor() { return null; } + + @Override + public ContainerStateTransitionListener + getContainerStateTransitionListener() { + return null; + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index 8909088020a..64e6cf0b0fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -71,7 +71,9 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; +import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener; import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; @@ -287,6 +289,29 @@ public class TestContainer { assertEquals(ContainerState.DONE, wc.c.getContainerState()); assertEquals(completed + 1, metrics.getCompletedContainers()); assertEquals(running, metrics.getRunningContainers()); + + ContainerEventType e1 = wc.initStateToEvent.get(ContainerState.NEW); + ContainerState s2 = wc.eventToFinalState.get(e1); + ContainerEventType e2 = wc.initStateToEvent.get(s2); + ContainerState s3 = wc.eventToFinalState.get(e2); + ContainerEventType e3 = wc.initStateToEvent.get(s3); + ContainerState s4 = wc.eventToFinalState.get(e3); + ContainerEventType e4 = wc.initStateToEvent.get(s4); + ContainerState s5 = wc.eventToFinalState.get(e4); + ContainerEventType e5 = wc.initStateToEvent.get(s5); + ContainerState s6 = wc.eventToFinalState.get(e5); + + Assert.assertEquals(ContainerState.LOCALIZING, s2); + Assert.assertEquals(ContainerState.SCHEDULED, s3); + Assert.assertEquals(ContainerState.RUNNING, s4); + Assert.assertEquals(ContainerState.EXITED_WITH_SUCCESS, s5); + Assert.assertEquals(ContainerState.DONE, s6); + + Assert.assertEquals(ContainerEventType.INIT_CONTAINER, e1); + Assert.assertEquals(ContainerEventType.RESOURCE_LOCALIZED, e2); + Assert.assertEquals(ContainerEventType.CONTAINER_LAUNCHED, e3); + Assert.assertEquals(ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, e4); + Assert.assertEquals(ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP, e5); } finally { if (wc != null) { @@ -401,6 +426,10 @@ public class TestContainer { Assert.assertTrue( containerMetrics.finishTime.value() > containerMetrics.startTime .value()); + Assert.assertEquals(ContainerEventType.KILL_CONTAINER, + wc.initStateToEvent.get(ContainerState.NEW)); + Assert.assertEquals(ContainerState.DONE, + wc.eventToFinalState.get(ContainerEventType.KILL_CONTAINER)); } finally { if (wc != null) { wc.finished(); @@ -942,6 +971,10 @@ public class TestContainer { final Map localResources; final Map serviceData; final Context context = mock(Context.class); + private final Map initStateToEvent = + new HashMap<>(); + private final Map eventToFinalState = + new HashMap<>(); WrappedContainer(int appId, long timestamp, int id, String user) throws IOException { @@ -1048,7 +1081,27 @@ public class TestContainer { } when(ctxt.getServiceData()).thenReturn(serviceData); when(ctxt.getContainerRetryContext()).thenReturn(containerRetryContext); + ContainerStateTransitionListener listener = + new ContainerStateTransitionListener() { + @Override + public void init(Context cntxt) {} + @Override + public void preTransition(ContainerImpl op, ContainerState beforeState, + ContainerEvent eventToBeProcessed) { + initStateToEvent.put(beforeState, eventToBeProcessed.getType()); + } + + @Override + public void postTransition(ContainerImpl op, ContainerState beforeState, + ContainerState afterState, ContainerEvent processedEvent) { + eventToFinalState.put(processedEvent.getType(), afterState); + } + }; + NodeManager.DefaultContainerStateListener multi = + new NodeManager.DefaultContainerStateListener(); + multi.addListener(listener); + when(context.getContainerStateTransitionListener()).thenReturn(multi); c = new ContainerImpl(conf, dispatcher, ctxt, null, metrics, identifier, context); dispatcher.register(ContainerEventType.class,