YARN-857. Localization failures should be available in container diagnostics. Contributed by Vinod Kumar Vavilapalli.

(cherry picked from commit f440a9d8c4a177bc5062d21d4b4bc4d9b2944344)
(cherry picked from commit 36f2ae0692)
This commit is contained in:
Varun Vasudev 2016-05-25 18:41:05 +05:30
parent 00011e6a98
commit 35456bb7c9
5 changed files with 131 additions and 70 deletions

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.yarn.server.nodemanager; package org.apache.hadoop.yarn.server.nodemanager;
import com.google.common.base.Optional;
import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
@ -38,14 +36,15 @@
import org.apache.commons.lang.math.RandomUtils; import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.util.Shell.CommandExecutor; import org.apache.hadoop.util.Shell.CommandExecutor;
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
@ -64,6 +63,7 @@
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
public class DefaultContainerExecutor extends ContainerExecutor { public class DefaultContainerExecutor extends ContainerExecutor {
@ -132,13 +132,25 @@ public void startLocalizer(LocalizerStartContext ctx)
localizerFc.setWorkingDirectory(appStorageDir); localizerFc.setWorkingDirectory(appStorageDir);
LOG.info("Localizer CWD set to " + appStorageDir + " = " LOG.info("Localizer CWD set to " + appStorageDir + " = "
+ localizerFc.getWorkingDirectory()); + localizerFc.getWorkingDirectory());
ContainerLocalizer localizer = ContainerLocalizer localizer =
new ContainerLocalizer(localizerFc, user, appId, locId, createContainerLocalizer(user, appId, locId, localDirs, localizerFc);
getPaths(localDirs), RecordFactoryProvider.getRecordFactory(getConf()));
// TODO: DO it over RPC for maintaining similarity? // TODO: DO it over RPC for maintaining similarity?
localizer.runLocalization(nmAddr); localizer.runLocalization(nmAddr);
} }
@Private
@VisibleForTesting
protected ContainerLocalizer createContainerLocalizer(String user,
String appId, String locId, List<String> localDirs,
FileContext localizerFc) throws IOException {
ContainerLocalizer localizer =
new ContainerLocalizer(localizerFc, user, appId, locId,
getPaths(localDirs),
RecordFactoryProvider.getRecordFactory(getConf()));
return localizer;
}
@Override @Override
public int launchContainer(ContainerStartContext ctx) throws IOException { public int launchContainer(ContainerStartContext ctx) throws IOException {
Container container = ctx.getContainer(); Container container = ctx.getContainer();

View File

@ -40,6 +40,7 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -70,6 +71,7 @@
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.FSDownload; import org.apache.hadoop.yarn.util.FSDownload;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class ContainerLocalizer { public class ContainerLocalizer {
@ -117,14 +119,16 @@ public ContainerLocalizer(FileContext lfs, String user, String appId,
this.pendingResources = new HashMap<LocalResource,Future<Path>>(); this.pendingResources = new HashMap<LocalResource,Future<Path>>();
} }
LocalizationProtocol getProxy(final InetSocketAddress nmAddr) { @Private
@VisibleForTesting
public LocalizationProtocol getProxy(final InetSocketAddress nmAddr) {
YarnRPC rpc = YarnRPC.create(conf); YarnRPC rpc = YarnRPC.create(conf);
return (LocalizationProtocol) return (LocalizationProtocol)
rpc.getProxy(LocalizationProtocol.class, nmAddr, conf); rpc.getProxy(LocalizationProtocol.class, nmAddr, conf);
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
public int runLocalization(final InetSocketAddress nmAddr) public void runLocalization(final InetSocketAddress nmAddr)
throws IOException, InterruptedException { throws IOException, InterruptedException {
// load credentials // load credentials
initDirs(conf, user, appId, lfs, localDirs); initDirs(conf, user, appId, lfs, localDirs);
@ -168,12 +172,9 @@ public LocalizationProtocol run() {
exec = createDownloadThreadPool(); exec = createDownloadThreadPool();
CompletionService<Path> ecs = createCompletionService(exec); CompletionService<Path> ecs = createCompletionService(exec);
localizeFiles(nodeManager, ecs, ugi); localizeFiles(nodeManager, ecs, ugi);
return 0; return;
} catch (Throwable e) { } catch (Throwable e) {
// Print traces to stdout so that they can be logged by the NM address throw new IOException(e);
// space.
e.printStackTrace(System.out);
return -1;
} finally { } finally {
try { try {
if (exec != null) { if (exec != null) {
@ -229,7 +230,7 @@ protected void closeFileSystems(UserGroupInformation ugi) {
protected void localizeFiles(LocalizationProtocol nodemanager, protected void localizeFiles(LocalizationProtocol nodemanager,
CompletionService<Path> cs, UserGroupInformation ugi) CompletionService<Path> cs, UserGroupInformation ugi)
throws IOException { throws IOException, YarnException {
while (true) { while (true) {
try { try {
LocalizerStatus status = createStatus(); LocalizerStatus status = createStatus();
@ -251,10 +252,15 @@ protected void localizeFiles(LocalizationProtocol nodemanager,
pending.cancel(true); pending.cancel(true);
} }
status = createStatus(); status = createStatus();
// ignore response // ignore response while dying.
try { try {
nodemanager.heartbeat(status); nodemanager.heartbeat(status);
} catch (YarnException e) { } } catch (YarnException e) {
// Cannot do anything about this during death stage, let's just log
// it.
e.printStackTrace(System.out);
LOG.error("Heartbeat failed while dying: ", e);
}
return; return;
} }
cs.poll(1000, TimeUnit.MILLISECONDS); cs.poll(1000, TimeUnit.MILLISECONDS);
@ -262,7 +268,7 @@ protected void localizeFiles(LocalizationProtocol nodemanager,
return; return;
} catch (YarnException e) { } catch (YarnException e) {
// TODO cleanup // TODO cleanup
return; throw e;
} }
} }
} }
@ -380,16 +386,14 @@ public static void main(String[] argv) throws Throwable {
new ContainerLocalizer(FileContext.getLocalFSFileContext(), user, new ContainerLocalizer(FileContext.getLocalFSFileContext(), user,
appId, locId, localDirs, appId, locId, localDirs,
RecordFactoryProvider.getRecordFactory(null)); RecordFactoryProvider.getRecordFactory(null));
int nRet = localizer.runLocalization(nmAddr); localizer.runLocalization(nmAddr);
if (LOG.isDebugEnabled()) { return;
LOG.debug(String.format("nRet: %d", nRet));
}
System.exit(nRet);
} catch (Throwable e) { } catch (Throwable e) {
// Print error to stdout so that LCE can use it. // Print traces to stdout so that they can be logged by the NM address
// space in both DefaultCE and LCE cases
e.printStackTrace(System.out); e.printStackTrace(System.out);
LOG.error("Exception in main:", e); LOG.error("Exception in main:", e);
throw e; System.exit(-1);
} }
} }

View File

@ -20,65 +20,59 @@
import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.BufferedWriter; import java.io.BufferedWriter;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter; import java.io.FileWriter;
import java.io.InputStream;
import java.io.IOException; import java.io.IOException;
import java.io.LineNumberReader;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FsStatus; import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FakeFSDataInputStream; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.MockLocalizerHeartbeatResponse;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext; import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
@ -336,10 +330,9 @@ public Object answer(InvocationOnMock invocationOnMock)
} }
@Test(timeout = 30000) @Test(timeout = 30000)
public void testStartLocalizer() public void testStartLocalizer() throws IOException, InterruptedException,
throws IOException, InterruptedException { YarnException {
InetSocketAddress localizationServerAddress;
final Path firstDir = new Path(BASE_TMP_PATH, "localDir1"); final Path firstDir = new Path(BASE_TMP_PATH, "localDir1");
List<String> localDirs = new ArrayList<String>(); List<String> localDirs = new ArrayList<String>();
final Path secondDir = new Path(BASE_TMP_PATH, "localDir2"); final Path secondDir = new Path(BASE_TMP_PATH, "localDir2");
@ -349,11 +342,6 @@ public void testStartLocalizer()
FsPermission perms = new FsPermission((short)0770); FsPermission perms = new FsPermission((short)0770);
Configuration conf = new Configuration(); Configuration conf = new Configuration();
localizationServerAddress = conf.getSocketAddr(
YarnConfiguration.NM_BIND_HOST,
YarnConfiguration.NM_LOCALIZER_ADDRESS,
YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
final FileContext mockLfs = spy(FileContext.getLocalFSFileContext(conf)); final FileContext mockLfs = spy(FileContext.getLocalFSFileContext(conf));
final FileContext.Util mockUtil = spy(mockLfs.util()); final FileContext.Util mockUtil = spy(mockLfs.util());
@ -391,6 +379,7 @@ public Object answer(InvocationOnMock invocationOnMock)
return null; return null;
} }
}).when(mockUtil).copy(any(Path.class), any(Path.class)); }).when(mockUtil).copy(any(Path.class), any(Path.class));
doAnswer(new Answer() { doAnswer(new Answer() {
@Override @Override
public Object answer(InvocationOnMock invocationOnMock) public Object answer(InvocationOnMock invocationOnMock)
@ -406,8 +395,33 @@ public Object answer(InvocationOnMock invocationOnMock)
} }
}).when(mockLfs).getFsStatus(any(Path.class)); }).when(mockLfs).getFsStatus(any(Path.class));
DefaultContainerExecutor mockExec = spy(new DefaultContainerExecutor( DefaultContainerExecutor mockExec =
mockLfs)); spy(new DefaultContainerExecutor(mockLfs) {
@Override
public ContainerLocalizer createContainerLocalizer(String user,
String appId, String locId, List<String> localDirs,
FileContext localizerFc) throws IOException {
// Spy on the localizer and make it return valid heart-beat
// responses even though there is no real NodeManager.
ContainerLocalizer localizer =
super.createContainerLocalizer(user, appId, locId, localDirs,
localizerFc);
ContainerLocalizer spyLocalizer = spy(localizer);
LocalizationProtocol nmProxy = mock(LocalizationProtocol.class);
try {
when(nmProxy.heartbeat(isA(LocalizerStatus.class))).thenReturn(
new MockLocalizerHeartbeatResponse(LocalizerAction.DIE,
new ArrayList<ResourceLocalizationSpec>()));
} catch (YarnException e) {
throw new IOException(e);
}
when(spyLocalizer.getProxy(any(InetSocketAddress.class)))
.thenReturn(nmProxy);
return spyLocalizer;
}
});
mockExec.setConf(conf); mockExec.setConf(conf);
localDirs.add(mockLfs.makeQualified(firstDir).toString()); localDirs.add(mockLfs.makeQualified(firstDir).toString());
localDirs.add(mockLfs.makeQualified(secondDir).toString()); localDirs.add(mockLfs.makeQualified(secondDir).toString());
@ -424,18 +438,20 @@ public Object answer(InvocationOnMock invocationOnMock)
LocalDirsHandlerService dirsHandler = mock(LocalDirsHandlerService.class); LocalDirsHandlerService dirsHandler = mock(LocalDirsHandlerService.class);
when(dirsHandler.getLocalDirs()).thenReturn(localDirs); when(dirsHandler.getLocalDirs()).thenReturn(localDirs);
when(dirsHandler.getLogDirs()).thenReturn(logDirs); when(dirsHandler.getLogDirs()).thenReturn(logDirs);
try { try {
mockExec.startLocalizer(new LocalizerStartContext.Builder() mockExec.startLocalizer(new LocalizerStartContext.Builder()
.setNmPrivateContainerTokens(nmPrivateCTokensPath) .setNmPrivateContainerTokens(nmPrivateCTokensPath)
.setNmAddr(localizationServerAddress) .setNmAddr(null)
.setUser(appSubmitter) .setUser(appSubmitter)
.setAppId(appId) .setAppId(appId)
.setLocId(locId) .setLocId(locId)
.setDirsHandler(dirsHandler) .setDirsHandler(dirsHandler)
.build()); .build());
} catch (IOException e) { } catch (IOException e) {
Assert.fail("StartLocalizer failed to copy token file " + e); Assert.fail("StartLocalizer failed to copy token file: "
+ StringUtils.stringifyException(e));
} finally { } finally {
mockExec.deleteAsUser(new DeletionAsUserContext.Builder() mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(appSubmitter) .setUser(appSubmitter)
@ -451,7 +467,12 @@ public Object answer(InvocationOnMock invocationOnMock)
.build()); .build());
deleteTmpFiles(); deleteTmpFiles();
} }
// Verify that the calls happen the expected number of times
verify(mockUtil, times(1)).copy(any(Path.class), any(Path.class));
verify(mockLfs, times(2)).getFsStatus(any(Path.class));
} }
// @Test // @Test
// public void testInit() throws IOException, InterruptedException { // public void testInit() throws IOException, InterruptedException {
// Configuration conf = new Configuration(); // Configuration conf = new Configuration();

