Revert "YARN-5641. Localizer leaves behind tarballs after container is complete. Contributed by Eric Badger"
This reverts commit 45407acaed.
This commit is contained in:
parent
b799ea7641
commit
b806201f0a
|
@ -27,9 +27,7 @@ import java.io.InterruptedIOException;
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.Timer;
|
import java.util.Timer;
|
||||||
import java.util.TimerTask;
|
import java.util.TimerTask;
|
||||||
import java.util.WeakHashMap;
|
import java.util.WeakHashMap;
|
||||||
|
@ -52,8 +50,8 @@ import org.slf4j.LoggerFactory;
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public abstract class Shell {
|
public abstract class Shell {
|
||||||
private static final Map<Shell, Object> CHILD_SHELLS =
|
private static final Map <Process, Object> CHILD_PROCESSES =
|
||||||
Collections.synchronizedMap(new WeakHashMap<Shell, Object>());
|
Collections.synchronizedMap(new WeakHashMap<Process, Object>());
|
||||||
public static final Logger LOG = LoggerFactory.getLogger(Shell.class);
|
public static final Logger LOG = LoggerFactory.getLogger(Shell.class);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -822,7 +820,6 @@ public abstract class Shell {
|
||||||
private File dir;
|
private File dir;
|
||||||
private Process process; // sub process used to execute the command
|
private Process process; // sub process used to execute the command
|
||||||
private int exitCode;
|
private int exitCode;
|
||||||
private Thread waitingThread;
|
|
||||||
|
|
||||||
/** Flag to indicate whether or not the script has finished executing. */
|
/** Flag to indicate whether or not the script has finished executing. */
|
||||||
private final AtomicBoolean completed = new AtomicBoolean(false);
|
private final AtomicBoolean completed = new AtomicBoolean(false);
|
||||||
|
@ -927,9 +924,7 @@ public abstract class Shell {
|
||||||
} else {
|
} else {
|
||||||
process = builder.start();
|
process = builder.start();
|
||||||
}
|
}
|
||||||
|
CHILD_PROCESSES.put(process, null);
|
||||||
waitingThread = Thread.currentThread();
|
|
||||||
CHILD_SHELLS.put(this, null);
|
|
||||||
|
|
||||||
if (timeOutInterval > 0) {
|
if (timeOutInterval > 0) {
|
||||||
timeOutTimer = new Timer("Shell command timeout");
|
timeOutTimer = new Timer("Shell command timeout");
|
||||||
|
@ -1026,8 +1021,7 @@ public abstract class Shell {
|
||||||
LOG.warn("Error while closing the error stream", ioe);
|
LOG.warn("Error while closing the error stream", ioe);
|
||||||
}
|
}
|
||||||
process.destroy();
|
process.destroy();
|
||||||
waitingThread = null;
|
CHILD_PROCESSES.remove(process);
|
||||||
CHILD_SHELLS.remove(this);
|
|
||||||
lastTime = Time.monotonicNow();
|
lastTime = Time.monotonicNow();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1075,15 +1069,6 @@ public abstract class Shell {
|
||||||
return exitCode;
|
return exitCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** get the thread that is waiting on this instance of <code>Shell</code>.
|
|
||||||
* @return the thread that ran runCommand() that spawned this shell
|
|
||||||
* or null if no thread is waiting for this shell to complete
|
|
||||||
*/
|
|
||||||
public Thread getWaitingThread() {
|
|
||||||
return waitingThread;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is an IOException with exit code added.
|
* This is an IOException with exit code added.
|
||||||
*/
|
*/
|
||||||
|
@ -1337,27 +1322,20 @@ public abstract class Shell {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Static method to destroy all running <code>Shell</code> processes.
|
* Static method to destroy all running <code>Shell</code> processes
|
||||||
* Iterates through a map of all currently running <code>Shell</code>
|
* Iterates through a list of all currently running <code>Shell</code>
|
||||||
* processes and destroys them one by one. This method is thread safe
|
* processes and destroys them one by one. This method is thread safe and
|
||||||
|
* is intended to be used in a shutdown hook.
|
||||||
*/
|
*/
|
||||||
public static void destroyAllShellProcesses() {
|
public static void destroyAllProcesses() {
|
||||||
synchronized (CHILD_SHELLS) {
|
synchronized (CHILD_PROCESSES) {
|
||||||
for (Shell shell : CHILD_SHELLS.keySet()) {
|
for (Process key : CHILD_PROCESSES.keySet()) {
|
||||||
if (shell.getProcess() != null) {
|
Process process = key;
|
||||||
shell.getProcess().destroy();
|
if (key != null) {
|
||||||
|
process.destroy();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
CHILD_SHELLS.clear();
|
CHILD_PROCESSES.clear();
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Static method to return a Set of all <code>Shell</code> objects.
|
|
||||||
*/
|
|
||||||
public static Set<Shell> getAllShells() {
|
|
||||||
synchronized (CHILD_SHELLS) {
|
|
||||||
return new HashSet<>(CHILD_SHELLS.keySet());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -479,7 +479,7 @@ public class TestShell extends Assert {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout=120000)
|
@Test(timeout=120000)
|
||||||
public void testDestroyAllShellProcesses() throws Throwable {
|
public void testShellKillAllProcesses() throws Throwable {
|
||||||
Assume.assumeFalse(WINDOWS);
|
Assume.assumeFalse(WINDOWS);
|
||||||
StringBuffer sleepCommand = new StringBuffer();
|
StringBuffer sleepCommand = new StringBuffer();
|
||||||
sleepCommand.append("sleep 200");
|
sleepCommand.append("sleep 200");
|
||||||
|
@ -524,7 +524,7 @@ public class TestShell extends Assert {
|
||||||
}
|
}
|
||||||
}, 10, 10000);
|
}, 10, 10000);
|
||||||
|
|
||||||
Shell.destroyAllShellProcesses();
|
Shell.destroyAllProcesses();
|
||||||
shexc1.getProcess().waitFor();
|
shexc1.getProcess().waitFor();
|
||||||
shexc2.getProcess().waitFor();
|
shexc2.getProcess().waitFor();
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,13 +24,10 @@ import java.net.InetSocketAddress;
|
||||||
import java.security.PrivilegedAction;
|
import java.security.PrivilegedAction;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.CancellationException;
|
import java.util.concurrent.CancellationException;
|
||||||
import java.util.concurrent.CompletionService;
|
import java.util.concurrent.CompletionService;
|
||||||
|
@ -56,7 +53,6 @@ import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.util.DiskValidator;
|
import org.apache.hadoop.util.DiskValidator;
|
||||||
import org.apache.hadoop.util.DiskValidatorFactory;
|
import org.apache.hadoop.util.DiskValidatorFactory;
|
||||||
import org.apache.hadoop.util.Shell;
|
|
||||||
import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
||||||
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
|
@ -79,8 +75,6 @@ import org.apache.hadoop.yarn.util.FSDownload;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
import static org.apache.hadoop.util.Shell.getAllShells;
|
|
||||||
|
|
||||||
public class ContainerLocalizer {
|
public class ContainerLocalizer {
|
||||||
|
|
||||||
static final Log LOG = LogFactory.getLog(ContainerLocalizer.class);
|
static final Log LOG = LogFactory.getLog(ContainerLocalizer.class);
|
||||||
|
@ -107,9 +101,6 @@ public class ContainerLocalizer {
|
||||||
private final String appCacheDirContextName;
|
private final String appCacheDirContextName;
|
||||||
private final DiskValidator diskValidator;
|
private final DiskValidator diskValidator;
|
||||||
|
|
||||||
private Set<Thread> localizingThreads =
|
|
||||||
Collections.synchronizedSet(new HashSet<Thread>());
|
|
||||||
|
|
||||||
public ContainerLocalizer(FileContext lfs, String user, String appId,
|
public ContainerLocalizer(FileContext lfs, String user, String appId,
|
||||||
String localizerId, List<Path> localDirs,
|
String localizerId, List<Path> localDirs,
|
||||||
RecordFactory recordFactory) throws IOException {
|
RecordFactory recordFactory) throws IOException {
|
||||||
|
@ -187,14 +178,13 @@ public class ContainerLocalizer {
|
||||||
exec = createDownloadThreadPool();
|
exec = createDownloadThreadPool();
|
||||||
CompletionService<Path> ecs = createCompletionService(exec);
|
CompletionService<Path> ecs = createCompletionService(exec);
|
||||||
localizeFiles(nodeManager, ecs, ugi);
|
localizeFiles(nodeManager, ecs, ugi);
|
||||||
|
return;
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
throw new IOException(e);
|
throw new IOException(e);
|
||||||
} finally {
|
} finally {
|
||||||
try {
|
try {
|
||||||
if (exec != null) {
|
if (exec != null) {
|
||||||
exec.shutdown();
|
exec.shutdownNow();
|
||||||
destroyShellProcesses(getAllShells());
|
|
||||||
exec.awaitTermination(10, TimeUnit.SECONDS);
|
|
||||||
}
|
}
|
||||||
LocalDirAllocator.removeContext(appCacheDirContextName);
|
LocalDirAllocator.removeContext(appCacheDirContextName);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -212,34 +202,10 @@ public class ContainerLocalizer {
|
||||||
return new ExecutorCompletionService<Path>(exec);
|
return new ExecutorCompletionService<Path>(exec);
|
||||||
}
|
}
|
||||||
|
|
||||||
class FSDownloadWrapper extends FSDownload {
|
|
||||||
|
|
||||||
FSDownloadWrapper(FileContext files, UserGroupInformation ugi,
|
|
||||||
Configuration conf, Path destDirPath, LocalResource resource) {
|
|
||||||
super(files, ugi, conf, destDirPath, resource);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Path call() throws Exception {
|
|
||||||
Thread currentThread = Thread.currentThread();
|
|
||||||
localizingThreads.add(currentThread);
|
|
||||||
try {
|
|
||||||
return doDownloadCall();
|
|
||||||
} finally {
|
|
||||||
localizingThreads.remove(currentThread);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Path doDownloadCall() throws Exception {
|
|
||||||
return super.call();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
Callable<Path> download(Path path, LocalResource rsrc,
|
Callable<Path> download(Path path, LocalResource rsrc,
|
||||||
UserGroupInformation ugi) throws IOException {
|
UserGroupInformation ugi) throws IOException {
|
||||||
diskValidator.checkStatus(new File(path.toUri().getRawPath()));
|
diskValidator.checkStatus(new File(path.toUri().getRawPath()));
|
||||||
return new FSDownloadWrapper(lfs, ugi, conf, path, rsrc);
|
return new FSDownload(lfs, ugi, conf, path, rsrc);
|
||||||
}
|
}
|
||||||
|
|
||||||
static long getEstimatedSize(LocalResource rsrc) {
|
static long getEstimatedSize(LocalResource rsrc) {
|
||||||
|
@ -397,7 +363,6 @@ public class ContainerLocalizer {
|
||||||
|
|
||||||
public static void main(String[] argv) throws Throwable {
|
public static void main(String[] argv) throws Throwable {
|
||||||
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
|
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
|
||||||
int nRet = 0;
|
|
||||||
// usage: $0 user appId locId host port app_log_dir user_dir [user_dir]*
|
// usage: $0 user appId locId host port app_log_dir user_dir [user_dir]*
|
||||||
// let $x = $x/usercache for $local.dir
|
// let $x = $x/usercache for $local.dir
|
||||||
// MKDIR $x/$user/appcache/$appid
|
// MKDIR $x/$user/appcache/$appid
|
||||||
|
@ -434,9 +399,7 @@ public class ContainerLocalizer {
|
||||||
// space in both DefaultCE and LCE cases
|
// space in both DefaultCE and LCE cases
|
||||||
e.printStackTrace(System.out);
|
e.printStackTrace(System.out);
|
||||||
LOG.error("Exception in main:", e);
|
LOG.error("Exception in main:", e);
|
||||||
nRet = -1;
|
System.exit(-1);
|
||||||
} finally {
|
|
||||||
System.exit(nRet);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -473,12 +436,4 @@ public class ContainerLocalizer {
|
||||||
lfs.setPermission(dirPath, perms);
|
lfs.setPermission(dirPath, perms);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void destroyShellProcesses(Set<Shell> shells) {
|
|
||||||
for (Shell shell : shells) {
|
|
||||||
if(localizingThreads.contains(shell.getWaitingThread())) {
|
|
||||||
shell.getProcess().destroy();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,8 +17,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
|
||||||
|
|
||||||
import static junit.framework.TestCase.assertFalse;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.anyBoolean;
|
import static org.mockito.Matchers.anyBoolean;
|
||||||
|
@ -27,7 +25,6 @@ import static org.mockito.Matchers.argThat;
|
||||||
import static org.mockito.Matchers.eq;
|
import static org.mockito.Matchers.eq;
|
||||||
import static org.mockito.Matchers.isA;
|
import static org.mockito.Matchers.isA;
|
||||||
import static org.mockito.Matchers.same;
|
import static org.mockito.Matchers.same;
|
||||||
import static org.mockito.Mockito.doAnswer;
|
|
||||||
import static org.mockito.Mockito.doNothing;
|
import static org.mockito.Mockito.doNothing;
|
||||||
import static org.mockito.Mockito.doReturn;
|
import static org.mockito.Mockito.doReturn;
|
||||||
import static org.mockito.Mockito.doThrow;
|
import static org.mockito.Mockito.doThrow;
|
||||||
|
@ -48,9 +45,7 @@ import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.CompletionService;
|
import java.util.concurrent.CompletionService;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import com.google.common.base.Supplier;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -65,9 +60,6 @@ import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
|
||||||
import org.apache.hadoop.util.Shell;
|
|
||||||
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
|
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||||
|
@ -84,7 +76,6 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.ArgumentMatcher;
|
import org.mockito.ArgumentMatcher;
|
||||||
import org.mockito.Mockito;
|
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
@ -101,18 +92,18 @@ public class TestContainerLocalizer {
|
||||||
static final InetSocketAddress nmAddr =
|
static final InetSocketAddress nmAddr =
|
||||||
new InetSocketAddress("foobar", 8040);
|
new InetSocketAddress("foobar", 8040);
|
||||||
|
|
||||||
|
private AbstractFileSystem spylfs;
|
||||||
|
private Random random;
|
||||||
|
private List<Path> localDirs;
|
||||||
|
private Path tokenPath;
|
||||||
|
private LocalizationProtocol nmProxy;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMain() throws Exception {
|
public void testMain() throws Exception {
|
||||||
ContainerLocalizerWrapper wrapper = new ContainerLocalizerWrapper();
|
FileContext fs = FileContext.getLocalFSFileContext();
|
||||||
|
spylfs = spy(fs.getDefaultFileSystem());
|
||||||
ContainerLocalizer localizer =
|
ContainerLocalizer localizer =
|
||||||
wrapper.setupContainerLocalizerForTest();
|
setupContainerLocalizerForTest();
|
||||||
Random random = wrapper.random;
|
|
||||||
List<Path> localDirs = wrapper.localDirs;
|
|
||||||
Path tokenPath = wrapper.tokenPath;
|
|
||||||
LocalizationProtocol nmProxy = wrapper.nmProxy;
|
|
||||||
AbstractFileSystem spylfs = wrapper.spylfs;
|
|
||||||
mockOutDownloads(localizer);
|
|
||||||
|
|
||||||
// verify created cache
|
// verify created cache
|
||||||
List<Path> privCacheList = new ArrayList<Path>();
|
List<Path> privCacheList = new ArrayList<Path>();
|
||||||
|
@ -211,10 +202,10 @@ public class TestContainerLocalizer {
|
||||||
|
|
||||||
@Test(timeout = 15000)
|
@Test(timeout = 15000)
|
||||||
public void testMainFailure() throws Exception {
|
public void testMainFailure() throws Exception {
|
||||||
ContainerLocalizerWrapper wrapper = new ContainerLocalizerWrapper();
|
|
||||||
ContainerLocalizer localizer = wrapper.setupContainerLocalizerForTest();
|
FileContext fs = FileContext.getLocalFSFileContext();
|
||||||
LocalizationProtocol nmProxy = wrapper.nmProxy;
|
spylfs = spy(fs.getDefaultFileSystem());
|
||||||
mockOutDownloads(localizer);
|
ContainerLocalizer localizer = setupContainerLocalizerForTest();
|
||||||
|
|
||||||
// Assume the NM heartbeat fails say because of absent tokens.
|
// Assume the NM heartbeat fails say because of absent tokens.
|
||||||
when(nmProxy.heartbeat(isA(LocalizerStatus.class))).thenThrow(
|
when(nmProxy.heartbeat(isA(LocalizerStatus.class))).thenThrow(
|
||||||
|
@ -232,11 +223,9 @@ public class TestContainerLocalizer {
|
||||||
@Test
|
@Test
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public void testLocalizerTokenIsGettingRemoved() throws Exception {
|
public void testLocalizerTokenIsGettingRemoved() throws Exception {
|
||||||
ContainerLocalizerWrapper wrapper = new ContainerLocalizerWrapper();
|
FileContext fs = FileContext.getLocalFSFileContext();
|
||||||
ContainerLocalizer localizer = wrapper.setupContainerLocalizerForTest();
|
spylfs = spy(fs.getDefaultFileSystem());
|
||||||
Path tokenPath = wrapper.tokenPath;
|
ContainerLocalizer localizer = setupContainerLocalizerForTest();
|
||||||
AbstractFileSystem spylfs = wrapper.spylfs;
|
|
||||||
mockOutDownloads(localizer);
|
|
||||||
doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class),
|
doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class),
|
||||||
any(CompletionService.class), any(UserGroupInformation.class));
|
any(CompletionService.class), any(UserGroupInformation.class));
|
||||||
localizer.runLocalization(nmAddr);
|
localizer.runLocalization(nmAddr);
|
||||||
|
@ -248,10 +237,10 @@ public class TestContainerLocalizer {
|
||||||
public void testContainerLocalizerClosesFilesystems() throws Exception {
|
public void testContainerLocalizerClosesFilesystems() throws Exception {
|
||||||
|
|
||||||
// verify filesystems are closed when localizer doesn't fail
|
// verify filesystems are closed when localizer doesn't fail
|
||||||
ContainerLocalizerWrapper wrapper = new ContainerLocalizerWrapper();
|
FileContext fs = FileContext.getLocalFSFileContext();
|
||||||
|
spylfs = spy(fs.getDefaultFileSystem());
|
||||||
|
|
||||||
ContainerLocalizer localizer = wrapper.setupContainerLocalizerForTest();
|
ContainerLocalizer localizer = setupContainerLocalizerForTest();
|
||||||
mockOutDownloads(localizer);
|
|
||||||
doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class),
|
doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class),
|
||||||
any(CompletionService.class), any(UserGroupInformation.class));
|
any(CompletionService.class), any(UserGroupInformation.class));
|
||||||
verify(localizer, never()).closeFileSystems(
|
verify(localizer, never()).closeFileSystems(
|
||||||
|
@ -260,8 +249,10 @@ public class TestContainerLocalizer {
|
||||||
localizer.runLocalization(nmAddr);
|
localizer.runLocalization(nmAddr);
|
||||||
verify(localizer).closeFileSystems(any(UserGroupInformation.class));
|
verify(localizer).closeFileSystems(any(UserGroupInformation.class));
|
||||||
|
|
||||||
|
spylfs = spy(fs.getDefaultFileSystem());
|
||||||
|
|
||||||
// verify filesystems are closed when localizer fails
|
// verify filesystems are closed when localizer fails
|
||||||
localizer = wrapper.setupContainerLocalizerForTest();
|
localizer = setupContainerLocalizerForTest();
|
||||||
doThrow(new YarnRuntimeException("Forced Failure")).when(localizer).localizeFiles(
|
doThrow(new YarnRuntimeException("Forced Failure")).when(localizer).localizeFiles(
|
||||||
any(LocalizationProtocol.class), any(CompletionService.class),
|
any(LocalizationProtocol.class), any(CompletionService.class),
|
||||||
any(UserGroupInformation.class));
|
any(UserGroupInformation.class));
|
||||||
|
@ -275,102 +266,41 @@ public class TestContainerLocalizer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@SuppressWarnings("unchecked") // mocked generics
|
||||||
public void testMultipleLocalizers() throws Exception {
|
private ContainerLocalizer setupContainerLocalizerForTest()
|
||||||
FakeContainerLocalizerWrapper testA = new FakeContainerLocalizerWrapper();
|
throws Exception {
|
||||||
FakeContainerLocalizerWrapper testB = new FakeContainerLocalizerWrapper();
|
// don't actually create dirs
|
||||||
|
doNothing().when(spylfs).mkdir(
|
||||||
|
isA(Path.class), isA(FsPermission.class), anyBoolean());
|
||||||
|
|
||||||
final FakeContainerLocalizer localizerA = testA.init();
|
Configuration conf = new Configuration();
|
||||||
final FakeContainerLocalizer localizerB = testB.init();
|
FileContext lfs = FileContext.getFileContext(spylfs, conf);
|
||||||
|
localDirs = new ArrayList<Path>();
|
||||||
// run localization
|
for (int i = 0; i < 4; ++i) {
|
||||||
Thread threadA = new Thread() {
|
localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
localizerA.runLocalization(nmAddr);
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.warn(e);
|
|
||||||
}
|
}
|
||||||
}
|
RecordFactory mockRF = getMockLocalizerRecordFactory();
|
||||||
};
|
ContainerLocalizer concreteLoc = new ContainerLocalizer(lfs, appUser,
|
||||||
Thread threadB = new Thread() {
|
appId, containerId, localDirs, mockRF);
|
||||||
@Override
|
ContainerLocalizer localizer = spy(concreteLoc);
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
localizerB.runLocalization(nmAddr);
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.warn(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
ShellCommandExecutor shexcA = null;
|
|
||||||
ShellCommandExecutor shexcB = null;
|
|
||||||
try {
|
|
||||||
threadA.start();
|
|
||||||
threadB.start();
|
|
||||||
|
|
||||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
// return credential stream instead of opening local file
|
||||||
@Override
|
random = new Random();
|
||||||
public Boolean get() {
|
long seed = random.nextLong();
|
||||||
FakeContainerLocalizer.FakeLongDownload downloader =
|
System.out.println("SEED: " + seed);
|
||||||
localizerA.getDownloader();
|
random.setSeed(seed);
|
||||||
return downloader != null && downloader.getShexc() != null &&
|
DataInputBuffer appTokens = createFakeCredentials(random, 10);
|
||||||
downloader.getShexc().getProcess() != null;
|
tokenPath =
|
||||||
}
|
lfs.makeQualified(new Path(
|
||||||
}, 10, 30000);
|
String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
|
||||||
|
containerId)));
|
||||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
doReturn(new FSDataInputStream(new FakeFSDataInputStream(appTokens))
|
||||||
@Override
|
).when(spylfs).open(tokenPath);
|
||||||
public Boolean get() {
|
nmProxy = mock(LocalizationProtocol.class);
|
||||||
FakeContainerLocalizer.FakeLongDownload downloader =
|
doReturn(nmProxy).when(localizer).getProxy(nmAddr);
|
||||||
localizerB.getDownloader();
|
doNothing().when(localizer).sleep(anyInt());
|
||||||
return downloader != null && downloader.getShexc() != null &&
|
|
||||||
downloader.getShexc().getProcess() != null;
|
|
||||||
}
|
|
||||||
}, 10, 30000);
|
|
||||||
|
|
||||||
shexcA = localizerA.getDownloader().getShexc();
|
|
||||||
shexcB = localizerB.getDownloader().getShexc();
|
|
||||||
|
|
||||||
assertTrue("Localizer A process not running, but should be",
|
|
||||||
shexcA.getProcess().isAlive());
|
|
||||||
assertTrue("Localizer B process not running, but should be",
|
|
||||||
shexcB.getProcess().isAlive());
|
|
||||||
|
|
||||||
// Stop heartbeat from giving anymore resources to download
|
|
||||||
testA.heartbeatResponse++;
|
|
||||||
testB.heartbeatResponse++;
|
|
||||||
|
|
||||||
// Send DIE to localizerA. This should kill its subprocesses
|
|
||||||
testA.heartbeatResponse++;
|
|
||||||
|
|
||||||
threadA.join();
|
|
||||||
shexcA.getProcess().waitFor(10000, TimeUnit.MILLISECONDS);
|
|
||||||
|
|
||||||
assertFalse("Localizer A process is still running, but shouldn't be",
|
|
||||||
shexcA.getProcess().isAlive());
|
|
||||||
assertTrue("Localizer B process not running, but should be",
|
|
||||||
shexcB.getProcess().isAlive());
|
|
||||||
|
|
||||||
} finally {
|
|
||||||
// Make sure everything gets cleaned up
|
|
||||||
// Process A should already be dead
|
|
||||||
shexcA.getProcess().destroy();
|
|
||||||
shexcB.getProcess().destroy();
|
|
||||||
shexcA.getProcess().waitFor(10000, TimeUnit.MILLISECONDS);
|
|
||||||
shexcB.getProcess().waitFor(10000, TimeUnit.MILLISECONDS);
|
|
||||||
|
|
||||||
threadA.join();
|
|
||||||
// Send DIE to localizer B
|
|
||||||
testB.heartbeatResponse++;
|
|
||||||
threadB.join();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private void mockOutDownloads(ContainerLocalizer localizer) {
|
|
||||||
// return result instantly for deterministic test
|
// return result instantly for deterministic test
|
||||||
ExecutorService syncExec = mock(ExecutorService.class);
|
ExecutorService syncExec = mock(ExecutorService.class);
|
||||||
CompletionService<Path> cs = mock(CompletionService.class);
|
CompletionService<Path> cs = mock(CompletionService.class);
|
||||||
|
@ -388,6 +318,8 @@ public class TestContainerLocalizer {
|
||||||
});
|
});
|
||||||
doReturn(syncExec).when(localizer).createDownloadThreadPool();
|
doReturn(syncExec).when(localizer).createDownloadThreadPool();
|
||||||
doReturn(cs).when(localizer).createCompletionService(syncExec);
|
doReturn(cs).when(localizer).createCompletionService(syncExec);
|
||||||
|
|
||||||
|
return localizer;
|
||||||
}
|
}
|
||||||
|
|
||||||
static class HBMatches extends ArgumentMatcher<LocalizerStatus> {
|
static class HBMatches extends ArgumentMatcher<LocalizerStatus> {
|
||||||
|
@ -431,141 +363,6 @@ public class TestContainerLocalizer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class FakeContainerLocalizer extends ContainerLocalizer {
|
|
||||||
private FakeLongDownload downloader;
|
|
||||||
|
|
||||||
FakeContainerLocalizer(FileContext lfs, String user, String appId,
|
|
||||||
String localizerId, List<Path> localDirs,
|
|
||||||
RecordFactory recordFactory) throws IOException {
|
|
||||||
super(lfs, user, appId, localizerId, localDirs, recordFactory);
|
|
||||||
}
|
|
||||||
|
|
||||||
FakeLongDownload getDownloader() {
|
|
||||||
return downloader;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
Callable<Path> download(Path path, LocalResource rsrc,
|
|
||||||
UserGroupInformation ugi) throws IOException {
|
|
||||||
downloader = new FakeLongDownload(Mockito.mock(FileContext.class), ugi,
|
|
||||||
new Configuration(), path, rsrc);
|
|
||||||
return downloader;
|
|
||||||
}
|
|
||||||
|
|
||||||
class FakeLongDownload extends ContainerLocalizer.FSDownloadWrapper {
|
|
||||||
private final Path localPath;
|
|
||||||
private Shell.ShellCommandExecutor shexc;
|
|
||||||
FakeLongDownload(FileContext files, UserGroupInformation ugi,
|
|
||||||
Configuration conf, Path destDirPath, LocalResource resource) {
|
|
||||||
super(files, ugi, conf, destDirPath, resource);
|
|
||||||
this.localPath = new Path("file:///localcache");
|
|
||||||
}
|
|
||||||
|
|
||||||
Shell.ShellCommandExecutor getShexc() {
|
|
||||||
return shexc;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Path doDownloadCall() throws IOException {
|
|
||||||
String sleepCommand = "sleep 30";
|
|
||||||
String[] shellCmd = {"bash", "-c", sleepCommand};
|
|
||||||
shexc = new Shell.ShellCommandExecutor(shellCmd);
|
|
||||||
shexc.execute();
|
|
||||||
|
|
||||||
return localPath;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class ContainerLocalizerWrapper {
|
|
||||||
AbstractFileSystem spylfs;
|
|
||||||
Random random;
|
|
||||||
List<Path> localDirs;
|
|
||||||
Path tokenPath;
|
|
||||||
LocalizationProtocol nmProxy;
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked") // mocked generics
|
|
||||||
FakeContainerLocalizer setupContainerLocalizerForTest()
|
|
||||||
throws Exception {
|
|
||||||
|
|
||||||
FileContext fs = FileContext.getLocalFSFileContext();
|
|
||||||
spylfs = spy(fs.getDefaultFileSystem());
|
|
||||||
// don't actually create dirs
|
|
||||||
doNothing().when(spylfs).mkdir(
|
|
||||||
isA(Path.class), isA(FsPermission.class), anyBoolean());
|
|
||||||
|
|
||||||
Configuration conf = new Configuration();
|
|
||||||
FileContext lfs = FileContext.getFileContext(spylfs, conf);
|
|
||||||
localDirs = new ArrayList<Path>();
|
|
||||||
for (int i = 0; i < 4; ++i) {
|
|
||||||
localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
|
|
||||||
}
|
|
||||||
RecordFactory mockRF = getMockLocalizerRecordFactory();
|
|
||||||
FakeContainerLocalizer concreteLoc = new FakeContainerLocalizer(lfs,
|
|
||||||
appUser, appId, containerId, localDirs, mockRF);
|
|
||||||
FakeContainerLocalizer localizer = spy(concreteLoc);
|
|
||||||
|
|
||||||
// return credential stream instead of opening local file
|
|
||||||
random = new Random();
|
|
||||||
long seed = random.nextLong();
|
|
||||||
System.out.println("SEED: " + seed);
|
|
||||||
random.setSeed(seed);
|
|
||||||
DataInputBuffer appTokens = createFakeCredentials(random, 10);
|
|
||||||
tokenPath =
|
|
||||||
lfs.makeQualified(new Path(
|
|
||||||
String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
|
|
||||||
containerId)));
|
|
||||||
doReturn(new FSDataInputStream(new FakeFSDataInputStream(appTokens))
|
|
||||||
).when(spylfs).open(tokenPath);
|
|
||||||
nmProxy = mock(LocalizationProtocol.class);
|
|
||||||
doReturn(nmProxy).when(localizer).getProxy(nmAddr);
|
|
||||||
doNothing().when(localizer).sleep(anyInt());
|
|
||||||
|
|
||||||
return localizer;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
class FakeContainerLocalizerWrapper extends ContainerLocalizerWrapper{
|
|
||||||
private int heartbeatResponse = 0;
|
|
||||||
public FakeContainerLocalizer init() throws Exception {
|
|
||||||
FileContext fs = FileContext.getLocalFSFileContext();
|
|
||||||
FakeContainerLocalizer localizer = setupContainerLocalizerForTest();
|
|
||||||
|
|
||||||
// verify created cache
|
|
||||||
List<Path> privCacheList = new ArrayList<Path>();
|
|
||||||
for (Path p : localDirs) {
|
|
||||||
Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE),
|
|
||||||
appUser);
|
|
||||||
Path privcache = new Path(base, ContainerLocalizer.FILECACHE);
|
|
||||||
privCacheList.add(privcache);
|
|
||||||
}
|
|
||||||
|
|
||||||
final ResourceLocalizationSpec rsrc = getMockRsrc(random,
|
|
||||||
LocalResourceVisibility.PRIVATE, privCacheList.get(0));
|
|
||||||
|
|
||||||
// mock heartbeat responses from NM
|
|
||||||
doAnswer(new Answer<MockLocalizerHeartbeatResponse>() {
|
|
||||||
@Override
|
|
||||||
public MockLocalizerHeartbeatResponse answer(
|
|
||||||
InvocationOnMock invocationOnMock) throws Throwable {
|
|
||||||
if(heartbeatResponse == 0) {
|
|
||||||
return new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
|
|
||||||
Collections.singletonList(rsrc));
|
|
||||||
} else if (heartbeatResponse < 2) {
|
|
||||||
return new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
|
|
||||||
Collections.<ResourceLocalizationSpec>emptyList());
|
|
||||||
} else {
|
|
||||||
return new MockLocalizerHeartbeatResponse(LocalizerAction.DIE,
|
|
||||||
null);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}).when(nmProxy).heartbeat(isA(LocalizerStatus.class));
|
|
||||||
|
|
||||||
return localizer;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static RecordFactory getMockLocalizerRecordFactory() {
|
static RecordFactory getMockLocalizerRecordFactory() {
|
||||||
RecordFactory mockRF = mock(RecordFactory.class);
|
RecordFactory mockRF = mock(RecordFactory.class);
|
||||||
when(mockRF.newRecordInstance(same(LocalResourceStatus.class)))
|
when(mockRF.newRecordInstance(same(LocalResourceStatus.class)))
|
||||||
|
|
Loading…
Reference in New Issue