YARN-906. Fixed a bug in NodeManager where cancelling ContainerLaunch at KILLING state causes the container to hang. Contributed by Zhijie Shen.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1509924 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e1d398b2c9
commit
ac933234ac
|
@ -57,6 +57,9 @@ Release 2.1.1-beta - UNRELEASED
|
||||||
YARN-903. Changed ContainerManager to suppress unnecessary warnings when
|
YARN-903. Changed ContainerManager to suppress unnecessary warnings when
|
||||||
stopping already stopped containers. (Omkar Vinit Joshi via vinodkv)
|
stopping already stopped containers. (Omkar Vinit Joshi via vinodkv)
|
||||||
|
|
||||||
|
YARN-906. Fixed a bug in NodeManager where cancelling ContainerLaunch at
|
||||||
|
KILLING state causes the container to hang. (Zhijie Shen via vinodkv)
|
||||||
|
|
||||||
Release 2.1.0-beta - 2013-08-06
|
Release 2.1.0-beta - 2013-08-06
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
|
@ -319,7 +320,7 @@ public class TestNMClient {
|
||||||
if (++i < size) {
|
if (++i < size) {
|
||||||
// NodeManager may still need some time to make the container started
|
// NodeManager may still need some time to make the container started
|
||||||
testGetContainerStatus(container, i, ContainerState.RUNNING, "",
|
testGetContainerStatus(container, i, ContainerState.RUNNING, "",
|
||||||
-1000);
|
Arrays.asList(new Integer[] {-1000}));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
nmClient.stopContainer(container.getId(), container.getNodeId());
|
nmClient.stopContainer(container.getId(), container.getNodeId());
|
||||||
|
@ -330,8 +331,21 @@ public class TestNMClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
// getContainerStatus can be called after stopContainer
|
// getContainerStatus can be called after stopContainer
|
||||||
testGetContainerStatus(container, i, ContainerState.COMPLETE,
|
try {
|
||||||
"Container killed by the ApplicationMaster.", 143);
|
// 0 is possible if CLEANUP_CONTAINER is executed too late
|
||||||
|
testGetContainerStatus(container, i, ContainerState.COMPLETE,
|
||||||
|
"Container killed by the ApplicationMaster.", Arrays.asList(
|
||||||
|
new Integer[] {143, 0}));
|
||||||
|
} catch (YarnException e) {
|
||||||
|
// The exception is possible because, after the container is stopped,
|
||||||
|
// it may be removed from NM's context.
|
||||||
|
if (!e.getMessage()
|
||||||
|
.contains("was recently stopped on node manager")) {
|
||||||
|
throw (AssertionError)
|
||||||
|
(new AssertionError("Exception is not expected: " + e).initCause(
|
||||||
|
e));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -345,7 +359,7 @@ public class TestNMClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testGetContainerStatus(Container container, int index,
|
private void testGetContainerStatus(Container container, int index,
|
||||||
ContainerState state, String diagnostics, int exitStatus)
|
ContainerState state, String diagnostics, List<Integer> exitStatuses)
|
||||||
throws YarnException, IOException {
|
throws YarnException, IOException {
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
|
@ -357,7 +371,7 @@ public class TestNMClient {
|
||||||
assertEquals(container.getId(), status.getContainerId());
|
assertEquals(container.getId(), status.getContainerId());
|
||||||
assertTrue("" + index + ": " + status.getDiagnostics(),
|
assertTrue("" + index + ": " + status.getDiagnostics(),
|
||||||
status.getDiagnostics().contains(diagnostics));
|
status.getDiagnostics().contains(diagnostics));
|
||||||
assertEquals(exitStatus, status.getExitStatus());
|
assertTrue(exitStatuses.contains(status.getExitStatus()));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
|
|
|
@ -37,12 +37,17 @@ import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -61,7 +66,8 @@ public class ContainersLauncher extends AbstractService
|
||||||
private final Dispatcher dispatcher;
|
private final Dispatcher dispatcher;
|
||||||
|
|
||||||
private LocalDirsHandlerService dirsHandler;
|
private LocalDirsHandlerService dirsHandler;
|
||||||
private final ExecutorService containerLauncher =
|
@VisibleForTesting
|
||||||
|
public ExecutorService containerLauncher =
|
||||||
Executors.newCachedThreadPool(
|
Executors.newCachedThreadPool(
|
||||||
new ThreadFactoryBuilder()
|
new ThreadFactoryBuilder()
|
||||||
.setNameFormat("ContainersLauncher #%d")
|
.setNameFormat("ContainersLauncher #%d")
|
||||||
|
@ -107,6 +113,7 @@ public class ContainersLauncher extends AbstractService
|
||||||
super.serviceStop();
|
super.serviceStop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public void handle(ContainersLauncherEvent event) {
|
public void handle(ContainersLauncherEvent event) {
|
||||||
// TODO: ContainersLauncher launches containers one by one!!
|
// TODO: ContainersLauncher launches containers one by one!!
|
||||||
|
@ -134,9 +141,18 @@ public class ContainersLauncher extends AbstractService
|
||||||
Future<Integer> rContainer = rContainerDatum.runningcontainer;
|
Future<Integer> rContainer = rContainerDatum.runningcontainer;
|
||||||
if (rContainer != null
|
if (rContainer != null
|
||||||
&& !rContainer.isDone()) {
|
&& !rContainer.isDone()) {
|
||||||
// Cancel the future so that it won't be launched
|
// Cancel the future so that it won't be launched if it isn't already.
|
||||||
// if it isn't already.
|
// If it is going to be canceled, make sure CONTAINER_KILLED_ON_REQUEST
|
||||||
rContainer.cancel(false);
|
// will not be missed if the container is already at KILLING
|
||||||
|
if (rContainer.cancel(false)) {
|
||||||
|
if (container.getContainerState() == ContainerState.KILLING) {
|
||||||
|
dispatcher.getEventHandler().handle(
|
||||||
|
new ContainerExitEvent(containerId,
|
||||||
|
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
|
||||||
|
ExitCode.TERMINATED.getExitCode(),
|
||||||
|
"Container terminated before launch."));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cleanup a container whether it is running/killed/completed, so that
|
// Cleanup a container whether it is running/killed/completed, so that
|
||||||
|
|
|
@ -18,9 +18,10 @@
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.argThat;
|
import static org.mockito.Matchers.argThat;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.reset;
|
import static org.mockito.Mockito.reset;
|
||||||
|
@ -42,11 +43,16 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
|
@ -60,10 +66,13 @@ import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
|
||||||
|
@ -287,7 +296,8 @@ public class TestContainer {
|
||||||
wc.launchContainer();
|
wc.launchContainer();
|
||||||
reset(wc.localizerBus);
|
reset(wc.localizerBus);
|
||||||
wc.killContainer();
|
wc.killContainer();
|
||||||
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
|
assertEquals(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
|
||||||
|
wc.c.getContainerState());
|
||||||
assertNull(wc.c.getLocalizedResources());
|
assertNull(wc.c.getLocalizedResources());
|
||||||
wc.containerKilledOnRequest();
|
wc.containerKilledOnRequest();
|
||||||
|
|
||||||
|
@ -318,6 +328,26 @@ public class TestContainer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testKillOnLocalized() throws Exception {
|
||||||
|
WrappedContainer wc = null;
|
||||||
|
try {
|
||||||
|
wc = new WrappedContainer(17, 314159265358979L, 4344, "yak");
|
||||||
|
wc.initContainer();
|
||||||
|
wc.localizeResources();
|
||||||
|
assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState());
|
||||||
|
wc.killContainer();
|
||||||
|
assertEquals(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
|
||||||
|
wc.c.getContainerState());
|
||||||
|
assertNull(wc.c.getLocalizedResources());
|
||||||
|
verifyCleanupCall(wc);
|
||||||
|
} finally {
|
||||||
|
if (wc != null) {
|
||||||
|
wc.finished();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testResourceLocalizedOnLocalizationFailed() throws Exception {
|
public void testResourceLocalizedOnLocalizationFailed() throws Exception {
|
||||||
|
@ -442,10 +472,12 @@ public class TestContainer {
|
||||||
wc.initContainer();
|
wc.initContainer();
|
||||||
wc.localizeResources();
|
wc.localizeResources();
|
||||||
wc.killContainer();
|
wc.killContainer();
|
||||||
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
|
assertEquals(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
|
||||||
|
wc.c.getContainerState());
|
||||||
assertNull(wc.c.getLocalizedResources());
|
assertNull(wc.c.getLocalizedResources());
|
||||||
wc.launchContainer();
|
wc.launchContainer();
|
||||||
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
|
assertEquals(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
|
||||||
|
wc.c.getContainerState());
|
||||||
assertNull(wc.c.getLocalizedResources());
|
assertNull(wc.c.getLocalizedResources());
|
||||||
wc.containerKilledOnRequest();
|
wc.containerKilledOnRequest();
|
||||||
verifyCleanupCall(wc);
|
verifyCleanupCall(wc);
|
||||||
|
@ -583,6 +615,7 @@ public class TestContainer {
|
||||||
final EventHandler<AuxServicesEvent> auxBus;
|
final EventHandler<AuxServicesEvent> auxBus;
|
||||||
final EventHandler<ApplicationEvent> appBus;
|
final EventHandler<ApplicationEvent> appBus;
|
||||||
final EventHandler<LogHandlerEvent> LogBus;
|
final EventHandler<LogHandlerEvent> LogBus;
|
||||||
|
final ContainersLauncher launcher;
|
||||||
|
|
||||||
final ContainerLaunchContext ctxt;
|
final ContainerLaunchContext ctxt;
|
||||||
final ContainerId cId;
|
final ContainerId cId;
|
||||||
|
@ -595,6 +628,7 @@ public class TestContainer {
|
||||||
this(appId, timestamp, id, user, true, false);
|
this(appId, timestamp, id, user, true, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
WrappedContainer(int appId, long timestamp, int id, String user,
|
WrappedContainer(int appId, long timestamp, int id, String user,
|
||||||
boolean withLocalRes, boolean withServiceData) throws IOException {
|
boolean withLocalRes, boolean withServiceData) throws IOException {
|
||||||
dispatcher = new DrainDispatcher();
|
dispatcher = new DrainDispatcher();
|
||||||
|
@ -613,6 +647,22 @@ public class TestContainer {
|
||||||
dispatcher.register(ApplicationEventType.class, appBus);
|
dispatcher.register(ApplicationEventType.class, appBus);
|
||||||
dispatcher.register(LogHandlerEventType.class, LogBus);
|
dispatcher.register(LogHandlerEventType.class, LogBus);
|
||||||
|
|
||||||
|
Context context = mock(Context.class);
|
||||||
|
when(context.getApplications()).thenReturn(
|
||||||
|
new ConcurrentHashMap<ApplicationId, Application>());
|
||||||
|
launcher = new ContainersLauncher(context, dispatcher, null, null);
|
||||||
|
// create a mock ExecutorService, which will not really launch
|
||||||
|
// ContainerLaunch at all.
|
||||||
|
launcher.containerLauncher = mock(ExecutorService.class);
|
||||||
|
Future future = mock(Future.class);
|
||||||
|
when(launcher.containerLauncher.submit
|
||||||
|
(any(Callable.class))).thenReturn(future);
|
||||||
|
when(future.isDone()).thenReturn(false);
|
||||||
|
when(future.cancel(false)).thenReturn(true);
|
||||||
|
launcher.init(new Configuration());
|
||||||
|
launcher.start();
|
||||||
|
dispatcher.register(ContainersLauncherEventType.class, launcher);
|
||||||
|
|
||||||
ctxt = mock(ContainerLaunchContext.class);
|
ctxt = mock(ContainerLaunchContext.class);
|
||||||
org.apache.hadoop.yarn.api.records.Container mockContainer =
|
org.apache.hadoop.yarn.api.records.Container mockContainer =
|
||||||
mock(org.apache.hadoop.yarn.api.records.Container.class);
|
mock(org.apache.hadoop.yarn.api.records.Container.class);
|
||||||
|
@ -654,6 +704,13 @@ public class TestContainer {
|
||||||
when(ctxt.getServiceData()).thenReturn(serviceData);
|
when(ctxt.getServiceData()).thenReturn(serviceData);
|
||||||
|
|
||||||
c = new ContainerImpl(conf, dispatcher, ctxt, null, metrics, identifier);
|
c = new ContainerImpl(conf, dispatcher, ctxt, null, metrics, identifier);
|
||||||
|
dispatcher.register(ContainerEventType.class,
|
||||||
|
new EventHandler<ContainerEvent>() {
|
||||||
|
@Override
|
||||||
|
public void handle(ContainerEvent event) {
|
||||||
|
c.handle(event);
|
||||||
|
}
|
||||||
|
});
|
||||||
dispatcher.start();
|
dispatcher.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -596,8 +596,9 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
|
||||||
ContainerStatus containerStatus =
|
ContainerStatus containerStatus =
|
||||||
containerManager.getContainerStatuses(gcsRequest)
|
containerManager.getContainerStatuses(gcsRequest)
|
||||||
.getContainerStatuses().get(0);
|
.getContainerStatuses().get(0);
|
||||||
Assert.assertEquals(ExitCode.FORCE_KILLED.getExitCode(),
|
int expectedExitCode = Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() :
|
||||||
containerStatus.getExitStatus());
|
ExitCode.TERMINATED.getExitCode();
|
||||||
|
Assert.assertEquals(expectedExitCode, containerStatus.getExitStatus());
|
||||||
|
|
||||||
// Now verify the contents of the file. Script generates a message when it
|
// Now verify the contents of the file. Script generates a message when it
|
||||||
// receives a sigterm so we look for that. We cannot perform this check on
|
// receives a sigterm so we look for that. We cannot perform this check on
|
||||||
|
|
Loading…
Reference in New Issue