|
|
|
@ -27,10 +27,15 @@ import static org.junit.Assert.fail;
|
|
|
|
|
import java.io.IOException;
|
|
|
|
|
import java.nio.ByteBuffer;
|
|
|
|
|
import java.util.Arrays;
|
|
|
|
|
import java.util.Collections;
|
|
|
|
|
import java.util.HashMap;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.Map;
|
|
|
|
|
import java.util.Set;
|
|
|
|
|
import java.util.TreeSet;
|
|
|
|
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
|
|
import org.apache.hadoop.io.DataOutputBuffer;
|
|
|
|
|
import org.apache.hadoop.security.Credentials;
|
|
|
|
@ -67,6 +72,10 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
|
|
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
|
|
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
|
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener;
|
|
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
|
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
|
|
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
|
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
|
|
|
|
import org.apache.hadoop.yarn.util.Records;
|
|
|
|
@ -86,10 +95,77 @@ public class TestNMClient {
|
|
|
|
|
int nodeCount = 3;
|
|
|
|
|
NMTokenCache nmTokenCache = null;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Container State transition listener to track the number of times
|
|
|
|
|
* a container has transitioned into a state.
|
|
|
|
|
*/
|
|
|
|
|
public static class DebugSumContainerStateListener
|
|
|
|
|
implements ContainerStateTransitionListener {
|
|
|
|
|
|
|
|
|
|
private static final Log LOG =
|
|
|
|
|
LogFactory.getLog(DebugSumContainerStateListener.class);
|
|
|
|
|
private static final Map<ContainerId,
|
|
|
|
|
Map<org.apache.hadoop.yarn.server.nodemanager.containermanager
|
|
|
|
|
.container.ContainerState, Long>>
|
|
|
|
|
TRANSITION_COUNTER = new HashMap<>();
|
|
|
|
|
|
|
|
|
|
public void init(Context context) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void preTransition(ContainerImpl op,
|
|
|
|
|
org.apache.hadoop.yarn.server.nodemanager
|
|
|
|
|
.containermanager.container.ContainerState
|
|
|
|
|
beforeState,
|
|
|
|
|
ContainerEvent eventToBeProcessed) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void postTransition(
|
|
|
|
|
ContainerImpl op,
|
|
|
|
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container
|
|
|
|
|
.ContainerState beforeState,
|
|
|
|
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container
|
|
|
|
|
.ContainerState afterState,
|
|
|
|
|
ContainerEvent processedEvent) {
|
|
|
|
|
synchronized (TRANSITION_COUNTER) {
|
|
|
|
|
if (beforeState != afterState) {
|
|
|
|
|
ContainerId id = op.getContainerId();
|
|
|
|
|
TRANSITION_COUNTER
|
|
|
|
|
.putIfAbsent(id, new HashMap<>());
|
|
|
|
|
long sum = TRANSITION_COUNTER.get(id)
|
|
|
|
|
.compute(afterState,
|
|
|
|
|
(state, count) -> count == null ? 1 : count + 1);
|
|
|
|
|
LOG.info("***** " + id +
|
|
|
|
|
" Transition from " + beforeState +
|
|
|
|
|
" to " + afterState +
|
|
|
|
|
"sum:" + sum);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Get the current number of state transitions.
|
|
|
|
|
* This is useful to check, if an event has occurred in unit tests.
|
|
|
|
|
* @param id Container id to check
|
|
|
|
|
* @param state Return the overall number of transitions to this state
|
|
|
|
|
* @return Number of transitions to the state specified
|
|
|
|
|
*/
|
|
|
|
|
static long getTransitionCounter(ContainerId id,
|
|
|
|
|
org.apache.hadoop.yarn.server.nodemanager
|
|
|
|
|
.containermanager.container
|
|
|
|
|
.ContainerState state) {
|
|
|
|
|
Long ret = TRANSITION_COUNTER.getOrDefault(id, new HashMap<>())
|
|
|
|
|
.get(state);
|
|
|
|
|
return ret != null ? ret : 0;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Before
|
|
|
|
|
public void setup() throws YarnException, IOException {
|
|
|
|
|
// start minicluster
|
|
|
|
|
conf = new YarnConfiguration();
|
|
|
|
|
// Turn on state tracking
|
|
|
|
|
conf.set(YarnConfiguration.NM_CONTAINER_STATE_TRANSITION_LISTENERS,
|
|
|
|
|
DebugSumContainerStateListener.class.getName());
|
|
|
|
|
yarnCluster =
|
|
|
|
|
new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
|
|
|
|
|
yarnCluster.init(conf);
|
|
|
|
@ -290,7 +366,7 @@ public class TestNMClient {
|
|
|
|
|
return containers;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void testContainerManagement(NMClientImpl nmClient,
|
|
|
|
|
private void testContainerManagement(NMClientImpl client,
|
|
|
|
|
Set<Container> containers) throws YarnException, IOException {
|
|
|
|
|
int size = containers.size();
|
|
|
|
|
int i = 0;
|
|
|
|
@ -298,7 +374,7 @@ public class TestNMClient {
|
|
|
|
|
// getContainerStatus shouldn't be called before startContainer,
|
|
|
|
|
// otherwise, NodeManager cannot find the container
|
|
|
|
|
try {
|
|
|
|
|
nmClient.getContainerStatus(container.getId(), container.getNodeId());
|
|
|
|
|
client.getContainerStatus(container.getId(), container.getNodeId());
|
|
|
|
|
fail("Exception is expected");
|
|
|
|
|
} catch (YarnException e) {
|
|
|
|
|
assertTrue("The thrown exception is not expected",
|
|
|
|
@ -307,7 +383,7 @@ public class TestNMClient {
|
|
|
|
|
// upadateContainerResource shouldn't be called before startContainer,
|
|
|
|
|
// otherwise, NodeManager cannot find the container
|
|
|
|
|
try {
|
|
|
|
|
nmClient.updateContainerResource(container);
|
|
|
|
|
client.updateContainerResource(container);
|
|
|
|
|
fail("Exception is expected");
|
|
|
|
|
} catch (YarnException e) {
|
|
|
|
|
assertTrue("The thrown exception is not expected",
|
|
|
|
@ -317,7 +393,7 @@ public class TestNMClient {
|
|
|
|
|
// restart shouldn't be called before startContainer,
|
|
|
|
|
// otherwise, NodeManager cannot find the container
|
|
|
|
|
try {
|
|
|
|
|
nmClient.restartContainer(container.getId());
|
|
|
|
|
client.restartContainer(container.getId());
|
|
|
|
|
fail("Exception is expected");
|
|
|
|
|
} catch (YarnException e) {
|
|
|
|
|
assertTrue("The thrown exception is not expected",
|
|
|
|
@ -327,7 +403,7 @@ public class TestNMClient {
|
|
|
|
|
// rollback shouldn't be called before startContainer,
|
|
|
|
|
// otherwise, NodeManager cannot find the container
|
|
|
|
|
try {
|
|
|
|
|
nmClient.rollbackLastReInitialization(container.getId());
|
|
|
|
|
client.rollbackLastReInitialization(container.getId());
|
|
|
|
|
fail("Exception is expected");
|
|
|
|
|
} catch (YarnException e) {
|
|
|
|
|
assertTrue("The thrown exception is not expected",
|
|
|
|
@ -337,7 +413,7 @@ public class TestNMClient {
|
|
|
|
|
// commit shouldn't be called before startContainer,
|
|
|
|
|
// otherwise, NodeManager cannot find the container
|
|
|
|
|
try {
|
|
|
|
|
nmClient.commitLastReInitialization(container.getId());
|
|
|
|
|
client.commitLastReInitialization(container.getId());
|
|
|
|
|
fail("Exception is expected");
|
|
|
|
|
} catch (YarnException e) {
|
|
|
|
|
assertTrue("The thrown exception is not expected",
|
|
|
|
@ -347,14 +423,12 @@ public class TestNMClient {
|
|
|
|
|
// stopContainer shouldn't be called before startContainer,
|
|
|
|
|
// otherwise, an exception will be thrown
|
|
|
|
|
try {
|
|
|
|
|
nmClient.stopContainer(container.getId(), container.getNodeId());
|
|
|
|
|
client.stopContainer(container.getId(), container.getNodeId());
|
|
|
|
|
fail("Exception is expected");
|
|
|
|
|
} catch (YarnException e) {
|
|
|
|
|
if (!e.getMessage()
|
|
|
|
|
.contains("is not handled by this NodeManager")) {
|
|
|
|
|
throw (AssertionError)
|
|
|
|
|
(new AssertionError("Exception is not expected: " + e).initCause(
|
|
|
|
|
e));
|
|
|
|
|
throw new AssertionError("Exception is not expected: ", e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -367,56 +441,84 @@ public class TestNMClient {
|
|
|
|
|
Records.newRecord(ContainerLaunchContext.class);
|
|
|
|
|
if (Shell.WINDOWS) {
|
|
|
|
|
clc.setCommands(
|
|
|
|
|
Arrays.asList("ping", "-n", "100", "127.0.0.1", ">nul"));
|
|
|
|
|
Arrays.asList("ping", "-n", "10000000", "127.0.0.1", ">nul"));
|
|
|
|
|
} else {
|
|
|
|
|
clc.setCommands(Arrays.asList("sleep", "10"));
|
|
|
|
|
clc.setCommands(Arrays.asList("sleep", "1000000"));
|
|
|
|
|
}
|
|
|
|
|
clc.setTokens(securityTokens);
|
|
|
|
|
try {
|
|
|
|
|
nmClient.startContainer(container, clc);
|
|
|
|
|
client.startContainer(container, clc);
|
|
|
|
|
} catch (YarnException e) {
|
|
|
|
|
throw (AssertionError)
|
|
|
|
|
(new AssertionError("Exception is not expected: " + e).initCause(e));
|
|
|
|
|
throw new AssertionError("Exception is not expected ", e);
|
|
|
|
|
}
|
|
|
|
|
List<Integer> exitStatuses = Collections.singletonList(-1000);
|
|
|
|
|
|
|
|
|
|
// leave one container unclosed
|
|
|
|
|
if (++i < size) {
|
|
|
|
|
testContainer(client, i, container, clc, exitStatuses);
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void testContainer(NMClientImpl client, int i, Container container,
|
|
|
|
|
ContainerLaunchContext clc, List<Integer> exitCode)
|
|
|
|
|
throws YarnException, IOException {
|
|
|
|
|
// NodeManager may still need some time to make the container started
|
|
|
|
|
testGetContainerStatus(container, i, ContainerState.RUNNING, "",
|
|
|
|
|
Arrays.asList(new Integer[] {-1000}));
|
|
|
|
|
exitCode);
|
|
|
|
|
waitForContainerTransitionCount(container,
|
|
|
|
|
org.apache.hadoop.yarn.server.nodemanager.
|
|
|
|
|
containermanager.container.ContainerState.RUNNING, 1);
|
|
|
|
|
// Test increase container API and make sure requests can reach NM
|
|
|
|
|
testIncreaseContainerResource(container);
|
|
|
|
|
|
|
|
|
|
testRestartContainer(container.getId());
|
|
|
|
|
testGetContainerStatus(container, i, ContainerState.RUNNING,
|
|
|
|
|
"will be Restarted", Arrays.asList(new Integer[] {-1000}));
|
|
|
|
|
"will be Restarted", exitCode);
|
|
|
|
|
waitForContainerTransitionCount(container,
|
|
|
|
|
org.apache.hadoop.yarn.server.nodemanager.
|
|
|
|
|
containermanager.container.ContainerState.RUNNING, 2);
|
|
|
|
|
|
|
|
|
|
if (i % 2 == 0) {
|
|
|
|
|
testReInitializeContainer(container.getId(), clc, false);
|
|
|
|
|
testGetContainerStatus(container, i, ContainerState.RUNNING,
|
|
|
|
|
"will be Re-initialized", Arrays.asList(new Integer[] {-1000}));
|
|
|
|
|
"will be Re-initialized", exitCode);
|
|
|
|
|
waitForContainerTransitionCount(container,
|
|
|
|
|
org.apache.hadoop.yarn.server.nodemanager.
|
|
|
|
|
containermanager.container.ContainerState.RUNNING, 3);
|
|
|
|
|
|
|
|
|
|
testRollbackContainer(container.getId(), false);
|
|
|
|
|
testGetContainerStatus(container, i, ContainerState.RUNNING,
|
|
|
|
|
"will be Rolled-back", Arrays.asList(new Integer[] {-1000}));
|
|
|
|
|
"will be Rolled-back", exitCode);
|
|
|
|
|
waitForContainerTransitionCount(container,
|
|
|
|
|
org.apache.hadoop.yarn.server.nodemanager.
|
|
|
|
|
containermanager.container.ContainerState.RUNNING, 4);
|
|
|
|
|
|
|
|
|
|
testCommitContainer(container.getId(), true);
|
|
|
|
|
testReInitializeContainer(container.getId(), clc, false);
|
|
|
|
|
testGetContainerStatus(container, i, ContainerState.RUNNING,
|
|
|
|
|
"will be Re-initialized", Arrays.asList(new Integer[] {-1000}));
|
|
|
|
|
"will be Re-initialized", exitCode);
|
|
|
|
|
waitForContainerTransitionCount(container,
|
|
|
|
|
org.apache.hadoop.yarn.server.nodemanager.
|
|
|
|
|
containermanager.container.ContainerState.RUNNING, 5);
|
|
|
|
|
testCommitContainer(container.getId(), false);
|
|
|
|
|
} else {
|
|
|
|
|
testReInitializeContainer(container.getId(), clc, true);
|
|
|
|
|
testGetContainerStatus(container, i, ContainerState.RUNNING,
|
|
|
|
|
"will be Re-initialized", Arrays.asList(new Integer[] {-1000}));
|
|
|
|
|
"will be Re-initialized", exitCode);
|
|
|
|
|
waitForContainerTransitionCount(container,
|
|
|
|
|
org.apache.hadoop.yarn.server.nodemanager.
|
|
|
|
|
containermanager.container.ContainerState.RUNNING, 3);
|
|
|
|
|
testRollbackContainer(container.getId(), true);
|
|
|
|
|
testCommitContainer(container.getId(), true);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
nmClient.stopContainer(container.getId(), container.getNodeId());
|
|
|
|
|
client.stopContainer(container.getId(), container.getNodeId());
|
|
|
|
|
} catch (YarnException e) {
|
|
|
|
|
throw (AssertionError)
|
|
|
|
|
(new AssertionError("Exception is not expected: " + e)
|
|
|
|
|
.initCause(e));
|
|
|
|
|
(new AssertionError("Exception is not expected: " + e, e));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// getContainerStatus can be called after stopContainer
|
|
|
|
@ -424,22 +526,47 @@ public class TestNMClient {
|
|
|
|
|
// O is possible if CLEANUP_CONTAINER is executed too late
|
|
|
|
|
// -105 is possible if the container is not terminated but killed
|
|
|
|
|
testGetContainerStatus(container, i, ContainerState.COMPLETE,
|
|
|
|
|
"Container killed by the ApplicationMaster.", Arrays.asList(
|
|
|
|
|
new Integer[] {ContainerExitStatus.KILLED_BY_APPMASTER,
|
|
|
|
|
ContainerExitStatus.SUCCESS}));
|
|
|
|
|
"Container killed by the ApplicationMaster.",
|
|
|
|
|
Arrays.asList(
|
|
|
|
|
ContainerExitStatus.KILLED_BY_APPMASTER,
|
|
|
|
|
ContainerExitStatus.SUCCESS));
|
|
|
|
|
} catch (YarnException e) {
|
|
|
|
|
// The exception is possible because, after the container is stopped,
|
|
|
|
|
// it may be removed from NM's context.
|
|
|
|
|
if (!e.getMessage()
|
|
|
|
|
.contains("was recently stopped on node manager")) {
|
|
|
|
|
throw (AssertionError)
|
|
|
|
|
(new AssertionError("Exception is not expected: " + e).initCause(
|
|
|
|
|
e));
|
|
|
|
|
(new AssertionError("Exception is not expected: ", e));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Wait until the container reaches a state N times.
|
|
|
|
|
* @param container container to watch
|
|
|
|
|
* @param state state to test
|
|
|
|
|
* @param transitions the number N above
|
|
|
|
|
* @throws YarnException This happens if the test times out while waiting
|
|
|
|
|
*/
|
|
|
|
|
private void waitForContainerTransitionCount(
|
|
|
|
|
Container container,
|
|
|
|
|
org.apache.hadoop.yarn.server.nodemanager.
|
|
|
|
|
containermanager.container.ContainerState state, long transitions)
|
|
|
|
|
throws YarnException {
|
|
|
|
|
long transitionCount = -1;
|
|
|
|
|
do {
|
|
|
|
|
if (transitionCount != -1) {
|
|
|
|
|
try {
|
|
|
|
|
Thread.sleep(10);
|
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
throw new YarnException(
|
|
|
|
|
"Timeout at transition count:" + transitionCount, e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
transitionCount = DebugSumContainerStateListener
|
|
|
|
|
.getTransitionCounter(container.getId(), state);
|
|
|
|
|
} while (transitionCount != transitions);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void sleep(int sleepTime) {
|
|
|
|
|
try {
|
|
|
|
|