View File

@ -17,7 +17,6 @@
*/ */
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec; import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
@ -30,11 +29,7 @@ public class MockLocalizerHeartbeatResponse
LocalizerAction action; LocalizerAction action;
List<ResourceLocalizationSpec> resourceSpecs; List<ResourceLocalizationSpec> resourceSpecs;
MockLocalizerHeartbeatResponse() { public MockLocalizerHeartbeatResponse(
resourceSpecs = new ArrayList<ResourceLocalizationSpec>();
}
MockLocalizerHeartbeatResponse(
LocalizerAction action, List<ResourceLocalizationSpec> resources) { LocalizerAction action, List<ResourceLocalizationSpec> resources) {
this.action = action; this.action = action;
this.resourceSpecs = resources; this.resourceSpecs = resources;

View File

@ -17,7 +17,6 @@
*/ */
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyBoolean;
@ -65,6 +64,7 @@
import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol; import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
@ -73,6 +73,7 @@
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.mockito.ArgumentMatcher; import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
@ -98,7 +99,7 @@ public class TestContainerLocalizer {
private LocalizationProtocol nmProxy; private LocalizationProtocol nmProxy;
@Test @Test
public void testContainerLocalizerMain() throws Exception { public void testMain() throws Exception {
FileContext fs = FileContext.getLocalFSFileContext(); FileContext fs = FileContext.getLocalFSFileContext();
spylfs = spy(fs.getDefaultFileSystem()); spylfs = spy(fs.getDefaultFileSystem());
ContainerLocalizer localizer = ContainerLocalizer localizer =
@ -167,7 +168,7 @@ public void testContainerLocalizerMain() throws Exception {
isA(UserGroupInformation.class)); isA(UserGroupInformation.class));
// run localization // run localization
assertEquals(0, localizer.runLocalization(nmAddr)); localizer.runLocalization(nmAddr);
for (Path p : localDirs) { for (Path p : localDirs) {
Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), appUser); Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), appUser);
Path privcache = new Path(base, ContainerLocalizer.FILECACHE); Path privcache = new Path(base, ContainerLocalizer.FILECACHE);
@ -198,7 +199,27 @@ public boolean matches(Object o) {
} }
})); }));
} }
@Test(timeout = 15000)
public void testMainFailure() throws Exception {
FileContext fs = FileContext.getLocalFSFileContext();
spylfs = spy(fs.getDefaultFileSystem());
ContainerLocalizer localizer = setupContainerLocalizerForTest();
// Assume the NM heartbeat fails say because of absent tokens.
when(nmProxy.heartbeat(isA(LocalizerStatus.class))).thenThrow(
new YarnException("Sigh, no token!"));
// run localization, it should fail
try {
localizer.runLocalization(nmAddr);
Assert.fail("Localization succeeded unexpectedly!");
} catch (IOException e) {
Assert.assertTrue(e.getMessage().contains("Sigh, no token!"));
}
}
@Test @Test
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void testLocalizerTokenIsGettingRemoved() throws Exception { public void testLocalizerTokenIsGettingRemoved() throws Exception {
@ -214,18 +235,22 @@ public void testLocalizerTokenIsGettingRemoved() throws Exception {
@Test @Test
@SuppressWarnings("unchecked") // mocked generics @SuppressWarnings("unchecked") // mocked generics
public void testContainerLocalizerClosesFilesystems() throws Exception { public void testContainerLocalizerClosesFilesystems() throws Exception {
// verify filesystems are closed when localizer doesn't fail // verify filesystems are closed when localizer doesn't fail
FileContext fs = FileContext.getLocalFSFileContext(); FileContext fs = FileContext.getLocalFSFileContext();
spylfs = spy(fs.getDefaultFileSystem()); spylfs = spy(fs.getDefaultFileSystem());
ContainerLocalizer localizer = setupContainerLocalizerForTest(); ContainerLocalizer localizer = setupContainerLocalizerForTest();
doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class), doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class),
any(CompletionService.class), any(UserGroupInformation.class)); any(CompletionService.class), any(UserGroupInformation.class));
verify(localizer, never()).closeFileSystems( verify(localizer, never()).closeFileSystems(
any(UserGroupInformation.class)); any(UserGroupInformation.class));
localizer.runLocalization(nmAddr); localizer.runLocalization(nmAddr);
verify(localizer).closeFileSystems(any(UserGroupInformation.class)); verify(localizer).closeFileSystems(any(UserGroupInformation.class));
spylfs = spy(fs.getDefaultFileSystem()); spylfs = spy(fs.getDefaultFileSystem());
// verify filesystems are closed when localizer fails // verify filesystems are closed when localizer fails
localizer = setupContainerLocalizerForTest(); localizer = setupContainerLocalizerForTest();
doThrow(new YarnRuntimeException("Forced Failure")).when(localizer).localizeFiles( doThrow(new YarnRuntimeException("Forced Failure")).when(localizer).localizeFiles(
@ -233,8 +258,12 @@ public void testContainerLocalizerClosesFilesystems() throws Exception {
any(UserGroupInformation.class)); any(UserGroupInformation.class));
verify(localizer, never()).closeFileSystems( verify(localizer, never()).closeFileSystems(
any(UserGroupInformation.class)); any(UserGroupInformation.class));
localizer.runLocalization(nmAddr); try {
verify(localizer).closeFileSystems(any(UserGroupInformation.class)); localizer.runLocalization(nmAddr);
Assert.fail("Localization succeeded unexpectedly!");
} catch (IOException e) {
verify(localizer).closeFileSystems(any(UserGroupInformation.class));
}
} }
@SuppressWarnings("unchecked") // mocked generics @SuppressWarnings("unchecked") // mocked generics