YARN-8784. DockerLinuxContainerRuntime prevents access to distributed cache entries on a full disk. Contributed by Eric Badger
This commit is contained in:
parent
f6bb1ca3c1
commit
6b5838ed32
@ -250,12 +250,13 @@ public Integer call() {
|
|||||||
// accessible by users
|
// accessible by users
|
||||||
pidFilePath = dirsHandler.getLocalPathForWrite(pidFileSubpath);
|
pidFilePath = dirsHandler.getLocalPathForWrite(pidFileSubpath);
|
||||||
List<String> localDirs = dirsHandler.getLocalDirs();
|
List<String> localDirs = dirsHandler.getLocalDirs();
|
||||||
|
List<String> localDirsForRead = dirsHandler.getLocalDirsForRead();
|
||||||
List<String> logDirs = dirsHandler.getLogDirs();
|
List<String> logDirs = dirsHandler.getLogDirs();
|
||||||
List<String> filecacheDirs = getNMFilecacheDirs(localDirs);
|
List<String> filecacheDirs = getNMFilecacheDirs(localDirsForRead);
|
||||||
List<String> userLocalDirs = getUserLocalDirs(localDirs);
|
List<String> userLocalDirs = getUserLocalDirs(localDirs);
|
||||||
List<String> containerLocalDirs = getContainerLocalDirs(localDirs);
|
List<String> containerLocalDirs = getContainerLocalDirs(localDirs);
|
||||||
List<String> containerLogDirs = getContainerLogDirs(logDirs);
|
List<String> containerLogDirs = getContainerLogDirs(logDirs);
|
||||||
List<String> userFilecacheDirs = getUserFilecacheDirs(localDirs);
|
List<String> userFilecacheDirs = getUserFilecacheDirs(localDirsForRead);
|
||||||
List<String> applicationLocalDirs = getApplicationLocalDirs(localDirs,
|
List<String> applicationLocalDirs = getApplicationLocalDirs(localDirs,
|
||||||
appIdStr);
|
appIdStr);
|
||||||
|
|
||||||
|
@ -96,6 +96,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
||||||
@ -107,6 +108,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DockerLinuxContainerRuntime;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DockerLinuxContainerRuntime;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
|
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
|
||||||
@ -121,6 +123,7 @@
|
|||||||
import org.junit.Assume;
|
import org.junit.Assume;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.ArgumentCaptor;
|
||||||
|
|
||||||
public class TestContainerLaunch extends BaseContainerManagerTest {
|
public class TestContainerLaunch extends BaseContainerManagerTest {
|
||||||
|
|
||||||
@ -2362,4 +2365,82 @@ void generateCombinationAndTest(int nbItems,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDistributedCacheDirs() throws Exception {
|
||||||
|
Container container = mock(Container.class);
|
||||||
|
ApplicationId appId =
|
||||||
|
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||||
|
ContainerId containerId = ContainerId
|
||||||
|
.newContainerId(ApplicationAttemptId.newInstance(appId, 1), 1);
|
||||||
|
when(container.getContainerId()).thenReturn(containerId);
|
||||||
|
when(container.getUser()).thenReturn("test");
|
||||||
|
|
||||||
|
when(container.getLocalizedResources())
|
||||||
|
.thenReturn(Collections.<Path, List<String>> emptyMap());
|
||||||
|
Dispatcher dispatcher = mock(Dispatcher.class);
|
||||||
|
|
||||||
|
ContainerLaunchContext clc = mock(ContainerLaunchContext.class);
|
||||||
|
when(clc.getCommands()).thenReturn(Collections.<String>emptyList());
|
||||||
|
when(container.getLaunchContext()).thenReturn(clc);
|
||||||
|
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
ContainerExitHandler eventHandler =
|
||||||
|
mock(ContainerExitHandler.class);
|
||||||
|
when(dispatcher.getEventHandler()).thenReturn(eventHandler);
|
||||||
|
|
||||||
|
Application app = mock(Application.class);
|
||||||
|
when(app.getAppId()).thenReturn(appId);
|
||||||
|
when(app.getUser()).thenReturn("test");
|
||||||
|
|
||||||
|
Credentials creds = mock(Credentials.class);
|
||||||
|
when(container.getCredentials()).thenReturn(creds);
|
||||||
|
|
||||||
|
((NMContext) context).setNodeId(NodeId.newInstance("127.0.0.1", HTTP_PORT));
|
||||||
|
ContainerExecutor mockExecutor = mock(ContainerExecutor.class);
|
||||||
|
|
||||||
|
LocalDirsHandlerService mockDirsHandler =
|
||||||
|
mock(LocalDirsHandlerService.class);
|
||||||
|
|
||||||
|
List <String> localDirsForRead = new ArrayList<String>();
|
||||||
|
String localDir1 =
|
||||||
|
new File("target", this.getClass().getSimpleName() + "-localDir1")
|
||||||
|
.getAbsoluteFile().toString();
|
||||||
|
String localDir2 =
|
||||||
|
new File("target", this.getClass().getSimpleName() + "-localDir2")
|
||||||
|
.getAbsoluteFile().toString();
|
||||||
|
localDirsForRead.add(localDir1);
|
||||||
|
localDirsForRead.add(localDir2);
|
||||||
|
|
||||||
|
List <String> localDirs = new ArrayList();
|
||||||
|
localDirs.add(localDir1);
|
||||||
|
Path logPathForWrite = new Path(localDirs.get(0));
|
||||||
|
|
||||||
|
when(mockDirsHandler.areDisksHealthy()).thenReturn(true);
|
||||||
|
when(mockDirsHandler.getLocalDirsForRead()).thenReturn(localDirsForRead);
|
||||||
|
when(mockDirsHandler.getLocalDirs()).thenReturn(localDirs);
|
||||||
|
when(mockDirsHandler.getLogDirs()).thenReturn(localDirs);
|
||||||
|
when(mockDirsHandler.getLogPathForWrite(anyString(),
|
||||||
|
anyBoolean())).thenReturn(logPathForWrite);
|
||||||
|
when(mockDirsHandler.getLocalPathForWrite(anyString()))
|
||||||
|
.thenReturn(logPathForWrite);
|
||||||
|
when(mockDirsHandler.getLocalPathForWrite(anyString(), anyLong(),
|
||||||
|
anyBoolean())).thenReturn(logPathForWrite);
|
||||||
|
|
||||||
|
ContainerLaunch launch = new ContainerLaunch(context, conf, dispatcher,
|
||||||
|
mockExecutor, app, container, mockDirsHandler, containerManager);
|
||||||
|
launch.call();
|
||||||
|
|
||||||
|
ArgumentCaptor <ContainerStartContext> ctxCaptor =
|
||||||
|
ArgumentCaptor.forClass(ContainerStartContext.class);
|
||||||
|
verify(mockExecutor, times(1)).launchContainer(ctxCaptor.capture());
|
||||||
|
ContainerStartContext ctx = ctxCaptor.getValue();
|
||||||
|
|
||||||
|
Assert.assertEquals(StringUtils.join(",",
|
||||||
|
launch.getNMFilecacheDirs(localDirsForRead)),
|
||||||
|
StringUtils.join(",", ctx.getFilecacheDirs()));
|
||||||
|
Assert.assertEquals(StringUtils.join(",",
|
||||||
|
launch.getUserFilecacheDirs(localDirsForRead)),
|
||||||
|
StringUtils.join(",", ctx.getUserFilecacheDirs()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user