YARN-8667. Cleanup symlinks when container restarted by NM.

Contributed by Chandni Singh
This commit is contained in:
Eric Yang 2018-08-16 18:41:58 -04:00
parent 8512e1a91b
commit d42806160e
3 changed files with 85 additions and 15 deletions

View File

@ -27,6 +27,7 @@ import java.net.UnknownHostException;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.Map; import java.util.Map;
@ -35,6 +36,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -417,20 +419,9 @@ public abstract class ContainerExecutor implements Configurable {
if (resources != null) { if (resources != null) {
sb.echo("Setting up job resources"); sb.echo("Setting up job resources");
for (Map.Entry<Path, List<String>> resourceEntry : Map<Path, Path> symLinks = resolveSymLinks(resources, user);
resources.entrySet()) { for (Map.Entry<Path, Path> symLink : symLinks.entrySet()) {
for (String linkName : resourceEntry.getValue()) { sb.symlink(symLink.getKey(), symLink.getValue());
if (new Path(linkName).getName().equals(WILDCARD)) {
// If this is a wildcarded path, link to everything in the
// directory from the working directory
for (File wildLink : readDirAsUser(user, resourceEntry.getKey())) {
sb.symlink(new Path(wildLink.toString()),
new Path(wildLink.getName()));
}
} else {
sb.symlink(resourceEntry.getKey(), new Path(linkName));
}
}
} }
} }
@ -791,6 +782,28 @@ public abstract class ContainerExecutor implements Configurable {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
/**
* Perform any cleanup before the next launch of the container.
* @param container container
*/
public void cleanupBeforeRelaunch(Container container)
throws IOException, InterruptedException {
if (container.getLocalizedResources() != null) {
Map<Path, Path> symLinks = resolveSymLinks(
container.getLocalizedResources(), container.getUser());
for (Map.Entry<Path, Path> symLink : symLinks.entrySet()) {
LOG.debug("{} deleting {}", container.getContainerId(),
symLink.getValue());
deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(container.getUser())
.setSubDir(symLink.getValue())
.build());
}
}
}
/** /**
* Get the process-identifier for the container. * Get the process-identifier for the container.
* *
@ -870,4 +883,25 @@ public abstract class ContainerExecutor implements Configurable {
} }
} }
} }
private Map<Path, Path> resolveSymLinks(Map<Path,
List<String>> resources, String user) {
Map<Path, Path> symLinks = new HashMap<>();
for (Map.Entry<Path, List<String>> resourceEntry :
resources.entrySet()) {
for (String linkName : resourceEntry.getValue()) {
if (new Path(linkName).getName().equals(WILDCARD)) {
// If this is a wildcarded path, link to everything in the
// directory from the working directory
for (File wildLink : readDirAsUser(user, resourceEntry.getKey())) {
symLinks.put(new Path(wildLink.toString()),
new Path(wildLink.getName()));
}
} else {
symLinks.put(resourceEntry.getKey(), new Path(linkName));
}
}
}
return symLinks;
}
} }

View File

@ -1868,6 +1868,13 @@ public class ContainerLaunch implements Callable<Integer> {
deleteAsUser(new Path(containerWorkDir, CONTAINER_SCRIPT)); deleteAsUser(new Path(containerWorkDir, CONTAINER_SCRIPT));
// delete TokensPath // delete TokensPath
deleteAsUser(new Path(containerWorkDir, FINAL_CONTAINER_TOKENS_FILE)); deleteAsUser(new Path(containerWorkDir, FINAL_CONTAINER_TOKENS_FILE));
// delete symlinks because launch script will create symlinks again
try {
exec.cleanupBeforeRelaunch(container);
} catch (IOException | InterruptedException e) {
LOG.warn("{} exec failed to cleanup", container.getContainerId(), e);
}
} }
private void deleteAsUser(Path path) { private void deleteAsUser(Path path) {

View File

@ -18,9 +18,17 @@
package org.apache.hadoop.yarn.server.nodemanager; package org.apache.hadoop.yarn.server.nodemanager;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -28,13 +36,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import static org.apache.hadoop.test.PlatformAssumptions.assumeWindows; import static org.apache.hadoop.test.PlatformAssumptions.assumeWindows;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
public class TestContainerExecutor { public class TestContainerExecutor {
@ -168,4 +176,25 @@ public class TestContainerExecutor {
builder.setContainer(container).setUser("foo"); builder.setContainer(container).setUser("foo");
assertTrue(containerExecutor.reapContainer(builder.build())); assertTrue(containerExecutor.reapContainer(builder.build()));
} }
@Test
public void testCleanupBeforeLaunch() throws Exception {
Container container = mock(Container.class);
java.nio.file.Path linkName = Paths.get("target/linkName");
java.nio.file.Path target = Paths.get("target");
//deletes the link if it already exists because of previous test failures
FileUtils.deleteQuietly(linkName.toFile());
Files.createSymbolicLink(linkName.toAbsolutePath(),
target.toAbsolutePath());
Map<Path, List<String>> localResources = new HashMap<>();
localResources.put(new Path(target.toFile().getAbsolutePath()),
Lists.newArrayList(linkName.toFile().getAbsolutePath()));
when(container.getLocalizedResources())
.thenReturn(localResources);
when(container.getUser()).thenReturn(System.getProperty("user.name"));
containerExecutor.cleanupBeforeRelaunch(container);
Assert.assertTrue(!Files.exists(linkName));
}
} }