YARN-857. Localization failures should be available in container diagnostics. Contributed by Vinod Kumar Vavilapalli.
(cherry picked from commit f440a9d8c4a177bc5062d21d4b4bc4d9b2944344)
This commit is contained in:
parent
2e42bafdc8
commit
36f2ae0692
|
@ -18,8 +18,6 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager;
|
package org.apache.hadoop.yarn.server.nodemanager;
|
||||||
|
|
||||||
import com.google.common.base.Optional;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
||||||
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
|
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
|
||||||
|
|
||||||
|
@ -38,14 +36,15 @@ import java.util.Map;
|
||||||
import org.apache.commons.lang.math.RandomUtils;
|
import org.apache.commons.lang.math.RandomUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.fs.FileContext;
|
import org.apache.hadoop.fs.FileContext;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.util.Shell;
|
import org.apache.hadoop.util.Shell;
|
||||||
import org.apache.hadoop.util.Shell.ExitCodeException;
|
|
||||||
import org.apache.hadoop.util.Shell.CommandExecutor;
|
import org.apache.hadoop.util.Shell.CommandExecutor;
|
||||||
|
import org.apache.hadoop.util.Shell.ExitCodeException;
|
||||||
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
|
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
@ -64,6 +63,7 @@ import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.base.Optional;
|
||||||
|
|
||||||
public class DefaultContainerExecutor extends ContainerExecutor {
|
public class DefaultContainerExecutor extends ContainerExecutor {
|
||||||
|
|
||||||
|
@ -134,13 +134,25 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
||||||
localizerFc.setWorkingDirectory(appStorageDir);
|
localizerFc.setWorkingDirectory(appStorageDir);
|
||||||
LOG.info("Localizer CWD set to " + appStorageDir + " = "
|
LOG.info("Localizer CWD set to " + appStorageDir + " = "
|
||||||
+ localizerFc.getWorkingDirectory());
|
+ localizerFc.getWorkingDirectory());
|
||||||
|
|
||||||
ContainerLocalizer localizer =
|
ContainerLocalizer localizer =
|
||||||
new ContainerLocalizer(localizerFc, user, appId, locId,
|
createContainerLocalizer(user, appId, locId, localDirs, localizerFc);
|
||||||
getPaths(localDirs), RecordFactoryProvider.getRecordFactory(getConf()));
|
|
||||||
// TODO: DO it over RPC for maintaining similarity?
|
// TODO: DO it over RPC for maintaining similarity?
|
||||||
localizer.runLocalization(nmAddr);
|
localizer.runLocalization(nmAddr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Private
|
||||||
|
@VisibleForTesting
|
||||||
|
protected ContainerLocalizer createContainerLocalizer(String user,
|
||||||
|
String appId, String locId, List<String> localDirs,
|
||||||
|
FileContext localizerFc) throws IOException {
|
||||||
|
ContainerLocalizer localizer =
|
||||||
|
new ContainerLocalizer(localizerFc, user, appId, locId,
|
||||||
|
getPaths(localDirs),
|
||||||
|
RecordFactoryProvider.getRecordFactory(getConf()));
|
||||||
|
return localizer;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int launchContainer(ContainerStartContext ctx) throws IOException {
|
public int launchContainer(ContainerStartContext ctx) throws IOException {
|
||||||
Container container = ctx.getContainer();
|
Container container = ctx.getContainer();
|
||||||
|
|
|
@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileContext;
|
import org.apache.hadoop.fs.FileContext;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
@ -70,6 +71,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.secu
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.apache.hadoop.yarn.util.FSDownload;
|
import org.apache.hadoop.yarn.util.FSDownload;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
public class ContainerLocalizer {
|
public class ContainerLocalizer {
|
||||||
|
@ -117,14 +119,16 @@ public class ContainerLocalizer {
|
||||||
this.pendingResources = new HashMap<LocalResource,Future<Path>>();
|
this.pendingResources = new HashMap<LocalResource,Future<Path>>();
|
||||||
}
|
}
|
||||||
|
|
||||||
LocalizationProtocol getProxy(final InetSocketAddress nmAddr) {
|
@Private
|
||||||
|
@VisibleForTesting
|
||||||
|
public LocalizationProtocol getProxy(final InetSocketAddress nmAddr) {
|
||||||
YarnRPC rpc = YarnRPC.create(conf);
|
YarnRPC rpc = YarnRPC.create(conf);
|
||||||
return (LocalizationProtocol)
|
return (LocalizationProtocol)
|
||||||
rpc.getProxy(LocalizationProtocol.class, nmAddr, conf);
|
rpc.getProxy(LocalizationProtocol.class, nmAddr, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
public int runLocalization(final InetSocketAddress nmAddr)
|
public void runLocalization(final InetSocketAddress nmAddr)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
// load credentials
|
// load credentials
|
||||||
initDirs(conf, user, appId, lfs, localDirs);
|
initDirs(conf, user, appId, lfs, localDirs);
|
||||||
|
@ -168,12 +172,9 @@ public class ContainerLocalizer {
|
||||||
exec = createDownloadThreadPool();
|
exec = createDownloadThreadPool();
|
||||||
CompletionService<Path> ecs = createCompletionService(exec);
|
CompletionService<Path> ecs = createCompletionService(exec);
|
||||||
localizeFiles(nodeManager, ecs, ugi);
|
localizeFiles(nodeManager, ecs, ugi);
|
||||||
return 0;
|
return;
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
// Print traces to stdout so that they can be logged by the NM address
|
throw new IOException(e);
|
||||||
// space.
|
|
||||||
e.printStackTrace(System.out);
|
|
||||||
return -1;
|
|
||||||
} finally {
|
} finally {
|
||||||
try {
|
try {
|
||||||
if (exec != null) {
|
if (exec != null) {
|
||||||
|
@ -229,7 +230,7 @@ public class ContainerLocalizer {
|
||||||
|
|
||||||
protected void localizeFiles(LocalizationProtocol nodemanager,
|
protected void localizeFiles(LocalizationProtocol nodemanager,
|
||||||
CompletionService<Path> cs, UserGroupInformation ugi)
|
CompletionService<Path> cs, UserGroupInformation ugi)
|
||||||
throws IOException {
|
throws IOException, YarnException {
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
LocalizerStatus status = createStatus();
|
LocalizerStatus status = createStatus();
|
||||||
|
@ -251,10 +252,15 @@ public class ContainerLocalizer {
|
||||||
pending.cancel(true);
|
pending.cancel(true);
|
||||||
}
|
}
|
||||||
status = createStatus();
|
status = createStatus();
|
||||||
// ignore response
|
// ignore response while dying.
|
||||||
try {
|
try {
|
||||||
nodemanager.heartbeat(status);
|
nodemanager.heartbeat(status);
|
||||||
} catch (YarnException e) { }
|
} catch (YarnException e) {
|
||||||
|
// Cannot do anything about this during death stage, let's just log
|
||||||
|
// it.
|
||||||
|
e.printStackTrace(System.out);
|
||||||
|
LOG.error("Heartbeat failed while dying: ", e);
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
cs.poll(1000, TimeUnit.MILLISECONDS);
|
cs.poll(1000, TimeUnit.MILLISECONDS);
|
||||||
|
@ -262,7 +268,7 @@ public class ContainerLocalizer {
|
||||||
return;
|
return;
|
||||||
} catch (YarnException e) {
|
} catch (YarnException e) {
|
||||||
// TODO cleanup
|
// TODO cleanup
|
||||||
return;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -380,16 +386,14 @@ public class ContainerLocalizer {
|
||||||
new ContainerLocalizer(FileContext.getLocalFSFileContext(), user,
|
new ContainerLocalizer(FileContext.getLocalFSFileContext(), user,
|
||||||
appId, locId, localDirs,
|
appId, locId, localDirs,
|
||||||
RecordFactoryProvider.getRecordFactory(null));
|
RecordFactoryProvider.getRecordFactory(null));
|
||||||
int nRet = localizer.runLocalization(nmAddr);
|
localizer.runLocalization(nmAddr);
|
||||||
if (LOG.isDebugEnabled()) {
|
return;
|
||||||
LOG.debug(String.format("nRet: %d", nRet));
|
|
||||||
}
|
|
||||||
System.exit(nRet);
|
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
// Print error to stdout so that LCE can use it.
|
// Print traces to stdout so that they can be logged by the NM address
|
||||||
|
// space in both DefaultCE and LCE cases
|
||||||
e.printStackTrace(System.out);
|
e.printStackTrace(System.out);
|
||||||
LOG.error("Exception in main:", e);
|
LOG.error("Exception in main:", e);
|
||||||
throw e;
|
System.exit(-1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,65 +20,59 @@ package org.apache.hadoop.yarn.server.nodemanager;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
||||||
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
|
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
import static org.mockito.Mockito.spy;
|
|
||||||
import static org.mockito.Mockito.any;
|
|
||||||
import static org.mockito.Mockito.doAnswer;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.isA;
|
||||||
|
import static org.mockito.Mockito.doAnswer;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.BufferedWriter;
|
import java.io.BufferedWriter;
|
||||||
import java.io.DataOutputStream;
|
import java.io.DataOutputStream;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.FileReader;
|
|
||||||
import java.io.FileWriter;
|
import java.io.FileWriter;
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.LineNumberReader;
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.LinkedList;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.AbstractFileSystem;
|
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileContext;
|
import org.apache.hadoop.fs.FileContext;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
|
||||||
import org.apache.hadoop.fs.FsStatus;
|
import org.apache.hadoop.fs.FsStatus;
|
||||||
import org.apache.hadoop.fs.Options.CreateOpts;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.io.DataInputBuffer;
|
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.util.Progressable;
|
|
||||||
import org.apache.hadoop.util.Shell;
|
import org.apache.hadoop.util.Shell;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FakeFSDataInputStream;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.MockLocalizerHeartbeatResponse;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
|
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
|
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
|
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
@ -345,9 +339,8 @@ public class TestDefaultContainerExecutor {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 30000)
|
@Test(timeout = 30000)
|
||||||
public void testStartLocalizer()
|
public void testStartLocalizer() throws IOException, InterruptedException,
|
||||||
throws IOException, InterruptedException {
|
YarnException {
|
||||||
InetSocketAddress localizationServerAddress;
|
|
||||||
|
|
||||||
final Path firstDir = new Path(BASE_TMP_PATH, "localDir1");
|
final Path firstDir = new Path(BASE_TMP_PATH, "localDir1");
|
||||||
List<String> localDirs = new ArrayList<String>();
|
List<String> localDirs = new ArrayList<String>();
|
||||||
|
@ -358,11 +351,6 @@ public class TestDefaultContainerExecutor {
|
||||||
FsPermission perms = new FsPermission((short)0770);
|
FsPermission perms = new FsPermission((short)0770);
|
||||||
|
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
localizationServerAddress = conf.getSocketAddr(
|
|
||||||
YarnConfiguration.NM_BIND_HOST,
|
|
||||||
YarnConfiguration.NM_LOCALIZER_ADDRESS,
|
|
||||||
YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
|
|
||||||
YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
|
|
||||||
|
|
||||||
final FileContext mockLfs = spy(FileContext.getLocalFSFileContext(conf));
|
final FileContext mockLfs = spy(FileContext.getLocalFSFileContext(conf));
|
||||||
final FileContext.Util mockUtil = spy(mockLfs.util());
|
final FileContext.Util mockUtil = spy(mockLfs.util());
|
||||||
|
@ -400,6 +388,7 @@ public class TestDefaultContainerExecutor {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}).when(mockUtil).copy(any(Path.class), any(Path.class));
|
}).when(mockUtil).copy(any(Path.class), any(Path.class));
|
||||||
|
|
||||||
doAnswer(new Answer() {
|
doAnswer(new Answer() {
|
||||||
@Override
|
@Override
|
||||||
public Object answer(InvocationOnMock invocationOnMock)
|
public Object answer(InvocationOnMock invocationOnMock)
|
||||||
|
@ -415,8 +404,33 @@ public class TestDefaultContainerExecutor {
|
||||||
}
|
}
|
||||||
}).when(mockLfs).getFsStatus(any(Path.class));
|
}).when(mockLfs).getFsStatus(any(Path.class));
|
||||||
|
|
||||||
DefaultContainerExecutor mockExec = spy(new DefaultContainerExecutor(
|
DefaultContainerExecutor mockExec =
|
||||||
mockLfs));
|
spy(new DefaultContainerExecutor(mockLfs) {
|
||||||
|
@Override
|
||||||
|
public ContainerLocalizer createContainerLocalizer(String user,
|
||||||
|
String appId, String locId, List<String> localDirs,
|
||||||
|
FileContext localizerFc) throws IOException {
|
||||||
|
|
||||||
|
// Spy on the localizer and make it return valid heart-beat
|
||||||
|
// responses even though there is no real NodeManager.
|
||||||
|
ContainerLocalizer localizer =
|
||||||
|
super.createContainerLocalizer(user, appId, locId, localDirs,
|
||||||
|
localizerFc);
|
||||||
|
ContainerLocalizer spyLocalizer = spy(localizer);
|
||||||
|
LocalizationProtocol nmProxy = mock(LocalizationProtocol.class);
|
||||||
|
try {
|
||||||
|
when(nmProxy.heartbeat(isA(LocalizerStatus.class))).thenReturn(
|
||||||
|
new MockLocalizerHeartbeatResponse(LocalizerAction.DIE,
|
||||||
|
new ArrayList<ResourceLocalizationSpec>()));
|
||||||
|
} catch (YarnException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
when(spyLocalizer.getProxy(any(InetSocketAddress.class)))
|
||||||
|
.thenReturn(nmProxy);
|
||||||
|
|
||||||
|
return spyLocalizer;
|
||||||
|
}
|
||||||
|
});
|
||||||
mockExec.setConf(conf);
|
mockExec.setConf(conf);
|
||||||
localDirs.add(mockLfs.makeQualified(firstDir).toString());
|
localDirs.add(mockLfs.makeQualified(firstDir).toString());
|
||||||
localDirs.add(mockLfs.makeQualified(secondDir).toString());
|
localDirs.add(mockLfs.makeQualified(secondDir).toString());
|
||||||
|
@ -437,14 +451,16 @@ public class TestDefaultContainerExecutor {
|
||||||
try {
|
try {
|
||||||
mockExec.startLocalizer(new LocalizerStartContext.Builder()
|
mockExec.startLocalizer(new LocalizerStartContext.Builder()
|
||||||
.setNmPrivateContainerTokens(nmPrivateCTokensPath)
|
.setNmPrivateContainerTokens(nmPrivateCTokensPath)
|
||||||
.setNmAddr(localizationServerAddress)
|
.setNmAddr(null)
|
||||||
.setUser(appSubmitter)
|
.setUser(appSubmitter)
|
||||||
.setAppId(appId)
|
.setAppId(appId)
|
||||||
.setLocId(locId)
|
.setLocId(locId)
|
||||||
.setDirsHandler(dirsHandler)
|
.setDirsHandler(dirsHandler)
|
||||||
.build());
|
.build());
|
||||||
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
Assert.fail("StartLocalizer failed to copy token file " + e);
|
Assert.fail("StartLocalizer failed to copy token file: "
|
||||||
|
+ StringUtils.stringifyException(e));
|
||||||
} finally {
|
} finally {
|
||||||
mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
|
mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
|
||||||
.setUser(appSubmitter)
|
.setUser(appSubmitter)
|
||||||
|
@ -460,7 +476,12 @@ public class TestDefaultContainerExecutor {
|
||||||
.build());
|
.build());
|
||||||
deleteTmpFiles();
|
deleteTmpFiles();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Verify that the calls happen the expected number of times
|
||||||
|
verify(mockUtil, times(1)).copy(any(Path.class), any(Path.class));
|
||||||
|
verify(mockLfs, times(2)).getFsStatus(any(Path.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
// @Test
|
// @Test
|
||||||
// public void testInit() throws IOException, InterruptedException {
|
// public void testInit() throws IOException, InterruptedException {
|
||||||
// Configuration conf = new Configuration();
|
// Configuration conf = new Configuration();
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
|
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
|
||||||
|
@ -30,11 +29,7 @@ public class MockLocalizerHeartbeatResponse
|
||||||
LocalizerAction action;
|
LocalizerAction action;
|
||||||
List<ResourceLocalizationSpec> resourceSpecs;
|
List<ResourceLocalizationSpec> resourceSpecs;
|
||||||
|
|
||||||
MockLocalizerHeartbeatResponse() {
|
public MockLocalizerHeartbeatResponse(
|
||||||
resourceSpecs = new ArrayList<ResourceLocalizationSpec>();
|
|
||||||
}
|
|
||||||
|
|
||||||
MockLocalizerHeartbeatResponse(
|
|
||||||
LocalizerAction action, List<ResourceLocalizationSpec> resources) {
|
LocalizerAction action, List<ResourceLocalizationSpec> resources) {
|
||||||
this.action = action;
|
this.action = action;
|
||||||
this.resourceSpecs = resources;
|
this.resourceSpecs = resources;
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.anyBoolean;
|
import static org.mockito.Matchers.anyBoolean;
|
||||||
|
@ -65,6 +64,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||||
import org.apache.hadoop.yarn.api.records.URL;
|
import org.apache.hadoop.yarn.api.records.URL;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
|
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
|
||||||
|
@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResour
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
|
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
|
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.ArgumentMatcher;
|
import org.mockito.ArgumentMatcher;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
@ -98,7 +99,7 @@ public class TestContainerLocalizer {
|
||||||
private LocalizationProtocol nmProxy;
|
private LocalizationProtocol nmProxy;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testContainerLocalizerMain() throws Exception {
|
public void testMain() throws Exception {
|
||||||
FileContext fs = FileContext.getLocalFSFileContext();
|
FileContext fs = FileContext.getLocalFSFileContext();
|
||||||
spylfs = spy(fs.getDefaultFileSystem());
|
spylfs = spy(fs.getDefaultFileSystem());
|
||||||
ContainerLocalizer localizer =
|
ContainerLocalizer localizer =
|
||||||
|
@ -167,7 +168,7 @@ public class TestContainerLocalizer {
|
||||||
isA(UserGroupInformation.class));
|
isA(UserGroupInformation.class));
|
||||||
|
|
||||||
// run localization
|
// run localization
|
||||||
assertEquals(0, localizer.runLocalization(nmAddr));
|
localizer.runLocalization(nmAddr);
|
||||||
for (Path p : localDirs) {
|
for (Path p : localDirs) {
|
||||||
Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), appUser);
|
Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), appUser);
|
||||||
Path privcache = new Path(base, ContainerLocalizer.FILECACHE);
|
Path privcache = new Path(base, ContainerLocalizer.FILECACHE);
|
||||||
|
@ -199,6 +200,26 @@ public class TestContainerLocalizer {
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 15000)
|
||||||
|
public void testMainFailure() throws Exception {
|
||||||
|
|
||||||
|
FileContext fs = FileContext.getLocalFSFileContext();
|
||||||
|
spylfs = spy(fs.getDefaultFileSystem());
|
||||||
|
ContainerLocalizer localizer = setupContainerLocalizerForTest();
|
||||||
|
|
||||||
|
// Assume the NM heartbeat fails say because of absent tokens.
|
||||||
|
when(nmProxy.heartbeat(isA(LocalizerStatus.class))).thenThrow(
|
||||||
|
new YarnException("Sigh, no token!"));
|
||||||
|
|
||||||
|
// run localization, it should fail
|
||||||
|
try {
|
||||||
|
localizer.runLocalization(nmAddr);
|
||||||
|
Assert.fail("Localization succeeded unexpectedly!");
|
||||||
|
} catch (IOException e) {
|
||||||
|
Assert.assertTrue(e.getMessage().contains("Sigh, no token!"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public void testLocalizerTokenIsGettingRemoved() throws Exception {
|
public void testLocalizerTokenIsGettingRemoved() throws Exception {
|
||||||
|
@ -214,18 +235,22 @@ public class TestContainerLocalizer {
|
||||||
@Test
|
@Test
|
||||||
@SuppressWarnings("unchecked") // mocked generics
|
@SuppressWarnings("unchecked") // mocked generics
|
||||||
public void testContainerLocalizerClosesFilesystems() throws Exception {
|
public void testContainerLocalizerClosesFilesystems() throws Exception {
|
||||||
|
|
||||||
// verify filesystems are closed when localizer doesn't fail
|
// verify filesystems are closed when localizer doesn't fail
|
||||||
FileContext fs = FileContext.getLocalFSFileContext();
|
FileContext fs = FileContext.getLocalFSFileContext();
|
||||||
spylfs = spy(fs.getDefaultFileSystem());
|
spylfs = spy(fs.getDefaultFileSystem());
|
||||||
|
|
||||||
ContainerLocalizer localizer = setupContainerLocalizerForTest();
|
ContainerLocalizer localizer = setupContainerLocalizerForTest();
|
||||||
doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class),
|
doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class),
|
||||||
any(CompletionService.class), any(UserGroupInformation.class));
|
any(CompletionService.class), any(UserGroupInformation.class));
|
||||||
verify(localizer, never()).closeFileSystems(
|
verify(localizer, never()).closeFileSystems(
|
||||||
any(UserGroupInformation.class));
|
any(UserGroupInformation.class));
|
||||||
|
|
||||||
localizer.runLocalization(nmAddr);
|
localizer.runLocalization(nmAddr);
|
||||||
verify(localizer).closeFileSystems(any(UserGroupInformation.class));
|
verify(localizer).closeFileSystems(any(UserGroupInformation.class));
|
||||||
|
|
||||||
spylfs = spy(fs.getDefaultFileSystem());
|
spylfs = spy(fs.getDefaultFileSystem());
|
||||||
|
|
||||||
// verify filesystems are closed when localizer fails
|
// verify filesystems are closed when localizer fails
|
||||||
localizer = setupContainerLocalizerForTest();
|
localizer = setupContainerLocalizerForTest();
|
||||||
doThrow(new YarnRuntimeException("Forced Failure")).when(localizer).localizeFiles(
|
doThrow(new YarnRuntimeException("Forced Failure")).when(localizer).localizeFiles(
|
||||||
|
@ -233,9 +258,13 @@ public class TestContainerLocalizer {
|
||||||
any(UserGroupInformation.class));
|
any(UserGroupInformation.class));
|
||||||
verify(localizer, never()).closeFileSystems(
|
verify(localizer, never()).closeFileSystems(
|
||||||
any(UserGroupInformation.class));
|
any(UserGroupInformation.class));
|
||||||
|
try {
|
||||||
localizer.runLocalization(nmAddr);
|
localizer.runLocalization(nmAddr);
|
||||||
|
Assert.fail("Localization succeeded unexpectedly!");
|
||||||
|
} catch (IOException e) {
|
||||||
verify(localizer).closeFileSystems(any(UserGroupInformation.class));
|
verify(localizer).closeFileSystems(any(UserGroupInformation.class));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked") // mocked generics
|
@SuppressWarnings("unchecked") // mocked generics
|
||||||
private ContainerLocalizer setupContainerLocalizerForTest()
|
private ContainerLocalizer setupContainerLocalizerForTest()
|
||||||
|
|
Loading…
Reference in New Issue