YARN-7192. Add a pluggable StateMachine Listener that is notified of NM Container State changes. Contributed by Arun Suresh
(cherry picked from commit a4f9c7c924
)
Conflicts:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
This commit is contained in:
parent
f42705b957
commit
cd6cf0caf6
|
@ -872,6 +872,10 @@ public class YarnConfiguration extends Configuration {
|
||||||
public static final String NM_CONTAINER_EXECUTOR =
|
public static final String NM_CONTAINER_EXECUTOR =
|
||||||
NM_PREFIX + "container-executor.class";
|
NM_PREFIX + "container-executor.class";
|
||||||
|
|
||||||
|
/** List of container state transition listeners.*/
|
||||||
|
public static final String NM_CONTAINER_STATE_TRANSITION_LISTENERS =
|
||||||
|
NM_PREFIX + "container-state-transition-listener.classes";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adjustment to make to the container os scheduling priority.
|
* Adjustment to make to the container os scheduling priority.
|
||||||
* The valid values for this could vary depending on the platform.
|
* The valid values for this could vary depending on the platform.
|
||||||
|
|
|
@ -0,0 +1,61 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.state;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A {@link StateTransitionListener} that dispatches the pre and post
|
||||||
|
* state transitions to multiple registered listeners.
|
||||||
|
* NOTE: The registered listeners are called in a for loop. Clients should
|
||||||
|
* know that a listener configured earlier might prevent a later listener
|
||||||
|
* from being called, if for instance it throws an un-caught Exception.
|
||||||
|
*/
|
||||||
|
public abstract class MultiStateTransitionListener
|
||||||
|
<OPERAND, EVENT, STATE extends Enum<STATE>> implements
|
||||||
|
StateTransitionListener<OPERAND, EVENT, STATE> {
|
||||||
|
|
||||||
|
private final List<StateTransitionListener<OPERAND, EVENT, STATE>> listeners =
|
||||||
|
new ArrayList<>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a listener to the list of listeners.
|
||||||
|
* @param listener A listener.
|
||||||
|
*/
|
||||||
|
public void addListener(StateTransitionListener<OPERAND, EVENT, STATE>
|
||||||
|
listener) {
|
||||||
|
listeners.add(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void preTransition(OPERAND op, STATE beforeState,
|
||||||
|
EVENT eventToBeProcessed) {
|
||||||
|
for (StateTransitionListener<OPERAND, EVENT, STATE> listener : listeners) {
|
||||||
|
listener.preTransition(op, beforeState, eventToBeProcessed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postTransition(OPERAND op, STATE beforeState, STATE afterState,
|
||||||
|
EVENT processedEvent) {
|
||||||
|
for (StateTransitionListener<OPERAND, EVENT, STATE> listener : listeners) {
|
||||||
|
listener.postTransition(op, beforeState, afterState, processedEvent);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -391,6 +391,21 @@ final public class StateMachineFactory
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A StateMachine that accepts a transition listener.
|
||||||
|
* @param operand the object upon which the returned
|
||||||
|
* {@link StateMachine} will operate.
|
||||||
|
* @param initialState the state in which the returned
|
||||||
|
* {@link StateMachine} will start.
|
||||||
|
* @param listener An implementation of a {@link StateTransitionListener}.
|
||||||
|
* @return A (@link StateMachine}.
|
||||||
|
*/
|
||||||
|
public StateMachine<STATE, EVENTTYPE, EVENT>
|
||||||
|
make(OPERAND operand, STATE initialState,
|
||||||
|
StateTransitionListener<OPERAND, EVENT, STATE> listener) {
|
||||||
|
return new InternalStateMachine(operand, initialState, listener);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* @return a {@link StateMachine} that starts in
|
* @return a {@link StateMachine} that starts in
|
||||||
* {@code initialState} and whose {@link Transition} s are
|
* {@code initialState} and whose {@link Transition} s are
|
||||||
|
@ -424,14 +439,36 @@ final public class StateMachineFactory
|
||||||
return new InternalStateMachine(operand, defaultInitialState);
|
return new InternalStateMachine(operand, defaultInitialState);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class NoopStateTransitionListener
|
||||||
|
implements StateTransitionListener {
|
||||||
|
@Override
|
||||||
|
public void preTransition(Object op, Enum beforeState,
|
||||||
|
Object eventToBeProcessed) { }
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postTransition(Object op, Enum beforeState, Enum afterState,
|
||||||
|
Object processedEvent) { }
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final NoopStateTransitionListener NOOP_LISTENER =
|
||||||
|
new NoopStateTransitionListener();
|
||||||
|
|
||||||
private class InternalStateMachine
|
private class InternalStateMachine
|
||||||
implements StateMachine<STATE, EVENTTYPE, EVENT> {
|
implements StateMachine<STATE, EVENTTYPE, EVENT> {
|
||||||
private final OPERAND operand;
|
private final OPERAND operand;
|
||||||
private STATE currentState;
|
private STATE currentState;
|
||||||
|
private final StateTransitionListener<OPERAND, EVENT, STATE> listener;
|
||||||
|
|
||||||
InternalStateMachine(OPERAND operand, STATE initialState) {
|
InternalStateMachine(OPERAND operand, STATE initialState) {
|
||||||
|
this(operand, initialState, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
InternalStateMachine(OPERAND operand, STATE initialState,
|
||||||
|
StateTransitionListener<OPERAND, EVENT, STATE> transitionListener) {
|
||||||
this.operand = operand;
|
this.operand = operand;
|
||||||
this.currentState = initialState;
|
this.currentState = initialState;
|
||||||
|
this.listener =
|
||||||
|
(transitionListener == null) ? NOOP_LISTENER : transitionListener;
|
||||||
if (!optimized) {
|
if (!optimized) {
|
||||||
maybeMakeStateMachineTable();
|
maybeMakeStateMachineTable();
|
||||||
}
|
}
|
||||||
|
@ -445,8 +482,11 @@ final public class StateMachineFactory
|
||||||
@Override
|
@Override
|
||||||
public synchronized STATE doTransition(EVENTTYPE eventType, EVENT event)
|
public synchronized STATE doTransition(EVENTTYPE eventType, EVENT event)
|
||||||
throws InvalidStateTransitionException {
|
throws InvalidStateTransitionException {
|
||||||
|
listener.preTransition(operand, currentState, event);
|
||||||
|
STATE oldState = currentState;
|
||||||
currentState = StateMachineFactory.this.doTransition
|
currentState = StateMachineFactory.this.doTransition
|
||||||
(operand, currentState, eventType, event);
|
(operand, currentState, eventType, event);
|
||||||
|
listener.postTransition(operand, oldState, currentState, event);
|
||||||
return currentState;
|
return currentState;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,50 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.state;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A State Transition Listener.
|
||||||
|
* It exposes a pre and post transition hook called before and
|
||||||
|
* after the transition.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public interface StateTransitionListener
|
||||||
|
<OPERAND, EVENT, STATE extends Enum<STATE>> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pre Transition Hook. This will be called before transition.
|
||||||
|
* @param op Operand.
|
||||||
|
* @param beforeState State before transition.
|
||||||
|
* @param eventToBeProcessed Incoming Event.
|
||||||
|
*/
|
||||||
|
void preTransition(OPERAND op, STATE beforeState, EVENT eventToBeProcessed);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Post Transition Hook. This will be called after the transition.
|
||||||
|
* @param op Operand.
|
||||||
|
* @param beforeState State before transition.
|
||||||
|
* @param afterState State after transition.
|
||||||
|
* @param processedEvent Processed Event.
|
||||||
|
*/
|
||||||
|
void postTransition(OPERAND op, STATE beforeState, STATE afterState,
|
||||||
|
EVENT processedEvent);
|
||||||
|
}
|
|
@ -966,6 +966,12 @@
|
||||||
<!--<value>org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor</value>-->
|
<!--<value>org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor</value>-->
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>Comma separated List of container state transition listeners.</description>
|
||||||
|
<name>yarn.nodemanager.container-state-transition-listener.classes</name>
|
||||||
|
<value></value>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>Number of threads container manager uses.</description>
|
<description>Number of threads container manager uses.</description>
|
||||||
<name>yarn.nodemanager.container-manager.thread-count</name>
|
<name>yarn.nodemanager.container-manager.thread-count</name>
|
||||||
|
|
|
@ -0,0 +1,48 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||||
|
import org.apache.hadoop.yarn.state.StateTransitionListener;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Interface to be used by external cluster operators to implement a
|
||||||
|
* State Transition listener that is notified before and after a container
|
||||||
|
* state transition.
|
||||||
|
* NOTE: The pre and post transition callbacks will be made in the synchronized
|
||||||
|
* block as the call to the instrumented transition - Serially, in the
|
||||||
|
* order: preTransition, transition and postTransition. The implementor
|
||||||
|
* must ensure that the callbacks return in a timely manner to avoid
|
||||||
|
* blocking the state-machine.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public interface ContainerStateTransitionListener extends
|
||||||
|
StateTransitionListener<ContainerImpl, ContainerEvent, ContainerState> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Init method which will be invoked by the Node Manager to inject the
|
||||||
|
* NM {@link Context}.
|
||||||
|
* @param context NM Context.
|
||||||
|
*/
|
||||||
|
void init(Context context);
|
||||||
|
}
|
|
@ -100,4 +100,6 @@ public interface Context {
|
||||||
OpportunisticContainerAllocator getContainerAllocator();
|
OpportunisticContainerAllocator getContainerAllocator();
|
||||||
|
|
||||||
ContainerExecutor getContainerExecutor();
|
ContainerExecutor getContainerExecutor();
|
||||||
|
|
||||||
|
ContainerStateTransitionListener getContainerStateTransitionListener();
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,12 +20,18 @@ package org.apache.hadoop.yarn.server.nodemanager;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.ConcurrentSkipListMap;
|
import java.util.concurrent.ConcurrentSkipListMap;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||||
|
import org.apache.hadoop.yarn.state.MultiStateTransitionListener;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -124,6 +130,17 @@ public class NodeManager extends CompositeService
|
||||||
private boolean rmWorkPreservingRestartEnabled;
|
private boolean rmWorkPreservingRestartEnabled;
|
||||||
private boolean shouldExitOnShutdownEvent = false;
|
private boolean shouldExitOnShutdownEvent = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Default Container State transition listener.
|
||||||
|
*/
|
||||||
|
public static class DefaultContainerStateListener extends
|
||||||
|
MultiStateTransitionListener
|
||||||
|
<ContainerImpl, ContainerEvent, ContainerState>
|
||||||
|
implements ContainerStateTransitionListener {
|
||||||
|
@Override
|
||||||
|
public void init(Context context) {}
|
||||||
|
}
|
||||||
|
|
||||||
public NodeManager() {
|
public NodeManager() {
|
||||||
super(NodeManager.class.getName());
|
super(NodeManager.class.getName());
|
||||||
}
|
}
|
||||||
|
@ -209,8 +226,22 @@ public class NodeManager extends CompositeService
|
||||||
NMTokenSecretManagerInNM nmTokenSecretManager,
|
NMTokenSecretManagerInNM nmTokenSecretManager,
|
||||||
NMStateStoreService stateStore, boolean isDistSchedulerEnabled,
|
NMStateStoreService stateStore, boolean isDistSchedulerEnabled,
|
||||||
Configuration conf) {
|
Configuration conf) {
|
||||||
return new NMContext(containerTokenSecretManager, nmTokenSecretManager,
|
List<ContainerStateTransitionListener> listeners =
|
||||||
dirsHandler, aclsManager, stateStore, isDistSchedulerEnabled, conf);
|
conf.getInstances(
|
||||||
|
YarnConfiguration.NM_CONTAINER_STATE_TRANSITION_LISTENERS,
|
||||||
|
ContainerStateTransitionListener.class);
|
||||||
|
NMContext nmContext = new NMContext(containerTokenSecretManager,
|
||||||
|
nmTokenSecretManager, dirsHandler, aclsManager, stateStore,
|
||||||
|
isDistSchedulerEnabled, conf);
|
||||||
|
DefaultContainerStateListener defaultListener =
|
||||||
|
new DefaultContainerStateListener();
|
||||||
|
nmContext.setContainerStateTransitionListener(defaultListener);
|
||||||
|
defaultListener.init(nmContext);
|
||||||
|
for (ContainerStateTransitionListener listener : listeners) {
|
||||||
|
listener.init(nmContext);
|
||||||
|
defaultListener.addListener(listener);
|
||||||
|
}
|
||||||
|
return nmContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void doSecureLogin() throws IOException {
|
protected void doSecureLogin() throws IOException {
|
||||||
|
@ -506,6 +537,8 @@ public class NodeManager extends CompositeService
|
||||||
|
|
||||||
private ContainerExecutor executor;
|
private ContainerExecutor executor;
|
||||||
|
|
||||||
|
private ContainerStateTransitionListener containerStateTransitionListener;
|
||||||
|
|
||||||
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
|
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
|
||||||
NMTokenSecretManagerInNM nmTokenSecretManager,
|
NMTokenSecretManagerInNM nmTokenSecretManager,
|
||||||
LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
|
LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
|
||||||
|
@ -671,6 +704,17 @@ public class NodeManager extends CompositeService
|
||||||
public void setContainerExecutor(ContainerExecutor executor) {
|
public void setContainerExecutor(ContainerExecutor executor) {
|
||||||
this.executor = executor;
|
this.executor = executor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ContainerStateTransitionListener
|
||||||
|
getContainerStateTransitionListener() {
|
||||||
|
return this.containerStateTransitionListener;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setContainerStateTransitionListener(
|
||||||
|
ContainerStateTransitionListener transitionListener) {
|
||||||
|
this.containerStateTransitionListener = transitionListener;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -228,7 +228,8 @@ public class ContainerImpl implements Container {
|
||||||
this.containerRetryContext = configureRetryContext(
|
this.containerRetryContext = configureRetryContext(
|
||||||
conf, launchContext, this.containerId);
|
conf, launchContext, this.containerId);
|
||||||
this.remainingRetryAttempts = this.containerRetryContext.getMaxRetries();
|
this.remainingRetryAttempts = this.containerRetryContext.getMaxRetries();
|
||||||
stateMachine = stateMachineFactory.make(this);
|
stateMachine = stateMachineFactory.make(this, ContainerState.NEW,
|
||||||
|
context.getContainerStateTransitionListener());
|
||||||
this.resourceSet = new ResourceSet();
|
this.resourceSet = new ResourceSet();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,6 +25,9 @@ import java.io.IOException;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
|
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -57,6 +60,71 @@ public class TestNodeManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static int initCalls = 0;
|
||||||
|
private static int preCalls = 0;
|
||||||
|
private static int postCalls = 0;
|
||||||
|
|
||||||
|
private static class DummyCSTListener1
|
||||||
|
implements ContainerStateTransitionListener {
|
||||||
|
@Override
|
||||||
|
public void init(Context context) {
|
||||||
|
initCalls++;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void preTransition(ContainerImpl op, ContainerState beforeState,
|
||||||
|
ContainerEvent eventToBeProcessed) {
|
||||||
|
preCalls++;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postTransition(ContainerImpl op, ContainerState beforeState,
|
||||||
|
ContainerState afterState, ContainerEvent processedEvent) {
|
||||||
|
postCalls++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class DummyCSTListener2
|
||||||
|
implements ContainerStateTransitionListener {
|
||||||
|
@Override
|
||||||
|
public void init(Context context) {
|
||||||
|
initCalls++;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void preTransition(ContainerImpl op, ContainerState beforeState,
|
||||||
|
ContainerEvent eventToBeProcessed) {
|
||||||
|
preCalls++;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postTransition(ContainerImpl op, ContainerState beforeState,
|
||||||
|
ContainerState afterState, ContainerEvent processedEvent) {
|
||||||
|
postCalls++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testListenerInitialization() throws Exception{
|
||||||
|
NodeManager nodeManager = new NodeManager();
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set(YarnConfiguration.NM_CONTAINER_STATE_TRANSITION_LISTENERS,
|
||||||
|
DummyCSTListener1.class.getName() + ","
|
||||||
|
+ DummyCSTListener2.class.getName());
|
||||||
|
initCalls = 0;
|
||||||
|
preCalls = 0;
|
||||||
|
postCalls = 0;
|
||||||
|
NodeManager.NMContext nmContext =
|
||||||
|
nodeManager.createNMContext(null, null, null, false, conf);
|
||||||
|
Assert.assertEquals(2, initCalls);
|
||||||
|
nmContext.getContainerStateTransitionListener().preTransition(
|
||||||
|
null, null, null);
|
||||||
|
nmContext.getContainerStateTransitionListener().postTransition(
|
||||||
|
null, null, null, null);
|
||||||
|
Assert.assertEquals(2, preCalls);
|
||||||
|
Assert.assertEquals(2, postCalls);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCreationOfNodeLabelsProviderService()
|
public void testCreationOfNodeLabelsProviderService()
|
||||||
throws InterruptedException {
|
throws InterruptedException {
|
||||||
|
|
|
@ -33,6 +33,8 @@ import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -744,5 +746,11 @@ public abstract class BaseAMRMProxyTest {
|
||||||
public ContainerExecutor getContainerExecutor() {
|
public ContainerExecutor getContainerExecutor() {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ContainerStateTransitionListener
|
||||||
|
getContainerStateTransitionListener() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -71,7 +71,9 @@ import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||||
|
@ -287,6 +289,29 @@ public class TestContainer {
|
||||||
assertEquals(ContainerState.DONE, wc.c.getContainerState());
|
assertEquals(ContainerState.DONE, wc.c.getContainerState());
|
||||||
assertEquals(completed + 1, metrics.getCompletedContainers());
|
assertEquals(completed + 1, metrics.getCompletedContainers());
|
||||||
assertEquals(running, metrics.getRunningContainers());
|
assertEquals(running, metrics.getRunningContainers());
|
||||||
|
|
||||||
|
ContainerEventType e1 = wc.initStateToEvent.get(ContainerState.NEW);
|
||||||
|
ContainerState s2 = wc.eventToFinalState.get(e1);
|
||||||
|
ContainerEventType e2 = wc.initStateToEvent.get(s2);
|
||||||
|
ContainerState s3 = wc.eventToFinalState.get(e2);
|
||||||
|
ContainerEventType e3 = wc.initStateToEvent.get(s3);
|
||||||
|
ContainerState s4 = wc.eventToFinalState.get(e3);
|
||||||
|
ContainerEventType e4 = wc.initStateToEvent.get(s4);
|
||||||
|
ContainerState s5 = wc.eventToFinalState.get(e4);
|
||||||
|
ContainerEventType e5 = wc.initStateToEvent.get(s5);
|
||||||
|
ContainerState s6 = wc.eventToFinalState.get(e5);
|
||||||
|
|
||||||
|
Assert.assertEquals(ContainerState.LOCALIZING, s2);
|
||||||
|
Assert.assertEquals(ContainerState.SCHEDULED, s3);
|
||||||
|
Assert.assertEquals(ContainerState.RUNNING, s4);
|
||||||
|
Assert.assertEquals(ContainerState.EXITED_WITH_SUCCESS, s5);
|
||||||
|
Assert.assertEquals(ContainerState.DONE, s6);
|
||||||
|
|
||||||
|
Assert.assertEquals(ContainerEventType.INIT_CONTAINER, e1);
|
||||||
|
Assert.assertEquals(ContainerEventType.RESOURCE_LOCALIZED, e2);
|
||||||
|
Assert.assertEquals(ContainerEventType.CONTAINER_LAUNCHED, e3);
|
||||||
|
Assert.assertEquals(ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, e4);
|
||||||
|
Assert.assertEquals(ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP, e5);
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
if (wc != null) {
|
if (wc != null) {
|
||||||
|
@ -401,6 +426,10 @@ public class TestContainer {
|
||||||
Assert.assertTrue(
|
Assert.assertTrue(
|
||||||
containerMetrics.finishTime.value() > containerMetrics.startTime
|
containerMetrics.finishTime.value() > containerMetrics.startTime
|
||||||
.value());
|
.value());
|
||||||
|
Assert.assertEquals(ContainerEventType.KILL_CONTAINER,
|
||||||
|
wc.initStateToEvent.get(ContainerState.NEW));
|
||||||
|
Assert.assertEquals(ContainerState.DONE,
|
||||||
|
wc.eventToFinalState.get(ContainerEventType.KILL_CONTAINER));
|
||||||
} finally {
|
} finally {
|
||||||
if (wc != null) {
|
if (wc != null) {
|
||||||
wc.finished();
|
wc.finished();
|
||||||
|
@ -942,6 +971,10 @@ public class TestContainer {
|
||||||
final Map<String, LocalResource> localResources;
|
final Map<String, LocalResource> localResources;
|
||||||
final Map<String, ByteBuffer> serviceData;
|
final Map<String, ByteBuffer> serviceData;
|
||||||
final Context context = mock(Context.class);
|
final Context context = mock(Context.class);
|
||||||
|
private final Map<ContainerState, ContainerEventType> initStateToEvent =
|
||||||
|
new HashMap<>();
|
||||||
|
private final Map<ContainerEventType, ContainerState> eventToFinalState =
|
||||||
|
new HashMap<>();
|
||||||
|
|
||||||
WrappedContainer(int appId, long timestamp, int id, String user)
|
WrappedContainer(int appId, long timestamp, int id, String user)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -1048,7 +1081,27 @@ public class TestContainer {
|
||||||
}
|
}
|
||||||
when(ctxt.getServiceData()).thenReturn(serviceData);
|
when(ctxt.getServiceData()).thenReturn(serviceData);
|
||||||
when(ctxt.getContainerRetryContext()).thenReturn(containerRetryContext);
|
when(ctxt.getContainerRetryContext()).thenReturn(containerRetryContext);
|
||||||
|
ContainerStateTransitionListener listener =
|
||||||
|
new ContainerStateTransitionListener() {
|
||||||
|
@Override
|
||||||
|
public void init(Context cntxt) {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void preTransition(ContainerImpl op, ContainerState beforeState,
|
||||||
|
ContainerEvent eventToBeProcessed) {
|
||||||
|
initStateToEvent.put(beforeState, eventToBeProcessed.getType());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postTransition(ContainerImpl op, ContainerState beforeState,
|
||||||
|
ContainerState afterState, ContainerEvent processedEvent) {
|
||||||
|
eventToFinalState.put(processedEvent.getType(), afterState);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
NodeManager.DefaultContainerStateListener multi =
|
||||||
|
new NodeManager.DefaultContainerStateListener();
|
||||||
|
multi.addListener(listener);
|
||||||
|
when(context.getContainerStateTransitionListener()).thenReturn(multi);
|
||||||
c = new ContainerImpl(conf, dispatcher, ctxt, null, metrics, identifier,
|
c = new ContainerImpl(conf, dispatcher, ctxt, null, metrics, identifier,
|
||||||
context);
|
context);
|
||||||
dispatcher.register(ContainerEventType.class,
|
dispatcher.register(ContainerEventType.class,
|
||||||
|
|
Loading…
Reference in New Issue