YARN-906. Fixed a bug in NodeManager where cancelling ContainerLaunch at KILLING state causes the container to hang. Contributed by Zhijie Shen.
svn merge --ignore-ancestry -c 1509924 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1509925 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4cd5cb862c
commit
9b05d132bf
|
@ -42,6 +42,9 @@ Release 2.1.1-beta - UNRELEASED
|
|||
YARN-903. Changed ContainerManager to suppress unnecessary warnings when
|
||||
stopping already stopped containers. (Omkar Vinit Joshi via vinodkv)
|
||||
|
||||
YARN-906. Fixed a bug in NodeManager where cancelling ContainerLaunch at
|
||||
KILLING state causes the container to hang. (Zhijie Shen via vinodkv)
|
||||
|
||||
Release 2.1.0-beta - 2013-08-06
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
@ -319,7 +320,7 @@ public class TestNMClient {
|
|||
if (++i < size) {
|
||||
// NodeManager may still need some time to make the container started
|
||||
testGetContainerStatus(container, i, ContainerState.RUNNING, "",
|
||||
-1000);
|
||||
Arrays.asList(new Integer[] {-1000}));
|
||||
|
||||
try {
|
||||
nmClient.stopContainer(container.getId(), container.getNodeId());
|
||||
|
@ -330,8 +331,21 @@ public class TestNMClient {
|
|||
}
|
||||
|
||||
// getContainerStatus can be called after stopContainer
|
||||
testGetContainerStatus(container, i, ContainerState.COMPLETE,
|
||||
"Container killed by the ApplicationMaster.", 143);
|
||||
try {
|
||||
// 0 is possible if CLEANUP_CONTAINER is executed too late
|
||||
testGetContainerStatus(container, i, ContainerState.COMPLETE,
|
||||
"Container killed by the ApplicationMaster.", Arrays.asList(
|
||||
new Integer[] {143, 0}));
|
||||
} catch (YarnException e) {
|
||||
// The exception is possible because, after the container is stopped,
|
||||
// it may be removed from NM's context.
|
||||
if (!e.getMessage()
|
||||
.contains("was recently stopped on node manager")) {
|
||||
throw (AssertionError)
|
||||
(new AssertionError("Exception is not expected: " + e).initCause(
|
||||
e));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -345,7 +359,7 @@ public class TestNMClient {
|
|||
}
|
||||
|
||||
private void testGetContainerStatus(Container container, int index,
|
||||
ContainerState state, String diagnostics, int exitStatus)
|
||||
ContainerState state, String diagnostics, List<Integer> exitStatuses)
|
||||
throws YarnException, IOException {
|
||||
while (true) {
|
||||
try {
|
||||
|
@ -357,7 +371,7 @@ public class TestNMClient {
|
|||
assertEquals(container.getId(), status.getContainerId());
|
||||
assertTrue("" + index + ": " + status.getDiagnostics(),
|
||||
status.getDiagnostics().contains(diagnostics));
|
||||
assertEquals(exitStatus, status.getExitStatus());
|
||||
assertTrue(exitStatuses.contains(status.getExitStatus()));
|
||||
break;
|
||||
}
|
||||
Thread.sleep(100);
|
||||
|
|
|
@ -37,12 +37,17 @@ import org.apache.hadoop.yarn.event.Dispatcher;
|
|||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
/**
|
||||
|
@ -61,7 +66,8 @@ public class ContainersLauncher extends AbstractService
|
|||
private final Dispatcher dispatcher;
|
||||
|
||||
private LocalDirsHandlerService dirsHandler;
|
||||
private final ExecutorService containerLauncher =
|
||||
@VisibleForTesting
|
||||
public ExecutorService containerLauncher =
|
||||
Executors.newCachedThreadPool(
|
||||
new ThreadFactoryBuilder()
|
||||
.setNameFormat("ContainersLauncher #%d")
|
||||
|
@ -107,6 +113,7 @@ public class ContainersLauncher extends AbstractService
|
|||
super.serviceStop();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void handle(ContainersLauncherEvent event) {
|
||||
// TODO: ContainersLauncher launches containers one by one!!
|
||||
|
@ -134,9 +141,18 @@ public class ContainersLauncher extends AbstractService
|
|||
Future<Integer> rContainer = rContainerDatum.runningcontainer;
|
||||
if (rContainer != null
|
||||
&& !rContainer.isDone()) {
|
||||
// Cancel the future so that it won't be launched
|
||||
// if it isn't already.
|
||||
rContainer.cancel(false);
|
||||
// Cancel the future so that it won't be launched if it isn't already.
|
||||
// If it is going to be canceled, make sure CONTAINER_KILLED_ON_REQUEST
|
||||
// will not be missed if the container is already at KILLING
|
||||
if (rContainer.cancel(false)) {
|
||||
if (container.getContainerState() == ContainerState.KILLING) {
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ContainerExitEvent(containerId,
|
||||
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
|
||||
ExitCode.TERMINATED.getExitCode(),
|
||||
"Container terminated before launch."));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Cleanup a container whether it is running/killed/completed, so that
|
||||
|
|
|
@ -18,9 +18,10 @@
|
|||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.argThat;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.reset;
|
||||
|
@ -42,11 +43,16 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
|
@ -60,10 +66,13 @@ import org.apache.hadoop.yarn.event.DrainDispatcher;
|
|||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
|
||||
|
@ -287,7 +296,8 @@ public class TestContainer {
|
|||
wc.launchContainer();
|
||||
reset(wc.localizerBus);
|
||||
wc.killContainer();
|
||||
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
|
||||
assertEquals(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
|
||||
wc.c.getContainerState());
|
||||
assertNull(wc.c.getLocalizedResources());
|
||||
wc.containerKilledOnRequest();
|
||||
|
||||
|
@ -319,6 +329,26 @@ public class TestContainer {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKillOnLocalized() throws Exception {
|
||||
WrappedContainer wc = null;
|
||||
try {
|
||||
wc = new WrappedContainer(17, 314159265358979L, 4344, "yak");
|
||||
wc.initContainer();
|
||||
wc.localizeResources();
|
||||
assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState());
|
||||
wc.killContainer();
|
||||
assertEquals(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
|
||||
wc.c.getContainerState());
|
||||
assertNull(wc.c.getLocalizedResources());
|
||||
verifyCleanupCall(wc);
|
||||
} finally {
|
||||
if (wc != null) {
|
||||
wc.finished();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResourceLocalizedOnLocalizationFailed() throws Exception {
|
||||
WrappedContainer wc = null;
|
||||
|
@ -442,10 +472,12 @@ public class TestContainer {
|
|||
wc.initContainer();
|
||||
wc.localizeResources();
|
||||
wc.killContainer();
|
||||
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
|
||||
assertEquals(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
|
||||
wc.c.getContainerState());
|
||||
assertNull(wc.c.getLocalizedResources());
|
||||
wc.launchContainer();
|
||||
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
|
||||
assertEquals(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
|
||||
wc.c.getContainerState());
|
||||
assertNull(wc.c.getLocalizedResources());
|
||||
wc.containerKilledOnRequest();
|
||||
verifyCleanupCall(wc);
|
||||
|
@ -583,6 +615,7 @@ public class TestContainer {
|
|||
final EventHandler<AuxServicesEvent> auxBus;
|
||||
final EventHandler<ApplicationEvent> appBus;
|
||||
final EventHandler<LogHandlerEvent> LogBus;
|
||||
final ContainersLauncher launcher;
|
||||
|
||||
final ContainerLaunchContext ctxt;
|
||||
final ContainerId cId;
|
||||
|
@ -595,6 +628,7 @@ public class TestContainer {
|
|||
this(appId, timestamp, id, user, true, false);
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
WrappedContainer(int appId, long timestamp, int id, String user,
|
||||
boolean withLocalRes, boolean withServiceData) throws IOException {
|
||||
dispatcher = new DrainDispatcher();
|
||||
|
@ -613,6 +647,22 @@ public class TestContainer {
|
|||
dispatcher.register(ApplicationEventType.class, appBus);
|
||||
dispatcher.register(LogHandlerEventType.class, LogBus);
|
||||
|
||||
Context context = mock(Context.class);
|
||||
when(context.getApplications()).thenReturn(
|
||||
new ConcurrentHashMap<ApplicationId, Application>());
|
||||
launcher = new ContainersLauncher(context, dispatcher, null, null);
|
||||
// create a mock ExecutorService, which will not really launch
|
||||
// ContainerLaunch at all.
|
||||
launcher.containerLauncher = mock(ExecutorService.class);
|
||||
Future future = mock(Future.class);
|
||||
when(launcher.containerLauncher.submit
|
||||
(any(Callable.class))).thenReturn(future);
|
||||
when(future.isDone()).thenReturn(false);
|
||||
when(future.cancel(false)).thenReturn(true);
|
||||
launcher.init(new Configuration());
|
||||
launcher.start();
|
||||
dispatcher.register(ContainersLauncherEventType.class, launcher);
|
||||
|
||||
ctxt = mock(ContainerLaunchContext.class);
|
||||
org.apache.hadoop.yarn.api.records.Container mockContainer =
|
||||
mock(org.apache.hadoop.yarn.api.records.Container.class);
|
||||
|
@ -654,6 +704,13 @@ public class TestContainer {
|
|||
when(ctxt.getServiceData()).thenReturn(serviceData);
|
||||
|
||||
c = new ContainerImpl(conf, dispatcher, ctxt, null, metrics, identifier);
|
||||
dispatcher.register(ContainerEventType.class,
|
||||
new EventHandler<ContainerEvent>() {
|
||||
@Override
|
||||
public void handle(ContainerEvent event) {
|
||||
c.handle(event);
|
||||
}
|
||||
});
|
||||
dispatcher.start();
|
||||
}
|
||||
|
||||
|
|
|
@ -596,8 +596,9 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
|
|||
ContainerStatus containerStatus =
|
||||
containerManager.getContainerStatuses(gcsRequest)
|
||||
.getContainerStatuses().get(0);
|
||||
Assert.assertEquals(ExitCode.FORCE_KILLED.getExitCode(),
|
||||
containerStatus.getExitStatus());
|
||||
int expectedExitCode = Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() :
|
||||
ExitCode.TERMINATED.getExitCode();
|
||||
Assert.assertEquals(expectedExitCode, containerStatus.getExitStatus());
|
||||
|
||||
// Now verify the contents of the file. Script generates a message when it
|
||||
// receives a sigterm so we look for that. We cannot perform this check on
|
||||
|
|
Loading…
Reference in New Issue