YARN-966. Fixed ContainerLaunch to not fail quietly when there are no localized resources due to some other failure. Contributed by Zhijie Shen.
svn merge --ignore-ancestry -c 1508688 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1508689 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
dad1d79a05
commit
4120e3b4ef
|
@ -27,6 +27,9 @@ Release 2.1.1-beta - UNRELEASED
|
||||||
YARN-948. Changed ResourceManager to validate the release container list
|
YARN-948. Changed ResourceManager to validate the release container list
|
||||||
before actually releasing them. (Omkar Vinit Joshi via vinodkv)
|
before actually releasing them. (Omkar Vinit Joshi via vinodkv)
|
||||||
|
|
||||||
|
YARN-966. Fixed ContainerLaunch to not fail quietly when there are no
|
||||||
|
localized resources due to some other failure. (Zhijie Shen via vinodkv)
|
||||||
|
|
||||||
Release 2.1.0-beta - 2013-08-06
|
Release 2.1.0-beta - 2013-08-06
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -335,8 +335,11 @@ public class ContainerImpl implements Container {
|
||||||
public Map<Path,List<String>> getLocalizedResources() {
|
public Map<Path,List<String>> getLocalizedResources() {
|
||||||
this.readLock.lock();
|
this.readLock.lock();
|
||||||
try {
|
try {
|
||||||
assert ContainerState.LOCALIZED == getContainerState(); // TODO: FIXME!!
|
if (ContainerState.LOCALIZED == getContainerState()) {
|
||||||
return localizedResources;
|
return localizedResources;
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
this.readLock.unlock();
|
this.readLock.unlock();
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.DelayedProcessKiller;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.DelayedProcessKiller;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
|
||||||
|
@ -120,14 +121,20 @@ public class ContainerLaunch implements Callable<Integer> {
|
||||||
@SuppressWarnings("unchecked") // dispatcher not typed
|
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||||
public Integer call() {
|
public Integer call() {
|
||||||
final ContainerLaunchContext launchContext = container.getLaunchContext();
|
final ContainerLaunchContext launchContext = container.getLaunchContext();
|
||||||
final Map<Path,List<String>> localResources =
|
Map<Path,List<String>> localResources = null;
|
||||||
container.getLocalizedResources();
|
|
||||||
ContainerId containerID = container.getContainerId();
|
ContainerId containerID = container.getContainerId();
|
||||||
String containerIdStr = ConverterUtils.toString(containerID);
|
String containerIdStr = ConverterUtils.toString(containerID);
|
||||||
final List<String> command = launchContext.getCommands();
|
final List<String> command = launchContext.getCommands();
|
||||||
int ret = -1;
|
int ret = -1;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
localResources = container.getLocalizedResources();
|
||||||
|
if (localResources == null) {
|
||||||
|
RPCUtil.getRemoteException(
|
||||||
|
"Unable to get local resources when Container " + containerID +
|
||||||
|
" is at " + container.getContainerState());
|
||||||
|
}
|
||||||
|
|
||||||
final String user = container.getUser();
|
final String user = container.getUser();
|
||||||
// /////////////////////////// Variable expansion
|
// /////////////////////////// Variable expansion
|
||||||
// Before the container script gets written out.
|
// Before the container script gets written out.
|
||||||
|
|
|
@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.mockito.Matchers.argThat;
|
import static org.mockito.Matchers.argThat;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.reset;
|
import static org.mockito.Mockito.reset;
|
||||||
|
@ -124,6 +126,7 @@ public class TestContainer {
|
||||||
|
|
||||||
// all resources should be localized
|
// all resources should be localized
|
||||||
assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState());
|
assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState());
|
||||||
|
assertNotNull(wc.c.getLocalizedResources());
|
||||||
for (Entry<Path, List<String>> loc : wc.c.getLocalizedResources()
|
for (Entry<Path, List<String>> loc : wc.c.getLocalizedResources()
|
||||||
.entrySet()) {
|
.entrySet()) {
|
||||||
assertEquals(localPaths.remove(loc.getKey()), loc.getValue());
|
assertEquals(localPaths.remove(loc.getKey()), loc.getValue());
|
||||||
|
@ -161,6 +164,7 @@ public class TestContainer {
|
||||||
wc.containerKilledOnRequest();
|
wc.containerKilledOnRequest();
|
||||||
assertEquals(ContainerState.EXITED_WITH_FAILURE,
|
assertEquals(ContainerState.EXITED_WITH_FAILURE,
|
||||||
wc.c.getContainerState());
|
wc.c.getContainerState());
|
||||||
|
assertNull(wc.c.getLocalizedResources());
|
||||||
verifyCleanupCall(wc);
|
verifyCleanupCall(wc);
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
|
@ -183,6 +187,7 @@ public class TestContainer {
|
||||||
wc.containerFailed(ExitCode.FORCE_KILLED.getExitCode());
|
wc.containerFailed(ExitCode.FORCE_KILLED.getExitCode());
|
||||||
assertEquals(ContainerState.EXITED_WITH_FAILURE,
|
assertEquals(ContainerState.EXITED_WITH_FAILURE,
|
||||||
wc.c.getContainerState());
|
wc.c.getContainerState());
|
||||||
|
assertNull(wc.c.getLocalizedResources());
|
||||||
verifyCleanupCall(wc);
|
verifyCleanupCall(wc);
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
|
@ -205,7 +210,7 @@ public class TestContainer {
|
||||||
wc.containerSuccessful();
|
wc.containerSuccessful();
|
||||||
assertEquals(ContainerState.EXITED_WITH_SUCCESS,
|
assertEquals(ContainerState.EXITED_WITH_SUCCESS,
|
||||||
wc.c.getContainerState());
|
wc.c.getContainerState());
|
||||||
|
assertNull(wc.c.getLocalizedResources());
|
||||||
verifyCleanupCall(wc);
|
verifyCleanupCall(wc);
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
|
@ -228,10 +233,12 @@ public class TestContainer {
|
||||||
wc.containerSuccessful();
|
wc.containerSuccessful();
|
||||||
wc.containerResourcesCleanup();
|
wc.containerResourcesCleanup();
|
||||||
assertEquals(ContainerState.DONE, wc.c.getContainerState());
|
assertEquals(ContainerState.DONE, wc.c.getContainerState());
|
||||||
|
assertNull(wc.c.getLocalizedResources());
|
||||||
// Now in DONE, issue INIT
|
// Now in DONE, issue INIT
|
||||||
wc.initContainer();
|
wc.initContainer();
|
||||||
// Verify still in DONE
|
// Verify still in DONE
|
||||||
assertEquals(ContainerState.DONE, wc.c.getContainerState());
|
assertEquals(ContainerState.DONE, wc.c.getContainerState());
|
||||||
|
assertNull(wc.c.getLocalizedResources());
|
||||||
verifyCleanupCall(wc);
|
verifyCleanupCall(wc);
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
|
@ -255,10 +262,12 @@ public class TestContainer {
|
||||||
wc.containerSuccessful();
|
wc.containerSuccessful();
|
||||||
wc.containerResourcesCleanup();
|
wc.containerResourcesCleanup();
|
||||||
assertEquals(ContainerState.DONE, wc.c.getContainerState());
|
assertEquals(ContainerState.DONE, wc.c.getContainerState());
|
||||||
|
assertNull(wc.c.getLocalizedResources());
|
||||||
// Now in DONE, issue RESOURCE_FAILED as done by LocalizeRunner
|
// Now in DONE, issue RESOURCE_FAILED as done by LocalizeRunner
|
||||||
wc.resourceFailedContainer();
|
wc.resourceFailedContainer();
|
||||||
// Verify still in DONE
|
// Verify still in DONE
|
||||||
assertEquals(ContainerState.DONE, wc.c.getContainerState());
|
assertEquals(ContainerState.DONE, wc.c.getContainerState());
|
||||||
|
assertNull(wc.c.getLocalizedResources());
|
||||||
verifyCleanupCall(wc);
|
verifyCleanupCall(wc);
|
||||||
} finally {
|
} finally {
|
||||||
if (wc != null) {
|
if (wc != null) {
|
||||||
|
@ -279,6 +288,7 @@ public class TestContainer {
|
||||||
reset(wc.localizerBus);
|
reset(wc.localizerBus);
|
||||||
wc.killContainer();
|
wc.killContainer();
|
||||||
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
|
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
|
||||||
|
assertNull(wc.c.getLocalizedResources());
|
||||||
wc.containerKilledOnRequest();
|
wc.containerKilledOnRequest();
|
||||||
|
|
||||||
verifyCleanupCall(wc);
|
verifyCleanupCall(wc);
|
||||||
|
@ -297,8 +307,10 @@ public class TestContainer {
|
||||||
wc.initContainer();
|
wc.initContainer();
|
||||||
wc.failLocalizeResources(wc.getLocalResourceCount());
|
wc.failLocalizeResources(wc.getLocalResourceCount());
|
||||||
assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
|
assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
|
||||||
|
assertNull(wc.c.getLocalizedResources());
|
||||||
wc.killContainer();
|
wc.killContainer();
|
||||||
assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
|
assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
|
||||||
|
assertNull(wc.c.getLocalizedResources());
|
||||||
verifyCleanupCall(wc);
|
verifyCleanupCall(wc);
|
||||||
} finally {
|
} finally {
|
||||||
if (wc != null) {
|
if (wc != null) {
|
||||||
|
@ -319,8 +331,10 @@ public class TestContainer {
|
||||||
}
|
}
|
||||||
wc.failLocalizeResources(failCount);
|
wc.failLocalizeResources(failCount);
|
||||||
assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
|
assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
|
||||||
|
assertNull(wc.c.getLocalizedResources());
|
||||||
wc.localizeResourcesFromInvalidState(failCount);
|
wc.localizeResourcesFromInvalidState(failCount);
|
||||||
assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
|
assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
|
||||||
|
assertNull(wc.c.getLocalizedResources());
|
||||||
verifyCleanupCall(wc);
|
verifyCleanupCall(wc);
|
||||||
Assert.assertTrue(wc.getDiagnostics().contains(FAKE_LOCALIZATION_ERROR));
|
Assert.assertTrue(wc.getDiagnostics().contains(FAKE_LOCALIZATION_ERROR));
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -342,8 +356,10 @@ public class TestContainer {
|
||||||
String key2 = lRsrcKeys.next();
|
String key2 = lRsrcKeys.next();
|
||||||
wc.failLocalizeSpecificResource(key1);
|
wc.failLocalizeSpecificResource(key1);
|
||||||
assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
|
assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
|
||||||
|
assertNull(wc.c.getLocalizedResources());
|
||||||
wc.failLocalizeSpecificResource(key2);
|
wc.failLocalizeSpecificResource(key2);
|
||||||
assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
|
assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
|
||||||
|
assertNull(wc.c.getLocalizedResources());
|
||||||
verifyCleanupCall(wc);
|
verifyCleanupCall(wc);
|
||||||
} finally {
|
} finally {
|
||||||
if (wc != null) {
|
if (wc != null) {
|
||||||
|
@ -363,8 +379,10 @@ public class TestContainer {
|
||||||
String key1 = lRsrcKeys.next();
|
String key1 = lRsrcKeys.next();
|
||||||
wc.killContainer();
|
wc.killContainer();
|
||||||
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
|
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
|
||||||
|
assertNull(wc.c.getLocalizedResources());
|
||||||
wc.failLocalizeSpecificResource(key1);
|
wc.failLocalizeSpecificResource(key1);
|
||||||
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
|
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
|
||||||
|
assertNull(wc.c.getLocalizedResources());
|
||||||
verifyCleanupCall(wc);
|
verifyCleanupCall(wc);
|
||||||
} finally {
|
} finally {
|
||||||
if (wc != null) {
|
if (wc != null) {
|
||||||
|
@ -425,8 +443,10 @@ public class TestContainer {
|
||||||
wc.localizeResources();
|
wc.localizeResources();
|
||||||
wc.killContainer();
|
wc.killContainer();
|
||||||
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
|
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
|
||||||
|
assertNull(wc.c.getLocalizedResources());
|
||||||
wc.launchContainer();
|
wc.launchContainer();
|
||||||
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
|
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
|
||||||
|
assertNull(wc.c.getLocalizedResources());
|
||||||
wc.containerKilledOnRequest();
|
wc.containerKilledOnRequest();
|
||||||
verifyCleanupCall(wc);
|
verifyCleanupCall(wc);
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -19,6 +19,8 @@
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
@ -28,12 +30,14 @@ import java.io.IOException;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
|
@ -59,10 +63,16 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.Token;
|
import org.apache.hadoop.yarn.api.records.Token;
|
||||||
import org.apache.hadoop.yarn.api.records.URL;
|
import org.apache.hadoop.yarn.api.records.URL;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
import org.apache.hadoop.yarn.event.Event;
|
||||||
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
@ -617,6 +627,32 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
@Test
|
||||||
|
public void testCallFailureWithNullLocalizedResources() {
|
||||||
|
Container container = mock(Container.class);
|
||||||
|
when(container.getContainerId()).thenReturn(ContainerId.newInstance(
|
||||||
|
ApplicationAttemptId.newInstance(ApplicationId.newInstance(
|
||||||
|
System.currentTimeMillis(), 1), 1), 1));
|
||||||
|
ContainerLaunchContext clc = mock(ContainerLaunchContext.class);
|
||||||
|
when(clc.getCommands()).thenReturn(Collections.<String>emptyList());
|
||||||
|
when(container.getLaunchContext()).thenReturn(clc);
|
||||||
|
when(container.getLocalizedResources()).thenReturn(null);
|
||||||
|
Dispatcher dispatcher = mock(Dispatcher.class);
|
||||||
|
EventHandler eventHandler = new EventHandler() {
|
||||||
|
public void handle(Event event) {
|
||||||
|
Assert.assertTrue(event instanceof ContainerExitEvent);
|
||||||
|
ContainerExitEvent exitEvent = (ContainerExitEvent) event;
|
||||||
|
Assert.assertEquals(ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
|
||||||
|
exitEvent.getType());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
when(dispatcher.getEventHandler()).thenReturn(eventHandler);
|
||||||
|
ContainerLaunch launch = new ContainerLaunch(context, new Configuration(),
|
||||||
|
dispatcher, exec, null, container, dirsHandler);
|
||||||
|
launch.call();
|
||||||
|
}
|
||||||
|
|
||||||
protected Token createContainerToken(ContainerId cId) throws InvalidToken {
|
protected Token createContainerToken(ContainerId cId) throws InvalidToken {
|
||||||
Resource r = BuilderUtils.newResource(1024, 1);
|
Resource r = BuilderUtils.newResource(1024, 1);
|
||||||
ContainerTokenIdentifier containerTokenIdentifier =
|
ContainerTokenIdentifier containerTokenIdentifier =
|
||||||
|
|
Loading…
Reference in New Issue