YARN-8667. Cleanup symlinks when container restarted by NM.
Contributed by Chandni Singh
(cherry picked from commit d42806160e)
This commit is contained in:
parent
fbedf89377
commit
5237bdfb5a
|
@ -26,6 +26,7 @@ import java.net.InetAddress;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.LinkedHashSet;
|
import java.util.LinkedHashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -34,6 +35,7 @@ import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -415,20 +417,9 @@ public abstract class ContainerExecutor implements Configurable {
|
||||||
|
|
||||||
if (resources != null) {
|
if (resources != null) {
|
||||||
sb.echo("Setting up job resources");
|
sb.echo("Setting up job resources");
|
||||||
for (Map.Entry<Path, List<String>> resourceEntry :
|
Map<Path, Path> symLinks = resolveSymLinks(resources, user);
|
||||||
resources.entrySet()) {
|
for (Map.Entry<Path, Path> symLink : symLinks.entrySet()) {
|
||||||
for (String linkName : resourceEntry.getValue()) {
|
sb.symlink(symLink.getKey(), symLink.getValue());
|
||||||
if (new Path(linkName).getName().equals(WILDCARD)) {
|
|
||||||
// If this is a wildcarded path, link to everything in the
|
|
||||||
// directory from the working directory
|
|
||||||
for (File wildLink : readDirAsUser(user, resourceEntry.getKey())) {
|
|
||||||
sb.symlink(new Path(wildLink.toString()),
|
|
||||||
new Path(wildLink.getName()));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
sb.symlink(resourceEntry.getKey(), new Path(linkName));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -789,6 +780,28 @@ public abstract class ContainerExecutor implements Configurable {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Perform any cleanup before the next launch of the container.
|
||||||
|
* @param container container
|
||||||
|
*/
|
||||||
|
public void cleanupBeforeRelaunch(Container container)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
if (container.getLocalizedResources() != null) {
|
||||||
|
|
||||||
|
Map<Path, Path> symLinks = resolveSymLinks(
|
||||||
|
container.getLocalizedResources(), container.getUser());
|
||||||
|
|
||||||
|
for (Map.Entry<Path, Path> symLink : symLinks.entrySet()) {
|
||||||
|
LOG.debug("{} deleting {}", container.getContainerId(),
|
||||||
|
symLink.getValue());
|
||||||
|
deleteAsUser(new DeletionAsUserContext.Builder()
|
||||||
|
.setUser(container.getUser())
|
||||||
|
.setSubDir(symLink.getValue())
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the process-identifier for the container.
|
* Get the process-identifier for the container.
|
||||||
*
|
*
|
||||||
|
@ -868,4 +881,25 @@ public abstract class ContainerExecutor implements Configurable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Map<Path, Path> resolveSymLinks(Map<Path,
|
||||||
|
List<String>> resources, String user) {
|
||||||
|
Map<Path, Path> symLinks = new HashMap<>();
|
||||||
|
for (Map.Entry<Path, List<String>> resourceEntry :
|
||||||
|
resources.entrySet()) {
|
||||||
|
for (String linkName : resourceEntry.getValue()) {
|
||||||
|
if (new Path(linkName).getName().equals(WILDCARD)) {
|
||||||
|
// If this is a wildcarded path, link to everything in the
|
||||||
|
// directory from the working directory
|
||||||
|
for (File wildLink : readDirAsUser(user, resourceEntry.getKey())) {
|
||||||
|
symLinks.put(new Path(wildLink.toString()),
|
||||||
|
new Path(wildLink.getName()));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
symLinks.put(resourceEntry.getKey(), new Path(linkName));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return symLinks;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1874,6 +1874,13 @@ public class ContainerLaunch implements Callable<Integer> {
|
||||||
deleteAsUser(new Path(containerWorkDir, CONTAINER_SCRIPT));
|
deleteAsUser(new Path(containerWorkDir, CONTAINER_SCRIPT));
|
||||||
// delete TokensPath
|
// delete TokensPath
|
||||||
deleteAsUser(new Path(containerWorkDir, FINAL_CONTAINER_TOKENS_FILE));
|
deleteAsUser(new Path(containerWorkDir, FINAL_CONTAINER_TOKENS_FILE));
|
||||||
|
|
||||||
|
// delete symlinks because launch script will create symlinks again
|
||||||
|
try {
|
||||||
|
exec.cleanupBeforeRelaunch(container);
|
||||||
|
} catch (IOException | InterruptedException e) {
|
||||||
|
LOG.warn("{} exec failed to cleanup", container.getContainerId(), e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void deleteAsUser(Path path) {
|
private void deleteAsUser(Path path) {
|
||||||
|
|
|
@ -18,9 +18,17 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager;
|
package org.apache.hadoop.yarn.server.nodemanager;
|
||||||
|
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Paths;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.util.Shell;
|
import org.apache.hadoop.util.Shell;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
@ -28,13 +36,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
|
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
|
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
|
||||||
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import static org.apache.hadoop.test.PlatformAssumptions.assumeWindows;
|
import static org.apache.hadoop.test.PlatformAssumptions.assumeWindows;
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
public class TestContainerExecutor {
|
public class TestContainerExecutor {
|
||||||
|
@ -168,4 +176,25 @@ public class TestContainerExecutor {
|
||||||
builder.setContainer(container).setUser("foo");
|
builder.setContainer(container).setUser("foo");
|
||||||
assertTrue(containerExecutor.reapContainer(builder.build()));
|
assertTrue(containerExecutor.reapContainer(builder.build()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCleanupBeforeLaunch() throws Exception {
|
||||||
|
Container container = mock(Container.class);
|
||||||
|
java.nio.file.Path linkName = Paths.get("target/linkName");
|
||||||
|
java.nio.file.Path target = Paths.get("target");
|
||||||
|
//deletes the link if it already exists because of previous test failures
|
||||||
|
FileUtils.deleteQuietly(linkName.toFile());
|
||||||
|
Files.createSymbolicLink(linkName.toAbsolutePath(),
|
||||||
|
target.toAbsolutePath());
|
||||||
|
|
||||||
|
Map<Path, List<String>> localResources = new HashMap<>();
|
||||||
|
localResources.put(new Path(target.toFile().getAbsolutePath()),
|
||||||
|
Lists.newArrayList(linkName.toFile().getAbsolutePath()));
|
||||||
|
|
||||||
|
when(container.getLocalizedResources())
|
||||||
|
.thenReturn(localResources);
|
||||||
|
when(container.getUser()).thenReturn(System.getProperty("user.name"));
|
||||||
|
containerExecutor.cleanupBeforeRelaunch(container);
|
||||||
|
Assert.assertTrue(!Files.exists(linkName));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue