Revert "YARN-6078. Containers stuck in Localizing state. Contributed by Billie Rinaldi."

This reverts commit 9229fb3523.
This commit is contained in:
Billie Rinaldi 2018-02-07 09:57:46 -08:00
parent 39963d53df
commit 52281fd893
2 changed files with 0 additions and 184 deletions

View File

@ -74,7 +74,6 @@ import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskValidator; import org.apache.hadoop.util.DiskValidator;
import org.apache.hadoop.util.DiskValidatorFactory; import org.apache.hadoop.util.DiskValidatorFactory;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor; import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
@ -809,7 +808,6 @@ public class ResourceLocalizationService extends CompositeService
return; // ignore; already gone return; // ignore; already gone
} }
privLocalizers.remove(locId); privLocalizers.remove(locId);
LOG.info("Interrupting localizer for " + locId);
localizer.interrupt(); localizer.interrupt();
} }
} }
@ -1190,44 +1188,6 @@ public class ResourceLocalizationService extends CompositeService
dirPath, delService); dirPath, delService);
} }
@Override
public void interrupt() {
boolean destroyedShell = false;
try {
for (Shell shell : Shell.getAllShells()) {
try {
if (shell.getWaitingThread() != null &&
shell.getWaitingThread().equals(this) &&
shell.getProcess() != null &&
processIsAlive(shell.getProcess())) {
LOG.info("Destroying localization shell process for " +
localizerId);
shell.getProcess().destroy();
destroyedShell = true;
break;
}
} catch (Exception e) {
LOG.warn("Failed to destroy localization shell process for " +
localizerId, e);
}
}
} finally {
if (!destroyedShell) {
super.interrupt();
}
}
}
private boolean processIsAlive(Process p) {
try {
p.exitValue();
return false;
} catch (IllegalThreadStateException e) {
// this means the process is still alive
}
return true;
}
@Override @Override
@SuppressWarnings("unchecked") // dispatcher not typed @SuppressWarnings("unchecked") // dispatcher not typed
public void run() { public void run() {

View File

@ -66,7 +66,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Options;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionMatcher; import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionMatcher;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext; import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
@ -1198,149 +1197,6 @@ public class TestResourceLocalizationService {
} }
} }
private static class DummyShellExecutor extends DefaultContainerExecutor {
private AtomicInteger numLocalizers = new AtomicInteger(0);
@Override
public void startLocalizer(LocalizerStartContext ctx) throws IOException,
InterruptedException {
numLocalizers.incrementAndGet();
Shell.ShellCommandExecutor shexec = new Shell.ShellCommandExecutor(
new String[]{"bash", "-c", "sleep 300"});
try {
shexec.execute();
Assert.fail("Shell finished without being interrupted");
} catch (IOException e) {
System.out.println("Got expected exception executing shell " +
e.toString());
}
numLocalizers.decrementAndGet();
}
private void waitForLocalizers(int num) {
while (numLocalizers.intValue() != num) {
Thread.yield();
}
}
private void waitForShellCount(int num) {
while (Shell.getAllShells().size() != num) {
Thread.yield();
}
}
}
@Test(timeout = 60000)
@SuppressWarnings("unchecked")
public void testShellDestroyedOnContainerKill() throws Exception {
List<Path> localDirs = new ArrayList<Path>();
String[] sDirs = new String[1];
localDirs.add(lfs.makeQualified(new Path(basedir, 0 + "")));
sDirs[0] = localDirs.get(0).toString();
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
DummyShellExecutor exec = new DummyShellExecutor();
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
dirsHandler.init(conf);
DeletionService delServiceReal = new DeletionService(exec);
DeletionService delService = spy(delServiceReal);
delService.init(new Configuration());
delService.start();
DrainDispatcher dispatcher = getDispatcher(conf);
ResourceLocalizationService rawService = new ResourceLocalizationService(
dispatcher, exec, delService, dirsHandler, nmContext, metrics);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration
.class));
FsPermission defaultPermission =
FsPermission.getDirDefault().applyUMask(lfs.getUMask());
FsPermission nmPermission =
ResourceLocalizationService.NM_PRIVATE_PERM.applyUMask(lfs.getUMask());
final Path userDir =
new Path(sDirs[0].substring("file:".length()),
ContainerLocalizer.USERCACHE);
final Path fileDir =
new Path(sDirs[0].substring("file:".length()),
ContainerLocalizer.FILECACHE);
final Path sysDir =
new Path(sDirs[0].substring("file:".length()),
ResourceLocalizationService.NM_PRIVATE_DIR);
final FileStatus fs =
new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
defaultPermission, "", "", new Path(sDirs[0]));
final FileStatus nmFs =
new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
nmPermission, "", "", sysDir);
doAnswer(new Answer<FileStatus>() {
@Override
public FileStatus answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
if (args.length > 0) {
if (args[0].equals(userDir) || args[0].equals(fileDir)) {
return fs;
}
}
return nmFs;
}
}).when(spylfs).getFileStatus(isA(Path.class));
try {
spyService.init(conf);
spyService.start();
final Application app = mock(Application.class);
final ApplicationId appId =
BuilderUtils.newApplicationId(314159265358979L, 3);
String user = "user0";
when(app.getUser()).thenReturn(user);
when(app.getAppId()).thenReturn(appId);
List<LocalResource> resources = initializeLocalizer(appId);
LocalResource resource1 = resources.get(0);
final Container c1 = getMockContainer(appId, 42, "user0");
EventHandler<ApplicationEvent> applicationBus =
getApplicationBus(dispatcher);
EventHandler<ContainerEvent> containerBus = getContainerBus(dispatcher);
initApp(spyService, applicationBus, app, appId, dispatcher);
// Send localization request for container c1.
final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
new HashMap<>();
List<LocalResourceRequest> privateResourceList =
new ArrayList<>();
privateResourceList.add(req1);
rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);
spyService.handle(new ContainerLocalizationRequestEvent(c1, rsrcs));
// Wait for localizer of container c1 to begin.
exec.waitForLocalizers(1);
exec.waitForShellCount(1);
LocalizerRunner localizerRunner =
spyService.getLocalizerRunner(c1.getContainerId().toString());
// Container c1 is killed which leads to cleanup
spyService.handle(new ContainerLocalizationCleanupEvent(c1, rsrcs));
dispatcher.await();
// Wait for localizer of container c1 to stop.
exec.waitForShellCount(0);
exec.waitForLocalizers(0);
// Check that the thread is no longer running
while (localizerRunner.isAlive()) {
Thread.sleep(10);
}
} finally {
spyService.stop();
dispatcher.stop();
delService.stop();
}
}
private DrainDispatcher getDispatcher(Configuration config) { private DrainDispatcher getDispatcher(Configuration config) {
DrainDispatcher dispatcher = new DrainDispatcher(); DrainDispatcher dispatcher = new DrainDispatcher();
dispatcher.init(config); dispatcher.init(config);